当前位置: 首页 > news >正文

Iceberg与SparkSQL写操作整合

前言

spark操作iceberg之前先要配置spark catalogs,详情参考Iceberg与Spark整合环境配置。

有些操作需要在spark3中开启iceberg sql扩展。

Iceberg使用Apache Spark的DataSourceV2 API来实现数据源和catalog。Spark DSv2是一个不断发展的API,在Spark版本中具有不同级别的支持:
在这里插入图片描述
Spark 3支持SQL INSERT INTO、MERGE INTO和INSERT OVERWRITE,以及新的DataFrameWriterV2 API来进行iceberg表的写操作,接下来我们进行详细讲解。

INSERT INTO

insert into是往iceberg表中插入新数据,主要有两种语法:

INSERT INTO prod.db.table VALUES (1, 'a'), (2, 'b')
INSERT INTO prod.db.table SELECT ...

这两种语法和其它组件如hive等没有太多区别,比较容易掌握。

MERGE INTO

Iceberg "merge into"语法可以对表数据进行行级更新或删除,在Spark3.x版本之后支持,其原理是重写包含需要删除和更新行数据所在的data files。"merge into"可以使用一个查询结果数据来更新目标表的数据,其语法通过类似join关联方式,根据指定的匹配条件对匹配的行数据进行相应操作。

  1. 语法
MERGE INTO tbl t -- 目标表
USING (SELECT ...) s -- 数据源表,也就是用数据源表查出的数据来更新或删除目标表
ON t.id = s.id  -- 关联条件,类似join的on条件
WHEN MATCHED AND ... THEN DELETE -- 删除直接用delete命令
WHEN MATCHED AND ... THEN UPDATE SET ... --更新用upate set
WHEN MATCHED AND ... AND ... THEN UPDATE SET ... --多条件更新
WHEN NOT MATCHED ADN ... THEN INSERT (col1,col2...) VALUES(s.col1,s.col2 ...) --匹配不上向目标表插入数据
  1. 示例
  • 创建两张表a和b
create table  hadoop_prod.default.a (id int,name string,age int) using iceberg;
create table  hadoop_prod.default.b (id int,name string,age int,tp string) using iceberg
  • 插入数据
insert into hadoop_prod.default.a values (1,"zs",18),(2,"ls",19),(3,"ww",20)
insert into hadoop_prod.default.b values (1,"zs",30,"delete"),(2,"李四",31,"update"),(4,"王五",32,"add")
  • 使用MERGE INTO 语法向目标表更新、删除、新增数据
    这里我们计划将b表与a表匹配id,如果b表中tp字段是"delete"那么a表中对应的id数据删除,如果b表中tp字段是"update",那么a表中对应的id数据其他字段进行更新,如果a表与b表id匹配不上,那么将b表中的数据插入到a表中,具体操作如下:
merge into hadoop_prod.default.a  t1  -- 目标表a
using (select id,name ,age,tp from hadoop_prod.default.b) t2 -- 数据源表b
on t1.id = t2.id -- 关联条件为id
when matched and t2.tp = 'delete' then delete -- 如果数据源表中tp字段为delete,则对目标表关联d对应的数据进行删除操作
when matched and t2.tp = 'update' then update set t1.name = t2.name,t1.age = t2.age -- 如果数据源表tp字段为update,则对目标表关联id对应数据用数据源表中name和age更新目标表对应字段
when not matched then insert (id,name,age) values (t2.id,t2.name,t2.age) -- 如果id关联不上,则直接把数据源表对应id这条数据插入到目标表中

注意:我们很多数据库都没有类似merge into的操作,为了便于初学者理解,每一行操作都有详细的注释。

  • 结果
    在这里插入图片描述
    id=1,可以匹配上,但数据源表tp为delete,因此会把目标表id=1对应的行删除;
    id=2,可以匹配上,但数据源表tp为update,因此会把目标表id=2对应的name和age用数据源表name和age进行更新;
    id=3,没有匹配上,需要把数据源表对应的这条数据插入到目标表,但是由于数据源中没有id=3的数据,因此没有插入数据,此时保留数据源表中id=3对应的数据;
    id=4,没有匹配上,需要把数据源表对应的这条数据插入到目标表;

注意更新数据时,在查询的数据中只能有一条匹配的数据更新到目标表,否则将报错。

INSERT OVERWRITE

"insert overwrite"可以覆盖Iceberg表中的数据,这种操作会将表中全部数据替换掉,建议如果有部分数据替换操作可以使用"merge into"操作。

对于Iceberg分区表使用"insert overwrite"操作时,有两种情况,第一种是“动态覆盖”,第二种是“静态覆盖”。

  1. 动态分区覆盖
    动态覆盖会全量将原有数据覆盖,并将新插入的数据根据Iceberg表分区规则自动分区,类似Hive中的动态分区。

  2. 静态分区覆盖
    静态覆盖需要在向Iceberg中插入数据时需要手动指定分区,如果当前Iceberg表存在这个分区,那么只有这个分区的数据会被覆盖,其他分区数据不受影响,如果Iceberg表不存在这个分区,那么相当于给Iceberg表增加了个一个分区。

  3. 示例

  • 创建三张表并插入数据
    创建test1分区表、test2普通表、test3普通表三张表,并插入数据,每张表字段相同,但是插入数据不同。
-- test1为分区表
create table  hadoop_prod.default.test1 (id int,name string,loc string)
using iceberg
partitioned by (loc);-- 插入数据
insert into hadoop_prod.default.test1 values (1,"zs","beijing"),(2,"ls","shanghai");
-- test2为普通无分区表
create table  hadoop_prod.default.test2 (id int,name string,loc string)
using iceberg;
-- 插入数据
insert into hadoop_prod.default.test2 values (10,"x1","shandong"),(11,"x2","hunan");
-- test3为普通无分区表
create table  hadoop_prod.default.test3 (id int,name string,loc string)
using iceberg;
-- 插入数据
insert into hadoop_prod.default.test3 values (3,"ww","beijing"),(4,"ml","shanghai"),(5,"tq","guangzhou");
  • 使用insert overwrite 读取test3表中的数据覆盖到test2表中
-- 使用insert overwrite 读取test3 表中的数据覆盖到test2 普通表中
insert overwrite hadoop_prod.default.test2 select id,name,loc from  hadoop_prod.default.test3;
-- 查询test2表数据
select * from hadoop_prod.default.test2;

此时test2表中的结果如下:
在这里插入图片描述
说明此时insert overwrite操作是把test2表的数据全部删除,然后把test3表的所有数据插入到test2表。

  • 使用insert overwrite 读取test3表数据,动态分区方式覆盖到表test1
-- 使用insert overwrite 读取test3表数据 动态分区方式覆盖到表 test1
insert overwrite hadoop_prod.default.test1 select id,name,loc from  hadoop_prod.default.test3;
-- 查询 test1 表数据
select * from hadoop_prod.default.test1;

此时test1表中的数据如下:
在这里插入图片描述
说明此时insert overwrite操作是把test1表的数据全部删除,然后把test3表的所有数据插入到test1表,并且分区字段loc按照动态分区的方式进行分区。

  • 静态分区方式,将iceberg表test3的数据覆盖到Iceberg表test1中
    这里可以将test1表删除,然后重新创建,加载数据,也可以直接读取test3中的数据静态分区方式更新到test1。另外,使用insert overwrite 语法覆盖静态分区方式时,查询的语句中就不要再次写入分区列,否则会重复。
-- 删除表test1,重新创建表test1 分区表,并插入数据
drop table hadoop_prod.default.test1;
-- 重建test1分区表
create table  hadoop_prod.default.test1 (id int,name string,loc string) using iceberg partitioned by (loc);
-- 插入数据
insert into hadoop_prod.default.test1 values (1,"zs","beijing"),(2,"ls","shanghai");
-- 查询test1表数据
select * from hadoop_prod.default.test1;

在这里插入图片描述

-- 注意:指定静态分区"jiangsu",静态分区下,就不要在查询 “loc" 列了,否则重复
insert overwrite hadoop_prod.default.test1 partition (loc = "jiangsu") select id,name from  hadoop_prod.default.test3;
-- 查询 test1 表数据
select * from hadoop_prod.default.test1;

此时test1表的数据如下:
在这里插入图片描述
我们可以看到test1表原来没有jiangsu分区,采用静态分区指定jiangsu分区的时候,并不影响非jiangsu的数据,只是从test3中读取所有数据,并存放到loc=jiangsu这个分区目录下。

注意:使用insert overwrite 读取test3表数据 静态分区方式覆盖到表 test1,表中其他分区数据不受影响,只会覆盖指定的静态分区数据。

至此,我相信我们已经完全掌握了merge into的用法。

DELETE FROM

Spark3.x版本之后支持"Delete from"可以根据指定的where条件来删除表中数据。如果where条件匹配Iceberg表一个分区的数据,Iceberg仅会修改元数据,如果where条件匹配的表的单个行,则Iceberg会只重写受影响行所在的data files。

-- 创建表 delete_tbl ,并加载数据
create table hadoop_prod.default.delete_tbl (id int,name string,age int) using iceberg;
insert into hadoop_prod.default.delete_tbl values (1,"zs",18),(2,"ls",19),(3,"ww",20),(4,"ml",21),(5,"tq",22),(6,"gb",23);
-- 根据条件范围删除表 delete_tbl 中的数据
delete from hadoop_prod.default.delete_tbl where id >3 and id <6;
-- 查询数据
select * from hadoop_prod.default.delete_tbl;

删除了id大于3和小于6之间的所有数据:
在这里插入图片描述

-- 根据条件删除表 delete_tbl 中的一条数据
delete from hadoop_prod.default.delete_tbl where id = 2;
-- 查询数据
select * from hadoop_prod.default.delete_tbl;

删除了id=2的数据:
在这里插入图片描述

删除操作和其它数据库完全一样,操作很简单,但是得理解底层删除数据的原理。

UPDATE

Spark3.x+版本支持了update更新数据操作,可以根据匹配的条件进行数据更新操作。

-- 创建表 update_tbl ,并加载数据
create table hadoop_prod.default.update_tbl (id int,name string,age int) using iceberg;
-- 插入数据
insert into hadoop_prod.default.update_tbl values (1,"zs",18),(2,"ls",19),(3,"ww",20),(4,"ml",21),(5,"tq",22),(6,"gb",23);

insert into hadoop_prod.default.update_tbl values (1,“zs”,18),(2,“ls”,19),(3,“ww”,20),(4,“ml”,21),(5,“tq”,22),(6,“gb”,23),操作如下:

-- 更新 delete_tbl 表
update hadoop_prod.default.update_tbl set name = 'zhangsan' ,age = 30 where id <=3;
-- 查询数据
select * from hadoop_prod.default.update_tbl;

把id小于等于3的,name全部改成zhangshan,age全部改成30:
在这里插入图片描述
update操作和其它数据库一模一样,非常简单。

注意:UPDATE 更加专注于单一记录的修改,而 MERGE INTO 则是一个更全面的操作,可以同时处理多个数据状态的变化。因此一些复杂的操作直接用MERGE INTO,比如:

  • 同步外部数据源:如果你有一个外部数据库系统,你可能希望定期将更改(包括插入、更新和删除)同步到你的数据湖中的表。MERGE INTO 可以用来比较两个表,并根据匹配条件执行更新,对于没有匹配记录的新数据则执行插入。
  • 数据集成:当需要合并多个来源的数据到一个目标表中时,MERGE INTO 可以有效地处理这种情况。它可以检查数据是否已经存在,并决定是更新还是添加新的记录。
  • 高效的数据处理:在处理大量数据时,MERGE INTO 可以减少数据处理的时间,因为它只需要一次操作就可以完成更新和插入。

参考文献

Spark Write
https://bbs.huaweicloud.com/blogs/364273

相关文章:

Iceberg与SparkSQL写操作整合

前言 spark操作iceberg之前先要配置spark catalogs&#xff0c;详情参考Iceberg与Spark整合环境配置。 有些操作需要在spark3中开启iceberg sql扩展。 Iceberg使用Apache Spark的DataSourceV2 API来实现数据源和catalog。Spark DSv2是一个不断发展的API&#xff0c;在Spark版…...

MYSQL1

一、为什么学习数据库 1、岗位技能需求 2、现在的世界,得数据者得天下 3、存储数据的方法 4、程序,网站中,大量数据如何长久保存? 5、数据库是几乎软件体系中最核心的一个存在。 二、数据库相关概念 (一)数据库DB 数据库是将大量数据保存起来&#xff0c;通过计算机加…...

一文解答Swin Transformer + 代码【详解】

文章目录 1、Swin Transformer的介绍1.1 Swin Transformer解决图像问题的挑战1.2 Swin Transformer解决图像问题的方法 2、Swin Transformer的具体过程2.1 Patch Partition 和 Linear Embedding2.2 W-MSA、SW-MSA2.3 Swin Transformer代码解析2.3.1 代码解释 2.4 W-MSA和SW-MSA…...

Vue3:<Teleport>传送门组件的使用和注意事项

你好&#xff0c;我是沐爸&#xff0c;欢迎点赞、收藏、评论和关注。 Vue3 引入了一个新的内置组件 <Teleport>&#xff0c;它允许你将子组件树渲染到 DOM 中的另一个位置&#xff0c;而不是在父组件的模板中直接渲染。这对于需要跳出当前组件的 DOM 层级结构进行渲染的…...

项目之家:又一家项目信息发布合作对接及一手接单平台

这几天“小三劝退师时薪700”的消息甚嚣尘上&#xff0c;只能说从某一侧面来看心理咨询师这个职业的前景还是可以的&#xff0c;有兴趣的朋友可以关注下。话说上一篇文章给大家介绍了U客直谈&#xff0c;今天趁热打铁再给大家分享一个地推拉新项目合作平台~项目之家&#xff1a…...

02-java实习工作一个多月-经历分享

一、描述一下最近不写博客的原因 离我发java实习的工作的第一天的博客已经过去了一个多月了&#xff0c;本来还没入职的情况是打算每天工作都要写一份博客来记录一下的&#xff08;最坏的情况也是每周至少总结一下的&#xff09;&#xff0c;其实这个第一天的博客都是在公司快…...

JVM 调优篇2 jvm的内存结构以及堆栈参数设置与查看

一 jvm的内存模型 2.1 jvm内存模型概览 二 实操案例 2.1 设置和查看栈大小 1.代码 /*** 演示栈中的异常:StackOverflowError** author shkstart* create 2020 下午 9:08** 设置栈的大小&#xff1a; -Xss (-XX:ThreadStackSize)** -XX:PrintFlagsFinal*/ public class S…...

微信可以设置自动回复吗?

在日常的微信聊天中&#xff0c;我们或许会频繁地遭遇客户提出的相同问题&#xff0c;尤其是对于从事销售工作的朋友们来说&#xff0c;客户在添加好友后的第一句话往往是“在吗”或者“你好”。当我们的好友数量众多时&#xff0c;手动逐个回复可能会耗费大量的时间。因此&…...

同样数据源走RTMP播放延迟低还是RTSP低?

背景 在比较同一个数据源&#xff0c;是RTMP播放延迟低还是RTSP延迟低之前&#xff0c;我们先看看RTMP和RTSP的区别&#xff0c;我们知道&#xff0c;RTMP&#xff08;Real-Time Messaging Protocol&#xff09;和RTSP&#xff08;Real Time Streaming Protocol&#xff09;是…...

@开发者极客们,网易2024低代码大赛来啦

极客们&#xff0c;网易云信拍了拍你 9月6日起&#xff0c;2024网易低代码大赛正式开启啦&#xff01; 低代码大赛是由网易主办的权威赛事&#xff0c;鼓励开发者们用低代码开发的方式快速搭建应用&#xff0c;并最终以作品决出优胜。 从2022年11月起&#xff0c;网易低代码大赛…...

数据分析-16-时间序列分析的常用模型

1 什么是时间序列 时间序列是一组按时间顺序排列的数据点的集合,通常以固定的时间间隔进行观测。这些数据点可以是按小时、天、月甚至年进行采样的。时间序列在许多领域中都有广泛应用,例如金融、经济学、气象学和工程等。 时间序列的分析可以帮助我们理解和预测未来的趋势和…...

SpringMVC使用:类型转换数据格式化数据验证

01-类型转换器 先在pom.xml里面导入依赖&#xff0c;一个是mvc框架的依赖&#xff0c;一个是junit依赖 然后在web.xml里面导入以下配置&#xff08;配置的详细说明和用法我在前面文章中有写到&#xff09; 创建此测试类的方法用于测试springmvc是具备自动类型转换功能的 user属…...

多语言ASO – 本地化的10个技巧

ASO优化是一个复杂的领域&#xff0c;即使你只关注讲英语的用户。如果您想面向国际受众并在全球范围内发展您的应用程序业务&#xff0c;您必须在App Store和Google Play Store上本地化应用程序的产品页面。不过&#xff0c;应用程序商店本地化的过程也有很多陷阱。 应用商店本…...

C程序设计——函数0

函数定义 前面说过C语言是结构化的程序设计语言&#xff0c;他把所有问题抽象为数据和对数据的操作&#xff0c;前面讲的变量、常量&#xff0c;都是数据。现在开始讲对数据操作——函数。 C语言的函数&#xff0c;定义方式如下&#xff1a; 返回值类型 函数名(参数列表) {…...

第二十一章 rust与动静态库的结合使用

注意 本系列文章已升级、转移至我的自建站点中,本章原文为:rust与动静态库的结合使用 目录 注意一、前言二、库生成三、库使用四、总结一、前言 rust中多了很多类型的库,比如前面章节中我们提到基本的bin与lib这两种crate类型库。 如果你在命令行执行下列语句: rustc -…...

修改服务器DNS解析及修改自动对时时区

修改服务器DNS解析&#xff1a; 1、搜索一下当地的DNS服务器的地址 2、登录服务器&#xff0c;执行 vim /etc/resolv.conf文件&#xff0c;在nameserver字段后填写DNS服务的地址 3、chattr i /etc/resolv.conf 加上不可修改权限&#xff0c;防止重启DNS被修改 修改自动对时…...

中科院TOP“灌水神刊”合集!盘点那些“又牛又水”的国人友好SCI

【SciencePub学术】本期&#xff0c;小编给大家推荐几本“又牛又水”的期刊&#xff0c;并且都是清一色的国人友好刊&#xff0c;涵盖各领域&#xff0c;以供各位学者参考&#xff01; NO.1 Nature Communications IF&#xff1a;14.7 分区&#xff1a;JCR1区中科院1区TOP 年…...

Python列表浅拷贝的陷阱与破解之道

引言 在Python编程世界中&#xff0c;列表的拷贝操作看似简单&#xff0c;却常常隐藏着一些令人意想不到的陷阱&#xff0c;尤其是当涉及到浅拷贝时。今天&#xff0c;我们将深入探讨Python列表浅拷贝现象及产生原因&#xff0c;并提供有效的解决方案&#xff0c;帮助你写出更…...

开放式系统互连(OSI)模型的实际意义

0 前言 开放式系统互连&#xff08;OSI&#xff0c;Open Systems Interconnection&#xff09;模型&#xff0c;由国际标准化组织&#xff08;ISO&#xff09;在1984年提出&#xff0c;目的是为了促进不同厂商生产的网络设备之间的互操作性。 定义了一种在层之间进行协议实现…...

回溯——10.全排列 II

力扣题目链接 给定一个可包含重复数字的序列 nums &#xff0c;按任意顺序 返回所有不重复的全排列。 示例 1&#xff1a; 输入&#xff1a;nums [1,1,2]输出&#xff1a; [[1,1,2], [1,2,1], [2,1,1]] 解题思路&#xff1a; 排序&#xff1a;首先对数组进行排序&#xf…...

国防科技大学计算机基础课程笔记02信息编码

1.机内码和国标码 国标码就是我们非常熟悉的这个GB2312,但是因为都是16进制&#xff0c;因此这个了16进制的数据既可以翻译成为这个机器码&#xff0c;也可以翻译成为这个国标码&#xff0c;所以这个时候很容易会出现这个歧义的情况&#xff1b; 因此&#xff0c;我们的这个国…...

51c自动驾驶~合集58

我自己的原文哦~ https://blog.51cto.com/whaosoft/13967107 #CCA-Attention 全局池化局部保留&#xff0c;CCA-Attention为LLM长文本建模带来突破性进展 琶洲实验室、华南理工大学联合推出关键上下文感知注意力机制&#xff08;CCA-Attention&#xff09;&#xff0c;…...

基于距离变化能量开销动态调整的WSN低功耗拓扑控制开销算法matlab仿真

目录 1.程序功能描述 2.测试软件版本以及运行结果展示 3.核心程序 4.算法仿真参数 5.算法理论概述 6.参考文献 7.完整程序 1.程序功能描述 通过动态调整节点通信的能量开销&#xff0c;平衡网络负载&#xff0c;延长WSN生命周期。具体通过建立基于距离的能量消耗模型&am…...

逻辑回归:给不确定性划界的分类大师

想象你是一名医生。面对患者的检查报告&#xff08;肿瘤大小、血液指标&#xff09;&#xff0c;你需要做出一个**决定性判断**&#xff1a;恶性还是良性&#xff1f;这种“非黑即白”的抉择&#xff0c;正是**逻辑回归&#xff08;Logistic Regression&#xff09;** 的战场&a…...

JDK 17 新特性

#JDK 17 新特性 /**************** 文本块 *****************/ python/scala中早就支持&#xff0c;不稀奇 String json “”" { “name”: “Java”, “version”: 17 } “”"; /**************** Switch 语句 -> 表达式 *****************/ 挺好的&#xff…...

OPenCV CUDA模块图像处理-----对图像执行 均值漂移滤波(Mean Shift Filtering)函数meanShiftFiltering()

操作系统&#xff1a;ubuntu22.04 OpenCV版本&#xff1a;OpenCV4.9 IDE:Visual Studio Code 编程语言&#xff1a;C11 算法描述 在 GPU 上对图像执行 均值漂移滤波&#xff08;Mean Shift Filtering&#xff09;&#xff0c;用于图像分割或平滑处理。 该函数将输入图像中的…...

CVPR2025重磅突破:AnomalyAny框架实现单样本生成逼真异常数据,破解视觉检测瓶颈!

本文介绍了一种名为AnomalyAny的创新框架&#xff0c;该方法利用Stable Diffusion的强大生成能力&#xff0c;仅需单个正常样本和文本描述&#xff0c;即可生成逼真且多样化的异常样本&#xff0c;有效解决了视觉异常检测中异常样本稀缺的难题&#xff0c;为工业质检、医疗影像…...

【FTP】ftp文件传输会丢包吗?批量几百个文件传输,有一些文件没有传输完整,如何解决?

FTP&#xff08;File Transfer Protocol&#xff09;本身是一个基于 TCP 的协议&#xff0c;理论上不会丢包。但 FTP 文件传输过程中仍可能出现文件不完整、丢失或损坏的情况&#xff0c;主要原因包括&#xff1a; ✅ 一、FTP传输可能“丢包”或文件不完整的原因 原因描述网络…...

如何在Windows本机安装Python并确保与Python.NET兼容

✅作者简介&#xff1a;2022年博客新星 第八。热爱国学的Java后端开发者&#xff0c;修心和技术同步精进。 &#x1f34e;个人主页&#xff1a;Java Fans的博客 &#x1f34a;个人信条&#xff1a;不迁怒&#xff0c;不贰过。小知识&#xff0c;大智慧。 &#x1f49e;当前专栏…...

Canal环境搭建并实现和ES数据同步

作者&#xff1a;田超凡 日期&#xff1a;2025年6月7日 Canal安装&#xff0c;启动端口11111、8082&#xff1a; 安装canal-deployer服务端&#xff1a; https://github.com/alibaba/canal/releases/1.1.7/canal.deployer-1.1.7.tar.gz cd /opt/homebrew/etc mkdir canal…...