Python中进程与线程的深入解析
进程与线程
1.进程
进程是指启动后的程序,系统会为进程分配内存空间
1.1 创建进程的方式
(1)使用Process子类
class 子进程(Process):
pass
(2)使用multiprocessing模块
Process(group=None,target,name,args,kwargs)
group
:表示分组,实际上不使用,值默认为None
target
:表示子进程要执行的任务,支持函数名name
:表示子进程的名称args
:表示调用函数的位置参数,以元组的形式进行传递kwargs
:表示调用函数的关键字参数,以字典的形式进行传递
方法/属性名称 | 功能描述 |
---|---|
name |
当前进程实例别名,默认为Process-N |
pid |
当前进程对象的PID 值 |
is_alive() |
进程是否执行完,没执行完结果为True ,否则为False |
join(timeout) |
等待结束或等待timeout 秒 |
start() |
启动进程 |
run() |
如果没有指定target 参数,则启动进程后,会调用父类中的run 方法 |
terminate() |
强制终止进程 |
from multiprocessing import Process
def task():
print("子进程任务")
p = Process(target=task)
p.start()
p.join()
a. 获取进程信息
from multiprocessing import current_process
print(current_process().name)
print(current_process().pid)
b. 多进程共享数据问题
进程之间的内存不共享,需要使用:
multiprocessing.Queue
:进程安全队列multiprocessing.Value
/ Array
Manager().dict()
/ Manager().list()
from multiprocessing import Process, Value
def task(v):
for _ in range(10000):
v.value += 1 # 不安全
v = Value('i', 0) # 'i'表示整数类型
p1 = Process(target=task, args=(v,))
p2 = Process(target=task, args=(v,))
p1.start()
p2.start()
p1.join()
p2.join()
print(v.value)
c. 使用 Lock 保证进程同步
from multiprocessing import Lock, Process, Value
def safe_task(v, lock):
for _ in range(10000):
with lock:
v.value += 1
lock = Lock()
v = Value('i', 0)
p1 = Process(target=safe_task, args=(v, lock))
p2 = Process(target=safe_task, args=(v, lock))
p1.start()
p2.start()
p1.join()
p2.join()
print(v.value) # 正确值
1.2 线程池&进程池
(1)进程池
进程池对象=Pool(N) # N 最大进程数量
方法名 | 功能描述 |
---|---|
apply_async(func,args,kwargs) |
使用非阻塞方式调用函数func |
apply(func,args,kwargs) |
使用阻塞方式1调用函数func |
close() |
关闭进程池,不再接收新任务 |
terminate() |
不管任务是否完成,立即终止 |
join() |
阻塞主进程,必须在terminate() 或close() 之后使用 |
from concurrent.futures import ProcessPoolExecutor
def task(n):
return n * n
with ProcessPoolExecutor(max_workers=4) as pool:
futures = [pool.submit(task, i) for i in range(10)]
for future in futures:
print(future.result())
(2)线程池
from concurrent.futures import ThreadPoolExecutor
def task(n):
return n * n
with ThreadPoolExecutor(max_workers=4) as pool:
futures = [pool.submit(task, i) for i in range(10)]
for future in futures:
print(future.result())
(3)submit vs map
# 用 map 更简洁
with ThreadPoolExecutor() as pool:
results = pool.map(task, [1, 2, 3, 4])
for r in results:
print(r)
(4)对比分析
特性 | 多线程 threading |
多进程 multiprocessing |
---|---|---|
适合任务类型 | I/O 密集型 | CPU 密集型 |
内存是否共享 | 是 | 否(各自独立) |
是否受 GIL 限制 | 是 | 否 |
开销大小 | 小 | 大(启动/通信慢) |
通信方式 | 共享变量 | Queue、Pipe、Manager 等 |
1.3 进程之间的通信
进程之间的数据不是共享的
通过队列实现进程之间的数据共享
队列对象=Queue(N)
方法名称 | 功能描述 |
---|---|
qsize() |
获取当前队列包含的消息数量 |
empty() |
判断队列是否为空,为空结果为True ,否则为False |
full() |
判断队列是否满了,满结果为True |
get(block=True) |
获取队列中的一条消息,然后从队列中移除,block 默认值为True |
get_nowait() |
同上,但消息队列为空时,抛出异常 |
put(item,block=True) |
将item 消息放入队列,block 默认为True |
put_nowait(item) |
同上 |
queue
中的Queue
类方法名称 | 功能描述 |
---|---|
put(item) |
向队列中放置数据,如果队列为满,则阻塞 |
get() |
从队列中取走数据,如果队列为空,则阻塞 |
join() |
如果队列不为空,则等待队列变为空 |
task_done() |
消费者从队列中取走一项数据,当队列变为空时,唤醒调用join() 进程 |
2.线程
线程式CPU可调度的最小单位,被包含在进程中,是进程的实际运作单位,一个进程中可以拥有N多个线程并发执行,而每个线程并行执行不同的任务
2.1 创建线程的方式
(1)第一种
t = Thread(group,target,name,args,kwargs)
group
:创建线程对象的进程组target
:创建的线程对象所要执行的目标函数name
:创建线程对象的名称,默认为"Tread-n
"args
:用元组以位置参数的形式传入target
对应函数的参数kwargs
:用字典以关键字参数的形式传入target
对应函数的参数
import threading
def task():
print("任务执行中")
# 创建线程
t = threading.Thread(target=task)
# 启动线程
t.start()
# 等待线程结束
t.join()
(2)第二种
继承式
Thread
子类创建线程的操作步骤
- 自定义类继承
threading
模块下的Thread
类 - 实现
run
方法
class MyThread(threading.Thread):
def run(self):
print("自定义线程任务")
t = MyThread()
t.start()
t.join()
2.2 线程之间的通信
2.3 获取线程信息
import threading
print(threading.current_thread().name) # 当前线程名
print(threading.active_count()) # 当前活跃线程数
2.4 线程操作共享数据的安全性问题
解决办法:Lock锁
标准写法
from threading import Lock
lock = Lock()
# 使用方式:
lock.acquire() # 加锁
try:
# 操作共享数据的代码块
finally:
lock.release() # 无论如何都要释放锁
也可以用 with
语句自动管理锁的释放:
with lock:
# 操作共享数据
示例
import threading
# 全局变量:多个线程共享
count = 0
# 创建锁
lock = threading.Lock()
def add():
global count
for _ in range(100000):
with lock: # 保证这块代码同一时间只有一个线程能执行
count += 1
# 创建两个线程
t1 = threading.Thread(target=add)
t2 = threading.Thread(target=add)
# 启动线程
t1.start()
t2.start()
# 等待线程执行完毕
t1.join()
t2.join()
print("最终count值:", count)
输出如下:
最终count值:200000
当未加锁时,输出可能如下:
最终count值:197xxx 或 198xxx(不是200000!)
2.5 可重入锁(Rlock
)
RLock
(递归锁):允许一个线程多次获取同一个锁,避免死锁。rlock = threading.RLock()
with rlock:
with rlock:
# 允许多次进入
pass
2.6 信号量(Semaphore
)
import threading
sem = threading.Semaphore(3) # 最多允许3个线程同时执行
def task():
with sem:
print("执行任务")
2.7 Event(事件)线程通信
import threading
import time
event = threading.Event()
def wait_for_event():
print("等待事件...")
event.wait()
print("事件已触发")
t = threading.Thread(target=wait_for_event)
t.start()
time.sleep(2)
event.set() # 通知事件发生
2.8 线程之间共享数据的方式
方式 | 说明 |
---|---|
全局变量 | 多线程间自动共享 |
线程传参 | 创建线程时通过 args 传入参数 |
queue.Queue |
线程安全的队列,推荐用于数据传输 |
2.9 使用线程安全队列
from queue import Queue
import threading
q = Queue()
def producer():
for i in range(5):
q.put(i)
def consumer():
while not q.empty():
item = q.get()
print("消费:", item)
threading.Thread(target=producer).start()
threading.Thread(target=consumer).start()
作者:TY-2025