当前位置: 首页 > news >正文

每天40分玩转Django:Django Celery

Django Celery

一、知识要点概览表

模块知识点掌握程度要求
Celery基础配置、任务定义、任务执行深入理解
异步任务任务状态、结果存储、错误处理熟练应用
周期任务定时任务、Crontab、任务调度熟练应用
监控管理Flower、任务监控、性能优化理解应用

二、基础配置实现

1. 安装和配置

# requirements.txt
celery==5.3.1
django-celery-results==2.5.1
django-celery-beat==2.5.0
redis==4.6.0# settings.py
INSTALLED_APPS = [...'django_celery_results','django_celery_beat',
]# Celery配置
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'django-db'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Asia/Shanghai'# celery.py
import os
from celery import Celeryos.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')app = Celery('myproject')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()@app.task(bind=True)
def debug_task(self):print(f'Request: {self.request!r}')

三、异步任务实现

1. 基本任务定义

# tasks.py
from celery import shared_task
from django.core.mail import send_mail
from .models import User@shared_task
def send_welcome_email(user_id):"""发送欢迎邮件"""try:user = User.objects.get(id=user_id)send_mail('欢迎加入我们的平台',f'你好 {user.username},欢迎使用我们的服务!','noreply@example.com',[user.email],fail_silently=False,)return Trueexcept Exception as e:return str(e)@shared_task(bind=True, max_retries=3)
def process_payment(self, order_id):"""处理支付任务"""from .models import Ordertry:order = Order.objects.get(id=order_id)result = process_payment_gateway(order)if result.success:order.status = 'paid'order.save()return 'Payment processed successfully'else:raise ValueError('Payment failed')except Exception as exc:self.retry(exc=exc, countdown=60*5)  # 5分钟后重试

2. 任务链和组

from celery import chain, group, chord
from .tasks import process_payment, send_notification, update_inventorydef process_order(order):# 任务链:按顺序执行任务task_chain = chain(process_payment.s(order.id),update_inventory.s(order.id),send_notification.s(order.user.id))return task_chain()def process_bulk_orders(orders):# 任务组:并行执行多个任务task_group = group(process_payment.s(order.id)for order in orders)return task_group()def process_orders_with_summary(orders):# 和弦:并行执行任务后执行回调def on_complete(results):successful = sum(1 for r in results if r == 'success')failed = len(results) - successfulreturn f"处理完成:{successful}成功,{failed}失败"task_chord = chord((process_payment.s(order.id) for order in orders),on_complete.s())return task_chord()

3. 自定义任务类

from celery import Task
from django.core.cache import cacheclass BaseTaskWithRetry(Task):abstract = Truemax_retries = 3default_retry_delay = 60  # 60秒后重试def on_failure(self, exc, task_id, args, kwargs, einfo):"""任务失败时的处理"""print(f'Task {task_id} failed: {str(exc)}')super().on_failure(exc, task_id, args, kwargs, einfo)def on_retry(self, exc, task_id, args, kwargs, einfo):"""任务重试时的处理"""print(f'Task {task_id} retrying: {str(exc)}')super().on_retry(exc, task_id, args, kwargs, einfo)@shared_task(base=BaseTaskWithRetry)
def process_data(data_id):try:# 处理数据的逻辑result = process_complex_data(data_id)return resultexcept Exception as exc:raise self.retry(exc=exc)

四、周期任务实现

1. 基本周期任务

# tasks.py
from celery.schedules import crontab
from celery.task import periodic_task@periodic_task(run_every=timedelta(hours=24))
def daily_cleanup():"""每日清理任务"""cleanup_expired_tokens()cleanup_old_logs()return "Daily cleanup completed"@periodic_task(run_every=crontab(hour=0, minute=0),name="generate_daily_report"
)
def generate_daily_report():"""生成每日报告"""report = Report.objects.create(date=timezone.now(),type='daily')report.generate()return f"Report generated: {report.id}"

2. 动态周期任务

# models.py
from django_celery_beat.models import PeriodicTask, IntervalSchedule
from django.db import modelsclass ScheduledReport(models.Model):name = models.CharField(max_length=100)interval = models.IntegerField(help_text='间隔(分钟)')enabled = models.BooleanField(default=True)def save(self, *args, **kwargs):super().save(*args, **kwargs)self.update_periodic_task()def update_periodic_task(self):schedule, _ = IntervalSchedule.objects.get_or_create(every=self.interval,period=IntervalSchedule.MINUTES,)PeriodicTask.objects.update_or_create(name=f'generate_report_{self.id}',defaults={'task': 'myapp.tasks.generate_report','interval': schedule,'args': [self.id],'enabled': self.enabled})

3. 任务调度器

from django_celery_beat.models import CrontabSchedule, PeriodicTask
import jsondef schedule_report_task(name, hour, minute, day_of_week):"""创建定时报告任务"""schedule, _ = CrontabSchedule.objects.get_or_create(hour=hour,minute=minute,day_of_week=day_of_week,)task = PeriodicTask.objects.create(crontab=schedule,name=f'generate_report_{name}',task='myapp.tasks.generate_report',args=json.dumps([name]),)return task# 使用示例
schedule_report_task('weekly_summary', hour=0, minute=0, day_of_week='1')  # 每周一

五、监控和管理

1. Flower配置

# requirements.txt
flower==2.0.1# settings.py
CELERY_FLOWER_USER = 'admin'
CELERY_FLOWER_PASSWORD = 'password'# 启动Flower
# celery -A myproject flower --port=5555

2. 任务监控中间件

# middleware.py
from celery.signals import task_prerun, task_postrun
import time
import logginglogger = logging.getLogger('celery.tasks')@task_prerun.connect
def task_prerun_handler(task_id, task, args, kwargs, **kw):"""任务执行前的处理"""task.start_time = time.time()@task_postrun.connect
def task_postrun_handler(task_id, task, args, kwargs, retval, state, **kw):"""任务执行后的处理"""if hasattr(task, 'start_time'):duration = time.time() - task.start_timelogger.info(f'Task {task.name}[{task_id}] 'f'completed in {duration:.2f}s with state {state}')

六、Celery工作流程图

在这里插入图片描述

七、实际应用示例

1. 图片处理任务

# tasks.py
from PIL import Image
import os
from celery import shared_task@shared_task
def process_uploaded_image(image_path, sizes=[(800, 600), (400, 300)]):"""处理上传的图片"""try:img = Image.open(image_path)filename = os.path.basename(image_path)name, ext = os.path.splitext(filename)results = []for size in sizes:resized = img.copy()resized.thumbnail(size)new_filename = f"{name}_{size[0]}x{size[1]}{ext}"new_path = os.path.join(os.path.dirname(image_path), new_filename)resized.save(new_path)results.append(new_path)return resultsexcept Exception as e:return str(e)

2. 站点监控任务

# tasks.py
import requests
from celery import shared_task
from django.core.mail import mail_admins
from .models import SiteMonitor@shared_task(bind=True, max_retries=3)
def monitor_website(self, url):"""监控网站可用性"""try:response = requests.get(url, timeout=10)status = response.status_code == 200response_time = response.elapsed.total_seconds()SiteMonitor.objects.create(url=url,status=status,response_time=response_time)if not status:mail_admins(f'网站{url}不可用',f'状态码:{response.status_code}')return {'url': url,'status': status,'response_time': response_time}except Exception as exc:self.retry(exc=exc, countdown=60)

八、最佳实践建议

  1. 任务设计原则:

    • 保持任务原子性
    • 实现幂等性
    • 合理设置超时时间
    • 添加适当的重试机制
  2. 性能优化:

    • 使用合适的序列化方式
    • 控制任务粒度
    • 合理设置并发数
    • 监控任务执行情况
  3. 错误处理:

    • 完善的异常捕获
    • 详细的日志记录
    • 合适的重试策略
    • 失败通知机制

这就是关于Django Celery的详细内容,包括异步任务队列和周期任务的实现。通过实践这些内容,你将能够在Django项目中熟练使用Celery处理异步任务。如果有任何问题,欢迎随时提出!


怎么样今天的内容还满意吗?再次感谢朋友们的观看,关注GZH:凡人的AI工具箱,回复666,送您价值199的AI大礼包。最后,祝您早日实现财务自由,还请给个赞,谢谢!

相关文章:

每天40分玩转Django:Django Celery

Django Celery 一、知识要点概览表 模块知识点掌握程度要求Celery基础配置、任务定义、任务执行深入理解异步任务任务状态、结果存储、错误处理熟练应用周期任务定时任务、Crontab、任务调度熟练应用监控管理Flower、任务监控、性能优化理解应用 二、基础配置实现 1. 安装和…...

df.groupby(pd.Grouper(level=1)).sum()

df.groupby(pd.Grouper(level1)).sum() 在 Python 中的作用是根据 DataFrame 的某一索引级别进行分组,并计算每个分组的总和。具体来说: df.groupby(...):这是 pandas 的分组操作,按照指定的规则将 DataFrame 分组。 pd.Grouper(…...

运动控制探针功能详细介绍(CODESYS+SV63N伺服)

汇川AM400PLC和禾川X3E伺服EtherCAT通信 汇川AM400PLC和禾川X3E伺服EtherCAT通信_汇川ethercat通信-CSDN博客文章浏览阅读1.2k次。本文详细介绍了如何使用汇川AM400PLC通过EtherCAT总线与禾川X3E伺服进行通信。包括XML硬件描述文件的下载与安装,EtherCAT总线的启用,从站添加…...

C语言基础18(GDB调试)

文章目录 GDBGDB概述什么是GDB**GDB**的主要功能 GDB的启动GDB常见的启动方式 GDB的退出GDB的常用命令GDB查看源代码指令———list(1)**GDB** 查看设置**------info****GDB** 查看内存**GDB** 设置断点**---break (b)****GDB** 设置观察点**---watch****GDB** 程序调试 GDB完整…...

《向量数据库指南》——应对ElasticSearch挑战,拥抱Mlivus Cloud的新时代

在当今数据驱动的商业环境中,向量数据库的应用正变得愈加重要。随着人工智能和机器学习的快速发展,尤其是在自然语言处理、图像识别及推荐系统等领域,向量数据库以其强大的存储和检索能力,迎来了广泛的应用机会。然而,在实际应用中,企业在选择和实施向量数据库方案时,常…...

c++的stl库中stack的解析和模拟实现

目录 1.stack的介绍和使用 1.1stack的介绍 1.2stack的使用 2.stack的模拟实现 1.stack的介绍和使用 1.1stack的介绍 1. stack 是一种容器适配器,专门用在具有后进先出操作的上下文环境中,其删除只能从容器的一端进行元素的插入与提取操作。 2. stac…...

C语言——字符函数和内存函数

目录 前言 字符函数 1strlen 模拟实现 2strcpy 模拟实现 3strcat 模拟实现 4strcmp 模拟实现 5strncpy 模拟实现 6strncat 模拟实现 7strncmp 模拟实现 8strstr 模拟实现 9strtok 10strerror 11大小写字符转换函数 内存函数 1memcpy 模拟实现 2…...

查询docker overlay2文件夹下的 c7ffc13c49xxx是哪一个容器使用的

问题背景 查询docker overlay2文件夹下的 c7ffc13c49xxx是哪一个容器使用的 [root@lnops overlay2]# du -sh * | grep G 1.7G 30046eca3e838e43d16d9febc63cc8f8bb3d327b4c9839ca791b3ddfa845e12e 435G c7ffc13c49a43f08ef9e234c6ef9fc5a3692deda3c5d42149d0070e9d8124f71 1.…...

Golang的容器编排实践

Golang的容器编排实践 一、Golang中的容器编排概述 作为一种高效的编程语言,其在容器编排领域也有着广泛的运用。容器编排是指利用自动化工具对容器化的应用进行部署、管理和扩展的过程,典型的容器编排工具包括Docker Swarm、Kubernetes等。在Golang中&a…...

【51项目】51单片机自制小霸王游戏机

视频演示效果: 纳新作品——小霸王游戏机 目录: 目录 视频演示效果: 目录: 前言:...

ArkTs之NAPI学习

1.Node-api组成架构 为了应对日常开发经的网络通信、串口访问、多媒体解码、传感器数据收集等模块,这些模块大多数是使用c接口实现的,arkts侧如果想使用这些能力,就需要使用node-api这样一套接口去桥接c代码。Node-api整体的架构图如下&…...

【数据库初阶】MySQL中表的约束(上)

🎉博主首页: 有趣的中国人 🎉专栏首页: 数据库初阶 🎉其它专栏: C初阶 | C进阶 | 初阶数据结构 亲爱的小伙伴们,大家好!在这篇文章中,我们将深入浅出地为大家讲解 MySQL…...

173. 矩阵距离 acwing -多路BFS

原题链接:173. 矩阵距离 - AcWing题库 给定一个 N行 M 列的 01矩阵 A,A[i][j] 与 A[k][l]]之间的曼哈顿距离定义为: dist(i,j,k,l)|i−k||j−l|| 输出一个 N 行 M 列的整数矩阵 B,其中: B[i][j]min1≤x≤N,1≤y≤M,A…...

Linux下部署Redis集群 - 一主二从三哨兵模式

三台服务器redis一主二从三哨兵模式搭建 最近使用到了redis集群部署,使用一主二从三哨兵集群部署redis,将自己部署的过程中的使用心得分享给大家,希望大家以后部署的过程减少一些坑。 服务器准备 3台服务器 ,确定主redis和从red…...

实战设计模式之建造者模式

概述 在实际项目中,我们有时会遇到需要创建复杂对象的情况。这些对象可能包含多个组件或属性,而且每个组件都有自己的配置选项。如果直接使用构造函数或前面介绍的工厂方法来创建这样的对象,可能会导致以下两个严重问题。 1、参数过多。当一个…...

活动预告 | Microsoft Azure 在线技术公开课:使用 Azure OpenAI 服务构建生成式应用

课程介绍 通过 Microsoft Learn 免费参加 Microsoft Azure 在线技术公开课,掌握创造新机遇所需的技能,加快对 Microsoft Cloud 技术的了解。参加我们举办的“使用 Azure OpenAI 服务构建生成式应用”活动,了解如何使用包括 GPT 在内的强大的…...

ubuntu安装firefox

firefox下载地址:https://ftp.mozilla.org/pub/firefox/releases/ 卸载 sudo apt-get update dpkg --get-selections |grep firefox apt-get purge firefox 解压 tar -xjf firefox*.tar.bz2复制文件 sudo mv firefox/ /opt/firefox30sudo mv /usr/bin/firefox /…...

计算机网络原理(谢希仁第八版)第4章课后习题答案

第四章 网络层 详细计算机网络(谢希仁-第八版)第四章习题全解_计算机网络第八版谢希仁课后答案-CSDN博客 1.网络层向上提供的服务有哪两种?是比较其优缺点。网络层向运输层提供 “面向连接”虚电路(Virtual Circuit)服…...

RabbitMQ-基本使用

RabbitMQ: One broker to queue them all | RabbitMQ 官方 安装到Docker中 docker run \-e RABBITMQ_DEFAULT_USERrabbit \-e RABBITMQ_DEFAULT_PASSrabbit \-v mq-plugins:/plugins \--name mq \--hostname mq \-p 15672:15672 \-p 5672:5672 \--network mynet\-d \rabbitmq:3…...

从零开始学架构——互联网架构的演进

1 技术演进 1.1 技术演进的动力 对于新技术,我们应该站在行业的角度上思考,哪些技术我们要采取,哪些技术我们不能用,投入成本过大会不会导致满盘皆输?市场、技术、管理三者组成的业务发展铁三角,任何一个…...

ES6从入门到精通:前言

ES6简介 ES6(ECMAScript 2015)是JavaScript语言的重大更新,引入了许多新特性,包括语法糖、新数据类型、模块化支持等,显著提升了开发效率和代码可维护性。 核心知识点概览 变量声明 let 和 const 取代 var&#xf…...

oracle与MySQL数据库之间数据同步的技术要点

Oracle与MySQL数据库之间的数据同步是一个涉及多个技术要点的复杂任务。由于Oracle和MySQL的架构差异,它们的数据同步要求既要保持数据的准确性和一致性,又要处理好性能问题。以下是一些主要的技术要点: 数据结构差异 数据类型差异&#xff…...

[Java恶补day16] 238.除自身以外数组的乘积

给你一个整数数组 nums,返回 数组 answer ,其中 answer[i] 等于 nums 中除 nums[i] 之外其余各元素的乘积 。 题目数据 保证 数组 nums之中任意元素的全部前缀元素和后缀的乘积都在 32 位 整数范围内。 请 不要使用除法,且在 O(n) 时间复杂度…...

基于Java+MySQL实现(GUI)客户管理系统

客户资料管理系统的设计与实现 第一章 需求分析 1.1 需求总体介绍 本项目为了方便维护客户信息为了方便维护客户信息,对客户进行统一管理,可以把所有客户信息录入系统,进行维护和统计功能。可通过文件的方式保存相关录入数据,对…...

A2A JS SDK 完整教程:快速入门指南

目录 什么是 A2A JS SDK?A2A JS 安装与设置A2A JS 核心概念创建你的第一个 A2A JS 代理A2A JS 服务端开发A2A JS 客户端使用A2A JS 高级特性A2A JS 最佳实践A2A JS 故障排除 什么是 A2A JS SDK? A2A JS SDK 是一个专为 JavaScript/TypeScript 开发者设计的强大库&#xff…...

STM32---外部32.768K晶振(LSE)无法起振问题

晶振是否起振主要就检查两个1、晶振与MCU是否兼容;2、晶振的负载电容是否匹配 目录 一、判断晶振与MCU是否兼容 二、判断负载电容是否匹配 1. 晶振负载电容(CL)与匹配电容(CL1、CL2)的关系 2. 如何选择 CL1 和 CL…...

android RelativeLayout布局

<?xml version"1.0" encoding"utf-8"?> <RelativeLayout xmlns:android"http://schemas.android.com/apk/res/android"android:layout_width"match_parent"android:layout_height"match_parent"android:gravity&…...

CVPR2025重磅突破:AnomalyAny框架实现单样本生成逼真异常数据,破解视觉检测瓶颈!

本文介绍了一种名为AnomalyAny的创新框架&#xff0c;该方法利用Stable Diffusion的强大生成能力&#xff0c;仅需单个正常样本和文本描述&#xff0c;即可生成逼真且多样化的异常样本&#xff0c;有效解决了视觉异常检测中异常样本稀缺的难题&#xff0c;为工业质检、医疗影像…...

【Elasticsearch】Elasticsearch 在大数据生态圈的地位 实践经验

Elasticsearch 在大数据生态圈的地位 & 实践经验 1.Elasticsearch 的优势1.1 Elasticsearch 解决的核心问题1.1.1 传统方案的短板1.1.2 Elasticsearch 的解决方案 1.2 与大数据组件的对比优势1.3 关键优势技术支撑1.4 Elasticsearch 的竞品1.4.1 全文搜索领域1.4.2 日志分析…...

深度剖析 DeepSeek 开源模型部署与应用:策略、权衡与未来走向

在人工智能技术呈指数级发展的当下&#xff0c;大模型已然成为推动各行业变革的核心驱动力。DeepSeek 开源模型以其卓越的性能和灵活的开源特性&#xff0c;吸引了众多企业与开发者的目光。如何高效且合理地部署与运用 DeepSeek 模型&#xff0c;成为释放其巨大潜力的关键所在&…...