dubbo
2021-01-12 14:28:49 0 举报
AI智能生成
dubbo原理
作者其他创作
大纲/内容
基础
URL
配置总线或者统一契约
标准的URL格式
protocol://username:password@host:port/path?key=value&key=value
protocol
URL的协议
如http,https,FTP,SMTP
username:password
用户名密码
host:port
主机端口或域名
path
请求路径
parameters
参数键值对
dubbo中的URL
dubbo://172.17.32.91:20880/org.apache.dubbo.demo.DemoService?anyhost=true&application=dubbo-demo-api-provider&dubbo=2.0.2&interface=org.apache.dubbo.demo.DemoService&methods=sayHello,sayHelloAsync&pid=32508&release=&side=provider×tamp=1593253404714dubbo://172.17.32.91:20880/org.apache.dubbo.demo.DemoService?anyhost=true&application=dubbo-demo-api-provider&dubbo=2.0.2&interface=org.apache.dubbo.demo.DemoService&methods=sayHello,sayHelloAsync&pid=32508&release=&side=provider×tamp=1593253404714
契约的好处
统一规范,代码易读易懂
使用URL作为方法的入参(相当于一个key/value都是string的Map),
表达的含义比单个参数更丰富,易扩展,新参数追加key/value
表达的含义比单个参数更丰富,易扩展,新参数追加key/value
简化沟通
URL在spi中应用
被@Adaptive注解标注的适配器方法
方法被@Adaptive({"protocol"})注解,dubbo运行时会为其动态生成相应的 $Adaptive 类型
在生成的RegistryFactory$Adaptive中自动实现getRegistry(),
其中根据URL的protocol确定扩展名,从而确定使用的具体实现类
其中根据URL的protocol确定扩展名,从而确定使用的具体实现类
URL在服务暴露中应用
dubbo启动会将自身暴露的服务注册到注册中心,例如zk
ZookeeperRegistry.doRegister()
传入的URL
provider的地址172.17.32.91:20880
暴露的接口org.apache.dubbo.demo.DemoService
根据url的参数确定zk上创建的节点
dynamic参数 -> zk节点的临时还是持久节点
URL在服务订阅中应用
ZookeeperRegistry.doSubscribe启动订阅
consumer://...?application=dubbo-demo-api-consumer&category=providers,configurators,routers&interface=org.apache.dubbo.demo.DemoService...
protocol consumer 订阅协议
category 订阅的分类信息
providers
configurators
routers
interface 订阅哪个服务接口
toCategoriesPath(url)整理成一个zk路径,并在其上添加监听
dubbo中SPI应用
通过SPI机制加载插件
JDK的SPI
Classpath下的META-INF/services/目录创建一个以服务接口命名的文件
此文件记录了该jar包提供的服务接口的具体实现类
ServiceLoader.load()加载对应文件中的实现类
尝试获取当前使用的ClassLoader
获取当前线程绑定的ClassLoader,查找失败后使用SystemClassLoader
调用reload()
清理providers缓存,LinkedHashMap类型的集合
key 实现类的完整类名
value为实现类的对象
创建LazyIterator迭代器
读取SPI配置文件并实例化实现类对象
hasNext() -> hasNextService()
查找META-INF/services/目录下的SPI配置文件,并遍历
next() -> nextService()
负责实例化hasNextService()方法读取的实现类
并放到providers缓存起来
ServiceLoader.iterator()
依赖LazyIterator实现的匿名内部类
先查缓存,缓存失败再通过LazyIterator加载
遍历SPI配置文件中定义的所有实现类,并全部初始化
dubbo的SPI
SPI配置文件分类
META-INF/services/
该目录下的SPI配置文件用来兼容JDK的SPI
META-INF/dubbo/
该目录用于存放用户自定义SPI文件
META-INF/dubbo/internal/
用于存放dubbo内部使用的SPI配置文件
配置格式
dubbo=org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol
dubbo的SPI为KV格式
key
扩展名
value
实现类
@SPI注解
接口被@SPI注解修饰,表示该接口为扩展接口
@SPI("dubbo") 如果没有指定扩展名,默认dubbo
ExtensionLoader如何处理@SPI注解
功能类似JDK SPI中ServiceLoader
Protocol protocol = ExtensionLoader
.getExtensionLoader(Protocol.class).getExtension("dubbo");
.getExtensionLoader(Protocol.class).getExtension("dubbo");
静态字段
ConcurrentMap<Class,ExtensionLoader> EXTENDION_LOADERS
dubbo中一个扩展接口对应一个ExtensionLoader实例,
该集合缓存全部的ExtensionLoader实例,key为扩展接口
该集合缓存全部的ExtensionLoader实例,key为扩展接口
ConcurrentMap<Class<?>,Object> EXTENDION_INSTANCES
缓存扩展实现类与其实例对象的映射关系
实例字段
Class<?> type 类型
当前ExtensionLoader实例负责加载扩展接口
String cacheDefaultName
记录type扩展接口@SPI注解上的值,即默认扩展名
ConcurrentMap<Class<?>, String> cachedNames
缓存了该 ExtensionLoader 加载的扩展实现类与扩展名之间的映射关系
Holder<Map<String, Class<?>>> cachedClasses
缓存了该 ExtensionLoader 加载的扩展名与扩展实现类之间的映射关系。
cachedNames 集合的反向关系缓存
cachedNames 集合的反向关系缓存
ConcurrentMap<String, Holder<Object>> cachedInstances
缓存了该 ExtensionLoader 加载的扩展名与扩展实现对象之间的映射关系
流程
ExtensionLoader#getExtensionLoader
先从EXTENSION_LOADERS缓存中获取扩展点加载器
如果为空,cas初始化扩展点加载器EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader<T>(type))
ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension()
从缓存中,获得自适应拓展对象,双检锁判断instance是否初始化
创建自适应拓展对象createAdaptiveExtension
设置到缓存
getExtension() 根据扩展名称获取扩展实例
双检锁从缓存获取
如果为空则初始化创建createExtension()
会遍历全部 Wrapper 类并一层层包装到真正的扩展实例对象外层
注入属性injectExtension(instance)
@Adaptive 注解与适配器
有被 @Adaptive 注解修饰的方法的类会被dubbo生成适配器类
如 Transporter ->Transporter$Adaptive
@Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})
Client connect(URL url, ChannelHandler handler) throws RemotingException;
Client connect(URL url, ChannelHandler handler) throws RemotingException;
/ 确定扩展名,优先从URL中的client参数获取,其次是transporter参数
// 这两个参数名称由@Adaptive注解指定,最后是@SPI注解中的默认值
String extName = url.getParameter("client",
url.getParameter("transporter", "netty"));
// 这两个参数名称由@Adaptive注解指定,最后是@SPI注解中的默认值
String extName = url.getParameter("client",
url.getParameter("transporter", "netty"));
获取扩展实现的优先级,先是@Adaptive然后再试@SPI注解上的默认值
自动包装特性
isWrapperClass()
判断该扩展实现类是否包含拷贝构造函数(即构造函数只有一个参数且为扩展接口类型),
如果包含,则为 Wrapper 类,这就是判断 Wrapper 类的标准
如果包含,则为 Wrapper 类,这就是判断 Wrapper 类的标准
cachedWrapperClasses缓存包装类
自动装配特性
injectExtension()
扫描其全部 setter 方法,并根据 setter 方法的名称以及参数的类型,
加载相应的扩展实现,然后调用相应的 setter 方法填充属性
加载相应的扩展实现,然后调用相应的 setter 方法填充属性
实现的自动装配依赖了 ExtensionFactory
SpiExtensionFactory
根据扩展接口获取相应的适配器,没有到属性名称
SpringExtensionFactory
将属性名称作为 Spring Bean 的名称,从 Spring 容器中获取 Bean
@Activate注解与自动激活特性
@Activate标注在类上
group 属性
修饰的实现类是在 Provider 端被激活还是在 Consumer 端被激活
value 属性
修饰的实现类只在 URL 参数中出现指定的 key 时才会被激活
order 属性
用来确定扩展实现类的排序
loadClass() 方法对 @Activate 的扫描,其中会将包含 @Activate 注解的实现类缓存到 cachedActivates 这个实例字段
使用 cachedActivates集合
getActivateExtension()
1.获取默认激活的扩展集合
条件
①在 cachedActivates 集合中存在
②@Activate 注解指定的 group 属性与当前 group 匹配
③扩展名没有出现在 values 中(即未在配置中明确指定,也未在配置中明确指定删除)
④URL 中出现了 @Activate 注解中指定的 Key
海量定时任务,时间轮
概念
时间轮是一种高效的、批量管理定时任务的调度模型
环形结构 双向链表
一个槽代表一个时间间隔,每个槽使用双向链表存储定时任务
指针周期性地跳动,跳动到一个槽位,就执行该槽位的定时任务
单层时间轮的容量和精度都是有限的,对于精度要求特别高、时间跨度特别大或是海量定时任务需要调度的场景,通常会使用多级时间轮以及持久化存储与时间轮结合
核心接口
TimerTask
dubbo中所有的定时任务都继承TimerTask
Timeout
和TimerTask一对一
两者的关系类似于线程池返回的 Future 对象与提交到线程池中的任务对象之间的关系
Timer
定义了定时器的基本行为
newTimeout()
提交一个定时任务(TimerTask)并返回关联的 Timeout 对象
类似于向线程池提交任务
HashedWheelTimeout
Timeout 接口的唯一实现
HashedWheelTimer 的内部类
用途
时间轮中双向链表的节点,即定时任务 TimerTask 在 HashedWheelTimer 中的容器
定时任务 TimerTask 提交到 HashedWheelTimer 之后返回的句柄(Handle),用于在时间轮外部查看和控制定时任务
核心字段
prev、next(HashedWheelTimeout类型)
分别对应当前定时任务在链表中的前驱节点和后继节点
task(TimerTask类型)
指实际被调度的任务
deadline(long类型)
指定时任务执行的时间,创建 HashedWheelTimeout 时指定的
currentTime(创建 HashedWheelTimeout 的时间) + delay(任务延迟时间) - startTime(HashedWheelTimer 的启动时间)
时间单位为纳秒
state(volatile int类型)
定时任务当前所处状态
INIT(0)、CANCELLED(1)和 EXPIRED(2)
STATE_UPDATER 字段(AtomicIntegerFieldUpdater类型)实现 state 状态变更的原子性
remainingRounds(long类型)
当前任务剩余的时钟周期数
时间轮所能表示的时间长度是有限的,在任务到期时间与当前时刻的时间差,
超过时间轮单圈能表示的时长,就出现了套圈的情况,需要该字段值表示剩余的时钟周期
超过时间轮单圈能表示的时长,就出现了套圈的情况,需要该字段值表示剩余的时钟周期
核心方法
isCancelled()、isExpired() 、state()
用于检查当前 HashedWheelTimeout 状态
cancel()
状态设置为 CANCELLED,并将当前 HashedWheelTimeout 添加到 cancelledTimeouts 队列中等待销毁
expire()
当任务到期时,会调用该方法将当前 HashedWheelTimeout 设置为 EXPIRED 状态,
然后调用其中的 TimerTask 的 run() 方法执行定时任务
然后调用其中的 TimerTask 的 run() 方法执行定时任务
remove()
将当前 HashedWheelTimeout 从时间轮中删除
HashedWheelBucket
时间轮中的一个槽
实际上就是一个用于缓存和管理双向链表的容器,双向链表中的每一个节点就是一个 HashedWheelTimeout 对象,也就关联了一个 TimerTask 定时任务。
HashedWheelBucket 持有双向链表的首尾两个节点
每个节点均持有前驱和后继的引用,可以正向或是逆向遍历整个双向链表了
核心方法
addTimeout()
新增 HashedWheelTimeout 到双向链表的尾部
pollTimeout()
移除双向链表中的头结点,并将其返回
remove()
从双向链表中移除指定的 HashedWheelTimeout 节点
clearTimeouts()
循环调用 pollTimeout() 方法处理整个双向链表,并返回所有未超时或者未被取消的任务
expireTimeouts()
遍历双向链表中的全部 HashedWheelTimeout 节点
在处理到期的定时任务时,会通过 remove() 方法取出,并调用其 expire() 方法执行
对于已取消的任务,通过 remove() 方法取出后直接丢弃
对于未到期的任务,会将 remainingRounds 字段(剩余时钟周期数)减一
HashedWheelTimer
Timer 接口的实现,通过时间轮算法实现了一个定时器
根据当前时间轮指针选定对应的槽(HashedWheelBucket),从双向链表的头部开始迭代,
对每个定时任务(HashedWheelTimeout)进行计算,属于当前时钟周期则取出运行,不属于则将其剩余的时钟周期数减一操作
对每个定时任务(HashedWheelTimeout)进行计算,属于当前时钟周期则取出运行,不属于则将其剩余的时钟周期数减一操作
核心属性
workerState(volatile int类型)
时间轮当前所处状态,可选值有 init、started、shutdown。同时,有相应的 AtomicIntegerFieldUpdater 实现 workerState 的原子修改
startTime(long类型)
当前时间轮的启动时间,提交到该时间轮的定时任务的 deadline 字段值均以该时间戳为起点进行计算
wheel(HashedWheelBucket[]类型)
该数组就是时间轮的环形队列,每一个元素都是一个槽。当指定时间轮槽数为 n 时,
实际上会取大于且最靠近 n 的 2 的幂次方值
实际上会取大于且最靠近 n 的 2 的幂次方值
hashmap 大小为2的次幂原理一致
timeouts、cancelledTimeouts(LinkedBlockingQueue类型)
timeouts 队列用于缓冲外部提交时间轮中的定时任务,
cancelledTimeouts 队列用于暂存取消的定时任务
cancelledTimeouts 队列用于暂存取消的定时任务
tick(long类型)
时间轮的指针,是一个步长为 1 的单调递增计数器
mask(int类型)
mask = wheel.length - 1,执行 ticks & mask 便能定位到对应的时钟槽
和hashmap一样,类似取模,位运算比取模更快
ticksDuration(long类型)
时间指针每次加 1 所代表的实际时间,单位为纳秒
pendingTimeouts(AtomicLong类型)
当前时间轮剩余的定时任务总数
workerThread(Thread类型)
时间轮内部真正执行定时任务的线程
worker(Worker类型)
真正执行定时任务的逻辑封装这个 Runnable 对象中
使用流程
1.第一步编写TimeTask任务
2.第二步创建 HashedWheelTime
ticksPerWheel,wheel 是刚好大于等于ticksPerWheel中最小的2的N次方 N是整数
默认 512
创建work线程
3.创建timeout (1s后执行PrintTask#run方法)
第一个timeout 创建时,启动worker线程,开始循环检测,并记录开始时间startTime
新创建timeout时不会计算加入到具体bucket中
worker作为HashedWheelTimer循环检测调度器,会将新创建的timeout添加到对应的bucket,
并周期性检测每个bucket取出到期(deadline)的timeout 执行对应的TimeTask
并周期性检测每个bucket取出到期(deadline)的timeout 执行对应的TimeTask
waitForNextTick() 等待期间sleep,在sleep期间如果有timeout到期了,醒来在执行
tickDuration 就控制着时间的精准度,值越小精准度越高,worker线程则越繁忙
dubbo中发送请求超时检测time中tickDuration=30毫秒
未到期但被取消的任务会放到 cancelledTimeouts集合中, worker线程周期性的执行do while方法会调用processCancelledTasks() 会从bucket中删除调对应的timeout
worker线程执行 transferTimeoutsToBuckets() 将新创建的timeout根据deadline计算出remainingRounds几轮与idx 并加入到对应的bucket
bucket.expireTimeouts(deadline); 将到期的bucket中所有的timeout判断并进行超时处理,到期的timeout进行超时处理,即调用TimeTask的run方法
风险:TimeTask.run() 方法是阻塞同步执行的,如果某task执行时间过久则会阻塞Worker线程 进一步拖慢超时检测流程
Dubbo 中对时间轮的应用
请求超时检测,发送请求时创建 timeout
请求正常返回时cancel timeout
发送同步请求,将超时检测交给HashedWheelTimer中worker线程,发送请求的工作线程阻塞等待response
失败重试
Provider 向注册中心进行注册失败时的重试操作,或是 Consumer 向注册中心订阅时的失败重试等
周期性定时任务
定期发送心跳请求,请求超时的处理,或是网络连接断开后的重连机制
代理模式
作用
在 RPC 框架中,代理可以完成序列化、网络 I/O 操作、负载均衡、故障恢复以及服务发现等一系列操作,而上层调用方只感知到了一次本地调用
实现
JDK动态代理
编译阶段就要为每个被代理类创建一个代理proxy类,
当需要代理的类很多时,就会出现大量的 Proxy 类
当需要代理的类很多时,就会出现大量的 Proxy 类
核心
InvocationHandler 接口
Proxy.newProxyInstance()
参数
加载动态生成的代理类的类加载器
业务类实现的接口
InvocationHandler对象
原理
getProxyClass0()
如果指定的类加载器中已经创建了实现指定接口的代理类,则查找缓存;
否则通过ProxyClassFactory创建实现指定接口的代理类
否则通过ProxyClassFactory创建实现指定接口的代理类
0 条评论
下一页
为你推荐
查看更多