当前位置: 首页 > news >正文

Spark join数据倾斜调优

Spark中常见的两种数据倾斜现象如下

  • stage部分task执行特别慢

在这里插入图片描述

一般情况下是某个task处理的数据量远大于其他task处理的数据量,当然也不排除是程序代码没有冗余,异常数据导致程序运行异常。

  • 作业重试多次某几个task总会失败
    在这里插入图片描述

常见的退出码143、53、137、52以及heartbeat timed out异常,通常可认为是executor内存被打满。

RDD调优方法

  1. 查看数据分布
    Spark Core中shuffle算子出现数据倾斜时,可在Spark作业中加入查看key分布的代码,也可以将代码拆解出来使用spark-shell做测试
val rdd = sc.parallelize(Array("hello", "hello", "hello", "hi")).map((_,1))// 数据量较少
rdd.reduceByKey(_ + _)
.sortBy(_._2, false)
.take(20)
// 数据量较大, 用sample采样后在统计
rdd.sample(false, 0.1)
.reduceByKey(_+_)
.sortBy(_._2, false)
.take(20)
  1. 调整shuffle并行度
    原理:Spark在做shuffle时,默认使用HashPartitioner(非Hash Shuffle)对数据进行分区。如果并行度设置的不合适如比较小,可能造成大量不相同的key对应的数据被分配到了同一个task上,造成该task所处理的数据远大于其它task,从而造成数据倾斜
    在这里插入图片描述

调优建议:

  • 使用spark.default.parallelism调整分区数,默认值200建议500或更大
  • 在shuffle的算子上直接设置分区数,如:a.join(b, 500)、rdd.reduceByKey(_ + _, 500)
  1. reduce join转map join
    原理:不使用join算子直接进行连接操作,而使用broadcast变量与map类算子实现join操作,进而完全规避掉shuffle类的操作,彻底避免数据倾斜的出现
    在这里插入图片描述

调优建议:

  • broadcast的数据量不要超过500M, 过大driver/executor可能会oom
// 1.broadcast小表
val rdd1Broadcast = sc.broadcast(rdd1.collect())
// 2.map join
rdd2.map { x =>val rdd1DataMap = rdd1Broadcast.value.toMaprdd1DataMap.get(x._1) match {case Some(v) => (x._1, (x._2, v))case None => (x._1, (x._2, null))}
}
// 2.或者直接
rdd2.join(rdd1Broadcast)
  1. 分拆join在union
    原理:将有数据倾斜的RDD1中倾斜key对应的数据集单独抽取出来加盐(随机前缀),另外一个RDD2每条数据分别与所有的随机前缀结合形成新的RDD(相当于将其数据增到到原来的N倍,N即为随机前缀的总个数),然后将二者join之后去掉前缀;然后将不包含倾斜key的剩余数据进行join;最后将两次join的结果集通过union合并,即可得到全部join结果。
    在这里插入图片描述

调优建议:

// 1.统计数量最大的key
val skewedKeySet = rdd1.sample(false, 0.2).reduceByKey(_ + _).sortBy(_._2, false).take(10).map(x => x._1).toSet// 2.拆分异常的rdd, 倾斜key加上随机数
val rdd1_1 = rdd1.filter(x => skewedKeySet.contains(x._1)).map { x =>val prefix = scala.util.Random.nextInt(10).toString(s"${prefix}_${x._1}", x._2)
}
val rdd1_2 = rdd1.filter(x => !skewedKeySet.contains(x._1))// 3.正常rdd存在倾斜key的部分进行膨胀
val rdd2_1 = rdd2.filter(x => skewedKeySet.contains(x._1)).flatMap { x =>val list = 0 until 10list.map(i => (s"${i}_${x._1}", x._2))}val rdd2_2 = rdd2.filter(x => !skewedKeySet.contains(x._1))// 4.倾斜key的rdd进行join
val skewedRDD = rdd1_1.join(rdd2_1).map(x => (x._1.split("_")(1), x._2))
// 5.普通key的rdd进行join
val sampleRDD = rdd1_2.join(rdd2_2)
// 6.结果union
skewedRDD.union(sampleRDD)

SQL调优方法

  1. 查看数据分布
    统计某个查询结果或表中出现次数超过200次的key
WITH a AS (${query})
SELECT k,s
FROM (SELECT ${key} AS k,count(*) AS sFROM aGROUP BY ${key}
)
WHERE s > 200
  1. 自动调整shuffle并行度
    原理:自适应执行开启的前提下(AQE),假设我们设置的shuffle partition个数为5,在map stage结束之后,我们知道每一个partition的大小分别是70MB,30MB,20MB,10MB和50MB。假设我们设置每一个reducer处理的目标数据量是64MB,那么在运行时,我们可以实际使用3个reducer。第一个reducer处理partition 0 (70MB),第二个reducer处理连续的partition 1 到3,共60MB,第三个reducer处理partition 4 (50MB)
    在这里插入图片描述

Spark参数:

参数说明推荐值
spark.sql.adaptive.enabled开启自适应执行线上默认值true
spark.sql.adaptive.coalescePartitions.minPartitionNum自适应执行中使用的最小shuffle后分区数,默认值executor*core数
spark.sql.adaptive.coalescePartitions.initialPartitionNum合并前的初始shuffle分区数量,默认值spark.sql.shuffle.partitions
spark.sql.adaptive.advisoryPartitionSizeInBytes合并小分区到建议的目标值, 默认256m
spark.sql.shuffle.partitionsjoin等操作分区数,默认值200推荐500或更大
  1. 自动优化Join
    原理:自适应执行开启的前提下(AQE),我们可以获得SortMergeJoin两个子stage的数据量,在满足条件的情况下,即一张表小于broadcast阈值,可以将SortMergeJoin转化成BroadcastHashJoin
    在这里插入图片描述
参数说明推荐值
spark.sql.adaptive.enabled开启自适应执行线上默认值true
spark.sql.autoBroadcastJoinThreshold默认10M,设置为-1可以禁用广播;实际根据hive表存储的统计信息或文件预估大小与此值做判断看是否做broadcast,由于文件是压缩格式一般情况下此参数并不可靠建议膨胀系数spark.sql.sources.fileCompressionFactor=10推荐此参数保持默认,调整自适应的broadcast参数
spark.sql.adaptive.autoBroadcastJoinThreshold此参数仅影响自适应执行阶段join优化时broadcast阈值;设置为-1可以禁用广播;默认值spark.sql.autoBroadcastJoinThreshold自适应执行得到的数据比较准确,driver内存足够的前提下可以将此值调大如200M
  1. 自动处理数据倾斜
    原理:自适应执行开启的前提下(AQE),我们可以在运行时很容易地检测出有数据倾斜的partition。当执行某个stage时,我们收集该stage每个mapper 的shuffle数据大小和记录条数。如果某一个partition的数据量或者记录条数超过中位数的N倍,并且大于某个预先配置的阈值,我们就认为这是一个数据倾斜的partition,需要进行特殊的处理
    在这里插入图片描述
参数说明推荐值
spark.sql.adaptive.enabled开启自适应执行线上默认值true
spark.sql.adaptive.skewJoin.enabled开启自动解决数据倾斜,默认值true
spark.sql.adaptive.skewJoin.skewedPartitionFactor影响因子,某分区数据大小超过所有分区中位数与影响因子乘积,才会被认为发生了数据倾斜
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes视为倾斜分区的分区数据最小值

相关文章:

Spark join数据倾斜调优

Spark中常见的两种数据倾斜现象如下 stage部分task执行特别慢 一般情况下是某个task处理的数据量远大于其他task处理的数据量,当然也不排除是程序代码没有冗余,异常数据导致程序运行异常。 作业重试多次某几个task总会失败 常见的退出码143、53、137…...

YOLOv5初学者问题——用自己的模型预测图片不画框

如题,我在用自己的数据集训练权重模型的时候,在训练完成输出的yolov5-v5.0\runs\train\exp2目录下可以看到,在训练测试的时候是有输出描框的。 但是当我引用训练好的best.fangpt去进行预测的时候, 程序输出的图片并没有描框。根据…...

【linux学习---1】点亮一个LED---驱动一个GPIO

文章目录 1、原理图找对应引脚2、IO复用3、IO配置4、GPIO配置5、GPIO时钟使能6、总结 1、原理图找对应引脚 从上图 可以看出, 蜂鸣器 接到了 BEEP 上, BEEP 就是 GPIO5_IO05 2、IO复用 查找IMX6UL参考手册 和 STM32一样,如果某个 IO 要作为…...

Redis分布式锁代码实现详解

引言 在分布式系统中,资源竞争和数据一致性问题常常需要通过锁机制来解决。Redis作为一个高性能的键值存储系统,因其提供的原子操作、丰富的数据结构以及网络延迟低等特点,成为了实现分布式锁的理想选择。本文将详细介绍如何使用Redis来实现…...

Day01-02-gitlab

Day01-02-gitlab 1. 什么是gitlab2. Gitlab vs Github/Gitee3. Gitlab 应用场景4. 架构5. Gitlab 快速上手指南5.0 安装要求5.1 安装Gitlab组件5.3 配置访问url5.6 初始化5.8 登录与查看5.9 汉化5.10 设置密码5.11 目录结构5.12 删除5.13 500 vs 5025.14 重置密码 6. Gitlab用户…...

PyCharm远程开发配置(2024以下版本)

目录 PyCharm远程开发配置 1、清理远程环境 1.1 点击Setting 1.2 进入Interpreter 1.3 删除远程环境 1.4 删除SSH 2、连接远程环境 2.1 点击Close Project 2.2 点击New Project 2.3 项目路径设置 2.4 SSH配置 2.5 选择python3解释器在远程环境的位置 2.6 配置远程…...

解决Ucharts在小程序上的层级过高问题

<qiun-wx-ucharts canvas2d"{{true}}" type"pie" opts"{{rectificationRateOpts}}" chartData"{{rectificationRateData}}" /> 开启2d渲染即可解决&#xff08;在小程序开发工具上看着层级还是高&#xff0c;但是在手机上是正常…...

重保期间的网站安全防护:网站整站锁的应用与实践

标题&#xff1a;重保期间的网站安全防护&#xff1a;网站整站锁的应用与实践 一、引言 在重大活动或事件&#xff08;通常被称为“重保”&#xff09;期间&#xff0c;网站的安全问题尤为突出。由于此时网站的访问量和关注度可能达到高峰&#xff0c;因此也成为了黑客攻击的…...

Qt自定义类型

概述 在使用Qt创建用户界面时&#xff0c;特别是那些具有特殊控件和特性的界面时&#xff0c;开发人员有时需要创建新的数据类型&#xff0c;以便与Qt现有的值类型集一起使用或代替它们。 QSize、QColor和QString等标准类型都可以存储在QVariant对象中&#xff0c;作为基于qo…...

UE4_材质_材质节点_DepthFade

一、DepthFade参数 DepthFade&#xff08;深度消退&#xff09;表达式用来隐藏半透明对象与不透明对象相交时出现的不美观接缝。 项目说明属性消退距离&#xff08;Fade Distance&#xff09;这是应该发生消退的全局空间距离。未连接 FadeDistance&#xff08;FadeDistance&a…...

如何对GD32 MCU进行加密?

GD32 MCU有哪些加密方法呢&#xff1f;大家在平时项目开发的过程中&#xff0c;最后都可能会面临如何对出厂产品的MCU代码进行加密&#xff0c;避免产品流向市场被别人读取复制。 下面为大家介绍GD32 MCU所支持的几种常用的加密方法&#xff1a; 首先GD32 MCU本身支持防硬开盖…...

快速了解GPT-4o和GPT-4区别

GPT-4o简介 在5月14日的OpenAI举行春季发布会上&#xff0c;OpenAI在活动中发布了新旗舰模型“GPT-4o”&#xff01;据OpenAI首席技术官穆里穆拉蒂&#xff08;Muri Murati&#xff09;介绍&#xff0c;GPT-4o在继承GPT-4强大智能的同时&#xff0c;进一步提升了文本、图像及语…...

周末休息日也能及时回应客户消息!微信自动回复神器太就好用啦!

无论是在忙碌时&#xff0c;还是在周末休息日&#xff0c;如果没能及时回应客户&#xff0c;很可能会造成客户流失。 今天&#xff0c;我要为大家介绍一个多微管理神器——个微管理系统&#xff0c;它可以帮助你实现自动回复&#xff0c;提高回复效率。 自动通过好友请求 在…...

力扣404周赛 T1/T2/T3 枚举/动态规划/数组/模拟

博客主页&#xff1a;誓则盟约系列专栏&#xff1a;IT竞赛 专栏关注博主&#xff0c;后期持续更新系列文章如果有错误感谢请大家批评指出&#xff0c;及时修改感谢大家点赞&#x1f44d;收藏⭐评论✍ 3200.三角形的最大高度【简单】 题目&#xff1a; 给你两个整数 red 和 b…...

Taurus 性能测试工具详解

文章目录 简介原理安装编写测试配置运行测试集成其他工具结果分析优点与缺点优点缺点 参考资料总结 简介 Taurus 是一个开源的自动化测试工具&#xff0c;用于简化和增强性能测试流程。与其他性能测试工具不同&#xff0c;Taurus 旨在通过友好的 YAML 配置文件和对多种负载测试…...

天猫商品详情API接口(店铺|标题|主图|价格|SKU属性等)

天猫商品详情API接口为开发者提供了获取天猫商品详细信息的能力&#xff0c;包括店铺信息、商品标题、主图、价格、SKU属性等。以下是该接口的使用过程和相关技术要点&#xff1a; 注册账号并创建应用 注册账号&#xff1a;需要在天猫开放平台注册一个开发者账号。创建应用&a…...

双向广搜——AcWing 190. 字串变换

双向广搜 定义 双向广度优先搜索&#xff08;Bi-directional Breadth-First Search, Bi-BFS&#xff09;是一种在图或树中寻找两点间最短路径的算法。与传统的单向广度优先搜索相比&#xff0c;它从起始点和目标点同时开始搜索&#xff0c;从而有可能显著减少搜索空间&#x…...

工商业光伏项目如何快速开发?

一、前期调研与规划 1、屋顶资源评估&#xff1a;详细测量屋顶面积、承重能力及朝向&#xff0c;利用光伏业务管理软件进行日照分析和发电量预测&#xff0c;确保项目可行性。 2、政策与补贴研究&#xff1a;深入了解当地政府对工商业光伏项目的政策支持和补贴情况&#xff0…...

Kafka入门-分区及压缩

一、生产者消息分区 Kafka的消息组织方式实际上是三级结构&#xff1a;主题-分区-消息。主题下的每条消息只会保存在某一个分区中&#xff0c;而不会在多个分区中被保存多份。 分区的作用就是提供负载均衡的能力&#xff0c;或者说对数据进行分区的主要原因&#xff0c;就是为…...

被⽹络罪犯利⽤的5⼤ChatGPT越狱提⽰

⾃ChatGPT发布的近18个月以来&#xff0c;⽹络罪犯们已经能够利⽤⽣成式AI进⾏攻击。OpenAI在其内容政策中制定了限制措施&#xff0c;以阻⽌⽣成恶意内容。作为回应&#xff0c;攻击者们创建了⾃⼰的⽣成式AI平台&#xff0c;如 WormGPT和FraudGPT&#xff0c;并且他们还分享了…...

AVR晶体管测试仪开源制作与验证

AVR晶体管测试仪开源制作与验证 &#x1f4cd;原项目地址&#xff1a;https://www.mikrocontroller.net/articles/AVR_Transistortester github地址&#xff1a;https://github.com/Mikrocontroller-net/transistortester &#x1f388;EasyEDA项目地址&#xff1a;https://osh…...

头条系统-05-延迟队列精准发布文章-概述添加任务(db和redis实现延迟任务)、取消拉取任务定时刷新(redis管道、分布式锁setNx)...

文章目录 延迟任务精准发布文章 1)文章定时发布2)延迟任务概述 2.1)什么是延迟任务2.2)技术对比 2.2.1)DelayQueue2.2.2)RabbitMQ实现延迟任务2.2.3)redis实现 3)redis实现延迟任务4)延迟任务服务实现 4.1)搭建heima-leadnews-schedule模块4.2)数据库准备4.3)安装redis4.4)项目…...

不同系统间数据交换要通过 api 不能直接数据库访问

很多大数据开发提供数据给外部系统直接给表结构&#xff0c;这是不好的方式。在不同系统间进行数据交换时&#xff0c;通过API&#xff08;应用程序编程接口&#xff09;而非直接访问数据库是现代系统集成的一种最佳实践。 目录 为什么要通过API进行数据交换如何通过API进行数据…...

深度探索“目录名称无效“:原因、解决方案与最佳实践

目录名称无效&#xff1a;现象背后的秘密 在日常使用电脑或移动设备时&#xff0c;我们时常会遇到“目录名称无效”的错误提示&#xff0c;这一提示仿佛是一道无形的屏障&#xff0c;阻断了我们与重要数据的联系。从本质上讲&#xff0c;“目录名称无效”意味着系统无法识别或…...

open3d基础使用-简单易懂

Open3D是一个开源库&#xff0c;主要用于快速开发处理3D数据的软件。它提供了丰富的数据结构和算法&#xff0c;支持点云、网格和RGB-D图像等多种3D数据的处理。以下是对Open3D基础使用的详细归纳和说明&#xff1a; 一、安装Open3D Open3D可以通过Python的包管理器pip进行安…...

【前端】HTML+CSS复习记录【5】

文章目录 前言一、padding、margin、border&#xff08;边框边距&#xff09;二、样式优先级三、var&#xff08;使用 CSS 变量更改多个元素样式&#xff09;四、media quary&#xff08;媒体查询&#xff09;系列文章目录 前言 长时间未使用HTML编程&#xff0c;前端知识感觉…...

三分钟看懂SMD封装与COB封装的差异

全彩LED显示屏领域中&#xff0c;COB封装于SMD封装是比较常见的两种封装方式&#xff0c;SMD封装产品主要有常规小间距以及室内、户外型产品&#xff0c;COB封装产品主要集中在小间距以及微间距系列产品中&#xff0c;今天跟随COB显示屏厂家中品瑞一起快速看懂SMD封装与COB封装…...

深入理解策略梯度算法

策略梯度&#xff08;Policy Gradient&#xff09;算法是强化学习中的一种重要方法&#xff0c;通过优化策略以获得最大回报。本文将详细介绍策略梯度算法的基本原理&#xff0c;推导其数学公式&#xff0c;并提供具体的例子来指导其实现。 策略梯度算法的基本概念 在强化学习…...

Unicode 和 UTF-8 以及它们之间的关系

通俗易懂的 Unicode 和 UTF-8 解释 Unicode 是什么&#xff1f; 想象一下&#xff0c;我们有一个巨大的图书馆&#xff0c;这个图书馆里有各种各样的书&#xff0c;每本书都有一个唯一的编号。Unicode 就像是这个图书馆的目录系统&#xff0c;它给世界上所有的字符&#xff0…...

【C++】多态详解

&#x1f497;个人主页&#x1f497; ⭐个人专栏——C学习⭐ &#x1f4ab;点击关注&#x1f929;一起学习C语言&#x1f4af;&#x1f4ab; 目录 一、多态概念 二、多态的定义及实现 1. 多态的构成条件 2. 虚函数 2.1 什么是虚函数 2.2 虚函数的重写 2.3 虚函数重写的两个…...