为什么要使用
随着业务体量扩大,微服务的设计思想,分布式的部署,拆分了很多的服务,很多场景下单机的技术栈和中间件不够使用,需要引入消息中间件
异步
将可以并行或者不需要实时同步执行的业务流程,进行异步化的改造
解耦
将不同的业务流程解耦出来,作为单独的业务模块来处理,互相不影响
削峰
大量请求流量时,将请求放到队列里,排队处理,减轻系统压力
问题
重复消费
服务的网络抖动,开发人员代码Bug,还有数据问题等都可能处理失败要求重发
接口幂等
幂等(idempotent、idempotence)是一个数学与计算机学概念,常见于抽象代数中
在编程中一个幂等操作的特点是其任意多次执行所产生的影响均与一次执行的影响相同
幂等函数,或幂等方法,是指可以使用相同参数重复执行,并能获得相同结果的函数。这些函数不会影响系统状态,也不用担心重复执行会对系统造成改变
例如,“setTrue()”函数就是一个幂等函数,无论多次执行,其结果都是一样的.更复杂的操作幂等保证是利用唯一交易号(流水号)实现
顺序消费
场景
同个业务场景下不同几个操作的消息同时过去
对同一账号的不同类型操作
RocketMQ
MessageQueueSelector
SelectMessageQueueByHash
Hash取模法
同一个订单发送到同一个队列中,再使用同步发送,只有同个订单的创建消息发送成功,再发送支付消息
队列机制,可以保证存储满足FIFO
RocketMQ仅保证顺序发送,顺序消费由消费者业务保
SelectMessageQueueByMachineRoom
SelectMessageQueueByRandom
RocketMQ的topic内的队列机制,可以保证存储满足FIFO(First Input First Output 简单说就是指先进先出),剩下的只需要消费者顺序消费即可
RocketMQ仅保证顺序发送,顺序消费由消费者业务保证
消息丢失
生产者丢失数据
主流的MQ都有确认机制或者事务机制,可以保证生产者将消息送达到MQ。 比如RabbitMQ就有事务模式和confirm模式
MQ丢失数据
一般只要开启MQ的持久化磁盘配置就能解决这个问题,写入了磁盘就放心了
消费者丢失数据
消费者丢数据一般是因为采用了自动确认消息模式。MQ收到确认消息后会删除消息,如果这时消费者异常了,那消息就没了。改用手动确认就能解决这个问题
消息堆积
生产端
一般当生产端发生积压(Broker正常的情况下)就要查看你的业务逻辑是否有异常的耗时步骤导致的。是否需要改并行化操作等
Broker端
当Broker端发生积压我们首先要查看,消息队列内存使用情况,如果有分区的的话还得看每个分区积压的消息数量差异。当每个分区的消息积压数据量相对均匀的话,我们大致可以认为是流量激增。需要在消费端做优化,或者同时需要增加Broker节点(相当于存储扩容),如果分区加压消息数量差异很大的话(有的队列满了,有的队列可能还是空闲状态),我们这时候就要检查我们的路由转发规则是否合理
消费端
在使用消息队列的时候大部分的问题都出在消费端,当消费速度小于生产速度很快就会出现积压,导致消息延迟,以至于丢失
增加消费者,增加Broker上的分区数量
分布式事务
事务
事务就是一系列操作,要么同时成功,要么同时失败, ACID 特性(原子性、一致性、隔离性、持久性)
分类
2pc(两段式提交)
分支主题
通过消息中间件协调多个系统,在两个系统操作事务的时候都锁定资源但是不提交事务,等两者都准备好了,告诉消息中间件,然后再分别提交事务
3pc(三段式提交)
TCC(Try、Confirm、Cancel)
最大努力通知
XA
本地消息表(ebay研发出的)
半消息/最终一致性(RocketMQ)
最终一致性
分支主题
业务主动方本地事务提交失败,业务被动方不会收到消息的投递
只要业务主动方本地事务执行成功,那么消息服务一定会投递消息给下游的业务被动方,并最终保证业务被动方一定能成功消费该消息(消费成功或失败,即最终一定会有一个最终态)
Half Message(半消息)
是指暂不能被Consumer消费的消息。Producer 已经把消息成功发送到了 Broker 端,但此消息被标记为暂不能投递状态,处于该种状态下的消息称为半消息。需要 Producer对消息的二次确认后,Consumer才能去消费它
rocketmq消息回查
由于网络闪断,生产者应用重启等原因。导致 Producer 端一直没有对 Half Message(半消息) 进行 二次确认。这是Brock服务器会定时扫描长期处于半消息的消息,会主动询问 Producer端 该消息的最终状态(Commit或者Rollback),该消息即为 消息回查
分支主题
A服务先发送个Half Message给Brock端,消息中携带 B服务 即将要+100元的信息。
当A服务知道Half Message发送成功后,那么开始第3步执行本地事务。
执行本地事务(会有三种情况1、执行成功。2、执行失败。3、网络等原因导致没有响应)
如果本地事务成功,那么Producer向Brock服务器发送Commit,这样B服务就可以消费该message。
如果本地事务失败,那么Producer向Brock服务器发送Rollback,那么就会直接删除上面这条半消息。
如果因为网络等原因迟迟没有返回失败还是成功,那么会执行RocketMQ的回调接口,来进行事务的回查
rocketmq消息过滤
Broker端消息过滤
在Broker中,按照Consumer的要求做过滤,优点是减少了对于Consumer无用消息的网络传输。缺点是增加了Broker的负担,实现相对复杂
Consumer端消息过滤
这种过滤方式可由应用完全自定义实现,但是缺点是很多无用的消息要传输到Consumer端
RocketMQ常见问题
RocketMQ优点
单机吞吐量:十万级
可用性:非常高,分布式架构
消息可靠性:经过参数优化配置,消息可以做到0丢失
功能支持:MQ功能较为完善,还是分布式的,扩展性好
支持10亿级别的消息堆积,不会因为堆积导致性能下降
源码是java,我们可以自己阅读源码,定制自己公司的MQ,可以掌控
天生为金融互联网领域而生,对于可靠性要求很高的场景,尤其是电商里面的订单扣款,以及业务削峰,在大量交易涌入时,后端可能无法及时处理的情况
RoketMQ在稳定性上可能更值得信赖,在现有使用的公司里
RocketMQ缺点
支持的客户端语言不多,目前是java及c++,其中c++不成熟
社区活跃度不是特别活跃那种
没有在 mq 核心中去实现JMS等接口,有些系统要迁移需要修改大量代码
消息去重
幂等性
就是用户对于同一操作发起的一次请求或者多次请求的结果是一致的,不会因为多次点击而产生了副作用,数据库的结果都是唯一的,不可变的,只要保持幂等性,不管来多少条重复消息,最后处理的结果都一样,需要业务端来实现
去重策略
保证每条消息都有唯一编号(比如唯一流水号),且保证消息处理成功与去重表的日志同时出现
建立一个消息表,拿到这个消息做数据库的insert操作。给这个消息做一个唯一主键(primary key)或者唯一约束,那么就算出现重复消费的情况,就会导致主键冲突,那么就不再处理这条消息
消息重复
QoS:Quality of Service,服务质量
最多一次(At most once)
至少一次(At least once)
仅一次( Exactly once)
网络原因闪断,ACK返回失败等等故障,确认信息没有传送到消息队列,导致消息队列不知道自己已经消费过该消息了,再次将该消息分发给其他的消费者
不同的消息队列发送的确认信息形式不同,例如RabbitMQ是发送一个ACK确认消息,RocketMQ是返回一个CONSUME_SUCCESS成功标志,Kafka实际上有个offset的概念
消息的可用性
对消息的刷盘提供了同步和异步的策略,选择同步刷盘之后,如果刷盘超时会给返回FLUSH_DISK_TIMEOUT,如果是异步刷盘不会返回刷盘相关信息
主从同步提供了同步和异步两种模式来进行复制,当然选择同步可以提升可用性,但是消息的发送RT时间会下降10%左右
采用的是混合型的存储结构,即为Broker单个实例下所有的队列共用一个日志数据文件(即为CommitLog)来存储。
而Kafka采用的是独立型的存储结构,每个队列一个文件
缺点在于,会存在较多的随机读操作,因此读的效率偏低。同时消费消息需要依赖ConsumeQueue,构建该逻辑消费队列需要一定开销
RocketMQ 刷盘实现
Broker 在消息的存取时直接操作的是内存(内存映射文件),这可以提供系统的吞吐量,但是无法避免机器掉电时数据丢失,所以需要持久化到磁盘中
刷盘的最终实现都是使用NIO中的 MappedByteBuffer.force() 将映射区的数据写入到磁盘,如果是同步刷盘的话,在Broker把消息写到CommitLog映射区后,就会等待写入完成,异步而言,只是唤醒对应的线程,不保证执行的时机
分支主题
顺序消息
生产者消费者一般需要保证顺序消息的话,可能就是一个业务场景下的,比如订单的创建、支付、发货、收货
一个topic下有多个队列,为了保证发送有序,RocketMQ提供了MessageQueueSelector队列选择机制
MessageQueueSelector
SelectMessageQueueByHash
Hash取模法
同一个订单发送到同一个队列中,再使用同步发送,只有同个订单的创建消息发送成功,再发送支付消息
队列机制,可以保证存储满足FIFO
RocketMQ仅保证顺序发送,顺序消费由消费者业务保
SelectMessageQueueByMachineRoom
SelectMessageQueueByRandom
RocketMQ的topic内的队列机制,可以保证存储满足FIFO(First Input First Output 简单说就是指先进先出),剩下的只需要消费者顺序消费即可
RocketMQ仅保证顺序发送,顺序消费由消费者业务保证
Broker的Buffer问题
Broker的Buffer通常指的是Broker中一个队列的内存Buffer大小,这类Buffer通常大小有限
RocketMQ没有内存Buffer概念,RocketMQ的队列都是持久化磁盘,数据定期清除
RocketMQ同其他MQ有非常显著的区别,RocketMQ的内存Buffer抽象成一个无限长度的队列,不管有多少数据进来都能装得下,这个无限是有前提的,Broker会定期删除过期的数据
回溯消费
回溯消费是指Consumer已经消费成功的消息,由于业务上的需求需要重新消费,要支持此功能,Broker在向Consumer投递成功消息后,消息仍然需要保留。并且重新消费一般是按照时间维度
例如由于Consumer系统故障,恢复后需要重新消费1小时前的数据,那么Broker要提供一种机制,可以按照时间维度来回退消费进度
RocketMQ支持按照时间回溯消费,时间维度精确到毫秒,可以向前回溯,也可以向后回溯
消息堆积
消息中间件的主要功能是异步解耦,还有个重要功能是挡住前端的数据洪峰,保证后端系统的稳定性,这就要求消息中间件具有一定的消息堆积能力,消息堆积分以下两种情况
消息堆积在内存Buffer,一旦超过内存Buffer,可以根据一定的丢弃策略来丢弃消息,如CORBA Notification规范中描述。适合能容忍丢弃消息的业务,这种情况消息的堆积能力主要在于内存Buffer大小,而且消息堆积后,性能下降不会太大,因为内存中数据多少对于对外提供的访问能力影响有限
消息堆积到持久化存储系统中,例如DB,KV存储,文件记录形式。 当消息不能在内存Cache命中时,要不可避免的访问磁盘,会产生大量读IO,读IO的吞吐量直接决定了消息堆积后的访问能力
评估消息堆积能力主要有以下四点
消息能堆积多少条,多少字节?即消息的堆积容量。
消息堆积后,发消息的吞吐量大小,是否会受堆积影响?
消息堆积后,正常消费的Consumer是否会受影响?
消息堆积后,访问堆积在磁盘的消息时,吞吐量有多大?
定时消息
定时消息是指消息发到Broker后,不能立刻被Consumer消费,要到特定的时间点或者等待特定的时间后才能被消费
如果要支持任意的时间精度,在Broker层面,必须要做消息排序,如果再涉及到持久化,那么消息排序要不可避免的产生巨大性能开销
RocketMQ支持定时消息,但是不支持任意时间精度,支持特定的level,例如定时5s,10s,1ms 等
RocketMQ
核心模块
rocketmq-broker:接受生产者发来的消息并存储(通过调用rocketmq-store),消费者从这里取得消息
rocketmq-client:提供发送、接受消息的客户端API
rocketmq-namesrv:NameServer,类似于Zookeeper,这里保存着消息的TopicName,队列等运行时的元信息
rocketmq-common:通用的一些类,方法,数据结构等
rocketmq-remoting:基于Netty4的client/server + fastjson序列化 + 自定义二进制协议
rocketmq-store:消息、索引存储等
rocketmq-filtersrv:消息过滤器Server,需要注意的是,要实现这种过滤,需要上传代码到MQ!(一般而言,我们利用Tag足以满足大部分的过滤需求,如果更灵活更复杂的过滤需求,可以考虑filtersrv组件)
rocketmq-tools:命令行工具
架构组成
NameServer
一个功能齐全的服务器,其角色类似Dubbo中的Zookeeper,但NameServer与Zookeeper相比更轻量。主要是因为每个NameServer节点互相之间是独立的,没有任何信息交
压力不会太大,平时主要开销是在维持心跳和提供Topic-Broker的关系数据
Broker向NameServer发心跳时, 会带上当前自己所负责的所有Topic信息,如果Topic个数太多(万级别),会导致一次心跳中,就Topic的数据就几十M,网络情况差的话, 网络传输失败,心跳失败,导致NameServer误认为Broker心跳失败
被设计成几乎无状态的,可以横向扩展,节点之间相互之间无通信,通过部署多台机器来标记自己是一个伪集群
每个 Broker 在启动的时候会到 NameServer 注册,Producer 在发送消息前会根据 Topic 到 NameServer 获取到 Broker 的路由信息,Consumer 也会定时获取 Topic 的路由信息
Broker
消息中转角色,负责存储消息,转发消息
Broker是具体提供业务的服务器,单个Broker节点与所有的NameServer节点保持长连接及心跳,并会定时将Topic信息注册到NameServer,底层的通信和连接都是基于Netty实现的
Broker负责消息存储,以Topic为纬度支持轻量级的队列,单机可以支撑上万队列规模,支持消息推拉模型
Producer
由用户进行分布式部署,消息由Producer通过多种负载均衡模式发送到Broker集群,发送低延时,支持快速失败
同步发送:同步发送指消息发送方发出数据后会在收到接收方发回响应之后才发下一个数据包。一般用于重要通知消息,例如重要通知邮件、营销短信
异步发送:异步发送指发送方发出数据后,不等接收方发回响应,接着发送下个数据包,一般用于可能链路耗时较长而对响应时间敏感的业务场景,例如用户视频上传后通知启动转码服务
单向发送:单向发送是指只负责发送消息而不等待服务器回应且没有回调函数触发,适用于某些耗时非常短但对可靠性要求并不高的场景,例如日志收集
Consumer
由用户部署,支持PUSH和PULL两种消费模式,支持集群消费和广播消息,提供实时的消息订阅机制
Pull:拉取型消费者(Pull Consumer)主动从消息服务器拉取信息,只要批量拉取到消息,用户应用就会启动消费过程,所以 Pull 称为主动消费型
Push:推送型消费者(Push Consumer)封装了消息的拉取、消费进度和其他的内部维护工作,将消息到达时执行的回调接口留给用户应用程序来实现。所以 Push 称为被动消费类型,但从实现上看还是从消息服务器中拉取消息,不同于 Pull 的是 Push 首先要注册消费监听器,当监听器处触发后才开始消费消息
分支主题
集群部署,吞吐量大,高可用
可以支持多master 模式、多master多slave异步复制模式、多 master多slave同步双写模式
消息领域模型
分支主题
Message
一条消息必须有一个主题(Topic),主题可以看做是你的信件要邮寄的地址
一条消息也可以拥有一个可选的标签(Tag)和额处的键值对,它们可以用于设置一个业务 Key 并在 Broker 上查找此消息以便在开发期间查找问题
Topic
Topic(主题)可以看做消息的规类,它是消息的第一级类型。比如一个电商系统可以分为:交易消息、物流消息等,一条消息必须有一个 Topic
Topic 与生产者和消费者的关系非常松散,一个 Topic 可以有0个、1个、多个生产者向其发送消息,一个生产者也可以同时向不同的 Topic 发送消息
一个 Topic 也可以被 0个、1个、多个消费者订阅
Tag
Tag(标签)可以看作子主题,它是消息的第二级类型,用于为用户提供额外的灵活性。使用标签,同一业务模块不同目的的消息就可以用相同 Topic 而不同的 Tag 来标识。比如交易消息又可以分为:交易创建消息、交易完成消息等,一条消息可以没有 Tag
Group
分组,一个组可以订阅多个Topic
分为ProducerGroup,ConsumerGroup,代表某一类的生产者和消费者,一般来说同一个服务可以作为Group,同一个Group一般来说发送和消费的消息都是一样的
Queue
在Kafka中叫Partition,每个Queue内部是有序的,在RocketMQ中分为读和写两种队列,一般来说读写队列数量一致,如果不一致就会出现很多问题
Message Queue
主题被划分为一个或多个子主题,即消息队列
一个 Topic 下可以设置多个消息队列,发送消息时执行该消息的 Topic ,RocketMQ 会轮询该 Topic 下的所有队列将消息发出去
消息的物理管理单位。一个Topic下可以有多个Queue,Queue的引入使得消息的存储可以分布式集群化,具有了水平扩展能力
Offset
在RocketMQ 中,所有消息队列都是持久化,长度无限的数据结构,所谓长度无限是指队列中的每个存储单元都是定长,访问其中的存储单元使用Offset 来访问,Offset 为 java long 类型,64 位,理论上在 100年内不会溢出,所以认为是长度无限,也可以认为 Message Queue 是一个长度无限的数组,Offset 就是下标
消息消费模式
Clustering(集群消费)
默认情况下就是集群消费,该模式下一个消费者集群共同消费一个主题的多个队列,一个队列只会被一个消费者消费,如果某个消费者挂掉,分组内其它消费者会接替挂掉的消费者继续消费
Broadcasting(广播消费)
广播消费消息会发给消费者组中的每一个消费者进行消费
Message Order(消息顺序)
Orderly(顺序消费)
顺序消费表示消息消费的顺序同生产者为每个消息队列发送的顺序一致,所以如果正在处理全局顺序是强制性的场景,需要确保使用的主题只有一个消息队列
Concurrently(并行消费)
并行消费不再保证消息顺序,消费的最大并行数量受每个消费者客户端指定的线程池限制
一次完整的通信流程
Producer 与 NameServer集群中的其中一个节点(随机选择)建立长连接,定期从 NameServer 获取 Topic 路由信息,并向提供 Topic 服务的 Broker Master 建立长连接,且定时向 Broker 发送心跳
Producer 只能将消息发送到 Broker master,但是 Consumer 则不一样,它同时和提供 Topic 服务的 Master 和 Slave建立长连接,既可以从 Broker Master 订阅消息,也可以从 Broker Slave 订阅消息
分支主题
NameSpace启动流程
第一步是初始化配置
创建NamesrvController实例,并开启两个定时任务:
每隔10s扫描一次Broker,移除处于不激活的Broker;
每隔10s打印一次KV配置。
第三步注册钩子函数,启动服务器并监听Broker
Broker
Broker在RocketMQ中是进行处理Producer发送消息请求,Consumer消费消息的请求,并且进行消息的持久化,以及HA策略和服务端过滤,就是集群中很重的工作都是交给了Broker进行处理
Broker模块是通过BrokerStartup进行启动的,会实例化BrokerController,并且调用其初始化方法
Consumer
分支主题
通过RebalanceService线程接收消息,10秒钟做一次基于Topic下的所有队列负载
分支主题