当前位置: 首页 > news >正文

每天40分玩转Django:Django Celery

Django Celery

一、知识要点概览表

模块知识点掌握程度要求
Celery基础配置、任务定义、任务执行深入理解
异步任务任务状态、结果存储、错误处理熟练应用
周期任务定时任务、Crontab、任务调度熟练应用
监控管理Flower、任务监控、性能优化理解应用

二、基础配置实现

1. 安装和配置

# requirements.txt
celery==5.3.1
django-celery-results==2.5.1
django-celery-beat==2.5.0
redis==4.6.0# settings.py
INSTALLED_APPS = [...'django_celery_results','django_celery_beat',
]# Celery配置
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'django-db'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Asia/Shanghai'# celery.py
import os
from celery import Celeryos.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')app = Celery('myproject')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()@app.task(bind=True)
def debug_task(self):print(f'Request: {self.request!r}')

三、异步任务实现

1. 基本任务定义

# tasks.py
from celery import shared_task
from django.core.mail import send_mail
from .models import User@shared_task
def send_welcome_email(user_id):"""发送欢迎邮件"""try:user = User.objects.get(id=user_id)send_mail('欢迎加入我们的平台',f'你好 {user.username},欢迎使用我们的服务!','noreply@example.com',[user.email],fail_silently=False,)return Trueexcept Exception as e:return str(e)@shared_task(bind=True, max_retries=3)
def process_payment(self, order_id):"""处理支付任务"""from .models import Ordertry:order = Order.objects.get(id=order_id)result = process_payment_gateway(order)if result.success:order.status = 'paid'order.save()return 'Payment processed successfully'else:raise ValueError('Payment failed')except Exception as exc:self.retry(exc=exc, countdown=60*5)  # 5分钟后重试

2. 任务链和组

from celery import chain, group, chord
from .tasks import process_payment, send_notification, update_inventorydef process_order(order):# 任务链:按顺序执行任务task_chain = chain(process_payment.s(order.id),update_inventory.s(order.id),send_notification.s(order.user.id))return task_chain()def process_bulk_orders(orders):# 任务组:并行执行多个任务task_group = group(process_payment.s(order.id)for order in orders)return task_group()def process_orders_with_summary(orders):# 和弦:并行执行任务后执行回调def on_complete(results):successful = sum(1 for r in results if r == 'success')failed = len(results) - successfulreturn f"处理完成:{successful}成功,{failed}失败"task_chord = chord((process_payment.s(order.id) for order in orders),on_complete.s())return task_chord()

3. 自定义任务类

from celery import Task
from django.core.cache import cacheclass BaseTaskWithRetry(Task):abstract = Truemax_retries = 3default_retry_delay = 60  # 60秒后重试def on_failure(self, exc, task_id, args, kwargs, einfo):"""任务失败时的处理"""print(f'Task {task_id} failed: {str(exc)}')super().on_failure(exc, task_id, args, kwargs, einfo)def on_retry(self, exc, task_id, args, kwargs, einfo):"""任务重试时的处理"""print(f'Task {task_id} retrying: {str(exc)}')super().on_retry(exc, task_id, args, kwargs, einfo)@shared_task(base=BaseTaskWithRetry)
def process_data(data_id):try:# 处理数据的逻辑result = process_complex_data(data_id)return resultexcept Exception as exc:raise self.retry(exc=exc)

四、周期任务实现

1. 基本周期任务

# tasks.py
from celery.schedules import crontab
from celery.task import periodic_task@periodic_task(run_every=timedelta(hours=24))
def daily_cleanup():"""每日清理任务"""cleanup_expired_tokens()cleanup_old_logs()return "Daily cleanup completed"@periodic_task(run_every=crontab(hour=0, minute=0),name="generate_daily_report"
)
def generate_daily_report():"""生成每日报告"""report = Report.objects.create(date=timezone.now(),type='daily')report.generate()return f"Report generated: {report.id}"

2. 动态周期任务

# models.py
from django_celery_beat.models import PeriodicTask, IntervalSchedule
from django.db import modelsclass ScheduledReport(models.Model):name = models.CharField(max_length=100)interval = models.IntegerField(help_text='间隔(分钟)')enabled = models.BooleanField(default=True)def save(self, *args, **kwargs):super().save(*args, **kwargs)self.update_periodic_task()def update_periodic_task(self):schedule, _ = IntervalSchedule.objects.get_or_create(every=self.interval,period=IntervalSchedule.MINUTES,)PeriodicTask.objects.update_or_create(name=f'generate_report_{self.id}',defaults={'task': 'myapp.tasks.generate_report','interval': schedule,'args': [self.id],'enabled': self.enabled})

3. 任务调度器

from django_celery_beat.models import CrontabSchedule, PeriodicTask
import jsondef schedule_report_task(name, hour, minute, day_of_week):"""创建定时报告任务"""schedule, _ = CrontabSchedule.objects.get_or_create(hour=hour,minute=minute,day_of_week=day_of_week,)task = PeriodicTask.objects.create(crontab=schedule,name=f'generate_report_{name}',task='myapp.tasks.generate_report',args=json.dumps([name]),)return task# 使用示例
schedule_report_task('weekly_summary', hour=0, minute=0, day_of_week='1')  # 每周一

五、监控和管理

1. Flower配置

# requirements.txt
flower==2.0.1# settings.py
CELERY_FLOWER_USER = 'admin'
CELERY_FLOWER_PASSWORD = 'password'# 启动Flower
# celery -A myproject flower --port=5555

2. 任务监控中间件

# middleware.py
from celery.signals import task_prerun, task_postrun
import time
import logginglogger = logging.getLogger('celery.tasks')@task_prerun.connect
def task_prerun_handler(task_id, task, args, kwargs, **kw):"""任务执行前的处理"""task.start_time = time.time()@task_postrun.connect
def task_postrun_handler(task_id, task, args, kwargs, retval, state, **kw):"""任务执行后的处理"""if hasattr(task, 'start_time'):duration = time.time() - task.start_timelogger.info(f'Task {task.name}[{task_id}] 'f'completed in {duration:.2f}s with state {state}')

六、Celery工作流程图

在这里插入图片描述

七、实际应用示例

1. 图片处理任务

# tasks.py
from PIL import Image
import os
from celery import shared_task@shared_task
def process_uploaded_image(image_path, sizes=[(800, 600), (400, 300)]):"""处理上传的图片"""try:img = Image.open(image_path)filename = os.path.basename(image_path)name, ext = os.path.splitext(filename)results = []for size in sizes:resized = img.copy()resized.thumbnail(size)new_filename = f"{name}_{size[0]}x{size[1]}{ext}"new_path = os.path.join(os.path.dirname(image_path), new_filename)resized.save(new_path)results.append(new_path)return resultsexcept Exception as e:return str(e)

2. 站点监控任务

# tasks.py
import requests
from celery import shared_task
from django.core.mail import mail_admins
from .models import SiteMonitor@shared_task(bind=True, max_retries=3)
def monitor_website(self, url):"""监控网站可用性"""try:response = requests.get(url, timeout=10)status = response.status_code == 200response_time = response.elapsed.total_seconds()SiteMonitor.objects.create(url=url,status=status,response_time=response_time)if not status:mail_admins(f'网站{url}不可用',f'状态码:{response.status_code}')return {'url': url,'status': status,'response_time': response_time}except Exception as exc:self.retry(exc=exc, countdown=60)

八、最佳实践建议

  1. 任务设计原则:

    • 保持任务原子性
    • 实现幂等性
    • 合理设置超时时间
    • 添加适当的重试机制
  2. 性能优化:

    • 使用合适的序列化方式
    • 控制任务粒度
    • 合理设置并发数
    • 监控任务执行情况
  3. 错误处理:

    • 完善的异常捕获
    • 详细的日志记录
    • 合适的重试策略
    • 失败通知机制

这就是关于Django Celery的详细内容,包括异步任务队列和周期任务的实现。通过实践这些内容,你将能够在Django项目中熟练使用Celery处理异步任务。如果有任何问题,欢迎随时提出!


怎么样今天的内容还满意吗?再次感谢朋友们的观看,关注GZH:凡人的AI工具箱,回复666,送您价值199的AI大礼包。最后,祝您早日实现财务自由,还请给个赞,谢谢!

相关文章:

每天40分玩转Django:Django Celery

Django Celery 一、知识要点概览表 模块知识点掌握程度要求Celery基础配置、任务定义、任务执行深入理解异步任务任务状态、结果存储、错误处理熟练应用周期任务定时任务、Crontab、任务调度熟练应用监控管理Flower、任务监控、性能优化理解应用 二、基础配置实现 1. 安装和…...

df.groupby(pd.Grouper(level=1)).sum()

df.groupby(pd.Grouper(level1)).sum() 在 Python 中的作用是根据 DataFrame 的某一索引级别进行分组,并计算每个分组的总和。具体来说: df.groupby(...):这是 pandas 的分组操作,按照指定的规则将 DataFrame 分组。 pd.Grouper(…...

运动控制探针功能详细介绍(CODESYS+SV63N伺服)

汇川AM400PLC和禾川X3E伺服EtherCAT通信 汇川AM400PLC和禾川X3E伺服EtherCAT通信_汇川ethercat通信-CSDN博客文章浏览阅读1.2k次。本文详细介绍了如何使用汇川AM400PLC通过EtherCAT总线与禾川X3E伺服进行通信。包括XML硬件描述文件的下载与安装,EtherCAT总线的启用,从站添加…...

C语言基础18(GDB调试)

文章目录 GDBGDB概述什么是GDB**GDB**的主要功能 GDB的启动GDB常见的启动方式 GDB的退出GDB的常用命令GDB查看源代码指令———list(1)**GDB** 查看设置**------info****GDB** 查看内存**GDB** 设置断点**---break (b)****GDB** 设置观察点**---watch****GDB** 程序调试 GDB完整…...

《向量数据库指南》——应对ElasticSearch挑战,拥抱Mlivus Cloud的新时代

在当今数据驱动的商业环境中,向量数据库的应用正变得愈加重要。随着人工智能和机器学习的快速发展,尤其是在自然语言处理、图像识别及推荐系统等领域,向量数据库以其强大的存储和检索能力,迎来了广泛的应用机会。然而,在实际应用中,企业在选择和实施向量数据库方案时,常…...

c++的stl库中stack的解析和模拟实现

目录 1.stack的介绍和使用 1.1stack的介绍 1.2stack的使用 2.stack的模拟实现 1.stack的介绍和使用 1.1stack的介绍 1. stack 是一种容器适配器,专门用在具有后进先出操作的上下文环境中,其删除只能从容器的一端进行元素的插入与提取操作。 2. stac…...

C语言——字符函数和内存函数

目录 前言 字符函数 1strlen 模拟实现 2strcpy 模拟实现 3strcat 模拟实现 4strcmp 模拟实现 5strncpy 模拟实现 6strncat 模拟实现 7strncmp 模拟实现 8strstr 模拟实现 9strtok 10strerror 11大小写字符转换函数 内存函数 1memcpy 模拟实现 2…...

查询docker overlay2文件夹下的 c7ffc13c49xxx是哪一个容器使用的

问题背景 查询docker overlay2文件夹下的 c7ffc13c49xxx是哪一个容器使用的 [root@lnops overlay2]# du -sh * | grep G 1.7G 30046eca3e838e43d16d9febc63cc8f8bb3d327b4c9839ca791b3ddfa845e12e 435G c7ffc13c49a43f08ef9e234c6ef9fc5a3692deda3c5d42149d0070e9d8124f71 1.…...

Golang的容器编排实践

Golang的容器编排实践 一、Golang中的容器编排概述 作为一种高效的编程语言,其在容器编排领域也有着广泛的运用。容器编排是指利用自动化工具对容器化的应用进行部署、管理和扩展的过程,典型的容器编排工具包括Docker Swarm、Kubernetes等。在Golang中&a…...

【51项目】51单片机自制小霸王游戏机

视频演示效果: 纳新作品——小霸王游戏机 目录: 目录 视频演示效果: 目录: 前言:...

ArkTs之NAPI学习

1.Node-api组成架构 为了应对日常开发经的网络通信、串口访问、多媒体解码、传感器数据收集等模块,这些模块大多数是使用c接口实现的,arkts侧如果想使用这些能力,就需要使用node-api这样一套接口去桥接c代码。Node-api整体的架构图如下&…...

【数据库初阶】MySQL中表的约束(上)

🎉博主首页: 有趣的中国人 🎉专栏首页: 数据库初阶 🎉其它专栏: C初阶 | C进阶 | 初阶数据结构 亲爱的小伙伴们,大家好!在这篇文章中,我们将深入浅出地为大家讲解 MySQL…...

173. 矩阵距离 acwing -多路BFS

原题链接:173. 矩阵距离 - AcWing题库 给定一个 N行 M 列的 01矩阵 A,A[i][j] 与 A[k][l]]之间的曼哈顿距离定义为: dist(i,j,k,l)|i−k||j−l|| 输出一个 N 行 M 列的整数矩阵 B,其中: B[i][j]min1≤x≤N,1≤y≤M,A…...

Linux下部署Redis集群 - 一主二从三哨兵模式

三台服务器redis一主二从三哨兵模式搭建 最近使用到了redis集群部署,使用一主二从三哨兵集群部署redis,将自己部署的过程中的使用心得分享给大家,希望大家以后部署的过程减少一些坑。 服务器准备 3台服务器 ,确定主redis和从red…...

实战设计模式之建造者模式

概述 在实际项目中,我们有时会遇到需要创建复杂对象的情况。这些对象可能包含多个组件或属性,而且每个组件都有自己的配置选项。如果直接使用构造函数或前面介绍的工厂方法来创建这样的对象,可能会导致以下两个严重问题。 1、参数过多。当一个…...

活动预告 | Microsoft Azure 在线技术公开课:使用 Azure OpenAI 服务构建生成式应用

课程介绍 通过 Microsoft Learn 免费参加 Microsoft Azure 在线技术公开课,掌握创造新机遇所需的技能,加快对 Microsoft Cloud 技术的了解。参加我们举办的“使用 Azure OpenAI 服务构建生成式应用”活动,了解如何使用包括 GPT 在内的强大的…...

ubuntu安装firefox

firefox下载地址:https://ftp.mozilla.org/pub/firefox/releases/ 卸载 sudo apt-get update dpkg --get-selections |grep firefox apt-get purge firefox 解压 tar -xjf firefox*.tar.bz2复制文件 sudo mv firefox/ /opt/firefox30sudo mv /usr/bin/firefox /…...

计算机网络原理(谢希仁第八版)第4章课后习题答案

第四章 网络层 详细计算机网络(谢希仁-第八版)第四章习题全解_计算机网络第八版谢希仁课后答案-CSDN博客 1.网络层向上提供的服务有哪两种?是比较其优缺点。网络层向运输层提供 “面向连接”虚电路(Virtual Circuit)服…...

RabbitMQ-基本使用

RabbitMQ: One broker to queue them all | RabbitMQ 官方 安装到Docker中 docker run \-e RABBITMQ_DEFAULT_USERrabbit \-e RABBITMQ_DEFAULT_PASSrabbit \-v mq-plugins:/plugins \--name mq \--hostname mq \-p 15672:15672 \-p 5672:5672 \--network mynet\-d \rabbitmq:3…...

从零开始学架构——互联网架构的演进

1 技术演进 1.1 技术演进的动力 对于新技术,我们应该站在行业的角度上思考,哪些技术我们要采取,哪些技术我们不能用,投入成本过大会不会导致满盘皆输?市场、技术、管理三者组成的业务发展铁三角,任何一个…...

CVPR 2025 MIMO: 支持视觉指代和像素grounding 的医学视觉语言模型

CVPR 2025 | MIMO:支持视觉指代和像素对齐的医学视觉语言模型 论文信息 标题:MIMO: A medical vision language model with visual referring multimodal input and pixel grounding multimodal output作者:Yanyuan Chen, Dexuan Xu, Yu Hu…...

脑机新手指南(八):OpenBCI_GUI:从环境搭建到数据可视化(下)

一、数据处理与分析实战 (一)实时滤波与参数调整 基础滤波操作 60Hz 工频滤波:勾选界面右侧 “60Hz” 复选框,可有效抑制电网干扰(适用于北美地区,欧洲用户可调整为 50Hz)。 平滑处理&…...

【JVM】- 内存结构

引言 JVM:Java Virtual Machine 定义:Java虚拟机,Java二进制字节码的运行环境好处: 一次编写,到处运行自动内存管理,垃圾回收的功能数组下标越界检查(会抛异常,不会覆盖到其他代码…...

电脑插入多块移动硬盘后经常出现卡顿和蓝屏

当电脑在插入多块移动硬盘后频繁出现卡顿和蓝屏问题时,可能涉及硬件资源冲突、驱动兼容性、供电不足或系统设置等多方面原因。以下是逐步排查和解决方案: 1. 检查电源供电问题 问题原因:多块移动硬盘同时运行可能导致USB接口供电不足&#x…...

C++中string流知识详解和示例

一、概览与类体系 C 提供三种基于内存字符串的流&#xff0c;定义在 <sstream> 中&#xff1a; std::istringstream&#xff1a;输入流&#xff0c;从已有字符串中读取并解析。std::ostringstream&#xff1a;输出流&#xff0c;向内部缓冲区写入内容&#xff0c;最终取…...

【Java_EE】Spring MVC

目录 Spring Web MVC ​编辑注解 RestController RequestMapping RequestParam RequestParam RequestBody PathVariable RequestPart 参数传递 注意事项 ​编辑参数重命名 RequestParam ​编辑​编辑传递集合 RequestParam 传递JSON数据 ​编辑RequestBody ​…...

华为云Flexus+DeepSeek征文|DeepSeek-V3/R1 商用服务开通全流程与本地部署搭建

华为云FlexusDeepSeek征文&#xff5c;DeepSeek-V3/R1 商用服务开通全流程与本地部署搭建 前言 如今大模型其性能出色&#xff0c;华为云 ModelArts Studio_MaaS大模型即服务平台华为云内置了大模型&#xff0c;能助力我们轻松驾驭 DeepSeek-V3/R1&#xff0c;本文中将分享如何…...

根据万维钢·精英日课6的内容,使用AI(2025)可以参考以下方法:

根据万维钢精英日课6的内容&#xff0c;使用AI&#xff08;2025&#xff09;可以参考以下方法&#xff1a; 四个洞见 模型已经比人聪明&#xff1a;以ChatGPT o3为代表的AI非常强大&#xff0c;能运用高级理论解释道理、引用最新学术论文&#xff0c;生成对顶尖科学家都有用的…...

【Oracle】分区表

个人主页&#xff1a;Guiat 归属专栏&#xff1a;Oracle 文章目录 1. 分区表基础概述1.1 分区表的概念与优势1.2 分区类型概览1.3 分区表的工作原理 2. 范围分区 (RANGE Partitioning)2.1 基础范围分区2.1.1 按日期范围分区2.1.2 按数值范围分区 2.2 间隔分区 (INTERVAL Partit…...

蓝桥杯 冶炼金属

原题目链接 &#x1f527; 冶炼金属转换率推测题解 &#x1f4dc; 原题描述 小蓝有一个神奇的炉子用于将普通金属 O O O 冶炼成为一种特殊金属 X X X。这个炉子有一个属性叫转换率 V V V&#xff0c;是一个正整数&#xff0c;表示每 V V V 个普通金属 O O O 可以冶炼出 …...