RocketMQ思维导图
2023-02-22 20:09:28 0 举报
AI智能生成
根据源码画的RocketMQ思维导图,每一个节点都标记了关联源码位置
作者其他创作
大纲/内容
消息拉模式
推模式
推模式与拉模式的区别
拉模式实现原理
拉模式的消息拉取启动
DefaultLitePullConsumerImpl#start
负载均衡初始化
RebalanceLitePullImpl#initRebalanceImpl
触发消息队列变更事件
RebalanceLitePullImpl#messageQueueChanged
更新拉取任务
DefaultLitePullConsumerImpl#updatePullTask
提交拉取任务
DefaultLitePullConsumerImpl#startPullTask
拉取消息
PullTaskImpl#run
放入消息消费阻塞队列
DefaultLitePullConsumerImpl#submitConsumeRequest
消息消费
DefaultLitePullConsumerImpl#poll
负载均衡
消费者负载均衡
更新主题路由信息
DefaultMQPushConsumerImpl#updateTopicSubscribeInfoWhenSubscriptionChanged
注册消费者
Comsume发送心跳
MQClientInstance#sendHeartbeatToAllBrokerWithLock
Broker心跳请求处理
ClientManageProcessor#heartBeat
进行消费者注册
ConsumerManager#registerConsumer
唤醒负载均衡服务
MQClientInstance#rebalanceImmediately
根据主题进行负载均衡
RebalanceImpl#rebalanceByTopic
更新处理队列
RebalanceImpl#updateProcessQueueTableInRebalance
Rebalance的触发
消费者启动时触发
消费者变更时触发
ConsumerManager#registerConsumer
Channel变更
ConsumerManager#updateChannel
主题信息订阅变更
ConsumerManager#updateSubscription
变更请求发送
DefaultConsumerIdsChangeListener#handle
变更通知请求处理
ClientRemotingProcessor#processRequest
消费者停止时触发
DefaultMQPushConsumerImpl#shutdown
发送取消注册请求
MQClientAPIImpl#unregisterClient
Broker处理消费者取消注册的请求
ConsumerManager#unregisterConsumer
触发变更事件
ConsumerManager#unregisterConsumer
消费者定时触发
RebalanceService#run
主从模式下的消费进度管理
选择Broker发送拉取请求
PullAPIWrapper#recalculatePullFromWhichNode
返回建议的BrokerID
PullMessageProcessor#processRequest
订阅分组配置
是否建议从Slave节点拉取的设置
DefaultMessageStore#getMessage
总结
消费进度持久化
RemoteBrokerOffsetStore#persistAll
获取Broker的信息并选择Broker
MQClientInstance#findBrokerAddressInSubscribe
更新Broker端的消费进度
RemoteBrokerOffsetStore#updateConsumeOffsetToBroker
主从模式下的消费进度同步
主节点处理从节点的数据同步
BrokerController#start->handleSlaveSynchronize
从节点循环进行数据同步
SlaveSynchronize#syncAll
同步消费进度
SlaveSynchronize#syncConsumerOffset
主从同步实现原理
主从同步启动
DefaultMessageStore->HAService#start
监听从节点连接请求
HAService->AcceptSocketService#beginAccept
处理从节点连接请求
HAService->AcceptSocketService#run
等待主从复制传输结束
GroupTransferService#run
处理主从同步请求
CommitLog#submitReplicaRequest
等待传输完成
HAService->GroupTransferService#doWaitTransfer
从节点运行
HAClient
连接主节点
HAService->HAClient#connectMaster
发送主从同步消息拉取偏移量
HAService->HAClient#connectMaster#isTimeToReportOffset
处理网络可读事件
HAService->HAClient#processReadEvent
消息写入ComitLog
HAService->HAClient#dispatchReadRequest
主从网络处理
HAConnection
可读事件监听
ReadSocketService
处理可读事件
HAConnection->ReadSocketService#processReadEvent
更新消息偏移量
HAService#notifyTransferSome
Master节点向从节点发送同步消息
WriteSocketService
发送数据
HAConnection->WriteSocketService#transferData
主从同步流程
有新消息写入之后的同步流程
顺序消息实现原理
顺序消息启动
ConsumeMessageOrderlyService#start
加锁定时任务
消息队列加锁
RebalanceImpl#lockAll
拉取消息
拉取前加锁
RebalanceImpl#lock
顺序消息消费
提交到线程池
ConsumeMessageService#submitConsumeRequest
消费时的消息队列锁
ConsumeMessageOrderlyService#ConsumeRequest#run
加锁操作
MessageQueueLock#fetchLockObject
消息消费锁
processQueue.getConsumeLock().lock()
锁分类
向Broker申请的消息队列锁
消费者处理拉取消息时的消息队列锁
消息消费锁
事务的实现原理
两阶段提交
发送事务消息
DefaultMQProducerImpl#sendMessageInTransaction
half消息处理
Broker#SendMessageProcessor
设置half消息的相关属性
TransactionalMessageBridge#parseHalfMessageInner
结束事务
DefaultMQProducerImpl#endTransaction
Broker事务结束请求处理
EndTransactionProcessor
删除half消息
TransactionalMessageServiceImpl#deletePrepareMessage
向OP队列中添加消息
TransactionalMessageBridge#putOpMessage
事务状态检查启动
TransactionalMessageCheckService#onWaitEnd
状态检查
TransactionalMessageServiceImpl#check
构建OP队列的消息队列对象MessageQueue
获取half队列的消费进度和OP消费队列的消费进度
从OP队列中拉取消息
处理每一个half消息
更新消费进度
重新添加half消息
TransactionalMessageServiceImpl#putBackHalfMsgQueue
状态回查请求发送
AbstractTransactionalMessageCheckListener#resolveHalfMsg
事务状态回查请求处理
ClientRemotingProcessor#checkTransactionState
异步执行事务的状态检查
DefaultMQProducerImpl#checkTransactionState#run
NameServer的启动
NameServer启动
NamesrvStartup#main
创建NamesrvController
NamesrvStartup#createNamesrvController
启动NameServer
NamesrvStartup#start
NamesrvController
初始化
NamesrvController#initialize
启动
RemotingServer#start
Netty启动
NettyRemotingServer#start
Broker服务注册
Broker注册
BrokerController#start
发送注册请求
BrokerController#registerBrokerAll
NameServer处理
DefaultRequestProcessor#processRequest
Broker注册请求处理
DefaultRequestProcessor#registerBroker
注册处理
RouteInfoManager
注册Broker
RouteInfoManager#registerBroker
心跳检测
NameServer#initialize
下线Broker
RouteInfoManager#scanNotActiveBroker
路由剔除
RouteInfoManager#onChannelDestroy
消息发送
消息发送的方法类别
消息的发送
DefaultMQProducer -> DefaultMQProducerImpl#sendDefaultImpl
获取路由信息
DefaultMQProducerImpl#topicPublishInfoTable
主题路由信息
TopicPublishInfo
查找路由信息
DefaultMQProducerImpl#tryToFindTopicPublishInfo
从NameServer获取主题路由信息
MQClientInstance#updateTopicRouteInfoFromNameServer
发送请求
MQClientAPIImpl#getTopicRouteInfoFromNameServer
选择消息队列
MQFaultStrategy#selectOneMessageQueue
启用故障延迟机制
未启用故障延迟机制
选择消息队列
TopicPublishInfo#selectOneMessageQueue
故障延迟机制
DefaultMQProducerImpl#sendDefaultImpl
根据Broker名称获取失败条目
LatencyFaultToleranceImpl#faultItemTable
Broker失败条目
LatencyFaultToleranceImpl#FaultItem
判断Broker是否可用
FaultItem#isAvailable方法
条目排序
FaultItem#compareTo
未找到合适的消息队列是选择一个
LatencyFaultToleranceImpl#pickOneAtLeast
故障规避
MQFaultStrategy#selectOneMessageQueue
消息的存储
Broker对消息的处理
SendMessageProcessor#processRequest
单个消息的发送处理方法
SendMessageProcessor#asyncSendMessage
消息文件
CommitLog文件
CommitLog
ConsumeQueue文件
ConsumeQueue
Index文件
IndexFile
checkpoint文件
消息存储
DefaultMessageStore#asyncPutMessage
合法性校验
Broker存储检查
DefaultMessageStore#checkStoreStatus
消息长度检查
DefaultMessageStore#checkMessage
LMQ(Light Message Queue)时是否超过了最大消费数量
DefaultMessageStore#checkLmqMessage
消息写入
CommitLog#asyncPutMessage
创建编码对象
CommitLog#MessageExtEncoder
编码消息和写入内存Buffer
MessageExtEncoder#encode
写入内存映射文件
MappedFile
将内存中的内容写入映射文件
MappedFile#appendMessagesInner
执行写入
CommitLog.DefaultAppendMessageCallback#doAppend
内存映射流程
实时更新ConsumeQueue与Index文件
启动转发线程
DefaultMessageStore#start
发起转发
DefaultMessageStore->ReputMessageService#run
执行转发
DefaultMessageStore->ReputMessageService#rdoReput
根据消息更新ConsumeQueue文件
DefaultMessageStore->CommitLogDispatcherBuildConsumeQueue#putMessagePositionInfo
根据消息更新Index文件
DefaultMessageStore->CommitLogDispatcherBuildIndex#putMessagePositionInfo
ConsumeQueue与Index文件恢复
DefaultMessageStore#load
Broker正常停止文件恢复
CommitLog#recoverNormally
Broker异常停止文件恢复
CommitLog#recoverAbnormally
过期文件删除机制
DefaultMessageStore#addScheduleTask->cleanFilesPeriodically
刷盘机制
刷盘策略
同步刷盘
CommitLog.GroupCommitService
刷盘请求包装类
CommitLog.GroupCommitRequest
处理刷盘
CommitLog.CommitLog.run
刷盘线程的启动
BrokerController.start
刷盘请求的处理
GroupCommitService
等待刷盘请求
GroupCommitService#run->waitForRunning
添加刷盘请求,唤醒刷盘线程
GroupCommitService#putRequest
线程被唤醒,执行刷盘前的操作
GroupCommitService#onWaitEnd、swapRequests
读写链表
GroupCommitService.requestsWrite、requestsRead
执行刷盘
GroupCommitService#doCommit
刷盘超时监控
FlushDiskWatcher#run
异步刷盘
刷盘对象的的实例化
提交
在开启暂存池(transientStorePoolEnable)时,需要先提交
CommitRealTimeService#run
MappedFileQueue#commit
MappedFile#isAbleToCommit
刷盘
如果未开启暂存池,直接刷盘
FlushRealTimeService#run
MappedFileQueue#flush
MappedFile#flush
消息的拉取
消费模式
广播模式
集群模式
消息的获取模式
拉模式
推模式
消费者拉取消息
消费者的启动
DefaultMQPushConsumerImpl#start
主题订阅处理
copySubscription#copySubscription
创建MQClientInstance
消息拉取服务启动
PullMessageService#run
负载均衡服务启动
RebalanceService#run
负载均衡服务的唤醒
MQClientInstance#rebalanceImmediately
负载均衡
MQClientInstance#doRebalance
进行负载均衡
RebalanceImpl#doRebalance
根据主题进行负载均衡
RebalanceImpl#rebalanceByTopic
平均分配法
平均轮询分配法
更新处理队列
RebalanceImpl#updateProcessQueueTableInRebalance
添加拉取请求
dispatchPullRequest#pullRequestList
拉取消息
DefaultMQPushConsumerImpl#pullMessage
发送拉取消息请求
MQClientAPIImpl#pullMessage
异步消息拉取
MQClientAPIImpl#pullMessageAsync
创建回调函数PullCallback
Broker对消息拉取请求处理
拉取请求处理
PullMessageProcessor#processRequest
查找消息
DefaultMessageStore#getMessage
纠正偏移量
DefaultMessageStore#nextOffsetCorrection
消费者对拉取消息的处理
DefaultMQPushConsumerImpl#pullMessage#PullCallback
消息的消费
消息消费
ConsumeMessageConcurrentlyService#submitConsumeRequest
消费任务运行
ConsumeMessageConcurrentlyService->ConsumeRequest#run
处理消费结果
ConsumeMessageConcurrentlyService#processConsumeResult
设置ackIndex
ConsumeMessageConcurrentlyService#processConsumeResult
处理消费失败的消息
移除消息,更新拉取偏移量
ConsumeMessageConcurrentlyService#removeMessage、updateOffset
发送CONSUMER_SEND_MSG_BACK消息
ConsumeMessageConcurrentlyService#sendMessageBack
延迟级别
MessageStoreConfig#messageDelayLevel
Broker对请求的处理
SendMessageProcessor#asyncConsumerSendMsgBack
延迟消息处理
CommitLog#asyncPutMessage
拉取进度持久化
广播模式
更新进度
LocalFileOffsetStore#updateOffset
加载进度
LocalFileOffsetStore#load
读取消费进度
OffsetSerializeWrapper
持久化进度
LocalFileOffsetStore#persistAll
集群模式
更新进度
RemoteBrokerOffsetStore#updateOffset
加载
持久化
RemoteBrokerOffsetStore#persistAll
持久化的触发
MQClientInstance#startScheduledTask

收藏
0 条评论
下一页