Kafka-Broker-Replica-Controller-Producer-Consumer
2023-01-12 11:27:34 0 举报
登录查看完整内容
Kafka源码解析、原理解析
作者其他创作
大纲/内容
producer
0
返回
KafkaApis.scalaApiKeys.LIST_OFFSETS => handleListOffsetRequest(request)
3
TIMEINDEX
private def processStartup(): Unit = { zkClient.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler) elect() //选主 }
1,外界如何通信内部数据如何写磁盘
app
getAssignorInstances分配的策略
Paration:Leader
有
createReplicaFetcherManager
1
ControllerEventManager
new ConsumerCoordinator
Consumer.commitAsync()
groupCoordinator.handleJoinGroup
如果read这个事件是常注册handler是多线程多线程:乱序问题
TopicCommond.scala
subscriptions.fetchablePartitions
为了更新各分区的offset的位置
font color=\"#ff9800\
4
// send any new fetches (won't resend pending fetches) fetcher.sendFetches();
MetadataCache
如果offset 是 Long 8个字节base:基地址 8个字节的long相对offset : 4个字节的int用base + 相对offset得到真正的msg的offset
Replica同步机制KafkaApis.scala、KafkaServer.scala
Kafka服务重启,内存是空的,GroupCoordinator如何初始化的数据
readFromLocalLog
如果调用了flush,一般不会调用的log.flush()offsetIndex.flush()timeIndex.flush()txnIndex.flush()调用native的fsync()channel.force(true);fsync() ,fdatasync()mmap.force()
sendResponseCallback
channel.write(buffer);只写到了内核pagecache
Consumer
GroupCoordinator组协调器
client.font color=\"#f44336\
doAutoCommitOffsetsAsync()
jvmbrokerapp
serverChannelnioSelector
请求
socketqueue
leader
delayedProducePurgatory
super(ApiKeys.JOIN_GROUP);
RequestFuture<Void> future = sendOffsetCommitRequest(offsets);
ACK=0/1/-1
1、newConnections
Leader:respones内容先返回HW:3DATA: 4返回结果后Leader的HW更新为4 (根据follower请求过来的LEO修改)Leader@HW:4
maybeAutoCommitOffsetsAsync(timer.currentTimeMs())
trySend(timer.currentTimeMs());
scalable
RequestChannelrequestQueue(带着processor)
一个broker上只有一个GroupCoordinator,会维护不同组的offset元数据,之间是隔离的持久化写入log
new ConsumerMetadata
Consumer.commitSync()
locallog
val brokerInfo = createBrokerInfoval brokerEpoch = zkClient.registerBroker(brokerInfo)
initiateJoinGroup
maybeIncrementLeaderHW处理HW
ReplicaManager
handleCompletedReceives
kafka,依赖内核kernel的脏页刷写机制LogFlushSchedulerIntervalMs = Long.MaxValue
GroupCoordinatorgroup-1的元数据
多部门多项目组多客户端并发
nextFetchOffset = lastRecord.offset() + 1;
2,responseQueuewrite 事件的注册
if (completedFetch.nextFetchOffset == position.offset)
new Fetcher<>
2
kernel
log
consumer2
sendJoinGroupRequest发送加入组的请求,并处理返回响应
进入了ReplicaManager中的同步机制
Processor
Topic1:P3
def partitionFor(groupId: String): Int = Utils.abs(groupId.hashCode) % groupMetadataTopicPartitionCount获得:offset的partition ID,根据partition id获取此分区所在leader的broker,然后将broker的EndPoint的内容返回,以便consumer去建立连接,并在broker上建立GroupCoordinator
集群多进程
fetcher.resetOffsetsIfNeeded()
ACK为-1所有副本同步
P分区:offset
consumer1
LEO
partition
producer-01
KafkaApiscase ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)
0/1
KafkaApis.scalacase ApiKeys.JOIN_GROUP => handleJoinGroupRequest(request)
queue
delayedFetchPurgatory.tryCompleteElseWatch
segment
maybeExpandIsr
Group -1
ApiKeys.OFFSET_COMMIT
broker
acceptor
KafkaServer.scalagroupCoordinator.startup(() => zkClient.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME).getOrElse(config.offsetsTopicPartitions))GROUP_METADATA_TOPIC_NAME = \"__consumer_offsets\" 公共的topic,名下有50个分区
异步提交
Topic:test 有两个分区p0、p1consumer GroupId组名为AAhashcode后为21122112%50 取模后12,也就是12号分区 所以Group组名AA的offset保存在Topic:_consumer_offset的12号分区上span style=\"font-size: inherit;\
brocker
有没有获取到数据
OFFSETINDEX (base 基地址)4B(相对offset):4B(log文件offset)
!ensureActiveGroup(waitForJoinGroup ? timer : time.timer(0L))上面建立真正的连接后,此处开始给Group Coordinator发送消息,要加入组,谁先入组,谁是leader
processFetchRequest
producer-01:生产一条数据LastOffset:3requireOffset:4LEO:4
LOG
Broker
response处理
Controller
C
KafkaApis.scalacase ApiKeys.FIND_COORDINATOR => handleFindCoordinatorRequest(request)
doSyncGroup
Topic1:P1
因为LEO发生变化所以producer-01的LEO也会变化为5
客户端
producer-02
appendRecords
newKafkaConsumer
自身分区有序性
For循环selector.completedReceives
轮询分配
根据各个分区的offset来拉取数据
case ApiKeys.SYNC_GROUP => handleSyncGroupRequest(request)
new ConsumerNetworkClient
producer-01:生产一条数据LastOffset:3requireOffset:4LEO:5
OffsetIndex
返回结果future.addListener为onSuccess
fetchFromLeader
源码入口文件:KafkaApis.scalaKafkaServer.scala
read 常注册,读一次后,mute-> 把read事件取消注册
分区数 > 节点数1.业务膨胀:预分区 方便扩缩容2.业务隔离性:AKF
updateAssignmentMetadataIfNeededMetadata处理
broker进行network read进行mute、unmute操作一条条的读取
分布式
responseData.forKeyValue等待返回结果
updateFetchPositions(timer)
Paration:Leader处理Fetch
Set<SelectionKey> readyKeys = this.nioSelector.selectedKeys();
2-1
3-3
shouldRollsegment文件形成的因素删除7天的segment文件:依据使用场景配置1,size > rollParams.maxSegmentBytes - rollParams.messagesSize当前段文件的使用量是不是大于了允许的使用量,如果大于,那就需要创建新的段文件2,(size > 0 && reachedRollMs)当前段文件有数据,空间够,但是寿命已尽3, offsetIndex.isFull 10MB4,timeIndex.isFull 10MB5,!canConvertToRelativeOffset(rollParams.maxOffsetInMessages) 不能造成 相对offset 溢出
startup()
if (coordinator != null && !font color=\"#ff9800\
值为50
FIND_COORDINATOR
joinGroupIfNeeded
pollForFetches
handleSyncGroup
commitOffsetsAsync
response的处理
request
adminZkClient.createTopic
groupCoordinator.onElection(partition.partitionId)
HW
follower2
Topic1:P2
5
组内的consumer会选举出一个Leader,来分配分区
KafkaController
相互对应的
request处理
/* setup zookeeper */ initZkClient(time)
follow2:第二次请求带着LEO:4 HW:3接收返回结果后:follow2:LEO:5HW:3
Leader生产时间线
发送给broker
apply
如何根据算法找到__consumer_offset分区的leader所在的broker信息,并返回给consumer以便去建立连接,并在broker上建立GroupCoordinator一个broker上只有一个GroupCoordinator,但是GroupCoordinator里的group的状态和元数据是隔离的一个分区对应若干个组分治(路由、索引、映射)
mmap = raf.getChannel.map(10MB)
LogManager
getBrokerMetadatas
让socket的channel在 selector上的read事件 取消掉
fetchMessages
-1
持久化写入log
1-1
ControllerEventcase classprocess()
redis单线程
connect
read 常注册
channel.setSend(send);
P.....
send发送的Request处理
onControllerFailover
createTopicWithAssignment
kafka的吞吐建立在分布式上
P1
scriptions里包含topic
assignReplicasToBrokersRackUnaware分区分配规则,可以避免热点这种分布式的痛点。该逻辑可以直接拿来使用。
Group-3
follower3
生产者 会在socket中 按顺序 把消息发过来问题是你多线程的handler会乱序,如何保证顺序?在一个socket中,取一个消息放队列,然后handler,然后取第二条消息放入队列。。。要考虑并发的问题,饿死的问题 ,隔离真实环境:一个topic 有多个生产者,且生产者在不同的物理进程,所以,会有多个连接进来且都想写一个分区,那kafka可以保障顺序吗? 不可以
V.S
partitionStates
for (TopicPartition partition : fetchablePartitions())
sendAlterIsrRequest
FindCoordinatorRequest.Builder requestBuilder = new FindCoordinatorRequest.Builder( new FindCoordinatorRequestData() .setKeyType(CoordinatorType.GROUP.id()) .setKey(font color=\"#f44336\
partitionfollower
DelayedProduce的tryComplete
通过GroupCoordinator知道要消费哪个leader分区上的数据
接受
ACK
Leader:respones内容先返回HW:3DATA: 3返回结果后Leader的HW更新为3(根据follower请求过来的LEO修改)Leader@HW:3
先跟任意一台broker建立连接;再根据算法找到真正的leader的broker,创建Group Coordinator并建立连接;然后加入Group Coordinator选出consumer的Leader,并分配策略(同组的consumer Leader会进行partition的分配,哪个consumer连接哪个partition)consumer leader将分配策略同步给Group Coordinator,以便consumer的follow去Group Coordinator上同步获取策略
啊 mute 啊 mute 啊 mute 啊
nextInLineFetch = initializeCompletedFetch(records);
协商 空间的问题
Kafka 2.8.0kafka-broker-kafkaserver
tradeoff有序性保证整体的节奏
返回的response
ConsumerCoordinator消费者协调器
将send又封装了一层
partitionleader
handleListOffsetRequestV1AndAbove(request)
nioSelector
setAssigments参数不一样而已
partition.checkEnoughReplicasReachOffset(status.requiredOffset)
多线程中顺序保障问题
为了在内核堵住socket的下一条消息
lookupCoordinator()
groupMetadataTopicPartitionCount值为50
follow2:第一次请求带着LEO:3接收返回结果后:follow2:LEO:4HW:3
如何实现ctrl+c 关闭服务的操作: Runtime.getRuntime().addShutdownHook(new Thread(\"kafka-shutdown-hook\") { override def run(): Unit = kafkaServerStartable.shutdown() })
三次握手
replicaManager = createReplicaManager(isShuttingDown)
mute静音
new SubscriptionState订阅了哪些topic
Group-2
持久化同步、异步、自动提交最终都是同步给GroupCoordinator
ControllerEventThread
sendFindCoordinatorRequest(node)
!ensureCoordinatorReady(timer)为了建立真正的连接
ApiKeys.LEADER_AND_ISR
减轻当下master无磁盘的RDB传输
commitAsync(font color=\"#4caf50\
fetchPartitionData.fetchOffset == currentFetchState.fetchOffset
自动提交
ReplicaFetcherThreadmaybeFetch()
client.sendQueue up the given request for sending. Requests can only be sent out to ready nodes
生产者有多个,且对着一个分区有序产生消息由程序控制多实例的执行顺序
RequestHandlerHelper.onLeadershipChange(font color=\"#ff9800\
FileRecords
groupCoordinatorl来处理
sendListOffsetRequest给指定的broker发送ListOffsetRequest请求,获取分区和目标时间戳
ReplicaFetcherManager
onJoinLeader(JoinGroupResponse joinResponse)consumer Leader会进行partition分配,然后向Group coordinator提交分配结果,以便follower同步分区分配的结果.setAssignments(groupAssignmentList)
replicaManager
redis我们要不要配置,或者设计实例大小
mmap.putInt(只写到了内核pagecache
request的处理
LOG的类型是FlieRecords
Leader:respones内容先返回HW:4DATA: 炼狱中delay返回结果后Leader的HW更新为5(根据follower请求过来的LEO修改)Leader@HW:5
KafkaApis.scalaApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request)
为了匹配receive中的repsone
producer-02:生产一条数据LastOffset:4requireOffset:5LEO:5
ConsumerCoordinator消费者协调器每个组有一个
// call coordinator to handle commit offset font color=\"#ff9800\
channel
updateFollowerFetchState
KafkaRequestHandler
// Increment the log end offset. updateLogEndOffset(appendInfo.lastOffset + 1)
Follower请求时间线
Topic:__consumer_offset名下50个分区
kafkaserver
follow2:第三次请求带着LEO:5 HW:3Leader的数据在炼狱中,等待返回
算法Utils.abs(groupId.hashCode) % groupMetadataTopicPartitionCount
SocketServer
selector.poll
commitSync(Duration.ofMillis(defaultApiTimeoutMs));
是在consumer.poll中处理的
RequestFuture<ClientResponse> future = client.font color=\"#f44336\
attemptRead(channel);
responseCallback
if (leaderId.isEmpty) leaderId = Some(member.memberId)谁先进来,谁就是leader
3-2
Create-Topic
#Returns true if this broker is the current controller def isActive: Boolean = activeControllerId == config.brokerId
// call replica manager to append the group message副本管理器添加组的信息 font color=\"#ff9800\
Paration:Follower发送请求:
log.info(\"Resetting offset for partition {} to position {}.\
3个Broker的__consumer_offsets的分区及leader,和副本分布的详情
readFromLog
GroupCoordinatorgroup-3的元数据
系统IO
Node node = this.client.leastLoadedNode();与某个broker建立socket连接
CompletedFetch records = completedFetches.peek();
Broker server
P0
3-1
if (!coordinator.font color=\"#ff9800\
responseCallback(produceResponseStatus)
同步提交按poll的批次提交offset
respone处理
groupManager.font color=\"#4caf50\
int numReadyKeys = select(timeout);
1-2
3,事件poll
无
Old 知识:1,输入 read,常注册2,输出 write,有数据再注册
Handler
FetchPosition position = subscriptions.position(completedFetch.partition);
真正的发送
KafkaApis
loadGroupsAndOffsets
consumer3
acquireAndEnsureOpen();只能在一个线程poll
groupCoordinator = font color=\"#4caf50\
问题: 在分布式的情况,消费者与broker如何治理/协调的?1. 需要有Coordinator协调器2. 如何在Broker上分配GroupCoordinator3. Consumer端每个组需要有leader来分配消费那个分区4. 分配策略5. offset:(公共的topic:_consumer_offset含有50个分区,所有的consumer消费的offset都存在这50个分区中)6. 消费:同步、异步、性能优化(IOThreads-workerThreads)---------------------------------------------------------------Kafka Consumer主要做的事情:1. 连接外界2. 拉取数据3. 持久化1和3步都是需要Consumer Coordinator来处理。
follower1
创建super(ApiKeys.SYNC_GROUP);同步组的builder,去请求Group coordinator
Ack:-1
ControllerContext
new NetworkClient
Consumerpoll方法
onJoinFollower()
new font color=\"#4caf50\
世界上为什么有这么多的中间件一个中间件搞定所有的事情NOkafka,mq,对数据不加工redis,kv,集合操作,如果多线程加锁排斥
Group -2
KafkaServer.startup()
!coordinator.font color=\"#f44336\
AdminUtils.assignReplicasToBrokers
GroupCoordinatorgroup-2的元数据
ApiKeys.Fetch
resetOffsetsAsync(offsetResetTimestamps);
for循环处理fetchRequestMap,发送Api.Fetch的request,接收response成功后写入completedFetches队列
KafkaServerstartup()
Group-1
收发数据
commitSync(font color=\"#ff9800\
流程与同步提交类似
segment段默认1G
分治
发送数据
2-3
makeFollowersreplicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset)
this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);只有在真正发送的时候,才会触发写事件。公共的写法!!!!!!
主从,主挂了有/没有持久化必然触发全同步
validatePositionsOnMetadataChange();
appendToLocalLog
ACK-1
2-2
0 条评论
回复 删除
下一页