异步 2 · 并发控制¶
第 1 章讲了"一个 Task 怎么跑"。本章讲"多个任务怎么协调"——并发执行、限制并发数、任务间同步、超时与取消。这是写真实异步程序(爬虫限流、生产者-消费者、超时保护)的核心。
2.1 gather:并发等全部¶
asyncio.gather 并发跑多个协程,等全部完成,按顺序返回结果:
results = await asyncio.gather(fetch("a"), fetch("b"), fetch("c"))
# results = [结果a, 结果b, 结果c] 顺序与传入一致
-
Java CompletableFuture
-
Python gather
异常处理:默认(return_exceptions=False)任一协程抛异常即立即传播到 await gather 处(其余任务不会自动取消,但结果被丢弃);return_exceptions=True 则把异常对象作为结果项返回、不抛出。
2.2 TaskGroup:结构化并发(3.11+,推荐)¶
TaskGroup 是现代写法,比 gather 更安全——结构化并发:任务生命周期被 async with 块限定,任一任务出错则自动取消其余。
async with asyncio.TaskGroup() as tg:
t1 = tg.create_task(fetch("a"))
t2 = tg.create_task(fetch("b"))
# 退出 with 块时,所有 task 已完成;任一异常 → 其余自动取消 + 抛 ExceptionGroup
a, b = t1.result(), t2.result()
优先用 TaskGroup
- 新代码用
TaskGroup,比gather的错误传播更清晰(不会出现"一个任务失败、其他仍在运行无人取消")。 - 对照 Java 21 的
StructuredTaskScope(配ShutdownOnFailure策略)——同样的"结构化并发"理念:子任务生命周期与作用域绑定,任一失败即取消其余。 gather仍适合"一次性收集结果、不在乎错误传播"的简单场景。
2.3 as_completed:谁先完成谁先处理¶
gather 等全部完才返回。想先到先处理(如爬虫先抓到的先解析)用 as_completed:
for coro in asyncio.as_completed([fetch(u) for u in urls]):
result = await coro # 最先完成的先返回
process(result)
2.4 同步原语(对照 java.util.concurrent)¶
单线程协程通常无竞态(无 await 处不会被打断),但跨 await 的共享状态仍需同步。asyncio 提供异步版同步原语:
| asyncio | Java 对应 | 用途 |
|---|---|---|
asyncio.Lock |
Lock(非重入;asyncio 无 RLock) |
互斥访问共享资源 |
asyncio.Semaphore |
Semaphore |
限流(最多 N 个并发) |
asyncio.Event |
CountDownLatch/Event |
等待某个条件发生 |
asyncio.Queue |
BlockingQueue |
生产者-消费者 |
Semaphore:限流并发(最常用)¶
爬一千个 URL,但不能同时打一千个请求(会被封/压垮服务)。用 Semaphore 限并发:
sem = asyncio.Semaphore(10) # 最多 10 个并发
async def limited_fetch(url):
async with sem: # 拿许可,超过 10 个就等
return await fetch(url)
await asyncio.gather(*(limited_fetch(u) for u in urls)) # 上千 URL,但最多 10 并发
这是高并发 IO 的核心模式。
Queue:生产者-消费者¶
queue = asyncio.Queue()
async def producer():
for i in range(5):
await queue.put(i)
await queue.put(None) # 哨兵:通知消费者结束
async def consumer():
while True:
item = await queue.get()
if item is None:
break
print(f"处理 {item}")
await asyncio.gather(producer(), consumer())
2.5 超时与取消¶
wait_for:超时自动取消¶
try:
result = await asyncio.wait_for(slow_op(), timeout=1.0)
except asyncio.TimeoutError:
print("超时,slow_op 已被取消") # 超时后协程自动取消
手动取消 task.cancel()¶
task = asyncio.create_task(long_running())
await asyncio.sleep(0.5)
task.cancel() # 请求取消
try:
await task
except asyncio.CancelledError:
print("已取消")
⚠️ 取消的语义
cancel() 是请求,在协程的下一个 await 处抛 CancelledError。协程可以捕获它做清理,但不应吞掉(除非有充分理由)——吞掉 CancelledError 会破坏取消传播。清理后应 raise。
2.6 限制并发数的完整模式¶
把 Semaphore + gather 结合,是处理"大量 IO 任务、限并发"的标准范式:
async def main(urls: list[str]):
sem = asyncio.Semaphore(20)
async def bounded(u):
async with sem:
return await fetch(u)
return await asyncio.gather(*(bounded(u) for u in urls))
或用 TaskGroup(结构化 + 限流):
async with asyncio.TaskGroup() as tg:
tasks = [tg.create_task(bounded(u)) for u in urls]
results = [t.result() for t in tasks]
本章练习¶
练习 2.1
用 Semaphore(3) 限制并发,同时请求 10 个 URL(用 asyncio.sleep 模拟),打印每个的开始/结束,观察同时不超过 3 个。
练习 2.2
用 TaskGroup 并发三个任务,其中一个抛异常。观察其他任务是否被取消。
练习 2.3
给一个可能很慢的操作加 0.5 秒超时,超时则打印"超时"。
练习 2.4
解释为什么协程在单线程下仍可能需要 Lock(明明没有线程竞态)。
参考答案
协程是协作式调度,await 处会切换到其他协程。若两协程在 await 之间读写同一共享状态(如"检查后操作":先读余额、await、再扣款),就可能交错产生逻辑错误。Lock 保证这段操作原子(跨 await 也不被打断)。无 await 的纯计算段无需锁(不会切换)。
上一章:异步 1 · 事件循环与协程机制 | ← 回首页 | 下一章:异步 3 · 进阶模式与陷阱