异步 3 · 进阶模式与陷阱¶
前两章覆盖了机制与并发控制。本章讲进阶模式(异步迭代、异步上下文)和陷阱清单——后者是异步代码出 bug 的重灾区。读完本章你能避开绝大多数 asyncio 的坑。
3.1 异步迭代器与 async for¶
普通迭代器(__iter__/__next__)是同步的。要迭代异步数据源(每项要 await,如逐块读流、逐条查库),用异步迭代器(__aiter__/__anext__):
class AsyncRange:
def __init__(self, n):
self.n, self.i = n, 0
def __aiter__(self):
return self
async def __anext__(self):
if self.i >= self.n:
raise StopAsyncIteration # 用异常表示结束(同同步迭代器)
await asyncio.sleep(0.05) # 异步等待
self.i += 1
return self.i - 1
async def main():
async for x in AsyncRange(3): # async for
print(x)
3.2 异步生成器¶
async def + yield = 异步生成器,比手写 __anext__ 简洁:
async def astream(n):
for i in range(n):
await asyncio.sleep(0.05)
yield i
async def main():
async for x in astream(5):
print(x)
适合流式处理(数据库游标、分页 API、日志逐行)。
3.3 async with:异步上下文管理器¶
with(第 7 章)是同步的。异步资源(连接、会话)的获取/释放要 await,用 async with(实现 __aenter__/__aexit__):
class AsyncDB:
async def __aenter__(self):
await self.connect() # 异步获取
return self
async def __aexit__(self, exc_type, exc, tb):
await self.close() # 异步释放(即使异常也执行)
async with AsyncDB() as db:
await db.query(...)
-
Java try-with-resources
-
Python async with
实战中 httpx.AsyncClient、sqlalchemy async session 都是异步上下文管理器。
3.4 异常传播¶
- 直接 await 的协程:异常正常抛到 await 处。
- Task 的异常:不会自动传播,
await task时才抛;忘记 await 会"吞掉"异常(静默失败)。 TaskGroup:任一子任务抛异常 → 取消其余 + 抛ExceptionGroup(3.11+,把多个异常打包)。
task = asyncio.create_task(may_fail())
# 此时 may_fail 抛了异常,但你不知道(还没 await)
result = await task # 这里才看到异常
务必 await 你的 Task
创建了 Task 就要在某处 await(或加入 TaskGroup),否则异常被吞、错误难追踪。
3.5 ⚠️ 致命陷阱:阻塞事件循环¶
这是异步代码最常见、最隐蔽的 bug。在 async def 里调用同步阻塞函数,会卡住整个事件循环——所有其他协程全部停滞。
async def bad_handler():
time.sleep(5) # ❌ 同步阻塞,卡死整个服务 5 秒
data = requests.get(url) # ❌ requests 是同步库,阻塞循环
result = cpu_heavy() # ❌ CPU 密集(无 await),独占循环
规则:async def 里只能用异步库(httpx.AsyncClient、aiosqlite、asyncio.sleep)。同步阻塞代码必须托管(下节)。
3.6 与线程/进程协作:run_in_executor / to_thread¶
遇到必须用的同步阻塞库(老 SDK、CPU 密集计算),把它丢到线程池/进程池,别让它堵事件循环:
import asyncio
# 3.9+ 简便写法:丢到线程池
result = await asyncio.to_thread(blocking_io, arg)
# 手动控制(自定义 executor,如进程池)
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(None, blocking_io, arg) # None=默认线程池
to_thread 把同步函数丢进线程池执行并 await 结果——循环期间能跑别的协程。对照 Java:相当于把阻塞任务提交 ExecutorService 再 Future.get()。
CPU 密集仍走进程
线程池里的 CPU 密集仍受 GIL(第 11 章)。真 CPU 并行用 ProcessPoolExecutor + run_in_executor。
3.7 调试 asyncio¶
开启 debug 模式,事件循环会检测并警告慢回调(阻塞过久)和未消费的协程:
常见排查:
- 协程从不执行 → 忘了 await/create_task(会警告
coroutine ... never awaited)。 - 服务间歇卡顿 → 某处同步阻塞(开 debug 找慢回调)。
- 异常消失 → 创建了 Task 没 await(3.4)。
3.8 异步陷阱速查清单¶
-
async def里调用了同步阻塞(time.sleep、requests、同步 DB)→ 卡死循环 - 创建了
Task却没await/没进 TaskGroup → 异常被吞、协程可能被回收 - 用
gather时一个任务异常,想"全失败"却没处理(默认行为是抛第一个异常) - 吞掉
CancelledError(清理后没raise)→ 破坏取消传播 - CPU 密集放
async def期待加速 → 协程不解决 CPU 并行,走进程 - 在
asyncio.run内再调asyncio.run→RuntimeError
一句话
异步的命门是"绝不在 async 里阻塞"——用异步库,或用 to_thread 托管阻塞调用。其余都是细节。
本章练习¶
练习 3.1
写一个异步生成器 read_lines_async,每"行"前 await asyncio.sleep(0.01),用 async for 消费。
练习 3.2
说明为什么下面代码会"卡死",并给出两种修复。
参考答案
requests.get 是同步阻塞,在 async def 里会卡住事件循环。修复一:改用异步客户端 httpx.AsyncClient + await client.get(...)。修复二:托管到线程 await asyncio.to_thread(requests.get, "https://slow.example.com")。
练习 3.3
用 asyncio.to_thread 在异步代码里调用一个 CPU 密集的同步函数(如 sum(i*i for i in range(10**7))),验证调用期间其他协程仍能运行。
练习 3.4
开启 debug 模式跑一个含 time.sleep(2) 的 async def,观察警告。
参考答案
asyncio.run(main(), debug=True),循环会警告 "Executing
上一章:异步 2 · 并发控制 | ← 回首页 | 核心教程路线图