Apache Hudi初探(三)(与flink的结合)--flink写hudi的操作(真正的写数据)
背景
在之前的文章中Apache Hudi初探(二)(与flink的结合)–flink写hudi的操作(JobManager端的提交操作) 有说到写hudi数据会涉及到写hudi真实数据以及写hudi元数据,这篇文章来说一下具体的实现
写hudi真实数据
这里的操作就是在HoodieFlinkWriteClient.upsert方法:
public List<WriteStatus> upsert(List<HoodieRecord<T>> records, String instantTime) {HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table =initTable(WriteOperationType.UPSERT, Option.ofNullable(instantTime));table.validateUpsertSchema();preWrite(instantTime, WriteOperationType.UPSERT, table.getMetaClient());final HoodieWriteHandle<?, ?, ?, ?> writeHandle = getOrCreateWriteHandle(records.get(0), getConfig(),instantTime, table, records.listIterator());HoodieWriteMetadata<List<WriteStatus>> result = ((HoodieFlinkTable<T>) table).upsert(context, writeHandle, instantTime, records);if (result.getIndexLookupDuration().isPresent()) {metrics.updateIndexMetrics(LOOKUP_STR, result.getIndexLookupDuration().get().toMillis());}return postWrite(result, instantTime, table);}
- initTable
初始化HoodieFlinkTable - preWrite
在这里几乎没什么操作 - getOrCreateWriteHandle
创建一个写文件的handle(假如这里创建的是FlinkMergeAndReplaceHandle),这里会记录已有的文件路径,后续FlinkMergeHelper.runMerge会从这里读取数
注意该构造函数中的init方法,会创建一个ExternalSpillableMap类型的map来存储即将插入的记录,这在后续upsert中会用到 - HoodieFlinkTable.upsert
这里进行真正的upsert操作,会调用FlinkUpsertDeltaCommitActionExecutor.execute,最终会调用到BaseFlinkCommitActionExecutor.execute,从而调用到FlinkMergeHelper.newInstance().runMergepublic void runMerge(HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,..) {final boolean externalSchemaTransformation = table.getConfig().shouldUseExternalSchemaTransformation();HoodieBaseFile baseFile = mergeHandle.baseFileForMerge();if (externalSchemaTransformation || baseFile.getBootstrapBaseFile().isPresent()) {readSchema = baseFileReader.getSchema();gWriter = new GenericDatumWriter<>(readSchema);gReader = new GenericDatumReader<>(readSchema, mergeHandle.getWriterSchemaWithMetaFields());} else {gReader = null;gWriter = null;readSchema = mergeHandle.getWriterSchemaWithMetaFields();}wrapper = new BoundedInMemoryExecutor<>(table.getConfig().getWriteBufferLimitBytes(), new IteratorBasedQueueProducer<>(readerIterator),Option.of(new UpdateHandler(mergeHandle)), record -> {if (!externalSchemaTransformation) {return record;}return transformRecordBasedOnNewSchema(gReader, gWriter, encoderCache, decoderCache, (GenericRecord) record);});wrapper.execute();。。。mergeHandle.close();}- externalSchemaTransformation=
这里有hoodie.avro.schema.external.transformation配置(默认是false)用来把在之前schame下的数据转换为新的schema下的数据 - wrapper.execute()
这里会最终调用到upsertHandle.write(record),也就是UpdateHandler.consumeOneRecord方法被调用的地方
如果keyToNewRecords报班了对应的记录,也就是说会有uodate的操作的话,就插入新的数据,public void write(GenericRecord oldRecord) {...if (keyToNewRecords.containsKey(key)) {if (combinedAvroRecord.isPresent() && combinedAvroRecord.get().equals(IGNORE_RECORD)) {copyOldRecord = true;} else if (writeUpdateRecord(hoodieRecord, oldRecord, combinedAvroRecord)) {copyOldRecord = false;}writtenRecordKeys.add(key); }}
writeUpdateRecord 这里进行数据的更新,并用writtenRecordKeys记录插入的记录 - mergeHandle.close()
这里的writeIncomingRecords会判断如果writtenRecordKeys没有包含该记录的话,就直接插入数据,而不是更新public List<WriteStatus> close() {writeIncomingRecords();...}...protected void writeIncomingRecords() throws IOException {// write out any pending records (this can happen when inserts are turned into updates)Iterator<HoodieRecord<T>> newRecordsItr = (keyToNewRecords instanceof ExternalSpillableMap)? ((ExternalSpillableMap)keyToNewRecords).iterator() : keyToNewRecords.values().iterator();while (newRecordsItr.hasNext()) {HoodieRecord<T> hoodieRecord = newRecordsItr.next();if (!writtenRecordKeys.contains(hoodieRecord.getRecordKey())) {writeInsertRecord(hoodieRecord);}}}
- externalSchemaTransformation=
总结一下upsert的关键点:
mergeHandle.close()才是真正的写数据(insert)的时候,在初始化handle的时候会把记录传导writtenRecordKeys中(在HoodieMergeHandle中的init方法)mergeHandle的write() 方法会在写入数据的时候,如果发现有新的数据,则会写入新的数据(update)
写hudi元数据
这里的操作是StreamWriteOperatorCoordinator.notifyCheckpointComplete方法
public void notifyCheckpointComplete(long checkpointId) {...final boolean committed = commitInstant(this.instant, checkpointId);...
}...
private boolean commitInstant(String instant, long checkpointId){...doCommit(instant, writeResults);...
}...
private void doCommit(String instant, List<WriteStatus> writeResults) {// commit or rollbacklong totalErrorRecords = writeResults.stream().map(WriteStatus::getTotalErrorRecords).reduce(Long::sum).orElse(0L);long totalRecords = writeResults.stream().map(WriteStatus::getTotalRecords).reduce(Long::sum).orElse(0L);boolean hasErrors = totalErrorRecords > 0;if (!hasErrors || this.conf.getBoolean(FlinkOptions.IGNORE_FAILED)) {HashMap<String, String> checkpointCommitMetadata = new HashMap<>();if (hasErrors) {LOG.warn("Some records failed to merge but forcing commit since commitOnErrors set to true. Errors/Total="+ totalErrorRecords + "/" + totalRecords);}final Map<String, List<String>> partitionToReplacedFileIds = tableState.isOverwrite? writeClient.getPartitionToReplacedFileIds(tableState.operationType, writeResults): Collections.emptyMap();boolean success = writeClient.commit(instant, writeResults, Option.of(checkpointCommitMetadata),tableState.commitAction, partitionToReplacedFileIds);if (success) {reset();this.ckpMetadata.commitInstant(instant);LOG.info("Commit instant [{}] success!", instant);} else {throw new HoodieException(String.format("Commit instant [%s] failed!", instant));}} else {LOG.error("Error when writing. Errors/Total=" + totalErrorRecords + "/" + totalRecords);LOG.error("The first 100 error messages");writeResults.stream().filter(WriteStatus::hasErrors).limit(100).forEach(ws -> {LOG.error("Global error for partition path {} and fileID {}: {}",ws.getGlobalError(), ws.getPartitionPath(), ws.getFileId());if (ws.getErrors().size() > 0) {ws.getErrors().forEach((key, value) -> LOG.trace("Error for key:" + key + " and value " + value));}});// Rolls back instantwriteClient.rollback(instant);throw new HoodieException(String.format("Commit instant [%s] failed and rolled back !", instant));}
}
主要在commitInstant涉及动的方法doCommit(instant, writeResults)
如果说没有错误发生的话,就继续下一步:
这里的提交过程和spark中一样,具体参考Apache Hudi初探(五)(与spark的结合)
其他
在flink和spark中新写入的文件是在哪里分配对一个的fieldId:
//Flink中
BucketAssignFunction 中processRecord getNewRecordLocation 分配新的 fieldId//Spark中
BaseSparkCommitActionExecutor 中execute方法 中 handleUpsertPartition 涉及到的UpsertPartitioner getBucketInfo方法
其中UpsertPartitioner构造函数中 assignInserts 方法涉及到分配新的 fieldId
相关文章:
Apache Hudi初探(三)(与flink的结合)--flink写hudi的操作(真正的写数据)
背景 在之前的文章中Apache Hudi初探(二)(与flink的结合)–flink写hudi的操作(JobManager端的提交操作) 有说到写hudi数据会涉及到写hudi真实数据以及写hudi元数据,这篇文章来说一下具体的实现 写hudi真实数据 这里的操作就是在HoodieFlinkWriteClient.upsert方法: public …...
解释 Git 的基本概念和使用方式(InsCode AI 创作助手)
Git 是一种分布式版本控制系统,它允许多个用户协同工作并对项目进行版本控制。下面是 Git 的基本概念和使用方式: 基本概念: 仓库(Repository):存储代码和版本历史记录的地方。 提交(Commit&a…...
【QT】信号和槽(15)
前面的内容说了很多不同的控件如何使用,今天来看下QT的核心,信号与槽(Signals and slots)! 简单理解一下,就是我们的信号与槽连接上了之后,发射一个信号给到槽,槽函数接收到了这个信…...
EFLK日志平台(filebeat-->kafka-->logstash-->es-->kiabana)
ELK平台是一套完整的日志集中处理解决方案,将 ElasticSearch、Logstash 和 Kiabana 三个开源工具配合使用, 完成更强大的用户对日志的查询、排序、统计需求。 安装顺序 1.安装es 7.17.12 2.安装kibana 7.17.12 3.安装x-pack 保证以上调试成功后开始下面…...
C盘扩容遇到的问题(BitLocker解密、)
120G的C盘不知不觉的就满了,忍了好久终于要动手了。 尽管电脑-管理--磁盘管理里可以进行磁盘大小调整,但由于各盘都在用,不能够连续调整,所以选用DiskGenius。 # DiskGenius调整分区大小遇到“您选择的分区不支持无损调整容量” …...
ShardingSphere——柔性事务SEATA原理
摘要 Apache ShardingSphere集成了 SEATA 作为柔性事务的使用方案,本文主要介绍其实现ShardingSphere中柔性事务SEATA原理原理。帮助你更好的理解ShardingSphere原理。同时帮助大家更好的使用柔性事务SEATA原理。 一、Seata柔性事务 Apache ShardingSphere 集成了…...
Introducing GlobalPlatform(一篇了解GP)
安全之安全(security)博客目录导读 TEE之GP(Global Platform)认证汇总 目录 一、GP简介 二、GP新的重点领域是什么? 三、认证程序和培训<...
Ubuntu 18.04上无法播放MP4格式视频解决办法
ubuntu18.04系统无法播放MP4格式视频,提示如下图所示: 解决办法: 1、首先,确保ubuntu系统已完全更新。可使用以下命令更新软件包列表:sudo apt update,然后使用以下命令升级所有已安装的软件包:…...
科技驱动产业升级:浅谈制造型企业对MES系统的应用
在科技不断进步的背景下,制造型行业也在持续发展,但随之而来的挑战也不断增加。传统的管理方式已经无法满足企业的需求,因此许多制造型企业开始寻找新的管理模式。制造执行系统(MES)作为先进的制造信息技术之一&#x…...
智能化新十年,“全栈智能”定义行业“Copilot智能助手”
“智能化转型是未来十年中国企业穿越经济周期的利器”,这是联想集团执行副总裁兼中国区总裁刘军在去年联想创新科技大会上做出的判断,而2023年正值第四次工业革命第二个十年的开端,智能化是第四次工业革命的主题。2023年初,基于谷…...
Docker资源控制cgroups
文章目录 一、docker资源控制1、资源控制工具2、Cgroups四大功能 二、CPU 资源控制1、设置CPU使用率上限2、CPU压力测试3、Cgroups限制cpu使用率4、设置CPU资源占用比(设置多个容器时才有效)5、设置容器绑定指定的CPU 三、对内存使用的限制四、对磁盘IO配…...
通过python 获取当前局域网内存在的IP和MAC
通过python 获取当前局域网内存在的ip 通过ipconfig /all 命令获取局域网所在的网段 通过arp -d *命令清空当前所有的arp映射表 循环遍历当前网段所有可能的ip与其ping一遍建立arp映射表 for /L %i IN (1,1,254) DO ping -w 1 -n 1 192.168.3.%i 通过arp -a命令读取缓存的映射表…...
解决D盘的类型不是基本,而是动态的问题
一、正确的图片 1.1图片 1.2本人遇到的问题 二、将动态磁盘 转为基本盘 2.1 基本概念,动态无法转化为基本,不是双向的,借助软件 网址:转换动态磁盘到普通磁盘_检测到计算机本地磁盘为动态分区_卫水金波的博客-CSDN博客 2.2分区…...
如何判断自己的qt版本呢?
如何判断自己的qt版本呢? 前情提要很简单,按照如下图所示,即可查看当前打开的qtCreator的版本如何打开5.15.2版本的qtCreator呢?安装教程 前情提要 我的电脑已经安装了qt5.14.1,然后我又安装了qt5.15.2,我想尝试一下同一台电脑能否适应两个版本的qt? 当我安装完成qt5.15.2后…...
【文心一言大模型插件制作初体验】制作面试错题本大模型插件
文心一言插件开发初体验 效果图 注意:目前插件仅支持在本地运行,虽然只能自用,但仍然是一个不错的选择。(什么?你说没有用?这不可能!文心一言app可以支持语音,网页端结合手机端就可…...
ROS 2官方文档(基于humble版本)学习笔记(二)
ROS 2官方文档(基于humble版本)学习笔记(二) 理解节点(node)ros2 runros2 node list重映射(remap)ros2 node info 理解话题(topic)rqt_graphros2 topic listr…...
excel中公式结合实际的数据提取出公式计算的分支
要在Excel中使用公式结合实际数据提取分支信息,您可以使用一些文本函数和条件函数来实现这个目标。以下是一个示例,假设您有一个包含银行交易描述的列A,想要从中提取分支信息: 假设交易描述的格式是"分行名称-交易类型"…...
3D模型优化实战:LowPoly、纹理烘焙及格式转换
在快节奏的游戏和虚拟/增强现实 (VR/AR) 世界中,3D 模型的优化在提供引人入胜的体验方面发挥着关键作用。 这门学科不仅仅是创造令人着迷的图形结构; 这是视觉质量和游戏流畅性之间的平衡问题,确保细致而流畅的游戏环境。 通过低多边形建模等…...
genome comparison commend 2 MCMCtree
仅本人练习使用!!后续会逐渐修改!! mcmctree估算物种分歧时间 - 简书 https://www.cnblogs.com/bio-mary/p/12818888.html 估算系统树分歧时间 —— paml.mcmctree,r8s | 生信技工 http://www.chenlianfu.com/?p2948 4. 使用PAM…...
Linux安装JenkinsCLI
项目简介安装目录 mkdir -p /opt/jenkinscli && cd /opt/jenkinscli JenkinsCLI下载 wget http://<your-jenkins-server>/jnlpJars/jenkins-cli.jar # <your-jenkins-server> 替换为你的 Jenkins 服务器地址 JenkinsCLI授权 Dashboard-->Configure Glob…...
深入剖析AI大模型:大模型时代的 Prompt 工程全解析
今天聊的内容,我认为是AI开发里面非常重要的内容。它在AI开发里无处不在,当你对 AI 助手说 "用李白的风格写一首关于人工智能的诗",或者让翻译模型 "将这段合同翻译成商务日语" 时,输入的这句话就是 Prompt。…...
Redis相关知识总结(缓存雪崩,缓存穿透,缓存击穿,Redis实现分布式锁,如何保持数据库和缓存一致)
文章目录 1.什么是Redis?2.为什么要使用redis作为mysql的缓存?3.什么是缓存雪崩、缓存穿透、缓存击穿?3.1缓存雪崩3.1.1 大量缓存同时过期3.1.2 Redis宕机 3.2 缓存击穿3.3 缓存穿透3.4 总结 4. 数据库和缓存如何保持一致性5. Redis实现分布式…...
【解密LSTM、GRU如何解决传统RNN梯度消失问题】
解密LSTM与GRU:如何让RNN变得更聪明? 在深度学习的世界里,循环神经网络(RNN)以其卓越的序列数据处理能力广泛应用于自然语言处理、时间序列预测等领域。然而,传统RNN存在的一个严重问题——梯度消失&#…...
Java 二维码
Java 二维码 **技术:**谷歌 ZXing 实现 首先添加依赖 <!-- 二维码依赖 --><dependency><groupId>com.google.zxing</groupId><artifactId>core</artifactId><version>3.5.1</version></dependency><de…...
C/C++ 中附加包含目录、附加库目录与附加依赖项详解
在 C/C 编程的编译和链接过程中,附加包含目录、附加库目录和附加依赖项是三个至关重要的设置,它们相互配合,确保程序能够正确引用外部资源并顺利构建。虽然在学习过程中,这些概念容易让人混淆,但深入理解它们的作用和联…...
基于IDIG-GAN的小样本电机轴承故障诊断
目录 🔍 核心问题 一、IDIG-GAN模型原理 1. 整体架构 2. 核心创新点 (1) 梯度归一化(Gradient Normalization) (2) 判别器梯度间隙正则化(Discriminator Gradient Gap Regularization) (3) 自注意力机制(Self-Attention) 3. 完整损失函数 二…...
C#学习第29天:表达式树(Expression Trees)
目录 什么是表达式树? 核心概念 1.表达式树的构建 2. 表达式树与Lambda表达式 3.解析和访问表达式树 4.动态条件查询 表达式树的优势 1.动态构建查询 2.LINQ 提供程序支持: 3.性能优化 4.元数据处理 5.代码转换和重写 适用场景 代码复杂性…...
MySQL 主从同步异常处理
阅读原文:https://www.xiaozaoshu.top/articles/mysql-m-s-update-pk MySQL 做双主,遇到的这个错误: Could not execute Update_rows event on table ... Error_code: 1032是 MySQL 主从复制时的经典错误之一,通常表示ÿ…...
DBLP数据库是什么?
DBLP(Digital Bibliography & Library Project)Computer Science Bibliography是全球著名的计算机科学出版物的开放书目数据库。DBLP所收录的期刊和会议论文质量较高,数据库文献更新速度很快,很好地反映了国际计算机科学学术研…...
CppCon 2015 学习:Time Programming Fundamentals
Civil Time 公历时间 特点: 共 6 个字段: Year(年)Month(月)Day(日)Hour(小时)Minute(分钟)Second(秒) 表示…...
