RocketMQ
2025-04-20 21:35:51 0 举报
RocketMQ是一款由阿里巴巴开源的消息中间件,它提供了分布式系统中消息的可靠传输及发布-订阅功能,广泛应用于在线和离线消息处理场景。作为一个高性能、高可靠性、易于使用的消息中间件,RocketMQ旨在支持核心的互联网场景,如流量削峰、应用解耦、消息分发等。它能够处理高并发读写请求,保证消息的零丢失,即使在系统故障情况下也能确保数据的一致性与完整性。此外,RocketMQ的设计非常灵活,用户可以根据需要编写插件来自定义消息存储、消息过滤和消息推送等逻辑,提供了强大的扩展性。整个系统支持千万级别的主题和队列,轻松应对大规模分布式系统的高性能消息服务需求。
作者其他创作
大纲/内容
Consumer
1.如果消费者组中的消费者数量小于Topic下的分区数,那么在消费者组中增加消费者的数量,并且启动集群模式消费,配置合适的负载均衡策略2.如果Topic对应的分区不小于消费者组中的消费者数量,那么增加消费者组2.批量消费3.批量拉取消息,然后根据消息的业务特性放到不同的内存队列中,然后利用线程池使用不同的线程去处理唯一的一个队列4.优化消费端业务逻辑代码
既要保证消息的顺序性又不产生消息积压
消息顺序消费
Queue1
Queue2
Broker端
生产端
Broker1
根据业务在发送消息的时候指定key保证同一个key的业务消息会发送到相同的一个Queue中
TopicA
RocketMQ集群
消费者
Broker2
Client2
消息重试
Consumer Group1
Queue4
1.消费端关闭自动commit进行手动commit2.消费端使用集群模式,消费者将消费进度(Offset)定期持久化到 Broker,记录已成功处理的消息位置
Client1
RocketMQ相比于Kafka、RabbitMQ来说官方已经帮我们封装好了很多消息不丢失、顺序消费、消息积压的解决方案
生产端降低生产速率
生产端有同步发送重试retryTimesWhenSendFailed以及异步发送重试配置retryTimesWhenSendAsyncFailed
消息积压如何快速消费
1.同步发送:消息发送方发出一条消息后,会在收到服务端同步响应之后才发下一条消息的通讯方式注意:同步发送方式请务必捕获发送异常,并做业务侧失败兜底逻辑,如果忽略异常则可能会导致消息未成功发送的情况2.异步发送+回调函数实现:需要自己去实现SendCallback()回调函数,然后进行重试或者兜底逻辑3.事务消息发送:事务消息发送分为两个阶段。第一阶段会发送一个半事务消息,半事务消息是指暂不能投递的消息,生产者已经成功地将消息发送到了 Broker,但是Broker 未收到生产者对该消息的二次确认,此时该消息被标记成“暂不能投递”状态,如果发送成功则执行本地事务,并根据本地事务执行成功与否,向 Broker 半事务消息状态(commit或者rollback),半事务消息只有 commit 状态才会真正向下游投递。如果由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,Broker 端会通过扫描发现某条消息长期处于“半事务消息”时,需要主动向消息生产者询问该消息的最终状态(Commit或是Rollback)。这样最终保证了本地事务执行成功,下游就能收到消息,本地事务执行失败,下游就收不到消息。总而保证了上下游数据的一致性。font color=\"#e80c38\
消费端
TopicB
根据业务将Topic拆分出更多的Queue,同比例的增加分区和对应的消费者组
1.Broker设置刷盘机制为flushDiskType=SYNC_FLUSH:生产者的消息写入请求需等待 Broker 将数据刷入磁盘后才返回成功响应2.Broker端一主多从集群部署,通过 brokerRole=SYNC_MASTER 配置,当Master 将消息同步到 Slave 成功后才返回生产者写入成功3.若 Master 宕机,消费者会自动切换到 Slave 节点继续消费,避免服务中断
消息不丢失
在@RocketMQMessageListener注解中设置重试次数maxReconsumeTimes注意:消费端在进行消费重试的时候要在业务层做好幂等性处理
1.为了防止消息积压,消费端可以批量进行拉取消息,然后单线程顺序消费,如果这一批消息中某一个消息消费失败了,先根据重试规则进行重试,如果达到重试次数之后依然没有成功,将这个消息发送给DLT中,继续去消费后面的消息,最后依然会批量提交这一批消息2.批量拉取消息,然后放到内存队列中,然后利用线程池来进行多线程处理,最后批量提交偏移量
Consumer Group2
Topic下的Queue中的消息天生保证了顺序性
Queue3
1.创建 Topic 是要指定 -o 参数(--order)为true2.保证NameServer中的配置orderMessageEnable=true3.保证NameServer中的配置returnOrderTopicConfigToBroker=true
0 条评论
下一页