通道channel
# 第 13 章 通道 channel
"Don't communicate by sharing memory; share memory by communicating." — Rob Pike 关键词:有缓冲 / 无缓冲 channel、
select、close语义、nil channel 妙用、扇入扇出、信号量模式、限流器
# 目录介绍
- 13.1 本章学习目标
- 13.2 channel 的三种形态
- 13.3 send / receive 的阻塞规则
- 13.4
close(ch)语义 - 13.5
select多路复用 - 13.6 经典并发模式
- 13.7 综合示例:实现一个限流器
- 13.8 本章底层原理(简介)
- 13.9 Go 新手陷阱 Top 5
- 13.10 思考题
- 13.11 训练题
- 13.12 推荐阅读
# 13.1 本章学习目标
学完本章你应当能够:
- ✅ 能解释有缓冲 / 无缓冲 channel 的区别,说出各自适用场景
- ✅ 能写出正确的 send / receive 代码,知道什么情况会阻塞、什么情况会 panic
- ✅ 能说出
close(ch)的 4 条语义规则,写出安全的关闭逻辑 - ✅ 能用
select + nil channel做"动态启用/禁用分支" - ✅ 能写出扇入扇出 / worker pool / 信号量三种并发模式
- ✅ 能用 channel 实现一个限流器
- ✅ 能画出
for range ch和select的控制流图
本章是 Go 并发的核心。第 12 章让你启动 goroutine,本章让你让 goroutine 之间通信。
# 13.2 channel 的三种形态
channel 是 goroutine 之间传递数据的类型安全管道——一端发送、一端接收。
# 13.2.1 无缓冲 make(chan T)
无缓冲 channel 没有容量——发送方和接收方必须同时就绪:
ch := make(chan int) // 无缓冲 channel——容量 0
// 发送方 goroutine
go func() {
ch <- 42 // 阻塞——直到有接收方就绪
fmt.Println("发送完成")
}()
// 接收方 goroutine
value := <-ch // 阻塞——直到有发送方就绪
fmt.Println("收到:", value)
// 输出:收到: 42
// 发送完成
2
3
4
5
6
7
8
9
10
11
12
13
执行时序:
时间 →
发送方: ch <- 42 ──── 阻塞等待 ──── 解除阻塞 ──── println
接收方: <-ch ────── 收到 42 ──── println
↑ 两端"握手"成功的瞬间
2
3
4
核心特性:无缓冲 channel 让发送和接收同步——它是 goroutine 之间的"握手"机制,而不是"数据队列"。
# 13.2.2 有缓冲 make(chan T, N)
有缓冲 channel 有容量——可以存入 N 个元素后再阻塞:
ch := make(chan int, 3) // 有缓冲 channel——容量 3
ch <- 1 // 不阻塞(缓冲 1/3)
ch <- 2 // 不阻塞(缓冲 2/3)
ch <- 3 // 不阻塞(缓冲 3/3——满了)
// ch <- 4 // 阻塞!——缓冲已满,等待接收方腾出空间
fmt.Println(<-ch) // 1(缓冲 2/3——有空间了)
ch <- 4 // 不阻塞(缓冲 3/3)
2
3
4
5
6
7
8
9
10
有缓冲 vs 无缓冲:
无缓冲 make(chan T) | 有缓冲 make(chan T, N) | |
|---|---|---|
| 容量 | 0 | N |
| 发送行为 | 必须等待接收方 | 缓冲未满时不阻塞 |
| 接收行为 | 必须等待发送方 | 缓冲非空时不阻塞 |
| 典型场景 | 同步通知、结果传递 | 生产者-消费者解耦、队列 |
| 类比 | 握手 | 邮箱 |
# 13.2.3 单向 channel chan<- T / <-chan T
Go 允许把双向 channel 转换为单向 channel——编译期约束:
// 双向 channel——能做任何操作
ch := make(chan int)
// ① 只发送 channel:chan<- T
func producer(out chan<- int) {
out <- 42 // ✅ 发送——合法
// <-out // ❌ 接收——编译错
}
// ② 只接收 channel:<-chan T
func consumer(in <-chan int) {
value := <-in // ✅ 接收——合法
// in <- 42 // ❌ 发送——编译错
fmt.Println(value)
}
// ③ 调用时——双向 channel 自动转换为单向
producer(ch) // chan int → chan<- int ✅
consumer(ch) // chan int → <-chan int ✅
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
为什么需要单向 channel?
- 语义清晰:函数签名中
chan<- T明确告诉调用者"这个函数只会往 channel 发数据" - 编译期安全:不会出现"不小心在消费者里发了数据"的 bug
- 和
close的配合:close(ch)需要双向 channel(防止只读方错误关闭)
# 13.2.4 综合案例与思考
综合案例:三种形态完整演示
package main
import (
"fmt"
"time"
)
// 无缓冲:同步通知
func unbufferedDemo() {
ch := make(chan string)
go func() {
time.Sleep(100 * time.Millisecond)
ch <- "goroutine 完成" // 发送方等待接收方
}()
fmt.Println("等待 goroutine...")
result := <-ch // 主 goroutine 等待
fmt.Println("收到:", result)
}
// 有缓冲:生产者-消费者
func bufferedDemo() {
jobs := make(chan int, 5) // 缓冲 5 个任务
// 生产者——不会因为消费者慢而阻塞(缓冲空闲时)
go func() {
for i := 1; i <= 10; i++ {
jobs <- i
fmt.Printf("生产: %d\n", i)
}
close(jobs)
}()
// 消费者——按自己的速度处理
for job := range jobs {
time.Sleep(50 * time.Millisecond)
fmt.Printf(" 消费: %d\n", job)
}
}
// 单向 channel:职责分离
func generate(out chan<- int) { // 只能发送
for i := 1; i <= 3; i++ {
out <- i
}
close(out)
}
func square(in <-chan int, out chan<- int) { // 只接收 + 只发送
for v := range in {
out <- v * v
}
close(out)
}
func print(in <-chan int) { // 只能接收
for v := range in {
fmt.Println(v)
}
}
func main() {
fmt.Println("=== 无缓冲 ===")
unbufferedDemo()
fmt.Println("\n=== 有缓冲 ===")
bufferedDemo()
fmt.Println("\n=== 单向 ===")
nums := make(chan int)
squares := make(chan int)
go generate(nums)
go square(nums, squares)
print(squares)
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
案例知识融合:这个案例演示了 channel 三种形态的典型用法——无缓冲用于 goroutine 间同步通知、有缓冲用于生产者-消费者解耦、单向 channel 在函数签名中约束操作方向。
思考题:
- 如果把无缓冲 channel 示例中的
goroutine去掉,直接在main中ch <- "hello"——会发生什么?为什么? - 有缓冲 channel 的容量选多大?有没有计算公式?太大或太小的后果是什么?
- 单向 channel 只是编译期的约束——底层仍是一个双向 channel。如果你用
interface{}绕过编译检查,会发生什么?
# 13.3 send / receive 的阻塞规则
# 13.3.1 完整的状态矩阵
channel 的 send 和 receive 在不同状态下有不同的行为——这张表覆盖了所有组合:
| channel 状态 | ch <- v(发送) | v := <-ch(接收) | v, ok := <-ch(接收+检测) |
|---|---|---|---|
| nil | 永远阻塞 | 永远阻塞 | 永远阻塞 |
| 无缓冲,有接收方就绪 | 成功发送 | — | — |
| 无缓冲,无接收方 | 阻塞等待 | — | — |
| 有缓冲,未满 | 成功存入 | — | — |
| 有缓冲,已满 | 阻塞等待 | — | — |
| 有缓冲,非空 | — | 成功取出 | ok=true |
| 有缓冲,为空 | — | 阻塞等待 | 阻塞等待 |
| 已关闭,有未读数据 | panic | 成功取出 | ok=true |
| 已关闭,无未读数据 | panic | 零值 | ok=false |
关键规律:
- nil channel 永远阻塞——这是
select中"禁用分支"的秘诀(见 §13.5.3) - 关闭后发送一定 panic——唯一的发送规则
- 关闭后接收不会阻塞——返回零值或缓冲中剩余的数据
# 13.3.2 综合案例与思考
综合案例:nil channel 的阻塞验证
package main
import "fmt"
func main() {
var nilCh chan int // nil channel
// 验证 1:发送到 nil channel——永久阻塞
go func() {
fmt.Println("发送方 开始")
nilCh <- 1
fmt.Println("这行永远不会执行") // 永远不会
}()
// 验证 2:从 nil channel 接收——永久阻塞
go func() {
fmt.Println("接收方 开始")
<-nilCh
fmt.Println("这行永远不会执行") // 永远不会
}()
// 验证 3:关闭 nil channel——panic
func() {
defer func() {
if r := recover(); r != nil {
fmt.Println("关闭 nil channel:", r) // close of nil channel
}
}()
close(nilCh)
}()
}
// 输出:
// 关闭 nil channel: close of nil channel
// (两个 goroutine 永远阻塞——程序不会退出)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
案例知识融合:这个案例验证了 nil channel 的核心行为——发送和接收都会永久阻塞,关闭会 panic。nil channel 在 select 中极有用——把某个 case 的 channel 设为 nil,这个 case 就永远不会被选中(见 §13.5.3)。
思考题:
- 为什么 Go 设计成"关闭后发送 panic"而不是"关闭后发送什么也不做"?这带来了什么好处?
- nil channel 永远阻塞——这看起来是个"bug",但它在
select中非常有用。说说为什么? - 有缓冲 channel 满了之后发送方阻塞——如果这时消费者 goroutine 也阻塞了——会发生什么?(提示:死锁检测)
# 13.4 close(ch) 语义
# 13.4.1 关闭后再发送 → panic
ch := make(chan int, 2)
ch <- 1
ch <- 2
close(ch)
ch <- 3 // panic: send on closed channel
2
3
4
5
6
保护方法——只有发送方应该关闭 channel。如果多个发送方,用 sync.Once 或协调机制:
var once sync.Once
once.Do(func() { close(ch) })
2
# 13.4.2 关闭后接收 → 零值 + ok=false
关闭后仍可接收缓冲中剩余的数据——消耗完毕后返回零值:
ch := make(chan int, 2)
ch <- 10
ch <- 20
close(ch)
// 缓冲中还有数据——正常接收
fmt.Println(<-ch) // 10
val, ok := <-ch
fmt.Println(val, ok) // 20, true
// 缓冲耗尽——返回零值
val, ok = <-ch
fmt.Println(val, ok) // 0, false
// for range 自动检测关闭
for v := range ch {
fmt.Println(v) // 只打印 10, 20——耗尽后停止
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 13.4.3 重复关闭 → panic
ch := make(chan int)
close(ch)
close(ch) // panic: close of closed channel
2
3
谁该关闭 channel? 黄金法则——发送方关闭:
- 只有一个发送方 → 发送方直接
close - 多个发送方 → 用
sync.Once或通知接收方"我已发完"(接收方不关) - 接收方绝不关闭——收到的 channel 通常是
<-chan T,编译期就不让你关
# 13.4.4 关闭只读 channel → 编译错
func consumer(in <-chan int) {
// close(in) // 编译错:cannot close receive-only channel
}
2
3
# 13.4.5 综合案例与思考
综合案例:安全的关闭模式
package main
import (
"fmt"
"sync"
)
// 模式 1:单发送方——发送方关
func singleSender() {
ch := make(chan int)
// 发送方——唯一的生产者
go func() {
defer close(ch) // ★ 发送方关
for i := 1; i <= 5; i++ {
ch <- i
}
}()
// 接收方——不关,range 自动检测
for v := range ch {
fmt.Println(v)
}
}
// 模式 2:多发送方——sync.Once 确保只关一次
func multiSender() {
ch := make(chan int)
var once sync.Once
// 三个发送方
for i := 0; i < 3; i++ {
go func(id int) {
for j := 0; j < 3; j++ {
ch <- id*10 + j
}
// ★ 每个发送方都尝试关——但只有第一个生效
once.Do(func() { close(ch) })
}(i)
}
for v := range ch {
fmt.Println(v)
}
}
// 模式 3:接收方通过"done channel"通知发送方停止
func doneChannel() {
dataCh := make(chan int)
doneCh := make(chan struct{})
// 发送方——发到 done 信号
go func() {
i := 0
for {
select {
case <-doneCh:
close(dataCh)
return
default:
dataCh <- i
i++
}
}
}()
// 接收方——拿到 5 个后通知停止
count := 0
for v := range dataCh {
fmt.Println(v)
count++
if count >= 5 {
close(doneCh)
}
}
}
func main() {
fmt.Println("=== 单发送方 ===")
singleSender()
fmt.Println("=== 多发送方 ===")
multiSender()
fmt.Println("=== done channel ===")
doneChannel()
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
案例知识融合:这个案例展示了三种安全的 channel 关闭模式——单发送方直接关、多发送方用 sync.Once、以及通过 done channel 让接收方通知发送方停止。核心原则:不要从接收方关闭 channel。
思考题:
- 为什么 Go 设计成"重复关闭 panic"而不是"什么都不做"?这和不检查数组越界有什么异曲同工之处?
for range ch是怎么检测到 channel 关闭的?它内部调用的是v, ok := <-ch吗?
# 13.5 select 多路复用
# 13.5.1 default 分支:非阻塞
select 像"多路开关"——同时监听多个 channel。加上 default 分支实现非阻塞操作:
// 非阻塞发送
select {
case ch <- value:
fmt.Println("发送成功")
default:
fmt.Println("channel 已满——跳过发送")
}
// 非阻塞接收
select {
case v := <-ch:
fmt.Println("收到:", v)
default:
fmt.Println("channel 为空——没有数据")
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 13.5.2 select 随机化
当多个 case 同时就绪时,select 随机选择一个——不是按代码顺序:
ch1 := make(chan string)
ch2 := make(chan string)
go func() {
for i := 0; i < 10; i++ {
ch1 <- "A"
ch2 <- "B"
}
}()
for i := 0; i < 10; i++ {
select {
case v := <-ch1:
fmt.Print(v)
case v := <-ch2:
fmt.Print(v)
}
}
// 输出类似:ABBAABABBA——交错的,不是严格的交替
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
为什么要随机化?
- 防止某个 case 被"饿死"——如果按顺序,第一个 case 永远优先
- 让所有 case 有公平的执行机会
# 13.5.3 nil channel 在 select 中"消失"
当一个 case 的 channel 是 nil——这个 case 永远不会被选中:
var nilCh chan int // nil channel
select {
case <-nilCh: // 永远不会被选中
fmt.Println("这行永远不会执行")
case v := <-dataCh:
fmt.Println(v)
default:
fmt.Println("no data")
}
// 输出:no data(如果 dataCh 也没数据)
2
3
4
5
6
7
8
9
10
11
实战——动态启用/禁用分支:
// 实现"超时重置定时器"——每次收到数据,重新计时
func resetTimer() {
dataCh := make(chan int)
var timerCh <-chan time.Time // nil——初始禁用
go func() {
for i := 0; i < 5; i++ {
time.Sleep(300 * time.Millisecond)
dataCh <- i
}
close(dataCh)
}()
timerCh = time.After(500 * time.Millisecond) // 启用定时器
for {
select {
case v, ok := <-dataCh:
if !ok {
fmt.Println("dataCh 关闭,退出")
return
}
fmt.Println("收到:", v)
// ★ 重置定时器——重新赋值,启用新定时器
timerCh = time.After(500 * time.Millisecond)
case <-timerCh:
fmt.Println("超时!")
return
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
nil channel 的妙用表:
| 需求 | 做法 |
|---|---|
| 禁用某个 case | 把它的 channel 设为 nil |
| 启用禁用的 case | 重新赋值为有效 channel |
| 初次不启用定时器 | var timerCh <-chan time.Time(nil) |
# 13.5.4 综合案例与思考
综合案例:同时监听 3 个 channel——数据、超时、取消
package main
import (
"context"
"fmt"
"time"
)
func monitor(ctx context.Context, dataCh <-chan int) {
// 初始禁用定时器
var timeoutCh <-chan time.Time
for {
select {
case <-ctx.Done():
fmt.Println("取消:", ctx.Err())
return
case v, ok := <-dataCh:
if !ok {
fmt.Println("dataCh 关闭")
return
}
fmt.Println("处理:", v)
// 每次收到数据——重置超时
timeoutCh = time.After(2 * time.Second)
case <-timeoutCh:
if timeoutCh != nil {
fmt.Println("2 秒无数据——超时")
return
}
}
}
}
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
dataCh := make(chan int)
go func() {
for i := 0; i < 3; i++ {
time.Sleep(500 * time.Millisecond)
dataCh <- i
}
// 停止发送——2 秒后超时分支触发
}()
monitor(ctx, dataCh)
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
案例知识融合:这个案例展示了 select 的实战用法——同时监听数据、超时、取消三个 channel。利用 var timeoutCh <-chan time.Time(nil)让定时器初始禁用,收到数据后才启用——完美实现了"可重置超时"。
思考题:
- 如果 select 中多个 case 同时就绪——Go 为什么要随机选择而不是按代码顺序?这和操作系统调度器的公平性有什么关系?
time.After返回的<-chan time.Time在超时前如果不接收——goroutine 会泄漏吗?为什么?- 如果 select 里同时有发送和接收 case——它们之间公平吗?读和写有优先级吗?
# 13.6 经典并发模式
# 13.6.1 扇出 / 扇入
扇出(Fan-out)——一个输入 channel,多个 goroutine 并行消费:
func fanOut(in <-chan int, workers int) []<-chan int {
outs := make([]<-chan int, workers)
for i := 0; i < workers; i++ {
ch := make(chan int)
outs[i] = ch
go func(out chan<- int) {
defer close(out)
for v := range in {
out <- v * v // 每个 worker 做平方
}
}(ch)
}
return outs
}
2
3
4
5
6
7
8
9
10
11
12
13
14
扇入(Fan-in)——多个 channel 合并成一个:
func fanIn(channels ...<-chan int) <-chan int {
out := make(chan int)
var wg sync.WaitGroup
for _, ch := range channels {
wg.Add(1)
go func(c <-chan int) {
defer wg.Done()
for v := range c {
out <- v
}
}(ch)
}
// 等所有输入 channel 关闭后——关闭输出
go func() {
wg.Wait()
close(out)
}()
return out
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
扇出 + 扇入组合——数据并行处理管道:
func main() {
in := make(chan int)
go func() {
for i := 1; i <= 10; i++ {
in <- i
}
close(in)
}()
// 扇出——3 个 worker
workers := fanOut(in, 3)
// 扇入——合并结果
out := fanIn(workers...)
for v := range out {
fmt.Println(v)
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 13.6.2 worker pool
固定数量的 goroutine 从 channel 中取任务——控制并发度:
func workerPool(jobs <-chan int, results chan<- int, workers int) {
var wg sync.WaitGroup
for i := 0; i < workers; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
for job := range jobs {
fmt.Printf("worker %d 处理 %d\n", id, job)
time.Sleep(100 * time.Millisecond) // 模拟工作
results <- job * 2
}
}(i)
}
// 等所有 worker 完成——关闭结果 channel
go func() {
wg.Wait()
close(results)
}()
}
func main() {
jobs := make(chan int, 10)
results := make(chan int, 10)
workerPool(jobs, results, 3)
// 发送任务
go func() {
for i := 1; i <= 10; i++ {
jobs <- i
}
close(jobs)
}()
// 接收结果
for r := range results {
fmt.Println("结果:", r)
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# 13.6.3 done channel 取消
用 channel 广播"停止"信号——所有 goroutine 同时感知:
func worker(done <-chan struct{}, jobs <-chan int) <-chan int {
results := make(chan int)
go func() {
defer close(results)
for {
select {
case <-done: // ★ 收到取消信号——立即退出
fmt.Println("worker 收到取消")
return
case job, ok := <-jobs:
if !ok {
return // jobs 关闭——正常退出
}
results <- job * 2
}
}
}()
return results
}
// 使用
done := make(chan struct{})
results := worker(done, jobs)
// 需要取消时——关闭 done channel
close(done) // 所有 worker 的 <-done 立刻返回
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
context 替代 done channel:Go 1.7+ 推荐用 context 代替裸 done chan struct{}——功能更强(超时、deadline、值传播):
func workerV2(ctx context.Context, jobs <-chan int) <-chan int {
results := make(chan int)
go func() {
defer close(results)
for {
select {
case <-ctx.Done(): // ← 和 done channel 一样的效果
return
case job, ok := <-jobs:
if !ok { return }
results <- job * 2
}
}
}()
return results
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 13.6.4 信号量模式
用有缓冲 channel 做信号量——控制最大并发数:
// 信号量:容量 3 = 最多 3 个并发
sem := make(chan struct{}, 3)
for i := 0; i < 10; i++ {
sem <- struct{}{} // 获取信号量——如果满则阻塞
go func(id int) {
defer func() { <-sem }() // 释放信号量
fmt.Printf("任务 %d 开始\n", id)
time.Sleep(time.Second)
fmt.Printf("任务 %d 完成\n", id)
}(i)
}
// 任何时候最多 3 个 goroutine 在执行
2
3
4
5
6
7
8
9
10
11
12
13
有缓冲 channel 作为信号量的精妙:
sem <- struct{}{}→ 如果缓冲满(3/3),阻塞——等待<-sem→ 释放一个槽——下一个等待者可以进入- 零内存开销——
struct{}不占空间
# 13.6.5 综合案例与思考
综合案例:流水线——生成 → 过滤 → 平方 → 输出
package main
import "fmt"
// 阶段 1:生成数字
func gen(nums ...int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for _, n := range nums {
out <- n
}
}()
return out
}
// 阶段 2:过滤偶数
func even(in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
if n%2 == 0 {
out <- n
}
}
}()
return out
}
// 阶段 3:平方
func square(in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
out <- n * n
}
}()
return out
}
func main() {
// 管道串联:gen → even → square
for v := range square(even(gen(1, 2, 3, 4, 5, 6, 7, 8))) {
fmt.Println(v)
}
// 输出:4, 16, 36, 64(偶数的平方)
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
案例知识融合:这个案例展示了 Go 的 pipeline 模式——每个阶段都是"接收一个 channel、返回一个 channel"。阶段之间用 goroutine 连接——天然并发。整个管道是"声明式"的:square(even(gen(...))) 读起来像数据流。
思考题:
- 这个 pipeline 中,每个阶段都有独立的 goroutine——如果数据量非常大,goroutine 会不会太多?如何优化?
- 扇出模式中,多个 worker 共用一个输入 channel——这是并发安全的吗?为什么不需要锁?
- 信号量用有缓冲 channel 实现——和
sync.WaitGroup+ worker pool 有什么区别?各自适合什么场景?
# 13.7 综合示例:实现一个限流器
用 channel 实现令牌桶限流器——控制每秒请求数:
package main
import (
"fmt"
"time"
)
type RateLimiter struct {
tokens chan struct{} // 令牌
ticker *time.Ticker // 定时放令牌
done chan struct{} // 停止信号
}
// 创建限流器——每秒 rate 个请求
func NewRateLimiter(rate int) *RateLimiter {
rl := &RateLimiter{
tokens: make(chan struct{}, rate), // 桶容量 = 速率
ticker: time.NewTicker(time.Second / time.Duration(rate)),
done: make(chan struct{}),
}
// 后台 goroutine——定期补充令牌
go rl.refill()
return rl
}
func (rl *RateLimiter) refill() {
// 初始填满
for i := 0; i < cap(rl.tokens); i++ {
rl.tokens <- struct{}{}
}
for {
select {
case <-rl.ticker.C:
// 尝试补充一个令牌——桶满了就跳过
select {
case rl.tokens <- struct{}{}:
default:
}
case <-rl.done:
rl.ticker.Stop()
return
}
}
}
// 等待获取一个令牌——阻塞直到有令牌可用
func (rl *RateLimiter) Wait() {
<-rl.tokens
}
func (rl *RateLimiter) Stop() {
close(rl.done)
}
func main() {
limiter := NewRateLimiter(5) // 每秒 5 个请求
defer limiter.Stop()
// 模拟 10 个请求——每秒只放行 5 个
for i := 1; i <= 10; i++ {
limiter.Wait() // 获取令牌(阻塞)
fmt.Printf("[%s] 请求 %d 通过\n",
time.Now().Format("15:04:05.000"), i)
}
}
// 输出(间隔约 200ms):
// [10:00:00.000] 请求 1 通过
// [10:00:00.000] 请求 2 通过
// [10:00:00.000] 请求 3 通过
// [10:00:00.000] 请求 4 通过
// [10:00:00.000] 请求 5 通过
// [10:00:00.200] 请求 6 通过
// ...
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
限流器设计要点:
- 有缓冲 channel 做令牌桶——容量 = 速率,满了跳过补充(default 分支)
time.Ticker定期放令牌——精度取决于 ticker 的间隔Wait()方法阻塞获取——调用方不需要关心什么时候就绪- stop 机制——通过 done channel 优雅关闭
# 13.8 本章底层原理(简介)
本章涉及的底层机制在**卷三"专栏博客"**中详细展开:
| 本章概念 | 卷三对应篇 |
|---|---|
make(chan T) 创建的 hchan 结构 | 09.通道 channel 源码剖析 |
| 环形缓冲区、sendq/recvq 等待队列 | 09.通道 channel 源码剖析 |
select 的随机化实现(runtime.selectgo) | 09.通道 channel 源码剖析 |
| channel 与 GMP 调度器的交互 | 08.GMP 协程调度器机制 |
close(ch) 的底层状态变更 | 09.通道 channel 源码剖析 |
hchan 结构速览(每个 make(chan T) 创建的就是这个):
hchan {
qcount uint // 当前队列中的元素数
dataqsiz uint // 环形缓冲区大小(make(chan T, N) 中的 N)
buf unsafe.Pointer // 环形缓冲区指针
elemsize uint16 // 每个元素大小
closed uint32 // 是否已关闭
sendx uint // 发送索引
recvx uint // 接收索引
recvq waitq // 接收等待队列(sudog 链表)
sendq waitq // 发送等待队列(sudog 链表)
lock mutex // 互斥锁
}
2
3
4
5
6
7
8
9
10
11
12
卷三第 09 篇会拆解到每个字段、每个
send/recv/close操作的源码级流程。
# 13.9 Go 新手陷阱 Top 5
| # | 陷阱 | 说明 |
|---|---|---|
| 1 | 关闭一个有发送者还在写的 channel → panic | 发送方必须知道"什么时候没人发了"。多发送方用 sync.Once 或协调机制。 |
| 2 | 多个 sender,谁关 channel? | 答案:通常由 sender 关,但要协调——用 sync.Once 或引入"协调者" goroutine。 |
| 3 | 用 channel 当 mutex(仅一个槽位)→ 性能不如 sync.Mutex | ch <- struct{}{}; ...; <-ch 模拟锁——channel 内部有锁和调度开销,不如 sync.Mutex。 |
| 4 | for v := range ch 期望某条件后跳出 → 必须 close(ch) 或 break | range 只有 close 才退出。条件跳出用 select + ctx.Done() 或 break。 |
| 5 | 非缓冲 channel 在同一 goroutine ch <- 1; <-ch → 死锁 | 发送方等接收方,接收方等发送方——但都在同一个 goroutine → deadlock。 |
# 13.10 思考题
channel vs mutex:Go 的谚语是"用通信共享内存"。但 channel 底层也是锁+队列。在哪些场景下用 channel 更好?哪些场景下用 mutex 更好?给出 3 条判断标准。
无缓冲 channel 的同步开销:每次
ch <- v和<-ch都要做 goroutine 上下文切换——这个开销有多大?在什么情况下"用 channel 比用锁慢"?select 的随机化实现:Go 保证
select的 case 是均匀随机的——但它是真随机还是伪随机?如果某个 case 的数据量是另一个的 100 倍——随机化会不会导致那个 case 被"饿死"?关闭 channel 的设计选择:Go 设计成"关闭后发送 panic"。为什么不设计成"静默忽略"?如果让你设计,你会怎么选?
有缓冲 channel 的大小选择:
make(chan T, 0)vsmake(chan T, 1)vsmake(chan T, 100)——这三种容量在语义上有什么不同?什么时候该选 1,什么时候该选 100?
# 13.11 训练题
训练 1:实现一个"超时重试"机制——用 channel + select 组合:
- 启动一个 goroutine 执行耗时的
doWork() - 主 goroutine 等待结果,最多等 3 秒
- 3 秒内完成 → 打印结果
- 3 秒超时 → 打印"超时"并退出
- 使用
select+time.After
训练 2:实现一个"广播器"——一个生产者,多个消费者:
- 生产者往 channel 发 10 个数字
- 3 个消费者 goroutine 并行消费——每个数字只被消费一次(不是广播)
- 用扇出模式实现
训练 3:实现"优雅关闭"——收到 SIGINT(Ctrl+C)后:
- 不再接受新任务
- 等正在执行的任务完成
- 关闭所有 channel
- 退出程序
- 使用
os.Signalchannel +context.WithCancel
# 13.12 推荐阅读
- 入门卷:第 12 章 并发 goroutine——goroutine 起步、WaitGroup、context
- 入门卷:第 14 章 sync 包——Mutex、RWMutex、Once、Pool
- 卷三:09.通道 channel 源码剖析——hchan 结构、send/recv/close 源码级拆解
- 卷三:08.GMP 协程调度器机制——goroutine 如何被调度
- 卷三:16.并发设计模式详解——Pipeline、Fan-out/Fan-in 源码级讲解
- Go Concurrency Patterns - Rob Pike (opens new window)——经典演讲
- Go Channels: An Illustrated Guide (opens new window)——图文并茂