RocketMQ(角色/集群/消息种类)
2021-02-23 17:11:20 0 举报
AI智能生成
角色/集群/消息种类
作者其他创作
大纲/内容
RocketMQ是由阿里研发的,<b>基于Java</b>,后来交给Apache孵化,是一款分布式、队列模型的消息中间件
优点
支持事务消息
严格保证消息顺序
提供丰富的消息拉取模式
高效的订阅者水平扩展能力
实时的消息订阅机制
吞吐量仅次于Kafka,亿级的消息堆积能力与Kafka相当
角色
架构
Producer:消息的发送者<br>
Consumer:消息接收者<br>
Broker:暂存和传输消息<br>
NameServer:路由中心(管理Broker)<br> 类似注册中心<br>
NameServer挂了怎么办?
只要有一台NameServer存活就可以通信
NameServer全都挂了呢?
RocketMQ不可用,生产者发消息会失败
Topic:区分消息的种类; 一个发送者可以发送消息给一个或者多个Topic; 一个消息的接收者可以订阅一个或者多个Topic消息<br>
Message Queue:相当于是Topic的子分区;用于并行发送和接收消息
集群
特点
NameServer是一个几乎无状态节点,可集群部署,节点之间无任何信息同步<br>
Broker部署相对复杂,Broker分为Master与Slave,一个Master可以对应多个Slave,但是一个Slave只能对应一个Master,Master与Slave的对应关系通过指定相同的BrokerName,不同的Brokerld来定义, Brokerld为0表示Master, 非0表示Slave。Master也可以部署多个。每个Broker与NameServer集群中的所有节点建立长连接,定时注册Topic信 息到所有NameServer
Producer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer取Topic路由信息,并向提供Topic服务的Master建立长连接,且定时向Master发送心跳。 Producer完全无状态, 可集群部署
Consumer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer取Topic路由信息,并向提供Topic服务的Master、Slave建 立长连接,且定时向Master、 Slave发送心跳。 Consumer既可以从Master订阅消息,也可以从Slave订阅消息,订阅规则由Broker配置决定
模式
单Master模式
Broker重启或者宕机时,会导致整个服务不可用
多Master模式<br>2m-noslave
全是Master,没有Slave<br>
优点:配置简单,单个Master宕机或重启维护对应用无影响,在磁盘配置为RAID10时,即使机器宕机不可恢复情况下,由于RAID10磁盘<br>非常可靠,消息也不会丢(异步刷盘丢失少量消息,同步刷盘一条不丢),性能最高
缺点:单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会受到影响
多主多从模式(异步)<br>2m-2s-async
主备消息同步采用<b>异步复制方式</b>,Master成功后立即响应,然后异步发送到从节点,主备有短暂消息延迟(毫秒级)<br>
优点:即使磁盘损坏,消息丢失的非常少,且消息实时性不会受影响,同时Master宕机后,消费者仍然可以从Slave消费,<br>而且此过程对应用透明,不需要人工干预,性能同多Master模式几乎一样
缺点:Master宕机,磁盘损坏情况下会丢失少量消息
多主多从模式(同步)<br>2m-2s-sync
主备消息同步采用<b>同步双写方式</b>,只有主备都写成功,才向应用返回成功<br>
优点:数据与服务都无单点故障,Master宕机情况下,消息无延迟,服务可用性与数据可用性都非常高
缺点:性能比异步复制模式略低(大约低10%左右),发送单个消息的RT会略高,且目前版本在主节点宕机后,备机不能自动切换为主机
Dledger集群
支持高可用
工作流程
1. 启动NameServer,NameServer起来后监听端口,等待Broker、Producer. Consumer连上来,相当于一个路由控制中心<br>
2. Broker启动,跟所有的NameServer保持长连接,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有Topic信息。注册成功后,<br>NameServer集群中就有Topic跟Broker的映射关系<br>
3. 收发消息前,先创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,也可以在发送消息时自动创建Topic<br>
4. Producer发送消息,启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取当前发送的Topic存在哪些Broker上,轮询从队列<br>列表中选择一个队列, 然后与队列所在的Broker建立长连接从而向Broker发消息<br>
5. Consumer跟Producer类似,跟其中一台NameServer建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,开始消费
消息发送
消息的生产/消费
生产消息
1. 创建消息生产者producer,并制定生产者组名<br>2. 指定Nameserver地址<br>3. 启动producer<br>4. 创建消息对象,指定主题Topic、Tag和消息体<br>5. 发送消息<br>6. 关闭生产者producer
消费消息
1. 创建消费者Consumer,制定消费者组名<br>2. 指定Nameserver地址<br>3. 订阅主题Topic和Tag<br>4. 设置回调函数,处理消息<br>5. 启动消费者consumer
消费模式
负载均衡(默认)
广播模式(consumer.setMessageModel(MessageModel.BROADCASTING);)
消息种类
顺序消息
概念
消息有序指的是可以按照消息的发送顺序来消费(FIFO)
如何保证消息有序
全局有序:发送和消费参与的queue只有一个
分区有序:控制发送的顺序消息只依次发送到同一个queue中,<br> 消费的时候只从这个queue上依次拉取,则保证顺序
分区有序的实现原理
生产者构建 <b>消息队列选择器</b>,通过订单号路由消息
消费者用 <b>单线程的监听器,</b>消费队列中的有序消息
延时消息
使用限制
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";<br>现在RocketMq并不支持任意时间的延时,需要设置几个固定的延时等级,从1s到2h分别对应着等级:<b>1 到 18</b><br>
用途举例:一小时后检查订单状态,如未付款就取消订单释放库存
实现原理
生产者的消息设置超时时间
消费者没有特殊改动
批量消息
Batch机制
把许多小的消息,合成为一条批量消息,一次发过去
减少网络IO,能显著提高传递消息的性能,限制4M
使用限制
批量消息应该有相同的topic,相同的waitStoreMsgOK,而且不能是延时消息
批量消息的单次发送总大小<b>不能超过4MB</b>,如果超过4M则需要对消息进行分割
实现原理
生产者批量发送消息
消费者没有特殊变化
过滤消息
在大多数情况下,TAG是一个简单而有用的设计,其可以来选择您想要的消息。例如:<br>DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE");<br>consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC"); //消费者将接收包含TAGA或TAGB或TAGC的消息<br>
使用限制
一个消息只能有一个标签,这对于复杂的场景可能不起作用
在这种情况下,可以使用<b>SQL表达式筛选消息</b>。SQL特性可以通过发送消息时的属性来进行计算
实现原理
生产者发送消息时,可以通过`putUserProperty`来设置消息的属性
消费者消费消息时可以用MessageSelector.bySql来使用sql筛选消息
事务消息
防止生产者丢消息
同步发送+多次尝试
事务消息机制<br> <b>推荐使用</b>
(1) 生产者发送 half 消息到 MQ(对消费者不可见)
(2) MQ服务端收到 half 消息后记录消息并回复生产者
(3) 生产者根据MQ响应结果执行本地事务,并发送本地事务的执行状态
(4) MQ服务器根据本地事务状态执行Commit或者Rollback
Commit操作提交half消息,使消费者可见
RollBack是进行回滚操作,删除half消息
(5) 对没有发送状态的事务消息,MQ服务端会发起“回查”(默认回查15次,如果仍然失败则丢弃消息)
(6) 生产者收到回查消息,检查对应的本地事务的状态,重新Commit或者Rollback
防止 Broker 丢消息
刷盘策略
默认为异步刷盘,修改为同步刷盘,存入磁盘后再返回写入成功
通过Broker配置文件里的 <b>flushDiskType </b>参数设置
ASYNC_FLUSH
SYNC_FLUSH
集群同步
默认为异步同步(master写成功就返回)修改为同步到slave再返回成功
通过Broker配置文件里的 <b>brokerRole </b>参数设置
ASYNC_MASTER
SYNC_MASTER
因此可以通过同步刷盘策略+同步双写策略+主从的方式解决丢失消息的可能
防止消费者丢消息
消费者收到消息后先执行本地事务,再修改offset,然后通知Broker,如果通知失败则重试
不要使用异步处理逻辑,如果收到消息后开启线程异步处理,就返回成功,很容易导致消息丢失
事务消息状态
提交状态
提交事务,它允许消费者消费此消息
回滚状态
回滚事务,它代表该消息将被删除,不允许被消费
中间状态
中间状态,它代表需要检查消息队列来确定状态
问题
为什么要发送 half 消息?
验证MQ服务器的可靠性
half 消息写入 MQ 失败怎么办?
说明MQ服务器异常,不执行本地事务
half 消息成功了 MQ 没有收到后续响应怎么办?
MQ服务器回查事务状态
整个 MQ 服务器挂了怎么保证消息零丢失?
发送消息时增加降级方案:缓存
下单成功后如何等待支付成功?
下单成功后标记为unknow,利用RocketMQ的消息回查机制实现
0 条评论
下一页