编程进阶网 编程进阶网
首页
  • 计算机原理
  • 操作系统
  • 网络协议
  • 数据库原理
  • 面向对象
  • 设计原则
  • 设计模式
  • 系统架构
  • 性能优化
  • 编程原理
  • 方案设计
  • 稳定可靠
  • 工程运维
  • 基础认知
  • 线性结构
  • 树与哈希
  • 工业级实现
  • 算法思想
  • 实战与综合
  • 算法题考核
  • C语言入门
  • C综合案例
  • C专栏博客
  • C标准集库
  • C++入门教程
  • C++综合案例
  • C++专栏博客
  • C++开发技巧
  • Java入门教程
  • Java综合案例
  • Java专栏博客
  • Go入门教程
  • Go综合案例
  • Go专栏博客
  • Go开发技巧
  • JavaScript入门
  • JavaScript高级
  • Android库解读
  • Android专栏
  • Android智能硬件
  • iOS ObjC入门
  • iOS Swift入门
  • iOS入门精通
  • Web之Html手册
  • Web之TypeScript
  • Web之Vue高级进阶
  • Linux之QML入门
  • Linux之QT核心库
  • Linux实践开发
  • Python教程
  • Shell&Bash教程
  • 工具脚本
  • 自动化脚本
  • 质量保障
  • 产品思考
  • 软实力
  • 开发流程
  • Git应用
  • 技术模版
  • 技术规范
  • Markdown
  • Mermaid
  • 开源协议
  • JSON工具
  • 文本工具
  • 图片处理
  • 文档转化
  • 代码压缩
  • 关于我
  • 自我精进
  • 职场管理
  • 职场面试
  • 心情杂货
  • 友情链接

杨充

专注编程 · 终身学习者
首页
  • 计算机原理
  • 操作系统
  • 网络协议
  • 数据库原理
  • 面向对象
  • 设计原则
  • 设计模式
  • 系统架构
  • 性能优化
  • 编程原理
  • 方案设计
  • 稳定可靠
  • 工程运维
  • 基础认知
  • 线性结构
  • 树与哈希
  • 工业级实现
  • 算法思想
  • 实战与综合
  • 算法题考核
  • C语言入门
  • C综合案例
  • C专栏博客
  • C标准集库
  • C++入门教程
  • C++综合案例
  • C++专栏博客
  • C++开发技巧
  • Java入门教程
  • Java综合案例
  • Java专栏博客
  • Go入门教程
  • Go综合案例
  • Go专栏博客
  • Go开发技巧
  • JavaScript入门
  • JavaScript高级
  • Android库解读
  • Android专栏
  • Android智能硬件
  • iOS ObjC入门
  • iOS Swift入门
  • iOS入门精通
  • Web之Html手册
  • Web之TypeScript
  • Web之Vue高级进阶
  • Linux之QML入门
  • Linux之QT核心库
  • Linux实践开发
  • Python教程
  • Shell&Bash教程
  • 工具脚本
  • 自动化脚本
  • 质量保障
  • 产品思考
  • 软实力
  • 开发流程
  • Git应用
  • 技术模版
  • 技术规范
  • Markdown
  • Mermaid
  • 开源协议
  • JSON工具
  • 文本工具
  • 图片处理
  • 文档转化
  • 代码压缩
  • 关于我
  • 自我精进
  • 职场管理
  • 职场面试
  • 心情杂货
  • 友情链接
  • README
  • C语言入门精通

  • Cpp入门到精通

  • Java入门精通

  • Go入门到精通

    • 入门教程

    • 综合案例

    • 专栏博客

      • Go 专栏博客
      • 内存模型与栈堆布局
      • 指针与逃逸分析
      • 结构体内存布局对齐
      • 字符串与切片底层
      • 接口与类型系统
      • map哈希表底层实现
      • 零值初始化设计哲学
      • GMP协程调度器机制
      • 通道channel源码剖析
      • sync同步原语剖析
      • map并发安全与哈希
      • Go内存模型一致性
      • 加权信号量与限流
        • 1. 案例引入
          • 1.1 一段崩在哪
          • 1.2 顺藤摸到根因
          • 1.3 我们要回答什么
        • 2. 架构概览
          • 2.1 信号量模式全景
          • 2.2 为什么 channel 不够用
        • 3. Weighted 核心数据结构
          • 3.1 字段逐层解读
          • 3.2 waiter 等待节点
          • 3.3 并发安全保证
        • 4. Acquire 获取流程剖析
          • 4.1 快速路径:资源充足
          • 4.2 慢速路径:创建等待者
          • 4.3 context 取消的处理
          • 4.4 完整状态机
        • 5. Release 释放与唤醒
          • 5.1 原子递减与 FIFO 扫描
          • 5.2 闭包唤醒的并发语义
          • 5.3 Release 大于 Acquire 的陷阱
        • 6. TryAcquire 非阻塞获取
          • 6.1 快速无损尝试
          • 6.2 与带超时的 Acquire 对比
        • 7. Channel 信号量对比
          • 7.1 四种场景横向比较
          • 7.2 加权场景的 Channel 困境
        • 8. 限流实战模式
          • 8.1 固定窗口模式
          • 8.2 滑动日志模式
          • 8.3 令牌桶模式
          • 8.4 goroutine 并发度控制
        • 9. 连接池资源控制
          • 9.1 数据库连接池设计
          • 9.2 可中断的获取与归还
          • 9.3 诊断与监控
        • 10. 综合案例串讲
          • 10.1 案例真相揭晓
          • 10.2 一次 Acquire 的完整旅程
          • 10.3 设计哲学回扣
          • 10.4 速查表
      • errgroup并行控制
      • 协程泄漏排查与修复
      • 并发设计模式详解
      • GC三色标记与屏障
      • 内存分配器深挖
      • defer延迟执行机制
      • 定时器四叉堆实现
      • 抢占式调度器原理
      • 协程栈扩容与缩容
      • 上下文取消与传播
      • 泛型与类型约束
      • 反射机制与unsafe
      • 迭代器与rangefunc
      • 错误处理与panic
      • 网络轮询器netpoller
      • HTTP服务端源码分析
      • JSON序列化与编解码
      • 数据库SQL连接池
      • 文件IO与零拷贝
      • 结构化日志与配置
      • 单元测试与基准
      • cgo与系统调用切换
      • 编译链接与PGO优化
      • 写作模板
    • 开发技巧

  • JavaScript入门

  • CodeX
  • Go入门到精通
  • 专栏博客
杨充
2026-06-12
目录

加权信号量与限流

# 13.加权信号量与限流

卷三第十三篇——goroutine 是廉价的,但下游资源不是。本篇聚焦 golang.org/x/sync/semaphore 加权信号量——它不是锁,而是一个带权重的并发度控制器:Acquire 可以一次拿 N 个许可,Release 按 FIFO 顺序唤醒等待者,context 取消时还能安全退出。读完本篇,你能回答:为什么用 channel 做信号量会撑不住长尾请求?加权信号量怎么做到 O(1) 快速路径 + FIFO 公平唤醒?四种限流器模式各自的适用场景怎么选?关键词:Weighted、加权、FIFO 等待链表、channel 信号量对比、限流器模式、连接池控制。

# 目录介绍

  • 1. 案例引入
    • 1.1 一段崩在哪
    • 1.2 顺藤摸到根因
    • 1.3 我们要回答什么
  • 2. 架构概览
    • 2.1 信号量模式全景
    • 2.2 为什么 channel 不够用
  • 3. Weighted 核心数据结构
    • 3.1 字段逐层解读
    • 3.2 waiter 等待节点
    • 3.3 并发安全保证
  • 4. Acquire 获取流程剖析
    • 4.1 快速路径:资源充足
    • 4.2 慢速路径:创建等待者
    • 4.3 context 取消的处理
    • 4.4 完整状态机
  • 5. Release 释放与唤醒
    • 5.1 原子递减与 FIFO 扫描
    • 5.2 闭包唤醒的并发语义
    • 5.3 Release 大于 Acquire 的陷阱
  • 6. TryAcquire 非阻塞获取
    • 6.1 快速无损尝试
    • 6.2 与带超时的 Acquire 对比
  • 7. Channel 信号量对比
    • 7.1 四种场景横向比较
    • 7.2 加权场景的 Channel 困境
  • 8. 限流实战模式
    • 8.1 固定窗口模式
    • 8.2 滑动日志模式
    • 8.3 令牌桶模式
    • 8.4 goroutine 并发度控制
  • 9. 连接池资源控制
    • 9.1 数据库连接池设计
    • 9.2 可中断的获取与归还
    • 9.3 诊断与监控
  • 10. 综合案例串讲
    • 10.1 案例真相揭晓
    • 10.2 一次 Acquire 的完整旅程
    • 10.3 设计哲学回扣
    • 10.4 速查表

# 1. 案例引入

# 1.1 一段崩在哪

看一个图像处理微服务——它接收用户上传的图片,调用 GPU 推理服务做内容审核,再把结果写回业务系统。生产环境跑了两个月,某天运营推送了一条大促通知,突然间大量用户同时上传图片,服务崩了:

// image_processor.go —— 图像审核服务
package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

// GPU 推理引擎——单机最多同时处理 8 个任务
var gpuSlots = make(chan struct{}, 8)

type ImageTask struct {
    ImageID   string
    Size      int64  // 图片大小,字节
    ModelType string // "lite"(2GB显存) 或 "heavy"(6GB显存)
}

func processImage(task ImageTask) error {
    // 占一个 GPU 槽位
    gpuSlots <- struct{}{}
    defer func() { <-gpuSlots }()

    // 模拟 GPU 推理
    fmt.Printf("[%s] 开始推理: %s (%dB)\n", task.ModelType, task.ImageID, task.Size)
    time.Sleep(2 * time.Second)
    fmt.Printf("[%s] 完成推理: %s\n", task.ModelType, task.ImageID)
    return nil
}

func main() {
    var wg sync.WaitGroup
    tasks := []ImageTask{
        // 假设来了 20 个任务——10 个轻量、10 个重量
        {ImageID: "img_001", Size: 102400, ModelType: "lite"},
        {ImageID: "img_002", Size: 204800, ModelType: "heavy"},
        {ImageID: "img_003", Size: 51200,  ModelType: "lite"},
        // ...
    }

    for _, task := range tasks {
        wg.Add(1)
        go func(t ImageTask) {
            defer wg.Done()
            if err := processImage(t); err != nil {
                fmt.Printf("任务失败: %s, err=%v\n", t.ImageID, err)
            }
        }()
    }
    wg.Wait()
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52

现象:

  • 平时流量稳定,8 个 GPU 槽位刚好够用,任务平均 2 秒完成
  • 大促期间任务量暴增 10 倍,但 gpuSlots 的容量仍然是 8
  • 表面上看没什么问题——channel 信号量会排队,后续任务等槽位释放即可
  • 但诡异的是:GPU 显存报 Out of Memory,进程 OOM Kill——可我们明明限制了 8 个并发

查 GPU 日志——发现有 5 个 "heavy" 任务同时在跑,每个占 6GB 显存:5 × 6 = 30GB,超过 GPU 的 24GB 显存上限。

channel 信号量把每个任务都当"等权的"——不管它是 2GB 的轻量模型还是 6GB 的重量模型。当 8 个槽位里恰好坐了 5 个 heavy 任务,显存就爆了。

# 1.2 顺藤摸到根因

追查过程分三步:

第一步:验证假设。看 GPU 监控——出问题时 Running 任务列表里同时有 5 个 "heavy_model_v3" 进程,每个 RSS 约 6.2GB。但 gpuSlots channel 容量是 8——理论上最多 8 个并发——这里有 5 个确实"合法"。问题不在"并发数超标",在显存超标。

第二步:找为什么 channel 信号量兜不住。核心矛盾:

channel 信号量的「许可」是 1 个 struct{} → 无差别的
但 heavy 任务和 lite 任务的 GPU 显存消耗差别是 3 倍
→ channel 没办法表达「这个任务需要 3 个槽位」
1
2
3

第三步:看 pprof goroutine 快照——出问题时 goroutine 数量飙到 3000+:

$ curl http://localhost:6060/debug/pprof/goroutine?debug=2 | grep -c "gpuSlots"
# 2876 个 goroutine 全部卡在 gpuSlots <- struct{}{}
1
2

这 2876 个 goroutine 占着栈内存(每个 2KB 起步,共约 6MB+),虽然不多,但伴随着每个 goroutine 携带的 ImageTask 上下文数据(图片元数据、预处理结果)——这些对象因为闭包引用统统逃逸到了堆——pprof heap 显示额外消耗了 ~400MB。

而且更重要的是:这些 goroutine 在排队等待 channel 槽位时,context 超时了也退不出来——channel 的发送操作没有可取消的机制。用户已经超时放弃,goroutine 还在等槽位。

这个事故藏着 7 个原理点:

① 加权信号量怎么让任务按实际资源需求"占不同数量的坑"?        → 第 3-4 章
② Acquire(ctx, n) 的 context 取消怎么实现安全退出?           → 第 4.3
③ Release 后怎么按 FIFO 顺序唤醒等待者?                      → 第 5 章
④ TryAcquire 的非阻塞语义是什么?                             → 第 6 章
⑤ channel 信号量 vs 加权信号量在四种场景下的优劣对比?         → 第 7 章
⑥ 信号量怎么组合成四种限流器(固定窗口/滑动窗口/令牌桶/并发度)? → 第 8 章
⑦ 连接池怎么用加权信号量做可被 context 取消的资源获取?        → 第 9 章
1
2
3
4
5
6
7

# 1.3 我们要回答什么

这个 GPU 服务案例贯穿全篇。我们从加权信号量的数据结构出发,深入 Acquire/Release 的状态机、FIFO 唤醒策略、context 取消的竞态处理——再对比 channel 信号量在四种典型并发场景下的优劣——最后给出四种限流器设计模式和连接池实现。第 10 章回到 GPU 服务,用加权信号量 + 令牌桶限流器彻底修复。

本篇路线:

架构总图 (第 2 章) ── 信号量的三种模式 + channel 为什么不够
   ↓
数据结构 (第 3 章) ── Weighted 字段 + waiter 节点 + 并发保证
   ↓
Acquire 流程 (第 4 章) ── 快速路径 / 慢速路径 / context 取消
   ↓
Release 唤醒 (第 5 章) ── FIFO 扫描 + 闭包通知
   ↓
TryAcquire (第 6 章) ── 非阻塞获取的设计意图
   ↓
Channel 对比 (第 7 章) ── 四种场景横向评测
   ↓
限流实战 (第 8 章) ── 四种限流器模式
   ↓
连接池控制 (第 9 章) ── 数据库连接池 + 诊断
   ↓
综合案例 (第 10 章) ── 修复 GPU 服务 + 设计哲学
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

📌 本篇定位:这是 Go 并发控制中"流量整形"的核心篇。前面第 09 篇讲了 channel 作为通信原语、第 10 篇讲了 sync.Mutex/WaitGroup 的锁语义——本篇的 semaphore 处于两者之间:它不是锁(不保护临界区),它是并发度控制器(控制同时执行的数量)。理解了加权信号量,第 14 篇的 errgroup 并行任务组就能看到它的影子。

# 2. 架构概览

# 2.1 信号量模式全景

Go 生态中有三种"信号量模式的并发控制"方案,Weighted 处于最灵活的那一层:

                        信号量并发控制方案
                              │
        ┌─────────────────────┼─────────────────────┐
        │                     │                     │
        ▼                     ▼                     ▼
  channel 信号量         sync.WaitGroup       semaphore.Weighted
  (容量 = 许可数)        (计数 = 任务数)       (容量 = 总权重)
        │                     │                     │
  每个许可 = 1            等所有任务完成        每个许可可加权
        │                     │                     │
  ❌ 不可取消             ❌ 不能限并发          ✅ context 可取消
  ❌ 不可加权             ❌ 不能部分释放        ✅ 支持 TryAcquire
  ❌ 无 TryAcquire        ❌ 无 TryAcquire       ✅ FIFO 公平唤醒
1
2
3
4
5
6
7
8
9
10
11
12
13

加权信号量解决的是**"每个请求的资源消耗不等价"**场景下的并发度控制问题:

传统信号量思维:
  有 8 个停车位 → 每辆车占 1 个 → 最多停 8 辆车

加权信号量思维:
  GPU 有 24GB 显存 → 轻量任务占 2GB → 重量任务占 6GB
  → 当前已占用 18GB:可以再放 (24-18)/2=3 个轻量,或 (24-18)/6=1 个重量
  → 不是「限制个数」,是「限制总资源量」
1
2
3
4
5
6
7

下面是 Weighted 信号量的完整数据流图:

          Acquire(ctx, 3)
               │
               ▼
    ┌──────────────────────┐
    │  mu.Lock()           │
    │  cur + n ≤ size ?    │
    │  是 → cur += n       │  ← 快速路径 O(1)
    │  否 → 创建 waiter    │  ← 慢速路径
    │  mu.Unlock()         │
    └──────┬───────────────┘
           │
    ┌──────┴──────────────┐
    │  快速路径(已拿到)    │  慢速路径(阻塞等待)
    │  return nil          │  监听 ready channel +
    │                      │  监听 ctx.Done()
    └──────────────────────┘
           │
    ┌──────┴──────────────────────────┐
    │  Release(3)                     │
    │  mu.Lock()                      │
    │  cur -= 3                       │
    │  遍历 waiters 链表:              │
    │    如果 waiter.n ≤ size - cur   │
    │    → cur += waiter.n            │
    │    → close(waiter.ready)  ◄── 唤醒等待 goroutine
    │  mu.Unlock()                    │
    └─────────────────────────────────┘
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

# 2.2 为什么 channel 不够用

疑惑:channel 不就是天然的信号量吗?make(chan struct{}, N) 容量就是最大并发度——为什么还需要专门实现一个 Weighted?

论证:

  1. channel 每个元素的"权重"是 1——ch <- struct{}{} 永远只占 1 个槽。如果 heavy 任务需要 3 倍于 lite 任务的资源,channel 无法表达"这个任务占 3 个槽位"的语义——总不能 ch<-{}{}{} 写三次吧(这既不原子也不优雅)。

  2. channel 的发送操作不可通过 context 取消——ch <- v 阻塞时,即使 ctx 已经 Done,goroutine 也退不出来。唯一的 workaround 是用 select:

select {
case ch <- struct{}{}:
    // 拿到许可
case <-ctx.Done():
    return ctx.Err()
}
1
2
3
4
5
6

这个 select 只能用于单个 channel 和单个 context——但实际场景中,ctx 的取消还需要"把已经在队列中的等待者移除"——channel 做不到:你往 channel 里塞了一个 struct{},没法"撤回"。semaphore.Weighted 通过维护显式的 waiter 链表实现了取消时精准移除。

  1. channel 无法实现 TryAcquire——没有非阻塞写入。只能用 select { case ch <- v: default: },这在 Go 里是"不阻塞的写入尝试"——但和真正的 TryAcquire 语义不完全一致:select 的 default 分支不提供"为什么失败"的信息。

  2. channel 没有 FIFO 保证——Go 的 channel 在多个 goroutine 同时 send 时,唤醒顺序由 runtime 的 sendq 链表决定——它是 FIFO 的。但 channel 的容量满了之后的排队行为是隐式的、不可观测的——你无法检查"当前有多少个 goroutine 在排队等待写入"。semaphore 的 waiter 链表是显式的、可观测的。

结论:channel 信号量适用于"所有任务等权、无需取消、不需要排队信息"的简单场景。一旦任务有权重差异(资源消耗不同)、需要 context 取消、需要排队诊断——semaphore.Weighted 是唯一正解。

# 3. Weighted 核心数据结构

# 3.1 字段逐层解读

golang.org/x/sync/semaphore 的 Weighted 结构体只有 4 个字段——但藏着完整的并发控制逻辑:

// golang.org/x/sync/semaphore/semaphore.go (简化)
type Weighted struct {
    size    int64        // 总容量——信号量的"最大权重"
    cur     int64        // 当前已占用的权重
    mu      sync.Mutex   // 保护 size/cur/waiters 的互斥锁
    waiters list.List    // 等待者链表——FIFO 队列
}
1
2
3
4
5
6
7

每个字段的设计意图:

字段 类型 设计意图
size int64 固定值,在 NewWeighted(n) 时设定——这是信号量的"总资源量"(如 24GB 显存 / 100 个连接)
cur int64 动态值,当前已分配的权重总和——每次 Acquire +n,Release -n
mu sync.Mutex 保护所有字段的全局锁——注意:不是 per-P 的无锁设计(信号量是低频操作,锁竞争不是瓶颈)
waiters list.List 标准库的双向链表——按 Acquire 的调用顺序排队,保证 FIFO 公平性

初始化:

// 创建一个总容量为 24(单位:GB 显存)的加权信号量
sem := semaphore.NewWeighted(24)
1
2

关键设计:size 和 cur 是 int64 而不是 uint64——为什么?因为 Release 可能出错(释放超过已获取的权重),负数可以更容易在测试中暴露而非静默溢出。

# 3.2 waiter 等待节点

当 cur + n > size 时,Acquire 创建一个 waiter 节点插入链表:

// golang.org/x/sync/semaphore/semaphore.go (简化)
type waiter struct {
    n     int64          // 这个等待者需要的权重
    ready chan<- struct{} // 可缓冲的 channel,用于唤醒通知
}
1
2
3
4
5

等待链表的结构:

waiters 链表 (FIFO):
┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│  waiter          │    │  waiter          │    │  waiter          │
│  n = 6 (heavy)   │───→│  n = 2 (lite)    │───→│  n = 2 (lite)    │
│  ready: buffered │    │  ready: buffered │    │  ready: buffered │
│  chan (cap=1)    │    │  chan (cap=1)    │    │  chan (cap=1)    │
└─────────────────┘    └─────────────────┘    └─────────────────┘
     Goroutine A            Goroutine B            Goroutine C

当前状态: cur = 20, size = 24
剩余容量: size - cur = 4

唤醒判断:
  - waiter A (n=6): 6 > 4 → 不能唤醒,跳过
  - waiter B (n=2): 2 ≤ 4 → 唤醒!cur += 2 → cur=22
  - waiter C (n=2): 2 ≤ 2 → 唤醒!cur += 2 → cur=24
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

为什么 ready channel 是 buffered (cap=1)——避免唤醒时在 close(ch) 上阻塞。Release 在持有 mu 锁时 close ready channel,如果 ready 是无缓冲的且接收方已经因 context 取消退出了,close 会阻塞——死锁。

# 3.3 并发安全保证

Weighted 的并发模型非常简洁:

所有操作都在 mu.Lock() / mu.Unlock() 之间
        │
        ├── Acquire: 只修改 cur 和 waiters 链表
        ├── Release: 只修改 cur 和 close waiter.ready
        └── TryAcquire: 只读 cur,尝试修改 cur

唤醒 goroutine 时:
  不持锁调用 close(ready) —— 避免在锁内做外部操作
  但 close 本身是线程安全的(Go 标准库保证)
1
2
3
4
5
6
7
8
9

goroutine 视角:当 goroutine 调用 Acquire(ctx, n) 而资源不足时——

  1. G 的状态:从 _Grunning → _Gwaiting(等待 ready channel 或 ctx.Done())
  2. 当前 P 被释放——其他 G 可以在这个 P 上执行
  3. 当 Release 触发 close(ready) 时——G 被唤醒,状态回到 _Grunnable

# 4. Acquire 获取流程剖析

# 4.1 快速路径:资源充足

当调用 Acquire(ctx, n) 时,首先检查是否可以直接分配:

// golang.org/x/sync/semaphore/semaphore.go (简化)
func (s *Weighted) Acquire(ctx context.Context, n int64) error {
    s.mu.Lock()
    // 快速路径:当前剩余容量 ≥ 请求的权重
    if s.size-s.cur >= n && s.waiters.Len() == 0 {
        s.cur += n
        s.mu.Unlock()
        return nil  // ← O(1),只有一次加锁解锁
    }
    // ...
}
1
2
3
4
5
6
7
8
9
10
11

注意快速路径的第二个条件:s.waiters.Len() == 0。即使当前容量足够,如果有等待者在排队,新来的请求也必须排队——这是 FIFO 公平性的保证。

快速路径示例:
  size=24, cur=10, waiters=空
  
  Acquire(ctx, 6):
  → 24 - 10 = 14 ≥ 6 ✓ 且 waiters 为空 ✓
  → cur += 6 → cur = 16
  → return nil
  
  Acquire(ctx, 10):
  → 24 - 16 = 8 < 10 ✗
  → 走慢速路径
1
2
3
4
5
6
7
8
9
10
11

快速路径的代价:只有一次 Lock / Unlock,全程约 ~20ns(无竞争时)。

# 4.2 慢速路径:创建等待者

当资源不足时——创建 waiter 节点,挂到链表末尾,阻塞等待:

// golang.org/x/sync/semaphore/semaphore.go (简化)
func (s *Weighted) Acquire(ctx context.Context, n int64) error {
    s.mu.Lock()
    if s.size-s.cur >= n && s.waiters.Len() == 0 {
        s.cur += n
        s.mu.Unlock()
        return nil
    }

    // 请求的权重超过总容量 → 永远不可能满足
    if n > s.size {
        s.mu.Unlock()
        <-ctx.Done() // 直接等 context 取消(不用创建 waiter)
        return ctx.Err()
    }

    // 创建等待节点
    ready := make(chan struct{}, 1) // 注意:cap=1
    w := waiter{n: n, ready: ready}
    elem := s.waiters.PushBack(w) // 插入链表尾部 → FIFO
    s.mu.Unlock()

    // 阻塞等待两个事件之一
    select {
    case <-ctx.Done():          // context 取消
        err := ctx.Err()
        s.mu.Lock()
        select {
        case <-ready:            // 竞态:取消的同时被唤醒了
            err = nil            // → 以获取许可为准(取消作废)
        default:
            // 从链表中移除等待节点
            isFront := s.waiters.Front() == elem
            s.waiters.Remove(elem)
            if isFront {
                // 如果被取消的是队首,尝试唤醒后续等待者
                s.notifyWaiters()
            }
        }
        s.mu.Unlock()
        return err

    case <-ready:               // 被 Release 唤醒
        return nil              // → 许可已获取(Release 时 cur 已加过)
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46

这段代码里藏着两个精妙的设计:

精妙点 1:唤醒时 cur 已经加过了——Release 在 close(ready) 之前已经把 cur += waiter.n 了(见第 5 章)。所以 case <-ready 分支不需要再操作 cur——直接 return。

精妙点 2:context 取消的竞态处理——内层 select 的 case <-ready(default 分支)处理的是:context 取消的同时,Release 也刚好唤醒了这个 waiter。此时 "获取许可"优先于"取消"——因为资源已经拿到了。

# 4.3 context 取消的处理

疑惑:context 取消时,为什么要检查链表位置(isFront)来决定是否调用 notifyWaiters?

论证:

场景:等待链表中有三个 waiter
  [A: n=6] → [B: n=2] → [C: n=2]
              ↑ B 的 context 被取消

如果直接 Remove(B) 而不唤醒:
  [A: n=6] → [C: n=2]
  
  此时 cur=20, size=24
  → 剩余容量=4 ≥ A.n(6)? 不满足
  → A 继续等待……但 B 撤销后,A 前面的障碍没了!
  → 实际上如果是 C 被取消,A 同样需要被检查

所以 B 取消时:
  - B 是队首(isFront=true) → 可用的 4GB 可能可以给后续等待者
  - B 不是队首(isFront=false) → 不需要 notifyWaiters
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 内部方法:从队首开始扫描,逐个唤醒满足条件的 waiter
func (s *Weighted) notifyWaiters() {
    for {
        elem := s.waiters.Front()
        if elem == nil {
            break // 没有等待者
        }
        w := elem.Value.(waiter)
        if s.size-s.cur < w.n {
            // 队首的 waiter 都无法满足 → 后面的更不可能满足
            // (因为队首的 n 是最大的吗?不一定,但"先来后到"是原则)
            break
        }
        s.cur += w.n
        s.waiters.Remove(elem)
        close(w.ready) // ← 唤醒这个 waiter 的 goroutine
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

结论:context 取消不是简单的"移除节点"——它需要遵循 FIFO 契约:如果取消的是队首,阻塞解除后后续等待者应该被重新评估。

# 4.4 完整状态机

                         Acquire(ctx, n)
                              │
              ┌───────────────┼───────────────┐
              │               │               │
              ▼               ▼               ▼
         n > size ?    容量够且队空?    容量不够或队非空
              │               │               │
              ▼               ▼               ▼
         等 ctx.Done()   cur += n        创建 waiter
              │         return nil      插入链表尾部
              ▼               │               │
         return err             │     select { ready / ctx.Done() }
                                │               │
                                │       ┌───────┴────────┐
                                │       │                │
                                │       ▼                ▼
                                │    ready关闭        ctx.Done()
                                │       │                │
                                │    return nil     获取锁→ 再次检查ready
                                │                      │
                                │              ┌───────┴────────┐
                                │              │                │
                                │              ▼                ▼
                                │           ready已关闭      ready未关闭
                                │              │                │
                                │           return nil    从链表移除→通知
                                │                          return err
                                │
                                └──────────────────────────────────
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

# 5. Release 释放与唤醒

# 5.1 原子递减与 FIFO 扫描

Release 的逻辑清晰直接——但它在调用 notifyWaiters 之前就已经从 cur 中减去了释放的权重:

// golang.org/x/sync/semaphore/semaphore.go (简化)
func (s *Weighted) Release(n int64) {
    s.mu.Lock()
    s.cur -= n            // ← 先归还资源
    if s.cur < 0 {
        s.mu.Unlock()
        panic("semaphore: released more than held")  // 防御性检查
    }
    s.notifyWaiters()     // ← 再尝试唤醒等待者
    s.mu.Unlock()
}
1
2
3
4
5
6
7
8
9
10
11

notifyWaiters 的扫描算法:

Release(4) 前: cur=22, size=24
waiters: [A: n=6] → [B: n=2] → [C: n=6]

Release(4): cur = 22 - 4 = 18 → 剩余 6

扫描:
  📍 检查 waiter A (n=6):
     24 - 18 = 6 ≥ 6 ✓ → 唤醒 A
     cur += 6 → cur = 24
  
  📍 检查 waiter B (n=2):
     24 - 24 = 0 < 2 ✗ → 停止扫描(FIFO原则)
  
  📍 waiter C 不会在这次扫描中被检查
     → 必须等下一次 Release 释放足够容量

结果: 只唤醒了 A,B 和 C 继续等待
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

关键设计:notifyWaiters 是 "从队首逐一遍历、遇阻即停" 的策略——不是 "扫描整个链表找能放的"。这保证了:先来的 goroutine 一定先获取许可——即使后来的 goroutine 请求更小的权重(更容易满足)。

# 5.2 闭包唤醒的并发语义

唤醒的关键操作是 close(w.ready):

close(w.ready)
1

ready 是 chan struct{},在 waiter 创建时用了 make(chan struct{}, 1)——带 1 个缓冲的 channel。为什么不是无缓冲?

无缓冲 channel 的 close:
  → 如果接收方 goroutine 正好在 select 上等待 → 正常唤醒 ✓
  → 如果接收方 goroutine 因为 context 取消已经退出 select → close 本身成功 ✓
     (Go 允许 close 一个没有接收方的 channel)

但问题是:如果接收方还没有到 select(比如刚创建 waiter、还没 unlock)→ close 也 OK

所以"消息一定能送达"——close 信号是持久的(不可撤销的)。

那么缓冲的作用是什么?
  → 不是为了让 close 不阻塞,而是让 ready 可以作为「信号」而非「数据传输」
  → cap=1 的 channel 被 close 后,后续的 <-ready 永远非阻塞返回零值
1
2
3
4
5
6
7
8
9
10
11
12

goroutine 视角分析:

G1: 调用 Acquire → 资源不足 → 挂到 waiters → select { <-ready / <-ctx.Done() }
    状态: _Grunning → _Gwaiting

G2: 调用 Release(4) → cur -= 4 → notifyWaiters → close(G1.ready)
    → G1 从 _Gwaiting → _Grunnable → 被调度 → 执行 return nil

如果 G1 同时收到了 ctx.Done() 和 ready close:
    → select 伪随机选一个 case
    → 如果选了 ctx.Done(): 内层 select 检查 ready → 已 close → 覆盖为 nil
    → 如果选了 ready: 直接 return nil
1
2
3
4
5
6
7
8
9
10

# 5.3 Release 大于 Acquire 的陷阱

Release 一定要与 Acquire 配对——释放的权重 n 必须精确等于之前获取的权重。如果 Release 的 n 大于实际 Acquire 的 n(比如忘记了一个 defer Release 后又手动 Release 了一次):

sem := semaphore.NewWeighted(10)

// ❌ 危险:Acquire(3) 但 Release(5)
sem.Acquire(ctx, 3)
// ... do work ...
sem.Release(5) // panic: "released more than held" — 如果 cur 变成负数

// ❌ 更隐蔽的 bug:Acquire(3) 两次但 Release(3) 三次
sem.Acquire(ctx, 3)
sem.Acquire(ctx, 3) // cur = 6
// ... do work ...
sem.Release(3) // cur = 3
sem.Release(3) // cur = 0
sem.Release(3) // panic! cur 变成 -3
1
2
3
4
5
6
7
8
9
10
11
12
13
14

防御方案:

// ✅ 用 defer 保证一一对应
func processWithSemaphore(sem *semaphore.Weighted, ctx context.Context, weight int64) error {
    if err := sem.Acquire(ctx, weight); err != nil {
        return err
    }
    defer sem.Release(weight) // ← Acquire 和 Release 的 weight 严格一致
    return doWork()
}
1
2
3
4
5
6
7
8

结论:信号量的正确性依赖调用方的"配对契约"——这和 sync.Mutex 的 Lock/Unlock 一样,Go runtime 不做跨 goroutine 的验证。

# 6. TryAcquire 非阻塞获取

# 6.1 快速无损尝试

TryAcquire 是非阻塞获取——"能拿就拿,拿不到立刻返回":

// golang.org/x/sync/semaphore/semaphore.go (简化)
func (s *Weighted) TryAcquire(n int64) bool {
    s.mu.Lock()
    // 只看当前容量够不够 + 有没有人在排队
    success := s.size-s.cur >= n && s.waiters.Len() == 0
    if success {
        s.cur += n
    }
    s.mu.Unlock()
    return success
}
1
2
3
4
5
6
7
8
9
10
11

和 Acquire 的关键差异:

Acquire TryAcquire
阻塞行为 容量不够时阻塞等待 容量不够时立即返回 false
等待队列 容量够但队非空 → 排队 容量够但队非空 → 返回 false
context 可取消 不需要 context
用途 主流程的并发控制 优化路径 / 降级逻辑

典型使用场景——缓存优先策略:

// 尝试用 GPU 推理(非阻塞),如果 GPU 忙就用 CPU fallback
func infer(image []byte) (Result, error) {
    if sem.TryAcquire(6) { // GPU 有 6GB 空闲 → 用 GPU
        defer sem.Release(6)
        return gpuInfer(image)
    }
    // GPU 忙 → 降级到 CPU
    return cpuInfer(image)
}
1
2
3
4
5
6
7
8
9

# 6.2 与带超时的 Acquire 对比

带超时的 Acquire 可以用 context 实现,但它和 TryAcquire 的语义不同:

// 带超时的 Acquire —— "等 100ms,等不到就算了"
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()
err := sem.Acquire(ctx, 6)
if err != nil {
    // 超时或取消
}

// TryAcquire —— "现在有没有?没有就算了"
if sem.TryAcquire(6) {
    defer sem.Release(6)
    // ...
}
1
2
3
4
5
6
7
8
9
10
11
12
13

核心差异:带超时的 Acquire 在 100ms 内可能被 Release 唤醒——它给了一个"等待窗口"。TryAcquire 不给任何等待——"零窗口"。

两者的选择:

  • 低延迟场景(如 HTTP 请求处理)→ TryAcquire + fallback(不给等待时间,立刻降级)
  • 批处理场景(如离线计算任务)→ Acquire + ctx timeout(允许等待几百毫秒)

# 7. Channel 信号量对比

# 7.1 四种场景横向比较

用一个综合性对比表说明两种方案在四种典型并发场景中的表现:

场景 channel 信号量 semaphore.Weighted 胜出
等权并发控制(如限制 10 个 HTTP 并发请求) make(chan struct{}, 10) 简洁直接 semaphore.NewWeighted(10) + Acquire(ctx,1) 稍重 channel ✓
加权并发控制(如 GPU 不同模型显存消耗不同) 无法表达——每个许可权重固定为 1 Acquire(ctx, 6) 一次拿 6 个权重 Weighted ✓
可取消的等待(如 HTTP 请求的 ctx 超时) 必须用 select+ctx.Done()——但队列中的 goroutine 无法被移除 Acquire(ctx, n) → ctx.Done() 时精准移除 waiter Weighted ✓
非阻塞尝试(如缓存命中走快速通道) select { case ch <- v: default: } TryAcquire(n) 平手

# 7.2 加权场景的 Channel 困境

回到第 1 章的 GPU 问题——如果坚持用 channel 信号量,有 workaround 吗?

方案 A:给 heavy 任务分配多个 slot:

// ❌ hack:heavy 任务占 3 个 channel slot
func processHeavy(task ImageTask) {
    gpuSlots <- struct{}{} // 占 1
    gpuSlots <- struct{}{} // 占 2
    gpuSlots <- struct{}{} // 占 3
    defer func() {
        <-gpuSlots
        <-gpuSlots
        <-gpuSlots
    }()
    // ...
}
1
2
3
4
5
6
7
8
9
10
11
12

问题:三次 gpuSlots <- struct{}{} 不是原子的——在第二次和第三次之间,channel 可能满了,goroutine 在第二次发送时阻塞——但第一个 slot 已经占住了——这会导致"部分获取"的状态不一致。而 semaphore.Acquire(ctx, 3) 在 mu 锁的保护下原子完成。

方案 B:创建多个不同容量的 channel:

var heavySlots = make(chan struct{}, 4)  // 最多 4 个 heavy (4×6=24)
var liteSlots  = make(chan struct{}, 12) // 最多 12 个 lite (12×2=24)
1
2

问题:两类任务各抢各的 channel——但它们共享同一个物理 GPU 显存。如果有 3 个 heavy + 6 个 lite 同时跑 = 3×6 + 6×2 = 30GB > 24GB——依然超限。

结论:只要任务是共享同一块资源池且权重不同的——channel 信号量就兜不住。这是 Weighted 信号量存在的根本理由。

# 8. 限流实战模式

# 8.1 固定窗口模式

固定窗口(Fixed Window)是最简单的限流器:在固定时间窗口内(如 1 秒)允许 N 个请求。用信号量实现:

package main

import (
    "context"
    "sync"
    "time"
    "golang.org/x/sync/semaphore"
)

type FixedWindowLimiter struct {
    sem      *semaphore.Weighted
    interval time.Duration
    mu       sync.Mutex
    timer    *time.Timer
}

func NewFixedWindow(limit int64, interval time.Duration) *FixedWindowLimiter {
    return &FixedWindowLimiter{
        sem:      semaphore.NewWeighted(limit),
        interval: interval,
    }
}

func (l *FixedWindowLimiter) Allow(ctx context.Context) (bool, error) {
    // 用 TryAcquire 做非阻塞限流
    if l.sem.TryAcquire(1) {
        // 启动定时器——到期后重置
        l.mu.Lock()
        if l.timer == nil {
            l.timer = time.AfterFunc(l.interval, func() {
                l.sem.Release(l.sem.Size()) // 简化——实际需要更精确的计算
                l.mu.Lock()
                l.timer = nil
                l.mu.Unlock()
            })
        }
        l.mu.Unlock()
        return true, nil
    }
    return false, nil // 当前窗口已满
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41

固定窗口的缺点——"边界突变":窗口第 0.9 秒时 100 个请求进来消耗所有许可,第 1.1 秒窗口重置后又来 100 个——在 0.9s~1.1s 这 0.2 秒内,实际有 200 个请求通过(窗口边界处突发翻倍)。

# 8.2 滑动日志模式

滑动日志(Sliding Log)记录了每个请求的时间戳,判断当前时间窗口内请求数:

type SlidingLogLimiter struct {
    mu      sync.Mutex
    log     []time.Time
    limit   int
    window  time.Duration
}

func (l *SlidingLogLimiter) Allow() bool {
    now := time.Now()
    l.mu.Lock()
    defer l.mu.Unlock()

    // 清理过期记录
    cutoff := now.Add(-l.window)
    for len(l.log) > 0 && l.log[0].Before(cutoff) {
        l.log = l.log[1:]
    }

    if len(l.log) < l.limit {
        l.log = append(l.log, now)
        return true
    }
    return false
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

信号量在此处的角色:滑动日志本身不需要信号量,但它可以和信号量组合——信号量控制并发度(正在处理的请求数),滑动日志控制速率(时间窗口内的请求数)。两者是正交的:

              ┌──────────────────┐
请求 →        │  滑动日志限流器   │ → 拒绝(429)
              │  (速率限制)      │
              └──────┬───────────┘
                     │ 通过
                     ▼
              ┌──────────────────┐
              │  加权信号量       │ → 排队等待
              │  (并发度控制)     │
              └──────┬───────────┘
                     │ 获取许可
                     ▼
              ┌──────────────────┐
              │  实际处理逻辑     │
              └──────────────────┘
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

# 8.3 令牌桶模式

令牌桶(Token Bucket)是生产环境最常用的限流器——以固定速率生成令牌,请求消耗令牌:

type TokenBucketLimiter struct {
    sem      *semaphore.Weighted
    rate     int64         // 每秒生成的令牌数
    capacity int64         // 桶容量(允许的突发)
    interval time.Duration // 令牌生成间隔
}

func NewTokenBucket(rate, capacity int64) *TokenBucketLimiter {
    limiter := &TokenBucketLimiter{
        sem:      semaphore.NewWeighted(capacity),
        rate:     rate,
        capacity: capacity,
        interval: time.Second / time.Duration(rate),
    }
    // 后台 goroutine:定期往桶里放令牌
    go limiter.refillLoop()
    return limiter
}

func (l *TokenBucketLimiter) refillLoop() {
    ticker := time.NewTicker(l.interval)
    defer ticker.Stop()
    for range ticker.C {
        // 释放一个令牌(相当于往桶里放一个)
        // 如果桶满了(cur == size),Release 会 panic
        // → 需要 safeRelease
        l.safeRelease(1)
    }
}

func (l *TokenBucketLimiter) safeRelease(n int64) {
    // 只在桶没满时释放
    if l.sem.TryAcquire(n) {
        // TryAcquire 成功了说明桶有空间 → 但我们是"释放令牌",不是"获取"
        // 实际上 TryAcquire 不适用这个场景
        // → 真正的令牌桶实现需要更精细的控制
        _ = n // 占位
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39

注意:上面的实现显示了直接用信号量做令牌桶的局限性——Release 的对称性假设(Release 的 n 必须等于 Acquire 的 n)和令牌桶的"后台生成令牌"模式不太匹配。生产环境推荐使用 golang.org/x/time/rate 包的 rate.Limiter:

import "golang.org/x/time/rate"

limiter := rate.NewLimiter(rate.Limit(100), 200) // 100 QPS,突发 200
if !limiter.Allow() {
    http.Error(w, "rate limited", http.StatusTooManyRequests)
    return
}
1
2
3
4
5
6
7

rate.Limiter 内部使用惰性填充(lazy refill)——不是后台 goroutine 定时放令牌,而是在每次 Allow/Reserve 时实时计算自上次请求以来生成的新令牌数。在信号量的语境下,这是"信号量 + 时间戳"的组合模式。

# 8.4 goroutine 并发度控制

信号量最直接的应用:限制同时执行的 goroutine 数量:

func processAll(tasks []Task) error {
    var (
        sem = semaphore.NewWeighted(10) // 最多 10 个并发
        wg  sync.WaitGroup
    )

    for _, task := range tasks {
        wg.Add(1)
        go func(t Task) {
            defer wg.Done()

            // Acquire 阻塞——直到有可用的并发槽位
            if err := sem.Acquire(context.Background(), 1); err != nil {
                return
            }
            defer sem.Release(1)

            doWork(t)
        }(task)
    }
    wg.Wait()
    return nil
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

和 WaitGroup 的区别:WaitGroup 是"等所有任务结束"——不限并发数。上面的代码是"同时只有 10 个任务在执行,其余排队"——这叫并发度控制。

和 channel 信号量对比:这个场景用 channel 也可以 (make(chan struct{}, 10))——因为每个任务权重都是 1。差异在于:如果用 context 控制超时,Weighted 可以安全取消等待中的 goroutine。

# 9. 连接池资源控制

# 9.1 数据库连接池设计

加权信号量的一个经典应用是数据库连接池——每个连接有"权重"(比如读连接 weight=1,写连接 weight=2):

package main

import (
    "context"
    "database/sql"
    "fmt"
    "golang.org/x/sync/semaphore"
)

type WeightedDBPool struct {
    db        *sql.DB
    sem       *semaphore.Weighted // 总连接权重 = 最大连接数 × 平均权重
    maxConns  int64
}

func NewWeightedDBPool(db *sql.DB, maxConns int64) *WeightedDBPool {
    db.SetMaxOpenConns(int(maxConns))
    return &WeightedDBPool{
        db:       db,
        sem:      semaphore.NewWeighted(maxConns * 2), // 每个连接权重为 1~2
        maxConns: maxConns,
    }
}

// 获取一个读连接(权重 1)
func (p *WeightedDBPool) AcquireReader(ctx context.Context) (*sql.Conn, error) {
    if err := p.sem.Acquire(ctx, 1); err != nil {
        return nil, fmt.Errorf("获取读连接被取消: %w", err)
    }
    conn, err := p.db.Conn(ctx)
    if err != nil {
        p.sem.Release(1) // 获取连接失败 → 归还信号量许可
        return nil, err
    }
    return &trackedConn{Conn: conn, pool: p, weight: 1}, nil
}

// 获取一个写连接(权重 2——需要更多资源/锁)
func (p *WeightedDBPool) AcquireWriter(ctx context.Context) (*sql.Conn, error) {
    if err := p.sem.Acquire(ctx, 2); err != nil {
        return nil, fmt.Errorf("获取写连接被取消: %w", err)
    }
    conn, err := p.db.Conn(ctx)
    if err != nil {
        p.sem.Release(2)
        return nil, err
    }
    return &trackedConn{Conn: conn, pool: p, weight: 2}, nil
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49

# 9.2 可中断的获取与归还

trackedConn 包装了标准的 *sql.Conn,在 Close() 时自动归还信号量许可:

type trackedConn struct {
    *sql.Conn
    pool   *WeightedDBPool
    weight int64
    closed bool
}

func (tc *trackedConn) Close() error {
    if tc.closed {
        return nil
    }
    tc.closed = true
    // 归还到连接池(标准操作)
    err := tc.Conn.Close()
    // 归还信号量许可——无论连接关闭是否出错
    tc.pool.sem.Release(tc.weight)
    return err
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

和传统 sql.DB 的区别:

传统 sql.DB:
  db.SetMaxOpenConns(10) → 最多 10 个连接,读和写不区分
  
WeightedDBPool:
  sem.NewWeighted(20) → 总权重 20
  → 10 个读 (weight=1) + 5 个写 (weight=2) = 20 ✓
  → 20 个读 (weight=1)                    = 20 ✓
  → 5 个读 + 7 个写 = 5+14 = 19           ✓
  → 0 个读 + 10 个写 = 20                  ✗ 超过 sql.DB 的 10 个最大连接
  → 需要联合检查:sem 权重 ≤ 20 且 sql.DB 连接数 ≤ 10
1
2
3
4
5
6
7
8
9
10

双重限制的设计:信号量控制权重,sql.DB 控制连接数——两者共同保证资源安全。

# 9.3 诊断与监控

信号量的排队信息可以通过源码中的 waiters 链表获取(但 Waiters 方法需要额外实现):

// 扩展 signal 以支持诊断
type DiagnosableWeighted struct {
    semaphore.Weighted
}

// 近似获取等待中的 goroutine 数量(通过 atomic 计数模拟)
// 注:标准 Weighted 不暴露 waiters 长度,需要自行维护计数器
type MonitoredWeighted struct {
    *semaphore.Weighted
    waitingCount int64 // atomic
    totalAcquired int64 // atomic
    totalReleased int64 // atomic
}

// pprof 诊断:
// curl http://localhost:6060/debug/pprof/goroutine?debug=2 | grep "semaphore.Acquire"
// → 看有多少 goroutine 卡在 Acquire 上
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

监控指标:

指标 含义 健康值
cur / size 当前利用率 < 80%
等待 goroutine 数 排队长度 ≈ 0
Acquire 延迟 (p99) 获取许可的等待时间 < 10ms
context 取消率 Acquire 中 ctx 被取消的比例 < 5%

# 10. 综合案例串讲

# 10.1 案例真相揭晓

回到第 1 章 GPU 图像处理服务的七个疑问,逐条作答:

疑问 答案
① 加权信号量怎么实现不同权重? 第 3-4 章:Weighted 的 cur 按 n 累加,不按"个数"计算——Acquire(ctx, 6) 占用 6 个单位资源
② Acquire 的 context 取消怎么安全退出? 第 4.3:出队时检查 ctx.Done(),移除 waiter 节点,如果移除的是队首则唤醒后续等待者
③ Release 后怎么 FIFO 唤醒等待者? 第 5 章:notifyWaiters 从链表头开始扫描,满足条件才唤醒,遇阻即停
④ TryAcquire 的语义和带超时 Acquire 的区别? 第 6 章:TryAcquire 是零窗口非阻塞,带超时 Acquire 提供了等待窗口
⑤ channel 信号量 vs 加权信号量的优劣? 第 7 章:等权场景 channel 更简洁,加权/可取消场景 Weighted 唯一正解
⑥ 四种限流器模式怎么选? 第 8 章:固定窗口简单但有边界突变,滑动日志精确但内存高,令牌桶是最佳平衡,并发度控制用信号量
⑦ 连接池怎么结合加权信号量? 第 9 章:信号量控制资源权重,数据库驱动控制物理连接数——双重保证

案例完整根因链条:

GPU 显存 24GB——5 个 heavy 任务同时占用 → 30GB → OOM
  → 根因:channel 信号量 chan struct{}(8) 限的是并发数,不是显存量
  → 每个任务需要的显存不同(2GB vs 6GB)→ 需要加权控制
  → channel 无法表达"这个任务占 3 个许可"
  → 且 channel 没有 context 可取消——排队的 goroutine 即使 ctx 超时也退不出
1
2
3
4
5

修复方案:

// ✅ 加权信号量方案:用显存作为权重单位
var gpuSem = semaphore.NewWeighted(24) // 24GB 显存

type ImageTask struct {
    ImageID   string
    GPUMem    int64 // 需要的显存(GB),由模型类型决定
    ModelType string
}

func processImageV2(ctx context.Context, task ImageTask) error {
    // heavy=3, lite=1 (以 2GB 为 1 个单位权重)
    weight := task.GPUMem / (2 * 1024 * 1024 * 1024) // 或直接配置映射表

    // 可被 ctx 取消的获取
    if err := gpuSem.Acquire(ctx, weight); err != nil {
        return fmt.Errorf("获取 GPU 资源被取消: %w", err)
    }
    defer gpuSem.Release(weight)

    // 执行推理
    return gpuInfer(task)
}

// ✅ 配合令牌桶限流——先限速率、再控并发
var rateLimiter = rate.NewLimiter(50, 100) // 50 QPS,突发 100

func handleRequest(w http.ResponseWriter, r *http.Request) {
    if !rateLimiter.Allow() {
        http.Error(w, "rate limited", 429)
        return
    }
    // ... 调用 processImageV2 ...
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33

# 10.2 一次 Acquire 的完整旅程

gpuSem.Acquire(ctx, 3)  // 请求 6GB 显存
───────────────────────────────────────────────────────────
        │
        ├─ 加锁 mu.Lock()
        │     │
        │     ├─ 快速路径检查:
        │     │    size=24, cur=20 → 剩余 4GB
        │     │    4 < 3? 不满足 → 跳过快速路径
        │     │
        │     ├─ n > size? 3 ≤ 24 → 不超限
        │     │
        │     ├─ 慢速路径:
        │     │    创建 ready = make(chan struct{}, 1)
        │     │    创建 waiter{n:3, ready: ready}
        │     │    waiters.PushBack(waiter) → 插入链表尾部
        │     │    mu.Unlock()
        │     │
        │     └─ 阻塞等待:
        │           select {
        │           case <-ctx.Done():     ← 如果 context 被取消
        │           case <-ready:          ← 如果被 Release 唤醒
        │           }
        │
        ├─ [假设被 Release 唤醒]
        │     → ready 被 close → select 走 ready 分支
        │     → return nil (此时 cur 已经是 23 了——Release 时加的)
        │
        ├─ [假设 ctx 被取消且没被 Release 唤醒]
        │     → select 走 ctx.Done() 分支
        │     → mu.Lock()
        │     → select { case <-ready: default: }
        │        └─ ready 未 close → default: 真的取消了
        │     → waiters.Remove(waiter_elem)
        │     → 如果是队首 → notifyWaiters()
        │     → return ctx.Err()
        │
        └─ [假设 ctx 被取消但刚好被 Release 唤醒——竞态]
              → select 可能选 ready 分支 → return nil
              → select 可能选 ctx.Done() 分支:
                   → mu.Lock()
                   → select { case <-ready: } → ready 已 close!
                   → err = nil → return nil (资源已获取,取消无效)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42

# 10.3 设计哲学回扣

哲学 1:把"计数型"并发控制升级为"加权型"——让一个许可可以代表"多个单位"

channel 信号量把并发控制简化成一个计数器:N 个槽位 = N 个许可——每个 struct{} 都是一个不可分割的单位。Weighted 信号量把"许可"从"个数"抽象为"权重"——Acquire(3) 不是"拿 3 个许可",而是"拿 3 个单位"。这个抽象让信号量从"并发数上限"变成了"资源上限"——它约束的是总资源消耗,不是 goroutine 数量。

哲学 2:FIFO 公平性防止饥饿——但不能防止"大任务堵队"

Weighted 的 waiters 链表严格按 Acquire 的调用顺序排队——这和 Go channel 的 sendq/recvq FIFO 一致。但 FIFO 不是"最优调度方案"——一个大权重(n=6)的 waiter 排在队首,即使后面有 n=1 的小任务能 fit,也必须等队首先被满足。这是 "公平 vs 吞吐" 的经典权衡:Weighted 选择了公平(不饿死先来者),代价是可能降低吞吐(后续小任务被阻塞)。如果业务需要"小任务优先",需要自己实现优先级调度器。

哲学 3:context 集成让"等待"变得可控——不只是超时,是"随时可取消"

这是 Weighted 相对 channel 信号量的最大优势。在微服务架构中,一个请求可能经过多层 RPC 调用——每层都有一个 context 控制超时。当上游已经超时放弃时,下游等待信号量的 goroutine 必须能安全退出——否则就是 goroutine 泄漏。Weighted 的 await + ctx.Done() 双重 select + 内层竞态检查——保证了"取消"和"唤醒"之间没有死锁窗口。

哲学 4:最小化核心——只用 sync.Mutex + container/list.List + channel

Weighted 只依赖三个 Go 标准库组件:sync.Mutex(锁)、container/list.List(双向链表)、chan struct{}(信号通知)。没有自旋、没有 CAS 循环、没有 runtime 内部 API——这是一个"纯粹在 Go 用户态实现的同步原语"。它的简洁性意味着:你完全可以读得懂它的每一行源码——读完之后,你对"Go 怎么用这些基础组件搭建高级同步原语"会有完整认知。

# 10.4 速查表

信号量三种方案对比:

方案 权重 可取消 TryAcquire FIFO 适用场景
channel 信号量 固定为 1 ❌ (需 select workaround) ⚠️ (select+default) ✅ (隐式) 等权简单并发控制
semaphore.Weighted 可变 int64 ✅ (ctx) ✅ ✅ (显式链表) 加权资源控制、可取消等待
WaitGroup N/A ❌ ❌ N/A 等待所有任务完成

Acquire 三条路径:

路径 条件 复杂度
快速路径 size-cur ≥ n 且 waiters 为空 O(1)
慢速路径(创建 waiter) size-cur < n 或 waiters 非空 O(1) 创建 + 阻塞等待
拒绝路径 n > size O(1) 立刻失败

Release 唤醒策略:

策略 行为
FIFO 链表扫描 从队首开始,逐个判断能否满足
遇阻即停 队首 waiter 无法满足 → 停止扫描
close(ready) 通知 用 cap=1 channel,close 作为一次性信号

四种限流器场景选型:

限流器 核心组件 优点 缺点
固定窗口 信号量 + 定时重置 实现简单 边界突变
滑动日志 时间戳列表 精确 内存高
令牌桶 rate.Limiter(惰性填充) 允许突发 实现复杂
并发度控制 semaphore.Weighted 精确控制 不限速率

诊断命令:

# 查看阻塞在 Acquire 的 goroutine
curl http://localhost:6060/debug/pprof/goroutine?debug=2 | grep "semaphore.Acquire"

# 查看 goroutine 总数和阻塞状态
go tool pprof http://localhost:6060/debug/pprof/goroutine

# data race 检测(信号量的并发使用是否正确)
go test -race ./...

# 逃逸分析——确认任务结构体不会意外逃逸
go build -gcflags="-m" .
1
2
3
4
5
6
7
8
9
10
11

下一篇:我们已经掌握了加权信号量的 Acquire/Release 状态机、FIFO 唤醒策略和四种限流模式,下一步进入 14.errgroup并行控制——把并行任务组、错误传播、WithContext 取消级联的原理剖开。

上次更新: 2026/06/13, 21:14:36
Go内存模型一致性
errgroup并行控制

← Go内存模型一致性 errgroup并行控制→

最近更新
01
信号崩溃快速排查
06-15
02
CoreDump破案
06-15
03
perf火焰图实战
06-15
更多文章>
Theme by Vdoing | Copyright © 2019-2026 杨充 | MIT License | 桂ICP备2024034950号 | 桂公网安备45142202000030
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式