当前位置: 首页 > news >正文

RDD算子(四)、血缘关系、持久化

1. foreach

分布式遍历每一个元素,调用指定函数

val rdd = sc.makeRDD(List(1, 2, 3, 4))
rdd.foreach(println)

结果是随机的,因为foreach是在每一个Executor端并发执行,所以顺序是不确定的。如果采集collect之后再调用foreach打印,则是在Driver端执行。

RDD的方法之所以叫算子,就是为了与scala集合的方法区分开来, scala集合的方法是在同一个节点执行的,而RDD的算子则是在Executor(分布式节点)执行的。从计算的角度讲,RDD算子外部的操作都是在Driver端执行,算子内部的操作都是在Executor端执行。

2. 闭包检测

class User {var age : Int = 30
}
val rdd = sc.makeRDD(List(1, 2, 3, 4))
val user = new User()
rdd.foreach(num => {println("age = " + (user.age + num))
})

因为foreach内部的操作是在Executor上执行的,所以在Driver上创建的user需要传递给各个Executor,如果user没有序列化,则会报错

class User extends Serializable{var age : Int = 30
}

或者将User变为样例类,因为样例类在编译时会自动混入序列化特质(实现可序列化接口)

case class User {var age : Int = 30
}

如果把原始集合变为空,依然会报错,这是因为RDD算子中传递的函数会包含闭包操作(匿名函数,算子内用到了算子外的数据),所以会进行闭包检测,即检查里面的变量是否序列化

val rdd = sc.makeRDD(List[Int]())
val user = new User()
rdd.foreach(num => {println("age = " + (user.age + num))
})

 再看如下案例:

class Search(query:String) {def isMatch(s:String): Boolean {s.contains(query)}def getMatch1(rdd: RDD[String]): RDD[String] {rdd.filter(isMatch)}def getMatch2(rdd: RDD[String]): RDD[String] {rdd.filter(x => x.contains(query))}
}
val rdd = sc.makeRDD(Array("hello world", "hello spark", "hive", "atguigu"))
val search = new Search("h")
search.getMatch1(rdd).collect().foreach(println)

此时会报错Search类没有序列化,因为在rdd的filter算子内调用了query,而query作为类的构造参数,实际上是类的私有变量,isMatch方法相当于:

def isMatch(s:String): Boolean {s.contains(this.query)
}

this相当于类的对象,因此需要进行闭包检测。getMatch2也有类似的问题。除了将类序列化以及改为样例类之外,还可以将query赋给方法内部的临时变量:

def getMatch2(rdd: RDD[String]): RDD[String] {val s = queryrdd.filter(x => x.contains(s))
}

3. 依赖关系

 每个RDD会保存血缘关系(不会保存数据),这样提高了容错性,因为如果其中某个RDD转换到另一个RDD失败了,就可以根据血缘关系来重新读取。RDD保存依赖关系而示意图如下:

血缘关系展示代码:

val lines : RDD[String] = sc.textFile("datas")
println(lines.toDebugString)
println("******************")val words : RDD[String] = lines.flatMap(_.split(" "))
println(words.toDebugString)
println("******************")val wordToOne = words.map(word=>(word, 1))
println(wordToOne.toDebugString)
println("******************")val wordToSum : RDD[(String, Int] = wordToOne.reduceByKey(_+_)
println(wordToSum.toDebugString)
println("******************")

查看依赖关系只需直接将代码中的toDebugString改为dependencies即可

val lines : RDD[String] = sc.textFile("datas")
println(lines.dependencies)
println("******************")val words : RDD[String] = lines.flatMap(_.split(" "))
println(words.dependencies)
println("******************")val wordToOne = words.map(word=>(word, 1))
println(wordToOne.dependencies)
println("******************")val wordToSum : RDD[(String, Int] = wordToOne.reduceByKey(_+_)
println(wordToSum.dependencies)
println("******************")

​​​​​​​

 一对一的依赖关系表示新的RDD的一个分区的数据来源于旧的RDD的一个分区的数据,也叫窄依赖,而Shuffle依赖关系表示新的RDD的一个分区的数据来源于旧的RDD的多个分区的数据,也叫宽依赖。

4. 阶段和任务划分

窄依赖中,分区有多少个,就有多少个任务,只有一个阶段;宽依赖中,有两个阶段,每个阶段的任务数等于分区数。

RDD阶段由有向无环图(DAG)表示

Application->Job->Stage->Task每一个层级是1对n的关系。

每个Application中可能会提交多个作业,一个作业会划分为多个阶段(阶段数=宽依赖个数+1),一个阶段可能因为多个分区而包含多个任务,一个阶段中的任务数=最后一个RDD的分区数。

5. cache和persist

如果对于同一份数据源,想做多个不同的功能(比如统计单词数以及根据单词分组),这些不同的功能在实现过程中有很多重复的步骤(比如很多相同的RDD转换),此时可能会引入性能问题。虽然看起来RDD转换的过程复用了,但是RDD不存储数据,只有逻辑,所以最终的行为算子会从头开始再读取相同的数据,比如下面代码:

val data : RDD[String] = sc.makeRDD(List("Hello Scala", "Hello Spark"))val flatRDD : RDD[String] = data.flatMap(_.split(" "))val mapRDD = flatRDD.map(word=>{println("@@@@@@@@@@@@@@@")(word, 1)
})val reduceRDD : RDD[(String, Int)] = mapRDD.reduceByKey(_+_)reduceRDD.collect.foreach(println)println("****************")val groupRDD = mapRDD.groupByKey()groupRDD.collect.foreach(println)

 

可以看到,在一行*上下,@都执行了,说明数据从头开始读取,从最开始的RDD再次执行。 

为了解决这种性能问题,可以对mapRDD里的数据进行缓存,要么缓存在内存中,要么缓存在磁盘中,得看具体情况,这就是RDD的持久化操作。使用cache方法缓存在内存中:

val data : RDD[String] = sc.makeRDD(List("Hello Scala", "Hello Spark"))val flatRDD : RDD[String] = data.flatMap(_.split(" "))val mapRDD = flatRDD.map(word=>{println("@@@@@@@@@@@@@@@")(word, 1)
})mapRDD.cache()val reduceRDD : RDD[(String, Int)] = mapRDD.reduceByKey(_+_)reduceRDD.collect.foreach(println)println("****************")val groupRDD = mapRDD.groupByKey()groupRDD.collect.foreach(println)

 

放在内存中可能不安全,使用persist方法缓存在磁盘中:

val data : RDD[String] = sc.makeRDD(List("Hello Scala", "Hello Spark"))val flatRDD : RDD[String] = data.flatMap(_.split(" "))val mapRDD = flatRDD.map(word=>{println("@@@@@@@@@@@@@@@")(word, 1)
})mapRDD.persist(StorageLevel.DISK_ONLY)val reduceRDD : RDD[(String, Int)] = mapRDD.reduceByKey(_+_)reduceRDD.collect.foreach(println)println("****************")val groupRDD = mapRDD.groupByKey()groupRDD.collect.foreach(println)

注意:持久化操作也是等到行动算子触发才会真正执行。持久化操作不一定都是为了重用才引入的,有些情况下,前面一些RDD转换操作耗时很长或者数据很重要的场合,也可以进行持久化操作,这样一旦中间出了问题,重新执行任务不至于再执行之前耗时很长的操作。

除了以上的cache和persist方法,还可以使用检查点(checkpoint)的方法进行持久化操作。checkpoint需要落盘,因此需要指定存储路径,之前的persist方法也要落盘,只不过它存储在临时路径,任务执行完就会删除,而checkpoint是永久化存储

sc.setCheckPointDir("cp")val data : RDD[String] = sc.makeRDD(List("Hello Scala", "Hello Spark"))val flatRDD : RDD[String] = data.flatMap(_.split(" "))val mapRDD = flatRDD.map(word=>{println("@@@@@@@@@@@@@@@")(word, 1)
})mapRDD.checkpoint()val reduceRDD : RDD[(String, Int)] = mapRDD.reduceByKey(_+_)reduceRDD.collect.foreach(println)println("****************")val groupRDD = mapRDD.groupByKey()groupRDD.collect.foreach(println)

但是checkpoint会单独再开启一个作业,因此效率可能更低。但是与cache联合执行,即先cache,再checkpoint,就不会开启新的作业。

另外,使用cache方法会改变RDD的血缘关系:

val data : RDD[String] = sc.makeRDD(List("Hello Scala", "Hello Spark"))val flatRDD : RDD[String] = data.flatMap(_.split(" "))val mapRDD = flatRDD.map(word=>{println("@@@@@@@@@@@@@@@")(word, 1)
})mapRDD.checkpoint()
println(mapRDD.toDebugString)val reduceRDD : RDD[(String, Int)] = mapRDD.reduceByKey(_+_)reduceRDD.collect.foreach(println)println("****************")
println(mapRDD.toDebugString)

 

 

可以看到,cache方法(其实persis方法也会) 在血缘关系中添加新的依赖(原来的依赖还保留)。但是checkpoint方法会改变原来的血缘关系,建立新的血缘关系(等同于数据源变了):

sc.setCheckPointDir("cp")val data : RDD[String] = sc.makeRDD(List("Hello Scala", "Hello Spark"))val flatRDD : RDD[String] = data.flatMap(_.split(" "))val mapRDD = flatRDD.map(word=>{println("@@@@@@@@@@@@@@@")(word, 1)
})mapRDD.checkpoint()
println(mapRDD.toDebugString)val reduceRDD : RDD[(String, Int)] = mapRDD.reduceByKey(_+_)reduceRDD.collect.foreach(println)println("****************")
println(mapRDD.toDebugString)

 

6. 自定义分区器

class MyPartitioner extends Partitioner {override def numPartitions : Int = n  //自定义,可以写死override def getPartition(key : Any) : Int = {key match {case "xxx" => 0case _ => 1}}}

分区器传给RDD:

val partitionRDD = rdd.partitionBy(new MyPartitioner)

 

相关文章:

RDD算子(四)、血缘关系、持久化

1. foreach 分布式遍历每一个元素,调用指定函数 val rdd sc.makeRDD(List(1, 2, 3, 4)) rdd.foreach(println) 结果是随机的,因为foreach是在每一个Executor端并发执行,所以顺序是不确定的。如果采集collect之后再调用foreach打印&#xf…...

51之定时器与中断系统

目录 1.定时器与中断系统简介 1.1中断系统 1.2定时器 1.2.1定时器简介 1.2.2定时器大致原理及其配置 1.2.3定时器所需的所有配置总介 2.定时器0实现LED闪烁 3.使用软件生成定时器初始化程序 1.定时器与中断系统简介 1.1中断系统 首先,我们需要来了解一下什么…...

C语言中的内存函数

相比于内存函数,字符串函数和字符函数是对字符串和字符进行操作,内存函数是对内存进行操的。下面跟大家分享我学到的几个内存函数。 memcpy函数 void* memcpy(void* dest, const void* sour, size_t num); dest是目标地址,sour要拷贝的源地…...

JS继承与原型、原型链

在 JavaScript 中,继承是实现代码复用和构建对象关系的重要概念。本文将讨论原型链继承、构造函数继承以及组合继承等几种常见的继承方式,并提供相应的示例代码,并分析它们的特点、优缺点以及适用场景。 在开始讲解 JavaScript 的继承方式之…...

C#基础知识总结

C语言、C和C#的区别 ✔ 面向对象编程(OOP): C 是一种过程化的编程语言,它不直接支持面向对象编程。然而,C 是一种支持 OOP 的 C 的超集,它引入了类、对象、继承、多态等概念。C# 是完全面向对象的&#xff…...

机器学习模型——决策树

决策树的定义: 决策树利用树形数据结构来展示决策规则和分类结果,它是一种归纳学习算法,可以将复杂数据转化为可以预测未知数据的模型。每一条从根节点到叶节点的路径都代表一条决策规则。 决策树内的一些重要名词: 信息熵&am…...

【HTML】制作一个简单的三角形动态图形

目录 前言 开始 HTML部分 CSS部分 效果图 总结 前言 无需多言,本文将详细介绍一段HTML和CSS代码,具体内容如下: 开始 首先新建文件夹,创建两个文本文档,其中HTML的文件名改为[index.html],CSS的文件名…...

Acwing.504 转圈游戏(带取余的快速幂)

题目 n个小伙伴(编号从 0到 n−1)围坐一圈玩游戏。 按照顺时针方向给 n个位置编号,从 0到 n−1。 最初,第 0号小伙伴在第 0号位置,第 1号小伙伴在第 1号位置,…,依此类推。 游戏规…...

pair作为unordered_map的key报错

问题 pair作为unordered_map的key报错&#xff0c;编译时会报错 原因 因为pair没有哈希函数 解决方法 定义哈希函数 template <typename T> inline void hash_combine(std::size_t &seed, const T &val) {seed ^ std::hash<T>()(val) 0x9e3779b9 (…...

Windows提权—数据库提权-mysql提权mssql提权Oracle数据库提权

目录 Windows 提权—数据库提权一、mysql提权1.1 udf提权1.1.2 操作方法一 、MSF自动化--UDF提权--漏洞利用1.1.3 操作方法二、 手工导出sqlmap中的dll1.1.4 操作方法三、 moon.php大马利用 1.2 mof提权1.3 启动项提权1.4 反弹shell 二、MSSQL提权MSSQL提权方法1.使用xp_cmdshe…...

为什么android创建Fragment推荐用newInstance

FullScreenDialogFragment使用newInstance方法不是因为它是一个单例&#xff0c;而是因为这是创建DialogFragment实例并同时提供参数的一种标准模式。这种模式通常称为静态工厂方法模式&#xff0c;在Android开发中被广泛使用&#xff0c;尤其是用于Fragment的实例化。 newIns…...

MyBatis的xml实现方式

1、该项目引入的依赖 <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0" xmlns:xsi"http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation"http://maven.apache.o…...

大模型prompt技巧——思维链(Chain-of-Thought)

1、Zero-shot、One-shot、Few-shot 与fintune prompt的时候给出例子答案&#xff0c;然后再让模型回答。 2、zero-shot-CoT “Let’s think step by step”有奇迹效果 3、多数投票提高CoT性能——自洽性&#xff08;Self-consistency&#xff09; 多个思维链&#xff0c;然后取…...

内网穿透的应用-如何在Android Termux上部署MySQL数据库并实现无公网IP远程访问

文章目录 前言1.安装MariaDB2.安装cpolar内网穿透工具3. 创建安全隧道映射mysql4. 公网远程连接5. 固定远程连接地址 前言 Android作为移动设备&#xff0c;尽管最初并非设计为服务器&#xff0c;但是随着技术的进步我们可以将Android配置为生产力工具&#xff0c;变成一个随身…...

面试算法-133-区间子数组个数

题目 给你一个整数数组 nums 和两个整数&#xff1a;left 及 right 。找出 nums 中连续、非空且其中最大元素在范围 [left, right] 内的子数组&#xff0c;并返回满足条件的子数组的个数。 生成的测试用例保证结果符合 32-bit 整数范围。 示例 1&#xff1a; 输入&#xff…...

物联网实战--入门篇之(八)嵌入式-空气净化器

目录 一、风扇调速 二、通讯协议 三、净化器运行逻辑 一、风扇调速 单片机是不能直接驱动电机的&#xff0c;因为主芯片的驱动电流比较小(50mA左右)&#xff0c;他们之间正常还要有个电机驱动器&#xff0c;常用的有TB6612、L298和L9110等&#xff0c;目前项目用的这个电机它…...

macOS上QT打开麦克风和摄像头的权限问题

同样的代码在Windows上可以轻松操作麦克风和摄像头&#xff0c;特别是用QT这种跨平台的框架。但是对macOS这种权限要求完善的系统还需要进行一些配置&#xff0c;那就是增加Info.plist属性配置文件。如果是之前的早期5.x版本的QTCreator因为使用的是qmake构建系统&#xff0c;估…...

鸿蒙手机cordova-plugin-camera不能拍照和图片不显示问题

鸿蒙手机cordova-plugin-camera不能拍照和图片不显示问题 一、运行环境 1、硬件 手机型号&#xff1a;NOVA 7 系统&#xff1a;HarmonyOS版本 4.0.0 2、软件 android SDK platforms&#xff1a;14.0(API Level 34)、13.0&#xff08;API Level 33&#xff09; SDK Build-T…...

Spring源码解析上

spring源码解析 整体架构 defaultListableBeanFactory xmlBeanDefinitionReader 创建XmlBeanFactory 对资源文件进行加载–Resource 利用LoadBeandefinitions(resource)方法加载配置中的bean loadBeandefinitions加载步骤 doLoadBeanDefinition xml配置模式 validationMode 获…...

第九题:最大间隙

题目描述 给定一个序列 a1,a2,⋯ ,an。其中 a1≤a2≤⋯≤an。 相邻两个数之间的差&#xff08;后一个数减前一个数&#xff09;称为它们的间隙。 请问序列中最大的间隙值是多少&#xff1f; 输入描述 输入的第一行包含一个整数 n&#xff0c;表示序列的长度。 第二行包含…...

华为云AI开发平台ModelArts

华为云ModelArts&#xff1a;重塑AI开发流程的“智能引擎”与“创新加速器”&#xff01; 在人工智能浪潮席卷全球的2025年&#xff0c;企业拥抱AI的意愿空前高涨&#xff0c;但技术门槛高、流程复杂、资源投入巨大的现实&#xff0c;却让许多创新构想止步于实验室。数据科学家…...

Java 语言特性(面试系列1)

一、面向对象编程 1. 封装&#xff08;Encapsulation&#xff09; 定义&#xff1a;将数据&#xff08;属性&#xff09;和操作数据的方法绑定在一起&#xff0c;通过访问控制符&#xff08;private、protected、public&#xff09;隐藏内部实现细节。示例&#xff1a; public …...

【力扣数据库知识手册笔记】索引

索引 索引的优缺点 优点1. 通过创建唯一性索引&#xff0c;可以保证数据库表中每一行数据的唯一性。2. 可以加快数据的检索速度&#xff08;创建索引的主要原因&#xff09;。3. 可以加速表和表之间的连接&#xff0c;实现数据的参考完整性。4. 可以在查询过程中&#xff0c;…...

linux 错误码总结

1,错误码的概念与作用 在Linux系统中,错误码是系统调用或库函数在执行失败时返回的特定数值,用于指示具体的错误类型。这些错误码通过全局变量errno来存储和传递,errno由操作系统维护,保存最近一次发生的错误信息。值得注意的是,errno的值在每次系统调用或函数调用失败时…...

Linux-07 ubuntu 的 chrome 启动不了

文章目录 问题原因解决步骤一、卸载旧版chrome二、重新安装chorme三、启动不了&#xff0c;报错如下四、启动不了&#xff0c;解决如下 总结 问题原因 在应用中可以看到chrome&#xff0c;但是打不开(说明&#xff1a;原来的ubuntu系统出问题了&#xff0c;这个是备用的硬盘&a…...

深入解析C++中的extern关键字:跨文件共享变量与函数的终极指南

&#x1f680; C extern 关键字深度解析&#xff1a;跨文件编程的终极指南 &#x1f4c5; 更新时间&#xff1a;2025年6月5日 &#x1f3f7;️ 标签&#xff1a;C | extern关键字 | 多文件编程 | 链接与声明 | 现代C 文章目录 前言&#x1f525;一、extern 是什么&#xff1f;&…...

OPenCV CUDA模块图像处理-----对图像执行 均值漂移滤波(Mean Shift Filtering)函数meanShiftFiltering()

操作系统&#xff1a;ubuntu22.04 OpenCV版本&#xff1a;OpenCV4.9 IDE:Visual Studio Code 编程语言&#xff1a;C11 算法描述 在 GPU 上对图像执行 均值漂移滤波&#xff08;Mean Shift Filtering&#xff09;&#xff0c;用于图像分割或平滑处理。 该函数将输入图像中的…...

LINUX 69 FTP 客服管理系统 man 5 /etc/vsftpd/vsftpd.conf

FTP 客服管理系统 实现kefu123登录&#xff0c;不允许匿名访问&#xff0c;kefu只能访问/data/kefu目录&#xff0c;不能查看其他目录 创建账号密码 useradd kefu echo 123|passwd -stdin kefu [rootcode caozx26420]# echo 123|passwd --stdin kefu 更改用户 kefu 的密码…...

【C++进阶篇】智能指针

C内存管理终极指南&#xff1a;智能指针从入门到源码剖析 一. 智能指针1.1 auto_ptr1.2 unique_ptr1.3 shared_ptr1.4 make_shared 二. 原理三. shared_ptr循环引用问题三. 线程安全问题四. 内存泄漏4.1 什么是内存泄漏4.2 危害4.3 避免内存泄漏 五. 最后 一. 智能指针 智能指…...

MacOS下Homebrew国内镜像加速指南(2025最新国内镜像加速)

macos brew国内镜像加速方法 brew install 加速formula.jws.json下载慢加速 &#x1f37a; 最新版brew安装慢到怀疑人生&#xff1f;别怕&#xff0c;教你轻松起飞&#xff01; 最近Homebrew更新至最新版&#xff0c;每次执行 brew 命令时都会自动从官方地址 https://formulae.…...