异步编程future家族
# 45.异步编程future家族
# 目录介绍
- 1. 案例引入
- 2. 架构概览
- 3. promise:值的生产者
- 4. future:值的消费者
- 5. packaged_task:可调对象的包装器
- 6. std::async 的启动策略陷阱——最容易被误解的接口
- 7. shared_future:广播语义的消费者
- 8. 共享状态的内部原子机制——promise 如何把值送到 future
- 9. 常见陷阱与反模式
- 10. 综合案例串讲
# 1. 案例引入
# 1.1 async 用 deferred 策略——主线程偷偷被阻塞
某 RPC 网关用 std::async 并行查询三个下游服务,然后聚合结果。在测试环境一切正常——但上线后在低流量时段 P99 延迟异常:
// ====== 事故代码 V1:deferred 策略偷偷把异步变同步 ======
std::vector<std::future<int>> futures;
futures.push_back(std::async(query_service_A)); // ①
futures.push_back(std::async(query_service_B)); // ②
futures.push_back(std::async(query_service_C)); // ③
// 预期:三个任务并行执行——总耗时 ≈ max(Ta, Tb, Tc)
// 现实:第 ③ 个任务在 get 时才执行(deferred)——总耗时 ≈ Ta + Tb + Tc!
for (auto& f : futures) {
int result = f.get(); // ④ 如果任务是 deferred——get 时才执行!
// query_service_C 在主线程中运行——和 B 一起阻塞了主线程
}
2
3
4
5
6
7
8
9
10
11
12
13
14
根因色谱:
默认策略 = std::launch::async | std::launch::deferred
这给了编译器两个选择——可以异步、可以同步。
编译器凭什么选?线程资源的可用性、系统负载、黑魔法。
在低流量时段:
系统有空闲 CPU 核心——编译器选了 async 跑 A 和 B
但创建 C 的时候——线程池瞬时已满(两个线程占着)
→ 编译器选 deferred → C 在 get() 时才执行!
结果:
预期并行时间:max(Ta, Tb, Tc) ≈ 30ms
实际时间: Ta + Tb + Tc ≈ 90ms(3× 慢)
2
3
4
5
6
7
8
9
10
11
12
13
核心教训:std::async 的默认策略是不确定的。它在不同负载、不同编译器、不同系统上可能给出不同的行为——这是设计上的「灵活性」,也是工程上的「定时炸弹」。
# 1.2 promise 提前析构——broken_promise 的无声崩溃
同一个网关的监控模块用 promise/future 做异步健康检查:
// ====== 事故代码 V2:promise 析构 → broken_promise ======
std::future<bool> check_health_async() {
std::promise<bool> promise; // ① promise 在栈上
auto future = promise.get_future(); // ② 获取关联的 future
std::thread([p = std::move(promise)]() mutable {
std::this_thread::sleep_for(std::chrono::seconds(1));
p.set_value(do_actual_check()); // ④ 在线程里设值
}).detach();
return future; // ③ future 返回给调用方
// ❌ 问题:promise 在 ① 创建、在 ② 被 move 到线程里
// 但如果 ② 之前发生异常——promise 在栈上析构
// → future 的 get 将抛出 std::future_error(broken_promise)
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
更深层的版本——promise 在所有 future 之前析构:
void bad() {
std::promise<int> p;
auto f = p.get_future();
// 模拟复杂的异步调度——异常路径
try {
do_risky_setup(p); // ⚠️ 如果这里抛异常——p 在 catch 后析构
} catch (...) {
return; // p 析构——但 f 还在外面等待!
}
p.set_value(42);
} // f.get() 在等待一个已死的 promise → broken_promise 异常
2
3
4
5
6
7
8
9
10
11
12
13
# 1.3 八个待解疑问
① promise 怎么把值安全地跨线程传给 future?不需要 mutex 吗? → 第 3 / 第 8 章
② future::get 为什么只能调一次?调第二次会怎样? → 第 4 章
③ packaged_task 和直接 promise 有什么区别?什么时候该用哪个? → 第 5 章
④ std::async 的 deferred 策略什么时候会被触发?怎么防止被 defer? → 第 6 章
⑤ shared_future 和 future 的区别是什么?为什么需要显式 share? → 第 7 章
⑥ promise/future 之间的共享状态是什么样的?内部有几个原子变量? → 第 8 章
⑦ async 比裸 thread 快吗?什么时候该用 async、什么时候用 thread? → 第 6.4 / 第 10 章
⑧ 多个 future 怎么并行等待?有类似 select/poll 的机制吗? → 第 9.5 / 第 10 章
2
3
4
5
6
7
8
# 2. 架构概览
# 2.1 三件套的职责划分
┌──────────────────────────────────────────────────────────────────┐
│ future 三件套架构 │
│ │
│ ┌─────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ promise │ │ packaged_ │ │ std:: │ │
│ │ (纯值入口) │ │ task │ │ async │ │
│ │ │ │ (可调对象包装)│ │ (一键并行) │ │
│ │ set_value() │ │ operator()() │ │ async(f,args)│ │
│ │ set_excep() │ │ get_future() │ │ → future │ │
│ └──────┬──────┘ └──────┬───────┘ └──────┬───────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ shared state (堆上,原子同步) │ │
│ │ ┌────────────┬────────────┬────────────┬───────────┐ │ │
│ │ │ 状态标志 │ 值 / 异常 │ ready cv │ 引用计数 │ │ │
│ │ │ (atomic) │ (aligned) │ (futex) │ (atomic) │ │ │
│ │ └────────────┴────────────┴────────────┴───────────┘ │ │
│ └──────────────────────────┬───────────────────────────────┘ │
│ │ │
│ ┌────────────────────┴───────────────────────┐ │
│ ▼ ▼ │
│ ┌──────────────┐ ┌──────────────┐ │
│ │ future<T> │ │ shared_ │ │
│ │ (唯一消费者) │ │ future<T> │ │
│ │ │ │ (广播消费者) │ │
│ │ get() ← 移动 │ │ get() ← 拷贝 │ │
│ │ 只能调一次 │ │ 可以调多次 │ │
│ └──────────────┘ └──────────────┘ │
└──────────────────────────────────────────────────────────────────┘
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# 2.2 为何这么切
疑惑:为什么需要 promise/future/packaged_task/async 四个不同的概念?一个统一的「异步结果」不够吗?
论证——四种接口对应四种不同的「谁生产值」:
① promise = 「我愿意手动设值」——最灵活
你显式调 set_value / set_exception
适用于复杂的异步调度——值在任意时刻从任意线程产生
② packaged_task = 「我有一个现成的函数——帮我包装」——最简单
包装一个可调对象——内部自动创建 promise
调 operator() → 执行函数 → 结果自动进 shared state → 自动 set
③ std::async = 「帮我决定怎么跑」——最省事
你给函数——框架决定线程策略(async/deferred)
不保证在新线程中执行——但保证返回 future
④ future = 「我是值的消费者」——唯一入口
不管值从 promise/packaged_task/async 哪个来——future 统一接收
2
3
4
5
6
7
8
9
10
11
12
13
14
结论:四个概念不是冗余——是「生产者」的三条不同路径和「消费者」的唯一入口。这和 Unix 的设计哲学一致——做一件事并做好它。promise 只管设值、future 只管取值、packaged_task 只管包装、async 只管调度——互不干扰。
# 3. promise:值的生产者
# 3.1 set_value 与 shared state 的原子传递
std::promise<int> p;
auto f = p.get_future();
// 在另一个线程:
p.set_value(42); // ① 把值写入 shared state
// ② 原子设置"就绪"标志
// ③ 唤醒所有在 future::get() 上等待的线程(通过 condition_variable)
// 在主线程:
int result = f.get(); // ④ 等待就绪标志 → 拿值(移动语义)
2
3
4
5
6
7
8
9
10
set_value 不能重复调用——第二次调用的行为:
p.set_value(42);
p.set_value(43); // ❌ std::future_error(promise_already_satisfied)
// shared state 的状态机:empty → has_value → has_value(第二次非法)
2
3
关键理解——值存在哪里? shared state 在堆上。它是一个独立于 promise 和 future 的对象——由 shared_ptr 管理。这保证了即使 promise 先析构、future 在之后 get 时仍然可以拿到值。
# 3.2 set_exception 与异常的跨线程传播路径
void worker(std::promise<int> p) {
try {
int r = risky_computation();
p.set_value(r);
} catch (...) {
p.set_exception(std::current_exception()); // ① 捕获当前异常
}
}
// 调用方:
auto f = p.get_future();
try {
int r = f.get(); // ② 如果 worker 抛异常——这里重新抛出!
} catch (const std::exception& e) {
// ③ 原始异常——不是包装后的——完整类型信息保留
std::cerr << e.what() << '\n';
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
内部机制——std::exception_ptr 的类型擦除:
① worker 中:std::current_exception() → 捕获异常对象 → 类型擦除到 exception_ptr
② set_exception(ptr) → ptr 被存储到 shared state(引用计数管理)
③ future::get() → 检查 shared state 有值还是异常
→ 有异常 → std::rethrow_exception(ptr)
→ 原始异常类型被完好保留——不是 std::exception 的切片!
2
3
4
5
# 3.3 set_value_at_thread_exit——延迟到线程结束的通知
void worker(std::promise<int> p) {
thread_local LogBuffer buffer; // TLS 日志缓冲区
int result = compute();
// ❌ p.set_value(result); // 如果此时线程结束——buffer 先析构
// → 日志丢失
// ✅ p.set_value_at_thread_exit(result);
// 通知延迟到「所有 thread_local 对象析构之后」
// → buffer 已被 flush → 日志完整
}
2
3
4
5
6
7
8
9
10
11
12
实现原理:set_value_at_thread_exit 把值存进 shared state,但不立即通知 future。线程退出时、所有 thread_local 析构后——再做 ready 通知。这是 TLS 和 future 在两个生命周期边界上的协调。
# 3.4 promise 的生命周期约束——为什么必须比 future 活得久
疑惑:promise 析构时如果还没设值——为什么 future::get 会抛 broken_promise?
论证——shared state 的状态机:
shared state 的生命周期由引用计数管理(一个 promise + 所有 future)
promise 析构路径:
if (state 有值或异常) → 什么都不做(已满足)
else → 标记 state 为 broken
→ 如果 shared state 的引用计数只剩 future → 唤醒所有等待者
future::get 发现 broken 状态 → 抛 std::future_error(broken_promise)
2
3
4
5
6
7
8
为什么选择抛异常而不是阻塞:如果 promise 已经析构——再也不会有人设值了。继续阻塞 = 永远卡住。broken_promise 异常是「这个等待永远不会有结果」的宣告——比无限阻塞好一万倍。
# 4. future:值的消费者
# 4.1 get 的一次性语义——内部的 std::move 与 use-after-get 陷阱
std::future<int> f = std::async([] { return 42; });
int a = f.get(); // ✅ 第一次——拿值(移动语义)
int b = f.get(); // ❌ std::future_error(no_state)——没有共享状态了!
// 为什么是一次性?——内部实现
// future::get() ≈ {
// wait(); // 等就绪
// return std::move(value); // 移走值——不是拷贝!
// }
// 移动后 value 为空 → 第二次 get 时 value 已无效 → 抛异常
2
3
4
5
6
7
8
9
10
为什么用移动而不是拷贝:future 的设计假设——值只需要消费一次。如果值是 std::vector<int>(1000000),拷贝意味着巨大的无谓开销。多次消费的需求由 shared_future 满足(第 7 章)——future 只做最简单的事。
# 4.2 wait / wait_for / wait_until 三级等待模型
std::future<int> f = std::async(heavy_computation);
// ① wait —— 无限等待
f.wait(); // 阻塞直到有结果
// ② wait_for —— 超时等待
auto status = f.wait_for(std::chrono::seconds(5));
if (status == std::future_status::ready) {
int r = f.get();
} else if (status == std::future_status::timeout) {
// 超时——可以继续干别的
} else if (status == std::future_status::deferred) {
// ⚠️ 如果是 deferred——永远返回 deferred(第 6 章)
}
// ③ wait_until —— 绝对时间等待(更适合实时系统)
auto deadline = std::chrono::steady_clock::now() + std::chrono::milliseconds(100);
if (f.wait_until(deadline) == std::future_status::ready) {
int r = f.get();
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
内部 wait 怎么实现的——shared state 中的 condition_variable:
shared state 内部有一个 condition_variable(或等价的原生 futex)
future::wait() = {
lock(mutex)
while (!ready) cv.wait(mutex) // 等待 promise set_value 时的 notify
unlock(mutex)
}
2
3
4
5
6
7
# 4.3 valid——future 的「活着」检查——但只应该用于调试
std::future<int> f;
assert(f.valid() == false); // 默认构造——无关联
f = std::async([] { return 42; });
assert(f.valid() == true); // 关联了 shared state
int r = f.get();
assert(f.valid() == false); // get 之后——共享状态释放
2
3
4
5
6
7
8
为什么 valid 只应该用于调试——在多线程环境中,valid() 的返回值可能在返回的瞬间就失效(另一个线程调了 get)。这不是同步原语——是开发时的诊断工具。
# 5. packaged_task:可调对象的包装器
# 5.1 把任意可调对象变成 promise + future 对
// 手写版——繁琐
std::promise<int> p;
auto f = p.get_future();
std::thread([p = std::move(p)](int x) mutable {
p.set_value(x * x);
}, 10).detach();
// packaged_task 版——清晰
std::packaged_task<int(int)> task([](int x) { return x * x; });
auto f = task.get_future();
std::thread([t = std::move(task)]() mutable {
t(10); // ① 调可调对象 → 内部自动 set_value(100)
}).detach();
int result = f.get(); // ② 100
2
3
4
5
6
7
8
9
10
11
12
13
14
15
内部机制——packaged_task 是一个三明治:
packaged_task<R(Args...)> 内部:
外层:operator()(Args... args)
↓
中间:执行存放的可调对象 → 获得结果(或捕获异常)
↓
内层:promise.set_value(result) 或 promise.set_exception(e)
↓
shared state → future 端可读取
2
3
4
5
6
7
8
9
# 5.2 与 thread 的组合——比手写 promise 更安全
// ✅ 标准范式——packaged_task + thread
std::packaged_task<int()> task(worker);
auto f = task.get_future();
std::thread t(std::move(task)); // 移动 task 到线程
t.detach(); // 线程自主管理
int result = f.get();
// 这个范式比手写 promise 安全——因为:
// ① task 内部自动处理异常——worker 抛异常 → f.get 重新抛出
// ② task 保证最多设一次值——不会重复 set_value
// ③ task 移动之后源对象为空——不会误用
2
3
4
5
6
7
8
9
10
11
# 5.3 移动语义——packaged_task 只能移动不能拷贝的理由
疑惑:为什么不能拷贝?lambda 都可以拷贝啊?
论证——内部有 promise 成员(promise 只能移动):
// packaged_task 内部——简化版
template <typename R, typename... Args>
class packaged_task<R(Args...)> {
std::promise<R> promise_; // ← promise 只能移动
std::function<R(Args...)> func_; // ← 可调对象(可以拷贝)
// 因为 promise_ 不能拷贝 → packaged_task 也不能拷贝
};
2
3
4
5
6
7
为什么不用 shared_ptrpromise 意味着两个 packaged_task 对象引用同一个 shared state——谁负责 set_value?如果一个设了值另一个试图再设——抛异常。拷贝语义在「只能设一次值」的语境下不存在合理的定义——移动是唯一正确的语义。
# 6. std::async 的启动策略陷阱——最容易被误解的接口
# 6.1 launch::async vs launch::deferred 的根本差异
launch::async:
"请一定在新线程中执行"
→ 内部创建 std::thread → 执行任务 → 结果进 future
→ f.get() 时等待线程结束——但如果线程已结束则立即返回
launch::deferred:
"别急着跑——等调用方需要时再说"
→ 任务被存储为惰性求值(lazy evaluation)
→ f.get() 才真正在当前线程执行!
→ 如果在 f.get() 之前 future 析构了——任务永远不会跑
launch::async | launch::deferred (默认):
"你看着办——哪个方便用哪个"
→ 标准库的实现自由选择
→ GCC: 优先用 async(如果线程资源允许)
→ MSVC: 可能选 deferred(线程池已满时)
→ 行为不确定——依赖系统负载、线程池状态、编译器版本
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 6.2 默认策略 launch::async | launch::deferred——编译器替你选的灾难
案例 1.1 的完整再现——为什么默认策略是暗坑:
// ❌ 这段代码的行为取决于编译器和系统负载
auto f = std::async([] { return worker(); });
// ↑ 等价于 std::async(std::launch::async | std::launch::deferred, ...)
// 在低负载机器(足够空闲线程) → async → 立即在后台跑
// 在高负载机器(线程池满) → deferred → 在 get 时同步跑
// 在单核嵌入式系统 → deferred(逻辑:多线程有开销)
2
3
4
5
6
7
修复——永远显式指定策略:
// ✅ 显式 async——我保证它在新线程中
auto f = std::async(std::launch::async, [] { return worker(); });
// ✅ 显式 deferred——我明确要惰性求值
auto f = std::async(std::launch::deferred, [] { return heavy_lazy(); });
2
3
4
5
# 6.3 deferred 的 wait_for 为什么永远返回 future_status::deferred
疑惑:我在 f.wait_for(5s) 等待——为什么立即返回?明明 5 秒还没到?
论证——deferred 的本质是没有时钟:
auto f = std::async(std::launch::deferred, worker);
// deferred 的任务还没开始执行——它躺在 future 里等待 get 调用
// → 没有线程、没有时钟起点、没有「多久之后」的概念
// → wait_for / wait_until 唯一合法的状态是 deferred
auto status = f.wait_for(std::chrono::seconds(100));
// status == std::future_status::deferred ✅——不是 ready,不是 timeout!
2
3
4
5
6
7
8
检测 deferred 的正确方式:
if (f.wait_for(std::chrono::seconds(0)) == std::future_status::deferred) {
// 是 deferred——我需要主动调 get 来执行
} else {
// 是 async——等待就绪
f.wait();
}
2
3
4
5
6
# 6.4 线程池 + async 的组合——什么时候 async 比裸 thread 更好
场景 A:少量大型任务 → async 更简单
for (int i = 0; i < 3; ++i) {
futures.push_back(std::async(std::launch::async, big_task, i));
}
// 三个线程并行——比手写 thread 少 5 行代码
场景 B:大量小型任务 → 线程池 > async
for (int i = 0; i < 1000; ++i) {
futures.push_back(std::async(std::launch::async, tiny_task, i));
}
// ❌ 创建 1000 个线程——每个线程的开销 > 任务的执行时间
// ✅ 应该用线程池 + packaged_task——复用线程
场景 C:CPU 密集中等大小 → async(launch::async) 是合理选择
每个任务 10ms-100ms、任务数 < 10
async 的开销(线程创建 ~15μs)可忽略
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 7. shared_future:广播语义的消费者
# 7.1 从 future 到 shared_future——为什么需要显式 share
std::promise<int> p;
auto f = p.get_future(); // future<int>
// ❌ 这样是编译错误:
// auto f2 = f; // future 不能拷贝
// ✅ 通过 share 转换为 shared_future:
std::shared_future<int> sf = f.share(); // ① f 失去 shared state 所有权
// f.valid() == false // f 现在是空的
// ✅ shared_future 可以拷贝——多个消费者共享同一个结果
auto sf2 = sf; // ② 拷贝——共享同一个 shared state
auto sf3 = sf; // ③ 同上
2
3
4
5
6
7
8
9
10
11
12
13
为什么需要 share() 这一步——不能直接从 promise 获取 shared_future?
设计原因:future 是默认的「唯一消费者」——这是最常见的使用场景。如果直接从 promise 能拿 shared_future,程序员可能无意中复制了它——引入多个消费者和引用计数的开销。share() 把「我需要广播」的意图显式化——增加引用计数和额外的同步开销只在明确需要时才付。
# 7.2 get 可以多次调用的内部原理——拷贝语义 vs future 的移动语义
std::shared_future<int> sf = std::async([] { return 42; }).share();
int a = sf.get(); // ✅ 第一次——返回值的 const 引用
int b = sf.get(); // ✅ 第二次——还是同一个值
int c = sf.get(); // ✅ 第三次——...
2
3
4
5
内部差异:
future<int>::get():
→ wait()
→ return std::move(value); // 移走值——value 变成空
→ 第二次 get 时 value 无效——抛异常
shared_future<int>::get():
→ wait()
→ return value; // 返回值 const 引用——value 完好
→ 第二次 get 时 value 还存在——返回同一个值的 const 引用
2
3
4
5
6
7
8
9
性能代价:shared_future 比 future 多了引用计数(shared state 的引用数从 1 → N)——多了原子操作开销。在单消费者场景,用 future 更快。
# 7.3 多消费者场景——何时 shared_future 是唯一正解
// 场景:CPU 密集型计算的结果需要被多个模块读取
std::shared_future<Matrix> result_future = std::async(std::launch::async, compute_large_matrix).share();
// 三个消费者——各自独立访问同一个结果
std::thread consumer1([result_future] {
const auto& m = result_future.get();
render_3d(m);
});
std::thread consumer2([result_future] {
const auto& m = result_future.get();
export_csv(m);
});
std::thread consumer3([result_future] {
const auto& m = result_future.get();
compute_preview(m);
});
// 三个线程共享同一个结果——没有额外的拷贝
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 8. 共享状态的内部原子机制——promise 如何把值送到 future
# 8.1 shared state 的完整结构拆解
// libstdc++ 中 shared state 的简化表示
template <typename R>
struct __future_base::_State_baseV2 {
// ===== 状态标志 =====
atomic<int> _M_status; // ① 原子状态码
// 三个状态:
// __state_base::__not_ready = 0 → 等待设值
// __state_base::__ready = 1 → 值已就绪
// __state_base::__exception = 2 → 存储了异常
// ===== 结果存储 =====
using __result_type = aligned_union_t<1, R>; // 对齐存储
__result_type _M_result; // ② 值或异常(在 union/aligned storage 中)
// ===== 等待机制 =====
condition_variable _M_cond; // ③ 条件变量——用于 wait
mutex _M_mutex; // ④ 互斥锁——保护 _M_cond 的等待队列
// ===== 引用计数 =====
atomic<int> _M_refcount; // ⑤ 引用计数——管理 shared state 生命周期
// promise 持有 1 个引用、future 持有 1 个引用 = 初始 refcount = 2
// future 析构 → refcount--;promise 析构 → refcount--
// refcount 归零 → delete this(释放整个 shared state)
};
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
sizeof 开销:一个 future<int> 的 shared state 在堆上约 80-120 字节(mutex + cv + 原子变量 + 对齐存储)。这就是为什么 future 不适合用于极轻量级的值传递——每个 promise/future 对都有一块堆分配的开销。
# 8.2 set_value 到 get 的全原子流程——汇编层证据
生产者(promise::set_value):
① lock(_M_mutex) // 进入互斥区——保护 cv 的等待队列
② placement new (&_M_result, value) // 构造值在 aligned storage 中
③ _M_status.store(ready, release) // 原子 release——可见性边界
④ _M_cond.notify_all() // 唤醒所有等待者
⑤ unlock(_M_mutex)
消费者(future::get):
⑥ lock(_M_mutex)
⑦ while (_M_status.load(acquire) != ready) { // 检查状态
_M_cond.wait(lock); // 等待 ④ 的 notify
}
⑧ result = std::move(*(R*)(&_M_result)) // 取出值
⑨ _M_status = 0 // 标记为空(防止二次 get)
⑩ unlock(_M_mutex)
⑪ return result
happens-before 链:③ release → ⑦ acquire
→ promise 中的 ①-③ 所有内存写入对 future 中的 ⑦-⑧ 可见 ✅
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
汇编关键——共享状态的 lock + atomic store 提供了完整的 happens-before 关系。promise 和 future 之间的同步不需要外部 mutex——共享状态内部的 mutex + cv + atomic 构成了完整的同步原语组合。
# 8.3 异常路径——set_exception 的数据分发
set_exception(eptr) 的流程:
① lock(_M_mutex)
② 在 _M_result 中存储 exception_ptr
③ _M_status.store(__exception, release)
④ _M_cond.notify_all()
⑤ unlock(_M_mutex)
future::get() 的异常检测:
⑦ 等待 _M_status != __not_ready
⑧ if (_M_status == __exception):
std::rethrow_exception(*(exception_ptr*)(&_M_result))
else:
return std::move(value)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
exception_ptr 的类型擦除是异常跨线程传播的关键——它捕获了完整类型(包括动态类型、多态类型),在 get 端能用 catch(const MyException& e) 精确匹配。这和 std::any 共享同一类型擦除哲学——第 16 篇已有详细讨论。
# 8.4 为什么未来/promise 不需要外部的 mutex——内建的原子同步
疑惑:promise 在一个线程设值、future 在另一个线程取值——不需要互斥锁保护吗?
论证——shared state 内部的完整同步边界:
所有同步都由 shared state 内部的 mutex + cv + atomic 组合提供:
互斥:
生产者线程:set_value 时 lock _M_mutex
消费者线程:get 时 lock _M_mutex + 等 cv
→ 两个线程不会同时修改 shared state——由 mutex 保护
可见性:
set_value 的最后一步:_M_status.store(ready, release)
get 的第一步检查: _M_status.load(acquire)
→ release-acquire 对保证生产者所做的所有写入对消费者可见
等待:
cv.wait → 消费者在没就绪时睡眠——不烧 CPU
cv.notify_all → 生产者在就绪时唤醒消费者
2
3
4
5
6
7
8
9
10
11
12
13
14
15
这就是为什么不需要外部的 mutex——shared state 已经把「互斥 + 可见性 + 等待」三位一体地封装好了。
# 9. 常见陷阱与反模式
# 9.1 deferred 策略下的「假异步」
// ❌ 以为在并行——实际上在串行
auto f1 = std::async(heavy_task1); // 默认策略
auto f2 = std::async(heavy_task2); // 可能 deferred
heavy_task1(); // ← 在 get f1 时执行!
heavy_task2(); // ← 在 get f2 时执行!
// ✅ 修复——显式 async
auto f1 = std::async(std::launch::async, heavy_task1);
auto f2 = std::async(std::launch::async, heavy_task2);
2
3
4
5
6
7
8
9
# 9.2 future 提前析构——get 都不调用
// ❌ future 析构 = 阻塞等待线程结束(只在 async 产生的 future 上!)
{
auto f = std::async(std::launch::async, [] {
std::this_thread::sleep_for(std::chrono::seconds(10));
return 42;
});
} // ① future 析构——这里是阻塞的!等待 10 秒!
// 为什么阻塞?因为 async 用 launch::async 创建的线程是
// 由 future 负责 join 的——析构时 join 等待线程结束
2
3
4
5
6
7
8
9
10
这是 async 独有的行为——从 promise 获取的 future 析构不阻塞。
# 9.3 在 promise 析构前不设值——broken_promise
std::future<int> dangling_promise() {
std::promise<int> p;
auto f = p.get_future();
// 忘了设值——p 在函数结束时析构
return future; // future 的 shared state 被标记为 broken
}
auto f = dangling_promise();
f.get(); // ❌ std::future_error(broken_promise)
2
3
4
5
6
7
8
9
# 9.4 把 async 的返回值当 future 但忘记取——阻塞式析构
void compute() {
// ❌ async 返回的 future 被丢弃——在析构时阻塞
std::async(std::launch::async, [] {
std::this_thread::sleep_for(std::chrono::minutes(5));
});
// async 返回的临时 future 在这一行析构
// → 析构阻塞 5 分钟!
}
2
3
4
5
6
7
8
正确:把 future 存下来,在合适的地方 get。
# 9.5 future 在多线程间传递——移动 vs 拷贝的博弈
// ✅ 正确——future 移动(所有权串行传递)
std::future<int> worker() {
auto f = std::async(std::launch::async, [] { return 42; });
return f; // 移动——一个线程一个 future
}
// ❌ 错误——不能拷贝 future——因为不知道多个副本谁来 get
std::future<int> f = std::async(worker);
// std::future<int> f2 = f; // 编译错误!
// ✅ 如果需要多消费者——显式 share
std::shared_future<int> sf = std::async(worker).share();
auto sf2 = sf; // ✅ 拷贝 shared_future——引用计数保护
2
3
4
5
6
7
8
9
10
11
12
13
# 10. 综合案例串讲
# 10.1 案例真相揭晓
| # | 疑问 | 答案 |
|---|---|---|
| ① | 跨线程传值怎么不用 mutex? | 第 8 章:shared state 内部 mutex + cv + atomic 三合一 |
| ② | get 只能调一次? | 第 4.1:内部 std::move(value)——移动后 value 为空 |
| ③ | packaged_task vs promise? | 第 5 章:task 自动设值——包装函数;promise 手动设值——任意调度 |
| ④ | deferred 什么时候触发? | 第 6 章:默认策略下,编译器自由选择——系统负载决定 |
| ⑤ | shared_future 为什么显式 share? | 第 7.1:增加引用计数有开销——只在明确需要时付 |
| ⑥ | shared state 内部结构? | 第 8.1:status + aligned_result + mutex + cv + refcount |
| ⑦ | async vs thread? | 第 6.4:少量大任务用 async、大量小任务用线程池 |
| ⑧ | 多 future 并行等待? | 第 5.2:packaged_task + thread 并行 + 逐个 get |
案例①修复——async deferred 陷阱:
// ❌ 原版
futures.push_back(std::async(query_service_A));
// ✅ 修复——显式指定策略
futures.push_back(std::async(std::launch::async, query_service_A));
futures.push_back(std::async(std::launch::async, query_service_B));
futures.push_back(std::async(std::launch::async, query_service_C));
// 现在保证并行——不受系统负载影响
2
3
4
5
6
7
8
案例②修复——broken_promise:
// ❌ 原版——promise 在异常路径上析构
void bad(std::promise<int> p) {
risky_work();
p.set_value(42);
}
// ✅ 修复——promise 在线程中管理和设值
std::future<int> safe() {
auto task = std::packaged_task<int()>([] {
risky_work();
return 42;
});
auto f = task.get_future();
std::thread(std::move(task)).detach();
return f; // packaged_task 保证异常安全设值
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 10.2 从 promise 到 future 的完整值旅程
auto p = std::promise<int>(); // ① 创建 promise(创建 shared state,refcount=1)
auto f = p.get_future(); // ② 获取 future(refcount=2)
std::thread t([p = std::move(p)]() mutable {
int r = compute(); // ③ 计算(在 worker 线程)
p.set_value(r); // ④ set_value:
// lock(mutex) → placement new r into aligned storage
// → status.store(ready, release) → cv.notify_one → unlock
}); // refcount--(原 promise 已 move,新 promise 在 lambda)
t.detach();
int result = f.get(); // ⑤ get:
// lock(mutex) → while (status.load(acquire) != ready) cv.wait
// → 被 ④ 唤醒 → std::move(value) → status = 0 → unlock → return
// ⑥ future 析构 → refcount-- → refcount=0 → delete shared state
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
性能全景(AMD 7950X, GCC 13.2 -O2):
| 操作 | 耗时 | 说明 |
|---|---|---|
| promise + future 创建 | ~200 ns | 堆分配 shared state |
| set_value (无等待) | ~80 ns | lock + store + unlock(快路径) |
| get (已就绪) | ~80 ns | lock + take value + unlock(快路径) |
| get (未就绪) | ~5-15 μs | lock → cv.wait → 等 notify → lock |
| packaged_task 创建 | ~250 ns | 同上 + 捕获函数对象 |
| shared_future 拷贝 | ~30 ns | 原子 refcount++ |
# 10.3 设计哲学回扣
哲学 1:共享状态是 promise 和 future 之间的「第三方保管箱」——独立于两者存活
promise 和 future 都不拥有值——值在 shared state 中。这让两者可以独立析构而不丢失数据。这是生产者-消费者模型的类型化实现——生产者设值后不管消费者的死活,消费者在生产者死后仍能拿值。 这个设计的精妙之处:shared state 的引用计数管理(一个来自 promise 的方向、一个来自 future 的方向)保证了「只要还有人需要值——值就活着」。
哲学 2:future 的一次性消费是「移动语义」在并发层的延伸——不需要的值不配占用内存
future::get 移动值而非拷贝它——和 unique_ptr 的独占所有权一脉相承。如果你只需要值一次——就只付一次拷贝的代价。不需要为「可能需要的第二次」预支任何开销。 shared_future 提供多次消费——但你显式地请求并接受引用计数和拷贝的开销。这和 unique_ptr → shared_ptr 的迁移路径完全对应。
哲学 3:std::async 的默认策略是「设计上的灵活」和「工程上的不可预测」之间的经典矛盾
设计者想要:一个简单的接口——一个函数、一个返回值、自动决定怎么跑。既能在理想条件下多线程并行,又能在资源不足时退化为同步。灵活性 = 最少手动干预——但在生产环境中,「最少手动干预」变成「不确定行为」。C++ 在 async 上选择了灵活性——工程实践教会我们:不确定性在并发代码中比性能损失更危险。
哲学 4:packaged_task 把「执行→设值」的范式进行类型化——手工范式成为可组合的构建块
如果没有 packaged_task——每个异步任务都要手写 try { p.set_value(f()); } catch { p.set_exception(...); }。packaged_task 把这五行模板代码抽象为一个可移动、可存储、可传给线程池的「任务对象」。这是 STL 泛型哲学在异步层的延伸——把重复的模式变成类型。
哲学 5:异常通过 exception_ptr 的跨线程传播——类型系统在运行时保留完整信息
std::current_exception() 捕获的类型是完整的——std::runtime_error("failed") 在被线程 A 抛、线程 B 通过 f.get() 重新获取时,仍然是 std::runtime_error——不是切片后的 std::exception。exception_ptr 的类型擦除是「运行时多态 + 引用计数生命周期」的组合——让异常成为一等公民,可以存储、传播、重新抛出。
# 10.4 速查表合集
三件套速查:
| 组件 | 职责 | 创建方式 | 可拷贝 | 关键操作 |
|---|---|---|---|---|
promise<T> | 手动设值 | 直接构造 | ❌ (只移动) | set_value / set_exception |
packaged_task<R(Args)> | 包装可调对象 | 包装函数 | ❌ (只移动) | operator() / get_future |
future<T> | 唯一消费者 | 从 promise/task/async | ❌ (只移动) | get / wait / valid |
shared_future<T> | 广播消费者 | future.share() | ✅ | get (多次) |
async 策略速查:
| 策略 | 执行时机 | wait_for 行为 | 线程创建 | 使用场景 |
|---|---|---|---|---|
launch::async | 立即(新线程) | ready/timeout | 是 | 真正的并行 |
launch::deferred | get 时(当前线程) | deferred | 否 | 惰性求值 |
async\|deferred (默认) | 不确定! | 不确定 | 不确定 | 不要用 |
正确范式:
// ✅ 真正的异步——必须显式 async
auto f = std::async(std::launch::async, worker, args...);
// ✅ 惰性求值——必须显式 deferred
auto f = std::async(std::launch::deferred, lazy_computation);
// ✅ packaged_task + thread——明确控制线程生命周期
std::packaged_task<int()> task(worker);
auto f = task.get_future();
std::thread(std::move(task)).detach();
// ✅ shared_future——多个消费者共享结果
auto sf = std::async(std::launch::async, big_compute).share();
auto sf2 = sf; // 拷贝——各自安全 get
2
3
4
5
6
7
8
9
10
11
12
13
14
本篇小结:future 家族是 C++ 异步编程的基础设施——promise 是值的发射端、future 是值的接收端、shared state 是原子同步的中立区。
std::async是最简洁但也是最容易用错的接口——默认策略的不确定性让它成为生产环境中最常见的异步 bug 来源。显式指定launch::async是唯一正确的防御。如果还需要更多控制——packaged_task+ 线程池提供完整的灵活性。
下一篇:异步基础设施说完了。下一篇进入 46.无锁数据结构设计——Treiber stack、Michael-Scott queue、hazard pointer 与 RCU 内存回收——把前几篇的原子操作和内存序知识用到极致。