Django框架-使用celery(一):django使用celery的通用配置,不受版本影响
目录
一、依赖包情况
二、项目目录结构
2.1、怎么将django的应用创建到apps包
三、celery的配置
2.1、celery_task/celery.py
2.2、celery_task/async_task.py
2.3、celery_task/scheduler_task.py
2.4、utils/check_task.py
四、apps/user中配置相关处理视图
4.1、基本配置
4.2、user的models
4.3、user的视图函数
五、调用函数测试
5.1、启动项目
5.2、测试项目:Postman接口工具
六、报错
一、依赖包情况
python==3.9.0
django==3.2.0
celery==5.3.1
django-redis==5.3.0
eventlet==0.33.3 #windows系统需要使用到
注意:还需要在系统中安装好redis数据库,不然无法使用。
二、项目目录结构
2.1、怎么将django的应用创建到apps包
·1、创建user应用
cd apps
python ../manage.py startapp user
2、修改user包下的apps.py模块
from django.apps import AppConfigclass UserConfig(AppConfig):default_auto_field = 'django.db.models.BigAutoField'#原来是 name = 'user',改成下面的name = 'apps.user'
3、注册到settings.py文件中
INSTALLED_APPS = ['django.contrib.admin','django.contrib.auth','django.contrib.contenttypes','django.contrib.sessions','django.contrib.messages','django.contrib.staticfiles','apps.user.apps.UserConfig', #注册user应用,from apps.user.apps import UserConfig
]
三、celery的配置
概述:celery应用程序和任务都放到celery_task包中,其中celery.py是创建Celery的实例对象的,async_task.py用来写异步任务,scheduler_task.py用来写定时任务的。utils/check_task.py用来检测任务id是否结束并获取任务的返回值的。
2.1、celery_task/celery.py
from celery import Celery
from celery.schedules import crontab
from datetime import timedelta# 消息中间件,密码是你redis的密码
# broker='redis://:123456@127.0.0.1:6379/2' 密码123456
broker = 'redis://127.0.0.1:6379/0' # 无密码# 任务结果存储
backend = 'redis://127.0.0.1:6379/1'#包含任务的所有模块的导入路径:
task_module = ['celery_task.async_task', #写任务模块导入路径,该模块主要写异步任务的方法'celery_task.scheduler_task', #写任务模块导入路径,该模块主要写定时任务的方法
]# 生成celery对象,'task'相当于key,用于区分celery对象
# broker是指定消息处理,backend是指定结果后端的存储位置 include参数需要指定任务模块
app = Celery('task', broker=broker, backend=backend, include=task_module)# 配置
# 时区
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False
# 定时任务配置
app.conf.beat_schedule = {# 名字随意命名'add-func-30-seconds': {# 执行add_task下的addy函数'task': 'celery_task.scheduler_task.add_func', # 任务函数的导入路径,from celery_task.scheduler_task import add_func# 每10秒执行一次'schedule': timedelta(seconds=30),# add函数传递的参数'args': (10, 21)},#名字随意起'add-func-5-minutes': {'task': 'celery_task.scheduler_task.add_func',# 任务函数的导入路径,from celery_task.scheduler_task import add_func# crontab不传的参数默认就是每的意思,比如这里是每年每月每日每天每小时的5分执行该任务'schedule': crontab(minute='5'), # 之前时间点执行,每小时的第5分钟执行任务, 改成小时,分钟,秒 就是每天的哪个小时哪分钟哪秒钟执行'args': (19, 22) #定时任务需要的参数},#缓存用户数据到cache中'cache-user-func':{'task':'celery_task.scheduler_task.cache_user_func',#导入任务函数:from celery_task.scheduler_task import cache_user_func'schedule':timedelta(minutes=1),#每1分钟执行一次,将用户消息缓存到cache中}
}'''
配置:也可以使用下面这种方式:
app.conf.update(task_serializer='json',accept_content=['json'], # Ignore other contentresult_serializer='json',timezone='Asia/Shanghai',enable_utc=False,
)
'''
2.2、celery_task/async_task.py
# 因为需要用到django中的内容,所以需要配置django环境
import os
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "study_celery.settings")
import django
django.setup()# 导入celery对象app
from celery_task.celery import app
# 导入django自带的发送邮件模块
from django.core.mail import send_mail
import threading
from study_celery import settings
'''
1、没有返回值的,@app.task(ignore_result=True)
2、有返回值的任务,@app.task
'''#没有返回值,禁用掉结果后端
@app.task
def send_email_task(email,code): # 此时可以直接传邮箱,还能减少一次数据库的IO操作''':param email: 接收消息的邮箱,用户的邮箱:return:'''# 启用线程发送邮件,此处最好加线程池t = threading.Thread(target=send_mail,args=("登录前获取的验证码", # 邮件标题'点击该邮件激活你的账号,否则无法登陆', # 给html_message参数传值后,该参数信息失效settings.EMAIL_HOST_USER, # 用于发送邮件的邮箱地址[email], # 接收邮件的邮件地址,可以写多个),# html_message中定义的字符串即HTML格式的信息,可以在一个html文件中写好复制出来放在该字符串中kwargs={'html_message': f"<p></p> <p>验证码:{code}</p>"})t.start()return {'email':email,'code':code}
2.3、celery_task/scheduler_task.py
# 因为需要用到django中的内容,所以需要配置django环境
import os
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "study_celery.settings")
import django
django.setup()from celery_task.celery import app
from apps.user.views import models as user_models
from django.core.cache import cache
import time
from django.forms import model_to_dict#有返回值,返回值可以从结果后端中获取
@app.task
def add_func(a,b):print('执行了加法函数')cache.set('add_ret',{'time':time.strftime('%Y-%m-%d %H:%M:%S'),'ret':a+b})return a+b#不需要返回值,禁用掉结果后端
@app.task(ignore_result=True)
def cache_user_func():user = user_models.UserModel.objects.all()user_dict = {}for obj in user:user_dict[obj.account] = model_to_dict(obj)cache.set('all-user-data',user_dict,timeout=35*60)
2.4、utils/check_task.py
from celery.result import AsyncResult
from celery_task.celery import app
'''验证任务的执行状态的'''def check_task_status(task_id):'''任务的执行状态:PENDING :等待执行STARTED :开始执行RETRY :重新尝试执行SUCCESS :执行成功FAILURE :执行失败:param task_id::return:'''result = AsyncResult(id=task_id, app=app)dic = {'type':result.status,'msg':'','data':'','code':400}if result.status == 'PENDING':dic['msg'] = '任务等待中'elif result.status == 'STARTED':dic['msg'] = '任务开始执行'elif result.status == 'RETRY':dic['msg']='任务重新尝试执行'elif result.status =='FAILURE':dic['msg'] = '任务执行失败了'elif result.status == 'SUCCESS':result = result.get()dic['msg'] = '任务执行成功'dic['data'] = resultdic['code'] = 200# result.forget() # 将结果删除# async.revoke(terminate=True) # 无论现在是什么时候,都要终止# async.revoke(terminate=False) # 如果任务还没有开始执行呢,那么就可以终止。return dic
四、apps/user中配置相关处理视图
4.1、基本配置
1、study_celery/settings.py
#cache缓存
CACHES = {"default": {"BACKEND": "django_redis.cache.RedisCache","LOCATION": "redis://127.0.0.1:6379","OPTIONS": {"CLIENT_CLASS": "django_redis.client.DefaultClient","CONNECTION_POOL_KWARGS": {"max_connections": 100}# "PASSWORD": "123",},'TIMEOUT':30*60 #缓存过期时间}
}#邮件配置
# EMAIL_BACKEND = 'django.core.mail.backends.smtp.EmailBackend'
EMAIL_HOST = 'smtp.qq.com' # 如果是 163 改成 smtp.163.com
EMAIL_PORT = 465
EMAIL_HOST_USER = '2414155342@qq.com' # 发送邮件的邮箱帐号
EMAIL_HOST_PASSWORD = 'qq邮箱的授权码' # 授权码,各邮箱的设置中启用smtp服务时获取
DEFAULT_FROM_EMAIL = EMAIL_HOST_USER
# 这样收到的邮件,收件人处就会这样显示
# DEFAULT_FROM_EMAIL = '<'xxxxx@qq.com>'
EMAIL_USE_SSL = True # 使用ssl
# EMAIL_USE_TLS = False # 使用tls
# EMAIL_USE_SSL 和 EMAIL_USE_TLS 是互斥的,即只能有一个为 True
2、study_celery/urls.py
from django.contrib import admin
from django.urls import path,includeurlpatterns = [path('admin/', admin.site.urls),path('user/',include('apps.user.urls'))
]
3、apps/user/urls.py
from django.urls import path
from . import viewsurlpatterns = []
4.2、user的models
apps/user/models.py
from django.db import models
# Create your models here.class UserModel(models.Model):id = models.AutoField(primary_key=True)name = models.CharField(max_length=32)account = models.CharField(max_length=64)password = models.CharField(max_length=256)email = models.EmailField()
执行数据库迁移命令
python manage.py makemigrations
python manage.py migrate
4.3、user的视图函数
1、apps/user/views.py
from django.contrib.auth.hashers import make_password, check_password
from django.views import View
from django.http import JsonResponse
from . import models
from django.core.cache import cache
import time
from celery_task.async_task import send_email_task
# Create your views here.class ResgiterView(View):#注册用户def post(self,request):name = request.POST.get('name')account = request.POST.get('account')password = request.POST.get('password')email = request.POST.get('email')obj = models.UserModel.objects.filter(account=account).first()if obj:return JsonResponse({'code':400,'msg':'账户已经存在了'})password = make_password(password)instance = models.UserModel.objects.create(name=name,account=account,password=password,email=email)return JsonResponse({'code':200,'msg':'注册用户成功'})class LoginView(View):#用户登录def post(self,request):account = request.POST.get('account')password = request.POST.get('password')code = request.POST.get('code') #验证码email_code = cache.get(f'email_{account}')#发给邮箱的验证码.get(f'email_{account})print(code,email_code)if code and email_code:if code != email_code:return JsonResponse({'code':400,'msg':'验证码错误'})else:return JsonResponse({'code':400,'msg':'请先点击发送邮件获取验证码'})obj = models.UserModel.objects.filter(account=account).first()if not obj:return JsonResponse({'code':400,'msg':'当前用户不存在'})pwd_true = check_password(password,obj.password)response = JsonResponse({'code':200,'msg':'登录成功'})if pwd_true:return responseelse:return JsonResponse({'code':400,'msg':'用户名或密码错误'})class LoginSendEmailView(View):#用户登录前,需要验证码,发送验证码给用户的邮箱def post(self,request):account = request.POST.get('account')email = request.POST.get('email')code = str(time.time())[-5:]cache.set(f'email_{account}',code)print(cache.get(f'email_{account}'))res = send_email_task.delay(email,code)task_id = res.idprint('验证码是',code)return JsonResponse({'code':200,'msg':f'请查看{email}邮箱中是否收到邮件','task_id':task_id})class AllUserDataView(View):#查询cache_user_func定时任务执行时存到cache中的用户数据def get(self,request):key = 'all-user-data'data = cache.get(key)if data:return JsonResponse({'code':200,'data':data})else:return JsonResponse({'code':400,'msg':'没有相关数据'})class AddFuncDataView(View):#查询add_func 定时任务执行时存到cache中的数据def get(self, request):data = cache.get('add_ret')print(data, type(data))return JsonResponse({'code': 200, 'data': data})class UserTaskIdGetDataView(View):def get(self,request):from utils.check_task import check_task_status#检查任务是否成功了,获取任务的返回值task_id = request.GET.get('task_id')if not task_id:return JsonResponse({'code':400,'msg':'没有携带任务id'})ret = check_task_status(task_id)return JsonResponse(ret)
2、apps/user/urls.py
from django.urls import path
from . import viewsurlpatterns = [path('login/',views.LoginView.as_view(),name='user-login'),#登录path('register/', views.ResgiterView.as_view(), name='user-register'),#注册path('login/code/',views.LoginSendEmailView.as_view(),name='user-login-send-email'),#登录验证码path('all/user/data/',views.AllUserDataView.as_view(),name='user-all-user-data'),#获取定时任务cache_user_func缓存到cache中的用户数据path('add/result/',views.AddFuncDataView.as_view(),name='user-add-result'),#获取定时任务add_func缓存到cache的计算结果path('task-id/result/',views.UserTaskIdGetDataView.as_view(),name='user-task-id-data'),#通过任务的task-id获取到任务返回值
]
五、调用函数测试
5.1、启动项目
1、启动django项目
python manage.py runserver
2、启动celery异步
#windows系统
celery -A celery_task worker -l info -P eventlet
#linux系统
celery -A celery_task worker -l info
3、启动celery定时(定时任务也是提交给异步的)
celery -A celery_task beat -l info
5.2、测试项目:Postman接口工具
1、注册:url= /user/register/
2、登录前点击获取验证码:返回任务id, url=/user/login/code/
把task_id=17ee8389-14ab-4da1-b88a-56446afb4493 复制下来,4中可以用到
3、登录:code是发送步骤2中发送给邮箱的验证码 ,url=/user/login/
4、通过任务id,获取到发送邮箱异步任务的返回值,url=/user/task-id/result/
复制2返回值中的task_id,获取该任务的返回值
5、获取定时任务中,add_func缓存在cache中的计算数据
6、获取定时任务中,cache_user_func缓存到cache中的用户数据
总结:
1、异步任务,一般是保存文件、发送邮件、耗时操作等
2、定时任务,定时处理某些数据。
六、报错
1、先去celery.py中查看定时任务的配置,模块的路径是不是有问题,千万不要多了一个空格啥的
2、如果通过任务id获取任务的返回值不成功,看看是不是添加@app.task(ignore_result=True),如果是就去掉这个参数配置
相关文章:

Django框架-使用celery(一):django使用celery的通用配置,不受版本影响
目录 一、依赖包情况 二、项目目录结构 2.1、怎么将django的应用创建到apps包 三、celery的配置 2.1、celery_task/celery.py 2.2、celery_task/async_task.py 2.3、celery_task/scheduler_task.py 2.4、utils/check_task.py 四、apps/user中配置相关处理视图 4.1、基本…...
nvue语法与vue的部分区别
文章目录 1、仅支持flex布局2、字体样式3、高度问题 1、仅支持flex布局 仅支持flex布局。而且默认的是 flex-direction: column; 2、字体样式 字体的样式,必须要写在 text 标签内,才能生效 错误示例: <!-- 错误示例 --> <div cl…...
Java 开发工具 IntelliJ IDEA
1. IntelliJ IDEA 简介 IntelliJ IDEA 是一款出色的 Java 集成开发环境(IDE),提供了丰富的功能和工具,支持多种语言和框架的开发,如 Java、Kotlin、Scala、 Android、Spring、Hibernate 等。IntelliJ IDEA 专注于提高…...

将vsCode 打开的多个文件分行(栏)排列,实现全部显示,便于切换文件
目录 1. 前言 2. 设置VsCode 多文件分行(栏)排列显示 1. 前言 主流编程IDE几乎都有排列切换选择所要查看的文件功能,如下为Visual Studio 2022的该功能界面: 图 1 图 2 当在Visual Studio 2022打开很多文件时,可以按照图1、图2所示找到自…...

java中的同步工具类CountDownLatch
这篇文章主要讲解java中一个比较常用的同步工具类CountDownLatch,不管是在工作还是面试中都比较常见。我们将通过案例来进行讲解分析。 一、定义 CountDownLatch的作用很简单,就是一个或者一组线程在开始执行操作之前,必须要等到其他线程执…...

路由器和交换机的区别
交换机和路由器的区别 交换机实现局域网内点对点通信,路由器实现收集发散,相当于一个猎头实现的中介的功能 路由器属于网络层,可以处理TCP/IP协议,通过IP地址寻址;交换机属于中继层,通过MAC地址寻址(列表)…...

FreeRTOS(动态内存管理)
资料来源于硬件家园:资料汇总 - FreeRTOS实时操作系统课程(多任务管理) 目录 一、动态内存管理介绍 1、heap_1 2、heap_2 3、heap_3 4、heap_4 5、heap_5 二、动态内存总结与应用 1、heap_1 2、heap_4 3、heap_5 三、内存管理编程测试 1、heap_4 2、h…...

IntelliJ IDEA(简称Idea) 基本常用设置及Maven部署---详细介绍
一,Idea是什么? 前言: 众所周知,现在有许多编译工具,如eclipse,pathon, 今天所要学的Idea编译工具 Idea是JetBrains公司开发的一款强大的集成开发环境(IDE),主要用于Java…...

【LeetCode每日一题】——128.最长连续序列
文章目录 一【题目类别】二【题目难度】三【题目编号】四【题目描述】五【题目示例】六【题目提示】七【解题思路】八【时间频度】九【代码实现】十【提交结果】 一【题目类别】 哈希表 二【题目难度】 中等 三【题目编号】 128.最长连续序列 四【题目描述】 给定一个未…...

Redis_缓存1_缓存类型
14.redis缓存 14.1简介 穿透型缓存: 缓存与后端数据交互在一起,对服务端的调用隐藏细节。如果从缓存中可以读到数据,就直接返回,如果读不到,就到数据库中去读取,从数据库中读到数据,也是先更…...
模拟 枚举
分享牛客算法基础精选题单题目打卡!!! 目录 字符串的展开 多项式输出 机器翻译 : 铺地毯 : [NOIP2016]回文日期 字符串的展开 原题链接 : 字符串的展开 思路 : 模拟 代码 : #include<iostream> #include<cstring> #include<algorithm> using na…...

【实操】2023年npm组件库的创建发布流程
2022年的实践为基础,2023年我再建一个组件库【ZUI】。步骤回顾: 2022年的npm组件包的发布删除教程_npm i ant-design/pro-components 怎么删除_啥咕啦呛的博客-CSDN博客 1.在gitee上创建一个项目,相信你是会的 2.创建初始化项目,看吧&#…...
缓存设计的典型方案
缓存设计的典型方案 在使用缓存系统的时候,还需要考虑缓存设计的问题,重点在于缓存失效时的处理和如何更新缓存。 缓存失效是在使用缓存时不得不面对的问题。在业务开发中,缓存失效时由于找不到整个数据,一般会出于容错考虑&#…...
SQL笔记
最近的工作对SQL的应用程度较高,而且写的sql类型基本没怎么涉及过,把用到的几个关键字记录下。 使用环境:达梦数据库 达梦数据库有个特点,他有一个叫模式的说法,在图形化工具里直接点击创建查询窗口,不用像…...

UHPC的疲劳计算——兼论ModelCode2010的适用性
文章目录 0. 背景1、结论及概述2、MC10对于SN曲线的调整(囊括NC、HPC、UHPC)2.1 疲劳失效曲面的构建2.2 新模型的验证 3、MC10对于疲劳设计强度的调整及其背后的原因4. 结语 0. 背景 今年年初,有一位用UHPC做混凝土塔筒的同行告诉我…...
关于elementui的input的autocomplete的使用
项目中需要实现搜索框搜索时能自动提示可选项的功能,elementui的input组件有已经封装好的el-autocomplete可以使用,但是在使用中发现一些问题,记录一下 基础使用 // html部分 <el-autocompletev-model"name":fetch-suggestion…...
即然利用反射机制可以破坏单例模式,有什么方法避免呢?
私有构造方法中添加防止多次实例化的逻辑:在单例类的私有构造方法中,可以添加逻辑来检查是否已经存在实例,如果存在则抛出异常或返回已有的实例。这样即使通过反射创建了新的实例,也能在构造方法中进行拦截。 使用枚举实现单例&a…...

【IDEA问题】下载不了源代码
引出问题 最近不知道怎么打开 IDEA,本想查看源代码,然后点击下载源码,总是报找不到此对象的源代码。百度找了半天,GPT问了半天还是解决不了,直到遇到了这篇:idea中无法下载源码问题解决,终于得…...
代码随想录第四十八天
代码随想录第四十八天 Leetcode 198. 打家劫舍ILeetcode 213. 打家劫舍 IILeetcode 337. 打家劫舍 III Leetcode 198. 打家劫舍I 题目链接: 打家劫舍I 自己的思路:想不太出来递推公式!!!! 正确思路:这个题主要是看是否偷第下标为…...

书写自动智慧:探索Python文本分类器的开发与应用:支持二分类、多分类、多标签分类、多层级分类和Kmeans聚类
书写自动智慧:探索Python文本分类器的开发与应用:支持二分类、多分类、多标签分类、多层级分类和Kmeans聚类 文本分类器,提供多种文本分类和聚类算法,支持句子和文档级的文本分类任务,支持二分类、多分类、多标签分类…...
应用升级/灾备测试时使用guarantee 闪回点迅速回退
1.场景 应用要升级,当升级失败时,数据库回退到升级前. 要测试系统,测试完成后,数据库要回退到测试前。 相对于RMAN恢复需要很长时间, 数据库闪回只需要几分钟。 2.技术实现 数据库设置 2个db_recovery参数 创建guarantee闪回点,不需要开启数据库闪回。…...
R语言AI模型部署方案:精准离线运行详解
R语言AI模型部署方案:精准离线运行详解 一、项目概述 本文将构建一个完整的R语言AI部署解决方案,实现鸢尾花分类模型的训练、保存、离线部署和预测功能。核心特点: 100%离线运行能力自包含环境依赖生产级错误处理跨平台兼容性模型版本管理# 文件结构说明 Iris_AI_Deployme…...
Java - Mysql数据类型对应
Mysql数据类型java数据类型备注整型INT/INTEGERint / java.lang.Integer–BIGINTlong/java.lang.Long–––浮点型FLOATfloat/java.lang.FloatDOUBLEdouble/java.lang.Double–DECIMAL/NUMERICjava.math.BigDecimal字符串型CHARjava.lang.String固定长度字符串VARCHARjava.lang…...

屋顶变身“发电站” ,中天合创屋面分布式光伏发电项目顺利并网!
5月28日,中天合创屋面分布式光伏发电项目顺利并网发电,该项目位于内蒙古自治区鄂尔多斯市乌审旗,项目利用中天合创聚乙烯、聚丙烯仓库屋面作为场地建设光伏电站,总装机容量为9.96MWp。 项目投运后,每年可节约标煤3670…...

基于Docker Compose部署Java微服务项目
一. 创建根项目 根项目(父项目)主要用于依赖管理 一些需要注意的点: 打包方式需要为 pom<modules>里需要注册子模块不要引入maven的打包插件,否则打包时会出问题 <?xml version"1.0" encoding"UTF-8…...

智能仓储的未来:自动化、AI与数据分析如何重塑物流中心
当仓库学会“思考”,物流的终极形态正在诞生 想象这样的场景: 凌晨3点,某物流中心灯火通明却空无一人。AGV机器人集群根据实时订单动态规划路径;AI视觉系统在0.1秒内扫描包裹信息;数字孪生平台正模拟次日峰值流量压力…...

3-11单元格区域边界定位(End属性)学习笔记
返回一个Range 对象,只读。该对象代表包含源区域的区域上端下端左端右端的最后一个单元格。等同于按键 End 向上键(End(xlUp))、End向下键(End(xlDown))、End向左键(End(xlToLeft)End向右键(End(xlToRight)) 注意:它移动的位置必须是相连的有内容的单元格…...

Mysql中select查询语句的执行过程
目录 1、介绍 1.1、组件介绍 1.2、Sql执行顺序 2、执行流程 2.1. 连接与认证 2.2. 查询缓存 2.3. 语法解析(Parser) 2.4、执行sql 1. 预处理(Preprocessor) 2. 查询优化器(Optimizer) 3. 执行器…...
省略号和可变参数模板
本文主要介绍如何展开可变参数的参数包 1.C语言的va_list展开可变参数 #include <iostream> #include <cstdarg>void printNumbers(int count, ...) {// 声明va_list类型的变量va_list args;// 使用va_start将可变参数写入变量argsva_start(args, count);for (in…...

高考志愿填报管理系统---开发介绍
高考志愿填报管理系统是一款专为教育机构、学校和教师设计的学生信息管理和志愿填报辅助平台。系统基于Django框架开发,采用现代化的Web技术,为教育工作者提供高效、安全、便捷的学生管理解决方案。 ## 📋 系统概述 ### 🎯 系统定…...