16.线程池设计核心原理
# 3.16 线程池设计核心原理
📍 本篇位置:第 3 卷 · 并发之道 · 第 16 篇 🎯 核心矛盾:
executor.execute(task)一行 API 看似平淡——但它的内部是一套精密的有限状态机,涉及位运算、CAS、自旋、状态流转、Worker 复用、队列协作。读懂这套机制,才算真正理解 JUC 的设计精髓 🧭 设计灵魂:线程池本质是一个精心调谐的并发状态机——它用一个int同时编码"5 个状态 + 工作线程数",用 Worker 实现"线程的复用 + 不可重入的精妙设计",用拒绝策略给系统留下"最后的退路" 🌐 跨平台覆盖:Java JUC ThreadPoolExecutor(源码级)· Netty EventLoopGroup · Tomcat StandardThreadExecutor · Go runtime GMP(隐式线程池)· .NET ThreadPool · Python concurrent.futures 🔗 延伸阅读:← 3.15 线程池的设计思想 · → 3.17 线程池使用技巧 · → 3.18 结构化并发设计思想 · → 3.13 协程核心设计思想
上一篇我们看到了线程池"为什么需要"——池化思想是工程界半个世纪的真理。本篇要解决的是更硬核的问题:Java 的 ThreadPoolExecutor 凭什么被誉为"并发设计的教科书"?它内部那个看似平凡的
int ctl变量,为什么 Doug Lea 用了几年才设计完?本篇从一个 5 万 QPS 的真实事故切入,把 ThreadPoolExecutor 拆到源码级——位运算、状态机、Worker 复用、拒绝策略。读完你会明白:API 越简单,背后的设计越精密。
# 目录介绍
- 00.真实事故引入
- 01.Executor 框架的设计哲学
- 02.ThreadPoolExecutor源码级解剖
- 03.execute流程的三段论设计
- 04.Worker内部类:复用与不可重入
- 05.状态流转:5状态4转换状态机
- 06.跨语言线程池实现对照
- 07.源码级经典陷阱
- 08.一句话总结
# 00.真实事故引入
# 0.1 凌晨3点:execute卡了30秒
我曾负责一个金融交易系统,用 Java 写的订单匹配引擎。某次大促夜里,凌晨 3 点突然告警:
03:00:00 订单提交成功率从 99.99% 跌到 73%
03:00:30 P99 延迟从 50ms 飙到 30 秒
03:01:00 上游网关熔断,业务受损
03:05:00 我们的 SRE 把交易服务全部重启,业务恢复
2
3
4
排查过程极其曲折。最终定位到一段"看起来非常无害"的代码:
// 用于发送交易回执
ExecutorService notifyPool = Executors.newFixedThreadPool(50);
// 业务路径里:
public void onOrderMatched(Order order) {
matchEngine.process(order); // 匹配
notifyPool.execute(() -> sendNotification(order)); // 异步发回执
notifyPool.execute(() -> updateStats(order)); // 异步更新统计
notifyPool.execute(() -> auditLog(order)); // 异步审计
}
2
3
4
5
6
7
8
9
10
业务方都觉得很合理——异步发回执、异步更新统计、异步审计,主流程只做核心匹配。
但真相是:
1. 大促期间 sendNotification 调用的下游短信网关变慢(5秒/次)
2. notifyPool 50 个线程很快全部卡在短信调用上
3. 后续 execute 进入"队列"——但 Executors.newFixedThreadPool 的队列是 LinkedBlockingQueue(无界!)
4. 队列在 2 分钟内堆积了 200 万个任务
5. JVM 内存被任务对象吃光 → Full GC 风暴 → STW 几十秒
6. 主线程的 execute() 调用看似只是"加入队列",但因为 STW,卡了 30 秒
2
3
4
5
6
根因有三层:
表层:业务下游短信网关变慢
中层:Executors.newFixedThreadPool 用了无界队列
深层:execute() 在 GC 时不可中断,主线程被牵连
2
3
修复后,我们彻底告别了 Executors 工厂方法:
// ❌ 危险:无界队列
ExecutorService pool = Executors.newFixedThreadPool(50);
// ✅ 显式指定所有参数
ExecutorService pool = new ThreadPoolExecutor(
50, // corePoolSize
100, // maximumPoolSize
60L, TimeUnit.SECONDS, // keepAlive
new ArrayBlockingQueue<>(10000), // ★ 有界队列!
new ThreadFactoryBuilder()
.setNameFormat("notify-%d")
.build(),
new ThreadPoolExecutor.CallerRunsPolicy() // ★ 拒绝策略
);
2
3
4
5
6
7
8
9
10
11
12
13
14
这次事故让我意识到:ExecutorService 这个 API 的"简单"是骗人的——真正用对它需要理解七八个参数的物理含义、状态机的流转、拒绝策略的取舍。
# 0.2 灵魂三问
这次事故让我反复追问:
- 为什么 Doug Lea 设计的
ThreadPoolExecutor用一个int ctl同时表达"状态 + 线程数"?这看起来很 hack,是不是有什么不得不这么做的理由? —— 这个看似奇怪的设计背后有什么物理约束? - Worker 类内部为什么要继承 AQS 实现一个不可重入锁?为什么不直接用 ReentrantLock? —— 这个反直觉的选择有什么深层原因?
- 为什么
execute()流程要分"三段论"(核心线程 → 队列 → 救急线程)而不是更简单的"线程不够就开"? —— 这个看似复杂的判断顺序是必然的吗?
# 0.3 五个层层递进的追问
要把线程池讲透,需要先回答 5 个递进问题:
- execute() 到底做了什么? —— 加入队列还是直接交线程?
- Worker 是什么? —— 它和 Thread 是什么关系?
- 状态怎么流转? —— shutdown 之后还能 execute 吗?
- 队列满了怎么办? —— 拒绝策略的设计权衡
- 谁来真正终止线程池? —— TIDYING/TERMINATED 状态的意义
这 5 个问题,构成了本篇的全部主线。
# 0.4 探索路径
flowchart LR
A[execute 怎么工作] --> B[ctl 状态变量<br/>位运算编码]
B --> C[execute 三段论<br/>核心/队列/救急]
C --> D[Worker 内部类<br/>复用 + AQS]
D --> E[状态机流转<br/>5 状态 4 转换]
E --> F[拒绝策略<br/>4 大策略]
style B fill:#cfe2ff
style D fill:#d4edda
style F fill:#fff3cd
2
3
4
5
6
7
8
9
10
# 0.5 为什么这个问题值得讲透
我想抛三个问题:
- 为什么
Executors.newFixedThreadPool是被 Effective Java、阿里规约、Google Java Style 同时禁用的"反模式"? —— 因为它的 LinkedBlockingQueue 是无界的,是内存炸弹。 - 为什么
corePoolSize == maximumPoolSize时,"keepAliveTime" 参数完全没意义? —— 因为 keepAlive 只对"超出 core 的线程"生效。 - 为什么 ThreadPoolExecutor 的源码注释长达 1500 行,被并发圈称为"必读文献"? —— 因为它是 Doug Lea 在并发设计领域的集大成之作。
读完本章你会懂:线程池不是"启动 N 个线程"——是 Java 并发设计的浓缩教科书。
# 01.Executor 框架的设计哲学
# 1.1 设计由来:Thread到Executor
Java 1.0 时代,所有人都直接用 Thread:
// Java 1.0 风格
new Thread(() -> {
process(req);
}).start();
2
3
4
问题立刻暴露:
1. Thread 是 OS 资源,创建昂贵(~1ms)
2. 没有数量限制——来 1 万请求就开 1 万线程→OOM
3. 没有任务队列——线程满了任务无处放
4. 不能重用——每个 Thread 用完即弃
5. 没有生命周期管理——shutdown 谁来负责
2
3
4
5
Doug Lea 在 JSR-166(2004 年)提出 Executor 框架——核心思想是:
把"任务的提交"和"任务的执行"解耦。
// Java 5+ 风格
ExecutorService executor = Executors.newFixedThreadPool(50);
executor.execute(() -> process(req)); // 提交任务
// 至于这个任务什么时候、由哪个线程执行——你不用管
2
3
4
这是面向对象设计原则在并发领域的应用——单一职责:
任务(Runnable):只描述"做什么"
执行器(Executor):只决定"怎么调度"
2
# 1.2 框架层次结构
classDiagram
class Executor {
<<interface>>
+execute(Runnable)
}
class ExecutorService {
<<interface>>
+submit(Callable) Future
+shutdown()
+awaitTermination()
}
class ScheduledExecutorService {
<<interface>>
+schedule(Runnable, delay)
+scheduleAtFixedRate(...)
}
class AbstractExecutorService
class ThreadPoolExecutor {
+execute(Runnable)
-ctl: AtomicInteger
-workers: HashSet
-workQueue: BlockingQueue
}
class ScheduledThreadPoolExecutor
class ForkJoinPool
Executor <|.. ExecutorService
ExecutorService <|.. ScheduledExecutorService
ExecutorService <|.. AbstractExecutorService
AbstractExecutorService <|-- ThreadPoolExecutor
ScheduledExecutorService <|.. ScheduledThreadPoolExecutor
ThreadPoolExecutor <|-- ScheduledThreadPoolExecutor
AbstractExecutorService <|-- ForkJoinPool
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
层次设计的智慧:
Executor → 最简:只能 execute
ExecutorService → 加上 submit + Future + lifecycle
ScheduledES → 加上定时调度
接口逐层加能力,实现一个 ThreadPoolExecutor 自动满足所有需求
2
3
4
5
这是 SOLID 中"接口隔离原则(ISP)"的完美范例——客户端只依赖自己用得到的接口。
# 1.3 接口设计的精妙
Runnable vs Callable:
@FunctionalInterface
public interface Runnable {
void run(); // 没有返回值,没有 checked 异常
}
@FunctionalInterface
public interface Callable<V> {
V call() throws Exception; // 有返回值,可抛 checked
}
2
3
4
5
6
7
8
9
为什么需要两个?
Runnable 是 Java 1.0 就有的——和 Thread 关联
Callable 是 1.5 引入的——为线程池设计
Runnable 的限制:
没法返回结果
没法抛 checked exception
→ 不适合"任务"语义
Callable 解决了这两个问题
2
3
4
5
6
7
8
9
Future 的设计:
public interface Future<V> {
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit) throws ...;
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
}
2
3
4
5
6
7
Future 是"未来结果的占位符"——这就是 §0.5 第二题的答案:线程池让"任务执行"和"结果获取"在时间上解耦。
# 1.4 ThreadPoolExecutor的7个参数
public ThreadPoolExecutor(
int corePoolSize, // 核心线程数
int maximumPoolSize, // 最大线程数
long keepAliveTime, // 空闲存活时间
TimeUnit unit, // 时间单位
BlockingQueue<Runnable> workQueue, // 任务队列
ThreadFactory threadFactory, // 线程工厂
RejectedExecutionHandler handler // 拒绝策略
)
2
3
4
5
6
7
8
9
这 7 个参数共同决定了线程池的全部行为——下一节会逐一展开。
# 02.ThreadPoolExecutor源码级解剖
# 2.1 ctl变量:状态加线程数合一个int
打开 ThreadPoolExecutor 源码,第一行核心代码:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3; // 32 - 3 = 29
private static final int CAPACITY = (1 << COUNT_BITS) - 1; // 2^29 - 1
// 5 个状态,每个用 3 位高位编码
private static final int RUNNING = -1 << COUNT_BITS; // 高 3 位 111
private static final int SHUTDOWN = 0 << COUNT_BITS; // 高 3 位 000
private static final int STOP = 1 << COUNT_BITS; // 高 3 位 001
private static final int TIDYING = 2 << COUNT_BITS; // 高 3 位 010
private static final int TERMINATED = 3 << COUNT_BITS; // 高 3 位 011
// 解码方法
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
§0.2 第一题的答案——为什么用一个 int 同时编码两个值?
| 维度 | 用两个 AtomicInteger | 用一个 ctl |
|---|---|---|
| 原子性 | ❌ 无法原子地"同时"修改 | ✅ 一次 CAS 同时改两个 |
| 一致性 | ❌ 可能"状态变了但线程数没改"的中间态 | ✅ 状态和线程数永远一致 |
| 空间 | 16 字节(两个 AtomicInteger) | 4 字节 |
| 性能 | 两次 CAS | 一次 CAS |
关键洞察:很多状态机的 bug 发生在"中间态"——A 改了状态但还没来得及改线程数,B 看到了不一致的快照。Doug Lea 用位运算把它们绑成一个原子单元,从根本上消除中间态。
这种设计的代价:代码可读性下降——但换来了绝对的并发正确性。
# 2.2 5 种状态的物理含义
stateDiagram-v2
[*] --> RUNNING: new ThreadPoolExecutor()
RUNNING --> SHUTDOWN: shutdown()
RUNNING --> STOP: shutdownNow()
SHUTDOWN --> STOP: shutdownNow()
SHUTDOWN --> TIDYING: workQueue.isEmpty() ∧ workerCount=0
STOP --> TIDYING: workerCount=0
TIDYING --> TERMINATED: terminated() 钩子返回
TERMINATED --> [*]
2
3
4
5
6
7
8
9
| 状态 | 接受新任务 | 处理队列任务 | 中断运行中线程 |
|---|---|---|---|
| RUNNING | ✅ | ✅ | ❌ |
| SHUTDOWN | ❌(拒绝) | ✅(继续处理) | ❌ |
| STOP | ❌ | ❌(清空) | ✅(中断信号) |
| TIDYING | ❌ | ❌ | ❌(已无线程) |
| TERMINATED | ❌ | ❌ | ❌(已结束) |
有趣的设计:
SHUTDOWN:仁慈关闭——已提交的任务还会执行完
STOP:暴力关闭——立即返回未执行的任务,并中断正在执行的
2
为什么需要两种?因为业务场景不同:
银行系统:必须用 shutdown()——交易任务不能丢
压测工具:可以用 shutdownNow()——立即停止
2
# 2.3 状态转换的精妙
关键问题:状态转换是怎么原子地发生的?
看 tryTerminate() 源码(简化):
final void tryTerminate() {
for (;;) {
int c = ctl.get();
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && !workQueue.isEmpty()))
return;
if (workerCountOf(c) != 0) {
interruptIdleWorkers(ONLY_ONE); // 唤醒一个空闲 worker,让它去检查
return;
}
// 所有 worker 都退出,且队列空 → 推进到 TIDYING
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated(); // 钩子方法,子类可以覆盖
} finally {
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll(); // 唤醒所有等 awaitTermination 的线程
}
return;
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
几个精妙的细节:
1. 自旋 + CAS:保证状态推进的原子性
2. interruptIdleWorkers(ONLY_ONE):只唤醒一个 worker——避免"惊群"
3. terminated() 钩子:让子类可以做最终清理
4. signalAll:精确唤醒等待终止的线程
2
3
4
# 03.execute流程的三段论设计
# 3.1 三段论的算法
execute() 是线程池最核心的方法。看简化的源码:
public void execute(Runnable command) {
if (command == null) throw new NullPointerException();
int c = ctl.get();
// ========== 第一段:尝试用核心线程 ==========
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true)) // true = core
return;
c = ctl.get(); // 失败,重读
}
// ========== 第二段:尝试入队列 ==========
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (!isRunning(recheck) && remove(command))
reject(command); // 入队后状态变了,回滚
else if (workerCountOf(recheck) == 0)
addWorker(null, false); // 防御性:保证至少有一个 worker 在跑队列
}
// ========== 第三段:尝试救急线程(≤maximumPoolSize)==========
else if (!addWorker(command, false))
reject(command);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# 3.2 三段论的设计哲学
§0.2 第三题。为什么是"核心 → 队列 → 救急"这个顺序?
flowchart TB
A[新任务] --> B{核心线程<br/>没满?}
B -->|未满| B1[创建核心线程<br/>立即执行]
B -->|已满| C{队列<br/>没满?}
C -->|未满| C1[入队等待]
C -->|已满| D{maxPool<br/>没到顶?}
D -->|没到| D1[创建救急线程<br/>立即执行]
D -->|到顶| E[拒绝策略]
style B1 fill:#d4edda
style C1 fill:#fff3cd
style D1 fill:#cfe2ff
style E fill:#f8d7da
2
3
4
5
6
7
8
9
10
11
12
13
这个顺序背后是工程权衡:
为什么先核心线程?
核心线程不会被回收——长期存在
新任务来时优先用它们 → 避免反复创建/销毁线程
2
为什么队列在中间?
队列比线程便宜——一个对象引用 vs 一个 OS 线程
让队列吸收"瞬时洪峰" → 避免疯狂创建线程
2
为什么救急线程在最后?
救急线程一旦创建就消耗资源
只有"队列满了说明确实超出处理能力" → 才创建
keepAliveTime 后自动回收 → 不长期占用
2
3
这个设计的反直觉之处:
直觉以为:先开线程到 max,再入队
实际是: 先到 core → 入队 → 才到 max
→ 默认"队列优先",因为入队比开线程便宜
2
3
4
# 3.3 队列饱和的"三段论"反作用
但这个设计有个坑——如果你用 LinkedBlockingQueue 不指定容量(默认 Integer.MAX_VALUE):
new ThreadPoolExecutor(
10, // core
100, // max
60, SECONDS,
new LinkedBlockingQueue<>() // ❌ 无界!
);
2
3
4
5
6
结果:第二段永远不会满 → 第三段(max=100 的救急线程)永远用不上 → maximumPoolSize 完全没意义。
这就是 §0.5 第一题的答案——Executors.newFixedThreadPool 内部就是这个配置:
public static ExecutorService newFixedThreadPool(int n) {
return new ThreadPoolExecutor(
n, n, // ★ core == max,keepAlive 也无意义
0L, MILLISECONDS,
new LinkedBlockingQueue<Runnable>() // ★ 无界!
);
}
2
3
4
5
6
7
两个致命问题:
1. 队列无界 → 任务无限堆积 → OOM
2. core == max → keepAlive 无意义 → 无法应对突发流量
2
所以:生产环境永远不要用 Executors 工厂方法。
# 04.Worker内部类:复用与不可重入
# 4.1 Worker 是什么
Worker 是 ThreadPoolExecutor 的核心私有内部类:
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
final Thread thread;
Runnable firstTask;
volatile long completedTasks;
Worker(Runnable firstTask) {
setState(-1); // 抑制 interrupt 直到 runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
// AQS 实现(不可重入锁)
protected boolean isHeldExclusively() { return getState() != 0; }
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
Worker 同时是三种东西:
- Runnable:能被 Thread 执行
- AQS 子类:自带一把锁
- Thread 的容器:持有一个 Java Thread 实例
# 4.2 runWorker:复用线程的核心
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// ★ 核心:循环从队列取任务
while (task != null || (task = getTask()) != null) {
w.lock();
// ... 中断检查、状态检查 ...
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run(); // ★ 执行任务
} catch (Throwable x) {
thrown = x; throw x;
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
核心设计——Worker 是个永不停机的循环:
1. 从队列 getTask() 取任务
2. lock() → run() → unlock()
3. 回到 1,继续取下一个
4. getTask() 返回 null(线程要回收)→ 退出循环 → processWorkerExit
2
3
4
这就是"线程复用"的物理实现——一个 OS 线程跑一个 while 循环,无限处理任务。
# 4.3 Worker为什么用不可重入锁
§0.2 第二题。为什么不直接用 ReentrantLock?
关键代码:
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) { // ★ tryLock
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne) break;
}
} finally {
mainLock.unlock();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
w.tryLock() 的意义:
Worker 在执行任务时持有自己的锁(runWorker 里的 w.lock())
shutdown 想中断"空闲"worker 时,需要 tryLock() 判断它是不是空闲
如果用可重入锁:
shutdown 线程也是某个 worker → 它能 lock 自己 → 误判为"空闲"
用不可重入锁:
shutdown 永远 tryLock 失败(因为别人持有)→ 正确判断
2
3
4
5
6
7
8
这是用不可重入特性精确表达"我在跑任务"信号。Doug Lea 把"锁"用作了"状态标记"——这是并发设计的高级技巧。
# 4.4 Worker隐藏陷阱:状态-1
注意 Worker 构造函数:
Worker(Runnable firstTask) {
setState(-1); // ← 这一行很神秘
...
}
2
3
4
为什么把 AQS 状态设成 -1?
默认 state=0 → tryLock 会成功 → 视为"空闲"
state=-1 → tryLock 失败(CAS 0→1 不成功)→ 视为"忙碌"
构造期间:worker 还没真正启动
此时如果 shutdown 误以为它"空闲"并 interrupt → 还没 run 就被中断
2
3
4
5
runWorker 的第一行就把它"释放":
w.unlock(); // setState(0) → 现在才允许 interrupt
这是一个抑制"出生即死"的精妙设计。
# 05.状态流转:5状态4转换状态机
# 5.1 完整状态图
stateDiagram-v2
[*] --> RUNNING: new ThreadPoolExecutor()
RUNNING --> SHUTDOWN: shutdown()
RUNNING --> STOP: shutdownNow()
SHUTDOWN --> STOP: shutdownNow()
SHUTDOWN --> TIDYING: workQueue.isEmpty() ∧ workerCount=0
STOP --> TIDYING: workerCount=0
TIDYING --> TERMINATED: terminated() 钩子返回
TERMINATED --> [*]
2
3
4
5
6
7
8
9
# 5.2 shutdown vs shutdownNow
// shutdown:温和关闭
public void shutdown() {
advanceRunState(SHUTDOWN); // RUNNING → SHUTDOWN
interruptIdleWorkers(); // 中断空闲 worker
onShutdown();
tryTerminate();
}
// shutdownNow:暴力关闭
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
advanceRunState(STOP); // → STOP
interruptWorkers(); // 中断所有 worker(包括正在执行的)
tasks = drainQueue(); // 把队列里的任务返回
tryTerminate();
return tasks;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
核心差异:
| shutdown | shutdownNow | |
|---|---|---|
| 新任务 | 拒绝 | 拒绝 |
| 队列任务 | 继续执行 | 立即返回 |
| 正在执行的任务 | 继续到结束 | interrupt 信号 |
| 返回值 | void | 未执行的任务列表 |
# 5.3 awaitTermination等待真正终止
public boolean awaitTermination(long timeout, TimeUnit unit) {
long nanos = unit.toNanos(timeout);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (;;) {
if (runStateAtLeast(ctl.get(), TERMINATED))
return true;
if (nanos <= 0L) return false;
nanos = termination.awaitNanos(nanos);
}
} finally {
mainLock.unlock();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
典型生产代码:
executor.shutdown();
try {
if (!executor.awaitTermination(60, SECONDS)) {
executor.shutdownNow(); // 优雅关闭超时 → 暴力关闭
if (!executor.awaitTermination(10, SECONDS)) {
log.error("Pool did not terminate");
}
}
} catch (InterruptedException e) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
2
3
4
5
6
7
8
9
10
11
12
这是关闭线程池的"三段式"标准做法——温和 → 暴力 → 报警。
# 5.4 拒绝策略4种内置+自定义
// 1. AbortPolicy(默认):抛异常
public static class AbortPolicy implements RejectedExecutionHandler {
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException(...);
}
}
// 2. DiscardPolicy:静默丢弃
public static class DiscardPolicy implements RejectedExecutionHandler {
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
// do nothing
}
}
// 3. DiscardOldestPolicy:丢弃最老的
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll(); // 丢弃队列头
e.execute(r); // 再次提交
}
}
}
// 4. CallerRunsPolicy:调用方自己跑
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run(); // 在调用 execute 的线程上同步执行
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
4 种策略的工程权衡:
| 策略 | 适用场景 |
|---|---|
| Abort | 默认:让上游知道"系统过载",明确错误 |
| Discard | 不重要的任务(如日志、统计) |
| DiscardOldest | 最新数据更重要(如实时报价) |
| CallerRuns | 背压:让生产方"自己跑",自然限流 |
CallerRunsPolicy 是生产级最佳实践——它实现了天然背压:
生产方调 execute → 线程池满 → CallerRuns 让生产方自己跑这个任务
→ 生产方下次再调 execute 已经变慢了 → 生产速率自动降下来
→ 系统进入稳态,不会雪崩
2
3
# 06.跨语言线程池实现对照
# 6.1 Netty EventLoopGroup
Netty 完全不用 JDK 的 ThreadPoolExecutor,自己实现了 EventLoopGroup:
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(...);
2
3
4
5
6
7
为什么 Netty 不用 ThreadPoolExecutor?
ThreadPoolExecutor:
N 个线程从 1 个共享队列取任务
共享队列 → 锁竞争
适合"短任务"
Netty EventLoopGroup:
每个 EventLoop 一个独立线程 + 独立任务队列
Channel 永远绑定到一个 EventLoop(无并发)
→ 完全没有锁竞争!
适合"长连接 + 高吞吐"
2
3
4
5
6
7
8
9
10
这是把"并发"问题转化成"亲缘性(affinity)"问题——同一个 Channel 永远在同一线程上处理,从根本上消除竞争。
# 6.2 Tomcat 线程池防饱和
Tomcat 的 StandardThreadExecutor 修改了 ThreadPoolExecutor 的"三段论":
// Tomcat 的精妙改动
public class TaskQueue extends LinkedBlockingQueue<Runnable> {
@Override
public boolean offer(Runnable o) {
if (parent == null) return super.offer(o);
// ★ 改动:如果还能创建线程,让队列假装"满了"
if (parent.getPoolSize() < parent.getMaximumPoolSize() &&
parent.getSubmittedCount() > parent.getPoolSize()) {
return false;
}
return super.offer(o);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Tomcat 的逻辑:
JDK 默认行为:core 满 → 入队 → 队列满才开 max
Tomcat 行为:core 满 → 队列假装满 → 直接开到 max → 队列才真的入
→ 优先开线程,而不是排队
→ 因为 Web 请求"排队等了 30 秒不如失败重试"
2
3
4
5
这是把"线程池公式"按业务特点重新调谐——Tomcat 知道自己处理的是 Web 请求,所以选了不同的优先级。
# 6.3 Go runtime:隐式线程池
Go 没有显式线程池——但runtime 内部就是一个超级线程池:
runtime 启动时:创建 GOMAXPROCS 个 OS 线程(M)
每个 M 持有一个 P(逻辑处理器)+ 本地 G 队列
go func() 把 G 加入某个 P 的队列
work-stealing:P 队列空时从其他 P "偷"一半
这就是 GMP 模型——一个动态自适应的线程池
2
3
4
5
6
7
对比 Java:
Java:程序员显式创建 ThreadPoolExecutor,调 corePoolSize 等参数
Go: runtime 帮你管,你只需要 go func()
抽象层次的差异 → 程序员心智负担差几个数量级
2
3
4
# 6.4 Python concurrent.futures
from concurrent.futures import ThreadPoolExecutor
with ThreadPoolExecutor(max_workers=10) as executor:
future = executor.submit(work, x)
result = future.result()
2
3
4
5
Python 的限制:
GIL 让多线程只能"并发"不能"并行"
所以 Python 线程池只对 IO 密集任务有用
CPU 密集要用 ProcessPoolExecutor(多进程)
2
3
# 6.5 .NET ThreadPool
// 整个 .NET 进程共享一个 ThreadPool
ThreadPool.QueueUserWorkItem(_ => Work());
// 或用更高级的 Task API
Task.Run(() => Work());
2
3
4
5
.NET 的设计:
全局 ThreadPool(单例)
+ 工作窃取(每个线程有本地队列)
+ Hill-climbing 算法动态调整线程数
→ 比 Java 的 ThreadPoolExecutor 更"智能",但灵活性低
2
3
4
5
# 07.源码级经典陷阱
# 7.1 陷阱一:Executors工厂内存炸弹(§0.1)
铁律:永远不用 Executors.newFixedThreadPool / newCachedThreadPool / newSingleThreadExecutor。
// ❌ 三个都是炸弹
Executors.newFixedThreadPool(10); // 队列无界 → OOM
Executors.newCachedThreadPool(); // 线程数 Integer.MAX_VALUE → 创建无数线程
Executors.newSingleThreadExecutor(); // 队列无界 → OOM
// ✅ 永远显式构造
new ThreadPoolExecutor(
coreSize, maxSize, keepAlive, unit,
new ArrayBlockingQueue<>(boundedSize), // ★ 必须有界
threadFactory,
new CallerRunsPolicy() // ★ 必须明确策略
);
2
3
4
5
6
7
8
9
10
11
12
阿里 Java 开发手册原话:
【强制】线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式。这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
# 7.2 陷阱二:core=max让keepAlive无意义
new ThreadPoolExecutor(
10, 10, // core == max
60, SECONDS, // ★ keepAlive 完全没用!
queue
);
2
3
4
5
根因:keepAliveTime 只对"超出 core 的线程"生效——core 内的线程默认永生。
修复:
// 方案 1:开启 allowCoreThreadTimeOut
ThreadPoolExecutor pool = new ThreadPoolExecutor(...);
pool.allowCoreThreadTimeOut(true); // 现在 core 线程也会超时回收
// 方案 2:让 core < max
new ThreadPoolExecutor(5, 50, 60, SECONDS, ...);
2
3
4
5
6
# 7.3 陷阱三:异常吞没
// ❌ 异常无声无息地消失
ExecutorService pool = Executors.newFixedThreadPool(10);
pool.execute(() -> {
throw new RuntimeException("oops"); // 看不到任何异常输出!
});
2
3
4
5
根因:execute 提交的 Runnable 抛异常时,默认会被 ThreadGroup.uncaughtException 处理——通常只打印到 stderr,不会进入业务日志。
修复方案 1:用 submit + Future.get()
Future<?> f = pool.submit(() -> { ... });
try { f.get(); }
catch (ExecutionException e) {
log.error("task failed", e.getCause());
}
2
3
4
5
修复方案 2:覆盖 afterExecute
ThreadPoolExecutor pool = new ThreadPoolExecutor(...) {
@Override
protected void afterExecute(Runnable r, Throwable t) {
if (t == null && r instanceof Future<?>) {
try { ((Future<?>) r).get(); }
catch (Throwable e) { t = e; }
}
if (t != null) log.error("task failed", t);
}
};
2
3
4
5
6
7
8
9
10
修复方案 3:自定义 ThreadFactory 设置 UncaughtExceptionHandler
ThreadFactory tf = r -> {
Thread t = new Thread(r);
t.setUncaughtExceptionHandler((thread, ex) -> log.error("...", ex));
return t;
};
2
3
4
5
# 7.4 陷阱四:prestartCoreThread意义
问题:默认情况下,core 线程是"懒创建"的——任务来了才创建。这导致冷启动慢。
ThreadPoolExecutor pool = new ThreadPoolExecutor(...);
pool.prestartAllCoreThreads(); // 立即创建所有 core 线程
2
适用场景:
业务高峰前预热:避免高峰时还在"创建线程"
延迟敏感场景:第一个请求不能慢
2
# 7.5 陷阱五:Worker 不可重入锁踩坑
反例:在任务内部调 executor.shutdown():
pool.execute(() -> {
process(data);
pool.shutdown(); // ❌ 这个调用会自我陷入死锁吗?
});
2
3
4
实际不会死锁——shutdown 只是改 ctl 状态 + 中断空闲 worker。当前 worker 持有自己的锁,但 shutdown 用 tryLock 不会阻塞。
真正的坑:在任务内部调 executor.shutdownNow() 然后期望立刻退出——自己中断自己时,要看任务是否响应中断。
# 7.6 陷阱六:拒绝策略选错
// ❌ 关键业务用 DiscardPolicy
new ThreadPoolExecutor(..., new DiscardPolicy());
// 静默丢弃 → 业务方不知道任务失败 → 数据不一致
2
3
铁律:
关键业务(金融、订单)→ AbortPolicy 或 CallerRunsPolicy
非关键业务(日志、统计)→ DiscardPolicy
新数据更重要(实时行情)→ DiscardOldestPolicy
需要背压(生产消费)→ CallerRunsPolicy
2
3
4
# 7.7 陷阱七:监控盲区
线程池的关键指标必须有监控:
ThreadPoolExecutor pool = ...;
scheduledMonitor.scheduleAtFixedRate(() -> {
log.info("pool stats: " +
"active={} ".format(pool.getActiveCount()) +
"size={} ".format(pool.getPoolSize()) +
"completed={} ".format(pool.getCompletedTaskCount()) +
"queue={} ".format(pool.getQueue().size()) +
"rejected={}".format(/* 自定义计数器 */));
}, 0, 10, SECONDS);
2
3
4
5
6
7
8
9
10
关键告警阈值:
活跃线程数 > maxPoolSize × 90% → WARN
队列堆积 > queueCapacity × 80% → WARN
拒绝次数 > 0 → ERROR
平均任务执行时间 > 阈值 → WARN
2
3
4
# 08.一句话总结
# 8.1 三层认知阶梯
第一层(知其然):会用 Executors,会写 execute
↓
第二层(知其所以然):理解 7 大参数、5 个状态、三段论流程、4 种拒绝策略
↓
第三层(知其将所以然):能根据业务设计自定义线程池,能定位线程池故障,能读懂源码
2
3
4
5
读完本章后,你应该能回答开头§0.2 提出的三个问题:
- 为什么用一个 int 编码状态+线程数? → 为了原子地同时改两者,从根本上消除"中间态"——这是 Doug Lea 在并发设计里的核心智慧。
- Worker 为什么用不可重入锁? → 用"锁的持有"作为"忙碌"信号,shutdown 时用 tryLock 区分"空闲 worker"和"忙碌 worker"。可重入锁会让 shutdown 误判。
- 为什么是"核心 → 队列 → 救急"三段论? → 核心线程长期存在不浪费;队列吸收瞬时洪峰最便宜;救急线程只在确实超载时启用。这是工程权衡的最优解。
# 8.2 线程池决策树
flowchart TD
A[需要线程池?] --> B{业务类型?}
B -->|CPU 密集| B1[core=N+1, max=N+1<br/>队列大]
B -->|IO 密集| B2[core=2N, max=4N<br/>队列中等]
B -->|混合| B3[拆分两个池]
B1 --> C{失败容忍度?}
B2 --> C
C -->|不能丢| C1[CallerRunsPolicy<br/>背压]
C -->|可以丢老数据| C2[DiscardOldest]
C -->|可以丢任意| C3[Discard]
style B1 fill:#cfe2ff
style B2 fill:#d4edda
style C1 fill:#fff3cd
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 8.3 七字真言
- 永不用 Executors——用 ThreadPoolExecutor 显式构造。
- 队列必须有界——无界 = 内存炸弹。
- 拒绝策略必明确——默认 Abort 通常不够。
- 监控不可省——队列、活跃数、拒绝数都要看。
- shutdown 三段式——shutdown → awaitTermination → shutdownNow。
- 线程命名要规范——便于 jstack 分析。
- 业务和工具池要分离——避免互相影响。
# 8.4 与下篇的承接
本篇我们看到了线程池内部精密的状态机设计——这是"过去 20 年 Java 并发的集大成之作"。但是,线程池本身有一个根本局限——它管理的是"任务",不是"任务之间的关系"。
ExecutorService pool = ...;
Future<A> a = pool.submit(taskA);
Future<B> b = pool.submit(taskB);
Future<C> c = pool.submit(() -> combine(a.get(), b.get()));
// 谁负责取消所有任务?谁负责等所有任务结束?
// 异常怎么传播?资源怎么清理?
2
3
4
5
6
7
下一篇 3.17 线程池使用技巧 我们会进入实战调优——如何根据业务设计线程池参数、如何避免雪崩、如何做监控。再下一篇 3.18 结构化并发设计思想 会回答"任务关系"这个本质问题。
# 🔗 延伸阅读
- 同卷上篇:3.15 线程池的设计思想
- 同卷下篇:3.17 线程池使用技巧 | 3.18 结构化并发设计思想
- 同卷相关:3.13 协程核心设计思想("线程池的协程版")
- 经典文献:
- Java Concurrency in Practice(Brian Goetz, Doug Lea 等)—— 第 8 章是 ThreadPoolExecutor 的最权威解读
- ThreadPoolExecutor 源码注释(OpenJDK java.util.concurrent.ThreadPoolExecutor.java,1500 行注释本身就是一部论文)
- 阿里巴巴 Java 开发手册(线程池规约)
- Effective Java Item 80(Joshua Bloch,Executors 优于线程)
- Netty in Action(EventLoopGroup 设计的工业典范)