RDD算子(四)、血缘关系、持久化
1. foreach
分布式遍历每一个元素,调用指定函数
val rdd = sc.makeRDD(List(1, 2, 3, 4))
rdd.foreach(println)

结果是随机的,因为foreach是在每一个Executor端并发执行,所以顺序是不确定的。如果采集collect之后再调用foreach打印,则是在Driver端执行。
RDD的方法之所以叫算子,就是为了与scala集合的方法区分开来, scala集合的方法是在同一个节点执行的,而RDD的算子则是在Executor(分布式节点)执行的。从计算的角度讲,RDD算子外部的操作都是在Driver端执行,算子内部的操作都是在Executor端执行。
2. 闭包检测
class User {var age : Int = 30
}
val rdd = sc.makeRDD(List(1, 2, 3, 4))
val user = new User()
rdd.foreach(num => {println("age = " + (user.age + num))
})
因为foreach内部的操作是在Executor上执行的,所以在Driver上创建的user需要传递给各个Executor,如果user没有序列化,则会报错

class User extends Serializable{var age : Int = 30
}
或者将User变为样例类,因为样例类在编译时会自动混入序列化特质(实现可序列化接口)
case class User {var age : Int = 30
}
如果把原始集合变为空,依然会报错,这是因为RDD算子中传递的函数会包含闭包操作(匿名函数,算子内用到了算子外的数据),所以会进行闭包检测,即检查里面的变量是否序列化。
val rdd = sc.makeRDD(List[Int]())
val user = new User()
rdd.foreach(num => {println("age = " + (user.age + num))
})
再看如下案例:
class Search(query:String) {def isMatch(s:String): Boolean {s.contains(query)}def getMatch1(rdd: RDD[String]): RDD[String] {rdd.filter(isMatch)}def getMatch2(rdd: RDD[String]): RDD[String] {rdd.filter(x => x.contains(query))}
}
val rdd = sc.makeRDD(Array("hello world", "hello spark", "hive", "atguigu"))
val search = new Search("h")
search.getMatch1(rdd).collect().foreach(println)
此时会报错Search类没有序列化,因为在rdd的filter算子内调用了query,而query作为类的构造参数,实际上是类的私有变量,isMatch方法相当于:
def isMatch(s:String): Boolean {s.contains(this.query)
}
this相当于类的对象,因此需要进行闭包检测。getMatch2也有类似的问题。除了将类序列化以及改为样例类之外,还可以将query赋给方法内部的临时变量:
def getMatch2(rdd: RDD[String]): RDD[String] {val s = queryrdd.filter(x => x.contains(s))
}
3. 依赖关系

每个RDD会保存血缘关系(不会保存数据),这样提高了容错性,因为如果其中某个RDD转换到另一个RDD失败了,就可以根据血缘关系来重新读取。RDD保存依赖关系而示意图如下:

血缘关系展示代码:
val lines : RDD[String] = sc.textFile("datas")
println(lines.toDebugString)
println("******************")val words : RDD[String] = lines.flatMap(_.split(" "))
println(words.toDebugString)
println("******************")val wordToOne = words.map(word=>(word, 1))
println(wordToOne.toDebugString)
println("******************")val wordToSum : RDD[(String, Int] = wordToOne.reduceByKey(_+_)
println(wordToSum.toDebugString)
println("******************")



查看依赖关系只需直接将代码中的toDebugString改为dependencies即可
val lines : RDD[String] = sc.textFile("datas")
println(lines.dependencies)
println("******************")val words : RDD[String] = lines.flatMap(_.split(" "))
println(words.dependencies)
println("******************")val wordToOne = words.map(word=>(word, 1))
println(wordToOne.dependencies)
println("******************")val wordToSum : RDD[(String, Int] = wordToOne.reduceByKey(_+_)
println(wordToSum.dependencies)
println("******************")


一对一的依赖关系表示新的RDD的一个分区的数据来源于旧的RDD的一个分区的数据,也叫窄依赖,而Shuffle依赖关系表示新的RDD的一个分区的数据来源于旧的RDD的多个分区的数据,也叫宽依赖。
4. 阶段和任务划分
窄依赖中,分区有多少个,就有多少个任务,只有一个阶段;宽依赖中,有两个阶段,每个阶段的任务数等于分区数。
RDD阶段由有向无环图(DAG)表示

Application->Job->Stage->Task每一个层级是1对n的关系。
每个Application中可能会提交多个作业,一个作业会划分为多个阶段(阶段数=宽依赖个数+1),一个阶段可能因为多个分区而包含多个任务,一个阶段中的任务数=最后一个RDD的分区数。
5. cache和persist
如果对于同一份数据源,想做多个不同的功能(比如统计单词数以及根据单词分组),这些不同的功能在实现过程中有很多重复的步骤(比如很多相同的RDD转换),此时可能会引入性能问题。虽然看起来RDD转换的过程复用了,但是RDD不存储数据,只有逻辑,所以最终的行为算子会从头开始再读取相同的数据,比如下面代码:
val data : RDD[String] = sc.makeRDD(List("Hello Scala", "Hello Spark"))val flatRDD : RDD[String] = data.flatMap(_.split(" "))val mapRDD = flatRDD.map(word=>{println("@@@@@@@@@@@@@@@")(word, 1)
})val reduceRDD : RDD[(String, Int)] = mapRDD.reduceByKey(_+_)reduceRDD.collect.foreach(println)println("****************")val groupRDD = mapRDD.groupByKey()groupRDD.collect.foreach(println)
可以看到,在一行*上下,@都执行了,说明数据从头开始读取,从最开始的RDD再次执行。
为了解决这种性能问题,可以对mapRDD里的数据进行缓存,要么缓存在内存中,要么缓存在磁盘中,得看具体情况,这就是RDD的持久化操作。使用cache方法缓存在内存中:
val data : RDD[String] = sc.makeRDD(List("Hello Scala", "Hello Spark"))val flatRDD : RDD[String] = data.flatMap(_.split(" "))val mapRDD = flatRDD.map(word=>{println("@@@@@@@@@@@@@@@")(word, 1)
})mapRDD.cache()val reduceRDD : RDD[(String, Int)] = mapRDD.reduceByKey(_+_)reduceRDD.collect.foreach(println)println("****************")val groupRDD = mapRDD.groupByKey()groupRDD.collect.foreach(println)
放在内存中可能不安全,使用persist方法缓存在磁盘中:
val data : RDD[String] = sc.makeRDD(List("Hello Scala", "Hello Spark"))val flatRDD : RDD[String] = data.flatMap(_.split(" "))val mapRDD = flatRDD.map(word=>{println("@@@@@@@@@@@@@@@")(word, 1)
})mapRDD.persist(StorageLevel.DISK_ONLY)val reduceRDD : RDD[(String, Int)] = mapRDD.reduceByKey(_+_)reduceRDD.collect.foreach(println)println("****************")val groupRDD = mapRDD.groupByKey()groupRDD.collect.foreach(println)
注意:持久化操作也是等到行动算子触发才会真正执行。持久化操作不一定都是为了重用才引入的,有些情况下,前面一些RDD转换操作耗时很长或者数据很重要的场合,也可以进行持久化操作,这样一旦中间出了问题,重新执行任务不至于再执行之前耗时很长的操作。
除了以上的cache和persist方法,还可以使用检查点(checkpoint)的方法进行持久化操作。checkpoint需要落盘,因此需要指定存储路径,之前的persist方法也要落盘,只不过它存储在临时路径,任务执行完就会删除,而checkpoint是永久化存储。
sc.setCheckPointDir("cp")val data : RDD[String] = sc.makeRDD(List("Hello Scala", "Hello Spark"))val flatRDD : RDD[String] = data.flatMap(_.split(" "))val mapRDD = flatRDD.map(word=>{println("@@@@@@@@@@@@@@@")(word, 1)
})mapRDD.checkpoint()val reduceRDD : RDD[(String, Int)] = mapRDD.reduceByKey(_+_)reduceRDD.collect.foreach(println)println("****************")val groupRDD = mapRDD.groupByKey()groupRDD.collect.foreach(println)
但是checkpoint会单独再开启一个作业,因此效率可能更低。但是与cache联合执行,即先cache,再checkpoint,就不会开启新的作业。
另外,使用cache方法会改变RDD的血缘关系:
val data : RDD[String] = sc.makeRDD(List("Hello Scala", "Hello Spark"))val flatRDD : RDD[String] = data.flatMap(_.split(" "))val mapRDD = flatRDD.map(word=>{println("@@@@@@@@@@@@@@@")(word, 1)
})mapRDD.checkpoint()
println(mapRDD.toDebugString)val reduceRDD : RDD[(String, Int)] = mapRDD.reduceByKey(_+_)reduceRDD.collect.foreach(println)println("****************")
println(mapRDD.toDebugString)
可以看到,cache方法(其实persis方法也会) 在血缘关系中添加新的依赖(原来的依赖还保留)。但是checkpoint方法会改变原来的血缘关系,建立新的血缘关系(等同于数据源变了):
sc.setCheckPointDir("cp")val data : RDD[String] = sc.makeRDD(List("Hello Scala", "Hello Spark"))val flatRDD : RDD[String] = data.flatMap(_.split(" "))val mapRDD = flatRDD.map(word=>{println("@@@@@@@@@@@@@@@")(word, 1)
})mapRDD.checkpoint()
println(mapRDD.toDebugString)val reduceRDD : RDD[(String, Int)] = mapRDD.reduceByKey(_+_)reduceRDD.collect.foreach(println)println("****************")
println(mapRDD.toDebugString)
6. 自定义分区器
class MyPartitioner extends Partitioner {override def numPartitions : Int = n //自定义,可以写死override def getPartition(key : Any) : Int = {key match {case "xxx" => 0case _ => 1}}}
分区器传给RDD:
val partitionRDD = rdd.partitionBy(new MyPartitioner)
相关文章:
RDD算子(四)、血缘关系、持久化
1. foreach 分布式遍历每一个元素,调用指定函数 val rdd sc.makeRDD(List(1, 2, 3, 4)) rdd.foreach(println) 结果是随机的,因为foreach是在每一个Executor端并发执行,所以顺序是不确定的。如果采集collect之后再调用foreach打印…...
51之定时器与中断系统
目录 1.定时器与中断系统简介 1.1中断系统 1.2定时器 1.2.1定时器简介 1.2.2定时器大致原理及其配置 1.2.3定时器所需的所有配置总介 2.定时器0实现LED闪烁 3.使用软件生成定时器初始化程序 1.定时器与中断系统简介 1.1中断系统 首先,我们需要来了解一下什么…...
C语言中的内存函数
相比于内存函数,字符串函数和字符函数是对字符串和字符进行操作,内存函数是对内存进行操的。下面跟大家分享我学到的几个内存函数。 memcpy函数 void* memcpy(void* dest, const void* sour, size_t num); dest是目标地址,sour要拷贝的源地…...
JS继承与原型、原型链
在 JavaScript 中,继承是实现代码复用和构建对象关系的重要概念。本文将讨论原型链继承、构造函数继承以及组合继承等几种常见的继承方式,并提供相应的示例代码,并分析它们的特点、优缺点以及适用场景。 在开始讲解 JavaScript 的继承方式之…...
C#基础知识总结
C语言、C和C#的区别 ✔ 面向对象编程(OOP): C 是一种过程化的编程语言,它不直接支持面向对象编程。然而,C 是一种支持 OOP 的 C 的超集,它引入了类、对象、继承、多态等概念。C# 是完全面向对象的ÿ…...
机器学习模型——决策树
决策树的定义: 决策树利用树形数据结构来展示决策规则和分类结果,它是一种归纳学习算法,可以将复杂数据转化为可以预测未知数据的模型。每一条从根节点到叶节点的路径都代表一条决策规则。 决策树内的一些重要名词: 信息熵&am…...
【HTML】制作一个简单的三角形动态图形
目录 前言 开始 HTML部分 CSS部分 效果图 总结 前言 无需多言,本文将详细介绍一段HTML和CSS代码,具体内容如下: 开始 首先新建文件夹,创建两个文本文档,其中HTML的文件名改为[index.html],CSS的文件名…...
Acwing.504 转圈游戏(带取余的快速幂)
题目 n个小伙伴(编号从 0到 n−1)围坐一圈玩游戏。 按照顺时针方向给 n个位置编号,从 0到 n−1。 最初,第 0号小伙伴在第 0号位置,第 1号小伙伴在第 1号位置,…,依此类推。 游戏规…...
pair作为unordered_map的key报错
问题 pair作为unordered_map的key报错,编译时会报错 原因 因为pair没有哈希函数 解决方法 定义哈希函数 template <typename T> inline void hash_combine(std::size_t &seed, const T &val) {seed ^ std::hash<T>()(val) 0x9e3779b9 (…...
Windows提权—数据库提权-mysql提权mssql提权Oracle数据库提权
目录 Windows 提权—数据库提权一、mysql提权1.1 udf提权1.1.2 操作方法一 、MSF自动化--UDF提权--漏洞利用1.1.3 操作方法二、 手工导出sqlmap中的dll1.1.4 操作方法三、 moon.php大马利用 1.2 mof提权1.3 启动项提权1.4 反弹shell 二、MSSQL提权MSSQL提权方法1.使用xp_cmdshe…...
为什么android创建Fragment推荐用newInstance
FullScreenDialogFragment使用newInstance方法不是因为它是一个单例,而是因为这是创建DialogFragment实例并同时提供参数的一种标准模式。这种模式通常称为静态工厂方法模式,在Android开发中被广泛使用,尤其是用于Fragment的实例化。 newIns…...
MyBatis的xml实现方式
1、该项目引入的依赖 <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0" xmlns:xsi"http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation"http://maven.apache.o…...
大模型prompt技巧——思维链(Chain-of-Thought)
1、Zero-shot、One-shot、Few-shot 与fintune prompt的时候给出例子答案,然后再让模型回答。 2、zero-shot-CoT “Let’s think step by step”有奇迹效果 3、多数投票提高CoT性能——自洽性(Self-consistency) 多个思维链,然后取…...
内网穿透的应用-如何在Android Termux上部署MySQL数据库并实现无公网IP远程访问
文章目录 前言1.安装MariaDB2.安装cpolar内网穿透工具3. 创建安全隧道映射mysql4. 公网远程连接5. 固定远程连接地址 前言 Android作为移动设备,尽管最初并非设计为服务器,但是随着技术的进步我们可以将Android配置为生产力工具,变成一个随身…...
面试算法-133-区间子数组个数
题目 给你一个整数数组 nums 和两个整数:left 及 right 。找出 nums 中连续、非空且其中最大元素在范围 [left, right] 内的子数组,并返回满足条件的子数组的个数。 生成的测试用例保证结果符合 32-bit 整数范围。 示例 1: 输入ÿ…...
物联网实战--入门篇之(八)嵌入式-空气净化器
目录 一、风扇调速 二、通讯协议 三、净化器运行逻辑 一、风扇调速 单片机是不能直接驱动电机的,因为主芯片的驱动电流比较小(50mA左右),他们之间正常还要有个电机驱动器,常用的有TB6612、L298和L9110等,目前项目用的这个电机它…...
macOS上QT打开麦克风和摄像头的权限问题
同样的代码在Windows上可以轻松操作麦克风和摄像头,特别是用QT这种跨平台的框架。但是对macOS这种权限要求完善的系统还需要进行一些配置,那就是增加Info.plist属性配置文件。如果是之前的早期5.x版本的QTCreator因为使用的是qmake构建系统,估…...
鸿蒙手机cordova-plugin-camera不能拍照和图片不显示问题
鸿蒙手机cordova-plugin-camera不能拍照和图片不显示问题 一、运行环境 1、硬件 手机型号:NOVA 7 系统:HarmonyOS版本 4.0.0 2、软件 android SDK platforms:14.0(API Level 34)、13.0(API Level 33) SDK Build-T…...
Spring源码解析上
spring源码解析 整体架构 defaultListableBeanFactory xmlBeanDefinitionReader 创建XmlBeanFactory 对资源文件进行加载–Resource 利用LoadBeandefinitions(resource)方法加载配置中的bean loadBeandefinitions加载步骤 doLoadBeanDefinition xml配置模式 validationMode 获…...
第九题:最大间隙
题目描述 给定一个序列 a1,a2,⋯ ,an。其中 a1≤a2≤⋯≤an。 相邻两个数之间的差(后一个数减前一个数)称为它们的间隙。 请问序列中最大的间隙值是多少? 输入描述 输入的第一行包含一个整数 n,表示序列的长度。 第二行包含…...
未来机器人的大脑:如何用神经网络模拟器实现更智能的决策?
编辑:陈萍萍的公主一点人工一点智能 未来机器人的大脑:如何用神经网络模拟器实现更智能的决策?RWM通过双自回归机制有效解决了复合误差、部分可观测性和随机动力学等关键挑战,在不依赖领域特定归纳偏见的条件下实现了卓越的预测准…...
基于FPGA的PID算法学习———实现PID比例控制算法
基于FPGA的PID算法学习 前言一、PID算法分析二、PID仿真分析1. PID代码2.PI代码3.P代码4.顶层5.测试文件6.仿真波形 总结 前言 学习内容:参考网站: PID算法控制 PID即:Proportional(比例)、Integral(积分&…...
uni-app学习笔记二十二---使用vite.config.js全局导入常用依赖
在前面的练习中,每个页面需要使用ref,onShow等生命周期钩子函数时都需要像下面这样导入 import {onMounted, ref} from "vue" 如果不想每个页面都导入,需要使用node.js命令npm安装unplugin-auto-import npm install unplugin-au…...
DAY 47
三、通道注意力 3.1 通道注意力的定义 # 新增:通道注意力模块(SE模块) class ChannelAttention(nn.Module):"""通道注意力模块(Squeeze-and-Excitation)"""def __init__(self, in_channels, reduction_rat…...
UE5 学习系列(三)创建和移动物体
这篇博客是该系列的第三篇,是在之前两篇博客的基础上展开,主要介绍如何在操作界面中创建和拖动物体,这篇博客跟随的视频链接如下: B 站视频:s03-创建和移动物体 如果你不打算开之前的博客并且对UE5 比较熟的话按照以…...
蓝牙 BLE 扫描面试题大全(2):进阶面试题与实战演练
前文覆盖了 BLE 扫描的基础概念与经典问题蓝牙 BLE 扫描面试题大全(1):从基础到实战的深度解析-CSDN博客,但实际面试中,企业更关注候选人对复杂场景的应对能力(如多设备并发扫描、低功耗与高发现率的平衡)和前沿技术的…...
在Ubuntu中设置开机自动运行(sudo)指令的指南
在Ubuntu系统中,有时需要在系统启动时自动执行某些命令,特别是需要 sudo权限的指令。为了实现这一功能,可以使用多种方法,包括编写Systemd服务、配置 rc.local文件或使用 cron任务计划。本文将详细介绍这些方法,并提供…...
JDK 17 新特性
#JDK 17 新特性 /**************** 文本块 *****************/ python/scala中早就支持,不稀奇 String json “”" { “name”: “Java”, “version”: 17 } “”"; /**************** Switch 语句 -> 表达式 *****************/ 挺好的ÿ…...
精益数据分析(97/126):邮件营销与用户参与度的关键指标优化指南
精益数据分析(97/126):邮件营销与用户参与度的关键指标优化指南 在数字化营销时代,邮件列表效度、用户参与度和网站性能等指标往往决定着创业公司的增长成败。今天,我们将深入解析邮件打开率、网站可用性、页面参与时…...
华硕a豆14 Air香氛版,美学与科技的馨香融合
在快节奏的现代生活中,我们渴望一个能激发创想、愉悦感官的工作与生活伙伴,它不仅是冰冷的科技工具,更能触动我们内心深处的细腻情感。正是在这样的期许下,华硕a豆14 Air香氛版翩然而至,它以一种前所未有的方式&#x…...
