RocketMQ八股文整理
2025-04-24 18:49:01 1 举报
AI智能生成
RocketMQ是一个分布式、队列模型的消息中间件,具有高性能、高可靠性和高伸缩性的特性。它支持事务消息,保证了消息发送与业务操作的一致性,广泛应用于企业级应用中以解耦系统、削峰填谷、异步处理消息等场景。RocketMQ具有易于使用的发布/订阅模型,核心文件类型包括配置文件、源代码文件和JAR包,有助于快速构建基于消息系统的企业级应用。使用“高性能”、“稳定可靠”、“易扩展”等修饰语可以准确描述其核心优势。
作者其他创作
大纲/内容
mq的作用
解耦
不同模块或服务之间的耦合度降低。发送者和接收者都不需要关心来源和去处
异步
发送者将消息放在队列后即可继续处理其他任务,而不需要等待接收者的相应。系统可以将那些耗时的任务放在队列中异步处理,从而快速相应用户的请求
削峰填谷
削峰
大量请求转化为消息发送到消息队列中,队列用来充当缓冲区,将大量请求按照顺序排队,这样就可以削减请求高峰时对后端服务的直接压力
填谷
消费者从队列中按照一定速率读取消息并进行处理,可以根据后端处理能力和当前负载情况动态调整消费者的消费速率,达到填谷的效果
数据传输和持久化
可在不同应用程序或服务之间传输数据
消息队列还提供了持久化的功能,确保消息即使在系统故障后也不会丢失
可靠性保证
高可靠性和可用性,确保消息的传递不会丢失。一些消息队列还提供了消息重试、消息确认等机制,确保消息的可靠性
技术选型
RabbitMQ
优点
并发性能极强,性能极好,延时低
也支持部分高级功能:死信队列、消息重试、延迟队列
缺点
使用erlang小众语言
吞吐量比较低,w级
集群拓展麻烦
应用
传统的MQ,都有使用
RocketMQ
优点
吞吐量高,10w+级
功能完备,支持很多高级功能
分布式拓展性方便
使用java开发,阿里开源
缺点
应用
用于大规模吞吐、复杂业务
Kafka
优点
吞吐量极高,ms级时延
极高的可用性和可靠性
分布式拓展方便
缺点
功能较为简单,没有额外的高级功能
收到消息写入磁盘缓冲区并没有直接落磁盘,机器出故障会丢失磁盘缓冲区的数据
应用
大数据流计算,实际计算和日志采集中大规模使用
rocketmq的架构
架构图
NameServer
是什么
名称服务,是rocketmq的路由和寻址中心,负责维护Broker的元数据信息,包括Broker地址、Topic和Queue等信息,Producer和Consumer在启动时需要连接到NameServer获取Broker的地址信息来建立连接
nameserver还负责监控broker的状态,并提供自动发现和故障恢复的能力
功能
注册和发现Broker
每个Broker启动都得向所有的Nameserver进行注册,每个系统每隔一段时间,定时发送请求到nameserver去拉取最新的集群broker信息
路由管理
nameserver会记录每个topic的路由信息,包括哪些Broker存储这个Topic的消息,以及发送消息时应该选择哪个Broker作为目标
心跳检测
nameserver会定期从Broker接收心跳检测信息,以便了解Broker的状态,,如是否存活、负载情况
每个Broker每30s会向nameserver发送一个心跳,nameserver每10s启动一个定时任务,检查超过120s没发送心跳的Broker,将其剔除
Broker会跟每个Nameserver都建立一个TCP长连接,然后定时通过TCP长连接发送心跳
动态扩缩容
当新增或下线Broker时,nameserver会负责更新并通知客户端新的路由信息
Producer生产者
Producer发送消息的三种方式
同步发送
消息发送方会一直等待消息发送的结果返回,会阻塞等待直到收到Broker的确认响应,然后才会继续执行后续的代码逻辑。这种方式可以确保消息的可靠性,但是会对发送方的性能产生一定的影响
异步发送
发送方不会等待Broker的确认响应,而是立即返回并继续执行后续的代码逻辑。发送方可以注册一个回调函数,在Broker确认收到消息后,会触发回调函数执行响应的逻辑。异步发送已一般用于链路耗时较长,对响应时间较为敏感的业务场景,例如视频上传后启动转码服务,转码完成后通知推送转码结果等。这种方式可以提高发送方的性能,但是消息的可靠性需要自行处理
单向发送
类似于udp,发送发只发送消息,并不等待Broker的确认响应,也没有回调函数执行相应逻辑。该模式适用于一些不需要关心消息是否成功送达的场景,如日志采集、统计数据等
Broker
是什么
消息中转服务器,负责存储和转发消息。RocketMQ支持多个Broker构成集群,每个Broker都拥有独立的存储空间和消息队列
Broker的心跳机制
Broker的主从架构
Broker高可用
为了保证MQ的数据不丢失而且具备一定的高可用性,一般Broker部署成Master-Slave模式。master需要在接收消息之后,将数据同步给slave,这样一旦master挂了,还有slave有一份数据
Broker的主从架构有没有实现读写分离
消费者在获取数据的时候,有可能从master获取数据,也有可能从slave获取数据。
一切都会由master broker根据情况来决定
作为消费者的系统在获取消息的时候第一次会先发送请求到master broker上,maser broker在返回消息的时候,会分局当前master broker的负载情况和slave broker的同步情况,向消费者系统建议下一次拉取消息的时候是从master拉取还是从slave拉取
不同版本Broker主从自动切换的区别
rocketmq4.5版本之前master宕机了,slave没法自动切换成master的,需要手动处理,,导致中间一段时间不可用
rocketmq4.5版本之后,支持了dledgerj机制,一个master broker可以对应多个slave broker,一旦master宕机,会从多个slave中通过dledger技术和Raft协议算法进行leader选举,将其中一个slave选举为master,这个过程是自动进行的,,而且也很快几秒
Topic作为一个数据集合是怎么在Broker集群中存储的
Topic下所有数据以messageQueue的形式,分布式存储在多台broker机器上
生产者是如何把消息发送给Broker的
生产者一定是投递消息到master broker的,然后master broker会同步数据给他的slave brokers,实现一份数据多份副本,保证master故障的时候数据不丢失,并且可以自动把slave切换为master提供服务
Consumer消费者
负责从Broker中消费消息
消息的两种消费模式
广播消费
消息会发给消费者组的中每一个消费者进行消费
集群消费
默认就是集群消费
一个消费者组共同消费一个主题的多个队列,一个队列只会被一个消费者消费
RocketMQ中核心数据模型
Topic
消息主题
是消息的逻辑分类单位。Producer将消息发送到特定的Topic中,Consumer从指定的Topic中消费消息
Tag
标签,可以看作为子主题,是消息的第二级类型
作用
使用标签,同一个业务模块不同目的的消息就可以使用相同Topic而不同的Tag来标识。
Consumer Group
概念
消费者组是一组具有相同消费逻辑的消费者实例的集合。在rocketmq中,一个主题可以由多个消费者组进行订阅和消费
在RocketMQ中,鼎业这的概念是通过消费组来体现的。每个消费组都消费Topic中一份完整的消息,不同消费组之间消费进度彼此不受影响,也就是一条消息被Consumer Group1消费过,也会再给Consumer Group2消费
Consumer Group 中包含多个 Consumer,同一个组中的消费者是竞争消费的关系,每个消费者负责消费Group中的一部分消息
主要作用
负载均衡::分摊消费压力,,提高消费速度和并发处理能力
同一消费组中多个消费者实例均匀分配所有消息
容错能力:自动故障转移
当消费者组内某个实例宕机后,会重新分配消息给剩余的消费者实例
多订阅模式:支持不同业务逻辑
同一个topic被多个消费者组订阅,每个消费者组独立消费全量消息,互不影响
示例:topic是PaymentTopic,消费者组1:账户对账,存储到数据库,消费者组2:发送支付消息。两组消费同样的支付消息
MessageQueue
消息队列
MessageQueue是Topic的逻辑分区,消息通过特定的负载均衡算法被路由到不同的MessageQueue,一个Topic可以有多个Message Queue,每个Queue都是独立的存储单元。
MessageQueue的特点
唯一标识
Topic+队列编号
消息顺序性
负载均衡
高可用性
顺序消费
Message Queue、Topic、Broker之间的关系
假设现在一个Topic指定创建了4个MessageQueue,集群以后两个Broker,那么每个Broker放两个MessageQueue,实现Topic数据的分布式存储
rocketmq引入messagequeue本质是一个数据分片机制,topic有1w条数据,指定4个messagequeue,每个messagequeue就存放2500个
生产者发送消息的时候写入哪个MessageQueue
创建topic要指定messagequeue的数量,这个信息是已经存放在nameserver中
生产者通过nameserver获取一个topic有几个messagequeue,这些messagequeue在哪台broker机器上
通过一定的负载均衡算法定位messagequeue,默认使用的轮询算法,也可以采用hash算法
master出现故障怎么办:生产者有一个查收农户sendLatencyFaultEnable,开启后,在一个broker故障后,自动回避一段时间不要访问这个broker,过段时间再去访问,一定时间后,master已经恢复好了,比如slave切换变成了master
offset
在topic的消费过程中,由于消息需要被不同的组进行多次消费,所以消费完的消息并不会立即被删除
rocketmq为每个消费组在每个队列上都维护一个消费位置offset,位置前面的消息都被消费过,位置后面的消息还没有被消费
工作流程
流程图
1、启动nameserver,它会等待broker、producer、consumer的连接
2、启动broker,会和nameserver建立连接,定时发送心跳包。心跳包中包含当前broker信息(ip、port等)、Topic信息
3、启动producer,启动时先随机和nameserver集群中的一台建立长连接,并从nameserver中获取当前发送Topic所有的Broker的地址。然后通过负载算法选择一个队列,默认是轮询,也可以是hash,并与queue所在的broker建立长连接,并进行消息的发送
4、broker接收producer发送的消息,消息刷盘分为同步刷盘和异步刷盘两种策略;broker中的maser和slave同步策略分为同步复制和异步复制两种模式。
当配置为同步复制时,master需要先将消息复制到slave节点,然后再返回“写成功状态”响应给生产者;当配置为同步刷盘时,还需要将消息写入磁盘中,再返回“写成功状态”;要是配置的是异步刷盘和异步复制,则消息只要发送到master节点,就直接返回“写成功状态”
5、启动consumer,和producer类似先随机和一台nameserver建立连接,获取topic的路由信息,然后在和需要订阅的broker建立连接,获取消息
怎么保障数据可靠性
数据在哪里可能会丢失
生产阶段
问题
生产者发送消息到broker,可能丢失
解决
同步发送或异步发送,响应失败需重试
使用事务消息
存储阶段
问题
宕机丢数据
解决
broker的刷盘机制
同步刷盘
只有消息持久化到commitlog磁盘中,才响应producer接收消息成功
异步刷盘
只要消息存储到pagecache中,就返回结果,刷盘是异步的
broker的消息使用同步刷盘机制,更加可靠
broker 集群支持master和slave同步复制和异步复制,生产者的消息都是发送给master,但是消费既可以从master消费,也可以从slave消费。同步复制模式可以保证即使master宕机,消息在slave中也有备份,保证了消息不会丢失
消费阶段
问题
如何保证消息被成功消费
解决
之前的版本消息者消费数据后都需要手动commit确认的,后来版本不需要,自动帮我们做了
保证消息成功消费的关键在于确认的时机,不要在收到消息后就立即发送消费确认,而是应该在执行完所有的消费业务逻辑之后,再发送消息确认。因为消息队列中维护了消费的位置,逻辑执行失败了,没有确认,再去队列拉取消息,就还是之前的一条
延时消息
延时消息是什么
延时消息,只需要在生成消息的时候设置消息的延时级别
之前版本的rocketmq支持的延时级别是有限的
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
后来的版本也是可以支持自定义时间的延时消息
延时消息是怎么实现的
八个字:临时存储 +定时任务
rocketmq每个延时级别都会对应一个主题的特定队列,并会为每个延时级别创建一个定时任务轮询队列中的消息,到期后,把消息投递到目标Topic的队列中,然后消费者就可以正常消费这些消息
生产者发送延时消息 → Broker接收 → 存入SCHEDULE_TOPIC_XXXX → 定时任务检查 → 到期后投递到目标Topic → 消费者消费
死信队列
死信队列用于存储那些无法被正常处理消息,这些消息被称为死信(dead letter)
消费者在处理消息时发生异常,且达到了最大重试次数。当消费失败的原因排查并解决后,可以重发这些死信消息,让消费者重新消费;如果暂时无法处理,为避免到期后死信消息被删除(默认有效期与正常消息相同,均为3天,3天后会被自动删除),可以先将死信消息导出并进行保存
rocketmq控制台提供对死信消息的查询、导出和重发的功能
特点
一个死信队列对应一个group ID,而不是对应单个消费者实例
如果一个group ID未产生死信消息,就不会为其创建相应的死信队列
一个死信队列包含了对应Group ID产生的所有的死信消息,不论该消息属于哪个Topic
如何处理消息重复的问题
rocketmq确保消息一定投递,消息不丢失,但还是有可能造成消息重复,例如:下游处理成功后,但给上游的ack消息丢失,上游又重新发送一遍,这样下游就处理了两遍
处理消息重复问题,主要由业务端自行保证
消息去重
mq端消息去重:消息携带业务唯一标识,可以通过雪花算法生成全局唯一ID,很少用
业务端通过数据库的主键索引消息去重:保证每条消息都有一个唯一编号,通常是业务相关的,比如订单号,消费的记录需要落库,具体做法是可以建立一个消费记录表,拿个这个消息做数据库的insert操作,给这个消息做一个唯一主键或者唯一约束,那么就算出现重复消费的情况,就会导致主键冲突,那么就不再处理这条消息
消息幂等(幂等性是指一个操作可以执行多次而不会产生副作用,即无论执行多少次,结果都是相同的,对业务没有影响)
数据库乐观锁,where条件加一个状态,保证操作只能被执行一次
消息积压的问题
消费者扩容
如果当前Topic的messageQueue的数量大于消费者数量
就可以对消费者进行扩容,增加消费者来提高消费能力,尽快把积压的消息消费完
消费迁移Queue临时扩容
如果当前Topic的messageQueue的数量小于或等于消费者的数量,那再扩容消费者数量就没什么用,就要考虑扩容messageQueue
可以新建一个临时的Topic,临时的Topic对设置一些messageQueue,然后先用一些消费者把消费的数据丢到临时的Topic,因为不用业务处理,只是转发一下消息,还是很快的。接着用扩容的消费者区消费新的Topic里的树,消费完了之后,恢复原状
消息积压的核心在于定位问题
如果是消费者出现问题,例如热点数据竞争锁导致消费者消费满的消息积压,此时应该先解决热点数据的问题
如果各端消费正常,但是还是消息积压,那说明到达系统瓶颈,可以增加机器处理或者对上游进行一些限流操作
顺序消息如何实现
顺序消息
顺序消息是指消息的消费顺序和产生顺序相同,在有些业务逻辑下必须保证顺序,比如订单的生成、付款、发货,这个消息必须按顺序处理才行
部分顺序消息
是什么
只要保证每一组消息被顺序消费即可,比如订单消息,只要保证同一订单ID的消息能按照顺序消费即可
实现
部分顺序消息相对比较好实现,生产端需要做到把同ID的消息发送到同一个messageQueue;在消费过程中,从同一个messageQueue读取消息就是顺序处理的
发送端使用 MessageQueueSelector 类来控制把消息发送哪个 MessageQueue
全局顺序消息
是什么
指某个Topic下所有的消息都要保证顺序
实现
要保证全局顺序消息,把Topic的队列数量设置为1,消费者也只有一个,完全牺牲了rocketmq的高并发、高吞吐的特性
分布式事务消息
同步消息有什么问题
使用普通消息和订单事务无法保证一致的原因
本事上是由于普通消息无法像单机数据库事务一样,具备提交、回滚和统一协调的能力。而基于RocketMQ的分布式事务消息,在普通消息基础上,支持二阶段的提交能力。将二阶段提交和本地事务绑定,实现全局提交结果的一致性
事务消息的两个阶段
发送一个半事务消息
发送方执行完本地事务后,并根据本地事务执行成功与否,向Broker发送确认半事务消息状态(commit/rollback,半事务消息只有commit状态才会真正向下游投递
如果因为网络问题导致某条事务消息的二次确认丢失,Broker会通过扫描发现某条消息长期处于“半事务消息”时,会主动向生产者询问该消息的最终状态
这样最终保证了本地事务执行成功,下游就能收到消息,本地事务执行失败,下游就收不到消息。总而保证了上下游数据的一致性
事务消息的流程
1、生产者将半事务消息发送至 RocketMQ Broker
2、Broker 将消息持久化成功之后,向生产者返回 Ack 确认消息已经发送成功,此时消息暂时不能投递,为半事务消息
3、生产者开始执行本地事务逻辑
4、生产者根据本地事务执行结果向服务端提交二次确认结果(Commit或是Rollback),服务端收到确认结果后处理逻辑如下:
二次确认结果为Commit:服务端将半事务消息标记为可投递,并投递给消费者
二次确认结果为Rollback:服务端将回滚事务,不会将半事务消息投递给消费者
5、在网络问题或生产者应用重启的特殊情况下,若服务端未收到发送者提交的二次确认结果,或服务端收到的二次确认记过为unknown未知状态,经过固定时间后,服务端将对消息生产者发起消息回查
注意:重查次数有参数配置,服务端仅仅会按照参数尝试指定次数,超过次数后事务会强制回滚,因此未决事务的回查时效性非常关键,需要按照业务的实际风险来设置
注意:事务消息回查步骤:生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果,然后生产者根据检查得到的本地事务的最终状态再次提交二次确认,服务端继续按照原步骤对半事务消息进行处理
消息是推还是拉
pull拉的模式就是消费者轮询,通过不断轮询的方式检查数据是否发生变化,变化的话就把数据拉回来
优点
消费者可以完全掌握消息的数量和速度,可以大大避免客户端消息堆积的情况
缺点
消息可能不及时,并且消费者需要不断的进行轮询,对mq中间件造成一定的压力
场景
适合实时性要求没那么高的场景
push推的方式就是消费者端和服务端建立TCP长连接或者注册一个回调,当服务端数据发生变化,立即通过长连接将数据推送给客户端
优点
消息是实时的,一旦有消息过来消费者能立马感知到。而且对消费者也比较简单,不需要轮询,只需要等推送就行
缺点
但是如果客户端没有做好流控,一点服务端推送大量消息到客户端时,就会导致客户端消息堆积甚至崩溃
场景
适合实时性要求比较高的场景
RocketMQ是一种伪Push的模式,基于长轮询的方式
是什么
长轮询就是长连接 和 轮询 的结合来进行拉消息的。
长轮询就是消费者向中间件发起一个长轮询请求,建立一定时间的长连接,如果有数据就直接返回,如果没有也不会立即断开连接,而是等待一会,等待过程中如果有新数据到达,再把消息返回。如果超时还没有消息,那就结果了等待下一次长轮询
RocketMQ的push模式其实底层的实现还是基于pull实现的,只不过他把pull给封装的比较好,让你以为是在push。push其实也是pull实现的,是通过Broker和消费者之间建立15s的长连接,当Broker有数据,消费者就pull走数据,15s后长连接结束继续建立15s的长连接,实现了Broker的push数据的效果
rocketmq和kafka都是支持基于长轮询进行拉取消息的
优点
长轮询可以解决频发请求但无更新数据的问题,提升请求效率
同时也能够使消费者在有新数据到达时即使获取到数据,类似于推送的效果,具有一定的即时性
原理
Broker是怎么保存数据的
RocketMQ主要的存储文件包括 CommitLog 文件、ConsumeQueue文件、IndexFile文件
消息存储的整体设计
CommitLog文件
消息主体以及元数据的存储主体,存储producer写入的消息主题内容,顺序写入日志文件,根据偏移量查找,单个文件大小默认1G,写满后自动生成新的文件
ConsumeQueue 文件
一个MessageQueue就对应一个ConsumeQueue
存储commitLog消息的索引,保存了指定Topic下队列消息在 CommitLog中的起始物理偏移量offset、消息大小size、消息Tag的hashCode值
Consumer可根据ConsumeQueue来查询待消费的消息,提高根据Topic遍历CommitLog文件的检索能力,提高消息消费的性能
目录组织结构
ConsumeQueue文件可以看做是基于Topic的CommitLog索引文件
ConsumeQueue文件的组织方式:topic/queue/file 三层组织结构
每个条目共20个字节,分别为8字节的offset,4字节的size,8字节的tag hashcode,单个文件由30w个条目组成,可以像数组一样随机访问每一个条目,可以像数组一样随机访问每一个条目,每个ConsumeQueue文件大小约为5.72M
IndexFile文件
indexFile(索引文件)提供了一种可以通过 key 或 时间区间来查询消息的索引机制,其核心设计类似与文件系统中的HashMap,用于快速定位消息在commitLog中的物理位置
解决了什么问题
CommitLog 是顺序写入的文件,直接遍历查找消息效率极低(尤其是按 Message Key 或时间范围查询)
ConsumeQueue 仅支持按 Topic + Queue 检索,无法满足灵活查询需求。
如何解决
通过 IndexFile 构建 哈希索引,支持:按 Key 查询(如订单ID、消息ID)。按时间范围查询(如查询某时间段内的消息)
总结
rocketmq采用的是混合型的存储结构,即为 Broker 单个实例下的所有队列共用一个日志数据文件 commitLog文件来存储
针对producer和consumer分别采用了数据和索引部分相分离的存储结构,数据存储到commitLog中,索引存储在ConsumeQueue中
commitLog通过同步或异步刷盘
服务端支持长轮询
rocketMQ的具体做法,使用 Broker 端的后台服务线程—ReputMessageService 不停地分发请求并异步构建 ConsumeQueue(逻辑消费队列)和 IndexFile(索引文件)数据。
消息刷盘是怎么实现的
Broker 在消息的存取直接操作的是内存(内存映射文件),这可以提高系统的吞吐量,但是无法避免机器掉电时的数据丢失,所以需要持久化到磁盘中。
通过NIO提供的MappedByteBuffer将磁盘恩建直接映射到JVM进程的虚拟内存空间,使得文件读写可以像操作内存一样高效
两种刷盘策略
同步刷盘
异步刷盘
刷盘的最终实现都是使用NIO中的 MappedByteBuffer.force() 将映射区的数据写入磁盘,如果是同步刷盘的话,在Broker把消息写到commitLog映射区后,就会等待写入完成。如果是异步刷盘,只是唤醒对应的线程,不保证执行的时机
流程
RocketMq的负载均衡是如何实现的
Producer的负载均衡
轮询
自定义策略:通过MessageQueueSelector接口实现自定义队列选择策略,比如按业务 key 哈希
consumer的负载均衡
广播消费模式
所有的消费者都能收到所有消息,不会负载均衡
集群模式
五种策略,默认是AllocateMessageQueueAveragely严格平均分配

收藏

收藏
0 条评论
下一页