errgroup并行控制
# 14.errgroup并行控制
卷三第十四篇——
sync.WaitGroup能等所有 goroutine 结束,但它不能传回第一个错误;semaphore.Weighted能控制并发度,但它不负责错误收集。golang.org/x/sync/errgroup把两者缝合成一个优雅的原语:一堆 goroutine 并行干活,只要有一个出错——context 立即取消,其他 goroutine 收到信号后安全退出,Wait 返回第一个错误。读完本篇,你能解释:为什么 errgroup 的 context 取消不会漏掉 goroutine?SetLimit内部是用什么实现的?TryGo和Go的语义差一个"阻塞"字但在哪里?关键词:errgroup、WithContext、errorOnce、SetLimit 限并发、TryGo 非阻塞、WaitGroup 对比。
# 目录介绍
- 1. 案例引入
- 2. 架构概览
- 3. Group 核心数据结构
- 4. Go 任务提交与错误收集
- 5. Wait 错误聚合机制
- 6. WithContext 取消传播
- 7. SetLimit 限并发机制
- 8. TryGo 非阻塞提交
- 9. WaitGroup 对比与实战
- 10. 综合案例串讲
# 1. 案例引入
# 1.1 一段崩在哪
看一个商品详情聚合服务——它需要在 300ms 内返回聚合结果,后端同时调用库存、价格、推荐三个下游 API。生产环境跑了半年,某天下游推荐服务因为数据库主从切换出现 5 秒的响应延迟,整个聚合服务就崩了:
// product_aggregator.go —— 商品详情聚合
package main
import (
"context"
"errors"
"fmt"
"sync"
"time"
)
type ProductDetail struct {
ProductID string
Inventory *InventoryData
Price *PriceData
Recs *RecommendData
}
var errCh = make(chan error, 3) // 缓冲 3 个——收集错误
func fetchInventory(ctx context.Context, productID string) (*InventoryData, error) {
// 模拟:正常响应 50ms
time.Sleep(50 * time.Millisecond)
return &InventoryData{Stock: 100}, nil
}
func fetchPrice(ctx context.Context, productID string) (*PriceData, error) {
time.Sleep(60 * time.Millisecond)
return &PriceData{CurrentPrice: 2999}, nil
}
func fetchRecommendations(ctx context.Context, productID string) (*RecommendData, error) {
// 模拟:下游服务卡死——5 秒无响应
select {
case <-time.After(5 * time.Second):
return &RecommendData{Items: nil}, nil
case <-ctx.Done():
return nil, ctx.Err()
}
}
func aggregateProduct(ctx context.Context, productID string) (*ProductDetail, error) {
var wg sync.WaitGroup
detail := &ProductDetail{ProductID: productID}
// 并行启动 3 个 goroutine
wg.Add(3)
go func() {
defer wg.Done()
inv, err := fetchInventory(ctx, productID)
if err != nil {
errCh <- fmt.Errorf("库存接口: %w", err)
return
}
detail.Inventory = inv
}()
go func() {
defer wg.Done()
price, err := fetchPrice(ctx, productID)
if err != nil {
errCh <- fmt.Errorf("价格接口: %w", err)
return
}
detail.Price = price
}()
go func() {
defer wg.Done()
recs, err := fetchRecommendations(ctx, productID)
if err != nil {
errCh <- fmt.Errorf("推荐接口: %w", err)
return
}
detail.Recs = recs
}()
// 等待所有 goroutine 结束
wg.Wait()
close(errCh)
// 检查错误
var errs []error
for e := range errCh {
errs = append(errs, e)
}
if len(errs) > 0 {
return nil, errors.Join(errs...) // Go 1.20+
}
return detail, nil
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
现象:
- 平时三个下游 API 都在 100ms 内返回,聚合接口 P99 约 120ms
- 某天推荐服务因主从切换卡了 5 秒——但库存和价格在 50ms 就返回了
fetchRecommendations的ctx.Done()永远不会触发——因为传入的 context 没有超时wg.Wait()死等 5 秒——主 goroutine 挂在 Wait 上,无法提前返回- 这 5 秒内,上游网关已经超时(300ms)断连——但聚合服务的 goroutine 还在跑
- 更糟糕的是:库存和价格的数据早就拿到了,但无法提前返回给调用方——因为 WaitGroup 要求所有 goroutine 都 Done
goroutine 泄漏的连锁反应:高峰期每秒钟 500 个请求——每个请求最多卡 5 秒——同时有 2500 个 goroutine 挂在 fetchRecommendations 上。每个 goroutine 持有一个 HTTP 连接(TCP socket + buffer ~16KB)→ 40MB 额外内存,且上游的 context 已经取消但这些 goroutine 对下游的连接还没断——下游推荐服务进一步被打垮。
核心矛盾:WaitGroup 的语义是"等所有任务完成"——它不在乎"某些任务已经没意义了"。当一个 goroutine 出错或超时后,其他 goroutine 继续执行是在浪费资源——而且调用方拿不到任何结果。
# 1.2 顺藤摸到根因
追查过程:
第一步:看 pprof goroutine——出问题时 goroutine 数从平时的 200 飙到 5000+:
$ curl http://localhost:6060/debug/pprof/goroutine?debug=2 | grep -c "fetchRecommendations"
# 2478 个 goroutine 全部卡在 select 上——time.After(5s) 还没到
2
第二步:看这些 goroutine 对应的请求——通过 tracing 发现 80% 的请求上游已经超时(context deadline exceeded),但 goroutine 仍然在等待下游响应——因为 WaitGroup 没有取消机制。
第三步:search 代码中的 WaitGroup 使用模式——发现所有并行 RPC 调用都是 WaitGroup + errCh 模式——没有一个能提前退出。
这个事故藏着 7 个原理点:
① errgroup 怎么让「先出错」的 goroutine 触发全局取消? → 第 3-4 章
② Wait 为什么只返回第一个错误——而不是收集全部错误? → 第 5.2
③ WithContext 的 context 取消是什么时候触发的? → 第 6 章
④ 为什么「其他 goroutine 也必须检查 ctx.Done()」——否则取消无效? → 第 6.2
⑤ SetLimit 的内部是用什么实现限并发的? → 第 7 章
⑥ TryGo 和 Go 的语义差异——一个"阻塞"字藏在哪? → 第 8 章
⑦ errgroup vs WaitGroup 在四种典型场景下怎么选? → 第 9 章
2
3
4
5
6
7
# 1.3 我们要回答什么
这个聚合服务案例贯穿全篇。我们从 errgroup 的数据结构出发,深入 Go/Wait 的协作流程、errorOnce 的错误捕获策略、WithContext 的取消级联——再对比 WaitGroup 在并行场景下的不足——最后给出多数据源聚合和批量 API 调用的最佳实践。
本篇路线:
架构总图 (第 2 章) ── errgroup 三组件协作 + WaitGroup 为什么不够
↓
数据结构 (第 3 章) ── Group 字段 + errorOnce 原子捕获
↓
Go + Wait (第 4-5 章) ── 任务提交 / 错误收集 / 只返回第一个错误
↓
WithContext (第 6 章) ── context 取消传播的内部实现
↓
SetLimit (第 7 章) ── channel 信号量限并发 + 与 Weighted 对比
↓
TryGo (第 8 章) ── 非阻塞提交 + 背压降级
↓
WaitGroup 对比 (第 9 章) ── 四种场景横向评测 + 实战模式
↓
综合案例 (第 10 章) ── 修复聚合服务 + 设计哲学
2
3
4
5
6
7
8
9
10
11
12
13
14
15
📌 本篇定位:第 13 篇的
semaphore.Weighted解决的是"单个 goroutine 如何可取消地获取许可",本篇的errgroup解决的是"一组 goroutine 如何协同退出"——前者是点对点限流,后者是组内级联控制。理解了 errgroup 的 context 传播,就能回答:"为什么 Go 的并发模型不需要 Java 的 Thread.interrupt()?"
# 2. 架构概览
# 2.1 errgroup 全景图
errgroup 只做三件事——把三个组件缝合在一起:
errgroup.Group
│
┌──────────────────┼──────────────────┐
│ │ │
▼ ▼ ▼
sync.WaitGroup sync.Once context.Context
(等所有任务结束) (记录第一个错误) (WithContext 版)
│ │ │
│ │ │
Go(fn) 包装为 第一个非 nil 的 Go 返回 error →
wg.Add(1) error 被捕获 ctx 自动取消
│ │ │
Wait() 调用 后续 error 被 其他 G 收到
wg.Wait() 丢弃 ctx.Done() →
安全退出
2
3
4
5
6
7
8
9
10
11
12
13
14
15
数据流图:
主 goroutine 创建 errgroup.WithContext(ctx)
│
├── 派生 context (cancelCtx)
│
├── Go(func1) ──→ goroutine 1: wg.Add(1)
│ │
├── Go(func2) ──→ goroutine 2: wg.Add(1)
│ │
└── Go(func3) ──→ goroutine 3: wg.Add(1)
│
┌───────────┴───────────┐
│ │
goroutine 2 返回 error goroutine 1 收到 ctx.Done()
│ │
errorOnce.Do(store err) return ctx.Err()
context cancel() ◄──────────── 安全退出
│
goroutine 3 收到 ctx.Done()
return ctx.Err()
│
wg.Wait() 返回 (三个 goroutine 都 wg.Done)
│
Wait() 返回第一个 error
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# 2.2 为什么 WaitGroup 不够用
疑惑:WaitGroup 加一个 error channel 不就能实现错误收集吗?为什么需要 errgroup?
论证:
没有取消传播——WaitGroup 只能"等",不能"推"。goroutine A 返回 error,但 goroutine B、C 完全不知道——它们继续执行直到自然结束。errgroup 的 WithContext 在第一个 error 返回时自动 cancel context——其他 goroutine 通过
ctx.Done()直接感知。错误收集需要手动管理 channel——WaitGroup + errCh 模式下,channel 的关闭时机、容量、消费方式都需要手动维护。第 1 章的
errCh用了缓冲 3 的 channel——但如果 goroutine 数量不固定(动态启动),就不知道该怎么设定 channel 容量。errgroup 用sync.Once捕获第一个 error——不需要 channel。WaitGroup 的 Add/Done 匹配必须精确——在动态任务数量的场景(如遍历一个不确定长度的列表),Add 和 Done 的配对容易出错。errgroup 的
Go(fn)封装了 Add/Done,调用方不需要手动维护计数。没有内建的并发度控制——WaitGroup 本身不限制同时运行的 goroutine 数量——如果要"1000 个任务但只并发 10 个",需要额外组合 semaphore。errgroup 的
SetLimit(Go 1.20+)直接提供了这个能力。
结论:WaitGroup 是"计数等待器"——只管任务数量,不管任务状态。errgroup 是"并行任务协调器"——加上错误传播、取消级联、并发度控制三层语义。两者的关系不是替代,是进化:简单的等全部结束用 WaitGroup,需要"一人出错全员退出"用 errgroup。
# 3. Group 核心数据结构
# 3.1 字段逐层解读
errgroup 的 Group 结构体只有 5 个字段——但恰好覆盖了并行任务协调的所有需求:
// golang.org/x/sync/errgroup/errgroup.go (简化)
type Group struct {
cancel func(error) // context.WithCancelCause 的取消函数
wg sync.WaitGroup // 等待所有 goroutine 结束
sem chan struct{} // SetLimit 后的信号量 channel
errOnce sync.Once // 保证只记录第一个错误
err error // 第一个非 nil 的 error
}
2
3
4
5
6
7
8
每个字段的设计意图:
| 字段 | 类型 | 零值 Group 时的状态 | 设计意图 |
|---|---|---|---|
cancel | func(error) | nil | WithContext 版才有值——零值 Group 不支持取消传播 |
wg | sync.WaitGroup | 可用 | 追踪未完成的 goroutine 数量 |
sem | chan struct{} | nil | SetLimit(n) 后初始化为 make(chan struct{}, n)——当信号量用 |
errOnce | sync.Once | 可用 | 只记录第一个错误——后续错误被丢弃 |
err | error | nil | 第一个非 nil 的 error 的值 |
零值 Group 可用——不需要构造函数,var g errgroup.Group 直接能用(但没有 context 取消功能)。
// 零值 Group:有 WaitGroup + errorOnce——但没有 cancel
var g errgroup.Group
g.Go(func() error { return doWork() })
g.Go(func() error { return doOther() })
if err := g.Wait(); err != nil {
// 第一个非 nil 的 error
}
2
3
4
5
6
7
8
WithContext 版:在零值 Group 的基础上注入了 context 取消能力:
// golang.org/x/sync/errgroup/errgroup.go (简化)
func WithContext(ctx context.Context) (*Group, context.Context) {
ctx, cancel := context.WithCancelCause(ctx)
return &Group{cancel: cancel}, ctx
}
2
3
4
5
# 3.2 errorOnce 错误捕获
sync.Once 保证了有且仅有第一个错误被记录:
// golang.org/x/sync/errgroup/errgroup.go (简化)
func (g *Group) Go(f func() error) {
// ...
go func() {
defer g.done()
if err := f(); err != nil {
g.errOnce.Do(func() { // ← 只有第一次执行
g.err = err // 记录第一个错误
if g.cancel != nil {
g.cancel(err) // 触发 context 取消
}
})
}
}()
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
sync.Once 的原子保证——即使多个 goroutine 同时返回 error,g.errOnce.Do 只执行一次。第一个到达的 goroutine 写入 g.err 并调用 g.cancel;后续到达的 goroutine 在 sync.Once 层面被跳过——它们的 error 被丢弃。
goroutine A 返回 err: "库存超时" ──→ errOnce.Do ✓ → g.err = "库存超时" → cancel()
goroutine B 返回 err: "价格拒绝" ──→ errOnce.Do ✗ (已执行) → err 被丢弃
goroutine C 返回 nil ──→ 不进入 if 分支
2
3
# 3.3 并发安全保证
Group 的并发模型极其简洁——不暴露锁给外部:
并发安全保证层级:
1. WaitGroup (Add/Done/Wait) — sync 包保证原子性
2. sync.Once (Do) — 保证 error 只写一次
3. err 字段 — 只被 sync.Once.Do 写,只被 Wait 读 → happens-before 保证
4. sem channel — Go 的 channel 自带并发安全
没有暴露的锁 → 调用方不会死锁
2
3
4
5
6
7
8
goroutine 视角:当 g.Go(fn) 启动一个 goroutine 时——
- G 从
_Gidle→_Grunnable→_Grunning(被 P 调度执行) - 执行 fn 期间 G 正常运行
- 如果 fn 返回 error →
sync.Once.Do→ 可能调用context.CancelFunc→ 触发其他等待ctx.Done()的 G 变为_Grunnable - fn 返回后 →
g.wg.Done()→ 可能唤醒正在g.wg.Wait()的 G
# 4. Go 任务提交与错误收集
# 4.1 Go 方法实现
Go 方法实现了 WaitGroup 的 Add(1) + goroutine 启动 + error 收集三合一:
// golang.org/x/sync/errgroup/errgroup.go (简化)
func (g *Group) Go(f func() error) {
// SetLimit 后的限流:如果 sem 不为 nil → 获取许可
if g.sem != nil {
g.sem <- struct{}{} // ← 阻塞等待并发槽位
}
g.wg.Add(1)
go func() {
defer g.done() // ← 释放 sem 许可 + wg.Done
if err := f(); err != nil {
g.errOnce.Do(func() {
g.err = err
if g.cancel != nil {
g.cancel(err)
}
})
}
}()
}
func (g *Group) done() {
if g.sem != nil {
<-g.sem // 归还并发槽位
}
g.wg.Done()
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
Go 方法的行为总结:
| 条件 | Go(fn) 的行为 |
|---|---|
| 无 SetLimit | wg.Add(1) → go fn() ——立即返回,不阻塞 |
| 有 SetLimit(n) | sem <- {}(如果 n 个槽位满了则阻塞)→ wg.Add(1) → go fn() |
| fn 返回 nil | wg.Done() → 不触发 errorOnce |
| fn 返回 error | errorOnce.Do(记录错误 + cancel) → wg.Done() |
# 4.2 闭包延迟执行的陷阱
陷阱 1:循环变量捕获(Go 1.22 之前):
// ❌ Go 1.21 及之前:所有 goroutine 共享同一个 task 变量
var g errgroup.Group
for _, task := range tasks {
g.Go(func() error {
return process(task) // ← task 是循环变量——数据竞争
})
}
// ✅ 修复:参数传递或局部变量复制
for _, task := range tasks {
task := task // Go 1.21 需要这一行
g.Go(func() error {
return process(task)
})
}
// Go 1.22+ 循环变量语义变化后不再需要 task := task
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
陷阱 2:defer 中的 error 不会被收集:
// ❌ fn 内的 defer 返回的 error 不会被 errgroup 收集
g.Go(func() error {
f, _ := os.Open(file)
defer f.Close() // ← f.Close() 返回的 error 被丢弃
return process(f)
})
// ✅ 显式处理——defer 中收集 close error
g.Go(func() error {
f, err := os.Open(file)
if err != nil {
return err
}
defer func() {
if cerr := f.Close(); cerr != nil && err == nil {
err = cerr // 保持对 err 的赋值——但 fn 已经返回了
}
}()
return process(f)
})
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
更现实的场景——HTTP Body 没 Close:
// ❌ 如果 process 返回 error,resp.Body 不会被 Close
g.Go(func() error {
resp, _ := http.Get(url)
return process(resp) // ← 没有 defer resp.Body.Close()
})
// ✅ 不管是否 error 都要 Close
g.Go(func() error {
resp, err := http.Get(url)
if err != nil {
return err
}
defer resp.Body.Close()
return process(resp)
})
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 4.3 错误的传播路径
goroutine 1
fn() 返回 error
│
▼
┌─────────────────┐
│ g.errOnce.Do() │ ← sync.Once 保证第一次执行
│ g.err = err │
│ g.cancel(err) │ ← context.CancelFunc
└────────┬────────┘
│
┌───────────┴───────────┐
│ │
▼ ▼
goroutine 2 goroutine 3
(仍在执行 fn 中) (fn 中检查 ctx.Done())
fn 结束后 │
errOnce.Do 被跳过 收到 ctx.Done() →
(已执行过) return ctx.Err()
│
▼
errOnce.Do 被跳过
(ctx.Err() 不被记录)
│
┌───────────────────┘
│
▼
wg.Wait() 返回
Wait() 返回 g.err (第一个 error)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# 5. Wait 错误聚合机制
# 5.1 Wait 内部流程
Wait 的实现只有一行有效代码——等待所有 goroutine 结束,然后触发一次 cancel:
// golang.org/x/sync/errgroup/errgroup.go (简化)
func (g *Group) Wait() error {
g.wg.Wait() // ← 阻塞,直到所有 goroutine 都 Done
if g.cancel != nil {
g.cancel(g.err) // ← 确保 cancel 被调用至少一次
}
return g.err
}
2
3
4
5
6
7
8
为什么 Wait 结束后还要调一次 cancel——这覆盖了一个边界情况:所有 goroutine 都正常结束(无 error),但使用者需要 ctx 也被 cancel——因为 errgroup 的 ctx 生命周期是"任务组的生命周期"。任务组结束了,ctx 就应该 Done。
所有 goroutine 都返回 nil:
→ wg.Wait() 返回
→ g.cancel(g.err) // g.err 为 nil → cancel(nil)
→ return nil // 使用者收到 nil error
第一个 goroutine 返回 error:
→ errorOnce.Do → g.err = err → g.cancel(err) ← 第一次 cancel
→ 其他 goroutine 收到 ctx.Done() → 安全退出
→ wg.Wait() 返回
→ g.cancel(g.err) // g.err 不为 nil → cancel(err) ← 第二次 cancel(幂等)
→ return err
2
3
4
5
6
7
8
9
10
11
cancel 的两次调用都是安全的——context.CancelFunc 是幂等的,多次调用效果相同。
# 5.2 只返回第一个错误的设计考量
疑惑:为什么不收集所有 error——像 errors.Join 那样?
论证:
第一个错误最可能是"根因"——在 errgroup 的 WithContext 模式下,第一个 error 触发 context 取消 → 其他 goroutine 收到
ctx.Done()→ 它们返回ctx.Err()(即context.Canceled)。如果收集所有 error,会得到一堆context.Canceled——这些是"症状"不是"根因"。收集所有 error 需要更复杂的数据结构——需要一个线程安全的 error 列表(如
[]error+sync.Mutex)。sync.Once + 单个 err 字段是最简设计——对绝大多数使用场景"只关心第一个失败原因"来说足够了。如果真的需要收集所有 error——自己组合:
// 如果需要收集所有 error(非标准 errgroup 行为)
type MultiErrorGroup struct {
wg sync.WaitGroup
mu sync.Mutex
errs []error
}
func (g *MultiErrorGroup) Go(f func() error) {
g.wg.Add(1)
go func() {
defer g.wg.Done()
if err := f(); err != nil {
g.mu.Lock()
g.errs = append(g.errs, err)
g.mu.Unlock()
}
}()
}
func (g *MultiErrorGroup) Wait() error {
g.wg.Wait()
return errors.Join(g.errs...)
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
结论:errgroup 的 "只返回第一个错误" 不是设计缺陷——是与 WithContext 取消传播配合的必然结果。标准 errgroup 关注的是"快速失败 + 取消级联"——如果需要"收集全部 error + 继续执行",需要不同的协调原语。
# 5.3 取消信号的生命周期
ctx 的生命周期严格绑定在 errgroup 任务组上:
WithContext(ctx):
ctx, cancel := context.WithCancelCause(parentCtx)
├── Go(task1) → wg.Add(1) ← ctx 激活
├── Go(task2) → wg.Add(1)
└── Go(task3) → wg.Add(1)
Wait():
wg.Wait() ← 所有 goroutine 结束
cancel(g.err) ← ctx 终结
← ctx.Done() 之后不可再被使用
2
3
4
5
6
7
8
9
10
11
12
关键约束:ctx 只在 errgroup 的整个生命周期内有效。Wait 返回后使用 ctx 是错误行为:
g, ctx := errgroup.WithContext(context.Background())
g.Go(func() error { return doWork(ctx) })
if err := g.Wait(); err != nil {
return err
}
// ❌ 错误:Wait 返回后 ctx 已经被 cancel 了
// 此时 ctx.Done() 已经关闭——任何读取都是零值
doMoreWork(ctx) // ← ctx 已失效
2
3
4
5
6
7
8
9
10
# 6. WithContext 取消传播
# 6.1 派生 context 的内部实现
WithContext 只有 5 行——但它在标准 context.WithCancel 基础上加入了"错误原因":
// golang.org/x/sync/errgroup/errgroup.go (简化)
func WithContext(ctx context.Context) (*Group, context.Context) {
ctx, cancel := context.WithCancelCause(ctx)
return &Group{cancel: cancel}, ctx
}
2
3
4
5
context.WithCancelCause(Go 1.20+)和 context.WithCancel 的区别:
| WithCancel | WithCancelCause | |
|---|---|---|
| CancelFunc | func() | func(error) |
| ctx.Err() | 返回 context.Canceled | 返回传入的 error |
| 用途 | 不知道取消原因 | 第一个出错的 goroutine 传自己的 error |
这意味着:当 goroutine 返回 "库存超时" 的 error 时——g.cancel(errStockTimeout) → 其他 goroutine 的 ctx.Err() 返回的就是 errStockTimeout(而不是泛化的 context.Canceled)。这对于日志和调试非常有价值。
# 6.2 取消级联的正确用法
最关键的规则——每个 goroutine 内部的 fn 必须检查 ctx.Done()。否则 WithContext 的取消传播完全无效:
// ❌ WithContext 无效——goroutine 不检查 ctx.Done()
g, ctx := errgroup.WithContext(context.Background())
g.Go(func() error {
return callSlowAPI() // ← 即使 ctx 被取消,这个调用继续执行
})
// ✅ 正确:fn 内部检查 ctx.Done()
g, ctx := errgroup.WithContext(context.Background())
g.Go(func() error {
req, _ := http.NewRequestWithContext(ctx, "GET", url, nil)
return http.DefaultClient.Do(req) // ← ctx 取消 → 请求中断
})
// ✅ 或者 select 双重监听
g.Go(func() error {
resultCh := make(chan Result, 1)
go func() { resultCh <- doWork() }()
select {
case <-ctx.Done():
return ctx.Err() // ← 取消传播
case result := <-resultCh:
return process(result)
}
})
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
为什么 ctx 取消不能"杀死" goroutine——Go 的 goroutine 不能被外部强制终止(没有 Thread.stop())。取消信号是协作式的——goroutine 必须主动检查 ctx.Done() 并返回。如果 fn 内部是一个不接收 context 的阻塞调用(如一个没有超时的 time.Sleep),取消信号对它无效。
# 6.3 常见的 context 误用
误用 1:在 fn 内部创建新 context——切断取消链:
// ❌ fn 内部用 context.Background()——父 ctx 取消无效
g, ctx := errgroup.WithContext(context.Background())
g.Go(func() error {
// 创建了一个全新的 context——和 errgroup 的 ctx 无关
return callAPI(context.Background(), url) // ← 永远不会收到取消信号
})
// ✅ 传递 errgroup 的 ctx
g.Go(func() error {
return callAPI(ctx, url)
})
2
3
4
5
6
7
8
9
10
11
误用 2:Wait 返回后还引用 ctx:
g, ctx := errgroup.WithContext(context.Background())
g.Go(func() error { return doWork(ctx) })
if err := g.Wait(); err != nil {
return err
}
// ❌ 此时 ctx 已经 Done——任何依赖 ctx 的操作都会立即失败
data, err := doMore(ctx) // ← always context.Canceled
// ✅ 用原始的 parent context(如果有的话)
data, err := doMore(parentCtx)
2
3
4
5
6
7
8
9
10
11
误用 3:混合使用外部 ctx 和 errgroup ctx:
// ✅ 正确:用外部 ctx 作为 errgroup ctx 的 parent
func processWithTimeout(ctx context.Context, tasks []Task) error {
g, ctx := errgroup.WithContext(ctx) // ← errgroup ctx 继承外部 ctx
// 如果外部 ctx 被取消 → errgroup 的 ctx 也取消 → 所有 goroutine 收到信号
for _, task := range tasks {
task := task
g.Go(func() error {
return doWork(ctx, task) // ← 监听的是 errgroup 的 ctx
})
}
return g.Wait()
}
2
3
4
5
6
7
8
9
10
11
12
13
# 7. SetLimit 限并发机制
# 7.1 内部实现:channel 信号量
SetLimit 是 Go 1.20 新增的——用 channel 做信号量控制同时运行的 goroutine 数量。回顾第 13 篇的 channel 信号量对比——SetLimit 用的就是最简洁的 channel 方案:
// golang.org/x/sync/errgroup/errgroup.go (简化)
func (g *Group) SetLimit(n int) {
if n < 0 {
g.sem = nil // 取消限制
return
}
if len(g.sem) != 0 { // 已有运行的 goroutine → panic
panic("errgroup: modify limit while goroutines are still active")
}
g.sem = make(chan struct{}, n) // ← channel 信号量
}
2
3
4
5
6
7
8
9
10
11
Go 方法中的限流逻辑——在启动 goroutine 之前,先往 sem channel 发一个 struct{}:
func (g *Group) Go(f func() error) {
if g.sem != nil {
g.sem <- struct{}{} // ← 阻塞:如果 n 个槽位已满,这里卡住
}
g.wg.Add(1)
go func() {
defer g.done()
// ...
}()
}
func (g *Group) done() {
if g.sem != nil {
<-g.sem // ← 归还槽位——让给下一个等待的 Go 调用
}
g.wg.Done()
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
和 semaphore.Weighted 的关键差异:
| 特性 | SetLimit channel 信号量 | semaphore.Weighted |
|---|---|---|
| 权重 | 固定为 1——每个 goroutine 占 1 个槽 | 可变——Acquire(ctx, n) 一次拿 n 个许可 |
| context 取消 | ❌ 不支持——sem <- {} 阻塞时无法取消 | ✅ Acquire(ctx, n) 支持 |
| 等待者可见性 | 不可见——无法查看"当前有多少个 Go 调用在排队" | 不可见——waiters 链表不导出 |
| 适用场景 | 简单并发度控制 | 加权资源控制 + 可取消等待 |
# 7.2 限流效果验证
package main
import (
"context"
"fmt"
"time"
"golang.org/x/sync/errgroup"
)
func main() {
var g errgroup.Group
g.SetLimit(3) // 最多 3 个 goroutine 同时运行
for i := 0; i < 10; i++ {
i := i
g.Go(func() error {
fmt.Printf("[%d] 开始\n", i)
time.Sleep(200 * time.Millisecond) // 模拟工作
fmt.Printf("[%d] 结束\n", i)
return nil
})
}
g.Wait()
}
// 输出(前三行几乎同时打印,后续分批):
// [0] 开始
// [1] 开始
// [2] 开始
// -- 约 200ms --
// [0] 结束 [1] 结束 [2] 结束
// [3] 开始 [4] 开始 [5] 开始
// ...
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# 7.3 与 semaphore.Weighted 的关联
疑惑:既然 errgroup 内部用 channel 做信号量,为什么不直接用 semaphore.Weighted 实现 SetLimit?
论证:
不需要加权——errgroup 的每个 goroutine 权重都是 1(SetLimit 限的是 goroutine 数量,不是资源消耗)。Weighted 的 Acquire(1) 和 channel 的
sem <- {}语义等价,但 channel 更轻(不引入额外的 sync.Mutex 和容器/list 依赖)。不需要 context 取消 sem 获取——
g.Go(f)不应该因为取不到 sem 而返回 error——Go 的签名是Go(func() error),没有返回 error。如果 sem 获取失败要报错,Go 的签名就需要改。channel 的sem <- {}会阻塞调用 Go 的 goroutine——这本身就是一个"反压"机制(调用方在提交任务时就被挡住)。保持包依赖最小——errgroup 只依赖标准库。如果引入 semaphore 包,会增加包之间的耦合。channel 是标准库内置的——零依赖。
结论:SetLimit 用 channel 信号量而不是 semaphore.Weighted 是一个"刚刚好"的设计选择——不引入不必要的抽象,不增加不必要的依赖。
# 8. TryGo 非阻塞提交
# 8.1 TryGo vs Go 语义差异
TryGo 是 Go 1.22 新增的——非阻塞版本的 Go。当 SetLimit 的并发槽位满了,TryGo 直接返回 false 而不是阻塞:
// golang.org/x/sync/errgroup/errgroup.go (简化)
func (g *Group) TryGo(f func() error) bool {
if g.sem != nil {
select {
case g.sem <- struct{}{}: // 尝试获取槽位——非阻塞
// 拿到了 → 继续
default:
return false // 槽位满 → 立即返回 false
}
}
g.wg.Add(1)
go func() {
defer g.done()
if err := f(); err != nil {
g.errOnce.Do(func() {
g.err = err
if g.cancel != nil {
g.cancel(err)
}
})
}
}()
return true
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
Go 和 TryGo 的对比:
| Go | TryGo | |
|---|---|---|
| 无 SetLimit 时 | 直接启动 goroutine | 直接启动 goroutine |
| 有 SetLimit 且槽位有空 | 获取槽位 → 启动 | 获取槽位 → 启动 |
| 有 SetLimit 且槽位满 | 阻塞等待槽位 | 立即返回 false |
| 返回值 | 无(void) | bool(是否成功提交) |
| 调用方感知 | 无法感知"槽位满" | 可以感知 → 实现降级逻辑 |
# 8.2 背压降级模式
TryGo 的关键使用场景——背压感知的降级:
// 限流 10 个并发——满了就返回"服务繁忙"
func handleBatch(tasks []Task) error {
g, ctx := errgroup.WithContext(context.Background())
g.SetLimit(10)
for _, task := range tasks {
task := task
// TryGo:如果 10 个槽位全满——不阻塞,直接拒绝
if !g.TryGo(func() error {
return processTask(ctx, task)
}) {
// 背压——返回限流错误
return fmt.Errorf("系统繁忙,请稍后重试——并发已满")
}
}
return g.Wait()
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
和 "Go + goroutine 中排队" 的差异:
Go 阻塞方案(生产者等待):
请求 → Go(任务) → 如果槽位满 → 阻塞 → 占用调用方 goroutine
→ 调用方 goroutine 无法处理其他请求
→ 发起方无感知——不知道是"排队中"还是"正在执行"
TryGo 非阻塞方案(背压反馈):
请求 → TryGo(任务) → 如果槽位满 → 返回 false
→ 调用方立刻知道"系统忙" → 可以做降级(返回 429 / 用缓存 / 拒绝)
→ 调用方 goroutine 不阻塞 → 可以继续处理其他请求
2
3
4
5
6
7
8
9
典型架构组合:
HTTP handler:
│
├── TryGo 成功 → goroutine 执行任务 → 响应 200
│
└── TryGo 失败 → 检查缓存 → 有缓存? → 返回缓存
└── 无缓存? → 返回 429 Too Many Requests
2
3
4
5
6
# 9. WaitGroup 对比与实战
# 9.1 四种对比场景
用一个综合表格说明 errgroup 和 WaitGroup 在四种典型并行场景中的差异:
| 场景 | 推荐方案 | 理由 |
|---|---|---|
| 等所有任务完成(不关心错误) | sync.WaitGroup | 最轻量的计数等待器——零依赖 |
| 等所有任务完成(需要收集全部错误) | WaitGroup + errCh 或 MultiErrorGroup | errgroup 只保留第一个错误——如果需要全部错误,需要自己收集 |
| 有一个任务失败就提前退出 | errgroup.WithContext | 一人出错 → ctx 取消 → 其他人收到信号退出 → Wait 返回第一个错误 |
| 限并发 + 快速失败 | errgroup.WithContext + SetLimit(n) | WaitGroup 本身不限并发——errgroup 的 SetLimit 直接解决 |
# 9.2 多数据源并行聚合
回到第 1 章的聚合服务——用 errgroup 重写:
func aggregateProductV2(ctx context.Context, productID string) (*ProductDetail, error) {
g, ctx := errgroup.WithContext(ctx)
detail := &ProductDetail{ProductID: productID}
g.Go(func() error {
inv, err := fetchInventory(ctx, productID)
if err != nil {
return fmt.Errorf("库存: %w", err)
}
detail.Inventory = inv
return nil
})
g.Go(func() error {
price, err := fetchPrice(ctx, productID)
if err != nil {
return fmt.Errorf("价格: %w", err)
}
detail.Price = price
return nil
})
g.Go(func() error {
recs, err := fetchRecommendations(ctx, productID)
if err != nil {
return fmt.Errorf("推荐: %w", err)
}
detail.Recs = recs
return nil
})
// Wait 阻塞——直到所有 goroutine 结束(或第一个 error 触发 ctx 取消)
if err := g.Wait(); err != nil {
return nil, err
}
return detail, nil
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
改进点对照:
| 原 WaitGroup 方案的问题 | errgroup 方案的改进 |
|---|---|
| 推荐服务卡 5 秒→主 goroutine 死等 | ctx 取消→其他 goroutine 的 ctx.Done 触发→安全返回 |
| errCh 容量硬编码为 3 | 不需要 errCh——errorOnce 自动处理 |
| wg.Add/wg.Done 手动维护 | Go 封装了 Add/Done |
| 拿到数据后无法提前返回 | 所有 goroutine 必须检查 ctx.Done |
# 9.3 批量 API 调用模式
批量调用 N 个 API——限并发 5 个、一人失败全员退出:
func batchCallAPIs(ctx context.Context, urls []string) ([]*APIResponse, error) {
g, ctx := errgroup.WithContext(ctx)
g.SetLimit(5) // 最多 5 个并发 HTTP 请求
results := make([]*APIResponse, len(urls))
for i, url := range urls {
i, url := i, url
g.Go(func() error {
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
if err != nil {
return err
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
return err // ← ctx 取消时这里会返回 context.Canceled
}
defer resp.Body.Close()
body, _ := io.ReadAll(resp.Body)
results[i] = &APIResponse{URL: url, Body: body, Status: resp.StatusCode}
return nil
})
}
if err := g.Wait(); err != nil {
return nil, err
}
return results, nil
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
SetLimit(5) 的作用——即使传入 100 个 URL,同时只有 5 个 HTTP 请求在进行。其他 g.Go(f) 调用阻塞在 g.sem <- {} 上——这正是第 13 篇讨论的 channel 信号量模式。
# 10. 综合案例串讲
# 10.1 案例真相揭晓
回到第 1 章商品聚合服务的七个疑问,逐条作答:
| 疑问 | 答案 |
|---|---|
| ① errgroup 怎么让「先出错」触发全局取消? | 第 3-4 章:errorOnce.Do → 记录 err → cancel(err) → ctx.Done() → 其他 goroutine 收到信号 |
| ② Wait 为什么只返回第一个错误? | 第 5.2:取消传播后其他 goroutine 返回 ctx.Err()——收集它们是"症状"不是"根因"——简化为只保留第一个 |
| ③ WithContext 的 context 取消何时触发? | 第 6 章:第一个 goroutine 返回 error 时,或 Wait 返回时(保证至少 cancel 一次) |
| ④ 为什么 goroutine 必须检查 ctx.Done()? | 第 6.2:Go 的取消是协作式——不检查 ctx.Done() 的 goroutine 不会自动停止 |
| ⑤ SetLimit 内部用什么实现? | 第 7 章:make(chan struct{}, n)——最简洁的 channel 信号量 |
| ⑥ TryGo 和 Go 的语义差异? | 第 8 章:TryGo 在槽位满时立即返回 false(非阻塞),Go 会阻塞等待 |
| ⑦ errgroup vs WaitGroup 怎么选? | 第 9 章:等全部结束用 WaitGroup,需要"一人出错全员退出"用 errgroup |
案例完整根因链条:
推荐服务卡 5 秒
→ fetchRecommendations 的 ctx 没有超时 → ctx.Done() 不触发
→ wg.Wait() 死等 5 秒 → 主 goroutine 挂起
→ 库存和价格数据早就拿到了 → 但无法提前返回
→ 高峰期每秒 500 请求 × 每请求最多 5 秒 = 2500 个阻塞 goroutine
→ 每个持有一个 TCP 连接 → 40MB 额外内存 + 下游连接池耗尽
→ 上游网关已经超时断连 → 但这些 goroutine 还在跑 → 纯浪费
2
3
4
5
6
7
修复方案:
// ✅ errgroup.WithContext + 上游 context 带 timeout
func handleProduct(w http.ResponseWriter, r *http.Request) {
// 外层 context 带 300ms 总超时
ctx, cancel := context.WithTimeout(r.Context(), 300*time.Millisecond)
defer cancel()
detail, err := aggregateProductV2(ctx, r.URL.Query().Get("product_id"))
if err != nil {
if errors.Is(err, context.DeadlineExceeded) {
http.Error(w, "聚合超时", http.StatusGatewayTimeout)
return
}
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
json.NewEncoder(w).Encode(detail)
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
三层防护:
- 上游 ctx 超时(300ms)→ errgroup 的 ctx 也超时 → 所有 goroutine 退出
- 任何一个下游 goroutine 返回 error → WithContext 取消 → 其他 goroutine 退出
fetchRecommendations内部检查ctx.Done()——不再傻等 5 秒
# 10.2 一次 errgroup 任务组的完整旅程
g, ctx := errgroup.WithContext(ctx) // ← 外部 ctx 带 300ms 超时
g.Go(fetchInventory) // ← 启动 goroutine 1
g.Go(fetchPrice) // ← 启动 goroutine 2
g.Go(fetchRecommendations) // ← 启动 goroutine 3
err := g.Wait() // ← 主 goroutine 阻塞等待
─────────────────────────────────────────────────────────
│
├─ goroutine 1 (fetchInventory):
│ http.NewRequestWithContext(ctx, ...)
│ → ctx.Done() 未触发 → 正常执行
│ → 50ms 后返回 → detail.Inventory = {...}
│ → return nil → wg.Done()
│
├─ goroutine 2 (fetchPrice):
│ http.NewRequestWithContext(ctx, ...)
│ → 60ms 后返回 → detail.Price = {...}
│ → return nil → wg.Done()
│
└─ goroutine 3 (fetchRecommendations):
http.NewRequestWithContext(ctx, ...)
→ 下游延迟 5 秒——但 ctx 在 300ms 时超时了
→ http 客户端检测到 ctx.Done() → 中断 TCP 连接
→ 返回 context.DeadlineExceeded
→ errorOnce.Do: g.err = DeadlineExceeded
→ g.cancel(DeadlineExceeded) → ctx 取消
→ 但 goroutine 1、2 已经结束了——wip
→ wg.Done()
g.wg.Wait() 返回(三个 goroutine 都 Done)
g.cancel(g.err) — 幂等,无影响
return DeadlineExceeded
结果: 库存和价格数据拿到了——但主 goroutine 返回 error 给调用方
→ 调用方可以选择:降级返回部分数据 / 返回超时错误 / 重试
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
没有获取的数据怎么处理:detail.Recs 为 nil——但如果业务允许"部分成功返回"(只有库存和价格没有推荐也可以展示页面),可以在调用方做判断:
if err != nil && errors.Is(err, context.DeadlineExceeded) {
// 超时了——但 detail 里的 Inventory 和 Price 可能已经有值
if detail.Inventory != nil && detail.Price != nil {
detail.Recs = fallbackRecs(productID) // 用兜底推荐
return detail, nil // ← 部分成功
}
return nil, err
}
2
3
4
5
6
7
8
# 10.3 设计哲学回扣
哲学 1:取消信号的"推"模式——替代线程间 kill
Java 有 Thread.interrupt() 和 Thread.stop()——前者是协作式的(需要线程检查 isInterrupted),后者是强制的(已废弃)。Go 没有"强制杀死 goroutine"的 API——errgroup 的 WithContext 是这种哲学的完美体现:用 context 通道"推"送取消信号,每个 goroutine 自己"拉"取信号并安全退出。推拉结合 = 没有竞态、没有资源泄漏。
哲学 2:sync.Once 捕获第一个错误——把"多写一读"简化到极致
errgroup 没有用 sync.Mutex 保护 error 字段——只用了一个 sync.Once。任何 goroutine 都可能写入 error——但 Once 保证只有第一次写入生效。后续写入被静默丢弃——这是一种 "数据不关键,放弃也无妨" 的设计取舍。它放弃了"收集全部错误"的完整性,换来了零锁竞争的简洁实现。
哲学 3:channel 信号量做 SetLimit——复用现有原语,不引入新概念
SetLimit 的实现就是 make(chan struct{}, n)——和程序员手动写 sem <- struct{}{} / <-sem 一模一样。errgroup 没有发明新的限流机制——它只是把这个 pattern 标准化、内置化。这是 Go 标准库/扩展库的设计原则:用组合代替继承,用现有 primitives 搭建新 abstractions。
哲学 4:Wait 返回后 cancel——让 ctx 的生命周期和任务组精确绑定
即使没有任何 goroutine 返回 error,Wait 返回后也会调用 cancel()。这保证了 ctx 不会在任务组结束后还被使用——避免"僵尸 ctx"导致后续操作拿到过期信号。这是一个防御性设计——明确 ctx 的边界。"出了这个 errgroup,ctx 就失效"。
# 10.4 速查表
errgroup 决策矩阵:
| 需求 | 推荐的 errgroup 配置 |
|---|---|
| 等一组任务全部结束,不关心错误 | var g errgroup.Group(零值) |
| 任何任务出错立即停止 | errgroup.WithContext(ctx) |
| 限并发执行 | g.SetLimit(n)(Go 1.20+) |
| 非阻塞提交(槽位满时拒绝) | g.TryGo(fn)(Go 1.22+) |
| 收集所有错误(非标准) | 自己实现 MultiErrorGroup |
errgroup vs 替代方案:
| 场景 | errgroup | WaitGroup | semaphore.Weighted |
|---|---|---|---|
| 等待多个 goroutine | ✅ | ✅ | ❌ |
| 返回第一个错误 | ✅ | ❌(需手动) | ❌ |
| 自动取消其他 goroutine | ✅(WithContext) | ❌ | ❌(需手动) |
| 限并发 | ✅(SetLimit) | ❌ | ✅ |
| 加权资源控制 | ❌ | ❌ | ✅ |
| 可取消的等待 | ❌(Go 阻塞) | ❌ | ✅(Acquire(ctx, n)) |
典型错误模式:
// ❌ fn 不检查 ctx.Done() ——取消信号无效
g.Go(func() error {
time.Sleep(10 * time.Second) // ← ctx 取消后继续睡
return nil
})
// ❌ Wait 返回后使用 ctx —— ctx 已 Done
data, _ := doMore(ctx)
// ❌ 循环变量捕获(Go 1.21 及之前)
for _, task := range tasks {
g.Go(func() error { return process(task) }) // ← data race
}
// ❌ fn 内部创建新 context ——切断取消链
g.Go(func() error {
return callAPI(context.Background(), url)
})
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
诊断命令:
# 查看阻塞在 errgroup.Go 或 errgroup.Wait 的 goroutine
curl http://localhost:6060/debug/pprof/goroutine?debug=2 | grep "errgroup"
# goroutine profile——看是否 goroutine 泄漏
go tool pprof http://localhost:6060/debug/pprof/goroutine
# data race 检测(闭包捕获循环变量、共享数据写入)
go test -race ./...
# 逃逸分析——确认闭包变量不会意外逃逸到堆
go build -gcflags="-m" .
2
3
4
5
6
7
8
9
10
11
Go 版本特性一览:
| 特性 | 引入版本 |
|---|---|
errgroup.Group(基础版) | Go 1.x(golang.org/x/sync) |
WithContext | Go 1.x |
SetLimit | Go 1.20 |
TryGo | Go 1.22 |
context.WithCancelCause | Go 1.20 |
下一篇:我们已经掌握了 errgroup 的并行任务协调和 context 取消传播,下一步进入 15.协程泄漏排查与修复——把 goroutine 泄漏的 5 种典型场景、pprof 定位方法、leaktest 自动检测剖开。