rocketmq4.9.1源码分享
2022-08-25 17:03:27 0 举报
rocketmq4.9.1源码分享
作者其他创作
大纲/内容
new DLedgerEntryPusher
MQClientInstance.start()1. 更新nameSrvAddr,使用httpGet方式,url=http://**-1?nofix=12. netty客户端MQClientAPIImpl3. 启动定时startScheduledTask() 1)每120秒更新静态addrs 2)每30秒更新TopicRouteInfo 3)每30秒移除下线的broker(获取brokerAddrTable中addr到topicRouteTable中判重BrokerAddrs)、mQClientAPIImpl.sendHearbeat每个Broker心跳检测、发送自定义消息过滤到Broker中,mQClientAPIImpl.registerMessageFilterClass 4)每5秒提交一次位点, 5)每1分钟处理一次堆积数据(需要实现接口ConsumeMessageService)4. 启动消费服务5. 启动rebalance服务6. 启动生产者
->5. RebalanceService.start()
new BrokerConfig
doWork()
NameServer
Broker
start(BrokerController controller)
new NettyServerConfig
dLedgerServer.startup()
Producer
appendAsFollower写入本地pageCache
LEADER 执行:1、主才会执行逻辑,对客户端future处理2、获取peerWaterMarksByTerm中的水位线,取中间一个值;(ack更新水位线)3、更新主的CommittedIndex到当前水位线;4、遍历在线任务,水位线-上次处理的水位线,拉取所有任务循环complete响应;5、如果本次水位线与上次相同,遍历在线任务的超时时间,超时的任务做超时响应 waitForRunning,休眠小于1秒,可被唤醒6、检查在线任务中是否有未超过水位线且未处理的future,遍历响应(隔1毫秒)
长链接30秒检测心跳信息
printWaterMark()
future.whenCompleteAsync返回结果给Leader
new MemberState
getBrokerStats().record();
protectBroker()
长链接30秒发送心跳,topic信息
consumerFilterManager.persist()
DefaultMQProducer.start()1. 设置生产者组;2. Impl.start()初始化;3. 配置traceDispatcher
->4. PullMessageService.start()
new BrokerController
remotingServer.start()
brokerStatsManager.start()
每10秒扫描brokerLiveTable,检测到上次心跳包时间超过120秒,则移除路由表中与broker相关的所有路由信息。 brokerLiveTable(broker心跳状态) topicQueueTable(消息队列路由信息) brokerAddrTable(broker基础信息) clusterAddrTable(集群信息) filterServerTable
new DefaultMessageStore
initialize()
Consumer
new NettyRemotingServer
new NettyClientConfig
messageStore.start()
load();recover();flushDataService.start();cleanSpaceService.start();
EntryHandler.start()
start()
createBrokerController(String[] args)
返回BeginIndex对应的返回BeginIndex对应的
brokerFastFailure.start()
FOLLOWER 执行:
new DLedgerServer(dLedgerConfig)
PUSH(Follower channelRead0)
1)TRUNCATE 2)COMPARE 3)COMMIT
dLedgerServer.handleAppend(request)阻塞:dledgerFuture.thenApply
APPEND(命令模式使用)
2、处理Leader推送推来的数据
COMPARE|TRUNCATE
GET(命令模式使用)
长链接30秒获取topic信息
printMasterAndSlaveDiff()
更新committedIndex
roleChangeHandler.startup()循环,没有找到用法
fileWatchService.start()
DLedgerCommitLog
brokerOuterAPI.start()
1、先处理compareOrTruncateRequests内容数据:
future.complete(SUCCESS)
clientHousekeepingService.start()
EntryDispatcher.start()循环
fastRemotingServer.start()
BrokerController.main
Broker管理路由注册路由删除
getMessageStore().dispatchBehindBytes()
PULL(无实现)
flushConsumeQueueService.start()
filterServerManager.start()
initialRpcHooks();
consumerOffsetManager.persist()
reputMessageService.start()
initialTransaction();
initialAcl();
storeStatsService.start()
new DLedgerRpcNettyService
new MessageSerializer
信息存储topic路由broker节点
dLedgerEntryPusher.startup()
this.remotingServer.start(); 注册一些请求事件PUSH、VOTE等this.remotingClient.start();
->6. DefaultMQProducerImpl.start(false)
读取writeRequestMap内容数据,key=getLedgerEndIndex() + 1;
COMMIT
dLedgerStore.appendAsLeader(dLedgerEntry) 1)写入MmapFile, 2)计算DLedgerEntry,key=ledgerEndIndex + 1 3)updateLedgerEndIndexAndTerm()
dLedgerRpcService.startup()
new BrokerStats
dLedgerServer.handleAppend(request)
new DLedgerMmapFileStore
DefaultMQProducerImpl.start()1. checkConfig(),检查topic;2. MQClientInstance构造;3. producerTable缓存;4. topicPublishInfoTable缓存.5. MQClientInstance.start()
dLedgerLeaderElector.startup()
路由发送者消费者
->2. MQClientAPIImpl1. 初始化bootstrap2. NettyConnectManageHandler连接管理3. NettyClientHandler处理server端推送的请求、处理server端的响应并执行Callback4. 扫描超时请求并处理;
METADATA(命令模式使用)
new MessageStorePluginContext
APPEND
commitLog.start()
QuorumAckChecker.start()
asyncPutMessage(MessageExtBrokerInner msg)
new DLedgerLeaderElector
stateMaintainer.start()
new DLedgerConfig()
dLedgerStore.startup()
VOTE
pullRequestHoldService.start()
HEART_BEAT
返回MemberState信息
LEADERSHIP_TRANSFER
0 条评论
下一页