每天40分玩转Django:Django Celery
Django Celery
一、知识要点概览表
| 模块 | 知识点 | 掌握程度要求 |
|---|---|---|
| Celery基础 | 配置、任务定义、任务执行 | 深入理解 |
| 异步任务 | 任务状态、结果存储、错误处理 | 熟练应用 |
| 周期任务 | 定时任务、Crontab、任务调度 | 熟练应用 |
| 监控管理 | Flower、任务监控、性能优化 | 理解应用 |
二、基础配置实现
1. 安装和配置
# requirements.txt
celery==5.3.1
django-celery-results==2.5.1
django-celery-beat==2.5.0
redis==4.6.0# settings.py
INSTALLED_APPS = [...'django_celery_results','django_celery_beat',
]# Celery配置
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'django-db'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Asia/Shanghai'# celery.py
import os
from celery import Celeryos.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')app = Celery('myproject')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()@app.task(bind=True)
def debug_task(self):print(f'Request: {self.request!r}')
三、异步任务实现
1. 基本任务定义
# tasks.py
from celery import shared_task
from django.core.mail import send_mail
from .models import User@shared_task
def send_welcome_email(user_id):"""发送欢迎邮件"""try:user = User.objects.get(id=user_id)send_mail('欢迎加入我们的平台',f'你好 {user.username},欢迎使用我们的服务!','noreply@example.com',[user.email],fail_silently=False,)return Trueexcept Exception as e:return str(e)@shared_task(bind=True, max_retries=3)
def process_payment(self, order_id):"""处理支付任务"""from .models import Ordertry:order = Order.objects.get(id=order_id)result = process_payment_gateway(order)if result.success:order.status = 'paid'order.save()return 'Payment processed successfully'else:raise ValueError('Payment failed')except Exception as exc:self.retry(exc=exc, countdown=60*5) # 5分钟后重试
2. 任务链和组
from celery import chain, group, chord
from .tasks import process_payment, send_notification, update_inventorydef process_order(order):# 任务链:按顺序执行任务task_chain = chain(process_payment.s(order.id),update_inventory.s(order.id),send_notification.s(order.user.id))return task_chain()def process_bulk_orders(orders):# 任务组:并行执行多个任务task_group = group(process_payment.s(order.id)for order in orders)return task_group()def process_orders_with_summary(orders):# 和弦:并行执行任务后执行回调def on_complete(results):successful = sum(1 for r in results if r == 'success')failed = len(results) - successfulreturn f"处理完成:{successful}成功,{failed}失败"task_chord = chord((process_payment.s(order.id) for order in orders),on_complete.s())return task_chord()
3. 自定义任务类
from celery import Task
from django.core.cache import cacheclass BaseTaskWithRetry(Task):abstract = Truemax_retries = 3default_retry_delay = 60 # 60秒后重试def on_failure(self, exc, task_id, args, kwargs, einfo):"""任务失败时的处理"""print(f'Task {task_id} failed: {str(exc)}')super().on_failure(exc, task_id, args, kwargs, einfo)def on_retry(self, exc, task_id, args, kwargs, einfo):"""任务重试时的处理"""print(f'Task {task_id} retrying: {str(exc)}')super().on_retry(exc, task_id, args, kwargs, einfo)@shared_task(base=BaseTaskWithRetry)
def process_data(data_id):try:# 处理数据的逻辑result = process_complex_data(data_id)return resultexcept Exception as exc:raise self.retry(exc=exc)
四、周期任务实现
1. 基本周期任务
# tasks.py
from celery.schedules import crontab
from celery.task import periodic_task@periodic_task(run_every=timedelta(hours=24))
def daily_cleanup():"""每日清理任务"""cleanup_expired_tokens()cleanup_old_logs()return "Daily cleanup completed"@periodic_task(run_every=crontab(hour=0, minute=0),name="generate_daily_report"
)
def generate_daily_report():"""生成每日报告"""report = Report.objects.create(date=timezone.now(),type='daily')report.generate()return f"Report generated: {report.id}"
2. 动态周期任务
# models.py
from django_celery_beat.models import PeriodicTask, IntervalSchedule
from django.db import modelsclass ScheduledReport(models.Model):name = models.CharField(max_length=100)interval = models.IntegerField(help_text='间隔(分钟)')enabled = models.BooleanField(default=True)def save(self, *args, **kwargs):super().save(*args, **kwargs)self.update_periodic_task()def update_periodic_task(self):schedule, _ = IntervalSchedule.objects.get_or_create(every=self.interval,period=IntervalSchedule.MINUTES,)PeriodicTask.objects.update_or_create(name=f'generate_report_{self.id}',defaults={'task': 'myapp.tasks.generate_report','interval': schedule,'args': [self.id],'enabled': self.enabled})
3. 任务调度器
from django_celery_beat.models import CrontabSchedule, PeriodicTask
import jsondef schedule_report_task(name, hour, minute, day_of_week):"""创建定时报告任务"""schedule, _ = CrontabSchedule.objects.get_or_create(hour=hour,minute=minute,day_of_week=day_of_week,)task = PeriodicTask.objects.create(crontab=schedule,name=f'generate_report_{name}',task='myapp.tasks.generate_report',args=json.dumps([name]),)return task# 使用示例
schedule_report_task('weekly_summary', hour=0, minute=0, day_of_week='1') # 每周一
五、监控和管理
1. Flower配置
# requirements.txt
flower==2.0.1# settings.py
CELERY_FLOWER_USER = 'admin'
CELERY_FLOWER_PASSWORD = 'password'# 启动Flower
# celery -A myproject flower --port=5555
2. 任务监控中间件
# middleware.py
from celery.signals import task_prerun, task_postrun
import time
import logginglogger = logging.getLogger('celery.tasks')@task_prerun.connect
def task_prerun_handler(task_id, task, args, kwargs, **kw):"""任务执行前的处理"""task.start_time = time.time()@task_postrun.connect
def task_postrun_handler(task_id, task, args, kwargs, retval, state, **kw):"""任务执行后的处理"""if hasattr(task, 'start_time'):duration = time.time() - task.start_timelogger.info(f'Task {task.name}[{task_id}] 'f'completed in {duration:.2f}s with state {state}')
六、Celery工作流程图

七、实际应用示例
1. 图片处理任务
# tasks.py
from PIL import Image
import os
from celery import shared_task@shared_task
def process_uploaded_image(image_path, sizes=[(800, 600), (400, 300)]):"""处理上传的图片"""try:img = Image.open(image_path)filename = os.path.basename(image_path)name, ext = os.path.splitext(filename)results = []for size in sizes:resized = img.copy()resized.thumbnail(size)new_filename = f"{name}_{size[0]}x{size[1]}{ext}"new_path = os.path.join(os.path.dirname(image_path), new_filename)resized.save(new_path)results.append(new_path)return resultsexcept Exception as e:return str(e)
2. 站点监控任务
# tasks.py
import requests
from celery import shared_task
from django.core.mail import mail_admins
from .models import SiteMonitor@shared_task(bind=True, max_retries=3)
def monitor_website(self, url):"""监控网站可用性"""try:response = requests.get(url, timeout=10)status = response.status_code == 200response_time = response.elapsed.total_seconds()SiteMonitor.objects.create(url=url,status=status,response_time=response_time)if not status:mail_admins(f'网站{url}不可用',f'状态码:{response.status_code}')return {'url': url,'status': status,'response_time': response_time}except Exception as exc:self.retry(exc=exc, countdown=60)
八、最佳实践建议
-
任务设计原则:
- 保持任务原子性
- 实现幂等性
- 合理设置超时时间
- 添加适当的重试机制
-
性能优化:
- 使用合适的序列化方式
- 控制任务粒度
- 合理设置并发数
- 监控任务执行情况
-
错误处理:
- 完善的异常捕获
- 详细的日志记录
- 合适的重试策略
- 失败通知机制
这就是关于Django Celery的详细内容,包括异步任务队列和周期任务的实现。通过实践这些内容,你将能够在Django项目中熟练使用Celery处理异步任务。如果有任何问题,欢迎随时提出!
怎么样今天的内容还满意吗?再次感谢朋友们的观看,关注GZH:凡人的AI工具箱,回复666,送您价值199的AI大礼包。最后,祝您早日实现财务自由,还请给个赞,谢谢!
相关文章:
每天40分玩转Django:Django Celery
Django Celery 一、知识要点概览表 模块知识点掌握程度要求Celery基础配置、任务定义、任务执行深入理解异步任务任务状态、结果存储、错误处理熟练应用周期任务定时任务、Crontab、任务调度熟练应用监控管理Flower、任务监控、性能优化理解应用 二、基础配置实现 1. 安装和…...
df.groupby(pd.Grouper(level=1)).sum()
df.groupby(pd.Grouper(level1)).sum() 在 Python 中的作用是根据 DataFrame 的某一索引级别进行分组,并计算每个分组的总和。具体来说: df.groupby(...):这是 pandas 的分组操作,按照指定的规则将 DataFrame 分组。 pd.Grouper(…...
运动控制探针功能详细介绍(CODESYS+SV63N伺服)
汇川AM400PLC和禾川X3E伺服EtherCAT通信 汇川AM400PLC和禾川X3E伺服EtherCAT通信_汇川ethercat通信-CSDN博客文章浏览阅读1.2k次。本文详细介绍了如何使用汇川AM400PLC通过EtherCAT总线与禾川X3E伺服进行通信。包括XML硬件描述文件的下载与安装,EtherCAT总线的启用,从站添加…...
C语言基础18(GDB调试)
文章目录 GDBGDB概述什么是GDB**GDB**的主要功能 GDB的启动GDB常见的启动方式 GDB的退出GDB的常用命令GDB查看源代码指令———list(1)**GDB** 查看设置**------info****GDB** 查看内存**GDB** 设置断点**---break (b)****GDB** 设置观察点**---watch****GDB** 程序调试 GDB完整…...
《向量数据库指南》——应对ElasticSearch挑战,拥抱Mlivus Cloud的新时代
在当今数据驱动的商业环境中,向量数据库的应用正变得愈加重要。随着人工智能和机器学习的快速发展,尤其是在自然语言处理、图像识别及推荐系统等领域,向量数据库以其强大的存储和检索能力,迎来了广泛的应用机会。然而,在实际应用中,企业在选择和实施向量数据库方案时,常…...
c++的stl库中stack的解析和模拟实现
目录 1.stack的介绍和使用 1.1stack的介绍 1.2stack的使用 2.stack的模拟实现 1.stack的介绍和使用 1.1stack的介绍 1. stack 是一种容器适配器,专门用在具有后进先出操作的上下文环境中,其删除只能从容器的一端进行元素的插入与提取操作。 2. stac…...
C语言——字符函数和内存函数
目录 前言 字符函数 1strlen 模拟实现 2strcpy 模拟实现 3strcat 模拟实现 4strcmp 模拟实现 5strncpy 模拟实现 6strncat 模拟实现 7strncmp 模拟实现 8strstr 模拟实现 9strtok 10strerror 11大小写字符转换函数 内存函数 1memcpy 模拟实现 2…...
查询docker overlay2文件夹下的 c7ffc13c49xxx是哪一个容器使用的
问题背景 查询docker overlay2文件夹下的 c7ffc13c49xxx是哪一个容器使用的 [root@lnops overlay2]# du -sh * | grep G 1.7G 30046eca3e838e43d16d9febc63cc8f8bb3d327b4c9839ca791b3ddfa845e12e 435G c7ffc13c49a43f08ef9e234c6ef9fc5a3692deda3c5d42149d0070e9d8124f71 1.…...
Golang的容器编排实践
Golang的容器编排实践 一、Golang中的容器编排概述 作为一种高效的编程语言,其在容器编排领域也有着广泛的运用。容器编排是指利用自动化工具对容器化的应用进行部署、管理和扩展的过程,典型的容器编排工具包括Docker Swarm、Kubernetes等。在Golang中&a…...
【51项目】51单片机自制小霸王游戏机
视频演示效果: 纳新作品——小霸王游戏机 目录: 目录 视频演示效果: 目录: 前言:...
ArkTs之NAPI学习
1.Node-api组成架构 为了应对日常开发经的网络通信、串口访问、多媒体解码、传感器数据收集等模块,这些模块大多数是使用c接口实现的,arkts侧如果想使用这些能力,就需要使用node-api这样一套接口去桥接c代码。Node-api整体的架构图如下&…...
【数据库初阶】MySQL中表的约束(上)
🎉博主首页: 有趣的中国人 🎉专栏首页: 数据库初阶 🎉其它专栏: C初阶 | C进阶 | 初阶数据结构 亲爱的小伙伴们,大家好!在这篇文章中,我们将深入浅出地为大家讲解 MySQL…...
173. 矩阵距离 acwing -多路BFS
原题链接:173. 矩阵距离 - AcWing题库 给定一个 N行 M 列的 01矩阵 A,A[i][j] 与 A[k][l]]之间的曼哈顿距离定义为: dist(i,j,k,l)|i−k||j−l|| 输出一个 N 行 M 列的整数矩阵 B,其中: B[i][j]min1≤x≤N,1≤y≤M,A…...
Linux下部署Redis集群 - 一主二从三哨兵模式
三台服务器redis一主二从三哨兵模式搭建 最近使用到了redis集群部署,使用一主二从三哨兵集群部署redis,将自己部署的过程中的使用心得分享给大家,希望大家以后部署的过程减少一些坑。 服务器准备 3台服务器 ,确定主redis和从red…...
实战设计模式之建造者模式
概述 在实际项目中,我们有时会遇到需要创建复杂对象的情况。这些对象可能包含多个组件或属性,而且每个组件都有自己的配置选项。如果直接使用构造函数或前面介绍的工厂方法来创建这样的对象,可能会导致以下两个严重问题。 1、参数过多。当一个…...
活动预告 | Microsoft Azure 在线技术公开课:使用 Azure OpenAI 服务构建生成式应用
课程介绍 通过 Microsoft Learn 免费参加 Microsoft Azure 在线技术公开课,掌握创造新机遇所需的技能,加快对 Microsoft Cloud 技术的了解。参加我们举办的“使用 Azure OpenAI 服务构建生成式应用”活动,了解如何使用包括 GPT 在内的强大的…...
ubuntu安装firefox
firefox下载地址:https://ftp.mozilla.org/pub/firefox/releases/ 卸载 sudo apt-get update dpkg --get-selections |grep firefox apt-get purge firefox 解压 tar -xjf firefox*.tar.bz2复制文件 sudo mv firefox/ /opt/firefox30sudo mv /usr/bin/firefox /…...
计算机网络原理(谢希仁第八版)第4章课后习题答案
第四章 网络层 详细计算机网络(谢希仁-第八版)第四章习题全解_计算机网络第八版谢希仁课后答案-CSDN博客 1.网络层向上提供的服务有哪两种?是比较其优缺点。网络层向运输层提供 “面向连接”虚电路(Virtual Circuit)服…...
RabbitMQ-基本使用
RabbitMQ: One broker to queue them all | RabbitMQ 官方 安装到Docker中 docker run \-e RABBITMQ_DEFAULT_USERrabbit \-e RABBITMQ_DEFAULT_PASSrabbit \-v mq-plugins:/plugins \--name mq \--hostname mq \-p 15672:15672 \-p 5672:5672 \--network mynet\-d \rabbitmq:3…...
从零开始学架构——互联网架构的演进
1 技术演进 1.1 技术演进的动力 对于新技术,我们应该站在行业的角度上思考,哪些技术我们要采取,哪些技术我们不能用,投入成本过大会不会导致满盘皆输?市场、技术、管理三者组成的业务发展铁三角,任何一个…...
OLAP] DuckDB : 开源免费的、面向嵌入式场景、列式存储的分析型数据库
0 序 DuckDB 是近期非常火的一款 AP 数据库,其独特的定位很有趣。甚至有数据库产品考虑将其纳入进来,作为分析能力的扩展。 考虑到项目中一个数据处理场景,就此调研一二。 DuckDB 的爆火,也给所有盲目追逐“大数据”的技术人敲响…...
MCP Server避坑指南:用Java写一个能连数据库、读文件的AI工具集
MCP Server避坑指南:用Java构建企业级AI工具链 在数字化转型浪潮中,企业积累的海量数据正成为AI应用的"金矿"。但如何让大语言模型安全访问这些分布在数据库、文件系统的"数据孤岛"?MCP协议为这个问题提供了优雅的解决方…...
如何高效突破AI编辑器限制:自动化Pro功能激活的技术实践
如何高效突破AI编辑器限制:自动化Pro功能激活的技术实践 【免费下载链接】cursor-free-vip [Support 0.45](Multi Language 多语言)自动注册 Cursor Ai ,自动重置机器ID , 免费升级使用Pro 功能: Youve reached your t…...
手把手教你玩转双闭环MMC逆变仿真
双闭环+最近电平逼近调制MMC模块化多电平换流器仿真(逆变侧)含技术文档 MMC Matlab-Simulink 直流侧11kV 交流侧6.6kV N22 采用最近电平逼近调制NLM 环流抑制(PIR比例积分准谐振控制),测量桥臂电感THD获得抑…...
pyenv多版本Python管理实战:从安装到日常开发常用命令大全
pyenv多版本Python管理实战:从安装到日常开发常用命令大全 作为Python开发者,你是否经常遇到这样的困扰:项目A需要Python 3.6,项目B需要Python 3.9,而本地环境只能安装一个版本?或者团队协作时,…...
intv_ai_mk11开源镜像:transformers加载+健康接口+supervisor运维全栈开源
intv_ai_mk11开源镜像:transformers加载健康接口supervisor运维全栈开源 1. 项目概述 intv_ai_mk11是一个基于Llama架构的中等规模文本生成模型的开源镜像解决方案。这个项目将模型部署、服务管理和健康监控等环节进行了全栈整合,让开发者能够快速搭建…...
LPDDR4X引脚功能详解:从CK到DQS,这些信号线你都用对了吗?
LPDDR4X引脚功能深度解析:信号完整性设计与实战避坑指南 在移动设备和高性能嵌入式系统中,LPDDR4X内存已成为主流选择。但许多硬件工程师在实际设计中常陷入"信号连通即可"的误区,导致系统稳定性问题频发。本文将带您深入理解每个…...
nuScenes多传感器融合:毫米波雷达点云与图像时空对齐实战
1. 多传感器融合的核心挑战 自动驾驶系统就像一位全天候工作的司机,需要同时处理来自不同"感官"的信息。毫米波雷达擅长测距和测速,摄像头则能识别颜色和纹理,但要让它们像人类感官一样协同工作,首先要解决时空对齐的问…...
IndexTTS 2.0优化指南:如何选择参考音频,获得最佳克隆效果
IndexTTS 2.0优化指南:如何选择参考音频,获得最佳克隆效果 1. 引言:为什么参考音频如此重要? 在语音合成领域,参考音频就像是一把钥匙,决定了最终生成声音的质量和相似度。IndexTTS 2.0作为一款零样本音色…...
5个宝藏级3D模型下载站:从GLB到Blender,一站式解决你的建模素材需求
1. 为什么你需要这些3D模型资源站? 作为一个在3D建模领域摸爬滚打多年的老手,我深知找素材的痛苦。记得刚入行时,为了找一个简单的沙发模型,我花了整整三天翻遍各种论坛和资源站。现在回头看,如果当时有人给我一份靠谱…...
