Six Ways to Do Data Parallelism in Python (Process Pools with Progress Bars)

Contents

  • What is data parallelism
  • Input/output example
  • Method 1: Python's built-in parallel task interface, concurrent.futures
  • Method 2: Python's built-in multiprocessing interface
  • Method 3: On top of method 2, share a single communication pipe
  • Method 4: On top of method 3, stop sending results through the pipe
  • Method 5: Multiprocessing without batching, via multiprocessing.pool.imap
  • Method 6: On top of method 1, batch the data
  • Performance comparison
  • The reusable module: util_executer.py

    What is data parallelism

    Data parallelism and task parallelism are the two basic ways to organize parallel computation; you can think of them as two ways for a team to divide a job.

  • Example: an exam produced 200 answer sheets, each containing 4 question types, and 4 people are grading.
  • Data parallelism splits the sheet count: each person grades all 4 question types on 50 sheets. (Everyone's task is the same.)
  • Task parallelism splits the sheet content: each person grades 1 question type across all 200 sheets. (Everyone's task is different.)
  • In everyday code, when you apply one function to a collection of inputs one after another and it feels slow, data parallelism is the fix. A ready-to-use module sits at the end of this article, and the serial loop being parallelized is sketched right below.
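    All six methods below parallelize the same serial loop. As a baseline, here it is spelled out (the name serial_exec is mine, for illustration):

    def serial_exec(f, args_mat):
        # Apply f to every row of args_mat, one row at a time.
        return [f(*args) for args in args_mat]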

    Input/output example

    Input:

  • a function f:

    def Pow(a,n):    # e.g., compute a to the power n
        return a**n

  • an argument list args_mat:

    args_mat=[     # e.g., every power of 2 from 2**0 up to 2**100000 is needed
        [2,0],
        [2,1],
        [2,2],
        ...
        [2,100000]
    ]

    - It must be a 2-D list; one row is one set of arguments.
    - A 1-D list arr can be lifted to 2-D with `arr=[[x] for x in arr]`.

  • a pool size pool_size (default 5). Keep it below the CPU core count (os.cpu_count() returns it); see the sketch after this list.
  • a progress-bar description desc (default empty)

    Output:

  • a result list, lining up one-to-one with the argument list:

    [1,2,4,8,16,32,64,...]
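    A short sketch of two of the notes above. The "leave one core free" heuristic is my assumption, not a rule from the original:

    import os

    # os.cpu_count() can return None, hence the fallback.
    pool_size = max(1, (os.cpu_count() or 2) - 1)

    # Lifting a 1-D argument list into the required 2-D shape:
    arr = [0, 1, 2]
    args_mat = [[x] for x in arr]   # [[0], [1], [2]]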

    Method 1: Python's built-in parallel task interface, concurrent.futures

    import concurrent.futures
    from tqdm import tqdm
    
    def multi_process_exec_v0(f,args_mat,pool_size=5,desc=None):
        if len(args_mat)==0:return []
        results=[None for _ in range(len(args_mat))]
        with tqdm(total=len(args_mat), desc=desc) as pbar:
            with concurrent.futures.ProcessPoolExecutor(max_workers=pool_size) as executor:
                futures = {executor.submit(f,*args): i for i,args in enumerate(args_mat)}
                for future in concurrent.futures.as_completed(futures):
                    i=futures[future]
                    ret = future.result()
                    results[i]=ret
                    pbar.update(1)
        return results
    

    Here every input row is submitted to the pool as its own task, so every item pays its own scheduling and inter-process pickling cost (the pool's worker processes are reused, but per-task dispatch is not free). When each call is as cheap as Pow, that overhead dominates, and the run ends up slower than serial execution.
    Used this way, the interface suits "task parallelism" (a few expensive, heterogeneous jobs) better than fine-grained "data parallelism".
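    For completeness: since Python 3.5, ProcessPoolExecutor.map accepts a chunksize parameter that packs many small tasks into one inter-process round trip, which removes most of that per-task cost. A minimal sketch of that alternative (not the approach the rest of this article builds on; results come back in input order, so a tqdm wrapped around the iterator would advance in input order rather than completion order):

    import concurrent.futures

    def Pow(a, n):
        return a ** n

    if __name__ == '__main__':
        args_mat = [(2, i) for i in range(100000)]
        bases, exps = zip(*args_mat)   # map takes one iterable per parameter
        with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
            # chunksize=1000 is an illustrative value; tune it to the workload
            results = list(executor.map(Pow, bases, exps, chunksize=1000))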

    Method 2: Python's built-in multiprocessing interface

    from multiprocessing import Pool, Pipe
    from multiprocessing.connection import wait
    from tqdm import tqdm
    
    ToBatch = lambda arr,size:[arr[i*size:(i+1)*size] for i in range((size-1+len(arr))//size)]
    def batch_exec_v1(f,args_batch,w,offset=0):
        for i,args in enumerate(args_batch):
            ans = f(*args)
            w.send((i+offset,ans))
        w.send('exit')
        w.close()
    def multi_process_exec_v1(f,args_mat,pool_size=5,desc=None):
        if len(args_mat)==0:return []
        if type(args_mat[0]) not in [list,tuple]:
            args_mat=[[a]for a in args_mat]
        batch_size=max(1,int(len(args_mat)/4/pool_size))
        results=[None for _ in range(len(args_mat))]
        args_batches = ToBatch(args_mat,batch_size)
        readers=[]
        with tqdm(total=len(args_mat), desc=desc) as pbar:
            with Pool(processes=pool_size) as pool:
                for i,args_batch in enumerate(args_batches):
                    r,w=Pipe(duplex=False)
                    readers.append(r)
                    pool.apply_async(batch_exec_v1,(f,args_batch,w,i*batch_size))
                while readers:
                    for r in wait(readers):
                        try:
                            msg=r.recv()
                            if msg=='exit':
                                readers.remove(r)
                                continue
                            results[msg[0]]=msg[1]
                            pbar.update(1)
                        except EOFError:
                            readers.remove(r)
        return results
    

    This version splits the input into batches; each pool task processes one whole batch.
    The number of batches is pool_size x 4. More batches spread the work more evenly across the pool (better utilization when some batches finish early), but also mean more task-dispatch overhead and, in this implementation, more pipes to create and tear down.
    Since a task now handles a batch rather than a single item, the progress bar would naturally advance one batch at a time. To advance it item by item, the workers must talk to the main process: each worker sends an item's result through its pipe the moment it is computed, and the main process collects the results and ticks the bar.
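    A quick worked example of ToBatch and of the batch-size arithmetic, using the benchmark's numbers (100000 rows, pool_size=4):

    ToBatch = lambda arr,size:[arr[i*size:(i+1)*size] for i in range((size-1+len(arr))//size)]

    print(ToBatch([1, 2, 3, 4, 5], 2))   # [[1, 2], [3, 4], [5]]

    # 100000 rows, pool_size=4: batch_size = int(100000/4/4) = 6250,
    # so there are ceil(100000/6250) = 16 batches, i.e. pool_size x 4.
    print(max(1, int(100000/4/4)))       # 6250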

    Method 3: On top of method 2, share a single communication pipe

    from multiprocessing import Pool, Pipe
    from tqdm import tqdm
    ToBatch = lambda arr,size:[arr[i*size:(i+1)*size] for i in range((size-1+len(arr))//size)]
    
    def batch_exec_v2(f,args_batch,w=None,offset=0):
        for i,args in enumerate(args_batch):
            ans = f(*args)
            if w:w.send((i+offset,ans))
    
    def multi_process_exec_v2(f,args_mat,pool_size=5,desc=None):
        if len(args_mat)==0:return []
        batch_size=max(1,int(len(args_mat)/4/pool_size))
        results=[None for _ in range(len(args_mat))]
        args_batches = ToBatch(args_mat,batch_size)
        with tqdm(total=len(args_mat), desc=desc) as pbar:
            with Pool(processes=pool_size) as pool:
                r,w=Pipe(duplex=False)
                for i,args_batch in enumerate(args_batches):
                    pool.apply_async(batch_exec_v2,(f,args_batch,w,i*batch_size))
                cnt=0
                while cnt<len(args_mat):
                    try:
                        msg=r.recv()
                        results[msg[0]]=msg[1]
                        pbar.update(1)
                        cnt+=1
                    except EOFError:
                        break
        return results
    

    Method 2 creates a fresh pipe for every batch. Method 3 instead shares a single pipe among all workers, saving the cost of creating and tearing down all those pipes.
    This is the fastest of the first three methods, but it can throw:

      File "/usr/lib/python3.8/multiprocessing/connection.py", line 251, in recv
        return _ForkingPickler.loads(buf.getbuffer())
    _pickle.UnpicklingError: invalid load key, '\x00'.
    

    The cause: two worker processes write to the same pipe at the same time. Once a pickled message is large enough, the OS can interleave its bytes with another process's write, and the receiving end can no longer unpickle the stream.
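    If you want to keep a single shared channel, one safe option is a Manager queue: every put goes through the manager's server process, so messages are serialized and cannot interleave. Below is a minimal sketch of that variant (my code, not the article's; it assumes f never raises, since a dead worker would leave q.get() blocked, and the extra hop through the manager makes it slower than a raw Pipe):

    from multiprocessing import Pool, Manager
    from tqdm import tqdm

    ToBatch = lambda arr,size:[arr[i*size:(i+1)*size] for i in range((size-1+len(arr))//size)]

    def batch_exec_mq(f, args_batch, q, offset=0):
        for i, args in enumerate(args_batch):
            q.put((i + offset, f(*args)))   # puts are serialized by the manager

    def multi_process_exec_mq(f, args_mat, pool_size=5, desc=None):
        if len(args_mat) == 0: return []
        batch_size = max(1, int(len(args_mat)/4/pool_size))
        args_batches = ToBatch(args_mat, batch_size)
        results = [None]*len(args_mat)
        with Manager() as manager, Pool(processes=pool_size) as pool, \
             tqdm(total=len(args_mat), desc=desc) as pbar:
            q = manager.Queue()
            for i, batch in enumerate(args_batches):
                pool.apply_async(batch_exec_mq, (f, batch, q, i*batch_size))
            for _ in range(len(args_mat)):
                idx, ans = q.get()   # blocks until some worker finishes an item
                results[idx] = ans
                pbar.update(1)
        return results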

    Method 4: On top of method 3, stop sending results through the pipe

    from multiprocessing import Pool, Pipe
    from tqdm import tqdm
    ToBatch = lambda arr,size:[arr[i*size:(i+1)*size] for i in range((size-1+len(arr))//size)]
    
    def batch_exec_v3(f,args_batch,w=None):
        results=[]
        for i,args in enumerate(args_batch):
            ans = f(*args)
            results.append(ans)
            if w:w.send(1)
        return results
    
    def multi_process_exec_v3(f,args_mat,pool_size=5,desc=None):
        if len(args_mat)==0:return []
        batch_size=max(1,int(len(args_mat)/4/pool_size))
        results=[]
        args_batches = ToBatch(args_mat,batch_size)
        with tqdm(total=len(args_mat), desc=desc) as pbar:
            with Pool(processes=pool_size) as pool:
                r,w=Pipe(duplex=False)
                pool_rets=[]
                for i,args_batch in enumerate(args_batches):
                    pool_rets.append(pool.apply_async(batch_exec_v3,(f,args_batch,w)))
                cnt=0
                while cnt<len(args_mat):
                    try:
                        msg=r.recv()
                        pbar.update(1)
                        cnt+=1
                    except EOFError:
                        break
                for ret in pool_rets:
                    for r in ret.get():
                        results.append(r)
        return results
    
    

    Method 3 pushes every item's result through the pipe the moment it is computed; once a result gets long (say, 2 to the 2000th power and beyond), its bytes can mix with another process's write, which is unsafe. Method 4 changes this: after each item, the worker sends only a tiny signal (the number 1) to tick the progress bar, and returns the whole batch's results in one piece when the batch is done. Signals this small are safe to share one pipe: on POSIX systems, writes of at most PIPE_BUF bytes are atomic, so they cannot be corrupted the way long pickled results can.
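    Another way to avoid sending results through the pipe, sketched here as a variant (my code, not the article's): apply_async accepts a callback that runs in the parent process when a batch finishes, so the progress bar can be driven from there. The trade-off is that the bar then advances per batch, like method 6:

    from multiprocessing import Pool
    from tqdm import tqdm

    ToBatch = lambda arr,size:[arr[i*size:(i+1)*size] for i in range((size-1+len(arr))//size)]

    def run_batch(f, args_batch):   # helper for this sketch only
        return [f(*args) for args in args_batch]

    def multi_process_exec_cb(f, args_mat, pool_size=5, desc=None):
        if len(args_mat) == 0: return []
        batch_size = max(1, int(len(args_mat)/4/pool_size))
        args_batches = ToBatch(args_mat, batch_size)
        with tqdm(total=len(args_mat), desc=desc) as pbar:
            with Pool(processes=pool_size) as pool:
                # the callback fires in the parent's result-handler thread, once per batch
                rets = [pool.apply_async(run_batch, (f, batch),
                                         callback=lambda r: pbar.update(len(r)))
                        for batch in args_batches]
                results = []
                for ret in rets:
                    results.extend(ret.get())   # batches were submitted in order
        return results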

    Method 5: Multiprocessing without batching, via multiprocessing.pool.imap

    This was the first "multiprocessing + progress bar" recipe I learned.

    from multiprocessing import Pool
    from tqdm import tqdm
    
    def batch_exec_v4(args):
        return args[0](*args[1])
    
    def multi_process_exec_v4(f,args_mat,pool_size=5,desc=None):
        if len(args_mat)==0:return []
        results=[]
        with Pool(processes=pool_size) as pool:
            imap_it = pool.imap(batch_exec_v4, [(f,args) for args in args_mat])
            for ret in tqdm(imap_it,total=len(args_mat),desc=desc):
                results.append(ret)
        return results
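
    Method 5 pays one dispatch per item, which is why it lags in the benchmark below. pool.imap also takes a chunksize parameter (standard multiprocessing API): dispatch then happens in chunks, while results still arrive one at a time and in input order, so the per-item progress bar is preserved. A sketch reusing batch_exec_v4 from above, with chunksize=100 as a purely illustrative value:

    def multi_process_exec_v4b(f, args_mat, pool_size=5, desc=None):
        if len(args_mat) == 0: return []
        results = []
        with Pool(processes=pool_size) as pool:
            it = pool.imap(batch_exec_v4, [(f, args) for args in args_mat], chunksize=100)
            for ret in tqdm(it, total=len(args_mat), desc=desc):
                results.append(ret)
        return results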
    

    Method 6: On top of method 1, batch the data

    import concurrent.futures
    from tqdm import tqdm
    # defined with the earlier methods; repeated so this block is self-contained
    ToBatch = lambda arr,size:[arr[i*size:(i+1)*size] for i in range((size-1+len(arr))//size)]
    
    def batch_exec_v5(f,args_batch):
        results=[]
        for args in args_batch:
            ans = f(*args)
            results.append(ans)
        return results
    def multi_process_exec_v5(f,args_mat,pool_size=5,desc=None):
        if len(args_mat)==0:return []
        batch_size=max(1,int(len(args_mat)/4/pool_size))
        results=[None for _ in range(len(args_mat))]
        args_batches = ToBatch(args_mat,batch_size)
        with tqdm(total=len(args_mat), desc=desc) as pbar:
            with concurrent.futures.ProcessPoolExecutor(max_workers=pool_size) as executor:
                futures = {executor.submit(batch_exec_v5,*(f,args_batch)): i*batch_size for i,args_batch in enumerate(args_batches)}
                for future in concurrent.futures.as_completed(futures):
                    i=futures[future]
                    ret = future.result()
                    results[i:i+len(ret)]=ret
                    pbar.update(len(ret))
        return results
    

    Note that this method advances the progress bar one batch at a time rather than one item at a time.

    Performance comparison

    Test code:

    import concurrent.futures
    from tqdm import tqdm
    from multiprocessing import Pool, Pipe, freeze_support
    from multiprocessing.connection import wait
    
    ToBatch = lambda arr,size:[arr[i*size:(i+1)*size] for i in range((size-1+len(arr))//size)]
    
    def batch_exec_v1(f,args_batch,w,offset=0):
        for i,args in enumerate(args_batch):
            ans = f(*args)
            w.send((i+offset,ans))
        w.send('exit')
        w.close()
    def multi_process_exec_v1(f,args_mat,pool_size=5,desc=None):
        if len(args_mat)==0:return []
        if type(args_mat[0]) not in [list,tuple]:
            args_mat=[[a]for a in args_mat]
        batch_size=max(1,int(len(args_mat)/4/pool_size))
        results=[None for _ in range(len(args_mat))]
        args_batches = ToBatch(args_mat,batch_size)
        readers=[]
        with tqdm(total=len(args_mat), desc=desc) as pbar:
            with Pool(processes=pool_size) as pool:
                for i,args_batch in enumerate(args_batches):
                    r,w=Pipe(duplex=False)
                    readers.append(r)
                    pool.apply_async(batch_exec_v1,(f,args_batch,w,i*batch_size))
                while readers:
                    for r in wait(readers):
                        try:
                            msg=r.recv()
                            if msg=='exit':
                                readers.remove(r)
                                continue
                            results[msg[0]]=msg[1]
                            pbar.update(1)
                        except EOFError:
                            readers.remove(r)
        return results
    
    def batch_exec(f,args_batch,w=None,offset=0):
        for i,args in enumerate(args_batch):
            ans = f(*args)
            if w:w.send((i+offset,ans))
    
    def multi_process_exec(f,args_mat,pool_size=5,desc=None):
        if len(args_mat)==0:return []
        batch_size=max(1,int(len(args_mat)/4/pool_size))
        results=[None for _ in range(len(args_mat))]
        args_batches = ToBatch(args_mat,batch_size)
        with tqdm(total=len(args_mat), desc=desc) as pbar:
            with Pool(processes=pool_size) as pool:
                r,w=Pipe(duplex=False)
                for i,args_batch in enumerate(args_batches):
                    pool.apply_async(batch_exec,(f,args_batch,w,i*batch_size))
                cnt=0
                while cnt<len(args_mat):
                    try:
                        msg=r.recv()
                        results[msg[0]]=msg[1]
                        pbar.update(1)
                        cnt+=1
                    except EOFError:
                        break
        return results
    
    def batch_exec_v3(f,args_batch,w=None):
        results=[]
        for i,args in enumerate(args_batch):
            ans = f(*args)
            results.append(ans)
            if w:w.send(1)
        return results
    
    def multi_process_exec_v3(f,args_mat,pool_size=5,desc=None):
        if len(args_mat)==0:return []
        batch_size=max(1,int(len(args_mat)/4/pool_size))
        results=[]
        args_batches = ToBatch(args_mat,batch_size)
        with tqdm(total=len(args_mat), desc=desc) as pbar:
            with Pool(processes=pool_size) as pool:
                r,w=Pipe(duplex=False)
                pool_rets=[]
                for i,args_batch in enumerate(args_batches):
                    pool_rets.append(pool.apply_async(batch_exec_v3,(f,args_batch,w)))
                cnt=0
                while cnt<len(args_mat):
                    try:
                        msg=r.recv()
                        pbar.update(1)
                        cnt+=1
                    except EOFError:
                        break
                for ret in pool_rets:
                    for r in ret.get():
                        results.append(r)
        return results
    
    def batch_exec_v4(args):
        return args[0](*args[1])
    
    def multi_process_exec_v4(f,args_mat,pool_size=5,desc=None):
        if len(args_mat)==0:return []
        results=[]
        with Pool(processes=pool_size) as pool:
            imap_it = pool.imap(batch_exec_v4, [(f,args) for args in args_mat])
            for ret in tqdm(imap_it,total=len(args_mat),desc=desc):
                results.append(ret)
        return results
    
    def batch_exec_v5(f,args_batch):
        results=[]
        for args in args_batch:
            ans = f(*args)
            results.append(ans)
        return results
    def multi_process_exec_v5(f,args_mat,pool_size=5,desc=None):
        if len(args_mat)==0:return []
        batch_size=max(1,int(len(args_mat)/4/pool_size))
        results=[None for _ in range(len(args_mat))]
        args_batches = ToBatch(args_mat,batch_size)
        with tqdm(total=len(args_mat), desc=desc) as pbar:
            with concurrent.futures.ProcessPoolExecutor(max_workers=pool_size) as executor:
                futures = {executor.submit(batch_exec_v5,*(f,args_batch)): i*batch_size for i,args_batch in enumerate(args_batches)}
                for future in concurrent.futures.as_completed(futures):
                    i=futures[future]
                    ret = future.result()
                    results[i:i+len(ret)]=ret
                    pbar.update(len(ret))
        return results
    
    
    def multi_process_exec_v0(f,args_mat,pool_size=5,desc=None):
        if len(args_mat)==0:return []
        results=[None for _ in range(len(args_mat))]
        with tqdm(total=len(args_mat), desc=desc) as pbar:
            with concurrent.futures.ProcessPoolExecutor(max_workers=pool_size) as executor:
                futures = {executor.submit(f,*args): i for i,args in enumerate(args_mat)}
                for future in concurrent.futures.as_completed(futures):
                    i=futures[future]
                    ret = future.result()
                    results[i]=ret
                    pbar.update(1)
        return results
    
    
    def multi_thread_exec(f,args_mat,pool_size=5,desc=None):
        if len(args_mat)==0:return []
        results=[None for _ in range(len(args_mat))]
        with tqdm(total=len(args_mat), desc=desc) as pbar:
            with concurrent.futures.ThreadPoolExecutor(max_workers=pool_size) as executor:
                futures = {executor.submit(f,*args): i for i,args in enumerate(args_mat)}
                for future in concurrent.futures.as_completed(futures):
                    i=futures[future]
                    ret = future.result()
                    results[i]=ret
                    pbar.update(1)
        return results
    
    def Pow(a,n):
        return a**n
    
    if __name__=='__main__':
        freeze_support()  # needed only for frozen Windows executables; a no-op otherwise
        import time
        args_mat=[(2,i) for i in range(100000)]
        
        t0=time.time()
        results=[Pow(*a) for a in tqdm(args_mat,desc='serial')]
        t1=time.time()
        print(f"Serial: {t1-t0:.2f}s")
        
        t0=time.time()
        results=multi_thread_exec(Pow,args_mat,4,desc='multithread')
        t1=time.time()
        print(f"Multithreaded: {t1-t0:.2f}s")
        
        t0=time.time()
        results=multi_process_exec_v0(Pow,args_mat,4,desc='method 1')
        t1=time.time()
        print(f"Method 1: {t1-t0:.2f}s")
        
        t0=time.time()
        results=multi_process_exec_v1(Pow,args_mat,4,desc='method 2')
        t1=time.time()
        print(f"Method 2: {t1-t0:.2f}s")
        
        try:
            t0=time.time()
            results=multi_process_exec(Pow,args_mat,4,desc='method 3')
            t1=time.time()
            print(f"Method 3: {t1-t0:.2f}s")
        except Exception:
            print("Method 3: exception")
        
        t0=time.time()
        results=multi_process_exec_v3(Pow,args_mat,4,desc='method 4')
        t1=time.time()
        print(f"Method 4: {t1-t0:.2f}s")
    
        t0=time.time()
        results=multi_process_exec_v4(Pow,args_mat,4,desc='method 5')
        t1=time.time()
        print(f"Method 5: {t1-t0:.2f}s")
    
        t0=time.time()
        results=multi_process_exec_v5(Pow,args_mat,4,desc='method 6')
        t1=time.time()
        print(f"Method 6: {t1-t0:.2f}s")
    
    
    Method                    Windows laptop   Linux cloud server
    Serial                    00:13            00:09
    Multithreaded             00:16            00:16
    Multiprocess (method 1)   00:44            00:25
    Multiprocess (method 2)   00:13            00:06
    Multiprocess (method 3)   00:06            exception
    Multiprocess (method 4)   00:06            00:05
    Multiprocess (method 5)   00:22            00:13
    Multiprocess (method 6)   00:06            00:05

    As the table shows, the methods that consistently beat serial execution are methods 3, 4, and 6. But method 3 can throw because all workers share one Pipe for results, and method 6 advances its progress bar in batch-sized jumps, which looks jerky. The recommendation, then, is method 4.

    The reusable module: util_executer.py

    import concurrent.futures
    from tqdm import tqdm
    from multiprocessing import Pool, Pipe, freeze_support
    
    #=============================================================#
    # API                                                         #
    #-------------------------------------------------------------#
    #   multi_process_exec  run f over args_mat in processes      #
    #   multi_thread_exec   run f over args_mat in threads        #
    #-------------------------------------------------------------#
    # Parameters:                                                 #
    #   f         (function): the function to run in bulk         #
    #   args_mat  (list)    : the rows of arguments to run it on  #
    #   pool_size (int)     : process/thread pool size            #
    #   desc      (str)     : progress-bar description text       #
    #-------------------------------------------------------------#
    # Example:                                                    #
    # >>> def Pow(a,n):        <- a function (any number of args) #
    # ...     return a**n                                         #
    # >>>                                                         #
    # >>> args_mat=[[2,1],     <- computes Pow(2,1)               #
    # ...           [2,2],                Pow(2,2)                #
    # ...           [2,3],                Pow(2,3)                #
    # ...           [2,4],                Pow(2,4)                #
    # ...           [2,5],                Pow(2,5)                #
    # ...           [2,6]]                Pow(2,6)                #
    # >>>                                                         #
    # >>> results=multi_thread_exec(Pow,args_mat,desc='working')  #
    # working: 100%|█████████████| 6/6 [00:00<00:00, 20610.83it/s]#
    # >>>                                                         #
    # >>> print(results)                                          #
    # [2, 4, 8, 16, 32, 64]                                       #
    #-------------------------------------------------------------#
    
    ToBatch = lambda arr,size:[arr[i*size:(i+1)*size] for i in range((size-1+len(arr))//size)]
    
    def batch_exec(f,args_batch,w):
        results=[]
        for i,args in enumerate(args_batch):
            try:
                ans = f(*args)
                results.append(ans)
            except Exception:
                results.append(None)  # keep positions aligned: a failed call yields None
            w.send(1)  # one tick per finished item, to drive the progress bar
        return results
    
    def multi_process_exec(f,args_mat,pool_size=5,desc=None):
        if len(args_mat)==0:return []
        batch_size=max(1,int(len(args_mat)/4/pool_size))  # aim for pool_size*4 batches
        results=[]
        args_batches = ToBatch(args_mat,batch_size)
        with tqdm(total=len(args_mat), desc=desc) as pbar:
            with Pool(processes=pool_size) as pool:
                r,w=Pipe(duplex=False)
                pool_rets=[]
                for i,args_batch in enumerate(args_batches):
                    pool_rets.append(pool.apply_async(batch_exec,(f,args_batch,w)))
                cnt=0
                while cnt<len(args_mat):
                    try:
                        msg=r.recv()
                        pbar.update(1)
                        cnt+=1
                    except EOFError:
                        break
                for ret in pool_rets:
                    for r in ret.get():
                        results.append(r)
        return results
    
    def multi_thread_exec(f,args_mat,pool_size=5,desc=None):
        if len(args_mat)==0:return []
        results=[None for _ in range(len(args_mat))]
        with tqdm(total=len(args_mat), desc=desc) as pbar:
            with concurrent.futures.ThreadPoolExecutor(max_workers=pool_size) as executor:
                futures = {executor.submit(f,*args): i for i,args in enumerate(args_mat)}
                for future in concurrent.futures.as_completed(futures):
                    i=futures[future]
                    ret = future.result()
                    results[i]=ret
                    pbar.update(1)
        return results
    
    def Pow(a,n):
        return a**n
    
    if __name__=='__main__':
        freeze_support()  # needed only for frozen Windows executables; a no-op otherwise
        args_mat=[(2,i) for i in range(100)]
        results=multi_thread_exec(Pow,args_mat,4,desc='multithread')
        print(results)
        results=multi_process_exec(Pow,args_mat,4,desc='multiprocess')
        print(results)
        
    

  • This article first appeared on my blog: https://www.proup.club/index.php/archives/733/
    When reposting, please credit this page's URL and the original author: pro1515151515
