当前位置: 首页 > news >正文

Spark join数据倾斜调优

Spark中常见的两种数据倾斜现象如下

  • stage部分task执行特别慢

在这里插入图片描述

一般情况下是某个task处理的数据量远大于其他task处理的数据量,当然也不排除是程序代码没有冗余,异常数据导致程序运行异常。

  • 作业重试多次某几个task总会失败
    在这里插入图片描述

常见的退出码143、53、137、52以及heartbeat timed out异常,通常可认为是executor内存被打满。

RDD调优方法

  1. 查看数据分布
    Spark Core中shuffle算子出现数据倾斜时,可在Spark作业中加入查看key分布的代码,也可以将代码拆解出来使用spark-shell做测试
val rdd = sc.parallelize(Array("hello", "hello", "hello", "hi")).map((_,1))// 数据量较少
rdd.reduceByKey(_ + _)
.sortBy(_._2, false)
.take(20)
// 数据量较大, 用sample采样后在统计
rdd.sample(false, 0.1)
.reduceByKey(_+_)
.sortBy(_._2, false)
.take(20)
  1. 调整shuffle并行度
    原理:Spark在做shuffle时,默认使用HashPartitioner(非Hash Shuffle)对数据进行分区。如果并行度设置的不合适如比较小,可能造成大量不相同的key对应的数据被分配到了同一个task上,造成该task所处理的数据远大于其它task,从而造成数据倾斜
    在这里插入图片描述

调优建议:

  • 使用spark.default.parallelism调整分区数,默认值200建议500或更大
  • 在shuffle的算子上直接设置分区数,如:a.join(b, 500)、rdd.reduceByKey(_ + _, 500)
  1. reduce join转map join
    原理:不使用join算子直接进行连接操作,而使用broadcast变量与map类算子实现join操作,进而完全规避掉shuffle类的操作,彻底避免数据倾斜的出现
    在这里插入图片描述

调优建议:

  • broadcast的数据量不要超过500M, 过大driver/executor可能会oom
// 1.broadcast小表
val rdd1Broadcast = sc.broadcast(rdd1.collect())
// 2.map join
rdd2.map { x =>val rdd1DataMap = rdd1Broadcast.value.toMaprdd1DataMap.get(x._1) match {case Some(v) => (x._1, (x._2, v))case None => (x._1, (x._2, null))}
}
// 2.或者直接
rdd2.join(rdd1Broadcast)
  1. 分拆join在union
    原理:将有数据倾斜的RDD1中倾斜key对应的数据集单独抽取出来加盐(随机前缀),另外一个RDD2每条数据分别与所有的随机前缀结合形成新的RDD(相当于将其数据增到到原来的N倍,N即为随机前缀的总个数),然后将二者join之后去掉前缀;然后将不包含倾斜key的剩余数据进行join;最后将两次join的结果集通过union合并,即可得到全部join结果。
    在这里插入图片描述

调优建议:

// 1.统计数量最大的key
val skewedKeySet = rdd1.sample(false, 0.2).reduceByKey(_ + _).sortBy(_._2, false).take(10).map(x => x._1).toSet// 2.拆分异常的rdd, 倾斜key加上随机数
val rdd1_1 = rdd1.filter(x => skewedKeySet.contains(x._1)).map { x =>val prefix = scala.util.Random.nextInt(10).toString(s"${prefix}_${x._1}", x._2)
}
val rdd1_2 = rdd1.filter(x => !skewedKeySet.contains(x._1))// 3.正常rdd存在倾斜key的部分进行膨胀
val rdd2_1 = rdd2.filter(x => skewedKeySet.contains(x._1)).flatMap { x =>val list = 0 until 10list.map(i => (s"${i}_${x._1}", x._2))}val rdd2_2 = rdd2.filter(x => !skewedKeySet.contains(x._1))// 4.倾斜key的rdd进行join
val skewedRDD = rdd1_1.join(rdd2_1).map(x => (x._1.split("_")(1), x._2))
// 5.普通key的rdd进行join
val sampleRDD = rdd1_2.join(rdd2_2)
// 6.结果union
skewedRDD.union(sampleRDD)

SQL调优方法

  1. 查看数据分布
    统计某个查询结果或表中出现次数超过200次的key
WITH a AS (${query})
SELECT k,s
FROM (SELECT ${key} AS k,count(*) AS sFROM aGROUP BY ${key}
)
WHERE s > 200
  1. 自动调整shuffle并行度
    原理:自适应执行开启的前提下(AQE),假设我们设置的shuffle partition个数为5,在map stage结束之后,我们知道每一个partition的大小分别是70MB,30MB,20MB,10MB和50MB。假设我们设置每一个reducer处理的目标数据量是64MB,那么在运行时,我们可以实际使用3个reducer。第一个reducer处理partition 0 (70MB),第二个reducer处理连续的partition 1 到3,共60MB,第三个reducer处理partition 4 (50MB)
    在这里插入图片描述

Spark参数:

参数说明推荐值
spark.sql.adaptive.enabled开启自适应执行线上默认值true
spark.sql.adaptive.coalescePartitions.minPartitionNum自适应执行中使用的最小shuffle后分区数,默认值executor*core数
spark.sql.adaptive.coalescePartitions.initialPartitionNum合并前的初始shuffle分区数量,默认值spark.sql.shuffle.partitions
spark.sql.adaptive.advisoryPartitionSizeInBytes合并小分区到建议的目标值, 默认256m
spark.sql.shuffle.partitionsjoin等操作分区数,默认值200推荐500或更大
  1. 自动优化Join
    原理:自适应执行开启的前提下(AQE),我们可以获得SortMergeJoin两个子stage的数据量,在满足条件的情况下,即一张表小于broadcast阈值,可以将SortMergeJoin转化成BroadcastHashJoin
    在这里插入图片描述
参数说明推荐值
spark.sql.adaptive.enabled开启自适应执行线上默认值true
spark.sql.autoBroadcastJoinThreshold默认10M,设置为-1可以禁用广播;实际根据hive表存储的统计信息或文件预估大小与此值做判断看是否做broadcast,由于文件是压缩格式一般情况下此参数并不可靠建议膨胀系数spark.sql.sources.fileCompressionFactor=10推荐此参数保持默认,调整自适应的broadcast参数
spark.sql.adaptive.autoBroadcastJoinThreshold此参数仅影响自适应执行阶段join优化时broadcast阈值;设置为-1可以禁用广播;默认值spark.sql.autoBroadcastJoinThreshold自适应执行得到的数据比较准确,driver内存足够的前提下可以将此值调大如200M
  1. 自动处理数据倾斜
    原理:自适应执行开启的前提下(AQE),我们可以在运行时很容易地检测出有数据倾斜的partition。当执行某个stage时,我们收集该stage每个mapper 的shuffle数据大小和记录条数。如果某一个partition的数据量或者记录条数超过中位数的N倍,并且大于某个预先配置的阈值,我们就认为这是一个数据倾斜的partition,需要进行特殊的处理
    在这里插入图片描述
参数说明推荐值
spark.sql.adaptive.enabled开启自适应执行线上默认值true
spark.sql.adaptive.skewJoin.enabled开启自动解决数据倾斜,默认值true
spark.sql.adaptive.skewJoin.skewedPartitionFactor影响因子,某分区数据大小超过所有分区中位数与影响因子乘积,才会被认为发生了数据倾斜
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes视为倾斜分区的分区数据最小值

相关文章:

Spark join数据倾斜调优

Spark中常见的两种数据倾斜现象如下 stage部分task执行特别慢 一般情况下是某个task处理的数据量远大于其他task处理的数据量,当然也不排除是程序代码没有冗余,异常数据导致程序运行异常。 作业重试多次某几个task总会失败 常见的退出码143、53、137…...

YOLOv5初学者问题——用自己的模型预测图片不画框

如题,我在用自己的数据集训练权重模型的时候,在训练完成输出的yolov5-v5.0\runs\train\exp2目录下可以看到,在训练测试的时候是有输出描框的。 但是当我引用训练好的best.fangpt去进行预测的时候, 程序输出的图片并没有描框。根据…...

【linux学习---1】点亮一个LED---驱动一个GPIO

文章目录 1、原理图找对应引脚2、IO复用3、IO配置4、GPIO配置5、GPIO时钟使能6、总结 1、原理图找对应引脚 从上图 可以看出, 蜂鸣器 接到了 BEEP 上, BEEP 就是 GPIO5_IO05 2、IO复用 查找IMX6UL参考手册 和 STM32一样,如果某个 IO 要作为…...

Redis分布式锁代码实现详解

引言 在分布式系统中,资源竞争和数据一致性问题常常需要通过锁机制来解决。Redis作为一个高性能的键值存储系统,因其提供的原子操作、丰富的数据结构以及网络延迟低等特点,成为了实现分布式锁的理想选择。本文将详细介绍如何使用Redis来实现…...

Day01-02-gitlab

Day01-02-gitlab 1. 什么是gitlab2. Gitlab vs Github/Gitee3. Gitlab 应用场景4. 架构5. Gitlab 快速上手指南5.0 安装要求5.1 安装Gitlab组件5.3 配置访问url5.6 初始化5.8 登录与查看5.9 汉化5.10 设置密码5.11 目录结构5.12 删除5.13 500 vs 5025.14 重置密码 6. Gitlab用户…...

PyCharm远程开发配置(2024以下版本)

目录 PyCharm远程开发配置 1、清理远程环境 1.1 点击Setting 1.2 进入Interpreter 1.3 删除远程环境 1.4 删除SSH 2、连接远程环境 2.1 点击Close Project 2.2 点击New Project 2.3 项目路径设置 2.4 SSH配置 2.5 选择python3解释器在远程环境的位置 2.6 配置远程…...

解决Ucharts在小程序上的层级过高问题

<qiun-wx-ucharts canvas2d"{{true}}" type"pie" opts"{{rectificationRateOpts}}" chartData"{{rectificationRateData}}" /> 开启2d渲染即可解决&#xff08;在小程序开发工具上看着层级还是高&#xff0c;但是在手机上是正常…...

重保期间的网站安全防护:网站整站锁的应用与实践

标题&#xff1a;重保期间的网站安全防护&#xff1a;网站整站锁的应用与实践 一、引言 在重大活动或事件&#xff08;通常被称为“重保”&#xff09;期间&#xff0c;网站的安全问题尤为突出。由于此时网站的访问量和关注度可能达到高峰&#xff0c;因此也成为了黑客攻击的…...

Qt自定义类型

概述 在使用Qt创建用户界面时&#xff0c;特别是那些具有特殊控件和特性的界面时&#xff0c;开发人员有时需要创建新的数据类型&#xff0c;以便与Qt现有的值类型集一起使用或代替它们。 QSize、QColor和QString等标准类型都可以存储在QVariant对象中&#xff0c;作为基于qo…...

UE4_材质_材质节点_DepthFade

一、DepthFade参数 DepthFade&#xff08;深度消退&#xff09;表达式用来隐藏半透明对象与不透明对象相交时出现的不美观接缝。 项目说明属性消退距离&#xff08;Fade Distance&#xff09;这是应该发生消退的全局空间距离。未连接 FadeDistance&#xff08;FadeDistance&a…...

如何对GD32 MCU进行加密?

GD32 MCU有哪些加密方法呢&#xff1f;大家在平时项目开发的过程中&#xff0c;最后都可能会面临如何对出厂产品的MCU代码进行加密&#xff0c;避免产品流向市场被别人读取复制。 下面为大家介绍GD32 MCU所支持的几种常用的加密方法&#xff1a; 首先GD32 MCU本身支持防硬开盖…...

快速了解GPT-4o和GPT-4区别

GPT-4o简介 在5月14日的OpenAI举行春季发布会上&#xff0c;OpenAI在活动中发布了新旗舰模型“GPT-4o”&#xff01;据OpenAI首席技术官穆里穆拉蒂&#xff08;Muri Murati&#xff09;介绍&#xff0c;GPT-4o在继承GPT-4强大智能的同时&#xff0c;进一步提升了文本、图像及语…...

周末休息日也能及时回应客户消息!微信自动回复神器太就好用啦!

无论是在忙碌时&#xff0c;还是在周末休息日&#xff0c;如果没能及时回应客户&#xff0c;很可能会造成客户流失。 今天&#xff0c;我要为大家介绍一个多微管理神器——个微管理系统&#xff0c;它可以帮助你实现自动回复&#xff0c;提高回复效率。 自动通过好友请求 在…...

力扣404周赛 T1/T2/T3 枚举/动态规划/数组/模拟

博客主页&#xff1a;誓则盟约系列专栏&#xff1a;IT竞赛 专栏关注博主&#xff0c;后期持续更新系列文章如果有错误感谢请大家批评指出&#xff0c;及时修改感谢大家点赞&#x1f44d;收藏⭐评论✍ 3200.三角形的最大高度【简单】 题目&#xff1a; 给你两个整数 red 和 b…...

Taurus 性能测试工具详解

文章目录 简介原理安装编写测试配置运行测试集成其他工具结果分析优点与缺点优点缺点 参考资料总结 简介 Taurus 是一个开源的自动化测试工具&#xff0c;用于简化和增强性能测试流程。与其他性能测试工具不同&#xff0c;Taurus 旨在通过友好的 YAML 配置文件和对多种负载测试…...

天猫商品详情API接口(店铺|标题|主图|价格|SKU属性等)

天猫商品详情API接口为开发者提供了获取天猫商品详细信息的能力&#xff0c;包括店铺信息、商品标题、主图、价格、SKU属性等。以下是该接口的使用过程和相关技术要点&#xff1a; 注册账号并创建应用 注册账号&#xff1a;需要在天猫开放平台注册一个开发者账号。创建应用&a…...

双向广搜——AcWing 190. 字串变换

双向广搜 定义 双向广度优先搜索&#xff08;Bi-directional Breadth-First Search, Bi-BFS&#xff09;是一种在图或树中寻找两点间最短路径的算法。与传统的单向广度优先搜索相比&#xff0c;它从起始点和目标点同时开始搜索&#xff0c;从而有可能显著减少搜索空间&#x…...

工商业光伏项目如何快速开发?

一、前期调研与规划 1、屋顶资源评估&#xff1a;详细测量屋顶面积、承重能力及朝向&#xff0c;利用光伏业务管理软件进行日照分析和发电量预测&#xff0c;确保项目可行性。 2、政策与补贴研究&#xff1a;深入了解当地政府对工商业光伏项目的政策支持和补贴情况&#xff0…...

Kafka入门-分区及压缩

一、生产者消息分区 Kafka的消息组织方式实际上是三级结构&#xff1a;主题-分区-消息。主题下的每条消息只会保存在某一个分区中&#xff0c;而不会在多个分区中被保存多份。 分区的作用就是提供负载均衡的能力&#xff0c;或者说对数据进行分区的主要原因&#xff0c;就是为…...

被⽹络罪犯利⽤的5⼤ChatGPT越狱提⽰

⾃ChatGPT发布的近18个月以来&#xff0c;⽹络罪犯们已经能够利⽤⽣成式AI进⾏攻击。OpenAI在其内容政策中制定了限制措施&#xff0c;以阻⽌⽣成恶意内容。作为回应&#xff0c;攻击者们创建了⾃⼰的⽣成式AI平台&#xff0c;如 WormGPT和FraudGPT&#xff0c;并且他们还分享了…...

大型活动交通拥堵治理的视觉算法应用

大型活动下智慧交通的视觉分析应用 一、背景与挑战 大型活动&#xff08;如演唱会、马拉松赛事、高考中考等&#xff09;期间&#xff0c;城市交通面临瞬时人流车流激增、传统摄像头模糊、交通拥堵识别滞后等问题。以演唱会为例&#xff0c;暖城商圈曾因观众集中离场导致周边…...

java调用dll出现unsatisfiedLinkError以及JNA和JNI的区别

UnsatisfiedLinkError 在对接硬件设备中&#xff0c;我们会遇到使用 java 调用 dll文件 的情况&#xff0c;此时大概率出现UnsatisfiedLinkError链接错误&#xff0c;原因可能有如下几种 类名错误包名错误方法名参数错误使用 JNI 协议调用&#xff0c;结果 dll 未实现 JNI 协…...

【碎碎念】宝可梦 Mesh GO : 基于MESH网络的口袋妖怪 宝可梦GO游戏自组网系统

目录 游戏说明《宝可梦 Mesh GO》 —— 局域宝可梦探索Pokmon GO 类游戏核心理念应用场景Mesh 特性 宝可梦玩法融合设计游戏构想要素1. 地图探索&#xff08;基于物理空间 广播范围&#xff09;2. 野生宝可梦生成与广播3. 对战系统4. 道具与通信5. 延伸玩法 安全性设计 技术选…...

20个超级好用的 CSS 动画库

分享 20 个最佳 CSS 动画库。 它们中的大多数将生成纯 CSS 代码&#xff0c;而不需要任何外部库。 1.Animate.css 一个开箱即用型的跨浏览器动画库&#xff0c;可供你在项目中使用。 2.Magic Animations CSS3 一组简单的动画&#xff0c;可以包含在你的网页或应用项目中。 3.An…...

【Nginx】使用 Nginx+Lua 实现基于 IP 的访问频率限制

使用 NginxLua 实现基于 IP 的访问频率限制 在高并发场景下&#xff0c;限制某个 IP 的访问频率是非常重要的&#xff0c;可以有效防止恶意攻击或错误配置导致的服务宕机。以下是一个详细的实现方案&#xff0c;使用 Nginx 和 Lua 脚本结合 Redis 来实现基于 IP 的访问频率限制…...

【UE5 C++】通过文件对话框获取选择文件的路径

目录 效果 步骤 源码 效果 步骤 1. 在“xxx.Build.cs”中添加需要使用的模块 &#xff0c;这里主要使用“DesktopPlatform”模块 2. 添加后闭UE编辑器&#xff0c;右键点击 .uproject 文件&#xff0c;选择 "Generate Visual Studio project files"&#xff0c;重…...

【深度学习新浪潮】什么是credit assignment problem?

Credit Assignment Problem(信用分配问题) 是机器学习,尤其是强化学习(RL)中的核心挑战之一,指的是如何将最终的奖励或惩罚准确地分配给导致该结果的各个中间动作或决策。在序列决策任务中,智能体执行一系列动作后获得一个最终奖励,但每个动作对最终结果的贡献程度往往…...

xmind转换为markdown

文章目录 解锁思维导图新姿势&#xff1a;将XMind转为结构化Markdown 一、认识Xmind结构二、核心转换流程详解1.解压XMind文件&#xff08;ZIP处理&#xff09;2.解析JSON数据结构3&#xff1a;递归转换树形结构4&#xff1a;Markdown层级生成逻辑 三、完整代码 解锁思维导图新…...

图解JavaScript原型:原型链及其分析 | JavaScript图解

​​ 忽略该图的细节&#xff08;如内存地址值没有用二进制&#xff09; 以下是对该图进一步的理解和总结 1. JS 对象概念的辨析 对象是什么&#xff1a;保存在堆中一块区域&#xff0c;同时在栈中有一块区域保存其在堆中的地址&#xff08;也就是我们通常说的该变量指向谁&…...

2025-05-08-deepseek本地化部署

title: 2025-05-08-deepseek 本地化部署 tags: 深度学习 程序开发 2025-05-08-deepseek 本地化部署 参考博客 本地部署 DeepSeek&#xff1a;小白也能轻松搞定&#xff01; 如何给本地部署的 DeepSeek 投喂数据&#xff0c;让他更懂你 [实验目的]&#xff1a;理解系统架构与原…...