Check whether the proposal can be committed<br>boolean hasCommitted = tryToCommit(p, zxid, followerAddr);<br>
qvAckset.getQuorumVerifier().containsQuorum(qvAckset.getAckset())
Check whether more than half of the voters have acknowledged<br>return (ackSet.size() > half);<br>
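The majority test in the node above can be sketched in isolation. This is a minimal illustration, not ZooKeeper's own verifier class: `MajorityQuorumSketch` and its constructor argument are hypothetical names, and `half` is assumed to be the number of voting members divided by two (integer division).

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for a majority-based quorum verifier.
// Assumption: half = votingMembers / 2, using integer division.
class MajorityQuorumSketch {
    private final int half;

    MajorityQuorumSketch(int votingMembers) {
        this.half = votingMembers / 2;
    }

    // A quorum exists only when strictly more than half of the voters acked.
    boolean containsQuorum(Set<Long> ackSet) {
        return ackSet.size() > half;
    }

    public static void main(String[] args) {
        MajorityQuorumSketch verifier = new MajorityQuorumSketch(5);
        Set<Long> acks = new HashSet<>();
        acks.add(1L);
        acks.add(2L);
        System.out.println(verifier.containsQuorum(acks)); // 2 of 5 acks
        acks.add(3L);
        System.out.println(verifier.containsQuorum(acks)); // 3 of 5 acks
    }
}
```

Because the comparison is strict, an ensemble with an even number of voters needs one more ack than exactly half.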
Quorum reached: send COMMIT to the followers<br>commit(zxid);<br>
Build a COMMIT packet<br>QuorumPacket qp = new QuorumPacket(Leader.COMMIT, zxid, null, null);<br>sendPacket(qp);<br>
<br>inform(p);
Send the packet to the Observers
Leader commits locally<br>zk.commitProcessor.commit(p.request);<br>
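The order of the steps traced so far (quorum check, COMMIT to followers, notification to Observers, local commit) can be sketched as a small model. Everything here is a hypothetical simplification: `CommitFlowSketch` only records the actions as strings, the ack count is passed in directly, and no networking or real ZooKeeper types are involved.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the leader-side commit flow; it only logs the steps.
class CommitFlowSketch {
    final List<String> log = new ArrayList<>();
    private final int half;

    CommitFlowSketch(int votingMembers) {
        this.half = votingMembers / 2; // assumption: simple majority quorum
    }

    // Returns false and does nothing while the proposal lacks a quorum of acks.
    boolean tryToCommit(long zxid, int ackCount) {
        if (ackCount <= half) {
            return false;
        }
        log.add("COMMIT to followers, zxid=" + zxid);   // commit(zxid)
        log.add("notify observers, zxid=" + zxid);      // inform(p)
        log.add("local commit, zxid=" + zxid);          // commitProcessor.commit
        return true;
    }

    public static void main(String[] args) {
        CommitFlowSketch leader = new CommitFlowSketch(3);
        System.out.println(leader.tryToCommit(1L, 1)); // one ack of three voters
        System.out.println(leader.tryToCommit(1L, 2)); // two acks of three voters
        leader.log.forEach(System.out::println);
    }
}
```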
sendToNextProcessor(pending);
Apply the write to the in-memory data<br>FinalRequestProcessor.processRequest<br>rc = zks.processTxn(request);<br>
createNode
Fire the watch events<br>dataWatches.triggerWatch(path, Event.EventType.NodeCreated);<br>childWatches.triggerWatch(parentName.equals("") ? "/" : parentName,<br> Event.EventType.NodeChildrenChanged);<br>
Watches are one-shot<br>watchers = watchTable.remove(path);<br>
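The one-shot behavior follows directly from the `remove` call: triggering a path takes its watchers out of the table, so a second event on the same path finds nothing unless the client registers again. A minimal sketch, assuming a plain map from path to watcher ids (`OneShotWatchTable` and the string ids are hypothetical, not ZooKeeper's watch manager):

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical watch table that mimics the remove-on-trigger pattern.
class OneShotWatchTable {
    private final Map<String, Set<String>> watchTable = new HashMap<>();

    void addWatch(String path, String watcherId) {
        watchTable.computeIfAbsent(path, k -> new HashSet<>()).add(watcherId);
    }

    // Removes the watchers for the path and returns them for notification.
    // After this call the path has no watchers until addWatch is called again.
    Set<String> triggerWatch(String path) {
        Set<String> watchers = watchTable.remove(path);
        return watchers == null ? Collections.emptySet() : watchers;
    }

    public static void main(String[] args) {
        OneShotWatchTable table = new OneShotWatchTable();
        table.addWatch("/app", "client-1");
        System.out.println(table.triggerWatch("/app").size()); // first event
        System.out.println(table.triggerWatch("/app").size()); // second event
    }
}
```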
case OpCode.getData
zks.getZKDatabase().getData(getDataRequest.getPath(), stat,<br> getDataRequest.getWatch() ? cnxn : null);
Register the watch<br>dataWatches.addWatch(path, watcher);<br>
After processing, reply to the client<br>cnxn.sendResponse(hdr, rsp, "response");<br>
channel.writeAndFlush(Unpooled.wrappedBuffer(sendBuffer)).addListener(onSendBufferDoneListener);