python并行计算—concurrent.futures模块的使用方法

python并行计算—concurrent.futures模块的使用方法

  • 1. 选择多线程ThreadPoolExecutor和多进程ProcessPoolExecutor的经验法则
  • 2.一个重CPU计算的例子,对比顺序执行、多线程、多进程的耗时情况
  • 3. 为什么进程池ProcessPoolExecutor只有可序列化的对象可以执行并返回
  • 3.1 什么是可序列化的对象?
  • 3.2 为什么这会带来限制?
  • 3.3 怎么解决解决不可序列化对象的限制
  • 4. submit与map方法
  • map和submit处理多参函数
  • concurrent.futures提供了两种Executor的子类分别为: ThreadPoolExecutor和ProcessPoolExecutor。前者创建一个可以提交作业的线程池,后者创建一个进程池。

    1. 选择多线程ThreadPoolExecutor和多进程ProcessPoolExecutor的经验法则

  • 执行重 I/O 操作的任务 (IO 密集型) 选择 ThreadPoolExecutor,例如请求网页数据,文件读写等涉及网络、磁盘 I/O 相关的内容。
    需要注意的是,线程池会受到python的全局解释器(GIL)的影响,如果使用多线程处理重CPU的任务,耗时很大可能性还不如顺序执行。但在 I/O 密集型任务中,线程大部分时间都在等待 I/O 操作完成(如网络下载、磁盘读写),而不是执行 CPU 计算。Python 的 GIL 在线程等待 I/O 时会释放锁,使得其他线程可以继续运行。这意味着在 I/O 密集型任务中,GIL 的影响并不明显,多线程仍然可以显著提高性能
  • 执行重 CPU 的任务 (CPU 密集型) 选择 ProcessPoolExecutor,例如大量消耗 CPU 的数学与逻辑运算、视频编解码等内容。
    ProcessPoolExecutor可以避开 GIL 的问题,但是由于需要传递参数给工作进程,所以正常情况下只有可序列化的对象可以执行并返回(具体解释见小标3)
  • 2.一个重CPU计算的例子,对比顺序执行、多线程、多进程的耗时情况

    import concurrent.futures
    import time
    
    number_list = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    
    def evaluate_item(x):
        result_item = count(x)
        # return result_item
    
    def count(number) :
        for i in range(0, 10000):
            i=i+1
        # return i * number
    
    if __name__ == "__main__":
        # 顺序执行
        startTime = time.time()
        for item in number_list:
                # print(evaluate_item(item))
                evaluate_item(item)
        print("Sequential execution use {} s".format(time.time() - startTime))
    
        # 线程池执行
        startTime1 = time.time()
        with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
                futures = [executor.submit(evaluate_item, item) for item in number_list]
    
                for future in concurrent.futures.as_completed(futures):
                        # print(future.result())
                        future.result()
    
        print ("Thread pool execution use {} s".format(time.time() - startTime1))
    
        # 进程池
        startTime2 = time.time()
        with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:
                futures = [executor.submit(evaluate_item, item) for item in number_list]
    
                for future in concurrent.futures.as_completed(futures):
                        # print(future.result())
                        future.result()
    
        print ("Process pool execution {} s".format(time.time() - startTime2))
    
    Sequential execution use 0.004363298416137695 s
    Thread pool execution use 0.01605534553527832 s
    Process pool execution 0.4858131408691406 s
    

    从计算结果来看,对于这种重cpu的计算,线程池方式不如进程池,甚至于不如进程池

    3. 为什么进程池ProcessPoolExecutor只有可序列化的对象可以执行并返回

    3.1 什么是可序列化的对象?

    在 Python 中,可序列化的对象意味着该对象可以被“序列化”(即转换成字节流),然后在不同进程之间传递。Python 使用 pickle 模块来实现这种序列化,因此所有传递给 ProcessPoolExecutor 的参数和返回值都必须是 pickle 支持的类型(例如基本数据类型、列表、字典、元组等)。

    3.2 为什么这会带来限制?

    Python 的大多数基础数据类型都是可序列化的,但某些复杂类型(例如打开的文件句柄、网络连接、锁对象等)是不可序列化的。如果你尝试将这些不可序列化的对象传递给 ProcessPoolExecutor 的子进程,会导致 pickle 抛出错误。

  • 示例1
    以下是一个使用 ProcessPoolExecutor 的示例:
  • 正常情况:传递可序列化对象
    假设你有一个简单的 CPU 密集型任务(如计算平方),可以将整数列表作为输入传递给子进程,因为整数是可序列化的。
  • from concurrent.futures import ProcessPoolExecutor
    
    def square(x):
        return x * x
    
    if __name__ == "__main__":
        with ProcessPoolExecutor() as executor:
            numbers = [1, 2, 3, 4, 5]
            results = list(executor.map(square, numbers))
        print(results)  # 输出: [1, 4, 9, 16, 25]
    
    

    在这个示例中:
    square 函数被传递给子进程执行,numbers 列表的每个元素 x 被传递给 square。
    由于整数是可序列化的对象,ProcessPoolExecutor 能够正常处理并返回结果。

  • 示例2
  • 不可序列化对象的情况
    如果尝试在多进程中传递一个不可序列化的对象(例如打开的文件句柄),则会导致错误:
  • from concurrent.futures import ProcessPoolExecutor
    
    def read_file(file):
        return file.read()
    
    if __name__ == "__main__":
        with open("example.txt", "r") as file:
            with ProcessPoolExecutor() as executor:
                # 尝试传递文件句柄
                results = list(executor.map(read_file, [file]))  # 将会抛出序列化错误
    

    在这个示例中:文件句柄 file 被传递给 read_file 函数,但由于文件句柄不可序列化,ProcessPoolExecutor 会抛出错误,无法传递该对象给子进程。

    3.3 怎么解决解决不可序列化对象的限制

    如果遇到不可序列化对象,通常可以通过以下方式解决:

    1. 在子进程中重新创建资源:在子进程中打开文件或建立数据库连接,而不是在主进程中传递这些资源。
    def read_file(filename):
        with open(filename, "r") as file:
            return file.read()
    
    if __name__ == "__main__":
        with ProcessPoolExecutor() as executor:
            results = list(executor.map(read_file, ["example.txt"]))
    
    1. 只传递路径或标识:传递文件路径等可序列化的标识,而不是直接传递不可序列化的对象(如文件句柄)

    4. submit与map方法

    ProcessPoolExecutor和ThreadPoolExecutor类中最重要的 2 个方法如下:

    1. submit提交任务,并返回 Future 对象代表可调用对象的执行。
    2. map和 Python 自带的 map 函数功能类似,只不过是以异步的方式把函数依次作用在列表的每个元素上。
      如果一次性提交一批任务可以使用map,如果单个任务提交用submit

    可以简单参考如下脚本:
    其中process_single_line是个单参数函数,另外注意这种多线程对进程的方式,并不仅仅面对单参函数,多参数函数也一样使用,就在submit或者map后的参数新增就行。见小标5

            # 多线程ThreadPoolExecutor模式
            with ThreadPoolExecutor(max_workers = args.num_worker) as executor:
    
                # # submit提交
                # results = [executor.submit(self.process_single_line, line) for line in lines]
                # for future in tqdm(concurrent.futures.as_completed(results), total=len(lines)):
                #     relative, absoluted = future.result()
                #     self.outputJsonListRelative.append(relative)
                #     self.outputJsonListAbsoluted.append(absoluted)   
    
                # map提交
                results = list(tqdm(executor.map(self.process_single_line, lines), total=len(lines)))
                for relative, absoluted in results:
                    self.outputJsonListRelative.append(relative)
                    self.outputJsonListAbsoluted.append(absoluted)
    
            # # 多线程ThreadPoolExecutor模式
            # with ProcessPoolExecutor(max_workers = args.num_worker) as executor:
    
            #     # submit提交
            #     results = [executor.submit(self.process_single_line, line) for line in lines]
            #     for future in tqdm(concurrent.futures.as_completed(results), total=len(lines)):
            #         relative, absoluted = future.result()
            #         self.outputJsonListRelative.append(relative)
            #         self.outputJsonListAbsoluted.append(absoluted)   
    
            #     # # map提交
            #     # results = list(tqdm(executor.map(self.process_single_line, lines), total=len(lines)))
            #     # for relative, absoluted in results:
            #         # self.outputJsonListRelative.append(relative)
            #         # self.outputJsonListAbsoluted.append(absoluted)
    

    map和submit处理多参函数

  • submit
  • from concurrent.futures import ThreadPoolExecutor
    
    # 定义 process 函数
    def process(a, b):
        return a + b
    
    A = [1, 2, 3, 4, 5]
    B = [11, 12, 13, 14, 15]
    
    # 使用 submit 提交任务
    with ThreadPoolExecutor() as executor:
        futures = [executor.submit(process, a, b) for a, b in zip(A, B)]
        
        # 获取结果
        results = [future.result() for future in futures]
    
    print(results)  # 输出: [12, 14, 16, 18, 20]
    
    
  • map
  • from concurrent.futures import ThreadPoolExecutor
    
    # 定义 process 函数
    def process(a, b):
        return a + b
    
    A = [1, 2, 3, 4, 5]
    B = [11, 12, 13, 14, 15]
    
    # 使用 map 提交任务
    with ThreadPoolExecutor() as executor:
        results = list(executor.map(process, A, B))
    
    print(results)  # 输出: [12, 14, 16, 18, 20]
    
    

    作者:星光技术人

    物联沃分享整理
    物联沃-IOTWORD物联网 » python并行计算—concurrent.futures模块的使用方法

    发表回复