RocketMQ思维导图
2023-02-22 20:09:28 0 举报
AI智能生成
根据源码画的RocketMQ思维导图,每一个节点都标记了关联源码位置
作者其他创作
大纲/内容
RocketMQ
推模式
推模式与拉模式的区别
拉模式的消息拉取启动DefaultLitePullConsumerImpl#start
负载均衡初始化RebalanceLitePullImpl#initRebalanceImpl
触发消息队列变更事件RebalanceLitePullImpl#messageQueueChanged
更新拉取任务DefaultLitePullConsumerImpl#updatePullTask
提交拉取任务DefaultLitePullConsumerImpl#startPullTask
拉模式实现原理
放入消息消费阻塞队列DefaultLitePullConsumerImpl#submitConsumeRequest
拉取消息PullTaskImpl#run
消息消费DefaultLitePullConsumerImpl#poll
超链接
消息拉模式
更新主题路由信息DefaultMQPushConsumerImpl#updateTopicSubscribeInfoWhenSubscriptionChanged
Comsume发送心跳MQClientInstance#sendHeartbeatToAllBrokerWithLock
Broker心跳请求处理ClientManageProcessor#heartBeat
进行消费者注册ConsumerManager#registerConsumer
注册消费者
更新处理队列RebalanceImpl#updateProcessQueueTableInRebalance
根据主题进行负载均衡RebalanceImpl#rebalanceByTopic
唤醒负载均衡服务MQClientInstance#rebalanceImmediately
消费者负载均衡
消费者启动时触发
Channel变更ConsumerManager#updateChannel
主题信息订阅变更ConsumerManager#updateSubscription
变更请求发送DefaultConsumerIdsChangeListener#handle
变更通知请求处理ClientRemotingProcessor#processRequest
消费者变更时触发ConsumerManager#registerConsumer
发送取消注册请求MQClientAPIImpl#unregisterClient
Broker处理消费者取消注册的请求ConsumerManager#unregisterConsumer
触发变更事件ConsumerManager#unregisterConsumer
消费者停止时触发DefaultMQPushConsumerImpl#shutdown
消费者定时触发RebalanceService#run
Rebalance的触发
负载均衡
选择Broker发送拉取请求PullAPIWrapper#recalculatePullFromWhichNode
订阅分组配置
是否建议从Slave节点拉取的设置DefaultMessageStore#getMessage
总结
返回建议的BrokerIDPullMessageProcessor#processRequest
更新Broker端的消费进度RemoteBrokerOffsetStore#updateConsumeOffsetToBroker
获取Broker的信息并选择BrokerMQClientInstance#findBrokerAddressInSubscribe
消费进度持久化RemoteBrokerOffsetStore#persistAll
主节点处理从节点的数据同步BrokerController#start->handleSlaveSynchronize
同步消费进度SlaveSynchronize#syncConsumerOffset
从节点循环进行数据同步SlaveSynchronize#syncAll
主从模式下的消费进度同步
主从模式下的消费进度管理
主从同步启动DefaultMessageStore->HAService#start
监听从节点连接请求HAService->AcceptSocketService#beginAccept
处理从节点连接请求HAService->AcceptSocketService#run
等待传输完成HAService->GroupTransferService#doWaitTransfer
处理主从同步请求CommitLog#submitReplicaRequest
等待主从复制传输结束GroupTransferService#run
连接主节点HAService->HAClient#connectMaster
发送主从同步消息拉取偏移量HAService->HAClient#connectMaster#isTimeToReportOffset
消息写入ComitLogHAService->HAClient#dispatchReadRequest
处理网络可读事件HAService->HAClient#processReadEvent
从节点运行HAClient
更新消息偏移量HAService#notifyTransferSome
处理可读事件HAConnection->ReadSocketService#processReadEvent
可读事件监听ReadSocketService
发送数据HAConnection->WriteSocketService#transferData
Master节点向从节点发送同步消息WriteSocketService
主从网络处理HAConnection
主从同步流程
有新消息写入之后的同步流程
主从同步实现原理
顺序消息启动ConsumeMessageOrderlyService#start
消息队列加锁RebalanceImpl#lockAll
加锁定时任务
拉取前加锁RebalanceImpl#lock
拉取消息
提交到线程池ConsumeMessageService#submitConsumeRequest
加锁操作MessageQueueLock#fetchLockObject
消费时的消息队列锁ConsumeMessageOrderlyService#ConsumeRequest#run
消息消费锁processQueue.getConsumeLock().lock()
向Broker申请的消息队列锁
消费者处理拉取消息时的消息队列锁
消息消费锁
锁分类
顺序消息消费
顺序消息实现原理
两阶段提交
发送事务消息DefaultMQProducerImpl#sendMessageInTransaction
设置half消息的相关属性TransactionalMessageBridge#parseHalfMessageInner
half消息处理Broker#SendMessageProcessor
Broker事务结束请求处理EndTransactionProcessor
向OP队列中添加消息TransactionalMessageBridge#putOpMessage
删除half消息TransactionalMessageServiceImpl#deletePrepareMessage
结束事务DefaultMQProducerImpl#endTransaction
构建OP队列的消息队列对象MessageQueue
获取half队列的消费进度和OP消费队列的消费进度
从OP队列中拉取消息
处理每一个half消息
更新消费进度
状态检查TransactionalMessageServiceImpl#check
重新添加half消息TransactionalMessageServiceImpl#putBackHalfMsgQueue
状态回查请求发送AbstractTransactionalMessageCheckListener#resolveHalfMsg
异步执行事务的状态检查DefaultMQProducerImpl#checkTransactionState#run
事务状态回查请求处理ClientRemotingProcessor#checkTransactionState
事务状态检查启动TransactionalMessageCheckService#onWaitEnd
事务的实现原理
创建NamesrvControllerNamesrvStartup#createNamesrvController
初始化NamesrvController#initialize
Netty启动NettyRemotingServer#start
启动RemotingServer#start
NamesrvController
启动NameServerNamesrvStartup#start
NameServer启动NamesrvStartup#main
NameServer的启动
发送注册请求BrokerController#registerBrokerAll
Broker注册BrokerController#start
Broker注册请求处理DefaultRequestProcessor#registerBroker
注册BrokerRouteInfoManager#registerBroker
注册处理RouteInfoManager
路由剔除RouteInfoManager#onChannelDestroy
下线BrokerRouteInfoManager#scanNotActiveBroker
心跳检测NameServer#initialize
NameServer处理DefaultRequestProcessor#processRequest
Broker服务注册
消息发送的方法类别
消息的发送DefaultMQProducer -> DefaultMQProducerImpl#sendDefaultImpl
主题路由信息TopicPublishInfo
查找路由信息DefaultMQProducerImpl#tryToFindTopicPublishInfo
发送请求MQClientAPIImpl#getTopicRouteInfoFromNameServer
从NameServer获取主题路由信息MQClientInstance#updateTopicRouteInfoFromNameServer
获取路由信息DefaultMQProducerImpl#topicPublishInfoTable
启用故障延迟机制
未启用故障延迟机制
选择消息队列TopicPublishInfo#selectOneMessageQueue
选择消息队列MQFaultStrategy#selectOneMessageQueue
根据Broker名称获取失败条目LatencyFaultToleranceImpl#faultItemTable
判断Broker是否可用FaultItem#isAvailable方法
条目排序FaultItem#compareTo
未找到合适的消息队列是选择一个LatencyFaultToleranceImpl#pickOneAtLeast
Broker失败条目LatencyFaultToleranceImpl#FaultItem
故障规避MQFaultStrategy#selectOneMessageQueue
故障延迟机制DefaultMQProducerImpl#sendDefaultImpl
消息发送
单个消息的发送处理方法SendMessageProcessor#asyncSendMessage
Broker对消息的处理SendMessageProcessor#processRequest
CommitLog文件CommitLog
ConsumeQueue文件ConsumeQueue
Index文件IndexFile
checkpoint文件
消息文件
Broker存储检查DefaultMessageStore#checkStoreStatus
消息长度检查DefaultMessageStore#checkMessage
LMQ(Light Message Queue)时是否超过了最大消费数量DefaultMessageStore#checkLmqMessage
合法性校验
编码消息和写入内存BufferMessageExtEncoder#encode
创建编码对象CommitLog#MessageExtEncoder
执行写入CommitLog.DefaultAppendMessageCallback#doAppend
将内存中的内容写入映射文件MappedFile#appendMessagesInner
写入内存映射文件MappedFile
内存映射流程
消息写入CommitLog#asyncPutMessage
启动转发线程DefaultMessageStore#start
发起转发DefaultMessageStore->ReputMessageService#run
根据消息更新ConsumeQueue文件DefaultMessageStore->CommitLogDispatcherBuildConsumeQueue#putMessagePositionInfo
根据消息更新Index文件DefaultMessageStore->CommitLogDispatcherBuildIndex#putMessagePositionInfo
执行转发DefaultMessageStore->ReputMessageService#rdoReput
实时更新ConsumeQueue与Index文件
Broker正常停止文件恢复CommitLog#recoverNormally
Broker异常停止文件恢复CommitLog#recoverAbnormally
ConsumeQueue与Index文件恢复DefaultMessageStore#load
过期文件删除机制DefaultMessageStore#addScheduleTask->cleanFilesPeriodically
消息存储DefaultMessageStore#asyncPutMessage
刷盘策略
刷盘请求包装类CommitLog.GroupCommitRequest
处理刷盘CommitLog.CommitLog.run
刷盘线程的启动BrokerController.start
等待刷盘请求GroupCommitService#run->waitForRunning
添加刷盘请求,唤醒刷盘线程GroupCommitService#putRequest
读写链表GroupCommitService.requestsWrite、requestsRead
线程被唤醒,执行刷盘前的操作GroupCommitService#onWaitEnd、swapRequests
执行刷盘GroupCommitService#doCommit
刷盘请求的处理GroupCommitService
刷盘超时监控FlushDiskWatcher#run
同步刷盘CommitLog.GroupCommitService
刷盘对象的的实例化
MappedFile#isAbleToCommit
MappedFileQueue#commit
在开启暂存池(transientStorePoolEnable)时,需要先提交CommitRealTimeService#run
提交
MappedFile#flush
MappedFileQueue#flush
如果未开启暂存池,直接刷盘FlushRealTimeService#run
刷盘
异步刷盘
刷盘机制
消息的存储
广播模式
集群模式
消费模式
拉模式
消息的获取模式
消费者的启动DefaultMQPushConsumerImpl#start
主题订阅处理copySubscription#copySubscription
消息拉取服务启动PullMessageService#run
负载均衡服务的唤醒MQClientInstance#rebalanceImmediately
进行负载均衡RebalanceImpl#doRebalance
平均分配法
平均轮询分配法
添加拉取请求dispatchPullRequest#pullRequestList
负载均衡MQClientInstance#doRebalance
负载均衡服务启动RebalanceService#run
创建MQClientInstance
异步消息拉取MQClientAPIImpl#pullMessageAsync
发送拉取消息请求MQClientAPIImpl#pullMessage
创建回调函数PullCallback
拉取消息DefaultMQPushConsumerImpl#pullMessage
消费者拉取消息
拉取请求处理PullMessageProcessor#processRequest
纠正偏移量DefaultMessageStore#nextOffsetCorrection
查找消息DefaultMessageStore#getMessage
Broker对消息拉取请求处理
消费者对拉取消息的处理DefaultMQPushConsumerImpl#pullMessage#PullCallback
消息的拉取
消费任务运行ConsumeMessageConcurrentlyService->ConsumeRequest#run
设置ackIndexConsumeMessageConcurrentlyService#processConsumeResult
处理消费失败的消息
移除消息,更新拉取偏移量ConsumeMessageConcurrentlyService#removeMessage、updateOffset
处理消费结果ConsumeMessageConcurrentlyService#processConsumeResult
延迟级别MessageStoreConfig#messageDelayLevel
发送CONSUMER_SEND_MSG_BACK消息ConsumeMessageConcurrentlyService#sendMessageBack
Broker对请求的处理SendMessageProcessor#asyncConsumerSendMsgBack
延迟消息处理CommitLog#asyncPutMessage
消息消费ConsumeMessageConcurrentlyService#submitConsumeRequest
更新进度LocalFileOffsetStore#updateOffset
读取消费进度OffsetSerializeWrapper
加载进度LocalFileOffsetStore#load
持久化进度LocalFileOffsetStore#persistAll
更新进度RemoteBrokerOffsetStore#updateOffset
加载
持久化RemoteBrokerOffsetStore#persistAll
持久化的触发MQClientInstance#startScheduledTask
拉取进度持久化
消息的消费
收藏
0 条评论
回复 删除
下一页