跳转至

异步 3 · 进阶模式与陷阱

前两章覆盖了机制与并发控制。本章讲进阶模式(异步迭代、异步上下文)和陷阱清单——后者是异步代码出 bug 的重灾区。读完本章你能避开绝大多数 asyncio 的坑。


3.1 异步迭代器与 async for

普通迭代器(__iter__/__next__)是同步的。要迭代异步数据源(每项要 await,如逐块读流、逐条查库),用异步迭代器__aiter__/__anext__):

class AsyncRange:
    def __init__(self, n):
        self.n, self.i = n, 0
    def __aiter__(self):
        return self
    async def __anext__(self):
        if self.i >= self.n:
            raise StopAsyncIteration      # 用异常表示结束(同同步迭代器)
        await asyncio.sleep(0.05)         # 异步等待
        self.i += 1
        return self.i - 1

async def main():
    async for x in AsyncRange(3):         # async for
        print(x)

3.2 异步生成器

async def + yield = 异步生成器,比手写 __anext__ 简洁:

async def astream(n):
    for i in range(n):
        await asyncio.sleep(0.05)
        yield i

async def main():
    async for x in astream(5):
        print(x)

适合流式处理(数据库游标、分页 API、日志逐行)。


3.3 async with:异步上下文管理器

with(第 7 章)是同步的。异步资源(连接、会话)的获取/释放要 await,用 async with(实现 __aenter__/__aexit__):

class AsyncDB:
    async def __aenter__(self):
        await self.connect()              # 异步获取
        return self
    async def __aexit__(self, exc_type, exc, tb):
        await self.close()                # 异步释放(即使异常也执行)

async with AsyncDB() as db:
    await db.query(...)
  • Java try-with-resources

    try (var conn = ds.getConnection()) {
        conn.query();
    }
    
  • Python async with

    async with AsyncDB() as db:
        await db.query()
    

实战中 httpx.AsyncClientsqlalchemy async session 都是异步上下文管理器。


3.4 异常传播

  • 直接 await 的协程:异常正常抛到 await 处。
  • Task 的异常:不会自动传播,await task 时才抛;忘记 await 会"吞掉"异常(静默失败)。
  • TaskGroup:任一子任务抛异常 → 取消其余 + 抛 ExceptionGroup(3.11+,把多个异常打包)。
task = asyncio.create_task(may_fail())
# 此时 may_fail 抛了异常,但你不知道(还没 await)
result = await task        # 这里才看到异常

务必 await 你的 Task

创建了 Task 就要在某处 await(或加入 TaskGroup),否则异常被吞、错误难追踪。


3.5 ⚠️ 致命陷阱:阻塞事件循环

这是异步代码最常见、最隐蔽的 bug。在 async def 里调用同步阻塞函数,会卡住整个事件循环——所有其他协程全部停滞。

async def bad_handler():
    time.sleep(5)            # ❌ 同步阻塞,卡死整个服务 5 秒
    data = requests.get(url) # ❌ requests 是同步库,阻塞循环
    result = cpu_heavy()     # ❌ CPU 密集(无 await),独占循环

规则async def 里只能用异步库httpx.AsyncClientaiosqliteasyncio.sleep)。同步阻塞代码必须托管(下节)。


3.6 与线程/进程协作:run_in_executor / to_thread

遇到必须用的同步阻塞库(老 SDK、CPU 密集计算),把它丢到线程池/进程池,别让它堵事件循环:

import asyncio

# 3.9+ 简便写法:丢到线程池
result = await asyncio.to_thread(blocking_io, arg)

# 手动控制(自定义 executor,如进程池)
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(None, blocking_io, arg)   # None=默认线程池

to_thread 把同步函数丢进线程池执行并 await 结果——循环期间能跑别的协程。对照 Java:相当于把阻塞任务提交 ExecutorServiceFuture.get()

CPU 密集仍走进程

线程池里的 CPU 密集仍受 GIL(第 11 章)。真 CPU 并行用 ProcessPoolExecutor + run_in_executor


3.7 调试 asyncio

开启 debug 模式,事件循环会检测并警告慢回调(阻塞过久)和未消费的协程

asyncio.run(main(), debug=True)       # 代码开启
# 或环境变量 PYTHONASYNCIODEBUG=1

常见排查:

  • 协程从不执行 → 忘了 await/create_task(会警告 coroutine ... never awaited)。
  • 服务间歇卡顿 → 某处同步阻塞(开 debug 找慢回调)。
  • 异常消失 → 创建了 Task 没 await(3.4)。

3.8 异步陷阱速查清单

  • async def 里调用了同步阻塞(time.sleeprequests、同步 DB)→ 卡死循环
  • 创建了 Task 却没 await/没进 TaskGroup → 异常被吞、协程可能被回收
  • gather 时一个任务异常,想"全失败"却没处理(默认行为是抛第一个异常)
  • 吞掉 CancelledError(清理后没 raise)→ 破坏取消传播
  • CPU 密集放 async def 期待加速 → 协程不解决 CPU 并行,走进程
  • asyncio.run 内再调 asyncio.runRuntimeError

一句话

异步的命门是"绝不在 async 里阻塞"——用异步库,或用 to_thread 托管阻塞调用。其余都是细节。


本章练习

练习 3.1

写一个异步生成器 read_lines_async,每"行"前 await asyncio.sleep(0.01),用 async for 消费。

参考答案
async def read_lines_async(lines):
    for line in lines:
        await asyncio.sleep(0.01)
        yield line
async def main():
    async for line in read_lines_async(["a", "b", "c"]):
        print(line)
练习 3.2

说明为什么下面代码会"卡死",并给出两种修复。

async def handler():
    import requests
    return requests.get("https://slow.example.com").text

参考答案

requests.get 是同步阻塞,在 async def 里会卡住事件循环。修复一:改用异步客户端 httpx.AsyncClient + await client.get(...)。修复二:托管到线程 await asyncio.to_thread(requests.get, "https://slow.example.com")

练习 3.3

asyncio.to_thread 在异步代码里调用一个 CPU 密集的同步函数(如 sum(i*i for i in range(10**7))),验证调用期间其他协程仍能运行。

参考答案

def cpu(): return sum(i*i for i in range(10**7))
async def main():
    t = asyncio.create_task(asyncio.to_thread(cpu))
    while not t.done():
        print("loop alive"); await asyncio.sleep(0.01)
    print(await t)
to_thread 把 cpu 丢线程池,主循环持续打印"loop alive"——证明未阻塞。

练习 3.4

开启 debug 模式跑一个含 time.sleep(2)async def,观察警告。

参考答案

asyncio.run(main(), debug=True),循环会警告 "Executing took 2.0 seconds"(检测到慢回调),帮助定位阻塞点。


上一章:异步 2 · 并发控制← 回首页核心教程路线图