Python进程管理详解:从入门到精通
进程是指一个程序的运行实例,它是一个独立的执行环境,拥有自己的内存空间、系统资源和执行状态。Python 中的进程可以通过标准库中的 multiprocessing 模块来创建和管理。
进程和线程的对比:
| 维度 | 进程 | 线程 |
|---|---|---|
| 资源分配 | 独立内存空间,资源占用大 | 共享进程内存,资源占用小 |
| 并发性 | 真正的并行(多核 CPU) | 受 GIL 限制,多线程为并发而非并行 |
| 通信成本 | 需通过 IPC,复杂且效率低 | 可直接共享内存,通信简单高效 |
| 适用场景 | CPU 密集型、需要隔离的任务 | I/O 密集型、轻量级任务 |
基本创建方法
from multiprocessing import Process
import os
def worker():
print(f"子进程ID:{os.getpid()},父进程ID:{os.getppid()}")
if __name__ == "__main__": # Windows系统必须添加此判断
p = Process(target=worker)
p.start() # 启动子进程
p.join() # 等待子进程结束
可以看出和Python线程的使用方式上十分一致。
📢:在 Windows 系统中,Python 借助 spawn 方式来启动新进程。这一过程意味着新进程会重新执行整个 Python 脚本。要是主逻辑没有被 if __name__ == "__main__" 包裹起来,新进程在启动时就会再次运行创建进程的代码,进而造成递归循环,最终引发错误或者死锁。Linux 和 macOS 系统采用的是 fork 方式来启动新进程。在这种方式下,子进程会复制父进程的内存空间,因此无需 if __name__ == "__main__" 也能正常运行。不过,为了保证代码在不同平台上都具有可移植性,最好还是统一添加这一保护机制。
spawn和fork两种启动新进程的方式有兴趣的读者可以进一步探究,这里不作讨论。
进程锁
在多进程环境中,多个进程可能会同时访问或修改共享资源(如文件、队列、数据库等)。如果没有适当的同步机制,可能会出现以下问题:
进程锁的使用方式与线程锁类似:
from multiprocessing import Process, Lock
def worker(lock, name):
with lock: # 自动获取和释放锁
print(f"{name} is working")
"""
手动管理就是:
lock.acquire() # 获取锁
try:
print(f"{name} is working")
finally:
lock.release() # 释放锁
"""
if __name__ == "__main__":
lock = Lock()
processes = []
for i in range(5):
p = Process(target=worker, args=(lock, f"Process-{i}"))
processes.append(p)
p.start()
for p in processes:
p.join()
进程锁的实现依赖于操作系统的同步机制。在底层,锁通常是一个互斥量(Mutex),它是一个二进制信号量,用于控制对共享资源的访问。互斥量的状态可以是“锁定”或“解锁”:
操作系统会负责管理这些锁的状态,并在锁被释放时唤醒等待的进程。
同线程一样,进程还有信号量、事件、条件等同步机制,这里不作讨论。
进程间通信(IPC)
在线程通信那节(8.4)我们了解过queue.Queue 是 Python 标准库中用于线程间通信的线程安全队列,但它不能用于进程。
由于进程是独立的执行单元,它们拥有各自的内存空间,因此进程间通信需要通过特定的机制来实现。Python 提供了多种进程间通信的方式,包括管道(Pipes)、队列(Queues,底层也用到了管道)、共享内存(Shared Memory)、消息队列(Message Queues)、信号(Signals)等。
这里我们只讨论进程间队列。
multiprocessing.Queue 是 Python 的 multiprocessing 模块中用于实现进程间通信的一个队列类。它允许在不同的进程之间安全地传递数据,即使这些进程运行在不同的内存空间中。
multiprocessing.Queue 的实现基于操作系统的管道(Pipe)和锁机制:
multiprocessing.Queue 内部使用了匿名管道,用于在进程之间传输数据。数据被序列化后通过管道发送到另一个进程。具体来说:
put 方法时(此时会上锁),数据会被序列化并发送到管道中。get 方法时(此时会上锁),它会从管道中读取数据并反序列化。from multiprocessing import Process, Queue
import time
def producer(queue):
for i in range(5):
print(f"Producer putting {i} into the queue")
queue.put(f"Message {i}")
time.sleep(0.1) # 模拟生产者的工作时间
def consumer(queue):
while True:
item = queue.get()
if item is None: # 使用 None 作为结束信号
break
print(f"Consumer got {item} from the queue")
time.sleep(0.2) # 模拟消费者的工作时间
if __name__ == "__main__":
queue = Queue()
# 创建生产者和消费者进程
producer_process = Process(target=producer, args=(queue,))
consumer_process = Process(target=consumer, args=(queue,))
# 启动进程
producer_process.start()
consumer_process.start()
# 等待生产者完成
producer_process.join()
# 向队列中放入结束信号
queue.put(None)
# 等待消费者完成
consumer_process.join()
作者:观智能