Nacos注册中心源码
2025-03-18 15:43:54 1 举报
Nacos服务注册和服务发现源码流程图
作者其他创作
大纲/内容
从LinkedBlockingQueue队列中获取注册实例
调用sendVote方法进行投票选举
很多开源框架为了提升系统性能,都使用了这种异步操作,这些操作本身并不需要写入之后立即返回成功,直接保存数据即可返回,然后后台异步处理,对提升系统性能有很大帮助
@Bean
Notifier
创建MasterElection对象,添加选举任务
调用init方法做一些初始化工作
服务发现
调用init方法
调用list方法获取实例列表
调用addClient方法
遍历所有的投票节点,找到旧的主节点,异步发送请求获取旧主节点的数据
传入
如果不存在或者上次刷新时间未达到阈值,则调用updateServiceNow方法更新本地注册表缓存
如果实例没有被监听则结束,否则调用Notifier的addTask添加数据变更任务
有旧服务缓存,则更新,没有则添加
根据配置数据构造Properties对象
遍历所有的实例,实例没有被标记并且距离上次收到心跳的时间已经过去了30s,则调用deleteIP方法删除不健康的过时实例
根据Properties对象通过构造参数反射实例化NacosNamingService对象
当新注册的实例数量达到了批量同步数阈值【1000】时 或者距离上一次同步时间已经过了 2000 毫秒,开始批量同步
调用NamingProxy的registerService方法进行注册
调用onPut方法,将注册实例更新到内存注册表
主节点会通过心跳通知其他节点更新主节点信息,不用再投票选取主节点
遍历所有的服务实例,找到发送心跳的实例,更新收到心跳的时间
组装确认数据,调用DatagramSocket的send方法向服务端发送回执数据
调用reqAPI方法
设置投票批次,投票给自己,角色是候选人
发送数据会被Receiver的run方法接收并处理
获取集群的健康检查类型,获取对应的HealthCheckProcessor处理器,调用process方法进行实际的处理
获取实例所在的集群,再获取该集群中的所有服务实例
如果当前实例所在的集群对象不存在,则创建并初始化,初始化主要是设置健康检查任务。如果是不健康的实例则移除
遍历所有的实例,如果距离上次收到心跳的时间已经过去了15s,并且实例没有被标记,则说明可以更新实例的健康状态
遍历每个实例数据,判断是否是临时实例
初始化一个使用单个线程工作的无界队列线程池newSingleThreadExecutor
定时拉取服务端最近数据更新本地注册表缓存
设置最新的同步调度时间戳
创建一个超过半数服务节点的CountDownLatch对象
对服务对象加同步锁,调用removeInstance方法移除实例
makeLeader方法更新当前主节点和旧主节点数据
第二步,获取实例对应的Datum并判空,如果为空说明已经同步结束了,将其从同步进程集合中移除
如果是临时实例,会调用BeatReactor的addBeatInfo方法添加一个BeatTask发送心跳任务,默认延迟5s执行
如果没有选举出主节点则主节点标识为空,继续投票选举
异步执行MasterElection的run方法
调用processServiceJSON方法更新本地注册表缓存
调用deregister方法删除服务实例
如果不仅只发送心跳,则构建全量数据,压缩转换成二级制数据后异步发送给所有从节点
服务健康检查
初始化BeatReactor对象
如果实例没有被监听则返回,否则调用Notifier的addTask方法添加数据变更的任务
加可重入锁单线程处理最新的数据
等成功的返回结果达到能选举出主节点时,调用RaftPeerSet的decideLeader方法选举主节点
RaftCore
调用ConsistencyService的put方法持久化实例数据,写入内存注册表
调用DatagramSocket的receive方法接收数据
处理新增的实例和过时的实例后将结果覆盖原有的内存注册表
如果实例未被标记,并且不是健康状态,则设置为健康状态
更新实例的健康状态为不健康,发布ServiceChangeEvent事件和InstanceHeartbeatTimeoutEvent事件
从本地注册表缓存中获取对应服务的ServiceInfo
receivedBeat方法接收主节点的心跳数据
遍历所有需要同步的实例数据,如果相关的实例已经在同步进程中了则移除该实例,然后添加一个实例同步任务异步执行
在当前集群中更新新主节点的数据
获取发送心跳的实例,如果不存在,可能是服务重新启动导致基于内存的实例数据都被清空了,需要调用ServiceManager的registerInstance方进行重新注册
将数据解析成PushPacket对象,调用HostReactor的processServiceJSON方法存储或者更新本地注册表缓存
在刷新应用程序上下文并准备好Web服务之后会回调AbstractAutoServiceRegistration的onApplicationEvent方法
获取对应的服务,调用PushService的addClient方法添加客户端推送
阿里基于CP模式实现的简单的Raft协议
请求地址是/v1/ns/raft/datum/commit,会被对应服务的RaftController的onPublish方法接收并处理
服务注册
Nacos使用这种认为半数以上的节点返回成功则同步数据成功的方法来代替分布式系统中两阶段提交的同步方式
调用该类的bind方法
调用doSrvIPXT方法
初始化FailoverReactor对象,并初始化newSingleThreadScheduledExecutor线程池
初始化HostReactor对象
调用HealthCheckProcessorDelegate的process方法处理
再添加一个心跳任务,循环执行心跳发送任务
调用DistroConsistencyServiceImpl的onPut方法,将新注册实例更新到内存注册表中
如果对应的Service不存在,则调用createEmptyService方法,再调用createServiceIfAbsent方法创建并验证
初始化一个ClientBeatProcessor客户端心跳处理程序,调用HealthCheckReactor的scheduleNow方法,异步执行该程序任务
初始化NamingProxy对象
调用addInstance方法,添加服务实例
调用ArrayBlockingQueue的offer方法,将实例数据添加到有界阻塞队列
组装请求数据,调用reqAPI方法
异步执行HeartBeat的run方法
防止收到的数据与主节点发送的数据不一致的逻辑处理从节点同步数据发生在两个时机:1、数据写入时,会发送新增的数据2、主节点发送心跳时,会发送压缩过的全量数据
onPublish
获取当前服务节点,如果收到投票的批次落后于自己的批次,说明收到的是无效的投票数据,将票投给自己
@Autowired
初始化ScheduledThreadPoolExecutor线程池
否则调用submit方法进行重试
异步执行ClientBeatCheckTask的run方法进行心跳检查
删除旧数据
spring.factories
从ArrayBlockingQueue中获取元素
调用RaftStore的loadDatums方法加载持久化数据,主要是通过调用readDatum方法来实现的,再添加一个数据变更的任务
否则重置主节点过期时间,投票给收到的节点,并将自己的角色设置为跟随者,同步投票批次
调用PushService的serviceChanged方法
遍历集群中的每个节点,创建SyncTask同步任务对象,设置需要同步的实例和同步的目标服务地址
组装数据,调用onPublish方法持久化到磁盘
异步执行TaskScheduler的run方法
调用addTask方法,添加UpdateTask定时任务定时拉取服务端最新数据的定时任务
调用getRegistration方法,获取上面注入的NacosRegistration
添加一个每隔 5s 执行的SwitchRefresher添加一个每隔 一天 执行的DiskFileWriter添加一个延迟 10s 执行的故障转移备份任务
获取当前节点,如果不是主节点并且是集群模式则结束,因为只有主节点才能发送心跳
调用register方法注册实例
将服务消费者添加到集合中,之后如果检测到服务提供者出现异常,会基于 UDP 协议推送更新给服务消费者
服务集群未准备好,或者主节点过期时间还未到,则不需要选举
receivedVote方法接收投票并处理
onApplicationEvent方法监听服务变更事件
调用substractIpAddresses方法移除实例
初始化PushReceiver对象
发送异步请求删除过时的不健康实例,请求地址v1/ns/instance,请求会被InstanceController的deregister方法处理
异步执行ClientBeatProcessor的run方法
需要批量处理的数据达到阈值,则异步向主节点获取该数据的最新数据
sendVote方法发送投票请求
调用NamingProxy的sendBeat方法发送心跳
调用callServer方法发送心跳,请求地址是/v1/ns/instance/beat
调用TaskDispatcher的addTask方法
调用DatagramSocket的receive方法接收服务更新数据
decideLeader方法决定主节点
如果是新数据,则写入磁盘文件,保存到内存中
调用DelegateConsistencyServiceImpl的put方法覆盖本地注册表中的实例
RaftPeerSet
发送数据会被PushReceiver的run方法接收并处理
继承了ApplicationListener,监听了WebServerInitializedEvent事件
在当前集群中更新旧主节点的数据
收到的心跳不是主节点发送的或者是过时的心跳数据,抛异常
创建HeartBeat对象,添加心跳任务
NacosServiceRegistry
如果是永久实例,则调用RaftConsistencyServiceImpl的put方法
调用Cluster的updateIPs方法,更新内存注册表数据
超过最大重试次数则返回,否则调用DatagramSocket的send方法发送更新数据
调用AbstractApplicationContext的publishEvent方法发布事件,会被PushService的onApplicationEvent监听被处理
遍历所有的服务节点,发送异步请求将数据分发到其他节点,成功则更新同步器
添加一个延时10s执行的重试任务
主节点和发送心跳的节点不一致,则更新成收到心跳的节点
网络抖动是分布式系统不可避免的问题,所以一定要考虑到重试和补偿策略
signalPublish
获取实例所在的服务、服务名和命名空间,添加一个延时1s执行的任务,基于UDP协议推送更新给服务消费者
发布实例预注册事件,调用该类的register方法,发布实例注册事件,CAS设置启动状态
调用ServiceRegistry的register方法,这里的ServiceRegistry就是之前注入的NacosServiceRegistry
是临时实例,则调用DataStore的put方法保存实例到本地注册表
解析数据,拿到之前发送的数据,计算从发送数据到收到回执所花费的时间
之后发送心跳时就可以更新各个节点的主节点信息,不用再投票选举
检测到服务提供者异常,基于UDP协议推送更新数据
PushService
获取所属的服务,调用Service的processClientBeat方法处理客户端心跳
调用mapConsistencyService方法获取对应的持久化服务类
上次刷新时间超过阈值,则调用refreshOnly方法刷新注册表缓存
判断目标服务是否在健康的集群列表中,如果不在则忽略任务,将实例信息从同步进程中移除
异步执行BeatTask的run方法,发送心跳
初始化一个实例集合对象,保存刚才添加的实例
调用callServer方法进行注册,请求地址是/v1/ns/instance
如果当前不是主节点,因为只有主节点才能写数据,所以调用RaftProxdy的proxyPostLarge方法将注册请求转发到主节点上
获取当前服务节点,清空主节点信息和所有节点的投票信息
初始化EventDispatcher对象
遍历需要推送的客户端,即服务消费者,过滤掉僵尸节点,封装数据,调用udpPush方法,推送更新数据
组装实例数据返回
获取对应的责任线程,添加一个任务,将注册实例暂存入LinkedBlockingQueue队列
获取收到的新服务信息和本地缓存中对应的旧服务信息
NacosRegistration
CAS设置端口号且只设置一次,调用该类的start方法
调用getServiceInfo0方法从本地的注册表缓存中获取服务信息
Receiver的run方法接收客户端发送的回执信息
初始化ScheduledThreadPoolExecutor线程池,并添加一个每隔 30s 刷新服务列表的定时任务,添加一个每隔 5s 执行注册的定时任务,并都先执行一次
第一步,检查服务器列表是否为空
NacosDiscoveryAutoConfiguration
阿里基于AP模式实现的Distro协议
异步向其他服务节点发送投票请求,请求会被receivedVote方法处理
这里比较粗暴,还未开发完成,可以精细化重试策略
调用updateIpAddresses方法
调用Service的srvIPs方法获取最新的服务实例,然后调用Selector的select过滤合格的实例
有一个ApplicationContext属性和被@PostConstruct修饰的生命周期回调方法,主要是处理配置的metadata数据
调用putServiceAndInit方法将服务添加到双层Map集合中
如果需要持久化数据,则调用RaftStore的write方法,写入磁盘文件
调用updateIPs方法
DistroController
调用updateIpAddresses方法更新实例
调用Notifier的addTask方法添加数据变更的任务
将请求体解析为Instance实例对象,获取实例所在的服务,调用ServiceManager的removeInstance方法移除实例
注册请求会被InstanceController的register方法处理
InstanceController
Nacos更新内存注册表数据的方法里,为了防止读写并发冲突,大量的使用了CopyOnWrite思想,具体做法就是把原内存结构数据复制一份,操作完副本后再替换掉原来的注册表数据。Eureka防止读写并发冲突用的方法是用了注册表的多级缓存结构,只读缓存和读写缓存,各级缓存之间定时同步,客户端感知的时效性不如Nacos
调用sendBeat方法发送心跳
init生命周期回调方法初始化
异步执行UpdateTask的run方法
集群数据同步方案,等数据积攒到一定数据再同步,节省网络带宽,提高同步效率
心跳检测
如果缓存为空,则调用updateServiceNow方法更新本地注册表缓存
Nacos有两种模式的实例,一种是基于内存的临时实例,一种是基于磁盘的永久实例,分别对应CAP模式中的AP和CP
带Properties对象的构造方法会调用该类的init方法
NacosDiscoveryProperties
如果不仅只发送心跳,则同步主节点的数据
遍历每个服务集群,关联创建的服务,并调用init方法进行初始化
服务集群未准备好,或者当前节点心跳时间还未过期,则不需要发送心跳
将请求数据解析为Instance对象,设置最近收到心跳的时间点,然后调用ServiceManager的registerInstance方法进行注册
onSyncDatum方法
调用Service的onChange方法处理数据变更的行为
调用EventDispatcher的serviceChanged方法,添加一个事件,通知事件订阅方
调用scheduleUpdateIfAbsent方法
调用NamingProxy的queryList方法获取最新的服务信息
NacosAutoServiceRegistration
收到投票的节点,原则上只要不是无效的投票都会投给收到的节点,无效才投给自己
初始化DatagramSocket对象初始化ScheduledThreadPoolExecutor线程池,执行PushReceiver的任务,准备接收服务端推送的服务更新数据,并发送确认报文
第三步,发送请求同步数据,请求地址v1/ns/distro/datum,请求会被DistroController的onSyncDatum方法处理
心跳请求会被InstanceController的beat方法处理
调用beat方法进行心跳检测
如果因为网络原因或者节点原因同步失败了,会调用retrySync进行重试机制,成功则清空同步进程集合中的所有相关实例数据
保存投票信息,统计得票最多的节点,如果票数超过半数节点,则将其置为主节点,并发布主节点选举完成事件
异步执行DistroConsistencyServiceImpl的Notifier的run方法
最多等待5s,同步辅助器门闩没有打开说明半数以上的服务节点没有返回成功
如果实例所在的服务不存在,则创建
调用DataSyncer的submit方法同步实例信息
调用RaftPeerSet的makeLeader方法更新主节点
异步执行PushReceiver的run方法
异步执行HealthCheckTask的run方法进行集群服务检查
调用RaftCore的signalPublish方法
自动配置的类
临时实例会初始化一个Datum对象,添加实例数据并保存到DataStore中,后面同步需要
getServiceInfo
调用HealthCheckReactor的scheduleCheck添加定时健康检查的心跳任务,延时 5s 执行,之后周期性过 5s 执行
NacosNamingService
如果是临时实例,则调用DistroConsistencyServiceImpl的put方法
HostReactor
0 条评论
下一页