Nacos2.1.0源码
2025-04-06 10:29:03 0 举报
123
作者其他创作
大纲/内容
font color=\"#e855a4\
result.setHosts(getAllInstancesFromIndex(service));
放入本地缓存
return list;
registerService
return dataProcessor.processData(distroData);
ServiceInfo serviceInfo = delayTaskEngine.getServiceStorage().getPushData(service);
NamingClientProxyDelegate
clientConnectionEventListener.clientDisConnected(connection);
发起grpc调用
return serviceDataIndexes.containsKey(service) ? serviceDataIndexes.get(service) : getPushData(service);
循环内
DistroDataRequestHandler.handle
finally
Member member = memberManager.find(targetServer);
byte[] data = ApplicationUtils.getBean(Serializer.class).serialize(client.generateSyncData());
服务订阅表
里面就是把instance里的属性拿出来放到instanceInfo
ServiceInfo serviceObj = namingClientProxy.queryInstancesOfService(font color=\"#ed77b6\
this.namespace = namespace;this.group = group;this.name = name;this.ephemeral = ephemeral;
processTasks();
for (String outDateConnectionId : outDatedConnections) { if (!successConnections.contains(outDateConnectionId)) { unregister(outDateConnectionId); } }
return publisherIndexes.containsKey(service) ? publisherIndexes.get(service) : new ConcurrentHashSet<>();
result = font color=\"#ed77b6\
return result;
return (ServiceInfo)this.serviceInfoMap.get(key);
CHANGE、ADD
这个逻辑就不画了,到时候直接看源码
subscriberIndexes.get(service).add(clientId)
serviceIndexesManager.getAllClientsRegisteredService(service)
return subscriberIndexes.containsKey(service) ? subscriberIndexes.get(service) : new ConcurrentHashSet<>();
result.add(instance);
switch (request.getType()) { case NamingRemoteConstants.font color=\"#e855a4\
for (Object taskKey : keys)
异步处理
List<Instance> hosts
AbstractDelayTask task = removeTask(taskKey);
Connection remove = this.connections.remove(connectionId);
run()
incFailCount();
clientDisconnected(connect.getMetaInfo().getConnectionId());
setHosts
PushDataWrapper wrapper = generatePushData();
DistroClientDataProcessor
InstanceRequestHandler.handle
Set<Instance> result = new HashSet<>();
服务端
this.namespace = InitUtils.initNamespaceForNaming(nacosClientProperties);
this.font color=\"#ed77b6\
handleNacosException(e);
Set<String> successConnections = new HashSet<>();
return font color=\"#ed77b6\
this.hosts = hosts;
serviceInfoHolder.processServiceInfo(result);
grpc调用
失败重试
DistroData distroData = getDistroData(type);
服务发现是第一次调用服务接口时根据服务名去服务端获取的,这个参看ribbon的源码讲解。1.4.1那边的源码讲的有点问题,真正的路口在这边
ServiceInfo result = emptyServiceInfo(service);
getTargetClientIds()
this.register();
handleSyncData(request.getDistroData());
return new LinkedList<>(result);
client是grpc底层的东西,类似于netty的socketchannel(与之对应的是serverscoketchannel,代表着服务端),代表着客户端。每个客户端与nacos服务端连接的时候,都生成唯一的clientId,所以每个client对象代表着一个客户端。
serviceStorage.getData(service)
ServiceEvent.ServiceSubscribedEvent
Client client = clientManager.getClient(clientSyncData.getClientId());
String resultGroupedName = groupName + \"@@\" + serviceName;
从服务注册表里拿
Service service = serviceChangedEvent.getService();
IpPortBasedClient
for (String each : serviceIndexesManager.getAllClientsRegisteredService(service))
distroProtocol.onReceive(distroData)
DistroData result = getDistroComponentHolder().findDataStorage(type).getDistroData(getDistroKey());
keys.addAll(tasks.keySet());
true
ConnectionManager.start()
ClientServiceIndexesManager.handleClientOperation
Client client = clientManager.getClient(clientId);
serviceInfoHolder.processServiceInfo(serviceObj)
getPushData(service)
服务注册表
从服务注册表拿
onApplicationEvent(WebServerInitializedEvent event)
font color=\"#ed77b6\
ClientServiceIndexesManager.onEvent
Service singleton = ServiceManager.getInstance().getSingletonIfExist(service).orElse(service);
processor.process(task)
DELETE
ServiceEvent.ServiceChangedEvent serviceChangedEvent = (ServiceEvent.ServiceChangedEvent) event;
String serviceId = registration.getServiceId();
InstanceRequest request = new InstanceRequest(font color=\"#e855a4\
这么map表示某个服务被哪些nacos客户端订阅了
AbstractDelayTask existTask = tasks.get(key);
nacos服务端集群其他节点
outDatedConnections.add(client.getMetaInfo().getConnectionId());
//font color=\"#e74f4c\
Collection<Object> keys = getAllTaskKeys();
this.nacosDiscoveryProperties = nacosDiscoveryProperties;
Subscriber subscriber = clientManager.getClient(each).getSubscriber(service);
ServiceInfo result = new ServiceInfo(); result.setName(service.getName()); result.setGroupName(service.getGroup()); result.setLastRefTime(System.currentTimeMillis()); result.setCacheMillis(switchDomain.getDefaultPushCacheMillis()); return result;
NacosDelayTaskExecuteEngine的构造方法
客户端
ConnectionBasedClient client = clients.remove(clientId);
Client client = clientManager.getClient(distroKey.getResourceKey());
添加客户端信息
publisherIndexes.get(service).add(clientId);
ServiceEvent.ServiceChangedEvent
processData
serviceInfoHolder.processServiceInfo(notifyRequest.getServiceInfo());
PushDelayTask
就是spring.application.name配置的微服务名
catch
SubscribeServiceRequestHandler.handle
Connection connection = getConnection(outDateConnectionId);
return delayTask.isPushToAll() ? delayTaskEngine.getIndexesManager().getAllClientsSubscribeService(service) : delayTask.getTargetClients();
getRegistration()
Service service = event.getService();
PushExecuteTask.run
return response.getServiceInfo();
return instance.isEphemeral() ? grpcClientProxy : httpClientProxy;
循环外
DistroClientDataProcessor.syncToAllServer
return namespace.equals(service.namespace) && group.equals(service.group) && name.equals(service.name);
@ConfigurationProperties(\"spring.cloud.nacos.discovery\")public class NacosDiscoveryProperties {
Service
就是往本地注册表放数据
Client client = clientManager.getClient(each);
PushDelayTaskProcessor
RpcScheduledExecutor.COMMON_SERVER_EXECUTOR.scheduleWithFixedDelay
pushToAll = true;targetClients = null;
就是自己的application.yml文件配置的group
服务注册
List<Instance> list = serviceInfo.getHosts()
String clientId = event.getClientId();
集群同步逻辑
Instance instance = getNacosInstanceFromRegistration(registration);
ConnectionBasedClientManager
通过grpc从服务端去拉服务实例
handlerClientSyncData(clientSyncData);
return !isEmpty(clusters) ? name + \"@@\" + clusters : name;
给nacos客户端发出探测请求,如果成功收到响应,说明客户端是正常的。2.1.0用的是grpc长连接,所以可以做探测操作,1.4.1是http短连接
NotifyCenter.publishEvent(new ClientEvent.ClientDisconnectEvent(client));
异步调用
Instance instance = new Instance();instance.setIp(registration.getHost());instance.setPort(registration.getPort());instance.setWeight(nacosDiscoveryProperties.getWeight());instance.setClusterName(nacosDiscoveryProperties.getClusterName());instance.setEnabled(nacosDiscoveryProperties.isInstanceEnabled());instance.setMetadata(registration.getMetadata());instance.setEphemeral(nacosDiscoveryProperties.isEphemeral());return instance;
InstancePublishInfo instanceInfo = getPublishInfo(instance);
serviceInfo = this.font color=\"#ed77b6\
String namespaceId = request.getNamespace(); String serviceName = request.getServiceName(); String groupName = request.getGroupName(); String app = request.getHeader(\"app\
for (String outDateConnectionId : outDatedConnections)
return new SubscribeServiceResponse(ResponseCode.font color=\"#e855a4\
Connection connection = connectionManager.getConnection(connectionId);
2.1.0注册表升级成这种形式主要原因:1、注册表是ConcurrentHashMap,因此读写并发冲突就自动解决了。2、1.4.1的注册表就一个大map,所有nacos客户端都操作这个map,要控制并发的话力度比较大,所以他们采用了COW机制。2.1.0把一个注册表拆成多个map,减小了map的复杂度,有助于降低并发冲突带来的影响。publisherIndexes其实就是把clientid保存起来,处理map的逻辑会快很多,会简单很对,不像1.4.1,要做很多的比对。其实就是把复杂的map拆成多个简单的map,逻辑上处理起来更简单更快,对并发影响更小。
run
failCount++;
String group = nacosDiscoveryProperties.getGroup();
1、如果是服务注册产生的事件,走true逻辑,给所有订阅了这个服务的客户端推送。2、如果是服务发现因为订阅产生的事件,走false逻辑,给当前客户端推送。
Service singleton = ServiceManager.getInstance().getSingleton(service);
服务健康检测
NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(this));
NamingService namingService = namingService();
if (now - client.getMetaInfo().getLastActiveTime() >= KEEP_ALIVE_TIME)
服务发现
Connection client = entry.getValue();
return publishers.get(service);
NacosServiceRegistryAutoConfiguration
从这里可以看出,service下面只能放临时实例或持久实例了,不能再跟1.4.x一样两个都能放了
定时去服务端拉取注册表到本地缓存,这个对于2.1版本来说还是需要的,虽然2.1版本有服务订阅功能,类似与zk的监听机制。但是可能网络问题,变动消息推送失败了。
CHANGE
clientConnectionEventListenerRegistry.notifyClientDisConnected(remove);
Set<String> outDatedConnections = new HashSet<>();long now = System.currentTimeMillis();
rpcClient.request(request)
for (String each : getTargetClientIds())
SubscribeServiceRequest request = new SubscribeServiceRequest(font color=\"#ed77b6\
NamingPushRequestHandler.requestReply
handleClientDisconnect((ClientEvent.ClientDisconnectEvent) event);
this.pushToAll = false;this.targetClients.add(targetClient);
自动配置类
Set<Instance> result = new HashSet<>(); Set<String> clusters = new HashSet<>(); for (String each : font color=\"#e855a4\
getExecuteClientProxy(instance)
return Optional.ofNullable(client.getInstancePublishInfo(service));
NacosTaskProcessor processor = getProcessor(taskKey);
从服务注册表和服务订阅表移除
PushExecutorRpcImpl
服务订阅表里拿
getAllInstancesFromIndex(service)
equals方法
this.serviceRegistry.register(this.getRegistration());
是
所有客户端连接
Client client = event.getClient();for (Service each : client.getAllSubscribeService()) { font color=\"#e74f4c\
有个namespace属性,其实就是客户端配置的namespaceid
for (Member each : font color=\"#e855a4\
NamingSubscriberServiceV2Impl.onEvent
NacosServiceRegistryAutoConfiguration自动配置类里
doExecuteWithCallback(new DistroExecuteCallback());
这个就是注册表在客户端的本地缓存
异步调用,集群同步逻辑
this.start();
拉取失败的次数越多,定时执行的时间间隔就越长
String clientIp = client.getMetaInfo().getClientIp();
EphemeralIpPortClientManager
DistroDelayTaskProcessor.process
putIfAbsent和computeIfAbsent都表示map里有就不放入,没有才放入。map里有表示服务被注册过了。所以如果是同个微服务多个机子注册,这个singletonRepository集合里只保存第一次注册上来的。确实也只要存第一次就行了,因为service里没装ip和port
unregister(outDateConnectionId);
NacosNamingService.selectInstances

收藏

收藏
0 条评论
下一页