RocketMQ专题
2026-01-06 10:13:54 0 举报
AI智能生成
本文档提供了RocketMQ的核心内容,它是一种轻量级、开源的分布式消息中间件。RocketMQ具有高性能、高可靠性、易于使用的特点,广泛应用于各种应用场景,如日志收集、事件源、交易系统等。文件类型为PDF,内部包含了详细的安装指南、核心概念介绍、架构分析、实战案例等。文中带有丰富的修饰语,如"易于使用"、"高可靠性",使主题更加鲜明,吸引力十足。
作者其他创作
大纲/内容
核心概念
MQ主要作用
异步
异步能提⾼系统的响应速度、吞吐量。
解耦
服务之间进⾏解耦,才可以减少服务之间的影响。提⾼系统整体的稳定性以及可扩展性。
另外,解耦后可以实现数据分发。⽣产者发送⼀个消息后,可以由⼀个或者多个消费者进⾏消费,并且消费者的增加或者减少对⽣产者没有影响。
削峰
以稳定的系统资源应对突发的流量冲击。
RocketMQ与其余中间件特点比较
Dledger⾼可⽤集群
如何解决脑裂问题
选举机制:Raft协议的基础是选举出⼀个领导者(Leader),其他节点(Follower)都从领导者获取数据。选举过程要求候选⼈必须获得集群中⼤多数节点的⽀持才能成为领导者。这确保了集群中只能有⼀个领导者,从⽽避免了脑裂问题。
任期(Term):Raft协议为每个选举周期设置了⼀个递增的任期编号。任期编号⽤于标识当前的领导者,确保旧的领导者不会再次被选为领导者。如果⼀个节点发现⾃⼰的任期⼩于其他节点,那么它会停⽌当前的⼯作并更新⾃⼰的任期。
⼼跳机制:领导者会定期向其他节点发送⼼跳消息,以保持与Follower节点的连接。当⼀个节点⻓时间未收到领导者的⼼跳时,它会认为当前领导者失效,并启动新⼀轮选举。这确保了当领导者出现故障时,系统能够快速地选出新的领导者。
⽇志复制:领导者负责将数据更新(⽇志条⽬)复制到其他节点。Follower节点只有在收到领导者的⽇志条⽬并将其写⼊本地⽇志后,才会响应客户端的请求。这确保了在发⽣脑裂情况下,不会出现多个节点试图同时修改同⼀份数据的情况。
集群
主从
master和slaver通过配置文件指定,master挂掉后,slaver不会升级为master。
Dledger⾼可⽤集群
如何解决脑裂问题
选举机制:Raft协议的基础是选举出⼀个领导者(Leader),其他节点(Follower)都从领导者获取数据。选举过程要求候选⼈必须获得集群中⼤多数节点的⽀持才能成为领导者。这确保了集群中只能有⼀个领导者,从⽽避免了脑裂问题。
任期(Term):Raft协议为每个选举周期设置了⼀个递增的任期编号。任期编号⽤于标识当前的领导者,确保旧的领导者不会再次被选为领导者。如果⼀个节点发现⾃⼰的任期⼩于其他节点,那么它会停⽌当前的⼯作并更新⾃⼰的任期。
⼼跳机制:领导者会定期向其他节点发送⼼跳消息,以保持与Follower节点的连接。当⼀个节点⻓时间未收到领导者的⼼跳时,它会认为当前领导者失效,并启动新⼀轮选举。这确保了当领导者出现故障时,系统能够快速地选出新的领导者。
⽇志复制:领导者负责将数据更新(⽇志条⽬)复制到其他节点。Follower节点只有在收到领导者的⽇志条⽬并将其写⼊本地⽇志后,才会响应客户端的请求。这确保了在发⽣脑裂情况下,不会出现多个节点试图同时修改同⼀份数据的情况。
在Dledger集群中,就不再单独指定各个broker的服务,⽽是由这些broker服务⾃⾏进⾏选举,产⽣⼀个Leader⻆⾊的服务,响应客户端的各种请求。⽽其他的broker服务,就作为Follower⻆⾊,负责对Leader上的数据进⾏备份
Dledger集群的选举是通过Raft协议进⾏的,Raft协议是⼀种多数同意机制。
运行架构
参考图
nameserver命名服务
nameServer不依赖于任何其他的服务,⾃⼰独⽴就能启动。并且,不管是broker还是客户端,都需要明确指定nameServer的服务地址。以⼀台电脑为例,nameServer可以理解为是整个RocketMQ的CPU,整个RocketMQ集群都要在CPU的协调下才能正常⼯作。
broker 核⼼服务
broker是RocketMQ中最为娇贵的⼀个组件。RockeMQ提供了各种各样的重要设计来保护broker的安全。同时broker也是RocketMQ中配置最为繁琐的部分。同样以电脑为例,broker就是整个RocketMQ中的硬盘、显卡这⼀类的核⼼硬件。RocketMQ最核⼼的消息存储、传递、查询等功能都要
由broker提供。
由broker提供。
client 客户端
Client包括消息⽣产者和消息消费者。同样以电脑为例,Client可以认为是RocketMQ中的键盘、⿏标、显示器
这类的输⼊输出设备。⿏标、键盘输⼊的数据需要传输到硬盘、显卡等硬件才能进⾏处理。但是键盘、⿏标是
不能直接将数据输⼊到硬盘、显卡的,这就需要CPU进⾏协调。通过CPU,⿏标、键盘就可以将输⼊的数据最
终传输到核⼼的硬件设备中。经过硬件设备处理完成后,再通过CPU协调,显示器这样的输出设备就能最终从
核⼼硬件设备中获取到输出的数据。
这类的输⼊输出设备。⿏标、键盘输⼊的数据需要传输到硬盘、显卡等硬件才能进⾏处理。但是键盘、⿏标是
不能直接将数据输⼊到硬盘、显卡的,这就需要CPU进⾏协调。通过CPU,⿏标、键盘就可以将输⼊的数据最
终传输到核⼼的硬件设备中。经过硬件设备处理完成后,再通过CPU协调,显示器这样的输出设备就能最终从
核⼼硬件设备中获取到输出的数据。
消息模型
参考图
每个MesasgeQueue都记录了⼀个最⼩位点和最⼤位点。
这⾥的位点代表每个MessageQueue上存储的消息的索引,也称为offset(偏移量)。每⼀条新记录的消息,都按照当前最⼤位点往后分配⼀个新的位点。这个位点就记录了这⼀条消息的存储位置。
⽣产者发送到某⼀个Topic下的消息,最终会保存在Topic
下的某⼀个MessageQueue中。⽽消费者来消费消息时,RocketMQ会在Broker端给每个消费者组记录⼀个消息的消费位点Offset。通过Offset控制每个消费者组的消息处理进度。这样,每⼀条消息,在⼀个消费者组当中只被处理⼀次。
下的某⼀个MessageQueue中。⽽消费者来消费消息时,RocketMQ会在Broker端给每个消费者组记录⼀个消息的消费位点Offset。通过Offset控制每个消费者组的消息处理进度。这样,每⼀条消息,在⼀个消费者组当中只被处理⼀次。
客户端模型
生产者
消息生产者的创建步骤
1.创建消息⽣产者producer,并指定⽣产者组名
2.指定Nameserver地址
客户端直接指定,例如 consumer.setNameSrvAddr("127.0.0.1:9876")
也可以通过读取系统环境变量NAMESRV_ADDR指定。其中第⼀种⽅式的优先级更⾼。
3.启动producer。消息⽣产者与服务端建⽴连接的过程。
4.创建消息对象,指定主题Topic、Tag和消息体
5.发送消息
6.关闭⽣产者producer,释放资源。
消息确认机制
单向发送
单向发送⽅式下,消息⽣产者只管往Broker发送消息,⽽全然不关⼼Broker端有没有成功接收到消息。这就好⽐⽣产者向Broker发⼀封电⼦邮件,Broker有没有处理电⼦邮件,⽣产者并不知道。
参考代码
sendOneway⽅法没有返回值,如果发送失败,⽣产者⽆法补救。
单向发送有⼀个好处,就是发送消息的效率更⾼。适⽤于⼀些追求消息发送效率,⽽允许消息丢失的业务场景。⽐如⽇志。
同步发送
同步发送⽅式下,消息⽣产者在往Broker端发送消息后,会阻塞当前线程,等待Broker端的相应结果。
参考代码
SendResult sendResult = producer.send(msg);
SendResult来⾃于Broker的反馈。producer在send发出消息,到Broker返回SendResult的过程中,⽆法做其他的事情。
public enum SendStatus {
SEND_OK,
FLUSH_DISK_TIMEOUT,
FLUSH_SLAVE_TIMEOUT,
SLAVE_NOT_AVAILABLE,
}
SEND_OK,
FLUSH_DISK_TIMEOUT,
FLUSH_SLAVE_TIMEOUT,
SLAVE_NOT_AVAILABLE,
}
SEND_OK表示消息已经成功发送到Broker上。⾄于其他⼏种枚举值,都是表示消息在Broker端处理失败了
注意点
如果Broker端返回的SendStatus不是SEND_OK,也并不表示消息就⼀定不会推送给下游的消费者。仅仅只是表示Broker端并没有完全正确的处理这些消息。因此,如果要重新发送消息,最好要带上唯⼀的系统标识,这样在消费者端,才能⾃⾏做幂等判断。
异步发送
异步发送机制下,⽣产者在向Broker发送消息时,会同时注册⼀个回调函数。接下来⽣产者并不等待Broker的响应。当Broker端有响应数据过来时,⾃动触发回调函数进⾏对应的处理。
参考代码
当Broker端返回消息处理成功的响应信息SendResult时,就会调⽤onSuccess⽅法。
当Broker端处理消息超时或者失败时,就会调⽤onExcetion⽅法,⽣产者就可以在onException⽅法中进⾏补救措施。
注意点
⼀是与同步发送机制类似,触发了SendCallback的onException⽅法同样并不⼀定就表示消息不会向消费者推
送。如果Broker端返回响应信息太慢,超过了超时时间,也会触发onException⽅法。超时时间默认是3秒,可以通过
producer.setSendMsgTimeout⽅法定制。⽽造成超时的原因则有很多,消息太⼤造成⽹络拥堵、⽹速太慢、Broker端处理太慢等都可能造成
消息处理超时。
送。如果Broker端返回响应信息太慢,超过了超时时间,也会触发onException⽅法。超时时间默认是3秒,可以通过
producer.setSendMsgTimeout⽅法定制。⽽造成超时的原因则有很多,消息太⼤造成⽹络拥堵、⽹速太慢、Broker端处理太慢等都可能造成
消息处理超时。
⼆是在SendCallback的对应⽅法被触发之前,⽣产者不能调⽤shutdown()⽅法。
消费者
消息消费者创建步骤
1.创建消费者Consumer,必须指定消费者组名
2.指定Nameserver地址
客户端直接指定,例如 consumer.setNameSrvAddr("127.0.0.1:9876")
也可以通过读取系统环境变量NAMESRV_ADDR指定。其中第⼀种⽅式的优先级更⾼。
3.订阅主题Topic和Tag
4.设置回调函数,处理消息
5.启动消费者consumer。消费者会⼀直挂起,持续处理消息。
消息确认机制
采⽤状态确认机制保证消费者⼀定能正常处理对应的消息
CONSUME_SUCCESS
RECONSUME_LATER
1、Broker不可能⽆限制的向消费失败的消费者推送消息。如果消费者⼀直没有恢复,Broker显然不可能⼀直⽆限制的推送,这会浪费集群很多的性能。所以,Broker会记录每⼀个消息的重试次数。如果⼀个消息经过很多次重试后,消费者依然⽆法正常处理,那么Broker会将这个消息推⼊到消费者组对应的死信Topic中。死信Topic相当于windows当中的垃圾桶。你可以⼈⼯介⼊对死信Topic中的消息进⾏补救,也可以直接彻底删除这些消息。RocketMQ默认的最⼤重试次数是16次。
2、为了让这些重试的消息不会影响Topic下其他正常的消息,Broker会给每个消费者组设计对应的重试Topic。MessageQueue是⼀个具有严格FIFO特性的数据结构。如果需要重试的这些消息还是放在原来的MessageQueue中,就会对当前MessageQueue产⽣阻塞,让其他正常的消息
⽆法处理。RocketMQ的做法是给每个消费者组⾃动⽣成⼀个对应的重试Topic。在消息需要重试时,会先移动到对应的重试Topic中。后续Broker只要从这些重试Topic中不断拿出消息,往消费者组重新推送即可。这样,这些重试的消息有了⾃⼰单独的队列,就不会影响到Topic下的
其他消息了。
⽆法处理。RocketMQ的做法是给每个消费者组⾃动⽣成⼀个对应的重试Topic。在消息需要重试时,会先移动到对应的重试Topic中。后续Broker只要从这些重试Topic中不断拿出消息,往消费者组重新推送即可。这样,这些重试的消息有了⾃⼰单独的队列,就不会影响到Topic下的
其他消息了。
3、RocketMQ中设定的消费者组都是订阅主题和消费逻辑相同的服务备份,所以当消息重试时,Broker只要往消费者组中随意⼀个实例推送即可。这是消息重试机制能够正常运⾏的基础。但是,在客户端的具体实现时,MQDefaultMQConsumer并没有强制规定消费者组不能重复。也就是说,你完全可以实现出⼀些订阅主题和消费逻辑完全不同的消费者服务,共同组成⼀个消费组。在这种情况下,RocketMQ不会报错,但是消息的处理逻辑就⽆法保持⼀致了。这会给业务带来很⼤的麻烦。这是在实际应⽤时需要注意的地⽅。
4、Broker端最终只通过消费者组返回的状态来确定消息有没有处理成功。⾄于消费者组⾃⼰的业务执⾏是否正常,Broker端是没有办法知道的。因此,在实现消费者的业务逻辑时,应该要尽量使⽤同步实现⽅式,保证在⾃⼰业务处理完成之后再向Broker端返回状态。⽽应该尽量避免异步的⽅式处理业务逻辑。
消费者组也可以⾃⾏指定起始消费位点
CONSUME_FROM_LAST_OFFSET
从对列的最后⼀条消息开始消费
CONSUME_FROM_FIRST_OFFSET
从对列的第⼀条消息开始消费
CONSUME_FROM_TIMESTAMP
从某⼀个时间点开始重新消费
consumer.setConsumerTimestamp("20131223171201");
消息模式
广播模式
⼴播模式和集群模式是RocketMQ的消费者端处理消息最基本的两种模式。集群模式下,⼀个消息,只会被⼀个消费者组中的多个消费者实例共同 处理⼀次。⼴播模式下,⼀个消息,则会推送给所有消费者实例处理,不再关⼼消费者组。
消费者核心代码
consumer.setMessageModel(MessageModel.BROADCASTING);
实现思路
默认模式(也就是集群模式)下,Broker端会给每个ConsumerGroup维护⼀个统⼀的Offset,这样,当Consumer来拉取消息时,就可以通过
Offset保证⼀个消息,在同⼀个ConsumerGroup内只会被消费⼀次。⽽⼴播模式的本质,是将Offset转移到Consumer端⾃⾏保管,包括Offset
的记录以及更新,全都放到客户端。这样Broker推送消息时,就不再管ConsumerGroup,只要Consumer来拉取消息,就返回对应的消息。
Offset保证⼀个消息,在同⼀个ConsumerGroup内只会被消费⼀次。⽽⼴播模式的本质,是将Offset转移到Consumer端⾃⾏保管,包括Offset
的记录以及更新,全都放到客户端。这样Broker推送消息时,就不再管ConsumerGroup,只要Consumer来拉取消息,就返回对应的消息。
注意点
1、Broker端不维护消费进度,意味着,如果消费者处理消息失败了,将⽆法进⾏消息重试。
2、Consumer端维护Offset的作⽤是可以在服务重启时,按照上⼀次消费的进度,处理后⾯没有消费过的消息。如果Offset丢了,Consuer依然可以拉取消息。
⽐如⽣产者发送了1~10号消息。消费者当消费到第6个时宕机了。当他重启时,Broker端已经把第10个消息都推送完成了。如果消费者端维护好了⾃⼰的Offset,那么他就可以在服务重启时,重新向Broker申请6号到10号的消息。但是,如果消费者端的Offset丢失了,消费者服务依然可以正常运⾏,但是6到10号消息就⽆法再申请了。后续这个消费者就只能获取10号以后的消息。
过滤消息
应用场景
同⼀个Topic下有多种不同的消息,消费者只希望关注某⼀类消息。
参考代码
生产端
消费端
consumer.subscribe("TagFilterTest", "TagA");
SQL过滤
通过Tag属性,只能进⾏简单的消息匹配。如果要进⾏更复杂的消息过滤,⽐如数字⽐较,模糊匹配等,就需要使⽤SQL过滤⽅式。SQL过滤⽅式可以通过Tag属性以及⽤户⾃定义的属性⼀起,以标准SQL的⽅式进⾏消息过滤。
参考代码
生产端
⽣产者端在发送消息时,出了Tag属性外,还可以增加⾃定义属性
消费端
consumer.subscribe("SqlFilterTest",
MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))" +
"and (a is not null and a between 0 and 3)"));
MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))" +
"and (a is not null and a between 0 and 3)"));
消费者端在进⾏过滤时,可以指定⼀个标准的SQL语句,定制复杂的过滤规则。
注意点
如果需要使⽤⾃定义参数进⾏过滤,需要在Broker端,将参数enablePropertyFilter设置成true。这个参数默认是false。
Broker会在往Consumer推送消息时,在Broker端进⾏消息过滤。
这样的好处是减少了不必要的⽹络IO,但是缺点是加⼤了服务端的压⼒。不过在RocketMQ的良好设计下,更建议使⽤消息过滤机制
注意点
1、使⽤Tag过滤时,如果希望匹配多个Tag,可以使⽤两个竖线(||)连接多个Tag值。另外,也可以使⽤星号(*)匹配所有。
2、使⽤SQL顾虑时,SQL语句是按照SQL92标准来执⾏的。SQL语句中⽀持⼀些常⻅的基本操作:
数值⽐较,⽐如:>,>=,<,<=,BETWEEN,=;
字符⽐较,⽐如:=,<>,IN;
IS NULL 或者 IS NOT NULL;
逻辑符号 AND,OR,NOT;
数值⽐较,⽐如:>,>=,<,<=,BETWEEN,=;
字符⽐较,⽐如:=,<>,IN;
IS NULL 或者 IS NOT NULL;
逻辑符号 AND,OR,NOT;
3、Consumer不感兴趣的消息并不表示直接丢弃。通常是需要在同⼀个消费者组,定制另外的消费者实例,消费那些剩下的消息。但是,如果⼀直没有另外的Consumer,那么,Broker端还是会推进Offset。
顺序消息机制
应用场景
每⼀个订单有从下单、锁库存、⽀付、下物流等⼏个业务步骤。每个业务步骤都由⼀个消息⽣产者通知给下游服务。如何保证对每个订单的业务处理顺序不乱?
生产端
参考代码
通过MessageSelector,将orderId相同的消息,都转发到同⼀个MessageQueue中。
消费端
参考代码
注⼊⼀个MessageListenerOrderly实现。
实现思路
1、⽣产者只有将⼀批有顺序要求的消息,放到同⼀个MesasgeQueue上,通过MessageQueue的FIFO特性保证这⼀批消息的顺序,如果不指定MessageSelector对象,那么⽣产者会采⽤轮询的⽅式将多条消息依次发送到不同的MessageQueue上。
2、消费者需要实现MessageListenerOrderly接⼝,实际上在服务端,处理MessageListenerOrderly时,会给⼀个MessageQueue加锁,拿到MessageQueue上所有的消息,然后再去读取下⼀个MessageQueue的消息。
注意点
1、理解局部有序与全局有序。⼤部分业务场景下,我们需要的其实是局部有序。如果要保持全局有序,那就只保留⼀个MessageQueue。性能显然⾮常低。
2、⽣产者端尽可能将有序消息打散到不同的MessageQueue上,避免过于集中导致数据热点竞争。
3、消费者端只进⾏有限次数的重试。如果⼀条消息处理失败,RocketMQ会将后续消息阻塞住,让消费者进⾏重试。但是,如果消费者⼀直处理失败,超过最⼤重试次数,那么RocketMQ就会跳过这⼀条消息,处理后⾯的消息,这会造成消息乱序。
4、消费者端如果确实处理逻辑中出现问题,不建议抛出异常,可以返回ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT
作为替代。
作为替代。
延迟消息
应用场景
延迟消息发送是指消息发送到Apache RocketMQ后,并不期望⽴⻢投递这条消息,⽽是延迟⼀定时间后才投递到Consumer进⾏消费
是RocketMQ⾮常有特⾊的⼀个功能。对⽐下RabbitMQ和Kafka。RabbitMQ中只能通过使⽤死信队列变相实现延迟
消息,或者加装⼀个插件来⽀持延迟消息。 Kafka则不太好实现延迟消息
消息,或者加装⼀个插件来⽀持延迟消息。 Kafka则不太好实现延迟消息
核心方法
5.3.0版本RocketMQ提供了两种实现延迟消息的机制,⼀种是指定固定的延迟级别,⼀种是指定消息发送时间。
生产者端
关于延迟级别,RocketMQ给消息定制了18个默认的延迟级别
实现思路
对于指定固定延迟级别的延迟消息,RocketMQ的实现⽅式是预设⼀个系统Topic,名字叫SCHEDULE_TOPIC_XXXXX。在这个Topic下,预设了18个MessageQueue。这⾥每个对列就对应了⼀种延迟级别。然后每次扫描这18个队列⾥的消息,进⾏延迟操作就可以了。
另外指定时间点的延迟消息,RocketMQ是通过时间轮算法实现的。
批量消息
应用场景
⽣产者要发送的消息⽐较多时,可以将多条消息合并成⼀个批量消息,⼀次性发送出去。这样可以减少⽹络IO,提升消息发送的吞吐量。
生产端
核心代码参考图
注意点
批量消息的使⽤⾮常简单,但是要注意RocketMQ做了限制。同⼀批消息的Topic必须相同,另外,不⽀持延迟消息。
还有批量消息的⼤⼩不要超过1M,如果太⼤就需要⾃⾏分割。
事务消息
应用场景
事务消息是RocketMQ⾮常有特⾊的⼀个⾼级功能。他的基础诉求是通过RocketMQ的事务机制,来保证上下游的数据⼀致性。
以电商为例,⽤户⽀付订单这⼀核⼼操作的同时会涉及到下游物流发货、积分变更、购物⻋状态清空等多个⼦系统的变更。这种场景,⾮常适合使⽤RocketMQ的解耦功能来进⾏串联。
以电商为例,⽤户⽀付订单这⼀核⼼操作的同时会涉及到下游物流发货、积分变更、购物⻋状态清空等多个⼦系统的变更。这种场景,⾮常适合使⽤RocketMQ的解耦功能来进⾏串联。
实现思路
参考图
1. ⽣产者将消息发送⾄RocketMQ服务端
2. RocketMQ服务端将消息持久化成功之后,向⽣产者返回Ack确认消息已经发送成功,此时消息被标记为"暂不能投递",这种状态下的消息即为半事务消息。
3. ⽣产者开始执⾏本地事务逻辑。
4. ⽣产者根据本地事务执⾏结果向服务端提交⼆次确认结果(Commit或是Rollback),服务端收到确认结果后处理逻辑如下:
⼆次确认结果为Commit:服务端将半事务消息标记为可投递,并投递给消费者。
⼆次确认结果为Rollback:服务端将回滚事务,不会将半事务消息投递给消费者。
⼆次确认结果为Commit:服务端将半事务消息标记为可投递,并投递给消费者。
⼆次确认结果为Rollback:服务端将回滚事务,不会将半事务消息投递给消费者。
5. 在断⽹或者是⽣产者应⽤重启的特殊情况下,若服务端未收到发送者提交的⼆次确认结果,或服务端收到的⼆次确认结果为Unknown未知状态,经过固定时间后,服务端将对消息⽣产者即⽣产者集群中任⼀⽣产者实例发起消息回查。
6. ⽣产者收到消息回查后,需要检查对应消息的本地事务执⾏的最终结果。
7. ⽣产者根据检查到的本地事务的最终状态再次提交⼆次确认,服务端仍按照步骤4对半事务消息进⾏处理。
实现代码
实现时的重点是使⽤RocketMQ提供的TransactionMQProducer事务⽣产者,在TransactionMQProducer中注⼊⼀个TransactionListener事务监听器来执⾏本地事务,以及后续对本地事务的检查。
注意点
1、半消息是对消费者不可⻅的⼀种消息。实际上,RocketMQ的做法是将消息转到了⼀个系统Topic,RMQ_SYS_TRANS_HALF_TOPIC。
2、事务消息中,本地事务回查次数通过参数transactionCheckMax设定,默认15次。本地事务回查的间隔通过参数transactionCheckInterval
设定,默认60秒。超过回查次数后,消息将会被丢弃。
设定,默认60秒。超过回查次数后,消息将会被丢弃。
3、其实,了解了事务消息的机制后,在具体执⾏时,可以对事务流程进⾏适当的调整。
ACL权限控制
应用场景
RocketMQ提供了针对队列、⽤户等不同维度的⾮常全⾯的权限管理机制。通常来说,RocketMQ作为⼀个内部服务,是不需要进⾏权限控制的,但是,如果要通过RocketMQ进⾏跨部⻔甚⾄跨公司的合作,权限控制的重要性就显现出来了。
1、RocketMQ针对每个Topic,就有完整的权限控制。⽐如,在控制平台中,就可以很⽅便的给每个Topic配置权限。
perm字段表示Topic的权限。有三个可选项。 2:禁写禁订阅,4:可订阅,不能写,6:可写可订阅
2、在Broker端还提供了更详细的权限控制机制。主要是在broker.conf中打开acl的标志:aclEnable=true。然后就可以⽤他提供的plain_acl.yml来进⾏权限配置了。并且这个配置⽂件是热加载的,也就是说要修改配置时,只要修改配置⽂件就可以了,不⽤重启Broker服务。⽂件的配置⽅式,也⾮常简单,⼀⽬了然。
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-acl</artifactId>
<version>4.9.1</version>
</dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-acl</artifactId>
<version>4.9.1</version>
</dependency>
springboot整合rocketmq
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.3.1</version>
<exclusions>
<exclusion>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>5.3.0</version>
</dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.3.1</version>
<exclusions>
<exclusion>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>5.3.0</version>
</dependency>
配置文件中
rocketmq.name-server=192.168.65.112:9876
rocketmq.producer.group=springBootGroup
rocketmq.producer.group=springBootGroup
生产者声明
@Resource
private RocketMQTemplate rocketMQTemplate;
public void sendMessage(String topic,String msg){
this.rocketMQTemplate.convertAndSend(topic,msg);
}
private RocketMQTemplate rocketMQTemplate;
public void sendMessage(String topic,String msg){
this.rocketMQTemplate.convertAndSend(topic,msg);
}
消费者声明
@RocketMQMessageListener
⼀个RocketMQTemplate实例只能包含⼀个⽣产者,也就只能往⼀个Topic下发送消息。如果需要往另外⼀个Topic下发送消息,就需要通过
@ExtRocketMQTemplateConfiguration()注解另外声明⼀个⼦类实例。
@ExtRocketMQTemplateConfiguration()注解另外声明⼀个⼦类实例。
对于事务消息机制,最关键的事务监听器需要通过@RocketMQTransactionListener注解注⼊到Spring容器当中。在这个注解当中可以通过
rocketMQTemplateBeanName属性,指向具体的RocketMQTemplate⼦类。
rocketMQTemplateBeanName属性,指向具体的RocketMQTemplate⼦类。
注意事项
offsetid是RocketMQ内部给每条消息分配的唯⼀索引
Producer发送的Message对象是没有offsetidoffsetid属性的。Broker端接收到Producer发过来的消息后,会给每条消息单独分配⼀个唯⼀的offsetid。
实际上,在RocketMQ内部,也会针对批量消息、事务消息等特殊的消息机制,有特殊的msgId分配机制。因此,在复杂业务场景下,不建议使⽤msgId来作为消息的唯⼀索引,⽽建议采⽤下⾯的key属性,⾃⾏指定业务层⾯上的唯⼀索引。
key是Message中的补充信息
针对key这⼀个属性,建议在业务中可以添加⼀些带有业务唯⼀性的数据,作为MessageId的补充。RocketMQ基于Keys属性,实现了消息溯源、消息压缩等⼀系列功能
通过Tag进⾏消息过滤性能⾮常⾼
tag属性也是Producer发送的Message对象的固有属性。其作⽤主要是⽤来进⾏消息过滤。实际上,RocketMQ的服务端会把消息的Tag信息以
某种形式(hashCode)写⼊到检索消息的ConsumeQueue索引中。这样当Consumer消费消息时,就可以通过过滤ConsumeQueue索引中的Tag
属性,快速找到⾃⼰感兴趣的消息。
某种形式(hashCode)写⼊到检索消息的ConsumeQueue索引中。这样当Consumer消费消息时,就可以通过过滤ConsumeQueue索引中的Tag
属性,快速找到⾃⼰感兴趣的消息。
Kafka的⼀⼤问题是Topic过多,会造成Partition⽂件过多,影响性能。⽽RocketMQ中的Topic完全不会对消息转发性能有影响。但是Topic
过多,还是会加⼤RocketMQ的元数据维护的性能消耗。所以,在使⽤时,还是需要对Topic进⾏合理的分配。
过多,还是会加⼤RocketMQ的元数据维护的性能消耗。所以,在使⽤时,还是需要对Topic进⾏合理的分配。
消费者端进⾏幂等控制
at most once 最多⼀次:每条消息最多只会被消费⼀次
at most once是最好保证的。RocketMQ中可以直接⽤异步发送、sendOneWay等⽅式就可以保证。
at least once ⾄少⼀次:每条消息⾄少会被消费⼀次
at least once这个语义,RocketMQ也有同步发送、事务消息等很多⽅式能够保证
exactly once 刚刚好⼀次:每条消息都只会确定的消费⼀次
RocketMQ只能保证at least once,保证不了
exactly once。所以,使⽤RocketMQ时,需要由业务系统⾃⾏保证消息的幂等性。
exactly once。所以,使⽤RocketMQ时,需要由业务系统⾃⾏保证消息的幂等性。
消息幂等的必要性
消息队列 RocketMQ 的消息有可能会出现重复,这个重复简单可以概括为以下情况
发送时消息重复
当⼀条消息已被成功发送到服务端并完成持久化,此时出现了⽹络闪断或者客户端宕机,导致服务端对客户端应答失败。 如果此时⽣产者
意识到消息发送失败并尝试再次发送消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。
意识到消息发送失败并尝试再次发送消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。
投递时消息重复
消息消费的场景下,消息已投递到消费者并完成业务处理,当客户端给服务端反馈应答的时候⽹络闪断。 为了保证消息⾄少被消费⼀次,
消息队列 RocketMQ 的服务端将在⽹络恢复后再次尝试投递之前已被处理过的消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。
消息队列 RocketMQ 的服务端将在⽹络恢复后再次尝试投递之前已被处理过的消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。
负载均衡时消息重复(包括但不限于⽹络抖动、Broker 重启以及订阅⽅应⽤重启)
当消息队列 RocketMQ 的 Broker 或客户端重启、扩容或缩容时,会触发 Rebalance,此时消费者可能会收到重复消息。
要在业务上⾃⾏来保证消息消费的幂等性
RocketMQ的每条消息都有⼀个唯⼀的MessageId,这个参数在多次投递的过程中是不会改变的,所以业务上可以⽤这个
MessageId来作为判断幂等的关键依据。
但是,这个MessageId是⽆法保证全局唯⼀的,也会有冲突的情况。所以在⼀些对幂等性要求严格的场景,最好是使⽤业务上唯⼀的⼀个标识
⽐较靠谱。例如订单ID。⽽这个业务标识可以使⽤Message的Key来进⾏传递。
MessageId来作为判断幂等的关键依据。
但是,这个MessageId是⽆法保证全局唯⼀的,也会有冲突的情况。所以在⼀些对幂等性要求严格的场景,最好是使⽤业务上唯⼀的⼀个标识
⽐较靠谱。例如订单ID。⽽这个业务标识可以使⽤Message的Key来进⾏传递。
关注错误消息重试
在重试时,RocketMQ实际上会为每个消费
者组创建⼀个对应的重试队列。重试的消息会进⼊⼀个 “%RETRY%”+ConsumeGroup 的队列中。
者组创建⼀个对应的重试队列。重试的消息会进⼊⼀个 “%RETRY%”+ConsumeGroup 的队列中。
多关注重试队列,可以及时了解消费者端的运⾏情况。这个队列中出现了⼤量的消息,就意味着消费者的运⾏出现了问题,要及时跟踪进⾏⼲预。
消息重试16次后仍然失败,消息将不再投递。转为进⼊死信队列。
然后关于这个重试次数,RocketMQ可以进⾏定制。例如通过consumer.setMaxReconsumeTimes(20);将重试次数设定为20次。当定制的重试
次数超过16次后,消息的重试时间间隔均为2⼩时。
然后关于这个重试次数,RocketMQ可以进⾏定制。例如通过consumer.setMaxReconsumeTimes(20);将重试次数设定为20次。当定制的重试
次数超过16次后,消息的重试时间间隔均为2⼩时。
消息最⼤重试次数的设置对相同GroupID下的所有Consumer实例有效。并且最后启动的Consumer会覆盖之前启动的Consumer的配置
⼿动处理死信队列
当⼀条消息消费失败,RocketMQ就会⾃动进⾏消息重试。⽽如果消息超过最⼤重试次数,RocketMQ就会认为这个消息有问题。但是此时,
RocketMQ不会⽴刻将这个有问题的消息丢弃,⽽会将其发送到这个消费者组对应的⼀种特殊队列:死信队列。
RocketMQ不会⽴刻将这个有问题的消息丢弃,⽽会将其发送到这个消费者组对应的⼀种特殊队列:死信队列。
死信队列的名称是%DLQ%+ConsumGroup
条消息进⼊了死信队列,意味着消息在消费处理的过程中出现了⽐较严重的错误,并且⽆法⾃⾏恢复。此时,⼀般需要⼈⼯去查看死信队列中的消息,对错误原因进⾏排查。
死信队列的特征:
⼀个死信队列对应⼀个ConsumGroup,⽽不是对应某个消费者实例。
如果⼀个ConsumeGroup没有产⽣死信队列,RocketMQ就不会为其创建相应的死信队列。
⼀个死信队列包含了这个ConsumeGroup⾥的所有死信消息,⽽不区分该消息属于哪个Topic
死信队列中的消息不会再被消费者正常消费
死信队列的有效期跟正常消息相同。默认3天,对应broker.conf中的fileReservedTime属性。超过这个最⻓时间的消息都会被删除,⽽不管消息是否消费过。
默认创建出来的死信队列,他⾥⾯的消息是⽆法读取的,在控制台和消费者中都⽆法读取。这是因为这些默认的死信队列,他们的权限perm被设置成了2:禁读(这个权限有三种 2:禁读,4:禁写,6:可读可写)。需要⼿动将死信队列的权限配置成6,才能被消费
核心源码解读与原理
nameServer整体架构
参考图
broker
Broker是整个RocketMQ的业务核⼼。所有消息存储、转发这些重要的业务都是Broker进⾏处理。
整体架构参考图
如何处理response类型的请求?
NettyServer处理完request请求后,会先缓存到responseTable中,等NettyClient下次发送response类型的请求,再来获取。这样就不⽤阻塞Channel,提升请求的吞吐量。优雅的⽀持了异步请求。
RPC整体架构图
Broker⼼跳注册管理
Broker启动后会⽴即发起向NameServer注册⼼跳。⽅法⼊⼝:BrokerController.this.registerBrokerAll。 然后启动⼀个定时任务,以10秒延迟,默认30秒的间隔持续向NameServer发送⼼跳。
参考图
Producer发送消息过程
核心启动流程参考图
发送消息的核心流程
参考图
1、发送消息时,会维护⼀个本地的topicPublishInfoTable缓存,DefaultMQProducer会尽量保证这个缓存数据是最新的。但是,如果NameServer挂了,那么DefaultMQProducer还是会基于这个本地缓存去找Broker。只要能找到Broker,还是可以正常发送消息到Broker的。 --可以在⽣产者示例中,start后打⼀个断点,然后把NameServer停掉,这时,Producer还是可以发送消息的。
1、发送消息时,会维护⼀个本地的topicPublishInfoTable缓存,DefaultMQProducer会尽量保证这个缓存数据是最新的。但是,如果NameServer挂了,那么
DefaultMQProducer还是会基于这个本地缓存去找Broker。只要能找到Broker,还是可以正常发送消息到Broker的。 --可以在⽣产者示例中,start后打⼀个断
点,然后把NameServer停掉,这时,Producer还是可以发送消息的。
DefaultMQProducer还是会基于这个本地缓存去找Broker。只要能找到Broker,还是可以正常发送消息到Broker的。 --可以在⽣产者示例中,start后打⼀个断
点,然后把NameServer停掉,这时,Producer还是可以发送消息的。
3、如果在发送消息时传了Selector,那么Producer就不会⾛这个负载均衡的逻辑,⽽是会使⽤Selector去寻找⼀个队列。
Consumer拉取消息过程
关注重点
消费者组之间有集群模式和⼴播模式两种消费模式。我们就要了解下这两种集群模式是如何做的逻辑封装
然后我们关注下消费者端的负载均衡的原理。即消费者是如何绑定消费队列的,哪些消费策略到底是如何落地的
最后我们来关注下在推模式的消费者中,MessageListenerConcurrently 和MessageListenerOrderly这两种消息监听器的处理逻辑到底有什么不同,为什
么后者能保持消息顺序。
么后者能保持消息顺序。
启动过程参考图
这个AllocateMessageQueueStrategy就是⽤来给Consumer和MessageQueue之间建⽴⼀种对应关系的。也就是说,只要Topic当中的MessageQueue以及同⼀个ConsumerGroup中的Consumer实例都没有变动,那么某⼀个Consumer实例只是消费固定的⼀个或多个MessageQueue上的消息,其他Consumer不会来抢这个Consumer对应的MessageQueue。
因为Broker需要按照ConsumerGroup管理每个MessageQueue上的Offset,如果⼀个MessageQueue上有多个同属⼀个ConsumerGroup的Consumer实例,他们的处理进度就会不⼀样。这样的话,Offset就乱套了。
消息持久化设计
RocketMQ消息直接采⽤磁盘⽂件保存消息,默认路径在${user_home}/store⽬录。这些存储⽬录可以在broker.conf中⾃⾏指定。
存储⽂件主要分为三个部分
CommitLog:存储消息的元数据。所有消息都会顺序存⼊到CommitLog⽂件当中。CommitLog由多个⽂件组成,每个⽂件固定⼤⼩1G。以第⼀条消息的偏移量为⽂件名。
ConsumerQueue:存储消息在CommitLog的索引。⼀个MessageQueue⼀个⽂件,记录当前MessageQueue被哪些消费者组消费到了哪⼀条CommitLog。
IndexFile:为了消息查询提供了⼀种通过key或时间区间来查询消息的⽅法,这种通过IndexFile来查找消息的⽅法不影响发送与消费消息的主流程
⼏个辅助的存储⽂件,主要记录⼀些描述消息的元数据
checkpoint:数据存盘检查点。⾥⾯主要记录commitlog⽂件、ConsumeQueue⽂件以及IndexFile⽂件最后⼀次刷盘的时间戳。
config/*.json:这些⽂件是将RocketMQ的⼀些关键配置信息进⾏存盘保存。例如Topic配置、消费者组配置、消费者组消息偏移量Offset 等等⼀些信息。
abort:这个⽂件是RocketMQ⽤来判断程序是否正常关闭的⼀个标识⽂件。正常情况下,会在启动时创建,⽽关闭服务时删除。但是如果遇到⼀些服务器宕机,或者kill -9这样⼀些⾮正常关闭服务的情况,这个abort⽂件就不会删除,因此RocketMQ就可以判断上⼀次服务是⾮正常关闭的,后续就
会做⼀些数据恢复的操作。
会做⼀些数据恢复的操作。
参考图
commitLog写⼊
消息入口方法
消息存储的⼊⼝在: DefaultMessageStore.asyncPutMessage⽅法
在进⾏消息写⼊前,如何进⾏的加锁?
加锁时,先通过topicQueueLock锁对列,因为数据是以MessageQueue位单位传⼊进来的,锁对列可以保证同⼀个MessageQueue的数据是按顺序写⼊的。
然后通过putMessageLock锁写⼊操作。保证同⼀时刻,只有⼀个线程在写⼊消息。
然后通过putMessageLock锁写⼊操作。保证同⼀时刻,只有⼀个线程在写⼊消息。
putMessageLock可以根据配置信息选择是SpingLock⾃旋锁还是ReentrantLock可重⼊锁。⾃旋锁就是⼀直尝试CAS直到拿到锁。ReentrantLock做⼀次CAS,拿不到就休眠,直到前⾯线程unlock的时候唤醒,继续竞争锁(⾮公平) 。两者的区别在于如果写⼊的消息⾮常多,竞争⾮常激烈,适合⽤ReentrantLock,减少CPU空转。竞争没有那么激烈,则适合⽤⾃旋锁,得到锁的速度更快。
CommitLog⽂件是按照顺序写的⽅式写⼊的。
这个mappedFile是RocketMQ⾃⼰实现的⼀个DefaultMappedFIle。 appendMessage⽅法就是关于顺序写,在appendMessage⽅法中指定byteBuffer的写⼊位置,只从⽂件最后开始写⼊
消息写⼊后如何刷盘
同步刷盘SYNC_FLUSH
同步刷盘通过GroupCommitService完成,同步刷盘的流程
RocketMQ的同步刷盘在后台任务中同样是要休眠的,意味着,消息写⼊PageCache缓存再到写⼊磁盘,这中间依然是会有时间差的。这意味着同样会有断电丢失的可能
异步刷盘ASYNC_FLUSH
就是休眠⼀段时间,⼲⼀次活。休眠间隔由配置⽂件指定。
默认200毫秒
过期⽂件删除机制
cleanCommitLogService⽤来删除过期的CommitLog⽂件,cleanConsumeQueueService⽤来删除过期的ConsumeQueue和IndexFile⽂件。
默认情况下删除3天前的数据
然后在删除ConsumeQueue和IndexFile⽂件时,会去检查CommitLog当前的最⼩Offset,然后在删除时进⾏对⻬。
RocketMQ在删除过期CommitLog⽂件时,并不检查消息是否被消费过。 所以如果有消息⻓期没有被消费,是有可能直接被删除掉,造成消息丢失的。
RocketMQ整个⽂件管理参考图
⽂件索引结构
CommitLog⽂件的⼤⼩是固定的,但是其中存储的每个消息单元⻓度是不固定的,具体格式可以参考org.apache.rocketmq.store.CommitLog中计算消息⻓度的⽅法
正因为消息的记录⼤⼩不固定,所以RocketMQ在每次存CommitLog⽂件时,都会去检查当前CommitLog⽂件空间是否⾜够,如果不够的话,就重新创建⼀个
CommitLog⽂件。⽂件名为当前消息的偏移量。
CommitLog⽂件。⽂件名为当前消息的偏移量。
ConsumeQueue⽂件主要是加速消费者的消息索引。他的每个⽂件夹对应RocketMQ中的⼀个MessageQueue,⽂件夹下的⽂件记录了每个MessageQueue中的消息在CommitLog⽂件当中的偏移量。这样,消费者通过ComsumeQueue⽂件,就可以快速找到CommitLog⽂件中感兴趣的消息记录。
⽽消费者在ConsumeQueue⽂件当中的消费进度,会保存在config/consumerOffset.json⽂件当中。
⽽消费者在ConsumeQueue⽂件当中的消费进度,会保存在config/consumerOffset.json⽂件当中。
⽂件结构: 每个ConsumeQueue⽂件固定由30万个固定⼤⼩20byte的数据块组成,数据块的内容包括:msgPhyOffset(8byte,消息在⽂件中的起始位置)+msgSize(4byte,消息在⽂件中占⽤的⻓度)+msgTagCode(8byte,消息的tag的Hash值)。
msgTag是和消息索引放在⼀起的,所以,消费者根据Tag过滤消息的性能是⾮常⾼的。
msgTag是和消息索引放在⼀起的,所以,消费者根据Tag过滤消息的性能是⾮常⾼的。
IndexFile⽂件主要是辅助消息检索。他的作⽤主要是⽤来⽀持根据key和timestamp检索消息。他的⽂件名⽐较特殊,不是以消息偏移量命名,⽽是⽤的时间命名。但是其实,他也是⼀个固定⼤⼩的⽂件。
⽂件结构: 他的⽂件结构由 indexHeader(固定40byte)+ slot(固定500W个,每个固定20byte) + index(最多500W*4个,每个固定20byte) 三个部分组成。
延迟消息机制
固定级别的延迟消息
然后Broker中会创建⼀个默认的Schedule_Topic主题,这个主题下有18个队列,对应18个延迟级别。消息发过来之后,会先把消息存⼊Schedule_Topic主题中对应的队列。然后等延迟时间到了,再转发到⽬标队列,推送给消费
者进⾏消费。
者进⾏消费。
固定延迟级别的消息,处理后转储回业务指定的Topic
延迟时间到了后,再将消息转储回到Producer提交的业务Topic和Queue中,这样就可以正常被消费者消费了。
通过时间轮算法实现的自定义延迟消息
指定时间点的延迟消息,转移到rmq_sys_wheel_timer Topic中,对列固定为0。
transformTimerMessage⽅法会对指定时间点的延迟消息进⾏检查。如果还没有到指定的时间点,同样修改消息的⽬标Topic和队列,接下来Broker就会将消息转移到rmq_sys_wheel_timer Topic中,对列固定为0
参考图
核心
数据按照预设的过期时间,放到对应的slot上(时钟表上的每个秒钟刻度)。 如果数据的延迟时间超过了时间轮的最⼤数据数,就会在slot上记录⼀个轮次(钟表上当前的第1秒和第⼆天的1秒,指向同⼀个刻度,但是数据上记录⼀个轮次,就能区分天了)
时间轮上设置⼀个指针变量(钟表上的秒钟),指针会按固定时间前进(秒钟每秒前移⼀格)。指针指向的Slot(秒钟指向的刻度),就是当前已经到期的数据(当然,如果对应slot上的轮次>1那就没有到期,只要将数据上的轮次-1就可以了)。
时间轮算法实现流程图
TimerWheel整体是⼀个数组,⼯作原理可以理解为⼀个时钟盘。盘上的每个刻度是⼀个slot。每个slot记录⼀条数据的索引。所有具体的消息数据都是放到⼀个LocalBuffer缓存数组中的。每个Slot就描述⼀条或多条LocalBuffer上的具体消息数据。
整个时间轮默认slot的个数:slotsTotal={TIMER_WHEEL_TTL_DAY}x{DAY_SECS}(7 x 86400) 即七天的秒数,时间精度timerPrecisionMs=1000,也就
是⼀秒。即每个slot⾥会保存1秒的消息索引。也就是说RocketMQ的延迟消息时间精度是1秒(实际上API上的延迟时间是可以设置到毫秒,但是具体执⾏时,精度只能到1秒)
是⼀秒。即每个slot⾥会保存1秒的消息索引。也就是说RocketMQ的延迟消息时间精度是1秒(实际上API上的延迟时间是可以设置到毫秒,但是具体执⾏时,精度只能到1秒)
在TimerMessageStore中有两个变量currReadTimeMs 和 currReadTimeMs。 这两个指针就类似于时钟上的指针。其中,currWriteTimeMs指向当前正在写⼊数据的slot。 ⽽currReadTimeMs指向当前正在读取数据的slot。这两个变量不断往前变化,就可以像时钟的指针⼀样依次读取每⼀秒上的数据。这时候读到的slot是可以表示当前这⼀秒的数据 ,还有 时间轮转过多轮后的数据。
整个延迟消息实现流程图
⻓轮询机制
⻓轮询机制简单来说,就是当Broker接收到Consumer的Pull请求时,判断如果没有对应的消息,不⽤直接给Consumer响应(给响应也是个空的,没意义),⽽是就将这个Pull请求给缓存起来。当Producer发送消息过来时,增加⼀个步骤去检查是否有对应的已缓存的Pull请求,如果有,就及时将请求从缓存中拉取出来,并将消息通知给Consumer。
为了解决什么问题?
RocketMQ对消息消费者提供了Push推模式和Pull拉模式两种消费模式。但是这两种消费模式的本质其实都是Pull拉模式,Push模式可以认为是⼀种定时的Pull机制。但是这时有⼀个问题,当使⽤Push模式时,如果RocketMQ中没有对应的数据,那难道⼀直进⾏空轮询吗?如果是这样的话,那显然会极⼤的浪费⽹络带宽以及服务器的性能,并且,当有新的消息进来时,RocketMQ也没有办法尽快通知客户端,⽽只能等客户端下⼀次来拉取消息了
参考图
零拷⻉与顺序写
顺序写加速⽂件写⼊磁盘
刷盘机制保证消息不丢失
在操作系统层⾯,当应⽤程序写⼊⼀个⽂件时,⽂件内容并不会直接写⼊到硬件当中,⽽是会先写⼊到操作系统中的⼀个缓存PageCache中。PageCache缓存以4K⼤⼩为单位,缓存⽂件的具体内容。
PageCache是源源不断产⽣的,⽽Linux操作系统显然不可能时时刻刻往硬盘写⽂件。所以,操作系统只会在某些特定的时刻将PageCache写⼊到磁盘。例如当我们正常关机时,就会完成PageCache刷盘。另外,在Linux中,对于有数据修改的PageCache,会标记为Dirty(脏⻚)状态。当Dirty Page的⽐例达到⼀定的阈值时,就会触发⼀次刷盘操作。例如在Linux操作系统中,可以通过/proc/meminfo⽂件查看到Page Cache的状态。
零拷⻉加速⽂件读写
零拷⻉(zero-copy)是操作系统层⾯提供的⼀种加速⽂件读写的操作机制,⾮常多的开源软件都在⼤量使⽤零拷⻉,来提升IO操作的性能。对于Java应⽤层,对应着mmap和sendFile两种⽅式。
cpu拷贝
⽤户态的应⽤程序⽆法直接操作硬件,需要通过内核空间进⾏操作转换,才能真正操作硬件。
这其实是为了保护操作系统的安全。正因为如此,应⽤程序需要与⽹卡、磁盘等硬件进⾏数据交互时,就需要在⽤户态和内核态之间来回的复制数据。⽽这些
操作,原本都是需要由CPU来进⾏任务的分配、调度等管理步骤的,早先这些IO接⼝都是由CPU独⽴负责,所以当发⽣⼤规模的数据读写操作时,CPU的占⽤率会⾮常⾼。
这其实是为了保护操作系统的安全。正因为如此,应⽤程序需要与⽹卡、磁盘等硬件进⾏数据交互时,就需要在⽤户态和内核态之间来回的复制数据。⽽这些
操作,原本都是需要由CPU来进⾏任务的分配、调度等管理步骤的,早先这些IO接⼝都是由CPU独⽴负责,所以当发⽣⼤规模的数据读写操作时,CPU的占⽤率会⾮常⾼。
DMA拷⻉
操作系统为了避免CPU完全被各种IO调⽤给占⽤,引⼊了DMA(直接存储器存储)。由DMA来负责这些频繁的IO操作。DMA是⼀套独⽴的指令集,不会占⽤CPU的计算资源。这样,CPU就不需要参与具体的数据复制的⼯作,只需要管理DMA的权限即可。
优点
DMA拷⻉极⼤的释放了CPU的性能,因此他的拷⻉速度会⽐CPU拷⻉要快很多。
缺点
当系统内的IO操作过多时,还是会占⽤过多的数据总线,造成总线冲突,最终还是会影响数据读写性能。
优化
Channel,是⼀个完全独⽴的处理器,专⻔负责IO操作。既然是处理器,Channel就有⾃⼰的IO指令,与CPU⽆关,他也更适合⼤型的IO操作,性能更⾼。
mmap⽂件映射机制
就是在⽤户态不再保存⽂件的内容,⽽只保存⽂件的映射,包括⽂件的内存起始地址,⽂件⼤⼩等。真实的数据,也不需要在⽤户态留存,可以直接通过操作映射,在内核态完成数据复制。
mmap机制适合操作⼩⽂件,如果⽂件太⼤,映射信息也会过⼤,容易造成很多问题。通常mmap机制建议的映射⽂件⼤⼩不要超过2G 。⽽RocketMQ做⼤的CommitLog⽂件保持在1G固定⼤⼩,也是为了⽅便⽂件映射。
sendFile机制
早期的sendfile实现机制其实还是依靠CPU进⾏⻚缓存与socket缓存区之间的数据拷⻉。
优化
并不直接拷⻉⽂件的内容,⽽是只拷⻉⼀个带有⽂件位置和⻓度等信息的⽂件描述符FD,这样就⼤⼤减少了需要传递的数据。⽽真实的数据内容,会交由DMA控制器,从⻚缓存中打包异步发送到socket中。
sendfile机制⾮常适合⼤数据的复制转移。
mmap需要⽤户态的配合,所以,性能相⽐sendfile要差⼀点。但是,另⼀⽅⾯,mmap机制可以在⽤户态操作数据,所以mmap对数据的处理,相⽐sendfile更灵活。
高级集群特性
弱一致性算法
DNS系统、Gossip协议(使⽤场景:Fabric区块链,Cassandra,RedisCluster,Consul)
强一致性算法
Basic-Paxos、Multi-Paxos包括Raft系列(Nacos的JRaft,Kafka的Kraft以及RocketMQ的
Dledger)、ZAB(Zookeeper)
Dledger)、ZAB(Zookeeper)
RAFT算法
主要解决问题:⼀个是集群中选举产⽣主节点。另⼀个是在集群内进⾏数据同步
基本工作流程
1、多个Server基于他的⼀致性协议,会共同选举产⽣⼀个 Leader,负责响应客户端的请求
2、Leader 通过⼀致性协议,将客户端的指令转发到集群所有节点上。
3、每个节点将客户端的指令以 Entry 的形式保存到⾃⼰的 Log ⽇志当中。此时 Entry 是uncommited状态。
4、当有多数节点共同保存了 Entry 后,就可以执⾏ Entry 中的客户端执⾏,提交到State Machine 状态机中。
此时 Entry 更新为commited状态。
此时 Entry 更新为commited状态。
Raft协议给每个节点设定了三种不同的状态,Leader,Follower和Candidate。
leader
Leader : 1、选举产⽣。多数派决定。2、向 Follower 节点发送⼼跳,Follower 收到⼼跳就不会竞选
Leader。3、响应客户端请求。集群内所有的数据变化都从 Leader 开始。4、向 Follower 同步操作⽇
志。 具体实现时,有的产品会让发到 Follower 上的请求转发到 Leader 上去。 也有的直接拒绝
Leader。3、响应客户端请求。集群内所有的数据变化都从 Leader 开始。4、向 Follower 同步操作⽇
志。 具体实现时,有的产品会让发到 Follower 上的请求转发到 Leader 上去。 也有的直接拒绝
follower
Follower:1、参与选举投票。2、同步 Leader 上的数据。3、接收 leader 的⼼跳。如果 leader⻓期
没有发送⼼跳,就转为 Candidate,竞选 Leader。
没有发送⼼跳,就转为 Candidate,竞选 Leader。
candidate
Candidate:没有 Leader 时,发起投票,竞选 Leader
节点状态变化
1、所有节点启动时都从 Follower 状态开始。
2、每个Follower 设定了⼀个选举过期时间Election Timeout 。Follower持续等待 Leader 的⼼跳请求。如果超
过选举过期时间,就转为 Candidate,向其他节点发起投票,竞选 Leader。为了防⽌所有节点在同⼀时间过
期,这个选举过期时间通常会设定为⼀个随机值,⼀般在 150ms到 300ms之间。
过选举过期时间,就转为 Candidate,向其他节点发起投票,竞选 Leader。为了防⽌所有节点在同⼀时间过
期,这个选举过期时间通常会设定为⼀个随机值,⼀般在 150ms到 300ms之间。
3、Candidate 开始新⼀个任期的选举。每个 Candidate 会投⾃⼰⼀票,然后向其他节点发起投票 RPC 请求。
然后等待其他节点返回投票结果。等待时⻓也是Election Timeout。
然后等待其他节点返回投票结果。等待时⻓也是Election Timeout。
4、每个节点在每⼀个任期内有⼀次投票的资格。他们会响应 Candidate 的投票 RPC 请求。按照⼀定的规则进
⾏投票。返回⽀持 或者 不⽀持。
⾏投票。返回⽀持 或者 不⽀持。
5、Candidate 收到其他节点的投票 RPC 响应之后,会重置他的 Election Timeout,继续等待其他响应。⼀旦
某⼀个 Candidate 接收到了超过集群⼀半节点的投票同意结果后,就会转为 Leader 节点。并开始向其他节点
发送⼼跳 RPC 请求。确认⾃⼰的 Leader 地位。
某⼀个 Candidate 接收到了超过集群⼀半节点的投票同意结果后,就会转为 Leader 节点。并开始向其他节点
发送⼼跳 RPC 请求。确认⾃⼰的 Leader 地位。
6、其他节点接收到 Leader 的⼼跳后,就会乖乖的转为 Follower 状态。 Candidate 也会转为 Follower 。然后
等待从 Leader 同步⽇志。直到 Leader 节点⼼跳超时或者服务宕机,再触发下⼀轮选举,进⼊下⼀个 Term任
期。
等待从 Leader 同步⽇志。直到 Leader 节点⼼跳超时或者服务宕机,再触发下⼀轮选举,进⼊下⼀个 Term任
期。
常见问题整理与解决方案
MQ如何保证消息不丢失
1.⽣产者发送消息如何保证不丢失
rocketmq
1.生产者客户端提供了3种不同的发送消息的方法
sendOneway
异步发送,不需要Broker确认,没有返回值,效率很⾼,但是会有丢消息的可能
send
同步发送,⽣产者等待Broker的确认。消息最安全,但是效率很低。通过返回状态确认是否写入磁盘发送成功
SendCallback()
异步发送,⽣产者另起⼀个线程等待Broker确认,收到Broker确认后直接触发回调⽅法。消息安全和效率之间⽐较均
衡,但是会加⼤客户端的负担。
衡,但是会加⼤客户端的负担。
事务消息机制
其核⼼思想,还是通过Broker主动触发⽣产者的回调⽅法,从⽽确认消息是否成功发送到了Broker。只不过,这⾥将⼀次确认变成了多次确认。
kafka
send
Future<RecordMetadata> future = producer.send(record)
直接send发送消息,返回的是⼀个Future。这就相当于是异步调⽤
调⽤future的get⽅法才会实际获取到发送的结果。⽣产者收到这个结果后,就可以知道消息是否成功发到broker了。
这个过程就变成了⼀个同步的过程
这个过程就变成了⼀个同步的过程
rabbitmq
Publisher Confirms⽣产者确认机制
ch.addConfirmListener(ConfirmCallback ackCallback, ConfirmCallback nackCallback)
添加两个回调,⼀个处理ack响应,⼀个处理nack响应
2.broker端如何保证消息不丢失
rocketmq
刷盘机制
SYNC_FLUSH
通过读写双队列每隔10ms从读队列里读出数据写到pagecache里然后刷盘,读队列里的数据又是由写队列取出来刷到读队列里
ASYNC_FLUSH
每隔200ms刷盘
kafka
并没有明显的同步刷盘和异步刷盘的区别,不过他暴露了⼀系列的参数,可以管理刷盘的频率。
flush.ms : 多⻓时间进⾏⼀次强制刷盘。
log.flush.interval.messages:表示当同⼀个Partiton的消息条数积累到这个数量时,就会申请⼀次刷盘操作。默
认是Long.MAX。
认是Long.MAX。
log.flush.interval.ms:当⼀个消息在内存中保留的时间,达到这个数量时,就会申请⼀次刷盘操作。他的默认值是空。如果这个参数配置为空,则⽣效的是下⼀个参数
log.flush.scheduler.interval.ms:检查是否有⽇志⽂件需要进⾏刷盘的频率。默认也是Long.MAX。
rabbitmq
那就是对于Classic经典对列,即便声明成了持久化对
列,RabbitMQ的服务端也不会实时调⽤fsync,因此⽆法保证服务端消息断电不丢失。对于Stream流式对列,
则更加直接,RabbitMQ明确不会主动调⽤fsync进⾏刷盘,⽽是交由操作系统⾃⾏刷盘
列,RabbitMQ的服务端也不会实时调⽤fsync,因此⽆法保证服务端消息断电不丢失。对于Stream流式对列,
则更加直接,RabbitMQ明确不会主动调⽤fsync进⾏刷盘,⽽是交由操作系统⾃⾏刷盘
3.Broker主从同步如何保证不丢失
普通集群
这种集群机制下,消息的安全性还是⽐较⾼的。但是有⼀种极端的情况需要考虑。因为消息需要从Master往
Slave同步,这个过程是跨⽹络的,因此也是有时间延迟的。所以,如果Master出现⾮正常崩溃,那么就有可
能有⼀部分数据是已经写⼊到了Master但是还来得及同步到Slave。这⼀部分未来得及同步的数据,在
RocketMQ的这种集群机制下,就会⼀直记录在Master节点上。等到Master重启后,就可以继续同步了。另外
由于Slave并不会主动切换成Master,所以Master服务崩溃后,也不会有新的消息写进来,因此也不会有消息
冲突的问题。所以,只要Mater的磁盘没有坏,那么在这种普通集群下,主从同步通常不会造成消息丢失
Slave同步,这个过程是跨⽹络的,因此也是有时间延迟的。所以,如果Master出现⾮正常崩溃,那么就有可
能有⼀部分数据是已经写⼊到了Master但是还来得及同步到Slave。这⼀部分未来得及同步的数据,在
RocketMQ的这种集群机制下,就会⼀直记录在Master节点上。等到Master重启后,就可以继续同步了。另外
由于Slave并不会主动切换成Master,所以Master服务崩溃后,也不会有新的消息写进来,因此也不会有消息
冲突的问题。所以,只要Mater的磁盘没有坏,那么在这种普通集群下,主从同步通常不会造成消息丢失
Dledger⾼可⽤集群
基于Raft协议的Dledger来保存CommitLog消息⽇志。也就是说他的消息会通过Dledger的Raft协议,在主从节点之间同步。
他是⼀种基于两阶段的多数派同意机制。每个节点会将客户端的治指令以Entry的形式保存到⾃⼰的Log⽇志当中。此时Entry是uncommited状态。当有多数节点统统保存了Entry
后,就可以执⾏Entry中的客户端指令,提交到StateMachine状态机中。此时Entry更新为commited状态。
后,就可以执⾏Entry中的客户端指令,提交到StateMachine状态机中。此时Entry更新为commited状态。
优先保证的是集群内的数据⼀致性,⽽并不是保证不丢失
kafka集群
在Kafka集群中,如果Leader Partition的服务崩溃了,那么,那些Follower Partition就会选举产⽣⼀个新的Leadr Partition。⽽集群中所有的消息,都以Leader Partition的为
准。即便旧的Leader Partition重启了,也是作为Follower Partition启动,主动删除掉⾃⼰的HighWater之后的
数据,然后从新的Leader Partition上重新同步消息。这样,就会造成那些已经写⼊旧的Leader Partition但是
还没来得及同步的消息,就彻底丢失了。
准。即便旧的Leader Partition重启了,也是作为Follower Partition启动,主动删除掉⾃⼰的HighWater之后的
数据,然后从新的Leader Partition上重新同步消息。这样,就会造成那些已经写⼊旧的Leader Partition但是
还没来得及同步的消息,就彻底丢失了。
4.消费者消费消息如何不丢失
⼏乎所有的MQ产品都设置了消费状态确认机制。也就是消费者处理完消息后,需要给Broker⼀个响应,表示
消息被正常处理了。如果Broker端没有拿到这个响应,不管是因为Consumer没有拿到消息,还是Consumer
处理完消息后没有给出相应,Broker都会认为消息没有处理成功。之后,Broker就会向Consumer重复投递这
些没有处理成功的消息。RocketMQ和Kafka是根据Offset机制重新投递,⽽RabbitMQ的Classic Queue经典
对列,则是把消息重新⼊队。
消息被正常处理了。如果Broker端没有拿到这个响应,不管是因为Consumer没有拿到消息,还是Consumer
处理完消息后没有给出相应,Broker都会认为消息没有处理成功。之后,Broker就会向Consumer重复投递这
些没有处理成功的消息。RocketMQ和Kafka是根据Offset机制重新投递,⽽RabbitMQ的Classic Queue经典
对列,则是把消息重新⼊队。
5.如果MQ服务全部挂了,如何保证不丢失
设计⼀个降级缓存。Producer往MQ发消息失败了,就往降级缓存中写,然后,依然正常去进⾏后续的业务。
此时,再启动⼀个线程,不断尝试将降级缓存中的数据往MQ中发送。这样,⾄少当MQ服务恢复过来后,这些
消息可以尽快进⼊到MQ中,继续往下游Conusmer推送,⽽不⾄于造成消息丢失。
此时,再启动⼀个线程,不断尝试将降级缓存中的数据往MQ中发送。这样,⾄少当MQ服务恢复过来后,这些
消息可以尽快进⼊到MQ中,继续往下游Conusmer推送,⽽不⾄于造成消息丢失。
消息零丢失总结
1.增加集群负载,降低吞吐为代价的。
这必然会造成集群效率下降,所以保证消息安全的⽅案通常都需要根据业务场景进⾏灵活取舍
这必然会造成集群效率下降,所以保证消息安全的⽅案通常都需要根据业务场景进⾏灵活取舍
mq如何保证消息顺序性
1.首先顺序性,指的是根据业务数据局部有序,而不是全局有序,全局有序是局部有序的一个特例
rockermq
1.通过selector将一组有序的消息写入同一个 messagequeue中
2.Consumer每次集中从⼀个MessageQueue中拿取消息
2.Consumer每次集中从⼀个MessageQueue中拿取消息
kafka
1.也可以将一组有序的消息发往同一个partition
2.消费端是单线程拉取数据,所以在消息不丢失的情况下,也能保证局部有序
2.消费端是单线程拉取数据,所以在消息不丢失的情况下,也能保证局部有序
rabbitmq
1.可以通过维护Exchange与Queue之间的绑定关系,将这⼀组局部有序的消息转发到同⼀个对列中,从⽽保证这⼀组有序的消息,在RabbitMQ内部保存时,是有序的。
2.如果存在多个消费者或者消息处理出现异常,多个消费者的情况下,有序的消息可能被其他消费者处理,就不能保证顺序性了,还有如果消息异常,rabbitmq会将消息重新入队列,也不能够保证消息顺序性了
2.如果存在多个消费者或者消息处理出现异常,多个消费者的情况下,有序的消息可能被其他消费者处理,就不能保证顺序性了,还有如果消息异常,rabbitmq会将消息重新入队列,也不能够保证消息顺序性了
MQ如何保证消息幂等性
rocketmq
生产者端
是会在发送消息时,给每条消息分配⼀个唯⼀的ID。通过这个ID,就可以判断消息是否重复投递。
消费者端
在⼤多数情况下,不需要单独考虑消息重复消费的问题。但是,同样,这个回答⾥也说明了,存在⼀些⼩概率情况,需要单独考虑消费者的消息幂等问题。
跟据业务场景来确定唯⼀指标
kafka
生产者端
Broker端则会针对每个<PID,Partition>维护⼀个序列号(SN),只有当对应的SequenceNumber = SN+1
时,Broker才会接收消息,同时将SN更新为SN+1。否则,SequenceNumber过⼩就认为消息已经写⼊
了,不需要再重复写⼊。⽽如果SequenceNumber过⼤,就会认为中间可能有数据丢失了。对⽣产者就会
抛出⼀个OutOfOrderSequenceException。
时,Broker才会接收消息,同时将SN更新为SN+1。否则,SequenceNumber过⼩就认为消息已经写⼊
了,不需要再重复写⼊。⽽如果SequenceNumber过⼤,就会认为中间可能有数据丢失了。对⽣产者就会
抛出⼀个OutOfOrderSequenceException。
MQ如何快速处理积压的消息
对RocketMQ和Kafka来说,他们的消息积压能⼒本来就是很强的,因此,短时间的消息积压,是没有太多问题
的。但是需要注意,如果消息积压问题⼀直得不到解决,RocketMQ和Kafka在⽇志⽂件过期后,就会直接删除
过期的⽇志⽂件。⽽这些⽇志⽂件上未消费的消息,就会直接丢失
的。但是需要注意,如果消息积压问题⼀直得不到解决,RocketMQ和Kafka在⽇志⽂件过期后,就会直接删除
过期的⽇志⽂件。⽽这些⽇志⽂件上未消费的消息,就会直接丢失
最核心的目标就是提示消费者端消费能力
创建⼀个新的Topic,配置⾜够多的MessageQueue。然后把
Consumer实例的Topic转向新的Topic,并紧急上线⼀组新的消费者,只负责消费旧Topic中的消息,并转存到
新的Topic中。这个速度明显会⽐普通Consumer处理业务逻辑要快很多。然后在新的Topic上,就可以通过添
加消费者个数来提⾼消费速度了。之后再根据情况考虑是否要恢复成正常情况
Consumer实例的Topic转向新的Topic,并紧急上线⼀组新的消费者,只负责消费旧Topic中的消息,并转存到
新的Topic中。这个速度明显会⽐普通Consumer处理业务逻辑要快很多。然后在新的Topic上,就可以通过添
加消费者个数来提⾼消费速度了。之后再根据情况考虑是否要恢复成正常情况
对于RocketMQ,因为同⼀个消费者组下的多个Cosumer需要和对应Topic下的MessageQueue建⽴对应关
系,⽽⼀个MessageQueue最多只能被⼀个Consumer消费,因此,增加的Consumer实例最多也只能和Topic
下的MessageQueue个数相同。如果此时再继续增加Consumer的实例,那么就会有些Consumer实例是没有
MessageQueue去消费的,因此也就没有⽤了。
系,⽽⼀个MessageQueue最多只能被⼀个Consumer消费,因此,增加的Consumer实例最多也只能和Topic
下的MessageQueue个数相同。如果此时再继续增加Consumer的实例,那么就会有些Consumer实例是没有
MessageQueue去消费的,因此也就没有⽤了。
对于RabbitMQ,如果是Classic Queue经典对列,那么针对同⼀个Queue的多个消费者,是按照Work Queue
的模式,在多个Consuemr之间依次分配消息的。所以这时,如果Consumer消费能⼒不够,那么直接加更多
的Consumer实例就可以了。这⾥需要注意下的是如果各个Consumer实例他们的运⾏环境,或者是处理消息
的速度有差别。那么可以优化⼀下每个Consumer的⽐重(Qos属性),从⽽尽量⼤的发挥Consumer实例的性
能。
的模式,在多个Consuemr之间依次分配消息的。所以这时,如果Consumer消费能⼒不够,那么直接加更多
的Consumer实例就可以了。这⾥需要注意下的是如果各个Consumer实例他们的运⾏环境,或者是处理消息
的速度有差别。那么可以优化⼀下每个Consumer的⽐重(Qos属性),从⽽尽量⼤的发挥Consumer实例的性
能。
0 条评论
下一页
为你推荐
查看更多