rocketmq源码(四)——Producer启动流程
2024-09-10 11:26:18 1 举报
"rocketmq源码(四)——Producer启动流程"这篇文章深入分析了RocketMQ中Producer的启动过程。文章首先介绍了Producer启动的主要步骤,包括配置加载、生产者组初始化、生产者实例创建等。接着,文章详细分析了生产者组初始化和生产者实例创建的具体实现,包括各种参数配置、主题路由信息的获取、生产者状态的维护等。最后,文章总结了Producer启动过程中的关键知识点,包括生产者组、生产者实例、主题路由信息等。
作者其他创作
大纲/内容
定时从NameSrv获取Topic信息updateTopicRouteInfoFromNameServer()
从 RocketMQ 服务器拉取消息并分发给相应的消费者进行处理pullMessageService
cluster
netty客服端配置nettyClientConfig
将
将TopicRouteData转换为TopicPublishInfo
放入MQProducerInner中
DefaultMQProducer
RPCHook
new MQClientInstance
brokerName
producer.start();
启动netty客服端this.mQClientAPIImpl.start();
定时任务
this.start(true);
asyncSenderThreadPoolQueue
创建MQClientInstance
放入空TopicPublishInfo到topicPublishInfoTable
new
消费者组内进行负载均衡rebalanceService
向所有Broker发送心跳信息并上传过滤器sendHeartbeatToAllBrokerWithLock
TopicPublishInfo
queueDatas
defaultAsyncSenderExecutor
客服端idclientId
brokerDatas
readQueueNums
brokerAddrs
getOrCreateMQClientInstance
TopicRouteData Topic路由信息
perm
RocketMQ 客户端的核心组件,负责管理客户端与服务器之间的交互,包括消息的发送、接收、消费者和生产者的管理、配置管理、网络通信和状态维护等功能
实现消息生产者的功能defaultMQProducer
启动MQClientInstancemQClientFactory.start();
Topic下面的写队列messageQueueList
从producerTable取出topicKey集合
创建MQClientInstance完成后向producerTable注册producer
topicRouteData
MQClientInstance:
提供管理操作的接口。它封装了各种管理功能,如创建主题、删除主题、查询主题信息、查询消费者组信息等mQAdminImpl
清理离线BrokercleanOfflineBroker
startScheduledTask()
设置Namesrv地址,用来拉起取broker信息setNamesrvAddr
负责处理客户端与服务器之间的远程通信请求clientRemotingProcessor
启动定时任务this.startScheduledTask();
从namesrv更新或者拉取Topic信息updateTopicRouteInfoFromNameServer(topic);
负责管理和维护消费者的状态和统计信息consumerStatsManager
defaultMQProducer
提供了各种 API 供上层调用,如消息发送、消息拉取、消费者和生产者的注册等。mQClientAPIImpl
writeQueueNums
客服端配置clientConfig
this.defaultMQProducerImpl.start();
remotingClient.start();
实例唯一递增idinstanceIndex
0 条评论
回复 删除
下一页