编程进阶网 编程进阶网
首页
  • 计算机原理
  • 操作系统
  • 网络协议
  • 数据库原理
  • 面向对象
  • 设计原则
  • 设计模式
  • 系统架构
  • 性能优化
  • 编程原理
  • 方案设计
  • 稳定可靠
  • 工程运维
  • 基础认知
  • 线性结构
  • 树与哈希
  • 工业级实现
  • 算法思想
  • 实战与综合
  • 算法题考核
  • C语言入门
  • C综合案例
  • C专栏博客
  • C标准集库
  • C++入门教程
  • C++综合案例
  • C++专栏博客
  • C++开发技巧
  • Java入门教程
  • Java综合案例
  • Java专栏博客
  • Go入门教程
  • Go综合案例
  • Go专栏博客
  • Go开发技巧
  • JavaScript入门
  • JavaScript高级
  • Android库解读
  • Android专栏
  • Android智能硬件
  • iOS ObjC入门
  • iOS Swift入门
  • iOS入门精通
  • Web之Html手册
  • Web之TypeScript
  • Web之Vue高级进阶
  • Linux之QML入门
  • Linux之QT核心库
  • Linux实践开发
  • Python教程
  • Shell&Bash教程
  • 工具脚本
  • 自动化脚本
  • 质量保障
  • 产品思考
  • 软实力
  • 开发流程
  • Git应用
  • 技术模版
  • 技术规范
  • Markdown
  • Mermaid
  • 开源协议
  • JSON工具
  • 文本工具
  • 图片处理
  • 文档转化
  • 代码压缩
  • 关于我
  • 自我精进
  • 职场管理
  • 职场面试
  • 心情杂货
  • 友情链接

杨充

专注编程 · 终身学习者
首页
  • 计算机原理
  • 操作系统
  • 网络协议
  • 数据库原理
  • 面向对象
  • 设计原则
  • 设计模式
  • 系统架构
  • 性能优化
  • 编程原理
  • 方案设计
  • 稳定可靠
  • 工程运维
  • 基础认知
  • 线性结构
  • 树与哈希
  • 工业级实现
  • 算法思想
  • 实战与综合
  • 算法题考核
  • C语言入门
  • C综合案例
  • C专栏博客
  • C标准集库
  • C++入门教程
  • C++综合案例
  • C++专栏博客
  • C++开发技巧
  • Java入门教程
  • Java综合案例
  • Java专栏博客
  • Go入门教程
  • Go综合案例
  • Go专栏博客
  • Go开发技巧
  • JavaScript入门
  • JavaScript高级
  • Android库解读
  • Android专栏
  • Android智能硬件
  • iOS ObjC入门
  • iOS Swift入门
  • iOS入门精通
  • Web之Html手册
  • Web之TypeScript
  • Web之Vue高级进阶
  • Linux之QML入门
  • Linux之QT核心库
  • Linux实践开发
  • Python教程
  • Shell&Bash教程
  • 工具脚本
  • 自动化脚本
  • 质量保障
  • 产品思考
  • 软实力
  • 开发流程
  • Git应用
  • 技术模版
  • 技术规范
  • Markdown
  • Mermaid
  • 开源协议
  • JSON工具
  • 文本工具
  • 图片处理
  • 文档转化
  • 代码压缩
  • 关于我
  • 自我精进
  • 职场管理
  • 职场面试
  • 心情杂货
  • 友情链接
  • README
  • C语言入门精通

  • Cpp入门到精通

  • Java入门精通

  • Go入门到精通

    • 入门教程

    • 综合案例

    • 专栏博客

      • Go 专栏博客
      • 内存模型与栈堆布局
      • 指针与逃逸分析
      • 结构体内存布局对齐
      • 字符串与切片底层
      • 接口与类型系统
      • map哈希表底层实现
      • 零值初始化设计哲学
      • GMP协程调度器机制
      • 通道channel源码剖析
      • sync同步原语剖析
      • map并发安全与哈希
      • Go内存模型一致性
      • 加权信号量与限流
      • errgroup并行控制
      • 协程泄漏排查与修复
      • 并发设计模式详解
        • 1. 案例引入
          • 1.1 一段崩在哪
          • 1.2 顺藤摸到根因
          • 1.3 我们要回答什么
        • 2. 架构概览
          • 2.1 七种模式全景图
          • 2.2 channel 是所有模式的公共语言
        • 3. Pipeline 流水线模式
          • 3.1 基础三级流水线
          • 3.2 扇出流水线
          • 3.3 取消传播
        • 4. Fan-out 与 Fan-in 模式
          • 4.1 Fan-out 扇出实现
          • 4.2 Fan-in 扇入实现
          • 4.3 扇出系数选择
        • 5. Worker Pool 工作池模式
          • 5.1 固定大小工作池
          • 5.2 动态扩缩容
          • 5.3 与 errgroup.SetLimit 的关系
        • 6. Or-Done 与 Tee 模式
          • 6.1 Or-Done 优雅退出
          • 6.2 Tee 分流模式
          • 6.3 为什么 Tee 不能共享 channel
        • 7. Bridge 桥接模式
          • 7.1 将 channel 的 channel 扁平化
          • 7.2 典型应用:动态数据源切换
        • 8. Pub/Sub 发布订阅模式
          • 8.1 基于 channel 的简易实现
          • 8.2 与消息队列的关系
        • 9. 模式组合实战
          • 9.1 Pipeline + Fan-out = 并行流水线
          • 9.2 Worker Pool + Or-Done = 可优雅退出的池
          • 9.3 选型决策树
        • 10. 综合案例串讲
          • 10.1 案例真相揭晓
          • 10.2 一次 Pipeline 数据的完整旅程
          • 10.3 设计哲学回扣
          • 10.4 速查表
      • GC三色标记与屏障
      • 内存分配器深挖
      • defer延迟执行机制
      • 定时器四叉堆实现
      • 抢占式调度器原理
      • 协程栈扩容与缩容
      • 上下文取消与传播
      • 泛型与类型约束
      • 反射机制与unsafe
      • 迭代器与rangefunc
      • 错误处理与panic
      • 网络轮询器netpoller
      • HTTP服务端源码分析
      • JSON序列化与编解码
      • 数据库SQL连接池
      • 文件IO与零拷贝
      • 结构化日志与配置
      • 单元测试与基准
      • cgo与系统调用切换
      • 编译链接与PGO优化
      • 写作模板
    • 开发技巧

  • JavaScript入门

  • CodeX
  • Go入门到精通
  • 专栏博客
杨充
2026-06-12
目录

并发设计模式详解

# 16.并发设计模式详解

卷三第十六篇——Go 的并发模型不是"一堆 goroutine 乱跑",而是可以通过 channel 组合出可预测的数据流拓扑。Pipeline 让数据像流水线一样传递,Fan-out/Fan-in 把"扇出放大 + 扇回归一"变成标准操作,Worker Pool 控制并发度的同时也控制资源,Or-Done 让 goroutine 优雅退出。这些不是设计模式书上的抽象概念——是 Go 标准库扩展库每天都在用的代码模板。读完本篇,你能回答:Pipeline 的每个 stage 应该用什么类型的 channel?Fan-out 的 goroutine 数量怎么确定?为什么 Tee 模式不能直接用同一个 channel 发给两个消费者?关键词:Pipeline、Fan-out/Fan-in、Worker Pool、Or-Done、Tee、Bridge、Pub/Sub。

# 目录介绍

  • 1. 案例引入
    • 1.1 一段崩在哪
    • 1.2 顺藤摸到根因
    • 1.3 我们要回答什么
  • 2. 架构概览
    • 2.1 七种模式全景图
    • 2.2 channel 是所有模式的公共语言
  • 3. Pipeline 流水线模式
    • 3.1 基础三级流水线
    • 3.2 扇出流水线
    • 3.3 取消传播
  • 4. Fan-out 与 Fan-in 模式
    • 4.1 Fan-out 扇出实现
    • 4.2 Fan-in 扇入实现
    • 4.3 扇出系数选择
  • 5. Worker Pool 工作池模式
    • 5.1 固定大小工作池
    • 5.2 动态扩缩容
    • 5.3 与 errgroup.SetLimit 的关系
  • 6. Or-Done 与 Tee 模式
    • 6.1 Or-Done 优雅退出
    • 6.2 Tee 分流模式
    • 6.3 为什么 Tee 不能共享 channel
  • 7. Bridge 桥接模式
    • 7.1 将 channel 的 channel 扁平化
    • 7.2 典型应用:动态数据源切换
  • 8. Pub/Sub 发布订阅模式
    • 8.1 基于 channel 的简易实现
    • 8.2 与消息队列的关系
  • 9. 模式组合实战
    • 9.1 Pipeline + Fan-out = 并行流水线
    • 9.2 Worker Pool + Or-Done = 可优雅退出的池
    • 9.3 选型决策树
  • 10. 综合案例串讲
    • 10.1 案例真相揭晓
    • 10.2 一次 Pipeline 数据的完整旅程
    • 10.3 设计哲学回扣
    • 10.4 速查表

# 1. 案例引入

# 1.1 一段崩在哪

看一个实时日志处理服务——从 Kafka 消费原始日志,经过去重、格式化、富化(补全 IP 归属地)、写入 Elasticsearch 四个阶段。每天处理 2 亿条日志,高峰期 QPS 超 5000:

// log_pipeline.go —— 日志处理流水线
package main

import (
    "context"
    "log"
    "sync"
)

// Stage 1: Kafka 消费 → 产出原始日志
func consumeKafka(ctx context.Context, out chan<- RawLog) {
    defer close(out)
    for msg := range kafkaConsumer.Messages() {
        select {
        case out <- parseRawLog(msg):
        case <-ctx.Done():
            return
        }
    }
}

// Stage 2: 去重
func deduplicate(in <-chan RawLog, out chan<- RawLog) {
    defer close(out)
    seen := make(map[string]bool)
    for log := range in {
        if !seen[log.Fingerprint] {
            seen[log.Fingerprint] = true
            out <- log
        }
    }
}

// Stage 3: 富化——补全 IP 归属地(调用外部 API)
func enrich(in <-chan RawLog, out chan<- EnrichedLog) {
    defer close(out)
    for log := range in {
        geo, err := lookupGeo(log.IP) // ← 外部 API 调用——每次 ~5ms
        if err != nil {
            continue // 跳过失败的
        }
        out <- EnrichedLog{RawLog: log, Geo: geo}
    }
}

// Stage 4: 写入 Elasticsearch
func writeToES(in <-chan EnrichedLog) {
    for log := range in {
        if err := esClient.Index(log); err != nil {
            log.Printf("ES 写入失败: %v", err)
        }
    }
}

func main() {
    ch1 := make(chan RawLog, 100)        // Kafka → 去重
    ch2 := make(chan RawLog, 100)        // 去重 → 富化
    ch3 := make(chan EnrichedLog, 100)   // 富化 → ES

    go consumeKafka(context.Background(), ch1)
    go deduplicate(ch1, ch2)
    go enrich(ch2, ch3)
    writeToES(ch3) // 主 goroutine 做最后一步
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64

现象:

  • 正常时 QPS 5000,ES 写入 P99 约 50ms——流水线运行平稳
  • 某天 ES 集群因为分片迁移变慢——写入 P99 涨到 500ms
  • 问题来了:ES 慢 → writeToES 消费 ch3 变慢 → ch3 积压 → enrich 往 ch3 写阻塞 → ch2 积压 → deduplicate 往 ch2 写阻塞 → ch1 积压 → consumeKafka 往 ch1 写阻塞
  • 反压层层传递——最上游的 Kafka 消费被堵住 → 消费者心跳超时 → Kafka rebalancing → 整个消费组停摆
  • 监控显示:CPU 利用率 15%,内存 30%——资源充足但已经被"一个慢节点卡死了整条链"

更隐蔽的问题:富化阶段 lookupGeo 是串行的——每次 5ms × 5000 QPS = 25 秒/秒的累计延迟——理论上需要 25 个并发才能追上速率。但当前只有一个 goroutine 在做富化——它本身就是瓶颈:没有使用 Fan-out 模式。

# 1.2 顺藤摸到根因

追查过程:

第一步:定位瓶颈——给每个 stage 加上耗时统计,发现富化阶段的 ch2→ch3 的 channel 长度持续在 100(满),ch3→ES 也在 100(满)。反压链:ES 写入慢 → 堵 ch3 → 堵 enrich → 堵 ch2 → 堵 deduplicate → 堵 ch1 → 堵 consumeKafka。

第二步:分析容错——ES 变慢时,enrich 还在调用 lookupGeo(外部 API)——即使下游已经堵了,它还在做无用功。应该在 out <- enrichedLog 之前检查 ctx.Done()。

第三步:分析并发度——enrich 只有一个 goroutine:5000 QPS × 5ms/条 = 25000ms/s > 1000ms——串行处理根本跟不上。需要 Fan-out 成多个 goroutine。

这个事故藏着 7 个原理点:

① Pipeline 的每个 stage 应该用有缓冲还是无缓冲 channel?缓冲多大?  → 第 3 章
② Fan-out 怎么把一个 channel 的数据分发给多个 goroutine?            → 第 4 章
③ Fan-in 怎么把多个 goroutine 的输出合并到一个 channel?             → 第 4.2
④ Worker Pool 和 Fan-out 的区别是什么——什么时候选哪个?             → 第 5 章
⑤ Or-Done 怎么让 Pipeline 的 goroutine 优雅退出?                    → 第 6.1
⑥ Tee 模式为什么不能把同一个 channel 发给两个消费者?                → 第 6.2-6.3
⑦ Bridge 怎么把"动态变化的 channel 列表"变成一个稳定的数据流?       → 第 7 章
1
2
3
4
5
6
7

# 1.3 我们要回答什么

这个日志流水线案例贯穿全篇。我们从 Pipeline 的基础模型出发,深入到 Fan-out/Fan-in 的并发放大、Worker Pool 的资源控制、Or-Done 的优雅退出——最后用 Pipeline + Fan-out + Or-Done 的组合模式彻底修复——并用选型决策树指导读者在真实场景中如何选模式。

本篇路线:

架构总图 (第 2 章) ── 七种模式全景 + channel 是公共语言
   ↓
Pipeline (第 3 章) ── 基础三级流水线 + 取消传播
   ↓
Fan-out/Fan-in (第 4 章) ── 扇出 + 扇入 + 系数选择
   ↓
Worker Pool (第 5 章) ── 固定池 + 动态扩缩 + 与 errgroup 对比
   ↓
Or-Done/Tee (第 6 章) ── 优雅退出 + 分流 + Tee 陷阱
   ↓
Bridge (第 7 章) ── channel of channels 扁平化
   ↓
Pub/Sub (第 8 章) ── 基于 channel 的发布订阅
   ↓
模式组合 (第 9 章) ── 组合实战 + 选型决策树
   ↓
综合案例 (第 10 章) ── 修复日志流水线 + 设计哲学
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

📌 本篇定位:第 09 篇讲 channel 底层实现、第 13-14 篇讲信号量和 errgroup——那些是"组件"。本篇把组件组合成模式——Pipeline 是 channel 的串联、Fan-out 是 channel 的并联、Or-Done 是 select 的套用。理解了模式,你就知道"什么时候用 channel、什么时候用锁、什么时候用 errgroup"——这是 Go 并发编程从"能跑"到"工程化"的分水岭。

# 2. 架构概览

# 2.1 七种模式全景图

七种 Go 并发模式按"数据流拓扑"分为四类:

Go 并发模式 ── 按数据流拓扑分类
│
├─ 线性拓扑
│     ├── Pipeline:goroutine 串联——数据像流水线一样依次通过各 stage
│     └── Bridge:channel of channels → 单 channel——"动态数据源"扁平化
│
├─ 扇入扇出拓扑
│     ├── Fan-out:一个输入 → 多个 goroutine 并行处理
│     └── Fan-in:多个 goroutine 的输出 → 合并到一个 channel
│
├─ 分发拓扑
│     ├── Pub/Sub:一条消息 → 多个订阅者独立接收
│     └── Tee:一个输入 channel → 两个输出 channel(独立副本)
│
└─ 生命周期拓扑
      ├── Worker Pool:固定数量 goroutine——有边界、有容量
      └── Or-Done:给任何 channel 加上"可取消"的退出信号
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

模式之间的关系图:

              ┌─────────────────────────────┐
              │        Worker Pool          │
              │  goroutine 池 + task chan    │
              └──────────────┬──────────────┘
                             │ 常与
              ┌──────────────┴──────────────┐
              │        Pipeline             │
              │  stage1 → stage2 → stage3   │
              └──────┬────────────┬─────────┘
                     │            │
          ┌──────────┴──┐   ┌────┴──────────┐
          │  Fan-out    │   │   Fan-in      │
          │  1 → N 分发  │   │  N → 1 合并   │
          └─────────────┘   └───────────────┘
                     │            │
          ┌──────────┴────────────┴──────────┐
          │          Or-Done                 │
          │  给任何 channel 加取消信号        │
          └──────────────────────────────────┘
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

# 2.2 channel 是所有模式的公共语言

疑惑:为什么 Go 的并发模式都围绕 channel 展开——而不是锁或 WaitGroup?

论证:

  1. channel 天生适合做"数据流"的载体——Pipeline 的"数据从一个 stage 传到下一个 stage"本质就是 channel 的 send/recv。锁保护的是"共享状态"(如计数器、缓存),但并发模式处理的是"数据传递"——channel 是数据传递的最佳载体。

  2. channel 自带反压——ch <- v 在 channel 满时阻塞——这就是天然的反压机制。第 1 章的日志流水线中,ES 慢 → ch3 满 → enrich 的 ch3 <- log 阻塞 → 反压传递到上游。不需要额外的信号量或计数器——channel 本身就是流控。

  3. channel 的 close 是"数据结束"的统一信号——close(ch) 唤醒所有 for-range ch 的 goroutine。在 Pipeline 中,上游 stage 的 defer close(out) 就是告诉下游"没有更多数据了"的信号——无需额外的"done" flag。

  4. select 是模式组合的"胶水"——Or-Done 就是 select { case v := <-ch: case <-done: }。Tee 就是 select { case out1 <- v: out2 <- v }。select 让 goroutine 可以同时等待多个 channel——这是组合模式的基础。

结论:Go 的并发模式不是凭空发明的——它们是 channel + goroutine + select 这三个原语的自然组合。理解了这三个原语,模式只是它们的"最佳实践用法"。

# 3. Pipeline 流水线模式

# 3.1 基础三级流水线

Pipeline 是最基本的并发模式——每个 stage 是一个 goroutine,通过 channel 与前后的 stage 通信:

数据流:
  [生成器] ──ch1──→ [处理器] ──ch2──→ [消费者]
  gen()             process()         consume()
1
2
3
func gen(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for _, n := range nums {
            out <- n
        }
    }()
    return out
}

func square(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            out <- n * n
        }
    }()
    return out
}

func main() {
    // Pipeline: gen → square → 主 goroutine 打印
    for n := range square(gen(1, 2, 3, 4, 5)) {
        fmt.Println(n) // 1, 4, 9, 16, 25
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

Pipeline 的核心约定:

  • 每个 stage 的 goroutine 在数据发完后 defer close(out)——通知下游"我完成了"
  • 下游用 for v := range in 消费——上游 close 时自动退出
  • 每个 stage 返回 <-chan T(只读 channel)——防止下游意外写入

# 3.2 扇出流水线

当某个 stage 是瓶颈时(如第 1 章的 enrich 阶段调用外部 API),可以扇出:启动多个 goroutine 并行处理同一个输入 channel:

              ┌─→ [处理器副本 1] ─┐
[生成器] ──ch──┼─→ [处理器副本 2] ─┼──out──→ [消费者]
              └─→ [处理器副本 3] ─┘
1
2
3
// Fan-out: 多个 goroutine 从同一个 channel 读取——Go channel 天然支持
func fanOut(in <-chan RawLog, workerCount int) <-chan EnrichedLog {
    out := make(chan EnrichedLog)
    var wg sync.WaitGroup

    for i := 0; i < workerCount; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for log := range in { // ← 多个 goroutine 竞争同一个 channel
                enriched, err := enrichLog(log)
                if err == nil {
                    out <- enriched
                }
            }
        }()
    }

    // Fan-in: 等所有 worker 完成 → close out
    go func() {
        wg.Wait()
        close(out)
    }()

    return out
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

为什么多个 goroutine 可以安全地 for-range 同一个 channel——Go channel 的内部实现保证了:<-ch 时 recvq 上的 goroutine 是原子唤醒的——不会出现两个 goroutine 同时收到同一个值(详见第 09 篇 channel 源码剖析)。

# 3.3 取消传播

Pipeline 的每个 stage 都应该接受 context——当上游取消时,下游能感知并安全退出:

// ✅ 可取消的 Pipeline stage
func enrichWithCtx(ctx context.Context, in <-chan RawLog) <-chan EnrichedLog {
    out := make(chan EnrichedLog)
    go func() {
        defer close(out)
        for {
            select {
            case log, ok := <-in:
                if !ok {
                    return // 上游 channel 关闭 → 退出
                }
                enriched, err := enrichLog(log)
                if err == nil {
                    select {
                    case out <- enriched:
                    case <-ctx.Done():
                        return // 取消 → 退出
                    }
                }
            case <-ctx.Done():
                return // 取消 → 退出
            }
        }
    }()
    return out
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

回到第 1 章的日志流水线——加上扇出 + 取消传播后,enrich 阶段的问题就解决了。这展示了 Pipeline 和 Fan-out 组合的威力(第 9 章会完整展示)。

# 4. Fan-out 与 Fan-in 模式

# 4.1 Fan-out 扇出实现

Fan-out 的核心是将一个输入 channel 的数据分发到多个 goroutine 并行处理。Go 的 channel 天然支持多消费者——无需额外实现:

// Fan-out 模式:1 个输入 → N 个 worker → N 个输出 channel
func fanOut(in <-chan Task, workerCount int) []<-chan Result {
    workers := make([]<-chan Result, workerCount)
    for i := 0; i < workerCount; i++ {
        workers[i] = worker(in) // 每个 worker 返回自己的 output channel
    }
    return workers
}

func worker(in <-chan Task) <-chan Result {
    out := make(chan Result)
    go func() {
        defer close(out)
        for task := range in {
            out <- process(task)
        }
    }()
    return out
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

Fan-out 的关键:

  • 多个 goroutine 竞争同一个 <-chan(Go 保证公平性——不会饿死某个 worker)
  • 每个 worker 有自己的输出 channel——需要 Fan-in 来合并

# 4.2 Fan-in 扇入实现

Fan-in 将多个 channel 的数据合并到一个 channel——需要用 WaitGroup 等所有上游完成后关闭输出 channel:

// Fan-in 模式:N 个输入 channel → 1 个输出 channel
func fanIn(channels ...<-chan Result) <-chan Result {
    out := make(chan Result)
    var wg sync.WaitGroup

    // 对每个输入 channel 启动一个 goroutine——把数据转发到 out
    for _, ch := range channels {
        wg.Add(1)
        go func(c <-chan Result) {
            defer wg.Done()
            for v := range c {
                out <- v
            }
        }(ch)
    }

    // 等所有转发 goroutine 结束 → close out
    go func() {
        wg.Wait()
        close(out)
    }()

    return out
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

Fan-in 的 goroutine 数量:每个输入 channel 需要一个转发 goroutine——如果 Fan-out 了 N 个 worker,Fan-in 阶段就有 N 个转发 goroutine。总 goroutine 数 = N (worker) + N (转发) + 1 (Wait goroutine) = 2N+1。

Fan-in 的优化——用 select 合并少量 channel(不需要 goroutine):

// 合并 2 个 channel——不用 goroutine
func mergeTwo(a, b <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for a != nil || b != nil {
            select {
            case v, ok := <-a:
                if !ok { a = nil; continue }
                out <- v
            case v, ok := <-b:
                if !ok { b = nil; continue }
                out <- v
            }
        }
    }()
    return out
}
// 当 channel 是 nil 时——select 永远跳过那个 case(Go 的 nil channel 永不就绪特性)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

# 4.3 扇出系数选择

疑惑:Fan-out 应该开多少个 goroutine?

论证:

扇出系数取决于两个因素:

  1. 瓶颈 stage 的单任务耗时 vs 目标吞吐——如果 enrich 阶段单次处理 5ms,目标 QPS 是 5000:需要的并发数 = 5000 × 0.005 / 1 ≈ 25。所以至少 25 个 worker。

  2. 下游的承载能力——如果 ES 只能承受 1000 写 QPS,开了 100 个 enrich worker 也没用——输出会被 Fan-in 的 channel 缓冲/阻塞。扇出必须和下游容量匹配。

  3. 外部资源限制——如果 enrich 调用的是第三方 API(有 rate limit)——超过 rate limit 会导致 429 错误——扇出系数不能超过 rate limit。

// ✅ 经验法则:扇出系数 = max(CPU 核数, 目标吞吐 × 单任务耗时)
func optimalFanOut(targetQPS float64, taskDuration time.Duration, cpuCores int) int {
    needed := int(targetQPS * taskDuration.Seconds())
    if needed < cpuCores {
        return cpuCores // 至少用满 CPU
    }
    if needed > 100 {
        return 100 // 上限——避免 goroutine 泛滥
    }
    return needed
}
1
2
3
4
5
6
7
8
9
10
11

结论:扇出系数不是越大越好——它是"够快就行"。扇出 100 个 goroutine 处理一个 QPS=10 的输入就是浪费。扇出 = 目标吞吐 × 单任务耗时 / 1 秒。

# 5. Worker Pool 工作池模式

# 5.1 固定大小工作池

Worker Pool 和 Fan-out 的区别——Worker Pool 的 goroutine 数量是固定的,任务通过一个 task channel 分发:

            ┌─→ [Worker 1] ─┐
[taskCh] ───┼─→ [Worker 2] ─┼──→ [resultCh]
            └─→ [Worker 3] ─┘
1
2
3
type WorkerPool struct {
    taskCh   chan Task
    resultCh chan Result
    done     chan struct{}
}

func NewWorkerPool(workerCount, queueSize int) *WorkerPool {
    pool := &WorkerPool{
        taskCh:   make(chan Task, queueSize),
        resultCh: make(chan Result, queueSize),
        done:     make(chan struct{}),
    }

    for i := 0; i < workerCount; i++ {
        go pool.worker(i)
    }
    return pool
}

func (p *WorkerPool) worker(id int) {
    for {
        select {
        case task, ok := <-p.taskCh:
            if !ok {
                return // task channel 关闭 → worker 退出
            }
            result := process(task)
            p.resultCh <- result
        case <-p.done:
            return // 外部通知退出
        }
    }
}

func (p *WorkerPool) Submit(task Task) {
    p.taskCh <- task // 如果 taskCh 满了 → 阻塞调用方 → 反压
}

func (p *WorkerPool) Shutdown() {
    close(p.done)     // 通知所有 worker 退出
    close(p.taskCh)   // 不再接受新任务
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42

Worker Pool vs Fan-out 的差异:

Worker Pool Fan-out
goroutine 生命周期 长生命周期——随 Pool 创建/销毁 短生命周期——随输入 channel 关闭而退出
任务分发方式 通过 task channel 显式分发 多个 worker 竞争同一个输入 channel
反压位置 Submit 时 taskCh <- task 阻塞 输入 channel 的 send 方阻塞
适用场景 持久的后台任务(如 HTTP handler pool) 一次性批处理(如数据处理流水线)

# 5.2 动态扩缩容

固定大小的 Worker Pool 在流量波动时可能不足或浪费。动态扩缩容增加弹性——但需要小心 goroutine 泄漏:

type DynamicPool struct {
    mu       sync.Mutex
    workers  int
    maxWorkers int
    minWorkers int
    taskCh   chan Task
}

func (p *DynamicPool) adjustSize() {
    p.mu.Lock()
    defer p.mu.Unlock()

    queueLen := len(p.taskCh)
    if queueLen > cap(p.taskCh)/2 && p.workers < p.maxWorkers {
        // 队列过半 → 扩容
        go p.worker()
        p.workers++
    } else if queueLen < cap(p.taskCh)/10 && p.workers > p.minWorkers {
        // 队列几乎空 → 缩容——但不能直接 kill goroutine
        // 需要 worker 自己检查"是否该退出"→ 用 done signal
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

实际生产中更安全的做法——用 errgroup + SetLimit 替代手写动态扩缩(见 5.3)。

# 5.3 与 errgroup.SetLimit 的关系

Worker Pool 的"固定并发度"本质就是第 14 篇的 errgroup.SetLimit——但 Worker Pool 更适合"长生命周期",errgroup 更适合"一次性批量":

// 一次性批处理——errgroup.SetLimit 更简洁
func batchProcess(tasks []Task) error {
    g, ctx := errgroup.WithContext(context.Background())
    g.SetLimit(10) // ← Worker Pool 的等效表达:10 个 worker
    for _, task := range tasks {
        task := task
        g.Go(func() error {
            return process(ctx, task)
        })
    }
    return g.Wait()
}
1
2
3
4
5
6
7
8
9
10
11
12

选择原则:

  • 一次性处理所有任务 → 用 errgroup + SetLimit
  • 任务持续到达(如 HTTP handler)→ 用 Worker Pool
  • 需要动态扩缩 → 考虑用 semaphore.Weighted + goroutine 管理

# 6. Or-Done 与 Tee 模式

# 6.1 Or-Done 优雅退出

Or-Done 是给任何 channel 加上一个"退出信号"——这是 goroutine 泄漏防护的核心模式:

// Or-Done: 从 ch 接收数据——但如果 done 被 close → 立即退出
func orDone(done <-chan struct{}, ch <-chan interface{}) <-chan interface{} {
    out := make(chan interface{})
    go func() {
        defer close(out)
        for {
            select {
            case <-done:
                return // ← 退出信号优先级最高
            case v, ok := <-ch:
                if !ok {
                    return // ← 上游 channel 关闭
                }
                select {
                case out <- v:
                case <-done:
                    return // ← 发送时也要检查退出信号
                }
            }
        }
    }()
    return out
}

// 使用:把任何 channel 包装成"可取消"版本
func consumeWithCancel(done <-chan struct{}, taskCh <-chan Task) {
    for task := range orDone(done, taskCh) {
        process(task.(Task))
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

Or-Done 的精妙——双重 select(接收时一次检查 done,发送时再检查一次 done)。如果只在外层 select 检查 done:

  • 接收时 done 关闭 → 正常退出 ✓
  • 但如果在 out <- v 时 done 关闭 → goroutine 阻塞在发送上 → done 检查不到 → 泄漏

这就是为什么内层还有一个 select { case out <- v: case <-done: return }——任何阻塞点都必须有退出路径。

# 6.2 Tee 分流模式

Tee 模式将一个 channel 的数据复制到两个输出 channel——两个消费者各自独立接收:

// Tee: 1 个输入 → 2 个输出(各自独立副本)
func tee(done <-chan struct{}, in <-chan interface{}) (<-chan interface{}, <-chan interface{}) {
    out1 := make(chan interface{})
    out2 := make(chan interface{})
    go func() {
        defer close(out1)
        defer close(out2)
        for v := range orDone(done, in) {
            // 必须用局部变量复制——避免下面的 select 发送到同一个 out1/out2
            var out1Ch, out2Ch = out1, out2
            for i := 0; i < 2; i++ {
                select {
                case <-done:
                    return
                case out1Ch <- v:
                    out1Ch = nil // ← nil channel 让 select 跳过这个 case
                case out2Ch <- v:
                    out2Ch = nil
                }
            }
        }
    }()
    return out1, out2
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

Tee 的关键技巧——out1Ch = nil:

  • 第一次 select: out1 和 out2 都可能就绪 → 随机选一个 → 选了 out1 → 发送成功 → out1Ch = nil
  • 第二次 select: out1Ch 是 nil channel → select 跳过 → 只能选 out2 → 发送成功
  • 用 nil channel 实现"依次发送到两个 channel"

# 6.3 为什么 Tee 不能共享 channel

疑惑:为什么 Tee 需要复制数据——不能直接把同一个 input channel 发给两个消费者吗?

论证:

如果把同一个 channel 传给两个 goroutine:

// ❌ 错误:两个 goroutine 共享同一个 channel——数据竞争
ch := make(chan int, 10)

go func() { for v := range ch { fmt.Println("A:", v) } }()
go func() { for v := range ch { fmt.Println("B:", v) } }()

for i := 0; i < 5; i++ { ch <- i }
close(ch)
// 输出可能是:
// A: 0  B: 1  A: 2  A: 3  B: 4  ——每个值只被一个 goroutine 收到!
1
2
3
4
5
6
7
8
9
10

根因:channel 的 recv 是互斥的——每个值只能被一个 goroutine 拿到。两个消费者竞争同一个 channel → 每个值各被谁拿走是随机的 → 这是 Fan-out(负载均衡),不是 Tee(多播)。

Tee 必须"复制"——用 goroutine 把每个值分别发送到两个独立的输出 channel。

结论:Tee 是"广播"——一个数据源 → 多个独立副本 → 每个消费者拿到完整数据。共享 channel 是"负载均衡"——一个数据源 → 多个消费者 → 数据被瓜分。两者语义完全不同。

# 7. Bridge 桥接模式

# 7.1 将 channel 的 channel 扁平化

Bridge 模式解决的是"动态数据源"问题——当数据源的列表本身是变化的:

// 场景:有多个数据源——每个数据源是一个 <-chan T
// 数据源列表本身在动态变化(新增/移除)
// Bridge 把所有数据源的数据合并到一个输出 channel

func bridge(done <-chan struct{}, chanStream <-chan <-chan interface{}) <-chan interface{} {
    out := make(chan interface{})
    go func() {
        defer close(out)
        for {
            // chanStream 本身是一个 channel——它产出"数据源 channel"
            var stream <-chan interface{}
            select {
            case maybeStream, ok := <-chanStream:
                if !ok {
                    return
                }
                stream = maybeStream // 切换到新的数据源
            case <-done:
                return
            }

            // 消费当前数据源——直到它关闭
            for v := range orDone(done, stream) {
                select {
                case out <- v:
                case <-done:
                    return
                }
            }
        }
    }()
    return out
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33

Bridge 的核心——chanStream <-chan <-chan T 是"channel of channels"。外层 channel 产出内层 channel——每个内层 channel 是一个数据源。Bridge 遍历所有内层 channel——把它们的数据串行输出。

# 7.2 典型应用:动态数据源切换

Bridge 的典型场景是动态增减的数据源——如服务发现的端点列表变化:

func watchEndpoints(done <-chan struct{}, discoveryCh <-chan []string) <-chan interface{} {
    // 把"端点列表变化事件"转成"数据源的 channel"
    streamCh := make(chan (<-chan interface{}))

    go func() {
        defer close(streamCh)
        for {
            select {
            case endpoints := <-discoveryCh:
                for _, ep := range endpoints {
                    streamCh <- connectToEndpoint(ep) // 每个端点一个 channel
                }
            case <-done:
                return
            }
        }
    }()

    // Bridge: 把所有端点的数据扁平化为一个 channel
    return bridge(done, streamCh)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

Bridge 的价值——调用方只看到一个 <-chan T——完全不感知底层有多少个数据源、数据源什么时候变化。这是"抽象层"的威力:把复杂性封装在模式内部。

# 8. Pub/Sub 发布订阅模式

# 8.1 基于 channel 的简易实现

Pub/Sub 和 Tee 类似——都是"一对多"——但 Pub/Sub 支持动态订阅/取消订阅:

type PubSub struct {
    mu     sync.RWMutex
    subs   map[string][]chan interface{} // topic → 订阅者 channel 列表
}

func NewPubSub() *PubSub {
    return &PubSub{subs: make(map[string][]chan interface{})}
}

// Subscribe 返回一个只读 channel——订阅者通过它接收消息
func (ps *PubSub) Subscribe(topic string) <-chan interface{} {
    ch := make(chan interface{}, 10) // 缓冲——避免慢消费者阻塞发布者
    ps.mu.Lock()
    ps.subs[topic] = append(ps.subs[topic], ch)
    ps.mu.Unlock()
    return ch
}

// Publish 把消息发送给 topic 的所有订阅者
func (ps *PubSub) Publish(topic string, msg interface{}) {
    ps.mu.RLock()
    defer ps.mu.RUnlock()
    for _, ch := range ps.subs[topic] {
        select {
        case ch <- msg:
        default:
            // 订阅者 channel 满了——丢弃消息(或记录 metric)
        }
    }
}

// Unsubscribe 取消订阅——关闭并移除 channel
func (ps *PubSub) Unsubscribe(topic string, ch <-chan interface{}) {
    ps.mu.Lock()
    defer ps.mu.Unlock()
    subs := ps.subs[topic]
    for i, sub := range subs {
        if sub == ch {
            close(sub)
            ps.subs[topic] = append(subs[:i], subs[i+1:]...)
            return
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44

使用示例:

ps := NewPubSub()

// 两个订阅者订阅同一个 topic
ch1 := ps.Subscribe("logs")
ch2 := ps.Subscribe("logs")

// 发布消息——两个订阅者都收到
ps.Publish("logs", LogEntry{Level: "ERROR", Msg: "磁盘满"})

go func() { for log := range ch1 { handle(log) } }()
go func() { for log := range ch2 { handle(log) } }()
1
2
3
4
5
6
7
8
9
10
11

# 8.2 与消息队列的关系

Go channel 实现的 Pub/Sub 适用于进程内通信——不适合跨进程。如果需要持久化、重试、死信队列——用 RabbitMQ/Kafka/Pulsar。

选择指南:

  • 进程内、低延迟、不要求持久化 → channel Pub/Sub
  • 跨进程、需要持久化和确认机制 → 消息队列
  • 两者可以组合——Pub/Sub 作为消息队列的本地缓存层(减少 RPC 调用)

# 9. 模式组合实战

# 9.1 Pipeline + Fan-out = 并行流水线

回到第 1 章的日志流水线——把 Fan-out 应用到瓶颈的 enrich 阶段:

// ✅ 修复后的日志流水线:Pipeline + Fan-out + Fan-in
func enhancedPipeline(ctx context.Context) {
    // Stage 1: Kafka 消费 → 去重
    rawCh := make(chan RawLog, 1000) // 缓冲区放大到 1000

    // Stage 2: 富化阶段——Fan-out 到 25 个 goroutine
    enrichedCh := fanOutEnrich(ctx, rawCh, 25)

    // Stage 3: ES 写入
    writeToES(ctx, enrichedCh)

    // 启动 Kafka 消费(在最上游)
    go consumeKafka(ctx, rawCh)
}

func fanOutEnrich(ctx context.Context, in <-chan RawLog, workers int) <-chan EnrichedLog {
    out := make(chan EnrichedLog, 1000)
    var wg sync.WaitGroup

    for i := 0; i < workers; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for {
                select {
                case log, ok := <-in:
                    if !ok { return }
                    enriched, err := enrichLog(log)
                    if err != nil { continue }
                    select {
                    case out <- enriched:
                    case <-ctx.Done():
                        return
                    }
                case <-ctx.Done():
                    return
                }
            }
        }()
    }

    go func() {
        wg.Wait()
        close(out)
    }()

    return out
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48

改进点:

  • enrich 阶段从 1 个 goroutine → 25 个 → 瓶颈消除 ✓
  • channel 缓冲区从 100 → 1000 → 吸收 ES 短暂抖动 ✓
  • 每个阶段都检查 ctx.Done() → 取消传播 ✓
  • ES 写入阶段也加上了 select { case enriched := <-enrichedCh: ... case <-ctx.Done(): return } ✓

# 9.2 Worker Pool + Or-Done = 可优雅退出的池

Worker Pool 结合 Or-Done——保证 shutdown 时所有 worker 都退出:

func (p *WorkerPool) gracefulShutdown(timeout time.Duration) {
    // 1. 通知所有 worker 退出
    close(p.done)

    // 2. 等待 worker 处理完当前任务(给一个宽限期)
    done := make(chan struct{})
    go func() {
        p.wg.Wait() // 等待所有 worker 结束
        close(done)
    }()

    select {
    case <-done:
        log.Println("所有 worker 优雅退出")
    case <-time.After(timeout):
        log.Println("超时——强制退出(可能有未完成的任务)")
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

# 9.3 选型决策树

你需要?
│
├─ 数据需要依次经过多个处理步骤?
│     → Pipeline
│     如果某个步骤是瓶颈 → Pipeline + Fan-out
│
├─ 数据需要并行处理(同一个处理逻辑)?
│     → 一次性批量数据 → Fan-out + Fan-in(或用 errgroup.SetLimit)
│     → 持续到达的任务 → Worker Pool
│
├─ 多个消费者各自需要完整的数据副本?
│     → Tee(2 个消费者)或 Pub/Sub(动态订阅者)或 Bridge(动态数据源)
│
├─ 需要优雅退出(取消或超时)?
│     → 给任何模式套上 Or-Done
│
└─ 需要结合多种模式?
      → Pipeline + Fan-out + Or-Done(最常见组合)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

# 10. 综合案例串讲

# 10.1 案例真相揭晓

回到第 1 章日志流水线的七个疑问,逐条作答:

疑问 答案
① Pipeline 的 stage 应该用有缓冲还是无缓冲 channel? 第 3 章:有缓冲——吸收下游短暂抖动,缓冲区大小 = 目标 QPS × 下游 P99 延迟
② Fan-out 怎么分发数据? 第 4.1:多个 goroutine 竞争同一个 <-chan——Go channel 天然支持多消费者
③ Fan-in 怎么合并输出? 第 4.2:每个输出 channel 一个转发 goroutine + WaitGroup + 最后 close
④ Worker Pool 和 Fan-out 的区别? 第 5 章:Worker Pool 是持久的、有边界的池;Fan-out 是一次性的并行分发
⑤ Or-Done 怎么优雅退出? 第 6.1:双重 select——接收和发送时都检查 done signal
⑥ 为什么 Tee 不能共享 channel? 第 6.3:channel 的 recv 是互斥的——共享 channel 是 Fan-out(负载均衡),不是 Tee(广播)
⑦ Bridge 怎么把动态数据源变稳定? 第 7 章:<-chan (<-chan T) → Bridge → <-chan T——抽象掉数据源变化

案例完整根因链条:

ES 写入 P99 从 50ms 涨到 500ms → ch3 积压
  → enrich 的 `ch3 <- log` 阻塞 → ch2 积压
  → deduplicate 的 `ch2 <- log` 阻塞 → ch1 积压
  → consumeKafka 的 `ch1 <- log` 阻塞 → 消费者心跳超时 → rebalancing
  → 与此同时 enrich 只有一个 goroutine → 5ms × 5000 QPS = 25s/s > 1s → 串行瓶颈

根因 1: 全部用无缓冲/小缓冲 channel → 无缓冲吸收能力 → 反压瞬间传导到最上游
根因 2: enrich 串行 → 无 Fan-out → 瓶颈
根因 3: 无 ctx 取消传播 → ES 恢复后 goroutine 也感知不到
1
2
3
4
5
6
7
8
9

修复方案总结:

// ✅ 完整修复:Pipeline + Fan-out + Or-Done + 大缓冲
func fixedPipeline(ctx context.Context) {
    ctx, cancel := context.WithCancel(ctx)
    defer cancel()

    // channel 缓冲区:QPS × P99延迟 = 5000 × 0.5s ≈ 2500 → 用 3000
    rawCh      := make(chan RawLog, 1000)      // Kafka → 去重
    dedupedCh  := make(chan RawLog, 2000)      // 去重 → 富化
    enrichedCh := make(chan EnrichedLog, 3000) // 富化 → ES

    // Stage 1: Kafka 消费
    go consumeKafka(ctx, rawCh)

    // Stage 2: 去重
    go deduplicate(ctx, rawCh, dedupedCh)

    // Stage 3: 富化——Fan-out 25 个 goroutine
    go fanOutEnrich(ctx, dedupedCh, enrichedCh, 25)

    // Stage 4: ES 写入(主 goroutine)
    writeToES(ctx, enrichedCh)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

# 10.2 一次 Pipeline 数据的完整旅程

Kafka 消息到达
─────────────────────────────────────────────────────────
        │
        ├─ Stage 1: consumeKafka
        │    kafkaConsumer.Message() → parseRawLog
        │    → select { rawCh <- log; <-ctx.Done() }
        │    → rawCh 成功写入 → 去重 goroutine 接收
        │
        ├─ Stage 2: deduplicate
        │    for log := range rawCh:
        │        检查 Fingerprint → 未见过 → dedupedCh <- log
        │
        ├─ Stage 3: fanOutEnrich (25 workers)
        │    worker-7 从 dedupedCh 拿到 log:
        │      enrichLog(log) → lookupGeo → 5ms
        │      → select { enrichedCh <- result; <-ctx.Done() }
        │      → enrichedCh 写入成功
        │
        │    如果此时 ctx 被取消(外部超时):
        │      worker-7 的 select 走 ctx.Done() 分支 → return
        │      → wg.Done() → 当所有 worker 都 return → close(enrichedCh)
        │
        ├─ Stage 4: writeToES
        │    for enriched := range enrichedCh:
        │      esClient.Index(enriched) → 50ms (正常)
        │      → 如果 ES 慢到 500ms → enrichedCh 缓冲区 3000 吸收
        │      → 如果 3000 也满了 → enriched 的 `enrichedCh <- result` 阻塞
        │      → 反压传递到 enrich workers → dedupedCh → rawCh → Kafka
        │
        └─ 退出:
              ctx cancel → 所有 stage 收到 ctx.Done() → return
              → Kafka consumer commit offset(保证 at-least-once)
              → ES 写入最后一批数据 → 程序退出
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33

# 10.3 设计哲学回扣

哲学 1:用 channel 缓冲区做"蓄水池"——在反压链中留出缓冲空间

Pipeline 的每个 stage 之间是一个 channel。如果这个 channel 是无缓冲的——上下游完全同步——下游一慢,上游立刻卡住。加上缓冲 = 上下游之间多了一个"蓄水池"——下游短暂变慢时,上游可以继续生产(把数据放入缓冲区)。缓冲区的大小 = QPS × 下游 P99 延迟的缓冲时间。但缓冲不能无限大——它是"时间换空间":给下游一个恢复窗口,而不是掩盖问题。

哲学 2:Fan-out 的"并行瓶颈消除"——是 Pipeline 的自然延伸

Pipeline 让数据流变成"线性的"——但当某个 stage 成为瓶颈,线性就变成了瓶颈。Fan-out 是 Pipeline 的自然延伸——在瓶颈 stage 开多个 goroutine 并行处理——把一个串行节点变成并行节点。Fan-out 不是"设计模式"——是 Go 的 channel 天然支持多消费者——只需要多启动几个 goroutine 就够了。

哲学 3:Or-Done 是 goroutine 泄漏防护的"最后一层安全网"

第 15 篇讲了 goroutine 泄漏的五种成因——Or-Done 是它们的统一解。任何 <-ch 都可能永远阻塞——Or-Done 给每个阻塞操作配上退出路径:select { case v := <-ch: case <-done: return }。这不是"功能需求"——是"安全需求":每个 goroutine 都必须有退出路径——Or-Done 保证了这个约束。

哲学 4:模式是 channel + goroutine + select 的"组合公式"——不是封装,是约定

Go 的并发模式不像 Java 的 Design Patterns——没有抽象类、没有接口继承。它们就是几个 goroutine + 几个 channel + 几个 select 的组合——模式只是"我们约定这么写,因为这样最不容易出错"。Tee 就是"用 goroutine 把数据复制到两个 channel",Fan-out 就是"多个 goroutine 读同一个 channel",Pipeline 就是"每个函数返回 <-chan T"。约定优于配置,模式是社区共识的结晶。

# 10.4 速查表

七种模式速查:

模式 核心结构 用途 关键约定
Pipeline stage(in) -> out chan 数据依次处理 每个 stage defer close(out)
Fan-out 多 goroutine 竞争 <-ch 并行处理瓶颈 goroutine 数量 = 目标QPS × 单任务耗时
Fan-in 多 <-ch + WaitGroup + 转发 合并多个输出 WaitGroup 等所有转发 goroutine 结束后 close
Worker Pool task channel + 固定 goroutine 持续任务处理 Shutdown 时 close(taskCh) + close(done)
Or-Done select { case v:=<-ch: case <-done: } 任何 channel 加退出信号 接收和发送都要检查 done
Tee goroutine 把数据发到两个 out channel 数据多播 nil channel 技巧依次发送
Bridge <-chan (<-chan T) → <-chan T 动态数据源扁平化 遍历内层 channel 直到关闭

模式组合决策树:

场景 推荐组合
数据批量处理,某阶段慢 Pipeline + Fan-out
数据持续到达,固定并发 Worker Pool
需要多播数据 Pipeline 后接 Tee
需要动态订阅者 Pipeline 后接 Pub/Sub
需要动态数据源 Bridge → Pipeline
需要优雅退出 所有模式 + Or-Done
需要错误传播 + 快速失败 所有模式 + errgroup.WithContext
需要加权并发控制 Worker Pool + semaphore.Weighted

Channel 缓冲区大小公式:

缓冲区大小 = 目标 QPS × 下游 P99 延迟 × 缓冲系数

缓冲系数:
  - 批处理场景:1~3(允许几秒的缓冲)
  - 实时处理场景:0.1~1(只缓冲几百毫秒)
  - 外部 API 调用:5~10(API 抖动通常 5-10 秒恢复)
1
2
3
4
5
6

下一篇:我们已经掌握了 Pipeline、Fan-out/Fan-in、Or-Done 等七种并发模式,下一步进入 17.GC三色标记与屏障——从 Go 1.3 STW 到 Go 1.8 混合屏障,把 GC 的演进史和三色标记的每一个细节剖开。

上次更新: 2026/06/13, 21:14:36
协程泄漏排查与修复
GC三色标记与屏障

← 协程泄漏排查与修复 GC三色标记与屏障→

最近更新
01
信号崩溃快速排查
06-15
02
CoreDump破案
06-15
03
perf火焰图实战
06-15
更多文章>
Theme by Vdoing | Copyright © 2019-2026 杨充 | MIT License | 桂ICP备2024034950号 | 桂公网安备45142202000030
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式