【Spark | Spark-Core篇】RDD行动算子action
使用转换算子是产生一个新的rdd,此时在driver端会生成一个逻辑上的执行计划,但任务还没有执行。但所谓的行动算子,其实就是触发作业执行的方法(runJob)。底层代码调用的是环境对象的runJob方法。
1. reduce
函数源码:
def reduce(f: (T, T) => T): T = withScope {val cleanF = sc.clean(f)val reducePartition: Iterator[T] => Option[T] = iter => {if (iter.hasNext) {Some(iter.reduceLeft(cleanF))} else {None}}var jobResult: Option[T] = Noneval mergeResult = (_: Int, taskResult: Option[T]) => {if (taskResult.isDefined) {jobResult = jobResult match {case Some(value) => Some(f(value, taskResult.get))case None => taskResult}}}sc.runJob(this, reducePartition, mergeResult)// Get the final result out of our Option, or throw an exception if the RDD was emptyjobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))}
函数说明:
简而言之,就是先聚合分区内的数据,再聚合分区间的数据。
object Spark01_RDD_reduce_action {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("rdd_action")val sc = new SparkContext(sparkConf)val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)val i = rdd.reduce(_ + _)println(i)// 10// 分区数据:[1, 2], [3, 4]// reduce聚集分区的所有元素:先聚合分区内的数据,再聚合分区间的数据// [1, 2]=>3, [3, 4]=>7 3 + 7 => 10}
}
2. collect
函数源码:
def collect(): Array[T] = withScope {val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)Array.concat(results: _*)}
函数说明:
在驱动程序中,以数组Array的形式返回数据集的所有元素。
object Spark02_RDD_collect_action {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("rdd_action")val sc = new SparkContext(sparkConf)val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)val mapRDD = rdd.map(_ * 2)// 代码运行到collect时才开始触发执行任务。在这之前只是在driver构建一个逻辑的执行计划。// collect源码存在runJob函数。println(mapRDD.collect().mkString(","))// 将executor端的分区内的数据按分区有序的生成一个数组并返回到driver端// 调用collect函数的输出:2,4,6,8mapRDD.foreach(println)// 不调用collect函数的输出:// 2 6 8 4无序}
}
3. count 和 first
函数源码:
def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sumdef first(): T = withScope {take(1) match {case Array(t) => tcase _ => throw new UnsupportedOperationException("empty collection")}}
first函数底层调用了take函数,take函数底层调用了runJob函数,所以first也是行动算子。
函数说明:
object Spark03_RDD_count_first_action {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("rdd_action")val sc = new SparkContext(sparkConf)val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)// count函数获取rdd的数据的个数val cnt = rdd.count()// 4println(cnt)// first获取数据源中数据的第一个元素val first = rdd.first()println(first)// 1sc.stop()}
}
4. take和takeOrdered
函数源码:
def take(num: Int): Array[T] = withScope {val scaleUpFactor = Math.max(conf.get(RDD_LIMIT_SCALE_UP_FACTOR), 2)if (num == 0) {new Array[T](0)} else {val buf = new ArrayBuffer[T]val totalParts = this.partitions.lengthvar partsScanned = 0while (buf.size < num && partsScanned < totalParts) {// The number of partitions to try in this iteration. It is ok for this number to be// greater than totalParts because we actually cap it at totalParts in runJob.var numPartsToTry = 1Lval left = num - buf.sizeif (partsScanned > 0) {// If we didn't find any rows after the previous iteration, quadruple and retry.// Otherwise, interpolate the number of partitions we need to try, but overestimate// it by 50%. We also cap the estimation in the end.if (buf.isEmpty) {numPartsToTry = partsScanned * scaleUpFactor} else {// As left > 0, numPartsToTry is always >= 1numPartsToTry = Math.ceil(1.5 * left * partsScanned / buf.size).toIntnumPartsToTry = Math.min(numPartsToTry, partsScanned * scaleUpFactor)}}val p = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts).toInt)val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, p)res.foreach(buf ++= _.take(num - buf.size))partsScanned += p.size}buf.toArray}}
函数说明:
object Spark05_RDD_take_takeOrdered_action {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("rdd_action")val sc = new SparkContext(sparkConf)val rdd = sc.makeRDD(List(4, 3, 2, 1, 6, 7), 3)println(rdd.take(3).mkString(" "))// 4, 3, 2// take方法返回的是一个数组// 从数据源取前N个数据println(rdd.takeOrdered(3).mkString(", "))// 1, 2, 3// takeOrdered方法返回的是一个有序数组,第二个参数可以传入排序规则。// 源码中的Ordering特质继承自Comparator[T],即相当于java中的比较器。// 从数据源获取前N个有序的数据sc.stop()}
}
5. aggregate
函数源码:
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = withScope {// Clone the zero value since we will also be serializing it as part of tasksvar jobResult = Utils.clone(zeroValue, sc.env.serializer.newInstance())val cleanSeqOp = sc.clean(seqOp)val cleanCombOp = sc.clean(combOp)val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)val mergeResult = (_: Int, taskResult: U) => jobResult = combOp(jobResult, taskResult)sc.runJob(this, aggregatePartition, mergeResult)jobResult}
函数说明:
分区的数据通过初始值和分区内的数据进行聚合,然后再和初始值进行分区间的数据聚合。
object Spark04_RDD_aggregate_action {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("rdd_action")val sc = new SparkContext(sparkConf)val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)val i = rdd.aggregate(0)(_ + _, _ + _)println(i)// 10// 0 + 1 + 2 => 3, 0 + 3 + 4 => 7// 0 + 3 + 7 => 10val i1 = rdd.aggregate(10)(_ + _, _ + _)println(i1)// 40// 10 + 1 + 2 => 13, 10 + 3 + 4 => 17// 10 + 13 + 17 => 40sc.stop()}
}
6. fold
函数源码;
略。
函数说明:
折叠操作,aggregate的简化版操作。
即aggregate的分区内计算的逻辑和分区间计算的逻辑相同。
7. countByKey和countByValue
countByKey函数源码:
def countByKey(): Map[K, Long] = self.withScope {self.mapValues(_ => 1L).reduceByKey(_ + _).collect().toMap}
返回值类型是map集合。即Map[K, long]
K即是key-value的key类型,Long则是key出现的次数。
countByValue函数源码:
def countByValue()(implicit ord: Ordering[T] = null): Map[T, Long] = withScope {map(value => (value, null)).countByKey()}
得知,底层调用了map方法,将rdd的每个数据映射为一个元组然后调用countByKey方法。
object Spark06_RDD_countByKey_countByValue_action {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("rdd_action")val sc = new SparkContext(sparkConf)val rdd = sc.makeRDD(List(("a", 1), ("b", 2), ("c", 3), ("a", 2)))// 返回的是Map集合,key是key-value的key,value是key出现的countval rdd_key = rdd.countByKey()println(rdd_key)// Map(a -> 2, b -> 1, c -> 1)// a作为key出现了两次,b作为key出现了一次...val rdd_value = rdd.countByValue()println(rdd_value)// Map((a,2) -> 1, (b,2) -> 1, (c,3) -> 1, (a,1) -> 1)// 底层调用了map方法和countbyKey方法sc.stop()}
}
8. save相关算子
save相关算子包括
saveAsTextFile, saveAsObjectFile, saveAsSequenceFile
函数源码:
def saveAsTextFile(path: String): Unit = withScope {saveAsTextFile(path, null)}def saveAsObjectFile(path: String): Unit = withScope {this.mapPartitions(iter => iter.grouped(10).map(_.toArray)).map(x => (NullWritable.get(), new BytesWritable(Utils.serialize(x)))).saveAsSequenceFile(path)}def saveAsSequenceFile(path: String,codec: Option[Class[_ <: CompressionCodec]] = None): Unit = self.withScope {def anyToWritable[U <% Writable](u: U): Writable = u// TODO We cannot force the return type of `anyToWritable` be same as keyWritableClass and// valueWritableClass at the compile time. To implement that, we need to add type parameters to// SequenceFileRDDFunctions. however, SequenceFileRDDFunctions is a public class so it will be a// breaking change.val convertKey = self.keyClass != _keyWritableClassval convertValue = self.valueClass != _valueWritableClasslogInfo("Saving as sequence file of type " +s"(${_keyWritableClass.getSimpleName},${_valueWritableClass.getSimpleName})" )val format = classOf[SequenceFileOutputFormat[Writable, Writable]]val jobConf = new JobConf(self.context.hadoopConfiguration)if (!convertKey && !convertValue) {self.saveAsHadoopFile(path, _keyWritableClass, _valueWritableClass, format, jobConf, codec)} else if (!convertKey && convertValue) {self.map(x => (x._1, anyToWritable(x._2))).saveAsHadoopFile(path, _keyWritableClass, _valueWritableClass, format, jobConf, codec)} else if (convertKey && !convertValue) {self.map(x => (anyToWritable(x._1), x._2)).saveAsHadoopFile(path, _keyWritableClass, _valueWritableClass, format, jobConf, codec)} else if (convertKey && convertValue) {self.map(x => (anyToWritable(x._1), anyToWritable(x._2))).saveAsHadoopFile(path, _keyWritableClass, _valueWritableClass, format, jobConf, codec)}}
}
函数说明:
object Spark06_RDD_save_action {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("rdd_action")val sc = new SparkContext(sparkConf)val rdd = sc.makeRDD(List(("a", 1), ("b", 2), ("c", 3), ("a", 2)))// 保存在文件中rdd.saveAsTextFile("datas")// 序列化成对象保存在文件中rdd.saveAsObjectFile("datas")// 要求数据的格式必须为key-value类型rdd.saveAsSequenceFile("datas")sc.stop()}
}
9. foreach
函数源码:
底层一直在调用runJob函数。
def foreach(f: T => Unit): Unit = withScope {val cleanF = sc.clean(f)sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))}
函数说明:
分布式遍历RDD中的每一个元素,调用该函数。
object Spark07_RDD_foreach_action {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("rdd_action")val sc = new SparkContext(sparkConf)val rdd = sc.makeRDD(List(("a", 1), ("b", 2), ("c", 3), ("a", 2)), 2)rdd.collect().foreach(println)// collect会按分区建立的顺序把数据采集过来到driver端
// (a,1)
// (b,2)
// (c,3)
// (a,2)println("*************")// 而foreach直接在executor端内存数据的打印rdd.foreach(println)
// (c,3)//(a,2)//(a,1)//(b,2)sc.stop()}
}
相关文章:
【Spark | Spark-Core篇】RDD行动算子action
使用转换算子是产生一个新的rdd,此时在driver端会生成一个逻辑上的执行计划,但任务还没有执行。但所谓的行动算子,其实就是触发作业执行的方法(runJob)。底层代码调用的是环境对象的runJob方法。 1. reduce 函数源码&…...
23.Redis核心数据结构
一、String(k-v) 字符串常规操作 备注 应用场景 SET key value 存入字符转键值对 单值缓存、对象缓存 MSET [key value, key value] 批量存储字符串键值对 对象缓存 SETNX key value 存入一个不存在的键值对 分布式锁 GET KEY 获取一个字符串键值 MGET [key,key,…...
免费送源码:Node.JS+Express+MySQL Express 流浪动物救助系统 计算机毕业设计原创定制
摘 要 随着互联网大趋势的到来,社会的方方面面,各行各业都在考虑利用互联网作为媒介将自己的信息更及时有效地推广出去,而其中最好的方式就是建立网络管理系统,并对其进行信息管理。由于现在网络的发达,流浪动物救助系…...
基于Java+Springboot+Vue开发的旅游景区管理系统
项目简介 该项目是基于JavaSpringbootVue开发的旅游景区管理系统(前后端分离),这是一项为大学生课程设计作业而开发的项目。该系统旨在帮助大学生学习并掌握Java编程技能,同时锻炼他们的项目设计与开发能力。通过学习基于Java的旅…...
Python 实现的风控系统(使用了kafka、Faust、模拟drools、redis、分布式数据库)
以下是一个使用 Python 实现的风控系统示例,涵盖以下技术组件: Kafka 消息中间件:用于实时接收支付业务系统传递的交易数据。Faust(Kafka Streams 的 Python 等价):用于流式处理 Kafka 中的消息。规则引擎…...
Linux运维_Rocky8 安装配置Zabbix
Zabbix 是一个开源的监控解决方案,用于监控网络、服务器、应用程序和服务的性能。它提供实时监控、数据收集、告警通知以及图形化界面,方便用户查看和分析监控数据。Zabbix 支持多种数据收集方式,包括 SNMP、IPMI、JMX 和自定义脚本ÿ…...
jQuery Mobile 滚屏事件
jQuery Mobile 滚屏事件 在移动开发中,滚屏事件是一个非常重要的交互方式,它可以让用户通过滚动屏幕来浏览内容。jQuery Mobile 是一个流行的移动框架,它提供了一套丰富的组件和事件,使得在移动设备上实现滚屏效果变得简单。本文将详细介绍 jQuery Mobile 中的滚屏事件,包…...
3.1.1ReactOS系统中搜索给定长度的空间地址区间函数的实现
系列文章目录 //搜索给定长度的空间地址区间 MmFindGap(); PMADDRESS_SPACE AddressSpace,//该进程用户空间 ULONG_PTR Length,//寻找的空间间隔大小 ULONG_PTR Granularity,//粒度位,表明空间起点的对齐要求,注意是起…...
arm64系统不支持32位的解决armel armhf
初级代码游戏的专栏介绍与文章目录-CSDN博客 我的github:codetoys,所有代码都将会位于ctfc库中。已经放入库中我会指出在库中的位置。 这些代码大部分以Linux为目标但部分代码是纯C的,可以在任何平台上使用。 源码指引:github源…...
【毕业设计】工具大礼包之『Maven3.6.3安装与配置』
系统版本 电脑系统:Windows 10 一.Maven下载 🎯 统一版本 apache-maven-3.6.3,下面两种下载方式2选1即可 1.官网直下 官网下载地址 https://archive.apache.org/dist/maven/maven-3/3.6.3/binaries/ 找到apache-maven-3.6.3-bin.zip 云盘…...
gin入门教程(9):路由分组与路由版本控制
在使用 Gin 框架构建 RESTful API 时,路由分组与版本控制是一种常见的实践,可以帮助你更好地管理不同版本的 API。下面是如何在 Gin 中实现路由分组和版本控制的示例。 目录结构 /hello-gin │ ├── cmd/ │ └── main.go ├── api/ │ ├── v1/ │ │ └─…...
rt-thread移植SystemView中遇到的问题
源代码地址dujunqiu/SystemView 我使用的rt-thread版本是5.2.0,应该是rt-thread适配的还有点问题 报错处理 1:warning: #223-D: function “typeof” declared implicitly 如下 typedef 的warning是C99规范没有typedef的定义,需要在keii中…...
【C++STL】list的模拟实现
✨ Blog’s 主页: 白乐天_ξ( ✿>◡❛) 🌈 个人Motto:他强任他强,清风拂山冈! 🔥 所属专栏:C深入学习笔记 💫 欢迎来到我的学习笔记! 一、三个类与成员函数接口 在list.…...
以30个面试问题和案例为导向:全面解析 Java Servlet是什么?基本概念、实现原理、生命周期、类结构、请求与响应的处理机制,以及性能优化和安全性管理
Servlet 是 Java Web 开发的核心组件之一,负责处理客户端请求并生成动态响应。本文将深入探讨 Servlet 的基本概念、实现原理、生命周期、类结构、请求与响应的处理机制,以及性能优化和安全性管理,帮助开发者从多方面掌握 Servlet。 文章目录…...
MFC小游戏设计
框架: 各个界面: 用户: 登录注册:账号和密码(昵称) 主菜单:各种游戏,查看自己信息(积分,装备【游戏数据】),退出 游戏界面&#…...
[漏洞挖掘与防护] 04.Windows系统安全缺陷之5次Shift漏洞启动计算机机理分析
这是作者新开的一个专栏——“漏洞挖掘与防护”,前期会复现各种经典和最新漏洞,并总结防护技巧;后期尝试从零学习漏洞挖掘技术,包括Web漏洞和二进制及IOT相关漏洞,以及Fuzzing技术。新的征程,新的开启,漫漫长征路,偏向虎山行。享受过程,感谢您的陪伴,一起加油~ 欢迎关…...
手机极简待办app哪款好用?
在快节奏的现代生活中,我们常常需要处理大量的任务和信息,这时候一款好用的极简待办软件就显得尤为重要。它们不仅能帮助我们记录和管理待办事项,还能提高我们的工作效率和生活质量。 在众多的待办软件中,敬业签是一款非常受欢迎…...
SpringBoot高级-底层原理
目录 1 SpringBoot自动化配置原理 01-SpringBoot2高级-starter依赖管理机制 02-SpringBoot2高级-自动化配置初体验 03-SpringBoot2高级-底层原理-Configuration配置注解 04-SpringBoot2高级-底层原理-Import注解使用1 05-SpringBoot2高级-底层原理-Import注解使用2 06-S…...
LabVIEW提高开发效率技巧----插入式架构
随着LabVIEW项目规模的扩大和系统复杂性的增加,传统的单一代码架构难以应对后期维护和功能扩展的需求。插入式架构(Plug-In Architecture)作为一种模块化设计方式,通过动态加载和运行子VI,使系统功能更加灵活、模块化&…...
MySQL COUNT(*)、COUNT(1)、COUNT(id)、COUNT(字段)效果及性能
文章目录 前言COUNT(exper)COUNT(*)优化COUNT(*) 与COUNT(1) COUNT(1)COUNT(id)COUNT(字段)总结参考 前言 业务开发中,我们经常要使用count做一些数据统计。今天根据MySQL5.7官方文档及丁奇老师的MySQL45讲,介绍一下COUNT(*)、COUNT(1)、COUNT(id)、COU…...
保姆级教程:用Docker Compose一键部署ZLMediaKit流媒体服务器(含OBS推流配置)
从零搭建私有流媒体平台:Docker Compose ZLMediaKit OBS全流程指南 流媒体技术正在重塑内容传播的方式。无论是企业内部培训、游戏直播还是产品演示,一个稳定高效的私有流媒体平台都能显著提升沟通效率。本文将手把手教你如何用Docker Compose快速部署…...
如何为你的单片机项目选择最佳通信协议?I²C、SPI、UART全解析
单片机通信协议深度指南:从理论到实战的精准选择策略 当你的单片机需要与外部世界对话时,选择正确的通信协议就像为不同场合挑选合适的语言——商务会议需要正式严谨,朋友聊天则讲究轻松随意。在嵌入式系统设计中,UART、IC和SPI这…...
深度解析跨平台音频驱动:FlexASIO实战配置指南
深度解析跨平台音频驱动:FlexASIO实战配置指南 【免费下载链接】FlexASIO A flexible universal ASIO driver that uses the PortAudio sound I/O library. Supports WASAPI (shared and exclusive), KS, DirectSound and MME. 项目地址: https://gitcode.com/gh_…...
目标检测模型优化:如何用Focal Loss解决样本不平衡问题(附RetinaNet调参心得)
目标检测模型优化:Focal Loss实战指南与RetinaNet调参策略 在商品自动识别系统中,我们常遇到这样的困境:摄像头拍下的货架照片中,目标商品可能只占画面的5%,而95%都是无关背景。传统交叉熵损失函数会让模型陷入"偷…...
AXI非对齐访问实战指南:从WSTRB信号到DMA数据搬运的避坑细节
AXI非对齐访问实战指南:从WSTRB信号到DMA数据搬运的避坑细节 在FPGA与ASIC设计中,AXI总线作为AMBA协议族的核心成员,其非对齐访问特性常被开发者视为"双刃剑"。当处理摄像头YUV数据、音频采样流或网络封包等非规整数据时࿰…...
如何让经典GTA游戏重获新生:终极SilentPatch修复工具完全指南
如何让经典GTA游戏重获新生:终极SilentPatch修复工具完全指南 【免费下载链接】SilentPatch SilentPatch for GTA III, Vice City, and San Andreas 项目地址: https://gitcode.com/gh_mirrors/si/SilentPatch 你是否还记得那些在GTA III、Vice City和San An…...
别再只用LSTM了!用XGBoost做电力负荷预测,从特征工程到模型部署的完整实战(附Python代码)
电力负荷预测实战:XGBoost如何超越LSTM的五大技术突破 在能源管理领域,准确预测电力负荷一直是行业痛点。当大多数团队还在使用LSTM等深度学习模型时,一个令人惊讶的事实正在发生:经过精心调优的XGBoost模型在多个工业场景中表现优…...
智能体架构的创新突破:Agent-S框架的技术解析与实战应用
智能体架构的创新突破:Agent-S框架的技术解析与实战应用 【免费下载链接】Agent-S Agent S: an open agentic framework that uses computers like a human 项目地址: https://gitcode.com/GitHub_Trending/ag/Agent-S Agent-S作为开源的智能体框架ÿ…...
3个步骤在Docker容器中运行本地Windows ISO镜像:从配置到优化
3个步骤在Docker容器中运行本地Windows ISO镜像:从配置到优化 【免费下载链接】windows Windows inside a Docker container. 项目地址: https://gitcode.com/GitHub_Trending/wi/windows 问题导入:为什么需要本地ISO镜像? 在使用Doc…...
Windows右键菜单终极整理指南:用ContextMenuManager轻松打造高效工作流
Windows右键菜单终极整理指南:用ContextMenuManager轻松打造高效工作流 【免费下载链接】ContextMenuManager 🖱️ 纯粹的Windows右键菜单管理程序 项目地址: https://gitcode.com/gh_mirrors/co/ContextMenuManager 你是否曾经在Windows系统中为…...
