kafkaProducer
2022-08-15 14:55:26 0 举报
kafka生产者主流程
作者其他创作
大纲/内容
node=broke=节点
KafkaProducer.send()起始方法!!!!同步底层是latch.await()
存在newTopic发起更新元数据请求并且进入wait()
connected 完成finishConnect事件
置入
thunks
Selectable selector(封装了nio- channel selector )默认使用明文交互
batchs(所有batch的操作都是synchronized同步的)
获取broken信息
ProducerBatch
metadata
batch超时处理
completedSends 完成write事件
③将数据写入batch添加callback回调
每次轮询清空
收到成功响应 放入free队尾中等待下次使用
KafkaChannel 状态位-ChannelState
处理等待响应超时的request
nio-SelectionKeynio-SocketChannel
poll()
Buffer
①优先从free取
处理已经发送的请求
Condition(首)
metadata(元数据)
处理响应结果
创建ApiVersionsRequest
数据结构
free中资源不够则创建
不存在
发送成功响应释放已完成的batch唤醒waiter等待队列
completedReceives完成read事件
sender(数据处理线程)
broken有需要发送的batch
addLast
有
send 本次需要被write的信息
newTopicsSet<String> 新的topic 元数据拉取完成后移除
需要更新元数据
否
是
NetworkReceive 本次收到read的信息
apiVersions记录node版本信息
存在
获取并写入batch多条信息未写满需要连续写入
为这些batch创建request并监听write事件
断开与broken的连接并标识重新连接
Deque<Condition> waiters(等待队列)
runOnce() 无限循环
元数据更新完成唤醒线程
Deque<ProducerBatch>
Buffer(首)
callback
②free没有则创建
KafkaClient client(NetworkClient) 网络通信交互
accumulator(数据缓冲区)
needPartialUpdate判断元数据是否需要部分更新标志位
至少有一个batch未超时
Condition
等待write事件
Deque<ByteBuffer> free(闲置池)
获取nio finishConnect()完成连接 isReadable() 读取数据isWritable() 写入数据
brokenid
accumulator
clientIdproducer名称
为准备好的broken匹配对应的batch
wakeup()
创建元数据request或者初始化建立broken连接
轮询 -peekFirst()获取batchs信息
处理成功响应的业务请求
②
updateVersion判断元数据是否更新成功的标志位
TopicPartitio(partition)
处理断开的连接
metadataUpdaterInProgressData inProgress 当前正在更新元数据的信息metadata
①topic不存在添加到newTopic中
ProducerBatch(首)
返回FutureRecordMetadata里面是一个latch阻塞
MetadataResponseApiVersionsResponse业务response
完成后执行回调通知唤醒阻塞事件
nio执行与broken的交互
MemoryRecordsBuilder
BufferPool free 缓冲池
nio.bytebuffer默认16kb
disconnected 发送失败或者超时断开
DataOutputStream appendStream
执行回调
需要更新元数据?
至少有一个broken准备好
唤醒后先从free取
java.nio.channels.Selector nioSelector
needFullUpdate判断元数据是否需要全量更新标志位
处理刚建立的连接
收藏
收藏
0 条评论
下一页