每天40分玩转Django:Django Celery
Django Celery
一、知识要点概览表
模块 | 知识点 | 掌握程度要求 |
---|---|---|
Celery基础 | 配置、任务定义、任务执行 | 深入理解 |
异步任务 | 任务状态、结果存储、错误处理 | 熟练应用 |
周期任务 | 定时任务、Crontab、任务调度 | 熟练应用 |
监控管理 | Flower、任务监控、性能优化 | 理解应用 |
二、基础配置实现
1. 安装和配置
# requirements.txt
celery==5.3.1
django-celery-results==2.5.1
django-celery-beat==2.5.0
redis==4.6.0# settings.py
INSTALLED_APPS = [...'django_celery_results','django_celery_beat',
]# Celery配置
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'django-db'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Asia/Shanghai'# celery.py
import os
from celery import Celeryos.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')app = Celery('myproject')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()@app.task(bind=True)
def debug_task(self):print(f'Request: {self.request!r}')
三、异步任务实现
1. 基本任务定义
# tasks.py
from celery import shared_task
from django.core.mail import send_mail
from .models import User@shared_task
def send_welcome_email(user_id):"""发送欢迎邮件"""try:user = User.objects.get(id=user_id)send_mail('欢迎加入我们的平台',f'你好 {user.username},欢迎使用我们的服务!','noreply@example.com',[user.email],fail_silently=False,)return Trueexcept Exception as e:return str(e)@shared_task(bind=True, max_retries=3)
def process_payment(self, order_id):"""处理支付任务"""from .models import Ordertry:order = Order.objects.get(id=order_id)result = process_payment_gateway(order)if result.success:order.status = 'paid'order.save()return 'Payment processed successfully'else:raise ValueError('Payment failed')except Exception as exc:self.retry(exc=exc, countdown=60*5) # 5分钟后重试
2. 任务链和组
from celery import chain, group, chord
from .tasks import process_payment, send_notification, update_inventorydef process_order(order):# 任务链:按顺序执行任务task_chain = chain(process_payment.s(order.id),update_inventory.s(order.id),send_notification.s(order.user.id))return task_chain()def process_bulk_orders(orders):# 任务组:并行执行多个任务task_group = group(process_payment.s(order.id)for order in orders)return task_group()def process_orders_with_summary(orders):# 和弦:并行执行任务后执行回调def on_complete(results):successful = sum(1 for r in results if r == 'success')failed = len(results) - successfulreturn f"处理完成:{successful}成功,{failed}失败"task_chord = chord((process_payment.s(order.id) for order in orders),on_complete.s())return task_chord()
3. 自定义任务类
from celery import Task
from django.core.cache import cacheclass BaseTaskWithRetry(Task):abstract = Truemax_retries = 3default_retry_delay = 60 # 60秒后重试def on_failure(self, exc, task_id, args, kwargs, einfo):"""任务失败时的处理"""print(f'Task {task_id} failed: {str(exc)}')super().on_failure(exc, task_id, args, kwargs, einfo)def on_retry(self, exc, task_id, args, kwargs, einfo):"""任务重试时的处理"""print(f'Task {task_id} retrying: {str(exc)}')super().on_retry(exc, task_id, args, kwargs, einfo)@shared_task(base=BaseTaskWithRetry)
def process_data(data_id):try:# 处理数据的逻辑result = process_complex_data(data_id)return resultexcept Exception as exc:raise self.retry(exc=exc)
四、周期任务实现
1. 基本周期任务
# tasks.py
from celery.schedules import crontab
from celery.task import periodic_task@periodic_task(run_every=timedelta(hours=24))
def daily_cleanup():"""每日清理任务"""cleanup_expired_tokens()cleanup_old_logs()return "Daily cleanup completed"@periodic_task(run_every=crontab(hour=0, minute=0),name="generate_daily_report"
)
def generate_daily_report():"""生成每日报告"""report = Report.objects.create(date=timezone.now(),type='daily')report.generate()return f"Report generated: {report.id}"
2. 动态周期任务
# models.py
from django_celery_beat.models import PeriodicTask, IntervalSchedule
from django.db import modelsclass ScheduledReport(models.Model):name = models.CharField(max_length=100)interval = models.IntegerField(help_text='间隔(分钟)')enabled = models.BooleanField(default=True)def save(self, *args, **kwargs):super().save(*args, **kwargs)self.update_periodic_task()def update_periodic_task(self):schedule, _ = IntervalSchedule.objects.get_or_create(every=self.interval,period=IntervalSchedule.MINUTES,)PeriodicTask.objects.update_or_create(name=f'generate_report_{self.id}',defaults={'task': 'myapp.tasks.generate_report','interval': schedule,'args': [self.id],'enabled': self.enabled})
3. 任务调度器
from django_celery_beat.models import CrontabSchedule, PeriodicTask
import jsondef schedule_report_task(name, hour, minute, day_of_week):"""创建定时报告任务"""schedule, _ = CrontabSchedule.objects.get_or_create(hour=hour,minute=minute,day_of_week=day_of_week,)task = PeriodicTask.objects.create(crontab=schedule,name=f'generate_report_{name}',task='myapp.tasks.generate_report',args=json.dumps([name]),)return task# 使用示例
schedule_report_task('weekly_summary', hour=0, minute=0, day_of_week='1') # 每周一
五、监控和管理
1. Flower配置
# requirements.txt
flower==2.0.1# settings.py
CELERY_FLOWER_USER = 'admin'
CELERY_FLOWER_PASSWORD = 'password'# 启动Flower
# celery -A myproject flower --port=5555
2. 任务监控中间件
# middleware.py
from celery.signals import task_prerun, task_postrun
import time
import logginglogger = logging.getLogger('celery.tasks')@task_prerun.connect
def task_prerun_handler(task_id, task, args, kwargs, **kw):"""任务执行前的处理"""task.start_time = time.time()@task_postrun.connect
def task_postrun_handler(task_id, task, args, kwargs, retval, state, **kw):"""任务执行后的处理"""if hasattr(task, 'start_time'):duration = time.time() - task.start_timelogger.info(f'Task {task.name}[{task_id}] 'f'completed in {duration:.2f}s with state {state}')
六、Celery工作流程图
七、实际应用示例
1. 图片处理任务
# tasks.py
from PIL import Image
import os
from celery import shared_task@shared_task
def process_uploaded_image(image_path, sizes=[(800, 600), (400, 300)]):"""处理上传的图片"""try:img = Image.open(image_path)filename = os.path.basename(image_path)name, ext = os.path.splitext(filename)results = []for size in sizes:resized = img.copy()resized.thumbnail(size)new_filename = f"{name}_{size[0]}x{size[1]}{ext}"new_path = os.path.join(os.path.dirname(image_path), new_filename)resized.save(new_path)results.append(new_path)return resultsexcept Exception as e:return str(e)
2. 站点监控任务
# tasks.py
import requests
from celery import shared_task
from django.core.mail import mail_admins
from .models import SiteMonitor@shared_task(bind=True, max_retries=3)
def monitor_website(self, url):"""监控网站可用性"""try:response = requests.get(url, timeout=10)status = response.status_code == 200response_time = response.elapsed.total_seconds()SiteMonitor.objects.create(url=url,status=status,response_time=response_time)if not status:mail_admins(f'网站{url}不可用',f'状态码:{response.status_code}')return {'url': url,'status': status,'response_time': response_time}except Exception as exc:self.retry(exc=exc, countdown=60)
八、最佳实践建议
-
任务设计原则:
- 保持任务原子性
- 实现幂等性
- 合理设置超时时间
- 添加适当的重试机制
-
性能优化:
- 使用合适的序列化方式
- 控制任务粒度
- 合理设置并发数
- 监控任务执行情况
-
错误处理:
- 完善的异常捕获
- 详细的日志记录
- 合适的重试策略
- 失败通知机制
这就是关于Django Celery的详细内容,包括异步任务队列和周期任务的实现。通过实践这些内容,你将能够在Django项目中熟练使用Celery处理异步任务。如果有任何问题,欢迎随时提出!
怎么样今天的内容还满意吗?再次感谢朋友们的观看,关注GZH:凡人的AI工具箱,回复666,送您价值199的AI大礼包。最后,祝您早日实现财务自由,还请给个赞,谢谢!
相关文章:

每天40分玩转Django:Django Celery
Django Celery 一、知识要点概览表 模块知识点掌握程度要求Celery基础配置、任务定义、任务执行深入理解异步任务任务状态、结果存储、错误处理熟练应用周期任务定时任务、Crontab、任务调度熟练应用监控管理Flower、任务监控、性能优化理解应用 二、基础配置实现 1. 安装和…...

df.groupby(pd.Grouper(level=1)).sum()
df.groupby(pd.Grouper(level1)).sum() 在 Python 中的作用是根据 DataFrame 的某一索引级别进行分组,并计算每个分组的总和。具体来说: df.groupby(...):这是 pandas 的分组操作,按照指定的规则将 DataFrame 分组。 pd.Grouper(…...
运动控制探针功能详细介绍(CODESYS+SV63N伺服)
汇川AM400PLC和禾川X3E伺服EtherCAT通信 汇川AM400PLC和禾川X3E伺服EtherCAT通信_汇川ethercat通信-CSDN博客文章浏览阅读1.2k次。本文详细介绍了如何使用汇川AM400PLC通过EtherCAT总线与禾川X3E伺服进行通信。包括XML硬件描述文件的下载与安装,EtherCAT总线的启用,从站添加…...

C语言基础18(GDB调试)
文章目录 GDBGDB概述什么是GDB**GDB**的主要功能 GDB的启动GDB常见的启动方式 GDB的退出GDB的常用命令GDB查看源代码指令———list(1)**GDB** 查看设置**------info****GDB** 查看内存**GDB** 设置断点**---break (b)****GDB** 设置观察点**---watch****GDB** 程序调试 GDB完整…...

《向量数据库指南》——应对ElasticSearch挑战,拥抱Mlivus Cloud的新时代
在当今数据驱动的商业环境中,向量数据库的应用正变得愈加重要。随着人工智能和机器学习的快速发展,尤其是在自然语言处理、图像识别及推荐系统等领域,向量数据库以其强大的存储和检索能力,迎来了广泛的应用机会。然而,在实际应用中,企业在选择和实施向量数据库方案时,常…...

c++的stl库中stack的解析和模拟实现
目录 1.stack的介绍和使用 1.1stack的介绍 1.2stack的使用 2.stack的模拟实现 1.stack的介绍和使用 1.1stack的介绍 1. stack 是一种容器适配器,专门用在具有后进先出操作的上下文环境中,其删除只能从容器的一端进行元素的插入与提取操作。 2. stac…...

C语言——字符函数和内存函数
目录 前言 字符函数 1strlen 模拟实现 2strcpy 模拟实现 3strcat 模拟实现 4strcmp 模拟实现 5strncpy 模拟实现 6strncat 模拟实现 7strncmp 模拟实现 8strstr 模拟实现 9strtok 10strerror 11大小写字符转换函数 内存函数 1memcpy 模拟实现 2…...

查询docker overlay2文件夹下的 c7ffc13c49xxx是哪一个容器使用的
问题背景 查询docker overlay2文件夹下的 c7ffc13c49xxx是哪一个容器使用的 [root@lnops overlay2]# du -sh * | grep G 1.7G 30046eca3e838e43d16d9febc63cc8f8bb3d327b4c9839ca791b3ddfa845e12e 435G c7ffc13c49a43f08ef9e234c6ef9fc5a3692deda3c5d42149d0070e9d8124f71 1.…...

Golang的容器编排实践
Golang的容器编排实践 一、Golang中的容器编排概述 作为一种高效的编程语言,其在容器编排领域也有着广泛的运用。容器编排是指利用自动化工具对容器化的应用进行部署、管理和扩展的过程,典型的容器编排工具包括Docker Swarm、Kubernetes等。在Golang中&a…...

【51项目】51单片机自制小霸王游戏机
视频演示效果: 纳新作品——小霸王游戏机 目录: 目录 视频演示效果: 目录: 前言:...

ArkTs之NAPI学习
1.Node-api组成架构 为了应对日常开发经的网络通信、串口访问、多媒体解码、传感器数据收集等模块,这些模块大多数是使用c接口实现的,arkts侧如果想使用这些能力,就需要使用node-api这样一套接口去桥接c代码。Node-api整体的架构图如下&…...

【数据库初阶】MySQL中表的约束(上)
🎉博主首页: 有趣的中国人 🎉专栏首页: 数据库初阶 🎉其它专栏: C初阶 | C进阶 | 初阶数据结构 亲爱的小伙伴们,大家好!在这篇文章中,我们将深入浅出地为大家讲解 MySQL…...

173. 矩阵距离 acwing -多路BFS
原题链接:173. 矩阵距离 - AcWing题库 给定一个 N行 M 列的 01矩阵 A,A[i][j] 与 A[k][l]]之间的曼哈顿距离定义为: dist(i,j,k,l)|i−k||j−l|| 输出一个 N 行 M 列的整数矩阵 B,其中: B[i][j]min1≤x≤N,1≤y≤M,A…...

Linux下部署Redis集群 - 一主二从三哨兵模式
三台服务器redis一主二从三哨兵模式搭建 最近使用到了redis集群部署,使用一主二从三哨兵集群部署redis,将自己部署的过程中的使用心得分享给大家,希望大家以后部署的过程减少一些坑。 服务器准备 3台服务器 ,确定主redis和从red…...

实战设计模式之建造者模式
概述 在实际项目中,我们有时会遇到需要创建复杂对象的情况。这些对象可能包含多个组件或属性,而且每个组件都有自己的配置选项。如果直接使用构造函数或前面介绍的工厂方法来创建这样的对象,可能会导致以下两个严重问题。 1、参数过多。当一个…...

活动预告 | Microsoft Azure 在线技术公开课:使用 Azure OpenAI 服务构建生成式应用
课程介绍 通过 Microsoft Learn 免费参加 Microsoft Azure 在线技术公开课,掌握创造新机遇所需的技能,加快对 Microsoft Cloud 技术的了解。参加我们举办的“使用 Azure OpenAI 服务构建生成式应用”活动,了解如何使用包括 GPT 在内的强大的…...

ubuntu安装firefox
firefox下载地址:https://ftp.mozilla.org/pub/firefox/releases/ 卸载 sudo apt-get update dpkg --get-selections |grep firefox apt-get purge firefox 解压 tar -xjf firefox*.tar.bz2复制文件 sudo mv firefox/ /opt/firefox30sudo mv /usr/bin/firefox /…...

计算机网络原理(谢希仁第八版)第4章课后习题答案
第四章 网络层 详细计算机网络(谢希仁-第八版)第四章习题全解_计算机网络第八版谢希仁课后答案-CSDN博客 1.网络层向上提供的服务有哪两种?是比较其优缺点。网络层向运输层提供 “面向连接”虚电路(Virtual Circuit)服…...

RabbitMQ-基本使用
RabbitMQ: One broker to queue them all | RabbitMQ 官方 安装到Docker中 docker run \-e RABBITMQ_DEFAULT_USERrabbit \-e RABBITMQ_DEFAULT_PASSrabbit \-v mq-plugins:/plugins \--name mq \--hostname mq \-p 15672:15672 \-p 5672:5672 \--network mynet\-d \rabbitmq:3…...

从零开始学架构——互联网架构的演进
1 技术演进 1.1 技术演进的动力 对于新技术,我们应该站在行业的角度上思考,哪些技术我们要采取,哪些技术我们不能用,投入成本过大会不会导致满盘皆输?市场、技术、管理三者组成的业务发展铁三角,任何一个…...

python +tkinter绘制彩虹和云朵
python tkinter绘制彩虹和云朵 彩虹,简称虹,是气象中的一种光学现象,当太阳光照射到半空中的水滴,光线被折射及反射,在天空上形成拱形的七彩光谱,由外圈至内圈呈红、橙、黄、绿、蓝、靛、紫七种颜色。事实…...

重新整理机器学习和神经网络框架
本篇重新梳理了人工智能(AI)、机器学习(ML)、神经网络(NN)和深度学习(DL)之间存在一定的包含关系,以下是它们的关系及各自内容,以及人工智能领域中深度学习分支对比整理。…...

TypyScript从入门到精通
TypyScript从入门到精通 TypyScript 是什么?增加了什么环境搭建二、为何需要 TypeScript三、编译 TypeScript四、类型声明五、类型推断基本类型六、类型总览JavaScript 中的数据类型TypeScript 中的数据类型1. 上述所有 JavaScript 类型2. 六个新类型:3.…...

【MATLAB】绘制投资组合的有效前沿
文章目录 一、数据准备二、有效前沿三、代码3.1 数据批量读取、预处理3.2 绘制可行集3.3 绘制有效前沿3.4 其它-最大夏普率 一、数据准备 准备多个股票的的历史数据,目的就是找到最优的投资组合。 下载几个标普500里面的公式的股票数据吧,下载方法也可…...

matlab时频分析库
time frequency gallery...

GBase 8s 数据库备份还原
每一天都是一个新的篇章,等待着你去书写属于自己的故事!!! 一:备份 1.1.下载脚本文件,并上传到数据库服务器上相应目录。 解压后目录为: 说明: dbcomm.sh:导出注释脚本…...

C++模板相关概念汇总
文章目录 一、模板的概念与作用二、函数模板模板的非类型参数调用顺序 三、类模板四、模板的编译模型 一、模板的概念与作用 C模板是一种强大的代码复用机制,它允许程序员编写通用的代码,能够处理不同类型的数据,而无需为每种类型都重复编写…...

MYSQL------sql基础
SQL基础与简介 定义:SQL即结构化查询语言(Structured Query Language),是一种特殊目的的编程语言,用于存取数据以及查询、更新和管理关系数据库系统。作用:可以用于数据库的创建、数据的插入、查询、更新和…...

React Router 用法概览
React Router 用法 React 使得开发者能够轻松地创建交互式的单页应用(SPA),单页应用的一个常见挑战是如何处理页面导航和路由吗,React Router 就是解决这个问题的工具 路由(Router)是 React Router 的核心…...

网络安全之高防IP的实时监控精准防护
高防IP是一种网络安全设备,用于保护网络服务不受到各类攻击的影响,确保业务的持续稳定运行。它通过监控、识别和封锁恶意攻击流量,提供高级别的防护,降低业务被攻击的风险,并提升网络的稳定性和可靠性。 一、实时监控的…...