rocketmq源码
2025-07-20 09:47:19 0 举报
rocketmq源码
作者其他创作
大纲/内容
为了重试的时候,发往不同的broker
在linux用命令启动Namesrv的时候,会调用NamesrvStartup类的main方法
this.mqClientFactory.doRebalance();
private String configStorePath = System.getProperty(\"user.home\") + File.separator + \"namesrv\" + File.separator + \"namesrv.properties\";
还有很多注册处理器的
this.remotingClient.start();
impl.pullMessage(pullRequest);
是
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
this.pullMessage(pullRequest);
this.rebalanceService.start();
循环内
producer.setNamesrvAddr(\"127.0.0.1:9876\");
ConsumeMessageConcurrentlyService
平均分配策略:假设有5个队列,消费者组里有3个消费者。1、2队列给1消费者;3、4队列给2消费者;5队列给3消费者。
pullCallback.onSuccess(pullResult);
onSuccess
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
Namesrv启动过程
RemotingCommand response = this.font color=\"#e855a4\
this.rebalanceImpl.doRebalance(this.isConsumeOrderly());
顺序消费
NamesrvController controller = createNamesrvController(args);
这里可以看到生产者是轮询发给不同队列,生产者的负载均衡策略是轮询
一堆定时任务
BrokerData brokerData = this.brokerAddrTable.get(brokerName);
this.font color=\"#ed77b6\
这里面会移除brokerAddrTable里的,BrokerData里的brokerAddrs,brokerAddrs存的是broker的主从节点。如果brokerAddrs移除后为空,那么brokerAddrTable对应的brokername会被移除
AllocateMessageQueueAveragely
定时任务,扫描不活跃的broker
构建RemotingCommand,然后给队列发消息
int index = this.sendWhichQueue.incrementAndGet(); int pos = Math.abs(index) % this.messageQueueList.size(); if (pos < 0) pos = 0; return this.messageQueueList.get(pos);
for (boolean continueConsume = true; continueConsume; )
PullCallback pullCallback = new PullCallback() { public void onSuccess(PullResult pullResult{} public void onException(Throwable e){}};
主要看run
this.mQClientAPIImpl = new MQClientAPIImpl(this.font color=\"#e855a4\
mq = mqSelected;
synchronized (objLock) {}
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
如果配置文件没有配置namesrv的地址,可以去系统参数里或者环境变量里获取到
if (file != null) { InputStream in = new BufferedInputStream(new FileInputStream(file)); properties = new Properties(); properties.load(in); MixAll.properties2Object(font color=\"#ed77b6\
this.mQClientAPIImpl.start();
namesrv默认端口9876
if (commandLine.hasOption('c'))
这里可能暂时看不懂,得先看后面的代码
PullRequest pullRequest = this.pullRequestQueue.take();
循环平均分配策略:假设有5个队列,消费者组里有3个消费者。1队列给1消费者;2队列给2消费者;3队列给3消费者;4队列给消费者1,5队列给消费者2
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.font color=\"#e855a4\
boolean initResult = controller.initialize();
brokerAddrTable就是namesrv维护的所有broker,broker注册到namesrv,都放在这个集合里,生产者和消费者获取所有broker,就从这个集合获取。
存储broker配置文件里,存储路径相关的
this.remotingServer.registerProcessor(RequestCode.font color=\"#ed77b6\
NettyRequestProcessor processor = pair.getObject1();
MessageQueue mq = null;
netty的API,重点看pipleline里的对象
启动broker的时候,通过-c指定配置文件
根据策略分配队列给消费者实例
和正常发送的区别就是根据我们自己定义的selector找到mq
this.brokerOuterAPI.registerBrokerAll(...)
重试次数
AllocateMessageQueueAveragelyByCircle
#存储路径storePathRootDir=/app/rocketmq/storestorePathCommitLog=/app/rocketmq/store/commitlogstorePathConsumeQueue=/app/rocketmq/store/consumequeuestorePathIndex=/app/rocketmq/store/indexstoreCheckpoint=/app/rocketmq/store/checkpointabortFile=/app/rocketmq/store/abort
public class TopicPublishInfo { private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();}
start(createBrokerController(args))
this.namesrvController.getRouteInfoManager().registerBroker(...)
并发消费
mQClientFactory.registerProducer(this.font color=\"#e855a4\
producer
callback.callback(response);
mQClientFactory.start();
final NamesrvConfig namesrvConfig = new NamesrvConfig();
createBrokerController(args)
if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis())
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
impl.doRebalance();
start(createBrokerController(args));
controller.start();
DefaultMQProducer producer = new DefaultMQProducer(\"ProducerGroupName\");
启动一堆定时任务
这里就解释了,brokerName配置一样的,就是主从节点,主从节点会放在一个集合里。其实最终broker节点是放在BrokerAddrs里
ConsumeMessageOrderlyService
if (null == topicPublishInfo || !topicPublishInfo.ok())
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
if (lastBrokerName == null)
this.pullMessageService.start();
消费消息
consumer
RemotingUtil.closeChannel(next.getValue().getChannel());
return controller
在linux用命令启动broker的时候,会调用BrokerStartup类的main方法
String lastBrokerName = null == mq ? null : mq.getBrokerName();
获取消费组下所有消费者
this.remotingServer.start();
for (; times < timesTotal; times++)
this.remotingServer = new NettyRemotingServer(this.font color=\"#ed77b6\
String file = commandLine.getOptionValue('c'); if (file != null) { configFile = file; InputStream in = new BufferedInputStream(new FileInputStream(file)); properties = new Properties(); properties.load(in); properties2SystemEnv(properties); MixAll.properties2Object(font color=\"#ed77b6\
this.defaultMQPushConsumerImpl.start();
加锁来实现顺序消费
从生产者本地缓存拿,TopicPublishInfo内含有队列信息
this.mQClientAPIImpl.updateNameServerAddressList(this.clientConfig.getNamesrvAddr());
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
long last = next.getValue().getLastUpdateTimestamp();
nettyServerConfig.setListenPort(10911);
本地没有,就从namesrv拿
this.remotingClient.registerProcessor(RequestCode.font color=\"#e855a4\
否
再次发请求,拿消息
this.consumeExecutor.submit(consumeRequest);
controller.getConfiguration().registerConfig(properties);
获取topic下所有队列
continueConsume = false;
this.font color=\"#e855a4\
consumer.subscribe(font color=\"#ed77b6\
List<MessageQueue> messageQueueList = mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(topicPublishInfo.getMessageQueueList());
String file = commandLine.getOptionValue('c');
final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
case FOUND:
否,重试的时候才会走这边
return tpInfo.selectOneMessageQueue(lastBrokerName);
拿到所有的namesrv
start(controller);
serverHandler
NameSrv接收请求
AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
if (null == brokerData)
SendResult sendResult = producer.send(msg);
this.defaultMQProducerImpl.start();
this.scheduledExecutorService.scheduleAtFixedRate(NamesrvController.this.font color=\"#ed77b6\
this.registerProcessor();
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.font color=\"#e855a4\
PullMessageService.run()
this.namesrvConfig = namesrvConfig; this.nettyServerConfig = nettyServerConfig; this.kvConfigManager = new KVConfigManager(this); this.routeInfoManager = new RouteInfoManager(); this.brokerHousekeepingService = new BrokerHousekeepingService(this); this.font color=\"#ed77b6\
it.remove();
发请求,看看队列里有没有消息
这个循环说明消费完队列所有消息才放开锁
命令行启动namesrv的时候,同时输入-c参数指定自己的配置文件,配置文件可以修改namesrv端口等
RebalanceService.run()
broker启动过程
nettyServerConfig.setListenPort(9876);
for (int i = 0; i < this.messageQueueList.size(); i++) { int index = this.sendWhichQueue.incrementAndGet(); int pos = Math.abs(index) % this.messageQueueList.size(); if (pos < 0) pos = 0; MessageQueue mq = this.messageQueueList.get(pos); if (!mq.getBrokerName().equals(lastBrokerName)) { return mq; } } return selectOneMessageQueue();
this.brokerOuterAPI.start();
用线程池执行
for (final String namesrvAddr : nameServerAddressList)
private String namesrvAddr = System.getProperty(MixAll.font color=\"#ed77b6\
pair.getObject2().submit(requestTask);
font color=\"#e855a4\
final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
如果采用顺序发送
也可以不指定-c,在如下路径创建namesrv.properties文件,写自己的配置
if (!msgs.isEmpty())
return selectOneMessageQueue();
final BrokerConfig brokerConfig = new BrokerConfig();
处理心跳的,nettyServerConfig.getServerChannelMaxIdleTimeSeconds()时间内没收到读写请求,长连接就断开
consumer.start();
Netty的API,与namesrv建立连接,主要关注pipeline里的
这里的顺序消费是对于一个消费者而言的,一个消费者有多个线程去消费消息,这里加锁,是保证只有一个线程去消费当前队列的消息,等消费完了这个队列所有消息才放开锁让其他线程可以消费这个队列。如果多台机器里的消费者消费同一个队列,RocketMQ是没保证顺序的。
producer.start();
获取到上次发送的broker,这里其实为了重试的时候,发往不同的broker
一堆线程池
return controller;
说明队列里有消息
this.startScheduledTask();
final NettyClientConfig nettyClientConfig = new NettyClientConfig();
while (it.hasNext())
this.pullAPIWrapper.pullKernelImpl(...)
this.defaultMQProducer = defaultMQProducer;
ConsumeRequest.run()
一大堆,根据客户端的请求类型,绑定对应的处理器
font color=\"#ed77b6\
DefaultRequestProcessor

收藏

收藏
0 条评论
下一页