Hystrix
2020-04-25 13:41:51 0 举报
Hystrix流程
作者其他创作
大纲/内容
terminateCommandCleanup
HystrixCommand#toObservable主要构建了Observable的五个回调类这里称返回的回调类为observableOne
Observable#unsafeCreate(new OnSubscribeDefer<T>(observableFactory))此时创建一个OnSubscribeDefer并将FuncOne作为构造器参数传进,这里将构造的新类成为subscribeDeferOne
Hystrix
ThreadPoolWorker#schedule这里是执行command的核心类
调用者
HystrixThreadPool.Factory#getInstance1.初始化线程池2.使用map结构来保存线程池3.一个服务名对应一个线程池
线程执行过后会回调AbstractCommand#executeCommandWithSpecifiedIsolation中生成的call方法
AbstractCommand#applyHystrixSemantics调用applyHystrixSemantics回调callexecutionSemaphore.tryAcquire()如果不使用Semaphore配置,那么tryAcquire使用的是TryableSemaphoreNoOp中的方法,返回true
这里的FuncOne是最开始toObservable方法中包装的会执行其中的call方法,因为其中用到了Observable.defer(applyHystrixSemantics)所以先调用OnSubScribeDefer的call方法最后调用FuncOne的call方法也就是applyHystrixSemantics的call方法
HystrixInvocationHandler#invoke熔断组件找到响应的方法调用
HystrixCommand#HystrixCommand初始化此类
handleFallBack
lift(new HystrixObservableTimeoutOperator<R>(_cmd))新建operator超时对象
修改commandStateOBSERVABLE_CHAIN_CREATED ->USER_CODE_EXECUTED
HystrixCommand#run()这个方法也就是HystrixInvocationHandler#invoke中实现的run方法
HystrixThreadPoolDefault#getScheduler()初始化线程
BlockingObservable.from(this)
BlockingObservable#from
Observable#toBlocking
AbstractCommand#AbstractCommand在此初始化相关组件
HystrixCircuitBreakerImpl#isOpen负责了断路器的判断和开启工作
HystrixDelegatingContract
HystrixCommand#execute开始执行回调
target你要调用的服务
Observable#single()lift(OperatorSingle.<T> instance())返回Holder.INSTANCE作为参数传入
ScheduledThreadPoolExecutor#scheduleAtFixedRate
回调
new BlockingObservable<T>(o)此构造器this.o = o所以o = observableOne
延迟调度任务,每隔getIntervalTimeInMilliseconds()执行一次默认1s
HystrixCircuitBreakerImpl#allowSingleTest半开启状态的判断,如果开启并且不再睡眠期,则放开一个请求并更新最后更新时间,确保睡眠期的5s内只有一个请求放入,请求成功后会回调markSuccess()方法关闭熔断
dispatchmap结构,储存着方法的Handler
当前面的Observable对象subscribeOn方法会新建OperatorSubscribeOn对象并且调用其中的call方法然后调用HystrixContextScheduler的createWorker方法
OnSubscribeLift(font color=\"#336600\
重点看这里利用RxJava来创建FutureHystrixCommand#toObservable().toBlocking().toFuture()
定时清理任务
HystrixCommand#toObservable创建多个回调函数
线程拒绝
OnSubscribeDefer(observableFactory)this.observableFactory = observableFactory;所以此类中的observableFactory = FuncOne
HystrixTargeter#target1.调用此targeter的target方法获取生成的对象2.如果熔断降级则调用targetWithFallbackFactory方法
Observable(OnSubscribe<T> f) { this.onSubscribe = f; }所以此Observable中的onSubscribe = f = subscribeDeferOne
HystrixTimer#addTimerListener
HystrixTargeter#targetWithFallbackFactory1.生成回调工厂
AbstractCommand#executeCommandAndObserve1.创建各种回调函数2.properties.executionTimeoutEnabled().get()默认配置timeOutEnabled为true
wrapWithAllOnNextHooks
new Observable<T>(RxJavaHooks.onCreate(f));这里会返回新的Observable,因为RxJavaHooks.onCreate(f)返回的就是f,f又是subscribeDeferOne,所以作为参数传进去
TimerListener#tickNOT_EXECUTED -> TIMED_OUT
ThreadPoolScheduler#createWorker()新建ThreadPoolWorker类
请求异常
将command丢到线程池并返回FutureTask
构建线程池coresize = cpu数maxsize = Integer.MAX
SynchronousMethodHandler#invoke
Observable.defer(Func0)此实的Func0在那时称作为FuncOne,实现了将五个回调类包装成一个ObserVable
是否短路
HystrixContextSchedulerWorker#schedulecall方法createWorker后会执行返回的schedule方法
Observable.subscribeRxJavaHooks.onObservableStart(font color=\"#99ccff\
circuitBreaker.allowRequest()执行判断逻辑
Observable##unsafeCreatenew Observable<T>(RxJavaHooks.onCreate(f));新建一个observable称为observableTwoonSubscribe = f = onSubscribeLiftOne
HystrixInvocationHandler.this.dispatch.get(method).invoke(args);
使用此方法来判断是否熔断
AbstractCommand#initThreadPool初始化线程池
fireOnCompletedHook
1s内执行完毕
修改threadStateNOT_USING_THREAD -> STARTED
AbstractCommand#executeCommandWithSpecifiedIsolation这里构建一个Observable对象并先执行subscribeOn来决定了发射数据在哪个调度器上执行,查看里面传入的threadPool.getScheduler(Func0)
setterMethodMap的注入写在Feign组件里初始化时根据key获取setter
AbstractCommand#getUserExecutionObservable
HystrixInvocationHandler
超时
HystrixCommand#queue
NOT_EXECUTED
执行fallback降级
OnSubscribeDefer#call这里会执行observableFactory.call();observableFactory = FuncOne
ThreadPoolExecutor executor = (ThreadPoolExecutor) threadPool.getExecutor(); FutureTask<?> f = (FutureTask<?>) executor.submit(sa);
HystrixCircuitBreaker.Factory#getInstance1.每个commandKey都有自己的一个熔断器2.如果commandKey不存在熔断器,则构建默认熔断器默认熔断器会对HealthCounts进行订阅。HealthCounts中包含时间窗口内(默认10s钟)请求的总次数、失败次数、失败率
RxJavaHooks.onCreate(f)进去后发现实际返回的类就是f
BlockingOperatorToFuture#toFuture(o)在此执行o.single().subscribe()因为o = observableOne
BlockingObservable#toFuture将o作为参数传入
applyHystrixSemantics最后会回调其中的call方法
worker.schedule接着会执行worker中的schedule方法,worker类就是前面新建的ThreadPoolWorker类
unsubscribeCommandCleanup
f.get()阻塞Future,然后获取结果
fallbackFactory降级对象的工程,fallback工程
AbstractCommand#initCircuitBreaker初始化熔断器
HystrixCommand#getExecutionObservable
HystrixThreadPoolDefault#touchConfig()动态调整最大线程池的数量
启动服务
是
OnSubscribeLift#call因为observableTwo.onSubscribe = onSubscribeLiftOne然后parent.call(st);因为parent = onservableOne = subscribeDeferOne
commandStateNOT_STARTED ->OBSERVABLE_CHAIN_CREATEDFuncOne中的call方法会变更状态
否
RxJavaHooks.onObservableStart(font color=\"#99ccff\
0 条评论
下一页