RocketMQ源码以及关键流程分析
2024-03-04 21:46:35 1 举报
AI智能生成
登录查看完整内容
"RocketMQ是一款开源的消息中间件,支持高并发、高可用、高可靠、低延迟等特性。源码分析主要关注关键流程,如消息的生产、存储、消费、事务等。消息的生产过程包括生产者将消息发送到Broker,Broker存储消息,Consumer从Broker获取消息。存储环节涉及到消息的分发策略、消息刷盘、索引文件构建等。消费环节包括Push和Pull模型,Pull模型中Consumer主动拉取消息。事务消息涉及到本地事务和分布式事务的处理。RocketMQ提供了多种配置和优化策略,以满足不同场景的需求。"
作者其他创作
大纲/内容
void load():加载位点信息
void updateOffset():更新缓存位点信息
long readOffset():读取本地位点信息
void persistAll():持久化全部队列的位点信息
void persist():持久化某一个队列的位点信息
void remove():删除某一个队列的位点信息
void updateConsumeOffsetToBroker():将本地消费位点持久化到Broker中
OffsetStore接口核心方法
消费者启动时会同时启动位点管理器,RocketMQ设计了远程位点管理和本地位点管理两种位点管理方式.集群消费时,位点由客户端提交给Broker保存.广播消费时,位点保存在消费者本地磁盘上
以上Broker处理代码中有3个核心变量.hasCommitOffsetFlag:Pull请求中的sysFlag参数是决定Broker是否执行持久化消费位点的一个因素.brokerAllowSuspend:Broker是否能挂起。如果Broker是挂起状态,将不能持久化位点。storeOffsetEnable:true表示Broker需要持久化消费位点,false则不用持久化位点
实现过程是从Rebalance服务中获取全部消费的队列信息,再调用persistAll()方法持久化全部队列的位点信息
3.还有一种持久化位点的机制,那就是消费者在关闭时持久化位点信息,以Push消费者程序关闭为例,org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#shutdown(long)
理论上位点信息越是及时上报Broker,越能减少消息重复的可能性,RocketMQ再设计时并不完全支持Exactly-Once类型的幂等语义,因为实现该语义的代价颇大,并且使用该场景极少,再加上用户侧实现幂等的代价更小,故而RocketMQ在设计时将幂等操作交由用户处理
客户端消费进度保存也叫消费进度持久化,RocketMQ4.2.0支持定时持久化和不定时持久化两种方式
消费进度保存机制
客户端是通过Rebalance服务做到高可靠的。当发生Broker掉线、消费者实例掉线、Topic扩容等各种突发情况时,消费者组中的消费者实例是怎么重平衡的,以支持全部队列的正常消费的?
Rebalance服务的类图
AllocateMessageQueueStrategy allocateMessageQueueStrategy:MessageQUeue消息分配策略的实现
MQClientInstance mQClientFactory:client实例对象
RebalanceImpl的核心属性
boolean lock():为MessageQueue加锁
void doRebalance():执行Rebalance操作
void messageQueueChanged():通知Message发生变化,这个方法在Push和Pull两个类中被重写
boolean removeUnnecessaryMessageQueue():去掉不再需要的MessageQueue
void dispatchPullRequest():执行消息拉取请求
boolean updateProcessQueueTableInRebalance():在Rebalance中更新processQueue
RebalanceImpl的核心方法
消费者实例在收到Broker通知后是怎么执行Reblance的?这个操作是通过调用MQClientInstance.rebalanceImmediately()来实现的
这种设计是RocketMQ种典型的锁方式,执行wakeup命令后,this.waitForRunning()就会暂停,再执行this.mqClientFactory.doRebalance()
2.判断Rebalance开关,如果没有被暂停,则调用RebalancePushImpl.rebalance()方法
1.获取当前Topic的全部MessageQueue(代码中是mqSet)和该Topic的所有消费者的clientId(代码中是cidAll)只有当两者都不为空时,才执行Rebalance
2.将全部的MessageQueue(代码中时mqAll)和消费者客户端(cidAll)进行排序。由于不是所有消费者的客户端都能彼此通信,所以将mqAll和cidAll排序的目的在于,保证所有消费者客户端在做Rebalance的时候,看到的MessageQueue列表和消费者客户端都是一样的试图,做Rebalance时才不会分配错
allocate():执行队列分配操作,该方法必须满足全部队列都能分配到消费者
getName():获取当前分配算法的名字
目前队列分配策略有五种实现:AllocateMessageQueueAveragely:平均分配,也就是默认使用的策略(强烈推荐)AllocateMessageQueueAveragelyByCircle:环形分配策略AllocateMessageQueueByConfig:手动配置AllocateMessageQueueConsistentHash:一致性Hash分配AllocateMessageQueueByMachineRoom:机房分配策略
3.按照当前设置的队列分配策略执行Queue分配。队列分配策略接口AllocateMessageQueueStrategy,该接口中,有两个方法allocate()和getName()
PullRequest初始化的具体实现,新增的PullRequest对象将被分配出去拉取MessageQueue中的消息。
5.执行messageQueueChanged()方法,如果有MessageQueue订阅关系发生变化,则更新本地订阅关系版本,修改本地消费者有限流的一些参数,然后发送心跳,通知所有Broker,当前订阅关系发生了改变
4.Topic队列重新分配,这里也就是客户端Rebalance的核心逻辑之处,根据是集群消费还是广播消费分别执行MessageQueue重新分配的逻辑,以集群消费为例分析
Rebalance过程
消费者的Rebalance机制
RocketMQ设计了消息过滤,来解决大量无意义流量的传输:即对于客户端不需要的消息,Broker就不会传输给客户端,以免浪费宽带,RocketMQ4.2.0支持Tag过滤、SQL92过滤、Filter Server过滤
第一步:用户发送一个带Tag的消息
第三步:在Broker端做Tag过滤。消费者在Pull消息时,RocketMQ Broker会根据Tag的HasCode进行对比,如果不满足条件,消息不会返回给消费者,以节约带宽也许你们会问,为什么不直接用字符串进行对比和过滤呢?原因是HashCode对比存在Hash碰撞而导致过滤失败,字符串比较的速度相较HashCode慢。HashCode对比是数字比较,Java底层可以直接通过位运算进行对比,而字符串对比需要按照字符顺序比较,相比位运算更加耗时。由于HashCode对比有Hash碰撞的危险,所以才引出第四步
第四步:客户端Tag过滤。Hash碰撞相信大家都有所了解,就是不同的Tag计算出来的Hash值可能是一样的,在这种情况下过滤的消息是错误的,所以RocketMQ设计了客户端字符串对比功能,用来做第二次Tag过滤
Tag过滤为什么设计成Broker端使用Hash过滤,而客户端使用Tag字符串进行对比过滤呢?Broker端使用Hash过滤可以快速过滤海量消息,即使偶尔有\"漏网之鱼\",在客户端字符串过滤后也能被成功过滤。这种层次设计 的过滤方式在做系统时可以参考
Tag过滤流程
第二步:消费者Pull消息
第一次过滤:使用Bloom过滤器的isHit()方法做第一次过滤。Bloom过滤器效率高,但是也存在缺陷,即只能判断不需要的消息,过滤后的消息也不保证都是需要消费的。
第二次过滤:执行编译后的SQL方法evaluate()即可过滤出最终的结果在使用SQL过滤前,需要在启动Broker时配置如下几个参数:enableConsumeQueueExt=truefilterSupportRetry=trueenablePropertyFilter=trueenableCalcFilterBitMap=true
SQL过滤流程
这是一种不常用但是非常灵活的过滤方式,要使用Filter Server过滤必须在启动Broker时,添加如下配置:filterServerNums=大于0的数字.这样就可以启动一个或多个过滤服务器,每个过滤服务在启动时会自动注册到Namesrv中
第一步:用户消费者从Namesrv获取Topic路由信息,同时上传自定义的过滤器实现类源代码到FilterServer中,FilterServer编译并实例化过滤器类
第二步:用户发送拉取消息请求到FilterServer,FilterServer通过Pull consumer从Broker拉取消息,执行过滤类中的过滤方法,返回过滤后的消息
FilterServer过滤流程
消息过滤
transactionListener:事务监听器,主要功能是执行本地事务和执行事务回查。事务监听器包含executeLocalTransaction()和checkLocalTransaction()两个方法。executeLocalTransaction()方法执行本地事务,checkLocalTransaction()方法是当生产者由于各种问题导致未发送Commit或Rollback消息给Broker时,Broker回调生产者查询本地事务专改的处理方法
executorService:Broker回查请求处理的线程池
事务消息的环境初始化主要用于初始化Broker回查请求处理的线程池,在初始化事务消息生产者时我们可以指定初始化对象,如果不指定初始化对象,那么这里会初始化一个单线程的线程池
start():事务消息生产者启动方法,与普通启动方法不同,增加了this.defaultMQProducerImpl.initTransactionEnv()的调用,即增加了初始化事务消息的环境信息
TransactionMQProducer的核心属性和方法:
第一步,数据校验,判断TransactionListener的值是否为null、消息Topic为空检查、消息体为空检查等
第二步:消息预处理。预处理的主要功能是在消息扩展字段中设置消息类型。MessageConst.PROPERTY_TRANSACTION_PREPARED表示当前消息是事务Half消息。MessageConst.PROPERTY_PRODUCER_GROUP用于设置发送消息的生产者组名,以及设置事务消息的扩展字段
第三步:发送事务消息,调用同步发送消息的方法将事务消息发送出去
发送Half消息的过程。事务消息的发送是通过sendMessageInTransaction()方法来完成的
brokerAddr:存储当前Half消息的Broker服务器的socket地址
localTransactionState:本地事务执行结果
transactionId:事务消息的事务id
endTranactionOneway():以发送oneway消息的方式发送该RPC请求给Broker.
发送Commit或Rollback消息在本地事务处理完成后,根据本地事务的执行结果调用DefaultMQProducerImpl.endTransaction()方法通知Broker进行Commit或Rollback当前Half消息发送完成后,会返回生产者消息发送到哪个Broker、消息位点是多少、再根据本地事务的执行结果封装EndTransactionRequestHeader对象,最后调用MQClientAPIimpl.endTransactionOneway()方法通知Broker进行Commit或Rollback
生产者发送事务消息主要分为如下两个阶段:1.发送Half消息的过程2.发送Commit或Rollback消息
prepareMessage():用于保存Half事务消息,用户可以对其进行Commit或Rollback
deletePrepareMessage():用于删除事务消息,一般用于Broker回查失败的Half消息。
commitMessage():用于提交事务消息,使消费者可以正常地消费事务消息
rollbackMessage():用于回滚事务消息,回滚后消费者将不能够消费该消息。通常用于生产者主动进行Rollback时,以及Broker回查的生产者本地事务失败时
open():用于打开事务服务
close():用于关闭事务服务
1.TransactionalMessageService.事务消息主要用于处理服务,默认实现类是TransactionalMessageServiceImpl.如果想自定义事务消息处理实现类,需要实现TransactionMessageService接口,然后通过ServiceProvider.loadClass()方法进行加载。TransactionalMessageService接口的基本操作定义如下
2.transactionMessageCheckListener.事务消息回查监听器,默认实现类是DefaultTransactionalMessageCheckListener.如果想自定义回查监听处理,需要继承AbstractTransactionalMessageCheckListener接口,然后通过ServiceProvider.loadClass()方法被加载
3.transactionalMessageCheckService.事务消息回查服务是一个线程服务,定时调用transactionalMessageService.check()方法,检查超时的Half消息是否需要回查
第一个单独处理,sendMessage()这里获取消息中的扩展字段MessageConst.PROPERTY_TRANSACTION_PREPARED的值,如果该值为True则返回当前消息是事务消息;再判断当前Broker的配置是否支持事务消息,如果不支持就返回生产者不支持事务消息的信息;如果支持,则调用TransactionalMessageService#prepareMessage()方法保存Half消息
Prepared消息其实就是Half消息,其实现逻辑是,设置当前Half消息的queueOffset值为0,而不是其真实的位点值。这样该位点就不会建立ConsumeQueue索引,自然也不能被消费者消费
第二个单独处理:存储前事务消息预处理,处理方法是TransactionalMessageBridge.praseHalfMessageInner()
上面三个事务处理类完成初始化后,Broker就可以处理事务消息了。Broker存储事务消息和普通消息都是通过SendMessageProcessor类进行处理的,只是在存储消息时有两处事务消息需要单独处理。
Broker存储事务消息。在Broker中,事务消息的初始化是通过BrokerController.initialTransaction()方法执行的。3个核心的初始化变量
timeout:事务消息超时时间,如果消息在这个事件内没有进行Commit或Rollback,则执行第一次回查,默认6000ms
事务消息的最大回查次数默认15次
RMQ_SYS_TRANS_OP_HALF_TOPIC:也叫OP主题,当事务消息被Commit或Rollback后,会将原始事务消息的offset保存在该OP主题中
RMQ_SYS_TRANS_HALF_TOPIC和RMQ_SYS_TRANS_OP_HALF_TOPIC配合流程。首先,取出RMQ_SYS_TRANS_HALF_TOPIC中达到回查条件但没有回查过的消息,到RMQ_SYS_TRANS_OP_HALF_TOPIC主题中确认是否已经会回查,如果没有回查过则发起回查操作。
check()
填充removeMap的过程
第二步:检查是否有消息需要回查。如果从RMQ_SYS_TRANS_HALF_TOPIC主题中获取Half消息为空的次数超过允许的最大次数或者没有消息,那么表示目前没有需要再回查的消息了,可以结束本次回查过程,当然如果传入的位点是非法的,则继续下一个回查的位点。代码中的核心参数:msgExt:Half消息对象getMessageNullCount:当前空消息的次数MAX_RETRY_COUNT_WHEN_HALF_NULL:表示可以允许的最大Half消息为空的次数,超过则结束回查,默认为1次,并且不能配置。messageQueue:RMQ_SYS_TRANS_HALF_TOPIC主题中正在被检查的队列如果RMQ_SYS_TRANS_HALF_TOPIC中已经没有待回查的消息,则立即终止当前的回查过程
第三步,回查次数校验,消息是否过期校验。如果Half消息回查次数已经超过了允许的最大回查次数,则不再回查,实现该校验的方法是TransactionMessageServiceImpl.needDisard();如果Half消息对应的CommitLog已经过期,那么也不回查,该校验实现的方法是TransactionalMessageServiceImpl.needSkip()
第五步:最终判断是否需要回查生产者本地事务执行结果满足图中条件之一就可以进行回查:1.如果没有OP消息,并且当前Half消息在免疫期外2.当前Half消息存在OP消息,并且最后一个本批次OP消息中的最后一个消息在免疫期外,也就是满足回查时间3.Broker与客户端有时间差4.重新将当前Half消息存储在RMQ_SYS_TRANS_HALF_TOPIC主题中,原因是回查是一个异步过程,Broker不确定回查的结果是成功还是失败,所以RocketMQ做最坏的打算,如果回查失败则下次继续回查;如果本地回查成功则写入OP消息,下次再读取Half消息时也不会回查
第六步:执行回查。在当前批次的Half消息回查完毕后,更新Half主题和OP主题的消费位点,推进回查进度。Broker将回查消息通过回查线程池异步地发送给生产者,执行事务结果回查
然后具体分析会回查方法TransactinalMessageServiceImpl.check()的实现过程。获取RMQ_SYS_TRANS_HALF_TOPIC主题的全部队列,依次循环每一个队列中的全部未消费的消息,确认是否需要回查。对于每一条消息又是如何确认是否需要回查的呢?具体逻辑在TransactionalMessageServiceImpl#check()方法中的while(true)代码中
发送Half事务消息、执行Commit/Rollback命令、事务回查过程简图
checkMax:最大回查次数,如果回查超过这个次数,事务消息将被忽略。回查的实现逻辑是每间隔一定时间执行TransactionalMessageServiceImpl#check()方法,判断哪些消息超时,对超时的消息开始执行回查
TransactionalMessageCheckService是一个线程服务,它在后台一直执行run()方法,run()方法一直调用waitForRunning()方法。关于waitForRunning()方法,这是RocketMQ的Broker中典型的\"sleep\"实现方式。该方式可以大致理解为\"休息\"一段时间再执行onWaitEnd()方法,而TransactionalMessageCheckService服务重写了onWaitEnd()方法.接下来分析下代码中的核心变量。
第一步,End_Transaction请求校验,主要检查项如下1.Broker角色检查。Slave Broker不处理事务消息2.事务消息类型检查。EndTransactionProcessor只处理Commit或Rollback类型的事务消息,其余消息不处理,这里区分了事务回查
commitMessage():提交Half消息/这是事务消息服务接口中的一个方法,根据消息位点查询了Half消息,并将Half消息返回
checkPrepareMessage():Half消息数据校验。校验内容包括发送消息的生产者组与当前执行Commit/Rollback的生产者是否一致,当前Half消息是否与请求Commit/Rollback的消息是否是同一条消息
endMessageTransaction():消息对象类型转化,将MessageExt对象转化为MessageExtBrokerInner对象,并且还原消息之前的Topic和ConsumeQueue等信息
sendFinalMessage():将还原后的事务消息最终发送到CommitLog中,一旦发送成功,消费者就可以正常拉取消息并消费
如果消息被标记为已删除,则调用addRemoveTagInTransactionOp()方法,利用标记为已删除的OP消息构造Message消息对象,并且调用存储方法保存消息
TransactionalMessageUtil.buildOpTopic()方法跟保存Half消息时的逻辑类似
Half消息保存在名为MixAll.RMQ_SYS_TRANS_HALF_TOPIC的Topic中,执行Commit和Rollback后的消息都保存在MixAll.RMQ_SYS_TRANS_OP_HALF_TOPIC对象中,以便Broker判断是否需要回查生产者事务的执行状态
调用存储层方法,真正地将OP消息保存到了CommitLog中
deletePrepareMessage():在sendFinalMessage()执行成功后,删除Half消息。其实RocketMQ是不能真正删除消息的,其实质是顺序写磁盘,相当于做了一个\"假删除\"。\"假删除\"通过putOpMessage()方法将消息保存到TransactionMessageUtil.buildOpTopic()的Topic中,并且做上标记TransactionalMessageUtil.REMOVETAG,表示消息已删除
第二步,进行Commit或Rollback。根据生产者请求头中的参数判断,是Commit请求还是Rollback请求,然后分别进行处理
rollbackMessage():该方法与CommitMessage()方法一样,都是查询Half消息并返回消息对象。
checkPrepareMessage():消息校验,与Commit调用的是同一个方法
deletePrepareMessage():删除Half消息,与Commit调用的是同一个方法
Rollback实现逻辑。Rollback并没有真正删除消息,而是标记Half消息为删除,在Broker回查时机会跳过不检查
事务消息机制。事务消息的发送和处理总结为四个过程:1.生产者发送事务消息和执行本地事务2.Broker存储事务消息3.Broker回查事务消息4.Broker提交或回滚事务消息
事务消息
RocketMQ客户端中有两个独立的消费者实现类分别为DefaultMQPullConsumer和DefaultMQPushConsumer
DefualtMQPullConsumer的继承关系
instanceName:实例名,顾名思义每个实例都需要取不一样的名字,加入要在多个机器上部署多个程序进程,那么每个进程的实例名必须不相同,否则程序会启动失败,因为在创建MQClient时,会用到IP和instancename名称来
vipChannelEnabled:这是一个boolean值,表示是否开启VIP通道。VIP通道和非VIP通道的区别是使用不同的端口号进行通信
clientCallbackExecutorThreads:客户端回调线程数。该线程数等于Netty通信层回调线程的个数,默认值为Runtime.getRuntime().availableProcessors();表示当前有效的CPU个数
pollNameServerInterval:获取Topic路由信息间隔,单位为ms,默认为30000ms(30s)
defaultMQPullConsumer:默认pull消费者的具体实现
consumerGroup:消费者组名字
brokerSuspendMaxTimeMills:在长轮询模式下,Broker的最大挂起请求时间,建议不要修改此值
consumerTimeoutMillsWhenSuspend:在长轮询模式下,消费者的最大请求超时时间,必须比brokerSuspendMaxTimeMills大,不建议修改
messageModel:消费模式,现在支持集群模式消费和广播模式消费
messageQueueListener:消息路由信息变化时回调处理监听器,一般在重新平衡时被调用
offsetStore:位点存储模块。集群模式位点会持久化到Broker中,广播模式持久化到本地文件中(某个实例消费失败,生产者也不会重发),位点存储模块有两个实现类,分别为RemoteBrokerOffsetStore和LocalFileOffsetStore
allocateMessageQUeueStrategy:消费Queue分配策略管理器,默认是平均分配策略private AllocateMessageQueueStrategy allocateMessageQueueStrategy = new AllocateMessageQueueAveragely();
maxReconsumeTimes:最大重试次数,可以配置
核心属性
registerMessageQueueListener():注册队列变化监听器,当队列发生变化是会被监听到
pull():从Broker中Pull消息,如果有PullCallback参数,则表示异步拉取
pullBlockIfNotFound():长轮询方式拉取,如果没有拉取到消息,那么Broker会讲请求Hold住一段时间,当有消息来临时再发送pull请求
updateConsumeOffset():更新某一个Queue的消费位点
fetchConsumeOffset():查找某个Queue的消费位点
sendMessageBack():如果消费发送失败,则可以讲消息重新发回Broker,这个消费者组延迟一段时间后可以再消费(也就是重试)
fetchSubscribeMessageQueues():获取一个Topic的全部Queue信息
核心方法
1.最初创建defaultMQPullConsumerImpl时的状态为ServiceState.CREATE_JSUT然后设置消费者的默认启动状态为失败
2.检查消费者的配置比,如消费者组名、消费类型、Queue的分配策略等参数是否符合规范,将订阅关系数据发给Rebalance服务对象
3.校验消费者实例名,如果时默认的名字,则更改为当前的程序进程id
5.设置Rebalance对象消费组、消费类型、Queue分配策略、MQClientInstance等参数
6.对BrokerAPI的封装类pullAPIWrapper进行初始化,同时注册消息,过滤filter
7.初始化位点管理器并加载位点信息,位点管理器分为本地管理和远程管理,集群消费时消费位点保存在Broker中,由远程管理器管理,广播消息时位点在本地,由本地管理其管理
8.本地注册消费者实例,如果注册成功,则表示消费者启动成功
Pull消费启动流程
defaultMQPushConsumerImpl:默认的Push消费者具体实现类
consumeFromWhere:一个枚举,表示从什么位点开始消费,CONSUME_FROM_LAST_OFFSET:默认从上次消费的位点开始消费,相当于断点继续CONSUME_FROM_TIMESTAMP:从指定时间开始消费CONSUME_FROM_FIRST_OFFSET:从ConsumeQueue的最小位点开始消费
allocateMessageQueueStrategy:消费者订阅topic-queue策略
subscription:订阅关系,表示当前消费者订阅了哪些Topic的哪些Tag
messageListener:消息Push回调监听器
consumeThreadMin:最小消费线程数,必须小于consumeThreadMaxconsumeThreadMax:最大线程数,必须大于consumeThreadMin
adjustThreadPoolNumsThreshold:动态调整消费线程池的线程数大小,开源版本不支持
consumeConcurrentlyMaxSpan:并发消息的最大位点差,,如果Pull消息的位点差超过该值,拉取变慢
pullThresholdForQueue:一个Queue能缓存的最大消息数,超过该值则采取拉取流控措施,默认是1000
pullThresholdSizeForQueue:一个Queue最大能缓存的消息字节数,单位是MB,默认是10MB
pullThresholdForTopic:一个Topic最大能缓存的消息数。超过该值则采取拉取流控措施,该字段值默认是-1,该值根据pullThresholdForQueue的配置决定是否生效,pullThresholdForTopic的优先级低于pullThresholdForQueue
pullInterval:拉取间隔,单位为ms
consumeMessageBatchMaxSize:消费者每次批量消费时,最多消费多少条消息,默认是1
pullBatchSize:一次最大拉取多少条消息,默认32条
postSubscriptionWhenPull:每次拉取消息时是否更新订阅关系,默认false
maxReconsumeTimes:最大重试次数,默认-1,表示最大重试次数为16次
suspendCurrentQueueTimeMillis为段轮询场景设置的挂起时间,比如顺序消息场景
核心属性和方法
1-7和Pull模式类似
8.初始化消费服务并启动,之所以用户\"感觉\"消息是Broker主动推送给自己的,是因为DefaultMQPushConsumer通过Pull服务将消息拉取到本地,再通过Callbakc的形式,将本地消息Push给用户的消费代码,DefaultMQPushConsumer和DefaultMQPullConsumer获取消息的方式一样,本质上都是拉取。消费服务分为两种,即并行消费服务和顺序消费服务,对应的实现类分别是ConsumeMessageConcurrentlyService和ConsumeMessageOrderlyService根据用户监听器继承的不同接口初始化不同的消费服务程序
9.启动MQClientInstance实例
10.更新本地订阅关系和路由信息,通过Broker检查是否支持消费者的过滤类型;向集群中的所有Broker发送消费者组的心跳信息
11.立即执行一次Rebalancethis.mQClientFactory.rebalanceImmediately();
Push消费启动流程
Consumer启动机制
1.fetchSubscribeMessageQueues(String topic).拉取全部可以消费的Queue.如果某一个Broker下线,这里也可以实时感知到
2.遍历全部Queue,拉取每个Queue可以消费的消息
3.如果拉取到消息,则执行用户编写的消费代码
Pull消费流程
Pull方式。用户主动Pull消息,自主管理位点,可以灵活地掌控消费进度和消费速度,适合流计算、消费特别耗时等特殊的消费场景。缺点也显而易见,需要从代码层面精准地控制消费,对开发人员有一定要求,再RocketMQ中org.apache.rocketmq.client.consumer.DefaultMQPullConsume是默认的Pull消费者实现类
启动PullMessageService的拉取服务
1.PullMessageService不断拉取消息。pullRequestQueue中保存着待拉取地Topic和Queue消息,程序不断从pullRequestQueue中获取PullRequest并执行拉取消息方法
1.基本校验。校验ProcessQueue是否dropped;校验消费者服务状态是否正常;校验消费者是否被挂起。在Rebalance时,org.apache.rocketmq.client.impl.consumer.RebalanceImpl#updateProcessQueueTableInRebalance方法在运行时设置ProcessQueue.setDropped(true)的逻辑,,设置成功后,在执行拉取消息时,将不再拉取dropped为true的ProcessQueue
2.拉取条数、字节数限制检查。如果本地缓存消息数量大于配置的最大拉取条数(默认1000,可以调整),则延迟一段时间再拉取;如果本地缓存消息字节数大于配置的最大缓存字节数,则延迟i短时间再拉取,这两种校验方式都相当于本地流控
本地缓存队列的Span如果大于配置的最大差值(默认2000,可以调整),则认为本地消费过慢,需要执行本地流控
队列锁定
1.订阅关系校验。如果待拉取的Topic在本地缓存中订阅关系为空,则本地拉取不执行,待下一个正常心跳或者Rebalance后订阅关系恢复正常,方可正常拉取
start()方法和shudown()方法分别在启动和关闭服务时使用
updateCorePoolSize():更新消费线程池的核心线程数
incCorePoolSize():增加一个消费线程池的核心线程数
decCorePoolSize():减少一个消费者线程池的核心线程数
getCorePoolSize():获取消费线程池的核心线程数
consumeMessageDirectly():如果一个消息已经被消费过了,但是还项再消费一次,就需要实现这个方法
submitConsumeRequest():将消息封装成线程池任务,提交给消费服务,消费服务再将消息传递给业务消费进行处理
ConsumeMessageConcurrentlyService
ConsumeMessageOrderlyService
1.ConsumeMessageService消息消费分发。ConsumeMessageService服务通过DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest接口接收消息消费任务后,将消息按照固定条数封装成多个ConsumeRequest任务对象,并发送到消费线程池,等待分发给业务消费;ConsumeMessageOrderlyService先将Pull的全部消息放在一个本地队列中然后提交一个ConsumeRequest到消费者线程池
第一步:消费执行前进行预处理。执行消费前的hook和重试消息预处理。消费前的hook可以理解为消费前的消息预处理(比如消息格式校验)。如果拉取的消息来自重试队列,则将Topic重置为原来的Topic,而不用重试Topc名
第三步:消费结果统计和执行消费后的hook.客户端原声支持基本消费指标统计,比如消费耗时;消费后的hook和消费前的hook要一一对应,用户可以用消费后的hook统计与自身业务相关的指标
第四步:消费结果处理。包含消费指标统计、消费重试处理和消费位点处理。消费指标主要是对消费成功和失败的TPS的统计;消费重试处理主要将消费重试次数+1;消费位点处理主要根据消费结果更新消费位点记录
2.消费消息。消费的主要逻辑再ConsumeMessageService接口的两个实现类中,以并发消费为例.消费消息主要分为消费前预处理、消费回调、消费结构统计、消费结果处理4个步骤
ConsumeMessageService是一个通用的消费服务接口,它包含两个实现类,org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService和org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService,这两个实现类分别用于并发消费和顺序消费
2.封装拉取请求和拉取后的回调对象PullCallback。这里主要将消息拉取请求和拉取结果处理封装成PullCallback,并通过调用PullAPIWrapper.pullKernelImpl()方法将拉取请求发出去。如果拉取到消息,那么将消息保存到对应的本地缓存队列ProcessQueue中,然后将这些消息交给ConsumeService服务;
3.并发消费和顺序消费校验。在并发消费时,processQueue.getMaxSpan()方法是用于计算本地缓存队列中的哥消息和最后一个消息的offset差值。顺序消费时,如果当前拉取的队列在Broker没有被锁定,说明已经由拉取正在执行,当前拉取请求晚点执行,如果不是第一次拉取,需要先计算最新的拉取位点并修正最新的待拉取位点信息,再执行拉取
4.拉取消息之前先将MessageListenerConcurrently/MessageListenerOrderly进行初始化,并调用start()方法进行启动,由于ConsumeMessageService内部是一个线程,所以需要看run()方法
2.消费者拉取消息并消费,org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage
Push消费者拉取消息
1.初始化Push消费者实例。业务代码初始化DefaultMQPushConsumer实例,启动Push服务PullMessageService.该服务是一个线程服务,不断执行run()方法拉取已经订阅Topic的全部队列的消息,将消息保存在本地的缓存队列中
2.核心-消费消息。由消费服务ConsumeMessageConcurrentlyService或者ConsumeMessagOrderlyService将本地缓存队列中的消息不断放入到消费线程池,异步回调业务消费代码,此时业务代码可以消费消息
3.核心-保存消费进度。业务代码消费后,将消费结果返回给消费服务,再由消费服务将消费进度保存在本地,由消费进度管理服务定时和不定时地持久化到本地(LcoalFileOffsetStore支持)或者远程Broker(RemoteBrokerOffsetStore支持)中,对于消费失败地消息,RocketMQ客户端处理后发回给Broker,并告知消费失败
Push消费流程
Push方式。代码介入非常简单,适合大部分业务场景。缺点灵活度差,在了解消费原理后,排查消费问题可简单快捷.在RocketMQ中org.apache.rocketmq.client.consumer.DefaultMQPushConsumer是默认的Push消费者实现类
Pull和Push方式的比较
并发消费
顺序消息的ConsumeRequest中并没有保存需要消费的消息,再顺序消费时通过调用ProcessQueue.takeMessage()方法获取需要消费的消息,而且消费也是同步进行的。
batchSize:一次从本地缓存中获取多少条消息回调给用户消费。顺序消息是如何通过ProcessQueue.takeMesages()获取消息给业务代码消费的呢?
从msgTreeMap中获取batchSize数量的消息放入consumingMsgOrderlyTreeMap中,并返回给用户消费,由于当前的MessageQueue是被Synchronized锁住的,并且获取的消费消息也是按照消费位点顺序排列的,所以消费时用户能按照物理位点顺序消费消息
RocketMQ支持自动提交offset和手动提交offset两种方式。以自动提交offset为例,手动提交与其完全一致,先看入参
msg:当前处理的一批消息
status:消费结果的状态。目前支持SUCCESS和SUSPEND_CURRENT_QUEUE_A_MOMENT两种状态
消费成功后,程序会执行commit()方法提交当前位点,统计消费成功的TPS。消费失败后,程序会统计消费失败的TPS,通过执行makeMessageToCOnsumeAgin()方法删除消费失败的消息,通过定时任务将消费失败的消息在延迟一定时间后,重新提交到消费线程池
makeMessagToConsumeAgin()方法将消息从consumingMsgOrderlyTreeMap中删除再重新放入本地缓存度列msgTreeMap中,等待下次被重新消费
submitConsumeRequestLater()方法会执行一个定时任务,延迟一定时间后重新将消费请求发送到消费线程池中,以供下一轮消费
做完这两个操作后,试想以下,消费线程在下一次消费时会发生什么事情?如果是从msgTreeMap中获取一批消息,那么返回的消息又是那些呢?消息物理位点最小的,也就是之前未成功消费的消息,如果顺序消息消费失败,会再次投递给消费者消费,直到消费成功,以此来保证顺序性
如果消费失败,又是怎么保证顺序的呢?来看processConsumeResult()实现
takeMessages()方法实现
顺序消费
RocketMQ是一个消息队列,FIFO先进先出规则如何再消费失败时保证消息的顺序呢?可以从消费任务实现类ConsumeRequest和本地缓存队列ProcessQueue的设计来看主要差异
RocketMQ的消费方式包含Pull和Push两种
Consumer消费方式
setDelayTimeLevel():设置延迟级别,延迟多久消费者可以消费
setTags():消息过滤的标记,用户可以订阅某个Topic的某些Tag,这样Broker只会把订阅了topic-tag的消息发送给消费者
setKeys():设置消息的key,多个key可以用MessageConst.KEY_SEPARATOR(空格)分隔或者直接用另一个重载方法。如果Broker中的messageIndexEnable=true则会根据key创建消息的hash索引,帮助用户快速过滤
Body:消息体,字节数组,需要注意生产者使用什么编码,消费者也必须使用相同编码节码,否则会产生乱码
Properties:消息扩展信息,Tag、Keys、延迟级别都保存在这里
Flag:目前没用
Topic:主题名字,可以通过RocketMQ Console创建
消息结构
生产者启动的流程比消费者启动的流程更加简单一般用户使用DefaultMQProducer的构造函数构造一个生产者实例,并设置各种参数,比如,Namesrv地址、生产者组名等,调用start()方法启动生产者实例,start()方法调用了生产者默认实现类的start()方法启动,这里我们主要分析start()方法内部是怎么实现的
这里初始化了namespace、producerGroup以及defaultMQProducerImpl
1.构造函数初始化实例
源码中关键点在于defualtMQProducerImpl的启动方法
调用了重载的start()方法
2.启动生产者实例start()
这里可以看到ServiceState第一次初始化出来的时候就是CREATE_JUST
4.将生产者置为启动失败
5.生产者参数校验,执行checkConfig()方法,校验生产者实例设置的各种参数,比如生产者组名是否为空,是否满足命名规则,长度是否满足等等
producerTable:当前Client实例的全部生产者的内部实例
consumerTable:当前client实例的全部消费者的内部实例
adminExtTable:当前client实例的全部管理实例
mQClientAPIImpl:其实每个client也是一个NettyServer,也会支持Broker访问,这里实现了全部Client支持的接口
mQAdminImpl:管理接口的本地实现类
topicRouteTable:当前生产者、消费者中全部Topic的本地缓存路由信息
scheduledExecutorService:本地定时任务。比如定期获取当前Namesrv地址、定期同步Namesrv信息、定期更新Topic路由信息、定期发送心跳给Broker、定期清理已下线的Broker、定期持久化消费位点、定期调整消费线程数
clientRemotingProcessor:请求的处理器,从处理方法processRequest()中我们可以直到目前支持哪些功能接口
MQClientInstance中核心组件
updateTopicRouteInfoFromNameServer:从多个Namesrv中获取最新Topic路由信息,更新本地缓存
cleanOfflineBroker:清理已下线的Broker
checkClientInBroker:检查Client是否在Broker中有效
sendHeartbeatToAllBrokerWithLock:发送客户端的心跳给所有的broker
registerConsumer:在本地注册一个消费者
unregisterConsumer:取消本地注册的消费者
registerProducer:在本地注册一个生产者
unregisterProducer:取消本地注册的生产者
registerAdminExt:注册一个管理实例
rebalanceImmediately:立即执行一次Rebalance.该操作是通过RokcetMQ的一个CountDownLatch2锁来实现的
doRebalance:对于所有已经注册的消费者实例,执行一次Rebalance
findBrokerAddressInAdmin:在本地缓存中查找Master或Slave Broker信息
findBrokerAddressInSubscribe:在本地缓存中查找Slave Broker信息
findBrokerAddressInPublish:在本地缓存中查找Master Broker地址
findConsumerIdList:查找消费者id列表
findBrokerAddressByTopic:通过Topic名字查找Broker地址
resetOffset:重置消费位点
getConsumerStastusL获取一个订阅关系中每个队列的消费速度
getTopicRouteTable:获取本地缓存Topic路由
consumeMessageDirectly:直接将消息发送给指定的消费者消费,和正常投递不同的是,指定了已经订阅的消费者组中的一个,而不是全部已经订阅的消费者,一般使用于在消费消息后,某个消费者组想再香妃一次的场景
consumerRunningInfo:获取消费者的消费统计信息,包含消费RT、消费TPS
MQClientInstance中核心方法
7.注册本地路由信息
8.启动MQClientInstance,置为服务启动失败状态
10.启动通信模块.this.mQClientAPIImpl.start();
11..启动各种定时任务。this.startScheduledTask();
12.启动消息拉取服务。this.pullMessageService.start();
13.启动负载均衡服务。this.rebalanceService.start();
14.启动push服务,this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
15.置为服务启动状态。this.serviceState = ServiceState.RUNNING;
Producer启动流程
业务层:通常指直接调用RocketMQClient发送API的业务代码
消息处理层:指RocketMQ Client获取业务发送的消息对象后,一系列的参数检查、消息发送准备、参数包装等操作
通信层:指RocketMQ基于Netty封装的一个RPC通信服务,RocketMQ的各个组件之间的通信全部使用该通信层
1.调用defaultMQProducerImpl.send()方法发送消息
2.通过设置的发送超时时间,调用defaultMQProducerImpl.send()方法发送消息,设置的超时事件可以通过sendMsgTimeout进行变更,其默认值为3s
msg:我们拼装好的Message
communicationMode:通信模式,同步、异步还是单向,默认调用send(Message msg)是同步调用
sendCallback:对于异步模式,需要设置发送完成后的回调
timeout:超时时间:默认3s
入参分析
this.makeSureStateOK()
Validators.checkTopic(msg.getTopic());
Validators.isNotAllowedSendTopic(msg.getTopic());
2.执行this.tryToFindTopicPublishInfo(msg.getTopic());获取Topic路由信息,如果不存在则发出异常提醒用户。如果本地缓存没有路由信息,更新到本地,再返回
3.计算消息发送的重试次数,同步重试和异步重试的执行方式是不同的,同步为3次,异步1次int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
执行过程
3.执行defaultMQProducerImpl.sendDefaultImpl()方法.该方法是发送消息的核心方法
Producer消息发送流程
Producer
BROKER_CHANNEL_EXPIRED_TIME:Broker存活的事件周期,默认为120s
topicQUeueTable:保存Topic和队列的信息,也叫真正的路由信息。一个Topic全部的Queue可能分布在不同的Broker中,也可能分布在同一个Broker中
brokerAddrTable:存储了Broker名字和Broker信息的对应关系
clusterAddrTable:集群和Broker的对应关系
brokerLiveTable:当前在线的Broker地址和Broker信息的对应关系
filterServerTable:过滤服务器信息
RequestCode.REGISTER_BROKER:Broker注册自身信息到Namesrv
RequestCode.UNREGISTER_BROKER:Broker取消注册自身信息到Namesrv
RequestCode.GET_ROUTEINFO_BY_TOPIC:获取Topic路由信息
RequestCode.WIPE_WRITE_PERM_OF_BROKER:删除Broker的写权限
RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:获取全部Topic名字
RequestCode.DELETE_TOPIC_IN_NAMESRV:删除Topic信息
RequestCode.UPDATE_NAMESRV_CONFIG:更新Namesrv配置,当前配置是实时生效的
RequestCode.GET_NAMESRV_CONFIG:获取Namesrv配置
Namesrv支持的全部API在DefaultRequestProcessor类中
曾几何时,RocketMQ也采用Zookeeper作为协调者,但是繁杂的运行机制和过多的依赖导致RocketMQ最终完全重新开发了一个零依赖、更简洁的Namesrv来替换
Namesrv核心数据结构和API
第一步:脚本和启动参数配置。启动命令 nohup ./bin/mqnamesrv -c ./conf/namesrv.conf > dev/null 2>&1 &通过脚本配置启动基本参数,比如配置文件路径、JVM参数,调用NamesrvStartup.main()方法,解析命令行的参数,将处理好的参数转化为Java实例,传递给NamesrvController实例
第三步:NamesrvController在初始化后添加JVM Hook.Hook中会调用NamesrvController.shutdown()方法来关闭整个Namesrv服务
第四步:调用NamesrvController.start()方法,启动整个Namesrv。其实start()方法只启动了Namesrv接口处理线程池
Namesrv启动流程
为什么需要了解停止流程呢?RocketMQ在设计之初已经考虑了很多异常情况,比如Namesrv异常退出、突然断电、内存被打满等等,只有了解了正常的停止流程才能对异常退出导致的问题进行精确的分析和排障。
1.关闭Netty服务端,主要是关闭Netty事件处理器、时间监听器等全部已经初始化的组件
2.关闭Namesrv接口处理线程池
3.关闭全部已经启动的定时任务
Namesrv关闭流程
路由注册。registerBrokerWithFilterServer()方法中的this.namesrvController.getRouteInfoManager.registerBroker()方法,该方法的主要功能是将request解码为路由对象,保存在Namesrv中。在路由信息注册完成后,Broker会每隔30s发送一个注册请求给集群中全部的Namesrv,俗称心跳信,会把最新的Topic路由信息注册到Namesrv中
font color=\"#e74f4c\
Namesrv路由原理。Namesrv获取的Topic路由信息来自Broker定时心跳,心跳时Broker将Topic信息和其他信息发送到Namesrv。Namesrv通过RequestCode.REGISTER_BROKER接口将心跳中的Broker信息和Topic信息存储在Namesrv中
Namesrv
Broker存储目录结构
第一步:初始化启动环境。这是由./bin/mqbroker和./bin/runbroker.sh两个脚本来完成的,/bin/mqbroker脚本主要用于设置RocketMQ根目录环境变量if [ -z \"$ROCKETMQ_HOME\" ] ; then ....fiexport ROCKETMQ_HOMEsh ${ROCKETMQ_HOME}/bin/runbroker.sh org.apache.rocketmq.broker.BrokerStartup $@./bin/runbroker.sh脚本的主要功能是检测JDK的环境配置和JVM的参数配置。JDK的环境配置的检查逻辑的实现代码如下:[ ! -e \"$JAVA_HOME/bin/java\" ] && JAVA_HOME=$HOME/jdk/java[ ! -e \"$JAVA_HOME/bin/java\" ] && JAVA_HOME=/usr/java[ ! -e \"$JAVA_HOME/bin/java\" ] && error_exit \
MixAll.properties2Object()方法的主要功能是,按照properties中配置的key与目标对象字段名是否相同来设置对应的值
BrokerStartup中调用BrokerController#initialize()
第二步:初始化BrokerController该初始化主要包含ROcketMQ启动命令行参数解析、Broker各个模块配置参数解析、Broker各个模块初始化、进程关机Hook初始化等过程.RocketMQ启动命令行参数解析。其代码在BrokerStartup.createBrokerController()方法中,RocketMQ的启动参数支持启动命令指定,也可以在配置文件中进行配置。通常,命令行参数的优先级大于配置文件。通过第三方库将命令行输入参数解析为commandLine对象,再获取输入参数值。命令行参数的启动比较简单,如果大量的RocketMQ配置项放在启动命令中,就会导致启动命令较长,难以维护,一般推荐启动RocketMQ使用配置文件的方式。配置文件在createBrokerController()方法中被解析的代码如图所示在brokerConfig、nettyServerConfig、nettyClientConfig、messageStoreConfig这些基本配置对象初始化完毕后,还有后续代码依据各种启动条件重新调整部分参数。在各个配置对象初始化完毕后,程序会调用BrokerController.initialize()方法对Broker的各个模块进行初始化xxxConfigManager.load()方法的功能是加载Broker基础数据配置,包含Broker中的Topic、消费位点、订阅关系、消费过滤(无实际数据需要加载).这些配置加载成功后,初始化存储层服务对象messageStore和Broker监控统计对象brokerStats.然后,Broker会初始化通信层服务和一些列定时任务,通信层服务主要初始化正常通信通道、VIP通信通道和通信线程池。这里以VIP通道为例,分析通信层服务初始化,以消费进度定时持久化为例,分析定时任务初始化。fastConfig就是VIP通信层的配置,其配置对象\"克隆\
this.messageStore():存储层服务,比如CommitLog、ConsumeQueue存储管理
this.remotingServer:普通通道请求处理服务。一般的请求都是在这里被处理的
this.fastRemotingServer:VIP通道请求处理服务,如果普通通道比较忙,那么可以使用VIP通道,一般作为客户端降级使用
this.brokerOuterAPI:Broker访问对外接口的封装对象
this.pullRequestHoldService:Pull长轮询服务
this.clientHouseKeepingService:清理心跳超时的生产者、消费者、过滤服务器
this.filterServerManager:过滤服务器管理
接下来将Broker信息注册到Namesrv,并处理Master与Slave的关系
this.brokerStatsManager:Broker监控数据统计管理
this.brokerFastFailure:Broker快速失败处理
第三步,启动RocketMQ的各个组件组件启动在BrokerController.start()方法中
Broker启动流程BrokerStartup.java类主要负责为真正的启动过程做准备,解析脚本传过来的参数,初始化Broker配置,创建BrokerController实例等工作。BrokerController.java类是Broker的掌控者,它管理和控制Broker的各个模块,包含通信模块、存储模块、索引模块、定时任务等。在BrokerController全部模块初始化并启动成功后,将在日志中输出info信息\"boot success\"
Broker关闭流程Broker关闭只是调用BrokerStartup.java中注册JVM Hook 的BrokerController.shutdown()方法,该方法再调用各个模块关闭方法,最后关闭整个进程
为什么写文件这么快?RocketMQ的存储涉及中,很大一部分是基于Kafka的涉及进行优化的。font color=\"#ec7270\
存储概述
1.Broker接收客户端发送消息的请求并做预处理。SendMessageProcessor.processRequest()方法会自动被调用接收、解析客户端请求为消息实例。该方法执行分为四个过程:解析请求参数、执行发送处理前的Hook、调用保存方法存储消息、执行发送处理后的Hook随着RocketMQ版本的迭代更新,通信层的协议也出现了不兼容的变化,比如解析请求需要根据不同的客户端请求协议版本做不同处理
2.Broker存储前预处理消息.预处理方法为SendMessageProcessor.sendMessage()Netty是异步执行的,也就是说,请求发送到Broker被处理后,返回结果时,在客户端的处理线程已经不再时发送亲贵的线程,那么客户端如何确定返回结果对应哪个请求呢?很简单,我们可以通过返回标志来判断。其次,做一系列存储前发送请求的数据检查,比如死信消息处理、Broker是否拒绝事务消息处理、消息基本检查等。消息基本检查方法为AbstractSendMessageProcessor.msgCheck():该方法的主要功能如下:a.校验Broker是否配置可写b.校验Topic名字是否为默认值c.校验Topic配置是否存在d.校验queueId与读写队列数是否匹配e.校验Broker是否支持事务消息(msgCheck之后进行的校验)
begin:CommitLog加锁开始时间,写CommitLog成功后,该值为0diff:当前时间和CommitLog持有锁时间的差值如果isOSPageCacheBusy()方法返回true,则表示当前有消息正在写入CommitLog,并且持有锁的时间超过设置的阈值
3.执行DefaultMessageStore.putMessage()方法进行消息校验和存储模块检查。在真正保存消息前,会对消息数据做基本检查、对存储服务做可用性检查、对Broker做是否Slave的检查等总结如下:a.校验存储模块是否已经关闭b.校验Broker是否是Slavec.校验存储模块运行标记d.校验Topic长度e.校验扩展信息的长度f.校验操作系统Page Cache是否繁忙
根据消息是单个消息还是批量消息来调用AppendMessageCallback.doAppend()方法,并将消息写入PageCache,该方法的功能包含以下几点:1.查找即将写入的消息物理Offset2.事务消息单独处理。这里主要处理Prepared类型和Rollback类型的消息,设置消息queueOffset为03.序列化消息,并将序列化结果保存到ByteBuffer中(文件内存映射的PageCache或Direct Memory,简称DM).特别地,如果将输盘设置为异步刷盘,那么当transientStorePoolEnable=true时,会先写入DM,DM中地数据再异步写入文件内存映射地PageCache中,因为消费者始终时从PageCache中读取消息消费的,所以这个机制也称为\"读写分离\"4.更新消息所在Queue的位点
在消息存储完成后,会处理刷盘逻辑和主从同步逻辑,分别调用(有些版本是handleDiskFlush()方法和handleHA()方法)CommitLog.submitFlushRequest()和submitReplicaRequest()在Broker处理发送消息时,由于处理器SendMessageProcessor本身是一个线程池服务,所以涉及了快速失败逻辑,方便在高峰时自我保护。实现代码在BrokerFastFailure.cleanExpiredRequest()方法中在BrokerController启动BrokerFailure服务时,会启动一个定时任务处理快速失败的的异常
1.Broker存储流程RocketMQ首先将消息数据写入操作系统PageCache,然后定时将数据刷入磁盘。接下来主要分析RocketMQ是如何接收发送消息请求并将消息写入PageCache的,整个过程如图
每个MappedFileQueue包含多个MappedFile,就是真是的物理CommitLog文件.在Java中通过java.nio.MappedByteBuffer来实现文件的内存映射,即文件读写都是通过MappedByteBuffer(其实是PageCache)来操作的。写入数据时先加锁,然后通过Append方式写入最新MappedFile。对于读取消息,大部分情况下用户只关心最新数据,而这些数据都在PageCache中,也就是说,读写文件就是在PageCache中进行的,其速度几乎等于直接操作内存的速度
2.内存映射机制与高效磁盘。RocketMQ在存储涉及中通过内存映射、顺序写文件等方式实现了高吞吐。RocketMQ的基本数据结构:CommitLog:RocketMQ对存储消息的物理文件的抽象实现,也就是对物理CommitLog文件的具体实现。MappedFile:CommitLog文件在内存中的映射文件,映射文件同时具有内存的写入速度和与磁盘一样可靠的持久化方式.MappedFileQueue:映射文件队列中有全部的CommitLog映射文件,第一个映射文件为最先过期的文件,最后一个文件是最后过期的文件,最新的消息总是写入最后一个映射文件。
GroupCommitService就是CommitLog.GroupCommitService--同步刷盘任务。在Broker存储消息到PageCache后,在Broker存储消息到PageCache后,同步将PageCache刷到磁盘,在返回客户端消息并写入结果
FlushRealTimeService就是CommitLog.FlushRealTimeService--异步刷盘服务。在Broker存储消息到PageCache后,立即返回客户端写入结果,然后异步刷盘服务将PageCache异步刷到磁盘,
CommitRealTimeService就是CommitLog.CommitRealTimeService--异步转存服务。Broker通过读写分离将消息写入直接内存(Direct Meomory,简称DM),然后通过异步转存服务,将DM中的数据再次存储到PageCache中,以供异步刷盘服务将PageCache刷到磁盘中,转存服务过程如上
将消息成功保存到CommitLog映射文件后,调用submitFlushRequest()/handleDiskFlush()方法处理刷盘逻辑,同步刷盘、异步刷盘都是在这个方法中发起的
3.文件刷盘机制消息存储完成后,会被操作系统持久化到磁盘,也就是刷盘。RocketMQ支持两种刷盘方式,在Broker启动时配置flushDiskType=SYNC_FLUSH表示同步刷盘.配置flushDiskType=ASYNC_FLUSH表示异步刷盘刷盘涉及以下3个线程服务。如图所示
存储消息线程:主要负责将消息存储到PageCache或者DM中,存储成功后通过调用handleDiskFlush()/submitFlushRequest()方法将同步刷盘请求\"发送给\"GroupCommitService服务,并在该刷盘请求上执行锁等待
正常同步刷盘线程会间隔10ms执行一次CommitLog.GroupService.doCommit()方法,该方法循环每一个同步刷盘请求,如果刷盘成功,那么唤醒等待刷盘请求锁的存储消息线程,并告知刷盘成功
同步刷盘服务线程:通过GroupCommitService类实现的同步刷盘服务
同步刷盘
commitDataLeastPages:最小转存PageCache的Page数,默认为4
1.获取转存参数。整个转存过程的参数都是可配置的。
转存代码
org.apache.rocketmq.store.MappedFileQueue#commit
org.apache.rocketmq.store.MappedFile#commit
writePosition:DM中已写入的消息位置
committedPosition:已经转存的消息位置
org.apache.rocketmq.store.MappedFile#commit0CommitLog.this.mappedFileQueue.commit()方法最终会调用MappedFile.commit0()方法进行真正的数据转存MappedFile.commit0()方法的作用就是将writeBuffer(DM)中的数据读取出来,写入fileChannel(CommitLog映射文件)
2.执行转存数据,转存实现代码。转存过程主要调用CommitLog.this.mappedFileQueue.commit()方法转存数据,并且统计了转存耗时,如果转存耗时特别大,说明系统繁忙,应该考虑增加系统资源或者扩容
3.转存失败.唤醒异步刷盘线程。转存数据失败,并不代表没有数据被转存到PageCache中,而是说明有部分数据转存成功,部分数据转存失败。所以可以唤醒刷盘线程执行刷盘操作,而如果转存成功,则正常进行异步刷盘即可
异步转存数据是通过CommitRealTimeService.run()方法实现的
flushCommitLogTimed:是否定时刷盘,设置为True表示定时刷盘;设置为False表示实时刷盘,默认为False.即实时刷盘
interval:在Broker中配置项名是flushIntervalCommitLog,刷盘间隔默认为500ms
flushPhysicQueueLeastPages:每次刷盘的页数,默认为4页
flushPhysicQueueThoroughInterval:两次刷盘操作的最长间隔时间,默认为10s
this.hold()方法的功能是,在映射文件被销毁时尽量不要对在读写的数据造成困扰。所以MappedFile自己实现了引用计数功能,只有存在引用时才会执行刷盘操作
3.执行刷盘。最终刷盘逻辑是在MappedFile.flush()下面进行两个数据校验:this.isAbleToFlush(flushLeastPages)和this.hold()在配置读写分离的场景下,writeBuffer和fileChannel总是不为空。此时要调用this.fileChannel.force(false)方法刷盘;而正常刷盘则是调用this.mappedByteBuffer.force()方法
4.记录Checkpoint和耗时日志。这里主要记录最后刷盘成功时间和刷盘耗时超过500ms的情况
异步刷盘操作。在异步转存服务和存储服务把消息写入Page Cache后,由异步刷盘服务将消息刷入磁盘中,过程如图。异步刷盘服务的主要功能是将PageCache中的数据异步刷入磁盘,并记录Checkpoint信息.异步刷盘的实现代码主要在CommitLog.FlushRealTimeService.run()方法中
异步刷盘。如果Broker配置读写分离,则异步刷盘过程包含异步转存数据和真正的异步刷盘操作。
同步刷盘、异步刷盘对比
3.1同步刷盘和异步刷盘
Broker消息存储机制
maxOffsetPy:表示当前Master Broker存储的所有消息的最大物理位点
maxPhyOffsetPulling:表示拉取的最大消息位点
diff:是上面两者的差值,表示还有多少消息没有拉取
StoreUtil.TOTAL_PYHSICAL_MEMORY_SIZE:表示当前Master Broker全部的物理内存
Master-Slave读写分离机制。通常Master提供读写处理,如果Master负载较高就从Slave读取
从代码中可以看到,消费者始终从mappedByteBuffer(即Pagecache)读取消息。
初始化writeBuffer后,当生产者将消息发送到Broker时,Broker将消息写入writeBuffer,然后被异步转存服务不断地从DM中Commit到Page中,消费者此时从哪儿读取数据呢?消费者拉取消息的实现在MappedFile.selectMappedBuffer()方法中
Direct Memory-Page Cache的读写分离机制以上逻辑通过MappedFile.appendMessagesInner()方法来实现,核心代码如图
ConsumerQueue消费队列。主要用于消费拉取消息、更新消费位点等所用的索引。源代码参考org.apache.rocketmq.store.ConsumerQueue.该文件内保存了消息的物理位点、消息体大小、消息Tag的Hash值物理位点:消息在CommitLog中的位点值消息体大小:包含消息Topic值大小、CRC值大小、消息体大小等全部数据的总大小,单位是字节Tag的Hash值:由MessageExtBrokerInner.tagsString2tagsCode()方法计算得来。如果消息有Tag值,那么该值可以通过String的Hashcode获得
在Hash碰撞时,Hash槽位中保存的总是最新消息的指针,这是因为在消息队列中,用户最关心的总是最新的数据
索引的数据结构
RequestMessageService类图
第一步:从CommitLog中查找未创建索引的消息,将消息组装成DispatchRequest对象.该逻辑主要在CommitLog.checkMesageAndReturnSize()方法中实现
第二步:调用doDispatch()方法,该方法会循环多个索引处理器(这里初始化了CommitLogDispatcherBuildConsumeQueue和CommitLogDispatcherBuildIndex两个索引处理器)并调用索引处理器的dispatch()方法来处理DispatchRequestCommitLogDispatcherBuildConsumeQueue索引处理器用于构建ConsumeQueue,CommitLogDispatcherBuildIndex用于构建IndexFileConsumeQueue是必须创建的,IndexFile是否需要创建则是通过设置messageIndexEnable为True或False来实现的,默认为True.ConsumeQueue的索引信息被保存到PageCache后,其持久化的过程和CommitLog异步刷盘的过程类似,执行DefaultMessageStore.FlushConsumeQueueService服务
ReputMessageService服务启动后的执行过程。doReput()方法用于创建索引的入口,通常通过以下几个步骤来创建索引:
创建ConsumeQueue和IndexFile。ConsumeQueue和IndexFile两个索引都是由ReputMessageService类创建的
索引创建失败怎么办?如果消息写入CommitLog后Broker宕机了,那么ConsumeQueue和IndexFile索引肯定就创建失败了,此时ReputMessageService如何保证创建索引的可靠性呢?ConsumeQueue和IndexFile每次刷盘时都会做Checkpoint操作,Broker每次重启的时候可以根据Checkpoint信息得知哪些消息还未创建索引,
索引的构建过程
group:消费者组名
Topic:主题名字,group订阅了Topic才能拉取到消息
queueId:一般一个Topic会有很多分区,客户端轮询全部分区拉取并消费消息
offset:拉取位点大于等于该值的消息
maxMsgNums:一次拉取多少消息,在客户端由pullBatchSize进行配置
messageFilter:消息过滤器
第一步:拉取前校验DefaultMessageStore服务是否已经关闭(正常关闭进程时会被关闭)校验
第二步:根据Topic和queueId查找ConsumeQueue索引映射文件。判断根据查找到的ConsumeQueue索引文件校验传入的待查询的位点值是否核里,如果不合理,重新计算下一次可以拉取的位点值
第三步:循环查询满足maxMsgNums条数的消息。循环从ConsumeQueue中读取消息管理位点、消息大小和消息Tag的Hash值。先做Hash过滤,再使用过滤后的消息物理位点到CommitLog中查找消息体,并放入结果列表中
第四步:监控指标统计,返回拉取的消息结果
getMessage()方法查询消息的过程
1.按照位点查消息RocketMQ支持Pull和Push两种消费模式,Push模式是基于Pull模式的,两种模式都是通过拉取消息进行消费和提交位点的。这里我们主要讲Broker在处理客户端拉取消息请求时是怎么查询消息的。
第一步:查找这个Topic下的所有Queue
第二步:在每一个队列中查找起始时间、结束时间对应的起始offset和最后消息的offset
如何根据时间查找物理位点呢?主要在于构建Consume Queue,这个文件是按照时间顺序写的,每条消息的索引数据结构大小是固定20字节。可以根据时间做二分折半搜索,找到与时间最接近的一个位点。
2.按照时间段查消息。这是社区提供的管理平台的功能,输入Topic、起始时间、结束时间可以查到这段时间内的消息,这是一个根据Consume Queue索引查询消息的扩展查询
第一步:调用indexService.queryOffset()方法,通过Topic、key查找目标消息的物理位点信息
第二步:根据物理位点信息在CommitLog中循环查找消息体内容
第三步:返回查询结果
索引如何使用?
Broker CommitLog索引机制
定时执行
先看commitLog文件删除
当满足三个条件之中的任一条件时执行删除操作
第一,当前时间等于已经配置的删除时间
第二,磁盘使用空间超过85%
第三,手动执行删除(开源版本RocketMQ4.2.0不支持)
第二步:检查每一个CommitLog文件是否过期,如果已过期则立即通过调用destroy()方法进行删除。在删除前会做一系列检查:检查文件被引用的次数、清理映射的所有内存数据对象、释放对象.
deleteExpiredFileByTime()方法的实现分为如下两步
核心逻辑,DefaultMessageStore.this.commitLog.deleteExpiredFile()方法直接调用了this.mappedFileQueue.deleteExpiredFileByTime()方法
DefaultMessageStore.CleanCommitLogService类提供的一个线程服务周期性地执行删除操作this.deleteExpiredFiles()的功能是删除过期文件
this.redeleteHangedFiel()方法表示再次删除被挂起的过期文件,为什么会有被挂起的文件呢?第一次删除有可能失败,比如有线程引用该过期文件,内存映射清理失败等,都可能导致删除失败,如果文件已经关闭,删除前检查没有通过,则可以通过第二次删除来处理。
1.CommitLog文件的删除过程
ConsumeQueue、IndexFile文件的删除过程。ConsumeQueue和IndexFile都是索引文件,在CommitLog文件被删除后,对应的索引文件起始没有存在的意义,并且占用磁盘空间,所以这些文件应该被删除。RocketMQ的删除策略是定时检查,满足删除条件时会删除过期或者无意义的文件。最终程序调用CleanConsumeQueueService.deleteExpiredFiles()方法来删除索引文件
Broker过期文件删除机制。RocketMQ中主要保存了CommitLog、ConsumeQueue、IndexFile三种数据文件。由于内存和磁盘都是有限的资源,Broker不可能永久地保存所有数据,所以一些超过保存期限的数据会被定期删除。RocketMQ通过设置数据过期时间来删除额外的数据文件,具体的实现逻辑是通过DefaultMessageStore.start()方法中的this.addScheduleTask();来实现的
同步复制是指客户端发送消息到Master,Master将消息同步复制到Slave的过程,可以通过设置参数brokerRole=BrokerRole.SYNC_MASTER来实现。这种消息配置的可靠性很强,但是效率比较低,适用于金融、在线教育等对消息有强可靠需求的场景
实例初始化
方法调用
Broker主从同步的逻辑是通过SlaveSynchronize.syncAll()方法来实现的。该方法在BrokerController.start()方法中被调用,每隔60s同步一次,并且同步周期不能修改,该实例在BrokerController的构造方法中被初始化消息数据是生产者发送的消息,保存在CommitLog中,由HAService服务实时同步到SlaveBroker中,所有实现类都在org.apache.rocketmq.store.ha包下
名词解释。
第一步:Master Broker在启动时,初始化一个BrokerOuterAPI,这个服务的功能包含Broker注册到Namesrv、Broker从Namesrv解绑、获取Topic配置信息、获取消费者位点信息、获取延迟位点信息及订阅关系等。
第二步:Slave Broker在初始化Controller的定时任务时,会初始化SlaveSynchronize服务,每60s调用一次SlaveSynchronize.syncAll()方法
第三步:syncAll()方法依次调用4种配置数据(Topic配置、消费者位点、延迟位点、订阅关系配置)的同步方法同步全量数据
第五步:Topic配置和订阅关系配置随着保存内存信息的同时持久化到磁盘上;消费者位点通过BrokerController初始化定时任务持久化到磁盘上;延迟位点信息通过ScheduleMessageService定时将内存持久化到磁盘上
配置数据同步流程。配置数据包含4种类型:Topic配置、消费者位点、延迟位点、订阅关系配置。每种配置数据由一个继承自ConfigManager的类来管理,继承关系如图。Slave如何从Master同步这些配置呢?我们先来看一下初始化服务的步骤
同步复制。在CommitLog将消息存储到PageCache后,会调用CommitLog的handleHA()/submitReplicaRequest方法处理同步复制。当BrokerRole配置为SYNC_MASTER时表示当前Master Broker需要同步将消息\"发送\
ReadSocketService后台服务不断接收Slave Broker上报的offset,每上报一次都通知HAService.notifyTransferSome()方法,判断Slave同步的位点是否大于Master标记的已同步位点,如果大于则更新标记值,同时通知同步复制服务GroupTransferService.GroupTransferService扫描所有的同步请求,依次判断哪些GroupCommitRequest的待同步复制的位点是比已同步位点小的,释放GroupCommitRequest中的锁,消息处理线程可以将消息存储成功的结果返回给生产者
消费队列文件(ConsumeQueue)和索引文件(IndexFile)这两个文件是在SlaveBroker上追加CommitLog后由ReputMessageService进行创建的,所以不需要同步
CommitLog数据同步流程。CommitLog的数据同步分为同步复制和异步复制两种。同步复制是指生产者生产消息后,等待Master Broker将数据同步到Slave Broker后,再返回生产者数据存储状态;异步复制是指生产者在生产消息后,不用等待Slave同步,直接返回Master存储结果
主从同步流程
Broker主从同步机制。
abort文件创建流程
aboirt文件删除流程
abort是一个空文件,标记当前Broker是否正常关机,Broker进程正常启动的时候,创建该文件。Broker进程正常停止后,该文件就会删除;如果异常退出,则文件依旧存在,创建和删除的过程如图
physicMsgTimestamp:最后一条已存储CommitLog的消息的存储时间
logicsMsgTimestamp:最后一条已存储Consume Queue的消息的存储时间
indexMsgTimestamp:最后一条已存储IndexFile的消息的存储时间
checkpoint是检查点文件,保存Broker最后正常存储各种数据的时间,在重启Broker时,恢复程序知道从什么时候恢复数据。检查点逻辑由StoreCheckpoint类实现。在StoreCheckpoint类中保存了3个时间,更新过程如图.
概述。Broker关机恢复是指恢复CommitLog、Consume Queue、Index File等数据文件。Broker关机分为正常调用命令关机和异常被迫进程终止关机两种情况。恢复过程的设计目标是使正常停止的进程实现零数据丢失,异常停止的进程实现最少量的数据丢失,与关机恢复相关的主要文件有两个:abort和checkpoint.
第一步:Broker异常退出检查。如果abort文件存在,说明上次是异常退出的。
第四步:加载全部Consume Queue文件及数据(如图#2、#3)。调用loadConsumeQueue方法,读取./consumequeue/Topic/queueId/目录,加载全部Topic、queueId作为ConsumeQueue对象,再调用load()方法初始化每一个ConsumeQueue
初始化StoreCheckpoint对象
在StoreCheckpoint构造方法中初始化三个时间戳
第五步:初始化Checkpoint文件为StoreCheckpoint对象,并且初始化三个数据:physicMsgTimestamp、logicsMsgTimestsamp和indexMsgTimestamp.
第六步:加载IndexFile索引(#4部分)。加载./index目录下的全部索引文件,如果上次进程异常退出并且索引文件操作的最后时间戳大于Checkpoint中保存的时间,则说明当前文件有部分数据可能存在错误,须立即销毁文件
recoverCOnsumeQueue()方法通过循环所有Topic对应的ConsumeQueue,依次调用ConsumeQUeue.recover()方法执行数据恢复
recoverTopicQueueTable():纠正Consume Queue中最小消费位点和恢复ComitLog内存中的TopicTable(#5.4)
第七步:恢复全部数据(#5部分)lastExitOK=True,表示上次进程正常退出。全部恢复数据主要恢复ConsumeQueue、CommitLog、内存中的consumeQueueTable,并纠正Consume Queue中的最新位点值。
Broker关机恢复流程。Broker在启动时会初始化abort、checkpoint两个文件。正常关闭进程时会删除abort文件,将checkpoint文件刷盘;异常关闭时,通常来不及删除abort文件。由此,在重新启动Broker时会根据abort判断是否需要异常停止进程,而后恢复数据。Broker启动时,会启动存储服务DefaultMessageStore.存储服务在初始化时执行load方法加载全部数据,这里主要分析数据加载流程。Broker关机的恢复过程可以分为以下几步.
Broker的关机恢复机制
Broker
SCHEDULE_TOPIC:一个系统内置的Topic,用来保存所有定时消息。RocketMQ全部未执行的延迟消息保存在这个内部Topic中(现如今保存在TopicValidator中)
FIRST_DELAY_TIME:第一次执行定时任务的延迟时间,默认为1000ms
DELAY_FOR_A_WHILE:第二次及以后的定时任务检查间隔时间,默认为100ms
DELAY_FOR_A_PERIOD:如果延迟消息到时间投递时却失败了,会在DELAY_FOR_A_PERIOD中设置的ms后重新尝试投递,默认为10 000ms
delayLevelTable:保存延迟队列和延迟时间的映射关系
offsetTable:保存延迟级别及相应的消费位点
timer:用于执行定时任务,线程名叫ScheduleMessageTImerThread
queueId2DelayLevel():将queueid转化为延迟级别delayLevel2QueueId():将延迟级别转化为queueId一个延迟级别保存在一个Queue中,延迟级别和Queue之间的转化关系为queueId = delayLevel -1
updateOffset():更新延迟消息的Topic的消费位点
computeDeliverTimestamp():根据延迟级别和消息的存储时间计算该延迟消息的投递时间
start():启动延迟消息服务。启动第一次延迟消息投递的检查定时任务和持久化消费位点的定时任务
shutdown():关闭start()方法中启动的timer任务
load():加载延迟消息的消费位点信息和全部延迟级别信息,延迟级别可以通过messageDelayLevel字段进行设置,默认1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
parseDelayLevel();格式化所有延迟级别信息,并保存到内存中
DeliverDelayedMessageTimerTask内部类用于检查延迟消息是否可以投递,DeliverDelayedMessageTImerTask是TimerTask的一个扩展实现
概述。什么是延迟消息呢?延迟消息也叫定时消息,一般地,生产者在发送消息后,消费者希望在指定的一段时间后再消费。常规做法是,把信息存储在数据库中,使用定时任务扫描,符合条件的数据再发送给消费者。典型的业务场景春节买票30分钟内完成订单支付。RocketMQ延迟消息是通过ScheduleMessageService类实现的
msg.getDelayTimeLevel()是发送消息时可以设置的延迟级别,如果该值大于0,则表示当前处理的消息是一个延迟消息,将对该消息做如下修改:1.将原始Topic、queueId备份在消息的扩展字段中,全部的延迟消息都保存在SCHEDULE_TOPIC的Topic中2.备份原始Topic、queueId为延迟消息的Topic、queueId。备份的目的是当消息到达投递时间时会恢复原始的Topic和queueId,继而被消费者拉取并消费
经过处理后,该消息会被正常保存到CommitLog中,然后创建ConsumeQueue和IndexFile两个索引。在创建ConsumeQueue时,从CommitLog中获取的消息内容会单独进行处理,单独处理的逻辑方法是CommitLog.checkMessageAndReturnSize().有一个很精巧的设计:在CommitLog中查询出消息后,调用computeDeliverTimestamp()方法计算消息具体的投递时间,再将该时间保存在ConsumeQueue的tagCode中。这样设计的好处是,不需要检查CommitLog大文件,在定时任务检查消息是否需要投递时,只需要检查ConsumeQueue中的tagCode(不再是Tag的Hash值,而是消息可以投递的时间,单位是ms),如果满足条件再通过查询CommitLog将消息投递出去即可,如果每次都查询CommitLog,那么可想而知,效率会很低
timer:定时检查延迟消息是否可以投递的定时器
delayLevelTable:该字段用于保存全部的延迟级别
level:延迟级别
timeDelay:延迟时间
offset:延迟级别对应的ConsumeQueue的消费位点,扫描时从这个位点开始
timeDelay:参数表示延迟时间
delayLevel:延迟级别。
offset:待检查消息的ConsumeQueue的位点值
correctDeliverTimestamp():纠正投递时间
executeOnTimeup():定时扫描核心方法
DeliverDelayedMessageTimerTask默认执行run()方法,run()方法直接调用executeOnTimeup()方法扫描当前位点的消息是否满足投递条件
第四步:如果第三步投递失败,或者消息没有达到投递时间条件,则重新提交一个定时任务到timer中,以供下次检查
核心方法的执行步骤
this.timer.schedule()定时任务只执行一次,那么之后发送的消息是如何进行投递的呢?在DeliverDelayedMessageTimeTask.executeOnTimeup()方法中,DeliverDelayed-MessageTimerTask类是ScheduleMessageService类的一个内部类,同时也是this.timer.schedule()方法的输入参数
核心字段和方法
延迟消息
RocketMQ源码以及关键流程分析
0 条评论
回复 删除
下一页