Python——协程asyncio库的使用


整体说明

  • asyncio 是 Python 内置的 异步 I/O 框架 ,核心用于编写高效的并发代码,尤其适合处理 I/O 密集型任务(如网络请求、文件读写、数据库操作等)
  • asyncio 基于 协程(coroutine) 实现,通过事件循环(Event Loop)调度任务,避免了多线程的上下文切换开销,效率更高
  • asyncio 在较高的 Python 版本中才可以使用,建议在 Python 3.7 以上使用
  • 常见应用场景包括
    • 网络请求:并发调用 API(如结合 aiohttp 库)
    • 文件操作:异步读写文件(如结合 aiofiles 库)
    • 数据库操作:异步操作数据库(如 asyncpg 用于 PostgreSQL,motor 用于 MongoDB)
    • WebSocket 服务:实现高并发的实时通信(如 websockets 库)
    • 定时任务:通过 asyncio.create_task() + 循环实现简单定时任务
  • 仅适用于 I/O 密集型任务:
    • asyncio 是单线程的,CPU 密集型任务会阻塞事件循环,需结合 loop.run_in_executor() 提交到线程池/进程池
    • await 只能在协程中使用:await 关键字不能在普通函数中使用,必须在 async def 定义的协程中
    • 事件循环是单线程的:协程的并发是“协作式”的,需通过 await 主动交出执行权,否则会独占事件循环

asyncio 相关核心概念

协程(Coroutine)

  • 协程是可暂停、可恢复的函数,用 async def 定义(语法),是 asyncio 的核心执行单元
  • 协程函数调用后不会立即执行 ,而是返回一个协程对象(coroutine object),需通过事件循环调度才能运行

事件循环(Event Loop)

  • asyncio 的“大脑”,负责调度所有协程任务:
    • 管理任务的暂停/恢复、监听 I/O 事件、分发任务执行权
  • 通常通过 asyncio.run() 自动创建和管理事件循环(推荐用法)

等待对象(Awaitable)

  • 可被 await 关键字修饰的对象
    • 包括:协程对象、Task、Future
  • await 会暂停当前协程,等待目标对象完成后再恢复,期间事件循环可调度其他协程执行(实现并发)

Task(任务)

  • 对协程的封装,将协程注册到事件循环中,使其可被调度执行
  • 通过 asyncio.create_task() 创建,会自动加入事件循环并运行

Future

  • 表示异步操作的“未来结果”,是低层级的对象(通常无需手动创建,Task 继承自 Future)

asyncio 基础用法

  • 定义和运行协程
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    import asyncio

    # 定义协程函数(async def 关键字)
    async def hello(name):
    print(f"Hello, {name}! (开始)")
    # 模拟 I/O 等待(必须用 await 修饰可等待对象)
    await asyncio.sleep(1) # 暂停 1 秒,期间事件循环可执行其他任务
    print(f"Hello, {name}! (结束)")

    # 运行协程(Python 3.7+ 推荐用 asyncio.run())
    asyncio.run(hello("asyncio"))
    # Hello, asyncio! (开始)
    ## 【等待】... 1s
    # Hello, asyncio! (结束)

并发执行多个协程

  • 通过 asyncio.gather()asyncio.create_task() 实现并发(多个任务同时执行,总耗时接近最长任务的耗时)

  • 方式 1:asyncio.gather()(批量等待多个协程)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    import asyncio

    async def task1():
    await asyncio.sleep(2)
    return "Task 1 完成"

    async def task2():
    await asyncio.sleep(1)
    return "Task 2 完成"

    async def main():
    # 并发执行 task1 和 task2,等待所有完成后返回结果(按传入顺序)
    result1, result2 = await asyncio.gather(task1(), task2())
    print(result1)
    print(result2)

    # 总耗时大约 2 秒(而非 2+1=3 秒)
    asyncio.run(main())

    # Task 1 完成
    # Task 2 完成
  • 方式 2:asyncio.create_task()(手动创建任务)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    import asyncio

    async def task(name, delay):
    await asyncio.sleep(delay)
    print(f"Task {name} 完成(延迟 {delay} 秒)")

    async def main():
    # 创建任务并自动加入事件循环
    task1 = asyncio.create_task(task("A", 2))
    task2 = asyncio.create_task(task("B", 1))

    # 等待任务完成(可单独等待,也可一起等待)
    await task1
    await task2

    asyncio.run(main())

    # Task B 完成(延迟 1 秒)
    # Task A 完成(延迟 2 秒)

附录:处理异常

  • 协程中的异常需通过 try/except 捕获,或在 gather() 中通过 return_exceptions=True 收集异常
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    import asyncio

    async def task1():
    await asyncio.sleep(2)
    return "Task 1 完成"

    async def faulty_task():
    await asyncio.sleep(1)
    raise ValueError("任务执行失败!") # 模拟抛出异常

    async def main():
    # 方式1:捕获单个协程的异常
    try:
    await faulty_task()
    except ValueError as e:
    print(f"捕获异常:{e}") # 捕获异常:任务执行失败!

    # 方式2:批量捕获多个协程的异常(return_exceptions=True)
    results = await asyncio.gather(
    task1(), # 正常任务,输出 "Task 1 完成"
    faulty_task(), # 异常任务,抛出异常 ValueError("任务执行失败!")
    return_exceptions=True # 不终止,返回异常对象
    )
    print(results) # 输出:["Task 1 完成", ValueError("任务执行失败!")]

    asyncio.run(main())

协程的进阶特性

超时控制(asyncio.wait_for()

  • 限制协程的执行时间,超时则抛出 TimeoutError
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    import asyncio

    async def long_task():
    await asyncio.sleep(3) # 模拟耗时 3 秒的任务

    async def main():
    try:
    # 限制任务 2 秒内完成,超时则取消任务并抛出异常
    result = await asyncio.wait_for(long_task(), timeout=2)
    except asyncio.TimeoutError:
    print("任务超时被取消!")

    asyncio.run(main())

任务取消(Task.cancel()

  • 手动取消正在执行的任务,被取消的任务会抛出 CancelledError
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    import asyncio

    async def endless_task():
    try:
    while True:
    print("任务运行中...")
    await asyncio.sleep(1)
    except asyncio.CancelledError:
    print("任务被取消!")
    raise # 可选:重新抛出,让调用方知道任务被取消

    async def main():
    task = asyncio.create_task(endless_task())
    await asyncio.sleep(2) # 运行 2 秒后取消
    task.cancel()
    await task # 必须等待任务处理取消逻辑

    asyncio.run(main())

异步上下文管理器(async with

  • 用于异步资源的获取和释放(如异步数据库连接、异步文件),需实现 __aenter____aexit__ 方法
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    import asyncio

    class AsyncResource:
    async def __aenter__(self):
    print("获取异步资源")
    await asyncio.sleep(0.5)
    return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
    print("释放异步资源")
    await asyncio.sleep(0.5)

    async def main():
    async with AsyncResource() as res:
    print("使用异步资源")

    asyncio.run(main())

    # 获取异步资源
    # 使用异步资源
    # 释放异步资源

异步迭代器(async for

  • 用于迭代异步生成的数据(如异步流、分页接口),需实现 __aiter____anext__ 方法
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    import asyncio

    class AsyncIterator:
    def __init__(self, limit):
    self.limit = limit
    self.count = 0

    def __aiter__(self):
    return self

    async def __anext__(self):
    if self.count >= self.limit:
    raise StopAsyncIteration
    self.count += 1
    await asyncio.sleep(0.5) # 模拟异步获取数据
    return self.count

    async def main():
    async for num in AsyncIterator(3):
    print(f"迭代得到:{num}")

    asyncio.run(main())

    # 迭代得到:1
    # 迭代得到:2
    # 迭代得到:3