04_Zookeeper原理及源码分析
2024-05-23 21:19:23 0 举报
登录查看完整内容
Zookeeper原理及源码分析
作者其他创作
大纲/内容
心跳sendPing()
如果sync操作发送LearnerSyncRequest到每个followersendSync()
将监听事件放入队列eventThread.queueEvent( WatchedEvent)
最后发送的消息队列lastMessageSent
读取消息体readPayload()
反序列化CreateRequest
发送投票WorkerSender
监听回调组件ZKWatchManager
处理事务请求processTxn
投票setCurrentVote()无限循环,直到find leaderFastLeaderElection.lookForLeader()
是
拉取最新投票
添加到ackSet
SendWorker
ServerCnxnFactory
断开连接
是否初始化?
修改节点状态self.setPeerState((proposedLeader == self.getId())
sockKey.isReadable()
session重连时的watcher监听恢复
收到的消息放入队列
开始投票ServerState = LOOKING
返回ACK响应SendAckRequestProcessor
3、消息写入消息广播模式(4、宕机后重新进入恢复模式,重新1、2、3)
读取ConnectRequest请求readConnectRequest()
创建znode/usr/data/xid
LeaderZooKeeperServer
构建Proposal类型Leader.PROPOSAL
3、选举leader(其实是初始化而不是start)startLeaderElection()
将请求放入队列addChangeRecord
如果事务操作
QuorumCnxManager.toSend
内部sender线程,发送packet
无限循环send方法
获取回调事件
initializeNextSession()创建第一个sessionid根据sid,左移24右移8 | sid左移56
Leader
TCP长连接建立session发送心跳(断开超过sessionTimeout就删除session彻底断开)
启动 start()sendThread.start(); eventThread.start();
创建消息发送和接收线程
解析cfg.confQuorumPeerConfig
无限循环获取socket请求数据
session05
leader socket连接组件LearnerCnxAcceptor
SendThread.run()无限循环,监听socket,收发请求
ZKDatabase
请求队列中增加ConnectRequest请求outgoingQueue.addFirst
1、封装为proposal(提案)
是否为初始化状态
NIOServer初始化,监听2181端口cnxnFactory.configure()
zk内存数据结构ZKDatabase
是否为OP_CONNECT事件
outstandingProposals
RecvWorker
Y
TCP长连接
在dataDir生成数据快照(snapCount个日志,生成一次数据快照)
requestHeader != null && !ping && !auth
队列
4、开始投票 & 根据选举后确定的角色执行对应角色的功能super.start(),启动自己while(true)
propose
触发watcher
直接调用commitprocessRequest
查看大部分节点的投票termPredicate
建立tcp长连接
是否如下操作类型?OpCode.create
commitProcessor
处理请求packetprocessPacket()
判断大小
发送给所有followerPROPOSAL请求
create请求
成为LeaderServerState = LEADING
create请求发送到follower的处理流程
SendThread数据发送线程
确定选举算法createElectionAlgorithm()目前就有一种算法有效FastLeaderElection
id比自己小?
type = ACK
循环处理队列中的请求
添加队列
请求处理链
加载恢复数据zk.loadData()
ClientCnxnSocketNIO网络管理组件
检查本地事务日志文件
syncProcessor增加事务日志
创建session管理组件SessionTrackerImpl
followLeader注册到leader
FollowerZookeeperServer
监听器管理组件ZKWatchManagerdataWatchesexistWatcheschildWatches
从outgoingQueue中获取待发送的packet
queuedRequests队列
start
创建请求GetDataRequestGetChildrenRequestExistsRequest
提交请求submitRequest
开始连接startConnect()watcher恢复以及发送connect请求到队列
持久proposalackcommit
lead()启动leader业务逻辑
读取响应readResponse
poll
尝试重连
FileTxnSnapLog磁盘日志
triggerWatch
processAck
注册到leaderregisterWithLeader(Leader.FOLLOWERINFO)
否
follower写入日志文件
推荐事务id最大的一个作为leader过半同意
通过nio连接组件连接服务端clientCnxnSocket.connect(addr);
recvQueue
增加日志append
k.isReadable()
恢复模式1、选举出leader
初始化处理链setupRequestProcessors()PrepRequestProcessor -> proposalProcessor -> CommitProcessor -> ToBeAppliedRequestProcessor -> FinalRequestProcessor
发送请求同步到所有followersendPacket()
是否过半follower都写入成功?
创建znode/usr/data/uid
创建nio channelSocketChannel sock = createSock();
nio连接组件ClientCnxnSocketNIO
CommitProcessor
返回响应
session续约touch
转发请求到commit处理组件
拿到QuorumPacket
1、写入本地日志文件状态:未提交2、发送follower队列3、收到过半的follower ack后执行commit操作
zookeeper 03(follower)
4、commit
注入
OP_ACCEPTcreateConnection
PrepRequestProcessor -> proposalProcessor -> ToBeAppliedRequestProcessor -> FinalRequestProcessor
普通socket连接connectToLeader
FollowerZooKeeperServer
对比epoch
打包packet放入队列queuePacket
watchTable.remove移除已绑定的监听器,也就是说监听器是一次性的
所有follower的proposal请求处理链条,目的是修改dataTreePrepRequestProcessor -> proposalProcessor -> FinalRequestProcessor
加入commit队列commit
设置TxnHeader
jute协议解析
pRequest2Txn()
等待leader的数据同步,对比version是否一致,判断是否需要丢弃
client 03
processEvent回调我们自己监听器实现的process
待发投票消息sendqueue
watcher监听/usr/data/uid数据变化
纯内存结构 znode tree( 文件系统结构 )
2、返回ACK
定时清理日志文件每小时清理一次:autopurge.purgeInterval=1保留3个数据快照及对应的日志文件:autopurge.snapRetainCount=3
拿到Proposal
完成finishPacket注册监听器
处理事务型请求processTxn
完成session初始化finishSessionInit
连接的zk实例发送的消息队列queueSendMap
启动serversocket等待follower连接
处理watcherEventprocess
创建session请求
session分桶管理
jute解析,读取响应sendThread.readResponse(incomingBuffer)
onConnected建立zk语义上的连接
zookeeper 01进程(leader)
zookeeper 04(observer)针对读多写少的场景,增加大量observer节点可以线性提升性能
发送和读取数据doIO
session06
PROPOSAL请求处理成功
leaderZookeeperServer
唤醒notifyAll
网络底层的socket连接并不是zk语义上的connect
查getData查子节点getChildren是否存在exists
收到投票消息recvqueue
清除无效session
唤醒
转发指令到leaderfollower不接收写操作
4、写入内存结构
注册监听器register
session03
修改dataTree
关注OP_READ和OP_WRITE事件clientCnxnSocket.enableReadWriteOnly()
发送ConnectResponse
zk client
FinalRequestProcessor
处理packet
发送响应sendBuffer
保存快照takeSnapshot()
2、启动nio server承接client请求cnxnFactory.start()
发送监听响应sendResponse
设置sessionId
收发数据clientCnxnSocket.doTransport
session01
committedRequests队列
返回响应FinalRequestProcessor
为每个follower创建handler
获取消息
删除
touchping/req
完成packet
开始监听nio连接,等待客户端接入selector.selectedKeys()
FinalRequestProcessor处理响应
PrepRequestProcessor线程
发送Leader.COMMIT到所有follower,让他们提交commit
启动发送线程
增create删delete改setData
开启ServerSocket
获取LearnerInfo
构建消息ToSend将自己的id、zxid和epoch发送出去
1、加载本地epoch数据loadDataBase()
wait()等待leader发来的commit请求
启动LeaderZooKeeperServerstartZkServer()
处理connect请求processConnectRequest
client 01
创建jute协议的leader连接输入输出流leaderIsleaderOs
sockKey.isWritable()
是否存在未commit的记录
监听回调队列LinkedBlockingQueue<Object> waitingEvents
添加到toBeApplied队列
卡住等待响应while (!packet.finished) packet.wait();
createCnxnManager()
2PC提交ZAB原子广播协议保证数据最终一致性
最近2秒
doIO
zkServer.sh
启动清理快照和日志的线程DatadirCleanupManager
提交到commit处理链
请求处理链processorChain
切换新日志文件rollLog()
创建Follower核心对象
session04
获取sessionIdsession超时时间
是否监听器回调
注册
session07
类型:临时永久顺序
加载zxid
QuorumPeer创建zookeeper的一个实例
投票归档recvset
初始化DataTree
session08
session09
节点之间的socket连接管理器QuorumCnxManager
OP_READ
创建CreateMode
firstProcessor
offer
最近8秒
这里就会发生主从不一致问题,因为某些follower的commit没有及时执行,dataTree的操作是不可见的
最近4秒
packet.wait();
创建客户端连接管理组件ClientCnxn
构建投票Vote(myid,zxid,epoch)
quorumPeer.start()
AckRequestProcessor
转发createRequest请求
1、关注OP_READ2、和selectionKey关联attach
k.isWritable()
事件处理EventThread
2、成为leader后第一件事是数据同步
内存数据库ZKDatabase处理日志磁盘快照
收到过半follower都执行PROPOSAL成功后发送COMMIT请求到所有follower
follower都有这条未commit的记录,leader就进行commit
读取各种request请求readRequest()
OS Cache(可能数据会丢失)forceSync:yes,在commit的时候强制刷磁盘
初始化和启动QuorumPeerMaininitializeAndRun()
removeclose
state.isConnected()
接收投票WorkerReceiver
FileTxnSnapLog
提交到处理链submitRequest
集群启动,在initLimit * tickTime内要求leader和follower必须完成数据同步,否则leader直接对外提供服务
仅仅是数据同步不参与选举
选票PKtotalOrderPredicate()
watchManager
集群启动选举Leader
初始化
是否事务型请求?
删除等待响应packet
创建请求CreateRequestDeleteRequestSetDataRequest
通知observer直接commitinform
移动到下一个桶
发送
添加任务到队列processRequest
无限循环
发送数据
释放锁等待等待commit指令到来,也就是说事务操作需要等到过半follower写入日志成功后才能给客户端响应wait()
处理connectRequest响应readConnectResult()
createSession()
Listener节点之间通信组件
Packet请求打包
恢复的时候将数据快照直接加载,然后回放日志文件,合成完整数据
zk.startup()createSessionTracker()startSessionTracker()
操作ZKDatabase
N
等待响应队列pendingQueue
关注OP_READenableRead()
/usr-- /data------/uid
createRequest
zookeeper 02(follower)
client 02
分桶管理目的是保证最近即将过期的桶里面保留最少的session集,高效的过期session
等待发送队列outgoingQueue
while ((self.getPeerState() == ServerState.LOOKING)
socket输出流DataOutputStream(sock.getOutputStream())
session校验checkSession
创建sessioncreateSession
获取监听器集合
submittedRequests队列
FollowerRequestProcessor
连接的zk实例id和对应的sender线程映射关系senderWorkerMap
follower和leader超过syncLimit *tickTime没有心跳,leader就剔除follower
LearnerHandler请求处理组件处理follower(learner)的请求
jute反序列化为ConnectRequest对象
发送请求心跳SendThread
集群模式启动runFromConfig()
客户端连接处理组件NIOServerCnxn
成为FollowerServerState = FOLLOWING
Leader选举算法(组件)FastLeaderElection
ToBeAppliedRequestProcessor#processRequest
outstandingChanges
加入等待响应队列
old leader恢复后成为follower后
待处理队列queuePacket
处理连接handleConnection()
SyncRequestProcessor
connect()
ZAB协议处理ProposalRequestProcessor
client创建ZooKeeper实例
followerZookeeperServer
发送投票sendNotifications()
0 条评论
回复 删除
下一页