当前位置: 首页 > news >正文

Flink创建Hudi的Sink动态表

工厂类 HoodieTableFactory 提供的创建动态表接口 createDynamicTableSource 和 createDynamicTableSink,对应的源码文件为:https://github.com/apache/hudi/blob/master/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java 。

createDynamicTableSink

public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {@Overridepublic DynamicTableSink createDynamicTableSink(Context context) {Configuration conf = FlinkOptions.fromMap(context.getCatalogTable().getOptions());checkArgument(!StringUtils.isNullOrEmpty(conf.getString(FlinkOptions.PATH)),"Option [path] should not be empty.");setupTableOptions(conf.getString(FlinkOptions.PATH), conf);ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();sanityCheck(conf, schema);setupConfOptions(conf, context.getObjectIdentifier(), context.getCatalogTable(), schema);setupSortOptions(conf, context.getConfiguration());return new HoodieTableSink(conf, schema);}

createDynamicTableSource

public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {@Overridepublic DynamicTableSource createDynamicTableSource(Context context) {Configuration conf = FlinkOptions.fromMap(context.getCatalogTable().getOptions());Path path = new Path(conf.getOptional(FlinkOptions.PATH).orElseThrow(() ->new ValidationException("Option [path] should not be empty.")));setupTableOptions(conf.getString(FlinkOptions.PATH), conf);ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();setupConfOptions(conf, context.getObjectIdentifier(), context.getCatalogTable(), schema);return new HoodieTableSource(schema,path,context.getCatalogTable().getPartitionKeys(),conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME),conf);}

创建Sink表过程

1、检查是否设置了 path 选项(checkArgument),没有的话抛异常“Option [path] should not be empty.”。

2、做兼容性设置(setupTableOptions):

2.1、如果设置了 hoodie.table.recordkey.fields,但没有设置 hoodie.datasource.write.recordkey.field,则将 hoodie.datasource.write.recordkey.field 的值设置为 hoodie.table.recordkey.fields 的值;

2.2、如果设置了 hoodie.table.precombine.field,但没有设置 precombine.field,则将 precombine.field 的值设置为 hoodie.table.precombine.field 的值;

2.3、如果设置了 hoodie.datasource.write.hive_style_partitioning,但没有设置 hoodie.datasource.write.hive_style_partitioning,则将 hoodie.datasource.write.hive_style_partitioning 的值设置为 hoodie.datasource.write.hive_style_partitioning 的值。

3、必要选项检查:

3.1、检查表的类型(checkTableType),如果 table.type 的值为空,则不做处理,否则必须为 COPY_ON_WRITE 或者 MERGE_ON_READ,不然抛异常Invalid table type: TABLETYPE . Table type should be either MERGE_ON_READ or COPY_ON_WRITE.“;

3.2、如果为非 Append 模式,则检查是否设置了 hoodie.datasource.write.recordkey.field 和 precombine.field。

4、依次设置:

4.1、表名(hoodie.table.name);

4.2、主键(hoodie.datasource.write.recordkey.field);

4.3、分区(hoodie.datasource.write.partitionpath.field);

4.4、如果是 index 类型为 BUCKET,则设置桶(bucket)的键 hoodie.bucket.index.hash.field;

  4.4.1、如果还没有设置 hoodie.bucket.index.hash.field,则使用 hoodie.datasource.write.recordkey.field 的值作为 hoodie.bucket.index.hash.field 的值;4.4.2、否则进一步检查 hoodie.bucket.index.hash.field 的值是否为 hoodie.datasource.write.recordkey.field 值的子集。假设 hoodie.datasource.write.recordkey.field 值为“ds,dh”,则 hoodie.bucket.index.hash.field 值可以为“ds”、“dh”或“ds,dh”。4.5、设置压缩选项:4.5.1、设置 archive.min_commits,4.5.1、设置 archive.max_commits。4.6、设置Hive选项:4.6.1、如果没有设置 hive_sync.db,则设置 hive_sync.db;4.6.2、如果没有设置 hive_sync.table,则设置 hive_sync.table。4.7、设置read选项,如果不是增量查询则什么也不做;否则设置 hoodie.datasource.query.type 值为 incremental 。4.8、设置write选项:如果 write.operation 为默认值且为 cow 表,则实则 write.precombine 为 true 。4.9、如果没有设置 source.avro-schema.path 和 source.avro-schema,则设置 source.avro-schema 。

5、设置排序选项(flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java):

5.1、设置 Flink 的 table.exec.sort.max-num-file-handles

5.2、设置 Flink 的 table.exec.spill-compression.enabled

5.3、设置 Flink 的 table.exec.spill-compression.block-size

5.4、设置 Flink 的 table.exec.sort.async-merge-enabled

Append 模式

write.operation 值为 insert,并且为 mor 表;或则为 cow 表,但是 write.insert.cluster 值为 false。

  • write.insert.cluster

该选项用于控制是否在写入时合并小文件,仅对 cow 类型表有效,默认为 false。如果设置为 true,则每次写入前先合并小文件,这会降低写吞吐量,但可提高读性能。

相关文章:

Flink创建Hudi的Sink动态表

工厂类 HoodieTableFactory 提供的创建动态表接口 createDynamicTableSource 和 createDynamicTableSink,对应的源码文件为:https://github.com/apache/hudi/blob/master/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieT…...

人脸识别技术的安全性及其应用探讨

随着科技的不断发展,人脸识别技术已经成为了一个热门话题。人脸识别系统的出现,给人们的生活带来了极大的便利,同时也为一些犯罪分子提供了方便。因此,人脸识别技术的安全性和可靠性一直备受关注。 一、人脸识别技术的原理 人脸识…...

老域名查询工具- 在线域名批量查询工具

域名批量查询工具 域名批量查询工具是一种帮助用户快速查询多个域名信息的工具,通常能够自动扫描一组域名的WHOIS信息、DNS、IP地址、服务器等各种信息,并提供快速的结果反馈。 以下是域名批量查询工具主要的优点: 提高工作效率&#xff1a…...

JimuReport - 积木报表(一款免费Web报表工具)

一款免费的数据可视化报表,含报表和大屏设计,像搭建积木一样在线设计报表!功能涵盖,数据报表、打印设计、图表报表、大屏设计等! Web 版报表设计器,类似于excel操作风格,通过拖拽完成报表设计。…...

01-数据操作+数据预处理

1.n维数组,也称为张量(tensor):tensor和ndarray没有本质区别。tensor是有数学上的严格定义,ndarray是计算机描述的;张量表示一个由数值组成的数组,这个数组可能有多个维度; 无论使用…...

macOS本地python环境/vscode/导入python包/设置python解释器

查看macbook本地是否有python环境 输入python或者python3,退出python环境使用exit(),别忘了括号 没有的话去官网安装https://www.python.org/ 2. 安装vscode 官网https://code.visualstudio.com/ 3. 安装插件 点击左边的“插件”按钮,安装…...

【转存】Go语言设计模式

导语| 设计模式是针对软件设计中常见问题的工具箱,其中的工具就是各种经过实践验证的解决方案。即使你从未遇到过这些问题,了解模式仍然非常有用,因为它能指导你如何使用面向对象的设计原则来解决各种问题,提高开发效率&#xff0…...

第十一章 升级与定制

第十一章 升级与定制 一、 RPM 包安装操作 RPM(Redhat Packet Manager)。 ①安装 rpm –i rpm 文件名 (注:⑴常见用法:-ivh 参数显示安装过程和 hash 符#; ⑵覆盖安装:使用- -force 选项。…...

代码随想录算法训练营第二十二天|235. 二叉搜索树的最近公共祖先、701.二叉搜索树中的插入操作、450.删除二叉搜索树中的节点

目录 235. 二叉搜索树的最近公共祖先 1、递归实现 2、迭代法实现 701.二叉搜索树中的插入操作(递归实现) 450.删除二叉搜索树中的节点(递归实现) 235. 二叉搜索树的最近公共祖先 相对于 二叉树的最近公共祖先 本题就简单一些了…...

hbase表出现RIT删除方案

1.删除zookeeper中对应表注册信息 cd /opt/cloudera/parcels/CDH/lib/zookeeper/bin ./zkCli.sh -server node2:2181 --node2为仿真节点,生产需改 deleteall /hbase/table/表名 2.删除hdfs对应表数据 hadoop dfs -rm -r /hbase/data/default/表名 3.删除hbase:met…...

SQL学习(3)

SELECT 语句用于从表中选取数据。 SELECT 列名称 FROM 表名称 SELECT * FROM 表名称关键词 DISTINCT 用于返回唯一不同的值 SELECT DISTINCT 列名称 FROM 表名称WHERE 子句用于规定选择的标准 如需有条件地从表中选取数据,可将 WHERE 子句添加到 SELECT 语句。 S…...

连接型CRM助力医疗企业把“成本中心”变成“利润中心”

在市场竞争日益加剧的情形下,企业获客成本大幅上涨,存量客户的维护和开发开始被重视,售后服务部门的职责在企业中发挥的价值越来越大。因为企业售后服务不仅能帮助客户解决问题的部门,还是客户与企业沟通的桥梁,将客户…...

《Vue.js 设计与实现》—— 03 Vue.js 3 的设计思路

1. 声明式地描述 UI Vue.js 3 是一个声明式的 UI 框架,即用户在使用 Vue.js 3 开发页面时是声明式地描述 UI 的。 编写前端页面涉及的内容如下: DOM 元素:例如是 div 标签还是 a 标签属性:如 a 标签的 href 属性,再…...

2023年湖北省建设厅特种作业操作证报名条件是什么?

建筑施工特种作业人员是指在房屋建筑和市政工程施工活动中,从事可能对本人、他人及周围设备设施的安全造成重大危害作业的人员。建筑施工特种作业人员必须经建设主管部门考核合格,取得建筑施工特种作业人员操作资格证书(以下简称“资格证书”…...

Redis 进阶

🥲 🥸 🤌 🫀 🫁 🥷 🐻‍❄️🦤 🪶 🦭 🪲 🪳 🪰 🪱 🪴 🫐 🫒 🫑…...

伙伴匹配系统笔记---02

Java 8特性 1. stream / parallelStream 流失处理 2. Optional 可选类 一. 前端整合路由 1. 路由:vue 路由组件库地址:安装 | Vue Router (vuejs.org) 安装:yarn add vue-router@4 2. 整合路由: // 1. 定义路由组件. // 也可以从其他文件导入 const Home = { templ…...

Redis学习——单机版安装

目录 1.解压 2.安装gcc 3.执行make命令 4.复制redis的配置文件到默认安装目录下 5.修改redis.conf文件 6.启动redis服务与客户端 7.查看redis进行是否启动 8.关闭redis服务 9.redis性能测试 注意:安装redis前要安装jdk。 1.解压 [rootlxm148 install]# t…...

第三十一章 React中路由组件和一般组件

在React中,组件是应用程序的构建块。它们是可重用的,可以用于创建复杂的UI。React中有两种类型的组件:路由组件和一般组件。 一般组件 一般组件是React应用程序的基本构建块。它们是可重用的,可以用于创建复杂的UI。它们不知道U…...

怎么把pdf中的某一页分出来?

怎么把pdf中的某一页分出来?PDF格式的文档在日常生活中是非常常见的,相信大家都对其有所了解,并且经常使用。它的主要特点是不允许用户随意编辑其中的内容,当我们仅需要阅读时,PDF文档无疑是十分方便的,尤其…...

MongoDB 聚合操作Map-Reduce

这此之前已经对MongoDB中的一些聚合操作进行了详细的介绍,主要介绍了聚合方法和聚合管道;如果您想对聚合方法和聚合管道进行了解,可以参考: MongoDB 数据库操作汇总https://blog.csdn.net/m1729339749/article/details/130086022…...

如何免越狱定制iPhone界面:Cowabunga Lite完整使用指南

如何免越狱定制iPhone界面:Cowabunga Lite完整使用指南 【免费下载链接】CowabungaLite iOS 15 Customization Toolbox 项目地址: https://gitcode.com/gh_mirrors/co/CowabungaLite Cowabunga Lite是一款专为iOS 15设备设计的系统定制工具,让普通…...

QMCFLAC2MP3深度解析:从格式解密到跨设备音频转换的全流程实践

QMCFLAC2MP3深度解析:从格式解密到跨设备音频转换的全流程实践 【免费下载链接】qmcflac2mp3 直接将qmcflac文件转换成mp3文件,突破QQ音乐的格式限制 项目地址: https://gitcode.com/gh_mirrors/qm/qmcflac2mp3 问题引入:破解音乐格式…...

【Mojo+Python企业级混合编程实战指南】:20年架构师亲授3大高频场景落地方法论

第一章:Mojo与Python混合编程的企业级价值全景图Mojo 是一种专为 AI 原生系统设计的现代系统编程语言,兼具 Python 的表达力与 C/Rust 级别的性能。在企业级 AI 工程实践中,Mojo 并非旨在替代 Python,而是以“无缝互操作”为核心理…...

Android购物商城APP实战:从零到一构建核心功能模块

1. 项目功能模块拆解与实现路径 一个完整的购物商城APP通常包含四大核心模块:用户系统、商品展示、购物车管理和订单处理。这就像搭建一个实体商店,需要先规划好门面(登录注册)、货架(商品展示)、购物篮&am…...

高效漫画收藏解决方案:打造你的离线数字漫画库

高效漫画收藏解决方案:打造你的离线数字漫画库 【免费下载链接】picacomic-downloader 哔咔漫画 picacomic pica漫画 bika漫画 PicACG 多线程下载器,带图形界面 带收藏夹,已打包exe 下载速度飞快 项目地址: https://gitcode.com/gh_mirrors…...

5个实战技巧让Continue插件成为你的JetBrains AI编程搭档

5个实战技巧让Continue插件成为你的JetBrains AI编程搭档 【免费下载链接】continue ⏩ Source-controlled AI checks, enforceable in CI. Powered by the open-source Continue CLI 项目地址: https://gitcode.com/GitHub_Trending/co/continue 在当今AI驱动的开发时代…...

多进程和多线程的特点和区别

小编觉得,多进程和多线程的差异主要体现在以下三个方面: 1. 资源隔离 多线程属于同一进程,共享进程的堆内存和全局变量,因此线程间可以直接访问彼此共享的数据。但需要注意的是,每个线程也拥有自己私有的栈空间&…...

Driver Store Explorer:Windows驱动管理的终极免费解决方案,轻松释放10GB+磁盘空间

Driver Store Explorer:Windows驱动管理的终极免费解决方案,轻松释放10GB磁盘空间 【免费下载链接】DriverStoreExplorer Driver Store Explorer 项目地址: https://gitcode.com/gh_mirrors/dr/DriverStoreExplorer Driver Store Explorer&#x…...

AI Agent不是你以为的那样

系列:《AI Agent 从原理到实战 —— 解密 Claude Code 背后的工程智慧》 第1篇引言 你大概有过这样的体验:打开 ChatGPT,说一句"帮我写封邮件,拒绝周五的会议邀请,语气委婉一点",几秒钟后一封措辞…...

Docker测试学习思路

Docker 核心概念学习与实战指南本文系统梳理 Docker 学习的核心思路与方法,用通俗类比帮助理解 Docker 的本质,涵盖镜像构建、容器运行、网络通信、数据持久化、资源限制五大核心能力,适合初学者建立清晰的 Docker 知识框架。一、Docker 到底…...