# 093. 深入分析 hystrix 执行时的 8 大流程步骤以及内部原理
之前几讲,我们用实际的业务背景给了一些可用性的问题; 然后借着那些最最基础的可用性的问题,然后讲解了 hystrix 最基本的支持高可用的技术:资源隔离 + 限流
创建 command,执行这个 command,配置这个 command 对应的 group 和线程池,以及线程池/信号量的容量和大小
我们要去讲解一下,你开始执行这个 command,调用了这个 command 的 execute()
方法以后,
hystrix 内部的底层的执行流程和步骤以及原理是什么呢?
在讲解这个流程的过程中,我们会带出来 hystrix 其他的一些核心以及重要的功能
画图分析整个 8 大步骤的流程,然后再对每个步骤进行细致的讲解;
由于是笔记,先贴上完整的图
视频讲解图
官方图 (opens new window) - 英文不好进行了翻译
汉化语句:
- available in cache? 是否有缓存?
- circuit breaker open? 断路器是否打开?
- Semaphore / Thread pool rejected? 信号量/线程池被拒绝?
- execution fails? 执行失败?
- fallback successful? fallback 是否执行成?
- no;failed or not implemented 没有;失败或没有实现
- report metrics 报告治标
- calculate circuit health 断路器健康检查计算
下面来逐一讲解每个步骤的原理
# 1. 构建一个 HystrixCommand 或者 HystrixObservableCommand
一个 HystrixCommand 或一个 HystrixObservableCommand 对象,代表了对某个依赖服务发起的一次请求或者调用, 构造的时候,可以在构造函数中传入任何需要的配置参数
- HystrixCommand:主要用于仅仅会返回一个结果的调用
- HystrixObservableCommand:主要用于可能会返回多条结果的调用
# 2. 调用 command 的执行方法
执行 Command 就可以发起一次对依赖服务的调用,
要执行 Command,需要在 4 个方法中选择其中的一个:execute()
、queue()
、observe()
、toObservable()
其中 execute()
和 queue()
仅仅对 HystrixCommand 适用
execute()
调用后直接 block 住,属于同步调用,直到依赖服务返回单条结果,或者抛出异常
queue()
返回一个 Future,属于异步调用,后面可以通过 Future 获取单条结果
observe()
订阅一个 Observable 对象,Observable 代表的是依赖服务返回的结果,获取到一个那个代表结果的 Observable 对象的拷贝对象
toObservable()
返回一个 Observable 对象,如果我们订阅这个对象,就会执行 command 并且获取返回结果
返回值 | command |
---|---|
K | value = command.execute(); |
Future<K> | fValue = command.queue(); |
Observable<K> | ohValue = command.observe(); |
Observable<K> | ocValue = command.toObservable(); |
注意,上面 4 种结果都依赖 toObservable();这句话怎么理解?
拿 execute 来举例,可以看到源码中的确是使用了 toObservable() 来调用的结果
com.netflix.hystrix.HystrixCommand#execute
public R execute() {
try {
return queue().get();
} catch (Exception e) {
throw Exceptions.sneakyThrow(decomposeException(e));
}
}
com.netflix.hystrix.HystrixCommand#queue
public Future<R> queue() {
/*
* The Future returned by Observable.toBlocking().toFuture() does not implement the
* interruption of the execution thread when the "mayInterrupt" flag of Future.cancel(boolean) is set to true;
* thus, to comply with the contract of Future, we must wrap around it.
*/
final Future<R> delegate = toObservable().toBlocking().toFuture();
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 3. 检查是否开启缓存
从这一步开始,进入我们的底层的运行原理啦,了解 hysrix 的一些更加高级的功能和特性
如果这个 command 开启了请求缓存(request cache),而且这个调用的结果在缓存中存在,那么直接从缓存中返回结果
# 4. 检查是否开启了短路器
检查这个 command 对应的依赖服务是否开启了短路器,如果断路器被打开了,那么 hystrix 就不会执行这个 command, 而是直接去执行 fallback 降级机制
# 5. 检查线程池/队列/ semaphore 是否已经满了
如果 command 对应的线程池/队列/ semaphore 已经满了,那么也不会执行 command,而是直接去调用 fallback 降级机制
# 6. 执行 command
调用 HystrixObservableCommand.construct() 或 HystrixCommand.run() 来实际执行这个 command
- HystrixCommand.run() 是返回一个单条结果,或者抛出一个异常
- HystrixObservableCommand.construct() 是返回一个 Observable 对象,可以获取多条结果
如果执行超过了 timeout 时长的话,那么 command 所在的线程就会抛出一个 TimeoutException, 如果 timeout 了,也会去执行 fallback 降级机制,而且就不会管 run() 或 construct() 返回的值了
这里要注意的一点是,我们是不可能终止掉一个调用严重延迟的依赖服务的线程的,只能说给你抛出来一个 TimeoutException, 但是还是可能会因为严重延迟的调用线程占满整个线程池的
对于上面一段话,本人知识储备不能很好的理解这一段话, hystrix 抛出了一个超时异常,但是对应的线程可能被卡住回不来? 这里的细节有点懵逼
如果没有 timeout 的话,那么就会拿到一些调用依赖服务获取到的结果,然后 hystrix 会做一些 logging 记录和 metric 统计
# 7. 短路健康检查
Hystrix 会将每一个依赖服务的调用成功、失败、拒绝、超时、等事件,都会发送给 circuit breaker 断路器, 短路器就会对调用成功/失败/拒绝/超时等事件的次数进行统计
短路器会根据这些统计次数来决定是否要进行短路,如果打开了短路器,那么在一段时间内就会直接短路, 然后如果在之后第一次检查发现调用成功了,就关闭断路器
# 8. 调用 fallback 降级机制
在以下几种情况中,hystrix 会调用 fallback 降级机制:
- run() 或 construct() 抛出一个异常
- 短路器打开
- 线程池/队列/ semaphore 满了
- command 执行超时了
即使在降级中,一定要进行网络调用,也应该将那个调用放在一个 HystrixCommand 中,进行隔离
- 在 HystrixCommand 中,实现 getFallback() 方法,可以提供降级机制
- 在 HystirxObservableCommand 中,实现一个 resumeWithFallback() 方法,返回一个 Observable 对象,可以提供降级结果
如果 fallback 返回了结果,那么 hystrix 就会返回这个结果
- 对于 HystrixCommand,会返回一个 Observable 对象,其中会发返回对应的结果
- 对于 HystrixObservableCommand,会返回一个原始的 Observable 对象
如果没有实现 fallback,或者是 fallback 抛出了异常,Hystrix 会返回一个 Observable,但是不会返回任何数据
不同的 command 执行方式,其 fallback 为空或者异常时的返回结果不同
- 对于execute():直接抛出异常
- 对于queue():返回一个 Future,调用 get() 时抛出异常
- 对于observe():返回一个 Observable 对象,但是调用 subscribe() 方法订阅它时,立即抛出调用者的 onError 方法
- 对于toObservable():返回一个 Observable 对象,但是调用 subscribe() 方法订阅它时,立即抛出调用者的 onError 方法
# 不同的执行方式
- execute():获取一个 Future.get(),然后拿到单个结果
- queue():返回一个 Future
- observer():立即订阅 Observable,然后启动 8 大执行步骤,返回一个拷贝的 Observable,订阅时立即回调给你结果
- toObservable():返回一个原始的 Observable,必须手动订阅才会去执行 8 大步骤
TIP
官网教程机翻之后,最后发现本笔记和官网教程的相似程度居然在 80% 以上