当前位置: 首页 > news >正文

Spark 内存运用

RDD Cache

当同一个 RDD 被引用多次时,就可以考虑进行 Cache,从而提升作业的执行效率

// 用 cache 对 wordCounts 加缓存
wordCounts.cache
// cache 后要用 action 才能触发 RDD 内存物化
wordCounts.count// 自定义 Cache 的存储介质、存储形式、副本数量
wordCounts.persist(MEMORY_ONLY)

Spark 的 Cache 机制 :

  • 缓存的存储级别:限定了数据缓存的存储介质,如 : 内存、磁盘
  • 缓存的计算过程:从 RDD 展开到分片 (Block),存储于内存或磁盘的过程
  • 缓存的销毁过程:缓存数据主动或被动删除的内存或磁盘的过程

存储级别

Spark 支持的存储级别:

  • RDD 缓存的默认存储级别:MEMORY_ONLY
  • DataFrame 缓存的默认存储级别:MEMORY_AND_DISK
存储级别存储介质存储形式副本设置
内存磁盘对象值序列化
MEMORY_ONLY1
MEMORY_ONLY_22
MEMORY_ONLY_SER1
MEMORY_ONLY_SER_22
DISK_ONLY1
DISK_ONLY_22
DISK_ONLY_33
MEMORY_AND_DISK1内存以对象值存储,磁盘以序列化
MEMORY_AND_DISK_22
MEMORY_AND_DISK_SER1内存/磁盘都以序列化的字节数组存储
MEMORY_AND_DISK_SER22

计算过程

缓存的计算过程 :

  • MEMORY_AND_DISK :先把数据集全部缓存到内存,内存不足时,才把剩余的数据落磁盘
  • MEMORY_ONLY :只把数据往内存里塞

内存中的存储过程 :

在这里插入图片描述

销毁过程

缓存的销毁过程 :

  • 缓存抢占 Execution Memory 空间,会进行缓存释放

Spark 清除缓存的原则:

  • LRU:按元素的访问顺序,优先清除那些最近最少访问的 BlockId、MemoryEntry 键值对
  • 在清除时,同属一个 RDD 的 MemoryEntry 不会清除

Spark 实现 LRU 的数据结构:LinkedHashMap , 内部有两个数据结构

  • HashMap : 用于快速访问,根据指定的 BlockId,O(1) 返回 MemoryEntry
  • 双向链表 : 用于维护元素(BlockId 和 MemoryEntry 键值对)的访问顺序

Spark 会释放 LRU 的 MemoryEntry :

在这里插入图片描述

Cache 注意点

用 Cache 的基本原则 :

  • RDD/DataFrame/Dataset 的引用数为 1,坚决不用 Cache
  • 当引用数大于 1,且运行成本超过 30%,就考虑用 Cache

运行成本占比 : 计算某个分布式数据集要消耗的总时间与作业执行时间的比值

  • 端到端的执行时间为 1 小时
  • DataFrame 被引用了 2 次
  • 从读取数据源到生成该 DataFrame 花了 12 分钟
  • 该 DataFrame 的运行成本占比 = 12*2/60 = 40%

用 noop 计算 DataFrame 运行时间 :

df.write.format("noop").save()

.cache 是惰性操作,在调用 .cache后,要先用 count 才能触发缓存的完全物化

Cache 要遵循最小公共子集原则 :

val filePath: String = _
val df: DataFrame = spark.read.parquet(filePath)//Cache方式一
val cachedDF = df.cache
//数据分析
cachedDF.filter(col2 > 0).select(col1, col2)
cachedDF.select(col1, col2).filter(col2 > 100)

两个查询的 Analyzed Logical Plan 不一致,无法缓存复用

//Cache方式二
df.select(col1, col2).filter(col2 > 0).cache
//数据分析
df.filter(col2 > 0).select(col1, col2)
df.select(col1, col2).filter(col2 > 100)

Analyzed Logical Plan 完全一致,能缓存复用

//Cache方式三
val cachedDF = df.select(col1, col2).cache
//数据分析
cachedDF.filter(col2 > 0).select(col1, col2)
cachedDF.select(col1, col2).filter(col2 > 100)

及时清理 Cache :

  • 异步模式 (常用):调用 unpersist() 或 unpersist(False)
  • 同步模式:调用 unpersist(True)

OOM

OOM的具体区域 :

  • 发生 OOM 的 LOC(Line Of Code),代码位置
  • OOM 发生在 Driver 端,还是在 Executor 端
  • 在 Executor 端的哪片内存区域

Driver OOM

Driver 端的 OOM 位置 :

  • 创建小规模的分布式数据集:使用 parallelize、createDataFrame 创建数据集
  • 收集计算结果:通过 take、show、collect 把结果收集到 Driver 端

Driver 端的 OOM 原因:

  • 创建的数据集超过内存上限
  • 收集的结果集超过内存上限

广播变量的创建与分发 :

在这里插入图片描述

广播变量的数据拉取就是用 collect 。当数据总大小超过 Driver 端内存时 , 就报 OOM :

java.lang.OutOfMemoryError: Not enough memory to build and broadcast

对结果集尺寸预估,适当增加 Driver 内存配置

  • Driver 内存大小 : spark.driver.memory

查看执行计划 :

val df: DataFrame = _
df.cache.countval plan = df.queryExecution.logical
val estimated: BigInt = spark.sessionState.executePlan(plan).optimizedPlan.stats.sizeInBytes

Executor OOM

当 Executors OOM 时,定位 Execution Memory 和 User Memory

User Memory OOM

User Memory 用于存储用户自定义的数据结构,如: 数组、列表、字典

当自定义数据结构的总大小超出 User Memory 上限时,就会报错

java.lang.OutOfMemoryError: Java heap space at java.util.Arrays.copyOf
java.lang.OutOfMemoryError: Java heap space at java.lang.reflect.Array.newInstance

计算 User Memory 消耗时,要考虑 Executor 的线程池大小

  • 当 dict 大小为 #size , Executor 线程池大小为 #threads
  • dict 对 User Memory 的总消耗:#size * #threads
  • 当总消耗超出 User Memory 上限,就会 OOM
val dict = List("spark", "tune")
val words = spark.sparkContext.textFile("~/words.csv")val keywords = words.filter(word => dict.contains(word))
keywords.map((_, 1)).reduceByKey(_ + _).collect

自定义数据分发 :

  • 自定义的列表 dict 会随着 Task 分发到所有 Executors
  • 多个 Task 的 dict 会对 User Memory 产生重复消耗

在这里插入图片描述

解决 User Memory OOM 的思路 :

  • 先对数据结构的消耗进行预估
  • 相应地扩大 User Memory

UserMemory 总大小 = spark.executor.memory * (1 - spark.memory.fraction)

Execution Memory OOM

Execution Memory OOM 常见实例:数据倾斜和 数据膨胀

配置说明:

  • 2 个 CPU core,每个 core 有两个线程,内存大小为 1GB
  • spark.executor.cores = 3,spark.executor.memory = 900MB
  • Execution Memory = 180MB
  • Storage Memory = 180MB
  • Execution Memory 上限 = 360MB

数据倾斜

3 个 Reduce Task 对应的数据分片大小分别是 100MB , 100MB , 300MB

  • 当 Executor 线程池大小为 3,所以每个 Reduce Task 最多 360MB * 1/3 = 120MB

数据倾斜导致OOM :

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-NHRj4w40-1677761995168)(…/…/png/%E5%86%85%E5%AD%98%E8%BF%90%E7%94%A8/image-20230209210919876.png)]

2 种调优思路:

  • 消除数据倾斜,让所有的数据分片尺寸都小于 100MB
  • 调整 Executor 线程池、内存、并行度等相关配置,提高 1/N 上限到 300MB

在 CPU 利用率高下解决 OOM :

  • 维持并发度、并行度不变,增大执行内存设置,提高 1/N 上限到 300MB
  • 维持并发度、执行内存不变,提升并行度把数据打散,将所有的数据分片尺寸都缩小到 100MB 内

数据膨胀

3 个 Map Task 对应的数据分片大小都是 100MB

数据膨胀导致OOM :

在这里插入图片描述

2 种调优思路:

  • 把数据打散,提高数据分片数量、降低数据粒度,让膨胀后的数据量降到 100MB 左右
  • 加大内存配置,结合 Executor 线程池调整,提高 1/N 上限到 300MB

相关文章:

Spark 内存运用

RDD Cache 当同一个 RDD 被引用多次时,就可以考虑进行 Cache,从而提升作业的执行效率 // 用 cache 对 wordCounts 加缓存 wordCounts.cache // cache 后要用 action 才能触发 RDD 内存物化 wordCounts.count// 自定义 Cache 的存储介质、存储形式、副本…...

SpringBoot集成Swagger3.0(入门) 02

文章目录Swagger3常用配置注解接口测试API信息配置Swagger3 Docket开关,过滤,分组Swagger3常用配置注解 ApiImplicitParams,ApiImplicitParam:Swagger3对参数的描述。 参数名参数值name参数名value参数的具体意义,作用。required参…...

网络协议丨ICMP协议

ICMP协议,全称 Internet Control Message Protocol,就是互联网控制报文协议。我们其实对它并不陌生,我们平时经常使用的”ping“一下就是基于这个协议工作的。网络包在异常复杂的网络环境中传输时,常常会遇到各种各样的问题。当遇…...

12.1 基于Django的服务器信息查看应用(系统信息、用户信息)

文章目录新建Django项目创建子应用并设置本地化创建数据库表创建超级用户git管理项目(requirements.txt、README.md、.ignore)主机信息监控应用的框架搭建具体功能实现系统信息展示前端界面设计视图函数设计用户信息展示视图函数设计自定义过滤器的实现前…...

ExSwin-Unet 论文研读

ExSwin-Unet摘要1 引言2 方法2.1 基于窗口的注意力块2.2 外部注意力块2.3 不平衡的 Unet 架构2.4 自适应加权调整2.5 双重损失函数3 实验结果3.1 数据集3.2 实现细节3.3 与 SOTA 方法的比较3.4 消融研究4 讨论和限制5 结论数据集来源: https://feta.grand-challenge…...

置顶!!!主页禁言提示原因:在自己论坛发动态误带敏感词,在自己论坛禁止评论90天

置顶!!!主页禁言提示原因:在自己论坛发动态误带敏感词,在自己论坛禁止评论90天 置顶!!!主页禁言提示原因:在自己论坛发动态误带敏感词,在自己论坛禁止评论90天…...

优思学院|解密六西格玛:探索DMAIC和DMADV之间的区别

六西格玛方法中最为广泛使用的两种方法是DMAIC和DMADV。这两种方法都是为了让企业流程更加高效和有效而设计的。虽然这两种方法有一些重要的共同特点,但它们并不可以互相替代,并且被开发用于不同的企业流程。在更详细地比较这两种方法之前,我…...

Pytorch的DataLoader输入输出(以文本为例)

本文不做太多原理介绍,直讲使用流畅。想看更多底层实现-〉传送门。DataLoader简介torch.utils.data.DataLoader是PyTorch中数据读取的一个重要接口,该接口定义在dataloader.py脚本中,只要是用PyTorch来训练模型基本都会用到该接口。本文介绍t…...

代谢组学:Microbiome又一篇!绘制重症先天性心脏病新生儿肠道微生态全景图谱

文章标题:Mapping the early life gut microbiome in neonates with critical congenital heart disease: multiomics insights and implications for host metabolic and immunological health 发表期刊:Microbiome 影响因子:16.837…...

Java基本类型所占字节简述

类型分类所占字节取值范围boolean布尔型1bit0 false、 1 true (1个bit 、1个字节、4个字节)char ​字符型(Unicode字符集中的一个元素)​ 2字节-32768~32767(-2的15次方~2的15次方-1)byte整型1字节-128&a…...

Linux vi常用操作

vi/vim 共分为三种模式,分别是命令模式(Command mode),输入模式(Insert mode)和底线命令模式(Last line mode)。 这三种模式的作用分别是: 命令模式: 用户刚…...

Unicode(宽字节)、ANSI(多字节)

1、什么时候用Unicode(宽字节),什么时候用ANSI(多字节)? 在linux/windows等操作系统中使用的,一般都是Unicode(宽字节)。 下位机PLC/单片机等硬件设备中使用,一般都是ANSI(多字节)。 所以,通讯中(比如VS项目&#x…...

STM32实战之LED循环点亮

接着上一章讲。本章我们来讲一讲LED流水灯,循环点亮LED。 在LED章节有的可能没有讲到,本章会对其进行说明,尽量每个函数说一下作用。也会在最后说一下STM32的寄存器,在编程中寄存器是避免不了的东西,寄存器也是非常好理…...

智慧厕所智能卫生间系统有哪些功能

南宁北站智能厕所主要功能有哪些?1、卫生间环境空气监测男厕、女厕环境空气监测系统包括对厕所内的温度、湿度、氨气、硫化氢、PM2.5、烟雾等气体数据的实时监测。2、卫生间厕位状态监测系统实时监测厕位内目前的使用状态(有人或无人),数据信…...

【网络】套接字 -- TCP

🥁作者: 华丞臧. 📕​​​​专栏:【网络】 各位读者老爷如果觉得博主写的不错,请诸位多多支持(点赞收藏关注)。如果有错误的地方,欢迎在评论区指出。 推荐一款刷题网站 👉 LeetCode刷题网站 文章…...

NDK C++ map容器

map容器// TODO map容器 #include <iostream> #include <map>using namespace std;int main() {// TODO map<int, string>按key值排序&#xff0c;同一个key不可以重复插入map<int, string> map1;map1.insert(pair<int, string>(1, "111&qu…...

linux(Centos)安装docker

官网地址&#xff1a;Install Docker Engine on CentOS 首先检查linux系统版本及内核&#xff1a; 安装docker要求系统版本至少为7.x版本&#xff0c;内核至少为3.8以上 cat /etc/redhat-release # 查看系统版本号uname -r #查看linux系统内核 检查系统是否能连上外网&#…...

Delphi 中 FireDAC 数据库连接(处理错误)

参见&#xff1a;Delphi 中 FireDAC 数据库连接&#xff08;总览&#xff09;本主题描述了如何用FireDAC处理数据库错误。一、概述EFDDBEngineException类是所有DBMS异常的基类。单个异常对象是一个数据库错误的集合&#xff0c;可以通过EFDDBEngineException.Errors[]属性访问…...

算法小抄3-理解使用Python容器之列表

引言 首先说一个概念哈,程序算法数据结构,算法是条件语句与循环语句组成的逻辑结构,而数据结构也就是容器. 算法决定数据该如何处理,而容器则决定如何数据如何存储. 不同的语言对容器有不同的实现方式, 但他们的功能都是相似的, 打好容器基础,你就可以在各式各样的语言中来回横…...

Vue3中watch的value问题

目录前言一&#xff0c;ref和reactive的简单复习1.ref函数1.2 reactive函数1.3 用ref定义对象类型数据不用reactive二&#xff0c;watch的value问题2.1 ref2.1.1 普通类型数据2.1.2 对象类型数据2.1.3 另一种方式2.2 reactive三&#xff0c;总结后记前言 在Vue3中&#xff0c;…...

pam_env.so模块配置解析

在PAM&#xff08;Pluggable Authentication Modules&#xff09;配置中&#xff0c; /etc/pam.d/su 文件相关配置含义如下&#xff1a; 配置解析 auth required pam_env.so1. 字段分解 字段值说明模块类型auth认证类模块&#xff0c;负责验证用户身份&am…...

蓝桥杯 2024 15届国赛 A组 儿童节快乐

P10576 [蓝桥杯 2024 国 A] 儿童节快乐 题目描述 五彩斑斓的气球在蓝天下悠然飘荡&#xff0c;轻快的音乐在耳边持续回荡&#xff0c;小朋友们手牵着手一同畅快欢笑。在这样一片安乐祥和的氛围下&#xff0c;六一来了。 今天是六一儿童节&#xff0c;小蓝老师为了让大家在节…...

Python爬虫(二):爬虫完整流程

爬虫完整流程详解&#xff08;7大核心步骤实战技巧&#xff09; 一、爬虫完整工作流程 以下是爬虫开发的完整流程&#xff0c;我将结合具体技术点和实战经验展开说明&#xff1a; 1. 目标分析与前期准备 网站技术分析&#xff1a; 使用浏览器开发者工具&#xff08;F12&…...

Maven 概述、安装、配置、仓库、私服详解

目录 1、Maven 概述 1.1 Maven 的定义 1.2 Maven 解决的问题 1.3 Maven 的核心特性与优势 2、Maven 安装 2.1 下载 Maven 2.2 安装配置 Maven 2.3 测试安装 2.4 修改 Maven 本地仓库的默认路径 3、Maven 配置 3.1 配置本地仓库 3.2 配置 JDK 3.3 IDEA 配置本地 Ma…...

有限自动机到正规文法转换器v1.0

1 项目简介 这是一个功能强大的有限自动机&#xff08;Finite Automaton, FA&#xff09;到正规文法&#xff08;Regular Grammar&#xff09;转换器&#xff0c;它配备了一个直观且完整的图形用户界面&#xff0c;使用户能够轻松地进行操作和观察。该程序基于编译原理中的经典…...

LeetCode - 199. 二叉树的右视图

题目 199. 二叉树的右视图 - 力扣&#xff08;LeetCode&#xff09; 思路 右视图是指从树的右侧看&#xff0c;对于每一层&#xff0c;只能看到该层最右边的节点。实现思路是&#xff1a; 使用深度优先搜索(DFS)按照"根-右-左"的顺序遍历树记录每个节点的深度对于…...

在Mathematica中实现Newton-Raphson迭代的收敛时间算法(一般三次多项式)

考察一般的三次多项式&#xff0c;以r为参数&#xff1a; p[z_, r_] : z^3 (r - 1) z - r; roots[r_] : z /. Solve[p[z, r] 0, z]&#xff1b; 此多项式的根为&#xff1a; 尽管看起来这个多项式是特殊的&#xff0c;其实一般的三次多项式都是可以通过线性变换化为这个形式…...

Caliper 配置文件解析:fisco-bcos.json

config.yaml 文件 config.yaml 是 Caliper 的主配置文件,通常包含以下内容: test:name: fisco-bcos-test # 测试名称description: Performance test of FISCO-BCOS # 测试描述workers:type: local # 工作进程类型number: 5 # 工作进程数量monitor:type: - docker- pro…...

若依登录用户名和密码加密

/*** 获取公钥&#xff1a;前端用来密码加密* return*/GetMapping("/getPublicKey")public RSAUtil.RSAKeyPair getPublicKey() {return RSAUtil.rsaKeyPair();}新建RSAUti.Java package com.ruoyi.common.utils;import org.apache.commons.codec.binary.Base64; im…...

土建施工员考试:建筑施工技术重点知识有哪些?

《管理实务》是土建施工员考试中侧重实操应用与管理能力的科目&#xff0c;核心考查施工组织、质量安全、进度成本等现场管理要点。以下是结合考试大纲与高频考点整理的重点内容&#xff0c;附学习方向和应试技巧&#xff1a; 一、施工组织与进度管理 核心目标&#xff1a; 规…...