RocketMQ面试题
2024-02-15 22:30:41 26 举报
AI智能生成
RocketMQ面试题主要涉及消息队列的基本概念、RocketMQ的架构和组件、消息的发送与消费方式、消息的顺序性和可靠性保证、消息的过滤和路由机制等方面。此外,还可能考察如何进行消息的持久化存储、如何实现消息的高可用性、如何处理消息的延迟和重试机制等。面试者需要具备对消息队列技术的理解和应用能力,能够根据实际场景选择合适的消息队列解决方案,并能够解决常见的问题和挑战。
作者其他创作
大纲/内容
参考自
1 为什么使用消息队列<br>
解耦
订单支付成功需要调用库存服务、营销服务等,引入消息队列,下游服务自己去调用即可,不仅<b>解耦</b>还<b>缩短链路</b><br>
异步
降低响应时间
削峰
削峰填谷<br>
2 为什么选择RocketMQ
思路:从可靠性、性能、吞吐量考虑、社区活跃度<br>
技术选型对比
ActiveMQ,老古董了,不考虑
RabbitMQ<br>
性能和吞吐量不太理想
Kafka<br>
虽然性能和吞吐量比较好,但是延迟比较高
RocketMQ
性能好,高吞吐量,稳定可靠,低延迟
综合考虑:面向C端客户,对性能、吞吐量、低延迟、可靠有较高的要求,所以选择RocketMQ
3 RocketMQ优缺点
优点
单机吞吐量:十万级<br>
可用性高
可靠性高
功能完善,有顺序消息、事务消息、定时消息
支持10亿级别的消息堆积,不会因为堆积导致性能下降
分布式架构,扩展性高
电商业务场景在阿里双11已经经历了多次考验
缺点
支持的客户端语言只有Java和C++(不成熟)
4 消息队列有哪些模型
队列模型
多个生产者往一个队列发消息,一个队列被多个消费者消费,<b>每条消息只能被一个消费者消费</b>
发布/订阅模型
生产者发布消息到主题中,消费者从主题中订阅消息,<b>每份订阅中,订阅者都可以接收到主题的所有消息</b>
5 RocketMQ的消息模型是怎么样的
<br>
6 消息的模式有哪些<br>
集群模式(默认)
消费者组内的消费者共同分担消息,一个队列只被一个消费者消费
广播模式
消费者组内的消费者独立消费全量消息
7 RocketMQ的基本架构是怎么样的?
RocketMQ 一共有四个部分组成:<b>NameServer,Broker,Producer 生产者,Consumer 消费者</b>,它们对应了:发现、存、发、收,为了保证高可用,一般每一部分都是<b>集群部署</b>的<br>
8 能介绍下这四部分吗?
通信方面
Broker、Producer、Consumer三者都与NameServer建立<b>长连接</b>且会<b>定时注册/获取</b>topic路由信息,<br>Broker是向<b>所有节点</b>建立链接,而Producer和Consumer只选择<b>其中一个</b>;<br><br>Producer只与Broker的<b>Master</b>建立<b>长连接</b>(要发消息,只选Master<b>写入</b>),而Consumer与<b>Master、Slave</b>建立<b>长连接</b>(要订阅消息,可以从Master/Slave<b>读取</b>);<br>
功能方面<br>
NameServer
管理Broker(心跳检测)
管理Topic、Broker的路由信息
Broker
消息存储和转发
Broker 内部维护着一个个 Consumer <b>Queue</b>,用来存储<b>消息的索引</b>,真正存储消息的地方是 <b>CommitLog(日志文件)</b>
单个Broker与所有NameServer保持长连接(底层是Netty),发送心跳包和Topic注册<br>
Producer
分布式集群部署,通过负载策略(默认是轮询)发给Broker集群<br>
3种发送方式
同步发送
同步阻塞,有响应。一般用于<b>重要消息通知</b>,比如<b>重要邮件通知、营销短信</b><br>
异步发送
异步非阻塞,有响应。一般用于<b>链路耗时较长且对响应时间敏感</b>的场景,比如用户视频上传后通知启动转码服务<br>
单向发送<br>
只负责发送,没有响应。适用于用于某些耗时非常短但对<b>可靠性要求并不高</b>的场景,例如<b>日志收集</b>
消费者
2种消费模式(消息模式)
集群模式
广播模式
2种消费者类型
Pull型
自己拉取,自己控制重试
Push型
消费者发起长轮询给Broker,一有消息就会投递给消费者<br>
有维护好的重试机制、消费进度管理<br>
9 如何保证消息不丢失?
从3个阶段考虑
生产阶段
通过请求确认机制保证可靠传递
有重试机制,默认3次<br>
业务方做异常兜底(重试机制不一定能发送成功)
请求确认机制做保障<br>
同步发送,如果响应为发送失败或异常,应该重试
异步发送,在回调方法检查,发送失败或异常,应该重试<br>
存储阶段
配置<b>可靠性优先的Broker参数</b>避免宕机丢消息
消息只要持久化到<b>CommitLog(日志文件)</b>中,即使Broker宕机,未消费的消息也能重新恢复再消费。<br>
Broker的刷盘机制:同步刷盘和异步刷盘,不管哪种刷盘都可以保证消息一定存储在pagecache中(内存中),但是<b>同步刷盘更可靠</b>,它是Producer发送消息后<b>等数据持久化到磁盘之后</b>再返回响应给Producer。
Broker通过<b>主从模式</b>保证高可用<br>
消费阶段
消费重试机制,默认16次
消费进度管理
10 如何处理消息重复的问题呢?<br>
业务方自己实现
业务幂等<br>
<i>保证每条消息都有一个惟一的编号,通常是业务相关的,比如订单号,消费的记录需要落库,而且需要保证和消息确认这一步的原子性。</i>
消息去重<br>
11 怎么处理消息积压?<br>
考虑:消息积压是由于消息未被消费,堆积导致的,考虑提高消费能力即可
消费者扩容<br>
当消息Queue的数量>消费者数量时,增加消费者来提高消费能力<br>
消息迁移Queue扩容
当消息Queue的数量<=消费者数量时,再扩容消费者就没什么用了,就得考虑扩容消息Queue。<br>新建一个临时topic,设置多一些Queue,使用部分消费者转发消息到新的topic,转发很快的,用新的消费者消费新topic,消费完后恢复原样<br>
调整消费者线程池的配置
合理调整线程池的参数,提高消费的并发性,加快消费
优化消费逻辑的代码
如果不是顺序消息,考虑使用线程池将消费逻辑异步化
调整消费重试策略
可能由于消费失败导致重试,才导致了消息积压,合理调整重试的策略<br>
12 顺序消息如何实现
是什么:顺序消息是指消费顺序与生产顺序一致
2种顺序消息<br>
全局顺序
某个 Topic 下的<b>所有消息</b>都要保证顺序;
部分顺序<br>
只要保证<b>每一组</b>消息被顺序消费即可
如何实现
全局顺序
背景
RocketMQ 默认情况下不保证顺序,比如创建一个 Topic ,默认八个写队列,八个读队列,这时候一条消息可能被写入任意一个队列里;在数据的读取过程中,可能有多个 Consumer ,每个 Consumer 也可能启动多个线程并行处理,所以消息被哪个 Consumer 消费,被消费的顺序和写人的顺序是否一致是不确定的。
先把 Topic 的<b>读写队列数设置为 一</b>,然后Producer Consumer 的<b>并发设置,也要是一</b>
为了保证整个 Topic全局消息有序,只能<b>消除所有的并发处理</b>,各部分都设置成单线程处理 ,这时候就完全牺牲RocketMQ的高并发、高吞吐的特性了
部分顺序
<b>同一个生产者</b>将<b>同一组消息 串行地</b>发送给同一个队列<br>
消费者<b>不能并发</b>处理顺序消息(代码中要使用顺序消费模式)<br>
13 如何实现消息过滤
Broker端过滤
按照 Consumer 的去重逻辑进行过滤,这样做的好处是<b>避免了无用的消息传输到 Consumer 端</b>,缺点是<b>加重了 Broker 的负担</b>,实现起来相对复杂
Filter Server过滤
Consumer端过滤
Tag过滤
consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");
SQL过滤
// 只有订阅的消息有这个属性a, a >=0 and a <= 3<br>consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3");
总结:一般采用<b>Cosumer端过滤</b>,如果希望<b>提高吞吐量</b>,可以采用<b>Broker过滤</b>。
14 延时消息了解吗?
目前RocketMQ支持的延时级别是有限的
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";<br>
怎么实现延时消息
Broker收到延时消息了,会先发送到主题(<b>SCHEDULE_TOPIC_XXXX</b>)的相应时间段的Message Queue中,然后通过一个<b>定时任务 轮询</b>这些队列,到期后,把消息<b>投递到目标Topic的队列</b>中,然后消费者就可以正常消费这些消息。
总结:临时存储+定时任务
15 怎么实现分布式消息事务的?<br>
半消息:是指暂时还不能被 Consumer 消费的消息,Producer 成功<b>发送到 Broker</b> 端的消息,但是此消息被标记为 “<b>暂不可投递</b>” 状态,只有等 Producer 端执行完本地事务后<b>经过二次确认</b>了之后,Consumer 才能消费此条消息。
事务消息的流程
16 死信队列知道吗?
死信队列用于处理无法被正常消费的消息,即死信消息
当一条消息初次消费失败,消息队列 RocketMQ 会自动进行<b>消息重试</b>;<b>达到最大重试次数</b>后,若消费依然失败,则表明消费者在正常情况下<b>无法正确地消费该消息</b>,此时,消息队列 RocketMQ 不会立刻将消息丢弃,而是将其<b>发送到</b>该消费者对<b>应的特殊队列</b>中,该特殊队列称为<b>死信队列</b>。
死信消息的特点
不再被消费者消费
<b>有效期与正常消息相同,均为 3 天</b>,3 天后会被自动删除。因此,需要在死信消息产生后的 3 天内及时处理。<br>
死信队列的特点<br>
一个死信队列对应一个 <b>Group ID</b>, 而不是对应单个消费者实例
如果一个 Group ID 未产生死信消息,消息队列 RocketMQ 不会为其创建相应的死信队列<br>
一个死信队列包含了对应 Group ID 产生的所有死信消息,不论该消息属于哪个 Topic
17 如何保证RocketMQ的高可用?
NameServer
因为是无状态,且不相互通信的,所以只要<b>集群部署</b>就可以保证高可用
Broker
概述:Broker的高可用是通过<b>集群</b>和<b>主从</b>实现的<br>
Broker有2种角色,Master和Slave。<b>Producer只能向Master角色的Broker写入消息,Cosumer可以从Master和Slave角色的Broker读取消息。</b>
消费端高可用
Consumer 的配置文件中,并不需要设置是从 Master 读还是从 Slave读,当 Master 不可用或者繁忙的时候, Consumer 的<b>读请求</b>会被<b>自动切换</b>到从 Slave。有了自动切换 Consumer 这种机制,当一个 Master 角色的机器出现故障后,Consumer 仍然可以从 Slave 读取消息,不影响 Consumer 读取消息,这就实现了<b>读的高可用</b>。<br>
发送端高可用
在创建 Topic 的时候,把 Topic 的<b>多个Message Queue</b> 创建在<b>多个 Broker 组</b>上(相同 Broker 名称,不同 brokerId机器组成 Broker 组),这样当 Broker 组的 Master 不可用后,<b>其他组Master 仍然可用</b>, Producer 仍然可以发送消息 RocketMQ 目前还<b>不支持把Slave自动转成 Master</b> ,如果机器资源不足,需要把 Slave 转成 Master ,则要<b>手动停止</b> Slave 色的 Broker ,更改配置文件,用新的配置文件启动 Broker。这就实现了<b>写的高可用</b>
18 说一下RocketMQ的整体工作流程?
RocketMQ是一个分布式消息队列,也就是<b>消息队列+分布式系统</b><br>
作为<b>消息队列</b>,它是发-存-收的一个模型,对应的就是Producer、Broker、Cosumer;
作为<b>分布式系统</b>,它要有服务端、客户端、注册中心,对应的就是Broker、Producer/Consumer、NameServer
它主要的工作流程:RocketMQ由<b>NameServer</b>注册中心集群、<b>Producer</b>生产者集群、<b>Consumer</b>消费者集群和若干<b>Broker</b>(RocketMQ进程)组成
交互流程
1 Broker在启动的时候去向<b>所有</b>的NameServer注册,并保持<b>长连接</b>,每30s发送一次<b>心跳</b><br>
2 Producer在发送消息的时候从NameServer获取Broker服务器(Master)地址,根据<b>负载均衡算法</b>选择一台服务器来发送消息
3 Conusmer消费消息的时候同样从NameServer获取Broker(Master/Slave)地址,然后主动拉取消息来消费(<b>长轮询,主动发起请求,服务端有消息则推送消息给Consumer</b>)
19 为什么RocketMQ不采用Zookeeper作为注册中心呢?<br>
<b>基于可用性的考虑</b>,根据<b>CAP理论</b>,同时<b>最多只能满足两个点</b>,而<b>Zookeeper满足的是CP</b>,也就是说Zookeeper并不能保证服务的可用性,Zookeeper在进行选举的时候,整个<b>选举的时间太长</b>,期间整个集群都处于不可用的状态,而这<b>对于一个注册中心来说肯定是不能接受</b>的,<b>作为服务发现来说就应该是为可用性而设计</b>
<b>基于性能的考虑</b>,<b>NameServer本身的实现非常轻量</b>,而且可以通过增加机器的方式<b>水平扩展</b>,增加集群的抗压能力,而<b>Zookeeper的写是不可扩展的(ZK的写只能由Leader写入并且要同步给所有节点才能认为写入成功,存在延迟,影响性能)</b>,Zookeeper要解决这个问题只能通过划分领域,划分多个Zookeeper集群来解决,首先操作起来太复杂,其次这样还是又违反了CAP中的A的设计,导致服务之间是不连通的
<b>持久化的机制来带的问题</b>,ZooKeeper 的 ZAB 协议对每一个写请求,会在<b>每个 ZooKeeper 节点上保持写一个事务日志</b>,同时再加上<b>定期的将内存数据镜像(Snapshot)到磁盘</b>来保证数据的一致性和持久性,而对于一个简单的服务发现的场景来说,这其实<b>没有太大的必要</b>,这个实现方案太重了。而且本身存储的数据应该是高度定制化的。
<b>消息发送应该弱依赖注册中心</b>,而RocketMQ的设计理念也正是基于此,生产者在第一次发送消息的时候从NameServer获取到Broker地址后<b>缓存到本地</b>,如果<b>NameServer整个集群不可用,短时间内对于生产者和消费者并不会产生太大影响</b>
20 Broker是怎么保存数据的呢?<br>
RocketMQ主要的存储文件包括
CommitLog文件
ConsumeQueue文件
Index文件<br>
消息存储的整体设计
<b>CommitLog</b>:<b>消息主体以及元数据</b>的存储主体,存储Producer端写入的消息主体内容,消息内容不是定长的。单个文件大小默认1G, 文件名长度为20位,左边补零,剩余为起始偏移量,比如00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为1G=1073741824;当第一个文件写满了,第二个文件为00000000001073741824,起始偏移量为1073741824,以此类推。消息主要是<b>顺序写入</b>日志文件,当文件满了,写入下一个文件
<b>ConsumeQueue</b>:消息消费队列,引入的目的主要是<b>提高消息消费的性能</b>。Consumer根据ConsumeQueue来查找待消费的消息。其中,<b>ConsumeQueue</b>(逻辑消费队列)作为消费消息的索引,<b>保存了指定Topic下的队列消息在CommitLog中的起始物理偏移量offset,消息大小size和消息Tag的HashCode值。</b>ConsumeQueue文件可以看成是基于Topic的CommitLog索引文件,故ConsumeQueue文件夹的组织方式如下:<b>topic/queue/file三层组织结构</b>,具体存储路径为:$HOME/store/consumequeue/{topic}/{queueId}/{fileName}。同样ConsumeQueue文件采取<b>定长设计,</b>可以<b>像数组一样随机访问每一个条目</b>
<b>IndexFile</b>:IndexFile(索引文件)提供了一种可以<b>通过key或时间区间</b>来查询消息的方法。Index文件的存储位置是: {fileName},文件名fileName是以创建时的时间戳命名的,固定的单个IndexFile文件大小约为400M,一个IndexFile可以保存 2000W个索引,IndexFile的底层存储设计为在文件系统中实现<b>HashMap</b>结构,故RocketMQ的索引文件其底层实现为<b>hash索引</b>。
总结
RocketMQ采用的是混合型的存储结构,即为<b>Broker单个实例下所有的队列共用一个日志数据文件(即为CommitLog)来存储。</b>
针对Producer和Consumer分别采用了<b>数据(CommitLog)和索引(ConsumeQueue、IndexFile)部分相分离</b>的存储结构,<b>Producer发送消息至Broker端,然后Broker端使用同步或者异步的方式对消息刷盘持久化,保存至CommitLog中。</b>
只要消息被刷盘<b>持久化</b>至磁盘文件CommitLog中,那么Producer发送的消息就<b>不会丢失</b>。正因为如此,Consumer也就肯定有机会去<b>消费</b>这条消息。当无法拉取到消息后,可以等下一次消息拉取,同时服务端也支持<b>长轮询</b>模式,如果一个消息拉取请求未拉取到消息,Broker<b>允许等待30s</b>的时间,只要这段时间内有新消息到达,将直接返回给消费端。这里,RocketMQ的具体做法是,使用Broker端的后台服务线程—ReputMessageService<b>不停地分发请求并异步构建ConsumeQueue(逻辑消费队列)和IndexFile(索引文件)数据。</b>
21 说说RocketMQ怎么对文件进行读写的?
RocketMQ对文件的读写巧妙地利用了操作系统的一些高效文件读写方式——<b>PageCache</b>、<b>顺序读写</b>、<b>零拷贝</b>
PageCache、顺序读取<br>
在RocketMQ中,<b>ConsumeQueue</b>逻辑消费队列存储的数据较少,并且是<b>顺序读取</b>,<b>在page cache机制的预读取作用下,Consume Queue文件的读性能几乎接近读内存</b>,即使在有消息堆积情况下也不会影响性能。而对于<b>CommitLog</b>消息存储的日志数据文件来说,读取消息内容时候会产生较多的<b>随机访问</b>读取,严重<b>影响性能</b>。如果选择合适的系统IO调度算法,比如设置调度算法为“Deadline”(此时块存储采用SSD的话),随机读的性能也会有所提升。
<b>页缓存(PageCache)是OS对文件的缓存</b>,用于加速对文件的读写。一般来说,<b>程序对文件进行顺序读写的速度几乎接近于内存的读写速度</b>,主要原因就是由于OS使用PageCache机制对读写访问操作进行了性能优化,将一部分的内存用作PageCache。对于数据的<b>写入</b>,OS会先<b>写入至Cache</b>内,随后通过异步的方式由pdflush内核线程将Cache内的数据<b>刷盘至</b>物理磁盘上。对于数据的<b>读取</b>,如果一次读取文件时出现未命中PageCache的情况,OS从物理磁盘上访问读取文件的同时,会<b>顺序对其他相邻块的数据文件进行预读取</b>。
零拷贝
另外,RocketMQ主要通过<b>MappedByteBuffer</b>对文件进行<b>读写</b>操作。其中,利用了NIO中的FileChannel模型<b>将磁盘上的物理文件直接映射到用户态的内存地址中</b>(这种Mmap的方式减少了传统IO,将磁盘文件数据在操作系统内核地址空间的缓冲区,和用户应用程序地址空间的缓冲区之间来回进行拷贝的性能开销),将对文件的操作转化为直接对内存地址进行操作,从而极大地提高了文件的读写效率(正因为需要使用内存映射机制,故RocketMQ的文件存储都使用定长结构来存储,方便一次将整个文件映射至内存)
说说什么是零拷贝?
1. 从磁盘复制数据到内核态内存;<br>2. 从内核态内存复制到用户态内存;<br>3. 然后从用户态内存复制到网络驱动的内核态内存;<br>4. 最后是从网络驱动的内核态内存复制到网卡中进行传输。
所以,可以通过零拷贝的方式,减少用户态与内核态的上下文切换和内存拷贝的次数,用来提升I/O的性能。零拷贝比较常见的实现方式是mmap,这种机制在Java中是通过MappedByteBuffer实现的
22 消息刷盘怎么实现的呢?
提供了2种策略
同步刷盘
在消息达到Broker的内存之后,<b>必须刷到commitLog日志文件</b>中才算成功,<b>然后返回</b>Producer数据已经发送成功。
异步刷盘
异步刷盘是指消息<b>达到Broker内存后就返回</b>Producer数据已经发送成功,会唤醒一个线程去将数据持久化到CommitLog日志文件中。
Broker 在消息的<b>存取时直接操作的是内存</b>(内存映射文件),这可以提供系统的吞吐量,但是<b>无法避免机器掉电时数据丢失</b>,所以需要<b>持久化</b>到磁盘中。
刷盘的最终实现都是使用NIO中的 MappedByteBuffer.force() 将映射区的数据写入到磁盘,<b>如果是同步刷盘的话,在Broker把消息写到CommitLog映射区后,就会等待写入完成。</b>
<b>异步</b>而言,只是<b>唤醒对应的线程</b>,不保证执行的时机,流程如图所示。<br>
23 能说下 RocketMQ 的负载均衡是如何实现的?
RocketMQ中的负载均衡都在Client端完成,主要可以分为
Producer端发送消息时候的负载均衡
Producer端在发送消息的时候,会先根据Topic找到指定的TopicPublishInfo,在<b>获取了TopicPublishInfo路由信息</b>后,RocketMQ的客户端在默认方式下selectOneMessageQueue()方法会从TopicPublishInfo中的<b>messageQueueList中选择一个队列</b>(MessageQueue)进行发送消息。具这里有一个<b>sendLatencyFaultEnable</b>开关变量,如果开启,在<b>随机递增取模</b>的基础上,再过滤掉not available的Broker代理。
<i>所谓的"latencyFaultTolerance",是指对之前失败的,<b>按一定的时间做退避</b>。例如,如果上次请求的latency超过550Lms,就退避3000Lms;超过1000L,就退避60000L;如果关闭,采用随机递增取模的方式选择一个队列(MessageQueue)来发送消息,latencyFaultTolerance机制是<b>实现消息发送高可用的核心关键所在</b></i>
Consumer端订阅消息的负载均衡
Consumer端的两种消费模式(Push/Pull)都是基于拉模式来获取消息的,而在Push模式只是对pull模式的一种封装,<b>其本质实现为消息拉取线程在从服务器拉取到一批消息后,然后提交到消息消费线程池后,又“马不停蹄”的继续向服务器再次尝试拉取消息</b>。如果未拉取到消息,则延迟一下又继续拉取。在两种基于拉模式的消费方式(Push/Pull)中,均需要Consumer端知道从Broker端的哪一个消息队列中去获取消息。因此,有必要在Consumer端来做负载均衡,即<b>Broker端中多个MessageQueue分配给同一个ConsumerGroup中的哪些Consumer消费</b>
Consumer端的心跳包发送<br>
在Consumer启动后,它就会通过定时任务不断地向RocketMQ集群中的<b>所有Broker实例发送心跳包</b>(其中包含了,消息消费分组名称、订阅关系集合、消息通信模式和客户端id的值等信息)。<b>Broker</b>端在收到Consumer的心跳消息后,<b>会将它维护在ConsumerManager的本地缓存变量—consumerTable</b>,同时并<b>将封装后的客户端网络通道信息保存在本地缓存变量—channelInfoTable</b>中,为之后做Consumer端的<b>负载均衡</b>提供可以<b>依据</b>的元数据信息。
Consumer端实现负载均衡的核心类—RebalanceImpl
在Consumer实例的启动流程中的启动MQClientInstance实例部分,会完成负载均衡服务线程—RebalanceService的启动(每隔20s执行一次)。<br><br>通过查看源码可以发现,RebalanceService线程的run()方法最终调用的是RebalanceImpl类的rebalanceByTopic()方法,这个方法是实现Consumer端负载均衡的核心。<br><br>rebalanceByTopic()方法会根据消费者通信类型为“广播模式”还是“集群模式”做不同的逻辑处理
(1) 从rebalanceImpl实例的本地缓存变量—topicSubscribeInfoTable中,<b>获取该Topic主题下的消息消费队列集合</b>(mqSet);<br><br>(2) 根据topic和consumerGroup为参数调用mQClientFactory.findConsumerIdList()方法向Broker端发送通信请求,<b>获取该消费组下消费者Id列表</b>;<br><br>(3) 先对Topic下的消息消费队列、消费者Id排序,然后用<b>消息队列分配策略算法(默认为:消息队列的平均分配算法)</b>,计算出待拉取的消息队列。这里的平均分配算法,<b>类似于分页的算法,将所有MessageQueue排好序类似于记录,将所有消费端Consumer排好序类似页数</b>,<b>并求出每一页需要包含的平均size和每个页面记录的范围range,最后遍历整个range而计算出当前Consumer端应该分配到的的MessageQueue</b>。<br><br>(4)太长了,详情见https://javabetter.cn/sidebar/sanfene/rocketmq.html#_22-%E8%83%BD%E8%AF%B4%E4%B8%8B-rocketmq-%E7%9A%84%E8%B4%9F%E8%BD%BD%E5%9D%87%E8%A1%A1%E6%98%AF%E5%A6%82%E4%BD%95%E5%AE%9E%E7%8E%B0%E7%9A%84<br>
24 RocketMQ消息长轮询了解吗?
所谓的长轮询,就是Consumer 拉取消息,如果对应的 Queue 如果没有数据,<b>Broker 不会立即返回,而是把 PullReuqest hold起来</b>,<b>等待 queue 有了消息后,或者长轮询阻塞时间到了,再重新处理该 queue 上的所有 PullRequest。</b>
0 条评论
下一页