Python异步编程详解:await与asyncio深度解析

在 Python 中,awaitasyncio 是异步编程的核心工具,用于高效处理 I/O 密集型任务(如网络请求、文件读写、数据库操作等)。它们的核心思想是通过 协程(Coroutine)事件循环(Event Loop) 实现非阻塞并发,避免线程切换的开销。


一、核心概念

  1. 协程(Coroutine)
    async def 定义的函数,返回一个协程对象,可以通过 await 挂起执行,让出控制权。

    async def my_coroutine():
        await asyncio.sleep(1)
        print("Done")
    
  2. 事件循环(Event Loop)
    异步程序的核心调度器,负责执行协程并在 I/O 操作时切换任务。

  3. 可等待对象(Awaitable)
    包括协程、asyncio.Taskasyncio.Future。只有可等待对象才能被 await


二、使用场景

  1. 高并发网络请求
    如爬虫、API 调用等需要同时处理大量连接的场景。
  2. Web 服务器
    如 FastAPI、Sanic 等异步框架处理 HTTP 请求。
  3. 数据库操作
    异步驱动(如 asyncpgaiomysql)避免阻塞主线程。
  4. 实时通信
    WebSocket、聊天服务器等需要长连接的场景。

三、基本用法

1. 定义协程
async def fetch_data(url):
    # 模拟网络请求
    await asyncio.sleep(1)
    return f"Data from {url}"
2. 运行协程
async def main():
    result = await fetch_data("https://example.com")
    print(result)

# Python 3.7+ 推荐方式
asyncio.run(main())
3. 并发执行多个任务

使用 asyncio.gather()asyncio.create_task()

async def main():
    # 同时执行多个协程
    task1 = asyncio.create_task(fetch_data("url1"))
    task2 = asyncio.create_task(fetch_data("url2"))
    
    # 等待所有任务完成
    results = await asyncio.gather(task1, task2)
    print(results)

四、关键 API 详解

  1. asyncio.run(coro)
    启动事件循环并运行协程(Python 3.7+)。

  2. asyncio.create_task(coro)
    将协程包装为 Task,加入事件循环并发执行。

  3. asyncio.gather(*coros)
    并发执行多个协程,返回结果列表。

  4. asyncio.sleep(delay)
    非阻塞等待(模拟 I/O 操作)。


五、高级用法

1. 控制并发量

使用信号量(Semaphore)限制同时运行的任务数:

async def limited_fetch(url, semaphore):
    async with semaphore:
        return await fetch_data(url)

async def main():
    semaphore = asyncio.Semaphore(5)  # 最大并发5
    tasks = [limited_fetch(url, semaphore) for url in urls]
    await asyncio.gather(*tasks)
2. 超时控制
async def fetch_with_timeout():
    try:
        async with asyncio.timeout(3):  # Python 3.11+
            await fetch_data("slow_url")
    except TimeoutError:
        print("Timeout!")
3. 回调与 Future
async def main():
    loop = asyncio.get_running_loop()
    future = loop.create_future()

    def callback():
        future.set_result("Done")

    loop.call_soon(callback)
    result = await future
    print(result)

六、常见错误

  1. 忘记 await
    协程不会被自动执行:

    async def main():
        fetch_data("url")  # 错误!没有 await
    
  2. 阻塞主线程
    在协程中调用同步代码(如 time.sleep())会阻塞事件循环:

    async def bad_example():
        time.sleep(1)  # 错误!应使用 await asyncio.sleep(1)
    
  3. 滥用并发
    异步不适合 CPU 密集型任务,此时应使用多进程。


七、完整示例

import asyncio

async def download(url, delay):
    print(f"Start downloading {url}")
    await asyncio.sleep(delay)
    print(f"Finished {url}")
    return url

async def main():
    urls = [
        ("https://site1.com", 1),
        ("https://site2.com", 2),
        ("https://site3.com", 3),
    ]
    
    tasks = [asyncio.create_task(download(url, delay)) for url, delay in urls]
    results = await asyncio.gather(*tasks)
    print("All done:", results)

if __name__ == "__main__":
    asyncio.run(main())

输出:

Start downloading https://site1.com
Start downloading https://site2.com
Start downloading https://site3.com
Finished https://site1.com
Finished https://site2.com
Finished https://site3.com
All done: ['https://site1.com', 'https://site2.com', 'https://site3.com']

八、总结

  • 适用场景:I/O 密集型任务,如网络、文件、数据库操作。
  • 关键点
  • 使用 async def 定义协程,用 await 挂起阻塞操作。
  • 通过 asyncio.create_task()asyncio.gather() 实现并发。
  • 避免在协程中调用阻塞同步代码。
  • 通过合理使用 asyncio,可以在单线程内高效处理成千上万的并发连接。

    作者:大大小小聪明

    物联沃分享整理
    物联沃-IOTWORD物联网 » Python异步编程详解:await与asyncio深度解析

    发表回复