当前位置: 首页 > news >正文

Spark RDD优化

Spark RDD优化

  • 一、分区优化
  • 二、持久化优化
  • 三、依赖优化
  • 四、共享变量优化
  • 五、提交模式与运行模式优化
  • 六、其他优化

一、分区优化

  • 分区数调整:RDD的分区数可以通过repartitioncoalesce方法进行调整。合理的分区数可以提高并行度,但过多的分区会增加管理开销。通常,分区数应根据数据规模和集群资源进行调整。

    val rdd: RDD[String] = rdd.coalesce(numPartitions:Int, shuffle:Boolean)
    val rdd: RDD[String] = rdd.repartition(numPartitions:Int) 
    // repartition(numPartitions: Int) 等价于 coalesce(numPartitions, true) 
    
    1. 缩小分区

      存在过多的小任务的时候收缩合并分区,减少分区的个数,减少任务调度成本
      默认情况下,不会对数据重组,比如:3个合成2个,采用 {1+2},{3},容易导致数据倾斜
      若需数据均衡,则将 shuffle 参数设置为 true 即可

    2. 扩大分区

      若需要扩大分区,shuffle 参数必须设置为 true
      若将2个分区拆分成3个,必须打乱重新分区,否则数据还是在两个分区(有一个分区为空),{1},{2},{空}

  • 数据本地性:Spark会尽量将数据分配给与数据源相同的计算节点上,以减少数据移动的开销。在创建RDD时,可以通过设置分区偏好(如preferredLocations)或自定义分区来优化数据本地性,以最小化网络传输并最大化计算效率。

    自定义分区

    // 自定义分区器
    class MyPartitioner(numPartitions: Int) extends Partitioner {override def numPartitions: Int = numPartitions   // 返回分区器的分区数量override def getPartition(key: Any): Int = {// 这里需要实现分区逻辑// 返回值是一个整数,表示该键应该被分配到哪个分区}
    }
    
    // 使用自定义分区器重新分区  
    val partitionedRDD = rdd.partitionBy(new MyPartitioner(2))  // 传入分区个数
    
  • 处理数据倾斜:数据倾斜是指某些分区包含的数据远远多于其他分区,导致计算资源分配不均。可以使用repartitioncoalesce方法重新分区RDD,或使用reduceByKeygroupByKey的变体等特定操作来减轻数据倾斜的影响。

二、持久化优化

  • 持久化策略:对于需要多次使用的RDD,应该进行持久化操作,以避免重复计算。持久化策略包括内存持久化(如MEMORY_ONLY)、磁盘持久化(如DISK_ONLY)以及内存和磁盘混合持久化(如MEMORY_AND_DISK)等。

  • 序列化:使用序列化可以进一步减少内存消耗,并提高持久化效率。Spark支持多种序列化框架,如Java序列化、Kryo序列化等。Kryo序列化通常比Java序列化更快,且占用空间更小。

    // 临时存储于【xx】重用,job结束后自动删除 
    val rddCache: RDD[T] = rdd.cache()					// 到内存上
    val rdd: RDD[T] = rdd.persist(level:StorageLevel)
    // cache() 		等价于persist(StorageLevel.MEMORY_ONLY)
    // persisit() 	参数如下
    
    StorageLevel.MEMORY_ONLY				只写到内存上
    StorageLevel.DISK_ONLY					只写到磁盘上
    StorageLevel.OFF_HEAP					使用堆外内存
    StorageLevel.MEMORY_AND_DISK			先内存,后磁盘 
    StorageLevel.MEMORY_AND_DISK_SER		先内存,后磁盘,采取序列化方式
    StorageLevel.MEMORY_AND_DISK_SER_2 		先内存,后磁盘,采取二代序列化方式
    
  • 检查点:对于需要长时间运行或可能遭受故障的应用,设置检查点(Checkpoint)可以将RDD的状态保存到稳定存储中,以便在故障后恢复。检查点会切断RDD的血统关系,从而避免重新计算整个血统链。

    // checkpoint 长久存储于【磁盘】重用,job结束后不会删除,涉及IO性能较差,安全且一般和cache组合使用
    val conf = new SparkConf().setAppName("spark_rdd").setMaster("local[4]")
    val sc = SparkContext.getOrCreate(conf)
    // 设置检查点路径
    sc.setCheckpointDir("hdfs://ip:9000/spark/checkpoint")
    // ... 
    rdd.checkpoint()	// 将该 RDD 的内容写入到设置的路径,并在该 RDD 的计算图中插入一个检查点(Checkpoint)节点
    

三、依赖优化

  • 宽依赖与窄依赖:RDD之间的依赖关系分为宽依赖和窄依赖。窄依赖有助于实现数据本地性,而宽依赖则可能导致数据移动和网络开销。在设计RDD转换操作时,应尽量避免不必要的宽依赖。

    1、Driver程序提交后

    1、Spark调度器将所有的RDD看成是一个Stage
    2、然后对此Stage进行逆向回溯,遇到Shuffle就断开,形成一个新的Stage
    3、遇到窄依赖,则归并到同一个Stage
    4、等到所有的步骤回溯完成,便生成一个DAG图

    2、为什么要划分阶段

    1、基于数据的分区,本着传递计算的性能远高于传递数据,所以数据本地化是提升性能的重要途径之一
    2、一组串行的算子,无需 Shuffle,基于数据本地化可以作为一个独立的阶段连续并行执行
    3、经过一组串行算子计算,遇到 Shuffle 操作,默认情况下 Shuffle 不会改变分区数量,但会因为 numPartitions:Int, partitioner:Partitioner 等参数重新分配,过程数据会【写盘供子RDD拉取(类MapReduce)】

    3、RDD依赖关系

    • Lineage:血统、遗传

      RDD最重要的特性之一,保存了RDD的依赖关系

      RDD实现了基于Lineage的容错机制

    • 依赖关系 org.apache.spark.Dependency

      窄依赖 NarrowDependency,1V1 OneToOneDependency,1VN RangeDependency
      宽依赖 ShuffleDependency

    • 当RDD分区丢失时

      对于窄依赖,Spark只需要重新计算丢失分区的父RDD分区即可。
      对于宽依赖,Spark需要重新执行整个shuffle过程,以重新生成丢失的数据。
      若配合持久化更佳:cache, persist, checkpoint

    在这里插入图片描述

    类型
    窄依赖map,flatMap,mapPartitions,mapPartitionsWithIndex,glom,filter,distinct,intersection,sample,union,subtract,zip…,cogroup
    宽依赖sortBy,sortByKey,groupByKey,reduceByKey,cogroup,join,partitionBy,repartition
    不一定的情况在Spark中,并非所有操作都可以明确地归类为宽依赖或窄依赖。有些操作可能根据具体的实现或上下文而有所不同。然而,在大多数情况下,上述提到的算子可以清晰地划分为宽依赖或窄依赖。
    如:reduceByKey(【partitioner: Partitioner】, func: (V, V) => V)
    若使用的是带 partitioner 的重载且 Partitioner 和父RDD的 Partitioner一致
    则为窄依赖RDD,否则为宽依赖ShuffledRDD
    
  • 优化转换操作:在可能的情况下,使用能够减少shuffle操作的转换函数,如mapPartitions代替mapreduceByKey代替groupByKey等。这些操作可以减少数据在网络中的传输量,从而提高性能。

    shuffle性能较差:因为shuffle必须落盘,内存中等数据会OOM
    groupByKey只分组(存在Shuffle) + reduce只聚合<=结果同,性能不同=>
    reduceByKey先分组、预聚合、再聚合(存在Shuffle) 
    

四、共享变量优化

  • 广播大变量:当Spark作业中需要使用到较大的外部变量时,可以将这些变量广播到每个节点的Executor上,而不是每个Task都复制一份。这样可以减少网络传输开销和内存消耗。

    val bc:Broadcast[T] = sc.broadcast(value:T)		// 创建广播变量  
    rdd.mapPartitions(itPar=>{val v:T = bc.value	// 在每个分区内部,通过bc.value获取广播变量的值  ...					// 使用v进行计算...
    })
    
  • 累加器(Accumulators):累加器提供了一种有效的手段来进行分布式计算中的统计和计数操作,减少通信开销,并简化聚合操作。

    累加器:accumulate:只能 add 操作,常用于计数
    1、定义在Driver端的一个变量,Cluster中每一个Task都会有一份Copy
    2、所有的Task都计算完成后,将所有Task中的Copy合并到驱动程序中的变量中
    非累加器:在所有Task中的都会是独立Copy,不会有合并

    累加器
    val accLong: LongAccumulator = sc.longAccumulator("longAcc")	// 定义累加器
    val accDouble: DoubleAccumulator = sc.doubleAccumulator("doubleAcc")
    rdd.mapPartitions(itPar=>{...accLong.add(v:Long)		// 将值添加到累加器中accDouble.add(v:Double)...
    })
    accXxx.reset()		// 重置累加器
    val isZero:Boolean = accXxx.isZero	// 检查累加器是否为零值
    val num:Long|Double = accXxx.value|sum|count|avg // 获取累加器的值、总和、计数或平均值
    
    // 定义一个累加器,用于统计 "bad" 记录的数量
    val errorCount = sc.longAccumulator("Error Count")
    val data = sc.parallelize(Array("good", "bad", "good", "bad", "good"))
    data.foreach(record => if (record == "bad") errorCount.add(1))
    // 打印累加器的值,即 "bad" 记录的总数println(s"Total errors: ${errorCount.value}")
    

    自定义累加器:

    写一个类继承 import org.apache.spark.util.AccumulatorV2[IN, OUT]

    abstract class AccumulatorV2[IN, OUT] extends Serializable {// 返回是否为零值累加器def isZero: Boolean// 创建此累加器的新副本,其为零值def copyAndReset(): AccumulatorV2[IN, OUT] = {...}// 创建此累加器的新副本def copy(): AccumulatorV2[IN, OUT]// 重置此累加器为零值def reset(): Unit// 添加:接收输入并累加def add(v: IN): Unit// 合并:合并另一个相同类型的累加器并更新其状态def merge(other: AccumulatorV2[IN, OUT]): Unit// 当前累加器的值def value: OUT
    }
    
  • 自定义计量器优化(Custom Metrics):自定义计量器允许用户定义和收集特定的性能指标,提供更细粒度的作业监控和调优能力。通过 SparkListener 接口,可以实现自定义的监听器来监控和记录所需的指标。

五、提交模式与运行模式优化

  • 提交模式:Spark支持Client模式和Cluster模式两种提交方式。Client模式便于查看日志和结果,但可能消耗较多资源;Cluster模式则更适合大规模作业,但查看日志和结果可能不太方便。应根据实际情况选择合适的提交模式。

    spark-submit --class <MainClass> --master <MasterURL> --deploy-mode <DeployMode> <PathToJar>
    

    <MainClass>:包含 main 方法的主类的名称。

    <MasterURL>:指定集群的 Master URL。

    <DeployMode>:指定提交模式,可以是 clientcluster

    <PathToJar>:包含 Spark 应用程序的 JAR 文件的路径。

    spark-submit --class SparkClientModeApp --master yarn --deploy-mode client /path/to/your/jarfile.jar	
    spark-submit --class SparkClientModeApp --master yarn --deploy-mode cluster /path/to/your/jarfile.jar
    
  • 运行模式:Spark支持多种运行模式,如Local模式、Standalone模式、YARN模式等。不同的运行模式适用于不同的场景和需求。例如,Local模式适用于本地开发和测试;Standalone模式适用于构建独立的Spark集群;YARN模式则适用于与Hadoop生态系统集成。

    local: 在单核上运行
    local[N]: 在指定数量的 N 个核上运行,如 “local[4]”
    local[*]: 使用所有可用的核
    spark://HOST:PORT: 连接到指定的 Spark standalone cluster
    yarn: 连接到 YARN 集群
    mesos://HOST:PORT: 连接到 Mesos 集群

六、其他优化

  • 序列化框架选择:除了Kryo序列化外,还可以考虑使用其他高效的序列化框架来优化Spark作业的性能。
  • 监控与调优:使用Spark提供的监控工具和API(如Spark UI、getStorageLevel方法等)来监控作业的运行状态和性能瓶颈,并根据监控结果进行调优。

相关文章:

Spark RDD优化

Spark RDD优化 一、分区优化二、持久化优化三、依赖优化四、共享变量优化五、提交模式与运行模式优化六、其他优化 一、分区优化 分区数调整&#xff1a;RDD的分区数可以通过repartition和coalesce方法进行调整。合理的分区数可以提高并行度&#xff0c;但过多的分区会增加管…...

idea:解决Maven报错 Properties in parent definition are prohibited

在父pom文件中定义了 <dhversion>1.0-SNAPSHOT</dhversion> 在子模块中引用 <parent><groupId>com.douhuang</groupId><artifactId>douhuang-springcloud</artifactId><version>${dhversion}</version> </parent&…...

代理IP池:解析与应用

代理IP大家都了解不少了&#xff0c;代理IP池又是什么呢&#xff1f;下面简单介绍一下吧&#xff01; 1. 概述 代理IP池就是由多个代理IP地址组成的集合&#xff0c;用于实现更高效的网络访问和数据获取。这些IP地址通常来自不同的地理位置和网络提供商&#xff0c;经过动态管…...

MQTT是什么,物联网

写文思路&#xff1a; 以下从几个方面介绍MQTT&#xff0c;包括&#xff1a;MQTT是什么&#xff0c;MQTT和webSocket的结合&#xff0c;以及使用场景&#xff0c; 一、MQTT是什么 MQTT&#xff08;Message Queuing Telemetry Transport&#xff09;是一种轻量级的发布/订阅消息…...

分布式训练

一、分布式计算 跟多GPU不同是&#xff1a;数据不是从主存拿的&#xff0c;是在分布式文件系统拿的&#xff0c;有多个工作站&#xff0c;工作站中有多个GPU&#xff0c;通过网络读取数据到GPU中&#xff0c;GPU通过网络接收到来自参数服务器的参数进行运算计算梯度&#xff0c…...

day10:04一文搞懂decode和decoding的区别

在Python 3中&#xff0c;decode()方法和decoding概念同样与字符串的编码和解码紧密相关&#xff0c;但它们的应用场景和上下文有所不同。下面通过案例来解释它们的关系和区别。 1. decode() 方法 decode()方法是字节串&#xff08;bytes&#xff09;类型的一个方法&#xff…...

MechMind结构光相机 采图SDK python调用

测试效果 Mech-Mind结构光相机 Mech Mind(梅卡曼德)的结构光相机,特别是Mech-Eye系列,是工业级的高精度3D相机,广泛应用于工业自动化、机器人导航、质量检测等多个领域。以下是对Mech Mind结构光相机的详细解析: 一、产品概述 Mech Mind的结构光相机,如Mech-Eye PRO,…...

“学习Pandas中时间序列的基本操作“

目录 # 开篇 1. 创建和操作时间序列对象 2. 时间序列数据的读取和存储 3. 时间序列数据的索引和切片 4. 时间序列数据的操作和转换 5. 时间序列数据的可视化 6. 处理时间序列中的缺失值 7. 时间序列数据的聚合和分组 8. 时间序列的时间区间和偏移量操作 示例代码&…...

常用知识碎片 分页组件的使用(arco-design组件库)

目录 分页组件使用 API 组件代码示例 使用思路&#xff1a; 前端示例代码 html script 后端示例代码 Controller Impl xml 总结 分页组件使用 使用Arco Design之前需要配置好搭建前端环境可以看我另外一篇文章&#xff1a; 手把手教你 创建Vue项目并引入Arco Desi…...

WPF 制作一个文字漂浮提示框

WPF好像没有自带的文字提示漂浮&#xff0c;我们可以定制一个。 效果如下&#xff1a; xaml xaml如下&#xff1a; <Window x:Class"GroupServer.MsgTip"xmlns"http://schemas.microsoft.com/winfx/2006/xaml/presentation"xmlns:x"http://sc…...

Node.js_fs模块

文件删除 文件重命名和移动&#xff08;本质都是修改路径&#xff09; 文件夹操作 创建文件夹(mkdir) 读取文件夹(readdir) &#xff08;打印出来是该文件夹下名称的数组形式&#xff09; 读取当前的文件夹(readdir) 删除文件夹 &#xff08;rmdir&#xff09; 查看资源状态…...

使用 Vue 3 实现打字机效果

在现代前端开发中&#xff0c;添加一些视觉效果可以提升用户体验。其中&#xff0c;打字机效果是一种常见且吸引人的效果&#xff0c;可以用于展示动态文本。本文将介绍如何在 Vue 3 中实现打字机效果。 实现步骤 1. 创建自定义指令 我们首先创建一个自定义指令 v-typewriter…...

unordered_map和set

前言&#xff1a;本篇文章继续分享新的容器unordered_map和set。前边我们分享过map和set&#xff0c;其底层为红黑树&#xff0c;而unordered_map和set的底层则为哈希表&#xff0c;因此在unordered_map和set的实现中&#xff0c;我们可以效仿许多在map和set的中就分享过的一些…...

java:运用字节缓冲输入流将文件中的数据写到集合中

代码主要是将文本文件中的数据写到集合中&#xff0c;运用到的是java字节缓冲输入流的知识点。 public static void main(String[] args) throws IOException {//创建字符缓冲流输入对象BufferedReader bufferedReader new BufferedReader(new FileReader("student.txt&q…...

【机器学习】支持向量机与主成分分析在机器学习中的应用

文章目录 一、支持向量机概述什么是支持向量机&#xff1f;超平面和支持向量大边距直觉 二、数据预处理与可视化数据集的基本信息导入必要的库加载数据集数据概况数据可视化特征对的散点图矩阵类别分布条形图平均面积与平均光滑度的散点图变量之间的相关性热图 三、模型训练&am…...

SpringBoot项目架构实战之“网关zuul搭建“

第三章 网关zuul搭建 前言&#xff1a; 1、主要功能 zuul主要提供动态路由&#xff08;内置ribbon实现&#xff09;和过滤&#xff08;可以做统一鉴权过滤器、灰度发布过滤器、黑白名单IP过滤器、服务限流过滤器&#xff08;可以配合Sentinel实现&#xff09;&#xff09;功能…...

发挥储能系统领域优势,海博思创坚定不移推动能源消费革命

随着新发展理念的深入贯彻&#xff0c;我国正全面落实“双碳”目标任务&#xff0c;通过积极转变能源消费方式&#xff0c;大幅提升能源利用效率&#xff0c;实现了以年均约3.3%的能源消费增长支撑了年均超过6%的国民经济增长。这一成就的背后&#xff0c;是我国能源结构的持续…...

matlab R2016b安装cplex12.6,测试时cplex出现出现内部错误的解决方法

问题场景 网上搜索matlabyalmipcplex的安装教程&#xff0c;跟着步骤操作即可&#xff0c;假如都安装好了&#xff0c;在matlab中测试安装是否成功&#xff0c;出现以下问题&#xff1a; 1、matlab中设置路径中添加了yalmip和cplex路径&#xff0c;在命令窗口中输入yalmiptest…...

C#中的Dictionary

Dictionary<TKey, TValue> 是一个泛型集合&#xff0c;它存储键值对&#xff08;key-value pairs&#xff09;&#xff0c;其中每个键&#xff08;key&#xff09;都是唯一的。这个集合类提供了快速的数据插入和检索功能&#xff0c;因为它是基于哈希表实现的。 注意 ke…...

VSCode中多行文本的快速前后缩进

快捷键 VSCode提供了一组快捷键&#xff0c;用于快速调整选中文本行的缩进。 增加缩进&#xff08;向前缩进&#xff09;&#xff1a;在Windows和Linux上按 Tab 键&#xff0c;在Mac上按 ⇧⇥&#xff08;Shift Tab&#xff09;。减少缩进&#xff08;向后缩进&#xff09;&…...

基于大模型的 UI 自动化系统

基于大模型的 UI 自动化系统 下面是一个完整的 Python 系统,利用大模型实现智能 UI 自动化,结合计算机视觉和自然语言处理技术,实现"看屏操作"的能力。 系统架构设计 #mermaid-svg-2gn2GRvh5WCP2ktF {font-family:"trebuchet ms",verdana,arial,sans-…...

第25节 Node.js 断言测试

Node.js的assert模块主要用于编写程序的单元测试时使用&#xff0c;通过断言可以提早发现和排查出错误。 稳定性: 5 - 锁定 这个模块可用于应用的单元测试&#xff0c;通过 require(assert) 可以使用这个模块。 assert.fail(actual, expected, message, operator) 使用参数…...

Linux云原生安全:零信任架构与机密计算

Linux云原生安全&#xff1a;零信任架构与机密计算 构建坚不可摧的云原生防御体系 引言&#xff1a;云原生安全的范式革命 随着云原生技术的普及&#xff0c;安全边界正在从传统的网络边界向工作负载内部转移。Gartner预测&#xff0c;到2025年&#xff0c;零信任架构将成为超…...

CMake 从 GitHub 下载第三方库并使用

有时我们希望直接使用 GitHub 上的开源库,而不想手动下载、编译和安装。 可以利用 CMake 提供的 FetchContent 模块来实现自动下载、构建和链接第三方库。 FetchContent 命令官方文档✅ 示例代码 我们将以 fmt 这个流行的格式化库为例,演示如何: 使用 FetchContent 从 GitH…...

Linux 内存管理实战精讲:核心原理与面试常考点全解析

Linux 内存管理实战精讲&#xff1a;核心原理与面试常考点全解析 Linux 内核内存管理是系统设计中最复杂但也最核心的模块之一。它不仅支撑着虚拟内存机制、物理内存分配、进程隔离与资源复用&#xff0c;还直接决定系统运行的性能与稳定性。无论你是嵌入式开发者、内核调试工…...

排序算法总结(C++)

目录 一、稳定性二、排序算法选择、冒泡、插入排序归并排序随机快速排序堆排序基数排序计数排序 三、总结 一、稳定性 排序算法的稳定性是指&#xff1a;同样大小的样本 **&#xff08;同样大小的数据&#xff09;**在排序之后不会改变原始的相对次序。 稳定性对基础类型对象…...

Unity中的transform.up

2025年6月8日&#xff0c;周日下午 在Unity中&#xff0c;transform.up是Transform组件的一个属性&#xff0c;表示游戏对象在世界空间中的“上”方向&#xff08;Y轴正方向&#xff09;&#xff0c;且会随对象旋转动态变化。以下是关键点解析&#xff1a; 基本定义 transfor…...

嵌入式学习之系统编程(九)OSI模型、TCP/IP模型、UDP协议网络相关编程(6.3)

目录 一、网络编程--OSI模型 二、网络编程--TCP/IP模型 三、网络接口 四、UDP网络相关编程及主要函数 ​编辑​编辑 UDP的特征 socke函数 bind函数 recvfrom函数&#xff08;接收函数&#xff09; sendto函数&#xff08;发送函数&#xff09; 五、网络编程之 UDP 用…...

WEB3全栈开发——面试专业技能点P4数据库

一、mysql2 原生驱动及其连接机制 概念介绍 mysql2 是 Node.js 环境中广泛使用的 MySQL 客户端库&#xff0c;基于 mysql 库改进而来&#xff0c;具有更好的性能、Promise 支持、流式查询、二进制数据处理能力等。 主要特点&#xff1a; 支持 Promise / async-await&#xf…...

Netty自定义协议解析

目录 自定义协议设计 实现消息解码器 实现消息编码器 自定义消息对象 配置ChannelPipeline Netty提供了强大的编解码器抽象基类,这些基类能够帮助开发者快速实现自定义协议的解析。 自定义协议设计 在实现自定义协议解析之前,需要明确协议的具体格式。例如,一个简单的…...