当前位置: 首页 > news >正文

如何通过 Apache Airflow 将数据导入 Elasticsearch

作者:来自 Elastic Andre Luiz

了解如何通过 Apache Airflow 将数据导入 Elasticsearch。

Apache Airflow

Apache Airflow 是一个旨在创建、安排(schedule)和监控工作流的平台。它用于编排 ETL(Extract-Transform-Load) 流程、数据管道和其他复杂工作流,提供灵活性和可扩展性。它的可视化界面和实时监控功能使管道管理更易于访问和高效,让你可以跟踪执行的进度和结果。以下是它的四个主要支柱:

  • 动态:管道以 Python 定义,允许动态灵活地生成工作流。
  • 可扩展:Airflow 可以与各种环境集成,可以创建自定义运算符,并可以根据需要执行特定代码。
  • 优雅:管道以干净明确的方式编写。
  • 可扩展:其模块化架构使用消息队列来编排任意数量的工作器。

在实践中,Airflow 可用于以下场景:

  • 数据导入:编排将数据每日提取到 Elasticsearch 等数据库中。
  • 日志监控:管理日志文件的收集和处理,然后在 Elasticsearch 中进行分析以识别错误或异常。
  • 多种数据源集成:将来自不同系统(API、数据库、文件)的信息合并到 Elasticsearch 中的单个层中,简化搜索和报告。

DAG:Directed Acyclic Graphs - 有向无环图

在 Airflow 中,工作流由 DAG(有向无环图)表示。DAG 是一种定义任务执行顺序的结构。DAG 的主要特征是:

  • 由独立任务组成:每个任务代表一个工作单元,旨在独立执行。
  • 排序:任务的执行顺序在 DAG 中明确定义。
  • 可重用性:DAG 旨在重复执行,促进流程自动化。

Airflow 的主要组件

Airflow 生态系统由多个组件组成,它们共同协作以协调任务:

  • 调度程序 - scheduler:负责调度 DAG 并发送任务以供工作人员执行。
  • 执行器 - Exectutor:管理任务的执行,将其委托给工作人员。
  • Web 服务器 - Webserver:提供与 DAG 和任务交互的图形界面。
  • Dags 文件夹 - Dags folder:我们存储用 Python 编写的 DAG 的文件夹。
  • 元数据 - Metadata:作为工具存储库的数据库,由调度程序和执行器用于存储执行状态。

Apache Airflow 和 Elasticsearch

我们将演示如何使用 Apache Airflow 和 Elasticsearch 来协调任务并在 Elasticsearch 中索引结果。此演示的目标是创建一个任务管道来更新 Elasticsearch 索引中的记录。此索引包含电影数据库,用户可以在其中进行评分和分配评级。想象一个每天有数百个评级的场景,有必要保持评级记录更新。为此,将开发一个 DAG,它将每天执行,负责检索新的合并评级并更新索引中的记录。

在 DAG 流程中,我们将有一个获取评级的任务,然后是一个验证结果的任务。如果数据不存在,DAG 将被定向到失败任务。否则,数据将在 Elasticsearch 中编入索引。目标是通过一种带有负责计算分数的机制的方法检索评级,以更新索引中电影的评级字段。

使用 Apache Airflow 和 Elasticsearch 以及 Docker

要创建容器化环境,我们将使用 Apache Airflow 和 Docker。按照 “在 Docker 中运行 Airflow” 指南中的说明实际设置 Airflow。

至于 Elasticsearch,我将使用 Elastic Cloud 上的集群,但如果你愿意,也可以使用 Docker 配置 Elasticsearch。已经创建了一个包含电影目录的索引,其中电影数据已编入索引。这些电影的 “rating” 字段将被更新。

创建 DAG

通过 Docker 安装后,将创建一个文件夹结构,其中包括 dags 文件夹,我们必须将 DAG 文件放在该文件夹中,以便 Airflow 识别它们。

在此之前,我们需要确保安装了必要的依赖项。以下是此项目的依赖项:

pip install apache-airflow apache-airflow-providers-elasticsearch

我们将创建文件 update_ratings_movies.py 并开始编写任务代码。

现在,让我们导入必要的库:

from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.providers.elasticsearch.hooks.elasticsearch import ElasticsearchPythonHook

我们将使用 ElasticsearchPythonHook,这是一个通过抽象连接和使用外部 API 来简化 Airflow 和 Elasticsearch 集群之间集成的组件。

接下来,我们定义 DAG,并指定其主要参数:

  • dag_id:DAG 的名称。
  • start_date:DAG 的启动时间。
  • schedule:定义周期(在我们的例子中是每日)。
  • doc_md:将导入并显示在 Airflow 界面中的文档。

定义任务

现在,让我们定义 DAG 的任务。第一个任务将负责检索电影评级数据。我们将使用 PythonOperator,并将 task_id 设置为“get_movie_ratings”。python_callable 参数将调用负责获取 ratings 的函数。

get_ratings_operator = PythonOperator(task_id='get_movie_ratings',python_callable=get_movie_ratings_task
)

接下来,我们需要验证结果是否有效。为此,我们将使用带有 BranchPythonOperator 的条件。task_id 将为 “validate_result”,python_callable 将调用验证函数。op_args 参数将用于将上一个任务 “get_movie_ratings” 的结果传递给验证函数。

validate_result = BranchPythonOperator(task_id='validate_result',python_callable=validate_result,op_args=["{{ task_instance.xcom_pull(task_ids='get_movie_ratings') }}"]
)

如果验证成功,我们将从 “get_movie_ratings” 任务中获取数据并将其索引到 Elasticsearch 中。为此,我们将创建一个新任务 “index_movie_ratings”,它将使用 PythonOperator。op_args 参数将 “get_movie_ratings” 任务的结果传递给索引函数。

index_ratings_operator = PythonOperator(task_id='index_movie_ratings',python_callable=index_movie_ratings_task,op_args=["{{ task_instance.xcom_pull(task_ids='get_movie_ratings') }}"]
)

如果验证表明失败,DAG 将继续执行失败通知任务。在此示例中,我们只是打印一条消息,但在实际场景中,我们可以配置警报来通知失败。

failed_get_rating_operator = PythonOperator(task_id='failed_get_rating_operator',python_callable=lambda: print('Ratings were False, skipping indexing.')
)

最后,我们定义任务依赖关系,确保它们以正确的顺序执行:

get_ratings_operator >> validate_result >> [index_ratings_operator, failed_get_rating_operator]

以下是我们 DAG 的完整代码:

"""
DAG update Rating Movies
"""
import ast
import randomfrom airflow import DAG
from datetime import datetimefrom airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.providers.elasticsearch.hooks.elasticsearch import ElasticsearchPythonHookdef index_movie_ratings_task(movies):es_hook = ElasticsearchPythonHook(hosts=None,es_conn_args={"cloud_id": "cloud_id""api_key": "api-key"})es_client = es_hook.get_connactions = []for movie in ast.literal_eval(movies):actions.append({"update": {"_id": movie["id"],"_index": "movies"}})actions.append({"doc": {"rating": movie["rating"]},"doc_as_upsert": True})result = es_client.bulk(operations=actions)print(f"Ingestion completed.")print(result)return Truedef get_movie_ratings_task():movies = [{"id": i, "rating": round(random.uniform(1, 10), 1)}for i in range(1, 100)]return moviesdef validate_result(result):if not result:return 'failed_get_rating_operator'else:return 'index_movie_ratings'with DAG(dag_id="update_ratings_movies_2024",start_date=datetime(2024, 12, 29),schedule="@daily",doc_md=__doc__,
):get_ratings_operator = PythonOperator(task_id='get_movie_ratings',python_callable=get_movie_ratings_task)validate_result = BranchPythonOperator(task_id='validate_result',python_callable=validate_result,op_args=["{{ task_instance.xcom_pull(task_ids='get_movie_ratings') }}"],provide_context=True)index_ratings_operator = PythonOperator(task_id='index_movie_ratings',python_callable=index_movie_ratings_task,op_args=["{{ task_instance.xcom_pull(task_ids='get_movie_ratings') }}"])failed_get_rating_operator = PythonOperator(task_id='failed_get_rating_operator',python_callable=lambda: print('Ratings were False, skipping indexing.'))get_ratings_operator >> validate_result >> [index_ratings_operator, failed_get_rating_operator]

可视化 DAG 执行

在 Apache Airflow 界面中,我们可以可视化 DAG 的执行。只需转到 “DAG” 选项卡并找到你创建的 DAG 即可。

下面,我们可以直观地看到任务的执行情况及其各自的状态。通过选择特定日期的执行,我们可以访问每个任务的日志。请注意,在 index_movie_ratings 任务中,我们可以在索引中看到索引结果,并且它已成功完成。

在其他选项卡中,可以访问有关任务和 DAG 的其他信息,以协助分析和解决潜在问题。

结论

在本文中,我们演示了如何将 Apache Airflow 与 Elasticsearch 集成以创建数据提取解决方案。我们展示了如何配置 DAG、定义负责检索、验证和索引电影数据的任务,以及如何在 Airflow 界面中监控和可视化这些任务的执行。

这种方法可以轻松适应不同类型的数据和工作流,使 Airflow 成为在各种场景中编排数据管道的有用工具。

参考资料:

Apache AirFlow

  • https://airflow.apache.org/

使用 Docker 安装 Apache Airflow

  • https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html

Elasticsearch Python Hook

  • https://airflow.apache.org/docs/apache-airflow-providers-elasticsearch/stable/hooks/elasticsearch_python_hook.html

Python 运算符

  • https://airflow.apache.org/docs/apache-airflow/stable/howto/operator/python.html

想要获得 Elastic 认证?了解下一期 Elasticsearch 工程师培训何时开始!

Elasticsearch 包含许多新功能,可帮助你为你的用例构建最佳搜索解决方案。深入了解我们的示例笔记本以了解更多信息,开始免费云试用,或立即在吗的本地机器上试用 Elastic。

原文:How to ingest data to Elasticsearch through Apache Airflow - Elasticsearch Labs

相关文章:

如何通过 Apache Airflow 将数据导入 Elasticsearch

作者:来自 Elastic Andre Luiz 了解如何通过 Apache Airflow 将数据导入 Elasticsearch。 Apache Airflow Apache Airflow 是一个旨在创建、安排(schedule)和监控工作流的平台。它用于编排 ETL(Extract-Transform-Load&#xff0…...

Android Studio:Linux环境下安装与配置

更多内容:XiaoJ的知识星球 Android Studio:Linux环境下安装与配置 1.安装JDK2.安装Android Studio2.1 获取安装包2.2 安装(1)配置环境变量:(2)运行安装:(3)配…...

token是用来鉴权的,那session是用来干什么的?

在Web应用和API设计中,鉴权与会话管理是两个核心概念,它们对于确保用户身份的安全性和维护用户会话状态至关重要。Token和Session是两种常用的鉴权与会话管理机制,它们各自具有独特的工作原理和适用场景。下面是对Token和Session的详细解析及…...

基于 WEB 开发的二手车辆销售管理系统设计与实现

标题:基于 WEB 开发的二手车辆销售管理系统设计与实现 内容:1.摘要 摘要:随着互联网技术的不断发展,电子商务在各个领域得到了广泛的应用。本文以二手车辆销售管理系统为例,探讨了基于 WEB 开发的销售管理系统的设计与实现。通过对系统需求的…...

wordpress的火车头商品发布接口

<?php require ../wp-load.php; ini_set(memory_limit, 1024M); set_time_limit(180);$top_cat ; # 图片链接域名替换 $image_host ;$start_time microtime(true);$counter 0; // 临时缓存 $products $skus $categories []; $var_sku_index 1;$rowData$_POST;// if…...

浙江安吉成新照明电器:Acrel-1000DP 分布式光伏监控系统应用探索

安科瑞吕梦怡 18706162527 摘 要&#xff1a;分布式光伏发电站是指将光伏发电组件安装在用户的建筑物屋顶、空地或其他适合的场地上&#xff0c;利用太阳能进行发电的一种可再生能源利用方式&#xff0c;与传统的大型集中式光伏电站相比&#xff0c;分布式光伏发电具有更灵活…...

总结3..

#include<stdio.h> int n,m; int a[1002][1002]; int b[1002][1002];//判断该空的八连通图是否被走过 int gg0; int dd0; int xz[8]{-1,-1,-1,0,0,1,1,1},yz[8]{-1,0,1,-1,1,-1,0,1};//八个方向 void dfs(int x,int y) { int dx,dy; for(int i0;i<8;i) { …...

信息奥赛一本通 1168:大整数加法

这道题是一道大整数加法&#xff0c;涉及到高精度的算法&#xff0c;比如说有两个数要进行相加&#xff0c;1111111111111111111111111111111111111112222222222222222222222222222222&#xff0c;那么如果这两个数很大的话我们常用的数据类型是不能进行计算的&#xff0c;那么…...

3.3 OpenAI GPT-4, GPT-3.5, GPT-3 模型调用:开发者指南

OpenAI GPT-4, GPT-3.5, GPT-3 模型调用:开发者指南 OpenAI 的 GPT 系列语言模型,包括 GPT-4、GPT-3.5 和 GPT-3,已经成为自然语言处理领域的标杆。无论是文本生成、对话系统,还是自动化任务,开发者都可以通过 API 调用这些强大的模型来增强他们的应用。本文将为您详细介…...

横盘出击的三种经典走势形态,买点以及需要注意的问题技术详解

龙头股在横盘整理过程中&#xff0c;也会出现几种不同的形态&#xff0c;比如矩形整理形态&#xff0c;或者在某一趋势线下方运行。 第一种形态&#xff1a;突破横盘趋势线 突破横盘趋势线时识别横盘龙头启动的关键点位。股价经过一段时间的横盘后&#xff0c;突然出现快速上…...

处理没有提示的字符串、计算相隔天数应用题

正常情况下&#xff0c;小云每天跑 1 千米。如果某天是周一或者月初&#xff08;1 日&#xff09;&#xff0c;为了激励自己&#xff0c;小云要跑 2 千米。如果同时是周一或月初&#xff0c;小云也是跑 2 千米。 小云跑步已经坚持了很长时间&#xff0c;从 1990 年 1 月 1 日周…...

【LeetCode】力扣刷题热题100道(31-35题)附源码 搜索二维矩阵 岛屿数量 腐烂的橙子 课程表 实现 Trie (前缀树)(C++)

一、搜索二维矩阵 编写一个高效的算法来搜索 m x n 矩阵 matrix 中的一个目标值 target 。该矩阵具有以下特性&#xff1a; 每行的元素从左到右升序排列。每列的元素从上到下升序排列。 可以使用 从右上角开始搜索 的方法来有效地找到目标值。 选择起始位置&#xff1a; 从矩…...

react使用react-redux状态管理

1、安装 npm install react-redux2、创建store.js import { createStore } from redux;// 定义初始状态 const initialState {counter: 888 };// 定义 reducer 函数&#xff0c;根据 action 类型更新状态 function reducer(state initialState, action) {switch (action.ty…...

04_角色创建窗口

将上文的登录窗口隐藏 创建空节点 作为创建角色窗口 命名为CreateWnd 创建输入的名字的输入框 再创建一个按钮用来随机角色名字 创建开始游戏按钮 End....

Dockerfile -> Docker image -> Docker container

1. Dockfile -> Docker image docker build -t shuai_image -f xxx/xxx/Dockerfile . (.不能少)出现&#xff1a; [] Building xxx(10/17) > [internal] load build definition from Dockerfile > > transferring dockerfile: … > > transferring context …...

LDN的蓝牙双模键盘帮助文档

文档索引 已支持的PCB列表(仅列出少部分)&#xff1a;键盘特性硬件软件键盘以及驱动蓝牙模式USB模式 驱动功能介绍主界面键盘列表页面键盘配置&#xff08;使用双模键盘的请务必细看本说明&#xff09;功能层配置(改键)触发层配置(改FN键等触发功能)功能选择&#xff08;重要&a…...

搭建一个基于Spring Boot的驾校管理系统

搭建一个基于Spring Boot的驾校管理系统可以涵盖多个功能模块&#xff0c;例如学员管理、教练管理、课程管理、考试管理、车辆管理等。以下是一个简化的步骤指南&#xff0c;帮助你快速搭建一个基础的系统。 1. 项目初始化 使用 Spring Initializr 生成一个Spring Boot项目&am…...

运动相机拍视频过程中摔了,导致录视频打不开怎么办

3-11 在使用运动相机拍摄激烈运动的时候&#xff0c;极大的震动会有一定概率使得保存在存储卡中的视频出现打不开的情况&#xff0c;原因是存储卡和相机在极端情况下&#xff0c;可能会出现接触不良的问题&#xff0c;如果遇到这种问题&#xff0c;就不得不进行视频修复了。 本…...

MongoDB vs Redis:相似与区别

前言 在当今的数据库领域&#xff0c;MongoDB 和 Redis 都是备受关注的非关系型数据库&#xff08;NoSQL&#xff09;&#xff0c;它们各自具有独特的优势和适用场景。本文将深入探讨 MongoDB 和 Redis 的特点&#xff0c;并详细对比它们之间的相似之处和区别&#xff0c;帮助…...

数字图像处理:实验二

任务一&#xff1a; 将不同像素&#xff08;32、64和256&#xff09;的原图像放大为像素大 小为1024*1024的图像&#xff08;图像自选&#xff09; 要求&#xff1a;1&#xff09;输出一幅图&#xff0c;该图包含六幅子图&#xff0c;第一排是原图&#xff0c;第 二排是对应放大…...

微信小程序云开发平台MySQL的连接方式

注&#xff1a;微信小程序云开发平台指的是腾讯云开发 先给结论&#xff1a;微信小程序云开发平台的MySQL&#xff0c;无法通过获取数据库连接信息的方式进行连接&#xff0c;连接只能通过云开发的SDK连接&#xff0c;具体要参考官方文档&#xff1a; 为什么&#xff1f; 因为…...

k8s业务程序联调工具-KtConnect

概述 原理 工具作用是建立了一个从本地到集群的单向VPN&#xff0c;根据VPN原理&#xff0c;打通两个内网必然需要借助一个公共中继节点&#xff0c;ktconnect工具巧妙的利用k8s原生的portforward能力&#xff0c;简化了建立连接的过程&#xff0c;apiserver间接起到了中继节…...

深入解析C++中的extern关键字:跨文件共享变量与函数的终极指南

&#x1f680; C extern 关键字深度解析&#xff1a;跨文件编程的终极指南 &#x1f4c5; 更新时间&#xff1a;2025年6月5日 &#x1f3f7;️ 标签&#xff1a;C | extern关键字 | 多文件编程 | 链接与声明 | 现代C 文章目录 前言&#x1f525;一、extern 是什么&#xff1f;&…...

Map相关知识

数据结构 二叉树 二叉树&#xff0c;顾名思义&#xff0c;每个节点最多有两个“叉”&#xff0c;也就是两个子节点&#xff0c;分别是左子 节点和右子节点。不过&#xff0c;二叉树并不要求每个节点都有两个子节点&#xff0c;有的节点只 有左子节点&#xff0c;有的节点只有…...

深入浅出深度学习基础:从感知机到全连接神经网络的核心原理与应用

文章目录 前言一、感知机 (Perceptron)1.1 基础介绍1.1.1 感知机是什么&#xff1f;1.1.2 感知机的工作原理 1.2 感知机的简单应用&#xff1a;基本逻辑门1.2.1 逻辑与 (Logic AND)1.2.2 逻辑或 (Logic OR)1.2.3 逻辑与非 (Logic NAND) 1.3 感知机的实现1.3.1 简单实现 (基于阈…...

在 Spring Boot 中使用 JSP

jsp&#xff1f; 好多年没用了。重新整一下 还费了点时间&#xff0c;记录一下。 项目结构&#xff1a; pom: <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0" xmlns:xsi"http://ww…...

规则与人性的天平——由高考迟到事件引发的思考

当那位身着校服的考生在考场关闭1分钟后狂奔而至&#xff0c;他涨红的脸上写满绝望。铁门内秒针划过的弧度&#xff0c;成为改变人生的残酷抛物线。家长声嘶力竭的哀求与考务人员机械的"这是规定"&#xff0c;构成当代中国教育最尖锐的隐喻。 一、刚性规则的必要性 …...

一些实用的chrome扩展0x01

简介 浏览器扩展程序有助于自动化任务、查找隐藏的漏洞、隐藏自身痕迹。以下列出了一些必备扩展程序&#xff0c;无论是测试应用程序、搜寻漏洞还是收集情报&#xff0c;它们都能提升工作流程。 FoxyProxy 代理管理工具&#xff0c;此扩展简化了使用代理&#xff08;如 Burp…...

欢乐熊大话蓝牙知识17:多连接 BLE 怎么设计服务不会乱?分层思维来救场!

多连接 BLE 怎么设计服务不会乱&#xff1f;分层思维来救场&#xff01; 作者按&#xff1a; 你是不是也遇到过 BLE 多连接时&#xff0c;调试现场像网吧“掉线风暴”&#xff1f; 温度传感器连上了&#xff0c;心率带丢了&#xff1b;一边 OTA 更新&#xff0c;一边通知卡壳。…...

【多线程初阶】单例模式 指令重排序问题

文章目录 1.单例模式1)饿汉模式2)懒汉模式①.单线程版本②.多线程版本 2.分析单例模式里的线程安全问题1)饿汉模式2)懒汉模式懒汉模式是如何出现线程安全问题的 3.解决问题进一步优化加锁导致的执行效率优化预防内存可见性问题 4.解决指令重排序问题 1.单例模式 单例模式确保某…...