含义
是一个<font color="#e74f4c">队列模型的消息中间件</font>,具有<font color="#e74f4c">高性能、高可靠、高实时、分布式</font>特点;
消费形式
<font color="#e74f4c"><b>拉取式消费(Pull Consumer)</b></font>
Consumer消费的<font color="#e74f4c">一种类型</font>,应用通常<font color="#e74f4c">主动调用Consumer的拉消息方法</font>从Broker服务器拉消息、主动权由<font color="#e74f4c">应用控制</font>。<br>一旦获取了批量消息,应用就会启动消费过程。<br>
<font color="#e74f4c"><b>推动式消费(Push Consumer)</b></font>
该模式下Broker收到数据后会<font color="#e74f4c">主动推送给消费端</font>,该消费模式一般<font color="#e74f4c">实时性较高。</font>
特性
<font color="#e74f4c"><b>订阅与发布</b></font>
消息的发布是指某个生产者向<font color="#e74f4c">某个topic发送消息</font>;<br>消息的订阅是指某个消费者<font color="#e74f4c">关注了某个topic</font>中带有<font color="#e74f4c">某些tag的消息</font>,进而从该topic消费数据<br>
<font color="#e74f4c"><b>消息顺序</b></font>
消息有序指的是一类消息消费时,能按照<font color="#e74f4c">发送的顺序来消费</font>。
全局顺序
对于指定的一个Topic,所有消息按照严格的<font color="#e74f4c">先入先出(FIFO)</font>的顺序进行发布和消费。
分区顺序
对于指定的一个Topic,所有消息根据 <font color="#e74f4c">sharding key</font> 进行区块分区。<br><font color="#e74f4c">同一个分区</font>内的消息按照严格的FIFO顺序进行发布和消费。<br>
消息过滤
RocketMQ的消费者可以<font color="#e74f4c">根据Tag</font>进行消息过滤,也支持自定义属性过滤
消息可靠性
RocketMQ支持消息的高可靠。RocketMQ从3.0版本开始支持<font color="#e74f4c">同步双写</font>。<br>
至少一次
至少一次(At least Once)指每个消息必须投递一次<br>
<font color="#e74f4c">回溯消费</font>
回溯消费是指Consumer已经消费成功的消息,由于业务上需求需要<font color="#e74f4c">重新消费。<br><br></font>RocketMQ支持<font color="#e74f4c">按照时间</font>回溯消费,时间维度精确到毫秒。<br>
<font color="#e74f4c"><b>事务消息</b></font>
指<font color="#e74f4c">应用本地事务和发送消息操作</font>可以被定义到<font color="#e74f4c">全局事务</font>中,要么同时成功,要么同时失败。<br><br>RocketMQ的事务消息提供类似<font color="#e74f4c">X/Open XA</font>的分布事务功能,通过事务消息能达到分布式事务的最终一致<br>
<font color="#e74f4c"><b>定时消息</b></font>
定时消息(延迟队列)是指消息发送到broker后,不会立即被消费,等待<font color="#e74f4c">特定时间</font>投递给真正的topic。<br>broker有配置项<font color="#e74f4c">messageDelayLevel,有18个level,</font>msg.setDelayLevel(level)。<br><br>定时消息会暂存在名为<font color="#e74f4c">SCHEDULE_TOPIC_XXXX</font>的topic中,并<font color="#e74f4c">根据delayTimeLevel</font>存入特定的queue,queueId = delayTimeLevel – 1,<br>即一个queue只存相同延迟的消息,保证具有相同发送延迟的消息能够顺序消费。<br>
<font color="#e74f4c"><b>消息重试</b></font>
Consumer消费消息失败后,要提供一种重试机制,令消息再消费一次。<br><br>RocketMQ会为每个消费组都设置一个Topic名称为“<font color="#e74f4c">%RETRY%+consumerGroup</font>”的重试队列<br>(这里需要注意的是,这个Topic的重试队列是<font color="#e74f4c">针对消费组</font>,而不是针对每个Topic设置的)<br>
<b><font color="#e74f4c">消息重投</font></b><br>
生产者在发送消息时,<font color="#e74f4c">同步消息失败会重投</font>,<font color="#e74f4c">异步消息有重试</font>
设置消息重试策略
retryTimesWhenSendFailed:<font color="#e74f4c">同步发送失败重投次数</font>,默认为2,<br>不会选择上次失败的broker,尝试向<font color="#e74f4c">其他broker</font>发送,最大程度保证消息不丢。<br>
retryTimesWhenSendAsyncFailed:异<font color="#e74f4c">步发送失败重试次数</font>,异步重试<font color="#e74f4c">不会选择其他broker</font>,<br>仅在同一个broker上做重试,不保证消息不丢<br>
retryAnotherBrokerWhenNotStoreOK:<font color="#e74f4c">消息刷盘(主或备)超时或slave不可用</font>(返回状态非SEND_OK),<br>是否尝试发送到其他broker,<font color="#e74f4c">默认false</font>。十分重要消息可以开启。<br>
<b><font color="#e74f4c">流量控制</font></b>
生产者流控<br>
broker<font color="#e74f4c">通过拒绝send 请求方式</font>实现流量控制
broker每隔<font color="#e74f4c">10ms</font>检查<font color="#e74f4c">send请求队列头部请求的等待时间</font>,默认200ms, 超过,则拒绝当前send请求,返回流控<br>
<font color="#e74f4c">commitLog文件被锁时间</font>超过osPageCacheBusyTimeOutMills时,参数<font color="#e74f4c">默认为1000ms</font>,返回流控
transientStorePoolEnable == true 并且broker为<font color="#e74f4c">异步刷盘的主机</font>,且<font color="#e74f4c">transientStorePool中资源不足</font>,拒绝当前send请求,返回流控
消费者流控
消费者流控的结果是降低拉取频率<br>
消费者本地缓存<font color="#ec7270">消息数超过,</font>默认1000<br>
消费者本地缓存<font color="#e74f4c">消息大小</font>超过,默认100MB<br>
消费者本地缓存<font color="#e74f4c">消息跨度</font>超过,默认2000<br>
死信队列
死信队列用于处理无法被正常消费的消息
在RocketMQ中,可以通过使用<font color="#e74f4c">console控制台</font>对死信队列中的消息进行重发来使得<font color="#e74f4c">消费者实例再次进行消费</font>。
消息刷盘
<font color="#e74f4c">同步刷盘</font><br>
只有在消息<font color="#e74f4c">真正持久化至磁盘</font>后RocketMQ的Broker端才会真正返回给Producer端一个成功的ACK响应。
<font color="#e74f4c">异步刷盘</font><br>
能够充分利用OS的<font color="#e74f4c">PageCache</font>的优势,只要消息写入PageCache即可将成功的ACK返回给Producer端
消息刷盘采用<font color="#e74f4c">后台异步线程</font>提交的方式进行,<font color="#e74f4c">降低了读写延迟</font>,<font color="#e74f4c">提高了MQ的性能和吞吐量</font>。
集群模式
<font color="#e74f4c">单Master模式</font>
<font color="#e74f4c">多Master模式</font><br>
<font color="#e74f4c">多Master多Slave模式(异步)</font>
每个Master配置一个Slave,有多对Master-Slave,<font color="#e74f4c">HA采用异步复制方式</font>,主备有短暂消息延迟(毫秒级)
<font color="#e74f4c">多Master多Slave模式(同步)</font>
每个Master配置一个Slave,有多对Master-Slave,<font color="#e74f4c">HA采用同步双写方式</font>
<font color="#e74f4c">DLedger 集群模式</font>
RocketMQ-on-DLedger Group 是指一<font color="#e74f4c">组相同名称的 Broke</font>r,至少需要 3 个节点,通过 <font color="#e74f4c">Raft</font> 自动选举出一个 Leader,<br>其余节点 作为 Follower,并在 Leader 和 Follower 之间复制数据以保证高可用。<br>
RocketMQ-on-DLedger Group 能<font color="#e74f4c">自动容灾切换</font>,并保证数据一致
天然弊端
RocketMQ 采用一个 consumer 绑定一个或者多个 Queue 模式,<br>假如某个消费者服务器挂了,则会造成<font color="#e74f4c">部分Queue消息堆积</font><br>
基本概念
消息模型(Message Model)
RocketMQ主要由Producer、Broker、Consumer三部分组成,其中Producer负责生产消息,Consumer负责消费消息,Broker负责<font color="#e74f4c">存储消息</font>。<br>Broker在实际部署过程中<font color="#e74f4c">对应一台服务器</font>,每个Broker可以<font color="#e74f4c">存储多个Topic的消息</font>,每个Topic的消息也可以<font color="#e74f4c">分片存储于不同的Broker</font>。<br>MessageQueue用于<font color="#e74f4c">存储消息的物理地址</font>,每个<font color="#e74f4c">Topic</font>中的消息地址<font color="#e74f4c">存储于多个MessageQueue</font>中。ConsumerGroup由多个Consumer实例构成。<br>
消息生产者(Producer)
负责生产消息,一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发<font color="#e74f4c">送到broker服务器</font>。
消息消费者(Consumer)
负责消费消息,一般是后台系统负责<font color="#e74f4c">异步消费</font>。一个消息消费者会<font color="#e74f4c">从Broker服务器拉取消息</font>、并将其提供给应用程序。
主题(Topic)
表示<font color="#e74f4c">一类消息的集合</font>,每个主题包含若干条消息,每条消息只能属于一个主题,是RocketMQ进行<font color="#e74f4c">消息订阅的基本单位。</font>
代理服务器(Broker Server)
<font color="#e74f4c">消息中转</font>角色,负责<font color="#e74f4c">存储消息、转发消息</font>。代理服务器在RocketMQ系统中负责接收从生产者发送来的消息并存储、<br>同时为消费者的拉取请求作准备。代理服务器也存储消息<font color="#e74f4c">相关的元数据</font>,包括<font color="#e74f4c">消费者组、消费进度偏移、主题和队列消息</font>等。<br>
名字服务(Name Server)
名字服务<font color="#e74f4c">充当路由消息</font>的提供者。生产者或消费者能够<font color="#e74f4c">通过名字服务</font>查找<font color="#e74f4c">各主题相应的BrokerIP列表</font>。<br>多个Namesrv实例组成集群,但<font color="#e74f4c">相互独立</font>,没有信息交换。<br>
生产者组(Producer Group)
<font color="#e74f4c">同一类Producer的集合</font>,这类Producer发送<font color="#e74f4c">同一类消息</font>且<font color="#e74f4c">发送逻辑一致</font>。如果发送的是事务消息且原始生产者在<font color="#e74f4c">发送之后崩溃</font>,<br>则Broker服务器会<font color="#e74f4c">联系同一生产者组</font>的其他生产者实例以提交或回溯消费。<br>
消费者组(Consumer Group)
<font color="#e74f4c">同一类Consumer的集合</font>,这类Consumer通常消费同一类消息且消费逻辑一致。<br>消费者组使得在消息消费方面,实现<font color="#e74f4c">负载均衡和容错</font>的目标变得非常容易。<br><br>要注意的是,消费者组的消费者实例必须订阅<font color="#e74f4c">完全相同的Topic</font>。<br>
普通顺序消息(Normal Ordered Message)
消费者通过同一个消息队列(Topic分区,称作Message Queue)收到的消息是<font color="#e74f4c">有顺序的</font>,<br>不同消息队列收到的消息则可能是无顺序的。<br>
严格顺序消息(Strictly Ordered Message)
消费者收到的<font color="#e74f4c">所有消息</font>均是<font color="#e74f4c">有顺序的</font><br>
消息(Message)
消息系统所传输信息的物理载体,生产和消费数据的最小单位,<font color="#e74f4c">每条消息必须属于一个主题</font>。
RocketMQ中每个消息拥有<font color="#e74f4c">唯一的Message ID</font>,且可以携带具有<font color="#e74f4c">业务标识的Key。<br></font>系统提供了通过Message ID和Key查询消息的功能。<br>
标签(Tag)
为<font color="#e74f4c">消息设置的标志</font>,用于<font color="#e74f4c">同一主题</font>下<font color="#e74f4c">区分不同类型</font>的消息。<br>
消息发送方式
同步发送
异步发送
顺序发送
单向发送
注意:同步和异步方式均<font color="#e74f4c">需要Broker返回确认信息</font>,<font color="#e74f4c">单向发送</font>不需要
消费模式
<b><font color="#e74f4c">集群消费(Clustering)</font></b><br>
集群消费模式下,相同Consumer Group的每个Consumer实例<font color="#e74f4c">平均分摊消息</font>。
<b><font color="#e74f4c">广播消费(Broadcasting)</font></b>
广播消费模式下,相同Consumer Group的每个Consumer实例都<font color="#e74f4c">接收全量的消息</font>
消息存储整体架构
<font color="#e74f4c">CommitLog</font><br>
<font color="#e74f4c">消息主体以及元数据</font>的存储主体,存储Producer端<font color="#e74f4c">写入的消息主体内容</font>,消息内容<font color="#e74f4c">不是定长的</font>。<br>消息主要是<font color="#e74f4c">顺序写入</font>日志文件,当文件满了,写入下一个文件<br>
<font color="#e74f4c">ConsumeQueue</font>
消息消费队列,引入的目的主要是<font color="#e74f4c">提高消息消费的性能</font>,<br>
ConsumeQueue(逻辑消费队列)作为<font color="#e74f4c">消费消息的索引</font>,保存了指定Topic下的队列消息在<font color="#e74f4c">CommitLog中的起始物理偏移量offset</font>,<br><font color="#e74f4c">消息大小size</font>和<font color="#e74f4c">消息Tag的HashCode值</font>。consumequeue文件采取<font color="#e74f4c">定长设计</font>,每一个条目共<font color="#e74f4c">20</font>个字节<br>
<font color="#e74f4c">IndexFile</font>
IndexFile(索引文件)提供了一种可以通过<font color="#e74f4c">key或时间区间</font>来查询消息的方法
IndexFile的底层存储设计为在文件系统中实现<font color="#e74f4c">HashMap结构</font>,故rocketmq的索引文件其底层实现为<font color="#e74f4c">hash索引</font>。<br>
架构图
说明
RocketMQ采用的是<font color="#e74f4c">混合型的存储结构</font>,针对Producer和Consumer分别采用了<font color="#e74f4c">数据和索引部分</font>相分离的存储结构
1、Producer发送消息至Broker端,然后Broker端使用<font color="#e74f4c">同步或者异步的方式对消息刷盘持久化</font>,保存至CommitLog中
<font color="#e74f4c">集群工作流程</font><br>
<font color="#e74f4c">流程说明</font>
1、启动NameServer,<font color="#e74f4c">NameServer起来后监听端口</font>,等待Broker、Producer、Consumer连上来,相当于一个<font color="#e74f4c">路由控制中心</font>。
2、<font color="#e74f4c">Broker启动</font>,跟<font color="#e74f4c">所有的NameServer保持长连接</font>,<font color="#e74f4c">定时发送心跳包</font>。心跳包中包含当前Broker信息(IP+端口等)以及存储所有Topic信息。<br>注册成功后,NameServer集群中就有<font color="#e74f4c">Topic跟Broker的映射关系</font>。<br>
3、发送消息前,<font color="#e74f4c">先创建Topic</font>,创建Topic时需要<font color="#e74f4c">指定该Topic要存储在哪些Broke</font>r上,也可以在发送消息时自动创建Topic。
4、Producer发送消息,启动时先跟NameServer集群中的<font color="#e74f4c">其中一台建立长连接</font>,并从NameServer中<font color="#e74f4c">获取当前发送的Topic存在哪些Broker</font>上,<br><font color="#e74f4c">轮询从队列列表中选择一个队列</font>,然后与队列所在的Broker建立长连接从而向Broker发消息。<br>
5、Consumer跟Producer类似,跟<font color="#e74f4c">其中一台NameServer建立长连接</font>,获取当前订阅Topic存在哪些Broker上,<br>然后直接跟Broker建立连接通道,开始消费消息。<br>
问题
可以一直增加客户端的数量提升消费能力吗?<br>
当然不可以,因为 Queue 数量有限,客户端数量一旦达到 Queue 数量,<br>再扩容新节点无法提升消费能力,因为会有节点分配不到 Queue 而无法消费。<br>
comsumer 在启动时会和comsumer queue绑定,这个绑定策略是咋样的?
默认策略<br>
queue 个数<font color="#e74f4c">大于 Consumer个数</font>, 那么 Consumer 会<font color="#e74f4c">平均分配 queue</font>,不够平均,会<font color="#e74f4c">根据clientId排序</font>来拿<font color="#e74f4c">取余数</font><br>
queue个数<font color="#e74f4c">小于Consumer个数</font>,那么会有<font color="#e74f4c">Consumer闲置</font>,就是浪费掉了,<font color="#e74f4c">其余Consumer平均分配到queue</font>
一致性hash算法
就近原则,离的近的消费
每个消费者<font color="#e74f4c">依次消费</font>一个queue,<font color="#e74f4c">环状</font>
自定义方式