多线程锁选型
# 第30章:多线程锁选型
# 目录介绍
- 1. 案例引入:两条主线
- 2. 多线程同步的九种翻车
- 3. 锁机制总图
- 4. 五大锁机制详解
- 5. RAII 守护与 lock 系列
- 6. 死锁三步法
- 7. 性能与可观测
- 8. 无锁与原子模式
- 9. 五步选型方法论
- 10. 典型场景速查
- 11. 工程化最佳实践
- 12. 综合案例串讲
- 13. 章节小结
# 1. 案例引入:两条主线
讲多线程锁,最忌一上来就摆 API。std::mutex、std::shared_mutex、std::atomic、std::recursive_mutex、std::scoped_lock——把名字背下来不难,难的是真出事故时还记得选哪一个。
所以本章先开两条主线:一条是真实生产事故,一条是22 行 MCVE。两条主线会贯穿全章,每一节都会回到它们,看看新学的知识能不能解释或者修好它们。
# 1.1 主线一:QPS 崩盘的搜索服务
某搜索服务里有一份"商品标签索引",是 std::unordered_map<uint64_t, TagList>,QPS 大约 8 万,读写比例 99:1。早期实现是一把全局 std::mutex:
class TagIndex {
public:
TagList Get(uint64_t id) const {
std::lock_guard<std::mutex> g(mu_); // ① 读也要互斥
auto it = data_.find(id);
return it == data_.end() ? TagList{} : it->second;
}
void Update(uint64_t id, TagList tl) {
std::lock_guard<std::mutex> g(mu_);
data_[id] = std::move(tl);
}
private:
mutable std::mutex mu_;
std::unordered_map<uint64_t, TagList> data_;
};
2
3
4
5
6
7
8
9
10
11
12
13
14
15
业务接入半年都没事。直到某天晚上推了一次"标签批量回灌",瞬间触发:
[ALERT] tag-svc p99 latency: 12ms -> 980ms
[ALERT] tag-svc qps: 80k -> 9k
[ALERT] thread pool: 256/256 BUSY
2
3
监控曲线看上去像 CPU 突然全堵在锁上。第一反应是"锁颗粒太粗",但同事 A 提出一种猜想:是不是顺手把这把锁换成 std::shared_mutex 就能解决?
故事先停在这里。请记住三个数字:QPS 8 万、读写 99:1、batch 写入。这三个数字决定了正确答案——但绝不是简单地"换 shared_mutex"。后面会用 5、7、9、10 章的工具一步步把它拆开。
┌──────────────────────────────────────────────────────────────┐
│ p99 12ms ───────────┐ │
│ │ (batch 回灌开始) │
│ ▼ │
│ p99 ████████████████████████████ 980ms │
│ qps 80k ────────────┐ │
│ │ │
│ ▼ │
│ qps ███ 9k │
└──────────────────────────────────────────────────────────────┘
监控曲线告诉我们:临界区严重排队
2
3
4
5
6
7
8
9
10
11
# 1.2 主线二:22 行 MCVE 复现死锁
第二条主线短得多。某离线工具,做"账户互转金额":
// account.cc,22 行可复现 (g++ -std=c++17 -O0 -pthread)
#include <mutex>
#include <thread>
#include <iostream>
struct Account {
std::mutex mu;
long balance = 1000;
};
void Transfer(Account& a, Account& b, long n) {
std::lock_guard<std::mutex> ga(a.mu); // ① 锁 A
std::this_thread::sleep_for(std::chrono::milliseconds(1));
std::lock_guard<std::mutex> gb(b.mu); // ② 锁 B
a.balance -= n; b.balance += n;
}
int main() {
Account x, y;
std::thread t1([&]{ Transfer(x, y, 100); });
std::thread t2([&]{ Transfer(y, x, 100); });
t1.join(); t2.join(); // ③ 永远 join 不回来
std::cout << x.balance << "," << y.balance << "\n";
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
跑起来程序卡死,pstack 一看两个线程都停在第二个 lock_guard 构造里。这就是教科书上的"AB-BA 死锁",但它有意思的地方在于:单线程跑、双线程跑都不一定复现——只有当 t1 拿到 A、t2 拿到 B 之后两个都去抢对方时才出事。sleep_for(1ms) 只是把这个窗口放大到必出。
t1: lock(x.mu) ── 等待 ──▶ lock(y.mu)
▲
│ 已被 t2 持有
│
t2: lock(y.mu) ── 等待 ──▶ lock(x.mu)
▲
│ 已被 t1 持有
│
╰────── 互相等待,谁都过不去 ───────╯
2
3
4
5
6
7
8
9
这 22 行将贯穿第 6 章。我们会用 std::scoped_lock、std::lock 加 adopt_lock、全局锁顺序三种方式各修一遍,并且解释它们各自适合什么场景。
# 1.3 顺藤摸到根因
两条主线表面上完全不同——一条是性能、一条是死锁,但根因都是"锁选型 + 锁使用模式"出了问题:
- 主线一:选了"互斥锁"做"读多写少"的工作。读和读本来可以并发,却被序列化了。
- 主线二:选了"两把独立的
mutex+ 手写顺序"做"原子地操作两块数据"。手写顺序在并发下崩了。
把它们抽象成一句话:锁不是"加一把就万事大吉",而是要回答四个问题——保护的是什么数据?谁会访问?读写比例?冲突概率?
┌────────────────────────────────────────────┐
│ 四问:保护什么 / 谁访问 / │
│ 读写比 / 冲突率 │
│ │
│ ① 保护什么 → 决定锁的作用域 │
│ ② 谁访问 → 决定锁的可见性 │
│ ③ 读写比 → 决定 mutex/shared │
│ ④ 冲突率与临界区 → 决定 lock/atomic │
└────────────────────────────────────────────┘
2
3
4
5
6
7
8
9
第 9 章会把这四问扩展成"五步选型方法论"。
# 1.4 本篇要回答什么
围绕两条主线,我们会回答:
- C++ 标准库一共提供了哪些锁?它们的内部实现差别在哪?(第 3、4 章)
- RAII 包装为什么必要?
lock_guard / unique_lock / shared_lock / scoped_lock该怎么选?(第 5 章) - 死锁是怎么发生的?除了"加同样的顺序"还有什么工程手段?(第 6 章)
- 锁的性能开销究竟有多大?读写锁是不是一定更快?(第 7 章)
- 什么时候可以用
atomic完全不上锁?(第 8 章) - 当不知道选什么的时候,按什么顺序思考?(第 9 章)
最终在第 12 章回到主线一和主线二,给出完整的修复。
# 2. 多线程同步的九种翻车
在讲"该用什么"之前,先看"会怎么错"。下面九种翻车按出现频率从高到低,每一种都会在后续章节里被某个工具/方法精准命中。
# 2.1 不加锁的数据竞争
最朴素的错:以为"int 是原子写"。
int counter = 0;
void worker() { for (int i = 0; i < 1'000'000; ++i) ++counter; }
// 两个线程跑完,counter 不是 2_000_000
2
3
++counter 在汇编里至少是 read-modify-write 三步,CPU 缓存还会再带来重排和不可见。任何被多线程同时写、或一写多读的对象,必须有同步原语——不管它是 int 还是 std::map。
TSan 一跑就报:
WARNING: ThreadSanitizer: data race
Write of size 4 at 0x... by thread T2
Previous read of size 4 at 0x... by thread T1
2
3
修法:换成 std::atomic<int>,或者用 std::mutex 守护。具体哪种参考第 9 章。
# 2.2 lock/unlock 异常不安全
也就是开头那段代码:
std::mutex m;
void f() {
m.lock();
do_work(); // ← 抛异常就跑了
m.unlock(); // 永远到不了
}
2
3
4
5
6
只要 do_work() 路径上任何一行可能抛——包括 new、vector::push_back、std::stoi、第三方库的 throw——这把锁就会永远持有,后面所有访问者都死等。
修法:永远用 RAII 包装。第 5 章详谈。
# 2.3 同一线程二次加锁
std::mutex mu;
void A() { std::lock_guard g(mu); B(); }
void B() { std::lock_guard g(mu); /*...*/ } // 同线程又来一次
2
3
std::mutex 不可重入:同一线程第二次 lock() 行为是未定义(多数实现是死锁自己)。这种 bug 最阴险,因为单线程跑没事,并发跑也不一定出事——只有当 A → B 的调用链被某个回调触发时才暴露。
修法(按推荐顺序):
- 重新设计:让
B不持锁版本和持锁版本分离。 - 不得已:换
std::recursive_mutex(4.3 节会论证为什么"不得已")。
# 2.4 多锁顺序错乱致死锁
就是主线二。两个或更多锁要同时持有时,任何不一致的顺序都可能死锁。
t1: lock(A) → lock(B)
t2: lock(B) → lock(A)
2
修法(第 6 章详谈):
- C++17:
std::scoped_lock lk(a, b)一次原子锁多个。 - C++11:
std::lock(a, b)加adopt_lock。 - 工程:全局锁顺序约定(如按地址排序)。
# 2.5 读多写少误用互斥锁
主线一的根因之一。99:1 的读写比例下,互斥锁让所有读串行,相当于一根独木桥。
原本:8 万 QPS,读并发
误用 mutex 后:8 万次顺序排队过桥
2
修法:用 std::shared_mutex,读取用 std::shared_lock,写入用 std::unique_lock(4.2 节)。但注意:临界区极短时反而更慢,第 7 章会量化。
# 2.6 写者饥饿与读者饥饿
切到 shared_mutex 不是免费午餐:
- 读者太多、太频繁,写者抢不到锁,新数据进不来 → 写者饥饿。
- 反过来某些实现写优先,读者排长队 → 读者饥饿。
不同实现策略不一样,第 4.2 节会讲。
# 2.7 锁粒度过粗成性能瓶颈
std::mutex global_;
std::unordered_map<Key, Val> map_;
// 所有 key 共用一把锁
2
3
如果业务上多数访问命中不同 key,可以分片:
struct Shard { std::mutex mu; std::unordered_map<Key, Val> data; };
std::array<Shard, 64> shards_;
auto& s = shards_[hash(k) & 63];
std::lock_guard g(s.mu);
2
3
4
64 分片可以把冲突率几乎线性降低。Java 早期 ConcurrentHashMap 就是这个套路。
# 2.8 持锁期间做 IO 与回调
std::lock_guard g(mu);
auto rsp = http_client.Get(url); // ⚠ 持锁做网络 IO
callback(snapshot); // ⚠ 持锁回调用户代码
2
3
两个问题:
- 网络 IO 几十毫秒,临界区被无谓拉长。
- 用户回调可能再次访问同一个对象,导致 2.3 重入或 2.4 死锁。
修法:临界区只做最小的内存操作,把要发布的数据复制一份出来,离开临界区再调用回调(11.3 节)。
# 2.9 atomic 误当锁使
std::atomic<bool> ready_{false};
void publish() {
big_data_ = LoadFromFile(); // ① 非原子写
ready_.store(true); // ② 原子写
}
// 另一个线程
if (ready_.load()) use(big_data_); // ③ 不保证看到完整数据
2
3
4
5
6
7
光把 flag 换成 atomic 不能解决"复合对象"的发布问题,需要正确的 memory_order(8.1 节)或者干脆 atomic_shared_ptr(8.3 节)。
九种翻车里,(2.2, 2.4, 2.5) 是主线案例直接踩中的。后面章节我们逐一解决,先讲原理,再讲工具,再讲方法。
# 3. 锁机制总图
锁不是凭空出现的语言特性,而是从硬件原子指令一路封装上来的。这一章先把整个家谱铺开,让你知道 std::mutex 实际上"长在哪一层"。
# 3.1 从原子到互斥:硬件视角
CPU 提供的最底层原子原语是 CAS(Compare-And-Swap) 或 LL/SC(Load-Linked / Store-Conditional)。x86 的 lock cmpxchg、ARM 的 ldxr/stxr 就是它们。
最简单的"上锁"思路:
1. 用 CAS 把锁字从 0 改成 1,成功了我就是持有者。
2. 失败了说明别人持有,怎么办?
a. 一直 CAS 直到成功 → 自旋锁(spinlock)
b. 告诉内核"挂起我,等锁释放再叫醒" → 重量级锁
2
3
4
Linux 用户态的 std::mutex 实际上是 futex(fast userspace mutex):无竞争时纯用户态 CAS,零系统调用;有竞争才陷内核挂起。这就是为什么"不竞争的锁几乎免费"。
┌────────────────────────────────────────────────┐
│ 无竞争:纯用户态 CAS,约 20ns │
│ ───────────────────────────────────── │
│ 有竞争:陷内核 + 切上下文,约 1-10μs │
└────────────────────────────────────────────────┘
差距 50-500 倍
2
3
4
5
6
# 3.2 互斥与同步原语家谱
原子操作(CAS / LL-SC)
│
┌─────────────────┼──────────────────────┐
▼ ▼ ▼
自旋锁 futex / WaitOnAddress 原子量
spinlock (内核辅助挂起) std::atomic<T>
│ │ │
│ ┌─────────┼──────────┐ │
│ ▼ ▼ ▼ │
│ std::mutex shared_mutex recursive_ │
│ mutex │
│ │ │ │ │
│ └─────────┴──────────┘ │
│ RAII 包装: │
│ lock_guard / unique_lock / │
│ shared_lock / scoped_lock │
▼ ▼
pthread_spin atomic_flag / fence
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
记住这张图比记任何 API 都重要:所有锁本质上是 CAS 加挂起机制的不同封装,所有 RAII 包装是为了让"忘了 unlock"成为不可能。
# 3.3 用户态锁与内核态锁
| 类型 | 典型例子 | 无竞争开销 | 有竞争开销 | 适用 |
|---|---|---|---|---|
| 纯用户态 | atomic、自旋锁 | ~5-20ns | 烧 CPU | 临界区极短(<100ns) |
| futex 混合 | std::mutex、pthread_mutex | ~20-30ns | ~1-10μs | 默认选择 |
| 内核重量级 | Windows CRITICAL_SECTION 跨进程模式 | μs 级 | μs 级 | 跨进程同步 |
90% 场景,用 std::mutex 都对。需要再细的话,先把它解决了再说。
# 3.4 一把锁的生命周期
以 std::mutex 为例:
构造 ──▶ lock() ──▶ unlock() ──▶ ... ──▶ 析构
│ ▲
│ │
▼ │
有竞争?──Y──▶ 挂起等待 ─┘
│
N
▼
CAS 成功,进入临界区
2
3
4
5
6
7
8
9
注意:析构时不能仍被持有,否则未定义行为。这条规则就是 lock_guard 永远把锁绑定到栈对象上的根本原因——栈对象析构时一定会先 unlock()。
# 3.5 加锁流程全景图
把第 4-6 章要讲的内容放在一张图上预览:
┌──────────────────────────────────────────────────────────────┐
│ 业务代码进入临界区 │
└──────────────────────────────────────────────────────────────┘
│
┌───────────────┼───────────────┐
▼ ▼ ▼
共享数据多写? 多个锁? 单变量计数?
│ │ │
▼ ▼ ▼
std::mutex std::scoped_lock std::atomic
│ │ │
▼ ▼ ▼
RAII 包装: 原子地锁所有 无锁 (8 章)
lock_guard std::lock 后
unique_lock adopt_lock
│
▼
┌─────────────────────────────────────┐
│ 读>>写?换 shared_mutex (4.2) │
│ 同线程会再进入?谨慎 recursive(4.3)│
│ 临界区极短?考虑 atomic (8) │
└─────────────────────────────────────┘
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
下一章正式逐个拆解这五大原语。
# 4. 五大锁机制详解
C++ 标准库提供的同步原语并不多,本节挑五个真正会经常用的逐个剖析:API、内部实现、典型用法、性能数量级、踩坑点。
# 4.1 std::mutex 互斥锁
定义:std::mutex 是不可重入互斥锁,同一时刻最多一个线程持有。
API:
std::mutex mu;
mu.lock(); // 阻塞直到拿到锁
mu.try_lock(); // 立刻返回 true / false
mu.unlock(); // 释放
2
3
4
实现略览(GCC libstdc++ on Linux):底层包了 pthread_mutex_t,进一步是 futex。状态字三种:
0 = 未持有
1 = 已持有,无人等待
2 = 已持有,有线程在排队(contended)
2
3
无竞争时只做一次 CAS(0→1),开销约 20ns;进入 contended 状态后 unlock 需要做一次 futex_wake 系统调用,开销约 1μs。
典型用法:
class Counter {
public:
void Add(int x) {
std::lock_guard<std::mutex> g(mu_);
n_ += x;
}
int Snapshot() const {
std::lock_guard<std::mutex> g(mu_);
return n_;
}
private:
mutable std::mutex mu_;
int n_ = 0;
};
2
3
4
5
6
7
8
9
10
11
12
13
14
核心要点:
- 永远用 RAII 包装,不要直接
lock/unlock(2.2 节)。 - 不可重入:同一线程二次
lock()行为 UB(2.3 节)。 - 不能被
move/copy:std::mutex既不可拷贝也不可移动,所以含有mutex的类自己也丢了拷贝/移动语义(往往得手写)。 Snapshot()也加锁:哪怕是读,只要其它线程可能写,就得加锁。
性能数量级(近代 x86,单 socket):
| 操作 | 数量级 |
|---|---|
| 无竞争 lock+unlock | ~20-30ns |
| 中等竞争(2-4 线程) | 100-500ns |
| 高度竞争(>16 线程) | μs 级,伴随 cache 抖动 |
别名:std::timed_mutex 多了 try_lock_for / try_lock_until,可以带超时;其它一致。需要"等不到就放弃"的场景才用。
# 4.2 std::shared_mutex 读写锁
定义(C++17):允许多个线程同时持有"共享(读)锁",但写锁是排他的。
API:
std::shared_mutex rw;
rw.lock(); // 写锁
rw.unlock();
rw.lock_shared(); // 读锁
rw.unlock_shared();
2
3
4
5
配合 RAII:
{ std::shared_lock<std::shared_mutex> g(rw); /* 读 */ }
{ std::unique_lock<std::shared_mutex> g(rw); /* 写 */ }
2
内部状态(典型实现):用一个原子计数器记录读者数量,加一个写者标志位:
state = [writer_flag : 1bit][reader_count : 31bit]
读锁:CAS 把 reader_count + 1(前提是 writer_flag = 0 且没有 pending writer)。 写锁:CAS 把 writer_flag = 1(前提是 reader_count = 0)。
何时显著比 mutex 快:
读临界区"足够长",且读>>写。
"足够长"通常意味着 > 1μs 的工作量。否则读锁本身的 CAS + 内存同步开销就吃掉了"读并发"省的钱。下面是一组经验数字(24 核机器,纯计数测试):
临界区耗时 mutex 吞吐 shared_mutex 吞吐
50ns ~30M ops/s ~12M ops/s ← shared 更慢
500ns ~12M ops/s ~28M ops/s ← 持平/略快
5μs ~ 1M ops/s ~10M ops/s ← 10x 优势
2
3
4
这是为什么主线一不能盲目换 shared_mutex:那个 unordered_map::find 临界区可能就 100-300ns,换了之后反而更慢。正确修法是分片 + mutex(11.1、12.1 节)。
写者饥饿问题:默认 std::shared_mutex 没承诺公平性。读者持续不断时写者可能长时间拿不到锁。如果业务需要"写优先",要么用 pthread_rwlockattr_setkind_np 设置 WRITER_NONRECURSIVE_PREFER(Linux),要么自己写一把带"pending writer 信号"的版本。
典型用法:配置热更新(读巨多写极少):
class Config {
public:
std::shared_ptr<const Snapshot> Read() const {
std::shared_lock g(rw_);
return snap_;
}
void Reload(std::shared_ptr<const Snapshot> s) {
std::unique_lock g(rw_);
snap_ = std::move(s);
}
private:
mutable std::shared_mutex rw_;
std::shared_ptr<const Snapshot> snap_;
};
2
3
4
5
6
7
8
9
10
11
12
13
14
注:这个场景更优解其实是
atomic_shared_ptr(8.3 节),完全无锁。
# 4.3 std::recursive_mutex 递归锁
定义:同一线程可以多次 lock,但必须对称地 unlock 同样次数才真正释放。
API:同 mutex。
何时用:当一个对象的多个 public 方法互相调用且都要加锁时:
class Tree {
std::recursive_mutex mu_;
public:
void Insert(int x) {
std::lock_guard g(mu_);
if (Need_Rebalance()) Rebalance();
}
void Rebalance() {
std::lock_guard g(mu_); // 同线程再次进入,合法
/* ... */
}
};
2
3
4
5
6
7
8
9
10
11
12
如果用 std::mutex,上面的 Insert → Rebalance 链就会自死锁(2.3)。
为什么"通常是设计问题":
- 性能损耗:递归锁要额外维护
owner_tid + count,加锁/解锁开销大约比mutex多 30-50%。 - 掩盖坏抽象:往往说明"对外加锁的 public 函数"和"对内不加锁的 private 函数"职责没分开。
- TSan 噪音:递归锁会让 TSan 误判幸存的竞争。
更好的做法——把不加锁的内部实现拆出来:
class Tree {
std::mutex mu_;
void RebalanceLocked() { /* assume mu_ already held */ }
public:
void Insert(int x) {
std::lock_guard g(mu_);
if (Need_Rebalance()) RebalanceLocked();
}
void Rebalance() {
std::lock_guard g(mu_);
RebalanceLocked();
}
};
2
3
4
5
6
7
8
9
10
11
12
13
约定:*Locked 后缀的函数表示"调用方必须持有 mu_"。这是 Google C++ Style 推荐的写法。
# 4.4 std::atomic 原子量
定义:把一个对象的"读 / 写 / 读改写"操作打包成原子的,无需用户加锁。
API:
std::atomic<int> n{0};
n.store(1); // 原子写
int x = n.load(); // 原子读
n.fetch_add(1); // 原子加
bool ok = n.compare_exchange_strong(expected, desired); // CAS
2
3
4
5
适用对象:内置整型、指针、bool,以及 is_trivially_copyable_v<T> 的小对象(不超过两个机器字时常用 CAS 实现,否则可能 fall back 到带锁实现)。
static_assert(std::atomic<int>::is_always_lock_free); // 编译期检查
适合的场景:
- 计数器:
std::atomic<size_t> qps_counter_; - 状态标志:
std::atomic<bool> stop_; - 指针发布:
std::atomic<Node*> head_; - 单调时戳:
std::atomic<uint64_t> seq_;
不适合的场景:
- 多个相关变量要"原子地一起更新"——还得回到锁。
- 复合容器(map / vector)——除非用 RCU /
atomic_shared_ptr。
memory_order 留到 8.1 节统一讲。默认 seq_cst 永远是对的(只是可能不是最优)。
// 计数器:典型用法
std::atomic<size_t> req_count_{0};
void on_req() { req_count_.fetch_add(1, std::memory_order_relaxed); }
2
3
relaxed 对纯计数足够,因为我们不关心可见顺序、只关心总数最终正确。
# 4.5 自旋锁与 spin/wait 之争
C++ 标准库直到 C++20 才有 std::atomic_flag::test_and_set + wait/notify,可以拼出自旋锁;标准里没有正式的 std::spinlock。但工程上经常自己手写:
class SpinLock {
std::atomic_flag flag_ = ATOMIC_FLAG_INIT;
public:
void lock() {
while (flag_.test_and_set(std::memory_order_acquire)) {
#if defined(__x86_64__)
__builtin_ia32_pause(); // PAUSE 指令,降功耗减少流水线 stall
#endif
}
}
void unlock() { flag_.clear(std::memory_order_release); }
};
2
3
4
5
6
7
8
9
10
11
12
用 vs 不用:
| 维度 | 自旋锁 | std::mutex |
|---|---|---|
| 无竞争开销 | ~5ns | ~20ns |
| 竞争且临界区 < 100ns | 更快 | 慢(系统调用代价回不来) |
| 竞争且临界区 > 1μs | 烧 CPU | 远更优 |
| 跨用户/内核态 | 不支持 | 支持 |
| 优先级反转 | 严重 | 较弱 |
经验:普通应用层代码不要写自旋锁,让 std::mutex 在 futex 层面替你做"先小自旋再挂起"的混合策略。只有当你测量过临界区 < 100ns 且高度竞争,自旋才有意义。
# 4.6 mutex 与 shared_mutex 的分水岭
这是被问最多的问题之一。一句话答案:shared_mutex 只在"读临界区足够大、且读>>写"时有收益。下面给一个判断流程:
┌──────────────────────────────┐
│ 读写比例 < 10:1 ? │
└─────────────┬────────────────┘
│ 是
▼ 直接 mutex
│
│ 否(读 >>写)
▼
┌──────────────────────────────┐
│ 读临界区单次耗时 < 1μs ? │
└─────────────┬────────────────┘
│ 是
▼ 仍然 mutex(或考虑分片)
│
│ 否(读临界区大)
▼
┌──────────────────────────────┐
│ shared_mutex 真正能赚到 │
└──────────────────────────────┘
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
如果你不知道读写比例和临界区大小——先 mutex 上去跑监控,看到瓶颈再说。这一点比"提前优化"重要得多。
# 5. RAII 守护与 lock 系列
锁本身只是把"互斥"这件事提供出来;让"忘了解锁 / 异常路径不解锁"成为不可能的,是RAII 守护。本章覆盖五个守护类。
# 5.1 lock_guard 最小守护
template<class M> class lock_guard {
public:
explicit lock_guard(M& m) : m_(m) { m_.lock(); }
~lock_guard() { m_.unlock(); }
lock_guard(const lock_guard&) = delete;
private:
M& m_;
};
2
3
4
5
6
7
8
特点:
- 构造时立刻 lock,析构时立刻 unlock。
- 不支持
unlock()/try_lock/ 转交所有权。 - 体积最小,性能最好。
用法:
void f() {
std::lock_guard<std::mutex> g(mu_);
// ... 临界区 ...
} // 出作用域自动 unlock
2
3
4
C++17 起可省略模板参数(CTAD):
std::lock_guard g(mu_);
适用:90% 场景,默认就用这个。
# 5.2 unique_lock 灵活守护
unique_lock 比 lock_guard 大也慢一点点(多个 bool 表示"是否持有"),换来的能力是:
std::unique_lock<std::mutex> g(mu_); // 默认 lock
g.unlock(); // 中途显式解锁
g.lock(); // 再锁回来
g.try_lock(); // 非阻塞尝试
auto g2 = std::move(g); // 所有权可转移
2
3
4
5
什么时候必须用 unique_lock:
- 和
std::condition_variable::wait()配合——wait内部要 unlock 再 lock,必须传unique_lock。 - 中途要释放锁让其它线程进来,做完非临界区事再锁回来。
- 把锁的所有权转移给另一个函数:
std::unique_lock<std::mutex> LockedView() {
std::unique_lock g(mu_);
return g; // 拷贝省略,调用方持有锁
}
2
3
4
如果用不到这些,就用 lock_guard。
# 5.3 shared_lock 共享守护
C++14 加入,专为 shared_mutex 服务的"读"守护:
std::shared_mutex rw_;
TagList Get(uint64_t id) const {
std::shared_lock<std::shared_mutex> g(rw_); // 多个线程可以同时进入
/* ... */
}
void Update(uint64_t id, TagList tl) {
std::unique_lock<std::shared_mutex> g(rw_); // 排他
/* ... */
}
2
3
4
5
6
7
8
9
API 跟 unique_lock 几乎一样,但调用的是 lock_shared / unlock_shared。
坑点:用 lock_guard<shared_mutex> 时默认走的是写锁(因为 lock_guard 永远调用 lock()),不是读锁。要读就必须显式用 shared_lock。
# 5.4 scoped_lock 多锁一锁
C++17 起,专为"同时持有多把锁"的场景:
std::mutex a, b, c;
{
std::scoped_lock g(a, b, c); // 内部用 std::lock 原子地锁三把
// ... 临界区 ...
} // 反序解锁
2
3
4
5
它内部等价于:
std::lock(a, b, c);
std::lock_guard<std::mutex> ga(a, std::adopt_lock);
std::lock_guard<std::mutex> gb(b, std::adopt_lock);
std::lock_guard<std::mutex> gc(c, std::adopt_lock);
2
3
4
但写法干净多了,这是主线二死锁的最佳修法。
注意:scoped_lock 接受零把锁、一把锁也合法(零把 = 空守护),这对模板编程很有用。
# 5.5 defer/adopt/try_to_lock 三策
unique_lock / lock_guard / scoped_lock 都支持一个第二个参数(tag),表示"我不要默认 lock 行为":
| Tag | 含义 | 典型用法 |
|---|---|---|
std::defer_lock | 先不上锁,之后再手动 lock | 想先做 try_lock |
std::adopt_lock | 锁已经被持有,托管它的解锁 | 和 std::lock(a,b) 配合 |
std::try_to_lock | 构造时调用 try_lock,可能没锁上 | 非阻塞流程 |
// 例:超时尝试
std::unique_lock g(mu_, std::try_to_lock);
if (!g.owns_lock()) { /* 没拿到 */ return; }
2
3
# 5.6 与异常机制的合奏
回到 2.2 的"lock/unlock 不安全"。改成 RAII:
void f() {
std::lock_guard g(mu_);
do_work(); // 抛了?没事,g 析构会 unlock
}
2
3
4
这就是 RAII 与异常的"合奏":栈展开时所有局部对象析构,g 自然解锁。这一点和第 29 章(异常安全 RAII)讲的完全一致——所有需要"配对释放"的资源都该走这条路。
反例:千万不要把锁作为类成员"持有":
struct Bad { std::lock_guard<std::mutex> g; // ❌ 类成员持锁,作用域不明 };1
2
3这会让锁的生命周期等于对象本身,等价于把锁"延长"到对象析构,几乎一定会出问题。
# 6. 死锁三步法
回到主线二。死锁不仅仅是"锁顺序不对",它有四个公认要素,缺一不可。
# 6.1 死锁四要素
- 互斥:资源被独占(锁本身就是这样)。
- 持有并等待:拿着 A 去等 B。
- 不可抢占:除非主动
unlock,别人抢不走。 - 环路等待:t1 等 t2,t2 等 t1(或更长的环)。
破坏任意一条就破坏了死锁。工程上最常用的是破坏 (2)、(4):
- 破坏 (2):一次性锁所有需要的锁(
std::lock/scoped_lock)。 - 破坏 (4):全局约定锁顺序,所有人按同一顺序拿锁。
# 6.2 std::lock 与 scoped_lock
直接修主线二:
void Transfer(Account& a, Account& b, long n) {
std::scoped_lock g(a.mu, b.mu); // C++17:一次性原子锁多个
a.balance -= n; b.balance += n;
}
2
3
4
或者 C++11 风格:
void Transfer(Account& a, Account& b, long n) {
std::lock(a.mu, b.mu);
std::lock_guard<std::mutex> ga(a.mu, std::adopt_lock);
std::lock_guard<std::mutex> gb(b.mu, std::adopt_lock);
a.balance -= n; b.balance += n;
}
2
3
4
5
6
std::lock 内部的算法是"先 try_lock 一遍,失败就退回去全部释放,再换顺序重来",保证整批锁要么全部拿到、要么一把也不持有。这就破坏了"持有并等待"。
# 6.3 全局锁顺序约定
如果你不能或不想用 scoped_lock(比如锁分布在不同对象、不同模块),可以约定全局顺序。最常用的是"按地址排序":
void Transfer(Account& a, Account& b, long n) {
auto* x = &a;
auto* y = &b;
if (x > y) std::swap(x, y); // 永远先锁地址小的
std::lock_guard<std::mutex> g1(x->mu);
std::lock_guard<std::mutex> g2(y->mu);
a.balance -= n; b.balance += n;
}
2
3
4
5
6
7
8
这破坏的是"环路等待"。在大型系统里更常用的是层级锁(lock leveling):每把锁绑一个"层级数字",规定线程当前持有的锁层级必须严格大于要新拿的锁层级。Google Abseil、Chromium 都有这种工具。
# 6.4 死锁不一定卡死
有一类隐蔽问题——死锁了但程序看起来没卡死:
- 部分卡死:只是某几个线程互等,其它线程在跑,监控只看到 QPS 缓慢下降。
- 超时短路:用了
try_lock_for+ 重试,宏观看像"偶发慢请求"。 - 跨模块:A 模块的锁等 B 模块的锁,分别看代码都对。
排查工具:
gdbattach 上去thread apply all bt,找两个线程都停在pthread_mutex_lock的栈。- Linux
pstack <pid>一行一行扫。 - TSan 的
--detect-deadlocks=1(实验特性)。 - Abseil
Mutex自带"潜在死锁报告"。
主线二走 scoped_lock 修完后,跑:
$ ./account
1000,1000
2
正常退出。
# 7. 性能与可观测
锁的"对错"靠 5、6 章保证;锁的"快慢"靠这一章。
# 7.1 上下文切换的代价
std::mutex 有竞争时会陷入内核挂起,发生上下文切换。一次切换的成本:
直接成本(保存/恢复寄存器、切栈) 1-3μs
间接成本(cache 冷、TLB 冷) 3-10μs(甚至更多)
2
也就是说:一次有竞争的 lock+unlock 抵得上几千个无竞争的。这是为什么"先减少冲突,再换无锁结构"几乎永远是更划算的优化方向。
# 7.2 锁粒度与分片
第一招永远是减小临界区:
// ❌ 大粒度
std::lock_guard g(mu_);
auto v = ParseJson(buf); // 计算放在临界区
map_[k] = v;
// ✅ 小粒度
auto v = ParseJson(buf); // 计算放外面
{ std::lock_guard g(mu_); map_[k] = v; }
2
3
4
5
6
7
8
第二招是分片(2.7 节):
struct ShardedMap {
static constexpr size_t kN = 64;
struct Shard { std::mutex mu; std::unordered_map<Key, Val> data; };
std::array<Shard, kN> shards;
size_t Bucket(const Key& k) const { return std::hash<Key>{}(k) % kN; }
Val Get(const Key& k) const {
auto& s = shards[Bucket(k)];
std::lock_guard g(s.mu);
auto it = s.data.find(k);
return it == s.data.end() ? Val{} : it->second;
}
void Put(const Key& k, Val v) {
auto& s = shards[Bucket(k)];
std::lock_guard g(s.mu);
s.data[k] = std::move(v);
}
};
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
64 分片大致把同 key 冲突概率降到 1/64。这是主线一更可能的正解。
# 7.3 伪共享与 cache line
struct BadCounter {
std::atomic<long> a;
std::atomic<long> b; // 与 a 同 64 字节 cache line
};
2
3
4
线程 1 频繁改 a、线程 2 频繁改 b:硬件上两个 atomic 在同一条 cache line,每次写都会让对方的副本失效,性能直接掉 5-10 倍。这就是伪共享(false sharing)。
修法:
struct GoodCounter {
alignas(64) std::atomic<long> a;
alignas(64) std::atomic<long> b;
};
2
3
4
或者 C++17:
struct GoodCounter {
alignas(std::hardware_destructive_interference_size) std::atomic<long> a;
alignas(std::hardware_destructive_interference_size) std::atomic<long> b;
};
2
3
4
别忘了:不仅 atomic 会被伪共享,普通字段 + 各自锁也会。比如分片表里两个 Shard 紧挨着,写 Shard[0] 会令 Shard[1] 的 cache 失效。修法同上,分片对齐到 cache line。
# 7.4 perf 与 lock_stat
Linux 上量化锁开销最直接的工具:
# 1. perf record 看锁热点
perf record -F 99 -g ./my_app
perf report
# 2. perf lock 直接统计 futex
perf lock record ./my_app
perf lock report
# -> 给出每把锁的总等待时间、最大等待、争用次数
2
3
4
5
6
7
8
如果是 pthread_mutex,glibc 还有:
# 编译时 -lpthread + 设置环境变量
PTHREAD_MUTEX_LOCK_STAT=1 ./my_app
2
判别准则:
- 锁等待时间 > 1% 的 CPU 时间 → 值得调优。
- 单把锁等待时间 > 总锁时间的 50% → 这把锁就是瓶颈,先动它。
# 7.5 TSan 揭示数据竞争
-fsanitize=thread(GCC/Clang)会插桩内存访问,运行期发现:
- 同一地址被两个线程访问且至少一次写,且没有 happens-before。
- 锁顺序冲突警告(不一定真死锁,但提示风险)。
$ g++ -fsanitize=thread -O1 -g main.cc && ./a.out
==================
WARNING: ThreadSanitizer: data race (pid=12345)
Write of size 4 at 0x7b... by thread T2:
#0 worker main.cc:7
Previous read of size 4 at 0x7b... by thread T1:
#0 worker main.cc:6
==================
2
3
4
5
6
7
8
成本:CPU 5-10x 慢,内存 5-10x 多。只在 CI、压测环境用。
主线案例如果在上线前在压测里跑过 TSan,会直接报出 2.2、2.5 那种问题。
# 7.6 编译期约束最便宜
最划算的工具反而是类型 + 注解——一行 bug 都不让进 master:
-Wthread-safety(Clang),配合GUARDED_BY(mu_)注解(11.5 节)。- 自己写的
LockedRef<T>类型——拿不到锁就拿不到指针。 clang-tidy的bugprone-not-null-terminated-result等规则。
这些手段在编译期生效,零运行期开销。任何能"在编译期就否决"的 bug 都比"运行期排查"便宜 1000 倍。
# 8. 无锁与原子模式
锁能解决正确性,但很多场景根本不需要锁。本章讲怎么用 atomic 和它周边的工具搭出无锁原语。
# 8.1 atomic 的六种 memory_order
C++ 给了六种 memory order:
| order | 含义 | 何时用 |
|---|---|---|
relaxed | 只保证操作原子,不保证顺序 | 纯计数(fetch_add) |
consume | 几乎没人正确实现,标准已不推荐 | 不用 |
acquire | 读操作;之后的访存不能重排到它前面 | 读 flag,再读数据 |
release | 写操作;之前的访存不能重排到它后面 | 写数据,再写 flag |
acq_rel | acquire + release,用于 RMW(CAS、fetch_add) | 同时承担发布和取数据 |
seq_cst | 全局顺序一致,最强 | 默认;想不清楚就用它 |
最经典的 release/acquire 配对:
// 发布者
big_data_ = LoadFromFile(); // 写数据
ready_.store(true, std::memory_order_release);
// 订阅者
if (ready_.load(std::memory_order_acquire)) // 读 flag
use(big_data_); // 一定能看到完整数据
2
3
4
5
6
7
这就是 2.9 那个 bug 的修法。release 保证"在它之前的所有写"都对 acquire 一方可见。
relaxed 的正确用法:
std::atomic<size_t> total_;
void on_req() { total_.fetch_add(1, std::memory_order_relaxed); }
size_t Snapshot() { return total_.load(std::memory_order_relaxed); }
2
3
为什么 OK?因为我们只关心"总数最终正确",不关心"我看到的总数和 flag 同步"。
初学者建议:全部用默认 seq_cst,等性能 profile 表明需要再换。99% 的代码这么写没问题。
# 8.2 CAS 自旋常见套路
想做"读 → 修改 → 比较 → 写回",用 CAS 自旋:
std::atomic<long> v_{0};
void IncIfPositive() {
long old = v_.load();
while (old > 0 && !v_.compare_exchange_weak(old, old + 1)) {
// old 被 CAS 自动更新为当前值;继续重试
}
}
2
3
4
5
6
7
要点:
compare_exchange_weak可能"伪失败"(即使 old 和当前值相等也可能失败),适合放在循环里。compare_exchange_strong不会伪失败,但开销略大,适合单次判断。- 第一个参数是引用,失败时会被更新为"当前实际值",不需要你再
load()。
# 8.3 atomic_shared_ptr 与发布
C++20 之前 std::shared_ptr 的引用计数操作是原子的,但整个 shared_ptr 对象(指针 + 控制块指针)的替换不是原子的。如果想做"无锁热更新指针",要么用平台扩展的 std::atomic<std::shared_ptr<T>>(C++20)、要么用早期的 std::atomic_load/atomic_store 一组自由函数(已 deprecated)。
// C++20 标准做法
std::atomic<std::shared_ptr<const Config>> cfg_;
std::shared_ptr<const Config> Read() { return cfg_.load(); }
void Reload(std::shared_ptr<const Config> p) { cfg_.store(std::move(p)); }
2
3
4
5
这就是 4.2 末尾"配置热更新"的真正最优解:读者完全无锁。
# 8.4 condition_variable 配对
std::condition_variable 用来"等到某个条件成立"。必须和 unique_lock<std::mutex> 配对:
std::mutex mu_;
std::condition_variable cv_;
std::queue<Task> q_;
bool stop_ = false;
void Producer(Task t) {
{ std::lock_guard g(mu_); q_.push(std::move(t)); }
cv_.notify_one();
}
void Consumer() {
std::unique_lock g(mu_);
cv_.wait(g, [&]{ return stop_ || !q_.empty(); }); // 关键:lambda 判定
if (stop_) return;
auto t = std::move(q_.front()); q_.pop();
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
要点:
wait(g, pred)是"循环 + 唤醒后重判"的语法糖——永远写带 predicate 的版本,避免"虚假唤醒(spurious wakeup)"。notify_one / notify_all必须在持锁或刚释放锁之后调用,否则可能丢通知(具体讨论看 cppreference)。- 选
notify_one还是notify_all:唤醒一个可能不够(条件还不满足,仍要 wait);唤醒所有可能羊群效应。最稳的是"只有一个能干活时 notify_one,所有等待者都该被叫醒(如 shutdown)时 notify_all"。
# 8.5 once_flag 与 call_once
"首次初始化,线程安全"是 6.1 的子集:
std::once_flag flag_;
Config* g_cfg_ = nullptr;
void Init() { g_cfg_ = LoadConfig(); }
Config& GetConfig() {
std::call_once(flag_, Init);
return *g_cfg_;
}
2
3
4
5
6
7
8
call_once 保证 Init 在所有线程间恰好执行一次,并且执行完成对所有线程可见(带 release/acquire 语义)。
注意:C++11 起,函数内 static 变量初始化本身就是线程安全的("Magic Statics"),所以更地道的写法是:
Config& GetConfig() {
static Config inst = LoadConfig(); // 线程安全
return inst;
}
2
3
4
call_once 适用于"初始化对象不是简单 static"或"想跨函数共享 flag"的场景。
# 9. 五步选型方法论
把前面所有原理压缩成一个可操作的清单,从需求到落地的五步。
# 9.1 第一步:盘清读写比例
在过去 7 天的监控里,这块数据被读、被写各多少次?
| 比例 | 倾向 |
|---|---|
| 1:1 ~ 10:1 | std::mutex |
| 10:1 ~ 100:1 | 临界区大→shared_mutex;临界区小→mutex + 分片 |
| 100:1 以上 | atomic_shared_ptr 发布 / RCU |
没监控?先 mutex 上去 + 加埋点。错的选型靠 profile 改,比拍脑袋猜安全。
# 9.2 第二步:度量临界区大小
用 std::chrono::steady_clock 围一下临界区内最长那条路径:
auto t0 = std::chrono::steady_clock::now();
{ std::lock_guard g(mu_); /* 临界区 */ }
auto dt = std::chrono::steady_clock::now() - t0;
2
3
| 临界区长度 | 倾向 |
|---|---|
| < 100ns(几条整数运算) | atomic 通常更好 |
| 100ns ~ 1μs(map 查找) | mutex |
| > 1μs(计算 / IO) | 先想办法拆出临界区,剩下的再选锁 |
临界区里有 IO?立刻重构(11.3 节)。
# 9.3 第三步:评估冲突概率
同一时刻,预计有多少线程会抢同一把锁?
- 概率 < 1%(多 key 不同锁):哪种锁都行,
mutex默认。 - 概率 10-30%:开始有可观察的等待,考虑分片。
- 概率 > 50%:要么分片,要么换数据结构(lock-free queue 等)。
冲突概率可以从两个角度估:
- 静态:业务上同一资源是否常被多个线程并发访问?
- 动态:用
perf lock看 contention 比例。
# 9.4 第四步:圈定可见性
锁守护的"是什么"?必须有一份明确的 invariant:
// 文档:cfg_, version_ 都由 mu_ 守护
// 访问任何一个必须先持 mu_
struct State {
mutable std::mutex mu_;
std::shared_ptr<const Config> cfg_;
int version_ = 0;
};
2
3
4
5
6
7
可见性原则:保护的数据和锁声明在一起,且尽量私有。不要让外部直接拿到对裸数据的引用。
# 9.5 第五步:写一份"锁约定"
一段 5-10 行的文档放在头文件最上面,记录:
// 锁约定(必读)
// ----------------------------------------------------
// - mu_ 守护 cache_ 和 version_
// - 任何对 cache_ 的访问(包括读)必须持 mu_
// - 持有 mu_ 时不得调用回调(cb_)
// - 与 listener_mu_ 的顺序:mu_ 在前,listener_mu_ 在后
2
3
4
5
6
工程里 90% 的锁 bug 都是"另一个工程师不知道锁约定"导致的。这一份约定 + 11.5 节的注解,能挡住绝大部分。
# 10. 典型场景速查
把前面的方法论变成"看场景填空"。
# 10.1 计数器与状态位
首选:std::atomic<T> + relaxed。
std::atomic<size_t> qps_{0};
std::atomic<bool> stop_{false};
2
不要:用 std::mutex 守护一个 int——纯粹浪费。
# 10.2 配置热更新
首选:std::atomic<std::shared_ptr<const Config>>(C++20)。
std::atomic<std::shared_ptr<const Config>> cfg_;
auto snapshot = cfg_.load(); // 读完全无锁
cfg_.store(std::make_shared<const Config>(new_data)); // 写偶发
2
3
次选(C++17):std::shared_mutex + shared_ptr<const Config>。
# 10.3 缓存读多写少
首选:分片 + 每片 std::mutex。
次选:单把 std::shared_mutex,且仅在临界区 > 1μs 时。
class Cache {
static constexpr size_t kShards = 64;
struct Shard {
std::mutex mu;
std::unordered_map<Key, Val> data;
};
std::array<Shard, kShards> shards_;
};
2
3
4
5
6
7
8
# 10.4 生产者消费者队列
首选:std::mutex + std::condition_variable + std::queue。
template<class T> class BlockingQueue {
std::mutex mu_;
std::condition_variable cv_;
std::queue<T> q_;
bool stop_ = false;
public:
void Push(T x) {
{ std::lock_guard g(mu_); q_.push(std::move(x)); }
cv_.notify_one();
}
std::optional<T> Pop() {
std::unique_lock g(mu_);
cv_.wait(g, [&]{ return stop_ || !q_.empty(); });
if (stop_ && q_.empty()) return std::nullopt;
auto v = std::move(q_.front()); q_.pop();
return v;
}
void Stop() {
{ std::lock_guard g(mu_); stop_ = true; }
cv_.notify_all();
}
};
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
进阶:极高吞吐场景用 lock-free queue(Moodycamel、folly MPMC),但99% 业务用不到,且 lock-free 的正确性远比 mutex+cv 难审。
# 10.5 单例与首次初始化
首选:函数内 static(C++11 起线程安全)。
Service& GetService() { static Service s; return s; }
次选:std::call_once。
# 10.6 跨线程回调与发布
铁律:不要持锁回调用户代码。
// ❌ 持锁回调
{
std::lock_guard g(mu_);
for (auto& cb : listeners_) cb(data_); // 用户回调可能重入
}
// ✅ 拷贝出来再回调
std::vector<Cb> snap;
Snap data_copy;
{
std::lock_guard g(mu_);
snap = listeners_;
data_copy = data_;
}
for (auto& cb : snap) cb(data_copy);
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 11. 工程化最佳实践
API、性能讲完,最后是怎么把"对的代码"沉淀成习惯。
# 11.1 锁与数据绑在一起
反例:
std::mutex g_map_mu;
std::unordered_map<Key, Val> g_map; // 全局裸数据 + 全局裸锁
2
正例:把数据和锁封进类,外部只能通过方法访问。
class TagIndex {
public:
TagList Get(uint64_t id) const;
void Update(uint64_t id, TagList tl);
private:
mutable std::mutex mu_;
std::unordered_map<uint64_t, TagList> data_; // 私有,外部摸不到
};
2
3
4
5
6
7
8
更进一步:泛型工具 Mutexed<T>:
template<class T>
class Mutexed {
public:
template<class F> auto WithLock(F&& f) {
std::lock_guard g(mu_);
return f(data_);
}
private:
std::mutex mu_;
T data_;
};
// 用法
Mutexed<std::unordered_map<Key, Val>> m_;
m_.WithLock([&](auto& m) { m[k] = v; });
2
3
4
5
6
7
8
9
10
11
12
13
14
15
裸数据 + 裸锁 = 90% 的死锁来源。封装能直接堵死。
# 11.2 锁顺序文档化
如果实在要持有多把锁,写一份层级表:
锁层级(数字大的在内层,先拿大的)
100 task_mu_
90 cache_mu_
80 listener_mu_
50 log_mu_
2
3
4
5
工程上可以用宏 ASSERT_LOCK_LEVEL_AT_LEAST(80),运行期检查;Abseil 的 absl::Mutex 内置 EnableInvariantDebugging。
# 11.3 临界区不持锁回调
第 2.8、10.6 节已强调过,这里再写成一条铁律:
临界区里只做内存读写。任何"可能阻塞"或"调用外部代码"的事,都拷贝到栈上、离开锁再做。
具体包含:
- 文件 / 网络 IO
std::cout(同步流锁也有竞争)- 调用用户回调
- 触发析构(含
unique_ptr.reset())
如果临界区里非要析构一个大对象(比如旧 cache):
std::shared_ptr<OldCache> dying;
{
std::lock_guard g(mu_);
dying = std::move(cache_); // 临界区里只搬指针
cache_ = std::make_shared<NewCache>(...);
}
// dying 在锁外析构,可能很慢,但不影响临界区
2
3
4
5
6
7
# 11.4 用类型封装强约束
进阶模式:用类型让"不持锁就拿不到指针"变成编译错误。
template<class T>
class Guarded {
public:
class Locked {
public:
T* operator->() { return p_; }
T& operator*() { return *p_; }
~Locked() { mu_->unlock(); }
private:
friend class Guarded;
Locked(std::mutex* m, T* p) : mu_(m), p_(p) {}
std::mutex* mu_; T* p_;
};
Locked Lock() { mu_.lock(); return {&mu_, &data_}; }
private:
std::mutex mu_;
T data_;
};
// 用法
Guarded<std::unordered_map<K, V>> table_;
auto h = table_.Lock();
(*h)[k] = v; // 用 -> 也行
// h 离开作用域自动解锁
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
这个模式让"忘了加锁的访问"在编译期就直接拿不到指针——比注释和注解都强。
# 11.5 GUARDED_BY 注解检查
Clang 的 -Wthread-safety 配合 Abseil/Chromium 风格注解:
class Bank {
public:
void Transfer(int from, int to, int n)
EXCLUSIVE_LOCKS_REQUIRED(mu_);
private:
std::mutex mu_;
std::map<int, long> balance_ GUARDED_BY(mu_);
};
2
3
4
5
6
7
8
只要:
- 访问
balance_时没持mu_→ 编译报警告。 - 调
Transfer时没持mu_→ 编译报警告。
代价是要把 std::mutex 换成带注解友好的封装(如 absl::Mutex),但对大型项目极其值得。
# 12. 综合案例串讲
把第 1 章两条主线接回来,用 4-11 章的所有知识一次性修复。
# 12.1 重新审视 1.1 的事故
症状回顾:8 万 QPS、读写 99:1、batch 回灌触发 p99 从 12ms 涨到 980ms。
逐步分析:
- 第一步:盘读写比例(9.1)→ 读写 99:1,读极重。
- 第二步:度量临界区(9.2)→
unordered_map::find大约 200ns,"短"。 - 第三步:评估冲突(9.3)→ batch 时一次性灌 10 万条,写线程频繁抢锁;同时 8 万 QPS 读全部排队。
- 第四步:可见性(9.4)→ 数据由
mu_守护,无问题。 - 第五步:约定(9.5)→ 没有显式约定,且 batch 接口持锁循环写。
结论:
- 临界区只有 200ns → 用
shared_mutex反而更慢(4.2 表格)。 - 真正的问题是batch 写持锁太久:一次 batch 持锁循环写 10 万条。
- 修法应该是:(a) 批写改成"拷贝出新 map + 原子替换";(b) 读侧用 atomic_shared_ptr 完全无锁。
修法代码:
class TagIndex {
public:
TagList Get(uint64_t id) const {
auto snap = data_.load(); // 完全无锁
auto it = snap->find(id);
return it == snap->end() ? TagList{} : it->second;
}
// 单条更新仍走 copy-on-write,频率低,可以接受
void Update(uint64_t id, TagList tl) {
auto cur = data_.load();
auto nxt = std::make_shared<Map>(*cur);
(*nxt)[id] = std::move(tl);
data_.store(std::move(nxt));
}
// batch 直接整批替换,临界区只是指针交换
void BatchReplace(std::shared_ptr<const Map> m) {
data_.store(std::move(m));
}
private:
using Map = std::unordered_map<uint64_t, TagList>;
std::atomic<std::shared_ptr<const Map>> data_{std::make_shared<Map>()};
};
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
上线效果:
p99:980ms ──▶ 11ms
qps:9k ──▶ 80k
线程池:256/256 ──▶ 20/256
2
3
读者完全无锁,写者只在 batch 时占内存(旧 map 析构在锁外发生),整个事故彻底消失。
对照思考:如果当初真按同事 A 的建议"换 shared_mutex",得到什么?大约只有 1.5x 收益,且写者饥饿(2.6)。这就是第 1.3 节那四个问题的力量——盲目换锁不如重新选型。
# 12.2 锁选型速查表
┌──────────────────────────────────────────────────────────────┐
│ 场景 推荐 备注 │
├──────────────────────────────────────────────────────────────┤
│ 计数器/标志 atomic<T> relaxed 足够 │
│ 默认互斥 mutex + lock_guard 90% 场景 │
│ 多锁同时持 scoped_lock(a, b, ...) C++17 │
│ 读多写少且大 shared_mutex+shared_lock 临界区>1μs │
│ 读多写少且小 分片 + mutex cache 友好 │
│ 配置热更新 atomic<shared_ptr<const>> C++20 │
│ 生产者消费者 mutex+condition_variable 配 unique_lock │
│ 单例 函数内 static C++11 起线程安全 │
│ 首次初始化跨函数 once_flag+call_once 无需 static │
│ 同线程重入 拆 *Locked 内部函数 最差才 recursive │
└──────────────────────────────────────────────────────────────┘
2
3
4
5
6
7
8
9
10
11
12
13
14
# 12.3 易错点清单
| # | 反模式 | 正解 | 关联节 |
|---|---|---|---|
| 1 | 手动 lock/unlock | RAII 包装 | 5 |
| 2 | 类成员 lock_guard<> | 函数局部 | 5.6 |
| 3 | 多锁不一致顺序 | scoped_lock 或地址序 | 6 |
| 4 | 持锁做 IO/回调 | 拷贝出再调 | 11.3 |
| 5 | 99:1 读写盲换 shared_mutex | 先看临界区长度 | 4.6 |
| 6 | atomic flag 发布大对象 | release/acquire 配对 | 8.1 |
| 7 | 单线程使用 recursive | 拆 *Locked | 4.3 |
| 8 | 把锁公开给外部 | 私有 + 方法接口 | 11.1 |
| 9 | 临界区内 ++计数 用 mutex | atomic | 10.1 |
| 10 | 用 lock_guard 守护 shared_mutex 想读 | 用 shared_lock | 5.3 |
# 12.4 思考题
你接手一段代码:
std::mutex守护std::map<int,std::string>,QPS 20 万、读写比 5:1,p99 偏高。怎么诊断、怎么选锁?参考 9.1-9.3。同事说"
std::recursive_mutex让我们少改了 200 行代码",你怎么反驳?参考 4.3。给你两把锁
mu_a, mu_b,必须按 a→b 顺序锁;但偶发场景需要在已经锁了 b 之后还要拿 a。怎么破?提示:try_lock+ 回滚。主线一的修法用了
atomic<shared_ptr<>>,但 batch 期间内存翻倍。有没有更省内存的方案?提示:增量 patch + 版本号。用 TSan 跑一段代码,报"data race"但你看代码每个访问点都加了锁。可能是什么原因?参考 2.9、8.1。
# 13. 章节小结
# 13.1 八条铁律
- 永远用 RAII 包装锁(
lock_guard/unique_lock/shared_lock/scoped_lock)。 - 默认
std::mutex+lock_guard,不知道用啥就用这个。 - 多锁同时持有,用
scoped_lock或全局锁顺序。 shared_mutex只在"临界区大且读>>写"时才赢——不要为换而换。atomic用于单变量和发布;多个相关变量回到锁。- 临界区只做最小内存操作,不持锁 IO、不持锁回调。
- 锁和数据绑成一个类,外部只能通过方法访问。
- 写一份 5 行锁约定 + 上 TSan——挡住绝大多数低级 bug。
# 13.2 与上下章关系
- 上承第 29 章《异常安全与 RAII》:锁的 RAII 守护就是异常安全的直接应用,"配对释放"的资源类别里,锁排在
unique_ptr之后最重要。 - 后接第 31 章(如果有)将探讨异步执行与协程同步,那里会看到
std::atomic_flag::wait/std::counting_semaphore等 C++20 同步原语。
# 13.3 进阶阅读
- 《C++ Concurrency in Action》Anthony Williams——经典中的经典。
- 《The Art of Multiprocessor Programming》——理论部分(happens-before、线性化)。
- Paul McKenney《Is Parallel Programming Hard, And, If So, What Can You Do About It?》——RCU、内存屏障的工业视角。
- Abseil C++ Tips(特别是 #92、#108、#136)——Mutex 注解和层级锁的实战。
- cppreference 的
<atomic>/<mutex>/<shared_mutex>页面——API 细节最权威。
锁不是越多越好,也不是越少越好。锁的好坏取决于它和数据、临界区、读写模式的匹配度。当你下次面对一段并发代码时,回到第 9 章的五步,把这四个问题挨个回答一遍,正确答案往往会自己浮现。