Python asyncio 异步模块使用教程

Python asyncio 异步模块使用教程

1. 异步编程基础概念

1.1 什么是异步编程

异步编程是一种非阻塞的编程范式,允许程序在等待I/O操作(如网络请求、文件读写)完成时执行其他任务,而不是干等。

1.2 事件循环(Event Loop)

事件循环是asyncio的核心,负责调度和执行异步任务。它不断检查并执行以下操作:

  • 执行可运行的协程
  • 处理I/O事件
  • 执行回调函数
  • 处理子进程
  • 1.3 协程(Coroutine)

    协程是asyncio的基本执行单元,使用async def定义的函数:

    async def my_coroutine():
        await asyncio.sleep(1)
        return "Done"
    

    1.4 Future和Task

  • Future: 表示异步操作的最终结果
  • Task: Future的子类,用于包装和管理协程的执行
  • 2. 基本用法

    2.1 创建和运行协程

    import asyncio
    
    async def hello_world():
        print("Hello")
        await asyncio.sleep(1)
        print("World")
    
    # Python 3.7+ 推荐方式
    asyncio.run(hello_world())
    
    # 旧版本方式
    # loop = asyncio.get_event_loop()
    # loop.run_until_complete(hello_world())
    # loop.close()
    

    2.2 创建任务(Task)

    async def main():
        task1 = asyncio.create_task(hello_world())
        task2 = asyncio.create_task(hello_world())
        await task1
        await task2
    
    asyncio.run(main())
    

    2.3 并发运行多个协程

    async def fetch_data(delay, id):
        print(f"Fetching data {id}...")
        await asyncio.sleep(delay)
        print(f"Data {id} fetched")
        return {"id": id, "delay": delay}
    
    async def main():
        results = await asyncio.gather(
            fetch_data(2, 1),
            fetch_data(1, 2),
            fetch_data(3, 3),
        )
        print(results)
    
    asyncio.run(main())
    

    3. 核心API详解

    3.1 事件循环管理

    3.1.1 获取事件循环
    loop = asyncio.get_event_loop()
    
    3.1.2 运行直到完成
    loop.run_until_complete(coro)
    
    3.1.3 运行永久
    loop.run_forever()
    
    3.1.4 停止事件循环
    loop.stop()
    

    3.2 协程控制

    3.2.1 asyncio.sleep
    await asyncio.sleep(1)  # 休眠1秒
    
    3.2.2 asyncio.gather
    results = await asyncio.gather(
        coro1(),
        coro2(),
        return_exceptions=True  # 异常时返回异常对象而不是抛出
    )
    
    3.2.3 asyncio.wait
    done, pending = await asyncio.wait(
        [coro1(), coro2()],
        timeout=2,
        return_when=asyncio.FIRST_COMPLETED
    )
    
    3.2.4 asyncio.shield
    task = asyncio.create_task(coro())
    await asyncio.shield(task)  # 防止任务被取消
    

    3.3 任务管理

    3.3.1 创建任务
    task = asyncio.create_task(coro())
    
    3.3.2 取消任务
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("Task was cancelled")
    
    3.3.3 设置超时
    try:
        await asyncio.wait_for(coro(), timeout=1.0)
    except asyncio.TimeoutError:
        print("Timeout!")
    

    4. 高级特性

    4.1 异步上下文管理器

    class AsyncResource:
        async def __aenter__(self):
            print("Opening resource")
            await asyncio.sleep(1)
            return self
        
        async def __aexit__(self, exc_type, exc, tb):
            print("Closing resource")
            await asyncio.sleep(1)
    
    async def main():
        async with AsyncResource() as resource:
            print("Using resource")
    
    asyncio.run(main())
    

    4.2 异步迭代器

    class AsyncCounter:
        def __init__(self, stop):
            self.current = 0
            self.stop = stop
        
        def __aiter__(self):
            return self
        
        async def __anext__(self):
            if self.current < self.stop:
                await asyncio.sleep(1)
                self.current += 1
                return self.current
            else:
                raise StopAsyncIteration
    
    async def main():
        async for i in AsyncCounter(3):
            print(i)
    
    asyncio.run(main())
    

    4.3 异步生成器

    async def async_gen():
        for i in range(3):
            await asyncio.sleep(1)
            yield i
    
    async def main():
        async for item in async_gen():
            print(item)
    
    asyncio.run(main())
    

    5. 与同步代码交互

    5.1 在异步中运行同步代码

    import time
    
    def blocking_io():
        time.sleep(1)
        return "Blocking IO done"
    
    async def main():
        loop = asyncio.get_running_loop()
        
        # 1. 在默认线程池中运行
        result = await loop.run_in_executor(
            None, blocking_io
        )
        print(result)
        
        # 2. 在自定义线程池中运行
        # with concurrent.futures.ThreadPoolExecutor() as pool:
        #     result = await loop.run_in_executor(
        #         pool, blocking_io
        #     )
        #     print(result)
    
    asyncio.run(main())
    

    5.2 在同步中运行异步代码

    def sync_main():
        return asyncio.run(async_main())
    
    async def async_main():
        await asyncio.sleep(1)
        return "Done"
    
    print(sync_main())
    

    6. 常见问题

    6.1 常见错误

    1. 忘记await:导致协程没有实际执行
    2. 混合阻塞和非阻塞代码:在协程中调用阻塞函数
    3. 不正确的任务取消处理:没有正确处理CancelledError
    4. 事件循环嵌套:在已有事件循环中创建新循环

    6.2 注意事项

    1. 使用async/await语法:替代旧式的@asyncio.coroutine和yield from
    2. 合理设置并发限制:避免资源耗尽
    3. 正确处理异常:特别是CancelledError
    4. 使用结构化并发:通过async with管理资源
    5. 避免CPU密集型任务:asyncio适合I/O密集型场景

    6.3 性能优化

    1. 减少await次数:批量处理任务
    2. 使用uvloop:替代默认事件循环提升性能
    import uvloop
    uvloop.install()
    
    1. 合理设置缓冲区大小:在网络编程中调整读写缓冲区

    7. 实际应用示例

    7.1 异步爬虫

    import aiohttp
    
    async def fetch_url(session, url):
        async with session.get(url) as response:
            return await response.text()
    
    async def main():
        async with aiohttp.ClientSession() as session:
            tasks = [
                fetch_url(session, "http://example.com") for _ in range(5)
            ]
            results = await asyncio.gather(*tasks)
            print(f"Fetched {len(results)} pages")
    
    asyncio.run(main())
    

    7.2 异步数据库访问

    import asyncpg
    
    async def fetch_data():
        conn = await asyncpg.connect(
            user='user', password='pass',
            database='db', host='localhost')
        
        try:
            result = await conn.fetch('SELECT * FROM table')
            return result
        finally:
            await conn.close()
    
    asyncio.run(fetch_data())
    

    7.3 异步Web框架(FastAPI)

    from fastapi import FastAPI
    import asyncio
    
    app = FastAPI()
    
    @app.get("/")
    async def read_root():
        await asyncio.sleep(1)
        return {"Hello": "World"}
    
    @app.get("/items/{item_id}")
    async def read_item(item_id: int):
        return {"item_id": item_id}
    

    作者:cugleem

    物联沃分享整理
    物联沃-IOTWORD物联网 » Python asyncio 异步模块使用教程

    发表回复