订单票务购买系统
# 第五章:C++ 订单票务购买系统
本章是综合案例的第五关·并发栈集大成,前 4 个案例都是单线程程序,从本章开始我们正式踏入"多核时代",理解为什么"加锁"看起来简单,写错却会导致死锁、数据竞争、活锁等"幽灵 BUG"。本案例融合原 3 个案例(票务购买 / 轮询管理 / 线程池)为一个完整的"多线程订单系统",一次练遍 C++ 并发的八大核心:
std::thread+std::mutex:最基础的"启动线程 + 互斥锁"std::condition_variable:生产者-消费者模式,订单队列的灵魂std::atomic:无锁计数器、原子状态切换std::shared_mutex:读写锁,多读单写std::future+std::promise:异步任务的结果回传std::jthread🆕 C++20:自动 join + 协作中断- 完整的线程池组件:可重用的工业级线程池
- 策略模式:不同订单类型(普通/秒杀/分布式)使用不同的下单策略
学习方式:本案例按"并发原语拆解 → 写代码 → 跑压测 → 看竞态 → 加锁修复"五步法循环。总共 8 个阶段、约 14 小时,建议分 4 天完成。并发代码不能只“跱过一次”,一定要跑 benchmark 反复压测,才能揭出隐藏的竞态问题。
# 📚 渐进学习节奏
💡 先读这段,再开始敲代码!多线程代码是严禁"一口气写完才跑"的,错了你完全不知道错在哪里。
本案例采用严格的并发验证节奏,每加一个并发原语都必须跑一次压测验证,不能凭"看起来对"过关:
阶段 ① BlockingQueue(02 节) · 60 min · 4 Step
└ Step 1.1: 最小裸版(不加锁)→ 单线程 FIFO 跑通
└ Step 1.2: 双线程作死测试 → 亲眼看到 race condition ⚠️
└ Step 1.3: 加 mutex 修复数据竞争
└ Step 1.4: 加 condition_variable 阻塞 + shutdown 优雅退出
阶段 ② ThreadPool(03 节) · 120 min · 4 Step 【高峰:模板 submit】
└ Step 2.1: 最小骨架(submit_simple,无返回值)→ 工人能转
└ Step 2.2: 单独玩 packaged_task + future 三剑客 ⭐
└ Step 2.3: 模板 submit + invoke_result_t + shared_ptr 三大坑
└ Step 2.4: 1000 任务压测 → 8 倍加速 + 数值守恒
阶段 ③ OrderManager + 读写锁(04 节) · 60 min · 3 Step
└ Step 3.1: Order 数据类 + atomic ID 单线程跑通
└ Step 3.2: mutex 版多线程压测(记下基线时间)
└ Step 3.3: 改 shared_mutex → 读吞吐翻倍 ⭐
阶段 ④ Product 库存 atomic(05 节) · 90 min · 3 Step + 加餐 【双高峰:超卖 + 死锁 ⭐⭐⭐】
└ Step 4.1: 写裸 int 版 → 单线程"看起来"完美
└ Step 4.2: 200 线程压测 → 看到 stock = -3 的超卖现场 💥
└ Step 4.3: atomic + compare_exchange_weak 修复 → 零超卖
└ Step 4.4: 🆕 加餐 - 故意交叉申请两把锁 → 看到经典死锁现场 💥
└ Step 4.5: std::lock / scoped_lock 一次锁全部修复
└ Step 4.6: 锁顺序约定(按地址排序), 工业级解法
阶段 ⑤ 三种下单策略(06 节) · 60 min · 3 Step
└ Step 5.1: 策略接口 + NormalStrategy 单线程顺序
└ Step 5.2: FlashStrategy 线程池并发 → 实测 3 倍加速
└ Step 5.3: BatchStrategy 分批 + future 收集 → 内存可控
阶段 ⑥ Logger 线程安全日志(07 节) · 30 min · 2 Step
└ Step 6.1: 裸 cout 多线程 → 看到日志撕裂乱码
└ Step 6.2: mutex + C++17 折叠表达式 → 行原子日志
阶段 ⑦ CLI + benchmark(08 节) · 60 min · 2 Step 【整个项目最终验收】
└ Step 7.1: 交互式 main 串起 4 个核心组件
└ Step 7.2: 万 QPS 压测 + 一致性校验 PASS ✅
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
🎯 每个 Step 必须做的三件事:
- 看 🎯 阶段目标卡片:明确这一阶段做什么、不做什么、验收标准
- 写一小段代码就编译运行一次(并发代码还要跑 N 次看是否稳定)
- 看到预期输出再写下一个 Step(压测通过才算过关)
⚠️ 本案例独有的"造 BUG → 修复"高峰:
- 阶段 ① Step 1.2:故意不加锁的 BlockingQueue → 看到崩溃/卡死/漏数据
- 阶段 ④ Step 4.2:故意用裸 int 扣库存 → 看到 "100 件商品卖出 103 件" 的超卖现场
- 阶段 ④ Step 4.4 🆕 加餐:故意交叉申请两把锁 → 看到经典死锁现场(程序静默卡死)
- 阶段 ⑥ Step 6.1:故意用裸 cout 写日志 → 看到字符级撕裂乱码
四次"踩坑→修复"流程,让你真正记住 mutex / atomic / lock_guard / std::lock 为什么存在。
✅ 每个阶段的结构(你在正文里会反复看到):
┌─ 🎯 阶段目标 ──────────────┐ ← 阶段开头:明确做什么/不做什么 │ 完成什么、不做什么、验收标准 │ └──────────────────────────────┘ Step X.1:先写最小可编译版(5-20 行) Step X.2:编译 → 单线程跑 → 看到输出 ✅ Step X.3:加锁保护临界区(10-30 行) Step X.4:多线程压测 N 次 ✅ ┌─ 📌 阶段小结 ─────────────┐ ← 阶段结尾:✅ 已掌握 + 🔜 下一阶段 └──────────────────────────┘1
2
3
4
5
6
7
8
9
10
11
# 00.案例元信息
| 项目 | 说明 |
|---|---|
| 难度 | ★★★★★ |
| 预估时长 | 14 小时(建议分 4 天,每天 3-4 小时) |
| 前置章节 | 卷一第 11 章(内存模型)、第 12 章(RAII)、第 15 章(线程和锁)、第 16 章(STL 模板) |
| 覆盖知识点 | thread / jthread / mutex / lock_guard / unique_lock / shared_mutex / condition_variable / atomic / memory_order / future / promise / packaged_task / async / 线程池设计 |
| 设计亮点 | 多策略下单(普通秒杀分布式)+ 可重用线程池组件 + 完整的并发安全测试 |
| ⚠ 已知局限 | 不涉及无锁队列、原子内存序细节(这些在卷三《底层卷》) |
| 最终产物 | 命令行交互式系统 + 压测程序 |
| 代码规模 | 约 1500 行 / 12 个文件 |
# 项目结构
order_system/
├── include/
│ ├── ThreadPool.h # 通用线程池组件
│ ├── BlockingQueue.h # 线程安全队列
│ ├── Order.h # 订单实体
│ ├── Product.h # 商品(含库存的原子计数)
│ ├── OrderStrategy.h # 下单策略接口(抽象基类)
│ ├── NormalStrategy.h # 策略:普通下单
│ ├── FlashStrategy.h # 策略:秒杀(高并发)
│ ├── BatchStrategy.h # 策略:批量下单
│ ├── OrderManager.h # 订单管理器(含读写锁)
│ └── Logger.h # 线程安全日志
├── src/ # 各模块实现
├── main.cpp # 交互式 CLI
├── benchmark.cpp # 压测程序(10000 并发请求)
└── README.md
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 一条命令编译运行
cd order_system
g++ -std=c++17 -pthread -Iinclude src/*.cpp main.cpp -o order_system
g++ -std=c++17 -pthread -Iinclude src/*.cpp benchmark.cpp -o bench
./order_system # 交互式
./bench # 压测
2
3
4
5
6
⚠️ -pthread 不能省!Linux/macOS 必须加这个开关,否则链接会失败 undefined reference to pthread_create。Windows 用 MSVC 不需要。
# 📋 目录快速导航
- 01.项目需求和功能
- 02.阻塞队列基础组件 【阶段①】
- 03.通用线程池 ThreadPool 【阶段②·骨架高峰⭐】
- 04.OrderManager 与读写锁 【阶段③】
- 05.Product 防超卖 【阶段④·造bug高峰⭐】
- 06.策略模式下单 【阶段⑤】
- 07.线程安全 Logger 【阶段⑥】
- 08.CLI 与压测 【阶段⑦·压测高峰】
- 09.项目总结速查
- 9.1 整体架构
- 9.2 并发原语速查表 【🔑】
- 9.3 选择决策树 【🔑】
- 10.项目技术思考
- 11.衔接与延伸
# 01.项目需求和功能
# 1.1 业务场景
电商秒杀系统是工业界最经典的并发场景:
- 商品库存有限(如 100 件)
- 几千个用户同时下单
- 必须保证:不超卖、不漏单、响应快
本案例用 C++ 标准库实现一个微缩版的订单系统,同时演示线程池这个工业组件。线程池是几乎所有 C++ 服务端应用的基石,你后续学 Web 服务器、消息队列、数据库都绕不开它。
# 1.2 三种下单策略
| 策略 | 特点 | 实现要点 |
|---|---|---|
| 普通下单 NormalStrategy | 单线程逐个处理 | 加 mutex 保护订单列表即可 |
| 秒杀下单 FlashStrategy | 高并发抢库存 | atomic CAS 防超卖 + 限流 |
| 批量下单 BatchStrategy | 一次提交多个订单 | future 收集每个的处理结果 |
业务规则:
- 用户输入"商品 ID + 数量"即下单
- 系统校验库存→扣减→生成订单→记录日志
- 任何一步失败,整个下单回滚
# 1.3 并发挑战
典型 BUG 重现,这一节告诉你"为什么不加锁就会出问题":
// ❌ 错误代码(不加锁)
class Product {
int stock = 100;
public:
bool buy(int n) {
if (stock >= n) { // 检查
stock -= n; // 扣减
return true;
}
return false;
}
};
// 1000 个线程同时调 buy(1) 会怎样?
// 假设 stock = 1,线程 A 和 B 同时进 if (stock >= n),都通过
// A 执行 stock -= n → stock = 0
// B 执行 stock -= n → stock = -1 ⚠️ 超卖了!
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
修正方案:用 std::atomic<int> 配合 CAS 循环,详见第 05 节。
# 02.阻塞队列基础组件
┌─ 🎯 阶段 ① 目标 ────────────────────────────────────────┐
│ 完成什么:BlockingQueue<T> 模板(线程安全的生产者消费者队列)│
│ 不做什么:还不接 ThreadPool,先把队列单独跑通 │
│ 验收标准:1 生产者 + 1 消费者两个线程互不干扰跑通 100 条数据 │
│ 预计耗时:60 min · 4 Step(每步都能编译运行) │
└──────────────────────────────────────────────────────────┘
2
3
4
5
6
💡 为什么先做这个? , 线程池 = "工人 + 任务队列"。任务队列单独抽出来叫 BlockingQueue,它会被工人线程并发 push/pop。先把队列做对,线程池就成功了一半。
⚠️ 本阶段的核心难点:你要亲眼看到"不加锁会乱",所以我们故意从"无锁裸版"起步,先看到崩溃,再一步步加 mutex → condition_variable → shutdown 把它治好。
# 灵魂三问:动手前先想清楚
❓ 既然 STL 已有 std::queue,为什么还要自己包一层?
来看反例:
// ❌ 反例:直接让多线程操作 std::queue
std::queue<int> q;
std::thread t1([&]{ for(int i=0;i<1000;++i) q.push(i); }); // 写
std::thread t2([&]{ while(!q.empty()) q.pop(); }); // 读
2
3
4
问题:
std::queue不是线程安全的,内部链表/数组指针被并发改写会直接 segfaultempty() + pop()不是原子的,刚检查非空,下一秒被 pop 掉,然后你又 pop 一次→崩- 没办法"等数据来",只能 while 死循环 100% 吃 CPU
✅ 正确做法:把"队列+锁+条件变量"封装成一个独立组件,让所有并发细节集中在一个文件里。这就是软件工程上的"封装变化"。
❓ 为什么要先做无锁版(Step 1.1)再加锁(Step 1.3)?不是浪费时间吗? 答:这是教学法刻意安排。如果一上来就给你"完美的最终版",你只会机械记住代码而不知道每一行的存在意义。先看到无锁版的崩溃(Step 1.2),你才会真正理解为什么 mutex / cv / shutdown 一个都不能少。这与 04 案例 §6.3 故障演示、03 案例 §6.4 dirty bug 同源,先有问题,后有解法。
❓ 第一步该写哪个版本? 答:先写"裸 std::queue 包一层",5 行代码、可编译、能单线程跑通。然后用双线程作死它,让它在你眼前崩。
🔑 教学要点:永远不要直接抄"最终版完美代码",一定要按 v1 → v2 → v3 的演化路径学,每一步只解决一个问题。
# Step 1.1 最小裸版输出
新建 BlockingQueue.h,故意不加任何同步:
// BlockingQueue.h(v1:单线程版,没锁)
#pragma once
#include <queue>
#include <optional>
template <typename T>
class BlockingQueue {
private:
std::queue<T> q;
public:
void push(T item) {
q.push(std::move(item));
}
std::optional<T> pop() {
if (q.empty()) return std::nullopt;
T item = std::move(q.front());
q.pop();
return item;
}
size_t size() const { return q.size(); }
};
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
❌ 注意:模板类不能直接拆分为 .h 和 .cpp!
因为模板是在编译期实例化的,编译器需要看到完整的模板定义才能生成代码。如果模板定义在 .cpp 中,其他 .cpp 文件 #include 头文件时看不到实现,会报链接错误。
新建 test_queue.cpp,单线程先验证基本行为:
// test_queue.cpp
#include "BlockingQueue.h"
#include <iostream>
int main() {
BlockingQueue<int> q;
for (int i = 1; i <= 3; ++i) q.push(i);
std::cout << "size = " << q.size() << "\n";
while (auto v = q.pop()) { // ⭐ optional 在 if 里直接当 bool
std::cout << *v << " ";
}
std::cout << "\n";
}
2
3
4
5
6
7
8
9
10
11
12
13
14
🧪 编译运行:
$ g++ -std=c++17 test_queue.cpp -o test_queue && ./test_queue
size = 3
1 2 3
2
3
4
✅ 单线程下 FIFO 行为正确。但这只是表面平静,下一步我们用两个线程同时操作它,看它崩给你看。
# Step 1.2 双线程作死测试
先不动 BlockingQueue.h,直接用现有的"无锁版"跑双线程:
// test_queue.cpp(v2:双线程压测)
#include "BlockingQueue.h"
#include <iostream>
#include <thread>
#include <atomic>
int main() {
BlockingQueue<int> q;
std::atomic<int> consumed{0};
// 生产者:push 10000 个数
std::thread producer([&]{
for (int i = 0; i < 10000; ++i) q.push(i);
});
// 消费者:循环 pop
std::thread consumer([&]{
while (consumed < 10000) {
if (auto v = q.pop()) consumed++;
}
});
producer.join();
consumer.join();
std::cout << "consumed = " << consumed.load() << "\n";
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
🧪 编译运行(多跑几次!):
$ g++ -std=c++17 -pthread test_queue.cpp -o test_queue
$ ./test_queue
段错误(核心已转储) ← 可能崩溃
$ ./test_queue
consumed = 9871 ← 也可能漏数据
$ ./test_queue
死循环 ← 也可能卡住
2
3
4
5
6
7
8
9
10
⚠️ 三种现象都可能出现:
- 崩溃:两个线程同时改
std::queue内部链表/数组,内存损坏 - 漏数据:
size()检查和front/pop之间被另一线程穿插 - 卡死:消费者
pop时队列暂时空,错过了生产者后续 push
📚 这就是 race condition(竞态)的可怕之处:没有报错信息,行为完全随机。今天单元测试通过 1000 次,明天上线就崩。要修它必须主动加锁。
# Step 1.3 mutex 修复竞争
升级 BlockingQueue.h:
// BlockingQueue.h(v2:加 mutex)
#pragma once
#include <queue>
#include <mutex>
#include <optional>
template <typename T>
class BlockingQueue {
private:
std::queue<T> q;
mutable std::mutex mtx; // ⭐ mutable:const 函数也能加锁
public:
void push(T item) {
std::lock_guard<std::mutex> lock(mtx); // RAII:构造加锁,析构解锁
q.push(std::move(item));
}
std::optional<T> pop() {
std::lock_guard<std::mutex> lock(mtx);
if (q.empty()) return std::nullopt;
T item = std::move(q.front());
q.pop();
return item;
}
size_t size() const {
std::lock_guard<std::mutex> lock(mtx);
return q.size();
}
};
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
重新跑刚才的双线程测试:
🧪 编译运行(连跑 5 次都不应崩):
$ g++ -std=c++17 -pthread test_queue.cpp -o test_queue
$ for i in 1 2 3 4 5; do ./test_queue; done
consumed = 10000
consumed = 10000
consumed = 10000
consumed = 10000
consumed = 10000
2
3
4
5
6
7
8
✅ 5 次都得到 10000,race 治好了。 std::lock_guard vs std::unique_lock:
lock_guard:构造加锁、析构解锁,不能手动解锁,最轻量,本步够用unique_lock:可以手动lock()/unlock(),可以转移所有权,condition_variable 必须用它(下一步就要换)
⚠️ 当前版本还有一个缺陷:消费者用 if (auto v = q.pop()) 轮询,队列暂时空时,它会以 100% CPU 疯狂调用 pop。生产环境会被运维当成 bug 喷你。下一步用条件变量做"阻塞等待"。
# Step 1.4 阻塞 pop 与停机
升级 BlockingQueue.h 为最终版:
// BlockingQueue.h(v3:阻塞 + shutdown)
#pragma once
#include <queue>
#include <mutex>
#include <condition_variable>
#include <optional>
template <typename T>
class BlockingQueue {
private:
std::queue<T> q;
mutable std::mutex mtx;
std::condition_variable cvNotEmpty; // ⭐ 队列非空时通知
bool stopped = false;
public:
void push(T item) {
{
std::lock_guard<std::mutex> lock(mtx);
if (stopped) return;
q.push(std::move(item));
}
cvNotEmpty.notify_one(); // ⭐ 锁外通知(性能更好)
}
// 阻塞 pop:队列空时挂起线程,不再轮询
std::optional<T> pop() {
std::unique_lock<std::mutex> lock(mtx); // ⭐ 必须用 unique_lock
cvNotEmpty.wait(lock, [this]{ // ⭐ 谓词防虚假唤醒
return !q.empty() || stopped;
});
if (q.empty() && stopped) return std::nullopt; // 收到关闭信号
T item = std::move(q.front());
q.pop();
return item;
}
// 关闭:通知所有 pop 中的线程退出
void shutdown() {
{
std::lock_guard<std::mutex> lock(mtx);
stopped = true;
}
cvNotEmpty.notify_all(); // ⭐ 唤醒所有等待者
}
size_t size() const {
std::lock_guard<std::mutex> lock(mtx);
return q.size();
}
};
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
最终测试,同时验证阻塞行为 + 优雅关闭:
// test_queue.cpp(v3:阻塞测试)
#include "BlockingQueue.h"
#include <iostream>
#include <thread>
#include <chrono>
int main() {
BlockingQueue<int> q;
// 消费者:先启动,等待数据(不会忙等)
std::thread consumer([&]{
while (true) {
auto v = q.pop();
if (!v) break; // shutdown 信号
std::cout << "got " << *v << "\n";
}
std::cout << "consumer exit\n";
});
// 主线程:故意慢慢 push
std::this_thread::sleep_for(std::chrono::milliseconds(500));
for (int i = 1; i <= 3; ++i) {
q.push(i);
std::this_thread::sleep_for(std::chrono::milliseconds(200));
}
// 通知消费者退出
std::this_thread::sleep_for(std::chrono::milliseconds(500));
q.shutdown();
consumer.join();
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
🧪 预期输出(注意时间间隔):
[t=0ms] main: 启动 consumer,500ms 后开始 push
[t=500ms] got 1
[t=700ms] got 2
[t=900ms] got 3
[t=1400ms] consumer exit
2
3
4
5
✅ 关键观察:
- 0-500ms 期间消费者完全没消耗 CPU(用
top看进程占用为 0),这就是阻塞 wait 的价值 - shutdown 后
pop()返回nullopt,消费者优雅退出,不悬挂
┌─ 📌 阶段 ① 小结 ────────────────────────────────────────┐
│ ✅ 已掌握(4 个 Step 闯关) │
│ • Step 1.1 单线程裸版 FIFO │
│ • Step 1.2 双线程作死 → 亲眼看到 race condition │
│ • Step 1.3 mutex 修复 race │
│ • Step 1.4 condition_variable 阻塞 + shutdown 优雅退出 │
│ 🔜 下一阶段 ②:用它喂养 ThreadPool │
│ 💡 commit 建议:feat: blocking queue with cv & shutdown │
└──────────────────────────────────────────────────────────┘
2
3
4
5
6
7
8
9
💼 三个铁律记下来:
- mutable mutex , const 函数加锁必须 mutable
- notify 在锁外 , 避免被唤醒线程立刻又被锁阻塞
- wait 必须传谓词 , 防 OS 虚假唤醒(spurious wakeup)
# 03.通用线程池 ThreadPool
┌─ 🎯 阶段 ② 目标 ────────────────────────────────────────┐
│ 完成什么:ThreadPool , 工人线程 + 任务队列 + submit 异步返回 │
│ 不做什么:暂不做优先级、不做动态扩缩容 │
│ 验收标准:提交 1000 个任务能并行跑完,submit 能拿到 future │
│ 预计耗时:120 min · 4 Step(重头戏:模板 submit) │
└──────────────────────────────────────────────────────────┘
2
3
4
5
6
💡 架构总览,心里先有这张图,后面 4 个 Step 才不会迷路:
┌──────────────┐ submit(task)
│ Application │ ──────────────────────►┐
└──────────────┘ │
▼
┌────────────────────┐
│ BlockingQueue │ ← 阶段 ① 的产物
│ < void()函数 > │
└─────┬──────────────┘
│ pop()
┌───────────────┼───────────────┐
▼ ▼ ▼
┌─────────┐ ┌─────────┐ ┌─────────┐
│worker 0 │ │worker 1 │ ... │worker N │
└─────────┘ └─────────┘ └─────────┘
2
3
4
5
6
7
8
9
10
11
12
13
14
# 灵魂三问:线程池设计要点
❓ 为什么不每来一个任务就 new 一个线程?反正 std::thread 也很方便
来看反例:
// ❌ 每个请求开一个线程
void handleRequest(Request r) {
std::thread t([r]{ /* 处理 */ });
t.detach();
}
2
3
4
5
问题:
- 创建线程很贵,Linux pthread_create 约 50 微秒,处理一个简单请求可能就 1 微秒,99% 时间在创建线程
- 线程数无上限,10 万个请求来了你就有 10 万个线程,直接 OOM 或被内核拒绝
- 没有背压(backpressure),所有请求并发处理,下游数据库直接被打挂
✅ 正确做法:固定数量的工人线程 + 任务队列,线程数受控、复用线程、有天然背压。
❓ 任务队列存 std::function<void()> 而不是 std::packaged_task<R()>,那"返回值"怎么办?
这个问题的答案就是 Step 2.3 那行精华代码:
auto task = std::make_shared<std::packaged_task<R()>>(...); // 真任务
tasks.push([task]{ (*task)(); }); // 队列存的"包装层"
2
为什么搞这么复杂:
- packaged_task 模板带 R,队列必须存"统一类型",不能既存 task<int()> 又存 task<string()>
- std::function<void()> 是统一类型,不管原任务返回啥,包一层 lambda 调用即可
- shared_ptr 是因为 packaged_task 不可拷贝,但 std::function 内部要求 callable 可拷贝
这就是 Step 2.3 的"三个细节"为什么标注 ⭐⭐⭐,三个 C++ 模板特性必须同时用到,缺一个就编不过。
❓ 为什么 ~ThreadPool() 必须先 shutdown 队列再 join 线程?反过来行不行?
绝对不行!来看反例:
// ❌ 反例:先 join 再 shutdown
~ThreadPool() {
for (auto& w : workers) w.join(); // ⚠️ 线程在 BlockingQueue::pop() 里阻塞
tasks.shutdown(); // 永远跑不到这里
}
2
3
4
5
会发生什么:工人线程正卡在 pop() 的 cv.wait() 上等数据。主线程 w.join() 等工人结束。双方互等 → 永久死锁。
✅ 正确顺序:先 shutdown 让 pop 返回 nullopt → 工人 break 循环 → join 顺利完成。
🔑 教学要点:资源释放顺序在并发里和构造顺序一样重要。这是为什么 RAII 析构要严格按"反向构造顺序"。
# Step 2.1 线程池最小骨架
新建 ThreadPool.h,先不管模板 submit,让工人能转起来:
// ThreadPool.h(v1:最小骨架,没有 submit)
#pragma once
#include "BlockingQueue.h"
#include <thread>
#include <vector>
#include <functional>
#include <iostream>
class ThreadPool {
private:
std::vector<std::thread> workers;
BlockingQueue<std::function<void()>> tasks; // ⭐ 任务 = 无返回的函数
public:
explicit ThreadPool(size_t n) {
for (size_t i = 0; i < n; ++i) {
workers.emplace_back([this, i]{
std::cout << "worker " << i << " started\n";
while (true) {
auto task = tasks.pop();
if (!task) break; // shutdown 信号
(*task)(); // 执行任务
}
std::cout << "worker " << i << " exit\n";
});
}
}
~ThreadPool() {
tasks.shutdown();
for (auto& w : workers) {
if (w.joinable()) w.join();
}
}
// 临时简化版:只接受 void() 任务
void submit_simple(std::function<void()> task) {
tasks.push(std::move(task));
}
};
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
测试,给工人喂 5 个 lambda 任务:
// test_pool.cpp
#include "ThreadPool.h"
#include <iostream>
#include <chrono>
int main() {
{
ThreadPool pool(3);
for (int i = 1; i <= 5; ++i) {
pool.submit_simple([i]{
std::cout << "task " << i << " running on tid="
<< std::this_thread::get_id() << "\n";
std::this_thread::sleep_for(std::chrono::milliseconds(100));
});
}
std::this_thread::sleep_for(std::chrono::milliseconds(500));
} // 离开作用域 → ~ThreadPool() 自动 shutdown + join
std::cout << "main exit\n";
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
🧪 预期输出(顺序可能不同,因为是并发):
worker 0 started
worker 1 started
worker 2 started
task 1 running on tid=0x70...01
task 2 running on tid=0x70...02
task 3 running on tid=0x70...03
task 4 running on tid=0x70...01
task 5 running on tid=0x70...02
worker 0 exit
worker 1 exit
worker 2 exit
main exit
2
3
4
5
6
7
8
9
10
11
12
13
✅ 关键观察:
- 5 个任务跑在 3 个不同的 tid 上(即真的并行)
- task 4/5 复用了 task 1/2 的工人线程,这就是"线程池"省下的"创建/销毁线程"开销
- 离开作用域自动 shutdown , RAII 的力量
⚠️ 但 submit_simple 太弱了:没有返回值机制。下一步要让 pool.submit([]{ return 42; }) 这种带返回值的任务也能跑,并且能在主线程拿到结果。
# Step 2.2 packaged_task 入门
在写正式 submit 前,先单独练熟"三剑客",packaged_task / future / promise。这是 C++ 异步任务的基石。
// test_future.cpp(独立小练习,跟 ThreadPool 暂时无关)
#include <future>
#include <thread>
#include <iostream>
int main() {
// 1. packaged_task:把一个普通函数包装成"会写入 promise"的对象
std::packaged_task<int()> task([]{
std::this_thread::sleep_for(std::chrono::milliseconds(500));
return 42;
});
// 2. 拿到 future(异步结果的"句柄")
std::future<int> fut = task.get_future();
// 3. 把 task 扔到另一线程执行
std::thread t(std::move(task));
t.detach();
std::cout << "main waiting...\n";
int result = fut.get(); // ⭐ 阻塞 500ms 等结果
std::cout << "got " << result << "\n";
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
🧪 预期输出:
main waiting...
[等待 500ms]
got 42
2
3
✅ 这就是异步返回值的标准玩法:packaged_task 装函数 → get_future() 拿凭证 → 另一线程执行 → fut.get() 在主线程取结果。
📚 三剑客的关系:
std::promise<T>:写入端("将来我会给你一个 T")std::future<T>:读取端("等着拿那个 T")std::packaged_task<T()>:把函数+promise 打包,函数返回值自动写进 promise
# Step 2.3 模板 submit 实现
升级 ThreadPool.h:
// ThreadPool.h(v2:模板 submit)
#pragma once
#include "BlockingQueue.h"
#include <thread>
#include <vector>
#include <functional>
#include <future>
#include <atomic>
#include <memory>
class ThreadPool {
private:
std::vector<std::thread> workers;
BlockingQueue<std::function<void()>> tasks;
std::atomic<bool> stopped{false};
public:
explicit ThreadPool(size_t n = std::thread::hardware_concurrency()) {
for (size_t i = 0; i < n; ++i) {
workers.emplace_back([this]{
while (true) {
auto task = tasks.pop();
if (!task) break;
try { (*task)(); }
catch (...) { /* 工作线程必须吞异常 */ }
}
});
}
}
~ThreadPool() { shutdown(); }
// ⭐⭐⭐ 重头戏:万能 submit
template <typename F, typename... Args>
auto submit(F&& f, Args&&... args)
-> std::future<std::invoke_result_t<F, Args...>> // 返回类型推导
{
using R = std::invoke_result_t<F, Args...>;
// 1. 把 f + args 打包成 packaged_task
// 用 shared_ptr 是因为 packaged_task 不可拷贝(只能移动),
// 但 std::function 要求 callable 可拷贝 → 套一层 shared_ptr 解决
auto task = std::make_shared<std::packaged_task<R()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...)
);
// 2. 提前拿到 future(必须在 push 之前,因为 push 后 task 可能立即被工人执行)
std::future<R> fut = task->get_future();
// 3. 把"调用 task 的 lambda"扔进队列
tasks.push([task]{ (*task)(); });
return fut;
}
void shutdown() {
if (stopped.exchange(true)) return; // 防重复 shutdown
tasks.shutdown();
for (auto& w : workers) {
if (w.joinable()) w.join();
}
}
size_t pending() const { return tasks.size(); }
};
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
📚 三个细节,建议在代码上画三个箭头标记:
std::invoke_result_t<F, Args...>, C++17 推导"如果用 Args 调用 F,返回什么类型"std::make_shared<packaged_task>, 套 shared_ptr 是因为 packaged_task 不可拷贝,但std::function要求 callable 可拷贝(会被 push 进队列复制)- 先
get_future()再tasks.push(...), 顺序不能反!push 后任务可能秒被执行掉,再 get_future 就拿不到了
测试 3 种返回类型:
// test_pool.cpp(v2:测试 submit)
#include "ThreadPool.h"
#include <iostream>
int main() {
ThreadPool pool(4);
// ① 无参 lambda 返回 int
auto f1 = pool.submit([]{ return 42; });
// ② 带参 lambda 返回 int
auto f2 = pool.submit([](int a, int b){ return a + b; }, 3, 5);
// ③ 返回 string
auto f3 = pool.submit([](std::string name){
return "hello, " + name;
}, std::string("world"));
std::cout << "f1 = " << f1.get() << "\n";
std::cout << "f2 = " << f2.get() << "\n";
std::cout << "f3 = " << f3.get() << "\n";
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
🧪 预期输出:
f1 = 42
f2 = 8
f3 = hello, world
2
3
✅ 三种类型完美拿回结果。真正的异步任务编程器已经到手。
# Step 2.4 千任务压测验证
写一个并行平方求和的小压测,既验证正确性,也感受线程池的性能:
// test_pool.cpp(v3:1000 任务压测)
#include "ThreadPool.h"
#include <iostream>
#include <chrono>
#include <vector>
int main() {
ThreadPool pool(std::thread::hardware_concurrency());
std::cout << "threads = " << std::thread::hardware_concurrency() << "\n";
auto t0 = std::chrono::steady_clock::now();
// 提交 1000 个任务,每个算 i*i
std::vector<std::future<long long>> futs;
for (int i = 1; i <= 1000; ++i) {
futs.push_back(pool.submit([i]{
// 故意做一点 CPU 工作模拟真实任务
long long sum = 0;
for (int j = 0; j < 100000; ++j) sum += j;
return (long long)i * i;
}));
}
// 收集结果
long long total = 0;
for (auto& f : futs) total += f.get();
auto t1 = std::chrono::steady_clock::now();
auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(t1 - t0).count();
std::cout << "sum of i*i (1..1000) = " << total << "\n";
std::cout << "expected = " << (1000LL * 1001 * 2001 / 6) << "\n";
std::cout << "time = " << ms << " ms\n";
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
🧪 预期输出(4 核机器):
threads = 8
sum of i*i (1..1000) = 333833500
expected = 333833500
time = 35 ms
2
3
4
✅ 3 个验证全过:
- 数值正确:sum 与公式 n(n+1)(2n+1)/6 完全相等(无丢任务、无重复执行)
- 真并行:单线程跑同样任务约 280ms,8 线程 35ms ≈ 8 倍加速
- 优雅退出:main 结束时
~ThreadPool()自动 shutdown + join,无悬挂
┌─ 📌 阶段 ② 小结 ────────────────────────────────────────┐
│ ✅ 已掌握(4 个 Step 闯关) │
│ • Step 2.1 工人骨架 + submit_simple 跑通调度 │
│ • Step 2.2 packaged_task / future 三剑客单练 ⭐ │
│ • Step 2.3 模板 submit + invoke_result_t + shared_ptr 三大坑│
│ • Step 2.4 1000 任务压测 → 8 倍加速 + 数值守恒 │
│ 🔜 下一阶段 ③:用线程池跑 OrderManager(读多写多) │
│ 💡 commit 建议:feat: thread pool with future submit │
└──────────────────────────────────────────────────────────┘
2
3
4
5
6
7
8
9
💼 关键性质回顾:
- 任务异步执行:submit 立即返回,工作线程后台跑
- 任务有结果返回:submit 返回 future,调用方
.get()拿结果- 任务异常隔离:一个任务抛异常被
catch (...)吞下,不影响其他任务
# 04.OrderManager 与读写锁
┌─ 🎯 阶段 ③ 目标 ────────────────────────────────────────┐
│ 完成什么:OrderManager(订单存储 + 多线程读写) │
│ 不做什么:暂不接策略,先单独跑通"多线程下单 + 多线程查询" │
│ 验收标准:先用 mutex 跑通 → 改 shared_mutex 看到读吞吐翻倍 │
│ 预计耗时:60 min · 3 Step(含锁性能对比) │
└──────────────────────────────────────────────────────────┘
2
3
4
5
6
💡 本阶段灵魂三问
- 订单 ID 怎么生成?不能两个线程拿到同一个 ID →
std::atomic<int>- 订单列表是 vector,多线程并发 push_back 会崩 → 加锁
- 80% 是查询、20% 是下单,普通 mutex 让查询互相等。怎么破?→
std::shared_mutex
# Step 3.1 Order 单线程跑通
新建 Order.h,纯数据载体:
// Order.h
#pragma once
#include <string>
struct Order {
int id;
int userId;
int productId;
int quantity;
Order() = default;
Order(int i, int u, int p, int q)
: id(i), userId(u), productId(p), quantity(q) {}
};
2
3
4
5
6
7
8
9
10
11
12
13
14
新建 OrderManager.h(v1:先用最简单的 mutex):
// OrderManager.h(v1:mutex 版)
#pragma once
#include "Order.h"
#include <vector>
#include <mutex>
#include <atomic>
#include <optional>
class OrderManager {
private:
std::vector<Order> orders;
mutable std::mutex mtx; // ⭐ v1 用普通 mutex
std::atomic<int> nextId{1}; // ⭐ ID 自增用 atomic
public:
int addOrder(int userId, int productId, int qty) {
Order o(nextId.fetch_add(1), userId, productId, qty);
std::lock_guard<std::mutex> lock(mtx);
orders.push_back(o);
return o.id;
}
std::optional<Order> findOrder(int id) const {
std::lock_guard<std::mutex> lock(mtx);
for (const auto& o : orders) {
if (o.id == id) return o;
}
return std::nullopt;
}
size_t size() const {
std::lock_guard<std::mutex> lock(mtx);
return orders.size();
}
};
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
测试,单线程基本功能:
// test_mgr.cpp
#include "OrderManager.h"
#include <iostream>
int main() {
OrderManager mgr;
int id1 = mgr.addOrder(101, 1, 2);
int id2 = mgr.addOrder(102, 1, 1);
std::cout << "id1=" << id1 << " id2=" << id2 << " size=" << mgr.size() << "\n";
if (auto o = mgr.findOrder(id1)) {
std::cout << "找到 id=" << o->id << " user=" << o->userId
<< " qty=" << o->quantity << "\n";
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
🧪 编译运行:
$ g++ -std=c++17 -pthread test_mgr.cpp -o test_mgr && ./test_mgr
id1=1 id2=2 size=2
找到 id=1 user=101 qty=2
2
3
✅ 单线程基本路径通了。注意 ID 是从 1 开始递增的,这是 atomic 在功劳,下面会让多线程同时下单验证 ID 不会重复。
# Step 3.2 mutex 基线压测
升级测试,8 个写线程 + 8 个读线程同时跑:
// test_mgr.cpp(v2:并发压测)
#include "OrderManager.h"
#include <iostream>
#include <thread>
#include <vector>
#include <chrono>
#include <atomic>
int main() {
OrderManager mgr;
const int WRITES = 1000; // 每个写线程下 1000 单
const int READS = 100000; // 每个读线程查 10w 次
const int W_THREADS = 4;
const int R_THREADS = 8;
std::atomic<int> readHits{0};
auto t0 = std::chrono::steady_clock::now();
std::vector<std::thread> threads;
// 写线程
for (int w = 0; w < W_THREADS; ++w) {
threads.emplace_back([&, w]{
for (int i = 0; i < WRITES; ++i) mgr.addOrder(w, 1, 1);
});
}
// 读线程
for (int r = 0; r < R_THREADS; ++r) {
threads.emplace_back([&]{
for (int i = 0; i < READS; ++i) {
if (mgr.findOrder(1)) readHits++;
}
});
}
for (auto& t : threads) t.join();
auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - t0).count();
std::cout << "orders = " << mgr.size()
<< " (expect=" << W_THREADS * WRITES << ")\n";
std::cout << "read hits = " << readHits.load()
<< " / " << R_THREADS * READS << "\n";
std::cout << "time = " << ms << " ms ← 记下这个数字\n";
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
🧪 预期输出(mutex 版基线,4 核机器):
orders = 4000 (expect=4000)
read hits = 800000 / 800000
time = 1850 ms ← 写下来!下一步对比
2
3
✅ 核心验证:
- orders=4000 完全等于期望,atomic ID 没冲突、push_back 没崩
- 所有读都 hit,没漏数据
- 总耗时 1850ms 作为基线,下一步 shared_mutex 应该明显更快
# Step 3.3 shared_mutex 升级
把 OrderManager.h 中的 mutex 替换为 shared_mutex:
// OrderManager.h(v2:shared_mutex 版)
#pragma once
#include "Order.h"
#include <vector>
#include <shared_mutex> // ⭐ C++17 共享锁
#include <atomic>
#include <optional>
class OrderManager {
private:
std::vector<Order> orders;
mutable std::shared_mutex mtx; // ⭐ 改成 shared_mutex
std::atomic<int> nextId{1};
public:
int addOrder(int userId, int productId, int qty) {
Order o(nextId.fetch_add(1), userId, productId, qty);
std::unique_lock lock(mtx); // ⭐ 写:独占锁
orders.push_back(std::move(o));
return o.id;
}
std::optional<Order> findOrder(int id) const {
std::shared_lock lock(mtx); // ⭐ 读:共享锁
for (const auto& o : orders) {
if (o.id == id) return o;
}
return std::nullopt;
}
std::vector<Order> ordersByUser(int userId) const {
std::shared_lock lock(mtx); // 读锁
std::vector<Order> result;
for (const auto& o : orders) {
if (o.userId == userId) result.push_back(o);
}
return result;
}
size_t size() const {
std::shared_lock lock(mtx);
return orders.size();
}
};
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
重新跑同一份压测:
🧪 预期输出(shared_mutex 版):
orders = 4000 (expect=4000)
read hits = 800000 / 800000
time = 720 ms ← 比 mutex 的 1850ms 快 ~2.5 倍 ✅
2
3
4
✅ 同样的代码,读多场景吞吐提升 2-3 倍。这就是 shared_mutex 的回报。
📚 shared_mutex 访问规则:
当前持有 谁能进入 无锁 任何线程 独占( unique_lock)只有当前线程 共享( shared_lock)任意多个读者
⚠️ shared_mutex 不是银弹:写多场景反而比 mutex 慢(构造析构成本高),有"写者饥饿"风险。只在读 > 80% 时才换。
┌─ 📌 阶段 ③ 小结 ────────────────────────────────────────┐
│ ✅ 已掌握 │
│ • Step 3.1 atomic ID 自增 + Order 数据类 │
│ • Step 3.2 mutex 版基线压测(记下 1850ms) │
│ • Step 3.3 shared_mutex 改造 → 读吞吐翻倍 ⭐ │
│ 🔜 下一阶段 ④:Product 超卖 BUG 重现 → atomic CAS 修复 │
│ 💡 commit 建议:feat: order manager with shared_mutex │
└──────────────────────────────────────────────────────────┘
2
3
4
5
6
7
8
# 05.Product 防超卖
┌─ 🎯 阶段 ④ 目标 ──────────── 本案例最重磅一战 ────────────┐
│ 完成什么:让你亲眼看到"100件库存卖出 103 件"的超卖 BUG │
│ 然后用 atomic CAS 修复它 │
│ 🆕 加餐:再亲眼看一次"死锁"现场 + 两种修复法 │
│ 不做什么:不引入分布式锁、不接 Redis(本卷范畴外) │
│ 验收标准:bug 版能压测出超卖;修复版 CAS 后零超卖 │
│ 死锁加餐版能复现 + std::lock 修复成功 │
│ 预计耗时:90 min · 3 Step + 加餐 3 Step │
└──────────────────────────────────────────────────────────┘
2
3
4
5
6
7
8
9
💡 为什么这一阶段值得 60 分钟? , 这是新手最容易"以为自己懂了但其实没懂"的并发知识点。亲手造一次超卖 BUG,胜过看 100 篇文章。
# Step 4.1 裸 int 埋 BUG
新建 Product.h,故意不加任何同步:
// Product.h(v1:裸 int 版,有 BUG)
#pragma once
#include <string>
class Product {
private:
int id;
std::string name;
double price;
int stock; // ⚠️ 裸 int,将来会出事
public:
Product(int id_, std::string n, double p, int s)
: id(id_), name(std::move(n)), price(p), stock(s) {}
int getId() const { return id; }
int getStock() const { return stock; }
// ⚠️ 经典超卖代码,看似没问题,多线程下会出大事
bool buy(int n) {
if (stock >= n) { // ① 检查
stock -= n; // ② 扣减
return true;
}
return false;
}
};
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
单线程测试,看起来"完全正确":
// test_product.cpp
#include "Product.h"
#include <iostream>
int main() {
Product p(1, "iPhone", 7999, 5);
for (int i = 0; i < 7; ++i) {
bool ok = p.buy(1);
std::cout << "买 1 件: " << (ok ? "成功" : "失败")
<< " 剩余: " << p.getStock() << "\n";
}
}
2
3
4
5
6
7
8
9
10
11
12
🧪 预期输出:
买 1 件: 成功 剩余: 4
买 1 件: 成功 剩余: 3
买 1 件: 成功 剩余: 2
买 1 件: 成功 剩余: 1
买 1 件: 成功 剩余: 0
买 1 件: 失败 剩余: 0
买 1 件: 失败 剩余: 0
2
3
4
5
6
7
8
✅ 单线程下完美,成功了 5 次,剩余 0,后两次失败。就是这种"单测全过"的代码骗了无数新手上线。下一步用 1000 个线程同时调,让你看清真相。
# Step 4.2 千线程见超卖
升级测试,100 件库存 + 200 个并发线程,每人买 1 件:
// test_product.cpp(v2:并发压测)
#include "Product.h"
#include <iostream>
#include <thread>
#include <vector>
#include <atomic>
int main() {
Product p(1, "iPhone", 7999, 100); // ⭐ 库存 100
std::atomic<int> successCount{0};
std::vector<std::thread> threads;
for (int i = 0; i < 200; ++i) { // ⭐ 200 个并发买家
threads.emplace_back([&]{
if (p.buy(1)) successCount++;
});
}
for (auto& t : threads) t.join();
std::cout << "成功购买: " << successCount.load() << " 件\n";
std::cout << "剩余库存: " << p.getStock() << " 件\n";
std::cout << "成功+剩余: " << successCount.load() + p.getStock() << "\n";
std::cout << "原始库存: 100\n";
std::cout << "一致性: " << (successCount + p.getStock() == 100 ? "✅" : "❌ 超卖!") << "\n";
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
🧪 编译运行(多跑几次):
$ g++ -std=c++17 -pthread -O0 test_product.cpp -o test_product
$ for i in 1 2 3 4 5; do ./test_product; echo "---"; done
成功购买: 103 件
剩余库存: -3 件 ← 超卖!
成功+剩余: 100
一致性: ✅
---
成功购买: 100 件
剩余库存: 0 件
一致性: ✅ ← 这一次没超卖(运气好)
---
成功购买: 105 件
剩余库存: -5 件 ← 又超卖
一致性: ✅
---
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
⚠️ 看见了吗,"成功+剩余 = 100" 满足,但 stock 居然变负数了!
BUG 解释,两个线程交错执行:
线程 A 进入 buy(1):
① if (stock >= 1) 读 stock = 1,通过 ✅
[此时被 OS 切走]
线程 B 进入 buy(1):
① if (stock >= 1) 读 stock = 1,也通过 ✅
② stock -= 1 stock = 0
返回 true
线程 A 恢复:
② stock -= 1 stock = -1 💥 超卖!
返回 true
2
3
4
5
6
7
8
9
10
📚 核心错误:if(stock>=n) stock-=n 这两步操作不是原子的,读和写中间被其他线程插入了相同的读和写,导致条件判断失效。这就是经典的 check-then-act race。
# Step 4.3 atomic CAS 修复
升级 Product.h 为最终版:
// Product.h(v2:atomic CAS 版)
#pragma once
#include <string>
#include <atomic>
class Product {
private:
int id;
std::string name;
double price;
std::atomic<int> stock; // ⭐ 改成原子变量
public:
Product(int id_, std::string n, double p, int s)
: id(id_), name(std::move(n)), price(p), stock(s) {}
int getId() const { return id; }
int getStock() const { return stock.load(); }
// ⭐⭐⭐ CAS 循环扣库存(防超卖)
bool tryDeduct(int n) {
int cur = stock.load();
while (cur >= n) {
// 含义:如果 stock 仍然是 cur,则把它改成 cur - n
// 失败时 cur 会被自动更新为最新值,进入下一轮
if (stock.compare_exchange_weak(cur, cur - n)) {
return true; // 成功扣减
}
}
return false; // 库存不够(cur < n)
}
void restock(int n) { stock.fetch_add(n); }
};
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
把测试中的 p.buy(1) 改成 p.tryDeduct(1),重新压测:
🧪 编译运行(连跑 5 次都不应超卖):
$ for i in 1 2 3 4 5; do ./test_product; echo "---"; done
成功购买: 100 件
剩余库存: 0 件
成功+剩余: 100
一致性: ✅ ← 永远不超卖
---
(5 次完全一致)
2
3
4
5
6
7
8
9
✅ 零超卖、零漏单,这就是 atomic CAS 的力量。
CAS(Compare-And-Swap)执行轨迹:
初始 stock = 1 线程 A 进入 tryDeduct(1):cur = 1,尝试 CAS(1 → 0) 线程 B 进入 tryDeduct(1):cur = 1,尝试 CAS(1 → 0) → A 先执行 CAS 成功:stock = 0,返回 true → B 的 CAS 失败:cur 自动被更新为 0 → B 检查 cur >= 1 失败,跳出 while,返回 false1
2
3
4
5
6
7
整个判断+扣减是原子的,OS 切线程也插不进缝。
💼 weak vs strong:compare_exchange_weak 在某些 CPU(如 ARM)会"伪失败"(即使值相等也偶尔返回 false),但循环里会立即重试,性能比 strong 好。一次性判断必须用 strong。CAS 循环里永远首选 weak。
┌─ 📌 阶段 ④ 小结 ────────────────────────────────────────┐
│ ✅ 你刚刚亲手做完了: │
│ • Step 4.1 故意写 buy() 裸 int 版 │
│ • Step 4.2 200 线程压测 → 亲眼看到 stock = -3 的超卖现场 │
│ • Step 4.3 atomic + compare_exchange_weak 修复 │
│ 🎓 这一阶段你掌握的不是"如何写代码",而是"为什么要写" │
│ 🔜 下一阶段 ⑤:用策略模式串起所有组件 │
│ 💡 commit 建议:fix: prevent oversell with atomic CAS │
└──────────────────────────────────────────────────────────┘
2
3
4
5
6
7
8
9
# Step 4.4 故意造死锁
⚠️ 强烈建议跑一遍(并发四大金刚之二),race condition 你刚见过了,死锁是另一种灾难:程序不崩、不报错,只是永远卡死。你今天踩过这个坑,将来上线代码就不会重蹈覆辙。
场景:用户想把账户 A 的钱转账到账户 B。每个账户有自己的锁。
// deadlock_demo.cpp(v1:经典交叉申请,必死锁)
#include <iostream>
#include <mutex>
#include <thread>
#include <chrono>
class Account {
public:
int balance;
std::mutex mtx;
explicit Account(int b) : balance(b) {}
};
// ⚠️ 错误的转账实现:先锁 from 再锁 to
void transfer(Account& from, Account& to, int amount) {
std::lock_guard<std::mutex> l1(from.mtx);
std::this_thread::sleep_for(std::chrono::milliseconds(10)); // ⭐ 放大死锁概率
std::lock_guard<std::mutex> l2(to.mtx);
from.balance -= amount;
to.balance += amount;
}
int main() {
Account a(1000), b(1000);
// ⚠️ 两个线程,转账方向相反
std::thread t1([&]{ transfer(a, b, 100); }); // 先锁 a 再锁 b
std::thread t2([&]{ transfer(b, a, 200); }); // 先锁 b 再锁 a
t1.join();
t2.join();
std::cout << "done a=" << a.balance << " b=" << b.balance << "\n";
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
🧪 编译运行:
$ g++ -std=c++17 -pthread deadlock_demo.cpp -o dd && ./dd
(程序卡死,永远不输出 done)
# 用 Ctrl+C 强行结束,或用另一个终端 jstack/lldb 看:
# t1 持有 a.mtx,等 b.mtx
# t2 持有 b.mtx,等 a.mtx
# 互相等对方释放 → 永久死锁
2
3
4
5
6
7
⚠️ 死锁四要素全部满足(操作系统课的"必考"):
- 互斥 , mutex 同一时间只能被一个线程持有 ✅
- 持有并等待 , t1 持有 a.mtx 还想要 b.mtx ✅
- 不可剥夺 , 持有的锁 OS 不会强制收回 ✅
- 循环等待 , t1 等 t2,t2 等 t1,形成环 ✅
📚 为什么死锁比 race condition 更可怕?
- race 的崩溃有报错(segfault / 数据错乱),容易发现
- deadlock 没报错,程序就在那里"安静地"卡死,监控指标全绿,QPS = 0,运维以为机器卡了重启就行,重启完还会再死
# Step 4.5 修复方案 A:std::lock 一次锁全部
C++ 标准库给了一个专门用于多锁场景的工具 std::lock,一次性锁住一组锁,保证不死锁:
// 修复版 1:std::lock + std::adopt_lock
void transfer(Account& from, Account& to, int amount) {
std::lock(from.mtx, to.mtx); // ⭐ 同时申请两把锁
std::lock_guard<std::mutex> l1(from.mtx, std::adopt_lock); // ⭐ adopt_lock:接管已锁
std::lock_guard<std::mutex> l2(to.mtx, std::adopt_lock);
from.balance -= amount;
to.balance += amount;
}
2
3
4
5
6
7
8
或者用 C++17 的 std::scoped_lock(更简洁):
// 修复版 1+:scoped_lock(C++17)
void transfer(Account& from, Account& to, int amount) {
std::scoped_lock lock(from.mtx, to.mtx); // ⭐ 一行搞定,保证不死锁
from.balance -= amount;
to.balance += amount;
}
2
3
4
5
6
📚
std::lock内部实现:用 try_lock 序列尝试锁所有,只要有一个失败就把已经拿到的全部释放,再重来。它打破了"持有并等待"的死锁条件,要么全拿到,要么一个都不拿。
# Step 4.6 修复方案 B:锁顺序约定(更通用)
std::lock 只能解决这一对锁的死锁。如果系统里有 1000 个 Account,锁的对象事先不知道,怎么办?
✅ 业界标准做法:全局规定一个"锁顺序",比如按地址排序:
// 修复版 2:锁顺序约定(按地址排序)
void transfer(Account& from, Account& to, int amount) {
auto* first = std::min(&from, &to, [](auto a, auto b){ return a < b; });
auto* second = std::max(&from, &to, [](auto a, auto b){ return a < b; });
std::lock_guard<std::mutex> l1(first->mtx);
std::lock_guard<std::mutex> l2(second->mtx);
from.balance -= amount;
to.balance += amount;
}
2
3
4
5
6
7
8
9
10
这样不管 t1 是 transfer(a,b) 还是 t2 是 transfer(b,a),两个线程都会按"小地址先锁"的顺序申请,循环等待就被打破了。
💼 真实工程经验:MySQL InnoDB、PostgreSQL 的行锁、Linux 内核的 spinlock 都用"锁顺序"避免死锁。给每个锁分配一个全局唯一的 rank(等级),只能从小 rank 申请到大 rank,这是"锁层次"(lock hierarchy)的最严格形式,违反层次会直接 panic。
🧪 任选一种修复版重新跑:
$ g++ -std=c++17 -pthread deadlock_demo.cpp -o dd && ./dd
done a=1100 b=900 ← 顺利输出!
2
3
✅ 死锁治好了。这一节学到的不仅是 std::lock 这个 API,更是面对并发资源依赖图的两种通用思想:
- 原子获取所有依赖(std::lock 思想)
- 强制全局顺序(锁层次思想)
🔑 教学要点:本案例 §02-§07 的真实代码里没有"两个不同 mutex 交叉申请"的场景,所以这是加餐演示。但生产环境一旦你有"操作多个对象"的业务(订单转账、库存调拨、消息群发),死锁就会找上门。今天先在这里扎扎实实见过它一次。
# 06.策略模式下单
┌─ 🎯 阶段 ⑤ 目标 ────────────────────────────────────────┐
│ 完成什么:用策略模式串起 ThreadPool + Product + OrderManager │
│ 三种下单方式:Normal / Flash / Batch │
│ 不做什么:暂不做 CLI,先用 main 直接调对比效果 │
│ 验收标准:同样下 100 单,看到 Flash 比 Normal 快 5-8 倍 │
│ 预计耗时:60 min · 3 Step │
└──────────────────────────────────────────────────────────┘
2
3
4
5
6
7
💡 策略模式(Strategy Pattern):把"做某事的多种方式"抽成接口,调用方传哪个就用哪个。在这里,同样是下单,单线程顺序 vs 线程池并发 vs 分批,调用代码完全不变,只换策略对象。
# 灵魂三问:为什么不用 if-else?
三种下单方式直接写 if (mode == FLASH) { ... } else if (mode == BATCH) { ... } 不就完了?
来看反例:
// ❌ 反例:if-else 巨型函数
std::vector<int> placeOrders(int mode, Product& p, OrderManager& mgr,
int userId, std::vector<int> qtys) {
if (mode == NORMAL) {
// 50 行普通下单代码
} else if (mode == FLASH) {
// 80 行秒杀代码(涉及线程池)
} else if (mode == BATCH) {
// 60 行分批代码(涉及 future 收集)
}
// 想加个分布式策略?再 else if?
}
2
3
4
5
6
7
8
9
10
11
12
问题:
- 违反开闭原则,加新策略要改这个巨型函数
- 依赖发散,这一个函数同时依赖 ThreadPool / future / 分批逻辑,没法单独测试 NormalStrategy
- switch case 永远在迭代,业务越发展越乱
✅ 正确做法:抽 OrderStrategy 接口 + 三个独立子类,加新策略只需新增一个文件,不动旧代码。这就是策略模式的"开"(对扩展开放)"闭"(对修改关闭)。
❓ 为什么 FlashStrategy 接受 ThreadPool& 引用而不是自己 new 一个?
来看反例:
// ❌ 反例:策略内部自己 new 线程池
class FlashStrategy : public OrderStrategy {
ThreadPool pool{8}; // 每个策略对象 8 个线程
// ...
};
2
3
4
5
问题:
- 资源浪费,程序里如果同时有 NormalStrategy + FlashStrategy + BatchStrategy 实例,会有 3 个线程池 = 24 个线程
- 生命周期失控,策略对象析构时线程池跟着析构,正在跑的任务被强制中断
- 测试困难,单测 FlashStrategy 必须真启 8 个线程,跑不快
✅ 正确做法:依赖注入,线程池由调用方创建并注入,策略对象只是"用户"。这与 04 案例 §07 main 注入 KvDatabase 给 REPL 是同一个思想。
❓ 三种策略性能差距是否一定满足"Normal < Batch < Flash"? 答:不一定!本案例的任务太轻(atomic 加一个 int),并行收益不大。任务越重,并发优势越明显。这就是为什么 Step 5.2 要在测试代码里说"任务里加 100ms 的 IO 才会接近 8 倍加速"。
🔑 教学要点:策略模式的精髓不是"避免 if-else",而是让"算法族"成为一等公民,它们可以被传递、组合、替换、单独测试。
# Step 5.1 策略接口与基线
新建 OrderStrategy.h:
// OrderStrategy.h
#pragma once
#include "Product.h"
#include "OrderManager.h"
#include <vector>
class OrderStrategy {
public:
virtual ~OrderStrategy() = default;
virtual std::vector<int> placeOrders(
Product& product,
OrderManager& mgr,
int userId,
const std::vector<int>& quantities) = 0; // 纯虚函数
};
2
3
4
5
6
7
8
9
10
11
12
13
14
15
新建 NormalStrategy.h,最简单的实现:
// NormalStrategy.h
#pragma once
#include "OrderStrategy.h"
class NormalStrategy : public OrderStrategy {
public:
std::vector<int> placeOrders(Product& product, OrderManager& mgr,
int userId,
const std::vector<int>& quantities) override {
std::vector<int> orderIds;
for (int qty : quantities) {
if (product.tryDeduct(qty)) {
orderIds.push_back(mgr.addOrder(userId, product.getId(), qty));
} else {
orderIds.push_back(-1); // 失败标记
}
}
return orderIds;
}
};
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
测试,下 10 单看流程通:
// test_strategy.cpp
#include "NormalStrategy.h"
#include <iostream>
int main() {
Product p(1, "iPhone", 7999, 5); // ⭐ 库存只有 5
OrderManager mgr;
NormalStrategy normal;
std::vector<int> qtys(10, 1); // 想下 10 单(每单 1 件)
auto ids = normal.placeOrders(p, mgr, 100, qtys);
int success = 0;
for (int id : ids) if (id > 0) success++;
std::cout << "下单 10 次,成功 " << success << " 剩余库存 " << p.getStock() << "\n";
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
🧪 预期输出:
下单 10 次,成功 5 剩余库存 0
2
✅ 库存 5 件,10 个请求只有 5 个成功,业务正确。
# Step 5.2 FlashStrategy 并发
新建 FlashStrategy.h:
// FlashStrategy.h
#pragma once
#include "OrderStrategy.h"
#include "ThreadPool.h"
#include <future>
class FlashStrategy : public OrderStrategy {
private:
ThreadPool& pool;
public:
explicit FlashStrategy(ThreadPool& p) : pool(p) {}
std::vector<int> placeOrders(Product& product, OrderManager& mgr,
int userId,
const std::vector<int>& quantities) override {
// ⭐ 每个下单作为一个 task 提交,工人并发抢库存
std::vector<std::future<int>> futs;
for (int qty : quantities) {
futs.push_back(pool.submit([&product, &mgr, userId, qty]{
if (product.tryDeduct(qty)) {
return mgr.addOrder(userId, product.getId(), qty);
}
return -1;
}));
}
// 收集所有结果(按提交顺序)
std::vector<int> result;
for (auto& f : futs) result.push_back(f.get());
return result;
}
};
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
对比测试,同一份输入,分别用 Normal 和 Flash 跑,比时间:
#include "NormalStrategy.h"
#include "FlashStrategy.h"
#include <iostream>
#include <chrono>
void bench(const char* name, OrderStrategy& s, Product& p, OrderManager& mgr) {
std::vector<int> qtys(1000, 1);
auto t0 = std::chrono::steady_clock::now();
auto ids = s.placeOrders(p, mgr, 1, qtys);
auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - t0).count();
int ok = 0;
for (int id : ids) if (id > 0) ok++;
std::cout << name << ": 成功 " << ok << " / 1000 耗时 " << ms << " ms\n";
}
int main() {
ThreadPool pool(8);
{
Product p(1, "iPhone", 7999, 1000);
OrderManager mgr;
NormalStrategy normal;
bench("Normal", normal, p, mgr);
}
{
Product p(1, "iPhone", 7999, 1000);
OrderManager mgr;
FlashStrategy flash(pool);
bench("Flash ", flash, p, mgr);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
🧪 预期输出(差距明显):
Normal: 成功 1000 / 1000 耗时 12 ms
Flash : 成功 1000 / 1000 耗时 4 ms ← 8 线程并发约 3 倍加速
2
3
✅ Flash 比 Normal 快 2-3 倍,任务越重,差距越大(CPU 密集型可达 8 倍)。
📚 为什么不是直接 8 倍快? , 因为本例中"扣库存 + 加订单"本身已经被 atomic / shared_mutex 串行化。如果任务里加 100ms 的 IO(如发邮件、查数据库),加速比就会接近 8 倍。
# Step 5.3 BatchStrategy 分批
新建 BatchStrategy.h,一次提交多批,控制内存压力:
// BatchStrategy.h
#pragma once
#include "OrderStrategy.h"
#include "ThreadPool.h"
#include <future>
#include <algorithm>
class BatchStrategy : public OrderStrategy {
private:
ThreadPool& pool;
size_t batchSize;
public:
BatchStrategy(ThreadPool& p, size_t bs = 10) : pool(p), batchSize(bs) {}
std::vector<int> placeOrders(Product& product, OrderManager& mgr,
int userId,
const std::vector<int>& quantities) override {
std::vector<int> result(quantities.size(), -1);
// ⭐ 每 batchSize 个为一组,组内并发,组间串行
for (size_t i = 0; i < quantities.size(); i += batchSize) {
size_t end = std::min(i + batchSize, quantities.size());
std::vector<std::future<int>> futs;
for (size_t j = i; j < end; ++j) {
int qty = quantities[j];
futs.push_back(pool.submit([&product, &mgr, userId, qty]{
if (product.tryDeduct(qty)) {
return mgr.addOrder(userId, product.getId(), qty);
}
return -1;
}));
}
// 等本批完成后再开下一批
for (size_t j = i; j < end; ++j) {
result[j] = futs[j - i].get();
}
}
return result;
}
};
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
测试,下 50 单 batchSize=10 的情况:
ThreadPool pool(8);
Product p(1, "iPhone", 7999, 30); // 库存只够一半多
OrderManager mgr;
BatchStrategy batch(pool, 10);
std::vector<int> qtys(50, 1);
auto ids = batch.placeOrders(p, mgr, 1, qtys);
int ok = 0;
for (int id : ids) if (id > 0) ok++;
std::cout << "Batch 50 单 batchSize=10: 成功 " << ok
<< " 剩余 " << p.getStock() << "\n";
2
3
4
5
6
7
8
9
10
11
🧪 预期输出:
Batch 50 单 batchSize=10: 成功 30 剩余 0
2
✅ 库存 30 全部卖光,剩 20 单失败,业务正确,且内存占用最多只有 10 个 future(不像 Flash 一口气创建 50 个)。
┌─ 📌 阶段 ⑤ 小结 ────────────────────────────────────────┐
│ ✅ 已掌握 │
│ • Step 5.1 OrderStrategy 抽象接口 + Normal 顺序版 │
│ • Step 5.2 FlashStrategy 线程池并发 → 实测 3 倍加速 │
│ • Step 5.3 BatchStrategy 分批 + future 收集 → 内存可控 │
│ 📊 三策略对比: │
│ ┌─────────┬─────┬─────┬───────────────┐ │
│ │ 策略 │并发度│吞吐量│ 适用场景 │ │
│ │ Normal │ 1 │ 低 │ 顺序业务/小流量 │ │
│ │ Flash │ N │ 高 │ 秒杀/突发流量 │ │
│ │ Batch │ M/批│ 中 │ 后台批处理 │ │
│ └─────────┴─────┴─────┴───────────────┘ │
│ 🔜 下一阶段 ⑥:Logger 线程安全日志 │
│ 💡 commit 建议:feat: 3 order strategies │
└──────────────────────────────────────────────────────────┘
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 07.线程安全 Logger
┌─ 🎯 阶段 ⑥ 目标 ────────────────────────────────────────┐
│ 完成什么:Logger , 线程安全的格式化日志 │
│ 不做什么:不做日志分级(DEBUG/INFO/ERROR)、不写文件 │
│ 验收标准:先写裸 cout 看日志撕成乱码 → 加锁后行行清晰 │
│ 预计耗时:30 min · 2 Step(造 BUG → 修 BUG) │
└──────────────────────────────────────────────────────────┘
2
3
4
5
6
💡 std::cout 本身不保证线程安全 , 多线程同时输出会"字符级"交错。比如线程 A 输出 "Hello\n"、线程 B 输出 "World\n",可能会变成 "HelWloolrd\n\n"。这一阶段我们让你看到这种乱码,再用 mutex 修好它。
# Step 6.1 裸 cout 见撕裂
不写 Logger,直接用 cout 起步,写一个测试:
// test_log.cpp(v1:裸 cout,会撕裂)
#include <iostream>
#include <thread>
#include <vector>
int main() {
std::vector<std::thread> threads;
for (int t = 0; t < 4; ++t) {
threads.emplace_back([t]{
for (int i = 0; i < 5; ++i) {
// 故意分多次 << 输出,放大撕裂效应
std::cout << "[Thread " << t << "] ";
std::cout << "iter ";
std::cout << i;
std::cout << "\n";
}
});
}
for (auto& th : threads) th.join();
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
🧪 编译运行(多跑几次都能看到乱码):
$ g++ -std=c++17 -pthread test_log.cpp -o test_log && ./test_log
[Thread [Thread 0] iter 1] iter [Thread 0
2] iter [Thread 3] iter 0
0
[Thread 1] iter 1
[Thread 0] iter 1
[Thread 2] iter 1
...
2
3
4
5
6
7
8
9
⚠️ 看见了吗,第一行 "[Thread 0] iter 0" 和 "[Thread 1] iter 0" 完全交错在一起,根本读不懂哪条来自哪个线程。
📚 为什么 cout 不安全? , std::cout << x 会调用 operator<< 的几次内部函数。多个线程的多次调用没有任何同步,所以输出字符级交错。
⚠️ 注意:C++11 之后单次 operator<< 是原子的(不会撕一个字符串),但多次 << 拼一行不是原子的,这就是上面乱码的原因。
# Step 6.2 mutex 折叠 Logger
新建 Logger.h,一次性写完所有要点:
// Logger.h(v1:线程安全 + 时间戳 + 折叠输出)
#pragma once
#include <iostream>
#include <mutex>
#include <sstream>
#include <thread>
#include <chrono>
#include <iomanip>
class Logger {
private:
static inline std::mutex mtx; // ⭐ C++17 inline static:头文件里就能初始化
// 生成 "HH:MM:SS.mmm" 格式时间戳
static std::string timestamp() {
auto now = std::chrono::system_clock::now();
auto t = std::chrono::system_clock::to_time_t(now);
auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(
now.time_since_epoch()) % 1000;
std::ostringstream oss;
oss << std::put_time(std::localtime(&t), "%H:%M:%S")
<< "." << std::setw(3) << std::setfill('0') << ms.count();
return oss.str();
}
public:
template <typename... Args>
static void info(Args&&... args) {
std::lock_guard<std::mutex> lock(mtx); // ⭐ 整行原子
std::cout << "[" << timestamp() << "][T"
<< std::this_thread::get_id() << "] ";
(std::cout << ... << std::forward<Args>(args)); // ⭐ C++17 折叠表达式
std::cout << "\n";
}
};
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
升级测试,把 cout 全换成 Logger::info:
// test_log.cpp(v2:用 Logger)
#include "Logger.h"
#include <thread>
#include <vector>
int main() {
std::vector<std::thread> threads;
for (int t = 0; t < 4; ++t) {
threads.emplace_back([t]{
for (int i = 0; i < 5; ++i) {
Logger::info("Thread ", t, " iter ", i);
}
});
}
for (auto& th : threads) th.join();
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
🧪 预期输出(每行完整、不撕裂):
[14:05:32.001][T0x70...01] Thread 0 iter 0
[14:05:32.001][T0x70...02] Thread 1 iter 0
[14:05:32.001][T0x70...03] Thread 2 iter 0
[14:05:32.001][T0x70...04] Thread 3 iter 0
[14:05:32.002][T0x70...01] Thread 0 iter 1
...
2
3
4
5
6
7
✅ 每行都完整、带时间戳、带线程号,日志的"行原子性"保住了。
📚 C++17 折叠表达式
(... op pack)神技:(std::cout << ... << args) // 等价于 cout << arg0 << arg1 << ... << argN1比 C++11 时代手写递归模板优雅 100 倍,以前要这么写:
// C++11 老办法:递归 void log() {} template<class T, class... Args> void log(T&& t, Args&&... rest) { cout << t; log(rest...); }1
2
3
4
💼
static inline std::mutex mtx;, C++17 起允许inline静态成员直接在类内初始化,不用再去 .cpp 写一句std::mutex Logger::mtx;了。
┌─ 📌 阶段 ⑥ 小结 ────────────────────────────────────────┐
│ ✅ 已掌握 │
│ • Step 6.1 裸 cout 多线程 → 亲眼看到日志撕裂 │
│ • Step 6.2 mutex + 折叠表达式 → 行原子日志 ⭐ │
│ 🔜 下一阶段 ⑦:把所有组件串起来做 CLI + 1万 QPS 压测 │
│ 💡 commit 建议:feat: thread-safe logger │
└──────────────────────────────────────────────────────────┘
2
3
4
5
6
7
# 08.CLI 与压测
┌─ 🎯 阶段 ⑦ 目标 ──────────── 整个项目的最终验收 ─────────┐
│ 完成什么:交互式 CLI(main.cpp)+ 1 万 QPS 压测(benchmark)│
│ 不做什么:不做菜单美化、不做命令历史 │
│ 验收标准:压测脚本输出 "一致性: ✅ PASS" + QPS > 10000 │
│ 预计耗时:60 min · 2 Step │
└──────────────────────────────────────────────────────────┘
2
3
4
5
6
💡 本阶段是整个项目的"组装阶段",你前面 6 个阶段做好的零件,现在装到一辆能跑的车上。
# Step 7.1 交互式 main
新建 main.cpp:
// main.cpp
#include "ThreadPool.h"
#include "OrderManager.h"
#include "Product.h"
#include "NormalStrategy.h"
#include "FlashStrategy.h"
#include "Logger.h"
#include <iostream>
int main() {
ThreadPool pool(4);
OrderManager mgr;
Product iphone(1, "iPhone 15", 7999.0, 100);
NormalStrategy normal;
FlashStrategy flash(pool);
while (true) {
std::cout << "\n========== 订单系统 ==========\n"
"1=普通下单 2=秒杀下单 3=查询订单\n"
"4=查看库存 0=退出\n> ";
int op;
if (!(std::cin >> op)) break;
if (op == 0) break;
try {
if (op == 1 || op == 2) {
int userId, count;
std::cout << "用户 ID: "; std::cin >> userId;
std::cout << "下单次数: "; std::cin >> count;
std::vector<int> qtys(count, 1);
OrderStrategy& s = (op == 1)
? static_cast<OrderStrategy&>(normal)
: static_cast<OrderStrategy&>(flash);
auto ids = s.placeOrders(iphone, mgr, userId, qtys);
int success = 0;
for (int id : ids) if (id > 0) success++;
Logger::info("用户 ", userId, " 提交 ", count,
" 单,成功 ", success);
}
else if (op == 3) {
int userId;
std::cout << "用户 ID: "; std::cin >> userId;
auto v = mgr.ordersByUser(userId);
std::cout << "用户 " << userId << " 共 " << v.size() << " 单:\n";
for (const auto& o : v) {
std::cout << " 订单 #" << o.id
<< " - 商品 " << o.productId
<< " - 数量 " << o.quantity << "\n";
}
}
else if (op == 4) {
std::cout << "iPhone 15 当前库存: " << iphone.getStock() << "\n";
}
} catch (const std::exception& e) {
std::cout << "ERR " << e.what() << "\n";
}
}
return 0;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
编译运行 + 互动验证:
🧪 编译运行:
$ g++ -std=c++17 -pthread -O2 main.cpp -o orderapp
$ ./orderapp
========== 订单系统 ==========
1=普通下单 2=秒杀下单 3=查询订单 4=查看库存 0=退出
> 4
iPhone 15 当前库存: 100
> 1
用户 ID: 100
下单次数: 30
[14:06:01.234][T0x70...] 用户 100 提交 30 单,成功 30
> 2
用户 ID: 200
下单次数: 80 ← 库存只剩 70
[14:06:05.678][T0x70...] 用户 200 提交 80 单,成功 70
> 4
iPhone 15 当前库存: 0
> 3
用户 ID: 100
用户 100 共 30 单:
订单 #1 - 商品 1 - 数量 1
订单 #2 - 商品 1 - 数量 1
...
> 0
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
✅ 三个关键点都验证到了:
- 库存正确递减(100 → 70 → 0)
- 超过库存的下单部分失败(200 用户下 80 单只成功 70)
- 查询能拿到该用户的订单
# Step 7.2 benchmark 万 QPS
新建 benchmark.cpp,这是最终验收:
// benchmark.cpp
#include "ThreadPool.h"
#include "OrderManager.h"
#include "Product.h"
#include "FlashStrategy.h"
#include <chrono>
#include <iostream>
#include <atomic>
#include <thread>
int main() {
constexpr int TOTAL_USERS = 1000;
constexpr int ORDERS_PER_USER = 10;
constexpr int INITIAL_STOCK = 5000; // ⭐ 故意 < 总请求 10000
ThreadPool pool(std::thread::hardware_concurrency());
OrderManager mgr;
Product iphone(1, "iPhone 15", 7999.0, INITIAL_STOCK);
FlashStrategy flash(pool);
std::vector<int> qtys(ORDERS_PER_USER, 1);
std::atomic<int> totalSuccess{0};
auto t0 = std::chrono::steady_clock::now();
// 启动 1000 个用户线程,每人下 10 单
std::vector<std::thread> users;
users.reserve(TOTAL_USERS);
for (int u = 0; u < TOTAL_USERS; ++u) {
users.emplace_back([&, u]{
auto ids = flash.placeOrders(iphone, mgr, u, qtys);
for (int id : ids) if (id > 0) totalSuccess++;
});
}
for (auto& t : users) t.join();
auto t1 = std::chrono::steady_clock::now();
auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(t1 - t0).count();
int total = TOTAL_USERS * ORDERS_PER_USER;
int success = totalSuccess.load();
int leftover = iphone.getStock();
std::cout << "========================================\n";
std::cout << "总请求数: " << total << "\n";
std::cout << "成功订单: " << success << "\n";
std::cout << "剩余库存: " << leftover << "\n";
std::cout << "总耗时: " << ms << " ms\n";
std::cout << "QPS: " << (total * 1000.0 / ms) << "\n";
// ⭐⭐⭐ 关键一致性验证
bool consistent = (success + leftover == INITIAL_STOCK);
std::cout << "一致性校验: 成功(" << success << ") + 剩余(" << leftover
<< ") == 初始(" << INITIAL_STOCK << ") ? "
<< (consistent ? "✅ PASS" : "❌ FAIL:超卖或漏单!") << "\n";
std::cout << "========================================\n";
return consistent ? 0 : 1; // 一致则退出码 0
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
🧪 编译运行(多跑几次都应一致):
$ g++ -std=c++17 -pthread -O2 benchmark.cpp -o bench
$ for i in 1 2 3 4 5; do ./bench; echo; done
========================================
总请求数: 10000
成功订单: 5000
剩余库存: 0
总耗时: 487 ms
QPS: 20533.88
一致性校验: 成功(5000) + 剩余(0) == 初始(5000) ? ✅ PASS
========================================
(5 次完全一致 PASS)
2
3
4
5
6
7
8
9
10
11
12
✅ 三个最终验证全部通过:
- QPS > 20000(普通笔记本 4 核能轻松达到,证明线程池调度高效)
- 零超卖(atomic CAS 在万级并发下零失误)
- 零漏单(shared_mutex 正确保护 vector 不丢数据)
┌─ 📌 阶段 ⑦ 小结 + 整个项目里程碑 ──────────────────────┐
│ ✅ 已掌握 │
│ • Step 7.1 交互式 CLI 串起 4 个核心组件 │
│ • Step 7.2 万级并发压测 + 一致性校验 PASS │
│ 🎉 整个 7 阶段 22 Step 全部跑通,你已经掌握了: │
│ • mutex / shared_mutex / atomic / cv 四大同步原语 │
│ • thread / future / packaged_task 异步任务三件套 │
│ • 生产者-消费者 + 策略模式 两个工业级模式 │
│ • "造 BUG → 看 BUG → 修 BUG" 这种以错为师的并发学习法 │
│ 💡 commit 建议:feat: cli + benchmark with consistency │
└──────────────────────────────────────────────────────────┘
2
3
4
5
6
7
8
9
10
11
# 09.项目总结速查
# 9.1 整体架构
Application
│
▼
OrderStrategy(策略模式)
│ │ │
▼ ▼ ▼
Normal Flash Batch
│ │ │
└───┬───┴──────┘
▼
ThreadPool ◄──── BlockingQueue
│
▼
OrderManager (shared_mutex) + Product (atomic)
│
▼
Logger (mutex)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 9.2 并发原语速查表
| 原语 | 头文件 | 用途 | 性能 |
|---|---|---|---|
std::thread | <thread> | 启动线程 | - |
std::jthread 🆕 | <thread> C++20 | 自动 join + stop_token | 同 thread |
std::mutex | <mutex> | 基础互斥锁 | 中(约 25ns/次) |
std::shared_mutex | <shared_mutex> C++17 | 读写锁 | 慢(约 60ns) |
std::atomic<T> | <atomic> | 无锁原子变量 | 极快(约 5ns) |
std::condition_variable | <condition_variable> | 条件等待 | - |
std::future | <future> | 异步结果 | - |
std::lock_guard | <mutex> | RAII 锁守卫 | 同 mutex |
std::unique_lock | <mutex> | 灵活锁守卫(CV 必备) | 比 lock_guard 略慢 |
# 9.3 选择决策树
要保护的数据
├── 是单个变量(int/bool/指针)?
│ └── ✅ 用 std::atomic<T>
│
├── 多读少写(读 > 80%)?
│ └── ✅ 用 std::shared_mutex
│
├── 任务执行有先后依赖?
│ └── ✅ 用 condition_variable
│
└── 需要"等结果"?
└── ✅ 用 future + promise(或 packaged_task)
2
3
4
5
6
7
8
9
10
11
12
# 10.项目技术思考
# 10.1 三种锁的对比
// 三种实现"线程安全计数器"的方法
class CounterMutex {
int v = 0; std::mutex m;
public:
void inc() { std::lock_guard l(m); v++; } // 25 ns
int get() { std::lock_guard l(m); return v; }
};
class CounterShared {
int v = 0; std::shared_mutex m;
public:
void inc() { std::unique_lock l(m); v++; } // 35 ns
int get() { std::shared_lock l(m); return v; }// 30 ns(多读快)
};
class CounterAtomic {
std::atomic<int> v{0};
public:
void inc() { v.fetch_add(1); } // 5 ns
int get() { return v.load(); }
};
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
结论:单变量场景永远首选 atomic;多变量需要"事务一致性"才用 mutex。
# 10.2 cv 与 unique_lock
cv.wait(lock) 的实现伪码:
void wait(unique_lock& lk) {
lk.unlock(); // 1. 释放锁
block_until_notified(); // 2. 阻塞等通知
lk.lock(); // 3. 醒来后重新加锁
}
2
3
4
5
因此 wait() 必须能 unlock 和 lock,lock_guard 不支持手动 unlock,所以 cv 必须用 unique_lock。
# 10.3 jthread 解决了什么
// 旧 thread 的痛点:
std::thread t(work);
// 必须记得 t.join() 否则程序崩溃(terminate)
// 想中断 t 没有官方机制
// C++20 jthread:
std::jthread t([](std::stop_token st) {
while (!st.stop_requested()) { /* work */ }
});
// 析构自动 join,析构前自动发 stop request
2
3
4
5
6
7
8
9
10
jthread = thread + 自动 join + 协作中断,是替代 thread 的"全面升级"。
# 11.衔接与延伸
# 11.1 与上一案例的差异
| 维度 | 案例 04 JSON+KV | 案例 05 多线程订单 |
|---|---|---|
| 线程模型 | 单线程 | 多线程 + 线程池 |
| 同步机制 | 无 | mutex / shared_mutex / atomic / cv |
| 异步任务 | 无 | future / promise / packaged_task |
| 设计模式 | 工厂 | 策略模式 + 生产者消费者 |
# 11.2 下一案例的递进
下一案例 06.迷你 KV 存储引擎 会做四件升级:
- 持久化升级:从内存 KV 升级为 AOF 日志持久化(每次写命令落盘)
- 命令模式:所有 SET / GET / DEL / EXPIRE 都是
Command子类 - 后台清理:用
jthread+stop_token实现 TTL 过期键清理 - 手写单测:用宏 + 注册表实现"裸 C++ 单元测试框架"
# 11.3 三个延伸挑战
挑战 A(基础)· 用 jthread 重写线程池
把 std::thread 换成 std::jthread,析构时不再需要手动 shutdown。提示:用 stop_token 替代 BlockingQueue::shutdown(),彻底消除 §03 灵魂三问里提到的"析构顺序错就死锁"的隐患。
挑战 B(进阶)· 实现 readers-writer 优先级
当前 shared_mutex 在大量读时,写线程会"饿死"(永远拿不到锁)。请实现"写优先",有写请求时阻止新读者进入。
挑战 C(现代化)· 集成 std::async 和协程(C++20)
把 pool.submit(f) 替换为 co_await async_op(),体验协程的简洁。这是 C++20 最重要的特性之一,详见卷三。
- ⬅ 上一案例:04.JSON与内存数据库
- ➡ 下一案例:06.迷你KV存储引擎 , 卷一 17 章全章串联的毕业设计