ZooKeeper-Leader选举源码(3.4.14)
2022-04-07 12:33:13   15  举报             
     
         
 ZK-Leader选举源码(3.4.14)@@@
    作者其他创作
 大纲/内容
   sendqueue.offer(notmsg);
  获取选票
  整个zookeeper选举底层可以分为选举应用层和消息传输层,应用层有自己的队列统一接收和发送选票,传输层也设计了自己的队列,但 是按发送的机器分了队列,避免给每台机器发送消息时相互影响,比如某台机器如果出问题发送不成功则不会影响对正常机器的消息发送。
  将上面创建的初始服务连接对象放入本服务器节点对象
    logicalclock.set(n.electionEpoch);
  接受的选票选举周期大于自己的选举周期,这种情况可能是自己后启动加入集群选举,或者是网络中断恢复后加入集群,其他机器已经选举过好几轮流,所以需要更新自己的选举周期到最新
  while 循环接收啦leader同步的数据
     DataNode.deserialize
  设置选举类型,默认是3(electionAlg = 3;)
  LOOKING选举状态
  WorkerSender线程
  quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
  一般不可能发生
  bootstrap.getPipeline().addLast(\"servercnxnfactory\
   long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);
  启动发送选票线程
  同步Leader数据
  runFromConfig(config);
  从内存中获取选票中最大的zxid的数据
  选票PK因为自己的选票周期落后了,可能是刚加入集群选举,所以是拿收到的选票跟投给自己的选票做PK
  获取leaderserver
   recvqueue = new LinkedBlockingQueue<Notification>();
  解析到配置文件加载到内存
  初始化Leader管理器
  注册自己到leader
  sid3<--->RecvWorker
  RecvWorker消息接收线程
     while (this.isRunning()) {                    readPacket(qp);                    processPacket(qp);                }
  接收fllower数据并开启显线程处理
      closeSocket(sock);
  机器1
  //取出发送选票的队列ArrayBlockingQueue<ByteBuffer> bq = queueSendMap                                .get(sid);//发送选票send(b);
  启动集群选举leader线程
   ss = new ServerSocket();ss.bind(addr);Socket client = ss.accept();
   queue.add(buffer);
    this.electionAlg = createElectionAlgorithm(electionType);
  注册JMX
  while重传输层接收队列取出选票
   sock.close();
  如果发送选票方是选举状态并行发送周期小于自己则把自己PK出來的選票回发给发送选票方送
    startLeaderElection(); 
  myid=1的机器,投出去(1,0),收到的票是(2,0),将收到的票跟自己投出去的票对比,优先选择zxid大的为leader,zxdi大的机器包含的数据是最新的,如果zxid一样,默认选myid大的为leader,推荐(2,0)成为leader 
  最终将接收到的选票放入recvQueue队列中异步处理
  绑定业务处理CnxnChannelHandler
  机器2
   sendNotifications();
  返回选出的Leader并设置到自己的节点的currectVote属性里
  实现类
  queueSendMap发送队列每台机器对应一个发送队列
  sid1<--->SenderWorker
  QuorumCnxManager.Listener.run()
   getInitLastLoggedZxid()
  是
  给发送选票sid这台机器创建一个选票发送器,在接收到选票的时候就给发选票机器建立一个选票发送器线程供将来发选票使用,并启动发送线程
    config.parse(args[0]);
  启动选举监听
  QuorumServer leaderServer = findLeader();          
  while循环发送选票
  false自己胜
     if (sid > this.mySid) 
  SendWorker   sw.start();
  FOLLOWING
  switch (n.state) 
  第一轮投票 (2,0)选票胜
   BinaryInputArchive.readRecord
    ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory();
  leader.lead();
   quorumPeer.start();
  给所有其他参与投票的节点发送选票到应用层发送队列总
  发送
  构建leader处理请求
  while
  在上一步选举leader之后再看下是否还有新选票加入,如果有,还需要在做下选举的PK,如果新选票胜则需要重新选举
   ManagedUtil.registerLog4jMBeans();
  初始化集群选举leader相关对象数据
  入队
  recvqueue.offer(n);
    self.setPeerState((proposedLeader == self.getId()) ?                                        ServerState.LEADING: learningState());
  myid=1
  myid=3
  myid=2的机器,投出去(2,0),收到的票是(1,0),还是推荐(2,0)成为leader 
  初始化Netty线程组BossGroup和WorkGroup
  选举核心逻辑
  QuorumPeerMain
  NettyServerCnxnFactory
  SendWorker.run
  如果发送选票方是选举状态则把本机认为的leader选票回发给发送选票方
   connectionExecutor.execute(                    new QuorumConnectionReceiverThread(sock));
  初始化服务连接器对象zk默认是NIO,官方推荐Netty
     if (sid == this.mySid) 
  Socket(BIO)
  投票机器超过半数
     connectOne(sid);
   makeFollower(logFactory)
  WorkerReceiver线程
  将接收的选票放入选票箱
   zk.loadData();
  recvQueue
  LeaderZooKeeperServer.setupRequestProcessors
  绑定启动端口
  发送选票选自己
  同步数据到从节点
  第一次启动时肯定是没有选票,这时候会根需要发送选票的机器连接链接建立socket链接
  选举应用层(机器1)
  sid2<--->RecvWorker
  leader根据所有fllower定时发送ping请求保持长连接
  myid=1的机器,投出去(2,0),收到的票是(2,0),投一台机器的票数已经超过集群的半数,此时选举就结束了,确定(2,0)机器是leader
      cnxnFactory.start();   
  RecvWorker.run
  QuorumPeer.run()
    setLeader(makeLeader(logFactory));
  if(n == null)
   if((ackstate == QuorumPeer.ServerState.LOOKING)                                        && (n.electionEpoch < logicalclock.get()))
  while循环接受选票
   connectionThreadCnt.incrementAndGet();
     if (sid != this.mySid) 
  启动快速选举算法相关线程
  leader选举多层队列架构
  t.start();WorkerSender.run
   quorumPeer.setCnxnFactory(cnxnFactory);
     DataOutputStream dout = null;        DataInputStream din = null;  dout = new DataOutputStream(sock.getOutputStream());            dout.writeLong(this.mySid);            dout.flush();            din = new DataInputStream(                    new BufferedInputStream(sock.getInputStream()));
  启动netty服务
   while ((self.getPeerState() == ServerState.LOOKING) &&                    (!stop))
  设置内存数据库对象(ZKDatabase)
   receiveConnectionAsync(client);
  接收的选票周期等于自己,意味着大家一直在参与选举,那么在选举PK的时需要拿收到的选票跟之前自己投的选票做PK
  这种状态一般是已经选出leader的集群有新机器加入了,新机器处理LOOKING状态会先选举投票给自己,其他机器收到后会发已选出集群leader选举给新新机器,这个选票的发送方式状态就是FOLLOWING或LEADING
  过半数选举leader逻辑
  sendqueue
   recvQueue.add(msg);
  当前节点是选举状态不断从应用层队列里面拿选票做选举
   sendqueue = new LinkedBlockingQueue<ToSend>();
  接收
  main()
  处理连接信息
  recvqueue
  myid=3的机器,启动时发现集群已经选举出来leader了,此时会让自己变成follower
    Vote current = self.getCurrentVote();
    follower.followLeader();
  startZkServer();
  选举应用层(机器2)
    manager.connectAll();
  Main loop根据当前节点的状态做对应的业务处理
   quorumPeer.setElectionType(config.getElectionAlg());
  清理快照任务
   setPeerState(ServerState.LOOKING);
  选举周期加1
  初始化选票(自己)
  QuorumConnectionReqThread.run
  获取集群几点对象
  在选举端口监听连接选举使用普通的Socket通信(BIO)
  客户端执行命令行
  读取发送选票机器的sid
  给所有其他参与投票的节点发送选票到应用层发送队列
  否判断选票的状态,这个状态是发送到选票方的状态
  RecvWorkerrw.start();
  如果选票接收器接收的是自己放入自己的选票队列
  t.start();WorkerReceiver.run
  connectOne(sid);
  n.electionEpoch > logicalclock.get()
  sid3<--->SenderWorker
  create  /guanbo666
  receiveConnection(sock);
  第二轮投票 (2,0)选票胜超过半数节点的2号机器成为leader
  QuorumMaj.containsQuorum
  return self.getQuorumVerifier().containsQuorum(set);
     if (sid < this.mySid) 
  if(ackstate == QuorumPeer.ServerState.LOOKING){
   sid = din.readLong();
  setupRequestProcessors()
  循环选票箱收到的选票,与本机的leader选票对比,如果相等,将投p机器sid加入voteset
  myid=2的机器,投出去(2,0),收到的票是(2,0),投一台机器的票数已经超过集群的半数,此时选举就结束了,确定(2,0)机器是leader
  生产选票
  super.start(); 
  senderWorkerMap消息发送线程
  listener.start();
  接收连接
  当前机器主动发起socket连接到发送选票的id较小的机器
    return (set.size() > half);
    main.initializeAndRun(args);
   NettyServerCnxnFactory.start
     process(m);
  如果leader挂了 会触触发异常
  myid=2
  接收的选票周期小于自己,意味着发送选票的机器刚加入集群选举,发起投他自己的选票,这种选票一般都是要废弃
  使用jute序列化从输入流里面拿数据,jule类似protobuf.据官方说后面会弃用jute
  sid2<--->SenderWorker
  数据转为选票
   readPacket(qp);
  启动服务节点
   sendqueue.offer(notmsg);
  清空之前的选举箱
  逻辑类型
  true接收到的选票胜
  开启选票接收的线程
    if (vote.equals(entry.getValue())){                set.add(entry.getKey());            }
  n.electionEpoch < logicalclock.get()
    loadDataBase();
  如果发送选票的机器id小于当前机器的则关闭连接,为了防止机器之间相互重复建立socket连接(双向的),zk不允许id小的机器连接id大的机器
    syncWithLeader(newEpochZxid);      
   LearnerHandler.run
  archive.startRecord(\"node\");        data = archive.readBuffer(\"data\");        acl = archive.readLong(\"acl\
  QuorumConnectionReceiverThread.run()
  如果要使用Netty通信,需要加上启动参数:-Dzookeeper.serverCnxnFactory=org.apache.zookeeper.server.NettyServerCnxnFactory
   setCurrentVote(makeLEStrategy().lookForLeader());
   recvset.clear();
  实现
  核心启动流程
  purgeMgr.start();
  quorumPeer.initialize();(SaslQuorumAuthServer、SaslQuorumAuthLearner)
  与leader建立连接,并接收leader数据同步
    recvQueue.add(msg);
  发送选票线程
  启动接收选票线程
  如果本机的leader和本机的sid一样,则自己就是leader,否则就是FOLLOWING或者OBSERVING
  初始化服务
   setFollower(makeFollower(logFactory));
  n.electionEpoch = logicalclock.get()
  LOOKING
  sid1<--->RecvWorker
  启动或leader宕机选举leader流程
  主动向leader发起socke连接
    while (running) 
  qcm = createCnxnManager();
  是本机还处于选举状态放入应用层接收队列中
  switch (getPeerState()) 
  sw.setRecv(rw);
  DataInputStream  din = new DataInputStream(                    new BufferedInputStream(sock.getInputStream()));
  给发送选票sid机器初始化一个发送选票队列放入Map
   LearnerCnxAcceptor.run
   FastLeaderElection.lookForLeader
    zk.startup();
   bootstrap.bind(localAddress);
  接收选票线程
  quorumPeer = getQuorumPeer();
  FOLLOWING andLEADING
     Socket s = ss.accept();
  LEADING
  更新自己的选举周期
  放入传输层待放入队列
     break;
  cnxAcceptor = new LearnerCnxAcceptor();            cnxAcceptor.start();
   this.messenger = new Messenger(manager);
  判断sid类型
   logicalclock.incrementAndGet();
  将选票发送器与sid对应放在map里面
  CnxnChannelHandler.channelread
  否本机不是选举状态,已经传出了leader
  if(self.getPeerState() == QuorumPeer.ServerState.LOOKING)
  加载文件数据到内存
  self.setPeerState((n.leader == self.getId()) ?                                        ServerState.LEADING: learningState());
    leaveInstance(endVote);                                return endVote;
   
 
 
 
 
  0 条评论
 下一页
  
   
   
   
   
  
  
  
  
  
  
  
  
 