当前位置: 首页 > news >正文

Flink创建Hudi的Sink动态表

工厂类 HoodieTableFactory 提供的创建动态表接口 createDynamicTableSource 和 createDynamicTableSink,对应的源码文件为:https://github.com/apache/hudi/blob/master/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java 。

createDynamicTableSink

public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {@Overridepublic DynamicTableSink createDynamicTableSink(Context context) {Configuration conf = FlinkOptions.fromMap(context.getCatalogTable().getOptions());checkArgument(!StringUtils.isNullOrEmpty(conf.getString(FlinkOptions.PATH)),"Option [path] should not be empty.");setupTableOptions(conf.getString(FlinkOptions.PATH), conf);ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();sanityCheck(conf, schema);setupConfOptions(conf, context.getObjectIdentifier(), context.getCatalogTable(), schema);setupSortOptions(conf, context.getConfiguration());return new HoodieTableSink(conf, schema);}

createDynamicTableSource

public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {@Overridepublic DynamicTableSource createDynamicTableSource(Context context) {Configuration conf = FlinkOptions.fromMap(context.getCatalogTable().getOptions());Path path = new Path(conf.getOptional(FlinkOptions.PATH).orElseThrow(() ->new ValidationException("Option [path] should not be empty.")));setupTableOptions(conf.getString(FlinkOptions.PATH), conf);ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();setupConfOptions(conf, context.getObjectIdentifier(), context.getCatalogTable(), schema);return new HoodieTableSource(schema,path,context.getCatalogTable().getPartitionKeys(),conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME),conf);}

创建Sink表过程

1、检查是否设置了 path 选项(checkArgument),没有的话抛异常“Option [path] should not be empty.”。

2、做兼容性设置(setupTableOptions):

2.1、如果设置了 hoodie.table.recordkey.fields,但没有设置 hoodie.datasource.write.recordkey.field,则将 hoodie.datasource.write.recordkey.field 的值设置为 hoodie.table.recordkey.fields 的值;

2.2、如果设置了 hoodie.table.precombine.field,但没有设置 precombine.field,则将 precombine.field 的值设置为 hoodie.table.precombine.field 的值;

2.3、如果设置了 hoodie.datasource.write.hive_style_partitioning,但没有设置 hoodie.datasource.write.hive_style_partitioning,则将 hoodie.datasource.write.hive_style_partitioning 的值设置为 hoodie.datasource.write.hive_style_partitioning 的值。

3、必要选项检查:

3.1、检查表的类型(checkTableType),如果 table.type 的值为空,则不做处理,否则必须为 COPY_ON_WRITE 或者 MERGE_ON_READ,不然抛异常Invalid table type: TABLETYPE . Table type should be either MERGE_ON_READ or COPY_ON_WRITE.“;

3.2、如果为非 Append 模式,则检查是否设置了 hoodie.datasource.write.recordkey.field 和 precombine.field。

4、依次设置:

4.1、表名(hoodie.table.name);

4.2、主键(hoodie.datasource.write.recordkey.field);

4.3、分区(hoodie.datasource.write.partitionpath.field);

4.4、如果是 index 类型为 BUCKET,则设置桶(bucket)的键 hoodie.bucket.index.hash.field;

  4.4.1、如果还没有设置 hoodie.bucket.index.hash.field,则使用 hoodie.datasource.write.recordkey.field 的值作为 hoodie.bucket.index.hash.field 的值;4.4.2、否则进一步检查 hoodie.bucket.index.hash.field 的值是否为 hoodie.datasource.write.recordkey.field 值的子集。假设 hoodie.datasource.write.recordkey.field 值为“ds,dh”,则 hoodie.bucket.index.hash.field 值可以为“ds”、“dh”或“ds,dh”。4.5、设置压缩选项:4.5.1、设置 archive.min_commits,4.5.1、设置 archive.max_commits。4.6、设置Hive选项:4.6.1、如果没有设置 hive_sync.db,则设置 hive_sync.db;4.6.2、如果没有设置 hive_sync.table,则设置 hive_sync.table。4.7、设置read选项,如果不是增量查询则什么也不做;否则设置 hoodie.datasource.query.type 值为 incremental 。4.8、设置write选项:如果 write.operation 为默认值且为 cow 表,则实则 write.precombine 为 true 。4.9、如果没有设置 source.avro-schema.path 和 source.avro-schema,则设置 source.avro-schema 。

5、设置排序选项(flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java):

5.1、设置 Flink 的 table.exec.sort.max-num-file-handles

5.2、设置 Flink 的 table.exec.spill-compression.enabled

5.3、设置 Flink 的 table.exec.spill-compression.block-size

5.4、设置 Flink 的 table.exec.sort.async-merge-enabled

Append 模式

write.operation 值为 insert,并且为 mor 表;或则为 cow 表,但是 write.insert.cluster 值为 false。

  • write.insert.cluster

该选项用于控制是否在写入时合并小文件,仅对 cow 类型表有效,默认为 false。如果设置为 true,则每次写入前先合并小文件,这会降低写吞吐量,但可提高读性能。

相关文章:

Flink创建Hudi的Sink动态表

工厂类 HoodieTableFactory 提供的创建动态表接口 createDynamicTableSource 和 createDynamicTableSink,对应的源码文件为:https://github.com/apache/hudi/blob/master/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieT…...

人脸识别技术的安全性及其应用探讨

随着科技的不断发展,人脸识别技术已经成为了一个热门话题。人脸识别系统的出现,给人们的生活带来了极大的便利,同时也为一些犯罪分子提供了方便。因此,人脸识别技术的安全性和可靠性一直备受关注。 一、人脸识别技术的原理 人脸识…...

老域名查询工具- 在线域名批量查询工具

域名批量查询工具 域名批量查询工具是一种帮助用户快速查询多个域名信息的工具,通常能够自动扫描一组域名的WHOIS信息、DNS、IP地址、服务器等各种信息,并提供快速的结果反馈。 以下是域名批量查询工具主要的优点: 提高工作效率&#xff1a…...

JimuReport - 积木报表(一款免费Web报表工具)

一款免费的数据可视化报表,含报表和大屏设计,像搭建积木一样在线设计报表!功能涵盖,数据报表、打印设计、图表报表、大屏设计等! Web 版报表设计器,类似于excel操作风格,通过拖拽完成报表设计。…...

01-数据操作+数据预处理

1.n维数组,也称为张量(tensor):tensor和ndarray没有本质区别。tensor是有数学上的严格定义,ndarray是计算机描述的;张量表示一个由数值组成的数组,这个数组可能有多个维度; 无论使用…...

macOS本地python环境/vscode/导入python包/设置python解释器

查看macbook本地是否有python环境 输入python或者python3,退出python环境使用exit(),别忘了括号 没有的话去官网安装https://www.python.org/ 2. 安装vscode 官网https://code.visualstudio.com/ 3. 安装插件 点击左边的“插件”按钮,安装…...

【转存】Go语言设计模式

导语| 设计模式是针对软件设计中常见问题的工具箱,其中的工具就是各种经过实践验证的解决方案。即使你从未遇到过这些问题,了解模式仍然非常有用,因为它能指导你如何使用面向对象的设计原则来解决各种问题,提高开发效率&#xff0…...

第十一章 升级与定制

第十一章 升级与定制 一、 RPM 包安装操作 RPM(Redhat Packet Manager)。 ①安装 rpm –i rpm 文件名 (注:⑴常见用法:-ivh 参数显示安装过程和 hash 符#; ⑵覆盖安装:使用- -force 选项。…...

代码随想录算法训练营第二十二天|235. 二叉搜索树的最近公共祖先、701.二叉搜索树中的插入操作、450.删除二叉搜索树中的节点

目录 235. 二叉搜索树的最近公共祖先 1、递归实现 2、迭代法实现 701.二叉搜索树中的插入操作(递归实现) 450.删除二叉搜索树中的节点(递归实现) 235. 二叉搜索树的最近公共祖先 相对于 二叉树的最近公共祖先 本题就简单一些了…...

hbase表出现RIT删除方案

1.删除zookeeper中对应表注册信息 cd /opt/cloudera/parcels/CDH/lib/zookeeper/bin ./zkCli.sh -server node2:2181 --node2为仿真节点,生产需改 deleteall /hbase/table/表名 2.删除hdfs对应表数据 hadoop dfs -rm -r /hbase/data/default/表名 3.删除hbase:met…...

SQL学习(3)

SELECT 语句用于从表中选取数据。 SELECT 列名称 FROM 表名称 SELECT * FROM 表名称关键词 DISTINCT 用于返回唯一不同的值 SELECT DISTINCT 列名称 FROM 表名称WHERE 子句用于规定选择的标准 如需有条件地从表中选取数据,可将 WHERE 子句添加到 SELECT 语句。 S…...

连接型CRM助力医疗企业把“成本中心”变成“利润中心”

在市场竞争日益加剧的情形下,企业获客成本大幅上涨,存量客户的维护和开发开始被重视,售后服务部门的职责在企业中发挥的价值越来越大。因为企业售后服务不仅能帮助客户解决问题的部门,还是客户与企业沟通的桥梁,将客户…...

《Vue.js 设计与实现》—— 03 Vue.js 3 的设计思路

1. 声明式地描述 UI Vue.js 3 是一个声明式的 UI 框架,即用户在使用 Vue.js 3 开发页面时是声明式地描述 UI 的。 编写前端页面涉及的内容如下: DOM 元素:例如是 div 标签还是 a 标签属性:如 a 标签的 href 属性,再…...

2023年湖北省建设厅特种作业操作证报名条件是什么?

建筑施工特种作业人员是指在房屋建筑和市政工程施工活动中,从事可能对本人、他人及周围设备设施的安全造成重大危害作业的人员。建筑施工特种作业人员必须经建设主管部门考核合格,取得建筑施工特种作业人员操作资格证书(以下简称“资格证书”…...

Redis 进阶

🥲 🥸 🤌 🫀 🫁 🥷 🐻‍❄️🦤 🪶 🦭 🪲 🪳 🪰 🪱 🪴 🫐 🫒 🫑…...

伙伴匹配系统笔记---02

Java 8特性 1. stream / parallelStream 流失处理 2. Optional 可选类 一. 前端整合路由 1. 路由:vue 路由组件库地址:安装 | Vue Router (vuejs.org) 安装:yarn add vue-router@4 2. 整合路由: // 1. 定义路由组件. // 也可以从其他文件导入 const Home = { templ…...

Redis学习——单机版安装

目录 1.解压 2.安装gcc 3.执行make命令 4.复制redis的配置文件到默认安装目录下 5.修改redis.conf文件 6.启动redis服务与客户端 7.查看redis进行是否启动 8.关闭redis服务 9.redis性能测试 注意:安装redis前要安装jdk。 1.解压 [rootlxm148 install]# t…...

第三十一章 React中路由组件和一般组件

在React中,组件是应用程序的构建块。它们是可重用的,可以用于创建复杂的UI。React中有两种类型的组件:路由组件和一般组件。 一般组件 一般组件是React应用程序的基本构建块。它们是可重用的,可以用于创建复杂的UI。它们不知道U…...

怎么把pdf中的某一页分出来?

怎么把pdf中的某一页分出来?PDF格式的文档在日常生活中是非常常见的,相信大家都对其有所了解,并且经常使用。它的主要特点是不允许用户随意编辑其中的内容,当我们仅需要阅读时,PDF文档无疑是十分方便的,尤其…...

MongoDB 聚合操作Map-Reduce

这此之前已经对MongoDB中的一些聚合操作进行了详细的介绍,主要介绍了聚合方法和聚合管道;如果您想对聚合方法和聚合管道进行了解,可以参考: MongoDB 数据库操作汇总https://blog.csdn.net/m1729339749/article/details/130086022…...

K8S认证|CKS题库+答案| 11. AppArmor

目录 11. AppArmor 免费获取并激活 CKA_v1.31_模拟系统 题目 开始操作: 1)、切换集群 2)、切换节点 3)、切换到 apparmor 的目录 4)、执行 apparmor 策略模块 5)、修改 pod 文件 6)、…...

前端倒计时误差!

提示:记录工作中遇到的需求及解决办法 文章目录 前言一、误差从何而来?二、五大解决方案1. 动态校准法(基础版)2. Web Worker 计时3. 服务器时间同步4. Performance API 高精度计时5. 页面可见性API优化三、生产环境最佳实践四、终极解决方案架构前言 前几天听说公司某个项…...

多场景 OkHttpClient 管理器 - Android 网络通信解决方案

下面是一个完整的 Android 实现&#xff0c;展示如何创建和管理多个 OkHttpClient 实例&#xff0c;分别用于长连接、普通 HTTP 请求和文件下载场景。 <?xml version"1.0" encoding"utf-8"?> <LinearLayout xmlns:android"http://schemas…...

html css js网页制作成品——HTML+CSS榴莲商城网页设计(4页)附源码

目录 一、&#x1f468;‍&#x1f393;网站题目 二、✍️网站描述 三、&#x1f4da;网站介绍 四、&#x1f310;网站效果 五、&#x1fa93; 代码实现 &#x1f9f1;HTML 六、&#x1f947; 如何让学习不再盲目 七、&#x1f381;更多干货 一、&#x1f468;‍&#x1f…...

MySQL 索引底层结构揭秘:B-Tree 与 B+Tree 的区别与应用

文章目录 一、背景知识&#xff1a;什么是 B-Tree 和 BTree&#xff1f; B-Tree&#xff08;平衡多路查找树&#xff09; BTree&#xff08;B-Tree 的变种&#xff09; 二、结构对比&#xff1a;一张图看懂 三、为什么 MySQL InnoDB 选择 BTree&#xff1f; 1. 范围查询更快 2…...

Linux系统部署KES

1、安装准备 1.版本说明V008R006C009B0014 V008&#xff1a;是version产品的大版本。 R006&#xff1a;是release产品特性版本。 C009&#xff1a;是通用版 B0014&#xff1a;是build开发过程中的构建版本2.硬件要求 #安全版和企业版 内存&#xff1a;1GB 以上 硬盘&#xf…...

9-Oracle 23 ai Vector Search 特性 知识准备

很多小伙伴是不是参加了 免费认证课程&#xff08;限时至2025/5/15&#xff09; Oracle AI Vector Search 1Z0-184-25考试&#xff0c;都顺利拿到certified了没。 各行各业的AI 大模型的到来&#xff0c;传统的数据库中的SQL还能不能打&#xff0c;结构化和非结构的话数据如何和…...

6.9-QT模拟计算器

源码: 头文件: widget.h #ifndef WIDGET_H #define WIDGET_H#include <QWidget> #include <QMouseEvent>QT_BEGIN_NAMESPACE namespace Ui { class Widget; } QT_END_NAMESPACEclass Widget : public QWidget {Q_OBJECTpublic:Widget(QWidget *parent nullptr);…...

【PX4飞控】mavros gps相关话题分析,经纬度海拔获取方法,卫星数锁定状态获取方法

使用 ROS1-Noetic 和 mavros v1.20.1&#xff0c; 携带经纬度海拔的话题主要有三个&#xff1a; /mavros/global_position/raw/fix/mavros/gpsstatus/gps1/raw/mavros/global_position/global 查看 mavros 源码&#xff0c;来分析他们的发布过程。发现前两个话题都对应了同一…...

ZYNQ学习记录FPGA(二)Verilog语言

一、Verilog简介 1.1 HDL&#xff08;Hardware Description language&#xff09; 在解释HDL之前&#xff0c;先来了解一下数字系统设计的流程&#xff1a;逻辑设计 -> 电路实现 -> 系统验证。 逻辑设计又称前端&#xff0c;在这个过程中就需要用到HDL&#xff0c;正文…...