当前位置: 首页 > news >正文

RDD算子介绍(三)

1. join

相同的key的值连接在一起,值的类型可以不同

val rdd1 : RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("b", 2), ("c", 3)))
val rdd2 : RDD[(String, Int)] = sc.makeRDD(List(("a", 4), ("b", 5), ("c", 6)))
val joinRDD : RDD[(String, (Int, Int))] = rdd1.join(rdd2)
joinRDD.collect().foreach(println)

如果有不同的key,则不会连接

val rdd1 : RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("b", 2), ("c", 3)))
val rdd2 : RDD[(String, Int)] = sc.makeRDD(List(("d", 4), ("c", 6), ("a", 4)))
val joinRDD : RDD[(String, (Int, Int))] = rdd1.join(rdd2)
joinRDD.collect().foreach(println)

 

 相同的key有多个,则会两两匹配(笛卡尔乘积)

val rdd1 : RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("b", 2), ("c", 3)))
val rdd2 : RDD[(String, Int)] = sc.makeRDD(List(("a", 5), ("c", 6), ("a", 4)))
val joinRDD : RDD[(String, (Int, Int))] = rdd1.join(rdd2)
joinRDD.collect().foreach(println)

 

 类似于SQL中的join,RDD中的join也会出现笛卡尔乘积,所以需要谨慎使用。同样类似于SQL,RDD也有leftOuterJoin和rightOuterJoin。

val rdd1 : RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("b", 2), ("c", 3)))
val rdd2 : RDD[(String, Int)] = sc.makeRDD(List(("a", 4), ("b", 5)))
val joinRDD : RDD[(String, (Int, Int))] = rdd1.leftOuterJoin(rdd2)
joinRDD.collect().foreach(println)

val rdd1 : RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("b", 2)))
val rdd2 : RDD[(String, Int)] = sc.makeRDD(List(("a", 4), ("b", 5), ("c", 6)))
val joinRDD : RDD[(String, (Int, Int))] = rdd1.rightOuterJoin(rdd2)
joinRDD.collect().foreach(println)

 

 2. cogroup

congroup即connect+group,相同的key放在一个组里,然后连接在一起

val rdd1 : RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("b", 2)))
val rdd2 : RDD[(String, Int)] = sc.makeRDD(List(("a", 4), ("b", 5), ("c", 6)))
val cgRDD : RDD[(String, (Iterable[Int], Iterable[Int]))] = rdd1.cogroup(rdd2)
cgRDD.collect().foreach(println)

 

cogroup的参数可以不止一个rdd,最多可以有三个rdd

3.  案例实操:统计各个省份点击数前三的广告

val dataRDD = sc.textFile("data")// 转换为((省份, 广告), 1)的格式
val mapRDD : RDD[((String, String), Int)] = dataRDD.map(line => {val datas = datas.split(" ")((data(1), data(4)), 1)
})// 聚合
val reduceRDD : RDD[((String, String), Int)] = mapRDD.reduceByKey(_+_)// 转换为(省份, (广告, sum))的格式
val newMapRDD = reduceRDD.map{case((prv, ad), sum) => {(prv, (ad, sum))}
}// 按照key(省份)分组
val groupRDD : RDD[(String, Iterable[(String, Int)])] = newMapRDD.groupByKey()// 按照值的第二个参数降序排序,取前三
val resultRDD = groupRDD.mapValues(iter => {iter.toList.sortBy(_._2)(Ordering.Int.reverse)
}).take(3)resultRDD.collect().foreach(println)

4.  reduce

上述都是转换算子,接下来介绍行动算子。行动算子会触发作业的执行,底层调用的是runJob方法,会创建ActiveJob,并提交执行。

val rdd : RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
val i : Int = rdd.reduce(_+_)
println(i)

5.  collect

将不同分区的数据按照分区的顺序采集到Driver端内存中,形成数组。

val rdd : RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
val i : Array[Int] = rdd.collect()
println(i.mkString(","))

 

 6. count, first, take, takeOrdered

val rdd : RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
val i : Int = rdd.count()
println(i)val first : Int = rdd.first()
println(first)val takei : Array[Int]= rdd.take(3)
println(takei.mkString(","))val rdd1 : RDD[Int] = sc.makeRDD(List(4, 2, 3, 1))
val takeOrderedi : Array[Int]= rdd1.takeOrdered(3)
println(takeOrderedi.mkString(","))

 

 7. aggregate、fold

与aggregateByKey类似,传入初始值、分区内计算规则、分区间计算规则,得到计算结果。只不过aggregateByKey需要根据不同的key进行分组,最后得到的是RDD,而aggregate不需要根据key进行分组计算,直接得到计算结果。

val rdd : RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
val result : Int = rdd.aggregate(0)(_+_, _+_)
println(result)

 

另外一个区别就是aggregate的初始值不进会参与分区内计算,还会参与分区间计算,而aggregateByKey的初始值只参与分区内计算,所以下面程序的允许结果为40

val rdd : RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
val result : Int = rdd.aggregate(10)(_+_, _+_)
println(result)

 如果分区内计算规则和分区间计算规则相同,可以使用fold简化

val rdd : RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
val result : Int = rdd.fold(10)(_+_)
println(result)

8. countByKey、countByValue

val rdd : RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
val result : collection.Map[Int, Long] = rdd.countByValue()
println(result)

 

表示4出现了依次,2出现了依次,1出现了1次,3出现了1次。

val rdd = sc.makeRDD(List(("a", 1), ("a", 1), ("a", 1)))
val result : collection.Map[String, Long] = rdd.countByKey()
println(result)

 

按照key统计次数,跟值无关,这里a出现了3次。 

9. WordCount的多种实现方式

val rdd = sc.makeRDD(List("Hello Scala", "Hello Scala"))
val words : RDD[String] = rdd.flatMap(_.split(" "))
val group : RDD[(String, Iterable[String])] = words.groupBy(word=>word)
val wordCount : RDD[(String, Int)] = group.mapValues(iter=>iter.size)
val rdd = sc.makeRDD(List("Hello Scala", "Hello Scala"))
val words : RDD[String] = rdd.flatMap(_.split(" "))
val wordOne = words.map((_, 1))
val group : RDD[(String, Iterable[Int])] = wordOne.groupByKey()
val wordCount : RDD[(String, Int)] = group.mapValues(iter=>iter.size)
val rdd = sc.makeRDD(List("Hello Scala", "Hello Scala"))
val words : RDD[String] = rdd.flatMap(_.split(" "))
val wordOne = words.map((_, 1))
val wordCount : RDD[(String, Int)] = wordOne.reduceByKey()
val rdd = sc.makeRDD(List("Hello Scala", "Hello Scala"))
val words : RDD[String] = rdd.flatMap(_.split(" "))
val wordOne = words.map((_, 1))
val wordCount : RDD[(String, Int)] = wordOne.aggregateByKey(0)(_+_, _+_)
val rdd = sc.makeRDD(List("Hello Scala", "Hello Scala"))
val words : RDD[String] = rdd.flatMap(_.split(" "))
val wordOne = words.map((_, 1))
val wordCount : RDD[(String, Int)] = wordOne.combineByKey(v=>v)((x : Int, y : Int) => x + y, (x : Int, y : Int) => x + y)
val rdd = sc.makeRDD(List("Hello Scala", "Hello Scala"))
val words : RDD[String] = rdd.flatMap(_.split(" "))
val wordOne = words.map((_, 1))
val wordCount : collection.Map[String, Long] = wordOne.countByKey()
val rdd = sc.makeRDD(List("Hello Scala", "Hello Scala"))
val words : RDD[String] = rdd.flatMap(_.split(" "))
val wordCount : collection.Map[String, Long] = words.countByValue()
val rdd = sc.makeRDD(List("Hello Scala", "Hello Scala"))
val words : RDD[String] = rdd.flatMap(_.split(" "))
val wordMap = word.map(word => {mutable.Map[String, Int]((word, 1))
})
val wordCount = wordMap.reduce((map1, map2) => {map2.foreach{case(word, count) => {val newCount = map1.getOrElse(word, 0L) + countmap1.update(word, newCount)}}map1
})

 10. save

save相关的方法主要有saveAsTextFile、saveAsObjectFile、saveAsSequenceFile。其中,saveAsSequenceFile需要数据元素类型为key-value类型。

相关文章:

RDD算子介绍(三)

1. join 将相同的key的值连接在一起,值的类型可以不同 val rdd1 : RDD[(String, Int)] sc.makeRDD(List(("a", 1), ("b", 2), ("c", 3))) val rdd2 : RDD[(String, Int)] sc.makeRDD(List(("a", 4), ("b", 5…...

Redis的脑裂问题

Redis 脑裂(Split-brain)问题是指在分布式系统中,特别是基于主从复制和哨兵(Sentinel)模式的Redis集群中,由于网络分区(network partition)而导致部分节点组成了独立可用的服务&…...

【算法】雪花算法生成分布式 ID

SueWakeup 个人中心:SueWakeup 系列专栏:学习Java框架 个性签名:人生乏味啊,我欲令之光怪陆离 本文封面由 凯楠📷 友情赞助播出! 目录 1. 什么是分布式 ID 2. 分布式 ID 基本要求 3. 数据库主键自增 4. UUID 5. S…...

FFplay使用滤镜添加字幕到现有视频显示

1.创建字幕文件4k.srt 4k.srt内容: 1 00:00:01.000 --> 00:00:30.000 日照香炉生紫烟2 00:00:31.000 --> 00:00:60.000 遥看瀑布挂前川3 00:01:01.000 --> 00:01:30.000 飞流直下三千尺4 00:01:31.000 --> 00:02:00.000 疑是银河落九天2.通过使用滤镜显示字幕在视…...

【Python + Django】Django模板语法 + 请求和响应

前言: 现在现在,我们要开始将变量的值展现在页面上面啦! 要是只会显示静态页面,我们的页面也太难看和死板了, 并且数据库的数据也没法展现在页面上。 但是呢,模板语法学习之后就可以啦!&…...

大数据面试总结 四

1、当hadoop集群中某一个节点挂了,内部数据流程是如何进行的? 每一个datanode都会定期向namenode发送heardbeat消息,当一段时间namenode没有接收到某一个datanode的消息,此时namenode就会将该datanode标记为死亡,并不…...

Spring Boot: 使用MongoOperations操作mongodb

一、添加依赖 <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0"xmlns:xsi"http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation"http://maven.apache.org/POM/4…...

PyTorch 深度学习(GPT 重译)(六)

十四、端到端结节分析&#xff0c;以及接下来的步骤 本章内容包括 连接分割和分类模型 为新任务微调网络 将直方图和其他指标类型添加到 TensorBoard 从过拟合到泛化 在过去的几章中&#xff0c;我们已经构建了许多对我们的项目至关重要的系统。我们开始加载数据&#xf…...

MyBatis3源码深度解析(十七)MyBatis缓存(一)一级缓存和二级缓存的实现原理

文章目录 前言第六章 MyBatis缓存6.1 MyBatis缓存实现类6.2 MyBatis一级缓存实现原理6.2.1 一级缓存在查询时的使用6.2.2 一级缓存在更新时的清空 6.3 MyBatis二级缓存的实现原理6.3.1 实现的二级缓存的Executor类型6.3.2 二级缓存在查询时使用6.3.3 二级缓存在更新时清空 前言…...

Go --- Go语言垃圾处理

概念 垃圾回收&#xff08;GC-Garbage Collection&#xff09;暂停程序业务逻辑SWT&#xff08;stop the world&#xff09;程序根节点&#xff1a;程序中被直接或间接引用的对象集合&#xff0c;能通过他们找出所有可以被访问到的对象&#xff0c;所以Go程序的根节点通常包括…...

力扣每日一题30:串联所有单词的子串

题目描述 给定一个字符串 s 和一个字符串数组 words。 words 中所有字符串 长度相同。 s 中的 串联子串 是指一个包含 words 中所有字符串以任意顺序排列连接起来的子串。 例如&#xff0c;如果 words ["ab","cd","ef"]&#xff0c; 那么 &q…...

vim | vim的快捷命令行

快捷进入shell界面 -> :nnoremap <F8> :sh<CR> -> 绑定到了F8 :nnoremap <F8> :sh<CR> 快捷执行 -> :nnoremap <F5> :wa<CR>:!g % -o a.out && ./a.out<CR> -> 绑定到了F5 :nnoremap <F5> :wa<CR>…...

项目管理平台-01-BugClose 入门介绍

拓展阅读 Devops-01-devops 是什么&#xff1f; Devops-02-Jpom 简而轻的低侵入式在线构建、自动部署、日常运维、项目监控软件 代码质量管理 SonarQube-01-入门介绍 项目管理平台-01-jira 入门介绍 缺陷跟踪管理系统&#xff0c;为针对缺陷管理、任务追踪和项目管理的商业…...

web集群-lvs-DR模式基本配置

目录 环境&#xff1a; 一、配置RS 1、安装常见软件 2、配置web服务 3、添加vip 4、arp抑制 二、配置LVS 1、添加vip 2、安装配置工具 3、配置DR 三、测试 四、脚本方式配置 1、LVS-DR 2、LVS-RS 环境&#xff1a; master lvs 192.168.80.161 no…...

基于深度学习的面部情绪识别算法仿真与分析

声明&#xff1a;以下内容均属于本人本科论文内容&#xff0c;禁止盗用&#xff0c;否则将追究相关责任 基于深度学习的面部情绪识别算法仿真与分析 摘要结果分析1、本次设计通过网络爬虫技术获取了七种面部情绪图片&#xff1a;吃惊、恐惧、厌恶、高兴、伤心、愤怒、自然各若…...

C语言经典面试题目(十六)

1、什么是C语言中的指针常量和指针变量&#xff1f;它们有什么区别&#xff1f; 在C语言中&#xff0c;指针常量和指针变量是指针的两种不同类型。它们的区别在于指针的指向和指针本身是否可以被修改。 指针常量&#xff1a;指针指向的内存地址不可变&#xff0c;但指针本身的…...

【C语言】文件操作揭秘:C语言中文件的顺序读写、随机读写、判断文件结束和文件缓冲区详细解析【图文详解】

欢迎来CILMY23的博客喔&#xff0c;本篇为【C语言】文件操作揭秘&#xff1a;C语言中文件的顺序读写、随机读写、判断文件结束和文件缓冲区详细解析【图文详解】&#xff0c;感谢观看&#xff0c;支持的可以给个一键三连&#xff0c;点赞关注收藏。 前言 欢迎来到本篇博客&…...

JAVA八股文面经问题整理第6弹

文章目录 目录 文章目录 提问问题 问题1 问题2 问题3 问题4 问题5 问题6 问题7 问题8 问题9 问题10 问题11 问题12 写在最后 提问问题 介绍一下Linux常⽤命令&#xff0c;例如&#xff1a;Vim快捷键&#xff0c;常⽤查看Log的命令&#xff0c;路径相关&#x…...

pytest相关面试题

pytest是什么&#xff1f;它有什么优点&#xff1f; pytest是一个非常流行的Python测试框架&#xff0c;它具有简洁、易用、高校等优点。他可以帮助测试人员方便地编写和运行测试用例&#xff0c;并且提供了丰富的插件和扩展&#xff0c;支持各种测试需求介绍下pytest常用的库 …...

Keras库搭建神经网络

Keras并非简单的神经网络库&#xff0c;而是一个基于Theano的强大的深度学习库&#xff0c;利用它不仅仅可以搭建普通的神经网络&#xff0c;还可以搭建各种深度学习模型&#xff0c;如自编码器、循环神经网络、递归神经网络、卷积神经网络等。 安装代码&#xff1a; pip ins…...

龙虎榜——20250610

上证指数放量收阴线&#xff0c;个股多数下跌&#xff0c;盘中受消息影响大幅波动。 深证指数放量收阴线形成顶分型&#xff0c;指数短线有调整的需求&#xff0c;大概需要一两天。 2025年6月10日龙虎榜行业方向分析 1. 金融科技 代表标的&#xff1a;御银股份、雄帝科技 驱动…...

论文浅尝 | 基于判别指令微调生成式大语言模型的知识图谱补全方法(ISWC2024)

笔记整理&#xff1a;刘治强&#xff0c;浙江大学硕士生&#xff0c;研究方向为知识图谱表示学习&#xff0c;大语言模型 论文链接&#xff1a;http://arxiv.org/abs/2407.16127 发表会议&#xff1a;ISWC 2024 1. 动机 传统的知识图谱补全&#xff08;KGC&#xff09;模型通过…...

ElasticSearch搜索引擎之倒排索引及其底层算法

文章目录 一、搜索引擎1、什么是搜索引擎?2、搜索引擎的分类3、常用的搜索引擎4、搜索引擎的特点二、倒排索引1、简介2、为什么倒排索引不用B+树1.创建时间长,文件大。2.其次,树深,IO次数可怕。3.索引可能会失效。4.精准度差。三. 倒排索引四、算法1、Term Index的算法2、 …...

C++.OpenGL (10/64)基础光照(Basic Lighting)

基础光照(Basic Lighting) 冯氏光照模型(Phong Lighting Model) #mermaid-svg-GLdskXwWINxNGHso {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-GLdskXwWINxNGHso .error-icon{fill:#552222;}#mermaid-svg-GLd…...

什么?连接服务器也能可视化显示界面?:基于X11 Forwarding + CentOS + MobaXterm实战指南

文章目录 什么是X11?环境准备实战步骤1️⃣ 服务器端配置(CentOS)2️⃣ 客户端配置(MobaXterm)3️⃣ 验证X11 Forwarding4️⃣ 运行自定义GUI程序(Python示例)5️⃣ 成功效果![在这里插入图片描述](https://i-blog.csdnimg.cn/direct/55aefaea8a9f477e86d065227851fe3d.pn…...

聊一聊接口测试的意义有哪些?

目录 一、隔离性 & 早期测试 二、保障系统集成质量 三、验证业务逻辑的核心层 四、提升测试效率与覆盖度 五、系统稳定性的守护者 六、驱动团队协作与契约管理 七、性能与扩展性的前置评估 八、持续交付的核心支撑 接口测试的意义可以从四个维度展开&#xff0c;首…...

全志A40i android7.1 调试信息打印串口由uart0改为uart3

一&#xff0c;概述 1. 目的 将调试信息打印串口由uart0改为uart3。 2. 版本信息 Uboot版本&#xff1a;2014.07&#xff1b; Kernel版本&#xff1a;Linux-3.10&#xff1b; 二&#xff0c;Uboot 1. sys_config.fex改动 使能uart3(TX:PH00 RX:PH01)&#xff0c;并让boo…...

RNN避坑指南:从数学推导到LSTM/GRU工业级部署实战流程

本文较长&#xff0c;建议点赞收藏&#xff0c;以免遗失。更多AI大模型应用开发学习视频及资料&#xff0c;尽在聚客AI学院。 本文全面剖析RNN核心原理&#xff0c;深入讲解梯度消失/爆炸问题&#xff0c;并通过LSTM/GRU结构实现解决方案&#xff0c;提供时间序列预测和文本生成…...

用机器学习破解新能源领域的“弃风”难题

音乐发烧友深有体会&#xff0c;玩音乐的本质就是玩电网。火电声音偏暖&#xff0c;水电偏冷&#xff0c;风电偏空旷。至于太阳能发的电&#xff0c;则略显朦胧和单薄。 不知你是否有感觉&#xff0c;近两年家里的音响声音越来越冷&#xff0c;听起来越来越单薄&#xff1f; —…...

动态 Web 开发技术入门篇

一、HTTP 协议核心 1.1 HTTP 基础 协议全称 &#xff1a;HyperText Transfer Protocol&#xff08;超文本传输协议&#xff09; 默认端口 &#xff1a;HTTP 使用 80 端口&#xff0c;HTTPS 使用 443 端口。 请求方法 &#xff1a; GET &#xff1a;用于获取资源&#xff0c;…...