Python队列(Queue)多任务列表详解
相关链接
1. 队列和栈
2. heapq模块与queue.PriorityQueue
3. 多任务列表
队列的区别
| 队列类型 | 线程/进程安全 | 适用场景 | 阻塞机制 |
|---|---|---|---|
| asyncio.Queue | 协程安全 | 异步 I/O 密集型任务 | 异步非阻塞(await) |
| queue.Queue | 线程安全 | 多线程同步任务 | 同步阻塞 |
| multiprocessing.Queue | 进程安全 | 多进程任务 | 跨进程同步阻塞 |
| JoinableQueue | 进程安全 | 多进程任务 + 完成状态跟踪 | 同步阻塞 |
一、asyncio.Queue 协程安全队列
通过 asyncio.Queue 可高效协调异步任务,尤其适用于需要非阻塞 I/O 和精细任务管理的场景。需注意协程安全性与事件循环的兼容性。
1. 模块概述
asyncio.Queue 是 Python 异步编程库 asyncio 中的协程安全队列,专为异步任务设计,用于在协程(生产者-消费者模型)之间高效传递数据。它基于事件循环实现非阻塞操作,适合高并发 I/O 密集型场景。
2. 核心特性
* 协程安全
所有操作(put/get)均为异步,通过 await 实现非阻塞,确保在单线程事件循环中安全使用。
* 容量控制
支持设置最大容量(maxsize),队列满时 put 会阻塞,队列空时 get 会阻塞。
* 任务完成跟踪
提供 task_done() 和 join() 方法,用于标记任务完成并等待所有任务处理完毕。
3. 基本用法
* 生产者-消费者模型示例
import asyncio
async def producer(queue, name):
for i in range(3):
item = f"{name}-Task{i}"
await queue.put(item) # 异步放入队列
print(f"生产者 {name} 添加: {item}")
await asyncio.sleep(0.5)
await queue.join() # 等待所有任务完成
print(f"生产者 {name} 结束")
async def consumer(queue):
while True:
item = await queue.get() # 异步获取元素
if item is None:
break
print(f"消费者处理: {item}")
await asyncio.sleep(1) # 模拟耗时操作
queue.task_done() # 标记任务完成
async def main():
queue = asyncio.Queue(maxsize=2) # 队列容量为2
# 启动生产者和消费者
producer_task = asyncio.create_task(producer(queue, "P1"))
consumer_task = asyncio.create_task(consumer(queue))
# 等待生产者完成
await producer_task
await queue.put(None) # 发送终止信号给消费者
await consumer_task
asyncio.run(main())
* 关键方法
| 方法 | 说明 |
|---|---|
| await queue.put(item) | 异步添加元素,队列满时阻塞直到有空间 |
| await queue.get() | 异步获取元素,队列空时阻塞直到有数据 |
| queue.task_done() | 消费者调用,标记一个任务完成(必须与 get() 成对调用) |
| await queue.join() | 阻塞直到所有元素被处理(所有 task_done() 调用次数等于 put() 次数) |
4. 注意事项
* 协程中避免同步阻塞操作
所有队列操作必须使用 await,否则会阻塞事件循环。
# 错误示例(阻塞事件循环):
queue.put_nowait(item) # 非阻塞方法需谨慎使用
* 队列容量限制与异常处理
使用 put_nowait() 或 get_nowait() 需捕获 QueueFull/QueueEmpty 异常:
try:
queue.put_nowait(item)
except asyncio.QueueFull:
print("队列已满")
* 正确关闭消费者
通过发送终止信号(如 None)或调用 queue.put(None) 通知消费者退出循环。
* 任务完成跟踪一致性
task_done() 的调用次数必须等于 get() 次数,否则 join() 会永久阻塞。
5. 应用场景
* 高并发网络请求
管理异步 HTTP 请求任务的分发与结果收集。
* 实时数据处理
在异步流处理中缓冲数据,平衡生产者和消费者的速度差异。
* 任务调度系统
结合 asyncio.gather() 实现动态任务优先级调度。
二、queue.Queue 线程安全队列
通过 queue.Queue 可高效管理多线程任务,其线程安全特性和丰富的队列类型使其成为并发编程的核心工具。
1. 模块概述
queue.Queue 是 Python 标准库中提供的线程安全队列,支持多线程环境下的数据共享与同步,适用于生产者-消费者模型。其默认实现为先进先出(FIFO)队列,并支持阻塞操作和容量控制。
2. 核心特性
* 线程安全
所有操作(put()/get())通过内部锁机制保证线程安全,适合多线程任务调度。
* 阻塞与非阻塞操作
* 容量控制
初始化时可设置 maxsize 参数限制队列长度,默认为无界队列(maxsize=0)。
* 多种队列类型
支持 FIFO 队列、LIFO 队列(栈结构)、优先级队列。
3. 基本用法
* 创建队列
import queue
# 默认 FIFO 队列,容量无限制
q = queue.Queue(maxsize=10)
* 入队与出队
# 入队(阻塞操作)
q.put(item) # 阻塞至有空位
q.put_nowait(item) # 非阻塞,队列满时抛出 queue.Full 异常
# 出队(阻塞操作)
item = q.get() # 阻塞至有数据
item = q.get_nowait()# 非阻塞,队列空时抛出 queue.Empty 异常
* 队列状态检查
q.empty() # 返回队列是否为空(非线程安全,结果可能不可靠)
q.full() # 返回队列是否已满
q.qsize() # 返回队列当前元素数量(非线程安全)
* 任务完成跟踪
q.task_done() # 标记一个任务完成(需与 get() 成对调用)
q.join() # 阻塞主线程,直到所有任务被处理完毕
4. 队列类型扩展
| 队列类型 | 实现类 | 特性 |
|---|---|---|
| FIFO 队列 | queue.Queue | 默认队列,先进先出。 |
| LIFO 队列(栈) | queue.LifoQueue | 后进先出,模拟栈结构。 |
| 优先级队列 | queue.PriorityQueue | 按优先级排序,元素需为可比较元组。 |
示例:优先级队列
pq = queue.PriorityQueue()
pq.put((3, "低优先级任务")) # 元组格式:(优先级, 数据)
pq.put((1, "高优先级任务")) # 优先级数值越小越优先
5. 注意事项
* 线程安全与性能
* 异常处理
try:
q.put_nowait(item)
except queue.Full:
print("队列已满")
* 任务完成标记一致性
task_done() 调用次数必须等于 get() 次数,否则 join() 会永久阻塞。
三、multiprocessing.Queue 进程间通信(IPC)队列
通过合理使用 multiprocessing.Queue,可高效实现多进程协作,突破 Python 全局解释器锁(GIL)限制,充分发挥多核 CPU 性能。
1. 核心概念
multiprocessing.Queue 是 Python 多进程模块中提供的进程间通信(IPC)队列,用于在多进程环境中安全传递数据。与 queue.Queue(仅用于多线程)不同,它通过序列化(pickle)和底层 IPC 机制(如管道或套接字)实现跨进程数据共享。
2. 与 queue.Queue 对比
| 特性 | multiprocessing.Queue | queue.Queue |
|---|---|---|
| 适用场景 | 多进程间通信(如 CPU 密集型任务) | 多线程间共享数据 |
| 线程/进程安全 | 进程安全 | 线程安全 |
| 底层实现 | 基于 IPC 机制(管道或套接字) | 基于锁和条件变量 |
| 性能开销 | 较高(需序列化与跨进程通信) | 较低(内存直接共享) |
| 数据序列化 | 必须支持 pickle 序列化 | 无需序列化(直接内存共享) |
3. 基本用法
* 创建队列
from multiprocessing import Queue
# 初始化队列(默认无界,可设置 maxsize 限制容量)
mp_queue = Queue(maxsize=10)
* 入队与出队
# 生产者进程:放入数据
def producer(queue):
for i in range(5):
queue.put(f"数据-{i}")
# 消费者进程:取出数据
def consumer(queue):
while True:
item = queue.get()
if item is None: # 终止信号
break
print(f"处理: {item}")
# 启动进程
from multiprocessing import Process
p1 = Process(target=producer, args=(mp_queue,))
p2 = Process(target=consumer, args=(mp_queue,))
p1.start()
p2.start()
# 等待生产者完成并发送终止信号
p1.join()
mp_queue.put(None) # 通知消费者停止
p2.join()
4. 注意事项
* 数据序列化限制
队列中传递的对象必须可被 pickle 序列化(如自定义类需实现 reduce 方法)。
不支持的示例:
class CustomData:
def __init__(self, value):
self.value = value
def __reduce__(self): # 必须实现以支持序列化
return (self.__class__, (self.value,))
* 潜在死锁风险
* 性能优化
5. 替代方案
* multiprocessing.Manager().Queue()
from multiprocessing import Manager
manager = Manager()
shared_queue = manager.Queue()
* multiprocessing.SimpleQueue
from multiprocessing import SimpleQueue
sq = SimpleQueue()
6. 典型应用场景
* CPU 密集型任务分发
将计算任务拆分到多个进程并行处理,通过队列汇总结果。
* 跨进程状态同步
监控进程向队列发送状态信息,主进程统一处理日志或报警。
* 流水线数据处理
多进程形成处理链,上游进程通过队列传递数据给下游。
四、JoinableQueue 可连接进程队列
JoinableQueue 是 Python multiprocessing 模块中用于多进程协作的可连接进程队列,继承自 multiprocessing.Queue,支持任务完成状态的跟踪与同步,适用于生产者-消费者模型等需要任务确认的场景。
1. 核心特性
* 任务完成通知机制
* 与 Queue 的区别
| 特性 | JoinableQueue | multiprocessing.Queue |
|---|---|---|
| 任务完成跟踪 | 支持(task_done() 和 join()) | 不支持 |
| 适用场景 | 需确认任务完成的协作场景 | 简单任务分发 |
2. 核心方法
* task_done()
* join()
3. 使用步骤
* 初始化队列
from multiprocessing import JoinableQueue
jq = JoinableQueue()
* 定义生产者与消费者
# 生产者:填充任务
def producer(jq):
for i in range(10):
jq.put(i)
# 发送终止信号(可选)
for _ in range(num_workers):
jq.put(None)
# 消费者:处理任务
def consumer(jq):
while True:
item = jq.get()
if item is None:
break
print(f"处理任务: {item}")
jq.task_done() # 标记任务完成
* 启动进程并等待完成
from multiprocessing import Process
# 启动消费者进程
num_workers = 4
processes = [Process(target=consumer, args=(jq,)) for _ in range(num_workers)]
for p in processes:
p.start()
# 启动生产者进程
producer_process = Process(target=producer, args=(jq,))
producer_process.start()
# 等待所有任务完成
jq.join()
print("所有任务处理完毕")
4. 注意事项
* 终止信号处理
* 异常处理
* 性能优化
5. 适用场景
* 生产者-消费者模型
生产者分发任务,消费者处理并反馈完成状态(如爬虫、批量数据处理)。
* 流水线任务链
多进程协作处理任务链,下游进程处理完成后触发上游清理操作。
上一篇:
2. heapq模块与queue.PriorityQueue
作者:riven78