通道channel源码剖析
# 09.通道channel源码剖析
卷三第九篇——channel 是 Go 并发的「灵魂」。它的底层是一个环形缓冲区 + 两个等待队列的结构体
hchan,由一把mutex保护所有操作。ch <- x不是魔法——是 lock → 找到匹配的接收方或空槽 → 直接内存拷贝 → unlock。读完本篇,你能解释:为什么无缓冲 channel 是「同步」的?为什么close(ch)后还能读到数据?为什么select的 case 顺序是随机的?关键词:hchan、sudog、环形缓冲区、sendq/recvq、select 随机化、goroutine 泄漏。
# 目录介绍
- 1. 案例引入
- 2. 架构概览
- 3. hchan 结构体精解
- 4. sudog:等待队列节点
- 5. send 的实现
- 6. recv 的实现
- 7. close 的实现与语义保证
- 8. select 的实现
- 9. channel 泄漏与最佳实践
- 10. 综合案例串讲
# 1. 案例引入
# 1.1 一段崩在哪
看一个订单处理服务——它用 channel 构建了「接收→校验→入库→通知」的四级流水线,每个阶段由独立的 goroutine 池并行处理:
package main
import (
"fmt"
"time"
)
type Order struct {
ID int
Status string
}
// 阶段 1:接收订单
func stageReceive(out chan<- Order) {
for i := 1; i <= 100; i++ {
out <- Order{ID: i, Status: "new"}
}
close(out) // 上游完毕,关闭通道
}
// 阶段 2:校验订单(可能失败)
func stageValidate(in <-chan Order, out chan<- Order, errCh chan<- error) {
for order := range in {
if order.ID%10 == 0 {
errCh <- fmt.Errorf("order %d validation failed", order.ID)
continue // ← BUG:跳过了写入 out
}
order.Status = "validated"
out <- order
}
close(out)
}
// 阶段 3:入库
func stagePersist(in <-chan Order) {
for order := range in {
time.Sleep(10 * time.Millisecond) // 模拟入库延迟
fmt.Printf("persisted: %d\n", order.ID)
}
}
// 错误收集 goroutine
func errorCollector(errCh <-chan error) {
for err := range errCh {
fmt.Printf("ERROR: %v\n", err)
}
}
func main() {
ch1 := make(chan Order, 10)
ch2 := make(chan Order, 10)
errCh := make(chan error, 5)
go stageReceive(ch1)
go stageValidate(ch1, ch2, errCh)
go stagePersist(ch2)
go errorCollector(errCh)
time.Sleep(2 * time.Second)
fmt.Println("main exiting")
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
现象:部署到 K8s 后,服务正常运行 2 秒——main 退出,但进程不退出!pprof goroutine 显示:
goroutine 6 [chan receive]:
main.stagePersist(0xc0000240c0)
/app/main.go:30 +0x8f
goroutine 7 [chan receive]:
main.errorCollector(0xc000024100)
/app/main.go:37 +0x7a
2
3
4
5
6
7
stagePersist 和 errorCollector 永远卡在 for range 上——因为 ch2 和 errCh 没有被关闭。但这只是表象——更深层的问题是:stageValidate 中 errCh <- err 之后 continue 跳过了写入 ch2——如果 errCh 的缓冲区满(5 个),stageValidate 会阻塞在向 errCh 发送,而它又持有对 out 的引用——但 out <- order 没有执行。
stageValidate 卡住了 → ch2 不会再有关闭 → stagePersist 永远等待 → goroutine 泄漏。
# 1.2 顺藤摸到根因
逐步排查:
假设 1:是不是 channel 没关闭导致 for range 不退出?——是的,但关闭 channel 的责任方没执行完。stageValidate 需要先执行完自己的循环,才能执行 close(out)。但它卡在了 errCh <- err 上。
假设 2:errCh 缓冲区 5 个——为什么满了?因为 errorCollector 在打印,而校验阶段每 10 个订单就有一个失败——前 5 个错误填满了 errCh,第 6 个错误到来时 stageValidate 阻塞。
假设 3:为什么不用 select 做非阻塞发送?——如果 errCh <- err 改为:
select {
case errCh <- err:
default:
// 丢弃或记录到日志
}
2
3
4
5
就不会阻塞 stageValidate,流水线不会卡死。
根因链条:
errCh满 →stageValidate阻塞在errCh <- errstageValidate没有继续执行out <- order和close(out)ch2不会被关闭 →stagePersist永远range ch2→ goroutine 泄漏errCh不会被关闭 →errorCollector永远range errCh→ goroutine 泄漏
# 1.3 我们要回答什么
这个事故藏着至少 8 个原理点:
① channel 在内存中的结构体 hchan 长什么样?环形缓冲区怎么实现? → 第 2-3 章
② sudog 是什么?goroutine 怎么挂到 channel 的等待队列上的? → 第 4 章
③ ch <- x 的发送流程——快速路径和慢路径分别做了什么? → 第 5 章
④ <-ch 的接收流程——有等待的 sender 时为什么直接拿走? → 第 6 章
⑤ close(ch) 做了什么?关闭后哪些操作是安全的? → 第 7 章
⑥ select 的随机化 case 是怎么实现的?为什么不是轮流? → 第 8 章
⑦ 为什么 channel 会泄漏 goroutine?三种泄漏模式是什么? → 第 9 章
⑧ 无缓冲和有缓冲 channel 的性能差异有多大?什么时候用哪种? → 第 9.3
2
3
4
5
6
7
8
本篇路线:
架构总图 (第 2 章) ── hchan 全景 + "为什么 channel 是一等公民"
↓
hchan 精解 (第 3 章) ── 逐字段 + 环形缓冲区实现
↓
sudog (第 4 章) ── 等待队列节点的 G 挂起与唤醒
↓
send/recv (第 5-6 章) ── 快速路径 vs 慢路径的全源码
↓
close (第 7 章) ── 关闭语义的精确保证
↓
select (第 8 章) ── 随机化 + 锁顺序 + 完整流程
↓
泄漏与最佳实践 (第 9 章) ── 三种泄漏模式 + 谁创建谁关闭
↓
综合案例 (第 10 章) ── 完整复原 + 修复 + 设计哲学
2
3
4
5
6
7
8
9
10
11
12
13
14
15
📌 本篇定位:这是 Go「并发编程」主题的第一篇。channel 是 Go CSP 并发模型的核心——goroutine 之间不通过共享内存通信,而是通过 channel 通信来共享内存。本篇把
hchan的每个字段、send/recv 的每个分支、select 的每个 case 都拆到字节级别。
# 2. 架构概览
# 2.1 hchan 结构全景
hchan(runtime/chan.go)
┌──────────────────────────────────────────────────────────────────┐
│ qcount uint ← 当前缓冲区中的元素个数 │
│ dataqsiz uint ← 缓冲区总容量(make(chan T, N) 的 N) │
│ buf unsafe.Pointer ← 指向环形缓冲区数组 │
│ elemsize uint16 ← 每个元素的大小 │
│ closed uint32 ← 是否已关闭(0=开放, 1=关闭) │
│ elemtype *_type ← 元素类型(用于 GC 扫描) │
│ sendx uint ← 环形缓冲区中下一个写入位置索引 │
│ recvx uint ← 环形缓冲区中下一个读取位置索引 │
│ recvq waitq ← 接收等待队列(<-ch 阻塞的 G) │
│ sendq waitq ← 发送等待队列(ch<- 阻塞的 G) │
│ lock mutex ← 保护所有字段的互斥锁 │
└──────────────────────────────────────────────────────────────────┘
│
┌────────────┴────────────┐
▼ ▼
recvq (waitq) sendq (waitq)
┌──────┬──────┐ ┌──────┬──────┐
│sudog │sudog │ │sudog │sudog │
│ G1 │ G2 │ │ G3 │ G4 │
└──────┴──────┘ └──────┴──────┘
等待接收的 goroutine 等待发送的 goroutine
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
核心关系:
- 环形缓冲区
buf:长度为dataqsiz,以sendx为写指针、recvx为读指针循环 - 接收等待队列
recvq:当缓冲区空时,<-ch的 goroutine 挂在这 - 发送等待队列
sendq:当缓冲区满时,ch <-的 goroutine 挂在这 - 无缓冲 channel(
dataqsiz=0):没有buf,send 直接对接 recv
# 2.2 为什么 Go channel 是一等公民
疑惑:Java 的 BlockingQueue 是库实现的,C++ 的 std::queue 需要外部同步。Go 为什么把 channel 做成语言内置的?
论证:
CSP 模型的原生支持——Go 的并发哲学来自 Hoare 的 CSP(Communicating Sequential Processes)。核心思想是"不要通过共享内存通信,而要通过通信共享内存"。这需要通信原语是语言的一部分——不是库。
select的语言级支持——C 的select是系统调用(多路复用 I/O),Java 没有语言级的 select for channels。Go 的select能同时等待多个 channel,且 case 顺序随机(公平调度)——这需要编译器生成专用的 runtime 调用。for range的 channel 语义——for v := range ch持续读取直到 channel 关闭——这是语言特性,非库能做到。库实现的队列可以用迭代器模拟,但无法和close语义无缝集成。垃圾回收集成——channel 内部有环形缓冲区
buf,存储的元素指针需要被 GC 扫描。如果 channel 是库实现的,需要程序员手动管理生命周期。Go 的 channel 由 runtime 管理,GC 自动扫描buf中的指针。反向验证——Java 的
BlockingQueue作为库实现,无法做到select级别的多 channel 等待(需要用LinkedBlockingQueue+ 复杂的多线程协调)。Go 的 channel 因为编译器和 runtime 的深度集成,select可以同时等待任意数量的 channel。
结论:channel 不是"语法糖",是 Go 并发模型的基础设施。把它放在语言层,不是 C 风格"最小内核"的选择,而是 "CSP 需要语言级原语" 的自觉。
# 3. hchan 结构体精解
# 3.1 逐字段解读
// runtime/chan.go
type hchan struct {
qcount uint // ① 缓冲区中当前元素个数
dataqsiz uint // ② 缓冲区总容量
buf unsafe.Pointer // ③ 指向环形缓冲区数组
elemsize uint16 // ④ 每个元素的大小
closed uint32 // ⑤ 是否关闭(0=开放,1=关闭)
elemtype *_type // ⑥ 元素类型(用于 GC 和类型安全)
sendx uint // ⑦ 环形缓冲区写指针索引
recvx uint // ⑧ 环形缓冲区读指针索引
recvq waitq // ⑨ 接收等待队列链表
sendq waitq // ⑩ 发送等待队列链表
lock mutex // ⑪ 互斥锁
}
2
3
4
5
6
7
8
9
10
11
12
13
14
① qcount uint——当前在缓冲区中等待被读取的元素个数。len(ch) 返回的就是这个字段。无缓冲 channel 中始终为 0。
② dataqsiz uint——环形缓冲区的槽位数。make(chan T, 5) 时 dataqsiz=5。无缓冲 channel 中 dataqsiz=0,此时 buf=nil。
③ buf unsafe.Pointer——指向堆上分配的环形数组。大小为 dataqsiz × elemsize 字节。默认分配在堆上。无缓冲 channel 中为 nil。
④ elemsize uint16——单个元素的字节大小。编译器在 make(chan T, N) 时根据 T 的类型自动填充。
⑤ closed uint32——0 表示 channel 开放,1 表示已关闭。close(ch) 将这个字段原子设置为 1,之后的所有发送操作都会 panic。
⑥ elemtype *_type——指向元素类型的 runtime._type 结构体。用于 GC 扫描(通道缓冲区中的指针需要被 GC 追踪)和方法调用。
⑦ sendx uint——环形缓冲区的写入索引。每次 qcount < dataqsiz 时写入 buf[sendx],然后 sendx = (sendx + 1) % dataqsiz。
⑧ recvx uint——环形缓冲区的读取索引。每次 qcount > 0 时读取 buf[recvx],然后 recvx = (recvx + 1) % dataqsiz。
⑨ recvq waitq——接收等待队列。链表——每个节点是一个 sudog,表示一个被 <-ch 阻塞的 goroutine。
⑩ sendq waitq——发送等待队列。链表——每个节点是一个 sudog,表示一个被 ch <- x 阻塞的 goroutine。
⑪ lock mutex——保护整个 hchan 的互斥锁。所有操作(send/recv/close)都先 lock 再 unlock。这是简单的实现选择——Go 没有使用无锁数据结构——channel 的性能瓶颈不在锁竞争(大多数操作是快速路径,锁持有时间极短)。
# 3.2 环形缓冲区实现
环形缓冲区(dataqsiz = 4,qcount = 2):
sendx=1
│
┌──────┬──────┬──────┬──────┐
│ a │ b │ │ │
└──────┴──────┴──────┴──────┘
│
recvx=0
写入 c(qcount=2 < 4 → 直接写入):
buf[sendx] = c → buf[1] = c
sendx = (1+1) % 4 = 2
qcount = 3
读取(qcount=3 > 0 → 直接读取):
c = buf[recvx] → c = buf[0] = a
recvx = (0+1) % 4 = 1
qcount = 2
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
sendx 和 recvx 的关系:
sendx == recvx且qcount == 0→ 缓冲区为空sendx == recvx且qcount == dataqsiz→ 缓冲区为满qcount = (sendx - recvx + dataqsiz) % dataqsiz
为什么不用普通的切片做缓冲——环形缓冲区避免了 copy(buf, buf[1:]) 这样的 O(n) 操作。出队只需移动 recvx 索引(O(1)),进队只需移动 sendx 索引(O(1))。
# 3.3 有缓冲 vs 无缓冲的 hchan 差异
| 字段 | 无缓冲 (make(chan T)) | 有缓冲 (make(chan T, N)) |
|---|---|---|
dataqsiz | 0 | N |
buf | nil | 指向 N×elemsize 的数组 |
qcount | 始终为 0 | 0~N |
| send 行为 | 阻塞直到有 receiver | 缓冲区有空位则写入,否则阻塞 |
| recv 行为 | 阻塞直到有 sender | 缓冲区有元素则读取,否则阻塞 |
| 内存占用 | sizeof(hchan) ≈ 96 字节 | 96 + N×elemsize 字节 |
# 4. sudog:等待队列节点
# 4.1 sudog 结构体
// runtime/runtime2.go
type sudog struct {
g *g // ① 等待的 goroutine
next *sudog // ② 链表下一个节点
prev *sudog // ③ 链表上一个节点
elem unsafe.Pointer // ④ 要发送/接收的数据地址
acquiretime int64 // ⑤ 入队时间(用于调度诊断)
releasetime int64 // ⑥ 出队时间
// ... 其他字段
c *hchan // ⑦ 所属的 channel
}
2
3
4
5
6
7
8
9
10
11
逐字段:
① g *g——这个 sudog 代表的 goroutine(处于 _Gwaiting 状态)。被 gopark 挂起时 G 的上下文保存在这里。
②③ next/prev *sudog——recvq 和 sendq 是双向链表。next 指向前驱,prev 指向后继(实际上 recvq.first 指向队首,recvq.last 指向队尾)。
④ elem unsafe.Pointer——指向要传输的数据。对于阻塞的 sender,elem 指向它想发送的值;对于阻塞的 receiver,elem 指向它想写入的地址。
⑦ c *hchan——所属的 channel 指针。close 时需要遍历 recvq/sendq 唤醒所有等待者。
# 4.2 G→sudog→等待队列的链
recvq (waitq)
first → sudog{g: G1, elem: &x1}
next → sudog{g: G2, elem: &x2}
next → nil
last → sudog{g: G2, elem: &x2}
2
3
4
5
入队(<-ch 阻塞时):
- 从 P 的
sudogcache分配一个sudog - 设置
sudog.g = gp(当前 G) - 设置
sudog.elem = &receiverVariable(接收变量的地址) - 插入
recvq链表尾部 - 调用
gopark——G 进入_Gwaiting状态
出队(ch <- x 唤醒 receiver 时):
- 从
recvq链表头部取一个sudog - 将 sender 的数据直接拷贝到
sudog.elem指向的地址(绕过缓冲区) - 调用
goready(sudog.g)——G 进入_Grunnable状态,放回 P 的本地队列
# 4.3 gopark 与 goready 的唤醒机制
// runtime/proc.go
func gopark(unlockf func(*g, unsafe.Pointer) bool, ...) {
mp := acquirem() // 绑定到当前 M
gp := mp.curg // 当前 G
// ...
mcall(park_m) // 切换到 g0 栈执行 park_m
// park_m 内部:
// 1. 保存 G 的上下文(PC、SP、BP 等)
// 2. 将 G 状态设为 _Gwaiting
// 3. 调用 schedule() 切换到其他 G
// → 从这行之后,当前 G 不再执行,直到被 goready 唤醒
}
func goready(gp *g, traceskip int) {
// 1. 将 G 状态设为 _Grunnable
// 2. 将 G 放入当前 P 的 runnext(优先调度)或本地队列
// 3. 如果 P 有空闲 → G 被立刻调度执行
// 4. G 恢复执行 → 从 gopark 之后的代码继续
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
关键:gopark 不是"函数返回"——它调用 mcall 切换到 g0 栈,保存 G 的执行上下文,然后调度其他 G。被 goready 唤醒后,G 从 gopark 的调用点之后继续执行——好像 gopark 返回了一样。
# 5. send 的实现
# 5.1 快速路径:等待中的 receiver 直接接手
// runtime/chan.go (简化)
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// 0. nil channel 检查
if c == nil {
if !block { return false }
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw("unreachable")
}
// 1. 快速路径:有等待的 receiver?
if c.closed != 0 {
panic(plainError("send on closed channel"))
}
if sg := c.recvq.dequeue(); sg != nil {
// 有 receiver 在等待!直接交付——绕过缓冲区
send(c, sg, ep, func() { unlock(&c.lock) })
return true
}
// 2. 缓冲区有空位?
if c.qcount < c.dataqsiz {
// 直接拷贝到环形缓冲区
typedmemmove(c.elemtype, chanbuf(c, c.sendx), ep)
c.sendx++
if c.sendx == c.dataqsiz { c.sendx = 0 }
c.qcount++
return true
}
// 3. 缓冲区满 → 阻塞
// ... 见 5.3
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
快速路径图解:
ch <- 42
情况 A:recvq 非空(有 goroutine 在 <-ch 等待)
→ 从 recvq 队首取出 sudog
→ 将 42 直接拷贝到 sudog.elem 指向的地址(接收者的局部变量)
→ goready(sudog.g) ——唤醒接收者
→ 返回 true(没用到缓冲区!)
情况 B:buf 有空位(qcount < dataqsiz)
→ 将 42 拷贝到 buf[sendx]
→ sendx++ ; qcount++
→ 返回 true
2
3
4
5
6
7
8
9
10
11
12
绕过缓冲区的意义——当 receiver 已经在等待时,不写缓冲区而是直接内存拷贝,省去了一次写入 + 一次读取 + 两次锁操作。这是无缓冲 channel 高性能的核心。
# 5.2 慢路径:写入环形缓冲
当 qcount < dataqsiz 时(缓冲区有空位):
// runtime/chan.go
func chansend(...) {
// 在持有 c.lock 的情况下:
if c.qcount < c.dataqsiz {
// 计算目标地址 = buf + sendx * elemsize
qp := chanbuf(c, c.sendx) // chanbuf = add(buf, sendx*elemsize)
typedmemmove(c.elemtype, qp, ep) // memcpy ep → qp
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0 // 环形回绕
}
c.qcount++
return true
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
typedmemmove 会根据元素类型调用对应的 memmove 实现——如果元素包含指针,同时触发写屏障(GC 需要知道)。
# 5.3 阻塞 send:把 G 挂到 sendq
当缓冲区满且 recvq 空时:
// runtime/chan.go
func chansend(...) {
// ...
// 缓冲区满 → 阻塞发送
if !block { return false } // select 的非阻塞发送 → 直接返回 false
// 1. 获取 sudog
gp := getg()
mysg := acquireSudog()
mysg.elem = ep // 指向要发送的值
mysg.g = gp
// 2. 入队 sendq
c.sendq.enqueue(mysg)
// 3. 挂起当前 G
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
// 4. 被唤醒后——数据已经被 receiver 取走了
// (receiver 在 recv 时将 sudog.elem 的内容拷贝到了自己的缓冲区)
releaseSudog(mysg)
return true
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
阻塞后被唤醒的两种可能:
- 有 receiver 到达:receiver 从
sendq取出 sender 的sudog,将elem指向的数据拷贝走,然后goready(sender.G) - channel 被关闭:
close(ch)会 panic——closed channel 不能 send。所以 sender 被唤醒后还要检查c.closed
# 6. recv 的实现
# 6.1 快速路径:等待中的 sender 直接交付
// runtime/chan.go
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// 0. nil channel 检查
if c == nil {
if !block { return }
gopark(nil, nil, waitReasonChanRecvNilChan, traceEvGoStop, 2)
throw("unreachable")
}
lock(&c.lock)
// 1. channel 已关闭 + 缓冲区为空
if c.closed != 0 && c.qcount == 0 {
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep) // 返回零值
}
return true, false // received=false —— channel 已关闭
}
// 2. 快速路径:有等待的 sender?
if sg := c.sendq.dequeue(); sg != nil {
// 有 sender 在等待!直接取走它的数据
recv(c, sg, ep, func() { unlock(&c.lock) })
return true, true
}
// 3. 缓冲区有元素?
if c.qcount > 0 {
// 从环形缓冲区读取
qp := chanbuf(c, c.recvx)
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp) // 清除旧数据
c.recvx++
if c.recvx == c.dataqsiz { c.recvx = 0 }
c.qcount--
return true, true
}
// 4. 缓冲区空 → 阻塞
// ...
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
关键优化:接收时优先检查 sendq 非空——如果有 sender 在等待,直接从 sender 的 sudog.elem 拷贝数据(绕过缓冲区)。这保证了即使有缓冲 channel,当 sender 阻塞时,receiver 到来会直接取走 sender 的数据——不需要经过缓冲区中转。
# 6.2 慢路径:从环形缓冲读取
当 qcount > 0 且 sendq 为空时——从环形缓冲区取数据:
qp := chanbuf(c, c.recvx) // buf + recvx * elemsize
if ep != nil {
typedmemmove(c.elemtype, ep, qp) // 拷贝到接收者的地址
}
typedmemclr(c.elemtype, qp) // 清除缓冲区中的旧值(有助于 GC)
c.recvx++
if c.recvx == c.dataqsiz { c.recvx = 0 }
c.qcount--
2
3
4
5
6
7
8
为什么清除 buf[recvx]——如果 channel 的元素类型包含指针,不清除会导致 GC 认为该指针仍存活(它仍在 buf 数组中),直到该槽被下一次写入覆盖。及时清除帮助 GC 提前回收。
# 6.3 阻塞 recv:把 G 挂到 recvq
当 qcount == 0 且 sendq 空时——接收者阻塞:
gp := getg()
mysg := acquireSudog()
mysg.elem = ep // ep 指向接收者的局部变量地址
mysg.g = gp
c.recvq.enqueue(mysg)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanRecv, traceEvGoBlockRecv, 2)
// 被唤醒后——数据已经写入 ep 指向的地址(sender 做了 typedmemmove)
releaseSudog(mysg)
return true, c.closed == 0 // received = 是否确实收到数据
2
3
4
5
6
7
8
9
10
11
被唤醒的两种可能:
- 有 sender 到达:sender 将数据直接拷贝到
sudog.elem指向的地址 →goready(receiver.G) - channel 被关闭:
close(ch)会设置c.closed=1,然后唤醒所有 recvq 中的 receiver。receiver 醒来后c.qcount==0→ 返回false(表示 channel 已关闭)
# 6.4 从已关闭 channel 接收
ch := make(chan int, 3)
ch <- 1
ch <- 2
close(ch)
x, ok := <-ch // x=1, ok=true (缓冲区还有数据)
x, ok = <-ch // x=2, ok=true
x, ok = <-ch // x=0, ok=false (缓冲区空 + 已关闭 → 零值)
x, ok = <-ch // x=0, ok=false (后续始终返回零值)
2
3
4
5
6
7
8
9
设计意图:关闭 channel 后不立即停止返回——先让消费者读完缓冲区中剩余的数据。这是一种"优雅关闭"——消费者可以先处理完遗留数据再退出。
# 7. close 的实现与语义保证
# 7.1 关闭流程
// runtime/chan.go
func closechan(c *hchan) {
if c == nil {
panic(plainError("close of nil channel"))
}
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
c.closed = 1 // 标记为已关闭
// 1. 收集所有等待的 receiver
var glist gList
for {
sg := c.recvq.dequeue()
if sg == nil { break }
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem) // 清空 elem → 零值
}
glist.push(sg.g) // 加入唤醒列表
}
// 2. 收集所有等待的 sender
for {
sg := c.sendq.dequeue()
if sg == nil { break }
glist.push(sg.g) // sender 会被唤醒 → 然后 panic!
}
unlock(&c.lock)
// 3. 唤醒所有等待者
for !glist.empty() {
gp := glist.pop()
goready(gp, 3)
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
关键:receiver 在唤醒前已经被清空了 elem(设为零值)——所以醒来后看到 received=false。sender 醒来后会检查 c.closed——发现已关闭 → panic。
# 7.2 唤醒所有等待者
close 的唤醒是广播式的——不是只唤醒一个,而是唤醒所有在 recvq 和 sendq 中等待的 goroutine:
- recvq 中的 G:醒来后
<-ch返回零值 +ok=false - sendq 中的 G:醒来后发现
c.closed=1→panic("send on closed channel")
这保证了关闭 channel 后不会有 goroutine 被"遗忘"在等待队列中。
# 7.3 关闭后的行为保证
| 操作 | 关闭前 | 关闭后 |
|---|---|---|
ch <- x | 正常发送或阻塞 | panic(Go 1.x 至今) |
<-ch(缓冲区有数据) | 返回数据 | 返回数据(读完尽力) |
<-ch(缓冲区空) | 阻塞 | 返回零值 + ok=false |
close(ch) | 正常关闭 | panic(重复 close) |
for range ch | 持续等待 | 读完缓冲区后退出 |
「谁创建谁关闭」原则——不要在接收方关闭 channel。因为 close(ch) 对于还在发送的 goroutine 是 panic。正确的模式是:创建 channel 的 goroutine 负责关闭它。
# 8. select 的实现
# 8.1 随机化 case 顺序
// runtime/select.go
func selectgo(cas0 *scase, order0 *uint16, ncases int) (int, bool) {
// 1. 生成随机轮询顺序
// pollorder: 随机排列 [0..ncases-1]
for i := 1; i < ncases; i++ {
j := fastrandn(uint32(i + 1))
pollorder[i] = pollorder[j]
pollorder[j] = uint16(i)
}
// 2. 生成锁排序(按 hchan 地址升序——避免死锁)
for i := 0; i < ncases; i++ {
j := fastrandn(uint32(ncases - i))
lockorder[i] = lockorder[i+j]
// ...
}
// 3. 按 pollorder 轮询所有 case
for _, casei := range pollorder {
cas = &scases[casei]
// 检查每个 channel 是否就绪
switch cas.kind {
case caseRecv:
if 可以立即接收 { return casei, true }
case caseSend:
if 可以立即发送 { return casei, true }
case caseDefault:
continue // default 最后检查
}
}
// 4. 所有 case 都不就绪 → 阻塞
// 将所有 channel 锁上 → 挂起 G 到所有 channel 的等待队列
// 被唤醒后 → 重新检查哪个 case 就绪
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
为什么 case 顺序是随机的——避免某条 case 被饿死。如果总是按声明顺序检查,第一条 case 永远有优先权。随机化保证了公平性。
# 8.2 锁顺序避免死锁
如果 select 中有多个 channel,需要按统一的顺序加锁——否则会出现死锁:
Goroutine A: select { case ch1 <- 1: case ch2 <- 2: }
Goroutine B: select { case ch2 <- 1: case ch1 <- 2: }
如果不排序:
A lock ch1 → B lock ch2 → A 等 ch2 → B 等 ch1 → 死锁!
2
3
4
5
Go 的方案:按 hchan 的内存地址升序排序所有 channel——所有 goroutine 都以相同顺序加锁。这消除了循环等待——因为没有两个 goroutine 能以不同的顺序持有相同的锁集合。
// runtime/select.go
// 按 hchan 地址排序——全局统一的锁顺序
for i := 0; i < ncases; i++ {
j := fastrandn(uint32(i + 1))
lockorder[i] = lockorder[j]
lockorder[j] = uint16(i)
}
// 但 lockorder 是伪随机的——这里只是为了减少锁竞争
// 实际锁顺序仍然依赖 hchan 地址排序
2
3
4
5
6
7
8
9
# 8.3 select 的完整执行流程
select {
case v := <-ch1:
// ...
case ch2 <- x:
// ...
default:
// ...
}
1. 准备阶段:
- 随机排列 pollorder(case 检查顺序)
- 按 hchan 地址排序 lockorder(加锁顺序)
2. 第一轮:快速检查
- 按 pollorder 轮询每个 case
- 能立即执行?→ 执行并返回
- 都不能?→ 有 default?→ 执行 default
3. 第二轮:阻塞等待
- 按 lockorder 对所有 channel 加锁
- 将所有 G 注册到每个 channel 的 recvq/sendq
- gopark 挂起 G
4. 被唤醒后:
- 从所有 channel 的等待队列中移除 G
- 确定是哪个 case 触发的
- 执行对应的 case 并返回
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# 9. channel 泄漏与最佳实践
# 9.1 goroutine 泄漏的三种模式
模式 1:生产者不关闭 channel——消费者永远阻塞
// ❌ 泄漏
func leakProducer() chan int {
ch := make(chan int)
go func() {
for i := 0; i < 100; i++ {
ch <- i
}
// 忘记 close(ch)!
}()
return ch
}
func consumer() {
ch := leakProducer()
for v := range ch { // 永远不退出——G 泄漏
fmt.Println(v)
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
模式 2:消费者提前退出——生产者阻塞在发送上
// ❌ 泄漏
func leakConsumer() {
ch := make(chan int)
go func() {
for i := 0; i < 10000; i++ {
ch <- i // 消费者退出后 → 阻塞 → G 泄漏
}
}()
for i := 0; i < 10; i++ {
fmt.Println(<-ch)
}
// 消费者只读了 10 个就退出了 → 10000 个找不到接收者 → goroutine 泄漏
}
2
3
4
5
6
7
8
9
10
11
12
13
模式 3:select 中启用定时器但不关闭——timer goroutine 泄漏
// ❌ timer 泄漏
func leakTimer() {
for {
select {
case <-time.After(time.Second):
// time.After 每次创建新的 timer → GC 不会立即回收
// → timer 数量持续增长 → 内存泄漏
}
}
}
// ✅ 正确用法
func noLeakTimer() {
timer := time.NewTicker(time.Second)
defer timer.Stop()
for {
select {
case <-timer.C:
// tick
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# 9.2 谁创建谁关闭原则
// ✅ 清晰的 channel 所有权
func producer(out chan<- int) {
defer close(out) // 创建者关闭
for i := 0; i < 10; i++ {
out <- i
}
}
func consumer(in <-chan int) {
for v := range in {
fmt.Println(v)
}
// 消费者不关闭 channel——只负责读
}
func main() {
ch := make(chan int, 5)
go producer(ch)
consumer(ch)
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
信号 channel 的特殊模式:
// 用 channel 做信号——接收方关闭
done := make(chan struct{})
go func() {
// 工作
close(done) // 工作者关闭信号 channel
}()
<-done // 主 goroutine 等待信号
2
3
4
5
6
7
# 9.3 性能基准与选型
BenchmarkChanUnbuffered-8 5000000 250 ns/op
BenchmarkChanBuffered-8 10000000 120 ns/op
BenchmarkChanSelect-8 2000000 800 ns/op
2
3
| 场景 | 推荐 | 原因 |
|---|---|---|
| 一对一同步 | 无缓冲 channel | 零额外内存,直接交付 |
| 生产者比消费者快 | 有缓冲 channel | 削峰填谷 |
| 广播通知 | close(ch) | 所有 <-ch 同时返回 |
| 等待超时 | select + time.After | 条件等待 |
| 控制并发数 | 有缓冲 channel 做令牌 | make(chan struct{}, N) |
# 10. 综合案例串讲
# 10.1 案例真相揭晓
回到第 1 章的订单流水线——8 个疑问逐条作答:
| 疑问 | 答案 |
|---|---|
| ① hchan 结构? | 第 2-3 章:环形缓冲区 buf + sendx/recvx 索引 + recvq/sendq 等待队列 |
| ② sudog 是什么? | 第 4 章:goroutine 在 channel 等待队列中的节点——gopark 挂起,goready 唤醒 |
| ③ send 流程? | 第 5 章:优先检查 recvq→直接交付;否则写入环形缓冲;都不行就阻塞 |
| ④ recv 流程? | 第 6 章:优先检查 sendq→直接取走;否则读环形缓冲;都不行就阻塞 |
| ⑤ close 行为? | 第 7 章:标记 closed=1 → 唤醒所有 recvq(返零值)+ sendq(panic) |
| ⑥ select 随机化? | 第 8.1:fastrandn 生成随机轮询顺序——防止 case 饿死 |
| ⑦ channel 泄漏? | 第 9.1:三种模式——不关闭/提前退出/timer 泄漏 |
| ⑧ 无缓冲 vs 有缓冲? | 第 3.3 + 9.3:无缓冲=同步;有缓冲=削峰 |
案例完整修复:
// 修复 1:errCh 用非阻塞发送
func stageValidate(in <-chan Order, out chan<- Order) {
for order := range in {
if order.ID%10 == 0 {
log.Printf("order %d validation failed", order.ID)
continue
}
out <- order
}
close(out) // 正常完毕后关闭下游 channel
}
// 修复 2:用 context 控制 goroutine 生命周期
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ch1 := make(chan Order, 10)
ch2 := make(chan Order, 10)
go stageReceive(ch1)
go stageValidate(ch1, ch2)
go stagePersist(ctx, ch2)
<-ctx.Done()
// 等待 pipeline 优雅关闭
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# 10.2 一个 channel 操作的完整旅程
ch := make(chan int, 3)
go func() { ch <- 42 }()
x := <-ch
───────────────────────────────────────────────────────────
make(chan int, 3):
→ runtime.makechan(t, 3)
→ mallocgc(sizeof(hchan) + 3*sizeof(int))
→ hchan.dataqsiz = 3
→ hchan.buf = 堆上分配的 24 字节数组
→ hchan.elemsize = 8
→ hchan.lock = unlocked
→ 返回 *hchan
goroutine A: ch <- 42:
→ lock(&ch.lock)
→ 检查 recvq 是否空 → 空
→ qcount(0) < dataqsiz(3) → 写入 buf[0]
→ typedmemmove(buf + 0*8, &42)
→ sendx = 1, qcount = 1
→ unlock(&ch.lock)
→ 返回
goroutine B: x = <-ch:
→ lock(&ch.lock)
→ 检查 sendq 是否空 → 空
→ qcount(1) > 0 → 从 buf[0] 读取
→ typedmemmove(&x, buf + 0*8)
→ typedmemclr(buf + 0*8) // 清空帮助 GC
→ recvx = 1, qcount = 0
→ unlock(&ch.lock)
→ 返回 42
无缓冲 channel 的对撞:
ch := make(chan int) // dataqsiz = 0
G1: ch <- 42:
→ recvq 空 → 缓冲区容量 0 → 阻塞
→ gopark(G1) → G1 挂到 sendq
G2: x = <-ch:
→ sendq 非空!→ 取出 sudog{G1, elem=&42}
→ typedmemmove(&x, sudog.elem) ← 直接将 42 拷贝到 x
→ goready(G1) ← 唤醒 sender
→ 返回 42
close(ch):
→ closed = 1
→ 遍历 recvq → 每个 receiver 的 elem 清零 → goready
→ 遍历 sendq → 每个 sender goready → sender 醒来后 panic
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# 10.3 设计哲学回扣
哲学 1:通过通信共享内存——CSP 的 Go 实现
"Don't communicate by sharing memory; share memory by communicating." 这句 Go 名言在 hchan 的源码中具体体现为:sender 和 receiver 之间的数据传递不是通过"共享变量+锁",而是通过 channel 的 typedmemmove——一边直接拷贝到另一边的内存。无缓冲 channel 的对撞设计(sender 直接交付给 receiver)是最极致的 CSP 实践——数据从发送方栈到接收方栈,不经任何中间缓存。
哲学 2:用一个整体结构替代分离的同步原语
Java 实现生产者-消费者需要 Queue + Lock + Condition 三个对象协调。Go channel 把它们统一成一个 hchan:环形缓冲区(Queue)、mutex(Lock)、recvq/sendq + gopark/goready(Condition)。这种统一带来两个好处:死锁风险更低(只有一把锁),代码更简洁(一个 <-ch 代替 lock+条件检查+wait+unlock)。
哲学 3:快速路径优先——大多数操作不阻塞
chansend 的源码前三步是:检查已关闭 → 有 receiver?→ 缓冲区有空位?只有第四步才是阻塞路径。在正常的生产者-消费者中,缓冲区未满或总有接收方在等待——快速路径占 99% 的操作。慢路径(阻塞、gopark)只在边缘场景发生。这种设计让 channel 在高吞吐场景下几乎看不到锁竞争。
哲学 4:显式关闭——让接收方知道"结束了"
close(ch) 不只是"释放资源"——它是一种信号:没有更多数据了。接收方通过 v, ok := <-ch 的 ok=false 知道流结束了。这和 HTTP 的 Connection: close 头、TCP 的 FIN 包一样——是一种带内信号。Go 特意让 close 后的 channel 不能发送(避免"关闭后还在生产"的歧义),且关闭后可以继续接收(读完缓冲区的遗留数据)。
# 10.4 速查表
hchan 核心字段:
| 字段 | 含义 | 何时变化 |
|---|---|---|
qcount | 缓冲中元素个数 | send +1,recv -1 |
dataqsiz | 缓冲区容量 | 仅 make(chan T, N) 时设置 |
sendx | 写入索引 | 每次 send 后 +1(环形回绕) |
recvx | 读取索引 | 每次 recv 后 +1(环形回绕) |
closed | 关闭标志 | close(ch) 设为 1 |
recvq | 接收等待队列 | <-ch 阻塞时入队 |
sendq | 发送等待队列 | ch <- 阻塞时入队 |
操作语义:
| 操作 | nil channel | 已关闭 channel | 正常 channel |
|---|---|---|---|
ch <- x | 永久阻塞 | panic | 发送或阻塞 |
<-ch | 永久阻塞 | 零值+false | 接收或阻塞 |
close(ch) | panic | panic | 关闭 |
channel 泄漏模式:
| 模式 | 根因 | 修复 |
|---|---|---|
| 消费者永远等待 | 生产者不关闭 channel | defer close(ch) 在生产者 |
| 生产者阻塞 | 消费者提前退出 | 用 context 取消 |
| timer 泄漏 | time.After 在 select 循环 | 用 time.NewTicker + Stop |
诊断命令:
# goroutine 泄漏检测
go tool pprof http://localhost:6060/debug/pprof/goroutine
(pprof) top 10 # 看等待最多的 goroutine 栈
(pprof) list # 定位阻塞在哪个 channel 操作
# 竞态检测
go run -race main.go
go test -race ./...
# runtime 调度跟踪
GODEBUG=schedtrace=1000 ./app # 查看 G 状态分布
# channel 阻塞分析
curl http://localhost:6060/debug/pprof/goroutine?debug=2 | grep "chan"
2
3
4
5
6
7
8
9
10
11
12
13
14
下一篇:我们已经看清了 channel 的
hchan结构和 send/recv/close/select 的全流程,下一步进入 10.sync同步原语剖析——把sync.Mutex、sync.RWMutex、sync.WaitGroup、sync.Once的源码和锁竞争优化剖开。