Python Celery框架与Django的集成使用指南
学习目标:
通过文章了解celery的运行机制以及如何结合django去使用
学习内容:
熟悉celery的运行原理,简单来说Celery 是一个“任务排队机+后台处理器”。帮你把一些慢的、耗时的工作,丢到后台去异步处理,不影响用户继续正常使用网站。Celery内部又分为这些组件,任务的流转图如下,通常我们需要关注的就是
定义任务,和生产任务投递到worker当中:
| 组件名 | 简单理解 | 主要作用 | 实现方式 |
|---|---|---|---|
| Task(任务) | 要干的事情 定义要做的函数 | 比如发邮件、跑批函数 | @shared_task或者 @app.task注解 |
| Broker(消息中间件) | 传话的中介人 Django丢给Broker | 比如发邮件、跑批函数 | Redis对列或RabbitMQ对列 |
| Worker(工人) | 真正干活的人 后台进程 | 不停从队列拿任务,执行任务 | 通过命令创建worker,celery -A myproject worker --loglevel=info启动一个worker,可以启用多个worker,进行并行并行处理,此时需要注意并行处理的数据安全 |
| Result Backend(结果存储器) | 存储执行结果 | worker执行的结果需要存储 | 通过Redis、关系型数据库都可以实现 |
| Scheduler(调度器) | 生成任务并把任务存储起来 | 将任务投递给 对应的消息中间件Broker当中 | 常用的Celery Beat,同时可以解析cron表达式进行任务投递 |

为什么要在django当中引入Celery?
一般我们在系统当中引入Celery基于这种情况会比较多一些如何在django项目当中配置celery
1. 安装 Celery 和 Redis
pip install celery
pip install redis
2. 在项目根目录(和settings.py同级)新建一个celery.py
# myproject/celery.py
import os
from celery import Celery
# 设置环境变量,指向你的Django设置
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')
# 创建Celery应用
app = Celery('myproject')
# 从Django settings中加载Celery配置(以CELERY_开头)
app.config_from_object('django.conf:settings', namespace='CELERY')
# 自动发现Django应用中的tasks.py文件
app.autodiscover_tasks()
@app.task(bind=True)
def debug_task(self):
print(f'Request: {self.request!r}')
如果我们使用了其他的名称,例如:customer_task.py,只要我们在内部方式使用了@shared_task(bind=True),一样是可以扫描到的,Celery启动的时候,会去你项目里所有安装的 Django App,自动查找能被发现的模块,只是默认是找tasks.py。
3. 修改__init__.py让 Django 启动时自动加载Celery,在myproject/init.py加上:
from .celery import app as celery_app
__all__ = ('celery_app',)
4. 配置settings.py
# settings.py
# 使用Redis作为broker
CELERY_BROKER_URL = 'redis://localhost:6379/0'
# 存储任务结果(可选)
CELERY_RESULT_BACKEND = 'redis://localhost:6379/2'
# 配置序列化格式
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
# 时区设置
CELERY_TIMEZONE = 'Asia/Shanghai' # 根据你的时区修改
5. 在你的 Django 应用中写tasks.py
比如你有个 app 叫myapp,在myapp/tasks.py或者custom_task.py里写任务:
# myapp/tasks.py
from celery import shared_task
@shared_task
def test_print():
print(f"now task is running")
@shared_task
def send_email_task(email_address):
# 假装这里发送邮件
print(f"Sending email to {email_address}")
6. 启动 Celery Worker,启动完成后会打印启动日志和扫描到带@shared_task或者 @app.task注解的函数
celery -A myproject worker --loglevel=info # 启动一个worker

[tasks] 列表是Celery Worker扫描、注册成功的所有task 函数,当你定义的task函数在这里边就说明你的任务函数被扫描到了,但并不代理这个生产了可执行的任务此时我们如果不通过代码调用生产任务或者使用Scheduler组件生成任务投递到Broker当中,worker是拉取不到可执行的任务的。
7. 通过代码或者Scheduler组件投递任务
test_print.delay()就会生成一个任务投递到Broker当中# 不使用数据库示例
from celery.schedules import crontab
app.conf.beat_schedule = {
'clear-logs-every-night': {
'task': 'myapp.tasks.test_print',
'schedule': crontab(minute='*/1'), # 每分钟生成一个任务
},
}
# 启动Scheduler线程,生产task任务投递到Broker当中
celery -A myproject beat -l info
启动完成后就可以在启动的Worker当中查看到任务的执行了,因为我起了两个Worker,所以任务并不是被一个Worker全部消费的。

总结
通过上面的一系列的学习,我们就能够很清晰的了解了Celery框架的组成和使用原理
作者:晓龙的Coding之路