Nacos 客户端服务发现源码流程图
2022-03-29 17:15:52   0  举报             
     
         
 Nacos 客户端服务发现源码流程图
    作者其他创作
 大纲/内容
 instance.setLastBeat(System.currentTimeMillis())
  url = NamingHttpClientManager.getInstance().getPrefix() + curServer + api
  加入队列
  拼接URL
  run方法
  调用OnPut方法
  result.addAll(clusterObj.allIPs())
  放到tasks map中
  NacosServiceRegistryAutoConfiguration
  if (!result)
  通过 命名空间Id 获取
  Set<Instance> toUpdateInstances = ephemeral ? ephemeralInstances : persistentInstances
  private final LinkedBlockingDeque<ServiceKey> toBeUpdatedServicesQueue
  for (Instance instance : currentIPs)
  设置间隔 默认5 秒
  创建消息
  初始化
  添加
  进行循环所有实例
  是 DOWN 从 UP状态的Set节点中删除
  核心方法 对服务名称做hash取绝对值 对服务数量取模 只有一台负责心跳机制如果其中有一台机器宕机,不算在取模数量之内 在com.alibaba.nacos.naming.cluster.ServerListManager#init方法 spring实例化后会对 @PostConstruct 注解标识的init方法 进行调用 init 又执行了run方法 进行节点状态维护
  serverStatus(@RequestParam String serverStatus)
  取出来的是 Runnable
  获取服务
  clientBeatProcessor.setService(this)
  deregister(HttpServletRequest request)
  查找数据处理器
  定时发送心跳
  核心注册逻辑 entryIPs 是新注册的机器
  添加任务
  创建 DistroDelayTask 任务
  加入到阻塞队列 由 InnerWorker run方法执行
  transportAgent.getDatumSnapshot(each.getAddress())
   if (service == null)
   根据ephemeral来判断 默认是true 复制了一份 写时复制思想
  获取所有实例
  if (!processor.process(task))
  接收节点之间的心跳状态
  循环所有要添加的 Instance 一般只有一个
  DataStore
  计算时间 [ 当前时间 - 最后一次心跳时间 > 超时时间 默认15秒]
  Cluster
  private Set<Instance> ephemeralInstancesprivate Set<Instance> persistentInstances
  属性
  ExecutorFactory.newSingleScheduledExecutorService(new NameThreadFactory(name))
  发送心跳信息
  从缓存获取一个数据
  更新服务
  List<Instance> result = new ArrayList<>()
  ClientBeatProcessor clientBeatProcessor = new ClientBeatProcessor()
  获取
  发送数据
  添加缓存
  循环每个集群 一般只有一个 DEFAULT
  putTask((Runnable) task)
  for (Member each : memberManager.allMembersWithoutSelf())
  处理请求 返回所有Instance 列表
  parseInstance(request)
  return instance
  run()
  获取所有集群节点
  循环设置健康状态
  通过socket 发送给客户端
  集群才用到 集群节点同步数据
  返回实例
  HealthCheckReactor.scheduleNow(clientBeatProcessor)
  return serviceInfoMap.get(key)
  处理数据
  处理快照
  创建数据返回
  Message msg = new Message()
  keys.addAll(tasks.keySet())
  把任务放到map中  由 DistroTaskEngineHolder 类的 DistroDelayTaskExecuteEngine 执行
  核心方法 创建空服务
  从 搭建集群 conf文件的 cluster.conf 文件获取所有server
  构造方法中初始化线程
  对服务名称做hash取绝对值
  distroProtocol.onReceive(distroHttpData)
  删除实例列表 标识 remove
  serverListManager.onReceiveServerStatus(serverStatus)
  @Bean
  递归
  DistroSyncChangeTask.run
  注册 Put
  distroComponentHolder.findDataProcessor(resourceType)
  解析实例 封装为 Instance
  serviceKey = toBeUpdatedServicesQueue.take()
  dataProcessor.processSnapshot(distroData)
  for (Instance instance : instances) 
  url = \"http://\" + serverIP + \":\" + EnvUtil.getPort() + EnvUtil.getContextPath()                + UtilsAndCommons.NACOS_NAMING_CONTEXT + \"/service/status\"
  更新注册表后发布一个事件 保证实例的及时性
  同步集群节点之间的状态延迟2000毫秒执行一次  线程执行 run
  new Instance()
  核心方法 处理客户端心跳
  ProcessRunnable
  获取任务
   if (ephemeral) 是否临时
  return JacksonUtils.toObj(result)
  核心方法 健康检查
  同步成功直接结束 循环所有配置,但是同步一次就停止
  创建 Nacos自动服务注册器
  拼接实例健康状态
  return result
  构建心跳信息
  udpSocket.send(ackEntry.origin)
  Nacos 客户端  发现
  空的话 创建一个空服务
  queue.take()
  拼接 Url请求路径
  服务注销
  创建任务
  processTasks()
  DistroProtocol
  判断是否包含 不包含直接进行put
  核心方法启动
  返回true false 
  核心方法
  request 请求Api 传入参数
  字符串解析为 RsInfo 对象
  if (action == DataOperation.DELETE)
  添加服务并且初始化
  获取服务 ServiInfo
  核心方法 把构建的对象 传入
  return chooseServiceMap(namespaceId).get(serviceName)
  返回所有 Instance 列表
  注册流程结束
  获取并返回所有实例
  这里是节点心跳集群之间维护, 还有一个是节点实例状态维护
  new Service()
  加入到队列
  返回结果
  clientBeatProcessor.setRsInfo(rsInfo)
  for (Instance instance : instances)
  通过命名空间Id 获取服务 返回
  创建Instances 放入 instanceList
  加入 Map
  返回
  服务注册
  执行run方法
  service.getClusterMap().get(clusterName)
  TaskExecuteWorker worker = getWorker(tag)
  加入任务
  从阻塞队列 take 服务之间的状态
  实现
  把 key value 包装为 datum 放入 dataStore put到 dataMap 中
   if (!getDistroMapper().responsible(service.getName()))
  Notifier
  PushService.onApplicationEvent(ServiceChangeEvent event)
  return true
  beat(HttpServletRequest request)
  InnerWorker 是 TaskExecuteWorker 子类 
  List<Instance> instances = service.allIPs()
  返回列表
  循环
  如果是 CHANGE 上面传入的就是 change
  request 请求 
  线程池定时执行发送心跳 默认延迟设置的周期 5秒 BeatTask是个线程 看run 方法
  加载远端所有数据快照
  同步节点之间实例状态
   同步数据 syncData
  执行的DistroSyncChangeTask run方法
  获取 ServiceInfo
  ApplicationListener
  run 方法
  instance.setHealthy(true)
  dataStore.getDataMap()
  msg.setData(status)
  task.run()
  if (!instance.isMarked())
  memberAddressInfos.remove(newMember.getAddress())
  for (String namespaceId : serviceMap.keySet())
  persistentInstances = toUpdateInstances
  空
  GlobalExecutor.scheduleUdpSender(() -> {
  呼叫server端 注册服务
  ServerStatusReporter 的run 方法
  接收服务器状态
  设置心跳最新系统时间
  ClientBeatProcessor 是个线程 看他的run方法
  定时更新本地缓存
  接收请求
  Nacos 集群加入节点
  Instances instances = new Instances()
  把以前的赋值到 oldMap
  distroHash(serviceName) % servers.size()
  判断是不是 Down状态
  从注册中心注册表 获取的数据存入  currentInstanceIds
  if (code == NamingResponseCode.RESOURCE_NOT_FOUND)
  类型
  serviceInfoMap.get(serviceInfo.getKey())
  if (action == DataOperation.CHANGE)
  拼接Url
  删除实例
  重写 ApplicationListener 接口的方法
  这里不是null 已经放了一个空的
  临时节点
  for (RecordListener listener : listeners.get(datumKey))
  呼叫服务端
  循环所有节点 发送健康状态
  getAllTaskKeys()
  从本地缓存获取  ServiceInfo 包含实例列表
  allInstances.addAll(ephemeralInstances)
  是
  创建实例容器列表
  找不到 进行注册
  processServiceJson(result)
  创建了一个线程池
  each 要同步的数据和每台机器地址封装到 distroKeyWithTarget
  this.start()
  发起请求
  核心方法处理
   instance.setHealthy(valid)
  handleFailedTask()
  getBasicIpAddress(request)
  创建了个 旧的Map
  GlobalExecutor.submitServiceUpdate(new ServiceUpdater(serviceKey))
  DataOperation action = pair.getValue1()
  解析json字符串
  否
  注册
  获取数据
   return chooseServiceMap(namespaceId).get(serviceName)
   if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut())
  监听事件
  继承
  设置服务为 this
  bind(event)
  startLoadTask()
  new DistroDelayTaskExecuteEngine()
  for (String namespaceId : allServiceNames.keySet())
  handle(pair)
  put到 consistencyService DelegateConsistencyServiceImpl
  判断类型
  DistroProtocol 类
  distroProtocol.onSnapshot(KeyBuilder.INSTANCE_LIST_KEY_PREFIX)
  register(HttpServletRequest request)
  startDistroTask()
  getProcessor(taskKey)
  获取服务下的所有实例
  getPushService().serviceChanged(service)
  找到相同ip 和端口的 实例进行返回
  for (Instance ip : ips)
  执行同步
  beatInfo.setPeriod(instance.getInstanceHeartBeatInterval())
  核心方法 注册方法
  解析请求返回的字符串到对象
  如果返回找不到服务会重新注册服务
  返回服务
  ServiceReporter 的 run方法
  DistroConsistencyServiceImpl
  private volatile Notifier notifier = new Notifier()
      @PostConstruct    public void init() {        // notifier 提交到线程池执行        GlobalExecutor.submitDistroNotifyTask(notifier);    }
  将更新的服务添加到队列
   if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout())
  ServiceManager.init()
  从远端加载数据
  这里其实只是校验 因为 右边代码已经添加了空服务
  是空
  return namesMap
  发送请求
  String url = \"http://\" + IPUtil.localHostIP() + IPUtil.IP_PORT_SPLITER + EnvUtil.getPort() + EnvUtil.getContextPath()                    + UtilsAndCommons.NACOS_NAMING_CONTEXT + \"/instance?\" + request.toUrl()
  递归延迟执行
  从注册中心注册表 获取 Instance
  Service
  ipArray.add(ip.toIpAddr() + \"_\" + ip.isHealthy())
  核心方法 删除实例
  拉取其他节点数据
  instance.setHealthy(false)
   for (String each : distroComponentHolder.getDataStorageTypes())
  if (!serviceMap.containsKey(service.getNamespaceId()))
  创建Http请求 要同步的数据对象
  HealthCheckReactor.scheduleCheck(clientBeatCheckTask)
  持久节点
  同步数据
  getAllServiceNames()
  客户端本地缓存 服务
  allInstances.addAll(persistentInstances)
  更新心跳状态
  像所有节点 发送msg 健康状态
  获取实例 Instance
  执行一个任务 ProcessRunnable run方法
  return Math.abs(serviceName.hashCode() % Integer.MAX_VALUE)
  设置心跳包
  进行循环
  Nacos 客户端  注册
   udpPush(ackEntry)
   for (Instance instance : instances)
  发送Http 请求进行注册
  循环节点
  发布事件,通知实例为健康
   把原有的实例放到 oldMap
  获取已有的 put 新的
  通过key dataStore.get(datumKey) 获取 Datum 也就是 instance
  执行 run方法
  Datum<T extends Record>
  public String keypublic T value
  请求结果进行封装
  创建一个 实例列表key ephemeral(临时节点) 默认为true
  NacosNamingService.getAllInstances(String serviceName)
  传入 UPDATE_INSTANCE_ACTION_REMOVE remove
  方法结束看一下这个方法 集群实例之间的维护
  核心服务发现方法
  getPushService().serviceChanged(this)
  创建任务到线程池 看 run 方法
  return null
  发起请求 查询服务列表 这里同时发送了一个 Udp端口 用来 服务端及时推送服务端注册销毁列表 
  更新实例
  for (Object taskKey : keys)
  if (UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE.equals(action))
  list(HttpServletRequest request)
  获取服务组名称
  判断当前 ip 和端口和心跳包一致
  注册服务
  loadAllDataSnapshotFromRemote(each)
  for (Instance instance : ips)
  获取serviceInfo
  异步 发起delete请求 本地路径 com.alibaba.nacos.naming.controllers.InstanceController.deregister
  String url = \"http://\" + serverIP + \":\" + EnvUtil.getPort() + EnvUtil.getContextPath()                + UtilsAndCommons.NACOS_NAMING_CONTEXT + \"/operator/server/status\"
  获取实例
  service.allIPs()
  创建任务 看run方法
  参数加入命名空间
  for (String cluster : clusters)
  是否标记过
  ServiceManager 在spring初始化后 会调用 @PostConstruct 标识的init 方法
  创建客户端心跳处理器
  load()
  核心方法 执行 DistroDelayTaskProcessor.process
  if (instance.getIp().equals(ip) && instance.getPort() == port)
  更新状态
   拼接url 提供server状态
  循环所有配置节点
  第一次是空
  reqApi(UtilAndComs.nacosUrlBase + \"/instance/list\
  同步集群之间实例数据的阻塞队列
  listWithHealthStatus(@RequestParam String key)
   获取 worker
  memberManager.update(server)
  distroDataStorage.getDatumSnapshot()
  获取 key
  服务发现
  serviceStatus(HttpServletRequest request)
  获取任务处理器 获取的为 DistroDelayTaskProcessor
  属性 Map的 Vlaue
  提交任务到线程池 run方法
  获取临时节点还是持久节点标志
  构建 Nacos Instance
  String datumKey = pair.getValue0()
  定时延迟递归调用发送心跳 传入周期时间 默认还是5秒
  方法返回的列表 instanceList
  参数加入命名空间 ID
  onApplicationEvent(WebServerInitializedEvent event)
  添加服务到注册表 service 目前还是空的 还没有实例
  DistroLoadDataTask run方法
  for (Instance ip : toUpdateInstances)
  deleteIp(instance)
  递归拉取服务列表
  if (NodeState.DOWN.equals(newMember.getState()))
  nacos 自动配置类 spring.factories 文件
  getNacosInstanceFromRegistration(registration)
  service.allIPs(ephemeral)
  获取所有节点实例
  getServers()
  listener.onDelete(datumKey)
  获取数据快照
  接受请求
  获取所有数据
  service.allIPs(clusters)
  集群实例状态维护方法
  获取消息
  传入 UPDATE_INSTANCE_ACTION_ADD add  
  BeatInfo beatInfo = new BeatInfo()
  返回数据
  超过15秒 健康状态改为 false
  DistroConsistencyServiceImpl 实现类的 Put方法
  线程执行
  this.serviceRegistry.register(getRegistration())
  如果之前标记过 把健不健康状态改为健康
  拼接url
  queue.put(task)
  查找数据存储
  创建了个空map
  for (Member server : sameSiteServers)
  循环所有获取的 keys
  提交任务继续异步去操作 修改节点事件的装填
  循环集群所有节点 allMembersWithoutSelf() 方法中删除了自己 也就是集群除自己所有节点
  执行 http 请求发送
  putServiceAndInit(service)
  NamingProxy                    .reqApi(EnvUtil.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT + \"/instance/\"                            + \"statuses\
  return target >= index && target <= lastIndex
  UpdatedServiceProcessor 的 run方法
  getIpAddress(request)
  处理任务
  take
  getAllDatums()
  如果是 remove
  instanceMap.remove(instance.getDatumKey())
  注册实例 NamingService 接口
  List<Instance> instances = cluster.allIPs(true)
  GlobalExecutor.submitServiceUpdateManager(new UpdatedServiceProcessor())
  ephemeralInstances = toUpdateInstances
  返回实例列表
  putService(service)
  异步处理同步节点之间实例状态
  如果获取的 Instance 是空 则注服务册
  绑定
  return new DistroData(new DistroKey(\"snapshot\
  核心方法 加入实例
  跳过自己节点
  处理快照数据
  放入请求解析后的 ServiceInfo
  收到请求执行
  request 发送请求
  if (result)
  如果不成功
  如果状态 是down 从列表 server列表删除
  RestResult<String> result = HttpClient.httpPutLarge(                    \"http://\
  构造方法
  如果是临时节点
  获取集群下所有实例 true 是临时节点
  return serviceMap.get(namespaceId)
   currentInstanceIds.add(instance.getInstanceId())
  service.processClientBeat(clientBeat)
  改为健康 修改的是引用所以实例已经变为健康
  计算时间 [ 当前系统时间 - 最后一次心跳时间 > 删除时间 默认30秒]
  发送msg
  再次循环
  DistroTaskEngineHolder
  http 获取数据
  实现注册
  获取所有实例名称 key 命名空间id  val 对应的服务名称列表
  service.allIPs(true)
  private final BlockingQueue<Runnable> queue
  reqApi(UtilAndComs.nacosUrlBase + \"/instance/beat\
   return new ArrayList<>(instanceMap.values())
  加载数据
  return msg
  不成功则重试
  register()
  创建服务对象
  memberManager.allMembers()
  把 datumKey action 封装一个对象 加入阻塞队列 注册完成 Notifier 本类是个线程 看run方法
  接收节点之间的状态
  for (Member server : allServers)
  HttpClient.httpGet(                \"http://\
  永久节点
  dataProcessor.processData(distroData)
  service.init()
  AbstractAutoServiceRegistration
  发布心跳超时事件
  创建实例 设置端口 ip ephemeral  集群 
  循环所有实例
  if (serviceMap.get(namespaceId) == null)
  NamingProxy.getAllData(targetServer)
  再次加入到队列
  distroComponentHolder.findDataStorage(type)
  设置心跳信息
  worker.process(task)
  重写
  instances.setInstanceList(instanceList)
  这个是集群环境决定检测心跳方法
  InnerWorker run方法
  建造定时
  空创建 ServiceInfo
   
 
 
 
 
  0 条评论
 下一页
  
   
   
   
   
  
  
  
  
  
  
  
  
 