线程池核心源码设计
# 33.线程池核心源码设计
# 目录介绍
- 10.1 开篇疑问
- 10.2 为什么需要线程池
- 10.3 ThreadPoolExecutor核心参数
- 10.4 源码级工作原理
- 10.5 线程池的生命周期
- 10.6 实践中的参数设置
- 10.7 线程池的异常处理
- 10.8 线程池监控与告警
- 10.9 ScheduledThreadPoolExecutor
- 10.10 ForkJoinPool工作窃取
- 10.11 常见面试深度问题
- 10.12 总结与核心要点
# 10.1 开篇疑问
疑惑:线程池的核心线程数和最大线程数有什么区别?任务提交后先进队列还是先创建线程?线程池里的线程是如何做到"复用"的?Executors 创建的线程池为什么阿里规范不推荐使用?线程池的异常会怎么处理?
答疑:线程池是 Java 并发编程最重要的工具之一。理解它的源码设计,不仅能正确使用线程池,还能深入理解"池化"这种通用设计思想。
# 10.2 为什么需要线程池
# 10.2.1 创建线程的代价
创建一个线程需要:
- 调用操作系统内核 API(从用户态切换到内核态)
- 分配栈内存(默认 1MB,
-Xss参数控制) - 创建和初始化线程上下文(TCB、寄存器上下文等)
- 线程销毁时资源回收
// 测试线程创建的开销
long start = System.nanoTime();
for (int i = 0; i < 10000; i++) {
Thread t = new Thread(() -> {});
t.start();
t.join();
}
long cost = (System.nanoTime() - start) / 1_000_000;
// 约 500-2000ms(每个线程创建+销毁约 50-200μs)
2
3
4
5
6
7
8
9
如果每个请求都创建新线程,1000 QPS 就意味着每秒创建/销毁 1000 个线程,系统无法承受。
# 10.2.2 池化思想
线程池的核心思想:预先创建一组线程,复用它们来执行不同的任务。
传统方式: 请求 → 创建线程 → 执行任务 → 销毁线程
池化方式: 请求 → 从池中取线程 → 执行任务 → 归还线程到池中
2
好处:
- 降低资源消耗:避免频繁创建销毁线程
- 提高响应速度:任务到达时可以立即执行(不用等创建线程)
- 提高可管理性:统一管理、调优、监控线程资源
- 流量控制:通过队列和拒绝策略实现背压,防止系统过载
# 10.3 ThreadPoolExecutor核心参数
# 10.3.1 七大参数详解
public ThreadPoolExecutor(
int corePoolSize, // 核心线程数(常驻线程)
int maximumPoolSize, // 最大线程数
long keepAliveTime, // 非核心线程空闲存活时间
TimeUnit unit, // 时间单位
BlockingQueue<Runnable> workQueue, // 任务队列
ThreadFactory threadFactory, // 线程工厂
RejectedExecutionHandler handler // 拒绝策略
)
2
3
4
5
6
7
8
9
| 参数 | 作用 | 类比 |
|---|---|---|
| corePoolSize | 核心线程,即使空闲也不销毁 | 正式员工 |
| maximumPoolSize | 最大线程数上限 | 正式+临时工 |
| keepAliveTime | 非核心线程空闲超过此时间则销毁 | 临时工合同到期 |
| workQueue | 任务排队的队列 | 等候区 |
| threadFactory | 创建线程的工厂(可指定名称、优先级等) | HR部门 |
| handler | 队列满且线程满时的处理策略 | 拒客策略 |
ThreadFactory 的重要性:
// 不推荐:使用默认 ThreadFactory(线程名无意义)
new ThreadPoolExecutor(..., Executors.defaultThreadFactory(), ...);
// 线程名:pool-1-thread-1, pool-1-thread-2...
// 推荐:自定义 ThreadFactory,方便问题排查
ThreadFactory factory = new ThreadFactory() {
private final AtomicInteger counter = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("order-pool-" + counter.getAndIncrement());
t.setDaemon(false);
t.setUncaughtExceptionHandler((thread, ex) -> {
log.error("线程异常: " + thread.getName(), ex);
});
return t;
}
};
// 或使用 Guava
ThreadFactory factory = new ThreadFactoryBuilder()
.setNameFormat("order-pool-%d")
.setUncaughtExceptionHandler((t, e) -> log.error("异常", e))
.build();
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# 10.3.2 线程创建规则
疑惑:任务来了先创建线程还是先进队列?
论证:线程池的任务处理顺序是:
提交任务
→ 当前线程数 < corePoolSize ?
→ 是:创建核心线程执行任务
→ 否:任务队列 workQueue 满了吗?
→ 没满:放入队列等待
→ 满了:当前线程数 < maximumPoolSize ?
→ 是:创建非核心线程执行任务
→ 否:执行拒绝策略
2
3
4
5
6
7
8
图解:
提交任务
│
┌───────▼───────┐
│ 线程数<核心数? │
└───┬───────┬───┘
是 否
│ │
┌──────▼─┐ ┌──▼────────┐
│创建核心 │ │队列未满? │
│线程执行 │ └──┬────┬───┘
└────────┘ 是 否
│ │
┌──────▼─┐ ┌─▼──────────┐
│放入队列 │ │线程数<最大数│
└────────┘ └──┬────┬────┘
是 否
│ │
┌──────▼─┐ ┌─▼──────┐
│创建非核│ │拒绝策略 │
│心线程 │ └────────┘
└───────┘
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
关键点:是先入队列,再创建非核心线程。而不是先创建到最大线程数。
# 10.3.3 四种拒绝策略
| 策略 | 行为 | 适用场景 |
|---|---|---|
| AbortPolicy(默认) | 抛出 RejectedExecutionException | 重要任务,不能丢 |
| CallerRunsPolicy | 由提交任务的线程自己执行 | 削峰,降低提交速度 |
| DiscardPolicy | 静默丢弃新任务 | 可丢弃的任务 |
| DiscardOldestPolicy | 丢弃队列中最老的任务,再提交新任务 | 时效性要求高 |
// 自定义拒绝策略(实际生产推荐)
RejectedExecutionHandler customHandler = (r, executor) -> {
// 记录日志
log.warn("线程池已满,任务被拒绝: {}", r.toString());
// 持久化到数据库或消息队列
taskBackupService.save(r);
// 告警通知
alertService.alert("线程池已满!");
};
2
3
4
5
6
7
8
9
# 10.3.4 常见的阻塞队列选择
| 队列类型 | 特点 | 适用场景 |
|---|---|---|
| LinkedBlockingQueue | 无界(默认 Integer.MAX_VALUE) | 需要限制大小! |
| ArrayBlockingQueue | 有界,数组实现 | 通用场景 |
| SynchronousQueue | 容量为0,直接传递 | CachedThreadPool |
| PriorityBlockingQueue | 优先级排序 | 有优先级的任务 |
| DelayQueue | 延迟获取 | 定时任务 |
// LinkedBlockingQueue 必须指定容量!
new LinkedBlockingQueue<>(1000); // ← 正确
new LinkedBlockingQueue<>(); // ← 危险!默认 Integer.MAX_VALUE
// SynchronousQueue 的特点:
// 每个 put 操作必须等待一个 take 操作
// 不存储元素,直接传递任务给线程
// CachedThreadPool 使用它:任务来了立即创建线程(如果没有空闲线程)
2
3
4
5
6
7
8
# 10.4 源码级工作原理
# 10.4.1 ctl状态控制字段
线程池用一个 AtomicInteger 同时存储运行状态和线程数:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 高3位:线程池状态
// 低29位:线程数量(最大约5亿)
private static final int COUNT_BITS = Integer.SIZE - 3; // 29
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1; // 0x1FFFFFFF
// 线程池状态
RUNNING = -1 << 29 // 111:接受新任务,处理队列任务
SHUTDOWN = 0 << 29 // 000:不接受新任务,处理队列任务
STOP = 1 << 29 // 001:不接受,不处理,中断进行中的
TIDYING = 2 << 29 // 010:所有任务终止,workerCount=0
TERMINATED = 3 << 29 // 011:terminated() 执行完
// 拆分方法
private static int runStateOf(int c) { return c & ~COUNT_MASK; } // 高3位
private static int workerCountOf(int c) { return c & COUNT_MASK; } // 低29位
private static int ctlOf(int rs, int wc) { return rs | wc; } // 合并
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
设计精髓:用一个原子变量同时维护两个信息,避免了多变量之间的一致性问题。一个 CAS 操作就能同时更新状态和线程数。
# 10.4.2 execute提交流程
public void execute(Runnable command) {
if (command == null) throw new NullPointerException();
int c = ctl.get();
// 步骤1:线程数 < 核心线程数,创建核心线程
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true)) // true=核心线程
return;
c = ctl.get(); // 添加失败(并发竞争),重新读取 ctl
}
// 步骤2:核心线程已满,尝试放入队列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 双重检查:放入队列后,线程池可能被关闭了
if (!isRunning(recheck) && remove(command))
reject(command); // 线程池已关闭,从队列移除并拒绝
else if (workerCountOf(recheck) == 0)
addWorker(null, false); // 确保有线程处理队列任务
}
// 步骤3:队列已满,尝试创建非核心线程
else if (!addWorker(command, false)) // false=非核心线程
reject(command); // 线程数已达最大,执行拒绝策略
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
步骤2中 workerCountOf(recheck) == 0 的含义:
// 场景:核心线程数为0(如 CachedThreadPool)
// 或者 allowCoreThreadTimeOut=true 且所有核心线程都已超时退出
// 此时任务放入了队列,但没有线程来消费
// 所以需要创建一个线程(firstTask=null,从队列取任务执行)
2
3
4
# 10.4.3 addWorker源码深度分析
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (int c = ctl.get();;) {
// 检查线程池状态:SHUTDOWN 及以上状态不接受新任务
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP)
|| firstTask != null
|| workQueue.isEmpty()))
return false;
for (;;) {
// 检查线程数是否超过限制
if (workerCountOf(c) >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
return false;
// CAS 增加线程数
if (compareAndIncrementWorkerCount(c))
break retry; // CAS 成功,跳出外层循环
c = ctl.get();
// CAS 失败,检查状态是否变化
if (runStateAtLeast(c, SHUTDOWN))
continue retry; // 状态变了,重新检查外层
// 状态没变,只是线程数竞争失败,重试内层 CAS
}
}
// ctl 线程数已经 +1,开始真正创建线程
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask); // 创建 Worker
final Thread t = w.thread; // Worker 内部有一个 Thread
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int c = ctl.get();
if (isRunning(c) || (runStateLessThan(c, STOP) && firstTask == null)) {
workers.add(w); // 加入 workers 集合
workerAdded = true;
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s; // 记录最大线程数
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start(); // 启动线程!
workerStarted = true;
}
}
} finally {
if (!workerStarted)
addWorkerFailed(w); // 失败回滚:线程数 -1,从 workers 移除
}
return workerStarted;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# 10.4.4 Worker线程的运行
每个线程被封装为 Worker 对象(继承了 AQS,实现了不可重入锁):
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
Thread thread; // 工作线程
Runnable firstTask; // 第一个任务(可能为 null)
volatile long completedTasks; // 完成的任务数
Worker(Runnable firstTask) {
setState(-1); // 禁止在 start 前被中断
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
// 不可重入锁实现(用于判断线程是否正在执行任务)
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
Worker 继承 AQS 的原因:
// Worker 实现了一个不可重入的互斥锁:
// 1. 线程正在执行任务时 → state=1(锁定状态)
// 2. 线程空闲等待任务时 → state=0(未锁定状态)
// 3. shutdown() 时只中断空闲线程(tryLock() 成功才中断)
// 4. 正在执行任务的线程不会被 shutdown() 中断
// 为什么不用 ReentrantLock?
// 因为 ReentrantLock 可重入,一个线程可以多次获取锁
// 而 Worker 需要不可重入:防止 shutdownNow 时重复中断
2
3
4
5
6
7
8
9
# 10.4.5 线程复用的秘密
疑惑:线程执行完一个任务后不是就结束了吗?怎么复用?
论证:秘密在 runWorker() 方法中。线程执行完第一个任务后,不会结束,而是进入循环,从队列中不断取任务执行:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // 允许中断(将 state 从 -1 设为 0)
boolean completedAbruptly = true;
try {
// 核心循环:执行 firstTask 或从队列取任务
while (task != null || (task = getTask()) != null) {
w.lock(); // 标记线程正在执行任务
// 如果线程池正在停止,确保线程被中断
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)))
&& !wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task); // 钩子方法(可重写)
try {
task.run(); // 执行用户的任务!
afterExecute(task, null); // 钩子方法
} catch (Throwable ex) {
afterExecute(task, ex); // 异常也会调用
throw ex;
}
} finally {
task = null;
w.completedTasks++;
w.unlock(); // 标记线程空闲
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly); // 线程退出处理
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# 10.4.6 getTask与线程回收
private Runnable getTask() {
boolean timedOut = false;
for (;;) {
int c = ctl.get();
// 线程池已 SHUTDOWN 且队列为空,或线程池已 STOP → 返回 null
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
decrementWorkerCount();
return null; // 返回 null → runWorker 的 while 循环结束 → 线程退出
}
int wc = workerCountOf(c);
// 是否需要超时回收?
// 1. allowCoreThreadTimeOut=true → 核心线程也会超时
// 2. 当前线程数 > corePoolSize → 非核心线程需要超时
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 超时了或线程数超过最大值 → 尝试减少线程数
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null; // 返回 null → 线程退出
continue;
}
try {
// 关键!
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : // 超时等待
workQueue.take(); // 永久等待
if (r != null)
return r;
timedOut = true; // poll 超时返回 null
} catch (InterruptedException retry) {
timedOut = false; // 被中断,重试
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
线程复用本质:
线程执行完任务后不退出,而是在 while 循环中
通过 BlockingQueue.take()/poll() 阻塞等待新任务。
核心线程: workQueue.take() → 永久阻塞,直到有新任务
非核心线程: workQueue.poll(keepAliveTime) → 超时后返回 null → 线程退出
线程一直活着 → 新任务来了 → 继续执行 → 再等待
2
3
4
5
6
7
# 10.5 线程池的生命周期
# 10.5.1 五种状态转换
RUNNING (初始状态) ← 唯一接受新任务的状态
│ shutdown()
↓
SHUTDOWN (不接受新任务,处理队列中的)
│ 队列空 + 线程数为0
↓
TIDYING (所有任务终止)
│ terminated() 回调
↓
TERMINATED (终态)
RUNNING
│ shutdownNow()
↓
STOP (中断所有线程,不处理队列)
│ 线程数为0
↓
TIDYING → TERMINATED
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 10.5.2 shutdown与shutdownNow的区别
| 方法 | 新任务 | 队列中的任务 | 正在执行的任务 | 返回值 |
|---|---|---|---|---|
| shutdown() | 拒绝 | 继续处理 | 等待完成 | void |
| shutdownNow() | 拒绝 | 不处理(返回) | 发送中断 | List<Runnable> |
# 10.5.3 优雅关闭线程池
// 优雅关闭的标准模板
public void gracefulShutdown(ExecutorService executor) {
executor.shutdown(); // 不接受新任务,等待已有任务完成
try {
// 等待一段时间,让任务完成
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
executor.shutdownNow(); // 超时后强制关闭
// 再等待一段时间
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
log.error("线程池无法关闭");
}
}
} catch (InterruptedException e) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
}
// Spring 中的线程池关闭
@Bean(destroyMethod = "shutdown")
public ExecutorService executorService() {
return new ThreadPoolExecutor(...);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# 10.6 实践中的参数设置
# 10.6.1 CPU密集型任务
特点:大量计算,几乎不阻塞。
// 核心线程数 = CPU核心数 + 1
int cpuCores = Runtime.getRuntime().availableProcessors();
int coreSize = cpuCores + 1; // +1 是为了当某线程偶尔因页缺失等暂停时能充分利用CPU
2
3
# 10.6.2 IO密集型任务
特点:大量等待 IO(网络、磁盘),CPU 空闲时间长。
// 核心线程数 = CPU核心数 × 2 (经验值)
// 更精确的公式:
// 最优线程数 = CPU核心数 × (1 + IO等待时间/CPU计算时间)
// 例如:4核CPU,IO等待时间200ms,CPU计算时间50ms
// 最优线程数 = 4 × (1 + 200/50) = 4 × 5 = 20
int coreSize = cpuCores * 2;
2
3
4
5
6
7
8
更科学的方式——基于压测:
1. 先设一个初始值(如 CPU核心数 × 2)
2. 逐步增加线程数
3. 观察吞吐量和响应时间
4. 找到吞吐量不再增长、响应时间开始升高的拐点
5. 该拐点就是最优线程数
2
3
4
5
# 10.6.3 为什么不推荐Executors
// 1. newFixedThreadPool — LinkedBlockingQueue 无界队列,可能OOM
Executors.newFixedThreadPool(10);
// 等价于 new ThreadPoolExecutor(10, 10, 0L, new LinkedBlockingQueue<>())
// 队列无限增长,任务堆积可能耗尽内存
// 2. newCachedThreadPool — 最大线程数为 Integer.MAX_VALUE,可能OOM
Executors.newCachedThreadPool();
// 等价于 new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60s, new SynchronousQueue<>())
// 瞬时大量请求会创建大量线程,耗尽系统资源
// 3. newSingleThreadExecutor — 同样是无界队列
Executors.newSingleThreadExecutor();
// 等价于 new ThreadPoolExecutor(1, 1, 0L, new LinkedBlockingQueue<>())
// 正确做法:手动创建,明确每个参数
ThreadPoolExecutor executor = new ThreadPoolExecutor(
10, // 核心线程数
20, // 最大线程数
60, TimeUnit.SECONDS, // 非核心线程存活时间
new LinkedBlockingQueue<>(1000), // 有界队列!
new ThreadFactoryBuilder().setNameFormat("biz-pool-%d").build(),
new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略
);
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# 10.6.4 动态线程池
生产环境中,很难一开始就确定最优参数。动态线程池允许运行时调整参数:
// ThreadPoolExecutor 提供了 setter 方法
executor.setCorePoolSize(20); // 动态调整核心线程数
executor.setMaximumPoolSize(40); // 动态调整最大线程数
executor.setKeepAliveTime(30, TimeUnit.SECONDS);
// 注意:
// 1. 增大 corePoolSize 时,如果当前线程数 < 新的 corePoolSize
// 且队列有任务,会立即创建新线程
// 2. 减小 corePoolSize 时,多余的线程在空闲后自动回收
// 但 workQueue 的容量无法动态调整(LinkedBlockingQueue 的 capacity 是 final 的)
// 解决方案:使用可调整容量的队列(如开源的 ResizableCapacityLinkedBlockingQueue)
2
3
4
5
6
7
8
9
10
11
12
# 10.7 线程池的异常处理
// 问题:线程池中的任务抛异常会怎样?
// 1. execute 提交:异常会传播,线程终止并被新线程替换
executor.execute(() -> {
throw new RuntimeException("boom");
// 异常会被 UncaughtExceptionHandler 处理
// 这个线程死亡,线程池创建新线程替换
});
// 2. submit 提交:异常被封装在 Future 中,调用 get() 时抛出
Future<?> future = executor.submit(() -> {
throw new RuntimeException("boom");
// 异常不会传播!线程不会死亡
// 异常封装在 Future 中
});
future.get(); // 这里抛出 ExecutionException
// 最佳实践:
// 方案1: 在任务内部 try-catch
executor.execute(() -> {
try {
riskyOperation();
} catch (Exception e) {
log.error("任务执行失败", e);
}
});
// 方案2: 重写 afterExecute 钩子方法
ThreadPoolExecutor executor = new ThreadPoolExecutor(...) {
@Override
protected void afterExecute(Runnable r, Throwable t) {
if (t != null) {
log.error("任务异常", t);
}
// 处理 submit 的异常
if (t == null && r instanceof Future<?>) {
try {
((Future<?>) r).get();
} catch (ExecutionException e) {
log.error("任务异常", e.getCause());
} catch (Exception e) {
// ignore
}
}
}
};
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# 10.8 线程池监控与告警
// ThreadPoolExecutor 提供了丰富的监控方法
ThreadPoolExecutor executor = ...;
// 核心监控指标
int poolSize = executor.getPoolSize(); // 当前线程数
int activeCount = executor.getActiveCount(); // 活跃线程数
long completedCount = executor.getCompletedTaskCount(); // 已完成任务数
long taskCount = executor.getTaskCount(); // 总任务数
int queueSize = executor.getQueue().size(); // 队列中等待的任务数
int largestPoolSize = executor.getLargestPoolSize(); // 曾经的最大线程数
// 监控示例
ScheduledExecutorService monitor = Executors.newSingleThreadScheduledExecutor();
monitor.scheduleAtFixedRate(() -> {
log.info("线程池状态: 线程数={}, 活跃={}, 队列={}, 已完成={}",
executor.getPoolSize(),
executor.getActiveCount(),
executor.getQueue().size(),
executor.getCompletedTaskCount());
// 告警:队列使用率超过 80%
double queueUsage = executor.getQueue().size() * 1.0
/ ((LinkedBlockingQueue) executor.getQueue()).remainingCapacity();
if (queueUsage > 0.8) {
alert("线程池队列使用率超过80%!");
}
}, 0, 10, TimeUnit.SECONDS);
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# 10.9 ScheduledThreadPoolExecutor
// 定时/周期性任务的线程池
ScheduledExecutorService scheduler = new ScheduledThreadPoolExecutor(4);
// 延迟执行
scheduler.schedule(() -> {
System.out.println("延迟3秒执行");
}, 3, TimeUnit.SECONDS);
// 固定频率执行(不考虑任务执行时间)
scheduler.scheduleAtFixedRate(() -> {
System.out.println("每5秒执行一次");
}, 0, 5, TimeUnit.SECONDS);
// 固定延迟执行(上一次执行完成后等待指定时间再执行)
scheduler.scheduleWithFixedDelay(() -> {
System.out.println("上次执行完成后等3秒再执行");
}, 0, 3, TimeUnit.SECONDS);
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
底层实现:内部使用 DelayedWorkQueue(基于堆的优先级队列),按执行时间排序。
# 10.10 ForkJoinPool工作窃取
// ForkJoinPool(JDK 7+):分治任务的线程池
// 核心:工作窃取(Work-Stealing)算法
// 每个线程有自己的双端队列(Deque)
// 线程从自己队列的头部取任务
// 当自己的队列为空时,从其他线程队列的尾部"窃取"任务
// 使用场景:递归分治任务
ForkJoinPool pool = new ForkJoinPool(4);
long result = pool.invoke(new SumTask(arr, 0, arr.length));
class SumTask extends RecursiveTask<Long> {
private int[] arr;
private int from, to;
private static final int THRESHOLD = 1000;
@Override
protected Long compute() {
if (to - from <= THRESHOLD) {
// 小任务直接计算
long sum = 0;
for (int i = from; i < to; i++) sum += arr[i];
return sum;
}
// 大任务拆分
int mid = (from + to) / 2;
SumTask left = new SumTask(arr, from, mid);
SumTask right = new SumTask(arr, mid, to);
left.fork(); // 异步执行左半部分
long rightResult = right.compute(); // 同步执行右半部分
long leftResult = left.join(); // 等待左半部分结果
return leftResult + rightResult;
}
}
// Java 8 的并行流 parallelStream() 底层就使用 ForkJoinPool.commonPool()
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# 10.11 常见面试深度问题
Q1:线程池中的线程抛异常后会怎样?
execute 提交的任务抛异常:线程终止,线程池创建新线程替换。submit 提交的任务抛异常:异常被封装在 Future 中,线程不终止。
Q2:核心线程可以被回收吗?
默认不回收。但可以通过 allowCoreThreadTimeOut(true) 设置核心线程也受 keepAliveTime 限制,空闲超时后回收。
Q3:线程池的线程是怎么保活的?
通过 BlockingQueue.take() 无限期阻塞等待。线程处于 WAITING 状态,不消耗 CPU。
Q4:为什么 Worker 继承 AQS 而不是用 ReentrantLock?
因为 Worker 需要不可重入锁。在 shutdown() 时,正在执行任务的线程(已持有锁)不应该被中断。如果用 ReentrantLock(可重入),shutdown 方法中 tryLock 会成功,可能中断正在执行的任务。
# 10.12 总结与核心要点
线程池设计哲学:
- 资源复用:线程不销毁,通过阻塞队列等待新任务,实现线程复用
- 弹性扩缩:核心线程常驻 + 非核心线程按需创建/销毁
- 背压机制:队列 + 拒绝策略,防止任务洪峰压垮系统
- 单变量原子状态:ctl 用一个 int 同时管理状态和线程数
核心要点速查:
| 问题 | 答案 |
|---|---|
| 任务执行优先级 | 核心线程 → 任务队列 → 非核心线程 → 拒绝策略 |
| 线程复用本质 | Worker 在 while 循环中调用 BlockingQueue.take() 阻塞等待 |
| 为什么不用 Executors | 无界队列或无限线程数,可能 OOM |
| execute vs submit | execute 异常传播,submit 异常封装在 Future |
| shutdown vs shutdownNow | shutdown 等待完成,shutdownNow 中断并返回队列 |
| Worker 为何继承 AQS | 不可重入锁,防止 shutdown 中断执行中的线程 |
| 动态调整参数 | setCorePoolSize/setMaximumPoolSize 运行时生效 |