Nacos 源码分析
2021-09-08 03:39:40 6 举报
spring boot 整合 nacos 启动流程,包括 nacos 服务发现,服务注册,服务下线,服务剔除,心跳检测,健康检查等流程
作者其他创作
大纲/内容
@PostConstruct
true
serviceInfoMap.get(key)
将注册实例信息更新到注册表内存结构中去value.getInstanceList() ---- 即 List<Instance> instanceList = Instances.getInstanceList()KeyBuilder.matchEphemeralInstanceListKey(key) --- 布尔值,是临时还是持久化节点
定时获取服务端最新服务数据,并更新到本地
NacosRegistration
实例是否为临时节点,默认为 true
这里关系到 AP ,CP
service.init()
HostReactor # getServiceInfo0
ServiceManager # removeInstance
implements
BeatTask implements Runnable
启动扫描 META-INF
DistroConsistencyServiceImpl # put
ServiceManager # getService
NacosDiscoveryAutoConfiguration
开启新线程异步执行
RaftCore #inner Notifier # run
HttpClient.asyncHttpDelete
HostReactor # updateServiceNow
是临时实例
server
NacosNamingService # registerInstance
创建一个空服务,这里创建的是一个空的没有 instance 实例的 service 结构
服务剔除
ServiceManager # registerInstance
Notifier # handle
NamingProxy # callServer
UtilAndComs.nacosUrlBase:/nacos/v1/ns
HealthCheckReactor.scheduleCheck(clientBeatCheckTask)
EurekaServiceRegistry
executorService.schedule(new font color=\"#f44336\
false
List<Instance> instances = service.allIPs(true)
@Bean
getRegistration()
peers.size() / 2 + 1
BeaClientBeatProcessor implements Runnable
@RequestMapping(\"/nacos/v1/ns/instance/list\")
clusterObj.allIPs()
服务注册
ScheduledFuture<?> future = addTask(new font color=\"#f44336\
instance.isEphemeral()
将刚创建的空壳 service(无 Instance) put 进注册表 serviceMap
解析 key 是临时实例还是持久化实例 ,根据 key 返回不同 ConsistencyService 实现类,默认是临时实例,返回 ephemeralConsistencyService 的实现类DistroConsistencyServiceImpl,而 PersistentConsistencyService 的实现类是 RaftConsistencyServiceImpl
ClientBeatCheckTask # deleteIp
先获取注册表中的 service,然后加锁
将实例写入 cluster
instance.setHealthy(true)
注册实例
InstanceController # list
详细见服务注册流程
raftStore.write(datum)
peers.majorityCount()
GlobalExecutor.submitDistroNotifyTask(notifier)
如果订阅了,就先本地缓存中获取,获取不到再去远程服务端获取;如果未订阅,则直接远程请求服务实例
putServiceAndInit(service)
for 死循环
服务发现 url :/nacos/v1/ns/instance/listHttpMethod.GET
putService(service)
添加服务实例
AbstractAutoServiceRegistration # start
获取所有的永久实例和临时实例
TODO:集群同步
AbstractAutoServiceRegistration # register
设置最后心跳时间
reqApi(UtilAndComs.nacosUrlBase + \"/instance/list\
定时任务
服务健康检查
System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()
new CountDownLatch(peers.majorityCount())
将持久化节点写入硬盘
allInstances.addAll(ephemeralInstances)
CP模式,采用 Raft 协议
往阻塞队列 tasks 中放入注册实例数据
InstanceController # deregister
spring.factories
font color=\"#9c27b0\
BeatReactor # addBeatInfo
RaftCore # signalPublish
ApplicationListener
根据命名空间,serviceName 服务名,集群名 获取服务实例
subscribe
instance == null
chooseServiceMap(namespaceId)
serviceInfoMap 是客户端实例缓存的 map
AbstractAutoServiceRegistration
reqApi(UtilAndComs.nacosUrlBase + \"/instance/beat\
AP 模式,采用 Distro 协议
如果 service 为 null,说明服务没有注册过,立马创建一个服务,然后 put 进 注册表,并且开启一条线程执行健康检查
心跳续约 url :/nacos/v1/ns/instance/beatHttpMethod.PUT
Service # allIPs
instance.setHealthy(false)
Service # onChange
从服务中获取具体实例
从注册表中获取服务
NamingProxy # sendBeat
服务注册 && 健康检查
当监听到注册任务时,会执行onChange方法
deleteIp(instance)
ServiceManager # putServiceAndInit
InstanceController # doSrvIpxt
System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()
client
服务初始化,开启线程,创建一个健康检查任务,默认 5s 做一次心跳检测
AbstractAutoServiceRegistration # onApplicationEvent
allInstances.addAll(persistentInstances)
hostReactor .getServiceInfoDirectlyFromServer
服务健康检查,默认每 5s 发送一次心跳创建一个健康检查的任务,如果某个实例超过15s没有收到心跳,则它的 healthy 属性则会被设为false如果某个实例超过30s没有收到心跳,直接剔除该实例
获取服务
addTask,异步执行
判断最后心跳时间是否超过 15s
判断当前节点是否是 leader,如果不是,转发给主节点,只有主节点才能写数据
@RequestMapping(\"/nacos/v1/ns/instance/beat\")
ServiceManager # addInstance
! isLeader()
service == null
服务发现
2、生成定时任务,等一段时间再次执行更新操作,更新客户端的服务缓存
服务剔除 url: http://127.0.0.1:8848/nacos/v1/ns/instance?+request.toUrl()HttpMethod.DELETE
notifier.addTask 任务实际就是将新增的服务实例加入阻塞队列 tasks而 notifier 本身是一个 Runnable 实现类对象,在当前公共类初始化的时候(@PostConstruct)就开启了一条异步线程,异步线程则是一个 死循环,不停的从 tasks 中循环取出服务实例在这之后只要有新的实例注册,就会调用这个 notifier.addTask 方法将实例放入 tasks 阻塞队列而后台一直死循环的异步线程将会监听到消息处理新注册到 tasks 的任务
循环从阻塞队列 tasks 中拿去实例数据并进行处理,tasks 中没有,则会被阻塞住,所以 clusterMap 是 HashMap,也能保证线程安全
NamingProxy #inner BeatTask # run
font color=\"#f44336\
hostReactor.getServiceInfo
synchronized service
再将持久化节点添加到阻塞队列,异步写入内存
this.serviceRegistry.register(getRegistration())
服务发现是第一次调用远程服务接口时,也就是说第一次请求时候,才会去获取服务,也就是懒加载
service.processClientBeat(clientBeat)
HealthCheckReactor.scheduleNow(clientBeatProcessor)
ClientBeatCheckTask # run
for instances
将服务实例放入注册表
注册逻辑到这一步 tasks.offer() 方法调用,将实例放进 tasks 阻塞队列就已经结束,剩下的任务异步完成,在DistroConsistencyServiceImpl 类初始化时,会开启一条线程 Notifier,这条线程会死循环监听 tasks 队列,真实的写入操作由这条线程完成。这使得实例写入 cluster 操作不会立刻完成,但是这种异步操作对性能有很大提升
1
chooseServiceMap(namespaceId).get(serviceName)
NacosNamingService # getAllInstances
判断最后心跳时间是否超过 30s
this.serviceRegistry
DistroProtocol # sync
DistroConsistencyServiceImpl # init
extends
InstanceController # register
new ClientBeatProcessor
NacosAutoServiceRegistration
DistroConsistencyServiceImpl #inner Notifier
serviceMap.get(namespaceId)
如果实例不存在,就根据 clientBeat 获取到的实例信息重新注册一个 instance。这里是应对如果网络不通导致实例在服务端被下架,或者服务端重启临时节点的服务实例丢失的问题
HostReactor # updateService
Service # srvIPs
HostReactor # getServiceInfo
RaftConsistencyServiceImpl# put
ServiceRegistry
自动配置类
mapConsistencyService(key)
服务注册 url :/nacos/v1/ns/instanceHttpMethod.POST
设置实例健康状态
从 nacos 服务端的注册表中获取,它是一个双重 Map
ClientBeatProcessor # run
AbstractAutoServiceRegistration # bind
NacosServiceRegistry
是否订阅,默认为 true
DistroConsistencyServiceImpl #innser Notifier # addTask
将新注册的实例存入 service 数据结构的 instance 列表中,并将原有的 instance 实例和刚注册的 instance 实例放在 List 集合中一起返回
从本地缓存中获取
心跳检测
InstanceController # beat
DistroConsistencyServiceImpl #innser Notifier # run
NacosServiceRegistry # register(NacosRegistration)
instance.setLastBeat(System.currentTimeMillis())
完成后再次将心跳任务放入线程池,实现定时心跳任务,默认每隔 5s 执行一次心跳检测
HostReactor #inner UpdateTask # run
非临时实例
等于
List<Instance> instances = cluster.allIPs(true)
0 条评论
回复 删除
下一页