当前位置: 首页 > news >正文

Flink创建Hudi的Sink动态表

工厂类 HoodieTableFactory 提供的创建动态表接口 createDynamicTableSource 和 createDynamicTableSink,对应的源码文件为:https://github.com/apache/hudi/blob/master/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java 。

createDynamicTableSink

public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {@Overridepublic DynamicTableSink createDynamicTableSink(Context context) {Configuration conf = FlinkOptions.fromMap(context.getCatalogTable().getOptions());checkArgument(!StringUtils.isNullOrEmpty(conf.getString(FlinkOptions.PATH)),"Option [path] should not be empty.");setupTableOptions(conf.getString(FlinkOptions.PATH), conf);ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();sanityCheck(conf, schema);setupConfOptions(conf, context.getObjectIdentifier(), context.getCatalogTable(), schema);setupSortOptions(conf, context.getConfiguration());return new HoodieTableSink(conf, schema);}

createDynamicTableSource

public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {@Overridepublic DynamicTableSource createDynamicTableSource(Context context) {Configuration conf = FlinkOptions.fromMap(context.getCatalogTable().getOptions());Path path = new Path(conf.getOptional(FlinkOptions.PATH).orElseThrow(() ->new ValidationException("Option [path] should not be empty.")));setupTableOptions(conf.getString(FlinkOptions.PATH), conf);ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();setupConfOptions(conf, context.getObjectIdentifier(), context.getCatalogTable(), schema);return new HoodieTableSource(schema,path,context.getCatalogTable().getPartitionKeys(),conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME),conf);}

创建Sink表过程

1、检查是否设置了 path 选项(checkArgument),没有的话抛异常“Option [path] should not be empty.”。

2、做兼容性设置(setupTableOptions):

2.1、如果设置了 hoodie.table.recordkey.fields,但没有设置 hoodie.datasource.write.recordkey.field,则将 hoodie.datasource.write.recordkey.field 的值设置为 hoodie.table.recordkey.fields 的值;

2.2、如果设置了 hoodie.table.precombine.field,但没有设置 precombine.field,则将 precombine.field 的值设置为 hoodie.table.precombine.field 的值;

2.3、如果设置了 hoodie.datasource.write.hive_style_partitioning,但没有设置 hoodie.datasource.write.hive_style_partitioning,则将 hoodie.datasource.write.hive_style_partitioning 的值设置为 hoodie.datasource.write.hive_style_partitioning 的值。

3、必要选项检查:

3.1、检查表的类型(checkTableType),如果 table.type 的值为空,则不做处理,否则必须为 COPY_ON_WRITE 或者 MERGE_ON_READ,不然抛异常Invalid table type: TABLETYPE . Table type should be either MERGE_ON_READ or COPY_ON_WRITE.“;

3.2、如果为非 Append 模式,则检查是否设置了 hoodie.datasource.write.recordkey.field 和 precombine.field。

4、依次设置:

4.1、表名(hoodie.table.name);

4.2、主键(hoodie.datasource.write.recordkey.field);

4.3、分区(hoodie.datasource.write.partitionpath.field);

4.4、如果是 index 类型为 BUCKET,则设置桶(bucket)的键 hoodie.bucket.index.hash.field;

  4.4.1、如果还没有设置 hoodie.bucket.index.hash.field,则使用 hoodie.datasource.write.recordkey.field 的值作为 hoodie.bucket.index.hash.field 的值;4.4.2、否则进一步检查 hoodie.bucket.index.hash.field 的值是否为 hoodie.datasource.write.recordkey.field 值的子集。假设 hoodie.datasource.write.recordkey.field 值为“ds,dh”,则 hoodie.bucket.index.hash.field 值可以为“ds”、“dh”或“ds,dh”。4.5、设置压缩选项:4.5.1、设置 archive.min_commits,4.5.1、设置 archive.max_commits。4.6、设置Hive选项:4.6.1、如果没有设置 hive_sync.db,则设置 hive_sync.db;4.6.2、如果没有设置 hive_sync.table,则设置 hive_sync.table。4.7、设置read选项,如果不是增量查询则什么也不做;否则设置 hoodie.datasource.query.type 值为 incremental 。4.8、设置write选项:如果 write.operation 为默认值且为 cow 表,则实则 write.precombine 为 true 。4.9、如果没有设置 source.avro-schema.path 和 source.avro-schema,则设置 source.avro-schema 。

5、设置排序选项(flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java):

5.1、设置 Flink 的 table.exec.sort.max-num-file-handles

5.2、设置 Flink 的 table.exec.spill-compression.enabled

5.3、设置 Flink 的 table.exec.spill-compression.block-size

5.4、设置 Flink 的 table.exec.sort.async-merge-enabled

Append 模式

write.operation 值为 insert,并且为 mor 表;或则为 cow 表,但是 write.insert.cluster 值为 false。

  • write.insert.cluster

该选项用于控制是否在写入时合并小文件,仅对 cow 类型表有效,默认为 false。如果设置为 true,则每次写入前先合并小文件,这会降低写吞吐量,但可提高读性能。

相关文章:

Flink创建Hudi的Sink动态表

工厂类 HoodieTableFactory 提供的创建动态表接口 createDynamicTableSource 和 createDynamicTableSink,对应的源码文件为:https://github.com/apache/hudi/blob/master/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieT…...

人脸识别技术的安全性及其应用探讨

随着科技的不断发展,人脸识别技术已经成为了一个热门话题。人脸识别系统的出现,给人们的生活带来了极大的便利,同时也为一些犯罪分子提供了方便。因此,人脸识别技术的安全性和可靠性一直备受关注。 一、人脸识别技术的原理 人脸识…...

老域名查询工具- 在线域名批量查询工具

域名批量查询工具 域名批量查询工具是一种帮助用户快速查询多个域名信息的工具,通常能够自动扫描一组域名的WHOIS信息、DNS、IP地址、服务器等各种信息,并提供快速的结果反馈。 以下是域名批量查询工具主要的优点: 提高工作效率&#xff1a…...

JimuReport - 积木报表(一款免费Web报表工具)

一款免费的数据可视化报表,含报表和大屏设计,像搭建积木一样在线设计报表!功能涵盖,数据报表、打印设计、图表报表、大屏设计等! Web 版报表设计器,类似于excel操作风格,通过拖拽完成报表设计。…...

01-数据操作+数据预处理

1.n维数组,也称为张量(tensor):tensor和ndarray没有本质区别。tensor是有数学上的严格定义,ndarray是计算机描述的;张量表示一个由数值组成的数组,这个数组可能有多个维度; 无论使用…...

macOS本地python环境/vscode/导入python包/设置python解释器

查看macbook本地是否有python环境 输入python或者python3,退出python环境使用exit(),别忘了括号 没有的话去官网安装https://www.python.org/ 2. 安装vscode 官网https://code.visualstudio.com/ 3. 安装插件 点击左边的“插件”按钮,安装…...

【转存】Go语言设计模式

导语| 设计模式是针对软件设计中常见问题的工具箱,其中的工具就是各种经过实践验证的解决方案。即使你从未遇到过这些问题,了解模式仍然非常有用,因为它能指导你如何使用面向对象的设计原则来解决各种问题,提高开发效率&#xff0…...

第十一章 升级与定制

第十一章 升级与定制 一、 RPM 包安装操作 RPM(Redhat Packet Manager)。 ①安装 rpm –i rpm 文件名 (注:⑴常见用法:-ivh 参数显示安装过程和 hash 符#; ⑵覆盖安装:使用- -force 选项。…...

代码随想录算法训练营第二十二天|235. 二叉搜索树的最近公共祖先、701.二叉搜索树中的插入操作、450.删除二叉搜索树中的节点

目录 235. 二叉搜索树的最近公共祖先 1、递归实现 2、迭代法实现 701.二叉搜索树中的插入操作(递归实现) 450.删除二叉搜索树中的节点(递归实现) 235. 二叉搜索树的最近公共祖先 相对于 二叉树的最近公共祖先 本题就简单一些了…...

hbase表出现RIT删除方案

1.删除zookeeper中对应表注册信息 cd /opt/cloudera/parcels/CDH/lib/zookeeper/bin ./zkCli.sh -server node2:2181 --node2为仿真节点,生产需改 deleteall /hbase/table/表名 2.删除hdfs对应表数据 hadoop dfs -rm -r /hbase/data/default/表名 3.删除hbase:met…...

SQL学习(3)

SELECT 语句用于从表中选取数据。 SELECT 列名称 FROM 表名称 SELECT * FROM 表名称关键词 DISTINCT 用于返回唯一不同的值 SELECT DISTINCT 列名称 FROM 表名称WHERE 子句用于规定选择的标准 如需有条件地从表中选取数据,可将 WHERE 子句添加到 SELECT 语句。 S…...

连接型CRM助力医疗企业把“成本中心”变成“利润中心”

在市场竞争日益加剧的情形下,企业获客成本大幅上涨,存量客户的维护和开发开始被重视,售后服务部门的职责在企业中发挥的价值越来越大。因为企业售后服务不仅能帮助客户解决问题的部门,还是客户与企业沟通的桥梁,将客户…...

《Vue.js 设计与实现》—— 03 Vue.js 3 的设计思路

1. 声明式地描述 UI Vue.js 3 是一个声明式的 UI 框架,即用户在使用 Vue.js 3 开发页面时是声明式地描述 UI 的。 编写前端页面涉及的内容如下: DOM 元素:例如是 div 标签还是 a 标签属性:如 a 标签的 href 属性,再…...

2023年湖北省建设厅特种作业操作证报名条件是什么?

建筑施工特种作业人员是指在房屋建筑和市政工程施工活动中,从事可能对本人、他人及周围设备设施的安全造成重大危害作业的人员。建筑施工特种作业人员必须经建设主管部门考核合格,取得建筑施工特种作业人员操作资格证书(以下简称“资格证书”…...

Redis 进阶

🥲 🥸 🤌 🫀 🫁 🥷 🐻‍❄️🦤 🪶 🦭 🪲 🪳 🪰 🪱 🪴 🫐 🫒 🫑…...

伙伴匹配系统笔记---02

Java 8特性 1. stream / parallelStream 流失处理 2. Optional 可选类 一. 前端整合路由 1. 路由:vue 路由组件库地址:安装 | Vue Router (vuejs.org) 安装:yarn add vue-router@4 2. 整合路由: // 1. 定义路由组件. // 也可以从其他文件导入 const Home = { templ…...

Redis学习——单机版安装

目录 1.解压 2.安装gcc 3.执行make命令 4.复制redis的配置文件到默认安装目录下 5.修改redis.conf文件 6.启动redis服务与客户端 7.查看redis进行是否启动 8.关闭redis服务 9.redis性能测试 注意:安装redis前要安装jdk。 1.解压 [rootlxm148 install]# t…...

第三十一章 React中路由组件和一般组件

在React中,组件是应用程序的构建块。它们是可重用的,可以用于创建复杂的UI。React中有两种类型的组件:路由组件和一般组件。 一般组件 一般组件是React应用程序的基本构建块。它们是可重用的,可以用于创建复杂的UI。它们不知道U…...

怎么把pdf中的某一页分出来?

怎么把pdf中的某一页分出来?PDF格式的文档在日常生活中是非常常见的,相信大家都对其有所了解,并且经常使用。它的主要特点是不允许用户随意编辑其中的内容,当我们仅需要阅读时,PDF文档无疑是十分方便的,尤其…...

MongoDB 聚合操作Map-Reduce

这此之前已经对MongoDB中的一些聚合操作进行了详细的介绍,主要介绍了聚合方法和聚合管道;如果您想对聚合方法和聚合管道进行了解,可以参考: MongoDB 数据库操作汇总https://blog.csdn.net/m1729339749/article/details/130086022…...

Flask RESTful 示例

目录 1. 环境准备2. 安装依赖3. 修改main.py4. 运行应用5. API使用示例获取所有任务获取单个任务创建新任务更新任务删除任务 中文乱码问题: 下面创建一个简单的Flask RESTful API示例。首先,我们需要创建环境,安装必要的依赖,然后…...

【大模型RAG】Docker 一键部署 Milvus 完整攻略

本文概要 Milvus 2.5 Stand-alone 版可通过 Docker 在几分钟内完成安装;只需暴露 19530(gRPC)与 9091(HTTP/WebUI)两个端口,即可让本地电脑通过 PyMilvus 或浏览器访问远程 Linux 服务器上的 Milvus。下面…...

什么是库存周转?如何用进销存系统提高库存周转率?

你可能听说过这样一句话: “利润不是赚出来的,是管出来的。” 尤其是在制造业、批发零售、电商这类“货堆成山”的行业,很多企业看着销售不错,账上却没钱、利润也不见了,一翻库存才发现: 一堆卖不动的旧货…...

剑指offer20_链表中环的入口节点

链表中环的入口节点 给定一个链表,若其中包含环,则输出环的入口节点。 若其中不包含环,则输出null。 数据范围 节点 val 值取值范围 [ 1 , 1000 ] [1,1000] [1,1000]。 节点 val 值各不相同。 链表长度 [ 0 , 500 ] [0,500] [0,500]。 …...

C# 类和继承(抽象类)

抽象类 抽象类是指设计为被继承的类。抽象类只能被用作其他类的基类。 不能创建抽象类的实例。抽象类使用abstract修饰符声明。 抽象类可以包含抽象成员或普通的非抽象成员。抽象类的成员可以是抽象成员和普通带 实现的成员的任意组合。抽象类自己可以派生自另一个抽象类。例…...

Java入门学习详细版(一)

大家好,Java 学习是一个系统学习的过程,核心原则就是“理论 实践 坚持”,并且需循序渐进,不可过于着急,本篇文章推出的这份详细入门学习资料将带大家从零基础开始,逐步掌握 Java 的核心概念和编程技能。 …...

2023赣州旅游投资集团

单选题 1.“不登高山,不知天之高也;不临深溪,不知地之厚也。”这句话说明_____。 A、人的意识具有创造性 B、人的认识是独立于实践之外的 C、实践在认识过程中具有决定作用 D、人的一切知识都是从直接经验中获得的 参考答案: C 本题解…...

AI+无人机如何守护濒危物种?YOLOv8实现95%精准识别

【导读】 野生动物监测在理解和保护生态系统中发挥着至关重要的作用。然而,传统的野生动物观察方法往往耗时耗力、成本高昂且范围有限。无人机的出现为野生动物监测提供了有前景的替代方案,能够实现大范围覆盖并远程采集数据。尽管具备这些优势&#xf…...

Linux部署私有文件管理系统MinIO

最近需要用到一个文件管理服务,但是又不想花钱,所以就想着自己搭建一个,刚好我们用的一个开源框架已经集成了MinIO,所以就选了这个 我这边对文件服务性能要求不是太高,单机版就可以 安装非常简单,几个命令就…...

【堆垛策略】设计方法

堆垛策略的设计是积木堆叠系统的核心,直接影响堆叠的稳定性、效率和容错能力。以下是分层次的堆垛策略设计方法,涵盖基础规则、优化算法和容错机制: 1. 基础堆垛规则 (1) 物理稳定性优先 重心原则: 大尺寸/重量积木在下&#xf…...