当前位置: 首页 > news >正文

Spark核心组件解析:Executor、RDD与缓存优化

Spark核心组件解析:Executor、RDD与缓存优化

Spark Executor

Executor 是 Spark 中用于执行任务(task)的执行单元,运行在 worker 上,但并不等同于 worker。实际上,Executor 是一组计算资源(如 CPU 核心和内存)的集合,多个 executor 共享 worker 上的 CPU 和内存资源。

Executor 的功能

  • 任务执行:Executor 负责执行分配给它的任务,并返回结果到 driver 程序。

  • 缓存机制:如果应用程序调用了 cache() 或 persist() 函数,Executor 会通过 Block Manager 为RDD 提供缓存机制,优化重复计算。

  • 生命周期:Executor 存在于整个 Spark 应用的生命周期内。

Executor 的创建

Spark 在以下几种情况下创建 Executor:

  • 当资源管理器为 Standalone 或 YARN,且 CoarseGrainedExecutorBackend 进程接收到
    RegisteredExecutor 消息时;
  • 当使用 Mesos 资源管理器时,MesosExecutorBackend 进程注册时;
  • 在本地模式下,当 LocalEndpoint 被创建时。

创建成功后,日志会显示如下信息:

INFO Executor: Starting executor ID [executorId] on host [executorHostname]

心跳发送线程

Executor 会定期向 driver 发送心跳信号以确保连接活跃。心跳线程通常是一个调度线程池,利用 ScheduledThreadPoolExecutor 来维持任务的实时性。

执行任务

Executor 通过 launchTask 方法来执行任务。这个方法会创建一个 TaskRunner 线程,并在 Executor Task Launch Worker 线程池中执行任务。

private val threadPool = ThreadUtils.newDaemonCachedThreadPool("Executor task launch worker")

Spark RDD (Resilient Distributed Dataset)

RDD 是 Spark 的基础数据结构,表示一个不可变的分布式数据集。RDD 在集群中的各个节点上并行计算,并且具有弹性(容错性)和分布式的特性。

RDD 的特性

  • 弹性:RDD 是容错的,丢失的数据可以通过其父 RDD 重新计算。
  • 分布式:RDD 的数据分布在集群的不同节点上,支持分布式计算。
  • 不可修改:RDD 一旦创建,其数据不可修改,这也保证了数据的一致性。
  • 分区:RDD 会被划分为多个分区,以便并行处理。

RDD 的创建方式

(1)并行化:可以通过 SparkContext.parallelize() 方法从一个数据集合创建 RDD。
(2)从外部存储:可以通过 SparkContext.textFile() 等方法从外部存储系统(如 HDFS)加载数据创建 RDD。
(3)从其他 RDD:通过 Spark 的 Transformation 操作从已有的 RDD 创建新的 RDD。

RDD 操作

RDD 支持两种类型的操作:

  • Transformation 操作(转换):如 map()、filter(),返回新的 RDD。
  • Action 操作(行动):如 count()、collect(),触发实际计算并返回结果。

RDD 的容错性

RDD 提供容错能力。当某个节点失败时,可以根据其父 RDD 的计算逻辑恢复丢失的数据。这是通过 DAG(有向无环图)和父 RDD 关系来实现的。

RDD 的持久化

RDD 可以使用 cache() 或 persist() 进行持久化存储,缓存的 RDD 会存储在内存中,若内存不足则溢写到磁盘,避免重复计算。

RDD 的局限性

缺少内置优化引擎:RDD 无法像 DataFrame 和 Dataset 一样利用 Spark 的 Catalyst 优化器进行自动优化。

性能问题:随着数据量增大,RDD 计算的性能可能下降,尤其是与 JVM 垃圾回收和序列化相关的开销。

存储问题:当内存不足时,RDD 会将数据溢写到磁盘,这会导致计算性能大幅下降。

创建 RDD 的例子

  1. 并行化创建 RDD:
val data = sc.parallelize(Seq(1, 2, 3, 4, 5))
val result = data.map(_ * 2)
result.collect()  // 返回 [2, 4, 6, 8, 10]
  1. 从外部存储创建 RDD:
val rdd = sc.textFile("hdfs://path/to/file")
  1. 从其他 RDD 创建 RDD:
val newRdd = oldRdd.filter(_ > 10)

Spark RDD 缓存机制

Spark RDD 缓存是一种优化技术,用于将中间计算结果存储在内存中,以便在后续操作中复用,从而减少重复计算,提高性能。RDD 缓存可以显著加速一些需要迭代计算的应用,特别是在机器学习和图计算等场景中。

持久化 RDD

持久化操作会将 RDD 的计算结果存储到内存中。这样,每次对 RDD 进行操作时,Spark 会直接使用内存中的数据,而不必重新计算。通过持久化,可以避免重复计算从而提高效率。

  • cache():cache() 是 persist() 的简化方法,默认将 RDD 数据存储在内存中,使用 MEMORY_ONLY
    存储级别。
  • persist():可以通过 persist() 方法选择不同的存储级别,例如 MEMORY_ONLY、DISK_ONLY 等。
  • unpersist():用于移除已缓存的数据,释放内存。

RDD 持久化存储级别

Spark 提供了多种存储级别,每种级别的存储方式不同,根据具体的需求选择合适的存储级别。

存储级别使用空间CPU时间是否在内存中是否在磁盘上备注
MEMORY_ONLY默认级别,数据未序列化,全部存储在内存中
MEMORY_ONLY_2数据存储 2 份
MEMORY_ONLY_SER数据序列化存储,占用更少内存
MEMORY_AND_DISK中等部分部分内存不够时,数据溢写到磁盘
MEMORY_AND_DISK_SER部分部分数据序列化,内存不够时溢写到磁盘
DISK_ONLY数据仅存储在磁盘中
OFF_HEAP----存储在堆外内存,目前为试验性选项

副本机制

带有 _2 后缀的存储级别表示在每个节点上缓存数据的副本。副本机制是为了提高容错性。如果某个节点的数据丢失,Spark 可以从其他节点的副本中恢复数据,而不必重新计算。

缓存策略的选择

  • MEMORY_ONLY:适用于内存足够大的场景,避免序列化和磁盘 I/O开销。性能较高,但如果内存不足可能会导致计算失败。
  • MEMORY_ONLY_SER:适用于内存较为紧张的场景,将数据进行序列化后保存在内存中,减少内存占用,但会增加序列化的开销。
  • MEMORY_AND_DISK:适用于内存不足的场景,数据无法完全存储在内存时会溢写到磁盘,确保数据不会丢失。
  • DISK_ONLY:适用于数据量极大的情况,全部数据存储在磁盘中,性能较低,但可以处理大规模数据。

如何使用 Spark RDD 缓存

缓存 RDD:

val rdd = sc.textFile("data.txt")
rdd.cache()  // 使用默认的 MEMORY_ONLY 存储级别

选择存储级别:

rdd.persist(StorageLevel.MEMORY_AND_DISK)  // 选择 MEMORY_AND_DISK 存储级别

清除缓存

rdd.unpersist()  // 移除缓存

Spark 键值对 RDD

Spark 通过 PairRDD 处理键值对类型的数据,提供了多种用于处理键值对数据的转换操作。

  1. 如何创建键值对 RDD
    通过 map 操作将普通 RDD 转换为键值对 RDD。
    Scala 示例:
val lines = sc.textFile("data.txt")
val pairs = lines.map(s => (s, 1))

Python 示例:

lines = sc.textFile("data.txt")
pairs = lines.map(lambda s: (s, 1))
  1. 常见的键值对操作
    reduceByKey(func):对具有相同键的值进行规约操作。
val pairs = sc.parallelize(List((1,2),(3,4),(3,6)))
val result = pairs.reduceByKey((a, b) => a + b)
println(result.collect().mkString(","))

groupByKey():对具有相同键的值进行分组。

val result = pairs.groupByKey()
println(result.collect().mkString(","))

mapValues():对值进行转换操作,但不改变键。

val result = pairs.mapValues(x => x + 1)
println(result.collect().mkString(","))

sortByKey():按键排序。

val sorted = pairs.sortByKey()
println(sorted.collect().mkString(","))
  1. 对两个 RDD 的操作
    join():连接两个 RDD,返回键相同的数据。
val other = sc.parallelize(List((3, 9)))
val joined = pairs.join(other)
println(joined.collect().mkString(","))

leftOuterJoin() 和 rightOuterJoin():左外连接和右外连接,分别确保第一个和第二个 RDD 中的键存在。

val leftJoined = pairs.leftOuterJoin(other)
val rightJoined = pairs.rightOuterJoin(other)
println(leftJoined.collect().mkString(","))
println(rightJoined.collect().mkString(","))

通过合理使用 Spark 的缓存和键值对 RDD 操作,可以显著提升大数据计算的效率,尤其是在迭代计算和需要频繁访问中间数据的场景下。

相关文章:

Spark核心组件解析:Executor、RDD与缓存优化

Spark核心组件解析:Executor、RDD与缓存优化 Spark Executor Executor 是 Spark 中用于执行任务(task)的执行单元,运行在 worker 上,但并不等同于 worker。实际上,Executor 是一组计算资源(如…...

“AI玩手机”原理揭秘:大模型驱动的移动端GUI智能体

作者|郭源 前言 在后LLM时代,随着大语言模型和多模态大模型技术的日益成熟,AI技术的实际应用及其社会价值愈发受到重视。AI智能体(AI Agent)技术通过集成行为规划、记忆存储、工具调用等机制,为大模型装上…...

离散数学【关系】中的一些特殊关系

在数学中,关系是描述集合之间元素间关系的方式。以下是对一些常见关系的详细分析及举例: 1. 空关系 (Empty Relation) 空关系是指在一个集合中,没有任何元素之间存在关系。即对于集合中的所有元素,空关系都不包含任何有序对。 …...

docker 配置代理

创建 Docker 服务配置文件&#xff1a; sudo mkdir -p /etc/systemd/system/docker.service.d sudo vim /etc/systemd/system/docker.service.d/http-proxy.conf添加代理配置&#xff1a; [Service] Environment"HTTP_PROXYhttp://<proxy-address>:<port>&q…...

Dockerfile详解:构建简单高效的容器镜像

引言 在容器化技术日益普及的今天&#xff0c;Dockerfile 成为了构建 Docker 镜像的核心工具。通过编写 Dockerfile&#xff0c;开发者可以将应用程序及其依赖打包成一个可移植、可复用的镜像&#xff0c;从而简化部署和运维工作。本文将详细介绍 Dockerfile 的基本概念、常用指…...

RHCD-----shell

要求&#xff1a; 通过shell脚本分析部署nginx网络服务 1.接收用户部署的服务名称 2.判断服务是否安装 ​ 已安装&#xff1b;自定义网站配置路径为/www&#xff1b;并创建共享目录和网页文件&#xff1b;重启服务 ​ 没有安装&#xff1b;安装对应的软件包 3.测试 判断服务是…...

<硬件有关> 内存攒机认知入门,内存的选择 配置 laptop PC 服务器

原因 这不是黑五吗&#xff0c;给我儿子买了台最便宜 ($300) DELL laptop&#xff0c;CPU 是 i5-1235U&#xff0c;但只有 8GB 内存。升级内存吧。 如何选择内存&#xff1a;家用范围 这里不考虑品牌&#xff0c;在我眼里&#xff0c;区别就是价格&#xff0c;还有所谓的物理…...

基于springboot的来访管理系统的设计与实现

文章目录 项目介绍主要功能截图:部分代码展示设计总结项目获取方式🍅 作者主页:超级无敌暴龙战士塔塔开 🍅 简介:Java领域优质创作者🏆、 简历模板、学习资料、面试题库【关注我,都给你】 🍅文末获取源码联系🍅 项目介绍 基于springboot的来访管理系统的设计与实…...

window11编译pycdc.exe

一、代码库和参考链接 在对python打包的exe文件进行反编译时&#xff0c;会使用到uncompyle6工具&#xff0c;但是这个工具只支持python3.8及以下&#xff0c;针对更高的版本的python则不能反编译。 关于反编译参考几个文章&#xff1a; Python3.9及以上Pyinstaller 反编译教…...

11.22.2024 面试后记

Watching those fucking ap’s paper is bullshit and wasting your time. you’d mother fucker directly say I’m not qualified. if I’m qualified, how could I see u at this place. your dad is alread being rich and enjoy the world. 抽了一周时间去看那些教授的文章…...

Bug Fix 20241122:缺少lib文件错误

今天有朋友提醒才突然发现 gitee 上传的代码存在两个很严重&#xff0c;同时也很低级的错误。 因为gitee的默认设置不允许二进制文件的提交&#xff0c; 所以PH47框架下的库文件&#xff08;各逻辑层的库文件&#xff09;&#xff0c;以及Stm32Cube驱动的库文件都没上传到Gi…...

Pinia 实战教程:构建高效的 Vue 3 状态管理系统

前言 在前端开发中&#xff0c;状态管理已成为必不可少的一部分&#xff0c;Vue.js 生态系统中提供了多种状态管理解决方案。Pinia 是 Vue 3 推出的一种全新的状态管理库&#xff0c;旨在取代 Vuex&#xff0c;提供更简洁的 API、更优雅的 TypeScript 支持以及更高效的性能表现…...

springboot3如何集成knife4j 4.x版本及如何进行API注解

1. 什么是Knife4j knife4j是为Java MVC框架集成Swagger生成Api文档的增强解决方案, 取名knife4j是希望她能像一把匕首一样小巧,轻量,并且功能强悍!knife4j的前身是swagger-bootstrap-ui,swagger-bootstrap-ui自1.9.6版本后,正式更名为knife4j为了契合微服务的架构发展,由于原来…...

区块链讲解

区块链技术是一种分布式账本技术&#xff0c;其应用场景和优势可以总结如下&#xff1a; 金融服务&#xff1a;区块链可以用于支付、跨境汇款、证券交易、贷款等金融服务领域&#xff0c;通过去中心化的方式实现快速、低成本、安全的交易。 物联网&#xff1a;区块链可以用于物…...

使用eclipse构建SpringBoot项目

我这里用eclipse2018版本做演示&#xff0c;大家有需要的可以下载Eclipse Downloads | The Eclipse Foundation 1.打开eclipse&#xff0c;选择存放代码的位置 2.选择 file >> new >> project >> 选择springboot文件下的 spring starter project 2.这里选择N…...

uniapp input限制输入负数,以及保留小数点两位.

简单处理的方式 限制输入负数,以及保留小数点两位.: <input type"number" placeholder"请输入" v-model"num"input"numnum.toString().replace(/\-/g,).match(/^\d(?:\.\d{0,2})?/)" /> 可以输入负数,保留两位小数点,把rep…...

《FreeRTOS任务删除篇》

任务删除函数 源码1. 进入临界区1.1 第一步1.2 第二步1.3 第三步1.4 第四步 2. 获取待删除任务的任务控制块TCB3. 从就绪/延迟列表中删除任务4. 从事件列表中删除任务5. 如果待删除任务是当前运行的任务6. 如果待删除任务是其它任务7. 退出临界区7.1 第一步7.2 第二步7.3 第三步…...

递归算法专题一>Pow(x, n)

题目&#xff1a; 解析&#xff1a; 代码&#xff1a; public double myPow(double x, int n) {return n < 0 ? 1.0 / pow(x,-n) : pow(x,n); }private double pow(double x, int n){if(n 0) return 1.0;double tmp pow(x,n / 2);return n % 2 0 ? tmp * tmp : tmp …...

数据结构第一讲

数据结构定义 算法的定义 什么是好算法&#xff1f; 空间复杂度 时间复杂度 例子1 打印1到N之间的正整数 有递归和循环两种方法实现。 但是在数字变大后&#xff0c;递归的方法会导致内存占用过多而崩溃。 而循环则不会 例子2 写程序给定多项式在X处的值 从里往外算的算…...

SHELL笔记(循环)

在 Shell 编程中&#xff0c;循环结构是极为重要的一部分&#xff0c;它能够让我们轻松地重复执行特定的代码块&#xff0c;从而高效地处理各种重复性任务。本文将详细介绍 Shell 中常见的循环结构&#xff0c;包括 for 循环、while 循环和 until 循环&#xff0c;并通过具体的…...

网络六边形受到攻击

大家读完觉得有帮助记得关注和点赞&#xff01;&#xff01;&#xff01; 抽象 现代智能交通系统 &#xff08;ITS&#xff09; 的一个关键要求是能够以安全、可靠和匿名的方式从互联车辆和移动设备收集地理参考数据。Nexagon 协议建立在 IETF 定位器/ID 分离协议 &#xff08;…...

Java如何权衡是使用无序的数组还是有序的数组

在 Java 中,选择有序数组还是无序数组取决于具体场景的性能需求与操作特点。以下是关键权衡因素及决策指南: ⚖️ 核心权衡维度 维度有序数组无序数组查询性能二分查找 O(log n) ✅线性扫描 O(n) ❌插入/删除需移位维护顺序 O(n) ❌直接操作尾部 O(1) ✅内存开销与无序数组相…...

解决Ubuntu22.04 VMware失败的问题 ubuntu入门之二十八

现象1 打开VMware失败 Ubuntu升级之后打开VMware上报需要安装vmmon和vmnet&#xff0c;点击确认后如下提示 最终上报fail 解决方法 内核升级导致&#xff0c;需要在新内核下重新下载编译安装 查看版本 $ vmware -v VMware Workstation 17.5.1 build-23298084$ lsb_release…...

CentOS下的分布式内存计算Spark环境部署

一、Spark 核心架构与应用场景 1.1 分布式计算引擎的核心优势 Spark 是基于内存的分布式计算框架&#xff0c;相比 MapReduce 具有以下核心优势&#xff1a; 内存计算&#xff1a;数据可常驻内存&#xff0c;迭代计算性能提升 10-100 倍&#xff08;文档段落&#xff1a;3-79…...

现代密码学 | 椭圆曲线密码学—附py代码

Elliptic Curve Cryptography 椭圆曲线密码学&#xff08;ECC&#xff09;是一种基于有限域上椭圆曲线数学特性的公钥加密技术。其核心原理涉及椭圆曲线的代数性质、离散对数问题以及有限域上的运算。 椭圆曲线密码学是多种数字签名算法的基础&#xff0c;例如椭圆曲线数字签…...

全面解析各类VPN技术:GRE、IPsec、L2TP、SSL与MPLS VPN对比

目录 引言 VPN技术概述 GRE VPN 3.1 GRE封装结构 3.2 GRE的应用场景 GRE over IPsec 4.1 GRE over IPsec封装结构 4.2 为什么使用GRE over IPsec&#xff1f; IPsec VPN 5.1 IPsec传输模式&#xff08;Transport Mode&#xff09; 5.2 IPsec隧道模式&#xff08;Tunne…...

高效线程安全的单例模式:Python 中的懒加载与自定义初始化参数

高效线程安全的单例模式:Python 中的懒加载与自定义初始化参数 在软件开发中,单例模式(Singleton Pattern)是一种常见的设计模式,确保一个类仅有一个实例,并提供一个全局访问点。在多线程环境下,实现单例模式时需要注意线程安全问题,以防止多个线程同时创建实例,导致…...

return this;返回的是谁

一个审批系统的示例来演示责任链模式的实现。假设公司需要处理不同金额的采购申请&#xff0c;不同级别的经理有不同的审批权限&#xff1a; // 抽象处理者&#xff1a;审批者 abstract class Approver {protected Approver successor; // 下一个处理者// 设置下一个处理者pub…...

深度学习水论文:mamba+图像增强

&#x1f9c0;当前视觉领域对高效长序列建模需求激增&#xff0c;对Mamba图像增强这方向的研究自然也逐渐火热。原因在于其高效长程建模&#xff0c;以及动态计算优势&#xff0c;在图像质量提升和细节恢复方面有难以替代的作用。 &#x1f9c0;因此短时间内&#xff0c;就有不…...

华为OD机考-机房布局

import java.util.*;public class DemoTest5 {public static void main(String[] args) {Scanner in new Scanner(System.in);// 注意 hasNext 和 hasNextLine 的区别while (in.hasNextLine()) { // 注意 while 处理多个 caseSystem.out.println(solve(in.nextLine()));}}priv…...