当前位置: 首页 > news >正文

Gravitino源码分析-SparkConnector 实现原理

Gravitino SparkConnector 实现原理

本文参考了官网介绍,想看官方解析请参考 官网地址 本文仅仅介绍原理

文章目录

  • Gravitino SparkConnector 实现原理
    • 背景知识-Spark Plugin 介绍
        • (1) **插件加载**
        • (2) **DriverPlugin 初始化**
        • (3) **ExecutorPlugin 初始化**
        • (4) **插件执行**
        • (5) **插件销毁**
    • 背景知识-Driver Plugin 介绍
        • (1) **`init` 方法**
        • (2) **`registerMetrics` 方法**
        • (3) **`onTaskStart` 方法**
        • (4) **`onTaskSucceeded` 方法**
        • (5) **`onTaskFailed` 方法**
        • (6) **`close` 方法**
    • SparkConnector使用方式
      • 加载spark.sql.catalog.xxx 具体执行的配置

背景知识-Spark Plugin 介绍

spark在[spark-29399]pr提交更新了SparkPlugin插件

SparkPlugin插件执行生命周期

SparkPlugin 的生命周期与 Spark 应用程序的生命周期一致,具体如下:

(1) 插件加载
  • 当 Spark 应用程序启动时,Spark 会扫描类路径下的 SparkPlugin 实现类。
  • 如果插件被正确配置(例如通过 spark.plugins 配置项),Spark 会实例化该类。
(2) DriverPlugin 初始化
  • Spark 调用 driverPlugin() 方法,获取 DriverPlugin 实例。
  • DriverPlugin 的生命周期开始,其方法(如 initregisterMetrics 等)会被调用。
(3) ExecutorPlugin 初始化
  • Spark 调用 executorPlugin() 方法,获取 ExecutorPlugin 实例。
  • ExecutorPlugin 的生命周期开始,其方法(如 initshutdown 等)会被调用。
(4) 插件执行
  • DriverPlugin 在 Driver 端执行自定义逻辑,例如注册指标、拦截 SQL 解析、修改 Catalog 等。
  • ExecutorPlugin 在 Executor 端执行自定义逻辑,例如监控 Task 执行、收集指标等。
(5) 插件销毁
  • 当 Spark 应用程序结束时,DriverPluginExecutorPlugin 的生命周期结束,其 close() 方法会被调用以释放资源。

背景知识-Driver Plugin 介绍

DriverPlugin 是用于在 Driver 端执行自定义逻辑的插件,其生命周期方法包括:

(1) init 方法
  • 在 Driver 插件初始化时调用。
  • 可以在此方法中执行初始化逻辑,例如注册自定义 Catalog、拦截 SQL 解析器等。
(2) registerMetrics 方法
  • 在 Driver 插件初始化时调用。
  • 可以在此方法中注册自定义指标(Metrics)。
(3) onTaskStart 方法
  • 在 Task 启动时调用。
  • 可以在此方法中执行与 Task 相关的逻辑。
(4) onTaskSucceeded 方法
  • 在 Task 成功完成时调用。
  • 可以在此方法中执行与 Task 成功相关的逻辑。
(5) onTaskFailed 方法
  • 在 Task 失败时调用。
  • 可以在此方法中执行与 Task 失败相关的逻辑。
(6) close 方法
  • 在 Driver 插件销毁时调用。
  • 可以在此方法中释放资源,例如关闭连接、清理缓存等。

SparkConnector使用方式

./bin/spark-sql -v \
--conf spark.plugins="org.apache.gravitino.spark.connector.plugin.GravitinoSparkPlugin" \
--conf spark.sql.gravitino.uri=http://127.0.0.1:8090 \
--conf spark.sql.gravitino.metalake=test \
--conf spark.sql.gravitino.enableIcebergSupport=true \
--conf spark.sql.warehouse.dir=hdfs://127.0.0.1:9000/user/hive/warehouse-hive

可以看出SparkConnector指定了加载的插件是GravitinoSparkPlugin

public class GravitinoSparkPlugin implements SparkPlugin {@Overridepublic DriverPlugin driverPlugin() {return new GravitinoDriverPlugin();}@Overridepublic ExecutorPlugin executorPlugin() {return null;}
}

可以看出实现方式很简单,仅仅使用了一个GravitinoDriverPlugin,也就是在Spark应用程序启动的时候扫描SparkPlugin扫描到了这个GravitinoSparkPlugin然后立马就去执行GravitinoDriverPlugin初始化程序。在DriverPlugin初始化过程中 插件仅仅覆写了两个函数,init()shutdown()。 说明这个插件仅仅做了一些初始化和资源销毁操作。

在Driver端进行初始化

  1. 配置检查检查gravitino_uri和gravitino_metalake是否配置

  2. 如果开启了iceberg则将gravitinoDriverExtensions放入到数组中方便配置

  3. 初始化Gravtino客户端和GravitinoCatalogManager,并且将relational类型的表加载到缓存中

  4. 将缓存中的catalog进行如果是非iceberg类型(当前仅仅只有Hive)进行注册,这里定义的注册的实际操作配置Spark的配置项(spark.sql.catalog.catalogName)这里的catalogName对应的是缓存中的catalogName,配置的值为根据Gravitino自己的Catalog使用的Provider进行适配比如可以是(org.apache.gravitino.spark.connector.hive.GravitinoHiveCatalogSpark33或者org.apache.gravitino.spark.connector.iceberg.GravitinoIcebergCatalogSpark33)具体情况由适配器进行处理。

  5. 然后注册SqlExtensions其实就是将第2步骤的数组配置到SPARK_SESSION_EXTENSIONS这个SparkConf配置里面

稍微贴一下注册Catalog代码,比较重要

  //初始化的时候调用注册逻辑,将Gravitino中的Catalog加载到缓存//然后将缓存中的数据作为第二个参数gravitinoCatalogs传递进来private void registerGravitinoCatalogs(SparkConf sparkConf, Map<String, Catalog> gravitinoCatalogs) {gravitinoCatalogs.entrySet().forEach(entry -> {String catalogName = entry.getKey();Catalog gravitinoCatalog = entry.getValue();String provider = gravitinoCatalog.provider();if ("lakehouse-iceberg".equals(provider.toLowerCase(Locale.ROOT))&& enableIcebergSupport == false) {return;}try {registerCatalog(sparkConf, catalogName, provider);} catch (Exception e) {LOG.warn("Register catalog {} failed.", catalogName, e);}});}//这里根据适配器去配置spark.sql.catalog.xxx 的具体执行CatalogClassprivate void registerCatalog(SparkConf sparkConf, String catalogName, String provider) {if (StringUtils.isBlank(provider)) {LOG.warn("Skip registering {} because catalog provider is empty.", catalogName);return;}String catalogClassName = CatalogNameAdaptor.getCatalogName(provider);if (StringUtils.isBlank(catalogClassName)) {LOG.warn("Skip registering {} because {} is not supported yet.", catalogName, provider);return;}String sparkCatalogConfigName = "spark.sql.catalog." + catalogName;Preconditions.checkArgument(!sparkConf.contains(sparkCatalogConfigName),catalogName + " is already registered to SparkCatalogManager");sparkConf.set(sparkCatalogConfigName, catalogClassName);LOG.info("Register {} catalog to Spark catalog manager.", catalogName);}

到这里GravitinoConnector的代码机制已经说完了,下面聊聊Spark机制

加载spark.sql.catalog.xxx 具体执行的配置

经过上面GravitinoDriverPlugin的初始化之后,已经将具体的catalog名称和对应的处理类映射起来,这里以GravitinoHiveCatalogSpark33为例。

GravitinoHiveCatalogSpark33这个类继承关系是继承了BaseCatalogBaseCatalog是Spark中定义的CatalogPlugin的一个实现类。

Spark在解析SQL的时候会查找catalog对应的Catalog,可以看到调用了CatalogManager.catalog()方法

  private object CatalogAndMultipartIdentifier {def unapply(parts: Seq[String]): Some[(Option[CatalogPlugin], Seq[String])] = parts match {case Seq(_) =>Some((None, parts))case Seq(catalogName, tail @ _*) =>try {Some((Some(catalogManager.catalog(catalogName)), tail))} catch {case _: CatalogNotFoundException =>Some((None, parts))}}}

这个catalog方法调用了Catalogs.load()方法

  def catalog(name: String): CatalogPlugin = synchronized {if (name.equalsIgnoreCase(SESSION_CATALOG_NAME)) {v2SessionCatalog} else {catalogs.getOrElseUpdate(name, Catalogs.load(name, conf))}}

这个方法才是真正的加载方法,他真正根据conf配置将GravitinoHiveCatalogSpark33名称根据定义的反射构造函数实例化到内存中

   def load(name: String, conf: SQLConf): CatalogPlugin = {val pluginClassName = try {val _pluginClassName = conf.getConfString(s"spark.sql.catalog.$name")// SPARK-39079 do configuration check first, otherwise some path-based table like// `org.apache.spark.sql.json`.`/path/json_file` may fail on analyze phaseif (name.contains(".")) {throw QueryExecutionErrors.invalidCatalogNameError(name)}_pluginClassName} catch {case _: NoSuchElementException =>throw QueryExecutionErrors.catalogPluginClassNotFoundError(name)}val loader = Utils.getContextOrSparkClassLoadertry {val pluginClass = loader.loadClass(pluginClassName)if (!classOf[CatalogPlugin].isAssignableFrom(pluginClass)) {throw QueryExecutionErrors.catalogPluginClassNotImplementedError(name, pluginClassName)}val plugin = pluginClass.getDeclaredConstructor().newInstance().asInstanceOf[CatalogPlugin]plugin.initialize(name, catalogOptions(name, conf))plugin} catch {// 省略}}

到这里流程就分析结束了

相关文章:

Gravitino源码分析-SparkConnector 实现原理

Gravitino SparkConnector 实现原理 本文参考了官网介绍&#xff0c;想看官方解析请参考 官网地址 本文仅仅介绍原理 文章目录 Gravitino SparkConnector 实现原理背景知识-Spark Plugin 介绍(1) **插件加载**(2) **DriverPlugin 初始化**(3) **ExecutorPlugin 初始化**(4) *…...

react基本功

useLayoutEffect useLayoutEffect 用于在浏览器重新绘制屏幕之前同步执行代码。它与 useEffect 相同,但执行时机不同。 主要特点 执行时机:useLayoutEffect 在 DOM 更新完成后同步执行,但在浏览器绘制之前。这使得它可以在浏览器渲染之前读取和修改 DOM,避免视觉上的闪烁…...

python-leetcode-解决智力问题

2140. 解决智力问题 - 力扣&#xff08;LeetCode&#xff09; 这道题是一个典型的 动态规划&#xff08;Dynamic Programming, DP&#xff09; 问题&#xff0c;可以使用 自底向上 的方式解决。 思路 定义状态&#xff1a; 设 dp[i] 表示从第 i 题开始&#xff0c;能获得的最高…...

引领变革!北京爱悦诗科技有限公司荣获“GAS消费电子科创奖-产品创新奖”!

在2025年“GAS消费电子科创奖”评选中&#xff0c;北京爱悦诗科技有限公司提交的“aigo爱国者GS06”&#xff0c;在技术创新性、设计创新性、工艺创新性、智能化创新性及原创性五大维度均获得评委的高度认可&#xff0c;荣获“产品创新奖”。 这一奖项不仅是对爱悦诗在消费电子…...

微信小程序+SpringBoot的单词学习小程序平台(程序+论文+讲解+安装+修改+售后)

感兴趣的可以先收藏起来&#xff0c;还有大家在毕设选题&#xff0c;项目以及论文编写等相关问题都可以给我留言咨询&#xff0c;我会一一回复&#xff0c;希望帮助更多的人。 系统背景 &#xff08;一&#xff09;社会需求背景 在全球化的大背景下&#xff0c;英语作为国际…...

wordpress分类名称调用的几种情况

在WordPress中&#xff0c;如果你想调用当前分类的名称&#xff0c;可以使用single_cat_title()函数。以下是一些常见的使用方法和场景&#xff1a; 1. 在分类页面调用当前分类名称 如果你正在分类存档页面(category.php)中&#xff0c;可以直接使用single_cat_title()函数来…...

HMC7043和HMC7044芯片配置使用

一,HMC7043芯片 MC7043独特的特性是对14个通道分别进行独立灵活的相位管理。所有14个通道均支持频率和相位调整。这些输出还可针对50 Ω或100 Ω内部和外部端接选项进行编程。HMC7043器件具有RF SYNC功能,支持确定性同步多个HMC7043器件,即确保所有时钟输出从同一时钟沿开始…...

html播放本地音乐

本地有多个音乐文件&#xff0c;想用 html 逐个播放&#xff0c;或循环播放&#xff0c;并设置初始音量。 audio 在 html 中播放音乐文件用 audio 标签&#xff1a; controls 启用控制按钮&#xff0c;如进度条、播放、音量、速度等。不加不显示任何 widget。autoplay 理应启…...

Windows11下玩转 Docker

一、前提准备 WSL2&#xff1a;Windows 提供的一种轻量级 Linux 运行环境&#xff0c;具备完整的 Linux 内核&#xff0c;并支持更好的文件系统性能和兼容性。它允许用户在 Windows 系统中运行 Linux 命令行工具和应用程序&#xff0c;而无需安装虚拟机或双系统。Ubuntu 1.1 安…...

vLLM + Open-WebUI 本地私有化部署 DeepSeek-R1-Distill-Qwen-32B 方案

一、vLLM 部署 DeepSeek-R1-Distill-Qwen-32B DeepSeek-R1-Distill 系列模型是 DeepSeek-R1 的蒸馏模型&#xff0c;官方提供了从 1.5B - 70B 不同尺寸大小的模型。特别适合在计算资源有限的环境中部署。 DeepSeek-R1 各个版本的蒸馏模型评估结果如下&#xff1a; 其中 DeepS…...

【基础知识】回头看Maven基础

背景 项目过程中&#xff0c;对于Maven的pom.xml文件&#xff0c;很多时候&#xff0c;我通过各种参考、仿写&#xff0c;最终做出想要的效果。 但实际心里有些迷糊&#xff0c;不清楚具体哪个基础的配置所实现的效果。 今天&#xff0c;特意回过头来&#xff0c;了解Maven的基…...

在 MyBatis 中,若数据库字段名与 SQL 保留字冲突解决办法

在 MyBatis 中&#xff0c;若数据库字段名与 SQL 保留字冲突&#xff0c;可通过以下方法解决&#xff1a; 目录 一、使用转义符号包裹字段名二、通过别名映射三、借助 MyBatis-Plus 注解四、全局配置策略&#xff08;辅助方案&#xff09;最佳实践与注意事项 一、使用转义符号…...

docker-compose Install reranker(fastgpt支持) GPU模式

前言BGE-重新排名器 与 embedding 模型不同,reranker 或 cross-encoder 使用 question 和 document 作为输入,直接输出相似性而不是 embedding。 为了平衡准确性和时间成本,cross-encoder 被广泛用于对其他简单模型检索到的前 k 个文档进行重新排序。 例如,使用 bge 嵌入模…...

200W数据需要去重,如何优化?

优化去重逻辑的时间取决于多个因素&#xff0c;包括数据量、数据结构、硬件性能&#xff08;CPU、内存&#xff09;、去重算法的实现方式等。以下是对优化去重逻辑的详细分析和预期优化效果&#xff1a; 1. 去重逻辑的性能瓶颈 时间复杂度&#xff1a;使用HashSet去重的时间复…...

使用免费IP数据库离线查询IP归属地

一、准备工作 1.下载免费IP数据库 首先&#xff0c;访问 MaxMind官网&#xff08;https://www.maxmind.com/en/home&#xff09;如果你还没有MaxMind账号&#xff0c;可以通过此链接地址&#xff08;https://www.maxmind.com/en/geolite2/signup&#xff09;进行账号注册&…...

【游戏】【客户端性能测试】

待续… 一、 常见指标 1. 越高越好 FPS 2. 越低越好 网络流量CPU内存&#xff08;PSS&#xff0c; momo&#xff09;Drawcalls三角形数耗电量包体大小 二、 游戏体验 1. 直接体感 游戏花屏闪退卡顿延迟 2. 可能原因 内存超标Drawcall数量多FPS波动严重CPU占用高居不下…...

软考中级-数据库-3.3 数据结构-树

定义:树是n(n>=0)个结点的有限集合。当n=0时称为空树。在任一非空树中,有且仅有一个称为根的结点:其余结点可分为m(m>=0)个互不相交的有限集T1,T2,T3...,Tm…,其中每个集合又都是一棵树,并且称为根结点的子树。 树的相关概念 1、双亲、孩子和兄弟: 2、结点的度:一个结…...

typora高亮方案+鼠标侧键一键改色

引言 在typora里面有一个自定义的高亮, <mark></mark>>但是单一颜色就太难看了, 我使用人工智能, 搜索全网艺术家, 汇集了几种好看的格式,并且方便大家侧键一键 调用, 是不是太方便啦 ! 示例 午夜模式 春意盎然 深海蓝调 石墨文档 秋日暖阳 蜜桃宣言 使用方法 …...

【CSS】Tailwind CSS 与传统 CSS:设计理念与使用场景对比

1. 开发方式 1.1 传统 CSS 手写 CSS&#xff1a;你需要手动编写 CSS 规则&#xff0c;定义类名、ID 或元素选择器&#xff0c;并为每个元素编写样式。 分离式开发&#xff1a;HTML 和 CSS 通常是分离的&#xff0c;HTML 中通过类名或 ID 引用 CSS 文件中的样式。 示例&#…...

Linux(Centos 7.6)命令详解:vim

1.命令作用 vi/vim 是Linux 系统内置不可或缺的文本编辑命令&#xff0c;vim 是vi 的加强版本&#xff0c;兼容vi 的所有指令&#xff0c;不仅能编辑文本&#xff0c;而且还具有shell 程序编辑的功能&#xff0c;可以不同颜色的字体来辨别语法的正确性。 2.命令语法 usage: …...

多云管理“拦路虎”:深入解析网络互联、身份同步与成本可视化的技术复杂度​

一、引言&#xff1a;多云环境的技术复杂性本质​​ 企业采用多云策略已从技术选型升维至生存刚需。当业务系统分散部署在多个云平台时&#xff0c;​​基础设施的技术债呈现指数级积累​​。网络连接、身份认证、成本管理这三大核心挑战相互嵌套&#xff1a;跨云网络构建数据…...

C++实现分布式网络通信框架RPC(3)--rpc调用端

目录 一、前言 二、UserServiceRpc_Stub 三、 CallMethod方法的重写 头文件 实现 四、rpc调用端的调用 实现 五、 google::protobuf::RpcController *controller 头文件 实现 六、总结 一、前言 在前边的文章中&#xff0c;我们已经大致实现了rpc服务端的各项功能代…...

ES6从入门到精通:前言

ES6简介 ES6&#xff08;ECMAScript 2015&#xff09;是JavaScript语言的重大更新&#xff0c;引入了许多新特性&#xff0c;包括语法糖、新数据类型、模块化支持等&#xff0c;显著提升了开发效率和代码可维护性。 核心知识点概览 变量声明 let 和 const 取代 var&#xf…...

Vue3 + Element Plus + TypeScript中el-transfer穿梭框组件使用详解及示例

使用详解 Element Plus 的 el-transfer 组件是一个强大的穿梭框组件&#xff0c;常用于在两个集合之间进行数据转移&#xff0c;如权限分配、数据选择等场景。下面我将详细介绍其用法并提供一个完整示例。 核心特性与用法 基本属性 v-model&#xff1a;绑定右侧列表的值&…...

QMC5883L的驱动

简介 本篇文章的代码已经上传到了github上面&#xff0c;开源代码 作为一个电子罗盘模块&#xff0c;我们可以通过I2C从中获取偏航角yaw&#xff0c;相对于六轴陀螺仪的yaw&#xff0c;qmc5883l几乎不会零飘并且成本较低。 参考资料 QMC5883L磁场传感器驱动 QMC5883L磁力计…...

UE5 学习系列(三)创建和移动物体

这篇博客是该系列的第三篇&#xff0c;是在之前两篇博客的基础上展开&#xff0c;主要介绍如何在操作界面中创建和拖动物体&#xff0c;这篇博客跟随的视频链接如下&#xff1a; B 站视频&#xff1a;s03-创建和移动物体 如果你不打算开之前的博客并且对UE5 比较熟的话按照以…...

CentOS下的分布式内存计算Spark环境部署

一、Spark 核心架构与应用场景 1.1 分布式计算引擎的核心优势 Spark 是基于内存的分布式计算框架&#xff0c;相比 MapReduce 具有以下核心优势&#xff1a; 内存计算&#xff1a;数据可常驻内存&#xff0c;迭代计算性能提升 10-100 倍&#xff08;文档段落&#xff1a;3-79…...

MODBUS TCP转CANopen 技术赋能高效协同作业

在现代工业自动化领域&#xff0c;MODBUS TCP和CANopen两种通讯协议因其稳定性和高效性被广泛应用于各种设备和系统中。而随着科技的不断进步&#xff0c;这两种通讯协议也正在被逐步融合&#xff0c;形成了一种新型的通讯方式——开疆智能MODBUS TCP转CANopen网关KJ-TCPC-CANP…...

Hive 存储格式深度解析:从 TextFile 到 ORC,如何选对数据存储方案?

在大数据处理领域&#xff0c;Hive 作为 Hadoop 生态中重要的数据仓库工具&#xff0c;其存储格式的选择直接影响数据存储成本、查询效率和计算资源消耗。面对 TextFile、SequenceFile、Parquet、RCFile、ORC 等多种存储格式&#xff0c;很多开发者常常陷入选择困境。本文将从底…...

基于Java+MySQL实现(GUI)客户管理系统

客户资料管理系统的设计与实现 第一章 需求分析 1.1 需求总体介绍 本项目为了方便维护客户信息为了方便维护客户信息&#xff0c;对客户进行统一管理&#xff0c;可以把所有客户信息录入系统&#xff0c;进行维护和统计功能。可通过文件的方式保存相关录入数据&#xff0c;对…...