Python队列(Queue)多任务列表详解

相关链接
1. 队列和栈
2. heapq模块与queue.PriorityQueue
3. 多任务列表

队列的区别

队列类型 线程/进程安全 适用场景 阻塞机制
asyncio.Queue 协程安全 异步 I/O 密集型任务 异步非阻塞(await)
queue.Queue 线程安全 多线程同步任务 同步阻塞
multiprocessing.Queue 进程安全 多进程任务 跨进程同步阻塞
JoinableQueue 进程安全 多进程任务 + 完成状态跟踪 同步阻塞

一、asyncio.Queue 协程安全队列

通过 asyncio.Queue 可高效协调异步任务,尤其适用于需要非阻塞 I/O 和精细任务管理的场景。需注意协程安全性与事件循环的兼容性。

1. 模块概述

asyncio.Queue 是 Python 异步编程库 asyncio 中的协程安全队列,专为异步任务设计,用于在协程(生产者-消费者模型)之间高效传递数据。它基于事件循环实现非阻塞操作,适合高并发 I/O 密集型场景。

2. 核心特性

* 协程安全

所有操作(put/get)均为异步,通过 await 实现非阻塞,确保在单线程事件循环中安全使用。

* 容量控制

支持设置最大容量(maxsize),队列满时 put 会阻塞,队列空时 get 会阻塞。

* 任务完成跟踪

提供 task_done() 和 join() 方法,用于标记任务完成并等待所有任务处理完毕。

3. 基本用法

* 生产者-消费者模型示例

import asyncio

async def producer(queue, name):
    for i in range(3):
        item = f"{name}-Task{i}"
        await queue.put(item)  # 异步放入队列
        print(f"生产者 {name} 添加: {item}")
        await asyncio.sleep(0.5)
    await queue.join()  # 等待所有任务完成
    print(f"生产者 {name} 结束")

async def consumer(queue):
    while True:
        item = await queue.get()  # 异步获取元素
        if item is None:
            break
        print(f"消费者处理: {item}")
        await asyncio.sleep(1)  # 模拟耗时操作
        queue.task_done()  # 标记任务完成

async def main():
    queue = asyncio.Queue(maxsize=2)  # 队列容量为2
    # 启动生产者和消费者
    producer_task = asyncio.create_task(producer(queue, "P1"))
    consumer_task = asyncio.create_task(consumer(queue))
    # 等待生产者完成
    await producer_task
    await queue.put(None)  # 发送终止信号给消费者
    await consumer_task

asyncio.run(main())

* 关键方法

方法 说明
await queue.put(item) 异步添加元素,队列满时阻塞直到有空间
await queue.get() 异步获取元素,队列空时阻塞直到有数据
queue.task_done() 消费者调用,标记一个任务完成(必须与 get() 成对调用)
await queue.join() 阻塞直到所有元素被处理(所有 task_done() 调用次数等于 put() 次数)

4. 注意事项

协程中避免同步阻塞操作

所有队列操作必须使用 await,否则会阻塞事件循环。

# 错误示例(阻塞事件循环):
queue.put_nowait(item)  # 非阻塞方法需谨慎使用

队列容量限制与异常处理

使用 put_nowait() 或 get_nowait() 需捕获 QueueFull/QueueEmpty 异常:

try:
    queue.put_nowait(item)
except asyncio.QueueFull:
    print("队列已满")

正确关闭消费者

通过发送终止信号(如 None)或调用 queue.put(None) 通知消费者退出循环。

任务完成跟踪一致性

task_done() 的调用次数必须等于 get() 次数,否则 join() 会永久阻塞。

5. 应用场景

高并发网络请求

管理异步 HTTP 请求任务的分发与结果收集。

实时数据处理

在异步流处理中缓冲数据,平衡生产者和消费者的速度差异。

任务调度系统

结合 asyncio.gather() 实现动态任务优先级调度。

二、queue.Queue 线程安全队列

通过 queue.Queue 可高效管理多线程任务,其线程安全特性和丰富的队列类型使其成为并发编程的核心工具。

1. 模块概述

queue.Queue 是 Python 标准库中提供的线程安全队列,支持多线程环境下的数据共享与同步,适用于生产者-消费者模型。其默认实现为先进先出(FIFO)队列,并支持阻塞操作和容量控制。

2. 核心特性

线程安全

所有操作(put()/get())通过内部锁机制保证线程安全,适合多线程任务调度。

阻塞与非阻塞操作

  • put():队列满时阻塞线程,直到有空位(可设置超时或非阻塞模式)。
  • get():队列空时阻塞线程,直到有新数据。
  • 容量控制

    初始化时可设置 maxsize 参数限制队列长度,默认为无界队列(maxsize=0)。

    多种队列类型

    支持 FIFO 队列、LIFO 队列(栈结构)、优先级队列。

    3. 基本用法

    创建队列

    import queue
    
    # 默认 FIFO 队列,容量无限制
    q = queue.Queue(maxsize=10)  
    

    入队与出队

    # 入队(阻塞操作)
    q.put(item)          # 阻塞至有空位
    q.put_nowait(item)   # 非阻塞,队列满时抛出 queue.Full 异常
    
    # 出队(阻塞操作)
    item = q.get()       # 阻塞至有数据
    item = q.get_nowait()# 非阻塞,队列空时抛出 queue.Empty 异常
    

    队列状态检查

    q.empty()  # 返回队列是否为空(非线程安全,结果可能不可靠)
    q.full()   # 返回队列是否已满
    q.qsize()  # 返回队列当前元素数量(非线程安全)
    

    任务完成跟踪

    q.task_done()  # 标记一个任务完成(需与 get() 成对调用)
    q.join()       # 阻塞主线程,直到所有任务被处理完毕
    

    4. 队列类型扩展

    队列类型 实现类 特性
    FIFO 队列 queue.Queue 默认队列,先进先出。
    LIFO 队列(栈) queue.LifoQueue 后进先出,模拟栈结构。
    优先级队列 queue.PriorityQueue 按优先级排序,元素需为可比较元组。
    示例:优先级队列
    pq = queue.PriorityQueue()
    pq.put((3, "低优先级任务"))  # 元组格式:(优先级, 数据)
    pq.put((1, "高优先级任务"))  # 优先级数值越小越优先
    

    5. 注意事项

    线程安全与性能

  • 多线程场景必须使用 queue.Queue 而非 list 或 collections.deque。
  • 非阻塞方法(put_nowait()/get_nowait())需配合异常处理使用。
  • 异常处理

    try:
        q.put_nowait(item)
    except queue.Full:
        print("队列已满")
    

    任务完成标记一致性

    task_done() 调用次数必须等于 get() 次数,否则 join() 会永久阻塞。

    三、multiprocessing.Queue 进程间通信(IPC)队列

    通过合理使用 multiprocessing.Queue,可高效实现多进程协作,突破 Python 全局解释器锁(GIL)限制,充分发挥多核 CPU 性能。

    1. 核心概念

    multiprocessing.Queue 是 Python 多进程模块中提供的进程间通信(IPC)队列,用于在多进程环境中安全传递数据。与 queue.Queue(仅用于多线程)不同,它通过序列化(pickle)和底层 IPC 机制(如管道或套接字)实现跨进程数据共享。

    2. 与 queue.Queue 对比

    特性 multiprocessing.Queue queue.Queue
    适用场景 多进程间通信(如 CPU 密集型任务) 多线程间共享数据
    线程/进程安全 进程安全 线程安全
    底层实现 基于 IPC 机制(管道或套接字) 基于锁和条件变量
    性能开销 较高(需序列化与跨进程通信) 较低(内存直接共享)
    数据序列化 必须支持 pickle 序列化 无需序列化(直接内存共享)

    3. 基本用法

    创建队列

    from multiprocessing import Queue
    
    # 初始化队列(默认无界,可设置 maxsize 限制容量)
    mp_queue = Queue(maxsize=10)
    

    入队与出队

    # 生产者进程:放入数据
    def producer(queue):
        for i in range(5):
            queue.put(f"数据-{i}")
    
    # 消费者进程:取出数据
    def consumer(queue):
        while True:
            item = queue.get()
            if item is None:  # 终止信号
                break
            print(f"处理: {item}")
    
    # 启动进程
    from multiprocessing import Process
    
    p1 = Process(target=producer, args=(mp_queue,))
    p2 = Process(target=consumer, args=(mp_queue,))
    
    p1.start()
    p2.start()
    
    # 等待生产者完成并发送终止信号
    p1.join()
    mp_queue.put(None)  # 通知消费者停止
    p2.join()
    

    4. 注意事项

    数据序列化限制

    队列中传递的对象必须可被 pickle 序列化(如自定义类需实现 reduce 方法)。

    不支持的示例:

    class CustomData:
        def __init__(self, value):
            self.value = value
        def __reduce__(self):  # 必须实现以支持序列化
            return (self.__class__, (self.value,))
    

    潜在死锁风险

  • 生产者-消费者不平衡:若消费者未及时取数据,队列满时生产者会阻塞。
  • 未正确处理终止信号:消费者可能无限等待数据,需显式发送终止标记(如 None)。
  • 性能优化

  • 批量传输:使用 Queue.put_many()(需自定义实现)减少 IPC 调用次数。
  • 替代方案:考虑 multiprocessing.Pipe(点对点)或共享内存(multiprocessing.Value/Array)提升性能。
  • 5. 替代方案

    multiprocessing.Manager().Queue()

  • 通过代理进程管理队列,支持网络分布式场景,但性能较低。
  • from multiprocessing import Manager
    manager = Manager()
    shared_queue = manager.Queue()
    

    multiprocessing.SimpleQueue

  • 更轻量级的队列,仅支持 put()/get(),无大小限制或任务跟踪。
  • from multiprocessing import SimpleQueue
    sq = SimpleQueue()
    

    6. 典型应用场景

    CPU 密集型任务分发

    将计算任务拆分到多个进程并行处理,通过队列汇总结果。

    跨进程状态同步

    监控进程向队列发送状态信息,主进程统一处理日志或报警。

    流水线数据处理

    多进程形成处理链,上游进程通过队列传递数据给下游。

    四、JoinableQueue 可连接进程队列

    JoinableQueue 是 Python multiprocessing 模块中用于多进程协作的可连接进程队列,继承自 multiprocessing.Queue,支持任务完成状态的跟踪与同步,适用于生产者-消费者模型等需要任务确认的场景。

    1. 核心特性

    任务完成通知机制

  • 消费者调用 task_done() 标记任务完成,生产者通过 join() 阻塞等待所有任务处理完毕。
  • 底层通过信号量(Semaphore)和条件变量(Condition)实现同步。
  • 与 Queue 的区别

    特性 JoinableQueue multiprocessing.Queue
    任务完成跟踪 支持(task_done() 和 join()) 不支持
    适用场景 需确认任务完成的协作场景 简单任务分发

    2. 核心方法

    task_done()

  • 作用:消费者调用此方法,表示 get() 获取的任务已处理完成。
  • 规则:调用次数必须与 get() 获取的任务数一致,否则会引发 ValueError 异常。
  • join()

  • 作用:阻塞生产者进程,直到队列中所有任务被处理完毕(即所有任务均调用了 task_done())。
  • 3. 使用步骤

    初始化队列

    from multiprocessing import JoinableQueue  
    jq = JoinableQueue()  
    

    定义生产者与消费者

    # 生产者:填充任务  
    def producer(jq):  
        for i in range(10):  
            jq.put(i)  
        # 发送终止信号(可选)  
        for _ in range(num_workers):  
            jq.put(None)  
    
    # 消费者:处理任务  
    def consumer(jq):  
        while True:  
            item = jq.get()  
            if item is None:  
                break  
            print(f"处理任务: {item}")  
            jq.task_done()  # 标记任务完成  
    

    启动进程并等待完成

    from multiprocessing import Process  
    
    # 启动消费者进程  
    num_workers = 4  
    processes = [Process(target=consumer, args=(jq,)) for _ in range(num_workers)]  
    for p in processes:  
        p.start()  
    
    # 启动生产者进程  
    producer_process = Process(target=producer, args=(jq,))  
    producer_process.start()  
    
    # 等待所有任务完成  
    jq.join()  
    print("所有任务处理完毕")  
    

    4. 注意事项

    终止信号处理

  • 需显式发送终止标记(如 None),否则消费者会无限等待任务。
  • 终止标记数量应与消费者进程数一致,确保所有进程正常退出。
  • 异常处理

  • 消费者处理任务时需捕获异常,并在失败时调用 task_done(),避免 join() 永久阻塞。
  • 性能优化

  • 高频任务场景下,优先使用批量任务提交(如 put_many()),减少进程间通信开销。
  • 5. 适用场景

    生产者-消费者模型

    生产者分发任务,消费者处理并反馈完成状态(如爬虫、批量数据处理)。

    流水线任务链

    多进程协作处理任务链,下游进程处理完成后触发上游清理操作。


    上一篇:

    2. heapq模块与queue.PriorityQueue

    作者:riven78

    物联沃分享整理
    物联沃-IOTWORD物联网 » Python队列(Queue)多任务列表详解

    发表回复