并发底层原理揭秘
# 第 12 章 并发底层原理揭秘
# 目录介绍
多线程为什么"有时快有时慢"?asyncio 凭什么在单线程里处理上万连接?——从 GIL 的物理锁到事件循环的 select/epoll,还原 Python 并发的底层真相。
# 12.1 GIL 的物理本质
几乎每个 Python 程序员都经历过这种挫败感:
import threading, time
def cpu_work():
s = 0
for i in range(10**8):
s += i
return s
# 单线程
t0 = time.time(); cpu_work()
print(f"单线程: {time.time()-t0:.1f}s")
# 4 线程
t0 = time.time()
threads = [threading.Thread(target=cpu_work) for _ in range(4)]
for t in threads: t.start()
for t in threads: t.join()
print(f"4 线程: {time.time()-t0:.1f}s")
# 两个时间差不多!甚至 4 线程更慢!
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
然后你在网上搜到了答案:"因为 GIL"。但 GIL 到底是什么?为什么 I/O 操作时多线程又有效了?asyncio 凭什么在没有多线程的情况下处理上万并发?
本文将从一次"detective journey"起手,追踪 Python 并发的三层真相:
第一层:GIL 的物理本质 —— 打开 ceval.c,看这"锁"到底怎么工作
第二层:多线程 vs 多进程 —— CPU 密集/IO 密集的实际对比
第三层:asyncio 的魔法 —— 事件循环 / select / epoll / 协程调度
2
3
# 12.1.1 GIL 的物理定义
GIL 不是抽象的概念,它就是一个全局的 C 互斥锁。如果你打开 CPython 源码(Python/ceval.c),你会看到这样的定义:
// Python/ceval.c —— GIL 的物理定义
static PyThread_type_lock interpreter_lock = NULL; // 一个 pthread_mutex
// 获取 GIL:哪个线程持有这把锁,就能执行 Python 字节码
void PyEval_AcquireLock(void) {
PyThread_acquire_lock(interpreter_lock, 1);
}
// 释放 GIL:执行完一段代码或碰到 I/O 操作时释放
void PyEval_ReleaseLock(void) {
PyThread_release_lock(interpreter_lock);
}
2
3
4
5
6
7
8
9
10
11
12
这就是全部。GIL = Global Interpreter Lock = 一个互斥锁。同一个时刻,只有恰好一个线程能持有它。
# 12.1.2 为什么需要 GIL
现在你可能会问:谁在主循环中"获取"和"释放"这把锁?——CPython 的字节码执行循环:
// Python/ceval.c —— 简化
for (;;) {
take_gil(tstate); // ① 获取 GIL
opcode = NEXTOPARG();
switch (opcode) { // ② 执行一条字节码指令
case BINARY_ADD: { ... }
case STORE_FAST: { ... }
}
drop_gil(tstate); // ③ 释放 GIL
}
2
3
4
5
6
7
8
9
10
GIL 是在一条字节码指令级别加锁/解锁的——每条指令执行时都持有 GIL(这就是为什么多线程始终串行执行),但两帧之间是释放的——这就是为什么 IO 等待时可以让出锁。
但这只是现象。为什么非要 GIL? 答案藏在前一篇文章的 PyObject 中——每个 Python 对象有一个 ob_refcnt 字段:
typedef struct _object {
Py_ssize_t ob_refcnt; // 引用计数——多线程同时修改会竞态!
PyTypeObject *ob_type;
} PyObject;
2
3
4
CPython 的设计者面临选择:
- 方案 A:给每个对象的
ob_refcnt加细粒度锁 → 性能开销大,复杂,容易死锁 - 方案 B:给整个解释器加一把大锁 → 简单,但多线程 CPU 密集型无效
他们选了 B。GIL 将"线程安全"的复杂度从"每个对象"转移到了"全局"——代价就是在多核 CPU 上不能并行。
PyPy、Jython、IronPython 都没有 GIL。GIL 不是 Python 语言的规范,是 CPython 的实现选择。
# 12.1.3 GIL 释放时机
GIL 不是始终持有的——CPython 有意让 GIL 有"呼吸节奏"来给其他线程机会:
线程 A: [获取GIL]→执行→执行→执行→[释放GIL]→等待→[获取GIL]→...
线程 B: 等待→等待→等待→[获取GIL]→执行→执行→[释放GIL]→...
线程 C: 等待→等待→[获取GIL]→执行→[释放GIL]→等待→...
2
3
释放时机有两个:
① 主动释放——每 5 毫秒:
Python 3.2+ 引入 sys.setswitchinterval(seconds)——默认 5ms 后主动释放 GIL:
import sys
print(sys.getswitchinterval()) # 0.005 ← 默认 5ms
# sys.setswitchinterval(0.01) # 改成 10ms
2
3
② I/O 操作释放——这才是多线程有效的场景:
当一个线程调用 socket.read()、file.read() 或任何系统 I/O 时,CPython 在进入系统调用前主动释放 GIL,在系统调用返回后重新获取 GIL。这使得 I/O 等待期间其他线程可以执行:
import threading, time, urllib.request
# ✅ 多线程爬虫——I/O 密集,GIL 被释放,真正的并行
def fetch(url):
with urllib.request.urlopen(url) as r:
return len(r.read()) # urlopen 内部释放了 GIL
start = time.time()
urls = ['http://httpbin.org/delay/1'] * 10
threads = [threading.Thread(target=fetch, args=(u,)) for u in urls]
for t in threads: t.start()
for t in threads: t.join()
print(f"10 个请求,10 线程: {time.time()-start:.1f}s")
# 输出:约 1~2 秒 ← 而不是 10 秒!GIL 在 I/O 时被释放了
2
3
4
5
6
7
8
9
10
11
12
13
14
# 12.1.4 绕过 GIL 的方案
| 路线 | 原理 | 何时用 |
|---|---|---|
multiprocessing | 多进程,每个进程独立 CPython + 独立 GIL | CPU 密集型任务 |
| C 扩展释放 GIL | C 代码执行前调用 Py_BEGIN_ALLOW_THREADS,执行后恢复 | 编写 C 扩展时 |
asyncio | 单线程协作式并发,完全避开 GIL 竞争 | I/O 密集型,下一节详解 |
| 换解释器 | PyPy(无 GIL)、Jython、IronPython | 如果环境允许 |
| Python 3.13+ free-threaded | 实验性禁用 GIL(--disable-gil 编译) | 未来 |
from multiprocessing import Pool
import time
def cpu_work(n):
s = 0
for i in range(10**7): s += i
return s
# ✅ 多进程——真正的并行
start = time.time()
with Pool(4) as p:
p.map(cpu_work, [0]*4)
print(f"4 进程: {time.time()-start:.1f}s")
# 输出:约是单线程的 1/4 时间——真正的并行加速
2
3
4
5
6
7
8
9
10
11
12
13
14
# 12.1.5 CPU 与 I/O 对比
这是一个关键的实验——把这组数字记住,你就永远不会用错并发模型:
import threading, multiprocessing, time, urllib.request
# ---- CPU 密集型 ----
def cpu_task():
s = 0
for i in range(5*10**7): s += i
return s
def benchmark(fn, workers=4, use_thread=True):
start = time.time()
if use_thread:
ts = [threading.Thread(target=fn) for _ in range(workers)]
else:
ts = [multiprocessing.Process(target=fn) for _ in range(workers)]
for t in ts: t.start()
for t in ts: t.join()
return time.time() - start
print(f"CPU 单线程: {benchmark(cpu_task, 1):.2f}s")
print(f"CPU 4线程: {benchmark(cpu_task, 4, True):.2f}s") # ≈ 4x slower than expected!
print(f"CPU 4进程: {benchmark(cpu_task, 4, False):.2f}s") # ≈ 0.25x of single
# ---- I/O 密集型 ----
def io_task():
for _ in range(3):
urllib.request.urlopen('http://httpbin.org/delay/1').read()
print(f"I/O 单线程: {benchmark(io_task, 1):.2f}s")
print(f"I/O 4线程: {benchmark(io_task, 4, True):.2f}s") # ≈ 1/4 of single——有效!
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
结论:CPU 密集 → multiprocessing;I/O 密集 → 多线程或 asyncio。
# 12.2 asyncio 异步原理
# 12.2.1 线程与协程对比
线程的开销是什么?
- 每个线程 ≈ 8MB 栈空间(Linux 默认)
- 上下文切换 ≈ 保存/恢复寄存器 + 刷新 TLB
- 10000 个线程 = 80GB 内存(不可能)
asyncio 走了一条完全不同的路:用一个线程 + 一个事件循环,管理成千上万个"协程"——每个协程只是 Python 函数,切换成本接近零。
传统多线程: 每个连接 → 一个线程 → 阻塞等待(占着栈,什么也不做)
asyncio 模式: 所有连接 → 一个线程 → 哪个可读了就切到哪个协程(否则跳过)
2
# 12.2.2 select 与 epoll
事件循环不是一个神秘的概念——它只是对操作系统 I/O 多路复用 API 的封装。
import select, socket
# 极简事件循环——3 行代码演示核心原理
server = socket.socket()
server.bind(('localhost', 8000))
server.listen(100)
server.setblocking(False) # 关键:非阻塞模式
monitored = [server]
while True:
# select() 是系统调用——"帮我看看这些 socket,哪个可以读了?"
readable, _, _ = select.select(monitored, [], [])
for sock in readable:
if sock is server:
client, _ = sock.accept()
client.setblocking(False)
monitored.append(client)
else:
data = sock.recv(1024)
if not data:
monitored.remove(sock)
else:
sock.send(b'HTTP/1.1 200 OK\r\n\r\nHello')
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
select 的本质:把一堆文件描述符(socket 也是文件)传给内核,内核返回"哪些就绪了"。应用程序只处理就绪的——不需要忙等。这就是 asyncio 事件循环的物理实现。
epoll 对比 select:select 有 1024 个文件描述符上限,且每次都要把所有 fd 从用户态拷贝到内核态。epoll(Linux 2.6+)用红黑树维护 fd 集合,只返回就绪的 — 处理 10000 个连接时比 select 快百倍。
# 12.2.3 事件循环执行模型
应用代码 事件循环 操作系统(内核)
──────── ────────── ──────────
async def fetch():
data = await → 注册 fd 到 epoll → 等待网络数据
sock.recv() → 切换到下一个协程
→ 检查哪些 fd 就绪 ← 数据到达,fd 标记为可读
→ 把 fetch() 切回来
print(data) → 继续执行
2
3
4
5
6
7
8
import asyncio
# 协程 = async def 定义的"可暂停的函数"
async def fetch_url(url):
print(f"开始: {url}")
await asyncio.sleep(1) # sleep 内部释放控制权给事件循环
print(f"完成: {url}")
return f"result of {url}"
async def main():
# gather 并发执行——全部 5 个耗时约 1 秒,而非 5 秒
results = await asyncio.gather(
fetch_url('a'), fetch_url('b'), fetch_url('c'),
fetch_url('d'), fetch_url('e')
)
print(results)
asyncio.run(main())
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
关键认知:await 不是"阻塞等待"——它是 "我主动交出控制权"。当执行到 await asyncio.sleep(1) 时,这个协程被挂起,事件循环去执行其他协程。1 秒后事件循环把该协程重新加入就绪队列。
# 12.2.4 异步 HTTP 爬虫
#!/usr/bin/env python3
"""aiohttp 异步爬虫——100 个 URL 约 1 秒完成"""
import aiohttp, asyncio, time
async def fetch(session, url):
try:
async with session.get(url, timeout=10) as resp:
return url, resp.status, len(await resp.read())
except Exception as e:
return url, 0, str(e)[:30]
async def main(urls):
# 只创建一个 session——底层连接池复用 TCP 连接
async with aiohttp.ClientSession() as session:
tasks = [fetch(session, u) for u in urls]
results = await asyncio.gather(*tasks)
return results
urls = ['http://httpbin.org/delay/1'] * 100
start = time.time()
results = asyncio.run(main(urls))
print(f"\n100 请求,asyncio: {time.time()-start:.1f}s")
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# 12.2.5 异步与多线程对比
| 维度 | asyncio | 多线程 (threading) |
|---|---|---|
| 并发模型 | 协作式(显式 await 切出) | 抢占式(OS 决定切换) |
| CPU 密集 | ❌ 单线程,无效 | ❌ GIL 限制,无效 |
| I/O 密集 | ✅ 最佳,单线程管理 10k+ 连接 | ✅ 有效(I/O 时释放 GIL) |
| 切换成本 | 极低(函数调用级) | 高(OS 上下文切换) |
| 内存开销 | ~1KB/协程 | ~8MB/线程 |
| 调试难度 | 中等(需要理解事件循环) | 高(竞态条件、死锁) |
| 库兼容性 | 需要 async/await 版本的库 | 几乎所有库都能用 |
| 选择 | aiohttp / httpx.AsyncClient | requests / urllib |
# 简单决策公式:
# if 任务 = I/O 密集 and 库支持异步: → asyncio
# elif 任务 = I/O 密集 and 库不支持异步: → threading
# elif 任务 = CPU 密集: → multiprocessing
# elif 任务 = 混合: → asyncio + 进程池
2
3
4
5
# 12.2.6 异步与进程池混用
import asyncio, concurrent.futures, time
def cpu_work(n):
"""CPU 密集型——放到进程池里"""
s = 0
for i in range(n):
s += i ** 0.5
return s
async def io_task(name):
await asyncio.sleep(1)
return f"{name} done"
async def main():
loop = asyncio.get_running_loop()
with concurrent.futures.ProcessPoolExecutor() as pool:
# I/O 任务用协程,CPU 任务扔进进程池——二者并行
io_coros = [io_task(f'io_{i}') for i in range(5)]
cpu_futs = [loop.run_in_executor(pool, cpu_work, 10**7) for _ in range(4)]
results = await asyncio.gather(*io_coros, *cpu_futs)
print(results)
asyncio.run(main())
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
run_in_executor 的优雅之处:把同步 CPU 任务"包装"成可 await 的 Future——让异步事件循环和进程池无缝协作。
# 12.3 三种并发模型总结
任务类型 GIL 影响 推荐方案 为什么
────────────────────────────────────────────────────────────
纯 Python 计算 ❌ 阻塞 multiprocessing GIL 阻止多线程并行
Numpy/Pandas ✅ 释放 threading/subprocess C 扩展内部释放了 GIL
网络爬虫 ✅ 释放 asyncio > threading asyncio 内存开销更小
文件读写 ✅ 释放 asyncio 或 threading 取决于文件系统是否支持异步 IO
数据库查询 ✅ 释放 asyncio + 连接池 高并发场景
阻塞 C 调用 ❌ 阻塞 threading+executor 无法异步化的同步操作
2
3
4
5
6
7
8
一口诀:
asyncio 是单线程里的"协程合奏"
threading 是多线程里的"GIL 独奏"
multiprocessing 是真正的"多乐器齐奏"
2
3
你不需要把所有代码都写成异步。对于大多数 Web 应用:FastAPI(异步)+ SQLAlchemy(同步)run_in_executor 就是最佳实践——异步处理 HTTP 请求,同步查询数据库,用进程池跑 CPU 密集计算。Python 的并发从来不是"选一种"——而是"根据需要组合"。