当前位置: 首页 > news >正文

如何通过 Apache Airflow 将数据导入 Elasticsearch

作者:来自 Elastic Andre Luiz

了解如何通过 Apache Airflow 将数据导入 Elasticsearch。

Apache Airflow

Apache Airflow 是一个旨在创建、安排(schedule)和监控工作流的平台。它用于编排 ETL(Extract-Transform-Load) 流程、数据管道和其他复杂工作流,提供灵活性和可扩展性。它的可视化界面和实时监控功能使管道管理更易于访问和高效,让你可以跟踪执行的进度和结果。以下是它的四个主要支柱:

  • 动态:管道以 Python 定义,允许动态灵活地生成工作流。
  • 可扩展:Airflow 可以与各种环境集成,可以创建自定义运算符,并可以根据需要执行特定代码。
  • 优雅:管道以干净明确的方式编写。
  • 可扩展:其模块化架构使用消息队列来编排任意数量的工作器。

在实践中,Airflow 可用于以下场景:

  • 数据导入:编排将数据每日提取到 Elasticsearch 等数据库中。
  • 日志监控:管理日志文件的收集和处理,然后在 Elasticsearch 中进行分析以识别错误或异常。
  • 多种数据源集成:将来自不同系统(API、数据库、文件)的信息合并到 Elasticsearch 中的单个层中,简化搜索和报告。

DAG:Directed Acyclic Graphs - 有向无环图

在 Airflow 中,工作流由 DAG(有向无环图)表示。DAG 是一种定义任务执行顺序的结构。DAG 的主要特征是:

  • 由独立任务组成:每个任务代表一个工作单元,旨在独立执行。
  • 排序:任务的执行顺序在 DAG 中明确定义。
  • 可重用性:DAG 旨在重复执行,促进流程自动化。

Airflow 的主要组件

Airflow 生态系统由多个组件组成,它们共同协作以协调任务:

  • 调度程序 - scheduler:负责调度 DAG 并发送任务以供工作人员执行。
  • 执行器 - Exectutor:管理任务的执行,将其委托给工作人员。
  • Web 服务器 - Webserver:提供与 DAG 和任务交互的图形界面。
  • Dags 文件夹 - Dags folder:我们存储用 Python 编写的 DAG 的文件夹。
  • 元数据 - Metadata:作为工具存储库的数据库,由调度程序和执行器用于存储执行状态。

Apache Airflow 和 Elasticsearch

我们将演示如何使用 Apache Airflow 和 Elasticsearch 来协调任务并在 Elasticsearch 中索引结果。此演示的目标是创建一个任务管道来更新 Elasticsearch 索引中的记录。此索引包含电影数据库,用户可以在其中进行评分和分配评级。想象一个每天有数百个评级的场景,有必要保持评级记录更新。为此,将开发一个 DAG,它将每天执行,负责检索新的合并评级并更新索引中的记录。

在 DAG 流程中,我们将有一个获取评级的任务,然后是一个验证结果的任务。如果数据不存在,DAG 将被定向到失败任务。否则,数据将在 Elasticsearch 中编入索引。目标是通过一种带有负责计算分数的机制的方法检索评级,以更新索引中电影的评级字段。

使用 Apache Airflow 和 Elasticsearch 以及 Docker

要创建容器化环境,我们将使用 Apache Airflow 和 Docker。按照 “在 Docker 中运行 Airflow” 指南中的说明实际设置 Airflow。

至于 Elasticsearch,我将使用 Elastic Cloud 上的集群,但如果你愿意,也可以使用 Docker 配置 Elasticsearch。已经创建了一个包含电影目录的索引,其中电影数据已编入索引。这些电影的 “rating” 字段将被更新。

创建 DAG

通过 Docker 安装后,将创建一个文件夹结构,其中包括 dags 文件夹,我们必须将 DAG 文件放在该文件夹中,以便 Airflow 识别它们。

在此之前,我们需要确保安装了必要的依赖项。以下是此项目的依赖项:

pip install apache-airflow apache-airflow-providers-elasticsearch

我们将创建文件 update_ratings_movies.py 并开始编写任务代码。

现在,让我们导入必要的库:

from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.providers.elasticsearch.hooks.elasticsearch import ElasticsearchPythonHook

我们将使用 ElasticsearchPythonHook,这是一个通过抽象连接和使用外部 API 来简化 Airflow 和 Elasticsearch 集群之间集成的组件。

接下来,我们定义 DAG,并指定其主要参数:

  • dag_id:DAG 的名称。
  • start_date:DAG 的启动时间。
  • schedule:定义周期(在我们的例子中是每日)。
  • doc_md:将导入并显示在 Airflow 界面中的文档。

定义任务

现在,让我们定义 DAG 的任务。第一个任务将负责检索电影评级数据。我们将使用 PythonOperator,并将 task_id 设置为“get_movie_ratings”。python_callable 参数将调用负责获取 ratings 的函数。

get_ratings_operator = PythonOperator(task_id='get_movie_ratings',python_callable=get_movie_ratings_task
)

接下来,我们需要验证结果是否有效。为此,我们将使用带有 BranchPythonOperator 的条件。task_id 将为 “validate_result”,python_callable 将调用验证函数。op_args 参数将用于将上一个任务 “get_movie_ratings” 的结果传递给验证函数。

validate_result = BranchPythonOperator(task_id='validate_result',python_callable=validate_result,op_args=["{{ task_instance.xcom_pull(task_ids='get_movie_ratings') }}"]
)

如果验证成功,我们将从 “get_movie_ratings” 任务中获取数据并将其索引到 Elasticsearch 中。为此,我们将创建一个新任务 “index_movie_ratings”,它将使用 PythonOperator。op_args 参数将 “get_movie_ratings” 任务的结果传递给索引函数。

index_ratings_operator = PythonOperator(task_id='index_movie_ratings',python_callable=index_movie_ratings_task,op_args=["{{ task_instance.xcom_pull(task_ids='get_movie_ratings') }}"]
)

如果验证表明失败,DAG 将继续执行失败通知任务。在此示例中,我们只是打印一条消息,但在实际场景中,我们可以配置警报来通知失败。

failed_get_rating_operator = PythonOperator(task_id='failed_get_rating_operator',python_callable=lambda: print('Ratings were False, skipping indexing.')
)

最后,我们定义任务依赖关系,确保它们以正确的顺序执行:

get_ratings_operator >> validate_result >> [index_ratings_operator, failed_get_rating_operator]

以下是我们 DAG 的完整代码:

"""
DAG update Rating Movies
"""
import ast
import randomfrom airflow import DAG
from datetime import datetimefrom airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.providers.elasticsearch.hooks.elasticsearch import ElasticsearchPythonHookdef index_movie_ratings_task(movies):es_hook = ElasticsearchPythonHook(hosts=None,es_conn_args={"cloud_id": "cloud_id""api_key": "api-key"})es_client = es_hook.get_connactions = []for movie in ast.literal_eval(movies):actions.append({"update": {"_id": movie["id"],"_index": "movies"}})actions.append({"doc": {"rating": movie["rating"]},"doc_as_upsert": True})result = es_client.bulk(operations=actions)print(f"Ingestion completed.")print(result)return Truedef get_movie_ratings_task():movies = [{"id": i, "rating": round(random.uniform(1, 10), 1)}for i in range(1, 100)]return moviesdef validate_result(result):if not result:return 'failed_get_rating_operator'else:return 'index_movie_ratings'with DAG(dag_id="update_ratings_movies_2024",start_date=datetime(2024, 12, 29),schedule="@daily",doc_md=__doc__,
):get_ratings_operator = PythonOperator(task_id='get_movie_ratings',python_callable=get_movie_ratings_task)validate_result = BranchPythonOperator(task_id='validate_result',python_callable=validate_result,op_args=["{{ task_instance.xcom_pull(task_ids='get_movie_ratings') }}"],provide_context=True)index_ratings_operator = PythonOperator(task_id='index_movie_ratings',python_callable=index_movie_ratings_task,op_args=["{{ task_instance.xcom_pull(task_ids='get_movie_ratings') }}"])failed_get_rating_operator = PythonOperator(task_id='failed_get_rating_operator',python_callable=lambda: print('Ratings were False, skipping indexing.'))get_ratings_operator >> validate_result >> [index_ratings_operator, failed_get_rating_operator]

可视化 DAG 执行

在 Apache Airflow 界面中,我们可以可视化 DAG 的执行。只需转到 “DAG” 选项卡并找到你创建的 DAG 即可。

下面,我们可以直观地看到任务的执行情况及其各自的状态。通过选择特定日期的执行,我们可以访问每个任务的日志。请注意,在 index_movie_ratings 任务中,我们可以在索引中看到索引结果,并且它已成功完成。

在其他选项卡中,可以访问有关任务和 DAG 的其他信息,以协助分析和解决潜在问题。

结论

在本文中,我们演示了如何将 Apache Airflow 与 Elasticsearch 集成以创建数据提取解决方案。我们展示了如何配置 DAG、定义负责检索、验证和索引电影数据的任务,以及如何在 Airflow 界面中监控和可视化这些任务的执行。

这种方法可以轻松适应不同类型的数据和工作流,使 Airflow 成为在各种场景中编排数据管道的有用工具。

参考资料:

Apache AirFlow

  • https://airflow.apache.org/

使用 Docker 安装 Apache Airflow

  • https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html

Elasticsearch Python Hook

  • https://airflow.apache.org/docs/apache-airflow-providers-elasticsearch/stable/hooks/elasticsearch_python_hook.html

Python 运算符

  • https://airflow.apache.org/docs/apache-airflow/stable/howto/operator/python.html

想要获得 Elastic 认证?了解下一期 Elasticsearch 工程师培训何时开始!

Elasticsearch 包含许多新功能,可帮助你为你的用例构建最佳搜索解决方案。深入了解我们的示例笔记本以了解更多信息,开始免费云试用,或立即在吗的本地机器上试用 Elastic。

原文:How to ingest data to Elasticsearch through Apache Airflow - Elasticsearch Labs

相关文章:

如何通过 Apache Airflow 将数据导入 Elasticsearch

作者:来自 Elastic Andre Luiz 了解如何通过 Apache Airflow 将数据导入 Elasticsearch。 Apache Airflow Apache Airflow 是一个旨在创建、安排(schedule)和监控工作流的平台。它用于编排 ETL(Extract-Transform-Load&#xff0…...

Android Studio:Linux环境下安装与配置

更多内容:XiaoJ的知识星球 Android Studio:Linux环境下安装与配置 1.安装JDK2.安装Android Studio2.1 获取安装包2.2 安装(1)配置环境变量:(2)运行安装:(3)配…...

token是用来鉴权的,那session是用来干什么的?

在Web应用和API设计中,鉴权与会话管理是两个核心概念,它们对于确保用户身份的安全性和维护用户会话状态至关重要。Token和Session是两种常用的鉴权与会话管理机制,它们各自具有独特的工作原理和适用场景。下面是对Token和Session的详细解析及…...

基于 WEB 开发的二手车辆销售管理系统设计与实现

标题:基于 WEB 开发的二手车辆销售管理系统设计与实现 内容:1.摘要 摘要:随着互联网技术的不断发展,电子商务在各个领域得到了广泛的应用。本文以二手车辆销售管理系统为例,探讨了基于 WEB 开发的销售管理系统的设计与实现。通过对系统需求的…...

wordpress的火车头商品发布接口

<?php require ../wp-load.php; ini_set(memory_limit, 1024M); set_time_limit(180);$top_cat ; # 图片链接域名替换 $image_host ;$start_time microtime(true);$counter 0; // 临时缓存 $products $skus $categories []; $var_sku_index 1;$rowData$_POST;// if…...

浙江安吉成新照明电器:Acrel-1000DP 分布式光伏监控系统应用探索

安科瑞吕梦怡 18706162527 摘 要&#xff1a;分布式光伏发电站是指将光伏发电组件安装在用户的建筑物屋顶、空地或其他适合的场地上&#xff0c;利用太阳能进行发电的一种可再生能源利用方式&#xff0c;与传统的大型集中式光伏电站相比&#xff0c;分布式光伏发电具有更灵活…...

总结3..

#include<stdio.h> int n,m; int a[1002][1002]; int b[1002][1002];//判断该空的八连通图是否被走过 int gg0; int dd0; int xz[8]{-1,-1,-1,0,0,1,1,1},yz[8]{-1,0,1,-1,1,-1,0,1};//八个方向 void dfs(int x,int y) { int dx,dy; for(int i0;i<8;i) { …...

信息奥赛一本通 1168:大整数加法

这道题是一道大整数加法&#xff0c;涉及到高精度的算法&#xff0c;比如说有两个数要进行相加&#xff0c;1111111111111111111111111111111111111112222222222222222222222222222222&#xff0c;那么如果这两个数很大的话我们常用的数据类型是不能进行计算的&#xff0c;那么…...

3.3 OpenAI GPT-4, GPT-3.5, GPT-3 模型调用:开发者指南

OpenAI GPT-4, GPT-3.5, GPT-3 模型调用:开发者指南 OpenAI 的 GPT 系列语言模型,包括 GPT-4、GPT-3.5 和 GPT-3,已经成为自然语言处理领域的标杆。无论是文本生成、对话系统,还是自动化任务,开发者都可以通过 API 调用这些强大的模型来增强他们的应用。本文将为您详细介…...

横盘出击的三种经典走势形态,买点以及需要注意的问题技术详解

龙头股在横盘整理过程中&#xff0c;也会出现几种不同的形态&#xff0c;比如矩形整理形态&#xff0c;或者在某一趋势线下方运行。 第一种形态&#xff1a;突破横盘趋势线 突破横盘趋势线时识别横盘龙头启动的关键点位。股价经过一段时间的横盘后&#xff0c;突然出现快速上…...

处理没有提示的字符串、计算相隔天数应用题

正常情况下&#xff0c;小云每天跑 1 千米。如果某天是周一或者月初&#xff08;1 日&#xff09;&#xff0c;为了激励自己&#xff0c;小云要跑 2 千米。如果同时是周一或月初&#xff0c;小云也是跑 2 千米。 小云跑步已经坚持了很长时间&#xff0c;从 1990 年 1 月 1 日周…...

【LeetCode】力扣刷题热题100道(31-35题)附源码 搜索二维矩阵 岛屿数量 腐烂的橙子 课程表 实现 Trie (前缀树)(C++)

一、搜索二维矩阵 编写一个高效的算法来搜索 m x n 矩阵 matrix 中的一个目标值 target 。该矩阵具有以下特性&#xff1a; 每行的元素从左到右升序排列。每列的元素从上到下升序排列。 可以使用 从右上角开始搜索 的方法来有效地找到目标值。 选择起始位置&#xff1a; 从矩…...

react使用react-redux状态管理

1、安装 npm install react-redux2、创建store.js import { createStore } from redux;// 定义初始状态 const initialState {counter: 888 };// 定义 reducer 函数&#xff0c;根据 action 类型更新状态 function reducer(state initialState, action) {switch (action.ty…...

04_角色创建窗口

将上文的登录窗口隐藏 创建空节点 作为创建角色窗口 命名为CreateWnd 创建输入的名字的输入框 再创建一个按钮用来随机角色名字 创建开始游戏按钮 End....

Dockerfile -> Docker image -> Docker container

1. Dockfile -> Docker image docker build -t shuai_image -f xxx/xxx/Dockerfile . (.不能少)出现&#xff1a; [] Building xxx(10/17) > [internal] load build definition from Dockerfile > > transferring dockerfile: … > > transferring context …...

LDN的蓝牙双模键盘帮助文档

文档索引 已支持的PCB列表(仅列出少部分)&#xff1a;键盘特性硬件软件键盘以及驱动蓝牙模式USB模式 驱动功能介绍主界面键盘列表页面键盘配置&#xff08;使用双模键盘的请务必细看本说明&#xff09;功能层配置(改键)触发层配置(改FN键等触发功能)功能选择&#xff08;重要&a…...

搭建一个基于Spring Boot的驾校管理系统

搭建一个基于Spring Boot的驾校管理系统可以涵盖多个功能模块&#xff0c;例如学员管理、教练管理、课程管理、考试管理、车辆管理等。以下是一个简化的步骤指南&#xff0c;帮助你快速搭建一个基础的系统。 1. 项目初始化 使用 Spring Initializr 生成一个Spring Boot项目&am…...

运动相机拍视频过程中摔了,导致录视频打不开怎么办

3-11 在使用运动相机拍摄激烈运动的时候&#xff0c;极大的震动会有一定概率使得保存在存储卡中的视频出现打不开的情况&#xff0c;原因是存储卡和相机在极端情况下&#xff0c;可能会出现接触不良的问题&#xff0c;如果遇到这种问题&#xff0c;就不得不进行视频修复了。 本…...

MongoDB vs Redis:相似与区别

前言 在当今的数据库领域&#xff0c;MongoDB 和 Redis 都是备受关注的非关系型数据库&#xff08;NoSQL&#xff09;&#xff0c;它们各自具有独特的优势和适用场景。本文将深入探讨 MongoDB 和 Redis 的特点&#xff0c;并详细对比它们之间的相似之处和区别&#xff0c;帮助…...

数字图像处理:实验二

任务一&#xff1a; 将不同像素&#xff08;32、64和256&#xff09;的原图像放大为像素大 小为1024*1024的图像&#xff08;图像自选&#xff09; 要求&#xff1a;1&#xff09;输出一幅图&#xff0c;该图包含六幅子图&#xff0c;第一排是原图&#xff0c;第 二排是对应放大…...

基于HTTP协议的PLC数据交互实战(涵盖欧姆龙、三菱、西门子等主流品牌)

1. 为什么需要HTTP协议与PLC交互&#xff1f; 在工业自动化领域&#xff0c;PLC&#xff08;可编程逻辑控制器&#xff09;就像工厂的"大脑"&#xff0c;负责控制各种设备的运行。但传统PLC数据交互方式存在明显痛点&#xff1a;比如欧姆龙用FINS协议、三菱用MC协议、…...

一文学习 工作流开发 BPMN、 Flowable俗

一、什么是requests&#xff1f; requests 是一个用于发送HTTP请求的 Python 库。 它可以帮助你&#xff1a; 轻松发送GET、POST、PUT、DELETE等请求 处理Cookie、会话等复杂性 自动解压缩内容 处理国际化域名和URL 二、应用场景 requests 广泛应用于以下实际场景&#xff1a; …...

GPU算力适配优化:Pixel Epic智识终端在A10/A100/V100上的部署差异

GPU算力适配优化&#xff1a;Pixel Epic智识终端在A10/A100/V100上的部署差异 1. 引言&#xff1a;当像素冒险遇上GPU算力 Pixel Epic智识终端作为一款融合游戏化体验与专业研究功能的创新工具&#xff0c;其核心的AgentCPM-Report大模型对GPU算力有着独特需求。不同型号的NV…...

【LLM工程化生死线】:A/B测试未通过=模型不可上线——某金融大模型因跳过这3步合规验证被监管叫停的完整复盘报告

第一章&#xff1a;大模型工程化中的A/B测试实践 2026奇点智能技术大会(https://ml-summit.org) 在大模型落地场景中&#xff0c;A/B测试不再仅是推荐系统或前端UI的验证手段&#xff0c;而是保障推理质量、响应延迟、成本效率与用户满意度协同演进的核心工程闭环。当多个LLM服…...

【计算机视觉入门精讲】第一站:图像处理与视觉基础

1. 图像的本质&#xff1a;从数学函数到像素矩阵 第一次接触计算机视觉时&#xff0c;最让我震撼的发现是&#xff1a;原来照片就是个数学函数。想象你面前有张黑白老照片&#xff0c;每个位置(x,y)的颜色深浅&#xff0c;其实就是一个函数值f(x,y)。这个函数把二维坐标映射到亮…...

丹青识画系统在卷积神经网络上的优化:提升图像特征提取效率

丹青识画系统在卷积神经网络上的优化&#xff1a;提升图像特征提取效率 最近在折腾一个图像识别项目&#xff0c;用到了丹青识画系统。说实话&#xff0c;刚开始部署完&#xff0c;跑起来的效果虽然不错&#xff0c;但那个推理速度实在是让人有点着急&#xff0c;处理一张高清…...

暗黑破坏神2存档编辑器的终极指南:打造你的完美角色

暗黑破坏神2存档编辑器的终极指南&#xff1a;打造你的完美角色 【免费下载链接】d2s-editor 项目地址: https://gitcode.com/gh_mirrors/d2/d2s-editor 你是否曾为暗黑破坏神2中某个角色的属性分配不当而后悔&#xff1f;是否想体验不同装备组合却不想花费数小时刷装备…...

实战分享:如何用YOLOv5+SpringBoot打造化工安全火苗检测系统(附完整代码)

工业级AI实战&#xff1a;YOLOv5与SpringBoot构建智能火情预警系统 化工行业对安全生产的严苛要求&#xff0c;使得传统人工监控方式面临巨大挑战。我们团队在多个工业场景中验证了一套基于YOLOv5与SpringBoot的智能火情检测方案&#xff0c;其核心在于将前沿目标检测技术与企业…...

类器官:十五五规划下的“人体替身“革命

3月10日&#xff0c;《中华人民共和国国民经济和社会发展第十五个五年规划纲要&#xff08;草案&#xff09;》正式对外公布并提请全国人大审议。值得关注的是&#xff0c;类器官与器官芯片技术首次被提升至国家战略高度——在"专栏8 前沿科技攻关"的"生命科学与…...

3大实用技巧彻底解放你的游戏时间:MAA明日方舟助手深度解析

3大实用技巧彻底解放你的游戏时间&#xff1a;MAA明日方舟助手深度解析 【免费下载链接】MaaAssistantArknights 《明日方舟》小助手&#xff0c;全日常一键长草&#xff01;| A one-click tool for the daily tasks of Arknights, supporting all clients. 项目地址: https:…...