当前位置: 首页 > news >正文

【Apache Paimon】-- 4 -- Flink 消费 kafka 数据,然后写入 paimon

目录

1、本地开发环境

2、kafka2paimon 实现流程

3、代码实现

3.1、项目名称

3.2、项目结构

3.3、Pom.xml 和 log4j.properties 文件

3.4、代码核心类

3.4.1、入口类:Kafka2PaimonDemo.java

3.4.2、参数解析类

3.4.2.1、JobParameterUtil.java( flink job scheduler )

3.4.2.2、KafkaSourceParametersUtil.java(kafka source 参数)

3.4.2.3、OSSParametersUtils.java (oss 参数)

3.4.2.4、PaimonCatalogParameterUtils.java(paimon catalog 参数)

3.4.2.5、PaimonTableParameterUtils.java (paimon table 参数)

3.4.3、flink table env 类:FlinkTableInitUtils.java

3.4.4、配置类

3.4.4.1、DefaultConnectorVal.java

3.4.4.2、DefaultFlinkConfigVal.java

3.4.4.3、ParameterConfigs.java

3.4.4.4、PropertiesConstants.java

3.4.5、bean类

3.4.5.1、PaimonFileSystemCatalogInfo.java

3.4.5.2、JobSchedulerInfo.java

3.4.5.3、OSSGlobalVar.java

3.4.5.4、OSSInfo.java

3.4.5.5、PaimonPrimaryKeyTableSinkInfo.java

3.4.5.6、KafkaSourceInfo.java

3.4.6、重写 OSSFileSystemFactory.java

3.5、运行核心类的步骤

3.5.1、通过本地 kafka shell 生产数据到 topic:test_paimon中

​编辑

3.5.2、编辑 main class 的 args

3.5.4、运行 Kafka2PaimonDemo.java,本地访问 flink web-ui

4、查询 oss 结果

4.1 paimon 表

4.2 flink checkpoint/savepoint 存储

5、参考


1、本地开发环境

Mac OS 10.15.6
Oracle Jdk 11
Scala 2.12.17
Intellij Idea 2023.1
阿里云 OSS

scala 包和 jdk 包下载:

链接:https://pan.baidu.com/s/1HSkoUmzpbFcTx3aB9wte6w?pwd=81jc 
提取码: 81jc

maven pom 核心依赖包:

<apache.flink.version>1.19.1</apache.flink.version>
<apache.paimon.version>0.9.0</apache.paimon.version>
<flink-kafka.version>3.3.0-1.19</flink-kafka.version>
<aliyun.oss.version>3.17.2</aliyun.oss.version>
<fs.hadoopshaded.version>3.3.0</fs.hadoopshaded.version>
<fastjson.version>1.2.83</fastjson.versi

相关文章:

【Apache Paimon】-- 4 -- Flink 消费 kafka 数据,然后写入 paimon

目录 1、本地开发环境 2、kafka2paimon 实现流程 3、代码实现 3.1、项目名称 3.2、项目结构 3.3、Pom.xml 和 log4j.properties 文件 3.4、代码核心类 3.4.1、入口类:Kafka2PaimonDemo.java 3.4.2、参数解析类 3.4.2.1、JobParameterUtil.java( flink job schedule…...

【成功解决】:VS2019(Visual Studio 2019)遇到E2870问题:此配置中不支持 128 位浮点类型

起因:项目中需要用json来操作数据,就引了cJSON库(cJSON.h和cJSON.c文件),但是发现编译报错如下 E2870 此配置中不支持 128 位浮点类型 test0 ...\usr\include\x86_64-linux-gnu\bits\floatn.h 75 然后先新建了个工程来检查问题(甚至在这之前还以为是cjson…...

什么是TCP的三次握手?

TCP的三次握手&#xff1a;深入理解建立可靠连接的过程 引言 在计算机网络中&#xff0c;传输控制协议&#xff08;TCP&#xff09;是确保数据可靠传输的核心协议之一。TCP通过三次握手机制来建立一个稳定的、双向的连接&#xff0c;这对于确保数据的完整性和顺序至关重要。本…...

SQL教程(2):SQL基础语法及用途

在上一篇文章中&#xff0c;我们介绍了 SQL&#xff08;结构化查询语言&#xff09;的基本概念&#xff0c;以及它在用户研究中的重要作用。今天&#xff0c;我们将深入了解 SQL 的基本语法&#xff0c;并通过实际应用场景帮助你更好地理解如何使用 SQL 提取和分析数据。对于刚…...

在Ubuntu22.04 jammy下用qemu模型riscv32环境装鸿蒙(待续)

在使用实体ESP32C3 安装鸿蒙失败后&#xff0c;就是这个&#xff1a;完全按照手册win10里装Ubuntu 虚拟机然后编译ESP32&#xff08;主要是想针对ESP32C3和S3&#xff09;开发板的鸿蒙系统(失败)-CSDN博客转向用qemu模拟环境装鸿蒙 学习手册riscv32_virt/README_zh.md OpenHar…...

C++:基本-union是没有构造函数和析构函数的

今天发现当我在union中包含了多个结构体时&#xff0c;结构体有默认构造函数时&#xff0c;编译报错。 问题点&#xff1a; union不支持构造函数和析构函数union中的元素本身也是不支持构造函数和析构函数的。包含union的结构体也不支持构造函数和析构函数。 出错代码如下&a…...

报错 JSON.parse: expected property name or ‘}‘,JSON数据中对象的key值不为字符串

报错 JSON.parse: expected property name or ‘}’ 原因 多是因为数据转换时出错&#xff0c;可能是存在单引号或者对象key值不为string导致 这里记录下我遇见的问题&#xff08;后端给的JSON数据里&#xff0c;对象key值不为string&#xff09; 现在后端转换JSON数据大多…...

LeetCode 热题 100_旋转图像(20_48_中等_C++)(原地旋转;翻转)

LeetCode 热题 100_旋转图像&#xff08;20_48&#xff09; 题目描述&#xff1a;输入输出样例&#xff1a;题解&#xff1a;解题思路&#xff1a;思路一&#xff08;原地旋转&#xff09;&#xff1a;思路二&#xff08;翻转&#xff09;&#xff1a; 代码实现&#xff08;思路…...

mysql查询所有用户及删除用户

查询用户 select user, host, password_expired from mysql.user;删除用户 DROP USER [username]localhost &#xff1b;刷新权限 FLUSH PRIVILEGES&#xff1b;查询所有用户/账号设置/日志/开启日志 select user,host,password_expired,password_last_changed,password_li…...

Vue 鼠标滚轮缩放图片的实现

wheel"handleZoom" 监听鼠标滚轮事件 event.deltaY < 0 代表向上滚动 event.deltaY > 0 代表向下滚动 使用computed处理scale比例的变化 const imageStyle computed(() > ({ transform: translate(-50%, -50%…...

全景图 与 6面图转换

目录 全景图转6面图&#xff1a; 6面图转全景图 全景图转6面图&#xff1a; https://github.com/springcheese/panoramic_to_cubemap_generation # Necessary Imports import math import argparse import numpy as np from PIL import Image# Dictionary for CUBEMAP FACES…...

深入浅出:PHP 文件操作

文章目录 引言文件的基本操作打开文件读取文件逐行读取读取整个文件 写入文件追加写入覆盖写入 关闭文件 文件和目录的管理检查文件或目录是否存在创建和删除文件创建和删除目录复制和移动文件 处理文件权限设置文件权限获取文件权限 处理文件属性获取文件大小获取文件最后修改…...

116. UE5 GAS RPG 实现击杀掉落战利品功能

这一篇&#xff0c;我们实现敌人被击败后&#xff0c;掉落战利品的功能。首先&#xff0c;我们将创建一个新的结构体&#xff0c;用于定义掉落体的内容&#xff0c;方便我们设置掉落物。然后&#xff0c;我们实现敌人死亡时的掉落函数&#xff0c;并在蓝图里实现对应的逻辑&…...

【批处理脚本】更改Windows系统中的 hosts 解析文件

概述 作用 修改 Windows 系统中的 hosts 文件&#xff0c;可以实现 插入 或 删除 条目。该脚本允许用户以管理员权限执行&#xff0c;将特定的域名解析到指定的 IP 地址 应用场景 非常适用于需要频繁或批量修改 hosts 文件的场景&#xff1a; 屏蔽网站、域名重定向、DNS 污染防…...

fastDFS

docker 部署fastDFS docker pull delron/fastdfs docker-compose.yml version: 3services:fastdfs_tracker:image: delron/fastdfs:latestcontainer_name: trackerhostname: trackernetwork_mode: hostports:- "22122:22122"volumes:- ./data/tracker:/var/fdfsco…...

【Linux】存储

声明&#xff1a;以下内容均来学习自《Linux就该这么学》一书 Linux系统中的一切文件都是从“根(/)”目录开始的&#xff0c;并按照文件系统层次化标准&#xff08;FHS&#xff09;采用树形结构来存放文件&#xff0c;以及定义了常见目录的用途。此外&#xff0c;Linux系统中的…...

hadoop单机安装

步骤 1:安装 Java 安装 OpenJDK bash sudo yum install -y java-1.8.0-openjdk 验证 Java 安装 bash java -version 输出类似以下内容表示成功: arduino openjdk version “1.8.0_xxx” 步骤 2:下载 Hadoop 下载 Hadoop 安装包 前往 Hadoop 官方下载页面,获取最新稳…...

产品批量分类设置——未来之窗行业应用跨平台架构

一、批量统计分类 提高效率 节省时间&#xff1a;当商品数量庞大时&#xff0c;手动逐个修改商品分类是一项极其耗时的任务。例如&#xff0c;一个电商平台有数千种商品&#xff0c;如果手动操作&#xff0c;可能需要花费数天甚至数周的时间来完成分类转移。而批量设置功能可以…...

2024年中国各省份碳相关投资分析:区域差异与未来趋势

随着中国“双碳”目标的推进&#xff0c;各省份的碳相关投资逐渐成为推动绿色经济转型的关键力量。2024年&#xff0c;各地的双碳项目进入了快速发展阶段&#xff0c;尤其是在清洁能源、绿色技术和碳捕集领域。本文将分析中国各省份在碳减排、碳中和目标实现过程中的投资重点和…...

【六足机器人】03步态算法

温馨提示&#xff1a;此部分内容需要较强的数学能力&#xff0c;包括但不限于矩阵运算、坐标变换、数学几何。 一、数学知识 1.1 正逆运动学&#xff08;几何法&#xff09; 逆运动学解算函数 // 逆运动学-->计算出三个角度 void inverse_caculate(double x, double y, …...

C++实现分布式网络通信框架RPC(3)--rpc调用端

目录 一、前言 二、UserServiceRpc_Stub 三、 CallMethod方法的重写 头文件 实现 四、rpc调用端的调用 实现 五、 google::protobuf::RpcController *controller 头文件 实现 六、总结 一、前言 在前边的文章中&#xff0c;我们已经大致实现了rpc服务端的各项功能代…...

Linux相关概念和易错知识点(42)(TCP的连接管理、可靠性、面临复杂网络的处理)

目录 1.TCP的连接管理机制&#xff08;1&#xff09;三次握手①握手过程②对握手过程的理解 &#xff08;2&#xff09;四次挥手&#xff08;3&#xff09;握手和挥手的触发&#xff08;4&#xff09;状态切换①挥手过程中状态的切换②握手过程中状态的切换 2.TCP的可靠性&…...

CentOS下的分布式内存计算Spark环境部署

一、Spark 核心架构与应用场景 1.1 分布式计算引擎的核心优势 Spark 是基于内存的分布式计算框架&#xff0c;相比 MapReduce 具有以下核心优势&#xff1a; 内存计算&#xff1a;数据可常驻内存&#xff0c;迭代计算性能提升 10-100 倍&#xff08;文档段落&#xff1a;3-79…...

c++ 面试题(1)-----深度优先搜索(DFS)实现

操作系统&#xff1a;ubuntu22.04 IDE:Visual Studio Code 编程语言&#xff1a;C11 题目描述 地上有一个 m 行 n 列的方格&#xff0c;从坐标 [0,0] 起始。一个机器人可以从某一格移动到上下左右四个格子&#xff0c;但不能进入行坐标和列坐标的数位之和大于 k 的格子。 例…...

微软PowerBI考试 PL300-在 Power BI 中清理、转换和加载数据

微软PowerBI考试 PL300-在 Power BI 中清理、转换和加载数据 Power Query 具有大量专门帮助您清理和准备数据以供分析的功能。 您将了解如何简化复杂模型、更改数据类型、重命名对象和透视数据。 您还将了解如何分析列&#xff0c;以便知晓哪些列包含有价值的数据&#xff0c;…...

Selenium常用函数介绍

目录 一&#xff0c;元素定位 1.1 cssSeector 1.2 xpath 二&#xff0c;操作测试对象 三&#xff0c;窗口 3.1 案例 3.2 窗口切换 3.3 窗口大小 3.4 屏幕截图 3.5 关闭窗口 四&#xff0c;弹窗 五&#xff0c;等待 六&#xff0c;导航 七&#xff0c;文件上传 …...

代码规范和架构【立芯理论一】(2025.06.08)

1、代码规范的目标 代码简洁精炼、美观&#xff0c;可持续性好高效率高复用&#xff0c;可移植性好高内聚&#xff0c;低耦合没有冗余规范性&#xff0c;代码有规可循&#xff0c;可以看出自己当时的思考过程特殊排版&#xff0c;特殊语法&#xff0c;特殊指令&#xff0c;必须…...

Java详解LeetCode 热题 100(26):LeetCode 142. 环形链表 II(Linked List Cycle II)详解

文章目录 1. 题目描述1.1 链表节点定义 2. 理解题目2.1 问题可视化2.2 核心挑战 3. 解法一&#xff1a;HashSet 标记访问法3.1 算法思路3.2 Java代码实现3.3 详细执行过程演示3.4 执行结果示例3.5 复杂度分析3.6 优缺点分析 4. 解法二&#xff1a;Floyd 快慢指针法&#xff08;…...

HTTPS证书一年多少钱?

HTTPS证书作为保障网站数据传输安全的重要工具&#xff0c;成为众多网站运营者的必备选择。然而&#xff0c;面对市场上种类繁多的HTTPS证书&#xff0c;其一年费用究竟是多少&#xff0c;又受哪些因素影响呢&#xff1f; 首先&#xff0c;HTTPS证书通常在PinTrust这样的专业平…...

写一个shell脚本,把局域网内,把能ping通的IP和不能ping通的IP分类,并保存到两个文本文件里

写一个shell脚本&#xff0c;把局域网内&#xff0c;把能ping通的IP和不能ping通的IP分类&#xff0c;并保存到两个文本文件里 脚本1 #!/bin/bash #定义变量 ip10.1.1 #循环去ping主机的IP for ((i1;i<10;i)) doping -c1 $ip.$i &>/dev/null[ $? -eq 0 ] &&am…...