MQ_01RocketMQ开发模型
2023-04-27 17:23:47 18 举报
AI智能生成
RocketMQ开发模型
作者其他创作
大纲/内容
RocketMQ开发模型
消息发送者
1.创建消息生产者producer,并制定生产者组名
2.指定Nameserver地址
3.启动producer
4.创建消息对象,指定主题Topic、Tag和消息体
5.发送消息
6.关闭生产者producer
消息消费者
1.创建消费者Consumer,制定消费者组名
2.指定Nameserver地址
3.订阅主题Topic和Tag
4.设置回调函数,处理消息
5.启动消费者consumer
RocketMQ的消息样例
基本消息
生产者
同步发送
异步发送
单向发送
没有返回值,也没有回调。只管把消息发出去就行
消费者
拉模式
主动去Broker上拉取消息的拉模式
推模式
等待Broker把消息推送过来
顺序消息
RocketMQ保证的是消息的局部有序,而不是全局有序
RocketMQ也只保证了每个OrderID的所有消息有序(发到了同一个queue),而并不能保证所有消息都有序
消息有序的原理
要保证最终消费到的消息是有序的,需要从Producer、Broker、Consumer三个步骤都保证消息有序
发送者端
将一组有序的消息发送到同一个MessageQueue上时,才能利用MessageQueue先进先出的特性保证这一组消息有序
Broker
一个队列内的消息是可以保证有序的
消费者端
消费者会从多个消息队列上去拿消息。这时虽然每个消息队列上的消息是有序的,但是多个队列之间的消息仍然是乱序的
消费者端要保证消息有序,就需要按队列一个一个来取消息,即取完一个队列的消息后,再去取下一个队列的消息<br>
MessageListenerOrderly
通过锁队列的方式保证消息是一个一个队列来取的
MessageListenerConcurrently
不会锁队列,每次都是从多个Message中取一批数据(默认不超过32条)。因此也无法保证消息有序
广播消息
广播模式则是把消息发给了所有订阅了对应主题的消费者,而不管消费者是不是同一个消费者组
延迟消息
message.setDelayTimeLevel(3)<br>producer.send(message)<br>
在调用producer.send方法后,消息并不会立即发送出去,而是会等一段时间再发送出去
开源版本的RocketMQ
对延迟消息并不支持任意时间的延迟设定(商业版本中支持)
18个固定的延迟级别
1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
批量消息
将多条消息合并成一个批量消息,一次发送出去
可以减少网络IO,提升吞吐量
如果批量消息大于1MB就不要用一个批次发送,而要拆分成多个批次消息发送。也就是说,一个批次消息的大小不要超过1MB
使用批量消息时,这个消息长度确实是必须考虑的一个问题。而且批量消息的使用是有一定限制的,这些消息应该有相同的Topic,相同的waitStoreMsgOK
过滤消息
使用Message的Tag属性来简单快速的过滤信息
使用SQL表达式来对消息进行过滤信息
只有推模式的消费者可以使用SQL过滤
事务消息
事务消息是在分布式系统中保证最终一致性的两阶段提交的消息实现
保证本地事务执行与消息发送两个操作的原子性,也就是这两个操作一起成功或者一起失败
事务消息的使用限制
事务消息不支持延迟消息和批量消息
为了避免单个消息被检查太多次而导致半队列消息累积,默认将单个消息的检查次数限制为 15 次
可以通过 Broker 配置文件的 transactionCheckMax参数来修改此限制
如果已经检查某条消息超过 N 次的话( N = transactionCheckMax ) 则 Broker 将丢弃此消息,并在默认情况下同时打印错误日志<br>通过重写 AbstractTransactionCheckListener 类来修改这个行为<br>
事务性消息可能不止一次被检查或消费
事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享
事务消息允许反向查询、MQ服务器能通过它们的生产者 ID 查询到消费者
检查时间时隔<br>
Broker 配置文件中的参数 transactionMsgTimeout
用户还可以通过设置用户属性 CHECK_IMMUNITY_TIME_IN_SECONDS 来改变这个限制,该参数优先于 transactionMsgTimeout 参数
实现机制
事务消息机制的关键是在发送消息时,会将消息转为一个half半消息,并存入RocketMQ内部的一个 RMQ_SYS_TRANS_HALF_TOPIC 这个Topic,这样对消费者是不可见的。<br>再经过一系列事务检查通过后,再将消息转存到目标Topic,这样对消费者就可见了<br>
流程<br>
1.生产者发送half消息到broke<br>
2.broker回执half消息到生产者<br>
3.执行本地事务<br>
4.回滚或者提交<br>
5.未确认的消息状态,borker对会生产线消息回查<br>
6.生产者回查消息状态<br>
7.根据详细状态回滚或者提交<br>
提交到topic的正常消费,回滚的消息则丢弃<br>
SpringCloudStream整合RocketMQ
SpringCloudStream是Spring社区提供的一个统一的消息驱动框架,目的是想要以一个统一的编程模型来对接所有的MQ消息中间件产品
SpringCloudStream框架。这是一套几乎通用的消息中间件编程框架,<br>从对接RocketMQ换到对接Kafka,业务代码几乎不需要动,只需要更换pom依赖并且修改配置文件就行了<br>
注意各个MQ的个性化配置属性
例如RocketMQ的个性化属性都是以spring.cloud.stream.rocketmq开头
常见的问题
哪些环节会有丢消息的可能
跨网络环节
生产者发消息到MQ<br>
MQ到消费者
MQ主从同步的时候
MQ存盘环节<br>
通常MQ存盘时都会先写入操作系统的缓存page cache中,然后再由操作系统异步的将消息写入硬盘<br>
在这个时间差内,就可能会造成消息丢失:如果服务挂了,缓存中还没有来得及写入硬盘的消息就会丢失<br>
消息零丢失方案
RocketMQ的事务消息机制就是为了保证零丢失来设计的<br>
half消息用处<br>
确认RocketMQ的服务是否正常
嗅探下RocketMQ服务是否正常,并且通知RocketMQ,我马上就要发一个很重要的消息了,你做好准备
half消息如果写入失败了
half消息如果写入失败,我们就可以认为MQ的服务是有问题的,这时,就不能通知下游服务了
可以在下单时给订单一个状态标记,然后等待MQ服务正常后再进行补偿操作,等MQ服务正常后重新下单通知下游服务
订单系统写数据库失败了
写数据库失败(可能是数据库崩了,需要等一段时间才能恢复)。那我们可以另外找个地方把订单消息先缓存起来
RocketMQ返回一个UNKNOWN状态。这样RocketMQ就会过一段时间来回查事务状态
在回查事务状态时再尝试把订单数据写入数据库,<br>如果数据库这时候已经恢复了,那就能完整正常的下单,再继续后面的业务。这样这个订单的消息就不会因为数据库临时崩了而丢失<br>
同步刷盘+Dledger主从架构保证MQ主从同步时不会丢消息
把RocketMQ的刷盘方式 flushDiskType配置成同步刷盘就可以保证消息在刷盘过程中不会丢失了<br>
Dledger会通过两阶段提交的方式保证文件在主从之间成功同步<br>
uncommitted阶段<br>
Leader Broker上的Dledger收到一条数据后,会标记为uncommitted状态,然后他通过自己的DledgerServer组件把这个uncommitted数据发给Follower Broker的DledgerServer组件
commited阶段
接着Follower Broker的DledgerServer收到uncommitted消息之后,必须返回一个ack给Leader Broker的Dledger。然后如果Leader Broker收到超过半数的Follower Broker返回的ack之后,就会把消息标记为committed状态
Leader Broker上的DledgerServer就会发送committed消息给Follower Broker上的DledgerServer,让他们把消息也标记为committed状态。这样,就基于Raft协议完成了两阶段的数据同步
消费者端不要使用异步消费机制<br>
NameServer挂了如何保证消息不丢失<br>
只能设计一个降级方案来处理这个问题
多次尝试发送RocketMQ不成功,那就只能另外找给地方(Redis、文件或者内存等)把订单消息缓存下来,然后起一个线程定时的扫描这些失败的订单消息,尝试往RocketMQ发送
RocketMQ的服务恢复过来后,就能第一时间把这些消息重新发送出去
RocketMQ如何保证消息顺序
保证一致性的场景
数据系统,需要对业务系统的日志进行收集分析<br>
用户的积分正常顺序的加减
如何保证消息有序
全局有序:整个MQ系统的所有消息严格按照队列先入先出顺序进行消费
常用的聊天室,就是个典型的需要保证消息全局有序的场景
将Topic配置成只有一个MessageQueue队列(默认是4个),对整个Topic的消息吞吐影响是非常大的
局部有序:只保证一部分关键消息的消费顺序
对于电商订单场景,也只要保证一个订单的所有消息是有序的就可以了
QQ聊天,只需要保证一个聊天窗口里的消息有序就可以了
将有序的一组消息都存入同一个MessageQueue里
快速处理积压消息
如何确定
使用web控制台,直接看到消息的积压情况。
以通过mqadmin指令在后台检查各个Topic的消息延迟情况
有RocketMQ也会在他的 ${storePathRootDir}/config 目录下落地一系列的json文件,也可以用来跟踪消息积压情况
如何处理
如果Topic下的MessageQueue配置得是足够多的
可以简单的通过增加Consumer的服务节点数量来加快消息的消费,等积压消息消费完了,再恢复成正常情况<br>
最极限的情况是把Consumer的节点个数设置成跟MessageQueue的个数相同
如果Topic下的MessageQueue配置得不够多的话<br>
可以创建一个新的Topic,配置足够多的MessageQueue。然后把所有消费者节点的目标Topic转向新的Topic<br>
紧急上线一组新的消费者,只负责消费旧Topic中的消息,并转储到新的Topic中,这个速度是可以很快的<br>
新的Topic上,就可以通过增加消费者个数来提高消费速度了<br>
0 条评论
下一页