python中实现定时任务的几种方案
目录
- while True: + sleep()
- Timeloop库
- threading.Timer
- sched模块
- schedule模块
- APScheduler框架
- Celery框架
- 数据流工具Apache Airflow
- 概述
- Airflow 核心概念
- Airflow 的架构
 
总结以下几种方案实现定时任务,可根据不同需求去使用不同方案。
while True: + sleep()
利用while True的死循环,加上 sleep()函数让其暂停一段时间,达到每隔一段时间执行特定任务的目的。
比较简单,例子如下:
import datetime
import timedef time_printer():now = datetime.datetime.now()ts = now.strftime('%Y-%m-%d %H:%M:%S')print('do func time :', ts)def loop_monitor():while True:time_printer()time.sleep(5)if __name__ == "__main__":loop_monitor()
主要缺点:
- 只能设定间隔,不能指定具体的时间
- sleep 是一个阻塞函数,也就是说 sleep 这一段时间,程序什么也不能操作。
Timeloop库
Timeloop是一个库,可用于运行多周期任务。
Timeloop内部维护了一个任务列表jobs,用来管理所有任务。
可以使用装饰器标记任务,这样就会把任务加到任务列表jobs中,使用start方法启动任务列表重的所有任务。
示例如下:
import time
from timeloop import Timeloop
from datetime import timedeltatl = Timeloop()@tl.job(interval=timedelta(seconds=2))
def sample_job_every_2s():print("2s job current time : {}".format(time.ctime()))if __name__ == "__main__":tl.start(block=True)
运行后打印如下:
[2023-10-02 09:48:41,926] [timeloop] [INFO] Starting Timeloop..
[2023-10-02 09:48:41,926] [timeloop] [INFO] Registered job <function sample_job_every_2s at 0x7fc3d022d0d0>
[2023-10-02 09:48:41,926] [timeloop] [INFO] Timeloop now started. Jobs will run based on the interval set
2s job current time : Mon Oct  2 09:48:43 2023
2s job current time : Mon Oct  2 09:48:45 2023
2s job current time : Mon Oct  2 09:48:47 2023
同时Timeloop还有个stop方法,可以用来暂停所有任务。
threading.Timer
threading 模块中的 Timer 是一个非阻塞函数,比 sleep 稍好一点,timer最基本理解就是定时器,我们可以启动多个定时任务,这些定时器任务是异步执行,所以不存在等待顺序执行问题。
主要有如下方法:
| 方法 | 说明 | 
|---|---|
| Timer(interval, function, args=None, kwargs=None) | 创建定时器 | 
| cancel() | 取消定时器 | 
| start() | 使用线程方式执行 | 
| join(self, timeout=None) | 主线程等待线程执行结束 | 
示例:
import datetimefrom threading import Timerdef time_printer():now = datetime.datetime.now()ts = now.strftime('%Y-%m-%d %H:%M:%S')print('do func time :', ts)# 注意 Timer 只能执行一次,这里需要循环调用,否则只能执行一次loop_monitor()def loop_monitor():t = Timer(5, time_printer)t.start()if __name__ == "__main__":loop_monitor()
sched模块
sched模块实现了一个通用事件调度器,在调度器类使用一个延迟函数等待特定的时间,执行任务。同时支持多线程应用程序,在每个任务执行后会立刻调用延时函数,以确保其他线程也能执行。
class sched.scheduler(timefunc, delayfunc)这个类定义了调度事件的通用接口,它需要外部传入两个参数,timefunc是一个没有参数的返回时间类型数字的函数(常用使用的如time模块里面的time),delayfunc应该是一个需要一个参数来调用、与timefunc的输出兼容、并且作用为延迟多个时间单位的函数(常用的如time模块的sleep)。
import datetime
import time
import scheddef time_printer():now = datetime.datetime.now()ts = now.strftime('%Y-%m-%d %H:%M:%S')print('do func time :', ts)loop_monitor()def loop_monitor():s = sched.scheduler(time.time, time.sleep)  # 生成调度器s.enter(5, 1, time_printer, ())s.run()if __name__ == "__main__":loop_monitor()
scheduler对象主要方法:
enter(delay, priority, action, argument),安排一个事件来延迟delay个时间单位。
 cancel(event):从队列中删除事件。如果事件不是当前队列中的事件,则该方法将跑出一个ValueError。
 run():运行所有预定的事件。这个函数将等待(使用传递给构造函数的delayfunc()函数),然后执行事件,直到不再有预定的事件。
 比threading.Timer更好,不需要循环调用。
schedule模块
schedule是一个第三方轻量级的任务调度模块,可以按照秒,分,小时,日期或者自定义事件执行时间。schedule允许用户使用简单、人性化的语法以预定的时间间隔定期运行Python函数(或其它可调用函数)。
示例:
import schedule
import timedef job():print("I'm working...")
schedule.every(10).seconds.do(job)
schedule.every(10).minutes.do(job)
schedule.every().hour.do(job)
schedule.every().day.at("10:30").do(job)
schedule.every(5).to(10).minutes.do(job)
schedule.every().monday.do(job)
schedule.every().wednesday.at("13:15").do(job)
schedule.every().minute.at(":17").do(job)while True:schedule.run_pending()time.sleep(1)
也可以通过 @repeat() 装饰静态方法:
import time
from schedule import every, repeat, run_pending@repeat(every().second)
def job():print('working...')while True:run_pending()time.sleep(1)
传递参数:
import scheduledef greet(name):print('Hello', name)
schedule.every(2).seconds.do(greet, name='Alice')
schedule.every(4).seconds.do(greet, name='Bob')while True:schedule.run_pending()
装饰器同样能传递参数:
from schedule import every, repeat, run_pending@repeat(every().second, 'World')
@repeat(every().minute, 'Mars')
def hello(planet):print('Hello', planet)while True:run_pending()
取消任务:
import schedulei = 0
def some_task():global ii += 1print(i)if i == 10:schedule.cancel_job(job)print('cancel job')exit(0)
job = schedule.every().second.do(some_task)while True:schedule.run_pending()
运行一次任务:
import time
import scheduledef job_that_executes_once():print('Hello')return schedule.CancelJobschedule.every().minute.at(':34').do(job_that_executes_once)
while True:schedule.run_pending()time.sleep(1)
根据标签检索任务:
# 检索所有任务:schedule.get_jobs()
import scheduledef greet(name):print('Hello {}'.format(name))schedule.every().day.do(greet, 'Andrea').tag('daily-tasks', 'friend')
schedule.every().hour.do(greet, 'John').tag('hourly-tasks', 'friend')
schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer')
schedule.every().day.do(greet, 'Derek').tag('daily-tasks', 'guest')friends = schedule.get_jobs('friend')
print(friends)
根据标签取消任务:
# 取消所有任务:schedule.clear()
import scheduledef greet(name):print('Hello {}'.format(name))if name == 'Cancel':schedule.clear('second-tasks')print('cancel second-tasks')schedule.every().second.do(greet, 'Andrea').tag('second-tasks', 'friend')
schedule.every().second.do(greet, 'John').tag('second-tasks', 'friend')
schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer')
schedule.every(5).seconds.do(greet, 'Cancel').tag('daily-tasks', 'guest')while True:schedule.run_pending()
运行任务到某时间:
import schedule
from datetime import datetime, timedelta, timedef job():print('working...')schedule.every().second.until('23:59').do(job)  # 今天23:59停止
schedule.every().second.until('2030-01-01 18:30').do(job)  # 2030-01-01 18:30停止
schedule.every().second.until(timedelta(hours=8)).do(job)  # 8小时后停止
schedule.every().second.until(time(23, 59, 59)).do(job)  # 今天23:59:59停止
schedule.every().second.until(datetime(2030, 1, 1, 18, 30, 0)).do(job)  # 2030-01-01 18:30停止while True:schedule.run_pending()
马上运行所有任务(主要用于测试):
import scheduledef job():print('working...')def job1():print('Hello...')schedule.every().monday.at('12:40').do(job)
schedule.every().tuesday.at('16:40').do(job1)
schedule.run_all()
schedule.run_all(delay_seconds=3)  # 任务间延迟3秒
并行运行:使用 Python 内置队列实现:
import threading
import time
import scheduledef job1():print("I'm running on thread %s" % threading.current_thread())def job2():print("I'm running on thread %s" % threading.current_thread())def job3():print("I'm running on thread %s" % threading.current_thread())def run_threaded(job_func):job_thread = threading.Thread(target=job_func)job_thread.start()schedule.every(10).seconds.do(run_threaded, job1)
schedule.every(10).seconds.do(run_threaded, job2)
schedule.every(10).seconds.do(run_threaded, job3)while True:schedule.run_pending()time.sleep(1)
APScheduler框架
APScheduler(advanceded python scheduler)基于Quartz的一个Python定时任务框架,实现了Quartz的所有功能,使用起来十分方便。提供了基于日期、固定时间间隔以及crontab类型的任务,并且可以持久化任务。基于这些功能,我们可以很方便的实现一个Python定时任务系统。
具体使用参考文章:APScheduler框架使用
Celery框架
Celery是一个简单,灵活,可靠的分布式系统,用于处理大量消息,同时为操作提供维护此类系统所需的工具, 也可用于任务调度。Celery 的配置比较麻烦,如果你只是需要一个轻量级的调度工具,Celery 不会是一个好选择。
Celery 是一个强大的分布式任务队列,它可以让任务的执行完全脱离主程序,甚至可以被分配到其他主机上运行。通常使用它来实现异步任务(async task)和定时任务(crontab)。异步任务比如是发送邮件、或者文件上传, 图像处理等等一些比较耗时的操作 ,定时任务是需要在特定时间执行的任务。
具体使用参考:
Celery使用:优秀的python异步任务框架
 
 Django(21):使用Celery任务框架
 
数据流工具Apache Airflow
概述
Apache Airflow 是Airbnb开源的一款数据流程工具,目前是Apache孵化项目。以非常灵活的方式来支持数据的ETL过程,同时还支持非常多的插件来完成诸如HDFS监控、邮件通知等功能。Airflow支持单机和分布式两种模式,支持Master-Slave模式,支持Mesos等资源调度,有非常好的扩展性。被大量公司采用。
Airflow使用Python开发,它通过DAGs(Directed Acyclic Graph, 有向无环图)来表达一个工作流中所要执行的任务,以及任务之间的关系和依赖。比如,如下的工作流中,任务T1执行完成,T2和T3才能开始执行,T2和T3都执行完成,T4才能开始执行。

 Airflow提供了各种Operator实现,可以完成各种任务实现:
- BashOperator – 执行 bash 命令或脚本。
- SSHOperator – 执行远程 bash 命令或脚本(原理同 paramiko 模块)。
- PythonOperator – 执行 Python 函数。
- EmailOperator – 发送 Email。
- HTTPOperator – 发送一个 HTTP 请求。
- MySqlOperator, SqliteOperator, PostgresOperator, MsSqlOperator, OracleOperator, JdbcOperator, 等,执行 SQL 任务。
- DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator…
除了以上这些 Operators 还可以方便的自定义 Operators 满足个性化的任务需求。
一些情况下,我们需要根据执行结果执行不同的任务,这样工作流会产生分支。如:

这种需求可以使用BranchPythonOperator来实现。
Airflow 核心概念
- DAGs:即有向无环图(Directed Acyclic Graph),将所有需要运行的tasks按照依赖关系组织起来,描述的是所有tasks执行顺序。
- Operators:可以简单理解为一个class,描述了DAG中某个的task具体要做的事。其中,airflow内置了很多operators,如BashOperator 执行一个bash 命令,PythonOperator 调用任意的Python 函数,EmailOperator 用于发送邮件,HTTPOperator 用于发送HTTP请求, SqlOperator 用于执行SQL命令等等,同时,用户可以自定义Operator,这给用户提供了极大的便利性。
- Tasks:Task 是 Operator的一个实例,也就是DAGs中的一个node。
- Task Instance:task的一次运行。Web 界面中可以看到task instance 有自己的状态,包括”running”, “success”, “failed”, “skipped”, “up for retry”等。
- Task Relationships:DAGs中的不同Tasks之间可以有依赖关系,如 Task1 >> Task2,表明Task2依赖于Task2了。通过将DAGs和Operators结合起来,用户就可以创建各种复杂的 工作流(workflow)。
Airflow 的架构
在一个可扩展的生产环境中,Airflow 含有以下组件:
- 元数据库:这个数据库存储有关任务状态的信息。
- 调度器:Scheduler 是一种使用 DAG 定义结合元数据中的任务状态来决定哪些任务需要被执行以及任务执行优先级的过程。调度器通常作为服务运行。
- 执行器:Executor 是一个消息队列进程,它被绑定到调度器中,用于确定实际执行每个任务计划的工作进程。有不同类型的执行器,每个执行器都使用一个指定工作进程的类来执行任务。例如,LocalExecutor 使用与调度器进程在同一台机器上运行的并行进程执行任务。其他像 CeleryExecutor 的执行器使用存在于独立的工作机器集群中的工作进程执行任务。
- Workers:这些是实际执行任务逻辑的进程,由正在使用的执行器确定。

 Worker的具体实现由配置文件中的executor来指定,airflow支持多种Executor:
- SequentialExecutor: 单进程顺序执行,一般只用来测试
- LocalExecutor: 本地多进程执行
- CeleryExecutor: 使用Celery进行分布式任务调度
- DaskExecutor:使用Dask进行分布式任务调度
- KubernetesExecutor: 1.10.0新增, 创建临时POD执行每次任务
生产环境一般使用CeleryExecutor和KubernetesExecutor。
使用CeleryExecutor的架构如图:

 使用KubernetesExecutor的架构如图:

参考:
https://mp.weixin.qq.com/s/dzA9xGoho50WK_-80hzelg
https://zhuanlan.zhihu.com/p/448847300
相关文章:
 
python中实现定时任务的几种方案
目录 while True: sleep()Timeloop库threading.Timersched模块schedule模块APScheduler框架Celery框架数据流工具Apache Airflow概述Airflow 核心概念Airflow 的架构 总结以下几种方案实现定时任务,可根据不同需求去使用不同方案。 while True: sleep() 利用whil…...
 
AcWing算法提高课-5.6.1同余方程
宣传一下 算法提高课整理 CSDN个人主页:更好的阅读体验 原题链接 题目描述 求关于 x x x 的同余方程 a x ≡ 1 ( m o d b ) ax ≡ 1 \pmod b ax≡1(modb) 的最小正整数解。 输入格式 输入只有一行,包含两个正整数 a , b a,b a,b,用一…...
 
Docker Tutorial
什么是Docker 为每个应用提供完全隔离的运行环境 Dockerfile, Image,Container Image: 相当于虚拟机的快照(snapshot)里面包含了我们需要部署的应用程序以及替它所关联的所有库。通过image,我们可以创建很…...
 
平面图—简单应用
平面图:若一个图𝐺能画在平面𝑆上,且使𝐺的边仅在端点处相交,则称图𝐺为可嵌入平面𝑆,𝐺称为可平面图,简称为平面图。 欧拉公式:设有…...
 
安装JDK(Java SE Development Kit)超详细教程
文章时间 : 2023-10-04 1. 下载地址 直接去下载地址:https://www.oracle.com/java/technologies/downloads/ (需要翻墙,不想翻墙或者不想注册oracel账号的,直接去我的阿里云盘) 阿里云盘:http…...
 
KUKA机器人通过3点法设置工作台基坐标系的具体方法
KUKA机器人通过3点法设置工作台基坐标系的具体方法 具体方法和步骤可参考以下内容: 进入主菜单界面,依次选择“投入运行”—“测量”—基坐标,选择“3点法”, 在系统弹出的基坐标编辑界面,给基座标编号为3,命名为table1,然后单击“继续”按钮,进行下一步操作, 在弹出的…...
 
以太网的MAC层
以太网的MAC层 一、硬件地址  局域网中,硬件地址又称物理地址或MAC地址(因为用在MAC帧),它是局域网上每一台计算机中固化在适配器的ROM中的地址。  关于地址问题,有这样的定义:“名字指出我们所要寻…...
 
Hadoop启动后jps发现没有DateNode解决办法
多次使用 Hadoop namenode -format 格式化节点后DateNode丢失 找到hadoop配置文件core-site.xml查找tmp路径 进入该路径,使用rm -rf data删除data文件 再次使用Hadoop namenode -format 格式化后jps后出现DateNode节点...
 
VUE3照本宣科——应用实例API与setup
VUE3照本宣科——应用实例API与setup 前言一、应用实例API1.createApp()2.app.use()3.app.mount() 二、setup 前言 👨💻👨🌾📝记录学习成果,以便温故而知新 “VUE3照本宣科”是指照着中文官网和菜鸟教…...
json/js对象的key有什么区别?
1.对于JS对象来说 一个js对象如果是这样的 obj {"0": "小明","0name": "小明明", "": 18,"¥": "哈哈"," ": "爱好广泛" }对于js对象来说,有时候key是不…...
 
极大似然估计概念的理解——统计学习方法
目录 1.最大似然估计的概念的理解1 2.最大似然估计的概念的理解2 3.最大似然估计的概念的理解3 4.例子 1.最大似然估计的概念的理解1 最大似然估计是一种概率论在统计学上的概念,是参数估计的一种方法。给定观测数据来评估模型参数。也就是模型已知,参…...
 
python模拟表格任意输入位置
在表格里输入数值,要任意位置,我找到了好方法: input输入 1. 行 2. 列输入:1 excel每行输入文字input输入位置 3.2 表示输入位置在:3行个列是要实现一个类似于 Excel 表格的输入功能,并且希望能够指定输入…...
 
如何限制文件只能通过USB打印机打印,限制打印次数和时限并且无法在打印前查看或编辑内容
在今天这个高度信息化的时代,文档打印已经成为日常工作中不可或缺的一部分。然而,这也带来了诸多安全风险,如文档被篡改、知识产权被侵犯以及信息泄露等。为了解决这些问题,只印应运而生。作为一款独特的软件工具,只印…...
 
车牌文本检测与识别:License Plate Recognition Based On Multi-Angle View Model
论文作者:Dat Tran-Anh,Khanh Linh Tran,Hoai-Nam Vu 作者单位:Thuyloi University;Posts and Telecommunications Institute of Technology 论文链接:http://arxiv.org/abs/2309.12972v1 内容简介: 1)方向&#x…...
Blender中的4种视图着色模式
Blender中有四种主要的视图着色模式:线框、实体、Look Dev和渲染。它们的主要区别如下: - 线框模式只显示物体的边缘(线框),可以让您看到场景中的所有物体,也可以调整线框的颜色和背景的颜色。 - 实…...
 
Flutter项目安装到Android手机一直显示在assembledebug
问题 Flutter项目安装到Android手机一直显示在assembledebug 原因 网络不好,gradle依赖下载不下来 解决方案 修改如下的文件 gradle-wrapper.properties 使用腾讯提供的gradle镜像下载 distributionUrlhttps://mirrors.cloud.tencent.com/gradle/gradle-7.5…...
数据挖掘实验(二)数据预处理【等深分箱与等宽分箱】
一、分箱平滑的原理 (1)分箱方法 在分箱前,一定要先排序数据,再将它们分到等深(等宽)的箱中。 常见的有两种分箱方法:等深分箱和等宽分箱。 等深分箱:按记录数进行分箱࿰…...
Vue2 第一次学习
本章为超级浓缩版,文章过于短,方便复习使用哦~ 文章目录 1. 简单引入 vue.js2. 指令2.1 事件绑定指令 v-on (简写 )2.2 内容渲染指令2.3 双向绑定指令 v-model2.4 属性绑定指令 v-bind (简写 : )2.5 条件渲染指令2.6 循环指令 v-for 3. vue 其他知识3.1 侦听器 watch3.2 计算属…...
 
tiny模式基本原理整合
【Tiny模式】的基本构成 M【首头在首位】 U【/】 V【HTTP/】 Host H【真实ip】 XH \r回车 \n换行 \t制表 \ 空格 一个基本的模式构成 [method] [uri] [version]\r\nHost: [host]\r\n[method] [uri] [version]\r\nHost: [host]\r\n 检测顺序 http M H XH 有些地区 XH H M 我这边…...
使用聚氨酯密封件的好处?
聚氨酯密封件因其优异的耐用性、灵活性和广泛的应用范围而在各个行业中广受欢迎。在本文中,我们将探讨使用聚氨酯密封件的优点,阐明其在许多不同领域广泛使用背后的原因。 1、高性能: 聚氨酯密封件具有出色的性能特征,使其成为各…...
变量 varablie 声明- Rust 变量 let mut 声明与 C/C++ 变量声明对比分析
一、变量声明设计:let 与 mut 的哲学解析 Rust 采用 let 声明变量并通过 mut 显式标记可变性,这种设计体现了语言的核心哲学。以下是深度解析: 1.1 设计理念剖析 安全优先原则:默认不可变强制开发者明确声明意图 let x 5; …...
 
19c补丁后oracle属主变化,导致不能识别磁盘组
补丁后服务器重启,数据库再次无法启动 ORA01017: invalid username/password; logon denied Oracle 19c 在打上 19.23 或以上补丁版本后,存在与用户组权限相关的问题。具体表现为,Oracle 实例的运行用户(oracle)和集…...
内存分配函数malloc kmalloc vmalloc
内存分配函数malloc kmalloc vmalloc malloc实现步骤: 1)请求大小调整:首先,malloc 需要调整用户请求的大小,以适应内部数据结构(例如,可能需要存储额外的元数据)。通常,这包括对齐调整,确保分配的内存地址满足特定硬件要求(如对齐到8字节或16字节边界)。 2)空闲…...
 
《从零掌握MIPI CSI-2: 协议精解与FPGA摄像头开发实战》-- CSI-2 协议详细解析 (一)
CSI-2 协议详细解析 (一) 1. CSI-2层定义(CSI-2 Layer Definitions) 分层结构 :CSI-2协议分为6层: 物理层(PHY Layer) : 定义电气特性、时钟机制和传输介质(导线&#…...
 
linux arm系统烧录
1、打开瑞芯微程序 2、按住linux arm 的 recover按键 插入电源 3、当瑞芯微检测到有设备 4、松开recover按键 5、选择升级固件 6、点击固件选择本地刷机的linux arm 镜像 7、点击升级 (忘了有没有这步了 估计有) 刷机程序 和 镜像 就不提供了。要刷的时…...
 
页面渲染流程与性能优化
页面渲染流程与性能优化详解(完整版) 一、现代浏览器渲染流程(详细说明) 1. 构建DOM树 浏览器接收到HTML文档后,会逐步解析并构建DOM(Document Object Model)树。具体过程如下: (…...
全面解析各类VPN技术:GRE、IPsec、L2TP、SSL与MPLS VPN对比
目录 引言 VPN技术概述 GRE VPN 3.1 GRE封装结构 3.2 GRE的应用场景 GRE over IPsec 4.1 GRE over IPsec封装结构 4.2 为什么使用GRE over IPsec? IPsec VPN 5.1 IPsec传输模式(Transport Mode) 5.2 IPsec隧道模式(Tunne…...
现有的 Redis 分布式锁库(如 Redisson)提供了哪些便利?
现有的 Redis 分布式锁库(如 Redisson)相比于开发者自己基于 Redis 命令(如 SETNX, EXPIRE, DEL)手动实现分布式锁,提供了巨大的便利性和健壮性。主要体现在以下几个方面: 原子性保证 (Atomicity)ÿ…...
纯 Java 项目(非 SpringBoot)集成 Mybatis-Plus 和 Mybatis-Plus-Join
纯 Java 项目(非 SpringBoot)集成 Mybatis-Plus 和 Mybatis-Plus-Join 1、依赖1.1、依赖版本1.2、pom.xml 2、代码2.1、SqlSession 构造器2.2、MybatisPlus代码生成器2.3、获取 config.yml 配置2.3.1、config.yml2.3.2、项目配置类 2.4、ftl 模板2.4.1、…...
 
云原生安全实战:API网关Kong的鉴权与限流详解
🔥「炎码工坊」技术弹药已装填! 点击关注 → 解锁工业级干货【工具实测|项目避坑|源码燃烧指南】 一、基础概念 1. API网关(API Gateway) API网关是微服务架构中的核心组件,负责统一管理所有API的流量入口。它像一座…...
