Apache Hudi初探(八)(与spark的结合)--非bulk_insert模式
背景
之前讨论的都是’hoodie.datasource.write.operation’:'bulk_insert’的前提下,在这种模式下,是没有json文件的已形成如下的文件:
/dt=1/.hoodie_partition_metadata
/dt=1/2ffe3579-6ddb-4c5f-bf03-5c1b5dfce0a0-0_0-41263-0_20230528233336713.parquet
/dt=1/30b7d5b2-12e8-415a-8ec5-18206fe601c0-0_0-22102-0_20230528231643200.parquet
/dt=1/4abc1c6d-a8aa-4c15-affc-61a35171ce69-0_4-22106-0_20230528231643200.parquet
/dt=1/513dee80-2e8c-4db8-baee-a767b9dba41c-0_2-22104-0_20230528231643200.parquet
/dt=1/57076f86-0a62-4f52-8b50-31a5f769b26a-0_1-22103-0_20230528231643200.parquet
/dt=1/84553727-be9d-4273-bad9-0a38d9240815-0_0-59818-0_20230528233513387.parquet
/dt=1/fecd6a84-9a74-40b1-bfc1-13612a67a785-0_0-26640-0_20230528231723951.parquet
因为是"bulk insert"操作,所以没有去重的需要,所以直接采用spark原生的方式,
以下我们讨论非spark原生的方式,
闲说杂谈
继续Apache Hudi初探(二)(与spark的结合)
剩下的代码:
val reconcileSchema = parameters(DataSourceWriteOptions.RECONCILE_SCHEMA.key()).toBooleanval (writeResult, writeClient: SparkRDDWriteClient[HoodieRecordPayload[Nothing]]) =...case _ => { // any other operation// register classes & schemasval (structName, nameSpace) = AvroConversionUtils.getAvroRecordNameAndNamespace(tblName)sparkContext.getConf.registerKryoClasses(Array(classOf[org.apache.avro.generic.GenericData],classOf[org.apache.avro.Schema]))// TODO(HUDI-4472) revisit and simplify schema handlingval sourceSchema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, nameSpace)val latestTableSchema = getLatestTableSchema(sqlContext.sparkSession, tableMetaClient).getOrElse(sourceSchema)val schemaEvolutionEnabled = parameters.getOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key(), "false").toBooleanvar internalSchemaOpt = getLatestTableInternalSchema(hoodieConfig, tableMetaClient)val writerSchema: Schema =if (reconcileSchema) {// In case we need to reconcile the schema and schema evolution is enabled,// we will force-apply schema evolution to the writer's schemaif (schemaEvolutionEnabled && internalSchemaOpt.isEmpty) {internalSchemaOpt = Some(AvroInternalSchemaConverter.convert(sourceSchema))}if (internalSchemaOpt.isDefined) {...// Convert to RDD[HoodieRecord]val genericRecords: RDD[GenericRecord] = HoodieSparkUtils.createRdd(df, structName, nameSpace, reconcileSchema,org.apache.hudi.common.util.Option.of(writerSchema))val shouldCombine = parameters(INSERT_DROP_DUPS.key()).toBoolean ||operation.equals(WriteOperationType.UPSERT) ||parameters.getOrElse(HoodieWriteConfig.COMBINE_BEFORE_INSERT.key(),HoodieWriteConfig.COMBINE_BEFORE_INSERT.defaultValue()).toBooleanval hoodieAllIncomingRecords = genericRecords.map(gr => {val processedRecord = getProcessedRecord(partitionColumns, gr, dropPartitionColumns)val hoodieRecord = if (shouldCombine) {val orderingVal = HoodieAvroUtils.getNestedFieldVal(gr, hoodieConfig.getString(PRECOMBINE_FIELD), false, parameters.getOrElse(DataSourceWriteOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),DataSourceWriteOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()).toBoolean).asInstanceOf[Comparable[_]]DataSourceUtils.createHoodieRecord(processedRecord,orderingVal,keyGenerator.getKey(gr),hoodieConfig.getString(PAYLOAD_CLASS_NAME))} else {DataSourceUtils.createHoodieRecord(processedRecord, keyGenerator.getKey(gr), hoodieConfig.getString(PAYLOAD_CLASS_NAME))}hoodieRecord}).toJavaRDD()val writerDataSchema = if (dropPartitionColumns) generateSchemaWithoutPartitionColumns(partitionColumns, writerSchema) else writerSchema// Create a HoodieWriteClient & issue the write.val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc, writerDataSchema.toString, path,tblName, mapAsJavaMap(addSchemaEvolutionParameters(parameters, internalSchemaOpt) - HoodieWriteConfig.AUTO_COMMIT_ENABLE.key))).asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]]if (isAsyncCompactionEnabled(client, tableConfig, parameters, jsc.hadoopConfiguration())) {asyncCompactionTriggerFn.get.apply(client)}if (isAsyncClusteringEnabled(client, parameters)) {asyncClusteringTriggerFn.get.apply(client)}val hoodieRecords =if (hoodieConfig.getBoolean(INSERT_DROP_DUPS)) {DataSourceUtils.dropDuplicates(jsc, hoodieAllIncomingRecords, mapAsJavaMap(parameters))} else {hoodieAllIncomingRecords}client.startCommitWithTime(instantTime, commitActionType)val writeResult = DataSourceUtils.doWriteOperation(client, hoodieRecords, instantTime, operation)(writeResult, client)}
-
如果开启了Schema Evolution,也就是hoodie.datasource.write.reconcile.schema是true,默认是false,就会进行schema的合并
convertStructTypeToAvroSchema 把df的schema转换成avro的schema
并且从*.hoodie/20230530073115535.deltacommit* 获取internalSchemaOpt,具体的合并就是把即将写入的schema和internalSchemaOpt进行合并
最后赋值给writerSchema,有可能还需要hoodie.schema.on.read.enable,默认是false -
HoodieSparkUtils.createRdd 创建RDD
把df转换为了RDD[GenericRecord]类型,赋值给genericRecords -
val hoodieAllIncomingRecords = genericRecords.map(gr => {
- 首先如果是hoodie.datasource.write.drop.partition.columns为true(默认是false),则会从schema中删除hoodie.datasource.write.
partitionpath.field字段 - 如果hoodie.datasource.write.insert.drop.duplicates为true(默认是false)或者hoodie.datasource.write.operation是upsert(默认
是upsert),或者hoodie.combine.before.insert为true(默认是false),
则会创建HoodieAvroRecord<>(hKey, payload)类型的实例,其中HoodieKey以recordkey和partitionpath组成,playload为OverwriteWithLatestAvroPayload实例, - hoodieAllIncomingRecords就变成了RDD[HoodieAvroRecord]
- 首先如果是hoodie.datasource.write.drop.partition.columns为true(默认是false),则会从schema中删除hoodie.datasource.write.
-
writerDataSchema= client 这些就是创建SparkRDDWriteClient 客户端
-
isAsyncCompactionEnabled
默认asyncCompactionTriggerFnDefined是没有的,所以不会开启异步的Compaction,isAsyncClusteringEnabled同理也是 -
val hoodieRecords =
如果配置了hoodie.datasource.write.insert.drop.duplicates为true(默认是false),则会进行去重处理,具体是调用DataSourceUtils.dropDuplicates方法:SparkRDDReadClient client = new SparkRDDReadClient<>(new HoodieSparkEngineContext(jssc), writeConfig);return client.tagLocation(incomingHoodieRecords).filter(r -> !((HoodieRecord<HoodieRecordPayload>) r).isCurrentLocationKnown());- SparkRDDReadClient client 在创建Client的时候,会进行索引的创建this.index = SparkHoodieIndexFactory.createIndex(clientConfig);
如果有hoodie.index.class设置,则实例化对象,否则根据hoodie.index.type的值来建立索引(默认是HoodieSimpleIndex,适合做测试用) - client.tagLocation(incomingHoodieRecords)…
从要插入的记录中过滤出在index中不存在的记录,最终调用的是index.tagLocation方法
如果hoodie.datasource.write.insert.drop.duplicates为false,则保留所有的记录
- SparkRDDReadClient client 在创建Client的时候,会进行索引的创建this.index = SparkHoodieIndexFactory.createIndex(clientConfig);
-
client.startCommitWithTime 开始写操作,这涉及到回滚的操作
- 会先过滤出需要回滚的的的写失败的文件,如果hoodie.cleaner.policy.failed.writes是EAGER(默认是EAGER),就会在这次提交中回滚失败的文件
- 然后创建一个后缀为deltacommit.requested的文件,此时没有真正的写
-
val writeResult = DataSourceUtils.doWriteOperation
真正的写操作
相关文章:
Apache Hudi初探(八)(与spark的结合)--非bulk_insert模式
背景 之前讨论的都是’hoodie.datasource.write.operation’:bulk_insert’的前提下,在这种模式下,是没有json文件的已形成如下的文件: /dt1/.hoodie_partition_metadata /dt1/2ffe3579-6ddb-4c5f-bf03-5c1b5dfce0a0-0_0-41263-0_202305282…...
Java之旅(九)
Java 循环语句 Java 中的循环语句包括 for、while 和 do-while,它们都可以用于实现循环结构。 for 语句用于循环执行一段代码块,直到给定的条件表达式的布尔值为 false。 for 语句的一般格式如下: for (initialization; condition; update…...
6年测试经验之谈,为什么要做自动化测试?
一、自动化测试 自动化测试是把以人为驱动的测试行为转化为机器执行的一种过程。 个人认为,只要能服务于测试工作,能够帮助我们提升工作效率的,不管是所谓的自动化工具,还是简单的SQL 脚本、批处理脚本,还是自己编写…...
二分法的边界条件 2517. 礼盒的最大甜蜜度
2517. 礼盒的最大甜蜜度 给你一个正整数数组 price ,其中 price[i] 表示第 i 类糖果的价格,另给你一个正整数 k 。 商店组合 k 类 不同 糖果打包成礼盒出售。礼盒的 甜蜜度 是礼盒中任意两种糖果 价格 绝对差的最小值。 返回礼盒的 最大 甜蜜度。 记录一…...
java设计模式(十六)命令模式
目录 定义模式结构角色职责代码实现适用场景优缺点 定义 命令模式(Command Pattern) 又叫动作模式或事务模式。指的是将一个请求封装成一个对象,使发出请求的责任和执行请求的责任分割开,然后可以使用不同的请求把客户端参数化&a…...
[运维] iptables限制指定ip访问指定端口和只允许指定ip访问指定端口
iptables限制指定ip访问指定端口 要使用iptables限制特定IP地址访问特定端口,您可以使用以下命令: iptables -A INPUT -p tcp -s <IP地址> --dport <端口号> -j DROP请将 <IP地址> 替换为要限制的IP地址,将 <端口号&g…...
JS学习笔记(3. 流程控制)
1. 分歧 1.1 if条件 if (条件) {...} // 为真则执行,单条语句可省略大括号 if (条件) {...} else {...}// 为真则执行if,否则执行else if (条件1) {...} else if (条件2) {...} else {...} // 条件1为真则,条件2为真则,否则执…...
遥感云大数据在灾害、水体与湿地领域典型案例及GPT模型教程
详情点击链接:遥感云大数据在灾害、水体与湿地领域典型案例及GPT模型教程 一:平台及基础开发平台 GEE平台及典型应用案例; GEE开发环境及常用数据资源; ChatGPT、文心一言等GPT模型 JavaScript基础; GEE遥感云重…...
什么是文件描述符以及重定向的本质和软硬链接(Linux)
目录 1 什么是文件?什么是文件操作?认识系统接口open 什么是文件描述符认识Linux底层进程如何打开的文件映射关系重定向的本质理解软硬链接扩展问题 1 什么是文件?什么是文件操作? 文件 文件内容 文件属性(文件属性…...
LVM逻辑卷元数据丢失恢复案例 —— 筑梦之路
Lvm常见的故障主要是pv出现异常,有以下几种情况 一个是pv所在的磁盘发生了lvm的元数据损坏一个是系统无法识别到pv所在的磁盘一个是系统异常,断电等导致重启后盘符发生变化,也就是系统识别的磁盘uuid发生变化,但是wwid还是可以对应…...
Java技术规范概览
Java技术规范 目录概述需求: 设计思路实现思路分析1.Java JSR的部分2.JSR-000373.JSR-0000394.JSR-000337 参考资料和推荐阅读 Survive by day and develop by night. talk for import biz , show your perfect code,full busy,skip hardness,make a bet…...
【OpenMMLab AI实战营第二期】二十分钟入门OpenMMLab笔记
OpenMMlab 主页:openmmlab.com 开源地址:https://github.com/open-mmlab 学习视频地址:https://www.bilibili.com/video/BV1js4y1i72P/ 概述 开源成为人工智能行业发展引擎 时间轴 theano:2007 Caffe:2013 Ten…...
docker-compose单机容器集群编排
docker-compose dockerfile模板文件可以定义一个独立的应用容器,如果需要多个容器就需要服务编排。服务编排有很多技术方案 docker-compose开源的项目实现对容器集群的快速编排 docker-compose将所管理的容器分为三层,分别为工程,服务&#…...
CentOS7 安装Gitlab
1、安装依赖 sudo yum install -y curl openssh-server ca-certificates tzdata perl libsemanage-devel 2、安装邮件服务工具 sudo yum install -y postfix 3、配置GitLab 软件源镜像 curl -fsSL https://packages.gitlab.cn/repository/raw/scripts/setup.sh | /bin/bash …...
Mysql InnoDB的Buffer Pool
Buffer Pool 在MySQL服务器启动的时候就向操作系统申请了⼀⽚连续的内存,他们给这⽚内存起了个名,叫做Buffer Pool(中⽂名 是缓冲池)。 默认情况下Buffer Pool只有128M⼤⼩,最⼩值为5M,通过修改配置文件设…...
SMTP简单邮件传输协议(C/C++ 发送电子邮件)
SMTP是用于通过Internet发送电子邮件的协议。电子邮件客户端(如Microsoft Outlook或macOS Mail应用程序)使用SMTP连接到邮件服务器并发送电子邮件。邮件服务器还使用SMTP将邮件从一个邮件服务器交换到另一个。它不用于从服务器下载电子邮件;相…...
uploads靶场通关(1-11关)
Pass-01(JS校验) 看题目我们准备好我们的php脚本文件,命名为1.php 上传该php文件,发现上传失败 方法一:将浏览器的JavaScript禁用 然后就能上传了 方法二: 查看源码,发现只能上传以下形式的文…...
6.1黄金探底回升是否到顶,今日多空如何布局
近期有哪些消息面影响黄金走势?今日黄金多空该如何研判? 黄金消息面解析:周三(5月31日)黄金期货价格攀升,美国国债收益率下降推动金价升至一周最高收盘位。美市尾盘,现货黄金收报1962.42美元/盎司,上升3…...
自定义ViewGroup实现流式布局
目录 1、View的绘制流程 2、自定义ViewGroup构造函数的作用 3、onMeasure 方法 3.1、View的度量方式 3.2、onMeasure方法参数的介绍 3.3、自定义ViewGroup onMeasure 方法的实现 4、onLayout方法 5、onDraw方法 6、自定义View的生命周期 7、自定义流式布局的实现 扩展ÿ…...
Git版本控制
目录 版本控制 概念 为什么需要版本控制? 常见的版本控制工具 Git 1、安装 2、了解基本的Linux命令 3、配置git 用户名和邮箱 4、git 工作模式 5、git 项目管理 6、git 分支 托管平台 远程仓库 Gitee 关联多个远程库 Git服务器 Git GUI 版本控制 概…...
Chapter03-Authentication vulnerabilities
文章目录 1. 身份验证简介1.1 What is authentication1.2 difference between authentication and authorization1.3 身份验证机制失效的原因1.4 身份验证机制失效的影响 2. 基于登录功能的漏洞2.1 密码爆破2.2 用户名枚举2.3 有缺陷的暴力破解防护2.3.1 如果用户登录尝试失败次…...
SkyWalking 10.2.0 SWCK 配置过程
SkyWalking 10.2.0 & SWCK 配置过程 skywalking oap-server & ui 使用Docker安装在K8S集群以外,K8S集群中的微服务使用initContainer按命名空间将skywalking-java-agent注入到业务容器中。 SWCK有整套的解决方案,全安装在K8S群集中。 具体可参…...
抖音增长新引擎:品融电商,一站式全案代运营领跑者
抖音增长新引擎:品融电商,一站式全案代运营领跑者 在抖音这个日活超7亿的流量汪洋中,品牌如何破浪前行?自建团队成本高、效果难控;碎片化运营又难成合力——这正是许多企业面临的增长困局。品融电商以「抖音全案代运营…...
NLP学习路线图(二十三):长短期记忆网络(LSTM)
在自然语言处理(NLP)领域,我们时刻面临着处理序列数据的核心挑战。无论是理解句子的结构、分析文本的情感,还是实现语言的翻译,都需要模型能够捕捉词语之间依时序产生的复杂依赖关系。传统的神经网络结构在处理这种序列依赖时显得力不从心,而循环神经网络(RNN) 曾被视为…...
如何在最短时间内提升打ctf(web)的水平?
刚刚刷完2遍 bugku 的 web 题,前来答题。 每个人对刷题理解是不同,有的人是看了writeup就等于刷了,有的人是收藏了writeup就等于刷了,有的人是跟着writeup做了一遍就等于刷了,还有的人是独立思考做了一遍就等于刷了。…...
大数据学习(132)-HIve数据分析
🍋🍋大数据学习🍋🍋 🔥系列专栏: 👑哲学语录: 用力所能及,改变世界。 💖如果觉得博主的文章还不错的话,请点赞👍收藏⭐️留言Ǵ…...
C#学习第29天:表达式树(Expression Trees)
目录 什么是表达式树? 核心概念 1.表达式树的构建 2. 表达式树与Lambda表达式 3.解析和访问表达式树 4.动态条件查询 表达式树的优势 1.动态构建查询 2.LINQ 提供程序支持: 3.性能优化 4.元数据处理 5.代码转换和重写 适用场景 代码复杂性…...
接口自动化测试:HttpRunner基础
相关文档 HttpRunner V3.x中文文档 HttpRunner 用户指南 使用HttpRunner 3.x实现接口自动化测试 HttpRunner介绍 HttpRunner 是一个开源的 API 测试工具,支持 HTTP(S)/HTTP2/WebSocket/RPC 等网络协议,涵盖接口测试、性能测试、数字体验监测等测试类型…...
沙箱虚拟化技术虚拟机容器之间的关系详解
问题 沙箱、虚拟化、容器三者分开一一介绍的话我知道他们各自都是什么东西,但是如果把三者放在一起,它们之间到底什么关系?又有什么联系呢?我不是很明白!!! 就比如说: 沙箱&#…...
Neko虚拟浏览器远程协作方案:Docker+内网穿透技术部署实践
前言:本文将向开发者介绍一款创新性协作工具——Neko虚拟浏览器。在数字化协作场景中,跨地域的团队常需面对实时共享屏幕、协同编辑文档等需求。通过本指南,你将掌握在Ubuntu系统中使用容器化技术部署该工具的具体方案,并结合内网…...
