当前位置: 首页 > article >正文

【最后203篇系列】007 使用APS搭建本地定时任务

说明

最大的好处是方便。

其实所有任务的源头,应该都是通过定时的方式,在每个时隙发起轮询。当然在任务的后续传递中,可以通过CallBack或者WebHook的方式,以事件的形态进行。这样可以避免长任务执行的过程中进行等待和轮询。

总结一下:源头是定时轮询,中间过程是事件传递。

本次使用APS搭建本地定时任务的目的是为了简化实验性质的定时任务,通过在git项目下进行编辑任务脚本和执行任务清单,而运行容器本身会周期性的自动拉取代码,然后按照任务清单执行。

执行过程采用多线程方式,任务的负载通常都不高。整体设计上,复杂和繁重的任务会包在微服务中,定时任务主要是向这些微服务发起触发动作。通常,微服务收到触发元信息后进行自动的任务/数据拉取处理,处理完毕后通过webhook将结果持久化,或进一步发起其他的触发动作。

另外,具有共性的任务将会被提取出来,之后会交给celery以分布&协程方式执行,这些任务包括:

  • 1 数据库IO。例如从队列里取数,存到数据库中。
  • 2 网络数据获取IO。爬取网页、或者通过接口,获取数据。
  • 3 接口化标准操作。按url, json input这样的标准web请求,这种灵活性很强。表面上是一个IO动作,但背后可能触发密集计算,但是又不需要celery集群承担。(可能是ray集群、dask集群、基于显卡计算的集群)

内容

1 读取任务列表

主要为了简单的读入任务(脚本),同时可以方便的进行注释

# 用于将代表任务列表的数据读入
# 去掉换行和空格
# 如果以# 号开头表示注释
def read_all_lines_clean(fpath):with open(fpath, 'r') as f:lines = f.readlines()lines1 = [x.replace('\n','').strip() for x in lines]lines2 = [x for x in lines1 if len(x) and not x.startswith('#')]return lines2

任务文件如下task_list.txt

task_01_probably_git_pull.py
task_02_del_event_null_recs.py
# task_03_sync_xs_backup.py
#task_04_rotate_data.py
# task_05_sync_milvus.py
#task_06_rotate_mysql_time.py

读入后

In [4]: a = read_all_lines_clean('task_list.txt')In [5]: a
Out[5]: ['task_01_probably_git_pull.py', 'task_02_del_event_null_recs.py']

这些就是之后要定时调度的任务

2 并行执行

为了使得每一次定时任务都可以执行,且保证效率,需要用一些简单的调度(容错问题均在脚本内解决)。调度器可以保证每30秒起来一次。

线程的并行执行:

def exe_tasks_threads(task_list_file = base_config.task_list_file, project_folder = base_config.project_folder):tasks = read_all_lines_clean(project_folder + task_list_file)dedup_tasks = remove_duplicates_preserve_order(tasks)pytask_list = [ {'some_path':base_config.project_folder+x} for x in dedup_tasks]thread_concurrent_run(os_system_python, keyword_args_list=pytask_list, max_workers =50)

每一次执行os_system_python

import subprocessdef os_system_python(some_path=None, timeout=30):try:result = subprocess.run(['python3', some_path], timeout=timeout)return resultexcept subprocess.TimeoutExpired:print(f"Task {some_path} timed out after {timeout} seconds.")return None'''
代码说明
subprocess.run:这是 subprocess 模块的高级 API,用于运行命令并等待其完成。它支持 timeout 参数,如果命令在指定时间内未完成,会抛出 TimeoutExpired 异常。timeout 参数:你设置了默认超时时间为 30 秒,这是一个合理的默认值。如果任务在 30 秒内未完成,subprocess.run 会抛出 TimeoutExpired 异常。异常处理:捕获 TimeoutExpired 异常后,打印超时信息并返回 None。这样可以避免程序因超时而崩溃,同时提供清晰的日志信息。
'''

3 自动更新

更新git项目,作为一个任务脚本被周期执行。由于代码更新并不是高频事件,所以一般概率上保证5分钟会更新一次代码。

(base) root@76a14afa199b:/workspace/local_aps_v2/base# python3 task_01_probably_git_pull.py
2000-01-01 08:00:00
2000-01-01 08:00:00
2025-01-29 19:47:19 - httpx - INFO - HTTP Request: POST http://192.168.0.4:24132/send_msg/ "HTTP/1.1 200 OK"
task_01_probably_git_pull running
2025-01-29 19:47:19 - httpx - INFO - HTTP Request: POST http://192.168.0.4:24132/send_msg/ "HTTP/1.1 200 OK"
Git pull executed successfully for branch 'master':
Already up to date.2025-01-29 19:47:19 - httpx - INFO - HTTP Request: POST http://192.168.0.4:24132/send_msg/ "HTTP/1.1 200 OK"
(base) root@76a14afa199b:/workspace/local_aps_v2/base#

4 定时调度

调度器在每分钟的0/30秒执行,我把30秒定为一拍(pace),一分钟定位一时隙(slot)。绝大部分任务都应该在30秒内完成。

# 执行本地脚本
from datetime import datetime
import os
from apscheduler.schedulers.blocking import BlockingSchedulerfrom base_config import base_config
from Basefuncs import * 
def exe_tasks_threads(task_list_file = base_config.task_list_file, project_folder = base_config.project_folder):tasks = read_all_lines_clean(project_folder + task_list_file)dedup_tasks = remove_duplicates_preserve_order(tasks)pytask_list = [ {'some_path':base_config.project_folder+x} for x in dedup_tasks]thread_concurrent_run(os_system_python, keyword_args_list=pytask_list, max_workers =50)# 后台启动命令 nohup python3 /root/prj27_timetask/cron_task/test_001.py >/dev/null 2>&1 &if __name__ == '__main__':# 创建调度器sche1 = BlockingScheduler()# 添加任务,使用 cron 表达式每分钟的第 0 秒和第 30 秒执行sche1.add_job(exe_tasks_threads,'cron',second='0,30',  # 每分钟的第 0 秒和第 30 秒kwargs={},coalesce=True,max_instances=1)print('[S] Starting scheduler with cron (0s and 30s of every minute)...')try:sche1.start()  # 启动调度器except (KeyboardInterrupt, SystemExit):print('[S] Scheduler stopped.')

5 Docker运行

为了保证执行的稳定性,使用docker执行

docker run -d --name=local_aps_v2 \--restart=always \-v /etc/localtime:/etc/localtime -v /etc/timezone:/etc/timezone -v /etc/hostname:/etc/hostname -e "LANG=C.UTF-8" \-w /workspace/local_aps_v2/base \YOURIMAGE  \sh -c "git pull && python3 aps.py"

只有环境改变时才需要修改镜像重发布,大部分时候只要调试和修改代码,然后推送就可以了。

相关文章:

【最后203篇系列】007 使用APS搭建本地定时任务

说明 最大的好处是方便。 其实所有任务的源头,应该都是通过定时的方式,在每个时隙发起轮询。当然在任务的后续传递中,可以通过CallBack或者WebHook的方式,以事件的形态进行。这样可以避免长任务执行的过程中进行等待和轮询。 总结…...

Mono里运行C#脚本36—加载C#类定义的成员变量和方法的数量

前面分析了加载类和基类的基本过程, 接着来分析一下加载成员变量和方法的数量。 因为我们知道C#语言定义一个类,主要就是定义成员变量,以及那些对此成员变量进行操作的方法, 所以需要使用一种方法来描述C#语言定义类的能力。 一般情况下,主要有两种类型: 普通的类,比如前…...

JavaScript函数中this的指向

总结:谁调用我,我就指向谁(es6箭头函数不算) 一、ES6之前 每一个函数内部都有一个关键字是 this ,可以直接使用 重点: 函数内部的 this 只和函数的调用方式有关系,和函数的定义方式没有关系 …...

51单片机入门_01_单片机(MCU)概述(使用STC89C52芯片;使用到的硬件及课程安排)

文章目录 1. 什么是单片机1.1 微型计算机的组成1.2 微型计算机的应用形态1.3 单板微型计算机1.4 单片机(MCU)1.4.1 单片机内部结构1.4.2 单片机应用系统的组成 1.5 80C51单片机系列1.5.1 STC公司的51单片机1.5.1 STC公司单片机的命名规则 2. 单片机的特点及应用领域2.1 单片机的…...

k均值聚类将数据分成多个簇

K-Means 聚类并将数据分成多个簇,可以使用以下方法: 实现思路 随机初始化 K 个聚类中心计算每个点到聚类中心的距离将点分配到最近的簇更新聚类中心重复上述过程直到收敛 完整代码: import torch import matplotlib.pyplot as pltdef kme…...

【高内聚】设计模式是如何让软件更好做到高内聚的?

高内聚(High Cohesion)是指模块内部的元素紧密协作,共同完成一个明确且相对独立的功能。就像高效的小团队,成员们目标一致,相互配合默契。 低耦合(Loose Coupling)是指模块之间的依赖较少&#…...

51单片机入门_02_C语言基础0102

C语言基础部分可以参考我之前写的专栏C语言基础入门48篇 以及《从入门到就业C全栈班》中的C语言部分,本篇将会结合51单片机讲差异部分。 课程主要按照以下目录进行介绍。 文章目录 1. 进制转换2. C语言简介3. C语言中基本数据类型4. 标识符与关键字5. 变量与常量6.…...

时间轮:XXL-JOB 高效、精准定时任务调度实现思路分析

大家好,我是此林。 定时任务是我们项目中经常会遇到的一个场景。那么如果让我们手动来实现一个定时任务框架,我们会怎么做呢? 1. 基础实现:简单的线程池时间轮询 最直接的方式是创建一个定时任务线程池,用户每提交一…...

人工智能如何驱动SEO关键词优化策略的转型与效果提升

内容概要 随着数字化时代的到来,人工智能(AI)技术对各行各业的影响日益显著,在搜索引擎优化(SEO)领域尤为如此。AI的应用不仅改变了关键词研究的方法,而且提升了内容生成和搜索优化的效率&…...

CTF从入门到精通

文章目录 背景知识CTF赛制 背景知识 CTF赛制 1.web安全:通过浏览器访问题目服务器上的网站,寻找网站漏洞(sql注入,xss(钓鱼链接),文件上传,包含漏洞,xxe,ssrf,命令执行&#xff0c…...

【NLP251】NLP RNN 系列网络

NLP251 系列主要记录从NLP基础网络结构到知识图谱的学习 1.原理及网络结构 1.1RNN 在Yoshua Bengio论文中( http://proceedings.mlr.press/v28/pascanu13.pdf )证明了梯度求导的一部分环节是一个指数模型…...

【越学学糊涂的Linux系统】Linux指令篇(二)

一、pwd指令: 00x0:打印该用户当前目录下所属的文件路径 看指令框可以看出我用的是一个叫sw的用户,我们的路径就是在一个home目录下的sw目录下的class113文件路径。 也可以说是指出当前所处的工作目录 补充:🎆​​​​​​​Wi…...

【AI论文】Omni-RGPT:通过标记令牌统一图像和视频的区域级理解

摘要:我们提出了Omni-RGPT,这是一个多模态大型语言模型,旨在促进图像和视频的区域级理解。为了在时空维度上实现一致的区域表示,我们引入了Token Mark,这是一组在视觉特征空间中突出目标区域的标记。这些标记通过使用区…...

Java面试题2025-并发编程基础(多线程、锁、阻塞队列)

并发编程 一、线程的基础概念 一、基础概念 1.1 进程与线程A 什么是进程? 进程是指运行中的程序。 比如我们使用钉钉,浏览器,需要启动这个程序,操作系统会给这个程序分配一定的资源(占用内存资源)。 …...

Three城市引擎地图插件Geo-3d

一、简介 基于Three开发,为Three 3D场景提供GIS能力和城市底座渲染能力。支持Web墨卡托、WGS84、GCJ02等坐标系,支持坐标转换,支持影像、地形、geojson建筑、道路,植被等渲染。支持自定义主题。 二、效果 三、代码 //插件初始化…...

【linux】Linux 常见目录特性、权限和功能

目录特性默认权限主要功能/用途/根目录,所有目录的起点755文件系统的顶层目录,包含所有其他子目录和文件/bin基础二进制命令目录(系统启动和修复必需的命令)755存放所有用户可用的基本命令(如 ls, cp, bash 等&#xf…...

MySQL的复制

一、概述 1.复制解决的问题是让一台服务器的数据与其他服务器保持同步,即主库的数据可以同步到多台备库上,备库也可以配置成另外一台服务器的主库。这种操作一般不会增加主库的开销,主要是启用二进制日志带来的开销。 2.两种复制方式&#xf…...

【后端开发】字节跳动青训营Cloudwego脚手架

Cloudwego脚手架使用 cwgo脚手架 cwgo脚手架 安装的命令: GOPROXYhttps://goproxy.cn/,direct go install github.com/cloudwego/cwgolatest依赖thriftgo的安装: go install github.com/cloudwego/thriftgolatest编辑echo.thrift文件用于生成项目&…...

ArcGIS10.2 许可License点击始终启动无响应的解决办法及正常启动的前提

1、问题描述 在ArcGIS License Administrator中,手动点击“启动”无响应;且在计算机管理-服务中,无ArcGIS License 或者License的启动、停止、禁止等均为灰色,无法操作。 2、解决方法 ①通过cmd对service.txt进行手动服务的启动…...

Cyber Security 101-Build Your Cyber Security Career-Security Principles(安全原则)

了解安全三元组以及常见的安全模型和原则。 任务1:介绍 安全已成为一个流行词;每家公司都想声称其产品或服务是安全的。但事实真的如此吗? 在我们开始讨论不同的安全原则之前,了解我们正在保护资产的对手至关重要。您是否试图阻止蹒跚学步…...

NLP模型大对比:Transformer > RNN > n-gram

结论 Transformer 大于 RNN 大于 传统的n-gram n-gram VS Transformer 我们可以用一个 图书馆查询 的类比来解释它们的差异: 一、核心差异对比 维度n-gram 模型Transformer工作方式固定窗口的"近视观察员"全局关联的"侦探"依赖距离只能看前…...

【Rust自学】14.5. cargo工作空间(Workspace)

喜欢的话别忘了点赞、收藏加关注哦,对接下来的教程有兴趣的可以关注专栏。谢谢喵!(・ω・) 14.4.1. 为什么需要cargo workspace 假如说我们构建了一个二进制crate,里面既有library又有库。随着项目规模不断增长&#…...

[权限提升] Windows 提权 — 系统内核溢出漏洞提权

关注这个框架的其他相关笔记:[内网安全] 内网渗透 - 学习手册-CSDN博客 0x01:系统内核溢出漏洞提权介绍 注意:提权很容易让电脑蓝屏,所以如果是测试的话,提权前最好做好系统备份。 溢出漏洞就像是往杯子里装水 —— 如…...

手机端语音转文字的实用选择

今天推荐两款配合使用的软件:MultiTTS 和 T2S,它们可以在安卓设备上实现文字转语音功能。 第一款:MultiTTS(安卓) MultiTTS 是一款离线文本转语音工具,完全免费,提供多种语音风格,…...

四.3 Redis 五大数据类型/结构的详细说明/详细使用( hash 哈希表数据类型详解和使用)

四.3 Redis 五大数据类型/结构的详细说明/详细使用( hash 哈希表数据类型详解和使用) 文章目录 四.3 Redis 五大数据类型/结构的详细说明/详细使用( hash 哈希表数据类型详解和使用)2.hash 哈希表常用指令(详细讲解说明)2.1 hset …...

无心剑七绝《经纬岁华》

七绝经纬岁华 经天伟业梦初耕 纬地深沉志纵横 岁去年来添锦绣 华章曼妙筑新城 2025年1月29日 平水韵八庚平韵 无心剑七绝《经纬岁华》以“经纬岁华”为藏头,歌颂了泸州职业技术学院(川南经纬学堂)百余年的光辉历程。诗中“经天伟业梦初耕&…...

大数据治理实战:架构、方法与最佳实践

📝个人主页🌹:一ge科研小菜鸡-CSDN博客 🌹🌹期待您的关注 🌹🌹 1. 引言 大数据治理是确保数据质量、合规性和安全性的重要手段,尤其在数据驱动决策和人工智能应用日益普及的背景下&…...

基于AnolisOS 8.6安装GmSSL 3.1.1及easy_gmssl库测试国密算法

测试环境 Virtual Box,AnolisOS-8.6-x86_64-minimal.iso,4 vCPU, 8G RAM, 60 vDisk。最小化安装。需联网。 系统环境 关闭防火墙 systemctl stop firewalld systemctl disable firewalld systemctl status firewalld selinux关闭 cat /etc/selinux/co…...

区块链在能源行业的创新

技术创新 1. 智能合约与自动化交易 智能合约是区块链技术的核心组件之一,它允许在没有中介的情况下自动执行合同条款。在能源行业,这可以用于自动化电力交易、支付流程以及管理复杂的供应链。例如,当太阳能板产生的电量达到预设值时&#x…...

C基础寒假练习(1)

一、求二维数组只中元并输出行标和列标(以二行三列为例)元素的最大值&#xff0c; #include <stdio.h>int main() {// 初始化二维数组int array[2][3] {{1, 2, 3},{4, 5, 6}};// 定义变量来存储最大值及其位置int max_value array[0][0];int max_row 0;int max_col 0…...