flink写入hudi MOR表
第一步:创建flink内存表从kafka读取数据:
DROP TABLE IF EXISTS HUDI_KAFKA_DEBEZIUM_ZHANG;
CREATE TABLE IF NOT EXISTS HUDI_KAFKA_DEBEZIUM_ZHANG(
ID STRING comment '编码'
,NAME STRING comment '名称'
,PRIMARY KEY(RCLNT,RLDNR,RRCTY,RVERS,RYEAR,ROBJNR,COBJNR,SOBJNR,RTCUR,RUNIT,DRCRK,RPMAX) NOT ENFORCED
) with (
'connector'='kafka',
'topic'='GLFUNCT_DEBEZIUM_TRANSFER',
--'scan.startup.mode'='earliest-offset',
'scan.startup.mode'='timestamp',
'scan.startup.timestamp-millis'='1725811200000',
'properties.group.id'='KAFKA_GLFUNCT_CHANGELOG_HUDI7',
'properties.bootstrap.servers'='10.66.28.69:9092,10.66.28.70:9092,10.66.28.61:9092',
'value.format'='debezium-json',
'scan.topic-partition-discovery.interval' = '10000',
'value.debezium-json.ignore-parse-errors' = 'true'
);
第二步:创建MOR类型的hudi表
DROP TABLE IF EXISTS HUDI_ZHANG;
CREATE TABLE IF NOT EXISTS HUDI_ZHANG(
ID STRING comment '编码'
,NAME STRING comment '名称'
,PRIMARY KEY(ID,NAME) NOT ENFORCED
)with (
'connector' = 'hudi',
'path' = 'hdfs://nameservice1/user/hive/warehouse/hudi_ods_sap.db/HUDI_ZHANG',
'table.type' = 'MERGE_ON_READ',
'hive_sync.skip_ro_suffix' = 'true',
'hoodie.datasource.write.recordkey.field' = 'ID,NAME',
'write.operation' = 'upsert',
--'write.precombine.field' = 'ETL_DT',
'write.tasks' = '4',
'index.bootstrap.enabled' = 'true',
'write.insert.drop.duplicates'='true',
'compaction.tasks' = '4',
'compaction.async.enabled' = 'true',
'compaction.trigger.strategy' = 'time_elapsed',
'compaction.delta_seconds' = '1200',
'changelog.enabled' = 'true',
'read.streaming.enabled' = 'true',
'read.streaming.check-interval' = '1',
'hive_sync.enable' = 'true',
'hive_sync.mode' = 'hms',
'hive_sync.metastore.uris' = 'thrift://pld3cwztmg01:9083',
--'hive_sync.jdbc_url' = 'jdbc:hive2://pld3cwztmg01:10000',
'hive_sync.table' = 'ZHANG',
'hive_sync.db' = 'hudi_ods_sap',
'hive_sync.username' = 'hive',
'hive_sync.password' = 'hive'
);
第三步:把kafka表写入到hudi表即可
insert into HUDI_ZHANG select * from HUDI_KAFKA_DEBEZIUM_ZHANG where RCLNT = '300';
以上就是从kafka读取数据写入到hudi表,且表类型是MOR。
相关文章:
flink写入hudi MOR表
第一步:创建flink内存表从kafka读取数据: DROP TABLE IF EXISTS HUDI_KAFKA_DEBEZIUM_ZHANG; CREATE TABLE IF NOT EXISTS HUDI_KAFKA_DEBEZIUM_ZHANG( ID STRING comment 编码 ,NAME STRING comment 名称 ,PRIMARY KEY(RCLNT,RLDNR,RRCTY,RVERS,RYEAR,…...
智能工厂程序设计 之-2 (Substrate) :三个世界--“存在的意义”-“‘我’的价值的实现” 之2
Q13、我刚看了一下前门前面的讨论。有一段文字您的重新 理解一下。那就是: 对题目 的另一角度( “智能工厂的程序设计”的三个层次词 分别关注的问题 及其 解决 思路的描述)的解释: 三个不同层次(深度)&…...
概要设计例题
答案:A 知识点: 概要设计 设计软件系统的总体结构:采用某种方法,将一个复杂的系统按照功能划分成模块;确定每个模块的功能;确定模块之间的调用关系;确定模块之间的接口,即模块之间…...
注册表模式:使用注册表和装饰器函数的模块化设计
在现代软件开发中,模块化设计是提高代码可维护性和可扩展性的关键技术之一。本文将探讨如何使用注册表(Registry)和装饰器函数(Decorator Function)来实现模块化设计,提升代码的灵活性和可扩展性。 什么是…...
怎样将vue项目 部署在ngixn的子目录下
如果同一服务器的80端口下,需要部署两个或以上数量的vue项目,那么就需要将其中一个vue项目部署在根目录下,其他的项目部署在子目录下. 像这样的配置 访问根目录 / 访问灭火器后台管理,访问 /mall/ 访问商城的后台管理 那么商场的vue项目,这样配置,才能在/mall/下正常访问? 1…...
FPGA开发:Verilog数字设计基础
EDA技术 EDA指Electronic Design Automation,翻译为:电子设计自动化,最早发源于美国的影像技术,主要应用于集成电路设计、FPGA应用、IC设计制造、PCB设计上面。 而EDA技术就是指以计算机为工具,设计者在EDA软件平台上…...
哈希表,算法
一.什么是哈希表 哈希表是一种用于快速数据存取的数据结构。它通过哈希函数将键(key)映射到表中的一个位置,从而实现高效的插入、删除和查找操作。 二.哈希冲突 哈希冲突发生在多个键通过哈希函数映射到哈希表的同一位置时。由于哈希表的大…...
Java数组的定义及遍历
数组的声明 长度不能超过定义的长度。超过则会报错通过下标来访问 数组的遍历 最常用最简单的方法是增强for循环。...
【电路笔记】-反相运算放大器
反相运算放大器 文章目录 反相运算放大器1、概述2、理想反相运算放大器3、实际反相运算放大器3.1 闭环增益3.2 输入阻抗3.3 输出阻抗4、反相运算放大器示例5、总结1、概述 上一篇关于同相运算放大器的文章中已介绍了该运算放大器配置的所有细节,该配置在同相引脚 (+) 上获取输…...
【电子通识】半导体工艺——刻蚀工艺
在文章【电子通识】半导体工艺——光刻工艺中我们讲到人们经常将 Photo Lithography(光刻)缩写成 Photo。光刻工艺是在晶圆上利用光线来照射带有电路图形的光罩,从而绘制电路。光刻工艺类似于洗印黑白照片,将在胶片上形成的图像印…...
vue-router 之如何在模版(template)中获取路由配置信息?
vue-router 之如何在模版(template)中获取路由配置信息? 获取当前路由信息 在vue3 中,route通常使用useRoute()钩子获取的,**代表当前激活的路由信息。**它包含了与当前路由相关的数据,比如路径、参数、查…...
HPL 源码结构分析
文件夹结构: $ cd /home/hipper/ex_hpl_hpcg/ $ pwd $ mkdir ./openmpi $mkdir ./openblas $mkdir ./hpl $ tree 1. 安装openmpi 1.1.1 使用Makefile下载配置编译安装 openmpi Makefile: all:wget https://download.open-mpi.org/release/open-m…...
Java代码审计篇 | ofcms系统审计思路讲解 - 篇3 | 文件上传漏洞审计
文章目录 0. 前言1. 文件上传代码审计【有1处】1.1 可疑点1【无漏洞】1.1.1 直接搜索upload关键字1.1.2 选择第一个,点进去分析一下1.1.3 分析this.getFile()方法1.1.4 分析new MultipartRequest(request, uploadPath)1.1.5 分析isSafeFile()方法1.1.6 分析request.…...
【Kubernetes】常见面试题汇总(五)
目录 13.简述 Kubernetes Replica Set 和 Replication Controller 之间有什么区别? 14.简述 kube-proxy 作用? 15.简述 kube-proxy iptables 原理? 16.简述 kube-proxy ipvs 原理? 13.简述 Kubernetes Replica Set 和 Replicat…...
MySQL 解决时区相关问题
在使用 MySQL 的过程中,你可能会遇到时区相关问题,比如说时间显示错误、时区不是东八 区、程序取得的时间和数据库存储的时间不一致等等问题。其实,这些问题都与数据库时区设 置有关。 MySQL Server 中有 2 个环境变量和时区有关,…...
SpringSecurity Context 中 获取 和 更改 当前用户信息的问题
SpringSecurity Context 获取和更改用户信息的问题 SecurityContext 异步线程中获取用户信息 今天在做项目时遇到了一个问题,我需要获取当前用户的 ID。之前,前端并没有存储用户信息,我一直是在后端的 service 中通过 SecurityContext 来获…...
Makefile的四种赋值运算符
Makefile有四种赋值运算符:简单赋值(:)、递归赋值()、条件赋值(?)和追加赋值() 1. 简单赋值(:) 作用:覆盖之前的值。若在多次简单赋…...
framebuffer
framebuffer:帧缓冲、帧缓存 Linux内核为显示提供的一套应用程序接口(驱动内核支持) 分辨率:像素点的总和 像素点: 显示屏:800*600(横向有800个像素点,纵向有600个像素点&#x…...
7.科学计算模块Numpy(4)ndarray数组的常用操作(二)
引言 书接上回,numpy能作为python中最受欢迎的数据处理模块,脱离不了它最核心的部件——ndarray数组。那么,我们今天就来了解一下numpy中对ndarray的常用操作。 通过阅读本篇博客,你可以: 1.掌握ndarray数组的分割 …...
抖音评论区截流脚本软件详细使用教学,抖音私域获客引流的五种方法。
1.先说下什么是抖音截流玩法,截流顾名思义就是在别的博主的视频下面去截流评论潜在流量,然后用评论文案的形式或者其它方式吸引用户加我们的私域~ 玩截流一定不是主动去私信别人,这个就不叫截流了,且一个账号私信多了一定会降权和…...
基于算法竞赛的c++编程(28)结构体的进阶应用
结构体的嵌套与复杂数据组织 在C中,结构体可以嵌套使用,形成更复杂的数据结构。例如,可以通过嵌套结构体描述多层级数据关系: struct Address {string city;string street;int zipCode; };struct Employee {string name;int id;…...
SkyWalking 10.2.0 SWCK 配置过程
SkyWalking 10.2.0 & SWCK 配置过程 skywalking oap-server & ui 使用Docker安装在K8S集群以外,K8S集群中的微服务使用initContainer按命名空间将skywalking-java-agent注入到业务容器中。 SWCK有整套的解决方案,全安装在K8S群集中。 具体可参…...
论文解读:交大港大上海AI Lab开源论文 | 宇树机器人多姿态起立控制强化学习框架(二)
HoST框架核心实现方法详解 - 论文深度解读(第二部分) 《Learning Humanoid Standing-up Control across Diverse Postures》 系列文章: 论文深度解读 + 算法与代码分析(二) 作者机构: 上海AI Lab, 上海交通大学, 香港大学, 浙江大学, 香港中文大学 论文主题: 人形机器人…...
《Qt C++ 与 OpenCV:解锁视频播放程序设计的奥秘》
引言:探索视频播放程序设计之旅 在当今数字化时代,多媒体应用已渗透到我们生活的方方面面,从日常的视频娱乐到专业的视频监控、视频会议系统,视频播放程序作为多媒体应用的核心组成部分,扮演着至关重要的角色。无论是在个人电脑、移动设备还是智能电视等平台上,用户都期望…...
mongodb源码分析session执行handleRequest命令find过程
mongo/transport/service_state_machine.cpp已经分析startSession创建ASIOSession过程,并且验证connection是否超过限制ASIOSession和connection是循环接受客户端命令,把数据流转换成Message,状态转变流程是:State::Created 》 St…...
IGP(Interior Gateway Protocol,内部网关协议)
IGP(Interior Gateway Protocol,内部网关协议) 是一种用于在一个自治系统(AS)内部传递路由信息的路由协议,主要用于在一个组织或机构的内部网络中决定数据包的最佳路径。与用于自治系统之间通信的 EGP&…...
关于nvm与node.js
1 安装nvm 安装过程中手动修改 nvm的安装路径, 以及修改 通过nvm安装node后正在使用的node的存放目录【这句话可能难以理解,但接着往下看你就了然了】 2 修改nvm中settings.txt文件配置 nvm安装成功后,通常在该文件中会出现以下配置&…...
1688商品列表API与其他数据源的对接思路
将1688商品列表API与其他数据源对接时,需结合业务场景设计数据流转链路,重点关注数据格式兼容性、接口调用频率控制及数据一致性维护。以下是具体对接思路及关键技术点: 一、核心对接场景与目标 商品数据同步 场景:将1688商品信息…...
基于Uniapp开发HarmonyOS 5.0旅游应用技术实践
一、技术选型背景 1.跨平台优势 Uniapp采用Vue.js框架,支持"一次开发,多端部署",可同步生成HarmonyOS、iOS、Android等多平台应用。 2.鸿蒙特性融合 HarmonyOS 5.0的分布式能力与原子化服务,为旅游应用带来…...
从零开始打造 OpenSTLinux 6.6 Yocto 系统(基于STM32CubeMX)(九)
设备树移植 和uboot设备树修改的内容同步到kernel将设备树stm32mp157d-stm32mp157daa1-mx.dts复制到内核源码目录下 源码修改及编译 修改arch/arm/boot/dts/st/Makefile,新增设备树编译 stm32mp157f-ev1-m4-examples.dtb \stm32mp157d-stm32mp157daa1-mx.dtb修改…...
