Spark Rebalance hint的倾斜的处理(OptimizeSkewInRebalancePartitions)
背景
本文基于Spark 3.5.0
目前公司在做小文件合并的时候用到了 Spark Rebalance 这个算子,这个算子的主要作用是在AQE阶段的最后写文件的阶段进行小文件的合并,使得最后落盘的文件不会太大也不会太小,从而达到小文件合并的作用,这其中的主要原理是在于三个规则:OptimizeSkewInRebalancePartitions,CoalesceShufflePartitions,OptimizeShuffleWithLocalRead,这里主要说一下OptimizeSkewInRebalancePartitions规则,CoalesceShufflePartitions的作用主要是进行文件的合并,是得文件不会太小,OptimizeShuffleWithLocalRead的作用是加速shuffle fetch的速度。
结论
OptimizeSkewInRebalancePartitions的作用是对小文件进行拆分,使得罗盘的文件不会太大,这个会有个问题,如果我们在使用Rebalance(col)这种情况的时候,如果col的值是固定的,比如说值永远是20240320,那么这里就得注意一下,关于OptimizeSkewInRebalancePartitions涉及到的参数spark.sql.adaptive.optimizeSkewsInRebalancePartitions.enabled,spark.sql.adaptive.advisoryPartitionSizeInBytes,spark.sql.adaptive.rebalancePartitionsSmallPartitionFactor 这些值配置,如果这些配置调整的不合适,就会导致写文件的时候有可能只有一个Task在运行,那么最终就只有一个文件。而且大大加长了整个任务的运行时间。
分析
直接到OptimizeSkewInRebalancePartitions中的代码中来:
override def apply(plan: SparkPlan): SparkPlan = {if (!conf.getConf(SQLConf.ADAPTIVE_OPTIMIZE_SKEWS_IN_REBALANCE_PARTITIONS_ENABLED)) {return plan}plan transformUp {case stage: ShuffleQueryStageExec if isSupported(stage.shuffle) =>tryOptimizeSkewedPartitions(stage)}}
如果我们禁用掉对rebalance的倾斜处理,也就是spark.sql.adaptive.optimizeSkewsInRebalancePartitions.enabled为false(默认是true),那么就不会应用此规则,那么如果Col为固定值的情况下,就只会有一个Task进行文件的写入操作,也就只有一个文件,因为一个Task会拉取所有的Map的数据(因为此时每个maptask上的hash(Col)都是一样的,此时只有一个reduce task去拉取数据),如图:

假如说hash(col)为0,那实际上只有reduceTask0有数据,其他的ReduceTask1等等都是没有数据的,所以最终只有ReduceTask0写文件,并且只有一个文件。
在看合并的计算公式,该数据流如下:
tryOptimizeSkewedPartitions||\/optimizeSkewedPartitions||\/ShufflePartitionsUtil.createSkewPartitionSpecs||\/ShufflePartitionsUtil.splitSizeListByTargetSize
splitSizeListByTargetSize方法中涉及到的参数解释如下 :
- 参数 sizes: Array[Long] 表示属于同一个reduce任务的maptask任务的大小数组,举例 sizes = [100,200,300,400]
表明该任务有4个maptask,0表示maptask为0的所属reduce的大小,1表示maptask为1的所属reduce的大小,依次类推,图解如下:

比如说reduceTask0的从Maptask拉取的数据的大小分别是100,200,300,400.
- 参数targetSize 为
spark.sql.adaptive.advisoryPartitionSizeInBytes的值,假如说是256MB - 参数smallPartitionFactor为
spark.sql.adaptive.rebalancePartitionsSmallPartitionFactor的值,默认是0.2
这里有个计算公式:
def tryMergePartitions() = {// When we are going to start a new partition, it's possible that the current partition or// the previous partition is very small and it's better to merge the current partition into// the previous partition.val shouldMergePartitions = lastPartitionSize > -1 &&((currentPartitionSize + lastPartitionSize) < targetSize * MERGED_PARTITION_FACTOR ||(currentPartitionSize < targetSize * smallPartitionFactor ||lastPartitionSize < targetSize * smallPartitionFactor))if (shouldMergePartitions) {// We decide to merge the current partition into the previous one, so the start index of// the current partition should be removed.partitionStartIndices.remove(partitionStartIndices.length - 1)lastPartitionSize += currentPartitionSize} else {lastPartitionSize = currentPartitionSize}}。。。while (i < sizes.length) {// If including the next size in the current partition exceeds the target size, package the// current partition and start a new partition.if (i > 0 && currentPartitionSize + sizes(i) > targetSize) {tryMergePartitions()partitionStartIndices += icurrentPartitionSize = sizes(i)} else {currentPartitionSize += sizes(i)}i += 1}tryMergePartitions()partitionStartIndices.toArray
这里的计算公式大致就是:从每个maptask中的获取到属于同一个reduce的数值,依次累加,如果大于targetSize就尝试合并,直至到最后一个maptask,
可以看到tryMergePartitions有个计算公式:currentPartitionSize < targetSize * smallPartitionFactor,也就是说如果当前maptask的对应的reduce分区数据 小于 256MB*0.2 = 51.2MB 的话,也还是会合并到前一个分区中去,如果smallPartitionFactor设置过大,可能会导致所有的分区都会合并到一个分区中去,最终会导致一个文件会有几十GB(也就是targetSize * smallPartitionFactor`*shuffleNum),
比如说以下的测试案例:
val targetSize = 100val smallPartitionFactor2 = 0.5// merge last two partition if their size is not bigger than smallPartitionFactor * targetval sizeList5 = Array[Long](50, 50, 40, 5)assert(ShufflePartitionsUtil.splitSizeListByTargetSize(sizeList5, targetSize, smallPartitionFactor2).toSeq ==Seq(0))val sizeList6 = Array[Long](40, 5, 50, 45)assert(ShufflePartitionsUtil.splitSizeListByTargetSize(sizeList6, targetSize, smallPartitionFactor2).toSeq ==Seq(0))
这种情况下,就会只有一个reduce任务运行。
相关文章:
Spark Rebalance hint的倾斜的处理(OptimizeSkewInRebalancePartitions)
背景 本文基于Spark 3.5.0 目前公司在做小文件合并的时候用到了 Spark Rebalance 这个算子,这个算子的主要作用是在AQE阶段的最后写文件的阶段进行小文件的合并,使得最后落盘的文件不会太大也不会太小,从而达到小文件合并的作用,…...
Vue 3中实现基于角色的权限认证实现思路
一、基于角色的权限认证主要步骤 在Vue 3中实现基于角色的权限认证通常涉及以下几个主要步骤: 定义角色和权限:首先需要在后端服务定义不同的角色和它们对应的权限。权限可以是对特定资源的访问权限,比如读取、写入、修改等。用户认证&#…...
Visual Studio 2022进行文件差异比较
前言 Visual Studio 2022在版本17.7.4中发布在解决方案资源管理器中比较文件的功能,通过使用此功能,可以轻松地查看两个文件之间的差异,包括添加、删除和修改的代码行。可以逐行查看差异,并根据需要手动调整和编辑文件内容以进行…...
1.2 编译型语言和解释型语言的区别
编译型语言和解释型语言的区别 通过高级语言编写的源码,我们能够轻松理解,但对于计算机来说,它只认识二进制指令,源码就是天书,根本无法识别。源码要想执行,必须先转换成二进制指令。 所谓二进制指令&…...
C语言-常量
什么是常量? 答:常量是在程序执行过程中,其值不发生改变的量,常量分为直接常量和符号常量两种。 其中直接常量又可以分为整型常量、实型常量、字符型常量、字符串常量。 直接常量 1.整型常量 整型常量即整数,包括正整数,负整数和0。c语言中常量可以用八进制,十进制和十六…...
开源的OCR工具基本使用:PaddleOCR/Tesseract/CnOCR
前言 因项目需要,调研了一下目前市面上一些开源的OCR工具,支持本地部署,非调用API,主要有PaddleOCR/CnOCR/chinese_lite OCR/EasyOCR/Tesseract/chineseocr/mmocr这几款产品。 本文主要尝试了EasyOCR/CnOCR/Tesseract/PaddleOCR这…...
vue3实现输入框短信验证码功能---全网始祖
组件功能分析 1.按键删除,清空当前input,并跳转prevInput & 获取焦点,按键delete,清空当前input,并跳转nextInput & 获取焦点。按键Home/End键,焦点跳转first/最后一个input输入框。ArrowLeft/ArrowRight键点击…...
[C#]winformYOLO区域检测任意形状区域绘制射线算法实现
【简单介绍】 Winform OpenCVSharp YOLO区域检测与任意形状区域射线绘制算法实现 在现代安全监控系统中,区域检测是一项至关重要的功能。通过使用Winform结合OpenCVSharp库,并结合YOLO(You Only Look Once)算法,我们…...
个人网站制作 Part 14 添加网站分析工具 | Web开发项目
文章目录 👩💻 基础Web开发练手项目系列:个人网站制作🚀 添加网站分析工具🔨使用Google Analytics🔧步骤 1: 注册Google Analytics账户🔧步骤 2: 获取跟踪代码 🔨使用Vue.js&#…...
数据按设定单位(分辨率)划分的方法
1. 问题描述 需要将使用公式计算后的float数值换算到固定间隔数轴的对应位置上的数据,比如2.186这个数据,将该数据换算到以0.25为间隔的数轴上,换算后是2.0,还是2.25呢?该方法就是解决这个问题。 2. 方法 输入&…...
Ubuntu 搭建gitlab服务器,及使用repo管理
一、GitLab安装与配置 GitLab 是一个用于仓库管理系统的开源项目,使用Git作为代码管理工具,并在此基础上搭建起来的Web服务。 1、安装Ubuntu系统(这个教程很多,就不展开了)。 2、安装gitlab社区版本,有需…...
QT(19)-QNetworkRequest
attribute(QNetworkRequest::Attribute code, const QVariant &defaultValue QVariant()) const 获取指定的请求属性。如果该属性未设置,则返回默认值。 hasRawHeader(const QByteArray &headerName) const 检查是否存在指定名称的原始请求头。 header(Q…...
基于Vue的社区旧衣回收利用系统的设计与实现
经济的高速发展使得每一个家庭的收入都获得了大幅增长,随之而来的就是各种梦想的逐步实现,首当其冲的就是各类衣服的更新换代而导致了大量旧衣物在家中的积存。为了帮助人们解决旧衣物处理的问题而以当前主流的互联网技术构建一个可于社区中实现旧衣回收…...
【网站项目】291校园疫情防控系统
🙊作者简介:拥有多年开发工作经验,分享技术代码帮助学生学习,独立完成自己的项目或者毕业设计。 代码可以私聊博主获取。🌹赠送计算机毕业设计600个选题excel文件,帮助大学选题。赠送开题报告模板ÿ…...
win git filter-repo教程
git filter-repo 是一个用于过滤和清理 Git 仓库历史的工具,它可以高效地批量修改提交历史中的文件内容、删除文件、重命名文件以及进行其他历史重构操作。相较于 git filter-branch,它通常更快且更易于使用。 以下是一个基本示例,说明如何使…...
Redis相关操作高阶篇--集群搭建
Redis相关操作大全一篇全搞定-CSDN博客 Redis集群 是一个由多个主从节点群组成的分布式服务器群,它具有复制、高可用和分片特性。Redis集群不需要seninel哨兵也能完成节点移除和故障转移的功能。需要将每个节点 设置成集群模式,这种集群模式没有中心节…...
JNDI注入原理及利用IDEA漏洞复现
🍬 博主介绍👨🎓 博主介绍:大家好,我是 hacker-routing ,很高兴认识大家~ ✨主攻领域:【渗透领域】【应急响应】 【Java、PHP】 【VulnHub靶场复现】【面试分析】 🎉点赞➕评论➕收…...
大数据,或称巨量资料
大数据,或称巨量资料,指的是在传统数据处理应用软件不足以处理的大或复杂的数据集。大数据也可以定义为来自各种来源的大量非结构化或结构化数据。从学术角度而言,大数据的出现促成广泛主题的新颖研究,这也导致各种大数据统计方法…...
windows上打开redis服务闪退问题处理
方法1:在windows上面打开redis服务时,弹窗闪退可能是6379端口占用,可以用以下命令查看: netstat -aon | findstr 6379 如果端口被占用可以用这个命令解决: taskkill /f /pid 进程号 方法2: 可以使用…...
分布式锁简单实现
分布式锁 Redis分布式锁最简单的实现 想要实现分布式锁,必须要求 Redis 有「互斥」的能力,我们可以使用 SETNX 命令,这个命令表示SET if Not Exists,即如果 key 不存在,才会设置它的值,否则什么也不做。 …...
基于大模型的 UI 自动化系统
基于大模型的 UI 自动化系统 下面是一个完整的 Python 系统,利用大模型实现智能 UI 自动化,结合计算机视觉和自然语言处理技术,实现"看屏操作"的能力。 系统架构设计 #mermaid-svg-2gn2GRvh5WCP2ktF {font-family:"trebuchet ms",verdana,arial,sans-…...
调用支付宝接口响应40004 SYSTEM_ERROR问题排查
在对接支付宝API的时候,遇到了一些问题,记录一下排查过程。 Body:{"datadigital_fincloud_generalsaas_face_certify_initialize_response":{"msg":"Business Failed","code":"40004","sub_msg…...
CVPR 2025 MIMO: 支持视觉指代和像素grounding 的医学视觉语言模型
CVPR 2025 | MIMO:支持视觉指代和像素对齐的医学视觉语言模型 论文信息 标题:MIMO: A medical vision language model with visual referring multimodal input and pixel grounding multimodal output作者:Yanyuan Chen, Dexuan Xu, Yu Hu…...
脑机新手指南(八):OpenBCI_GUI:从环境搭建到数据可视化(下)
一、数据处理与分析实战 (一)实时滤波与参数调整 基础滤波操作 60Hz 工频滤波:勾选界面右侧 “60Hz” 复选框,可有效抑制电网干扰(适用于北美地区,欧洲用户可调整为 50Hz)。 平滑处理&…...
从零实现富文本编辑器#5-编辑器选区模型的状态结构表达
先前我们总结了浏览器选区模型的交互策略,并且实现了基本的选区操作,还调研了自绘选区的实现。那么相对的,我们还需要设计编辑器的选区表达,也可以称为模型选区。编辑器中应用变更时的操作范围,就是以模型选区为基准来…...
el-switch文字内置
el-switch文字内置 效果 vue <div style"color:#ffffff;font-size:14px;float:left;margin-bottom:5px;margin-right:5px;">自动加载</div> <el-switch v-model"value" active-color"#3E99FB" inactive-color"#DCDFE6"…...
反射获取方法和属性
Java反射获取方法 在Java中,反射(Reflection)是一种强大的机制,允许程序在运行时访问和操作类的内部属性和方法。通过反射,可以动态地创建对象、调用方法、改变属性值,这在很多Java框架中如Spring和Hiberna…...
【HarmonyOS 5 开发速记】如何获取用户信息(头像/昵称/手机号)
1.获取 authorizationCode: 2.利用 authorizationCode 获取 accessToken:文档中心 3.获取手机:文档中心 4.获取昵称头像:文档中心 首先创建 request 若要获取手机号,scope必填 phone,permissions 必填 …...
Rapidio门铃消息FIFO溢出机制
关于RapidIO门铃消息FIFO的溢出机制及其与中断抖动的关系,以下是深入解析: 门铃FIFO溢出的本质 在RapidIO系统中,门铃消息FIFO是硬件控制器内部的缓冲区,用于临时存储接收到的门铃消息(Doorbell Message)。…...
JS设计模式(4):观察者模式
JS设计模式(4):观察者模式 一、引入 在开发中,我们经常会遇到这样的场景:一个对象的状态变化需要自动通知其他对象,比如: 电商平台中,商品库存变化时需要通知所有订阅该商品的用户;新闻网站中࿰…...
