committedRequests.add(request)
watcher.process(pair.event)
Leader服务端
wait()
CommitProcessor.run()
case Leader.PROPOSAL
真正提交数据到内存
processRequest
create
调用统一逻辑
zks.processTxn(request)
Request request = submittedRequests.take()
committedRequests.poll()
Leader.COMMIT
queuedRequests.add(request)
p.watchRegistration.register(err)
nextProcessor.processRequest(request)
syncWithLeader(newEpochZxid)
leader
FollowerRequestProcessor
消费
WatcherEvent event = new WatcherEvent();eventThread.queueEvent( we )
processRequest(Request si)
p.watchRegistration != null
getData()方法的watch为true会走这段逻辑
new Zookeeper()
this.snapLog.commit()
createNode
从队列取出
将watcher加入到path对应watcher的集合里面
submittedRequests.add(request)
Leader.INFORM
循环发送所有follower
channel.writeAndFlush()
LeaderZooKeeperServer.setupRequestProcessors()初始化
FollowerZooKeeperServer.setupRequestProcessors()
sock.write(p.bb)
selector.select(waitTimeOut)
获取数据包
queuedPackets.poll()
阻塞等待IO事件发生
w.process(e)
waitingEvents.add(pair);
wakeup()
for (LearnerHandler f : forwardingFollowers) {f.queuePacket(qp);}
Object event = waitingEvents.take()
sendObserverPacket(qp)
服务端接收数据
Zookeeper.getData()
outgoingQueue.getFirst();
containsQuorum(qvAckset.getAckset())
参考leader选举流程源码
FinalRequestProcessor
client
sockKey.isReadable()
processEvent(event)
queuedPackets.add(p)
watchTable.remove(path)
跟服务端建立NIO连接
p.createBB()
响应客户端
sendPacket(pp)
zk.commitProcessor.commit(p.request)
Zookeeper.create()
PrepRequestProcessor.run()
hzxid.incrementAndGet()
唤醒等待线程
finishPacket(packet)
sendThread.start()
ProposalRequestProcessor
case OpCode.getData注意关注watcher操作
hasAllQuorums()
将数据发送给服务端
watchers.add(watcher)
pRequest(request)
wakeup用户唤醒阻塞在select上的线程
PrepRequestProcessor
LearnerHandler.run()
放入到commit队列
case OpCode.create
放入回调通知队列
selector.wakeup()
初始化另外两个链条
receiveMessage(buf)
processCommitted()
syncProcessor.processRequest(request)
p.notifyAll()
存入队列
while(true)死循环获取事件
leader接收消息
调用监听回调
SyncRequestProcessor.run()
sendPackets()
submitRequest(si)
Follower节点
outgoingQueue.add(packet)
zk.startup()
是否半数以上
while (this.isRunning()) {readPacket(qp);processPacket(qp);}
AckRequestProcessor
触发客户端监听事件
唤醒
firstProcessor.processRequest(si)
sendThread.readResponse(incomingBuffer)
zks.getZKDatabase().commit()
flush(toFlush)
CommitProcessor
sockKey.isWritable()
写日志文件
EventThread.run()
zks.getLeader().propose(request)
序列化PROPOSAL消息发送出去给follower
cnxn.processMessage((ByteBuf) msg)
存放数据
节点变动会通知客户端,客户端收到会回调监听方法
数据放入队列
sendBuffer(bb)
sendPacket(qp)
eventThread.start()
Set<Watcher> watchers = watches.get(clientPath)
leader.lead()
p.addAck(sid)
这里的watcher是客户端的netty连接对象即NettyServerCnxn
ToBeAppliedRequestProcessor
LeaderRequestProcessor
toFlush.isEmpty()
startSendingPackets()
nextProcessor.processRequest(si)
CommitWorkRequest.doWork()
queuedRequests.poll()
si = queuedRequests.poll()
OpCode.create
往队列存放消息
ackSet.size() > half
收到ack过半
txnLog.commit()
finally
LeaderZooKeeperServer.setupRequestProcessors()
replyHdr.getXid() == -1
通过jute序列化后封装到ByteBuf里面
异步线程池调用
SendThread.run()
接收leader消息
给所有follower发送commit消息
这里即客户端初始化的watcher
生成事务id
构建leader请求处理链
发送数据给observer
case Leader.ACK
SyncRequestProcessor
SendAckRequestProcessor
packet.wait()
leader提交commit数据
startConnect(serverAddress)
notifyAll()
ClientCnxn.start()
sendToNextProcessor(request)
sendThread.getClientCnxnSocket().packetAdded()
从队列取出消息
CommitProcessor线程wait等待
ScheduledWorkRequest.run()
监听是一次性的
在leader选举的时候,选出leader会创建一个leader对象
有NIO事件发生
CnxnChannelHandler.channelRead()
建立连接后监听读写事件并处理
inform(p)
发送ACK消息给leader
zks.getNextZxid()
异步线程
取出数据
commit(zxid)
follower.followLeader()
this.watcher = watcher