当前位置: 首页 > article >正文

Apache Airflow 系列教程 | 第7课:执行器(Executor)体系架构

导读(Introduction)欢迎来到 Apache Airflow 源码深度解析系列的第七课。在前两课中,我们分别剖析了 Scheduler 的调度决策逻辑和 DAG 解析引擎。Scheduler 决定了"哪些任务应该运行",解析引擎确保了"系统能看到哪些 DAG"。但还有一个关键问题:任务被 Scheduler 标记为 QUEUED 之后,实际在哪里、以何种方式执行?这就是 Executor(执行器)的职责。Executor 是 Scheduler 与 Worker 之间的桥梁——它接收调度器"入队"的任务,将其分发到实际的计算资源上执行,并将执行结果反馈给调度器。Airflow 3.x 的 Executor 体系进行了重要升级:引入了统一的Workload 抽象,将任务执行、回调执行、触发器运行统一为同一种"工作负载"概念支持Multi-Team 配置,允许不同团队使用不同的执行器执行器通过Execution API与 Task SDK 通信,实现了更清晰的职责边界从单机的LocalExecutor到分布式的CeleryExecutor和KubernetesExecutor,不同的 Executor 实现适应不同的部署规模和资源管理策略。本课将带你深入理解这个执行层的设计哲学和实现细节。学习目标(Learning Objectives)完成本课学习后,你将能够:理解 Executor 在系统架构中的角色定位——Scheduler 与实际任务执行之间的协调层掌握 BaseExecutor 的核心接口设计——heartbeat、trigger_tasks、sync、change_state 的协作流程理解 Workload 抽象体系——ExecuteTask、ExecuteCallback、RunTrigger 的统一设计分析 LocalExecutor 的实现——基于 multiprocessing 的本地并行执行了解 CeleryExecutor 和 KubernetesExecutor——分布式执行的两种主流方案掌握 ExecutorLoader 的动态加载机制——多执行器、多团队的配置与实例化具备实现自定义 Executor 的能力——理解最小实现所需的接口契约正文内容(Main Content)1. Executor 在架构中的位置1.1 整体数据流┌──────────────────────────────────────────────────────────────────┐ │ Scheduler │ │ _critical_section_enqueue_task_instances() │ │ → 将 TI 状态设为 QUEUED │ │ → 调用 executor.queue_workload(workload) │ └─────────────────────────────┬────────────────────────────────────┘ │ ▼ ┌──────────────────────────────────────────────────────────────────┐ │ Executor │ │ heartbeat() → trigger_tasks() → _process_workloads() │ │ → 将 workload 分发到实际计算资源 │ │ sync() │ │ → 收集执行结果,更新 event_buffer │ └─────────────────────────────┬────────────────────────────────────┘ │ ▼ ┌──────────────────────────────────────────────────────────────────┐ │ Worker / Execution Environment │ │ BaseExecutor.run_workload(workload) │ │ → supervise_task() / supervise_callback() │ │ → 通过 Execution API 与 Airflow Server 通信 │ └──────────────────────────────────────────────────────────────────┘1.2 Executor 解决的核心问题问题Executor 的解决方案任务在哪里执行?本地进程、远程 Worker、K8s Pod如何控制并行度?parallelism参数限制同时运行的 workload 数如何获取执行结果?sync()方法轮询结果,写入event_buffer如何处理失败?状态变更通知 Scheduler,由 Scheduler 决定重试如何支持不同团队?Multi-Team 配置,每个团队可有独立 Executor2. BaseExecutor:执行器的核心抽象2.1 类定义与关键属性# 源码位置:airflow-core/src/airflow/executors/base_executor.pyclassBaseExecutor(LoggingMixin):""" Base class to inherit for concrete executors such as Celery, Kubernetes, Local, etc. """# 类级别的能力声明supports_ad_hoc_ti_run:bool=False# 是否支持即席运行supports_callbacks:bool=False# 是否支持回调执行supports_multi_team:bool=False# 是否支持多团队is_local:bool=False# 是否为本地执行器is_production:bool=True# 是否适用于生产环境serve_logs:bool=False# 是否提供日志服务pre_assigns_external_executor_id:ClassVar[bool]=False# 是否预分配外部IDdef__init__(self,parallelism:int=PARALLELISM,team_name:str|None=None):self.parallelism:int=parallelism self.team_name:str|None=team_name# 核心数据结构self.queued_tasks:dict[TaskInstanceKey,workloads.ExecuteTask]={}self.queued_callbacks:dict[str,workloads.ExecuteCallback]={}self.running:set[WorkloadKey]=set()self.event_buffer:dict[WorkloadKey,EventBufferValueType]={}self.conf=ExecutorConf(team_name)关键属性解读:属性类型说明queued_tasksdictScheduler 已入队但尚未发送到 Worker 的任务queued_callbacksdict已入队的回调工作负载runningset已发送到 Worker 正在执行的 workload keyevent_bufferdict执行完成的结果缓冲区,等待 Scheduler 读取parallelismint最大并行执行数2.2 核心方法生命周期Executor 的核心方法形成了一个清晰的生命周期:# 1. 启动defstart(self):"""Executors may need to get things started."""# 2. Scheduler 入队工作负载defqueue_workload(self,workload:ExecutorWorkload,session:Session)-None:ifisinstance(workload,workloads.ExecuteTask):self.queued_tasks[ti.key]=workloadelifisinstance(workload,workloads.ExecuteCallback):self.queued_callbacks[workload.callback.id]=workload# 3. 心跳驱动执行(被 Scheduler 循环调用)defheartbeat(self)-None:open_slots=self.parallelism-len(self.running)self._emit_metrics(open_slots,...)self.trigger_tasks(open_slots)self.sync()# 4. 触发任务执行deftrigger_tasks(self,open_slots:int)-None:workloads_to_schedule=self._get_workloads_to_schedule(open_slots)ifworkload_list:self._process_workloads(workload_list)# 子类必须实现# 5. 同步执行状态(子类覆盖)defsync(self)-None:"""Executors should override this to gather statuses."""# 6. Scheduler 读取执行结果defget_event_buffer(self,dag_ids=None)-dict[WorkloadKey,EventBufferValueType]:cleared_events=self.event_buffer self.event_buffer={}returncleared_events# 7. 关闭defend(self)-None:"""Wait synchronously for previously submitted jobs to complete."""defterminate(self):"""Called when the daemon receives a SIGTERM."""2.3 调度优先级策略def_get_workloads_to_schedule(self,open_slots:int)-list[tuple[WorkloadKey,ExecutorWorkload]]:""" Priority Policy: Callbacks are scheduled before tasks. Callbacks complete existing work; tasks start new work. """workloads_to_schedule=[]# 优先级1:回调先于任务(完成现有工作优先于开始新工作)ifself.queued_callbacks:forkey,workloadinself.queued_callbacks.items():iflen(workloads_to_schedule)=open_slots:breakworkloads_to_schedule.append((key,workload))# 优先级2:任务按 priority_weight 排序ifopen_slotslen(workloads_to_schedule)andself.queued_tasks:fortask_key,task_workloadinself.order_queued_tasks_by_priority():iflen(workloads_to_schedule)=open_slots:breakworkloads_to_schedule.append((task_key,task_workload))returnworkloads_to_scheduledeforder_queued_tasks_by_priority(self):"""Sort tasks by priority_weight (lower number = higher priority)."""returnsorted(self.queued_tasks.items(),key=lambdax:x[1].ti.priority_weight,reverse=False,)2.4 状态变更与事件缓冲defchange_state(self,key:WorkloadKey,state:WorkloadState,info=None,remove_running=True):"""Change state of the task and update event buffer."""ifremove_running:try:self.running.remove(key)exceptKeyError:passself.event_buffer[key]=state,infodeffail(self,key:WorkloadKey,info=

相关文章:

Apache Airflow 系列教程 | 第7课:执行器(Executor)体系架构

导读(Introduction) 欢迎来到 Apache Airflow 源码深度解析系列的第七课。 在前两课中,我们分别剖析了 Scheduler 的调度决策逻辑和 DAG 解析引擎。Scheduler 决定了"哪些任务应该运行",解析引擎确保了"系统能看到哪些 DAG"。但还有一个关键问题:任…...

Apache Airflow 系列教程 | 第6课:DAG 解析与处理引擎

导读(Introduction) 欢迎来到 Apache Airflow 源码深度解析系列的第六课。 在前一课中,我们深入剖析了 Scheduler 的核心原理——它如何在循环中创建 DagRun、推进任务状态、将任务入队给 Executor。但 Scheduler 能调度 DAG 的前提是:系统必须先"看到"这些 DA…...

AISMM自评估工具究竟如何判定“智能奇点临近”?——独家披露5类隐性失效模式与3类高危误判信号

更多请点击: https://intelliparadigm.com 第一章:AISMM自评估工具的核心定位与奇点判定范式演进 AISMM(Artificial Intelligence Security Maturity Model)自评估工具并非传统合规检查清单的数字化复刻,而是面向AI系…...

AISMM人才培养体系正式启用倒计时72天!未备案机构将失去官方认证资格(附首批17家白名单)

更多请点击: https://intelliparadigm.com 第一章:2026奇点智能技术大会:AISMM人才培养体系 体系定位与核心理念 AISMM(Artificial Intelligence Skills Maturity Model)是2026奇点智能技术大会正式发布的国家级AI人…...

devmem:为代码库构建本地化项目记忆的CLI工具

1. 项目概述:为你的代码库构建一个本地化的“项目记忆” 你有没有过这样的经历?接手一个新项目,或者时隔几个月再回到自己的老项目,面对一堆代码,脑子里一片空白:“我当时为什么要这么设计这个接口&#x…...

mysql表结构发生变更如何记录_SQL版本管理与Migration工具

所有表结构变更必须通过带版本号的SQL文件执行,禁止直接在生产库运行ALTER命令;每个文件仅含一个操作、严格编号、开头注明影响范围。MySQL表结构变更必须走SQL文件,不能直接在生产库上ALTER线上表结构改了但没留痕,等于没改——下…...

从尖叫到安静:一个电机小白的FOC电流环PI参数实战调参笔记(含计算法与经验法对比)

从尖叫到安静:一个电机小白的FOC电流环PI参数实战调参笔记(含计算法与经验法对比) 第一次给永磁同步电机上电时,那刺耳的啸叫声让我差点摔了开发板——这大概是所有FOC初学者共同的噩梦。作为从Arduino PWM调速一路摸爬滚打过来的…...

一站式大模型评估框架EvalScope:从原理到实战的完整指南

1. 项目概述:一站式大模型评估框架 EvalScope 在当下这个“百模大战”的时代,无论是研究机构、企业团队还是个人开发者,面对层出不穷的大语言模型、多模态模型,一个最直接且核心的问题就是: “这个模型到底行不行&am…...

开源量化期权交易框架FlowAlgo:从事件驱动到希腊字母风控

1. 项目概述:一个面向量化期权交易的算法框架如果你在量化交易领域摸爬滚打过几年,尤其是接触过期权,那你一定对“策略回测”和“实盘部署”之间的巨大鸿沟深有体会。自己写的策略在回测里表现亮眼,一旦要把它变成一个稳定、可维护…...

LuaDec51 终极指南:如何高效反编译 Lua 5.1 字节码的完整解决方案

LuaDec51 终极指南:如何高效反编译 Lua 5.1 字节码的完整解决方案 【免费下载链接】luadec51 Lua Decompiler for Lua version 5.1 项目地址: https://gitcode.com/gh_mirrors/lu/luadec51 LuaDec51 是一款专注于 Lua 5.1 版本的专业反编译工具,能…...

终极指南:3分钟掌握VideoDownloadHelper免费视频下载神器

终极指南:3分钟掌握VideoDownloadHelper免费视频下载神器 【免费下载链接】VideoDownloadHelper Chrome Extension to Help Download Video for Some Video Sites. 项目地址: https://gitcode.com/gh_mirrors/vi/VideoDownloadHelper 想要轻松下载网页视频却…...

Android虚拟相机:用开源技术重塑你的摄像头体验

Android虚拟相机:用开源技术重塑你的摄像头体验 【免费下载链接】com.example.vcam 虚拟摄像头 virtual camera 项目地址: https://gitcode.com/gh_mirrors/co/com.example.vcam 你是否曾想过,在视频会议中展示一个精心设计的虚拟背景&#xff0c…...

WarcraftHelper:如何在现代电脑上重燃魔兽争霸3的竞技激情?

WarcraftHelper:如何在现代电脑上重燃魔兽争霸3的竞技激情? 【免费下载链接】WarcraftHelper Warcraft III Helper , support 1.20e, 1.24e, 1.26a, 1.27a, 1.27b 项目地址: https://gitcode.com/gh_mirrors/wa/WarcraftHelper 你是否曾经满怀期待…...

终极魔兽争霸3优化指南:WarcraftHelper让你的经典游戏焕发新生

终极魔兽争霸3优化指南:WarcraftHelper让你的经典游戏焕发新生 【免费下载链接】WarcraftHelper Warcraft III Helper , support 1.20e, 1.24e, 1.26a, 1.27a, 1.27b 项目地址: https://gitcode.com/gh_mirrors/wa/WarcraftHelper 还在为魔兽争霸3在现代电脑…...

【研报418】汽车行业2026年夏季策略报告:以旧换新年中或加码,结构性倾斜高端市场

本报告提供限时下载,请查看文后提示以下仅为报告部分内容:摘要:当前国内汽车行业内需承压,正处于筑底去库存阶段,出口成为行业核心增长支撑。2026年一季度中国汽车出口达222.6万辆,同比增长56.7%&#xff0…...

ChanlunX:通达信缠论插件的完整使用指南

ChanlunX:通达信缠论插件的完整使用指南 【免费下载链接】ChanlunX 缠中说禅炒股缠论可视化插件 项目地址: https://gitcode.com/gh_mirrors/ch/ChanlunX ChanlunX是一个基于C开发的开源缠论技术分析插件,专门为通达信软件提供专业的缠论结构自动…...

视频分析终极指南:如何用AI智能解析视频内容,让机器看懂视频

视频分析终极指南:如何用AI智能解析视频内容,让机器看懂视频 【免费下载链接】video-analyzer Analyze videos using LLMs, Computer Vision and Automatic Speech Recognition 项目地址: https://gitcode.com/gh_mirrors/vi/video-analyzer 你是…...

比亚迪+奇瑞+长安组建电池供应链联盟;Sensify无液压制动系统实现量产;宝马深化合作量子计算加速新能源材料研发

比亚迪、奇瑞、长安组建电池供应链联盟降本提效牛喀网获悉,比亚迪、奇瑞、长安三大中国车企正式组建战略联盟,聚焦电池供应链的优化,以应对新能源汽车补贴退坡后的市场压力。技术与战略层面,三方将成立深圳合资公司,初…...

Cursor破解工具终极指南:3步轻松解除AI编程限制

Cursor破解工具终极指南:3步轻松解除AI编程限制 【免费下载链接】cursor-free-vip [Support 0.45](Multi Language 多语言)自动注册 Cursor Ai ,自动重置机器ID , 免费升级使用Pro 功能: Youve reached your trial req…...

通过Taotoken的稳定性与路由能力保障线上服务高可用

通过Taotoken的稳定性与路由能力保障线上服务高可用 在将大模型能力集成到线上生产环境时,服务的连续性与可靠性是开发者必须面对的核心挑战。模型供应商的API端点可能因网络、负载或维护而出现波动,单一依赖往往意味着单点故障风险。本文将分享在实际生…...

Tomcat 8.5 启动报错 Invalid byte tag in constant pool?别慌,教你两招搞定(附 gson-2.8.6.jar 解决方案)

Tomcat 8.5启动报错Invalid byte tag in constant pool的深度解析与实战解决方案 当你正在紧急部署项目时,突然看到Tomcat控制台抛出"Invalid byte tag in constant pool: 19"的红色错误信息,那种心跳加速的感觉想必很多开发者都深有体会。这种…...

产销严重脱节,生产过剩与缺货问题反复出现怎么办?——2026年基于实在Agent的智慧供应链深度重构方案

站在2026年的时间节点回看,制造业的数字化转型已从简单的“信息化”跃迁至“智能体化”。 然而,即便在AI技术高度普及的今天,许多企业依然深陷于产销严重脱节的泥潭: 一边是仓库中堆积如山的过期库存,导致资金链极度紧…...

绕过地域限制:利用国内IP池,采集仅限特定地区访问的内容

做数据采集时最郁闷的状况是什么?不是代码写不出来,而是你明明看到数据就在那里,网站却理直气壮把你的请求拒之门外。更令人摸不着头脑的是,同一份商品的价格在不同的城市切换了IP就变了,招聘同一内容同岗位在不同地区…...

高端游戏主板选哪个品牌?主流产品线深度解析

在当前的游戏主板市场中,品牌方普遍采用多层次的产品系列策略来覆盖从入门到极限超频的广泛需求。清晰的系列划分不仅帮助玩家快速定位适合自身预算与使用场景的产品,也反映了各家技术路线的差异。本文从产品线定位切入,梳理华硕、七彩虹、技…...

【AISMM全球落地实战指南】:20年SITS专家亲授3大阻力破解法与5国推广避坑清单

更多请点击: https://intelliparadigm.com 第一章:SITS2026圆桌:AISMM的全球推广 在2026年新加坡国际技术峰会(SITS2026)上,AISMM(AI-Driven Software Maturity Model)正式成为全球…...

FinOps还在人工对账?AISMM已实现毫秒级资源-成本-业务价值映射(2026奇点大会实时沙箱演示实录)

更多请点击: https://intelliparadigm.com 第一章:2026奇点智能技术大会:AISMM与FinOps 2026奇点智能技术大会首次将人工智能系统成熟度模型(AISMM)与云原生财务运营(FinOps)深度耦合&#xff…...

五级地址解析是什么?为什么比四级多了行政村

你有没有遇到过这种情况?做物流分单,地址只解析到街道级别,但一个街道下面可能有十几个社区,分单不够精细;做政务数据统计,想按行政村/社区维度汇总,但地址库只有省市区街道四级,缺了…...

AISMM评估成本黑箱破解(含SITS2026官方未披露的3项强制审计附加项)

更多请点击: https://intelliparadigm.com 第一章:SITS2026分享:AISMM评估成本分析 AISMM(AI Software Maturity Model)作为面向生成式AI系统的能力成熟度评估框架,在SITS2026峰会上首次公开了其标准化评估…...

Docker 入门实战 完整步骤记录

一、安装与基础配置阶段 安装并启动 Docker Desktop 完成安装后,打开软件,确认主界面显示 Engine running(引擎运行中) 且状态为绿色。 配置国内镜像源(解决下载慢/超时问题) 点击右上角 Settings&#xf…...

抖音图片怎么无水印保存?2026 保存工具和方法实测对比指南

每当我们在抖音上看到喜欢的图片,总会想保存下来。但抖音默认保存的图片往往带着明显的水印,影响美观度。对于想要收藏素材、做内容创意参考,或者只是想干净地保存喜欢图片的人来说,无水印保存抖音图片就成了一个实际需求。2026 年…...