加权信号量与限流
# 13.加权信号量与限流
卷三第十三篇——goroutine 是廉价的,但下游资源不是。本篇聚焦
golang.org/x/sync/semaphore加权信号量——它不是锁,而是一个带权重的并发度控制器:Acquire 可以一次拿 N 个许可,Release 按 FIFO 顺序唤醒等待者,context 取消时还能安全退出。读完本篇,你能回答:为什么用 channel 做信号量会撑不住长尾请求?加权信号量怎么做到 O(1) 快速路径 + FIFO 公平唤醒?四种限流器模式各自的适用场景怎么选?关键词:Weighted、加权、FIFO 等待链表、channel 信号量对比、限流器模式、连接池控制。
# 目录介绍
- 1. 案例引入
- 2. 架构概览
- 3. Weighted 核心数据结构
- 4. Acquire 获取流程剖析
- 5. Release 释放与唤醒
- 6. TryAcquire 非阻塞获取
- 7. Channel 信号量对比
- 8. 限流实战模式
- 9. 连接池资源控制
- 10. 综合案例串讲
# 1. 案例引入
# 1.1 一段崩在哪
看一个图像处理微服务——它接收用户上传的图片,调用 GPU 推理服务做内容审核,再把结果写回业务系统。生产环境跑了两个月,某天运营推送了一条大促通知,突然间大量用户同时上传图片,服务崩了:
// image_processor.go —— 图像审核服务
package main
import (
"context"
"fmt"
"sync"
"time"
)
// GPU 推理引擎——单机最多同时处理 8 个任务
var gpuSlots = make(chan struct{}, 8)
type ImageTask struct {
ImageID string
Size int64 // 图片大小,字节
ModelType string // "lite"(2GB显存) 或 "heavy"(6GB显存)
}
func processImage(task ImageTask) error {
// 占一个 GPU 槽位
gpuSlots <- struct{}{}
defer func() { <-gpuSlots }()
// 模拟 GPU 推理
fmt.Printf("[%s] 开始推理: %s (%dB)\n", task.ModelType, task.ImageID, task.Size)
time.Sleep(2 * time.Second)
fmt.Printf("[%s] 完成推理: %s\n", task.ModelType, task.ImageID)
return nil
}
func main() {
var wg sync.WaitGroup
tasks := []ImageTask{
// 假设来了 20 个任务——10 个轻量、10 个重量
{ImageID: "img_001", Size: 102400, ModelType: "lite"},
{ImageID: "img_002", Size: 204800, ModelType: "heavy"},
{ImageID: "img_003", Size: 51200, ModelType: "lite"},
// ...
}
for _, task := range tasks {
wg.Add(1)
go func(t ImageTask) {
defer wg.Done()
if err := processImage(t); err != nil {
fmt.Printf("任务失败: %s, err=%v\n", t.ImageID, err)
}
}()
}
wg.Wait()
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
现象:
- 平时流量稳定,8 个 GPU 槽位刚好够用,任务平均 2 秒完成
- 大促期间任务量暴增 10 倍,但
gpuSlots的容量仍然是 8 - 表面上看没什么问题——channel 信号量会排队,后续任务等槽位释放即可
- 但诡异的是:GPU 显存报 Out of Memory,进程 OOM Kill——可我们明明限制了 8 个并发
查 GPU 日志——发现有 5 个 "heavy" 任务同时在跑,每个占 6GB 显存:5 × 6 = 30GB,超过 GPU 的 24GB 显存上限。
channel 信号量把每个任务都当"等权的"——不管它是 2GB 的轻量模型还是 6GB 的重量模型。当 8 个槽位里恰好坐了 5 个 heavy 任务,显存就爆了。
# 1.2 顺藤摸到根因
追查过程分三步:
第一步:验证假设。看 GPU 监控——出问题时 Running 任务列表里同时有 5 个 "heavy_model_v3" 进程,每个 RSS 约 6.2GB。但 gpuSlots channel 容量是 8——理论上最多 8 个并发——这里有 5 个确实"合法"。问题不在"并发数超标",在显存超标。
第二步:找为什么 channel 信号量兜不住。核心矛盾:
channel 信号量的「许可」是 1 个 struct{} → 无差别的
但 heavy 任务和 lite 任务的 GPU 显存消耗差别是 3 倍
→ channel 没办法表达「这个任务需要 3 个槽位」
2
3
第三步:看 pprof goroutine 快照——出问题时 goroutine 数量飙到 3000+:
$ curl http://localhost:6060/debug/pprof/goroutine?debug=2 | grep -c "gpuSlots"
# 2876 个 goroutine 全部卡在 gpuSlots <- struct{}{}
2
这 2876 个 goroutine 占着栈内存(每个 2KB 起步,共约 6MB+),虽然不多,但伴随着每个 goroutine 携带的 ImageTask 上下文数据(图片元数据、预处理结果)——这些对象因为闭包引用统统逃逸到了堆——pprof heap 显示额外消耗了 ~400MB。
而且更重要的是:这些 goroutine 在排队等待 channel 槽位时,context 超时了也退不出来——channel 的发送操作没有可取消的机制。用户已经超时放弃,goroutine 还在等槽位。
这个事故藏着 7 个原理点:
① 加权信号量怎么让任务按实际资源需求"占不同数量的坑"? → 第 3-4 章
② Acquire(ctx, n) 的 context 取消怎么实现安全退出? → 第 4.3
③ Release 后怎么按 FIFO 顺序唤醒等待者? → 第 5 章
④ TryAcquire 的非阻塞语义是什么? → 第 6 章
⑤ channel 信号量 vs 加权信号量在四种场景下的优劣对比? → 第 7 章
⑥ 信号量怎么组合成四种限流器(固定窗口/滑动窗口/令牌桶/并发度)? → 第 8 章
⑦ 连接池怎么用加权信号量做可被 context 取消的资源获取? → 第 9 章
2
3
4
5
6
7
# 1.3 我们要回答什么
这个 GPU 服务案例贯穿全篇。我们从加权信号量的数据结构出发,深入 Acquire/Release 的状态机、FIFO 唤醒策略、context 取消的竞态处理——再对比 channel 信号量在四种典型并发场景下的优劣——最后给出四种限流器设计模式和连接池实现。第 10 章回到 GPU 服务,用加权信号量 + 令牌桶限流器彻底修复。
本篇路线:
架构总图 (第 2 章) ── 信号量的三种模式 + channel 为什么不够
↓
数据结构 (第 3 章) ── Weighted 字段 + waiter 节点 + 并发保证
↓
Acquire 流程 (第 4 章) ── 快速路径 / 慢速路径 / context 取消
↓
Release 唤醒 (第 5 章) ── FIFO 扫描 + 闭包通知
↓
TryAcquire (第 6 章) ── 非阻塞获取的设计意图
↓
Channel 对比 (第 7 章) ── 四种场景横向评测
↓
限流实战 (第 8 章) ── 四种限流器模式
↓
连接池控制 (第 9 章) ── 数据库连接池 + 诊断
↓
综合案例 (第 10 章) ── 修复 GPU 服务 + 设计哲学
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
📌 本篇定位:这是 Go 并发控制中"流量整形"的核心篇。前面第 09 篇讲了 channel 作为通信原语、第 10 篇讲了 sync.Mutex/WaitGroup 的锁语义——本篇的 semaphore 处于两者之间:它不是锁(不保护临界区),它是并发度控制器(控制同时执行的数量)。理解了加权信号量,第 14 篇的 errgroup 并行任务组就能看到它的影子。
# 2. 架构概览
# 2.1 信号量模式全景
Go 生态中有三种"信号量模式的并发控制"方案,Weighted 处于最灵活的那一层:
信号量并发控制方案
│
┌─────────────────────┼─────────────────────┐
│ │ │
▼ ▼ ▼
channel 信号量 sync.WaitGroup semaphore.Weighted
(容量 = 许可数) (计数 = 任务数) (容量 = 总权重)
│ │ │
每个许可 = 1 等所有任务完成 每个许可可加权
│ │ │
❌ 不可取消 ❌ 不能限并发 ✅ context 可取消
❌ 不可加权 ❌ 不能部分释放 ✅ 支持 TryAcquire
❌ 无 TryAcquire ❌ 无 TryAcquire ✅ FIFO 公平唤醒
2
3
4
5
6
7
8
9
10
11
12
13
加权信号量解决的是**"每个请求的资源消耗不等价"**场景下的并发度控制问题:
传统信号量思维:
有 8 个停车位 → 每辆车占 1 个 → 最多停 8 辆车
加权信号量思维:
GPU 有 24GB 显存 → 轻量任务占 2GB → 重量任务占 6GB
→ 当前已占用 18GB:可以再放 (24-18)/2=3 个轻量,或 (24-18)/6=1 个重量
→ 不是「限制个数」,是「限制总资源量」
2
3
4
5
6
7
下面是 Weighted 信号量的完整数据流图:
Acquire(ctx, 3)
│
▼
┌──────────────────────┐
│ mu.Lock() │
│ cur + n ≤ size ? │
│ 是 → cur += n │ ← 快速路径 O(1)
│ 否 → 创建 waiter │ ← 慢速路径
│ mu.Unlock() │
└──────┬───────────────┘
│
┌──────┴──────────────┐
│ 快速路径(已拿到) │ 慢速路径(阻塞等待)
│ return nil │ 监听 ready channel +
│ │ 监听 ctx.Done()
└──────────────────────┘
│
┌──────┴──────────────────────────┐
│ Release(3) │
│ mu.Lock() │
│ cur -= 3 │
│ 遍历 waiters 链表: │
│ 如果 waiter.n ≤ size - cur │
│ → cur += waiter.n │
│ → close(waiter.ready) ◄── 唤醒等待 goroutine
│ mu.Unlock() │
└─────────────────────────────────┘
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# 2.2 为什么 channel 不够用
疑惑:channel 不就是天然的信号量吗?make(chan struct{}, N) 容量就是最大并发度——为什么还需要专门实现一个 Weighted?
论证:
channel 每个元素的"权重"是 1——
ch <- struct{}{}永远只占 1 个槽。如果 heavy 任务需要 3 倍于 lite 任务的资源,channel 无法表达"这个任务占 3 个槽位"的语义——总不能ch<-{}{}{}写三次吧(这既不原子也不优雅)。channel 的发送操作不可通过 context 取消——
ch <- v阻塞时,即使 ctx 已经 Done,goroutine 也退不出来。唯一的 workaround 是用 select:
select {
case ch <- struct{}{}:
// 拿到许可
case <-ctx.Done():
return ctx.Err()
}
2
3
4
5
6
这个 select 只能用于单个 channel 和单个 context——但实际场景中,ctx 的取消还需要"把已经在队列中的等待者移除"——channel 做不到:你往 channel 里塞了一个 struct{},没法"撤回"。semaphore.Weighted 通过维护显式的 waiter 链表实现了取消时精准移除。
channel 无法实现 TryAcquire——没有非阻塞写入。只能用
select { case ch <- v: default: },这在 Go 里是"不阻塞的写入尝试"——但和真正的 TryAcquire 语义不完全一致:select 的 default 分支不提供"为什么失败"的信息。channel 没有 FIFO 保证——Go 的 channel 在多个 goroutine 同时 send 时,唤醒顺序由 runtime 的
sendq链表决定——它是 FIFO 的。但 channel 的容量满了之后的排队行为是隐式的、不可观测的——你无法检查"当前有多少个 goroutine 在排队等待写入"。semaphore 的 waiter 链表是显式的、可观测的。
结论:channel 信号量适用于"所有任务等权、无需取消、不需要排队信息"的简单场景。一旦任务有权重差异(资源消耗不同)、需要 context 取消、需要排队诊断——semaphore.Weighted 是唯一正解。
# 3. Weighted 核心数据结构
# 3.1 字段逐层解读
golang.org/x/sync/semaphore 的 Weighted 结构体只有 4 个字段——但藏着完整的并发控制逻辑:
// golang.org/x/sync/semaphore/semaphore.go (简化)
type Weighted struct {
size int64 // 总容量——信号量的"最大权重"
cur int64 // 当前已占用的权重
mu sync.Mutex // 保护 size/cur/waiters 的互斥锁
waiters list.List // 等待者链表——FIFO 队列
}
2
3
4
5
6
7
每个字段的设计意图:
| 字段 | 类型 | 设计意图 |
|---|---|---|
size | int64 | 固定值,在 NewWeighted(n) 时设定——这是信号量的"总资源量"(如 24GB 显存 / 100 个连接) |
cur | int64 | 动态值,当前已分配的权重总和——每次 Acquire +n,Release -n |
mu | sync.Mutex | 保护所有字段的全局锁——注意:不是 per-P 的无锁设计(信号量是低频操作,锁竞争不是瓶颈) |
waiters | list.List | 标准库的双向链表——按 Acquire 的调用顺序排队,保证 FIFO 公平性 |
初始化:
// 创建一个总容量为 24(单位:GB 显存)的加权信号量
sem := semaphore.NewWeighted(24)
2
关键设计:size 和 cur 是 int64 而不是 uint64——为什么?因为 Release 可能出错(释放超过已获取的权重),负数可以更容易在测试中暴露而非静默溢出。
# 3.2 waiter 等待节点
当 cur + n > size 时,Acquire 创建一个 waiter 节点插入链表:
// golang.org/x/sync/semaphore/semaphore.go (简化)
type waiter struct {
n int64 // 这个等待者需要的权重
ready chan<- struct{} // 可缓冲的 channel,用于唤醒通知
}
2
3
4
5
等待链表的结构:
waiters 链表 (FIFO):
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ waiter │ │ waiter │ │ waiter │
│ n = 6 (heavy) │───→│ n = 2 (lite) │───→│ n = 2 (lite) │
│ ready: buffered │ │ ready: buffered │ │ ready: buffered │
│ chan (cap=1) │ │ chan (cap=1) │ │ chan (cap=1) │
└─────────────────┘ └─────────────────┘ └─────────────────┘
Goroutine A Goroutine B Goroutine C
当前状态: cur = 20, size = 24
剩余容量: size - cur = 4
唤醒判断:
- waiter A (n=6): 6 > 4 → 不能唤醒,跳过
- waiter B (n=2): 2 ≤ 4 → 唤醒!cur += 2 → cur=22
- waiter C (n=2): 2 ≤ 2 → 唤醒!cur += 2 → cur=24
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
为什么 ready channel 是 buffered (cap=1)——避免唤醒时在 close(ch) 上阻塞。Release 在持有 mu 锁时 close ready channel,如果 ready 是无缓冲的且接收方已经因 context 取消退出了,close 会阻塞——死锁。
# 3.3 并发安全保证
Weighted 的并发模型非常简洁:
所有操作都在 mu.Lock() / mu.Unlock() 之间
│
├── Acquire: 只修改 cur 和 waiters 链表
├── Release: 只修改 cur 和 close waiter.ready
└── TryAcquire: 只读 cur,尝试修改 cur
唤醒 goroutine 时:
不持锁调用 close(ready) —— 避免在锁内做外部操作
但 close 本身是线程安全的(Go 标准库保证)
2
3
4
5
6
7
8
9
goroutine 视角:当 goroutine 调用 Acquire(ctx, n) 而资源不足时——
- G 的状态:从
_Grunning→_Gwaiting(等待 ready channel 或 ctx.Done()) - 当前 P 被释放——其他 G 可以在这个 P 上执行
- 当 Release 触发
close(ready)时——G 被唤醒,状态回到_Grunnable
# 4. Acquire 获取流程剖析
# 4.1 快速路径:资源充足
当调用 Acquire(ctx, n) 时,首先检查是否可以直接分配:
// golang.org/x/sync/semaphore/semaphore.go (简化)
func (s *Weighted) Acquire(ctx context.Context, n int64) error {
s.mu.Lock()
// 快速路径:当前剩余容量 ≥ 请求的权重
if s.size-s.cur >= n && s.waiters.Len() == 0 {
s.cur += n
s.mu.Unlock()
return nil // ← O(1),只有一次加锁解锁
}
// ...
}
2
3
4
5
6
7
8
9
10
11
注意快速路径的第二个条件:s.waiters.Len() == 0。即使当前容量足够,如果有等待者在排队,新来的请求也必须排队——这是 FIFO 公平性的保证。
快速路径示例:
size=24, cur=10, waiters=空
Acquire(ctx, 6):
→ 24 - 10 = 14 ≥ 6 ✓ 且 waiters 为空 ✓
→ cur += 6 → cur = 16
→ return nil
Acquire(ctx, 10):
→ 24 - 16 = 8 < 10 ✗
→ 走慢速路径
2
3
4
5
6
7
8
9
10
11
快速路径的代价:只有一次 Lock / Unlock,全程约 ~20ns(无竞争时)。
# 4.2 慢速路径:创建等待者
当资源不足时——创建 waiter 节点,挂到链表末尾,阻塞等待:
// golang.org/x/sync/semaphore/semaphore.go (简化)
func (s *Weighted) Acquire(ctx context.Context, n int64) error {
s.mu.Lock()
if s.size-s.cur >= n && s.waiters.Len() == 0 {
s.cur += n
s.mu.Unlock()
return nil
}
// 请求的权重超过总容量 → 永远不可能满足
if n > s.size {
s.mu.Unlock()
<-ctx.Done() // 直接等 context 取消(不用创建 waiter)
return ctx.Err()
}
// 创建等待节点
ready := make(chan struct{}, 1) // 注意:cap=1
w := waiter{n: n, ready: ready}
elem := s.waiters.PushBack(w) // 插入链表尾部 → FIFO
s.mu.Unlock()
// 阻塞等待两个事件之一
select {
case <-ctx.Done(): // context 取消
err := ctx.Err()
s.mu.Lock()
select {
case <-ready: // 竞态:取消的同时被唤醒了
err = nil // → 以获取许可为准(取消作废)
default:
// 从链表中移除等待节点
isFront := s.waiters.Front() == elem
s.waiters.Remove(elem)
if isFront {
// 如果被取消的是队首,尝试唤醒后续等待者
s.notifyWaiters()
}
}
s.mu.Unlock()
return err
case <-ready: // 被 Release 唤醒
return nil // → 许可已获取(Release 时 cur 已加过)
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
这段代码里藏着两个精妙的设计:
精妙点 1:唤醒时 cur 已经加过了——Release 在 close(ready) 之前已经把 cur += waiter.n 了(见第 5 章)。所以 case <-ready 分支不需要再操作 cur——直接 return。
精妙点 2:context 取消的竞态处理——内层 select 的 case <-ready(default 分支)处理的是:context 取消的同时,Release 也刚好唤醒了这个 waiter。此时 "获取许可"优先于"取消"——因为资源已经拿到了。
# 4.3 context 取消的处理
疑惑:context 取消时,为什么要检查链表位置(isFront)来决定是否调用 notifyWaiters?
论证:
场景:等待链表中有三个 waiter
[A: n=6] → [B: n=2] → [C: n=2]
↑ B 的 context 被取消
如果直接 Remove(B) 而不唤醒:
[A: n=6] → [C: n=2]
此时 cur=20, size=24
→ 剩余容量=4 ≥ A.n(6)? 不满足
→ A 继续等待……但 B 撤销后,A 前面的障碍没了!
→ 实际上如果是 C 被取消,A 同样需要被检查
所以 B 取消时:
- B 是队首(isFront=true) → 可用的 4GB 可能可以给后续等待者
- B 不是队首(isFront=false) → 不需要 notifyWaiters
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 内部方法:从队首开始扫描,逐个唤醒满足条件的 waiter
func (s *Weighted) notifyWaiters() {
for {
elem := s.waiters.Front()
if elem == nil {
break // 没有等待者
}
w := elem.Value.(waiter)
if s.size-s.cur < w.n {
// 队首的 waiter 都无法满足 → 后面的更不可能满足
// (因为队首的 n 是最大的吗?不一定,但"先来后到"是原则)
break
}
s.cur += w.n
s.waiters.Remove(elem)
close(w.ready) // ← 唤醒这个 waiter 的 goroutine
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
结论:context 取消不是简单的"移除节点"——它需要遵循 FIFO 契约:如果取消的是队首,阻塞解除后后续等待者应该被重新评估。
# 4.4 完整状态机
Acquire(ctx, n)
│
┌───────────────┼───────────────┐
│ │ │
▼ ▼ ▼
n > size ? 容量够且队空? 容量不够或队非空
│ │ │
▼ ▼ ▼
等 ctx.Done() cur += n 创建 waiter
│ return nil 插入链表尾部
▼ │ │
return err │ select { ready / ctx.Done() }
│ │
│ ┌───────┴────────┐
│ │ │
│ ▼ ▼
│ ready关闭 ctx.Done()
│ │ │
│ return nil 获取锁→ 再次检查ready
│ │
│ ┌───────┴────────┐
│ │ │
│ ▼ ▼
│ ready已关闭 ready未关闭
│ │ │
│ return nil 从链表移除→通知
│ return err
│
└──────────────────────────────────
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# 5. Release 释放与唤醒
# 5.1 原子递减与 FIFO 扫描
Release 的逻辑清晰直接——但它在调用 notifyWaiters 之前就已经从 cur 中减去了释放的权重:
// golang.org/x/sync/semaphore/semaphore.go (简化)
func (s *Weighted) Release(n int64) {
s.mu.Lock()
s.cur -= n // ← 先归还资源
if s.cur < 0 {
s.mu.Unlock()
panic("semaphore: released more than held") // 防御性检查
}
s.notifyWaiters() // ← 再尝试唤醒等待者
s.mu.Unlock()
}
2
3
4
5
6
7
8
9
10
11
notifyWaiters 的扫描算法:
Release(4) 前: cur=22, size=24
waiters: [A: n=6] → [B: n=2] → [C: n=6]
Release(4): cur = 22 - 4 = 18 → 剩余 6
扫描:
📍 检查 waiter A (n=6):
24 - 18 = 6 ≥ 6 ✓ → 唤醒 A
cur += 6 → cur = 24
📍 检查 waiter B (n=2):
24 - 24 = 0 < 2 ✗ → 停止扫描(FIFO原则)
📍 waiter C 不会在这次扫描中被检查
→ 必须等下一次 Release 释放足够容量
结果: 只唤醒了 A,B 和 C 继续等待
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
关键设计:notifyWaiters 是 "从队首逐一遍历、遇阻即停" 的策略——不是 "扫描整个链表找能放的"。这保证了:先来的 goroutine 一定先获取许可——即使后来的 goroutine 请求更小的权重(更容易满足)。
# 5.2 闭包唤醒的并发语义
唤醒的关键操作是 close(w.ready):
close(w.ready)
ready 是 chan struct{},在 waiter 创建时用了 make(chan struct{}, 1)——带 1 个缓冲的 channel。为什么不是无缓冲?
无缓冲 channel 的 close:
→ 如果接收方 goroutine 正好在 select 上等待 → 正常唤醒 ✓
→ 如果接收方 goroutine 因为 context 取消已经退出 select → close 本身成功 ✓
(Go 允许 close 一个没有接收方的 channel)
但问题是:如果接收方还没有到 select(比如刚创建 waiter、还没 unlock)→ close 也 OK
所以"消息一定能送达"——close 信号是持久的(不可撤销的)。
那么缓冲的作用是什么?
→ 不是为了让 close 不阻塞,而是让 ready 可以作为「信号」而非「数据传输」
→ cap=1 的 channel 被 close 后,后续的 <-ready 永远非阻塞返回零值
2
3
4
5
6
7
8
9
10
11
12
goroutine 视角分析:
G1: 调用 Acquire → 资源不足 → 挂到 waiters → select { <-ready / <-ctx.Done() }
状态: _Grunning → _Gwaiting
G2: 调用 Release(4) → cur -= 4 → notifyWaiters → close(G1.ready)
→ G1 从 _Gwaiting → _Grunnable → 被调度 → 执行 return nil
如果 G1 同时收到了 ctx.Done() 和 ready close:
→ select 伪随机选一个 case
→ 如果选了 ctx.Done(): 内层 select 检查 ready → 已 close → 覆盖为 nil
→ 如果选了 ready: 直接 return nil
2
3
4
5
6
7
8
9
10
# 5.3 Release 大于 Acquire 的陷阱
Release 一定要与 Acquire 配对——释放的权重 n 必须精确等于之前获取的权重。如果 Release 的 n 大于实际 Acquire 的 n(比如忘记了一个 defer Release 后又手动 Release 了一次):
sem := semaphore.NewWeighted(10)
// ❌ 危险:Acquire(3) 但 Release(5)
sem.Acquire(ctx, 3)
// ... do work ...
sem.Release(5) // panic: "released more than held" — 如果 cur 变成负数
// ❌ 更隐蔽的 bug:Acquire(3) 两次但 Release(3) 三次
sem.Acquire(ctx, 3)
sem.Acquire(ctx, 3) // cur = 6
// ... do work ...
sem.Release(3) // cur = 3
sem.Release(3) // cur = 0
sem.Release(3) // panic! cur 变成 -3
2
3
4
5
6
7
8
9
10
11
12
13
14
防御方案:
// ✅ 用 defer 保证一一对应
func processWithSemaphore(sem *semaphore.Weighted, ctx context.Context, weight int64) error {
if err := sem.Acquire(ctx, weight); err != nil {
return err
}
defer sem.Release(weight) // ← Acquire 和 Release 的 weight 严格一致
return doWork()
}
2
3
4
5
6
7
8
结论:信号量的正确性依赖调用方的"配对契约"——这和 sync.Mutex 的 Lock/Unlock 一样,Go runtime 不做跨 goroutine 的验证。
# 6. TryAcquire 非阻塞获取
# 6.1 快速无损尝试
TryAcquire 是非阻塞获取——"能拿就拿,拿不到立刻返回":
// golang.org/x/sync/semaphore/semaphore.go (简化)
func (s *Weighted) TryAcquire(n int64) bool {
s.mu.Lock()
// 只看当前容量够不够 + 有没有人在排队
success := s.size-s.cur >= n && s.waiters.Len() == 0
if success {
s.cur += n
}
s.mu.Unlock()
return success
}
2
3
4
5
6
7
8
9
10
11
和 Acquire 的关键差异:
| Acquire | TryAcquire | |
|---|---|---|
| 阻塞行为 | 容量不够时阻塞等待 | 容量不够时立即返回 false |
| 等待队列 | 容量够但队非空 → 排队 | 容量够但队非空 → 返回 false |
| context | 可取消 | 不需要 context |
| 用途 | 主流程的并发控制 | 优化路径 / 降级逻辑 |
典型使用场景——缓存优先策略:
// 尝试用 GPU 推理(非阻塞),如果 GPU 忙就用 CPU fallback
func infer(image []byte) (Result, error) {
if sem.TryAcquire(6) { // GPU 有 6GB 空闲 → 用 GPU
defer sem.Release(6)
return gpuInfer(image)
}
// GPU 忙 → 降级到 CPU
return cpuInfer(image)
}
2
3
4
5
6
7
8
9
# 6.2 与带超时的 Acquire 对比
带超时的 Acquire 可以用 context 实现,但它和 TryAcquire 的语义不同:
// 带超时的 Acquire —— "等 100ms,等不到就算了"
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()
err := sem.Acquire(ctx, 6)
if err != nil {
// 超时或取消
}
// TryAcquire —— "现在有没有?没有就算了"
if sem.TryAcquire(6) {
defer sem.Release(6)
// ...
}
2
3
4
5
6
7
8
9
10
11
12
13
核心差异:带超时的 Acquire 在 100ms 内可能被 Release 唤醒——它给了一个"等待窗口"。TryAcquire 不给任何等待——"零窗口"。
两者的选择:
- 低延迟场景(如 HTTP 请求处理)→ TryAcquire + fallback(不给等待时间,立刻降级)
- 批处理场景(如离线计算任务)→ Acquire + ctx timeout(允许等待几百毫秒)
# 7. Channel 信号量对比
# 7.1 四种场景横向比较
用一个综合性对比表说明两种方案在四种典型并发场景中的表现:
| 场景 | channel 信号量 | semaphore.Weighted | 胜出 |
|---|---|---|---|
| 等权并发控制(如限制 10 个 HTTP 并发请求) | make(chan struct{}, 10) 简洁直接 | semaphore.NewWeighted(10) + Acquire(ctx,1) 稍重 | channel ✓ |
| 加权并发控制(如 GPU 不同模型显存消耗不同) | 无法表达——每个许可权重固定为 1 | Acquire(ctx, 6) 一次拿 6 个权重 | Weighted ✓ |
| 可取消的等待(如 HTTP 请求的 ctx 超时) | 必须用 select+ctx.Done()——但队列中的 goroutine 无法被移除 | Acquire(ctx, n) → ctx.Done() 时精准移除 waiter | Weighted ✓ |
| 非阻塞尝试(如缓存命中走快速通道) | select { case ch <- v: default: } | TryAcquire(n) | 平手 |
# 7.2 加权场景的 Channel 困境
回到第 1 章的 GPU 问题——如果坚持用 channel 信号量,有 workaround 吗?
方案 A:给 heavy 任务分配多个 slot:
// ❌ hack:heavy 任务占 3 个 channel slot
func processHeavy(task ImageTask) {
gpuSlots <- struct{}{} // 占 1
gpuSlots <- struct{}{} // 占 2
gpuSlots <- struct{}{} // 占 3
defer func() {
<-gpuSlots
<-gpuSlots
<-gpuSlots
}()
// ...
}
2
3
4
5
6
7
8
9
10
11
12
问题:三次 gpuSlots <- struct{}{} 不是原子的——在第二次和第三次之间,channel 可能满了,goroutine 在第二次发送时阻塞——但第一个 slot 已经占住了——这会导致"部分获取"的状态不一致。而 semaphore.Acquire(ctx, 3) 在 mu 锁的保护下原子完成。
方案 B:创建多个不同容量的 channel:
var heavySlots = make(chan struct{}, 4) // 最多 4 个 heavy (4×6=24)
var liteSlots = make(chan struct{}, 12) // 最多 12 个 lite (12×2=24)
2
问题:两类任务各抢各的 channel——但它们共享同一个物理 GPU 显存。如果有 3 个 heavy + 6 个 lite 同时跑 = 3×6 + 6×2 = 30GB > 24GB——依然超限。
结论:只要任务是共享同一块资源池且权重不同的——channel 信号量就兜不住。这是 Weighted 信号量存在的根本理由。
# 8. 限流实战模式
# 8.1 固定窗口模式
固定窗口(Fixed Window)是最简单的限流器:在固定时间窗口内(如 1 秒)允许 N 个请求。用信号量实现:
package main
import (
"context"
"sync"
"time"
"golang.org/x/sync/semaphore"
)
type FixedWindowLimiter struct {
sem *semaphore.Weighted
interval time.Duration
mu sync.Mutex
timer *time.Timer
}
func NewFixedWindow(limit int64, interval time.Duration) *FixedWindowLimiter {
return &FixedWindowLimiter{
sem: semaphore.NewWeighted(limit),
interval: interval,
}
}
func (l *FixedWindowLimiter) Allow(ctx context.Context) (bool, error) {
// 用 TryAcquire 做非阻塞限流
if l.sem.TryAcquire(1) {
// 启动定时器——到期后重置
l.mu.Lock()
if l.timer == nil {
l.timer = time.AfterFunc(l.interval, func() {
l.sem.Release(l.sem.Size()) // 简化——实际需要更精确的计算
l.mu.Lock()
l.timer = nil
l.mu.Unlock()
})
}
l.mu.Unlock()
return true, nil
}
return false, nil // 当前窗口已满
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
固定窗口的缺点——"边界突变":窗口第 0.9 秒时 100 个请求进来消耗所有许可,第 1.1 秒窗口重置后又来 100 个——在 0.9s~1.1s 这 0.2 秒内,实际有 200 个请求通过(窗口边界处突发翻倍)。
# 8.2 滑动日志模式
滑动日志(Sliding Log)记录了每个请求的时间戳,判断当前时间窗口内请求数:
type SlidingLogLimiter struct {
mu sync.Mutex
log []time.Time
limit int
window time.Duration
}
func (l *SlidingLogLimiter) Allow() bool {
now := time.Now()
l.mu.Lock()
defer l.mu.Unlock()
// 清理过期记录
cutoff := now.Add(-l.window)
for len(l.log) > 0 && l.log[0].Before(cutoff) {
l.log = l.log[1:]
}
if len(l.log) < l.limit {
l.log = append(l.log, now)
return true
}
return false
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
信号量在此处的角色:滑动日志本身不需要信号量,但它可以和信号量组合——信号量控制并发度(正在处理的请求数),滑动日志控制速率(时间窗口内的请求数)。两者是正交的:
┌──────────────────┐
请求 → │ 滑动日志限流器 │ → 拒绝(429)
│ (速率限制) │
└──────┬───────────┘
│ 通过
▼
┌──────────────────┐
│ 加权信号量 │ → 排队等待
│ (并发度控制) │
└──────┬───────────┘
│ 获取许可
▼
┌──────────────────┐
│ 实际处理逻辑 │
└──────────────────┘
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 8.3 令牌桶模式
令牌桶(Token Bucket)是生产环境最常用的限流器——以固定速率生成令牌,请求消耗令牌:
type TokenBucketLimiter struct {
sem *semaphore.Weighted
rate int64 // 每秒生成的令牌数
capacity int64 // 桶容量(允许的突发)
interval time.Duration // 令牌生成间隔
}
func NewTokenBucket(rate, capacity int64) *TokenBucketLimiter {
limiter := &TokenBucketLimiter{
sem: semaphore.NewWeighted(capacity),
rate: rate,
capacity: capacity,
interval: time.Second / time.Duration(rate),
}
// 后台 goroutine:定期往桶里放令牌
go limiter.refillLoop()
return limiter
}
func (l *TokenBucketLimiter) refillLoop() {
ticker := time.NewTicker(l.interval)
defer ticker.Stop()
for range ticker.C {
// 释放一个令牌(相当于往桶里放一个)
// 如果桶满了(cur == size),Release 会 panic
// → 需要 safeRelease
l.safeRelease(1)
}
}
func (l *TokenBucketLimiter) safeRelease(n int64) {
// 只在桶没满时释放
if l.sem.TryAcquire(n) {
// TryAcquire 成功了说明桶有空间 → 但我们是"释放令牌",不是"获取"
// 实际上 TryAcquire 不适用这个场景
// → 真正的令牌桶实现需要更精细的控制
_ = n // 占位
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
注意:上面的实现显示了直接用信号量做令牌桶的局限性——Release 的对称性假设(Release 的 n 必须等于 Acquire 的 n)和令牌桶的"后台生成令牌"模式不太匹配。生产环境推荐使用 golang.org/x/time/rate 包的 rate.Limiter:
import "golang.org/x/time/rate"
limiter := rate.NewLimiter(rate.Limit(100), 200) // 100 QPS,突发 200
if !limiter.Allow() {
http.Error(w, "rate limited", http.StatusTooManyRequests)
return
}
2
3
4
5
6
7
rate.Limiter 内部使用惰性填充(lazy refill)——不是后台 goroutine 定时放令牌,而是在每次 Allow/Reserve 时实时计算自上次请求以来生成的新令牌数。在信号量的语境下,这是"信号量 + 时间戳"的组合模式。
# 8.4 goroutine 并发度控制
信号量最直接的应用:限制同时执行的 goroutine 数量:
func processAll(tasks []Task) error {
var (
sem = semaphore.NewWeighted(10) // 最多 10 个并发
wg sync.WaitGroup
)
for _, task := range tasks {
wg.Add(1)
go func(t Task) {
defer wg.Done()
// Acquire 阻塞——直到有可用的并发槽位
if err := sem.Acquire(context.Background(), 1); err != nil {
return
}
defer sem.Release(1)
doWork(t)
}(task)
}
wg.Wait()
return nil
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
和 WaitGroup 的区别:WaitGroup 是"等所有任务结束"——不限并发数。上面的代码是"同时只有 10 个任务在执行,其余排队"——这叫并发度控制。
和 channel 信号量对比:这个场景用 channel 也可以 (make(chan struct{}, 10))——因为每个任务权重都是 1。差异在于:如果用 context 控制超时,Weighted 可以安全取消等待中的 goroutine。
# 9. 连接池资源控制
# 9.1 数据库连接池设计
加权信号量的一个经典应用是数据库连接池——每个连接有"权重"(比如读连接 weight=1,写连接 weight=2):
package main
import (
"context"
"database/sql"
"fmt"
"golang.org/x/sync/semaphore"
)
type WeightedDBPool struct {
db *sql.DB
sem *semaphore.Weighted // 总连接权重 = 最大连接数 × 平均权重
maxConns int64
}
func NewWeightedDBPool(db *sql.DB, maxConns int64) *WeightedDBPool {
db.SetMaxOpenConns(int(maxConns))
return &WeightedDBPool{
db: db,
sem: semaphore.NewWeighted(maxConns * 2), // 每个连接权重为 1~2
maxConns: maxConns,
}
}
// 获取一个读连接(权重 1)
func (p *WeightedDBPool) AcquireReader(ctx context.Context) (*sql.Conn, error) {
if err := p.sem.Acquire(ctx, 1); err != nil {
return nil, fmt.Errorf("获取读连接被取消: %w", err)
}
conn, err := p.db.Conn(ctx)
if err != nil {
p.sem.Release(1) // 获取连接失败 → 归还信号量许可
return nil, err
}
return &trackedConn{Conn: conn, pool: p, weight: 1}, nil
}
// 获取一个写连接(权重 2——需要更多资源/锁)
func (p *WeightedDBPool) AcquireWriter(ctx context.Context) (*sql.Conn, error) {
if err := p.sem.Acquire(ctx, 2); err != nil {
return nil, fmt.Errorf("获取写连接被取消: %w", err)
}
conn, err := p.db.Conn(ctx)
if err != nil {
p.sem.Release(2)
return nil, err
}
return &trackedConn{Conn: conn, pool: p, weight: 2}, nil
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# 9.2 可中断的获取与归还
trackedConn 包装了标准的 *sql.Conn,在 Close() 时自动归还信号量许可:
type trackedConn struct {
*sql.Conn
pool *WeightedDBPool
weight int64
closed bool
}
func (tc *trackedConn) Close() error {
if tc.closed {
return nil
}
tc.closed = true
// 归还到连接池(标准操作)
err := tc.Conn.Close()
// 归还信号量许可——无论连接关闭是否出错
tc.pool.sem.Release(tc.weight)
return err
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
和传统 sql.DB 的区别:
传统 sql.DB:
db.SetMaxOpenConns(10) → 最多 10 个连接,读和写不区分
WeightedDBPool:
sem.NewWeighted(20) → 总权重 20
→ 10 个读 (weight=1) + 5 个写 (weight=2) = 20 ✓
→ 20 个读 (weight=1) = 20 ✓
→ 5 个读 + 7 个写 = 5+14 = 19 ✓
→ 0 个读 + 10 个写 = 20 ✗ 超过 sql.DB 的 10 个最大连接
→ 需要联合检查:sem 权重 ≤ 20 且 sql.DB 连接数 ≤ 10
2
3
4
5
6
7
8
9
10
双重限制的设计:信号量控制权重,sql.DB 控制连接数——两者共同保证资源安全。
# 9.3 诊断与监控
信号量的排队信息可以通过源码中的 waiters 链表获取(但 Waiters 方法需要额外实现):
// 扩展 signal 以支持诊断
type DiagnosableWeighted struct {
semaphore.Weighted
}
// 近似获取等待中的 goroutine 数量(通过 atomic 计数模拟)
// 注:标准 Weighted 不暴露 waiters 长度,需要自行维护计数器
type MonitoredWeighted struct {
*semaphore.Weighted
waitingCount int64 // atomic
totalAcquired int64 // atomic
totalReleased int64 // atomic
}
// pprof 诊断:
// curl http://localhost:6060/debug/pprof/goroutine?debug=2 | grep "semaphore.Acquire"
// → 看有多少 goroutine 卡在 Acquire 上
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
监控指标:
| 指标 | 含义 | 健康值 |
|---|---|---|
cur / size | 当前利用率 | < 80% |
| 等待 goroutine 数 | 排队长度 | ≈ 0 |
| Acquire 延迟 (p99) | 获取许可的等待时间 | < 10ms |
| context 取消率 | Acquire 中 ctx 被取消的比例 | < 5% |
# 10. 综合案例串讲
# 10.1 案例真相揭晓
回到第 1 章 GPU 图像处理服务的七个疑问,逐条作答:
| 疑问 | 答案 |
|---|---|
| ① 加权信号量怎么实现不同权重? | 第 3-4 章:Weighted 的 cur 按 n 累加,不按"个数"计算——Acquire(ctx, 6) 占用 6 个单位资源 |
| ② Acquire 的 context 取消怎么安全退出? | 第 4.3:出队时检查 ctx.Done(),移除 waiter 节点,如果移除的是队首则唤醒后续等待者 |
| ③ Release 后怎么 FIFO 唤醒等待者? | 第 5 章:notifyWaiters 从链表头开始扫描,满足条件才唤醒,遇阻即停 |
| ④ TryAcquire 的语义和带超时 Acquire 的区别? | 第 6 章:TryAcquire 是零窗口非阻塞,带超时 Acquire 提供了等待窗口 |
| ⑤ channel 信号量 vs 加权信号量的优劣? | 第 7 章:等权场景 channel 更简洁,加权/可取消场景 Weighted 唯一正解 |
| ⑥ 四种限流器模式怎么选? | 第 8 章:固定窗口简单但有边界突变,滑动日志精确但内存高,令牌桶是最佳平衡,并发度控制用信号量 |
| ⑦ 连接池怎么结合加权信号量? | 第 9 章:信号量控制资源权重,数据库驱动控制物理连接数——双重保证 |
案例完整根因链条:
GPU 显存 24GB——5 个 heavy 任务同时占用 → 30GB → OOM
→ 根因:channel 信号量 chan struct{}(8) 限的是并发数,不是显存量
→ 每个任务需要的显存不同(2GB vs 6GB)→ 需要加权控制
→ channel 无法表达"这个任务占 3 个许可"
→ 且 channel 没有 context 可取消——排队的 goroutine 即使 ctx 超时也退不出
2
3
4
5
修复方案:
// ✅ 加权信号量方案:用显存作为权重单位
var gpuSem = semaphore.NewWeighted(24) // 24GB 显存
type ImageTask struct {
ImageID string
GPUMem int64 // 需要的显存(GB),由模型类型决定
ModelType string
}
func processImageV2(ctx context.Context, task ImageTask) error {
// heavy=3, lite=1 (以 2GB 为 1 个单位权重)
weight := task.GPUMem / (2 * 1024 * 1024 * 1024) // 或直接配置映射表
// 可被 ctx 取消的获取
if err := gpuSem.Acquire(ctx, weight); err != nil {
return fmt.Errorf("获取 GPU 资源被取消: %w", err)
}
defer gpuSem.Release(weight)
// 执行推理
return gpuInfer(task)
}
// ✅ 配合令牌桶限流——先限速率、再控并发
var rateLimiter = rate.NewLimiter(50, 100) // 50 QPS,突发 100
func handleRequest(w http.ResponseWriter, r *http.Request) {
if !rateLimiter.Allow() {
http.Error(w, "rate limited", 429)
return
}
// ... 调用 processImageV2 ...
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# 10.2 一次 Acquire 的完整旅程
gpuSem.Acquire(ctx, 3) // 请求 6GB 显存
───────────────────────────────────────────────────────────
│
├─ 加锁 mu.Lock()
│ │
│ ├─ 快速路径检查:
│ │ size=24, cur=20 → 剩余 4GB
│ │ 4 < 3? 不满足 → 跳过快速路径
│ │
│ ├─ n > size? 3 ≤ 24 → 不超限
│ │
│ ├─ 慢速路径:
│ │ 创建 ready = make(chan struct{}, 1)
│ │ 创建 waiter{n:3, ready: ready}
│ │ waiters.PushBack(waiter) → 插入链表尾部
│ │ mu.Unlock()
│ │
│ └─ 阻塞等待:
│ select {
│ case <-ctx.Done(): ← 如果 context 被取消
│ case <-ready: ← 如果被 Release 唤醒
│ }
│
├─ [假设被 Release 唤醒]
│ → ready 被 close → select 走 ready 分支
│ → return nil (此时 cur 已经是 23 了——Release 时加的)
│
├─ [假设 ctx 被取消且没被 Release 唤醒]
│ → select 走 ctx.Done() 分支
│ → mu.Lock()
│ → select { case <-ready: default: }
│ └─ ready 未 close → default: 真的取消了
│ → waiters.Remove(waiter_elem)
│ → 如果是队首 → notifyWaiters()
│ → return ctx.Err()
│
└─ [假设 ctx 被取消但刚好被 Release 唤醒——竞态]
→ select 可能选 ready 分支 → return nil
→ select 可能选 ctx.Done() 分支:
→ mu.Lock()
→ select { case <-ready: } → ready 已 close!
→ err = nil → return nil (资源已获取,取消无效)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# 10.3 设计哲学回扣
哲学 1:把"计数型"并发控制升级为"加权型"——让一个许可可以代表"多个单位"
channel 信号量把并发控制简化成一个计数器:N 个槽位 = N 个许可——每个 struct{} 都是一个不可分割的单位。Weighted 信号量把"许可"从"个数"抽象为"权重"——Acquire(3) 不是"拿 3 个许可",而是"拿 3 个单位"。这个抽象让信号量从"并发数上限"变成了"资源上限"——它约束的是总资源消耗,不是 goroutine 数量。
哲学 2:FIFO 公平性防止饥饿——但不能防止"大任务堵队"
Weighted 的 waiters 链表严格按 Acquire 的调用顺序排队——这和 Go channel 的 sendq/recvq FIFO 一致。但 FIFO 不是"最优调度方案"——一个大权重(n=6)的 waiter 排在队首,即使后面有 n=1 的小任务能 fit,也必须等队首先被满足。这是 "公平 vs 吞吐" 的经典权衡:Weighted 选择了公平(不饿死先来者),代价是可能降低吞吐(后续小任务被阻塞)。如果业务需要"小任务优先",需要自己实现优先级调度器。
哲学 3:context 集成让"等待"变得可控——不只是超时,是"随时可取消"
这是 Weighted 相对 channel 信号量的最大优势。在微服务架构中,一个请求可能经过多层 RPC 调用——每层都有一个 context 控制超时。当上游已经超时放弃时,下游等待信号量的 goroutine 必须能安全退出——否则就是 goroutine 泄漏。Weighted 的 await + ctx.Done() 双重 select + 内层竞态检查——保证了"取消"和"唤醒"之间没有死锁窗口。
哲学 4:最小化核心——只用 sync.Mutex + container/list.List + channel
Weighted 只依赖三个 Go 标准库组件:sync.Mutex(锁)、container/list.List(双向链表)、chan struct{}(信号通知)。没有自旋、没有 CAS 循环、没有 runtime 内部 API——这是一个"纯粹在 Go 用户态实现的同步原语"。它的简洁性意味着:你完全可以读得懂它的每一行源码——读完之后,你对"Go 怎么用这些基础组件搭建高级同步原语"会有完整认知。
# 10.4 速查表
信号量三种方案对比:
| 方案 | 权重 | 可取消 | TryAcquire | FIFO | 适用场景 |
|---|---|---|---|---|---|
| channel 信号量 | 固定为 1 | ❌ (需 select workaround) | ⚠️ (select+default) | ✅ (隐式) | 等权简单并发控制 |
| semaphore.Weighted | 可变 int64 | ✅ (ctx) | ✅ | ✅ (显式链表) | 加权资源控制、可取消等待 |
| WaitGroup | N/A | ❌ | ❌ | N/A | 等待所有任务完成 |
Acquire 三条路径:
| 路径 | 条件 | 复杂度 |
|---|---|---|
| 快速路径 | size-cur ≥ n 且 waiters 为空 | O(1) |
| 慢速路径(创建 waiter) | size-cur < n 或 waiters 非空 | O(1) 创建 + 阻塞等待 |
| 拒绝路径 | n > size | O(1) 立刻失败 |
Release 唤醒策略:
| 策略 | 行为 |
|---|---|
| FIFO 链表扫描 | 从队首开始,逐个判断能否满足 |
| 遇阻即停 | 队首 waiter 无法满足 → 停止扫描 |
| close(ready) 通知 | 用 cap=1 channel,close 作为一次性信号 |
四种限流器场景选型:
| 限流器 | 核心组件 | 优点 | 缺点 |
|---|---|---|---|
| 固定窗口 | 信号量 + 定时重置 | 实现简单 | 边界突变 |
| 滑动日志 | 时间戳列表 | 精确 | 内存高 |
| 令牌桶 | rate.Limiter(惰性填充) | 允许突发 | 实现复杂 |
| 并发度控制 | semaphore.Weighted | 精确控制 | 不限速率 |
诊断命令:
# 查看阻塞在 Acquire 的 goroutine
curl http://localhost:6060/debug/pprof/goroutine?debug=2 | grep "semaphore.Acquire"
# 查看 goroutine 总数和阻塞状态
go tool pprof http://localhost:6060/debug/pprof/goroutine
# data race 检测(信号量的并发使用是否正确)
go test -race ./...
# 逃逸分析——确认任务结构体不会意外逃逸
go build -gcflags="-m" .
2
3
4
5
6
7
8
9
10
11
下一篇:我们已经掌握了加权信号量的 Acquire/Release 状态机、FIFO 唤醒策略和四种限流模式,下一步进入 14.errgroup并行控制——把并行任务组、错误传播、WithContext 取消级联的原理剖开。