【Spark分布式内存计算框架——Spark Core】4. RDD函数(下) 重分区函数、聚合函数
重分区函数
如何对RDD中分区数目进行调整(增加分区或减少分区),在RDD函数中主要有如下三个函数。
1)、增加分区函数
函数名称:repartition,此函数使用的谨慎,会产生Shuffle。
2)、减少分区函数
函数名称:coalesce,此函数不会产生Shuffle,当且仅当降低RDD分区数目。
比如RDD的分区数目为10个分区,此时调用rdd.coalesce(12),不会对RDD进行任何操作。
3)、调整分区函数
在PairRDDFunctions(此类专门针对RDD中数据类型为KeyValue对提供函数)工具类中
partitionBy函数:
范例演示代码,适当使用函数调整RDD分区数目:
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* RDD中分区函数,调整RDD分区数目,可以增加分区和减少分区
*/
object SparkPartitionTest {
def main(args: Array[String]): Unit = {
// 创建应用程序入口SparkContext实例对象
val sc: SparkContext = {
// 1.a 创建SparkConf对象,设置应用的配置信息
val sparkConf: SparkConf = new SparkConf()
.setAppName(this.getClass.getSimpleName.stripSuffix("$"))
.setMaster("local[2]")
// 1.b 传递SparkConf对象,构建Context实例
new SparkContext(sparkConf)
}
sc.setLogLevel("WARN")
// 读取本地文件系统文本文件数据
val datasRDD: RDD[String] = sc.textFile("datas/wordcount/wordcount.data", minPartitions = 2)
// TODO: 增加RDD分区数
val etlRDD: RDD[String] = datasRDD.repartition(3)
println(s"EtlRDD 分区数目 = ${etlRDD.getNumPartitions}")
// 词频统计
val resultRDD: RDD[(String, Int)] = etlRDD
// 数据分析,考虑过滤脏数据
.filter(line => null != line && line.trim.length > 0)
// 分割单词,注意去除左右空格
.flatMap(line => line.trim.split("\\s+"))
// 转换为二元组,表示单词出现一次
.mapPartitions{iter =>
iter.map(word => (word, 1))
}
// 分组聚合,按照Key单词
.reduceByKey((tmp, item) => tmp + item)
// 输出结果RDD
resultRDD
// TODO: 对结果RDD降低分区数目
.coalesce(1)
.foreachPartition(iter => iter.foreach(println))
// 应用程序运行结束,关闭资源
sc.stop()
}
}
在实际开发中,什么时候适当调整RDD的分区数目呢?让程序性能更好好呢????
第一点:增加分区数目
- 当处理的数据很多的时候,可以考虑增加RDD的分区数目
第二点:减少分区数目
- 其一:当对RDD数据进行过滤操作(filter函数)后,考虑是否降低RDD分区数目
- 其二:当对结果RDD存储到外部系统
聚合函数
在数据分析领域中,对数据聚合操作是最为关键的,在Spark框架中各个模块使用时,主要就是其中聚合函数的使用。
集合中聚合函数
回顾列表List中reduce聚合函数核心概念:聚合的时候,往往需要聚合中间临时变量。查看列表List中聚合函数reduce和fold源码如下:
通过代码,看看列表List中聚合函数使用:
运行截图如下所示:
fold聚合函数,比reduce聚合函数,多提供一个可以初始化聚合中间临时变量的值参数:
聚合操作时,往往聚合过程中需要中间临时变量(到底时几个变量,具体业务而定),如下案例:
RDD 中聚合函数
在RDD中提供类似列表List中聚合函数reduce和fold,查看如下:
案例演示:求列表List中元素之和,RDD中分区数目为2,核心业务代码如下:
运行原理分析:
使用RDD中fold聚合函数:
查看RDD中高级聚合函数aggregate,函数声明如下:
业务需求:使用aggregate函数实现RDD中最大的两个数据,分析如下:
核心业务代码如下:
运行结果原理剖析示意图:
上述完整范例演示代码:
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext, TaskContext}
import scala.collection.mutable
import scala.collection.mutable.ListBuffer
/**
* RDD中聚合函数:reduce、aggregate函数
*/
object SparkAggTest {
def main(args: Array[String]): Unit = {
// 创建应用程序入口SparkContext实例对象
val sc: SparkContext = {
// 1.a 创建SparkConf对象,设置应用的配置信息
val sparkConf: SparkConf = new SparkConf()
.setAppName(this.getClass.getSimpleName.stripSuffix("$"))
.setMaster("local[2]")
// 1.b 传递SparkConf对象,构建Context实例
new SparkContext(sparkConf)
}
sc.setLogLevel("WARN")
// 模拟数据,1 到 10 的列表,通过并行方式创建RDD
val datas = 1 to 10
val datasRDD: RDD[Int] = sc.parallelize(datas, numSlices = 2)
// 查看每个分区中的数据
datasRDD.foreachPartition{iter =>
println(s"p-${TaskContext.getPartitionId()}: ${iter.mkString(", ")}")
}
println("=========================================")
// 使用reduce函数聚合
val result: Int = datasRDD.reduce((tmp, item) => {
println(s"p-${TaskContext.getPartitionId()}: tmp = $tmp, item = $item")
tmp + item
})
println(result)
println("=========================================")
// 使用fold函数聚合
val result2: Int = datasRDD.fold(0)((tmp, item) => {
println(s"p-${TaskContext.getPartitionId()}: tmp = $tmp, item = $item")
tmp + item
})
println(result2)
println("=========================================")
// 使用aggregate函数获取最大的两个值
val top2: mutable.Seq[Int] = datasRDD.aggregate(new ListBuffer[Int]())(
// 分区内聚合函数,每个分区内数据如何聚合 seqOp: (U, T) => U,
(u, t) => {
println(s"p-${TaskContext.getPartitionId()}: u = $u, t = $t")
// 将元素加入到列表中
u += t //
// 降序排序
val top = u.sorted.takeRight(2)
// 返回
top
},
// 分区间聚合函数,每个分区聚合的结果如何聚合 combOp: (U, U) => U
(u1, u2) => {
println(s"p-${TaskContext.getPartitionId()}: u1 = $u1, u2 = $u2")
u1 ++= u2 // 将列表的数据合并,到u1中
//
u1.sorted.takeRight(2)
}
)
println(top2)
// 应用程序运行结束,关闭资源
sc.stop()
}
}
PairRDDFunctions 聚合函数
在Spark中有一个object对象PairRDDFunctions,主要针对RDD的数据类型是Key/Value对的数据提供函数,方便数据分析处理。比如使用过的函数:reduceByKey、groupByKey等。*ByKey函数:将相同Key的Value进行聚合操作的,省去先分组再聚合。
第一类:分组函数groupByKey
第二类:分组聚合函数reduceByKey和foldByKey
但是reduceByKey和foldByKey聚合以后的结果数据类型与RDD中Value的数据类型是一样的。
第三类:分组聚合函数aggregateByKey
在企业中如果对数据聚合使用,不能使用reduceByKey完成时,考虑使用aggregateByKey函数,基本上都能完成任意聚合功能。
演示范例代码如下:
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* RDD中聚合函数,针对RDD中数据类型Key/Value对:
* groupByKey
* reduceByKey/foldByKey
* aggregateByKey
* combineByKey
*/
object SparkAggByKeyTest {
def main(args: Array[String]): Unit = {
// 创建应用程序入口SparkContext实例对象
val sc: SparkContext = {
// 1.a 创建SparkConf对象,设置应用的配置信息
val sparkConf: SparkConf = new SparkConf()
.setAppName(this.getClass.getSimpleName.stripSuffix("$"))
.setMaster("local[2]")
// 1.b 传递SparkConf对象,构建Context实例
new SparkContext(sparkConf)
}
sc.setLogLevel("WARN")
// 1、并行化集合创建RDD数据集
val linesSeq: Seq[String] = Seq(
"hadoop scala hive spark scala sql sql", //
"hadoop scala spark hdfs hive spark", //
"spark hdfs spark hdfs scala hive spark" //
)
val inputRDD: RDD[String] = sc.parallelize(linesSeq, numSlices = 2)
// 2、分割单词,转换为二元组
val wordsRDD: RDD[(String, Int)] = inputRDD
.flatMap(line => line.split("\\s+"))
.map(word => word -> 1)
// TODO: 先使用groupByKey函数分组,再使用map函数聚合
val wordsGroupRDD: RDD[(String, Iterable[Int])] = wordsRDD.groupByKey()
val resultRDD: RDD[(String, Int)] = wordsGroupRDD.map{ case (word, values) =>
val count: Int = values.sum
word -> count
}
println(resultRDD.collectAsMap())
// TODO: 直接使用reduceByKey或foldByKey分组聚合
val resultRDD2: RDD[(String, Int)] = wordsRDD.reduceByKey((tmp, item) => tmp + item)
println(resultRDD2.collectAsMap())
val resultRDD3 = wordsRDD.foldByKey(0)((tmp, item) => tmp + item)
println(resultRDD3.collectAsMap())
// TODO: 使用aggregateByKey聚合
/*
def aggregateByKey[U: ClassTag]
(zeroValue: U) // 聚合中间临时变量初始值,类似fold函数zeroValue
(
seqOp: (U, V) => U, // 各个分区内数据聚合操作函数
combOp: (U, U) => U // 分区间聚合结果的聚合操作函数
): RDD[(K, U)]
*/
val resultRDD4 = wordsRDD.aggregateByKey(0)(
(tmp: Int, item: Int) => {
tmp + item
},
(tmp: Int, result: Int) => {
tmp + result
}
)
println(resultRDD4.collectAsMap())
// 应用程序运行结束,关闭资源
Thread.sleep(1000000)
sc.stop()
}
}
面试题
RDD中groupByKey和reduceByKey区别???
- reduceByKey函数:在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,reduce任务的个数可以通过第二个可选的参数来设置。
- groupByKey函数:在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的函数,将相同key的值聚合到一起,与reduceByKey的区别是只生成一个sequence。
相关文章:

【Spark分布式内存计算框架——Spark Core】4. RDD函数(下) 重分区函数、聚合函数
重分区函数 如何对RDD中分区数目进行调整(增加分区或减少分区),在RDD函数中主要有如下三个函数。 1)、增加分区函数 函数名称:repartition,此函数使用的谨慎,会产生Shuffle。 2)、…...

智能工厂自动化设备如何将数据采集到物联网云平台上
制造业工厂在进行生产管理、数字化转型升级的过程中,大量自动化设备的数据采集上云一直是困扰厂商的难题之一。因设备种类多、工艺复杂、设备老旧无多余通信接口导致数据无法集中、工艺无法实时管控,加上设备服务商的本地支持比较有限,因此设…...

SpringBoot整合Mybatis的核心原理
0. 前言:1. 自动配置类MybatisAutoConfiguration:1.1. SqlSessionFactory的生成:1.2. Mapper的扫描和代理生成:1.2.1. MapperScannerConfigurer1.2.2. MapperFactoryBean1.2.3. getMapper生成代理对象2. 小结:0. 前言&…...

滴滴一面:order by 调优10倍,思路是啥?
背景说明: Mysql调优,是大家日常常见的调优工作。 所以,Mysql调优是一个非常、非常核心的面试知识点。 在40岁老架构师 尼恩的读者交流群(50)中,其相关面试题是一个非常、非常高频的交流话题。 近段时间,有小伙伴面…...

Vue框架学习篇(五)
Vue框架学习篇(五) 1 组件 1.1 组件的基本使用 1.1.1 基本流程 a 引入外部vue组件必须要的js文件 <script src"../js/httpVueLoader.js"></script>b 创建.vue文件 <template><!--公共模板内容--></template><script><!…...

(蓝桥杯 刷题全集)【备战(蓝桥杯)算法竞赛-第1天(基础算法-上 专题)】( 从头开始重新做题,记录备战竞赛路上的每一道题 )距离蓝桥杯还有75天
🏆🏆🏆🏆🏆🏆🏆 欢迎观看我的博客,如有问题交流,欢迎评论区留言,一定尽快回复!(大家可以去看我的专栏,是所有文章的目录&a…...

C++——继承那些事儿你真的知道吗?
目录1.继承的概念及定义1.1继承的概念1.2 继承定义1.2.1定义格式1.2.2继承关系和访问限定符1.2.3继承基类成员访问方式的变化2.父类和子类对象赋值转换3.继承中的作用域4.派生类的默认成员函数5.继承与友元6. 继承与静态成员7.复杂的菱形继承及菱形虚拟继承如何解决数据冗余和二…...

leetcode 困难 —— N 皇后(简单递归)
(不知道为啥总是给这种简单的递归设为困难题,虽然优化部分很不错,但是题目太好过了) 题目: 按照国际象棋的规则,皇后可以攻击与之处在同一行或同一列或同一斜线上的棋子。 n 皇后问题 研究的是如何将 n 个…...

AWS实战:Dynamodb到Redshift数据同步
AWS Dynamodb简介 Amazon DynamoDB 是一种完全托管式、无服务器的 NoSQL 键值数据库,旨在运行任何规模的高性能应用程序。DynamoDB能在任何规模下实现不到10毫秒级的一致响应,并且它的存储空间无限,可在任何规模提供可靠的性能。DynamoDB 提…...

机器学习评估指标的十个常见面试问题
评估指标是用于评估机器学习模型性能的定量指标。它们提供了一种系统和客观的方法来比较不同的模型并衡量它们在解决特定问题方面的成功程度。通过比较不同模型的结果并评估其性能可以对使用哪些模型、如何改进现有模型以及如何优化给定任务的性能做出正确的决定,所…...

常见的安全问题汇总 学习记录
声明 本文是学习2017中国网站安全形势分析报告. 而整理的学习笔记,分享出来希望更多人受益,如果存在侵权请及时联系我们 2017年重大网站安全漏洞 CVE-2017-3248 :WebLogic 远程代码执行 2017年1月27日,WebLogic官方发布了一个编号为CVE-2017-3248 的…...

元宵晚会节目预告没有岳云鹏,是不敢透露还是另有隐情
在刚刚结束的元宵节晚会上,德云社的岳云鹏,再一次参加并引起轰动,并获得了观众朋友们的一致好评。 不过有细心的网友发现,早前央视元宵晚会节目预告,并没有看到小岳岳,难道是不敢提前透露,怕公布…...

计算机视觉 吴恩达 week 10 卷积
文章目录一、边缘检测二、填充 padding1、valid convolution2、same convolution三、卷积步长 strided convolution四、三维卷积五、池化层 pooling六、 为什么要使用卷积神经网络一、边缘检测 可以通过卷积操作来进行 原图像 n✖n 卷积核 f✖f 则输出的图像为 n-f1 二、填充…...
JavaScript 函数定义
JavaScript 函数定义 函数是 JavaScript 中的基本组件之一。一个函数是 JavaScript 过程 — 一组执行任务或计算值的语句。要使用一个函数,你必须将其定义在你希望调用它的作用域内。 一个 JavaScript 函数用function关键字定义,后面跟着函数名和圆括号…...

设计模式:建造者模式教你创建复杂对象
一、问题场景 当我们需要创建资源池配置对象的时候,资源池配置类里面有以下成员变量: 如果我们使用new关键字调用构造函数,构造函数参数列表就会太长。 如果我们使用set方法设置字段值,那minIdle<maxIdle<maxTotal的约束逻辑就没地方…...
在C++中将引用转换为指针表示
在C中将引用转换为指针表示 有没有办法在c 中"转换"对指针的引用?在下面的例子,func2已经定义了原型和我不能改变它,但func是我的API,我想为pass两个参数,或一(组和第二组,以NULL)或既不(均设置为NULL): void func2(some1 *p1, some2 *p2); func(some1…...

PS快速入门系列
01-界面构成 1菜单栏 2工具箱 3工县属性栏 4悬浮面板 5画布 ctr1N新建对话框(针对画布进行设置) 打开对话框:ctrl0(字母) 画布三种显示方式切换:F 隐藏工具箱,工具属性栏,悬浮面板…...

ASP.NET CORE 3.1 MVC“指定的网络名不再可用\企图在不存在的网络连接上进行操作”的问题解决过程
ASP.NET CORE 3.1 MVC“指定的网络名不再可用\企图在不存在的网络连接上进行操作”的问题解决过程 我家里的MAC没这个问题。这个是在windows上发生的。 起因很简单我用ASP.NET CORE 3.1 MVC做个项目做登录将数据从VIEW post到Controller上结果意外的报了错误。 各种百度都说…...

JVM从看懂到看开Ⅲ -- 类加载与字节码技术【下】
文章目录编译期处理默认构造器自动拆装箱泛型集合取值可变参数foreach 循环switch 字符串switch 枚举枚举类try-with-resources方法重写时的桥接方法匿名内部类类加载阶段加载链接初始化相关练习和应用类加载器类与类加载器启动类加载器拓展类加载器双亲委派模式自定义类加载器…...
服务器常用的41个状态码及其对应的含义
服务器常用的状态码及其对应的含义如下: 100——客户必须继续发出请求 101——客户要求服务器根据请求转换HTTP协议版本 200——交易成功 201——提示知道新文件的URL 202——接受和处理、但处理未完成 203——返回信息不确定或不完整 204——请求收到&#…...
浏览器访问 AWS ECS 上部署的 Docker 容器(监听 80 端口)
✅ 一、ECS 服务配置 Dockerfile 确保监听 80 端口 EXPOSE 80 CMD ["nginx", "-g", "daemon off;"]或 EXPOSE 80 CMD ["python3", "-m", "http.server", "80"]任务定义(Task Definition&…...
进程地址空间(比特课总结)
一、进程地址空间 1. 环境变量 1 )⽤户级环境变量与系统级环境变量 全局属性:环境变量具有全局属性,会被⼦进程继承。例如当bash启动⼦进程时,环 境变量会⾃动传递给⼦进程。 本地变量限制:本地变量只在当前进程(ba…...
DeepSeek 赋能智慧能源:微电网优化调度的智能革新路径
目录 一、智慧能源微电网优化调度概述1.1 智慧能源微电网概念1.2 优化调度的重要性1.3 目前面临的挑战 二、DeepSeek 技术探秘2.1 DeepSeek 技术原理2.2 DeepSeek 独特优势2.3 DeepSeek 在 AI 领域地位 三、DeepSeek 在微电网优化调度中的应用剖析3.1 数据处理与分析3.2 预测与…...

《Qt C++ 与 OpenCV:解锁视频播放程序设计的奥秘》
引言:探索视频播放程序设计之旅 在当今数字化时代,多媒体应用已渗透到我们生活的方方面面,从日常的视频娱乐到专业的视频监控、视频会议系统,视频播放程序作为多媒体应用的核心组成部分,扮演着至关重要的角色。无论是在个人电脑、移动设备还是智能电视等平台上,用户都期望…...
前端倒计时误差!
提示:记录工作中遇到的需求及解决办法 文章目录 前言一、误差从何而来?二、五大解决方案1. 动态校准法(基础版)2. Web Worker 计时3. 服务器时间同步4. Performance API 高精度计时5. 页面可见性API优化三、生产环境最佳实践四、终极解决方案架构前言 前几天听说公司某个项…...
浅谈不同二分算法的查找情况
二分算法原理比较简单,但是实际的算法模板却有很多,这一切都源于二分查找问题中的复杂情况和二分算法的边界处理,以下是博主对一些二分算法查找的情况分析。 需要说明的是,以下二分算法都是基于有序序列为升序有序的情况…...
Pinocchio 库详解及其在足式机器人上的应用
Pinocchio 库详解及其在足式机器人上的应用 Pinocchio (Pinocchio is not only a nose) 是一个开源的 C 库,专门用于快速计算机器人模型的正向运动学、逆向运动学、雅可比矩阵、动力学和动力学导数。它主要关注效率和准确性,并提供了一个通用的框架&…...
【Elasticsearch】Elasticsearch 在大数据生态圈的地位 实践经验
Elasticsearch 在大数据生态圈的地位 & 实践经验 1.Elasticsearch 的优势1.1 Elasticsearch 解决的核心问题1.1.1 传统方案的短板1.1.2 Elasticsearch 的解决方案 1.2 与大数据组件的对比优势1.3 关键优势技术支撑1.4 Elasticsearch 的竞品1.4.1 全文搜索领域1.4.2 日志分析…...

Ubuntu系统复制(U盘-电脑硬盘)
所需环境 电脑自带硬盘:1块 (1T) U盘1:Ubuntu系统引导盘(用于“U盘2”复制到“电脑自带硬盘”) U盘2:Ubuntu系统盘(1T,用于被复制) !!!建议“电脑…...
基于鸿蒙(HarmonyOS5)的打车小程序
1. 开发环境准备 安装DevEco Studio (鸿蒙官方IDE)配置HarmonyOS SDK申请开发者账号和必要的API密钥 2. 项目结构设计 ├── entry │ ├── src │ │ ├── main │ │ │ ├── ets │ │ │ │ ├── pages │ │ │ │ │ ├── H…...