RDD算子介绍(三)
1. join
将相同的key的值连接在一起,值的类型可以不同
val rdd1 : RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("b", 2), ("c", 3)))
val rdd2 : RDD[(String, Int)] = sc.makeRDD(List(("a", 4), ("b", 5), ("c", 6)))
val joinRDD : RDD[(String, (Int, Int))] = rdd1.join(rdd2)
joinRDD.collect().foreach(println)

如果有不同的key,则不会连接
val rdd1 : RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("b", 2), ("c", 3)))
val rdd2 : RDD[(String, Int)] = sc.makeRDD(List(("d", 4), ("c", 6), ("a", 4)))
val joinRDD : RDD[(String, (Int, Int))] = rdd1.join(rdd2)
joinRDD.collect().foreach(println)
相同的key有多个,则会两两匹配(笛卡尔乘积)
val rdd1 : RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("b", 2), ("c", 3)))
val rdd2 : RDD[(String, Int)] = sc.makeRDD(List(("a", 5), ("c", 6), ("a", 4)))
val joinRDD : RDD[(String, (Int, Int))] = rdd1.join(rdd2)
joinRDD.collect().foreach(println)
类似于SQL中的join,RDD中的join也会出现笛卡尔乘积,所以需要谨慎使用。同样类似于SQL,RDD也有leftOuterJoin和rightOuterJoin。
val rdd1 : RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("b", 2), ("c", 3)))
val rdd2 : RDD[(String, Int)] = sc.makeRDD(List(("a", 4), ("b", 5)))
val joinRDD : RDD[(String, (Int, Int))] = rdd1.leftOuterJoin(rdd2)
joinRDD.collect().foreach(println)

val rdd1 : RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("b", 2)))
val rdd2 : RDD[(String, Int)] = sc.makeRDD(List(("a", 4), ("b", 5), ("c", 6)))
val joinRDD : RDD[(String, (Int, Int))] = rdd1.rightOuterJoin(rdd2)
joinRDD.collect().foreach(println)

2. cogroup
congroup即connect+group,相同的key放在一个组里,然后连接在一起
val rdd1 : RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("b", 2)))
val rdd2 : RDD[(String, Int)] = sc.makeRDD(List(("a", 4), ("b", 5), ("c", 6)))
val cgRDD : RDD[(String, (Iterable[Int], Iterable[Int]))] = rdd1.cogroup(rdd2)
cgRDD.collect().foreach(println)
cogroup的参数可以不止一个rdd,最多可以有三个rdd
3. 案例实操:统计各个省份点击数前三的广告
val dataRDD = sc.textFile("data")// 转换为((省份, 广告), 1)的格式
val mapRDD : RDD[((String, String), Int)] = dataRDD.map(line => {val datas = datas.split(" ")((data(1), data(4)), 1)
})// 聚合
val reduceRDD : RDD[((String, String), Int)] = mapRDD.reduceByKey(_+_)// 转换为(省份, (广告, sum))的格式
val newMapRDD = reduceRDD.map{case((prv, ad), sum) => {(prv, (ad, sum))}
}// 按照key(省份)分组
val groupRDD : RDD[(String, Iterable[(String, Int)])] = newMapRDD.groupByKey()// 按照值的第二个参数降序排序,取前三
val resultRDD = groupRDD.mapValues(iter => {iter.toList.sortBy(_._2)(Ordering.Int.reverse)
}).take(3)resultRDD.collect().foreach(println)
4. reduce
上述都是转换算子,接下来介绍行动算子。行动算子会触发作业的执行,底层调用的是runJob方法,会创建ActiveJob,并提交执行。
val rdd : RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
val i : Int = rdd.reduce(_+_)
println(i)
5. collect
将不同分区的数据按照分区的顺序采集到Driver端内存中,形成数组。
val rdd : RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
val i : Array[Int] = rdd.collect()
println(i.mkString(","))
6. count, first, take, takeOrdered
val rdd : RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
val i : Int = rdd.count()
println(i)val first : Int = rdd.first()
println(first)val takei : Array[Int]= rdd.take(3)
println(takei.mkString(","))val rdd1 : RDD[Int] = sc.makeRDD(List(4, 2, 3, 1))
val takeOrderedi : Array[Int]= rdd1.takeOrdered(3)
println(takeOrderedi.mkString(","))
7. aggregate、fold
与aggregateByKey类似,传入初始值、分区内计算规则、分区间计算规则,得到计算结果。只不过aggregateByKey需要根据不同的key进行分组,最后得到的是RDD,而aggregate不需要根据key进行分组计算,直接得到计算结果。
val rdd : RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
val result : Int = rdd.aggregate(0)(_+_, _+_)
println(result)
另外一个区别就是aggregate的初始值不进会参与分区内计算,还会参与分区间计算,而aggregateByKey的初始值只参与分区内计算,所以下面程序的允许结果为40
val rdd : RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
val result : Int = rdd.aggregate(10)(_+_, _+_)
println(result)
如果分区内计算规则和分区间计算规则相同,可以使用fold简化
val rdd : RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
val result : Int = rdd.fold(10)(_+_)
println(result)
8. countByKey、countByValue
val rdd : RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
val result : collection.Map[Int, Long] = rdd.countByValue()
println(result)
表示4出现了依次,2出现了依次,1出现了1次,3出现了1次。
val rdd = sc.makeRDD(List(("a", 1), ("a", 1), ("a", 1)))
val result : collection.Map[String, Long] = rdd.countByKey()
println(result)
按照key统计次数,跟值无关,这里a出现了3次。
9. WordCount的多种实现方式
val rdd = sc.makeRDD(List("Hello Scala", "Hello Scala"))
val words : RDD[String] = rdd.flatMap(_.split(" "))
val group : RDD[(String, Iterable[String])] = words.groupBy(word=>word)
val wordCount : RDD[(String, Int)] = group.mapValues(iter=>iter.size)
val rdd = sc.makeRDD(List("Hello Scala", "Hello Scala"))
val words : RDD[String] = rdd.flatMap(_.split(" "))
val wordOne = words.map((_, 1))
val group : RDD[(String, Iterable[Int])] = wordOne.groupByKey()
val wordCount : RDD[(String, Int)] = group.mapValues(iter=>iter.size)
val rdd = sc.makeRDD(List("Hello Scala", "Hello Scala"))
val words : RDD[String] = rdd.flatMap(_.split(" "))
val wordOne = words.map((_, 1))
val wordCount : RDD[(String, Int)] = wordOne.reduceByKey()
val rdd = sc.makeRDD(List("Hello Scala", "Hello Scala"))
val words : RDD[String] = rdd.flatMap(_.split(" "))
val wordOne = words.map((_, 1))
val wordCount : RDD[(String, Int)] = wordOne.aggregateByKey(0)(_+_, _+_)
val rdd = sc.makeRDD(List("Hello Scala", "Hello Scala"))
val words : RDD[String] = rdd.flatMap(_.split(" "))
val wordOne = words.map((_, 1))
val wordCount : RDD[(String, Int)] = wordOne.combineByKey(v=>v)((x : Int, y : Int) => x + y, (x : Int, y : Int) => x + y)
val rdd = sc.makeRDD(List("Hello Scala", "Hello Scala"))
val words : RDD[String] = rdd.flatMap(_.split(" "))
val wordOne = words.map((_, 1))
val wordCount : collection.Map[String, Long] = wordOne.countByKey()
val rdd = sc.makeRDD(List("Hello Scala", "Hello Scala"))
val words : RDD[String] = rdd.flatMap(_.split(" "))
val wordCount : collection.Map[String, Long] = words.countByValue()
val rdd = sc.makeRDD(List("Hello Scala", "Hello Scala"))
val words : RDD[String] = rdd.flatMap(_.split(" "))
val wordMap = word.map(word => {mutable.Map[String, Int]((word, 1))
})
val wordCount = wordMap.reduce((map1, map2) => {map2.foreach{case(word, count) => {val newCount = map1.getOrElse(word, 0L) + countmap1.update(word, newCount)}}map1
})
10. save
save相关的方法主要有saveAsTextFile、saveAsObjectFile、saveAsSequenceFile。其中,saveAsSequenceFile需要数据元素类型为key-value类型。
相关文章:
RDD算子介绍(三)
1. join 将相同的key的值连接在一起,值的类型可以不同 val rdd1 : RDD[(String, Int)] sc.makeRDD(List(("a", 1), ("b", 2), ("c", 3))) val rdd2 : RDD[(String, Int)] sc.makeRDD(List(("a", 4), ("b", 5…...
Redis的脑裂问题
Redis 脑裂(Split-brain)问题是指在分布式系统中,特别是基于主从复制和哨兵(Sentinel)模式的Redis集群中,由于网络分区(network partition)而导致部分节点组成了独立可用的服务&…...
【算法】雪花算法生成分布式 ID
SueWakeup 个人中心:SueWakeup 系列专栏:学习Java框架 个性签名:人生乏味啊,我欲令之光怪陆离 本文封面由 凯楠📷 友情赞助播出! 目录 1. 什么是分布式 ID 2. 分布式 ID 基本要求 3. 数据库主键自增 4. UUID 5. S…...
FFplay使用滤镜添加字幕到现有视频显示
1.创建字幕文件4k.srt 4k.srt内容: 1 00:00:01.000 --> 00:00:30.000 日照香炉生紫烟2 00:00:31.000 --> 00:00:60.000 遥看瀑布挂前川3 00:01:01.000 --> 00:01:30.000 飞流直下三千尺4 00:01:31.000 --> 00:02:00.000 疑是银河落九天2.通过使用滤镜显示字幕在视…...
【Python + Django】Django模板语法 + 请求和响应
前言: 现在现在,我们要开始将变量的值展现在页面上面啦! 要是只会显示静态页面,我们的页面也太难看和死板了, 并且数据库的数据也没法展现在页面上。 但是呢,模板语法学习之后就可以啦!&…...
大数据面试总结 四
1、当hadoop集群中某一个节点挂了,内部数据流程是如何进行的? 每一个datanode都会定期向namenode发送heardbeat消息,当一段时间namenode没有接收到某一个datanode的消息,此时namenode就会将该datanode标记为死亡,并不…...
Spring Boot: 使用MongoOperations操作mongodb
一、添加依赖 <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0"xmlns:xsi"http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation"http://maven.apache.org/POM/4…...
PyTorch 深度学习(GPT 重译)(六)
十四、端到端结节分析,以及接下来的步骤 本章内容包括 连接分割和分类模型 为新任务微调网络 将直方图和其他指标类型添加到 TensorBoard 从过拟合到泛化 在过去的几章中,我们已经构建了许多对我们的项目至关重要的系统。我们开始加载数据…...
MyBatis3源码深度解析(十七)MyBatis缓存(一)一级缓存和二级缓存的实现原理
文章目录 前言第六章 MyBatis缓存6.1 MyBatis缓存实现类6.2 MyBatis一级缓存实现原理6.2.1 一级缓存在查询时的使用6.2.2 一级缓存在更新时的清空 6.3 MyBatis二级缓存的实现原理6.3.1 实现的二级缓存的Executor类型6.3.2 二级缓存在查询时使用6.3.3 二级缓存在更新时清空 前言…...
Go --- Go语言垃圾处理
概念 垃圾回收(GC-Garbage Collection)暂停程序业务逻辑SWT(stop the world)程序根节点:程序中被直接或间接引用的对象集合,能通过他们找出所有可以被访问到的对象,所以Go程序的根节点通常包括…...
力扣每日一题30:串联所有单词的子串
题目描述 给定一个字符串 s 和一个字符串数组 words。 words 中所有字符串 长度相同。 s 中的 串联子串 是指一个包含 words 中所有字符串以任意顺序排列连接起来的子串。 例如,如果 words ["ab","cd","ef"], 那么 &q…...
vim | vim的快捷命令行
快捷进入shell界面 -> :nnoremap <F8> :sh<CR> -> 绑定到了F8 :nnoremap <F8> :sh<CR> 快捷执行 -> :nnoremap <F5> :wa<CR>:!g % -o a.out && ./a.out<CR> -> 绑定到了F5 :nnoremap <F5> :wa<CR>…...
项目管理平台-01-BugClose 入门介绍
拓展阅读 Devops-01-devops 是什么? Devops-02-Jpom 简而轻的低侵入式在线构建、自动部署、日常运维、项目监控软件 代码质量管理 SonarQube-01-入门介绍 项目管理平台-01-jira 入门介绍 缺陷跟踪管理系统,为针对缺陷管理、任务追踪和项目管理的商业…...
web集群-lvs-DR模式基本配置
目录 环境: 一、配置RS 1、安装常见软件 2、配置web服务 3、添加vip 4、arp抑制 二、配置LVS 1、添加vip 2、安装配置工具 3、配置DR 三、测试 四、脚本方式配置 1、LVS-DR 2、LVS-RS 环境: master lvs 192.168.80.161 no…...
基于深度学习的面部情绪识别算法仿真与分析
声明:以下内容均属于本人本科论文内容,禁止盗用,否则将追究相关责任 基于深度学习的面部情绪识别算法仿真与分析 摘要结果分析1、本次设计通过网络爬虫技术获取了七种面部情绪图片:吃惊、恐惧、厌恶、高兴、伤心、愤怒、自然各若…...
C语言经典面试题目(十六)
1、什么是C语言中的指针常量和指针变量?它们有什么区别? 在C语言中,指针常量和指针变量是指针的两种不同类型。它们的区别在于指针的指向和指针本身是否可以被修改。 指针常量:指针指向的内存地址不可变,但指针本身的…...
【C语言】文件操作揭秘:C语言中文件的顺序读写、随机读写、判断文件结束和文件缓冲区详细解析【图文详解】
欢迎来CILMY23的博客喔,本篇为【C语言】文件操作揭秘:C语言中文件的顺序读写、随机读写、判断文件结束和文件缓冲区详细解析【图文详解】,感谢观看,支持的可以给个一键三连,点赞关注收藏。 前言 欢迎来到本篇博客&…...
JAVA八股文面经问题整理第6弹
文章目录 目录 文章目录 提问问题 问题1 问题2 问题3 问题4 问题5 问题6 问题7 问题8 问题9 问题10 问题11 问题12 写在最后 提问问题 介绍一下Linux常⽤命令,例如:Vim快捷键,常⽤查看Log的命令,路径相关&#x…...
pytest相关面试题
pytest是什么?它有什么优点? pytest是一个非常流行的Python测试框架,它具有简洁、易用、高校等优点。他可以帮助测试人员方便地编写和运行测试用例,并且提供了丰富的插件和扩展,支持各种测试需求介绍下pytest常用的库 …...
Keras库搭建神经网络
Keras并非简单的神经网络库,而是一个基于Theano的强大的深度学习库,利用它不仅仅可以搭建普通的神经网络,还可以搭建各种深度学习模型,如自编码器、循环神经网络、递归神经网络、卷积神经网络等。 安装代码: pip ins…...
MPNet:旋转机械轻量化故障诊断模型详解python代码复现
目录 一、问题背景与挑战 二、MPNet核心架构 2.1 多分支特征融合模块(MBFM) 2.2 残差注意力金字塔模块(RAPM) 2.2.1 空间金字塔注意力(SPA) 2.2.2 金字塔残差块(PRBlock) 2.3 分类器设计 三、关键技术突破 3.1 多尺度特征融合 3.2 轻量化设计策略 3.3 抗噪声…...
Android Wi-Fi 连接失败日志分析
1. Android wifi 关键日志总结 (1) Wi-Fi 断开 (CTRL-EVENT-DISCONNECTED reason3) 日志相关部分: 06-05 10:48:40.987 943 943 I wpa_supplicant: wlan0: CTRL-EVENT-DISCONNECTED bssid44:9b:c1:57:a8:90 reason3 locally_generated1解析: CTR…...
OpenLayers 可视化之热力图
注:当前使用的是 ol 5.3.0 版本,天地图使用的key请到天地图官网申请,并替换为自己的key 热力图(Heatmap)又叫热点图,是一种通过特殊高亮显示事物密度分布、变化趋势的数据可视化技术。采用颜色的深浅来显示…...
三维GIS开发cesium智慧地铁教程(5)Cesium相机控制
一、环境搭建 <script src"../cesium1.99/Build/Cesium/Cesium.js"></script> <link rel"stylesheet" href"../cesium1.99/Build/Cesium/Widgets/widgets.css"> 关键配置点: 路径验证:确保相对路径.…...
【论文笔记】若干矿井粉尘检测算法概述
总的来说,传统机器学习、传统机器学习与深度学习的结合、LSTM等算法所需要的数据集来源于矿井传感器测量的粉尘浓度,通过建立回归模型来预测未来矿井的粉尘浓度。传统机器学习算法性能易受数据中极端值的影响。YOLO等计算机视觉算法所需要的数据集来源于…...
Matlab | matlab常用命令总结
常用命令 一、 基础操作与环境二、 矩阵与数组操作(核心)三、 绘图与可视化四、 编程与控制流五、 符号计算 (Symbolic Math Toolbox)六、 文件与数据 I/O七、 常用函数类别重要提示这是一份 MATLAB 常用命令和功能的总结,涵盖了基础操作、矩阵运算、绘图、编程和文件处理等…...
leetcodeSQL解题:3564. 季节性销售分析
leetcodeSQL解题:3564. 季节性销售分析 题目: 表:sales ---------------------- | Column Name | Type | ---------------------- | sale_id | int | | product_id | int | | sale_date | date | | quantity | int | | price | decimal | -…...
MySQL中【正则表达式】用法
MySQL 中正则表达式通过 REGEXP 或 RLIKE 操作符实现(两者等价),用于在 WHERE 子句中进行复杂的字符串模式匹配。以下是核心用法和示例: 一、基础语法 SELECT column_name FROM table_name WHERE column_name REGEXP pattern; …...
【学习笔记】深入理解Java虚拟机学习笔记——第4章 虚拟机性能监控,故障处理工具
第2章 虚拟机性能监控,故障处理工具 4.1 概述 略 4.2 基础故障处理工具 4.2.1 jps:虚拟机进程状况工具 命令:jps [options] [hostid] 功能:本地虚拟机进程显示进程ID(与ps相同),可同时显示主类&#x…...
大学生职业发展与就业创业指导教学评价
这里是引用 作为软工2203/2204班的学生,我们非常感谢您在《大学生职业发展与就业创业指导》课程中的悉心教导。这门课程对我们即将面临实习和就业的工科学生来说至关重要,而您认真负责的教学态度,让课程的每一部分都充满了实用价值。 尤其让我…...
