02_kafka源码
2024-05-23 21:29:09   62  举报             
     
         
 kafka源码深入剖析
    作者其他创作
 大纲/内容
 RecordAppendResult appendResult =  last.tryAppend(... ...)
  Y
  网络通信的组件,NetworkClient一个网络连接最多空闲多长时间(9分钟),每个连接最多有几个request没收到响应(5个),重试连接的时间间隔(50ms),Socket发送缓冲区大小(128kb),Socket接收缓冲区大小(32kb)
  放入channel管理器中
  connectionStates.isConnected(node)Broker是否已经建立了连接了
    LRU保证活跃的连接数量,丢弃不常用连接,减轻客户端网络压力idleExpiryManager.update()
  org.apache.kafka.clients.KafkaClient#ready检查一下是否准备好向broker发送数据,此时若没跟某个Broker建立好连接,必须在这里把长连接准备好,之后才可以把数据发送过去,直接就基于最底层的NIO来开发的
  一个响应是否读取完整NetworkReceive#complete
  发送NetworkClient#poll
  N
  阻塞一段时间maxBlockMs - 获取元数据耗费的时间
  多线程争抢创建buffer,并修改这个pool已使用的空间底层是返回nio的byteBufferByteBuffer.allocate(size);
  建立连接KafkaSelector 
  1
  全部满足条件,说明网络组件准备好了
  上锁第二次 : 尝试放入消息
  当有新的batch创建的时候,就唤醒sender线程,因为底层的nioSelector有可能正在执行select(ms)卡住了。this.sender.wakeup();
  若发送成功将Send添加到completedSends集合
  将KafkaChannel中的NetworkReceive设置为null,标志着一次完整的响应读取,不处理拆包和粘包问题了
  释放内存块deallocate
  处理超时handleTimedOutRequests
  是指定分区key
  timeToWaitMs,这个Batch从创建开始算起,最多等待多久就必须去发送,如果是在重试的阶段,这个时间就是重试间隔,但是在非重试的初始阶段,就是linger.ms的时间(100ms)
  sender第三次run
  这一批响应是否处理完?deque.isEmpty()
  计算协议封装后的消息大小消息不能大于maxRequestSize和totalMemorySize
  构建RecordBatch
  返回新的NetworkReceive
  获取到未响应的nodeInFlightRequests#getNodesWithTimedOutRequests
  增加BuffePool的可用大小
  selectionKey关注着OP_READ事件
  Sender#handleProduceResponse
  key.isWritable()是否关注OP_WRITE事件
  parseResponse
  NetworkClient
  去broker上拉取了一次集群的元数据过来,后面每隔5分钟会默认刷新一次集群元数据
  拿到锁的线程根据创建的buffer构建MemoryRecords
  TopicPartition
  利用copyOnWrite读多写少的特点,保证线程安全的创建Deque
  selector.connect ()开始尝试连接
  KafkaProducer
  返回头一块空间free.pollFirst()
  doubleCheck放入batches
  KafkaChannel封装了socketChannel
  NetworkClientsocket网络连接组件
  Selector#addToCompletedReceives
  如果要更新metadata,先拿到version,更新needUpdate=trueversion = metadata.requestUpdate()
  while (partitionsCount == null)直到超时,或者获取到partition元数据
  唤醒sender线程异步去拉取这个topic的元数据sender.wakeup();
  BufferPool
  根据TopicPartition创建或get对应的Deque
  ByteBuffer
  删除stage当这一批响应处理完成后,就删除stage,这样就可以继续往这个broker发送数据了。
  selectionKey获取kafkaChannelkey.attachment()
  1、就直接将InFlightRequests中的请求移除2、包装一个响应回去
  TransportLayer#addInterestOps()关注OP_WRITE事件
  遍历batch中的每条消息(thunks)
  检查所有准备好的leader partition所在的broker网络情况
  清空buffer并放回free的deque中
  socketChannel
  key的murmur2计算出hash值%partitionCount
  元数据管理器 Metadata1、从broker集群去拉取元数据的Topics(Topic -> Partitions(Leader+Followers,ISR))、2、每隔一段时间就再次发送请求刷新元数据,metadata.max.age.ms,默认是5分钟,默认每隔5分钟一定会强制刷新一下3、在发送消息的时候,若发现写入的某个Topic对应的元数据不在本地,会发送请求到broker尝试拉取这个topic对应的元数据,若你在集群里增加了一台broker,也会涉及到元数据的变化
  封装List<ClientRequest>请求集合,设置回调函数Sender#createProduceRequests
  KafkaChannel#receive()通过transportLayer通过nio读取响应NetworkReceive#readFrom()
  第一个线程
  batch
  是否有异常
  返回,等待下次继续读取,同时此次的Buffer和receive等对象留在内存中
  expired,当前Batch已经等待的时间(120ms) >= Batch最多只能等待的时间(100ms),已经超出了linger.ms的时间范围了,否则呢,60ms < 100ms,此时就没有过期。如果linger.ms默认是0,就意味着说,只要Batch创建出来了,在这个地方一定是expired = true
  删除inFlightRequests滞留的请求。这样才能让后续的请求发送到broker上
  inFlightRequests中超时机制
  full,Batch是否已满,如果说Dequeue里超过一个Batch了,说明这个peekFirst返回的Batch就一定是已经满的,另外就是如果假设Dequeue里只有一个Batch,但是判断发现这个Batch达到了16kb的大小,也是已满的
  有准备好的broker
  sendable,综合上述所有条件来判断,这个Batch是否需要发送出去,如果Bach已满必须得发送,如果Batch没有写满但是expired也必须得发送出去,如果说Batch没有写满而且也没有expired,但是内存已经消耗完毕
  发送数据
  消息分区DefaultPartitioner#partition
  inFlightRequests#completeNext处理并移除存放在inFlightRequest中的请求
  Y 准备好了,可以发送数据过去
  partition进行分组
  poll()
  sender第二次run
  加入待更新的元数据集合,后面尝试再次拉取
  层层条件过滤,选出满足的batch
  返回ReadyCheckResult1、可以发送数据的leader所在的broker2、最近下一次检查的时间3、没有元数据的partition
  设置send
  格式化响应体,解析kafka二进制协议parseResponse()
  是否可用重试S?ender#canRetry
  将申请到的buffer还给BufferPoolfree.deallocate(buffer)
  分区器  Partitioner决定,你发送的每条消息是路由到Topic的哪个分区里去的
  上锁第一次 : 尝试放入消息
  万能poll方法NetworkClient poll()
  Sender线程负责从缓冲区里获取消息发送到broker上去,request最大大小(1mb),acks(1,只要leader写入成功就认为成功),重试次数(0,无重试),请求超时的时间(30s),线程类叫做“KafkaThread”,线程名字叫做“kafka-producer-network-thread”,此处线程直接被启动
  第一次启动所有broker都没准备好会删除掉batch对应的broker,开始执行poll()
  内存缓冲中batch超时检测机制
  是否有可发送的batch?
  bufferPool.allocate()
  batch是否有足够的空间放入这条消息?
  包装 DataOutputStream,将二进制压缩流按约定的消息格式写入byteBuffer中,实现消息压缩wrapForOutput(bufferStream)
  sender.run()
  Deque
  构建统计组件Metrics
  如果receive == null
  最终拿到partition元数据,返回partitionsCount = cluster.partitionCountForTopic(topic);
  拿到Deque,获取Deque中最后一个batchRecordBatch last = deque.peekLast();
  producer初始化
  BufferPool将已经存在的buffer返回最终放入topicPartition的deque中此时pool的 availableSize - bufferSize
  Cluster cluster = metadata.fetch();
  handleCompletedReceives解析响应,放入响应列表
  影响broker的消息发送
  没有处理中的响应stagedReceives
  重新创建新的batch
  last = null ?
  如果free队列为空?
  发送消息 1、获取partition元数据KafkaProducer#send()
  封装exception
  取消对OP_CONNECT的关注增加对OP_READ的关注
  消息压缩及写入ByteBuffer    Compressor
  内存缓冲区 RecordAccumulator消息缓冲机制,发送到每个分区的消息会被打包成batch,一个broker上的多个分区对应的多个batch会被打包成一个request,batch size(16kb)默认情况下,如果只考虑batch的机制的话,那么必须要等到足够多的消息打包成一个batch,才能通过request发送到broker上去;但是有一个问题,如果你发送了一条消息,但是等了很久都没有达到一个batch大小所以说要设置一个linger.ms,如果在指定时间范围内,都没凑出来一个batch把这条消息发送出去,那么到了这个linger.ms指定的时间,比如说5ms,如果5ms还没凑出来一个batch,那么就必须立即把这个消息发送出去
  发送完成后的处理handleCompletedSends
  获取SendKafkaChannel#write()
  2
  摘出broker
  如果大小超过设定的batch大小,那么就会创建一个消息大小的batch。这样导致的问题是,一个消息是一个batch,batch失去意义。所以我们要合理的设定batch.size , max.request.size linger.ms等参数。
  发送消息
  如果有的partition缺少元数据?
  超时检测,获取到超时的batchRecordAccumulator#abortExpiredBatches
  判断超时,回调onComplete函数,返回TimeoutExceptionRecordBatch#maybeExpire
  Selector#poll
  依然无内存
  拦截器ProducerInterceptors
  此时broker批量返回的响应都已处理完,并全部放到stage的延时队列中。接下来,我们需要每次将取出stage中头一个响应放入complete中,进行完成响应的处理。
  获取Cluster通过metadata.fetch()获取之前metadata.update()设置的broker地址
  关闭连接Selectable#close()
  返回存入缓存结果,等待回调RecordAppendResult
  释放掉从缓冲区借来的内存块RecordAccumulator#deallocate
  注册到nio selector上关注OP_CONNECT事件
  计算消息的最大大小
  通过NetWorkClient走底层的网络通信,把每个Broker的ClientRequest给发送过去就可以了,poll方法,他是负责实际的 进行网络IO通信操作的一个核心的方法,负责发送数据出去,也包括读取响应回来
  发送出去的数据Send
  发送请求
  放到responses列表中
  直接返回
  生成自增clientId
  丢弃等待gc
  下次什么时候在检查
  Sender再次run的时候,需要判断重试的消息是否达到了重试间隔,再次发送RecordAccumulator#ready中的backingOffbackingOff = batch.attempts > 0 && batch.lastAttemptMs + retryBackoffMs > nowMs;
  RecordAppendResult appendResult =  last.tryAppend(... ...)
  说明上次已经读取完整了一个响应创建响应对象NetworkReceive
  遍历所有响应responses
  最近接收到的数据Receive
  关联key和channel,将来可以通过selectionKey获取到channelkey.attach(channel);
  sender第一次run
  selectionKey
  获取kafkaChannel
  broker.id
  添加处理完成集合completedReceives
  当没有batch可以发送并且没到linger.ms,设置nextReadyCheckDelayMs,下次再来检查是否有batch可以发送,起码要等nextReadyCheckDelayMs时间过了以后才可以
  如果acks = 0 
  第二个线程
  inFlightRequests.canSendMore(node)Broker同一时间最多容忍5个请求发送过去但是还没有收到响应,超过5个就不再发送
  移动position到可读的位置 buffer.flip();
  打开长连接SocketChannel socketChannel = SocketChannel.open();设置生产环境socketChannel参数
  构建interceptCallback,放入缓冲区之后会根据情况回调
  SocketChannel#write()
  标记重新拉取元数据,因为有可能是leader partition改变到其他broker上了metadataUpdater.requestUpdate()
  匹配请求和响应
  拿出当时的request
  从BufferPool中切一块缓存创建batch所需要的新的内存空间(ByteBuffer)其实这个pool管理的就是一个Deque<ByteBuffer> free队列,这个队列的每个元素就是代表的每个batch的一小块未使用且待分配内存空间元素数量 * 每个元素大小 = 缓冲区大小。
  kafkaChannel
  KafkaChannel#read粘包拆包处理
  waitedTimeMs,当前时间减去上一次发送这个Batch的时间,若Batch从来没有发送过,上一次发送时间就是Batch被创建出来的那个时间,这个Batch从创建开始到现在已经等待了多久了
  有一些batch被发送出去了,获取到了响应,此时就可以释放那个batch底层对应的ByteBuffer,就会被放回到BufferPool里面去,此时就可以唤醒阻塞的线程,再次申请一个新的ByteBuffer构造一个Batch
  selector.isChannelReady(node)和broker之间的socketChannel是否已连接
  响应对象rewind,将position置0
  取消关注OP_WRITE事件TransportLayer#removeInterestOps()
  筛选出已经建立连接的broker
  RecordAccumulator 数据结构
  Selector(kafkaSelector)
  重新入队,放入内存缓冲区的头一个RecordAccumulator#reenqueue
  放入缓冲区 RecordAccumulator#append
  自增integer % partitionCount
  放入发送但未响应KafkaClient#send
  send()只能暂存一个Send
  waitOnMetadata()保证topic有可用的元数据
  调用
  ByteBufferSend#completed
  回调处理RecordBatch#done
  响应回调
  请求打包
  关闭输出流last.records.close()
  序列化组件keySerializervalueSerializer
  若buffersize = 设定的大小?比如16k
  在topicPartition的Deque中增加last batchdq.addLast(batch);
  接收响应
  已发送但未响应的集合InFlightRequests#add()(Deque<ClientRequest> .addFirst(request) )
  抛异常
  是否有partition元数据?partitionsCount = cluster.partitionCountForTopic
  获取已经可以发送消息的partition
  update()更新broker集群地址,维护Metadata的Cluster之后发送消息时,若发现没有topic相关元数据,就会从这个地址去拉取元数据
  partition()
  发现last batch是有的,就将消息尝试放入这个batch
  加载配置构建ProducerConfig
  exhuasted,内存是否已经耗尽,可能有人阻塞在写操作,无法申请到内存,在等待新的内存块空闲出来才可以创建新的Batch
  KafkaSelectorSelector#send()
  放入已连接集合中connected
  此时closed,当前客户端要关闭掉,此时就必须立马把内存缓冲的Batch都发送出去
  处理接收到的响应handleCompletedReceives
  调用callback的onComplete
  Sender线程
  backingOff,是否重试且上一次发送这个batch的时间 + 重试间隔的时间,是否大于了当前时间
  !metadataUpdater.isUpdateDue(now)当前broker不能处于元数据加载的过程
  找到所有可发送的batch对应的leader partition所在的broker,放入readyNodes集合
  获取有请求的selectionKeynioSelector.select(ms)
  ByteBufferSend#writeTo
  内存复用
  遍历内存缓冲区的batches组件,获取每个partition中头一个batchRecordBatch batch = deque.peekFirst();
  从channel中读取响应KafkaChannel#read
  获取每个RecordBatch,对每个batch做响应处理Sender#completeBatch
  序列化处理keySerializervalueSerializer
  对每个Broker都创建一个ClientReqeust,包括了多个Batch,就是在这个Broker上的多个Leader Partition所对应的Batch,聚合起来组成一个ClientRequest,形成一个请求,发送到Broker上去inFlightRequestsselector.send
  ByteBuffer输出流ByteBufferOutputStream(buffer)
  添加响应到响应集合中(响应和请求配对)List<ClientResponse> responses响应中包含请求信息(ClientRequest)
  初始化连接
  是否有足够的空间创建buffer
  KafkaChannel
  加入等待响应的集合incomplete.add(batch)
  NetworkClient#poll
  如果建立完连接channel.finishConnect()
  free(Deque)
  TransportLayer  负责网络通信
  请求暂存 
  拉取元数据
  更新batch的标志位,不让其他人写入了writable = false
  出现了拆包粘包问题上次没读取完整一个响应从内从中拿到上次的NetworkReceive接着处理
   
 
 
 
 
  0 条评论
 下一页
  
   
   
   
   
  
  
  
  
  
  
  
  
 