Python Celery框架与Django的集成使用指南

学习目标:

通过文章了解celery的运行机制以及如何结合django去使用

  • 熟悉celery的运行原理
  • 属性celery在django项目当中的配置
  • 如何启动运行celery框架

  • 学习内容:

  • 熟悉celery的运行原理,简单来说
  • Celery 是一个“任务排队机+后台处理器”。帮你把一些慢的、耗时的工作,丢到后台去异步处理,不影响用户继续正常使用网站。Celery内部又分为这些组件,任务的流转图如下,通常我们需要关注的就是定义任务,和生产任务投递到worker当中

    组件名 简单理解 主要作用 实现方式
    Task(任务) 要干的事情 定义要做的函数 比如发邮件、跑批函数 @shared_task或者 @app.task注解
    Broker(消息中间件) 传话的中介人 Django丢给Broker 比如发邮件、跑批函数 Redis对列或RabbitMQ对列
    Worker(工人) 真正干活的人 后台进程 不停从队列拿任务,执行任务 通过命令创建worker,celery -A myproject worker --loglevel=info启动一个worker,可以启用多个worker,进行并行并行处理,此时需要注意并行处理的数据安全
    Result Backend(结果存储器) 存储执行结果 worker执行的结果需要存储 通过Redis、关系型数据库都可以实现
    Scheduler(调度器) 生成任务并把任务存储起来 将任务投递给 对应的消息中间件Broker当中 常用的Celery Beat,同时可以解析cron表达式进行任务投递

    请添加图片描述

  • 为什么要在django当中引入Celery?
  • Django本身是同步阻塞的,它不适合干很慢的活。 假设我们在处理用户注册时,需要调用给用户发送邮件的功能,但是这个功能处理又很耗时,如果都放在 Django 请求里同步跑,会出现什么问题?用户界面卡住,网页转圈圈、请求超时,服务器崩溃,这样用户的体验就会很差。
  • 另外一个重要的用途,我们的系统当中一般都有跑批定时任务,我们通过Celery可以很好的通过异步的方式处理这种跑批的操作。一般我们在系统当中引入Celery基于这种情况会比较多一些

  • 如何在django项目当中配置celery

    1. 安装 Celery 和 Redis
    pip install celery
    pip install redis
    
    2. 在项目根目录(和settings.py同级)新建一个celery.py
    # myproject/celery.py
    
    import os
    from celery import Celery
    
    # 设置环境变量,指向你的Django设置
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')
    
    # 创建Celery应用
    app = Celery('myproject')
    
    # 从Django settings中加载Celery配置(以CELERY_开头)
    app.config_from_object('django.conf:settings', namespace='CELERY')
    
    # 自动发现Django应用中的tasks.py文件
    app.autodiscover_tasks()
    
    @app.task(bind=True)
    def debug_task(self):
        print(f'Request: {self.request!r}')
    
    

    如果我们使用了其他的名称,例如:customer_task.py,只要我们在内部方式使用了@shared_task(bind=True),一样是可以扫描到的,Celery启动的时候,会去你项目里所有安装的 Django App,自动查找能被发现的模块,只是默认是找tasks.py。

    3. 修改__init__.py让 Django 启动时自动加载Celery,在myproject/init.py加上:
    from .celery import app as celery_app
    
    __all__ = ('celery_app',)
    
    
    4. 配置settings.py
    # settings.py
    
    # 使用Redis作为broker
    CELERY_BROKER_URL = 'redis://localhost:6379/0'
    
    # 存储任务结果(可选)
    CELERY_RESULT_BACKEND = 'redis://localhost:6379/2'
    
    # 配置序列化格式
    CELERY_ACCEPT_CONTENT = ['application/json']
    CELERY_TASK_SERIALIZER = 'json'
    CELERY_RESULT_SERIALIZER = 'json'
    
    # 时区设置
    CELERY_TIMEZONE = 'Asia/Shanghai'  # 根据你的时区修改
    
    
    5. 在你的 Django 应用中写tasks.py

    比如你有个 app 叫myapp,在myapp/tasks.py或者custom_task.py里写任务:

    # myapp/tasks.py
    
    from celery import shared_task
    
    @shared_task
    def test_print():
        print(f"now task is running")
    
    @shared_task
    def send_email_task(email_address):
        # 假装这里发送邮件
        print(f"Sending email to {email_address}")
    
    
    6. 启动 Celery Worker,启动完成后会打印启动日志和扫描到带@shared_task或者 @app.task注解的函数
    celery -A myproject worker --loglevel=info # 启动一个worker
    
    

    [tasks] 列表是Celery Worker扫描、注册成功的所有task 函数,当你定义的task函数在这里边就说明你的任务函数被扫描到了,但并不代理这个生产了可执行的任务此时我们如果不通过代码调用生产任务或者使用Scheduler组件生成任务投递到Broker当中,worker是拉取不到可执行的任务的。

    7. 通过代码或者Scheduler组件投递任务
  • 调用方法的示例,上述我们定义了一个test_print方法,调用test_print.delay()就会生成一个任务投递到Broker当中
  • 定义一个cron表达式任务,启动Scheduler线程,生成任务
    # 不使用数据库示例
    from celery.schedules import crontab
    
    app.conf.beat_schedule = {
        'clear-logs-every-night': {
            'task': 'myapp.tasks.test_print',
            'schedule': crontab(minute='*/1'),  # 每分钟生成一个任务
        },
    }
    
    
    # 启动Scheduler线程,生产task任务投递到Broker当中
    celery -A myproject beat -l info
    

    启动完成后就可以在启动的Worker当中查看到任务的执行了,因为我起了两个Worker,所以任务并不是被一个Worker全部消费的。


  • 总结

    通过上面的一系列的学习,我们就能够很清晰的了解了Celery框架的组成和使用原理
    

    作者:晓龙的Coding之路

    物联沃分享整理
    物联沃-IOTWORD物联网 » Python Celery框架与Django的集成使用指南

    发表回复