并发设计模式详解
# 16.并发设计模式详解
卷三第十六篇——Go 的并发模型不是"一堆 goroutine 乱跑",而是可以通过 channel 组合出可预测的数据流拓扑。Pipeline 让数据像流水线一样传递,Fan-out/Fan-in 把"扇出放大 + 扇回归一"变成标准操作,Worker Pool 控制并发度的同时也控制资源,Or-Done 让 goroutine 优雅退出。这些不是设计模式书上的抽象概念——是 Go 标准库扩展库每天都在用的代码模板。读完本篇,你能回答:Pipeline 的每个 stage 应该用什么类型的 channel?Fan-out 的 goroutine 数量怎么确定?为什么 Tee 模式不能直接用同一个 channel 发给两个消费者?关键词:Pipeline、Fan-out/Fan-in、Worker Pool、Or-Done、Tee、Bridge、Pub/Sub。
# 目录介绍
- 1. 案例引入
- 2. 架构概览
- 3. Pipeline 流水线模式
- 4. Fan-out 与 Fan-in 模式
- 5. Worker Pool 工作池模式
- 6. Or-Done 与 Tee 模式
- 7. Bridge 桥接模式
- 8. Pub/Sub 发布订阅模式
- 9. 模式组合实战
- 10. 综合案例串讲
# 1. 案例引入
# 1.1 一段崩在哪
看一个实时日志处理服务——从 Kafka 消费原始日志,经过去重、格式化、富化(补全 IP 归属地)、写入 Elasticsearch 四个阶段。每天处理 2 亿条日志,高峰期 QPS 超 5000:
// log_pipeline.go —— 日志处理流水线
package main
import (
"context"
"log"
"sync"
)
// Stage 1: Kafka 消费 → 产出原始日志
func consumeKafka(ctx context.Context, out chan<- RawLog) {
defer close(out)
for msg := range kafkaConsumer.Messages() {
select {
case out <- parseRawLog(msg):
case <-ctx.Done():
return
}
}
}
// Stage 2: 去重
func deduplicate(in <-chan RawLog, out chan<- RawLog) {
defer close(out)
seen := make(map[string]bool)
for log := range in {
if !seen[log.Fingerprint] {
seen[log.Fingerprint] = true
out <- log
}
}
}
// Stage 3: 富化——补全 IP 归属地(调用外部 API)
func enrich(in <-chan RawLog, out chan<- EnrichedLog) {
defer close(out)
for log := range in {
geo, err := lookupGeo(log.IP) // ← 外部 API 调用——每次 ~5ms
if err != nil {
continue // 跳过失败的
}
out <- EnrichedLog{RawLog: log, Geo: geo}
}
}
// Stage 4: 写入 Elasticsearch
func writeToES(in <-chan EnrichedLog) {
for log := range in {
if err := esClient.Index(log); err != nil {
log.Printf("ES 写入失败: %v", err)
}
}
}
func main() {
ch1 := make(chan RawLog, 100) // Kafka → 去重
ch2 := make(chan RawLog, 100) // 去重 → 富化
ch3 := make(chan EnrichedLog, 100) // 富化 → ES
go consumeKafka(context.Background(), ch1)
go deduplicate(ch1, ch2)
go enrich(ch2, ch3)
writeToES(ch3) // 主 goroutine 做最后一步
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
现象:
- 正常时 QPS 5000,ES 写入 P99 约 50ms——流水线运行平稳
- 某天 ES 集群因为分片迁移变慢——写入 P99 涨到 500ms
- 问题来了:ES 慢 →
writeToES消费 ch3 变慢 → ch3 积压 →enrich往 ch3 写阻塞 → ch2 积压 →deduplicate往 ch2 写阻塞 → ch1 积压 →consumeKafka往 ch1 写阻塞 - 反压层层传递——最上游的 Kafka 消费被堵住 → 消费者心跳超时 → Kafka rebalancing → 整个消费组停摆
- 监控显示:CPU 利用率 15%,内存 30%——资源充足但已经被"一个慢节点卡死了整条链"
更隐蔽的问题:富化阶段 lookupGeo 是串行的——每次 5ms × 5000 QPS = 25 秒/秒的累计延迟——理论上需要 25 个并发才能追上速率。但当前只有一个 goroutine 在做富化——它本身就是瓶颈:没有使用 Fan-out 模式。
# 1.2 顺藤摸到根因
追查过程:
第一步:定位瓶颈——给每个 stage 加上耗时统计,发现富化阶段的 ch2→ch3 的 channel 长度持续在 100(满),ch3→ES 也在 100(满)。反压链:ES 写入慢 → 堵 ch3 → 堵 enrich → 堵 ch2 → 堵 deduplicate → 堵 ch1 → 堵 consumeKafka。
第二步:分析容错——ES 变慢时,enrich 还在调用 lookupGeo(外部 API)——即使下游已经堵了,它还在做无用功。应该在 out <- enrichedLog 之前检查 ctx.Done()。
第三步:分析并发度——enrich 只有一个 goroutine:5000 QPS × 5ms/条 = 25000ms/s > 1000ms——串行处理根本跟不上。需要 Fan-out 成多个 goroutine。
这个事故藏着 7 个原理点:
① Pipeline 的每个 stage 应该用有缓冲还是无缓冲 channel?缓冲多大? → 第 3 章
② Fan-out 怎么把一个 channel 的数据分发给多个 goroutine? → 第 4 章
③ Fan-in 怎么把多个 goroutine 的输出合并到一个 channel? → 第 4.2
④ Worker Pool 和 Fan-out 的区别是什么——什么时候选哪个? → 第 5 章
⑤ Or-Done 怎么让 Pipeline 的 goroutine 优雅退出? → 第 6.1
⑥ Tee 模式为什么不能把同一个 channel 发给两个消费者? → 第 6.2-6.3
⑦ Bridge 怎么把"动态变化的 channel 列表"变成一个稳定的数据流? → 第 7 章
2
3
4
5
6
7
# 1.3 我们要回答什么
这个日志流水线案例贯穿全篇。我们从 Pipeline 的基础模型出发,深入到 Fan-out/Fan-in 的并发放大、Worker Pool 的资源控制、Or-Done 的优雅退出——最后用 Pipeline + Fan-out + Or-Done 的组合模式彻底修复——并用选型决策树指导读者在真实场景中如何选模式。
本篇路线:
架构总图 (第 2 章) ── 七种模式全景 + channel 是公共语言
↓
Pipeline (第 3 章) ── 基础三级流水线 + 取消传播
↓
Fan-out/Fan-in (第 4 章) ── 扇出 + 扇入 + 系数选择
↓
Worker Pool (第 5 章) ── 固定池 + 动态扩缩 + 与 errgroup 对比
↓
Or-Done/Tee (第 6 章) ── 优雅退出 + 分流 + Tee 陷阱
↓
Bridge (第 7 章) ── channel of channels 扁平化
↓
Pub/Sub (第 8 章) ── 基于 channel 的发布订阅
↓
模式组合 (第 9 章) ── 组合实战 + 选型决策树
↓
综合案例 (第 10 章) ── 修复日志流水线 + 设计哲学
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
📌 本篇定位:第 09 篇讲 channel 底层实现、第 13-14 篇讲信号量和 errgroup——那些是"组件"。本篇把组件组合成模式——Pipeline 是 channel 的串联、Fan-out 是 channel 的并联、Or-Done 是 select 的套用。理解了模式,你就知道"什么时候用 channel、什么时候用锁、什么时候用 errgroup"——这是 Go 并发编程从"能跑"到"工程化"的分水岭。
# 2. 架构概览
# 2.1 七种模式全景图
七种 Go 并发模式按"数据流拓扑"分为四类:
Go 并发模式 ── 按数据流拓扑分类
│
├─ 线性拓扑
│ ├── Pipeline:goroutine 串联——数据像流水线一样依次通过各 stage
│ └── Bridge:channel of channels → 单 channel——"动态数据源"扁平化
│
├─ 扇入扇出拓扑
│ ├── Fan-out:一个输入 → 多个 goroutine 并行处理
│ └── Fan-in:多个 goroutine 的输出 → 合并到一个 channel
│
├─ 分发拓扑
│ ├── Pub/Sub:一条消息 → 多个订阅者独立接收
│ └── Tee:一个输入 channel → 两个输出 channel(独立副本)
│
└─ 生命周期拓扑
├── Worker Pool:固定数量 goroutine——有边界、有容量
└── Or-Done:给任何 channel 加上"可取消"的退出信号
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
模式之间的关系图:
┌─────────────────────────────┐
│ Worker Pool │
│ goroutine 池 + task chan │
└──────────────┬──────────────┘
│ 常与
┌──────────────┴──────────────┐
│ Pipeline │
│ stage1 → stage2 → stage3 │
└──────┬────────────┬─────────┘
│ │
┌──────────┴──┐ ┌────┴──────────┐
│ Fan-out │ │ Fan-in │
│ 1 → N 分发 │ │ N → 1 合并 │
└─────────────┘ └───────────────┘
│ │
┌──────────┴────────────┴──────────┐
│ Or-Done │
│ 给任何 channel 加取消信号 │
└──────────────────────────────────┘
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 2.2 channel 是所有模式的公共语言
疑惑:为什么 Go 的并发模式都围绕 channel 展开——而不是锁或 WaitGroup?
论证:
channel 天生适合做"数据流"的载体——Pipeline 的"数据从一个 stage 传到下一个 stage"本质就是 channel 的 send/recv。锁保护的是"共享状态"(如计数器、缓存),但并发模式处理的是"数据传递"——channel 是数据传递的最佳载体。
channel 自带反压——
ch <- v在 channel 满时阻塞——这就是天然的反压机制。第 1 章的日志流水线中,ES 慢 → ch3 满 → enrich 的ch3 <- log阻塞 → 反压传递到上游。不需要额外的信号量或计数器——channel 本身就是流控。channel 的 close 是"数据结束"的统一信号——
close(ch)唤醒所有for-range ch的 goroutine。在 Pipeline 中,上游 stage 的defer close(out)就是告诉下游"没有更多数据了"的信号——无需额外的"done" flag。select 是模式组合的"胶水"——Or-Done 就是
select { case v := <-ch: case <-done: }。Tee 就是select { case out1 <- v: out2 <- v }。select 让 goroutine 可以同时等待多个 channel——这是组合模式的基础。
结论:Go 的并发模式不是凭空发明的——它们是 channel + goroutine + select 这三个原语的自然组合。理解了这三个原语,模式只是它们的"最佳实践用法"。
# 3. Pipeline 流水线模式
# 3.1 基础三级流水线
Pipeline 是最基本的并发模式——每个 stage 是一个 goroutine,通过 channel 与前后的 stage 通信:
数据流:
[生成器] ──ch1──→ [处理器] ──ch2──→ [消费者]
gen() process() consume()
2
3
func gen(nums ...int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for _, n := range nums {
out <- n
}
}()
return out
}
func square(in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
out <- n * n
}
}()
return out
}
func main() {
// Pipeline: gen → square → 主 goroutine 打印
for n := range square(gen(1, 2, 3, 4, 5)) {
fmt.Println(n) // 1, 4, 9, 16, 25
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
Pipeline 的核心约定:
- 每个 stage 的 goroutine 在数据发完后
defer close(out)——通知下游"我完成了" - 下游用
for v := range in消费——上游 close 时自动退出 - 每个 stage 返回
<-chan T(只读 channel)——防止下游意外写入
# 3.2 扇出流水线
当某个 stage 是瓶颈时(如第 1 章的 enrich 阶段调用外部 API),可以扇出:启动多个 goroutine 并行处理同一个输入 channel:
┌─→ [处理器副本 1] ─┐
[生成器] ──ch──┼─→ [处理器副本 2] ─┼──out──→ [消费者]
└─→ [处理器副本 3] ─┘
2
3
// Fan-out: 多个 goroutine 从同一个 channel 读取——Go channel 天然支持
func fanOut(in <-chan RawLog, workerCount int) <-chan EnrichedLog {
out := make(chan EnrichedLog)
var wg sync.WaitGroup
for i := 0; i < workerCount; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for log := range in { // ← 多个 goroutine 竞争同一个 channel
enriched, err := enrichLog(log)
if err == nil {
out <- enriched
}
}
}()
}
// Fan-in: 等所有 worker 完成 → close out
go func() {
wg.Wait()
close(out)
}()
return out
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
为什么多个 goroutine 可以安全地 for-range 同一个 channel——Go channel 的内部实现保证了:<-ch 时 recvq 上的 goroutine 是原子唤醒的——不会出现两个 goroutine 同时收到同一个值(详见第 09 篇 channel 源码剖析)。
# 3.3 取消传播
Pipeline 的每个 stage 都应该接受 context——当上游取消时,下游能感知并安全退出:
// ✅ 可取消的 Pipeline stage
func enrichWithCtx(ctx context.Context, in <-chan RawLog) <-chan EnrichedLog {
out := make(chan EnrichedLog)
go func() {
defer close(out)
for {
select {
case log, ok := <-in:
if !ok {
return // 上游 channel 关闭 → 退出
}
enriched, err := enrichLog(log)
if err == nil {
select {
case out <- enriched:
case <-ctx.Done():
return // 取消 → 退出
}
}
case <-ctx.Done():
return // 取消 → 退出
}
}
}()
return out
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
回到第 1 章的日志流水线——加上扇出 + 取消传播后,enrich 阶段的问题就解决了。这展示了 Pipeline 和 Fan-out 组合的威力(第 9 章会完整展示)。
# 4. Fan-out 与 Fan-in 模式
# 4.1 Fan-out 扇出实现
Fan-out 的核心是将一个输入 channel 的数据分发到多个 goroutine 并行处理。Go 的 channel 天然支持多消费者——无需额外实现:
// Fan-out 模式:1 个输入 → N 个 worker → N 个输出 channel
func fanOut(in <-chan Task, workerCount int) []<-chan Result {
workers := make([]<-chan Result, workerCount)
for i := 0; i < workerCount; i++ {
workers[i] = worker(in) // 每个 worker 返回自己的 output channel
}
return workers
}
func worker(in <-chan Task) <-chan Result {
out := make(chan Result)
go func() {
defer close(out)
for task := range in {
out <- process(task)
}
}()
return out
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
Fan-out 的关键:
- 多个 goroutine 竞争同一个
<-chan(Go 保证公平性——不会饿死某个 worker) - 每个 worker 有自己的输出 channel——需要 Fan-in 来合并
# 4.2 Fan-in 扇入实现
Fan-in 将多个 channel 的数据合并到一个 channel——需要用 WaitGroup 等所有上游完成后关闭输出 channel:
// Fan-in 模式:N 个输入 channel → 1 个输出 channel
func fanIn(channels ...<-chan Result) <-chan Result {
out := make(chan Result)
var wg sync.WaitGroup
// 对每个输入 channel 启动一个 goroutine——把数据转发到 out
for _, ch := range channels {
wg.Add(1)
go func(c <-chan Result) {
defer wg.Done()
for v := range c {
out <- v
}
}(ch)
}
// 等所有转发 goroutine 结束 → close out
go func() {
wg.Wait()
close(out)
}()
return out
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
Fan-in 的 goroutine 数量:每个输入 channel 需要一个转发 goroutine——如果 Fan-out 了 N 个 worker,Fan-in 阶段就有 N 个转发 goroutine。总 goroutine 数 = N (worker) + N (转发) + 1 (Wait goroutine) = 2N+1。
Fan-in 的优化——用 select 合并少量 channel(不需要 goroutine):
// 合并 2 个 channel——不用 goroutine
func mergeTwo(a, b <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for a != nil || b != nil {
select {
case v, ok := <-a:
if !ok { a = nil; continue }
out <- v
case v, ok := <-b:
if !ok { b = nil; continue }
out <- v
}
}
}()
return out
}
// 当 channel 是 nil 时——select 永远跳过那个 case(Go 的 nil channel 永不就绪特性)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 4.3 扇出系数选择
疑惑:Fan-out 应该开多少个 goroutine?
论证:
扇出系数取决于两个因素:
瓶颈 stage 的单任务耗时 vs 目标吞吐——如果 enrich 阶段单次处理 5ms,目标 QPS 是 5000:需要的并发数 = 5000 × 0.005 / 1 ≈ 25。所以至少 25 个 worker。
下游的承载能力——如果 ES 只能承受 1000 写 QPS,开了 100 个 enrich worker 也没用——输出会被 Fan-in 的 channel 缓冲/阻塞。扇出必须和下游容量匹配。
外部资源限制——如果 enrich 调用的是第三方 API(有 rate limit)——超过 rate limit 会导致 429 错误——扇出系数不能超过 rate limit。
// ✅ 经验法则:扇出系数 = max(CPU 核数, 目标吞吐 × 单任务耗时)
func optimalFanOut(targetQPS float64, taskDuration time.Duration, cpuCores int) int {
needed := int(targetQPS * taskDuration.Seconds())
if needed < cpuCores {
return cpuCores // 至少用满 CPU
}
if needed > 100 {
return 100 // 上限——避免 goroutine 泛滥
}
return needed
}
2
3
4
5
6
7
8
9
10
11
结论:扇出系数不是越大越好——它是"够快就行"。扇出 100 个 goroutine 处理一个 QPS=10 的输入就是浪费。扇出 = 目标吞吐 × 单任务耗时 / 1 秒。
# 5. Worker Pool 工作池模式
# 5.1 固定大小工作池
Worker Pool 和 Fan-out 的区别——Worker Pool 的 goroutine 数量是固定的,任务通过一个 task channel 分发:
┌─→ [Worker 1] ─┐
[taskCh] ───┼─→ [Worker 2] ─┼──→ [resultCh]
└─→ [Worker 3] ─┘
2
3
type WorkerPool struct {
taskCh chan Task
resultCh chan Result
done chan struct{}
}
func NewWorkerPool(workerCount, queueSize int) *WorkerPool {
pool := &WorkerPool{
taskCh: make(chan Task, queueSize),
resultCh: make(chan Result, queueSize),
done: make(chan struct{}),
}
for i := 0; i < workerCount; i++ {
go pool.worker(i)
}
return pool
}
func (p *WorkerPool) worker(id int) {
for {
select {
case task, ok := <-p.taskCh:
if !ok {
return // task channel 关闭 → worker 退出
}
result := process(task)
p.resultCh <- result
case <-p.done:
return // 外部通知退出
}
}
}
func (p *WorkerPool) Submit(task Task) {
p.taskCh <- task // 如果 taskCh 满了 → 阻塞调用方 → 反压
}
func (p *WorkerPool) Shutdown() {
close(p.done) // 通知所有 worker 退出
close(p.taskCh) // 不再接受新任务
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
Worker Pool vs Fan-out 的差异:
| Worker Pool | Fan-out | |
|---|---|---|
| goroutine 生命周期 | 长生命周期——随 Pool 创建/销毁 | 短生命周期——随输入 channel 关闭而退出 |
| 任务分发方式 | 通过 task channel 显式分发 | 多个 worker 竞争同一个输入 channel |
| 反压位置 | Submit 时 taskCh <- task 阻塞 | 输入 channel 的 send 方阻塞 |
| 适用场景 | 持久的后台任务(如 HTTP handler pool) | 一次性批处理(如数据处理流水线) |
# 5.2 动态扩缩容
固定大小的 Worker Pool 在流量波动时可能不足或浪费。动态扩缩容增加弹性——但需要小心 goroutine 泄漏:
type DynamicPool struct {
mu sync.Mutex
workers int
maxWorkers int
minWorkers int
taskCh chan Task
}
func (p *DynamicPool) adjustSize() {
p.mu.Lock()
defer p.mu.Unlock()
queueLen := len(p.taskCh)
if queueLen > cap(p.taskCh)/2 && p.workers < p.maxWorkers {
// 队列过半 → 扩容
go p.worker()
p.workers++
} else if queueLen < cap(p.taskCh)/10 && p.workers > p.minWorkers {
// 队列几乎空 → 缩容——但不能直接 kill goroutine
// 需要 worker 自己检查"是否该退出"→ 用 done signal
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
实际生产中更安全的做法——用 errgroup + SetLimit 替代手写动态扩缩(见 5.3)。
# 5.3 与 errgroup.SetLimit 的关系
Worker Pool 的"固定并发度"本质就是第 14 篇的 errgroup.SetLimit——但 Worker Pool 更适合"长生命周期",errgroup 更适合"一次性批量":
// 一次性批处理——errgroup.SetLimit 更简洁
func batchProcess(tasks []Task) error {
g, ctx := errgroup.WithContext(context.Background())
g.SetLimit(10) // ← Worker Pool 的等效表达:10 个 worker
for _, task := range tasks {
task := task
g.Go(func() error {
return process(ctx, task)
})
}
return g.Wait()
}
2
3
4
5
6
7
8
9
10
11
12
选择原则:
- 一次性处理所有任务 → 用 errgroup + SetLimit
- 任务持续到达(如 HTTP handler)→ 用 Worker Pool
- 需要动态扩缩 → 考虑用
semaphore.Weighted+ goroutine 管理
# 6. Or-Done 与 Tee 模式
# 6.1 Or-Done 优雅退出
Or-Done 是给任何 channel 加上一个"退出信号"——这是 goroutine 泄漏防护的核心模式:
// Or-Done: 从 ch 接收数据——但如果 done 被 close → 立即退出
func orDone(done <-chan struct{}, ch <-chan interface{}) <-chan interface{} {
out := make(chan interface{})
go func() {
defer close(out)
for {
select {
case <-done:
return // ← 退出信号优先级最高
case v, ok := <-ch:
if !ok {
return // ← 上游 channel 关闭
}
select {
case out <- v:
case <-done:
return // ← 发送时也要检查退出信号
}
}
}
}()
return out
}
// 使用:把任何 channel 包装成"可取消"版本
func consumeWithCancel(done <-chan struct{}, taskCh <-chan Task) {
for task := range orDone(done, taskCh) {
process(task.(Task))
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
Or-Done 的精妙——双重 select(接收时一次检查 done,发送时再检查一次 done)。如果只在外层 select 检查 done:
- 接收时 done 关闭 → 正常退出 ✓
- 但如果在
out <- v时 done 关闭 → goroutine 阻塞在发送上 → done 检查不到 → 泄漏
这就是为什么内层还有一个 select { case out <- v: case <-done: return }——任何阻塞点都必须有退出路径。
# 6.2 Tee 分流模式
Tee 模式将一个 channel 的数据复制到两个输出 channel——两个消费者各自独立接收:
// Tee: 1 个输入 → 2 个输出(各自独立副本)
func tee(done <-chan struct{}, in <-chan interface{}) (<-chan interface{}, <-chan interface{}) {
out1 := make(chan interface{})
out2 := make(chan interface{})
go func() {
defer close(out1)
defer close(out2)
for v := range orDone(done, in) {
// 必须用局部变量复制——避免下面的 select 发送到同一个 out1/out2
var out1Ch, out2Ch = out1, out2
for i := 0; i < 2; i++ {
select {
case <-done:
return
case out1Ch <- v:
out1Ch = nil // ← nil channel 让 select 跳过这个 case
case out2Ch <- v:
out2Ch = nil
}
}
}
}()
return out1, out2
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
Tee 的关键技巧——out1Ch = nil:
- 第一次 select: out1 和 out2 都可能就绪 → 随机选一个 → 选了 out1 → 发送成功 →
out1Ch = nil - 第二次 select: out1Ch 是 nil channel → select 跳过 → 只能选 out2 → 发送成功
- 用 nil channel 实现"依次发送到两个 channel"
# 6.3 为什么 Tee 不能共享 channel
疑惑:为什么 Tee 需要复制数据——不能直接把同一个 input channel 发给两个消费者吗?
论证:
如果把同一个 channel 传给两个 goroutine:
// ❌ 错误:两个 goroutine 共享同一个 channel——数据竞争
ch := make(chan int, 10)
go func() { for v := range ch { fmt.Println("A:", v) } }()
go func() { for v := range ch { fmt.Println("B:", v) } }()
for i := 0; i < 5; i++ { ch <- i }
close(ch)
// 输出可能是:
// A: 0 B: 1 A: 2 A: 3 B: 4 ——每个值只被一个 goroutine 收到!
2
3
4
5
6
7
8
9
10
根因:channel 的 recv 是互斥的——每个值只能被一个 goroutine 拿到。两个消费者竞争同一个 channel → 每个值各被谁拿走是随机的 → 这是 Fan-out(负载均衡),不是 Tee(多播)。
Tee 必须"复制"——用 goroutine 把每个值分别发送到两个独立的输出 channel。
结论:Tee 是"广播"——一个数据源 → 多个独立副本 → 每个消费者拿到完整数据。共享 channel 是"负载均衡"——一个数据源 → 多个消费者 → 数据被瓜分。两者语义完全不同。
# 7. Bridge 桥接模式
# 7.1 将 channel 的 channel 扁平化
Bridge 模式解决的是"动态数据源"问题——当数据源的列表本身是变化的:
// 场景:有多个数据源——每个数据源是一个 <-chan T
// 数据源列表本身在动态变化(新增/移除)
// Bridge 把所有数据源的数据合并到一个输出 channel
func bridge(done <-chan struct{}, chanStream <-chan <-chan interface{}) <-chan interface{} {
out := make(chan interface{})
go func() {
defer close(out)
for {
// chanStream 本身是一个 channel——它产出"数据源 channel"
var stream <-chan interface{}
select {
case maybeStream, ok := <-chanStream:
if !ok {
return
}
stream = maybeStream // 切换到新的数据源
case <-done:
return
}
// 消费当前数据源——直到它关闭
for v := range orDone(done, stream) {
select {
case out <- v:
case <-done:
return
}
}
}
}()
return out
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
Bridge 的核心——chanStream <-chan <-chan T 是"channel of channels"。外层 channel 产出内层 channel——每个内层 channel 是一个数据源。Bridge 遍历所有内层 channel——把它们的数据串行输出。
# 7.2 典型应用:动态数据源切换
Bridge 的典型场景是动态增减的数据源——如服务发现的端点列表变化:
func watchEndpoints(done <-chan struct{}, discoveryCh <-chan []string) <-chan interface{} {
// 把"端点列表变化事件"转成"数据源的 channel"
streamCh := make(chan (<-chan interface{}))
go func() {
defer close(streamCh)
for {
select {
case endpoints := <-discoveryCh:
for _, ep := range endpoints {
streamCh <- connectToEndpoint(ep) // 每个端点一个 channel
}
case <-done:
return
}
}
}()
// Bridge: 把所有端点的数据扁平化为一个 channel
return bridge(done, streamCh)
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
Bridge 的价值——调用方只看到一个 <-chan T——完全不感知底层有多少个数据源、数据源什么时候变化。这是"抽象层"的威力:把复杂性封装在模式内部。
# 8. Pub/Sub 发布订阅模式
# 8.1 基于 channel 的简易实现
Pub/Sub 和 Tee 类似——都是"一对多"——但 Pub/Sub 支持动态订阅/取消订阅:
type PubSub struct {
mu sync.RWMutex
subs map[string][]chan interface{} // topic → 订阅者 channel 列表
}
func NewPubSub() *PubSub {
return &PubSub{subs: make(map[string][]chan interface{})}
}
// Subscribe 返回一个只读 channel——订阅者通过它接收消息
func (ps *PubSub) Subscribe(topic string) <-chan interface{} {
ch := make(chan interface{}, 10) // 缓冲——避免慢消费者阻塞发布者
ps.mu.Lock()
ps.subs[topic] = append(ps.subs[topic], ch)
ps.mu.Unlock()
return ch
}
// Publish 把消息发送给 topic 的所有订阅者
func (ps *PubSub) Publish(topic string, msg interface{}) {
ps.mu.RLock()
defer ps.mu.RUnlock()
for _, ch := range ps.subs[topic] {
select {
case ch <- msg:
default:
// 订阅者 channel 满了——丢弃消息(或记录 metric)
}
}
}
// Unsubscribe 取消订阅——关闭并移除 channel
func (ps *PubSub) Unsubscribe(topic string, ch <-chan interface{}) {
ps.mu.Lock()
defer ps.mu.Unlock()
subs := ps.subs[topic]
for i, sub := range subs {
if sub == ch {
close(sub)
ps.subs[topic] = append(subs[:i], subs[i+1:]...)
return
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
使用示例:
ps := NewPubSub()
// 两个订阅者订阅同一个 topic
ch1 := ps.Subscribe("logs")
ch2 := ps.Subscribe("logs")
// 发布消息——两个订阅者都收到
ps.Publish("logs", LogEntry{Level: "ERROR", Msg: "磁盘满"})
go func() { for log := range ch1 { handle(log) } }()
go func() { for log := range ch2 { handle(log) } }()
2
3
4
5
6
7
8
9
10
11
# 8.2 与消息队列的关系
Go channel 实现的 Pub/Sub 适用于进程内通信——不适合跨进程。如果需要持久化、重试、死信队列——用 RabbitMQ/Kafka/Pulsar。
选择指南:
- 进程内、低延迟、不要求持久化 → channel Pub/Sub
- 跨进程、需要持久化和确认机制 → 消息队列
- 两者可以组合——Pub/Sub 作为消息队列的本地缓存层(减少 RPC 调用)
# 9. 模式组合实战
# 9.1 Pipeline + Fan-out = 并行流水线
回到第 1 章的日志流水线——把 Fan-out 应用到瓶颈的 enrich 阶段:
// ✅ 修复后的日志流水线:Pipeline + Fan-out + Fan-in
func enhancedPipeline(ctx context.Context) {
// Stage 1: Kafka 消费 → 去重
rawCh := make(chan RawLog, 1000) // 缓冲区放大到 1000
// Stage 2: 富化阶段——Fan-out 到 25 个 goroutine
enrichedCh := fanOutEnrich(ctx, rawCh, 25)
// Stage 3: ES 写入
writeToES(ctx, enrichedCh)
// 启动 Kafka 消费(在最上游)
go consumeKafka(ctx, rawCh)
}
func fanOutEnrich(ctx context.Context, in <-chan RawLog, workers int) <-chan EnrichedLog {
out := make(chan EnrichedLog, 1000)
var wg sync.WaitGroup
for i := 0; i < workers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case log, ok := <-in:
if !ok { return }
enriched, err := enrichLog(log)
if err != nil { continue }
select {
case out <- enriched:
case <-ctx.Done():
return
}
case <-ctx.Done():
return
}
}
}()
}
go func() {
wg.Wait()
close(out)
}()
return out
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
改进点:
- enrich 阶段从 1 个 goroutine → 25 个 → 瓶颈消除 ✓
- channel 缓冲区从 100 → 1000 → 吸收 ES 短暂抖动 ✓
- 每个阶段都检查 ctx.Done() → 取消传播 ✓
- ES 写入阶段也加上了
select { case enriched := <-enrichedCh: ... case <-ctx.Done(): return }✓
# 9.2 Worker Pool + Or-Done = 可优雅退出的池
Worker Pool 结合 Or-Done——保证 shutdown 时所有 worker 都退出:
func (p *WorkerPool) gracefulShutdown(timeout time.Duration) {
// 1. 通知所有 worker 退出
close(p.done)
// 2. 等待 worker 处理完当前任务(给一个宽限期)
done := make(chan struct{})
go func() {
p.wg.Wait() // 等待所有 worker 结束
close(done)
}()
select {
case <-done:
log.Println("所有 worker 优雅退出")
case <-time.After(timeout):
log.Println("超时——强制退出(可能有未完成的任务)")
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 9.3 选型决策树
你需要?
│
├─ 数据需要依次经过多个处理步骤?
│ → Pipeline
│ 如果某个步骤是瓶颈 → Pipeline + Fan-out
│
├─ 数据需要并行处理(同一个处理逻辑)?
│ → 一次性批量数据 → Fan-out + Fan-in(或用 errgroup.SetLimit)
│ → 持续到达的任务 → Worker Pool
│
├─ 多个消费者各自需要完整的数据副本?
│ → Tee(2 个消费者)或 Pub/Sub(动态订阅者)或 Bridge(动态数据源)
│
├─ 需要优雅退出(取消或超时)?
│ → 给任何模式套上 Or-Done
│
└─ 需要结合多种模式?
→ Pipeline + Fan-out + Or-Done(最常见组合)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 10. 综合案例串讲
# 10.1 案例真相揭晓
回到第 1 章日志流水线的七个疑问,逐条作答:
| 疑问 | 答案 |
|---|---|
| ① Pipeline 的 stage 应该用有缓冲还是无缓冲 channel? | 第 3 章:有缓冲——吸收下游短暂抖动,缓冲区大小 = 目标 QPS × 下游 P99 延迟 |
| ② Fan-out 怎么分发数据? | 第 4.1:多个 goroutine 竞争同一个 <-chan——Go channel 天然支持多消费者 |
| ③ Fan-in 怎么合并输出? | 第 4.2:每个输出 channel 一个转发 goroutine + WaitGroup + 最后 close |
| ④ Worker Pool 和 Fan-out 的区别? | 第 5 章:Worker Pool 是持久的、有边界的池;Fan-out 是一次性的并行分发 |
| ⑤ Or-Done 怎么优雅退出? | 第 6.1:双重 select——接收和发送时都检查 done signal |
| ⑥ 为什么 Tee 不能共享 channel? | 第 6.3:channel 的 recv 是互斥的——共享 channel 是 Fan-out(负载均衡),不是 Tee(广播) |
| ⑦ Bridge 怎么把动态数据源变稳定? | 第 7 章:<-chan (<-chan T) → Bridge → <-chan T——抽象掉数据源变化 |
案例完整根因链条:
ES 写入 P99 从 50ms 涨到 500ms → ch3 积压
→ enrich 的 `ch3 <- log` 阻塞 → ch2 积压
→ deduplicate 的 `ch2 <- log` 阻塞 → ch1 积压
→ consumeKafka 的 `ch1 <- log` 阻塞 → 消费者心跳超时 → rebalancing
→ 与此同时 enrich 只有一个 goroutine → 5ms × 5000 QPS = 25s/s > 1s → 串行瓶颈
根因 1: 全部用无缓冲/小缓冲 channel → 无缓冲吸收能力 → 反压瞬间传导到最上游
根因 2: enrich 串行 → 无 Fan-out → 瓶颈
根因 3: 无 ctx 取消传播 → ES 恢复后 goroutine 也感知不到
2
3
4
5
6
7
8
9
修复方案总结:
// ✅ 完整修复:Pipeline + Fan-out + Or-Done + 大缓冲
func fixedPipeline(ctx context.Context) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
// channel 缓冲区:QPS × P99延迟 = 5000 × 0.5s ≈ 2500 → 用 3000
rawCh := make(chan RawLog, 1000) // Kafka → 去重
dedupedCh := make(chan RawLog, 2000) // 去重 → 富化
enrichedCh := make(chan EnrichedLog, 3000) // 富化 → ES
// Stage 1: Kafka 消费
go consumeKafka(ctx, rawCh)
// Stage 2: 去重
go deduplicate(ctx, rawCh, dedupedCh)
// Stage 3: 富化——Fan-out 25 个 goroutine
go fanOutEnrich(ctx, dedupedCh, enrichedCh, 25)
// Stage 4: ES 写入(主 goroutine)
writeToES(ctx, enrichedCh)
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# 10.2 一次 Pipeline 数据的完整旅程
Kafka 消息到达
─────────────────────────────────────────────────────────
│
├─ Stage 1: consumeKafka
│ kafkaConsumer.Message() → parseRawLog
│ → select { rawCh <- log; <-ctx.Done() }
│ → rawCh 成功写入 → 去重 goroutine 接收
│
├─ Stage 2: deduplicate
│ for log := range rawCh:
│ 检查 Fingerprint → 未见过 → dedupedCh <- log
│
├─ Stage 3: fanOutEnrich (25 workers)
│ worker-7 从 dedupedCh 拿到 log:
│ enrichLog(log) → lookupGeo → 5ms
│ → select { enrichedCh <- result; <-ctx.Done() }
│ → enrichedCh 写入成功
│
│ 如果此时 ctx 被取消(外部超时):
│ worker-7 的 select 走 ctx.Done() 分支 → return
│ → wg.Done() → 当所有 worker 都 return → close(enrichedCh)
│
├─ Stage 4: writeToES
│ for enriched := range enrichedCh:
│ esClient.Index(enriched) → 50ms (正常)
│ → 如果 ES 慢到 500ms → enrichedCh 缓冲区 3000 吸收
│ → 如果 3000 也满了 → enriched 的 `enrichedCh <- result` 阻塞
│ → 反压传递到 enrich workers → dedupedCh → rawCh → Kafka
│
└─ 退出:
ctx cancel → 所有 stage 收到 ctx.Done() → return
→ Kafka consumer commit offset(保证 at-least-once)
→ ES 写入最后一批数据 → 程序退出
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# 10.3 设计哲学回扣
哲学 1:用 channel 缓冲区做"蓄水池"——在反压链中留出缓冲空间
Pipeline 的每个 stage 之间是一个 channel。如果这个 channel 是无缓冲的——上下游完全同步——下游一慢,上游立刻卡住。加上缓冲 = 上下游之间多了一个"蓄水池"——下游短暂变慢时,上游可以继续生产(把数据放入缓冲区)。缓冲区的大小 = QPS × 下游 P99 延迟的缓冲时间。但缓冲不能无限大——它是"时间换空间":给下游一个恢复窗口,而不是掩盖问题。
哲学 2:Fan-out 的"并行瓶颈消除"——是 Pipeline 的自然延伸
Pipeline 让数据流变成"线性的"——但当某个 stage 成为瓶颈,线性就变成了瓶颈。Fan-out 是 Pipeline 的自然延伸——在瓶颈 stage 开多个 goroutine 并行处理——把一个串行节点变成并行节点。Fan-out 不是"设计模式"——是 Go 的 channel 天然支持多消费者——只需要多启动几个 goroutine 就够了。
哲学 3:Or-Done 是 goroutine 泄漏防护的"最后一层安全网"
第 15 篇讲了 goroutine 泄漏的五种成因——Or-Done 是它们的统一解。任何 <-ch 都可能永远阻塞——Or-Done 给每个阻塞操作配上退出路径:select { case v := <-ch: case <-done: return }。这不是"功能需求"——是"安全需求":每个 goroutine 都必须有退出路径——Or-Done 保证了这个约束。
哲学 4:模式是 channel + goroutine + select 的"组合公式"——不是封装,是约定
Go 的并发模式不像 Java 的 Design Patterns——没有抽象类、没有接口继承。它们就是几个 goroutine + 几个 channel + 几个 select 的组合——模式只是"我们约定这么写,因为这样最不容易出错"。Tee 就是"用 goroutine 把数据复制到两个 channel",Fan-out 就是"多个 goroutine 读同一个 channel",Pipeline 就是"每个函数返回 <-chan T"。约定优于配置,模式是社区共识的结晶。
# 10.4 速查表
七种模式速查:
| 模式 | 核心结构 | 用途 | 关键约定 |
|---|---|---|---|
| Pipeline | stage(in) -> out chan | 数据依次处理 | 每个 stage defer close(out) |
| Fan-out | 多 goroutine 竞争 <-ch | 并行处理瓶颈 | goroutine 数量 = 目标QPS × 单任务耗时 |
| Fan-in | 多 <-ch + WaitGroup + 转发 | 合并多个输出 | WaitGroup 等所有转发 goroutine 结束后 close |
| Worker Pool | task channel + 固定 goroutine | 持续任务处理 | Shutdown 时 close(taskCh) + close(done) |
| Or-Done | select { case v:=<-ch: case <-done: } | 任何 channel 加退出信号 | 接收和发送都要检查 done |
| Tee | goroutine 把数据发到两个 out channel | 数据多播 | nil channel 技巧依次发送 |
| Bridge | <-chan (<-chan T) → <-chan T | 动态数据源扁平化 | 遍历内层 channel 直到关闭 |
模式组合决策树:
| 场景 | 推荐组合 |
|---|---|
| 数据批量处理,某阶段慢 | Pipeline + Fan-out |
| 数据持续到达,固定并发 | Worker Pool |
| 需要多播数据 | Pipeline 后接 Tee |
| 需要动态订阅者 | Pipeline 后接 Pub/Sub |
| 需要动态数据源 | Bridge → Pipeline |
| 需要优雅退出 | 所有模式 + Or-Done |
| 需要错误传播 + 快速失败 | 所有模式 + errgroup.WithContext |
| 需要加权并发控制 | Worker Pool + semaphore.Weighted |
Channel 缓冲区大小公式:
缓冲区大小 = 目标 QPS × 下游 P99 延迟 × 缓冲系数
缓冲系数:
- 批处理场景:1~3(允许几秒的缓冲)
- 实时处理场景:0.1~1(只缓冲几百毫秒)
- 外部 API 调用:5~10(API 抖动通常 5-10 秒恢复)
2
3
4
5
6
下一篇:我们已经掌握了 Pipeline、Fan-out/Fan-in、Or-Done 等七种并发模式,下一步进入 17.GC三色标记与屏障——从 Go 1.3 STW 到 Go 1.8 混合屏障,把 GC 的演进史和三色标记的每一个细节剖开。