当前位置: 首页 > article >正文

Apache Airflow 系列教程 | 番外篇:通过 REST API 动态创建 DAG

导读(Introduction)在 Apache Airflow 的标准使用模式中,DAG 的定义通常以 Python 文件的形式存放在 DAGs 文件夹中,由 DagFileProcessorManager 周期性解析并持久化到数据库。然而在实际的平台化场景中,用户往往希望通过 Web 界面或 API 接口以更友好的方式(如填写表单、提交 JSON 配置)来动态创建工作流,而不是手写 Python 代码。本文将设计并实现一个通过 REST API 创建 DAG的扩展功能。我们将深入分析 Airflow 3.x 现有的 FastAPI 路由体系、DAG 序列化机制和数据库模型结构,设计一个完整的技术方案,使得通过 HTTP POST 请求提交的 JSON DAG 定义能够被转换为 Airflow DAG 对象、序列化并持久化到数据库,最终被 Scheduler 正常调度执行。注意:本文所有代码产出仅作为技术方案参考,不修改 Airflow 源代码。实际部署时可作为独立的 Airflow Plugin 或扩展模块使用。学习目标(Learning Objectives)完成本番外篇后,你将能够:理解 Airflow 3.x 中 DAG 从 Python 对象到数据库持久化的完整链路掌握LazyDeserializedDAG、SerializedDagModel、DagModel之间的关系设计符合 Airflow FastAPI 风格的 REST API 接口实现 JSON DAG 定义到 Airflow DAG 对象的转换逻辑理解如何绕过文件系统解析,直接将 DAG 写入数据库确保 API 创建的 DAG 能被现有 Scheduler 正常调度正文内容(Main Content)1. 现有架构分析1.1 DAG 持久化的标准路径在 Airflow 3.x 中,DAG 从文件到数据库的标准持久化路径如下:Python 文件 → DagFileProcessorManager → DagBag.process_file() → DAG 对象 → DagSerialization.to_dict(dag) → LazyDeserializedDAG → SerializedDagModel.write_dag() → 数据库 (serialized_dag 表) → DagModel (dag 表) → DagVersion + DagCode关键组件的职责:组件职责DagSerialization.to_dict(dag)将 DAG 对象序列化为字典格式LazyDeserializedDAG轻量级 DAG 代理,延迟反序列化SerializedDagModel.write_dag()将序列化数据写入serialized_dag表DagModel存储 DAG 元数据(调度状态、标签等)DagVersionDAG 版本管理DagCode存储 DAG 源代码1.2 数据库模型结构DagModel(dag表) 的关键字段:# 源码位置: airflow-core/src/airflow/models/dag.py:340classDagModel(Base):__tablename__="dag"dag_id:Mapped[str]# 主键is_paused:Mapped[bool]# 是否暂停is_stale:Mapped[bool]# 是否过期last_parsed_time:Mapped[datetime]# 最后解析时间fileloc:Mapped[str|None]# 文件路径bundle_name:Mapped[str]# Bundle 名称(外键)bundle_version:Mapped[str|None]# Bundle 版本owners:Mapped[str|None]# DAG 拥有者description:Mapped[str|None]# 描述timetable_type:Mapped[str]# 时间表类型max_active_tasks:Mapped[int]# 最大活跃任务数max_active_runs:Mapped[int|None]# 最大活跃运行数next_dagrun:Mapped[datetime|None]# 下次执行时间next_dagrun_create_after:Mapped[datetime|None]# 下次执行创建时间SerializedDagModel(serialized_dag表) 的关键字段:# 源码位置: airflow-core/src/airflow/models/serialized_dag.py:281classSerializedDagModel(Base):__tablename__="serialized_dag"id:Mapped[UUID]# 主键 (UUID7)dag_id:Mapped[str]# DAG ID_data:Mapped[dict|None]# JSON 序列化数据_data_compressed:Mapped[bytes|None]# 压缩的序列化数据created_at:Mapped[datetime]# 创建时间last_updated:Mapped[datetime]# 最后更新时间dag_hash:Mapped[str]# 数据哈希值 (MD5)dag_version_id:Mapped[UUID]# 关联的 DagVersion1.3 现有 API 路由模式Airflow 3.x 使用 FastAPI 构建 REST API,路由定义遵循统一模式:# 源码位置: airflow-core/src/airflow/api_fastapi/core_api/routes/public/connections.pyconnections_router=AirflowRouter(tags=["Connection"],prefix="/connections")@connections_router.post("",status_code=status.HTTP_201_CREATED,responses=create_openapi_http_exception_doc([status.HTTP_409_CONFLICT]),dependencies=[Depends(requires_access_connection(method="POST")),Depends(action_logging())],)defpost_connection(post_body:ConnectionBody,session:SessionDep,)-ConnectionResponse:"""Create connection entry."""connection=Connection(**post_body.model_dump(by_alias=True))session.add(connection)returnconnection2. 技术方案设计2.1 整体架构我们的方案绕过文件解析流程,直接通过 API 接收 JSON DAG 定义,在服务端构建 DAG 对象并持久化到数据库:HTTP POST (JSON) → FastAPI 路由 → Pydantic 校验 → 构建 DAG 对象 → DagSerialization.to_dict() → LazyDeserializedDAG → SerializedDagModel.write_dag() → 创建/更新 DagModel → Scheduler 自动发现并调度2.2 关键设计决策决策点选择理由Bundle 归属使用专用的api_createdbundle区分 API 创建的 DAG 与文件系统 DAG序列化方式复用DagSerialization.to_dict()保证与现有系统完全兼容Operator 支持初始支持 BashOperator、PythonOperator、EmptyOperator可扩展设计调度集成写入标准数据库表Scheduler 无需修改即可调度幂等性基于 dag_id 进行 upsert重复提交不会创建重复 DAG2.3 JSON DAG 定义格式{"dag_id":"api_created_etl_pipeline","description":"通过 API 创建的 ETL 工作流","schedule":"0 2 * * *","start_date":"2024-01-01T00:00:00+00:00","end_date":null,"catchup":false,"tags":["etl","api-created"],"default_args":{"owner":"data-team","retries":2,"retry_delay_seconds":300},"max_active_tasks":16,"max_active_runs":1,"tasks":[{"task_id":"extract","operator":"BashOperator","params":{"bash_command":"echo 'extracting data'"},"downstream":["transform"]},{"task_id":"transform","operator":"PythonOperator","params":{"python_callable_name":"my_module.transform_func","python_callable_source":"def transform_func():\n print('transforming')"},"downstream":["load"]},{"task_id":"load","operator":"BashOperator","params":{"bash_command":"echo 'loading data'"},"downstream":[]}]}3. 完整实现代码3.1 Pydantic 数据模型定义# file: airflow_api_dag_creator/datamodels.py""" Pydantic 数据模型:定义 API 请求/响应的 JSON 结构。 """from__future__importannotationsfromdatetimeimportdatetimefromtypingimportAnyfrompydanticimportBaseModel,Field,field_validatorclassTaskDefinition(BaseModel):"""单个任务的定义。"""task_id:str=Field(...,description="任务唯一标识",pattern=r"^[a-zA-Z_][a-zA-Z0-9_\-\.]*$")operator:str=Field(...,description="Operator 类型名称")params:dict[str,Any]=Field(default_factory=dict,description="Operator 参数")downstream:list[str]=Field(default_factory=list,description="下游任务 ID 列表")retries:int|None=Field(None,description="重试次数")retry_delay_seconds:int|None=Field(None,description="重试间隔(秒)")trigger_rule:str=Field("all_success",description="触发规则")pool:str=Field("default_pool",description="资源池名称")@field_validator("operator")@classmethoddefvalidate_operator(cls,v:str)-str:allowed_operators={"BashOperator","PythonOperator","EmptyOperator","EmailOperator",}ifvnotinallowed_operators:raiseValueError(f"不支持的 Operator:{v}。当前支持:{', '.join(sorted(allowed_operators))}")returnvclassDefaultArgs(BaseModel):"""DAG 默认参数。"""owner:str=Field("airflow",description="DAG 拥有者")retries:int=Field(0,ge=0,le=10,description="默认重试次数")retry_delay_seconds:int=Field(300,ge=0,description="默认重试间隔(秒)")email:list[str]|None=Field(None,description="告警邮件列表")email_on_failure:bool=Field(False,description="失败时是否发送邮件")email_on_retry:bool=Field(False,description="重试时是否发送邮件")classDagCreateRequest(BaseModel):"""创建 DAG 的请求体。"""dag_id:str=Field(...,description="DAG 唯一标识符",pattern=r"^[a-zA-Z_][a-zA-Z0-9_\-\.]*$",max_length=250,)description:str|None=Field(None,description="DAG 描述")schedule:str|No

相关文章:

Apache Airflow 系列教程 | 番外篇:通过 REST API 动态创建 DAG

导读(Introduction) 在 Apache Airflow 的标准使用模式中,DAG 的定义通常以 Python 文件的形式存放在 DAGs 文件夹中,由 DagFileProcessorManager 周期性解析并持久化到数据库。然而在实际的平台化场景中,用户往往希望通过 Web 界面或 API 接口以更友好的方式(如填写表单…...

Apache Airflow 系列教程 | 第7课:执行器(Executor)体系架构

导读(Introduction) 欢迎来到 Apache Airflow 源码深度解析系列的第七课。 在前两课中,我们分别剖析了 Scheduler 的调度决策逻辑和 DAG 解析引擎。Scheduler 决定了"哪些任务应该运行",解析引擎确保了"系统能看到哪些 DAG"。但还有一个关键问题:任…...

Apache Airflow 系列教程 | 第6课:DAG 解析与处理引擎

导读(Introduction) 欢迎来到 Apache Airflow 源码深度解析系列的第六课。 在前一课中,我们深入剖析了 Scheduler 的核心原理——它如何在循环中创建 DagRun、推进任务状态、将任务入队给 Executor。但 Scheduler 能调度 DAG 的前提是:系统必须先"看到"这些 DA…...

AISMM自评估工具究竟如何判定“智能奇点临近”?——独家披露5类隐性失效模式与3类高危误判信号

更多请点击: https://intelliparadigm.com 第一章:AISMM自评估工具的核心定位与奇点判定范式演进 AISMM(Artificial Intelligence Security Maturity Model)自评估工具并非传统合规检查清单的数字化复刻,而是面向AI系…...

AISMM人才培养体系正式启用倒计时72天!未备案机构将失去官方认证资格(附首批17家白名单)

更多请点击: https://intelliparadigm.com 第一章:2026奇点智能技术大会:AISMM人才培养体系 体系定位与核心理念 AISMM(Artificial Intelligence Skills Maturity Model)是2026奇点智能技术大会正式发布的国家级AI人…...

devmem:为代码库构建本地化项目记忆的CLI工具

1. 项目概述:为你的代码库构建一个本地化的“项目记忆” 你有没有过这样的经历?接手一个新项目,或者时隔几个月再回到自己的老项目,面对一堆代码,脑子里一片空白:“我当时为什么要这么设计这个接口&#x…...

mysql表结构发生变更如何记录_SQL版本管理与Migration工具

所有表结构变更必须通过带版本号的SQL文件执行,禁止直接在生产库运行ALTER命令;每个文件仅含一个操作、严格编号、开头注明影响范围。MySQL表结构变更必须走SQL文件,不能直接在生产库上ALTER线上表结构改了但没留痕,等于没改——下…...

从尖叫到安静:一个电机小白的FOC电流环PI参数实战调参笔记(含计算法与经验法对比)

从尖叫到安静:一个电机小白的FOC电流环PI参数实战调参笔记(含计算法与经验法对比) 第一次给永磁同步电机上电时,那刺耳的啸叫声让我差点摔了开发板——这大概是所有FOC初学者共同的噩梦。作为从Arduino PWM调速一路摸爬滚打过来的…...

一站式大模型评估框架EvalScope:从原理到实战的完整指南

1. 项目概述:一站式大模型评估框架 EvalScope 在当下这个“百模大战”的时代,无论是研究机构、企业团队还是个人开发者,面对层出不穷的大语言模型、多模态模型,一个最直接且核心的问题就是: “这个模型到底行不行&am…...

开源量化期权交易框架FlowAlgo:从事件驱动到希腊字母风控

1. 项目概述:一个面向量化期权交易的算法框架如果你在量化交易领域摸爬滚打过几年,尤其是接触过期权,那你一定对“策略回测”和“实盘部署”之间的巨大鸿沟深有体会。自己写的策略在回测里表现亮眼,一旦要把它变成一个稳定、可维护…...

LuaDec51 终极指南:如何高效反编译 Lua 5.1 字节码的完整解决方案

LuaDec51 终极指南:如何高效反编译 Lua 5.1 字节码的完整解决方案 【免费下载链接】luadec51 Lua Decompiler for Lua version 5.1 项目地址: https://gitcode.com/gh_mirrors/lu/luadec51 LuaDec51 是一款专注于 Lua 5.1 版本的专业反编译工具,能…...

终极指南:3分钟掌握VideoDownloadHelper免费视频下载神器

终极指南:3分钟掌握VideoDownloadHelper免费视频下载神器 【免费下载链接】VideoDownloadHelper Chrome Extension to Help Download Video for Some Video Sites. 项目地址: https://gitcode.com/gh_mirrors/vi/VideoDownloadHelper 想要轻松下载网页视频却…...

Android虚拟相机:用开源技术重塑你的摄像头体验

Android虚拟相机:用开源技术重塑你的摄像头体验 【免费下载链接】com.example.vcam 虚拟摄像头 virtual camera 项目地址: https://gitcode.com/gh_mirrors/co/com.example.vcam 你是否曾想过,在视频会议中展示一个精心设计的虚拟背景&#xff0c…...

WarcraftHelper:如何在现代电脑上重燃魔兽争霸3的竞技激情?

WarcraftHelper:如何在现代电脑上重燃魔兽争霸3的竞技激情? 【免费下载链接】WarcraftHelper Warcraft III Helper , support 1.20e, 1.24e, 1.26a, 1.27a, 1.27b 项目地址: https://gitcode.com/gh_mirrors/wa/WarcraftHelper 你是否曾经满怀期待…...

终极魔兽争霸3优化指南:WarcraftHelper让你的经典游戏焕发新生

终极魔兽争霸3优化指南:WarcraftHelper让你的经典游戏焕发新生 【免费下载链接】WarcraftHelper Warcraft III Helper , support 1.20e, 1.24e, 1.26a, 1.27a, 1.27b 项目地址: https://gitcode.com/gh_mirrors/wa/WarcraftHelper 还在为魔兽争霸3在现代电脑…...

【研报418】汽车行业2026年夏季策略报告:以旧换新年中或加码,结构性倾斜高端市场

本报告提供限时下载,请查看文后提示以下仅为报告部分内容:摘要:当前国内汽车行业内需承压,正处于筑底去库存阶段,出口成为行业核心增长支撑。2026年一季度中国汽车出口达222.6万辆,同比增长56.7%&#xff0…...

ChanlunX:通达信缠论插件的完整使用指南

ChanlunX:通达信缠论插件的完整使用指南 【免费下载链接】ChanlunX 缠中说禅炒股缠论可视化插件 项目地址: https://gitcode.com/gh_mirrors/ch/ChanlunX ChanlunX是一个基于C开发的开源缠论技术分析插件,专门为通达信软件提供专业的缠论结构自动…...

视频分析终极指南:如何用AI智能解析视频内容,让机器看懂视频

视频分析终极指南:如何用AI智能解析视频内容,让机器看懂视频 【免费下载链接】video-analyzer Analyze videos using LLMs, Computer Vision and Automatic Speech Recognition 项目地址: https://gitcode.com/gh_mirrors/vi/video-analyzer 你是…...

比亚迪+奇瑞+长安组建电池供应链联盟;Sensify无液压制动系统实现量产;宝马深化合作量子计算加速新能源材料研发

比亚迪、奇瑞、长安组建电池供应链联盟降本提效牛喀网获悉,比亚迪、奇瑞、长安三大中国车企正式组建战略联盟,聚焦电池供应链的优化,以应对新能源汽车补贴退坡后的市场压力。技术与战略层面,三方将成立深圳合资公司,初…...

Cursor破解工具终极指南:3步轻松解除AI编程限制

Cursor破解工具终极指南:3步轻松解除AI编程限制 【免费下载链接】cursor-free-vip [Support 0.45](Multi Language 多语言)自动注册 Cursor Ai ,自动重置机器ID , 免费升级使用Pro 功能: Youve reached your trial req…...

通过Taotoken的稳定性与路由能力保障线上服务高可用

通过Taotoken的稳定性与路由能力保障线上服务高可用 在将大模型能力集成到线上生产环境时,服务的连续性与可靠性是开发者必须面对的核心挑战。模型供应商的API端点可能因网络、负载或维护而出现波动,单一依赖往往意味着单点故障风险。本文将分享在实际生…...

Tomcat 8.5 启动报错 Invalid byte tag in constant pool?别慌,教你两招搞定(附 gson-2.8.6.jar 解决方案)

Tomcat 8.5启动报错Invalid byte tag in constant pool的深度解析与实战解决方案 当你正在紧急部署项目时,突然看到Tomcat控制台抛出"Invalid byte tag in constant pool: 19"的红色错误信息,那种心跳加速的感觉想必很多开发者都深有体会。这种…...

产销严重脱节,生产过剩与缺货问题反复出现怎么办?——2026年基于实在Agent的智慧供应链深度重构方案

站在2026年的时间节点回看,制造业的数字化转型已从简单的“信息化”跃迁至“智能体化”。 然而,即便在AI技术高度普及的今天,许多企业依然深陷于产销严重脱节的泥潭: 一边是仓库中堆积如山的过期库存,导致资金链极度紧…...

绕过地域限制:利用国内IP池,采集仅限特定地区访问的内容

做数据采集时最郁闷的状况是什么?不是代码写不出来,而是你明明看到数据就在那里,网站却理直气壮把你的请求拒之门外。更令人摸不着头脑的是,同一份商品的价格在不同的城市切换了IP就变了,招聘同一内容同岗位在不同地区…...

高端游戏主板选哪个品牌?主流产品线深度解析

在当前的游戏主板市场中,品牌方普遍采用多层次的产品系列策略来覆盖从入门到极限超频的广泛需求。清晰的系列划分不仅帮助玩家快速定位适合自身预算与使用场景的产品,也反映了各家技术路线的差异。本文从产品线定位切入,梳理华硕、七彩虹、技…...

【AISMM全球落地实战指南】:20年SITS专家亲授3大阻力破解法与5国推广避坑清单

更多请点击: https://intelliparadigm.com 第一章:SITS2026圆桌:AISMM的全球推广 在2026年新加坡国际技术峰会(SITS2026)上,AISMM(AI-Driven Software Maturity Model)正式成为全球…...

FinOps还在人工对账?AISMM已实现毫秒级资源-成本-业务价值映射(2026奇点大会实时沙箱演示实录)

更多请点击: https://intelliparadigm.com 第一章:2026奇点智能技术大会:AISMM与FinOps 2026奇点智能技术大会首次将人工智能系统成熟度模型(AISMM)与云原生财务运营(FinOps)深度耦合&#xff…...

五级地址解析是什么?为什么比四级多了行政村

你有没有遇到过这种情况?做物流分单,地址只解析到街道级别,但一个街道下面可能有十几个社区,分单不够精细;做政务数据统计,想按行政村/社区维度汇总,但地址库只有省市区街道四级,缺了…...

AISMM评估成本黑箱破解(含SITS2026官方未披露的3项强制审计附加项)

更多请点击: https://intelliparadigm.com 第一章:SITS2026分享:AISMM评估成本分析 AISMM(AI Software Maturity Model)作为面向生成式AI系统的能力成熟度评估框架,在SITS2026峰会上首次公开了其标准化评估…...

Docker 入门实战 完整步骤记录

一、安装与基础配置阶段 安装并启动 Docker Desktop 完成安装后,打开软件,确认主界面显示 Engine running(引擎运行中) 且状态为绿色。 配置国内镜像源(解决下载慢/超时问题) 点击右上角 Settings&#xf…...