【Python】多线程Thread全方位使用指南

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档

文章目录

  • 前言
  • 一、threading是什么?
  • 二、使用步骤
  • 1.引入模块
  • 2.创建多线程
  • 3.多线程目标函数传递参数
  • 4. 等待线程结束 join()
  • 5. 线程同步 Lock()
  • 6. 线程之间通信,队列queue的使用
  • 7. 守护线程 (Daemon Thread)
  • 8. 线程池 concurrent.futures 模块
  • 总结

  • 前言

    提示:提到多线程必须要先明白关键词的定义:

    例如:进程和线程的概念。

    1. 进程是指一个程序,比如在电脑里面运行的QQ,浏览器就是一个进程。每个进程之间相互隔离,占用不同的资源,一个进程至少包含一个线程。
    2. 线程就是一个进程里面的一部分,一个进程可以拥有多个线程,其中包含一个主线程,多个线程并发实现效率的提高。

    提示:以下是本篇文章正文内容,下面案例可供参考

    一、threading是什么?

    示例:Thread是python已经存在的标准库,该模块提供了强大的多线程编程支持。通过Thread类可以创建和管理线程,使用Lock、Queue等工具可以解决线程同步和通信问题。对于I/O密集型任务,多线程可以有效提高效率。

    二、使用步骤

    1.引入模块

    代码如下(示例):

    import threading
    

    2.创建多线程

    代码如下(示例):

    import threading
    import time
    
    
    def target_1():
        for i in range(5):
            print("线程函数 1")
            time.sleep(1)
    
    
    def target_2():
        for i in range(5):
            print("线程函数 2")
            time.sleep(1)
    
    
    def th1():
        th = threading.Thread(target=target_1)  # 新建一个线程
        th.start()
        return th
    
    
    def th2():
        th = threading.Thread(target=target_2)
        th.start()
        return th
    
    
    def _main():
        print("主线程开始")
        threading_1 = th1()
        threading_2 = th2()
        print("主线程结束")
    
    
    if __name__ == '__main__':
        start_time = time.perf_counter()
        _main()
        end_time = time.perf_counter()
        total_time = end_time - start_time
        # print(f"耗时: {total_time}")
    
    

    以上代码实现创建两个线程,并且执行。

    3.多线程目标函数传递参数

    假如遇到目标函数需要传递参数,稍加修改,示例代码如下

    import threading
    import time
    
    
    def target_1(name):
        for i in range(5):
            print(name + " 线程函数 1")
            time.sleep(1)
    
    
    def target_2(name):
        for i in range(5):
            print(name + " 线程函数 2")
            time.sleep(1)
    
    
    def th1():
        th = threading.Thread(target=target_1, args=("Tom",))  # 新建一个线程
        th.start()
        return th
    
    
    def th2():
        th = threading.Thread(target=target_2, args=("Jerry",))
        th.start()
        return th
    
    
    def _main():
        print("主线程开始")
        threading_1 = th1()
        threading_2 = th2()
        print("主线程结束")
    
    
    if __name__ == '__main__':
        start_time = time.perf_counter()
        _main()
        end_time = time.perf_counter()
        total_time = end_time - start_time
        # print(f"耗时: {total_time}")
    
    

    代码输出如下:

    4. 等待线程结束 join()

    以上代码的输出来看,似乎遇到一个问题,那就是主线程已经结束了,创建执行的子线程还在运行,那就不能等所有线程都执行完了,程序再结束吗?这就需要用到 .join

    修改代码如下:

    import threading
    import time
    
    
    def target_1(name):
        for i in range(5):
            print(name + " 线程函数 1")
            time.sleep(1)
    
    
    def target_2(name):
        for i in range(5):
            print(name + " 线程函数 2")
            time.sleep(1)
    
    
    def th1():
        th = threading.Thread(target=target_1, args=("Tom",))  # 新建一个线程
        th.start()
        return th
    
    
    def th2():
        th = threading.Thread(target=target_2, args=("Jerry",))
        th.start()
        return th
    
    
    def _main():
        print("主线程开始")
        threading_1 = th1()
        threading_2 = th2()
        threading_1.join()
        threading_2.join()
        print("主线程结束")
    
    
    if __name__ == '__main__':
        start_time = time.perf_counter()
        _main()
        end_time = time.perf_counter()
        total_time = end_time - start_time
        # print(f"耗时: {total_time}")
    
    

    我们在主线程结束之间,加上线程对象调用 .join() 方法,程序就会在线程执行结束后才会执行下一句,输出如下:

    5. 线程同步 Lock()

    如果同时调用了多个线程资源,并且同时start(), 就会造成竞争关系,这时候就需要做线程同步使得线程在某个步骤可以依次进行。

    示例代码输出看起来是没问题的,但是其实两个线程之间已经构成了竞争,在输出打印的时候,他们在同一个时间输出,导致输出的格式看起来不是依次进行的。我们对打印操作做加锁和释放的操作,这样两个线程就不会在同一时刻在打印的时候有竞争关系,示例代码如下:

    import threading
    import time
    
    lock = threading.Lock()
    
    
    def target_1(name):
        for i in range(5):
            lock.acquire()  # 加锁
            print(name + " 线程函数 1")
            lock.release()  # 释放锁
            time.sleep(1)
    
    
    def target_2(name):
        for i in range(5):
            lock.acquire()  # 加锁
            print(name + " 线程函数 2")
            lock.release()  # 释放锁
            time.sleep(1)
    
    
    def th1():
        th = threading.Thread(target=target_1, args=("Tom",))  # 新建一个线程
        th.start()
        return th
    
    
    def th2():
        th = threading.Thread(target=target_2, args=("Jerry",))
        th.start()
        return th
    
    
    def _main():
        print("主线程开始")
        threading_1 = th1()
        threading_2 = th2()
        threading_1.join()
        threading_2.join()
        print("主线程结束")
    
    
    if __name__ == '__main__':
        start_time = time.perf_counter()
        _main()
        end_time = time.perf_counter()
        total_time = end_time - start_time
        # print(f"耗时: {total_time}")
    
    
    

    输入如下:

    注意: 如果两个线程同时调用进程里面的同一个资源,比如同一个变量,在线程里面处理这个资源时候也要加锁,避免多个线程在处理这个资源时候造成竞争,也就是说这个变量并没有被执行更新就同时被线程调用。

    6. 线程之间通信,队列queue的使用

    前面我们提到了线程对象的创建开启等待执行结束,但是没有提到怎么结束一个线程,那有结束一个线程的方法吗?答案是没有,线程在执行完成后才会自己结束,但是我们可以用传递参数的方式让线程自己退出。

    示例代码如下:

    import threading
    import time
    
    stop_target_1 = False
    
    
    def target_1():
        global stop_target_1
        while True:
            if stop_target_1:
                break
            time.sleep(1)
            print("正在运行子线程")
    
    
    def _main():
        global stop_target_1
        th = threading.Thread(target=target_1)
        th.start()  # 开启线程
        print("运行主线程")
        time.sleep(5)
        stop_target_1 = True  # 结束线程
        th.join()
        print("结束主线程")
    
    
    if __name__ == '__main__':
        _main()
    
    

    输出结果如下:

    我们还可以通过队列queue,控制线程的启动和结束,队列主要包含getput两个方法,put是往队列里放入参数,get是在队列里取参数。

    注意:假如队列里没有参数,get() 处于阻塞状态。

    通过队列参数的传递控制线程的启动和结束,示例代码如下:

    import threading
    import time
    import queue
    
    q = queue.Queue()
    
    
    def target_1(q):
        while True:
            item = q.get()
            if item is None:
                break
            time.sleep(1)
            print("正在运行子线程")
            q.task_done()
    
    
    def _main():
        th = threading.Thread(target=target_1, args=(q,))
        th.start()  # 开启线程
        print("运行主线程")
        for i in range(5):
            time.sleep(1)
            q.put(1)
        q.put(None)
        th.join()
        print("结束主线程")
    
    
    if __name__ == '__main__':
        _main()
    
    
    

    同理,两个子线程之间也可以通过队列传递参数

    import threading
    import queue
    import time
    
    
    def producer(q):
        for i in range(5):
            print(f"Producing {i}")
            q.put(i)
            time.sleep(1)
    
    
    def consumer(q):
        while True:
            item = q.get()
            if item is None:  # 结束信号
                break
            print(f"Consuming {item}")
            time.sleep(2)
            q.task_done()
    
    
    # 创建队列
    q = queue.Queue()
    
    # 创建生产者线程
    producer_thread = threading.Thread(target=producer, args=(q,))
    
    # 创建消费者线程
    consumer_thread = threading.Thread(target=consumer, args=(q,))
    
    # 启动线程
    producer_thread.start()
    consumer_thread.start()
    
    # 等待生产者完成
    producer_thread.join()
    
    # 发送结束信号
    q.put(None)
    
    # 等待消费者完成
    consumer_thread.join()
    
    

    7. 守护线程 (Daemon Thread)

    守护线程就一个特点,即当所有非守护线程结束时,守护线程会自动终止。从上面示例可以看出主线程退出后,子线程只有运行结束后整个程序才会退出,但是守护线程当主线程执行完成后会被强制结束。

    代码如下:

    import threading
    import time
    
    
    def background_task():
        while True:
            time.sleep(0.5)
            print("子线程正在执行")
    
    
    # 创建线程
    daemon_thread = threading.Thread(target=background_task)
    
    # 设置为守护线程
    daemon_thread.daemon = True
    
    # 启动线程
    daemon_thread.start()
    
    # 主线程继续执行
    print("主线程开始运行")
    time.sleep(2)  # 模拟主线程任务
    print("主线程结束运行")
    
    # 程序退出时,守护线程会自动终止
    
    

    执行结果如下:

    注意:当然,如果不想守护进程结束,同样可以通过 .jojn() 方法,等待线程结束程序才退出。

    8. 线程池 concurrent.futures 模块

    在 Python 中,concurrent.futures 模块提供了 ThreadPoolExecutor 类,用于创建和管理线程池。线程池可以帮助你并发地执行多个任务,而不需要手动创建和管理线程。

    加入我们有多个任务(5个)需要多线程运行,有了线程池,只要我们定义好多少个线程,然后把任务交给它就行,线程池会自动调度线程按最大线程数执行完毕,示例代码如下:

    import concurrent.futures
    import time
    
    # 定义一个简单的任务函数
    def task(n):
        print(f"Task {n} started")
        time.sleep(2)  # 模拟耗时操作
        print(f"Task {n} finished")
        return f"Task {n} result"
    
    # 使用 ThreadPoolExecutor 创建线程池
    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        # 提交任务到线程池
        futures = [executor.submit(task, i) for i in range(5)]
        
        # 获取任务的结果
        for future in concurrent.futures.as_completed(futures):
            try:
                result = future.result()
                print(f"Result: {result}")
            except Exception as e:
                print(f"Exception: {e}")
    

    代码中用with方法创建线程池,循环5次把任务提交给执行器,这时候线程就会自动启动了,同时也可以通过返回的迭代器对象futures获取线程执行结果。

    最大线程等于CPU核心数乘以5,在python中获取CPU核心数方法如下:


    总结

    例如:以上就是今天要讲的内容,本文主要介绍了Thread在Python中的主要用法。

    作者:大执

    物联沃分享整理
    物联沃-IOTWORD物联网 » 【Python】多线程Thread全方位使用指南

    发表回复