消息中间件(MQ)
2021-03-26 16:03:12 161 举报
AI智能生成
消息中间件(MQ)是一种基于异步通信、分布式的消息传递架构,用于在分布式系统中实现应用程序之间的解耦、缓冲和削峰填谷。它通过将消息发送者与接收者分离,使得系统具有更高的可扩展性、可靠性和灵活性。消息中间件通常采用发布-订阅模式,支持多种消息传输协议,如AMQP、MQTT和STOMP等。常见的消息中间件有RabbitMQ、Kafka、ActiveMQ和RocketMQ等。
作者其他创作
大纲/内容
activeMq
吞吐量低<br>
订阅形式
点对点(p2p)
广播(发布-订阅)
现状
没有经过大规模吞吐量场景验证,社区不活跃<br>
基于内存的队列,极其成熟
rocketMq
概述<br>
比rabbitMq高<br>
订阅形式
基于topic/messageTag以及按照消息类型、属性进行正则匹配的发布订阅模
现状
社区活跃度不算太高<br>
国内相对活跃
要点
优先级队列<br>Message Priority<br>
没有实现优先级队列,但可以<b>通过定义高优先级队列和低优先级队列的方式来实现</b>
顺序队列<br>message order
基本概念
生产者消息推送方式<br>
同步发送
异步发送
顺序发送
普通顺序消息
消费者通过同一个消费队列收到的消息是有顺序的,<br>不同消息队列收到的消息则可能是无顺序的。<br>
严格顺序消息
消费者收到的所有消息均是有顺序的
单向发送
消费者消费消息两种方式
pull<br>
客户端不断的轮询请求服务端,来获取新的消息。
push<br>(默认)
只要有数据Broker就会一直推,不关注消费端是否能够处理<br>
客户端与服务端建立连接后,当服务端有消息时,将消息推送到客户端。
非真正意义推送,基于长轮询实现
两种消费模式<br>
集群消费
相同Consumer Group的每个Consumer实例<b>平均分摊</b>消息(只有一个能消费)
广播消费<br>
相同Consumer Group的每个Consumer实例都接收<b>全量</b>的消息
主题(Topic)
表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是RocketMQ进行消息订阅的基本单位。
名字服务器(name server)<br>
充当路由消息的提供者。生产者或消费者能够通过Nameserver查找各主题相应的Broker IP列表。<br>多个Namesrver实例组成<font color="#f15a23"><b><u>集群,但相互独立,没有信息交换</u></b>。</font><br>
消息(Message)
每条消息必须属于一个主题。<br>RocketMQ中每个消息拥有唯一的Message ID,且可以携带具有业务标识的Key。<br>系统提供了通过Message ID和Key查询消息的功能。<br>
标签(Tag)
为消息设置的标志,用于同一主题下区分不同类型的消息。<br>来自同一业务单元的消息,可以根据不同业务目的在同一主题下设置不同标签。<br>标签能够有效地保持代码的清晰度和连贯性,并优化RocketMQ提供的查询系统。<br>消费者可以根据Tag实现对不同子主题的不同消费逻辑,实现更好的扩展性。<br>
特性<br>
消息顺序
全局顺序(严格顺序)<br>
指定的一个 Topic,所有消息按照严格的先入先出(FIFO)的顺序进行发布和消费。
适用场景
性能要求不高,所有的消息严格按照 FIFO 原则进行消息发布和消费的场景
分区顺序(普通顺序)
指定的一个 Topic,所有消息根据 sharding key 进行区块分区。<br> 同一个分区内的消息按照严格的 FIFO 顺序进行发布和消费。<br><b><font color="#f15a23">Sharding key 是顺序消息中用来区分不同分区的关键字段</font></b>,和普通消息的 Key 是完全不同的概念。 <br>
适用场景
性能要求高,以 sharding key 作为分区字段,在同一个区块中严格的按照 FIFO 原则进行消息发布和消费的场景。
消息过滤
消费者可以根据<u><b>Tag进行消息过滤,也支持自定义属性过滤</b></u>。<br>消息过滤目前是<b><font color="#f15a23">在Broker端实现</font></b>的,<br><b>优点</b>:减少了对于Consumer无用消息的网络传输;<br><b>缺点</b>:增加了Broker的负担、而且实现相对复杂。<br>
消息可靠性
影响可靠性原因<br>
1.Broker非正常关闭<br>2.Broker异常Crash<br>3.OS Crash<br>4.机器掉电,但是能立即恢复供电情况<br>5.机器无法开机(可能是cpu、主板、内存等关键设备损坏)<br>6.磁盘设备损坏<br>
对策
1)、2)、3)、4) 四种情况都属于硬件资源可立即恢复情况,<br>RocketMQ在这四种情况下能保证消息不丢,或者丢失少量数据(<b><font color="#f68b1f">依赖刷盘方式是同步还是异步</font></b>)。<br>
5),6)通过异步复制,可保证99%的消息不丢,但是仍然会有极少量的消息可能丢失。<br>通过<font color="#f15a23"><b>同步双写技术</b></font>可以完全避免单点,同步双写势必会影响性能,适合对消息可靠性要求极高的场合<br>
回溯消费
回溯消费是指Consumer已经消费成功的消息,由于业务上需求需要重新消费,<br>要支持此功能,Broker在向Consumer投递成功消息后,消息仍然需要保留。<br>
RocketMQ<b><u><font color="#f1753f">支持按照时间回溯消费,时间维度精确到毫秒</font></u></b>。
事务消息
基于“半消息“实现
基于<font color="#f15a23"><b>回查确认</b></font>的方式保证故障情况下的事物有一致性
如生产者发送“半消息”之后,发生异常或者本地事物失败等情况,一直没有确认“半消息”,<br>则rocketmq等待一定时间后会调用生产者接口回查数据已确认事物是否已经完成,从而做到事物一致性<br>
延迟队列
消息发送到broker后,不会立即被消费,等待特定时间投递给真正的topic
<b>broker有配置项messageDelayLevel</b>,<br>默认值为“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h <b>2h</b>”,<b><font color="#f68b1f">18个level</font></b>。<br>可以配置自定义messageDelayLevel<br>
<font color="#c41230" style=""><b style="">messageDelayLevel是broker的属性,不属于某个topic。</b></font><br>发消息时,设置delayLevel等级即可:msg.setDelayLevel(level)。level有以下三种情况:<br><ol><li>l<u>evel == 0,消息为非延迟消息</u></li><li><u>1<=level<=maxLevel,消息延迟特定时间,例如level==1,延迟1s</u></li><li><u>level > maxLevel,则level== maxLevel,例如level==20,延迟2h</u></li></ol>
延迟消息会暂存在名为<font color="#f15a23"><b>SCHEDULE_TOPIC_XXXX的topic中</b></font>,并根据delayTimeLevel存入<font color="#f15a23">特定的queue,queueId = delayTimeLevel – 1</font>,即<u><b><font color="#f68b1f">一个queue只存相同延迟的消息</font>,<font color="#f68b1f">保证具有相同发送延迟的消息能够顺序消费</font></b></u>。broker会调度地消费SCHEDULE_TOPIC_XXXX,将消息写入真实的topic。
定时消息会在第一次写入和调度写入真实topic时都会计数,因此发送数量、tps都会变高
RocketMQ支持定时消息,但是<u><font color="#f15a23"><b>不支持任意时间精度,支持特定的level(因为相同延时的消息放到同一个队列,如果用户自定义将产生大量队列)</b></font>,例如定时5s,10s,1m等</u>
消息重试<br>(消费者消费失败)<br>
Consumer消费消息<b><font color="#f15a23">失败后</font></b>,要提供一种<font color="#f15a23">重试</font>机制,令消息再消费一次
RocketMQ<u>会为<b><font color="#c41230">每个消费组</font></b>都设置一个Topic名称为“<font color="#f15a23"><b>%RETRY%+consumerGroup</b></font>”的重试队列</u>(这里需要注意的是,这个Topic的<u><b><font color="#c41230">重试队列是针对消费组</font>,而不是针对每个Topic设置的</b></u>),用于暂时保存因为各种异常而导致Consumer端无法消费的消息。考虑到异常恢复起来需要一些时间,会为重试队列设置多个<b>重试级别</b>,每个重试级别都有与之对应的重新投递延时,<font color="#f15a23">重试次数越多投递延时就越大</font>。RocketMQ对于重试消息的处理是先保存至Topic名称为“<font color="#f68b1f"><u><b>SCHEDULE_TOPIC_XXXX</b></u></font>”的延迟队列中,后台定时任务按照对应的时间进行Delay后重新保存至“%RETRY%+consumerGroup”的重试队列中。
消息重投<br>(生产者投递消息失败)<br>
生产者在发送消息时,同步消息失败会重投,异步消息有重试,oneway没有任何保证
消息重投保证消息尽可能发送成功、不丢失,但可能会造成消息重复,消息重复在RocketMQ中是无法避免的问题
消息重投策略
同步发送<br>retryTimesWhenSendFailed<br>
同步发送失败重投次数,<b><font color="#f15a23">默认为2</font></b>,因此生产者会最多尝试发送retryTimesWhenSendFailed + 1次。<br>不会选择上次失败的broker,<b><font color="#f15a23">尝试向其他broker发送</font></b>,最大程度保证消息不丢。<br><b>超过重投次数,抛出异常</b>,由客户端保证消息不丢。当出现RemotingException、MQClientException和部分MQBrokerException时会重投。<br>
同步双写<br>retryAnotherBrokerWhenNotStoreOK<br>
消息刷盘(主或备)超时或slave不可用(返回状态非SEND_OK),是否尝试发送到其他broker,<br>默认false。十分重要消息可以开启。<br>
异步发送<br>retryTimesWhenSendAsyncFailed<br>
异步发送失败重试次数,异步重试不会选择其他broker,<b><font color="#f15a23">仅在同一个broker上做重试</font></b>,不保证消息不丢。
流量控制
生产者流控
1.<u>commitLog文件被锁时间超过</u>osPageCacheBusyTimeOutMills时,参数默认为1000ms,返回流控。<br>2.如果开启transientStorePoolEnable == true,且broker为异步刷盘的主机,且t<u>ransientStorePool中资源不足</u>,拒绝当前send请求,返回流控。<br>3.broker每隔10ms检查<u>send请求队列头部请求的等待时间</u>,如果超过waitTimeMillsInSendQueue,默认200ms,拒绝当前send请求,返回流控。<br>4.<u>broker通过拒绝send 请求方式实现流量控制。</u>
注意
<u><b>生产者流控,不会尝试消息重投。</b>需要自己实现逻辑</u>
消费者流控
1.<b>超数</b>-消费者本地<u>缓存消息数超过</u>pullThresholdForQueue时,默认1000。<br>2.<b>超量</b>-消费者本<u>地缓存消息大小</u>超过pullThresholdSizeForQueue时,默认100MB。<br>3.<b>超时</b>-消费者本地<u>缓存消息跨度超过</u>consumeConcurrentlyMaxSpan时,默认2000。
消费者流控的结果是<u><b><font color="#f68b1f">降低拉取频率</font>。</b></u>
死信队列<br>
用于处理无法被正常消费的消息。<u><b><font color="#f15a23">达到最大重试次数后,若消费依然失败</font></b></u>,<br>则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列 不会立刻将消息丢弃,而是<u>将其发送到该消费者对应的特殊队列中</u>。<br>
可以通过使用console控制台对死信队列中的消息进行重发来使得消费者实例再次进行消费。
架构
Producer
与NameServer集群中的<b>其中一个节点(随机选择)建立<font color="#f68b1f">长连接</font></b>,<font color="#f15a23"><b>定期</b></font>从NameServer获取Topic路由信息,并<b>向提供Topic 服务的Master建立长连接</b>,且定时向Master发送心跳(<font color="#f68b1f">默认30秒</font>)。Producer完全无状态
启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取当前发送的Topic存在哪些Broker上,<u><b><font color="#f15a23">轮询</font>从队列列表中选择(可设置策略)一个队列,然后与队列所在的Broker建立<font color="#f68b1f">长连接</font>从而向Broker发消息。</b></u>
Consumer
与NameServer集群中的<b>其中一个节点(随机选择)建立长连接</b>,<font color="#f15a23"><b>定期</b></font>从NameServer获取Topic路由信息,并向提供Topic服务的Master、Slave建立<font color="#f68b1f">长连接</font>,且定时向Master、Slave发送心跳(<font color="#f68b1f">默认30秒</font>)。<br>Consumer既可以从Master订阅消息,也可以从Slave订阅消息,消费者在向Master拉取消息时,Master服务器会根据拉取偏移量与最大偏移量的距离(判断是否读老消息,产生读I/O),以及<font color="#f15a23">从服务器是否可读等因素</font><font color="#0097a7">建</font><font color="#1976d2">议下一次是从Master还是Slave拉取</font><br>
获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道
NameServer<br>
Topic路由注册中心,支持Broker的动态注册与发现
通常也是集群的方式部署,<font color="#f15a23"><u><b>各实例间相互不进行信息通讯</b></u></font>。Broker是向每一台NameServer注册自己的路由信息,所以每一个NameServer实例上面都保存一份完整的路由信息。当<u>某个NameServer因某种原因下线了,Broker仍然可以向其它NameServer同步其路由信息</u>,Producer,Consumer仍然可以动态感知Broker的路由的信息。
功能
Broker管理
接受Broker集群的注册信息并且保存下来作为路由信息的基本数据。然后提供<b>心跳检测机制</b>,检查Broker是否还存活
路由信息管理
每个NameServer将保存关于Broker集群的整个路由信息和用于客户端查询的队列信息。<br>然后Producer和Conumser通过NameServer就可以知道整个Broker集群的路由信息,从而进行消息的投递和消费<br>
<span style="color: rgb(51, 51, 51); font-family: Helvetica, Arial, freesans, sans-serif; font-size: 16px; font-style: normal; font-variant-caps: normal; font-weight: normal; letter-spacing: normal; orphans: auto; text-align: left; text-indent: 0px; text-transform: none; white-space: normal; widows: auto; word-spacing: 0px; -webkit-text-stroke-width: 0px; background-color: rgb(255, 255, 255); display: inline !important; float: none;">BrokerServer</span>
主要负责消息的存储、投递和查询以及服务高可用保证
子模块
Remoting Module
整个Broker的实体,负责处理来自clients端的请求
Client Manager
负责管理客户端(Producer/Consumer)和维护Consumer的Topic订阅信息
Store Service
提供方便简单的API接口处理消息存储到物理硬盘和查询功能。
HA Service
高可用服务,提供Master Broker 和 Slave Broker之间的数据同步功能。
Index Service
根据特定的Message key对投递到Broker的消息进行索引服务,以提供消息的快速查询。
集群部署
Master与Slave 的对应关系通过<b><font color="#f15a23">指定相同的BrokerName,不同的BrokerId </font></b>来定义,<font color="#f15a23"><u><b>BrokerId为0表示Master,非0表示Slave</b></u></font>;<br><b><font color="#c41230">Master也可以部署多个。</font></b><br>每个Broker与NameServer集群中的所有节点建立长连接,定时注册Topic信息到所有NameServer<br>
目前在部署架构上支持一Master多Slave,但<font color="#f15a23"><b>只有BrokerId=1的从服务器才会参与消息的读负载</b></font>
架构设计
存储架构
文件系统
CommitLog
消息主体以及元数据的存储主体,存储Producer端写入的消息主体内容;<br>消息内容不是定长的<br>
单个文件大小默认<font color="#f15a23">1G </font>,<u>文件名长度为<font color="#ff0000">20</font>位,左边补零,剩余为<font color="#ff0000">起始偏移量</font></u>
比如<br>00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为1G=1073741824;<br>当第一个文件写满了,第二个文件为00000000001073741824,起始偏移量为1073741824,以此类推<br>
消息主要是<font color="#f15a23">顺序写入日志文件</font>,当文件满了,写入下一个文件
ConsumeQueue
作为消费消息的索引,保存了指定<b>Topic下的队列消息在<font color="#ff0000">CommitLog中的起始物理偏移量offset</font></b>,<font color="#ff0000">消<b>息大小size</b></font>和<b><font color="#ff0000">消息Tag的HashCode值</font>;</b><br>consumequeue文件可以看成是基于topic的commitlog索引文件<br>
consumequeue文件夹的组织方式如下:<font color="#f15a23">topic/queue/file三层结构</font>,具体存储路径为:$HOME/store/consumequeue/{topic}/{queueId}/{fileName}
文件采取定长设计,<b><font color="#f15a23">每一个条目共</font><font color="#ff0000">20</font><font color="#f15a23">个字节</font></b>,分别为<font color="#f15a23">8字节的commitlog物理偏移量</font>、<font color="#f15a23">4字节的消息长度</font>、<font color="#f15a23">8字节tag hashcode,</font>单个文件由<b><font color="#f15a23">30W个条目</font></b>组成,可以像数组一样<b>随机访问</b>每一个条目,每个ConsumeQueue文件大小约5.72M
Index File<br>
提供了一种可以<b><font color="#f68b1f">通过key或时间区间</font></b>来查询消息的方法
存储位置:$HOME \store\index${fileName}
文件名fileName:以创建时的时间戳命名的
单个文件大小约为400M,可以保存 2000W个索引
底层存储设计为在文件系统中实现<b><font color="#f68b1f">HashMap结构</font></b>,故底层实现为<b><font color="#f68b1f">hash索引</font></b>。
页面缓存与内存映射<br>
页面缓存
<font color="#ff0000">对文件进行顺序读写的速度几乎接近于内存的读写速度;</font><br>主要原因OS使用PageCache机制对读写访问操作进行了性能优化,将一部分的内存用作PageCache(<b>预读取</b>)<br>
写数据
OS会先写入至Cache内,<br>随后通过异步的方式由pdflush内核线程将Cache内的数据刷盘至物理磁盘上<br>
读数据
一次读取文件时出现未命中PageCache的情况,<br>OS从物理磁盘上访问读取文件的同时,会顺序<font color="#f15a23"><u><b>对其他相邻块的数据文件进行预读取</b></u></font><br>
ConsumeQueue逻辑消费队列存储的数据较少,并且是顺序读取
<u><b>CommitLog消息存储的日志数据文件来说,读取消息内容时候会产生较多的随机访问读取,严重影响性能</b></u>
优化方向:选择合适的系统IO调度算法
RocketMQ主要通过MappedByteBuffer对文件进行读写操作
利用了NIO中的FileChannel模型将<u><font color="#f57f17">磁盘上的物理文件直接映射到用户态的内存地址中</font></u>(<b>减少内存拷贝</b>)
将对文件的操作转化为直接对内存地址进行操作,从而极大地提高了文件的读写效率<br>(正因为需要使用内存映射机制,故RocketMQ的<font color="#f57c00" style="">文件存储都使用定长结构来存储,方便一次将整个文件映射至内存</font>)<br>
消息刷盘
同步刷盘<br>(性能损耗大)<br>
在消息真正持久化至磁盘后RocketMQ的Broker端才会真正返回给Producer端一个成功的ACK响应
异步刷盘<br>(极端情况会丢失数据)
能够充分利用OS的PageCache的优势,只要消息写入<b>PageCache</b>即可将成功的ACK返回给Producer端。<br>消息刷盘采用后台异步线程提交的方式进行,降低了读写延迟,提高了MQ的性能和吞吐量。<br>
通讯机制
概述
(1) Broker启动后需要完成一次将自己注册至NameServer的操作;随后<b><font color="#f68b1f">每隔30s</font>时间</b>定时向NameServer上报Topic路由信息。
(2) 消息生产者Producer作为客户端发送消息时候,需要根据消息的Topic从<b>本地缓存的TopicPublishInfoTable获取路由信息</b>。如果没有则更新路由信息会从NameServer上重新拉取,同时Producer会默认每隔<b><font color="#f68b1f">30s</font></b>向NameServer拉取一次路由信息。
(3) 消息生产者Producer根据2)中获取的路由信息选择一个队列(MessageQueue)进行消息发送;Broker作为消息的接收者接收消息并落盘存储。
(4) 消息消费者Consumer根据2)中获取的路由信息,并再完成客户端的负载均衡后,选择其中的某一个或者某几个消息队列来拉取消息并进行消费。
RocketMQ消息队列自定义了通信协议并在Netty的基础之上扩展了通信模块
Reactor多线程设计
消息过滤
Consumer端订阅消息时再做消息过滤的
2种的过滤方式
Tag过滤方式
Consumer端在订阅消息时除了指定Topic还可以指定TAG,如果一个消息有<font color="#f68b1f">多个TAG,可以用||分隔</font>。<br>其中,Consumer端会将这个订阅请求构建成一个 SubscriptionData,发送一个Pull消息的请求给Broker端。<br>Broker端从RocketMQ的文件存储层—Store读取数据之前,会用这些数据先构建一个MessageFilter,然后传给Store。<br>Store从 ConsumeQueue读取到一条记录后,会用它记录的消息tag hash值去做过滤,由于在服务端只是<u><b><font color="#c41230">根据hashcode进行判断,无法精确对tag原始字符串进行过滤</font></b></u>,故在消息消费端<font color="#c41230">拉取到消息后,还需要对消息的原始tag字符串进行比对</font>,如果不同,则丢弃该消息,不进行消息消费。<br>
SQL92的过滤方式
和Tag过滤方式一样,只是在Store层的具体过滤过程不太一样,真正的 SQL expression 的构建和执行由<b>rocketmq-filter模块负责</b>的。<br>每次过滤都去执行SQL表达式会影响效率,<u>所以RocketMQ使用了<font color="#f15a23"><b>BloomFilter</b></font>避免了每次都去执行</u>。SQL92的表达式上下文为消息的属性。<br>
负载均衡
Producer的负载均衡
发送消息的时候,会先根据Topic找到指定的TopicPublishInfo,在获取了TopicPublishInfo路由信息后,<br>RocketMQ的客户端在默认方式下selectOneMessageQueue()方法会从TopicPublishInfo中的messageQueueList<b>中选择一个队列(MessageQueue)进行发送消息</b><br>
容错策略均在MQFaultStrategy这个类中定义
有一个sendLatencyFaultEnable开关变量,<br>如果开启,在<u><font color="#ff0000">随机递增取模的基础上,再过滤掉not available的Broker代理</font></u>。<br>所谓的"latencyFaultTolerance",是指<font color="#ff0000">对之前失败的,按一定的时间做退避。</font><br>
默认轮询发送消息到consumer queue上
Consumer的负载均衡
<font color="#f15a23"><b>两种消费模式(Push/Pull)都是基于拉模式来获取消息的</b></font>,而在Push模式只是对pull模式的一种封装,其本质实现为消息拉取线程在从服务器拉取到一批消息后,然后提交到消息消费线程池后,又“马不停蹄”的继续向服务器再次尝试拉取消息。如果未拉取到消息,则延迟一下又继续拉取
<font color="#e65100">每个consumer平均分配consumer queue</font>
所有负载均衡都由客户端完成
集群模式下,queue都是只允许分配只一个实例,这是由于如果多个实例同时消费一个queue的消息,由于拉取哪些消息是consumer主动控制的,那样会导致同一个消息在不同的实例下被消费多次,所以算法上都是<b><font color="#ff0000">一个queue只分给一个consumer实例</font></b>,一个consumer实例可以允许同时分到不同的queue
事物消息
采用2PC模式实现事务,增加一个补偿逻辑来处理二阶段超时或失败的情况<br>
1.提交“半消息”到队列,内部实现为将该消息放到一个<font color="#f15a23"><u><b>特殊的topic</b></u></font>下,使其cumsumer不可见;<br>2.commit的时候直接修改message的topic索引即可;<br>3.r<font color="#f15a23"><b>ollback不是真正的删除message,</b></font>由于顺序读写的方式,无法真正删除,<font color="#f15a23">只是修改message的一个特殊标志位</font>,标识改message作废。<br>
流程
消息发送和提交逻辑<br>
(1) 发送消息(half消息)。<br>(2) 服务端响应消息写入结果。<br>(3) 根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行)。<br>(4) 根据本地事务状态执行Commit或者Rollback(Commit操作生成消息索引,消息对消费者可见)
补偿流程<br>(解决消息Commit或者Rollback发生超时或者失败的情况)<br>
(1) 对没有Commit/Rollback的事务消息(pending状态的消息),从服务端发起一次“回查”<br>(2) Producer收到回查消息,检查回查消息对应的本地事务的状态<br>(3) 根据本地事务状态,重新Commit或者Rollback
<font color="#c41230">默认回查<u><b>15次</b></u>,如果15次回查还是无法得知事务状态,rocketmq默认回滚该消息</font>
消息查询
按照MessageId查询
MessageId的长度总共有16字节,其中包含了消息存储<font color="#f68b1f"><b>主机地址(IP地址和端口)</b>,消息<b>Commit Log offset</b></font>。
Client端从MessageId中解析出Broker的地址(IP地址和端口)和Commit Log的偏移地址后封装成一个RPC请求后通过Remoting通信层发送请求;<br>读取消息的过程用其中的 commitLog offset 和 size 去 commitLog 中找到真正的记录并解析成一个完整的消息返回
按照MessageKey查询
基于RocketMQ的<font color="#f15a23">IndexFile索引文件</font>来实现的
过期消息清理<br>
<font color="#f15a23"><b>4.6版本默认48小时后会删除不再使用的CommitLog文件</b></font>
检查这个文件最后访问时间;判断是否大于过期时间指定时间删除;<br><b>默认凌晨4点</b><br>
<font color="#f15a23"><b>未被消费的消息不会存在超时删除这情况。</b></font>
kafka<br>
特性
高吞吐、低延迟
每秒几十万条消息,延迟最低只有几毫秒
可扩展
kafka集群支持热扩展
持久性、可靠性
消息被持久化到本地磁盘,并且支持数据备份防止丢失
<div><span style="font-size: inherit;">容错性</span><br></div>
允许集群中节点失败
高并发
支持数千个客户端同时读写
比rocketMq高<br>
要点
至此以集合未单位进行发送消息,在此基础上,kafka还<font color="#f44336"><b><u>支持对消息集合进行压缩,减少传输的数据量,减少对网络传输的压力</u></b></font>
<font color="#e57373">消费端采用拉取的方式消费</font><br>
顺序性
Kafka只保证一个Partition内的消息的有序性。
现状
大数据领域的实时计算,日志采集的行业标准<br>
消息可靠性
At most once
消息可能丢失,但绝不会重复传输
At least one
消息绝对不会丢失,但可能会重复传输
Exactly once
每条消息肯定会被传输一次且仅传输一次
架构
zookeeper协调控制
管理broker与consumer的动态加入与离开。
触发负载均衡
维护消费关系及每个partition的消费信息
<u><i>Kafka会为每一个Consumer Group保留一些metadata信息——当前消费的消息的position,也即offset。记录在zk中</i></u>
broker<br>
controller<br>(Leader)
选举
所有Broker节点再zookeeper上注册一个临时节点,<br>唯一成功的哪个成为leader,称为Broker Controller, 其他的称为Broker follower<br>
当leader Partition分区宕掉之后,由Broker Controller负责在有用分区(ISR)中选择新的Leader<br>
可以动态增加broker
无状态
没有副本机制,不保存订阅者状态,由订阅者自己保存
Topic
Partition
概述
topic中的数据分割为一个或多个partition
每个topic至少有一个partition
物理上一个partion对应一个文件夹,该文件夹存储所有消息和索引文件
物理上每个partition中的数据使用多个<b>segment文件存储</b>
提供两种策略删除旧数据<br>
基于时间删除
基于文件大小删除<br>
partition中的数据是有序的,不同partition间的数据丢失了数据的顺序;<br>需要严格保证消息的消费顺序的场景下,需要将partition数目设为1<br>
角色
leader
每个partition有多个副本,其中有且<b><font color="#f15a23">仅有一个作为Leader</font></b>,Leader是当前负责数据的读写的partition。
Leader Election算法<br>(有两种方式实现基于ZooKeeper的分布式锁)
临时顺序节点
所有客户端在某个目录下创建自己的临时顺序节点,只有序号最小的才获得锁
节点名称唯一性
多个客户端创建一个节点,只有成功创建节点的客户端才能获得锁
<u><i><b>Kafka 0.8.*的Leader Election方案解决了上述问题,<font color="#c41230">它在所有broker中选出一个controller</font>,<font color="#c41230">所有Partition的Leader选举都由controller决定</font>。<br>controller会将Leader的改变直接通过RPC的方式(比ZooKeeper Queue的方式更高效)通知需为此作为响应的Broker。<br><font color="#c41230">同时controller也负责增删Topic以及Replica的重新分配。</font></b></i></u><br>
ISR(in-sync replicas)
leader负责跟踪所有follower的状态
follower
Follower跟随Leader,所有写请求都通过Leader路由,数据变更会广播给所有Follower
当Follower与Leader挂掉、卡住或者同步太慢,leader会把这个follower从“in sync replicas”<b><font color="#f15a23">(ISR)列表</font></b>中删除,重新创建一个Follower
从Leader中复制数据,不与producer/consumer交互
在broker上的分配策略
1.将所有Broker(假设共n个Broker)和待分配的Partition排序<br>2.将第i个Partition分配到第(i mod n)个Broker上<br>3.将第i个Partition的第j个Replica分配到第((i + j) mode n)个Broker上
消息同步传输策略
Producer只将该消息发送到该Partition的Leader。Leader会将该消息写入其本地Log。<br>每个Follower都从Leader pull数据。Follower在收到该消息并写入其Log后,向Leader发送ACK。<br>一旦Leader收到了ISR中的所有Replica的ACK,该消息就被认为已经commit了,Leader将向Producer发送ACK<br>
为了提高性能
每个Follower在接收到数据后就立马向Leader发送ACK,而非等到数据写入Log中
replication是否“活着”包含两个条件
一是它必须<b>维护与ZooKeeper的session</b>(这个通过ZooKeeper的Heartbeat机制来实现)
二是Follower必须能够<b>及时将Leader的消息复制过来</b>,不能“落后太多”
Producer
可靠性投递<br>partition ack
ack = -1
表示需要所有partition成功,才返回成功
ack = 0
表示发送即代表发送成功,不等broker返回确认信息
ack = 1
producer写道partition leader成功后,broker就返回
ack = 2
表示有2个partition成功,就返回成功
producer路由<br>
1、 <b>指定</b>了 partition,则直接使用;<br>2、 未指定 partition 但<b>指定 key,通过对 key 的 value 进行hash </b>选出一个 patition<br>3、 partition 和 key 都未指定,使用<b>轮询</b>选出一个 patition。
Consumer
消息确认方式
手动commit
zookeeper中保存该Consumer在该Partition中读取消息的offset
自动commit
采用pull的消费模式,可以逐条消费也可以批量消费<br>
Consumer Group
每个Consumer属于一个特定的Consumer Group
可为<b>每个Consumer</b>指定group name,<b>若不指定group name则属于默认的group</b>
message
消息有一个定长的header和边长的字节数组组成
单个消息的大小无限制
推荐消息大小不要超过1MB
如果某个Partition的所有Replica都宕机了,就无法保证数据不丢失了。这种情况下有两种可行的方案
等待ISR中的任一个Replica“活”过来,并且选它作为Leader
可能回永远起不起来
选择第一个“活”过来的Replica(不一定是ISR中的)作为Leader
将会丢失没有同步到的数据
概述
优点
异步处理、应用解耦、流量消峰
缺点
系统可用性减低
需要依赖中间件服务(存在该服务崩溃的风险)<br>
系统复杂度提高
如果保证数据一致性(事务)<br>
如何保证不被重复消费<br>
常见问题
消息的顺序问题
方案一
解决方案<br>
保证生产者-mqServer-消费者一对一关系<br>
缺点<br>
并行度低<br>
只要消费端出现问题,就会导致整个处理流程阻塞
方案二<br>
通过合理的设计或者将问题分解来规避
不关注乱序的应用实际大量存在
从业务层面来保证消息的顺序而不仅仅是依赖于消息系统,是一种更合理的方式。
消息的重复问题<br>
根本原因
网络不达
解决办法<br>
消费者保证幂等性
利用一张日志表来记录已经处理成功的消息的 ID,如果新到的消息 ID 已经在日志表中,那么就不再处理这条消息。
消息挤压
临时紧急扩容
可以给消息设置过期时间
rabbitMq、kafka支持 AMQP协议,可以集成企业级消息总线,可以和spring bus集成
rabbitMq
概述
性能比activeMq高<br>
现状
erlang语言阻碍了二次开发,不过项目稳定,社区活跃
要点<br>
<font color="#f15a23">可以控制每一次给消费者消费的消息数量(prefetchCount=???),控制消费者压力</font><br>
消费端,采用推送的方式消费
基本概念<br>
<ul><li><b>Broker:</b> 简单来说就是消息队列服务器实体</li><li><b>Exchange:</b> 消息交换机,它指定消息按什么规则,路由到哪个队列</li><li><b>Queue:</b> 消息队列载体,每个消息都会被投入到一个或多个队列</li><li><b>Binding:</b> 绑定,它的作用就是把exchange和queue按照路由规则绑定起来 <b><font color="#f15a23">(最大长度255字节)</font></b></li><li><b>Routing Key:</b> 路由关键字,exchange根据这个关键字进行消息投递<font color="#f15a23"><b>(最大长度255字节)</b></font></li><li><b>VHost:</b> vhost 可以理解为虚拟 broker ,即 mini-RabbitMQ server。其内部均含有独立的 queue、exchange 和 binding 等,但最最重要的是,其拥有<b><font color="#f68b1f">独立的权限系统</font></b>,可以做到 vhost 范围的用户控制。当然,从 RabbitMQ 的全局角度,vhost 可以作为不同权限隔离的手段(一个典型的例子就是不同的应用可以跑在不同的 vhost 中)。</li><li><b>Producer:</b> 消息生产者,就是投递消息的程序</li><li><b>Consumer:</b> 消息消费者,就是接受消息的程序</li><li><b>Channel:</b> 消息通道,在客户端的<font color="#f57c00">每个连接里,可建立多个channel</font>,每个channel代表一个会话任务</li></ul>
顺序性保证
一个queue 一个 consumer<br>
消息挤压解决办法<br>
临时紧急扩容<br>
Mq消息失效<br>
可以设置消息的过期时间也就是TTL<br>
传输协议<br>
TCP协议连接和销毁成本大,rabbitMq采用信道方式传输数据,<br><b><font color="#f15a23">信道</font></b>是创建在TCP连接内的虚拟连接,且<b><font color="#f44336">每条TCP连接上的信道连接没有限制</font></b><br>
交换器
交换器类型
direct
<b>路由键完全匹配</b>,消息被投递到对应的队列,每个amqp的实现都<b><u>必须有一个direct交换器,包含一个空白字符串名称的默认交换器。<br>声明一个队列时,会自动绑定到默认交换器,并且以队列名称作为路由键</u></b>:channel->basic_public($msg,’ ’,’queue-name’)<br>
fanout
消息广播到绑定的队列,<font color="#e57373">不处理路由键</font>
topic
通过使用“*”和“#”,使来自不同源头的消息到达同一个队列,”.”将路由键分为了几个标识符,<u>“*”匹配1个</u>,“<u>#”匹配一个或多个</u>
headers
header模式取消routingkey,使用header中的 key/value(键值对)匹配队列
备用交换器
在<u>第一次声明交换器时被指定</u>,用来提供一种预先存在的交换器,如果主交换器无法路由消息,那么消息将被路由到这个新的备用交换器。
备用交换器就是<b><font color="#f68b1f">普通的交换器,没有任何特殊的地方。</font></b><br>
使用备用交换器,向往常一样,声明Queue和备用交换器,把Queue绑定到备用交换器上。<br>然后在<u><b>声明主交换器时,通过交换器的参数,<font color="#00bcd4">alternate-exchange</font>,,将备用交换器设置给主交换器。</b></u><br>
死信交换器DLX<br>
<font color="#f15a23">投递消息被拒绝后的一个可选行为</font>,往往用在对问题消息的诊断上。
死信交换器<font color="#f15a23">仍然只是一个普通的交换器,创建时并没有特别要求和操作</font>。<br>在<b>创建队列</b>的时候,<b>声明该交换器将用作保存被拒绝的消息</b>即可,<br>相关的参数是<font color="#00bcd4">x-dead-letter-exchange</font><font color="#f15a23">。</font><br>
包含情况
消息<b><font color="#f44336">被拒绝</font></b>,并且设置 requeue 参数为 false
消息<b><font color="#ff0000">过期</font></b>
<b><font color="#ff0000">队列达到最大长度</font></b>
<b>队列</b>
临时队列<br>
自动删除队列
当最后一个消费者也断开连接时,队列将会被删除
属性auto-delete标识为true即可
单消费者队列<br>
普通队列允许的消费者没有限制,多个消费者绑定到多个队列时,RabbitMQ会采用轮询进行投递。<br>如果需要<b><font color="#0097a7">消费者独占队列</font></b>,在队列创建的时候,设定属性exclusive为true。<br>
自动过期队列<br>
指队列在超过一定时间没使用,会被从RabbitMQ中被删除。
过期的定义
<ul><li><u>一定时间内没有Get操作发生</u></li><li><u>没有Consumer连接在队列上</u></li><li><u>特别的:就算一直有消息进入队列,也不算队列在被使用。</u></li><li><u>通过声明队列时,设定x-expires参数即可,单位毫秒。</u></li></ul>
永久队列<br>(普通队列)
队列的持久性<br>
持久化队列会被<b><font color="#f68b1f">保存在磁盘</font></b>中,固定并持久的存储,<br>当Rabbit服务重启后,该队列会保持原来的状态在RabbitMQ中被管理<br>
队列级别消息过期<br>
为每个队列设置消息的超时时间。<br>只要给队列设置x-message-ttl 参数,就设定了该队列所有消息的存活时间,时间单位是毫秒。<br><b><font color="#f1753f">如果声明队列时指定了死信交换器,则过期消息会成为死信消息</font></b>。<br>
<b>消息</b>
存活时间
当队列消息的TTL 和消息TTL都被设置,<b><font color="#0097a7">时间短的TTL设置生效</font></b>。<br><i><font color="#f68b1f">如果将一个过期消息发送给RabbitMQ,该消息不会路由到任何队列,而是直接丢弃</font></i>。<br>
RabbitMQ<font color="#ff0000">只对处于<b style=""><u style="">队头</u></b>的消息判断是否过期</font>(即<b><u>延迟判断,不会扫描队列</u></b>)
持久化
将队列和交换器的durable属性设为true,缺省为false,<br>还需要将消息在发布前,将投递模式设置为2。<b><font color="#c41230">消息要持久化,必须要有持久化的队列、交换器和投递模式都为2</font></b>。<br>
消息发送方式
1.<b>不做任何配置</b><br>
生产者不知道消费者是否真正到达rabbitMq服务器<br>
2.<b>失败通知</b>
失败才通知生产者,成功则不通知
3.<b>事务</b>
主要是对信道的设置,<br>分为:启动事务、提交事务、回滚事务;<br>会伴随<font color="#e65100">严重的性能问题</font><br>
4.<b>发送确认模式</b>;
该模式比事务模式轻,性能消耗几乎不计<br>
<b>消息不可路由时</b><br>
<b>消息可路由时</b><br>
消息获取方式
拉取Get
属于一种轮询模型,发送一次get请求,获得一个消息
如果没有消息会返回一个表示为空的回复
性能低,即使没有数据也要不断的盲循
推送Consume<br>
注册一个消费者后,RabbitMQ会在消息可用时,自动将消息进行推送给消费者
<b>Qos预取模式</b><br>(批量消费)
在确认消息被接收之前,消费者可以<font color="#f15a23">预先要求接收一定数量的消息,在处理完一定数量的消息后,批量进行确认</font>。<br>如果消费者应用程序在确认消息之前崩溃,则所有未确认的消息将被重新发送给其他消费者。<br>
消息拒绝策略
Reject<br>(单个拒绝)
在拒绝消息时,可以<font color="#f15a23">使用requeue标识,告诉RabbitMQ是否需要重新发送给别的消费者</font>。<br>不重新发送,一般这个消息就会被RabbitMQ丢弃。<b><font color="#f68b1f">Reject一次只能拒绝一条消息</font></b>。<br>
Nack<br>(批量拒绝)
<font color="#f68b1f">对Reject的扩展,可以一次性拒绝多个消息</font>
可靠性传输
消息必达性保证<br>
发送方确认模式
消息到达队列的确认<br>
接收方确认机制
接收方确认消息
数据丢失原因<br>
生产者丢失<br>
消息列表丢失<br>
消费者丢失
解决办法<br>
transaction和confirm模式来确保生产者不丢消息
失败通知
它只会让RabbitMQ向你<b><font color="#f15a23">通知失败,而不会通知成功</font></b>。<br>如果消息正确路由到队列,则发布者不会受到任何通知。<br>
发送消息时设置mandatory标志 :<b><font color="#c41230"> mandatory=true</font></b><br>
缺点: 无法确保发布消息一定是成功的,因为<b><font color="#f68b1f">通知失败的消息可能会丢失</font></b>。
<b>transaction机制</b><br>
发送消息前,开启事务(channel.txSelect()),然后发送消息,如果发送过程中出现什么异常,事务就会回滚(channel.txRollback()),<br>缺点:<font color="#f15a23">吞吐量下降; </font><font color="#000000">一般选择使用confirm模式替代</font>
<b>confirm模式</b><br>
所有在该信道上发布的消息都将会被指派一个唯一的ID(从1开始),<br>一旦消息被投递到所有匹配的队列之后;rabbitMQ就会发送一个ACK给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了;如果rabbitMQ没能处理该消息,则会发送一个Nack消息给你,你可以进行重试操作。<br>
三种实现方式<br>
channel.waitForConfirms()<b><font color="#c41230">普通</font></b>发送方确认模式;消息到达交换器,就会返回true。
channel.waitForConfirmsOrDie()<b><font color="#c41230">批量</font></b>确认模式;使用同步方式等所有的消息发送之后才会执行后面代码,<br>只要有一个消息未到达交换器就会抛出IOException异常。<br>
channel.addConfirmListener()<b><font color="#c41230">异步监听</font></b>发送方确认模式;
消息队列持久化
开启消息接收确认模式,处理消息成功后,手动回复确认消息
工作模式<br>
Simple模式(最简单的收发模式)<br>
点对点模式
一个生产者,一个消费者
Work工作模式(资源的竞争)
一个生产者,多个消费者,每个消费者获取到的消息唯一<br>(竞争消费者模式(<b><font color="#f15a23">默认循环</font>发送给每个消费者)</b>)<br>
Publish/Subscribe发布订阅模式(共享资源)<br>
一个生产者发送的消息会被多个消费者获取(发布一次,消费多个)
Routing路由模式<br>
发送消息到交换机并且要指定路由key ,消费者将队列绑定到交换机时需要指定路由key
Topic主题模式(路由模式的一种)<br>
将路由键和某模式进行匹配,此时队列需要绑定在一个模式上,<br>“#”匹配一个词或多个词,“*”只匹配一个词<br>
rpc
集群
主备模式(Warren)<br>
主节点提供读写;从<font color="#f57c00">节点不提供服务,只备份数据,当主节点不可用时,完成主从切换</font>
适用<br>
并发与数据量都不高的情况
普通集群
多台机器上启动多个 RabbitMQ 实例,每个机器启动一个。<br>创建的 queue,只会放在一个 RabbitMQ 实例上,<br>只是<u><b>每个实例都同步 queue 的<font color="#c41230">元数据</font></b></u>(元数据可以认为是 queue 的一些配置信息,通过元数据,可以找到 queue 所在实例)。<br><u>消费的时候,实际上如果连接到了另外一个实例,那个实例会从 queue 所在实例上拉取数据过来。</u><br>这方案主要是提高吞吐量的,让集群中多个节点来服务某个 queue 的读写操作<br>
镜像模式(mirro)
<br>
保证数据100%不丢失
用 <b>KeepAlived 做了 HA-Proxy 的高可用</b>,<b>建议有 3 个及以上节点</b>的 MQ 服务,<br>消息发送到主节点上,主节点通过<font color="#f15a23"><b> mirror 队列</b></font>把数据同步到其他的 MQ 节点,这样来实现其高可靠。<br>
多中心模式
远程模式(shovel)<br>
描述<br>
<b>双活的一种模式</b><br>
简称 shovel 模式,<br>所谓的 shovel 就是把消息进行不同数据中心的复制工作,可以跨地域的让两个 MQ 集群互联,远距离通信和复制
<br>
当我们的消息到达 <b>exchange</b>,它会<b>判断当前的负载情况以及设定的阈值</b>,<br>如果负载不高就把消息放到我们正常的 warehouse_goleta 队列中,<br>如果负载过高了,就会放到 backup_orders 队列中。<br>backup_orders 队列通过 shovel 插件与另外的 MQ 集群进行同步数据,把消息发到第二个 MQ 集群上。<br>
多活模式<br>
rabbitMQ 部署架构采用双中心模式(多中心),那么在两套(或多套)数据中心<font color="#f15a23"><b>各部署一套 rabbitMQ 集群</b></font>,<br>各中心的rabbitMQ 服务除了需要为业务提供正常的消息服务外,<font color="#f15a23"><b>中心之间还需要实现部分队列消息共享。</b></font><br>
federation 插件
federation 插件可以在 brokers 或者 cluster 之间传输消息,连接的双方可以使用不同的 users 和 virtual hosts,双方也可以使用不同版本的 rabbitMQ 和 erlang。federation 插件使用 AMQP 协议通信,可以接受不连续的传输。federation 不是建立在集群上的,而是建立在单个节点上的
zeroMq
比kafka高<br>
订阅形式
点对点(p2p)
现状
0 条评论
下一页