Mq相关技术总结
2023-03-25 22:06:26 2 举报
AI智能生成
常用队列;kafka,rocketMq,rabbitmq相关总结
作者其他创作
大纲/内容
RabbitMQ
通信协议
基于AMQP协议
基础概念
消费者消息获取方式
(默认的方式)轮询poll的方式拉取消息
生产者推送消息给消费者
RabbitMq书写语言:erlang
基础概念
VirtualHost
:从主机中虚拟出来的一个虚拟主机;每个虚拟主机都是一个相对独立的rabbitmq服务器
一个虚拟主机里面可以有多个不同的交换机和不同的队列
exchange
概念:1.数据从生产者到消费者之间的数据转换层;2.隔离了一个虚拟主机下面不同数据之间的推送;3.生产者消费者隔离;
交换机种类
Headers Exchange默认交换机
不绑定route-key;交换机和queue名称一样
Fanout Exchange广播交换机
把消息发送到绑定了该交换机的所有队列上
不需要指定routeing-key
Direct Exchange直连交换机
数据会被发送到指定路由的queue上去
Topic Exchange主题交换机
消息会被转发到所有满足route-key的队列,以及bingkey模糊匹配到的队列
Queue
消息队列,实际存储消息数据
参数配置
name
交换机名称
Durability
是否持久化,true持久化
值集True flase
Auto-delete
所有的消费者完成消费后自动删除
ture,所有的消费者消费完成之后,自动删除
值集True flase
Arguments(拓展参数)
Message TTL
消息生存时间
时间单位毫秒
消息在被抛弃前可以存活多久
Auto expire
队列生存时间
时间单位是毫秒
队列在指定时间内没有被使用,就会自动被删除
Max length
队列容纳的消息的最大条数
超过设定条数就会默认放弃队列头部数据
Max length byte
队列可容纳最大字节数量
超过设定的长度的数据,那么就会默认放弃头部消息
Broker:
消息中间件的服务节点。
Connection
生产端消费端都需要和服务端建立Connection连接,也就是tcp连接
Channel
消息通道,在客户端的每个Connection连接里,可建立多个channel.
Channel是轻量级的Connection,减少了tcp频繁连接断开的开销
Channel实际上就是Tcp的连接复用
mandatory标志
表示作用
标记当消息发送出去,找不到路由的处理方式
处理方式
true:消息返回给服务端,服务端可以做后续的处理
false:消息返回服务端,服务端直接删除
队列工作模式
简单模式
生产者消费者一对一
work模式
生产者消费一对多;每个消费者获取的消息都是唯一
订阅模式
生产者消费者一对多,同样的消息会被订阅的消费者都消费到
路由模式
生产者指定发给一个消费者
主题模式
生产者指定发送给某一类消费者
工作流程
子主题
消息队列设计机制
消息确认机制
发送端-服务端
return消息机制
发送端把消息发送到服务器,结果找不到对应的交换机,路由队列;return消息机制就是应对这种情况
发送消息时候给Channel参数manDetory设置为true;消费就会返回到发送端可以做后续处理;如果为false,服务端就会直接把该条消息删除
事物消息机制
实现原理:AMQP协议
服务端
confirm消息机制
发送端把消息发送个服务端,服务端接收到消息并且把消息持持久化到磁盘就会给发送端一个异步的confirm应答
confirm种类
单条应答
批量应答
服务端-消费端
ack消息机制
ack种类模式
不确认
消费端发生异常或者无响应,都会通知服务端消费成功
自动确认
自动确认,如果发生异常,就会给服务端发送不确认信息;那么消费就会回到消息队列尾部
手动确认
针对个性化处理,针对默写异常是否需要做ack,或者做noack处理;
消息重试机制
消息重试目的
消费者异常的情况下,能够让生产者重新发送该消息;保证消息的最大程度被正常消费
消息重试配置说明:以springBoot整合RabbitMQ说明
enabled
开启重试机制
max-attempts
最大重试次数
initial-interval
重试间隔时间
max-interval
最大间隔时间(不能超过这个时间间隔)
multiplier
间隔时间乘法数(重试的时间间隔在上一次的倍数)
重试机制原理
消息被消费的时候会被监听,当抛出异常的时候,就会执行补偿机制;
实现的原理还是建立在消费端的ack机制之上
消息拒绝机制
消费端手动拒绝
单条拒绝
多条拒绝
消息被拒绝后可以再次回到队列中
消息重新入队机制
消息路由不成功的消息,可以配置相关的死信队列;消息可以发送到死信队列
批量消息发送机制
此机制需要开发做拓展
消息持久化
持久化的对象
交换机
把交换机的属性持久化;在宕机或者重启之后服务器可以自动的去创建交换机,避免手动或者跑程序创建
设置durable=true
队列
把队列的属性持久化,在宕机或者重启之后可以自动的去创建队列,避免手动创建
设置durable=true
消息
消息的持久化是建立在队列的持久化之上,如果队列没有持久化,那么消息也不能持久化
设置 deliveryMode =2 ; deliveryMode =1 是不进行持久化
概述
并不能完全解决消息丢失问题
持久化会降低rabbtimq性能
持久化过程
持久化概述
所有队列的消息都会写入到磁盘的中间中去;当写入的数据大小超过了文件大小,那么就会关闭此文件,再新建一个文件存储;
持久化时间节点
消息本身推送到消费端的时候在服务端需要存入磁盘
内存资源少,需要把队列中的数据存入磁盘
消息刷盘条件
消息并不是来一条消息就往磁盘上存储一条,而是先把消息都放入到一个缓冲池;等一定的条件才会缓存的消息写入磁盘
1.缓冲池缓冲的数据大小超过缓冲池本身
2.超过固定的刷盘时间25ms,不管缓冲池是否满了,都会刷盘
3.消息写入缓冲区后,没有其他后续请求写入,那么也会刷盘
读取持久化数据过程
根据消息ID,找到消息所在文件,根据消息在文件中的偏移量,找到该消息;
持久化消息删除
删除说明
收到消费者的ack消息的时候,并不是马上去删除消息,而是先给消息做一个删除的标记
删除过程
后台进程检车到垃圾数据比例超过50%,并且文件不少于3个,的时候就会触发持久化数据的垃圾回收;找到符合要求的左右两个文件,先整理左文件中的有效数据,然后再把有文件中有消息数据复制到左文件;再把又文件删除;
删除条件
1.所有文件中垃圾数据达到50%的比例;
2.存储的文件必须至少有三个;
RabbitMq队列问题+解决方案
消息延迟发送
RabbitMq本身并没有延迟队列;
解决方案
设置消息的存活时间
消费在队列中存活时间;当时间超过了消息就会被抛弃;设置死信交换机,被抛弃的消息就会落入到死信交换机;
核心点
1.不设置消费者,就可以让消息一直堆积,直到超过存活时间
具体解步骤
1.创建死信交换机
2.创建死信路由
3.新建消费者队列绑定死信路由
特别说明
1.死信交换机就是普通交换机
2.死信交换机被动接受其他交换机或者无法消费的消息
3.创建生产的交换机的时候就需要设置对应的死信交换机
消息丢失
消息丢失类型
生产者发送消息-服务端
丢失原因
1.由于网络原因导致数据丢包
2.交换机的路由没有被队列绑定,消息直接丢失
解决方案
针对1:
1.事物消息机制
发送端开启一个事物,再推送消息,如果投递失败;进行事物回滚,然后重新发送消息;如果服务端收到消息,发送端就提交事务。
缺点:事物消息造成发送端阻塞,发送端只有等到服务端回应之后,才会发送下一条数据;生产者的消息吞吐量大大降低;
2.confirm消息机制
发送端把消息发送个服务端,服务端接收到消息并且把消息持持久化到磁盘就会给发送端一个异步的confirm应答
确认方式
1.串行确认
发送一条确认一条;服务器返回flase,会重新发送
缺点:效率比较低
2.批量确认
发送端每发送一批,才会确认
缺点:重新发送消息的时候需要把同一批消息再次发送
3.异步确认
服务端接受到了一条或者多条之后,会异步回调发送端的异步确认方法;
发送端发送完消息,可以接着发送其他消息,不会阻塞;
整体流程
任何一种确认方式,服务端接受到消息之后不是立马给发送端确认;而是需要等待批量数据持久化之后再发送确认消息;
在发送消息之前把消息用排序的Map集合保存起来;如果消息发送失败,那么就会从map集合中读取消息再次发送
针对2:
1.设置mandatory 设置true
交换机找不到相应的队列就会把消息返回被生产者
2.alternate-exchange设置备用交换机
交换机找不到消息,消息会发给备用的交换机
服务端丢失
丢失原因
丢失原因:客户端在处理消息的时候突然机器挂了,导致消息丢失了;
解决方案
服务端设置交换机,队列,数据的持久化;服务器宕机后,重启会读取磁盘上的持久化的数据;
问题:由于消息的持久化是一批的持久化,可能宕机了,这一批数据还持久化到磁盘
消息的持久化
1、服务端收到生产者发送过来的消息,会做消息的持久化
2、当服务宕机后, 会从磁盘当中读取相应的消息,最大程度上保证消息不在服务节点上丢失
消费端丢失
丢失原因
1.消费者在处理消息的时候出现异常了,那么这条消息就是没有被正常的消费;如果不采取措施,那么这个消息就会丢失
解决方案
ack机制
ack机制概述
消息只有正常消费后,反馈给服务端;服务端才会从队列里面把该条消息删除
ack机制三种模式
不确认
不会发送ack确认消息
自动确认
服务端发送完消息就自动认为该消息被成功消费
缺点:由于网络原因,造成数据从服务端发送到消费者消息丢失
手动确认
消费者消费成功之后,显示的给服务端ack信号;服务端只有收到该信号才会把数据从队列里面删除
设置手动ack,尽可能减少消费端的数据丢失问题;正常就是发送ack,异常就记录日志,然后发送nack
ack机制弊端
内存泄露
如果消费者异常没法发ack消息,服务端会认为这些数据都是没有被正常消费;就会堆积在队列当中,造成内存没法回收,内存泄露;
内存泄露解决方案
1.设置手动应答,如果异常,捕获异常记录日志,给服务端发送正常消费;
2.设置重试次数(默认是3次,三次不消费成功就会放入到默认的死信队列)
ack机制默认打开,而且是自动确认
消息堆积
消息堆积的本质
消费者的消费速度低于生产者生产的速度
堆积的实际原因
生产者原因
生产者突然发送大量信息
消费者原因
消费者消费失败
消费者出现性能瓶颈
消费者直接挂掉
消息堆积后果
队列溢出,新消息无法进入队列
消息无法被消费
阻塞时间超过消息存活时间
等待消费时间超过业务时间
消息堆积解决方案
优化消费者消费参数
设置多个线程同时处理消费消息
默认是单线程消费
设置一次从服务端拉取多条消息
默认是每次拉取一条消息
取消消费端ack确认机制
新增生产这队列,把消息推送另外的机器上
排查性能瓶颈,针对性改造
顺序消费
顺序错乱场景
1.生产者消费者一对多
2.生产者消费者一对一,消费者多线程消费
解决方案
针对1
生产者拆分成多个,让生产者和消费者一对一生产消费(消费者内部可以开多线程消费)
针对2
开启多个消费者,把前后有关联的数据往同一个消费者发送
消息重复消费
消息重复消费原因
1.消费端异常没有给服务端发送消息成功消费的标记;
2.服务端没有接收到消费端发送的消费成功的标记;
只要是服务端没有接收到消费成功的标记,服务端都会再次给消费端发送消息;
解决方案
1.在消费端做幂等性判断
1.全局消息id做幂等性判断
2.全局业务id做幂等性判断
2.消费端代码做限制,无论如何都会发送消费确认消息
RabbitMQ集群
集群模式:
1.主备模式
特点:
1.一主一备;也可以是一主多备
2.主节点提供读写,从节点备份主节点数据
3.主节点挂了,从节点就会变成主节点;原来的从节点回复之后,就会变成备用节点
使用场景
1.并发和数据量不高的情况下;
搭建过程
1.需要使用haproxy作为中间件
2.远程模式
概述:数据进行复制,跨地域让两个MQ集群复制和通信;如果当前集群MQ服务超过设定的阈值,那么消息就会被转移到远程的MQ上做分担处理;
说明:需要使用到shovel插件,让跨地域的集群通信
3.镜像模式
概述:集群模式,一般2-3个节点实现数据同,主节点收到发送过来的数据,然后同步到其他节点上。
需要搭配haProxy做高可用负载均衡器
4.多活模式
概述:多中心模式,多套数据中心部署相同的MQ集群;一个集群中通过负载均衡器使得只有一个节点接受消息
各个中心需要配置插件 federation,可以使一个集群节点与另外一个集群节点做通信
死信队列
死信队列定义
未被正常消费的消息存放的队列;
死信队列数据来源
1.拒绝消息
拒绝一条消息
拒绝多条消息
2.超时消息
超过消息本身设置的存活时间还没有被消息
超过消息发送时候队列设置的存活时间还没有被消息
3.溢出消息
超过队列的最大长度
超过了队列的最大容量
死信队列使用场景
延时操作
kafka
基础概念
topic
区分不同类别信息别称
broker
kafka服务器或者服务集群
副本
TODO
每个主题在创建时会要求制定它的副本数(默认1)
partition(分区)
特点
分区也就是让kafka相同的topic在不同机器,也就是同一个消息可以在不同的kafka节点上;这样就天然的让kafka变成队列集群
概述
同一个topic会有不同的分区,分区可在不同的机器
同一个topic可以有一个或者多个分区
所以一个节点上面可以有来自多个topic对应的分区
分区工作机制
每一个分区都是一个有序队列,分区中的消息都会被分配上一个有序的id(偏移量)
分区策略
message
生产者向某个topic发送的消息
offest偏移量
消息在日志文件中存储的位置
Segment
日志分段
Consumer
消费者
Consumer Group
消费者组
kafka消息核心api
生产者api
消费这api
stream-api
connectior-api
admin-api
管理台对应的api
ISR(InSyncRepli)、OSR(OutSyncRepli)、AR(AllRepli)
ISR
速率和leader相差低于10秒的follower的集合
kafka中与leader副本保持一定同步程度的副本(包括leader)组成ISR
OSR
速率和leader相差大于10秒的follower
AR
全部分区的follower
HW、LEO
HW:高水位,指消费者只能拉取到这个offset之前的数据
LEO:标识当前日志文件中下一条待写入的消息的offset
工作机制
零拷贝
零拷贝的实现
概述
DMA直接内存访问;现代计算机就是允许硬件之间直接进行数据交互;DMA将一个地址空间复制到另外一个地址空间,然后数据的传输是DMA设置之间完成
DMA
直接内存访问
DMA设备
能够不经过cpu直接相互直接就能进行数据交互的硬件设备
两种实现方式
mmap
概述(kafka就是mmap实现方式)
用户态直接应用内核态的文件句柄
mmap方式,用户态和内核态共享内核态数据缓冲区,数据不需要从内核态复制到用户态空间;用户态发送数据的时候,就直接应用内核态的文件句柄就行(无需把数据从内核态拷贝到用户态,再从用户态拷贝到socket套接字的内核空间);
sendfile
数据不需要经历从内核态拷贝用户态;数据直接从DMA设备直接发送对应的做网络传输的DMA设备,由这个设备直接传输数据
Kafka零拷贝
概述
Kafka的零拷贝并不是说完全不存在拷贝,而是避免不必要的拷贝
零拷贝过程
从磁盘把数据拷贝到内核空间;
从内核空间中直接把数据发送到网卡;
传统拷贝方式步骤
1.从磁盘去读到内核空间缓存页;
2.应用从内核缓存页读取到用户空间缓存区;
3.应用程序将用户缓冲区的数据放入socket缓冲区;
4.操作系统将socket里面的数据复制到网卡接口,发送数据;
零拷贝和传统拷贝方式对比
1.kafka的零拷贝从获取数据到最终把数据发送出去只需要经历一次拷贝;
2.传统拷贝方式从获取数据到最终把数据发送出去,需要经历4次拷贝;
kafka的拷贝方式大大降低了数据在不同的内存空间中复制的次数,提高了系统io效率
kafka持久化机制
消息持久化原理
概述
基于磁盘的线性的读写(操作系统做了大量的IO技术优化),甚至会被随机的内存读写更快
io优化技术
read-ahead
write-behind
和其他数据缓存的差异
kafka是直接把数据写入日志文件;其他几乎都是先把数据缓存在内存中然后再间隔刷盘
持久化读写
读写操作
写操作
将数据顺序追加到文件末尾
文件写入超过一定大小会被滚动到新的文件中
写操作参数设置
操作系统积累多少条数据就一定要被刷到磁盘
操作系统积累了多少秒的数据就一定要被刷到磁盘
关于日志丢失
也就是根据设置最多丢失多少秒或者多少条数据
读操作
从文件中读取
读操作参数设置
最大消息大小
缓冲区大小
读取过程
1.缓冲区大小大于消息大小就可以直接读取成功
2.如果缓冲区大小小于消息大小,那么就会读取失败,缓冲区大小翻倍知道成功读取完整条消息;
读写概述
读写都是顺序写入顺序消费,能保持较高的效率
好处
1.读操作不会组阻塞写操作
2.不受内存大小限制
3.线性的读取速度依旧很快
4.相对于内存保存时间更长
删除
删除策略
删除策略是可以配置
常见删除策略
超过一定时常
保留最近多少磁盘大小文件
删除内容
日志文件中的消息和日志文件本身都会被删除
删除操作阻塞读操作
读操作读取的是要被删除文件的副本
持久化文件构成
日志文件
日志文件特点
1.topic的每一个分区都会专属的append-only日志文件;
3.每条消息在文件的位置称之为offset(偏移量)
2.属于分区的消息会被追加到日志文件的末尾
日志条目
概述
日志文件由日志条目组成
日志条目内容
消息头(4字节整形数,表示消息体有多长)
消息体
包含消息内容
消息偏移量(用来表示消息的起始位置)
日志文件名称
该文件第一条数据偏移量+.kafka
索引文件
记录每一个segment下包含的日志条目偏移量范围
日志清理
消息有效期
在消息有效期内,是允许消费者重复消费;
日志清理两种方式
日志删除
根据保留策略删除日志分段
参数配置log.cleanup.policy = delete
日志删除策略
基于时间
log.retention.hours、log.retention.minutes、log.retention.ms
最长时间7天
基于日志大小
log.segment.bytes,每个日志分段大小
og.retention.bytes ,总的日志大小
扫描,某个分段超过日志分段大小,那么就删除;如果总的文件大小超过了设定,那么就删除时间距离现在最久的日志
基于日志起始偏移量
logStartOffset;删除偏移量小于这个设定的偏移量大小的日志
日志压缩
根据消息的key进行压缩,相同的key的消息,只会保留一个副本;这个key就是业务消息中的key,需要去手动指定这个key对应的是业务中的那个字段
参数设定
log.cleanup.policy = compact
log.cleaner.enable = true
压缩过程
压缩线程会根据日志分段中需要被清理压缩的占比最高的日志分段开始压缩清理;根据业务中的key去做删除,相同的key只会保留一条消息;
log.cleaner.min.cleanable.ratio ,设置当需要被压缩的数据超过百分之多少的比例的时候,就进行压缩;
队列工作方式
消息发送/消费方式assign
消息发送方式
1.消息可以指定分区发送
2.消息可以通过负载均衡方式发送到不同的分区
3.通过指定key进行hash运算后确定让哪个分区发送
消息消费方式
消费者集群的各个消费者只能消费不同的分区
一个topic消息可以发送给多个消费者集群
一个消费者可以消费多个集群的消息
多个消费者集群可以消费一个topic下面的消息
订阅队列模式设置subscribe
设置多个消费者消费一个分区的消息-订阅
一个消费组中的一个消费者只订阅一个分区的消息-点对点
kafka集群
zk的作用
注册中心服务治理
注册服务节点
同一管理所有的服务器
注册topic
记录topic的分区信息与对应的服务器节点对应关系
注册消费者
消费者启动的时候,都会去zk创建自己的节点
负载均衡
生产者负载均衡
可以通过zk的配置文件动态感受来自服务器节点的新增减少,来实现相应的负载均衡
消费者负载均衡
zk动态感受消费者新增减少,来合理的实现负载均衡
记录数据
记录分区与消费者关系
将分区和消费者id绑定记录到临时节点上
记录消费中的偏移量
记录每个分区中消费者消费的偏移量会发送给zk,方便在消费者重启之后,或者是重新分配消息分区,能够继续之前的消费
负载均衡
四层负载均衡
此负载均衡是kafka自带的
缺点是无法动态感知服务器节点的新增减少,从而在服务器新增减少的时候,不能根据服务器做负载均衡
kafka事务
事务场景
生产者发送的多条消息需要组成事物,对所有消费者同时可见,或者同时不可见
生产者发给多个topic,多个分区发送消息,要么都成功,要么都失败
子主题 3
子主题 4
leader选取策略
策略
OfflinePartition Leader
有新的分区上线就重新选leader
ReassignPartition Leader
运行重新分区命令,重新选择leader
PreferredReplicaPartition Leader
运行重新选择leader命令
ControlledShutdownPartition Leader
服务正常关闭之后,重启重新选择leader
子主题
不支持读写分离
主要原因
数据一致性问题
leader副本的数据和其他副本数据都不一致,读写分离容易导致数据不一致
延时问题
leader副本数据到从副本数据有数据延迟;
消费者是pull(拉)还是push(推)
producer 将消息推送到 broker
consumer 从broker 拉取消息
如果是kafka节点向消费这push消息,可能会造成消费者消费积压,或者是消费者性能浪费
zookeeper对于kafka的作用
1、存储kafka元数据
2、集群不同节点之间通信
3、leader 检测、分布式同步、配置管理、识别新节点何时离开或连接、集群、节点实时状态等等。
kafka判断一个节点还活着的有那两个条件
1、节点和zk之间心跳检测正常
2、follow节点可以即时同步leader的写操作,且不能延时过高
ack 的三种机制
0
生产者不需要等到kafka节点的ack;
1
kafka节点上的leader副本收到消息就发送ack;
不需要等所有follow副本确认
-1
kafka所有的follow副本接收到消息,leader服务才会发送ack
kafka高性能原因
1、0拷贝
1、索引和日志文件读写
2、数据传输
2、对log文件进行分段处理,且分段数据会简历索引文件
3、本身就是天然的分布式
4、页缓存
对下一页数据的读取是从缓存中读取
5、对磁盘的写入顺序写入
6、消费采取pull模式
能够让消费者处于消费者机器自身资源相符的消费速度
leader副本和Follow副本区别
读写都是从leader副本操作的
follow副本的数据都是从lader服务同步
实际开发
引用场景
日志收集
同一日志同一收集,然后以同一服务的形式发放给各种消费者
消息系统(削峰,异步处理)
用户日活跟踪
运营指标
收集生产者各种生产数据,同一做报表处理
流式处理
和flink,spark,strom做流计算处理
实际的开发方式
1.需要导哪些包
2.有哪些核心API
消息队列实际问题
重复消费
消息重复消费原因
概述:
主要原因就是消息消费时候提交的偏移量,服务器并不知情
具体情况分类
1.强行杀掉线程,导致偏移量没有提交
2.消费了,还没有提交偏移量,分区就掉线了,触发重平衡,然后消息就会重复消费
3.消费者重新分配分区,导致消费者数据重新消费
4.消息消费时间过长,让zk觉得机器宕机了,触发了重平衡
解决方式
最稳定的方式就是在代码中根据消息唯一id做幂等性判断
消息一致性
概述
kafka消息一致性指的是分区中的leader和多个副本的消息数据保持一致性;
消息一致性解决方式
ISR机制
副本同步leader数据
同步参数
rerplica.lag.time.max.ms=10000
根据一定的时间间隔副本同步leader数据
rerplica.lag.max.messages=4000
当副本数据和leader数据查了多少条也会同步数据
ACK=all机制,生产者给服务器发送消息的时候,直到所有的副本都收到消息才会通知生产者,服务端这边已经收到消息
消息延迟消费
时间轮
延迟消息的实现
消息有序性
消息乱序原因
1.消息重试机制会导致消息乱序(一个分区对应一个消费者)
2.多个分区对应多个消费者, 需要顺序消费的数据被分配到了不同的消费者
解决方案
针对1:max.in.flight.requests.per.connection=1禁止生产者想服务器响应前再次发送请求,也就是消息的重试必须是在上次失败之后,里面发起重试
针对2:
可以设置一个topic只有一个分区,只有一个消费者
生产者把需要顺序消费的消息发送到指定的分区上
消息丢失
消息丢失情况分类
生产者发送消息给节点丢失
消息丢失原因
发送消息的程序异常,导致消息压根没有发送出去
消息发送了,由于中间网络原因,以及服务器接收原因,导致数据服务器没有正常接收到数据
节点保存消息丢失
消息丢失原因
ACk设置=1,主机拿到了数据,从机还没有同步主机数据,这时候主机挂了,从机无法同步主机数据
主机拿到数据,主机就挂了,没有设置相应的从节点,来备份数据;
主机挂了,从机被选为主机,主机中还有部分数据没有被同步到从机
消费者丢失消息
消息丢失原因
消息在消费的时候,消息消费的自动确认提交偏移量;如果批量的消息有20条,消费到10条消息的时候异常了,那么就会自动提交消息的偏移量是20,也就是会导致后面10条消息是没有被正常消费,也相当于消息丢失;
针对不同丢失情况对策
生产者发送丢失
消息确认机制
消息确认机制
消息确认等级
ACK =0
ACK=1
ACK=all
消息确认相应机制
ACK=1(默认设置),只要分区的leader副本接受到了消息,就会给生产者发送消息接受成功(其他副本再回去同步leader的数据)。
设计上是比较折中的在一定程度上能够保证消息的不丢失,也能保证一定的吞吐量
ACK=0,生产者给服务端发送消息,不等服务端是否有接收到消息,发送完了就认为消息到被服务端接受了;而实际情况是,消息会生产者的缓冲池中待一段时间然后才会被发送到服务端,生产者就不知道消息具体在啥时候发送到服务端;
缺点:网络宕机的时候,消息会丢失
优点:满足大吞吐量的数据发送;
ACK=all,消息的分区leader还有所有的副本都接受到消息,才会给生产者发送消息已经被接受了。
优点:最大程度上保证服务器节点接受到消息;
缺点:极度影响性能,导致数据的吞吐量低
消息重试机制
概述
生产者发送消息异常,然后会重新给服务器发送消息
消息重试前提条件(两者同时满足才能重试)
1.重试的次数小于retries指定的次数
也就是当重试的次数超过了设定次数,那么也不会发送的;
2.异常的类型是RetriableException或者事务管理器允许重新发送
消息重试机制参数设置
retries
消息重试次数,默认次数为int的最大值
retry.backoff.ms
重试的间隔时间
消息确认机制和重试机制区别
消息确认机制主要是针对消息发送到服务器正常接收这个过程的处理
消息重试机制,主要是针对消息发送之前生产者内部自己发送消息异常的兜底处理
节点数据丢失
1.ACK=all,让所有的服务器节点都获取到数据;
2.合理的设置从机的个数,设置数据的备份
参数设置:min.insync.replica
3.禁止主机挂掉,选从机作为新的主机
参数设置:unclean.leader.election.enable=false 本身默认就是flase
消费者消费数据丢失
设置消息消费提交偏移量为手动提交偏移量,通过代码在finnaly里面手动设置消息异常的那个前一条的偏移量做提交;
消息积压
积压原因
消费者消费能力不足
消费者处理不及时
解决方式
针对1
添加分区个数和消费者个数
针对2
合理增大每次拉取的消息数量
死信队列
不支持死信队列
消息如何控制只被消费群组中一个消费者消费
原则
kafka的一个分区的数据只会被消费者组中的一个消费者消费;
一个消费者可以消费来自多个分区的数据;
详细说明
分区数量- 3,消费者数量- 3
Kafka 将一个分区分配给一个使用者。除非某些使用者发生故障并且发生使用者重新平衡(将分区重新分配给使用者),否则所有使用者都将映射到其分区,并按顺序使用这些分区的事件。
分区数量 - 1,使用者 - 3
如果消费者多于分区数量,Kafka就没有足够的分区来分配消费者。因此,该组中的一个消费者被分配给分区,而该组中的其他消费者将处于闲置状态。
分区- 4,消费者- 3
在此方案中,其中一个使用者获得 2 个分区,而在使用者重新平衡期间,另一个使用者可能会获得 2 个分区。
Kafka 将一个分区分配给一个使用者。除非某些使用者发生故障并且发生使用者重新平衡(将分区重新分配给使用者),否则所有使用者都将映射到其分区,并按顺序使用这些分区的事件。
分区数量 - 1,使用者 - 3
如果消费者多于分区数量,Kafka就没有足够的分区来分配消费者。因此,该组中的一个消费者被分配给分区,而该组中的其他消费者将处于闲置状态。
分区- 4,消费者- 3
在此方案中,其中一个使用者获得 2 个分区,而在使用者重新平衡期间,另一个使用者可能会获得 2 个分区。
kafka参数设置
1.服务器配置
1.节点自身属性设置
broker.id
broker在集群中的标识
默认值-1
listeners
监听的服务地址(多个用,隔开)
无默认值
2.连接zk配置
zookeeper.connect
连接的zookeeper地址(多个地址用,隔开)
zookeeper.connection.timeout.ms
连接zookeeper超时时间(毫秒)
无默认超时时间
zookeeper.session.timeout.ms
连接ZK会话超时时间
zookeeper.sync.time.ms
zk的从机落后zk主机的最长时间
zookeeper.max.in.flight.requests
消费者有多少个未确认的消息,才会导致阻塞
3.日志配置
log.dirs
日志存放目录(有多个目录分布时使用,隔开)
无默认值
log.dir
日志存放目录(当log.dirs为null时)
默认值/tmp/kafka-logs
log.flush.interval.messages
将消息刷新到磁盘之前,日志分区上累计的消息数量
默认值:9223372036854775807
log.flush.interval.ms
刷盘前在内存中最长存在时间
log.retention.bytes
日志文件的最大容量
默认值-1,也就是可以无穷大
日志保存时间
log.retention.hours
日志文件保存的最长时间
默认是1周时间
log.retention.minutes
日志保存的最长分钟
默认为null
log.retention.ms
日志保存的最长分钟
默认为null
日志分区
log.roll.hours
新分区产生时间,以小时为单位
默认一周
log.roll.ms
子主题 1
子主题 2
log.segment.bytes
分区最大容量
默认1g
log.segment.delete.delay.ms
分区等待删除时间
默认60000ms
消息配置
message.max.bytes
拉取的批量消息的最大内存大小
默认值:0.9M
子主题 2
主题相关配置
auto.create.topics.enable
第一次发动消息时,自动创建topic。
默认值:true;
delete.topic.enable
是否可以删除topic
默认值:true
如果为Flase,那么管理工具将不能删除主题
auto.leader.rebalance.enable
rebalance配置
auto.leader.rebalance.enable
leader.imbalance.check.interval.seconds
分区重平衡检查的频率
leader.imbalance.per.broker.percentage
触发重平衡比例
默认值100%
线程配置
background.threads
后台处理线程个数
默认值10;
num.io.threads
处理请求线程数量
默认值:8
num.network.threads
处理网络请求网络相应线程数量
默认值3
num.recovery.threads.per.data.dir
日志恢复和日志关闭时刷新的线程数
默认值1
num.replica.alter.log.dirs.threads
日志之间移动副本线程数
无默认值
num.replica.fetchers
主节点数据复制到副本的线程数
偏移量
offset.metadata.max.bytes
与偏移量提交管道的元数据最大大小
offsets.commit.timeout.ms
偏移量超时时间
offsets.topic.num.partitions
偏移量提交主题分区的数量
offsets.topic.replication.factor
子主题 1
offsets.topic.segment.bytes
日志索引文件大小
默认值100M
子主题 6
unclean.leader.election.enable
leader挂了,是否会选举其他副本作为leader
默认值;false
压缩
compression.type
按照给定的压缩方式压缩数据
值集:“gzip”、“snappy”、“lz4”、“zstd”
事物
transaction.max.timeout.ms
事务执行最长时间,超时则抛出异常
900000ms
2.生产者配置
1.连接配置
bootstrap.servers
服务器节点配置
2.消息相关配置
buffer.memory
消息缓冲区大小
默认值:33554432 =32M
生产者最大可以用缓存;生产者可以用来缓冲等待发送到服务器的记录的总内存字节
消息序列化
key.serializer
指定消息的key的序列化类(需要实现Serializer接口)
无默认值
value.serializer
指定消息内容的序列化类(需要实现Serializer接口)
无默认值
消息发送
消息发送条件
batch.size
批量发送的最大容量
默认值16384 =16k;缓存到本地内存批量发送大小;每当消息的数据量达到16k才会把数据发送给服务器
作用
消息不是一条一条的发送,而是积累到一定量才会发送
linger.ms
生产者将请求传输之间到达的任何记录组合到一个批处理请求中的时间
默认值0
作用
消息发送延迟时间,也就是在一个延迟时间内所有的消息都是被同一批次的发送出去;
batch.size和liger.size只要满足一个,消息就会被发送
消息发送阻塞
max.block.ms
消息发送到具体分区的阻塞时间
默认值:60000ms,一分钟
阻塞原因:缓冲池已经满了,或者是系统元数据不可用,导致这个问题;
消息请求阻塞时间
request.timeout.ms
生产者请求发出后,获取相应的最长时间,如果超过了该时间,那么客户端就会重新发送
默认值:30000 ,30秒
消息发送大小
max.request.size:
生产者发送最大直接数量
默认值:1M
消息确认
acks
生产者要求领导者在考虑完成请求之前收到的确认数量
默认值1
用途:
配置消息发送发到服务的消息确认机制
值集
0:表示producer无需等待leader的确认;
1:代表需要leader确认写入它的本地log并立即确认;
-1(all):代表所有的备份都完成后确认
delivery.timeout.ms
生产者发送完消息,接受服务器消息确认的时间
默认值120000ms,120秒
消息重试
retries
消息发送失败消息重试次数
默认值是int的最大值
retry.backoff.ms
消息重新发送中间间隔时间
默认值100ms
消息压缩
compression.type
消息以怎么的压缩格式进行压缩
值集:“gzip”、“snappy”、“lz4”、“zstd”
和服务器连接
connections.max.idle.ms
关闭空闲连接时间(生产者和服务器最大失联时间)
默认540000
max.in.flight.requests.per.connection:
单个连接,可接受的最大未确认数量
默认值5;也就是消息发送需要服务端确认,这个就是在发送消息之前需要确认发送如果没有确认的消息大于等于该参数,那么就会发送失败。
自定义操作类
metric.reporters
参数修改之后发送通知的类
interceptor.classes
消息拦截器
发送消息之前消息会被拦截,消息还可以做相应的处理
数据传输设置
receive.buffer.bytes
TCP连接接受方缓冲区大小
默认值:32K
send.buffer.bytes
TCP连接发送方缓冲区大小
默认值:128
3.事物消息相关配置
transactional.id
事务ID(当有多个生产者时,标识哪个生产者的事务,可用于消息幂等)
transaction.timeout.ms
事务超时时间
3.消费者配置
1.连接配置
bootstrap.servers
服务器连接地址
2.消费者本身配置
group.id
消费者组的ID
client.id
消费者ID
3.消息配置
auto.offset.reset
初始偏移量当前偏移量不存在的时候,消费者消费的起始点
值集
earliest
自动到最早的偏移量位置
latest
自动把偏移量充值为最新偏移量
none
如果没有找到以前的偏移量,那么就会抛出异常
anything else
直接抛出异常
默认值
earliest
exclude.internal.topics
是否公开topic内部的元数据信息
默认值:true;
事物
isolation.level
隔离级别
生产者数据拉取配置
max.poll.records
自动拉取消息的个数
默认值:500
max.poll.interval.ms
自动拉取消息的频率
默认值:5分钟
fetch.max.bytes
拉取消息的最大数据量
默认值:50M
fetch.min.bytes
拉取最小字节数
默认值:1字节
如果服务器没有数据,那么就会阻塞,直到服务器有数据才会相应
fetch.max.wait.ms
拉取消息阻塞时间
默认值:500ms
生产者自动提交配置
enable.auto.commit
消费者是否是自动提交偏移量
auto.commit.interval.ms
消费者自动提交偏移量的间隔时间
生产者序列化配置
key.deserializer
指定消息的key的反序列化类(需要实现Deserializer接口)
value.deserializer
指定消息内容的反序列化类(需要实现Deserializer接口)
生产者连接配置
connections.max.idle.ms
超过多久关闭服务器和消费者的连接
默认值:540000
request.timeout.ms
消费者给服务端发送请求超时时间
默认值:30秒
session.timeout.ms
心跳发送相应超时时间
说明:消费者是会主动向服务器发送心跳,以此来正面自己是存活的
heartbeat.interval.ms
心跳时间:消费者心跳消息发送到消费者协调器的期望时间
默认值:3秒,设置必须是小于session超时时间的三分之一
kafka监控平台
kafka缺点
对于mqtt协议不支持
不支持物联网传感数据直接接入
仅支持统一分区内消息有序,无法实现全局消息有序
可以通过代码控制顺序
监控不完善,需要安装插件
依赖zookeeper进行元数据管理
kafka最全面试题
https://zhuanlan.zhihu.com/p/109814155
http://events.jianshu.io/p/869464e66cfb
RocketMQ
RocketMQ通信方式
示意图
基础概念
生产发送消息类型
消息种类划分
同步消息
最大程度上确保消息的不丢失
使用场景
重要的消息通知
短信通知
异步消息
使用场景
对业务的效应时间非常敏感的业务
单向消息
使用场景
不是特别关注发送结果的场景
日志发送
优缺点
同步消息,异步消息会有消息的重新发送,单向消息消息发送失败不会重新发送
同步消息,异步消息发送的时候需要服务器节点返回消息接收的确认信息,而单向消息没有
消费方式
拉取式消费
消费者从服务节点上拉取消息消费
默认的消费方式,但是实时性不高,但是不会造成消息消费堆积
推动式消费
服务器节点主动给消费者推送消息消费
优点
消息消费实时性高
缺点
消费者来不及消费过多消息,容易造成消费者消息堆积
本质
消息推送本质上还是消息拉取
基本概念解释
Name Server
功能
1、服务器路由提供者,
2、生产者,消费者能够通过名称服务查询各主题的相应元数据信息
工作模式
1、多个Name Serve 组成集群
2、集群中各个Name Server相互独立,没有信息交互
生产者组
同一类Producer的集合,生产者发送消息逻辑一致
消费者组
同一类Consumer的集合,这类Consumer通常消费同一类消息且消费逻辑一致
集群消费
集群消费模式下,相同的消费者组,每一个消费者平摊消息;
广播消费
消费者集群中的每一个消费者,都是都会受到消息;
普通顺序消费
工作特性
1、消费者通过同一个消息队列(topic分区)收到的消息是有序的
2、不同的消息队列收到的消息可能是无序的
优缺点
优点
生产者发送消息快速
缺点
同一个消费者消费的不同队列之间的消息,是无序的
使用场景
对程序性能要求高,但是顺序消费要求不高
严格顺序消费
消费者收到的所有消息均是有顺序的
优缺点
优点
最大程度上确保了消息的有序性
缺点
消息发送的吞吐量大大降低
使用场景
对消费有顺序要求,且对程序性能要求不高
RocketMQ消息特性
消息顺序
全局顺序消费(严格顺序消费)
某个Topic下的所有消息都要保证顺序
分区顺序消费(普通顺序消费)
部分顺序消息只要保证每一组消息被顺序消费即可
消息过滤
发送消息的时候设置tag,消费的时候根据对应的tag做相关的过滤处理
消息可靠性
影响消息可靠性几种情况
1、节点非正常关闭
2、节点宕机
3、节点所在服务宕机
4、服务器断电,但是能立即供电
5、机器无法开机
6、磁盘设备损坏
影响范围
1、前四种可以立即回复,可能会有少量的数据丢失
2、后面两种,如果服务器是单点,那么消息将全部丢失,如果不是单点,消息还可以恢复绝大部分消息
至少一次
Consumer先Pull消息到本地,消费完成后,才向服务器返回ack,如果没有消费一定不会ack消息
消息回溯
工作机制
按照特定时间回溯到具体的历史时间点,重新消费消息
事物消息
应用本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败
定时消息
指消息发送到broker后,不会立即被消费,等待特定时间投递给真正的topic
消息重试
工作机制
1、消费者消费消息失败后,令消息再消费一次;
消费失败后的消息会进入消息重试队列
消息消费失败原因
1、消息反序列化失败
2、程序异常
2、消费者依赖的校友服务不可用
消息重投
工作机制
1、生产者发送消息时,同步消息,异步消息的发送如果失败了,生产者会重新发送
2、单向发送发送失败,生产者无法重新发送消息;
流量控制
生产者流量控制
生产这发送消息过多,服务器节点处理这些消息达到性能瓶颈
控制副作用
消息不会重投
消费者流量控制
消费者这边接收到的消息,消息处理不过来达到性能瓶颈
控制副作用
降低拉取频率
作用
降低服务器节点压力,降低消费者节点压力
死信队列
消息重试达到最大次数后,依旧无法正常消费,死信队列就会接受到该消息;
可以通过RocketMQ的控制台,对死信队列中的数据重新消费;
接受不能被处理的消息,放在以后再做处理
集群部署
Rocket消息问题
消息堆积
消息堆积的原因
Producer原因
生产者生产速度过快;
短时间的业务高峰期;
Broker消息堆积
Broker同步策略导致消费堆积
Consumer原因
消费者消费速度过慢
消费者宕机
解决方式
Producer原因
生产者限流
Broker消息堆积
调整刷盘策略
Consumer原因
多线程消费消息
设置数据开关,开关打开消息直接放入数据库,或者直接返回,最大程度上降低消费者程序时间
判断MQ是否存在消息堆积场景方式
Producer发送消息的速率监控
Consumer消费消息的速率监控
Producer发送消息的最大偏移量(maxOffset)与Consumer消费消息的当前偏移量(currOffset)
的差别值与给定的消息堆积数值告警值对比,若是差别 值大于数据告警值,则存在消息堆积,不然不存在消息堆积
消息堆积场景
差别值呈现增大趋势
producer消息的发送速度大于consumer的消息消费速度
处理方式
1、消费者进行扩容操作
2、提高消费者消费速度;
3、对生产者限流操作;
producer的生产速率无明显增长,consumer的消费速率无明显增长
处理方式
这种状况基本上是能够肯定是RocketMQ自己的故障造成的,需要提高Broken节点自身的服务器配置,和相关参数;
producer生产速率正常,RocketMQ服务器性能正常,consumer消费速率下降
差别值呈现平稳趋势或者降低趋势
最佳工作模式:RocketMQ自己的服务性能,必要的时候能够对RocketMQ 进行扩容,提升消息堆积能力。
消息顺序消费
问题出现原因
某些特殊场景下,发送出去的消息,消费者需要按照顺序来消费
顺序消费的前提
发送出去的多条消息,都是走的同一个topic发送
问题具体场景
大多数业务场景不需要考虑消息的顺序性
不需要考虑消费顺序
消息系统吞吐量不大
消费者和消费者同时都只有一台机器
其他场景
生产者消费消息的时候,把消息发送到同一个topic下同一个队列;可以保证消费方只有一个线程去消费消息
将需要消费的顺序消息,合并成一条消息发送出去,这样消费的时候就是有顺序;
设置发送的消息为全局顺序消息
消息重复消费(消息幂等)
问题原因
网络问题,导致消息消费的确认消息,rocketMQ节点没有收到
1、消费者没有发出
2、网络原因导致数据丢失
3、rocketMQ节点没有收到
解决方式
1、代码层面
消费者代码逻辑中保持幂等性
2、消息发送层面
通过每条消息的唯一编号来保证
消费者记录消费过的消息的唯一id,接收到消息的时候,发现有此id已经消费,那么就不做处理
唯一编号
msgId
消息设置的key
消息体重的唯一标记
3、重复消息不处理
有些业务场景,重复接受到消息,也不会影响到业务,所以不处理也行
消息丢失
消息丢失场景
主要有三种场景
1、生产者发送到队列节点消息丢失
网络抖动导致消息丢失
2、RocketMQ节点消息未能持久化到磁盘
消息还未持久化到磁盘,节点宕机
已经持久化到磁盘,磁盘损坏,但是没有备份
3、RoekctMq节点消费者丢失
消息还未消费完成,就通知节点消息已经消费完了,此时消费者宕机,导致当前正在消费消息丢失;
处理方式
针对场景1处理
RoekctMQ 事物机制来确保消息都能发送到节点上
消息重投机制,消息投递失败,会再次投递
针对场景2处理
1、修改节点的消息持久化刷盘机制
将异步持久化到磁盘,修改成同步持久化到磁盘
RocketMq消息本身默认的持久化盘刷机制是异步刷盘
2、RocketMQ采用主从机构
确保主机挂了,从机上还有消息备份数据
针对场景3处理
ack机制,设置消息成功消费之后,再通知节点消息已经成功消费
处理方式带来的问题
导致问题
性能和吞吐量也将大幅下降
优化机制
使用事务机制传输消息
1、耗费性能,导致消息发送速率降低
同步刷盘
刷盘操作更为频繁,导致刷盘效率低下
主从机制
主机需要把数据同步到从机,消耗主机网络io,和cpu
消费完再通知节点
消费者消费消息速度降低
消息复制
MQ对比
性能对比
示意图
各自优缺点
Kafka
优点
性能卓越,单机写入TPS约在百万条/秒,最大的优点,就是吞吐量高。
时效性:ms级
可用性:非常高,kafka是分布式的,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用
消费者采用Pull方式获取消息, 消息有序, 通过控制能够保证所有消息被消费且仅被消费一次;
有优秀的第三方Kafka Web管理界面Kafka-Manager;
在日志领域比较成熟,被多家公司和多个开源项目使用;
功能支持:功能较为简单,主要支持简单的MQ功能,在大数据领域的实时计算以及日志采集被大规模使用
时效性:ms级
可用性:非常高,kafka是分布式的,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用
消费者采用Pull方式获取消息, 消息有序, 通过控制能够保证所有消息被消费且仅被消费一次;
有优秀的第三方Kafka Web管理界面Kafka-Manager;
在日志领域比较成熟,被多家公司和多个开源项目使用;
功能支持:功能较为简单,主要支持简单的MQ功能,在大数据领域的实时计算以及日志采集被大规模使用
缺点
Kafka单机超过64个队列/分区,Load会发生明显的飙高现象,队列越多,load越高,发送消息响应时间变长
使用短轮询方式,实时性取决于轮询间隔时间;
消费失败不支持重试;
支持消息顺序,但是一台代理宕机后,就会产生消息乱序;
社区更新较慢;
使用短轮询方式,实时性取决于轮询间隔时间;
消费失败不支持重试;
支持消息顺序,但是一台代理宕机后,就会产生消息乱序;
社区更新较慢;
RabbitMQ
优点
由于erlang语言的特性,mq 性能较好,高并发;
吞吐量到万级,MQ功能比较完备
健壮、稳定、易用、跨平台、支持多种语言、文档齐全;
开源提供的管理界面非常棒,用起来很好用
社区活跃度高;
吞吐量到万级,MQ功能比较完备
健壮、稳定、易用、跨平台、支持多种语言、文档齐全;
开源提供的管理界面非常棒,用起来很好用
社区活跃度高;
缺点
erlang开发,很难去看懂源码,基本职能依赖于开源社区的快速维护和修复bug,不利于做二次开发和维护。
RabbitMQ确实吞吐量会低一些,这是因为他做的实现机制比较重。
需要学习比较复杂的接口和协议,学习和维护成本较高
RabbitMQ确实吞吐量会低一些,这是因为他做的实现机制比较重。
需要学习比较复杂的接口和协议,学习和维护成本较高
RocketMQ
优点
单机吞吐量:十万级
可用性:非常高,分布式架构
消息可靠性:经过参数优化配置,消息可以做到0丢失
功能支持:MQ功能较为完善,还是分布式的,扩展性好
支持10亿级别的消息堆积,不会因为堆积导致性能下降
源码是java,我们可以自己阅读源码,定制自己公司的MQ,可以掌控
可用性:非常高,分布式架构
消息可靠性:经过参数优化配置,消息可以做到0丢失
功能支持:MQ功能较为完善,还是分布式的,扩展性好
支持10亿级别的消息堆积,不会因为堆积导致性能下降
源码是java,我们可以自己阅读源码,定制自己公司的MQ,可以掌控
缺点
支持的客户端语言不多,目前是java及c++,其中c++不成熟;
社区活跃度一般
没有在 mq 核心中去实现JMS等接口,有些系统要迁移需要修改大量代码
社区活跃度一般
没有在 mq 核心中去实现JMS等接口,有些系统要迁移需要修改大量代码
技术选型
Kafka
日志收集和传输
RocketMQ
RoketMQ在稳定性上可能更值得信赖,业务有并发场景,建议可以选择RocketMQ
RabbitMQ
数据量没有那么大,小公司优先选择功能比较完备的RabbitMQ
0 条评论
下一页