RocketMQ
2022-05-30 22:44:40 0 举报
AI智能生成
整理了RocketMQ的消息发送、消息存储、消息消费、集群等知识点
作者其他创作
大纲/内容
基础
NameServer
用来取代zk的,是rocketMQ的大脑<br>所以也要先启动NameServer
NameServer比Zk性能高原因:不像ZK那样做很多事情
RocketMQ<br>设计思想
基于文件顺序读写,内存映射机制
容忍设计缺陷:消息只消费一次
可以保证消息顺序消费
消息存储
消息堆积能力
有消息文件过期机制
存储空间预警机制
消息存储性能
消息顺序存储在同一文件中(commitlog)
消费者可以过滤消息(tag)
消息高可用
同步刷盘不会丢失
异步刷盘会少量丢失
单节点开启异步复制时,只丢失少量
可引入双写机制
消费低延时
不发生消息堆积时,以长轮询模式实现准实时的消息推送
利用ack确保消息至少消费一次
消息回溯
已经消费完的消息,可以根据业务要求重新消费
定时消息
消息只能在指定时间之后才能被消费【16个等级,如5秒,15秒等】<br>收费版才支持任意时间精度
消息重试机制
重发时不会发给同一broker,因为有可能该broker已经挂了<b><font color="#c41230">【规避原则</font></b>】
消息类型
普通消息
定时消息
延时消息
事务消息
消息发送
单向发送:<br>oneWay<br>
Producer Group
发送同一类消息的生产者称之为一个生产者组
如果 Producer 中途意外宕机,<br>Broker 会主动回调 Producer Group 内的任意一台机器来确认事务状态
Producer
Producer 实例线程安全
Message里的 Key
业务层面的唯一性标识,可以用来查找消息(有索引文件)
RocketMQ不能保证messegeId唯一,所以要设置唯一key
topic
不同的消息类型(定时消息、事务消息)使用不同的主题
不同的业务类型消息(天猫订单、京东订单),使用不同的主题
不同的消息量级使用不同的主题
<b><font color="#f44336">一个topic下默认4个分片</font></b>,RocketMQ里没有队列的概念
tag
二级消息类型,用来进一步区分某个 Topic 下的消息分类,用于消费端过滤消息
淘宝交易订单的不同类型(女装订单、电器、化妆品订单)使用不同的tag
同步发送
发送后会在等到响应之后才算是发送完
MessageId
消息的全局唯一标识:机器 IP+消息偏移量【所以可能重复】
SendStatus
发送标识:成功、失败等
点击这里查看SendResult里状态标识
Queue
RocketMQ收到消息后,所有主题的消息都存储在commitlog中,然后采用异步转发到consumerQueue中<br>ConsumeQueue中,每个主题的数据会做数据分片(<b>Queue</b>)。Kafka中的存储是没有队列的概念的
<b>一个Topic,默认有4个Queue</b>,就不能保证顺序了。会返回给生产者一个queueId
异步发送
消息发送后立刻返回,当消息完全完成发送后,会调用回调函数sendCallback来告知发送者本次发送是成功或者失败
关注 bug师姐 ,gongzhonghao回复:扣扣号 获取联系方式可免费获取编辑版本
深入消息发送
生产者流程
验证消息、查找路由(第一次从NameServer)、消息发送
批量发送
一批的消息不超过<b><font color="#f44336">4M</font></b>,且具有相同主题、标签等。组装成一个List
消息重试
重发时,不会再发给同一broker<b><font color="#c41230">【规避原则】</font></b>,以提高重发成功率
选择
当发送的消息不重要时,采用one-way方式,以提高吞吐量;<br>
当发送的消息很重要是,且对响应时间不敏感的时候采用sync方式;
当发送的消息很重要,且对响应时间非常敏感的时候采用async方式;
重试机制
Producer端重试
源码类名#方法名
DefaultMQProducerImpl
SendResult sendDefaultImpl ( Message msg, final CommunicationMode communicationMode<br>, final SendCallback sendCallback, final long timeout )<br>
不管怎么重试,总的超时时间是不会超过方法入参里的timeout的
重试次数
int timesTotal = communicationMode == CommunicationMode.<b><font color="#ff9800">SYNC</font></b> ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
从上面源码可知,<b>同步发送才有重试次数</b>,单向发送和异步发送只有一次发送机会
getRetryTimesWhenSendFailed(),默认重试次数是 2
重试
由下面源码可知,如果<b><font color="#d32f2f">获取MessageQueue和Topice超时超过总的超时限制(方法传进来的超时时间)则不会发送消息</font></b>
long costTime = beginTimestampPrev - beginTimestampFirst;<br>if (timeout < costTime) {<br> callTimeout = true;<br> break; // 跳出循环不走后面的发送代码,也不会重试<br> }
Consumer端重试
重试次数
Consumer默认是16次
consumer重试有时间间隔,可在配置文件中配置
重试
在广播模式下不会进行重试
<b><font color="#d32f2f">异常重试</font></b>:消费者发生异常在catch块里主动返回RECONSUME_LATER时会进行重试
<b><font color="#d32f2f">超时重试</font></b>:是指消费者没有返回RECONSUME_LATER,也没有返回consume_success时,<b><font color="#d32f2f">会一直重试到最大重试次数</font></b>
broker会一直重试到16次(默认)后丢进死信队列,已和架构组确认
版本问题
3.2.6不对超时进行处理(即broker不会重发消息),在重启消费者的时候会触一次<b>rebalance</b>,重新找一个消费者处理。<br>
4.x时如果消费者一直不回复broker,则broker会一直重试,直到最大重试次数后进入死信队列
减少重试
如果broker配置的重试次数是16,但我们消费者只想重试3次怎么办?
在消费代码的catch块里,获取重试次数reconsumeTimes,如果到了3次,直接返回成功
网络问题超时处理
若出现网络问题导致消息根本没有发送到消费者,则会重新投递
消息消费
基本概念
群组
使用相同group Id的订阅者属于同一个消费群组
群组消费(集群模式)
一条消息只需要被群组内的任意一个消费者消费即可
广播消费(广播模式)
每条消息,会推送给群组内的所有消费者,且保证被每个消费者消费一次
群组消费特点
一条消息只需要被群组内的任意一个消费者消费即可
不会保证失败重新投递时路由到同一台机器上
广播消费特点
不支持顺序消息,也不支持重置消费位点
每条消息要被群组的每台机器都消费一次
不会对消费失败的消息进行重新投递【业务自己关注】
消费者重启后,会从当前最新的消息开始消费,停服期间的消息不能再消费
消费方式
拉模式
会将broker里的Queue都拉过来,也可以指定MessageQueue读取消息
读取的offset,需要自己保存处理;而推模式很多事都不用管,比如偏移量直接在broker里
拉取消息后,返回结果会有以下四种状态之一:<br>Found(获取到消息),没有匹配到消息,没有新消息,非法偏移量
拉取消息数量配置注意
//客户端从broker一次拉取后,实际消费的最大数量 <= pullBatchSize<br> private int consumeMessageBatchMaxSize = 1;<br>
//客户端从broker一次拉取最大数量 <= 服务端的实际参数限制<br> private int pullBatchSize = 32;
推模式<br>(<font color="#c41230">拉模式封装</font>)
长轮询
Consumer控制长轮询,broker发送请求时,5s内有新消息时,还是通过该连接返回给消费者
流量控制<br>
判断 <b>消息个数、消息总大小、offset</b> 【最大和最小差距】三个指标,<br>超过任一设定值时,就等会儿再拉取
重新分布机制
消息队列<b><font color="#c41230">重新分布</font></b>由MQClientInstance(<b><font color="#c41230">最大的一个实例</font></b>)<br>持有的RebalanceService实现【<b><font color="#c41230">专门一个线程处理</font></b>】<br>所有生产者都持有一个MQC,所有消费者持有一个MQC
一个消费者可以分配多个消息队列,同一消息队列只能分配给一个消费者<br>ReblanceService<b>每隔20S</b>进行一次队列重新分配<br>每次重新分配时,会查回所有消费者,并对消费者列表和消息队列排序
分配算法(5种)<br><font color="#d32f2f">Rebalance策略</font>
平均分配【优先】:<br>AllocateMessageQueueAveragely<br>
平均轮询分配【优先】:<br>AllocateMessageQueueAveragelyByCircle<br>
一致性Hash【不推荐】:<br>AllocateMessageQueueConsistentHash<br>
根据配置指定:<br>AllocateMessageQueueByConfig
根据broker部署机房名:AllocateMessageQueueByMachineRoom<br>
消息确认(ACK)
PushConsumer会通过返回<b>ConsumeConcurrentlyStatus</b>里的不同状态作出不同处理
CONSUME_SUCCESS
表示真正消费成功了
RECONSUME_LATER
稍后<b><font color="#f44336">重试</font></b>:默认是10秒,<b><font color="#d32f2f">默认最多重试16次</font></b>,<font color="#c41230">最后会投到死信队列</font><font color="#c41230"><br></font>
可在broker.conf文件中配置Consumer端的<b>重试次数</b>和<b>重试时间间隔</b>
消息返回的是一个新的重试topic:%retry%+consurmergroup
集群模式时,Broker才会自动进行重试
RocketMQ以consumerGroup+queue为单位管理消费进度<br>只会<b><font color="#c41230">记录批次消息中最小的offset值</font></b>作为消费进度:保证都是消费成功<br>问题是<b><font color="#0076b3">重新投递时可能会重复</font></b>,需要自己保证幂等
如果<font color="#d32f2f">超时</font>了,既没有返回consume_success也没有返回reconsume_later,<br>则broker认为消息没有发送到消费者,<font color="#d32f2f">则也会重试</font>
消息进度存储
广播模式
消费进度都存在本地:.rocketmq_offsets
集群模式
消息进度存在broker上
推模式原理
推模式,是先通过拉模式拉到本地后,再推送给PushConsumer的
拉取都是批量拉取的,再分发给PushConsumer的。流量控制这些也是拉取者做的
PushConsumer是一条条提交的,而拉取者本质上还是批量提交,且只提交最早的那个偏移量
拉的时候是阻塞的,所以不会数据混乱(<font color="#f57c00">Kafka是多线程的</font>)
offset详解
Offset是指某个 Topic下的一条消息在某个 Message Queue里的 位置
集群模式时,由 Broker 端存储和控制 Offset 的值,使用 RemoteBrokerOffsetStore 结构
广播模式时,每个 Consumer 都收到这个 Topic 的全部消息,各个 Consumer 间相互没有干扰,<br> RocketMQ 使 用 LocalfileOffsetStore,把 Offset存到本地
rocketmq的指定offset消费,<b><font color="#d32f2f">只能是刚启动时有用</font></b>,后面都是接着上次的消费进度消费
深入消息
顺序消息
全局顺序消息
生产者和消费者的并发设置成1,Queue也设置成1【性能不高】<br>
mqadmin update Topic -t AllOrder -c DefaultCluster -r 1 -w 1 -n 127.0.0.1:9876
部分顺序消息
一个主题根据sharding key进行分区,在同一分区内保证生产和消费的FIFO<br>【通过不同的tag实现的】
延时消息
等待一段时间后才投递给消费者
比如创建订单时发送一条延时消息,30分钟后投递给消费者,消费者就会去检查是否支付,未完成则关闭订单
共16个级别。不支持任意精度的解释是:任意精度的延时需要做消息排序,这是有损耗的
msg.setDelayTimeLevel(5)代表延迟一分钟:<b><font color="#c41230">下标从1开始</font></b><br>1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h<br>
事务消息
<b><font color="#c41230">解决问题:本地事务执行与消息发送的原子性问题</font></b>
事务流程
1、应用模块遇到要发送事务消息的场景时,先发送prepare消息给MQ。<br> 默认的RMQ_SYS_TRANS_HALF_TOPIC主题<br><br>2、prepare消息发送成功后,应用模块执行数据库事务(本地事务)。<br> 在实现TransactionListener接口的类里处理<br><br>3、根据数据库事务执行的结果,再返回Commit或Rollback给MQ。<br><br>4、如果是Commit,MQ把消息下发给Consumer端【此时移到真实Topic里】<br> 如果是Rollback,直接删掉prepare消息。<br><br>5、如果第3步的执行返回结果是UNKNOW,或是超时的,<br> 则启动<b>定时任务(自己开发,麻破火山口)</b>回查事务状态(最多重试15次,<br> 超过了默认丢弃此消息),处理结果同第4步。也是在TransactionListener实现类里处理<br><br>6、MQ消费的成功机制由MQ自己保证。
事务回查机制
主要是为了不确认是否真正成功时的一个查询判断
通过transactionCheckInerval设置回查时间间隔,默认1分钟
<b><font color="#f15a23">事务消息是不支持延迟消息和批量消息</font></b>
死信队列<br>(DLQ)
无法被消费的消息称为<b>死信消息</b>(重试多次也不成功的消息),死信队列存储死信消息
死信3天后会被自动删除,所以要及时处理
一个死信队列对应一个GroupID,而不是单个消费者<br>对应GroupID有死信之后才会创建死信队列。控制台可查询死信
一个死信队列拥有GroupID的所有死信,不论是哪个topic的
消费幂等
重复场景
网络闪断或客户端宕机时,客户端应答失败,<b><font color="#c41230">生产者会重复发消息</font></b>,<br>消费者后面就会收到两条messageID一样的两条消息
<b><font color="#c41230">投递给消费者时</font></b>,网络闪断应答失败,也会重复投递
broker或客户端重启、扩容等会<b><font color="#c41230">触发Rebalance时</font></b>,也可能收到重复消息
解决办法
以唯一的消息key标识,例如订单号
有了唯一订单号,消费者在消费时可通过订单号先去查询判断是否已经处理过
消息过滤
类过滤
FilterServerConsumer类,4.3.0以后不支持了
表达式过滤
tag过滤
消费者可以指定tag消费,多个可以用“||”分隔,*表所有
由于在broker是通过消费者的tag的hashcode进行判断的,有哈希冲突,所以不准,<br>还需要消费者自己再做判断tag是否是自己订阅的,不是的就丢弃<br>
SQL92过滤
开启 SQL 过滤的话,Broker 需要开启参数 enablePropertyFilter=true,然后服务器重启生效
必须开启enablePropertyFilter = true
像写SQL语句一样,可以在SQL里写过滤<b><font color="#c41230">自定义的属性</font></b>的语句
一般最好不用,有可能类似慢询的sql语句,导致性能差。
存储设计
存储知识
目前只有ZeroMQ不需要存储
存储效率
文件系统 > KV存储 > 关系数据库
可靠性
关系数据库 > KV存储 > 文件系统
存储结构
commitLog<br>消息存储文件<br>
所有主题的所有消息,都会<b>顺序追加</b>在commitlog(<b>默认1G)</b>文件中,顺序追加大大增加了写消息的速度<b><br></b>
在broker里设置mapedFileSizeCommitLog可改变大小,写满一个commitlog文件又会新起一个
commitlog文件名为文件里第一条消息在整个commitlog文件组中的偏移量,<br>当知道某条消息的offset时,减去文件名就能算出在该文件中的绝对地址
commitlog的文件存储逻辑视图中,每个主题有4个字节存储消息的总长度
存储了consumeQueue里的队列、messageKey、Tag等所有信息<br>所以既保证了和consumequeue的一致性,也保证consumeQueue丢失也能找回来
消息写到commitlog文件中后,都是<b>异步转发消息条目</b>给ConsumeQueue和index文件的(不是转发的整个消息)
consumeQueue<br>消息消费队列
为什么要有consumeQueue?
commitlog采用顺序追加写消息,虽然写快了,但是我们消费消息的时候是按某个topic来消费的,<br>如果直接去commitlog中查找的话需要全量遍历,所以为了按topic快速查找就有了consumeQueue
consumeQueue中存储的是消息条目,不会存储消息的全量信息
存储设计
每一条消息对应的<b>consumeQueue结构</b>是:<b><font color="#d32f2f">8字节的commitlog偏移量+4字节消息长度+8字节的tag哈希码</font></b>
consumeQueue里每条消息占20字节,消费的消费进度,实际上就是consumeQueue里的下标
消息到达commitlog文件后,会异步转发到这里对应的主题topic的consumeQueue里,然后供消费者消费
一个consumeQueue文件默认能存<b>30万条</b>消息条目,即占<b>30万*20字节 =600万字节 = 5.8M</b>
使用consumeQueue查找
当消费者拿着topic的消费进度查找消息时,先用消费进度*20算出该条消息在consumeQueue中的实际位置,<br>然后取出20字节拿到该条消息在整个commitlog中的offset和消息长度k;然后通过offset去commitlog中<br>定位到消息的起始位置,然后向后取出k字节数据就是我们想要的数据
consumeQueue也支持按存储时间查找,由于在定位到具体的consumeQueue后采用的二分查找,所以也不是特快
分区
在磁盘里每个主题topic会对应一个<b>文件夹</b>,每个主题默认4个分区(Queue),RocketMQ里没有队列的概念
每个分区又会对应一个文件夹,分区文件夹里就是consumeQueue文件,每个文件只有几兆,基本都能读到内存中<br>
consumeQueue里的数据删除了,也可从commitlog里恢复
index<br>消息索引文件
为什么要有index文件?
commitlog文件只支持按偏移量的快速查找,为了解决按某个属性key的快速查找,<br>所以就有了index文件来<b>存储key与offset的关系</b>
index文件是基于磁盘文件实现的哈希索引
index存储结构
40字节的index文件头
最小和最大存储时间各占8字节
最小和最大偏移量各占8字节
哈希槽数占4字节(总的槽数)
index条目使用数占4字节
500万个哈希槽
属性key的hash值取余能算出具体位置,里面的值存的在此刻最新index条目的下标
如果有两个key的哈希值一样了,哈希槽里的值是最新index条目的下标,<br>最新index条目的最后4字节存的前一个冲突index条止的下标
2000万index条目
<b>index条目结构<br></b>(每个占20字节)
属性key的hashCode
之所以采用哈希存储,是为了将index条目设计为<b>定长结构</b>,方便检索与定位条目
物理偏移量offset(就是对应在commitlog文件中的位置)
时间戳
该时间(timeDiff) = 消息存储时间 - index文件头里的最小存储时间(第一条消息时间戳)
之所以有这个值,是因为消息里的key本身就不是唯一 的,冲突的会比较多,那么在查找消息时(会带上key和消息生成时间),<br>如果遇到冲突的,则通过“消息生成间 - 第一条消息时间” 与timediff作比较,如果前者大则说明些index条目<br>不是我们要找的消息,则继续往前找。所以<b>timediff存在的目的是为了帮助我们快速从众多冲突index条目中快速找到我们想要的</b><br>如果找到多个index条目,则去把这些消息都查出来,<b>然后比较key的真实内容作判断</b>
前一个相同哈希值key对应的index条目下标(占4字节)
新增index条目的偏移位置 = index头长度+ 哈希槽数*4 +当前index条目数*20
疑问?
consumeQueue在commitlog文件中查找数据时用到了数据长度才能准确将整条消息取出来,<br>为什么index条目查找时,只用了offset定位到了就能取出来,不用长度吗????
解决哈希冲突
当出现一个key的哈希值已经存在时,则将哈希槽里的值更新为当前index条目的下标。<br>前一个冲突的下标位置存在最新index条目的最后4个字节里
在查找数据时,通过哈希槽的存的下标找到最后一个相同哈希值的key对应的index条目,<br>然后通过该index条目里的offset找到消息后去验证消息里的key与该key是否一致
如果一致则说明找到了,如果不一致,则通过当前找到的index条目的最后4字节里存的<br>前一个index条目然后再通过当前offset去找到消息验证key,如果不是则继续找前一个条目
config文件
像topics.json、subscribeOffset.json等配置文件,不要在线上打开此文件,不然会阻塞
abort文件
abort文件是启动时创建,正常退出之前删除
<b>启动时若出现abort文件,说明broker是异常退出的</b><br>
正常退出时,会通过注册JVM的钩子函数删除abort文件
checkpoint文件
存储commitlog文件最后一次刷盘时间戳、consumeQueue最后一次刷盘时间、index索引文件最后一次刷盘时间
如何保证数据一致性?
全量消息都存储在commitlog文件中,然后异步生成转发任务更新ConsumeQueue文件、index文件<br>由于是异步,在broker重启时,就有可能出现数据并没有真正更新到ConsumeQueue和index文件中
加载时,会先判断是否有abort文件,有则是异常退出,没有则是正常退出
commitlog存储了全量消息,checkpoint文件又存储了所有刷盘点,重启时直接根据上次的刷盘点进行重新转发就行
<b>异步停止恢复时,会将最后一个有效文件中的所有消息<font color="#d32f2f">全部重新转发</font>到ConsumeQueue和index文件中,<br>虽然确保了不丢失消息,但带来了消息重复的问题。所以消费者一<font color="#d32f2f">定要做幂等设计</font></b>
RocketMQ收到消息后,所有主题的消息都存储在commitlog中,然后采用<b>异步转发</b>到consumerQueue中<br>ConsumeQueue中,每个主题的数据会做数据分片(Queue)。Kafka中的存储是没有队列的概念的
内存映射
rocketmq的存储与读写是基于JDK NIO的内存映射机制,<br>即先把消息写到内存,然后根据策略在不同的时机刷盘
内存映射就相当于是,一个文件的修改,会被同步到内存里,所以直接读取会很快
文件越大,效率越高<br>
commitLog、consumeQueue、indexFile的单个文件都被设计为固定长度<br>一个文件写满以后再创建一个文件,文件名是第一条消息对应的全局物理偏移量
文件刷盘机制
flushDiskType
通过broker里的该属性配置是异步还是同步刷盘<br>
同步刷盘<br>SYNC_FLUSH<br>
消息写入<b>pageCache</b>之后,立刻通知刷盘并等待,<br>刷完了后再唤醒该线程,最后返回写入成功状态
异步刷盘<br>ASYNC_FLUSH<br>
消息只是写了内存的页面缓存里就返回成功,会有一个单独的线程按照某个频率,统一执行刷盘操作
异步刷盘,可批量刷,提高性能<br>但是需要通过集群或数据备份保证宕机不丢数据
堆内存——>堆外内存——>映射内存——>磁盘
补充
consumeQueue的刷盘方式固定为异步刷盘,反正丢了也能重新生成
index文件是每更新一次index文件就将上一次的改动写入磁盘
过期文件删除
commitLog、consumeQueue共用一套过期文件机制
删除由定时任务来做,默认10秒执行一次
过期判断
通过fileReservedTime配置文件保留时间,默认72小时,超过这个时间该文件没有再次更新则认为过期
删除文件时间间隔
当多个物理文件删除时,需要间隔一段时间再删除另外一个
deletePhysicFilesInterval:默认100毫秒
删除的文件还在被引用时,<br>允许再等一段时间才删除
destoryMapedFileIntervalForcibly
删除条件
指定删除时间点
deleteWhen:04 ,表示凌晨4点删除文件
磁盘是否充足
diskSpaceCleanForciblyRatio:<br>文件删除水位,默认85就会删除
物理使用率
diskSpaceWarningLevelRatio:<br>默认达到90%就会阻止新消息插入
物理磁盘使用率
diskMaxUsedSpaceRatio:<br>默认75%,在该值以下表示正常
主从同步机制(HA)
broker主从机制
消息到达主之后,会同步到从服务上,一旦主服务挂了,从服务可以继续提供消费
集群部署模式
单Master
多主无从
性能最高,但是有一个master宕机期间,未被消费的消息不能消费
多主多从同步复制
线上数据要求高的推荐,主与从之间采用同步方式复制数据,<br>但刷盘方式都设置为异步刷盘
多主多从异步复制
主挂了,从还可以继续提供消息的消费。可能会丢失消息
rocketmq默认就有了2主2从同步、2主2从异步、2主无从三种部署方式配置
主从复制原理
主服务监听从服务连接,从服务主动连接主服务器
从服务向主拉取消息偏移
主服务判断从服务中commitlog最大偏移,<br>如果小于主服务中commitlog偏移,则主向从返回消息
读写分离机制
消费者从主服务拉取消息,如果主服务很忙或挂了,则从从服务拉取
推拉比较
推模式
优点
消息实时性高,没有消息不用做其他的操作
缺点
接收方可能处理不了这么多消息
可以有一个消息推一个,也可以缓存一批之后再推送
拉模式
优点
接收方可根据自己的情况来决定拉消息:处理不过来等会儿拉,或暂停拉取
消费者可以定义缓存多少数据后批量拉取回来
缺点
消息延迟:消费者不能频繁拉取,所以会有一定的延迟
太多无用拉请求:某段时间内一直没消息,则循环拉请求就会做无用功
推还是拉?
生产者
默认都是推模式
消费者
拉模式,推也是拉封装的
RocketMQ的长轮询
rocketMQ的推模式,是封装的拉模式,是先把数据拉到消费端,再做的推送
消费端
<b><font color="#f15a23">RebalanceService</font></b> 线程会根据 topic 的<b>队列数量</b>和当前消费组的<b>消费者个数</b>做负载均衡,<br>每个队列产生的 <b>pullRequest</b> 放入阻塞队列 <b>pullRequestQueue</b> 中
<b><font color="#f15a23">PullMessageService</font></b> 线程不断的从阻塞队列 <b>pullRequestQueue</b> 中获取 <b>pullRequest</b>,<br>然后通过网络请求 broker,这样实现的准实时拉取消息
Broker
PullMessageProcessor 里面的 processRequest 方法是用来处理拉消息请求的,有消息就直接返回
如果没有消息,且拉请求允许,则会<b style="color: rgb(241, 90, 35);">挂起当前请求,</b><font color="#381e11">有数据后,</font><b style="color: rgb(56, 30, 17);">ReputMessageService</b><font color="#381e11">会</font><b style=""><font color="#f15a23">唤醒</font></b>
<b><font color="#f15a23">ReputMessageService</font></b> 线程,这个线程用来不断地从 <b>commitLog</b> 中<b><font color="#f15a23">解析数据并分发请求</font></b>,<br>构建出 <b>ConsumeQueue</b> 和 <b>IndexFile</b> 两种类型的数据,并且也会有<b><font color="#f15a23">唤醒请求</font></b>的操作
Kafka 中的长轮询
消费者去 Broker 拉消息,传了一个超时时间,如果有消息立马返回<br>如 果没有的话,一直等到超时。超 时之后重新发起拉请求
Kafka 和 RocketMQ 的机制一样,也会在消息写入的时候提醒这些<b><font color="#f15a23">延迟请求</font></b>消息来了
0 条评论
下一页