【Spark | Spark-Core篇】RDD行动算子action
使用转换算子是产生一个新的rdd,此时在driver端会生成一个逻辑上的执行计划,但任务还没有执行。但所谓的行动算子,其实就是触发作业执行的方法(runJob)。底层代码调用的是环境对象的runJob方法。
1. reduce
函数源码:
def reduce(f: (T, T) => T): T = withScope {val cleanF = sc.clean(f)val reducePartition: Iterator[T] => Option[T] = iter => {if (iter.hasNext) {Some(iter.reduceLeft(cleanF))} else {None}}var jobResult: Option[T] = Noneval mergeResult = (_: Int, taskResult: Option[T]) => {if (taskResult.isDefined) {jobResult = jobResult match {case Some(value) => Some(f(value, taskResult.get))case None => taskResult}}}sc.runJob(this, reducePartition, mergeResult)// Get the final result out of our Option, or throw an exception if the RDD was emptyjobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))}
函数说明:
简而言之,就是先聚合分区内的数据,再聚合分区间的数据。
object Spark01_RDD_reduce_action {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("rdd_action")val sc = new SparkContext(sparkConf)val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)val i = rdd.reduce(_ + _)println(i)// 10// 分区数据:[1, 2], [3, 4]// reduce聚集分区的所有元素:先聚合分区内的数据,再聚合分区间的数据// [1, 2]=>3, [3, 4]=>7 3 + 7 => 10}
}
2. collect
函数源码:
def collect(): Array[T] = withScope {val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)Array.concat(results: _*)}
函数说明:
在驱动程序中,以数组Array的形式返回数据集的所有元素。
object Spark02_RDD_collect_action {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("rdd_action")val sc = new SparkContext(sparkConf)val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)val mapRDD = rdd.map(_ * 2)// 代码运行到collect时才开始触发执行任务。在这之前只是在driver构建一个逻辑的执行计划。// collect源码存在runJob函数。println(mapRDD.collect().mkString(","))// 将executor端的分区内的数据按分区有序的生成一个数组并返回到driver端// 调用collect函数的输出:2,4,6,8mapRDD.foreach(println)// 不调用collect函数的输出:// 2 6 8 4无序}
}
3. count 和 first
函数源码:
def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sumdef first(): T = withScope {take(1) match {case Array(t) => tcase _ => throw new UnsupportedOperationException("empty collection")}}
first函数底层调用了take函数,take函数底层调用了runJob函数,所以first也是行动算子。
函数说明:
object Spark03_RDD_count_first_action {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("rdd_action")val sc = new SparkContext(sparkConf)val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)// count函数获取rdd的数据的个数val cnt = rdd.count()// 4println(cnt)// first获取数据源中数据的第一个元素val first = rdd.first()println(first)// 1sc.stop()}
}
4. take和takeOrdered
函数源码:
def take(num: Int): Array[T] = withScope {val scaleUpFactor = Math.max(conf.get(RDD_LIMIT_SCALE_UP_FACTOR), 2)if (num == 0) {new Array[T](0)} else {val buf = new ArrayBuffer[T]val totalParts = this.partitions.lengthvar partsScanned = 0while (buf.size < num && partsScanned < totalParts) {// The number of partitions to try in this iteration. It is ok for this number to be// greater than totalParts because we actually cap it at totalParts in runJob.var numPartsToTry = 1Lval left = num - buf.sizeif (partsScanned > 0) {// If we didn't find any rows after the previous iteration, quadruple and retry.// Otherwise, interpolate the number of partitions we need to try, but overestimate// it by 50%. We also cap the estimation in the end.if (buf.isEmpty) {numPartsToTry = partsScanned * scaleUpFactor} else {// As left > 0, numPartsToTry is always >= 1numPartsToTry = Math.ceil(1.5 * left * partsScanned / buf.size).toIntnumPartsToTry = Math.min(numPartsToTry, partsScanned * scaleUpFactor)}}val p = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts).toInt)val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, p)res.foreach(buf ++= _.take(num - buf.size))partsScanned += p.size}buf.toArray}}
函数说明:
object Spark05_RDD_take_takeOrdered_action {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("rdd_action")val sc = new SparkContext(sparkConf)val rdd = sc.makeRDD(List(4, 3, 2, 1, 6, 7), 3)println(rdd.take(3).mkString(" "))// 4, 3, 2// take方法返回的是一个数组// 从数据源取前N个数据println(rdd.takeOrdered(3).mkString(", "))// 1, 2, 3// takeOrdered方法返回的是一个有序数组,第二个参数可以传入排序规则。// 源码中的Ordering特质继承自Comparator[T],即相当于java中的比较器。// 从数据源获取前N个有序的数据sc.stop()}
}
5. aggregate
函数源码:
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = withScope {// Clone the zero value since we will also be serializing it as part of tasksvar jobResult = Utils.clone(zeroValue, sc.env.serializer.newInstance())val cleanSeqOp = sc.clean(seqOp)val cleanCombOp = sc.clean(combOp)val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)val mergeResult = (_: Int, taskResult: U) => jobResult = combOp(jobResult, taskResult)sc.runJob(this, aggregatePartition, mergeResult)jobResult}
函数说明:
分区的数据通过初始值和分区内的数据进行聚合,然后再和初始值进行分区间的数据聚合。
object Spark04_RDD_aggregate_action {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("rdd_action")val sc = new SparkContext(sparkConf)val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)val i = rdd.aggregate(0)(_ + _, _ + _)println(i)// 10// 0 + 1 + 2 => 3, 0 + 3 + 4 => 7// 0 + 3 + 7 => 10val i1 = rdd.aggregate(10)(_ + _, _ + _)println(i1)// 40// 10 + 1 + 2 => 13, 10 + 3 + 4 => 17// 10 + 13 + 17 => 40sc.stop()}
}
6. fold
函数源码;
略。
函数说明:
折叠操作,aggregate的简化版操作。
即aggregate的分区内计算的逻辑和分区间计算的逻辑相同。
7. countByKey和countByValue
countByKey函数源码:
def countByKey(): Map[K, Long] = self.withScope {self.mapValues(_ => 1L).reduceByKey(_ + _).collect().toMap}
返回值类型是map集合。即Map[K, long]
K即是key-value的key类型,Long则是key出现的次数。
countByValue函数源码:
def countByValue()(implicit ord: Ordering[T] = null): Map[T, Long] = withScope {map(value => (value, null)).countByKey()}
得知,底层调用了map方法,将rdd的每个数据映射为一个元组然后调用countByKey方法。
object Spark06_RDD_countByKey_countByValue_action {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("rdd_action")val sc = new SparkContext(sparkConf)val rdd = sc.makeRDD(List(("a", 1), ("b", 2), ("c", 3), ("a", 2)))// 返回的是Map集合,key是key-value的key,value是key出现的countval rdd_key = rdd.countByKey()println(rdd_key)// Map(a -> 2, b -> 1, c -> 1)// a作为key出现了两次,b作为key出现了一次...val rdd_value = rdd.countByValue()println(rdd_value)// Map((a,2) -> 1, (b,2) -> 1, (c,3) -> 1, (a,1) -> 1)// 底层调用了map方法和countbyKey方法sc.stop()}
}
8. save相关算子
save相关算子包括
saveAsTextFile, saveAsObjectFile, saveAsSequenceFile
函数源码:
def saveAsTextFile(path: String): Unit = withScope {saveAsTextFile(path, null)}def saveAsObjectFile(path: String): Unit = withScope {this.mapPartitions(iter => iter.grouped(10).map(_.toArray)).map(x => (NullWritable.get(), new BytesWritable(Utils.serialize(x)))).saveAsSequenceFile(path)}def saveAsSequenceFile(path: String,codec: Option[Class[_ <: CompressionCodec]] = None): Unit = self.withScope {def anyToWritable[U <% Writable](u: U): Writable = u// TODO We cannot force the return type of `anyToWritable` be same as keyWritableClass and// valueWritableClass at the compile time. To implement that, we need to add type parameters to// SequenceFileRDDFunctions. however, SequenceFileRDDFunctions is a public class so it will be a// breaking change.val convertKey = self.keyClass != _keyWritableClassval convertValue = self.valueClass != _valueWritableClasslogInfo("Saving as sequence file of type " +s"(${_keyWritableClass.getSimpleName},${_valueWritableClass.getSimpleName})" )val format = classOf[SequenceFileOutputFormat[Writable, Writable]]val jobConf = new JobConf(self.context.hadoopConfiguration)if (!convertKey && !convertValue) {self.saveAsHadoopFile(path, _keyWritableClass, _valueWritableClass, format, jobConf, codec)} else if (!convertKey && convertValue) {self.map(x => (x._1, anyToWritable(x._2))).saveAsHadoopFile(path, _keyWritableClass, _valueWritableClass, format, jobConf, codec)} else if (convertKey && !convertValue) {self.map(x => (anyToWritable(x._1), x._2)).saveAsHadoopFile(path, _keyWritableClass, _valueWritableClass, format, jobConf, codec)} else if (convertKey && convertValue) {self.map(x => (anyToWritable(x._1), anyToWritable(x._2))).saveAsHadoopFile(path, _keyWritableClass, _valueWritableClass, format, jobConf, codec)}}
}
函数说明:
object Spark06_RDD_save_action {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("rdd_action")val sc = new SparkContext(sparkConf)val rdd = sc.makeRDD(List(("a", 1), ("b", 2), ("c", 3), ("a", 2)))// 保存在文件中rdd.saveAsTextFile("datas")// 序列化成对象保存在文件中rdd.saveAsObjectFile("datas")// 要求数据的格式必须为key-value类型rdd.saveAsSequenceFile("datas")sc.stop()}
}
9. foreach
函数源码:
底层一直在调用runJob函数。
def foreach(f: T => Unit): Unit = withScope {val cleanF = sc.clean(f)sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))}
函数说明:
分布式遍历RDD中的每一个元素,调用该函数。
object Spark07_RDD_foreach_action {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("rdd_action")val sc = new SparkContext(sparkConf)val rdd = sc.makeRDD(List(("a", 1), ("b", 2), ("c", 3), ("a", 2)), 2)rdd.collect().foreach(println)// collect会按分区建立的顺序把数据采集过来到driver端
// (a,1)
// (b,2)
// (c,3)
// (a,2)println("*************")// 而foreach直接在executor端内存数据的打印rdd.foreach(println)
// (c,3)//(a,2)//(a,1)//(b,2)sc.stop()}
}
相关文章:
【Spark | Spark-Core篇】RDD行动算子action
使用转换算子是产生一个新的rdd,此时在driver端会生成一个逻辑上的执行计划,但任务还没有执行。但所谓的行动算子,其实就是触发作业执行的方法(runJob)。底层代码调用的是环境对象的runJob方法。 1. reduce 函数源码&…...
23.Redis核心数据结构
一、String(k-v) 字符串常规操作 备注 应用场景 SET key value 存入字符转键值对 单值缓存、对象缓存 MSET [key value, key value] 批量存储字符串键值对 对象缓存 SETNX key value 存入一个不存在的键值对 分布式锁 GET KEY 获取一个字符串键值 MGET [key,key,…...
免费送源码:Node.JS+Express+MySQL Express 流浪动物救助系统 计算机毕业设计原创定制
摘 要 随着互联网大趋势的到来,社会的方方面面,各行各业都在考虑利用互联网作为媒介将自己的信息更及时有效地推广出去,而其中最好的方式就是建立网络管理系统,并对其进行信息管理。由于现在网络的发达,流浪动物救助系…...
基于Java+Springboot+Vue开发的旅游景区管理系统
项目简介 该项目是基于JavaSpringbootVue开发的旅游景区管理系统(前后端分离),这是一项为大学生课程设计作业而开发的项目。该系统旨在帮助大学生学习并掌握Java编程技能,同时锻炼他们的项目设计与开发能力。通过学习基于Java的旅…...
Python 实现的风控系统(使用了kafka、Faust、模拟drools、redis、分布式数据库)
以下是一个使用 Python 实现的风控系统示例,涵盖以下技术组件: Kafka 消息中间件:用于实时接收支付业务系统传递的交易数据。Faust(Kafka Streams 的 Python 等价):用于流式处理 Kafka 中的消息。规则引擎…...
Linux运维_Rocky8 安装配置Zabbix
Zabbix 是一个开源的监控解决方案,用于监控网络、服务器、应用程序和服务的性能。它提供实时监控、数据收集、告警通知以及图形化界面,方便用户查看和分析监控数据。Zabbix 支持多种数据收集方式,包括 SNMP、IPMI、JMX 和自定义脚本ÿ…...
jQuery Mobile 滚屏事件
jQuery Mobile 滚屏事件 在移动开发中,滚屏事件是一个非常重要的交互方式,它可以让用户通过滚动屏幕来浏览内容。jQuery Mobile 是一个流行的移动框架,它提供了一套丰富的组件和事件,使得在移动设备上实现滚屏效果变得简单。本文将详细介绍 jQuery Mobile 中的滚屏事件,包…...
3.1.1ReactOS系统中搜索给定长度的空间地址区间函数的实现
系列文章目录 //搜索给定长度的空间地址区间 MmFindGap(); PMADDRESS_SPACE AddressSpace,//该进程用户空间 ULONG_PTR Length,//寻找的空间间隔大小 ULONG_PTR Granularity,//粒度位,表明空间起点的对齐要求,注意是起…...
arm64系统不支持32位的解决armel armhf
初级代码游戏的专栏介绍与文章目录-CSDN博客 我的github:codetoys,所有代码都将会位于ctfc库中。已经放入库中我会指出在库中的位置。 这些代码大部分以Linux为目标但部分代码是纯C的,可以在任何平台上使用。 源码指引:github源…...
【毕业设计】工具大礼包之『Maven3.6.3安装与配置』
系统版本 电脑系统:Windows 10 一.Maven下载 🎯 统一版本 apache-maven-3.6.3,下面两种下载方式2选1即可 1.官网直下 官网下载地址 https://archive.apache.org/dist/maven/maven-3/3.6.3/binaries/ 找到apache-maven-3.6.3-bin.zip 云盘…...
gin入门教程(9):路由分组与路由版本控制
在使用 Gin 框架构建 RESTful API 时,路由分组与版本控制是一种常见的实践,可以帮助你更好地管理不同版本的 API。下面是如何在 Gin 中实现路由分组和版本控制的示例。 目录结构 /hello-gin │ ├── cmd/ │ └── main.go ├── api/ │ ├── v1/ │ │ └─…...
rt-thread移植SystemView中遇到的问题
源代码地址dujunqiu/SystemView 我使用的rt-thread版本是5.2.0,应该是rt-thread适配的还有点问题 报错处理 1:warning: #223-D: function “typeof” declared implicitly 如下 typedef 的warning是C99规范没有typedef的定义,需要在keii中…...
【C++STL】list的模拟实现
✨ Blog’s 主页: 白乐天_ξ( ✿>◡❛) 🌈 个人Motto:他强任他强,清风拂山冈! 🔥 所属专栏:C深入学习笔记 💫 欢迎来到我的学习笔记! 一、三个类与成员函数接口 在list.…...
以30个面试问题和案例为导向:全面解析 Java Servlet是什么?基本概念、实现原理、生命周期、类结构、请求与响应的处理机制,以及性能优化和安全性管理
Servlet 是 Java Web 开发的核心组件之一,负责处理客户端请求并生成动态响应。本文将深入探讨 Servlet 的基本概念、实现原理、生命周期、类结构、请求与响应的处理机制,以及性能优化和安全性管理,帮助开发者从多方面掌握 Servlet。 文章目录…...
MFC小游戏设计
框架: 各个界面: 用户: 登录注册:账号和密码(昵称) 主菜单:各种游戏,查看自己信息(积分,装备【游戏数据】),退出 游戏界面&#…...
[漏洞挖掘与防护] 04.Windows系统安全缺陷之5次Shift漏洞启动计算机机理分析
这是作者新开的一个专栏——“漏洞挖掘与防护”,前期会复现各种经典和最新漏洞,并总结防护技巧;后期尝试从零学习漏洞挖掘技术,包括Web漏洞和二进制及IOT相关漏洞,以及Fuzzing技术。新的征程,新的开启,漫漫长征路,偏向虎山行。享受过程,感谢您的陪伴,一起加油~ 欢迎关…...
手机极简待办app哪款好用?
在快节奏的现代生活中,我们常常需要处理大量的任务和信息,这时候一款好用的极简待办软件就显得尤为重要。它们不仅能帮助我们记录和管理待办事项,还能提高我们的工作效率和生活质量。 在众多的待办软件中,敬业签是一款非常受欢迎…...
SpringBoot高级-底层原理
目录 1 SpringBoot自动化配置原理 01-SpringBoot2高级-starter依赖管理机制 02-SpringBoot2高级-自动化配置初体验 03-SpringBoot2高级-底层原理-Configuration配置注解 04-SpringBoot2高级-底层原理-Import注解使用1 05-SpringBoot2高级-底层原理-Import注解使用2 06-S…...
LabVIEW提高开发效率技巧----插入式架构
随着LabVIEW项目规模的扩大和系统复杂性的增加,传统的单一代码架构难以应对后期维护和功能扩展的需求。插入式架构(Plug-In Architecture)作为一种模块化设计方式,通过动态加载和运行子VI,使系统功能更加灵活、模块化&…...
MySQL COUNT(*)、COUNT(1)、COUNT(id)、COUNT(字段)效果及性能
文章目录 前言COUNT(exper)COUNT(*)优化COUNT(*) 与COUNT(1) COUNT(1)COUNT(id)COUNT(字段)总结参考 前言 业务开发中,我们经常要使用count做一些数据统计。今天根据MySQL5.7官方文档及丁奇老师的MySQL45讲,介绍一下COUNT(*)、COUNT(1)、COUNT(id)、COU…...
Leetcode 3576. Transform Array to All Equal Elements
Leetcode 3576. Transform Array to All Equal Elements 1. 解题思路2. 代码实现 题目链接:3576. Transform Array to All Equal Elements 1. 解题思路 这一题思路上就是分别考察一下是否能将其转化为全1或者全-1数组即可。 至于每一种情况是否可以达到…...
线程同步:确保多线程程序的安全与高效!
全文目录: 开篇语前序前言第一部分:线程同步的概念与问题1.1 线程同步的概念1.2 线程同步的问题1.3 线程同步的解决方案 第二部分:synchronized关键字的使用2.1 使用 synchronized修饰方法2.2 使用 synchronized修饰代码块 第三部分ÿ…...
Python爬虫实战:研究feedparser库相关技术
1. 引言 1.1 研究背景与意义 在当今信息爆炸的时代,互联网上存在着海量的信息资源。RSS(Really Simple Syndication)作为一种标准化的信息聚合技术,被广泛用于网站内容的发布和订阅。通过 RSS,用户可以方便地获取网站更新的内容,而无需频繁访问各个网站。 然而,互联网…...
Leetcode 3577. Count the Number of Computer Unlocking Permutations
Leetcode 3577. Count the Number of Computer Unlocking Permutations 1. 解题思路2. 代码实现 题目链接:3577. Count the Number of Computer Unlocking Permutations 1. 解题思路 这一题其实就是一个脑筋急转弯,要想要能够将所有的电脑解锁&#x…...
鸿蒙中用HarmonyOS SDK应用服务 HarmonyOS5开发一个医院挂号小程序
一、开发准备 环境搭建: 安装DevEco Studio 3.0或更高版本配置HarmonyOS SDK申请开发者账号 项目创建: File > New > Create Project > Application (选择"Empty Ability") 二、核心功能实现 1. 医院科室展示 /…...
质量体系的重要
质量体系是为确保产品、服务或过程质量满足规定要求,由相互关联的要素构成的有机整体。其核心内容可归纳为以下五个方面: 🏛️ 一、组织架构与职责 质量体系明确组织内各部门、岗位的职责与权限,形成层级清晰的管理网络…...
智能AI电话机器人系统的识别能力现状与发展水平
一、引言 随着人工智能技术的飞速发展,AI电话机器人系统已经从简单的自动应答工具演变为具备复杂交互能力的智能助手。这类系统结合了语音识别、自然语言处理、情感计算和机器学习等多项前沿技术,在客户服务、营销推广、信息查询等领域发挥着越来越重要…...
Selenium常用函数介绍
目录 一,元素定位 1.1 cssSeector 1.2 xpath 二,操作测试对象 三,窗口 3.1 案例 3.2 窗口切换 3.3 窗口大小 3.4 屏幕截图 3.5 关闭窗口 四,弹窗 五,等待 六,导航 七,文件上传 …...
Chrome 浏览器前端与客户端双向通信实战
Chrome 前端(即页面 JS / Web UI)与客户端(C 后端)的交互机制,是 Chromium 架构中非常核心的一环。下面我将按常见场景,从通道、流程、技术栈几个角度做一套完整的分析,特别适合你这种在分析和改…...
Spring Security 认证流程——补充
一、认证流程概述 Spring Security 的认证流程基于 过滤器链(Filter Chain),核心组件包括 UsernamePasswordAuthenticationFilter、AuthenticationManager、UserDetailsService 等。整个流程可分为以下步骤: 用户提交登录请求拦…...
