python 多进程apply_async、map_async、pool.imap的用法

1. apply_async

pool.apply_async 是 Python 中 multiprocessing 模块的一部分,用于异步地执行一个函数。当你使用 apply_async 方法时,它会立即返回一个 AsyncResult 对象,而不是等待函数执行完成。这允许你继续执行程序的其他部分,而不必等待函数执行完成。

apply_async适合用于各个进程之间及结果互不影响,比如大批量处理数据的场景,能显著提升效率。

from multiprocessing import Pool

def square(x):
    return x * x

if __name__ == '__main__':
    with Pool(4) as p:  # 创建一个有4个进程的进程池
        result = p.apply_async(square, (10,))  # 异步执行square函数
        print(result.get())  # 获取执行结果

在这个例子中,square 函数被异步地执行,并且我们可以通过调用 AsyncResult 对象的 get 方法来获取结果。get 方法会阻塞,直到结果可用。

apply_async 还可以接受一个 callback 参数,这是一个在任务完成时会被调用的函数

def my_callback(result):
    print("Result: ", result)

result = p.apply_async(square, (10,), callback=my_callback)

在这个例子中,当 square 函数执行完成后,my_callback 函数会被调用,并且执行结果会作为参数传递给 my_callback。

使用 apply_async 可以有效地利用多核处理器,提高程序的执行效率。

使用 apply_async 方法并行处理大量数据通常涉及以下几个步骤:

  • 1.定义工作函数:这个函数将对单个数据项进行处理。它应该能够接受一个参数,因为 apply_async 会将数据项作为单个参数传递给这个函数。

  • 2.创建进程池:使用 multiprocessing.Pool 创建一个进程池,你可以根据你的机器的CPU核心数来决定进程池的大小。

  • 3.使用 apply_async 提交任务:对于数据集中的每个数据项,使用 apply_async 将工作函数和数据项提交给进程池。这会异步地执行工作函数。

  • 4.收集结果:对于每个提交的任务,你可以使用返回的 AsyncResult 对象的 get 方法来获取结果,或者使用 map_async 方法来简化结果收集过程。

  • 5.关闭进程池:在所有任务提交后,使用 close 方法关闭进程池,这会阻止更多的任务提交。然后使用 join 方法等待所有进程完成。

  • 下面是一个处理大量数据的示例:

    from multiprocessing import Pool
    
    # 定义工作函数
    def process_data(data_item):
        # 这里是处理数据的逻辑
        result = data_item * 2  # 假设的处理逻辑
        return result
    
    if __name__ == '__main__':
        # 创建一个进程池
        with Pool(4) as pool:
            # 假设我们有大量数据需要处理
            data = [1, 2, 3, 4, 5, ...]  # 这里只是示例,实际数据可能来自文件或数据库
    
            # 使用 apply_async 提交任务
            results = [pool.apply_async(process_data, (item,)) for item in data]
    
            # 收集结果
            processed_data = [result.get() for result in results]
    
            # 打印处理后的数据
            print(processed_data)
    
        # 进程池会自动关闭
    

    在这个例子中,我们定义了一个 process_data 函数来处理单个数据项。我们创建了一个进程池,并为数据集中的每个数据项提交了一个任务。然后我们收集了所有任务的结果,并打印了处理后的数据。

    如果你的数据量非常大,你可能会考虑使用 pool.map_async 来简化代码,它会自动处理任务的提交和结果的收集:

    from multiprocessing import Pool
    
    def process_data(data_item):
        return data_item * 2
    
    if __name__ == '__main__':
        with Pool(4) as pool:
            data = [1, 2, 3, 4, 5, ...]  # 大量数据
            processed_data = pool.map_async(process_data, data).get()
    
        print(processed_data)
    

    在这个简化的例子中,map_async 接受工作函数和数据列表,返回一个 AsyncResult 对象,我们可以通过调用 get 方法来获取所有处理后的数据。这种方法更简洁,但在某些情况下可能不如单独使用 apply_async 灵活。

    2. map_async

  • 功能map_async 是 map 函数的异步版本`,它将一个函数应用于一个迭代器的每个元素。
  • 使用场景: 当你有一个迭代器(如列表或元组)并且需要对其中的每个元素应用同一个函数时,使用 map_async 是合适的。它适用于批量处理相似任务的场景。
  • 结果处理: map_async 返回一个 AsyncResult 对象,你可以通过调用 get() 方法来获取所有任务的结果,这些结果通常以列表的形式返回。
  • from multiprocessing import Pool
    
    def square(x):
        return x * x
    
    if __name__ == '__main__':
        with Pool(4) as pool:
            result = pool.map_async(square, range(5))
            print(result.get())  # 获取结果
    

    3 pool.imap

    pool.imap 是 Python multiprocessing.Pool 中的一种方法,用于在进程池中分布式地逐步处理输入数据序列,并逐项返回结果。它的行为类似于内置函数 map,但以迭代器的形式顺序返回结果,支持逐步消费。

    案例1

    from multiprocessing.pool import Pool, ThreadPool
    from threading import Thread
    
    results = ThreadPool(NUM_THREADS).imap(fn, iter_data)  # fn 是函数名, iter_data 可迭代数据
    for result in results:
        print(result)
    

    案例2
    https://github1s.com/ultralytics/yolov5/blob/master/utils/dataloaders.py

     def cache_labels(self, path=Path("./labels.cache"), prefix=""):
         """Caches dataset labels, verifies images, reads shapes, and tracks dataset integrity."""
         x = {}  # dict
         nm, nf, ne, nc, msgs = 0, 0, 0, 0, []  # number missing, found, empty, corrupt, messages
         desc = f"{prefix}Scanning {path.parent / path.stem}..."
         with Pool(NUM_THREADS) as pool:
             pbar = tqdm(
                 pool.imap(verify_image_label, zip(self.im_files, self.label_files, repeat(prefix))),
                 desc=desc,
                 total=len(self.im_files),
                 bar_format=TQDM_BAR_FORMAT,
             )
             for im_file, lb, shape, segments, nm_f, nf_f, ne_f, nc_f, msg in pbar:
                 nm += nm_f
                 nf += nf_f
                 ne += ne_f
                 nc += nc_f
                 if im_file:
                     x[im_file] = [lb, shape, segments]
                 if msg:
                     msgs.append(msg)
                 pbar.desc = f"{desc} {nf} images, {nm + ne} backgrounds, {nc} corrupt"
    
         pbar.close()
         if msgs:
             LOGGER.info("\n".join(msgs))
         if nf == 0:
             LOGGER.warning(f"{prefix}WARNING ⚠️ No labels found in {path}. {HELP_URL}")
         x["hash"] = get_hash(self.label_files + self.im_files)
         x["results"] = nf, nm, ne, nc, len(self.im_files)
         x["msgs"] = msgs  # warnings
         x["version"] = self.cache_version  # cache version
         try:
             np.save(path, x)  # save cache for next time
             path.with_suffix(".cache.npy").rename(path)  # remove .npy suffix
             LOGGER.info(f"{prefix}New cache created: {path}")
         except Exception as e:
             LOGGER.warning(f"{prefix}WARNING ⚠️ Cache directory {path.parent} is not writeable: {e}")  # not writeable
         return x
    
      pool.imap(verify_image_label, zip(self.im_files, self.label_files, repeat(prefix)))
    
  • 执行pool.imap 返回result作为一个迭代器,然后通过tqdm包装为进度条。
  • pool.imap为每个可迭代的数据执行verify_image_label
  • 优缺点

  • 优点:
  • 按需加载结果: pool.imap 不会一次性把所有任务结果加载到内存中,因此适合处理大规模数据。
  • 顺序输出: 结果按输入顺序返回,即使某些任务耗时较短,也会等待前面的任务完成后输出。
  • 缺点:
  • 等待顺序: 如果某些任务很耗时,后面的任务即使已经完成,也需要等待之前任务完成后才能返回。
  • 适用场景

  • 任务耗时分布均匀: 每个任务的执行时间差不多。
  • 结果需要按输入顺序返回: 顺序重要时使用。
  • 内存有限: 可以逐步处理大规模数据
  • 4. 对比

    4.1 apply_async 和 map_async对比

    apply_async map_async 都是Python multiprocessing模块中的函数,用于在进程池中异步地执行任务。它们的主要区别在于它们处理任务的方式和适用场景。

  • 灵活性:apply_async 更灵活因为它允许你为每个任务传递不同的参数。而 map_async 则将同一个函数应用于迭代器的每个元素。
  • 结果收集:使用 map_async 可以更方便地收集所有任务的结果,因为它们会作为一个列表返回。而使用 apply_async 时,你需要为每个任务单独处理结果。
  • 适用性:如果你的任务是独立的并且参数不同,使用 apply_async。如果你需要对一个数据集合中的每个元素执行相同的操作,使用 map_async 更合适。
  • 4.2 pool.imap 和 map_async对比

    pool.imap 和 pool.map_async 都是 Python 中 multiprocessing.Pool 提供的并行任务处理方法,但它们在执行流程、返回结果的方式和适用场景上有所不同。

    4.2.1 pool.imap

  • 以迭代器(generator)的形式逐步返回结果。

  • 按输入顺序分发任务,并且按输入顺序返回结果,每处理完一项任务就立即生成一个结果。

  • 优点:

  • 逐步返回结果,可以节省内存。
  • 按输入顺序生成结果,保证结果顺序一致性。
  • 缺点:

  • 因为需要按顺序返回结果,所以即使后面的任务先完成,也需要等待前面的任务
  • 示例代码

    from multiprocessing import Pool
    
    def square(x):
        return x ** 2
    
    if __name__ == "__main__":
        with Pool(4) as pool:
            results = pool.imap(square, range(10))  # 返回一个生成器
            for result in results:  # 按顺序逐步获取结果
                print(result)
    
    

    输出(按顺序返回结果):

    0
    1
    4
    9
    16
    ...
    81
    
    

    4.2 pool.map_async

  • 返回一个 AsyncResult 对象,可以通过 .get() 获取所有结果。

  • 所有任务会被分发到子进程池中,并行执行。任务完成后,所有结果一次性返回(与内置的 map 类似,但是异步执行)。

  • 优点:

  • 无需逐步消费结果,可选择等待所有任务完成后统一获取。

  • 可以在结果未完成前做其他事情(异步优势)。
    缺点:

  • 会占用更多内存,因为结果会在所有任务完成后一次性返回。

  • 示例代码

    from multiprocessing import Pool
    
    def square(x):
        return x ** 2
    
    if __name__ == "__main__":
        with Pool(4) as pool:
            async_result = pool.map_async(square, range(10))  # 异步提交任务
            print("Doing other tasks while waiting for results...")
    
            results = async_result.get()  # 获取所有结果(阻塞,等待完成)
            print(results)
    

    输出(一次性返回结果):

    Doing other tasks while waiting for results...
    [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
    

    对比总结

    5. 案例

  • 案例1
  •  process_num  = 10
     data_list =[]  # 可迭代对象
     with Pool(processes=process_num) as pool:
         r =  pool.map_async(process_, args_list)
         return_list = r.get()
    
  • 案例2
  • import multiprocessing
    from multiprocessing import Pool
    from tqdm import tqdm
    
    # 多线程处理的函数
    # 注意process函数需要有返回值,如果没有返回值可以  return None
    def process_(x):
        return x ** 2
    
    
    def run_with_multiprocessing():
        process_num = 10  # 设置进程数
        data_list = [4,6,7,8]  # 一个可迭代的数据
        # 使用进程池执行
        with Pool(processes=process_num) as pool:
            # tqdm 显示任务进度
            results = list(tqdm(pool.imap(process_, data_list), total=len(data_list )))
    
    
  • 案例3
  • from concurrent.futures import ThreadPoolExecutor, as_completed
    from tqdm import tqdm
    
    def run_with_multithreading():
        process_num = 10  # 设置线程数
    	
    	data_list = [4,6,7,8]
        results = []
        with ThreadPoolExecutor(max_workers=process_num) as executor:
            # 提交任务
            future_to_args = {
                executor.submit(process_,data ) for args in data_list 
            }
    
            # tqdm 显示任务进度
            for future in tqdm(as_completed(future_to_args), total=len(future_to_args)):
                try:
                    results.append(future.result())
                except Exception as e:
                    print(f"Task failed: {e}")
    
        return results
    

    注意多线程执行的函数,需要有返回值,如果没有返回值,直接Return None

    作者:@BangBang

    物联沃分享整理
    物联沃-IOTWORD物联网 » python 多进程apply_async、map_async、pool.imap的用法

    发表回复