Gravitino源码分析-SparkConnector 实现原理
Gravitino SparkConnector 实现原理
本文参考了官网介绍,想看官方解析请参考 官网地址 本文仅仅介绍原理
文章目录
- Gravitino SparkConnector 实现原理
- 背景知识-Spark Plugin 介绍
- (1) **插件加载**
- (2) **DriverPlugin 初始化**
- (3) **ExecutorPlugin 初始化**
- (4) **插件执行**
- (5) **插件销毁**
- 背景知识-Driver Plugin 介绍
- (1) **`init` 方法**
- (2) **`registerMetrics` 方法**
- (3) **`onTaskStart` 方法**
- (4) **`onTaskSucceeded` 方法**
- (5) **`onTaskFailed` 方法**
- (6) **`close` 方法**
- SparkConnector使用方式
- 加载spark.sql.catalog.xxx 具体执行的配置
背景知识-Spark Plugin 介绍
spark在[spark-29399]pr提交更新了SparkPlugin插件
SparkPlugin插件执行生命周期
SparkPlugin 的生命周期与 Spark 应用程序的生命周期一致,具体如下:
(1) 插件加载
- 当 Spark 应用程序启动时,Spark 会扫描类路径下的
SparkPlugin实现类。 - 如果插件被正确配置(例如通过
spark.plugins配置项),Spark 会实例化该类。
(2) DriverPlugin 初始化
- Spark 调用
driverPlugin()方法,获取DriverPlugin实例。 DriverPlugin的生命周期开始,其方法(如init、registerMetrics等)会被调用。
(3) ExecutorPlugin 初始化
- Spark 调用
executorPlugin()方法,获取ExecutorPlugin实例。 ExecutorPlugin的生命周期开始,其方法(如init、shutdown等)会被调用。
(4) 插件执行
DriverPlugin在 Driver 端执行自定义逻辑,例如注册指标、拦截 SQL 解析、修改 Catalog 等。ExecutorPlugin在 Executor 端执行自定义逻辑,例如监控 Task 执行、收集指标等。
(5) 插件销毁
- 当 Spark 应用程序结束时,
DriverPlugin和ExecutorPlugin的生命周期结束,其close()方法会被调用以释放资源。
背景知识-Driver Plugin 介绍
DriverPlugin 是用于在 Driver 端执行自定义逻辑的插件,其生命周期方法包括:
(1) init 方法
- 在 Driver 插件初始化时调用。
- 可以在此方法中执行初始化逻辑,例如注册自定义 Catalog、拦截 SQL 解析器等。
(2) registerMetrics 方法
- 在 Driver 插件初始化时调用。
- 可以在此方法中注册自定义指标(Metrics)。
(3) onTaskStart 方法
- 在 Task 启动时调用。
- 可以在此方法中执行与 Task 相关的逻辑。
(4) onTaskSucceeded 方法
- 在 Task 成功完成时调用。
- 可以在此方法中执行与 Task 成功相关的逻辑。
(5) onTaskFailed 方法
- 在 Task 失败时调用。
- 可以在此方法中执行与 Task 失败相关的逻辑。
(6) close 方法
- 在 Driver 插件销毁时调用。
- 可以在此方法中释放资源,例如关闭连接、清理缓存等。
SparkConnector使用方式
./bin/spark-sql -v \
--conf spark.plugins="org.apache.gravitino.spark.connector.plugin.GravitinoSparkPlugin" \
--conf spark.sql.gravitino.uri=http://127.0.0.1:8090 \
--conf spark.sql.gravitino.metalake=test \
--conf spark.sql.gravitino.enableIcebergSupport=true \
--conf spark.sql.warehouse.dir=hdfs://127.0.0.1:9000/user/hive/warehouse-hive
可以看出SparkConnector指定了加载的插件是GravitinoSparkPlugin
public class GravitinoSparkPlugin implements SparkPlugin {@Overridepublic DriverPlugin driverPlugin() {return new GravitinoDriverPlugin();}@Overridepublic ExecutorPlugin executorPlugin() {return null;}
}
可以看出实现方式很简单,仅仅使用了一个GravitinoDriverPlugin,也就是在Spark应用程序启动的时候扫描SparkPlugin扫描到了这个GravitinoSparkPlugin然后立马就去执行GravitinoDriverPlugin初始化程序。在DriverPlugin初始化过程中 插件仅仅覆写了两个函数,init() 和shutdown()。 说明这个插件仅仅做了一些初始化和资源销毁操作。
在Driver端进行初始化
-
配置检查检查gravitino_uri和gravitino_metalake是否配置
-
如果开启了iceberg则将gravitinoDriverExtensions放入到数组中方便配置
-
初始化Gravtino客户端和
GravitinoCatalogManager,并且将relational类型的表加载到缓存中 -
将缓存中的catalog进行如果是非iceberg类型(当前仅仅只有Hive)进行注册,这里定义的注册的实际操作配置Spark的配置项(spark.sql.catalog.catalogName)这里的catalogName对应的是缓存中的catalogName,配置的值为根据Gravitino自己的Catalog使用的Provider进行适配比如可以是(
org.apache.gravitino.spark.connector.hive.GravitinoHiveCatalogSpark33或者org.apache.gravitino.spark.connector.iceberg.GravitinoIcebergCatalogSpark33)具体情况由适配器进行处理。 -
然后注册SqlExtensions其实就是将第2步骤的数组配置到
SPARK_SESSION_EXTENSIONS这个SparkConf配置里面
稍微贴一下注册Catalog代码,比较重要
//初始化的时候调用注册逻辑,将Gravitino中的Catalog加载到缓存//然后将缓存中的数据作为第二个参数gravitinoCatalogs传递进来private void registerGravitinoCatalogs(SparkConf sparkConf, Map<String, Catalog> gravitinoCatalogs) {gravitinoCatalogs.entrySet().forEach(entry -> {String catalogName = entry.getKey();Catalog gravitinoCatalog = entry.getValue();String provider = gravitinoCatalog.provider();if ("lakehouse-iceberg".equals(provider.toLowerCase(Locale.ROOT))&& enableIcebergSupport == false) {return;}try {registerCatalog(sparkConf, catalogName, provider);} catch (Exception e) {LOG.warn("Register catalog {} failed.", catalogName, e);}});}//这里根据适配器去配置spark.sql.catalog.xxx 的具体执行CatalogClassprivate void registerCatalog(SparkConf sparkConf, String catalogName, String provider) {if (StringUtils.isBlank(provider)) {LOG.warn("Skip registering {} because catalog provider is empty.", catalogName);return;}String catalogClassName = CatalogNameAdaptor.getCatalogName(provider);if (StringUtils.isBlank(catalogClassName)) {LOG.warn("Skip registering {} because {} is not supported yet.", catalogName, provider);return;}String sparkCatalogConfigName = "spark.sql.catalog." + catalogName;Preconditions.checkArgument(!sparkConf.contains(sparkCatalogConfigName),catalogName + " is already registered to SparkCatalogManager");sparkConf.set(sparkCatalogConfigName, catalogClassName);LOG.info("Register {} catalog to Spark catalog manager.", catalogName);}
到这里GravitinoConnector的代码机制已经说完了,下面聊聊Spark机制
加载spark.sql.catalog.xxx 具体执行的配置
经过上面GravitinoDriverPlugin的初始化之后,已经将具体的catalog名称和对应的处理类映射起来,这里以GravitinoHiveCatalogSpark33为例。
GravitinoHiveCatalogSpark33这个类继承关系是继承了BaseCatalog 而BaseCatalog是Spark中定义的CatalogPlugin的一个实现类。
Spark在解析SQL的时候会查找catalog对应的Catalog,可以看到调用了CatalogManager.catalog()方法
private object CatalogAndMultipartIdentifier {def unapply(parts: Seq[String]): Some[(Option[CatalogPlugin], Seq[String])] = parts match {case Seq(_) =>Some((None, parts))case Seq(catalogName, tail @ _*) =>try {Some((Some(catalogManager.catalog(catalogName)), tail))} catch {case _: CatalogNotFoundException =>Some((None, parts))}}}
这个catalog方法调用了Catalogs.load()方法
def catalog(name: String): CatalogPlugin = synchronized {if (name.equalsIgnoreCase(SESSION_CATALOG_NAME)) {v2SessionCatalog} else {catalogs.getOrElseUpdate(name, Catalogs.load(name, conf))}}
这个方法才是真正的加载方法,他真正根据conf配置将GravitinoHiveCatalogSpark33名称根据定义的反射构造函数实例化到内存中
def load(name: String, conf: SQLConf): CatalogPlugin = {val pluginClassName = try {val _pluginClassName = conf.getConfString(s"spark.sql.catalog.$name")// SPARK-39079 do configuration check first, otherwise some path-based table like// `org.apache.spark.sql.json`.`/path/json_file` may fail on analyze phaseif (name.contains(".")) {throw QueryExecutionErrors.invalidCatalogNameError(name)}_pluginClassName} catch {case _: NoSuchElementException =>throw QueryExecutionErrors.catalogPluginClassNotFoundError(name)}val loader = Utils.getContextOrSparkClassLoadertry {val pluginClass = loader.loadClass(pluginClassName)if (!classOf[CatalogPlugin].isAssignableFrom(pluginClass)) {throw QueryExecutionErrors.catalogPluginClassNotImplementedError(name, pluginClassName)}val plugin = pluginClass.getDeclaredConstructor().newInstance().asInstanceOf[CatalogPlugin]plugin.initialize(name, catalogOptions(name, conf))plugin} catch {// 省略}}
到这里流程就分析结束了
相关文章:
Gravitino源码分析-SparkConnector 实现原理
Gravitino SparkConnector 实现原理 本文参考了官网介绍,想看官方解析请参考 官网地址 本文仅仅介绍原理 文章目录 Gravitino SparkConnector 实现原理背景知识-Spark Plugin 介绍(1) **插件加载**(2) **DriverPlugin 初始化**(3) **ExecutorPlugin 初始化**(4) *…...
windows下使用msys2编译ffmpeg
三种方法: 1、在msys2中使用gcc编译 2、在msys2中使用visual studio编译(有环境变量) 3、在msys2中使用visual studio编译(无环境变量) 我的环境: 1、msys2-x86_64-20250221 2、vs2015 3、ffmpeg-7.1…...
Linux内核自定义协议族开发指南:理解net_device_ops、proto_ops与net_proto_family
在Linux内核中开发自定义协议族需要深入理解网络协议栈的分层模型。net_device_ops、proto_ops和net_proto_family是三个关键结构体,分别作用于不同的层次。本文将详细解析它们的作用、交互关系及实现方法,并提供一个完整的开发框架。 一、核心结构体的作用与层级关系 struct…...
可视化+图解链表
链表(Linked list)是一种常用的数据结构,它由一系列节点组成,每个节点包含数据域和指针域。指针域存储了下一个节点的地址,从而建立起各节点之间的线性关系。 1、链表节点 1.1 节点构成 链表节点如下图所示ÿ…...
Docker参数,以及仓库搭建
一。Docker的构建参数 注释: 1.对于CMD,如果不想显示,而是使用交互界面:docker run -ti --rm --name test2 busybox:v5 sh 2.对于CMD,一个交互界面只可以使用一个,如果想多次使用CMD,则用ENTR…...
正十七边形尺规作图证明——从高斯的发现到几何实现
正十七边形尺规作图证明——从高斯的发现到几何实现 1. 引言:一个历史性的数学突破 在欧几里得几何中,尺规作图(仅使用直尺和圆规)是最为基础的几何构造方法。古希腊数学家已知如何构造正三角形、正方形和正五边形,但…...
常见Web应用源码泄露问题
文章目录 前言一、常见的源码泄露漏洞git源码泄露SVN源码泄露DS_Store文件泄漏网站备份压缩文件泄露WEB-INF/web.xml泄露CVS泄露.hg源码泄露Bazaar/bzr泄露.swp文件泄露 前言 在Web应用方面对于安全来说,可能大家对SQL注入、XSS跨站脚本攻击、文件上传等一些漏洞已…...
如何使用 Python+Flask+win32print 实现简易网络打印服务1
Python 实现网络打印机:Flask win32print 在工作场景中,我们可能需要一个简单的网页接口,供他人上传文档并自动打印到指定打印机。 本文将演示如何使用 Python Flask win32print 库来实现这一需求。 代码详见:https://github.…...
使用Modelsim手动仿真
FPGA设计流程 在设计输入之后,设计综合前进行 RTL 级仿真,称为综合前仿真,也称为前仿真或 功能仿真。前仿真也就是纯粹的功能仿真,主旨在于验证电路的功能是否符合设计要求,其特点是不考虑电路门延迟与线延迟。在完成一个设计的代码编写工作之后,可以直接对代码进行仿真,…...
DeepSeek、Grok与ChatGPT:AI三巨头的技术博弈与场景革命
## 引言:AI工具的三国杀时代 2025年的AI江湖,DeepSeek以黑马之姿横扫全球应用榜单,Grok 3凭借马斯克的狂言抢占头条,ChatGPT则稳坐行业王座。这场技术竞赛不仅是参数量的比拼,更是一场关于效率、成本与场景适配的终极…...
windows自动锁屏,并且要输入密码。如何取消?
Windows 电脑自动锁屏并需要输入密码,通常是因为系统的 电源和睡眠设置 或 组策略 设定了自动锁屏。你可以按照以下方法取消自动锁屏或去掉密码要求: 方法 1:修改 Windows 设置 取消锁屏时间 Win I 打开 设置,进入 系统 → 电源和…...
Redis 主从复制、哨兵与集群的关系及工作原理详解
一、核心概念与关系 Redis 的 主从复制、哨兵(Sentinel) 和 集群(Cluster) 是逐步演进的高可用与分布式解决方案,三者关系如下: 主从复制:数据冗余与读写分离的基础。 哨兵:在主从…...
XSD 对 XML 数据格式验证 java
xsd文件,文件名bean.xsd,放在当前java文件目录下 <?xml version"1.0" encoding"UTF-8"?> <xs:schema xmlns:xs"http://www.w3.org/2001/XMLSchema"><xs:element name"bean"><xs:comple…...
利用 ArcGIS Pro 快速统计省域各市道路长度的实操指南
在地理信息分析与处理的工作中,ArcGIS Pro 是一款功能强大的 GIS 软件,它能够帮助我们高效地完成各种复杂的空间数据分析任务。 现在,就让我们一起深入学习如何借助 ArcGIS Pro 来统计省下面各市的道路长度,这一技能在城市规划、…...
1.4 单元测试与热部署
本次实战实现Spring Boot的单元测试与热部署功能。单元测试方面,通过JUnit和Mockito等工具,结合SpringBootTest注解,可以模拟真实环境对应用组件进行独立测试,验证逻辑正确性,提升代码质量。具体演示了HelloWorld01和H…...
蓝桥杯备考:六级词汇积累(day5)
dense 稠密的 condense 压缩 compassion 同情,怜悯 compact 紧凑的,紧密的 resent 愤恨 sober 清醒的 sole 唯一的,独占的 solely only solemn 表情严肃的,庄重的 stun 使昏迷 Stunned by the impact, he lay on the ground won…...
掌握Kubernetes Network Policy,构建安全的容器网络
在 Kubernetes 集群中,默认情况下,所有 Pod 之间都是可以相互通信的,这在某些场景下可能会带来安全隐患。为了实现更精细的网络访问控制,Kubernetes 提供了 Network Policy 机制。Network Policy 允许我们定义一组规则,…...
结合rpart包的决策树介绍
决策树与CART算法 决策树是一种基于树状结构的监督学习算法。它通过从根节点开始递归地对特征进行划分,构建出一棵树来进行决策。决策树的构建过程需要解决的重要问题有三个:如何选择自变量、如何选择分割点、确定停止划分的条件。解决这些问题是希望随…...
VScode代码格式化插件black失效问题
之前有如下提示: 没太当回事,发现还能用。之后突然就用不了了,跟着官方插件的文档来查看log: 查看发现提示: Message: TypeError: type object is not subscriptable 在github界面找到解决方案:安装Versio…...
【经验分享】Ubuntu20.04编译RK3568 AI模型报错问题(已解决)
【经验分享】Ubuntu20.04编译RK3568 AI模型报错问题(已解决) 前言问题现象问题分析解决方案总结 前言 这里使用的是Rockchip提供的rknn_model_zoo,https://github.com/airockchip/rknn_model_zoo/tree/main 此解决方案适用于Rockchip芯片在U…...
AI革命先锋:DeepSeek与蓝耘通义万相2.1的无缝融合引领行业智能化变革
云边有个稻草人-CSDN博客 目录 引言 一、什么是DeepSeek? 1.1 DeepSeek平台概述 1.2 DeepSeek的核心功能与技术 二、蓝耘通义万相2.1概述 2.1 蓝耘科技简介 2.2 蓝耘通义万相2.1的功能与优势 1. 全链条智能化解决方案 2. 强大的数据处理能力 3. 高效的模型…...
基于SpringBoot实现旅游酒店平台功能一
一、前言介绍: 1.1 项目摘要 随着社会的快速发展和人民生活水平的不断提高,旅游已经成为人们休闲娱乐的重要方式之一。人们越来越注重生活的品质和精神文化的追求,旅游需求呈现出爆发式增长。这种增长不仅体现在旅游人数的增加上࿰…...
轻松上手 —— 通过 RPM 包快速部署 NebulaGraph
前言 在当今大数据时代,处理复杂关系数据的需求与日俱增,图数据库应运而生并逐渐崭露头角。NebulaGraph 作为一款高性能、分布式且易扩展的图数据库,专为应对大规模图数据处理而精心打造。它不仅具备丰富的查询语言,还拥有强大高效…...
每日一题——接雨水
接雨水问题详解 视频学习推荐 建议先参考以下视频进行学习: 问题描述 给定一个非负整数数组 height,表示每个宽度为 1 的柱子的高度图。计算按此排列的柱子,下雨之后能接多少雨水。 示例 示例 1: 输入:height …...
MetaGPT发布的MGX与Devin深度对比
家人们,搞编程的都知道,工具选对了,效率能翻倍!今天必须给大伙唠唠MetaGPT发布的MGX编程助手和Devin编程助手 。 先看MGX,简直是编程界的王炸!它就像一个超神的虚拟开发团队,一堆智能助手分工明…...
网络安全技术整体架构 一个中心三重防护
网络安全技术整体架构:一个中心三重防护 在信息技术飞速发展的今天,网络安全的重要性日益凸显。为了保护信息系统不受各种安全威胁的侵害,网络安全技术整体架构应运而生。本文将详细介绍“一个中心三重防护”的概念,并结合代码示…...
03.06 QT
一、使用QSlider设计一个进度条,并让其通过线程自己动起来 程序代码: <1> Widget.h: #ifndef WIDGET_H #define WIDGET_H#include <QWidget> #include <QThread> #include "mythread.h"QT_BEGIN_NAMESPACE namespace Ui {…...
【YOLOv12改进trick】多尺度大核注意力机制MLKA模块引入YOLOv12,实现多尺度目标检测涨点,含创新点Python代码,方便发论文
🍋改进模块🍋:多尺度大核注意力机制(MLKA) 🍋解决问题🍋:MLKA模块结合多尺度、门控机制和空间注意力,显著增强卷积网络的模型表示能力。 🍋改进优势🍋:超分辨的MLKA模块对小目标和模糊目标涨点很明显 🍋适用场景🍋:小目标检测、模糊目标检测等 🍋思路…...
SpringUI:打造高质量Web交互设计的首选元件库
SpringUI作为一个专为Web设计与开发领域打造的高质量交互元件库,确实为设计师和开发者提供了极大的便利。以下是对SpringUI及其提供的各类元件的详细解读和一些建议: SpringUI概述 SpringUI集合了一系列预制的、高质量的交互组件,旨在帮助设…...
鸿蒙Android4个脚有脚线
效果 min:number122max:number150Row(){Stack(){// 底Text().border({width:2,color:$r(app.color.yellow)}).height(this.max).aspectRatio(1)// 长Text().backgroundColor($r(app.color.white)).height(this.max).width(this.min)// 宽Text().backgroundColor($r(app.color.w…...
