# 线程池 - 调优
有两方面:线程数调优、BlockingQueue 调优
# 线程数调优
线程数调优是比较复杂的,过多会造成线程竞争、过少会造成资源浪费,池中的任务耗费的时间和资源都可能不相同,所以同一个线程池在不同时间的线程数量都可能存在差异。 一般来说,可以根据实际的操作因素,计算出一个相对合理的线程数,业界一般把任务分为以下几类:
- CPU 密集型任务:大部分时间都在使用 CPU 进行计算,一般都是大型的复杂型的计算任务,比如挖矿(更多的是使用 GPU 挖矿)
- IO 密集型任务:大部分时间都在和 IO 交互,比如:service 方法操作 5 次数据库(网络 IO)
- 混合型任务:字面意思,实际中多数是混合任务;但是也有一个经验:增删改查类型的任务一般都偏向于 IO 密集型任务
调优公式(经验):
- CPU 密集型任务:N + 1
N 是 CPU 的数量
// 可以通过这种方式获取 CPU 的核数
public static void main(String[] args) {
int i = Runtime.getRuntime().availableProcessors();
System.out.println(i);
}
2
3
4
5
为什么要设置为 N+1? 一般来说是防止某个线程出现异常,导致会有一个 CPU 处于空闲状态。
- IO 密集型任务:2N
因为 IO 密集型任务,在和 IO 交互的时候,CPU 是空闲状态,所以这类任务可以线程数量多一点
- 混合型任务:N * U * (1 + WT/ST)
- N:CPU 核心数量
这个获取方式前面演示过 Runtime.getRuntime().availableProcessors()
- U:目标 CPU 利用率
这个是你期待要让 CPU 利用率达到多少,所以你自己定
- WT:线程等待时间
- ST:线程运行时间
关于 WT 和 ST 如何获取?可以使用 JDK 的工具 jvisualvm,命令行运行它,就会打开图形界面 这里显示的是热点方法(关于这里如何找自己线程池相关的线程池,感觉不是很好找,期待后面课程是否有相关的经验,我这里尝试运行一个线程池,然后也没有看到相关的方法还是红框里面的 ThreadPoolExecutor 的 run 方法耗时,看使用时间,应该是我启动的任务在耗时,不然不可能会有这么大的使用时间) 这里的时间解释:
- 自用时间:就是该方法使用 CPU 的时间,ST
- 总时间:就是该方法运行的时间
- 总时间 - 自用时间:就是线程等待的时间 WT
拿这个来套一下:
N * U * (1 + WT/ST)
8 * 50% * (1 + 89365-14120/89365)
8 * 50% * (1 + 75245/89365)
8 * 0.5 *(1 + 0.84) = 7.36
所以,最终可以给到 7 个线程或则 8 个线程。
从等待时间就可以看出来,当前我选这个例子,偏向于 IO 密集型的(这些等待实际上是线程休眠,一个线程池 demo 测试,前面演示过的例子之一),
根据这个分析来看,算出来的事 7 ~ 8 个数量,可以让 CPU 核数利用率达到 50%
2
3
4
5
6
7
# BlockingQueue 调优
要获取两个信息:
- 单个任务占用内存
- 线程池计划占用内存
然后使用线程池计划占用内存初一单个任务占用内存,就可以得到你的 BlockingQueue 应该设置多大,因为设置太大,会占用过多的内存
# 快速得到优化线程池的参数
package com.az.test;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.BlockingQueue;
/**
* A class that calculates the optimal thread pool boundaries. It takes the
* desired target utilization and the desired work queue memory consumption as
* input and retuns thread count and work queue capacity.
*
* @author Niklas Schlimm
*/
public abstract class PoolSizeCalculator {
/**
* Control variable for the CPU time investigation.
*/
private volatile boolean expired;
/**
* Time (millis) of the test run in the CPU time calculation.
*/
private final long testTime = 3000;
/**
* Calculates the boundaries of a thread pool for a given {@link Runnable}.
* 计算对于给定的任务,其线程池推荐配置
*
* @param targetUtilization the desired utilization of the CPUs (0 <= targetUtilization <= 1),
* 希望达到的CPU占用率,从0到1分别表示0到100%
* @param targetQueueSizeBytes the desired maximum work queue size of the thread pool (bytes)
* 线程池中允许缓存的最多队列消息数据量,以字节为单位,
* 如给线程池分配了10MB的空间用于缓存处于等待队列中的线程,则赋值10000000
*/
protected void calculateBoundaries(BigDecimal targetUtilization, BigDecimal targetQueueSizeBytes) {
calculateOptimalCapacity(targetQueueSizeBytes);
Runnable task = creatTask();
// warm up phase
start(task);
long cputime = getCurrentThreadCPUTime();
// test intervall
start(task);
cputime = getCurrentThreadCPUTime() - cputime;
long waitTime = (testTime * 1000000) - cputime;
System.out.println("waitTime between tasks created:" + waitTime);
calculateOptimalThreadCount(cputime, waitTime, targetUtilization);
}
/**
* 计算推荐的线程池队列大小
*
* @param targetQueueSizeBytes 线程池允许分配的缓存大小
*/
private void calculateOptimalCapacity(BigDecimal targetQueueSizeBytes) {
long mem = calculateMemoryUsage();
BigDecimal queueCapacity = targetQueueSizeBytes.divide(new BigDecimal(mem), RoundingMode.HALF_UP);
System.out.println("Target queue memory usage (bytes): " + targetQueueSizeBytes);
System.out.println("createTask() produced " + creatTask().getClass().getName() + " which took " + mem + " bytes in a queue");
System.out.println("Formula: " + targetQueueSizeBytes + " / " + mem);
System.out.println("* Recommended queue capacity (bytes): " + queueCapacity);
}
/**
* Brian Goetz' optimal thread count formula, see 'Java Concurrency in Practice' (chapter 8.2)
* 计算推荐的线程池大小
*
* @param cpu cpu time consumed by considered task
* 该任务要占用的CPU时间
* @param wait wait time of considered task
* 该任务要等待IO的时间
* @param targetUtilization target utilization of the system
* 希望达到的系统CPU利用率
*/
private void calculateOptimalThreadCount(long cpu, long wait, BigDecimal targetUtilization) {
BigDecimal waitTime = new BigDecimal(wait);
BigDecimal computeTime = new BigDecimal(cpu);
BigDecimal numberOfCPU = new BigDecimal(Runtime.getRuntime().availableProcessors());
//使用公式计算最佳线程数目 = (线程等待时间与线程CPU时间之比 + 1)* CPU数目
BigDecimal optimalThreadCount = numberOfCPU.multiply(targetUtilization).multiply(new BigDecimal(1).add(waitTime.divide(computeTime, RoundingMode.HALF_UP)));
System.out.println("Number of CPU: " + numberOfCPU);
System.out.println("Target utilization: " + targetUtilization);
System.out.println("Elapsed time (nanos): " + (testTime * 1000000));
System.out.println("Compute time (nanos): " + cpu);
System.out.println("Wait time (nanos): " + wait);
System.out.println("Formula: " + numberOfCPU + " * " + targetUtilization + " * (1 + " + waitTime + " / " + computeTime + ")");
System.out.println("* Optimal thread count: " + optimalThreadCount);
}
/**
* Runs the {@link Runnable} over a period defined in {@link #testTime}.
* Based on Heinz Kabbutz' ideas (http://www.javaspecialists.eu/archive/Issue124.html).
* 这里没搞懂具体干了啥
*
* @param task the runnable under investigation
*/
public void start(Runnable task) {
long start = 0;
int runs = 0;
// Accuracy of test run. It must finish within 20ms of the testTime, otherwise we retry the test. This could be configurable.
int EPSYLON = 20;
do {
if (++runs > 5) {
throw new IllegalStateException("Test not accurate");
}
expired = false;
start = System.currentTimeMillis();
Timer timer = new Timer();
timer.schedule(new TimerTask() {
public void run() {
expired = true;
}
}, testTime);
while (!expired) {
task.run();
}
start = System.currentTimeMillis() - start;
timer.cancel();
} while (Math.abs(start - testTime) > EPSYLON);
collectGarbage(3);
}
/**
* 回收内存垃圾,清理内存
*
* @param times GC次数
*/
private void collectGarbage(int times) {
for (int i = 0; i < times; i++) {
System.gc();
try {
Thread.sleep(10);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
}
/**
* Calculates the memory usage of a single element in a work queue. Based on
* Heinz Kabbutz' ideas
* (http://www.javaspecialists.eu/archive/Issue029.html).
* 计算给定的线程需要多少的缓存空间
*
* @return memory usage of a single {@link Runnable} element in the thread pools work queue
*/
public long calculateMemoryUsage() {
//The sample queue size to calculate the size of a single {@link Runnable} element.
int SAMPLE_QUEUE_SIZE = 1000;
//首先清理内存,连续GC 15次,并计算当前服务占用的内存
collectGarbage(15);
long mem0 = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
//生成线程队列,并添加任务
BlockingQueue<Runnable> queue = createWorkQueue();
for (int i = 0; i < SAMPLE_QUEUE_SIZE; i++) {
queue.add(creatTask());
}
//等待一段时间,让所有线程创建完成
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
//再次计算服务当前占用的内存
long mem1 = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
//单个线程所需的内存=(线程创建后的内存占用-线程创建前的内存占用)/线程数
return (mem1 - mem0) / SAMPLE_QUEUE_SIZE;
}
/**
* 创建可运行的任务,也就是你的业务线程
*
* @return an instance of your runnable task under investigation
*/
protected abstract Runnable creatTask();
/**
* 返回线程池中使用的队列实例
*
* @return queue instance
*/
protected abstract BlockingQueue<Runnable> createWorkQueue();
/**
* 计算当前 cpu 时间。根据使用的操作系统,这里可以使用各种框架。
* (例如。http://www.hyperic.com/products/sigar).
* CPU时间测量越准确,线程计数边界的结果就越准确
* @return current cpu time of current thread
*/
protected abstract long getCurrentThreadCPUTime();
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
如何使用该工具类?首先要先搞个实现类
package cn.mrcode.demo.boodadmin;
import java.lang.management.ManagementFactory;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
/**
* @author mrcode
* @date 2023/2/9 22:05
*/
public class MyPoolSizeCalculator extends PoolSizeCalculator{
@Override
protected Runnable creatTask() {
// 你的业务任务
return ()->{
try {
TimeUnit.MICROSECONDS.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
};
}
@Override
protected BlockingQueue<Runnable> createWorkQueue() {
// 线程池中的队列
return new LinkedBlockingDeque<>();
}
@Override
protected long getCurrentThreadCPUTime() {
// CPU 时间,源码中是用来计算任务耗时之类的
return ManagementFactory.getThreadMXBean().getCurrentThreadCpuTime();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
运行该类
package cn.mrcode.demo.boodadmin;
import java.math.BigDecimal;
/**
* @author mrcode
* @date 2023/2/9 23:31
*/
public class Test {
public static void main(String[] args) {
PoolSizeCalculator calculator = new MyPoolSizeCalculator();
calculator.calculateBoundaries(
BigDecimal.valueOf(0.5), // 期望 CPU 使用率是 50%
BigDecimal.valueOf(10_000_000) // 队列最多占用内存 10M
);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
测试输出
# 目标队列内存使用量 10_000_000 也就是我们自己设置的 10M
Target queue memory usage (bytes): 10000000
# 创建一个任务在队列中占用 2684 字节
createTask() produced cn.mrcode.demo.boodadmin.MyPoolSizeCalculator$$Lambda$1/6738746 which took 2684 bytes in a queue
Formula: 10000000 / 2684
# 推荐的队列容量是 3726 个
* Recommended queue capacity (bytes): 3726
waitTime between tasks created:2935351000
# CPU 核数是 8 核
Number of CPU: 8
# 期望达到 50% 的使用率
Target utilization: 0.5
# CPU 总时间
Elapsed time (nanos): 3000000000
# CPU 使用时间
Compute time (nanos): 64649000
# CPU 等待时间
Wait time (nanos): 2935351000
# 通过这个公式计算
Formula: 8 * 0.5 * (1 + 2935351000 / 64649000)
# 推荐的线程数量是 184 个
* Optimal thread count: 184.0
# 可以看到:这种任务,大部分时间都在等待(我们休眠的),所以这种又是 IO 密集型的任务
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
根据这个建议,你在创建线程池的时候,就可以考虑使用,比如:
ThreadPoolExecutor executor = new ThreadPoolExecutor(
10,
10,
10L,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy()
);
2
3
4
5
6
7
8
9
那么就可以修改成
ThreadPoolExecutor executor = new ThreadPoolExecutor(
184,
184, // 最大和最小可以设置成一样的,减少线程创建销毁的开销
10L,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(3726),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy()
);
2
3
4
5
6
7
8
9
:::info 使用工具类的话,大大的简化了调优的难度。 在实际开发中的花,这个任务同样也是需要使用你的真实的任务,这个可能就不太能使用这种 main 方法的方式进行了,更多的可能是考虑集成测试中(完全运行起来程序),然后去触发这个工具计算出来 :::
# 调优步骤
放在实际调优中,还需要结合实际业务场景进行,可以按下面的步骤进行思考:
- 业务评估:评估任务是什么类型的(IO 密集型还是 CPU 密集型、混合型),根据公式预估出线程数量和队列容量
- 结合压测,逐步调整:根据预估值进行减少、或则增加,看看结果,最终得到一个比较合适的配置(这个压测是如何压测?笔者不清楚哇)