Flink+Paimon多流拼接性能优化实战
目录
(零)本文简介
(一)背景
(二)探索梳理过程
(三)源码改造
(四)修改效果
1、JOB状态
2、Level5的dataFile总大小
3、数据延迟
(五)未来展望:异步Compact
(零)本文简介
Paimon多流拼接/合并性能优化;
为解决离线T+1多流拼接数据时效性、Flink实时状态太大任务稳定性问题,这里基于数据湖工具Apache Paimon进行近实时的多流拼接。
使用Flink+Paimon基于ParmaryKey Table(PartialUpdate)进行多流拼接的时候,跑一段时间有时会遇到周期性背压、checkpoint时间过长等情况,本文通过剖析源码逻辑、修改源码,在一定程度上解决了这个问题。
Apache Paimon基础 、多流拼接方法 及 与Hudi 的对比 可参考前面文章:
新一代数据湖存储技术Apache Paimon入门Demo_Leonardo_KY的博客-CSDN博客
基于数据湖的多流拼接方案-HUDI概念篇_Leonardo_KY的博客-CSDN博客
(一)背景
这里使用 Flink 1.14 + Apache Paimon 0.5 snapshot 进行多流拼接(前端埋点流 + 服务端埋点流);
当前情况是一天一个分区,一个分区100个bucket;就会出现如下情况:分区/bucket中的数据越来越多,到达下午或者傍晚的时候就会出现 paimon 作业周期性背压(因为mergeTree中维护的数据越来越多,tree越来越大),checkpoint时间也会比较长;于是决定将mergeTree中的过期数据删除,即让其不进入tree中,减少计算量;
这里的“过期”按需自定义,比如调研发现99.9%的数据都可以使用3个小时之内的数据拼接上,那就根据时间戳与当前时间戳(假设没有很严重的消费积压)相比,时间差超过3小时的数据就将其丢弃;
具体细节涉及到(这里先将结论给出):
-  - data文件创建后是否还会修改?(不会)
- 根据时间排序的data数据文件是增量还是全量?(几个最新文件加起来就是全量)
- 应该根据dataFile的创建/修改时间判断过期 还是 通过具体每个record字段值的时间戳判断过期?(通过record)
 
(二)探索梳理过程
1、首先观察hdfs文件之后发现,dataFile只保留最近一个小时的文件,超过一小时的文件就会被删除,这里应该对应参数 partition.expiration-check-interval = 1h,由此可知data文件不是增量的【下文compact只有几个文件再次加强验证】(那么就不能通过dataFile的最新修改时间判断文件过期将数据过滤);
2、观察flink log发现,每次compaction都只读几个文件,如下所示:

每次其实只读取一个level0的file,再加上几个level5的file(level5这里file就是之前的全部数据,包含多个流的),最后将compact之后的文件再命名为新的名字写到level5;
随着分区数据量的增多,参与compact的file也会越来越多(这也是会导致tree偏大,出现周期性背压的原因);
另外,dataFile命名呈现如下规律:
level5的第二个文件总是跟第一个中间隔一个(这个跟改源码没有关系,只是适合观察规律);

到晚间的时候参与compact的file更多了:

3、观察每次level5生成的dataFile(理论上level5的dataFile会越来越大/多,当单个文件大小超过128M *(1+rate)时,会生成新文件);
所有level5的文件大小加起来会越来越大,即永远是呈增长趋势;
如下每一层的总大小在不断增大,同时当文件到一定程度之后,每层2个文件变成3个文件;

4、【以上3点均为原始实现思路,从这里开始改造】思考:既然已知每个bucket中只要最新的几个dataFile就包含了全部的data数据(dataFile不是增量的),那么就不能通过文件最新修改时间来判断数据是否过期,只能从最新的几个dataFile的每条记录来进行判断了,即原本每次参与合并的record是从这个partition+bucket建立开始的全部数据,那么是否可以通过修改源码判断每条record是否过期,从而不参与mergeTree,在compact完成之后也不会再次写入新的dataFile(如果还是写进来,每次读进tree时都需要判断是否过期,是否进入tree)?【答案当然是可以的!】
(三)源码改造
1、首先说明一下,在源码中有这么一段
// IntervalPartition.partition()
public List<List<SortedRun>> partition() {List<List<SortedRun>> result = new ArrayList<>();List<DataFileMeta> section = new ArrayList<>();BinaryRow bound = null;for (DataFileMeta meta : files) {if (!section.isEmpty() && keyComparator.compare(meta.minKey(), bound) > 0) {// larger than current right bound, conclude current section and create a new oneresult.add(partition(section));section.clear();bound = null;}section.add(meta);if (bound == null || keyComparator.compare(meta.maxKey(), bound) > 0) {// update right boundbound = meta.maxKey();}}if (!section.isEmpty()) {// conclude last sectionresult.add(partition(section));}return result;
}此处为了将文件排序、再将有overlap的放在一个list里边,一但产生gap(即没有overlap),那么就创建新的list,最终将这些 list 再放到List>中:
示意图如下:

2、后续通过一些处理变成 List> 的格式,这里的KeyValue就包含我们想要去操纵的record!
源码是这样的:
public <T> RecordReader<T> mergeSort(List<ReaderSupplier<KeyValue>> lazyReaders,Comparator<InternalRow> keyComparator,MergeFunctionWrapper<T> mergeFunction)throws IOException {if (ioManager != null && lazyReaders.size() > spillThreshold) {return spillMergeSort(lazyReaders, keyComparator, mergeFunction);}List<RecordReader<KeyValue>> readers = new ArrayList<>(lazyReaders.size());for (ReaderSupplier<KeyValue> supplier : lazyReaders) {try {readers.add(supplier.get());} catch (IOException e) {// if one of the readers creating failed, we need to close them all.readers.forEach(IOUtils::closeQuietly);throw e;}}return SortMergeReader.createSortMergeReader(readers, keyComparator, mergeFunction, sortEngine);
}这里的return就会创建sortMergeReader了,我们可以在将数据传入这里之前,先进行过滤(通过判断每一条record是否超过过期时间),修改如下:
public <T> RecordReader<T> mergeSort(List<ReaderSupplier<KeyValue>> lazyReaders,Comparator<InternalRow> keyComparator,MergeFunctionWrapper<T> mergeFunction)throws IOException {if (ioManager != null && lazyReaders.size() > spillThreshold) {return spillMergeSort(lazyReaders, keyComparator, mergeFunction);}List<RecordReader<KeyValue>> readers = new ArrayList<>(lazyReaders.size());for (ReaderSupplier<KeyValue> supplier : lazyReaders) {try {// 过滤掉过期数据RecordReader<KeyValue> filterSupplier =supplier.get().filter((KeyValue keyValue) ->isNotExpiredRecord(keyValue.value(), expireTimeMillis));readers.add(filterSupplier);} catch (IOException e) {// if one of the readers creating failed, we need to close them all.readers.forEach(IOUtils::closeQuietly);throw e;}}return SortMergeReader.createSortMergeReader(readers,keyComparator,mergeFunction,sortEngine,keyType.getFieldTypes(),valueType.getFieldTypes());
}// 判断这条数据是否过期
public boolean isNotExpiredRecord(InternalRow row, long expireTimeMillis) {if (expireTimeMillis <= 0) {return true;}// 只要有一个字段不为空,且大于0,且过期时间大于expireTimeMillis,就判断为过期for (Integer pos : expireFieldsPosSet) {if ((!row.isNullAt(pos))&& row.getLong(pos) > 0&& (System.currentTimeMillis() - row.getLong(pos)) > expireTimeMillis) {return false;}}return true;
}与此同时,将相关参数暴露出来,可以在建表时进行自定义配置:
public static final ConfigOption<Integer> RECORDS_EXPIRED_HOUR =key("record.expired-hour").intType().defaultValue(-1).withDescription("Records in streams WON'T be offered into MergeTree when they are expired."+ " (Inorder to avoid too large MergeTree; -1 means never expired). ");public static final ConfigOption<String> RECORDS_EXPIRED_FIELDS =key("record.expired-fields").stringType().noDefaultValue().withDescription("Records in streams WON'T be offered into MergeTree when they are judged as [expired] according to these fields."+ "If you specify multiple fields, delimiter is ','.");使用方法:
val createPaimonJoinTable = (s"CREATE TABLE IF NOT EXISTS ${paimonTable}(\n"+ " uuid STRING,\n"+ " metaid STRING,\n"+ " cid STRING,\n"+ " area STRING,\n"+ " ts1 bigint,\n"+ " ts2 bigint,\n"+ " d STRING, \n"+ " PRIMARY KEY (d, uuid) NOT ENFORCED \n"+ ") PARTITIONED BY (d) \n"+ " WITH (\n" +"    'merge-engine' = 'partial-update',\n" +"    'changelog-producer' = 'full-compaction', \n" +"    'file.format' = 'orc', \n" +s"    'sink.managed.writer-buffer-memory' = '${sinkWriterBuffer}', \n" +s"    'full-compaction.delta-commits' = '${fullCompactionCommits}', \n" +s"    'scan.mode' = '${scanMode}', \n" +s"    'bucket' = '${bucketNum}', \n" +s"    'sink.parallelism' = '${sinkTaskNum}', \n" +s"    'record.expired-hour' = '3' , \n" +   // user defined para"     'record.expired-fileds' = '4,5' , \n" +   // user defined para"     'sequence.field' = 'ts1' \n" +")")
tableEnv.executeSql(createPaimonJoinTable)(四)修改效果
1、JOB状态
运行到晚上20点尚未出现背压:

checkpoint时间也没有过长(如果不剔除过期数据,到这个时间cp时长应该在3分钟左右):

生产到Kafka的消息也没有严重的断流或者锯齿现象:

还是有可能出现exception如下(但对数据量没有任何影响):


2、Level5的dataFile总大小
上边只是现象,最终还是要数据说话。
修改源码之后,观察dataFile,理论上每一层的size总大小可能会出现减小的情况 (因为过期数据就不会再写入到 level5 新的data文件中了)
如下图:levelSize diff(下一次level总size - 上一次level总size),确实出现了“有正有负”的情况,于是验证源码修改生效(即每次进行compact只会读取近 n 个小时的数据进行合并)!

3、数据延迟
有意思的是,当我们修改源码(将过期的数据丢弃)之后,数据延迟也变小了。
数据延迟计算方法:paimon处理完将数据写到kafka队列的时间戳 - 前端埋点被触发被服务器接收到的时间戳;
修改前:

修改后:

(五)未来展望:异步Compact
官方提供的paimon源码,里边的compaction是 sync 模式的,我尝试改成过 async 的,但是时不时会出现很少量的数据丢失(感觉可能是因为同一时刻有多个compact任务在进行),后续有机会可以再继续尝试一下。
相关文章:
 
Flink+Paimon多流拼接性能优化实战
目录 (零)本文简介 (一)背景 (二)探索梳理过程 (三)源码改造 (四)修改效果 1、JOB状态 2、Level5的dataFile总大小 3、数据延迟 (五&…...
 
cocos 2.4 版本 设置物理引擎步长 解决帧数不一致的设备 物理表现不一致问题 设置帧刷新率
官网地址Cocos Creator 3.8 手册 - 2D 物理系统 官网好像写的不太对 下面是我自己运行好使的 PhysicsManager.openPhysicsSystem()var manager cc.director.getPhysicsManager();// 开启物理步长的设置manager.enabledAccumulator true;// cc.PhysicsManagercc.PhysicsManag…...
 
Spark及其生态简介
一、Spark简介 Spark 是一个用来实现快速而通用的集群计算的平台,官网上的解释是:Apache Spark™是用于大规模数据处理的统一分析引擎。 Spark 适用于各种各样原先需要多种不同的分布式平台的场景,包括批处理、迭代算法、交互式查询、流处理…...
 
从Instagram到TikTok:利用社交媒体平台实现业务成功
自 2000年代初成立和随后兴起以来,社交媒体一直被大大小小的品牌用作高度针对性的营销工具,自 Facebook推出近二十年以来,这些网站继续彻底改变企业处理广告的方式。 在这篇博文中,我们将讨论订阅企业应该如何从整体上对待社交媒…...
单元测试
1. 单元测试Junit 1.1 什么是单元测试?(掌握) 对部分代码进行测试。 1.2 Junit的特点?(掌握) 是一个第三方的工具。(把别人写的代码导入项目中)(专业叫法:…...
 
科技云报道:AI+云计算共生共长,能否解锁下一个高增长空间?
科技云报道原创。 在过去近一年的时间里,AI大模型从最初的框架构建,逐步走到落地阶段。 然而,随着AI大模型深入到千行百业中,市场开始意识到通用大模型虽然功能强大,但似乎并不能完全满足不同企业的个性化需求。 大…...
 
ReactPy:使用 Python 构建动态前端应用程序
在 Web 开发领域,ReactJS 已成为主导者,为开发人员提供了用于创建动态和交互式用户界面的强大工具集。但是,如果您更喜欢 Python 的多功能性和简单性作为后端,并且希望在前端也利用它的功能,该怎么办?ReactPy 是一个 Python 库,它将熟悉的 ReactJS 语法和灵活性带入了 P…...
安全攻防基础以及各种漏洞库
安全攻防基础以及各种漏洞库 信息搜集企业信息搜集1. 企业架构2. ICP备案查询,确定目标子域名3. 员工信息(搜集账号信息、钓鱼攻击)4. 社交渠道 域名信息搜集IP搜集信息泄露移动端搜集打点进内网命令和控制(持续控制)穿…...
 
护眼灯值不值得买?开学给孩子买什么样的护眼台灯
如果不想家里的孩子年纪小小的就戴着眼镜,从小就容易近视,那么护眼灯的选择就非常重要了,但是市场上那么多品类,价格也参差不齐,到底怎么选呢?大家一定要看完本期内容。为大家推荐五款热门的护眼台灯 一、…...
 
windows安装Scala
Windows安装Scala 下载地址:https://downloads.lightbend.com/scala/2.11.11/scala-2.11.11.zip 解压完成之后 配置环境变量...
 
API类型和集成规范指南
在我们的常见应用中,往往包含着大量服务于各种数据交换的API类型、以及各种常见的API架构与协议。下面,我将从集成的角度和您讨论,在准备将多个服务相互集成时,使用不同类型、架构和协议的API意味着什么?我们可以使用哪些工具&am…...
 
[ES]mac安装es、kibana、ik分词器
一、安装es和kibana 1、创建一个网络,网络内的框架(eskibana)互联 docker network create es-net 2、下载es和kibana docker pull elasticsearch:7.12.1 docker pull kibana:7.12.1 3、运行docker命令部署单点eskibana(用来操作es) doc…...
 
YOLO目标检测——视觉显著性检测MSRA1000数据集下载分享
MSRA1000数据集是一个常用的视觉显著性检测数据集,它包含了1000张图像和对应的显著性标注。在以下几个应用场景中,MSRA1000数据集可以发挥重要作用:图像编辑和后期处理、图像检索和分类、视觉注意力模型、自动驾驶和智能交通等等 数据集点击下…...
 
【基于空间纹理的残差网络无监督Pansharpening】
Unsupervised Pansharpening method Using Residual Network with Spatial Texture Attention (基于空间纹理的残差网络无监督泛锐化方法) 近年来,深度学习已经成为最受欢迎的泛锐化工具之一,许多相关方法已经被研究并反映出良好…...
 
2023年信息安全管理与评估(赛项)评分标准第三阶段夺旗挑战CTF(网络安全渗透)
全国职业院校技能大赛 高职组 信息安全管理与评估 (赛项) 评分标准 第三阶段 夺旗挑战CTF(网络安全渗透) 竞赛项目赛题 本文件为信息安全管理与评估项目竞赛-第三阶段赛题,内容包括:夺旗挑战CTF(…...
 
开启智能时代:深度解析智能文档分析技术的前沿与应用
开启智能时代:深度解析智能文档分析技术的前沿与应用 本章主要介绍文档分析技术的理论知识,包括背景介绍、算法分类和对应思路。通过本文学习,你可以掌握:1. 版面分析的分类和典型思想 2. 表格识别的分类和典型思想 3. 信息提取的…...
 
高级时钟项目
高级时钟项目 笔者来介绍一下一个简单的时钟项目,主要功能就是显示时间 1、背景 2、数码管版本(第一版) 3、OLED屏幕版本(第二版) 3.1、Boot 3.2、app 3.3、上位机 界面一:时间天气显示 界面二 &…...
 
跨境海淘攻略:如何实现自己批量养买家账号海淘
近年来,随着互联网的发展,网购已经成为人们日常生活中不可或缺的一部分。不仅在国内购买商品,在跨境电商行业越来越成熟,很多的消费者开始选择购买国外平台商品,价格相比国内专柜来说会更为优惠。因此,海淘…...
【lua】在微软 windows 系统上安装 lua
https://sourceforge.net/projects/luabinaries...
 
系统学习Linux-PXE无人值守装机(附改密)
目录 pxe实现系统自动安装pxe工作原理 大致的工作过程如下: PXE的组件: 一、配置vsftpd 二、配置tftp 三、准备pxelinx.0文件、引导文件、内核文件 四、配置dhcp 配置ip 配置dhcp 五、创建default文件 六、新建测试主机用来测试装机效果 七、…...
RestClient
什么是RestClient RestClient 是 Elasticsearch 官方提供的 Java 低级 REST 客户端,它允许HTTP与Elasticsearch 集群通信,而无需处理 JSON 序列化/反序列化等底层细节。它是 Elasticsearch Java API 客户端的基础。 RestClient 主要特点 轻量级ÿ…...
 
大数据学习栈记——Neo4j的安装与使用
本文介绍图数据库Neofj的安装与使用,操作系统:Ubuntu24.04,Neofj版本:2025.04.0。 Apt安装 Neofj可以进行官网安装:Neo4j Deployment Center - Graph Database & Analytics 我这里安装是添加软件源的方法 最新版…...
Vue记事本应用实现教程
文章目录 1. 项目介绍2. 开发环境准备3. 设计应用界面4. 创建Vue实例和数据模型5. 实现记事本功能5.1 添加新记事项5.2 删除记事项5.3 清空所有记事 6. 添加样式7. 功能扩展:显示创建时间8. 功能扩展:记事项搜索9. 完整代码10. Vue知识点解析10.1 数据绑…...
 
C++实现分布式网络通信框架RPC(3)--rpc调用端
目录 一、前言 二、UserServiceRpc_Stub 三、 CallMethod方法的重写 头文件 实现 四、rpc调用端的调用 实现 五、 google::protobuf::RpcController *controller 头文件 实现 六、总结 一、前言 在前边的文章中,我们已经大致实现了rpc服务端的各项功能代…...
 
【JavaEE】-- HTTP
1. HTTP是什么? HTTP(全称为"超文本传输协议")是一种应用非常广泛的应用层协议,HTTP是基于TCP协议的一种应用层协议。 应用层协议:是计算机网络协议栈中最高层的协议,它定义了运行在不同主机上…...
 
工业安全零事故的智能守护者:一体化AI智能安防平台
前言: 通过AI视觉技术,为船厂提供全面的安全监控解决方案,涵盖交通违规检测、起重机轨道安全、非法入侵检测、盗窃防范、安全规范执行监控等多个方面,能够实现对应负责人反馈机制,并最终实现数据的统计报表。提升船厂…...
 
Day131 | 灵神 | 回溯算法 | 子集型 子集
Day131 | 灵神 | 回溯算法 | 子集型 子集 78.子集 78. 子集 - 力扣(LeetCode) 思路: 笔者写过很多次这道题了,不想写题解了,大家看灵神讲解吧 回溯算法套路①子集型回溯【基础算法精讲 14】_哔哩哔哩_bilibili 完…...
Java - Mysql数据类型对应
Mysql数据类型java数据类型备注整型INT/INTEGERint / java.lang.Integer–BIGINTlong/java.lang.Long–––浮点型FLOATfloat/java.lang.FloatDOUBLEdouble/java.lang.Double–DECIMAL/NUMERICjava.math.BigDecimal字符串型CHARjava.lang.String固定长度字符串VARCHARjava.lang…...
【论文笔记】若干矿井粉尘检测算法概述
总的来说,传统机器学习、传统机器学习与深度学习的结合、LSTM等算法所需要的数据集来源于矿井传感器测量的粉尘浓度,通过建立回归模型来预测未来矿井的粉尘浓度。传统机器学习算法性能易受数据中极端值的影响。YOLO等计算机视觉算法所需要的数据集来源于…...
python如何将word的doc另存为docx
将 DOCX 文件另存为 DOCX 格式(Python 实现) 在 Python 中,你可以使用 python-docx 库来操作 Word 文档。不过需要注意的是,.doc 是旧的 Word 格式,而 .docx 是新的基于 XML 的格式。python-docx 只能处理 .docx 格式…...
