AQS同步框架源码
# 35.AQS同步框架源码
# 目录介绍
- 1. 案例引入
- 2. AQS 是什么
- 3. state 字段的设计哲学
- 4. CLH 变体队列
- 5. 独占模式源码追踪
- 6. 共享模式源码追踪
- 7. Condition 条件队列
- 8. AQS 之上的同步器全家福
- 9. 综合回扣与设计哲学
# 1. 案例引入
# 1.1 自己写一把锁的失败之旅
某次面试官问"用 CAS 写一把可重入锁,会怎么写?"——很多同学的初版长这样:
public class MyLock {
private final AtomicReference<Thread> owner = new AtomicReference<>();
private int holdCount;
public void lock() {
Thread current = Thread.currentThread();
if (owner.get() == current) { // ① 重入
holdCount++;
return;
}
while (!owner.compareAndSet(null, current)) { // ② 拿不到就自旋
// 死循环空转
}
holdCount = 1;
}
public void unlock() {
if (owner.get() != Thread.currentThread()) throw new IllegalMonitorStateException();
if (--holdCount == 0) owner.set(null);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
看着能跑,生产一上就崩:
- ❌ 高并发时几百个线程同时空转——CPU 100%,电费账单爆炸
- ❌ 没有"排队"概念——可能一个线程被插队几千次都拿不到锁(饥饿)
- ❌ 不响应中断、不支持超时、不支持 Condition、不支持公平/非公平切换
- ❌ 自己实现唤醒机制?要么忙等浪费 CPU,要么 wait/notify 又得包一层 monitor
问题的本质:"竞争失败的线程要往哪放?怎么排队?怎么唤醒?"——这正是 AQS 要解决的核心。Doug Lea 用 1500 行代码把这套问题封装成了通用框架,整个 JUC 包都建立在它之上。
# 1.2 ReentrantLock vs synchronized 性能反转之谜
08 篇讲过 JDK 6 后 synchronized 锁升级(偏向→轻量级→重量级),很多人据此得出结论"synchronized 比 ReentrantLock 快"——但生产数据并不总是这样:
压测场景:32 核机器,64 线程并发,临界区耗时 100μs
吞吐 (ops/s) P99 延迟
synchronized 1.2M 850μs ← 重量级锁后劣势明显
ReentrantLock 3.8M 320μs ← AQS + park/unpark 优势
2
3
4
5
反转的根因在哪?为什么 Doug Lea 的"用户态 Java 队列 + park"反而比 JVM 内置的"重量级 monitor"还快?答案在 §5、§7 揭晓。
# 1.3 我们要回答什么
第 36 篇是卷五第 5 篇,承接 35 篇 Thread 生命周期,是 37(Lock 三剑客)/ 38(CAS)/ 39(线程池)三篇的共同地基:
35.Thread (地基) → 36.AQS (本篇/同步框架) → 37.ReentrantLock & RWLock & StampedLock
→ 38.CAS & Atomic & Unsafe
→ 39.线程池 (Worker 也是 AQS)
2
3
带着 5 个问题展开:
追问 ①:AQS 凭什么能用一个 int + 一个队列做出几十种同步器? → §2、§3
追问 ②:CLH 队列的"变体"在哪?为什么改造? → §4
追问 ③:acquire 的"模板方法"模式如何把 30 行代码用到 ReentrantLock/Semaphore/CountDownLatch? → §5、§6
追问 ④:Condition 的双队列设计为什么比 Object.wait 强? → §7
追问 ⑤:为什么说"懂 AQS 就懂半个 JUC"? → §8
2
3
4
5
# 2. AQS 是什么
# 2.1 一句话定义
AQS(AbstractQueuedSynchronizer)= state(一个 int 表示的同步状态) + CLH 变体队列(等待线程排队) + 模板方法(钩子让子类定制语义)。
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements Serializable {
private volatile int state; // ★ 同步状态(核心)
private transient volatile Node head; // CLH 队头
private transient volatile Node tail; // CLH 队尾
private transient Thread exclusiveOwnerThread; // 当前持锁线程(独占模式)
// ★ 子类必须重写的钩子方法
protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); }
protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); }
protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException(); }
protected boolean tryReleaseShared(int arg) { throw new UnsupportedOperationException(); }
protected boolean isHeldExclusively() { throw new UnsupportedOperationException(); }
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
这就是整个 JUC 包的"基石"——ReentrantLock、Semaphore、CountDownLatch、ReentrantReadWriteLock、ThreadPoolExecutor.Worker、FutureTask 全部继承它。
# 2.2 它在 JUC 中的地位
AbstractQueuedSynchronizer (AQS)
│
┌──────────────────────────┼──────────────────────────┐
│ │ │
独占模式 共享模式 双模式
│ │ │
↓ ↓ ↓
ReentrantLock Semaphore ReentrantReadWriteLock
ReentrantLock.Sync CountDownLatch (读共享,写独占)
ThreadPoolExecutor.Worker CyclicBarrier (内含)
FutureTask Phaser (改造版)
ForkJoinPool (改造版)
2
3
4
5
6
7
8
9
10
11
12
Doug Lea 的设计目标:把"线程排队 + 阻塞唤醒 + 超时 + 中断"这些通用机制封装一次,子类只需要回答两个问题:
- 状态怎么算"获取成功"(重写
tryAcquire/tryAcquireShared) - 状态怎么算"释放完毕"(重写
tryRelease/tryReleaseShared)
剩下的所有事情——入队、阻塞、唤醒、传播——AQS 全包。这是模板方法模式(Template Method)的教科书级应用。
# 2.3 AQS 的核心三件套
┌─────────────────────────────────────────────────────────────┐
│ AQS │
│ │
│ ① state(int + volatile + CAS) │
│ 表示同步状态:锁次数、信号量许可、栅栏计数... │
│ │
│ ② CLH 变体队列(双向链表 + waitStatus) │
│ 竞争失败的线程按 FIFO 入队,靠 LockSupport.park 阻塞 │
│ │
│ ③ 模板方法(acquire/release + tryAcquire/tryRelease) │
│ 上层流程固化,下层语义留给子类 │
└─────────────────────────────────────────────────────────────┘
2
3
4
5
6
7
8
9
10
11
12
接下来三章逐个拆解。
# 3. state 字段的设计哲学
# 3.1 一个 int 装下所有状态
private volatile int state;
protected final int getState() { return state; }
protected final void setState(int newState) { state = newState; }
protected final boolean compareAndSetState(int expect, int update) {
return STATE.compareAndSet(this, expect, update);
}
2
3
4
5
6
7
32 位 int 是 AQS 的"通用货币"——所有子类都把自己的状态语义编码到这一个 int 里。
# 3.2 不同同步器的 state 语义
| 同步器 | state 含义 | 编码方式 |
|---|---|---|
ReentrantLock | 重入次数 | state == 0 表示无人持有;> 0 表示重入次数 |
Semaphore | 剩余许可数 | state = 初始许可数,acquire 减 1,release 加 1 |
CountDownLatch | 剩余计数 | state = N,countDown 减 1,到 0 时全部唤醒 |
ReentrantReadWriteLock | 高 16 位读 + 低 16 位写 | state >>> 16 是读锁数;state & 0xFFFF 是写锁重入数 |
ThreadPoolExecutor.Worker | 0/1 简单标志 | -1 表示不可中断(runWorker 前),0 = 未锁,1 = 已锁 |
最精彩的是 ReentrantReadWriteLock——它把两种锁状态压在同一个 int 里:
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT); // 读锁单位 = 65536
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1; // 65535
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
// 读锁数 = 高 16 位
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
// 写锁数 = 低 16 位
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
2
3
4
5
6
7
8
9
这样 一次 CAS 就能同时观察读锁和写锁的状态,避免了双字段同步带来的原子性破坏。这种"位编码"思路在 §8.2 还会再展开。
# 3.3 CAS + volatile 的状态守卫
state 字段写法严守"读 volatile,写 CAS":
// 读:直接读 volatile
int c = getState();
// 写:永远 CAS(除非持锁后的覆盖)
if (compareAndSetState(0, acquires)) { ... }
2
3
4
5
为什么不用 synchronized 保护 state?因为那样 AQS 就自己变成 monitor了——需要更底层的同步机制保护它,无限递归。CAS 是无锁原语,由 CPU 指令(x86 的 LOCK CMPXCHG)保证原子性,是 AQS 唯一可以信任的"原子操作"。这部分在 38 篇 CAS 还会深挖。
# 4. CLH 变体队列
# 4.1 原始 CLH 与 AQS 改造
原始 CLH(Craig, Landin, Hagersten)队列是一种自旋锁——每个线程在自己的前驱节点上自旋读取状态,避免在同一变量上自旋造成的缓存抖动。
但 AQS 不能纯自旋——线程数多时纯自旋会烧 CPU。Doug Lea 做了三处关键改造:
| 维度 | 原始 CLH | AQS 变体 |
|---|---|---|
| 阻塞方式 | 自旋 | park/unpark(OS 层挂起) |
| 链表方向 | 隐式链接(仅用前驱) | 显式双向链表(prev + next) |
| 状态 | 仅 locked 标志 | waitStatus 五态(CANCELLED / SIGNAL / CONDITION / PROPAGATE / 0) |
| 用途 | 单纯互斥锁 | 独占 + 共享 + 条件队列三合一 |
为什么需要 prev 指针:取消(CANCELLED)节点要从队列移除时,需要双向链表才能 O(1) 摘除——这是 AQS 频繁要做的事情。
# 4.2 Node 节点结构
static final class Node {
static final Node SHARED = new Node(); // 共享模式标记
static final Node EXCLUSIVE = null; // 独占模式标记
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
volatile int waitStatus; // ★ 节点状态
volatile Node prev; // ★ 前驱(双向链表)
volatile Node next; // ★ 后继
volatile Thread thread; // ★ 该节点对应的线程
Node nextWaiter; // ★ Condition 队列单链 / 共享模式标记
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
一个节点 = 一个等待线程。thread 字段持有线程引用,waitStatus 表达线程意图,prev/next 串成同步队列,nextWaiter 串成条件队列(§7)。
# 4.3 waitStatus 的五种取值
0 初始态 新节点入队默认值
SIGNAL=-1 "我后继需要被唤醒" 前驱处于此态时,自己 park 才安全
CANCELLED= 1 "我取消了" 超时/中断后被打上,会被清理
CONDITION=-2 "我在条件队列上等待" 仅 Condition 队列节点
PROPAGATE=-3 "唤醒应向后传播" 共享模式专用(§6.2)
2
3
4
5
SIGNAL 的妙处:每个节点只在前驱 waitStatus = SIGNAL 时才放心 park——这是一种**"前驱承诺唤醒"**的契约,让阻塞和唤醒严格匹配。
# 4.4 入队的 CAS 自旋大法
private Node addWaiter(Node mode) {
Node node = new Node(mode);
for (;;) {
Node oldTail = tail;
if (oldTail != null) {
node.setPrevRelaxed(oldTail); // 先连前驱
if (compareAndSetTail(oldTail, node)) { // ★ CAS 设 tail
oldTail.next = node; // 再连后继
return node;
}
} else {
initializeSyncQueue(); // 队列为空时延迟初始化
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
两步连接的精妙:先把 node.prev 指向 oldTail(无人可见,安全),然后 CAS 抢占 tail(一次原子操作),最后回填 oldTail.next(此时 prev 已正确,遍历不会断链)。这一招在 §5 的 cancelAcquire 也会用到——双向链表的 prev 永远先维护好,next 是后置维护。
# 5. 独占模式源码追踪
# 5.1 acquire 模板方法
这是 AQS 全部精华浓缩在 4 行的模板方法:
public final void acquire(int arg) {
if (!tryAcquire(arg) // ① 子类语义:能否拿到
&& acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // ② 拿不到就入队 + 自旋等待
selfInterrupt(); // ③ 期间被中断,补打中断标志
}
2
3
4
5
三步走:
tryAcquire(arg)——子类实现,决定"什么叫拿到了"。例如 ReentrantLock 是"CAS 把 state 从 0 改成 1"addWaiter+acquireQueued——AQS 自己实现,统一负责入队 + park- 中断补打——park 期间若被中断,AQS 不抛异常,但会在退出时把标志位补上(35 篇 §5.2 中断响应原则的体现)
模板方法模式的精妙:上层的"排队 + 阻塞 + 唤醒"流程一次写死,下层语义完全由子类决定——所以一个 acquire 同时支撑了 ReentrantLock 和 Semaphore 两种语义完全不同的同步器。
# 5.2 addWaiter 入队
§4.4 已展开,入队后返回当前节点。
# 5.3 acquireQueued 自旋等待
final boolean acquireQueued(final Node node, int arg) {
boolean interrupted = false;
try {
for (;;) { // ★ 自旋
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) { // ★ 前驱是 head,再尝试一次
setHead(node); // 自己变成新 head
p.next = null; // 帮 GC
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node)) { // ★ 该 park 了
interrupted |= parkAndCheckInterrupt(); // ★ 真正阻塞
}
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
核心逻辑——只有"前驱是 head"的节点才有资格再次尝试拿锁:
head → A → B → C → tail (B、C park 中)
↑
A 是 head 的直接后继 → 有资格 tryAcquire
B 不是 → 必须 park
2
3
4
这样保证 FIFO 公平性 + 减少无效 CAS——一次只让一个候选人去抢,避免"惊群"。
# 5.4 shouldParkAfterFailedAcquire 的精妙清理
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL) return true; // ① 前驱已承诺唤醒,安全 park
if (ws > 0) { // ② 前驱已 CANCELLED
do {
node.prev = pred = pred.prev; // ★ 跳过所有取消节点
} while (pred.waitStatus > 0);
pred.next = node;
} else {
pred.compareAndSetWaitStatus(ws, Node.SIGNAL); // ③ 把前驱状态改成 SIGNAL
}
return false;
}
2
3
4
5
6
7
8
9
10
11
12
13
这 10 行代码做了三件事:
- 顺手清理 CANCELLED 节点——等待队列里的"僵尸"被实时摘除(参考 35 篇 ThreadLocalMap 的"主动 + 被动清理"思路)
- 状态机推动——把前驱改成 SIGNAL,这是后续 release 唤醒的依据
- 第一次循环必返 false——给当前线程再一次 tryAcquire 的机会,避免冗余 park(性能优化)
# 5.5 release 唤醒后继
public final boolean release(int arg) {
if (tryRelease(arg)) { // 子类释放成功
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h); // ★ 唤醒 head 的后继
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0) node.compareAndSetWaitStatus(ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node p = tail; p != node && p != null; p = p.prev) // ★ 从 tail 反向找
if (p.waitStatus <= 0) s = p;
}
if (s != null) LockSupport.unpark(s.thread);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
逆向遍历的设计奥秘:为什么从 tail 反向找最近的有效后继?因为 §4.4 入队是先连 prev、再 CAS tail、最后连 next——next 链路在某些瞬间可能短暂为 null,但 prev 链路永远完整。反向遍历是 AQS 应对"半成品入队"的容错手段——这是 Doug Lea 在评论里专门解释过的设计。
# 6. 共享模式源码追踪
# 6.1 acquireShared 与传播机制
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean interrupted = false;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r); // ★ 关键:传播
p.next = null;
if (interrupted) selfInterrupt();
return;
}
}
if (shouldParkAfterFailedAcquire(p, node))
interrupted |= parkAndCheckInterrupt();
}
} catch (Throwable t) {
cancelAcquire(node); throw t;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
setHeadAndPropagate 是共享模式的灵魂——独占模式拿到锁后只唤醒一个后继,但共享模式拿到许可后继续唤醒后继,因为可能多个线程都能并发拿许可:
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared(); // ★ 接力唤醒
}
}
2
3
4
5
6
7
8
9
10
# 6.2 PROPAGATE 状态的存在意义
PROPAGATE = -3 是 JDK 7 才加的状态,专为修复一个唤醒丢失的并发 bug:
线程 A 释放 → 准备 unpark 后继
线程 B 拿到 → 改 head
此时 A 看到的 head.ws = 0(B 还没设 SIGNAL)
A 跳过 unpark
线程 C 在等许可,但 A 已不会再唤醒它,B 也以为没人等了
→ 唤醒丢失!
2
3
4
5
6
修复方式:releaseShared 把 head.ws 从 0 标成 PROPAGATE,让后续线程看到这个标记后继续接力唤醒。这种"标记位 + 接力"的思路是无锁并发设计的精髓——用最小的状态量挽救唤醒不可靠。
# 6.3 CountDownLatch 5 行实现
最能体现 AQS"一招吃遍天下"的就是 CountDownLatch——核心代码加起来不到 30 行:
private static final class Sync extends AbstractQueuedSynchronizer {
Sync(int count) { setState(count); } // state = N
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1; // ★ 0 才算成功
}
protected boolean tryReleaseShared(int releases) {
for (;;) {
int c = getState();
if (c == 0) return false;
int nextc = c - 1;
if (compareAndSetState(c, nextc))
return nextc == 0; // ★ 减到 0 才返回 true,触发传播
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
await() = acquireSharedInterruptibly(1),countDown() = releaseShared(1)——其余排队、阻塞、唤醒、中断响应全部由 AQS 完成。Doug Lea 把"大量代码 → 模板方法 → 子类填空"的设计模式做到了极致。
# 6.4 Semaphore 公平/非公平双实现
abstract static class Sync extends AbstractQueuedSynchronizer {
Sync(int permits) { setState(permits); }
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 || compareAndSetState(available, remaining))
return remaining; // 负数表示失败
}
}
}
static final class FairSync extends Sync {
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors()) return -1; // ★ 公平:前面有人就不抢
int available = getState();
int remaining = available - acquires;
if (remaining < 0 || compareAndSetState(available, remaining))
return remaining;
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
公平 vs 非公平的差异就一行:是否调用 hasQueuedPredecessors() 检查队列前面是否有人。公平减少饥饿,非公平提升吞吐——37 篇 ReentrantLock 还会再细讲选型。
# 7. Condition 条件队列
# 7.1 双队列结构图
Condition 是 ReentrantLock 配套的"等待/通知"机制,对标 Object.wait/notify,但更强大——支持多个独立条件队列:
ReentrantLock lock = new ReentrantLock();
Condition notFull = lock.newCondition(); // 队列不满
Condition notEmpty = lock.newCondition(); // 队列不空
2
3
内部结构:
┌─────── 同步队列 (CLH) ───────┐
│ │
head → A → B → C → tail
↑
生产者拿到锁后 await
│
↓ 节点搬到 ↓
┌──── notFull 条件队列 ────┐
│ │
firstWaiter → D → E → lastWaiter
┌──── notEmpty 条件队列 ───┐
│ │
firstWaiter → F → G → lastWaiter
2
3
4
5
6
7
8
9
10
11
12
13
14
核心思想:等待某个条件的线程从"同步队列"挪到"条件队列",被 signal 时再挪回"同步队列"——两个队列复用同一个 Node,仅通过 nextWaiter 字段区分(§4.2)。
# 7.2 await 的完整迁移
public final void await() throws InterruptedException {
if (Thread.interrupted()) throw new InterruptedException();
Node node = addConditionWaiter(); // ① 入条件队列
int savedState = fullyRelease(node); // ② 完全释放锁(含重入次数!)
int interruptMode = 0;
while (!isOnSyncQueue(node)) { // ③ park 直到被搬回同步队列
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE) // ④ 重新拿锁
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) unlinkCancelledWaiters();
if (interruptMode != 0) reportInterruptAfterWait(interruptMode);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
最关键的两点:
fullyRelease——一次性释放所有重入次数(不是 -1,而是把 state 置 0),保存到savedState,唤醒后再恢复。这是 Object.wait 做不到的——synchronized 的重入释放没有 API 可触达。- 被 signal 后必须重新走 acquireQueued——signal 只是把节点搬队,不送锁,被唤醒的线程仍要竞争锁(这与下文 §7.4 对比 notify 是关键)。
# 7.3 signal 的节点搬运
public final void signal() {
if (!isHeldExclusively()) throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null) doSignal(first);
}
final boolean transferForSignal(Node node) {
if (!node.compareAndSetWaitStatus(Node.CONDITION, 0)) return false;
Node p = enq(node); // ★ 搬到同步队列尾部
int ws = p.waitStatus;
if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))
LockSupport.unpark(node.thread); // 前驱已 CANCELLED 时直接唤醒
return true;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
signal 不唤醒线程,只是搬运节点——线程仍然在 park,要等真正的"前驱释放锁"才能被 unpark。这种"搬运 + 排队"的设计避免了"惊群"——signal 一次只搬一个节点,挪到队尾乖乖排队。
# 7.4 与 Object.wait/notify 的对比
| 维度 | Object.wait/notify | Condition.await/signal |
|---|---|---|
| 配套锁 | synchronized | ReentrantLock |
| 条件队列数量 | 1 个(每个 monitor 一个) | N 个(一把锁可以创多个 Condition) |
| 等待时机区分 | 都用同一队列,notifyAll 全炸 | 不同条件用不同队列,精准唤醒 |
| 重入释放 | 整体释放(无 API 控制) | fullyRelease 显式管理重入次数 |
| 中断/超时 | 弱 | 强(awaitNanos / awaitUntil) |
经典生产者消费者——用 Condition 写比 synchronized + wait/notifyAll 高效得多:
final ReentrantLock lock = new ReentrantLock();
final Condition notFull = lock.newCondition();
final Condition notEmpty = lock.newCondition();
final Object[] items = new Object[100];
int putptr, takeptr, count;
public void put(Object x) throws InterruptedException {
lock.lock();
try {
while (count == items.length) notFull.await(); // ★ 队满时等"不满"
items[putptr] = x;
if (++putptr == items.length) putptr = 0;
++count;
notEmpty.signal(); // ★ 精准唤醒消费者
} finally { lock.unlock(); }
}
public Object take() throws InterruptedException {
lock.lock();
try {
while (count == 0) notEmpty.await(); // ★ 队空时等"不空"
Object x = items[takeptr];
if (++takeptr == items.length) takeptr = 0;
--count;
notFull.signal(); // ★ 精准唤醒生产者
return x;
} finally { lock.unlock(); }
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
对比 notifyAll 把生产者消费者一次性全唤醒后大家再争锁的低效模式——Condition 的多队列让等不同条件的线程互不打扰,这是 ArrayBlockingQueue 的实现原型。
# 8. AQS 之上的同步器全家福
# 8.1 ReentrantLock 公平/非公平
abstract static class Sync extends AbstractQueuedSynchronizer {
abstract boolean initialTryLock();
final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = (c == 0);
if (free) setExclusiveOwnerThread(null);
setState(c);
return free;
}
}
static final class NonfairSync extends Sync {
final boolean initialTryLock() {
Thread current = Thread.currentThread();
if (compareAndSetState(0, 1)) { // ★ 直接抢
setExclusiveOwnerThread(current);
return true;
}
// ... 重入处理
return false;
}
}
static final class FairSync extends Sync {
final boolean initialTryLock() {
Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedThreads() && compareAndSetState(0, 1)) { // ★ 看队列再抢
setExclusiveOwnerThread(current);
return true;
}
}
// ... 重入处理
return false;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
就这点差异——非公平多了一次"先插队抢一下"的机会,吞吐高但可能饥饿;公平严格 FIFO,吞吐低但无饥饿。生产 95% 用非公平(即默认 new ReentrantLock()),仅在严格防饥饿场景用公平。37 篇会展开三剑客全家桶选型。
# 8.2 ReentrantReadWriteLock 高低位拆分
§3.2 已揭过——state 一个 int 同时表达读锁数 + 写锁数:
高 16 位 (sharedCount) 低 16 位 (exclusiveCount)
┌────────────────────┐ ┌────────────────────┐
│ 读锁线程数 (≤ 65535)│ │ 写锁重入次数 (≤ 65535)│
└────────────────────┘ └────────────────────┘
2
3
4
升降级规则:
- ✅ 写锁 → 读锁(降级):合法,避免数据不一致
- ❌ 读锁 → 写锁(升级):会死锁,禁止
为什么升级会死锁:A 持读锁要升级为写锁,必须等所有读锁释放——但 B 也在读且也想升级,两人就互相死等。降级是单向的,所以安全。这是 JDK 8 后 StampedLock 改用乐观读的根本原因之一——37 篇展开。
# 8.3 CountDownLatch / CyclicBarrier / Phaser
| 同步器 | 是否基于 AQS | 用途 |
|---|---|---|
CountDownLatch | ✅ 是(共享模式) | 一次性计数门,到 0 全开 |
CyclicBarrier | ❌ 否(用 ReentrantLock + Condition) | 可重置栅栏,所有线程汇合后一起放行 |
Phaser | ❌ 否(自己一套队列) | 多阶段栅栏,支持动态注册/注销 |
注意 CyclicBarrier 不是直接基于 AQS——但它内部用了 ReentrantLock + Condition,间接吃 AQS 红利。Phaser 是 Doug Lea 的"野路子"——不用 AQS,自己写了一套树形队列以支持大规模分层栅栏。
# 8.4 ThreadPoolExecutor.Worker 也是 AQS
ThreadPoolExecutor 的 Worker 类继承 AQS——目的是给 Worker 提供"是否在运行任务中"的轻量锁:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
// state = -1 不可中断(runWorker 前)
// state = 0 未锁
// state = 1 已锁(运行中)
protected boolean isHeldExclusively() { return getState() != 0; }
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
为什么用 AQS 而不用 ReentrantLock?因为 Worker 的锁不可重入(重入了就出问题),用 AQS 写一个"非重入锁"只要 5 行——这就是模板方法的威力。这给我们的启示:当业务里需要"自定义同步语义"时,继承 AQS 永远是首选——比你自己用 CAS 拼凑一万倍可靠。39 篇会再深入展开。
# 8.5 JDK 9+ AQS 的演进
- JDK 9:内部从
Unsafe.compareAndSwap改用VarHandle(卷四 31 篇) - JDK 14:
LockSupport.parkNanos在虚拟线程上采用 yield 而非 OS park(为 Loom 铺路) - JDK 21:虚拟线程 acquire AQS 时不固定 carrier 线程,是虚拟线程的友好同步器(41 篇会展开 Pinning 问题——synchronized 会 Pin,AQS 不会)
这就是为什么 Loom 时代要把 synchronized 改成 ReentrantLock——AQS 早早就给虚拟线程留好了门。
# 9. 综合回扣与设计哲学
# 9.1 案例真相揭晓
① §1.1 自己写锁失败的真相:手写锁缺少**"竞争失败的线程要往哪放"这个关键设计——AQS 用 CLH 变体队列 + park/unpark 完美解决,配上模板方法把通用机制封装一次,子类只填语义。根治:永远不要手写锁,AQS 已经帮你写好了 1500 行;要自定义语义就继承 AQS**,重写两个钩子方法。
② §1.2 ReentrantLock 反超 synchronized 的真相:synchronized 重量级锁要走 OS monitor(一次进入 ≈ 600ns),AQS 全在用户态用 CAS + park 实现(一次入队 ≈ 100ns),高竞争下差距 4-5 倍。但低竞争场景(无锁/偏向锁阶段)synchronized 反而更快——这是 08 篇说过的"锁升级"红利。结论:高竞争用 ReentrantLock,低竞争 synchronized 也够。
③ 5 大追问全部作答:
| 追问 | 答案 | 章节 |
|---|---|---|
| ① 为什么 int + 队列 = 几十种同步器 | state 字段编码灵活 + 模板方法只需重写 try* | §3.2、§5.1 |
| ② CLH 变体在哪 | 用 park 替自旋 + 显式双向链表 + waitStatus 五态 | §4.1 |
| ③ 30 行代码用到所有同步器 | acquire 模板方法 + tryAcquire 钩子 | §5.1、§6.3 |
| ④ Condition 凭啥强 | 双队列 + N 条件队列 + fullyRelease 重入控制 | §7.1、§7.4 |
| ⑤ 为什么"懂 AQS = 懂半个 JUC" | 全 JUC 同步类全部继承或依赖 AQS | §8 |
# 9.2 AQS 三大设计哲学
1. "模板方法 + 钩子方法"是把"通用流程"和"业务语义"解耦的最佳模式:AQS 把"入队 - 阻塞 - 唤醒 - 传播"这套通用流程写死在 acquire/release 里,把"什么叫拿到""什么叫释放完"留给子类的 tryAcquire/tryRelease 钩子。同样的思路在 Spring 里随处可见——AbstractApplicationContext.refresh() 是模板方法,onRefresh 是钩子;卷四 34 篇 AOP 的 advisor 链条也是同款思路。这给我们的启示:当一个领域需要支持"多种实现 + 共享流程"时,模板方法永远比"复制粘贴 + 改三行"的工程实践更可靠。
2. "用户态阻塞 + 状态机推动"是绕过内核态切换的核心招术:AQS 完全在 Java 层用 CAS + park 实现互斥,把"线程调度"留给 OS——但 OS 看到的只是"一堆 park 的线程",不知道它们在等什么。这种**"决策在用户态,挂起在内核态"的二元设计是 35 年来 JUC 性能爆杀 synchronized 的根本原因。35 篇说"线程是昂贵的物理对象",AQS 给出的答案是让昂贵的东西睡得久一点、醒得准一点、状态切换少一点**——park/unpark 就是为这个目标量身定做。
3. "状态位 + 接力唤醒"是无锁并发的精髓:AQS 用 5 个 waitStatus 取值 + state 一个 int + Node 双向链表,把"几百个线程的等待状态"压缩成最小数据结构。最绝的是 PROPAGATE 修复"唤醒丢失"的故事——Doug Lea 用 1 个状态位补救了一个隐蔽的并发 bug。这给我们的启示:并发设计中,最贵的不是计算,而是状态同步——能用一个 volatile int + CAS 表达的,绝不要用 synchronized 包一层;能用"接力"完成的,绝不要用"广播"。
# 9.3 知识地图与速查表
AQS 总图
│
┌────────────────┼────────────────┐
│ │ │
state CLH 队列 模板方法
(一个 int) (双向链表) (acquire/release)
│ │ │
↓ ↓ ↓
子类语义 waitStatus 子类填空
─ 重入次数 ─ SIGNAL=-1 ─ tryAcquire
─ 许可数 ─ CANCELLED=1 ─ tryRelease
─ 计数 ─ CONDITION=-2 ─ tryAcquireShared
─ 高低位拆分 ─ PROPAGATE=-3 ─ tryReleaseShared
─ isHeldExclusively
│ │ │
└────────────────┼────────────────┘
↓
┌────────────────┐
│ 独占 / 共享 │
└────────────────┘
│
┌────────────────────┼────────────────────┐
│ │ │
ReentrantLock Semaphore ReentrantRWLock
FutureTask CountDownLatch (高低位编码)
Worker ─ StampedLock(37 篇)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
核心 API 速查表:
模板方法(不要重写) 钩子方法(子类重写)
acquire(arg) tryAcquire(arg) → 独占获取
release(arg) tryRelease(arg) → 独占释放
acquireShared(arg) tryAcquireShared(arg) → 共享获取
releaseShared(arg) tryReleaseShared(arg) → 共享释放
acquireInterruptibly(arg) isHeldExclusively() → 是否独占
tryAcquireNanos(arg, t)
2
3
4
5
6
7
waitStatus 速查:
0 初始默认
SIGNAL=-1 "我后继需要被唤醒" ← 入队后必修改前驱为此态才能放心 park
CANCELLED= 1 "我取消了" ← 超时/中断时打上,会被实时清理
CONDITION=-2 "我在条件队列上" ← Condition 队列专用
PROPAGATE=-3 "唤醒应向后传播" ← 共享模式修补"唤醒丢失" bug
2
3
4
5
state 编码模式表:
单状态 ReentrantLock state = 重入次数
单状态 Semaphore state = 剩余许可
单状态 CountDownLatch state = 剩余计数
双状态 ReentrantRWLock state = (sharedCount << 16) | exclusiveCount
非重入锁 Worker state = 0/1
2
3
4
5
🎯 下一篇预告:第 37 篇《ReentrantLock & ReadWriteLock & StampedLock 三剑客》——本篇讲完 AQS 总框架,下一篇切入三大主流 Lock 的工程化对比:ReentrantLock 的公平/非公平选型、ReadWriteLock 的读写降级与升级死锁、StampedLock 的乐观读为什么是读多场景的性能炸弹、三者在 Loom 虚拟线程时代的不同表现,把 Lock API 全家桶一次打透。