当前位置: 首页 > article >正文

Apache Airflow 系列教程 | 第6课:DAG 解析与处理引擎

导读(Introduction)欢迎来到 Apache Airflow 源码深度解析系列的第六课。在前一课中,我们深入剖析了 Scheduler 的核心原理——它如何在循环中创建 DagRun、推进任务状态、将任务入队给 Executor。但 Scheduler 能调度 DAG 的前提是:系统必须先"看到"这些 DAG。这就是 DAG 解析与处理引擎所承担的职责。想象一下:你在dags/目录下新建了一个 Python 文件,定义了一个 DAG。Airflow 是如何发现这个文件、导入其中的 Python 模块、提取 DAG 对象、序列化并写入数据库的?这个从"文件"到"数据库记录"的转换过程,正是本课的核心主题。Airflow 3.x 对 DAG 解析体系做了重大重构,引入了DAG Bundle概念(源自 AIP-66),将 DAG 文件的组织与加载从"单一文件夹扫描"升级为"多源可插拔的 Bundle 管理"。同时,解析过程被设计为独立的子进程,实现了与 Scheduler 主进程的安全隔离——这意味着即使用户编写的 DAG 代码存在 bug 或安全风险,也不会影响调度器的稳定性。本课将带你逐层拆解这个精密的解析引擎,从文件发现到子进程解析,从序列化到数据库持久化,完整理解 Airflow 如何将静态的 Python 文件转化为可调度的工作流实体。学习目标(Learning Objectives)完成本课学习后,你将能够:理解 DAG 文件发现与导入机制——掌握 Airflow 如何扫描目录、过滤文件、检测 DAG 内容掌握 DagBag 容器的工作原理——了解 DAG 的内存管理、验证和收集过程理解 DAG Bundle 概念——掌握 AIP-66 引入的多源 DAG 文件组织与加载体系分析 DagFileProcessorManager 的处理循环——理解并行解析的调度策略和生命周期管理掌握子进程解析的通信协议——了解 Manager 与 Parser 子进程之间的 IPC 机制理解解析结果的持久化路径——从序列化 DAG 到写入 DagModel、SerializedDagModel 的完整链路正文内容(Main Content)1. 整体架构:从文件到数据库的转换链路DAG 解析引擎的核心使命是将用户编写的 Python DAG 文件转化为数据库中的结构化记录。这个过程可以概括为以下流水线:DAG 文件 (.py) │ ▼ ┌─────────────────────────────────────────┐ │ DagFileProcessorManager (管理进程) │ │ - 文件发现 Bundle 刷新 │ │ - 文件队列调度 │ │ - 子进程生命周期管理 │ └────────────────────┬────────────────────┘ │ fork 子进程 ▼ ┌─────────────────────────────────────────┐ │ DagFileProcessorProcess (子进程) │ │ - _parse_file_entrypoint() │ │ - DagBag.process_file() 加载 DAG │ │ - DagSerialization.to_dict() 序列化 │ │ - 通过 IPC 返回 DagFileParsingResult │ └────────────────────┬────────────────────┘ │ 结果回传 ▼ ┌─────────────────────────────────────────┐ │ persist_parsing_result() │ │ - update_dag_parsing_results_in_db() │ │ - 写入 DagModel / SerializedDagModel │ │ - 更新 ParseImportError / DagWarning │ └─────────────────────────────────────────┘让我们逐层深入了解每个组件的实现细节。2. DAG Bundle:多源文件组织体系2.1 Bundle 的设计动机在传统 Airflow 中,所有 DAG 文件必须放在一个统一的dags_folder中。这在大规模团队协作时会产生问题:不同团队的 DAG 混在一起,缺乏隔离无法为不同来源的 DAG 配置不同的刷新频率Git 仓库管理困难,无法支持多仓库结构Airflow 3.x 通过DAG Bundle(AIP-66)解决了这些问题。Bundle 是 DAG 文件的逻辑分组单位,每个 Bundle 可以有不同的来源(本地目录、Git 仓库等)、版本控制和刷新策略。2.2 BaseDagBundle 抽象基类所有 Bundle 实现都继承自BaseDagBundle:# 源码位置:airflow-core/src/airflow/dag_processing/bundles/base.pyclassBaseDagBundle(ABC):""" DAG bundles are used both by the DAG processor and by a worker when running a task. When running a task, we know what version of the bundle we need. The DAG processor uses a bundle to keep the DAGs up to date, always using the latest version. """supports_versioning:bool=Falsedef__init__(self,*,name:str,refresh_interval:int=conf.getint("dag_processor","refresh_interval"),version:str|None=None,view_url_template:str|None=None,)-None:self.name=name self.version=version self.refresh_interval=refresh_interval self.is_initialized:bool=Falseself.base_dir=get_bundle_base_folder(bundle_name=self.name)self.versions_dir=get_bundle_versions_base_folder(bundle_name=self.name)@property@abstractmethoddefpath(self)-Path:"""Airflow will use this path to find/load/execute the DAGs from the bundle."""@abstractmethoddefget_current_version(self)-str|None:"""Retrieve a string that represents the version of the DAG bundle."""@abstractmethoddefrefresh(self)-None:"""Retrieve the latest version of the files in the bundle."""definitialize(self)-None:"""Called before the bundle is used. Safe to call concurrently."""self.is_initialized=True关键设计要点:属性/方法说明supports_versioning是否支持版本控制,影响 Worker 获取特定版本的能力pathDAG 文件所在目录的路径refresh_interval多久检查一次是否需要刷新initialize()延迟初始化,避免昂贵操作过早执行refresh()拉取最新版本,必须支持并发安全get_current_version()获取当前版本标识符2.3 LocalDagBundle:本地目录实现最简单的 Bundle 实现是本地目录:# 源码位置:airflow-core/src/airflow/dag_processing/bundles/local.pyclassLocalDagBundle(BaseDagBundle):"""Local DAG bundle - exposes a local directory as a DAG bundle."""supports_versioning=Falsedef__init__(self,*,path:str|None=None,**kwargs)-None:super().__init__(**kwargs)ifpathisNone:path=settings.DAGS_FOLDER self._path=Path(path)defget_current_version(self)-None:returnNonedefrefresh(self)-None:"""Nothing to refresh - it's just a local directory."""@propertydefpath(self)-Path:returnself._pathLocalDagBundle不支持版本控制(supports_versioning = False),刷新操作为空操作。它就是简单地指向一个本地文件夹。2.4 DagBundlesManager:配置解析与实例化DagBundlesManager负责读取配置并管理所有 Bundle 实例:# 源码位置:airflow-core/src/airflow/dag_processing/bundles/manager.pyclassDagBundlesManager(LoggingMixin):"""Manager for DAG bundles."""def__init__(self):self._bundle_config:dict[str,_InternalBundleConfig]={}self.parse_config()defparse_config(self)-None:"""Get all DAG bundle configurations and store in instance variable."""config_list=conf.getjson("dag_processor","dag_bundle_config_list")# ... 解析 JSON 配置列表 ...forbundle_configinbundle_config_list:class_=import_string(bundle_config.classpath)self._bundle_config[bundle_config.name]=_InternalBundleConfig(bundle_class=class_,kwargs=bundle_config.kwargs,team_name=bundle_config.team_name,)defget_bundle(self,name:str,version:str|None=None)-BaseDagBundle:"""Get a DAG bundle by name."""cfg_bundle=self._bundle_config.get(name)returncfg_bundle.bundle_class(name=name,version=version,**cfg_bundle.kwargs)defsync_bundles_to_db(self,*,session:Session=NEW_SESSION)-None:"""Sync configured DAG bundles to the metadata database."""# 同步 DagBundleModel 记录配置示例(airflow.cfg):[dag_processor] dag_bundle_config_list = [ { "name": "my_team_dags", "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", "kwargs": {"path": "/opt/airflow/dags/my_team"}, "team_name": "data_engineering" }, { "name": "ml_pipelines", "classpath": "my_custom.GitDagBundle", "kwargs": {"repo_url": "https://github.com/org/ml-dags.git"} } ]2.5 Bundle 版本管理与 Worker 协作Bundle 在 DAG Processor 和 Worker 中的使用模式不同:┌─────────────────────────────────────────────────────────────┐ │ DAG Processor │ │ - 始终使用最新版本 │ │ - 定期 refresh() 拉取更新 │ │ - 同一时间只有一个版本在使用 │ └─────────────────────────────────────────────────────────────┘ ┌─────────────────────────────────────────────────────────────┐ │ Worker │ │ - 使用任务创建时的特定版本 │ │ - 可能同时运行多个版本 │ │ - 通过 BundleVersionLock 锁定版本 │ └─────────────────────────────────────────────────────────────┘BundleUsageTrackingManager负责清理不再使用的旧版本:classBundleUsageTrackingManager:"""Utility helper for removing stale bundles."""defremove_stale_bundle_versions(self):"""Remove bundles not in use and not used for some time. Keeps last N used bundles, and bundles used within X time."""bundles=list(DagBundlesManager().get_all_dag_bundles())forbundleinbundles:ifnotbundle.supports_versioning:continueself._remove_stale_bundle_versions_for_bundle(bundle_name=bundle.name)3. DagBag:DAG 的内存容器3.1 DagBag 的职责DagBag是 Airflow 中经典的核心组件,它充当 DAG 对象的内存容器。其核心职责:扫描目录:发现可能包含 DAG 的 Python 文件导入模块:执行 Python 文件,提取 DAG 对象验证 DAG:检测循环依赖、重复 ID、执行集群策略收集错误:记录导入错误和警告# 源码位置:airflow-core/src/airflow/dag_processing/dagbag.pyclassDagBag(LoggingMixin):""" A dagbag is a collection of dags, parsed out of a folder tree. Makes it easier to run distinct environments for production and development, tests, or different teams or security profiles. """def__init__(self,dag_folder:str|Path|None=None,include_examples:bool|ArgNotSet=NOTSET,safe_mode:bool|ArgNotSet=NOTSET,load_op_links:bool=True,collect_dags:bool=True,known_pools:set[str]|None=None,bundle_path:Path|None=None,bundle_name:str|None=None,):self.dag_folder=dag_folderorsettings.DAGS_FOLDER self.dags:dict[str,DAG]={}self.file_last_changed:dict[str,datetime]={}self.import_errors:dict[str,str]={}self.captured_warnings:dict[str,tuple[str,...]]={}self.known_pools=known_pools self.bundle_path=bundle_path

相关文章:

Apache Airflow 系列教程 | 第6课:DAG 解析与处理引擎

导读(Introduction) 欢迎来到 Apache Airflow 源码深度解析系列的第六课。 在前一课中,我们深入剖析了 Scheduler 的核心原理——它如何在循环中创建 DagRun、推进任务状态、将任务入队给 Executor。但 Scheduler 能调度 DAG 的前提是:系统必须先"看到"这些 DA…...

AISMM自评估工具究竟如何判定“智能奇点临近”?——独家披露5类隐性失效模式与3类高危误判信号

更多请点击: https://intelliparadigm.com 第一章:AISMM自评估工具的核心定位与奇点判定范式演进 AISMM(Artificial Intelligence Security Maturity Model)自评估工具并非传统合规检查清单的数字化复刻,而是面向AI系…...

AISMM人才培养体系正式启用倒计时72天!未备案机构将失去官方认证资格(附首批17家白名单)

更多请点击: https://intelliparadigm.com 第一章:2026奇点智能技术大会:AISMM人才培养体系 体系定位与核心理念 AISMM(Artificial Intelligence Skills Maturity Model)是2026奇点智能技术大会正式发布的国家级AI人…...

devmem:为代码库构建本地化项目记忆的CLI工具

1. 项目概述:为你的代码库构建一个本地化的“项目记忆” 你有没有过这样的经历?接手一个新项目,或者时隔几个月再回到自己的老项目,面对一堆代码,脑子里一片空白:“我当时为什么要这么设计这个接口&#x…...

mysql表结构发生变更如何记录_SQL版本管理与Migration工具

所有表结构变更必须通过带版本号的SQL文件执行,禁止直接在生产库运行ALTER命令;每个文件仅含一个操作、严格编号、开头注明影响范围。MySQL表结构变更必须走SQL文件,不能直接在生产库上ALTER线上表结构改了但没留痕,等于没改——下…...

从尖叫到安静:一个电机小白的FOC电流环PI参数实战调参笔记(含计算法与经验法对比)

从尖叫到安静:一个电机小白的FOC电流环PI参数实战调参笔记(含计算法与经验法对比) 第一次给永磁同步电机上电时,那刺耳的啸叫声让我差点摔了开发板——这大概是所有FOC初学者共同的噩梦。作为从Arduino PWM调速一路摸爬滚打过来的…...

一站式大模型评估框架EvalScope:从原理到实战的完整指南

1. 项目概述:一站式大模型评估框架 EvalScope 在当下这个“百模大战”的时代,无论是研究机构、企业团队还是个人开发者,面对层出不穷的大语言模型、多模态模型,一个最直接且核心的问题就是: “这个模型到底行不行&am…...

开源量化期权交易框架FlowAlgo:从事件驱动到希腊字母风控

1. 项目概述:一个面向量化期权交易的算法框架如果你在量化交易领域摸爬滚打过几年,尤其是接触过期权,那你一定对“策略回测”和“实盘部署”之间的巨大鸿沟深有体会。自己写的策略在回测里表现亮眼,一旦要把它变成一个稳定、可维护…...

LuaDec51 终极指南:如何高效反编译 Lua 5.1 字节码的完整解决方案

LuaDec51 终极指南:如何高效反编译 Lua 5.1 字节码的完整解决方案 【免费下载链接】luadec51 Lua Decompiler for Lua version 5.1 项目地址: https://gitcode.com/gh_mirrors/lu/luadec51 LuaDec51 是一款专注于 Lua 5.1 版本的专业反编译工具,能…...

终极指南:3分钟掌握VideoDownloadHelper免费视频下载神器

终极指南:3分钟掌握VideoDownloadHelper免费视频下载神器 【免费下载链接】VideoDownloadHelper Chrome Extension to Help Download Video for Some Video Sites. 项目地址: https://gitcode.com/gh_mirrors/vi/VideoDownloadHelper 想要轻松下载网页视频却…...

Android虚拟相机:用开源技术重塑你的摄像头体验

Android虚拟相机:用开源技术重塑你的摄像头体验 【免费下载链接】com.example.vcam 虚拟摄像头 virtual camera 项目地址: https://gitcode.com/gh_mirrors/co/com.example.vcam 你是否曾想过,在视频会议中展示一个精心设计的虚拟背景&#xff0c…...

WarcraftHelper:如何在现代电脑上重燃魔兽争霸3的竞技激情?

WarcraftHelper:如何在现代电脑上重燃魔兽争霸3的竞技激情? 【免费下载链接】WarcraftHelper Warcraft III Helper , support 1.20e, 1.24e, 1.26a, 1.27a, 1.27b 项目地址: https://gitcode.com/gh_mirrors/wa/WarcraftHelper 你是否曾经满怀期待…...

终极魔兽争霸3优化指南:WarcraftHelper让你的经典游戏焕发新生

终极魔兽争霸3优化指南:WarcraftHelper让你的经典游戏焕发新生 【免费下载链接】WarcraftHelper Warcraft III Helper , support 1.20e, 1.24e, 1.26a, 1.27a, 1.27b 项目地址: https://gitcode.com/gh_mirrors/wa/WarcraftHelper 还在为魔兽争霸3在现代电脑…...

【研报418】汽车行业2026年夏季策略报告:以旧换新年中或加码,结构性倾斜高端市场

本报告提供限时下载,请查看文后提示以下仅为报告部分内容:摘要:当前国内汽车行业内需承压,正处于筑底去库存阶段,出口成为行业核心增长支撑。2026年一季度中国汽车出口达222.6万辆,同比增长56.7%&#xff0…...

ChanlunX:通达信缠论插件的完整使用指南

ChanlunX:通达信缠论插件的完整使用指南 【免费下载链接】ChanlunX 缠中说禅炒股缠论可视化插件 项目地址: https://gitcode.com/gh_mirrors/ch/ChanlunX ChanlunX是一个基于C开发的开源缠论技术分析插件,专门为通达信软件提供专业的缠论结构自动…...

视频分析终极指南:如何用AI智能解析视频内容,让机器看懂视频

视频分析终极指南:如何用AI智能解析视频内容,让机器看懂视频 【免费下载链接】video-analyzer Analyze videos using LLMs, Computer Vision and Automatic Speech Recognition 项目地址: https://gitcode.com/gh_mirrors/vi/video-analyzer 你是…...

比亚迪+奇瑞+长安组建电池供应链联盟;Sensify无液压制动系统实现量产;宝马深化合作量子计算加速新能源材料研发

比亚迪、奇瑞、长安组建电池供应链联盟降本提效牛喀网获悉,比亚迪、奇瑞、长安三大中国车企正式组建战略联盟,聚焦电池供应链的优化,以应对新能源汽车补贴退坡后的市场压力。技术与战略层面,三方将成立深圳合资公司,初…...

Cursor破解工具终极指南:3步轻松解除AI编程限制

Cursor破解工具终极指南:3步轻松解除AI编程限制 【免费下载链接】cursor-free-vip [Support 0.45](Multi Language 多语言)自动注册 Cursor Ai ,自动重置机器ID , 免费升级使用Pro 功能: Youve reached your trial req…...

通过Taotoken的稳定性与路由能力保障线上服务高可用

通过Taotoken的稳定性与路由能力保障线上服务高可用 在将大模型能力集成到线上生产环境时,服务的连续性与可靠性是开发者必须面对的核心挑战。模型供应商的API端点可能因网络、负载或维护而出现波动,单一依赖往往意味着单点故障风险。本文将分享在实际生…...

Tomcat 8.5 启动报错 Invalid byte tag in constant pool?别慌,教你两招搞定(附 gson-2.8.6.jar 解决方案)

Tomcat 8.5启动报错Invalid byte tag in constant pool的深度解析与实战解决方案 当你正在紧急部署项目时,突然看到Tomcat控制台抛出"Invalid byte tag in constant pool: 19"的红色错误信息,那种心跳加速的感觉想必很多开发者都深有体会。这种…...

产销严重脱节,生产过剩与缺货问题反复出现怎么办?——2026年基于实在Agent的智慧供应链深度重构方案

站在2026年的时间节点回看,制造业的数字化转型已从简单的“信息化”跃迁至“智能体化”。 然而,即便在AI技术高度普及的今天,许多企业依然深陷于产销严重脱节的泥潭: 一边是仓库中堆积如山的过期库存,导致资金链极度紧…...

绕过地域限制:利用国内IP池,采集仅限特定地区访问的内容

做数据采集时最郁闷的状况是什么?不是代码写不出来,而是你明明看到数据就在那里,网站却理直气壮把你的请求拒之门外。更令人摸不着头脑的是,同一份商品的价格在不同的城市切换了IP就变了,招聘同一内容同岗位在不同地区…...

高端游戏主板选哪个品牌?主流产品线深度解析

在当前的游戏主板市场中,品牌方普遍采用多层次的产品系列策略来覆盖从入门到极限超频的广泛需求。清晰的系列划分不仅帮助玩家快速定位适合自身预算与使用场景的产品,也反映了各家技术路线的差异。本文从产品线定位切入,梳理华硕、七彩虹、技…...

【AISMM全球落地实战指南】:20年SITS专家亲授3大阻力破解法与5国推广避坑清单

更多请点击: https://intelliparadigm.com 第一章:SITS2026圆桌:AISMM的全球推广 在2026年新加坡国际技术峰会(SITS2026)上,AISMM(AI-Driven Software Maturity Model)正式成为全球…...

FinOps还在人工对账?AISMM已实现毫秒级资源-成本-业务价值映射(2026奇点大会实时沙箱演示实录)

更多请点击: https://intelliparadigm.com 第一章:2026奇点智能技术大会:AISMM与FinOps 2026奇点智能技术大会首次将人工智能系统成熟度模型(AISMM)与云原生财务运营(FinOps)深度耦合&#xff…...

五级地址解析是什么?为什么比四级多了行政村

你有没有遇到过这种情况?做物流分单,地址只解析到街道级别,但一个街道下面可能有十几个社区,分单不够精细;做政务数据统计,想按行政村/社区维度汇总,但地址库只有省市区街道四级,缺了…...

AISMM评估成本黑箱破解(含SITS2026官方未披露的3项强制审计附加项)

更多请点击: https://intelliparadigm.com 第一章:SITS2026分享:AISMM评估成本分析 AISMM(AI Software Maturity Model)作为面向生成式AI系统的能力成熟度评估框架,在SITS2026峰会上首次公开了其标准化评估…...

Docker 入门实战 完整步骤记录

一、安装与基础配置阶段 安装并启动 Docker Desktop 完成安装后,打开软件,确认主界面显示 Engine running(引擎运行中) 且状态为绿色。 配置国内镜像源(解决下载慢/超时问题) 点击右上角 Settings&#xf…...

抖音图片怎么无水印保存?2026 保存工具和方法实测对比指南

每当我们在抖音上看到喜欢的图片,总会想保存下来。但抖音默认保存的图片往往带着明显的水印,影响美观度。对于想要收藏素材、做内容创意参考,或者只是想干净地保存喜欢图片的人来说,无水印保存抖音图片就成了一个实际需求。2026 年…...

从新手到高手|AI在水文水环境领域的全场景应用(基础→高阶,理论+实践双突破)

基础篇(提示词应用)专题一、时间序列水文数据自动化处理及机器学习模型(ChatGPT-4O,实践)1.流量(或者降雨量)异常值自动分析2.PIII型曲线的参数估计3.降雨频率以及重现期自动分析4.随机森林、支…...