Apache Hudi初探(五)(与flink的结合)--Flink 中hudi clean操作
背景
本文主要是具体说说Flink中的clean操作的实现
杂说闲谈
在flink中主要是CleanFunction函数:
@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);this.writeClient = FlinkWriteClients.createWriteClient(conf, getRuntimeContext());this.executor = NonThrownExecutor.builder(LOG).waitForTasksFinish(true).build();String instantTime = HoodieActiveTimeline.createNewInstantTime();LOG.info(String.format("exec clean with instant time %s...", instantTime));executor.execute(() -> writeClient.clean(instantTime), "wait for cleaning finish");}@Overridepublic void notifyCheckpointComplete(long l) throws Exception {if (conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED) && isCleaning) {executor.execute(() -> {try {this.writeClient.waitForCleaningFinish();} finally {// ensure to switch the isCleaning flagthis.isCleaning = false;}}, "wait for cleaning finish");}}@Overridepublic void snapshotState(FunctionSnapshotContext context) throws Exception {if (conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED) && !isCleaning) {try {this.writeClient.startAsyncCleaning();this.isCleaning = true;} catch (Throwable throwable) {// catch the exception to not affect the normal checkpointingLOG.warn("Error while start async cleaning", throwable);}}}
-
open函数
-
writeClient =FlinkWriteClients.createWriteClient(conf, getRuntimeContext())
创建FlinkWriteClient,用于写hudi数据 -
this.executor = NonThrownExecutor.builder(LOG).waitForTasksFinish(true).build();
创建一个只有一个线程的线程池,改线程池的主要作用来异步执行hudi写操作 -
executor.execute(() -> writeClient.clean(instantTime)
异步执行hudi的清理操作,该clean函数的主要代码如下:if (!tableServicesEnabled(config)) {return null;}final Timer.Context timerContext = metrics.getCleanCtx();CleanerUtils.rollbackFailedWrites(config.getFailedWritesCleanPolicy(),HoodieTimeline.CLEAN_ACTION, () -> rollbackFailedWrites(skipLocking));HoodieTable table = createTable(config, hadoopConf);if (config.allowMultipleCleans() || !table.getActiveTimeline().getCleanerTimeline().filterInflightsAndRequested().firstInstant().isPresent()) {LOG.info("Cleaner started");// proceed only if multiple clean schedules are enabled or if there are no pending cleans.if (scheduleInline) {scheduleTableServiceInternal(cleanInstantTime, Option.empty(), TableServiceType.CLEAN);table.getMetaClient().reloadActiveTimeline();}}// Proceeds to execute any requested or inflight clean instances in the timelineHoodieCleanMetadata metadata = table.clean(context, cleanInstantTime, skipLocking);if (timerContext != null && metadata != null) {long durationMs = metrics.getDurationInMs(timerContext.stop());metrics.updateCleanMetrics(durationMs, metadata.getTotalFilesDeleted());LOG.info("Cleaned " + metadata.getTotalFilesDeleted() + " files"+ " Earliest Retained Instant :" + metadata.getEarliestCommitToRetain()+ " cleanerElapsedMs" + durationMs);}return metadata;-
CleanerUtils.rollbackFailedWrites(config.getFailedWritesCleanPolicy(),HoodieTimeline.CLEAN_ACTION,() -> rollbackFailedWrites *
根据配置hoodie.cleaner.policy.failed.writes* 默认是EAGER,也就是在写数据失败的时候,会立即进行这次写失败的数据的清理,在这种情况下,
就不会执行rollbackFailedWrites操作,也就是回滚写失败文件的操作 -
HoodieTable table = createTable *
创建HoodieFlinkMergeOnReadTable*类型的hudi表,用来做clean等操作 -
scheduleTableServiceInternal
如果hoodie.clean.allow.multiple为true(默认为true)或者没有正在运行中clean操作,则会生成Clean计划
这里最终调用的是FlinkWriteClient.scheduleCleaning方法,即CleanPlanActionExecutor.execute方法这里最重要的就是requestClean方法:
CleanPlanner<T, I, K, O> planner = new CleanPlanner<>(context, table, config); Option<HoodieInstant> earliestInstant = planner.getEarliestCommitToRetain(); List<String> partitionsToClean = planner.getPartitionPathsToClean(earliestInstant) int cleanerParallelism = Math.min(partitionsToClean.size(), config.getCleanerParallelism()); Map<String, Pair<Boolean, List<CleanFileInfo>>> cleanOpsWithPartitionMeta = context.map(partitionsToClean, partitionPathToClean -> Pair.of(partitionPathToClean, planner.getDeletePaths(partitionPathToClean)), cleanerParallelism).stream().collect(Collectors.toMap(Pair::getKey, Pair::getValue)) Map<String, List<HoodieCleanFileInfo>> cleanOps = cleanOpsWithPartitionMeta.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey,e -> CleanerUtils.convertToHoodieCleanFileInfoList(e.getValue().getValue()))) List<String> partitionsToDelete = cleanOpsWithPartitionMeta.entrySet().stream().filter(entry -> entry.getValue().getKey()).map(Map.Entry::getKey).collect(Collectors.toList()) return new HoodieCleanerPlan(earliestInstant.map(x -> new HoodieActionInstant(x.getTimestamp(), x.getAction(), x.getState().name())).orElse(null),planner.getLastCompletedCommitTimestamp(),config.getCleanerPolicy().name(), CollectionUtils.createImmutableMap(),CleanPlanner.LATEST_CLEAN_PLAN_VERSION, cleanOps, partitionsToDelete)- planner.getEarliestCommitToRetain();
根据保留策略,获取到最早需要保留的commit的HoodieInstant,在这里会兼顾考虑到hoodie.cleaner.commits.retained(默认是10)以及hoodie.cleaner.hours.retained默认是24小时以及hoodie.cleaner.policy策略(默认是KEEP_LATEST_COMMITS) - planner.getPartitionPathsToClean(earliestInstant);
根据保留的最新commit的HoodieInstant,得到要删除的分区,这里会根据配置hoodie.cleaner.incremental.mode(默认是true)来进行增量清理,
这个时候就会根据上一次已经clean的信息,只需要删除差量的分区数据就行 - cleanOpsWithPartitionMeta = context
根据上面得到的需要删除的分区信息,获取需要删除的文件信息,具体的实现可以参考CleanPlanner.getFilesToCleanKeepingLatestCommits
这里的操作主要是先通过fileSystemView获取分区下所有的FileGroup,之后再获取每个FileGroup下的所有的FileSlice(这里的FileSlice就有版本的概念,也就是commit的版本),之后再与最新保留的commit的时间戳进行比较得到需要删除的文件信息 - new HoodieCleanerPlan
最后组装成HoodieCleanPlan的计划,并且在外层调用table.getActiveTimeline().saveToCleanRequested(cleanInstant, TimelineMetadataUtils.serializeCleanerPlan(cleanerPlan)); 方法把clean request的状态存储到对应的.hoodie目录下,并建立一个xxxx.clean.requested的元数据文件
- planner.getEarliestCommitToRetain();
-
table.getMetaClient().reloadActiveTimeline()
重新加载timeline,便于过滤出来刚才scheduleTableServiceInternal操作生成的xxxxxxxxxxxxxx.clean.requested的元数据文件 -
table.clean(context, cleanInstantTime, skipLocking)
真正执行clean的部分,主要是调用CleanActionExecutor.execute的方法,最终调用的是*runPendingClean(table, hoodieInstant)*方法:HoodieCleanerPlan cleanerPlan = CleanerUtils.getCleanerPlan(table.getMetaClient(), cleanInstant);return runClean(table, cleanInstant, cleanerPlan);首先是反序列化CleanPlan,然后在进行清理,主要是删除1. 如果没有满足的分区,直接删除该分区,2. 否则删除该分区下的满足条件的文件,最后返回HoodieCleanStat包含删除的文件信息等。
-
-
-
snapshotState方法
- 如果clean.async.enabled是true(默认是true),并且不是正在进行clean动作,则会进行异步清理
this.writeClient.startAsyncCleaning(); 这里最终也是调用的writeClient.clean方法。 - this.isCleaning = true;
设置标志位,用来保证clean操作的有序性
- 如果clean.async.enabled是true(默认是true),并且不是正在进行clean动作,则会进行异步清理
-
notifyCheckpointComplete方法
- 如果clean.async.enabled是true(默认是true),并且正在进行clean动作,则等待clean操作完成,
并且设置清理标识位,用来和snapshotState方法进行呼应以保证clean操作的有序性
- 如果clean.async.enabled是true(默认是true),并且正在进行clean动作,则等待clean操作完成,
相关文章:
Apache Hudi初探(五)(与flink的结合)--Flink 中hudi clean操作
背景 本文主要是具体说说Flink中的clean操作的实现 杂说闲谈 在flink中主要是CleanFunction函数: Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);this.writeClient FlinkWriteClients.createWriteClient(conf,…...
stream对list数据进行多字段去重
方法一: //根据sj和name去重 List<NursingHandover> testList list.stream().collect(Collectors.collectingAndThen(Collectors.toCollection(() -> new TreeSet<>(Comparator.comparing(o -> o.getj() ";" o.getName() ";&…...
一种基于体素的射线检测
效果 基于体素的射线检测 一个漏检的射线检测 从起点一直递增指定步长即可得到一个稀疏的检测 bool Raycast(Vector3 from, Vector3 forword, float maxDistance){int loop 6666;Vector3 pos from;Debug.DrawLine(from, from forword * maxDistance, Color.red);while (loo…...
利用Docker安装Protostar
文章目录 一、Protostar介绍二、Ubuntu下安装docker三、安装Protostar 一、Protostar介绍 Protostar是一个免费的Linux镜像演练环境,包含五个系列共23道漏洞分析和利用实战题目。 Protostar的安装有两种方式 第一种是下载镜像并安装虚拟机https://github.com/Exp…...
go基础语法10问
1.使用值为 nil 的 slice、map会发生啥 允许对值为 nil 的 slice 添加元素,但对值为 nil 的 map 添加元素,则会造成运行时 panic。 // map 错误示例 func main() {var m map[string]intm["one"] 1 // error: panic: assignment to entry i…...
SpringCloud + SpringGateway 解决Get请求传参为特殊字符导致400无法通过网关转发的问题
title: “SpringCloud SpringGateway 解决Get请求传参为特殊字符导致400无法通过网关转发的问题” createTime: 2021-11-24T10:27:5708:00 updateTime: 2021-11-24T10:27:5708:00 draft: false author: “Atomicyo” tags: [“tomcat”] categories: [“java”] description: …...
vim基本操作
功能: 命令行模式下的文本编辑器。根据文件扩展名自动判别编程语言。支持代码缩进、代码高亮等功能。使用方式:vim filename 如果已有该文件,则打开它。 如果没有该文件,则打开个一个新的文件,并命名为filename 模式…...
Drift plus penalty 漂移加惩罚Part1——介绍和工作原理
文章目录 正文Methodology 方法论Origins and applications 起源和应用How it works 它是怎样工作的The stochastic optimization problem 随机优化问题Virtual queues 虚拟队列The drift-plus-penalty expression 漂移加惩罚表达式Drift-plus-penalty algorithmApproximate sc…...
(四)动态阈值分割
文章目录 一、基本概念二、实例解析 一、基本概念 基于局部阈值分割的dyn_threshold()算子,适用于一些无法用单一灰度进行分割的情况,如背景比较复杂,有的部分比前景目标亮,或者有的部分比前景目标暗;又比如前景目标包…...
jvm介绍
1. JVM是什么 JVM是Java Virtual Machine的缩写,即咱们经常提到的Java虚拟机。虚拟机是一种抽象化的计算机,有着自己完善的硬件架构,如处理器、堆栈等,具体有什么咱们不做了解。目前我们只需要知道想要运行Java文件,必…...
数据结构与算法课后题-第三章(顺序队和链队)
#include <iostream> //引入头文件 using namespace std;typedef int Elemtype;#define Maxsize 5 #define ERROR 0 #define OK 1typedef struct {Elemtype data[Maxsize];int front, rear;int tag; }SqQueue;void InitQueue(SqQueue& Q) //初始化队列 {Q.rear …...
SSM - Springboot - MyBatis-Plus 全栈体系(十六)
第三章 MyBatis 三、MyBatis 多表映射 2. 对一映射 2.1 需求说明 根据 ID 查询订单,以及订单关联的用户的信息! 2.2 OrderMapper 接口 public interface OrderMapper {Order selectOrderWithCustomer(Integer orderId); }2.3 OrderMapper.xml 配置…...
k8s--storageClass自动创建PV
文章目录 一、storageClass自动创建PV1.1 安装NFS1.2 创建nfs storageClass1.3 测试自动创建pv 一、storageClass自动创建PV 这里使用NFS实现 1.1 安装NFS 安装nfs-server: sh nfs_install.sh /mnt/data03 10.60.41.0/24nfs_install.sh #!/bin/bash### How to i…...
7.3 调用函数
前言: 思维导图: 7.3.1 函数调用的形式 我的笔记: 函数调用的形式 在C语言中,调用函数是一种常见的操作,主要有以下几种调用方式: 1. 函数调用语句 此时,函数调用独立存在,作为…...
如果使用pprof来进行性能的观测和优化
1. 分析性能瓶颈 在开始优化之前,首先需要确定你的程序的性能瓶颈在哪里。使用性能分析工具(例如 Go 的内置 pprof 包)来检测程序中消耗时间和内存的地方。这可以帮助你确定需要优化的具体部分。 2. 选择适当的数据结构和算法 选择正确的数…...
在移动固态硬盘上安装Ubuntu系统和ROS2
目录 原视频准备烧录 原视频 b站鱼香ros 准备 1.在某宝上买一个usb移动固态硬盘或固态U盘,至少64G 2.下载鱼香ros烧录工具 下载第二个就行了,不然某网盘的速度下载全部要一天 下载后,选择FishROS2OS制作工具压缩包,进行解压…...
【iptables 实战】02 iptables常用命令
一、iptables中基本的命令参数 -P 设置默认策略-F 清空规则链-L 查看规则链-A 在规则链的末尾加入新规则-I num 在规则链的头部加入新规则-D num 删除某一条规则-s 匹配来源地址IP/MASK,加叹号“!”表示除这个IP外-d 匹配目标地址-i 网卡名称 匹配从这块…...
webview_flutter
查看webview内核 https://liulanmi.com/labs/core.html h5中获取设备 https://cloud.tencent.com/developer/ask/sof/105938013 https://developer.mozilla.org/zh-CN/docs/Web/API/Navigator/mediaDevices web资源部署后navigator获取不到mediaDevices实例的解决方案&…...
【GESP考级C++】1级样题 闰年统计
GSEP 1级样题 闰年统计 题目描述 小明刚刚学习了如何判断平年和闰年,他想知道两个年份之间(包含起始年份和终止年份)有几个闰年。你能帮帮他吗? 输入格式 输入一行,包含两个整数,分别表示起始年份和终止…...
CentOS密码重置
背景: 我有一个CentOS虚拟机,但是密码忘记了,偶尔记起可以重置密码,于是今天尝试记录一下,又因为我最近记性比较差,所以必须要记录一下。 过程: 1、在引导菜单界面(grubÿ…...
手游刚开服就被攻击怎么办?如何防御DDoS?
开服初期是手游最脆弱的阶段,极易成为DDoS攻击的目标。一旦遭遇攻击,可能导致服务器瘫痪、玩家流失,甚至造成巨大经济损失。本文为开发者提供一套简洁有效的应急与防御方案,帮助快速应对并构建长期防护体系。 一、遭遇攻击的紧急应…...
Day131 | 灵神 | 回溯算法 | 子集型 子集
Day131 | 灵神 | 回溯算法 | 子集型 子集 78.子集 78. 子集 - 力扣(LeetCode) 思路: 笔者写过很多次这道题了,不想写题解了,大家看灵神讲解吧 回溯算法套路①子集型回溯【基础算法精讲 14】_哔哩哔哩_bilibili 完…...
渲染学进阶内容——模型
最近在写模组的时候发现渲染器里面离不开模型的定义,在渲染的第二篇文章中简单的讲解了一下关于模型部分的内容,其实不管是方块还是方块实体,都离不开模型的内容 🧱 一、CubeListBuilder 功能解析 CubeListBuilder 是 Minecraft Java 版模型系统的核心构建器,用于动态创…...
智能在线客服平台:数字化时代企业连接用户的 AI 中枢
随着互联网技术的飞速发展,消费者期望能够随时随地与企业进行交流。在线客服平台作为连接企业与客户的重要桥梁,不仅优化了客户体验,还提升了企业的服务效率和市场竞争力。本文将探讨在线客服平台的重要性、技术进展、实际应用,并…...
工业自动化时代的精准装配革新:迁移科技3D视觉系统如何重塑机器人定位装配
AI3D视觉的工业赋能者 迁移科技成立于2017年,作为行业领先的3D工业相机及视觉系统供应商,累计完成数亿元融资。其核心技术覆盖硬件设计、算法优化及软件集成,通过稳定、易用、高回报的AI3D视觉系统,为汽车、新能源、金属制造等行…...
代理篇12|深入理解 Vite中的Proxy接口代理配置
在前端开发中,常常会遇到 跨域请求接口 的情况。为了解决这个问题,Vite 和 Webpack 都提供了 proxy 代理功能,用于将本地开发请求转发到后端服务器。 什么是代理(proxy)? 代理是在开发过程中,前端项目通过开发服务器,将指定的请求“转发”到真实的后端服务器,从而绕…...
MinIO Docker 部署:仅开放一个端口
MinIO Docker 部署:仅开放一个端口 在实际的服务器部署中,出于安全和管理的考虑,我们可能只能开放一个端口。MinIO 是一个高性能的对象存储服务,支持 Docker 部署,但默认情况下它需要两个端口:一个是 API 端口(用于存储和访问数据),另一个是控制台端口(用于管理界面…...
Caliper 配置文件解析:fisco-bcos.json
config.yaml 文件 config.yaml 是 Caliper 的主配置文件,通常包含以下内容: test:name: fisco-bcos-test # 测试名称description: Performance test of FISCO-BCOS # 测试描述workers:type: local # 工作进程类型number: 5 # 工作进程数量monitor:type: - docker- pro…...
数学建模-滑翔伞伞翼面积的设计,运动状态计算和优化 !
我们考虑滑翔伞的伞翼面积设计问题以及运动状态描述。滑翔伞的性能主要取决于伞翼面积、气动特性以及飞行员的重量。我们的目标是建立数学模型来描述滑翔伞的运动状态,并优化伞翼面积的设计。 一、问题分析 滑翔伞在飞行过程中受到重力、升力和阻力的作用。升力和阻力与伞翼面…...
面试高频问题
文章目录 🚀 消息队列核心技术揭秘:从入门到秒杀面试官1️⃣ Kafka为何能"吞云吐雾"?性能背后的秘密1.1 顺序写入与零拷贝:性能的双引擎1.2 分区并行:数据的"八车道高速公路"1.3 页缓存与批量处理…...
