编程进阶网 编程进阶网
首页
  • 计算机原理
  • 操作系统
  • 网络协议
  • 数据库原理
  • 面向对象
  • 设计原则
  • 设计模式
  • 系统架构
  • 性能优化
  • 编程原理
  • 方案设计
  • 稳定可靠
  • 工程运维
  • 基础认知
  • 线性结构
  • 树与哈希
  • 工业级实现
  • 算法思想
  • 实战与综合
  • 算法题考核
  • C语言入门
  • C综合案例
  • C专栏博客
  • C标准集库
  • C++入门教程
  • C++综合案例
  • C++专栏博客
  • C++开发技巧
  • Java入门教程
  • Java综合案例
  • Java专栏博客
  • Go入门教程
  • Go综合案例
  • Go专栏博客
  • Go开发技巧
  • JavaScript入门
  • JavaScript高级
  • Android库解读
  • Android专栏
  • Android智能硬件
  • iOS ObjC入门
  • iOS Swift入门
  • iOS入门精通
  • Web之Html手册
  • Web之TypeScript
  • Web之Vue高级进阶
  • Linux之QML入门
  • Linux之QT核心库
  • Linux实践开发
  • Python教程
  • Shell&Bash教程
  • 工具脚本
  • 自动化脚本
  • 质量保障
  • 产品思考
  • 软实力
  • 开发流程
  • Git应用
  • 技术模版
  • 技术规范
  • Markdown
  • Mermaid
  • 开源协议
  • JSON工具
  • 文本工具
  • 图片处理
  • 文档转化
  • 代码压缩
  • 关于我
  • 自我精进
  • 职场管理
  • 职场面试
  • 心情杂货
  • 友情链接

杨充

专注编程 · 终身学习者
首页
  • 计算机原理
  • 操作系统
  • 网络协议
  • 数据库原理
  • 面向对象
  • 设计原则
  • 设计模式
  • 系统架构
  • 性能优化
  • 编程原理
  • 方案设计
  • 稳定可靠
  • 工程运维
  • 基础认知
  • 线性结构
  • 树与哈希
  • 工业级实现
  • 算法思想
  • 实战与综合
  • 算法题考核
  • C语言入门
  • C综合案例
  • C专栏博客
  • C标准集库
  • C++入门教程
  • C++综合案例
  • C++专栏博客
  • C++开发技巧
  • Java入门教程
  • Java综合案例
  • Java专栏博客
  • Go入门教程
  • Go综合案例
  • Go专栏博客
  • Go开发技巧
  • JavaScript入门
  • JavaScript高级
  • Android库解读
  • Android专栏
  • Android智能硬件
  • iOS ObjC入门
  • iOS Swift入门
  • iOS入门精通
  • Web之Html手册
  • Web之TypeScript
  • Web之Vue高级进阶
  • Linux之QML入门
  • Linux之QT核心库
  • Linux实践开发
  • Python教程
  • Shell&Bash教程
  • 工具脚本
  • 自动化脚本
  • 质量保障
  • 产品思考
  • 软实力
  • 开发流程
  • Git应用
  • 技术模版
  • 技术规范
  • Markdown
  • Mermaid
  • 开源协议
  • JSON工具
  • 文本工具
  • 图片处理
  • 文档转化
  • 代码压缩
  • 关于我
  • 自我精进
  • 职场管理
  • 职场面试
  • 心情杂货
  • 友情链接
  • README
  • 性能优化实践

  • 程序编程原理

    • README
    • 序卷方法论

    • 数据的本质

    • 运行时模型

    • 并发的设计

      • README
      • 1.线程前世今生探索
      • 2.并发上下文切换原理
      • 3.线程通信设计思想
        • 1.案例引入
          • 1.1 实时聊天室场景
          • 1.2 完全无通信的代价
          • 1.3 裸共享变量的代价
          • 1.4 加锁与队列的价值
          • 1.5 引出核心矛盾
        • 2.通信设计哲学
          • 2.1 核心设计原则
          • 2.2 通信演进时间线
          • 2.3 共享内存与消息传递的边界
          • 2.4 同步与异步的取舍
          • 2.5 设计决策树
        • 3.通信原理深析
          • 3.1 数学本质:Happens-Before偏序
          • 3.2 三件硬件武器
          • 3.3 同步原语金字塔
          • 3.4 四种通信语义
          • 3.5 设计权衡
        • 4.共享内存模型深度剖析
          • 4.1 互斥的本质与 Mutex 实现
          • 4.2 等待与通知Condition Variable
          • 4.3 信号量及其变种
          • 4.4 内存屏障与原子操作
          • 4.5 读写分离与乐观锁
        • 5.消息传递模型深度剖析
          • 5.1 Channel 的设计本质
          • 5.2 Go channel 源码级剖析
          • 5.3 Actor 模型与 Erlang
          • 5.4 CSP vs Actor 对比
          • 5.5 异步消息与背压
        • 6.Java 线程通信全景
          • 6.1 wait/notify与Object Monitor
          • 6.2 Lock + Condition
          • 6.3 BlockingQueue
          • 6.4 CountDownLatch/Barrier/Phaser
          • 6.5 Semaphore
          • 6.6 Future与CompletableFuture
          • 6.7 通信场景全表
        • 7.C++/Go/Rust 通信对比
          • 7.1 C++:贴着 OS 的细粒度控制
          • 7.2 Go:channel 一统天下
          • 7.3 Rust:所有权防数据竞争
          • 7.4 横向对比总表
        • 8.经典陷阱与反模式
          • 8.1 锁陷阱
          • 8.2 条件变量陷阱
          • 8.3 channel 陷阱
          • 8.4 调试与定位
        • 9.一句话总结
          • 三个层次的认知升华
          • 终极建议
          • 四个关键收获
          • 延伸阅读
        • 9.案例驱动的设计深入
          • 9.1 案例:生产事故看选型错误
          • 9.2 案例:BlockingQueue vs Disruptor
          • 9.3 案例:Go channel关闭模式
        • 10.通信演化史与一句话总结
          • 10.1 通信范式的螺旋上升
          • 10.2 设计哲学三大流派
          • 10.3 一句话总结
      • 4.线程异常设计原理
      • 5.多线程并发经典案例
      • 6.并发Bug源头由来
      • 7.并发编程设计思想
      • 8.并发编程安全设计
      • 9.锁核心设计和思想
      • 10.理解CAS设计由来
      • 11.异步和同步的设计
      • 12.单线程模型的思想
      • 13.协程核心设计思想
      • 14.Actor与CSP并发模型
      • 15.线程池的设计思想
      • 16.线程池设计核心原理
      • 17.线程池使用技巧
      • 18.结构化并发设计思想
    • 内存的真相

    • 交互和系统

  • 稳定性与可靠性

  • 工程化与运维

  • 方案设计思想

  • 专栏
  • 程序编程原理
  • 并发的设计
杨充
2025-07-12
目录

3.线程通信设计思想

# 12.线程通信设计思想

📍 本篇位置:第 3 卷 · 并发之道 · 第 2 篇 🎯 核心矛盾:多个执行流必须协作完成任务 vs 它们各自独立运行、互不可见 —— 不通信就没意义,乱通信就出 bug,慢通信就废了并发的价值 🧭 设计灵魂:通信只有两条路——共享内存 + 同步原语(Java/C++ 的主流,快但易错)vs 消息传递 + Channel/Mailbox(Go/Erlang 的主流,慢但易写);现代语言的最高境界是"写起来像消息传递,跑起来像共享内存" 🌐 跨语言覆盖:Java(wait/notify + AQS + BlockingQueue) · POSIX(pthread_mutex/cond + futex) · C++(condition_variable + atomic + barrier) · Go(channel + select) · Rust(mpsc + ownership) · Erlang(Actor mailbox) 🔗 延伸阅读:← 11.线程前世今生探索 · → 13.线程异常设计原理 · → 15.并发编程设计思想 · → 18.锁核心设计和思想 · → 19.AQS核心思想揭秘 · → 23.协程核心设计思想 · → 24.Actor与CSP并发模型

flowchart TB
    A[多个执行流<br/>各自独立<br/>需要协作] --> B{怎么传递信息}
    B --> C1[共享内存派<br/>变量 + 锁/屏障]
    B --> C2[消息传递派<br/>channel/mailbox]
    C1 --> D1[优势:性能高<br/>劣势:易死锁/数据竞争]
    C2 --> D2[优势:易推理<br/>劣势:拷贝开销]
    D1 & D2 --> E[现代趋势<br/>不要用共享内存来通信<br/>而要通过通信共享内存]
    style E fill:#d4edda
1
2
3
4
5
6
7
8

# 目录介绍

  • 1.案例引入
    • 1.1 实时聊天室场景
    • 1.2 完全无通信的代价
    • 1.3 裸共享变量的代价
    • 1.4 加锁与队列的价值
    • 1.5 引出核心矛盾
  • 2.通信设计哲学
    • 2.1 核心设计原则
    • 2.2 通信演进时间线
    • 2.3 共享内存与消息传递的边界
    • 2.4 同步与异步的取舍
    • 2.5 设计决策树
  • 3.通信原理深析
    • 3.1 数学本质:Happens-Before偏序
    • 3.2 三件硬件武器
    • 3.3 同步原语金字塔
    • 3.4 四种通信语义
    • 3.5 设计权衡
  • 4.共享内存模型深度剖析
    • 4.1 互斥的本质与 Mutex 实现
    • 4.2 等待与通知Condition Variable
    • 4.3 信号量及其变种
    • 4.4 内存屏障与原子操作
    • 4.5 读写分离与乐观锁
  • 5.消息传递模型深度剖析
    • 5.1 Channel 的设计本质
    • 5.2 Go channel 源码级剖析
    • 5.3 Actor 模型与 Erlang
    • 5.4 CSP vs Actor 对比
    • 5.5 异步消息与背压
  • 6.Java 线程通信全景
    • 6.1 wait/notify与Object Monitor
    • 6.2 Lock + Condition
    • 6.3 BlockingQueue
    • 6.4 CountDownLatch/Barrier/Phaser
    • 6.5 Semaphore
    • 6.6 Future与CompletableFuture
    • 6.7 通信场景全表
  • 7.C++/Go/Rust 通信对比
    • 7.1 C++:贴着 OS 的细粒度控制
    • 7.2 Go:channel 一统天下
    • 7.3 Rust:所有权防数据竞争
    • 7.4 横向对比总表
  • 8.经典陷阱与反模式
    • 8.1 锁陷阱
    • 8.2 条件变量陷阱
    • 8.3 channel 陷阱
    • 8.4 调试与定位
  • 9.一句话总结

# 1.案例引入

# 1.1 实时聊天室场景

场景设定:你正在写一个实时聊天室服务器,100 个客户端同时在线,每个客户端可以发送消息,服务器需要把每条消息广播给其他 99 个客户端。架构上自然分成两类线程:

┌─── 接收线程组(100 个,每个对应一个客户端)───┐
│   accept message from socket                │
│         ↓                                    │
│   put into ??? for broadcast                │
└──────────────────────────────────────────────┘
                  ↓
┌─── 广播线程组(少量几个)─────────────────────┐
│   take message from ???                     │
│   send to all other 99 clients              │
└──────────────────────────────────────────────┘
1
2
3
4
5
6
7
8
9
10

中间那个 ???——就是线程通信要解决的问题。100 个接收线程要把消息"传递"给广播线程,让广播线程"知道"有新消息要处理。这看似简单——但下面会看到,怎么设计这个 ???,决定了整个聊天室的对错和性能。

# 1.2 完全无通信的代价

最朴素的想法:广播线程"自己去看"接收线程的状态。

// 接收线程把消息存自己的 buffer
class Receiver {
    String latestMessage = null;     // 普通字段
    
    public void onMessage(String msg) {
        latestMessage = msg;          // 收到就放这里
    }
}

// 广播线程一直轮询
class Broadcaster {
    void run() {
        while (true) {
            for (Receiver r : receivers) {
                if (r.latestMessage != null) {
                    broadcast(r.latestMessage);
                    r.latestMessage = null;
                }
            }
            // 不 sleep?CPU 100%
            // sleep?延迟高
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

这种"无通信"写法埋了 5 颗大雷:

  • 雷一:可见性问题。latestMessage 是普通字段,接收线程写入后,广播线程可能永远看不到——因为 CPU 缓存让两个核看到的值不一致
  • 雷二:竞态条件。如果接收线程刚写入 msg1,立刻又写入 msg2,广播线程可能两条都没读到(被覆盖了)
  • 雷三:忙轮询烧 CPU。广播线程不停扫描 100 个 receiver,CPU 永远占满,整机性能崩
  • 雷四:sleep 加大延迟。改成 Thread.sleep(10),CPU 是省了,但消息广播至少延迟 10ms
  • 雷五:撕裂写。如果 latestMessage 是个对象,接收线程"半构造"时被广播线程读到,会拿到不完整数据

这就是为什么"通信"这件事必须用专门的原语来做——你不能指望"两个线程靠观察对方的内存就能协作"。多核 CPU 的物理特性(缓存、乱序、写缓冲)让这条路从根上就不可走。

小结:完全不用同步原语的"通信",本质上是让程序员独自对抗 CPU 的所有底层优化——绝大多数情况下,这场对抗你赢不了。

# 1.3 裸共享变量的代价

升级版:用 volatile 解决可见性,再加个标志:

class Receiver {
    volatile String latestMessage = null;     // ✅ 加了 volatile
    volatile boolean hasNew = false;
    
    public void onMessage(String msg) {
        latestMessage = msg;
        hasNew = true;
    }
}

class Broadcaster {
    void run() {
        while (true) {
            for (Receiver r : receivers) {
                if (r.hasNew) {                // 看起来对了?
                    String msg = r.latestMessage;
                    r.hasNew = false;
                    broadcast(msg);
                }
            }
            Thread.sleep(1);                   // 还是要 sleep
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

volatile 解决了可见性,但还有 4 个新问题:

  • 问题一:仍有竞态。hasNew = true 和 latestMessage = msg 不是原子的——广播线程可能看到 hasNew=true 但 latestMessage 还是旧值
  • 问题二:消息丢失。接收线程连续两次 onMessage,第一条还没被消费就被覆盖
  • 问题三:忙轮询依然存在。volatile 只解决可见性,没解决"如何高效等待"——广播线程还得不停扫描
  • 问题四:扩展性差。100 个接收线程对应 100 个 receiver——广播线程要扫 100 遍,规模一大就崩

根因:volatile 是最低级的通信原语——它只保证"我写的另一个线程能看到",但不保证"原子读改写",更不提供"高效等待"。这就像两个人能听见对方说话,但不知道何时该听、何时该说。

小结:volatile 解决了"能不能听到",但通信的核心问题是"何时听、听完干什么、听不到时该怎么等"——后面这一整套,必须靠更高层的原语(锁、条件变量、队列)才能解决。

# 1.4 加锁与队列的价值

最终方案:用阻塞队列。

class ChatServer {
    // 一个线程安全的阻塞队列:100 个接收线程都往这里塞
    BlockingQueue<String> messageQueue = new LinkedBlockingQueue<>();
    
    // 接收线程
    void onMessage(String msg) {
        messageQueue.put(msg);                   // 满了自动阻塞
    }
    
    // 广播线程
    void broadcastLoop() {
        while (true) {
            String msg = messageQueue.take();    // 空了自动阻塞,省 CPU
            for (Client c : clients) c.send(msg);
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

这一改造解决了之前所有问题:

问题 之前 改造后
可见性 volatile 才能解决 put/take 内部隐式 happens-before
竞态条件 多字段无法原子 put/take 内部加锁,原子操作
消息丢失 标志位会被覆盖 队列 FIFO,零丢失
忙轮询 sleep 导致延迟 vs CPU take 内部 futex_wait,零 CPU 消耗 + 零延迟唤醒
扩展性 100 个 receiver 要扫 一个队列搞定全部

性能对比(100 客户端 × 1000 msg/s 测试):

方案 广播线程 CPU 平均延迟 消息丢失
volatile + 轮询 + sleep(10ms) ~80% ~12 ms 高
volatile + 忙轮询 100% ~50 μs 中
BlockingQueue < 5% < 100 μs 零

这就是线程通信原语的真正价值——把"通信"这件事的正确性、效率、简洁性三者一起优化到位。一行 messageQueue.take() 替代了之前几十行充满坑的代码。

小结:线程通信的价值不只是"能传消息",而是用很少的代码,同时解决可见性、原子性、有效等待、有序性这 4 个问题。BlockingQueue/Channel 就是这种价值的典型体现。

# 1.5 引出核心矛盾

把 1.2、1.3、1.4 三种方案放一起:

维度 无通信 volatile 共享 BlockingQueue
正确性 ❌ 全错 部分正确 ✅
CPU 占用 100% 100% / 高延迟 接近 0
代码复杂度 看似简单实则不能用 仍复杂 简单
扩展性 差 差 好
延迟 不可控 不可控 < 100 μs

线程通信设计的核心矛盾就是这张表——正确性、性能、简洁性三者很难同时达到。但好的通信原语能在三者之间找到最佳平衡。

flowchart LR
    A[硬件诉求<br/>多核间隔离<br/>缓存独立] --> C[通信设计的核心问题]
    B[业务诉求<br/>多线程协作<br/>顺序简单] --> C
    C --> D1[问题一<br/>可见性<br/>我写的别人能看到吗]
    C --> D2[问题二<br/>原子性<br/>多字段同时改]
    C --> D3[问题三<br/>有序性<br/>谁先谁后]
    C --> D4[问题四<br/>等待效率<br/>不烧CPU怎么等]
    style C fill:#fff3cd
1
2
3
4
5
6
7
8

全文要回答的就是这 4 个子问题:从硬件 CAS 到 OS futex,从 Java 的 wait/notify 到 Go 的 channel——半个世纪的工业实践,都在打磨"线程之间该怎么说话"。


# 2.通信设计哲学

# 2.1 核心设计原则

回到第 1 章那个聊天室——好的通信原语都遵循统一原则。先看一段反例(这是早期 Java 1.0 的设计):

// Java 1.0 的设计:所有 Object 自带 wait/notify
class SharedBuffer {
    int data;
    boolean hasData;
    
    public synchronized void produce(int value) throws InterruptedException {
        if (hasData) wait();              // ❌ 用 if 而非 while
        data = value;
        hasData = true;
        notify();                         // ❌ notify 而非 notifyAll,可能错唤醒
    }
}
1
2
3
4
5
6
7
8
9
10
11
12

这段代码在 Java 1.0 时代被无数书"作为标准答案",但它至少有 4 个坑:

  • 用 if 不防虚假唤醒(spurious wakeup)
  • notify() 可能唤醒生产者而非消费者
  • 没有公平性保证
  • 一个监视器只能挂一个等待队列

后来 Java 5 引入 Lock + Condition 才彻底解决这些问题。这个反例和它的演化告诉我们三条铁律:

flowchart TD
    A[通信设计哲学] --> B[原子完整原则]
    A --> C[最小通知原则]
    A --> D[失败安全原则]
    
    B --> B1[一次通信传完整状态<br/>不要拆成多个字段]
    B --> B2[hasData + data 拆开 → 错<br/>用一个 BlockingQueue → 对]
    
    C --> C1[只唤醒真正能继续的线程<br/>多 Condition 替代 notifyAll]
    C --> C2[避免惊群效应]
    
    D --> D1[等待要带 while<br/>防虚假唤醒]
    D --> D2[传输要不丢失<br/>哪怕异常也能保证不漏]
1
2
3
4
5
6
7
8
9
10
11
12
13
  • 原子完整原则:通信传递的应该是"一份完整、不可拆的状态"——把"标志位 + 数据"拆成两个字段是反模式,正确做法是用 BlockingQueue 这种整体原子的容器
  • 最小通知原则:唤醒应该精准——只唤醒"真正能继续工作"的线程。notify() 比 notifyAll() 高效但容易错唤醒;Lock + 多 Condition 才是工程级答案
  • 失败安全原则:通信代码必须考虑"异常路径"——while 循环防虚假唤醒、try-finally 释放锁、超时机制兜底死锁

小结:通信不是"把数据从 A 复制到 B"那么简单,而是在多核 CPU 的混乱中,用最小代价构建一条"原子、有序、可见、能等"的通道。这三条原则是从无数生产事故中沉淀下来的工程基线。

# 2.2 通信演进时间线

通信原语经过半个世纪迭代,每一步都解决了前一步的问题:

timeline
    title 线程/进程通信演进史
    section 1960s-70s 萌芽期
        Dijkstra 信号量 : 1965 提出 P/V 原语<br/>通信原语之祖
        Hoare Monitor : 1974 提出监视器<br/>条件变量诞生
        Hoare CSP : 1978 提出 Communicating Sequential Processes<br/>消息传递理论奠基
    section 1980s 共享内存时代
        Mach IPC : 微内核消息传递
        SysV IPC : Unix 共享内存/信号量/消息队列
        POSIX threads : pthread_mutex / cond
    section 1990s 标准化
        Java 1.0 synchronized : 1995 对象监视器进入主流语言
        Linux futex : 2002 用户态 + 内核态混合<br/>性能数量级提升
    section 2000s 高级抽象
        Doug Lea AQS : 2004 Java 5 引入 j.u.c<br/>BlockingQueue 等大量工具
        Erlang OTP : Actor 模型工业级
    section 2010s 消息传递崛起
        Go channel : 2009 CSP 理论落地<br/>chan 关键字
        Disruptor : 2011 LMAX 无锁队列<br/>性能极限挑战
        Kotlin channel : 协程 + channel 组合
    section 2020s 异步统一
        Java VirtualThread : 同步代码自动异步
        Rust async/await : 零成本异步通信
        Project Loom : Java 结构化并发
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

关键转折点的历史动机:

年份 事件 解决的痛点
1965 Dijkstra 信号量 第一个通用同步原语,P/V 操作至今未变
1974 Hoare Monitor 信号量难写易错,引入"自动加锁"的高级抽象
1978 Hoare CSP 论文 共享内存难推理,提出"通过通信共享"
2002 Linux futex pthread_mutex 进内核太慢,做了"快路径在用户态"的优化
2004 Java AQS wait/notify 太低级,提供统一框架支撑高级工具
2009 Go goroutine + chan 工业界第一次让 CSP 成为主流语言一等公民
2011 Disruptor mutex 在百万 QPS 下崩,需要无锁高吞吐方案

演化的总方向:

1960s: 信号量 (低级、难用、易错)
   ↓
1974: Monitor (高级、自动加锁,但单语言绑定)
   ↓
1978: CSP (理论上更优,但要 30 年才工业落地)
   ↓
2009: Go channel (CSP 终于落地)
   ↓
2020s: 虚拟线程 + 结构化并发 (统一同步异步)
1
2
3
4
5
6
7
8
9

# 2.3 共享内存与消息传递的边界

通信设计的根本分叉点:两个线程交换数据,应该共享同一块内存?还是把数据拷贝过去?

flowchart LR
    A[共享内存派<br/>Java/C++/C#] -->|直接读写共享变量| B[互斥锁/条件变量/原子操作]
    C[消息传递派<br/>Go/Erlang] -->|拷贝数据 + 同步隐含| D[Channel/Mailbox]
    
    B --> E[优势:零拷贝、最快<br/>劣势:易错、难调试]
    D --> F[优势:易推理、天然安全<br/>劣势:拷贝开销]
    
    style B fill:#f8d7da
    style D fill:#d4edda
1
2
3
4
5
6
7
8
9

两者的本质区别:

维度 共享内存 消息传递
数据所有权 共享,无主 单一所有者,传递所有权
同步机制 显式(锁、屏障) 隐含(send/recv 自带 HB)
错误倾向 数据竞争 / 死锁 消息丢失 / 通道死锁
性能 高(零拷贝) 中(拷贝/调度开销)
调试难度 高(heisenbug 多) 中(消息流可追踪)
扩展到多机 不可能 自然(消息可跨网络)

共享内存的"快"代价是什么?——心智负担。每次写共享变量都要问自己:

我加锁了吗?
锁的范围对吗?
其他线程能看到我的写入吗?(可见性)
其他线程读到的是完整的吗?(原子性)
代码的执行顺序和我想的一样吗?(有序性)
万一异常,锁能正确释放吗?
1
2
3
4
5
6

而消息传递的"慢"换来了什么?——可推理性。channel.send(x) 这一行就告诉你:

我把 x 完整地、原子地、有序地、安全地交给了对方
我不用管对方的内存模型
我不用管 CPU 缓存
我不用管编译器重排
1
2
3
4

Go 的著名格言精准捕捉了这个权衡:

"Don't communicate by sharing memory; share memory by communicating."

(不要通过共享内存来通信,而要通过通信来共享内存)

实战中的"混合策略":

// Go 标准做法:channel 主导,atomic/mutex 辅助
type Worker struct {
    inbox  chan Task        // 主通信通道(消息传递)
    cache  sync.Map         // 共享缓存(共享内存)
    count  atomic.Int64     // 统计计数(共享内存)
}
1
2
3
4
5
6
  • 跨"线程边界" 用 channel:天然安全,复杂度低
  • 线程内的高频小操作(计数、缓存)用 atomic / Mutex:性能优先

# 2.4 同步与异步的取舍

通信原语的另一个关键维度:发送者发完消息后,要不要等接收者收到?

维度 同步通信 异步通信
代表 无缓冲 channel、Future.get 缓冲 channel、消息队列
发送行为 阻塞,直到对方收到 立即返回(如果缓冲未满)
耦合度 高(双方时序绑定) 低(双方解耦)
背压 自然形成 需要显式做
典型场景 RPC、严格顺序 日志、事件、扇出处理

同步通信的代码长这样:

ch := make(chan int)        // 无缓冲
go func() {
    ch <- 42                 // 阻塞,直到有人 take
}()
value := <-ch                // 立即取到 42
1
2
3
4
5

异步通信的代码长这样:

ch := make(chan int, 1000)   // 缓冲 1000
go func() {
    ch <- 42                 // 立即返回(缓冲未满)
}()
// 后续可能很久才有人取
value := <-ch
1
2
3
4
5
6

同步 vs 异步的工程权衡:

  • 同步适合"发送者必须知道结果"的场景——比如 RPC 调用、事务提交
  • 异步适合"发送者只管发出去"的场景——比如日志、通知、事件处理
  • 永远不能用异步消除背压——异步会把"压力"从发送者转嫁到队列,队列不是无限的
  • 背压必须显式设计:缓冲区满了应该 drop?block?回退?这是个工程决策

# 2.5 设计决策树

flowchart TD
    A[要设计/选择通信方式] --> B{数据量?}
    B -->|单值/标志| C[atomic / volatile]
    B -->|结构化| D{发送频率?}
    
    D -->|高频| E[Channel/Queue<br/>异步缓冲]
    D -->|低频/事件型| F[Condition Variable<br/>wait/signal]
    
    A --> G{跨进程吗?}
    G -->|否| H[语言原生原语<br/>BlockingQueue/chan]
    G -->|是| I{跨机器吗?}
    
    I -->|否| J[共享内存 / Unix socket<br/>本地 IPC]
    I -->|是| K[MQ/RPC/HTTP<br/>网络通信]
    
    A --> L{要严格顺序?}
    L -->|是| M[同步阻塞 channel<br/>or Future]
    L -->|否| N[异步队列<br/>+ 背压策略]
    
    style E fill:#d4edda
    style H fill:#d4edda
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

实战经验三原则:

  1. 能用消息传递就别共享内存:默认用 channel/queue,只在性能不够时才考虑共享变量+锁
  2. 能用高级原语就别裸操底层:BlockingQueue > Lock+Condition > wait/notify > volatile+CAS
  3. 粒度要刚好匹配场景:用 synchronized 包整个方法 vs 用 AtomicLong 加一个计数,性能差 100 倍

# 3.通信原理深析

# 3.1 数学本质:Happens-Before偏序

并发程序中,多个线程的操作形成一个事件集合。如果没有任何同步,这些事件之间没有确定的先后顺序——它们是"并发的"。

线程通信的本质就是在这个无序集合中人为插入顺序约束,形式化地说:

建立 Happens-Before 偏序关系:如果 A happens-before B,则 A 的所有内存效果对 B 可见。

无同步:    A₁  A₂  A₃          B₁  B₂  B₃
           (两组事件之间无序,结果不确定)

加同步后:  A₁  A₂  A₃ ──HB──→ B₁  B₂  B₃
           (A₃ happens-before B₁, A的所有写对B可见)
1
2
3
4
5

任何编程语言的任何同步原语,做的事情都且仅是:在两个线程的操作之间插入一条 HB 边。 区别只是插入方式不同。

Java JMM 规定的 Happens-Before 规则(JSR-133):

规则 含义
程序顺序规则 单线程内,前面的语句 HB 后面的语句
管程锁规则 解锁 HB 后续对同一锁的加锁
volatile 规则 volatile 写 HB 后续 volatile 读
线程启动规则 start() HB 子线程的所有操作
线程终结规则 子线程内所有操作 HB join() 返回
传递性 A HB B,B HB C,则 A HB C

HB 关系的工程意义:

// Java 中正确的 "传递数据" 模式
class Holder {
    volatile boolean ready = false;
    int data = 0;
}

// 线程 A:写
holder.data = 42;          // ① 普通写
holder.ready = true;       // ② volatile 写

// 线程 B:读
if (holder.ready) {        // ③ volatile 读
    int x = holder.data;   // ④ 一定能读到 42
}

// 为什么 ④ 能读到 42?
// 因为:① HB ②(程序顺序)
//      ② HB ③(volatile 规则)
//      ③ HB ④(程序顺序)
//      所以 ① HB ④(传递性)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

这就是"happens-before"在工程中的真实价值——它给你一种推理工具,让你在不看汇编的情况下判断多线程代码是否正确。

# 3.2 三件硬件武器

剥到最底层,CPU 提供给操作系统和语言设计者的全部工具只有三样:

武器 1:原子指令(Atomic RMW)

cmpxchg [addr], expected, desired   ; Compare-And-Swap
lock xadd [addr], val               ; Fetch-And-Add
xchg    [addr], val                 ; Atomic Exchange
1
2
3

一条不可被打断的指令完成"读-改-写"。这是所有锁、信号量、无锁结构的基石。

武器 2:内存屏障(Memory Barrier / Fence)

mfence    ; 全屏障:之前的读写全部完成后,才能执行之后的读写
lfence    ; 读屏障
sfence    ; 写屏障
1
2
3

禁止 CPU 重排序 + 强制缓存刷新。这是"可见性"的保证。

武器 3:阻塞/唤醒(Futex 等 OS 原语)

futex_wait(addr, expected)   // 如果 *addr==expected,让线程睡眠
futex_wake(addr, n)          // 唤醒 n 个在 addr 上睡眠的线程
1
2

让线程主动让出 CPU(而非忙等),这是效率的关键。

三者的关系:

                    原子指令 ─── 保证操作的不可分割性
                        │
全部同步机制 ═══════════╪═══ 内存屏障 ─── 保证操作的可见性和有序性
                        │
                    阻塞/唤醒 ─── 保证等待的效率(不烧CPU)
1
2
3
4
5

futex 是 Linux 通信性能的转折点——2002 年之前,Linux 的 pthread_mutex 每次加锁都要进内核(即使没竞争),性能很差。futex(Fast User-space muTEX)的设计是:

// 加锁(伪代码)
void mutex_lock(int* lock) {
    if (atomic_cmpxchg(lock, 0, 1) == 0) {
        return;                          // 快路径:用户态 CAS 成功,无 syscall
    }
    // 慢路径:有竞争,进内核等待
    while (true) {
        if (atomic_cmpxchg(lock, 0, 1) == 0) return;
        futex_wait(lock, 1);             // 进内核睡
    }
}

// 解锁
void mutex_unlock(int* lock) {
    atomic_store(lock, 0);
    if (有人在等)                          // 这个判断也是无 syscall 的
        futex_wake(lock, 1);             // 仅在有人等时进内核唤醒
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

关键点:99% 无竞争的场景,加锁/解锁完全在用户态完成——只有真正发生竞争时才进内核。这让 mutex 的"无竞争开销"从微秒级降到了纳秒级(~25ns)。

# 3.3 同步原语金字塔

所有语言的同步原语都是同一棵树:

                        ┌─────────────────────┐
                        │  语义意图 (What)      │
                        │  "等数据" "互斥" "通知" │
                        └──────────┬──────────┘
                                   │
          ┌──────────┬──────────┬──┴───┬──────────┬──────────┐
          │          │          │      │          │          │
       Mutex    Semaphore   Channel  Barrier   Future   RWLock
       (互斥)   (计数)     (传递)   (汇合)   (异步结果) (读写分离)
          │          │          │      │          │          │
          └──────────┴──────────┴──┬───┴──────────┴──────────┘
                                   │
                        ┌──────────┴──────────┐
                        │  同步协议 (How)       │
                        │  Happens-Before 关系  │
                        └──────────┬──────────┘
                                   │
                    ┌──────────────┼──────────────┐
                    │              │              │
               原子指令         内存屏障       阻塞/唤醒
               (CAS)          (Fence)        (Futex)
                    │              │              │
                    └──────────────┼──────────────┘
                                   │
                        ┌──────────┴──────────┐
                        │  硬件 (Mechanism)     │
                        │  缓存一致性协议(MESI) │
                        │  CPU流水线 + 总线锁   │
                        └─────────────────────┘
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

每种语言只是在这棵树的不同层次切了一刀,把下面的复杂性封装起来:

语言 暴露的抽象层 设计哲学
C/C++ 最底层——atomic + memory_order 直接映射到屏障 给你全部控制权,后果自负
Java 中间层——JMM 定义 HB 规则,volatile/synchronized 语言规范定义内存模型,屏蔽硬件差异
Go 高层——channel 为主,sync.Mutex 为辅 "用消息传递代替共享内存"
Rust 底层+类型系统——atomic + Send/Sync trait 编译期保证不会数据竞争
Erlang 最高层——actor 消息传递,无共享状态 进程隔离,物理上不可能竞争

# 3.4 四种通信语义

所有通信场景,抽象到极致只有四种语义:

语义 1:互斥(Mutual Exclusion)——"同一时刻只能一个人进"

本质: 将并发操作序列化
实现: CAS抢锁 + 抢不到就 futex 睡
代表: mutex, synchronized, Lock
1
2
3

语义 2:等待/通知(Wait/Signal)——"条件满足了叫我"

本质: 一个线程阻塞,另一个线程改变条件后唤醒它
实现: futex_wait(条件不满足就睡)+ futex_wake(条件满足就唤醒)
代表: condition_variable, wait/notify, channel recv
1
2
3

语义 3:数据传递(Data Transfer)——"把这个给你"

本质: 将数据的可见性从一个线程传播到另一个线程
实现: 写入数据 + release 屏障 → acquire 屏障 + 读取数据
代表: channel send/recv, pipe write/read, future set/get
1
2
3

语义 4:可见性发布(Publication)——"我改好了,你可以看了"

本质: 一个线程的写入对另一个线程变得可见
实现: atomic store(release) → atomic load(acquire)
代表: volatile, atomic, memory_order_release/acquire
1
2
3

它们的嵌套关系:

互斥 = 可见性发布(锁状态) + 等待/通知(抢不到就等)
Channel = 互斥(保护缓冲区) + 等待/通知(空则等/满则等) + 数据传递
Future = 可见性发布(结果) + 等待/通知(结果未就绪就等)
Barrier = 可见性发布(计数器) + 等待/通知(最后一个到达时唤醒全部)
1
2
3
4

理解了这四种语义就理解了所有通信原语——以后看到任何同步工具,问自己:它在做哪几种语义的组合?

# 3.5 设计权衡

所有语言在设计线程通信时,面对的核心权衡只有两个:

权衡 1:忙等(Spinning) vs 阻塞(Blocking)

忙等: while (!ready) {}          // 不放弃CPU,延迟极低,但烧CPU
阻塞: futex_wait(&ready, false)  // 放弃CPU,省能耗,但唤醒有延迟(~微秒)

实际做法: 自适应——先短暂自旋,超时后再阻塞
         (Java的synchronized, Linux的mutex都是这么做的)
1
2
3
4
5

自旋的合理性:如果锁的持有时间 < 一次上下文切换(~1μs),那自旋反而更划算。Linux mutex、JVM synchronized 都用 adaptive spinning——先自旋几次(~30 次),不行再陷入内核。

权衡 2:共享(Sharing) vs 隔离(Isolation)

共享: 线程直接读写同一块内存 → 快,但需要同步,容易出bug
隔离: 线程各自持有数据,通过消息交换 → 慢(拷贝开销),但天然安全

C++/Java: 默认共享,程序员负责同步
Go:       鼓励隔离(channel),允许共享(sync.Mutex)
Rust:     默认隔离(ownership),编译器检查共享是否安全
Erlang:   强制隔离,物理上无法共享
1
2
3
4
5
6
7

# 4.共享内存模型深度剖析

# 4.1 互斥的本质与 Mutex 实现

Mutex 的核心抽象——把"多个线程并发访问"序列化成"一个接一个访问"。

Mutex 实现的 4 个层次:

flowchart TD
    A[Mutex 实现层次] --> B[① 完全自旋锁<br/>spinlock]
    A --> C[② 完全阻塞锁<br/>纯 syscall]
    A --> D[③ 自适应锁<br/>先自旋后阻塞]
    A --> E[④ futex 优化<br/>无竞争零开销]
    
    B --> B1[OS 内核短临界区使用<br/>用户态浪费 CPU]
    C --> C1[每次都进内核<br/>无竞争场景太慢]
    D --> D1[平衡 CPU 和延迟<br/>JVM synchronized 在用]
    E --> E1[Linux pthread_mutex<br/>无竞争 ~25ns]
    
    style E fill:#d4edda
1
2
3
4
5
6
7
8
9
10
11
12

futex-based Mutex 的快路径(Linux glibc pthread_mutex 简化):

// 加锁:99% 场景一条 CAS 搞定
int mutex_lock(pthread_mutex_t* m) {
    if (atomic_cmpxchg(&m->state, UNLOCKED, LOCKED) == UNLOCKED) {
        return 0;                              // 快路径:用户态成功
    }
    // 慢路径
    return mutex_lock_slow(m);
}

int mutex_lock_slow(pthread_mutex_t* m) {
    int spin = 30;
    while (spin-- > 0) {                       // 先自适应自旋
        if (atomic_cmpxchg(&m->state, UNLOCKED, LOCKED) == UNLOCKED) 
            return 0;
        cpu_pause();
    }
    // 自旋失败,标记为有等待者
    atomic_store(&m->state, LOCKED_CONTENDED);
    while (true) {
        futex_wait(&m->state, LOCKED_CONTENDED);  // 进内核等待
        if (atomic_cmpxchg(&m->state, UNLOCKED, LOCKED_CONTENDED) == UNLOCKED) 
            return 0;
    }
}

// 解锁
void mutex_unlock(pthread_mutex_t* m) {
    int prev = atomic_exchange(&m->state, UNLOCKED);
    if (prev == LOCKED_CONTENDED) {            // 有人在等
        futex_wake(&m->state, 1);              // 唤醒一个
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32

关键性能数据(x86_64 Linux):

操作 耗时
无竞争加锁 ~25 ns
无竞争解锁 ~10 ns
有竞争(自旋成功) ~100-500 ns
有竞争(陷入内核) ~1-3 μs
跨核唤醒 ~5-10 μs

Mutex 设计的关键决策:

  • 公平 vs 非公平:FIFO 公平锁防止饥饿但慢,非公平锁吞吐高但有线程被饿死风险
  • 可重入 vs 不可重入:可重入支持递归调用(同一线程多次加锁),代价是要记 owner
  • 错误检查 vs 普通:错误检查模式能检测重复 unlock 等错误,但慢一点

Java synchronized vs ReentrantLock:

维度 synchronized ReentrantLock
基础实现 对象头 markword + Monitor AQS + futex
可中断 ❌ ✓ lockInterruptibly()
超时 ❌ ✓ tryLock(timeout)
公平性 非公平 可选
多 Condition 只有一个 wait set 多个 newCondition()
写法 简单 必须 try-finally

# 4.2 等待与通知Condition Variable

Condition Variable 的核心问题:线程"等待某个条件成立"时,不要忙等——但又要在条件变化时被精确唤醒。

没有 condvar 时:
while (!condition) {
    // 怎么等?
    sleep(1);   // 太慢,且不知道睡多久
    yield();    // 没用,调度器还是会回来
    // 都不对
}

有 condvar 时:
mutex.lock();
while (!condition) {
    condvar.wait(mutex);   // 原子地:释放锁 + 阻塞 + 醒来后重新加锁
}
mutex.unlock();

// 另一个线程修改条件后:
mutex.lock();
condition = true;
condvar.signal();          // 唤醒一个等待者
mutex.unlock();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

condvar.wait 的"原子三步" 是关键设计:

// 必须是不可分割的:释放锁 → 阻塞 → 重新加锁
// 如果"释放锁"和"阻塞"之间被打断,可能错过 signal → 永远不被唤醒(lost wakeup)

// pthread_cond_wait 简化实现
int pthread_cond_wait(pthread_cond_t* cv, pthread_mutex_t* mu) {
    int seq = atomic_load(&cv->seq);     // 记录当前 signal 序号
    pthread_mutex_unlock(mu);            // 释放锁
    futex_wait(&cv->seq, seq);           // 如果序号没变就睡
                                          // futex_wait 是原子的:
                                          // "比较 + 睡" 之间不会被打断
    pthread_mutex_lock(mu);              // 醒来重新加锁
    return 0;
}

int pthread_cond_signal(pthread_cond_t* cv) {
    atomic_add(&cv->seq, 1);             // 序号 +1
    futex_wake(&cv->seq, 1);             // 唤醒一个
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

核心设计要点:用 seq 序号而不是布尔值——避免 ABA 问题,且支持多次 signal 累计。

为什么必须用 while 而不是 if?三个原因:

  1. 虚假唤醒(Spurious Wakeup):操作系统可能在没有 signal 的情况下唤醒等待者(POSIX 规范允许)
  2. 多消费者竞争:3 个消费者等数据,1 个 signal 唤醒了第一个,第二个被唤醒后看到没数据,必须再次等
  3. 唤醒到加锁之间状态可能变化:signal 后另一个线程可能抢先消费了数据
// ❌ 错误:用 if
if (!has_data) condvar.wait(mu);
consume();   // 可能 has_data 已经被别人消费了

// ✅ 正确:用 while
while (!has_data) condvar.wait(mu);
consume();   // 醒来一定有数据
1
2
3
4
5
6
7

# 4.3 信号量及其变种

信号量 = 一个非负整数 + 两个原子操作(P/V)——Dijkstra 1965 年发明,至今仍是计数同步的核心。

sem_init(&s, 0, N);     // 初始值 N

// 线程 A
sem_wait(&s);           // P 操作:s--,如果 s<0 则阻塞
// 临界区
sem_post(&s);           // V 操作:s++,唤醒一个等待者
1
2
3
4
5
6

信号量的两种用法:

用法 1:互斥锁(信号量初值 = 1)

sem_init(&s, 0, 1);     // 二元信号量

// 多个线程
sem_wait(&s);           // 同 mutex_lock
critical_section();
sem_post(&s);           // 同 mutex_unlock
1
2
3
4
5
6

用法 2:限流 / 资源池(信号量初值 = N)

sem_init(&s, 0, 5);     // 最多 5 个并发

// 100 个线程
sem_wait(&s);           // 超过 5 个就阻塞
do_work();
sem_post(&s);
1
2
3
4
5
6

信号量 vs Mutex 的本质区别:

维度 Mutex Semaphore
状态 二值(锁/未锁) 计数
释放者 必须是加锁者 任何人都能 post
典型场景 保护临界区 限流、生产消费
是否记 owner 是(可重入) 否

信号量典型用法:限流连接池:

// Java Semaphore
Semaphore connPool = new Semaphore(10);   // 最多 10 个连接

void handleRequest() {
    connPool.acquire();              // 获取一个许可
    try {
        Connection conn = pool.get();
        useConnection(conn);
    } finally {
        connPool.release();          // 归还
    }
}
1
2
3
4
5
6
7
8
9
10
11
12

# 4.4 内存屏障与原子操作

为什么需要内存屏障?看一段经典反例:

// 看似简单的发布模式
int data = 0;
bool ready = false;

// 线程 A
data = 42;                // ①
ready = true;             // ②

// 线程 B
while (!ready) {}         // ③
print(data);              // ④ 可能打印 0!
1
2
3
4
5
6
7
8
9
10
11

为什么会打印 0?三种可能:

  1. CPU 乱序执行:Core 0 把 ② 的写入提前到 ① 之前(store-store 重排)
  2. 缓存延迟:Core 0 写入 ① 还在 Store Buffer 里没刷到 cache
  3. 编译器重排:编译器把 ② 移到 ① 之前

内存屏障解决方案:

// 加上屏障
data = 42;
atomic_thread_fence(memory_order_release);   // 之前的写不能重排到之后
ready = true;

// 线程 B
while (!ready) {}
atomic_thread_fence(memory_order_acquire);   // 之后的读不能重排到之前
print(data);                                  // 一定打印 42
1
2
3
4
5
6
7
8
9

C++ 内存序模型——六种顺序,从弱到强:

memory_order 语义 性能 用途
relaxed 仅保证原子,无屏障 最快 计数器、统计
consume 数据依赖的 acquire 快(已废弃) 罕用
acquire 之后的读写不能重排到之前 中 读屏障
release 之前的读写不能重排到之后 中 写屏障
acq_rel acquire + release 中 RMW 操作
seq_cst 全局顺序一致 慢(默认) 安全但慢

典型用例:

// 无锁计数器:用 relaxed(不需要 HB 关系)
std::atomic<long> counter{0};
counter.fetch_add(1, std::memory_order_relaxed);

// 发布对象:release 写 + acquire 读
std::atomic<Data*> shared{nullptr};
Data* d = new Data(42);
shared.store(d, std::memory_order_release);

Data* p = shared.load(std::memory_order_acquire);
if (p) p->use();   // 一定能看到 d 的初始化

// CAS 通常用 acq_rel
shared.compare_exchange_strong(expected, desired, 
    std::memory_order_acq_rel);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

# 4.5 读写分离与乐观锁

问题:多读单写场景下,Mutex 浪费——读和读其实可以并行。

ReadWriteLock:把锁拆成读锁和写锁,读读共享,读写/写写互斥:

读锁状态:      [R | R | R]   多个读者并行
写锁状态:      [W]            单个写者独占
读锁+写锁:     冲突,互斥
1
2
3

Java ReentrantReadWriteLock:

ReadWriteLock rwLock = new ReentrantReadWriteLock();

// 读:可并发
rwLock.readLock().lock();
try {
    return data.get(key);
} finally {
    rwLock.readLock().unlock();
}

// 写:独占
rwLock.writeLock().lock();
try {
    data.put(key, value);
} finally {
    rwLock.writeLock().unlock();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

StampedLock(Java 8)—— 引入"乐观读":

StampedLock lock = new StampedLock();

// 乐观读:先不加锁,读完后验证
long stamp = lock.tryOptimisticRead();
double curX = x, curY = y;       // 读
if (!lock.validate(stamp)) {     // 验证读期间没被写
    // 被写过,降级为悲观读
    stamp = lock.readLock();
    try {
        curX = x; curY = y;
    } finally {
        lock.unlockRead(stamp);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14

乐观锁的设计哲学:绝大多数情况下读不会冲突,那就先假设没冲突,读完再验证——验证失败再回退到悲观锁。这种"先猜后验"的思想在数据库 MVCC、CAS 操作里都能看到。

性能对比(多读少写场景,95% 读):

锁类型 QPS
synchronized ~500 万
ReadWriteLock ~2000 万
StampedLock 乐观读 ~5000 万

# 5.消息传递模型深度剖析

# 5.1 Channel 的设计本质

Channel 是什么? 一个带"线程安全的入队/出队 + 自动等待/唤醒"的队列。它把"数据传递"和"同步"统一成一个操作:

ch <- value    // 发送:把数据放进去(满了就阻塞)
v := <-ch      // 接收:取数据(空了就阻塞)
1
2

Channel 集成了三种语义:

语义 体现
互斥 内部锁保护缓冲区
等待/通知 满/空时阻塞,对方操作时唤醒
数据传递 send/recv 配对,数据安全交付

两种 channel 类型:

// 无缓冲 channel:同步
ch := make(chan int)
// 发送方阻塞,直到接收方 take
// 类似"接力棒交接",双方时刻同步

// 有缓冲 channel:异步
ch := make(chan int, 100)
// 缓冲未满时发送不阻塞,未空时接收不阻塞
// 类似"邮箱",发完就走
1
2
3
4
5
6
7
8
9

Channel 的高级用法 —— select:

// 多路复用:从多个 channel 中等任意一个就绪
select {
case msg := <-ch1:
    handle(msg)
case ch2 <- task:
    // 发送给 ch2 成功
case <-time.After(5 * time.Second):
    // 超时
default:
    // 都没就绪,立即返回
}
1
2
3
4
5
6
7
8
9
10
11

select 的工程价值——用一个语法构造解决了"超时、取消、扇入扇出、非阻塞尝试"四个常见场景。这是 Go 比其他语言写并发更舒服的根本原因。

# 5.2 Go channel 源码级剖析

Go channel 内部实现(runtime/chan.go):

type hchan struct {
    qcount   uint           // 当前缓冲数据数
    dataqsiz uint           // 缓冲区大小
    buf      unsafe.Pointer // 环形缓冲区
    elemsize uint16
    closed   uint32
    elemtype *_type
    sendx    uint           // 发送索引
    recvx    uint           // 接收索引
    recvq    waitq          // 等待接收的 G 队列
    sendq    waitq          // 等待发送的 G 队列
    lock     mutex          // 保护一切的锁
}
1
2
3
4
5
6
7
8
9
10
11
12
13

send 的完整流程:

flowchart TD
    A[ch <- value] --> B{有等待 recv 的 G?}
    B -->|是| C[直接拷贝给那个 G<br/>唤醒它]
    B -->|否| D{缓冲区有空?}
    D -->|是| E[写入缓冲区<br/>返回]
    D -->|否| F[把当前 G 放入 sendq<br/>park 自己]
    F --> G[等被唤醒]
    G --> H[醒后数据已被对方拿走<br/>返回]
    
    style C fill:#d4edda
    style E fill:#d4edda
    style F fill:#fff3cd
1
2
3
4
5
6
7
8
9
10
11
12

核心代码(简化):

func chansend(c *hchan, ep unsafe.Pointer, block bool) bool {
    lock(&c.lock)
    
    // 情况 1:有等待接收的 G,直接传递(零拷贝优化)
    if sg := c.recvq.dequeue(); sg != nil {
        send(c, sg, ep)        // 直接拷贝到 receiver 的栈上
        unlock(&c.lock)
        goready(sg.g)          // 唤醒 receiver
        return true
    }
    
    // 情况 2:缓冲区有空间
    if c.qcount < c.dataqsiz {
        qp := chanbuf(c, c.sendx)
        typedmemmove(c.elemtype, qp, ep)
        c.sendx++
        if c.sendx == c.dataqsiz { c.sendx = 0 }
        c.qcount++
        unlock(&c.lock)
        return true
    }
    
    // 情况 3:阻塞
    gp := getg()
    mysg := acquireSudog()
    mysg.g = gp
    mysg.elem = ep
    c.sendq.enqueue(mysg)
    gopark(...)               // 让出 P,让其他 G 跑
    
    // 醒来:对方已经取走数据
    return true
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33

Go channel 的两个精妙设计:

  • "直接交接"优化:当有 G 在等待接收时,发送数据不经过缓冲区,直接拷贝到 receiver 栈——一次拷贝省一次
  • 基于 G 的 park/goready:goroutine 阻塞不阻塞 OS 线程,只是让 G 让出 P,其他 G 可以继续跑

性能数据:

操作 耗时
send/recv(无竞争) ~80 ns
send/recv(满/空,需 park) ~500 ns
select(2 个 case) ~200 ns
select(10 个 case) ~500 ns

# 5.3 Actor 模型与 Erlang

Actor 模型 是消息传递的另一支——比 channel 更彻底:每个 Actor 是独立的实体,有自己的状态、邮箱、行为。

┌─── Actor A ───┐    ┌─── Actor B ───┐
│ 私有状态       │    │ 私有状态       │
│ Mailbox: [m1]  │    │ Mailbox: [m2]  │
│ 处理逻辑       │    │ 处理逻辑       │
└───────┬───────┘    └───────┬───────┘
        │   send msg          │
        └──────────────────→──┘
1
2
3
4
5
6
7

Erlang 的 Actor 实现:

% 定义一个 counter actor
counter(Count) ->
    receive
        {increment} ->
            counter(Count + 1);
        {get, From} ->
            From ! {value, Count},
            counter(Count);
        stop ->
            ok
    end.

% 启动
Pid = spawn(fun() -> counter(0) end).

% 发消息
Pid ! {increment}.
Pid ! {get, self()}.
receive {value, V} -> io:format("~p~n", [V]) end.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

Erlang Actor 的 4 个核心特征:

  • 完全隔离:Actor 之间不共享任何内存——从根本上消除数据竞争
  • 轻量到极致:一个 Erlang process 仅占几百字节,单机可跑千万级
  • 错误隔离:一个 Actor 崩溃不影响其他——配合 supervisor 树自动重启
  • 位置透明:本地 Actor 和远程 Actor 用相同 API——天然分布式

Akka(JVM 上的 Actor):

class Counter extends Actor {
    var count = 0
    def receive = {
        case "inc"  => count += 1
        case "get"  => sender() ! count
    }
}

val system = ActorSystem("MySystem")
val counter = system.actorOf(Props[Counter])
counter ! "inc"
counter ! "get"
1
2
3
4
5
6
7
8
9
10
11
12

# 5.4 CSP vs Actor 对比

CSP(Go channel)和 Actor(Erlang)是消息传递的两大流派——都用消息但哲学不同:

维度 CSP(Go) Actor(Erlang)
实体 goroutine(无名) Actor(有 PID)
通信媒介 channel(独立对象) mailbox(绑定 Actor)
寻址方式 通过 channel 引用 通过 PID 直接发
匿名性 goroutine 匿名 Actor 有身份
耦合度 channel 解耦双方 发送者必须知道接收者 PID
错误处理 靠 panic/recover supervisor 树
代表语言 Go, Occam Erlang, Akka, Elixir

CSP 和 Actor 各有适用场景:

  • CSP 更适合:"数据流"明确的场景——管道处理、扇入扇出、生产消费
  • Actor 更适合:"有状态的实体"——游戏角色、IoT 设备、聊天室用户

# 5.5 异步消息与背压

消息传递在异步模式下,必须解决一个关键问题——背压(Backpressure):当生产者比消费者快,消息堆积了,怎么办?

生产 1000 msg/s ──→ 缓冲区 ──→ 消费 100 msg/s
                    📈 积压
1
2

4 种背压策略:

策略 行为 适用
Block(阻塞) 缓冲满,生产者阻塞 默认;重要数据不能丢
Drop(丢弃) 缓冲满,丢新消息 可丢日志、监控数据
Drop Oldest 缓冲满,丢最旧的 实时数据如股价
Slow Down 反向告知生产者放慢 RxJava、Reactive Streams

Go channel 默认是 Block 策略:

ch := make(chan Task, 100)

// 生产者
ch <- task    // 满了自动阻塞——天然背压
1
2
3
4

Reactive Streams 规范——Java 9 把背压做成了标准 API:

Publisher.subscribe(new Subscriber() {
    Subscription subscription;
    
    public void onSubscribe(Subscription s) {
        subscription = s;
        s.request(10);   // 关键:告诉生产者我能处理 10 个
    }
    
    public void onNext(Item item) {
        process(item);
        subscription.request(1);   // 处理完一个再要一个
    }
});
1
2
3
4
5
6
7
8
9
10
11
12
13

核心设计哲学:消费者主动 pull,而不是生产者 push——天然的反压机制。


# 6.Java 线程通信全景

# 6.1 wait/notify与Object Monitor

原理:每个 Java 对象都有一个监视器锁(Monitor),wait() 释放锁并进入等待队列,notify() 从等待队列唤醒一个线程。底层依赖 OS 的 pthread_cond_wait / pthread_cond_signal。

class SharedBuffer {
  private int data;
  private boolean hasData = false;

  public synchronized void produce(int value) throws InterruptedException {
    while (hasData) wait();       // 有数据就等消费者取走
    data = value;
    hasData = true;
    notify();                     // 唤醒消费者
  }

  public synchronized int consume() throws InterruptedException {
    while (!hasData) wait();      // 没数据就等生产者放入
    hasData = false;
    notify();                     // 唤醒生产者
    return data;
  }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

Monitor 内部结构:

┌─── ObjectMonitor ───┐
│ owner: 当前持锁线程   │
│ count: 重入次数       │
│ EntryList: [T1, T2]  │ ← 等待获取锁的 BLOCKED 线程
│ WaitSet:   [T3, T4]  │ ← wait() 调用后挂这里
└──────────────────────┘
1
2
3
4
5
6

wait/notify 的 5 个核心要点:

  1. 必须在 synchronized 块内调用——否则抛 IllegalMonitorStateException
  2. 用 while 防虚假唤醒——POSIX 规范明确允许
  3. wait() 会释放锁,唤醒后必须重新竞争
  4. notify() 只唤醒一个,notifyAll() 唤醒全部
  5. 优先用 notifyAll() 除非你能严格证明 notify 不会唤醒错的线程

Thread.join() 的内部实现:

// JDK 源码(简化)
public final synchronized void join(long millis) throws InterruptedException {
    while (isAlive()) {
        wait(0);                  // 在 Thread 对象上 wait
    }
}
// 当线程结束时,JVM 自动调用 this.notifyAll() 唤醒等待者
1
2
3
4
5
6
7

# 6.2 Lock + Condition

ReentrantLock + Condition 是 wait/notify 的工业升级版——核心增强是"多个 Condition 队列":

class SharedBuffer {
  private final Lock lock = new ReentrantLock();
  private final Condition notFull = lock.newCondition();    // 单独的"非满"队列
  private final Condition notEmpty = lock.newCondition();   // 单独的"非空"队列
  private int[] buf = new int[5];
  private int count = 0;

  public void produce(int value) throws InterruptedException {
    lock.lock();
    try {
      while (count == buf.length) notFull.await();
      buf[count++] = value;
      notEmpty.signal();        // 精准唤醒消费者,不会误伤生产者
    } finally {
      lock.unlock();
    }
  }

  public int consume() throws InterruptedException {
    lock.lock();
    try {
      while (count == 0) notEmpty.await();
      int val = buf[--count];
      notFull.signal();         // 精准唤醒生产者
      return val;
    } finally {
      lock.unlock();
    }
  }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

vs wait/notify 的 4 个改进:

  • 多 Condition:可以为不同的等待原因建立独立队列,避免错唤醒
  • 可中断:lockInterruptibly() 让等锁的线程响应中断
  • 超时尝试:tryLock(timeout) 防止无限等待
  • 公平模式:new ReentrantLock(true) 保证 FIFO

# 6.3 BlockingQueue

BlockingQueue 是日常并发编程最常用的工具——把"锁 + 条件变量"封装成线程安全的队列:

BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5);

// 生产者
new Thread(() -> {
  for (int i = 0; i < 10; i++) {
    queue.put(i);                    // 满了自动阻塞
    System.out.println("Produced: " + i);
  }
}).start();

// 消费者
new Thread(() -> {
  for (int i = 0; i < 10; i++) {
    int val = queue.take();          // 空了自动阻塞
    System.out.println("Consumed: " + val);
  }
}).start();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

Java 主要的 BlockingQueue 实现:

实现 特点 适用
ArrayBlockingQueue 数组 + 单锁 + 2 Condition 通用,有界
LinkedBlockingQueue 链表 + 双锁(put 和 take 各一) 高吞吐、可有界可无界
PriorityBlockingQueue 堆 + 单锁 按优先级
DelayQueue 堆 + 延迟 定时任务
SynchronousQueue 无缓冲,直接交付 严格生产消费同步
LinkedTransferQueue TransferQueue 接口 可同步可异步

LinkedBlockingQueue 的双锁优化:

// put 用 putLock,take 用 takeLock
// 生产者和消费者完全不阻塞对方
public void put(E e) throws InterruptedException {
    putLock.lockInterruptibly();
    try {
        while (count.get() == capacity) notFull.await();
        enqueue(e);
        count.incrementAndGet();
    } finally {
        putLock.unlock();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12

这就是工业级队列的精髓——找出锁的可拆分粒度,最大化并发度。

# 6.4 CountDownLatch/Barrier/Phaser

三个"等待汇合"的工具,看似类似实则不同:

CountDownLatch —— 一次性倒计时:

CountDownLatch latch = new CountDownLatch(3);

for (int i = 0; i < 3; i++) {
  int id = i;
  new Thread(() -> {
    process();
    latch.countDown();               // 计数 -1
  }).start();
}

latch.await();                       // 阻塞直到 3 次 countDown
System.out.println("All done");
1
2
3
4
5
6
7
8
9
10
11
12

特征:一次性,不可重置。底层是 AQS 共享模式,state 表示剩余计数。

CyclicBarrier —— 可重用屏障:

CyclicBarrier barrier = new CyclicBarrier(3, () -> {
  System.out.println("Phase done");  // 最后一个到达时执行
});

// 多个线程
new Thread(() -> {
  for (int phase = 0; phase < 5; phase++) {
    work(phase);
    barrier.await();                 // 等其他人 + 屏障重置
  }
}).start();
1
2
3
4
5
6
7
8
9
10
11

特征:可循环使用,每轮所有线程到齐后自动重置。底层是 ReentrantLock + Condition + count。

Phaser —— 灵活的多阶段同步(Java 7):

Phaser phaser = new Phaser(3);

new Thread(() -> {
    phaser.arriveAndAwaitAdvance();   // 等其他人完成阶段
    phaser.arriveAndDeregister();     // 主动退出
}).start();
1
2
3
4
5
6

特征:支持动态注册/注销 + 多阶段——比 CyclicBarrier 更灵活。

三者对比:

特性 CountDownLatch CyclicBarrier Phaser
可重用 ❌ ✓ ✓
动态参与 ❌ ❌ ✓
多阶段 ❌ ✓(重置) ✓(明确阶段)
回调 ❌ ✓ ✓(onAdvance)

# 6.5 Semaphore

Semaphore semaphore = new Semaphore(2);  // 最多2个线程同时执行

for (int i = 0; i < 5; i++) {
  int id = i;
  new Thread(() -> {
    try {
      semaphore.acquire();             // 获取许可
      System.out.println("Thread " + id + " working");
      Thread.sleep(1000);
    } finally {
      semaphore.release();             // 必须 try-finally 释放
    }
  }).start();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14

典型应用:

  • 限流:限制下游调用并发数(保护被调用方)
  • 资源池:连接池、线程池的 worker 限制
  • 生产消费:empty / full 两个信号量实现经典 P/V

# 6.6 Future与CompletableFuture

Future(Java 5)—— 阻塞获取结果:

ExecutorService pool = Executors.newFixedThreadPool(2);
Future<Integer> future = pool.submit(() -> {
  Thread.sleep(1000);
  return 42;
});
int result = future.get();           // 阻塞直到结果就绪
int withTimeout = future.get(2, TimeUnit.SECONDS);   // 限时
1
2
3
4
5
6
7

CompletableFuture(Java 8)—— 链式组合:

CompletableFuture
    .supplyAsync(() -> fetchUser(id))
    .thenApplyAsync(user -> fetchOrders(user))
    .thenApply(orders -> render(orders))
    .exceptionally(ex -> {
        log.error("error", ex);
        return defaultValue();
    })
    .thenAccept(result -> save(result));
1
2
3
4
5
6
7
8
9

CompletableFuture 的核心 API:

方法 含义
supplyAsync(fn) 异步执行,返回 CF
thenApply(fn) 上一步结果 → 转换
thenCompose(fn) 上一步结果 → 返回 CF(扁平化)
thenCombine(cf2, fn) 两个 CF 都完成后合并
allOf(cf1, cf2, ...) 等所有完成
anyOf(cf1, cf2, ...) 任一完成
exceptionally(fn) 异常处理

CompletableFuture 解决了 Future 的两大痛点:

  • 回调地狱:嵌套的 .get() → 链式 .thenApply()
  • 组合困难:手写多 Future 协调 → .allOf() / anyOf()

# 6.7 通信场景全表

机制 底层原理 适用场景 性能
volatile 内存屏障 状态标志可见性 极高
synchronized Monitor(OS mutex + cond) 基础锁通信 高
wait/notify Monitor 等待队列 基础协调 中
Lock + Condition ReentrantLock + AQS 多条件精细控制 高
BlockingQueue Lock + 2 Condition 生产者-消费者 高
CountDownLatch AQS 共享模式 等 N 个任务完成 高
CyclicBarrier Lock + Condition 多线程汇合(可重用) 高
Phaser 64 位 state + CAS 分阶段同步 高
Semaphore AQS 共享模式 限流/资源池 高
Future AQS 状态机 异步结果(阻塞式) 中
CompletableFuture Treiber Stack 回调链 异步链式 高
Exchanger CAS + park 两线程数据交换 高
Thread.join wait/notifyAll 等待线程结束 中
Disruptor RingBuffer + 无锁 极致高吞吐 极高

依赖链:

应用层:  BlockingQueue / CountDownLatch / Semaphore / CyclicBarrier
           │
           ▼
框架层:  AQS (AbstractQueuedSynchronizer)
           │
           ▼
基础层:  LockSupport.park/unpark → Unsafe
           │
           ▼
OS 层:   pthread_mutex + pthread_cond (futex on Linux)
           │
           ▼
硬件层:  CAS 指令 + 内存屏障 (MFENCE/LFENCE)
1
2
3
4
5
6
7
8
9
10
11
12
13

# 7.C++/Go/Rust 通信对比

# 7.1 C++:贴着 OS 的细粒度控制

#include <mutex>
#include <condition_variable>
#include <queue>
#include <thread>

template <typename T>
class BlockingQueue {
    std::mutex mtx;
    std::condition_variable cv_not_full, cv_not_empty;
    std::queue<T> q;
    size_t cap;
    
public:
    BlockingQueue(size_t c) : cap(c) {}
    
    void Push(T value) {
        std::unique_lock<std::mutex> lk(mtx);
        cv_not_full.wait(lk, [this] { return q.size() < cap; });
        q.push(std::move(value));
        cv_not_empty.notify_one();
    }
    
    T Pop() {
        std::unique_lock<std::mutex> lk(mtx);
        cv_not_empty.wait(lk, [this] { return !q.empty(); });
        T val = std::move(q.front());
        q.pop();
        cv_not_full.notify_one();
        return val;
    }
};
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

C++20 协程 + channel(用第三方库 cppcoro / Boost.Asio):

asio::experimental::channel<void(error_code, int)> ch(io, 100);

co_await ch.async_send(error_code{}, 42, asio::use_awaitable);
auto v = co_await ch.async_receive(asio::use_awaitable);
1
2
3
4

C++ 原子操作的精细控制:

std::atomic<int> counter{0};
counter.fetch_add(1, std::memory_order_relaxed);   // 计数器:最弱

std::atomic<Data*> shared{nullptr};
shared.store(new_data, std::memory_order_release); // 发布对象:release
auto p = shared.load(std::memory_order_acquire);   // 读取对象:acquire
1
2
3
4
5
6

# 7.2 Go:channel 一统天下

最经典的生产者-消费者:

package main

import (
    "fmt"
    "sync"
)

func producer(ch chan<- int, wg *sync.WaitGroup) {
    defer wg.Done()
    for i := 0; i < 10; i++ {
        ch <- i
    }
    close(ch)
}

func consumer(ch <-chan int, wg *sync.WaitGroup) {
    defer wg.Done()
    for v := range ch {       // channel 关闭后自动退出
        fmt.Println("got", v)
    }
}

func main() {
    ch := make(chan int, 5)
    var wg sync.WaitGroup
    wg.Add(2)
    go producer(ch, &wg)
    go consumer(ch, &wg)
    wg.Wait()
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

select 的强大用法:

// 带超时的多路选择
select {
case msg := <-ch1:
    handle(msg)
case ch2 <- task:
    // 发送成功
case <-time.After(5 * time.Second):
    return errors.New("timeout")
case <-ctx.Done():
    return ctx.Err()
}
1
2
3
4
5
6
7
8
9
10
11

Go 同步原语全景:

原语 用法 适用
chan 通信 + 同步 主推用法
sync.Mutex 互斥锁 共享状态保护
sync.RWMutex 读写锁 读多写少
sync.WaitGroup 等组完成 等所有 goroutine
sync.Once 单次执行 单例初始化
sync.Cond 条件变量 罕用,channel 优先
sync.Map 并发 Map 缓存场景
sync/atomic 原子操作 计数器、标志位
context.Context 取消传播 跨 goroutine 取消

# 7.3 Rust:所有权防数据竞争

Rust 的核心创新——编译期就拒绝数据竞争:

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();
    
    thread::spawn(move || {
        tx.send(42).unwrap();      // tx 的所有权移到子线程
    });
    
    let v = rx.recv().unwrap();
    println!("got {}", v);
}
1
2
3
4
5
6
7
8
9
10
11
12
13

Send / Sync trait 是 Rust 并发安全的灵魂:

  • Send:类型可以安全地移动到另一个线程(所有权转移)
  • Sync:类型可以安全地共享到多个线程(多线程持引用)
// 编译错误示例:Rc<T> 不实现 Send
use std::rc::Rc;
use std::thread;

fn main() {
    let rc = Rc::new(42);
    thread::spawn(move || {
        println!("{}", rc);    // ❌ 编译错误:Rc 不是 Send
    });
}
// 错误信息:`Rc<i32>` cannot be sent between threads safely
1
2
3
4
5
6
7
8
9
10
11

Rust 跨线程共享状态的正确姿势:

use std::sync::{Arc, Mutex};
use std::thread;

let counter = Arc::new(Mutex::new(0));    // Arc:线程安全的引用计数
let mut handles = vec![];

for _ in 0..10 {
    let c = Arc::clone(&counter);
    let handle = thread::spawn(move || {
        let mut num = c.lock().unwrap();   // Mutex 保护
        *num += 1;
    });
    handles.push(handle);
}

for h in handles { h.join().unwrap(); }
println!("counter: {}", *counter.lock().unwrap());
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

Rust 异步通信(tokio):

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(100);
    
    tokio::spawn(async move {
        for i in 0..10 {
            tx.send(i).await.unwrap();
        }
    });
    
    while let Some(v) = rx.recv().await {
        println!("got {}", v);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

# 7.4 横向对比总表

维度 Java C++ Go Rust Erlang
主推模型 共享内存 共享内存 channel 共享 + channel Actor
互斥锁 synchronized / ReentrantLock std::mutex sync.Mutex std::sync::Mutex 不需要
条件等待 Object.wait / Condition std::condition_variable channel std::sync::Condvar receive
队列 BlockingQueue 手动实现 channel mpsc::channel mailbox
原子操作 java.util.concurrent.atomic std::atomic sync/atomic std::sync::atomic 无需
数据竞争防御 运行时(JMM) 程序员自己 runtime race detector 编译期 物理隔离
跨进程 需 IPC 库 POSIX shm/msg net.Pipe 第三方 原生支持
典型并发量 万级(虚拟线程:百万) 万级 百万级 百万级(async) 千万级

一句话挑选指南:

  • Java 业务:先 BlockingQueue / CompletableFuture,性能瓶颈再考虑 Disruptor
  • C++ 高性能:std::atomic + std::mutex,必要时用 lock-free 数据结构
  • 高并发服务:Go channel 是首选,简单又强大
  • 极致安全 / 系统底层:Rust 是唯一答案,编译期帮你检查
  • 分布式高可用:Erlang/Elixir 的 Actor + supervisor 树

# 8.经典陷阱与反模式

# 8.1 锁陷阱

❌ 反例 1:锁粒度过大

// 整个方法都锁住,包括 I/O
public synchronized void process(Request req) {
    Data d = readFromDb(req);          // 慢操作!锁定 10ms
    Result r = compute(d);             // 快
    writeToDb(r);                      // 慢操作!锁定 10ms
}
// 后果:100 个线程串行执行,吞吐降到 50 QPS
1
2
3
4
5
6
7

✅ 正例:缩小锁粒度

public void process(Request req) {
    Data d = readFromDb(req);          // 不加锁
    Result r;
    synchronized (lock) {              // 只锁需要保护的部分
        r = compute(d);
    }
    writeToDb(r);                      // 不加锁
}
1
2
3
4
5
6
7
8

❌ 反例 2:锁顺序不一致 → 死锁

// 线程 A
synchronized (lockA) {
    synchronized (lockB) { ... }       // 先 A 后 B
}

// 线程 B
synchronized (lockB) {
    synchronized (lockA) { ... }       // 先 B 后 A
}
// → 经典死锁
1
2
3
4
5
6
7
8
9
10

✅ 正例:定义全局锁顺序

// 任何地方都按同一顺序加锁
synchronized (lockA) {
    synchronized (lockB) { ... }       // 永远 A → B
}
1
2
3
4

# 8.2 条件变量陷阱

❌ 反例 1:用 if 而非 while

synchronized (mu) {
    if (!ready) wait();                // ❌ 虚假唤醒就出 bug
    consume();
}
1
2
3
4

✅ 正例:

synchronized (mu) {
    while (!ready) wait();             // ✓ 永远是 while
    consume();
}
1
2
3
4

❌ 反例 2:丢失唤醒(Lost Wakeup)

// 线程 A:等数据
if (!hasData) {                       // 检查时还没数据
    // ← 此处线程 B 设置 hasData = true 并 notify
    // B 的 notify 没人接,丢失!
    wait();                            // A 永远睡下去
}

// 必须用同步:
synchronized (mu) {
    while (!hasData) wait();           // 检查 + 等待原子
}
1
2
3
4
5
6
7
8
9
10
11

# 8.3 channel 陷阱

❌ 反例 1:向已关闭的 channel 发送 → panic

ch := make(chan int)
close(ch)
ch <- 42                              // panic: send on closed channel
1
2
3

✅ 正例:只在生产者侧 close,且只 close 一次

// 永远遵守 "Don't close from receiver side" 原则
go func() {
    defer close(ch)        // 由生产者唯一负责关闭
    for _, v := range data {
        ch <- v
    }
}()
1
2
3
4
5
6
7

❌ 反例 2:goroutine 泄漏

func leaky() {
    ch := make(chan int)
    go func() {
        ch <- 42                       // 没人接,永远阻塞
    }()
    // 函数返回,goroutine 永远卡在那
}
1
2
3
4
5
6
7

✅ 正例:用带缓冲的 channel 或 context 取消

func notLeaky(ctx context.Context) {
    ch := make(chan int, 1)            // 缓冲 1 防卡住
    go func() {
        select {
        case ch <- 42:
        case <-ctx.Done():             // 上下文取消时退出
        }
    }()
}
1
2
3
4
5
6
7
8
9

❌ 反例 3:nil channel 永久阻塞

var ch chan int                        // nil channel
ch <- 42                              // 永远阻塞,不报错
1
2

但这其实是 select 的妙用:

// 用 nil channel 动态屏蔽某个 case
var ch chan int
if !shouldRecv {
    ch = nil    // 这个 case 永远不会被选中
}
select {
case v := <-ch:
    ...
case <-otherCh:
    ...
}
1
2
3
4
5
6
7
8
9
10
11

# 8.4 调试与定位

实战技巧 1:死锁检测

# Java
$ jstack <pid> | grep -A 10 "deadlock"
Found one Java-level deadlock:
=============================
"Thread-1": waiting to lock 0x... (held by Thread-2)
"Thread-2": waiting to lock 0x... (held by Thread-1)

# Go
GODEBUG=asyncpreemptoff=1 go run -race main.go
# 内置死锁检测:所有 goroutine 都阻塞会报 fatal: all goroutines are asleep - deadlock!
1
2
3
4
5
6
7
8
9
10

实战技巧 2:竞态条件检测

# Go race detector
go run -race main.go
go test -race ./...
# 发现数据竞争时打印详细栈

# Java:用 jcstress(OpenJDK 官方工具)
1
2
3
4
5
6

实战技巧 3:性能瓶颈定位

# Java:观察锁竞争
$ jstack <pid> | grep BLOCKED | wc -l    # BLOCKED 多说明锁竞争重

# JFR(Java Flight Recorder)
jcmd <pid> JFR.start duration=60s filename=record.jfr
# 看 JavaMonitorBlocked 事件

# Go:观察 channel
import _ "net/http/pprof"
go tool pprof http://localhost:6060/debug/pprof/block
1
2
3
4
5
6
7
8
9
10

实战技巧 4:好习惯

  • 每个锁要有清晰文档:保护什么、加锁顺序
  • 临界区越小越好:把 I/O 拉出锁外
  • 优先用高层原语:BlockingQueue > Lock+Condition > wait/notify
  • 添加监控:阻塞队列长度、锁等待时间、goroutine 数

# 9.一句话总结

线程通信的本质,是在"共享内存"和"消息传递"两条路线之间做权衡——共享内存快但需要锁/屏障保证正确性,消息传递慢但天然避免竞争;现代语言的最高境界是"看起来像消息传递,跑起来像共享内存"。

# 三个层次的认知升华

第一层(机制层):通信 = 同步 + 数据交换

  • 三大基石:互斥(保证临界区独占)、等待/唤醒(避免忙轮询)、可见性/有序性(让另一个核看到正确的值)
  • 锁、信号量、Condition、CountDownLatch 都是 AQS 这一颗"原子状态 + 等待队列"种子上长出的不同果实
  • 底层硬件只给了你一条 cmpxchg 和几条内存屏障,其余都是软件用这两块积木堆出来的

第二层(设计层):通信原语是"频率 vs 复杂度"的不同切片

  • 高频小数据(计数器、标志位)→ volatile / atomic(无锁)
  • 中频结构化(生产消费)→ BlockingQueue / Channel(有锁但封装了细节)
  • 低频复杂状态(多条件等待)→ Lock + Condition(最灵活但最容易写错)
  • 选错粒度比选错原语更致命:用 synchronized 包整个方法 vs 用 AtomicLong 加一个计数,性能差 100 倍

第三层(哲学层):通信即耦合,耦合即风险

  • "Don't communicate by sharing memory; share memory by communicating"(Go 的座右铭)—— 把数据流向显式化
  • 共享内存就像两人用同一本笔记本:高效但容易撕纸;消息传递就像递条子:慢但条条留底
  • 真正的高并发系统往往是三层混合:进程间用消息(Kafka/MQ)、线程间用 Channel/Queue、寄存器级别用 atomic——每一层各司其职
  • 理解通信的最高境界,是看一段代码就能画出"哪些字段是共享的、哪些是隔离的、它们在哪一行被同步"——也就是脑中实时维护内存模型

# 终极建议

场景 推荐姿势
简单标志位 volatile boolean + 其他线程主动检查
计数 / 累加 LongAdder > AtomicLong >> synchronized
生产消费 BlockingQueue / Disruptor / Go chan,不要自己写 wait/notify
等多个任务完成 CountDownLatch / CompletableFuture.allOf
读多写少共享 ReadWriteLock / StampedLock(乐观读)
分阶段同步 CyclicBarrier / Phaser
异步链式 CompletableFuture
跨进程跨机器 直接上 MQ(Kafka / RocketMQ),别用共享内存
调试死锁 jstack 看 BLOCKED 链,按"申请锁的顺序一致"原则修复

# 四个关键收获

  1. 共享内存和消息传递是同一枚硬币的两面:底层都是 CAS + 屏障 + futex,只是抽象层不同
  2. Channel 不是魔法:它内部就是 mutex + condvar + 队列——但封装让代码不再易错
  3. 通信原语的选择比写法更重要:用错粒度比写错代码更致命
  4. 现代演化的方向是"统一同步异步":Virtual Thread / coroutine / async-await 让你写"看似同步"的代码,跑出"实质异步"的性能

# 延伸阅读

  • ← 11.线程前世今生探索:通信前置——线程是什么
  • → 13.线程异常设计原理:通信失败时如何安全终止
  • → 14.多线程并发经典案例:把通信原理落到生产实战
  • → 18.锁核心设计和思想:mutex / synchronized / ReentrantLock 的实现差异
  • → 19.AQS核心思想揭秘:理解 Doug Lea 的并发框架基石
  • → 23.协程核心设计思想:协程时代的通信范式
  • → 24.Actor与CSP并发模型:消息传递的两大流派

# 9.案例驱动的设计深入

# 9.1 案例:生产事故看选型错误

真实案例:某金融系统用 volatile boolean 做开关控制,正常情况下工作良好。某天产品要求"当开关从 true 变 false 时,发送一封通知邮件",工程师写了:

volatile boolean enabled = true;

// 关闭开关的地方
public void disable() {
    enabled = false;
    sendNotificationEmail();    // ← 直接调用
}

// 业务线程
public void doWork() {
    while (enabled) {
        process();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14

问题:高峰期 disable() 被并发调用 50 次,发了 50 封邮件——业务线程的 while 循环根本不知道有多少线程改了 enabled。

根因诊断:

flowchart LR
    A[volatile 解决"可见性"] -.不能解决.-> B[多次写的"幂等性"]
    A -.不能解决.-> C[写入与副作用的"原子性"]
    style B fill:#f8d7da
    style C fill:#f8d7da
1
2
3
4
5

修复方案:用 CAS 保证只有第一次写入生效:

AtomicBoolean enabled = new AtomicBoolean(true);

public void disable() {
    if (enabled.compareAndSet(true, false)) {  // 只有第一次成功
        sendNotificationEmail();
    }
}
1
2
3
4
5
6
7

学到了什么:

通信原语 解决的问题 不能解决的问题
volatile 可见性、有序性 复合操作的原子性
Atomic 原子读改写 多变量一致性
Lock 多变量一致性 异步通信、跨进程
Channel 异步解耦 极致延迟
MQ 跨进程跨机器 同进程毫秒级延迟

通信原语选错"档",永远修不完 Bug——上面那个 volatile 的例子,再加多少 volatile 也救不了,必须升级到 CAS 这一档。

# 9.2 案例:BlockingQueue vs Disruptor

场景:某交易系统消息中转,原使用 LinkedBlockingQueue,QPS 5 万。换成 Disruptor 后,QPS 600 万。

LinkedBlockingQueue 的瓶颈:

生产者:                   消费者:
  1. lock putLock           1. lock takeLock
  2. enqueue                2. dequeue
  3. signal notEmpty        3. signal notFull
  4. unlock                 4. unlock

→ 两把锁 + 节点分配(每条消息 new 一个 Node)
→ 锁竞争 + GC 压力
1
2
3
4
5
6
7
8

Disruptor 的设计:

flowchart LR
    A[环形数组<br/>预分配 N 个 slot] --> B[生产者 sequence<br/>CAS 申请]
    B --> C[消费者 sequence<br/>volatile 读]
    C --> D[消费完成后<br/>更新 sequence]
    D -.允许生产者覆盖.-> A

    style A fill:#d4edda
1
2
3
4
5
6
7

核心优化:

  1. 预分配:环形数组 + 对象重用,零 GC
  2. 无锁:生产者 CAS 申请,消费者 volatile 读
  3. 缓存行填充:sequence 之间填充到 64 字节,避免 false sharing
  4. 批量消费:消费者一次取多个 slot,减少 sequence 同步

适用边界:

LinkedBlockingQueue 适用:
  ✅ 通用业务,QPS < 10 万
  ✅ 消息大小不固定
  ✅ 不愿意做容量规划

Disruptor 适用:
  ✅ 极致低延迟(金融、游戏)
  ✅ QPS > 100 万
  ✅ 能预估容量上限
  ❌ 单 JVM 内才有意义(跨进程上 Kafka)
1
2
3
4
5
6
7
8
9
10

学到了什么:通信性能不是线性提升,而是阶梯式的。从锁到无锁是一个数量级,从有 GC 到零 GC 又是一个数量级,从指令到缓存友好还能再翻一倍——真正的高性能通信,是把硬件特性吃干榨尽。

# 9.3 案例:Go channel关闭模式

新手陷阱:往已关闭的 channel 写数据 → panic;关闭已关闭的 channel → panic。

生产模板:

// 模式 1:单生产者-多消费者
func producer(ch chan<- int) {
    defer close(ch)              // ← 由唯一生产者关闭
    for i := 0; i < 10; i++ {
        ch <- i
    }
}

func consumer(ch <-chan int, wg *sync.WaitGroup) {
    defer wg.Done()
    for v := range ch {           // ← range 自动检测 close
        process(v)
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14

模式 2:多生产者-单消费者 —— 引入"关闭信号":

func main() {
    dataCh := make(chan int, 100)
    stopCh := make(chan struct{}) // ← 用专门的信号 channel

    var wg sync.WaitGroup
    for i := 0; i < 5; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            for {
                select {
                case <-stopCh:
                    return
                case dataCh <- compute(id):
                }
            }
        }(i)
    }

    time.Sleep(5 * time.Second)
    close(stopCh)                // ← 通知所有生产者退出
    wg.Wait()
    close(dataCh)                // ← 等生产者全部退出后才关闭数据 channel
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

Go 关闭原则(社区共识):

"don't close a channel from the receiver side, and don't close a channel if the channel has multiple concurrent senders" 接收方不关闭、多发送方场景必须额外加协调。

学到了什么:channel 是把同步原语,关闭它涉及多线程协调,只有一个发送方时关闭最简单,其他场景必须引入额外机制——这正是为什么很多团队制定"channel 关闭规范"。

# 10.通信演化史与一句话总结

# 10.1 通信范式的螺旋上升

flowchart LR
    A[1970s<br/>共享内存<br/>+ 信号量] --> B[1980s<br/>消息传递<br/>Actor/CSP]
    B --> C[1990s<br/>RPC<br/>跨进程通信]
    C --> D[2000s<br/>MQ + ESB<br/>跨服务异步]
    D --> E[2010s<br/>Reactive Stream<br/>背压]
    E --> F[2020s<br/>Virtual Thread<br/>同步表达异步性能]

    style F fill:#d4edda
1
2
3
4
5
6
7
8

每一代不是替代上一代,而是在上一代之上抽象出更易用的封装:

  • 共享内存(最原始)→ 高性能、易出错
  • Channel(消息传递)→ 解耦、还是单机
  • MQ(持久化消息)→ 跨进程、有延迟
  • Reactive(流式)→ 处理无限数据流
  • 虚拟线程 → 同步代码 + 异步性能

# 10.2 设计哲学三大流派

flowchart TD
    A[通信哲学] --> B[共享派<br/>Java JUC]
    A --> C[消息派<br/>Go channel<br/>Erlang Actor]
    A --> D[流派<br/>Reactive<br/>Kotlin Flow]

    B --> B1[内存即接口<br/>高性能但易翻]
    C --> C1[通信即同步<br/>不共享降复杂度]
    D --> D1[数据流即程序<br/>函数式风格]
1
2
3
4
5
6
7
8

# 10.3 一句话总结

线程通信的本质,是"在违背共享心智的硬件上,重建共享语义"——CPU 缓存让你写下的值不立刻被别人看见,CPU 流水线让你写的顺序被打乱,OS 调度让你的两条指令之间随时插入别人的代码。所有通信原语——volatile、CAS、锁、channel、MQ——都是在这个混乱底层之上,为不同性能/语义档位提供的"共享假象"。 选对档位、画清边界、画 happens-before 图——这就是并发通信工程师的内功。通信原语用错档,加多少代码都救不回来;用对档,几行代码就能撑起百万 QPS。

上次更新: 2026/06/07, 10:26:12
2.并发上下文切换原理
4.线程异常设计原理

← 2.并发上下文切换原理 4.线程异常设计原理→

最近更新
01
信号崩溃快速排查
06-15
02
CoreDump破案
06-15
03
perf火焰图实战
06-15
更多文章>
Theme by Vdoing | Copyright © 2019-2026 杨充 | MIT License | 桂ICP备2024034950号 | 桂公网安备45142202000030
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式