当前位置: 首页 > news >正文

Spark Rebalance hint的倾斜的处理(OptimizeSkewInRebalancePartitions)

背景

本文基于Spark 3.5.0
目前公司在做小文件合并的时候用到了 Spark Rebalance 这个算子,这个算子的主要作用是在AQE阶段的最后写文件的阶段进行小文件的合并,使得最后落盘的文件不会太大也不会太小,从而达到小文件合并的作用,这其中的主要原理是在于三个规则:OptimizeSkewInRebalancePartitions,CoalesceShufflePartitions,OptimizeShuffleWithLocalRead,这里主要说一下OptimizeSkewInRebalancePartitions规则,CoalesceShufflePartitions的作用主要是进行文件的合并,是得文件不会太小,OptimizeShuffleWithLocalRead的作用是加速shuffle fetch的速度。

结论

OptimizeSkewInRebalancePartitions的作用是对小文件进行拆分,使得罗盘的文件不会太大,这个会有个问题,如果我们在使用Rebalance(col)这种情况的时候,如果col的值是固定的,比如说值永远是20240320,那么这里就得注意一下,关于OptimizeSkewInRebalancePartitions涉及到的参数spark.sql.adaptive.optimizeSkewsInRebalancePartitions.enabled,spark.sql.adaptive.advisoryPartitionSizeInBytes,spark.sql.adaptive.rebalancePartitionsSmallPartitionFactor 这些值配置,如果这些配置调整的不合适,就会导致写文件的时候有可能只有一个Task在运行,那么最终就只有一个文件。而且大大加长了整个任务的运行时间。

分析

直接到OptimizeSkewInRebalancePartitions中的代码中来:

  override def apply(plan: SparkPlan): SparkPlan = {if (!conf.getConf(SQLConf.ADAPTIVE_OPTIMIZE_SKEWS_IN_REBALANCE_PARTITIONS_ENABLED)) {return plan}plan transformUp {case stage: ShuffleQueryStageExec if isSupported(stage.shuffle) =>tryOptimizeSkewedPartitions(stage)}}

如果我们禁用掉对rebalance的倾斜处理,也就是spark.sql.adaptive.optimizeSkewsInRebalancePartitions.enabled为false(默认是true),那么就不会应用此规则,那么如果Col为固定值的情况下,就只会有一个Task进行文件的写入操作,也就只有一个文件,因为一个Task会拉取所有的Map的数据(因为此时每个maptask上的hash(Col)都是一样的,此时只有一个reduce task去拉取数据),如图:

在这里插入图片描述
假如说hash(col)为0,那实际上只有reduceTask0有数据,其他的ReduceTask1等等都是没有数据的,所以最终只有ReduceTask0写文件,并且只有一个文件。

在看合并的计算公式,该数据流如下:

 tryOptimizeSkewedPartitions||\/optimizeSkewedPartitions||\/ShufflePartitionsUtil.createSkewPartitionSpecs||\/ShufflePartitionsUtil.splitSizeListByTargetSize

splitSizeListByTargetSize方法中涉及到的参数解释如下 :

  • 参数 sizes: Array[Long] 表示属于同一个reduce任务的maptask任务的大小数组,举例 sizes = [100,200,300,400]
    表明该任务有4个maptask,0表示maptask为0的所属reduce的大小,1表示maptask为1的所属reduce的大小,依次类推,图解如下:

在这里插入图片描述
比如说reduceTask0的从Maptask拉取的数据的大小分别是100,200,300,400.

  • 参数targetSize 为 spark.sql.adaptive.advisoryPartitionSizeInBytes的值,假如说是256MB
  • 参数smallPartitionFactor为spark.sql.adaptive.rebalancePartitionsSmallPartitionFactor 的值,默认是0.2
    这里有个计算公式:
    def tryMergePartitions() = {// When we are going to start a new partition, it's possible that the current partition or// the previous partition is very small and it's better to merge the current partition into// the previous partition.val shouldMergePartitions = lastPartitionSize > -1 &&((currentPartitionSize + lastPartitionSize) < targetSize * MERGED_PARTITION_FACTOR ||(currentPartitionSize < targetSize * smallPartitionFactor ||lastPartitionSize < targetSize * smallPartitionFactor))if (shouldMergePartitions) {// We decide to merge the current partition into the previous one, so the start index of// the current partition should be removed.partitionStartIndices.remove(partitionStartIndices.length - 1)lastPartitionSize += currentPartitionSize} else {lastPartitionSize = currentPartitionSize}}。。。while (i < sizes.length) {// If including the next size in the current partition exceeds the target size, package the// current partition and start a new partition.if (i > 0 && currentPartitionSize + sizes(i) > targetSize) {tryMergePartitions()partitionStartIndices += icurrentPartitionSize = sizes(i)} else {currentPartitionSize += sizes(i)}i += 1}tryMergePartitions()partitionStartIndices.toArray

这里的计算公式大致就是:从每个maptask中的获取到属于同一个reduce的数值,依次累加,如果大于targetSize就尝试合并,直至到最后一个maptask
可以看到tryMergePartitions有个计算公式:currentPartitionSize < targetSize * smallPartitionFactor,也就是说如果当前maptask的对应的reduce分区数据 小于 256MB*0.2 = 51.2MB 的话,也还是会合并到前一个分区中去,如果smallPartitionFactor设置过大,可能会导致所有的分区都会合并到一个分区中去,最终会导致一个文件会有几十GB(也就是targetSize * smallPartitionFactor`*shuffleNum),
比如说以下的测试案例:

    val targetSize = 100val smallPartitionFactor2 = 0.5// merge last two partition if their size is not bigger than smallPartitionFactor * targetval sizeList5 = Array[Long](50, 50, 40, 5)assert(ShufflePartitionsUtil.splitSizeListByTargetSize(sizeList5, targetSize, smallPartitionFactor2).toSeq ==Seq(0))val sizeList6 = Array[Long](40, 5, 50, 45)assert(ShufflePartitionsUtil.splitSizeListByTargetSize(sizeList6, targetSize, smallPartitionFactor2).toSeq ==Seq(0))

这种情况下,就会只有一个reduce任务运行。

相关文章:

Spark Rebalance hint的倾斜的处理(OptimizeSkewInRebalancePartitions)

背景 本文基于Spark 3.5.0 目前公司在做小文件合并的时候用到了 Spark Rebalance 这个算子&#xff0c;这个算子的主要作用是在AQE阶段的最后写文件的阶段进行小文件的合并&#xff0c;使得最后落盘的文件不会太大也不会太小&#xff0c;从而达到小文件合并的作用&#xff0c;…...

Vue 3中实现基于角色的权限认证实现思路

一、基于角色的权限认证主要步骤 在Vue 3中实现基于角色的权限认证通常涉及以下几个主要步骤&#xff1a; 定义角色和权限&#xff1a;首先需要在后端服务定义不同的角色和它们对应的权限。权限可以是对特定资源的访问权限&#xff0c;比如读取、写入、修改等。用户认证&#…...

Visual Studio 2022进行文件差异比较

前言 Visual Studio 2022在版本17.7.4中发布在解决方案资源管理器中比较文件的功能&#xff0c;通过使用此功能&#xff0c;可以轻松地查看两个文件之间的差异&#xff0c;包括添加、删除和修改的代码行。可以逐行查看差异&#xff0c;并根据需要手动调整和编辑文件内容以进行…...

1.2 编译型语言和解释型语言的区别

编译型语言和解释型语言的区别 通过高级语言编写的源码&#xff0c;我们能够轻松理解&#xff0c;但对于计算机来说&#xff0c;它只认识二进制指令&#xff0c;源码就是天书&#xff0c;根本无法识别。源码要想执行&#xff0c;必须先转换成二进制指令。 所谓二进制指令&…...

C语言-常量

什么是常量? 答:常量是在程序执行过程中,其值不发生改变的量,常量分为直接常量和符号常量两种。 其中直接常量又可以分为整型常量、实型常量、字符型常量、字符串常量。 直接常量 1.整型常量 整型常量即整数,包括正整数,负整数和0。c语言中常量可以用八进制,十进制和十六…...

开源的OCR工具基本使用:PaddleOCR/Tesseract/CnOCR

前言 因项目需要&#xff0c;调研了一下目前市面上一些开源的OCR工具&#xff0c;支持本地部署&#xff0c;非调用API&#xff0c;主要有PaddleOCR/CnOCR/chinese_lite OCR/EasyOCR/Tesseract/chineseocr/mmocr这几款产品。 本文主要尝试了EasyOCR/CnOCR/Tesseract/PaddleOCR这…...

vue3实现输入框短信验证码功能---全网始祖

组件功能分析 1.按键删除&#xff0c;清空当前input&#xff0c;并跳转prevInput & 获取焦点,按键delete&#xff0c;清空当前input&#xff0c;并跳转nextInput & 获取焦点。按键Home/End键&#xff0c;焦点跳转first/最后一个input输入框。ArrowLeft/ArrowRight键点击…...

[C#]winformYOLO区域检测任意形状区域绘制射线算法实现

【简单介绍】 Winform OpenCVSharp YOLO区域检测与任意形状区域射线绘制算法实现 在现代安全监控系统中&#xff0c;区域检测是一项至关重要的功能。通过使用Winform结合OpenCVSharp库&#xff0c;并结合YOLO&#xff08;You Only Look Once&#xff09;算法&#xff0c;我们…...

个人网站制作 Part 14 添加网站分析工具 | Web开发项目

文章目录 &#x1f469;‍&#x1f4bb; 基础Web开发练手项目系列&#xff1a;个人网站制作&#x1f680; 添加网站分析工具&#x1f528;使用Google Analytics&#x1f527;步骤 1: 注册Google Analytics账户&#x1f527;步骤 2: 获取跟踪代码 &#x1f528;使用Vue.js&#…...

数据按设定单位(分辨率)划分的方法

1. 问题描述 需要将使用公式计算后的float数值换算到固定间隔数轴的对应位置上的数据&#xff0c;比如2.186这个数据&#xff0c;将该数据换算到以0.25为间隔的数轴上&#xff0c;换算后是2.0&#xff0c;还是2.25呢&#xff1f;该方法就是解决这个问题。 2. 方法 输入&…...

Ubuntu 搭建gitlab服务器,及使用repo管理

一、GitLab安装与配置 GitLab 是一个用于仓库管理系统的开源项目&#xff0c;使用Git作为代码管理工具&#xff0c;并在此基础上搭建起来的Web服务。 1、安装Ubuntu系统&#xff08;这个教程很多&#xff0c;就不展开了&#xff09;。 2、安装gitlab社区版本&#xff0c;有需…...

QT(19)-QNetworkRequest

attribute(QNetworkRequest::Attribute code, const QVariant &defaultValue QVariant()) const 获取指定的请求属性。如果该属性未设置&#xff0c;则返回默认值。 hasRawHeader(const QByteArray &headerName) const 检查是否存在指定名称的原始请求头。 header(Q…...

基于Vue的社区旧衣回收利用系统的设计与实现

经济的高速发展使得每一个家庭的收入都获得了大幅增长&#xff0c;随之而来的就是各种梦想的逐步实现&#xff0c;首当其冲的就是各类衣服的更新换代而导致了大量旧衣物在家中的积存。为了帮助人们解决旧衣物处理的问题而以当前主流的互联网技术构建一个可于社区中实现旧衣回收…...

【网站项目】291校园疫情防控系统

&#x1f64a;作者简介&#xff1a;拥有多年开发工作经验&#xff0c;分享技术代码帮助学生学习&#xff0c;独立完成自己的项目或者毕业设计。 代码可以私聊博主获取。&#x1f339;赠送计算机毕业设计600个选题excel文件&#xff0c;帮助大学选题。赠送开题报告模板&#xff…...

win git filter-repo教程

git filter-repo 是一个用于过滤和清理 Git 仓库历史的工具&#xff0c;它可以高效地批量修改提交历史中的文件内容、删除文件、重命名文件以及进行其他历史重构操作。相较于 git filter-branch&#xff0c;它通常更快且更易于使用。 以下是一个基本示例&#xff0c;说明如何使…...

Redis相关操作高阶篇--集群搭建

Redis相关操作大全一篇全搞定-CSDN博客 Redis集群 是一个由多个主从节点群组成的分布式服务器群&#xff0c;它具有复制、高可用和分片特性。Redis集群不需要seninel哨兵也能完成节点移除和故障转移的功能。需要将每个节点 设置成集群模式&#xff0c;这种集群模式没有中心节…...

JNDI注入原理及利用IDEA漏洞复现

&#x1f36c; 博主介绍&#x1f468;‍&#x1f393; 博主介绍&#xff1a;大家好&#xff0c;我是 hacker-routing &#xff0c;很高兴认识大家~ ✨主攻领域&#xff1a;【渗透领域】【应急响应】 【Java、PHP】 【VulnHub靶场复现】【面试分析】 &#x1f389;点赞➕评论➕收…...

大数据,或称巨量资料

大数据&#xff0c;或称巨量资料&#xff0c;指的是在传统数据处理应用软件不足以处理的大或复杂的数据集。大数据也可以定义为来自各种来源的大量非结构化或结构化数据。从学术角度而言&#xff0c;大数据的出现促成广泛主题的新颖研究&#xff0c;这也导致各种大数据统计方法…...

windows上打开redis服务闪退问题处理

方法1&#xff1a;在windows上面打开redis服务时&#xff0c;弹窗闪退可能是6379端口占用&#xff0c;可以用以下命令查看&#xff1a; netstat -aon | findstr 6379 如果端口被占用可以用这个命令解决&#xff1a; taskkill /f /pid 进程号 方法2&#xff1a; 可以使用…...

分布式锁简单实现

分布式锁 Redis分布式锁最简单的实现 想要实现分布式锁&#xff0c;必须要求 Redis 有「互斥」的能力&#xff0c;我们可以使用 SETNX 命令&#xff0c;这个命令表示SET if Not Exists&#xff0c;即如果 key 不存在&#xff0c;才会设置它的值&#xff0c;否则什么也不做。 …...

[具身智能-361]:Hugging Face(通常被称为“抱抱脸”)是当今人工智能领域最核心的开源平台,被广泛誉为 “AI 界的 GitHub”。

Hugging Face&#xff08;通常被称为“抱抱脸”&#xff09;是当今人工智能领域最核心的开源平台&#xff0c;被广泛誉为 “AI 界的 GitHub”。 简单来说&#xff0c;它是一个为全球开发者提供模型、数据集和代码的协作社区。无论你是想下载现成的大模型&#xff08;如 Llama …...

【CTFhub】web安全实战:备份文件泄露与源码保护策略

1. 备份文件泄露&#xff1a;Web安全的隐形炸弹 第一次参加CTF比赛时&#xff0c;我遇到一道看似简单的Web题&#xff0c;花了三小时都没解出来。直到偶然尝试访问/index.php.bak&#xff0c;才发现整个网站源码就躺在那儿等着我拿。这种"开门送分题"在真实网络攻防中…...

终极Dockertest错误处理指南:从连接失败到超时重试的完整解决方案

终极Dockertest错误处理指南&#xff1a;从连接失败到超时重试的完整解决方案 【免费下载链接】dockertest Write better integration tests! Dockertest helps you boot up ephermal docker images for your Go tests with minimal work. 项目地址: https://gitcode.com/gh_…...

猫抓Cat-Catch:解锁网页媒体资源的终极免费解决方案

猫抓Cat-Catch&#xff1a;解锁网页媒体资源的终极免费解决方案 【免费下载链接】cat-catch 猫抓 浏览器资源嗅探扩展 / cat-catch Browser Resource Sniffing Extension 项目地址: https://gitcode.com/GitHub_Trending/ca/cat-catch 还在为无法保存心爱的在线视频而苦…...

AIAgent记忆泄漏导致LLM幻觉加剧?SITS2026现场演示2分钟定位+4步清除陈旧记忆链

第一章&#xff1a;SITS2026演讲&#xff1a;AIAgent长期记忆管理 2026奇点智能技术大会(https://ml-summit.org) 在SITS2026主会场的Keynote环节&#xff0c;AIAgent架构团队首次公开了面向生产级应用的长期记忆&#xff08;Long-Term Memory, LTM&#xff09;管理框架——C…...

用Docker一键部署OpenMVS开发环境(Ubuntu 18.04 LTS版)

基于Docker的OpenMVS开发环境快速部署指南 在三维重建和计算机视觉领域&#xff0c;OpenMVS作为一套开源的Multi-View Stereo系统&#xff0c;因其强大的功能和灵活性而广受欢迎。然而&#xff0c;传统的本地安装方式往往面临依赖管理复杂、环境配置繁琐、系统兼容性等问题&…...

桌面端 Claw 个人微信接入指南焕

1.概述在人工智能快速发展的今天&#xff0c;AI不再仅仅是回答问题的聊天机器人&#xff0c;而是正在演变为能够主动完成复杂任务的智能代理。OpenAI的Codex CLI就是这一趋势的典型代表——一个跨平台的本地软件代理&#xff0c;能够在用户的机器上安全高效地生成高质量的软件变…...

intv_ai_mk11新手指南:如何用‘分步骤回答’‘用Markdown格式’等指令控制输出结构

intv_ai_mk11新手指南&#xff1a;如何用分步骤回答用Markdown格式等指令控制输出结构 1. 认识intv_ai_mk11对话机器人 intv_ai_mk11是一款基于7B参数Llama架构的AI对话助手&#xff0c;运行在GPU服务器上。它能理解自然语言指令&#xff0c;并以结构化的方式给出专业回答。与…...

阿里云PolarDB在CentOS 7上的性能调优实战:从THP配置到内核参数优化

阿里云PolarDB在CentOS 7上的性能调优实战&#xff1a;从THP配置到内核参数优化 当数据库规模达到TB级别时&#xff0c;每个百分点的性能提升都可能意味着数万元的成本节约。作为阿里云自主研发的云原生数据库&#xff0c;PolarDB凭借存储计算分离架构和共享存储池设计&#x…...

从Claude Agent Skills到Hatchify多Agent:我是如何把团队知识库变成自动化工作流的

从静态文档到智能工作流&#xff1a;基于Claude与Hatchify的团队知识自动化实践 当研发团队的文档库膨胀到Confluence里300页面、GitLab中50Markdown文件时&#xff0c;我们突然意识到一个残酷事实——这些耗费心血整理的代码规范、部署清单和排障手册&#xff0c;正以每月15%的…...