Apache Hudi初探(八)(与spark的结合)--非bulk_insert模式
背景
之前讨论的都是’hoodie.datasource.write.operation’:'bulk_insert’的前提下,在这种模式下,是没有json文件的已形成如下的文件:
/dt=1/.hoodie_partition_metadata
/dt=1/2ffe3579-6ddb-4c5f-bf03-5c1b5dfce0a0-0_0-41263-0_20230528233336713.parquet
/dt=1/30b7d5b2-12e8-415a-8ec5-18206fe601c0-0_0-22102-0_20230528231643200.parquet
/dt=1/4abc1c6d-a8aa-4c15-affc-61a35171ce69-0_4-22106-0_20230528231643200.parquet
/dt=1/513dee80-2e8c-4db8-baee-a767b9dba41c-0_2-22104-0_20230528231643200.parquet
/dt=1/57076f86-0a62-4f52-8b50-31a5f769b26a-0_1-22103-0_20230528231643200.parquet
/dt=1/84553727-be9d-4273-bad9-0a38d9240815-0_0-59818-0_20230528233513387.parquet
/dt=1/fecd6a84-9a74-40b1-bfc1-13612a67a785-0_0-26640-0_20230528231723951.parquet
因为是"bulk insert"操作,所以没有去重的需要,所以直接采用spark原生的方式,
以下我们讨论非spark原生的方式,
闲说杂谈
继续Apache Hudi初探(二)(与spark的结合)
剩下的代码:
val reconcileSchema = parameters(DataSourceWriteOptions.RECONCILE_SCHEMA.key()).toBooleanval (writeResult, writeClient: SparkRDDWriteClient[HoodieRecordPayload[Nothing]]) =...case _ => { // any other operation// register classes & schemasval (structName, nameSpace) = AvroConversionUtils.getAvroRecordNameAndNamespace(tblName)sparkContext.getConf.registerKryoClasses(Array(classOf[org.apache.avro.generic.GenericData],classOf[org.apache.avro.Schema]))// TODO(HUDI-4472) revisit and simplify schema handlingval sourceSchema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, nameSpace)val latestTableSchema = getLatestTableSchema(sqlContext.sparkSession, tableMetaClient).getOrElse(sourceSchema)val schemaEvolutionEnabled = parameters.getOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key(), "false").toBooleanvar internalSchemaOpt = getLatestTableInternalSchema(hoodieConfig, tableMetaClient)val writerSchema: Schema =if (reconcileSchema) {// In case we need to reconcile the schema and schema evolution is enabled,// we will force-apply schema evolution to the writer's schemaif (schemaEvolutionEnabled && internalSchemaOpt.isEmpty) {internalSchemaOpt = Some(AvroInternalSchemaConverter.convert(sourceSchema))}if (internalSchemaOpt.isDefined) {...// Convert to RDD[HoodieRecord]val genericRecords: RDD[GenericRecord] = HoodieSparkUtils.createRdd(df, structName, nameSpace, reconcileSchema,org.apache.hudi.common.util.Option.of(writerSchema))val shouldCombine = parameters(INSERT_DROP_DUPS.key()).toBoolean ||operation.equals(WriteOperationType.UPSERT) ||parameters.getOrElse(HoodieWriteConfig.COMBINE_BEFORE_INSERT.key(),HoodieWriteConfig.COMBINE_BEFORE_INSERT.defaultValue()).toBooleanval hoodieAllIncomingRecords = genericRecords.map(gr => {val processedRecord = getProcessedRecord(partitionColumns, gr, dropPartitionColumns)val hoodieRecord = if (shouldCombine) {val orderingVal = HoodieAvroUtils.getNestedFieldVal(gr, hoodieConfig.getString(PRECOMBINE_FIELD), false, parameters.getOrElse(DataSourceWriteOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),DataSourceWriteOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()).toBoolean).asInstanceOf[Comparable[_]]DataSourceUtils.createHoodieRecord(processedRecord,orderingVal,keyGenerator.getKey(gr),hoodieConfig.getString(PAYLOAD_CLASS_NAME))} else {DataSourceUtils.createHoodieRecord(processedRecord, keyGenerator.getKey(gr), hoodieConfig.getString(PAYLOAD_CLASS_NAME))}hoodieRecord}).toJavaRDD()val writerDataSchema = if (dropPartitionColumns) generateSchemaWithoutPartitionColumns(partitionColumns, writerSchema) else writerSchema// Create a HoodieWriteClient & issue the write.val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc, writerDataSchema.toString, path,tblName, mapAsJavaMap(addSchemaEvolutionParameters(parameters, internalSchemaOpt) - HoodieWriteConfig.AUTO_COMMIT_ENABLE.key))).asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]]if (isAsyncCompactionEnabled(client, tableConfig, parameters, jsc.hadoopConfiguration())) {asyncCompactionTriggerFn.get.apply(client)}if (isAsyncClusteringEnabled(client, parameters)) {asyncClusteringTriggerFn.get.apply(client)}val hoodieRecords =if (hoodieConfig.getBoolean(INSERT_DROP_DUPS)) {DataSourceUtils.dropDuplicates(jsc, hoodieAllIncomingRecords, mapAsJavaMap(parameters))} else {hoodieAllIncomingRecords}client.startCommitWithTime(instantTime, commitActionType)val writeResult = DataSourceUtils.doWriteOperation(client, hoodieRecords, instantTime, operation)(writeResult, client)}
-
如果开启了Schema Evolution,也就是hoodie.datasource.write.reconcile.schema是true,默认是false,就会进行schema的合并
convertStructTypeToAvroSchema 把df的schema转换成avro的schema
并且从*.hoodie/20230530073115535.deltacommit* 获取internalSchemaOpt,具体的合并就是把即将写入的schema和internalSchemaOpt进行合并
最后赋值给writerSchema,有可能还需要hoodie.schema.on.read.enable,默认是false -
HoodieSparkUtils.createRdd 创建RDD
把df转换为了RDD[GenericRecord]类型,赋值给genericRecords -
val hoodieAllIncomingRecords = genericRecords.map(gr => {
- 首先如果是hoodie.datasource.write.drop.partition.columns为true(默认是false),则会从schema中删除hoodie.datasource.write.
partitionpath.field字段 - 如果hoodie.datasource.write.insert.drop.duplicates为true(默认是false)或者hoodie.datasource.write.operation是upsert(默认
是upsert),或者hoodie.combine.before.insert为true(默认是false),
则会创建HoodieAvroRecord<>(hKey, payload)类型的实例,其中HoodieKey以recordkey和partitionpath组成,playload为OverwriteWithLatestAvroPayload实例, - hoodieAllIncomingRecords就变成了RDD[HoodieAvroRecord]
- 首先如果是hoodie.datasource.write.drop.partition.columns为true(默认是false),则会从schema中删除hoodie.datasource.write.
-
writerDataSchema= client 这些就是创建SparkRDDWriteClient 客户端
-
isAsyncCompactionEnabled
默认asyncCompactionTriggerFnDefined是没有的,所以不会开启异步的Compaction,isAsyncClusteringEnabled同理也是 -
val hoodieRecords =
如果配置了hoodie.datasource.write.insert.drop.duplicates为true(默认是false),则会进行去重处理,具体是调用DataSourceUtils.dropDuplicates方法:SparkRDDReadClient client = new SparkRDDReadClient<>(new HoodieSparkEngineContext(jssc), writeConfig);return client.tagLocation(incomingHoodieRecords).filter(r -> !((HoodieRecord<HoodieRecordPayload>) r).isCurrentLocationKnown());- SparkRDDReadClient client 在创建Client的时候,会进行索引的创建this.index = SparkHoodieIndexFactory.createIndex(clientConfig);
如果有hoodie.index.class设置,则实例化对象,否则根据hoodie.index.type的值来建立索引(默认是HoodieSimpleIndex,适合做测试用) - client.tagLocation(incomingHoodieRecords)…
从要插入的记录中过滤出在index中不存在的记录,最终调用的是index.tagLocation方法
如果hoodie.datasource.write.insert.drop.duplicates为false,则保留所有的记录
- SparkRDDReadClient client 在创建Client的时候,会进行索引的创建this.index = SparkHoodieIndexFactory.createIndex(clientConfig);
-
client.startCommitWithTime 开始写操作,这涉及到回滚的操作
- 会先过滤出需要回滚的的的写失败的文件,如果hoodie.cleaner.policy.failed.writes是EAGER(默认是EAGER),就会在这次提交中回滚失败的文件
- 然后创建一个后缀为deltacommit.requested的文件,此时没有真正的写
-
val writeResult = DataSourceUtils.doWriteOperation
真正的写操作
相关文章:
Apache Hudi初探(八)(与spark的结合)--非bulk_insert模式
背景 之前讨论的都是’hoodie.datasource.write.operation’:bulk_insert’的前提下,在这种模式下,是没有json文件的已形成如下的文件: /dt1/.hoodie_partition_metadata /dt1/2ffe3579-6ddb-4c5f-bf03-5c1b5dfce0a0-0_0-41263-0_202305282…...
Java之旅(九)
Java 循环语句 Java 中的循环语句包括 for、while 和 do-while,它们都可以用于实现循环结构。 for 语句用于循环执行一段代码块,直到给定的条件表达式的布尔值为 false。 for 语句的一般格式如下: for (initialization; condition; update…...
6年测试经验之谈,为什么要做自动化测试?
一、自动化测试 自动化测试是把以人为驱动的测试行为转化为机器执行的一种过程。 个人认为,只要能服务于测试工作,能够帮助我们提升工作效率的,不管是所谓的自动化工具,还是简单的SQL 脚本、批处理脚本,还是自己编写…...
二分法的边界条件 2517. 礼盒的最大甜蜜度
2517. 礼盒的最大甜蜜度 给你一个正整数数组 price ,其中 price[i] 表示第 i 类糖果的价格,另给你一个正整数 k 。 商店组合 k 类 不同 糖果打包成礼盒出售。礼盒的 甜蜜度 是礼盒中任意两种糖果 价格 绝对差的最小值。 返回礼盒的 最大 甜蜜度。 记录一…...
java设计模式(十六)命令模式
目录 定义模式结构角色职责代码实现适用场景优缺点 定义 命令模式(Command Pattern) 又叫动作模式或事务模式。指的是将一个请求封装成一个对象,使发出请求的责任和执行请求的责任分割开,然后可以使用不同的请求把客户端参数化&a…...
[运维] iptables限制指定ip访问指定端口和只允许指定ip访问指定端口
iptables限制指定ip访问指定端口 要使用iptables限制特定IP地址访问特定端口,您可以使用以下命令: iptables -A INPUT -p tcp -s <IP地址> --dport <端口号> -j DROP请将 <IP地址> 替换为要限制的IP地址,将 <端口号&g…...
JS学习笔记(3. 流程控制)
1. 分歧 1.1 if条件 if (条件) {...} // 为真则执行,单条语句可省略大括号 if (条件) {...} else {...}// 为真则执行if,否则执行else if (条件1) {...} else if (条件2) {...} else {...} // 条件1为真则,条件2为真则,否则执…...
遥感云大数据在灾害、水体与湿地领域典型案例及GPT模型教程
详情点击链接:遥感云大数据在灾害、水体与湿地领域典型案例及GPT模型教程 一:平台及基础开发平台 GEE平台及典型应用案例; GEE开发环境及常用数据资源; ChatGPT、文心一言等GPT模型 JavaScript基础; GEE遥感云重…...
什么是文件描述符以及重定向的本质和软硬链接(Linux)
目录 1 什么是文件?什么是文件操作?认识系统接口open 什么是文件描述符认识Linux底层进程如何打开的文件映射关系重定向的本质理解软硬链接扩展问题 1 什么是文件?什么是文件操作? 文件 文件内容 文件属性(文件属性…...
LVM逻辑卷元数据丢失恢复案例 —— 筑梦之路
Lvm常见的故障主要是pv出现异常,有以下几种情况 一个是pv所在的磁盘发生了lvm的元数据损坏一个是系统无法识别到pv所在的磁盘一个是系统异常,断电等导致重启后盘符发生变化,也就是系统识别的磁盘uuid发生变化,但是wwid还是可以对应…...
Java技术规范概览
Java技术规范 目录概述需求: 设计思路实现思路分析1.Java JSR的部分2.JSR-000373.JSR-0000394.JSR-000337 参考资料和推荐阅读 Survive by day and develop by night. talk for import biz , show your perfect code,full busy,skip hardness,make a bet…...
【OpenMMLab AI实战营第二期】二十分钟入门OpenMMLab笔记
OpenMMlab 主页:openmmlab.com 开源地址:https://github.com/open-mmlab 学习视频地址:https://www.bilibili.com/video/BV1js4y1i72P/ 概述 开源成为人工智能行业发展引擎 时间轴 theano:2007 Caffe:2013 Ten…...
docker-compose单机容器集群编排
docker-compose dockerfile模板文件可以定义一个独立的应用容器,如果需要多个容器就需要服务编排。服务编排有很多技术方案 docker-compose开源的项目实现对容器集群的快速编排 docker-compose将所管理的容器分为三层,分别为工程,服务&#…...
CentOS7 安装Gitlab
1、安装依赖 sudo yum install -y curl openssh-server ca-certificates tzdata perl libsemanage-devel 2、安装邮件服务工具 sudo yum install -y postfix 3、配置GitLab 软件源镜像 curl -fsSL https://packages.gitlab.cn/repository/raw/scripts/setup.sh | /bin/bash …...
Mysql InnoDB的Buffer Pool
Buffer Pool 在MySQL服务器启动的时候就向操作系统申请了⼀⽚连续的内存,他们给这⽚内存起了个名,叫做Buffer Pool(中⽂名 是缓冲池)。 默认情况下Buffer Pool只有128M⼤⼩,最⼩值为5M,通过修改配置文件设…...
SMTP简单邮件传输协议(C/C++ 发送电子邮件)
SMTP是用于通过Internet发送电子邮件的协议。电子邮件客户端(如Microsoft Outlook或macOS Mail应用程序)使用SMTP连接到邮件服务器并发送电子邮件。邮件服务器还使用SMTP将邮件从一个邮件服务器交换到另一个。它不用于从服务器下载电子邮件;相…...
uploads靶场通关(1-11关)
Pass-01(JS校验) 看题目我们准备好我们的php脚本文件,命名为1.php 上传该php文件,发现上传失败 方法一:将浏览器的JavaScript禁用 然后就能上传了 方法二: 查看源码,发现只能上传以下形式的文…...
6.1黄金探底回升是否到顶,今日多空如何布局
近期有哪些消息面影响黄金走势?今日黄金多空该如何研判? 黄金消息面解析:周三(5月31日)黄金期货价格攀升,美国国债收益率下降推动金价升至一周最高收盘位。美市尾盘,现货黄金收报1962.42美元/盎司,上升3…...
自定义ViewGroup实现流式布局
目录 1、View的绘制流程 2、自定义ViewGroup构造函数的作用 3、onMeasure 方法 3.1、View的度量方式 3.2、onMeasure方法参数的介绍 3.3、自定义ViewGroup onMeasure 方法的实现 4、onLayout方法 5、onDraw方法 6、自定义View的生命周期 7、自定义流式布局的实现 扩展ÿ…...
Git版本控制
目录 版本控制 概念 为什么需要版本控制? 常见的版本控制工具 Git 1、安装 2、了解基本的Linux命令 3、配置git 用户名和邮箱 4、git 工作模式 5、git 项目管理 6、git 分支 托管平台 远程仓库 Gitee 关联多个远程库 Git服务器 Git GUI 版本控制 概…...
使用docker在3台服务器上搭建基于redis 6.x的一主两从三台均是哨兵模式
一、环境及版本说明 如果服务器已经安装了docker,则忽略此步骤,如果没有安装,则可以按照一下方式安装: 1. 在线安装(有互联网环境): 请看我这篇文章 传送阵>> 点我查看 2. 离线安装(内网环境):请看我这篇文章 传送阵>> 点我查看 说明:假设每台服务器已…...
SkyWalking 10.2.0 SWCK 配置过程
SkyWalking 10.2.0 & SWCK 配置过程 skywalking oap-server & ui 使用Docker安装在K8S集群以外,K8S集群中的微服务使用initContainer按命名空间将skywalking-java-agent注入到业务容器中。 SWCK有整套的解决方案,全安装在K8S群集中。 具体可参…...
论文解读:交大港大上海AI Lab开源论文 | 宇树机器人多姿态起立控制强化学习框架(二)
HoST框架核心实现方法详解 - 论文深度解读(第二部分) 《Learning Humanoid Standing-up Control across Diverse Postures》 系列文章: 论文深度解读 + 算法与代码分析(二) 作者机构: 上海AI Lab, 上海交通大学, 香港大学, 浙江大学, 香港中文大学 论文主题: 人形机器人…...
python打卡day49
知识点回顾: 通道注意力模块复习空间注意力模块CBAM的定义 作业:尝试对今天的模型检查参数数目,并用tensorboard查看训练过程 import torch import torch.nn as nn# 定义通道注意力 class ChannelAttention(nn.Module):def __init__(self,…...
椭圆曲线密码学(ECC)
一、ECC算法概述 椭圆曲线密码学(Elliptic Curve Cryptography)是基于椭圆曲线数学理论的公钥密码系统,由Neal Koblitz和Victor Miller在1985年独立提出。相比RSA,ECC在相同安全强度下密钥更短(256位ECC ≈ 3072位RSA…...
【Java学习笔记】Arrays类
Arrays 类 1. 导入包:import java.util.Arrays 2. 常用方法一览表 方法描述Arrays.toString()返回数组的字符串形式Arrays.sort()排序(自然排序和定制排序)Arrays.binarySearch()通过二分搜索法进行查找(前提:数组是…...
QMC5883L的驱动
简介 本篇文章的代码已经上传到了github上面,开源代码 作为一个电子罗盘模块,我们可以通过I2C从中获取偏航角yaw,相对于六轴陀螺仪的yaw,qmc5883l几乎不会零飘并且成本较低。 参考资料 QMC5883L磁场传感器驱动 QMC5883L磁力计…...
iPhone密码忘记了办?iPhoneUnlocker,iPhone解锁工具Aiseesoft iPhone Unlocker 高级注册版分享
平时用 iPhone 的时候,难免会碰到解锁的麻烦事。比如密码忘了、人脸识别 / 指纹识别突然不灵,或者买了二手 iPhone 却被原来的 iCloud 账号锁住,这时候就需要靠谱的解锁工具来帮忙了。Aiseesoft iPhone Unlocker 就是专门解决这些问题的软件&…...
质量体系的重要
质量体系是为确保产品、服务或过程质量满足规定要求,由相互关联的要素构成的有机整体。其核心内容可归纳为以下五个方面: 🏛️ 一、组织架构与职责 质量体系明确组织内各部门、岗位的职责与权限,形成层级清晰的管理网络…...
sqlserver 根据指定字符 解析拼接字符串
DECLARE LotNo NVARCHAR(50)A,B,C DECLARE xml XML ( SELECT <x> REPLACE(LotNo, ,, </x><x>) </x> ) DECLARE ErrorCode NVARCHAR(50) -- 提取 XML 中的值 SELECT value x.value(., VARCHAR(MAX))…...
