applyHystrixSemantics.call()
线程
request log 默认启用,但是不做处理
Method(getByid)
RejectedExecutionException
ServerListUpdater
否
是否可以增加额外线程主要配合线程池MaximumSize参数使用
是
FeignLoadBelecer.executeWithLoadBalancer()
FeignLoadBelecer
对应command的超时检查任务被清理掉
SynchronousMethodHandler
Method(list())
初步请求URL的构建GET /getById/1 http/1.1
Future.get()
一个时间窗口期内(默认1s)总请求书小于20
onNext()
从Url中剔除服务名http:///getById/1
根据setterMethodMap构造HystrixCommand
提交command
executeCommandAndObserve()
否 (默认参数初始化的线程池队列肯定不会满,queueSize <= 0 返回true)
markOnCompleted
func0.call()
command
如果MaxQueueSize 不为-1,返回LinkedBlockingQueue,默认返回SynchronousQueue
设置之前创建的五个回调Action0
HystrixInvocationHandler->HystrixCommand.run()
构造LoadBalancerCommand
setRequestContext
熔断器打开了,后续的一些请求就会执行fallback逻辑
给hystrixObservable
HystrixContexSchedulerAction包装为ScheduledAction
如果command 执行完了后会将,isCommandTimedOut->COMPLETED
subscribeToStream()
通过Method 获取Handler
线程执行
markEmits
超过5秒后将状态修改半开状态,然后会尝试去执行一次,如果失败后续又会打开熔断器
cache
wrapWithAllOnNextHooks.call()
每30秒去刷新一次
基于HardCodedTaget处理请求GET http://ServerA/getById/1
构建RibbonRequest
默认一秒钟去检查一个进行线程调度
将Listener 添加到HystrixTimer
订阅
HystrixCircuitBreakerImpl
SynchronousQueue
command 状态不等于OBSERVABLE_CHAIN_CREATED
HystrixCommand.execute()
从队列中提取任务
circuitBreakersByCommand中是否存在HystrixCircuitBreaker
ServiceA 线程池
Observable名称:userObservable
创建worker new HystrixContextSchedulerWorker(actualScheduler.createWorker())
HystrixContexSchedulerAction
HystrixObservableTimeoutOperator超时处理器组件
服务实例
touchConfig()参数调成创建HystrixContextScheduler
LinkedBlockingQueue
调用者
CorePoolSize
通过服务名构建
executeCommandWithSpecifiedIsolation()
submit()
Func0.call()
CAS
ServieA192.168.31.105:8080
maximumPoolSize额外线程
(while)自旋+CAS
new HystrixThreadPoolDefault
toObservable().toBlocking().toFuture()
queue()
MaxQueueSize=-1
是否开启短路
回调函数call()
HystrixInvocationHandler
创建5个组件后面运行时回调
ServiceA线程池
FeignLoadBalancerconnect和read的默认超时1s
DeprecatedOnRunHookApplication
观察者(Observer)
创建5个组件,提供后面回调
terminateCommandCleanup.call()
请求处理
eureka server
ExecutionHookApplication
执行command run
(DomainExtractingSerevrList)ServerList
f.isDone()
Future f包装 delegate
命中缓存
command 状态不等于NOT_STARTED
CachingSpringLoadBalancerFactory
timer线程池是否初始化
URL中获取服务名ServiceA
超时检查
可观察对象(Observable)
HystrixRuntimeException
fallback机制
创建markExceptionThrown内部类发生异常的时候回调
new HystrixCommand
什么都不做
corePoolSize
额外线程,默认空闲一分钟,回收
Observable
ServieA192.168.31.106:8080
创建一个TimerListener
handleTimeoutViaFallback
dynamicCoreSize > dynamicMaximumSize
执行超时逻辑
fireOnCompletedHook.call()
Futuredelegate
command状态修改为=USER_CODE_EXECUTEDD
command状态修改为=OBSERVABLE_CHAIN_CREATED
Subscriber
检查command的timeout状态是否为NOT_EXECUTED
回调
状态检查
回调call()
请求缓存
是否支持排队
RuntimeException
注册、心跳、下线
applyHystrixSemantics.call()->applyHystrixSemantics(_cmd)
HystrixBadRequestException
circuitBreaker.attemptExecution()检查否是开熔断
selectServer()通过ZoneAwareLoadBalancer选择一个server(192.168.31.105:8080)
橘色线条初始化线程池逻辑
handleFallback降级组件
将线程池的状态由NOT_USING_THREAD ->STARTED
future f 对future delegate进行包装,然后实现cancel、以及isDone包装的目的就是因为delegate没办法去终止在执行的线程
检查队列是否满了
第一次拉取
metrics统计信息
key:Method()
IClientConfigribbon 相关配置
Observable名称:execution
ResponseEntityDecoder进行序列化
unsubscribeCommandCleanup.call()
降级处理
HystrixTimeoutException
handleBadRequestByEmittingError
LoadBalancerFeignClient.execute()
initCircuitBreaker
隔离是否采用信号量
ServieA192.168.31.107:8080
HystrixTimer.getInstance().addTimerListener(listener)
定时检查注册表中服务是否宕机
默认配置没有
HystrixCommand<Object> hystrixCommand = new HystrixCommand<Object>(setterMethodMap.get(method))
红色线条线程执行逻辑
threadPools是否存在对应服务的线程池
setterMethodMap
ThreadPoolWorker.schedule()
触发熔断开关CLOSED->OPEN设置开启时间
熔断器打开时间超过5s
默认一秒钟调度一次。检查一下command线程池,里面的异步任务是否超时。
ServerOperation.call()针对server发起请求
HystrixContextSchedulerWorker.schedule()
initThreadPool()
创建一个Runnable>listener.tick()
HystrixContexSchedulerAction#Callable.call()
将ZoneAwareLoadBalancer包装为FeignLoadBelecer,创建本地cache
subscribeOn threadPool.getScheduler
http:/192.168.31.105:8080/getById/1
黑色线条构建Observable,提交任务到线程池
handleThreadPoolRejectionViaFallback
ZoneAwareLoadBalancer
队列及线程池是否已满
异常请求比例小于默认配置的50%
command 执行结果
scheduleAtFixedRate
超时、报错、执行fallback机制