编程进阶网 编程进阶网
首页
  • 计算机原理
  • 操作系统
  • 网络协议
  • 数据库原理
  • 面向对象
  • 设计原则
  • 设计模式
  • 系统架构
  • 性能优化
  • 编程原理
  • 方案设计
  • 稳定可靠
  • 工程运维
  • 基础认知
  • 线性结构
  • 树与哈希
  • 工业级实现
  • 算法思想
  • 实战与综合
  • 算法题考核
  • C语言入门
  • C综合案例
  • C专栏博客
  • C标准集库
  • C++入门教程
  • C++综合案例
  • C++专栏博客
  • C++开发技巧
  • Java入门教程
  • Java综合案例
  • Java专栏博客
  • Go入门教程
  • Go综合案例
  • Go专栏博客
  • Go开发技巧
  • JavaScript入门
  • JavaScript高级
  • Android库解读
  • Android专栏
  • Android智能硬件
  • iOS ObjC入门
  • iOS Swift入门
  • iOS入门精通
  • Web之Html手册
  • Web之TypeScript
  • Web之Vue高级进阶
  • Linux之QML入门
  • Linux之QT核心库
  • Linux实践开发
  • Python教程
  • Shell&Bash教程
  • 工具脚本
  • 自动化脚本
  • 质量保障
  • 产品思考
  • 软实力
  • 开发流程
  • Git应用
  • 技术模版
  • 技术规范
  • Markdown
  • Mermaid
  • 开源协议
  • JSON工具
  • 文本工具
  • 图片处理
  • 文档转化
  • 代码压缩
  • 关于我
  • 自我精进
  • 职场管理
  • 职场面试
  • 心情杂货
  • 友情链接

杨充

专注编程 · 终身学习者
首页
  • 计算机原理
  • 操作系统
  • 网络协议
  • 数据库原理
  • 面向对象
  • 设计原则
  • 设计模式
  • 系统架构
  • 性能优化
  • 编程原理
  • 方案设计
  • 稳定可靠
  • 工程运维
  • 基础认知
  • 线性结构
  • 树与哈希
  • 工业级实现
  • 算法思想
  • 实战与综合
  • 算法题考核
  • C语言入门
  • C综合案例
  • C专栏博客
  • C标准集库
  • C++入门教程
  • C++综合案例
  • C++专栏博客
  • C++开发技巧
  • Java入门教程
  • Java综合案例
  • Java专栏博客
  • Go入门教程
  • Go综合案例
  • Go专栏博客
  • Go开发技巧
  • JavaScript入门
  • JavaScript高级
  • Android库解读
  • Android专栏
  • Android智能硬件
  • iOS ObjC入门
  • iOS Swift入门
  • iOS入门精通
  • Web之Html手册
  • Web之TypeScript
  • Web之Vue高级进阶
  • Linux之QML入门
  • Linux之QT核心库
  • Linux实践开发
  • Python教程
  • Shell&Bash教程
  • 工具脚本
  • 自动化脚本
  • 质量保障
  • 产品思考
  • 软实力
  • 开发流程
  • Git应用
  • 技术模版
  • 技术规范
  • Markdown
  • Mermaid
  • 开源协议
  • JSON工具
  • 文本工具
  • 图片处理
  • 文档转化
  • 代码压缩
  • 关于我
  • 自我精进
  • 职场管理
  • 职场面试
  • 心情杂货
  • 友情链接
  • README
  • C语言入门精通

  • Cpp入门到精通

    • README
    • 入门教程

    • 综合案例

      • README
      • 学生管理通讯录系统
      • 银行账户管理系统
      • 校园身份预约系统
      • Json与内存数据库
      • 订单票务购买系统
        • 📚 渐进学习节奏
        • 00.案例元信息
          • 项目结构
          • 一条命令编译运行
        • 📋 目录快速导航
        • 01.项目需求和功能
          • 1.1 业务场景
          • 1.2 三种下单策略
          • 1.3 并发挑战
        • 02.阻塞队列基础组件
          • 灵魂三问:动手前先想清楚
          • Step 1.1 最小裸版输出
          • Step 1.2 双线程作死测试
          • Step 1.3 mutex 修复竞争
          • Step 1.4 阻塞 pop 与停机
        • 03.通用线程池 ThreadPool
          • 灵魂三问:线程池设计要点
          • Step 2.1 线程池最小骨架
          • Step 2.2 packaged_task 入门
          • Step 2.3 模板 submit 实现
          • Step 2.4 千任务压测验证
        • 04.OrderManager 与读写锁
          • Step 3.1 Order 单线程跑通
          • Step 3.2 mutex 基线压测
          • Step 3.3 shared_mutex 升级
        • 05.Product 防超卖
          • Step 4.1 裸 int 埋 BUG
          • Step 4.2 千线程见超卖
          • Step 4.3 atomic CAS 修复
          • Step 4.4 故意造死锁
          • Step 4.5 修复方案 A:std::lock 一次锁全部
          • Step 4.6 修复方案 B:锁顺序约定(更通用)
        • 06.策略模式下单
          • 灵魂三问:为什么不用 if-else?
          • Step 5.1 策略接口与基线
          • Step 5.2 FlashStrategy 并发
          • Step 5.3 BatchStrategy 分批
        • 07.线程安全 Logger
          • Step 6.1 裸 cout 见撕裂
          • Step 6.2 mutex 折叠 Logger
        • 08.CLI 与压测
          • Step 7.1 交互式 main
          • Step 7.2 benchmark 万 QPS
        • 09.项目总结速查
          • 9.1 整体架构
          • 9.2 并发原语速查表
          • 9.3 选择决策树
        • 10.项目技术思考
          • 10.1 三种锁的对比
          • 10.2 cv 与 unique_lock
          • 10.3 jthread 解决了什么
        • 11.衔接与延伸
          • 11.1 与上一案例的差异
          • 11.2 下一案例的递进
          • 11.3 三个延伸挑战
      • 迷你KV存储引擎器
      • 迷你编译器解释器
    • 专栏博客

    • 开发技巧

  • Java入门精通

  • Go入门到精通

  • JavaScript入门

  • CodeX
  • Cpp入门到精通
  • 综合案例
杨充
2026-05-25
目录

订单票务购买系统

# 第五章:C++ 订单票务购买系统

本章是综合案例的第五关·并发栈集大成,前 4 个案例都是单线程程序,从本章开始我们正式踏入"多核时代",理解为什么"加锁"看起来简单,写错却会导致死锁、数据竞争、活锁等"幽灵 BUG"。本案例融合原 3 个案例(票务购买 / 轮询管理 / 线程池)为一个完整的"多线程订单系统",一次练遍 C++ 并发的八大核心:

  1. std::thread + std::mutex:最基础的"启动线程 + 互斥锁"
  2. std::condition_variable:生产者-消费者模式,订单队列的灵魂
  3. std::atomic:无锁计数器、原子状态切换
  4. std::shared_mutex:读写锁,多读单写
  5. std::future + std::promise:异步任务的结果回传
  6. std::jthread 🆕 C++20:自动 join + 协作中断
  7. 完整的线程池组件:可重用的工业级线程池
  8. 策略模式:不同订单类型(普通/秒杀/分布式)使用不同的下单策略

学习方式:本案例按"并发原语拆解 → 写代码 → 跑压测 → 看竞态 → 加锁修复"五步法循环。总共 8 个阶段、约 14 小时,建议分 4 天完成。并发代码不能只“跱过一次”,一定要跑 benchmark 反复压测,才能揭出隐藏的竞态问题。


# 📚 渐进学习节奏

💡 先读这段,再开始敲代码!多线程代码是严禁"一口气写完才跑"的,错了你完全不知道错在哪里。

本案例采用严格的并发验证节奏,每加一个并发原语都必须跑一次压测验证,不能凭"看起来对"过关:

阶段 ① BlockingQueue(02 节) · 60 min · 4 Step
   └ Step 1.1: 最小裸版(不加锁)→ 单线程 FIFO 跑通
   └ Step 1.2: 双线程作死测试 → 亲眼看到 race condition ⚠️
   └ Step 1.3: 加 mutex 修复数据竞争
   └ Step 1.4: 加 condition_variable 阻塞 + shutdown 优雅退出

阶段 ② ThreadPool(03 节) · 120 min · 4 Step  【高峰:模板 submit】
   └ Step 2.1: 最小骨架(submit_simple,无返回值)→ 工人能转
   └ Step 2.2: 单独玩 packaged_task + future 三剑客 ⭐
   └ Step 2.3: 模板 submit + invoke_result_t + shared_ptr 三大坑
   └ Step 2.4: 1000 任务压测 → 8 倍加速 + 数值守恒

阶段 ③ OrderManager + 读写锁(04 节) · 60 min · 3 Step
   └ Step 3.1: Order 数据类 + atomic ID 单线程跑通
   └ Step 3.2: mutex 版多线程压测(记下基线时间)
   └ Step 3.3: 改 shared_mutex → 读吞吐翻倍 ⭐

阶段 ④ Product 库存 atomic(05 节) · 90 min · 3 Step + 加餐  【双高峰:超卖 + 死锁 ⭐⭐⭐】
   └ Step 4.1: 写裸 int 版 → 单线程"看起来"完美
   └ Step 4.2: 200 线程压测 → 看到 stock = -3 的超卖现场 💥
   └ Step 4.3: atomic + compare_exchange_weak 修复 → 零超卖
   └ Step 4.4: 🆕 加餐 - 故意交叉申请两把锁 → 看到经典死锁现场 💥
   └ Step 4.5: std::lock / scoped_lock 一次锁全部修复
   └ Step 4.6: 锁顺序约定(按地址排序), 工业级解法

阶段 ⑤ 三种下单策略(06 节) · 60 min · 3 Step
   └ Step 5.1: 策略接口 + NormalStrategy 单线程顺序
   └ Step 5.2: FlashStrategy 线程池并发 → 实测 3 倍加速
   └ Step 5.3: BatchStrategy 分批 + future 收集 → 内存可控

阶段 ⑥ Logger 线程安全日志(07 节) · 30 min · 2 Step
   └ Step 6.1: 裸 cout 多线程 → 看到日志撕裂乱码
   └ Step 6.2: mutex + C++17 折叠表达式 → 行原子日志

阶段 ⑦ CLI + benchmark(08 节) · 60 min · 2 Step  【整个项目最终验收】
   └ Step 7.1: 交互式 main 串起 4 个核心组件
   └ Step 7.2: 万 QPS 压测 + 一致性校验 PASS ✅
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37

🎯 每个 Step 必须做的三件事:

  1. 看 🎯 阶段目标卡片:明确这一阶段做什么、不做什么、验收标准
  2. 写一小段代码就编译运行一次(并发代码还要跑 N 次看是否稳定)
  3. 看到预期输出再写下一个 Step(压测通过才算过关)

⚠️ 本案例独有的"造 BUG → 修复"高峰:

  • 阶段 ① Step 1.2:故意不加锁的 BlockingQueue → 看到崩溃/卡死/漏数据
  • 阶段 ④ Step 4.2:故意用裸 int 扣库存 → 看到 "100 件商品卖出 103 件" 的超卖现场
  • 阶段 ④ Step 4.4 🆕 加餐:故意交叉申请两把锁 → 看到经典死锁现场(程序静默卡死)
  • 阶段 ⑥ Step 6.1:故意用裸 cout 写日志 → 看到字符级撕裂乱码

四次"踩坑→修复"流程,让你真正记住 mutex / atomic / lock_guard / std::lock 为什么存在。

✅ 每个阶段的结构(你在正文里会反复看到):

┌─ 🎯 阶段目标 ──────────────┐  ← 阶段开头:明确做什么/不做什么
│  完成什么、不做什么、验收标准    │
└──────────────────────────────┘

  Step X.1:先写最小可编译版(5-20 行)
  Step X.2:编译 → 单线程跑 → 看到输出 ✅
  Step X.3:加锁保护临界区(10-30 行)
  Step X.4:多线程压测 N 次 ✅

┌─ 📌 阶段小结 ─────────────┐  ← 阶段结尾:✅ 已掌握 + 🔜 下一阶段
└──────────────────────────┘
1
2
3
4
5
6
7
8
9
10
11

# 00.案例元信息

项目 说明
难度 ★★★★★
预估时长 14 小时(建议分 4 天,每天 3-4 小时)
前置章节 卷一第 11 章(内存模型)、第 12 章(RAII)、第 15 章(线程和锁)、第 16 章(STL 模板)
覆盖知识点 thread / jthread / mutex / lock_guard / unique_lock / shared_mutex / condition_variable / atomic / memory_order / future / promise / packaged_task / async / 线程池设计
设计亮点 多策略下单(普通秒杀分布式)+ 可重用线程池组件 + 完整的并发安全测试
⚠ 已知局限 不涉及无锁队列、原子内存序细节(这些在卷三《底层卷》)
最终产物 命令行交互式系统 + 压测程序
代码规模 约 1500 行 / 12 个文件

# 项目结构

order_system/
├── include/
│   ├── ThreadPool.h          # 通用线程池组件
│   ├── BlockingQueue.h       # 线程安全队列
│   ├── Order.h               # 订单实体
│   ├── Product.h             # 商品(含库存的原子计数)
│   ├── OrderStrategy.h       # 下单策略接口(抽象基类)
│   ├── NormalStrategy.h      # 策略:普通下单
│   ├── FlashStrategy.h       # 策略:秒杀(高并发)
│   ├── BatchStrategy.h       # 策略:批量下单
│   ├── OrderManager.h        # 订单管理器(含读写锁)
│   └── Logger.h              # 线程安全日志
├── src/                      # 各模块实现
├── main.cpp                  # 交互式 CLI
├── benchmark.cpp             # 压测程序(10000 并发请求)
└── README.md
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

# 一条命令编译运行

cd order_system
g++ -std=c++17 -pthread -Iinclude src/*.cpp main.cpp -o order_system
g++ -std=c++17 -pthread -Iinclude src/*.cpp benchmark.cpp -o bench

./order_system            # 交互式
./bench                   # 压测
1
2
3
4
5
6

⚠️ -pthread 不能省!Linux/macOS 必须加这个开关,否则链接会失败 undefined reference to pthread_create。Windows 用 MSVC 不需要。


# 📋 目录快速导航

  • 01.项目需求和功能
    • 1.1 业务场景
    • 1.2 三种下单策略 【🔑】
    • 1.3 并发挑战
  • 02.阻塞队列基础组件 【阶段①】
    • 灵魂三问:动手前先想清楚
    • Step 1.1 最小裸版输出
    • Step 1.2 双线程作死测试
    • Step 1.3 mutex 修复竞争
    • Step 1.4 阻塞 pop 与停机
  • 03.通用线程池 ThreadPool 【阶段②·骨架高峰⭐】
    • 灵魂三问:线程池设计要点
    • Step 2.1 线程池最小骨架
    • Step 2.2 packaged_task 入门
    • Step 2.3 模板 submit 实现 【🔑】
    • Step 2.4 千任务压测验证
  • 04.OrderManager 与读写锁 【阶段③】
    • Step 3.1 Order 单线程跑通
    • Step 3.2 mutex 基线压测
    • Step 3.3 shared_mutex 升级
  • 05.Product 防超卖 【阶段④·造bug高峰⭐】
    • Step 4.1 裸 int 埋 BUG
    • Step 4.2 千线程见超卖 【🔑】
    • Step 4.3 atomic CAS 修复
    • 🚨 加餐 · 死锁现场实录
  • 06.策略模式下单 【阶段⑤】
    • 灵魂三问:为什么不用 if-else?
    • Step 5.1 策略接口与基线
    • Step 5.2 FlashStrategy 并发
    • Step 5.3 BatchStrategy 分批
  • 07.线程安全 Logger 【阶段⑥】
    • Step 6.1 裸 cout 见撕裂
    • Step 6.2 mutex 折叠 Logger
  • 08.CLI 与压测 【阶段⑦·压测高峰】
    • Step 7.1 交互式 main
    • Step 7.2 benchmark 万 QPS
  • 09.项目总结速查
    • 9.1 整体架构
    • 9.2 并发原语速查表 【🔑】
    • 9.3 选择决策树 【🔑】
  • 10.项目技术思考
    • 10.1 三种锁的对比
    • 10.2 cv 与 unique_lock
    • 10.3 jthread 解决了什么
  • 11.衔接与延伸
    • 11.1 与上一案例的差异
    • 11.2 下一案例的递进
    • 11.3 三个延伸挑战

# 01.项目需求和功能

# 1.1 业务场景

电商秒杀系统是工业界最经典的并发场景:

  • 商品库存有限(如 100 件)
  • 几千个用户同时下单
  • 必须保证:不超卖、不漏单、响应快

本案例用 C++ 标准库实现一个微缩版的订单系统,同时演示线程池这个工业组件。线程池是几乎所有 C++ 服务端应用的基石,你后续学 Web 服务器、消息队列、数据库都绕不开它。

# 1.2 三种下单策略

策略 特点 实现要点
普通下单 NormalStrategy 单线程逐个处理 加 mutex 保护订单列表即可
秒杀下单 FlashStrategy 高并发抢库存 atomic CAS 防超卖 + 限流
批量下单 BatchStrategy 一次提交多个订单 future 收集每个的处理结果

业务规则:

  • 用户输入"商品 ID + 数量"即下单
  • 系统校验库存→扣减→生成订单→记录日志
  • 任何一步失败,整个下单回滚

# 1.3 并发挑战

典型 BUG 重现,这一节告诉你"为什么不加锁就会出问题":

// ❌ 错误代码(不加锁)
class Product {
    int stock = 100;
public:
    bool buy(int n) {
        if (stock >= n) {        // 检查
            stock -= n;          // 扣减
            return true;
        }
        return false;
    }
};

// 1000 个线程同时调 buy(1) 会怎样?
// 假设 stock = 1,线程 A 和 B 同时进 if (stock >= n),都通过
// A 执行 stock -= n → stock = 0
// B 执行 stock -= n → stock = -1   ⚠️ 超卖了!
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

修正方案:用 std::atomic<int> 配合 CAS 循环,详见第 05 节。


# 02.阻塞队列基础组件

┌─ 🎯 阶段 ① 目标 ────────────────────────────────────────┐
│ 完成什么:BlockingQueue<T> 模板(线程安全的生产者消费者队列)│
│ 不做什么:还不接 ThreadPool,先把队列单独跑通              │
│ 验收标准:1 生产者 + 1 消费者两个线程互不干扰跑通 100 条数据 │
│ 预计耗时:60 min · 4 Step(每步都能编译运行)              │
└──────────────────────────────────────────────────────────┘
1
2
3
4
5
6

💡 为什么先做这个? , 线程池 = "工人 + 任务队列"。任务队列单独抽出来叫 BlockingQueue,它会被工人线程并发 push/pop。先把队列做对,线程池就成功了一半。

⚠️ 本阶段的核心难点:你要亲眼看到"不加锁会乱",所以我们故意从"无锁裸版"起步,先看到崩溃,再一步步加 mutex → condition_variable → shutdown 把它治好。


# 灵魂三问:动手前先想清楚

❓ 既然 STL 已有 std::queue,为什么还要自己包一层?

来看反例:

// ❌ 反例:直接让多线程操作 std::queue
std::queue<int> q;
std::thread t1([&]{ for(int i=0;i<1000;++i) q.push(i); });   // 写
std::thread t2([&]{ while(!q.empty()) q.pop(); });           // 读
1
2
3
4

问题:

  1. std::queue 不是线程安全的,内部链表/数组指针被并发改写会直接 segfault
  2. empty() + pop() 不是原子的,刚检查非空,下一秒被 pop 掉,然后你又 pop 一次→崩
  3. 没办法"等数据来",只能 while 死循环 100% 吃 CPU

✅ 正确做法:把"队列+锁+条件变量"封装成一个独立组件,让所有并发细节集中在一个文件里。这就是软件工程上的"封装变化"。

❓ 为什么要先做无锁版(Step 1.1)再加锁(Step 1.3)?不是浪费时间吗? 答:这是教学法刻意安排。如果一上来就给你"完美的最终版",你只会机械记住代码而不知道每一行的存在意义。先看到无锁版的崩溃(Step 1.2),你才会真正理解为什么 mutex / cv / shutdown 一个都不能少。这与 04 案例 §6.3 故障演示、03 案例 §6.4 dirty bug 同源,先有问题,后有解法。

❓ 第一步该写哪个版本? 答:先写"裸 std::queue 包一层",5 行代码、可编译、能单线程跑通。然后用双线程作死它,让它在你眼前崩。

🔑 教学要点:永远不要直接抄"最终版完美代码",一定要按 v1 → v2 → v3 的演化路径学,每一步只解决一个问题。


# Step 1.1 最小裸版输出

新建 BlockingQueue.h,故意不加任何同步:

// BlockingQueue.h(v1:单线程版,没锁)
#pragma once
#include <queue>
#include <optional>

template <typename T>
class BlockingQueue {
private:
    std::queue<T> q;

public:
    void push(T item) {
        q.push(std::move(item));
    }

    std::optional<T> pop() {
        if (q.empty()) return std::nullopt;
        T item = std::move(q.front());
        q.pop();
        return item;
    }

    size_t size() const { return q.size(); }
};
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

❌ 注意:模板类不能直接拆分为 .h 和 .cpp!

因为模板是在编译期实例化的,编译器需要看到完整的模板定义才能生成代码。如果模板定义在 .cpp 中,其他 .cpp 文件 #include 头文件时看不到实现,会报链接错误。

新建 test_queue.cpp,单线程先验证基本行为:

// test_queue.cpp
#include "BlockingQueue.h"
#include <iostream>

int main() {
    BlockingQueue<int> q;
    for (int i = 1; i <= 3; ++i) q.push(i);
    std::cout << "size = " << q.size() << "\n";

    while (auto v = q.pop()) {           // ⭐ optional 在 if 里直接当 bool
        std::cout << *v << " ";
    }
    std::cout << "\n";
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
🧪 编译运行:
$ g++ -std=c++17 test_queue.cpp -o test_queue &amp;&amp; ./test_queue
size = 3
1 2 3
1
2
3
4

✅ 单线程下 FIFO 行为正确。但这只是表面平静,下一步我们用两个线程同时操作它,看它崩给你看。


# Step 1.2 双线程作死测试

先不动 BlockingQueue.h,直接用现有的"无锁版"跑双线程:

// test_queue.cpp(v2:双线程压测)
#include "BlockingQueue.h"
#include <iostream>
#include <thread>
#include <atomic>

int main() {
    BlockingQueue<int> q;
    std::atomic<int> consumed{0};

    // 生产者:push 10000 个数
    std::thread producer([&]{
        for (int i = 0; i < 10000; ++i) q.push(i);
    });

    // 消费者:循环 pop
    std::thread consumer([&]{
        while (consumed < 10000) {
            if (auto v = q.pop()) consumed++;
        }
    });

    producer.join();
    consumer.join();
    std::cout << "consumed = " << consumed.load() << "\n";
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
🧪 编译运行(多跑几次!):
$ g++ -std=c++17 -pthread test_queue.cpp -o test_queue
$ ./test_queue
段错误(核心已转储)         ← 可能崩溃

$ ./test_queue
consumed = 9871              ← 也可能漏数据

$ ./test_queue
死循环                       ← 也可能卡住
1
2
3
4
5
6
7
8
9
10

⚠️ 三种现象都可能出现:

  • 崩溃:两个线程同时改 std::queue 内部链表/数组,内存损坏
  • 漏数据:size() 检查和 front/pop 之间被另一线程穿插
  • 卡死:消费者 pop 时队列暂时空,错过了生产者后续 push

📚 这就是 race condition(竞态)的可怕之处:没有报错信息,行为完全随机。今天单元测试通过 1000 次,明天上线就崩。要修它必须主动加锁。


# Step 1.3 mutex 修复竞争

升级 BlockingQueue.h:

// BlockingQueue.h(v2:加 mutex)
#pragma once
#include <queue>
#include <mutex>
#include <optional>

template <typename T>
class BlockingQueue {
private:
    std::queue<T>      q;
    mutable std::mutex mtx;        // ⭐ mutable:const 函数也能加锁

public:
    void push(T item) {
        std::lock_guard<std::mutex> lock(mtx);     // RAII:构造加锁,析构解锁
        q.push(std::move(item));
    }

    std::optional<T> pop() {
        std::lock_guard<std::mutex> lock(mtx);
        if (q.empty()) return std::nullopt;
        T item = std::move(q.front());
        q.pop();
        return item;
    }

    size_t size() const {
        std::lock_guard<std::mutex> lock(mtx);
        return q.size();
    }
};
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

重新跑刚才的双线程测试:

🧪 编译运行(连跑 5 次都不应崩):
$ g++ -std=c++17 -pthread test_queue.cpp -o test_queue
$ for i in 1 2 3 4 5; do ./test_queue; done
consumed = 10000
consumed = 10000
consumed = 10000
consumed = 10000
consumed = 10000
1
2
3
4
5
6
7
8

✅ 5 次都得到 10000,race 治好了。 std::lock_guard vs std::unique_lock:

  • lock_guard:构造加锁、析构解锁,不能手动解锁,最轻量,本步够用
  • unique_lock:可以手动 lock()/unlock(),可以转移所有权,condition_variable 必须用它(下一步就要换)

⚠️ 当前版本还有一个缺陷:消费者用 if (auto v = q.pop()) 轮询,队列暂时空时,它会以 100% CPU 疯狂调用 pop。生产环境会被运维当成 bug 喷你。下一步用条件变量做"阻塞等待"。


# Step 1.4 阻塞 pop 与停机

升级 BlockingQueue.h 为最终版:

// BlockingQueue.h(v3:阻塞 + shutdown)
#pragma once
#include <queue>
#include <mutex>
#include <condition_variable>
#include <optional>

template <typename T>
class BlockingQueue {
private:
    std::queue<T>             q;
    mutable std::mutex        mtx;
    std::condition_variable   cvNotEmpty;     // ⭐ 队列非空时通知
    bool                      stopped = false;

public:
    void push(T item) {
        {
            std::lock_guard<std::mutex> lock(mtx);
            if (stopped) return;
            q.push(std::move(item));
        }
        cvNotEmpty.notify_one();              // ⭐ 锁外通知(性能更好)
    }

    // 阻塞 pop:队列空时挂起线程,不再轮询
    std::optional<T> pop() {
        std::unique_lock<std::mutex> lock(mtx);            // ⭐ 必须用 unique_lock
        cvNotEmpty.wait(lock, [this]{                      // ⭐ 谓词防虚假唤醒
            return !q.empty() || stopped;
        });
        if (q.empty() && stopped) return std::nullopt;     // 收到关闭信号

        T item = std::move(q.front());
        q.pop();
        return item;
    }

    // 关闭:通知所有 pop 中的线程退出
    void shutdown() {
        {
            std::lock_guard<std::mutex> lock(mtx);
            stopped = true;
        }
        cvNotEmpty.notify_all();              // ⭐ 唤醒所有等待者
    }

    size_t size() const {
        std::lock_guard<std::mutex> lock(mtx);
        return q.size();
    }
};
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52

最终测试,同时验证阻塞行为 + 优雅关闭:

// test_queue.cpp(v3:阻塞测试)
#include "BlockingQueue.h"
#include <iostream>
#include <thread>
#include <chrono>

int main() {
    BlockingQueue<int> q;

    // 消费者:先启动,等待数据(不会忙等)
    std::thread consumer([&]{
        while (true) {
            auto v = q.pop();
            if (!v) break;                           // shutdown 信号
            std::cout << "got " << *v << "\n";
        }
        std::cout << "consumer exit\n";
    });

    // 主线程:故意慢慢 push
    std::this_thread::sleep_for(std::chrono::milliseconds(500));
    for (int i = 1; i <= 3; ++i) {
        q.push(i);
        std::this_thread::sleep_for(std::chrono::milliseconds(200));
    }

    // 通知消费者退出
    std::this_thread::sleep_for(std::chrono::milliseconds(500));
    q.shutdown();
    consumer.join();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

🧪 预期输出(注意时间间隔):

[t=0ms]    main: 启动 consumer,500ms 后开始 push
[t=500ms]  got 1
[t=700ms]  got 2
[t=900ms]  got 3
[t=1400ms] consumer exit
1
2
3
4
5

✅ 关键观察:

  1. 0-500ms 期间消费者完全没消耗 CPU(用 top 看进程占用为 0),这就是阻塞 wait 的价值
  2. shutdown 后 pop() 返回 nullopt,消费者优雅退出,不悬挂
┌─ 📌 阶段 ① 小结 ────────────────────────────────────────┐
│ ✅ 已掌握(4 个 Step 闯关)                                 │
│   • Step 1.1 单线程裸版 FIFO                              │
│   • Step 1.2 双线程作死 → 亲眼看到 race condition          │
│   • Step 1.3 mutex 修复 race                              │
│   • Step 1.4 condition_variable 阻塞 + shutdown 优雅退出   │
│ 🔜 下一阶段 ②:用它喂养 ThreadPool                          │
│ 💡 commit 建议:feat: blocking queue with cv & shutdown   │
└──────────────────────────────────────────────────────────┘
1
2
3
4
5
6
7
8
9

💼 三个铁律记下来:

  1. mutable mutex , const 函数加锁必须 mutable
  2. notify 在锁外 , 避免被唤醒线程立刻又被锁阻塞
  3. wait 必须传谓词 , 防 OS 虚假唤醒(spurious wakeup)

# 03.通用线程池 ThreadPool

┌─ 🎯 阶段 ② 目标 ────────────────────────────────────────┐
│ 完成什么:ThreadPool , 工人线程 + 任务队列 + submit 异步返回 │
│ 不做什么:暂不做优先级、不做动态扩缩容                       │
│ 验收标准:提交 1000 个任务能并行跑完,submit 能拿到 future   │
│ 预计耗时:120 min · 4 Step(重头戏:模板 submit)          │
└──────────────────────────────────────────────────────────┘
1
2
3
4
5
6

💡 架构总览,心里先有这张图,后面 4 个 Step 才不会迷路:

┌──────────────┐     submit(task)
│ Application  │ ──────────────────────►┐
└──────────────┘                        │
                                        ▼
                              ┌────────────────────┐
                              │   BlockingQueue    │  ← 阶段 ① 的产物
                              │   &lt; void()函数 >    │
                              └─────┬──────────────┘
                                    │ pop()
                    ┌───────────────┼───────────────┐
                    ▼               ▼               ▼
              ┌─────────┐     ┌─────────┐     ┌─────────┐
              │worker 0 │     │worker 1 │ ... │worker N │
              └─────────┘     └─────────┘     └─────────┘
1
2
3
4
5
6
7
8
9
10
11
12
13
14

# 灵魂三问:线程池设计要点

❓ 为什么不每来一个任务就 new 一个线程?反正 std::thread 也很方便

来看反例:

// ❌ 每个请求开一个线程
void handleRequest(Request r) {
    std::thread t([r]{ /* 处理 */ });
    t.detach();
}
1
2
3
4
5

问题:

  1. 创建线程很贵,Linux pthread_create 约 50 微秒,处理一个简单请求可能就 1 微秒,99% 时间在创建线程
  2. 线程数无上限,10 万个请求来了你就有 10 万个线程,直接 OOM 或被内核拒绝
  3. 没有背压(backpressure),所有请求并发处理,下游数据库直接被打挂

✅ 正确做法:固定数量的工人线程 + 任务队列,线程数受控、复用线程、有天然背压。

❓ 任务队列存 std::function<void()> 而不是 std::packaged_task<R()>,那"返回值"怎么办?

这个问题的答案就是 Step 2.3 那行精华代码:

auto task = std::make_shared<std::packaged_task<R()>>(...);   // 真任务
tasks.push([task]{ (*task)(); });                              // 队列存的"包装层"
1
2

为什么搞这么复杂:

  • packaged_task 模板带 R,队列必须存"统一类型",不能既存 task<int()> 又存 task<string()>
  • std::function<void()> 是统一类型,不管原任务返回啥,包一层 lambda 调用即可
  • shared_ptr 是因为 packaged_task 不可拷贝,但 std::function 内部要求 callable 可拷贝

这就是 Step 2.3 的"三个细节"为什么标注 ⭐⭐⭐,三个 C++ 模板特性必须同时用到,缺一个就编不过。

❓ 为什么 ~ThreadPool() 必须先 shutdown 队列再 join 线程?反过来行不行?

绝对不行!来看反例:

// ❌ 反例:先 join 再 shutdown
~ThreadPool() {
    for (auto& w : workers) w.join();    // ⚠️ 线程在 BlockingQueue::pop() 里阻塞
    tasks.shutdown();                     // 永远跑不到这里
}
1
2
3
4
5

会发生什么:工人线程正卡在 pop() 的 cv.wait() 上等数据。主线程 w.join() 等工人结束。双方互等 → 永久死锁。

✅ 正确顺序:先 shutdown 让 pop 返回 nullopt → 工人 break 循环 → join 顺利完成。

🔑 教学要点:资源释放顺序在并发里和构造顺序一样重要。这是为什么 RAII 析构要严格按"反向构造顺序"。


# Step 2.1 线程池最小骨架

新建 ThreadPool.h,先不管模板 submit,让工人能转起来:

// ThreadPool.h(v1:最小骨架,没有 submit)
#pragma once
#include "BlockingQueue.h"
#include <thread>
#include <vector>
#include <functional>
#include <iostream>

class ThreadPool {
private:
    std::vector<std::thread>             workers;
    BlockingQueue<std::function<void()>> tasks;     // ⭐ 任务 = 无返回的函数

public:
    explicit ThreadPool(size_t n) {
        for (size_t i = 0; i < n; ++i) {
            workers.emplace_back([this, i]{
                std::cout << "worker " << i << " started\n";
                while (true) {
                    auto task = tasks.pop();
                    if (!task) break;               // shutdown 信号
                    (*task)();                       // 执行任务
                }
                std::cout << "worker " << i << " exit\n";
            });
        }
    }

    ~ThreadPool() {
        tasks.shutdown();
        for (auto& w : workers) {
            if (w.joinable()) w.join();
        }
    }

    // 临时简化版:只接受 void() 任务
    void submit_simple(std::function<void()> task) {
        tasks.push(std::move(task));
    }
};
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40

测试,给工人喂 5 个 lambda 任务:

// test_pool.cpp
#include "ThreadPool.h"
#include <iostream>
#include <chrono>

int main() {
    {
        ThreadPool pool(3);
        for (int i = 1; i <= 5; ++i) {
            pool.submit_simple([i]{
                std::cout << "task " << i << " running on tid="
                          << std::this_thread::get_id() << "\n";
                std::this_thread::sleep_for(std::chrono::milliseconds(100));
            });
        }
        std::this_thread::sleep_for(std::chrono::milliseconds(500));
    }   // 离开作用域 → ~ThreadPool() 自动 shutdown + join
    std::cout << "main exit\n";
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
🧪 预期输出(顺序可能不同,因为是并发):
worker 0 started
worker 1 started
worker 2 started
task 1 running on tid=0x70...01
task 2 running on tid=0x70...02
task 3 running on tid=0x70...03
task 4 running on tid=0x70...01
task 5 running on tid=0x70...02
worker 0 exit
worker 1 exit
worker 2 exit
main exit
1
2
3
4
5
6
7
8
9
10
11
12
13

✅ 关键观察:

  1. 5 个任务跑在 3 个不同的 tid 上(即真的并行)
  2. task 4/5 复用了 task 1/2 的工人线程,这就是"线程池"省下的"创建/销毁线程"开销
  3. 离开作用域自动 shutdown , RAII 的力量

⚠️ 但 submit_simple 太弱了:没有返回值机制。下一步要让 pool.submit([]{ return 42; }) 这种带返回值的任务也能跑,并且能在主线程拿到结果。


# Step 2.2 packaged_task 入门

在写正式 submit 前,先单独练熟"三剑客",packaged_task / future / promise。这是 C++ 异步任务的基石。

// test_future.cpp(独立小练习,跟 ThreadPool 暂时无关)
#include <future>
#include <thread>
#include <iostream>

int main() {
    // 1. packaged_task:把一个普通函数包装成"会写入 promise"的对象
    std::packaged_task<int()> task([]{
        std::this_thread::sleep_for(std::chrono::milliseconds(500));
        return 42;
    });

    // 2. 拿到 future(异步结果的"句柄")
    std::future<int> fut = task.get_future();

    // 3. 把 task 扔到另一线程执行
    std::thread t(std::move(task));
    t.detach();

    std::cout << "main waiting...\n";
    int result = fut.get();                        // ⭐ 阻塞 500ms 等结果
    std::cout << "got " << result << "\n";
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

🧪 预期输出:

main waiting...
[等待 500ms]
got 42
1
2
3

✅ 这就是异步返回值的标准玩法:packaged_task 装函数 → get_future() 拿凭证 → 另一线程执行 → fut.get() 在主线程取结果。

📚 三剑客的关系:

  • std::promise<T>:写入端("将来我会给你一个 T")
  • std::future<T>:读取端("等着拿那个 T")
  • std::packaged_task<T()>:把函数+promise 打包,函数返回值自动写进 promise

# Step 2.3 模板 submit 实现

升级 ThreadPool.h:

// ThreadPool.h(v2:模板 submit)
#pragma once
#include "BlockingQueue.h"
#include <thread>
#include <vector>
#include <functional>
#include <future>
#include <atomic>
#include <memory>

class ThreadPool {
private:
    std::vector<std::thread>             workers;
    BlockingQueue<std::function<void()>> tasks;
    std::atomic<bool>                    stopped{false};

public:
    explicit ThreadPool(size_t n = std::thread::hardware_concurrency()) {
        for (size_t i = 0; i < n; ++i) {
            workers.emplace_back([this]{
                while (true) {
                    auto task = tasks.pop();
                    if (!task) break;
                    try { (*task)(); }
                    catch (...) { /* 工作线程必须吞异常 */ }
                }
            });
        }
    }

    ~ThreadPool() { shutdown(); }

    // ⭐⭐⭐ 重头戏:万能 submit
    template <typename F, typename... Args>
    auto submit(F&& f, Args&&... args)
        -> std::future<std::invoke_result_t<F, Args...>>     // 返回类型推导
    {
        using R = std::invoke_result_t<F, Args...>;

        // 1. 把 f + args 打包成 packaged_task
        //    用 shared_ptr 是因为 packaged_task 不可拷贝(只能移动),
        //    但 std::function 要求 callable 可拷贝 → 套一层 shared_ptr 解决
        auto task = std::make_shared<std::packaged_task<R()>>(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...)
        );

        // 2. 提前拿到 future(必须在 push 之前,因为 push 后 task 可能立即被工人执行)
        std::future<R> fut = task->get_future();

        // 3. 把"调用 task 的 lambda"扔进队列
        tasks.push([task]{ (*task)(); });
        return fut;
    }

    void shutdown() {
        if (stopped.exchange(true)) return;     // 防重复 shutdown
        tasks.shutdown();
        for (auto& w : workers) {
            if (w.joinable()) w.join();
        }
    }

    size_t pending() const { return tasks.size(); }
};
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64

📚 三个细节,建议在代码上画三个箭头标记:

  1. std::invoke_result_t<F, Args...> , C++17 推导"如果用 Args 调用 F,返回什么类型"
  2. std::make_shared<packaged_task> , 套 shared_ptr 是因为 packaged_task 不可拷贝,但 std::function 要求 callable 可拷贝(会被 push 进队列复制)
  3. 先 get_future() 再 tasks.push(...) , 顺序不能反!push 后任务可能秒被执行掉,再 get_future 就拿不到了

测试 3 种返回类型:

// test_pool.cpp(v2:测试 submit)
#include "ThreadPool.h"
#include <iostream>

int main() {
    ThreadPool pool(4);

    // ① 无参 lambda 返回 int
    auto f1 = pool.submit([]{ return 42; });

    // ② 带参 lambda 返回 int
    auto f2 = pool.submit([](int a, int b){ return a + b; }, 3, 5);

    // ③ 返回 string
    auto f3 = pool.submit([](std::string name){
        return "hello, " + name;
    }, std::string("world"));

    std::cout << "f1 = " << f1.get() << "\n";
    std::cout << "f2 = " << f2.get() << "\n";
    std::cout << "f3 = " << f3.get() << "\n";
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

🧪 预期输出:

f1 = 42
f2 = 8
f3 = hello, world
1
2
3

✅ 三种类型完美拿回结果。真正的异步任务编程器已经到手。


# Step 2.4 千任务压测验证

写一个并行平方求和的小压测,既验证正确性,也感受线程池的性能:

// test_pool.cpp(v3:1000 任务压测)
#include "ThreadPool.h"
#include <iostream>
#include <chrono>
#include <vector>

int main() {
    ThreadPool pool(std::thread::hardware_concurrency());
    std::cout << "threads = " << std::thread::hardware_concurrency() << "\n";

    auto t0 = std::chrono::steady_clock::now();

    // 提交 1000 个任务,每个算 i*i
    std::vector<std::future<long long>> futs;
    for (int i = 1; i <= 1000; ++i) {
        futs.push_back(pool.submit([i]{
            // 故意做一点 CPU 工作模拟真实任务
            long long sum = 0;
            for (int j = 0; j < 100000; ++j) sum += j;
            return (long long)i * i;
        }));
    }

    // 收集结果
    long long total = 0;
    for (auto& f : futs) total += f.get();

    auto t1 = std::chrono::steady_clock::now();
    auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(t1 - t0).count();

    std::cout << "sum of i*i (1..1000) = " << total << "\n";
    std::cout << "expected             = " << (1000LL * 1001 * 2001 / 6) << "\n";
    std::cout << "time = " << ms << " ms\n";
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34

🧪 预期输出(4 核机器):

threads = 8
sum of i*i (1..1000) = 333833500
expected             = 333833500
time = 35 ms
1
2
3
4

✅ 3 个验证全过:

  1. 数值正确:sum 与公式 n(n+1)(2n+1)/6 完全相等(无丢任务、无重复执行)
  2. 真并行:单线程跑同样任务约 280ms,8 线程 35ms ≈ 8 倍加速
  3. 优雅退出:main 结束时 ~ThreadPool() 自动 shutdown + join,无悬挂
┌─ 📌 阶段 ② 小结 ────────────────────────────────────────┐
│ ✅ 已掌握(4 个 Step 闯关)                                 │
│   • Step 2.1 工人骨架 + submit_simple 跑通调度              │
│   • Step 2.2 packaged_task / future 三剑客单练 ⭐           │
│   • Step 2.3 模板 submit + invoke_result_t + shared_ptr 三大坑│
│   • Step 2.4 1000 任务压测 → 8 倍加速 + 数值守恒            │
│ 🔜 下一阶段 ③:用线程池跑 OrderManager(读多写多)          │
│ 💡 commit 建议:feat: thread pool with future submit      │
└──────────────────────────────────────────────────────────┘
1
2
3
4
5
6
7
8
9

💼 关键性质回顾:

  • 任务异步执行:submit 立即返回,工作线程后台跑
  • 任务有结果返回:submit 返回 future,调用方 .get() 拿结果
  • 任务异常隔离:一个任务抛异常被 catch (...) 吞下,不影响其他任务

# 04.OrderManager 与读写锁

┌─ 🎯 阶段 ③ 目标 ────────────────────────────────────────┐
│ 完成什么:OrderManager(订单存储 + 多线程读写)              │
│ 不做什么:暂不接策略,先单独跑通"多线程下单 + 多线程查询"     │
│ 验收标准:先用 mutex 跑通 → 改 shared_mutex 看到读吞吐翻倍    │
│ 预计耗时:60 min · 3 Step(含锁性能对比)                  │
└──────────────────────────────────────────────────────────┘
1
2
3
4
5
6

💡 本阶段灵魂三问

  1. 订单 ID 怎么生成?不能两个线程拿到同一个 ID → std::atomic<int>
  2. 订单列表是 vector,多线程并发 push_back 会崩 → 加锁
  3. 80% 是查询、20% 是下单,普通 mutex 让查询互相等。怎么破?→ std::shared_mutex

# Step 3.1 Order 单线程跑通

新建 Order.h,纯数据载体:

// Order.h
#pragma once
#include <string>

struct Order {
    int id;
    int userId;
    int productId;
    int quantity;

    Order() = default;
    Order(int i, int u, int p, int q)
        : id(i), userId(u), productId(p), quantity(q) {}
};
1
2
3
4
5
6
7
8
9
10
11
12
13
14

新建 OrderManager.h(v1:先用最简单的 mutex):

// OrderManager.h(v1:mutex 版)
#pragma once
#include "Order.h"
#include <vector>
#include <mutex>
#include <atomic>
#include <optional>

class OrderManager {
private:
    std::vector<Order>     orders;
    mutable std::mutex     mtx;            // ⭐ v1 用普通 mutex
    std::atomic<int>       nextId{1};      // ⭐ ID 自增用 atomic

public:
    int addOrder(int userId, int productId, int qty) {
        Order o(nextId.fetch_add(1), userId, productId, qty);
        std::lock_guard<std::mutex> lock(mtx);
        orders.push_back(o);
        return o.id;
    }

    std::optional<Order> findOrder(int id) const {
        std::lock_guard<std::mutex> lock(mtx);
        for (const auto& o : orders) {
            if (o.id == id) return o;
        }
        return std::nullopt;
    }

    size_t size() const {
        std::lock_guard<std::mutex> lock(mtx);
        return orders.size();
    }
};
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35

测试,单线程基本功能:

// test_mgr.cpp
#include "OrderManager.h"
#include <iostream>

int main() {
    OrderManager mgr;
    int id1 = mgr.addOrder(101, 1, 2);
    int id2 = mgr.addOrder(102, 1, 1);
    std::cout << "id1=" << id1 << " id2=" << id2 << " size=" << mgr.size() << "\n";

    if (auto o = mgr.findOrder(id1)) {
        std::cout << "找到 id=" << o->id << " user=" << o->userId
                  << " qty=" << o->quantity << "\n";
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

🧪 编译运行:

$ g++ -std=c++17 -pthread test_mgr.cpp -o test_mgr &amp;&amp; ./test_mgr
id1=1 id2=2 size=2
找到 id=1 user=101 qty=2
1
2
3

✅ 单线程基本路径通了。注意 ID 是从 1 开始递增的,这是 atomic 在功劳,下面会让多线程同时下单验证 ID 不会重复。


# Step 3.2 mutex 基线压测

升级测试,8 个写线程 + 8 个读线程同时跑:

// test_mgr.cpp(v2:并发压测)
#include "OrderManager.h"
#include <iostream>
#include <thread>
#include <vector>
#include <chrono>
#include <atomic>

int main() {
    OrderManager mgr;
    const int WRITES = 1000;        // 每个写线程下 1000 单
    const int READS  = 100000;      // 每个读线程查 10w 次
    const int W_THREADS = 4;
    const int R_THREADS = 8;

    std::atomic<int> readHits{0};
    auto t0 = std::chrono::steady_clock::now();

    std::vector<std::thread> threads;
    // 写线程
    for (int w = 0; w < W_THREADS; ++w) {
        threads.emplace_back([&, w]{
            for (int i = 0; i < WRITES; ++i) mgr.addOrder(w, 1, 1);
        });
    }
    // 读线程
    for (int r = 0; r < R_THREADS; ++r) {
        threads.emplace_back([&]{
            for (int i = 0; i < READS; ++i) {
                if (mgr.findOrder(1)) readHits++;
            }
        });
    }
    for (auto& t : threads) t.join();

    auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(
                std::chrono::steady_clock::now() - t0).count();

    std::cout << "orders = " << mgr.size()
              << " (expect=" << W_THREADS * WRITES << ")\n";
    std::cout << "read hits = " << readHits.load()
              << " / " << R_THREADS * READS << "\n";
    std::cout << "time = " << ms << " ms  ← 记下这个数字\n";
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44

🧪 预期输出(mutex 版基线,4 核机器):

orders = 4000 (expect=4000)
read hits = 800000 / 800000
time = 1850 ms       ← 写下来!下一步对比
1
2
3

✅ 核心验证:

  1. orders=4000 完全等于期望,atomic ID 没冲突、push_back 没崩
  2. 所有读都 hit,没漏数据
  3. 总耗时 1850ms 作为基线,下一步 shared_mutex 应该明显更快

# Step 3.3 shared_mutex 升级

把 OrderManager.h 中的 mutex 替换为 shared_mutex:

// OrderManager.h(v2:shared_mutex 版)
#pragma once
#include "Order.h"
#include <vector>
#include <shared_mutex>          // ⭐ C++17 共享锁
#include <atomic>
#include <optional>

class OrderManager {
private:
    std::vector<Order>           orders;
    mutable std::shared_mutex    mtx;            // ⭐ 改成 shared_mutex
    std::atomic<int>             nextId{1};

public:
    int addOrder(int userId, int productId, int qty) {
        Order o(nextId.fetch_add(1), userId, productId, qty);
        std::unique_lock lock(mtx);              // ⭐ 写:独占锁
        orders.push_back(std::move(o));
        return o.id;
    }

    std::optional<Order> findOrder(int id) const {
        std::shared_lock lock(mtx);              // ⭐ 读:共享锁
        for (const auto& o : orders) {
            if (o.id == id) return o;
        }
        return std::nullopt;
    }

    std::vector<Order> ordersByUser(int userId) const {
        std::shared_lock lock(mtx);              // 读锁
        std::vector<Order> result;
        for (const auto& o : orders) {
            if (o.userId == userId) result.push_back(o);
        }
        return result;
    }

    size_t size() const {
        std::shared_lock lock(mtx);
        return orders.size();
    }
};
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44

重新跑同一份压测:

🧪 预期输出(shared_mutex 版):
orders = 4000 (expect=4000)
read hits = 800000 / 800000
time = 720 ms         ← 比 mutex 的 1850ms 快 ~2.5 倍 ✅
1
2
3
4

✅ 同样的代码,读多场景吞吐提升 2-3 倍。这就是 shared_mutex 的回报。

📚 shared_mutex 访问规则:

当前持有 谁能进入
无锁 任何线程
独占(unique_lock) 只有当前线程
共享(shared_lock) 任意多个读者

⚠️ shared_mutex 不是银弹:写多场景反而比 mutex 慢(构造析构成本高),有"写者饥饿"风险。只在读 > 80% 时才换。

┌─ 📌 阶段 ③ 小结 ────────────────────────────────────────┐
│ ✅ 已掌握                                                 │
│   • Step 3.1 atomic ID 自增 + Order 数据类                 │
│   • Step 3.2 mutex 版基线压测(记下 1850ms)               │
│   • Step 3.3 shared_mutex 改造 → 读吞吐翻倍 ⭐             │
│ 🔜 下一阶段 ④:Product 超卖 BUG 重现 → atomic CAS 修复       │
│ 💡 commit 建议:feat: order manager with shared_mutex     │
└──────────────────────────────────────────────────────────┘
1
2
3
4
5
6
7
8

# 05.Product 防超卖

┌─ 🎯 阶段 ④ 目标 ──────────── 本案例最重磅一战 ────────────┐
│ 完成什么:让你亲眼看到"100件库存卖出 103 件"的超卖 BUG     │
│           然后用 atomic CAS 修复它                          │
│           🆕 加餐:再亲眼看一次"死锁"现场 + 两种修复法       │
│ 不做什么:不引入分布式锁、不接 Redis(本卷范畴外)            │
│ 验收标准:bug 版能压测出超卖;修复版 CAS 后零超卖             │
│           死锁加餐版能复现 + std::lock 修复成功              │
│ 预计耗时:90 min · 3 Step + 加餐 3 Step                    │
└──────────────────────────────────────────────────────────┘
1
2
3
4
5
6
7
8
9

💡 为什么这一阶段值得 60 分钟? , 这是新手最容易"以为自己懂了但其实没懂"的并发知识点。亲手造一次超卖 BUG,胜过看 100 篇文章。


# Step 4.1 裸 int 埋 BUG

新建 Product.h,故意不加任何同步:

// Product.h(v1:裸 int 版,有 BUG)
#pragma once
#include <string>

class Product {
private:
    int          id;
    std::string  name;
    double       price;
    int          stock;            // ⚠️ 裸 int,将来会出事

public:
    Product(int id_, std::string n, double p, int s)
        : id(id_), name(std::move(n)), price(p), stock(s) {}

    int getId() const { return id; }
    int getStock() const { return stock; }

    // ⚠️ 经典超卖代码,看似没问题,多线程下会出大事
    bool buy(int n) {
        if (stock >= n) {            // ① 检查
            stock -= n;              // ② 扣减
            return true;
        }
        return false;
    }
};
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

单线程测试,看起来"完全正确":

// test_product.cpp
#include "Product.h"
#include <iostream>

int main() {
    Product p(1, "iPhone", 7999, 5);
    for (int i = 0; i < 7; ++i) {
        bool ok = p.buy(1);
        std::cout << "买 1 件: " << (ok ? "成功" : "失败")
                  << "  剩余: " << p.getStock() << "\n";
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
🧪 预期输出:
买 1 件: 成功  剩余: 4
买 1 件: 成功  剩余: 3
买 1 件: 成功  剩余: 2
买 1 件: 成功  剩余: 1
买 1 件: 成功  剩余: 0
买 1 件: 失败  剩余: 0
买 1 件: 失败  剩余: 0
1
2
3
4
5
6
7
8

✅ 单线程下完美,成功了 5 次,剩余 0,后两次失败。就是这种"单测全过"的代码骗了无数新手上线。下一步用 1000 个线程同时调,让你看清真相。


# Step 4.2 千线程见超卖

升级测试,100 件库存 + 200 个并发线程,每人买 1 件:

// test_product.cpp(v2:并发压测)
#include "Product.h"
#include <iostream>
#include <thread>
#include <vector>
#include <atomic>

int main() {
    Product p(1, "iPhone", 7999, 100);     // ⭐ 库存 100
    std::atomic<int> successCount{0};

    std::vector<std::thread> threads;
    for (int i = 0; i < 200; ++i) {        // ⭐ 200 个并发买家
        threads.emplace_back([&]{
            if (p.buy(1)) successCount++;
        });
    }
    for (auto& t : threads) t.join();

    std::cout << "成功购买:   " << successCount.load() << " 件\n";
    std::cout << "剩余库存:   " << p.getStock() << " 件\n";
    std::cout << "成功+剩余:  " << successCount.load() + p.getStock() << "\n";
    std::cout << "原始库存:   100\n";
    std::cout << "一致性:     " << (successCount + p.getStock() == 100 ? "✅" : "❌ 超卖!") << "\n";
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

🧪 编译运行(多跑几次):

$ g++ -std=c++17 -pthread -O0 test_product.cpp -o test_product
$ for i in 1 2 3 4 5; do ./test_product; echo "---"; done

成功购买:   103 件
剩余库存:   -3 件          ← 超卖!
成功+剩余:  100
一致性:     ✅
---
成功购买:   100 件
剩余库存:   0 件
一致性:     ✅              ← 这一次没超卖(运气好)
---
成功购买:   105 件
剩余库存:   -5 件          ← 又超卖
一致性:     ✅
---
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

⚠️ 看见了吗,"成功+剩余 = 100" 满足,但 stock 居然变负数了!

BUG 解释,两个线程交错执行:

线程 A 进入 buy(1):
  ① if (stock >= 1)         读 stock = 1,通过 ✅
  [此时被 OS 切走]
线程 B 进入 buy(1):
  ① if (stock >= 1)         读 stock = 1,也通过 ✅
  ② stock -= 1              stock = 0
  返回 true
线程 A 恢复:
  ② stock -= 1              stock = -1   💥 超卖!
  返回 true
1
2
3
4
5
6
7
8
9
10

📚 核心错误:if(stock>=n) stock-=n 这两步操作不是原子的,读和写中间被其他线程插入了相同的读和写,导致条件判断失效。这就是经典的 check-then-act race。


# Step 4.3 atomic CAS 修复

升级 Product.h 为最终版:

// Product.h(v2:atomic CAS 版)
#pragma once
#include <string>
#include <atomic>

class Product {
private:
    int               id;
    std::string       name;
    double            price;
    std::atomic<int>  stock;        // ⭐ 改成原子变量

public:
    Product(int id_, std::string n, double p, int s)
        : id(id_), name(std::move(n)), price(p), stock(s) {}

    int getId() const { return id; }
    int getStock() const { return stock.load(); }

    // ⭐⭐⭐ CAS 循环扣库存(防超卖)
    bool tryDeduct(int n) {
        int cur = stock.load();
        while (cur >= n) {
            // 含义:如果 stock 仍然是 cur,则把它改成 cur - n
            // 失败时 cur 会被自动更新为最新值,进入下一轮
            if (stock.compare_exchange_weak(cur, cur - n)) {
                return true;       // 成功扣减
            }
        }
        return false;             // 库存不够(cur < n)
    }

    void restock(int n) { stock.fetch_add(n); }
};
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34

把测试中的 p.buy(1) 改成 p.tryDeduct(1),重新压测:

🧪 编译运行(连跑 5 次都不应超卖):
$ for i in 1 2 3 4 5; do ./test_product; echo "---"; done

成功购买:   100 件
剩余库存:   0 件
成功+剩余:  100
一致性:     ✅              ← 永远不超卖
---
(5 次完全一致)
1
2
3
4
5
6
7
8
9

✅ 零超卖、零漏单,这就是 atomic CAS 的力量。

CAS(Compare-And-Swap)执行轨迹:

初始 stock = 1
线程 A 进入 tryDeduct(1):cur = 1,尝试 CAS(1 → 0)
线程 B 进入 tryDeduct(1):cur = 1,尝试 CAS(1 → 0)

→ A 先执行 CAS 成功:stock = 0,返回 true
→ B 的 CAS 失败:cur 自动被更新为 0
→ B 检查 cur >= 1 失败,跳出 while,返回 false
1
2
3
4
5
6
7

整个判断+扣减是原子的,OS 切线程也插不进缝。

💼 weak vs strong:compare_exchange_weak 在某些 CPU(如 ARM)会"伪失败"(即使值相等也偶尔返回 false),但循环里会立即重试,性能比 strong 好。一次性判断必须用 strong。CAS 循环里永远首选 weak。

┌─ 📌 阶段 ④ 小结 ────────────────────────────────────────┐
│ ✅ 你刚刚亲手做完了:                                       │
│   • Step 4.1 故意写 buy() 裸 int 版                        │
│   • Step 4.2 200 线程压测 → 亲眼看到 stock = -3 的超卖现场   │
│   • Step 4.3 atomic + compare_exchange_weak 修复           │
│ 🎓 这一阶段你掌握的不是"如何写代码",而是"为什么要写"          │
│ 🔜 下一阶段 ⑤:用策略模式串起所有组件                        │
│ 💡 commit 建议:fix: prevent oversell with atomic CAS     │
└──────────────────────────────────────────────────────────┘
1
2
3
4
5
6
7
8
9

# Step 4.4 故意造死锁

⚠️ 强烈建议跑一遍(并发四大金刚之二),race condition 你刚见过了,死锁是另一种灾难:程序不崩、不报错,只是永远卡死。你今天踩过这个坑,将来上线代码就不会重蹈覆辙。

场景:用户想把账户 A 的钱转账到账户 B。每个账户有自己的锁。

// deadlock_demo.cpp(v1:经典交叉申请,必死锁)
#include <iostream>
#include <mutex>
#include <thread>
#include <chrono>

class Account {
public:
    int             balance;
    std::mutex      mtx;
    explicit Account(int b) : balance(b) {}
};

// ⚠️ 错误的转账实现:先锁 from 再锁 to
void transfer(Account& from, Account& to, int amount) {
    std::lock_guard<std::mutex> l1(from.mtx);
    std::this_thread::sleep_for(std::chrono::milliseconds(10));   // ⭐ 放大死锁概率
    std::lock_guard<std::mutex> l2(to.mtx);
    from.balance -= amount;
    to.balance   += amount;
}

int main() {
    Account a(1000), b(1000);

    // ⚠️ 两个线程,转账方向相反
    std::thread t1([&]{ transfer(a, b, 100); });   // 先锁 a 再锁 b
    std::thread t2([&]{ transfer(b, a, 200); });   // 先锁 b 再锁 a

    t1.join();
    t2.join();
    std::cout << "done a=" << a.balance << " b=" << b.balance << "\n";
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33

🧪 编译运行:

$ g++ -std=c++17 -pthread deadlock_demo.cpp -o dd &amp;&amp; ./dd
(程序卡死,永远不输出 done)

# 用 Ctrl+C 强行结束,或用另一个终端 jstack/lldb 看:
# t1 持有 a.mtx,等 b.mtx
# t2 持有 b.mtx,等 a.mtx
# 互相等对方释放 → 永久死锁
1
2
3
4
5
6
7

⚠️ 死锁四要素全部满足(操作系统课的"必考"):

  1. 互斥 , mutex 同一时间只能被一个线程持有 ✅
  2. 持有并等待 , t1 持有 a.mtx 还想要 b.mtx ✅
  3. 不可剥夺 , 持有的锁 OS 不会强制收回 ✅
  4. 循环等待 , t1 等 t2,t2 等 t1,形成环 ✅

📚 为什么死锁比 race condition 更可怕?

  • race 的崩溃有报错(segfault / 数据错乱),容易发现
  • deadlock 没报错,程序就在那里"安静地"卡死,监控指标全绿,QPS = 0,运维以为机器卡了重启就行,重启完还会再死

# Step 4.5 修复方案 A:std::lock 一次锁全部

C++ 标准库给了一个专门用于多锁场景的工具 std::lock,一次性锁住一组锁,保证不死锁:

// 修复版 1:std::lock + std::adopt_lock
void transfer(Account& from, Account& to, int amount) {
    std::lock(from.mtx, to.mtx);                              // ⭐ 同时申请两把锁
    std::lock_guard<std::mutex> l1(from.mtx, std::adopt_lock); // ⭐ adopt_lock:接管已锁
    std::lock_guard<std::mutex> l2(to.mtx,   std::adopt_lock);
    from.balance -= amount;
    to.balance   += amount;
}
1
2
3
4
5
6
7
8

或者用 C++17 的 std::scoped_lock(更简洁):

// 修复版 1+:scoped_lock(C++17)
void transfer(Account& from, Account& to, int amount) {
    std::scoped_lock lock(from.mtx, to.mtx);    // ⭐ 一行搞定,保证不死锁
    from.balance -= amount;
    to.balance   += amount;
}
1
2
3
4
5
6

📚 std::lock 内部实现:用 try_lock 序列尝试锁所有,只要有一个失败就把已经拿到的全部释放,再重来。它打破了"持有并等待"的死锁条件,要么全拿到,要么一个都不拿。

# Step 4.6 修复方案 B:锁顺序约定(更通用)

std::lock 只能解决这一对锁的死锁。如果系统里有 1000 个 Account,锁的对象事先不知道,怎么办?

✅ 业界标准做法:全局规定一个"锁顺序",比如按地址排序:

// 修复版 2:锁顺序约定(按地址排序)
void transfer(Account& from, Account& to, int amount) {
    auto* first  = std::min(&from, &to, [](auto a, auto b){ return a < b; });
    auto* second = std::max(&from, &to, [](auto a, auto b){ return a < b; });

    std::lock_guard<std::mutex> l1(first->mtx);
    std::lock_guard<std::mutex> l2(second->mtx);
    from.balance -= amount;
    to.balance   += amount;
}
1
2
3
4
5
6
7
8
9
10

这样不管 t1 是 transfer(a,b) 还是 t2 是 transfer(b,a),两个线程都会按"小地址先锁"的顺序申请,循环等待就被打破了。

💼 真实工程经验:MySQL InnoDB、PostgreSQL 的行锁、Linux 内核的 spinlock 都用"锁顺序"避免死锁。给每个锁分配一个全局唯一的 rank(等级),只能从小 rank 申请到大 rank,这是"锁层次"(lock hierarchy)的最严格形式,违反层次会直接 panic。

🧪 任选一种修复版重新跑:
$ g++ -std=c++17 -pthread deadlock_demo.cpp -o dd &amp;&amp; ./dd
done a=1100 b=900               ← 顺利输出!
1
2
3

✅ 死锁治好了。这一节学到的不仅是 std::lock 这个 API,更是面对并发资源依赖图的两种通用思想:

  1. 原子获取所有依赖(std::lock 思想)
  2. 强制全局顺序(锁层次思想)

🔑 教学要点:本案例 §02-§07 的真实代码里没有"两个不同 mutex 交叉申请"的场景,所以这是加餐演示。但生产环境一旦你有"操作多个对象"的业务(订单转账、库存调拨、消息群发),死锁就会找上门。今天先在这里扎扎实实见过它一次。


# 06.策略模式下单

┌─ 🎯 阶段 ⑤ 目标 ────────────────────────────────────────┐
│ 完成什么:用策略模式串起 ThreadPool + Product + OrderManager │
│           三种下单方式:Normal / Flash / Batch              │
│ 不做什么:暂不做 CLI,先用 main 直接调对比效果                │
│ 验收标准:同样下 100 单,看到 Flash 比 Normal 快 5-8 倍       │
│ 预计耗时:60 min · 3 Step                                  │
└──────────────────────────────────────────────────────────┘
1
2
3
4
5
6
7

💡 策略模式(Strategy Pattern):把"做某事的多种方式"抽成接口,调用方传哪个就用哪个。在这里,同样是下单,单线程顺序 vs 线程池并发 vs 分批,调用代码完全不变,只换策略对象。


# 灵魂三问:为什么不用 if-else?

三种下单方式直接写 if (mode == FLASH) { ... } else if (mode == BATCH) { ... } 不就完了?

来看反例:

// ❌ 反例:if-else 巨型函数
std::vector<int> placeOrders(int mode, Product& p, OrderManager& mgr,
                              int userId, std::vector<int> qtys) {
    if (mode == NORMAL) {
        // 50 行普通下单代码
    } else if (mode == FLASH) {
        // 80 行秒杀代码(涉及线程池)
    } else if (mode == BATCH) {
        // 60 行分批代码(涉及 future 收集)
    }
    // 想加个分布式策略?再 else if?
}
1
2
3
4
5
6
7
8
9
10
11
12

问题:

  1. 违反开闭原则,加新策略要改这个巨型函数
  2. 依赖发散,这一个函数同时依赖 ThreadPool / future / 分批逻辑,没法单独测试 NormalStrategy
  3. switch case 永远在迭代,业务越发展越乱

✅ 正确做法:抽 OrderStrategy 接口 + 三个独立子类,加新策略只需新增一个文件,不动旧代码。这就是策略模式的"开"(对扩展开放)"闭"(对修改关闭)。

❓ 为什么 FlashStrategy 接受 ThreadPool& 引用而不是自己 new 一个?

来看反例:

// ❌ 反例:策略内部自己 new 线程池
class FlashStrategy : public OrderStrategy {
    ThreadPool pool{8};      // 每个策略对象 8 个线程
    // ...
};
1
2
3
4
5

问题:

  1. 资源浪费,程序里如果同时有 NormalStrategy + FlashStrategy + BatchStrategy 实例,会有 3 个线程池 = 24 个线程
  2. 生命周期失控,策略对象析构时线程池跟着析构,正在跑的任务被强制中断
  3. 测试困难,单测 FlashStrategy 必须真启 8 个线程,跑不快

✅ 正确做法:依赖注入,线程池由调用方创建并注入,策略对象只是"用户"。这与 04 案例 §07 main 注入 KvDatabase 给 REPL 是同一个思想。

❓ 三种策略性能差距是否一定满足"Normal < Batch < Flash"? 答:不一定!本案例的任务太轻(atomic 加一个 int),并行收益不大。任务越重,并发优势越明显。这就是为什么 Step 5.2 要在测试代码里说"任务里加 100ms 的 IO 才会接近 8 倍加速"。

🔑 教学要点:策略模式的精髓不是"避免 if-else",而是让"算法族"成为一等公民,它们可以被传递、组合、替换、单独测试。


# Step 5.1 策略接口与基线

新建 OrderStrategy.h:

// OrderStrategy.h
#pragma once
#include "Product.h"
#include "OrderManager.h"
#include <vector>

class OrderStrategy {
public:
    virtual ~OrderStrategy() = default;
    virtual std::vector<int> placeOrders(
        Product& product,
        OrderManager& mgr,
        int userId,
        const std::vector<int>& quantities) = 0;       // 纯虚函数
};
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

新建 NormalStrategy.h,最简单的实现:

// NormalStrategy.h
#pragma once
#include "OrderStrategy.h"

class NormalStrategy : public OrderStrategy {
public:
    std::vector<int> placeOrders(Product& product, OrderManager& mgr,
                                  int userId,
                                  const std::vector<int>& quantities) override {
        std::vector<int> orderIds;
        for (int qty : quantities) {
            if (product.tryDeduct(qty)) {
                orderIds.push_back(mgr.addOrder(userId, product.getId(), qty));
            } else {
                orderIds.push_back(-1);             // 失败标记
            }
        }
        return orderIds;
    }
};
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

测试,下 10 单看流程通:

// test_strategy.cpp
#include "NormalStrategy.h"
#include <iostream>

int main() {
    Product p(1, "iPhone", 7999, 5);              // ⭐ 库存只有 5
    OrderManager mgr;
    NormalStrategy normal;

    std::vector<int> qtys(10, 1);                  // 想下 10 单(每单 1 件)
    auto ids = normal.placeOrders(p, mgr, 100, qtys);

    int success = 0;
    for (int id : ids) if (id > 0) success++;
    std::cout << "下单 10 次,成功 " << success << "  剩余库存 " << p.getStock() << "\n";
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
🧪 预期输出:
下单 10 次,成功 5  剩余库存 0
1
2

✅ 库存 5 件,10 个请求只有 5 个成功,业务正确。


# Step 5.2 FlashStrategy 并发

新建 FlashStrategy.h:

// FlashStrategy.h
#pragma once
#include "OrderStrategy.h"
#include "ThreadPool.h"
#include <future>

class FlashStrategy : public OrderStrategy {
private:
    ThreadPool& pool;

public:
    explicit FlashStrategy(ThreadPool& p) : pool(p) {}

    std::vector<int> placeOrders(Product& product, OrderManager& mgr,
                                  int userId,
                                  const std::vector<int>& quantities) override {
        // ⭐ 每个下单作为一个 task 提交,工人并发抢库存
        std::vector<std::future<int>> futs;
        for (int qty : quantities) {
            futs.push_back(pool.submit([&product, &mgr, userId, qty]{
                if (product.tryDeduct(qty)) {
                    return mgr.addOrder(userId, product.getId(), qty);
                }
                return -1;
            }));
        }

        // 收集所有结果(按提交顺序)
        std::vector<int> result;
        for (auto& f : futs) result.push_back(f.get());
        return result;
    }
};
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33

对比测试,同一份输入,分别用 Normal 和 Flash 跑,比时间:

#include "NormalStrategy.h"
#include "FlashStrategy.h"
#include <iostream>
#include <chrono>

void bench(const char* name, OrderStrategy& s, Product& p, OrderManager& mgr) {
    std::vector<int> qtys(1000, 1);
    auto t0 = std::chrono::steady_clock::now();
    auto ids = s.placeOrders(p, mgr, 1, qtys);
    auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(
                std::chrono::steady_clock::now() - t0).count();
    int ok = 0;
    for (int id : ids) if (id > 0) ok++;
    std::cout << name << ": 成功 " << ok << " / 1000  耗时 " << ms << " ms\n";
}

int main() {
    ThreadPool pool(8);

    {
        Product p(1, "iPhone", 7999, 1000);
        OrderManager mgr;
        NormalStrategy normal;
        bench("Normal", normal, p, mgr);
    }
    {
        Product p(1, "iPhone", 7999, 1000);
        OrderManager mgr;
        FlashStrategy flash(pool);
        bench("Flash ", flash, p, mgr);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
🧪 预期输出(差距明显):
Normal: 成功 1000 / 1000  耗时 12 ms
Flash : 成功 1000 / 1000  耗时 4 ms       ← 8 线程并发约 3 倍加速
1
2
3

✅ Flash 比 Normal 快 2-3 倍,任务越重,差距越大(CPU 密集型可达 8 倍)。

📚 为什么不是直接 8 倍快? , 因为本例中"扣库存 + 加订单"本身已经被 atomic / shared_mutex 串行化。如果任务里加 100ms 的 IO(如发邮件、查数据库),加速比就会接近 8 倍。


# Step 5.3 BatchStrategy 分批

新建 BatchStrategy.h,一次提交多批,控制内存压力:

// BatchStrategy.h
#pragma once
#include "OrderStrategy.h"
#include "ThreadPool.h"
#include <future>
#include <algorithm>

class BatchStrategy : public OrderStrategy {
private:
    ThreadPool& pool;
    size_t      batchSize;

public:
    BatchStrategy(ThreadPool& p, size_t bs = 10) : pool(p), batchSize(bs) {}

    std::vector<int> placeOrders(Product& product, OrderManager& mgr,
                                  int userId,
                                  const std::vector<int>& quantities) override {
        std::vector<int> result(quantities.size(), -1);

        // ⭐ 每 batchSize 个为一组,组内并发,组间串行
        for (size_t i = 0; i < quantities.size(); i += batchSize) {
            size_t end = std::min(i + batchSize, quantities.size());

            std::vector<std::future<int>> futs;
            for (size_t j = i; j < end; ++j) {
                int qty = quantities[j];
                futs.push_back(pool.submit([&product, &mgr, userId, qty]{
                    if (product.tryDeduct(qty)) {
                        return mgr.addOrder(userId, product.getId(), qty);
                    }
                    return -1;
                }));
            }
            // 等本批完成后再开下一批
            for (size_t j = i; j < end; ++j) {
                result[j] = futs[j - i].get();
            }
        }
        return result;
    }
};
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42

测试,下 50 单 batchSize=10 的情况:

ThreadPool pool(8);
Product p(1, "iPhone", 7999, 30);          // 库存只够一半多
OrderManager mgr;
BatchStrategy batch(pool, 10);

std::vector<int> qtys(50, 1);
auto ids = batch.placeOrders(p, mgr, 1, qtys);
int ok = 0;
for (int id : ids) if (id > 0) ok++;
std::cout << "Batch 50 单 batchSize=10: 成功 " << ok
          << "  剩余 " << p.getStock() << "\n";
1
2
3
4
5
6
7
8
9
10
11
🧪 预期输出:
Batch 50 单 batchSize=10: 成功 30  剩余 0
1
2

✅ 库存 30 全部卖光,剩 20 单失败,业务正确,且内存占用最多只有 10 个 future(不像 Flash 一口气创建 50 个)。

┌─ 📌 阶段 ⑤ 小结 ────────────────────────────────────────┐
│ ✅ 已掌握                                                 │
│   • Step 5.1 OrderStrategy 抽象接口 + Normal 顺序版         │
│   • Step 5.2 FlashStrategy 线程池并发 → 实测 3 倍加速       │
│   • Step 5.3 BatchStrategy 分批 + future 收集 → 内存可控   │
│ 📊 三策略对比:                                              │
│   ┌─────────┬─────┬─────┬───────────────┐                  │
│   │ 策略     │并发度│吞吐量│ 适用场景       │                  │
│   │ Normal  │  1  │ 低  │ 顺序业务/小流量 │                  │
│   │ Flash   │  N  │ 高  │ 秒杀/突发流量   │                  │
│   │ Batch   │ M/批│ 中  │ 后台批处理     │                  │
│   └─────────┴─────┴─────┴───────────────┘                  │
│ 🔜 下一阶段 ⑥:Logger 线程安全日志                          │
│ 💡 commit 建议:feat: 3 order strategies                  │
└──────────────────────────────────────────────────────────┘
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

# 07.线程安全 Logger

┌─ 🎯 阶段 ⑥ 目标 ────────────────────────────────────────┐
│ 完成什么:Logger , 线程安全的格式化日志                   │
│ 不做什么:不做日志分级(DEBUG/INFO/ERROR)、不写文件        │
│ 验收标准:先写裸 cout 看日志撕成乱码 → 加锁后行行清晰         │
│ 预计耗时:30 min · 2 Step(造 BUG → 修 BUG)              │
└──────────────────────────────────────────────────────────┘
1
2
3
4
5
6

💡 std::cout 本身不保证线程安全 , 多线程同时输出会"字符级"交错。比如线程 A 输出 "Hello\n"、线程 B 输出 "World\n",可能会变成 "HelWloolrd\n\n"。这一阶段我们让你看到这种乱码,再用 mutex 修好它。


# Step 6.1 裸 cout 见撕裂

不写 Logger,直接用 cout 起步,写一个测试:

// test_log.cpp(v1:裸 cout,会撕裂)
#include <iostream>
#include <thread>
#include <vector>

int main() {
    std::vector<std::thread> threads;
    for (int t = 0; t < 4; ++t) {
        threads.emplace_back([t]{
            for (int i = 0; i < 5; ++i) {
                // 故意分多次 << 输出,放大撕裂效应
                std::cout << "[Thread " << t << "] ";
                std::cout << "iter ";
                std::cout << i;
                std::cout << "\n";
            }
        });
    }
    for (auto& th : threads) th.join();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
🧪 编译运行(多跑几次都能看到乱码):
$ g++ -std=c++17 -pthread test_log.cpp -o test_log &amp;&amp; ./test_log
[Thread [Thread 0] iter 1] iter [Thread 0
2] iter [Thread 3] iter 0
0
[Thread 1] iter 1
[Thread 0] iter 1
[Thread 2] iter 1
...
1
2
3
4
5
6
7
8
9

⚠️ 看见了吗,第一行 "[Thread 0] iter 0" 和 "[Thread 1] iter 0" 完全交错在一起,根本读不懂哪条来自哪个线程。

📚 为什么 cout 不安全? , std::cout << x 会调用 operator<< 的几次内部函数。多个线程的多次调用没有任何同步,所以输出字符级交错。

⚠️ 注意:C++11 之后单次 operator<< 是原子的(不会撕一个字符串),但多次 << 拼一行不是原子的,这就是上面乱码的原因。


# Step 6.2 mutex 折叠 Logger

新建 Logger.h,一次性写完所有要点:

// Logger.h(v1:线程安全 + 时间戳 + 折叠输出)
#pragma once
#include <iostream>
#include <mutex>
#include <sstream>
#include <thread>
#include <chrono>
#include <iomanip>

class Logger {
private:
    static inline std::mutex mtx;        // ⭐ C++17 inline static:头文件里就能初始化

    // 生成 "HH:MM:SS.mmm" 格式时间戳
    static std::string timestamp() {
        auto now = std::chrono::system_clock::now();
        auto t   = std::chrono::system_clock::to_time_t(now);
        auto ms  = std::chrono::duration_cast<std::chrono::milliseconds>(
                      now.time_since_epoch()) % 1000;
        std::ostringstream oss;
        oss << std::put_time(std::localtime(&t), "%H:%M:%S")
            << "." << std::setw(3) << std::setfill('0') << ms.count();
        return oss.str();
    }

public:
    template <typename... Args>
    static void info(Args&&... args) {
        std::lock_guard<std::mutex> lock(mtx);                       // ⭐ 整行原子
        std::cout << "[" << timestamp() << "][T"
                  << std::this_thread::get_id() << "] ";
        (std::cout << ... << std::forward<Args>(args));              // ⭐ C++17 折叠表达式
        std::cout << "\n";
    }
};
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35

升级测试,把 cout 全换成 Logger::info:

// test_log.cpp(v2:用 Logger)
#include "Logger.h"
#include <thread>
#include <vector>

int main() {
    std::vector<std::thread> threads;
    for (int t = 0; t < 4; ++t) {
        threads.emplace_back([t]{
            for (int i = 0; i < 5; ++i) {
                Logger::info("Thread ", t, " iter ", i);
            }
        });
    }
    for (auto& th : threads) th.join();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
🧪 预期输出(每行完整、不撕裂):
[14:05:32.001][T0x70...01] Thread 0 iter 0
[14:05:32.001][T0x70...02] Thread 1 iter 0
[14:05:32.001][T0x70...03] Thread 2 iter 0
[14:05:32.001][T0x70...04] Thread 3 iter 0
[14:05:32.002][T0x70...01] Thread 0 iter 1
...
1
2
3
4
5
6
7

✅ 每行都完整、带时间戳、带线程号,日志的"行原子性"保住了。

📚 C++17 折叠表达式 (... op pack) 神技:

(std::cout << ... << args)          // 等价于 cout << arg0 << arg1 << ... << argN
1

比 C++11 时代手写递归模板优雅 100 倍,以前要这么写:

// C++11 老办法:递归
void log() {}
template<class T, class... Args>
void log(T&& t, Args&&... rest) { cout << t; log(rest...); }
1
2
3
4

💼 static inline std::mutex mtx; , C++17 起允许 inline 静态成员直接在类内初始化,不用再去 .cpp 写一句 std::mutex Logger::mtx; 了。

┌─ 📌 阶段 ⑥ 小结 ────────────────────────────────────────┐
│ ✅ 已掌握                                                 │
│   • Step 6.1 裸 cout 多线程 → 亲眼看到日志撕裂              │
│   • Step 6.2 mutex + 折叠表达式 → 行原子日志 ⭐             │
│ 🔜 下一阶段 ⑦:把所有组件串起来做 CLI + 1万 QPS 压测         │
│ 💡 commit 建议:feat: thread-safe logger                  │
└──────────────────────────────────────────────────────────┘
1
2
3
4
5
6
7

# 08.CLI 与压测

┌─ 🎯 阶段 ⑦ 目标 ──────────── 整个项目的最终验收 ─────────┐
│ 完成什么:交互式 CLI(main.cpp)+ 1 万 QPS 压测(benchmark)│
│ 不做什么:不做菜单美化、不做命令历史                          │
│ 验收标准:压测脚本输出 "一致性: ✅ PASS" + QPS > 10000      │
│ 预计耗时:60 min · 2 Step                                  │
└──────────────────────────────────────────────────────────┘
1
2
3
4
5
6

💡 本阶段是整个项目的"组装阶段",你前面 6 个阶段做好的零件,现在装到一辆能跑的车上。


# Step 7.1 交互式 main

新建 main.cpp:

// main.cpp
#include "ThreadPool.h"
#include "OrderManager.h"
#include "Product.h"
#include "NormalStrategy.h"
#include "FlashStrategy.h"
#include "Logger.h"
#include <iostream>

int main() {
    ThreadPool   pool(4);
    OrderManager mgr;
    Product      iphone(1, "iPhone 15", 7999.0, 100);

    NormalStrategy normal;
    FlashStrategy  flash(pool);

    while (true) {
        std::cout << "\n========== 订单系统 ==========\n"
                     "1=普通下单  2=秒杀下单  3=查询订单\n"
                     "4=查看库存  0=退出\n> ";
        int op;
        if (!(std::cin >> op)) break;
        if (op == 0) break;

        try {
            if (op == 1 || op == 2) {
                int userId, count;
                std::cout << "用户 ID: ";    std::cin >> userId;
                std::cout << "下单次数: ";    std::cin >> count;

                std::vector<int> qtys(count, 1);
                OrderStrategy& s = (op == 1)
                    ? static_cast<OrderStrategy&>(normal)
                    : static_cast<OrderStrategy&>(flash);

                auto ids = s.placeOrders(iphone, mgr, userId, qtys);
                int success = 0;
                for (int id : ids) if (id > 0) success++;
                Logger::info("用户 ", userId, " 提交 ", count,
                             " 单,成功 ", success);
            }
            else if (op == 3) {
                int userId;
                std::cout << "用户 ID: "; std::cin >> userId;
                auto v = mgr.ordersByUser(userId);
                std::cout << "用户 " << userId << " 共 " << v.size() << " 单:\n";
                for (const auto& o : v) {
                    std::cout << "  订单 #" << o.id
                              << " - 商品 " << o.productId
                              << " - 数量 " << o.quantity << "\n";
                }
            }
            else if (op == 4) {
                std::cout << "iPhone 15 当前库存: " << iphone.getStock() << "\n";
            }
        } catch (const std::exception& e) {
            std::cout << "ERR " << e.what() << "\n";
        }
    }
    return 0;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62

编译运行 + 互动验证:

🧪 编译运行:
$ g++ -std=c++17 -pthread -O2 main.cpp -o orderapp
$ ./orderapp

========== 订单系统 ==========
1=普通下单  2=秒杀下单  3=查询订单  4=查看库存  0=退出
> 4
iPhone 15 当前库存: 100

> 1
用户 ID: 100
下单次数: 30
[14:06:01.234][T0x70...] 用户 100 提交 30 单,成功 30

> 2
用户 ID: 200
下单次数: 80          ← 库存只剩 70
[14:06:05.678][T0x70...] 用户 200 提交 80 单,成功 70

> 4
iPhone 15 当前库存: 0

> 3
用户 ID: 100
用户 100 共 30 单:
  订单 #1 - 商品 1 - 数量 1
  订单 #2 - 商品 1 - 数量 1
  ...
> 0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

✅ 三个关键点都验证到了:

  1. 库存正确递减(100 → 70 → 0)
  2. 超过库存的下单部分失败(200 用户下 80 单只成功 70)
  3. 查询能拿到该用户的订单

# Step 7.2 benchmark 万 QPS

新建 benchmark.cpp,这是最终验收:

// benchmark.cpp
#include "ThreadPool.h"
#include "OrderManager.h"
#include "Product.h"
#include "FlashStrategy.h"
#include <chrono>
#include <iostream>
#include <atomic>
#include <thread>

int main() {
    constexpr int TOTAL_USERS     = 1000;
    constexpr int ORDERS_PER_USER = 10;
    constexpr int INITIAL_STOCK   = 5000;        // ⭐ 故意 < 总请求 10000

    ThreadPool   pool(std::thread::hardware_concurrency());
    OrderManager mgr;
    Product      iphone(1, "iPhone 15", 7999.0, INITIAL_STOCK);

    FlashStrategy flash(pool);
    std::vector<int> qtys(ORDERS_PER_USER, 1);
    std::atomic<int> totalSuccess{0};

    auto t0 = std::chrono::steady_clock::now();

    // 启动 1000 个用户线程,每人下 10 单
    std::vector<std::thread> users;
    users.reserve(TOTAL_USERS);
    for (int u = 0; u < TOTAL_USERS; ++u) {
        users.emplace_back([&, u]{
            auto ids = flash.placeOrders(iphone, mgr, u, qtys);
            for (int id : ids) if (id > 0) totalSuccess++;
        });
    }
    for (auto& t : users) t.join();

    auto t1 = std::chrono::steady_clock::now();
    auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(t1 - t0).count();

    int total    = TOTAL_USERS * ORDERS_PER_USER;
    int success  = totalSuccess.load();
    int leftover = iphone.getStock();

    std::cout << "========================================\n";
    std::cout << "总请求数:    " << total << "\n";
    std::cout << "成功订单:    " << success << "\n";
    std::cout << "剩余库存:    " << leftover << "\n";
    std::cout << "总耗时:     " << ms << " ms\n";
    std::cout << "QPS:        " << (total * 1000.0 / ms) << "\n";

    // ⭐⭐⭐ 关键一致性验证
    bool consistent = (success + leftover == INITIAL_STOCK);
    std::cout << "一致性校验: 成功(" << success << ") + 剩余(" << leftover
              << ") == 初始(" << INITIAL_STOCK << ") ? "
              << (consistent ? "✅ PASS" : "❌ FAIL:超卖或漏单!") << "\n";
    std::cout << "========================================\n";
    return consistent ? 0 : 1;          // 一致则退出码 0
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58

🧪 编译运行(多跑几次都应一致):

$ g++ -std=c++17 -pthread -O2 benchmark.cpp -o bench
$ for i in 1 2 3 4 5; do ./bench; echo; done

========================================
总请求数:    10000
成功订单:    5000
剩余库存:    0
总耗时:     487 ms
QPS:        20533.88
一致性校验: 成功(5000) + 剩余(0) == 初始(5000) ? ✅ PASS
========================================
(5 次完全一致 PASS)
1
2
3
4
5
6
7
8
9
10
11
12

✅ 三个最终验证全部通过:

  1. QPS > 20000(普通笔记本 4 核能轻松达到,证明线程池调度高效)
  2. 零超卖(atomic CAS 在万级并发下零失误)
  3. 零漏单(shared_mutex 正确保护 vector 不丢数据)
┌─ 📌 阶段 ⑦ 小结 + 整个项目里程碑 ──────────────────────┐
│ ✅ 已掌握                                                 │
│   • Step 7.1 交互式 CLI 串起 4 个核心组件                  │
│   • Step 7.2 万级并发压测 + 一致性校验 PASS                │
│ 🎉 整个 7 阶段 22 Step 全部跑通,你已经掌握了:           │
│   • mutex / shared_mutex / atomic / cv 四大同步原语        │
│   • thread / future / packaged_task 异步任务三件套         │
│   • 生产者-消费者 + 策略模式 两个工业级模式                 │
│   • "造 BUG → 看 BUG → 修 BUG" 这种以错为师的并发学习法     │
│ 💡 commit 建议:feat: cli + benchmark with consistency   │
└──────────────────────────────────────────────────────────┘
1
2
3
4
5
6
7
8
9
10
11

# 09.项目总结速查

# 9.1 整体架构

       Application
            │
            ▼
   OrderStrategy(策略模式)
       │       │      │
       ▼       ▼      ▼
   Normal   Flash   Batch
       │       │      │
       └───┬───┴──────┘
           ▼
     ThreadPool ◄──── BlockingQueue
           │
           ▼
     OrderManager (shared_mutex) + Product (atomic)
           │
           ▼
        Logger (mutex)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

# 9.2 并发原语速查表

原语 头文件 用途 性能
std::thread <thread> 启动线程 -
std::jthread 🆕 <thread> C++20 自动 join + stop_token 同 thread
std::mutex <mutex> 基础互斥锁 中(约 25ns/次)
std::shared_mutex <shared_mutex> C++17 读写锁 慢(约 60ns)
std::atomic<T> <atomic> 无锁原子变量 极快(约 5ns)
std::condition_variable <condition_variable> 条件等待 -
std::future <future> 异步结果 -
std::lock_guard <mutex> RAII 锁守卫 同 mutex
std::unique_lock <mutex> 灵活锁守卫(CV 必备) 比 lock_guard 略慢

# 9.3 选择决策树

要保护的数据
    ├── 是单个变量(int/bool/指针)?
    │      └── ✅ 用 std::atomic&lt;T>
    │
    ├── 多读少写(读 > 80%)?
    │      └── ✅ 用 std::shared_mutex
    │
    ├── 任务执行有先后依赖?
    │      └── ✅ 用 condition_variable
    │
    └── 需要"等结果"?
           └── ✅ 用 future + promise(或 packaged_task)
1
2
3
4
5
6
7
8
9
10
11
12

# 10.项目技术思考

# 10.1 三种锁的对比

// 三种实现"线程安全计数器"的方法
class CounterMutex {
    int v = 0; std::mutex m;
public:
    void inc() { std::lock_guard l(m); v++; }      // 25 ns
    int  get() { std::lock_guard l(m); return v; }
};
class CounterShared {
    int v = 0; std::shared_mutex m;
public:
    void inc() { std::unique_lock l(m); v++; }     // 35 ns
    int  get() { std::shared_lock l(m); return v; }// 30 ns(多读快)
};
class CounterAtomic {
    std::atomic<int> v{0};
public:
    void inc() { v.fetch_add(1); }                 // 5 ns
    int  get() { return v.load(); }
};
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

结论:单变量场景永远首选 atomic;多变量需要"事务一致性"才用 mutex。

# 10.2 cv 与 unique_lock

cv.wait(lock) 的实现伪码:

void wait(unique_lock& lk) {
    lk.unlock();              // 1. 释放锁
    block_until_notified();   // 2. 阻塞等通知
    lk.lock();                // 3. 醒来后重新加锁
}
1
2
3
4
5

因此 wait() 必须能 unlock 和 lock,lock_guard 不支持手动 unlock,所以 cv 必须用 unique_lock。

# 10.3 jthread 解决了什么

// 旧 thread 的痛点:
std::thread t(work);
// 必须记得 t.join() 否则程序崩溃(terminate)
// 想中断 t 没有官方机制

// C++20 jthread:
std::jthread t([](std::stop_token st) {
    while (!st.stop_requested()) { /* work */ }
});
// 析构自动 join,析构前自动发 stop request
1
2
3
4
5
6
7
8
9
10

jthread = thread + 自动 join + 协作中断,是替代 thread 的"全面升级"。


# 11.衔接与延伸

# 11.1 与上一案例的差异

维度 案例 04 JSON+KV 案例 05 多线程订单
线程模型 单线程 多线程 + 线程池
同步机制 无 mutex / shared_mutex / atomic / cv
异步任务 无 future / promise / packaged_task
设计模式 工厂 策略模式 + 生产者消费者

# 11.2 下一案例的递进

下一案例 06.迷你 KV 存储引擎 会做四件升级:

  1. 持久化升级:从内存 KV 升级为 AOF 日志持久化(每次写命令落盘)
  2. 命令模式:所有 SET / GET / DEL / EXPIRE 都是 Command 子类
  3. 后台清理:用 jthread + stop_token 实现 TTL 过期键清理
  4. 手写单测:用宏 + 注册表实现"裸 C++ 单元测试框架"

# 11.3 三个延伸挑战

挑战 A(基础)· 用 jthread 重写线程池

把 std::thread 换成 std::jthread,析构时不再需要手动 shutdown。提示:用 stop_token 替代 BlockingQueue::shutdown(),彻底消除 §03 灵魂三问里提到的"析构顺序错就死锁"的隐患。

挑战 B(进阶)· 实现 readers-writer 优先级

当前 shared_mutex 在大量读时,写线程会"饿死"(永远拿不到锁)。请实现"写优先",有写请求时阻止新读者进入。

挑战 C(现代化)· 集成 std::async 和协程(C++20)

把 pool.submit(f) 替换为 co_await async_op(),体验协程的简洁。这是 C++20 最重要的特性之一,详见卷三。


  • ⬅ 上一案例:04.JSON与内存数据库
  • ➡ 下一案例:06.迷你KV存储引擎 , 卷一 17 章全章串联的毕业设计
上次更新: 2026/06/11, 18:49:10
Json与内存数据库
迷你KV存储引擎器

← Json与内存数据库 迷你KV存储引擎器→

最近更新
01
信号崩溃快速排查
06-15
02
CoreDump破案
06-15
03
perf火焰图实战
06-15
更多文章>
Theme by Vdoing | Copyright © 2019-2026 杨充 | MIT License | 桂ICP备2024034950号 | 桂公网安备45142202000030
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式