消息队列RabbitMQ
2023-10-22 21:15:15 0 举报
AI智能生成
登录查看完整内容
消息队列 RabbitMQ 消息队列面试 面试题 队列 kafka
作者其他创作
大纲/内容
同步通讯:就像打电话,需要实时响应。
异步通讯:就像发邮件,不需要马上回复。
微服务间通讯的两种方式?同步和异步
两种方式各有优劣,打电话可以立即得到响应,但是你却不能跟多个人同时通话。发送邮件可以同时与多个人收发邮件,但是往往响应会有延迟。
微服务间基于Feign的调用就属于同步方式
分支主题
电商项目的支付场景
微服务中有那些同步通讯的场景
每次加入新的需求,都要修改原来的代码
•\t耦合度高
调用者需要等待服务提供者响应,如果调用链过长则响应时间等于每次调用的时间之和。
•\t性能下降
调用链中的每个服务在等待响应过程中,不能释放请求占用的资源,高并发场景下会极度浪费系统资源
•\t资源浪费
如果服务提供者出现问题,所有调用方都会跟着出问题,如同多米诺骨牌一样,迅速导致整个微服务群故障
•\t级联失败
同步调用存在那些问题
•\t时效性较强,可以立即得到结果
同步调用的优点有那些?
同步通讯
我们以购买商品为例,用户支付后需要调用订单服务完成订单状态修改,调用物流服务,从仓库分配响应的库存并准备发货。在事件模式中,支付服务是事件发布者(publisher),在支付完成后只需要发布一个支付成功的事件(event),事件中带上订单id。订单服务和物流服务是事件订阅者(Consumer),订阅支付成功的事件,监听到事件后完成自己业务即可。为了解除事件发布者与订阅者之间的耦合,两者并不是直接通信,而是有一个中间人(Broker)。发布者发布事件到Broker(中间人),不关心谁来订阅事件。订阅者从Broker(中间人)订阅事件,不关心谁发来的消息。
Broker 是一个像数据总线一样的东西,所有的服务要接收数据和发送数据都发到这个总线上,这个总线就像协议一样,让服务间的通讯变得标准和可控。
电商项目支付场景,使用消息队列解耦
异步调用常见实现就是事件驱动模式
微服务中有那些异步通讯的场景
•\t吞吐量提升:无需等待订阅者处理完成,响应更快速
•\t故障隔离:服务没有直接调用,不存在级联失败问题
•\t调用间没有阻塞,不会造成无效的资源占用
•\t耦合度极低,每个服务都可以灵活插拔,可替换
•\t流量削峰:不管发布事件的流量波动多大,都由Broker(中间人)接收,订阅者可以按照自己的速度去处理事件
•\t架构复杂了,业务没有明显的流程线,不好追踪管理
好在现在开源软件或云平台上 Broker 的软件是非常成熟的,比较常见的一种就是MQ技术。
•\t需要依赖于Broker(中间人)的可靠性、安全性、吞吐能力
同步调用的缺点有那些?
异步通讯
MQ,中文是消息队列(MessageQueue),字面来看就是存放消息的队列。也就是事件驱动架构中的Broker。
MQ 是什么?
概述
对比
追求可用性:Kafka、 RocketMQ 、RabbitMQ
追求可靠性:RabbitMQ、RocketMQ
追求吞吐能力:RocketMQ、Kafka
追求消息低延迟:RabbitMQ、Kafka
这些消息队列各有什么特点?
市场上有那些常见的消息队列?
RabbitMQ是基于Erlang语言开发的开源消息通信中间件,官网地址:https://www.rabbitmq.com/
是什么?
在Centos7虚拟机中使用Docker来安装
docker pull rabbitmq:3-management
方式一:在线拉取
下载镜像包
docker load -i mq.tar
上传到虚拟机中后,使用命令加载镜像即可:
方式二:从本地加载
下载镜像
执行下面的命令来运行MQ容器:
安装MQ
单机部署
普通模式集群不进行数据同步,每个MQ都有自己的队列、数据信息(其它元数据信息如交换机等会同步)。例如我们有2个MQ:mq1,和mq2,如果你的消息在mq1,而你连接到了mq2,那么mq2会去mq1拉取消息,然后返回给你。如果mq1宕机,消息就会丢失。
类似 redis 中的主从模式
普通模式
与普通模式不同,队列会在各个mq的镜像节点之间同步,因此你连接到任何一个镜像节点,均可获取到消息。而且如果一个节点宕机,并不会导致数据丢失。不过,这种方式增加了数据同步的带宽消耗。
类似 redis 中的哨兵模式
镜像模式
集群分类
集群部署
RabbitMQ怎么安装部署?
- publisher:生产者
- consumer:消费者
注意:exchange负责消息路由,而不是存储,路由失败则消息丢失
- exchange个:交换机,负责消息路由
- queue:队列,存储消息
- virtualHost:虚拟主机,隔离不同租户的exchange、queue、消息的隔离
RabbitMQ中的一些角色:
MQ的基本结构
基本消息队列(BasicQueue)
工作消息队列(WorkQueue)
Fanout Exchange:广播
Direct Exchange:路由
Topic Exchange:主题
消息模型
发布订阅(Publish、Subscribe),又根据交换机类型不同分为三种。发布订阅模式与之前案例的区别就是允许将同一消息发送给多个消费者。实现方式是加入了exchange(交换机)。常见exchange类型包括:Fanout:广播。Direct:路由。Topic:话题。
publisher:消息发布者,将消息发送到队列queue
queue:消息队列,负责接受并缓存消息
consumer:订阅队列,处理队列中的消息
包括三个角色:
简单队列模式的模型图
- mq-demo:父工程,管理项目依赖
- publisher:消息的发送者
- consumer:消息的消费者
- 建立连接
- 创建Channel
- 声明队列
- 发送消息
- 关闭连接和channel
思路:
publisher实现
- 订阅消息
consumer实现
在idea中创建项目
代码
案例:基于最基础的消息队列模型来实现
1. 建立connection
2. 创建channel
3. 利用channel声明队列
4. 利用channel向队列发送消息
基本消息队列的消息发送流程:
4. 定义consumer的消费行为handleDelivery()
5. 利用channel将消费者与队列绑定
基本消息队列的消息接收流程:
RabbitMQ
SpringAMQP是基于RabbitMQ封装的一套模板,并且还利用SpringBoot对其实现了自动装配,使用起来非常方便。SpringAmqp的官方地址:https://spring.io/projects/spring-amqp
AMQP应用间消息通信的一种协议,与语言和平台无关
什么是SpringAMQP
- 自动声明队列、交换机及其绑定关系
- 基于注解的监听器模式,异步接收消息
- 封装了RabbitTemplate工具,用于发送消息
SpringAMQP提供了三个功能:
<!--AMQP依赖,包含RabbitMQ--><dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId></dependency>
在父工程mq-demo中引入依赖
spring: rabbitmq: host: 192.168.150.101 # 主机名 port: 5672 # 端口 virtual-host: / # 虚拟主机 username: jates # 用户名 password: 123321 # 密码
首先配置MQ地址,在publisher服务的application.yml中添加配置:
在publisher服务中编写测试类SpringAmqpTest,并利用RabbitTemplate实现消息发送:
消息发送
首先配置MQ地址,在consumer服务的application.yml中添加配置:
然后在consumer服务的`cn.jates.mq.listener`包中新建一个类SpringRabbitListener,代码如下:
消息接收
启动consumer服务,然后在publisher服务中运行测试代码,发送MQ消息
测试
实现:Basic Queue 简单队列模型
Work queues,也被称为(Task queues),任务模型。简单来说就是**让多个消费者绑定到一个队列,共同消费队列中的消息**。
可以提高消息处理速度,避免队列消息堆积
当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。此时就可以使用实现:工作消息队列(WorkQueue) 模型,多个消费者共同处理消息处理,速度就能大大提高了。
应用场景
在publisher服务中添加一个测试方法,循环发送50条消息到simple.queue队列
这次我们循环发送,模拟大量消息堆积现象。在publisher服务中的SpringAmqpTest类中添加一个测试方法:
编写两个消费者,都监听simple.queue
注意到这个消费者sleep了1000秒,模拟任务耗时。
要模拟多个消费者绑定同一个队列,我们在consumer服务的SpringRabbitListener中添加2个新的方法:
启动ConsumerApplication后,在执行publisher服务中刚刚编写的发送测试方法testWorkQueue。可以看到消费者1很快完成了自己的25条消息。消费者2却在缓慢的处理自己的25条消息。也就是说消息是平均分配给每个消费者,并没有考虑到消费者的处理能力。这样显然是有问题的。
spring: rabbitmq: listener: simple: prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息
在spring中有一个简单的配置,可以解决这个问题。我们修改consumer服务的application.yml文件,添加配置:
- 多个消费者绑定到一个队列,同一条消息只会被一个消费者处理
- 通过设置prefetch来控制消费者预取的消息数量
总结
实现:工作消息队列(WorkQueue) 模型
发布订阅模式与之前案例的区别就是允许将同一消息发送给多个消费者。实现方式是加入了exchange(交换机)。
发布( Publish )、订阅( Subscribe )
- Publisher:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
Fanout Exchange 会将接收到的消息广播到每一个跟其绑定的queue
发布订阅-Fanout Exchange
Fanout:广播
Direct:路由
Topic:话题
常见exchange类型包括:
- Exchange:交换机,图中的X。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。
- Consumer:消费者,与以前一样,订阅队列,没有变化
- Queue:消息队列也与以前一样,接收消息、缓存消息。
订阅模型中有那些角色
发布订阅的模型
- 1) 可以有多个队列
- 2) 每个队列都要绑定到Exchange(交换机)
- 3) 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定
- 4) 交换机把消息发送给绑定过的所有队列
- 5) 订阅队列的消费者都能拿到消息
在广播模式下,消息发送流程是这样的:
在consumer服务中,利用代码声明队列、交换机,并将两者绑定
在consumer服务中,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2
在publisher中编写测试方法,向jates.fanout发送消息
实现思路如下:
SpringAMQP提供了声明交换机、队列、绑定关系的API,例如:
Spring提供了一个接口Exchange,来表示所有不同类型的交换机:
在consumer中创建一个类,声明队列和交换机:
在consumer服务常见一个类,添加@Configuration注解,并声明FanoutExchange、Queue和绑定关系对象Binding,代码如下:
步骤1:在consumer服务声明Exchange、Queue、Binding
声明队列和交换机
在publisher服务的SpringAmqpTest类中添加测试方法:
步骤3:在publisher服务发送消息到FanoutExchange
在consumer服务的SpringRabbitListener中添加两个方法,作为消费者,分别监听fanout.queue1和fanout.queue2::
步骤2:在consumer服务声明两个消费者
具体代码实现
- 接收publisher发送的消息
- 将消息按照规则路由到与之绑定的队列
- 不能缓存消息,路由失败,消息丢失
- FanoutExchange的会将消息路由到每个绑定的队列
交换机的作用是什么?
Queue
FanoutExchange
Binding
声明队列、交换机、绑定关系的Bean是什么?
实现:发布订阅-Fanout Exchange(广播)模型
Direct Exchange 会将接收到的消息根据规则路由到指定的Queue,因此称为路由模式(routes)。
在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。
Direct 和 Fanout的区别
- 队列与交换机的绑定,不能是任意绑定了,而是要指定一个`RoutingKey`(路由key)
每一个Queue都与Exchange设置一个BindingKey
- 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 `RoutingKey`。
发布者发送消息时,指定消息的RoutingKey
- Exchange不再把消息交给每一个绑定的队列,而是根据消息的`Routing Key`进行判断,只有队列的`Routingkey`与消息的 `Routing key`完全一致,才会接收到消息
Exchange将消息路由到BindingKey与消息RoutingKey一致的队列
Direct 模型的运作逻辑如下:
利用@RabbitListener声明Exchange、Queue、RoutingKey
在consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2
在publisher中编写测试方法,向jates. direct发送消息
利用SpringAMQP演示DirectExchange的使用
在consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2,
并利用@RabbitListener声明Exchange、Queue、RoutingKey
基于@Bean的方式声明队列和交换机比较麻烦,Spring还提供了基于注解方式来声明。
在consumer的SpringRabbitListener中添加两个消费者,同时基于注解来声明队列和交换机:
步骤1:在consumer服务声明Exchange、Queue
步骤2:在publisher服务发送消息到DirectExchange
- Fanout交换机将消息路由给每一个与之绑定的队列
- Direct交换机根据RoutingKey判断路由给哪个队列
- 如果多个队列具有相同的RoutingKey,则与Fanout功能类似
描述下Direct交换机与Fanout交换机的差异?
- @Queue
- @Exchange
基于@RabbitListener注解声明队列和交换机有哪些常见注解?
实现:发布订阅-DirectExchange(定向)模型
Topic类型的Exchange与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符!
Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
#:匹配一个或多个词
*:匹配不多不少恰好1个词
item.#:能够匹配item.spu.insert 或者 item.spu
item.*:只能匹配item.spu
举例:
通配符规则:
•\tQueue1:绑定的是china.# ,因此凡是以 china.开头的routing key 都会被匹配到。包括china.news和china.weather
•\tQueue4:绑定的是#.news ,因此凡是以 .news结尾的 routing key 都会被匹配。包括china.news和japan.news
在consumer服务中,编写两个消费者方法,分别监听topic.queue1和topic.queue2
在publisher中编写测试方法,向jates. topic发送消息
利用SpringAMQP演示TopicExchange的使用
在consumer服务中,编写两个消费者方法,分别监听topic.queue1和topic.queue2,
在consumer服务的SpringRabbitListener中添加方法:
步骤1:【消费者】在consumer服务声明Exchange、Queue
步骤2:【生产者】在publisher服务发送消息到TopicExchange
代码实现
Topic交换机接收的消息RoutingKey必须是多个单词,以 . 分割
Topic交换机与队列绑定时的bindingKey可以指定通配符
#:代表0个或多个词
*:代表1个词
描述下Direct交换机与Topic交换机的差异?
实现:发布订阅-TopicExchange(通配符)模型
在SpringAMQP的发送方法中,接收消息的类型是Object,也就是说我们可以发送任意对象类型的消息,SpringAMQP会帮我们序列化为字节后发送。
Spring会把你发送的消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象。
- 数据体积过大
- 有安全漏洞
- 可读性差
默认情况下Spring采用的序列化方式是JDK序列化。众所周知,JDK序列化存在下列问题
修改消息发送的代码,发送一个Map对象:
停止consumer服务
发送消息后查看控制台:
测试默认转换器
Spring的对消息对象的处理是由org.springframework.amqp.support.converter.MessageConverter来处理的。而默认实现是SimpleMessageConverter,基于JDK的ObjectOutputStream完成序列化。如果要修改只需要定义一个MessageConverter 类型的Bean即可。推荐用JSON方式序列化,
JDK序列化方式并不合适。我们希望消息体的体积更小、可读性更高,因此可以使用JSON方式来做序列化和反序列化。
在publisher和consumer两个服务中都引入依赖:
配置消息转换器。在启动类中添加一个Bean即可
步骤
配置JSON转换器
利用MessageConverter实现的,默认是JDK的序列化
注意发送方与接收方必须使用相同的MessageConverter
SpringAMQP中消息的序列化和反序列化是怎么实现的?
消息转换器
SpringAMQP
生产者发送的消息未送达exchange
消息到达exchange后未到达queue
发送时丢失:
MQ宕机,queue将消息丢失
consumer接收到消息后未消费就宕机
消息从生产者发送到exchange,再到queue,再到消费者,有哪些导致消息丢失的可能性?
- 生产者确认机制
- mq持久化
- 消费者确认机制
- 失败重试机制
针对这些问题,RabbitMQ分别给出了解决方案:
RabbitMQ提供了publisher confirm机制来避免消息发送到MQ过程中丢失。消息发送到MQ以后,会返回一个结果给发送者,表示消息是否处理成功。
消息成功投递到交换机,返回ack
消息未投递到交换机,返回nack
publisher-confirm,发送者确认
消息投递到交换机了,但是没有路由到队列。返回ACK,及路由失败原因。
publisher-return,发送者回执
结果有两种请求:
流程图
确认机制发送消息时,需要给每个消息设置一个全局唯一id,以区分不同消息,避免ack冲突
注意事项
simple:同步等待confirm结果,直到超时
correlated:异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback
publish-confirm-type:开启publisher-confirm,这里支持两种类型:
publish-returns:开启publish-return功能,同样是基于callback机制,不过是定义ReturnCallback
template.mandatory:定义消息路由失败时的策略。true,则调用ReturnCallback;false:则直接丢弃消息
配置说明:
在publisher这个微服务的application.yml中添加配置:
每个RabbitTemplate只能配置一个ReturnCallback,因此需要在项目启动过程中配置:
ConfirmCallback可以在发送消息时指定,因为每个业务处理confirm成功或失败的逻辑不一定相同。
在publisher服务的cn.jates.mq.spring.SpringAmqpTest类中,定义一个单元测试方法:
发送消息,指定消息ID、消息ConfirmCallback
SpringAMQP实现生产者确认
消息成功发送到exchange,返回ack
消息发送失败,没有到达交换机,返回nack
消息发送过程中出现异常,没有收到回执
publisher-comfirm:
消息成功发送到exchange,但没有路由到queue,调用ReturnCallback
SpringAMQP中处理消息确认的几种情况:
解决方案1:生产者消息确认
生产者确认可以确保消息投递到RabbitMQ的队列中,但是消息发送到RabbitMQ以后,如果突然宕机,也可能导致消息丢失。要想确保消息在RabbitMQ中安全保存,必须开启消息持久化机制。
MQ默认是内存存储消息,开启持久化功能可以确保缓存在MQ中的消息不丢失。
- 交换机持久化
- 队列持久化
- 消息持久化
有那些类型的持久化
RabbitMQ中交换机默认是非持久化的,mq重启后就丢失。
SpringAMQP中可以通过代码指定交换机持久化:
事实上,默认情况下,由SpringAMQP声明的交换机都是持久化的。
可以在RabbitMQ控制台看到持久化的交换机都会带上D的标示:
交换机持久化
RabbitMQ中队列默认是非持久化的,mq重启后就丢失。
SpringAMQP中可以通过代码指定队列持久化:
事实上,默认情况下,由SpringAMQP声明的队列都是持久化的。
可以在RabbitMQ控制台看到持久化的队列都会带上D的标示:
队列持久化
SpringAMQP中的的消息默认是持久的,可以通过MessageProperties中的DeliveryMode来指定的:
消息持久化
解决方案2:消息持久化
RabbitMQ支持消费者确认机制,即:消费者处理消息后可以向MQ发送ack回执,MQ收到ack回执后才会删除该消息。
- 1)RabbitMQ投递消息给消费者
- 2)消费者获取消息后,返回ACK给RabbitMQ
- 3)RabbitMQ删除消息
- 4)消费者宕机,消息尚未处理
场景:
这样,消息就丢失了。因此消费者返回ACK的时机非常重要。
manual:自己根据业务情况,判断什么时候该ack
manual:手动ack,需要在业务代码结束后,调用api发送ack。
auto模式类似事务机制,出现异常时返回nack,消息回滚到mq;没有异常,返回ack
一般,我们都是使用默认的auto即可。
auto:自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack
none模式下,消息投递是不可靠的,可能丢失
none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除
SpringAMQP则允许配置三种确认模式:
修改consumer服务的application.yml文件,添加下面内容:
修改consumer服务的SpringRabbitListener类中的方法,模拟一个消息处理异常:
测试可以发现,当消息处理抛异常时,消息依然被RabbitMQ删除了。
演示none模式
再次把确认机制修改为auto:
在异常位置打断点,再次发送消息,程序卡在断点时,可以发现此时消息状态为unack(未确定状态):
抛出异常后,因为Spring会自动返回nack,所以消息恢复至Ready状态,并且没有被RabbitMQ删除:
演示auto模式
解决方案3:消费者消息确认
当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者,然后再次异常,再次requeue,无限循环,导致mq的消息处理飙升,带来不必要的压力:
- 在重试3次后,SpringAMQP会抛出异常AmqpRejectAndDontRequeueException,说明本地重试触发了
- 查看RabbitMQ控制台,发现消息被删除了,说明最后SpringAMQP返回的是ack,mq删除消息了
重启consumer服务,重复之前的测试。可以发现:
修改consumer服务的application.yml文件,添加内容:
我们可以利用Spring的retry机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。
- 开启本地重试时,消息处理过程中抛出异常,不会requeue到队列,而是在消费者本地重试
- 重试达到最大次数后,Spring会返回ack,消息会被丢弃
结论:
- RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式
- ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
- RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机
在之前的测试中,达到最大重试次数后,消息会被丢弃,这是由Spring内部机制决定的。在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecovery接口来处理,它包含三种不同的实现:
1)在consumer服务中定义处理失败消息的交换机和队列
2)定义一个RepublishMessageRecoverer,关联队列和交换机
完整代码
比较优雅的一种处理方案是RepublishMessageRecoverer,失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理。
失败策略
本地重试
解决方案4:消费失败重试机制
开启生产者确认机制,确保生产者的消息能到达队列
开启持久化功能,确保消息未消费前在队列中不会丢失
开启消费者确认机制为auto,由spring确认消息处理成功后完成ack
开启消费者失败重试机制,并设置MessageRecoverer,多次重试失败后将消息投递到异常交换机,交由人工处理
总结:如何确保RabbitMQ消息的可靠性?
如何确保发送的消息至少被消费一次?(怎么保证消息不丢失)
将所有消息发送到单一分区,然后在消费者端使用单一消费者来消费该分区中的消息。这样可以确保单一分区内的消息按照发送顺序进行消费。但是这种方式会限制了系统的并发性能和扩展性。
单一分区:
在消费者端维护一个记录消息处理顺序的序号,对于每个消费者来说,记录当前处理消息的序号。当消费者收到消息时,根据序号判断消息是否按照顺序到达,如果消息乱序,则可以进行等待或重新排序操作。
消息处理顺序记录:
消息队列怎么保证消息的有序消费?
尽管上述方法可以在一定程度上保证消息的有序消费,但它们都有一些限制和权衡。在实际应用中,需要根据业务需求和实际场景来选择合适的方法。有时候,在设计应用程序时,可以通过其他手段来解决无序消息带来的问题,如在消费者端进行有序性的处理或在业务逻辑中引入足够的容错机制来处理无序消息。
可以根据消息的唯一标识进行幂等性判断,将已处理过的消息标记或记录下来,避免重复消费。
\t消息消费者幂等性处理:
将接口的参数相加,使用hash、等加密算法生成一个字符串,然后,存储在数据库的一个表里面,然后,当有接口过来了,对比两个这个字符串是否重复,如果是,就表示已经执行过了。
比如,一个修改数据库订单扣减库存的接口,在分布式系统场景下可能会出现网络相关的异常,这样,就可能会出现客户端重复调用接口的情况,那么服务端,怎么保证自己只执行一次,这个扣减库存的接口?
消息队列怎么保证消息重复消费?
怎么保证消息队列的可靠性?
- 消费者使用basic.reject或 basic.nack声明消费失败,并且消息的requeue参数设置为false
- 消息是一个过期消息,超时无人消费
- 要投递的队列消息满了,无法投递
当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):
什么是死信?
如果这个包含死信的队列配置了dead-letter-exchange属性,指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机称为死信交换机(Dead Letter Exchange,检查DLX)。
什么是死信交换机
因为simple.queue绑定了死信交换机 dl.direct,因此死信会投递给这个交换机:
如果这个死信交换机也绑定了一个队列,则消息最终会进入这个存放死信的队列:
图文展示,一个消息被消费者拒绝了,变成了死信:
- 死信交换机名称
这样才能确保投递的消息能到达死信交换机,并且正确的路由到死信队列。
- 死信交换机与死信队列绑定的RoutingKey
队列将死信投递给死信交换机时,必须知道两个信息:
在失败重试策略中,默认的RejectAndDontRequeueRecoverer会在本地重试次数耗尽后,发送reject给RabbitMQ,消息变成死信,被丢弃。
我们可以给simple.queue添加一个死信交换机,给死信交换机绑定一个队列。这样消息变成死信后也不会丢弃,而是最终投递到死信交换机,路由到与死信交换机绑定的队列。
我们在consumer(消费者)服务中,定义一组死信交换机、死信队列:
案例:利用死信交换机接收死信
消息被消费者reject或者返回nack
消息超时未消费
队列满了
什么样的消息会成为死信?
- 如果队列绑定了死信交换机,死信会投递到死信交换机;
- 可以利用死信交换机收集所有消费者处理失败的消息(死信),交由人工处理,进一步提高消息队列的可靠性。
死信交换机的使用场景是什么?
给队列设置dead-letter-exchange属性,指定一个交换机
给队列设置dead-letter-routing-key属性,设置死信交换机与死信队列的RoutingKey
如何给队列绑定死信交换机?
面试问答
死信交换机
TTL,也就是Time-To-Live(超时时间)。如果一个队列中的消息TTL结束仍未消费,则会变为死信。
消息所在的队列设置了存活时间
队列里面 停留(倒计时) 5秒,消息再发送到下一个环节
消息本身设置了存活时间
ttl超时分为两种情况:
在consumer(消费者)服务的SpringRabbitListener中,定义一个新的消费者,并且声明 死信交换机、死信队列:
注意,这个队列设定了死信交换机为`dl.ttl.direct`
要给队列设置超时时间,需要在声明队列时配置x-message-ttl属性:
声明交换机,将ttl与交换机绑定:
发送消息,但是不要指定TTL:
发送消息的日志:
因为队列的TTL值是10000ms,也就是10秒。可以看到消息发送与接收之间的时差刚好是10秒。
查看下接收消息的日志:
声明一个队列,并且指定TTL
在发送消息时,也可以指定TTL:
查看发送消息日志\t:
接收消息日志:
这次,发送与接收的延迟只有5秒。说明当队列、消息都设置了TTL时,任意一个到期就会成为死信。
发送消息时,设定TTL
实战:接收超时死信的死信交换机
给队列设置ttl属性,进入队列后超过ttl时间的消息变为死信
给消息设置ttl属性,队列接收到消息超过ttl时间后变为死信
两者共存时,以时间短的ttl为准
消息超时的两种方式是?
给消息的目标队列指定死信交换机
消费者监听与死信交换机绑定的队列
发送消息时给消息设置ttl为20秒
如何实现发送一个消息20秒后消费者才收到消息?
TTL
利用TTL结合死信交换机,我们实现了消息发出后,消费者延迟收到消息的效果。这种消息模式就称为延迟队列(Delay Queue)模式。
延迟发送短信
用户下单,如果用户在15 分钟内未支付,则自动取消
预约工作会议,20分钟后自动通知所有参会人员
延迟队列的使用场景包括:
因为延迟队列的需求非常多,所以RabbitMQ的官方也推出了一个插件,原生支持延迟队列效果。
这个插件就是DelayExchange插件。参考RabbitMQ的插件列表页面:https://www.rabbitmq.com/community-plugins.html
- 接收消息
- 判断消息是否具备x-delay属性
- 如果有x-delay属性,说明是延迟消息,持久化到硬盘,读取x-delay值,作为延迟时间
- 返回routing not found结果给消息发送者
- x-delay时间到期后,重新投递消息到指定队列
DelayExchange需要将一个交换机声明为delayed类型。当我们发送消息到delayExchange时,流程如下:
DelayExchange原理
插件的使用也非常简单:声明一个交换机,交换机的类型可以是任意类型,只需要设定delayed属性为true即可,然后声明队列与其绑定即可。
基于注解方式(推荐):
也可以基于@Bean的方式:
声明DelayExchange交换机
发送消息时,一定要携带x-delay属性,指定延迟的时间:
发送消息
使用DelayExchange
•声明一个交换机,添加delayed属性为true
•发送消息时,添加x-delay头,值为超时时间
延迟队列插件的使用步骤包括哪些?
延迟队列
怎么解决消息队列 中的 延迟消息问题?
普通集群:是一种分布式集群,将队列分散到集群的各个节点,从而提高整个集群的并发能力。
类似redis 中的集群分片模式
普通集群
镜像集群:是一种主从集群,普通集群的基础上,添加了主从备份功能,提高集群的数据可用性。
类似redis 中的 哨兵模式
镜像集群
镜像集群虽然支持主从,但主从同步并不是强一致的,某些情况下可能有数据丢失的风险。因此在RabbitMQ的3.8版本以后,推出了新的功能:仲裁队列来代替镜像集群,底层采用Raft协议确保主从的数据一致性。
仲裁队列
普通集群,或者叫标准集群(classic cluster)
会在集群的各个节点间共享部分数据,包括:交换机、队列元信息。不包含队列中的消息。
当访问集群某节点时,如果队列不在该节点,会从数据所在节点传递到当前节点并返回
队列所在节点宕机,队列中的消息就会丢失
具备下列特征:
交换机、队列、队列中的消息会在各个mq的镜像节点之间同步备份。
创建队列的节点被称为该队列的主节点,备份到的其它节点叫做该队列的镜像节点。
一个队列的主节点可能是另一个队列的镜像节点
所有操作都是主节点完成,然后同步给镜像节点
主宕机后,镜像节点会替代成新的主
镜像集群:本质是主从模式,具备下面的特征:
与镜像队列一样,都是主从模式,支持主从数据同步
使用非常简单,没有复杂的配置
主从同步基于Raft协议,强一致
仲裁队列是3.8版本以后才有的新功能,用来替代镜像队列,具备下列特征:
Java代码创建仲裁队列
注意,这里用address来代替host、port方式
SpringAMQP连接MQ集群
怎么保证消息队列 高可用?
当生产者发送消息的速度超过了消费者处理消息的速度,就会导致队列中的消息堆积,直到队列存储消息达到上限。之后发送的消息就会成为死信,可能会被丢弃,这就是消息堆积问题。
是什么
- 增加更多消费者,提高消费速度。也就是我们之前说的work queue模式
要提升队列容积,把消息保存在内存中显然是不行的。
- 接收到消息后直接存入磁盘而非内存
- 消费者要消费消息时才会从磁盘中读取并加载到内存
- 支持数百万条的消息存储
从RabbitMQ的3.6.0版本开始,就增加了Lazy Queues的概念,也就是惰性队列。惰性队列的特征如下:
- 扩大队列容积,提高堆积上限
解决消息堆积有两种思路:
惰性队列
怎么解决 消息队列的 消息堆积问题
MQ的一些常见面试题
消息队列RabbitMQ
0 条评论
回复 删除
下一页