整体说明
asyncio是 Python 内置的 异步 I/O 框架 ,核心用于编写高效的并发代码,尤其适合处理 I/O 密集型任务(如网络请求、文件读写、数据库操作等)asyncio基于 协程(coroutine) 实现,通过事件循环(Event Loop)调度任务,避免了多线程的上下文切换开销,效率更高asyncio在较高的 Python 版本中才可以使用,建议在 Python 3.7 以上使用- 常见应用场景包括
- 网络请求:并发调用 API(如结合
aiohttp库) - 文件操作:异步读写文件(如结合
aiofiles库) - 数据库操作:异步操作数据库(如
asyncpg用于 PostgreSQL,motor用于 MongoDB) - WebSocket 服务:实现高并发的实时通信(如
websockets库) - 定时任务:通过
asyncio.create_task()+ 循环实现简单定时任务
- 网络请求:并发调用 API(如结合
- 仅适用于 I/O 密集型任务:
asyncio是单线程的,CPU 密集型任务会阻塞事件循环,需结合loop.run_in_executor()提交到线程池/进程池await只能在协程中使用:await关键字不能在普通函数中使用,必须在async def定义的协程中- 事件循环是单线程的:协程的并发是“协作式”的,需通过
await主动交出执行权,否则会独占事件循环
asyncio 相关核心概念
协程(Coroutine)
- 协程是可暂停、可恢复的函数,用
async def定义(语法),是asyncio的核心执行单元 - 协程函数调用后不会立即执行 ,而是返回一个协程对象(coroutine object),需通过事件循环调度才能运行
事件循环(Event Loop)
asyncio的“大脑”,负责调度所有协程任务:- 管理任务的暂停/恢复、监听 I/O 事件、分发任务执行权
- 通常通过
asyncio.run()自动创建和管理事件循环(推荐用法)
等待对象(Awaitable)
- 可被
await关键字修饰的对象- 包括:协程对象、Task、Future
await会暂停当前协程,等待目标对象完成后再恢复,期间事件循环可调度其他协程执行(实现并发)
Task(任务)
- 对协程的封装,将协程注册到事件循环中,使其可被调度执行
- 通过
asyncio.create_task()创建,会自动加入事件循环并运行
Future
- 表示异步操作的“未来结果”,是低层级的对象(通常无需手动创建,Task 继承自 Future)
asyncio 基础用法
- 定义和运行协程
1
2
3
4
5
6
7
8
9
10
11
12
13
14import asyncio
# 定义协程函数(async def 关键字)
async def hello(name):
print(f"Hello, {name}! (开始)")
# 模拟 I/O 等待(必须用 await 修饰可等待对象)
await asyncio.sleep(1) # 暂停 1 秒,期间事件循环可执行其他任务
print(f"Hello, {name}! (结束)")
# 运行协程(Python 3.7+ 推荐用 asyncio.run())
asyncio.run(hello("asyncio"))
# Hello, asyncio! (开始)
## 【等待】... 1s
# Hello, asyncio! (结束)
并发执行多个协程
通过
asyncio.gather()或asyncio.create_task()实现并发(多个任务同时执行,总耗时接近最长任务的耗时)方式 1:
asyncio.gather()(批量等待多个协程)1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21import asyncio
async def task1():
await asyncio.sleep(2)
return "Task 1 完成"
async def task2():
await asyncio.sleep(1)
return "Task 2 完成"
async def main():
# 并发执行 task1 和 task2,等待所有完成后返回结果(按传入顺序)
result1, result2 = await asyncio.gather(task1(), task2())
print(result1)
print(result2)
# 总耗时大约 2 秒(而非 2+1=3 秒)
asyncio.run(main())
# Task 1 完成
# Task 2 完成方式 2:
asyncio.create_task()(手动创建任务)1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19import asyncio
async def task(name, delay):
await asyncio.sleep(delay)
print(f"Task {name} 完成(延迟 {delay} 秒)")
async def main():
# 创建任务并自动加入事件循环
task1 = asyncio.create_task(task("A", 2))
task2 = asyncio.create_task(task("B", 1))
# 等待任务完成(可单独等待,也可一起等待)
await task1
await task2
asyncio.run(main())
# Task B 完成(延迟 1 秒)
# Task A 完成(延迟 2 秒)
附录:处理异常
- 协程中的异常需通过
try/except捕获,或在gather()中通过return_exceptions=True收集异常1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26import asyncio
async def task1():
await asyncio.sleep(2)
return "Task 1 完成"
async def faulty_task():
await asyncio.sleep(1)
raise ValueError("任务执行失败!") # 模拟抛出异常
async def main():
# 方式1:捕获单个协程的异常
try:
await faulty_task()
except ValueError as e:
print(f"捕获异常:{e}") # 捕获异常:任务执行失败!
# 方式2:批量捕获多个协程的异常(return_exceptions=True)
results = await asyncio.gather(
task1(), # 正常任务,输出 "Task 1 完成"
faulty_task(), # 异常任务,抛出异常 ValueError("任务执行失败!")
return_exceptions=True # 不终止,返回异常对象
)
print(results) # 输出:["Task 1 完成", ValueError("任务执行失败!")]
asyncio.run(main())
协程的进阶特性
超时控制(asyncio.wait_for())
- 限制协程的执行时间,超时则抛出
TimeoutError1
2
3
4
5
6
7
8
9
10
11
12
13import asyncio
async def long_task():
await asyncio.sleep(3) # 模拟耗时 3 秒的任务
async def main():
try:
# 限制任务 2 秒内完成,超时则取消任务并抛出异常
result = await asyncio.wait_for(long_task(), timeout=2)
except asyncio.TimeoutError:
print("任务超时被取消!")
asyncio.run(main())
任务取消(Task.cancel())
- 手动取消正在执行的任务,被取消的任务会抛出
CancelledError1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18import asyncio
async def endless_task():
try:
while True:
print("任务运行中...")
await asyncio.sleep(1)
except asyncio.CancelledError:
print("任务被取消!")
raise # 可选:重新抛出,让调用方知道任务被取消
async def main():
task = asyncio.create_task(endless_task())
await asyncio.sleep(2) # 运行 2 秒后取消
task.cancel()
await task # 必须等待任务处理取消逻辑
asyncio.run(main())
异步上下文管理器(async with)
- 用于异步资源的获取和释放(如异步数据库连接、异步文件),需实现
__aenter__和__aexit__方法1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21import asyncio
class AsyncResource:
async def __aenter__(self):
print("获取异步资源")
await asyncio.sleep(0.5)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
print("释放异步资源")
await asyncio.sleep(0.5)
async def main():
async with AsyncResource() as res:
print("使用异步资源")
asyncio.run(main())
# 获取异步资源
# 使用异步资源
# 释放异步资源
异步迭代器(async for)
- 用于迭代异步生成的数据(如异步流、分页接口),需实现
__aiter__和__anext__方法1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26import asyncio
class AsyncIterator:
def __init__(self, limit):
self.limit = limit
self.count = 0
def __aiter__(self):
return self
async def __anext__(self):
if self.count >= self.limit:
raise StopAsyncIteration
self.count += 1
await asyncio.sleep(0.5) # 模拟异步获取数据
return self.count
async def main():
async for num in AsyncIterator(3):
print(f"迭代得到:{num}")
asyncio.run(main())
# 迭代得到:1
# 迭代得到:2
# 迭代得到:3