编程进阶网 编程进阶网
首页
  • 计算机原理
  • 操作系统
  • 网络协议
  • 数据库原理
  • 面向对象
  • 设计原则
  • 设计模式
  • 系统架构
  • 性能优化
  • 编程原理
  • 方案设计
  • 稳定可靠
  • 工程运维
  • 基础认知
  • 线性结构
  • 树与哈希
  • 工业级实现
  • 算法思想
  • 实战与综合
  • 算法题考核
  • C语言入门
  • C综合案例
  • C专栏博客
  • C标准集库
  • C++入门教程
  • C++综合案例
  • C++专栏博客
  • C++开发技巧
  • Java入门教程
  • Java综合案例
  • Java专栏博客
  • Go入门教程
  • Go综合案例
  • Go专栏博客
  • Go开发技巧
  • JavaScript入门
  • JavaScript高级
  • Android库解读
  • Android专栏
  • Android智能硬件
  • iOS ObjC入门
  • iOS Swift入门
  • iOS入门精通
  • Web之Html手册
  • Web之TypeScript
  • Web之Vue高级进阶
  • Linux之QML入门
  • Linux之QT核心库
  • Linux实践开发
  • Python教程
  • Shell&Bash教程
  • 工具脚本
  • 自动化脚本
  • 质量保障
  • 产品思考
  • 软实力
  • 开发流程
  • Git应用
  • 技术模版
  • 技术规范
  • Markdown
  • Mermaid
  • 开源协议
  • JSON工具
  • 文本工具
  • 图片处理
  • 文档转化
  • 代码压缩
  • 关于我
  • 自我精进
  • 职场管理
  • 职场面试
  • 心情杂货
  • 友情链接

杨充

专注编程 · 终身学习者
首页
  • 计算机原理
  • 操作系统
  • 网络协议
  • 数据库原理
  • 面向对象
  • 设计原则
  • 设计模式
  • 系统架构
  • 性能优化
  • 编程原理
  • 方案设计
  • 稳定可靠
  • 工程运维
  • 基础认知
  • 线性结构
  • 树与哈希
  • 工业级实现
  • 算法思想
  • 实战与综合
  • 算法题考核
  • C语言入门
  • C综合案例
  • C专栏博客
  • C标准集库
  • C++入门教程
  • C++综合案例
  • C++专栏博客
  • C++开发技巧
  • Java入门教程
  • Java综合案例
  • Java专栏博客
  • Go入门教程
  • Go综合案例
  • Go专栏博客
  • Go开发技巧
  • JavaScript入门
  • JavaScript高级
  • Android库解读
  • Android专栏
  • Android智能硬件
  • iOS ObjC入门
  • iOS Swift入门
  • iOS入门精通
  • Web之Html手册
  • Web之TypeScript
  • Web之Vue高级进阶
  • Linux之QML入门
  • Linux之QT核心库
  • Linux实践开发
  • Python教程
  • Shell&Bash教程
  • 工具脚本
  • 自动化脚本
  • 质量保障
  • 产品思考
  • 软实力
  • 开发流程
  • Git应用
  • 技术模版
  • 技术规范
  • Markdown
  • Mermaid
  • 开源协议
  • JSON工具
  • 文本工具
  • 图片处理
  • 文档转化
  • 代码压缩
  • 关于我
  • 自我精进
  • 职场管理
  • 职场面试
  • 心情杂货
  • 友情链接
  • README
  • C语言入门精通

  • Cpp入门到精通

  • Java入门精通

  • Go入门到精通

    • 入门教程

    • 综合案例

    • 专栏博客

      • Go 专栏博客
      • 内存模型与栈堆布局
      • 指针与逃逸分析
      • 结构体内存布局对齐
      • 字符串与切片底层
      • 接口与类型系统
      • map哈希表底层实现
      • 零值初始化设计哲学
      • GMP协程调度器机制
      • 通道channel源码剖析
      • sync同步原语剖析
      • map并发安全与哈希
      • Go内存模型一致性
      • 加权信号量与限流
      • errgroup并行控制
        • 1. 案例引入
          • 1.1 一段崩在哪
          • 1.2 顺藤摸到根因
          • 1.3 我们要回答什么
        • 2. 架构概览
          • 2.1 errgroup 全景图
          • 2.2 为什么 WaitGroup 不够用
        • 3. Group 核心数据结构
          • 3.1 字段逐层解读
          • 3.2 errorOnce 错误捕获
          • 3.3 并发安全保证
        • 4. Go 任务提交与错误收集
          • 4.1 Go 方法实现
          • 4.2 闭包延迟执行的陷阱
          • 4.3 错误的传播路径
        • 5. Wait 错误聚合机制
          • 5.1 Wait 内部流程
          • 5.2 只返回第一个错误的设计考量
          • 5.3 取消信号的生命周期
        • 6. WithContext 取消传播
          • 6.1 派生 context 的内部实现
          • 6.2 取消级联的正确用法
          • 6.3 常见的 context 误用
        • 7. SetLimit 限并发机制
          • 7.1 内部实现:channel 信号量
          • 7.2 限流效果验证
          • 7.3 与 semaphore.Weighted 的关联
        • 8. TryGo 非阻塞提交
          • 8.1 TryGo vs Go 语义差异
          • 8.2 背压降级模式
        • 9. WaitGroup 对比与实战
          • 9.1 四种对比场景
          • 9.2 多数据源并行聚合
          • 9.3 批量 API 调用模式
        • 10. 综合案例串讲
          • 10.1 案例真相揭晓
          • 10.2 一次 errgroup 任务组的完整旅程
          • 10.3 设计哲学回扣
          • 10.4 速查表
      • 协程泄漏排查与修复
      • 并发设计模式详解
      • GC三色标记与屏障
      • 内存分配器深挖
      • defer延迟执行机制
      • 定时器四叉堆实现
      • 抢占式调度器原理
      • 协程栈扩容与缩容
      • 上下文取消与传播
      • 泛型与类型约束
      • 反射机制与unsafe
      • 迭代器与rangefunc
      • 错误处理与panic
      • 网络轮询器netpoller
      • HTTP服务端源码分析
      • JSON序列化与编解码
      • 数据库SQL连接池
      • 文件IO与零拷贝
      • 结构化日志与配置
      • 单元测试与基准
      • cgo与系统调用切换
      • 编译链接与PGO优化
      • 写作模板
    • 开发技巧

  • JavaScript入门

  • CodeX
  • Go入门到精通
  • 专栏博客
杨充
2026-06-12
目录

errgroup并行控制

# 14.errgroup并行控制

卷三第十四篇——sync.WaitGroup 能等所有 goroutine 结束,但它不能传回第一个错误;semaphore.Weighted 能控制并发度,但它不负责错误收集。golang.org/x/sync/errgroup 把两者缝合成一个优雅的原语:一堆 goroutine 并行干活,只要有一个出错——context 立即取消,其他 goroutine 收到信号后安全退出,Wait 返回第一个错误。读完本篇,你能解释:为什么 errgroup 的 context 取消不会漏掉 goroutine?SetLimit 内部是用什么实现的?TryGo 和 Go 的语义差一个"阻塞"字但在哪里?关键词:errgroup、WithContext、errorOnce、SetLimit 限并发、TryGo 非阻塞、WaitGroup 对比。

# 目录介绍

  • 1. 案例引入
    • 1.1 一段崩在哪
    • 1.2 顺藤摸到根因
    • 1.3 我们要回答什么
  • 2. 架构概览
    • 2.1 errgroup 全景图
    • 2.2 为什么 WaitGroup 不够用
  • 3. Group 核心数据结构
    • 3.1 字段逐层解读
    • 3.2 errorOnce 错误捕获
    • 3.3 并发安全保证
  • 4. Go 任务提交与错误收集
    • 4.1 Go 方法实现
    • 4.2 闭包延迟执行的陷阱
    • 4.3 错误的传播路径
  • 5. Wait 错误聚合机制
    • 5.1 Wait 内部流程
    • 5.2 只返回第一个错误的设计考量
    • 5.3 取消信号的生命周期
  • 6. WithContext 取消传播
    • 6.1 派生 context 的内部实现
    • 6.2 取消级联的正确用法
    • 6.3 常见的 context 误用
  • 7. SetLimit 限并发机制
    • 7.1 内部实现:channel 信号量
    • 7.2 限流效果验证
    • 7.3 与 semaphore.Weighted 的关联
  • 8. TryGo 非阻塞提交
    • 8.1 TryGo vs Go 语义差异
    • 8.2 背压降级模式
  • 9. WaitGroup 对比与实战
    • 9.1 四种对比场景
    • 9.2 多数据源并行聚合
    • 9.3 批量 API 调用模式
  • 10. 综合案例串讲
    • 10.1 案例真相揭晓
    • 10.2 一次 errgroup 任务组的完整旅程
    • 10.3 设计哲学回扣
    • 10.4 速查表

# 1. 案例引入

# 1.1 一段崩在哪

看一个商品详情聚合服务——它需要在 300ms 内返回聚合结果,后端同时调用库存、价格、推荐三个下游 API。生产环境跑了半年,某天下游推荐服务因为数据库主从切换出现 5 秒的响应延迟,整个聚合服务就崩了:

// product_aggregator.go —— 商品详情聚合
package main

import (
    "context"
    "errors"
    "fmt"
    "sync"
    "time"
)

type ProductDetail struct {
    ProductID string
    Inventory *InventoryData
    Price     *PriceData
    Recs      *RecommendData
}

var errCh = make(chan error, 3) // 缓冲 3 个——收集错误

func fetchInventory(ctx context.Context, productID string) (*InventoryData, error) {
    // 模拟:正常响应 50ms
    time.Sleep(50 * time.Millisecond)
    return &InventoryData{Stock: 100}, nil
}

func fetchPrice(ctx context.Context, productID string) (*PriceData, error) {
    time.Sleep(60 * time.Millisecond)
    return &PriceData{CurrentPrice: 2999}, nil
}

func fetchRecommendations(ctx context.Context, productID string) (*RecommendData, error) {
    // 模拟:下游服务卡死——5 秒无响应
    select {
    case <-time.After(5 * time.Second):
        return &RecommendData{Items: nil}, nil
    case <-ctx.Done():
        return nil, ctx.Err()
    }
}

func aggregateProduct(ctx context.Context, productID string) (*ProductDetail, error) {
    var wg sync.WaitGroup
    detail := &ProductDetail{ProductID: productID}

    // 并行启动 3 个 goroutine
    wg.Add(3)
    go func() {
        defer wg.Done()
        inv, err := fetchInventory(ctx, productID)
        if err != nil {
            errCh <- fmt.Errorf("库存接口: %w", err)
            return
        }
        detail.Inventory = inv
    }()

    go func() {
        defer wg.Done()
        price, err := fetchPrice(ctx, productID)
        if err != nil {
            errCh <- fmt.Errorf("价格接口: %w", err)
            return
        }
        detail.Price = price
    }()

    go func() {
        defer wg.Done()
        recs, err := fetchRecommendations(ctx, productID)
        if err != nil {
            errCh <- fmt.Errorf("推荐接口: %w", err)
            return
        }
        detail.Recs = recs
    }()

    // 等待所有 goroutine 结束
    wg.Wait()
    close(errCh)

    // 检查错误
    var errs []error
    for e := range errCh {
        errs = append(errs, e)
    }
    if len(errs) > 0 {
        return nil, errors.Join(errs...) // Go 1.20+
    }
    return detail, nil
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91

现象:

  • 平时三个下游 API 都在 100ms 内返回,聚合接口 P99 约 120ms
  • 某天推荐服务因主从切换卡了 5 秒——但库存和价格在 50ms 就返回了
  • fetchRecommendations 的 ctx.Done() 永远不会触发——因为传入的 context 没有超时
  • wg.Wait() 死等 5 秒——主 goroutine 挂在 Wait 上,无法提前返回
  • 这 5 秒内,上游网关已经超时(300ms)断连——但聚合服务的 goroutine 还在跑
  • 更糟糕的是:库存和价格的数据早就拿到了,但无法提前返回给调用方——因为 WaitGroup 要求所有 goroutine 都 Done

goroutine 泄漏的连锁反应:高峰期每秒钟 500 个请求——每个请求最多卡 5 秒——同时有 2500 个 goroutine 挂在 fetchRecommendations 上。每个 goroutine 持有一个 HTTP 连接(TCP socket + buffer ~16KB)→ 40MB 额外内存,且上游的 context 已经取消但这些 goroutine 对下游的连接还没断——下游推荐服务进一步被打垮。

核心矛盾:WaitGroup 的语义是"等所有任务完成"——它不在乎"某些任务已经没意义了"。当一个 goroutine 出错或超时后,其他 goroutine 继续执行是在浪费资源——而且调用方拿不到任何结果。

# 1.2 顺藤摸到根因

追查过程:

第一步:看 pprof goroutine——出问题时 goroutine 数从平时的 200 飙到 5000+:

$ curl http://localhost:6060/debug/pprof/goroutine?debug=2 | grep -c "fetchRecommendations"
# 2478 个 goroutine 全部卡在 select 上——time.After(5s) 还没到
1
2

第二步:看这些 goroutine 对应的请求——通过 tracing 发现 80% 的请求上游已经超时(context deadline exceeded),但 goroutine 仍然在等待下游响应——因为 WaitGroup 没有取消机制。

第三步:search 代码中的 WaitGroup 使用模式——发现所有并行 RPC 调用都是 WaitGroup + errCh 模式——没有一个能提前退出。

这个事故藏着 7 个原理点:

① errgroup 怎么让「先出错」的 goroutine 触发全局取消?              → 第 3-4 章
② Wait 为什么只返回第一个错误——而不是收集全部错误?                → 第 5.2
③ WithContext 的 context 取消是什么时候触发的?                     → 第 6 章
④ 为什么「其他 goroutine 也必须检查 ctx.Done()」——否则取消无效?   → 第 6.2
⑤ SetLimit 的内部是用什么实现限并发的?                             → 第 7 章
⑥ TryGo 和 Go 的语义差异——一个"阻塞"字藏在哪?                      → 第 8 章
⑦ errgroup vs WaitGroup 在四种典型场景下怎么选?                    → 第 9 章
1
2
3
4
5
6
7

# 1.3 我们要回答什么

这个聚合服务案例贯穿全篇。我们从 errgroup 的数据结构出发,深入 Go/Wait 的协作流程、errorOnce 的错误捕获策略、WithContext 的取消级联——再对比 WaitGroup 在并行场景下的不足——最后给出多数据源聚合和批量 API 调用的最佳实践。

本篇路线:

架构总图 (第 2 章) ── errgroup 三组件协作 + WaitGroup 为什么不够
   ↓
数据结构 (第 3 章) ── Group 字段 + errorOnce 原子捕获
   ↓
Go + Wait (第 4-5 章) ── 任务提交 / 错误收集 / 只返回第一个错误
   ↓
WithContext (第 6 章) ── context 取消传播的内部实现
   ↓
SetLimit (第 7 章) ── channel 信号量限并发 + 与 Weighted 对比
   ↓
TryGo (第 8 章) ── 非阻塞提交 + 背压降级
   ↓
WaitGroup 对比 (第 9 章) ── 四种场景横向评测 + 实战模式
   ↓
综合案例 (第 10 章) ── 修复聚合服务 + 设计哲学
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

📌 本篇定位:第 13 篇的 semaphore.Weighted 解决的是"单个 goroutine 如何可取消地获取许可",本篇的 errgroup 解决的是"一组 goroutine 如何协同退出"——前者是点对点限流,后者是组内级联控制。理解了 errgroup 的 context 传播,就能回答:"为什么 Go 的并发模型不需要 Java 的 Thread.interrupt()?"

# 2. 架构概览

# 2.1 errgroup 全景图

errgroup 只做三件事——把三个组件缝合在一起:

                      errgroup.Group
                           │
        ┌──────────────────┼──────────────────┐
        │                  │                  │
        ▼                  ▼                  ▼
  sync.WaitGroup      sync.Once        context.Context
  (等所有任务结束)    (记录第一个错误)   (WithContext 版)
        │                  │                  │
        │                  │                  │
   Go(fn) 包装为         第一个非 nil 的    Go 返回 error →
   wg.Add(1)             error 被捕获       ctx 自动取消
        │                  │                  │
   Wait() 调用          后续 error 被      其他 G 收到
   wg.Wait()            丢弃               ctx.Done() →
                                          安全退出
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

数据流图:

主 goroutine 创建 errgroup.WithContext(ctx)
        │
        ├── 派生 context (cancelCtx)
        │
        ├── Go(func1) ──→ goroutine 1: wg.Add(1)
        │                       │
        ├── Go(func2) ──→ goroutine 2: wg.Add(1)
        │                       │
        └── Go(func3) ──→ goroutine 3: wg.Add(1)
                                │
                    ┌───────────┴───────────┐
                    │                       │
              goroutine 2 返回 error        goroutine 1 收到 ctx.Done()
                    │                       │
              errorOnce.Do(store err)       return ctx.Err()
              context cancel() ◄──────────── 安全退出
                    │
        goroutine 3 收到 ctx.Done()
              return ctx.Err()
                    │
        wg.Wait() 返回 (三个 goroutine 都 wg.Done)
                    │
              Wait() 返回第一个 error
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

# 2.2 为什么 WaitGroup 不够用

疑惑:WaitGroup 加一个 error channel 不就能实现错误收集吗?为什么需要 errgroup?

论证:

  1. 没有取消传播——WaitGroup 只能"等",不能"推"。goroutine A 返回 error,但 goroutine B、C 完全不知道——它们继续执行直到自然结束。errgroup 的 WithContext 在第一个 error 返回时自动 cancel context——其他 goroutine 通过 ctx.Done() 直接感知。

  2. 错误收集需要手动管理 channel——WaitGroup + errCh 模式下,channel 的关闭时机、容量、消费方式都需要手动维护。第 1 章的 errCh 用了缓冲 3 的 channel——但如果 goroutine 数量不固定(动态启动),就不知道该怎么设定 channel 容量。errgroup 用 sync.Once 捕获第一个 error——不需要 channel。

  3. WaitGroup 的 Add/Done 匹配必须精确——在动态任务数量的场景(如遍历一个不确定长度的列表),Add 和 Done 的配对容易出错。errgroup 的 Go(fn) 封装了 Add/Done,调用方不需要手动维护计数。

  4. 没有内建的并发度控制——WaitGroup 本身不限制同时运行的 goroutine 数量——如果要"1000 个任务但只并发 10 个",需要额外组合 semaphore。errgroup 的 SetLimit(Go 1.20+)直接提供了这个能力。

结论:WaitGroup 是"计数等待器"——只管任务数量,不管任务状态。errgroup 是"并行任务协调器"——加上错误传播、取消级联、并发度控制三层语义。两者的关系不是替代,是进化:简单的等全部结束用 WaitGroup,需要"一人出错全员退出"用 errgroup。

# 3. Group 核心数据结构

# 3.1 字段逐层解读

errgroup 的 Group 结构体只有 5 个字段——但恰好覆盖了并行任务协调的所有需求:

// golang.org/x/sync/errgroup/errgroup.go (简化)
type Group struct {
    cancel  func(error)     // context.WithCancelCause 的取消函数
    wg      sync.WaitGroup  // 等待所有 goroutine 结束
    sem     chan struct{}   // SetLimit 后的信号量 channel
    errOnce sync.Once       // 保证只记录第一个错误
    err     error           // 第一个非 nil 的 error
}
1
2
3
4
5
6
7
8

每个字段的设计意图:

字段 类型 零值 Group 时的状态 设计意图
cancel func(error) nil WithContext 版才有值——零值 Group 不支持取消传播
wg sync.WaitGroup 可用 追踪未完成的 goroutine 数量
sem chan struct{} nil SetLimit(n) 后初始化为 make(chan struct{}, n)——当信号量用
errOnce sync.Once 可用 只记录第一个错误——后续错误被丢弃
err error nil 第一个非 nil 的 error 的值

零值 Group 可用——不需要构造函数,var g errgroup.Group 直接能用(但没有 context 取消功能)。

// 零值 Group:有 WaitGroup + errorOnce——但没有 cancel
var g errgroup.Group

g.Go(func() error { return doWork() })
g.Go(func() error { return doOther() })
if err := g.Wait(); err != nil {
    // 第一个非 nil 的 error
}
1
2
3
4
5
6
7
8

WithContext 版:在零值 Group 的基础上注入了 context 取消能力:

// golang.org/x/sync/errgroup/errgroup.go (简化)
func WithContext(ctx context.Context) (*Group, context.Context) {
    ctx, cancel := context.WithCancelCause(ctx)
    return &Group{cancel: cancel}, ctx
}
1
2
3
4
5

# 3.2 errorOnce 错误捕获

sync.Once 保证了有且仅有第一个错误被记录:

// golang.org/x/sync/errgroup/errgroup.go (简化)
func (g *Group) Go(f func() error) {
    // ...
    go func() {
        defer g.done()
        if err := f(); err != nil {
            g.errOnce.Do(func() {  // ← 只有第一次执行
                g.err = err         // 记录第一个错误
                if g.cancel != nil {
                    g.cancel(err)   // 触发 context 取消
                }
            })
        }
    }()
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

sync.Once 的原子保证——即使多个 goroutine 同时返回 error,g.errOnce.Do 只执行一次。第一个到达的 goroutine 写入 g.err 并调用 g.cancel;后续到达的 goroutine 在 sync.Once 层面被跳过——它们的 error 被丢弃。

goroutine A 返回 err: "库存超时"     ──→ errOnce.Do ✓ → g.err = "库存超时" → cancel()
goroutine B 返回 err: "价格拒绝"     ──→ errOnce.Do ✗ (已执行) → err 被丢弃
goroutine C 返回 nil                 ──→ 不进入 if 分支
1
2
3

# 3.3 并发安全保证

Group 的并发模型极其简洁——不暴露锁给外部:

并发安全保证层级:

1. WaitGroup (Add/Done/Wait) — sync 包保证原子性
2. sync.Once (Do)             — 保证 error 只写一次
3. err 字段                   — 只被 sync.Once.Do 写,只被 Wait 读 → happens-before 保证
4. sem channel                — Go 的 channel 自带并发安全

没有暴露的锁 → 调用方不会死锁
1
2
3
4
5
6
7
8

goroutine 视角:当 g.Go(fn) 启动一个 goroutine 时——

  • G 从 _Gidle → _Grunnable → _Grunning(被 P 调度执行)
  • 执行 fn 期间 G 正常运行
  • 如果 fn 返回 error → sync.Once.Do → 可能调用 context.CancelFunc → 触发其他等待 ctx.Done() 的 G 变为 _Grunnable
  • fn 返回后 → g.wg.Done() → 可能唤醒正在 g.wg.Wait() 的 G

# 4. Go 任务提交与错误收集

# 4.1 Go 方法实现

Go 方法实现了 WaitGroup 的 Add(1) + goroutine 启动 + error 收集三合一:

// golang.org/x/sync/errgroup/errgroup.go (简化)
func (g *Group) Go(f func() error) {
    // SetLimit 后的限流:如果 sem 不为 nil → 获取许可
    if g.sem != nil {
        g.sem <- struct{}{}    // ← 阻塞等待并发槽位
    }

    g.wg.Add(1)
    go func() {
        defer g.done()          // ← 释放 sem 许可 + wg.Done
        if err := f(); err != nil {
            g.errOnce.Do(func() {
                g.err = err
                if g.cancel != nil {
                    g.cancel(err)
                }
            })
        }
    }()
}

func (g *Group) done() {
    if g.sem != nil {
        <-g.sem   // 归还并发槽位
    }
    g.wg.Done()
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

Go 方法的行为总结:

条件 Go(fn) 的行为
无 SetLimit wg.Add(1) → go fn() ——立即返回,不阻塞
有 SetLimit(n) sem <- {}(如果 n 个槽位满了则阻塞)→ wg.Add(1) → go fn()
fn 返回 nil wg.Done() → 不触发 errorOnce
fn 返回 error errorOnce.Do(记录错误 + cancel) → wg.Done()

# 4.2 闭包延迟执行的陷阱

陷阱 1:循环变量捕获(Go 1.22 之前):

// ❌ Go 1.21 及之前:所有 goroutine 共享同一个 task 变量
var g errgroup.Group
for _, task := range tasks {
    g.Go(func() error {
        return process(task) // ← task 是循环变量——数据竞争
    })
}

// ✅ 修复:参数传递或局部变量复制
for _, task := range tasks {
    task := task   // Go 1.21 需要这一行
    g.Go(func() error {
        return process(task)
    })
}
// Go 1.22+ 循环变量语义变化后不再需要 task := task
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

陷阱 2:defer 中的 error 不会被收集:

// ❌ fn 内的 defer 返回的 error 不会被 errgroup 收集
g.Go(func() error {
    f, _ := os.Open(file)
    defer f.Close() // ← f.Close() 返回的 error 被丢弃
    return process(f)
})

// ✅ 显式处理——defer 中收集 close error
g.Go(func() error {
    f, err := os.Open(file)
    if err != nil {
        return err
    }
    defer func() {
        if cerr := f.Close(); cerr != nil && err == nil {
            err = cerr // 保持对 err 的赋值——但 fn 已经返回了
        }
    }()
    return process(f)
})
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

更现实的场景——HTTP Body 没 Close:

// ❌ 如果 process 返回 error,resp.Body 不会被 Close
g.Go(func() error {
    resp, _ := http.Get(url)
    return process(resp) // ← 没有 defer resp.Body.Close()
})

// ✅ 不管是否 error 都要 Close
g.Go(func() error {
    resp, err := http.Get(url)
    if err != nil {
        return err
    }
    defer resp.Body.Close()
    return process(resp)
})
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

# 4.3 错误的传播路径

                           goroutine 1
                             fn() 返回 error
                                  │
                                  ▼
                       ┌─────────────────┐
                       │ g.errOnce.Do()  │ ← sync.Once 保证第一次执行
                       │   g.err = err   │
                       │   g.cancel(err) │ ← context.CancelFunc
                       └────────┬────────┘
                                │
                    ┌───────────┴───────────┐
                    │                       │
                    ▼                       ▼
          goroutine 2                 goroutine 3
      (仍在执行 fn 中)             (fn 中检查 ctx.Done())
          fn 结束后                   │
          errOnce.Do 被跳过         收到 ctx.Done() →
          (已执行过)                 return ctx.Err()
                                    │
                                    ▼
                               errOnce.Do 被跳过
                               (ctx.Err() 不被记录)
                                        │
                    ┌───────────────────┘
                    │
                    ▼
             wg.Wait() 返回
             Wait() 返回 g.err (第一个 error)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

# 5. Wait 错误聚合机制

# 5.1 Wait 内部流程

Wait 的实现只有一行有效代码——等待所有 goroutine 结束,然后触发一次 cancel:

// golang.org/x/sync/errgroup/errgroup.go (简化)
func (g *Group) Wait() error {
    g.wg.Wait()                // ← 阻塞,直到所有 goroutine 都 Done
    if g.cancel != nil {
        g.cancel(g.err)        // ← 确保 cancel 被调用至少一次
    }
    return g.err
}
1
2
3
4
5
6
7
8

为什么 Wait 结束后还要调一次 cancel——这覆盖了一个边界情况:所有 goroutine 都正常结束(无 error),但使用者需要 ctx 也被 cancel——因为 errgroup 的 ctx 生命周期是"任务组的生命周期"。任务组结束了,ctx 就应该 Done。

所有 goroutine 都返回 nil:
  → wg.Wait() 返回
  → g.cancel(g.err)  // g.err 为 nil → cancel(nil)
  → return nil        // 使用者收到 nil error

第一个 goroutine 返回 error:
  → errorOnce.Do → g.err = err → g.cancel(err)  ← 第一次 cancel
  → 其他 goroutine 收到 ctx.Done() → 安全退出
  → wg.Wait() 返回
  → g.cancel(g.err)  // g.err 不为 nil → cancel(err) ← 第二次 cancel(幂等)
  → return err
1
2
3
4
5
6
7
8
9
10
11

cancel 的两次调用都是安全的——context.CancelFunc 是幂等的,多次调用效果相同。

# 5.2 只返回第一个错误的设计考量

疑惑:为什么不收集所有 error——像 errors.Join 那样?

论证:

  1. 第一个错误最可能是"根因"——在 errgroup 的 WithContext 模式下,第一个 error 触发 context 取消 → 其他 goroutine 收到 ctx.Done() → 它们返回 ctx.Err()(即 context.Canceled)。如果收集所有 error,会得到一堆 context.Canceled——这些是"症状"不是"根因"。

  2. 收集所有 error 需要更复杂的数据结构——需要一个线程安全的 error 列表(如 []error + sync.Mutex)。sync.Once + 单个 err 字段是最简设计——对绝大多数使用场景"只关心第一个失败原因"来说足够了。

  3. 如果真的需要收集所有 error——自己组合:

// 如果需要收集所有 error(非标准 errgroup 行为)
type MultiErrorGroup struct {
    wg   sync.WaitGroup
    mu   sync.Mutex
    errs []error
}

func (g *MultiErrorGroup) Go(f func() error) {
    g.wg.Add(1)
    go func() {
        defer g.wg.Done()
        if err := f(); err != nil {
            g.mu.Lock()
            g.errs = append(g.errs, err)
            g.mu.Unlock()
        }
    }()
}

func (g *MultiErrorGroup) Wait() error {
    g.wg.Wait()
    return errors.Join(g.errs...)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

结论:errgroup 的 "只返回第一个错误" 不是设计缺陷——是与 WithContext 取消传播配合的必然结果。标准 errgroup 关注的是"快速失败 + 取消级联"——如果需要"收集全部 error + 继续执行",需要不同的协调原语。

# 5.3 取消信号的生命周期

ctx 的生命周期严格绑定在 errgroup 任务组上:

WithContext(ctx):
  ctx, cancel := context.WithCancelCause(parentCtx)
  
  ├── Go(task1) → wg.Add(1)  ← ctx 激活
  ├── Go(task2) → wg.Add(1)
  └── Go(task3) → wg.Add(1)
  
  Wait():
    wg.Wait()         ← 所有 goroutine 结束
    cancel(g.err)     ← ctx 终结
  
  ← ctx.Done() 之后不可再被使用
1
2
3
4
5
6
7
8
9
10
11
12

关键约束:ctx 只在 errgroup 的整个生命周期内有效。Wait 返回后使用 ctx 是错误行为:

g, ctx := errgroup.WithContext(context.Background())

g.Go(func() error { return doWork(ctx) })
if err := g.Wait(); err != nil {
    return err
}

// ❌ 错误:Wait 返回后 ctx 已经被 cancel 了
// 此时 ctx.Done() 已经关闭——任何读取都是零值
doMoreWork(ctx) // ← ctx 已失效
1
2
3
4
5
6
7
8
9
10

# 6. WithContext 取消传播

# 6.1 派生 context 的内部实现

WithContext 只有 5 行——但它在标准 context.WithCancel 基础上加入了"错误原因":

// golang.org/x/sync/errgroup/errgroup.go (简化)
func WithContext(ctx context.Context) (*Group, context.Context) {
    ctx, cancel := context.WithCancelCause(ctx)
    return &Group{cancel: cancel}, ctx
}
1
2
3
4
5

context.WithCancelCause(Go 1.20+)和 context.WithCancel 的区别:

WithCancel WithCancelCause
CancelFunc func() func(error)
ctx.Err() 返回 context.Canceled 返回传入的 error
用途 不知道取消原因 第一个出错的 goroutine 传自己的 error

这意味着:当 goroutine 返回 "库存超时" 的 error 时——g.cancel(errStockTimeout) → 其他 goroutine 的 ctx.Err() 返回的就是 errStockTimeout(而不是泛化的 context.Canceled)。这对于日志和调试非常有价值。

# 6.2 取消级联的正确用法

最关键的规则——每个 goroutine 内部的 fn 必须检查 ctx.Done()。否则 WithContext 的取消传播完全无效:

// ❌ WithContext 无效——goroutine 不检查 ctx.Done()
g, ctx := errgroup.WithContext(context.Background())
g.Go(func() error {
    return callSlowAPI() // ← 即使 ctx 被取消,这个调用继续执行
})

// ✅ 正确:fn 内部检查 ctx.Done()
g, ctx := errgroup.WithContext(context.Background())
g.Go(func() error {
    req, _ := http.NewRequestWithContext(ctx, "GET", url, nil)
    return http.DefaultClient.Do(req) // ← ctx 取消 → 请求中断
})

// ✅ 或者 select 双重监听
g.Go(func() error {
    resultCh := make(chan Result, 1)
    go func() { resultCh <- doWork() }()
    select {
    case <-ctx.Done():
        return ctx.Err()  // ← 取消传播
    case result := <-resultCh:
        return process(result)
    }
})
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

为什么 ctx 取消不能"杀死" goroutine——Go 的 goroutine 不能被外部强制终止(没有 Thread.stop())。取消信号是协作式的——goroutine 必须主动检查 ctx.Done() 并返回。如果 fn 内部是一个不接收 context 的阻塞调用(如一个没有超时的 time.Sleep),取消信号对它无效。

# 6.3 常见的 context 误用

误用 1:在 fn 内部创建新 context——切断取消链:

// ❌ fn 内部用 context.Background()——父 ctx 取消无效
g, ctx := errgroup.WithContext(context.Background())
g.Go(func() error {
    // 创建了一个全新的 context——和 errgroup 的 ctx 无关
    return callAPI(context.Background(), url) // ← 永远不会收到取消信号
})

// ✅ 传递 errgroup 的 ctx
g.Go(func() error {
    return callAPI(ctx, url)
})
1
2
3
4
5
6
7
8
9
10
11

误用 2:Wait 返回后还引用 ctx:

g, ctx := errgroup.WithContext(context.Background())
g.Go(func() error { return doWork(ctx) })
if err := g.Wait(); err != nil {
    return err
}

// ❌ 此时 ctx 已经 Done——任何依赖 ctx 的操作都会立即失败
data, err := doMore(ctx) // ← always context.Canceled

// ✅ 用原始的 parent context(如果有的话)
data, err := doMore(parentCtx)
1
2
3
4
5
6
7
8
9
10
11

误用 3:混合使用外部 ctx 和 errgroup ctx:

// ✅ 正确:用外部 ctx 作为 errgroup ctx 的 parent
func processWithTimeout(ctx context.Context, tasks []Task) error {
    g, ctx := errgroup.WithContext(ctx) // ← errgroup ctx 继承外部 ctx

    // 如果外部 ctx 被取消 → errgroup 的 ctx 也取消 → 所有 goroutine 收到信号
    for _, task := range tasks {
        task := task
        g.Go(func() error {
            return doWork(ctx, task) // ← 监听的是 errgroup 的 ctx
        })
    }
    return g.Wait()
}
1
2
3
4
5
6
7
8
9
10
11
12
13

# 7. SetLimit 限并发机制

# 7.1 内部实现:channel 信号量

SetLimit 是 Go 1.20 新增的——用 channel 做信号量控制同时运行的 goroutine 数量。回顾第 13 篇的 channel 信号量对比——SetLimit 用的就是最简洁的 channel 方案:

// golang.org/x/sync/errgroup/errgroup.go (简化)
func (g *Group) SetLimit(n int) {
    if n < 0 {
        g.sem = nil       // 取消限制
        return
    }
    if len(g.sem) != 0 {  // 已有运行的 goroutine → panic
        panic("errgroup: modify limit while goroutines are still active")
    }
    g.sem = make(chan struct{}, n) // ← channel 信号量
}
1
2
3
4
5
6
7
8
9
10
11

Go 方法中的限流逻辑——在启动 goroutine 之前,先往 sem channel 发一个 struct{}:

func (g *Group) Go(f func() error) {
    if g.sem != nil {
        g.sem <- struct{}{} // ← 阻塞:如果 n 个槽位已满,这里卡住
    }
    g.wg.Add(1)
    go func() {
        defer g.done()
        // ...
    }()
}

func (g *Group) done() {
    if g.sem != nil {
        <-g.sem // ← 归还槽位——让给下一个等待的 Go 调用
    }
    g.wg.Done()
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

和 semaphore.Weighted 的关键差异:

特性 SetLimit channel 信号量 semaphore.Weighted
权重 固定为 1——每个 goroutine 占 1 个槽 可变——Acquire(ctx, n) 一次拿 n 个许可
context 取消 ❌ 不支持——sem <- {} 阻塞时无法取消 ✅ Acquire(ctx, n) 支持
等待者可见性 不可见——无法查看"当前有多少个 Go 调用在排队" 不可见——waiters 链表不导出
适用场景 简单并发度控制 加权资源控制 + 可取消等待

# 7.2 限流效果验证

package main

import (
    "context"
    "fmt"
    "time"
    "golang.org/x/sync/errgroup"
)

func main() {
    var g errgroup.Group
    g.SetLimit(3) // 最多 3 个 goroutine 同时运行

    for i := 0; i < 10; i++ {
        i := i
        g.Go(func() error {
            fmt.Printf("[%d] 开始\n", i)
            time.Sleep(200 * time.Millisecond) // 模拟工作
            fmt.Printf("[%d] 结束\n", i)
            return nil
        })
    }

    g.Wait()
}

// 输出(前三行几乎同时打印,后续分批):
// [0] 开始
// [1] 开始
// [2] 开始
// -- 约 200ms --
// [0] 结束 [1] 结束 [2] 结束
// [3] 开始 [4] 开始 [5] 开始
// ...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34

# 7.3 与 semaphore.Weighted 的关联

疑惑:既然 errgroup 内部用 channel 做信号量,为什么不直接用 semaphore.Weighted 实现 SetLimit?

论证:

  1. 不需要加权——errgroup 的每个 goroutine 权重都是 1(SetLimit 限的是 goroutine 数量,不是资源消耗)。Weighted 的 Acquire(1) 和 channel 的 sem <- {} 语义等价,但 channel 更轻(不引入额外的 sync.Mutex 和容器/list 依赖)。

  2. 不需要 context 取消 sem 获取——g.Go(f) 不应该因为取不到 sem 而返回 error——Go 的签名是 Go(func() error),没有返回 error。如果 sem 获取失败要报错,Go 的签名就需要改。channel 的 sem <- {} 会阻塞调用 Go 的 goroutine——这本身就是一个"反压"机制(调用方在提交任务时就被挡住)。

  3. 保持包依赖最小——errgroup 只依赖标准库。如果引入 semaphore 包,会增加包之间的耦合。channel 是标准库内置的——零依赖。

结论:SetLimit 用 channel 信号量而不是 semaphore.Weighted 是一个"刚刚好"的设计选择——不引入不必要的抽象,不增加不必要的依赖。

# 8. TryGo 非阻塞提交

# 8.1 TryGo vs Go 语义差异

TryGo 是 Go 1.22 新增的——非阻塞版本的 Go。当 SetLimit 的并发槽位满了,TryGo 直接返回 false 而不是阻塞:

// golang.org/x/sync/errgroup/errgroup.go (简化)
func (g *Group) TryGo(f func() error) bool {
    if g.sem != nil {
        select {
        case g.sem <- struct{}{}: // 尝试获取槽位——非阻塞
            // 拿到了 → 继续
        default:
            return false          // 槽位满 → 立即返回 false
        }
    }

    g.wg.Add(1)
    go func() {
        defer g.done()
        if err := f(); err != nil {
            g.errOnce.Do(func() {
                g.err = err
                if g.cancel != nil {
                    g.cancel(err)
                }
            })
        }
    }()
    return true
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

Go 和 TryGo 的对比:

Go TryGo
无 SetLimit 时 直接启动 goroutine 直接启动 goroutine
有 SetLimit 且槽位有空 获取槽位 → 启动 获取槽位 → 启动
有 SetLimit 且槽位满 阻塞等待槽位 立即返回 false
返回值 无(void) bool(是否成功提交)
调用方感知 无法感知"槽位满" 可以感知 → 实现降级逻辑

# 8.2 背压降级模式

TryGo 的关键使用场景——背压感知的降级:

// 限流 10 个并发——满了就返回"服务繁忙"
func handleBatch(tasks []Task) error {
    g, ctx := errgroup.WithContext(context.Background())
    g.SetLimit(10)

    for _, task := range tasks {
        task := task
        // TryGo:如果 10 个槽位全满——不阻塞,直接拒绝
        if !g.TryGo(func() error {
            return processTask(ctx, task)
        }) {
            // 背压——返回限流错误
            return fmt.Errorf("系统繁忙,请稍后重试——并发已满")
        }
    }
    return g.Wait()
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

和 "Go + goroutine 中排队" 的差异:

Go 阻塞方案(生产者等待):
  请求 → Go(任务) → 如果槽位满 → 阻塞 → 占用调用方 goroutine
  → 调用方 goroutine 无法处理其他请求
  → 发起方无感知——不知道是"排队中"还是"正在执行"

TryGo 非阻塞方案(背压反馈):
  请求 → TryGo(任务) → 如果槽位满 → 返回 false
  → 调用方立刻知道"系统忙" → 可以做降级(返回 429 / 用缓存 / 拒绝)
  → 调用方 goroutine 不阻塞 → 可以继续处理其他请求
1
2
3
4
5
6
7
8
9

典型架构组合:

HTTP handler:
  │
  ├── TryGo 成功 → goroutine 执行任务 → 响应 200
  │
  └── TryGo 失败 → 检查缓存 → 有缓存? → 返回缓存
                    └── 无缓存? → 返回 429 Too Many Requests
1
2
3
4
5
6

# 9. WaitGroup 对比与实战

# 9.1 四种对比场景

用一个综合表格说明 errgroup 和 WaitGroup 在四种典型并行场景中的差异:

场景 推荐方案 理由
等所有任务完成(不关心错误) sync.WaitGroup 最轻量的计数等待器——零依赖
等所有任务完成(需要收集全部错误) WaitGroup + errCh 或 MultiErrorGroup errgroup 只保留第一个错误——如果需要全部错误,需要自己收集
有一个任务失败就提前退出 errgroup.WithContext 一人出错 → ctx 取消 → 其他人收到信号退出 → Wait 返回第一个错误
限并发 + 快速失败 errgroup.WithContext + SetLimit(n) WaitGroup 本身不限并发——errgroup 的 SetLimit 直接解决

# 9.2 多数据源并行聚合

回到第 1 章的聚合服务——用 errgroup 重写:

func aggregateProductV2(ctx context.Context, productID string) (*ProductDetail, error) {
    g, ctx := errgroup.WithContext(ctx)
    detail := &ProductDetail{ProductID: productID}

    g.Go(func() error {
        inv, err := fetchInventory(ctx, productID)
        if err != nil {
            return fmt.Errorf("库存: %w", err)
        }
        detail.Inventory = inv
        return nil
    })

    g.Go(func() error {
        price, err := fetchPrice(ctx, productID)
        if err != nil {
            return fmt.Errorf("价格: %w", err)
        }
        detail.Price = price
        return nil
    })

    g.Go(func() error {
        recs, err := fetchRecommendations(ctx, productID)
        if err != nil {
            return fmt.Errorf("推荐: %w", err)
        }
        detail.Recs = recs
        return nil
    })

    // Wait 阻塞——直到所有 goroutine 结束(或第一个 error 触发 ctx 取消)
    if err := g.Wait(); err != nil {
        return nil, err
    }
    return detail, nil
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37

改进点对照:

原 WaitGroup 方案的问题 errgroup 方案的改进
推荐服务卡 5 秒→主 goroutine 死等 ctx 取消→其他 goroutine 的 ctx.Done 触发→安全返回
errCh 容量硬编码为 3 不需要 errCh——errorOnce 自动处理
wg.Add/wg.Done 手动维护 Go 封装了 Add/Done
拿到数据后无法提前返回 所有 goroutine 必须检查 ctx.Done

# 9.3 批量 API 调用模式

批量调用 N 个 API——限并发 5 个、一人失败全员退出:

func batchCallAPIs(ctx context.Context, urls []string) ([]*APIResponse, error) {
    g, ctx := errgroup.WithContext(ctx)
    g.SetLimit(5) // 最多 5 个并发 HTTP 请求

    results := make([]*APIResponse, len(urls))

    for i, url := range urls {
        i, url := i, url
        g.Go(func() error {
            req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
            if err != nil {
                return err
            }
            resp, err := http.DefaultClient.Do(req)
            if err != nil {
                return err // ← ctx 取消时这里会返回 context.Canceled
            }
            defer resp.Body.Close()

            body, _ := io.ReadAll(resp.Body)
            results[i] = &APIResponse{URL: url, Body: body, Status: resp.StatusCode}
            return nil
        })
    }

    if err := g.Wait(); err != nil {
        return nil, err
    }
    return results, nil
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

SetLimit(5) 的作用——即使传入 100 个 URL,同时只有 5 个 HTTP 请求在进行。其他 g.Go(f) 调用阻塞在 g.sem <- {} 上——这正是第 13 篇讨论的 channel 信号量模式。

# 10. 综合案例串讲

# 10.1 案例真相揭晓

回到第 1 章商品聚合服务的七个疑问,逐条作答:

疑问 答案
① errgroup 怎么让「先出错」触发全局取消? 第 3-4 章:errorOnce.Do → 记录 err → cancel(err) → ctx.Done() → 其他 goroutine 收到信号
② Wait 为什么只返回第一个错误? 第 5.2:取消传播后其他 goroutine 返回 ctx.Err()——收集它们是"症状"不是"根因"——简化为只保留第一个
③ WithContext 的 context 取消何时触发? 第 6 章:第一个 goroutine 返回 error 时,或 Wait 返回时(保证至少 cancel 一次)
④ 为什么 goroutine 必须检查 ctx.Done()? 第 6.2:Go 的取消是协作式——不检查 ctx.Done() 的 goroutine 不会自动停止
⑤ SetLimit 内部用什么实现? 第 7 章:make(chan struct{}, n)——最简洁的 channel 信号量
⑥ TryGo 和 Go 的语义差异? 第 8 章:TryGo 在槽位满时立即返回 false(非阻塞),Go 会阻塞等待
⑦ errgroup vs WaitGroup 怎么选? 第 9 章:等全部结束用 WaitGroup,需要"一人出错全员退出"用 errgroup

案例完整根因链条:

推荐服务卡 5 秒
  → fetchRecommendations 的 ctx 没有超时 → ctx.Done() 不触发
  → wg.Wait() 死等 5 秒 → 主 goroutine 挂起
  → 库存和价格数据早就拿到了 → 但无法提前返回
  → 高峰期每秒 500 请求 × 每请求最多 5 秒 = 2500 个阻塞 goroutine
  → 每个持有一个 TCP 连接 → 40MB 额外内存 + 下游连接池耗尽
  → 上游网关已经超时断连 → 但这些 goroutine 还在跑 → 纯浪费
1
2
3
4
5
6
7

修复方案:

// ✅ errgroup.WithContext + 上游 context 带 timeout
func handleProduct(w http.ResponseWriter, r *http.Request) {
    // 外层 context 带 300ms 总超时
    ctx, cancel := context.WithTimeout(r.Context(), 300*time.Millisecond)
    defer cancel()

    detail, err := aggregateProductV2(ctx, r.URL.Query().Get("product_id"))
    if err != nil {
        if errors.Is(err, context.DeadlineExceeded) {
            http.Error(w, "聚合超时", http.StatusGatewayTimeout)
            return
        }
        http.Error(w, err.Error(), http.StatusInternalServerError)
        return
    }
    json.NewEncoder(w).Encode(detail)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

三层防护:

  1. 上游 ctx 超时(300ms)→ errgroup 的 ctx 也超时 → 所有 goroutine 退出
  2. 任何一个下游 goroutine 返回 error → WithContext 取消 → 其他 goroutine 退出
  3. fetchRecommendations 内部检查 ctx.Done() ——不再傻等 5 秒

# 10.2 一次 errgroup 任务组的完整旅程

g, ctx := errgroup.WithContext(ctx)   // ← 外部 ctx 带 300ms 超时
g.Go(fetchInventory)                 // ← 启动 goroutine 1
g.Go(fetchPrice)                     // ← 启动 goroutine 2
g.Go(fetchRecommendations)           // ← 启动 goroutine 3
err := g.Wait()                      // ← 主 goroutine 阻塞等待
─────────────────────────────────────────────────────────
        │
        ├─ goroutine 1 (fetchInventory):
        │    http.NewRequestWithContext(ctx, ...)
        │    → ctx.Done() 未触发 → 正常执行
        │    → 50ms 后返回 → detail.Inventory = {...}
        │    → return nil → wg.Done()
        │
        ├─ goroutine 2 (fetchPrice):
        │    http.NewRequestWithContext(ctx, ...)
        │    → 60ms 后返回 → detail.Price = {...}
        │    → return nil → wg.Done()
        │
        └─ goroutine 3 (fetchRecommendations):
             http.NewRequestWithContext(ctx, ...)
             → 下游延迟 5 秒——但 ctx 在 300ms 时超时了
             → http 客户端检测到 ctx.Done() → 中断 TCP 连接
             → 返回 context.DeadlineExceeded
             → errorOnce.Do: g.err = DeadlineExceeded
             → g.cancel(DeadlineExceeded) → ctx 取消
             → 但 goroutine 1、2 已经结束了——wip
             → wg.Done()

        g.wg.Wait() 返回(三个 goroutine 都 Done)
        g.cancel(g.err) — 幂等,无影响
        return DeadlineExceeded

结果: 库存和价格数据拿到了——但主 goroutine 返回 error 给调用方
→ 调用方可以选择:降级返回部分数据 / 返回超时错误 / 重试
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34

没有获取的数据怎么处理:detail.Recs 为 nil——但如果业务允许"部分成功返回"(只有库存和价格没有推荐也可以展示页面),可以在调用方做判断:

if err != nil && errors.Is(err, context.DeadlineExceeded) {
    // 超时了——但 detail 里的 Inventory 和 Price 可能已经有值
    if detail.Inventory != nil && detail.Price != nil {
        detail.Recs = fallbackRecs(productID) // 用兜底推荐
        return detail, nil // ← 部分成功
    }
    return nil, err
}
1
2
3
4
5
6
7
8

# 10.3 设计哲学回扣

哲学 1:取消信号的"推"模式——替代线程间 kill

Java 有 Thread.interrupt() 和 Thread.stop()——前者是协作式的(需要线程检查 isInterrupted),后者是强制的(已废弃)。Go 没有"强制杀死 goroutine"的 API——errgroup 的 WithContext 是这种哲学的完美体现:用 context 通道"推"送取消信号,每个 goroutine 自己"拉"取信号并安全退出。推拉结合 = 没有竞态、没有资源泄漏。

哲学 2:sync.Once 捕获第一个错误——把"多写一读"简化到极致

errgroup 没有用 sync.Mutex 保护 error 字段——只用了一个 sync.Once。任何 goroutine 都可能写入 error——但 Once 保证只有第一次写入生效。后续写入被静默丢弃——这是一种 "数据不关键,放弃也无妨" 的设计取舍。它放弃了"收集全部错误"的完整性,换来了零锁竞争的简洁实现。

哲学 3:channel 信号量做 SetLimit——复用现有原语,不引入新概念

SetLimit 的实现就是 make(chan struct{}, n)——和程序员手动写 sem <- struct{}{} / <-sem 一模一样。errgroup 没有发明新的限流机制——它只是把这个 pattern 标准化、内置化。这是 Go 标准库/扩展库的设计原则:用组合代替继承,用现有 primitives 搭建新 abstractions。

哲学 4:Wait 返回后 cancel——让 ctx 的生命周期和任务组精确绑定

即使没有任何 goroutine 返回 error,Wait 返回后也会调用 cancel()。这保证了 ctx 不会在任务组结束后还被使用——避免"僵尸 ctx"导致后续操作拿到过期信号。这是一个防御性设计——明确 ctx 的边界。"出了这个 errgroup,ctx 就失效"。

# 10.4 速查表

errgroup 决策矩阵:

需求 推荐的 errgroup 配置
等一组任务全部结束,不关心错误 var g errgroup.Group(零值)
任何任务出错立即停止 errgroup.WithContext(ctx)
限并发执行 g.SetLimit(n)(Go 1.20+)
非阻塞提交(槽位满时拒绝) g.TryGo(fn)(Go 1.22+)
收集所有错误(非标准) 自己实现 MultiErrorGroup

errgroup vs 替代方案:

场景 errgroup WaitGroup semaphore.Weighted
等待多个 goroutine ✅ ✅ ❌
返回第一个错误 ✅ ❌(需手动) ❌
自动取消其他 goroutine ✅(WithContext) ❌ ❌(需手动)
限并发 ✅(SetLimit) ❌ ✅
加权资源控制 ❌ ❌ ✅
可取消的等待 ❌(Go 阻塞) ❌ ✅(Acquire(ctx, n))

典型错误模式:

// ❌ fn 不检查 ctx.Done() ——取消信号无效
g.Go(func() error {
    time.Sleep(10 * time.Second) // ← ctx 取消后继续睡
    return nil
})

// ❌ Wait 返回后使用 ctx —— ctx 已 Done
data, _ := doMore(ctx)

// ❌ 循环变量捕获(Go 1.21 及之前)
for _, task := range tasks {
    g.Go(func() error { return process(task) }) // ← data race
}

// ❌ fn 内部创建新 context ——切断取消链
g.Go(func() error {
    return callAPI(context.Background(), url)
})
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

诊断命令:

# 查看阻塞在 errgroup.Go 或 errgroup.Wait 的 goroutine
curl http://localhost:6060/debug/pprof/goroutine?debug=2 | grep "errgroup"

# goroutine profile——看是否 goroutine 泄漏
go tool pprof http://localhost:6060/debug/pprof/goroutine

# data race 检测(闭包捕获循环变量、共享数据写入)
go test -race ./...

# 逃逸分析——确认闭包变量不会意外逃逸到堆
go build -gcflags="-m" .
1
2
3
4
5
6
7
8
9
10
11

Go 版本特性一览:

特性 引入版本
errgroup.Group(基础版) Go 1.x(golang.org/x/sync)
WithContext Go 1.x
SetLimit Go 1.20
TryGo Go 1.22
context.WithCancelCause Go 1.20

下一篇:我们已经掌握了 errgroup 的并行任务协调和 context 取消传播,下一步进入 15.协程泄漏排查与修复——把 goroutine 泄漏的 5 种典型场景、pprof 定位方法、leaktest 自动检测剖开。

上次更新: 2026/06/13, 21:14:36
加权信号量与限流
协程泄漏排查与修复

← 加权信号量与限流 协程泄漏排查与修复→

最近更新
01
信号崩溃快速排查
06-15
02
CoreDump破案
06-15
03
perf火焰图实战
06-15
更多文章>
Theme by Vdoing | Copyright © 2019-2026 杨充 | MIT License | 桂ICP备2024034950号 | 桂公网安备45142202000030
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式