# 106. 生产环境中的线程池自动扩容与缩容的动态资源分配经验

可能会出现一种情况,比如说我们的某个依赖在高峰期,需要耗费 100 个线程,但是在那个时间段,刚好其他的依赖的线程池其实就维持一两个就可以了

但是,如果我们都是设置死的,每个服务就给 10个 线程,那就很坑,可能就导致有的服务在高峰期需要更多的资源,但是没资源了,导致很多的 reject

但是其他的服务,每秒钟就易一两个请求,结果也占用了 10个 线程,占着茅坑不拉屎

可以利用 hystrix 的配置成弹性的线程资源调度的模式

super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("test-group"))
        .andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter()
                // 配置线程池大小,同时并发能力个数
                .withCoreSize(10)
                // 设置线程池的最大大小,只有在设置 allowMaximumSizeToDivergeFromCoreSize 的时候才能生效
                .withMaximumSize(100)
                // 设置之后,其实 coreSize 就失效了
                .withAllowMaximumSizeToDivergeFromCoreSize(true)
                // 设置保持存活的时间,单位是分钟,默认是 1
                // 当线程池中线程空闲超过该时间之后,就会被销毁
                .withKeepAliveTimeMinutes(1)
                // 配置等待线程个数;如果不配置该项,则没有等待,超过则拒绝
                .withMaxQueueSize(5)
                // 由于 maxQueueSize 是初始化固定的,该配置项是动态调整最大等待数量的
                // 可以热更新;规则:只能比 MaxQueueSize 小,
                .withQueueSizeRejectionThreshold(2)
        )
);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

通过以下测试方法

@Test
public void test2() throws InterruptedException {
    int count = 13;
    CountDownLatch downLatch = new CountDownLatch(count);
    for (int i = 0; i < count; i++) {
        int finalI = i;
        new Thread(() -> {
            CommandLimit commandLimit = new CommandLimit();
            String execute = commandLimit.execute();
            System.out.println(Thread.currentThread() + " " + finalI + " : " + execute + "  :  " + new Date());
            downLatch.countDown();
        }).start();
    }
    downLatch.await();
    test3();
    // 休眠一分钟后,再次访问,查看线程池中线程
    TimeUnit.MINUTES.sleep(1);
    test3();
}

@Test
public void test3() throws InterruptedException {
    int count = 13;
    CountDownLatch downLatch = new CountDownLatch(count);
    for (int i = 0; i < count; i++) {
        int finalI = i;
        new Thread(() -> {
            CommandLimit commandLimit = new CommandLimit();
            String execute = commandLimit.execute();
            System.out.println(Thread.currentThread() + " " + finalI + " : " + execute + "  :  " + new Date());
            downLatch.countDown();
        }).start();
    }
    downLatch.await();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35

输出日志

Thread[Thread-5,5,main] 5 : 降级  :  Sun Jun 16 16:19:07 CST 2019
Thread[Thread-11,5,main] 11 : 降级  :  Sun Jun 16 16:19:07 CST 2019
Thread[Thread-6,5,main] 6 : 降级  :  Sun Jun 16 16:19:07 CST 2019
Thread[Thread-1,5,main] 1 : success  :  Sun Jun 16 16:19:08 CST 2019
Thread[Thread-2,5,main] 2 : success  :  Sun Jun 16 16:19:08 CST 2019
Thread[Thread-8,5,main] 8 : success  :  Sun Jun 16 16:19:08 CST 2019
Thread[Thread-0,5,main] 0 : success  :  Sun Jun 16 16:19:08 CST 2019
Thread[Thread-3,5,main] 3 : success  :  Sun Jun 16 16:19:08 CST 2019
Thread[Thread-7,5,main] 7 : success  :  Sun Jun 16 16:19:08 CST 2019
Thread[Thread-4,5,main] 4 : success  :  Sun Jun 16 16:19:08 CST 2019
Thread[Thread-12,5,main] 12 : success  :  Sun Jun 16 16:19:08 CST 2019
Thread[Thread-10,5,main] 10 : success  :  Sun Jun 16 16:19:08 CST 2019
Thread[Thread-9,5,main] 9 : success  :  Sun Jun 16 16:19:08 CST 2019
Thread[Thread-25,5,main] 12 : 降级  :  Sun Jun 16 16:19:08 CST 2019
Thread[Thread-17,5,main] 4 : 降级  :  Sun Jun 16 16:19:08 CST 2019
Thread[Thread-24,5,main] 11 : 降级  :  Sun Jun 16 16:19:08 CST 2019
Thread[Thread-14,5,main] 1 : success  :  Sun Jun 16 16:19:09 CST 2019
Thread[Thread-13,5,main] 0 : success  :  Sun Jun 16 16:19:09 CST 2019
Thread[Thread-16,5,main] 3 : success  :  Sun Jun 16 16:19:09 CST 2019
Thread[Thread-15,5,main] 2 : success  :  Sun Jun 16 16:19:09 CST 2019
Thread[Thread-18,5,main] 5 : success  :  Sun Jun 16 16:19:09 CST 2019
Thread[Thread-19,5,main] 6 : success  :  Sun Jun 16 16:19:09 CST 2019
Thread[Thread-20,5,main] 7 : success  :  Sun Jun 16 16:19:09 CST 2019
Thread[Thread-21,5,main] 8 : success  :  Sun Jun 16 16:19:09 CST 2019
Thread[Thread-23,5,main] 10 : success  :  Sun Jun 16 16:19:09 CST 2019
Thread[Thread-22,5,main] 9 : success  :  Sun Jun 16 16:19:09 CST 2019
Thread[Thread-38,5,main] 12 : 降级  :  Sun Jun 16 16:20:09 CST 2019
Thread[Thread-37,5,main] 11 : 降级  :  Sun Jun 16 16:20:09 CST 2019
Thread[Thread-36,5,main] 10 : 降级  :  Sun Jun 16 16:20:09 CST 2019
Thread[Thread-27,5,main] 1 : success  :  Sun Jun 16 16:20:10 CST 2019
Thread[Thread-28,5,main] 2 : success  :  Sun Jun 16 16:20:10 CST 2019
Thread[Thread-31,5,main] 5 : success  :  Sun Jun 16 16:20:10 CST 2019
Thread[Thread-26,5,main] 0 : success  :  Sun Jun 16 16:20:10 CST 2019
Thread[Thread-30,5,main] 4 : success  :  Sun Jun 16 16:20:10 CST 2019
Thread[Thread-35,5,main] 9 : success  :  Sun Jun 16 16:20:10 CST 2019
Thread[Thread-33,5,main] 7 : success  :  Sun Jun 16 16:20:10 CST 2019
Thread[Thread-29,5,main] 3 : success  :  Sun Jun 16 16:20:10 CST 2019
Thread[Thread-32,5,main] 6 : success  :  Sun Jun 16 16:20:10 CST 2019
Thread[Thread-34,5,main] 8 : success  :  Sun Jun 16 16:20:10 CST 2019

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40

三个被降级,因为一次只能最大 10 个线程,会发现所有的线程名称都是新的, 所以这里每一个 command 都是一个新的线程,但是在官网文档中又看到说,在空闲之后, 会把线程销毁。这个就看不太明白是怎么一回事情了

# 配置 withMaximumSize 无效解决

项目中实战 hystrix 的时候发现这里的配置老是不太对, 如下面这个配置,我给了最大线程 100,但是给 50 个并发线程就会大量的降级

super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("test-group"))
        .andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter()
                // 配置线程池大小,同时并发能力个数
                .withCoreSize(10)
                // 设置线程池的最大大小,只有在设置 allowMaximumSizeToDivergeFromCoreSize 的时候才能生效
                .withMaximumSize(100)
                // 设置之后,其实 coreSize 就失效了
                .withAllowMaximumSizeToDivergeFromCoreSize(true)
                // 设置保持存活的时间,单位是分钟,默认是 1
                // 当线程池中线程空闲超过该时间之后,就会被销毁
                .withKeepAliveTimeMinutes(1)
                // 配置等待线程个数;如果不配置该项,则没有等待,超过则拒绝
                .withMaxQueueSize(5)
                // 由于 maxQueueSize 是初始化固定的,该配置项是动态调整最大等待数量的
                // 可以热更新;规则:只能比 MaxQueueSize 小,
                .withQueueSizeRejectionThreshold(2)
        )
);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

这个问题困扰了我很长时间,后来跟踪源码,当 command 被拒绝的时候,会报错

Caused by: java.util.concurrent.RejectedExecutionException: Rejected command because thread-pool queueSize is at rejection threshold.
	at com.netflix.hystrix.strategy.concurrency.HystrixContextScheduler$HystrixContextSchedulerWorker.schedule(HystrixContextScheduler.java:103)
1
2

进入源码后会发现,这里会先判定线程池队列大小是否已经超过了 queueSizeRejectionThreshold 大小; 在前面已经学到过该大小是可以动态调整的。含义是调整队列大小;

com.netflix.hystrix.strategy.concurrency.HystrixContextScheduler.HystrixContextSchedulerWorker#schedule(rx.functions.Action0)
@Override
public Subscription schedule(Action0 action) {
   if (threadPool != null) {
       if (!threadPool.isQueueSpaceAvailable()) {
           throw new RejectedExecutionException("Rejected command because thread-pool queueSize is at rejection threshold.");
       }
   }
   return worker.schedule(new HystrixContexSchedulerAction(concurrencyStrategy, action));
}

com.netflix.hystrix.HystrixThreadPool.HystrixThreadPoolDefault#isQueueSpaceAvailable
@Override
public boolean isQueueSpaceAvailable() {
  if (queueSize <= 0) {
      // we don't have a queue so we won't look for space but instead
      // let the thread-pool reject or not
      return true;
  } else {
      return threadPool.getQueue().size() < properties.queueSizeRejectionThreshold().get();
  }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

那么这里有一个疑问了,为什么会先判断是否已经超了?


这句代码 return worker.schedule(new HystrixContexSchedulerAction(concurrencyStrategy, action));

继续深入 schedule 方法源码

ThreadPoolExecutor executor = (ThreadPoolExecutor) threadPool.getExecutor();
          FutureTask<?> f = (FutureTask<?>) executor.submit(sa);

1
2
3
4
5
6
7
8

会发现使用了 ThreadPoolExecutor.submit 方法;我在 debug 的时候看过了 ThreadPoolExecutor 的参数,的确是我们设置的; 之前不知道 ThreadPoolExecutor 的用法,所以根本想不到为什么。

百度之后,ThreadPoolExecutor 会检查队列,但是在这之前 hystrix 会先检查一次,所以就导致了还没有进入线程池就报错了;

那么问题是为什么队列里面有这么的线程,而且在日志中看不出来到底有几个线程在执行呢?最简单的回答就是 hystrix 包装了线程池;

最后找到一个根源问题:百度之后,创建 ThreadPoolExecutor 的时候,会传入一个 BlockingQueue,如果使用无限容量的阻塞队列(如 LinkedBlockingQueue)时, 不会创建临时线程(因为队列不会满),所以线程数保持 corePoolSize。 而刚好在 debug 时看到的队列就是 LinkedBlockingQueue;

这个时候就真相大白了;原因就是设置了队列大小!下面使用一个 ThreadPoolExecutor 的测试用例来浮现这个问题

@Test
public void test4() throws InterruptedException, ExecutionException {
    BlockingQueue queue = new SynchronousQueue(); // 1
    queue = new LinkedBlockingDeque<>(20);  // 2
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
            10,
            50,
            60,
            TimeUnit.SECONDS,
            queue);
    CountDownLatch c = new CountDownLatch(40);
    IntStream.range(0, 40)
            .parallel()
            .mapToObj(item -> (Runnable) () -> {
                System.out.println(Thread.currentThread().getName());
                try {
                    TimeUnit.SECONDS.sleep(2);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                c.countDown();

            })
            .forEach(item -> threadPoolExecutor.submit(item));
    c.await();
}    
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
  • 当使用 LinkedBlockingDeque 时,运行后观察控制台,只会发现最多只有 20 个线程被复用;这里的队列大小是 20;
  • 当使用 SynchronousQueue 时,就会瞬间出现 40 个线程

回到最初的问题:配置 withMaximumSize 偶尔有效偶尔无效

解决方案:想使用 withMaximumSize 动态调整线程数量的时候,就不要设置等待队列; 如下

super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("GetProductCommandGroup"))
                // 不同的线程池
                .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("GetProductCommand2Pool"))
                .andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter()
                                // 配置线程池大小,同时并发能力个数
                                .withCoreSize(10)
                                // 设置线程池的最大大小,只有在设置 allowMaximumSizeToDivergeFromCoreSize 的时候才能生效
                                .withMaximumSize(50)
                                // 设置之后,其实 coreSize 就失效了
                                .withAllowMaximumSizeToDivergeFromCoreSize(true)
                                // 设置保持存活的时间,单位是分钟,默认是 1
                                // 当线程池中线程空闲超过该时间之后,就会被销毁
                                .withKeepAliveTimeMinutes(1)
                                // 配置等待线程个数;如果不配置该项,则没有等待,超过则拒绝
//                                        .withMaxQueueSize(20)
//                                        .withQueueSizeRejectionThreshold(20)
                        // 由于 maxQueueSize 是初始化固定的,该配置项是动态调整最大等待数量的
                        // 可以热更新;规则:只能比 MaxQueueSize 小,
//                        .withQueueSizeRejectionThreshold(2)
                )

);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

TIP

前面和本章讲解的配置相关实践的内容,这些在 官网文档 (opens new window) 中都有写到,老样子就是全部是英文文档