uwsgi部署多进程django apscheduler与问题排查
- 💖 作者简介:大家好,我是Zeeland,开源建设者与全栈领域优质创作者。
- 📝 CSDN主页:Zeeland🔥
- 📣 我的博客:Zeeland
- 📚 Github主页: Undertone0809 (Zeeland)
- 🎉 支持我:点赞👍+收藏⭐️+留言📝
- 💬介绍:The mixture of software dev+Iot+ml+anything🔥
简介
之前写django的apscheduler一直采用decorator的方式构建,因为业务的定时任务是定死的,没有产生什么其他的问题。最近定时任务需要做动态增减,进行定时任务的动态设置,因此传统的decorator写法行不通了,需要用scheduler.add_job()来添加job。
最开始以为只是一个简单的函数替换,没想到替换了一下出现了很多不一样的问题,故笔者打算记录一下其中发生的问题。
本文将介绍如何使用uwsgi部署django+apscheduler的系统,为了聚焦重点,本文不细说的uwsgi.ini等文件的配置过程,只介绍如何在django中编写构建一个apscheduler系统,并且部署在uwsgi服务器上。
背景介绍
上面已经简单介绍过了,下面介绍一下笔者原有的apscheduler构建方式。在项目根目录下构建了一个jobs/core_job.py的文件,在里面编写核心的任务调度,大致代码如下所示:
from typing import List, Dict, Anyfrom apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger
from django_apscheduler.jobstores import DjangoJobStore, register_jobfrom app import models
from commons.log_util import get_logger, enable_log
from services.db import db_service
from services.huasi import huasi_storagelogger = get_logger()
enable_log()# 创建后台调度器
scheduler = BackgroundScheduler()
scheduler.add_jobstore(DjangoJobStore(), "default")@register_job(scheduler,"interval",seconds=60 * 120 * 1,id="store A device data",replace_existing=True,
)
def store_pingxiang_device_job():...logger.info("[store A device job] finish")@register_job(scheduler,"interval",seconds=60 * 60 * 1,id="store B device data",replace_existing=True,
)
def store_baiyun_device_job():...logger.info("[store B device job] finish")def start_scheduler():"""获取需要存储任务调度的项目,开启任务调度"""scheduler.start()logger.info("[django scheduler] start scheduler")def resume_scheduler():logger.info("[django scheduler] resume schedule")scheduler.resume()def stop_scheduler():logger.info("[django scheduler] pause schedule")scheduler.pause()
一开始,我把它放到子模块的apps.py ready()里面,如下所示:
from django.apps import AppConfigclass AppConfig(AppConfig):default_auto_field = "django.db.models.BigAutoField"name = "app"verbose_name = "xxx管理系统"def ready(self):from jobs.core_job import start_schedulerstart_scheduler()
后面我发现,这是一个很不明智的选择,因为在uwsgi中,如果你的processes不为1,即你要开启多进程模式时,ready()会被执行多次,而如果你的job是固定的,那么你就会出现下面这种问题,django apscheduler在初始化添加task到数据库时,发生pymysql.err.IntegrityError: (1062, "Duplicate entry 'store A device job' for key 'django_apscheduler_djangojob.PRIMARY'")这种问题。
根据提供的错误信息,问题是由于数据表 django_apscheduler_djangojob 中存在重复的条目导致的完整性错误(IntegrityError)。具体错误信息是:“Duplicate entry ‘store A device job’ for key ‘django_apscheduler_djangojob.PRIMARY’”,意思是在数据库中的 django_apscheduler_djangojob 表的主键上存在重复的条目,具体插入的条目是 ‘store A device job’。
要解决这个问题,你可以检查下你的代码中是否有重复插入相同数据的操作,或者手动清理数据库中的重复数据,确保每个条目在主键上是唯一的。你可以使用 SQL 命令来删除重复的条目,或者使用 Django ORM 提供的方法进行清理操作,具体取决于你的应用程序的需求和代码实现。
在数据库中,多次执行添加job会导致重复性问题,这也是为什么放到子模块的apps.py ready()不是一个明智的选择。
而事实上,如果你在uwsgi.ini中设置了多进程模式,那么放在项目目录下的settings.py中,也会产生同样的问题,因为两个进程的变量是独立的,而同时执行两个程序则会出现数据库字段重复的问题,从而产生报错。
解决多进程多次启动apscheduler的问题
方案1
要实现在多个worker中只运行一次apscheduler程序,可以使用分布式锁的概念。在这种方案中,只有一个worker能够获得锁并执行apscheduler程序,其他的worker会检测到锁已经被获取,并根据需要等待或跳过执行。
以下是一种可能的解决方案:
-
使用一个共享的可靠存储系统,如数据库或分布式缓存(例如Redis)来存储锁的状态。
-
在你的Django应用中,使用一个装饰器或中间件来实现锁的逻辑。这个装饰器/中间件的作用是在apscheduler程序执行之前检查锁的状态,并根据情况决定是否执行程序。
下面是一个简单的实现示例,使用Redis作为存储系统:
import redis
from functools import wrapsdef apscheduler_lock(func):@wraps(func)def wrapper(*args, **kwargs):lock_key = "apscheduler_lock"lock_value = "locked"# 连接到Redisredis_client = redis.Redis(host='localhost', port=6379)# 尝试获取锁acquired_lock = redis_client.set(lock_key, lock_value, nx=True, ex=60)if acquired_lock:# 如果成功获得了锁,执行apscheduler程序result = func(*args, **kwargs)# 执行完毕后释放锁redis_client.delete(lock_key)return resultelse:# 没有获得锁,跳过执行apscheduler程序return None # 可以根据需要返回其他信息return wrapper
在你的apscheduler任务函数上应用这个装饰器,例如:
from apscheduler.schedulers.background import BackgroundScheduler@schedudler_lock
def my_apscheduler_task():# 执行你的apscheduler任务pass# 创建一个定时任务
scheduler = BackgroundScheduler()
scheduler.add_job(my_apscheduler_task, 'interval', minutes=10)
scheduler.start()
这样,无论你启动多少个worker(也可以使用多个uwsgi进程),只有一个worker能够获取到锁并执行apscheduler程序,其他的worker会跳过执行。
请根据你的实际情况适配这个示例,并确保你的共享存储系统的配置和连接代码是正确的。
方案2
如果你觉得分布式锁比较麻烦,也可以参考django-apscheduler作者提供的方案,使用manage.py操作命令行的方式启动的apscheduler,这种方式比较自由,在多进程的django启动时候,再执行命令启动apscheduler,可以避免上述阐述的重复添加job的问题。
Add a custom Django management command to your project that schedules the APScheduler jobs and starts the scheduler:
# runapscheduler.py
import loggingfrom django.conf import settingsfrom apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger
from django.core.management.base import BaseCommand
from django_apscheduler.jobstores import DjangoJobStore
from django_apscheduler.models import DjangoJobExecution
from django_apscheduler import utillogger = logging.getLogger(__name__)def my_job():# Your job processing logic here...pass# The `close_old_connections` decorator ensures that database connections, that have become
# unusable or are obsolete, are closed before and after your job has run. You should use it
# to wrap any jobs that you schedule that access the Django database in any way.
@util.close_old_connections
def delete_old_job_executions(max_age=604_800):"""This job deletes APScheduler job execution entries older than `max_age` from the database.It helps to prevent the database from filling up with old historical records that are nolonger useful.:param max_age: The maximum length of time to retain historical job execution records.Defaults to 7 days."""DjangoJobExecution.objects.delete_old_job_executions(max_age)class Command(BaseCommand):help = "Runs APScheduler."def handle(self, *args, **options):scheduler = BlockingScheduler(timezone=settings.TIME_ZONE)scheduler.add_jobstore(DjangoJobStore(), "default")scheduler.add_job(my_job,trigger=CronTrigger(second="*/10"), # Every 10 secondsid="my_job", # The `id` assigned to each job MUST be uniquemax_instances=1,replace_existing=True,)logger.info("Added job 'my_job'.")scheduler.add_job(delete_old_job_executions,trigger=CronTrigger(day_of_week="mon", hour="00", minute="00"), # Midnight on Monday, before start of the next work week.id="delete_old_job_executions",max_instances=1,replace_existing=True,)logger.info("Added weekly job: 'delete_old_job_executions'.")try:logger.info("Starting scheduler...")scheduler.start()except KeyboardInterrupt:logger.info("Stopping scheduler...")scheduler.shutdown()logger.info("Scheduler shut down successfully!")
The management command defined above should be invoked via ./manage.py runapscheduler whenever the webserver serving your Django application is started. The details of how and where this should be done is implementation specific, and depends on which webserver you are using and how you are deploying your application to production. For most people this should involve configuring a supervisor process of sorts.
Register any APScheduler jobs as you would normally. Note that if you haven’t set DjangoJobStore as the ‘default’ job store, then you will need to include jobstore=‘djangojobstore’ in your scheduler.add_job() calls.
总结
本文总结了使用uwsgi部署多进程的django apscheduler可能会产生的问题,并提供了两种解决方案,如果你有更好的想法,欢迎在评论区一起讨论。
相关文章:
uwsgi部署多进程django apscheduler与问题排查
💖 作者简介:大家好,我是Zeeland,开源建设者与全栈领域优质创作者。📝 CSDN主页:Zeeland🔥📣 我的博客:Zeeland📚 Github主页: Undertone0809 (Zeeland)&…...
git difftool对比差异,避免推送不相关内容
问题 在利用git进行版本管理的时候,经常会由于对其他不相关的代码,做了一些小改动,例如删除了一个空行,多了一个缩进等。 为避免将这些不相关的改动也提交到远程,对PR造成不必要的影响,可以利用git diff命…...
Java设计模式:一、六大设计原则-05:接口隔离原则
文章目录 一、定义:接口隔离原则二、模拟场景:接口隔离原则三、违背方案:接口隔离原则3.1 工程结构3.2 英雄技能调用3.2.1 英雄技能接口3.2.2 英雄:后裔3.2.3 英雄:廉颇 3.3 单元测试 四、改善代码:接口隔离…...
第63步 深度学习图像识别:多分类建模误判病例分析(Tensorflow)
基于WIN10的64位系统演示 一、写在前面 上两期我们基于TensorFlow和Pytorch环境做了图像识别的多分类任务建模。这一期我们做误判病例分析,分两节介绍,分别基于TensorFlow和Pytorch环境的建模和分析。 本期以健康组、肺结核组、COVID-19组、细菌性&am…...
OpenCv读/写视频色差 方案
OpenCv read / write video color differenceOpenCv读/写视频色差 感谢博主: OpenCv读/写视频色差答案 - 爱码网 有没有办法让 OpenCV 使用正确的转换?? 是的,使用 GStreamer 后端而不是 FFmpeg 后端,颜色看起来很完…...
【传输层】网络基础 -- UDP协议 | TCP协议
再谈端口号端口号范围划分netstatpidof UDPUDP的特点面向数据报UDP的缓冲区 基于UDP的应用层协议 TCP认识TCP协议的报头理解封装解包理解可靠性TCP工作模式16位窗口大小6位标志位URGACKPSHRSTSYNFIN 再谈端口号 端口号(Port)标识了一个主机上进行通信的不同的应用程序 在TCP/I…...
Android开发之性能测试工具Profiler
前言 性能优化问题,在我们开发时都会遇到,但是在小厂和对自己要求不严格的情况下,我都很少去做性能优化; 在性能优化上,基本大家都是通过自己的开发经验和性能分析工具来发现问题,今天给大家分享一下小编最…...
SpringBoot初级开发--多环境配置的集成(9)
在Springboot的开发中,我们经常要切换各种各样的环境配置,比如现在是开发环境,然后又切换到生产环境,这个时候用多环境配置就是一个明智的选择。接下来我们沿用上一章的工程来配置多环境配置工程。 1.准备多环境配置文件 这里我…...
(数学) 剑指 Offer 39. 数组中出现次数超过一半的数字 ——【Leetcode每日一题】
❓ 剑指 Offer 39. 数组中出现次数超过一半的数字 难度:简单 数组中有一个数字出现的次数超过数组长度的一半,请找出这个数字。 你可以假设数组是非空的,并且给定的数组总是存在多数元素。 示例 1: 输入: [1, 2, 3, 2, 2, 2, 5, 4, 2] 输…...
如何用PS把roughness贴图转换成Smoothness,并放入Metallic贴图的a通道。
1:用PS打开Roughness贴图 2:选择反相,装换成Smoothness贴图 3:新建一个大小相等的psd文件,或者打开Metallic贴图 4:如果没有金属度贴图,就把新建的图画成纯黑色 5:选择图层蒙版->…...
了解XSS攻击与CSRF攻击
什么是XSS攻击 XSS(Cross-Site Scripting,跨站脚本攻击)是一种常见的网络安全漏洞,它允许攻击者在受害者的浏览器上执行恶意脚本。这种攻击通常发生在 web 应用程序中,攻击者通过注入恶意脚本来利用用户对网站的信任&…...
安全测试-django防御安全策略
django安全性 django针对安全方面有一些处理,学习如何进行处理设置,也有利于学习安全测试知识。 CSRF 跨站点请求伪造(Cross-Site Request Forgery,CSRF)是一种网络攻击方式,攻击者欺骗用户在自己访问的网…...
7.react useReducer使用与常见问题
useReducer函数 1. useState的替代方案.接收一个(state, action)>newState的reducer, 并返回当前的state以及与其配套的dispatch方法2. 在某些场景下,useReducer会比useState更加适用,例如state逻辑较为复杂, 且**包含多个子值**,或者下一个state依赖于之前的state等清楚us…...
c#泛型(generic)
概述: C#中的泛型(Generics)是一种允许在编写类、方法和委托时使用参数化类型的机制。泛型允许我们编写更通用、可重用的代码,可以避免类型转换和重复编写类似的代码。 泛型的基本语法如下所示: class ClassName<…...
【力扣每日一题】2023.8.30 到家的最少跳跃次数
目录 题目: 示例: 分析: 代码: 题目: 示例: 分析: 题目给我们一只跳蚤,我们可以操控它前跳 a 格或是后跳 b 格,不能跳到小于0的位置,有一些被禁止的点不…...
精读《算法题 - 地下城游戏》
今天我们看一道 leetcode hard 难度题目:地下城游戏。 恶魔们抓住了公主并将她关在了地下城 dungeon 的 右下角 。地下城是由 m x n 个房间组成的二维网格。我们英勇的骑士最初被安置在 左上角 的房间里,他必须穿过地下城并通过对抗恶魔来拯救公主。 骑士…...
随记-Kibana Dev Tools,ES 增删改查 索引,Document
索引 创建索引 创建索引 PUT index_test创建索引 并 修改分片信息 # 创建索引 并 修改分片信息 PUT index_test2 { # 必须换行, PUT XXX 必须独占一行,类似的 其他请求也需要独占一行 "settings": {"number_of_shards": 1, # 主分片"…...
什么是架构,架构的本质是什么
不论是开发人员还是架构师,我们都一直在跟软件系统打交道,架构是在工作中出现最频繁的术语之一。那么,到底什么是架构?你可能有自己的答案,也有可能没有答案。对“架构”的理解需要我们不断在实践中思考、归纳、演绎&a…...
Python爬虫(十七)_糗事百科案例
糗事百科实例 爬取糗事百科段子,假设页面的URL是: http://www.qiushibaike.com/8hr/page/1 要求: 使用requests获取页面信息,用XPath/re做数据提取获取每个帖子里的用户头像连接、用户姓名、段子内容、点赞次数和评论次数保存到json文件内…...
Ae 效果:CC Threads
生成/CC Threads Generate/CC Threads CC Threads(CC 编织条)效果基于当前图层像素生成编织条图案和纹理。可以用在各种设计中,如背景设计、图形设计、文字设计等。 ◆ ◆ ◆ 效果属性说明 Width 宽度 设置编织的宽度。 默认值为 50。值越大…...
李慕婉-仙逆-造相Z-Turbo AI核心原理科普:如何用Transformer理解并生成人类语言
李慕婉-仙逆-造相Z-Turbo AI核心原理科普:如何用Transformer理解并生成人类语言 你有没有想过,当你和“李慕婉-仙逆-造相Z-Turbo”这样的AI模型对话时,它到底是怎么“听懂”你的话,又“想”出那些回答的?它不像我们人…...
PySide6新手必看:从零开始用Python玩转Qt界面开发(附官方教程对比)
PySide6新手必看:从零开始用Python玩转Qt界面开发 在Python生态中,GUI开发一直是个让人又爱又恨的话题。当Tkinter显得过于简陋,而PyQt又面临商业授权困扰时,PySide6作为Qt官方推出的Python绑定,正成为越来越多开发者的…...
【QT】-- QT操作数据库
前言: Qt是C一个开发框架,具有跨平台特性。这篇是作者大二学习的时候做的笔记,有可能有错误,请各位批评指正。这篇记录QT操作数据库。欢迎大家收藏 关注,作者将会持续更新。 文章目录Qt 操作数据库QSqlDatabase数据库…...
深度解析:基于摄像头的远程生理监测工具箱rPPG-Toolbox实战指南
深度解析:基于摄像头的远程生理监测工具箱rPPG-Toolbox实战指南 【免费下载链接】rPPG-Toolbox rPPG-Toolbox: Deep Remote PPG Toolbox (NeurIPS 2023) 项目地址: https://gitcode.com/gh_mirrors/rp/rPPG-Toolbox 远程生理监测技术正在医疗健康领域引发革命…...
gotop扩展功能详解:NVIDIA GPU监控与远程数据采集终极指南
gotop扩展功能详解:NVIDIA GPU监控与远程数据采集终极指南 【免费下载链接】gotop A terminal based graphical activity monitor inspired by gtop and vtop 项目地址: https://gitcode.com/gh_mirrors/got/gotop gotop是一款功能强大的终端图形化系统监控工…...
四管升降压电路实战解析:从拓扑原理到模式切换(附波形对比)
1. 四管升降压电路为何成为工程师的"瑞士军刀" 第一次接触四管升降压电路时,我正被一个光伏储能项目折磨得焦头烂额。太阳能板的输出电压在8V-18V剧烈波动,而系统需要稳定的12V供电。传统方案要用两个独立电路串联,直到老工程师扔给…...
认知内耗:在亚马逊,为何品牌名内部的“关键词”正在相互厮杀
在亚马逊的品牌丛林中,最隐蔽的悲剧莫过于:你精心构思的品牌名,其内部的各个组成部分(如“欧文斯”、“康宁”、“玻璃纤维”),并未协同指向你,反而各自激活了消费者心智中其他更强大品牌的“认…...
智能转换驱动科研效率:DeTikZify重构学术图表自动化新范式
智能转换驱动科研效率:DeTikZify重构学术图表自动化新范式 【免费下载链接】DeTikZify Synthesizing Graphics Programs for Scientific Figures and Sketches with TikZ 项目地址: https://gitcode.com/gh_mirrors/de/DeTikZify 在科研成果可视化的关键环节…...
从Solid模块到轨迹规划:一个完整机械臂SimMechanics仿真项目的保姆级拆解
从Solid模块到轨迹规划:一个完整机械臂SimMechanics仿真项目的保姆级拆解 机械臂仿真一直是工业自动化和机器人研究中的核心课题。不同于传统Adams等专业仿真软件,SimMechanics凭借其与Matlab/Simulink的无缝集成,为工程师提供了从建模到控制…...
有线/无线(空口)抓包过程及其分析
一、如何判断该抓有线包,还是无线包层级问题类型抓包位置L1/L2(无线)连不上、掉线、弱信号无线抓包L2(有线)VLAN错误有线抓包L3(IP)DHCP失败有线抓包L4(传输)丢包、重传有…...
