当前位置: 首页 > news >正文

uwsgi部署多进程django apscheduler与问题排查

  • 💖 作者简介:大家好,我是Zeeland,开源建设者与全栈领域优质创作者。
  • 📝 CSDN主页:Zeeland🔥
  • 📣 我的博客:Zeeland
  • 📚 Github主页: Undertone0809 (Zeeland)
  • 🎉 支持我:点赞👍+收藏⭐️+留言📝
  • 💬介绍:The mixture of software dev+Iot+ml+anything🔥

简介

之前写django的apscheduler一直采用decorator的方式构建,因为业务的定时任务是定死的,没有产生什么其他的问题。最近定时任务需要做动态增减,进行定时任务的动态设置,因此传统的decorator写法行不通了,需要用scheduler.add_job()来添加job。

最开始以为只是一个简单的函数替换,没想到替换了一下出现了很多不一样的问题,故笔者打算记录一下其中发生的问题。

本文将介绍如何使用uwsgi部署django+apscheduler的系统,为了聚焦重点,本文不细说的uwsgi.ini等文件的配置过程,只介绍如何在django中编写构建一个apscheduler系统,并且部署在uwsgi服务器上。

背景介绍

上面已经简单介绍过了,下面介绍一下笔者原有的apscheduler构建方式。在项目根目录下构建了一个jobs/core_job.py的文件,在里面编写核心的任务调度,大致代码如下所示:

from typing import List, Dict, Anyfrom apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger
from django_apscheduler.jobstores import DjangoJobStore, register_jobfrom app import models
from commons.log_util import get_logger, enable_log
from services.db import db_service
from services.huasi import huasi_storagelogger = get_logger()
enable_log()# 创建后台调度器
scheduler = BackgroundScheduler()
scheduler.add_jobstore(DjangoJobStore(), "default")@register_job(scheduler,"interval",seconds=60 * 120 * 1,id="store A device data",replace_existing=True,
)
def store_pingxiang_device_job():...logger.info("[store A device job] finish")@register_job(scheduler,"interval",seconds=60 * 60 * 1,id="store B device data",replace_existing=True,
)
def store_baiyun_device_job():...logger.info("[store B device job] finish")def start_scheduler():"""获取需要存储任务调度的项目,开启任务调度"""scheduler.start()logger.info("[django scheduler] start scheduler")def resume_scheduler():logger.info("[django scheduler] resume schedule")scheduler.resume()def stop_scheduler():logger.info("[django scheduler] pause schedule")scheduler.pause()

一开始,我把它放到子模块的apps.py ready()里面,如下所示:

from django.apps import AppConfigclass AppConfig(AppConfig):default_auto_field = "django.db.models.BigAutoField"name = "app"verbose_name = "xxx管理系统"def ready(self):from jobs.core_job import start_schedulerstart_scheduler()

后面我发现,这是一个很不明智的选择,因为在uwsgi中,如果你的processes不为1,即你要开启多进程模式时,ready()会被执行多次,而如果你的job是固定的,那么你就会出现下面这种问题,django apscheduler在初始化添加task到数据库时,发生pymysql.err.IntegrityError: (1062, "Duplicate entry 'store A device job' for key 'django_apscheduler_djangojob.PRIMARY'")这种问题。

根据提供的错误信息,问题是由于数据表 django_apscheduler_djangojob 中存在重复的条目导致的完整性错误(IntegrityError)。具体错误信息是:“Duplicate entry ‘store A device job’ for key ‘django_apscheduler_djangojob.PRIMARY’”,意思是在数据库中的 django_apscheduler_djangojob 表的主键上存在重复的条目,具体插入的条目是 ‘store A device job’。

要解决这个问题,你可以检查下你的代码中是否有重复插入相同数据的操作,或者手动清理数据库中的重复数据,确保每个条目在主键上是唯一的。你可以使用 SQL 命令来删除重复的条目,或者使用 Django ORM 提供的方法进行清理操作,具体取决于你的应用程序的需求和代码实现。

在数据库中,多次执行添加job会导致重复性问题,这也是为什么放到子模块的apps.py ready()不是一个明智的选择。

而事实上,如果你在uwsgi.ini中设置了多进程模式,那么放在项目目录下的settings.py中,也会产生同样的问题,因为两个进程的变量是独立的,而同时执行两个程序则会出现数据库字段重复的问题,从而产生报错。

解决多进程多次启动apscheduler的问题

方案1

要实现在多个worker中只运行一次apscheduler程序,可以使用分布式锁的概念。在这种方案中,只有一个worker能够获得锁并执行apscheduler程序,其他的worker会检测到锁已经被获取,并根据需要等待或跳过执行。

以下是一种可能的解决方案:

  1. 使用一个共享的可靠存储系统,如数据库或分布式缓存(例如Redis)来存储锁的状态。

  2. 在你的Django应用中,使用一个装饰器或中间件来实现锁的逻辑。这个装饰器/中间件的作用是在apscheduler程序执行之前检查锁的状态,并根据情况决定是否执行程序。

下面是一个简单的实现示例,使用Redis作为存储系统:

import redis
from functools import wrapsdef apscheduler_lock(func):@wraps(func)def wrapper(*args, **kwargs):lock_key = "apscheduler_lock"lock_value = "locked"# 连接到Redisredis_client = redis.Redis(host='localhost', port=6379)# 尝试获取锁acquired_lock = redis_client.set(lock_key, lock_value, nx=True, ex=60)if acquired_lock:# 如果成功获得了锁,执行apscheduler程序result = func(*args, **kwargs)# 执行完毕后释放锁redis_client.delete(lock_key)return resultelse:# 没有获得锁,跳过执行apscheduler程序return None  # 可以根据需要返回其他信息return wrapper

在你的apscheduler任务函数上应用这个装饰器,例如:

from apscheduler.schedulers.background import BackgroundScheduler@schedudler_lock
def my_apscheduler_task():# 执行你的apscheduler任务pass# 创建一个定时任务
scheduler = BackgroundScheduler()
scheduler.add_job(my_apscheduler_task, 'interval', minutes=10)
scheduler.start()

这样,无论你启动多少个worker(也可以使用多个uwsgi进程),只有一个worker能够获取到锁并执行apscheduler程序,其他的worker会跳过执行。

请根据你的实际情况适配这个示例,并确保你的共享存储系统的配置和连接代码是正确的。

方案2

如果你觉得分布式锁比较麻烦,也可以参考django-apscheduler作者提供的方案,使用manage.py操作命令行的方式启动的apscheduler,这种方式比较自由,在多进程的django启动时候,再执行命令启动apscheduler,可以避免上述阐述的重复添加job的问题。

Add a custom Django management command to your project that schedules the APScheduler jobs and starts the scheduler:

# runapscheduler.py
import loggingfrom django.conf import settingsfrom apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger
from django.core.management.base import BaseCommand
from django_apscheduler.jobstores import DjangoJobStore
from django_apscheduler.models import DjangoJobExecution
from django_apscheduler import utillogger = logging.getLogger(__name__)def my_job():# Your job processing logic here...pass# The `close_old_connections` decorator ensures that database connections, that have become
# unusable or are obsolete, are closed before and after your job has run. You should use it
# to wrap any jobs that you schedule that access the Django database in any way. 
@util.close_old_connections
def delete_old_job_executions(max_age=604_800):"""This job deletes APScheduler job execution entries older than `max_age` from the database.It helps to prevent the database from filling up with old historical records that are nolonger useful.:param max_age: The maximum length of time to retain historical job execution records.Defaults to 7 days."""DjangoJobExecution.objects.delete_old_job_executions(max_age)class Command(BaseCommand):help = "Runs APScheduler."def handle(self, *args, **options):scheduler = BlockingScheduler(timezone=settings.TIME_ZONE)scheduler.add_jobstore(DjangoJobStore(), "default")scheduler.add_job(my_job,trigger=CronTrigger(second="*/10"),  # Every 10 secondsid="my_job",  # The `id` assigned to each job MUST be uniquemax_instances=1,replace_existing=True,)logger.info("Added job 'my_job'.")scheduler.add_job(delete_old_job_executions,trigger=CronTrigger(day_of_week="mon", hour="00", minute="00"),  # Midnight on Monday, before start of the next work week.id="delete_old_job_executions",max_instances=1,replace_existing=True,)logger.info("Added weekly job: 'delete_old_job_executions'.")try:logger.info("Starting scheduler...")scheduler.start()except KeyboardInterrupt:logger.info("Stopping scheduler...")scheduler.shutdown()logger.info("Scheduler shut down successfully!")

The management command defined above should be invoked via ./manage.py runapscheduler whenever the webserver serving your Django application is started. The details of how and where this should be done is implementation specific, and depends on which webserver you are using and how you are deploying your application to production. For most people this should involve configuring a supervisor process of sorts.

Register any APScheduler jobs as you would normally. Note that if you haven’t set DjangoJobStore as the ‘default’ job store, then you will need to include jobstore=‘djangojobstore’ in your scheduler.add_job() calls.

总结

本文总结了使用uwsgi部署多进程的django apscheduler可能会产生的问题,并提供了两种解决方案,如果你有更好的想法,欢迎在评论区一起讨论。

相关文章:

uwsgi部署多进程django apscheduler与问题排查

💖 作者简介:大家好,我是Zeeland,开源建设者与全栈领域优质创作者。📝 CSDN主页:Zeeland🔥📣 我的博客:Zeeland📚 Github主页: Undertone0809 (Zeeland)&…...

git difftool对比差异,避免推送不相关内容

问题 在利用git进行版本管理的时候,经常会由于对其他不相关的代码,做了一些小改动,例如删除了一个空行,多了一个缩进等。 为避免将这些不相关的改动也提交到远程,对PR造成不必要的影响,可以利用git diff命…...

Java设计模式:一、六大设计原则-05:接口隔离原则

文章目录 一、定义:接口隔离原则二、模拟场景:接口隔离原则三、违背方案:接口隔离原则3.1 工程结构3.2 英雄技能调用3.2.1 英雄技能接口3.2.2 英雄:后裔3.2.3 英雄:廉颇 3.3 单元测试 四、改善代码:接口隔离…...

第63步 深度学习图像识别:多分类建模误判病例分析(Tensorflow)

基于WIN10的64位系统演示 一、写在前面 上两期我们基于TensorFlow和Pytorch环境做了图像识别的多分类任务建模。这一期我们做误判病例分析,分两节介绍,分别基于TensorFlow和Pytorch环境的建模和分析。 本期以健康组、肺结核组、COVID-19组、细菌性&am…...

OpenCv读/写视频色差 方案

OpenCv read / write video color differenceOpenCv读/写视频色差 感谢博主: OpenCv读/写视频色差答案 - 爱码网 有没有办法让 OpenCV 使用正确的转换?? 是的,使用 GStreamer 后端而不是 FFmpeg 后端,颜色看起来很完…...

【传输层】网络基础 -- UDP协议 | TCP协议

再谈端口号端口号范围划分netstatpidof UDPUDP的特点面向数据报UDP的缓冲区 基于UDP的应用层协议 TCP认识TCP协议的报头理解封装解包理解可靠性TCP工作模式16位窗口大小6位标志位URGACKPSHRSTSYNFIN 再谈端口号 端口号(Port)标识了一个主机上进行通信的不同的应用程序 在TCP/I…...

Android开发之性能测试工具Profiler

前言 性能优化问题,在我们开发时都会遇到,但是在小厂和对自己要求不严格的情况下,我都很少去做性能优化; 在性能优化上,基本大家都是通过自己的开发经验和性能分析工具来发现问题,今天给大家分享一下小编最…...

SpringBoot初级开发--多环境配置的集成(9)

在Springboot的开发中,我们经常要切换各种各样的环境配置,比如现在是开发环境,然后又切换到生产环境,这个时候用多环境配置就是一个明智的选择。接下来我们沿用上一章的工程来配置多环境配置工程。 1.准备多环境配置文件 这里我…...

(数学) 剑指 Offer 39. 数组中出现次数超过一半的数字 ——【Leetcode每日一题】

❓ 剑指 Offer 39. 数组中出现次数超过一半的数字 难度:简单 数组中有一个数字出现的次数超过数组长度的一半,请找出这个数字。 你可以假设数组是非空的,并且给定的数组总是存在多数元素。 示例 1: 输入: [1, 2, 3, 2, 2, 2, 5, 4, 2] 输…...

如何用PS把roughness贴图转换成Smoothness,并放入Metallic贴图的a通道。

1:用PS打开Roughness贴图 2:选择反相,装换成Smoothness贴图 3:新建一个大小相等的psd文件,或者打开Metallic贴图 4:如果没有金属度贴图,就把新建的图画成纯黑色 5:选择图层蒙版->…...

了解XSS攻击与CSRF攻击

什么是XSS攻击 XSS(Cross-Site Scripting,跨站脚本攻击)是一种常见的网络安全漏洞,它允许攻击者在受害者的浏览器上执行恶意脚本。这种攻击通常发生在 web 应用程序中,攻击者通过注入恶意脚本来利用用户对网站的信任&…...

安全测试-django防御安全策略

django安全性 django针对安全方面有一些处理,学习如何进行处理设置,也有利于学习安全测试知识。 CSRF 跨站点请求伪造(Cross-Site Request Forgery,CSRF)是一种网络攻击方式,攻击者欺骗用户在自己访问的网…...

7.react useReducer使用与常见问题

useReducer函数 1. useState的替代方案.接收一个(state, action)>newState的reducer, 并返回当前的state以及与其配套的dispatch方法2. 在某些场景下,useReducer会比useState更加适用,例如state逻辑较为复杂, 且**包含多个子值**,或者下一个state依赖于之前的state等清楚us…...

c#泛型(generic)

概述&#xff1a; C#中的泛型&#xff08;Generics&#xff09;是一种允许在编写类、方法和委托时使用参数化类型的机制。泛型允许我们编写更通用、可重用的代码&#xff0c;可以避免类型转换和重复编写类似的代码。 泛型的基本语法如下所示&#xff1a; class ClassName<…...

【力扣每日一题】2023.8.30 到家的最少跳跃次数

目录 题目&#xff1a; 示例&#xff1a; 分析&#xff1a; 代码&#xff1a; 题目&#xff1a; 示例&#xff1a; 分析&#xff1a; 题目给我们一只跳蚤&#xff0c;我们可以操控它前跳 a 格或是后跳 b 格&#xff0c;不能跳到小于0的位置&#xff0c;有一些被禁止的点不…...

精读《算法题 - 地下城游戏》

今天我们看一道 leetcode hard 难度题目&#xff1a;地下城游戏。 恶魔们抓住了公主并将她关在了地下城 dungeon 的 右下角 。地下城是由 m x n 个房间组成的二维网格。我们英勇的骑士最初被安置在 左上角 的房间里&#xff0c;他必须穿过地下城并通过对抗恶魔来拯救公主。 骑士…...

随记-Kibana Dev Tools,ES 增删改查 索引,Document

索引 创建索引 创建索引 PUT index_test创建索引 并 修改分片信息 # 创建索引 并 修改分片信息 PUT index_test2 { # 必须换行, PUT XXX 必须独占一行&#xff0c;类似的 其他请求也需要独占一行 "settings": {"number_of_shards": 1, # 主分片"…...

什么是架构,架构的本质是什么

不论是开发人员还是架构师&#xff0c;我们都一直在跟软件系统打交道&#xff0c;架构是在工作中出现最频繁的术语之一。那么&#xff0c;到底什么是架构&#xff1f;你可能有自己的答案&#xff0c;也有可能没有答案。对“架构”的理解需要我们不断在实践中思考、归纳、演绎&a…...

Python爬虫(十七)_糗事百科案例

糗事百科实例 爬取糗事百科段子&#xff0c;假设页面的URL是: http://www.qiushibaike.com/8hr/page/1 要求&#xff1a; 使用requests获取页面信息&#xff0c;用XPath/re做数据提取获取每个帖子里的用户头像连接、用户姓名、段子内容、点赞次数和评论次数保存到json文件内…...

Ae 效果:CC Threads

生成/CC Threads Generate/CC Threads CC Threads&#xff08;CC 编织条&#xff09;效果基于当前图层像素生成编织条图案和纹理。可以用在各种设计中&#xff0c;如背景设计、图形设计、文字设计等。 ◆ ◆ ◆ 效果属性说明 Width 宽度 设置编织的宽度。 默认值为 50。值越大…...

Kotlin 协程 - 多路复用 select()

一、概念 又叫选择表达式&#xff0c;是一个挂起函数&#xff0c;可以同时等待多个挂起结果&#xff0c;只取用最快恢复的那个值&#xff08;即多种方式获取数据&#xff0c;哪个更快返回结果就用哪个&#xff09;。 同时到达 select() 会优先选择先写子表达式&#xff0c;想随…...

学习笔记-ThreadLocal

ThreadLocal 什么是ThreadLocal&#xff1f; ThreadLocal 是线程本地变量类&#xff0c;在多线程并行执行过程中&#xff0c;将变量存储在ThreadLocal中&#xff0c;每个线程中都有独立的变量&#xff0c;因此不会出现线程安全问题。 应用举例 解决线程安全问题&#xff1a;例…...

python利用pandas统计分析—groupby()函数的使用

文章目录 一、groupby使用场景二、groupby基本原理三、groupby分组运算基础聚合操作&#xff1a;只能选择一种聚合操作agg 聚合操作&#xff1a;可以针对同列选择不同聚合方法transformapply 四、groupby分组后去重统计nunique()五、groupby分组后重命名列名rename()直接重新命…...

OPENCV实现ORB特征检测

# -*- coding:utf-8 -*- """ 作者:794919561 日期:2023/8/31 """ import cv2 import numpy as np# 读图像 img = cv2.imread(F:\\learnOpenCV\\openCVLearning\\pictures\\chess.jpg)...

W5100S-EVB-PICO主动PING主机IP检测连通性(十)

前言 上一章节我们用我们开发板在UDP组播模式下进行数据回环测试&#xff0c;本章我们用开发板去主动ping主机IP地址来检测与该主机之间网络的连通性。 什么是PING&#xff1f; PING是一种命令&#xff0c; 是用来探测主机到主机之间是否可通信&#xff0c;如果不能ping到某台…...

使用 Nginx 搭建文件下载服务器

文章目录 一、基础环境二、适用场景三、方法和步骤四、其他说明 版权声明&#xff1a;本文为CSDN博主「杨群」的原创文章&#xff0c;遵循 CC 4.0 BY-SA版权协议&#xff0c;于2023年8月27日首发于CSDN&#xff0c;转载请附上原文出处链接及本声明。 原文链接&#xff1a;http…...

链式栈StackT

C关键词&#xff1a;内部类/模板类/头插 C自学精简教程 目录(必读) C数据结构与算法实现&#xff08;目录&#xff09; 栈的内存结构 空栈&#xff1a; 有一个元素的栈&#xff1a; 多个元素的栈&#xff1a; 成员函数说明 0 clear 清空栈 clear 函数负责将栈的对内存释放…...

Fiddler中 AutoResponder 使用

Fiddler的 AutoResponder &#xff0c;即URL重定向功能非常强大。不管我们做URL重定向&#xff0c;还是做mock测试等&#xff0c;都可以通过该功能进行实践。 下面&#xff0c;小酋就来具体讲下该功能的用法。 Enable rules 启用规则Unmatched requests passthrough 没有匹配…...

77GHz线性调频连续波雷达

文章目录 前言 一、背景 二、优缺点 三、工作原理 四、电路模块设计 4.1.LFMCW信号源 4.2.发射电路 4.3.接收电路 4.4.信号处理器 五、应用 5.1.汽车测距 5.2.军事方面 5.3.气象方面 总结 前言 这篇文章是博主本科期间整理的关于77GHz线性调频连续波雷达的相关资料&#xff0c;…...

YOLOV8改进:更换为MPDIOU,实现有效涨点

1.该文章属于YOLOV5/YOLOV7/YOLOV8改进专栏,包含大量的改进方式,主要以2023年的最新文章和2022年的文章提出改进方式。 2.提供更加详细的改进方法,如将注意力机制添加到网络的不同位置,便于做实验,也可以当做论文的创新点。 2.涨点效果:更换为MPDIOU,实现有效涨点! 目录…...