当前位置: 首页 > news >正文

iceberg系列之 hadoop catalog 小文件合并实战

  1. 背景
    flink1.15 hadoop3.0
  2. pom文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.iceberg</groupId><artifactId>flink-iceberg</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><flink.version>1.15.3</flink.version><java.version>1.8</java.version><scala.binary.version>2.12</scala.binary.version><slf4j.version>1.7.30</slf4j.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-core</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-files</artifactId><version>${flink.version}</version></dependency><!--idea运行时也有webui--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-statebackend-rocksdb</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.1.3</version><scope>compile</scope></dependency><dependency><groupId>org.apache.iceberg</groupId><artifactId>iceberg-flink-runtime-1.15</artifactId><version>1.3.0</version></dependency><dependency><groupId>org.apache.iceberg</groupId><artifactId>iceberg-core</artifactId><version>1.3.0</version></dependency></dependencies><build><plugins><plugin><artifactId>maven-compiler-plugin</artifactId><version>3.8.1</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><version>3.3.0</version><configuration><archive><manifest><!-- 指定主类 --><mainClass>com.iceberg.flink.UnionDelData</mainClass></manifest></archive><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins></build>
</project>
  1. 资源配置文件
    hadoop三个常用配置文件core-site.xml hdfs-site.xml yarn-site.xml 放到资源目录下
  2. java代码
package com.iceberg.flink;import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.actions.Actions;
import org.apache.iceberg.hadoop.HadoopCatalog;import java.io.File;
import java.net.MalformedURLException;public class UnionDelData {public static void main(String[] args) throws MalformedURLException {      String tableNames = args[1];long targetsSize = parseSizeToBytes(args[2]);int parallelism = Integer.parseInt(args[3]);long retainTime = parseTimeToMillis(args[4]);int retainLastNum = Integer.parseInt(args[5]);Configuration conf = new Configuration();conf.addResource(new File("/home/hadoop/hadoopconf/core-site.xml").toURI().toURL());conf.addResource(new File("/home/hadoop/hadoopconf/hdfs-site.xml").toURI().toURL());conf.addResource(new File("/home/hadoop/hadoopconf/yarn-site.xml").toURI().toURL());HadoopCatalog hadoopCatalog = new HadoopCatalog(conf, "/user/hadoop/path/");for (String tableName : tableNames.split(",")) {Table table = hadoopCatalog.loadTable(TableIdentifier.of("prod", tableName));UnionDataFile(table,parallelism,targetsSize);deleteSnap(table,retainTime,retainLastNum);}}public static void UnionDataFile(Table table,int parallelism,long targetsSize) {Actions.forTable(table).rewriteDataFiles().maxParallelism(parallelism).caseSensitive(false).targetSizeInBytes(targetsSize).execute();}public static void deleteSnap(Table table,long retainTime,int retainLastNum){Snapshot snapshot = table.currentSnapshot();long oldSnapshot = snapshot.timestampMillis() - retainTime;if (snapshot != null) {            table.expireSnapshots().expireOlderThan(oldSnapshot).cleanExpiredFiles(true).retainLast(retainLastNum).commit();}}public static long parseSizeToBytes(String sizeWithUnit) {long size = Long.parseLong(sizeWithUnit.substring(0, sizeWithUnit.length() - 1));char unit = sizeWithUnit.charAt(sizeWithUnit.length() - 1); switch (unit) {case 'B':return size;case 'K':case 'k': return size * 1024;case 'M':case 'm': return size * 1024 * 1024;case 'G':case 'g': return size * 1024 * 1024 * 1024;default:throw new IllegalArgumentException("Invalid size unit: " + unit);}}public static long parseTimeToMillis(String timeWithUnit) {long time = Long.parseLong(timeWithUnit.substring(0, timeWithUnit.length() - 1));char unit = timeWithUnit.charAt(timeWithUnit.length() - 1);switch (unit) {case 's':case 'S':return time * 1000;case 'm':case 'M':return time * 60 * 1000;case 'h':case 'H':return time * 60 * 60 * 1000;case 'd':case 'D':return time * 24 * 60 * 60 * 1000;default:throw new IllegalArgumentException("Invalid time unit: " + unit);}}
}

相关文章:

iceberg系列之 hadoop catalog 小文件合并实战

背景 flink1.15 hadoop3.0pom文件 <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0"xmlns:xsi"http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation"http://mave…...

神经网络基础-神经网络补充概念-25-深层神经网络

简介 深层神经网络&#xff08;Deep Neural Network&#xff0c;DNN&#xff09;是一种具有多个隐藏层的神经网络&#xff0c;它可以用来解决复杂的模式识别和特征学习任务。深层神经网络在近年来的机器学习和人工智能领域中取得了重大突破&#xff0c;如图像识别、自然语言处…...

MySQL— 基础语法大全及操作演示!!!(上)

MySQL—— 基础语法大全及操作演示&#xff08;上&#xff09; 一、MySQL概述1.1 、数据库相关概念1.1.1 MySQL启动和停止 1.2 、MySQL 客户端连接1.3 、数据模型 二、SQL2.1、SQL通用语法2.2、SQL分类2.3、DDL2.3.1 DDL — 数据库操作2.3.1 DDL — 表操作 2.4、DML2.4.1 DML—…...

[golang gin框架] 46.Gin商城项目-微服务实战之后台Rbac客户端调用微服务权限验证以及Rbac微服务数据库抽离

一. 根据用户的权限动态显示左侧菜单微服务 1.引入 后台Rbac客户端调用微服务权限验证功能主要是: 登录后显示用户名称、根据用户的权限动态显示左侧菜单,判断当前登录用户的权限 、没有权限访问则拒绝,参考[golang gin框架] 14.Gin 商城项目-RBAC管理,该微服务功能和上一节[g…...

域名和ip的关系

域名和ip的关系 一&#xff1a;什么是域名 域名&#xff0c;简称域名、网域&#xff0c;是由一串用点分隔的名字组成的上某一台计算机或计算机组的名称&#xff0c;用于在数据传输时标识 计算机的电子方位(有时也指地理位置)。网域名称系统&#xff0c;有时也简称为域名…...

excel日期函数篇1

1、DAY(serial_number)&#xff1a;返回序列数表示的某月的天数 在括号内给出一个时间对象或引用一个时间对象&#xff08;年月日&#xff09;&#xff0c;返回多少日 下面结果都为20 2、MONTH(serial_number)&#xff1a;返回序列数表示的某年的月份 在括号内给出一个时间对…...

Leetcode151 翻转字符串中的单词

给你一个字符串 s &#xff0c;请你反转字符串中 单词 的顺序。 单词 是由非空格字符组成的字符串。s 中使用至少一个空格将字符串中的 单词 分隔开。 返回 单词 顺序颠倒且 单词 之间用单个空格连接的结果字符串。 注意&#xff1a;输入字符串 s中可能会存在前导空格、尾随空格…...

PHP FTP的相关函数及简单使用示例

简介 FTP是ARPANet的标准文件传输协议&#xff0c;该网络就是现今Internet的前身。 PHP FTP函数是通过文件传输协议提供对文件服务器的客户端访问&#xff0c;FTP函数用于打开、登陆以及关闭连接&#xff0c;也用于上传、下载、重命名、删除以及获取服务器上文件信息。 安装 …...

高光谱 | 矿物识别和分类标签数据制作、农作物病虫害数据分类、土壤有机质含量回归与制图、木材含水量评估和制图

本课程提供一套基于Python编程工具的高光谱数据处理方法和应用案例。 本课程涵盖高光谱遥感的基础、方法和实践。基础篇以学员为中心&#xff0c;用通俗易懂的语言解释高光谱的基本概念和理论&#xff0c;旨在帮助学员深入理解科学原理。方法篇结合Python编程工具&#xff0c;…...

【数据结构】二叉树篇| 纲领思路01+刷题

博主简介&#xff1a;努力学习的22级计算机科学与技术本科生一枚&#x1f338;博主主页&#xff1a; 是瑶瑶子啦每日一言&#x1f33c;: 所谓自由&#xff0c;不是随心所欲&#xff0c;而是自我主宰。——康德 目录 一、二叉树刷题纲领二、刷题1、104. 二叉树的最大深度2、 二叉…...

系统架构设计师---计算机基础知识之数据库系统结构与规范化

目录 一、基本概念 二、 数据库的结构 三、常用的数据模型 概念数据模型...

PyCharm连接Docker中的容器(ubuntu)

一、为什么要用Pycharm链接Docker中的ubuntu 因为在进行深度学习的时候&#xff0c;基于windows系统在开发的过程中&#xff0c;老是出现很多问题&#xff0c;大多数是环境问题。 尽管安装了Conda&#xff0c;也不能很好的解决问题&#xff0c;使用ubuntu是最好的选择。 二、…...

安防视频汇聚平台EasyCVR视频监控综合管理平台H.265转码功能更新,新增分辨率配置的具体步骤

安防视频集中存储EasyCVR视频监控综合管理平台可以根据不同的场景需求&#xff0c;让平台在内网、专网、VPN、广域网、互联网等各种环境下进行音视频的采集、接入与多端分发。在视频能力上&#xff0c;视频云存储平台EasyCVR可实现视频实时直播、云端录像、视频云存储、视频存储…...

全平台数据(数据库)管理工具 DataCap 管理 Rainbond 上的所有数据库

DataCap是用于数据转换、集成和可视化的集成软件&#xff0c;支持多种数据源、文件类型、大数据相关数据库、关系数据库、NoSQL数据库等。通过该 DataCap 可以实现对多个数据源的管理&#xff0c;对数据源下的数据进行各种操作转换&#xff0c;制作数据图表&#xff0c;监控数据…...

“深入探究JVM内部机制:从字节码到实际执行“

标题&#xff1a;深入探究JVM内部机制&#xff1a;从字节码到实际执行 摘要&#xff1a;本文将深入探究Java虚拟机&#xff08;JVM&#xff09;的内部机制&#xff0c;从字节码的生成、类加载、字节码解释和即时编译等环节&#xff0c;详细介绍JVM是如何将Java程序的字节码转化…...

C++写文件,直接写入结构体

C写文件&#xff0c;直接写入结构体 以前写文件都是写入字符串或者二进制再或者就是一些配置文件&#xff0c;今天介绍一下直接写入结构体&#xff0c;可以在软件参数较多的时候直接进行读写&#xff0c;直接将整个结构体写入和读取&#xff0c;看代码&#xff1a; #include&…...

【Spring专题】Spring之Bean的生命周期源码解析——阶段二(二)(IOC之属性填充/依赖注入)

目录 前言阅读准备阅读指引阅读建议 课程内容一、依赖注入方式&#xff08;前置知识&#xff09;1.1 手动注入1.2 自动注入1.2.1 XML的autowire自动注入1.2.1.1 byType&#xff1a;按照类型进行注入1.2.1.2 byName&#xff1a;按照名称进行注入1.2.1.3 constructor&#xff1a;…...

线程|线程的使用、四种实现方式

1.线程的实现方式 1.用户级线程 开销小&#xff0c;用户空间就可以创建多个。缺点是&#xff1a;内核无法感知用户级多个线程的存在&#xff0c;把其当作只有一个线程&#xff0c;所以只会提供一个处理器。 2.内核级线程 相对于用户级开销稍微大一点&#xff0c;可以利用多…...

Facebook 应用未启用:这款应用目前无法使用,应用开发者已得知这个问题。

错误&#xff1a;Facebook 应用未启用:这款应用目前无法使用&#xff0c;应用开发者已得知这个问题。应用重新启用后&#xff0c;你便能登录。 「应用未经过审核或未发布」&#xff1a; 如果一个应用还没有经过Facebook的审核或者开发者尚未将应用发布&#xff0c;那么它将无法…...

(十八)大数据实战——Hive的metastore元数据服务安装

前言 Hive的metastore服务作用是为Hive CLI或者Hiveserver2提供元数据访问接口。Hive的metastore 是Hive元数据的存储和管理组件&#xff0c;它负责管理 Hive 表、分区、列等元数据信息。元数据是描述数据的数据&#xff0c;它包含了关于表结构、存储位置、数据类型等信息。本…...

【Axure高保真原型】引导弹窗

今天和大家中分享引导弹窗的原型模板&#xff0c;载入页面后&#xff0c;会显示引导弹窗&#xff0c;适用于引导用户使用页面&#xff0c;点击完成后&#xff0c;会显示下一个引导弹窗&#xff0c;直至最后一个引导弹窗完成后进入首页。具体效果可以点击下方视频观看或打开下方…...

OpenLayers 可视化之热力图

注&#xff1a;当前使用的是 ol 5.3.0 版本&#xff0c;天地图使用的key请到天地图官网申请&#xff0c;并替换为自己的key 热力图&#xff08;Heatmap&#xff09;又叫热点图&#xff0c;是一种通过特殊高亮显示事物密度分布、变化趋势的数据可视化技术。采用颜色的深浅来显示…...

【Oracle APEX开发小技巧12】

有如下需求&#xff1a; 有一个问题反馈页面&#xff0c;要实现在apex页面展示能直观看到反馈时间超过7天未处理的数据&#xff0c;方便管理员及时处理反馈。 我的方法&#xff1a;直接将逻辑写在SQL中&#xff0c;这样可以直接在页面展示 完整代码&#xff1a; SELECTSF.FE…...

Zustand 状态管理库:极简而强大的解决方案

Zustand 是一个轻量级、快速和可扩展的状态管理库&#xff0c;特别适合 React 应用。它以简洁的 API 和高效的性能解决了 Redux 等状态管理方案中的繁琐问题。 核心优势对比 基本使用指南 1. 创建 Store // store.js import create from zustandconst useStore create((set)…...

《Qt C++ 与 OpenCV:解锁视频播放程序设计的奥秘》

引言:探索视频播放程序设计之旅 在当今数字化时代,多媒体应用已渗透到我们生活的方方面面,从日常的视频娱乐到专业的视频监控、视频会议系统,视频播放程序作为多媒体应用的核心组成部分,扮演着至关重要的角色。无论是在个人电脑、移动设备还是智能电视等平台上,用户都期望…...

解锁数据库简洁之道:FastAPI与SQLModel实战指南

在构建现代Web应用程序时&#xff0c;与数据库的交互无疑是核心环节。虽然传统的数据库操作方式&#xff08;如直接编写SQL语句与psycopg2交互&#xff09;赋予了我们精细的控制权&#xff0c;但在面对日益复杂的业务逻辑和快速迭代的需求时&#xff0c;这种方式的开发效率和可…...

【HarmonyOS 5 开发速记】如何获取用户信息(头像/昵称/手机号)

1.获取 authorizationCode&#xff1a; 2.利用 authorizationCode 获取 accessToken&#xff1a;文档中心 3.获取手机&#xff1a;文档中心 4.获取昵称头像&#xff1a;文档中心 首先创建 request 若要获取手机号&#xff0c;scope必填 phone&#xff0c;permissions 必填 …...

【开发技术】.Net使用FFmpeg视频特定帧上绘制内容

目录 一、目的 二、解决方案 2.1 什么是FFmpeg 2.2 FFmpeg主要功能 2.3 使用Xabe.FFmpeg调用FFmpeg功能 2.4 使用 FFmpeg 的 drawbox 滤镜来绘制 ROI 三、总结 一、目的 当前市场上有很多目标检测智能识别的相关算法&#xff0c;当前调用一个医疗行业的AI识别算法后返回…...

Fabric V2.5 通用溯源系统——增加图片上传与下载功能

fabric-trace项目在发布一年后,部署量已突破1000次,为支持更多场景,现新增支持图片信息上链,本文对图片上传、下载功能代码进行梳理,包含智能合约、后端、前端部分。 一、智能合约修改 为了增加图片信息上链溯源,需要对底层数据结构进行修改,在此对智能合约中的农产品数…...

CVE-2020-17519源码分析与漏洞复现(Flink 任意文件读取)

漏洞概览 漏洞名称&#xff1a;Apache Flink REST API 任意文件读取漏洞CVE编号&#xff1a;CVE-2020-17519CVSS评分&#xff1a;7.5影响版本&#xff1a;Apache Flink 1.11.0、1.11.1、1.11.2修复版本&#xff1a;≥ 1.11.3 或 ≥ 1.12.0漏洞类型&#xff1a;路径遍历&#x…...