nacos
2021-01-12 14:29:37 0 举报
nacos源码阅读
作者其他创作
大纲/内容
/nacos/v1/ns/instance官网api中有个ephemeral,springcloud默认为true,临时节点
移除实例信息
/nacos/v1/ns/raft/vote
往阻塞队列tasks放入注册实例数据
if (local.leaderDueMs > 0) { return; }
start()
local.term.incrementAndGet()
@PostMapping(\"/beat\")beat()
ApplicationListener
udpSocket.send(ackEntry.origin)
com.alibaba.nacos.client.naming.NacosNamingService
InstanceController.register@PostMapping
/nacos/v1/ns/distro/datumsGET
PushService#onApplicationEvent
是
raftStore.write(datum)
定时获取服务端的最新服务数据更新到客户端本地
GlobalExecutor.registerMasterElection(new MasterElection())
@BeanNacosAutoServiceRegistration
同步写数据到内存
这里有多个实现类,因为Listener里面放的是Service,所以debug到Service对应Lin的Linstener
bean初始化
sendVote()
register()
缓存为空,从servlet远程拉取
@PostConstructRaftPeerSet.init()bean初始化
继承
选举定时任务follower长时间没收到心跳就选举的定时任务每500ms执行一次
remote.term.get() <= local.term.get()
sendBeat()
getImpl().send(p)
将实例注册到内存注册表
服务注册
对除自己外的集群节点发起选举请求
RaftController
taskDispatcher.addTask(key)
注销HttpClient.asyncHttpDelete
local.leaderDueMs -= GlobalExecutor.TICK_PERIOD_MS;
GlobalExecutor.submit()
处理客户端心跳
raftCore.receivedBeat(beat)
如果远端访问的投票选举轮次小于当前服务器轮次返回当前服务器的选举结果如果单前服务器的选举结果为空,则选举本机
instanceMap.remove(instance.getDatumKey())
server.setAlive(now - lastBeat < switchDomain.getDistroServerExpiredMillis())
if (!isLeader())是否是领导者节点
服务状态上报定时任务每2s执行一次
模拟发送心跳自己
createServiceIfAbsent()
volatile Notifier notifier = new Notifier();
deleteIP(instance);
源码精髓nacos这个更新注册表的内存方法里,为了防止读写并发冲突,大量运用了CopyOnWrite的思想防止并发读写冲突,具体做法是把原内存结构赋值一份,操作完再替换为真正的注册表内存去而Eureka防止读写并发冲突的方法是读写多级缓存结构,只读缓存,读写缓存,内存注册表,各级缓存之间的定时同步,客户端感知实时性不如Nacos
如果临时数据ephemeralConsistencyService,springcloud默认为true
leader的心跳定时任务每500ms执行一次
定时任务
serverListManager.listen(this)
boolean subscribe 默认为true
for (final String server : peers.allServersWithoutMySelf())
每个服务的leaderDueMs都是随机的,当leader挂掉时,发起选举,term自增加1,即轮数最大的服务节点谁先发起谁就会当选leader
server
获取远程数据
HttpClient.httpGet()
peers.decideLeader(peer)
同步信息到nacos集群的其他节点
将所有选票结果放到TreeBag里面
nacos 服务集群
instance.setHealthy(false)
InstanceController.register@DeleteMapping
executor.submit(notifier)
/nacos/v1/ns/instance
@PostConstructServerListManager#init
bind()
spring-cloud-alibaba-nacos-discovery 2.1.1spring.factoriesNacosDiscoveryAutoConfiguration(自动装配原理)
cacheFile = new File(cacheDir + File.separator + namespaceId + File.separator + encodeFileName(datum.key))
if (!getDistroMapper().responsible(service.getName())) { return; } 服务名hash % nacos服务器数目 ,得到其中一台 nacos服务器,如果是自己的话,就开始检查这个服务的实例列表,如果不是就跳过。减轻每台服务器的网络通讯压力
getPushService().serviceChanged(this)
初始化时,changed一定为true读配置的节点比初始的空集合多
partitionConfig.getTaskDispatchPeriod()@Value(\"${nacos.naming.distro.taskDispatchPeriod:200}\") private int taskDispatchPeriod = 2000;优先级 yml>@Value>代码200ms
ephemeralConsistencyService
同步数据接口
/nacos/v1/ns/raft/beat
请求服务端/nacos/v1/ns/instance/beat
同步写数据到磁盘文件
源码
服务列表更新定时任务每5s执行一次
选举term每次加1,而发布内容term每次加100。100其实就是允许重新发起投票的次数,这个数字越大越安全,100这个数字已经足够大了
UpdateTask
注册
文件路径
AbstractAutoServiceRegistration
更新lastRefTime为最新时间
/beat
DistroConsistencyServiceImpl#put
选举响应
如果同步失败
allIPs.addAll(clusterObj.allIPs())
System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()
OperatorController@RequestMapping(\"/server/status\") public String serverStatus
addTask(key)往阻塞队列放入注册实例数据
延迟执行的定时任务,更新客户端的服务缓存
ServerStatusReporter.run()
心跳定时任务
NacosDiscoveryAutoConfiguration
Bean初始化
spring容器在启动的时调用onApplicationEvent
获取客户端的服务实例缓存信息this.serviceInfoMap.get(key)
DistroController#onSyncDatum
\"remove\".equals(action)
GlobalExecutor.registerHeartbeat(new HeartBeat())
重置leader存活的时间
2.初始化service
HealthCheckReactor.scheduleNow(clientBeatProcessor)
TaskDispatcher#init@PostConstruct
遍历所有server发送状态同步
DistroConsistencyServiceImpl#init@PostConstruct
onReceiveServerStatus(status);
ConcurrentHashMap
RaftCore#init@PostConstructbean初始化
内存注册节点列表
如果是持久化数据persistentConsistencyService
循环取出任务
chooseServiceMap(namespaceId)
阿里自己实现的简单cp Raft协议相比zk少了两阶段提交
发送心跳
状态判定
DelegateConsistencyServiceImpl.put()
/nacos/v1/ns/instance/list
for (Server server : serverListManager.getHealthyServers()) { if (syncAllDataFromRemote(server)) { initialized = true; return;}}
否
syncAllDataFromRemote(server)
添加ip到本地缓存map
allIPs(clusters)
TaskScheduler.run()
/list
定时调用
默认15秒,如果超过15秒没有心跳更新,设为不健康节点
if (SystemUtils.STANDALONE_MODE) { initialized = true; return; }
allInstances.addAll(persistentInstances); allInstances.addAll(ephemeralInstances);
写文件
client
this.serviceRegistry.register(this.getRegistration())
超时无心跳,注册节点状态变更
遍历集群的其他服务器
updateIpAddresses
onApplicationEvent()
serverleader
while (true) { Pair pair = tasks.take();}阻塞获取
修改ready为true
retrySync(syncTask)
选举票数最多的节点为leader
心跳检查定时任务
if (local.state != RaftPeer.State.LEADER && !STANDALONE_MODE) { return; }
System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()
阿里巴巴自己实现的Distro协议
ServerListUpdater.run()
listener.onChangeServerList(servers)
服务发现就是查询实例列表/nacos/v1/ns/instance/list真正的ribbon是在第一次调用服务接口的时候根据服务名去服务端获取的
调用servlet实例注册接口post请求
那远端服务ip为选举结果
收到状态同步通知
this.serverProxy.sendBeat(this.beatInfo)
local.resetLeaderDue()
获取缓存中的实例
if (!peers.isReady())
调用服务端
实现
listener.onChangeHealthyServerList(healthyServers)
if (service == null) {putServiceAndInit(service);}
InstanceController
notifyListeners()
同时重置当前本地服务的选举时间放弃发起选举,减少选举冲突
默认30秒,如果超过30秒没有心跳更新,剔除
doSrvIPXT
nacos入口
queue.offer(key)
HealthCheckReactor.scheduleCheck(clientBeatCheckTask)
赋值到ephemeralInstances或者persistentInstances
allIPs()
更新最后心跳时间为当前时间
@Resource(name = \"consistencyDelegate\") private ConsistencyService consistencyService;
发送事件广播
@BeanNacosServiceRegistry
if (changed) { notifyListeners(); }
RaftPeer peer = peers.get(maxApprovePeer); peer.state = RaftPeer.State.LEADER;
f (dataSize == partitionConfig.getBatchSyncKeyCount() || (System.currentTimeMillis() - lastDispatchTime) > partitionConfig.getTaskDispatchPeriod())
NacosNamingService.getAllInstances
SortedBag ips = new TreeBag()
如果注册实例到达一定数量就批量同步给nacos集群的其他节点或者距离上一次同步超过多长时间批量同步数据异步批量同步
只有leader才发送心跳,follower接受心跳
this.getRegistration()
for (String config : configs)
leader
心跳维持状态所有的server
票数最多的节点是否超过半数
GlobalExecutor.submitTaskDispatch(taskScheduler)
service.processClientBeat(clientBeat)
增加选举轮次
@BeanNacosRegistration
GlobalExecutor.registerServerListUpdater(new ServerListUpdater())
this.hostReactor.getServiceInfo()
将临时的注册实例更新到cluster的ephemeralInstances属性上,服务发现查找临时实例最终从内存中找到就是这个属性
向server发起注册
NamingProxy.getAllData(server.getKey())
service.init()
instance.isEphemeral()为true时,临时实例,延迟心跳定时任务
如果instance为空,则重新注册,因为nacos使用ap模式存在内存当中,当服务端挂了或重启,实例数据就找不到了,所以要保持心跳
/nacos/v1/ns/distro/datum
Notifier.run()
putService(service)
PersistentConsistencyService
服务发现
调用server的实例同步接口PUT
if (serviceMap.get(namespaceId) == null) { return null; }
@PostMapping(\"/vote\")vote()
启动初始化
instance.setLastBeat(System.currentTimeMillis());
否,选举
把其他服务节点的注册服务实例数据添加到本地缓存
createEmptyService()
返回即时注册的实例
udpPush(ackEntry)
单机版初始化成功
/nacos/v1/ns/raft/datum
TODO may choose other retry policy.这里的重试是不断重试,默认5s一次。支付宝重试阶梯重试,第一次5s,第二次30s,直到10次不成功,记录日志人工处理
ClientBeatProcessor.run()
服务启动后延迟5s,然后每隔5秒执行一次
local.resetLeaderDue();
follower
run(){load();}
maxApproveCount >= majorityCount()
ClientBeatCheckTask.run()
this.serviceRegistry
/nacos/v1/ns/operator/server/status
0 条评论
下一页