Python 并发编程

本文为 https://www.bilibili.com/video/BV1bK411A7tV?p=1 课程笔记。

Python 并发模块相关官方文档:并发执行目录

概览

并发与并行

并发(concurrency):指在同一时刻只能有一条指令执行,但多个进程指令被快速的轮换执行,使得在宏观上具有多个进程同时执行的效果,但在微观上并不是同时执行的,只是把时间分成若干段,使多个进程快速交替的执行。

并行(parallel):指在同一时刻,有多条指令在多个处理器上同时执行。所以无论从微观还是从宏观来看,二者都是一起执行的。

程序提速的方法

  • 单线程串行:未利用任何并发技巧的编程方式,CPU需要等待IO,效率较低。(不加改造的naive的程序)
  • 多线程并发:在某个线程进行IO时,CPU可以调度另一个线程进行计算,提高了CPU利用率。实际上仍然是一个CPU核心进行分时调度。(threading)
  • 多CPU并行:需要多核CPU,可以并行地处理多个线程,充分利用现代CPU的多核性能。(multiprocessing)
  • 多机器并行:分布式计算,多机器并行。(hadoop/hive/spark)
  • Python 对并发编程的支持

  • 多线程:threading,使得某个线程的IO操作和另一个线程的CPU计算可以同时进行,避免CPU等待IO,提高CPU利用率。
  • 多进程:multiprocessing,利用现代CPU多核的特点,真正地并行执行任务。
  • 异步IO:asyncio,在单线程利用CPU和IO同时执行的原理,实现函数的异步执行。
  • 使用Lock对临界区资源进行加锁,避免数据竞争。
  • 使用Queue实现不同线程/进程之间的数据通信,实现生产者/消费者模式。
  • 使用线程池Pool/进程池Pool,简化线程/进程的任务提交、等待结束、获取结果。
  • 使用subprocess启动外部程序的进程,并进行输入输出的交互。
  • 怎样选择多线程多进程多协程

    Python并发编程有以下三种方式:多线程 Thread,多进程 Process,多协程 Coroutine。

    本小节将介绍如何根据具体任务选择最适合的编程方式。

    CPU密集型计算与IO密集型计算

    CPU密集型(CPU-bound)

    CPU密集型也叫做计算密集型,是指IO在很短的时间内就可以完成,CPU需要大量的计算和处理,CPU占用率相当高。

    例如:压缩解压缩、加密解密、正则表达式搜索等

    IO密集型(IO-bound)

    IO密集型指的是程序运行的大部分状况是CPU在等待IO(磁盘、内存、网络等)的读写完成,CPU的占用率比较低。

    例如:文件处理程序、网络爬虫程序、读写数据库程序等

    多线程、多进程、多协程的对比

    包含关系:一个进程中可以启动多个线程,一个线程中可以启动多个协程。

  • 多进程

    优点:可以利用多核CPU进行并行计算,在 CPython 中,只有多进程可以

    缺点:占用资源最多,可以启动数目最少

    使用场景:CPU密集型计算

  • 多线程

    优点:相比于进程更轻量级,占用资源更少

    缺点:

  • 相较于进程:受限于全局解释器锁GIL,在CPython中,多线程只能使用一个CPU,即只能并发执行,无法充分利用多核处理器。但是Python中的多线程技术也能用于IO密集型计算,这是利用CPU和IO可以同时执行的特点。
  • 相较于协程:启动数目有限制,占用内存资源,有线程切换的开销
  • 适用场景:IO密集型计算、同时运行的任务数目不是特别多

  • 多协程

    优点:内存开销最小,可启动数目最多(可多达几万个)

    缺点:支持的库有限制(如requests库不支持,需用aiohttp)、代码实现复杂

    适用于:IO密集型计算、需要启动超多任务、有现成的库支持

  • 怎样根据任务选择对应的技术

    CPython中的全局解释器锁GIL

    Python速度慢的两大原因

    1. Python是动态类型语言,边解释,边执行
    2. CPython中的全局解释器锁GIL,使得无法利用多核CPU并行执行

    GIL是什么、为何会有GIL

    全局解释器锁(Global Interpreter Lock,简称 GIL)是 CPython 解释器用于同步线程的一种机制,它使得任何时刻仅有一个线程在执行。即使是在多核心处理器上,使用 GIL 锁的 Python 解释器也只允许同一时时间运行一个进程。

    为何引入GIL?

    既然 GIL 会在 Python 并发编程中给我们带来的困难,那当时 CPython 解释器是为什么会引入 GIL 呢?实际上,在当时,绝大部分处理器都是单核心的,考虑不到并行的问题,而为了解决多线程之间数据完整性和状态同步的问题,引入了 GIL。

    总之,就是当时为了简单直接地解决 Python 对共享资源的管理问题,引入了 GIL,想在想去除也去不掉了。

    (至少短期内是去不掉了,FB有在做相关的工作)

    怎样规避其带来的限制

    那么,我们该怎样来规避 GIL 在 Python 并发编程中带来的问题呢?

    1. 在IO密集型计算中,多线程 threading 机制依然是有作用的,因为在 IO 期间,线程会释放 GIL,其他线程可以获取 GIL 并进行 CPU 计算,从而利用 IO 操作与 CPU 计算的同时执行来大幅提升速度。

      但是在用于 CPU 密集型计算时,多线程技术反而会降低程序的效率。

    2. 在面对 CPU 密集型计算时,为了充分利用现代 CPU 的多核能力,我们可以利用 multiprocessing 的多进程技术。

    代码示例:多线程爬虫 vs. 单线程爬虫

    Python创建多线程的方法

    Python创建多线程可分为四步:

    1. 准备一个函数

      def my_func(a, b):
      	do_craw(a, b)
      
    2. 创建一个线程

      import threading
      t = threading.Thread(target=my_func, args=(100, 200))
      
    3. 启动线程

      t.start()
      
    4. 等待线程结束(如果你在乎它什么时候结束)

      t.join()
      

    多线程爬虫代码示例及效率对比

    以下代码爬取博客园的 50 个页面,single_threadmulti_thread 两个函数分别使用单线程和多线程实现,我们运行来看一下它们的效率对比:

    import requests
    import time
    import threading
    
    base_url = 'https://www.cnblogs.com'
    urls = [base_url+f'/#p{i}' for i in range(1, 51)]
    
    def craw(url):
        response = requests.get(url)
        print(url, len(response.text))
    
    def single_thread():
        for url in urls:
            craw(url)
    
    def multi_thread():
        threads = []
        for url in urls:
            threads.append( 
                threading.Thread(target=craw, args=(url, ))
            )
        for thread in threads:
            thread.start()
        
        for thread in threads:
            thread.join()
    
    if __name__ == '__main__':
        start = time.time()
        single_thread()
        end = time.time()
        print("Single Thread Crawling Cost", end-start, "seconds.")
    
        start = time.time()
        multi_thread()
        end = time.time()
        print("Multi Thread Crawling Cost", end-start, "seconds.")
    

    输出:

    https://www.cnblogs.com/#p1 69631
    https://www.cnblogs.com/#p2 69631
    ...
    https://www.cnblogs.com/#p49 69631
    https://www.cnblogs.com/#p50 69631
    Single Thread Crawling Cost 3.4501893520355225 seconds.
    https://www.cnblogs.com/#p2 69631
    https://www.cnblogs.com/#p3 69631
    ...
    https://www.cnblogs.com/#p50 69631
    https://www.cnblogs.com/#p48 69631
    https://www.cnblogs.com/#p31 69631
    Multi Thread Crawling Cost 0.2533271312713623 seconds.
    

    我们注意到:

    1. 多线程比单线程快了约 13 倍
    2. 各个多线程任务的执行和返回是无序的

    Python实现生产者消费者爬虫——queue线程通信

    生产者消费者的爬虫模型

    多线程数据通信:queue.Queue

    import queue
    q = queue.Queue() 	# 创建Queue
    q.put(item)			# 添加元素
    item = q.get()		# 获取元素
    q.qsize() 		# 查看元素个数
    q.empty()			# 判断是否为空
    q.full()			# 判断是否已满
    

    代码示例

    以下生产者 craw 爬取页面并将返回的 html 放到 html_queue,消费者 parsehtml_queue 中取得 html ,解析,并将结果存入到文件中。

    import requests
    import time
    import threading
    import queue
    from bs4 import BeautifulSoup
    import random
    
    base_url = 'https://www.cnblogs.com'
    urls = [base_url+f'/#p{i}' for i in range(1, 51)]
    
    def craw(url):          # 消费者
        response = requests.get(url)
        return  response.text
    
    def parse(html):        #  生产者
        soup = BeautifulSoup(html, 'html.parser')
        links = soup.find_all('a', class_='post-item-title')
        return [(link['href'], link.get_text()) for link in links]
    
    def do_craw(url_queue: queue.Queue, html_queue: queue.Queue):
        while True:
            url = url_queue.get()
            html = craw(url)
            html_queue.put(html)
            print(threading.current_thread().name, f"craw {url}",
                    "url_queue.size = ", url_queue.qsize())
            time.sleep(random.randint(1, 2))
    
    def do_parse(html_queue: queue.Queue, fout):
        while True:
            html = html_queue.get()
            results = parse(html)
            for result in results:
                fout.write(str(result) + '\n')
            print(threading.current_thread().name, f"results.size {len(results)}",
                    "html_queue.size = ", html_queue.qsize())
            time.sleep(random.randint(1, 2))
      
    
    
    if __name__ == '__main__':
        url_queue = queue.Queue()
        html_queue = queue.Queue()
        for url in urls:
            url_queue.put(url)
    
        for idx in range(3):
            t = threading.Thread(
                target=do_craw, args=(url_queue, html_queue), name=f"craw {idx}"
            )
            t.start()
        fout = open('result.txt', 'w')
        for idx in range(2):
            t = threading.Thread(
                target=do_parse, args=(html_queue, fout), name=f"parse {idx}"
            )
            t.start()
    
    

    输出 :

    craw 1 craw https://www.cnblogs.com/#p2 url_queue.size =  47
    parse 0 results.size 20 html_queue.size =  0
    craw 2 craw https://www.cnblogs.com/#p3 url_queue.size =  47
    parse 1 results.size 20 html_queue.size =  0
    ...
    parse 0 results.size 20 html_queue.size =  5
    parse 1 results.size 20 html_queue.size =  4
    parse 1 results.size 20 html_queue.size =  3
    parse 0 results.size 20 html_queue.size =  2
    parse 1 results.size 20 html_queue.size =  1
    parse 0 results.size 20 html_queue.size =  0
    

    可以看到,由于生产者有3个线程,消费者有2个线程,即页面的爬取比解析的要快,故 html_queue 中的元素个数有升有降,但总体是在增加。最终生产者已经将 url_queue 中的 url 全部爬取完毕,随后消费者才将 html_queue 中的 html 全部解析。

    Python并发编程中的线程安全问题

    线程安全概念的介绍

    线程安全指的是某个函数、函数库在多线程环境中被调用时,能够正确地处理多个线程间的的共享变量,是程序功能正确完成。

    由于线程的执行随时会发生切换,就会造成不可预料的后果,出现线程的不安全。

    注意线程并发不安全的问题只有在线程切换时机恰好导致了数据竞争时才会出现,也就是说有时会出现,有时又不会出现。

    Lock用于保证线程安全

    用法1:try-finally 模式

    import threading
    lock = threading.Lock()
    lock.acquire()
    try: 
      # do something
    finally:
    	lock,release()
    

    用法2:with模式

    import threading
    lock = threading.Lock()
    with lock:
      # do something
    

    代码示例

    我们先看不加锁的版本:

    import threading
    import time
    
    class Account:
        def __init__(self, balance):
            self.balance = balance
        
    def draw(account, number):
        if account.balance >= number:
            print(threading.current_thread().name, '取钱成功')
            # time.sleep(0.1)
            account.balance -= number
            print(threading.current_thread().name, '余额: ', account.balance)
        else:
            print(threading.current_thread().name, '余额不足')
    
    if __name__ == '__main__':
        acc = Account(1000)
        ta = threading.Thread(name='ta', target=draw, args=(acc, 800))
        tb = threading.Thread(name='tb', target=draw, args=(acc, 800))
    
        ta.start()
        tb.start()
    

    这是不加锁的版本,有可能会出现数据竞争导致出错:

    ta 取钱成功
    tb 取钱成功
    tb 余额:  200
    ta 余额:  -600
    

    但实际上,笔者在实测的时候很难出错,试了很多次都是“正常的”:

    ta 取钱成功
    ta 余额:  200
    tb 余额不足
    

    当然,前面也提到了,这种 ”正常“ 其实是一种假象,只是刚好线程的调度没有导致数据竞争的问题而已。为了复现出错误,可以像笔者一样,在 if 判断之后,加上0.1s的暂停,它会导致线程的阻塞,从而导致线程的切换,而如果在这里发生了线程的切换的话,是一定会出现数据竞争的问题的。

    加锁保护临界区:

    import threading
    import time
    
    lock = Lock = threading.Lock()
    
    class Account:
        def __init__(self, balance):
            self.balance = balance
     
    def draw(account, number):
      with lock:			# 加锁保护
        if account.balance >= number:
            print(threading.current_thread().name, '取钱成功')
            # time.sleep(0.1)
            account.balance -= number
            print(threading.current_thread().name, '余额: ', account.balance)
        else:
            print(threading.current_thread().name, '余额不足')
    
    if __name__ == '__main__':
        acc = Account(1000)
        ta = threading.Thread(name='ta', target=draw, args=(acc, 800))
        tb = threading.Thread(name='tb', target=draw, args=(acc, 800))
    
        ta.start()
        tb.start()
    

    加速保护操作临界区变量的代码段之后,不会再出现问题。

    线程池ThreadPoolExecutor

    线程池的原理

    新建线程需要系统分配资源,终止线程需要系统回收资源,如果可以重用线程,则可以省去新建/终止的开销。

    使用线程池的好处

  • 提升性能:省去大量新建、终止线程的开销,重用了线程资源
  • 防御功能:能够有效避免因为创建线程过多,而导致系统符合过大而响应变慢的问题
  • 代码优势:使用线程池的语法相比于自己新建线程执行线程更加简洁
  • 使用场景:适合处理突发性大量请求或需要大量线程完成任务,但实际任务处理时间较短

    ThreadPoolExecutor的语法

    用法1:map

    from concurrent.futures import ThreadPoolExecutor
    
    with TheadPoolExecutor() as pool:
      results = pool.map(craw, urls)
      for result in results:
        print(result)
    

    map的结果和入参的顺序是对应的

    用法2:future

    from concurrent.futures import ThreadPoolExecutor, as_completed
    
    with ThreadPoolExecutor() as pool:
      futures = [pool.submit(craw, url) for url in urls]
     	
      for future in futures:
        print(future.result())
       for future in as_completed(futures):
        print(future.result())
    

    future模式,更强大,注意 as_completed 包裹执行的话会先完成,先返回,而非按照入参的顺序,是否要用 as_completed 需要看实际情况。

    代码示例:使用线程池改造爬虫程序

    我们接下来使用线程池的方法实现上面的爬虫的例子:

    import concurrent.futures
    import requests
    from bs4 import BeautifulSoup
    
    base_url = 'https://www.cnblogs.com'
    urls = [base_url+f'/#p{i}' for i in range(1, 51)]
    
    def craw(url):          # 消费者
        response = requests.get(url)
        return  response.text
    
    def parse(html):        #  生产者
        soup = BeautifulSoup(html, 'html.parser')
        links = soup.find_all('a', class_='post-item-title')
        return [(link['href'], link.get_text()) for link in links]
    
    # craw
    with concurrent.futures.ThreadPoolExecutor() as pool:
        htmls = pool.map(craw, urls)
        htmls = list(zip(urls, htmls))
        for url, html in htmls:
            print(url, len(html))
    print("craw over")
    
    # parse
    with concurrent.futures.ThreadPoolExecutor() as pool:
        futures = {}
        for url, html in htmls:
            future = pool.submit(parse, html)
            futures[future] = url
    
        # for futrue, url in futures.items():			# 按入参顺序返回
        #     print(url, future.result())
    
        for future in concurrent.futures.as_completed(futures):		# 无序,先结束先返回
            url = futures[future]
            print(url, future.result())
    print("parse over")
    

    在web服务中使用线程池加速

    web服务的架构及特点

    特点

    1. web服务对响应时间的要求很高,如200ms内返回
    2. web服务中有大量的依赖IO操作的调用,比如磁盘文件、数据库、远程API
    3. web服务经常需要处理几万人,几百万人的同时请求

    使用线程池ThreadPoolExecutor加速

    使用线程池的好处:

    1. 方便地将磁盘文件,数据库。远程API和IO调用并行执行
    2. 线程池的线程数目不会无限创建从而导致系统挂掉,具有防御功能

    使用Flask实现web服务并实现加速

    无加速版本:

    import flask
    import json
    import time
    
    app = flask.Flask(__name__)
    
    def read_file():
        time.sleep(0.1)
        return 'file result'
        
    def read_db():
        time.sleep(0.2)
        return 'db result'
    
    def read_api():
        time.sleep(0.3)
        return 'api result'
    
    @app.route('/')
    def index():
        result_file = read_file()
        result_db = read_db()
        result_api = read_api()
    
        return json.dumps({
                'result_file': result_file,
                'result_db': result_db,
                'result_api': result_api
            })
    if __name__ == '__main__':
        app.run()
    

    上面是无多线程加速的版本,我们手动算一下,响应时间应该是 0.1+0.2+0.3+…=0.6s +,即应该是0.6s多,我们运行起flask服务,并测试响应时间:

    time curl http://127.0.0.1:5000/
    

    输出:

    {"result_file": "file result", "result_db": "db result", "result_api": "api result"}
    real	0m0.612s
    user	0m0.004s
    sys	0m0.004s
    

    与我们的预计相符。

    由于三个动作都是IO操作,即是一个IO密集型的操作,我们可以用Python并发中的多线程技术来加速,接下来我们使用线程池来加速这一过程,只需对上面的程序作如下改动:

    from concurrent.futures import ThreadPoolExecutor		# 导包
    pool = ThreadPoolExecutor()		# 初始化一个全局可用的线程池
    
    # 改动index()
    @app.route('/')
    def index():
        result_file = pool.submit(read_file)
        result_db = pool.submit(read_db)
        result_api = pool.submit(read_api)
    
        return json.dumps({
                'result_file': result_file.result(),
                'result_db': result_db.result(),
                'result_api': result_api.result()
            })
    

    注意我们通常是初始化一个全局可用的线程池,而非在函数内部创建线程池。

    输出

    {"result_file": "file result", "result_db": "db result", "result_api": "api result"}
    real	0m0.311s
    user	0m0.008s
    sys	0m0.000s
    

    可以看到响应时间变为了 0.3s,这是合理的,因为我们使用了多线程技术,三个IO密集计算预期会并行进行,最终的响应时间取决于最常的函数。

    Python多进程multiprocessing

    Python中,有了多线程threading,为什么还需要多进程multiprocessing

    IO密集型计算

    在 IO 密集型计算中,多线程技术可以使得一个将线程在进行IO时,其他的线程获取 GIL 进行 CPU 计算,从而提高整体效率。

    CPU密集型计算

    而在CPU密集型计算中,由于IO较少,IO与CPU并行的效果不明显,反而频繁 GIL 锁的检查和线程切换会降低整体效率。

    multiprocessing模块可以通过多进程摆脱 GIL 锁的限制,使 Python 程序得在现代多核处理器上真正的并行执行。

    多进程multiprocessing知识梳理

    可以看到,多进程 multiprocessing 模块与多线程 threading 模块的 API 几乎完全一致:

    代码示例:CPU密集计算中单线程、多线程、多进程的效率对比

    我们使用判断素数这样一个典型的 CPU 密集型计算来测试这种情况下单线程、多线程、多进程的效率对比:

    import math
    
    PRIMES = [112272535095293] * 100
    
    def is_prime(num):
        if num < 2: return False
        if num == 2: return True
        if num % 2 == 0: return False
        sqrt = int(math.floor(math.sqrt(num)))
        for i in range(3, sqrt + 1, 2):
            if num % i == 0: return False
        return True
    
    def single_thread():
        for num in PRIMES:
            is_prime(num)
    
    from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
    def multi_thread():
        with ThreadPoolExecutor() as pool:
            pool.map(is_prime, PRIMES)
    def multi_process():
        with ProcessPoolExecutor() as pool:
            pool.map(is_prime, PRIMES)
    
    if __name__ == '__main__':
        import time
        start = time.time()
        single_thread()
        print("Single Thread Costs: ", time.time()-start)
    
        start = time.time()
        multi_thread()
        print("Multi Thread Costs: ", time.time()-start)
    
        start = time.time()
        multi_process()
        print("Multi Process Costs: ", time.time()-start)
    

    输出:

    Single Thread Costs:  40.87938737869263
    Multi Thread Costs:  64.84761428833008
    Multi Process Costs:  5.259229898452759
    

    可以明显地看到:

  • 多线程在 CPU 密集型计算中由于 GIL 锁的限制,无法利用多核 CPU 进行计算,而线程切换和锁的检查反而会拖慢整体效率
  • 多进程在 CPU 密集型计算中则大显身手,充分利用 CPU 的多核能力,显著提高了程序效率
  • 与我们的预期是相符的。

    在 flask 中使用多进程来加速CPU密集型计算

    如果我们的web服务中有 CPU 密集型计算需要响应,可能会用到多进程技术来处理。与多线程不同,多进程之间是不共享地址空间的,因此就不如多线程灵活,在使用上有一些限制,有时需要自己查阅资料解决。

    比如在 flask 框架中,需要注意以下两点:

    1. 线程池的初始化必须在所有调用它的函数都定义完成之后
    2. 必须放在 if __name__ == '__main__' 中执行

    而多线程的使用就不需要受到这些限制。

    (其他写法与多线程是相同的,因为也介绍过 Python 中多线程与多进程的 API 几乎完全相同)

    Python异步IO实现并发爬虫

    IO多路复用

    原理

  • 是一种同步IO模型,实现一个线程可以监视多个文件句柄;
  • 一旦某个文件句柄就绪,就能够通知应用程序进行相应的读写操作;
  • 没有文件句柄就绪时会阻塞应用程序,交出cpu
  • 多路是指网络连接,复用指的是同一个线程
  • 3种实现方式

  • select

  • 数据结构:bitmap

  • 最大连接数:1024

  • fd拷贝:每次调用select拷贝

  • 工作效率:轮询O(N)

  • poll

  • 数据结构:数组

  • 最大连接数:无上限

  • fd拷贝:每次调用poll拷贝

  • 工作效率:轮询O(N)

  • epoll

  • 数据结构:红黑树

  • 最大连接数:无上限

  • fd拷贝:fd首次调用epool_ctl拷贝,每次调用epoll_wait不拷贝

  • 工作效率:回调O(1)

  • 协程:在单线程中实现并发

    单线程爬虫的执行路径

    协程

    核心原理:用一个至尊循环(其实就是 while True)循环,里面每次轮询处理所有的 task,配合 IO 多路复用原理(即IO操作时CPU可以同时进行其他计算)来实现并发

    与多线程类似,协程也是适用于IO密集型计算,并且,由于协程是在线程内的,没有线程切换的开销,因此通常比多线程更快。

    Python异步IO库:asyncio

    以爬虫程序为例,使用协程的语法:

    import asyncio
    
    # 获取事件循环
    loop = asyncio.get_event_loop()
    
    # 定义协程
    async def myfunc(url):
    	await get_url(url)
    
    # 创建task列表
    tasks = [loop.create_task(myfunc(url) for url in urls)]
    
    # 执行爬虫事件列表
    loop.run_until_complete(asyncio.wait(tasks))
    

    注意:要用在异步IO编程中,依赖的库必须支持异步IO特性,如网络库中 requests 不支持异步,需要用 aiohttp

    代码示例:

    import asyncio
    import aiohttp
    
    base_url = 'https://www.cnblogs.com'
    urls = [base_url+f'/#p{i}' for i in range(1, 51)]
    
    async def async_craw(url):
        print("craw url: ", url)
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as resp:
                result = await resp.text()
                print("craw url: ", url)
    
    loop = asyncio.get_event_loop()
    tasks = [ loop.create_task(async_craw(url)) for url in urls ]
    
    import time
    start = time.time()
    loop.run_until_complete(asyncio.wait(tasks))
    end = time.time()
    print(f"async spider cost time: {end-start} seconds")
    
    

    输出:

    ...
    craw url:  https://www.cnblogs.com/#p29
    craw url:  https://www.cnblogs.com/#p38
    craw url:  https://www.cnblogs.com/#p36
    craw url:  https://www.cnblogs.com/#p33
    async spider cost time: 0.18497681617736816 seconds
    

    可以看到,由于没有线程切换的开销,在IO密集型计算中,协程比多线程技术还要快。

    在异步IO中使用信号量来控制并发度

    信号量(semaphore)是一个同步对象,用于保持在0至指定最大值之间的一个计数值

  • 当线程完成一次对该semaphore对象的等待(wait)时,该计数值减一
  • 当线程完成一次对该semaphore对象的释放(realease)时,该计数值加一
  • 当计数值为0时,则线程等待该seamphore对象不再能成功直至该semaphore对象变为signaled状态
  • seamphore对象的计数值为0时,为signaled状态;等于0时为nonsignaled状态
  • 使用方法

    与锁类似,有两种用法:

    用法1

    sem = asyncio.Semaphore(10)
    
    # ... later
    async with sem:
    	# work with shared resource
    

    用法2

    sem = asyncio.Semaphore(10)
    
    # ...later
    await sem.acquire()
    try:
      # work with shared resource
    finally:
      sem.release()
    

    爬虫代码中使用sem控制并发度

    仅需有三处调整:

    import asyncio
    import aiohttp
    
    base_url = 'https://www.cnblogs.com'
    urls = [base_url+f'/#p{i}' for i in range(1, 51)]
    
    # sem控制并发度
    sem = asyncio.Semaphore(10)
    
    async def async_craw(url):
        async with sem:     # sem控制并发度,并调整缩进
            print("craw url: ", url)
            async with aiohttp.ClientSession() as session:
                async with session.get(url) as resp:
                    result = await resp.text()
                    await asyncio.sleep(5)  # 更直观地体现并发度
                    print("craw url: ", url)
    
    loop = asyncio.get_event_loop()
    tasks = [ loop.create_task(async_craw(url)) for url in urls ]
    
    import time
    start = time.time()
    loop.run_until_complete(asyncio.wait(tasks))
    end = time.time()
    print(f"async spider cost time: {end-start} seconds")
    

    Ref:

    https://blog.csdn.net/xili2532/article/details/117489248

    https://www.jianshu.com/p/cbf9588b2afb

    https://www.bilibili.com/video/BV1bK411A7tV?p=1

    来源:Adenialzz

    物联沃分享整理
    物联沃-IOTWORD物联网 » Python 并发编程

    发表评论