KAFKA
2022-07-01 08:57:51 0 举报
登录查看完整内容
KAFKA概念、客户端源码分析
作者其他创作
大纲/内容
处理b2
LogEndOffset
触发事件selectionKey.isWritable&&kafkachannel.send有值,将send写出,全部写完取消写事件,并将send写到completedSends
key
持久化a1a2a3b1b2b3数据
offset=5
d2
推送:实时取得消息
消费者分区分配器
batch2
指定时限同步失败leadr将分区移到osr
向leader同步成功后回归isr
幂等
2
partition=2offset=6
a3
kafka消息存储与同步原理
事务
未提交offset
Consumer1
partition=1offset=2
建立时间和offset的索引,支持consumer按时间段检索
REQUEST_TIMEOUT_MS_CONFIG设置请求超时时间
主题OO分区X
partition2-leader
每写一条消息就强制刷新pagecache拖累性能。pagecache达到阈值时一次性刷新磁盘。没触发那么不会写入磁盘,此时宕机将丢失消息。单机磁盘不能保证绝对安全(磁盘损坏),转向多机异地存储
低水位:LowWatemark
3
a2
拉取消息
等待写入累加器的超时时间MAX_BLOCK_MS_CONFIG
处理e2
Controller
IO Thread :单线程 Loop 负责从网卡读写数据
BATCH_SIZE_CONFIG指定缓存中的batch大小
1
completedReceives
index
清理累加器和飞行队列中的超时batch,释放batch占用的空间:设置失败状态,并循环调用超时batch里每个thunk的callback
b1
topicPartition
主题XX:找到存储主题XX消息的分区
5
逐条维护offset
(1)所有消息串行,维护进度的频率太高,影响性能,执行慢,不推荐
highWateroffset=3
向副本同步成功
e2
6
MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1mute分区
a1
串行Executor
处理a3
c1
消费者组内增加消费者数量可以提高并行度,最多为主题的分区数。广播:不同消费者组可以消费同一主题且进度隔离
partition0-leader
4
acks:0,kafka收到消息就返回成功,不等待写磁盘和同步副本;1,leader写入pagecache后-1,所有isr写入成功。写失败的从isr移到osr。最少副本一致性>=n个副本同步成功
锁
主动拉取调节进度
pagecache
p0-batch2超过最大容量消息
新分配
a-a2
拦截器
主题XX分区2
触发事件selectionKey.isReadable&&completedReceives不存在已经读取完但未处理的ack处理完这次的ack后才会从内核读取下一条ack,将ack压制在内核,不放到jvm里免得OOM
非事务
消费者组C
kafka没有加工消息的过程,使用sendfile将消息直接发给consumer即可(FileChannel.transferXX)
处理b3
partition 0 leader
seek
可被consumer消费到的消息
元数据
生产者
BufferPool对象重用队列,减少DirectByteBuffer创建次数,控制写入消息的数据量,免得写入太快发生OOM
批量维护offset=2
b-b1
headers
a-a3
未来得急提交offset挪走
completedSends
Producer
(2)推荐:同分区消息串行,不同分区消息并行
partition1-leader
ClientRequest
消费者组group
e1
b3
d-d2
BATCH_SIZE_CONFIG
2-4
元数据含主题、分区、进度、分配
ACK0
batch0
断开连接集合disconnected
Consumer5
注册写事件
指定时限(10秒)同步消息成功,留到ISR,ISR内的broker会在leader挂后参与选主
按时间间隔定时提交
offset=6
c-c1
削峰:消费者自主调节获取消息的频率
b-b3
FutureRecordMetadataFuture 可获取send返回的结果
按消息的key分区,有顺序一致性要求的key相同,这样会写到同一分区,没有一致性要求的散列到多个分区,提高并行度
分区2
ISR:这里放的是分区,不是broker
Trunks
close
分区1流式计算
批量维护offset=6
消费者1
向所有broker同步元数据
c-c2
分布式锁
DELIVERY_TIMEOUT_MS_CONFIG设置等待send方法响应结果的超时时间(含重试的时间)
主题OO
timestamp
解析接收ack的completedReceives中的内容,检查correlationId与飞行队列的第一个请求是否一致,不一致抛异常,将结果写到飞行队列的第一个请求上
b2
batchIndex:在batch中的第几条消息,消息Offset=baseOffset+batchIndex
没有6
生产者clientId
batchIndex
SelectionKeyremoveInterestOps(~OP_WRITE)
处理事件
先进先出队列:按生产者的写入消息的顺序传给消费者
生产者写消息的过程
(4)先处理业务逻辑(做幂等),全部完成后提交offset
MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION设置InFlightRequests存放请求请求的个数,通过这个参数控制发给kafka请求的窗口大小,调节速度:发送给KAFKA但未收到ACK的数据批次个数。例如设置为2,batch1和batch2发给kafka未收到ack,batch3等待batch1收到ack后才能发送
数据可能有顺序一致性要求,但架构不知道,为了兼容,就必须保证写入缓存的顺序与业务逻辑发来的数据顺序一致。累加器用锁保证有序
处理e1
free可重用的队列
SelectionKeyinterest(key.interestOps() & OP_WRITE)
写入
网卡
batch3丢弃
遍历连接列表,设置连接状态:READY
未命中缓存,磁盘IO读取-零拷贝
kafka消息存储在磁盘
消费者
ByteBufferSendclientIdcorrelationId版本消息
Selector
b-b2
value:a1
ack内容
分区2流式计算
提取头信息+消息写入匹配broker1的socketchannel的属性send
batch1
CountDownLauch
OSR:这里放的是同步失败的分区,不是broker
a-a1
kafka按序返回结果,tcp维护原顺序,因此不可能a2 ack比a1 ack先返回
补充消息header:.....topic: testkey: avalue: a1timestamp:nowtopicPartition:callback:
提交offset
发送分配/迁移事件
缓存全量元数据,并定时更新await:controller主题详情
sendFile
遍历发送成功的列表completedSends,发成功的就是InFlightRequest队列的第一个元素,如果不期待结果也就是ack=0,那么从飞行队列弹出这个请求,并设置其完成状态future.set
callback
拉取到消息
broker0-Deque
磁盘存储消费进度
新分配时从分配时的last开始消费
写结果ACK
超时+未超过DELIVERY_TIMEOUT_MS_CONFIG时限+未超过重试次数RETRIES_CONFIG,则将batch写到累加器(这导致乱序),并将这个batch从InFlightBatches列表删除
开启业务逻辑p2-offset5-8(正确)
未建立好的连接,那么建立与broker的连接
2-2
c2
Consumer
connected
selectionKey.isValid,关闭的连接,读取关闭前对方发送的数据
highWateroffset=8
调用请求的callback(含无需响应结果的、有响应结果的、超时的请求),实际是ClientRequest.callback
broker1-Deque
broker0-网卡FD
分区1
流式计算:并行执行可以并行的代码,合并执行不能并行执行的代码
os刷磁盘
处理b1
循环:以batch为单位,从每个主题分区拉取1或多个batch的数据,,每个分区最大为MAX_REQUEST_SIZE_CONFIG
a1 ack
ProducerBatch结构(以batch为单位提交给kafka服务器)
a2 ack
处理a2
将这些batch按broker封装到ClientRequest写到request飞行队列,并将头+消息写到kafkachannel.send
上一步直接连接成功+selectionKey.isConnectable成功的连接放入connected,取消监听连接事件+注册读事件key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ)
InFlightRequestclientIdcorrelationIdapiKey=PRODUCEapiVersion=9
e-e1
订阅主题
offset=1
推送单条消息
e-e2
发送结果的状态finalState:成功/失败(失败含超时)
d-d1
a5
ByteBuffer
unmute
内存中缓存offset,但主机挂记重启会丢失进度,需持久化指定topic+partition+group的进度offset
不能将维护offset整合进事务里的场景,如使用redis持久化offset、使用kafka持久化offset时,先执行业务逻辑,完成后维护offset失败或挂了,会出现消息重复的问题,首先是个小概率事件,看场景做trade off,如果一致性要求高的场景只要对业务逻辑做幂等即可
读取
清理超时的batch释放空间
写吧
业务逻辑
业务做幂等
如果不确认发送消息的结果,就发送下一条消息,光超时、重试就会打乱客户端发送消息的顺序,都不用发到server就乱序了
关闭超过空闲时间的连接:触发事件重计时间
message
在指定时限内(10秒)同步消息失败,分区被打入OSR冷宫,ORS内的分区不会被选为lear partition
recv-q
2-3
p0-offset=6
partition0-replica
f1
KafkaChannel
partition0-Deque
p1-offset1-3
异步/解耦:不直接建立连接,通过kafka收发数据
缓存offset
循环
消费者2挂了时从p1-offset1-3 poll消息
f-f1
处理a1
非标准大小,用时创建,用完丢弃
将消息写到本地缓存,再交由io线程写网卡,这样可以解决写数据的速度与网卡发送数据的速度不匹配问题:1.写速度快于发数据包速度,将这些数据存到有容量限制的本地缓存(避免OOM),更多的数据留到内核不动,由内核控制网速传输解决)由io线程按自身节奏拉取2.写速度慢于发数据包速度,io线程适当等待linger(LINGER_MS_CONFIG设置等待时间),以便堆积一些数据批次发送(啤酒理论),当至少一个batch写满或达到linger时限则发送消息,这样会降低实时性。等价tcp的nodelay***如果数据是单线程串行发送的,上一条消息写服务器成功收到返回结果才写下一条,linger设置实际是失效的。如果多线程写同分区,还是可以堆积的,如果需要串行,是要加锁控制发送串行的。并不是所有场景都可以批量发送请求。
p2-offset5-8
消费者3
offset=2
备份XXX不可读
只提取有主题元数据的+已建立好连接+InFlightRequests可以容纳+上一批次已经发送完事的+达到延迟时间(linger/重试延迟)+非mute的1或多个batch,从每个broker最多提取1M的数据放入batch飞行队列。如果窗口maxInflightRequests为1那么mute累加器中已经放到飞行队列的分区
indextimestamp
先读4字节长度,再读取具体数据
ProducerMetadata真实元数据含主题、分区
tcp协议保证数据包的接收顺序与传输顺序一致
响应结果
sendfile(零拷贝)
InFlightRequests
NetworkClient
被动接收
遍历断开连接列表,将这个连接下的飞行队列清空,并写入结果:超时
写数据包
遍历飞行队列,找到超时的请求,关闭这个请求使用的连接,并将这个连接的飞行队列(所有对连接的broker请求)清空,并写入结果:超时
关闭在指定时限内没有建立连接成功的连接,并将这个连接的飞行队列清空,并写入结果:超时
a4
遍历累加器所有队列:准备好的分区和未知leader分区的broker
1-1
维护
offset=3
ack长度
Consumer3
MemoryRecords
kafka是什么?分布式系统下的生产者与消费者模型,按消息的key分片存储到partition,存储时为每条消息按收到的顺序编号offset。生产者可以向多个主题写消息,消费者也可以订阅多个主题的消息
offset=4
SEND_BUFFER_CONFIG设置发送缓冲区大小
topic+partition+group+offset
d1
InFlightBatches飞行队列
服务器返回的结果baseOffset
2-5
linger
selector.select阻塞一定时间:以请求超时时间,io线程还要处理事件
消息按序append写入磁盘,消息大小不一样,想要寻址某一条消息时,需要遍历所有存储。建立offset-位置索引,加快查找速度(数组结构只需记录头指针,元素的存储位置通过计算可得出)
Trunk
partition 0 replica
维护offset
zookeeper
先按分区批次提交offset
消费者2
topic: test
并行执行也必须将顺序消息分给同一线程执行
数据可能有顺序一致性要求,必须按照缓存中的顺序串行发给网卡,这样才能让tcp有序
topic
Consumer2
同步
f
RECEIVE_BUFFER_CONFIG设置接收缓冲区大小
(3)将所有处理数据的逻辑放在一个事务,一个出错所有都错
用key除以主题的分区数得到分区;key为空则轮询主题的分区,轮询到的分区满了换个
写完send-q存入
开启消费者自动分区分配:当消费者挂了时,可能导致消息重复,因此对于严苛场景,不能订阅方式subcribe使用消费者,而必须自主管理分配的分区assign(partitions)
高水位:HighWatemark
broker2
不能被消费的消息,且leader挂了,这些消息会丢失
XXX
×××
持久化offset
get=await
receive
将结果 baseoffset+追加时间 /异常 写入batch的结果,释放batch占用的空间,并按顺序调用trunks里用户定义的callback
数据长度
KAFKA服务器集群
Consumer4
(1)不管业务状态,到点(定时任务)提交进度,offset维护不同步
socketchannel
第三方存储offset
主题XX分区1
移除超时的batch,设置batch异常结果,释放占用的内存,可能产生乱序
kafka生产者与消费者的运行模式
未知主题leader分区的做标记:请求更新元数据
__topic_offset
mute禁用分区IO线程无法获取数据
尝试发送次数:用来控制重试attempts
检查更新元数据状态为需要 或者 超过元数据有效时限,那么找出最空闲的任意broker,发送更新元数据请求
消费者组B
2-1
result
partition2-Deque
SelectionKeycancel
挂了
topic: testkey: avalue: a1
partition1-Deque
网络分发消息
读取关闭前的数据
2-6
Producer消息以生产的时间
配置元数据ip列表
给出offset消费消息
ACK1
开启业务逻辑p1-offset1-3(重复消费)
消息只推给组内2或3,它两不会同时收到同一条消息
p2-offset9-
存到
partition 0 replica
命中缓存,无需磁盘IO
已经读完存入
最近一次添加消息时间:调节发送请求的频率lastAppendTime
按分区并行Executor
batch3
broker0
p2,o1
用时间戳换算消息的offset
1-2
ACK-1
备份
a3 ack
分配
指定时限内等待副本同步并反馈
消费者组A
kafka集群
BUFFER_MEMORY_CONFIG指定缓存大小
ClientRequest-topx+broker0nodeId(即brokerid)clientIdcorrelation请求唯一标识:顺次递增的序号ackrequestTimeoutMstransactionalId
每写一条消息/关闭客户端时wakeup
send-q
写结果分区的batch
新消息offset=9
累加器Accumulator本地缓存(线程安全,由业务逻辑线程执行)
不交给kafka维护进度,完全自身管理,消费者组的设置会失效
redis
broker1
持久化
kernel pagecache:加大内存可以提高命中率
ProduceRequestResult
0 条评论
回复 删除
下一页