加快心跳
概述
如何保障消息的成功投递?
幂等性概念
在海量订单产生的业务高峰期,如何避免消息的重复消费
Confirm 确认消息 & Return 返回消息
自定义消费者
消息的 ACK 与重回队列
消息的限流
TTL 消息
死信队列
如何保障消息的成功投递?
什么是生产端的可靠性投递?
保证消息的成功发出
保障 MQ 节点的成功接收
发送端收到 MQ 节点确认应答
完善的消息进行补偿机制
生产端-可靠性投递
常见解决方案
方案 1
消息信心落库,对消息转改进行打标
隐含问题
有两次数据持久化的操作,第一次要保存业务消息,第二次对数据进行记录。
数据 IO 磁盘,每次都需要读两次,数据库容易遇到瓶颈
解决办法:其实我们只需要对业务进行入库即可
方案 2
消息的延迟投递,做二次确认,回调检查
互联网大公司常用的方式
但也不一定能 100% 保证可靠性投递
极端情况,需要人工进行补偿
主要目的:减少数据库的操作
在项目核心链路中,每一次持久化操作,都要精心考量
花费时间太多,可能会造成核心链路中最大瓶颈
图示
分支主题
很复杂,但能够最大限度节省我们数据信息落库的操作
Callback:回调服务
第一步&第二步
把消息落库完了之后,才能 step 1 进行发送消息
还要记住,互联网大公司不会加任何事务,事务的性能会造成很严重的性能瓶颈
注意:这一次,在生产端它会一次生成两条消息
也就是执行完了 step 1 发送消息后
还会执行 step 2 做消息延迟检查,可以 2~5 分钟之后
第四步
当消费端中消息处理成功之后,还需生成一个 确认 消息
第五步
Callback 服务,通过监听器,监听 确认 消息
当确认了之后,就对消息做最终的存储
第六步
假设 5 分钟后,延迟投递检查消息,发送过来了
callback 服务,监听这个 检查细节
有单独的监听器监听
然后就去检查 MSG DB 数据库
如果刚刚没有返回,或者返回失败,出现异常了
这时 callback 需要做补偿
因为 callback 在监听延迟消息
当 callback 发现 message 并不存在,则会主动发起 RPC 通信,给上游反馈延迟检查的内容,并没有找到
然后,再次发送一次数据
这么做的目的
少做一次 DB 存储
能节省一步,就节省一步
可以进行异步去补偿
在高并发场景中,最应该关注并保证性能,保证能扛得住庞大的订单量
幂等性
幂等性是什么?
我们可以借鉴数据库的乐观锁机制
比如我们执行一条更新库存的 SQL 语句
update tb_pro set count = count - 1, version = version + 1 where version = 1
比如有很多商品,卖了一件,就减 1
如果只剩下一件了,减了 1,就卖完了,没办法继续卖了
如果此时碰上并发,两个请求同时过来,有可能 count 就变成 -1,这肯定是不行的
解决方式,就是加上 version 版本号,还可带上商品 id
像 elsaticsearch 中也是使用了这种严格的 幂等性
总结
可能你对一件事情进行操作,这个操作你可能执行非常多次,操作的结果也是相同的,这个就是幂等性保障
消费端-幂等性保障
在海量订单产生的业务高峰期,如何避免消息的重复消费问题?
在高并发的情况下,会有很多信息到达 MQ,消费端可能要监听大量的消息,难免会出现消息的重复投递,或者网络闪断,导致 Broker 端重发消息
消费端实现幂等性,就意味着,我们的消息永远不会消费多次,即使我们收到了多条一样的消息
有可能代码会执行多次,但数据库只会执行这一步操作
金融公司,最重视
业界主流的幂等性操作
唯一 ID + 指纹码机制,利用数据库主键去重
唯一 ID + 指纹码机制,利用数据库主键去重
有些用户就有可能在某一瞬间进行多次消费
比如刚刚转了一笔钱,接着又马上又转了一笔
select count(1) from tb_order where id = 唯一 ID + 指纹码
如果已经有记录,代表已经被操作了
好处:实现简单
坏处:高并发下有数据库写入的性能瓶颈
解决方案:根据 ID 进行分库分表进行算法路由
实现分压分流的机制
利用 redis 的原子性去实现
使用 redis 进行幂等,需要考虑的问题
比如我们 set 一个key,如果第二次还 set,就会更新为最新值
也可以做一个预先判断,exsit() 操作,存在就不更新了
最简单的自增,也是可以保障的
1)我们是否要进行数据落库,如果落库的话,关键解决的问题是数据库和缓存如何做到原子性?
2)如果不进行落库,那么都存储到缓存中,如何设置定时同步的策略?
Confirm 确认消息
理解 Confirm 消息确认机制
消息的确认,是指生产者投递消息后,如果 Broker 收到消息,则会给我们生产者一个应答
生产者进行接收应答,用来确定这条消息是否正常的发送到 Broker,这种方式也是消息的可靠性投递的核心保障
确认机制流程图
如何实现 Confirm 确认消息?
1)在 channel 上开启确认模式:channel.confirmSelect()
2)在 channel 上添加监听:addConfirmListener
监听成功和失败的返回结果,根据具体的结果对消息进行重新发送、或记录日志等后续处理
案例 Demo
Producer
Consumer
Return 消息机制
Return Listener 用于处理一些不可路由的消息
我们的消息生产者,通过指定一个 Exchange 和 Routing Key,把消息送达某一个队列中去,然后我们的消费者监听队列,进行消费处理操作
但是在某些情况下,如果我们在发送消息的时候,当前的 Exchange 不存在或者指定的路由 Key 路由不到,这个时候如果我们需要监听这种不可达的消息,就要使用 Return Listener
在基础 API 中有一个关键的配置项
Mandatory
如果为 true,则监听器会接收到路由不可达的消息,然后进行后续处理。
如果为 false,那么 broker 端自动删除该消息。
图示
消费端自定义监听
我们一般就是在代码中编写 while 循环,进行 consumer.nextDelivery() 方法进行获取下一条消息,然后进行消费处理
但是我们使用自定义的 Cusumer 更加方便,解耦性更加强,也是在实际工作中最常使用的方式
案例 Demo
Producer
Consumer
MyConsumer
消费端限流
什么是消费端的限流?
假设一个场景,首先,我们RabbitMQ 服务器上有上万条未处理的消息,我们随便打开一个消费者客户端,会出现下面的情况:
巨大量的消息瞬间全部推送过来,但是我们单个客户端无法同时处理这么多数据
RabbitMQ 提供了一种 qos(服务质量保证)功能,即在非自动确认消息的前提下,如果一定数目的消息(通过基于 consumer 或者 channel 设置 Qos 的值)未被确认钱,不进行消费新的消息
void BasicQos(uint prefetchSize, ushort prefetchCount, bool global);
prefetchSize:0
prefetchCount
会告诉 RabbitMQ 不要同时给一个消费者推送多于 N 个消息,即一旦有 N 个消息还没有 ack,则该 consumer 将 block 掉,直到有消息 ack
global
true / false 是否将上面设置应用于 channel
简单说,就是上面限制是 channel 级别的还是 consumer 级别
prefetchSize 和 global 这两项,rabbimq 没有实现,暂且不研究。prefetch_count 在 no_ask = false 的情况下生效,即在自动应答的情况下这两个值是不生效的
案例 Demo
Producer
Consumer
MyConsumer
消费端 ack 与重回队列
消费端的手工 ack 和 nack
消费端进行消费的时候,如果由于业务异常我们可以进行日志的记录,然后进行补偿
如果由于服务器宕机等严重问题,那我们就需要手工进行 ack 保障消费端消费成功
消费端的重回队列
消费端重回队列是为了对没有处理成功的消息,把消息重新传回给 Broker
一般我们在实际应用中,都会关闭重回队列,也饿就是设置为 False
案例 Demo
Producer
Consumer
MyConsumer
TTL 队列 / 消息
TTL 是 Time To Live 的所写,也就是生存时间
RabbitMQ 支持消息的过期时间,在消息发送时可以进行指定
RabbitMQ 支持队列的过期时间,从消息入队列开始计算,只要超过了队列的超时时间配置,那么消息会自动地清除
死信队列
死信队列 DLX, Dead-Letter-Exchange
利用 DLX,当消息在一个队列中变成死信(dead message)之后,它能被重新 pulish 到另一个 Exchange,这个 Exchange 就是 DLX
消息变成死信队列的几种情况
消息被拒绝(basic.reject / basic.nack)并且 requeue = false
消息 TTL 过期
队列达到最大长度
DLX 也是一个正常的 Exchange,和一般的 Exchange 没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。
当这个队列中有死信时,RabbitMQ 就会自动的将这个消息重新发布到设置的 Exchange 上去,进而被路由到另一个队列。
可以监听这个队列中消息做响应的处理,这个特性可以弥补 RabbitMQ 3.0 以前支持的 imediate 参数的功能
死信队列设置
首先需要设置死信队列的 exchange 和 queue,然后进行绑定
然后进行正常声明交换机
队列、绑定,只不过需要在队列上加一个参数
arguments.put("x-dead-letter-exchange", "dlx.exchange");
这样消息在过期、requeue、队列在达到最大长度时,消息就可以直接路由到死信队列
案例 Demo
Producer
Consumer
MyConsumer