当前位置: 首页 > article >正文

Gravitino源码分析-SparkConnector 实现原理

Gravitino SparkConnector 实现原理

本文参考了官网介绍,想看官方解析请参考 官网地址 本文仅仅介绍原理

文章目录

  • Gravitino SparkConnector 实现原理
    • 背景知识-Spark Plugin 介绍
        • (1) **插件加载**
        • (2) **DriverPlugin 初始化**
        • (3) **ExecutorPlugin 初始化**
        • (4) **插件执行**
        • (5) **插件销毁**
    • 背景知识-Driver Plugin 介绍
        • (1) **`init` 方法**
        • (2) **`registerMetrics` 方法**
        • (3) **`onTaskStart` 方法**
        • (4) **`onTaskSucceeded` 方法**
        • (5) **`onTaskFailed` 方法**
        • (6) **`close` 方法**
    • SparkConnector使用方式
      • 加载spark.sql.catalog.xxx 具体执行的配置

背景知识-Spark Plugin 介绍

spark在[spark-29399]pr提交更新了SparkPlugin插件

SparkPlugin插件执行生命周期

SparkPlugin 的生命周期与 Spark 应用程序的生命周期一致,具体如下:

(1) 插件加载
  • 当 Spark 应用程序启动时,Spark 会扫描类路径下的 SparkPlugin 实现类。
  • 如果插件被正确配置(例如通过 spark.plugins 配置项),Spark 会实例化该类。
(2) DriverPlugin 初始化
  • Spark 调用 driverPlugin() 方法,获取 DriverPlugin 实例。
  • DriverPlugin 的生命周期开始,其方法(如 initregisterMetrics 等)会被调用。
(3) ExecutorPlugin 初始化
  • Spark 调用 executorPlugin() 方法,获取 ExecutorPlugin 实例。
  • ExecutorPlugin 的生命周期开始,其方法(如 initshutdown 等)会被调用。
(4) 插件执行
  • DriverPlugin 在 Driver 端执行自定义逻辑,例如注册指标、拦截 SQL 解析、修改 Catalog 等。
  • ExecutorPlugin 在 Executor 端执行自定义逻辑,例如监控 Task 执行、收集指标等。
(5) 插件销毁
  • 当 Spark 应用程序结束时,DriverPluginExecutorPlugin 的生命周期结束,其 close() 方法会被调用以释放资源。

背景知识-Driver Plugin 介绍

DriverPlugin 是用于在 Driver 端执行自定义逻辑的插件,其生命周期方法包括:

(1) init 方法
  • 在 Driver 插件初始化时调用。
  • 可以在此方法中执行初始化逻辑,例如注册自定义 Catalog、拦截 SQL 解析器等。
(2) registerMetrics 方法
  • 在 Driver 插件初始化时调用。
  • 可以在此方法中注册自定义指标(Metrics)。
(3) onTaskStart 方法
  • 在 Task 启动时调用。
  • 可以在此方法中执行与 Task 相关的逻辑。
(4) onTaskSucceeded 方法
  • 在 Task 成功完成时调用。
  • 可以在此方法中执行与 Task 成功相关的逻辑。
(5) onTaskFailed 方法
  • 在 Task 失败时调用。
  • 可以在此方法中执行与 Task 失败相关的逻辑。
(6) close 方法
  • 在 Driver 插件销毁时调用。
  • 可以在此方法中释放资源,例如关闭连接、清理缓存等。

SparkConnector使用方式

./bin/spark-sql -v \
--conf spark.plugins="org.apache.gravitino.spark.connector.plugin.GravitinoSparkPlugin" \
--conf spark.sql.gravitino.uri=http://127.0.0.1:8090 \
--conf spark.sql.gravitino.metalake=test \
--conf spark.sql.gravitino.enableIcebergSupport=true \
--conf spark.sql.warehouse.dir=hdfs://127.0.0.1:9000/user/hive/warehouse-hive

可以看出SparkConnector指定了加载的插件是GravitinoSparkPlugin

public class GravitinoSparkPlugin implements SparkPlugin {@Overridepublic DriverPlugin driverPlugin() {return new GravitinoDriverPlugin();}@Overridepublic ExecutorPlugin executorPlugin() {return null;}
}

可以看出实现方式很简单,仅仅使用了一个GravitinoDriverPlugin,也就是在Spark应用程序启动的时候扫描SparkPlugin扫描到了这个GravitinoSparkPlugin然后立马就去执行GravitinoDriverPlugin初始化程序。在DriverPlugin初始化过程中 插件仅仅覆写了两个函数,init()shutdown()。 说明这个插件仅仅做了一些初始化和资源销毁操作。

在Driver端进行初始化

  1. 配置检查检查gravitino_uri和gravitino_metalake是否配置

  2. 如果开启了iceberg则将gravitinoDriverExtensions放入到数组中方便配置

  3. 初始化Gravtino客户端和GravitinoCatalogManager,并且将relational类型的表加载到缓存中

  4. 将缓存中的catalog进行如果是非iceberg类型(当前仅仅只有Hive)进行注册,这里定义的注册的实际操作配置Spark的配置项(spark.sql.catalog.catalogName)这里的catalogName对应的是缓存中的catalogName,配置的值为根据Gravitino自己的Catalog使用的Provider进行适配比如可以是(org.apache.gravitino.spark.connector.hive.GravitinoHiveCatalogSpark33或者org.apache.gravitino.spark.connector.iceberg.GravitinoIcebergCatalogSpark33)具体情况由适配器进行处理。

  5. 然后注册SqlExtensions其实就是将第2步骤的数组配置到SPARK_SESSION_EXTENSIONS这个SparkConf配置里面

稍微贴一下注册Catalog代码,比较重要

  //初始化的时候调用注册逻辑,将Gravitino中的Catalog加载到缓存//然后将缓存中的数据作为第二个参数gravitinoCatalogs传递进来private void registerGravitinoCatalogs(SparkConf sparkConf, Map<String, Catalog> gravitinoCatalogs) {gravitinoCatalogs.entrySet().forEach(entry -> {String catalogName = entry.getKey();Catalog gravitinoCatalog = entry.getValue();String provider = gravitinoCatalog.provider();if ("lakehouse-iceberg".equals(provider.toLowerCase(Locale.ROOT))&& enableIcebergSupport == false) {return;}try {registerCatalog(sparkConf, catalogName, provider);} catch (Exception e) {LOG.warn("Register catalog {} failed.", catalogName, e);}});}//这里根据适配器去配置spark.sql.catalog.xxx 的具体执行CatalogClassprivate void registerCatalog(SparkConf sparkConf, String catalogName, String provider) {if (StringUtils.isBlank(provider)) {LOG.warn("Skip registering {} because catalog provider is empty.", catalogName);return;}String catalogClassName = CatalogNameAdaptor.getCatalogName(provider);if (StringUtils.isBlank(catalogClassName)) {LOG.warn("Skip registering {} because {} is not supported yet.", catalogName, provider);return;}String sparkCatalogConfigName = "spark.sql.catalog." + catalogName;Preconditions.checkArgument(!sparkConf.contains(sparkCatalogConfigName),catalogName + " is already registered to SparkCatalogManager");sparkConf.set(sparkCatalogConfigName, catalogClassName);LOG.info("Register {} catalog to Spark catalog manager.", catalogName);}

到这里GravitinoConnector的代码机制已经说完了,下面聊聊Spark机制

加载spark.sql.catalog.xxx 具体执行的配置

经过上面GravitinoDriverPlugin的初始化之后,已经将具体的catalog名称和对应的处理类映射起来,这里以GravitinoHiveCatalogSpark33为例。

GravitinoHiveCatalogSpark33这个类继承关系是继承了BaseCatalogBaseCatalog是Spark中定义的CatalogPlugin的一个实现类。

Spark在解析SQL的时候会查找catalog对应的Catalog,可以看到调用了CatalogManager.catalog()方法

  private object CatalogAndMultipartIdentifier {def unapply(parts: Seq[String]): Some[(Option[CatalogPlugin], Seq[String])] = parts match {case Seq(_) =>Some((None, parts))case Seq(catalogName, tail @ _*) =>try {Some((Some(catalogManager.catalog(catalogName)), tail))} catch {case _: CatalogNotFoundException =>Some((None, parts))}}}

这个catalog方法调用了Catalogs.load()方法

  def catalog(name: String): CatalogPlugin = synchronized {if (name.equalsIgnoreCase(SESSION_CATALOG_NAME)) {v2SessionCatalog} else {catalogs.getOrElseUpdate(name, Catalogs.load(name, conf))}}

这个方法才是真正的加载方法,他真正根据conf配置将GravitinoHiveCatalogSpark33名称根据定义的反射构造函数实例化到内存中

   def load(name: String, conf: SQLConf): CatalogPlugin = {val pluginClassName = try {val _pluginClassName = conf.getConfString(s"spark.sql.catalog.$name")// SPARK-39079 do configuration check first, otherwise some path-based table like// `org.apache.spark.sql.json`.`/path/json_file` may fail on analyze phaseif (name.contains(".")) {throw QueryExecutionErrors.invalidCatalogNameError(name)}_pluginClassName} catch {case _: NoSuchElementException =>throw QueryExecutionErrors.catalogPluginClassNotFoundError(name)}val loader = Utils.getContextOrSparkClassLoadertry {val pluginClass = loader.loadClass(pluginClassName)if (!classOf[CatalogPlugin].isAssignableFrom(pluginClass)) {throw QueryExecutionErrors.catalogPluginClassNotImplementedError(name, pluginClassName)}val plugin = pluginClass.getDeclaredConstructor().newInstance().asInstanceOf[CatalogPlugin]plugin.initialize(name, catalogOptions(name, conf))plugin} catch {// 省略}}

到这里流程就分析结束了

相关文章:

Gravitino源码分析-SparkConnector 实现原理

Gravitino SparkConnector 实现原理 本文参考了官网介绍&#xff0c;想看官方解析请参考 官网地址 本文仅仅介绍原理 文章目录 Gravitino SparkConnector 实现原理背景知识-Spark Plugin 介绍(1) **插件加载**(2) **DriverPlugin 初始化**(3) **ExecutorPlugin 初始化**(4) *…...

windows下使用msys2编译ffmpeg

三种方法&#xff1a; 1、在msys2中使用gcc编译 2、在msys2中使用visual studio编译&#xff08;有环境变量&#xff09; 3、在msys2中使用visual studio编译&#xff08;无环境变量&#xff09; 我的环境&#xff1a; 1、msys2-x86_64-20250221 2、vs2015 3、ffmpeg-7.1…...

Linux内核自定义协议族开发指南:理解net_device_ops、proto_ops与net_proto_family

在Linux内核中开发自定义协议族需要深入理解网络协议栈的分层模型。net_device_ops、proto_ops和net_proto_family是三个关键结构体,分别作用于不同的层次。本文将详细解析它们的作用、交互关系及实现方法,并提供一个完整的开发框架。 一、核心结构体的作用与层级关系 struct…...

可视化+图解链表

链表&#xff08;Linked list&#xff09;是一种常用的数据结构&#xff0c;它由一系列节点组成&#xff0c;每个节点包含数据域和指针域。指针域存储了下一个节点的地址&#xff0c;从而建立起各节点之间的线性关系。 1、链表节点 1.1 节点构成 链表节点如下图所示&#xff…...

Docker参数,以及仓库搭建

一。Docker的构建参数 注释&#xff1a; 1.对于CMD&#xff0c;如果不想显示&#xff0c;而是使用交互界面&#xff1a;docker run -ti --rm --name test2 busybox:v5 sh 2.对于CMD&#xff0c;一个交互界面只可以使用一个&#xff0c;如果想多次使用CMD&#xff0c;则用ENTR…...

正十七边形尺规作图证明——从高斯的发现到几何实现

正十七边形尺规作图证明——从高斯的发现到几何实现 1. 引言&#xff1a;一个历史性的数学突破 在欧几里得几何中&#xff0c;尺规作图&#xff08;仅使用直尺和圆规&#xff09;是最为基础的几何构造方法。古希腊数学家已知如何构造正三角形、正方形和正五边形&#xff0c;但…...

常见Web应用源码泄露问题

文章目录 前言一、常见的源码泄露漏洞git源码泄露SVN源码泄露DS_Store文件泄漏网站备份压缩文件泄露WEB-INF/web.xml泄露CVS泄露.hg源码泄露Bazaar/bzr泄露.swp文件泄露 前言 在Web应用方面对于安全来说&#xff0c;可能大家对SQL注入、XSS跨站脚本攻击、文件上传等一些漏洞已…...

如何使用 Python+Flask+win32print 实现简易网络打印服务1

Python 实现网络打印机&#xff1a;Flask win32print 在工作场景中&#xff0c;我们可能需要一个简单的网页接口&#xff0c;供他人上传文档并自动打印到指定打印机。 本文将演示如何使用 Python Flask win32print 库来实现这一需求。 代码详见&#xff1a;https://github.…...

使用Modelsim手动仿真

FPGA设计流程 在设计输入之后,设计综合前进行 RTL 级仿真,称为综合前仿真,也称为前仿真或 功能仿真。前仿真也就是纯粹的功能仿真,主旨在于验证电路的功能是否符合设计要求,其特点是不考虑电路门延迟与线延迟。在完成一个设计的代码编写工作之后,可以直接对代码进行仿真,…...

DeepSeek、Grok与ChatGPT:AI三巨头的技术博弈与场景革命

## 引言&#xff1a;AI工具的三国杀时代 2025年的AI江湖&#xff0c;DeepSeek以黑马之姿横扫全球应用榜单&#xff0c;Grok 3凭借马斯克的狂言抢占头条&#xff0c;ChatGPT则稳坐行业王座。这场技术竞赛不仅是参数量的比拼&#xff0c;更是一场关于效率、成本与场景适配的终极…...

windows自动锁屏,并且要输入密码。如何取消?

Windows 电脑自动锁屏并需要输入密码&#xff0c;通常是因为系统的 电源和睡眠设置 或 组策略 设定了自动锁屏。你可以按照以下方法取消自动锁屏或去掉密码要求&#xff1a; 方法 1&#xff1a;修改 Windows 设置 取消锁屏时间 Win I 打开 设置&#xff0c;进入 系统 → 电源和…...

Redis 主从复制、哨兵与集群的关系及工作原理详解

一、核心概念与关系 Redis 的 主从复制、哨兵&#xff08;Sentinel&#xff09; 和 集群&#xff08;Cluster&#xff09; 是逐步演进的高可用与分布式解决方案&#xff0c;三者关系如下&#xff1a; 主从复制&#xff1a;数据冗余与读写分离的基础。 哨兵&#xff1a;在主从…...

XSD 对 XML 数据格式验证 java

xsd文件&#xff0c;文件名bean.xsd&#xff0c;放在当前java文件目录下 <?xml version"1.0" encoding"UTF-8"?> <xs:schema xmlns:xs"http://www.w3.org/2001/XMLSchema"><xs:element name"bean"><xs:comple…...

利用 ArcGIS Pro 快速统计省域各市道路长度的实操指南

在地理信息分析与处理的工作中&#xff0c;ArcGIS Pro 是一款功能强大的 GIS 软件&#xff0c;它能够帮助我们高效地完成各种复杂的空间数据分析任务。 现在&#xff0c;就让我们一起深入学习如何借助 ArcGIS Pro 来统计省下面各市的道路长度&#xff0c;这一技能在城市规划、…...

1.4 单元测试与热部署

本次实战实现Spring Boot的单元测试与热部署功能。单元测试方面&#xff0c;通过JUnit和Mockito等工具&#xff0c;结合SpringBootTest注解&#xff0c;可以模拟真实环境对应用组件进行独立测试&#xff0c;验证逻辑正确性&#xff0c;提升代码质量。具体演示了HelloWorld01和H…...

蓝桥杯备考:六级词汇积累(day5)

dense 稠密的 condense 压缩 compassion 同情&#xff0c;怜悯 compact 紧凑的&#xff0c;紧密的 resent 愤恨 sober 清醒的 sole 唯一的&#xff0c;独占的 solely only solemn 表情严肃的&#xff0c;庄重的 stun 使昏迷 Stunned by the impact, he lay on the ground won…...

掌握Kubernetes Network Policy,构建安全的容器网络

在 Kubernetes 集群中&#xff0c;默认情况下&#xff0c;所有 Pod 之间都是可以相互通信的&#xff0c;这在某些场景下可能会带来安全隐患。为了实现更精细的网络访问控制&#xff0c;Kubernetes 提供了 Network Policy 机制。Network Policy 允许我们定义一组规则&#xff0c…...

结合rpart包的决策树介绍

决策树与CART算法 决策树是一种基于树状结构的监督学习算法。它通过从根节点开始递归地对特征进行划分&#xff0c;构建出一棵树来进行决策。决策树的构建过程需要解决的重要问题有三个&#xff1a;如何选择自变量、如何选择分割点、确定停止划分的条件。解决这些问题是希望随…...

VScode代码格式化插件black失效问题

之前有如下提示&#xff1a; 没太当回事&#xff0c;发现还能用。之后突然就用不了了&#xff0c;跟着官方插件的文档来查看log&#xff1a; 查看发现提示&#xff1a; Message: TypeError: type object is not subscriptable 在github界面找到解决方案&#xff1a;安装Versio…...

【经验分享】Ubuntu20.04编译RK3568 AI模型报错问题(已解决)

【经验分享】Ubuntu20.04编译RK3568 AI模型报错问题&#xff08;已解决&#xff09; 前言问题现象问题分析解决方案总结 前言 这里使用的是Rockchip提供的rknn_model_zoo&#xff0c;https://github.com/airockchip/rknn_model_zoo/tree/main 此解决方案适用于Rockchip芯片在U…...

AI革命先锋:DeepSeek与蓝耘通义万相2.1的无缝融合引领行业智能化变革

云边有个稻草人-CSDN博客 目录 引言 一、什么是DeepSeek&#xff1f; 1.1 DeepSeek平台概述 1.2 DeepSeek的核心功能与技术 二、蓝耘通义万相2.1概述 2.1 蓝耘科技简介 2.2 蓝耘通义万相2.1的功能与优势 1. 全链条智能化解决方案 2. 强大的数据处理能力 3. 高效的模型…...

基于SpringBoot实现旅游酒店平台功能一

一、前言介绍&#xff1a; 1.1 项目摘要 随着社会的快速发展和人民生活水平的不断提高&#xff0c;旅游已经成为人们休闲娱乐的重要方式之一。人们越来越注重生活的品质和精神文化的追求&#xff0c;旅游需求呈现出爆发式增长。这种增长不仅体现在旅游人数的增加上&#xff0…...

轻松上手 —— 通过 RPM 包快速部署 NebulaGraph

前言 在当今大数据时代&#xff0c;处理复杂关系数据的需求与日俱增&#xff0c;图数据库应运而生并逐渐崭露头角。NebulaGraph 作为一款高性能、分布式且易扩展的图数据库&#xff0c;专为应对大规模图数据处理而精心打造。它不仅具备丰富的查询语言&#xff0c;还拥有强大高效…...

每日一题——接雨水

接雨水问题详解 视频学习推荐 建议先参考以下视频进行学习&#xff1a; 问题描述 给定一个非负整数数组 height&#xff0c;表示每个宽度为 1 的柱子的高度图。计算按此排列的柱子&#xff0c;下雨之后能接多少雨水。 示例 示例 1&#xff1a; 输入&#xff1a;height …...

MetaGPT发布的MGX与Devin深度对比

家人们&#xff0c;搞编程的都知道&#xff0c;工具选对了&#xff0c;效率能翻倍&#xff01;今天必须给大伙唠唠MetaGPT发布的MGX编程助手和Devin编程助手 。 先看MGX&#xff0c;简直是编程界的王炸&#xff01;它就像一个超神的虚拟开发团队&#xff0c;一堆智能助手分工明…...

网络安全技术整体架构 一个中心三重防护

网络安全技术整体架构&#xff1a;一个中心三重防护 在信息技术飞速发展的今天&#xff0c;网络安全的重要性日益凸显。为了保护信息系统不受各种安全威胁的侵害&#xff0c;网络安全技术整体架构应运而生。本文将详细介绍“一个中心三重防护”的概念&#xff0c;并结合代码示…...

03.06 QT

一、使用QSlider设计一个进度条&#xff0c;并让其通过线程自己动起来 程序代码&#xff1a; <1> Widget.h: #ifndef WIDGET_H #define WIDGET_H#include <QWidget> #include <QThread> #include "mythread.h"QT_BEGIN_NAMESPACE namespace Ui {…...

【YOLOv12改进trick】多尺度大核注意力机制MLKA模块引入YOLOv12,实现多尺度目标检测涨点,含创新点Python代码,方便发论文

🍋改进模块🍋:多尺度大核注意力机制(MLKA) 🍋解决问题🍋:MLKA模块结合多尺度、门控机制和空间注意力,显著增强卷积网络的模型表示能力。 🍋改进优势🍋:超分辨的MLKA模块对小目标和模糊目标涨点很明显 🍋适用场景🍋:小目标检测、模糊目标检测等 🍋思路…...

SpringUI:打造高质量Web交互设计的首选元件库

SpringUI作为一个专为Web设计与开发领域打造的高质量交互元件库&#xff0c;确实为设计师和开发者提供了极大的便利。以下是对SpringUI及其提供的各类元件的详细解读和一些建议&#xff1a; SpringUI概述 SpringUI集合了一系列预制的、高质量的交互组件&#xff0c;旨在帮助设…...

鸿蒙Android4个脚有脚线

效果 min:number122max:number150Row(){Stack(){// 底Text().border({width:2,color:$r(app.color.yellow)}).height(this.max).aspectRatio(1)// 长Text().backgroundColor($r(app.color.white)).height(this.max).width(this.min)// 宽Text().backgroundColor($r(app.color.w…...