RocketMQ
2024-08-01 18:37:38 5 举报
AI智能生成
登录查看完整内容
RocketMQ是一个分布式、高性能、易于扩展的消息中间件,适用于大规模分布式系统之间的异步通信。它提供了包括发布/订阅、请求/回复、负载均衡、容错性、事务性消息等特性。RocketMQ基于Java开发,支持多种编程语言的客户端,如Java、C++、Python等。它主要用于大规模数据采集、实时处理、在线大数据分析等场景。
作者其他创作
大纲/内容
消息队列(MQ)是一种系统间相互写作的通信机制,目前业界通常由两种方式来实现系统间通信,其中一种是基于远程过程调用的方式;另一种是基于消息队列的方式。前一种就是我们常说的RPC调用,客户端不需要知道调用的具体实现细节,只需要调用实际存在于远程计算机上的某个对象即可,但调用方式看起来和调用本地应用程序中的对象一样。基于消息队列的方式是指由应用中的某个系统负责发送消息,由关心这条消息的相应系统负责接收消息,并在收到消息后进行各自系统内的业务处理,消息可以非常简单,比如只包含文本字符串;也可以很复杂,比如包含字节流、字节数组,还可能包含嵌入对象,消息在被发送后可以立即返回,由消息队列来负责消息的传递,消息发布者只管将消息发布到消息队列而不用管谁来去,消息使用者只管从消息队列中取消息而不管是谁发布的,这样发布者和使用者都不用知道对方的存在
消息中间件是什么
解耦。多个业务系统间的解耦,使得单个业务系统更加独立
异步。对于一些业务上实时性要求不高的处理,可以转为异步处理
削峰。对于某个时间段的高并发流量可以进行固定速率进行处理
数据一致性。事务消息可以帮我们更好的实现分布式事务
为什么需要消息中间件
1.RPC虽然可以分为同步调用和异步调用,但在大多数情况下,RPC请求发出时是需要获取最终的调用结果的,需要结果进行回传
2.MQ虽然也可以实现分布式系统之间的调用,但是MQ回传被调用方的处理结果给调用方是,这个操作是比较困难的,不是很容易实现
MQ和RPC的区别
这是因为Kafka在一开始设计Parition的时候,就已经设计成了一个Parition在同一个时刻只能被一个Consumer消费,当消费者数量大于分区数量时,新加入的消费者是消费不到消息的,除非之前的分区数量是小于消费者数量
Kafka与RocketMQ
RocketMQ是阿里巴巴于2012年开源的分布式消息中间件,后来捐赠给Apache软件基金会,并于2017年9月25日称为Apache的顶级项目.作为经历多过多次阿里巴巴双11这种超级工程的洗礼并有稳定出色表现得国产中间件,以其高性能、低延迟和高可靠等特性近年来被越来越多的国内企业所使用
RocketMQ是所有的Topic都写入到一个CommitLog文件
概述
集群结构
组件结构
领域模型结构
模型设计图
具有灵活的可扩展性。RocketMQ天然支持集群,其核心四大组件(NameServer、Broker、Producer、Consumer)的每一个都可以在没有单点故障的情况下进行水平扩展
具有海量消息堆积能力。RocketMQ采用零拷贝原理实现了超大量消息的堆积能力,据说单机已经可以支持亿级消息堆积而且在堆积了这么多消息后依然可以保持写入低延迟
支持顺序消息。RocketMQ可以保证消息消费者按照消息的发送的顺序对消息进行消费。顺序消息分为全局有序消息和局部有序,一般推荐使用局部有序消息,即生产者通过将某一类的消息按顺序发送到同一个队列中来实现
支持事务消息,RocketMQ除了支持普通消息、顺序消息之外,还支持事务消息,这个特性对于分布式事务来说提供了另一种解决思路
支持回溯消费,回溯消费是指对于消费者已经消费成功的消息,由于业务需求需要重新消费,RocketMQ支持按照时间回溯消费时间精确到毫秒,可以向前回溯,也可以向后回溯
特点
主题(Topic)可以被看作是消息的归类,它是消息的第一级类型,比如一个电商系统可以分为交易信息、物流信息等,一条消息必须有一个主题,主题与生产者和消费者的关系非常松散,一个主题可以有0个或多个生产者向其发送消息,一个生产者也可以同时向不同的主题发送消息,一个主题也可以被多个消费者订阅
Topic
消息就是要传输的信息。一条消息必须有一个主题,主题可以被看作是信件要邮寄的地址一条消息也可以拥有可选的标签和额外的键值对,它们被用于设置一个业务key并在broker上查找此消息,以便在卡法期间查找问题
Message
主题被划分为一个或者多个子主题,即队列(Queue),在一个主题下可以设置多个队列,在发送消息时执行该消息的主题,RocketMQ会轮询该主题下的所有队列将消息发送出去
Queue
标签(tag)可以被看作是子主题,它是消息的第二级类型,用于伪用户提供额外的灵活性。使用标签,同一业务模块的不同目的的消息就可以用相同的主题而不同的标签来标识。比如交易消息又可以分为交易创建消息,交易完成消息等,一条消息可以没有标签标签有助于保持代码干净和连贯,并且还可以为RocketMQ的查询系统提供帮助
Tag
负责生产消息,生产者向消息服务器发送由业务应用程序系统生成的消息,RocketMQ提供了三种方式发送消息同步、异步、单向
同步发送指消息发送方发出数据后,会在收到接收方发出的响应之后才发送下一个数据包,一般适用于重要通知消息场景,例如重要通知邮件,营销短信等,消息最可靠,如果发送失败,则进行重传,不会引起消息丢失,但可能会发出重复消息,性能比较低
同步发送
异步发送指发出数据后,不等接收方发回响应,就接着发送下一个数据包一般适用于可能链路耗时较长而对响应时间敏感的业务场景,例如用户视频上传后通知转码服务等。一般需要提供回调接口,当broker收到消息时调用回调方法,让Produce回查哪些消息发送成功,哪些发送失败,对于发送失败的消息,进行重发,对于批量消息而言,如果其中某一个消息发送失败,则需要重发这一批消息,容易引起消息重复,但是这种机制效率最高,实现也比较复杂
异步发送
单向发送指只负责发送消息而不等服务器回应且没有回调函数触发。一般适用于某些耗时非常短但对可靠性要求并不高的场景,例如日志收集
单向发送
Producer
消费者负责消费消息,它从消息服务器拉取消息并将其输入用户应用程序中,从用户应用的角度来看,消费者由两种类型,拉取型消费者和推送型消费者
拉取型消费者(Pull Consumer)主动从消息服务器拉取消息,只要批量拉取到消息,用户应用就会启动消费过程,所以poll被称为主动消费类型
Consumer
单Master采用这种方式,一旦Broker重启或宕机就会导致整个服务不可用,这种方式风险较大,所以不建议在线上使用
多Master所有消息服务器都是Master,没有Slave.这种方式的优点是配置简单,单个Master当即或重启维护对应用无影响缺点是在单台机器宕机期间,该机器上未被消费的消息在机器恢复之前不可订阅,消息的实时性会受到影响
多Master多Slave(同步双写)为每个Master都配置一个Slave,所以有多对Master-Slave,消息采用同步双写方式,主备都写成功了才返回成功,这种方式的优点是数据与服务没有单点问题,Master宕机时消息五延迟,服务于数据的可用性非常高缺点是相对异步复制的方式其性能略低,发送消息的延迟略高
多Master多Slave(异步复制)为每个Master都配置一个Slave,所以有多对Master-Slave,消息采用异步复制方式,竹北之间有毫秒级消息延迟,这种方式的优点时丢失的消息非常少,且消息的实时性不会受到影响,Master宕机后消费者可以继续从Slave消费,中间的过程对用户应用程序透明,不需要人工干预,性能同多Master方式几乎一样,缺点是Master宕机后在磁盘损坏的情况下会丢失极少量的消息
Broker
生产者组(ProducerGroup)是一类生产者地集合,这类生产者通常发送一类消息并且发送逻辑一致,所以将这些生产者分组在一起从部署结构上看,生产者通过生产者组的名字来标识自己是一个集群
Producer分组有什么用
Producer Group
Consumer分组有什么用?
Consumer Group
1.NameServer之间没有任何通信,每个都是独立的,不存在选举问题而ZooKeeper之间需要通过选举
2.ZooKeeper在CAP理论中保证的是CP,站在服务中心的角度来看,AP要比CP好,如果客户端发消息请求NameServer发送失败,重发即可,在用Zookeeper,一致性期间,NamerServer无法访问
3.Nacos通用的服务注册和参数配置,虽然Nacos是AP,但是RocketMQ中NameServer做的事情比较少,只需要保管好这些Broker的信息即可,这是一个轻量级框架
4.如果要用分布式,则必然要解决脑裂问题,当出现网络分区,选出两个Master时,则需要再次进行选举
为什么要自己实现注册中心,而不用Zookeeper
NameServer
核心组件
普通消息
将同一个订单(即具有相同的orderId)的消息按状态先后顺序消费的,所以消息生产者调用send方法发送时需要传入MessageQueueSelector接口的实现类,将orderId相同的消息放入同一个MessageQueue中,比如对orderId进行取余,消费端还需要实现MessageListenerOrderly接口用于消费有序的消息,MessageListenerConcurrently接口消费的消息是无序的
顺序消费的原理是确保将消息投递到同一个队列中,在队列内部RocketMQ保证先进先出,而同一个队列会被投递到同一个消费者实例再由消费者拉取数据进行消费。在消费者内部会维护本地队列锁,以保证当前只有一个线程能够进行消费,所拉到的消息先被存入消息处理队列中,然后再从消息处理队列中顺序获取消息用MessageListenerOrderly进行消费(这也是在顺序消费时监听消息要实现MessageListener接口)的原因
消费者端的顺序消费,需要有个前提,那就是保证Producer、Broker要保证有序,缺一不可
Broker中一个队列内的消息是可以保证有序的
消费者会从多个消息队列上去拿消息,这时虽然每个消息队列上的消息是有序的,但是多个队列之间的消息仍然是乱序的,消费者端要保证消息是有序的,就需要按队列一个一个地来取消息,即取完一个队列的消息后,再去取下一个队列的消息。而给Consumer注入的MessageListenerOrderly对象,在RocketMQ内部就会通过锁队列的方式保证消息是一个一个队列来取的,MessageListenerConcurrently这个消息监听器不会锁队列,每次都是从多个Message中取一批数据(默认不超过32条),因此也无法保证消息有序
顺序消息
延时消息
批量消息
过滤消息是在Broker端做,这样能节省网络带宽。过滤时,先进行MessageTag的HashCode比较,如果相同,再进行内容比较
过滤消息
所谓事务消息就是基于消息中间件模拟的两阶段提交(2PC),属于对消息中间件的一种特殊利用。总体思路如下:1.系统A先向消息中间件发送一条预备消息(Half Message),消息中间件在保存好消息之后向系统A发送确认消息2.系统A执行本地事务3.系统A根据本地事务执行结果再向消息中间价发送提交消息,以提交二次确认,如果消息中间件得到不到系统A的本地事务执行情况,将会执行系统A实现的本地事务回查接口4.消息中间件收到提交消息后,把预备消息标记为可投递,订阅者最终将接收到该消息
事务消息设计模型图
Producer本地事务执行和Half消息发送组成一个整体,消费者消费下游服务组成一个整体
事务消息
支持的消息类型
集群消费
广播消费
消费模式
顺序消费表示消息消费的顺序同生产者为每个消息队列发送的顺序一致,如果正在处理的全局顺序时强制性的场景,则需要保证所使用的主题只有一个消息队列
顺序消费
不再保证消息顺序,消费的最大并行数量受每个消费者客户端指定的线程池限制
并行消费
消费顺序
2.RocketMQ的其他一些核心参数
ulimit,需要进行大量的网络通信和磁盘IO
3.Linux内核参数定制。在部署RocketMQ的时候,还需要对Linux内核参数进行一定的定制
系统参数调优相关
产生的原因是发送消息时采用了多数分布式消息中间件产品提供的最少一次(at least once)的投递保障,对于这个问题最常见的解决方案就是消息消费端实现业务幂等,只要保持幂等性,不管来多少条重复消息,最后处理的结果都是一样的
保障策略有at most once 最多消费一次, at least once 最少消费一次, exactly once 刚好一次,RocketMQ不支持exactly once只有一次的模式,因为要在分布式系统下实现发送不重复并且消费不重复,将会产生非常大的开销,RocketMQ为了追求高性能并没有支持此特性其实该问题的本质时网络调用存在不确定性,即既不成功也不失败的第三种状态,所以才会产生消息重复的问题
重复消费
为什么RocketMQ不用ZooKeeper而要自己实现一个NameServer来注册?见NameServer组件介绍
Consumer分组有什么用? Producer分组的作用?见Producer/Consumer Group组件介绍
这4个环节都有丢消息的可能
哪些环节会有丢消息的可能
1.为什么要发送这个half消息?有什么用
如果没有half消息这个流程,那我们通常是会在订单系统中先完成下单,再发送消息给MQ.这时候写入消息到MQ如果失败就会非常尴尬了,而Half消息如果写入失败,我们就可以认为MQ的服务是有问题的,这时就不能通知下游服务了,我们可以在下单时给订单一个状态标记,然后等待MQ服务正常后再进行补偿操作,等MQ服务正常后重新下单通知下游服务
2.half消息如果写入失败了怎么办?
3.订单系统写数据库失败了怎么办?
在事务消息的处理机制中,未知状态的事务状态回查是由RocketMQ的Broker主动发起的,也就是如果出现了这种情况,那RocketMQ就不会回调到事务消息中回查事务状态的服务,这时,我们就可以将订单一直标记为\"新下单\"的状态。而等RocketMQ恢复后,只要存储的消息没有丢失,RocketMQ就会再次继续状态回查的流程
4.Half消息写入成功后RocketMQ挂了怎么办?
数据库方案.在订单场景下,通常会要求下单完成后,客户在一定时间内,例如10分钟内完成订单支付,支付完成后才会通知下游服务进行进一步地营销补偿?如果不适用事务消息,那通常会怎么办?最简单地方式是启动一个定时任务,每隔一段时间扫描订单表,比对未支付的订单的下单时间,将超时的订单进行回收,这种方式显然有很大问题的,需要定时扫描很庞大的一个订单信息,这对系统是个不小的压力。
延迟消息方案.更进一步的方案是什么呢?是不是就可以使用RocketMQ提供的延迟消息机制,往MQ发一个延迟一分钟的消息,消费到这个消息后去检查订单的支付状态,如果订单已经支付,就往下游发送下单的通知,而如果没有支付,就再发一个延迟1分钟的消息,最终在第是个消息时把订单回收,这个方案就不用对全部的订单表进行扫描,而只需要每次处理一个单独的订单消息
事务消息方案.利用事务消息的状态回查机制来替代定时的任务。在下单时,给Broker返回一个UNKNOWN的位置状态。而在状态回查的方法中去查询订单的支付状态。这样整个业务逻辑就会简单很多。只需要配置RocketMQ的事务消息回查次数(默认15此)和事务回查间隔时间(messageDelayLevel)就可以更优雅的完成这个支付状态检查的需求
5.下单成功后如何优雅地等待支付成功?
整体来说,在订单这个场景下,消息不丢失的问题实际上就还是转化成了下单这个业务与下游服务的业务的分布式事务一致性问题,而事务一致性问题一直依赖都是一个非常复杂的问题。而RocketMQ的事务消息机制,实际上只保证了整个事务消息的一半,他保证的是订单系统下单和发消息这两个事件的事务一致性,而对下游服务的事务并没有保证,但是即便如此,也是分布式事务的一个很好的降级方案,目前来看,也是业内最好的降级方案
6.事务消息机制的作用
1.生产者使用事务消息机制保证消息零丢失
可以简单的把RocketMQ的刷盘方式flushDiskType配置成同步刷盘就可以保证消息在刷盘过程中不会丢失了
1.同步刷盘
在使用Dledger技术搭建的RocketMQ集群中,Dledger会通过两阶段提交的方式抱着呢个文件在主从之间成功同步
简单来说,数据同步会通过两个阶段,一个是uncommited阶段,一个是commiitted阶段Leader Broker上的Dledger收到一条消息后,会标记为uncommitted状态,然后他通过自己的DledgerServer组件把这个uncommitted数据发送给Follower Broker的DledgerServer组件。接着Follower Broker的DledgerServer收到uncommitted消息之后,必须返回一个ack给Leader Broker的Dledger,如果Leader Broker收到超过半数的Follower Broker返回的ack之后,就会把消息标记为committed状态再接下来,Leader Broker上的DledgerServer就会发送committed消息给Follower Broker上的DledgerServer,让它们把消息也标记为committed状态,这样,就基于Raft协议完成了两阶段的数据同步
2.Dledger的文件同步
正常情况下,消费者端都是需要先处理本地事务,然后再给MQ一个ACK相应,这时MQ就会修改Offset,将消息标记为已消费,从而不再往其他消费者推送消息。所以在Broker的这种情况会造成服务端消息丢失,这种异步消费的方式,就有可能造成消息状态返回后消费者本地业务逻辑处理失败造成消息丢失的可能
3.消费者端不要使用异步消费机制
NameServer在RocketMQ中,扮演一个路由中心的角色,提供到Broker的路由功能。但是其实这样的路由中心这样的功能,在所有的MQ中都是需要的,Kafka使用ZooKeeper和一个作为Controller的Broker一起来提供路由服务的,整个功能是相当复杂纠结的。而RabbitMQ是由每一个Broker来提供路由服务,只有RocketMQ把这个路由中心单独抽取了出来,并独立部署,每一个NameServer都是独立的,集群中任意多的节点挂掉,都不会影响它提供的路由功能,如果集群中所有的NameServer节点都挂了呢?
有很多人就会认为生产者和消费者中都会有全部路由信息的缓存副本,那整个服务可以正常工作一段时间,当NameServer全部挂了后,胜场这和消费者是立即就无法工作了的
回到消息不丢失的问题。在这种情况下,RocketMQ相当于整个服务都不可用了,那它本身肯定无法给我们保证消息不丢失了。我们只能自己设计一个降级方案来处理这个问题了。例如再订单系统中,如果多次尝试发送RocketMQ不成功,那就只能另找地方(Redis、文件或者内存等)把订单消息缓存下来,然后起一个线程定时地扫描这些失败地订单消息,尝试往RocketMQ发送,这样等RocketMQ的服务恢复过来后,就能第一时间把这些消息重新发送出去。整个这套降级的机制,在大型互联网项目中,都是必须要有的
4.RocketMQ特有的问题,NameServer挂了如何保证消息不丢失
1.生产者使用事务消息机制
2.Broker配置同步刷盘+Dledger主从架构
3.消费者不要使用异步消费
4.整个MQ挂了之后准备降级方案
这套方案在各个环节都大量地降低了系统地处理性能以及吞吐量。在很多场景下,这套方案带来的性能损失的代价可能远大于部分消息丢失的代价。所以在使用这套方案时,要根据实际的业务情况来考虑,例如,如果针对所有服务器都在同一个机房的场景,完全可以把Broker配置成异步刷盘来提升吞吐量。而在有些对消息可靠性要求没有那么高的场景,在生产者端就可以采用其他一些更简单的方案来提升吞吐,而采用定时对账、补偿的机制来提高消息的可靠性。而如果消费者不需要进行消息存盘,那使用异步消费的机制带来的性能提升也是非常显著的。
5.RocketMQ消息零丢失方案总结
2.RocketMQ配置同步刷盘+(Dledger)Broker主从架构保证MQ主从同步时不会丢消息
RocketMQ消息零丢失方案
比如,下单完之后,需要支付成功,才会进行物流快递,不能先让物流服务执行,再支付成功
1.为什么要保证消息有序?
全局有序。整个MQ系统的所有消息岩哥按照队列先入先出顺序进行消费
局部有序。只保证一部分关键消息的消费顺序
首先我们需要分析下这个问题,在通常的业务场景中,全局有序和局部有序哪个更重要?其实在大部分的MQ业务场景,我们只需要保证局部有序就可以了,对于电商订单场景,只要保证一个订单的所有消息是有序的就可以了,全局消息的顺序并不会太关心
落地到RocketMQ。通常情况下,发送者发送消息时,会通过MessageQueue轮询的方式保证消息尽量均匀地分布到所有的MessageQueue上,而消费者也就同样需要从多个MessageQueue上消费消息。而MessageQueue是RocketMQ存储消息的最小单元,它们之间的消息都是互相隔离的,在这种情况下,是无法保证消息全局有序的,而对于局部有序的要求,只需要将有序的一组消息都存入同一个MessageQueue里,这样MessageQueue的FIFO设计天生就可以保证这一组消息的有序。RocketMQ中,剋在发送者发送消息时指定一个MessageSelector对象,让这个对象来决定消息发到哪一个MessageQueue。这样就可以保证一组有序的消息能够发到同一个MessageQueue里。
另外,通常所谓的保证Topic全局消息有序的方式,就是将Topic配置成只有一个MessageQueue队列(默认是4个)。这样天生就能保证消息全局有序,这种方式对整个Topic的消息吞吐影响是非常大的,如果这样用,基本上就没有用MQ的必要了
2.如何保证消息有序?
使用RocketMQ如何保证消息顺序
在正常情况下,使用MQ都会要尽量保证它的消息生产速度和消费速度整体上是平衡的,但是如果部分消费者系统出现故障,就会造成大量的消息积累。这类问题通常在实际工作中会出现得比较隐蔽。例如某一天一个数据库突然挂了,大家大概率就会集中处理数据库得问题,等好不容易把数据库恢复过来了,这时基于这个数据库服务得消费者程序就会积累大量的消息。或者网络波动等情况,也会导致消息大量的积累。这在一些大型的互联网项目中,消息积压的速度是相当恐怖的,所以消息积压是个需要时时关注的问题
对于消息积压,如果是RocketMQ或者Kafka还好,它们的消息积压不会对性能造成很大的影响,而如果是RabbitMQ的话,那就不太好了,大量的消息积压可以瞬间造成性能直线下滑。对于RocketMQ来说,有个最简单的方式来确定消息是否有积压。那就是使用web控制台,就能直接看到消息的积压情况,另外也可以通过mqadmin指令在后台检查各个Topic的消息延迟情况,还可以在它的${sotrePathRootDir}/config目录下落地一系列的json文件,也可以用来跟踪消息积压情况
1.如何确定RocketMQ有大量的消息积压?
如果Topic下的MessageQueue配置得是足够多的,那每个Consumer实际上会分配多个MessageQueue来进行消费。这个时候,就可以简单地通过增加Consumer的服务节点数量来加快消息的消费,等积压消息消费完了,再恢复成正常情况。最极限的情况是把Consumer的节点个数设置成跟MessageQueue的个数相同。但是如果此时再继续增加Consumer的服务节点就没有用了
如果Topic下的MessageQueue配置不够多的话,那就不能用上面这种增加Consumer节点个数的方法了这时如果要快速处理积压的消息,可以创建一个新的Topic,配置足够的多MessageQueue.然后把所有消费者节点的目标Topic转向新的Topic,并紧急上线一组新的消费者,只负责消费旧Topic中的消息,并转储到新的Topic中,这个速度是可以很快的,然后在新的Topic上,就可以通过增加消费者个数来提高消费速度了.之后再根据情况恢复成正常的情况
2.如何处理大量积压的消息
使用RocketMQ如何快速处理积压消息
RocketMQ默认提供了消息轨迹的功能,这个功能在排查问题时是非常有用的
生产实例信息
发送消息时间
消息是否发送成功
发送耗时
消费实例信息
投递时间,投递轮次
消息是否消费成功
消费耗时
消息的Topic
消息存储位置
消息的Key值
消息的Tag值
1.RocketMQ消息轨迹数据的关键属性
2.消息轨迹配置
默认情况下,消息轨迹数据是存于一个系统级别的TopicRMQ_SYS_TRACE_TOPIC,这个Topic在Broker节点启动时会自动创建出来,当然也可以自定义
3.消息轨迹数据存储
RocketMQ的消息轨迹
调整之前,调整任意一个实例的订阅关系和另一个保持一致
调整之后
订阅关系不一致
1.确认哪个消息未消费。在这时消费者至少需要手机消息id、消息key、消息发送时间段三者之一2.确认消息是否发送成功。可以通过消息id、消息key、消息时间段等任意一个条件在社区提供的RocketMQ Console查找消息。如果查到消息,说明问题在消费者自身。此时消费者可以做如下检查,确认问题:2.1 订阅的Topic和发送消息的Topic是否一致,包含大小写一致2.2 订阅关系是否一致2.3 消费代码是否抛出了异常,导致没有记录日志2.4 消费者服务器和Namesrv或者Broker是否网络通畅3.如果在第二步中没有查到消息,说明生产者没有生产成功。消息没有生产成功的问题可能是生产者自身的问题,也可能是Namesrv或者Broker问题导致消息发送失败。此时生产者可以做如下检查3.1 确认生产者服务器与Namesrv或Broker网络是否通畅3.2 检查生产者发送日志,确认生产者是否被流控3.3 检查Broker日志,确认Broker是否繁忙3.4 检查Broker日志,确认磁盘是否已满
消费者不能消费消息是最常见的问题之一,也是每个消息队列服务都会遇到的问题
订阅关系不一致和不能消费时如何排查?
常见问题
在RocketMQ的管理控制台创建Topic时,可以看到要单独设置读队列和写队列。通常在运行时,都需要设置读队列=写队列。perm字段表示Topic的权限,有三个可选项2:禁写禁订阅4: 可订阅6: 可写可订阅这其中,写队列会真实的创建对应的存储文件,负责消息写入。而读队列会记录Consumer的Offset,负责消息读取,这其实是一种读写分离的思想。RocketMQ在设置MessageQueue的路由策略时,就可以通过指向不同的队列来实现读写分离
在往写队列里写Message时,会同步写入到一个对应的读队列中
如果写队列大于读队列,就会有一部分写队列无法写入到读队列中,这一部分的消息就无法被读取,就会造成消息丢失 --消息存入了,但是读不出来
而如果反过来,写队列小于读队列,那就有一部分读队列里时没有消息写入的,如果有一个消费者被分配的时这些没有消息的读队列,那这些消费者就无法消费消息,造成消费者空转,极大的浪费性能
从这里可以看到,写队列>读队列,会造成消息丢失,写队列<读队列,又会造成消费者空转,所以,在使用时,都是要求=读队列.只有一种情况下可以考虑将读写队列设置为不一致,就是要对Topic的MessageQueue进行缩减的时候。例如原来四个队列,现在要缩减成两个队列。如果立即缩减读写队列,那么被缩减的MessageQueue上没有被消费的消息,就会丢失,这时,可以先缩减写队列,待空出来的读队列上的消息都被消费完了之后,再来缩减读队列,这样就可以比较平稳的实现队列缩减了
读队列与写队列
CommitLog
ConsumerQueue
为了消息查询提供了一种通过key或者时间区间来查询消息的方法,这种通过IndexFile来查找消息的方法不影响发送与消费消息的主流程
IndexFile
RocketMQ消息直接采用磁盘文件保存消息,默认路径在${user_home}/store目录下,这些存储目录可以在broker.conf中自行指定,存储文件主要分为三个部分
数据存盘检查点。里面主要记录commitlog文件、ConsumeQueue文件以及IndexFile文件最后一次刷盘的时间戳
checkpoint
这些文件是将RocketMQ的一些关键配置信息进行存盘保存。例如Topic配置、消费者组配置、消费者组消息偏移量Offset等等一些信息
config/*.json
这个文件是RocketMQ用来判断程序是否正常关闭的一个标识文件。正常情况下,会在启动时创建,而关闭服务时删除。但是如果遇到一些服务器宕机,或者kill -9这样一些非正常关闭服务的情况,这个abort文件就不会删除,因此RocketMQ就可以判断上一次服务是非正常关闭的,后续就会做一些数据恢复的操作
abort
另外还有几个辅助的存储文件
1.CommitLog文件存储所有消息实体。所有生产者发过来的消息,都会无差别的依次存储到commitLog文件当中。这样的好处是可以减少查找目标文件的时间,让消息以最快的速度落盘,对比Kafka存文件时,需要寻找消息所属的Partition文件,再完成写入,当Topic比较多时,这样的Partition寻址就会浪费比较多的时间,所以Kafka不太适合多Topic的场景,而RocketMQ的这种快速落盘的方式在多Topic场景下,优势就比较明显文件结构:CommitLog的文件大小是固定的,但是其中存储的每个消息单元长度是不固定的,具体格式可以参考org.apache.rokcet.store.CommitLog.正因为消息的记录大小不固定,所以RocketMQ在每次存CommitLog文件时,都会去检查当前CommitLog文件空间是否足够,如果不够的话,就重新创建一个CommitLog文件,文件名为当前消息的偏移量
3.IndexFile文件主要是辅助消息检索。消费者进行消息消费时,通过ConsumeQueue文件就足够完成消息检索了,但是如果要按照MessageId或者Messagekey来检索文件,比如RocketMQ管理控制台的消息轨迹功能,ConsumeQueue文件就不够用了,IndexFile文件就是用来辅助这类消息检索的,它的文件名比较特殊,不是以消息偏移量命名,而是用的时间命名。但是其实,它也是一个固定大小的文件,文件结构:它的文件结构由indexHeader(固定40byte) + slot(固定500w个,每个固定20Byte) + index(最多500W*4个,每个固定20Byte)三部分组成
整体的消息存储结构
消息持久化--重点
消息既然要持久化,就必须有对应的删除机制,RocketMQ内置了一套过期文件的删除机制,首先:如何判断过期文件:RocketMQ中CommitLog文件和ConsumeQueue文件都是以偏移量命名的,对于非当前写的文件,如果超过了一定的保留时间,那么这些文件都会被认为是过期文件,随时可以删除。这个保留时间就是在broker.conf中配置的fieReservedTime属性。注意,RocketMQ判断文件是否过期的唯一标准就是非当前写文件的保留时间,而并不关心文件当中的消息是否被消费过。所以,RocketMQ的消息堆积也是有时间限度的
然后:何时删除过期文件:RocketMQ内部有一个定时任务,对文件进行扫描,并且触发文件删除的操作。用户可以指定文件删除操作的执行时间.在broker.conf中deleteWhen属性指定,默认是凌晨四点
过期文件的删除
RocketMQ采用了类似于Kafka的文件存储机制,但是文件存储是一个比较重的操作,需要有非常多的设计才能保证频繁的文件读写场景下的高性能
我们知道,操作系统对于内存空间,是分为用户态和内核态的,用户态的应用程序无法直接操作硬件,需要通过内核空间进行操作转换,才能真正操作硬件。这其实是为了保护操作系统的安全,正因为如此,应用程序需要与网卡、磁盘等硬件进行数据交互时,就需要在用户态和内核态之间来回的复制数据,而这些操作,原本都需要由CPU来进行任务的分配、调度等管理步骤的,早先这些IO接口都是由CPU独立负责,所以当发生大规模的数据读写操作时,CPU的占用率会非常高,见上图
1.理解CPU拷贝和DMA拷贝
import java.util.Scanner;public class BlockDemo { public static void main(String[] args) { Scanner scanner = new Scanner(System.in); final String s= scanner.nextLine(); System.out.println(s); }}
2.mmap文件映射机制
子主题
3.sendFile机制是怎么运行的
零拷贝技术加速文件读写。零拷贝(zero-copy)是操作系统层面提供的一种加速文件读写的操作机制,非常多的开源软件都在大量使用零拷贝,来提升IO操作的性能。对于Java应用层面,对应着mmap和sendFile两种方式
顺序写加速文件写入磁盘。通常应用程序往磁盘写文件时,由于磁盘空间不是连续的,会有很多碎片,所以在写一个文件时,也就无法把一个文件卸载一块连续的磁盘空间中,而需要在磁盘多个扇区之间进行大量的随机写,这个过程中有大量的寻址操作,会严重影响写数据的性能,而顺序写机制是在磁盘中提前申请一块连续的磁盘空间,每次写数据时,就可以避免这些寻址操作,直接在之前写入的地址后面接着写就行。Kafka官方详细分析过顺序写的性能提升问题,Kafka官方曾说明,顺序写的性能基本能够达到内存级别,而如果配备固态硬盘,顺序写的性能甚至有可能超过写内存,而RocketMQ很大程度上借鉴了Kafka的这思想
PageCache是源源不断产生的,而Linux操作系统显然不可能时时刻刻往硬盘写文件,所以,操作系统只会在某些特定的时刻将PageCache写入到磁盘。例如当我们正常关机时,就会完成PageCache刷盘,另外,在Linux中,对于有数据修改的PageCache,会标记为Dirty(脏页)状态。当DirtyPage的比例达到一定的阈值时,就会触发一次刷盘操作,例如在Linux操作系统当中,可以通过/proc/meminfo文件查看到PageCache的状态
但是,只要操作系统的刷盘操作不是时时刻刻执行的,那么对于用户态的应用程序来说,那就避免不了非正常宕机时的数据丢失问题,因此,操作系统也提供了一个系统调用,应用程序可以自行调用这个系统调用,完成PageCache的强制刷盘。在Linux中时fsync(),也可以用man systemcall fsync()进行查看
同步刷盘。在返回写成功状态时,消息已经被写入磁盘。具体流程是,消息写入内存的PageCache后,立刻通知刷盘线程刷盘,然后等待刷盘完成,刷盘线程执行完后唤醒等待的线程,返回消息写成功的状态
异步刷盘。在返回写成功状态时,消息可能只是被写入了内存的PageCache,写操作的返回快,吞吐量大,当内存里的消息量积累到一定程度时,统一触发写磁盘动作,快速写入
配置方式:刷盘方式是通过Broker配置文件里的flushDiskType参数设置的,这个参数被配置成SYNC_FLUSH、ASYNC_FLUSH中的一个,同步刷盘机制会更频繁地调用fsync,所以吞吐量相比异步刷盘会降低,但是数据地安全性会得到提高
RocketMQ对于何时进行刷盘,也设计了两种刷盘机制,同步刷盘和异步刷盘
刷盘机制保证消息不丢失。在操作系统层面,当应用程序写入一个文件时,文件内容并不会直接写入到硬件当中,而是会先写入到操作系统中的一个缓存PageCache中。PageCache缓存以4K大小为单位,缓存文件的具体内容。这些写入到PageCache中的文件,在应用程序看来,是已经完全落盘保存好了的,可以正常修改、复制等等。但是,本质上PageCache依然是内存形态,所以一断电就会丢失,因此,需要将内存状态的数据写入到磁盘当中,这样数据才能真正完成持久化,断电也不会丢失这个过程就称为刷盘
高效文件写
消息主从复制
Consumer也是以MessageQueue为单位来进行负载均衡的,分为集群模式和广播模式
AllocateMachineRoomNearby将统计放的Consumer和Broker有限分配在一起。这个策略可以通过一个matchineRoomResolve对象来定制Consumer和Broker的机房解析规则。然后还需要引入另外一个分配策略来对统计放的Broker和Consumer进行分配一般也就用简单的平均分配策略或者轮询分配策略(但是比较鸡肋,直接给属性指定机房更好)
AllocateMessageQueueAveragely平均分配,将所有MessageQueue平均分给每一个消费者
AllocateMessageQueueAveragelyByCircle轮询分配。轮流地给一个消费者分配一个MessageQueue
AllocateMessageQueueByMachineRoom按逻辑机房地概念进行分配。又是对BrokerName和COnsumerId定制化地配置
ALlocateMessageQueueConsistenHash这个一致性哈希策略只需要指定一个虚拟节点数,使用一个Hash环地算法,虚拟节点是为了让Hash数据在环上分布更为均匀
集群模式
官博模式下,每一条消息都会投递给订阅了Topic的所有消费者实例,所以也就没有消息分配这一说,而在实现上,即使在Consumer分配Queue时,所有Consumer都分到所有的Queue.广播模式实现的关键是将消费者的消费偏移量不再保存到Broker当中
广播模式
负载均衡--重点
首先对于广播模式下的消息,是不存在消息重试的机制的,即消息消费失败后,会再重新进行发送,而只是继续消费新的消息,而对于普通的消息,当消费者消费失败后,可以通过设置返回状态达到消息重试的结果
如何让消息进行重试?集群消费方式下,消息消费失败后期望消息重试,需要在消息监听器接口的实现中明确进行配置。可以有三种配置方式:1.返回Action.ReconsumeLater (推荐)2.返回null3.抛出异常如果希望消费失败后不重试,可以直接返回.CommitMessage
如果消息重试16次后仍然失败,消息将不再投递,转为进入死信队列。另外一条消息无论重试多少次,这些重试消息的MessageId始终都是一样的。这个重试次数,RocketMQ可以进行定制,例如通过consumer.setMaxReconsumeTimes(20)// 将重试次数设置为20次,当定制的重试次数超过16次后,消息的重试时间间隔均为2小时
重试次数
在老版本的RocketMQ中,一条消息无论重试多少次,这些重试消息的MessgeId始终都是一样的但是在4.9.1版本中,每次重试MessageId都会重建
MessageId
消息最大重试次数的设置对相同GroupID下的所有Consumer实例有效,并且最后启动的Consuemr会覆盖之前启动的Consumer配置
配置覆盖
重试消息如何处理?重试的消息会进入一个\"%RETRY\" + ConsumerGroup的队列中,然后RocketMQ默认允许每条消息最多重试16次,每重试间隔时间如图,随着重试次数的递增,重发间隔时间也是递增的,注:消费者实例要避免只有一个,否则重试次数是没有意义的
消息重试
当一条消息消费失败,RocketMQ就会自动进行消息重试,而如果消息超过最大重试次数,RocketMQ就会认为这个消息有问题,但是此时,RocketMQ不会立刻将这个有问题的消息丢弃,而会将其发送到这个消费者组对应的一种特殊队列:死信队列死信队列的名称是\"%DLQ%+ConsumerGroup\
死信队列
幂等概念
1.发送时消息重复当一条消息已被成功发送到服务端并完成持久化,此时出现了网络闪断或者客户端宕机,导致服务端对客户端应答失败。如果此时生产者意识到消息发送失败并尝试再次发送消息,消费者后续会收到两条内容相同并且MessageId也相同的消息
2.投递时消息重复。消息消费的场景下,消息已投递到消费者并完成业务处理,当客户端给服务端反馈应答的时候网络闪断,为了保证消息至少被消费一次,消息队列RocketMQ的服务端将在网络恢复后再次投递之前已被处理过的消息,消费者后续会受到两条内容相同并且MessageId也相同的消息
消息幂等的必要性。在互联网应用种,由器在网络不稳定的情况下,消息队列RocketMQ的消息有可能会出现重复,这个重复简单可以概括为以下情况:
处理方式
消息幂等
RocketMQ高性能背后的核心原理
RocketMQ
MQ消息中间件
0 条评论
回复 删除
下一页