Controller Leader 选举
2017-05-06 13:25:55 0 举报
kafka controller 选举
作者其他创作
大纲/内容
尝试进行选举(在ZK/controller中创建临时文件)
监听ZK/Controller数据变化
循环context中的partitionsBeingReassigned,依次调用controller中的重点方法initiateReassignReplicasForTopicPartition(稍后介绍),实现TopicAndPartition的AR重新指定功能
F
liveBrokerIds中包含replicaId
调用Partition状态机的triggerOnlinePartitionStateChange方法, 将OfflinePartition和NewPartition状态的TopicAndPartition设置为OnlinePartition,然后封装请求,发送到相应的Broker中(后续单独介绍)
尝试创建ZK/Controller临时文件
调用Replica状态机,设置ZK监听器
取消对ZK中/brokers/ids的监听器
添加所有Topic Partition的监控器,目录为 ZK /brokers/topics/[topic 名字]
判断自己保存的leaderId是否为本地BrokerId
从ZK/controller_epoch中读取epoch、epochZkVersion,并分别赋值给controllerContext相应元素
为Context中的partitionLeadershipInfo创建一个内存映射,但是此时里面没有值
将broker的状态设置为RunningAsController
是否成功创建
重点函数说明:onControllerFailover为新的Controller初始化函数onControllerResignation为Controller清理函数
开始选举(elect)
如果auto.leader.rebalance.enable为True,开启topic partition的leader自平衡调度器
取消对ZK中/brokers/topics/offset-test的监听器
onControllerFailover
写ZK状态
从ZK/controller中获取当前Controller的leaderId
注册ZK/admin/preferred_replica_election监控器
onControllerResignation
清空partitionState
其他任何错误,打印错误日志,此时epoch和epochZkVersion为0
leaderId设置为-1
将context中的partitionsUndergoingPreferredReplicaElection所有元素作为参数,调用controller的onPreferredReplicaElection方法,实现特定TopicAndPartition的leader自平衡
如果delete.topic.enable为True,设置ZK /admin/delete_topics监听器
调用Partition状态机,设置ZK监听器
将所有replicas的状态转换成OnlineReplica,并且调用相应的处理流程(原状态为OnlineReplica,目标状态为OnlineReplica,填充Batch对象中相应Map),然后再将partitionAndReplica-OnlineReplica添加到replicaState中
异常2:任何其他异常
删除ZK /controller临时文件
关闭TopicDeletionManager(Topic 删除线程),并清理
partitionLeadershipInfo是否包含该TopicAndPartition
将所有live broker上的所有replicas,状态设置为OnlineReplica,并发送request请求到相应的broker
partition 状态机启动(startup)
确保ZK中,/controller的父路径存在,比如/kafka/controller
取消ZK/admin/preferred_replica_election的监听器
replica 状态机启动(startup)
T
数据变更
启动deleteTopicManager
依据所有的topic名字,从ZK /brokers/topics/[topic名字]中获取所有Topic信息(主要是TopicPartition和对应的AR),依次添加到context中的partitionReplicaAssignment
异常1:ZkNodeExistsException(别的Broker已经成功创建了临时文件)
数据删除
AR中的Replica
取消ZK/admin/reassign_partitions的监听器
从ZK /admin/preferred_replica_election中获取TopicAndPartition信息,选择出真正需要自平衡的TopicAndPartition,添加到context中的partitionsUndergoingPreferredReplicaElection
从ZK /admin/delete_topics中获取需要删除的Topic名称,记为topicsQueuedForDeletion,选择出(所有Topic中的Partition的Replica包含有Down机的BrokerId、partitionsUndergoingPreferredReplicaElection中的Topic、partitionsBeingReassigned中的Topic)的并集,记为topicsIneligibleForDeletion,然后利用这两个变量,创建一个TopicDeletionManager对象,赋值给deleteTopicManager
从context的partitionReplicaAssignment中获取所有TopicAndPartition和对应的AR
打印Log
启动partition状态机
replica状态机中的brokerRequestBatch对象的newBatch,判断batch对象中的几个Map中是否有元素
利用context的partitionReplicaAssignment中所有元素,初始化ReplicaState
ZK /controller_epoch不存在,则新建ZK目录,并将epoch、epochZkVersion设置为初始值1,同时写入ZK目录
注册ZK/admin/reassign_partitions监控器
调用Partition状态机的handleStateChanges,将partitionsUndergoingPreferredReplicaElection中的所有TopicAndPartition状体设置为OnlinePartition状态,此时使用preferredReplicaPartitionLeaderSelector的Leader选择器,重新选择新的Leader
判断该TopicAndPartition的leaderId是否存活
调用Batch的sendRequest方法,将Batch对象Map中的元素组合成相应的Request请求,发送给相应的Broker
声明一个LeaderChangeListener监听
调用onControllerResignation
初始化controllerContext
写成功,将新的epoch和epochZkVersion赋值给context相应属性
ZookeeperLeaderElector
将epoch+1,然后尝试写会ZK/controller_epoch中
调用Partition状态机的initializePartitionState方法,将TopicAndTopic设置为相应的状态,添加到Partition状态机的partitionState中
从ZK /admin/reassign_partitions中获取TopicAndPartition已经重新指定的AR信息,选择出真正需要重定向AR的TopicAndPartition,添加到context中的partitionsBeingReassigned
关闭partitionStateMachine(partition状态机)
关闭replicaStateMachine(replica状态机)
循环context的partitionReplicaAssignment中所有Keys(TopicAndPartition)
关闭partitionsBeingReassigned(重新指定AR)中topic的ISR监控器,ZK路径为:/brokers/topics/offset-test/1/state
取消对ZK中/admin/delete_topics的监听器
关闭autoRebalanceScheduler(leader 自平衡)
StratUp
将replica状态机对象中的hasStarted设为True(表示状态机开始工作)
将当前的controllerContext的epoch、epochZkVersion设置为0
调用maybeTriggerPartitionReassignment函数,触发partitionsBeingReassigned中Partition的AR重新分配
将partitionAndReplica-ReplicaDeletionIneligible添加到replicaState中
启动startChannelManager(在channelManager章节详细介绍)
清空replicaState
打印相应日志
将leaderId设置为-1
启动replica状态机
调用onControllerFailover,进行初始化
设置ZK /brokers/ids监听器
调用maybeTriggerPreferredReplicaElection函数,触发partitionsUndergoingPreferredReplicaElection中topic and partition的leader自平衡
initializeControllerContext
设置ZK /brokers/topics监听器
leaderId设置为当前brokerId
根据leaderId是否为本地BrokerId,返回True/False
将当前的Broker的状态设置为RunningAsBroker
所有AR中Replica循环完成
从ZK /brokers/ids中获取所有Broker信息,赋值给context中的liveBrokersUnderlying和liveBrokerIdsUnderlying
将Partition状态机对象中的hasStarted设为True(表示状态机开始工作)
leaderId != -1
关闭controllerChannelManager(关闭channel连接、清空队列、停止request发送线程、清空brokerStateInfo)
将partitionAndReplica-OnlineReplica添加到replicaState中
将Metadata发送给当前ZK /brokers/ids中所有broker,进行缓存
从ZK /brokers/topics中获取所有Topic名字,赋值给context中的allTopics
调用controller的重点方法checkAndTriggerPartitionRebalance(稍后介绍),实现定时TopicAndPartition的Leader自平衡
将LeaderChangeListener部署到ZK中,监听ZK中的/controller路径
依据partitionReplicaAssignment的keys(topicAndPartition),从ZK /brokers/topics/[topic名字]/[partition ID]/state中获取该PartitionAndPartition的LeaderAndISR信息,然后添加到context的partitionLeadershipInfo中
收藏
0 条评论
下一页