使用Redis和Celery实现FastAPI中的异步任务和定时任务详解

简介

fastapi你访问一个接口,当这个接口执行一个非常耗时的任务的时候,其他的接口根本没办法访问,直接给你卡死,除非等这个接口的程序运行完。为什么,因为同步和异步的问题。那么我告诉你今天这个问题不存在了;对于定时任务来说,要解决起来就太简单了,也就一笔带过。

一.  redis,celery安装(略)

       redis在linux或者redis上安装都行,redis装在Windows上已然够使,但我基于linux上进行表述,如有异议,移步他处。接下来你还需要去安装一个Another Redis Desktop Manager,这个使redis的视图版,调代码嘎嘎好使。

二. 概念

       有一些概念,我必须给你讲清楚,首先代码里,生产者和消费者,生产者把任务发给任务队列然后消息中间件来把这些任务分发(如下图的delay函数)给消费者来处理这些或耗时或不耗时或阻塞或不阻塞的任务返回给你一个task_id(如上图),执行完就把一个结果以键值形式丢给backend,此外broker和backend在我这都是redis(当然RQ或者其他的也行),图里我留了个坑看有没有人能发现(实际上broker是在任务队列里面,看完如果能回答这个问题就说明这篇文章帮到你了)。  

当然task_id也会被存储在中间件redis中。

 该图取自简介说到的那个软件。

这玩意儿就是worker(消费者),嗝~记好。 

那么如何运行呢?

1.文件配置完成之后打开redis,把worker开启,运行代码delay任务拿到task_id,然后用task_id查询结果

2.文件配置完成之后打开redis,直接delay然后拿到task_id,打开worker,然后再用task_id查结果

三. Linux、celery常规命令。

1.后台启动redis后台(linux系统)

./redis-server

2.关闭后台redis,找到redis杀了就行(linux系统)

ps -ef | grep redis

3.celery启动(linux系统)

 记得在那啥的路径下运行,接着往下看,看到红字那你就知道了(路径在celery包的父目录下运行)

        举个例子  ,看下面的图,celery_task就cd到fastApiProject3执行下面的命令即可

                              

celery --app=celery_tasks worker -l INFO

顺带提一嘴,-c为每个worker的进程数,因为每一个消费者(什么是消费者就是,就是后台启动这个命令出来的东西就是消费者)提交到这里的任务就是商品,给消费者消耗的,所以没有消费者这些被提交的任务也会来排队,上面有个图反正也看不懂,这回懂了吗。

命令如下:

celery -A your_project_name worker -c 4

比如说-c 4 每个worker都只能同时一次性处理4个任务,当然也可以到celery.py(如下文)设置,本文参照的是第一个,在celery设置已经过了,这块在文章末尾有详细解释。

4.celery关闭(linux)

CTRL+C

5.定时任务启动(linux) 

在celery配置好的情况下使用即可。 

celery -A celery_tasks beat

然后在linux里输入上面这一行命令,他就自个加任务去了。既然是定时任务那么他应该优先吧?实际上并不是,每一个定时任务依然要从后面开始排队来执行。说到这里了就顺带提一嘴apply_async((1, 2), priority=1)  用priority来设置优先级(在用单独的消费者时优先级更加严格,反之则不然。)

四. 文件配置及代码

 开启celery命令在celery_tasks的父目录下执行,比如…../fastapiproject/celery_tasks就到fastapiproject文件夹下去执行之前提到的开启celery worker的命令(celery –app=celery_tasks worker -l INFO)。

pip install celery 

这个得稍微注意一下,celery里面全是配置项,__init__.py,让他置空吧。

1.celery.py

from celery import Celery

CK = Celery("celery_demo", 
              broker="redis://:123456@***.***.***.***:6379/1",             
              backend="redis://:123456@***.***.***.***:6379/2",
              include=['celery_tasks.task1', 'celery_tasks.task2'])
CK.conf.timezone = 'Asia/Shanghai'  # 时区
CK.conf.enable_utc = False  # 这个时定时任务,先不管这个
CK.conf.worker_concurrency = 4

 
# 这一块是定时任务
cel.conf.beat_schedule = {
    # 名字随便取
    "yuanfu": {
        'task': 'celery_tasks.task1.HELLO',  # 任务对象
        'schedule': timedelta(seconds=6),  # 每隔6秒执行一次
        'args': ('张三',)  # 传递参数
    },
    "..."
}

 CK.conf.worker_concurrency = 4记不记得上面在开启celery消费者说的东西,前后呼应这部给我个赞,嗯哼?此处的CK也是博主名字实际上是celery的app,conf则是配置项。

2. task1.py

import time
from celery_tasks.celery import CK

@CK.task
def HELLO(name):
    print("%s我在这只是为了证明我能传递参数" % name)
    time.sleep(10)
    print("耗时10s任务结束,你看到了吗,嗯?"%name)
    return 'Task1_Done'

关于task1.py这名字也可以随便起,里面也可以有很多@celery.task 

3. task2.py

import time
from celery_tasks.celery import CK


@CK.task
def MDFK(name):
    print(f"执行超级耗时的CPU密集型阻塞任务{name}")
    time.sleep(20)
    print(f"超级耗时CPU密集型阻塞任务{name}执行完毕")
    return f"我是执行结果XXX"

生产者的delay或者apply_async写在哪都行,想跃跃欲试了?我知道你很急但你先别急。

4.发布任务和获取任务状态(随便起名随便放哪.py)

@test_app.get("/celery_produce")
async def celery_produce(arg1: str,task1_id:str, arg2: str,task2_id:str):
    result1 = HELLO.apply_async(args=[arg1,],task_id=task1_id)
    result2 = MDFK.apply_async(args=[arg2,],task_id=task2_id)
    return {'taskid1': result1.id, 'taskid2': result2.id}
@test_app.get("/celery_get_result")
async def celery_get_result(id: str):
    from celery_operation import get_celery_statu
    result = await get_celery_statu(id)
    return result

啊,博主博主, celery_operation我这没有怎么办,行给你,拿去吧。

当然,我这只是拿fastapi举例,当然也不限于该框架。

async def get_celery_statu(id):
    result = {}
    async_result = AsyncResult(id=str(id), app=CK)
    if async_result.state=="SUCCESS":
        result["1"] = async_result.get()
        # result,forget() # 将结果删除,执行完成,结果不会自动删除
        # async.revoke(terminate=True) # 无论现在是什么时候,都要终止
        # #async.revoke(terminate=False) # 如果任务还没有开始执行呢,那么就可以终止
    else:
        result["0"]=async_result.state
    return result

 1,0是啥,自个写的逻辑,有结果了1没结果0

 五. 疑难问题

1.历史遗留问题(任务队列)

        任务队列是啥子,任务队列就是比如说你定时任务一直往队列里加,消费者也没执行完就被你给ctrl+c了,这时候这些遗留问题就面面相觑地在redis中创建一个键名为"celery"的里面排队,等你下次再打开你的worker,那些任务又开始执行,这下好了你其他任务排队得两年半。

那怎么办呢呢,redis的delete('celery')即可,如下。

@app.on_event("startup")
async def test_redis():
    pool = aioredis.ConnectionPool.from_url("redis://:123654@localhost/1", encoding="utf-8", decode_responses=True)
    # pool = aioredis.ConnectionPool.from_url("redis://:123456@10.101.0.97/0", encoding="utf-8", decode_responses=True)
    meanings = Redis(connection_pool=pool)
    print("正在检查是否存留历史遗留问题...")
    try:
        history = await meanings.lrange("celery", 0, -1)
        print("历史遗留问题任务数", len(history))
        await meanings.delete("celery")
        print("历史遗留问题清除完毕")
    except aioredis.exceptions.ConnectionError:
        print("aioredis连接失败,服务器redis关闭")
    finally:
        await pool.disconnect()

在redis里面我们可以看到是一个list,用些手段(手段:略)看一下’celery‘里面其实是一个列表式的队列先进先出。

root_id就是创建时的任务id,argsrepr即是参数。

2.worker自动创建问题

如果任务足够多而不能够应对时,它便会自个创建worker来处理任务,即使你设置了-c都没用。

解决方案:配置的celery.py文件里写autocreate_workers = False即可。

53概念图留的坑解答

任务队列是在redis中"celery"键的值,"celery"键的值即是队列本身,我的borker(消息中间件)是redis,所以队列应该要在中间件的里面。

此致。

物联沃分享整理
物联沃-IOTWORD物联网 » 使用Redis和Celery实现FastAPI中的异步任务和定时任务详解

发表评论