当前位置: 首页 > news >正文

RDD算子(四)、血缘关系、持久化

1. foreach

分布式遍历每一个元素,调用指定函数

val rdd = sc.makeRDD(List(1, 2, 3, 4))
rdd.foreach(println)

结果是随机的,因为foreach是在每一个Executor端并发执行,所以顺序是不确定的。如果采集collect之后再调用foreach打印,则是在Driver端执行。

RDD的方法之所以叫算子,就是为了与scala集合的方法区分开来, scala集合的方法是在同一个节点执行的,而RDD的算子则是在Executor(分布式节点)执行的。从计算的角度讲,RDD算子外部的操作都是在Driver端执行,算子内部的操作都是在Executor端执行。

2. 闭包检测

class User {var age : Int = 30
}
val rdd = sc.makeRDD(List(1, 2, 3, 4))
val user = new User()
rdd.foreach(num => {println("age = " + (user.age + num))
})

因为foreach内部的操作是在Executor上执行的,所以在Driver上创建的user需要传递给各个Executor,如果user没有序列化,则会报错

class User extends Serializable{var age : Int = 30
}

或者将User变为样例类,因为样例类在编译时会自动混入序列化特质(实现可序列化接口)

case class User {var age : Int = 30
}

如果把原始集合变为空,依然会报错,这是因为RDD算子中传递的函数会包含闭包操作(匿名函数,算子内用到了算子外的数据),所以会进行闭包检测,即检查里面的变量是否序列化

val rdd = sc.makeRDD(List[Int]())
val user = new User()
rdd.foreach(num => {println("age = " + (user.age + num))
})

 再看如下案例:

class Search(query:String) {def isMatch(s:String): Boolean {s.contains(query)}def getMatch1(rdd: RDD[String]): RDD[String] {rdd.filter(isMatch)}def getMatch2(rdd: RDD[String]): RDD[String] {rdd.filter(x => x.contains(query))}
}
val rdd = sc.makeRDD(Array("hello world", "hello spark", "hive", "atguigu"))
val search = new Search("h")
search.getMatch1(rdd).collect().foreach(println)

此时会报错Search类没有序列化,因为在rdd的filter算子内调用了query,而query作为类的构造参数,实际上是类的私有变量,isMatch方法相当于:

def isMatch(s:String): Boolean {s.contains(this.query)
}

this相当于类的对象,因此需要进行闭包检测。getMatch2也有类似的问题。除了将类序列化以及改为样例类之外,还可以将query赋给方法内部的临时变量:

def getMatch2(rdd: RDD[String]): RDD[String] {val s = queryrdd.filter(x => x.contains(s))
}

3. 依赖关系

 每个RDD会保存血缘关系(不会保存数据),这样提高了容错性,因为如果其中某个RDD转换到另一个RDD失败了,就可以根据血缘关系来重新读取。RDD保存依赖关系而示意图如下:

血缘关系展示代码:

val lines : RDD[String] = sc.textFile("datas")
println(lines.toDebugString)
println("******************")val words : RDD[String] = lines.flatMap(_.split(" "))
println(words.toDebugString)
println("******************")val wordToOne = words.map(word=>(word, 1))
println(wordToOne.toDebugString)
println("******************")val wordToSum : RDD[(String, Int] = wordToOne.reduceByKey(_+_)
println(wordToSum.toDebugString)
println("******************")

查看依赖关系只需直接将代码中的toDebugString改为dependencies即可

val lines : RDD[String] = sc.textFile("datas")
println(lines.dependencies)
println("******************")val words : RDD[String] = lines.flatMap(_.split(" "))
println(words.dependencies)
println("******************")val wordToOne = words.map(word=>(word, 1))
println(wordToOne.dependencies)
println("******************")val wordToSum : RDD[(String, Int] = wordToOne.reduceByKey(_+_)
println(wordToSum.dependencies)
println("******************")

​​​​​​​

 一对一的依赖关系表示新的RDD的一个分区的数据来源于旧的RDD的一个分区的数据,也叫窄依赖,而Shuffle依赖关系表示新的RDD的一个分区的数据来源于旧的RDD的多个分区的数据,也叫宽依赖。

4. 阶段和任务划分

窄依赖中,分区有多少个,就有多少个任务,只有一个阶段;宽依赖中,有两个阶段,每个阶段的任务数等于分区数。

RDD阶段由有向无环图(DAG)表示

Application->Job->Stage->Task每一个层级是1对n的关系。

每个Application中可能会提交多个作业,一个作业会划分为多个阶段(阶段数=宽依赖个数+1),一个阶段可能因为多个分区而包含多个任务,一个阶段中的任务数=最后一个RDD的分区数。

5. cache和persist

如果对于同一份数据源,想做多个不同的功能(比如统计单词数以及根据单词分组),这些不同的功能在实现过程中有很多重复的步骤(比如很多相同的RDD转换),此时可能会引入性能问题。虽然看起来RDD转换的过程复用了,但是RDD不存储数据,只有逻辑,所以最终的行为算子会从头开始再读取相同的数据,比如下面代码:

val data : RDD[String] = sc.makeRDD(List("Hello Scala", "Hello Spark"))val flatRDD : RDD[String] = data.flatMap(_.split(" "))val mapRDD = flatRDD.map(word=>{println("@@@@@@@@@@@@@@@")(word, 1)
})val reduceRDD : RDD[(String, Int)] = mapRDD.reduceByKey(_+_)reduceRDD.collect.foreach(println)println("****************")val groupRDD = mapRDD.groupByKey()groupRDD.collect.foreach(println)

 

可以看到,在一行*上下,@都执行了,说明数据从头开始读取,从最开始的RDD再次执行。 

为了解决这种性能问题,可以对mapRDD里的数据进行缓存,要么缓存在内存中,要么缓存在磁盘中,得看具体情况,这就是RDD的持久化操作。使用cache方法缓存在内存中:

val data : RDD[String] = sc.makeRDD(List("Hello Scala", "Hello Spark"))val flatRDD : RDD[String] = data.flatMap(_.split(" "))val mapRDD = flatRDD.map(word=>{println("@@@@@@@@@@@@@@@")(word, 1)
})mapRDD.cache()val reduceRDD : RDD[(String, Int)] = mapRDD.reduceByKey(_+_)reduceRDD.collect.foreach(println)println("****************")val groupRDD = mapRDD.groupByKey()groupRDD.collect.foreach(println)

 

放在内存中可能不安全,使用persist方法缓存在磁盘中:

val data : RDD[String] = sc.makeRDD(List("Hello Scala", "Hello Spark"))val flatRDD : RDD[String] = data.flatMap(_.split(" "))val mapRDD = flatRDD.map(word=>{println("@@@@@@@@@@@@@@@")(word, 1)
})mapRDD.persist(StorageLevel.DISK_ONLY)val reduceRDD : RDD[(String, Int)] = mapRDD.reduceByKey(_+_)reduceRDD.collect.foreach(println)println("****************")val groupRDD = mapRDD.groupByKey()groupRDD.collect.foreach(println)

注意:持久化操作也是等到行动算子触发才会真正执行。持久化操作不一定都是为了重用才引入的,有些情况下,前面一些RDD转换操作耗时很长或者数据很重要的场合,也可以进行持久化操作,这样一旦中间出了问题,重新执行任务不至于再执行之前耗时很长的操作。

除了以上的cache和persist方法,还可以使用检查点(checkpoint)的方法进行持久化操作。checkpoint需要落盘,因此需要指定存储路径,之前的persist方法也要落盘,只不过它存储在临时路径,任务执行完就会删除,而checkpoint是永久化存储

sc.setCheckPointDir("cp")val data : RDD[String] = sc.makeRDD(List("Hello Scala", "Hello Spark"))val flatRDD : RDD[String] = data.flatMap(_.split(" "))val mapRDD = flatRDD.map(word=>{println("@@@@@@@@@@@@@@@")(word, 1)
})mapRDD.checkpoint()val reduceRDD : RDD[(String, Int)] = mapRDD.reduceByKey(_+_)reduceRDD.collect.foreach(println)println("****************")val groupRDD = mapRDD.groupByKey()groupRDD.collect.foreach(println)

但是checkpoint会单独再开启一个作业,因此效率可能更低。但是与cache联合执行,即先cache,再checkpoint,就不会开启新的作业。

另外,使用cache方法会改变RDD的血缘关系:

val data : RDD[String] = sc.makeRDD(List("Hello Scala", "Hello Spark"))val flatRDD : RDD[String] = data.flatMap(_.split(" "))val mapRDD = flatRDD.map(word=>{println("@@@@@@@@@@@@@@@")(word, 1)
})mapRDD.checkpoint()
println(mapRDD.toDebugString)val reduceRDD : RDD[(String, Int)] = mapRDD.reduceByKey(_+_)reduceRDD.collect.foreach(println)println("****************")
println(mapRDD.toDebugString)

 

 

可以看到,cache方法(其实persis方法也会) 在血缘关系中添加新的依赖(原来的依赖还保留)。但是checkpoint方法会改变原来的血缘关系,建立新的血缘关系(等同于数据源变了):

sc.setCheckPointDir("cp")val data : RDD[String] = sc.makeRDD(List("Hello Scala", "Hello Spark"))val flatRDD : RDD[String] = data.flatMap(_.split(" "))val mapRDD = flatRDD.map(word=>{println("@@@@@@@@@@@@@@@")(word, 1)
})mapRDD.checkpoint()
println(mapRDD.toDebugString)val reduceRDD : RDD[(String, Int)] = mapRDD.reduceByKey(_+_)reduceRDD.collect.foreach(println)println("****************")
println(mapRDD.toDebugString)

 

6. 自定义分区器

class MyPartitioner extends Partitioner {override def numPartitions : Int = n  //自定义,可以写死override def getPartition(key : Any) : Int = {key match {case "xxx" => 0case _ => 1}}}

分区器传给RDD:

val partitionRDD = rdd.partitionBy(new MyPartitioner)

 

相关文章:

RDD算子(四)、血缘关系、持久化

1. foreach 分布式遍历每一个元素,调用指定函数 val rdd sc.makeRDD(List(1, 2, 3, 4)) rdd.foreach(println) 结果是随机的,因为foreach是在每一个Executor端并发执行,所以顺序是不确定的。如果采集collect之后再调用foreach打印&#xf…...

51之定时器与中断系统

目录 1.定时器与中断系统简介 1.1中断系统 1.2定时器 1.2.1定时器简介 1.2.2定时器大致原理及其配置 1.2.3定时器所需的所有配置总介 2.定时器0实现LED闪烁 3.使用软件生成定时器初始化程序 1.定时器与中断系统简介 1.1中断系统 首先,我们需要来了解一下什么…...

C语言中的内存函数

相比于内存函数,字符串函数和字符函数是对字符串和字符进行操作,内存函数是对内存进行操的。下面跟大家分享我学到的几个内存函数。 memcpy函数 void* memcpy(void* dest, const void* sour, size_t num); dest是目标地址,sour要拷贝的源地…...

JS继承与原型、原型链

在 JavaScript 中,继承是实现代码复用和构建对象关系的重要概念。本文将讨论原型链继承、构造函数继承以及组合继承等几种常见的继承方式,并提供相应的示例代码,并分析它们的特点、优缺点以及适用场景。 在开始讲解 JavaScript 的继承方式之…...

C#基础知识总结

C语言、C和C#的区别 ✔ 面向对象编程(OOP): C 是一种过程化的编程语言,它不直接支持面向对象编程。然而,C 是一种支持 OOP 的 C 的超集,它引入了类、对象、继承、多态等概念。C# 是完全面向对象的&#xff…...

机器学习模型——决策树

决策树的定义: 决策树利用树形数据结构来展示决策规则和分类结果,它是一种归纳学习算法,可以将复杂数据转化为可以预测未知数据的模型。每一条从根节点到叶节点的路径都代表一条决策规则。 决策树内的一些重要名词: 信息熵&am…...

【HTML】制作一个简单的三角形动态图形

目录 前言 开始 HTML部分 CSS部分 效果图 总结 前言 无需多言,本文将详细介绍一段HTML和CSS代码,具体内容如下: 开始 首先新建文件夹,创建两个文本文档,其中HTML的文件名改为[index.html],CSS的文件名…...

Acwing.504 转圈游戏(带取余的快速幂)

题目 n个小伙伴(编号从 0到 n−1)围坐一圈玩游戏。 按照顺时针方向给 n个位置编号,从 0到 n−1。 最初,第 0号小伙伴在第 0号位置,第 1号小伙伴在第 1号位置,…,依此类推。 游戏规…...

pair作为unordered_map的key报错

问题 pair作为unordered_map的key报错&#xff0c;编译时会报错 原因 因为pair没有哈希函数 解决方法 定义哈希函数 template <typename T> inline void hash_combine(std::size_t &seed, const T &val) {seed ^ std::hash<T>()(val) 0x9e3779b9 (…...

Windows提权—数据库提权-mysql提权mssql提权Oracle数据库提权

目录 Windows 提权—数据库提权一、mysql提权1.1 udf提权1.1.2 操作方法一 、MSF自动化--UDF提权--漏洞利用1.1.3 操作方法二、 手工导出sqlmap中的dll1.1.4 操作方法三、 moon.php大马利用 1.2 mof提权1.3 启动项提权1.4 反弹shell 二、MSSQL提权MSSQL提权方法1.使用xp_cmdshe…...

为什么android创建Fragment推荐用newInstance

FullScreenDialogFragment使用newInstance方法不是因为它是一个单例&#xff0c;而是因为这是创建DialogFragment实例并同时提供参数的一种标准模式。这种模式通常称为静态工厂方法模式&#xff0c;在Android开发中被广泛使用&#xff0c;尤其是用于Fragment的实例化。 newIns…...

MyBatis的xml实现方式

1、该项目引入的依赖 <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0" xmlns:xsi"http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation"http://maven.apache.o…...

大模型prompt技巧——思维链(Chain-of-Thought)

1、Zero-shot、One-shot、Few-shot 与fintune prompt的时候给出例子答案&#xff0c;然后再让模型回答。 2、zero-shot-CoT “Let’s think step by step”有奇迹效果 3、多数投票提高CoT性能——自洽性&#xff08;Self-consistency&#xff09; 多个思维链&#xff0c;然后取…...

内网穿透的应用-如何在Android Termux上部署MySQL数据库并实现无公网IP远程访问

文章目录 前言1.安装MariaDB2.安装cpolar内网穿透工具3. 创建安全隧道映射mysql4. 公网远程连接5. 固定远程连接地址 前言 Android作为移动设备&#xff0c;尽管最初并非设计为服务器&#xff0c;但是随着技术的进步我们可以将Android配置为生产力工具&#xff0c;变成一个随身…...

面试算法-133-区间子数组个数

题目 给你一个整数数组 nums 和两个整数&#xff1a;left 及 right 。找出 nums 中连续、非空且其中最大元素在范围 [left, right] 内的子数组&#xff0c;并返回满足条件的子数组的个数。 生成的测试用例保证结果符合 32-bit 整数范围。 示例 1&#xff1a; 输入&#xff…...

物联网实战--入门篇之(八)嵌入式-空气净化器

目录 一、风扇调速 二、通讯协议 三、净化器运行逻辑 一、风扇调速 单片机是不能直接驱动电机的&#xff0c;因为主芯片的驱动电流比较小(50mA左右)&#xff0c;他们之间正常还要有个电机驱动器&#xff0c;常用的有TB6612、L298和L9110等&#xff0c;目前项目用的这个电机它…...

macOS上QT打开麦克风和摄像头的权限问题

同样的代码在Windows上可以轻松操作麦克风和摄像头&#xff0c;特别是用QT这种跨平台的框架。但是对macOS这种权限要求完善的系统还需要进行一些配置&#xff0c;那就是增加Info.plist属性配置文件。如果是之前的早期5.x版本的QTCreator因为使用的是qmake构建系统&#xff0c;估…...

鸿蒙手机cordova-plugin-camera不能拍照和图片不显示问题

鸿蒙手机cordova-plugin-camera不能拍照和图片不显示问题 一、运行环境 1、硬件 手机型号&#xff1a;NOVA 7 系统&#xff1a;HarmonyOS版本 4.0.0 2、软件 android SDK platforms&#xff1a;14.0(API Level 34)、13.0&#xff08;API Level 33&#xff09; SDK Build-T…...

Spring源码解析上

spring源码解析 整体架构 defaultListableBeanFactory xmlBeanDefinitionReader 创建XmlBeanFactory 对资源文件进行加载–Resource 利用LoadBeandefinitions(resource)方法加载配置中的bean loadBeandefinitions加载步骤 doLoadBeanDefinition xml配置模式 validationMode 获…...

第九题:最大间隙

题目描述 给定一个序列 a1,a2,⋯ ,an。其中 a1≤a2≤⋯≤an。 相邻两个数之间的差&#xff08;后一个数减前一个数&#xff09;称为它们的间隙。 请问序列中最大的间隙值是多少&#xff1f; 输入描述 输入的第一行包含一个整数 n&#xff0c;表示序列的长度。 第二行包含…...

UE5 学习系列(二)用户操作界面及介绍

这篇博客是 UE5 学习系列博客的第二篇&#xff0c;在第一篇的基础上展开这篇内容。博客参考的 B 站视频资料和第一篇的链接如下&#xff1a; 【Note】&#xff1a;如果你已经完成安装等操作&#xff0c;可以只执行第一篇博客中 2. 新建一个空白游戏项目 章节操作&#xff0c;重…...

通过Wrangler CLI在worker中创建数据库和表

官方使用文档&#xff1a;Getting started Cloudflare D1 docs 创建数据库 在命令行中执行完成之后&#xff0c;会在本地和远程创建数据库&#xff1a; npx wranglerlatest d1 create prod-d1-tutorial 在cf中就可以看到数据库&#xff1a; 现在&#xff0c;您的Cloudfla…...

GruntJS-前端自动化任务运行器从入门到实战

Grunt 完全指南&#xff1a;从入门到实战 一、Grunt 是什么&#xff1f; Grunt是一个基于 Node.js 的前端自动化任务运行器&#xff0c;主要用于自动化执行项目开发中重复性高的任务&#xff0c;例如文件压缩、代码编译、语法检查、单元测试、文件合并等。通过配置简洁的任务…...

【p2p、分布式,区块链笔记 MESH】Bluetooth蓝牙通信 BLE Mesh协议的拓扑结构 定向转发机制

目录 节点的功能承载层&#xff08;GATT/Adv&#xff09;局限性&#xff1a; 拓扑关系定向转发机制定向转发意义 CG 节点的功能 节点的功能由节点支持的特性和功能决定。所有节点都能够发送和接收网格消息。节点还可以选择支持一个或多个附加功能&#xff0c;如 Configuration …...

在 Spring Boot 项目里,MYSQL中json类型字段使用

前言&#xff1a; 因为程序特殊需求导致&#xff0c;需要mysql数据库存储json类型数据&#xff0c;因此记录一下使用流程 1.java实体中新增字段 private List<User> users 2.增加mybatis-plus注解 TableField(typeHandler FastjsonTypeHandler.class) private Lis…...

uniapp 小程序 学习(一)

利用Hbuilder 创建项目 运行到内置浏览器看效果 下载微信小程序 安装到Hbuilder 下载地址 &#xff1a;开发者工具默认安装 设置服务端口号 在Hbuilder中设置微信小程序 配置 找到运行设置&#xff0c;将微信开发者工具放入到Hbuilder中&#xff0c; 打开后出现 如下 bug 解…...

Visual Studio Code 扩展

Visual Studio Code 扩展 change-case 大小写转换EmmyLua for VSCode 调试插件Bookmarks 书签 change-case 大小写转换 https://marketplace.visualstudio.com/items?itemNamewmaurer.change-case 选中单词后&#xff0c;命令 changeCase.commands 可预览转换效果 EmmyLua…...

Android写一个捕获全局异常的工具类

项目开发和实际运行过程中难免会遇到异常发生&#xff0c;系统提供了一个可以捕获全局异常的工具Uncaughtexceptionhandler&#xff0c;它是Thread的子类&#xff08;就是package java.lang;里线程的Thread&#xff09;。本文将利用它将设备信息、报错信息以及错误的发生时间都…...

在RK3588上搭建ROS1环境:创建节点与数据可视化实战指南

在RK3588上搭建ROS1环境:创建节点与数据可视化实战指南 背景介绍完整操作步骤1. 创建Docker容器环境2. 验证GUI显示功能3. 安装ROS Noetic4. 配置环境变量5. 创建ROS节点(小球运动模拟)6. 配置RVIZ默认视图7. 创建启动脚本8. 运行可视化系统效果展示与交互技术解析ROS节点通…...

华为OD机考- 简单的自动曝光/平均像素

import java.util.Arrays; import java.util.Scanner;public class DemoTest4 {public static void main(String[] args) {Scanner in new Scanner(System.in);// 注意 hasNext 和 hasNextLine 的区别while (in.hasNextLine()) { // 注意 while 处理多个 caseint[] arr Array…...