kafka
2022-08-23 14:16:05 0 举报
AI智能生成
登录查看完整内容
kafka原理,知识点总结,与RocketMQ对比
作者其他创作
大纲/内容
kafka的元数据信息都是保存在zk上的
由broker、zk、producer、consumer组成
Kafka
RocketMQ 由 NameServer、Broker、Consumer、Producer组成
NameServer之间互不通信
Broker 会向所有的 NameServer注册,通过心跳判断Broker是否存活
Producer 和 Consumer 通过 NameServer 就知道 Broker 上有哪些 Topic
RocketMQ
架构
rocketMq单机写入TPS单实例约数十万条/秒,单机部署3个Broker,可以跑到最高12万条/秒,消息大小10个字节
kafka(由Scala和Java编写)单机写入TPS约在百万条/秒(写百万级别的TPS),消息大小10个字节,kafka 的 producer 可以批量发送数据
Kafka的TPS跑到单机百万,主要是由于Producer端将多个小消息合并,批量发向Broker
Producer通常使用Java语言,缓存过多消息,GC是个很严重的问题
Producer调用发送消息接口,消息未发送到Broker,向业务返回成功,此时Producer宕机,会导致消息丢失,业务出错
Producer通常为分布式系统,且每台机器都是多线程发送,认为线上的系统单个Producer每秒产生的数据量有限,不可能上万
消息合并功能完全可由上层业务来做
font color=\"#f44336\
RocketMQ为什么没有这么做
总结
性能
kafka单机若超过了64个partition/队列,CPU load会发生明显飙高,partition越多,CPU load越高,发消息的响应时间变长
RocketMQ单机支持最高5万个队列,CPU load不会发生明显变化
消费者的集群规模和队列数成正比,队列越多,消费类集群可以越大
队列多有什么好处
总结:RocketMQ支持的队列数远高于kafka支持的partition数,这样RocketMQ可以支持更多的consumer集群
单机支持的队列数
kafka采用短轮询的方式,实时性取决于轮询时间间隔,0.8以后版本支持长轮询
RocketMQ使用长轮询,同Push实时性一致,消息投递的延迟通常在几毫秒内
总结:kafka与RocketMQ都支持长轮询,消息投递的延迟在几毫秒内
消息投递的实时性
MQ一般都有topic的概念,为了增加生产者和消费者的并发度,在kafka里有分区的概念,而在rocketMq中是queueId的概念,多个queue可以分布到不同的broker上
一个topic会有多个partition,partition会分布在不同的broker上,在单个partition是顺序写
broker写消息到partition的时候是写到pagecache中
当broker单机的partition过多的时候,很多partition同时往pagecache中写数据,相对与磁盘来说这就是随机写了,这时候kafka的性能会急剧下降
kafka的topic是partition的概念
kafka是基于java的FileChannel的zero copy
rocketMq底层存储依靠commitLog,commitLog存储了所有的topic的数据,在写磁盘上是顺序写
通过 consumerQueue
rocketMq单机写commitLog时所有的线程是加锁等待写的
写到pagecache或者刷到磁盘以后,会有个单独的线程(ReputMessageService)来异步构建consumerQueue(一个queue对应一个cousumerQueue)
一个long型offset(对应commitLog中的offset)
一个int型的size(消息的长度)
一个long型的tag,tag有两个用途,一种是发送消息的tag值的hashCode用来消息过滤,一个是在延时消息中存储消息到期执行的时间
consumerQueue由20个字节组成
所有的topic数据都放一个文件里,那么消费者怎么知道去commitLog的哪个offset去获取数据?
逻辑是在IndexFile中实现的,就相当与根据磁盘实现了一个hashMap,根据topic+key的hash找到indexFile的物理存储位置,里面记录了commitLog中的offset,hash冲突是链表法解决的。indexFile的构建也是在ReputMessageService中异步构建的
RocketMq支持根据消息的key来搜索消息
由于rocketMq的所有topic数据都放在同一个commitLog中,rocketMq的单机broker可支持上万topic
rocketMq当正在写的文件写满后,会用另一个线程创建一个新的基于mmap的磁盘映射文件
注意,这只是做了个映射,并没有把文件对应的内存空间搞到pagecache
这时往mmap里写数据的话,操作系统会产生缺页中断,然后将磁盘的页加载到pagecache
如果没有进行磁盘与pagecache预热的话,那写数据的时候还是有一定性能影响的
RocketMq想到了这点,创建完mmap映射后,通过代码写了一些数据到pagecache,提高了性能
rocketMq默认是基于mmap来进行磁盘和内存的映射
存储
kafka单个partition会有多个副本,producer写数据的时候,会往leader里面写(读消息也是从leader副本读),然后follower会同步leader的数据,同时会在zk中维护一个isr的副本列表,在isr列表中的副本都是能跟上leader数据的。如当一个副本所在的机器宕机或发生了fullGc,这时候这个副本会被剔除isr列表,当这个副本跟上leader数据的offset之后,会被重新加入到isr列表中。当副本的leader挂了以后,zk会在isr中的副本选主,所以kafka的副本最少可以设置一个
高可用
kafka的生产者acks
kafka没有提供主动刷盘的机制,要保证消息不丢失,应该等所有的副本同步完了在返回成功
LEO即日志末端位移(log end offset),记录该副本底层日志(log)中下一条消息的位移值,leader更新LEO的值是在写入数据后更新,follower更新LEO的值是followe从leader拉取数据写入到本地之后更新
HW:即水位值,小于HW的offset被认为是更新到所有副本,这时候该数据可以被消费者消费
在多副本情况下,涉及到副本之间数据同步,必然有快慢之分,因此kafka有两个概念 : LEO和HW
消息不丢失
一个是master/slave 架构,slave同步master的数据,但是这个架构目前开源版本的当master挂了以后,slave不能主动切换成master,需要人工切换
还有一个是DLedger多副本机制,底层使用raft算法,实现了高可用。使用这种模式,一个leader应该至少对应两个副本,这样才能在leader挂掉以后,根据投票才能选出另一个leader
rocketMq 提供了两种架构
rocketMq的commitLog模式是使用操作系统的mmap将磁盘与内存做映射,写消息时是先写到pagecache
在返回写成功状态时,消息已经被写入磁盘
具体流程是,消息写入内存的PAGECACHE后,立刻通知刷盘线程刷盘,然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,返回消息写成功的状态
同步刷盘
在返回写成功状态时,消息可能只是被写入了内存的PAGECACHE,写操作的返回快,吞吐量大;当内存里的消息量积累到一定程度时,统一触发写磁盘操作,快速写入
异步刷盘
等Master和Slave均写成功后才反馈给客户端写成功状态
同步复制
只要Master写成功即可反馈给客户端写成功状态
异步复制
金融场景下对数据一致性较高的情况下,建议采用同步刷盘、同步复制
RocketMq提供了几种策略(master/slave架构下)
这两种复制方式各有优劣,在异步复制方式下,系统拥有较低的延迟和较高的吞吐量,但如果Master出了故障,有些数据因为没有被写入Slave,有可能会丢失;在同步复制方式下,如果Master出故障,Slave上有全部的备份数据,容易恢复,但是同步复制会增大数据写入延迟,降低系统吞吐量
RocketMq还提供了一个堆外内存池的优化策略,写数据时先写到堆外内存,然后在有单独的线程刷到pageCache中,之后有单独的线程刷到磁盘,这样做的好处是实现了读写分离。写堆外内存,读pagecache,这种情况下有可能丢数据。
DLedger多副本的情况下,由leader先写到pagecache,leader等半数以上的flower写成功以后,返回成功给生产者。由raft算法的强leader性,数据丢失不了,有可能重复,这时候要在消费端保证幂等
leader中为每一个副本维护了一个writeIndex,其实也就是每一个副本对应着一个线程,写数据时先写到leader成功,然后leader的本地的leaderEndIndex增加。leader中维护的每个副本对应的线程只要发现副本对应的writeIndex小于leaderEndIndex,就会不停的向副本push数据。然后leader中还会有一个单独的check线程,检测当半数以上的副本写成功后,就会认为这条数据写成功了,然后CompletableFuture设置为完成,唤醒发送数据线程池中等待发送结果的线程,告诉它发送成功。这里面的实现还是有点复杂的,包括leader选举、数据同步等等
高可用与多副本
Kafka与RocketMq对比
Kafka能够和Spark、Flink、Storm等实时计算 引擎完美地结合
实时业务系统
Kafka也提供了应用接口(API),可以将主题(Topic)中的数据导出到Hive仓库做离线计算
离线任务处理
使用范围
零散的日志收集到Kafka集群中,然后通过Kafka的统一接口将这些数据开放给不同的消费者
日志收集
线上业务流量很大的应用,可以使用Kafka作为缓冲,以减少服务端的压力
消息系统
Kafka记录浏览器用户或者手机App用户产生的各种记录
用户轨迹
Kafka也可用来记录运营监控数据,包括收集各种分布式应用系统的数据(如Hadoop系统、Hive系统、HBase系统等)
记录运营监控数据
Kafka是一个流处理平台,所以在实际应用场景中也会与其他大数据套件结合使用,例如Spark Streaming、Storm、Flink等
可以比作为溪流,不过这溪流流的不是水是数据,例如“淘宝的行为情况”
第一类:想知道某个用户,比方行为信息(例如用户几点几分点击什么商品,几点几分浏览过什么),从而根据这些行为推荐商品信息。这类用户的行为信息是源源不断的,一个接一个来,比如用户在7点40分32秒浏览了iPhone6plus,在7点40分35秒看了小米4,这些信息一个个来到,越积越多,我们要求要迅速处理这些信息,没有延迟。就像在溪流的某个地方设立一个检测仪,检测水(数据)的实时情况。这就是流处理。
第二类:根据用户的一段时间的信息推荐商品,比如可以根据用户1年在淘宝的消费信息,统一进行分析处理。还是用水流的例子,可以把水流的水(数据)都集中在一个大水箱里面,然后分析水(数据)的情况。这样的分析并不是实时的。这种情况叫做批处理。
给用户推荐可以分成两类
流处理
流处理是实时性小任务的处理,它对处理的延迟容忍度较低,但是容错性较高
实现流处理
事件源是一种应用程序的设计风格,其中状态更改会产生一条带有 时间戳的记录,然后将这条以时间序列产生的记录进行保存。在面对非 常大的存储数据时,可以使用这种方式来构建非常优秀的后端程序
事件源
适用场景
消息生产者,就是向 kafka broker 发消息的客户端
Producer
消息消费者,向 kafka broker 取消息的客户端
Consumer
消费者组,由多个 consumer 组成。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者
Consumer Group (CG)
一台 kafka 服务器就是一个 broker。一个集群由多个 broker 组成。一个 broker可以容纳多个 topic
Broker
可以理解为一个队列,生产者和消费者面向的都是一个 topic
Topic
分区,为了实现扩展性,一个 topic 可以分为多个 partition,每个 partition 是一个有序的队列
Partition
副本,为保证集群中的某个节点发生故障时,该节点上的 partition 数据不丢失,且 kafka 仍然能够继续工作kafka 提供了副本机制,一个 topic 的每个分区都有若干个副本,一个 leader 和若干个 follower
Replica
每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是 leader
leader
每个分区多个副本中的“从”,实时从 leader 中同步数据,保持和 leader 数据的同步。leader 发生故障时,某个 follower 会成为新的 leader
follower
Kafka Broker 维护的元数据存储在 ZK 中,扩容时,只需从 ZK 中读取数据即可
Zookeeper
kafka集群整体架构
单位时间内可以处理多少条数据
吞吐量
写数据请求发送给kafka一直到处理成功所消耗的时间
处理延迟
吞吐量和处理延时
batch微批处理,高吞吐高延迟
Spark Streaming采取微批处理技术实现流式计算
批量处理是一种非常有效的提升系统吞吐量的方法,减少网络通信开销
Kafka 的 Producer 只提供了单条发送的 send() 方法,并没有提供任何批量发送的接口。
当调用 send() 方法发送一条消息之后
无论是同步发送还是异步发送,Kafka 都不会立即就把这条消息发送出去
它会先把这条消息,存放在内存中缓存起来,然后选择合适的时机把缓存中的所有消息组成一批,一次性发给 Broker
Kafka采用了异步批量发送的机制
生产端 Producer:内部批量发送
Broker 端:每批消息都会被当做一个“批消息”来处理
消费端 Consumer:Consumer 从 Broker 拉到一批消息后,在客户端把批消息解开,再一条一条交给用户代码处理
各端批量处理数据
批处理:减少网络通信开销
PageCache 就是操作系统在内存中给磁盘上的文件建立的缓存
应用程序在写入文件的时候,操作系统会先把数据写入到内存中的 PageCache,然后再一批地写到磁盘上
PageCache 中有数据,那就直接读取,这样就节省了从磁盘上读取数据的时间
应用程序的读取线程会被阻塞,操作系统把数据从文件中复制到 PageCache 中,然后应用程序再从 PageCache 中继续把数据读出来,这时会真正读一次磁盘上的文件,这个读的过程就会比较慢。
PageCache 中没有数据,这时候操作系统会引发一个缺页中断
读取文件的时候,也是从 PageCache 中来读取数据,这时候会出现两种可能情况
接收到一批数据,Broker 直接写入 Page Cache (Page Cache 是基于 OS Cache)
Page Cache 再顺序写入磁盘
Kafka 高效数据读取:零拷贝技术
流程
利用操作系统 page cache 来缓存数据:减少磁盘 IO 开销
磁盘顺序写:提高写入性能
consumer端消费数据时候,先是经过Kafka应用从os cache里面读取数据
如果有数据直接返回到操作系统的网卡通过socket将数据发送给consumer系统
如果os cache里面没有数据,从磁盘里面读取数据到os cache然后到操作系统网卡上,通过socket返回给consumer
中间牵涉到kafka Broker跟os操作系统的上下文切换
不使用零拷贝技术读取
linux的sendfile,可以直接把操作交给os
os看page cache里是否有数据,如果没有就从磁盘上读取
如果有,直接把os cache里的数据拷贝给网卡了,中间不用走那么多步骤了,省略读取数据到kafka broker里面
使用零拷贝技术读取
CPU 的速度与磁盘 IO 的速度比起来相差几个数量级,可以用乌龟和火箭做比喻。
一般来说 IO 操作都是由 CPU 发出指令,然后等待 IO 设备完成操作后返回,那CPU会有大量的时间都在等待IO操作
但是CPU 的等待在很多时候并没有太多的实际意义,对于 I/O 设备的大量操作其实都只是把内存里面的数据传输到 I/O 设备而已。比如进行大文件复制,如果所有数据都要经过 CPU,实在是有点太浪费时间了。
基于此就有了DMA技术,翻译过来也就是直接内存访问(Direct Memory Access),有了这个可以减少 CPU 的等待时间。
什么是DMA
不使用零拷贝技术消费者(consumer)从Kafka消费数据,Kafka从磁盘读数据然后发送到网络上去,数据一共发生了四次传输的过程。其中两次是 DMA 的传输,另外两次,则是通过 CPU 控制的传输
零拷贝技术第一次传输:通过 DMA从硬盘直接读到操作系统内核的读缓冲区里面。
第二次传输:根据 Socket 的描述符信息直接从读缓冲区里面写入到网卡的缓冲区里面。
kafka使用零拷贝技术只进行了两次数据传输
可以看到同一份数据的传输次数从四次变成了两次,并且没有通过 CPU 来进行数据搬运,所有的数据都是通过 DMA 来进行传输的。没有在内存层面去复制(Copy)数据,这个方法称之为零拷贝(Zero-Copy)
详解零拷贝技术
零拷贝技术:减少数据多次拷贝开销
kafka 高吞吐低延迟
Kafka 如何实现高性能 IO
Kafka 底层数据存储以 partition 日志文件存储。
每一行数据都有一个序号offset 来代表这个消息在日志文件里的位置
注意:在消费消息的时候也有一个所谓的offset,这个offset是代表消费者目前在partition日志文件里消费到了第几条消息,是两个东西
比如:有个 topic 叫 “TOPICA”有 2 个分区,每个分区在一台机器上,2台机器上分别会有 2个目录:“TOPICA-0”、“TOPICA-1”
每个分区对应目录的格式:topic-name_partition-number(topic-分区号)
进入log.dir对应的目录,可以看到kafka的消息存储目录
其中,test1-0、test1-1代表当前test1主题下的两个分区,随便进入一个,即可看到消息日志文件。消息日志文件主要存放在分区文件夹里的以log结尾的日志文件里,如下是test1主题对应的分区0的消息日志
包含3个文件:.log、.index、.timeindex(.index、.timeindex是索引文件),创建过程叫做log rolling,正在被写入的日志段文件,叫做active log segment
文件日志 .log:实际存储数据,文件名代表起始 offset
索引 index:位移索引,为了快速查找 offset
索引 timeindex:时间戳索引,根据时间快速查找
一个 日志段文件(log segment file) 组成
日志文件不会无限增大,只要日志文件超过 1GB (log.segment.bytes 默认值),Kafka 就会创建新的段segment
offset 升序排序,使用二分查找
如何定位数据
log.retention.day:默认 7 天,每日会把 7天以前的数据清理掉,包括 .log、.index、.timeindex文件
log.retention.hours:默认 7 天换算成小时,设置保留多少小时
Broker 会在后台启动线程异步进行日志清理
核心数据:在 Kafka 中,可以保留 7 天,甚至 15天
怕数据丢失?可以重新回放处理一次,让下游消费者再次消费
系统部署一般都都需要考虑这块
磁盘上的日志文件是按照什么策略定期清理腾出空间的
日志文件
底层数据结构存储
Kafka 将一个逻辑上的 Topic 数据拆分成多个分区 partition
不同的分区 partition 部署到不同的机器上
生产数据:可以将同一 topic 上数据写入到不同的 partition 机器上
消费数据:根据主题从不同 partition 上消费数据
TB量级数据分布式存储
防止Kafka某台机器宕机,导致一个topic就丢失了一个partition的数据(数据丢失)
主分区(leader partition),有且仅有一个,负责对外提供数据读写
当使用生产者API向Kafka成功写入消息后,马上使用消费者API实现刚才生产的消息
如果允许副本提供服务,副本同步时异步的,因此可能出现follwer副本还没更新最新消息,导致查不到数据
follower 不提供服务的原因
副本分区(follower partition),可以有多个,主要复制数据
每个分区 partition 有多个副本
假设某台机器宕机,leader partition没了,此时会通过zookeeper来维持跟每个kafka的会话
如果一个kafka进程宕机了,此时kafka集群就会重新选举一个leader partition,就是用他的某个副本partition即可
通过副本partition可以继续提供这个partition的数据写入和读取,就可以实现容错
选举
多副本冗余
kafka的broker端每个partition都会有多个副本(replica),至少有一个leader副本以及多个follower副本(可配置),分区接收消息以及消费消息都会在leader中进行,而follower副本的作用就是备份leader中的消息,它们会定时的从leader中拉取最新数据,从而尽可能的保证和leader的同步
若leader宕机,但是leader的数据还没同步到follower上去,此时即使选举了follower作为新的leader,但当时刚才的数据已经丢失了
只依靠多副本机制能保证Kafka的高可用性,但是不能保证数据不丢失
如果一个 leader 里面的 ISR 列表没有 follower ,是不允许其他应用往 Kafka 写数据
只能从 ISR 列表里选举新的 leader
所有与leader副本保持一定程度同步的副本(包括leader)组成ISR(in-sync replicas)
与leader副本同步滞后过多的副本(不包括leader),组成OSR(out-sync replicas)
AR = ISR + OSR。在正常情况下,所有的follower副本都应该与leader副本保持一定程度的同步,即AR = ISR,OSR集合为空。
分区中的所有副本统称为AR(Assigned Replicas)
当follower落后太多或者长时间没有向leader发起同步请求,leader副本就会认为它出问题了,会把它从ISR中移除
这时候这个follower就会放入OSR集合中,直到某个时候这个follower同步跟上了leader,然后这个副本又会被加入到ISR中
当leader副本宕机时,只有ISR中的follower副本才有资格成为leader,OSR中的则没有资格。
leader副本负责维护和跟踪ISR中所有follower的滞后状态
kafka何时扩充ISR
ISR机制
写入数据不丢失
数据写入Topic ,实际上是写入Topic里的partition
leader partition 负责数据的读和写
将leader partition分布在不同的机器上来实现请求负载均衡
实现负载均衡效果
既然 ISR 是可以动态调整的,那么就可以出现这样的情形:ISR 为空
因为 Leader 副本天然就在 ISR 中,如果 ISR 为空了,就说明 Leader 副本也“挂掉”了,Kafka 需要重新选举一个新的 Leader
可是 ISR 是空,此时该怎么选举新 Leader 呢
通常来说,非同步副本落后Leader 太多,因此,如果选择这些副本作为新 Leader,就可能出现数据的丢失。毕竟,这些副本中保存的消息远远落后于老 Leader 中的消息
Kafka 把所有不在 ISR 中的存活副本都称为非同步副本
从Kafka 0.11.0.0版本开始,此参数默认值从true设置为false
在 Kafka 中,选举这种副本的过程称为 Unclean 领导者选举。Broker 端参数 unclean.leader.election.enable 控制是否允许 Unclean 领导者选举
开启 Unclean 领导者选举可能会造成数据丢失,但好处是,它使得分区 Leader 副本一直存在,不至于停止对外提供服务,因此提升了高可用性
反之,禁止 Unclean 领导者选举的好处在于维护了数据的一致性,避免了消息丢失,但牺牲了高可用性
CAP 理论的话,一个分布式系统通常只能同时满足一致性(Consistency)、可用性(Availability)、分区容错性(Partition tolerance)中的两个。显然,在这个问题上,Kafka 赋予你选择 C 或 A 的权利
可以根据实际业务场景决定是否开启 Unclean 领导者选举,一般不建议开启这个参数,因为数据的一致性要比可用性重要的多
Unclean领导者选举(Unclean Leader Election)
kafka的高可用
kafka基本的元数据:topic信息,partition信息,broker信息,ISR列表等信息
共享数据不应该保存到自己的JVM内存中
kafka基本的元数据存储到自己独享的内存,这是有状态的
什么叫做无状态
后续对kafka进行扩容,加了一台kafka的机器到集群中,其他broker需要跟新增的broker进行内存数据的同步和操作等,而且数据同步等操作是不好处理的。
若broker是有状态的
Kafka无状态架构
维持各节点心跳:每个 Kafka Broker 要定时给 ZK 发送心跳
发现新节点、节点宕机
进行leader partition选举
每个Kafka Broker都会将其元信息注册在ZK上
节点故障感知
0 :表示客户端只管发送数据,不管服务端接收数据的任何情况
1 :表示客户端发送数据后,需要在服务端 leader 副本写入数据成功后,返回响应
all/-1 : 表示kafka ISR列表中所有的副本同步数据成功,才返回消息给客户端
acks
Kafka ISR 列表中最小同步副本数(包括leader)
建议值大于1 且小于 replication.factor 副本数
副本有 3 个,1 个 leader 和 2 个 follower
replication.factor = 3
最小同步数为1,1个副本写入数据就认为写入成功
min.insync.replicas = 2
举例
min.insync.replicas
kafka 0.9.0 版本之前存在的参数
follower如果落后Leader的消息数量超过了这个参数指定的数量之后,就会认为follower是out-of-sync,就会从 ISR 列表里移除
如果leader瞬间刚接收到几万条消息,然后所有follower还没来得及同步过去,此时所有follower都会被踢出ISR列表,然后同步了之后,再回到ISR列表
依靠固定参数判断的机制,会导致可能在系统高峰时期,follower会频繁的踢出ISR列表再回到ISR列表
新版本移除该参数原因
replica.lag.max.messages
kafka 0.10.x 新增的参数
与leader上一次交互时间超过阈值就会把follower剔除出 ISR
如果线上出现了流量洪峰,一下子导致几个follower都落后了不少数据,但是只要尽快追上来,在指定时间内别一直落后,就不会认为是out-of-sync,这个机制比固定参数判断的效果要很多
新版本增加该参数原因
replica.lag.time.max.ms
重要参数
负责推算 Leader partition 的 HW
当所有的 follower partition 的 LEO 推送给 Leader partition 时候,Leader partition 根据 min{LEO1...LEOn} 即可得到 Leader的HW
作用
LEO
Leader partition 同步到其所有 follower 的 offset
划分已提交和未提交数据:HW=3,表示前3条数据是已经同步到其他所有的 follower 里面去了,所以也将其叫做 committed(已提交数据),消费者是消费不到 HW 之后的 uncommmitted 数据的
消费:Consumer 只能看到 base offset 到 HW offset 之间的数据,这部分数据是 committed,可以被消费
HW
概念
维护所有 follower 的 LEO值
收到 follower 请求后,每次返回都会携带 HW
更新 HW: 取各 follower的 LEO 的最小值,即 min{LEO1...LEOn}
更新 LEO: 数据写入一条,此指针就往后移动
Leader 操作
定期向 Leader 发送 fetch 请求同步数据,每个请求都会携带自己的 LEO
更新 LEO: 每次同步数据到 follower,都会更新其 LEO值
Follower 操作
Leader 跟 Follower 的 HW 和 LEO 是如何更新的
Leader HW(实际上也是Partition HW)
Leader LEO
Follower HW
Follower LEO
Leader所在的Broker上还保存了其他Follower的LEO值,称为Remote LEO
每一个副本都保存了其HW值和LEO值
当Producer向.log文件写入数据时,Leader LEO首先被更新
Remote LEO要等到Follower向Leader发送同步请求(Fetch)时,才会根据请求携带的当前Follower LEO值更新
Follower在接收到Leader的响应(Response)后,首先将消息写入.log文件中,随后更新Follower LEO
更新流程
举例来说,如果一开始Leader和Follower中没有任何数据,即所有值均为0。那么当Prouder向Leader写入第一条消息,上述几个值的变化顺序如下
Follower往往需要进行两次Fetch请求才能成功更新HW。Follower HW在某一阶段内总是落后于Leader HW,因此副本在根据HW值截取数据时将有可能发生数据的丢失或不一致
图中两副本的LEO均为2,但Leader副本B上的HW为2,Follower副本A上的HW为1,正常情况下,副本A将在接收Leader Response后根据Leader HW更新其Follower HW为2
假如此时副本A所在的Broker重启,它会把Follower LEO修改为重启前自身的HW值1,因此数据M1(Offset=1)被截断
当副本A重新向副本B发送同步请求时,如果副本B所在的Broker发生宕机,副本A将被选举成为新的Leader
数据丢失
图中Leader副本B写入了两条数据M0和M1,Follower副本A只写入了一条数据M0。此时Leader HW为2,Follower HW为1
如果在Follower同步第二条数据前,两副本所在的Broker均发生重启且副本A所在的Broker先重启成功,那么副本A将成为新的Leader
这时Producer向其写入数据M2,副本A作为集群中的唯一副本,更新其HW为2
当副本B所在的Broker重启后,它将向当前的Leader副本A同步数据
由于两者的HW均为2,因此副本B不需要进行任何截断操作
在这种情况下,副本B中的数据为重启前的M0和M1,副本A中的数据却是M0和M2,副本间的数据出现了不一致
数据不一致
HW的隐患
HW的更新机制
Kakfa引入Leader Epoch后,Follower就不再参考HW,而是根据Leader Epoch信息来截断Leader中不存在的消息。这种机制可以弥补基于HW的副本同步机制的不足
每当Leader副本发生变更时,都会增加该版本号
Epoch值较小的Leader被认为是过期Leader,不能再行使Leader的权力
Epoch:一个单调增加的版本号
起始位移(Start Offset):Leader副本在该Epoch值上写入首条消息的Offset
Leader Epoch由两部分组成
每个副本的Leader Epoch信息既缓存在内存中,也会定期写入消息目录下的leaderer-epoch-checkpoint文件中
向Leader发送LeaderEpochRequest,请求中包含了Follower的Epoch信息
Leader将返回其Follower所在Epoch的Last Offset
如果Leader与Follower处于同一Epoch,那么Last Offset显然等于Leader LEO
如果Follower的Epoch落后于Leader,则Last Offset等于Follower Epoch + 1所对应的Start Offset
Follower接收响应后根据返回的Last Offset截断数据
在数据同步期间,只要Follower发现Leader返回的Epoch信息与自身不一致,便会随之更新Leader Epoch并写入磁盘
当一个Follower副本从故障中恢复重新加入ISR中,后续操作流程:
在上面介绍的数据丢失场景中,副本A所在的Broker重启后根据自身的HW将数据M1截断
而现在,副本A重启后会先向副本B发送一个请求(LeaderEpochRequest)
由于两副本的Epoch均为0,副本B返回的Last Offset为Leader LEO值2
副本A上并没有Offset大于等于2的消息,因此无需进行数据截断,同时其HW也会更新为2
当副本B重启回来并向当前Leader副本A发送LeaderEpochRequest,得到的Last Offset为Epoch=1对应的Start Offset值2
同样,副本B中消息的最大Offset值只有1,因此也无需进行数据截断,消息M1成功保留了下来
解决数据丢失场景
在上面介绍的数据不一致场景中,由于最后两副本HW值相等,因此没有将不一致的数据截断
副本B重启后向当前Leader副本A发送LeaderEpochRequest,得到的Last Offset为Epoch=1对应的Start Offset值1,因此截断Offset=1的消息M1
这样只要副本B再次发起请求同步消息M2,两副本的数据便可以保持一致
解决数据不一致场景
Leader Epoch
副本数据同步机制
Apache Kafka 的核心组件
当控制器发现一个 broker 离开集群(通过观察相关 ZooKeeper 路径),控制器会收到消息:这个 broker 所管理的那些分区需要一个新的Leader
控制器会依次遍历每个分区,确定谁能够作为新的 Leader
然后向所有包含新 Leader 或现有 Follower 的分区发送消息,该请求消息包含谁是新的 Leader 以及谁是 Follower 的信息。
随后,新的 Leader 开始处理来自生产者和消费者的请求,Follower 用于从新的 Leader 那里进行复制
主要作用:在 Apache Zookeeper 的帮助下管理和协调整个 Kafka 集群
集群中任意一台 Broker 都能充当控制器的角色,但在运行过程中,只能有一个 Broker 成为控制器
执行 kafka-topics 脚本时,大部分的后台工作都是控制器来完成的
主题管理(创建、删除、增加分区)
利用 kafka-reassign-partitions 脚本,对已有主题分区进行细粒度的分配功能
分区重分配
Preferred 领导者选举主要是 Kafka 为了避免部分 Broker 负载过重而提供的一种换 Leader 的方案
假设为一个分区分配了3个副本,它们分别是 0、1、2,那么节点0就是该分区的 preferred replica,并且通常情况下是不会发生变更的,选择节点0的原因仅仅是它是副本列表中的第一个副本
preferred replica
把指定分区的leader调整回他们的preferred replica,即将例子中的节点0调整成leader
preferred leader 选举
若某Broker宕机崩溃,该Broker上的Leader副本不可用,因此必须把这些分区的Leader转移到其他的Broker上
即使Broker重启回来,其上的副本也只能作为follower副本加入ISR,不能对外提供服务
随着集群内的不断运行,这种Leader的不均衡现象,开始出现,即集群中的一小部分Broker上承载了大量的分区leader副本
为了校正这种情况引入了首选副本preffered leader
加入该机制原因
Preferred 领导者选举
控制器组件会利用 Watch 机制检查 ZooKeeper 的 /brokers/ids 节点下的子节点数量变更。 当有新 Broker 启动后,它会在 /brokers 下创建专属的 znode 节点
集群成员管理(新增 Broker、Broker 主动关闭、Broker 宕机)
控制器上保存了最全的集群元数据信息,其他所有 Broker 会定期接收控制器发来的元数据更新请求,从而更新其内存中的缓存数据。
数据服务:就是向其他 Broker 提供数据服务
5大职责
存活的Broker列表
关闭的Broker列表
Preferred Leader选举的分区
每个分区的副本信息
所有 Topic 列表
Topic 副本数据
Topic 分区数据
分区 Leader 和 ISR 数据
不可用 Offline 分区
存户的副本列表
存储数据
ZK 的数据模型类似文件系统的树形结构,根目录以 “/” 开始,该结构上的每个节点称为 znode
不会因为 ZK 集群重启而消失
持久性 znode
生命周期与创建该 znode 的 ZK 会话绑定,一旦会话结束,节点就会自动删除
临时 znode
znode
node 节点有一个 Watcher 机制:当数据发生变化的时候, ZooKeeper 会产生一个 Watcher 事件,并且会发送到客户端
基于 Zookeeper上创建的节点,可以对这些节点绑定监听事件
比如可以监听节点数据变更、节点删除、子节点状态变更等事件
通过这个事件机制,可以基于 ZooKeeper实现分布式锁、集群管理等功能
Watcher 监听机制是 Zookeeper 中非常重要的特性
ZooKeeper
番外
Controller通过与zookeeper交互,进行集群的元数据管理,其管理的元数据主要包括以下内容
集群间会有多个 Broker,每个 Broker 都会有一个 broker.id
每个 broker.id 都有一个唯一的标识符用来区分,这个标识符可以在配置文件里手动指定,也可以自动生成
broker.id.generation.enable 参数是用来配置是否开启自动生成 broker.id 的功能,默认情况下为true,即开启此功能
自动生成的 broker.id 有一个默认值,默认值为1000,也就是说默认情况下自动生成的 broker.id 从1001开始
Kafka 可以通过 broker.id.generation.enable 和 reserved.broker.max.id 来配合生成新的 broker.id
Kafka 在启动时会在 ZooKeeper 中 /brokers/ids 路径下注册一个与当前 broker 的 id 相同的临时节点
新的 Broker 会试着进行注册,但不会成功,因为 ZooKeeper 里面已经有一个相同 ID 的 Broker
如果要启动另外一个具有相同 ID 的 Broker,那么就会得到一个错误
监听 Broker 列表的 Kafka 组件会被告知该 Broker 已移除
在 Broker 停机、出现分区或者长时间垃圾回收停顿时,Broker 会从 ZooKeeper 上断开连接,此时 Broker 在启动时创建的临时节点会从 ZooKeeper 中移除
在完全关闭一个 Broker 之后,如果使用相同的 ID 启动另一个全新的 Broker,它会立刻加入集群,并拥有一个与旧 Broker 相同的分区和主题
在关闭 Broker 时,它对应的节点也会消失,不过它的 ID 会继续存在其他数据结构中,例如主题的副本列表中
Kafka 的健康状态检查就依赖于此节点。当有 Broker 加入集群或者退出集群时,这些组件就会获得通知
集群Broker间的关系
Broker 在启动时,会尝试去 ZooKeeper 中创建临时节点 /controller
Zk 会保证只有一个会创建成功
这种方式可以确保只有一个控制器存在。那么只有单独的节点一定是有个问题的,那就是单点问题。
Kafka 当前选举控制器的规则是:第一个成功创建 /controller 节点的 Broker 会被指定为控制器
ZooKeeper 赋予客户端监控 znode 变更的能力,即所谓的 Watch 通知功能
一旦 znode 节点被创建、删除,子节点数量发生变化,或是 znode 所存的数据本身变更,ZooKeeper 会通过节点变更监听器 (ChangeHandler) 的方式显式通知客户端
没有创建节点成功的 Broker,都会在 /controller 节点上加个监听器
控制器的选举
第一个在 ZooKeeper 中的 /brokers/ids 下创建节点的 broker 作为 broker controller,也就是说 Broker controller 只有一个,那么必然会存在单点失效问题
故障转移指的是,当运行中的控制器突然宕机或意外终止时,Kafka 能够快速地感知到,并立即启用备用控制器来代替之前失败的控制器
kafka 为考虑到这种情况提供了故障转移功能,也就是 Fail Over
Broker0 一开始是控制器
现在 Broker0 宕机了,Zk 通过 Watch 机制感知到并删除了 /controller 临时节点
Broker1、Broker2、Broker3 监听到 Zk 的消息,都开始竞选成为新的控制器
最终,Broker3 赢得了选举,成功地在 Zk 上重建了 /controller 节点
Broker3 从 Zk 中读取集群元数据信息,并初始化到自己的缓存中
Broker3 开始行使正常的工作职责
控制器故障转移的过程
Broker controller 故障转移
这个创建过程就会在 Zk 中注册对应的 Topic 的元数据(partition、副本)
此时状态都是 NonExistentReplica
创建一个 Topic:会分配设置的 partition,以及每个 partition 指定几个副本
监听 Zk 上的数据变更
感知到 Topic 变动,就会从 Zk 中加载所有 partition 副本到内存里,并把 partition副本状态变更为 NewReplica
然后选择的第一个副本作为 Leader,其他都是 follower,并且把他们都放到 partition 的 ISR 列表中
Kafka Controller 本质
从每个 partition 的副本列表中取出第一个作为 leader,其他的就是 follower,并把这些放到 partition 对应的 ISR 列表中
把 partition 副本均匀分散到各机器上
同时设置整个 partition 的状态为:OnlinePartition
接着 Controller 会把这个 partition 和副本所有的信息(哪个是 leader、哪个是 follower、ISR列表)都发送给所有 Broker
例子
创建 Topic 时,Controller 如何实现 Leader 选举
通知设置对应所有 Partition 副本的状态为:OfflineReplica,即让对应副本全部下线
Controller 会发送请求给这个 Topic 所有 Partition 所在的 Broker 机器
Controller 把对应全部副本状态变为 ReplicaDeletionStarted
Controller 再发送请求这些 Broker,让其把对应 partition 副本的数据都删除(磁盘上的文件)
最后设置分区状态为:OfflineReplica
删除 Topic 时,Controller 如何清理数据
感知到 Broker 变更,将信息同步给其他 Broker
分配信息 或 进行 Leader 选举
Controller 如何感知 Broker 的上线以及崩溃
为每个 Broker 创建一个对应的 Socket连接,然后在创建一个专属的线程,用于向这些 Broker 发送特定的请求
控制器还会为主题删除创建额外的 I/O 线程
这些线程还会访问共享的控制器缓存数据,为了维护数据安全性,控制在代码中大量使用 ReetrantLock 同步机制,进一步拖慢了整个控制器的处理速度
控制器是多线程的设计,会在内部创建很多线程
Kafka 0.11 版本之前
增加了一个 Event Executor Thread,事件执行线程
从图中可以看出,不管是 Event Queue 事件队列还是 Controller context 控制器上下文都会交给事件执行线程进行处理
将原来执行的操作全部建模成一个个独立的事件,发送到专属的事件队列中,供此线程消费
第一个改进
控制器缓存中保存的状态只被一个线程处理,因此不再需要重量级的线程同步机制来维护线程安全,Kafka 不用再担心多线程并发访问的问题,非常利于问题定位和诊断控制器的各种问题
ZooKeeper API 提供了两种读写的方式:同步和异步
之前控制器操作 ZooKeeper 都是采用的同步方式,这次把同步方式改为异步,据测试,效率提升了 10 倍
将之前同步的 ZooKeeper 全部改为异步操作
第二个改进
之前的设计是 broker 会公平性的处理所有 controller 发送的请求
什么意思呢?公平性难道还不好吗?在某些情况下是的,比如 broker 在排队处理 produce 请求,这时候 controller 发出了一个 StopReplica 的请求,你会怎么办?还在继续处理 produce 请求吗?这个 produce 请求还有用吗?此时最合理的处理顺序应该是,赋予 StopReplica 请求更高的优先级,使它能够得到抢占式的处理
根据优先级处理请求
第三个改进
最大的改进就是:把多线程的方案改成了单线程加事件队列的方案
Kafka 0.11 之后
Controller 内部设计原理
controller挂掉后,Kafka集群会重新选举一个新的controller
这里面存在一个问题,很难确定之前的controller节点是挂掉还是只是短暂性的故障。如果之前挂掉的controller又正常了,他并不知道自己已经被取代了,那么此时集群中会出现两台controller
集群中有Broker1、Broker2、Broker3,其中Broker1为Controller Broker
某个时间段,处于活跃状态的Controller进入了长时间的GC暂停。它的ZooKeeper会话过期了,之前注册的/controller节点被删除。集群中其他Broker会收到zookeeper的这一通知
由于集群中必须存在一个controller Broker,所以现在每个Broker都试图尝试成为新的controller。假设Broker2速度比较快,成为了最新的controller Broker
此时,每个Broker会收到Broker2成为新的controller的通知,由于Broker1正在进行\"stop the world\"的GC,可能不会收到Broker2成为最新的controller的通知
等到Broker1的GC完成之后,仍会认为自己是集群的controller,在Broker1的眼中好像什么都没有发生一样
现在,集群中出现了两个Controller,它们可能一起发出具有冲突的命令,就会出现脑裂的现象
如果对这种情况不加以处理,可能会导致严重的不一致。所以需要一种方法来区分谁是集群当前最新的Controller
epoch number只是单调递增的数字
第一次选出Controller时,epoch number值为1,如果再次选出新的Controller,则epoch number将为2,依次单调递增
Kafka是通过使用 epoch number(纪元编号,也称为隔离令牌)来完成的
每个新选出的controller通过Zookeeper 的条件递增操作获得一个全新的、数值更大的epoch number
其他 Broker 在知道当前epoch number 后,如果收到由 Controller 发出的包含较旧(较小) epoch number的消息,就会忽略它们
即 Broker根据最大的 epoch number 来区分当前最新的 Controller
解决方案
epoch number 记录在 Zookeepr 的一个永久节点 controller_epoch
脑裂问题
控制器组件(Controller)
包括具体的分区信息,比如领导者副本是谁,ISR集合中有哪些副本等
所有主题信息
包括当前都有哪些运行中的Broker,哪些正在关闭中的Broker等
所有Broker信息
包括当前正在进行Preferred领导者选举以及分区重分配的分区列表
所有涉及运维任务的分区
这些数据其实在ZooKeeper中也保存了一份。每当控制器初始化时,它都会从ZooKeeper上读取对应的元数据并填充到自己的缓存中
而Broker上元数据的更新都是由Controller通知完成的,Broker并不从Zookeeper获取元数据信息
注意
比较重要的数据有
Producer和Broker之间(发送数据)
Broker和Consumer之间(消费数据)
Broker和Broker之间(同步数据)
通信主要发生位置
通过固定的连接不断的传输数据,避免频繁的创建连接和销毁连接的开销。
Broker端会构造一个请求队列,然后不停的获取请求放入队列,后台再通过一堆的线程来获取请求进行处理
通信都是基于TCP协议进行的,底层基于TCP连接和传输数据
Kafka跟Netty有一个共同点就是都是使用Reactor模型实现了多路复用但是kafka在此基础之上多了一个Handler线程池去处理请求,将处理完的请求放到相应队列里面,让processor线程去处理返回给acceptor
可以用num.network.threads参数设置processor线程的数量,默认是3
每个Broker上都有一个acceptor线程和很多个processor线程
Client跟一个Broker之间只会创建一个socket长连接,会复用
然后Broker就用一个acceptor来监听每个socket连接的接入,分配这个socket连接给一个processor线程
processor线程负责处理这个socket连接,监听socket连接的数据传输以及客户端发送过来的请求,acceptor线程会不停的轮询各个processor来分配接入的socket连接
使用的是Reactor的多路复用思想,基于selector机制
用一个selector监听各个socket连接,看其是否有请求发送过来,这样一个processor就可以处理多个客户端的socket连接了
processor线程会负责把请求放入一个Broker全局唯一的请求队列,默认大小是500,是queued.max.requests参数控制的
所有的processor会不停的把请求放入这个请求队列中
接着一个KafkaRequestHandler线程池负责不停的从请求队列中获取请求来处理,这个线程池大小默认是8个,由num.io.threads参数来控制
处理完请求后的响应,会放入每个processor自己的响应队列里
然后会监听自己的响应队列,把响应拿出来通过socket连接发送回客户端
proessor需要处理多个客户端的socket连接就是通过java nio的selector多路复用思想来实现的
一个proessor如何处理多个客户端的socket连接请求的?
图解
Reactor模式
基于Reactor模型进行多路复用处理请求
Kafka通信
Kafka、Dubbo、ZooKeeper、Netty 、Quartz、Caffeine 、Akka 中都有对时间轮的实现
Timer 内部使用一个叫做 TaskQueue 的类存放定时任务,它是一个基于最小堆实现的优先级队列
TaskQueue 会按照任务距离下一次执行时间的大小将任务排序,保证在堆顶的任务最先执行。这样在需要执行任务时,每次只需要取出堆顶的任务运行即可
不过其缺陷较多,比如一个 Timer 一个线程,这就导致 Timer 的任务的执行只能串行执行,一个任务执行时间过长的话会影响其他任务(性能非常差),再比如发生异常时任务直接停止(Timer 只捕获了 InterruptedException )
java.util.Timer是 JDK 1.3 开始就已经支持的一种定时任务的实现方式
Kafka内部有很多延时任务,没有基于JDK Timer来实现,插入和删除任务的时间复杂度是O(nlogn),而是基于了自己写的时间轮来实现的,时间复杂度是O(1)
一个时间轮(TimerWheel)就是一个数组实现的存放定时任务的环形队列,数组每个元素都是一个定时任务列表(TimerTaskList),这个TimerTaskList是一个环形双向链表,链表里的每个元素都是定时任务(TimerTask)
时间轮的机制
Kafka的时间轮延时调度机制
核心原理
生产者(Producer)写数据时不再区分同步和异步,所有的操作请求均以异步的方式发送,这样大大地提高了客户端写数据的效率
异步生产数据
发送消息,消息的格式如下为ProducerRecord
序列化消息,根据Topic的元数据缓存进行partitioner 。分区后的消息格式就是一个Map,Key为TopicPartition对象用来标识一个消息的唯一分区。Deque对象中存储了每个消息的回调函数,用于Sender线程响应客户端
将消息放入缓冲区,主线程的操作结束
用户主线程
不断轮询缓冲区中达到要求的batch
每个broker一次就是一个请求
按照Broker进行分类
建立socket连接发送给不同的Broker
这个参数默认值是5
默认情况下,每个Broker最多只能有5个请求是发送出去但是还没接收到响应的,所以这种情况下是有可能导致顺序错乱的,先发送的请求可能后续要重发
max.in.flight.requests.per.connection
根据消息的回调函数,进行响应
Sender线程
涉及到两个线程
设置meigebatch的大小,如果batch太小,会导致频繁网络请求,吞吐量下降;如果batch太大,会导致一条消息需要等待很久才能被发送出去,而且会让内存缓冲区有很大压力,过多数据缓冲在内存里
也就是一个batch满了16kb就发送出去,一般在实际生产环境,这个batch的值可以增大一些来提升吞吐量
默认值是:16384,就是16kb
batch.size
默认是0,意思就是消息必须立即被发送
一般设置一个100毫秒之类的,就是这个消息被发送出去后进入一个batch,如果100毫秒内,这个batch满了16kb,自然就会发送出去
但是如果100毫秒内,batch没满,那么也必须把消息发送出去了,不能让消息的发送延迟时间太长,也避免给内存造成过大的一个压力
linger.ms
消息批量发送的核心参数
producer会创建一个accumulator缓冲区,里面是一个HashMap数据结构,每个分区都会对应一个batch队列,因为打包的batch,必须是这个batch都是发往同一个分区的,这样才能发送一个batch到这个分区的leader broker
第一个是compressor,这是负责追加写入batch的组件
第二个是batch缓冲区,就是写入数据的地方
第三个是thunks,就是每个消息都有一个回调Callback匿名内部类的对象,对应batch里每个消息的回调函数
每个batch包含三个东西
缓冲区内部数据结构
如果写消息缓冲区满了,此时是阻塞住一段时间,然后默认是60000,也就是60秒之后抛异常
max.block.ms
发送消息过程
首先指定消息发送到哪个 Topic
也可以指定一个分区 key,根据 key 的 hash 值来分发到指定的分区
也可以自定义 partition 来实现分区策略
选择一个 Topic 的分区 partitiion,默认是轮询来负载均衡
找到这个分区的 leader partition
与所在机器的 Broker 的 socket 建立通信
发送 Kafka 自定义协议格式的请求(包含携带的消息)
发送消息
Kafka 的消息组织方式实际上是三级结构:主题 - 分区 - 消息。
producer.send(msg);
方式一:默认分区策略
方式二:指定 key,根据 key 的 hash值去分发到某个分区上
所谓分区策略是决定生产者将消息发送到哪个分区的算法
Round-robin 策略,即顺序分
Kafka Java 生产者 API 默认提供的分区策略
轮询策略
随意地将消息放置到任意一个分区上
随机策略
一旦消息被定义了 Key,那么就可以保证同一个 Key 的所有消息都进入到相同的分区里面,由于每个分区下的消息处理都是有顺序的
一个 Topic 只有一个分区
如何保证消息全局顺序一致
发送方保证业务顺序性
发送的消息根据 key,发送到同一个分区下
如何保证相关业务顺序处理
按消息键保序策略
其他分区策略:自定义
策略
分区策略
producer.close();
主动断开
kill -9
暴力删除
用户主动关闭
connection.max.idles.ms
TCP 连接是在 Broker 端被关闭的,但这个关闭连接请求是客户端发起的,对 TCP 而言这是被动的关闭,被动关闭会产生大量的CLOSE_WAIT连接
kafka 自动关闭
Producer 端关闭 TCP 连接的方式
重试次数配置
比如说网络抖动导致Producer以为没成功,其实是成功了的
消息重复发送
消息重试是可能会导致消息乱序的,因为可能排在后面的消息都发送出去了
所以可以使用“max.in.flight.requests.per.connection”的参数设置为1,这样可以保证Producer同一时间只能发送一条消息
消息乱序
重试可能导致的问题
retries
Leader分区不可用了
平时重启 Broker 进程,肯定会导致 leader 切换,会导致写入报错。
产生原因:某个机器挂了,要等待选出新 leader分区,才能继续写入。
LeaderNotAvailableException
Controller 所在 Broker 挂了
产生原因:某个机器挂了,等待 Controller 重新选举
NotControllerException
网络异常
产生原因:断网、网络分区、丢包等等
NetworkException
如果重试几次之后还不行,那么就要交给人工处理
以上问题解决方案:重试发送即可
常见异常
发送给 Broker 遇到异常,则重试
生产端 Producer
老版本:消费者把消费记录写到zk中
消费者把消费记录写到Kafka中,并以内部主题的方式进行存储,Kafka系统将其命名为__consumer_offsets
提交过去的时候,key是group.id+topic+分区号,value就是当前offset的值
每隔一段时间,kafka内部会对这个topic进行compact(重新整理碎片化的磁盘,释放多余的空间)也就是每个group.id+topic+分区号就保留最新的那条数据即可
可以通过 offsets.topic.num.partitions 来设置
默认分区50个
新版本
Consumer消费消息时候会记录消费数据的偏移量
高并发请求zk是不合理的架构设计,zk是做分布式系统的协调的,轻量级的元数据存储,不能负责高并发读写,作为数据存储
消费记录从zk移到kafka原因
偏移量迁移
consumer心跳时间,必须保持心跳才能知道consumer是否故障了,如果故障之后,就会通过心跳下发rebalance的指令给其他的consumer通知他们进行rebalance的操作
heartbeat.interval.ms
kafka多长时间感知不到一个consumer就认为故障了,默认是10秒
session.timeout.ms
如果在两次poll操作之间,超过了这个时间,那么就会认为这个consume处理能力太弱了,会被踢出消费组,分区分配给别的去消费,结合业务处理的性能来设置就可以了
max.poll.interval.ms
感知消费者故障
获取一条消息最大的字节数,一般建议设置大一些
fetch.max.bytes
一次poll返回消息的最大条数,默认是500条
max.poll.records
consumer跟broker的socket连接如果空闲超过了一定的时间,此时就会自动回收连接,但是下次消费就要重新建立socket连接,这个建议设置为-1,不要去回收
connection.max.idle.ms
消息进行消费时需要注意的参数
如果下次重启,发现要消费的offset不在分区的范围内,就会重头开始消费;但是如果正常情况下会接着上次的offset继续消费的
auto.offset.reset
开启自动提交唯一
enable.auto.commit
5000,默认是5秒提交一次
auto.commit.inetrval.ms
消费者offset相关的参数设置
每个consumer group都会选择一个broker作为自己的coordinator(调度者),是负责监控这个消费组里的各个消费者的心跳,以及判断是否宕机,然后开启rebalance
首先对group id 进行hash,接着对__consumer_offsets的分区数量进行取模,默认分区数量是50
找到分区以后,这个分区所在的broker机器就是coordinator机器
如何选择coordinator机器?
每个consumer都会发送JoinGroup请求到计算出来的coordinator那台机器
然后coordinator从一个consumer group中取出一个consumer作为leader
coordinator把consumer group情况发送给这个leader
接着leader会负责制定消费方案
通过SyncGroup发送给coordinator
接着coordinator就把消费方案下发给所有的consumer,他们会从指定的分区的leader broker开始进行socket连接和进行消息的消费
消费流程
假设有9个partiton
range策略就是按照partiton的序号范围,比如partitioin0~2给一个consumer,partition3~5给一个consumer,partition6~8给一个consumer
默认策略
可能在rebalance的时候会导致分区被频繁的重新分配
比如挂了一个consumer,然后就会导致partition0~4分配给第一个consumer,partition5~8分配给第二个consumer
这样的话,原本是第二个consumer消费的partition3~4就给了第一个consumer,实际上来说未必就很好
问题
range
轮询分配,比如partiton0、3、6给一个consumer,partition1、4、7给一个consumer,partition2、5、8给一个consumer
round-robin
尽可能保证在rebalance的时候,让原本属于这个consumer的分区还是属于他们,然后把多余的分区再均匀分配过去,这样尽可能维持原来的分区分配的策略
consumer1:0~2 + 6~7consumer2:3~5 + 8
和range对比,若挂了一个consumer
sticky
rebalance的三种策略
刚开始Consumer Group状态是:Empty
接着部分consumer发送了JoinGroup请求,会进入:PreparingRebalance的状态,等待一段时间其他成员加入,这个时间现在默认就是max.poll.interval.ms来指定的,所以这个时间间隔一般可以稍微大一点
接着所有成员都加入组了,就会进入AwaitingSync状态,这个时候就不能允许任何一个consumer提交offset了,因为马上要rebalance了,进行重新分配了,这个时候就会选择一个leader consumer,由他来制定分区方案
然后leader consumer制定好了分区方案,SyncGroup请求发送给coordinator,他再下发方案给所有的consumer成员,此时进入stable状态,都可以正常基于poll来消费
如果在stable状态下,有consumer进入组或者离开崩溃了,那么都会重新进入PreparingRebalance状态,重新看看当前组里有谁,如果剩下的组员都在,那么就进入AwaitingSync状态
leader consumer重新制定方案,然后再下发
Consumer Group的状态机流转机制
Group Coordinator
在一个while循环里不停的去调用poll()方法,开启一个线程,这个线程就是唯一的KafkaConsumer的工作线程
定时发送心跳给broker
拉取消息
缓存消息
在内存里更新offset
每隔一段时间提交offset
执行rebalance
Consumer内部就一个后台线程,是调用Consumer.poll()方法的那个线程任务
因为可以监听N多个Topic的消息,此时会跟集群里很多Kafka Broker维护一个Socket连接,然后每一次线程调用poll(),就会监听多个socket是否有消息传递过来
可能一个consumer会消费很多个partition,每个partition其实都是leader可能在不同的broker上,那么如果consumer要拉取多个partition的数据,就需要跟多个broker进行通信,维护socket
每个socket就会跟一个broker进行通信
每个Consumer内部会维护多个Socket,负责跟多个Broker进行通信,就一个工作线程每次调用poll()的时候,其实会监听多个socket跟broker的通信,是否有新的数据可以去拉取
为什么叫做poll呢?
Consumer内部单线程处理一切事务的核心设计思想
消费端 Consumer
kafka
0 条评论
回复 删除
下一页