消息队列进阶
2023-05-20 16:30:31 1 举报
AI智能生成
登录查看完整内容
消息队列进阶
作者其他创作
大纲/内容
对若干数据进行更新操作,为了保证这些数据的完整性和一致性,希望这些更新操作要么都成功,要么都失败。
ACID特性:原子性、一致性、隔离性、持久性
事务
在分布式系统中的实现事务
2PC(Two-phase Commit,也叫二阶段提交)
TCC(Try-Confirm-Cancel)
适用的场景主要是那些需要异步更新数据,并且对数据实时性要求不太高的场景
事务消息
常见的分布式事务实现
概念
事务消息需要消息队列提供相应的功能才能实现,Kafka和RocketMQ都提供了事务相关功能
例子:订单系统创建订单后,发消息给购物车系统,将已下单的商品从购物车中删除
1.订单系统在消息队列上开启事务
半消息:包含的内容就是完整的消息内容,在事务提交之前,对于消费者来说,这个消息是不可见的。
2.订单系统给消息队列发送半消息
3.半消息发送成功后,订单系统就可以执行本地事务了,在订单库中创建一条订单记录,并提交订单库的数据库事务。
4.根据本地事务的执行结果决定提交或者回滚事务消息
5.提交事务,则消息对消费者可见
简单粗暴:直接抛异常,让用户自行处理。我们可在业务代码中反复重试提交,直到提交成功,或者删除之前创建的订单进行补偿。
Kafka
事务反查机制:RocketMQ的Broker没有收到提交或者回滚的请求,Broker会定期去Producer上反查这个事务对应的本地事务的状态,然后根据反查结果决定提交或者回滚这个事务。
为支撑事务反查机制,业务代码需要实现一个反查本地事务状态的接口,告知RocketMQ本地事务是成功还是失败。
RocketMQ
问题:第4步提交事务失败该如何?
如何用消息队列实现分布式事务
消息队列中的分布式
分布式事务
Producer端对发出的消息附加一个连续递增的序号Consumer端来检查这个序号的连续性
在发消息的时候必须要指定分区
在每个分区单独检测消息序号的连续性
分布式系统要注意:Kafka和RocketMQ不保证Topic上严格顺序,只能保证分区上的消息是有序的
利用消息队列的有序性
检测消息丢失的方法
消息在Producer创建出来,经过网络传输发送到Broker端
做了什么
消息队列通过最常用的请求确认机制,来保证消息的可靠传递。客户端==>BrokerBroker==>客户端只要Producer收到了Broker的确认响应,就可以保证消息在生产阶段不会丢失
在编写发送消息代码时,需要注意,正确处理返回值或者捕获异常,就可以保证这个阶段的消息不会丢失
try { RecordMetadata metadata = producer.send(record).get(); System.out.println(\"消息发送成功。\");} catch (Throwable e) { System.out.println(\"消息发送失败!\"); System.out.println(e);}
同步发送
异步发送
实践
如何确保
生产阶段
消息在Broker端存储,如果是集群,消息会在这个阶段被复制到其他的副本上(只要Broker在正常运行,就不会出现丢失消息的问题。但是如果Broker出现了故障,比如进程死掉了或者服务器宕机了,还是可能会丢失消息的。)
如果对消息的可靠性要求非常高,可以通过配置Broker参数来避免因为宕机丢消息。
配置Broker参数,在收到消息后,将消息写入磁盘后再给Producer返回确认响应
单节点Broker
至少将消息发送到2个以上的节点,再给客户端回复发送确认响应(当某个Broker宕机时,其他的Broker可以替代宕机的Broker,也不会发生消息丢失)
Broker集群
存储阶段
Consumer从Broker上拉取消息,经过网络传输发送到Consumer上
采用确认机制来保证消息的可靠传递
不要在收到消息后就立即发送消费确认,而是应该在执行完所有消费业务逻辑之后,再发送消费确认。
消费阶段
各阶段如何确保
如何保证消息不会丢失
消息在传递时,最多会被送达一次。
没什么消息可靠性保证,允许丢消息
对消息可靠性要求不太高的监控场景使用
At most once
消息在传递时,至少会被送达一次
不允许丢消息,但是允许有少量重复消息出现
At least once
消息在传递时,只会被送达一次,不允许丢失也不允许重复
Exactly once
为什么大部分消息队列都选择只提供At least once的服务质量,而不是级别更高的Exactly once呢?
✅扩展思考
传递消息的服务质量标准
常用的绝大部分消息队列提供的服务质量都是At least once,包括RocketMQ、RabbitMQ和Kafka 都是这样。也就是说,消息队列很难保证消息不重复
网络抖动,生产者未收到Broker的ACK,重复发送消息
Broker未收到消费者的消费ACK
重复消息产生的场景
重复消息的产生原因
利用数据库的唯一约束实现幂等
为更新的数据设置前置条件
在执行数据更新操作之前,先检查一下是否执行过这个更新操作
思路
在发送消息时,给每条消息指定一个全局唯一的ID,消费时,先根据这个ID检查这条消息是否有被消费过,如果没有消费过,才更新数据,然后将消费状态置为已消费。
实现
记录并检查操作
消费端:让消费消息的操作具备幂等性
如何解决
如何处理消费过程中的重复消息
不需要关注消息队列本身的性能(对于绝大多数使用消息队列的业务来说,消息队列本身的处理能力要远大于业务系统的处理能力。)
先检查下是不是发消息之前的业务逻辑耗时太多导致
发送端性能上不去原因
注意设置合适的并发和批量大小
如何优化
生产端
使用消息队列的时候,大部分的性能问题都出现在消费端,如果消费的速度跟不上发送端生产消息的速度,就会造成消息积压
原因
一定要保证消费端的消费性能要高于生产端的发送性能,这样的系统才能健康的持续运行
优化消费业务逻辑
水平扩容,增加消费端的并发数来提升总体的消费性能(在扩容Consumer的实例数量的同时,必须同步扩容主题中的分区(也叫队列)数量,确保Consumer的实例数和分区数量是相等的。如果Consumer的实例数量超过分区数量,这样的扩容实际上是没有效果的。原因我们之前讲过,因为对于消费者来说,在每个分区上实际上只能支持单线程消费。)
在收到消息的OnMessage方法中,不处理任何业务逻辑,把这个消息放到一个内存队列里面就返回了
会丢消息!!!如果收消息的节点发生宕机,在内存队列中还没来及处理的这些消息就会丢失
错误解决方案
落地
消费端
关键在于业务代码如何与消息队列配合
消息队列的性能优化
如何规避消息积压
通过扩容消费端的实例数来提升总体的消费能力(要同步扩容分区数量)
(无法扩容)将系统降级,通过关闭一些不重要的业务,减少发送方发送的数据量
发送变快了
优先检查一下日志是否有大量的消费错误
消费线程是不是阻塞住了或者发生了死锁等
检查消费端,是不是消费失败导致的一条消息反复消费这种情况比较多,这种情况也会拖慢整个系统的消费速度
消费变慢了
粗粒度原因分析
消息积压发生了该如何处理
消息积压了如何处理
消息队列进阶
0 条评论
回复 删除
下一页