折腾了一下Django-celery

折腾了一下Django-celery

任务队列弄好了

明天就要上班了。旅游归来这两天折腾了一下Django和celery的搭配,这样以后在编写重型计算任务的时候,就可以把计算过程放到后台去,还能通过任务id来判断用户当前是不是存在计算任务。

记录一下折腾的过程,然后在回公司上班之后,一边完成新功能,一边逐步把这个功能放到正式服务器上。

安装包

要安装的包有这么几个(Django之类的就忽略了):

celery==5.6.2
django_celery_results==2.6.0
eventlet==0.40.4(windows下需要,linux下不需要)
redis==7.2.0

安装Redis

Redis作为消息代理,必须装了,而且这东西也确实好用,不得不装。下载的8.6Windows版,下载回来运行 install_redis_service.bat 就可以启动服务,默认运行在本地6379端口,只能本机访问,不需要密码。

配置Celery

Celery的官方文档中有如何与Django搭配。
其本质就是将自己加入到Django环境的配置中,然后启动Celery的时候指定模块,就从中读入Django项目相关的信息。我创建了一个名为testcelery的Django项目,配置如下:

01 创建celery.py

在testcelery/testcelery/目录下创建一个叫celery.py的文件,内容如下:

import os  
from celery import Celery  
  
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'testcelery.settings')  
app = Celery('testcelery')  
  
app.config_from_object('django.conf:settings', namespace='CELERY')  
  
app.autodiscover_tasks()

app就是要启动的celery的实例,这里要将项目名称传进去,实际上就是项目根目录的名称。
接下来的config_from就是指celery从Django的settings.py中读取前缀为CELERY的配置。
第三行就是指定自动发现所有注册在settings.py中的应用下边的以特定装饰器装饰的任务。

02 修改__init__.py

在当前目录的__init__.py文件中编写:

from .celery import app as celery_app  
__all__ = ('celery_app')

这就是注明自己的包里可以导出的内容。实际上就是给celery初始化用的。

03 settings.py中配置CELERY

有如下:

# celery  
CELERY_TIMEZONE = "Asia/Shanghai"  
CELERY_TASK_TRACK_STARTED = True  
CELERY_TASK_TIME_LIMIT = 3600  
CELERY_BROKER_URL = "redis://127.0.0.1:6379/0"  
CELERY_RESULT_BACKEND = 'django-db'

这里很关键的一个是时区,一个是代理,要配置成redis。
还一个是使用Django数据库存放结果,这个是来自于django_celery_results这个库的支持。有了这个库,就无所谓设置CELERY是否保存结果,结果都会写入数据库。

04 settings.py中配置django_celery_results

要把django_celery_results加入app列表中:

INSTALLED_APPS = [  
    'django.contrib.admin',  
    'django.contrib.auth',  
    'django.contrib.contenttypes',  
    'django.contrib.sessions',  
    'django.contrib.messages',  
    'django.contrib.staticfiles',  
    'django_celery_results',  
]

然后执行python manage.py migrate,会把django_celery_results带的三个模型写入到数据库表中。

05 给异步任务加上注释

Celery的任务最好都放在各个app或者testcelery/testcelery/下边的tasks.py中,自动扫描会扫描到。
我创建了一个应用然后写了个很简单的异步任务:

import time  
from celery import shared_task  
  
@shared_task  
def sandcastle(x, y):  
    for i in range(15):  
        print(i)  
        time.sleep(1)  
    return x + y

06 启动Django项目和Celery

先启动Django项目,然后以命令行的方式启动Celery

Celery -A testcelery worker -l info -P eventlet

注意,启动celery的位置正常情况下IDE自动定位到项目根目录下(manage.py所在的目录)。如果手工启动,目录一定要正确,否则会无法找到-A参数后的testcelery包。
-P eventlet只在windows下有用,在linux下不要加该参数。

07 提交任务

随便写了个控制器,关键是如何来确定任务的状态:

import redis
from django.shortcuts import render
from astest.tasks import sandcastle
from django_celery_results.models import TaskResult
from celery import states

def index(request):
    r_link = redis.StrictRedis(host='localhost', port=6379, db=0, decode_responses=True)
    if request.method == 'GET':
        c_id = r_link.get('test')
        if c_id:
            if TaskResult.objects.filter(task_id=c_id).exists():
                if TaskResult.objects.get(task_id=c_id).status == states.SUCCESS:
                    r_link.delete('test')
                    return render(request, 'index.html', {
                        'repeated': f'{c_id} 成功'
                    })
                if TaskResult.objects.get(task_id=c_id).status == states.FAILURE:
                    r_link.delete('test')
                    return render(request, 'index.html', {
                        'repeated': f'{c_id} 失败'
                    })
                return render(request, 'index.html', {
                    'repeated': f'{c_id}的状态是{TaskResult.objects.get(task_id=c_id).status}'
                })
        return render(request, 'index.html', {
            'repeated': '未找到该ID'
        })


    else:
        c_id = r_link.get('test')
        if c_id:
            if TaskResult.objects.filter(task_id=c_id).exists():
                if not TaskResult.objects.get(task_id=c_id).status == states.SUCCESS or TaskResult.objects.get(task_id=c_id).status == states.FAILURE:
                    return render(request, 'index.html', {
                        'repeated': '有尚未运行完的任务'
                    })
        result = sandcastle.delay(9, 11)
        r_link.set('test', result.id)
        return render(request, 'index.html', {
            'sleep': result,
        })

这里的核心是追踪任务状态。逻辑是

  1. 启动一个任务的时候,将任务id和当前用户名放入redis。
  2. 如果用户再次提交任务,就通过TaskResult模型到数据库中查询任务状态。
  3. 查询到任务成功或者失败,允许再次提交任务,否则显示该用户已经存在后台任务。
  4. GET请求会显示当前用户的后台任务的状态。如果任务完成或失败,就会从Redis中删除,再刷新就会显示当前用户无任务。

这样就可以在重型任务页面引入该机制,避免用户反复提交重型计算任务。

08 Linux下实验

把代码放到Linux下也成功了。Celery只要改成后台服务就可以了,注意运行根目录一定要设置成Django的项目根目录。

已经编写的加工费降本的功能可以暂时不用移植,反正一个月的计算也就20秒左右,至于上传发票这种因为要交互,而且返回的结果太多,也暂时不移植了。将来编写新的项目的时候,就准备直接用celery来一起编写了。不过还得给IT申请一下,在服务器上安装celery,redis等等才行了。

LICENSED UNDER CC BY-NC-SA 4.0
Comment