airflow源码分析-任务调度器实现分析
Airflow源码分析-任务调度器实现分析
概述
本文介绍Airflow执行器的总体实现流程。通过函数调用的方式说明了Airflow scheduler的实现原理,对整个调度过程的源码进行了分析。
通过本文,可以基本把握住Airflow的调度器的运行原理主线。
启动调度器
可以通过命令来启动调度器:
airflow scheduler
启动airflow的调度器后的总执行流程如下:
- 执行命令后会调用
scheduler_command.py#scheduler(args)函数。 - scheduler(args)函数会打开一个日志文件,然后调用
_run_scheduler_job(args)函数,该函数会:创建一个SchedulerJob对象,把DAG文件的目录传入,启动后就开始解析DAG文件目录中的DAG文件(*.py)文件。 - 然后开始执行
SchedulerJob#run()这个函数,启动job侧。该函数在其父类BaseJob中定义。 BaseJob#run()函数调用其实现类的_execute()函数,这里就是调用SchedulerJob#_execute函数来启动调度服务。
调度器的实现分析
SchedulerJob#_execute函数
-
打印启动调度器的日志:可以在调度器服务的日志文件中看到以下日志:
Starting the scheduler。 -
如果不是独立模式进程,则创建一个
DagFileProcessorAgent对象,用于读取DAG文件。并把该对象保存到:SchedulerJob#processor_agent成员变量中。 -
初始化执行器,并设置执行器的回调函数。创建callback_sink对象,它可能是PipeCallbackSink(DagFileProcessorAgent是非独立模式)或DatabaseCallbackSink(DagFileProcessorAgent是独立模式);创建完成后,对象保存到BaseExecutor#callback_sink变量中(代码中实现的是SchedulerJob#executor函数的返回值)。
-
调用: self.executor.start()函数来启动执行器executor:调用
SchedulerJob#executor.start() -
调用self.register_signals()函数来:注册信号处理程序,以便在需要时可以停止儿子进程。
-
启动DagFileProcessorAgent服务:
SchedulerJob.processor_agent.start() -
调用:
SchedulerJob#_run_scheduler_loop,进入调度服务循环,根据DAG的调度计划,执行DAG中的任务。 -
DAG处理完成后,如果是非独立模式,则停止DagFileProcessorAgent对象,并检查所有文件是否都已经处x理。如果所有文件都已处理,则停用未被调度器触及的DAG。
-
最后,执行器结束工作,关闭处理DAG文件的代理对象和回调函数,并移除会话对象。
调度服务主循环: SchedulerJob#_run_scheduler_loop
-
通过
if not self.processor_agent来检查DAG文件处理服务是否已经启动 -
创建一个定时器调度对象timers:
timers = EventScheduler(),该对象会定时执行定义的函数。用于周期性地运行一些任务。 -
向该timer对象中注册一些回调函数,来定时进行一些任务。这些任务包括检查孤立的任务、检查触发器超时、更新池指标、查找僵尸任务等。
-
通过一个无限循环来不断地进行调度,直到达到指定的运行次数或达到了DAG解析次数的上限。在每次循环中,它会执行以下步骤:
a. 进入调度循环,若使用sqlite,则运行一个单独的的DAG文件解析进程:
processor_agent.run_single_parsing_loop()。b. 调用
SchedulerJob#_do_scheduling进入实际的调度实现代码中。并返回排队的任务数量。c. 启动executor的心跳处理进程:
self.executor.heartbeat()d. 启动executor的事件处理器:
SchedulerJob#_process_executor_eventse. 启动DagFileProcessorAgent的心跳服务:
self.processor_agent.heartbeat()f. 运行定时任务;
-
如果没有工作需要执行,则等待一段时间。
最后,如果达到了指定的运行次数或达到了DAG解析次数的上限,则退出循环。如果使用DagFileProcessorAgent,则在达到解析次数上限时也会退出循环。
调度实现函数:SchedulerJob#_do_scheduling
- 调用prohibit_commit的函数来返回一个上下文管理器:
CommitProhibitorGuard。该上下文管理器可以防止在其作用域之外通过会话对象提交事务,从而严格控制事务的生命周期,以确保在核心调度器循环中的严格控制。如果在上下文管理器之外通过会话对象提交事务,将引发RuntimeError异常。 - 调用
SchedulerJob#_create_dagruns_for_dags函数来根据DagModel中的next_dagrun_create_after列创建任何必要的DagRun。默认情况下,只选择10个DAG,可以通scheduler.max_dagruns_to_create_per_loop设置进行配置。 - 调用函数:
SchedulerJob#_start_queued_dagruns:在DagRuns集合中对象中查找“下n个最老的”正在运行的DAGRun进行调度(默认n = 20,可以通过“scheduler.max_dagruns_per_loop_to_schedule”进行配置),并尝试进度状态(将TIs调度为SCHEDULED,或将DagRuns调度为SUCCESS / FAILURE等)。 - 调用
SchedulerJob#_get_next_dagruns_to_examine检查dagrun的参数。 - 调用
SchedulerJob#_schedule_all_dag_runs函数来决定所有dagrun的调度决定。该函数会遍历所有的dagrun,对每个dagrun调用SchedulerJob#_schedule_dag_run函数来决定是否调度该dagrun。 - 通过临界区(锁定Pool模型的行),将任务排队,然后将其发送到执行器中。详见_critical_section_enqueue_task_instances()文档。
- 返回在此迭代中入队的TIs的数量。
其中,步骤2和步骤3需要注意,因为它们会锁定某些行,并且只有一个调度器可以同时处理这些行,因此它们可能会影响调度器的吞吐量。步骤2中默认选择的20个DAG Run是基于它们最久没有被检查/调度的时间来选择的。步骤3中,通过临界区锁定行的目的是为了防止多个调度器同时修改同一个任务实例,这会导致竞态条件和不一致性。
SchedulerJob#_start_queued_dagruns
该方法用于启动处于排队状态的DagRuns。
-
该方法调用_get_next_dagruns_to_examine方法,获取处于QUEUED状态的DagRuns。
-
然后,该方法使用DagRun.active_runs_of_dags方法计算每个DAG当前正在运行的DagRun数量,并将结果存储在active_runs_of_dags字典中。
-
它遍历了每个处于QUEUED状态的DagRun,检查是否可以将其移动到RUNNING状态。
a. 对于每个DagRun,该方法首先使用DagBag.get_dag方法获取其对应的DAG对象,并将其赋值给dag_run.dag属性。如果DAG不存在,则记录错误并继续处理下一个DagRun。
b. 接下来,该方法使用active_runs_of_dags字典获取DAG当前正在运行的DagRun数量,并将结果存储在active_runs变量中。
c. 该方法检查DAG的max_active_runs属性是否为None,如果不是,则检查DAG当前正在运行的DagRun数量是否超过了该属性的值。如果是,则记录调试日志并不将该DagRun移动到RUNNING状态。否则,该方法会将DagRun的状态设置为RUNNING,并更新其start_date属性。此外,该方法还会将DAG当前正在运行的DagRun数量加1,并调用DagRun.notify_dagrun_state_changed方法通知状态已更改。
d. 最后,该方法调用了_update_state方法,该方法用于设置DagRun的状态为RUNNING,并计算调度延迟(如果DAG是周期性的)。需要注意的是,_update_state方法是一个内部函数,定义在_start_queued_dagruns方法内部。
SchedulerJob#_critical_section_enqueue_task_instances
该方法用于将TaskInstances添加到执行队列中。
该方法包含以下三个步骤:
-
使用优先级选择TaskInstances,并确保它们处于预期状态,并且不会超过最大活动运行数或池限制。
原子地更改上述TaskInstances的状态。 -
将TaskInstances添加到执行器的队列中。需要注意的是,该方法是一个“关键段”,意味着只有一个执行器进程可以同时执行该方法。为了实现这一点,该方法使用了SELECT…FOR UPDATE语句锁定了池表,以确保只有一个进程可以进行修改。
-
该方法还包含了两个辅助方法:_executable_task_instances_to_queued和_enqueue_task_instances_with_queued_state。其中,_executable_task_instances_to_queued方法用于选择可执行的TaskInstances,而_enqueue_task_instances_with_queued_state方法用于将TaskInstances添加到执行器的队列中。
-
最后,该方法返回状态发生了变化的TaskInstance的数量。
注意:该方法使用了一个名为max_tis_per_query的属性,它表示每次查询最多选择的TaskInstance数量。如果该属性的值为0,则选择所有可用的TaskInstances;否则,仅选择最多max_tis_per_query个TaskInstances。此外,该方法还使用了一个名为executor的属性,它表示Airflow的执行器对象,用于将TaskInstances添加到执行队列中。
SchedulerJob#_enqueue_task_instances_with_queued_state
该方法的作用是将状态为"queued"的任务实例添加到执行器(executor)的队列中(queued_tasks字典中)等待执行。
该方法接受两个参数:
- task_instances:待执行的任务实例列表。
- session:数据库会话对象。
-
该方法首先遍历任务实例列表,对于每个状态为"queued"的任务实例,将其命令添加到执行器的队列中等待执行。如果任务实例所属的DAG运行状态为finished,则将任务实例状态设置为"None",并跳过该任务实例的执行。
-
对于每个任务实例,该方法使用任务实例的command_as_list方法获取该任务实例的命令,并设置该任务实例的优先级和队列。然后,该方法调用执行器的queue_command方法将该任务实例的命令添加到执行器的队列中等待执行。
需要注意的是,该方法并不会等待任务实例执行完毕,而是将任务实例的执行交给了执行器处理。而执行器的具体处理逻辑在_process_tasks函数的_executor.execute_async方法中。
该函数的实现代码如下:
def _enqueue_task_instances_with_queued_state(self, task_instances: list[TI], session: Session) -> None:"""Takes task_instances, which should have been set to queued, and enqueues themwith the executor.:param task_instances: TaskInstances to enqueue:param session: The session object"""# actually enqueue themfor ti in task_instances:if ti.dag_run.state in State.finished:ti.set_state(State.NONE, session=session)continuecommand = ti.command_as_list(local=True,pickle_id=ti.dag_model.pickle_id,)priority = ti.priority_weightqueue = ti.queueself.log.info("Sending %s to executor with priority %s and queue %s", ti.key, priority, queue)self.executor.queue_command(ti,command,priority=priority,queue=queue,)
task通过queue_command已经放到了执行器的任务执行队列queued_tasks中,该变量其实是一个有序的字典,由OrderedDict类来定义。这样不同类型的执行器就可以消费该队列,执行任务了。
Task放入执行器队列
执行器会调用以下函数来执行task。每个执行器实现的实现逻辑不同,可以进入每个执行器中继续分析其实现。
def _process_tasks(self, task_tuples: list[TaskTuple]) -> None:for key, command, queue, executor_config in task_tuples:del self.queued_tasks[key]self.execute_async(key=key, command=command, queue=queue, executor_config=executor_config)self.running.add(key)
小结
本文分析了airflow的任务执行总体流程。分析了从dag文件处理,到task的调度和执行的整个流程。
通过本文的分析可以说基本把握住了Airflow的运行原理的主线。可以根据这条主线,继续分析每个执行器的执行原理,以及任务优先级,DAG文件处理的细节。
相关文章:
airflow源码分析-任务调度器实现分析
Airflow源码分析-任务调度器实现分析 概述 本文介绍Airflow执行器的总体实现流程。通过函数调用的方式说明了Airflow scheduler的实现原理,对整个调度过程的源码进行了分析。 通过本文,可以基本把握住Airflow的调度器的运行原理主线。 启动调度器 可…...
一文学会数组的reduce()和reduceRight()
reduce()方法和reduceRight()方法依次处理数组的每个成员,最终累计为一个值。 它们的差别是,reduce()是从左到右处理,reduceRight()则是从右到左,其他完全一样。 [1, 2, 3, 4, 5].reduce(function (a, b) {console.log(a, b);ret…...
登录校验-Filter
上一篇介绍完了基础应用和细节,现在来完成登录校验功能基本流程: 要进入后台管理系统,必须完成登录操作,此时就需要访问登录接口Login。登录成功服务端会生成一个JWT令牌,并且返回给前端,前端会将JWT令牌存…...
C C++ Java python 分别写出不同表白girlfriend的爱心动态代码实现
C `` #include <stdio.h> #include <stdlib.h> #include <windows.h> void heart_animation() {int i, j, k; for (i = 1; i <= 6; i++) {for (j = -3; j <= 3; j++) {for (k = -4; k <= 4; k++) {if (abs(j) + abs(k) < i * 2) {printf(“I”)…...
ThreeJS-投影、投影模糊(十七)
无投影: 完整的代码: <template> <div id"three_div"></div> </template> <script> import * as THREE from "three"; import { OrbitControls } from "three/examples/jsm/controls/Or…...
蓝桥杯赛前冲刺-枚举暴力和排序专题1(包含历年蓝桥杯真题和AC代码)
目录 连号区间数(第四届蓝桥杯省赛CB组,第四届蓝桥杯省赛JAVAB组) 递增三元组(第九届蓝桥杯省赛CB组,第九届蓝桥杯省赛JAVAB组) 特别数的和(第十届蓝桥杯省赛CB组,第十届蓝桥杯省赛JAVAB组) 错误票据&a…...
Github库中的Languages显示与修改
目录 前言 【.gitattributes】文件 修改GitHub语言 前言 上传一个项目到GitHub时,发现显示的语言并非是自己项目所示的语言,这样的情况是经常发生的,为了能到达自己所需快速检索,或者是外部访问者能很好的搜索我们的项目&#…...
RocketMQ消息高可靠详解
文章目录 消息同步策略殊途同归同步基于offset而不是消息本身刷盘策略RocketMQ broker服务端以组为单位提供服务的,拥有着一样的brokerName则认为是一个组。其中brokerId=0的就是master,大于0的则为slave。 消息同步策略 master和slave都可以提供读服务,但是只有master允许…...
【python设计模式】4、建造者模式
哲学思想: 建造者模式的哲学思想是将复杂对象的创建过程分解成多个简单的步骤,并将这些步骤分别封装在一个独立的建造者类中。然后,我们可以使用一个指挥者类来控制建造者的调用顺序,以便在每个步骤完成后正确地构建复杂对象。 …...
【全网独家】华为OD机试Golang解题 - 机智的外卖员
华为Od必看系列 华为OD机试 全流程解析+经验分享,题型分享,防作弊指南华为od机试,独家整理 已参加机试人员的实战技巧华为od 2023 | 什么是华为od,od 薪资待遇,od机试题清单华为OD机试真题大全,用 Python 解华为机试题 | 机试宝典使用说明 如果想要在华为od机试中获取高分…...
Sentinel滑动时间窗限流算法原理及源码解析(中)
文章目录 MetricBucketMetricEvent数据统计的维度WindowWrap样本窗口实例 范型T为MetricBucket windowLengthInMs 样本窗口长度 windowStart 样本窗口的起始时间戳 value 当前样本窗口的统计数据 其类型为MetricBucket MetricBucket MetricEvent数据统计的维度 1、首先计算27t位…...
【OpenLayers】VUE+OpenLayers+ElementUI加载WMS地图服务
【OpenLayers】VUEOpenLayersElementUI加载WMS地图服务准备工作安装vue创建vue项目安装OpenLayers安装ElementUI加载wms地图服务准备工作 需要安装好nodejs,nodejs下载地址,下载对应的版本向导式安装即可。 安装完成后,控制台输入node -v&a…...
linux 命名管道 mkfifo
专栏内容:linux下并发编程个人主页:我的主页座右铭:天行健,君子以自强不息;地势坤,君子以厚德载物.目录 前言 概述 原理介绍 接口说明 代码演示 结尾 前言 本专栏主要分享linux下并发编程…...
Redis(主从复制、哨兵模式、集群)概述及部署
目录 1.redis高可用 2.redis持久化 1.Redis 提供两种方式进行持久化: 2.RDB 持久化 3.AOF持久化 4.RDB和AOF的优缺点 5.Redis 性能管理 3.redis主从复制 1.Redis主从复制的概念 2.Redis主从复制的作用 3.Redis主从复制的搭建 4.redis哨兵模式 1.哨兵模式…...
windows下软件包安装工具之Scoop安装与使用
Scoop介绍 Scoop是Windows的命令行程序安装器。 Scoop从命令行安装程序,及其容易。它有如下特点: 消除权限弹出窗口隐藏 GUI 向导样式的安装程序防止安装大量程序的 PATH 污染避免安装和卸载程序的意外副作用自动查找并安装依赖项自行执行所有额外的设…...
九龙证券|人工智能+国产软件+智慧城市概念股火了,欧洲资管巨头大举抄底
近一周组织调研个股数量有130多只,迈瑞医疗成为调研组织数量最多的股票。 证券时报数据宝统计,近一周组织调研公司数量有130多家。从调研组织类型来看,证券公司调研相对最广泛,调研80多家公司。 迈瑞医疗获超500家组织调研 迈瑞…...
Nacos下载安装与配置(windows)
一、Nacos下载 官网地址:home (nacos.io) 点击前往Github,跳转至Github下载页面。 点击Tags,跳转至版本选择页面,此处选择2.2.0版本。 点击nacos-server-2.2.0.zip,进行下载。 二、Nacos安装 将下载的压缩包解压至需…...
QT学习笔记(语音识别项目 )
语音识别项目 我们知道 AI 智能音箱已经在我们生活中不少见,也许我们都玩过,智能化非常高,功能 强大,与我们平常玩的那种蓝牙音箱,Wifi 音箱有很大的区别,AI 智能在哪里呢?语音识别技 术和云端…...
Vulnhub:DC-4靶机
kali:192.168.111.111 靶机:192.168.111.251 信息收集 端口扫描 nmap -A -v -sV -T5 -p- --scripthttp-enum 192.168.111.251 访问目标网站发现需要登录 使用账号admin爆破出密码:happy 登陆后抓包执行反弹shell 提权 在/home/jim/backu…...
序列差分练习题--从模板到灵活运用
本篇包含6道序列差分练习题及题解,难度由模板到提高 语文成绩 题目背景 语文考试结束了,成绩还是一如既往地有问题。 题目描述 语文老师总是写错成绩,所以当她修改成绩的时候,总是累得不行。她总是要一遍遍地给某些同学增加分…...
装饰模式(Decorator Pattern)重构java邮件发奖系统实战
前言 现在我们有个如下的需求,设计一个邮件发奖的小系统, 需求 1.数据验证 → 2. 敏感信息加密 → 3. 日志记录 → 4. 实际发送邮件 装饰器模式(Decorator Pattern)允许向一个现有的对象添加新的功能,同时又不改变其…...
1688商品列表API与其他数据源的对接思路
将1688商品列表API与其他数据源对接时,需结合业务场景设计数据流转链路,重点关注数据格式兼容性、接口调用频率控制及数据一致性维护。以下是具体对接思路及关键技术点: 一、核心对接场景与目标 商品数据同步 场景:将1688商品信息…...
MFC 抛体运动模拟:常见问题解决与界面美化
在 MFC 中开发抛体运动模拟程序时,我们常遇到 轨迹残留、无效刷新、视觉单调、物理逻辑瑕疵 等问题。本文将针对这些痛点,详细解析原因并提供解决方案,同时兼顾界面美化,让模拟效果更专业、更高效。 问题一:历史轨迹与小球残影残留 现象 小球运动后,历史位置的 “残影”…...
并发编程 - go版
1.并发编程基础概念 进程和线程 A. 进程是程序在操作系统中的一次执行过程,系统进行资源分配和调度的一个独立单位。B. 线程是进程的一个执行实体,是CPU调度和分派的基本单位,它是比进程更小的能独立运行的基本单位。C.一个进程可以创建和撤销多个线程;同一个进程中…...
【网络安全】开源系统getshell漏洞挖掘
审计过程: 在入口文件admin/index.php中: 用户可以通过m,c,a等参数控制加载的文件和方法,在app/system/entrance.php中存在重点代码: 当M_TYPE system并且M_MODULE include时,会设置常量PATH_OWN_FILE为PATH_APP.M_T…...
【从零开始学习JVM | 第四篇】类加载器和双亲委派机制(高频面试题)
前言: 双亲委派机制对于面试这块来说非常重要,在实际开发中也是经常遇见需要打破双亲委派的需求,今天我们一起来探索一下什么是双亲委派机制,在此之前我们先介绍一下类的加载器。 目录 编辑 前言: 类加载器 1. …...
Modbus RTU与Modbus TCP详解指南
目录 1. Modbus协议基础 1.1 什么是Modbus? 1.2 Modbus协议历史 1.3 Modbus协议族 1.4 Modbus通信模型 🎭 主从架构 🔄 请求响应模式 2. Modbus RTU详解 2.1 RTU是什么? 2.2 RTU物理层 🔌 连接方式 ⚡ 通信参数 2.3 RTU数据帧格式 📦 帧结构详解 🔍…...
消防一体化安全管控平台:构建消防“一张图”和APP统一管理
在城市的某个角落,一场突如其来的火灾打破了平静。熊熊烈火迅速蔓延,滚滚浓烟弥漫开来,周围群众的生命财产安全受到严重威胁。就在这千钧一发之际,消防救援队伍迅速行动,而豪越科技消防一体化安全管控平台构建的消防“…...
未授权访问事件频发,我们应当如何应对?
在当下,数据已成为企业和组织的核心资产,是推动业务发展、决策制定以及创新的关键驱动力。然而,未授权访问这一隐匿的安全威胁,正如同高悬的达摩克利斯之剑,时刻威胁着数据的安全,一旦触发,便可…...
C#中用于控制自定义特性(Attribute)
我们来详细解释一下 [AttributeUsage(AttributeTargets.Class, AllowMultiple false, Inherited false)] 这个 C# 属性。 在 C# 中,Attribute(特性)是一种用于向程序元素(如类、方法、属性等)添加元数据的机制。Attr…...
