当前位置: 首页 > news >正文

如何通过 Apache Airflow 将数据导入 Elasticsearch

作者:来自 Elastic Andre Luiz

了解如何通过 Apache Airflow 将数据导入 Elasticsearch。

Apache Airflow

Apache Airflow 是一个旨在创建、安排(schedule)和监控工作流的平台。它用于编排 ETL(Extract-Transform-Load) 流程、数据管道和其他复杂工作流,提供灵活性和可扩展性。它的可视化界面和实时监控功能使管道管理更易于访问和高效,让你可以跟踪执行的进度和结果。以下是它的四个主要支柱:

  • 动态:管道以 Python 定义,允许动态灵活地生成工作流。
  • 可扩展:Airflow 可以与各种环境集成,可以创建自定义运算符,并可以根据需要执行特定代码。
  • 优雅:管道以干净明确的方式编写。
  • 可扩展:其模块化架构使用消息队列来编排任意数量的工作器。

在实践中,Airflow 可用于以下场景:

  • 数据导入:编排将数据每日提取到 Elasticsearch 等数据库中。
  • 日志监控:管理日志文件的收集和处理,然后在 Elasticsearch 中进行分析以识别错误或异常。
  • 多种数据源集成:将来自不同系统(API、数据库、文件)的信息合并到 Elasticsearch 中的单个层中,简化搜索和报告。

DAG:Directed Acyclic Graphs - 有向无环图

在 Airflow 中,工作流由 DAG(有向无环图)表示。DAG 是一种定义任务执行顺序的结构。DAG 的主要特征是:

  • 由独立任务组成:每个任务代表一个工作单元,旨在独立执行。
  • 排序:任务的执行顺序在 DAG 中明确定义。
  • 可重用性:DAG 旨在重复执行,促进流程自动化。

Airflow 的主要组件

Airflow 生态系统由多个组件组成,它们共同协作以协调任务:

  • 调度程序 - scheduler:负责调度 DAG 并发送任务以供工作人员执行。
  • 执行器 - Exectutor:管理任务的执行,将其委托给工作人员。
  • Web 服务器 - Webserver:提供与 DAG 和任务交互的图形界面。
  • Dags 文件夹 - Dags folder:我们存储用 Python 编写的 DAG 的文件夹。
  • 元数据 - Metadata:作为工具存储库的数据库,由调度程序和执行器用于存储执行状态。

Apache Airflow 和 Elasticsearch

我们将演示如何使用 Apache Airflow 和 Elasticsearch 来协调任务并在 Elasticsearch 中索引结果。此演示的目标是创建一个任务管道来更新 Elasticsearch 索引中的记录。此索引包含电影数据库,用户可以在其中进行评分和分配评级。想象一个每天有数百个评级的场景,有必要保持评级记录更新。为此,将开发一个 DAG,它将每天执行,负责检索新的合并评级并更新索引中的记录。

在 DAG 流程中,我们将有一个获取评级的任务,然后是一个验证结果的任务。如果数据不存在,DAG 将被定向到失败任务。否则,数据将在 Elasticsearch 中编入索引。目标是通过一种带有负责计算分数的机制的方法检索评级,以更新索引中电影的评级字段。

使用 Apache Airflow 和 Elasticsearch 以及 Docker

要创建容器化环境,我们将使用 Apache Airflow 和 Docker。按照 “在 Docker 中运行 Airflow” 指南中的说明实际设置 Airflow。

至于 Elasticsearch,我将使用 Elastic Cloud 上的集群,但如果你愿意,也可以使用 Docker 配置 Elasticsearch。已经创建了一个包含电影目录的索引,其中电影数据已编入索引。这些电影的 “rating” 字段将被更新。

创建 DAG

通过 Docker 安装后,将创建一个文件夹结构,其中包括 dags 文件夹,我们必须将 DAG 文件放在该文件夹中,以便 Airflow 识别它们。

在此之前,我们需要确保安装了必要的依赖项。以下是此项目的依赖项:

pip install apache-airflow apache-airflow-providers-elasticsearch

我们将创建文件 update_ratings_movies.py 并开始编写任务代码。

现在,让我们导入必要的库:

from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.providers.elasticsearch.hooks.elasticsearch import ElasticsearchPythonHook

我们将使用 ElasticsearchPythonHook,这是一个通过抽象连接和使用外部 API 来简化 Airflow 和 Elasticsearch 集群之间集成的组件。

接下来,我们定义 DAG,并指定其主要参数:

  • dag_id:DAG 的名称。
  • start_date:DAG 的启动时间。
  • schedule:定义周期(在我们的例子中是每日)。
  • doc_md:将导入并显示在 Airflow 界面中的文档。

定义任务

现在,让我们定义 DAG 的任务。第一个任务将负责检索电影评级数据。我们将使用 PythonOperator,并将 task_id 设置为“get_movie_ratings”。python_callable 参数将调用负责获取 ratings 的函数。

get_ratings_operator = PythonOperator(task_id='get_movie_ratings',python_callable=get_movie_ratings_task
)

接下来,我们需要验证结果是否有效。为此,我们将使用带有 BranchPythonOperator 的条件。task_id 将为 “validate_result”,python_callable 将调用验证函数。op_args 参数将用于将上一个任务 “get_movie_ratings” 的结果传递给验证函数。

validate_result = BranchPythonOperator(task_id='validate_result',python_callable=validate_result,op_args=["{{ task_instance.xcom_pull(task_ids='get_movie_ratings') }}"]
)

如果验证成功,我们将从 “get_movie_ratings” 任务中获取数据并将其索引到 Elasticsearch 中。为此,我们将创建一个新任务 “index_movie_ratings”,它将使用 PythonOperator。op_args 参数将 “get_movie_ratings” 任务的结果传递给索引函数。

index_ratings_operator = PythonOperator(task_id='index_movie_ratings',python_callable=index_movie_ratings_task,op_args=["{{ task_instance.xcom_pull(task_ids='get_movie_ratings') }}"]
)

如果验证表明失败,DAG 将继续执行失败通知任务。在此示例中,我们只是打印一条消息,但在实际场景中,我们可以配置警报来通知失败。

failed_get_rating_operator = PythonOperator(task_id='failed_get_rating_operator',python_callable=lambda: print('Ratings were False, skipping indexing.')
)

最后,我们定义任务依赖关系,确保它们以正确的顺序执行:

get_ratings_operator >> validate_result >> [index_ratings_operator, failed_get_rating_operator]

以下是我们 DAG 的完整代码:

"""
DAG update Rating Movies
"""
import ast
import randomfrom airflow import DAG
from datetime import datetimefrom airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.providers.elasticsearch.hooks.elasticsearch import ElasticsearchPythonHookdef index_movie_ratings_task(movies):es_hook = ElasticsearchPythonHook(hosts=None,es_conn_args={"cloud_id": "cloud_id""api_key": "api-key"})es_client = es_hook.get_connactions = []for movie in ast.literal_eval(movies):actions.append({"update": {"_id": movie["id"],"_index": "movies"}})actions.append({"doc": {"rating": movie["rating"]},"doc_as_upsert": True})result = es_client.bulk(operations=actions)print(f"Ingestion completed.")print(result)return Truedef get_movie_ratings_task():movies = [{"id": i, "rating": round(random.uniform(1, 10), 1)}for i in range(1, 100)]return moviesdef validate_result(result):if not result:return 'failed_get_rating_operator'else:return 'index_movie_ratings'with DAG(dag_id="update_ratings_movies_2024",start_date=datetime(2024, 12, 29),schedule="@daily",doc_md=__doc__,
):get_ratings_operator = PythonOperator(task_id='get_movie_ratings',python_callable=get_movie_ratings_task)validate_result = BranchPythonOperator(task_id='validate_result',python_callable=validate_result,op_args=["{{ task_instance.xcom_pull(task_ids='get_movie_ratings') }}"],provide_context=True)index_ratings_operator = PythonOperator(task_id='index_movie_ratings',python_callable=index_movie_ratings_task,op_args=["{{ task_instance.xcom_pull(task_ids='get_movie_ratings') }}"])failed_get_rating_operator = PythonOperator(task_id='failed_get_rating_operator',python_callable=lambda: print('Ratings were False, skipping indexing.'))get_ratings_operator >> validate_result >> [index_ratings_operator, failed_get_rating_operator]

可视化 DAG 执行

在 Apache Airflow 界面中,我们可以可视化 DAG 的执行。只需转到 “DAG” 选项卡并找到你创建的 DAG 即可。

下面,我们可以直观地看到任务的执行情况及其各自的状态。通过选择特定日期的执行,我们可以访问每个任务的日志。请注意,在 index_movie_ratings 任务中,我们可以在索引中看到索引结果,并且它已成功完成。

在其他选项卡中,可以访问有关任务和 DAG 的其他信息,以协助分析和解决潜在问题。

结论

在本文中,我们演示了如何将 Apache Airflow 与 Elasticsearch 集成以创建数据提取解决方案。我们展示了如何配置 DAG、定义负责检索、验证和索引电影数据的任务,以及如何在 Airflow 界面中监控和可视化这些任务的执行。

这种方法可以轻松适应不同类型的数据和工作流,使 Airflow 成为在各种场景中编排数据管道的有用工具。

参考资料:

Apache AirFlow

  • https://airflow.apache.org/

使用 Docker 安装 Apache Airflow

  • https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html

Elasticsearch Python Hook

  • https://airflow.apache.org/docs/apache-airflow-providers-elasticsearch/stable/hooks/elasticsearch_python_hook.html

Python 运算符

  • https://airflow.apache.org/docs/apache-airflow/stable/howto/operator/python.html

想要获得 Elastic 认证?了解下一期 Elasticsearch 工程师培训何时开始!

Elasticsearch 包含许多新功能,可帮助你为你的用例构建最佳搜索解决方案。深入了解我们的示例笔记本以了解更多信息,开始免费云试用,或立即在吗的本地机器上试用 Elastic。

原文:How to ingest data to Elasticsearch through Apache Airflow - Elasticsearch Labs

相关文章:

如何通过 Apache Airflow 将数据导入 Elasticsearch

作者:来自 Elastic Andre Luiz 了解如何通过 Apache Airflow 将数据导入 Elasticsearch。 Apache Airflow Apache Airflow 是一个旨在创建、安排(schedule)和监控工作流的平台。它用于编排 ETL(Extract-Transform-Load&#xff0…...

Android Studio:Linux环境下安装与配置

更多内容:XiaoJ的知识星球 Android Studio:Linux环境下安装与配置 1.安装JDK2.安装Android Studio2.1 获取安装包2.2 安装(1)配置环境变量:(2)运行安装:(3)配…...

token是用来鉴权的,那session是用来干什么的?

在Web应用和API设计中,鉴权与会话管理是两个核心概念,它们对于确保用户身份的安全性和维护用户会话状态至关重要。Token和Session是两种常用的鉴权与会话管理机制,它们各自具有独特的工作原理和适用场景。下面是对Token和Session的详细解析及…...

基于 WEB 开发的二手车辆销售管理系统设计与实现

标题:基于 WEB 开发的二手车辆销售管理系统设计与实现 内容:1.摘要 摘要:随着互联网技术的不断发展,电子商务在各个领域得到了广泛的应用。本文以二手车辆销售管理系统为例,探讨了基于 WEB 开发的销售管理系统的设计与实现。通过对系统需求的…...

wordpress的火车头商品发布接口

<?php require ../wp-load.php; ini_set(memory_limit, 1024M); set_time_limit(180);$top_cat ; # 图片链接域名替换 $image_host ;$start_time microtime(true);$counter 0; // 临时缓存 $products $skus $categories []; $var_sku_index 1;$rowData$_POST;// if…...

浙江安吉成新照明电器:Acrel-1000DP 分布式光伏监控系统应用探索

安科瑞吕梦怡 18706162527 摘 要&#xff1a;分布式光伏发电站是指将光伏发电组件安装在用户的建筑物屋顶、空地或其他适合的场地上&#xff0c;利用太阳能进行发电的一种可再生能源利用方式&#xff0c;与传统的大型集中式光伏电站相比&#xff0c;分布式光伏发电具有更灵活…...

总结3..

#include<stdio.h> int n,m; int a[1002][1002]; int b[1002][1002];//判断该空的八连通图是否被走过 int gg0; int dd0; int xz[8]{-1,-1,-1,0,0,1,1,1},yz[8]{-1,0,1,-1,1,-1,0,1};//八个方向 void dfs(int x,int y) { int dx,dy; for(int i0;i<8;i) { …...

信息奥赛一本通 1168:大整数加法

这道题是一道大整数加法&#xff0c;涉及到高精度的算法&#xff0c;比如说有两个数要进行相加&#xff0c;1111111111111111111111111111111111111112222222222222222222222222222222&#xff0c;那么如果这两个数很大的话我们常用的数据类型是不能进行计算的&#xff0c;那么…...

3.3 OpenAI GPT-4, GPT-3.5, GPT-3 模型调用:开发者指南

OpenAI GPT-4, GPT-3.5, GPT-3 模型调用:开发者指南 OpenAI 的 GPT 系列语言模型,包括 GPT-4、GPT-3.5 和 GPT-3,已经成为自然语言处理领域的标杆。无论是文本生成、对话系统,还是自动化任务,开发者都可以通过 API 调用这些强大的模型来增强他们的应用。本文将为您详细介…...

横盘出击的三种经典走势形态,买点以及需要注意的问题技术详解

龙头股在横盘整理过程中&#xff0c;也会出现几种不同的形态&#xff0c;比如矩形整理形态&#xff0c;或者在某一趋势线下方运行。 第一种形态&#xff1a;突破横盘趋势线 突破横盘趋势线时识别横盘龙头启动的关键点位。股价经过一段时间的横盘后&#xff0c;突然出现快速上…...

处理没有提示的字符串、计算相隔天数应用题

正常情况下&#xff0c;小云每天跑 1 千米。如果某天是周一或者月初&#xff08;1 日&#xff09;&#xff0c;为了激励自己&#xff0c;小云要跑 2 千米。如果同时是周一或月初&#xff0c;小云也是跑 2 千米。 小云跑步已经坚持了很长时间&#xff0c;从 1990 年 1 月 1 日周…...

【LeetCode】力扣刷题热题100道(31-35题)附源码 搜索二维矩阵 岛屿数量 腐烂的橙子 课程表 实现 Trie (前缀树)(C++)

一、搜索二维矩阵 编写一个高效的算法来搜索 m x n 矩阵 matrix 中的一个目标值 target 。该矩阵具有以下特性&#xff1a; 每行的元素从左到右升序排列。每列的元素从上到下升序排列。 可以使用 从右上角开始搜索 的方法来有效地找到目标值。 选择起始位置&#xff1a; 从矩…...

react使用react-redux状态管理

1、安装 npm install react-redux2、创建store.js import { createStore } from redux;// 定义初始状态 const initialState {counter: 888 };// 定义 reducer 函数&#xff0c;根据 action 类型更新状态 function reducer(state initialState, action) {switch (action.ty…...

04_角色创建窗口

将上文的登录窗口隐藏 创建空节点 作为创建角色窗口 命名为CreateWnd 创建输入的名字的输入框 再创建一个按钮用来随机角色名字 创建开始游戏按钮 End....

Dockerfile -> Docker image -> Docker container

1. Dockfile -> Docker image docker build -t shuai_image -f xxx/xxx/Dockerfile . (.不能少)出现&#xff1a; [] Building xxx(10/17) > [internal] load build definition from Dockerfile > > transferring dockerfile: … > > transferring context …...

LDN的蓝牙双模键盘帮助文档

文档索引 已支持的PCB列表(仅列出少部分)&#xff1a;键盘特性硬件软件键盘以及驱动蓝牙模式USB模式 驱动功能介绍主界面键盘列表页面键盘配置&#xff08;使用双模键盘的请务必细看本说明&#xff09;功能层配置(改键)触发层配置(改FN键等触发功能)功能选择&#xff08;重要&a…...

搭建一个基于Spring Boot的驾校管理系统

搭建一个基于Spring Boot的驾校管理系统可以涵盖多个功能模块&#xff0c;例如学员管理、教练管理、课程管理、考试管理、车辆管理等。以下是一个简化的步骤指南&#xff0c;帮助你快速搭建一个基础的系统。 1. 项目初始化 使用 Spring Initializr 生成一个Spring Boot项目&am…...

运动相机拍视频过程中摔了,导致录视频打不开怎么办

3-11 在使用运动相机拍摄激烈运动的时候&#xff0c;极大的震动会有一定概率使得保存在存储卡中的视频出现打不开的情况&#xff0c;原因是存储卡和相机在极端情况下&#xff0c;可能会出现接触不良的问题&#xff0c;如果遇到这种问题&#xff0c;就不得不进行视频修复了。 本…...

MongoDB vs Redis:相似与区别

前言 在当今的数据库领域&#xff0c;MongoDB 和 Redis 都是备受关注的非关系型数据库&#xff08;NoSQL&#xff09;&#xff0c;它们各自具有独特的优势和适用场景。本文将深入探讨 MongoDB 和 Redis 的特点&#xff0c;并详细对比它们之间的相似之处和区别&#xff0c;帮助…...

数字图像处理:实验二

任务一&#xff1a; 将不同像素&#xff08;32、64和256&#xff09;的原图像放大为像素大 小为1024*1024的图像&#xff08;图像自选&#xff09; 要求&#xff1a;1&#xff09;输出一幅图&#xff0c;该图包含六幅子图&#xff0c;第一排是原图&#xff0c;第 二排是对应放大…...

Linux应用开发之网络套接字编程(实例篇)

服务端与客户端单连接 服务端代码 #include <sys/socket.h> #include <sys/types.h> #include <netinet/in.h> #include <stdio.h> #include <stdlib.h> #include <string.h> #include <arpa/inet.h> #include <pthread.h> …...

TDengine 快速体验(Docker 镜像方式)

简介 TDengine 可以通过安装包、Docker 镜像 及云服务快速体验 TDengine 的功能&#xff0c;本节首先介绍如何通过 Docker 快速体验 TDengine&#xff0c;然后介绍如何在 Docker 环境下体验 TDengine 的写入和查询功能。如果你不熟悉 Docker&#xff0c;请使用 安装包的方式快…...

springboot 百货中心供应链管理系统小程序

一、前言 随着我国经济迅速发展&#xff0c;人们对手机的需求越来越大&#xff0c;各种手机软件也都在被广泛应用&#xff0c;但是对于手机进行数据信息管理&#xff0c;对于手机的各种软件也是备受用户的喜爱&#xff0c;百货中心供应链管理系统被用户普遍使用&#xff0c;为方…...

Qt Widget类解析与代码注释

#include "widget.h" #include "ui_widget.h"Widget::Widget(QWidget *parent): QWidget(parent), ui(new Ui::Widget) {ui->setupUi(this); }Widget::~Widget() {delete ui; }//解释这串代码&#xff0c;写上注释 当然可以&#xff01;这段代码是 Qt …...

Neo4j 集群管理:原理、技术与最佳实践深度解析

Neo4j 的集群技术是其企业级高可用性、可扩展性和容错能力的核心。通过深入分析官方文档,本文将系统阐述其集群管理的核心原理、关键技术、实用技巧和行业最佳实践。 Neo4j 的 Causal Clustering 架构提供了一个强大而灵活的基石,用于构建高可用、可扩展且一致的图数据库服务…...

实现弹窗随键盘上移居中

实现弹窗随键盘上移的核心思路 在Android中&#xff0c;可以通过监听键盘的显示和隐藏事件&#xff0c;动态调整弹窗的位置。关键点在于获取键盘高度&#xff0c;并计算剩余屏幕空间以重新定位弹窗。 // 在Activity或Fragment中设置键盘监听 val rootView findViewById<V…...

论文笔记——相干体技术在裂缝预测中的应用研究

目录 相关地震知识补充地震数据的认识地震几何属性 相干体算法定义基本原理第一代相干体技术&#xff1a;基于互相关的相干体技术&#xff08;Correlation&#xff09;第二代相干体技术&#xff1a;基于相似的相干体技术&#xff08;Semblance&#xff09;基于多道相似的相干体…...

Java + Spring Boot + Mybatis 实现批量插入

在 Java 中使用 Spring Boot 和 MyBatis 实现批量插入可以通过以下步骤完成。这里提供两种常用方法&#xff1a;使用 MyBatis 的 <foreach> 标签和批处理模式&#xff08;ExecutorType.BATCH&#xff09;。 方法一&#xff1a;使用 XML 的 <foreach> 标签&#xff…...

七、数据库的完整性

七、数据库的完整性 主要内容 7.1 数据库的完整性概述 7.2 实体完整性 7.3 参照完整性 7.4 用户定义的完整性 7.5 触发器 7.6 SQL Server中数据库完整性的实现 7.7 小结 7.1 数据库的完整性概述 数据库完整性的含义 正确性 指数据的合法性 有效性 指数据是否属于所定…...

Go 并发编程基础:通道(Channel)的使用

在 Go 中&#xff0c;Channel 是 Goroutine 之间通信的核心机制。它提供了一个线程安全的通信方式&#xff0c;用于在多个 Goroutine 之间传递数据&#xff0c;从而实现高效的并发编程。 本章将介绍 Channel 的基本概念、用法、缓冲、关闭机制以及 select 的使用。 一、Channel…...