【Python】使用 asyncio 库实现高效异步编程实践指南
Python 的 asyncio 库是标准库的一部分,引入于 Python 3.4(通过 PEP 3156),用于实现异步编程。它提供了一种基于事件循环(Event Loop)的并发机制,适合处理 I/O 密集型任务(如网络请求、文件操作、数据库查询等)。asyncio 通过协程(coroutines)、任务(tasks)和事件循环,允许程序在等待 I/O 操作时执行其他任务,从而提高效率。
以下是对 asyncio 库的详细说明和常见用法。
1. asyncio 库的作用
aiohttp、aiomysql)集成,支持异步网络请求、数据库操作等。2. 核心概念
async def 定义的函数,代表可暂停和恢复的异步操作。asyncio 的核心,负责调度协程和处理 I/O 事件。await 的对象,包括协程、任务和 Future。3. 基本用法
以下是 asyncio 的基本用法,展示如何定义和运行协程。
3.1 定义和运行协程
import asyncio
# 定义协程
async def say_hello():
print("Hello")
await asyncio.sleep(1) # 模拟异步 I/O 操作
print("World")
# 运行协程
async def main():
await asyncio.gather(say_hello(), say_hello()) # 并发运行多个协程
# 执行事件循环
if __name__ == "__main__":
asyncio.run(main())
输出:
Hello
Hello
World
World
说明:
async def 定义协程,await 表示暂停点,允许事件循环调度其他任务。asyncio.sleep(1) 模拟异步 I/O(如网络请求),不会阻塞事件循环。asyncio.gather() 并发运行多个协程。asyncio.run() 是运行异步程序的推荐入口,自动创建和关闭事件循环。3.2 事件循环
事件循环是 asyncio 的核心,负责调度协程和处理回调。以下是手动操作事件循环的示例:
import asyncio
async def task():
print("Task started")
await asyncio.sleep(1)
print("Task finished")
loop = asyncio.get_event_loop() # 获取事件循环
try:
loop.run_until_complete(task()) # 运行协程直到完成
finally:
loop.close() # 关闭事件循环
说明:
asyncio.get_event_loop() 获取默认事件循环。loop.run_until_complete() 运行单个协程或 Future。asyncio.run(),因为它更安全且自动管理循环的生命周期。4. 常用功能
asyncio 提供了丰富的 API,以下是常见功能和用法。
4.1 并发运行多个协程
使用 asyncio.gather() 或 asyncio.create_task() 实现并发:
import asyncio
async def task1():
print("Task 1 started")
await asyncio.sleep(2)
print("Task 1 finished")
async def task2():
print("Task 2 started")
await asyncio.sleep(1)
print("Task 2 finished")
async def main():
# 使用 gather 并发运行
await asyncio.gather(task1(), task2())
# 或者使用 create_task
t1 = asyncio.create_task(task1())
t2 = asyncio.create_task(task2())
await t1
await t2
asyncio.run(main())
输出:
Task 1 started
Task 2 started
Task 2 finished
Task 1 finished
说明:
asyncio.gather() 等待所有协程完成,返回结果列表。asyncio.create_task() 将协程包装为任务,立即调度运行。4.2 超时控制
使用 asyncio.wait_for() 为协程设置超时:
import asyncio
async def long_task():
await asyncio.sleep(5)
print("Task completed")
async def main():
try:
await asyncio.wait_for(long_task(), timeout=2) # 2 秒超时
except asyncio.TimeoutError:
print("Task timed out")
asyncio.run(main())
输出:
Task timed out
4.3 异步迭代
asyncio 支持异步迭代器和异步上下文管理器:
import asyncio
async def async_generator():
for i in range(3):
await asyncio.sleep(1)
yield i
async def main():
async for value in async_generator():
print(f"Received: {value}")
asyncio.run(main())
输出:
Received: 0
Received: 1
Received: 2
4.4 异步上下文管理器
使用 async with 管理资源:
import asyncio
from contextlib import asynccontextmanager
@asynccontextmanager
async def resource():
print("Resource acquired")
try:
yield
finally:
print("Resource released")
async def main():
async with resource():
print("Using resource")
await asyncio.sleep(1)
asyncio.run(main())
输出:
Resource acquired
Using resource
Resource released
5. 与外部库集成
asyncio 常与异步库结合使用,例如:
aiohttp:异步 HTTP 客户端/服务器。
import aiohttp
import asyncio
async def fetch_url(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
return await response.text()
async def main():
html = await fetch_url("https://example.com")
print(html[:100])
asyncio.run(main())
aiomysql:异步 MySQL 数据库操作。
aiofiles:异步文件读写。
6. 高级功能
6.1 任务取消
可以取消正在运行的任务:
import asyncio
async def long_task():
try:
print("Task started")
await asyncio.sleep(10)
print("Task finished")
except asyncio.CancelledError:
print("Task was cancelled")
raise
async def main():
task = asyncio.create_task(long_task())
await asyncio.sleep(1)
task.cancel() # 取消任务
try:
await task
except asyncio.CancelledError:
print("Main caught cancellation")
asyncio.run(main())
输出:
Task started
Task was cancelled
Main caught cancellation
6.2 同步与异步混合
使用 loop.run_in_executor() 将同步代码(阻塞操作)运行在线程池或进程池中:
import asyncio
import time
def blocking_task():
time.sleep(1) # 模拟阻塞操作
return "Done"
async def main():
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(None, blocking_task) # 在默认线程池运行
print(result)
asyncio.run(main())
输出:
Done
6.3 自定义事件循环
可以自定义事件循环策略,例如使用 uvloop(高性能事件循环):
import asyncio
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
# 后续代码使用 uvloop 作为事件循环
安装 uvloop:
pip install uvloop
7. 实际应用场景
8. 注意事项
asyncio 在单线程中运行,依赖事件循环调度,无法利用多核 CPU(需要结合 multiprocessing)。time.sleep、requests.get)会阻塞事件循环,需使用异步替代或 run_in_executor。await 或任务调度运行,否则不会执行。asyncio.run(debug=True))。asyncio.run() 和上下文管理器改进。asyncio.TaskGroup)等新功能。9. 综合示例
以下是一个综合示例,展示异步爬虫的实现:
import asyncio
import aiohttp
async def fetch_url(session, url):
async with session.get(url) as response:
return await response.text()
async def main():
urls = [
"https://example.com",
"https://python.org",
"https://github.com"
]
async with aiohttp.ClientSession() as session:
tasks = [fetch_url(session, url) for url in urls]
results = await asyncio.gather(*tasks, return_exceptions=True)
for url, result in zip(urls, results):
if isinstance(result, Exception):
print(f"Failed to fetch {url}: {result}")
else:
print(f"Fetched {url}: {len(result)} bytes")
if __name__ == "__main__":
asyncio.run(main())
输出示例:
Fetched https://example.com: 1256 bytes
Fetched https://python.org: 50342 bytes
Fetched https://github.com: 123456 bytes
说明:
aiohttp.ClientSession 管理 HTTP 会话。asyncio.gather 并发请求多个 URL。return_exceptions=True 防止单个失败影响其他任务。作者:彬彬侠