如何通过 Apache Airflow 将数据导入 Elasticsearch
作者:来自 Elastic Andre Luiz

了解如何通过 Apache Airflow 将数据导入 Elasticsearch。
Apache Airflow
Apache Airflow 是一个旨在创建、安排(schedule)和监控工作流的平台。它用于编排 ETL(Extract-Transform-Load) 流程、数据管道和其他复杂工作流,提供灵活性和可扩展性。它的可视化界面和实时监控功能使管道管理更易于访问和高效,让你可以跟踪执行的进度和结果。以下是它的四个主要支柱:
- 动态:管道以 Python 定义,允许动态灵活地生成工作流。
- 可扩展:Airflow 可以与各种环境集成,可以创建自定义运算符,并可以根据需要执行特定代码。
- 优雅:管道以干净明确的方式编写。
- 可扩展:其模块化架构使用消息队列来编排任意数量的工作器。
在实践中,Airflow 可用于以下场景:
- 数据导入:编排将数据每日提取到 Elasticsearch 等数据库中。
- 日志监控:管理日志文件的收集和处理,然后在 Elasticsearch 中进行分析以识别错误或异常。
- 多种数据源集成:将来自不同系统(API、数据库、文件)的信息合并到 Elasticsearch 中的单个层中,简化搜索和报告。
DAG:Directed Acyclic Graphs - 有向无环图
在 Airflow 中,工作流由 DAG(有向无环图)表示。DAG 是一种定义任务执行顺序的结构。DAG 的主要特征是:
- 由独立任务组成:每个任务代表一个工作单元,旨在独立执行。
- 排序:任务的执行顺序在 DAG 中明确定义。
- 可重用性:DAG 旨在重复执行,促进流程自动化。
Airflow 的主要组件
Airflow 生态系统由多个组件组成,它们共同协作以协调任务:

- 调度程序 - scheduler:负责调度 DAG 并发送任务以供工作人员执行。
- 执行器 - Exectutor:管理任务的执行,将其委托给工作人员。
- Web 服务器 - Webserver:提供与 DAG 和任务交互的图形界面。
- Dags 文件夹 - Dags folder:我们存储用 Python 编写的 DAG 的文件夹。
- 元数据 - Metadata:作为工具存储库的数据库,由调度程序和执行器用于存储执行状态。
Apache Airflow 和 Elasticsearch
我们将演示如何使用 Apache Airflow 和 Elasticsearch 来协调任务并在 Elasticsearch 中索引结果。此演示的目标是创建一个任务管道来更新 Elasticsearch 索引中的记录。此索引包含电影数据库,用户可以在其中进行评分和分配评级。想象一个每天有数百个评级的场景,有必要保持评级记录更新。为此,将开发一个 DAG,它将每天执行,负责检索新的合并评级并更新索引中的记录。
在 DAG 流程中,我们将有一个获取评级的任务,然后是一个验证结果的任务。如果数据不存在,DAG 将被定向到失败任务。否则,数据将在 Elasticsearch 中编入索引。目标是通过一种带有负责计算分数的机制的方法检索评级,以更新索引中电影的评级字段。
使用 Apache Airflow 和 Elasticsearch 以及 Docker
要创建容器化环境,我们将使用 Apache Airflow 和 Docker。按照 “在 Docker 中运行 Airflow” 指南中的说明实际设置 Airflow。
至于 Elasticsearch,我将使用 Elastic Cloud 上的集群,但如果你愿意,也可以使用 Docker 配置 Elasticsearch。已经创建了一个包含电影目录的索引,其中电影数据已编入索引。这些电影的 “rating” 字段将被更新。
创建 DAG
通过 Docker 安装后,将创建一个文件夹结构,其中包括 dags 文件夹,我们必须将 DAG 文件放在该文件夹中,以便 Airflow 识别它们。
在此之前,我们需要确保安装了必要的依赖项。以下是此项目的依赖项:
pip install apache-airflow apache-airflow-providers-elasticsearch
我们将创建文件 update_ratings_movies.py 并开始编写任务代码。
现在,让我们导入必要的库:
from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.providers.elasticsearch.hooks.elasticsearch import ElasticsearchPythonHook
我们将使用 ElasticsearchPythonHook,这是一个通过抽象连接和使用外部 API 来简化 Airflow 和 Elasticsearch 集群之间集成的组件。
接下来,我们定义 DAG,并指定其主要参数:
- dag_id:DAG 的名称。
- start_date:DAG 的启动时间。
- schedule:定义周期(在我们的例子中是每日)。
- doc_md:将导入并显示在 Airflow 界面中的文档。
定义任务
现在,让我们定义 DAG 的任务。第一个任务将负责检索电影评级数据。我们将使用 PythonOperator,并将 task_id 设置为“get_movie_ratings”。python_callable 参数将调用负责获取 ratings 的函数。
get_ratings_operator = PythonOperator(task_id='get_movie_ratings',python_callable=get_movie_ratings_task
)
接下来,我们需要验证结果是否有效。为此,我们将使用带有 BranchPythonOperator 的条件。task_id 将为 “validate_result”,python_callable 将调用验证函数。op_args 参数将用于将上一个任务 “get_movie_ratings” 的结果传递给验证函数。
validate_result = BranchPythonOperator(task_id='validate_result',python_callable=validate_result,op_args=["{{ task_instance.xcom_pull(task_ids='get_movie_ratings') }}"]
)
如果验证成功,我们将从 “get_movie_ratings” 任务中获取数据并将其索引到 Elasticsearch 中。为此,我们将创建一个新任务 “index_movie_ratings”,它将使用 PythonOperator。op_args 参数将 “get_movie_ratings” 任务的结果传递给索引函数。
index_ratings_operator = PythonOperator(task_id='index_movie_ratings',python_callable=index_movie_ratings_task,op_args=["{{ task_instance.xcom_pull(task_ids='get_movie_ratings') }}"]
)
如果验证表明失败,DAG 将继续执行失败通知任务。在此示例中,我们只是打印一条消息,但在实际场景中,我们可以配置警报来通知失败。
failed_get_rating_operator = PythonOperator(task_id='failed_get_rating_operator',python_callable=lambda: print('Ratings were False, skipping indexing.')
)
最后,我们定义任务依赖关系,确保它们以正确的顺序执行:
get_ratings_operator >> validate_result >> [index_ratings_operator, failed_get_rating_operator]
以下是我们 DAG 的完整代码:
"""
DAG update Rating Movies
"""
import ast
import randomfrom airflow import DAG
from datetime import datetimefrom airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.providers.elasticsearch.hooks.elasticsearch import ElasticsearchPythonHookdef index_movie_ratings_task(movies):es_hook = ElasticsearchPythonHook(hosts=None,es_conn_args={"cloud_id": "cloud_id""api_key": "api-key"})es_client = es_hook.get_connactions = []for movie in ast.literal_eval(movies):actions.append({"update": {"_id": movie["id"],"_index": "movies"}})actions.append({"doc": {"rating": movie["rating"]},"doc_as_upsert": True})result = es_client.bulk(operations=actions)print(f"Ingestion completed.")print(result)return Truedef get_movie_ratings_task():movies = [{"id": i, "rating": round(random.uniform(1, 10), 1)}for i in range(1, 100)]return moviesdef validate_result(result):if not result:return 'failed_get_rating_operator'else:return 'index_movie_ratings'with DAG(dag_id="update_ratings_movies_2024",start_date=datetime(2024, 12, 29),schedule="@daily",doc_md=__doc__,
):get_ratings_operator = PythonOperator(task_id='get_movie_ratings',python_callable=get_movie_ratings_task)validate_result = BranchPythonOperator(task_id='validate_result',python_callable=validate_result,op_args=["{{ task_instance.xcom_pull(task_ids='get_movie_ratings') }}"],provide_context=True)index_ratings_operator = PythonOperator(task_id='index_movie_ratings',python_callable=index_movie_ratings_task,op_args=["{{ task_instance.xcom_pull(task_ids='get_movie_ratings') }}"])failed_get_rating_operator = PythonOperator(task_id='failed_get_rating_operator',python_callable=lambda: print('Ratings were False, skipping indexing.'))get_ratings_operator >> validate_result >> [index_ratings_operator, failed_get_rating_operator]
可视化 DAG 执行
在 Apache Airflow 界面中,我们可以可视化 DAG 的执行。只需转到 “DAG” 选项卡并找到你创建的 DAG 即可。

下面,我们可以直观地看到任务的执行情况及其各自的状态。通过选择特定日期的执行,我们可以访问每个任务的日志。请注意,在 index_movie_ratings 任务中,我们可以在索引中看到索引结果,并且它已成功完成。

在其他选项卡中,可以访问有关任务和 DAG 的其他信息,以协助分析和解决潜在问题。
结论
在本文中,我们演示了如何将 Apache Airflow 与 Elasticsearch 集成以创建数据提取解决方案。我们展示了如何配置 DAG、定义负责检索、验证和索引电影数据的任务,以及如何在 Airflow 界面中监控和可视化这些任务的执行。
这种方法可以轻松适应不同类型的数据和工作流,使 Airflow 成为在各种场景中编排数据管道的有用工具。
参考资料:
Apache AirFlow
- https://airflow.apache.org/
使用 Docker 安装 Apache Airflow
- https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html
Elasticsearch Python Hook
- https://airflow.apache.org/docs/apache-airflow-providers-elasticsearch/stable/hooks/elasticsearch_python_hook.html
Python 运算符
- https://airflow.apache.org/docs/apache-airflow/stable/howto/operator/python.html
想要获得 Elastic 认证?了解下一期 Elasticsearch 工程师培训何时开始!
Elasticsearch 包含许多新功能,可帮助你为你的用例构建最佳搜索解决方案。深入了解我们的示例笔记本以了解更多信息,开始免费云试用,或立即在吗的本地机器上试用 Elastic。
原文:How to ingest data to Elasticsearch through Apache Airflow - Elasticsearch Labs
相关文章:
如何通过 Apache Airflow 将数据导入 Elasticsearch
作者:来自 Elastic Andre Luiz 了解如何通过 Apache Airflow 将数据导入 Elasticsearch。 Apache Airflow Apache Airflow 是一个旨在创建、安排(schedule)和监控工作流的平台。它用于编排 ETL(Extract-Transform-Load࿰…...
Android Studio:Linux环境下安装与配置
更多内容:XiaoJ的知识星球 Android Studio:Linux环境下安装与配置 1.安装JDK2.安装Android Studio2.1 获取安装包2.2 安装(1)配置环境变量:(2)运行安装:(3)配…...
token是用来鉴权的,那session是用来干什么的?
在Web应用和API设计中,鉴权与会话管理是两个核心概念,它们对于确保用户身份的安全性和维护用户会话状态至关重要。Token和Session是两种常用的鉴权与会话管理机制,它们各自具有独特的工作原理和适用场景。下面是对Token和Session的详细解析及…...
基于 WEB 开发的二手车辆销售管理系统设计与实现
标题:基于 WEB 开发的二手车辆销售管理系统设计与实现 内容:1.摘要 摘要:随着互联网技术的不断发展,电子商务在各个领域得到了广泛的应用。本文以二手车辆销售管理系统为例,探讨了基于 WEB 开发的销售管理系统的设计与实现。通过对系统需求的…...
wordpress的火车头商品发布接口
<?php require ../wp-load.php; ini_set(memory_limit, 1024M); set_time_limit(180);$top_cat ; # 图片链接域名替换 $image_host ;$start_time microtime(true);$counter 0; // 临时缓存 $products $skus $categories []; $var_sku_index 1;$rowData$_POST;// if…...
浙江安吉成新照明电器:Acrel-1000DP 分布式光伏监控系统应用探索
安科瑞吕梦怡 18706162527 摘 要:分布式光伏发电站是指将光伏发电组件安装在用户的建筑物屋顶、空地或其他适合的场地上,利用太阳能进行发电的一种可再生能源利用方式,与传统的大型集中式光伏电站相比,分布式光伏发电具有更灵活…...
总结3..
#include<stdio.h> int n,m; int a[1002][1002]; int b[1002][1002];//判断该空的八连通图是否被走过 int gg0; int dd0; int xz[8]{-1,-1,-1,0,0,1,1,1},yz[8]{-1,0,1,-1,1,-1,0,1};//八个方向 void dfs(int x,int y) { int dx,dy; for(int i0;i<8;i) { …...
信息奥赛一本通 1168:大整数加法
这道题是一道大整数加法,涉及到高精度的算法,比如说有两个数要进行相加,1111111111111111111111111111111111111112222222222222222222222222222222,那么如果这两个数很大的话我们常用的数据类型是不能进行计算的,那么…...
3.3 OpenAI GPT-4, GPT-3.5, GPT-3 模型调用:开发者指南
OpenAI GPT-4, GPT-3.5, GPT-3 模型调用:开发者指南 OpenAI 的 GPT 系列语言模型,包括 GPT-4、GPT-3.5 和 GPT-3,已经成为自然语言处理领域的标杆。无论是文本生成、对话系统,还是自动化任务,开发者都可以通过 API 调用这些强大的模型来增强他们的应用。本文将为您详细介…...
横盘出击的三种经典走势形态,买点以及需要注意的问题技术详解
龙头股在横盘整理过程中,也会出现几种不同的形态,比如矩形整理形态,或者在某一趋势线下方运行。 第一种形态:突破横盘趋势线 突破横盘趋势线时识别横盘龙头启动的关键点位。股价经过一段时间的横盘后,突然出现快速上…...
处理没有提示的字符串、计算相隔天数应用题
正常情况下,小云每天跑 1 千米。如果某天是周一或者月初(1 日),为了激励自己,小云要跑 2 千米。如果同时是周一或月初,小云也是跑 2 千米。 小云跑步已经坚持了很长时间,从 1990 年 1 月 1 日周…...
【LeetCode】力扣刷题热题100道(31-35题)附源码 搜索二维矩阵 岛屿数量 腐烂的橙子 课程表 实现 Trie (前缀树)(C++)
一、搜索二维矩阵 编写一个高效的算法来搜索 m x n 矩阵 matrix 中的一个目标值 target 。该矩阵具有以下特性: 每行的元素从左到右升序排列。每列的元素从上到下升序排列。 可以使用 从右上角开始搜索 的方法来有效地找到目标值。 选择起始位置: 从矩…...
react使用react-redux状态管理
1、安装 npm install react-redux2、创建store.js import { createStore } from redux;// 定义初始状态 const initialState {counter: 888 };// 定义 reducer 函数,根据 action 类型更新状态 function reducer(state initialState, action) {switch (action.ty…...
04_角色创建窗口
将上文的登录窗口隐藏 创建空节点 作为创建角色窗口 命名为CreateWnd 创建输入的名字的输入框 再创建一个按钮用来随机角色名字 创建开始游戏按钮 End....
Dockerfile -> Docker image -> Docker container
1. Dockfile -> Docker image docker build -t shuai_image -f xxx/xxx/Dockerfile . (.不能少)出现: [] Building xxx(10/17) > [internal] load build definition from Dockerfile > > transferring dockerfile: … > > transferring context …...
LDN的蓝牙双模键盘帮助文档
文档索引 已支持的PCB列表(仅列出少部分):键盘特性硬件软件键盘以及驱动蓝牙模式USB模式 驱动功能介绍主界面键盘列表页面键盘配置(使用双模键盘的请务必细看本说明)功能层配置(改键)触发层配置(改FN键等触发功能)功能选择(重要&a…...
搭建一个基于Spring Boot的驾校管理系统
搭建一个基于Spring Boot的驾校管理系统可以涵盖多个功能模块,例如学员管理、教练管理、课程管理、考试管理、车辆管理等。以下是一个简化的步骤指南,帮助你快速搭建一个基础的系统。 1. 项目初始化 使用 Spring Initializr 生成一个Spring Boot项目&am…...
运动相机拍视频过程中摔了,导致录视频打不开怎么办
3-11 在使用运动相机拍摄激烈运动的时候,极大的震动会有一定概率使得保存在存储卡中的视频出现打不开的情况,原因是存储卡和相机在极端情况下,可能会出现接触不良的问题,如果遇到这种问题,就不得不进行视频修复了。 本…...
MongoDB vs Redis:相似与区别
前言 在当今的数据库领域,MongoDB 和 Redis 都是备受关注的非关系型数据库(NoSQL),它们各自具有独特的优势和适用场景。本文将深入探讨 MongoDB 和 Redis 的特点,并详细对比它们之间的相似之处和区别,帮助…...
数字图像处理:实验二
任务一: 将不同像素(32、64和256)的原图像放大为像素大 小为1024*1024的图像(图像自选) 要求:1)输出一幅图,该图包含六幅子图,第一排是原图,第 二排是对应放大…...
Docker 离线安装指南
参考文章 1、确认操作系统类型及内核版本 Docker依赖于Linux内核的一些特性,不同版本的Docker对内核版本有不同要求。例如,Docker 17.06及之后的版本通常需要Linux内核3.10及以上版本,Docker17.09及更高版本对应Linux内核4.9.x及更高版本。…...
React 第五十五节 Router 中 useAsyncError的使用详解
前言 useAsyncError 是 React Router v6.4 引入的一个钩子,用于处理异步操作(如数据加载)中的错误。下面我将详细解释其用途并提供代码示例。 一、useAsyncError 用途 处理异步错误:捕获在 loader 或 action 中发生的异步错误替…...
日语AI面试高效通关秘籍:专业解读与青柚面试智能助攻
在如今就业市场竞争日益激烈的背景下,越来越多的求职者将目光投向了日本及中日双语岗位。但是,一场日语面试往往让许多人感到步履维艰。你是否也曾因为面试官抛出的“刁钻问题”而心生畏惧?面对生疏的日语交流环境,即便提前恶补了…...
Java 语言特性(面试系列1)
一、面向对象编程 1. 封装(Encapsulation) 定义:将数据(属性)和操作数据的方法绑定在一起,通过访问控制符(private、protected、public)隐藏内部实现细节。示例: public …...
微软PowerBI考试 PL300-选择 Power BI 模型框架【附练习数据】
微软PowerBI考试 PL300-选择 Power BI 模型框架 20 多年来,Microsoft 持续对企业商业智能 (BI) 进行大量投资。 Azure Analysis Services (AAS) 和 SQL Server Analysis Services (SSAS) 基于无数企业使用的成熟的 BI 数据建模技术。 同样的技术也是 Power BI 数据…...
python/java环境配置
环境变量放一起 python: 1.首先下载Python Python下载地址:Download Python | Python.org downloads ---windows -- 64 2.安装Python 下面两个,然后自定义,全选 可以把前4个选上 3.环境配置 1)搜高级系统设置 2…...
Java - Mysql数据类型对应
Mysql数据类型java数据类型备注整型INT/INTEGERint / java.lang.Integer–BIGINTlong/java.lang.Long–––浮点型FLOATfloat/java.lang.FloatDOUBLEdouble/java.lang.Double–DECIMAL/NUMERICjava.math.BigDecimal字符串型CHARjava.lang.String固定长度字符串VARCHARjava.lang…...
如何在看板中有效管理突发紧急任务
在看板中有效管理突发紧急任务需要:设立专门的紧急任务通道、重新调整任务优先级、保持适度的WIP(Work-in-Progress)弹性、优化任务处理流程、提高团队应对突发情况的敏捷性。其中,设立专门的紧急任务通道尤为重要,这能…...
新能源汽车智慧充电桩管理方案:新能源充电桩散热问题及消防安全监管方案
随着新能源汽车的快速普及,充电桩作为核心配套设施,其安全性与可靠性备受关注。然而,在高温、高负荷运行环境下,充电桩的散热问题与消防安全隐患日益凸显,成为制约行业发展的关键瓶颈。 如何通过智慧化管理手段优化散…...
Python爬虫(一):爬虫伪装
一、网站防爬机制概述 在当今互联网环境中,具有一定规模或盈利性质的网站几乎都实施了各种防爬措施。这些措施主要分为两大类: 身份验证机制:直接将未经授权的爬虫阻挡在外反爬技术体系:通过各种技术手段增加爬虫获取数据的难度…...
