同步sync包
# 第 14 章 同步 sync 包
当 channel 不够"轻",就用
sync包:互斥锁、读写锁、原子操作、对象池、一次性初始化。 关键词:Mutex/RWMutex/Once/WaitGroup/atomic/sync.Pool/sync.Map/sync.Cond
# 目录介绍
- 14.1 本章学习目标
- 14.2 何时用 sync,何时用 channel
- 14.3
sync.Mutex:互斥锁 - 14.4
sync.RWMutex:读多写少场景 - 14.5
sync.Once:只执行一次 - 14.6
sync.WaitGroup:等多个 goroutine 完成 - 14.7
sync/atomic:无锁原子操作 - 14.8
sync.Pool:临时对象池 - 14.9
sync.Map:并发安全的 map - 14.10
sync.Cond:条件变量 - 14.11 综合示例:并发安全的计数器与缓存
- 14.12 本章底层原理(简介)
- 14.13 Go 新手陷阱 Top 5
- 14.14 思考题
- 14.15 训练题
- 14.16 推荐阅读
# 14.1 本章学习目标
学完本章你应当能够:
- ✅ 能根据"读多/写少""临时/长期""锁粒度"决定用 channel 还是 sync
- ✅ 能用
sync.Mutex/sync.RWMutex保护共享数据,写出正确的Lock/Unlock配对 - ✅ 知道
Mutex不可重入、不可复制,并用 defer 防止忘记 unlock - ✅ 能用
sync.Once实现线程安全的单例初始化 - ✅ 能用
sync/atomic做无锁计数和 CAS 操作,理解何时比 Mutex 更快 - ✅ 知道
sync.Pool的对象会被 GC 清掉——不能当持久缓存用 - ✅ 知道
sync.Map的适用场景(读多写少 + key 集合稳定),而不是替代普通 map
本章是 Go 并发的底层工具包。第 13 章的 channel 是"通信",本章的 sync 是"同步"——两者互补。
# 14.2 何时用 sync,何时用 channel
第 12 章的 goroutine + 第 13 章的 channel 组成"通过通信共享内存"。但并非所有场景都适合 channel——有时候一把锁更直接。
sync vs channel 决策矩阵:
| 场景 | 推荐 | 原因 |
|---|---|---|
| 保护一个共享字段(计数器、状态) | Mutex 或 atomic | channel "传数据"的语义太重 |
| goroutine 间传递数据流 | channel | 天然的生产者-消费者语义 |
| 等待一批 goroutine 完成 | WaitGroup | 比 done chan struct{} 更轻 |
| 多个 goroutine 读、少量写 | RWMutex | 读锁不互斥——读并发 |
| 单例初始化 | Once | 一次初始化是它的精确语义 |
| 临时对象复用 | Pool | GC 可清空池——仅临时复用 |
| 复杂的状态机、多个条件等待 | channel + select | sync.Cond 也可,但 channel 更 Go 风格 |
# 14.2.1 综合案例与思考
综合案例:同需求两种实现对比
package main
import (
"fmt"
"sync"
)
// 需求:goroutine 安全的计数器
// 方案 A:Mutex——最直观
type CounterA struct {
mu sync.Mutex
value int
}
func (c *CounterA) Inc() {
c.mu.Lock()
c.value++
c.mu.Unlock()
}
func (c *CounterA) Value() int {
c.mu.Lock()
defer c.mu.Unlock()
return c.value
}
// 方案 B:channel——通过通信同步
type CounterB struct {
incCh chan struct{}
readCh chan chan int
}
func NewCounterB() *CounterB {
c := &CounterB{
incCh: make(chan struct{}),
readCh: make(chan chan int),
}
go func() {
var value int
for {
select {
case <-c.incCh:
value++
case replyCh := <-c.readCh:
replyCh <- value
}
}
}()
return c
}
func (c *CounterB) Inc() { c.incCh <- struct{}{} }
func (c *CounterB) Value() int {
reply := make(chan int)
c.readCh <- reply
return <-reply
}
func main() {
// 方案 A——Mutex:3 行核心代码,简洁直观
ca := &CounterA{}
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() { defer wg.Done(); ca.Inc() }()
}
wg.Wait()
fmt.Println("Mutex:", ca.Value())
// 方案 B——channel:需要额外的 goroutine + select 循环
cb := NewCounterB()
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() { defer wg.Done(); cb.Inc() }()
}
wg.Wait()
fmt.Println("channel:", cb.Value())
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
案例知识融合:同一个"计数器"需求,Mutex 方案 3 行核心代码——锁住、修改、解锁。Channel 方案需要额外 goroutine + select 循环 + 双向 channel 回复。不是说 channel 不好——当需求是"传递数据流"时 channel 更优雅。但当需求是"保护一个共享变量"时,Mutex 是最直接的表达。
思考题:
- 上面的 channel 版计数器有 goroutine 泄漏吗?
NewCounterB里的 goroutine 什么时候退出? - 如果计数器不只有
Inc,还有Dec、Reset、Snapshot——用 channel 实现会比 Mutex 更清晰吗?
# 14.3 sync.Mutex:互斥锁
# 14.3.1 基本用法 Lock / Unlock
Mutex 保证同一时刻只有一个 goroutine 进入临界区:
var (
mu sync.Mutex
balance int
)
func Deposit(amount int) {
mu.Lock() // 获取锁——如果被占用则阻塞
balance += amount // 临界区——同一时间只有一个 goroutine 执行这里
mu.Unlock() // 释放锁
}
func Balance() int {
mu.Lock()
defer mu.Unlock()
return balance
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 14.3.2 配合 defer 的标准写法
必须 defer Unlock——防止 panic 或提前 return 导致锁永远不释放:
// ✅ 标准写法——defer Unlock 保证锁一定释放
func criticalSection() {
mu.Lock()
defer mu.Unlock() // ★ 无论如何都会执行
if someCondition {
return // 提前返回——defer 保证 unlock
}
// ... 可能 panic 的代码——defer 保证 unlock
}
// ❌ 危险写法——if 分支忘记 unlock
func badExample() {
mu.Lock()
if someCondition {
return // 忘记 Unlock!→ 死锁
}
mu.Unlock()
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 14.3.3 TryLock(Go 1.18+)
TryLock 尝试获取锁——不阻塞,立即返回是否成功:
// Go 1.18+
if mu.TryLock() {
defer mu.Unlock()
// 成功获取——执行临界区
} else {
// 没拿到锁——做其他事情,不阻塞等待
fmt.Println("锁被占用,跳过")
}
2
3
4
5
6
7
8
适用场景:非阻塞的"尽力而为"操作——拿到锁就做,拿不到也不等。
# 14.3.4 不可复制与不可重入
Go 的 Mutex 不可复制——复制后新旧 Mutex 状态独立,锁失效:
type Container struct {
mu sync.Mutex // ❌ 如果 Container 被值传递——mu 被复制
data int
}
func (c Container) Bad() { // 值接收者——c 是副本!
c.mu.Lock() // 锁住的是副本的 mu——原始 mu 没锁
c.data++ // 修改的是副本的 data——原始 data 不变
c.mu.Unlock()
}
// ✅ 用指针接收者——或把 mutex 设为指针类型
func (c *Container) Good() {
c.mu.Lock()
c.data++
c.mu.Unlock()
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Mutex 不可重入——同一 goroutine 不能 Lock 两次(会死锁):
mu.Lock()
mu.Lock() // 死锁!——同一个 goroutine 对同一个锁 Lock 两次
2
# 14.3.5 综合案例与思考
综合案例:银行账户转账——需要锁两个账户
package main
import (
"fmt"
"sync"
)
type Account struct {
mu sync.Mutex
balance int
}
// ❌ 错误写法:Lock A → Lock B——可能死锁
func TransferBad(from, to *Account, amount int) {
from.mu.Lock()
to.mu.Lock() // 如果同时转账 A→B 和 B→A → 死锁!
from.balance -= amount
to.balance += amount
to.mu.Unlock()
from.mu.Unlock()
}
// ✅ 正确写法:按地址排序锁定——避免死锁
func Transfer(from, to *Account, amount int) {
// ★ 总是先锁地址小的那个
if from == to {
return
}
first, second := from, to
if fmt.Sprintf("%p", from) > fmt.Sprintf("%p", to) {
first, second = to, from
}
first.mu.Lock()
second.mu.Lock()
if from.balance >= amount {
from.balance -= amount
to.balance += amount
}
second.mu.Unlock()
first.mu.Unlock()
}
func main() {
a := &Account{balance: 1000}
b := &Account{balance: 500}
var wg sync.WaitGroup
for i := 0; i < 100; i++ {
wg.Add(1)
go func() { defer wg.Done(); Transfer(a, b, 10) }()
}
wg.Wait()
fmt.Println("A:", a.balance) // 0
fmt.Println("B:", b.balance) // 1500
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
案例知识融合:这个案例展示了多锁场景的核心问题——锁顺序。Lock A → Lock B 和 Lock B → Lock A 同时发生会死锁。解决方案是按固定顺序(如地址大小)锁定——所有 goroutine 遵循同一顺序,死锁就不会发生。
思考题:
- 如果转账涉及 3 个账户(A→B→C),锁顺序策略还简单吗?
TryLock能解决死锁问题吗?如果能,怎么写?
# 14.4 sync.RWMutex:读多写少场景
# 14.4.1 RLock / RUnlock vs Lock / Unlock
RWMutex 区分读锁和写锁——多个读锁可共存,写锁独占:
| 持有锁 | 新的 RLock? | 新的 Lock? |
|---|---|---|
| 无锁 | ✅ 成功 | ✅ 成功 |
| 已有 RLock(读锁) | ✅ 成功(多个读共存) | ❌ 阻塞等待 |
| 已有 Lock(写锁) | ❌ 阻塞等待 | ❌ 阻塞等待 |
type Cache struct {
mu sync.RWMutex
data map[string]string
}
func (c *Cache) Get(key string) string {
c.mu.RLock() // 读锁——不会阻塞其他 Get
defer c.mu.RUnlock()
return c.data[key]
}
func (c *Cache) Set(key, value string) {
c.mu.Lock() // 写锁——阻塞所有读和写
defer c.mu.Unlock()
c.data[key] = value
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 14.4.2 读写锁的优先级与公平性
Go 的 RWMutex 优先级:当写锁在等待时,后续的读锁会被阻塞——防止写锁"饿死"。
时间线:
goroutine A: RLock (成功)
goroutine B: RLock (成功——读锁共存)
goroutine C: Lock (等待——A 和 B 释放后)
goroutine D: RLock (等待!——C 在排队,D 不能插队)
2
3
4
5
为什么——如果 D 持续来读锁,C 永远拿不到写锁——写锁饥饿。
# 14.4.3 综合案例与思考
综合案例:并发安全配置管理器
package main
import (
"fmt"
"sync"
"time"
)
type Config struct {
mu sync.RWMutex
values map[string]string
}
func NewConfig() *Config {
return &Config{values: make(map[string]string)}
}
func (c *Config) Get(key string) string {
c.mu.RLock()
defer c.mu.RUnlock()
return c.values[key]
}
func (c *Config) GetAll() map[string]string {
c.mu.RLock()
defer c.mu.RUnlock()
// 拷贝——避免外面拿到 map 后无锁访问
result := make(map[string]string, len(c.values))
for k, v := range c.values {
result[k] = v
}
return result
}
func (c *Config) Set(key, value string) {
c.mu.Lock()
defer c.mu.Unlock()
c.values[key] = value
}
func main() {
cfg := NewConfig()
cfg.Set("host", "localhost")
// 100 个读 goroutine
for i := 0; i < 100; i++ {
go func(id int) {
for {
fmt.Printf("reader %d: %s\n", id, cfg.Get("host"))
time.Sleep(10 * time.Millisecond)
}
}(i)
}
// 1 个写 goroutine——每 100ms 更新
go func() {
for i := 0; ; i++ {
time.Sleep(100 * time.Millisecond)
cfg.Set("host", fmt.Sprintf("server-%d", i))
fmt.Printf("writer: updated to server-%d\n", i)
}
}()
time.Sleep(time.Second)
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
思考题:
GetAll中拷贝 map 是必要的吗?不拷贝会有什么风险?- Go 的 RWMutex 写锁等待时阻塞新读锁——这和 Java 的
ReentrantReadWriteLock默认行为不同。为什么 Go 这么设计?
# 14.5 sync.Once:只执行一次
# 14.5.1 经典用法:单例初始化
Once.Do 保证传入的函数只被执行一次——即使多个 goroutine 同时调用:
var (
once sync.Once
instance *Database
)
func GetDatabase() *Database {
once.Do(func() {
// 这段代码只执行一次——无论多少个 goroutine 并发调用
instance = &Database{
conn: connectToDB(),
}
})
return instance
}
func main() {
// 100 个 goroutine 并发调用——connectToDB 只执行一次
for i := 0; i < 100; i++ {
go GetDatabase()
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# 14.5.2 OnceFunc / OnceValue / OnceValues(Go 1.21+)
Go 1.21 引入了三个便利函数——不再需要手动写 once.Do:
// OnceFunc——包装一个无返回值的函数
initDB := sync.OnceFunc(func() {
db = connectToDB()
})
// OnceValue——包装一个有返回值的函数
getConfig := sync.OnceValue(func() *Config {
return loadConfig("/etc/app.yaml")
})
// OnceValues——包装返回两个值的函数
getSecrets := sync.OnceValues(func() (string, string, error) {
return loadSecrets()
})
// 使用——多次调用,函数只执行一次
func main() {
initDB()
initDB() // 不执行
cfg := getConfig()
cfg2 := getConfig() // 返回同一个 cfg
key, secret, err := getSecrets() // 返回缓存的值
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# 14.5.3 综合案例与思考
综合案例:懒加载 + 错误处理的单例
package main
import (
"errors"
"fmt"
"sync"
)
type Service struct {
name string
}
var (
service *Service
serviceErr error
serviceOnce sync.Once
)
func GetService() (*Service, error) {
serviceOnce.Do(func() {
// ★ Once.Do 内部 panic 不会影响"已执行"的标记
// 如果 init 失败,下次调用不会再执行——永远拿到 nil!
s, err := initService()
if err != nil {
serviceErr = err
return // Once 标记已执行——但 service 是 nil
}
service = s
})
if serviceErr != nil {
return nil, serviceErr
}
return service, nil
}
func initService() (*Service, error) {
return nil, errors.New("初始化失败")
}
func main() {
s, err := GetService()
fmt.Println(s, err) // <nil> 初始化失败
s, err = GetService() // 第二次调用——Once 已标记,不再重试
fmt.Println(s, err) // <nil> 初始化失败(一样的错误)
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
案例知识融合:这个案例揭示了 Once 的陷阱——一旦 Do 执行过(即使失败),不会重试。对于可恢复的初始化(如网络连接重试),需要自己实现重试逻辑,或把 Once 和错误处理分开。
思考题:
- 如果
initService可能临时失败(网络超时),如何改造GetService支持重试?(提示:不要用 Once——用 Mutex + flag) OnceFunc和传统的once.Do有什么本质区别?它的返回值可以被并发调用吗?
# 14.6 sync.WaitGroup:等多个 goroutine 完成
WaitGroup 已在第 12 章详述。本节补充两个进阶对比。
# 14.6.1 WaitGroup 与 channel 关闭信号的对比
| WaitGroup | done chan struct{} | |
|---|---|---|
| 用途 | 等 N 个 goroutine 完成 | 广播"停止"信号 |
| 方向 | 被等方通知等方 | 等方通知被等方 |
| goroutine 数 | N 个 | 任意 |
// WaitGroup:等 N 个 worker 完成
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1)
go func() { defer wg.Done(); work() }()
}
wg.Wait()
// done channel:通知所有 worker 停止
done := make(chan struct{})
for i := 0; i < 5; i++ {
go func() {
for {
select {
case <-done:
return
default:
work()
}
}
}()
}
close(done) // 同时通知所有 worker
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# 14.6.2 综合案例与思考
// 综合案例:WaitGroup + errgroup 替代方案
// 需求:并发执行 N 个任务,第一个错误就取消剩余任务
// 方案 A:WaitGroup + context(手动管理)
func fetchAllWithWG(ctx context.Context, urls []string) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
var wg sync.WaitGroup
errCh := make(chan error, 1)
for _, url := range urls {
wg.Add(1)
go func(url string) {
defer wg.Done()
if err := fetch(ctx, url); err != nil {
select {
case errCh <- err: // 只发第一个错误
cancel() // 取消其他 fetch
default:
}
}
}(url)
}
wg.Wait()
select {
case err := <-errCh:
return err
default:
return nil
}
}
// 方案 B:errgroup(golang.org/x/sync/errgroup)
// import "golang.org/x/sync/errgroup"
func fetchAllWithEG(ctx context.Context, urls []string) error {
g, ctx := errgroup.WithContext(ctx)
for _, url := range urls {
url := url
g.Go(func() error {
return fetch(ctx, url)
})
}
return g.Wait() // 第一个错误返回,ctx 自动取消
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
思考题:方案 A 的 errCh 容量为什么是 1?如果容量是 0 会怎样?
# 14.7 sync/atomic:无锁原子操作
# 14.7.1 类型化 API(Go 1.19+ atomic.Int64 等)
Go 1.19+ 推荐用类型化 API——更安全,不会误用:
// ❌ 旧版 API——传指针,类型不安全
var counter int64
atomic.AddInt64(&counter, 1) // 可能误传其他类型的指针
val := atomic.LoadInt64(&counter)
// ✅ 新版 API(Go 1.19+)——类型安全
var counter atomic.Int64
counter.Add(1) // 不可能误传
val := counter.Load()
// 其他类型
var flag atomic.Bool
var value atomic.Uint32
var ptr atomic.Pointer[Config]
2
3
4
5
6
7
8
9
10
11
12
13
14
atomic vs Mutex 性能对比:
func BenchmarkMutex(b *testing.B) {
var mu sync.Mutex
var n int
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
mu.Lock()
n++
mu.Unlock()
}
})
}
func BenchmarkAtomic(b *testing.B) {
var n atomic.Int64
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
n.Add(1)
}
})
}
// atomic 比 Mutex 快 3-10×——但只能做单变量操作
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# 14.7.2 CAS(Compare-And-Swap)
CAS 是 lock-free 编程的核心原语——"如果值还是 old,就改成 new":
// 用 CAS 实现无锁的"最大并发数"追踪
type MaxTracker struct {
value atomic.Int64
}
func (t *MaxTracker) Update(n int64) {
for {
old := t.value.Load()
if n <= old {
return // 不是新的最大值——跳过
}
// ★ CAS:如果 value 还是 old,改成 n;否则重试
if t.value.CompareAndSwap(old, n) {
return
}
// 被其他 goroutine 改了——重试
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
CAS 循环的代价——高竞争时不断重试(spin),CPU 空转。极端情况下不如 Mutex。
# 14.7.3 综合案例与思考
综合案例:用 atomic 实现无锁环形队列的计数器
package main
import (
"fmt"
"sync"
"sync/atomic"
)
// 无锁环形缓冲区——仅原子操作用于读写指针
type RingBuffer struct {
buf []int
mask int
writePos atomic.Int64
readPos atomic.Int64
}
func NewRingBuffer(size int) *RingBuffer {
// size 必须是 2 的幂——掩码加速
return &RingBuffer{
buf: make([]int, size),
mask: size - 1,
}
}
func (rb *RingBuffer) Push(v int) bool {
w := rb.writePos.Load()
if w-rb.readPos.Load() >= int64(len(rb.buf)) {
return false // 满了
}
rb.buf[w&int64(rb.mask)] = v
rb.writePos.Add(1)
return true
}
func (rb *RingBuffer) Pop() (int, bool) {
r := rb.readPos.Load()
if r >= rb.writePos.Load() {
return 0, false // 空了
}
v := rb.buf[r&int64(rb.mask)]
rb.readPos.Add(1)
return v, true
}
func main() {
rb := NewRingBuffer(8)
var wg sync.WaitGroup
// 并发写入
for i := 0; i < 5; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
for j := 0; j < 10; j++ {
rb.Push(id*10 + j)
}
}(i)
}
wg.Wait()
// 读取
for {
v, ok := rb.Pop()
if !ok {
break
}
fmt.Println(v)
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
思考题:
- 上面的 RingBuffer 真的是"无锁"吗?如果有两个 goroutine 同时 Push,会发生什么?
- 什么场景下 atomic 不适用,必须上 Mutex?给出至少 2 个场景。
# 14.8 sync.Pool:临时对象池
# 14.8.1 池的 Get / Put 机制
Pool 用于临时对象复用——减少 GC 压力。Get 时如果池空,自动调用 New 函数:
var bufPool = sync.Pool{
New: func() any {
return make([]byte, 4096) // 池空时创建新对象
},
}
func process(data []byte) {
buf := bufPool.Get().([]byte) // 从池中取
defer bufPool.Put(buf) // ★ 用完放回——不要丢
copy(buf, data)
// 用 buf 处理...
}
2
3
4
5
6
7
8
9
10
11
12
13
核心规则:
Get返回的对象类型随机——池中可能是任何 goroutine 放回的旧对象Put后对象不能继续使用——其他 goroutine 可能已经 Get 走了- 一定要 Put 回去——否则池失去意义
# 14.8.2 GC 会清空 Pool——不能当缓存
Pool 的关键特性——每次 GC 时,池中所有对象都被清空:
// ❌ Pool 当缓存——GC 后缓存丢失
var cache = sync.Pool{...}
func getFromCache(key string) User {
u := cache.Get().(User) // GC 后拿到的是 New 函数创建的新对象——不是旧数据
return u
}
// ✅ Pool 的正确用途——临时对象复用,不在乎内容
var bufPool = sync.Pool{...}
func useBuf() {
buf := bufPool.Get().([]byte) // 不在乎 buf 里有什么——只是复用内存
// 使用 buf...
bufPool.Put(buf)
}
2
3
4
5
6
7
8
9
10
11
12
13
14
# 14.8.3 综合案例与思考
综合案例:JSON 编解码缓冲池
package main
import (
"bytes"
"encoding/json"
"fmt"
"sync"
)
var bufferPool = sync.Pool{
New: func() any {
return new(bytes.Buffer)
},
}
func encode(v any) ([]byte, error) {
buf := bufferPool.Get().(*bytes.Buffer)
defer func() {
buf.Reset() // ★ 清空缓冲区——但不释放底层内存
bufferPool.Put(buf) // 放回池中
}()
if err := json.NewEncoder(buf).Encode(v); err != nil {
return nil, err
}
// ★ 复制数据——buf 马上要 Reset 并放回池
result := make([]byte, buf.Len())
copy(result, buf.Bytes())
return result, nil
}
func main() {
var wg sync.WaitGroup
for i := 0; i < 100; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
data, _ := encode(map[string]int{"id": id})
fmt.Printf("goroutine %d: %s\n", id, data)
}(i)
}
wg.Wait()
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
案例知识融合:这个案例展示了 Pool 的典型用法——复用 bytes.Buffer 减少 json.Encode 的分配。关键三点:① Reset 清空内容但保留底层数组;② 复制结果(buf 马上要归还);③ defer 确保 Put 一定执行。
思考题:
buf.Reset()后底层数组还在吗?GC 能回收它吗?- 如果
encode函数 panic 了,buf 会被 Put 回池中吗?怎么保证?
# 14.9 sync.Map:并发安全的 map
# 14.9.1 适用场景判断树
你的 map 场景...
│
├── 大量并发写 → 普通 map + sync.Mutex
│
├── 少量写、大量读、key 固定
│ └── ✅ sync.Map(读几乎无锁)
│
├── key 集合随时间变化(新增/删除频繁)
│ └── ⚠️ sync.Map 的 dirty map 重建有开销
│
└── 只需要简单的 Get/Set/Load
└── 普通 map + sync.RWMutex 更直观
2
3
4
5
6
7
8
9
10
11
12
var sm sync.Map
// 存储
sm.Store("key", "value")
// 读取
if v, ok := sm.Load("key"); ok {
fmt.Println(v.(string)) // 需要类型断言
}
// 读取或存入
actual, loaded := sm.LoadOrStore("key", "default")
// loaded=true → key 已存在,actual 是旧值
// loaded=false → key 不存在,actual 是刚存入的 default
// 删除
sm.Delete("key")
// 遍历——Range 接受回调函数
sm.Range(func(key, value any) bool {
fmt.Println(key, value)
return true // 返回 false 停止遍历
})
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# 14.9.2 综合案例与思考
综合案例:并发安全的访问计数器
package main
import (
"fmt"
"sync"
)
type AccessCounter struct {
m sync.Map
}
func (c *AccessCounter) Record(key string) {
// LoadOrStore + atomic 自增——无锁的计数
for {
val, _ := c.m.LoadOrStore(key, new(int64))
count := val.(*int64)
// 原子自增
old := count.Load() // Wait—actually we need atomic.Add
// 直接用 atomic 包...
_ = old
break
}
}
// 更好的写法:直接用 atomic.Value
func (c *AccessCounter) RecordV2(key string) {
val, _ := c.m.LoadOrStore(key, &atomic.Int64{})
val.(*atomic.Int64).Add(1)
}
func (c *AccessCounter) Stats() map[string]int64 {
result := make(map[string]int64)
c.m.Range(func(key, value any) bool {
result[key.(string)] = value.(*atomic.Int64).Load()
return true
})
return result
}
func main() {
var c AccessCounter
var wg sync.WaitGroup
for i := 0; i < 100; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
c.RecordV2(fmt.Sprintf("user_%d", id%5))
}(i)
}
wg.Wait()
for k, v := range c.Stats() {
fmt.Println(k, v)
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
思考题:
sync.Map的Range遍历时——如果其他 goroutine 在并发写,遍历能看到新写入的值吗?- 什么场景下"普通 map + Mutex"反而比
sync.Map更快?给出具体数据场景。
# 14.10 sync.Cond:条件变量
# 14.10.1 综合案例与思考
Cond 的典型场景:等待某个条件满足——不是轮询,而是被通知:
package main
import (
"fmt"
"sync"
"time"
)
type Queue struct {
cond *sync.Cond
items []int
maxSize int
}
func NewQueue(maxSize int) *Queue {
return &Queue{
cond: sync.NewCond(&sync.Mutex{}),
items: make([]int, 0, maxSize),
maxSize: maxSize,
}
}
func (q *Queue) Put(item int) {
q.cond.L.Lock()
for len(q.items) >= q.maxSize {
q.cond.Wait() // ★ 释放锁 + 休眠——等 Notify 唤醒
}
q.items = append(q.items, item)
q.cond.L.Unlock()
q.cond.Signal() // 通知一个等待的消费者
}
func (q *Queue) Get() int {
q.cond.L.Lock()
for len(q.items) == 0 {
q.cond.Wait() // ★ 释放锁 + 休眠——等 Notify 唤醒
}
item := q.items[0]
q.items = q.items[1:]
q.cond.L.Unlock()
q.cond.Signal() // 通知一个等待的生产者
return item
}
func main() {
q := NewQueue(3)
// 生产者
go func() {
for i := 1; i <= 10; i++ {
q.Put(i)
fmt.Println("生产:", i)
time.Sleep(50 * time.Millisecond)
}
}()
// 消费者
for i := 0; i < 10; i++ {
item := q.Get()
fmt.Println(" 消费:", item)
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
Cond 三个核心操作:
| 操作 | 行为 |
|---|---|
Wait() | 释放锁 → goroutine 休眠 → 被唤醒后重新获取锁 → 返回 |
Signal() | 唤醒一个在 Wait 的 goroutine |
Broadcast() | 唤醒所有在 Wait 的 goroutine |
⚠️ Wait 必须在 Lock 和 Unlock 之间。
思考题:
- 上面的 Queue 能用 channel 实现吗?如果能,Cond 版和 channel 版各有什么优劣?
for 条件 { cond.Wait() }中的for为什么是必须的——不能换成if吗?
# 14.11 综合示例:并发安全的计数器与缓存
package main
import (
"fmt"
"sync"
"sync/atomic"
"time"
)
// 组件 1:无锁计数器(atomic)
type Counter struct {
value atomic.Int64
}
func (c *Counter) Inc() { c.value.Add(1) }
func (c *Counter) Dec() { c.value.Add(-1) }
func (c *Counter) Value() int64 { return c.value.Load() }
// 组件 2:读多写少的缓存(RWMutex)
type Cache struct {
mu sync.RWMutex
data map[string]string
}
func NewCache() *Cache {
return &Cache{data: make(map[string]string)}
}
func (c *Cache) Get(key string) string {
c.mu.RLock()
defer c.mu.RUnlock()
return c.data[key]
}
func (c *Cache) Set(key, value string) {
c.mu.Lock()
defer c.mu.Unlock()
c.data[key] = value
}
// 组件 3:对象池(Pool)
var bufPool = sync.Pool{
New: func() any {
return make([]byte, 1024)
},
}
// 组装:并发安全的缓存服务
type CacheService struct {
cache *Cache
hits Counter
misses Counter
once sync.Once
}
var service *CacheService
func GetCacheService() *CacheService {
// Once 保证单例——这里不需要 sync.Once
// 直接 init 更简单
return service
}
func init() {
service = &CacheService{
cache: NewCache(),
}
}
func (s *CacheService) Fetch(key string) (string, bool) {
val := s.cache.Get(key)
if val != "" {
s.hits.Inc()
return val, true
}
s.misses.Inc()
return "", false
}
func (s *CacheService) Store(key, value string) {
s.cache.Set(key, value)
}
func (s *CacheService) Stats() (hits, misses int64) {
return s.hits.Value(), s.misses.Value()
}
func main() {
svc := GetCacheService()
svc.Store("hello", "world")
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
svc.Fetch("hello")
time.Sleep(time.Microsecond)
}()
}
wg.Wait()
hits, misses := svc.Stats()
fmt.Printf("hits: %d, misses: %d\n", hits, misses)
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
组件映射——这个案例集成了本章 5 个 sync 原语:
| 原语 | 使用场景 |
|---|---|
atomic.Int64 | Counter——频繁增量的无锁计数 |
sync.RWMutex | Cache——读多写少的数据存取 |
sync.Pool | bufPool——临时 buffer 复用 |
sync.Once | 单例初始化(可选) |
sync.WaitGroup | 等所有 goroutine 完成 |
# 14.12 本章底层原理(简介)
本章涉及的底层机制在**卷三"专栏博客"**中详细展开:
| 本章概念 | 卷三对应篇 |
|---|---|
sync.Mutex 的自旋 + 信号量实现 | 10.sync 同步原语剖析 |
RWMutex 的读锁计数与写锁等待队列 | 10.sync 同步原语剖析 |
atomic 的 CPU 指令(LOCK 前缀) | 12.Go 内存模型一致性 |
sync.Pool 的 local pool + victim cache | 10.sync 同步原语剖析 |
sync.Map 的 read/dirty 双 map 结构 | 11.map 并发安全与哈希 |
# 14.13 Go 新手陷阱 Top 5
| # | 陷阱 | 说明 |
|---|---|---|
| 1 | Mutex 值传递——锁被复制 | struct 内嵌 Mutex + 值接收者 → 副本的锁无效。一律用指针接收者。 |
| 2 | Lock 后 panic 没 recover——锁永远不释放 | 用 defer Unlock() ——即使 panic 也执行 |
| 3 | WaitGroup.Add 写在 goroutine 内 | Add 必须在新 goroutine 启动之前——否则 Wait 可能提前返回 |
| 4 | sync.Pool 当缓存用——GC 会清空 | Pool 里的对象随时可能被 GC 回收——只用于临时复用,不存状态 |
| 5 | 用 sync.Map 替代普通 map——大多数场景反而更慢 | sync.Map 只适合读多写少 + key 集合稳定。其他场景用普通 map + Mutex |
# 14.14 思考题
Mutex 的自旋 vs 信号量睡眠:Go 的 Mutex 在竞争时先自旋(spin)几轮,然后才进入信号量睡眠。为什么不在第一次竞争就直接睡眠?自旋的代价是什么?
RWMutex的"写饥饿":Go 的 RWMutex 在有写锁等待时阻塞新读锁。假设一个极端场景——每秒 10000 次读 + 1 次写。写锁的等待会导致读吞吐量下降多少?如何测量?atomic和volatile:C/C++ 有volatile关键字,Java 有volatile字段。Go 没有——为什么?atomic包提供的是什么保证?sync.Map的 dirty map:sync.Map内部用 read map(无锁读)+ dirty map(有锁写)。在什么条件下 dirty map 会晋升为 read map?这个晋升的开销有多大?Once和init()的区别:init()在包加载时执行一次;Once在第一次调用时执行一次。什么时候用 Once 比 init 更好?
# 14.15 训练题
训练 1:用 sync.Mutex 实现一个线程安全的栈(Push / Pop / Len)。要求:
- Push 时如果容量已满,返回错误
- Pop 时如果栈空,返回错误
- 支持并发 Push 和 Pop
训练 2:用 sync/atomic 实现一个无锁的"状态机"——状态在 idle → running → done 之间转换:
Start()将 idle 改为 running(用 CAS)Finish()将 running 改为 done(用 CAS)- 如果状态转换非法(如 done→running),返回错误
训练 3:用 sync.Pool 优化以下代码——减少 strings.Builder 的分配:
func concat(items []string) string {
var b strings.Builder
for _, s := range items {
b.WriteString(s)
}
return b.String()
}
2
3
4
5
6
7
# 14.16 推荐阅读
- 入门卷:第 12 章 goroutine——goroutine 起步、WaitGroup、context
- 入门卷:第 13 章 channel——"通过通信共享内存"
- 卷三:10.sync 同步原语剖析——Mutex/RWMutex/Once/Pool 源码级拆解
- 卷三:12.Go 内存模型一致性——happens-before、atomic 内存序
- 卷三:11.map 并发安全与哈希——sync.Map 的 read/dirty 结构
- The Go Memory Model (opens new window)——官方内存模型文档
- sync 包官方文档 (opens new window)