zookeeper源码解析
2021-09-12 16:00:01 2 举报
AI智能生成
zookeeper进阶篇,源码,原理
作者其他创作
大纲/内容
作用
元数据存储
Dubbo:ZooKeeper作为注册中心
Kafka:分布式集群的集中式元数据存储,分布式协调和通知
Master选举
HDFS:Master选举实现HA架构
Canal、condis:分布式集群的集中式元数据存储,Master选举实现HA架构
分布式协调
Dubbo、Spring Cloud把系统拆分成很多的服务或者是子系统
ZooKeeper分布式锁
特性
顺序写
集群中只有一台机器可以写,所有机器都可以读
所有写请求都会分配一个zk集群<b><font color="#f15a23">全局的唯一递增编号,zxid</font></b>,保证各种客户端发起的写请求都是有顺序的
数据一致性
任何一台zk机器收到了写请求之后都会同步给其他机器,保证数据的强一致
你连接到任何一台zk机器看到的数据都是一致的
数据同步是用什么协议做的
ZAB协议,原子广播协议
ZAB的核心思想
主从同步机制
2PC (两阶段提交)+ 过半写机制
1,发起一个事务proposal之前,leader会分配一个全局唯一递增的事务id,zxid,通过这个可以严格保证顺序
2,leader会为每个follower创建一个队列,里面放入要发送给follower的事务proposal,这是保证了一个同步的顺序性
3,每个follower收到一个事务proposal之后,就需要立即写入本地磁盘日志中,写入成功之后就可以保证数据不会丢失了
4,然后返回一个ack给leader,然后过半follower都返回了ack,leader推送commit消息给全部follower
5,leader自己也会进行commit操作
Leader收到事务请求,转换为事务Proposal(提议)同步给所有的Follower,超过半数的Follower都说收到事务proposal了,Leader再给所有的Follower发一个Commit消息,让所有Follower提交一个事务
崩溃恢复机制
如果Leader崩溃了,要重新选举Leader保证继续运行
选举一个leader出来,然后leader等待集群中过半的follower跟他进行数据同步,只要过半follower完成数据同步,接着就退出恢复模式,可以对外提供服务了
如果一个follower跟leader完全同步了,就会加入leader的同步follower列表中去,然后过半follower都同步完毕了,就可以对外继续提供服务了
过半机器选举机制
剩余机器超过一半,集群宕机不超过一半的机器,就可以选举新的leader,数据同步
只要有超过一半的机器,认可你是leader,你就可以被选举为leader
最终一致性
不是强一致性,zk官方给自己的定义:<b><font color="#f15a23">顺序一致性,</font></b>他比最终一致性更好一点
1,有的follower已经commit了,但是有的follower还没有commit
2,不是说leader必须保证一条数据被全部follower都commit了才会让你读取到数据,而是过程中可能你会在不同的follower上读取到不一致的数据,但是最终一定会全部commit后一致,让你读到一致的数据的
数据一致性问题
01,Leader收到了过半的follower的ack,接着leader自己commit了,还没来得及发送commit给所有follower自己就挂了
就会<b><font color="#00a650">选举一个拥有事务id最大的机器作为leader</font></b>,他得检查事务日志,如果发现自己磁盘日志里有一个proposal,但是还没提交,说明肯定是之前的leader没来得及发送commit就挂了
02,leader可能会自己收到了一个请求,结果没来得及发送proposal给所有follower之前就宕机了
老leader自己磁盘日志里有一个事务proposal,他启动之后跟新leader进行同步,发现这个事务proposal其实是不应该存在的,就直接丢弃掉就可以了
zxid
64位的,高32位是leader的epoch,是leader的版本;低32位才是自增长的zxid
新leader选举出来,epoch会自增长一位
老leader恢复了连接到集群是follower了,此时发现自己比新leader多出来一条proposal,但是自己的epoch比新leader的epoch低了,所以就会丢弃掉这条数据
高性能
每台zk机器都在<b><font color="#f15a23">内存</font></b>维护数据,所以zk集群绝对是高并发高性能的
如果你让zk部署在高配置物理机上,一个3台机器的zk集群抗下每秒几万请求没有问题
高可用
集群中挂掉不超过一半的机器,都能保证可用,数据不会丢失
高并发
基于纯内存数据结构来处理,并发能力是很高的、
只有一台机器进行写,但是高配置的物理机,比如16核32G,写入几万QPS
所有机器都可以读,3台机器的话,起码可以支撑十几万QPS
三种角色的机器
Leader
只有Leader是可以写的
客户端可以随便连接leader或者follower,如果客户端连接到follower,follower会把写请求转发给leader
Follower
Follower是只能同步数据和提供数据的读取
Leader挂了,Follower可以继续选举出来Leader
Observer
Observer也只能读但是Observer不参与选举
zk是适合写少的场景
大量的服务的上线、注册、心跳的压力,达到了每秒几万,甚至上十万,zk的单个leader写入是扛不住那么大的压力的
提供读服务,可以无限的扩展机器
读可以有每秒几万QPS
配置:peerType=observer
所有机器的配置文件,都要加入一个server.4=zk04:2888:3888:observer
核心机制
客户端与ZooKeeper之间的长连接
客户端就会跟zk建立连接,是TCP长连接
建立了一个会话,就是session,可以通过心跳感知到会话是否存在
sessionTimeout,意思就是如果连接断开了,只要客户端在指定时间内重新连接zk一台机器,就能继续保持session,否则session就超时了
数据模型
树形结构的znode,里面可以写入值,就这数据模型,都在zk内存里存放
<p class="MsoNormal"><span style="mso-spacerun:'yes';font-family:等线;mso-bidi-font-family:'Times New Roman';<br>font-size:10.5000pt;mso-font-kerning:1.0000pt;"><font face="等线">持久节点</font></span></p>
哪怕客户端断开连接,也一直存在
临时节点
客户端断开连接,节点就没了
顺序节点
创建节点的时候自增加全局递增的序号
案例应用
临时顺序节
分布式锁
<p class="MsoNormal"><span style="mso-spacerun:'yes';font-family:等线;mso-bidi-font-family:'Times New Roman';<br>font-size:10.5000pt;mso-font-kerning:1.0000pt;"><font face="等线">加锁的时候,是创建一个临时顺序节点</font></span></p>
zk会自动给你的临时节点加上一个后缀,全局递增的,编号
如果你客户端断开连接了,就自动销毁这个你加的锁,此时人家会感知到,就会尝试去加锁
持久节点
元数据存储
临时节点
分布式协调和通知(微服务)
Watch监听
客户端可以对znode进行Watcher监听
znode改变的时候回调通知你的这个客户端
在分布式系统的协调中应用
分布式系统的协调需求
分布式架构中的系统A监听一个数据的变化,如果分布式架构中的系统B更新了那个数据/节点,zk反过来通知系统A这个数据的变化
环境配置
8核16G,16核32G,高配置虚拟机最好了,SSD固态硬盘
3台机器,1个leader,2个follower,<br><b><font color="#ff00ff">leader主要是写,每秒抗几万并发写入是可以的;leader+follower,读,每秒抗个5万~10万的读是没有问题的</font></b>
机器如果有16G的内存,堆内存可以分配个10G,栈内存可以分配每个线程的栈是1MB,Metaspace区域可以分配个512MB都可以
设置垃圾回收器
新生代+老年代,ParNew+CMS,<br>如果是大内存机器,不建议这个组合了,就用G1回收所有的垃圾对象,还得设置一些G1的参数,region的大小,预期的每次GC的停顿时间是多少毫秒,比如100ms
参数
tickTime:zk里的最小时间单位,2000毫秒,2s
其他的一些参数就会以这个tickTime为基准来进行设置,比如有的参数就是tickTime * 2
dataDir:主要是放zk里的数据快照,剖析zk的源码的时候
<font color="#f15a23">内存里有一份快照,在磁盘里其实也会有一份数据的快照</font>,zk停机了,重启,才能恢复之前的数据
dataLogDir:写数据,2PC,proposal(事务),每台机器都会写入一个本地磁盘的事务日志,主要是放一些日志数据
SSD固态硬盘,读写速度非常快,dataLogDir,<font color="#f15a23">事务日志磁盘写,是对zk的写性能和写并发的影响是很大的</font>
initLimit
zk集群启动的时候,默认值10,10 * tickTime,20s
leader在启动之后会等待follower跟自己建立连接以及同步数据,最长等待时间是20s
zk里存储的数据量比较大了,follower同步数据需要的时间比较长,此时可以调大这个参数
syncLimit
默认值5,5 * tickTime,10s
leader跟follower之间会进行心跳,如果超过10s没有心跳,leader就把这个follower给踢出去了,认为他已经死掉了
数据快照
一份是在磁盘上的事务日志,一份是在内存里的数据结构,<br>理论上两份数据是一致的,即使是有follower宕机,也是内存里的数据丢失了,但是磁盘上的事务日志都是存在的
有的follower没收到事务日志就宕机了,也可以在启动之后找leader去同步数据
每次执行<b><font color="#f15a23">一定的</font></b>事务之后,就会把内存里的数据快照存储到dataDir这个目录中去,作为zk当前的一个数据快照
1000个事务对应的内存数据写入到<font color="#f44336">dataDir里作为一个数据快照</font>,继续此时<font color="#9c27b0"><b>事务日志里</b></font>有1032个事务,<br>此时zk重启,他可以直接把包含1000个事务的快照直接加载到内存里来
然后把1000之后的32个事务,1001~1032的事务,在内存里回放一遍,就可以在内存里恢复出来重启之前的数据了
snapCount:100000,<font color="#f15a23">默认是10万个事务,存储一次快照</font>
10万个事务以内,不需要快照,因为直接读取事务日志,回放到内存就重建内存数据了
maxClientCnxns
一台机器上最多能启动多少个ZooKeeper客户端
有限制的,默认来说60
zk servers最多只能允许你的一台机器跟他建立60个连接
每次请求都创建一个zk客户端,跟他建立连接,进行通信,再销毁zk客户端,如果并发有很多个请求一起连接zk,此时会导致一台机器上有很多zk客户端,会被zk servers拒绝的
jute.maxbuffer
一个znode最多可以存储多少数据呢?1mb,1048575
server.1=zk01:2888:3888
3888端口,是用来在集群恢复模式的时候进行leader<b><font color="#f15a23">选举投票</font></b>的
2888的端口,是用来进行leader和follower之间进行<font color="#f15a23">数据同步和运行时通信</font>
事务日志和数据快照是如何进行定时清理的
autopurge.purgeInterval=1<br>autopurge.snapRetainCount=3
后台自动清理掉多余的事务日志文件和数据快照文件
磁盘的事务日志有没有丢失的风险
第一个阶段里,各个机器把事务日志写入磁盘,此时一般进入os cache的,没有直接进入物理磁盘上去
commit提交的时候一般默认会强制把写的事务fsync到磁盘上去
forceSync:yes
commit的时候,需要fsync到磁盘上去
但是,有可能会丢失部分os cache里没刷入磁盘的数据,如果是leader宕机
leaderServers:yes
leader是否接受客户端的连接,写请求由follower转发给leader,leader主要接受follower的转发写请求进行处理
cnxTimeout:5000
在进行leader选举的时候,各个机器会基于3888那个端口建立TCP连接,在这个过程中建立TCP连接的超时时间
命令
echo conf | nc localhost 2181
conf(查看配置)、cons(查看连接)、crst(重置客户端统计)、dump(输出会话)、envi(查看环境)、ruok(检查是否在运行)、stat(查看运行时状态)、srst(重置服务器统计)、wchs(查看watcher信息)、wchc(输出watche详细信息)、wchp(输出watcher,以znode为单位分组)、mntr(输出比stat更详细的)
开启ZooKeeper的JMX端口观察内存
-Dcom.sun.management.jmxremote.port=21811<br>-Dcom.sun.management.jmxremote.ssl=false<br>-Dcom.sun.management.jmxremote.authenticate=false
curator客户端框架
CuratorFramework client = CuratorFrameworkFactory.newClient( "localhost:2181", 5000, 3000, retryPolicy); <br>client.start();<br>
配置中心
curator的crud的操作,底层都是调用的原生zk的API
client.create()<br> .creatingParentsIfNeeded()<br> .withMode(CreateMode.PERSISTENT)<br> .forPath("/my/path", "100".getBytes());<br>
client.setData().forPath("/my/path", "110".getBytes());
client.delete().forPath("/my/path");
byte[] dataBytes = client.getData().forPath("/my/path");
监听和通知
<b><font color="#f15a23">注意:</font></b>用<font color="#31a8e0">原生的</font>zk去注册监听器的话,监听子节点或者节点自己,如果发生了对应的事件,会通知你一次,但是下一次再有事件就不会通知了,zk原生的API里,需要你每次收到事件通知之后,都需要自己重新注册watcher,但是curator就不会有这个问题
Path<br>监听节点下一级子节点的增、删、改操作<br>
<b><font color="#ff00ff">PathChildrenCache</font></b> pathChildrenCache = new PathChildrenCache(<br> client, <b><font color="#ff00ff">“/yh/config”</font></b>, true);<br> pathChildrenCache.start();
pathChildrenCache.getListenable().<font color="#f15a23">addListener(new PathChildrenCacheListener() {</font><br> public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {<br> }<br> });
cache就是把zk里的数据缓存到了你的客户端里来
针对这个缓存的数据加监听器,去观察zk里的数据的变化
Node<br>监听节点对应增、删、改操作<br>
final <b><font color="#ff00ff">NodeCache</font></b> nodeCache = new NodeCache(client, "/cluster");<br> nodeCache.start();<br>
nodeCache.getListenable().<font color="#f15a23">addListener(new NodeCacheListener() </font>{ <br> public void nodeChanged() throws Exception { <br> Stat stat = client.checkExists().forPath(<font color="#ff00ff"><b>“/yh/config”</b></font>); <br> if(stat == null) { } <br> else { nodeCache.getCurrentData(); } <br> } <br> });<br>
Tree<br>其所有的子节点操作进行监听,呈现树形目录的监听<br>
final <b><font color="#ff00ff">TreeCache</font></b> treeCache = TreeCache.newBuilder(client, <b><font color="#ff00ff">“/yh/config”</font></b>).setCacheData(true).setMaxDepth(2)<br> .build();
<b><font color="#f15a23">增加监听:</font></b><br>treeCache.getListenable().<font color="#f15a23">addListener((curatorFramework, treeCacheEvent)</font> -> {<br> //增加或修改<br> if(treeCacheEvent.getType().equals(Type.NODE_ADDED)||treeCacheEvent.getType().equals(Type.NODE_UPDATED)){<br> String value = new String(treeCacheEvent.getData().getData());<br> localCache.put(key, Optional.ofNullable(value));//加入本地缓存<br> }<br> //删除<br> if(treeCacheEvent.getType().equals(Type.NODE_REMOVED)){<br> localCache.invalidate(key);//删除本地缓存<br> }<br> });<br> // 没有开启模式作为入参的方法<br> treeCache.start();<br> }<br>
Leader选举
第一种Leader选举机制
指定的目录下<font color="#00a650">创建一个子节点,创建一个临时顺序节点</font>
获取到的子节点做一个排序,然后看看自己是不是第一个子节点
如果你发现自己不是leader,对自己上一个节点施加一个监听器
如果发现上一个节点不存在了,此时会重新再次尝试去创建一个znode,相当于是竞争成为leader
LeaderLatch leaderLatch = new LeaderLatch(client, "/leader/latch");<br> leaderLatch.start(); // 尝试竞争为leader<br> leaderLatch.await(); // 直到等待他成为leader再往后走,异步化,底层会调用norifyall唤醒这个行代码
第二种Leader选举机制
<font color="#00a650">分布式锁来竞争</font>成为leader的,如果说你获取到了锁,就说明你是leader,获取锁也是跟第一种一样,创建临时顺序节点
LeaderSelector leaderSelector = new LeaderSelector(<br> client,<br> "/leader/election",<br> new LeaderSelectorListener() {<br> public void takeLeadership(CuratorFramework curatorFramework) throws Exception {<br> System.out.println("你已经成为了leader......");<br> // 在这里干leader所有的事情,此时方法不能退出<br> Thread.sleep(Integer.MAX_VALUE);<br> }<br><br> public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {<br> System.out.println("连接状态的变化......");<br> if(connectionState.equals(ConnectionState.LOST)) {<br> throw new CancelLeadershipException();<br> }<br> }<br> });<br><br> leaderSelector.start();
源码
注册Watcher监听器
CuratorFramework
CuratorZooKeeperClient
zookeeperFactory
注册watcher监听器
建立连接
启动一个线程,网络连接监听
return new ZooKeeper(connectString, sessionTimeout, watcher, canBeReadOnly);
创建原生ZooKeeper的客户端
session过期时间
watcher,监听zk里的事件的变更,回传给curator的framework
底层的源码,就会去跟zk的一台机器建立真正的TCP长连接
之后就会进行心跳,维护一个session,发送各种请求过去给zk server
注册watcher,zk有事件变更,会通过TCP长连接,反向通知你的ZooKeeper客户端,他会回调你提供给他的watcher
Curator实现的ZK读写锁
/locks/lock/__WRITE__{很多临时顺序节点},排队等待加写锁
判断你加写锁是否成功,看一下你在/locks/lock下面的位置,如果你的写锁临时顺序节点是在/locks/lock下面的第一个
如果没有获取到锁对哪个节点进行监听
对前一个节点加监听器
/locks/lock/__READ__{很多临时顺序节点},加读锁
去找到排在最前面的写锁,如果发现排在位置=0有一个写锁,此时获取读锁就一定失败
读锁的位置排在第一个写锁的前面,就可以获取读锁
如果没有获取到读锁,监听第一个写锁
羊群效用
解决写锁羊群效用
写锁只关注自己前面的写锁
解决读锁羊群效用
所有的读锁只关注监听离自己最近的前一个写锁
集群扩容与宕机 自动感知机制
集群中加入一台机器,自动在zk中写入一个znode,临时节点,一旦节点关闭或者宕机,临时节点自动消失。由集群Master控制节点监听zk目录子节点变化,自动感知集群中节点的上线和下线
集群里的元数据存储,无非就是对znode进行crud
监听与回调,对你需要感知的znode加监听器,回调通知你
master选举,无非也都是说创建一些临时顺序节点,排在第一位的就是leade
zk的核心源码
环境
ant安装
http://ant.apache.org/bindownload.cgi
下载apache-ant-1.10.5-bin.tar.gz
配置环境变量
ANT_HOME D:\apache-ant-1.10.5<br><br>path D:\apache-ant-1.10.5\bin<br><br>classpath D:\apache-ant-1.10.5\lib
zk
https://zookeeper.apache.org/releases.html#download
下载https://archive.apache.org/dist/zookeeper/zookeeper-3.4.5/
zk集群
peer架构
每台机器都可以成为leader,可以干leader的事儿,每台机器也都可以成为follower
里面任何一个节点都可以认为是一个peer
zk的进程,就是<font color="#f15a23">QuorumPeerMain</font>进程,java命令,通过java命令启动一个jvm进程
zk是Peer-to-Peer的集群架构,里面有leader-follower的角色区分,在多个peers节点之间要选举一个Leader出来的话
quorum集群,多台机器组成了一个集群,peer,集群各个机器的角色都是一样的,能干的事儿都是一样的,他不是master-slave的架构
zk启动
如果配置了servers,就以集群模式启动
QuorumPeerMain启动
单机版启动
启动ZooKeeperServerMain类
Leader选举流程
三台机器,myid分别是0,1,2
quorum=(3/2 + 1)=2
达到了quorum的数量,此时就可以发起leader选举了
第一轮投票
myid=0的机器,投票,(0, 0),第二个0代表zxid,发送给当前集群里其他的机器
收到的票是(1,0),也就是myid=1的机器的投票
myid=1的机器,投票,(1, 0),发送给当前集群里其他的机器
收到的票是(0,0),也就是myid=0的机器的投票
优先是zxid最大的机器成为leader
默认就是让myid最大的机器成为leader,推荐(1,0)成为leader
第二轮投票
myid=0的机器,投出去(1,0),收到的票是(1,0)
票数已经达到了集群的quorum大多数了
选举就结束,确定(1,0)机器是leader
myid=0的机器,follower<br>myid=1的机器,leader
myid=2的机器,刚刚才启动,此时发现集群里已经选举出来leader了,此时自己让自己变成follower就可以了
一般来说如果你自己部署在windows上搞3台虚拟机,部署zk集群,一台一台接着启动,第二台机器往往是leader,第一台和第三台是follower,第二台是leader
myid=2的机器,(2,10)
zk里的序列化的协议是<font color="#ff00ff"><b>jute</b></font>
类似于im系统里用的protobuf
序列化,你得把类转化为字节数组/字节流,通过网络传输类对象的字节流
反序列化,到了对方收到自己流之后,就把字节转换为一个类的对象
Follower在完成连接建立之后是如何向Leader进行注册的
leanrner里面的jute序列化(serialize方法)和字节流输出的一个过程
封装一个learnerInfo对象放入QuorumPacket进行序列化为字节流发送给leader
Leader是如何处理Follower的注册请求<br>
从jute输入流读取数据
type,zxid
反序列化成对象,保存一些数据
如果follower刚跟leader连接后,会跟leader进行数据同步
数据同步完后,会接收follower的ack
后期leader会开一个线程不停的发送数据给follower
Session会话
建立连接主要说的是TCP物理连接,他其实还需要进行一些通信,建立一个Session会话
服务端会收到一个ConnectRequest请求
服务端拿到ConnectRequest请求后,jute反序列化成对象
<font color="#ff0000">session是由服务端构造开启的</font>,客户端仅仅是发送ConnectRequest请求
createSession()
1,生成唯一的sessionId(64位)
二进制位运算
1,当前时间戳,左移24位,又右移8位
2,与myid的二进制左移56位进行或运算
3,sessionId++
最后一定是唯一的
2,在几个内存数据结构中放入这个session
3,对session计算它的过期时间以及特殊处理
expireTime就是session下一次的过期时间
session分桶机制<br>分桶(tickTime)过期时间管理<br>
expirationInterval=2S间隔
zoo.cfg可以配置
分成一个个的桶,每个桶的长度就是2S,后面计算2S内有哪些session需要检测是否过期
(6/expirationInterval+1)*expirationInterval
(7/expirationInterval+1)*expirationInterval相等
达到效果:不同的expireTime得到相同的tickTime
提高管理的效率,每次处理多个session
分桶数据结构<expireTime,SessionSet>
expireTime(12:05):sessionSet(多个session)
expireTime(12:10):sessionSet(多个session)
session tracker而言就是不停的过期一个一个分桶
ping<font color="#ff00ff">心跳</font>
客户端是如何定期发送<font color="#ff00ff">Ping心跳</font>到服务端
客户端clientCnxn在run的时候会计算ping的时间
sessionTime一般会自己设置
假如120S
sendPing()客户端每隔一段时间发送ping到服务端
客户端把ping请求放到“等待发送队列”
同时唤醒底层的网络通信组件socket
底层selector监听writeable事件
服务端是如何接收和处理<font color="#ff00ff">Ping心跳</font>请求
底层selector监听readable事件
读出请求数据
1,任何的请求都会submitRequest()
2,都会touch一下session,更新他的expireTime,重新分桶
3,会交给RequestProcessor线程来进行处理
同步数据
follower写多少条数据到事务日志文件之后,会执行flush
1,返回ack给leader,会尽快的用while循环,把积压在内存队列里排队的proposal的请求全部都尽快的写入到磁盘上的事务日志里去
一旦将队列里积压的proposal都写入事务日志了,此时就可以执行flush了
2,如果你连续写入1000条数到事务日志里去,此时也会强制性执行flush以及后续的操作<br>
follower要写入1000条事务日志之后,才能进行flush以及后续的处理
定期清理磁盘上不需要使用的文件
1~10000条数据,在这个事务日志文件01里
同时写一份数据快照,包含了1~10000条事务
10001~20000条数据,在这个事务日志文件02里
同时写一份数据快照,包含了1~20000条事务
这个快照就可以恢复数据
删除包含了1~10000条事务的数据快照
删除包含了1~10000条事务的日志
删除包含了10001~20000条事务的日志
crud操作
DataTree.java
对于zk来说,核心的数据结构,并不是文件目录树的结构
而是map接口,他所有操作都是针对一个path
所有watcher都是监听dataTree
创建节点
create [-s] [-e] path data acl s表示临时 e表示序号递增
client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/demo01/world");
查询节点
ls path [watch] 后面可以跟watch监听某个节点
byte[] bytes = client.getData().forPath("/demo01/world");
获取路径数据的同时,可以对这个路径加一个监听器
childWatcher可以对这个znode的子节点变化进行监听
修改节点
set path data [version]
client.setData().forPath("/demo01/world","您好啊".getBytes());
删除节点
delete path [version]
client.delete().forPath("/hello5");
递归删除
rmr path
exists
判断当前节点是否存在
watch机制
//设置节点cache<br> TreeCache treeCache = new TreeCache(client,"/");
//设置监听器 <br>treeCache.getListenable().addListener(new TreeCacheListener() {<br> public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {<br> ChildData data = event.getData();<br> if(data!=null){<br> //switch 判断各种可能性<br> switch (event.getType()){<br> case NODE_ADDED:<br> System.out.println("节点新增:路径:"+data.getPath()+"数据:"+new String(data.getData()));<br> break;<br> case NODE_REMOVED:<br> System.out.println("节点删除,路径:"+data.getPath()+"数据:"+new String(data.getData()));<br> break;<br> case NODE_UPDATED:<br> System.out.println("节点修改,路径:"+data.getPath()+"数据:"+new String(data.getData()));<br> break;<br> }<br> }<br>
如果请求发到follower,如何转发给leader
首先由nioServerCnxnFactory收到请求
进入FollowerRequestProcessor
写请求以字节流发送到leader
顺序节点是如何来处理的
后面会拼上一串递增的10位的数字
/kafka/brokers/brokers-0000000001
/kafka/brokers/brokers-0000000002
临时节点,如果你的客户端宕机了,临时节点会自动被删除掉
ephemeral,会全部删除掉
并且一定会触发相对应的watch监听器
proposal的zxid是如何创建
先检查当前session是否过期
每次创建节点,都会更新父节点的cversion值
只有leader才可以生成zxid
全局性的生成唯一的zxid,内部加了synchronized锁,,会不断的++
创建监听器
zookeeper最最核心的一块机制,监听和回调机制
对znode文件目录树的数据结构进行增删改查的操作,临时节点,顺序节点,最多只能实现让一些其他的分布式系统、大数据系统可以基于zk进行集群元数据的存储和管理
也需要其他的很多功能,协调、选举、高可用自动切换<br>
提供监听和回调的机制,只有把这个功能给实现了,此时zookeeper才是工业级的
监听器
三种监听器
gateData时传一个watcher
getChildren时传一个watcher,对子节点加一个监听器
exists可以加一个watcher
监听是否存在
在客户端也要保存一份;在zk服务端也要保存一份
watcher会放入请求中
什么时候在客户端完成注册
ZKWatchManager.java
对每一个路径,都会有一系列的监听器
getData,getChildren,exists请求先发送出去
知道这些请求成功完成了,返回响应,此时调用了finishPacket方法,才会进行监听器的注册
只有保证从服务端成功请求之后,<b><font color="#ff00ff">服务端表示已经注册监听器了,然后客户端再进行注册。</font></b>
调用zookeeper.java的register注册方法
zk服务端处理查询请求中的watch标识
watcher会放入请求中发送给服务端
对于服务端而言,跟每个客户端的连接,都是一个nioServercnxn
NIOServerCnxn实现了watcher接口
gateData()的时候
dataWatches.addWatch(path,watcher);
watcher==客户端的连接
path->对应多个wathcer
wathcer(客户端连接)->对应多个path
如果说一旦客户端的session断开,zk服务端要进行一些清理,1,删除临时节点,2,以及删除客户端注册的一些监听器
增删改事件发生的时候触发监听器
DataTree.java
内存里znode树出现变化,执行commit之后,会触发一些watcher监听器
比如对一个节点加了监听器,增删改的时候
先增删改,然后触发回调
触发当前节点监听器
dataWatches.triggerWatch()
触发子节点监听器
childWatches.triggerWatch()
假如对目录增加了一个childWatcher
触发监听器后,直接把zk服务端注册的监听器给删除掉
所以原生api里,监听器是一次性的,数据变化触发了监听器,自动会删除这个监听器
所以才用curator去连续监听,此时Curator他会在底层自动进行重新注册
服务端触发监听器的时候,回调客户端连接
childWatches.triggerWatch()以后会调用process方法
会拿到一个watcherEvent事件
里面包括,类型,状态,path
sendResponse()序列化以后,直接响应客户端
zk客户端收到watch监听通知的请求之后,如何处理
clientCnxn读取请求
收到watchedEvent事件
客户端挂掉
zk服务端<br>
会删除掉你所有的这个客户端创建的临时节点
2PC的模式去进行删除
删除了之后到commit的时候,删除了内存里的节点,一定会触发对应的监听器的
客户端加在服务端上的这些监听器都给干掉
zk服务端挂掉<br>
客户端
客户端必须去找其他的机器,leader,follower,去建立长连接
建立会话,重新把自己内存里注册的那些监听器,在新建立连接的机器上去进行一个重新的注册
服务端
三种宕机
1,follower挂掉
2,leader挂掉
3,版本升级
客户端如何感知服务器宕机
客户端发送请求或ping
会执行socket.write()方法,这时候会感知到服务端挂掉了,抛异常出去
doIO抛异常
doTransport()抛异常
catch会捕获到异常
cleanup
客户端主动断开网络连接,关闭socket
然后再关闭,socketChannel
对多有pendingqueue(已发送等待响应的请求)标记失败完成
对还没发送出去的请求(sendqueue)标记器失败
清空自己的监听器(watcher)
发布一个断开事件
clientCnxn通信组件的run方法,<font color="#f44336"><b>进行重新连接</b></font>
startconnect()
连接集群内其他的zkserver。连接下一台
连接成功后,重新进行监听器的注册
一旦默认监听器感知到说建立了新的连接,此时要重新去施加所有的监听器
follower宕机,leader是如何感知
follower是主动跟leader建立网络连接的
proposalRequestProcessor同步数据时(LearnerHandler线程),是可以感知到的
抛异常后捕获,socket关闭
挂掉一个follower,对leader的2PC过半写机制有影响吗
majority(超过集群节点个数的一半以上)
集群本身没有影响,还能用,读请求没有影响
但是新的增删改有影响,因为返回不了过半的ack,不会进行第二阶段的commit
所以要尽快的恢复follower,然后同步数据,返回ack
重启挂掉的follower
会跟leader进行数据同步,syncWithLeader()
LearnerHandler会发很多的数据,同步给follower
follower会收到一些proposal,然后返回ack
Leader宕机,各个follower会如何感知
连接在这台机器的客户端短时间内肯定会不停的重新连接
follower从leader读取数据异常,直接关掉连接
zk短时间内是灾难性的
follower感知到leader崩溃后,释放掉网络资源,执行Leaner的shutdown(),并把状态设置成looking
follower他会把所有客户端的连接全部断开,对于客户端而言的话,他会短时间内频繁报错
此时集群就需要重新进行一个leader选举
哪个follower接收到的zxid更大,或者zxid一样大,myid,两个人一定会投票给某一个人,两轮投票,结果出来
大约需要几百毫秒或者最多几秒钟就会选出来
选举出新的leader之后
follower会跟新的leader建立连接,LearnerHandler(发送请求和接受响应ack)跟他进行通信,进行数据恢复
另外,对外提供服务,所有的客户端会自动进行重连的,找到一台机器进行连接,读写请求继续执行,另外监听器重新施加
宕机的那台leader再次重启,他此时就会发现已经有leader了,此时他就是一个follower,跟leader建立长连接,然后进行数据的恢复,接着的话呢,整个集群就瞬间就可以恢复运作了
zk重启的时候,是如何加载磁盘上的数据进行恢复的
正常情况下,会把磁盘上事务日志快照重新加载到内存中
zkDataBase.java
调用loadDataBase()
他把最近的磁盘快照反序列化到内存,对快照进行回放
然后基于内存的数据与leader进行数据同步
0 条评论
下一页