Apache Airflow 快速入门教程
Apache Airflow已经成为Python生态系统中管道编排的事实上的库。与类似的解决方案相反,由于它的简单性和可扩展性,它已经获得了普及。在本文中,我将尝试概述它的主要概念,并让您清楚地了解何时以及如何使用它。
Airflow应用场景
想象一下,你想要构建一个机器学习管道,它由以下几个步骤组成:
- 从基于云的存储中读取图像数据集
- 处理图像
- 使用下载的图像训练深度学习模型
- 将训练好的模型上传到云端
- 部署模型
你将如何安排和自动化这个工作流程?Cron作业是一个简单的解决方案,但它也带来了许多问题。最重要的是,它们不允许你有效地扩展。Airflow提供了轻松调度和扩展复杂数据流程编排的能力,另一方面,它还能够在故障后自动重新运行它们,管理它们的依赖关系,并使用日志和仪表板监视它们。
在构建上述数据流之前,让我们先了解Apache Airflow 的基本概念。
Airflow 简介
Apache Airflow 是一个开源的平台,用于编排、调度和监控工作流,工作流是由一系列任务(Tasks)组成的,这些任务可以是数据处理、数据分析、机器学习模型训练、文件传输等各种操作。因此,它是ETL和MLOps用例的理想解决方案。示例用例包括:
- 从多个数据源提取数据,对其进行聚合、转换,并将其存储在数据仓库中。
- 从数据中提取见解并将其显示在分析仪表板中
- 训练、验证和部署机器学习模型
核心组件
在默认版本中安装Apache Airflow 时,你将看到四个不同的组件。
- Webserver: Webserver是Airflow的用户界面(UI),它允许您在不需要CLI或API的情况下与之交互。从那里可以执行和监视管道,创建与外部系统的连接,检查它们的数据集等等。
- 执行器:执行器是管道运行的机制。有许多不同类型的管道在本地运行,在单个机器中运行,或者以分布式方式运行。一些例子是LocalExecutor, SequentialExecutor, CeleryExecutor和KubernetesExecutor
- 调度器:调度器负责在正确的时间执行不同的任务,重新运行管道,回填数据,确保任务完成等。
- PostgreSQL:存储所有管道元数据的数据库。这通常是Postgres,但也支持其他SQL数据库。
安装Airflow最简单的方法是使用docker compose。你可以从这里下载官方的docker撰写文件:
$ curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.5.1/docker-compose.yaml'
基本概念
要学习Apache Airflow,必须熟悉它的主要概念,这些概念可能有点难理解,让我们试着揭开它们的神秘面纱。
DAGs
所有管道都定义为有向无环图(dag)。每次执行DAG时,都会创建一个单独的运行。每个DAG运行都是独立的,并且包含一个关于DAG执行阶段的状态。这意味着相同的dag可以并行执行多次。
要实例化DAG,可以使用DAG函数或与上下文管理器一起使用,如下所示:
from airflow import DAG
with DAG("mlops",default_args={"retries": 1,},schedule=timedelta(days=1),start_date=datetime(2023, 1, 1)
) as dag:# dag code goes here
上下文管理器接受一些关于DAG的全局变量和一些默认参数。默认参数被传递到所有任务中,并且可以在每个任务的基础上重写。完整的参数列表可以在官方文档中找到。
在本例中,我们定义DAG将从2023年1月1日开始,并且每天执行一次。retries参数确保在可能出现故障后重新运行一次。
task(任务)
DAG的每个节点表示一个Task,即一段单独的代码。每个任务可能有一些上游和下游依赖项。这些依赖关系表示任务如何相互关联以及它们应该以何种顺序执行。每当初始化一个新的DAG运行时,所有任务都初始化为Task实例。这意味着每个Task实例都是给定任务的特定运行。
operator(任务模板)
操作符可以被视为预定义任务的模板,因为它们封装了样板代码并抽象了它们的大部分逻辑。常见的操作符有BashOperator、PythonOperator、MySqlOperator、S3FileTransformOperator。我们看到,操作符可以定义遵循特定模式的任务。例如,MySqlOperator创建任务来执行SQL查询,而BashOperator执行bash脚本。
操作符在DAG上下文管理器中定义,如下所示。下面的代码创建了两个任务,一个执行bash命令,另一个执行MySQL查询。
with DAG("tutorial"
) as dag:task1 = BashOperator(task_id="print_date",bash_command="date",)task2 = MySqlOperator(task_id="load_table",sql="/scripts/load_table.sql")
任务依赖
为了形成DAG的结构,我们需要定义每个任务之间的依赖关系。一种方法是使用>>符号,如下所示:
task1 >> task2 >> task3
# 一个任务有多个依赖
task1 >> [task2, task3]
# 也可以使用set_downstream, set_upstream
t1.set_downstream([t2, t3])
xcom
xcom,或相互通信,负责任务之间的通信。xcom对象可以在任务之间推拉数据。更具体地说,它们将数据推入元数据数据库,其他任务可以从中提取数据。这就是为什么可以通过它们传递的数据量是有限的。但是,如果需要传输大数据,则可以使用合适的外部数据存储,例如对象存储或NoSQL数据库。
看看下面的代码。这两个任务使用ti参数(任务实例的缩写)通过xcom进行通信。train_model任务将model_path推入元数据数据库,元数据由deploy_model任务拉出。
dag = DAG('mlops_dag',
)def train_model(ti):model_path = train_and_save_model()ti.xcom_push(key='model_path', value=model_path)def deploy_model(ti):model_path = ti.xcom_pull(key='model_path', task_ids='train_model')deploy_trained_model(model_path)train_model_task = PythonOperator(task_id='train_model',python_callable=train_model,dag=dag
)deploy_model_task = PythonOperator(task_id='deploy_model',python_callable=deploy_model,dag=dag
)train_model_task >> deploy_model_task
Taskflow
Taskflow API是一种使用Python装饰器@task来定义任务的简单方法。如果所有任务的逻辑都可以用Python编写,那么一个简单的注释就可以定义一个新任务。Taskflow自动管理其他任务之间的依赖关系和通信。
使用Taskflow API,我们可以用@dag装饰器初始化DAG。下面是使用Tashflow示例:
@dag(start_date=datetime(2023, 1, 1),schedule_interval='@daily'
)
def mlops():@taskdef load_data():. . .return df@taskdef preprocessing(data):. . .return data@taskdef fit(data): return Nonedf = load_data()data = preprocessing(df)model = fit(data)dag = mlops()
注意,任务之间的依赖关系是通过每个函数参数隐含的。这里我们是简单的连接顺序,但实际可以变得复杂得多。Taskflow API还解决了任务之间的通信问题,因此使用xcom的需求有限。
调度
作业调度是Airflow的核心功能之一。这可以使用schedule_interval参数完成,该参数接收cron表达式,表示日期时间对象,或预定义变量,如@hour, @daily等。更灵活的方法是使用最近添加的时间表,它支持使用Python定义自定义时间表。
下面是如何使用schedule_interval参数的示例。以下DAG将每天执行。
@dag(start_date=datetime(2023,1,1),schedule_interval = '@daily',catchup =False
)
def my_dag():pass
关于调度,需要了解两个非常重要的概念:回填(backfill)和追赶(catchup)。
一旦我们定义了DAG,我们就设置了开始日期和计划间隔。如果catchup=True,则Airflow 将为从开始日期到当前日期的所有计划间隔创建DAG运行。如果catchup=False,气流将只从当前日期调度运行。
回填扩展了这个想法,使我们能够在CLI中创建过去的运行,而不管catchup参数的值:
$ airflow backfill -s <START_DATE> -e <END_DATE> <DAG_NAME>
连接
Airflow 提供了一种简单的方法来配置与外部系统或服务的连接。可以使用UI、作为环境变量或通过配置文件创建连接。它们通常需要URL、身份验证信息和唯一id。钩子(Hooks )是一种API,它抽象了与这些外部系统的通信。例如,我们可以通过如下的UI定义一个PostgreSQL连接:
然后使用PostgresHook来建立连接并执行我们的查询:
pg_hook = PostgresHook(postgres_conn_id='postgres_custom')conn = pg_hook.get_conn()
cursor = conn.cursor()
cursor.execute('create table _mytable (ModelID int, ModelName varchar(255)')
cursor.close()
conn.close()
高级概念
为了使本教程尽可能完整,我需要提到一些更高级的概念。我不会详细介绍每一个,但我强烈建议你看看他们,如果你想深入掌握Airflow 。
- 分支:分支允许你将任务划分为许多不同的任务,如:支持条件处理不同任务的工作流。最常见的方法是BranchPythonOperator。
- 任务组:任务组可以在单个组中组织多个任务。它是简化图形视图和重复模式的好工具。
- 动态包:包和任务也可以以动态的方式构造。从Airflow 2.3开始,可以在运行时创建包和任务,这对于并行和依赖输入的任务来说是理想的。气流也支持Jinja模板,并且是对动态包非常有用的补充。
- 单元测试和日志记录:气流具有运行单元测试和记录信息的专用功能.
Airflow最佳实践
在我们看到实际操作的示例之前,让我们讨论一下大多数从业者使用的一些最佳实践。
- 幂等性:dag和任务应该是幂等的。使用相同的输入重新执行相同的DAG运行应该始终具有与执行一次相同的效果。
- 原子性:任务应该是原子性的。每个任务应该负责一个操作,并且独立于其他任务
- 增量过滤:每个DAG运行应该只处理一批支持增量提取和加载的数据。这样,可能出现的故障就不会影响整个数据集。
- 顶级代码:如果不是用于创建操作符或标记,则应避免使用顶级代码,因为它会影响性能和加载时间。所有代码都应该在任务内部,包括导入包、数据库访问和繁重的计算。
- 复杂性:dag应尽可能保持简单,因为高复杂性可能会影响性能或调度。
相关文章:

Apache Airflow 快速入门教程
Apache Airflow已经成为Python生态系统中管道编排的事实上的库。与类似的解决方案相反,由于它的简单性和可扩展性,它已经获得了普及。在本文中,我将尝试概述它的主要概念,并让您清楚地了解何时以及如何使用它。 Airflow应用场景 …...

42 基于单片机的智能浇花系统
目录 一、主要功能 二、硬件资源 三、程序编程 四、实现现象 一、主要功能 基于51单片机,采样DHT11温湿度传感器检测温湿度,通过LCD1602显示 4*4按键矩阵可以设置温度湿度阈值,温度大于阈值则开启水泵,湿度大于阈值则开启风扇…...
乐橙云小程序插件接入HbuilderX
乐橙插件使用: 1.配置app.json文件,uniapp中在mainfest.json中配置 https://uniapp.dcloud.net.cn/collocation/manifest.html#mp-weixin ** 2、集成插件页面.json文件 ** uniapp在 pages.json 对应页面的 style -> usingComponents 引入组件&…...
VoCo-LLaMA: Towards Vision Compression with Large Language Models
视觉语言模型在各种多模态任务上取得了显著的成功,但经常受到有限上下文窗口和处理高分辨率图像输入和视频的高计算成本的瓶颈。视觉压缩可以通过减少视觉令牌数量避免该问题。先前方法使用额外模块压缩视觉令牌并强制LLM理解压缩的令牌。然而,LLM对视觉…...

Vue+vite 组件开发的环境准备
一.nodejs安装 进入Node.js 官网(Node.js — Run JavaScript Everywhere),点击下载。 双击打开,进行安装 双击打开后,点击 next(下一步),后面也是一直点击 next 无其他设置,直到 …...

基于社区发现的GraphRAG思路
GraphRAG出自2024年4月的论文《From Local to Global: A Graph RAG Approach to Query-Focused Summarization》,其代码也在2024年年中开源 。它在用图结构来完成RAG时,使用社区这个概念并基于社区摘要来回答一些概括性的问题。 Graph RAG流程如论文图1所…...
react学习记录
一、目录结构react优秀代码之react目录结构简洁之道React 作为一个库,不会决定你如何组织项目的结构。这是件好事,因为这样 - 掘金【React】项目的目录结构全面指南_react项目结构-CSDN博客 1、创建项目:开发文档 Getting Started | Create…...

Day2——需求分析与设计
教师端签到应用软件的需求分析; 产品经理如何写好产品需求文档(附模板) 需求分析是软件开发过程中的关键步骤,它确保了开发的软件能够满足用户的需求。以下是进行需求分析的具体步骤: 1. 确定分析目标 明确教师端签到…...

VScode离线下载扩展安装
在使用VScode下在扩展插件时,返现VScode搜索不到插件,网上搜了好多方法,都不是常规操作,解决起来十分麻烦,可以利用离线下载安装的方式安装插件!亲测有效!!! 1.找到VScod…...

【机器学习】机器学习的基本分类-监督学习-决策树(Decision Tree)
决策树是一种树形结构的机器学习模型,适用于分类和回归任务。它通过一系列基于特征的条件判断来将数据分割为多个子区域,从而预测目标变量的值。 1. 决策树的结构 根节点(Root Node) 决策树的起点,包含所有样本。根据某…...

【第 1 章 初识 C 语言】1.8 使用 C 语言的 7 个步骤
目录 1.8 使用 C 语言的 7 个步骤 1.8.1 第 1 步:定义程序的目标 1.8.2 第 2 步:设计程序 1.8.3 第 3 步:编写代码 1.8.4 第 4 步:编译 1.8.5 第 5 步:运行程序 1.8.6 第 6 步:测试和调试程序 1.8.…...
Docker 使用 Dockerfile 文件打包部署前端项目
编写 Dockerfile 文件: FROM nginx:latest ADD dist /etc/nginx/html/dist COPY nginx.conf /etc/nginx/nginx.conf ENV PATH /usr/sbin:$PATH EXPOSE 80 ENTRYPOINT ["nginx"] CMD ["-g","daemon off;"]编写 nginx.conf 文件&#…...
HTML-全
. CSS css后缀名的文件被html引用 在HTML中,CSS(层叠样式表,Cascading Style Sheets)是一种用于设置网页上的文本内容、图片布局和版面设计等外观样式的样式表语言。简单来说,CSS定义了HTML元素如何显示在浏览器中。…...

高效流程图绘制:开发设计流程图利器
在选择画流程图的工具时,不同的项目和使用场景会决定最佳的工具。以下是几款常见的流程图工具,并结合具体项目使用场景提供建议: 1. Lucidchart 特点: 在线协作:支持多人实时协作,适合团队合作。模板丰富&…...
数据仓库的概念
先用大白话讲一下,数据仓库的主要目的就是存储和分析大量结构化数据的。 > 那么它的核心目的是:支持商业智能(BI)和决策支持系统,也就是说,它不仅仅是为了存储,更重要的是为了分析提供便利。…...

AI - 谈谈RAG中的查询分析(2)
AI - 谈谈RAG中的查询分析(2) 大家好,RAG中的查询分析是比较有趣的一个点,内容丰富,并不是一句话能聊的清楚的。今天接着上一篇,继续探讨RAG中的查询分析,并在功能层面和代码层面持续改进。 功…...

Java基础面试题,46道Java基础八股文(4.8万字,30+手绘图)
Java是一种广泛使用的编程语言,由Sun Microsystems(现为Oracle Corporation的一部分)在1995年首次发布。它是一种面向对象的语言,这意味着它支持通过类和对象的概念来构造程序。 Java设计有一个核心理念:“编写一次&am…...
taro小程序马甲包插件
插件名 maloulab/taro-plugins-socksuppet-ci maloulab/taro-plugins-socksuppet-ci安装 yarn add maloulab/taro-plugins-socksuppet-ci or npm i maloulab/taro-plugins-socksuppet-ci插件描述 taro官方是提供了小程序集成插件的tarojs/plugin-mini-ci ,且支持…...

【分组去重】.NET开源 ORM 框架 SqlSugar 系列
💥 .NET开源 ORM 框架 SqlSugar 系列 🎉🎉🎉 【开篇】.NET开源 ORM 框架 SqlSugar 系列【入门必看】.NET开源 ORM 框架 SqlSugar 系列【实体配置】.NET开源 ORM 框架 SqlSugar 系列【Db First】.NET开源 ORM 框架 SqlSugar 系列…...

2020年
C D A C B B A B C B A 42...

(十)学生端搭建
本次旨在将之前的已完成的部分功能进行拼装到学生端,同时完善学生端的构建。本次工作主要包括: 1.学生端整体界面布局 2.模拟考场与部分个人画像流程的串联 3.整体学生端逻辑 一、学生端 在主界面可以选择自己的用户角色 选择学生则进入学生登录界面…...
在Ubuntu中设置开机自动运行(sudo)指令的指南
在Ubuntu系统中,有时需要在系统启动时自动执行某些命令,特别是需要 sudo权限的指令。为了实现这一功能,可以使用多种方法,包括编写Systemd服务、配置 rc.local文件或使用 cron任务计划。本文将详细介绍这些方法,并提供…...
3403. 从盒子中找出字典序最大的字符串 I
3403. 从盒子中找出字典序最大的字符串 I 题目链接:3403. 从盒子中找出字典序最大的字符串 I 代码如下: class Solution { public:string answerString(string word, int numFriends) {if (numFriends 1) {return word;}string res;for (int i 0;i &…...

什么是Ansible Jinja2
理解 Ansible Jinja2 模板 Ansible 是一款功能强大的开源自动化工具,可让您无缝地管理和配置系统。Ansible 的一大亮点是它使用 Jinja2 模板,允许您根据变量数据动态生成文件、配置设置和脚本。本文将向您介绍 Ansible 中的 Jinja2 模板,并通…...

面向无人机海岸带生态系统监测的语义分割基准数据集
描述:海岸带生态系统的监测是维护生态平衡和可持续发展的重要任务。语义分割技术在遥感影像中的应用为海岸带生态系统的精准监测提供了有效手段。然而,目前该领域仍面临一个挑战,即缺乏公开的专门面向海岸带生态系统的语义分割基准数据集。受…...

【网络安全】开源系统getshell漏洞挖掘
审计过程: 在入口文件admin/index.php中: 用户可以通过m,c,a等参数控制加载的文件和方法,在app/system/entrance.php中存在重点代码: 当M_TYPE system并且M_MODULE include时,会设置常量PATH_OWN_FILE为PATH_APP.M_T…...
WebRTC从入门到实践 - 零基础教程
WebRTC从入门到实践 - 零基础教程 目录 WebRTC简介 基础概念 工作原理 开发环境搭建 基础实践 三个实战案例 常见问题解答 1. WebRTC简介 1.1 什么是WebRTC? WebRTC(Web Real-Time Communication)是一个支持网页浏览器进行实时语音…...
HybridVLA——让单一LLM同时具备扩散和自回归动作预测能力:训练时既扩散也回归,但推理时则扩散
前言 如上一篇文章《dexcap升级版之DexWild》中的前言部分所说,在叠衣服的过程中,我会带着团队对比各种模型、方法、策略,毕竟针对各个场景始终寻找更优的解决方案,是我个人和我司「七月在线」的职责之一 且个人认为,…...

Xela矩阵三轴触觉传感器的工作原理解析与应用场景
Xela矩阵三轴触觉传感器通过先进技术模拟人类触觉感知,帮助设备实现精确的力测量与位移监测。其核心功能基于磁性三维力测量与空间位移测量,能够捕捉多维触觉信息。该传感器的设计不仅提升了触觉感知的精度,还为机器人、医疗设备和制造业的智…...

前端开发者常用网站
Can I use网站:一个查询网页技术兼容性的网站 一个查询网页技术兼容性的网站Can I use:Can I use... Support tables for HTML5, CSS3, etc (查询浏览器对HTML5的支持情况) 权威网站:MDN JavaScript权威网站:JavaScript | MDN...