当前位置: 首页 > news >正文

Hudi第三章:集成Flink

系列文章目录

Hudi第一章:编译安装
Hudi第二章:集成Spark
Hudi第二章:集成Spark(二)
Hudi第三章:集成Flink


文章目录

  • 系列文章目录
  • 前言
  • 一、环境准备
    • 1.上传并解压
    • 2.修改配置文件
    • 3.拷贝jar包
    • 4.启动sql-client
      • 1.启动hadoop
      • 2.启动session
      • 3.启动sql-client
  • 二、sql-client编码
    • 1.创建表
    • 2.插入数据
    • 3.查询数据
    • 4.更新数据
    • 5.流式插入
  • 三、IDEA编码
    • 1.编写pom.xml
    • 2.编写demo
  • 总结


前言

之前的两次博客学习了hudi和spark的集成,现在我们来学习hudi和flink的集成。


一、环境准备

1.上传并解压

在这里插入图片描述

2.修改配置文件

vim /opt/module/flink-1.13.6/conf/flink-conf.yaml
直接在最后追加即可。

classloader.check-leaked-classloader: false
taskmanager.numberOfTaskSlots: 4state.backend: rocksdb
execution.checkpointing.interval: 30000
state.checkpoints.dir: hdfs://hadoop102:8020/ckps
state.backend.incremental: true

sudo vim /etc/profile.d/my_env.sh

export HADOOP_CLASSPATH=`hadoop classpath`
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop

source /etc/profile.d/my_env.sh

3.拷贝jar包

cp /opt/software/hudi-0.12.0/packaging/hudi-flink-bundle/target/hudi-flink1.13-bundle-0.12.0.jar  /opt/module/flink-1.13.6/lib/
cp /opt/module/hadoop/share/hadoop/common/lib/guava-27.0-jre.jar /opt/module/flink-1.13.6/lib/
cp /opt/module/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-client-core-3.1.3.jar /opt/module/flink-1.13.6/lib/

4.启动sql-client

1.启动hadoop

2.启动session

/opt/module/flink-1.13.6/bin/yarn-session.sh -d

3.启动sql-client

bin/sql-client.sh embedded -s yarn-session

启动成功后可以在web端看一下。
在这里插入图片描述
也可以跳转到flink的webui。
在这里插入图片描述
在这里插入图片描述
现在我们就可以在终端写代码了。
在这里插入图片描述

二、sql-client编码

1.创建表

CREATE TABLE t1(uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,name VARCHAR(10),age INT,ts TIMESTAMP(3),`partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH ('connector' = 'hudi','path' = 'hdfs://hadoop102:8020/tmp/hudi_flink/t1','table.type' = 'MERGE_ON_READ'
);

在这里插入图片描述

2.插入数据

INSERT INTO t1 VALUES('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');

3.查询数据

我们先更改一下表格式,默认的看得可能不习惯。

set sql-client.execution.result-mode=tableau;
select * from t1;

在这里插入图片描述

4.更新数据

前面说过hudi的更新操作就是插入一条主键相同的新数据,由更新的ts来覆盖旧的。

insert into t1 values('id1','Danny',27,TIMESTAMP '1970-01-02 00:00:01','par1');

在这里插入图片描述
可以看到数据已经完成了更新。

5.流式插入

flink最常用的还是流式数据的处理。

CREATE TABLE sourceT (uuid varchar(20),name varchar(10),age int,ts timestamp(3),`partition` varchar(20)
) WITH ('connector' = 'datagen','rows-per-second' = '1'
);create table t2(uuid varchar(20),name varchar(10),age int,ts timestamp(3),`partition` varchar(20)
)
with ('connector' = 'hudi','path' = '/tmp/hudi_flink/t2','table.type' = 'MERGE_ON_READ'
);

我们创建两张表,第一张的连接器是datagen可以用来流式的生产数据。第二张表是正常的hudi表。

insert into t2 select * from sourceT;

我们可以在webui看一下。
在这里插入图片描述
因为是流式处理,所以这个进程是不会停止的。

select * from t2 limit 10;

在这里插入图片描述
再查看一次
在这里插入图片描述
我们会发现是不断有数据产生。

三、IDEA编码

我们需要将编译好的一个包拉到本地。
在这里插入图片描述
然后将他倒入maven仓库

mvn install:install-file -DgroupId=org.apache.hudi -DartifactId=hudi-flink_2.12 -Dversion=0.12.0 -Dpackaging=jar -Dfile=./hudi-flink1.13-bundle-0.12.0.jar

1.编写pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.atguigu.hudi</groupId><artifactId>flink-hudi-demo</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><flink.version>1.13.6</flink.version><hudi.version>0.12.0</hudi.version><java.version>1.8</java.version><scala.binary.version>2.12</scala.binary.version><slf4j.version>1.7.30</slf4j.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version><scope>provided</scope>   <!--不会打包到依赖中,只参与编译,不参与运行 --></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><!--idea运行时也有webui--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>${slf4j.version}</version><scope>provided</scope></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>${slf4j.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-to-slf4j</artifactId><version>2.14.0</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.1.3</version><scope>provided</scope></dependency><!--手动install到本地maven仓库--><dependency><groupId>org.apache.hudi</groupId><artifactId>hudi-flink_2.12</artifactId><version>${hudi.version}</version><scope>provided</scope></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.2.4</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><artifactSet><excludes><exclude>com.google.code.findbugs:jsr305</exclude><exclude>org.slf4j:*</exclude><exclude>log4j:*</exclude><exclude>org.apache.hadoop:*</exclude></excludes></artifactSet><filters><filter><!-- Do not copy the signatures in the META-INF folder.Otherwise, this might cause SecurityExceptions when using the JAR. --><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers combine.children="append"><transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"></transformer></transformers></configuration></execution></executions></plugin></plugins></build>
</project>

2.编写demo

HudiDemo.java
一个简单的流式数据处理和刚刚一样。

package com.atguigu.hudi.flink;import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.contrib.streaming.state.PredefinedOptions;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;import java.util.concurrent.TimeUnit;public class HudiDemo {public static void main(String[] args) {System.setProperty("HADOOP_USER_NAME", "atguigu");StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());// 设置状态后端RocksDBEmbeddedRocksDBStateBackend embeddedRocksDBStateBackend = new EmbeddedRocksDBStateBackend(true);embeddedRocksDBStateBackend.setDbStoragePath("/home/chaoge/Downloads/hudi");embeddedRocksDBStateBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM);env.setStateBackend(embeddedRocksDBStateBackend);// checkpoint配置env.enableCheckpointing(TimeUnit.SECONDS.toMillis(30), CheckpointingMode.EXACTLY_ONCE);CheckpointConfig checkpointConfig = env.getCheckpointConfig();checkpointConfig.setCheckpointStorage("hdfs://hadoop102:8020/ckps");checkpointConfig.setMinPauseBetweenCheckpoints(TimeUnit.SECONDS.toMillis(20));checkpointConfig.setTolerableCheckpointFailureNumber(5);checkpointConfig.setCheckpointTimeout(TimeUnit.MINUTES.toMillis(1));checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);StreamTableEnvironment sTableEnv = StreamTableEnvironment.create(env);sTableEnv.executeSql("CREATE TABLE sourceT (\n" +"  uuid varchar(20),\n" +"  name varchar(10),\n" +"  age int,\n" +"  ts timestamp(3),\n" +"  `partition` varchar(20)\n" +") WITH (\n" +"  'connector' = 'datagen',\n" +"  'rows-per-second' = '1'\n" +")");sTableEnv.executeSql("create table t2(\n" +"  uuid varchar(20),\n" +"  name varchar(10),\n" +"  age int,\n" +"  ts timestamp(3),\n" +"  `partition` varchar(20)\n" +")\n" +"with (\n" +"  'connector' = 'hudi',\n" +"  'path' = 'hdfs://hadoop102:8020/tmp/hudi_idea/t2',\n" +"  'table.type' = 'MERGE_ON_READ'\n" +")");sTableEnv.executeSql("insert into t2 select * from sourceT");}
}

当我们运行的时候,可以再本地webui查看。
127.0.0.1:8081/
在这里插入图片描述
也可以在hdfs路径看一下。
在这里插入图片描述


总结

flink第一次就先写到这里剩下的还要在写一次。

相关文章:

Hudi第三章:集成Flink

系列文章目录 Hudi第一章&#xff1a;编译安装 Hudi第二章&#xff1a;集成Spark Hudi第二章&#xff1a;集成Spark(二) Hudi第三章&#xff1a;集成Flink 文章目录 系列文章目录前言一、环境准备1.上传并解压2.修改配置文件3.拷贝jar包4.启动sql-client1.启动hadoop2.启动ses…...

MTC证书|欧盟与英国金属类产品清关新要求

从10月1日起&#xff0c;欧盟海关将严格检查所有申报HS代码为7323、7326等含有金属的货物&#xff0c;所有进口国家的金属相关产品必须提供MTC证书&#xff0c;证明产品材料的来源并非源自俄罗斯。 对于未使用7323、7326等含有金属类的HS编码申报&#xff0c;且品名未明显体现…...

保护敏感数据的艺术:数据安全指南

多年来&#xff0c;工程和技术迅速转型&#xff0c;生成和处理了大量需要保护的数据&#xff0c;因为网络攻击和违规的风险很高。为了保护企业数据&#xff0c;组织必须采取主动的数据安全方法&#xff0c;了解保护数据的最佳实践&#xff0c;并使用必要的工具和平台来实现数据…...

Commonjs与ES Module

commonjs 1 commonjs 实现原理 commonjs每个模块文件上存在 module&#xff0c;exports&#xff0c;require三个变量,然而这三个变量是没有被定义的&#xff0c;但是我们可以在 Commonjs 规范下每一个 js 模块上直接使用它们。在 nodejs 中还存在 __filename 和 __dirname 变…...

分布式对象存储

参考《分布式对象存储----原理、架构以及Go语言实现》&#xff08;作者&#xff1a;胡世杰&#xff09; 对象存储简介 数据的管理方式 以对象的方式管理数据&#xff0c;一个对象包括&#xff1a;对象的数据、对象的元数据、对象的全局唯一标识符 访问数据的方式 可扩展的分…...

跨境独立站代购中国电商平台商品PHP多语言多货币

跨境独立站代购中国电商平台商品是指代购者在海外建立自己的独立电商平台&#xff0c;代理中国主流电商平台&#xff08;如淘宝、京东等&#xff09;的商品进行销售和代购。这种模式的优势在于代购者可以自主选择产品和价格策略&#xff0c;同时还能提供更专业和优质的服务。 …...

Python接口自动化 —— Json 数据处理实战(详解)

简介 上一篇说了关于json数据处理&#xff0c;是为了断言方便&#xff0c;这篇就带各位小伙伴实战一下。首先捋一下思路&#xff0c;然后根据思路一步一步的去实现和实战&#xff0c;不要一开始就盲目的动手和无头苍蝇一样到处乱撞&#xff0c;撞得头破血流后而放弃了。不仅什么…...

微信页面公众号页面 安全键盘收起后页面空白

微信浏览器打开H5页面和公众号页面&#xff0c;输入密码时调起安全键盘&#xff0c;键盘收起后 键盘下方页面留白 解决办法&#xff1a; 1、&#xff08;简单&#xff09;只有在调起安全键盘&#xff08;输入密码&#xff09;的时候会出现这种情况&#xff0c;将input属性改为n…...

数据结构 - 二叉树

递归实现前中后序遍历 #include<stdio.h> #include<stdlib.h>#define TElemType inttypedef struct BiTNode{TElemType data;struct BiTNode *lchild,*rchild; }BiTNode,*BiTree; BiTNode root;void visit(TElemType& e){printf("%d",e); }void Pre…...

【Overload游戏引擎细节分析】从视图投影矩阵提取视锥体及overload对视锥体的封装

overoad代码中包含一段有意思的代码&#xff0c;可以从视图投影矩阵逆推出摄像机的视锥体&#xff0c;本文来分析一下原理 一、平面的方程 视锥体是用平面来表示的&#xff0c;所以先看看平面的数学表达。 平面方程可以由其法线N&#xff08;A, B, C&#xff09;和一个点Q(x0,…...

Linux 安全 - LSM hook点

文章目录 一、LSM file system hooks1.1 LSM super_block hooks1.2 LSM file hooks1.3 LSM inode hooks 二、LSM Task hooks三、LSM IPC hooks四、LSM Network hooks五、LSM Module & System hooks 一、LSM file system hooks 在VFS&#xff08;虚拟文件系统&#xff09;层…...

【iOS逆向与安全】越狱检测与过检测附ida伪代码

首先在网上查找一些检测代码 放入项目运行&#xff0c;用 ida 打开后 F5 得到下面的 __int64 __usercall sub_10001B3F0<X0>(__int64 a1, __int64 a2, __int64 a3, __int64 a4, __int64 a5, __int64 a6, __int64 a7, __int64 a8, __int64 a9, __int64 a10, __int64 a11…...

Android Studio gradle手动下载配置

项目同步时&#xff0c;有时候会遇到Android Studio第一步下载gradle就是连接失败的问题。 这种情况&#xff0c;我们可以手动去gradle官网下载好gradle文件&#xff0c;放置在Android Studio的缓存目录下&#xff0c;这样AS在同步代码时就会自动解压下载好的文件。 步骤如下&…...

ChatGPT Prompting开发实战(十三)

一&#xff0e; 如何评估prompts是否包含有害内容 用户在与ChatGPT交互时提供的prompts可能会包括有害内容&#xff0c;这时可以通过调用OpenAI提供的API来进行判断&#xff0c;接下来给出示例&#xff0c;通过调用模型“gpt-3.5-turbo”来演示这个过程。 prompt示例如下&…...

银河麒麟 ARM 架构 离线安装Docker

1. 下载对应的安装包 进入此地址下载对应的docker 离线安装包 下载地址 将文件上传到服务器 解压此文件 tar zxf docker-18.09.1.tgz将 docker 相关命令拷贝到 /usr/bin&#xff0c;方便直接运行命令 cp docker/* /usr/bin/启动Docker守护程序 dockerd &验证是否安装成…...

虹科科技 | 探索CAN通信世界:PCAN-Explorer 6软件的功能与应用

CAN&#xff08;Controller Area Network&#xff09;总线是一种广泛应用于汽车和工业领域的通信协议&#xff0c;用于实时数据传输和设备之间的通信。而虹科的PCAN-Explorer 6软件是一款功能强大的CAN总线分析工具&#xff0c;为开发人员提供了丰富的功能和灵活性。本文将重点…...

SELECT COUNT(*)会不会导致全表扫描引起慢查询

SELECT COUNT(*)会不会导致全表扫描引起慢查询呢&#xff1f; SELECT COUNT(*) FROM SomeTable 网上有一种说法&#xff0c;针对无 where_clause 的 COUNT(*)&#xff0c;MySQL 是有优化的&#xff0c;优化器会选择成本最小的辅助索引查询计数&#xff0c;其实反而性能最高&…...

英国物联网初创公司【FourJaw】完成180万英镑融资

来源&#xff1a;猛兽财经 作者&#xff1a;猛兽财经 猛兽财经获悉&#xff0c;总部位于英国谢菲尔德的物联网初创公司【FourJaw】今日宣布已完成180万英镑融资。 本轮融资完成后&#xff0c;FourJaw的总融资金额已达400万英镑&#xff0c;本轮融资的投资机构包括&#xff1a;…...

许战海战略文库|无增长则衰亡:中小型制造企业增长困境

竞争环境不是匀速变化&#xff0c;而是加速变化。企业的衰退与进化、兴衰更迭在不断发生&#xff0c;这成为一种不可避免的现实。事实上&#xff0c;在产业链竞争中增长困境不分企业大小&#xff0c;而是一种普遍存在的问题&#xff0c;许多收入在1亿至10亿美元间的制造企业也同…...

广州华锐互动:候车室智能数字孪生系统实现交通信息可视化

随着科技的不断发展&#xff0c;数字化技术在各个领域得到了广泛的应用。智慧车站作为一种新型的交通服务模式&#xff0c;通过运用先进的数字化技术&#xff0c;为乘客提供了更加便捷、舒适的出行体验。 将智慧车站与数字孪生大屏结合&#xff0c;可以将实际现实世界的实体车站…...

测试微信模版消息推送

进入“开发接口管理”--“公众平台测试账号”&#xff0c;无需申请公众账号、可在测试账号中体验并测试微信公众平台所有高级接口。 获取access_token: 自定义模版消息&#xff1a; 关注测试号&#xff1a;扫二维码关注测试号。 发送模版消息&#xff1a; import requests da…...

简易版抽奖活动的设计技术方案

1.前言 本技术方案旨在设计一套完整且可靠的抽奖活动逻辑,确保抽奖活动能够公平、公正、公开地进行,同时满足高并发访问、数据安全存储与高效处理等需求,为用户提供流畅的抽奖体验,助力业务顺利开展。本方案将涵盖抽奖活动的整体架构设计、核心流程逻辑、关键功能实现以及…...

SciencePlots——绘制论文中的图片

文章目录 安装一、风格二、1 资源 安装 # 安装最新版 pip install githttps://github.com/garrettj403/SciencePlots.git# 安装稳定版 pip install SciencePlots一、风格 简单好用的深度学习论文绘图专用工具包–Science Plot 二、 1 资源 论文绘图神器来了&#xff1a;一行…...

Linux相关概念和易错知识点(42)(TCP的连接管理、可靠性、面临复杂网络的处理)

目录 1.TCP的连接管理机制&#xff08;1&#xff09;三次握手①握手过程②对握手过程的理解 &#xff08;2&#xff09;四次挥手&#xff08;3&#xff09;握手和挥手的触发&#xff08;4&#xff09;状态切换①挥手过程中状态的切换②握手过程中状态的切换 2.TCP的可靠性&…...

CRMEB 框架中 PHP 上传扩展开发:涵盖本地上传及阿里云 OSS、腾讯云 COS、七牛云

目前已有本地上传、阿里云OSS上传、腾讯云COS上传、七牛云上传扩展 扩展入口文件 文件目录 crmeb\services\upload\Upload.php namespace crmeb\services\upload;use crmeb\basic\BaseManager; use think\facade\Config;/*** Class Upload* package crmeb\services\upload* …...

什么?连接服务器也能可视化显示界面?:基于X11 Forwarding + CentOS + MobaXterm实战指南

文章目录 什么是X11?环境准备实战步骤1️⃣ 服务器端配置(CentOS)2️⃣ 客户端配置(MobaXterm)3️⃣ 验证X11 Forwarding4️⃣ 运行自定义GUI程序(Python示例)5️⃣ 成功效果![在这里插入图片描述](https://i-blog.csdnimg.cn/direct/55aefaea8a9f477e86d065227851fe3d.pn…...

安宝特方案丨船舶智造的“AR+AI+作业标准化管理解决方案”(装配)

船舶制造装配管理现状&#xff1a;装配工作依赖人工经验&#xff0c;装配工人凭借长期实践积累的操作技巧完成零部件组装。企业通常制定了装配作业指导书&#xff0c;但在实际执行中&#xff0c;工人对指导书的理解和遵循程度参差不齐。 船舶装配过程中的挑战与需求 挑战 (1…...

JVM虚拟机:内存结构、垃圾回收、性能优化

1、JVM虚拟机的简介 Java 虚拟机(Java Virtual Machine 简称:JVM)是运行所有 Java 程序的抽象计算机,是 Java 语言的运行环境,实现了 Java 程序的跨平台特性。JVM 屏蔽了与具体操作系统平台相关的信息,使得 Java 程序只需生成在 JVM 上运行的目标代码(字节码),就可以…...

Windows安装Miniconda

一、下载 https://www.anaconda.com/download/success 二、安装 三、配置镜像源 Anaconda/Miniconda pip 配置清华镜像源_anaconda配置清华源-CSDN博客 四、常用操作命令 Anaconda/Miniconda 基本操作命令_miniconda创建环境命令-CSDN博客...

Cilium动手实验室: 精通之旅---13.Cilium LoadBalancer IPAM and L2 Service Announcement

Cilium动手实验室: 精通之旅---13.Cilium LoadBalancer IPAM and L2 Service Announcement 1. LAB环境2. L2公告策略2.1 部署Death Star2.2 访问服务2.3 部署L2公告策略2.4 服务宣告 3. 可视化 ARP 流量3.1 部署新服务3.2 准备可视化3.3 再次请求 4. 自动IPAM4.1 IPAM Pool4.2 …...