常见的MQ
ActiveMQ
高吞吐, 低延迟, 高可用, 低丢失
维护少<br>
Kafka(大型公司)
超高吞吐, 低延迟, 分布式可用性; 有第三方web管理页面
RocketMQ(双11等高并发场景)
阿里, Java. 参照了Kafka
高吞吐, 高可用, 可分布式. 0丢失
RabbitMQ(中小公司)
基于AMQP协议的消息中间件. 最主流. 高性能, 稳定, 健壮,跨平台, 支持多语言, 有第三方web管理页面, 社区活跃, 更新频繁;
商业版付费<br>
消息的确认 -- 消息应答<br>
消费者在接收到消息并且处理该消息之后,告诉 rabbitmq 它已经处理了(肯定/否定),rabbitmq 可以把该消息删除了
手动应答<br>
特点
Multiple
批量应答处理, 减少网络拥堵<br>
性能提升, 一致性下降
类型
肯定确认<br>
Channel.basicAck
否定确认<br>
Channel.basicNAck()<br>
Channel.basicReject()<br>
与 Channel.basicNack 相比少一个参数<br> 不处理该消息了直接拒绝,可以将其丢弃了
消息的防丢失
消费者断连 -- 消息自动重新入列
消费者失去连接, 但其已接收的消息并未处理完(未发送ACK确认). 消息队列会将该消息自动入列并重新排队以被消费<br>
开启手动应答, 在消费者回调中, 对消息进行消费后, 进行手动肯定应答. 对于未肯定应答的消息, 会自动进行重新入列<br>
服务器宕机 -- 消息和队列的持久化
队列持久化
(生产者) 初始化队列时, 持久化参数 durable: true
消息持久化
生产者发送消息时, 消息属性设置 BasicProperties: MessageProperties.PERSISTENT_TEXT_PLAIN<br>
将消息标记为持久化并不能完全保证不会丢失消息,消息还在缓存的一个间隔点, 此时并没<br>有真正写入磁盘。持久性保证并不强,但是对于我们的简单任务队列而言,这已经绰绰有余
持久化间隔点时的服务器宕机 -- 发布确认
前提: 队列、消息的持久化开启<br>
发布信道开启 confirm 模式, 消息到达信道后回调<br>
开启发布确认
开启生产者信道的发布确认
channel.confirmSelect()<br>
(生产者)获取发布确认的结果
channel.waitForConfirms() : boolean
三种模式
同步
原理: 调用waitForConfirms() 返回 <b><font color="#64b5f6">自上次调用以来消息发布结果</font></b><br>
具体子模式
普通确认 单个 (简单,但吞吐量非常有限)
批量确认 (简单,合理的吞吐量, 排错难)
异步确认 (综合性能和排错性最佳)<br>
原理: 对信道添加发布确认监听器, 其中包含发布正确和错误的回调, 可对正确发布和错误发布分别处理.<br>
实现
Channel.addConfirmListener(ConfirmCallback ackCallback, ConfirmCallback nackCallback);<br>
异常消息的处理 -- 死信队列
异常情况
消息被拒
消息TTL过期
队列达到最大长度
延时队列
利用死信队列中 TTL 过期的死信特性, 直接不配置前置消费者, 来达到延时通知的效果
问题: 对于消息粒度的过期时长, 死信判断只针对队列的最出口一个消息. 所以即使中间某个消息已超过ttl, 但因为消息堆积也可能还存在队列中. 故不能达成延时队列的顺序性<br>
解决方法: 使用 延时队列交换机插件
rabbitmq_delayed_message_exchange-3.8.0.ez
模型
子主题
注意: 对插件延时, 需要对延时交换机设置参数: x-delayed-type=direct<br>
发布确认高级与队列回退
前提: 开启发布确认模式/队列回退
spring.rabbitmq.publisher-confirm-type=correlated<br>
spring.rabbitmq.publish-return: true
在broker块对消息是否接收进行回调反馈
交换机(发布确认)
RabbitTemplate.ConfirmCallBack<br>
队列(队列回退)
RabbitTemplate.ReturnCallBack
rabbitTemplate.mandatory = true
将回退的消息返回给生产者, 否则丢弃
实现如上接口, 并注入到 使用的rabbitTemplate的对应内部属性中, (该注入操作需要前置到调用rabbitTemplate)
备份交换机
功能
可以替代队列回退 进行 队列的路由异常的处理. 优先级比队列回退高<br>
当交换机接收到一条不可路由消息时,将会把这条消息转发到备份交换机中,由备<br>份交换机来进行转发和处理,通常备份交换机的类型为 Fanout ,这样就能把所有消息都投递到与其绑定<br>的队列中
使用
使用参数 "alternate-exchange=备份交换机名" 对主机进行绑定设置
开发中其它场景<br>
优先级队列
优先级范围 [0,255]<br>
对入队列的消息, 进行排队
使用
队列开启优先级设置 "x-max-priority=最大优先级", 通常最大设为10, 消息添加优先级参数<br>
惰性队列
队列的模式设置 "x-queue-mode=指定模式"
default正常队列(内存); lazy惰性队列(磁盘)
在慢消费-消息大量堆积而长时间不能消费的情况下节省内存
高可用负载均衡
HaProxy + KeepAlive<br>
RabbitMQ的集群与镜像队列
异常报错
https://blog.csdn.net/Epoch_Elysian/article/details/94075456
集群状态下(网络延迟导致的)数据一致性问题<br>
联邦交换机 Federation Exchange
联邦队列 Federation Queue