【Spark | Spark-Core篇】RDD行动算子action
使用转换算子是产生一个新的rdd,此时在driver端会生成一个逻辑上的执行计划,但任务还没有执行。但所谓的行动算子,其实就是触发作业执行的方法(runJob)。底层代码调用的是环境对象的runJob方法。
1. reduce
函数源码:
def reduce(f: (T, T) => T): T = withScope {val cleanF = sc.clean(f)val reducePartition: Iterator[T] => Option[T] = iter => {if (iter.hasNext) {Some(iter.reduceLeft(cleanF))} else {None}}var jobResult: Option[T] = Noneval mergeResult = (_: Int, taskResult: Option[T]) => {if (taskResult.isDefined) {jobResult = jobResult match {case Some(value) => Some(f(value, taskResult.get))case None => taskResult}}}sc.runJob(this, reducePartition, mergeResult)// Get the final result out of our Option, or throw an exception if the RDD was emptyjobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))}
函数说明:
简而言之,就是先聚合分区内的数据,再聚合分区间的数据。
object Spark01_RDD_reduce_action {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("rdd_action")val sc = new SparkContext(sparkConf)val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)val i = rdd.reduce(_ + _)println(i)// 10// 分区数据:[1, 2], [3, 4]// reduce聚集分区的所有元素:先聚合分区内的数据,再聚合分区间的数据// [1, 2]=>3, [3, 4]=>7 3 + 7 => 10}
}
2. collect
函数源码:
def collect(): Array[T] = withScope {val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)Array.concat(results: _*)}
函数说明:
在驱动程序中,以数组Array的形式返回数据集的所有元素。
object Spark02_RDD_collect_action {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("rdd_action")val sc = new SparkContext(sparkConf)val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)val mapRDD = rdd.map(_ * 2)// 代码运行到collect时才开始触发执行任务。在这之前只是在driver构建一个逻辑的执行计划。// collect源码存在runJob函数。println(mapRDD.collect().mkString(","))// 将executor端的分区内的数据按分区有序的生成一个数组并返回到driver端// 调用collect函数的输出:2,4,6,8mapRDD.foreach(println)// 不调用collect函数的输出:// 2 6 8 4无序}
}
3. count 和 first
函数源码:
def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sumdef first(): T = withScope {take(1) match {case Array(t) => tcase _ => throw new UnsupportedOperationException("empty collection")}}
first函数底层调用了take函数,take函数底层调用了runJob函数,所以first也是行动算子。
函数说明:
object Spark03_RDD_count_first_action {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("rdd_action")val sc = new SparkContext(sparkConf)val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)// count函数获取rdd的数据的个数val cnt = rdd.count()// 4println(cnt)// first获取数据源中数据的第一个元素val first = rdd.first()println(first)// 1sc.stop()}
}
4. take和takeOrdered
函数源码:
def take(num: Int): Array[T] = withScope {val scaleUpFactor = Math.max(conf.get(RDD_LIMIT_SCALE_UP_FACTOR), 2)if (num == 0) {new Array[T](0)} else {val buf = new ArrayBuffer[T]val totalParts = this.partitions.lengthvar partsScanned = 0while (buf.size < num && partsScanned < totalParts) {// The number of partitions to try in this iteration. It is ok for this number to be// greater than totalParts because we actually cap it at totalParts in runJob.var numPartsToTry = 1Lval left = num - buf.sizeif (partsScanned > 0) {// If we didn't find any rows after the previous iteration, quadruple and retry.// Otherwise, interpolate the number of partitions we need to try, but overestimate// it by 50%. We also cap the estimation in the end.if (buf.isEmpty) {numPartsToTry = partsScanned * scaleUpFactor} else {// As left > 0, numPartsToTry is always >= 1numPartsToTry = Math.ceil(1.5 * left * partsScanned / buf.size).toIntnumPartsToTry = Math.min(numPartsToTry, partsScanned * scaleUpFactor)}}val p = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts).toInt)val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, p)res.foreach(buf ++= _.take(num - buf.size))partsScanned += p.size}buf.toArray}}
函数说明:
object Spark05_RDD_take_takeOrdered_action {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("rdd_action")val sc = new SparkContext(sparkConf)val rdd = sc.makeRDD(List(4, 3, 2, 1, 6, 7), 3)println(rdd.take(3).mkString(" "))// 4, 3, 2// take方法返回的是一个数组// 从数据源取前N个数据println(rdd.takeOrdered(3).mkString(", "))// 1, 2, 3// takeOrdered方法返回的是一个有序数组,第二个参数可以传入排序规则。// 源码中的Ordering特质继承自Comparator[T],即相当于java中的比较器。// 从数据源获取前N个有序的数据sc.stop()}
}
5. aggregate
函数源码:
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = withScope {// Clone the zero value since we will also be serializing it as part of tasksvar jobResult = Utils.clone(zeroValue, sc.env.serializer.newInstance())val cleanSeqOp = sc.clean(seqOp)val cleanCombOp = sc.clean(combOp)val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)val mergeResult = (_: Int, taskResult: U) => jobResult = combOp(jobResult, taskResult)sc.runJob(this, aggregatePartition, mergeResult)jobResult}
函数说明:
分区的数据通过初始值和分区内的数据进行聚合,然后再和初始值进行分区间的数据聚合。
object Spark04_RDD_aggregate_action {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("rdd_action")val sc = new SparkContext(sparkConf)val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)val i = rdd.aggregate(0)(_ + _, _ + _)println(i)// 10// 0 + 1 + 2 => 3, 0 + 3 + 4 => 7// 0 + 3 + 7 => 10val i1 = rdd.aggregate(10)(_ + _, _ + _)println(i1)// 40// 10 + 1 + 2 => 13, 10 + 3 + 4 => 17// 10 + 13 + 17 => 40sc.stop()}
}
6. fold
函数源码;
略。
函数说明:
折叠操作,aggregate的简化版操作。
即aggregate的分区内计算的逻辑和分区间计算的逻辑相同。
7. countByKey和countByValue
countByKey函数源码:
def countByKey(): Map[K, Long] = self.withScope {self.mapValues(_ => 1L).reduceByKey(_ + _).collect().toMap}
返回值类型是map集合。即Map[K, long]
K即是key-value的key类型,Long则是key出现的次数。
countByValue函数源码:
def countByValue()(implicit ord: Ordering[T] = null): Map[T, Long] = withScope {map(value => (value, null)).countByKey()}
得知,底层调用了map方法,将rdd的每个数据映射为一个元组然后调用countByKey方法。
object Spark06_RDD_countByKey_countByValue_action {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("rdd_action")val sc = new SparkContext(sparkConf)val rdd = sc.makeRDD(List(("a", 1), ("b", 2), ("c", 3), ("a", 2)))// 返回的是Map集合,key是key-value的key,value是key出现的countval rdd_key = rdd.countByKey()println(rdd_key)// Map(a -> 2, b -> 1, c -> 1)// a作为key出现了两次,b作为key出现了一次...val rdd_value = rdd.countByValue()println(rdd_value)// Map((a,2) -> 1, (b,2) -> 1, (c,3) -> 1, (a,1) -> 1)// 底层调用了map方法和countbyKey方法sc.stop()}
}
8. save相关算子
save相关算子包括
saveAsTextFile, saveAsObjectFile, saveAsSequenceFile
函数源码:
def saveAsTextFile(path: String): Unit = withScope {saveAsTextFile(path, null)}def saveAsObjectFile(path: String): Unit = withScope {this.mapPartitions(iter => iter.grouped(10).map(_.toArray)).map(x => (NullWritable.get(), new BytesWritable(Utils.serialize(x)))).saveAsSequenceFile(path)}def saveAsSequenceFile(path: String,codec: Option[Class[_ <: CompressionCodec]] = None): Unit = self.withScope {def anyToWritable[U <% Writable](u: U): Writable = u// TODO We cannot force the return type of `anyToWritable` be same as keyWritableClass and// valueWritableClass at the compile time. To implement that, we need to add type parameters to// SequenceFileRDDFunctions. however, SequenceFileRDDFunctions is a public class so it will be a// breaking change.val convertKey = self.keyClass != _keyWritableClassval convertValue = self.valueClass != _valueWritableClasslogInfo("Saving as sequence file of type " +s"(${_keyWritableClass.getSimpleName},${_valueWritableClass.getSimpleName})" )val format = classOf[SequenceFileOutputFormat[Writable, Writable]]val jobConf = new JobConf(self.context.hadoopConfiguration)if (!convertKey && !convertValue) {self.saveAsHadoopFile(path, _keyWritableClass, _valueWritableClass, format, jobConf, codec)} else if (!convertKey && convertValue) {self.map(x => (x._1, anyToWritable(x._2))).saveAsHadoopFile(path, _keyWritableClass, _valueWritableClass, format, jobConf, codec)} else if (convertKey && !convertValue) {self.map(x => (anyToWritable(x._1), x._2)).saveAsHadoopFile(path, _keyWritableClass, _valueWritableClass, format, jobConf, codec)} else if (convertKey && convertValue) {self.map(x => (anyToWritable(x._1), anyToWritable(x._2))).saveAsHadoopFile(path, _keyWritableClass, _valueWritableClass, format, jobConf, codec)}}
}
函数说明:
object Spark06_RDD_save_action {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("rdd_action")val sc = new SparkContext(sparkConf)val rdd = sc.makeRDD(List(("a", 1), ("b", 2), ("c", 3), ("a", 2)))// 保存在文件中rdd.saveAsTextFile("datas")// 序列化成对象保存在文件中rdd.saveAsObjectFile("datas")// 要求数据的格式必须为key-value类型rdd.saveAsSequenceFile("datas")sc.stop()}
}
9. foreach
函数源码:
底层一直在调用runJob函数。
def foreach(f: T => Unit): Unit = withScope {val cleanF = sc.clean(f)sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))}
函数说明:
分布式遍历RDD中的每一个元素,调用该函数。
object Spark07_RDD_foreach_action {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("rdd_action")val sc = new SparkContext(sparkConf)val rdd = sc.makeRDD(List(("a", 1), ("b", 2), ("c", 3), ("a", 2)), 2)rdd.collect().foreach(println)// collect会按分区建立的顺序把数据采集过来到driver端
// (a,1)
// (b,2)
// (c,3)
// (a,2)println("*************")// 而foreach直接在executor端内存数据的打印rdd.foreach(println)
// (c,3)//(a,2)//(a,1)//(b,2)sc.stop()}
}
相关文章:
【Spark | Spark-Core篇】RDD行动算子action
使用转换算子是产生一个新的rdd,此时在driver端会生成一个逻辑上的执行计划,但任务还没有执行。但所谓的行动算子,其实就是触发作业执行的方法(runJob)。底层代码调用的是环境对象的runJob方法。 1. reduce 函数源码&…...
23.Redis核心数据结构
一、String(k-v) 字符串常规操作 备注 应用场景 SET key value 存入字符转键值对 单值缓存、对象缓存 MSET [key value, key value] 批量存储字符串键值对 对象缓存 SETNX key value 存入一个不存在的键值对 分布式锁 GET KEY 获取一个字符串键值 MGET [key,key,…...
免费送源码:Node.JS+Express+MySQL Express 流浪动物救助系统 计算机毕业设计原创定制
摘 要 随着互联网大趋势的到来,社会的方方面面,各行各业都在考虑利用互联网作为媒介将自己的信息更及时有效地推广出去,而其中最好的方式就是建立网络管理系统,并对其进行信息管理。由于现在网络的发达,流浪动物救助系…...
基于Java+Springboot+Vue开发的旅游景区管理系统
项目简介 该项目是基于JavaSpringbootVue开发的旅游景区管理系统(前后端分离),这是一项为大学生课程设计作业而开发的项目。该系统旨在帮助大学生学习并掌握Java编程技能,同时锻炼他们的项目设计与开发能力。通过学习基于Java的旅…...
Python 实现的风控系统(使用了kafka、Faust、模拟drools、redis、分布式数据库)
以下是一个使用 Python 实现的风控系统示例,涵盖以下技术组件: Kafka 消息中间件:用于实时接收支付业务系统传递的交易数据。Faust(Kafka Streams 的 Python 等价):用于流式处理 Kafka 中的消息。规则引擎…...
Linux运维_Rocky8 安装配置Zabbix
Zabbix 是一个开源的监控解决方案,用于监控网络、服务器、应用程序和服务的性能。它提供实时监控、数据收集、告警通知以及图形化界面,方便用户查看和分析监控数据。Zabbix 支持多种数据收集方式,包括 SNMP、IPMI、JMX 和自定义脚本ÿ…...
jQuery Mobile 滚屏事件
jQuery Mobile 滚屏事件 在移动开发中,滚屏事件是一个非常重要的交互方式,它可以让用户通过滚动屏幕来浏览内容。jQuery Mobile 是一个流行的移动框架,它提供了一套丰富的组件和事件,使得在移动设备上实现滚屏效果变得简单。本文将详细介绍 jQuery Mobile 中的滚屏事件,包…...
3.1.1ReactOS系统中搜索给定长度的空间地址区间函数的实现
系列文章目录 //搜索给定长度的空间地址区间 MmFindGap(); PMADDRESS_SPACE AddressSpace,//该进程用户空间 ULONG_PTR Length,//寻找的空间间隔大小 ULONG_PTR Granularity,//粒度位,表明空间起点的对齐要求,注意是起…...
arm64系统不支持32位的解决armel armhf
初级代码游戏的专栏介绍与文章目录-CSDN博客 我的github:codetoys,所有代码都将会位于ctfc库中。已经放入库中我会指出在库中的位置。 这些代码大部分以Linux为目标但部分代码是纯C的,可以在任何平台上使用。 源码指引:github源…...
【毕业设计】工具大礼包之『Maven3.6.3安装与配置』
系统版本 电脑系统:Windows 10 一.Maven下载 🎯 统一版本 apache-maven-3.6.3,下面两种下载方式2选1即可 1.官网直下 官网下载地址 https://archive.apache.org/dist/maven/maven-3/3.6.3/binaries/ 找到apache-maven-3.6.3-bin.zip 云盘…...
gin入门教程(9):路由分组与路由版本控制
在使用 Gin 框架构建 RESTful API 时,路由分组与版本控制是一种常见的实践,可以帮助你更好地管理不同版本的 API。下面是如何在 Gin 中实现路由分组和版本控制的示例。 目录结构 /hello-gin │ ├── cmd/ │ └── main.go ├── api/ │ ├── v1/ │ │ └─…...
rt-thread移植SystemView中遇到的问题
源代码地址dujunqiu/SystemView 我使用的rt-thread版本是5.2.0,应该是rt-thread适配的还有点问题 报错处理 1:warning: #223-D: function “typeof” declared implicitly 如下 typedef 的warning是C99规范没有typedef的定义,需要在keii中…...
【C++STL】list的模拟实现
✨ Blog’s 主页: 白乐天_ξ( ✿>◡❛) 🌈 个人Motto:他强任他强,清风拂山冈! 🔥 所属专栏:C深入学习笔记 💫 欢迎来到我的学习笔记! 一、三个类与成员函数接口 在list.…...
以30个面试问题和案例为导向:全面解析 Java Servlet是什么?基本概念、实现原理、生命周期、类结构、请求与响应的处理机制,以及性能优化和安全性管理
Servlet 是 Java Web 开发的核心组件之一,负责处理客户端请求并生成动态响应。本文将深入探讨 Servlet 的基本概念、实现原理、生命周期、类结构、请求与响应的处理机制,以及性能优化和安全性管理,帮助开发者从多方面掌握 Servlet。 文章目录…...
MFC小游戏设计
框架: 各个界面: 用户: 登录注册:账号和密码(昵称) 主菜单:各种游戏,查看自己信息(积分,装备【游戏数据】),退出 游戏界面&#…...
[漏洞挖掘与防护] 04.Windows系统安全缺陷之5次Shift漏洞启动计算机机理分析
这是作者新开的一个专栏——“漏洞挖掘与防护”,前期会复现各种经典和最新漏洞,并总结防护技巧;后期尝试从零学习漏洞挖掘技术,包括Web漏洞和二进制及IOT相关漏洞,以及Fuzzing技术。新的征程,新的开启,漫漫长征路,偏向虎山行。享受过程,感谢您的陪伴,一起加油~ 欢迎关…...
手机极简待办app哪款好用?
在快节奏的现代生活中,我们常常需要处理大量的任务和信息,这时候一款好用的极简待办软件就显得尤为重要。它们不仅能帮助我们记录和管理待办事项,还能提高我们的工作效率和生活质量。 在众多的待办软件中,敬业签是一款非常受欢迎…...
SpringBoot高级-底层原理
目录 1 SpringBoot自动化配置原理 01-SpringBoot2高级-starter依赖管理机制 02-SpringBoot2高级-自动化配置初体验 03-SpringBoot2高级-底层原理-Configuration配置注解 04-SpringBoot2高级-底层原理-Import注解使用1 05-SpringBoot2高级-底层原理-Import注解使用2 06-S…...
LabVIEW提高开发效率技巧----插入式架构
随着LabVIEW项目规模的扩大和系统复杂性的增加,传统的单一代码架构难以应对后期维护和功能扩展的需求。插入式架构(Plug-In Architecture)作为一种模块化设计方式,通过动态加载和运行子VI,使系统功能更加灵活、模块化&…...
MySQL COUNT(*)、COUNT(1)、COUNT(id)、COUNT(字段)效果及性能
文章目录 前言COUNT(exper)COUNT(*)优化COUNT(*) 与COUNT(1) COUNT(1)COUNT(id)COUNT(字段)总结参考 前言 业务开发中,我们经常要使用count做一些数据统计。今天根据MySQL5.7官方文档及丁奇老师的MySQL45讲,介绍一下COUNT(*)、COUNT(1)、COUNT(id)、COU…...
日语学习-日语知识点小记-构建基础-JLPT-N4阶段(33):にする
日语学习-日语知识点小记-构建基础-JLPT-N4阶段(33):にする 1、前言(1)情况说明(2)工程师的信仰2、知识点(1) にする1,接续:名词+にする2,接续:疑问词+にする3,(A)は(B)にする。(2)復習:(1)复习句子(2)ために & ように(3)そう(4)にする3、…...
Unity3D中Gfx.WaitForPresent优化方案
前言 在Unity中,Gfx.WaitForPresent占用CPU过高通常表示主线程在等待GPU完成渲染(即CPU被阻塞),这表明存在GPU瓶颈或垂直同步/帧率设置问题。以下是系统的优化方案: 对惹,这里有一个游戏开发交流小组&…...
定时器任务——若依源码分析
分析util包下面的工具类schedule utils: ScheduleUtils 是若依中用于与 Quartz 框架交互的工具类,封装了定时任务的 创建、更新、暂停、删除等核心逻辑。 createScheduleJob createScheduleJob 用于将任务注册到 Quartz,先构建任务的 JobD…...
如何为服务器生成TLS证书
TLS(Transport Layer Security)证书是确保网络通信安全的重要手段,它通过加密技术保护传输的数据不被窃听和篡改。在服务器上配置TLS证书,可以使用户通过HTTPS协议安全地访问您的网站。本文将详细介绍如何在服务器上生成一个TLS证…...
C# 类和继承(抽象类)
抽象类 抽象类是指设计为被继承的类。抽象类只能被用作其他类的基类。 不能创建抽象类的实例。抽象类使用abstract修饰符声明。 抽象类可以包含抽象成员或普通的非抽象成员。抽象类的成员可以是抽象成员和普通带 实现的成员的任意组合。抽象类自己可以派生自另一个抽象类。例…...
成都鼎讯硬核科技!雷达目标与干扰模拟器,以卓越性能制胜电磁频谱战
在现代战争中,电磁频谱已成为继陆、海、空、天之后的 “第五维战场”,雷达作为电磁频谱领域的关键装备,其干扰与抗干扰能力的较量,直接影响着战争的胜负走向。由成都鼎讯科技匠心打造的雷达目标与干扰模拟器,凭借数字射…...
Spring AI与Spring Modulith核心技术解析
Spring AI核心架构解析 Spring AI(https://spring.io/projects/spring-ai)作为Spring生态中的AI集成框架,其核心设计理念是通过模块化架构降低AI应用的开发复杂度。与Python生态中的LangChain/LlamaIndex等工具类似,但特别为多语…...
网站指纹识别
网站指纹识别 网站的最基本组成:服务器(操作系统)、中间件(web容器)、脚本语言、数据厍 为什么要了解这些?举个例子:发现了一个文件读取漏洞,我们需要读/etc/passwd,如…...
省略号和可变参数模板
本文主要介绍如何展开可变参数的参数包 1.C语言的va_list展开可变参数 #include <iostream> #include <cstdarg>void printNumbers(int count, ...) {// 声明va_list类型的变量va_list args;// 使用va_start将可变参数写入变量argsva_start(args, count);for (in…...
作为测试我们应该关注redis哪些方面
1、功能测试 数据结构操作:验证字符串、列表、哈希、集合和有序的基本操作是否正确 持久化:测试aof和aof持久化机制,确保数据在开启后正确恢复。 事务:检查事务的原子性和回滚机制。 发布订阅:确保消息正确传递。 2、性…...
