nacos服务注册流程调用
2021-08-20 15:01:05 0 举报
AI智能生成
nacos服务注册流程调用nacos服务注册流程调用nacos服务注册流程调用nacos服务注册流程调用nacos服务注册流程调用nacos服务注册流程调用nacos服务注册流程调用
作者其他创作
大纲/内容
nacos客户端
SpringBoot应用启动,发布WebServerInitializedEvent事件
AbstractAutoServiceRegistration(其实现类是NacosAutoServiceRegistration)监听到该事件
onApplicationEvent()
bind()
start()
register()
NacoServiceRegistry#register()
获取namingService
实例化NacosNamingService
this.serverProxy = new NamingProxy(this.namespace, this.endpoint, this.serverList, properties);
this.beatReactor = new BeatReactor(this.serverProxy, this.initClientBeatThreadCount(properties));
this.hostReactor = new HostReactor(this.eventDispatcher, this.serverProxy, this.beatReactor, this.cacheDir, this.isLoadCacheAtStart(properties), this.initPollingThreadCount(properties)); 实例化HostReactor
this.beatReactor = new BeatReactor(this.serverProxy, this.initClientBeatThreadCount(properties));
this.hostReactor = new HostReactor(this.eventDispatcher, this.serverProxy, this.beatReactor, this.cacheDir, this.isLoadCacheAtStart(properties), this.initPollingThreadCount(properties)); 实例化HostReactor
实例化HostReactor
this.pushReceiver = new PushReceiver(this); 实例化PushReceiver()
this.udpSocket = new DatagramSocket(); 实例化DatagramSocket,也就是客户端建立UDP接收端
解释:客户端向服务端获取服务列表时,服务端会接收到客户端的UDP信息,以后当服务端更新的时候会把更新信息推送给客户端
PushReceiver是一个线程
run()
udpSocket.receive(packet); 接收服务端传输的信息
// 更新本地缓存
hostReactor.processServiceJson(pushPacket.data);
hostReactor.processServiceJson(pushPacket.data);
// 消费者向服务端发送ack
udpSocket.send()
udpSocket.send()
总结:客户端(消费者),基于udp协议监听服务端推送来的信息, 一旦接收到服务端传来的数据,就会更新消费者的本地缓存。
定义一个UpdateTask线程任务,后面客户端向服务端请求 服务列表 会用到
HostReactor#getServiceInfo(),向服务端请求服务列表
// 默认每隔10秒向服务端发起一次短轮询,用于查询服务的实例列表
// (注:10秒,来自服务端的相关属性的值,非客户端本地默认值)
scheduleUpdateIfAbsent(serviceName, clusters);
// (注:10秒,来自服务端的相关属性的值,非客户端本地默认值)
scheduleUpdateIfAbsent(serviceName, clusters);
开启了UpdateTask线程任务
updateService()
发送http请求:/instance/list
获取serviceId、group
将服务相关的信息封装成Instance
registerInstance(),注册实例
封装心跳信息BeatInfo
定义线程任务,发送http心跳请求(jdk的HttpClient)给nacos服务端
serverProxy.registerService();发送http请求(POST方式)给nacos服务端,注册实例
Random random = new Random(System.currentTimeMillis());
int index = random.nextInt(servers.size());
while(i < servers.size()) {
String server = (String)servers.get(index);
try{
return this.callServer(api, params, body, server, method);
}catch () {
index = (index + 1) % servers.size();
++i;
}
}
站在客户端角度,服务端集群中的任意节点都是对等的,客户端请求集群的节点都是随机的;
如果请求失败则换一个服务端节点重新发送请求
int index = random.nextInt(servers.size());
while(i < servers.size()) {
String server = (String)servers.get(index);
try{
return this.callServer(api, params, body, server, method);
}catch () {
index = (index + 1) % servers.size();
++i;
}
}
站在客户端角度,服务端集群中的任意节点都是对等的,客户端请求集群的节点都是随机的;
如果请求失败则换一个服务端节点重新发送请求
DistroFilter#doFilter(),拦截器
if (method.isAnnotationPresent(CanDistro.class) && !distroMapper.responsible(groupedServiceName))
集群的任意一个节点都存储所有数据,但每个节点只负责其中一部分服务,在接收到客户端的“写”(注册,心跳,下线等),
服务器节点判断请求的任务是否为自己负责,如果是,则处理;否则交由负责的节点处理
集群的任意一个节点都存储所有数据,但每个节点只负责其中一部分服务,在接收到客户端的“写”(注册,心跳,下线等),
服务器节点判断请求的任务是否为自己负责,如果是,则处理;否则交由负责的节点处理
DistroMapper#responseible()
// 搞这么复杂的操作,而不是用一个随机数来随机选择服务器来处理请求,
// 是因为如果采用随机数,那么会存在服务端数据同步延迟的效果。
// 例子:假如A服务器原本有UserService,但是B服务器还未同步过来,
// 当请求被随机分到B上面时,那么就无法拿到UserService了
int index = servers.indexOf(ApplicationUtils.getLocalAddress());
int lastIndex = servers.lastIndexOf(ApplicationUtils.getLocalAddress());
if (lastIndex < 0 || index < 0) {
return true;
}
// 这个算法有缺点,例子:如果服务器数量变少了,假设A服务器挂了,那么所有的请求
// 都要重新计算hash,看看应该将请求分配到哪个服务器上面
// 应该采用一致性hash算法比较好
int target = distroHash(serviceName) % servers.size();
return target >= index && target <= lastIndex;
// 是因为如果采用随机数,那么会存在服务端数据同步延迟的效果。
// 例子:假如A服务器原本有UserService,但是B服务器还未同步过来,
// 当请求被随机分到B上面时,那么就无法拿到UserService了
int index = servers.indexOf(ApplicationUtils.getLocalAddress());
int lastIndex = servers.lastIndexOf(ApplicationUtils.getLocalAddress());
if (lastIndex < 0 || index < 0) {
return true;
}
// 这个算法有缺点,例子:如果服务器数量变少了,假设A服务器挂了,那么所有的请求
// 都要重新计算hash,看看应该将请求分配到哪个服务器上面
// 应该采用一致性hash算法比较好
int target = distroHash(serviceName) % servers.size();
return target >= index && target <= lastIndex;
nacos服务端
InstanceController#register(),处理服务注册
获取namespaceId、serviceName
从request中解析得出Instance
serviceManager.registerInstance()注册实例
如果没有服务,则创建一个空服务
获取服务
如果服务为空,则创建一个空服务
如果入参cluster不为空,则与服务绑定起来
检查客户端的集群
putServiceAndInit(),缓存、初始化服务
putService(),缓存服务
serviceMap.put(),把一个空map放到serviceMap里面。该serviceMap的格式为【Map(namespace, Map(group::serviceName, Service))】
serviceMap.get().put(),根据key把那个空map拿出来,再往里面放服务
service.init(),初始化服务
定义线程任务,检查客户端是否健康
Cluster.setService(),绑定服务
Cluster.init(),初始化集群
定义线程任务检查集群是否健康
线程ClientBeatCheckTask#run()
DistroMapper#responsible();该方法的作用是判断一个请求由服务端集群中的哪个节点负责
若当前事件-上一次客户端发送心跳的事件 > 15s,则标记该实例为不健康
ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent());发布一个实例超时的事件
对于不健康的服务实例,会发布一个ServiceChangeEvent事件。
解释:这时PushService会监听到ServiceChangeEvent事件,进而会调用PushService#onApplication()->最终会调用
udpPush(ackEntry),所以会以udp方式发送信息给消费者(nacos客户端)
udpPush(ackEntry),所以会以udp方式发送信息给消费者(nacos客户端)
ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent());发布一个实例超时的事件
再过15s后,服务端仍没有收到该客户端发送心跳信息,则剔除该实例
consistencyService.listen(),AP模式(临时的),监听服务
consistencyService.listen(),CP模式(非临时的),监听服务
如果是CP模式,则持久化服务
获取服务
将服务service与实例instance绑定起来
用KeyBuilder生成service的唯一标识key
获取服务
List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);拿到service下的所有实例
将若干个新的instance与旧的instance合并起来
若服务的集群里面没有实例所在的集群,则初始化客户端的集群
封装一个Instances,绑定instanceList
consistencyService.put(key, instances);一致性服务
采用DelegateConsistencyServiceImpl去判断采用哪个一致性服务的实现类
AP模式(临时性),则采用DistroConsistencyServiceImpl实现类
实例化DistroConsistencyServiceImpl
this.distroMapper = distroMapper;
this.dataStore = dataStore;
this.serializer = serializer;
this.switchDomain = switchDomain;
this.globalConfig = globalConfig;
this.distroProtocol = distroProtocol;
this.dataStore = dataStore;
this.serializer = serializer;
this.switchDomain = switchDomain;
this.globalConfig = globalConfig;
this.distroProtocol = distroProtocol;
实例化DistroProtocol
new DistroLoadDataTask();定义一个服务端加载远程的线程任务,做服务端数据一致性同步
startVerifyTask();
其中有一个处DistroMapper#responsible();该方法的作用是验证当前服务器是否负责当前输入的服务
startLoadTask();
load();
除了自己,没有其他服务器,则线程休眠,一直while循环
loadAllDataSnapshotFromRemote();加载远程服务端的所有数据
DistroData distroData。发送http请求,拿到远程服务器的所有数据(这些数据是二进制的数据)
processSnapshot();处理拿到的远程数据
processData();
反序列化,拿到instances,封装成Map<String, Datum<Instances>> datumMap
遍历拿到的datumMap,判断当前当前机器上有没有当前service,若没有则创建空的service,
listener.onChange();相当于发布事件,把service保存起来
再对datumMap做一次遍历,发布事件
dataStore.put();把service放进去,更新service
onPut(key, value);填充DataStore,发布change事件,改变service这个对象的数据结构
datum.value = (Instances) value;将Datum与instances绑定起来
dataStore.put(key, datum);将datum放进DataStore里面
if (!listeners.containsKey(key)) {
return;
} listeners的类型为 (Map<String, ConcurrentLinkedQueue<RecordListener>> listeners)
return;
} listeners的类型为 (Map<String, ConcurrentLinkedQueue<RecordListener>> listeners)
notifier.addTask(key, DataOperation.CHANGE);发布事件(事件被放入一个队列里面),会有一个线程任务监听这个事件
服务器集群的数据一致性
CP模式(非临时性),则采用RaftConsistencyServiceImpl实现类
InstanceController#beat(),处理心跳
InstanceController#list(),处理客户端要获取服务列表
doSrvIpxt(),让客户端订阅服务端,服务端会推送信息给客户端
pushService.addClient(); 让客户端订阅服务端,它其实就是将客户端加到服务端维护的一个map里面;
map的格式为【ConcurrentMap<String, ConcurrentMap<String, PushClient>>】
解释:客户端使用com.alibaba.nacos.naming.core.Service#onchange(),来监听服务端推送的信息。
Service#onchange()->updateIPs()->getPushService().serviceChanged(this);// 广播一个消息
->this.applicationContext.publishEvent(new ServiceChangeEvent(this, service));
注意:PushService监听ServiceChangeEvent事件。
例子:服务端剔除某服务,会使用listener.onchange(),这个listener有很多实现,比如service,
那么就是service.onchange()->方法里面会发布一个ServiceChangeEvent事件,而PushService监听
ServiceChangeEvent事件,则调用PushService#onApplication()->那么就会开启线程,将服务更新的信息
推送给已订阅的客户端
map的格式为【ConcurrentMap<String, ConcurrentMap<String, PushClient>>】
解释:客户端使用com.alibaba.nacos.naming.core.Service#onchange(),来监听服务端推送的信息。
Service#onchange()->updateIPs()->getPushService().serviceChanged(this);// 广播一个消息
->this.applicationContext.publishEvent(new ServiceChangeEvent(this, service));
注意:PushService监听ServiceChangeEvent事件。
例子:服务端剔除某服务,会使用listener.onchange(),这个listener有很多实现,比如service,
那么就是service.onchange()->方法里面会发布一个ServiceChangeEvent事件,而PushService监听
ServiceChangeEvent事件,则调用PushService#onApplication()->那么就会开启线程,将服务更新的信息
推送给已订阅的客户端
0 条评论
下一页