zookeeper
2023-02-17 21:59:30 4 举报
AI智能生成
zookeeper启动流程图
作者其他创作
大纲/内容
loadDataBase();<br>zookeeper加载数据文件
zkDb.loadDataBase();
cnxnFactory.start();<br>启动NIO或者Netty
bootstrap.bind(localAddress);<br>绑定地址端口
收到数据<br>CnxnChannelHandler进行处理<br>public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception<br>
LeaderZooKeeperServer.setupRequestProcessors<br>初始化责任链
LeaderRequestProcessor
PrepRequestProcessor<br>构建数据,zxid
ProposalRequestProcessor<br>将消息分发给follower
CommitProcessor
SyncRequestProcessor<br>持久化消息
AckRequestProcessor<br>
TobeAppliedRequestProcessor
FinalRequestProcessor<br>
处理数据<br>cnxn.processMessage((ByteBuf) msg);<br>
receiveMessage(buf);
zks.processPacket(this, bb);
submitRequest(si);
在LeaderRequestProcessor初始化<br>构建责任链处理<br>firstProcessor.processRequest(si);<br>
放入队列<br>submittedRequests.add(request);<br>
PrepRequestProcessor.start()
pRequest2Txn(request.type, zks.getNextZxid(), request, create2Request, true);<br>构建zxid
request.zxid = zks.getZxid();<br>nextProcessor.processRequest(request);
ProposalRequestProcessor<br>sendPacket(pp);<br>
遍历follower,发送数据 Leader.class<br>for (LearnerHandler f : forwardingFollowers) {<br> f.queuePacket(qp);<br> }<br>
将消息放入队列
写数据文件,run方法<br>syncProcessor.processRequest(request);<br>
flush(toFlush);
写文件<br>zks.getZKDatabase().commit();<br>
AckRequestProcessor<br>nextProcessor.processRequest(i);<br>
给自己发送一个ACk<br>leader.processAck(self.getId(), request.zxid, null);<br>
加入集合<br>p.addAck(sid);<br>
判断是否可以提交<br>boolean hasCommitted = tryToCommit(p, zxid, followerAddr);<br>
qvAckset.getQuorumVerifier().containsQuorum(qvAckset.getAckset()
响应是否超过半数<br>return (ackSet.size() > half);<br>
超过半数,发送给follower<br>commit(zxid);<br>
构建commit类型数据包<br>QuorumPacket qp = new QuorumPacket(Leader.COMMIT, zxid, null, null);<br>sendPacket(qp);<br>
<br>inform(p);
发送数据包给Observer
自己commit<br>zk.commitProcessor.commit(p.request);<br>
sendToNextProcessor(pending);
构建内存写数据<br>FinalRequestProcessor.processorRequest<br>rc = zks.processTxn(request);<br>
createNode
回复监听事件<br>dataWatches.triggerWatch(path, Event.EventType.NodeCreated);<br>childWatches.triggerWatch(parentName.equals("") ? "/" : parentName,<br> Event.EventType.NodeChildrenChanged);<br>
监听是一次性的<br>watchers = watchTable.remove(path);<br>
case OpCode.getData
zks.getZKDatabase().getData(getDataRequest.getPath(), stat,<br> getDataRequest.getWatch() ? cnxn : null);
把watch放进去<br>dataWatches.addWatch(path, watcher);<br>
处理完数据,回复客户端<br>cnxn.sendResponse(hdr, rsp, "response");<br>
channel.writeAndFlush(Unpooled.wrappedBuffer(sendBuffer)).addListener(onSendBufferDoneListener);
startLeaderElection();<br>初始化选举相关数据
currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());<br>生成当前节点
createElectionAlgorithm(electionType);<br>
qcm = createCnxnManager();<br>初始化选举管理
listener.start();<br>启动监听
ss = new ServerSocket();<br>创建serverSocket<br>Socket client = ss.accept();<br>socket接收连接
handleConnection(sock, din);<br>处理连接信息
sid = din.readLong();<br>读取发送选票的机器id
如果机器id小于自己,就断开连接,并发送连接<br>
大于就启动<br>SendWorker<br>RecvWorker<br>
snedWorker像发送连接的sid发送选票<br>
RecvWorker<br>将接收到选票放入recevQueue<br>
le = new FastLeaderElection(this, qcm);<br>启动快速选举线程
启动WorkerSender<br>发送选票
process(m);<br>处理选票
如果是自己的sid,就放入自己recvQueue<br>
addToSendQueue(bq, b);<br>将选票放入发送线程
connectOne(sid);<br>进行连接
启动WorkerReceiver<br>接收选票
self.getPeerState() == QuorumPeer.ServerState.LOOKING<br>如果当前自己还是Looking状态<br>
是
已经选举出了leader<br>
super.start();
case LOOKING:
logicalclock.incrementAndGet();<br>逻辑时钟+1<br>
updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());<br>初始化选票自己<br>
sendNotifications();<br>将选票发送出去
Notification n = recvqueue.poll(notTimeout,<br> TimeUnit.MILLISECONDS);<br>从别人那里获取读取<br>
如果为null创建连接
不为null<br>
判断接收到的状态<br>
也是looking的时候就进行选票PK<br>
将胜出的选票再次发送<br>
termPredicate(recvset,<br> new Vote(proposedLeader, proposedZxid,<br> logicalclock.get(), proposedEpoch))<br>判断是否超过半数
更新自己状态
case FOLLOWING:
follower.followLeader();
connectToLeader(leaderServer.addr, leaderServer.hostname);
建立Socket连接<br>sockConnect(sock, addr, Math.min(self.tickTime * self.syncLimit, remainingInitLimitTime));<br>
syncWithLeader(newEpochZxid);
zk.startup();
创建责任链<br>setupRequestProcessors();<br>
SyncRequestProcessor
SendAckRequestProcessor<br>
FollowerRequestProcessor
死循环接收数据<br>QuorumPacket qp = new QuorumPacket();<br> while (this.isRunning()) {<br> readPacket(qp);<br> processPacket(qp);<br> }<br>
case Leader.PROPOSAL:<br>fzk.logRequest(hdr, txn);<br>
写入本机<br>syncProcessor.processRequest(request);<br>
写回去<br>SendAckRequestProcessor.processRequest<br>
case LEADING:
makeLeader(logFactory)<br>创建leader监听<br>
leader.lead();<br>
cnxAcceptor = new LearnerCnxAcceptor();<br> cnxAcceptor.start();<br>
接收事件<br>s = ss.accept();<br>
LearnerHandler.start();<br>
startSendingPackets();<br>
将消息发送出去<br>oa.writeRecord(p, "packet");<br>
接收消息,并且进行反序列化<br>qp = new QuorumPacket();<br>ia.readRecord(qp, "packet");<br>
case Leader.ACK:<br>leader.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress());<br>
startZkServer();<br>
for (LearnerHandler f : getLearners()) {<br> // Synced set is used to check we have a supporting quorum, so only<br> // PARTICIPANT, not OBSERVER, learners should be used<br> if (f.synced() && f.getLearnerType() == LearnerType.PARTICIPANT) {<br> syncedSet.add(f.getSid());<br> }<br> f.ping();<br> }<br>一直循环pingFollwer
0 条评论
下一页