编程进阶网 编程进阶网
首页
  • 计算机原理
  • 操作系统
  • 网络协议
  • 数据库原理
  • 面向对象
  • 设计原则
  • 设计模式
  • 系统架构
  • 性能优化
  • 编程原理
  • 方案设计
  • 稳定可靠
  • 工程运维
  • 基础认知
  • 线性结构
  • 树与哈希
  • 工业级实现
  • 算法思想
  • 实战与综合
  • 算法题考核
  • C语言入门
  • C综合案例
  • C专栏博客
  • C标准集库
  • C++入门教程
  • C++综合案例
  • C++专栏博客
  • C++开发技巧
  • Java入门教程
  • Java综合案例
  • Java专栏博客
  • Go入门教程
  • Go综合案例
  • Go专栏博客
  • Go开发技巧
  • JavaScript入门
  • JavaScript高级
  • Android库解读
  • Android专栏
  • Android智能硬件
  • iOS ObjC入门
  • iOS Swift入门
  • iOS入门精通
  • Web之Html手册
  • Web之TypeScript
  • Web之Vue高级进阶
  • Linux之QML入门
  • Linux之QT核心库
  • Linux实践开发
  • Python教程
  • Shell&Bash教程
  • 工具脚本
  • 自动化脚本
  • 质量保障
  • 产品思考
  • 软实力
  • 开发流程
  • Git应用
  • 技术模版
  • 技术规范
  • Markdown
  • Mermaid
  • 开源协议
  • JSON工具
  • 文本工具
  • 图片处理
  • 文档转化
  • 代码压缩
  • 关于我
  • 自我精进
  • 职场管理
  • 职场面试
  • 心情杂货
  • 友情链接

杨充

专注编程 · 终身学习者
首页
  • 计算机原理
  • 操作系统
  • 网络协议
  • 数据库原理
  • 面向对象
  • 设计原则
  • 设计模式
  • 系统架构
  • 性能优化
  • 编程原理
  • 方案设计
  • 稳定可靠
  • 工程运维
  • 基础认知
  • 线性结构
  • 树与哈希
  • 工业级实现
  • 算法思想
  • 实战与综合
  • 算法题考核
  • C语言入门
  • C综合案例
  • C专栏博客
  • C标准集库
  • C++入门教程
  • C++综合案例
  • C++专栏博客
  • C++开发技巧
  • Java入门教程
  • Java综合案例
  • Java专栏博客
  • Go入门教程
  • Go综合案例
  • Go专栏博客
  • Go开发技巧
  • JavaScript入门
  • JavaScript高级
  • Android库解读
  • Android专栏
  • Android智能硬件
  • iOS ObjC入门
  • iOS Swift入门
  • iOS入门精通
  • Web之Html手册
  • Web之TypeScript
  • Web之Vue高级进阶
  • Linux之QML入门
  • Linux之QT核心库
  • Linux实践开发
  • Python教程
  • Shell&Bash教程
  • 工具脚本
  • 自动化脚本
  • 质量保障
  • 产品思考
  • 软实力
  • 开发流程
  • Git应用
  • 技术模版
  • 技术规范
  • Markdown
  • Mermaid
  • 开源协议
  • JSON工具
  • 文本工具
  • 图片处理
  • 文档转化
  • 代码压缩
  • 关于我
  • 自我精进
  • 职场管理
  • 职场面试
  • 心情杂货
  • 友情链接
  • README
  • C语言入门精通

  • Cpp入门到精通

  • Java入门精通

    • README
    • 入门教程

    • 综合案例

    • 专栏博客

      • README
      • JVM内存模型与对象
      • 类加载与双亲委派
      • 垃圾回收与GC调优
      • 异常体系与JVM机制
      • 字节码指令集javap实战
      • JIT编译与去优化机制
      • JVM性能诊断工具链
      • OOM八大现场全景剖析
      • JVM参数调优全景图
      • GraalVM与AOT编译原理
      • HashMap底层哈希设计
      • String不可变与常量池
      • ArrayList与LinkedList源码
      • ConcurrentHashMap并发
      • TreeMap与红黑树原理
      • LinkedHashMap与LRU实现
      • Java数字类型原理
      • Object通用方法的契约
      • 泛型擦除与类型系统
      • 枚举原理与最佳实践
      • 注解原理与编译期处理
      • Lambda与引用底层原理
      • Stream原理与流水线设计
      • Optional设计原理
      • Record密封类与模式
      • 反射机制与动态代理
      • MethodHandle与VarHandle
      • 三大字节码框架对比
      • JavaAgent与Instrumentation机制
      • AOP三种实现路线对比
      • synchronized与锁升级
      • volatile与JMM内存模型
      • 线程池核心源码设计
        • 10.1 开篇疑问
        • 10.2 为什么需要线程池
          • 10.2.1 创建线程的代价
          • 10.2.2 池化思想
        • 10.3 ThreadPoolExecutor核心参数
          • 10.3.1 七大参数详解
          • 10.3.2 线程创建规则
          • 10.3.3 四种拒绝策略
          • 10.3.4 常见的阻塞队列选择
        • 10.4 源码级工作原理
          • 10.4.1 ctl状态控制字段
          • 10.4.2 execute提交流程
          • 10.4.3 addWorker源码深度分析
          • 10.4.4 Worker线程的运行
          • 10.4.5 线程复用的秘密
          • 10.4.6 getTask与线程回收
        • 10.5 线程池的生命周期
          • 10.5.1 五种状态转换
          • 10.5.2 shutdown与shutdownNow的区别
          • 10.5.3 优雅关闭线程池
        • 10.6 实践中的参数设置
          • 10.6.1 CPU密集型任务
          • 10.6.2 IO密集型任务
          • 10.6.3 为什么不推荐Executors
          • 10.6.4 动态线程池
        • 10.7 线程池的异常处理
        • 10.8 线程池监控与告警
        • 10.9 ScheduledThreadPoolExecutor
        • 10.10 ForkJoinPool工作窃取
        • 10.11 常见面试深度问题
        • 10.12 总结与核心要点
      • Thread线程生命周期
      • AQS同步框架源码
      • 并发锁三剑客
      • CAS和Atomic深入分析
      • 五大同步器对比
      • CompletableFuture异步
      • IO模型演进BIO到AIO
      • ByteBuffer与堆外内存
      • 序列化原理与替代方案
      • 文件IO与NIO.2
      • 面向对象的真意
      • JDK设计模式上
      • JDK设计模式下
      • SPI与模块化设计
  • Go入门到精通

  • JavaScript入门

  • CodeX
  • Java入门精通
  • 专栏博客
杨充
2026-06-02
目录

线程池核心源码设计

# 33.线程池核心源码设计

# 目录介绍

  • 10.1 开篇疑问
  • 10.2 为什么需要线程池
    • 10.2.1 创建线程的代价
    • 10.2.2 池化思想
  • 10.3 ThreadPoolExecutor核心参数
    • 10.3.1 七大参数详解
    • 10.3.2 线程创建规则
    • 10.3.3 四种拒绝策略
    • 10.3.4 常见的阻塞队列选择
  • 10.4 源码级工作原理
    • 10.4.1 ctl状态控制字段
    • 10.4.2 execute提交流程
    • 10.4.3 addWorker源码深度分析
    • 10.4.4 Worker线程的运行
    • 10.4.5 线程复用的秘密
    • 10.4.6 getTask与线程回收
  • 10.5 线程池的生命周期
    • 10.5.1 五种状态转换
    • 10.5.2 shutdown与shutdownNow的区别
    • 10.5.3 优雅关闭线程池
  • 10.6 实践中的参数设置
    • 10.6.1 CPU密集型任务
    • 10.6.2 IO密集型任务
    • 10.6.3 为什么不推荐Executors
    • 10.6.4 动态线程池
  • 10.7 线程池的异常处理
  • 10.8 线程池监控与告警
  • 10.9 ScheduledThreadPoolExecutor
  • 10.10 ForkJoinPool工作窃取
  • 10.11 常见面试深度问题
  • 10.12 总结与核心要点

# 10.1 开篇疑问

疑惑:线程池的核心线程数和最大线程数有什么区别?任务提交后先进队列还是先创建线程?线程池里的线程是如何做到"复用"的?Executors 创建的线程池为什么阿里规范不推荐使用?线程池的异常会怎么处理?

答疑:线程池是 Java 并发编程最重要的工具之一。理解它的源码设计,不仅能正确使用线程池,还能深入理解"池化"这种通用设计思想。

# 10.2 为什么需要线程池

# 10.2.1 创建线程的代价

创建一个线程需要:

  1. 调用操作系统内核 API(从用户态切换到内核态)
  2. 分配栈内存(默认 1MB,-Xss 参数控制)
  3. 创建和初始化线程上下文(TCB、寄存器上下文等)
  4. 线程销毁时资源回收
// 测试线程创建的开销
long start = System.nanoTime();
for (int i = 0; i < 10000; i++) {
    Thread t = new Thread(() -> {});
    t.start();
    t.join();
}
long cost = (System.nanoTime() - start) / 1_000_000;
// 约 500-2000ms(每个线程创建+销毁约 50-200μs)
1
2
3
4
5
6
7
8
9

如果每个请求都创建新线程,1000 QPS 就意味着每秒创建/销毁 1000 个线程,系统无法承受。

# 10.2.2 池化思想

线程池的核心思想:预先创建一组线程,复用它们来执行不同的任务。

传统方式: 请求 → 创建线程 → 执行任务 → 销毁线程
池化方式: 请求 → 从池中取线程 → 执行任务 → 归还线程到池中
1
2

好处:

  1. 降低资源消耗:避免频繁创建销毁线程
  2. 提高响应速度:任务到达时可以立即执行(不用等创建线程)
  3. 提高可管理性:统一管理、调优、监控线程资源
  4. 流量控制:通过队列和拒绝策略实现背压,防止系统过载

# 10.3 ThreadPoolExecutor核心参数

# 10.3.1 七大参数详解

public ThreadPoolExecutor(
    int corePoolSize,        // 核心线程数(常驻线程)
    int maximumPoolSize,     // 最大线程数
    long keepAliveTime,      // 非核心线程空闲存活时间
    TimeUnit unit,           // 时间单位
    BlockingQueue<Runnable> workQueue,     // 任务队列
    ThreadFactory threadFactory,           // 线程工厂
    RejectedExecutionHandler handler       // 拒绝策略
)
1
2
3
4
5
6
7
8
9
参数 作用 类比
corePoolSize 核心线程,即使空闲也不销毁 正式员工
maximumPoolSize 最大线程数上限 正式+临时工
keepAliveTime 非核心线程空闲超过此时间则销毁 临时工合同到期
workQueue 任务排队的队列 等候区
threadFactory 创建线程的工厂(可指定名称、优先级等) HR部门
handler 队列满且线程满时的处理策略 拒客策略

ThreadFactory 的重要性:

// 不推荐:使用默认 ThreadFactory(线程名无意义)
new ThreadPoolExecutor(..., Executors.defaultThreadFactory(), ...);
// 线程名:pool-1-thread-1, pool-1-thread-2...

// 推荐:自定义 ThreadFactory,方便问题排查
ThreadFactory factory = new ThreadFactory() {
    private final AtomicInteger counter = new AtomicInteger(0);
    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(r);
        t.setName("order-pool-" + counter.getAndIncrement());
        t.setDaemon(false);
        t.setUncaughtExceptionHandler((thread, ex) -> {
            log.error("线程异常: " + thread.getName(), ex);
        });
        return t;
    }
};

// 或使用 Guava
ThreadFactory factory = new ThreadFactoryBuilder()
    .setNameFormat("order-pool-%d")
    .setUncaughtExceptionHandler((t, e) -> log.error("异常", e))
    .build();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

# 10.3.2 线程创建规则

疑惑:任务来了先创建线程还是先进队列?

论证:线程池的任务处理顺序是:

提交任务
  → 当前线程数 < corePoolSize ?
    → 是:创建核心线程执行任务
    → 否:任务队列 workQueue 满了吗?
      → 没满:放入队列等待
      → 满了:当前线程数 < maximumPoolSize ?
        → 是:创建非核心线程执行任务
        → 否:执行拒绝策略
1
2
3
4
5
6
7
8

图解:

                    提交任务
                      │
              ┌───────▼───────┐
              │ 线程数<核心数? │
              └───┬───────┬───┘
                 是       否
                  │       │
           ┌──────▼─┐  ┌──▼────────┐
           │创建核心 │  │队列未满?  │
           │线程执行 │  └──┬────┬───┘
           └────────┘    是    否
                          │     │
                   ┌──────▼─┐ ┌─▼──────────┐
                   │放入队列 │ │线程数<最大数│
                   └────────┘ └──┬────┬────┘
                                是    否
                                │     │
                         ┌──────▼─┐ ┌─▼──────┐
                         │创建非核│ │拒绝策略 │
                         │心线程  │ └────────┘
                         └───────┘
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

关键点:是先入队列,再创建非核心线程。而不是先创建到最大线程数。

# 10.3.3 四种拒绝策略

策略 行为 适用场景
AbortPolicy(默认) 抛出 RejectedExecutionException 重要任务,不能丢
CallerRunsPolicy 由提交任务的线程自己执行 削峰,降低提交速度
DiscardPolicy 静默丢弃新任务 可丢弃的任务
DiscardOldestPolicy 丢弃队列中最老的任务,再提交新任务 时效性要求高
// 自定义拒绝策略(实际生产推荐)
RejectedExecutionHandler customHandler = (r, executor) -> {
    // 记录日志
    log.warn("线程池已满,任务被拒绝: {}", r.toString());
    // 持久化到数据库或消息队列
    taskBackupService.save(r);
    // 告警通知
    alertService.alert("线程池已满!");
};
1
2
3
4
5
6
7
8
9

# 10.3.4 常见的阻塞队列选择

队列类型 特点 适用场景
LinkedBlockingQueue 无界(默认 Integer.MAX_VALUE) 需要限制大小!
ArrayBlockingQueue 有界,数组实现 通用场景
SynchronousQueue 容量为0,直接传递 CachedThreadPool
PriorityBlockingQueue 优先级排序 有优先级的任务
DelayQueue 延迟获取 定时任务
// LinkedBlockingQueue 必须指定容量!
new LinkedBlockingQueue<>(1000);  // ← 正确
new LinkedBlockingQueue<>();       // ← 危险!默认 Integer.MAX_VALUE

// SynchronousQueue 的特点:
// 每个 put 操作必须等待一个 take 操作
// 不存储元素,直接传递任务给线程
// CachedThreadPool 使用它:任务来了立即创建线程(如果没有空闲线程)
1
2
3
4
5
6
7
8

# 10.4 源码级工作原理

# 10.4.1 ctl状态控制字段

线程池用一个 AtomicInteger 同时存储运行状态和线程数:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

// 高3位:线程池状态
// 低29位:线程数量(最大约5亿)
private static final int COUNT_BITS = Integer.SIZE - 3;  // 29
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1; // 0x1FFFFFFF

// 线程池状态
RUNNING    = -1 << 29  // 111:接受新任务,处理队列任务
SHUTDOWN   =  0 << 29  // 000:不接受新任务,处理队列任务
STOP       =  1 << 29  // 001:不接受,不处理,中断进行中的
TIDYING    =  2 << 29  // 010:所有任务终止,workerCount=0
TERMINATED =  3 << 29  // 011:terminated() 执行完

// 拆分方法
private static int runStateOf(int c)     { return c & ~COUNT_MASK; }  // 高3位
private static int workerCountOf(int c)  { return c & COUNT_MASK; }   // 低29位
private static int ctlOf(int rs, int wc) { return rs | wc; }         // 合并
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

设计精髓:用一个原子变量同时维护两个信息,避免了多变量之间的一致性问题。一个 CAS 操作就能同时更新状态和线程数。

# 10.4.2 execute提交流程

public void execute(Runnable command) {
    if (command == null) throw new NullPointerException();
    
    int c = ctl.get();
    
    // 步骤1:线程数 < 核心线程数,创建核心线程
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))  // true=核心线程
            return;
        c = ctl.get();  // 添加失败(并发竞争),重新读取 ctl
    }
    
    // 步骤2:核心线程已满,尝试放入队列
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // 双重检查:放入队列后,线程池可能被关闭了
        if (!isRunning(recheck) && remove(command))
            reject(command);  // 线程池已关闭,从队列移除并拒绝
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);  // 确保有线程处理队列任务
    }
    
    // 步骤3:队列已满,尝试创建非核心线程
    else if (!addWorker(command, false))  // false=非核心线程
        reject(command);  // 线程数已达最大,执行拒绝策略
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

步骤2中 workerCountOf(recheck) == 0 的含义:

// 场景:核心线程数为0(如 CachedThreadPool)
// 或者 allowCoreThreadTimeOut=true 且所有核心线程都已超时退出
// 此时任务放入了队列,但没有线程来消费
// 所以需要创建一个线程(firstTask=null,从队列取任务执行)
1
2
3
4

# 10.4.3 addWorker源码深度分析

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (int c = ctl.get();;) {
        // 检查线程池状态:SHUTDOWN 及以上状态不接受新任务
        if (runStateAtLeast(c, SHUTDOWN)
            && (runStateAtLeast(c, STOP)
                || firstTask != null
                || workQueue.isEmpty()))
            return false;
        
        for (;;) {
            // 检查线程数是否超过限制
            if (workerCountOf(c) >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
                return false;
            // CAS 增加线程数
            if (compareAndIncrementWorkerCount(c))
                break retry;  // CAS 成功,跳出外层循环
            c = ctl.get();
            // CAS 失败,检查状态是否变化
            if (runStateAtLeast(c, SHUTDOWN))
                continue retry;  // 状态变了,重新检查外层
            // 状态没变,只是线程数竞争失败,重试内层 CAS
        }
    }
    
    // ctl 线程数已经 +1,开始真正创建线程
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);  // 创建 Worker
        final Thread t = w.thread;   // Worker 内部有一个 Thread
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                int c = ctl.get();
                if (isRunning(c) || (runStateLessThan(c, STOP) && firstTask == null)) {
                    workers.add(w);  // 加入 workers 集合
                    workerAdded = true;
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;  // 记录最大线程数
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();  // 启动线程!
                workerStarted = true;
            }
        }
    } finally {
        if (!workerStarted)
            addWorkerFailed(w);  // 失败回滚:线程数 -1,从 workers 移除
    }
    return workerStarted;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58

# 10.4.4 Worker线程的运行

每个线程被封装为 Worker 对象(继承了 AQS,实现了不可重入锁):

private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
    Thread thread;          // 工作线程
    Runnable firstTask;     // 第一个任务(可能为 null)
    volatile long completedTasks; // 完成的任务数
    
    Worker(Runnable firstTask) {
        setState(-1);  // 禁止在 start 前被中断
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }
    
    public void run() {
        runWorker(this);
    }
    
    // 不可重入锁实现(用于判断线程是否正在执行任务)
    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

Worker 继承 AQS 的原因:

// Worker 实现了一个不可重入的互斥锁:
// 1. 线程正在执行任务时 → state=1(锁定状态)
// 2. 线程空闲等待任务时 → state=0(未锁定状态)
// 3. shutdown() 时只中断空闲线程(tryLock() 成功才中断)
// 4. 正在执行任务的线程不会被 shutdown() 中断

// 为什么不用 ReentrantLock?
// 因为 ReentrantLock 可重入,一个线程可以多次获取锁
// 而 Worker 需要不可重入:防止 shutdownNow 时重复中断
1
2
3
4
5
6
7
8
9

# 10.4.5 线程复用的秘密

疑惑:线程执行完一个任务后不是就结束了吗?怎么复用?

论证:秘密在 runWorker() 方法中。线程执行完第一个任务后,不会结束,而是进入循环,从队列中不断取任务执行:

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // 允许中断(将 state 从 -1 设为 0)
    boolean completedAbruptly = true;
    try {
        // 核心循环:执行 firstTask 或从队列取任务
        while (task != null || (task = getTask()) != null) {
            w.lock();  // 标记线程正在执行任务
            // 如果线程池正在停止,确保线程被中断
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)))
                && !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);  // 钩子方法(可重写)
                try {
                    task.run();           // 执行用户的任务!
                    afterExecute(task, null);  // 钩子方法
                } catch (Throwable ex) {
                    afterExecute(task, ex);    // 异常也会调用
                    throw ex;
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();  // 标记线程空闲
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);  // 线程退出处理
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35

# 10.4.6 getTask与线程回收

private Runnable getTask() {
    boolean timedOut = false;
    
    for (;;) {
        int c = ctl.get();
        
        // 线程池已 SHUTDOWN 且队列为空,或线程池已 STOP → 返回 null
        if (runStateAtLeast(c, SHUTDOWN)
            && (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;  // 返回 null → runWorker 的 while 循环结束 → 线程退出
        }
        
        int wc = workerCountOf(c);
        
        // 是否需要超时回收?
        // 1. allowCoreThreadTimeOut=true → 核心线程也会超时
        // 2. 当前线程数 > corePoolSize → 非核心线程需要超时
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        
        // 超时了或线程数超过最大值 → 尝试减少线程数
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;  // 返回 null → 线程退出
            continue;
        }
        
        try {
            // 关键!
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :  // 超时等待
                workQueue.take();  // 永久等待
            if (r != null)
                return r;
            timedOut = true;  // poll 超时返回 null
        } catch (InterruptedException retry) {
            timedOut = false;  // 被中断,重试
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41

线程复用本质:

线程执行完任务后不退出,而是在 while 循环中
通过 BlockingQueue.take()/poll() 阻塞等待新任务。
  
核心线程: workQueue.take()  → 永久阻塞,直到有新任务
非核心线程: workQueue.poll(keepAliveTime) → 超时后返回 null → 线程退出

线程一直活着 → 新任务来了 → 继续执行 → 再等待
1
2
3
4
5
6
7

# 10.5 线程池的生命周期

# 10.5.1 五种状态转换

RUNNING (初始状态) ← 唯一接受新任务的状态
  │ shutdown()
  ↓
SHUTDOWN (不接受新任务,处理队列中的)
  │ 队列空 + 线程数为0
  ↓
TIDYING (所有任务终止)
  │ terminated() 回调
  ↓
TERMINATED (终态)

RUNNING
  │ shutdownNow()
  ↓
STOP (中断所有线程,不处理队列)
  │ 线程数为0
  ↓
TIDYING → TERMINATED
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

# 10.5.2 shutdown与shutdownNow的区别

方法 新任务 队列中的任务 正在执行的任务 返回值
shutdown() 拒绝 继续处理 等待完成 void
shutdownNow() 拒绝 不处理(返回) 发送中断 List<Runnable>

# 10.5.3 优雅关闭线程池

// 优雅关闭的标准模板
public void gracefulShutdown(ExecutorService executor) {
    executor.shutdown();  // 不接受新任务,等待已有任务完成
    try {
        // 等待一段时间,让任务完成
        if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
            executor.shutdownNow();  // 超时后强制关闭
            // 再等待一段时间
            if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
                log.error("线程池无法关闭");
            }
        }
    } catch (InterruptedException e) {
        executor.shutdownNow();
        Thread.currentThread().interrupt();
    }
}

// Spring 中的线程池关闭
@Bean(destroyMethod = "shutdown")
public ExecutorService executorService() {
    return new ThreadPoolExecutor(...);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

# 10.6 实践中的参数设置

# 10.6.1 CPU密集型任务

特点:大量计算,几乎不阻塞。

// 核心线程数 = CPU核心数 + 1
int cpuCores = Runtime.getRuntime().availableProcessors();
int coreSize = cpuCores + 1;  // +1 是为了当某线程偶尔因页缺失等暂停时能充分利用CPU
1
2
3

# 10.6.2 IO密集型任务

特点:大量等待 IO(网络、磁盘),CPU 空闲时间长。

// 核心线程数 = CPU核心数 × 2 (经验值)
// 更精确的公式:
// 最优线程数 = CPU核心数 × (1 + IO等待时间/CPU计算时间)

// 例如:4核CPU,IO等待时间200ms,CPU计算时间50ms
// 最优线程数 = 4 × (1 + 200/50) = 4 × 5 = 20

int coreSize = cpuCores * 2;
1
2
3
4
5
6
7
8

更科学的方式——基于压测:

1. 先设一个初始值(如 CPU核心数 × 2)
2. 逐步增加线程数
3. 观察吞吐量和响应时间
4. 找到吞吐量不再增长、响应时间开始升高的拐点
5. 该拐点就是最优线程数
1
2
3
4
5

# 10.6.3 为什么不推荐Executors

// 1. newFixedThreadPool — LinkedBlockingQueue 无界队列,可能OOM
Executors.newFixedThreadPool(10);
// 等价于 new ThreadPoolExecutor(10, 10, 0L, new LinkedBlockingQueue<>())
// 队列无限增长,任务堆积可能耗尽内存

// 2. newCachedThreadPool — 最大线程数为 Integer.MAX_VALUE,可能OOM
Executors.newCachedThreadPool();
// 等价于 new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60s, new SynchronousQueue<>())
// 瞬时大量请求会创建大量线程,耗尽系统资源

// 3. newSingleThreadExecutor — 同样是无界队列
Executors.newSingleThreadExecutor();
// 等价于 new ThreadPoolExecutor(1, 1, 0L, new LinkedBlockingQueue<>())

// 正确做法:手动创建,明确每个参数
ThreadPoolExecutor executor = new ThreadPoolExecutor(
    10,                             // 核心线程数
    20,                             // 最大线程数
    60, TimeUnit.SECONDS,           // 非核心线程存活时间
    new LinkedBlockingQueue<>(1000), // 有界队列!
    new ThreadFactoryBuilder().setNameFormat("biz-pool-%d").build(),
    new ThreadPoolExecutor.CallerRunsPolicy()  // 拒绝策略
);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

# 10.6.4 动态线程池

生产环境中,很难一开始就确定最优参数。动态线程池允许运行时调整参数:

// ThreadPoolExecutor 提供了 setter 方法
executor.setCorePoolSize(20);     // 动态调整核心线程数
executor.setMaximumPoolSize(40);  // 动态调整最大线程数
executor.setKeepAliveTime(30, TimeUnit.SECONDS);

// 注意:
// 1. 增大 corePoolSize 时,如果当前线程数 < 新的 corePoolSize
//    且队列有任务,会立即创建新线程
// 2. 减小 corePoolSize 时,多余的线程在空闲后自动回收

// 但 workQueue 的容量无法动态调整(LinkedBlockingQueue 的 capacity 是 final 的)
// 解决方案:使用可调整容量的队列(如开源的 ResizableCapacityLinkedBlockingQueue)
1
2
3
4
5
6
7
8
9
10
11
12

# 10.7 线程池的异常处理

// 问题:线程池中的任务抛异常会怎样?

// 1. execute 提交:异常会传播,线程终止并被新线程替换
executor.execute(() -> {
    throw new RuntimeException("boom");
    // 异常会被 UncaughtExceptionHandler 处理
    // 这个线程死亡,线程池创建新线程替换
});

// 2. submit 提交:异常被封装在 Future 中,调用 get() 时抛出
Future<?> future = executor.submit(() -> {
    throw new RuntimeException("boom");
    // 异常不会传播!线程不会死亡
    // 异常封装在 Future 中
});
future.get();  // 这里抛出 ExecutionException

// 最佳实践:
// 方案1: 在任务内部 try-catch
executor.execute(() -> {
    try {
        riskyOperation();
    } catch (Exception e) {
        log.error("任务执行失败", e);
    }
});

// 方案2: 重写 afterExecute 钩子方法
ThreadPoolExecutor executor = new ThreadPoolExecutor(...) {
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        if (t != null) {
            log.error("任务异常", t);
        }
        // 处理 submit 的异常
        if (t == null && r instanceof Future<?>) {
            try {
                ((Future<?>) r).get();
            } catch (ExecutionException e) {
                log.error("任务异常", e.getCause());
            } catch (Exception e) {
                // ignore
            }
        }
    }
};
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46

# 10.8 线程池监控与告警

// ThreadPoolExecutor 提供了丰富的监控方法
ThreadPoolExecutor executor = ...;

// 核心监控指标
int poolSize = executor.getPoolSize();           // 当前线程数
int activeCount = executor.getActiveCount();      // 活跃线程数
long completedCount = executor.getCompletedTaskCount(); // 已完成任务数
long taskCount = executor.getTaskCount();         // 总任务数
int queueSize = executor.getQueue().size();       // 队列中等待的任务数
int largestPoolSize = executor.getLargestPoolSize(); // 曾经的最大线程数

// 监控示例
ScheduledExecutorService monitor = Executors.newSingleThreadScheduledExecutor();
monitor.scheduleAtFixedRate(() -> {
    log.info("线程池状态: 线程数={}, 活跃={}, 队列={}, 已完成={}",
        executor.getPoolSize(),
        executor.getActiveCount(),
        executor.getQueue().size(),
        executor.getCompletedTaskCount());
    
    // 告警:队列使用率超过 80%
    double queueUsage = executor.getQueue().size() * 1.0 
        / ((LinkedBlockingQueue) executor.getQueue()).remainingCapacity();
    if (queueUsage > 0.8) {
        alert("线程池队列使用率超过80%!");
    }
}, 0, 10, TimeUnit.SECONDS);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

# 10.9 ScheduledThreadPoolExecutor

// 定时/周期性任务的线程池
ScheduledExecutorService scheduler = new ScheduledThreadPoolExecutor(4);

// 延迟执行
scheduler.schedule(() -> {
    System.out.println("延迟3秒执行");
}, 3, TimeUnit.SECONDS);

// 固定频率执行(不考虑任务执行时间)
scheduler.scheduleAtFixedRate(() -> {
    System.out.println("每5秒执行一次");
}, 0, 5, TimeUnit.SECONDS);

// 固定延迟执行(上一次执行完成后等待指定时间再执行)
scheduler.scheduleWithFixedDelay(() -> {
    System.out.println("上次执行完成后等3秒再执行");
}, 0, 3, TimeUnit.SECONDS);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

底层实现:内部使用 DelayedWorkQueue(基于堆的优先级队列),按执行时间排序。

# 10.10 ForkJoinPool工作窃取

// ForkJoinPool(JDK 7+):分治任务的线程池
// 核心:工作窃取(Work-Stealing)算法

// 每个线程有自己的双端队列(Deque)
// 线程从自己队列的头部取任务
// 当自己的队列为空时,从其他线程队列的尾部"窃取"任务

// 使用场景:递归分治任务
ForkJoinPool pool = new ForkJoinPool(4);
long result = pool.invoke(new SumTask(arr, 0, arr.length));

class SumTask extends RecursiveTask<Long> {
    private int[] arr;
    private int from, to;
    private static final int THRESHOLD = 1000;
    
    @Override
    protected Long compute() {
        if (to - from <= THRESHOLD) {
            // 小任务直接计算
            long sum = 0;
            for (int i = from; i < to; i++) sum += arr[i];
            return sum;
        }
        // 大任务拆分
        int mid = (from + to) / 2;
        SumTask left = new SumTask(arr, from, mid);
        SumTask right = new SumTask(arr, mid, to);
        left.fork();   // 异步执行左半部分
        long rightResult = right.compute();  // 同步执行右半部分
        long leftResult = left.join();       // 等待左半部分结果
        return leftResult + rightResult;
    }
}

// Java 8 的并行流 parallelStream() 底层就使用 ForkJoinPool.commonPool()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36

# 10.11 常见面试深度问题

Q1:线程池中的线程抛异常后会怎样?

execute 提交的任务抛异常:线程终止,线程池创建新线程替换。submit 提交的任务抛异常:异常被封装在 Future 中,线程不终止。

Q2:核心线程可以被回收吗?

默认不回收。但可以通过 allowCoreThreadTimeOut(true) 设置核心线程也受 keepAliveTime 限制,空闲超时后回收。

Q3:线程池的线程是怎么保活的?

通过 BlockingQueue.take() 无限期阻塞等待。线程处于 WAITING 状态,不消耗 CPU。

Q4:为什么 Worker 继承 AQS 而不是用 ReentrantLock?

因为 Worker 需要不可重入锁。在 shutdown() 时,正在执行任务的线程(已持有锁)不应该被中断。如果用 ReentrantLock(可重入),shutdown 方法中 tryLock 会成功,可能中断正在执行的任务。

# 10.12 总结与核心要点

线程池设计哲学:

  1. 资源复用:线程不销毁,通过阻塞队列等待新任务,实现线程复用
  2. 弹性扩缩:核心线程常驻 + 非核心线程按需创建/销毁
  3. 背压机制:队列 + 拒绝策略,防止任务洪峰压垮系统
  4. 单变量原子状态:ctl 用一个 int 同时管理状态和线程数

核心要点速查:

问题 答案
任务执行优先级 核心线程 → 任务队列 → 非核心线程 → 拒绝策略
线程复用本质 Worker 在 while 循环中调用 BlockingQueue.take() 阻塞等待
为什么不用 Executors 无界队列或无限线程数,可能 OOM
execute vs submit execute 异常传播,submit 异常封装在 Future
shutdown vs shutdownNow shutdown 等待完成,shutdownNow 中断并返回队列
Worker 为何继承 AQS 不可重入锁,防止 shutdown 中断执行中的线程
动态调整参数 setCorePoolSize/setMaximumPoolSize 运行时生效
上次更新: 2026/06/10, 11:13:41
volatile与JMM内存模型
Thread线程生命周期

← volatile与JMM内存模型 Thread线程生命周期→

最近更新
01
信号崩溃快速排查
06-15
02
CoreDump破案
06-15
03
perf火焰图实战
06-15
更多文章>
Theme by Vdoing | Copyright © 2019-2026 杨充 | MIT License | 桂ICP备2024034950号 | 桂公网安备45142202000030
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式