# 093. 深入分析 hystrix 执行时的 8 大流程步骤以及内部原理

之前几讲,我们用实际的业务背景给了一些可用性的问题; 然后借着那些最最基础的可用性的问题,然后讲解了 hystrix 最基本的支持高可用的技术:资源隔离 + 限流

创建 command,执行这个 command,配置这个 command 对应的 group 和线程池,以及线程池/信号量的容量和大小

我们要去讲解一下,你开始执行这个 command,调用了这个 command 的 execute() 方法以后, hystrix 内部的底层的执行流程和步骤以及原理是什么呢? 在讲解这个流程的过程中,我们会带出来 hystrix 其他的一些核心以及重要的功能

画图分析整个 8 大步骤的流程,然后再对每个步骤进行细致的讲解;

由于是笔记,先贴上完整的图

视频讲解图

官方图 (opens new window) - 英文不好进行了翻译

汉化语句:

  • available in cache? 是否有缓存?
  • circuit breaker open? 断路器是否打开?
  • Semaphore / Thread pool rejected? 信号量/线程池被拒绝?
  • execution fails? 执行失败?
  • fallback successful? fallback 是否执行成?
  • no;failed or not implemented 没有;失败或没有实现
  • report metrics 报告治标
  • calculate circuit health 断路器健康检查计算

下面来逐一讲解每个步骤的原理

# 1. 构建一个 HystrixCommand 或者 HystrixObservableCommand

一个 HystrixCommand 或一个 HystrixObservableCommand 对象,代表了对某个依赖服务发起的一次请求或者调用, 构造的时候,可以在构造函数中传入任何需要的配置参数

  • HystrixCommand:主要用于仅仅会返回一个结果的调用
  • HystrixObservableCommand:主要用于可能会返回多条结果的调用

# 2. 调用 command 的执行方法

执行 Command 就可以发起一次对依赖服务的调用, 要执行 Command,需要在 4 个方法中选择其中的一个:execute()queue()observe()toObservable()

其中 execute()queue() 仅仅对 HystrixCommand 适用

  • execute()

    调用后直接 block 住,属于同步调用,直到依赖服务返回单条结果,或者抛出异常

  • queue()

    返回一个 Future,属于异步调用,后面可以通过 Future 获取单条结果

  • observe()

    订阅一个 Observable 对象,Observable 代表的是依赖服务返回的结果,获取到一个那个代表结果的 Observable 对象的拷贝对象

  • toObservable()

    返回一个 Observable 对象,如果我们订阅这个对象,就会执行 command 并且获取返回结果

返回值 command
K value = command.execute();
Future<K> fValue = command.queue();
Observable<K> ohValue = command.observe();
Observable<K> ocValue = command.toObservable();

注意,上面 4 种结果都依赖 toObservable();这句话怎么理解?

拿 execute 来举例,可以看到源码中的确是使用了 toObservable() 来调用的结果

com.netflix.hystrix.HystrixCommand#execute
public R execute() {
    try {
        return queue().get();
    } catch (Exception e) {
        throw Exceptions.sneakyThrow(decomposeException(e));
    }
}

com.netflix.hystrix.HystrixCommand#queue
public Future<R> queue() {
    /*
     * The Future returned by Observable.toBlocking().toFuture() does not implement the
     * interruption of the execution thread when the "mayInterrupt" flag of Future.cancel(boolean) is set to true;
     * thus, to comply with the contract of Future, we must wrap around it.
     */
    final Future<R> delegate = toObservable().toBlocking().toFuture();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

# 3. 检查是否开启缓存

从这一步开始,进入我们的底层的运行原理啦,了解 hysrix 的一些更加高级的功能和特性

如果这个 command 开启了请求缓存(request cache),而且这个调用的结果在缓存中存在,那么直接从缓存中返回结果

# 4. 检查是否开启了短路器

检查这个 command 对应的依赖服务是否开启了短路器,如果断路器被打开了,那么 hystrix 就不会执行这个 command, 而是直接去执行 fallback 降级机制

# 5. 检查线程池/队列/ semaphore 是否已经满了

如果 command 对应的线程池/队列/ semaphore 已经满了,那么也不会执行 command,而是直接去调用 fallback 降级机制

# 6. 执行 command

调用 HystrixObservableCommand.construct() 或 HystrixCommand.run() 来实际执行这个 command

  • HystrixCommand.run() 是返回一个单条结果,或者抛出一个异常
  • HystrixObservableCommand.construct() 是返回一个 Observable 对象,可以获取多条结果

如果执行超过了 timeout 时长的话,那么 command 所在的线程就会抛出一个 TimeoutException, 如果 timeout 了,也会去执行 fallback 降级机制,而且就不会管 run() 或 construct() 返回的值了

这里要注意的一点是,我们是不可能终止掉一个调用严重延迟的依赖服务的线程的,只能说给你抛出来一个 TimeoutException, 但是还是可能会因为严重延迟的调用线程占满整个线程池的

对于上面一段话,本人知识储备不能很好的理解这一段话, hystrix 抛出了一个超时异常,但是对应的线程可能被卡住回不来? 这里的细节有点懵逼

如果没有 timeout 的话,那么就会拿到一些调用依赖服务获取到的结果,然后 hystrix 会做一些 logging 记录和 metric 统计

# 7. 短路健康检查

Hystrix 会将每一个依赖服务的调用成功、失败、拒绝、超时、等事件,都会发送给 circuit breaker 断路器, 短路器就会对调用成功/失败/拒绝/超时等事件的次数进行统计

短路器会根据这些统计次数来决定是否要进行短路,如果打开了短路器,那么在一段时间内就会直接短路, 然后如果在之后第一次检查发现调用成功了,就关闭断路器

# 8. 调用 fallback 降级机制

在以下几种情况中,hystrix 会调用 fallback 降级机制:

  • run() 或 construct() 抛出一个异常
  • 短路器打开
  • 线程池/队列/ semaphore 满了
  • command 执行超时了

即使在降级中,一定要进行网络调用,也应该将那个调用放在一个 HystrixCommand 中,进行隔离

  • 在 HystrixCommand 中,实现 getFallback() 方法,可以提供降级机制
  • 在 HystirxObservableCommand 中,实现一个 resumeWithFallback() 方法,返回一个 Observable 对象,可以提供降级结果

如果 fallback 返回了结果,那么 hystrix 就会返回这个结果

  • 对于 HystrixCommand,会返回一个 Observable 对象,其中会发返回对应的结果
  • 对于 HystrixObservableCommand,会返回一个原始的 Observable 对象

如果没有实现 fallback,或者是 fallback 抛出了异常,Hystrix 会返回一个 Observable,但是不会返回任何数据

不同的 command 执行方式,其 fallback 为空或者异常时的返回结果不同

  • 对于execute():直接抛出异常
  • 对于queue():返回一个 Future,调用 get() 时抛出异常
  • 对于observe():返回一个 Observable 对象,但是调用 subscribe() 方法订阅它时,立即抛出调用者的 onError 方法
  • 对于toObservable():返回一个 Observable 对象,但是调用 subscribe() 方法订阅它时,立即抛出调用者的 onError 方法

# 不同的执行方式

  • execute():获取一个 Future.get(),然后拿到单个结果
  • queue():返回一个 Future
  • observer():立即订阅 Observable,然后启动 8 大执行步骤,返回一个拷贝的 Observable,订阅时立即回调给你结果
  • toObservable():返回一个原始的 Observable,必须手动订阅才会去执行 8 大步骤

TIP

官网教程机翻之后,最后发现本笔记和官网教程的相似程度居然在 80% 以上