Django框架-使用celery(一):django使用celery的通用配置,不受版本影响
目录
一、依赖包情况
二、项目目录结构
2.1、怎么将django的应用创建到apps包
三、celery的配置
2.1、celery_task/celery.py
2.2、celery_task/async_task.py
2.3、celery_task/scheduler_task.py
2.4、utils/check_task.py
四、apps/user中配置相关处理视图
4.1、基本配置
4.2、user的models
4.3、user的视图函数
五、调用函数测试
5.1、启动项目
5.2、测试项目:Postman接口工具
六、报错
一、依赖包情况
python==3.9.0
django==3.2.0
celery==5.3.1
django-redis==5.3.0
eventlet==0.33.3 #windows系统需要使用到
注意:还需要在系统中安装好redis数据库,不然无法使用。
二、项目目录结构
2.1、怎么将django的应用创建到apps包
·1、创建user应用
cd apps
python ../manage.py startapp user
2、修改user包下的apps.py模块
from django.apps import AppConfigclass UserConfig(AppConfig):default_auto_field = 'django.db.models.BigAutoField'#原来是 name = 'user',改成下面的name = 'apps.user'
3、注册到settings.py文件中
INSTALLED_APPS = ['django.contrib.admin','django.contrib.auth','django.contrib.contenttypes','django.contrib.sessions','django.contrib.messages','django.contrib.staticfiles','apps.user.apps.UserConfig', #注册user应用,from apps.user.apps import UserConfig
]
三、celery的配置
概述:celery应用程序和任务都放到celery_task包中,其中celery.py是创建Celery的实例对象的,async_task.py用来写异步任务,scheduler_task.py用来写定时任务的。utils/check_task.py用来检测任务id是否结束并获取任务的返回值的。
2.1、celery_task/celery.py
from celery import Celery
from celery.schedules import crontab
from datetime import timedelta# 消息中间件,密码是你redis的密码
# broker='redis://:123456@127.0.0.1:6379/2' 密码123456
broker = 'redis://127.0.0.1:6379/0' # 无密码# 任务结果存储
backend = 'redis://127.0.0.1:6379/1'#包含任务的所有模块的导入路径:
task_module = ['celery_task.async_task', #写任务模块导入路径,该模块主要写异步任务的方法'celery_task.scheduler_task', #写任务模块导入路径,该模块主要写定时任务的方法
]# 生成celery对象,'task'相当于key,用于区分celery对象
# broker是指定消息处理,backend是指定结果后端的存储位置 include参数需要指定任务模块
app = Celery('task', broker=broker, backend=backend, include=task_module)# 配置
# 时区
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False
# 定时任务配置
app.conf.beat_schedule = {# 名字随意命名'add-func-30-seconds': {# 执行add_task下的addy函数'task': 'celery_task.scheduler_task.add_func', # 任务函数的导入路径,from celery_task.scheduler_task import add_func# 每10秒执行一次'schedule': timedelta(seconds=30),# add函数传递的参数'args': (10, 21)},#名字随意起'add-func-5-minutes': {'task': 'celery_task.scheduler_task.add_func',# 任务函数的导入路径,from celery_task.scheduler_task import add_func# crontab不传的参数默认就是每的意思,比如这里是每年每月每日每天每小时的5分执行该任务'schedule': crontab(minute='5'), # 之前时间点执行,每小时的第5分钟执行任务, 改成小时,分钟,秒 就是每天的哪个小时哪分钟哪秒钟执行'args': (19, 22) #定时任务需要的参数},#缓存用户数据到cache中'cache-user-func':{'task':'celery_task.scheduler_task.cache_user_func',#导入任务函数:from celery_task.scheduler_task import cache_user_func'schedule':timedelta(minutes=1),#每1分钟执行一次,将用户消息缓存到cache中}
}'''
配置:也可以使用下面这种方式:
app.conf.update(task_serializer='json',accept_content=['json'], # Ignore other contentresult_serializer='json',timezone='Asia/Shanghai',enable_utc=False,
)
'''
2.2、celery_task/async_task.py
# 因为需要用到django中的内容,所以需要配置django环境
import os
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "study_celery.settings")
import django
django.setup()# 导入celery对象app
from celery_task.celery import app
# 导入django自带的发送邮件模块
from django.core.mail import send_mail
import threading
from study_celery import settings
'''
1、没有返回值的,@app.task(ignore_result=True)
2、有返回值的任务,@app.task
'''#没有返回值,禁用掉结果后端
@app.task
def send_email_task(email,code): # 此时可以直接传邮箱,还能减少一次数据库的IO操作''':param email: 接收消息的邮箱,用户的邮箱:return:'''# 启用线程发送邮件,此处最好加线程池t = threading.Thread(target=send_mail,args=("登录前获取的验证码", # 邮件标题'点击该邮件激活你的账号,否则无法登陆', # 给html_message参数传值后,该参数信息失效settings.EMAIL_HOST_USER, # 用于发送邮件的邮箱地址[email], # 接收邮件的邮件地址,可以写多个),# html_message中定义的字符串即HTML格式的信息,可以在一个html文件中写好复制出来放在该字符串中kwargs={'html_message': f"<p></p> <p>验证码:{code}</p>"})t.start()return {'email':email,'code':code}
2.3、celery_task/scheduler_task.py
# 因为需要用到django中的内容,所以需要配置django环境
import os
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "study_celery.settings")
import django
django.setup()from celery_task.celery import app
from apps.user.views import models as user_models
from django.core.cache import cache
import time
from django.forms import model_to_dict#有返回值,返回值可以从结果后端中获取
@app.task
def add_func(a,b):print('执行了加法函数')cache.set('add_ret',{'time':time.strftime('%Y-%m-%d %H:%M:%S'),'ret':a+b})return a+b#不需要返回值,禁用掉结果后端
@app.task(ignore_result=True)
def cache_user_func():user = user_models.UserModel.objects.all()user_dict = {}for obj in user:user_dict[obj.account] = model_to_dict(obj)cache.set('all-user-data',user_dict,timeout=35*60)
2.4、utils/check_task.py
from celery.result import AsyncResult
from celery_task.celery import app
'''验证任务的执行状态的'''def check_task_status(task_id):'''任务的执行状态:PENDING :等待执行STARTED :开始执行RETRY :重新尝试执行SUCCESS :执行成功FAILURE :执行失败:param task_id::return:'''result = AsyncResult(id=task_id, app=app)dic = {'type':result.status,'msg':'','data':'','code':400}if result.status == 'PENDING':dic['msg'] = '任务等待中'elif result.status == 'STARTED':dic['msg'] = '任务开始执行'elif result.status == 'RETRY':dic['msg']='任务重新尝试执行'elif result.status =='FAILURE':dic['msg'] = '任务执行失败了'elif result.status == 'SUCCESS':result = result.get()dic['msg'] = '任务执行成功'dic['data'] = resultdic['code'] = 200# result.forget() # 将结果删除# async.revoke(terminate=True) # 无论现在是什么时候,都要终止# async.revoke(terminate=False) # 如果任务还没有开始执行呢,那么就可以终止。return dic
四、apps/user中配置相关处理视图
4.1、基本配置
1、study_celery/settings.py
#cache缓存
CACHES = {"default": {"BACKEND": "django_redis.cache.RedisCache","LOCATION": "redis://127.0.0.1:6379","OPTIONS": {"CLIENT_CLASS": "django_redis.client.DefaultClient","CONNECTION_POOL_KWARGS": {"max_connections": 100}# "PASSWORD": "123",},'TIMEOUT':30*60 #缓存过期时间}
}#邮件配置
# EMAIL_BACKEND = 'django.core.mail.backends.smtp.EmailBackend'
EMAIL_HOST = 'smtp.qq.com' # 如果是 163 改成 smtp.163.com
EMAIL_PORT = 465
EMAIL_HOST_USER = '2414155342@qq.com' # 发送邮件的邮箱帐号
EMAIL_HOST_PASSWORD = 'qq邮箱的授权码' # 授权码,各邮箱的设置中启用smtp服务时获取
DEFAULT_FROM_EMAIL = EMAIL_HOST_USER
# 这样收到的邮件,收件人处就会这样显示
# DEFAULT_FROM_EMAIL = '<'xxxxx@qq.com>'
EMAIL_USE_SSL = True # 使用ssl
# EMAIL_USE_TLS = False # 使用tls
# EMAIL_USE_SSL 和 EMAIL_USE_TLS 是互斥的,即只能有一个为 True
2、study_celery/urls.py
from django.contrib import admin
from django.urls import path,includeurlpatterns = [path('admin/', admin.site.urls),path('user/',include('apps.user.urls'))
]
3、apps/user/urls.py
from django.urls import path
from . import viewsurlpatterns = []
4.2、user的models
apps/user/models.py
from django.db import models
# Create your models here.class UserModel(models.Model):id = models.AutoField(primary_key=True)name = models.CharField(max_length=32)account = models.CharField(max_length=64)password = models.CharField(max_length=256)email = models.EmailField()
执行数据库迁移命令
python manage.py makemigrations
python manage.py migrate
4.3、user的视图函数
1、apps/user/views.py
from django.contrib.auth.hashers import make_password, check_password
from django.views import View
from django.http import JsonResponse
from . import models
from django.core.cache import cache
import time
from celery_task.async_task import send_email_task
# Create your views here.class ResgiterView(View):#注册用户def post(self,request):name = request.POST.get('name')account = request.POST.get('account')password = request.POST.get('password')email = request.POST.get('email')obj = models.UserModel.objects.filter(account=account).first()if obj:return JsonResponse({'code':400,'msg':'账户已经存在了'})password = make_password(password)instance = models.UserModel.objects.create(name=name,account=account,password=password,email=email)return JsonResponse({'code':200,'msg':'注册用户成功'})class LoginView(View):#用户登录def post(self,request):account = request.POST.get('account')password = request.POST.get('password')code = request.POST.get('code') #验证码email_code = cache.get(f'email_{account}')#发给邮箱的验证码.get(f'email_{account})print(code,email_code)if code and email_code:if code != email_code:return JsonResponse({'code':400,'msg':'验证码错误'})else:return JsonResponse({'code':400,'msg':'请先点击发送邮件获取验证码'})obj = models.UserModel.objects.filter(account=account).first()if not obj:return JsonResponse({'code':400,'msg':'当前用户不存在'})pwd_true = check_password(password,obj.password)response = JsonResponse({'code':200,'msg':'登录成功'})if pwd_true:return responseelse:return JsonResponse({'code':400,'msg':'用户名或密码错误'})class LoginSendEmailView(View):#用户登录前,需要验证码,发送验证码给用户的邮箱def post(self,request):account = request.POST.get('account')email = request.POST.get('email')code = str(time.time())[-5:]cache.set(f'email_{account}',code)print(cache.get(f'email_{account}'))res = send_email_task.delay(email,code)task_id = res.idprint('验证码是',code)return JsonResponse({'code':200,'msg':f'请查看{email}邮箱中是否收到邮件','task_id':task_id})class AllUserDataView(View):#查询cache_user_func定时任务执行时存到cache中的用户数据def get(self,request):key = 'all-user-data'data = cache.get(key)if data:return JsonResponse({'code':200,'data':data})else:return JsonResponse({'code':400,'msg':'没有相关数据'})class AddFuncDataView(View):#查询add_func 定时任务执行时存到cache中的数据def get(self, request):data = cache.get('add_ret')print(data, type(data))return JsonResponse({'code': 200, 'data': data})class UserTaskIdGetDataView(View):def get(self,request):from utils.check_task import check_task_status#检查任务是否成功了,获取任务的返回值task_id = request.GET.get('task_id')if not task_id:return JsonResponse({'code':400,'msg':'没有携带任务id'})ret = check_task_status(task_id)return JsonResponse(ret)
2、apps/user/urls.py
from django.urls import path
from . import viewsurlpatterns = [path('login/',views.LoginView.as_view(),name='user-login'),#登录path('register/', views.ResgiterView.as_view(), name='user-register'),#注册path('login/code/',views.LoginSendEmailView.as_view(),name='user-login-send-email'),#登录验证码path('all/user/data/',views.AllUserDataView.as_view(),name='user-all-user-data'),#获取定时任务cache_user_func缓存到cache中的用户数据path('add/result/',views.AddFuncDataView.as_view(),name='user-add-result'),#获取定时任务add_func缓存到cache的计算结果path('task-id/result/',views.UserTaskIdGetDataView.as_view(),name='user-task-id-data'),#通过任务的task-id获取到任务返回值
]
五、调用函数测试
5.1、启动项目
1、启动django项目
python manage.py runserver
2、启动celery异步
#windows系统
celery -A celery_task worker -l info -P eventlet
#linux系统
celery -A celery_task worker -l info
3、启动celery定时(定时任务也是提交给异步的)
celery -A celery_task beat -l info
5.2、测试项目:Postman接口工具
1、注册:url= /user/register/
2、登录前点击获取验证码:返回任务id, url=/user/login/code/
把task_id=17ee8389-14ab-4da1-b88a-56446afb4493 复制下来,4中可以用到
3、登录:code是发送步骤2中发送给邮箱的验证码 ,url=/user/login/
4、通过任务id,获取到发送邮箱异步任务的返回值,url=/user/task-id/result/
复制2返回值中的task_id,获取该任务的返回值
5、获取定时任务中,add_func缓存在cache中的计算数据
6、获取定时任务中,cache_user_func缓存到cache中的用户数据
总结:
1、异步任务,一般是保存文件、发送邮件、耗时操作等
2、定时任务,定时处理某些数据。
六、报错
1、先去celery.py中查看定时任务的配置,模块的路径是不是有问题,千万不要多了一个空格啥的
2、如果通过任务id获取任务的返回值不成功,看看是不是添加@app.task(ignore_result=True),如果是就去掉这个参数配置
相关文章:

Django框架-使用celery(一):django使用celery的通用配置,不受版本影响
目录 一、依赖包情况 二、项目目录结构 2.1、怎么将django的应用创建到apps包 三、celery的配置 2.1、celery_task/celery.py 2.2、celery_task/async_task.py 2.3、celery_task/scheduler_task.py 2.4、utils/check_task.py 四、apps/user中配置相关处理视图 4.1、基本…...
nvue语法与vue的部分区别
文章目录 1、仅支持flex布局2、字体样式3、高度问题 1、仅支持flex布局 仅支持flex布局。而且默认的是 flex-direction: column; 2、字体样式 字体的样式,必须要写在 text 标签内,才能生效 错误示例: <!-- 错误示例 --> <div cl…...
Java 开发工具 IntelliJ IDEA
1. IntelliJ IDEA 简介 IntelliJ IDEA 是一款出色的 Java 集成开发环境(IDE),提供了丰富的功能和工具,支持多种语言和框架的开发,如 Java、Kotlin、Scala、 Android、Spring、Hibernate 等。IntelliJ IDEA 专注于提高…...

将vsCode 打开的多个文件分行(栏)排列,实现全部显示,便于切换文件
目录 1. 前言 2. 设置VsCode 多文件分行(栏)排列显示 1. 前言 主流编程IDE几乎都有排列切换选择所要查看的文件功能,如下为Visual Studio 2022的该功能界面: 图 1 图 2 当在Visual Studio 2022打开很多文件时,可以按照图1、图2所示找到自…...

java中的同步工具类CountDownLatch
这篇文章主要讲解java中一个比较常用的同步工具类CountDownLatch,不管是在工作还是面试中都比较常见。我们将通过案例来进行讲解分析。 一、定义 CountDownLatch的作用很简单,就是一个或者一组线程在开始执行操作之前,必须要等到其他线程执…...

路由器和交换机的区别
交换机和路由器的区别 交换机实现局域网内点对点通信,路由器实现收集发散,相当于一个猎头实现的中介的功能 路由器属于网络层,可以处理TCP/IP协议,通过IP地址寻址;交换机属于中继层,通过MAC地址寻址(列表)…...

FreeRTOS(动态内存管理)
资料来源于硬件家园:资料汇总 - FreeRTOS实时操作系统课程(多任务管理) 目录 一、动态内存管理介绍 1、heap_1 2、heap_2 3、heap_3 4、heap_4 5、heap_5 二、动态内存总结与应用 1、heap_1 2、heap_4 3、heap_5 三、内存管理编程测试 1、heap_4 2、h…...

IntelliJ IDEA(简称Idea) 基本常用设置及Maven部署---详细介绍
一,Idea是什么? 前言: 众所周知,现在有许多编译工具,如eclipse,pathon, 今天所要学的Idea编译工具 Idea是JetBrains公司开发的一款强大的集成开发环境(IDE),主要用于Java…...

【LeetCode每日一题】——128.最长连续序列
文章目录 一【题目类别】二【题目难度】三【题目编号】四【题目描述】五【题目示例】六【题目提示】七【解题思路】八【时间频度】九【代码实现】十【提交结果】 一【题目类别】 哈希表 二【题目难度】 中等 三【题目编号】 128.最长连续序列 四【题目描述】 给定一个未…...

Redis_缓存1_缓存类型
14.redis缓存 14.1简介 穿透型缓存: 缓存与后端数据交互在一起,对服务端的调用隐藏细节。如果从缓存中可以读到数据,就直接返回,如果读不到,就到数据库中去读取,从数据库中读到数据,也是先更…...
模拟 枚举
分享牛客算法基础精选题单题目打卡!!! 目录 字符串的展开 多项式输出 机器翻译 : 铺地毯 : [NOIP2016]回文日期 字符串的展开 原题链接 : 字符串的展开 思路 : 模拟 代码 : #include<iostream> #include<cstring> #include<algorithm> using na…...

【实操】2023年npm组件库的创建发布流程
2022年的实践为基础,2023年我再建一个组件库【ZUI】。步骤回顾: 2022年的npm组件包的发布删除教程_npm i ant-design/pro-components 怎么删除_啥咕啦呛的博客-CSDN博客 1.在gitee上创建一个项目,相信你是会的 2.创建初始化项目,看吧&#…...
缓存设计的典型方案
缓存设计的典型方案 在使用缓存系统的时候,还需要考虑缓存设计的问题,重点在于缓存失效时的处理和如何更新缓存。 缓存失效是在使用缓存时不得不面对的问题。在业务开发中,缓存失效时由于找不到整个数据,一般会出于容错考虑&#…...
SQL笔记
最近的工作对SQL的应用程度较高,而且写的sql类型基本没怎么涉及过,把用到的几个关键字记录下。 使用环境:达梦数据库 达梦数据库有个特点,他有一个叫模式的说法,在图形化工具里直接点击创建查询窗口,不用像…...

UHPC的疲劳计算——兼论ModelCode2010的适用性
文章目录 0. 背景1、结论及概述2、MC10对于SN曲线的调整(囊括NC、HPC、UHPC)2.1 疲劳失效曲面的构建2.2 新模型的验证 3、MC10对于疲劳设计强度的调整及其背后的原因4. 结语 0. 背景 今年年初,有一位用UHPC做混凝土塔筒的同行告诉我…...
关于elementui的input的autocomplete的使用
项目中需要实现搜索框搜索时能自动提示可选项的功能,elementui的input组件有已经封装好的el-autocomplete可以使用,但是在使用中发现一些问题,记录一下 基础使用 // html部分 <el-autocompletev-model"name":fetch-suggestion…...
即然利用反射机制可以破坏单例模式,有什么方法避免呢?
私有构造方法中添加防止多次实例化的逻辑:在单例类的私有构造方法中,可以添加逻辑来检查是否已经存在实例,如果存在则抛出异常或返回已有的实例。这样即使通过反射创建了新的实例,也能在构造方法中进行拦截。 使用枚举实现单例&a…...

【IDEA问题】下载不了源代码
引出问题 最近不知道怎么打开 IDEA,本想查看源代码,然后点击下载源码,总是报找不到此对象的源代码。百度找了半天,GPT问了半天还是解决不了,直到遇到了这篇:idea中无法下载源码问题解决,终于得…...
代码随想录第四十八天
代码随想录第四十八天 Leetcode 198. 打家劫舍ILeetcode 213. 打家劫舍 IILeetcode 337. 打家劫舍 III Leetcode 198. 打家劫舍I 题目链接: 打家劫舍I 自己的思路:想不太出来递推公式!!!! 正确思路:这个题主要是看是否偷第下标为…...

书写自动智慧:探索Python文本分类器的开发与应用:支持二分类、多分类、多标签分类、多层级分类和Kmeans聚类
书写自动智慧:探索Python文本分类器的开发与应用:支持二分类、多分类、多标签分类、多层级分类和Kmeans聚类 文本分类器,提供多种文本分类和聚类算法,支持句子和文档级的文本分类任务,支持二分类、多分类、多标签分类…...
Cursor实现用excel数据填充word模版的方法
cursor主页:https://www.cursor.com/ 任务目标:把excel格式的数据里的单元格,按照某一个固定模版填充到word中 文章目录 注意事项逐步生成程序1. 确定格式2. 调试程序 注意事项 直接给一个excel文件和最终呈现的word文件的示例,…...

关于nvm与node.js
1 安装nvm 安装过程中手动修改 nvm的安装路径, 以及修改 通过nvm安装node后正在使用的node的存放目录【这句话可能难以理解,但接着往下看你就了然了】 2 修改nvm中settings.txt文件配置 nvm安装成功后,通常在该文件中会出现以下配置&…...
1688商品列表API与其他数据源的对接思路
将1688商品列表API与其他数据源对接时,需结合业务场景设计数据流转链路,重点关注数据格式兼容性、接口调用频率控制及数据一致性维护。以下是具体对接思路及关键技术点: 一、核心对接场景与目标 商品数据同步 场景:将1688商品信息…...
c++ 面试题(1)-----深度优先搜索(DFS)实现
操作系统:ubuntu22.04 IDE:Visual Studio Code 编程语言:C11 题目描述 地上有一个 m 行 n 列的方格,从坐标 [0,0] 起始。一个机器人可以从某一格移动到上下左右四个格子,但不能进入行坐标和列坐标的数位之和大于 k 的格子。 例…...

跨链模式:多链互操作架构与性能扩展方案
跨链模式:多链互操作架构与性能扩展方案 ——构建下一代区块链互联网的技术基石 一、跨链架构的核心范式演进 1. 分层协议栈:模块化解耦设计 现代跨链系统采用分层协议栈实现灵活扩展(H2Cross架构): 适配层…...

永磁同步电机无速度算法--基于卡尔曼滤波器的滑模观测器
一、原理介绍 传统滑模观测器采用如下结构: 传统SMO中LPF会带来相位延迟和幅值衰减,并且需要额外的相位补偿。 采用扩展卡尔曼滤波器代替常用低通滤波器(LPF),可以去除高次谐波,并且不用相位补偿就可以获得一个误差较小的转子位…...
Docker拉取MySQL后数据库连接失败的解决方案
在使用Docker部署MySQL时,拉取并启动容器后,有时可能会遇到数据库连接失败的问题。这种问题可能由多种原因导致,包括配置错误、网络设置问题、权限问题等。本文将分析可能的原因,并提供解决方案。 一、确认MySQL容器的运行状态 …...

【若依】框架项目部署笔记
参考【SpringBoot】【Vue】项目部署_no main manifest attribute, in springboot-0.0.1-sn-CSDN博客 多一个redis安装 准备工作: 压缩包下载:http://download.redis.io/releases 1. 上传压缩包,并进入压缩包所在目录,解压到目标…...

边缘计算网关提升水产养殖尾水处理的远程运维效率
一、项目背景 随着水产养殖行业的快速发展,养殖尾水的处理成为了一个亟待解决的环保问题。传统的尾水处理方式不仅效率低下,而且难以实现精准监控和管理。为了提升尾水处理的效果和效率,同时降低人力成本,某大型水产养殖企业决定…...
Vue 实例的数据对象详解
Vue 实例的数据对象详解 在 Vue 中,数据对象是响应式系统的核心,也是组件状态的载体。理解数据对象的原理和使用方式是成为 Vue 专家的关键一步。我将从多个维度深入剖析 Vue 实例的数据对象。 一、数据对象的定义方式 1. Options API 中的定义 在 Options API 中,使用 …...