Iceberg与SparkSQL写操作整合
前言
spark操作iceberg之前先要配置spark catalogs,详情参考Iceberg与Spark整合环境配置。
有些操作需要在spark3中开启iceberg sql扩展。
Iceberg使用Apache Spark的DataSourceV2 API来实现数据源和catalog。Spark DSv2是一个不断发展的API,在Spark版本中具有不同级别的支持:

Spark 3支持SQL INSERT INTO、MERGE INTO和INSERT OVERWRITE,以及新的DataFrameWriterV2 API来进行iceberg表的写操作,接下来我们进行详细讲解。
INSERT INTO
insert into是往iceberg表中插入新数据,主要有两种语法:
INSERT INTO prod.db.table VALUES (1, 'a'), (2, 'b')
INSERT INTO prod.db.table SELECT ...
这两种语法和其它组件如hive等没有太多区别,比较容易掌握。
MERGE INTO
Iceberg "merge into"语法可以对表数据进行行级更新或删除,在Spark3.x版本之后支持,其原理是重写包含需要删除和更新行数据所在的data files。"merge into"可以使用一个查询结果数据来更新目标表的数据,其语法通过类似join关联方式,根据指定的匹配条件对匹配的行数据进行相应操作。
- 语法
MERGE INTO tbl t -- 目标表
USING (SELECT ...) s -- 数据源表,也就是用数据源表查出的数据来更新或删除目标表
ON t.id = s.id -- 关联条件,类似join的on条件
WHEN MATCHED AND ... THEN DELETE -- 删除直接用delete命令
WHEN MATCHED AND ... THEN UPDATE SET ... --更新用upate set
WHEN MATCHED AND ... AND ... THEN UPDATE SET ... --多条件更新
WHEN NOT MATCHED ADN ... THEN INSERT (col1,col2...) VALUES(s.col1,s.col2 ...) --匹配不上向目标表插入数据
- 示例
- 创建两张表a和b
create table hadoop_prod.default.a (id int,name string,age int) using iceberg;
create table hadoop_prod.default.b (id int,name string,age int,tp string) using iceberg
- 插入数据
insert into hadoop_prod.default.a values (1,"zs",18),(2,"ls",19),(3,"ww",20)
insert into hadoop_prod.default.b values (1,"zs",30,"delete"),(2,"李四",31,"update"),(4,"王五",32,"add")
- 使用MERGE INTO 语法向目标表更新、删除、新增数据
这里我们计划将b表与a表匹配id,如果b表中tp字段是"delete"那么a表中对应的id数据删除,如果b表中tp字段是"update",那么a表中对应的id数据其他字段进行更新,如果a表与b表id匹配不上,那么将b表中的数据插入到a表中,具体操作如下:
merge into hadoop_prod.default.a t1 -- 目标表a
using (select id,name ,age,tp from hadoop_prod.default.b) t2 -- 数据源表b
on t1.id = t2.id -- 关联条件为id
when matched and t2.tp = 'delete' then delete -- 如果数据源表中tp字段为delete,则对目标表关联d对应的数据进行删除操作
when matched and t2.tp = 'update' then update set t1.name = t2.name,t1.age = t2.age -- 如果数据源表tp字段为update,则对目标表关联id对应数据用数据源表中name和age更新目标表对应字段
when not matched then insert (id,name,age) values (t2.id,t2.name,t2.age) -- 如果id关联不上,则直接把数据源表对应id这条数据插入到目标表中
注意:我们很多数据库都没有类似merge into的操作,为了便于初学者理解,每一行操作都有详细的注释。
- 结果

id=1,可以匹配上,但数据源表tp为delete,因此会把目标表id=1对应的行删除;
id=2,可以匹配上,但数据源表tp为update,因此会把目标表id=2对应的name和age用数据源表name和age进行更新;
id=3,没有匹配上,需要把数据源表对应的这条数据插入到目标表,但是由于数据源中没有id=3的数据,因此没有插入数据,此时保留数据源表中id=3对应的数据;
id=4,没有匹配上,需要把数据源表对应的这条数据插入到目标表;
注意:更新数据时,在查询的数据中只能有一条匹配的数据更新到目标表,否则将报错。
INSERT OVERWRITE
"insert overwrite"可以覆盖Iceberg表中的数据,这种操作会将表中全部数据替换掉,建议如果有部分数据替换操作可以使用"merge into"操作。
对于Iceberg分区表使用"insert overwrite"操作时,有两种情况,第一种是“动态覆盖”,第二种是“静态覆盖”。
-
动态分区覆盖
动态覆盖会全量将原有数据覆盖,并将新插入的数据根据Iceberg表分区规则自动分区,类似Hive中的动态分区。 -
静态分区覆盖
静态覆盖需要在向Iceberg中插入数据时需要手动指定分区,如果当前Iceberg表存在这个分区,那么只有这个分区的数据会被覆盖,其他分区数据不受影响,如果Iceberg表不存在这个分区,那么相当于给Iceberg表增加了个一个分区。 -
示例
- 创建三张表并插入数据
创建test1分区表、test2普通表、test3普通表三张表,并插入数据,每张表字段相同,但是插入数据不同。
-- test1为分区表
create table hadoop_prod.default.test1 (id int,name string,loc string)
using iceberg
partitioned by (loc);-- 插入数据
insert into hadoop_prod.default.test1 values (1,"zs","beijing"),(2,"ls","shanghai");
-- test2为普通无分区表
create table hadoop_prod.default.test2 (id int,name string,loc string)
using iceberg;
-- 插入数据
insert into hadoop_prod.default.test2 values (10,"x1","shandong"),(11,"x2","hunan");
-- test3为普通无分区表
create table hadoop_prod.default.test3 (id int,name string,loc string)
using iceberg;
-- 插入数据
insert into hadoop_prod.default.test3 values (3,"ww","beijing"),(4,"ml","shanghai"),(5,"tq","guangzhou");
- 使用insert overwrite 读取test3表中的数据覆盖到test2表中
-- 使用insert overwrite 读取test3 表中的数据覆盖到test2 普通表中
insert overwrite hadoop_prod.default.test2 select id,name,loc from hadoop_prod.default.test3;
-- 查询test2表数据
select * from hadoop_prod.default.test2;
此时test2表中的结果如下:

说明此时insert overwrite操作是把test2表的数据全部删除,然后把test3表的所有数据插入到test2表。
- 使用insert overwrite 读取test3表数据,动态分区方式覆盖到表test1
-- 使用insert overwrite 读取test3表数据 动态分区方式覆盖到表 test1
insert overwrite hadoop_prod.default.test1 select id,name,loc from hadoop_prod.default.test3;
-- 查询 test1 表数据
select * from hadoop_prod.default.test1;
此时test1表中的数据如下:

说明此时insert overwrite操作是把test1表的数据全部删除,然后把test3表的所有数据插入到test1表,并且分区字段loc按照动态分区的方式进行分区。
- 静态分区方式,将iceberg表test3的数据覆盖到Iceberg表test1中
这里可以将test1表删除,然后重新创建,加载数据,也可以直接读取test3中的数据静态分区方式更新到test1。另外,使用insert overwrite 语法覆盖静态分区方式时,查询的语句中就不要再次写入分区列,否则会重复。
-- 删除表test1,重新创建表test1 分区表,并插入数据
drop table hadoop_prod.default.test1;
-- 重建test1分区表
create table hadoop_prod.default.test1 (id int,name string,loc string) using iceberg partitioned by (loc);
-- 插入数据
insert into hadoop_prod.default.test1 values (1,"zs","beijing"),(2,"ls","shanghai");
-- 查询test1表数据
select * from hadoop_prod.default.test1;

-- 注意:指定静态分区"jiangsu",静态分区下,就不要在查询 “loc" 列了,否则重复
insert overwrite hadoop_prod.default.test1 partition (loc = "jiangsu") select id,name from hadoop_prod.default.test3;
-- 查询 test1 表数据
select * from hadoop_prod.default.test1;
此时test1表的数据如下:

我们可以看到test1表原来没有jiangsu分区,采用静态分区指定jiangsu分区的时候,并不影响非jiangsu的数据,只是从test3中读取所有数据,并存放到loc=jiangsu这个分区目录下。
注意:使用insert overwrite 读取test3表数据 静态分区方式覆盖到表 test1,表中其他分区数据不受影响,只会覆盖指定的静态分区数据。
至此,我相信我们已经完全掌握了merge into的用法。
DELETE FROM
Spark3.x版本之后支持"Delete from"可以根据指定的where条件来删除表中数据。如果where条件匹配Iceberg表一个分区的数据,Iceberg仅会修改元数据,如果where条件匹配的表的单个行,则Iceberg会只重写受影响行所在的data files。
-- 创建表 delete_tbl ,并加载数据
create table hadoop_prod.default.delete_tbl (id int,name string,age int) using iceberg;
insert into hadoop_prod.default.delete_tbl values (1,"zs",18),(2,"ls",19),(3,"ww",20),(4,"ml",21),(5,"tq",22),(6,"gb",23);
-- 根据条件范围删除表 delete_tbl 中的数据
delete from hadoop_prod.default.delete_tbl where id >3 and id <6;
-- 查询数据
select * from hadoop_prod.default.delete_tbl;
删除了id大于3和小于6之间的所有数据:

-- 根据条件删除表 delete_tbl 中的一条数据
delete from hadoop_prod.default.delete_tbl where id = 2;
-- 查询数据
select * from hadoop_prod.default.delete_tbl;
删除了id=2的数据:

删除操作和其它数据库完全一样,操作很简单,但是得理解底层删除数据的原理。
UPDATE
Spark3.x+版本支持了update更新数据操作,可以根据匹配的条件进行数据更新操作。
-- 创建表 update_tbl ,并加载数据
create table hadoop_prod.default.update_tbl (id int,name string,age int) using iceberg;
-- 插入数据
insert into hadoop_prod.default.update_tbl values (1,"zs",18),(2,"ls",19),(3,"ww",20),(4,"ml",21),(5,"tq",22),(6,"gb",23);
insert into hadoop_prod.default.update_tbl values (1,“zs”,18),(2,“ls”,19),(3,“ww”,20),(4,“ml”,21),(5,“tq”,22),(6,“gb”,23),操作如下:
-- 更新 delete_tbl 表
update hadoop_prod.default.update_tbl set name = 'zhangsan' ,age = 30 where id <=3;
-- 查询数据
select * from hadoop_prod.default.update_tbl;
把id小于等于3的,name全部改成zhangshan,age全部改成30:

update操作和其它数据库一模一样,非常简单。
注意:UPDATE 更加专注于单一记录的修改,而 MERGE INTO 则是一个更全面的操作,可以同时处理多个数据状态的变化。因此一些复杂的操作直接用MERGE INTO,比如:
- 同步外部数据源:如果你有一个外部数据库系统,你可能希望定期将更改(包括插入、更新和删除)同步到你的数据湖中的表。MERGE INTO 可以用来比较两个表,并根据匹配条件执行更新,对于没有匹配记录的新数据则执行插入。
- 数据集成:当需要合并多个来源的数据到一个目标表中时,MERGE INTO 可以有效地处理这种情况。它可以检查数据是否已经存在,并决定是更新还是添加新的记录。
- 高效的数据处理:在处理大量数据时,MERGE INTO 可以减少数据处理的时间,因为它只需要一次操作就可以完成更新和插入。
参考文献
Spark Write
https://bbs.huaweicloud.com/blogs/364273
相关文章:
Iceberg与SparkSQL写操作整合
前言 spark操作iceberg之前先要配置spark catalogs,详情参考Iceberg与Spark整合环境配置。 有些操作需要在spark3中开启iceberg sql扩展。 Iceberg使用Apache Spark的DataSourceV2 API来实现数据源和catalog。Spark DSv2是一个不断发展的API,在Spark版…...
MYSQL1
一、为什么学习数据库 1、岗位技能需求 2、现在的世界,得数据者得天下 3、存储数据的方法 4、程序,网站中,大量数据如何长久保存? 5、数据库是几乎软件体系中最核心的一个存在。 二、数据库相关概念 (一)数据库DB 数据库是将大量数据保存起来,通过计算机加…...
一文解答Swin Transformer + 代码【详解】
文章目录 1、Swin Transformer的介绍1.1 Swin Transformer解决图像问题的挑战1.2 Swin Transformer解决图像问题的方法 2、Swin Transformer的具体过程2.1 Patch Partition 和 Linear Embedding2.2 W-MSA、SW-MSA2.3 Swin Transformer代码解析2.3.1 代码解释 2.4 W-MSA和SW-MSA…...
Vue3:<Teleport>传送门组件的使用和注意事项
你好,我是沐爸,欢迎点赞、收藏、评论和关注。 Vue3 引入了一个新的内置组件 <Teleport>,它允许你将子组件树渲染到 DOM 中的另一个位置,而不是在父组件的模板中直接渲染。这对于需要跳出当前组件的 DOM 层级结构进行渲染的…...
项目之家:又一家项目信息发布合作对接及一手接单平台
这几天“小三劝退师时薪700”的消息甚嚣尘上,只能说从某一侧面来看心理咨询师这个职业的前景还是可以的,有兴趣的朋友可以关注下。话说上一篇文章给大家介绍了U客直谈,今天趁热打铁再给大家分享一个地推拉新项目合作平台~项目之家:…...
02-java实习工作一个多月-经历分享
一、描述一下最近不写博客的原因 离我发java实习的工作的第一天的博客已经过去了一个多月了,本来还没入职的情况是打算每天工作都要写一份博客来记录一下的(最坏的情况也是每周至少总结一下的),其实这个第一天的博客都是在公司快…...
JVM 调优篇2 jvm的内存结构以及堆栈参数设置与查看
一 jvm的内存模型 2.1 jvm内存模型概览 二 实操案例 2.1 设置和查看栈大小 1.代码 /*** 演示栈中的异常:StackOverflowError** author shkstart* create 2020 下午 9:08** 设置栈的大小: -Xss (-XX:ThreadStackSize)** -XX:PrintFlagsFinal*/ public class S…...
微信可以设置自动回复吗?
在日常的微信聊天中,我们或许会频繁地遭遇客户提出的相同问题,尤其是对于从事销售工作的朋友们来说,客户在添加好友后的第一句话往往是“在吗”或者“你好”。当我们的好友数量众多时,手动逐个回复可能会耗费大量的时间。因此&…...
同样数据源走RTMP播放延迟低还是RTSP低?
背景 在比较同一个数据源,是RTMP播放延迟低还是RTSP延迟低之前,我们先看看RTMP和RTSP的区别,我们知道,RTMP(Real-Time Messaging Protocol)和RTSP(Real Time Streaming Protocol)是…...
@开发者极客们,网易2024低代码大赛来啦
极客们,网易云信拍了拍你 9月6日起,2024网易低代码大赛正式开启啦! 低代码大赛是由网易主办的权威赛事,鼓励开发者们用低代码开发的方式快速搭建应用,并最终以作品决出优胜。 从2022年11月起,网易低代码大赛…...
数据分析-16-时间序列分析的常用模型
1 什么是时间序列 时间序列是一组按时间顺序排列的数据点的集合,通常以固定的时间间隔进行观测。这些数据点可以是按小时、天、月甚至年进行采样的。时间序列在许多领域中都有广泛应用,例如金融、经济学、气象学和工程等。 时间序列的分析可以帮助我们理解和预测未来的趋势和…...
SpringMVC使用:类型转换数据格式化数据验证
01-类型转换器 先在pom.xml里面导入依赖,一个是mvc框架的依赖,一个是junit依赖 然后在web.xml里面导入以下配置(配置的详细说明和用法我在前面文章中有写到) 创建此测试类的方法用于测试springmvc是具备自动类型转换功能的 user属…...
多语言ASO – 本地化的10个技巧
ASO优化是一个复杂的领域,即使你只关注讲英语的用户。如果您想面向国际受众并在全球范围内发展您的应用程序业务,您必须在App Store和Google Play Store上本地化应用程序的产品页面。不过,应用程序商店本地化的过程也有很多陷阱。 应用商店本…...
C程序设计——函数0
函数定义 前面说过C语言是结构化的程序设计语言,他把所有问题抽象为数据和对数据的操作,前面讲的变量、常量,都是数据。现在开始讲对数据操作——函数。 C语言的函数,定义方式如下: 返回值类型 函数名(参数列表) {…...
第二十一章 rust与动静态库的结合使用
注意 本系列文章已升级、转移至我的自建站点中,本章原文为:rust与动静态库的结合使用 目录 注意一、前言二、库生成三、库使用四、总结一、前言 rust中多了很多类型的库,比如前面章节中我们提到基本的bin与lib这两种crate类型库。 如果你在命令行执行下列语句: rustc -…...
修改服务器DNS解析及修改自动对时时区
修改服务器DNS解析: 1、搜索一下当地的DNS服务器的地址 2、登录服务器,执行 vim /etc/resolv.conf文件,在nameserver字段后填写DNS服务的地址 3、chattr i /etc/resolv.conf 加上不可修改权限,防止重启DNS被修改 修改自动对时…...
中科院TOP“灌水神刊”合集!盘点那些“又牛又水”的国人友好SCI
【SciencePub学术】本期,小编给大家推荐几本“又牛又水”的期刊,并且都是清一色的国人友好刊,涵盖各领域,以供各位学者参考! NO.1 Nature Communications IF:14.7 分区:JCR1区中科院1区TOP 年…...
Python列表浅拷贝的陷阱与破解之道
引言 在Python编程世界中,列表的拷贝操作看似简单,却常常隐藏着一些令人意想不到的陷阱,尤其是当涉及到浅拷贝时。今天,我们将深入探讨Python列表浅拷贝现象及产生原因,并提供有效的解决方案,帮助你写出更…...
开放式系统互连(OSI)模型的实际意义
0 前言 开放式系统互连(OSI,Open Systems Interconnection)模型,由国际标准化组织(ISO)在1984年提出,目的是为了促进不同厂商生产的网络设备之间的互操作性。 定义了一种在层之间进行协议实现…...
回溯——10.全排列 II
力扣题目链接 给定一个可包含重复数字的序列 nums ,按任意顺序 返回所有不重复的全排列。 示例 1: 输入:nums [1,1,2]输出: [[1,1,2], [1,2,1], [2,1,1]] 解题思路: 排序:首先对数组进行排序…...
【大模型RAG】拍照搜题技术架构速览:三层管道、两级检索、兜底大模型
摘要 拍照搜题系统采用“三层管道(多模态 OCR → 语义检索 → 答案渲染)、两级检索(倒排 BM25 向量 HNSW)并以大语言模型兜底”的整体框架: 多模态 OCR 层 将题目图片经过超分、去噪、倾斜校正后,分别用…...
服务器硬防的应用场景都有哪些?
服务器硬防是指一种通过硬件设备层面的安全措施来防御服务器系统受到网络攻击的方式,避免服务器受到各种恶意攻击和网络威胁,那么,服务器硬防通常都会应用在哪些场景当中呢? 硬防服务器中一般会配备入侵检测系统和预防系统&#x…...
电脑插入多块移动硬盘后经常出现卡顿和蓝屏
当电脑在插入多块移动硬盘后频繁出现卡顿和蓝屏问题时,可能涉及硬件资源冲突、驱动兼容性、供电不足或系统设置等多方面原因。以下是逐步排查和解决方案: 1. 检查电源供电问题 问题原因:多块移动硬盘同时运行可能导致USB接口供电不足&#x…...
【SQL学习笔记1】增删改查+多表连接全解析(内附SQL免费在线练习工具)
可以使用Sqliteviz这个网站免费编写sql语句,它能够让用户直接在浏览器内练习SQL的语法,不需要安装任何软件。 链接如下: sqliteviz 注意: 在转写SQL语法时,关键字之间有一个特定的顺序,这个顺序会影响到…...
WordPress插件:AI多语言写作与智能配图、免费AI模型、SEO文章生成
厌倦手动写WordPress文章?AI自动生成,效率提升10倍! 支持多语言、自动配图、定时发布,让内容创作更轻松! AI内容生成 → 不想每天写文章?AI一键生成高质量内容!多语言支持 → 跨境电商必备&am…...
全面解析各类VPN技术:GRE、IPsec、L2TP、SSL与MPLS VPN对比
目录 引言 VPN技术概述 GRE VPN 3.1 GRE封装结构 3.2 GRE的应用场景 GRE over IPsec 4.1 GRE over IPsec封装结构 4.2 为什么使用GRE over IPsec? IPsec VPN 5.1 IPsec传输模式(Transport Mode) 5.2 IPsec隧道模式(Tunne…...
以光量子为例,详解量子获取方式
光量子技术获取量子比特可在室温下进行。该方式有望通过与名为硅光子学(silicon photonics)的光波导(optical waveguide)芯片制造技术和光纤等光通信技术相结合来实现量子计算机。量子力学中,光既是波又是粒子。光子本…...
用机器学习破解新能源领域的“弃风”难题
音乐发烧友深有体会,玩音乐的本质就是玩电网。火电声音偏暖,水电偏冷,风电偏空旷。至于太阳能发的电,则略显朦胧和单薄。 不知你是否有感觉,近两年家里的音响声音越来越冷,听起来越来越单薄? —…...
【SSH疑难排查】轻松解决新版OpenSSH连接旧服务器的“no matching...“系列算法协商失败问题
【SSH疑难排查】轻松解决新版OpenSSH连接旧服务器的"no matching..."系列算法协商失败问题 摘要: 近期,在使用较新版本的OpenSSH客户端连接老旧SSH服务器时,会遇到 "no matching key exchange method found", "n…...
R 语言科研绘图第 55 期 --- 网络图-聚类
在发表科研论文的过程中,科研绘图是必不可少的,一张好看的图形会是文章很大的加分项。 为了便于使用,本系列文章介绍的所有绘图都已收录到了 sciRplot 项目中,获取方式: R 语言科研绘图模板 --- sciRplothttps://mp.…...
