kafka producer架构
2020-08-25 10:31:33 4 举报
AI智能生成
登录查看完整内容
kafka producer架构
作者其他创作
大纲/内容
1、KafkaProducer是生产者的入口,也是主线程,它还维护sender子线程。2、在主线程中,不断往RecordAccumulator中追加消息。3、RecordAccumulator是一个消息的仓库,当有消息batch封箱完成时,KafkaProducer会唤醒Sender线程做消息的发送处理。4、Sender首先把batch按照要发往的node分组,生成ClientRequest请求对象。5、Sender再通过NetworkClient的send方法,把ClientRequest需要的资源准备好,如Channel,数据等。6、Sender最后通过NetworkClient的poll方法,底层通过nio把准备好的请求最终发送出去。7、Sender再统一处理response,进行重试或者回调。
KafakProducer
KafkaProducer.send()
拦截器预处理
判断producer是否可用
判断topic所在的metadata是否可用
对key(TOPIC)和value(MSG)进行序列化
计算出要消息要发往的TOPIC的PARTATION,并将要发往的TOPIC和PARTATION封装
计算序列化后的大小
通过RecordAccumulator.append()把消息加入到batch中
如果batch满了或者新建了一个batch,就唤醒sender执行发送
RecordAccumulator.append()
通过原子操作,把追加消息的线程数+1
获取主题分区对应的ProducerBatch队列,如果没有此队列则新建
同步代码中尝试将消息tryAppend()会出现以下情况:a:如果dq中有batch就向队尾的batch加入消息 a.1:添加成功 a.2:如果添加失败则说明队尾的那个batch已满,需要继续往队尾新建batchb:如果dq中没有batch,返回null
预估size大小,从BytePool申请ByteBuffer
同步代码块中,再次添加看是否成功
若还未成功,则创建batch,并且创建ProducerBatch里面的MemoryRecordsBuilder
然后真正的调用batch的tryAppend()方法去append消息batch的tryAppend()和RecordAccumulator的tryAppend()不同
将新建的batch加入dq,并将batch存入一个专门用来存未发送的batch的set
释放buffer,并且原子操作使追加消息的线程数-1
ProducerBatch.tryAppend()
调用MemoryRecordsBuilder.hasRoomFor()看看是否有足够空间容纳消息
调用MemoryRecordsBuilder.append()追加消息
保存存放了callback和对应FutureRecordMetadata对象的thunk到List<Thunk> thunks中
MemoryRecordsBuilder.append()
把字节数组形式的key和value转成Bytebuffer
计算写入的offset,如果不是第一次写入,那么lastOffset+1,否则是baseOffset
如果magic号大于1,那么调用appendDefaultRecord(),否则调用appendLegacyRecord()
在DefaultRecord.writeTo()方法中,通过调用Utils.writeTo()往DataOutputStream写入key,value,header
更新状态例如消息的数量,offset
Sender.run()
进入runOnce()方法,sender的主要逻辑实际在这个方法中。一旦running为false跳出主循环,根据状态判断是继续发送完成,还是强制关闭。强制关闭的话,通过accumulator.abortIncompleteBatches()把RecourdAccumulator中incomplete集合中保存的未完成ProducerBatch做相应的处理,对他们进行封箱,防止继续有新的消息被追加进来,然后从所属Deque中删除掉,释放掉BufferPool中的空间
Sender.runOnce()
Sender.sendProducerData()
如果获得到的某个node为null,也就是未知节点,也就是ReadyCheckResult 中的unknownLeaderTopics有值,那么则需要更新Kafka集群元数据,也就是更新kafka的metadata
循环返回的result中的readyNodes,检查KafkaClient对该node是否符合网络IO的条件,不符合的从集合中删除
获取所有过期的batch,循环做过期处理
计算接下来外层程序逻辑中调用NetWorkClient的poll操作时的timeout时间
返回client的poll操作timeout时间
Sender.sendProduceRequest()
声明ProduceRequest.Builder对象,他内部有引用指向produceRecordsByPartition
生成ClientRequest对象,它包含ProduceRequest.Builder的引用。
调用NetWorkClient的send方法,做好发送ClientRequest的准备
NetworkClient.send()
檢查NetWorkClient是否為存活狀態
檢查是否是內部請求,如果不是內部請求,進行連接狀態等的檢查
NetWorkClient.doSend()
生成RequestHeader對象,包括apiKey 、version、clientId、 correlation这些属性
生成InFlightRequest請求。它持有ClientRequest,request,send等对象,把InFlightRequest添加到inFlightRequests中,InFlightRequests中按照node的id存储InFlightRequest的队列
调用select.send()发送数据,这里是伪发送,只负责把对应的channel和send绑定起来,也就是把数据设置到相应的channel中。并设置channel关注的事件是WRITE
NetWorkClient.poll()
底层通过nio把准备好的请求最终发送出去
0 条评论
回复 删除
下一页