nacos注册中心源码
2025-04-01 11:30:00 1 举报
nacos注册中心源码流程
作者其他创作
大纲/内容
toUpdateInstances = new HashSet<>(ips)ephemeralInstances或者persistentInstances = toUpdateInstances
心跳包中的key包含在本地的内存注册表中且该key在本地的内存注册表的版本大于心跳包中的版本且心跳包遍历的数据还有数据未处理完那么直接continue
同步的发起机器就是做健康检查的那台机器
distroMapper.responsible(serviceName)
service.allIPs(true)
这块代码能解决几个问题 1. 脑裂后有两个leader,恢复后旧的leader会被新选举出的leader替代 2. 其他候选者发给当前候选者返回当前候选者 3. 当前节点只能投一次票
/nacos/v1/ns/service/status
不是leader转发到leader节点
通过udp方式将服务变动推送给订阅的client从而更新客户端本地缓存的服务实例信
子流程
发布服务变化的事件
spring.nacos.discovery.register-enabled=true
GlobalExecutor.registerHeartbeat(new HeartBeat())
Notifier implements Runnable有个成员变量阻塞队列tasks
local.resetLeaderDue()重置本地节点的选举时间
更新leader信息,将remote设置为新leader,更新原有leader的节点信息(leader节点会通过心跳通知其他节点更新leader)
local.leaderDueMs -= GlobalExecutor.TICK_PERIOD_MS
null
value就是这个副本
1. listener是在这里添加进listeners的。2. listener是service
pushService.serviceChanged(service)
监听到事件
写入meta.properties
/nacos/v1/ns/raft/beat
每执行一次任务就减500ms直到心跳等待时间小于等于0去发送心跳
(DistroConsistencyServiceImpl)dataProcessor.processData(distroData)
@Bean
serviceRegistry就是NacosServiceRegistry实例
getRegistration()得到的就是NacosRegistration的实例
doSrvIpxt
DistroProtocol
getPushService().serviceChanged(service)
去请求remote节点(leader)获取需要更新的key对应的数据
不是null
InstanceController@PutMapping(\"/beat\")beat
NotifyCenter.registerSubscriber(notifier)
putServiceAndInit(service)
local.heartbeatDueMs -= GlobalExecutor.TICK_PERIOD_MS
GlobalExecutor.submitDistroNotifyTask(notifier)
拿到接口返回的数据更新本地缓存
加载持久化实例数据从cacheFile中读取
toBeUpdatedServicesQueue.take()
返回
local.resetLeaderDue()重置leader选举时间
如果实例不存在则重新注册(如网络不通导致实例在服务端被下线或者服务端重启导致临时实例丢失)
if (datums.containsKey(datumKey) && datums.get(datumKey).timestamp.get() >= timestamp && processedCount < beatDatums.size()) -> continue
/nacos/v1/ns/instance/statuses
更新服务注册表和内存注册表
/nacos/v1/ns/instance/list
startLoadTask()
onReceive(RestResult<String> result)
持久化实例执行addOrReplaceService(service)
tasks.take()取队列中数据
更新客户端本地缓存的服务实例数据这个方法和服务发现那边最终去更新本地缓存调用的是一个方法
向集群其他节点发起投票返回投票结果(选了那个节点)
isLeader()当前节点是否是leader
ApplicationListener
local.resetHeartbeatDue()
ClientBeatCheckTask
local.resetLeaderDue()local.resetHeartbeatDue()
for (JsonNode datumJson : datumList) {遍历更新最新的数据}
for (String deadKey : deadKeys) {删除本地有但是心跳包中没有的key}
transportAgent.getDatumSnapshot(each.getAddress())
raftCore.receivedBeat(JacksonUtils.toObj(json.get(\"beat\").asText()))
1. 如果当前节点不是leader则直接return
for (Object object : beatDatums) {遍历心跳包中的数据}
InstanceController@PostMappingregister
instance.setHealthy(false)
HealthCheckReactor.scheduleCheck(clientBeatCheckTask)
Nacos集群新节点启动时向其他节点拉取数据同步流程
distroProtocol.sync
key: ephemeral#servicename(一个服务对应的key就一个)value: 一个服务的所有临时实例
服务发现
loadAllDataSnapshotFromRemote(each)
2. 更新自己这台机器从集群别的节点同步过来的服务实例任务
sendVote()
ServerStatusReporter.run()
DistroProtocol.onReceive(distroHttpData)
BeatTask
PersistentNotifier#onEvent(ValueChangeEvent event)
根据是否临时实例来得到不同的key
true
spring初始化创建bean的时候执行init()
/nacos/v1/ns/instance/beat
用服务名hash后对机器数取模选择集群中的一台机器来执行任务
发送心跳
getDistroMapper().responsible(service.getName())
service.init()
while (true) {从队列中拿servicekey}
processServiceJson(result)
start()
删磁盘文件
删除key对应的数据先删RaftCore的属性中的值
RaftController @GetMapping(\"/datum\")get
请求处理完进入callback
peers.reset()
获取服务的所有临时实例
2. leader去发送心跳
遍历所有的节点将投票结果添加到ips中,记录下最多票数的节点及其票数
for (final String server : peers.allServersWithoutMySelf()) {遍历集群中除了自己的所有节点,向其他节点发送vote请求}
ServiceController@PostMapping(\"/status\")serviceStatus
2. 同步实例信息到nacos-server集群其他节点
最终实际就是拿ips去替换条原来Cluster中的临时/持久实例Set
ServiceManager@PostConstructinit()
EphemeralConsistencyService
/nacos/v1/ns/raft/vote
for (; ; ){ 循环的来处理队列中的数据 }
server
bind(event)
构造方法去开启数据同步的任务
DistroDelayTaskProcessor构造方法中会添加一个延迟的定时任务
返回的就是注册时写入Cluster的Set
距离最后心跳时间(sendBeat的时候更新)有没有超过默认的是15s -> 超过则healthy为false,此时还不踢掉
Nacos这种推送方式,对于zookeeper那种通过tcp长连接来说会节约很多资源,就算大量的节点更新呢也不会让nacos出现太多的性能瓶颈,在nacos中客户端如果接受到了udp消息会返回一个ACK,如果一定时间Nacos-Server没有收到ACK,那么还会进行重发,当超过一定重发时间后就不再重发了,虽然通过udp并不能保证能真正的送到订阅者,但是Nacos还有定时轮询作为兜底,不需要担心数据不会更新的情况。Nacos通过这两种手段,既保证了实时性,有保证了数据更新不会漏掉。
投票结果
spring容器启动创建RaftCore这个bean执行初始化方法@PostConstructRaftCore通过构造方法注入的那些bean也值得看下
// 更新本地节点为FOLLOWERlocal.state = RaftPeer.State.FOLLOWER;// 投给发起vote请求的那个远程节点local.voteFor = remote.ip;// 设置为新一轮的选举周期(atomic)-> 别的候选人也往当前节点发起vote的话,会返回第一次的投票结果local.term.set(remote.term.get());
需要更新的服务添加到队列
udp方式将服务变动通知给订阅的客户端
回调参数是keys对应的remote节点中的数据
是leader更新服务最新的副本数据到磁盘文件和内存
ServerListManager@PostConstructinit()
startDistroTask()
spring-cloud-starter-alibaba-nacos-discovery-2.2.5.RELEASE.jar里的spring.factories文件中EnableAutoConfiguration对应的NacosServiceRegistryAutoConfiguration
serviceInfoMap客户端服务实例缓存
queue.take()
HttpClient.asyncHttpPost
HttpClient.asyncHttpGet
putTask((Runnable) task)
定时获取服务端最新缓存数据并更新到客户端本地缓存
RaftController @PostMapping(\"/beat\")beat
没注册过
非leader节点接收到leader的心跳之后重置leader选举等待时间重置心跳等待时间
1. 这个类有个init方法看下,用于设置metadata的。2.NacosRegistrationCustomizer这个可以看下Springboot那边,最后执行customizer.customize(registration),这也算是一个扩展顶吧,自定义一下registration
/nacos/v1/ns/distro/datum
PersistentConsistencyServiceDelegateImpl
发布准备注册事件InstancePreRegisteredEvent
synchronized (service) { 同一服务内的实例串行 不同服务的实例并行}
task.run()
new InnerWorker(name).start()
setTerm(NumberUtils.toLong(raftStore.loadMeta().getProperty(\"term\
发起请求那台机器的服务的checksum和目标机器(接受请求的机器)的该服务的checksum比较不一样的话就要更新服务实例
peers.makeLeader(remote)
peers.decideLeader(peer)
/nacos/v1/ns/distro/datums
@PostConstruct
PushReceiver.run()
distroHash(serviceName) % servers.size()
InstanceController@GetMapping(\"/list\")list
serviceManager.removeInstance
1. 将注册实例更新到内存注册表将副本这个临时变量本地缓存
源码精髓:很多开源框架为了提高操作性能回大量使用这种异步任务及内存队列操作,这些操作本身并不需要写入之后立即成功,用这种方式对提升操作性能有很大帮助
新节点启动后从集群其它节点一次性同步数据
接收远程节点的投票请求返回本地节点的投票信息
servers
NacosDelayTaskExecuteEngine.processTasks()
构造方法
从请求拿参数、组装回Instance
mapConsistencyService(key)
PersistentNotifier - publisher发布ValueChangeEvent事件后执行onEvent方法可以看一下配置中心,具体怎么发布接收我画了个流程
synchronized (service) {锁各自的服务,不同的服务并发执行,相同的服务串行添加实例}
将array放入数据包再用gzip压缩最终发给所有follower
DistroTaskEngineHolder
implements
子流程将实例添加到对应的service中
spring容器启动发布事件
在服务发现的时候client会将udp端口号发送给server
如果是持久实例
raftStore.delete(deleted)
DistroController@PutMapping(\"/datum\")onSyncDatum
HttpClient.asyncHttpPostLarge
getPushService().serviceChanged(this)
DistroLoadDataTask.run()
NacosAutoServiceRegistration
cp架构raft协议下写数据要往leader写
定时任务执行集群节点的心跳任务
new Service() -> 构建一个新的Service
设置注册过的标记
推送服务变动给订阅的客户端
/nacos/v1/ns/instance
/nacos/v1/ns/raft/datum/commit
RaftController@PostMapping(\"/datum/commit\")onPublish
for (Datum datum : datums.values()) {将key和对应的版本放入element中,最终添加到array}
遍历当前机器的所有服务实例和来源机器拿到的服务实例对比更新healthy
deleteDatum(deadKey)
定时任务执行集群leader选举的任务
发布注册完成事件InstanceRegisteredEvent
一次心跳最多更新50条数据
hostReactor.processServiceJson(pushPacket.data)
拿到心跳包中的远程节点peer的数据如果远程节点不是leader则直接抛错本地节点的选举周期大于远程的选举周期则抛错
NamingService可以认为就是client
添加到阻塞队列tasks中
if (local.state != RaftPeer.State.FOLLOWER) {本地节点不是FOLLOWER,则设置本地节点为FOLLOWER投票给remote节点}
service.processClientBeat(clientBeat)
datum.timestamp.set(getDatum(key).timestamp.incrementAndGet())
local.resetLeaderDue()
Registration
notifier.run()
service.srvIPs(List<String> clusters)
PushReceiver
if (!(datums.containsKey(datumKey) && datums.get(datumKey).timestamp.get() = timestamp)) {}本地内存注册表包含该key或者不包含该key但是本地的版本小于心跳包中的版本则添加到batch中,后面去批量更新数据
距离最后心跳时间(sendBeat的时候更新)有没有超过默认的是30s -> 超过则踢掉
false
这个时间戳可以看成是datum的版本值越大说明版本越新
register()
UpdateTask
源码精髓:nacos这个更新注册表内存方法里,为了防止读写并发冲突,大量的运用额CopyOnWrite思想防止并发读写冲突,具体做法就是把原内存结构复制一份,操作完最后再替换回真正的注册表内存去。Eureka防止读写并发冲突用的方法是注册表的多级缓存结构,只读缓存,读写缓存,内存注册表,个及缓存之间定时同步,客户端感知的及时性不如nacos。
服务注册
每收到一次投票响应就会去进行leader的选举 半数以上选择一个节点则该节点就是leader
1. 同步写数据到磁盘文件
发起集群leader投票
是
立即开启一个任务CLientBeatProcessor更新当前发送心跳的实例的最后心跳时间
重置选举时间15到20s的随机值
/nacos/v1/ns/raft/datum
@ComponentDistroConsistencyServiceImpl
udp交互当服务变更后主动推送更新集群中别的机器的服务实例
执行任务
DistroSyncChangeTask.run()
instances是副本操作完后的新的实例列表
addTask子流程task是DistroSyncChangeTask
HeartBeat.run()
NacosDelayTaskExecuteEngine.ProcessRunnable#run
udpPush(ackEntry)
batch.add(datumKey)
load()
只订阅不注册当前服务则直接结束
removeTask(taskKey)
new DistroExecuteTaskExecuteEngine()new TaskExecuteWorker
执行更新服务的线程
源码入口
设置选举周期拿meta.properties中的选举周期值
ServiceUpdater.run()
RaftCore.init()
if (instance.isEphemeral()) { 添加一个延时执行的 定时心跳检测任务}
远程节点的选举周期小于等于本地节点
restful风格
this.serviceRegistry.register(getRegistration())
写数据请求转发给leader
/nacos/v1/ns//raft/datum
source就是leader节点
集群节点之间相互同步节点状态,如果有节点宕机了,集群其他节点会感知到并更新节点的状态,这个会影响心跳任务机器选择的计算
阿里自己实现的CP模式下的简单raft协议
UpdatedServiceProcessor.run()
将最新的副本实例数据写入磁盘文件并更新服务注册表
instance.setLastBeat(System.currentTimeMillis())
hostReactor.getServiceInfo
Service添加到服务注册表font color=\"#ff0000\
2. 再异步更新服务注册表的数据
Datum<Instances>构建对象
MasterElection.run()
instances:操作副本后的新的实例列表后面操作的都是这个副本用这个副本去替换旧的实例列表
ips就是需要去添加的实例
注册服务实例
this.executorService.execute(this)
beatDatums遍历结束了遍历receivedKeysMap如果value是0那么添加到需要删除的list中
定时执行服务健康检查任务
deleteIp(instance)
udpSocket.receive(packet)
如果缓存信息为空则调用server的接口更新最新的服务数据
putService(service)
进入回调方法根据其他节点的投票结果去进行leader的选举
AbstractAutoServiceRegistration
worker.process(task)
重置心跳时间(5s)
leader发送commit请求给集群其他节点返回ok则说明follower节点写入成功
PushService.onApplicationEvent(ServiceChangeEvent event)
调用server的实例同步接口
GlobalExecutor.registerMasterElection(new MasterElection())
(DistroDelayTaskProcessor)processor.process(task)
否每次去投票都会重置当前节点的选举时间
子流程添加实例到副本
raftStore.write(datum)
子流程临时实例
if (EnvUtil.getStandaloneMode() || local.state != RaftPeer.State.LEADER) { return; }
集群选举数据清掉
这个类的继承关系可以看下
RecordListener
调用server的服务发现接口传入的参数里有客户端的udp端口,这个是方便服务端实例有变化了通过udp方式同步给客户端
最后会更新lastRefTime为当前时间
dataProcessor.processSnapshot(distroData)
Compare and get new instance list.构建一个当前服务的实例列表的副本,根据action来操作这个副本instanceMap,最后返回的是新的实例列表拿内存注册表(DataStore->Datum)中的数据(也就是上次的操作后的新的实例列表)这里就解决了我的一个问题:第一次注册实例添加到tasks队列,异步线程还没处理完这个任务第二次注册实例如果是从服务注册表去copy出一个副本,那么就会丢失数据这里copy的是内存注册表(上次操作完后的副本 - 也就是最新数据)就解决了这个问题
初始running的值AtomicBoolean(false)
获取客户端的服务实例缓存信息
1. 服务实例集群间同步任务发起请求
每执行一次任务会减500ms直到这个随机的选举时间=0则会去开启头投票进行选举集群每台机器在真正选举前都会等待一个15到20s的随机时间谁先完成等待谁就是候选者就会去发起投票如果多个节点随机时间相同都是候选者且最终投票一样多那么就会开启下一轮的选举直到选举出leader
InstanceController@DeleteMappingderegister
发布事件删除服务注册表和内存注册表数据
// 选举周期加1local.term.incrementAndGet();// 投票给自己 local.voteFor = local.ip;// 成为候选者 -> 候选者去发起投票local.state = RaftPeer.State.CANDIDATE;
if (batch.size() < 50 && processedCount < beatDatums.size())-> 直接continue
allInstances.addAll(persistentInstances) allInstances.addAll(ephemeralInstances)
instance.setHealthy(valid)
阿里自己实现的AP模式的Distro协议
如果是临时实例
先请求来源机器拿到该服务实例信息
element.put(\"key\
利用CountDownLatchs实现了一个简单的raft协议写入数据的逻辑,必须集群半数以上节点写入成功才会给户端返回成功(具体可以看下源码怎么写的哦,还是有点意思的)
同步副本数据给集群其他节点
NacosNamingService.getAllInstances()
Nacos-server其实就是一个springboot的web应用调用server暴露出来的实例注册接口(HttpMethod.POST)
重置心跳时间
queue.put(task)
onReceive(RestResult<String> result)
遍历所有临时实例
NacosServiceRegistry
子流程持久实例
msg里存的是service的checksum
集群节点状态同步任务
extends
sendBeat()
raftStore.write(newDatum)
!this.running.get()
如果本地节点没有投过票那么就投票给自己,如果投过票了那么就返回上次的投票结果
client
延时执行的定时任务更新客户端的服务缓存数据
ServiceReporter.run()
onApplicationEvent
票数超过总节点的一半则该节点就设置为leader
deadKeys.add(entry.getKey())
同步的发起机器发送同步信息到集群中的别的机器
raftStore.updateTerm(local.term.get())
RaftController @PostMapping(\"/vote\")vote
NacosRegistration
0 条评论
下一页