Spark流水线+Gravitino+Marquez数据血缘采集
1.Openlinage和Marquez简介
1.1 OpenLineage
概述
- OpenLineage 是一个开放标准和框架,用于跨工具、平台和系统捕获数据血缘信息。
- 它定义了通用的数据血缘模型和API,允许不同的数据处理工具(如ETL、调度器、数据仓库)以标准化格式生成血缘元数据。
- 由Linux基金会托管,社区驱动,支持广泛的集成。
核心功能
- 标准化元数据收集:通过统一的规范(基于JSON Schema)描述数据血缘,包括作业(Job)、数据集(Dataset)和运行(Run)等实体。
- 跨工具集成:支持与Airflow、Spark、dbt、Great Expectations等流行数据工具的集成。
- 可扩展性:允许用户自定义提取器(Extractors)或适配器来兼容其他工具。
典型应用场景
- 数据治理(如合规性审计)。
- 故障排查(追踪数据错误来源)。
- 影响分析(评估上游变更对下游的影响)。
1.2. Marquez
概述
- Marquez 是OpenLineage的参考实现,是一个开源元数据服务,专为数据血缘和元数据管理设计。
- 由WeWork团队最初开发,现由社区维护,与OpenLineage深度集成。
- 提供Web UI和API,用于存储、查询和可视化血缘信息。
核心功能
- 元数据存储:持久化存储OpenLineage格式的血缘数据(使用PostgreSQL或兼容的数据库)。
- 血缘可视化:通过Web界面展示数据集、作业和依赖关系的图谱。
- API支持:提供REST API供其他系统访问或写入元数据。
- 与OpenLineage生态集成:自动接收来自支持OpenLineage的工具(如Airflow)的血缘事件。
架构组成
- API服务:处理血缘事件的摄入和查询。
- Web UI:交互式查看血缘关系。
- 后端数据库:存储元数据。
如果需要进一步了解部署或集成细节,可以参考它们的官方文档:
- OpenLineage官网
- Marquez GitHub
2.Gravitino血缘配置
Gravitino血缘事件采集后,默认是输出到日志,如果需要处理,可以实现org.apache.gravitino.lineage.sink.LineageSink
进行扩展。
本文便实现此接口,通过http接口将血缘事件发送到Marquez,进行血缘的存储和展示。
package org.apache.gravitino.lineage.sink;import com.fasterxml.jackson.databind.ObjectMapper;
import io.openlineage.server.OpenLineage.RunEvent;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.gravitino.server.web.ObjectMapperProvider;
import org.apache.hc.client5.http.classic.methods.HttpPost;
import org.apache.hc.client5.http.classic.methods.HttpPut;
import org.apache.hc.client5.http.classic.methods.HttpUriRequestBase;
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
import org.apache.hc.client5.http.impl.classic.CloseableHttpResponse;
import org.apache.hc.client5.http.impl.classic.HttpClients;
import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.io.entity.StringEntity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class LineageHttpSink implements LineageSink {private static final Logger LOG = LoggerFactory.getLogger(LineageHttpSink.class);private String url;private String endpoint;private String method;private Map<String, String> headers;private CloseableHttpClient httpClient;private ObjectMapper objectMapper;private int retryCount;private long retryDelayMs;@Overridepublic void initialize(Map<String, String> configs) {this.url = configs.get("http.url");this.endpoint = configs.getOrDefault("http.endpoint", "/api/v1/lineage");this.method = configs.getOrDefault("http.method", "POST");this.retryCount = Integer.parseInt(configs.getOrDefault("http.retry.count", "3"));this.retryDelayMs = Long.parseLong(configs.getOrDefault("http.retry.delay", "1000"));this.headers = parseHeaders(configs.getOrDefault("http.headers", ""));this.httpClient = HttpClients.createDefault();this.objectMapper = ObjectMapperProvider.objectMapper();LOG.info("Initialized HTTP sink with URL: {}{}", url, endpoint);}@Override@SuppressWarnings("deprecation")public void sink(RunEvent event) {String fullUrl = url + endpoint;for (int attempt = 0; attempt <= retryCount; attempt++) {try {String jsonPayload = objectMapper.writeValueAsString(event);HttpUriRequestBase request = createHttpRequest(fullUrl, jsonPayload);headers.forEach(request::setHeader);try (CloseableHttpResponse response = httpClient.execute(request)) {int statusCode = response.getCode();if (isSuccessResponse(statusCode)) {LOG.debug("Successfully sent lineage event to {}", fullUrl);return;} else {LOG.warn("HTTP request failed with status {}", statusCode);}}} catch (Exception e) {LOG.warn("Attempt {} failed to send lineage event to {}: {}",attempt + 1,fullUrl,e.getMessage());if (attempt == retryCount) {LOG.error("Failed to send lineage event after {} attempts", retryCount + 1, e);return;}}if (attempt < retryCount) {try {Thread.sleep(retryDelayMs * (attempt + 1));} catch (InterruptedException ie) {Thread.currentThread().interrupt();return;}}}}private HttpUriRequestBase createHttpRequest(String url, String jsonPayload) {HttpUriRequestBase request;switch (method.toUpperCase()) {case "POST":request = new HttpPost(url);break;case "PUT":request = new HttpPut(url);break;default:throw new IllegalArgumentException("Unsupported HTTP method: " + method);}StringEntity entity = new StringEntity(jsonPayload, ContentType.APPLICATION_JSON);request.setEntity(entity);return request;}private Map<String, String> parseHeaders(String headersString) {Map<String, String> headerMap = new HashMap<>();if (StringUtils.isNotBlank(headersString)) {String[] pairs = headersString.split(",");for (String pair : pairs) {String[] keyValue = pair.split(":", 2);if (keyValue.length == 2) {headerMap.put(keyValue[0].trim(), keyValue[1].trim());}}}return headerMap;}private boolean isSuccessResponse(int statusCode) {return statusCode >= 200 && statusCode < 300;}@Overridepublic void close() {if (httpClient != null) {try {httpClient.close();} catch (IOException e) {LOG.warn("Error closing HTTP client", e);}}LOG.info("HTTP sink closed");}
}
在gravitino.conf
中添加以下配置
gravitino.lineage.source=http
gravitino.lineage.sinks=log,openlineage
gravitino.lineage.openlineage.sinkClass=org.apache.gravitino.lineage.sink.LineageHttpSink
gravitino.lineage.openlineage.http.url=http://127.0.0.1:5000
gravitino.lineage.openlineage.http.endpoint=/api/v1/lineage
gravitino.lineage.openlineage.http.method=POST
gravitino.lineage.openlineage.http.headers=Content-Type:application/json
gravitino.lineage.openlineage.http.retry.count=3
gravitino.lineage.openlineage.http.retry.delay=1000
其中gravitino.lineage.openlineage.http.url
填写的是Marquez地址
gravitino.lineage.openlineage.http.endpoint
填写的是Marquez接收血缘事件的接口。
3. 集成演示
如需开启血缘采集功能,首先需要下载 Gravitino OpenLineage 插件 jar 并将其放置到 Spark 的类路径中。
(gravitino-openlineage-plugins/spark-plugin at main · datastrato/gravitino-openlineage-plugins)
3.1 访问系统登录页面,输入账号密码完成身份验证。
3.2 创建任务
-
入口:通过顶部菜单栏选择 任务开发,或通过快捷入口 快速创建任务。
-
任务类型:选择
SparkPipeline
。3.3 配置任务
点击任务名称,进入任务详情页。任务节点如下
-
Gravatino节点:配置Gravatino连接信息,并设置
enableLinage
为true
,开起血缘采集SQLQuery节点:执行sql查询语句。跨catalog实现联邦查询
SELECT a.id, a.user_name, b.description FROM local_data_service.dolphinscheduler.t_ds_user a left join docker_data_service.dolphinscheduler.t_ds_tenant b on a.tenant_id = b.id
-
PostgresqlWrite
节点:将查询结果写入到Postgres
。
3.4 运行任务
- 点击 运行 按钮启动任务。
3.5 查看血缘
3.6 数据查询
🔗 平台体验地址:DataStudio (http://1.94.182.15:8090)
参考链接:
[1] https://github.com/datastrato/gravitino/
[2] https://datastrato.ai/blog/gravitino-unified-metadata-lake/
.6 数据查询
[外链图片转存中…(img-pQb0YwgS-1749396844654)]
[外链图片转存中…(img-hqrYlduK-1749396844654)]
🔗 平台体验地址:DataStudio (http://1.94.182.15:8090)
参考链接:
[1] https://github.com/datastrato/gravitino/
[2] https://datastrato.ai/blog/gravitino-unified-metadata-lake/
[3] Apache Gravitino Spark connector | Apache Gravitino
相关文章:

Spark流水线+Gravitino+Marquez数据血缘采集
1.Openlinage和Marquez简介 1.1 OpenLineage 概述 OpenLineage 是一个开放标准和框架,用于跨工具、平台和系统捕获数据血缘信息。它定义了通用的数据血缘模型和API,允许不同的数据处理工具(如ETL、调度器、数据仓库)以标准化格…...
一个完整的时间序列异常检测系统,使用Flask作为后端框架,实现了AE(自编码器)、TimesNet和LSTM三种模型,并提供可视化展示
时间序列异常检测系统 下面是一个完整的时间序列异常检测系统,使用Flask作为后端框架,实现了AE(自编码器)、TimesNet和LSTM三种模型,并提供可视化展示。 系统概述 这个系统能够: 从多种来源加载时间序列数据使用三种先进算法进行异常检测可视化展示原始数据、重建数据和…...
深度学习在非线性场景中的核心应用领域及向量/张量数据处理案例,结合工业、金融等领域的实际落地场景分析
一、工业场景:非线性缺陷检测与预测 1. 半导体晶圆缺陷检测 问题:微米级划痕、颗粒污染等缺陷形态复杂,与正常纹理呈非线性关系。解决方案: 输入张量:高分辨率晶圆图像 → 三维张量 (Batch, Height,…...

基于微信小程序的车位共享平台的设计与实现源码数据库文档
摘 要 近年来,随着国民经济的飞速发展,城镇化进程的步伐加快,城市人口急剧增长,人们的生活水平持续改善,特别是大中型城市,城市的交通规模日益增大,汽车的保有量不断提高,然而城市的…...

多模态大语言模型arxiv论文略读(111)
SEA: Supervised Embedding Alignment for Token-Level Visual-Textual Integration in MLLMs ➡️ 论文标题:SEA: Supervised Embedding Alignment for Token-Level Visual-Textual Integration in MLLMs ➡️ 论文作者:Yuanyang Yin, Yaqi Zhao, Yaji…...
网页端 VUE+C#/FastAPI获取客户端IP和hostname
1 IP可以获取,但是发现获取到的是服务端的IP,如何解决呢。 如果采用nginx反向代理,那么可以在conf/nginx.conf文件中配置 location /WebApi/ { proxy_pass http://localhost:5000/; #这个/会替换location种的WebApi路径 #关键,加…...
一个自动反汇编脚本
一、环境 wsl ubuntu18.04、python3.6 二、目的 调试程序,需要分析第三方库。希望能将多个库自动转为汇编文件。 三、使用方法 将该脚本下载,进入wsl,进入到该脚本所有文件夹。 请使用 python 脚本名.py 运行。 1)、运行…...
函数与数列的交汇融合
前情概要 现行的新高考对数列的考查难度增加,那么整理与数列交汇融合的相关题目就显得非常必要了。 典例剖析 依托函数,利用导数,求数列的最值;№ 1 、 \color{blue}{№ 1、} №1、 等差数列 { a n } \{a_{n}\} {an} 的前 n n n 项和为 S n S_{n} Sn, 已知 S 10…...

怎么让自己ip显示外省?一文说清操作
在互联网时代,IP地址不仅关联网络连接,还可能影响IP属地显示。那么,手机和电脑用户怎么让自己IP显示外省?一文说清操作要点。 二、4种主流方法详解 要让自己的IP显示为外省地址,主要有以下几种方法: …...

【Docker】容器安全之非root用户运行
【Docker】容器安全之非root用户运行 1. 场景2. 原 Dockerfile 内容3. 整改结果4. 非 root 用户带来的潜在问题4.1 文件夹读写权限异常4.2 验证文件夹权限 1. 场景 最近有个项目要交付,第三方测试对项目源码扫描后发现一个问题,服务的 Dockerfile 都未指…...

汽车车载软件平台化项目规模颗粒度选择的一些探讨
汽车进入 SDV 时代后,车载软件研发呈现出开源生态构建、电子架构升级、基础软件标准化、本土供应链崛起、AI 原生架构普及、云边协同开发等趋势,这些趋势促使车载软件研发面临新挑战,如何构建适应这些变化的平台化架构成为车企与 Tier 1 的战…...

【八股消消乐】构建微服务架构体系—服务注册与发现
😊你好,我是小航,一个正在变秃、变强的文艺倾年。 🔔本专栏《八股消消乐》旨在记录个人所背的八股文,包括Java/Go开发、Vue开发、系统架构、大模型开发、具身智能、机器学习、深度学习、力扣算法等相关知识点ÿ…...
大数据+智能零售:数字化变革下的“智慧新零售”密码
大数据+智能零售:数字化变革下的“智慧新零售”密码 大家好,今天咱们聊聊一个火到不行的话题:大数据在智能零售中的应用。这个领域,不仅是技术的“硬核战场”,更是商业创新的风口浪尖。谁能玩转数据,谁就能掌控消费者心智,实现销售爆发。 咱们不搞枯燥学术,而是用最“…...
C++_核心编程_菱形继承
4.6.8 菱形继承 菱形继承概念: 两个派生类继承同一个基类 又有某个类同时继承者两个派生类 这种继承被称为菱形继承,或者钻石继承 菱形继承问题: 1. 羊继承了动物的数据, 驼同样继承了动物的数据࿰…...

掌握Git核心:版本控制、分支管理与远程操作
前言 无论热爱技术的阅读者你是希望掌握Git的企业级应用,能够深刻理解Git操作过程及操作原理,理解工作区暂存区、版本库的含义;还是想要掌握Git的版本、分支管理,自由的进行版本回退、撤销、修改等Git操作方式与背后原理和通过分…...

c#,Powershell,mmsys.cpl,使用Win32 API展示音频设备属性对话框
常识(基础) 众所周知,mmsys.cpl使管理音频设备的控制面板小工具, 其能产生一个对话框(属性表)让我们查看和修改各设备的详细属性: 在音量合成器中单击音频输出设备的小图标也能实现这个效果&a…...

STM标准库-TIM旋转编码器
文章目录 一、编码器接口1.1简介1.2正交编码器1.3编码器接口基本结构**1. 模块与 STM32 配置的映射关系****2. 设计实现步骤(核心流程)****① 硬件规划****② 时钟使能****③ GPIO 配置(对应架构图 “GPIO” 模块)****④ 时基单元…...
深入解析JVM工作原理:从字节码到机器指令的全过程
一、JVM概述 Java虚拟机(JVM)是Java平台的核心组件,它实现了Java"一次编写,到处运行"的理念。JVM是一个抽象的计算机器,它有自己的指令集和运行时内存管理机制。 JVM的主要职责: 加载:读取.class文件并验…...
MCP通信方式之Streamable HTTP
目录 一、前言二、三种传输方式对比1、Stdio和 HTTP SSE工作原理2、Streamable HTTP3、Streamable HTTP解决什么问题三、Streamable HTTP MCP设计原理四、Streamable HTTP MCP demo演示1、MCP server示例2、MCP Client示例一、前言 2025年5月9日,MCP(Model Context Protocol)…...
第七十三篇 从电影院售票到停车场计数:生活场景解析Java原子类精髓
目录 一、原子类基础:电影院售票系统1.1 传统售票的并发问题1.2 原子类解决方案 二、原子类家族:超市收银系统2.1 基础类型原子类2.2 数组类型原子类 三、CAS机制深度解析:停车场管理系统3.1 CAS工作原理3.2 车位计数器实现 四、高性能实践&a…...

【原创】基于视觉模型+FFmpeg+MoviePy实现短视频自动化二次编辑+多赛道
AI视频处理系统功能总览 🎯 系统概述 这是一个智能短视频自动化处理系统,专门用于视频搬运和二次创作。系统支持多赛道配置,可以根据不同的内容类型(如"外国人少系列"等)应用不同的处理策略。 Ἵ…...

C++----剖析list
前面学习了vector和string,接下来剖析stl中的list,在数据库中学习过,list逻辑上是连续的,但是存储中是分散的,这是与vector这种数组类型不同的地方。所以list中的元素设置为一个结构体,将list设计成双向的&…...

纳米AI搜索与百度AI搜、豆包的核心差异解析
一、技术定位与设计目标 1、纳米AI搜索:轻量化边缘计算导向 专注于实时数据处理与资源受限环境下的高效响应,通过算法优化和模型压缩技术,实现在物联网设备、智能终端等低功耗场景的本地化部署。其核心优势在于减少云端依赖,保障…...

不到 2 个月,OpenAI 火速用 Rust 重写 AI 编程工具。尤雨溪也觉得 Rust 香!
一、OpenAI 用 Rust 重写 Codex CLI OpenAI 已用 Rust 语言重写了其 AI 命令行编程工具 Codex CLI,理由是此举能提升性能和安全性,同时避免对 Node.js 的依赖。他们认为 Node.js “可能让部分用户感到沮丧或成为使用障碍”。 Codex 是一款实验性编程代理…...
人工智能:网络安全的“智能守护者”
在数字化时代,网络安全已经成为企业和个人面临的重大挑战。随着网络攻击的复杂性和频率不断增加,传统的安全防护手段已经难以应对。人工智能(AI)技术的出现为网络安全带来了新的希望和解决方案。本文将探讨人工智能在网络安全中的…...

Python60日基础学习打卡Day46
一、 什么是注意力 注意力机制的由来本质是从onehot-elmo-selfattention-encoder-bert这就是一条不断提取特征的路。各有各的特点,也可以说由弱到强。 其中注意力机制是一种让模型学会「选择性关注重要信息」的特征提取器,就像人类视觉会自动忽略背景&…...
综述论文解读:Editing Large Language Models: Problems, Methods, and Opportunities
论文为大语言模型知识编辑综述,发表于自然语言处理顶会ACL(原文链接)。由于目前存在广泛的模型编辑技术,但一个统一全面的分析评估方法,所以本文: 1、对LLM的编辑方法进行了详尽、公平的实证分析,探讨了它们各自的优势…...

WEB3全栈开发——面试专业技能点P1Node.js / Web3.js / Ethers.js
一、Node.js 事件循环 Node.js 的事件循环(Event Loop)是其异步编程的核心机制,它使得 Node.js 可以在单线程中实现非阻塞 I/O 操作。 🔁 简要原理 Node.js 是基于 libuv 实现的,它使用事件循环来处理非阻塞操作。事件…...

Vscode下Go语言环境配置
前言 本文介绍了vscode下Go语言开发环境的快速配置,为新手小白快速上手Go语言提供帮助。 1.下载官方Vscode 这步比较基础,已经安装好的同学可以直接快进到第二步 官方安装包地址:https://code.visualstudio.com/ 双击一直点击下一步即可,记…...
Java八股文——MySQL篇
文章目录 Java八股文——MySQL篇慢查询如何定位慢查询?如何分析慢SQLExplain标准答案 索引索引类型索引底层数据结构什么是聚簇索引什么是非聚簇索引?(二级索引)(回表)聚集索引选取规则回表查询 什么是覆盖…...