当前位置: 首页 > news >正文

django-celery-beat自动调度异步任务

        Celery是一个简单、灵活且可靠的分布式系统,专门用于处理大量消息的实时任务调度。它支持使用任务队列的方式在分布的机器、进程、线程上执行任务调度。Celery不仅支持异步任务(如发送邮件、文件上传、图像处理等耗时操作),还支持定时任务,即需要在特定时间执行的任务。Celery本身不提供消息服务,需要借助RabbitMQ、Redis等消息中间件,本案例使用的是Redis。

        Celery Beat则是Celery的一个组件,专门用于处理定时任务调度。它包含一个调度器,负责根据配置的时间表计划任务的执行。这些任务通常是Celery任务,即异步执行的函数或方法。Celery Beat将计划的任务发送到Celery任务队列,由Celery Worker处理并执行队列中的任务。此外,Celery Beat还支持任务的持久性,即使在系统重启后也能够保持已计划的周期性任务。

开发环境:Python3 + MySQL + Redis  + PyCharm专业版

一、创建Django项目

参考 Python框架Django入门教程-CSDN博客 前三步

二、安装celery、mysql、redis等依赖包

eventlet 是一个python协程模板,celery 4版本以上在windows环境进行测试需要安装此依赖

django_celery_results 是任务执行结果的依赖

mysqlclient
redis
celery
eventlet
django-celery-beat
django_celery_results

三、初始化Celery数据库

打开PyCharm的终端,执行以下命令

python manage.py makemigrations
python manage.py migrate

打开数据库查看,执行命令后自动创建了一些表

django_celery_beat_clockedschedule  # 以指定时间执行任务,例如:2024-05-22 09:22:10
django_celery_beat_crontabschedule  # 以crontab格式时间执行任务,某月某天星期几某时某分
django_celery_beat_intervalschedule  # 以间隔时间执行任务,例如:每5秒、每2小时
django_celery_beat_periodictask  # 存储要执行的任务。
django_celery_beat_periodictasks  # 索引和跟踪任务更改状态
django_celery_beat_solarschedule  # 以天文时间执行任务,例如:日出、日落
django_celery_results_chordcounter  # 存储Celery的chord任务的状态
django_celery_results_groupresult  # 存储Celery的group任务的结果
django_celery_results_taskresult  # 存储Celery任务的执行结果

四、配置Celery

修改(注意是修改,不是添加!!!)settings.py,找到 INSTALLED_APPS变量(约31行),将celery注册到django的应用管理中

在settings.py末尾添加celery配置:

CELERY_BROKER_URL = 'redis://localhost:6379/0'  # 使用Redis作为消息代理
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'  # 结果存储也使用Redis
# 配置 celery 定时任务使用的调度器,使用django_celery_beat插件用来动态配置任务
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
# 配置celery自动存储任务执行结果
CELERY_RESULT_BACKEND = 'django_celery_results.backends:DatabaseBackend'
CELERY_TIMEZONE = 'Asia/Shanghai'  # 设置时区
# 是否启用UTC
CELERY_ENABLE_UTC = False
# 是否开启时间感知
DJANGO_CELERY_BEAT_TZ_AWARE = False

在settings.py同级目录下创建celery.py,然后添加以下内容:

import osfrom celery import Celery
from django.conf import settings# djangoDemo是项目名,大家根据自己的情况进行替换!!!
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'djangoDemo.settings')
# djangoDemo是项目名,大家根据自己的情况进行替换!!!
app = Celery('djangoDemo')
# 从django的设置中读取配置信息
app.config_from_object('django.conf:settings', namespace='CELERY')
# 自动发现app下的任务
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)@app.task(bind=True)
def debug_task(self):print(f"Request: {self.request!r}")

五、创建应用模块,进行测试

打开PyCharm终端执行命令,创建应用模块,这里命名为celeryapp,将新的应用模块注册到django的应用管理中

python manage.py startapp celeryapp

 

在新的应用模块中创建tasks.py,添加异步函数,文件名必须是tasks.py,否则后面启动Celery的时候监听不到

@shared_task
def task_one():print("------------------------- 000 <<<")# 业务逻辑...print("------------------------- 111 <<<")return "222"    

在新的应用模块中修改views.py,添加一个测试接口。这里说一个坑:在settings.py中配置了TIME_ZONE = 'Asia/Shanghai' 和  USE_TZ = True 之后,通过datetime.now()获取的是亚洲上海时间,但是把这个时间存到数据库,就会自动减少8小时,变成了UTC时间,就很无语,于是我把USE_TZ的值改为False,发现存到数据库中的时间变成正常的亚洲上海时间。然而Celery定时执行任务的时区是UTC,经过多次测试,配置了CELERY_TIMEZONE等时区相关的配置,发现好像并没什么用,Celery定时执行任务的时区依然是UTC,无奈只能把任务执行的时间减少8小时

import json
from datetime import datetime, timedeltafrom django.http import JsonResponse
from django.views.decorators.csrf import csrf_exempt
from django_celery_beat.models import ClockedSchedule, PeriodicTaskdef celery_test(request):# 任务名task_name = 'celery_test'# 任务执行的时间,设置为下一分钟的10秒,celery执行时间的时区是UTC,要保证数据库中的时间是UTC时间,这里获取的是亚洲上海的时间,所以减了8小时time = (datetime.now() - timedelta(hours=8) + timedelta(minutes=1)).strftime("%Y-%m-%d %H:%M:10")# 创建任务的执行时间clock = ClockedSchedule.objects.get_or_create(clocked_time=time)# 创建指定时间执行的celery任务PeriodicTask.objects.update_or_create(name=task_name,  # 任务名,尽量保证唯一性,若该任务已存在,则更新该任务task='celeryapp.tasks.task_one',  # 要执行的异步函数的全路径defaults={'clocked': clock[0],  # 使用clocked在指定时间执行该任务'one_off': True,  # 在任务执行完一次后关闭该任务'enabled': True,  # 开启任务'args': json.dumps([]),  # 参数列表,必须是json格式的数组})return JsonResponse({'message': '200'})

修改urls.py,在urlpatterns中添加路由

from celeryapp import viewsurlpatterns = [# ......path('celery/test/', views.celery_test),
]

六、启动项目进行测试

先启动Django项目,然后在分别两个终端中执行命令启动Celery和Celery Beat,命令中的djangoDemo是项目名,大家根据自己的情况进行替换

celery -A djangoDemo worker -P eventlet -l info  # 启动worker监听异步任务
celery -A djangoDemo beat -l info  # 启动beat任务调度器

这里的woker已经监听到我们创建的celeryapp.tasks.task_one异步函数了

这里说明beat启动成功了:

浏览器输入请求地址:http://127.0.0.1:8000/celery/test/ ,创建Celery任务

接口请求成功后查看数据库,django_celery_beat_periodictask表新增了一条异步任务,其中的clocked_id字段指向django_celery_beat_clockedschedule表,该表新增了一条任务的执行时间。celery.backend_cleanup是Celery自动创建的,不用管

在到达任务执行时间后观察woker和beat的终端日志

查看beat终端日志,红框第一行是我们发送请求成功创建异步任务之后,CeleryBeat已经检测到数据库中有任务发生变化(CeleryBeat每5秒检测一次,使用debug级日志可查看到);第二行CeleryBeat将一个名为celery_test的任务发送给worker,让woker执行celeryapp.tasks.task_one异步函数,消费该任务

在woker终端的日志中可以看到任务执行的结果:

七、 Celery Beat常用的三种时间控制器

clockedSchedule:指定某个时间执行任务,例如:2024-05-22 09:22:10,对应的表是django_celery_beat_clockedschedule,该表仅有id和clocked_time两个字段

crontabSchedule:指定crontab格式的时间执行任务,某月某天星期几某时某分,与linux的定时任务规则一致,可参考Linux定时任务-CSDN博客,对应的表是django_celery_beat_crontabschedule,该表有7个字段

  `id` `minute` 分钟`hour` 小时`day_of_week`  星期几`day_of_month`  每月的哪些天`month_of_year`  每年的哪些月份`timezone`  时区

intervalSchedule:间隔指定时间执行任务,对应的表是django_celery_beat_intervalschedule,该表有3个字段id、every(间隔时长)、period(时间单位,可选时、分、秒、微秒、天)

代码示例,clockedSchedule上面已经演示过了,这里只演示另外两种:

# crontabSchedule
def celery_test2(request):task_name = 'celery_test2'crontab = CrontabSchedule.objects.get_or_create(minute='*/1',  # 每1分钟hour='*',  # 每小时day_of_week='*',  # 一周中的哪几天,*表示每天day_of_month='*',  # 月份中的哪一天,*表示每一天month_of_year='*',  # 年中的哪一月,*表示每个月timezone='Asia/Shanghai')PeriodicTask.objects.update_or_create(name=task_name,task='celeryapp.tasks.task_one',  # Celery任务的全路径defaults={'crontab': crontab[0],  # 使用crontab格式时间执行该任务'one_off': False,  # 在任务执行完一次后关闭该任务'enabled': True,  # 开启任务'args': json.dumps([]),})return JsonResponse({'message': '200'})# intervalSchedule
def celery_test3(request):task_name = 'celery_test3'interval = IntervalSchedule.objects.get_or_create(every=1,  # 间隔时间period=IntervalSchedule.MINUTES,  # 周期单位,这里是分钟)PeriodicTask.objects.update_or_create(name=task_name,task='celeryapp.tasks.task_one',  # Celery任务的全路径defaults={'interval': interval[0],  # 使用interval间隔指定时间执行该任务'one_off': False,  # 在任务执行完一次后关闭该任务'enabled': True,  # 开启任务'args': json.dumps([]),})return JsonResponse({'message': '200'})

其实只要创建不同的时间控制器,然后在创建任务的时候作为参数放进去即可,注意一个任务只能使用一种时间控制器

参考文献:

https://blog.csdn.net/wuwei_201/article/details/129650089

https://blog.51cto.com/u_15703497/6252757

相关文章:

django-celery-beat自动调度异步任务

Celery是一个简单、灵活且可靠的分布式系统&#xff0c;专门用于处理大量消息的实时任务调度。它支持使用任务队列的方式在分布的机器、进程、线程上执行任务调度。Celery不仅支持异步任务&#xff08;如发送邮件、文件上传、图像处理等耗时操作&#xff09;&#xff0c;还支持…...

【CSharp】将ushort数组保存为1通道位深16bit的Tiff图片

【CSharp】将ushort数组保存为1通道位深16bit的Tiff图片 1.背景2.接口 1.背景 System.Drawing.Common 是一个用于图像处理和图形操作的库&#xff0c;它是 System.Drawing 命名空间的一部分。由于 .NET Core 和 .NET 5 的跨平台特性&#xff0c;许多以前内置于 .NET Framework…...

Bug:Linux用户拥有r权限但无法打开文件【Linux权限体系】

Bug&#xff1a;Linux用户拥有r权限但无法打开文件【Linux权限体系】 0 问题描述&解决 问题描述&#xff1a; 通过go编写了一个程序&#xff0c;产生的/var/log/xx日志文件发现普通用户无权限打开 - 查看文件权限发现该文件所有者、所有者组、其他用户均有r权限 - 查看该日…...

【Redis】Widows 和 Linux 下使用 Redis

Redis 简述 1.缓存 缓存就是将数据存放在距离计算最近的位置以加快处理速度。缓存是改善软件性能的第一手段,现代 CPU 越来越快的一个重要因素就是使用了更多的缓存,在复杂的软件设计中,缓存几乎无处不在。大型网站架构设计在很多方面都使用了缓存设计。 2.Redis Redis …...

统计计算四|蒙特卡罗方法(Monte Carlo Method)

系列文章目录 统计计算一|非线性方程的求解 统计计算二|EM算法&#xff08;Expectation-Maximization Algorithm&#xff0c;期望最大化算法&#xff09; 统计计算三|Cases for EM 文章目录 系列文章目录一、基本概念&#xff08;一&#xff09;估算 π \pi π&#xff08;二&…...

大模型时代的具身智能系列专题(三)

清华高阳团队 高阳为清华叉院助理教授&#xff0c;本科毕业于清华大学计算机系&#xff0c;博士毕业于UC Berkeley。博士导师是Vision领域的大牛Trevor Darrell&#xff0c;读博期间和Sergey Levine合作开始强化学习方面的探索&#xff0c;博后跟随Pieter Abbeel做强化学习&am…...

使用 FileZilla 在 Windows 和 Ubuntu 之间传文件

网线一端插在板子的WAN口上&#xff0c;另一段插在电脑上&#xff0c;然后要配一下板子的IP。 板侧&#xff1a; 使用串口链接板子与PC端&#xff1b; 输入指令 ifconfig eth0&#xff08;具体看wan口对应哪一个&#xff09; 192.168.1.99 PC端配置&#xff1a; 打开网络设…...

【C++初阶】—— 类和对象 (上)

&#x1f4dd;个人主页&#x1f339;&#xff1a;EterNity_TiMe_ ⏩收录专栏⏪&#xff1a;C “ 登神长阶 ” &#x1f339;&#x1f339;期待您的关注 &#x1f339;&#x1f339; 类和对象 1. 初步认识C2. 类的引入3. 类的定义声明和定义全部放在类体中声明和定义分开存放 4.…...

基础—SQL—图形化界面工具的DataGrip使用(2)

一、回顾与引言 &#xff08;1&#xff09; 上次内容&#xff0c;博客讲到了DDL语句的数据库操作、表操作、表字段的操作的相关语法&#xff0c;然而之前都是在MySQL的命令行当中去操作演示的。这种方式可以用&#xff0c;但是使用的话&#xff0c;第一&#xff0c;在我们日常…...

4-主窗口

4-主窗口 1、简介2 菜单栏、工具栏、状态栏2.1 菜单栏2.2 QAction2.3 工具栏2.4 状态栏 3 混合方式UI设计 1、简介 QMainWindow是一个为用户提供主窗口程序的类&#xff0c;包含一个菜单栏、多个工具栏、多个停靠控件、一个状态栏以及一个中心控件&#xff0c;是许多应用程序&…...

四川景源畅信:抖音小店新手如何做?

随着短视频平台的兴起&#xff0c;抖音小店成为了许多创业者的新选择。但是&#xff0c;对于新手来说&#xff0c;如何在抖音上开设并经营好自己的小店呢?本文将围绕这一问题展开讨论。 一、明确目标和定位作为抖音小店的新手&#xff0c;首先要明确自己的经营目标和定位。是想…...

EventSource

什么是EventSource EventSource 是一个用于服务器推送事件&#xff08;Server-Sent Events, SSE&#xff09;的接口&#xff0c;它允许服务器推送实时更新到浏览器。与 WebSocket 不同&#xff0c;SSE 是单向的&#xff08;服务器到客户端&#xff09;&#xff0c;适用于更新频…...

Vue2 Element-UI 分页组件el-pagination 修改 自带的total、跳转等默认文字

场景需求&#xff1a; Vue2 Element-UI 分页组件el-pagination 修改 自带的total、跳转等默认文字。如下图&#xff1a;默认提示字变成了英文&#xff0c;如何将其 变成 汉字提示呢&#xff1f; 解决方案&#xff1a; 1.方案1&#xff1a;修改DOM内容 不提倡此方案&#xf…...

【简单介绍下线性回归模型】

&#x1f308;个人主页: 程序员不想敲代码啊 &#x1f3c6;CSDN优质创作者&#xff0c;CSDN实力新星&#xff0c;CSDN博客专家 &#x1f44d;点赞⭐评论⭐收藏 &#x1f91d;希望本文对您有所裨益&#xff0c;如有不足之处&#xff0c;欢迎在评论区提出指正&#xff0c;让我们共…...

有限元法之有限元空间的构造

目录 一、区域Ω的剖分 二、三角形一次元 三、一次元的基函数与面积坐标 四、三角形二次元及其基函数 前两节我们介绍了有限元基本概念和变分理论的推导&#xff0c;本节我们继续探讨有限元空间的构造。 一、区域Ω的剖分 对矩形区域进行三角剖分&#xff0c;其中x方向剖…...

高通车规芯片分析

高通三款芯片 SA8155P 7nm SA8295P 5nm SA8255P 5nm 分析AECQ等级 AECQ100里面定义了5个工作环境温度等级&#xff1a;Grade0&#xff1a;-40-150 Grade1&#xff1a;-40-125 Grade2&#xff1a;-40-105 Grade3&#xff1a;-40-85 Grade4&#xff1a;0-70AEC-Q100整体认证测试…...

Flutter 中的 TextButton 小部件:全面指南

Flutter 中的 TextButton 小部件&#xff1a;全面指南 在Flutter的世界里&#xff0c;TextButton是一个基础的小部件&#xff0c;用于创建只包含文本的按钮。它通常用于对话框、表单以及需要强调主要操作的界面。本文将为您提供一个全面的指南&#xff0c;帮助您了解如何使用T…...

通过键值对访问字典

自学python如何成为大佬(目录):https://blog.csdn.net/weixin_67859959/article/details/139049996?spm1001.2014.3001.5501 在Python中&#xff0c;如果想将字典的内容输出也比较简单&#xff0c;可以直接使用print()函数。例如&#xff0c;要想打印dictionary字典&#xff…...

海外仓扫码管理系统怎么选?精准,高效管理需求才是核心需求

海外仓对那些想拓展国际市场的商家来说还是非常重要的&#xff0c;大部分的货物都需要先运到海外仓&#xff0c;才能继续进行下一步的物流快递发货。 那对于海外仓本身来说&#xff0c;当面临大量订单的时候&#xff0c;怎么快速的管理订单&#xff0c;拣选货物就变得十分重要…...

基于51单片机的智能灯光控制系统

一.硬件方案 智能灯光控制系统由单片机最小系统、人体感应模块、关照强度模块、灯光控制模块、电源模块和灯泡组成。本文以STC89C52单片机为核心&#xff0c;通过利用光照度和红外人体感应相结合主动与被动的探测方法&#xff0c;现了室内无人或者关照充足时灯光自动光灯&…...

[2025CVPR]DeepVideo-R1:基于难度感知回归GRPO的视频强化微调框架详解

突破视频大语言模型推理瓶颈,在多个视频基准上实现SOTA性能 一、核心问题与创新亮点 1.1 GRPO在视频任务中的两大挑战 ​安全措施依赖问题​ GRPO使用min和clip函数限制策略更新幅度,导致: 梯度抑制:当新旧策略差异过大时梯度消失收敛困难:策略无法充分优化# 传统GRPO的梯…...

内存分配函数malloc kmalloc vmalloc

内存分配函数malloc kmalloc vmalloc malloc实现步骤: 1)请求大小调整:首先,malloc 需要调整用户请求的大小,以适应内部数据结构(例如,可能需要存储额外的元数据)。通常,这包括对齐调整,确保分配的内存地址满足特定硬件要求(如对齐到8字节或16字节边界)。 2)空闲…...

树莓派超全系列教程文档--(62)使用rpicam-app通过网络流式传输视频

使用rpicam-app通过网络流式传输视频 使用 rpicam-app 通过网络流式传输视频UDPTCPRTSPlibavGStreamerRTPlibcamerasrc GStreamer 元素 文章来源&#xff1a; http://raspberry.dns8844.cn/documentation 原文网址 使用 rpicam-app 通过网络流式传输视频 本节介绍来自 rpica…...

ESP32 I2S音频总线学习笔记(四): INMP441采集音频并实时播放

简介 前面两期文章我们介绍了I2S的读取和写入&#xff0c;一个是通过INMP441麦克风模块采集音频&#xff0c;一个是通过PCM5102A模块播放音频&#xff0c;那如果我们将两者结合起来&#xff0c;将麦克风采集到的音频通过PCM5102A播放&#xff0c;是不是就可以做一个扩音器了呢…...

ffmpeg(四):滤镜命令

FFmpeg 的滤镜命令是用于音视频处理中的强大工具&#xff0c;可以完成剪裁、缩放、加水印、调色、合成、旋转、模糊、叠加字幕等复杂的操作。其核心语法格式一般如下&#xff1a; ffmpeg -i input.mp4 -vf "滤镜参数" output.mp4或者带音频滤镜&#xff1a; ffmpeg…...

【git】把本地更改提交远程新分支feature_g

创建并切换新分支 git checkout -b feature_g 添加并提交更改 git add . git commit -m “实现图片上传功能” 推送到远程 git push -u origin feature_g...

三体问题详解

从物理学角度&#xff0c;三体问题之所以不稳定&#xff0c;是因为三个天体在万有引力作用下相互作用&#xff0c;形成一个非线性耦合系统。我们可以从牛顿经典力学出发&#xff0c;列出具体的运动方程&#xff0c;并说明为何这个系统本质上是混沌的&#xff0c;无法得到一般解…...

vue3+vite项目中使用.env文件环境变量方法

vue3vite项目中使用.env文件环境变量方法 .env文件作用命名规则常用的配置项示例使用方法注意事项在vite.config.js文件中读取环境变量方法 .env文件作用 .env 文件用于定义环境变量&#xff0c;这些变量可以在项目中通过 import.meta.env 进行访问。Vite 会自动加载这些环境变…...

分布式增量爬虫实现方案

之前我们在讨论的是分布式爬虫如何实现增量爬取。增量爬虫的目标是只爬取新产生或发生变化的页面&#xff0c;避免重复抓取&#xff0c;以节省资源和时间。 在分布式环境下&#xff0c;增量爬虫的实现需要考虑多个爬虫节点之间的协调和去重。 另一种思路&#xff1a;将增量判…...

Pinocchio 库详解及其在足式机器人上的应用

Pinocchio 库详解及其在足式机器人上的应用 Pinocchio (Pinocchio is not only a nose) 是一个开源的 C 库&#xff0c;专门用于快速计算机器人模型的正向运动学、逆向运动学、雅可比矩阵、动力学和动力学导数。它主要关注效率和准确性&#xff0c;并提供了一个通用的框架&…...