nacos源码剖析-服务注册与发现
2024-03-22 08:53:27 0 举报
nacos的源码解析
作者其他创作
大纲/内容
循环从阻塞队列tasks里拿实例数据处理
DistroLoadDataTask.run()
bind(event)
NotifyCenter.publishEvent
如果某个实例超过30秒没有收到心跳,直接剔除该实例(被剔除的实例如果恢复发送心跳则会重新注册)
NamingService.registerInstance
将注册实例信息更新到注册表内结构里去
获取客户端的服务实例缓存信息
调用server的实例注册接口(HttpMethhod.POST)
serviceManager.registerInstance
否
/instance/beat
NacosRegistration
调用server的实例同步接口(HttpMethod.PUT)
从nacos官方文档上找到的客户端服务发现的代码,这里先看下底层逻辑,实际上,服务发现实在第一次调用服务接口时根绝服务名去服务端获取的,这个参看ribbon源码讲解
阿里自己实现的CP模式的简单Raft协议
scheduleUpdateIfAbsent(serviceName,clusters)
是
创建内存注册表结构
同步写实例数据到文件
NamingProxy.getAllData(targetServer)
register()
DistroConsistencyServiceImpl.init()
this.getRegistration()就是NacosRegistration的实例
服务发现
serviceManager.removeInstance
HealthCheckReactor.scheduleCheck(clientBeatCheckTask)
instance.setLastBrat(System.currentTimeMillis())
worker.process(task)
如果实例不存在重新注册(如果网络不通导致实例在服务端被下线或服务端重启临时实例丢失)
NacosNamingService.getAllInstances()
doSrvIPXT
UpdateTask
DistroProtocol
本节点是否leader
/instance
client
调用server的服务发现接口(HttpMethod.GET)
传入的参数里面又客户端的udp端口,这个时方便服务端实例有变化了通过udp方式同步给客户端
deleteIp(instance)
调用server的实例注销接(HttpMethod.DELETE)
serverProxy.sendBeat(beatInfo)
ephemeralInstances=toUpdateInstances
NacosAutoServiceRegistration
/distro/datums
如果缓存为空,调用server接口获取最新服务数据
新节点启动后,从集群其它系欸但一次性同步数据
AbstractAutoServiceRgistration
@Bean
run方法
startDistroTask()
service.srvlPs
putService(service)
最后会更新lastRefTime为当前时间
InstanceController.list
PutServiceAndInit(service)
往阻塞队列tasks里放入注册实例数据
如果某哥实例超过15秒没有收集到心跳则将它的healthy属性置为false
allInstances.addAll(persistenInstances);allInstances.addAll(ephemerallInstances);
发布服务变化事件
handleFailedTask()
源码入口
serviceRegistry.register(this.getRegistration())
start()
Server
定时获取服务端最新服务数据并更新到本地的任务
如果同步不成功重试
InnerWorker.run()
将service对应的全景实例instances写入内存注册表
/instance/list
源码精髓:很多开源框架为了提升性能会大量使用这种异步任务及内存队列操作,这些操作本身并不需要写入之后立即成功,用者种方式对提升操作性能有很大帮助
new DistroDelayTaskExecuteEngine()
dataStore.put(key.datum)
distroMapper.responsible(serviceName)
serverProxy.registerService
/distro/datum
ServiceManager.init()
NacosServiceRegistry
beatReactor.addBeantInfo
HttpClient.asyncHttpPostLarge
consistencyService.onPut
InstanceControler.register
GlobalExecutor.submitDistroNotifyTask(notifier)
raftProxy.proxyPostLarge(将注册请求转发到集群的leader节点)
notifier.run()
注册服务实例信息在集群系欸但那见同步任务
int target=distroHash(serviceName)%servers.size()
集群节点之间相互同步系欸按状态,如果有节点宕机了,集群其他节点会感知到并更新集群节点的状态,这个会影响心跳任务机器选择的计算
源码精髓:nacos这个更新注册表内存方法里,为了防止读写并发冲突,大量的运用CopyOnWrite思想防止并发读写冲突,具体做法就是把原内存结构赋值一份,操作完最后在替换回真正的注册表内存里去Eureka防止读写并发冲突的方法时注册表的多级缓存结构,只读缓存,读写缓存,内存注册表,各级缓存之间定时同步,客户端感知的及时行不如nacos
将临时的注册实例更新到cluster的ephemeralInstances属性上去,服务发现查找临时实例最终从内存里找到的就是这个属性
构造方法里开启数据同步任务
如果是持久化实例数据
用服务名称hash后对集器数取模,选择集群里的一台集器执行任务
hostReactor.getServiceInfo
GlobalExecutor.submitLoadDataTask
DistroTaskEngineHolder
indtsnce.setHealthy(false)
返回的就是注册时写入的实例属性
利用CountDownLatch实现了一个简单的raft协议写入数据的逻辑,必须集群半数以上节点写入成功才会给客户端返回成功
instanceController.beat
PushService.onApplicationEvent(ServiceChangeEvent event)
tasks.take()
DistriConsistencyServiceImpl.put(key.value)
load()
集群节点窗台同步任务
ServerStatusReporter.run()
nacos集群新节点启动时向其他系欸但拉去数据同步流程
queue.put(task)
ClientBeatCheckTask
mapConsistencyService(key)
/raft/datum/commit
1.将注册实例更新到内存注册表
DistroController.onSyncDatum
loadAllDataSnapshotFromRemote(each)
distroProtocol.sync
将新注册实例加入对应服务service的实例列表里去
同步实例信息到nacosserver集群其他节点
ProcessRunnable.run()
NacosDiscoveryAutoConfiguration
raftStore.write(datum)
BeatTask
调用server的实例发送心跳接口(HttpMethod.PUT)
定时执行任务
transportAgent.getDatumSnapshot(each.getAddress())
DistroSyncChangeTask.run()
PersistentNotifier.onEvent(ValyeChangeEvent event)
task.run()
spring-cloud-alibaba-nacos-discovery.jar里的spring.factories文件里的EnableAutoConfiguration对应NacosDiscoveryAutoConfiguration
serviceInfoMap(客户端实例缓存map)
getDisstroMapper().responsible(service.getName())
同步的发起机器就是做健康检查任务的哪台机器
延迟执行的定时任务更新客户端的服务缓存
发布事件ValueChangeEvent更新内存注册表
getServiceInfo(serviceName,clusters)
继承
service.init()
persistConsistencyService
if(instance.isEphemeral()){添加一个延时执行的定时心跳任务BeatTask}
service。processClientBeat(clientBeat)
DelegateConsistencyServiceImpl.put
阿里实现自己的ap模式的Distro协议
(DIstroDelayTaskProcessor)processor.process(task)
实现ApplicationListener接口的类spring容器启动时会调用处理事件方法
createEmptyService
ApplicationListener
synchronizer.send(server.getAddress().msg)
服务注册
getPushService().serviceChanged(this)
实现
udp方式降费变动通知给订阅的客户端
ephemeralConsistencyService
startLoadTask()
ServerStatusReporter.init()
serviceRegistry就是NacosServiceRegistry的实例
queue.take()
如果是临时实例数据
listener.onChange
onApplicationEvent
ServiceReporter.run()
收藏
收藏
0 条评论
下一页