Spark Rebalance hint的倾斜的处理(OptimizeSkewInRebalancePartitions)
背景
本文基于Spark 3.5.0
目前公司在做小文件合并的时候用到了 Spark Rebalance
这个算子,这个算子的主要作用是在AQE阶段的最后写文件的阶段进行小文件的合并,使得最后落盘的文件不会太大也不会太小,从而达到小文件合并的作用,这其中的主要原理是在于三个规则:OptimizeSkewInRebalancePartitions
,CoalesceShufflePartitions
,OptimizeShuffleWithLocalRead
,这里主要说一下OptimizeSkewInRebalancePartitions
规则,CoalesceShufflePartitions
的作用主要是进行文件的合并,是得文件不会太小,OptimizeShuffleWithLocalRead
的作用是加速shuffle fetch的速度。
结论
OptimizeSkewInRebalancePartitions
的作用是对小文件进行拆分,使得罗盘的文件不会太大,这个会有个问题,如果我们在使用Rebalance(col)
这种情况的时候,如果col
的值是固定的,比如说值永远是20240320
,那么这里就得注意一下,关于OptimizeSkewInRebalancePartitions
涉及到的参数spark.sql.adaptive.optimizeSkewsInRebalancePartitions.enabled
,spark.sql.adaptive.advisoryPartitionSizeInBytes
,spark.sql.adaptive.rebalancePartitionsSmallPartitionFactor
这些值配置,如果这些配置调整的不合适,就会导致写文件的时候有可能只有一个Task在运行,那么最终就只有一个文件。而且大大加长了整个任务的运行时间。
分析
直接到OptimizeSkewInRebalancePartitions
中的代码中来:
override def apply(plan: SparkPlan): SparkPlan = {if (!conf.getConf(SQLConf.ADAPTIVE_OPTIMIZE_SKEWS_IN_REBALANCE_PARTITIONS_ENABLED)) {return plan}plan transformUp {case stage: ShuffleQueryStageExec if isSupported(stage.shuffle) =>tryOptimizeSkewedPartitions(stage)}}
如果我们禁用掉对rebalance的倾斜处理,也就是spark.sql.adaptive.optimizeSkewsInRebalancePartitions.enabled为false
(默认是true
),那么就不会应用此规则,那么如果Col
为固定值的情况下,就只会有一个Task进行文件的写入操作,也就只有一个文件,因为一个Task会拉取所有的Map的数据(因为此时每个maptask上的hash(Col)都是一样的,此时只有一个reduce task去拉取数据),如图:
假如说hash(col)为0,那实际上只有reduceTask0有数据,其他的ReduceTask1等等都是没有数据的,所以最终只有ReduceTask0写文件,并且只有一个文件。
在看合并的计算公式,该数据流如下:
tryOptimizeSkewedPartitions||\/optimizeSkewedPartitions||\/ShufflePartitionsUtil.createSkewPartitionSpecs||\/ShufflePartitionsUtil.splitSizeListByTargetSize
splitSizeListByTargetSize
方法中涉及到的参数解释如下 :
- 参数 sizes: Array[Long] 表示属于同一个reduce任务的maptask任务的大小数组,举例 sizes = [100,200,300,400]
表明该任务有4个maptask,0表示maptask为0的所属reduce的大小,1表示maptask为1的所属reduce的大小,依次类推,图解如下:
比如说reduceTask0的从Maptask拉取的数据的大小分别是100,200,300,400.
- 参数targetSize 为
spark.sql.adaptive.advisoryPartitionSizeInBytes
的值,假如说是256MB
- 参数smallPartitionFactor为
spark.sql.adaptive.rebalancePartitionsSmallPartitionFactor
的值,默认是0.2
这里有个计算公式:
def tryMergePartitions() = {// When we are going to start a new partition, it's possible that the current partition or// the previous partition is very small and it's better to merge the current partition into// the previous partition.val shouldMergePartitions = lastPartitionSize > -1 &&((currentPartitionSize + lastPartitionSize) < targetSize * MERGED_PARTITION_FACTOR ||(currentPartitionSize < targetSize * smallPartitionFactor ||lastPartitionSize < targetSize * smallPartitionFactor))if (shouldMergePartitions) {// We decide to merge the current partition into the previous one, so the start index of// the current partition should be removed.partitionStartIndices.remove(partitionStartIndices.length - 1)lastPartitionSize += currentPartitionSize} else {lastPartitionSize = currentPartitionSize}}。。。while (i < sizes.length) {// If including the next size in the current partition exceeds the target size, package the// current partition and start a new partition.if (i > 0 && currentPartitionSize + sizes(i) > targetSize) {tryMergePartitions()partitionStartIndices += icurrentPartitionSize = sizes(i)} else {currentPartitionSize += sizes(i)}i += 1}tryMergePartitions()partitionStartIndices.toArray
这里的计算公式大致就是:从每个maptask中的获取到属于同一个reduce的数值,依次累加,如果大于targetSize就尝试合并,直至到最后一个maptask
,
可以看到tryMergePartitions
有个计算公式:currentPartitionSize < targetSize * smallPartitionFactor
,也就是说如果当前maptask的对应的reduce分区数据 小于 256MB*0.2 = 51.2MB
的话,也还是会合并到前一个分区中去,如果smallPartitionFactor
设置过大,可能会导致所有的分区都会合并到一个分区中去,最终会导致一个文件会有几十GB(也就是targetSize * smallPartitionFactor`*shuffleNum),
比如说以下的测试案例:
val targetSize = 100val smallPartitionFactor2 = 0.5// merge last two partition if their size is not bigger than smallPartitionFactor * targetval sizeList5 = Array[Long](50, 50, 40, 5)assert(ShufflePartitionsUtil.splitSizeListByTargetSize(sizeList5, targetSize, smallPartitionFactor2).toSeq ==Seq(0))val sizeList6 = Array[Long](40, 5, 50, 45)assert(ShufflePartitionsUtil.splitSizeListByTargetSize(sizeList6, targetSize, smallPartitionFactor2).toSeq ==Seq(0))
这种情况下,就会只有一个reduce任务运行。
相关文章:

Spark Rebalance hint的倾斜的处理(OptimizeSkewInRebalancePartitions)
背景 本文基于Spark 3.5.0 目前公司在做小文件合并的时候用到了 Spark Rebalance 这个算子,这个算子的主要作用是在AQE阶段的最后写文件的阶段进行小文件的合并,使得最后落盘的文件不会太大也不会太小,从而达到小文件合并的作用,…...

Vue 3中实现基于角色的权限认证实现思路
一、基于角色的权限认证主要步骤 在Vue 3中实现基于角色的权限认证通常涉及以下几个主要步骤: 定义角色和权限:首先需要在后端服务定义不同的角色和它们对应的权限。权限可以是对特定资源的访问权限,比如读取、写入、修改等。用户认证&#…...

Visual Studio 2022进行文件差异比较
前言 Visual Studio 2022在版本17.7.4中发布在解决方案资源管理器中比较文件的功能,通过使用此功能,可以轻松地查看两个文件之间的差异,包括添加、删除和修改的代码行。可以逐行查看差异,并根据需要手动调整和编辑文件内容以进行…...

1.2 编译型语言和解释型语言的区别
编译型语言和解释型语言的区别 通过高级语言编写的源码,我们能够轻松理解,但对于计算机来说,它只认识二进制指令,源码就是天书,根本无法识别。源码要想执行,必须先转换成二进制指令。 所谓二进制指令&…...
C语言-常量
什么是常量? 答:常量是在程序执行过程中,其值不发生改变的量,常量分为直接常量和符号常量两种。 其中直接常量又可以分为整型常量、实型常量、字符型常量、字符串常量。 直接常量 1.整型常量 整型常量即整数,包括正整数,负整数和0。c语言中常量可以用八进制,十进制和十六…...

开源的OCR工具基本使用:PaddleOCR/Tesseract/CnOCR
前言 因项目需要,调研了一下目前市面上一些开源的OCR工具,支持本地部署,非调用API,主要有PaddleOCR/CnOCR/chinese_lite OCR/EasyOCR/Tesseract/chineseocr/mmocr这几款产品。 本文主要尝试了EasyOCR/CnOCR/Tesseract/PaddleOCR这…...

vue3实现输入框短信验证码功能---全网始祖
组件功能分析 1.按键删除,清空当前input,并跳转prevInput & 获取焦点,按键delete,清空当前input,并跳转nextInput & 获取焦点。按键Home/End键,焦点跳转first/最后一个input输入框。ArrowLeft/ArrowRight键点击…...

[C#]winformYOLO区域检测任意形状区域绘制射线算法实现
【简单介绍】 Winform OpenCVSharp YOLO区域检测与任意形状区域射线绘制算法实现 在现代安全监控系统中,区域检测是一项至关重要的功能。通过使用Winform结合OpenCVSharp库,并结合YOLO(You Only Look Once)算法,我们…...

个人网站制作 Part 14 添加网站分析工具 | Web开发项目
文章目录 👩💻 基础Web开发练手项目系列:个人网站制作🚀 添加网站分析工具🔨使用Google Analytics🔧步骤 1: 注册Google Analytics账户🔧步骤 2: 获取跟踪代码 🔨使用Vue.js&#…...
数据按设定单位(分辨率)划分的方法
1. 问题描述 需要将使用公式计算后的float数值换算到固定间隔数轴的对应位置上的数据,比如2.186这个数据,将该数据换算到以0.25为间隔的数轴上,换算后是2.0,还是2.25呢?该方法就是解决这个问题。 2. 方法 输入&…...

Ubuntu 搭建gitlab服务器,及使用repo管理
一、GitLab安装与配置 GitLab 是一个用于仓库管理系统的开源项目,使用Git作为代码管理工具,并在此基础上搭建起来的Web服务。 1、安装Ubuntu系统(这个教程很多,就不展开了)。 2、安装gitlab社区版本,有需…...
QT(19)-QNetworkRequest
attribute(QNetworkRequest::Attribute code, const QVariant &defaultValue QVariant()) const 获取指定的请求属性。如果该属性未设置,则返回默认值。 hasRawHeader(const QByteArray &headerName) const 检查是否存在指定名称的原始请求头。 header(Q…...

基于Vue的社区旧衣回收利用系统的设计与实现
经济的高速发展使得每一个家庭的收入都获得了大幅增长,随之而来的就是各种梦想的逐步实现,首当其冲的就是各类衣服的更新换代而导致了大量旧衣物在家中的积存。为了帮助人们解决旧衣物处理的问题而以当前主流的互联网技术构建一个可于社区中实现旧衣回收…...

【网站项目】291校园疫情防控系统
🙊作者简介:拥有多年开发工作经验,分享技术代码帮助学生学习,独立完成自己的项目或者毕业设计。 代码可以私聊博主获取。🌹赠送计算机毕业设计600个选题excel文件,帮助大学选题。赠送开题报告模板ÿ…...
win git filter-repo教程
git filter-repo 是一个用于过滤和清理 Git 仓库历史的工具,它可以高效地批量修改提交历史中的文件内容、删除文件、重命名文件以及进行其他历史重构操作。相较于 git filter-branch,它通常更快且更易于使用。 以下是一个基本示例,说明如何使…...

Redis相关操作高阶篇--集群搭建
Redis相关操作大全一篇全搞定-CSDN博客 Redis集群 是一个由多个主从节点群组成的分布式服务器群,它具有复制、高可用和分片特性。Redis集群不需要seninel哨兵也能完成节点移除和故障转移的功能。需要将每个节点 设置成集群模式,这种集群模式没有中心节…...

JNDI注入原理及利用IDEA漏洞复现
🍬 博主介绍👨🎓 博主介绍:大家好,我是 hacker-routing ,很高兴认识大家~ ✨主攻领域:【渗透领域】【应急响应】 【Java、PHP】 【VulnHub靶场复现】【面试分析】 🎉点赞➕评论➕收…...
大数据,或称巨量资料
大数据,或称巨量资料,指的是在传统数据处理应用软件不足以处理的大或复杂的数据集。大数据也可以定义为来自各种来源的大量非结构化或结构化数据。从学术角度而言,大数据的出现促成广泛主题的新颖研究,这也导致各种大数据统计方法…...

windows上打开redis服务闪退问题处理
方法1:在windows上面打开redis服务时,弹窗闪退可能是6379端口占用,可以用以下命令查看: netstat -aon | findstr 6379 如果端口被占用可以用这个命令解决: taskkill /f /pid 进程号 方法2: 可以使用…...

分布式锁简单实现
分布式锁 Redis分布式锁最简单的实现 想要实现分布式锁,必须要求 Redis 有「互斥」的能力,我们可以使用 SETNX 命令,这个命令表示SET if Not Exists,即如果 key 不存在,才会设置它的值,否则什么也不做。 …...
React Native 开发环境搭建(全平台详解)
React Native 开发环境搭建(全平台详解) 在开始使用 React Native 开发移动应用之前,正确设置开发环境是至关重要的一步。本文将为你提供一份全面的指南,涵盖 macOS 和 Windows 平台的配置步骤,如何在 Android 和 iOS…...
DockerHub与私有镜像仓库在容器化中的应用与管理
哈喽,大家好,我是左手python! Docker Hub的应用与管理 Docker Hub的基本概念与使用方法 Docker Hub是Docker官方提供的一个公共镜像仓库,用户可以在其中找到各种操作系统、软件和应用的镜像。开发者可以通过Docker Hub轻松获取所…...

ios苹果系统,js 滑动屏幕、锚定无效
现象:window.addEventListener监听touch无效,划不动屏幕,但是代码逻辑都有执行到。 scrollIntoView也无效。 原因:这是因为 iOS 的触摸事件处理机制和 touch-action: none 的设置有关。ios有太多得交互动作,从而会影响…...
C++八股 —— 单例模式
文章目录 1. 基本概念2. 设计要点3. 实现方式4. 详解懒汉模式 1. 基本概念 线程安全(Thread Safety) 线程安全是指在多线程环境下,某个函数、类或代码片段能够被多个线程同时调用时,仍能保证数据的一致性和逻辑的正确性…...
Mysql8 忘记密码重置,以及问题解决
1.使用免密登录 找到配置MySQL文件,我的文件路径是/etc/mysql/my.cnf,有的人的是/etc/mysql/mysql.cnf 在里最后加入 skip-grant-tables重启MySQL服务 service mysql restartShutting down MySQL… SUCCESS! Starting MySQL… SUCCESS! 重启成功 2.登…...
为什么要创建 Vue 实例
核心原因:Vue 需要一个「控制中心」来驱动整个应用 你可以把 Vue 实例想象成你应用的**「大脑」或「引擎」。它负责协调模板、数据、逻辑和行为,将它们变成一个活的、可交互的应用**。没有这个实例,你的代码只是一堆静态的 HTML、JavaScript 变量和函数,无法「活」起来。 …...
MySQL 主从同步异常处理
阅读原文:https://www.xiaozaoshu.top/articles/mysql-m-s-update-pk MySQL 做双主,遇到的这个错误: Could not execute Update_rows event on table ... Error_code: 1032是 MySQL 主从复制时的经典错误之一,通常表示ÿ…...
go 里面的指针
指针 在 Go 中,指针(pointer)是一个变量的内存地址,就像 C 语言那样: a : 10 p : &a // p 是一个指向 a 的指针 fmt.Println(*p) // 输出 10,通过指针解引用• &a 表示获取变量 a 的地址 p 表示…...

nnUNet V2修改网络——暴力替换网络为UNet++
更换前,要用nnUNet V2跑通所用数据集,证明nnUNet V2、数据集、运行环境等没有问题 阅读nnU-Net V2 的 U-Net结构,初步了解要修改的网络,知己知彼,修改起来才能游刃有余。 U-Net存在两个局限,一是网络的最佳深度因应用场景而异,这取决于任务的难度和可用于训练的标注数…...

消息队列系统设计与实践全解析
文章目录 🚀 消息队列系统设计与实践全解析🔍 一、消息队列选型1.1 业务场景匹配矩阵1.2 吞吐量/延迟/可靠性权衡💡 权衡决策框架 1.3 运维复杂度评估🔧 运维成本降低策略 🏗️ 二、典型架构设计2.1 分布式事务最终一致…...