编程进阶网 编程进阶网
首页
  • 计算机原理
  • 操作系统
  • 网络协议
  • 数据库原理
  • 面向对象
  • 设计原则
  • 设计模式
  • 系统架构
  • 性能优化
  • 编程原理
  • 方案设计
  • 稳定可靠
  • 工程运维
  • 基础认知
  • 线性结构
  • 树与哈希
  • 工业级实现
  • 算法思想
  • 实战与综合
  • 算法题考核
  • C语言入门
  • C综合案例
  • C专栏博客
  • C标准集库
  • C++入门教程
  • C++综合案例
  • C++专栏博客
  • C++开发技巧
  • Java入门教程
  • Java综合案例
  • Java专栏博客
  • Go入门教程
  • Go综合案例
  • Go专栏博客
  • Go开发技巧
  • JavaScript入门
  • JavaScript高级
  • Android库解读
  • Android专栏
  • Android智能硬件
  • iOS ObjC入门
  • iOS Swift入门
  • iOS入门精通
  • Web之Html手册
  • Web之TypeScript
  • Web之Vue高级进阶
  • Linux之QML入门
  • Linux之QT核心库
  • Linux实践开发
  • Python教程
  • Shell&Bash教程
  • 工具脚本
  • 自动化脚本
  • 质量保障
  • 产品思考
  • 软实力
  • 开发流程
  • Git应用
  • 技术模版
  • 技术规范
  • Markdown
  • Mermaid
  • 开源协议
  • JSON工具
  • 文本工具
  • 图片处理
  • 文档转化
  • 代码压缩
  • 关于我
  • 自我精进
  • 职场管理
  • 职场面试
  • 心情杂货
  • 友情链接

杨充

专注编程 · 终身学习者
首页
  • 计算机原理
  • 操作系统
  • 网络协议
  • 数据库原理
  • 面向对象
  • 设计原则
  • 设计模式
  • 系统架构
  • 性能优化
  • 编程原理
  • 方案设计
  • 稳定可靠
  • 工程运维
  • 基础认知
  • 线性结构
  • 树与哈希
  • 工业级实现
  • 算法思想
  • 实战与综合
  • 算法题考核
  • C语言入门
  • C综合案例
  • C专栏博客
  • C标准集库
  • C++入门教程
  • C++综合案例
  • C++专栏博客
  • C++开发技巧
  • Java入门教程
  • Java综合案例
  • Java专栏博客
  • Go入门教程
  • Go综合案例
  • Go专栏博客
  • Go开发技巧
  • JavaScript入门
  • JavaScript高级
  • Android库解读
  • Android专栏
  • Android智能硬件
  • iOS ObjC入门
  • iOS Swift入门
  • iOS入门精通
  • Web之Html手册
  • Web之TypeScript
  • Web之Vue高级进阶
  • Linux之QML入门
  • Linux之QT核心库
  • Linux实践开发
  • Python教程
  • Shell&Bash教程
  • 工具脚本
  • 自动化脚本
  • 质量保障
  • 产品思考
  • 软实力
  • 开发流程
  • Git应用
  • 技术模版
  • 技术规范
  • Markdown
  • Mermaid
  • 开源协议
  • JSON工具
  • 文本工具
  • 图片处理
  • 文档转化
  • 代码压缩
  • 关于我
  • 自我精进
  • 职场管理
  • 职场面试
  • 心情杂货
  • 友情链接
  • README
  • C语言入门精通

  • Cpp入门到精通

  • Java入门精通

  • Go入门到精通

    • 入门教程

      • README
      • Go简史
      • 基础语法
      • 数据类型
      • 运算符
      • 复合类型
      • 流程语句
      • 函数
      • 指针与逃逸
      • 结构体与方法
      • 接口与多态
      • 错误处理
      • 并发goroutine
      • 通道channel
        • 目录介绍
        • 13.1 本章学习目标
        • 13.2 channel 的三种形态
          • 13.2.1 无缓冲 make(chan T)
          • 13.2.2 有缓冲 make(chan T, N)
          • 13.2.3 单向 channel chan<- T / <-chan T
          • 13.2.4 综合案例与思考
        • 13.3 send / receive 的阻塞规则
          • 13.3.1 完整的状态矩阵
          • 13.3.2 综合案例与思考
        • 13.4 close(ch) 语义
          • 13.4.1 关闭后再发送 → panic
          • 13.4.2 关闭后接收 → 零值 + ok=false
          • 13.4.3 重复关闭 → panic
          • 13.4.4 关闭只读 channel → 编译错
          • 13.4.5 综合案例与思考
        • 13.5 select 多路复用
          • 13.5.1 default 分支:非阻塞
          • 13.5.2 select 随机化
          • 13.5.3 nil channel 在 select 中"消失"
          • 13.5.4 综合案例与思考
        • 13.6 经典并发模式
          • 13.6.1 扇出 / 扇入
          • 13.6.2 worker pool
          • 13.6.3 done channel 取消
          • 13.6.4 信号量模式
          • 13.6.5 综合案例与思考
        • 13.7 综合示例:实现一个限流器
        • 13.8 本章底层原理(简介)
        • 13.9 Go 新手陷阱 Top 5
        • 13.10 思考题
        • 13.11 训练题
        • 13.12 推荐阅读
      • 同步sync包
      • IO和文件
      • 标准库与泛型
      • 工程化与模块
      • 特性图谱
    • 综合案例

    • 专栏博客

    • 开发技巧

  • JavaScript入门

  • CodeX
  • Go入门到精通
  • 入门教程
杨充
2026-05-21
目录

通道channel

# 第 13 章 通道 channel

"Don't communicate by sharing memory; share memory by communicating." — Rob Pike 关键词:有缓冲 / 无缓冲 channel、select、close 语义、nil channel 妙用、扇入扇出、信号量模式、限流器


# 目录介绍

  • 13.1 本章学习目标
  • 13.2 channel 的三种形态
    • 13.2.1 无缓冲 make(chan T)
    • 13.2.2 有缓冲 make(chan T, N)
    • 13.2.3 单向 channel chan<- T / <-chan T
    • 13.2.4 综合案例与思考
  • 13.3 send / receive 的阻塞规则
    • 13.3.1 完整的状态矩阵
    • 13.3.2 综合案例与思考
  • 13.4 close(ch) 语义
    • 13.4.1 关闭后再发送 → panic
    • 13.4.2 关闭后接收 → 零值 + ok=false
    • 13.4.3 重复关闭 → panic
    • 13.4.4 关闭只读 channel → 编译错
    • 13.4.5 综合案例与思考
  • 13.5 select 多路复用
    • 13.5.1 default 分支:非阻塞
    • 13.5.2 select 随机化
    • 13.5.3 nil channel 在 select 中"消失"
    • 13.5.4 综合案例与思考
  • 13.6 经典并发模式
    • 13.6.1 扇出 / 扇入
    • 13.6.2 worker pool
    • 13.6.3 done channel 取消
    • 13.6.4 信号量模式
    • 13.6.5 综合案例与思考
  • 13.7 综合示例:实现一个限流器
  • 13.8 本章底层原理(简介)
  • 13.9 Go 新手陷阱 Top 5
  • 13.10 思考题
  • 13.11 训练题
  • 13.12 推荐阅读

# 13.1 本章学习目标

学完本章你应当能够:

  • ✅ 能解释有缓冲 / 无缓冲 channel 的区别,说出各自适用场景
  • ✅ 能写出正确的 send / receive 代码,知道什么情况会阻塞、什么情况会 panic
  • ✅ 能说出 close(ch) 的 4 条语义规则,写出安全的关闭逻辑
  • ✅ 能用 select + nil channel 做"动态启用/禁用分支"
  • ✅ 能写出扇入扇出 / worker pool / 信号量三种并发模式
  • ✅ 能用 channel 实现一个限流器
  • ✅ 能画出 for range ch 和 select 的控制流图

本章是 Go 并发的核心。第 12 章让你启动 goroutine,本章让你让 goroutine 之间通信。


# 13.2 channel 的三种形态

channel 是 goroutine 之间传递数据的类型安全管道——一端发送、一端接收。

# 13.2.1 无缓冲 make(chan T)

无缓冲 channel 没有容量——发送方和接收方必须同时就绪:

ch := make(chan int)    // 无缓冲 channel——容量 0

// 发送方 goroutine
go func() {
    ch <- 42            // 阻塞——直到有接收方就绪
    fmt.Println("发送完成")
}()

// 接收方 goroutine
value := <-ch           // 阻塞——直到有发送方就绪
fmt.Println("收到:", value)
// 输出:收到: 42
//       发送完成
1
2
3
4
5
6
7
8
9
10
11
12
13

执行时序:

时间 →
发送方: ch <- 42 ──── 阻塞等待 ──── 解除阻塞 ──── println
接收方:               <-ch ────── 收到 42 ──── println
                      ↑ 两端"握手"成功的瞬间
1
2
3
4

核心特性:无缓冲 channel 让发送和接收同步——它是 goroutine 之间的"握手"机制,而不是"数据队列"。

# 13.2.2 有缓冲 make(chan T, N)

有缓冲 channel 有容量——可以存入 N 个元素后再阻塞:

ch := make(chan int, 3)  // 有缓冲 channel——容量 3

ch <- 1   // 不阻塞(缓冲 1/3)
ch <- 2   // 不阻塞(缓冲 2/3)
ch <- 3   // 不阻塞(缓冲 3/3——满了)

// ch <- 4   // 阻塞!——缓冲已满,等待接收方腾出空间

fmt.Println(<-ch) // 1(缓冲 2/3——有空间了)
ch <- 4           // 不阻塞(缓冲 3/3)
1
2
3
4
5
6
7
8
9
10

有缓冲 vs 无缓冲:

无缓冲 make(chan T) 有缓冲 make(chan T, N)
容量 0 N
发送行为 必须等待接收方 缓冲未满时不阻塞
接收行为 必须等待发送方 缓冲非空时不阻塞
典型场景 同步通知、结果传递 生产者-消费者解耦、队列
类比 握手 邮箱

# 13.2.3 单向 channel chan<- T / <-chan T

Go 允许把双向 channel 转换为单向 channel——编译期约束:

// 双向 channel——能做任何操作
ch := make(chan int)

// ① 只发送 channel:chan<- T
func producer(out chan<- int) {
    out <- 42                    // ✅ 发送——合法
    // <-out                     // ❌ 接收——编译错
}

// ② 只接收 channel:<-chan T
func consumer(in <-chan int) {
    value := <-in                // ✅ 接收——合法
    // in <- 42                  // ❌ 发送——编译错
    fmt.Println(value)
}

// ③ 调用时——双向 channel 自动转换为单向
producer(ch)    // chan int → chan<- int  ✅
consumer(ch)    // chan int → <-chan int  ✅
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

为什么需要单向 channel?

  • 语义清晰:函数签名中 chan<- T 明确告诉调用者"这个函数只会往 channel 发数据"
  • 编译期安全:不会出现"不小心在消费者里发了数据"的 bug
  • 和 close 的配合:close(ch) 需要双向 channel(防止只读方错误关闭)

# 13.2.4 综合案例与思考

综合案例:三种形态完整演示

package main

import (
    "fmt"
    "time"
)

// 无缓冲:同步通知
func unbufferedDemo() {
    ch := make(chan string)
    go func() {
        time.Sleep(100 * time.Millisecond)
        ch <- "goroutine 完成"     // 发送方等待接收方
    }()

    fmt.Println("等待 goroutine...")
    result := <-ch                  // 主 goroutine 等待
    fmt.Println("收到:", result)
}

// 有缓冲:生产者-消费者
func bufferedDemo() {
    jobs := make(chan int, 5)       // 缓冲 5 个任务

    // 生产者——不会因为消费者慢而阻塞(缓冲空闲时)
    go func() {
        for i := 1; i <= 10; i++ {
            jobs <- i
            fmt.Printf("生产: %d\n", i)
        }
        close(jobs)
    }()

    // 消费者——按自己的速度处理
    for job := range jobs {
        time.Sleep(50 * time.Millisecond)
        fmt.Printf("  消费: %d\n", job)
    }
}

// 单向 channel:职责分离
func generate(out chan<- int) {     // 只能发送
    for i := 1; i <= 3; i++ {
        out <- i
    }
    close(out)
}

func square(in <-chan int, out chan<- int) { // 只接收 + 只发送
    for v := range in {
        out <- v * v
    }
    close(out)
}

func print(in <-chan int) {         // 只能接收
    for v := range in {
        fmt.Println(v)
    }
}

func main() {
    fmt.Println("=== 无缓冲 ===")
    unbufferedDemo()

    fmt.Println("\n=== 有缓冲 ===")
    bufferedDemo()

    fmt.Println("\n=== 单向 ===")
    nums := make(chan int)
    squares := make(chan int)
    go generate(nums)
    go square(nums, squares)
    print(squares)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75

案例知识融合:这个案例演示了 channel 三种形态的典型用法——无缓冲用于 goroutine 间同步通知、有缓冲用于生产者-消费者解耦、单向 channel 在函数签名中约束操作方向。

思考题:

  1. 如果把无缓冲 channel 示例中的 goroutine 去掉,直接在 main 中 ch <- "hello"——会发生什么?为什么?
  2. 有缓冲 channel 的容量选多大?有没有计算公式?太大或太小的后果是什么?
  3. 单向 channel 只是编译期的约束——底层仍是一个双向 channel。如果你用 interface{} 绕过编译检查,会发生什么?

# 13.3 send / receive 的阻塞规则

# 13.3.1 完整的状态矩阵

channel 的 send 和 receive 在不同状态下有不同的行为——这张表覆盖了所有组合:

channel 状态 ch <- v(发送) v := <-ch(接收) v, ok := <-ch(接收+检测)
nil 永远阻塞 永远阻塞 永远阻塞
无缓冲,有接收方就绪 成功发送 — —
无缓冲,无接收方 阻塞等待 — —
有缓冲,未满 成功存入 — —
有缓冲,已满 阻塞等待 — —
有缓冲,非空 — 成功取出 ok=true
有缓冲,为空 — 阻塞等待 阻塞等待
已关闭,有未读数据 panic 成功取出 ok=true
已关闭,无未读数据 panic 零值 ok=false

关键规律:

  • nil channel 永远阻塞——这是 select 中"禁用分支"的秘诀(见 §13.5.3)
  • 关闭后发送一定 panic——唯一的发送规则
  • 关闭后接收不会阻塞——返回零值或缓冲中剩余的数据

# 13.3.2 综合案例与思考

综合案例:nil channel 的阻塞验证

package main

import "fmt"

func main() {
    var nilCh chan int        // nil channel

    // 验证 1:发送到 nil channel——永久阻塞
    go func() {
        fmt.Println("发送方 开始")
        nilCh <- 1
        fmt.Println("这行永远不会执行")  // 永远不会
    }()

    // 验证 2:从 nil channel 接收——永久阻塞
    go func() {
        fmt.Println("接收方 开始")
        <-nilCh
        fmt.Println("这行永远不会执行")  // 永远不会
    }()

    // 验证 3:关闭 nil channel——panic
    func() {
        defer func() {
            if r := recover(); r != nil {
                fmt.Println("关闭 nil channel:", r)  // close of nil channel
            }
        }()
        close(nilCh)
    }()
}
// 输出:
// 关闭 nil channel: close of nil channel
// (两个 goroutine 永远阻塞——程序不会退出)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34

案例知识融合:这个案例验证了 nil channel 的核心行为——发送和接收都会永久阻塞,关闭会 panic。nil channel 在 select 中极有用——把某个 case 的 channel 设为 nil,这个 case 就永远不会被选中(见 §13.5.3)。

思考题:

  1. 为什么 Go 设计成"关闭后发送 panic"而不是"关闭后发送什么也不做"?这带来了什么好处?
  2. nil channel 永远阻塞——这看起来是个"bug",但它在 select 中非常有用。说说为什么?
  3. 有缓冲 channel 满了之后发送方阻塞——如果这时消费者 goroutine 也阻塞了——会发生什么?(提示:死锁检测)

# 13.4 close(ch) 语义

# 13.4.1 关闭后再发送 → panic

ch := make(chan int, 2)
ch <- 1
ch <- 2
close(ch)

ch <- 3     // panic: send on closed channel
1
2
3
4
5
6

保护方法——只有发送方应该关闭 channel。如果多个发送方,用 sync.Once 或协调机制:

var once sync.Once
once.Do(func() { close(ch) })
1
2

# 13.4.2 关闭后接收 → 零值 + ok=false

关闭后仍可接收缓冲中剩余的数据——消耗完毕后返回零值:

ch := make(chan int, 2)
ch <- 10
ch <- 20
close(ch)

// 缓冲中还有数据——正常接收
fmt.Println(<-ch)                  // 10
val, ok := <-ch
fmt.Println(val, ok)               // 20, true

// 缓冲耗尽——返回零值
val, ok = <-ch
fmt.Println(val, ok)               // 0, false

// for range 自动检测关闭
for v := range ch {
    fmt.Println(v)                 // 只打印 10, 20——耗尽后停止
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

# 13.4.3 重复关闭 → panic

ch := make(chan int)
close(ch)
close(ch)   // panic: close of closed channel
1
2
3

谁该关闭 channel? 黄金法则——发送方关闭:

  • 只有一个发送方 → 发送方直接 close
  • 多个发送方 → 用 sync.Once 或通知接收方"我已发完"(接收方不关)
  • 接收方绝不关闭——收到的 channel 通常是 <-chan T,编译期就不让你关

# 13.4.4 关闭只读 channel → 编译错

func consumer(in <-chan int) {
    // close(in)   // 编译错:cannot close receive-only channel
}
1
2
3

# 13.4.5 综合案例与思考

综合案例:安全的关闭模式

package main

import (
    "fmt"
    "sync"
)

// 模式 1:单发送方——发送方关
func singleSender() {
    ch := make(chan int)

    // 发送方——唯一的生产者
    go func() {
        defer close(ch)         // ★ 发送方关
        for i := 1; i <= 5; i++ {
            ch <- i
        }
    }()

    // 接收方——不关,range 自动检测
    for v := range ch {
        fmt.Println(v)
    }
}

// 模式 2:多发送方——sync.Once 确保只关一次
func multiSender() {
    ch := make(chan int)
    var once sync.Once

    // 三个发送方
    for i := 0; i < 3; i++ {
        go func(id int) {
            for j := 0; j < 3; j++ {
                ch <- id*10 + j
            }
            // ★ 每个发送方都尝试关——但只有第一个生效
            once.Do(func() { close(ch) })
        }(i)
    }

    for v := range ch {
        fmt.Println(v)
    }
}

// 模式 3:接收方通过"done channel"通知发送方停止
func doneChannel() {
    dataCh := make(chan int)
    doneCh := make(chan struct{})

    // 发送方——发到 done 信号
    go func() {
        i := 0
        for {
            select {
            case <-doneCh:
                close(dataCh)
                return
            default:
                dataCh <- i
                i++
            }
        }
    }()

    // 接收方——拿到 5 个后通知停止
    count := 0
    for v := range dataCh {
        fmt.Println(v)
        count++
        if count >= 5 {
            close(doneCh)
        }
    }
}

func main() {
    fmt.Println("=== 单发送方 ===")
    singleSender()

    fmt.Println("=== 多发送方 ===")
    multiSender()

    fmt.Println("=== done channel ===")
    doneChannel()
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87

案例知识融合:这个案例展示了三种安全的 channel 关闭模式——单发送方直接关、多发送方用 sync.Once、以及通过 done channel 让接收方通知发送方停止。核心原则:不要从接收方关闭 channel。

思考题:

  1. 为什么 Go 设计成"重复关闭 panic"而不是"什么都不做"?这和不检查数组越界有什么异曲同工之处?
  2. for range ch 是怎么检测到 channel 关闭的?它内部调用的是 v, ok := <-ch 吗?

# 13.5 select 多路复用

# 13.5.1 default 分支:非阻塞

select 像"多路开关"——同时监听多个 channel。加上 default 分支实现非阻塞操作:

// 非阻塞发送
select {
case ch <- value:
    fmt.Println("发送成功")
default:
    fmt.Println("channel 已满——跳过发送")
}

// 非阻塞接收
select {
case v := <-ch:
    fmt.Println("收到:", v)
default:
    fmt.Println("channel 为空——没有数据")
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

# 13.5.2 select 随机化

当多个 case 同时就绪时,select 随机选择一个——不是按代码顺序:

ch1 := make(chan string)
ch2 := make(chan string)

go func() {
    for i := 0; i < 10; i++ {
        ch1 <- "A"
        ch2 <- "B"
    }
}()

for i := 0; i < 10; i++ {
    select {
    case v := <-ch1:
        fmt.Print(v)
    case v := <-ch2:
        fmt.Print(v)
    }
}
// 输出类似:ABBAABABBA——交错的,不是严格的交替
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

为什么要随机化?

  • 防止某个 case 被"饿死"——如果按顺序,第一个 case 永远优先
  • 让所有 case 有公平的执行机会

# 13.5.3 nil channel 在 select 中"消失"

当一个 case 的 channel 是 nil——这个 case 永远不会被选中:

var nilCh chan int   // nil channel

select {
case <-nilCh:        // 永远不会被选中
    fmt.Println("这行永远不会执行")
case v := <-dataCh:
    fmt.Println(v)
default:
    fmt.Println("no data")
}
// 输出:no data(如果 dataCh 也没数据)
1
2
3
4
5
6
7
8
9
10
11

实战——动态启用/禁用分支:

// 实现"超时重置定时器"——每次收到数据,重新计时
func resetTimer() {
    dataCh := make(chan int)
    var timerCh <-chan time.Time  // nil——初始禁用

    go func() {
        for i := 0; i < 5; i++ {
            time.Sleep(300 * time.Millisecond)
            dataCh <- i
        }
        close(dataCh)
    }()

    timerCh = time.After(500 * time.Millisecond)  // 启用定时器

    for {
        select {
        case v, ok := <-dataCh:
            if !ok {
                fmt.Println("dataCh 关闭,退出")
                return
            }
            fmt.Println("收到:", v)
            // ★ 重置定时器——重新赋值,启用新定时器
            timerCh = time.After(500 * time.Millisecond)
        case <-timerCh:
            fmt.Println("超时!")
            return
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

nil channel 的妙用表:

需求 做法
禁用某个 case 把它的 channel 设为 nil
启用禁用的 case 重新赋值为有效 channel
初次不启用定时器 var timerCh <-chan time.Time(nil)

# 13.5.4 综合案例与思考

综合案例:同时监听 3 个 channel——数据、超时、取消

package main

import (
    "context"
    "fmt"
    "time"
)

func monitor(ctx context.Context, dataCh <-chan int) {
    // 初始禁用定时器
    var timeoutCh <-chan time.Time

    for {
        select {
        case <-ctx.Done():
            fmt.Println("取消:", ctx.Err())
            return

        case v, ok := <-dataCh:
            if !ok {
                fmt.Println("dataCh 关闭")
                return
            }
            fmt.Println("处理:", v)
            // 每次收到数据——重置超时
            timeoutCh = time.After(2 * time.Second)

        case <-timeoutCh:
            if timeoutCh != nil {
                fmt.Println("2 秒无数据——超时")
                return
            }
        }
    }
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()

    dataCh := make(chan int)
    go func() {
        for i := 0; i < 3; i++ {
            time.Sleep(500 * time.Millisecond)
            dataCh <- i
        }
        // 停止发送——2 秒后超时分支触发
    }()

    monitor(ctx, dataCh)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51

案例知识融合:这个案例展示了 select 的实战用法——同时监听数据、超时、取消三个 channel。利用 var timeoutCh <-chan time.Time(nil)让定时器初始禁用,收到数据后才启用——完美实现了"可重置超时"。

思考题:

  1. 如果 select 中多个 case 同时就绪——Go 为什么要随机选择而不是按代码顺序?这和操作系统调度器的公平性有什么关系?
  2. time.After 返回的 <-chan time.Time 在超时前如果不接收——goroutine 会泄漏吗?为什么?
  3. 如果 select 里同时有发送和接收 case——它们之间公平吗?读和写有优先级吗?

# 13.6 经典并发模式

# 13.6.1 扇出 / 扇入

扇出(Fan-out)——一个输入 channel,多个 goroutine 并行消费:

func fanOut(in <-chan int, workers int) []<-chan int {
    outs := make([]<-chan int, workers)
    for i := 0; i < workers; i++ {
        ch := make(chan int)
        outs[i] = ch
        go func(out chan<- int) {
            defer close(out)
            for v := range in {
                out <- v * v   // 每个 worker 做平方
            }
        }(ch)
    }
    return outs
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14

扇入(Fan-in)——多个 channel 合并成一个:

func fanIn(channels ...<-chan int) <-chan int {
    out := make(chan int)
    var wg sync.WaitGroup

    for _, ch := range channels {
        wg.Add(1)
        go func(c <-chan int) {
            defer wg.Done()
            for v := range c {
                out <- v
            }
        }(ch)
    }

    // 等所有输入 channel 关闭后——关闭输出
    go func() {
        wg.Wait()
        close(out)
    }()

    return out
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

扇出 + 扇入组合——数据并行处理管道:

func main() {
    in := make(chan int)
    go func() {
        for i := 1; i <= 10; i++ {
            in <- i
        }
        close(in)
    }()

    // 扇出——3 个 worker
    workers := fanOut(in, 3)
    // 扇入——合并结果
    out := fanIn(workers...)

    for v := range out {
        fmt.Println(v)
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

# 13.6.2 worker pool

固定数量的 goroutine 从 channel 中取任务——控制并发度:

func workerPool(jobs <-chan int, results chan<- int, workers int) {
    var wg sync.WaitGroup

    for i := 0; i < workers; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            for job := range jobs {
                fmt.Printf("worker %d 处理 %d\n", id, job)
                time.Sleep(100 * time.Millisecond) // 模拟工作
                results <- job * 2
            }
        }(i)
    }

    // 等所有 worker 完成——关闭结果 channel
    go func() {
        wg.Wait()
        close(results)
    }()
}

func main() {
    jobs := make(chan int, 10)
    results := make(chan int, 10)

    workerPool(jobs, results, 3)

    // 发送任务
    go func() {
        for i := 1; i <= 10; i++ {
            jobs <- i
        }
        close(jobs)
    }()

    // 接收结果
    for r := range results {
        fmt.Println("结果:", r)
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41

# 13.6.3 done channel 取消

用 channel 广播"停止"信号——所有 goroutine 同时感知:

func worker(done <-chan struct{}, jobs <-chan int) <-chan int {
    results := make(chan int)
    go func() {
        defer close(results)
        for {
            select {
            case <-done:            // ★ 收到取消信号——立即退出
                fmt.Println("worker 收到取消")
                return
            case job, ok := <-jobs:
                if !ok {
                    return          // jobs 关闭——正常退出
                }
                results <- job * 2
            }
        }
    }()
    return results
}

// 使用
done := make(chan struct{})
results := worker(done, jobs)

// 需要取消时——关闭 done channel
close(done)  // 所有 worker 的 <-done 立刻返回
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

context 替代 done channel:Go 1.7+ 推荐用 context 代替裸 done chan struct{}——功能更强(超时、deadline、值传播):

func workerV2(ctx context.Context, jobs <-chan int) <-chan int {
    results := make(chan int)
    go func() {
        defer close(results)
        for {
            select {
            case <-ctx.Done():     // ← 和 done channel 一样的效果
                return
            case job, ok := <-jobs:
                if !ok { return }
                results <- job * 2
            }
        }
    }()
    return results
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

# 13.6.4 信号量模式

用有缓冲 channel 做信号量——控制最大并发数:

// 信号量:容量 3 = 最多 3 个并发
sem := make(chan struct{}, 3)

for i := 0; i < 10; i++ {
    sem <- struct{}{}    // 获取信号量——如果满则阻塞
    go func(id int) {
        defer func() { <-sem }()  // 释放信号量
        fmt.Printf("任务 %d 开始\n", id)
        time.Sleep(time.Second)
        fmt.Printf("任务 %d 完成\n", id)
    }(i)
}
// 任何时候最多 3 个 goroutine 在执行
1
2
3
4
5
6
7
8
9
10
11
12
13

有缓冲 channel 作为信号量的精妙:

  • sem <- struct{}{} → 如果缓冲满(3/3),阻塞——等待
  • <-sem → 释放一个槽——下一个等待者可以进入
  • 零内存开销——struct{} 不占空间

# 13.6.5 综合案例与思考

综合案例:流水线——生成 → 过滤 → 平方 → 输出

package main

import "fmt"

// 阶段 1:生成数字
func gen(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for _, n := range nums {
            out <- n
        }
    }()
    return out
}

// 阶段 2:过滤偶数
func even(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            if n%2 == 0 {
                out <- n
            }
        }
    }()
    return out
}

// 阶段 3:平方
func square(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            out <- n * n
        }
    }()
    return out
}

func main() {
    // 管道串联:gen → even → square
    for v := range square(even(gen(1, 2, 3, 4, 5, 6, 7, 8))) {
        fmt.Println(v)
    }
    // 输出:4, 16, 36, 64(偶数的平方)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49

案例知识融合:这个案例展示了 Go 的 pipeline 模式——每个阶段都是"接收一个 channel、返回一个 channel"。阶段之间用 goroutine 连接——天然并发。整个管道是"声明式"的:square(even(gen(...))) 读起来像数据流。

思考题:

  1. 这个 pipeline 中,每个阶段都有独立的 goroutine——如果数据量非常大,goroutine 会不会太多?如何优化?
  2. 扇出模式中,多个 worker 共用一个输入 channel——这是并发安全的吗?为什么不需要锁?
  3. 信号量用有缓冲 channel 实现——和 sync.WaitGroup + worker pool 有什么区别?各自适合什么场景?

# 13.7 综合示例:实现一个限流器

用 channel 实现令牌桶限流器——控制每秒请求数:

package main

import (
    "fmt"
    "time"
)

type RateLimiter struct {
    tokens  chan struct{}   // 令牌
    ticker  *time.Ticker    // 定时放令牌
    done    chan struct{}   // 停止信号
}

// 创建限流器——每秒 rate 个请求
func NewRateLimiter(rate int) *RateLimiter {
    rl := &RateLimiter{
        tokens: make(chan struct{}, rate),  // 桶容量 = 速率
        ticker: time.NewTicker(time.Second / time.Duration(rate)),
        done:   make(chan struct{}),
    }
    // 后台 goroutine——定期补充令牌
    go rl.refill()
    return rl
}

func (rl *RateLimiter) refill() {
    // 初始填满
    for i := 0; i < cap(rl.tokens); i++ {
        rl.tokens <- struct{}{}
    }
    for {
        select {
        case <-rl.ticker.C:
            // 尝试补充一个令牌——桶满了就跳过
            select {
            case rl.tokens <- struct{}{}:
            default:
            }
        case <-rl.done:
            rl.ticker.Stop()
            return
        }
    }
}

// 等待获取一个令牌——阻塞直到有令牌可用
func (rl *RateLimiter) Wait() {
    <-rl.tokens
}

func (rl *RateLimiter) Stop() {
    close(rl.done)
}

func main() {
    limiter := NewRateLimiter(5) // 每秒 5 个请求
    defer limiter.Stop()

    // 模拟 10 个请求——每秒只放行 5 个
    for i := 1; i <= 10; i++ {
        limiter.Wait()  // 获取令牌(阻塞)
        fmt.Printf("[%s] 请求 %d 通过\n",
            time.Now().Format("15:04:05.000"), i)
    }
}
// 输出(间隔约 200ms):
// [10:00:00.000] 请求 1 通过
// [10:00:00.000] 请求 2 通过
// [10:00:00.000] 请求 3 通过
// [10:00:00.000] 请求 4 通过
// [10:00:00.000] 请求 5 通过
// [10:00:00.200] 请求 6 通过
// ...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73

限流器设计要点:

  1. 有缓冲 channel 做令牌桶——容量 = 速率,满了跳过补充(default 分支)
  2. time.Ticker 定期放令牌——精度取决于 ticker 的间隔
  3. Wait() 方法阻塞获取——调用方不需要关心什么时候就绪
  4. stop 机制——通过 done channel 优雅关闭

# 13.8 本章底层原理(简介)

本章涉及的底层机制在**卷三"专栏博客"**中详细展开:

本章概念 卷三对应篇
make(chan T) 创建的 hchan 结构 09.通道 channel 源码剖析
环形缓冲区、sendq/recvq 等待队列 09.通道 channel 源码剖析
select 的随机化实现(runtime.selectgo) 09.通道 channel 源码剖析
channel 与 GMP 调度器的交互 08.GMP 协程调度器机制
close(ch) 的底层状态变更 09.通道 channel 源码剖析

hchan 结构速览(每个 make(chan T) 创建的就是这个):

hchan {
    qcount   uint    // 当前队列中的元素数
    dataqsiz uint    // 环形缓冲区大小(make(chan T, N) 中的 N)
    buf      unsafe.Pointer  // 环形缓冲区指针
    elemsize uint16  // 每个元素大小
    closed   uint32  // 是否已关闭
    sendx    uint    // 发送索引
    recvx    uint    // 接收索引
    recvq    waitq   // 接收等待队列(sudog 链表)
    sendq    waitq   // 发送等待队列(sudog 链表)
    lock     mutex   // 互斥锁
}
1
2
3
4
5
6
7
8
9
10
11
12

卷三第 09 篇会拆解到每个字段、每个 send/recv/close 操作的源码级流程。


# 13.9 Go 新手陷阱 Top 5

# 陷阱 说明
1 关闭一个有发送者还在写的 channel → panic 发送方必须知道"什么时候没人发了"。多发送方用 sync.Once 或协调机制。
2 多个 sender,谁关 channel? 答案:通常由 sender 关,但要协调——用 sync.Once 或引入"协调者" goroutine。
3 用 channel 当 mutex(仅一个槽位)→ 性能不如 sync.Mutex ch <- struct{}{}; ...; <-ch 模拟锁——channel 内部有锁和调度开销,不如 sync.Mutex。
4 for v := range ch 期望某条件后跳出 → 必须 close(ch) 或 break range 只有 close 才退出。条件跳出用 select + ctx.Done() 或 break。
5 非缓冲 channel 在同一 goroutine ch <- 1; <-ch → 死锁 发送方等接收方,接收方等发送方——但都在同一个 goroutine → deadlock。

# 13.10 思考题

  1. channel vs mutex:Go 的谚语是"用通信共享内存"。但 channel 底层也是锁+队列。在哪些场景下用 channel 更好?哪些场景下用 mutex 更好?给出 3 条判断标准。

  2. 无缓冲 channel 的同步开销:每次 ch <- v 和 <-ch 都要做 goroutine 上下文切换——这个开销有多大?在什么情况下"用 channel 比用锁慢"?

  3. select 的随机化实现:Go 保证 select 的 case 是均匀随机的——但它是真随机还是伪随机?如果某个 case 的数据量是另一个的 100 倍——随机化会不会导致那个 case 被"饿死"?

  4. 关闭 channel 的设计选择:Go 设计成"关闭后发送 panic"。为什么不设计成"静默忽略"?如果让你设计,你会怎么选?

  5. 有缓冲 channel 的大小选择:make(chan T, 0) vs make(chan T, 1) vs make(chan T, 100)——这三种容量在语义上有什么不同?什么时候该选 1,什么时候该选 100?


# 13.11 训练题

训练 1:实现一个"超时重试"机制——用 channel + select 组合:

  • 启动一个 goroutine 执行耗时的 doWork()
  • 主 goroutine 等待结果,最多等 3 秒
  • 3 秒内完成 → 打印结果
  • 3 秒超时 → 打印"超时"并退出
  • 使用 select + time.After

训练 2:实现一个"广播器"——一个生产者,多个消费者:

  • 生产者往 channel 发 10 个数字
  • 3 个消费者 goroutine 并行消费——每个数字只被消费一次(不是广播)
  • 用扇出模式实现

训练 3:实现"优雅关闭"——收到 SIGINT(Ctrl+C)后:

  • 不再接受新任务
  • 等正在执行的任务完成
  • 关闭所有 channel
  • 退出程序
  • 使用 os.Signal channel + context.WithCancel

# 13.12 推荐阅读

  • 入门卷:第 12 章 并发 goroutine——goroutine 起步、WaitGroup、context
  • 入门卷:第 14 章 sync 包——Mutex、RWMutex、Once、Pool
  • 卷三:09.通道 channel 源码剖析——hchan 结构、send/recv/close 源码级拆解
  • 卷三:08.GMP 协程调度器机制——goroutine 如何被调度
  • 卷三:16.并发设计模式详解——Pipeline、Fan-out/Fan-in 源码级讲解
  • Go Concurrency Patterns - Rob Pike (opens new window)——经典演讲
  • Go Channels: An Illustrated Guide (opens new window)——图文并茂
上次更新: 2026/06/14, 15:49:50
并发goroutine
同步sync包

← 并发goroutine 同步sync包→

最近更新
01
信号崩溃快速排查
06-15
02
CoreDump破案
06-15
03
perf火焰图实战
06-15
更多文章>
Theme by Vdoing | Copyright © 2019-2026 杨充 | MIT License | 桂ICP备2024034950号 | 桂公网安备45142202000030
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式