uwsgi部署多进程django apscheduler与问题排查
- 💖 作者简介:大家好,我是Zeeland,开源建设者与全栈领域优质创作者。
- 📝 CSDN主页:Zeeland🔥
- 📣 我的博客:Zeeland
- 📚 Github主页: Undertone0809 (Zeeland)
- 🎉 支持我:点赞👍+收藏⭐️+留言📝
- 💬介绍:The mixture of software dev+Iot+ml+anything🔥
简介
之前写django的apscheduler一直采用decorator的方式构建,因为业务的定时任务是定死的,没有产生什么其他的问题。最近定时任务需要做动态增减,进行定时任务的动态设置,因此传统的decorator写法行不通了,需要用scheduler.add_job()来添加job。
最开始以为只是一个简单的函数替换,没想到替换了一下出现了很多不一样的问题,故笔者打算记录一下其中发生的问题。
本文将介绍如何使用uwsgi部署django+apscheduler的系统,为了聚焦重点,本文不细说的uwsgi.ini等文件的配置过程,只介绍如何在django中编写构建一个apscheduler系统,并且部署在uwsgi服务器上。
背景介绍
上面已经简单介绍过了,下面介绍一下笔者原有的apscheduler构建方式。在项目根目录下构建了一个jobs/core_job.py
的文件,在里面编写核心的任务调度,大致代码如下所示:
from typing import List, Dict, Anyfrom apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger
from django_apscheduler.jobstores import DjangoJobStore, register_jobfrom app import models
from commons.log_util import get_logger, enable_log
from services.db import db_service
from services.huasi import huasi_storagelogger = get_logger()
enable_log()# 创建后台调度器
scheduler = BackgroundScheduler()
scheduler.add_jobstore(DjangoJobStore(), "default")@register_job(scheduler,"interval",seconds=60 * 120 * 1,id="store A device data",replace_existing=True,
)
def store_pingxiang_device_job():...logger.info("[store A device job] finish")@register_job(scheduler,"interval",seconds=60 * 60 * 1,id="store B device data",replace_existing=True,
)
def store_baiyun_device_job():...logger.info("[store B device job] finish")def start_scheduler():"""获取需要存储任务调度的项目,开启任务调度"""scheduler.start()logger.info("[django scheduler] start scheduler")def resume_scheduler():logger.info("[django scheduler] resume schedule")scheduler.resume()def stop_scheduler():logger.info("[django scheduler] pause schedule")scheduler.pause()
一开始,我把它放到子模块的apps.py ready()里面,如下所示:
from django.apps import AppConfigclass AppConfig(AppConfig):default_auto_field = "django.db.models.BigAutoField"name = "app"verbose_name = "xxx管理系统"def ready(self):from jobs.core_job import start_schedulerstart_scheduler()
后面我发现,这是一个很不明智的选择,因为在uwsgi中,如果你的processes不为1,即你要开启多进程模式时,ready()会被执行多次,而如果你的job是固定的,那么你就会出现下面这种问题,django apscheduler在初始化添加task到数据库时,发生pymysql.err.IntegrityError: (1062, "Duplicate entry 'store A device job' for key 'django_apscheduler_djangojob.PRIMARY'")
这种问题。
根据提供的错误信息,问题是由于数据表 django_apscheduler_djangojob
中存在重复的条目导致的完整性错误(IntegrityError)。具体错误信息是:“Duplicate entry ‘store A device job’ for key ‘django_apscheduler_djangojob.PRIMARY’”,意思是在数据库中的 django_apscheduler_djangojob
表的主键上存在重复的条目,具体插入的条目是 ‘store A device job’。
要解决这个问题,你可以检查下你的代码中是否有重复插入相同数据的操作,或者手动清理数据库中的重复数据,确保每个条目在主键上是唯一的。你可以使用 SQL 命令来删除重复的条目,或者使用 Django ORM 提供的方法进行清理操作,具体取决于你的应用程序的需求和代码实现。
在数据库中,多次执行添加job会导致重复性问题,这也是为什么放到子模块的apps.py ready()不是一个明智的选择。
而事实上,如果你在uwsgi.ini中设置了多进程模式,那么放在项目目录下的settings.py中,也会产生同样的问题,因为两个进程的变量是独立的,而同时执行两个程序则会出现数据库字段重复的问题,从而产生报错。
解决多进程多次启动apscheduler的问题
方案1
要实现在多个worker中只运行一次apscheduler程序,可以使用分布式锁的概念。在这种方案中,只有一个worker能够获得锁并执行apscheduler程序,其他的worker会检测到锁已经被获取,并根据需要等待或跳过执行。
以下是一种可能的解决方案:
-
使用一个共享的可靠存储系统,如数据库或分布式缓存(例如Redis)来存储锁的状态。
-
在你的Django应用中,使用一个装饰器或中间件来实现锁的逻辑。这个装饰器/中间件的作用是在apscheduler程序执行之前检查锁的状态,并根据情况决定是否执行程序。
下面是一个简单的实现示例,使用Redis作为存储系统:
import redis
from functools import wrapsdef apscheduler_lock(func):@wraps(func)def wrapper(*args, **kwargs):lock_key = "apscheduler_lock"lock_value = "locked"# 连接到Redisredis_client = redis.Redis(host='localhost', port=6379)# 尝试获取锁acquired_lock = redis_client.set(lock_key, lock_value, nx=True, ex=60)if acquired_lock:# 如果成功获得了锁,执行apscheduler程序result = func(*args, **kwargs)# 执行完毕后释放锁redis_client.delete(lock_key)return resultelse:# 没有获得锁,跳过执行apscheduler程序return None # 可以根据需要返回其他信息return wrapper
在你的apscheduler任务函数上应用这个装饰器,例如:
from apscheduler.schedulers.background import BackgroundScheduler@schedudler_lock
def my_apscheduler_task():# 执行你的apscheduler任务pass# 创建一个定时任务
scheduler = BackgroundScheduler()
scheduler.add_job(my_apscheduler_task, 'interval', minutes=10)
scheduler.start()
这样,无论你启动多少个worker(也可以使用多个uwsgi进程),只有一个worker能够获取到锁并执行apscheduler程序,其他的worker会跳过执行。
请根据你的实际情况适配这个示例,并确保你的共享存储系统的配置和连接代码是正确的。
方案2
如果你觉得分布式锁比较麻烦,也可以参考django-apscheduler作者提供的方案,使用manage.py操作命令行的方式启动的apscheduler,这种方式比较自由,在多进程的django启动时候,再执行命令启动apscheduler,可以避免上述阐述的重复添加job的问题。
Add a custom Django management command to your project that schedules the APScheduler jobs and starts the scheduler:
# runapscheduler.py
import loggingfrom django.conf import settingsfrom apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger
from django.core.management.base import BaseCommand
from django_apscheduler.jobstores import DjangoJobStore
from django_apscheduler.models import DjangoJobExecution
from django_apscheduler import utillogger = logging.getLogger(__name__)def my_job():# Your job processing logic here...pass# The `close_old_connections` decorator ensures that database connections, that have become
# unusable or are obsolete, are closed before and after your job has run. You should use it
# to wrap any jobs that you schedule that access the Django database in any way.
@util.close_old_connections
def delete_old_job_executions(max_age=604_800):"""This job deletes APScheduler job execution entries older than `max_age` from the database.It helps to prevent the database from filling up with old historical records that are nolonger useful.:param max_age: The maximum length of time to retain historical job execution records.Defaults to 7 days."""DjangoJobExecution.objects.delete_old_job_executions(max_age)class Command(BaseCommand):help = "Runs APScheduler."def handle(self, *args, **options):scheduler = BlockingScheduler(timezone=settings.TIME_ZONE)scheduler.add_jobstore(DjangoJobStore(), "default")scheduler.add_job(my_job,trigger=CronTrigger(second="*/10"), # Every 10 secondsid="my_job", # The `id` assigned to each job MUST be uniquemax_instances=1,replace_existing=True,)logger.info("Added job 'my_job'.")scheduler.add_job(delete_old_job_executions,trigger=CronTrigger(day_of_week="mon", hour="00", minute="00"), # Midnight on Monday, before start of the next work week.id="delete_old_job_executions",max_instances=1,replace_existing=True,)logger.info("Added weekly job: 'delete_old_job_executions'.")try:logger.info("Starting scheduler...")scheduler.start()except KeyboardInterrupt:logger.info("Stopping scheduler...")scheduler.shutdown()logger.info("Scheduler shut down successfully!")
The management command defined above should be invoked via ./manage.py runapscheduler whenever the webserver serving your Django application is started. The details of how and where this should be done is implementation specific, and depends on which webserver you are using and how you are deploying your application to production. For most people this should involve configuring a supervisor process of sorts.
Register any APScheduler jobs as you would normally. Note that if you haven’t set DjangoJobStore as the ‘default’ job store, then you will need to include jobstore=‘djangojobstore’ in your scheduler.add_job() calls.
总结
本文总结了使用uwsgi部署多进程的django apscheduler可能会产生的问题,并提供了两种解决方案,如果你有更好的想法,欢迎在评论区一起讨论。
相关文章:
uwsgi部署多进程django apscheduler与问题排查
💖 作者简介:大家好,我是Zeeland,开源建设者与全栈领域优质创作者。📝 CSDN主页:Zeeland🔥📣 我的博客:Zeeland📚 Github主页: Undertone0809 (Zeeland)&…...

git difftool对比差异,避免推送不相关内容
问题 在利用git进行版本管理的时候,经常会由于对其他不相关的代码,做了一些小改动,例如删除了一个空行,多了一个缩进等。 为避免将这些不相关的改动也提交到远程,对PR造成不必要的影响,可以利用git diff命…...
Java设计模式:一、六大设计原则-05:接口隔离原则
文章目录 一、定义:接口隔离原则二、模拟场景:接口隔离原则三、违背方案:接口隔离原则3.1 工程结构3.2 英雄技能调用3.2.1 英雄技能接口3.2.2 英雄:后裔3.2.3 英雄:廉颇 3.3 单元测试 四、改善代码:接口隔离…...

第63步 深度学习图像识别:多分类建模误判病例分析(Tensorflow)
基于WIN10的64位系统演示 一、写在前面 上两期我们基于TensorFlow和Pytorch环境做了图像识别的多分类任务建模。这一期我们做误判病例分析,分两节介绍,分别基于TensorFlow和Pytorch环境的建模和分析。 本期以健康组、肺结核组、COVID-19组、细菌性&am…...
OpenCv读/写视频色差 方案
OpenCv read / write video color differenceOpenCv读/写视频色差 感谢博主: OpenCv读/写视频色差答案 - 爱码网 有没有办法让 OpenCV 使用正确的转换?? 是的,使用 GStreamer 后端而不是 FFmpeg 后端,颜色看起来很完…...

【传输层】网络基础 -- UDP协议 | TCP协议
再谈端口号端口号范围划分netstatpidof UDPUDP的特点面向数据报UDP的缓冲区 基于UDP的应用层协议 TCP认识TCP协议的报头理解封装解包理解可靠性TCP工作模式16位窗口大小6位标志位URGACKPSHRSTSYNFIN 再谈端口号 端口号(Port)标识了一个主机上进行通信的不同的应用程序 在TCP/I…...

Android开发之性能测试工具Profiler
前言 性能优化问题,在我们开发时都会遇到,但是在小厂和对自己要求不严格的情况下,我都很少去做性能优化; 在性能优化上,基本大家都是通过自己的开发经验和性能分析工具来发现问题,今天给大家分享一下小编最…...

SpringBoot初级开发--多环境配置的集成(9)
在Springboot的开发中,我们经常要切换各种各样的环境配置,比如现在是开发环境,然后又切换到生产环境,这个时候用多环境配置就是一个明智的选择。接下来我们沿用上一章的工程来配置多环境配置工程。 1.准备多环境配置文件 这里我…...

(数学) 剑指 Offer 39. 数组中出现次数超过一半的数字 ——【Leetcode每日一题】
❓ 剑指 Offer 39. 数组中出现次数超过一半的数字 难度:简单 数组中有一个数字出现的次数超过数组长度的一半,请找出这个数字。 你可以假设数组是非空的,并且给定的数组总是存在多数元素。 示例 1: 输入: [1, 2, 3, 2, 2, 2, 5, 4, 2] 输…...

如何用PS把roughness贴图转换成Smoothness,并放入Metallic贴图的a通道。
1:用PS打开Roughness贴图 2:选择反相,装换成Smoothness贴图 3:新建一个大小相等的psd文件,或者打开Metallic贴图 4:如果没有金属度贴图,就把新建的图画成纯黑色 5:选择图层蒙版->…...
了解XSS攻击与CSRF攻击
什么是XSS攻击 XSS(Cross-Site Scripting,跨站脚本攻击)是一种常见的网络安全漏洞,它允许攻击者在受害者的浏览器上执行恶意脚本。这种攻击通常发生在 web 应用程序中,攻击者通过注入恶意脚本来利用用户对网站的信任&…...

安全测试-django防御安全策略
django安全性 django针对安全方面有一些处理,学习如何进行处理设置,也有利于学习安全测试知识。 CSRF 跨站点请求伪造(Cross-Site Request Forgery,CSRF)是一种网络攻击方式,攻击者欺骗用户在自己访问的网…...

7.react useReducer使用与常见问题
useReducer函数 1. useState的替代方案.接收一个(state, action)>newState的reducer, 并返回当前的state以及与其配套的dispatch方法2. 在某些场景下,useReducer会比useState更加适用,例如state逻辑较为复杂, 且**包含多个子值**,或者下一个state依赖于之前的state等清楚us…...
c#泛型(generic)
概述: C#中的泛型(Generics)是一种允许在编写类、方法和委托时使用参数化类型的机制。泛型允许我们编写更通用、可重用的代码,可以避免类型转换和重复编写类似的代码。 泛型的基本语法如下所示: class ClassName<…...

【力扣每日一题】2023.8.30 到家的最少跳跃次数
目录 题目: 示例: 分析: 代码: 题目: 示例: 分析: 题目给我们一只跳蚤,我们可以操控它前跳 a 格或是后跳 b 格,不能跳到小于0的位置,有一些被禁止的点不…...

精读《算法题 - 地下城游戏》
今天我们看一道 leetcode hard 难度题目:地下城游戏。 恶魔们抓住了公主并将她关在了地下城 dungeon 的 右下角 。地下城是由 m x n 个房间组成的二维网格。我们英勇的骑士最初被安置在 左上角 的房间里,他必须穿过地下城并通过对抗恶魔来拯救公主。 骑士…...
随记-Kibana Dev Tools,ES 增删改查 索引,Document
索引 创建索引 创建索引 PUT index_test创建索引 并 修改分片信息 # 创建索引 并 修改分片信息 PUT index_test2 { # 必须换行, PUT XXX 必须独占一行,类似的 其他请求也需要独占一行 "settings": {"number_of_shards": 1, # 主分片"…...

什么是架构,架构的本质是什么
不论是开发人员还是架构师,我们都一直在跟软件系统打交道,架构是在工作中出现最频繁的术语之一。那么,到底什么是架构?你可能有自己的答案,也有可能没有答案。对“架构”的理解需要我们不断在实践中思考、归纳、演绎&a…...

Python爬虫(十七)_糗事百科案例
糗事百科实例 爬取糗事百科段子,假设页面的URL是: http://www.qiushibaike.com/8hr/page/1 要求: 使用requests获取页面信息,用XPath/re做数据提取获取每个帖子里的用户头像连接、用户姓名、段子内容、点赞次数和评论次数保存到json文件内…...

Ae 效果:CC Threads
生成/CC Threads Generate/CC Threads CC Threads(CC 编织条)效果基于当前图层像素生成编织条图案和纹理。可以用在各种设计中,如背景设计、图形设计、文字设计等。 ◆ ◆ ◆ 效果属性说明 Width 宽度 设置编织的宽度。 默认值为 50。值越大…...
C++ 基础特性深度解析
目录 引言 一、命名空间(namespace) C 中的命名空间 与 C 语言的对比 二、缺省参数 C 中的缺省参数 与 C 语言的对比 三、引用(reference) C 中的引用 与 C 语言的对比 四、inline(内联函数…...

EtherNet/IP转DeviceNet协议网关详解
一,设备主要功能 疆鸿智能JH-DVN-EIP本产品是自主研发的一款EtherNet/IP从站功能的通讯网关。该产品主要功能是连接DeviceNet总线和EtherNet/IP网络,本网关连接到EtherNet/IP总线中做为从站使用,连接到DeviceNet总线中做为从站使用。 在自动…...

SpringTask-03.入门案例
一.入门案例 启动类: package com.sky;import lombok.extern.slf4j.Slf4j; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cache.annotation.EnableCach…...
Device Mapper 机制
Device Mapper 机制详解 Device Mapper(简称 DM)是 Linux 内核中的一套通用块设备映射框架,为 LVM、加密磁盘、RAID 等提供底层支持。本文将详细介绍 Device Mapper 的原理、实现、内核配置、常用工具、操作测试流程,并配以详细的…...
智能AI电话机器人系统的识别能力现状与发展水平
一、引言 随着人工智能技术的飞速发展,AI电话机器人系统已经从简单的自动应答工具演变为具备复杂交互能力的智能助手。这类系统结合了语音识别、自然语言处理、情感计算和机器学习等多项前沿技术,在客户服务、营销推广、信息查询等领域发挥着越来越重要…...

【分享】推荐一些办公小工具
1、PDF 在线转换 https://smallpdf.com/cn/pdf-tools 推荐理由:大部分的转换软件需要收费,要么功能不齐全,而开会员又用不了几次浪费钱,借用别人的又不安全。 这个网站它不需要登录或下载安装。而且提供的免费功能就能满足日常…...

免费PDF转图片工具
免费PDF转图片工具 一款简单易用的PDF转图片工具,可以将PDF文件快速转换为高质量PNG图片。无需安装复杂的软件,也不需要在线上传文件,保护您的隐私。 工具截图 主要特点 🚀 快速转换:本地转换,无需等待上…...

群晖NAS如何在虚拟机创建飞牛NAS
套件中心下载安装Virtual Machine Manager 创建虚拟机 配置虚拟机 飞牛官网下载 https://iso.liveupdate.fnnas.com/x86_64/trim/fnos-0.9.2-863.iso 群晖NAS如何在虚拟机创建飞牛NAS - 个人信息分享...
WEB3全栈开发——面试专业技能点P4数据库
一、mysql2 原生驱动及其连接机制 概念介绍 mysql2 是 Node.js 环境中广泛使用的 MySQL 客户端库,基于 mysql 库改进而来,具有更好的性能、Promise 支持、流式查询、二进制数据处理能力等。 主要特点: 支持 Promise / async-await…...

GraphRAG优化新思路-开源的ROGRAG框架
目前的如微软开源的GraphRAG的工作流程都较为复杂,难以孤立地评估各个组件的贡献,传统的检索方法在处理复杂推理任务时可能不够有效,特别是在需要理解实体间关系或多跳知识的情况下。先说结论,看完后感觉这个框架性能上不会比Grap…...