当前位置: 首页 > news >正文

Fink CDC数据同步(六)数据入湖Hudi

数据入湖Hudi

Apache Hudi(简称:Hudi)使得您能在hadoop兼容的存储之上存储大量数据,同时它还提供两种原语,使得除了经典的批处理之外,还可以在数据湖上进行流处理。这两种原语分别是:

  • Update/Delete记录:Hudi使用细粒度的文件/记录级别索引来支持Update/Delete记录,同时还提供写操作的事务保证。查询会处理最后一个提交的快照,并基于此输出结果。
  • 变更流:Hudi对获取数据变更提供了一流的支持:可以从给定的时间点获取给定表中已updated/inserted/deleted的所有记录的增量流,并解锁新的查询姿势(类别)。

配置

将hudi相关jar包放在flink安装目录的lib下

hudi-flink1.16-bundle-0.13.0.jar

hudi-hadoop-mr-0.13.0.jar

hudi-hive-sync-0.13.0.jar

确保/etc/profile配置了hadoop和hive的环境变量

#HADOOP_HOME
export HADOOP_HOME=/usr/hdp/3.1.5.0-152/hadoop
export HADOOP_CONF_DIR=/usr/hdp/3.1.5.0-152/hadoop/etc/hadoop
export HADOOP_COMMON_HOME=/usr/hdp/3.1.5.0-152/hadoop
export HADOOP_HDFS_HOME=/usr/hdp/3.1.5.0-152/hadoop
export HADOOP_YARN_HOME=/usr/hdp/3.1.5.0-152/hadoop
export HADOOP_MAPRED_HOME=/usr/hdp/3.1.5.0-152/hadoop
export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
export HADOOP_CLASSPATH=`hadoop classpath`#HIVE HOME
export HIVE_HOME=/usr/hdp/3.1.5.0-152/hive
export PATH=$PATH:$HIVE_HOME/bin:$HIVE_HOME/sbin

测试插入hudi表

set sql-client.execution.result-mode = tableau;
set execution.checkpointing.interval=30sec;
SET table.sql-dialect=default;CREATE TABLE hudi_test(uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,name VARCHAR(10),age INT,ts TIMESTAMP(3),`partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH ('connector' = 'hudi',  -- 连接器指定hudi'path' = 'hdfs://bigdata101:8020/hudi/hudi_test',  -- 数据存储地址'table.type' = 'MERGE_ON_READ' -- 表类型,默认COPY_ON_WRITE,可选MERGE_ON_READ
);INSERT INTO hudi_test VALUES('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');

MySql数据写入Hudi表

建hudi表

create table hudi_user(id string not null,name string,birth string,gender string,primary key (id) not enforced
)
with ('connector' = 'hudi','path' = 'hdfs://bigdata101:8020/hudi/hudi_user','table.type' = 'MERGE_ON_READ','write.option' = 'bulk_insert','write.precombine.field' = 'id'
);

将MySql映射表的数据插入hudi表,此时会生成一个flink任务

insert into ods.hudi_user select * from mysql_user;

流式查询

上面的查询方式是非流式查询,流式查询会生成一个flink作业,并且实时显示数据源变更的数据。

流式查询(Streaming Query)需要设置read.streaming.enabled = true。再设置read.start-commit,如果想消费所有数据,设置值为earliest。

使用参数如下:

参数名称

是否必填

默认值

备注

read.streaming.enabled

FALSE

FALSE

设置为true,开启stream query

read.start-commit

FALSE

the latest commit

Instant time的格式为:’yyyyMMddHHmmss’

read.streaming_skip_compaction

FALSE

FALSE

是否不消费compaction commit,消费compaction commit会出现重复数据

clean.retain_commits

FALSE

10

当开启change log mode,保留的最大commit数量。如果checkpoint interval为5分钟,则保留50分钟的change log

建表:

create table hudi_user_read_streaming(id int not null ,name string,birth string,gender string,primary key (id) not enforced
)
with ('connector' = 'hudi','path' = 'hdfs://bigdata101:8020/hudi/hudi_user','table.type' = 'MERGE_ON_READ','write.option' = 'bulk_insert','write.precombine.field' = 'id','read.streaming.enabled' = 'true',  -- 默认值false,设置为true,开启stream query'read.start-commit' = '20231008134557', -- start-commit之前提交的数据不显示,'read.streaming.check-interval' = '4'  -- 检查间隔,默认60s);insert into hudi_user_read_streaming select * from mysql_user;select * from hudi_user_read_streaming;

此时,执行select 语句就会生成一个flink 作业

源端变更数据会实时展示出来


 系列文章

Fink CDC数据同步(一)环境部署icon-default.png?t=N7T8https://blog.csdn.net/weixin_44586883/article/details/136017355?spm=1001.2014.3001.5502
Fink CDC数据同步(二)MySQL数据同步icon-default.png?t=N7T8https://blog.csdn.net/weixin_44586883/article/details/136017472?spm=1001.2014.3001.5501
Fink CDC数据同步(三)Flink集成Hiveicon-default.png?t=N7T8https://blog.csdn.net/weixin_44586883/article/details/136017571?spm=1001.2014.3001.5501
Fink CDC数据同步(四)Mysql数据同步到Kafkaicon-default.png?t=N7T8https://blog.csdn.net/weixin_44586883/article/details/136023747?spm=1001.2014.3001.5501
Fink CDC数据同步(五)Kafka数据同步Hiveicon-default.png?t=N7T8https://blog.csdn.net/weixin_44586883/article/details/136023837?spm=1001.2014.3001.5501

Fink CDC数据同步(六)数据入湖Hudiicon-default.png?t=N7T8https://blog.csdn.net/weixin_44586883/article/details/136023939?spm=1001.2014.3001.5502

相关文章:

Fink CDC数据同步(六)数据入湖Hudi

数据入湖Hudi Apache Hudi(简称:Hudi)使得您能在hadoop兼容的存储之上存储大量数据,同时它还提供两种原语,使得除了经典的批处理之外,还可以在数据湖上进行流处理。这两种原语分别是: Update/Delete记录:H…...

线程和进程的区别及基础线程创建

1 线程和进程的区别 资源分配和调度: 进程(火车)是操作系统进行资源分配和调度的最小单位。它有自己的独立资源空间,包括内存、文件句柄等。线程(车厢)是CPU调度的最小单位。一个进程可以包含多个线程&…...

如何使用postman进行接口调试

使用Postman进行接口调试 有些时候我们写代码的时候,会发现接口有报错,提示参数错误,我们为了更好的排查错误原因,可以在Postman上进行接口调试。将url,请求方式,参数,cookie都填写到Postman中…...

Leetcode 198 打家劫舍

题意理解: 你是一个专业的小偷,计划偷窃沿街的房屋。每间房内都藏有一定的现金,影响你偷窃的唯一制约因素就是相邻的房屋装有相互连通的防盗系统,如果两间相邻的房屋在同一晚上被小偷闯入,系统会自动报警。 给定一个代…...

相机图像质量研究(9)常见问题总结:光学结构对成像的影响--工厂镜头组装

系列文章目录 相机图像质量研究(1)Camera成像流程介绍 相机图像质量研究(2)ISP专用平台调优介绍 相机图像质量研究(3)图像质量测试介绍 相机图像质量研究(4)常见问题总结:光学结构对成像的影响--焦距 相机图像质量研究(5)常见问题总结:光学结构对成…...

Linux内核与驱动面试经典“小”问题集锦(5)

接前一篇文章:Linux内核与驱动面试经典“小”问题集锦(4) 问题6 问:mutex_lock和mutex_lock_interruptible的区别是什么? 备注:此问题也是笔者近期参加蔚来面试时遇到的一个问题。 答: 尽管…...

基于51 单片机的交通灯系统 源码+仿真+ppt

主要内容: 1)南北方向的绿灯、东西方向的红灯同时亮40秒。 2)南北方向的绿灯灭、黄灯亮5秒,同时东西方向的红灯继续亮。 3)南北方向的黄灯灭、左转绿灯亮,持续20秒,同时东西方向的红灯继续…...

【蓝桥杯冲冲冲】[NOIP2017 提高组] 宝藏

蓝桥杯备赛 | 洛谷做题打卡day29 文章目录 蓝桥杯备赛 | 洛谷做题打卡day29[NOIP2017 提高组] 宝藏题目背景题目描述输入格式输出格式样例 #1样例输入 #1样例输出 #1样例 #2样例输入 #2样例输出 #2提示题解代码我的一些话[NOIP2017 提高组] 宝藏 题目背景 NOIP2017 D2T2 题目描…...

C#中实现串口通讯和网口通讯(使用SerialPort和Socket类)

仅作自己学习使用 1 准备部份 串口通讯需要两个调试软件commix和Virtual Serial Port Driver,分别用于监视串口和创造虚拟串口。网口通讯需要一个网口调试助手,网络上有很多资源,我在这里采用的是微软商店中的TCP/UDP网络调试助手&#xff0…...

LeetCode回溯算法的解题思路

回溯法概念 回溯法:一种通过探索所有可能的候选解来找出所有的解的算法。如果候选解被确认不是一个解(或者至少不是最后一个解),回溯算法会通过在上一步进行一些变化抛弃该解,即回溯并且再次尝试。 应用场景 回溯算…...

泰克示波器(TBS2000系列)数学运算功能使用

目录 1 数学运算菜单1.1 运算符选择1.2 信源选择1.3 数学运算结果 1 数学运算菜单 Math运算按钮,用于实现对两个通道的信号进行实时的“加、减、乘”运算,计算时信源1在前面,信源2在运算符的右边,设置时设置信源与运算符就行了。…...

数据结构与算法之美学习笔记:50 | 索引:如何在海量数据中快速查找某个数据?

目录 前言为什么需要索引?索引的需求定义构建索引常用的数据结构有哪些?总结引申 前言 本节课程思维导图: 在第 48 节中,我们讲了 MySQL 数据库索引的实现原理。MySQL 底层依赖的是 B 树这种数据结构。留言里有同学问我&#xff…...

Python(SQLite)executescript用法

SQLite 数据库模块的游标对象还包含了一个 executescript() 方法,这不是一个标准的 API 方法,这意味着在其他数据库 API 模块中可能没有这个方法。但是这个方法却很实用,它可以执行一段 SQL 脚本。 例如,如下程序使用 executescr…...

BUUCTF-Real-[ThinkPHP]IN SQL INJECTION

目录 漏洞描述 漏洞分析 漏洞复现 漏洞描述 漏洞发现时间&#xff1a; 2018-09-04 CVE 参考&#xff1a;CVE-2018-16385 最高严重级别&#xff1a;低风险 受影响的系统&#xff1a;ThinkPHP < 5.1.23 漏洞描述&#xff1a; ThinkPHP是一款快速、兼容、简单的轻量级国产P…...

python安装步骤

安装 Python 的步骤如下&#xff1a; 在 Python 官方网站&#xff08;https://www.python.org&#xff09;上下载 Python 安装程序。运行下载的安装程序。在安装程序中选择要安装的 Python 版本&#xff08;通常选择最新版本&#xff09;&#xff0c;并选择安装目录。确保勾选…...

BlueLotus 下载安装使用

说明 蓝莲花平台BlueLotus&#xff0c;是清华大学曾经的蓝莲花战队搭建的平台&#xff0c;该平台用于接收xss返回数据。 正常执行反射型xss和存储型xss&#xff1a; 反射型在执行poc时&#xff0c;会直接在页面弹出执行注入的poc代码&#xff1b;存储型则是在将poc代码注入用…...

.[hudsonL@cock.li].mkp勒索病毒数据怎么处理|数据解密恢复

导言&#xff1a; 在当今数字化时代&#xff0c;勒索病毒已成为网络安全领域的一大威胁。其中一种新近出现的勒索病毒是由[hudsonLcock.li].mkp[hendersoncock.li].mkp[myersairmail.cc].mkp制作的&#xff0c;它以其高效的加密算法和勒索方式而备受关注。本文91数据恢复将介绍…...

基于SpringBoot和PostGIS的震中影响范围可视化实践

目录 前言 一、基础数据 1、地震基础信息 2、全国行政村 二、Java后台服务设计 1、实体类设计 2、Mapper类设计 3、控制器设计 三、前端展示 1、初始化图例 2、震中位置及影响范围标记 3、行政村点查询及标记 总结 前言 地震等自然灾害目前还是依然不能进行准确的预…...

JUnit实践教程——Java的单元测试框架

前言 大家好&#xff0c;我是chowley&#xff0c;最近在学单元测试框架——JUnit&#xff0c;写个博客记录一下&#xff01; 在软件开发中&#xff0c;单元测试是确保代码质量和稳定性的重要手段之一。JUnit作为Java领域最流行的单元测试框架&#xff0c;为开发人员提供了简单…...

选择大语言模型:2024 年开源 LLM 入门指南

作者&#xff1a;来自 Elastic Aditya Tripathi 如果说人工智能在 2023 年起飞&#xff0c;这绝对是轻描淡写的说法。数千种新的人工智能工具被推出&#xff0c;人工智能功能被添加到现有的应用程序中&#xff0c;好莱坞因对这项技术的担忧而戛然而止。 甚至还有一个人工智能工…...

逆向工程实战:从V8引擎角度破解JavaScript无限debugger(保姆级教程)

V8引擎深度解析&#xff1a;JavaScript调试机制与安全实践 在JavaScript开发领域&#xff0c;调试器(debugger)是开发者日常工作中不可或缺的工具。作为Chrome浏览器和Node.js的核心引擎&#xff0c;V8对debugger关键字的处理机制直接影响着开发者的调试体验。本文将深入探讨V8…...

TWS耳机充电仓硬件设计全解析:从Type-C接口到NTC保护的7大核心模块

TWS耳机充电仓硬件设计全解析&#xff1a;从Type-C接口到NTC保护的7大核心模块 当你在咖啡馆掏出AirPods时&#xff0c;可能不会想到那个小巧的充电仓里藏着多少精密电路。作为硬件工程师&#xff0c;我们眼中的充电仓不是简单的塑料盒子&#xff0c;而是一个由七大核心模块组成…...

DeerFlow资源优化实践:控制Python执行环境内存占用方法

DeerFlow资源优化实践&#xff1a;控制Python执行环境内存占用方法 1. 认识DeerFlow&#xff1a;您的智能研究助手 DeerFlow是一个基于LangStack技术框架开发的深度研究开源项目&#xff0c;它就像是您的个人研究团队&#xff0c;能够帮您完成各种复杂的调研任务。这个工具整…...

FLUX.1-dev像素生成器实战:生成符合NES/SNES调色板限制的合法像素图

FLUX.1-dev像素生成器实战&#xff1a;生成符合NES/SNES调色板限制的合法像素图 1. 像素艺术生成新纪元 在数字艺术创作领域&#xff0c;像素艺术正经历一场由AI驱动的复兴。传统像素画创作需要艺术家手动放置每个像素&#xff0c;而现代AI技术可以智能生成符合经典游戏机调色…...

yolo系列演进分析

YOLO(You Only Look Once)作为计算机视觉领域最具影响力的目标检测算法系列之一,自2016年首次提出以来经历了持续的技术革新与架构演进。从最初的YOLOv1到2026年最新发布的YOLO26,这一系列不仅实现了从"单阶段检测"到"端到端推理"的范式转变,更在速度…...

SDMatte透明PNG元数据规范:EXIF/IPTC嵌入、版权信息自动写入功能

SDMatte透明PNG元数据规范&#xff1a;EXIF/IPTC嵌入、版权信息自动写入功能 1. 产品概述 SDMatte 是一款面向高质量图像抠图场景的 AI 模型&#xff0c;特别适合处理主体分离、透明物体提取、边缘精修、商品图去背景等任务。该模型对玻璃、薄纱、羽毛、叶片等边缘细节复杂或…...

工业自动化必备:Kepware+UaExpert实现OPC UA通信的5个关键步骤与常见问题解决

工业自动化实战&#xff1a;Kepware与UaExpert的OPC UA通信全流程解析 在工业4.0时代&#xff0c;设备间的无缝通信已成为智能制造的基础能力。作为工业自动化领域的黄金标准&#xff0c;OPC UA协议凭借其跨平台、高安全性等特性&#xff0c;正在取代传统OPC DA成为工厂数据交互…...

4吨卧式燃气蒸汽锅炉食品厂洗涤商用

WNS型4吨卧式燃气蒸汽锅炉&#xff0c;专为食品加工、商用洗涤等行业量身打造&#xff0c;是高效稳定、环保节能的核心供汽设备&#xff0c;完美适配食品蒸煮杀菌、洗涤熨烫烘干等高频蒸汽需求&#xff0c;助力企业降本增效、合规生产。 锅炉采用卧式三回程湿背式经典结构&…...

告别打包烦恼:Qt Installer Framework 4.6 保姆级教程,从配置到生成exe安装包

Qt Installer Framework 4.6 终极实战指南&#xff1a;从零构建专业级安装包 当你终于完成了一个Qt应用的开发&#xff0c;编译了Release版本&#xff0c;甚至用windeployqt处理了依赖&#xff0c;接下来面临的挑战是如何将这些文件打包成一个专业的安装程序。这正是Qt Instal…...

java毕业设计基于SpringBoot酒店预定系统

前言 Spring Boot酒店预定系统是一种功能丰富、易于维护和扩展的在线预订平台。它通过整合前后端技术&#xff0c;实现了酒店信息的在线展示、预订、支付以及管理等一系列功能&#xff0c;为用户和酒店提供了便捷、高效的预订服务。随着旅游业和酒店业的不断发展&#xff0c;该…...