跳转至

异步 2 · 并发控制

第 1 章讲了"一个 Task 怎么跑"。本章讲"多个任务怎么协调"——并发执行、限制并发数、任务间同步、超时与取消。这是写真实异步程序(爬虫限流、生产者-消费者、超时保护)的核心。


2.1 gather:并发等全部

asyncio.gather 并发跑多个协程,等全部完成,按顺序返回结果:

results = await asyncio.gather(fetch("a"), fetch("b"), fetch("c"))
# results = [结果a, 结果b, 结果c]  顺序与传入一致
  • Java CompletableFuture

    CompletableFuture.allOf(f1, f2, f3).join();
    
  • Python gather

    await asyncio.gather(f1, f2, f3)
    

异常处理:默认(return_exceptions=False)任一协程抛异常即立即传播到 await gather 处(其余任务不会自动取消,但结果被丢弃);return_exceptions=True 则把异常对象作为结果项返回、不抛出。


2.2 TaskGroup:结构化并发(3.11+,推荐)

TaskGroup 是现代写法,比 gather 更安全——结构化并发:任务生命周期被 async with 块限定,任一任务出错则自动取消其余

async with asyncio.TaskGroup() as tg:
    t1 = tg.create_task(fetch("a"))
    t2 = tg.create_task(fetch("b"))
# 退出 with 块时,所有 task 已完成;任一异常 → 其余自动取消 + 抛 ExceptionGroup
a, b = t1.result(), t2.result()

优先用 TaskGroup

  • 新代码用 TaskGroup,比 gather 的错误传播更清晰(不会出现"一个任务失败、其他仍在运行无人取消")。
  • 对照 Java 21 的 StructuredTaskScope(配 ShutdownOnFailure 策略)——同样的"结构化并发"理念:子任务生命周期与作用域绑定,任一失败即取消其余。
  • gather 仍适合"一次性收集结果、不在乎错误传播"的简单场景。

2.3 as_completed:谁先完成谁先处理

gather 等全部完才返回。想先到先处理(如爬虫先抓到的先解析)用 as_completed

for coro in asyncio.as_completed([fetch(u) for u in urls]):
    result = await coro          # 最先完成的先返回
    process(result)

2.4 同步原语(对照 java.util.concurrent

单线程协程通常无竞态(无 await 处不会被打断),但跨 await 的共享状态仍需同步。asyncio 提供异步版同步原语:

asyncio Java 对应 用途
asyncio.Lock Lock(非重入;asyncio 无 RLock) 互斥访问共享资源
asyncio.Semaphore Semaphore 限流(最多 N 个并发)
asyncio.Event CountDownLatch/Event 等待某个条件发生
asyncio.Queue BlockingQueue 生产者-消费者

Semaphore:限流并发(最常用)

爬一千个 URL,但不能同时打一千个请求(会被封/压垮服务)。用 Semaphore 限并发:

sem = asyncio.Semaphore(10)          # 最多 10 个并发

async def limited_fetch(url):
    async with sem:                  # 拿许可,超过 10 个就等
        return await fetch(url)

await asyncio.gather(*(limited_fetch(u) for u in urls))   # 上千 URL,但最多 10 并发

这是高并发 IO 的核心模式

Queue:生产者-消费者

queue = asyncio.Queue()

async def producer():
    for i in range(5):
        await queue.put(i)
    await queue.put(None)            # 哨兵:通知消费者结束

async def consumer():
    while True:
        item = await queue.get()
        if item is None:
            break
        print(f"处理 {item}")

await asyncio.gather(producer(), consumer())

2.5 超时与取消

wait_for:超时自动取消

try:
    result = await asyncio.wait_for(slow_op(), timeout=1.0)
except asyncio.TimeoutError:
    print("超时,slow_op 已被取消")    # 超时后协程自动取消

手动取消 task.cancel()

task = asyncio.create_task(long_running())
await asyncio.sleep(0.5)
task.cancel()                        # 请求取消
try:
    await task
except asyncio.CancelledError:
    print("已取消")

⚠️ 取消的语义

cancel()请求,在协程的下一个 await 处抛 CancelledError。协程可以捕获它做清理,但不应吞掉(除非有充分理由)——吞掉 CancelledError 会破坏取消传播。清理后应 raise


2.6 限制并发数的完整模式

把 Semaphore + gather 结合,是处理"大量 IO 任务、限并发"的标准范式:

async def main(urls: list[str]):
    sem = asyncio.Semaphore(20)
    async def bounded(u):
        async with sem:
            return await fetch(u)
    return await asyncio.gather(*(bounded(u) for u in urls))

或用 TaskGroup(结构化 + 限流):

async with asyncio.TaskGroup() as tg:
    tasks = [tg.create_task(bounded(u)) for u in urls]
results = [t.result() for t in tasks]

本章练习

练习 2.1

Semaphore(3) 限制并发,同时请求 10 个 URL(用 asyncio.sleep 模拟),打印每个的开始/结束,观察同时不超过 3 个。

参考答案
sem = asyncio.Semaphore(3)
async def fetch(i):
    async with sem:
        print(f"start {i}")
        await asyncio.sleep(0.5)
        print(f"end {i}")
async def main():
    await asyncio.gather(*(fetch(i) for i in range(10)))
asyncio.run(main())
练习 2.2

TaskGroup 并发三个任务,其中一个抛异常。观察其他任务是否被取消。

参考答案
async def good(n): await asyncio.sleep(0.1); return n
async def bad(): raise ValueError("boom")
async def main():
    async with asyncio.TaskGroup() as tg:
        tg.create_task(good(1))
        tg.create_task(bad())
        tg.create_task(good(2))
# bad 抛异常 → TaskGroup 取消其余,main 抛 ExceptionGroup(ValueError)
练习 2.3

给一个可能很慢的操作加 0.5 秒超时,超时则打印"超时"。

参考答案
try:
    await asyncio.wait_for(slow(), timeout=0.5)
except asyncio.TimeoutError:
    print("超时")
练习 2.4

解释为什么协程在单线程下仍可能需要 Lock(明明没有线程竞态)。

参考答案

协程是协作式调度,await 处会切换到其他协程。若两协程在 await 之间读写同一共享状态(如"检查后操作":先读余额、await、再扣款),就可能交错产生逻辑错误。Lock 保证这段操作原子(跨 await 也不被打断)。无 await 的纯计算段无需锁(不会切换)。


上一章:异步 1 · 事件循环与协程机制← 回首页 | 下一章:异步 3 · 进阶模式与陷阱