编程进阶网 编程进阶网
首页
  • 计算机原理
  • 操作系统
  • 网络协议
  • 数据库原理
  • 面向对象
  • 设计原则
  • 设计模式
  • 系统架构
  • 性能优化
  • 编程原理
  • 方案设计
  • 稳定可靠
  • 工程运维
  • 基础认知
  • 线性结构
  • 树与哈希
  • 工业级实现
  • 算法思想
  • 实战与综合
  • 算法题考核
  • C语言入门
  • C综合案例
  • C专栏博客
  • C标准集库
  • C++入门教程
  • C++综合案例
  • C++专栏博客
  • C++开发技巧
  • Java入门教程
  • Java综合案例
  • Java专栏博客
  • Go入门教程
  • Go综合案例
  • Go专栏博客
  • Go开发技巧
  • JavaScript入门
  • JavaScript高级
  • Android库解读
  • Android专栏
  • Android智能硬件
  • iOS ObjC入门
  • iOS Swift入门
  • iOS入门精通
  • Web之Html手册
  • Web之TypeScript
  • Web之Vue高级进阶
  • Linux之QML入门
  • Linux之QT核心库
  • Linux实践开发
  • Python教程
  • Shell&Bash教程
  • 工具脚本
  • 自动化脚本
  • 质量保障
  • 产品思考
  • 软实力
  • 开发流程
  • Git应用
  • 技术模版
  • 技术规范
  • Markdown
  • Mermaid
  • 开源协议
  • JSON工具
  • 文本工具
  • 图片处理
  • 文档转化
  • 代码压缩
  • 关于我
  • 自我精进
  • 职场管理
  • 职场面试
  • 心情杂货
  • 友情链接

杨充

专注编程 · 终身学习者
首页
  • 计算机原理
  • 操作系统
  • 网络协议
  • 数据库原理
  • 面向对象
  • 设计原则
  • 设计模式
  • 系统架构
  • 性能优化
  • 编程原理
  • 方案设计
  • 稳定可靠
  • 工程运维
  • 基础认知
  • 线性结构
  • 树与哈希
  • 工业级实现
  • 算法思想
  • 实战与综合
  • 算法题考核
  • C语言入门
  • C综合案例
  • C专栏博客
  • C标准集库
  • C++入门教程
  • C++综合案例
  • C++专栏博客
  • C++开发技巧
  • Java入门教程
  • Java综合案例
  • Java专栏博客
  • Go入门教程
  • Go综合案例
  • Go专栏博客
  • Go开发技巧
  • JavaScript入门
  • JavaScript高级
  • Android库解读
  • Android专栏
  • Android智能硬件
  • iOS ObjC入门
  • iOS Swift入门
  • iOS入门精通
  • Web之Html手册
  • Web之TypeScript
  • Web之Vue高级进阶
  • Linux之QML入门
  • Linux之QT核心库
  • Linux实践开发
  • Python教程
  • Shell&Bash教程
  • 工具脚本
  • 自动化脚本
  • 质量保障
  • 产品思考
  • 软实力
  • 开发流程
  • Git应用
  • 技术模版
  • 技术规范
  • Markdown
  • Mermaid
  • 开源协议
  • JSON工具
  • 文本工具
  • 图片处理
  • 文档转化
  • 代码压缩
  • 关于我
  • 自我精进
  • 职场管理
  • 职场面试
  • 心情杂货
  • 友情链接
  • README
  • C语言入门精通

  • Cpp入门到精通

    • README
    • 入门教程

      • README
      • Cpp简史
      • 基础语法
      • 数据类型
      • 运算符
      • 复合类型
      • 流程语句
      • 函数
      • 指针引用
      • 类和对象
      • 继承多态
      • 内存模型
      • 动态内存
      • IO和文件
      • 异常处理
      • 线程和锁
        • 15.1 线程
          • 15.1.1 线程介绍
          • 15.1.2 创建线程
          • 15.1.2.1 使用函数创建线程
          • 15.1.2.2 Lambda表达式创建线程
          • 15.1.2.3 传递参数给线程函数
          • 15.1.3 等待线程结束
          • 15.1.4 分离线程
          • 15.1.5 检查线程
          • 15.1.6 线程休眠
          • 15.1.9 线程管理总结
          • 15.1.10 综合案例与思考
        • 15.2 多线程
          • 15.2.1 多线程头文件
          • 15.2.2 创建多线程
          • 15.2.3 多线程数据竞争
          • 15.2.4 死锁
          • 15.2.5 虚假唤醒
          • 15.2.6 线程局部存储
          • 15.2.7 C++锁机制发展
          • 15.2.8 综合案例与思考
        • 15.3 mutex
          • 15.3.1 mutex作用
          • 15.3.2 lock和unlock
          • 15.3.3 lock_guard
          • 15.3.4 unique_lock
          • 15.3.5 adopt_lock
          • 15.1.6 recursive_mutex
          • 15.3.7 mutex总结
          • 15.3.8 综合案例与思考
        • 15.4 condition_variable
          • 15.4.1 条件变量
          • 15.4.2 核心方法
          • 15.4.3 案例展示
          • 15.4.4 条件变量总结
          • 15.4.5 综合案例与思考
        • 15.5 atomic
          • 15.5.1 基本概念
          • 15.5.2 常用操作
          • 15.5.3 使用示例
          • 15.5.4 核心原理
          • 15.5.5 注意事项
          • 15.5.6 综合案例与思考
        • 15.6 async和future
          • 15.6.1 异步概念
          • 15.6.2 核心API
          • 15.6.3 简单案例
          • 15.6.4 多个异步任务
        • 15.7 shared_mutex
          • 15.7.1 读写锁机制
          • 15.7.2 简单案例
          • 15.7.3 应用场景
          • 15.7.4 实现原理
          • 15.7.5 综合案例与思考
        • 15.9 综合案例
          • 15.9.1 死锁与避免
          • 15.9.2 延迟任务
          • 15.9.3 轮训定时器
          • 15.9.4 提前唤醒休眠
        • 15.10 线程和锁底层原理
          • 15.10.1 线程的内核实现
          • 15.10.2 mutex的底层机制
          • 15.10.3 原子操作与内存序
        • 15.11 线程和锁训练题
        • 15.12 综合思考题
      • STL模版
      • 预处理器
      • 特性图谱
    • 综合案例

    • 专栏博客

    • 开发技巧

  • Java入门精通

  • Go入门到精通

  • JavaScript入门

  • CodeX
  • Cpp入门到精通
  • 入门教程
杨充
2026-05-07
目录

线程和锁

# 第 15 章 C++ 线程和锁

# 目录介绍

  • 15.1 线程管理
    • 15.1.1 线程介绍
    • 15.1.2 创建线程
    • 15.1.3 等待线程结束
    • 15.1.4 分离线程
    • 15.1.5 检查线程
    • 15.1.6 线程休眠
    • 15.1.9 线程管理总结
    • 15.1.10 综合案例与思考
  • 15.2 多线程
    • 15.2.1 多线程头文件
    • 15.2.2 创建多线程
    • 15.2.3 多线程数据竞争
    • 15.2.4 死锁
    • 15.2.5 虚假唤醒
    • 15.2.6 线程局部存储
    • 15.2.7 C++锁机制发展
    • 15.2.8 综合案例与思考
  • 15.3 mutex
    • 15.3.1 mutex作用
    • 15.3.2 lock和unlock
    • 15.3.3 lock_guard
    • 15.3.4 unique_lock
    • 15.3.5 adopt_lock
    • 15.3.6 recursive_mutex
    • 15.3.7 mutex总结
    • 15.3.8 综合案例与思考
  • 15.4 condition_variable
    • 15.4.1 条件变量
    • 15.4.2 核心方法
    • 15.4.3 案例展示
    • 15.4.4 条件变量总结
    • 15.4.5 综合案例与思考
  • 15.5 atomic
    • 15.5.1 基本概念
    • 15.5.2 常用操作
    • 15.5.3 使用示例
    • 15.5.4 核心原理
    • 15.5.5 注意事项
    • 15.5.6 综合案例与思考
  • 15.6 async和future
    • 15.6.1 异步概念
    • 15.6.2 核心API
    • 15.6.3 简单案例
    • 15.6.4 多个异步任务
  • 15.7 shared_mutex
    • 15.7.1 读写锁机制
    • 15.7.2 简单案例
    • 15.7.3 应用场景
    • 15.7.4 实现原理
    • 15.7.5 综合案例与思考
  • 15.9 综合案例
    • 15.9.1 死锁与避免
    • 15.9.2 延迟任务
    • 15.9.3 轮训定时器
    • 15.9.4 提前唤醒休眠
  • 15.10 线程和锁底层原理
    • 15.10.1 线程的内核实现
    • 15.10.2 mutex的底层机制
    • 15.10.3 原子操作与内存序
  • 15.11 线程和锁训练题
  • 15.12 综合思考题
    • 15.9.2 延迟任务
    • 15.9.3 轮训定时器
    • 15.9.4 提前唤醒休眠

# 15.1 线程

# 15.1.1 线程介绍

在 C++ 中,线程 是并发执行的基本单位。C++11 引入了标准库支持多线程编程,提供了 std::thread 类来创建和管理线程。

std::thread 在 <thread> 头文件中声明,因此使用 std::thread 需包含 <thread> 头文件。

# 15.1.2 创建线程

使用 std::thread 创建线程,需要传入一个可调用对象(如函数、Lambda 表达式、函数对象等)。

# 15.1.2.1 使用函数创建线程

#include <iostream>
#include <thread>

void threadFunction() {
    std::cout << "Hello from thread!" << std::endl;
}

int main() {
    std::thread t(threadFunction); // 创建线程
    t.join(); // 等待线程结束
    std::cout << "Main thread finished." << std::endl;
    return 0;
}
1
2
3
4
5
6
7
8
9
10
11
12
13

# 15.1.2.2 Lambda表达式创建线程

#include <iostream>
#include <thread>

int main() {
    std::thread t([]() {
        std::cout << "Hello from Lambda thread!" << std::endl;
    });
    t.join();
    std::cout << "Main thread finished." << std::endl;
    return 0;
}
1
2
3
4
5
6
7
8
9
10
11

# 15.1.2.3 传递参数给线程函数

值传递:直接传递

void func(int a, const std::string& s);
std::thread t(func, 42, "Hello");
1
2

引用传递:用 std::ref

void modify(int& x);
int value = 0;
std::thread t(modify, std::ref(value)); // 传递引用
1
2
3

然后看一个具体的案例,如下所示:

#include <iostream>
#include <thread>

void printMessage(const std::string& message) {
    std::cout << message << std::endl;
}

int main() {
    std::thread t(printMessage, "Hello from thread with arguments!");
    t.join();
    return 0;
}
1
2
3
4
5
6
7
8
9
10
11
12

# 15.1.3 等待线程结束

join() 的作用

  1. 等待线程完成:调用 join() 会阻塞当前线程,直到目标线程执行完毕。
  2. 释放线程资源:线程执行完毕后,join() 会释放线程占用的资源。
  3. 确保线程安全:如果不调用 join() 或 detach(),std::thread 对象析构时会调用 std::terminate,导致程序崩溃。

使用 join() 等待线程执行完毕。

std::thread t(threadFunction);
t.join(); // 等待线程结束
1
2

# 15.1.4 分离线程

detach() 的作用

  1. 分离线程:将线程与 std::thread 对象分离,线程在后台独立运行。
  2. 释放 std::thread 对象:分离后,std::thread 对象不再管理该线程,可以安全销毁。
  3. 避免资源泄漏:分离线程后,线程的资源会在其执行完毕后由操作系统自动回收。

使用 detach() 将线程与主线程分离,线程在后台独立运行。

std::thread t(threadFunction);
t.detach(); // 分离线程
1
2

join() 与 detach() 的区别

特性 join() detach()
线程管理 std::thread 对象管理线程 std::thread 对象不再管理线程
阻塞主线程 阻塞主线程,直到目标线程完成 不阻塞主线程,主线程继续执行
线程资源回收 线程完成后,资源由 join() 释放 线程完成后,资源由操作系统自动回收
线程控制 可以通过 std::thread 对象控制线程 无法通过 std::thread 对象控制线程

# 15.1.5 检查线程

使用 joinable() 检查线程是否可以被 join() 或 detach()。

std::thread t(threadFunction);
if (t.joinable()) {
    t.join();
}
1
2
3
4

# 15.1.6 线程休眠

  • sleep_until: 线程休眠至某个指定的时刻(time point),该线程才被重新唤醒。
  • sleep_for: 线程休眠某个指定的时间片(time span),该线程才被重新唤醒,不过由于线程调度等原因,实际休眠时间可能比 sleep_duration 所表示的时间片更长。

# 15.1.9 线程管理总结

方法 作用
join() 阻塞当前线程,直到目标线程完成
detach() 分离线程(资源由系统自动回收),分离后不可再 join
joinable() 检查线程是否可 join(未执行 join 或 detach 时返回 true)
get_id() 获取线程唯一标识符
std::thread::hardware_concurrency() 返回系统支持的并发线程数(逻辑CPU核心数)

# 15.1.10 综合案例与思考

下面通过一个"多线程下载模拟器"案例,综合演示线程创建、join/detach、joinable 检查、线程休眠和线程 ID 获取:

#include <iostream>
#include <thread>
#include <vector>
#include <string>
#include <chrono>
using namespace std;

// 模拟下载任务
void downloadFile(const string& filename, int sizeMB) {
    auto tid = this_thread::get_id();
    cout << "[线程" << tid << "] 开始下载: " << filename
         << " (" << sizeMB << "MB)" << endl;

    // 模拟下载耗时:每 100MB 休眠 100ms
    int steps = sizeMB / 10;
    for (int i = 1; i <= steps; ++i) {
        this_thread::sleep_for(chrono::milliseconds(50));
        if (i == steps) {
            cout << "[线程" << tid << "] " << filename << " 下载完成 100%" << endl;
        }
    }
}

// 后台日志线程(分离线程)
void backgroundLogger() {
    for (int i = 0; i < 3; ++i) {
        this_thread::sleep_for(chrono::milliseconds(80));
        cout << "[后台日志] 系统运行正常 #" << i + 1 << endl;
    }
}

int main() {
    cout << "主线程ID: " << this_thread::get_id() << endl;
    cout << "硬件并发数: " << thread::hardware_concurrency() << endl;

    // 1. 创建并分离后台日志线程(detach)
    thread logThread(backgroundLogger);
    logThread.detach();
    cout << "日志线程已分离, joinable=" << logThread.joinable() << endl;

    // 2. 创建多个下载线程并等待完成(join)
    vector<thread> workers;
    vector<pair<string, int>> tasks = {
        {"video.mp4", 50}, {"image.zip", 30}, {"doc.pdf", 20}
    };

    for (auto& [name, size] : tasks) {
        workers.emplace_back(downloadFile, name, size);
    }

    // 3. 使用 Lambda 创建一个额外线程
    workers.emplace_back([]{
        cout << "[Lambda线程" << this_thread::get_id() << "] 校验文件完整性..." << endl;
        this_thread::sleep_for(chrono::milliseconds(200));
        cout << "[Lambda线程] 校验完成" << endl;
    });

    // 4. 等待所有工作线程完成
    for (auto& t : workers) {
        if (t.joinable()) {
            t.join();
        }
    }

    // 给 detach 的日志线程一点时间输出
    this_thread::sleep_for(chrono::milliseconds(300));
    cout << "所有下载任务完成" << endl;
    return 0;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69

案例知识融合:本案例整合了线程管理的所有核心知识点:通过函数和 Lambda 两种方式创建线程,使用 join() 等待下载线程完成,使用 detach() 分离后台日志线程,通过 joinable() 安全地检查线程状态避免重复 join,this_thread::get_id() 获取线程标识,this_thread::sleep_for() 模拟耗时操作,hardware_concurrency() 查询系统并发能力。分离线程和等待线程的对比使用体现了两种线程生命周期管理策略的差异。

思考题:

  1. 如果主线程在 detach 之后立即退出(不 sleep),后台日志线程的输出会怎样?为什么?
  2. 如果忘记对某个 worker 线程调用 join() 或 detach(),程序会发生什么?
  3. 使用 std::ref 传引用和直接传值给线程函数有什么区别?如果 downloadFile 的 filename 参数不是 const 引用会有什么问题?

# 15.2 多线程

# 15.2.1 多线程头文件

C++11 新标准中引入了五个头文件来支持多线程编程,它们分别是 <atomic>, <thread>, <mutex>, <condition_variable> 和 <future>。

  • <atomic>:该头文主要声明了两个类, std::atomic 和 std::atomic_flag,另外还声明了一套 C 风格的原子类型和与 C 兼容的原子操作的函数。
  • <thread>:该头文件主要声明了 std::thread 类,另外 std::this_thread 命名空间也在该头文件中。
  • <mutex>:该头文件主要声明了与互斥量(Mutex)相关的类,包括 std::mutex_* 一系列类,std::lock_guard, std::unique_lock, 以及其他的类型和函数。
  • <condition_variable>:该头文件主要声明了与条件变量相关的类,包括 std::condition_variable 和 std::condition_variable_any。
  • <future>:该头文件主要声明了 std::promise, std::package_task 两个 Provider 类,以及 std::future 和 std::shared_future 两个 Future 类,另外还有一些与之相关的类型和函数,std::async() 函数就声明在此头文件中。

# 15.2.2 创建多线程

使用 std::thread 创建多个线程,每个线程执行不同的任务。

1.基本示例

#include <iostream>
#include <thread>

void task(int id) {
    std::cout << "Thread " << id << " is running." << std::endl;
}

int main() {
    std::thread t1(task, 1);
    std::thread t2(task, 2);

    t1.join(); // 等待线程 t1 结束
    t2.join(); // 等待线程 t2 结束

    std::cout << "Main thread finished." << std::endl;
    return 0;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

2.使用 Lambda 表达式

#include <iostream>
#include <thread>

int main() {
    std::thread t1([]() {
        std::cout << "Thread 1 is running." << std::endl;
    });

    std::thread t2([]() {
        std::cout << "Thread 2 is running." << std::endl;
    });

    t1.join();
    t2.join();

    std::cout << "Main thread finished." << std::endl;
    return 0;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

# 15.2.3 多线程数据竞争

多个线程同时访问共享资源,导致未定义行为。解决方法:使用互斥锁(std::mutex)保护共享资源。

# 15.2.4 死锁

多个线程互相等待对方释放锁,导致程序无法继续执行。解决方法:

  • 按固定顺序加锁。
  • 使用 std::lock() 同时锁定多个互斥锁。

# 15.2.5 虚假唤醒

线程在条件变量上被唤醒,但条件并未满足。解决方法:在 wait() 中使用谓词检查条件。

# 15.2.6 线程局部存储

使用 thread_local 关键字声明线程局部变量,每个线程拥有独立的变量副本。

#include <iostream>
#include <thread>

thread_local int threadLocalData = 0;

void threadFunction(int id) {
    threadLocalData = id;
    std::cout << "Thread " << id << " has data: " << threadLocalData << std::endl;
}

int main() {
    std::thread t1(threadFunction, 1);
    std::thread t2(threadFunction, 2);

    t1.join();
    t2.join();

    return 0;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

# 15.2.7 C++锁机制发展

  • C++11:引入基本并发原语 (mutex, lock_guard, condition_variable)
  • C++14:增强时间相关操作 (try_lock_for)
  • C++17:添加 scoped_lock 和 shared_mutex
  • C++20:引入信号量 (std::counting_semaphore) 和原子智能指针

# 15.2.8 综合案例与思考

下面通过一个"多线程计数器"案例,综合演示多线程创建、数据竞争问题、死锁成因、thread_local 和锁机制演进:

#include <iostream>
#include <thread>
#include <vector>
#include <mutex>
#include <atomic>
using namespace std;

// 演示 1:数据竞争——不加锁的计数器
int unsafeCounter = 0;

void unsafeIncrement(int n) {
    for (int i = 0; i < n; ++i) {
        ++unsafeCounter;  // 数据竞争!
    }
}

// 演示 2:线程安全——使用 mutex
int safeCounter = 0;
mutex mtx;

void safeIncrement(int n) {
    for (int i = 0; i < n; ++i) {
        lock_guard<mutex> lock(mtx);
        ++safeCounter;
    }
}

// 演示 3:线程安全——使用 atomic(无锁)
atomic<int> atomicCounter(0);

void atomicIncrement(int n) {
    for (int i = 0; i < n; ++i) {
        atomicCounter.fetch_add(1, memory_order_relaxed);
    }
}

// 演示 4:thread_local 变量
thread_local int localSum = 0;

void threadLocalDemo(int id, int n) {
    localSum = 0;  // 每个线程有自己独立的 localSum
    for (int i = 0; i < n; ++i) {
        ++localSum;
    }
    cout << "线程" << id << " localSum=" << localSum << endl;
}

// 辅助:运行多线程测试
template<typename Func>
void runTest(const string& name, Func func, int threadCount, int perThread) {
    vector<thread> threads;
    auto start = chrono::high_resolution_clock::now();
    for (int i = 0; i < threadCount; ++i) {
        threads.emplace_back(func, perThread);
    }
    for (auto& t : threads) t.join();
    auto end = chrono::high_resolution_clock::now();
    auto ms = chrono::duration_cast<chrono::microseconds>(end - start).count();
    cout << name << " 耗时: " << ms << "us" << endl;
}

int main() {
    const int THREADS = 4;
    const int COUNT = 100000;

    // 测试 1:数据竞争
    runTest("不加锁", unsafeIncrement, THREADS, COUNT);
    cout << "不加锁结果: " << unsafeCounter
         << " (期望: " << THREADS * COUNT << ")" << endl;

    // 测试 2:mutex 保护
    runTest("mutex锁", safeIncrement, THREADS, COUNT);
    cout << "mutex结果: " << safeCounter << endl;

    // 测试 3:atomic 无锁
    runTest("atomic", atomicIncrement, THREADS, COUNT);
    cout << "atomic结果: " << atomicCounter.load() << endl;

    // 测试 4:thread_local
    cout << "\n--- thread_local 演示 ---" << endl;
    vector<thread> tlThreads;
    for (int i = 0; i < 3; ++i) {
        tlThreads.emplace_back(threadLocalDemo, i, 1000);
    }
    for (auto& t : tlThreads) t.join();

    return 0;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88

运行结果示例:

不加锁 耗时: 1200us
不加锁结果: 387652 (期望: 400000)
mutex锁 耗时: 15000us
mutex结果: 400000
atomic 耗时: 3500us
atomic结果: 400000

--- thread_local 演示 ---
线程0 localSum=1000
线程1 localSum=1000
线程2 localSum=1000
1
2
3
4
5
6
7
8
9
10
11

案例知识融合:本案例通过对比实验直观展示了多线程编程的核心问题与解决方案。不加锁的计数器因数据竞争导致结果不正确;mutex 保护保证了正确性但有性能开销;atomic 提供了无锁的高性能方案。thread_local 让每个线程拥有独立变量副本,避免了共享。性能对比体现了 C++ 锁机制从粗粒度(mutex)到细粒度(atomic)的演进方向。

思考题:

  1. 为什么不加锁的结果总是小于期望值?++unsafeCounter 在 CPU 层面分为哪几步操作?
  2. mutex 方案比 atomic 慢很多倍,原因是什么?什么场景下必须用 mutex 而不能用 atomic?
  3. 如果两个线程同时持有不同的 mutex 并试图获取对方的 mutex,会发生什么?如何避免?

# 15.3 mutex

互斥锁是最常用的锁类型,用于确保同一时间只有一个线程可以访问共享资源。

# 15.3.1 mutex作用

在 C++ 中,std::mutex 是用于实现线程同步的互斥锁。它可以确保多个线程对共享资源的访问是互斥的,从而避免数据竞争和不一致的问题。

# 15.3.2 lock和unlock

std::mutex 提供了 lock() 和 unlock() 方法,用于手动加锁和解锁。

  1. lock(): 尝试获取互斥量的所有权。如果互斥量已被其他线程锁定,则当前线程会被阻塞,直到互斥量被解锁。
  2. unlock(): 释放互斥量的所有权,允许其他线程获取该互斥量。
// 互斥锁
std::mutex mtx;

// 线程函数:递增计数器
void incrementCounter(int id) {
    for (int i = 0; i < 1000; ++i) {
        mtx.lock();  // 手动加锁
        ++counter;  // 修改共享资源
        std::cout << "Thread " << id << " incremented counter to " << counter << std::endl;
        mtx.unlock();  // 手动解锁
    }
}
1
2
3
4
5
6
7
8
9
10
11
12

线程函数: incrementCounter 是每个线程执行的函数。 使用 mtx.lock() 手动加锁,确保每次只有一个线程能够修改 counter。使用 mtx.unlock() 手动解锁,允许其他线程访问共享资源。

原理: 互斥量(mutex)是操作系统提供的一种同步机制,用于保护共享资源,防止多个线程同时访问造成数据竞争。其底层实现通常依赖于操作系统的原子操作和线程调度机制。

  1. 获取锁(lock):当一个线程调用lock()时:如果锁当前是空闲状态(未被任何线程持有),则当前线程获得锁,并继续执行。如果锁已被其他线程持有,当前线程会被操作系统挂起(进入阻塞状态),并放入该锁的等待队列中。
  2. 释放锁(unlock):当一个线程调用unlock()时:锁被释放,变为空闲状态。操作系统会唤醒等待该锁的一个线程(或所有线程,具体取决于实现),被唤醒的线程将尝试获取锁(其中一个会成功,其余可能再次阻塞)。

为什么需要手动管理锁?在某些复杂场景下,自动锁管理(如std::lock_guard)可能不够灵活。手动管理锁提供了更精细的控制,例如:

  1. 需要跨越多个作用域的锁。
  2. 需要根据条件决定何时释放锁。
  3. 需要配合条件变量使用(std::condition_variable要求使用std::unique_lock,其内部就是手动管理)。

注意事项

  1. 避免死锁:如果加锁和解锁之间的代码抛出异常,可能会导致锁未被释放。可以使用 RAII 风格的 std::lock_guard 或 std::unique_lock 来避免这个问题。
  2. 性能问题:频繁调用 lock() 和 unlock() 可能会影响性能。在性能敏感的场景中,可以考虑减少锁的粒度或使用更高效的同步机制。

# 15.3.3 lock_guard

自动管理锁的生命周期,离开作用域时自动释放锁。适用于简单的加锁场景。

1.lock_guard的作用

  1. 自动加锁:在构造时自动锁定互斥锁。
  2. 自动解锁:在析构时自动释放互斥锁。
  3. 防止死锁:确保在作用域结束时互斥锁一定会被释放,即使发生异常。

2.lock_guard设计思想

  1. RAII(Resource Acquisition Is Initialization):将资源的生命周期与对象的生命周期绑定。资源在对象构造时获取,在对象析构时释放。
  2. 简化资源管理:通过自动管理资源的获取和释放,减少手动管理资源的错误(如忘记解锁或异常导致未解锁)。

3.lock_guard的原理

  1. 构造函数: 在构造时,std::lock_guard 会调用 std::mutex 的 lock() 方法,锁定互斥锁。如果互斥锁已被其他线程锁定,当前线程会阻塞,直到锁可用。
  2. 析构函数: 在析构时,std::lock_guard 会调用 std::mutex 的 unlock() 方法,释放互斥锁。即使发生异常,析构函数也会被调用,确保锁一定会被释放。
  3. 不可复制: std::lock_guard 是不可复制的,因为复制会导致多个对象管理同一个锁,从而引发未定义行为。

假设有一个共享的计数器 counter,多个线程会同时对其进行递增操作。如果不使用互斥锁,可能会导致数据竞争,最终结果不正确。通过使用 std::mutex,可以确保每次只有一个线程能够修改 counter。

#include <iostream>
#include <thread>
#include <mutex>
#include <vector>

// 共享资源
int counter = 0;

// 互斥锁
std::mutex mtx;

// 线程函数:递增计数器
void incrementCounter(int id) {
    for (int i = 0; i < 1000; ++i) {
        std::lock_guard<std::mutex> lock(mtx);  // 加锁
        ++counter;  // 修改共享资源
        std::cout << "Thread " << id << " incremented counter to " << counter << std::endl;
    }
}

int main() {
    const int numThreads = 10;  // 线程数量
    std::vector<std::thread> threads;
    // 创建多个线程
    for (int i = 0; i < numThreads; ++i) {
        threads.emplace_back(incrementCounter, i);
    }
    // 等待所有线程完成
    for (auto& t : threads) {
        t.join();
    }
    // 输出最终结果
    std::cout << "Final counter value: " << counter << std::endl;
    return 0;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
  1. 共享资源:counter 是一个全局变量,多个线程会同时对其进行修改。
  2. 互斥锁:std::mutex mtx 是一个互斥锁,用于保护对 counter 的访问。
  3. 线程函数:incrementCounter 是每个线程执行的函数。使用 std::lock_guard<std::mutex> lock(mtx) 对互斥锁加锁,确保每次只有一个线程能够修改 counter。std::lock_guard 是一个 RAII 风格的锁管理类,在构造时加锁,在析构时自动解锁。
  4. 主函数: 创建多个线程,并将它们存储在 std::vector<std::thread> 中。使用 join() 等待所有线程完成。输出最终的 counter 值。

由于多个线程并发执行,输出的顺序可能不同,但最终的 counter 值一定是正确的(1000 * numThreads)。

Thread 0 incremented counter to 1
Thread 1 incremented counter to 2
Thread 2 incremented counter to 3
...
Thread 9 incremented counter to 10000
Final counter value: 10000
1
2
3
4
5
6

5.lock_guard局限性

  1. 作用域限制:std::lock_guard 的生命周期受限于作用域,无法手动控制锁的释放。
  2. 不可移动:std::lock_guard 是不可移动的,无法转移锁的所有权。

# 15.3.4 unique_lock

std::unique_lock 比 std::lock_guard 更灵活,支持手动加锁、解锁、延迟加锁等操作。

std::unique_lock 的主要作用是

  1. 自动加锁和解锁:在构造时可以选择自动加锁,在析构时自动解锁。
  2. 手动控制锁:支持手动加锁(lock())和解锁(unlock())。
  3. 延迟加锁:支持延迟加锁(defer_lock),在构造时不立即加锁。
  4. 锁的所有权转移:支持移动语义,可以将锁的所有权从一个 std::unique_lock 对象转移到另一个。
  5. 条件变量支持:与 std::condition_variable 配合使用,支持条件变量的等待操作。

std::unique_lock 的设计思想

  1. RAII(Resource Acquisition Is Initialization): 将资源的生命周期与对象的生命周期绑定,确保资源在对象构造时获取,在对象析构时释放。
  2. 灵活性: 提供多种加锁策略(如立即加锁、延迟加锁、尝试加锁等),适应不同的使用场景。
  3. 所有权管理: 通过移动语义支持锁的所有权转移,允许锁的管理权在对象之间传递。

std::unique_lock 的实现原理如下:

  1. 构造函数:支持多种构造函数,可以选择立即加锁、延迟加锁或尝试加锁。例如:
    std::unique_lock<std::mutex> lock(mtx); // 立即加锁
    std::unique_lock<std::mutex> lock(mtx, std::defer_lock); // 延迟加锁
    
    1
    2
  2. 析构函数:在析构时,如果锁被持有,则自动调用 unlock() 释放锁。确保锁一定会被释放,即使发生异常。
  3. 手动控制:提供 lock() 和 unlock() 方法,允许手动控制锁的状态。例如:
    std::unique_lock<std::mutex> lock(mtx, std::defer_lock);
    lock.lock(); // 手动加锁
    lock.unlock(); // 手动解锁
    
    1
    2
    3
  4. 移动语义:支持移动构造函数和移动赋值运算符,允许锁的所有权转移。例如:
    std::unique_lock<std::mutex> lock1(mtx);
    std::unique_lock<std::mutex> lock2 = std::move(lock1); // 所有权转移
    
    1
    2
  5. 条件变量支持:可以与 std::condition_variable 配合使用,支持等待操作。例如:
    std::unique_lock<std::mutex> lock(mtx);
    cv.wait(lock, []{ return ready; }); // 等待条件变量
    
    1
    2
// 线程函数:递增计数器。跟上面demo一样,区别在这里
void incrementCounter(int id) {
    for (int i = 0; i < 1000; ++i) {
        std::unique_lock<std::mutex> lock(mtx);  // 加锁
        ++counter;  // 修改共享资源
        std::cout << "Thread " << id << " incremented counter to " << counter << std::endl;
        // lock 会在析构时自动解锁
        // lock.unlock();  // 也可以手动解锁
    }
}
1
2
3
4
5
6
7
8
9
10

线程函数:incrementCounter 是每个线程执行的函数。 使用 std::unique_lock<std::mutex> lock(mtx) 对互斥锁加锁,确保每次只有一个线程能够修改 counter。std::unique_lock 是一个 RAII 风格的锁管理类,在构造时加锁,在析构时自动解锁。

由于多个线程并发执行,输出的顺序可能不同,但最终的 counter 值一定是正确的(1000 * numThreads)。

Thread 0 incremented counter to 1
Thread 1 incremented counter to 2
Thread 2 incremented counter to 3
...
Thread 9 incremented counter to 10000
Final counter value: 10000
1
2
3
4
5
6

1. 手动加锁和解锁。std::unique_lock 支持手动加锁和解锁。

void incrementCounter(int id) {
    for (int i = 0; i < 1000; ++i) {
        std::unique_lock<std::mutex> lock(mtx, std::defer_lock);  // 延迟加锁
        lock.lock();  // 手动加锁
        ++counter;
        std::cout << "Thread " << id << " incremented counter to " << counter << std::endl;
        lock.unlock();  // 手动解锁
    }
}
1
2
3
4
5
6
7
8
9

2. 尝试加锁。使用 try_lock 尝试加锁,如果锁不可用则立即返回。

void incrementCounter(int id) {
    for (int i = 0; i < 1000; ++i) {
        std::unique_lock<std::mutex> lock(mtx, std::defer_lock);
        if (lock.try_lock()) {  // 尝试加锁
            ++counter;
            std::cout << "Thread " << id << " incremented counter to " << counter << std::endl;
        } else {
            std::cout << "Thread " << id << " failed to acquire lock" << std::endl;
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11

3. 转移锁的所有权。std::unique_lock 支持移动语义,可以将锁的所有权从一个对象转移到另一个对象。

void transferLock(std::unique_lock<std::mutex> lock) {
    // 锁的所有权已转移到当前函数
    std::cout << "Lock is owned by this function now" << std::endl;
}

void incrementCounter(int id) {
    std::unique_lock<std::mutex> lock(mtx);
    transferLock(std::move(lock));  // 转移锁的所有权
    // 此时 lock 不再拥有锁
}
1
2
3
4
5
6
7
8
9
10

std::unique_lock 与 std::lock_guard 的区别

特性 std::lock_guard std::unique_lock
加锁策略 立即加锁 支持立即加锁、延迟加锁、尝试加锁
手动控制 不支持 支持手动加锁和解锁
所有权转移 不支持 支持移动语义,允许所有权转移
条件变量支持 不支持 支持与 std::condition_variable 配合
性能 更轻量,性能更高 更灵活,但性能稍低

# 15.3.5 adopt_lock

std::lock 和 std::adopt_lock。用于同时锁定多个互斥锁,避免死锁。

std::mutex mtx1, mtx2;

void threadFunction() {
    std::lock(mtx1, mtx2);  // 同时锁定两个互斥锁
    std::lock_guard<std::mutex> lock1(mtx1, std::adopt_lock);  // 接管 mtx1
    std::lock_guard<std::mutex> lock2(mtx2, std::adopt_lock);  // 接管 mtx2
    // 访问共享资源
}
1
2
3
4
5
6
7
8

# 15.1.6 recursive_mutex

std::recursive_mutex 是 C++ 标准库中的一种互斥锁类型,允许同一线程多次加锁而不会导致死锁。它适用于需要递归加锁的场景,例如在递归函数或嵌套调用中。

1.std::recursive_mutex 的主要作用是:

  1. 递归加锁:允许同一线程多次加锁,而不会导致死锁。
  2. 嵌套锁管理:在递归函数或嵌套调用中,确保锁的正确获取和释放。
  3. 线程安全:确保多线程环境下共享资源的互斥访问。

2.std::recursive_mutex 的设计基于以下思想:

  1. 递归锁机制: 允许同一线程多次加锁,每次加锁都会增加锁的计数。只有当锁的计数减到 0 时,锁才会被释放。
  2. 避免死锁: 在递归调用或嵌套加锁的场景中,避免因同一线程重复加锁而导致的死锁。
  3. 线程安全: 确保多线程环境下,同一时间只有一个线程可以持有锁。

3.std::recursive_mutex 的实现原理如下:

  1. 锁计数:
  • 每个 std::recursive_mutex 内部维护一个锁计数和一个线程 ID。
  • 当线程第一次加锁时,锁计数加 1,并记录当前线程 ID。
  • 当同一线程再次加锁时,锁计数加 1,而不会阻塞。
  • 当线程解锁时,锁计数减 1;只有当锁计数减到 0 时,锁才会被释放。
  1. 线程检查: 如果其他线程尝试加锁,而锁已被当前线程持有,则其他线程会阻塞,直到锁被释放。
  2. 与 std::mutex 的区别:
  • std::mutex 不允许同一线程多次加锁,否则会导致未定义行为或死锁。
  • std::recursive_mutex 允许同一线程多次加锁,适用于递归或嵌套加锁的场景。

以下是一个简单的 std::recursive_mutex 使用示例:

#include <iostream>
#include <thread>
#include <mutex>

std::recursive_mutex mtx; // 递归互斥锁

void recursive_function(int n) {
    std::lock_guard<std::recursive_mutex> lock(mtx); // 加锁
    if (n > 0) {
        std::cout << "Thread " << std::this_thread::get_id() << ": n = " << n << std::endl;
        recursive_function(n - 1); // 递归调用
    }
    // 离开作用域时自动解锁
}

int main() {
    std::thread t1(recursive_function, 3);
    std::thread t2(recursive_function, 2);

    t1.join();
    t2.join();

    return 0;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

输出示例:

Thread 140735680944896: n = 3
Thread 140735680944896: n = 2
Thread 140735680944896: n = 1
Thread 140735672552192: n = 2
Thread 140735672552192: n = 1
1
2
3
4
5

# 15.3.7 mutex总结

  • std::mutex 是 C++ 中用于线程同步的基本工具,可以避免数据竞争。
  • lock() 和 unlock() 是 std::mutex 的基本方法,用于手动控制锁。
  • std::lock_guard 和 std::unique_lock 是 RAII 风格的锁管理类,推荐使用。

# 15.3.8 综合案例与思考

下面通过一个"线程安全的日志系统"案例,综合演示 mutex、lock_guard、unique_lock、recursive_mutex 和 adopt_lock 的使用:

#include <iostream>
#include <thread>
#include <mutex>
#include <vector>
#include <string>
#include <chrono>
#include <sstream>
using namespace std;

// 线程安全的日志系统
class Logger {
    mutex mtx_;                    // 普通互斥锁
    recursive_mutex rmtx_;         // 递归互斥锁
    vector<string> logs_;

    // 获取时间戳字符串
    string timestamp() {
        auto now = chrono::system_clock::now();
        auto ms = chrono::duration_cast<chrono::milliseconds>(
            now.time_since_epoch()).count() % 10000;
        return "[" + to_string(ms) + "ms]";
    }

public:
    // 方式1:lock_guard(最简单,适合简单加锁场景)
    void log(const string& level, const string& msg) {
        lock_guard<mutex> lock(mtx_);
        string entry = timestamp() + " " + level + ": " + msg;
        logs_.push_back(entry);
        cout << entry << endl;
    }

    // 方式2:unique_lock + 延迟加锁(适合需要灵活控制锁的场景)
    void logWithCheck(const string& msg) {
        unique_lock<mutex> lock(mtx_, defer_lock);
        // 做一些不需要锁的预处理
        string processed = ">> " + msg;
        // 需要写入时才加锁
        lock.lock();
        logs_.push_back(processed);
        cout << processed << endl;
        lock.unlock();
        // 解锁后可以做不需要锁的后处理
    }

    // 方式3:recursive_mutex(支持递归调用)
    void logRecursive(const string& msg, int depth = 0) {
        lock_guard<recursive_mutex> lock(rmtx_);
        string indent(depth * 2, ' ');
        cout << indent << "递归日志[" << depth << "]: " << msg << endl;
        if (depth < 2) {
            logRecursive(msg + " (子)", depth + 1);  // 递归调用不会死锁
        }
    }

    // 方式4:std::lock + adopt_lock(同时锁定多个互斥锁)
    void mergeFrom(Logger& other) {
        // 同时锁定两个 mutex,避免死锁
        lock(mtx_, other.mtx_);
        lock_guard<mutex> lock1(mtx_, adopt_lock);
        lock_guard<mutex> lock2(other.mtx_, adopt_lock);

        cout << "合并 " << other.logs_.size() << " 条日志" << endl;
        logs_.insert(logs_.end(), other.logs_.begin(), other.logs_.end());
        other.logs_.clear();
    }

    size_t size() {
        lock_guard<mutex> lock(mtx_);
        return logs_.size();
    }
};

int main() {
    Logger mainLogger;

    // 测试 1:多线程使用 lock_guard 写日志
    cout << "=== lock_guard 多线程写日志 ===" << endl;
    vector<thread> threads;
    for (int i = 0; i < 3; ++i) {
        threads.emplace_back([&mainLogger, i]{
            for (int j = 0; j < 3; ++j) {
                mainLogger.log("INFO", "线程" + to_string(i) + " 消息" + to_string(j));
            }
        });
    }
    for (auto& t : threads) t.join();

    // 测试 2:unique_lock 延迟加锁
    cout << "\n=== unique_lock 延迟加锁 ===" << endl;
    mainLogger.logWithCheck("延迟加锁测试消息");

    // 测试 3:recursive_mutex 递归加锁
    cout << "\n=== recursive_mutex 递归加锁 ===" << endl;
    mainLogger.logRecursive("顶层消息");

    // 测试 4:adopt_lock 合并日志
    cout << "\n=== adopt_lock 合并日志 ===" << endl;
    Logger tempLogger;
    tempLogger.log("WARN", "临时日志1");
    tempLogger.log("ERROR", "临时日志2");
    mainLogger.mergeFrom(tempLogger);

    cout << "\n主日志总数: " << mainLogger.size() << endl;
    cout << "临时日志总数: " << tempLogger.size() << endl;
    return 0;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107

案例知识融合:本案例将 mutex 章节的所有核心工具整合到一个实际的日志系统中。lock_guard 用于最简单的 RAII 自动加锁/解锁场景;unique_lock 配合 defer_lock 实现延迟加锁,只在真正需要时才持有锁以减少锁竞争;recursive_mutex 允许同一线程递归加锁而不死锁;std::lock + adopt_lock 组合在 mergeFrom 中同时锁定两个互斥锁,避免了因加锁顺序不一致导致的死锁问题。

思考题:

  1. logWithCheck 中使用 unique_lock + defer_lock 比直接使用 lock_guard 有什么优势?什么场景下这种延迟加锁策略能显著提升性能?
  2. 如果把 logRecursive 中的 recursive_mutex 换成普通 mutex,会发生什么?为什么?
  3. mergeFrom 中如果不用 std::lock 同时锁定两个 mutex,而是先 lock(mtx_) 再 lock(other.mtx_),在什么情况下会导致死锁?

# 15.4 condition_variable

# 15.4.1 条件变量

std::condition_variable 是 C++ 中用于线程间同步的工具,通常与 std::mutex 结合使用。用于实现线程间的等待和通知机制。

它允许线程在某个条件不满足时进入等待状态,直到其他线程通知它条件已满足。

# 15.4.2 核心方法

与 std::mutex 配合使用。使用 wait() 等待条件,notify_one() 或 notify_all() 通知等待的线程。

核心方法

wait(lock);                       // 等待条件
wait(lock, predicate);            // 带条件谓词的等待
notify_one();                     // 通知一个等待线程
notify_all();                     // 通知所有等待线程
1
2
3
4

思考一下,为什么条件变量需要锁?

  1. 检查条件(如 condition)和进入等待(wait)必须是原子操作,否则在检查条件之后和进入等待之前,另一个线程可能修改条件并发出通知,导致通知丢失。
  2. 在修改条件时,也需要加锁以保证条件状态的正确性。

# 15.4.3 案例展示

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>

std::mutex mtx;
std::condition_variable cv;
bool ready = false;

void waitForReady() {
    std::unique_lock<std::mutex> lock(mtx);
    // 第一种用wait,等待条件成立
    // cv.wait(lock, [] { return ready; }); 
    // 第二种用wait_for,等待 2 秒或被提前唤醒
    if (cv.wait_for(lock, std::chrono::seconds(2), []() -> bool { return ready; })) {
        std::cout << "条件达成 - 提前执行任务" << std::endl;
    } else {
        std::cout << "等待超时 - 强制唤醒执行" << std::endl;
    }
    std::cout << "Ready!" << std::endl;
}

void setReady() {
    // 模拟任务准备(可以是复杂计算或IO操作)
    std::this_thread::sleep_for(std::chrono::seconds(1));
    {
        std::lock_guard<std::mutex> lock(mtx);
        //如果 ready提前变为 true(即任务准备完成),则立即唤醒
        ready = true;
    }
    cv.notify_one(); // 通知等待的线程
}

int main() {
    std::thread t1(waitForReady);
    std::thread t2(setReady);
    t1.join();
    t2.join();
    return 0;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
  1. 线程启动:线程 t1 开始执行 waitForReady,获取锁并进入 cv.wait(),等待条件 ready 为 true。线程 t2 开始执行 setReady,模拟一个耗时操作。
  2. 条件设置:线程 t2 在 1 秒后获取锁,将 ready 设置为 true,并调用 cv.notify_one() 通知等待的线程。
  3. 条件满足:线程 t1 收到通知,检查条件 ready 为 true,退出等待状态,输出 "Ready!"。
  4. 线程结束:两个线程完成执行,主线程继续运行并结束程序。

关键点分析

  1. cv.wait(lock, predicate): cv.wait() 会释放锁,允许其他线程修改共享变量。当线程被唤醒时,会重新获取锁,并检查 predicate(即 ready 是否为 true)。如果 predicate 为 true,线程继续执行;否则,线程继续等待。
  2. cv.notify_one():通知一个等待的线程(如果有)。在这个例子中,只有一个等待的线程 t1。
  3. 锁的使用:std::unique_lock<std::mutex> 用于 cv.wait(),因为它需要在等待期间释放锁。std::lock_guard<std::mutex> 用于 setReady,因为它只需要在作用域内加锁。
  4. 避免虚假唤醒:使用 cv.wait(lock, predicate) 而不是 cv.wait(lock),可以避免虚假唤醒问题。

# 15.4.4 条件变量总结

# 15.4.5 综合案例与思考

下面通过一个"生产者-消费者队列"案例,综合演示条件变量的等待、通知、谓词以及超时等待:

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <string>
#include <chrono>
using namespace std;

// 线程安全的消息队列
template<typename T>
class MessageQueue {
    queue<T> queue_;
    mutex mtx_;
    condition_variable cv_notEmpty_;    // 队列非空时通知消费者
    condition_variable cv_notFull_;     // 队列不满时通知生产者
    size_t maxSize_;
    bool shutdown_ = false;

public:
    MessageQueue(size_t maxSize = 5) : maxSize_(maxSize) {}

    // 生产者:放入消息(队列满时等待)
    bool push(const T& item) {
        unique_lock<mutex> lock(mtx_);
        // 带谓词的 wait:防止虚假唤醒
        cv_notFull_.wait(lock, [this]{
            return queue_.size() < maxSize_ || shutdown_;
        });
        if (shutdown_) return false;
        queue_.push(item);
        cout << "[生产] " << item << " (队列大小: " << queue_.size() << ")" << endl;
        cv_notEmpty_.notify_one();  // 通知一个消费者
        return true;
    }

    // 消费者:取出消息(队列空时等待,支持超时)
    bool pop(T& item, int timeoutMs = 2000) {
        unique_lock<mutex> lock(mtx_);
        // 带超时的等待
        bool hasData = cv_notEmpty_.wait_for(lock,
            chrono::milliseconds(timeoutMs),
            [this]{ return !queue_.empty() || shutdown_; });

        if (!hasData || (queue_.empty() && shutdown_)) return false;
        item = queue_.front();
        queue_.pop();
        cout << "[消费] " << item << " (剩余: " << queue_.size() << ")" << endl;
        cv_notFull_.notify_one();  // 通知一个生产者
        return true;
    }

    void close() {
        lock_guard<mutex> lock(mtx_);
        shutdown_ = true;
        cv_notEmpty_.notify_all();  // 唤醒所有消费者
        cv_notFull_.notify_all();   // 唤醒所有生产者
    }
};

int main() {
    MessageQueue<string> mq(3);  // 最大容量 3

    // 生产者线程
    thread producer([&mq]{
        for (int i = 1; i <= 6; ++i) {
            mq.push("消息" + to_string(i));
            this_thread::sleep_for(chrono::milliseconds(100));
        }
        mq.close();
    });

    // 消费者线程(处理较慢)
    thread consumer1([&mq]{
        string msg;
        while (mq.pop(msg)) {
            this_thread::sleep_for(chrono::milliseconds(300));  // 模拟处理耗时
        }
        cout << "消费者1 退出" << endl;
    });

    thread consumer2([&mq]{
        string msg;
        while (mq.pop(msg)) {
            this_thread::sleep_for(chrono::milliseconds(250));
        }
        cout << "消费者2 退出" << endl;
    });

    producer.join();
    consumer1.join();
    consumer2.join();
    cout << "所有线程结束" << endl;
    return 0;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95

案例知识融合:本案例完整实现了一个有界阻塞队列,综合运用了条件变量的所有核心知识。cv.wait(lock, predicate) 带谓词等待防止虚假唤醒;cv.wait_for 实现超时等待避免消费者永久阻塞;notify_one 唤醒单个等待线程,notify_all 在关闭时唤醒所有线程。双条件变量(cv_notEmpty_ 和 cv_notFull_)分别控制生产者和消费者的同步,队列满时生产者等待、队列空时消费者等待,体现了条件变量需要配合 mutex 的原因。

思考题:

  1. 为什么 wait 必须使用 unique_lock 而不能使用 lock_guard?(提示:wait 内部需要临时释放锁)
  2. 如果 push 中不使用谓词(cv.wait(lock) 而非 cv.wait(lock, pred)),在什么情况下会出现问题?
  3. close() 中为什么要用 notify_all 而不是 notify_one?如果改成 notify_one 会有什么后果?

# 15.5 atomic

# 15.5.1 基本概念

std::atomic 是 C++11 引入的模板类,用于实现原子操作。原子操作是指在多线程环境中,某个操作要么完全执行,要么完全不执行,不会被其他线程中断。

原子操作:原子操作是不可分割的操作,不会被线程调度机制打断。

std::atomic 的作用:提供对基本数据类型(如 int、bool、指针 等)的原子操作。支持原子加载(load)、存储(store)、交换(exchange)、比较交换(compare_exchange)等操作。

# 15.5.2 常用操作

std::atomic 支持以下常见操作:

  • 加载(Load):load() 原子地加载变量的值。
  • 存储(Store):store() 原子地存储值到变量。
  • 交换(Exchange):exchange() 原子地将变量的值替换为新值,并返回旧值。
  • 比较并交换(Compare and Swap, CAS):compare_exchange_weak() 和 compare_exchange_strong() 比较变量的值是否与期望值相等。如果相等,则将变量的值替换为新值;否则,更新期望值为变量的当前值。
  • 原子加减:fetch_add()、fetch_sub()
  • 原子逻辑运算:fetch_and()、fetch_or()、fetch_xor()

std::atomic 通过 std::memory_order 指定内存顺序,以控制原子操作的内存可见性和顺序一致性。常见的内存顺序包括:

  • memory_order_relaxed:最宽松的顺序,只保证原子性。
  • memory_order_acquire:确保当前操作之前的读操作不会被重排序到当前操作之后。
  • memory_order_release:确保当前操作之后的写操作不会被重排序到当前操作之前。
  • memory_order_acq_rel:结合了 acquire 和 release。
  • memory_order_seq_cst:最严格的顺序,保证全局顺序一致性。

# 15.5.3 使用示例

以下是一个使用 std::atomic 的简单示例,展示如何实现线程安全的计数器:

#include <iostream>
#include <atomic>
#include <thread>
#include <vector>

std::atomic<int> counter(0); // 原子计数器

void increment(int n) {
    for (int i = 0; i < n; ++i) {
        counter.fetch_add(1, std::memory_order_relaxed); // 原子加 1
    }
}

int main() {
    const int num_threads = 10;
    const int num_increments = 10000;
    std::vector<std::thread> threads;
    for (int i = 0; i < num_threads; ++i) {
        threads.emplace_back(increment, num_increments);
    }
    for (auto& t : threads) {
        t.join();
    }
    std::cout << "Final counter value: " << counter.load() << std::endl;
    return 0;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

# 15.5.4 核心原理

# 15.5.5 注意事项

(1)性能:std::atomic 的性能通常比互斥锁高,但在高竞争环境下仍可能成为性能瓶颈。

(2)内存顺序:默认情况下,std::atomic 使用 std::memory_order_seq_cst(顺序一致性),性能较低。

(3)适用范围:std::atomic 仅适用于基本数据类型和指针。对于复杂对象,需要使用互斥锁或其他同步机制。

# 15.5.6 综合案例与思考

下面通过一个"无锁任务分配器"案例,综合演示 atomic 的 load/store/exchange/CAS 操作以及内存顺序:

#include <iostream>
#include <thread>
#include <atomic>
#include <vector>
#include <chrono>
using namespace std;

// 无锁任务分配器:使用 CAS 实现无锁的任务 ID 分配
class TaskAllocator {
    atomic<int> nextId_{1};          // 下一个可用 ID
    atomic<int> completedCount_{0};  // 已完成任务数
    atomic<bool> shutdown_{false};   // 停止标志

public:
    // 分配任务 ID(CAS 操作保证唯一性)
    int allocate() {
        int current = nextId_.load(memory_order_relaxed);
        while (!nextId_.compare_exchange_weak(current, current + 1,
                memory_order_release, memory_order_relaxed)) {
            // CAS 失败则重试,current 自动更新为最新值
        }
        return current;
    }

    // 简化版:使用 fetch_add(底层也是原子操作)
    int allocateSimple() {
        return nextId_.fetch_add(1, memory_order_relaxed);
    }

    // 标记完成
    void markComplete() {
        completedCount_.fetch_add(1, memory_order_release);
    }

    // 读取完成数
    int getCompletedCount() const {
        return completedCount_.load(memory_order_acquire);
    }

    // 停止信号
    void stop() { shutdown_.store(true, memory_order_release); }
    bool isStopped() const { return shutdown_.load(memory_order_acquire); }
};

// 自旋锁示例(基于 atomic_flag)
class SpinLock {
    atomic_flag flag_ = ATOMIC_FLAG_INIT;
public:
    void lock() {
        while (flag_.test_and_set(memory_order_acquire)) {
            // 自旋等待
        }
    }
    void unlock() {
        flag_.clear(memory_order_release);
    }
};

int main() {
    TaskAllocator allocator;

    // 测试 1:多线程无锁分配任务 ID
    cout << "=== 无锁任务分配 ===" << endl;
    vector<thread> workers;
    vector<vector<int>> threadIds(4);

    for (int i = 0; i < 4; ++i) {
        workers.emplace_back([&allocator, &threadIds, i]{
            for (int j = 0; j < 5; ++j) {
                int id = allocator.allocateSimple();
                threadIds[i].push_back(id);
                allocator.markComplete();
            }
        });
    }
    for (auto& t : workers) t.join();

    // 验证:所有 ID 唯一
    cout << "分配的 ID:" << endl;
    for (int i = 0; i < 4; ++i) {
        cout << "  线程" << i << ": ";
        for (int id : threadIds[i]) cout << id << " ";
        cout << endl;
    }
    cout << "已完成任务数: " << allocator.getCompletedCount() << endl;

    // 测试 2:自旋锁保护共享资源
    cout << "\n=== 自旋锁 ===" << endl;
    SpinLock spinlock;
    int sharedValue = 0;
    vector<thread> spinWorkers;

    for (int i = 0; i < 4; ++i) {
        spinWorkers.emplace_back([&]{
            for (int j = 0; j < 10000; ++j) {
                spinlock.lock();
                ++sharedValue;
                spinlock.unlock();
            }
        });
    }
    for (auto& t : spinWorkers) t.join();
    cout << "自旋锁保护结果: " << sharedValue << " (期望: 40000)" << endl;

    // 测试 3:exchange 用于状态切换
    cout << "\n=== atomic exchange ===" << endl;
    atomic<int> state(0);
    int old = state.exchange(1);
    cout << "旧状态: " << old << ", 新状态: " << state.load() << endl;

    return 0;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112

案例知识融合:本案例从多个维度展示了 atomic 的实际应用。fetch_add 和 CAS(compare_exchange_weak)实现了无锁的任务 ID 分配,保证多线程下的唯一性而无需互斥锁。atomic_flag 配合 test_and_set 和 clear 实现了自旋锁——最基础的锁原语。exchange 用于原子地状态切换。内存顺序方面,memory_order_relaxed 用于只需原子性不需顺序保证的场景,acquire/release 用于需要跨线程可见性保证的场景。

思考题:

  1. compare_exchange_weak 和 compare_exchange_strong 有什么区别?为什么循环中一般用 weak 版本?
  2. 自旋锁在什么场景下比 mutex 更高效?什么场景下反而更差?
  3. 为什么 completedCount_ 的 fetch_add 使用 memory_order_release,而 getCompletedCount 使用 memory_order_acquire?如果都用 relaxed 会有什么问题?

# 15.6 async和future

# 15.6.1 异步概念

在 C++ 中,std::async 和 std::future 是用于异步编程的重要工具。它们允许你以简单的方式启动异步任务,并在需要时获取任务的结果。

# 15.6.2 核心API

std::async 和 std::future 简介

  1. std::async: 用于启动一个异步任务(即在一个单独的线程中执行函数)。返回一个 std::future 对象,用于获取任务的结果。
  2. std::future: 表示一个异步操作的结果。可以通过 get() 方法获取结果。如果结果尚未准备好,get() 会阻塞当前线程,直到结果可用。
  3. 启动策略:std::launch::async:强制在新线程中执行任务。std::launch::deferred:延迟执行任务,直到调用 get() 或 wait()。默认情况下,std::async 的实现可以选择启动策略。

# 15.6.3 简单案例

以下是一个使用 std::async 和 std::future 的简单案例,计算两个数的和。

#include <iostream>
#include <future>  // 包含 std::async 和 std::future
#include <chrono>  // 用于模拟耗时操作

// 一个简单的函数,计算两个数的和
int add(int a, int b) {
    std::this_thread::sleep_for(std::chrono::seconds(2));  // 模拟耗时操作
    return a + b;
}

int main() {
    // 使用 std::async 启动异步任务
    std::future<int> result = std::async(std::launch::async, add, 10, 20);
    std::cout << "Calculating the sum..." << std::endl;
    // 获取异步任务的结果(如果结果未准备好,会阻塞当前线程)
    int sum = result.get();
    std::cout << "The sum is: " << sum << std::endl;
    return 0;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

代码分析

  1. 异步任务:std::async(std::launch::async, add, 10, 20) 启动一个异步任务,调用 add(10, 20)。 任务会在一个新线程中执行。
  2. std::future:std::future<int> result 用于存储异步任务的结果。result.get() 获取任务的结果。如果任务尚未完成,当前线程会阻塞,直到结果可用。
  3. 模拟耗时操作:std::this_thread::sleep_for(std::chrono::seconds(2)) 模拟一个耗时操作。
  4. 输出结果:程序首先输出 "Calculating the sum...",然后等待 2 秒,最后输出 "The sum is: 30"。

输出结果

Calculating the sum...
The sum is: 30
1
2

关键点

  1. 启动策略: 使用 std::launch::async 确保任务在新线程中执行。 使用 std::launch::deferred 可以延迟执行任务,直到调用 get() 或 wait()。
  2. get() 方法:get() 只能调用一次,调用后 std::future 对象变为无效。如果需要多次获取结果,可以使用 std::shared_future。
  3. 异常处理:如果异步任务抛出异常,get() 会重新抛出该异常。

# 15.6.4 多个异步任务

以下是一个扩展案例,展示如何使用 std::async 启动多个异步任务并获取它们的结果。

#include <iostream>
#include <future>
#include <vector>

// 计算一个数的平方
int square(int x) {
    return x * x;
}

int main() {
    std::vector<std::future<int>> futures;

    // 启动多个异步任务
    for (int i = 1; i <= 5; ++i) {
        futures.push_back(std::async(std::launch::async, square, i));
    }

    // 获取所有任务的结果
    for (auto& future : futures) {
        std::cout << "Result: " << future.get() << std::endl;
    }

    return 0;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

输出结果

Result: 1
Result: 4
Result: 9
Result: 16
Result: 25
1
2
3
4
5

# 15.7 shared_mutex

# 15.7.1 读写锁机制

std::shared_mutex 是 C++17 引入的读写锁机制,允许多个线程同时读取共享资源,但只允许一个线程写入资源。

# 15.7.2 简单案例

以下是一个使用 std::shared_mutex 的案例,展示如何实现读写锁。

案例:读写共享数据:多个线程可以同时读取共享数据。只有一个线程可以写入共享数据,且写入时不允许其他线程读取或写入。

#include <iostream>
#include <thread>
#include <shared_mutex> // 包含 std::shared_mutex
#include <vector>

std::shared_mutex rw_mutex; // 读写锁
int shared_data = 0;        // 共享数据

// 读线程函数
void read_data(int id) {
    std::shared_lock<std::shared_mutex> lock(rw_mutex); // 共享锁(允许多个读线程)
    std::cout << "Reader " << id << " reads data: " << shared_data << std::endl;
    std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 模拟读取操作
}

// 写线程函数
void write_data(int id) {
    std::unique_lock<std::shared_mutex> lock(rw_mutex); // 独占锁(只允许一个写线程)
    ++shared_data;
    std::cout << "Writer " << id << " writes data: " << shared_data << std::endl;
    std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 模拟写入操作
}

int main() {
    std::vector<std::thread> readers;
    std::vector<std::thread> writers;

    // 创建 5 个读线程
    for (int i = 0; i < 5; ++i) {
        readers.emplace_back(read_data, i);
    }

    // 创建 2 个写线程
    for (int i = 0; i < 2; ++i) {
        writers.emplace_back(write_data, i);
    }

    // 等待所有读线程完成
    for (auto& reader : readers) {
        reader.join();
    }

    // 等待所有写线程完成
    for (auto& writer : writers) {
        writer.join();
    }

    return 0;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
  1. std::shared_mutex:用于实现读写锁,支持共享锁(std::shared_lock)和独占锁(std::unique_lock)。
  2. 读线程:使用 std::shared_lock<std::shared_mutex> 获取共享锁,允许多个读线程同时访问共享数据。输出当前共享数据的值。
  3. 写线程:使用 std::unique_lock<std::shared_mutex> 获取独占锁,确保只有一个写线程可以修改共享数据。修改共享数据并输出新值。
  4. 线程创建与同步:创建 5 个读线程和 2 个写线程。使用 join() 等待所有线程完成。

输出结果可能因线程调度顺序而有所不同,但大致如下:

Reader 0 reads data: 0
Reader 1 reads data: 0
Reader 2 reads data: 0
Reader 3 reads data: 0
Reader 4 reads data: 0
Writer 0 writes data: 1
Writer 1 writes data: 2
1
2
3
4
5
6
7

# 15.7.3 应用场景

扩展:读写锁的应用场景

  1. 缓存系统:多个线程可以同时读取缓存数据,但只有一个线程可以更新缓存。
  2. 配置文件管理:多个线程可以同时读取配置文件,但只有一个线程可以修改配置。
  3. 数据库连接池:多个线程可以同时获取数据库连接,但只有一个线程可以修改连接池状态。

# 15.7.4 实现原理

# 15.7.5 综合案例与思考

下面通过一个"线程安全的配置中心"案例,综合演示 shared_mutex 实现读写分离的高性能并发访问:

#include <iostream>
#include <thread>
#include <shared_mutex>
#include <map>
#include <string>
#include <vector>
#include <chrono>
using namespace std;

// 线程安全的配置中心:读多写少场景
class ConfigCenter {
    map<string, string> configs_;
    mutable shared_mutex rwMutex_;   // 读写锁
    int readCount_ = 0;             // 统计读次数
    int writeCount_ = 0;            // 统计写次数

public:
    // 读操作:共享锁,多个线程可以同时读
    string get(const string& key) const {
        shared_lock<shared_mutex> lock(rwMutex_);
        auto it = configs_.find(key);
        return (it != configs_.end()) ? it->second : "";
    }

    // 写操作:独占锁,同一时间只能有一个写
    void set(const string& key, const string& value) {
        unique_lock<shared_mutex> lock(rwMutex_);
        configs_[key] = value;
    }

    // 批量读取(共享锁)
    map<string, string> getAll() const {
        shared_lock<shared_mutex> lock(rwMutex_);
        return configs_;  // 返回副本
    }

    // 批量更新(独占锁)
    void update(const map<string, string>& newConfigs) {
        unique_lock<shared_mutex> lock(rwMutex_);
        for (const auto& [key, value] : newConfigs) {
            configs_[key] = value;
        }
    }

    size_t size() const {
        shared_lock<shared_mutex> lock(rwMutex_);
        return configs_.size();
    }
};

int main() {
    ConfigCenter config;
    // 初始化配置
    config.set("host", "localhost");
    config.set("port", "8080");
    config.set("timeout", "30");

    atomic<int> totalReads(0), totalWrites(0);

    auto start = chrono::high_resolution_clock::now();

    // 创建多个读线程(高频读取)
    vector<thread> readers;
    for (int i = 0; i < 5; ++i) {
        readers.emplace_back([&config, &totalReads, i]{
            for (int j = 0; j < 1000; ++j) {
                string val = config.get("port");
                totalReads.fetch_add(1);
            }
            cout << "读线程" << i << " 完成" << endl;
        });
    }

    // 创建少量写线程(低频更新)
    vector<thread> writers;
    for (int i = 0; i < 2; ++i) {
        writers.emplace_back([&config, &totalWrites, i]{
            for (int j = 0; j < 100; ++j) {
                config.set("counter_" + to_string(i),
                           to_string(j));
                totalWrites.fetch_add(1);
                this_thread::sleep_for(chrono::microseconds(100));
            }
            cout << "写线程" << i << " 完成" << endl;
        });
    }

    for (auto& t : readers) t.join();
    for (auto& t : writers) t.join();

    auto end = chrono::high_resolution_clock::now();
    auto ms = chrono::duration_cast<chrono::milliseconds>(end - start).count();

    cout << "\n总读次数: " << totalReads.load()
         << ", 总写次数: " << totalWrites.load()
         << ", 配置项数: " << config.size()
         << ", 耗时: " << ms << "ms" << endl;

    // 打印最终配置
    auto all = config.getAll();
    cout << "最终配置:" << endl;
    for (const auto& [k, v] : all) {
        cout << "  " << k << " = " << v << endl;
    }
    return 0;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106

案例知识融合:本案例实现了一个典型的"读多写少"配置中心,完美展示了 shared_mutex 的核心价值。shared_lock 允许 5 个读线程同时持有锁并发读取,而 unique_lock 确保写操作的排他性。相比普通 mutex(每次只允许一个线程访问),shared_mutex 在读多写少场景下能显著提升吞吐量。mutable 修饰 rwMutex_ 使得 const 方法中也能加锁,保证了接口设计的语义正确性。

思考题:

  1. 如果把所有 shared_lock 都换成 unique_lock(即所有操作都用独占锁),性能会有什么变化?在读写比为 50:1 的场景下差距有多大?
  2. 写操作执行时,正在读的线程会怎样?新的读请求会怎样?这体现了 shared_mutex 的什么特性?
  3. shared_mutex 是否存在写饥饿问题(即大量读操作导致写操作长时间无法获取锁)?如何解决?

# 15.9 综合案例

# 15.9.1 死锁与避免

死锁是指多个线程互相等待对方释放锁,导致程序无法继续执行。

死锁示例

std::mutex mtx1, mtx2;

void thread1() {
    mtx1.lock();
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
    mtx2.lock(); // 等待 mtx2
    mtx2.unlock();
    mtx1.unlock();
}

void thread2() {
    mtx2.lock();
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
    mtx1.lock(); // 等待 mtx1
    mtx1.unlock();
    mtx2.unlock();
}

int main() {
    std::thread t1(thread1);
    std::thread t2(thread2);
    t1.join();
    t2.join();
    return 0;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

避免死锁

按固定顺序加锁。使用 std::lock() 同时锁定多个互斥锁。示例:

void thread1() {
    std::lock(mtx1, mtx2); // 同时锁定
    std::lock_guard<std::mutex> lock1(mtx1, std::adopt_lock);
    std::lock_guard<std::mutex> lock2(mtx2, std::adopt_lock);
    // 操作共享资源
}

void thread2() {
    std::lock(mtx1, mtx2); // 同时锁定
    std::lock_guard<std::mutex> lock1(mtx1, std::adopt_lock);
    std::lock_guard<std::mutex> lock2(mtx2, std::adopt_lock);
    // 操作共享资源
}
1
2
3
4
5
6
7
8
9
10
11
12
13

# 15.9.2 延迟任务

方法 特点
std::this_thread::sleep_for 简单易用,但会阻塞当前线程。
std::async 和 std::future 异步执行任务,不阻塞当前线程。
Boost.Asio 定时器 适合复杂定时任务,需要外部库支持。
std::thread 手动控制线程, C++ 中,延迟执行任务可以通过以下方式实现

第一种:sleep_for

std::this_thread::sleep_for 是 C++11 引入的标准库函数,用于让当前线程休眠指定的时间。

#include <chrono>  // 包含时间库
#include <thread>  // 包含线程库

// 延迟 300 毫秒
std::this_thread::sleep_for(std::chrono::milliseconds(300));
1
2
3
4
5

第二种:std::async

如果需要延迟执行任务而不阻塞当前线程,可以使用 std::async 和 std::future。

#include <iostream>
#include <chrono>
#include <thread>
#include <future>  // 包含异步任务库

void delayedTask() {
    std::this_thread::sleep_for(std::chrono::milliseconds(300));
    std::cout << "Task executed after 300ms." << std::endl;
}

int main() {
    std::cout << "Task scheduled." << std::endl;
    // 使用 std::async 异步执行任务
    auto future = std::async(std::launch::async, delayedTask);
    // 主线程继续执行其他任务
    std::cout << "Main thread is doing other work..." << std::endl;
    // 等待异步任务完成
    future.wait();
    std::cout << "Main thread finished." << std::endl;
    return 0;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

执行结果如下所示:

Task scheduled.
Main thread is doing other work...
Task executed after 3000ms.
Main thread finished.
1
2
3
4

# 15.9.3 轮训定时器

# 15.9.4 提前唤醒休眠

std::this_thread::sleep_for(10*1000),时间还未到,如何停止sleep操作。

比如有一个需求:调起系统设置后,继续监听网络,连上后关闭;设置一个超时时间10秒,如超时内还未联网也关闭。

在 C++ 多线程编程中,std::this_thread::sleep_for是一个阻塞调用,它会暂停当前线程的执行直到指定的时间结束。但有时我们需要提前唤醒休眠的线程

为什么需要中断休眠?std::this_thread::sleep_for的阻塞特性可能导致以下问题:

  1. 程序无法响应外部事件(如用户取消操作)
  2. 资源无法及时释放
  3. 系统资源浪费(线程被无意义地挂起)
  4. 紧急任务无法及时执行

一、条件变量法(推荐):这是最标准、最安全的跨平台解决方案。

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <chrono>

class InterruptibleSleep {
public:
    void sleep_for(std::chrono::milliseconds duration) {
        std::unique_lock<std::mutex> lock(m_mutex);
        // 等待条件变量或超时
        m_cv.wait_for(lock, duration, [this] { 
            return m_interrupted; 
        });
    }

    void interrupt() {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_interrupted = true;
        }
        m_cv.notify_all();
    }

    void reset() {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_interrupted = false;
    }

private:
    std::mutex m_mutex;
    std::condition_variable m_cv;
    bool m_interrupted = false;
};

// 使用示例
int main() {
    InterruptibleSleep sleeper;
    
    std::thread worker([&] {
        std::cout << "Worker: 开始休眠10秒\n";
        sleeper.sleep_for(std::chrono::seconds(10));
        std::cout << "Worker: 休眠结束\n";
    });
    
    // 主线程等待2秒后中断休眠
    std::this_thread::sleep_for(std::chrono::seconds(2));
    std::cout << "Main: 中断worker的休眠\n";
    sleeper.interrupt();
    
    worker.join();
    return 0;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53

# 15.10 线程和锁底层原理

# 15.10.1 线程的内核实现

std::thread在Linux上底层调用pthread_create,而pthread最终调用clone()系统调用:

C++层:    std::thread t(func)
POSIX层:  pthread_create(&tid, attr, func, arg)
内核层:   clone(CLONE_VM | CLONE_FS | CLONE_FILES | CLONE_SIGHAND | ...)
1
2
3

clone()的关键参数CLONE_VM表示父子线程共享同一个虚拟地址空间。这就是为什么线程之间可以直接访问对方的变量——它们看到的是同一块内存。

线程 vs 进程:

  • 进程通过fork()创建,拥有独立的地址空间(写时复制)
  • 线程通过clone(CLONE_VM)创建,共享地址空间
  • 在Linux内核中,线程和进程都是"任务(task_struct)",调度器不区分二者

每个线程有独立的栈:

进程地址空间:
┌──────────────┐
│ 主线程栈(8MB)  │ ← 默认栈大小
├──────────────┤
│  保护页(4KB)   │ ← 栈溢出检测
├──────────────┤
│ 线程1栈(8MB)  │
├──────────────┤
│  保护页(4KB)   │
├──────────────┤
│ 线程2栈(8MB)  │
├──────────────┤
│   ...        │
│   堆(共享)    │ ← 所有线程共享
└──────────────┘
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

# 15.10.2 mutex的底层机制

std::mutex在Linux上底层使用futex(Fast Userspace muTEX)系统调用:

mutex m;
m.lock();    // 底层过程:
1
2
  1. 快速路径(无竞争):用原子操作CAS(Compare-And-Swap)尝试将锁状态从0改为1。成功则立即返回,不涉及内核——零系统调用开销
  2. 慢速路径(有竞争):CAS失败说明锁被占用,调用futex(FUTEX_WAIT)进入内核,线程被挂起到等待队列
  3. 解锁时:将状态改为0,如果有等待的线程则调用futex(FUTEX_WAKE)唤醒一个
无竞争:lock() → CAS成功 → 返回(几纳秒)
有竞争:lock() → CAS失败 → futex(WAIT) → 内核挂起线程(微秒级)
1
2

自旋锁 vs 互斥锁:

  • 自旋锁:CAS失败后循环重试,不进入内核。适合临界区极短的场景(几十条指令)
  • 互斥锁:CAS失败后挂起线程。适合临界区较长的场景
  • 实际的std::mutex通常先自旋几次,再退化为内核等待(适应性自旋)

# 15.10.3 原子操作与内存序

原子操作的硬件实现:

atomic<int> counter = 0;
counter++;  // 底层:lock xadd [addr], 1
1
2

x86上原子递增使用LOCK前缀指令。LOCK会锁定总线(或使用缓存锁),保证对内存的读-修改-写在一个不可中断的操作中完成。

内存序(Memory Ordering):

现代CPU为了性能会乱序执行和延迟写入。内存序控制原子操作的可见性:

内存序 说明 开销
relaxed 只保证原子性,不保证顺序 最低
acquire 加载后的操作不会被重排到此操作之前 低
release 此操作之前的操作不会被重排到之后 低
seq_cst 全序一致,所有线程看到相同的操作顺序 最高(默认)
// 经典的acquire-release模式
atomic<bool> ready{false};
int data = 0;

// 线程1(生产者)
data = 42;                                    // 普通写
ready.store(true, memory_order_release);      // release写

// 线程2(消费者)
while (!ready.load(memory_order_acquire));    // acquire读
cout << data;  // 保证看到42
1
2
3
4
5
6
7
8
9
10
11

release保证它之前的写操作对acquire之后的读操作可见。这就是happens-before关系。

# 15.11 线程和锁训练题

训练题1:线程安全的消息队列

#include <iostream>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <string>
using namespace std;

template<typename T>
class ThreadSafeQueue {
    queue<T> queue_;
    mutex mutex_;
    condition_variable cv_;

public:
    void push(const T& item) {
        {
            lock_guard<mutex> lock(mutex_);
            queue_.push(item);
        }
        cv_.notify_one();
    }

    T pop() {
        unique_lock<mutex> lock(mutex_);
        cv_.wait(lock, [this] { return !queue_.empty(); });
        T item = queue_.front();
        queue_.pop();
        return item;
    }

    bool empty() const {
        lock_guard<mutex> lock(mutex_);
        return queue_.empty();
    }
};

int main() {
    ThreadSafeQueue<string> mq;

    // 生产者
    thread producer([&mq] {
        for (int i = 0; i < 5; ++i) {
            string msg = "消息" + to_string(i);
            mq.push(msg);
            cout << "[生产] " << msg << endl;
            this_thread::sleep_for(chrono::milliseconds(100));
        }
        mq.push("EXIT");
    });

    // 消费者
    thread consumer([&mq] {
        while (true) {
            string msg = mq.pop();
            if (msg == "EXIT") break;
            cout << "[消费] " << msg << endl;
        }
    });

    producer.join();
    consumer.join();
    return 0;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64

练习重点:mutex+condition_variable实现生产者-消费者模式、lock_guard自动解锁、wait的谓词防止虚假唤醒。


训练题2:用atomic实现无锁计数器

#include <iostream>
#include <thread>
#include <atomic>
#include <vector>
using namespace std;

class AtomicCounter {
    atomic<long long> count_{0};
public:
    void increment() { count_.fetch_add(1, memory_order_relaxed); }
    long long get() const { return count_.load(memory_order_relaxed); }
};

int main() {
    AtomicCounter counter;
    const int numThreads = 8;
    const int incrementsPerThread = 1000000;

    vector<thread> threads;
    for (int i = 0; i < numThreads; ++i) {
        threads.emplace_back([&counter, incrementsPerThread] {
            for (int j = 0; j < incrementsPerThread; ++j) {
                counter.increment();
            }
        });
    }
    for (auto& t : threads) t.join();

    cout << "期望值: " << (long long)numThreads * incrementsPerThread << endl;
    cout << "实际值: " << counter.get() << endl;
    cout << "匹配? " << (counter.get() == (long long)numThreads * incrementsPerThread ? "是" : "否") << endl;

    return 0;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34

练习重点:atomic无锁编程、memory_order_relaxed的使用场景、对比mutex方案的性能。

# 15.12 综合思考题

  1. 锁的粒度:粗粒度锁(一把大锁保护所有数据)简单但并发度低;细粒度锁(每个数据一把锁)并发度高但容易死锁。Java的ConcurrentHashMap使用分段锁,请思考如何在C++中实现类似的设计。

  2. 无锁数据结构:无锁编程使用CAS操作避免mutex的开销,但设计复杂且容易出现ABA问题。请解释什么是ABA问题?std::atomic的compare_exchange_weak和compare_exchange_strong有什么区别?为什么无锁队列比无锁栈更难实现?

  3. C++20协程:co_await/co_yield/co_return引入了用户态调度的轻量级线程。协程与系统线程的本质区别是什么?为什么协程的上下文切换开销远小于线程?在高并发IO场景中,协程相比线程池有什么优势?

  4. false sharing(伪共享):两个线程修改的变量如果在同一个缓存行(通常64字节)中,会导致缓存行频繁在CPU核心间传递。请思考如何通过alignas(64)对齐来避免伪共享?std::hardware_destructive_interference_size是什么?

上次更新: 2026/06/10, 11:13:41
异常处理
STL模版

← 异常处理 STL模版→

最近更新
01
信号崩溃快速排查
06-15
02
CoreDump破案
06-15
03
perf火焰图实战
06-15
更多文章>
Theme by Vdoing | Copyright © 2019-2026 杨充 | MIT License | 桂ICP备2024034950号 | 桂公网安备45142202000030
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式