当前位置: 首页 > news >正文

Spark---RDD(Key-Value类型转换算子)

文章目录

  • 1.RDD Key-Value类型
      • 1.1 partitionBy
      • 1.2 reduceByKey
      • 1.3 groupByKey
          • reduceByKey和groupByKey的区别
          • 分区间和分区内
      • 1.4 aggregateByKey
          • 获取相同key的value的平均值
      • 1.5 foldByKey
      • 1.6 combineByKey
      • 1.7 sortByKey
      • 1.8 join
      • 1.9 leftOuterJoin
      • 1.10 cogroup

1.RDD Key-Value类型

Key-Value类型的算子即对键值对进行操作。

1.1 partitionBy

将数据按照指定的 Partitioner(分区器) 重新进行分区。Spark 默认的分区器为HashPartitioner,Spark除了默认的分区器外,常见的分区器还有:RangePartitioner、Custom Partitioner、SinglePartitioner等。

函数定义:
def partitionBy(partitioner: Partitioner): RDD[(K, V)]

	//使用HashPartitioner分区器并设置分区个数为2val data1: RDD[(Int, String)] = sparkRdd.makeRDD(Array((1, "aaa"), (2, "bbb"), (3, "ccc")), 3)data1.partitionBy(new HashPartitioner(2));data1.collect().foreach(println)

在这里插入图片描述

1.2 reduceByKey

可以将数据按照相同的 Key 对 Value 进行聚合

函数定义:
def reduceByKey(func: (V, V) => V): RDD[(K, V)]
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]

	//将数据按照相同的key对value进行聚合val data1: RDD[(String, Int)] = sparkRdd.makeRDD(List(("a", 1), ("b", 2), ("c", 3),("a",4),("b",5)))val data2: RDD[(String, Int)] = data1.reduceByKey((x: Int, y: Int) => {x + y})data2.collect().foreach(println)

在这里插入图片描述

1.3 groupByKey

将数据源的数据根据 key 对 value 进行分组

函数定义:
def groupByKey(): RDD[(K, Iterable[V])]
def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]

    val dataRDD1 = sparkRdd.makeRDD(List(("a", 1), ("b", 2), ("c", 3),("a",4),("b",5)))val data1 = dataRDD1.groupByKey()//指定分区个数为2val data2 = dataRDD1.groupByKey(2)//指定分区器和分区个数val data3 = dataRDD1.groupByKey(new HashPartitioner(2))data1.collect().foreach(println)println("-------------------->")data2.collect().foreach(println)println("-------------------->")data3.collect().foreach(println)

在这里插入图片描述

reduceByKey和groupByKey的区别

从功能的角度来看:reduceByKey包含了分组和聚合功能,而groupByKey只包含了分组功能。
从shuffle的角度来看:为了避免占用过多的内存空间,reduceByKey和groupByKey在执行的过程中,都会执行shuffle操作,将数据打散写入到磁盘的临时文件中,而reduceByKey在进行shuffle前会对数据进行预聚合的操作,致使shuffle的效率得到的提升,因为减少了落盘的数据量。但是groupByKey在shuffle前不会进行预聚合操作。所以,reduceByKey在进行分组的时候,效率相对groupByKey来说较高。

reduceByKey:
在这里插入图片描述

groupByKey:

在这里插入图片描述

分区间和分区内

分区间: 顾名思义,分区间就是指的多个分区之间的操作。如reduceByKey在shuffle操作后将不同分区的数据传输在同一个分区中进行聚合。
分区内: 分区内字面意思指的是单个分区内之间的操作。如reduceByKey的预聚合功能就是在分区内完成

1.4 aggregateByKey

将数据根据不同的规则进行分区内计算和分区间计算,如reduceByKey中分区间和分区内都是聚合操作,而使用aggregateByKey可以设置分区间和分区内执行不同的操作。

函数定义:
def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,
combOp: (U, U) => U): RDD[(K, U)]

    //取出每个分区内相同 key 的最大值然后分区间相加// aggregateByKey 算子是函数柯里化,存在两个参数列表// 1. 第一个参数列表中的参数表示初始值// 2. 第二个参数列表中含有两个参数// 2.1 第一个参数表示分区内的计算规则// 2.2 第二个参数表示分区间的计算规则val data1 = sparkRdd.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("a", 4)),2)val data2 = data1.aggregateByKey(0)((x,y)=>{Math.max(x,y)},(x,y)=>{x+y})data2.collect().foreach(println)**

在这里插入图片描述
注意:最终的结果会受到设置的初始值的影响,返回结果的值的类型和初始值保持一致。

获取相同key的value的平均值
    val data1:RDD[(String,Int)] = sparkRdd.makeRDD(List(("a", 1), ("a", 2), ("b", 3), ("b", 4),("b",5),("a",6)),2)//设置初始值,初始值为一个元组,元组第一个元素表示value,第二个表示出现次数,初始默认都为0val data2:RDD[(String,(Int,Int))] = data1.aggregateByKey((0,0))((t, v)=> {(t._1 + v, t._2 + 1)} ,//分区内计算(t1, t2) => {(t1._1 + t2._1, t1._2 + t2._2)}//分区间计算)//和除以次数求出平均值val data3 = data2.mapValues({case (sum, count) => sum / count})data3.collect().foreach(println)

在这里插入图片描述

1.5 foldByKey

当分区内和分区间的计算规则相同的时候,aggregateByKey 就可以简化为 foldByKey

函数定义:
def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]

    val dataRDD1 = sparkRdd.makeRDD(List(("a",1),("b",2),("a",3)))val dataRDD2 = dataRDD1.foldByKey(0)(_+_)dataRDD2.collect().foreach(println)

在这里插入图片描述

1.6 combineByKey

最通用的对 key-value 型 rdd 进行聚集操作的聚集函数(aggregation function)。类似于aggregate(),combineByKey()允许用户返回值的类型与输入不一致。

函数定义:

def combineByKey[C](
createCombiner: V => C,//对数据进行转换
mergeValue: (C, V) => C, //分区内合并
mergeCombiners: (C, C) => C): RDD[(K, C)] //分区间合并

//将数据 List(("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98))求每个 key 的平均值
val rddSource: RDD[(String, Int)] = sparkRdd.makeRDD(List(("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98)),2)
val combinRdd: RDD[(String, (Int, Int))] = rddSource.combineByKey(((x:Int)=>{(x,1)}),//对每个value进行转换,转换后为(value,1),第一个元素为值,第二个元素为出现的次数((t1:(Int,Int),v)=>{(t1._1+v,t1._2+1)}),//分区内合并((t1,t2)=>{(t1._1+t2._1,t1._2+t2._2)})//分区间合并
)//mapValues算子是在key保持不变的时候对value进行操作val mapRdd: RDD[(String, Int)] = combinRdd.mapValues({case ((sum: Int, count: Int)) => sum / count})mapRdd.collect().foreach(println)

在这里插入图片描述
由此看出,combineByKey和aggreateByKey的不同之处在于,combineByKey可以不设置初始值,只需要对第一个元素进行转换,转换到合适的计算格式即可。

1.7 sortByKey

在一个(K,V)的 RDD 上调用,K 必须实现 Ordered 接口(特质),返回一个按照 key 进行排序的

函数定义:

def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
: RDD[(K, V)]

//升序排序val dataRDD1 = sparkRdd.makeRDD(List(("a",1),("b",2),("c",3)))val sortRdd: RDD[(String, Int)] = dataRDD1.sortByKey()sortRdd.collect().foreach(print)

在这里插入图片描述
sortByKey默认为升序排序,如果想要降序排序,只需要将sortByKey第一个参数修改为false即可。
在这里插入图片描述

1.8 join

在类型为(K,V)和(K,W)的 RDD 上调用,返回一个相同 key 对应的所有元素连接在一起的(K,(V,W))的 RDD

函数定义:

def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]

	//join操作相当于数据库中的内连接,在连接的时候自动去除两边的悬浮元组val rdd0: RDD[(Int, String)] = sparkRdd.makeRDD(Array((1, "a"), (2, "b"), (3, "c")))val rdd1: RDD[(Int, Int)] = sparkRdd.makeRDD(Array((1, 4), (2, 5), (3, 6)))rdd0.join(rdd1).collect().foreach(print)//修改rdd1,使其少了key=3的这个元素val rdd0: RDD[(Int, String)] = sparkRdd.makeRDD(Array((1, "a"), (2, "b"), (3, "c")))val rdd1: RDD[(Int, Int)] = sparkRdd.makeRDD(Array((1, 4), (2, 5)))rdd0.join(rdd1).collect().foreach(print)

在这里插入图片描述
在这里插入图片描述

1.9 leftOuterJoin

类似于 SQL 语句的左外连接

函数定义:

def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]

    val rdd0: RDD[(Int, String)] = sparkRdd.makeRDD(List((1, "a"), (2, "b")))val rdd1: RDD[(Int, Int)] = sparkRdd.makeRDD(List((1, 4), (2, 5),(3, 6)))val rddRes = rdd0.leftOuterJoin(rdd1)rddRes.collect().foreach(print)

在这里插入图片描述

1.10 cogroup

在类型为(K,V)和(K,W)的 RDD 上调用,返回一个(K,(Iterable,Iterable))类型的 RDD,即先对

函数定义:

def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]

    val rdd0: RDD[(Int, String)] = sparkRdd.makeRDD(List((1, "a"), (2, "b"),(3,"c")))val rdd1: RDD[(Int, Int)] = sparkRdd.makeRDD(List((1, 4), (2, 5)))val rddRes = rdd0.cogroup(rdd1)rddRes.collect().foreach(print)

在这里插入图片描述

相关文章:

Spark---RDD(Key-Value类型转换算子)

文章目录 1.RDD Key-Value类型1.1 partitionBy1.2 reduceByKey1.3 groupByKeyreduceByKey和groupByKey的区别分区间和分区内 1.4 aggregateByKey获取相同key的value的平均值 1.5 foldByKey1.6 combineByKey1.7 sortByKey1.8 join1.9 leftOuterJoin1.10 cogroup 1.RDD Key-Value…...

后台代码New出来DataGridTextColumn 动态添加到DataGrain 设置 Margin属性

在 WPF 中给 DataGridTextColumn 设置 MarginProperty 可以通过自定义 DataGridTemplateColumn 来实现。以下是一个示例代码&#xff1a; <DataGrid><DataGrid.Columns><DataGridTemplateColumn><DataGridTemplateColumn.CellTemplate><DataTempla…...

MySQL面试题(下)

09&#xff09;查询学过「张三」老师授课的同学的信息 SELECTs.*,c.cname,t.tnameFROMt_mysql_teacher t,t_mysql_student s,t_mysql_course c,t_mysql_score scWHEREt.tidc.tid and c.cidsc.cid and sc.sids.sid and tname 张三 10&#xff09;查询没有学全所有课程的同学的…...

【Linux】如何检查Linux用户是否具有sudo权限

问题背景或前提知识 在Linux系统中&#xff0c;sudo&#xff08;superuser do&#xff09;是一个重要的命令&#xff0c;它允许普通用户以系统管理员的身份执行命令。了解用户是否拥有sudo权限对于系统管理和安全性来说是非常重要的。 技术名词解释 sudo&#xff1a;一种程序…...

2024.1.13 Kafka六大机制和Structured Streaming

目录 一 . Kafka中生产者数据分发策略 二. Kafka消费者的负载均衡机制 三 . 数据不丢失机制 生产者端是如何保证数据不丢失的呢&#xff1f; Broker端如何保证数据不丢失 消费端如何保证数据不丢失 Kafka中消费者如何对数据仅且只消费一次 四 . 启动Kafka eagle命令 数…...

遥感影像-语义分割数据集:Landsat8云数据集详细介绍及训练样本处理流程

原始数据集详情 简介&#xff1a;该云数据集包括RGB三通道的高分辨率图像&#xff0c;在全球不同区域的分辨率15米。这些图像采集自Lansat8的五种主要土地覆盖类型&#xff0c;即水、植被、湿地、城市、冰雪和贫瘠土地。 KeyValue卫星类型landsat8覆盖区域未知场景水、植被、…...

YOLOV8在coco128上的训练

coco128是coco数据集的子集只有128张图片 训练代码main.py from ultralytics import YOLO# Load a model model YOLO("yolov8n.yaml") # build a new model from scratch model YOLO("yolov8n.pt") # load a pretrained model (recommended for trai…...

设计模式——享元模式

享元模式&#xff08;Flyweight Pattern&#xff09;是一种结构型设计模式&#xff0c;它的主要目的是通过共享已存在的对象来大幅度减少需要创建的对象数量&#xff0c;从而降低系统内存消耗和提高性能。它通过将对象的状态划分为内部状态&#xff08;Intrinsic State&#xf…...

【Python机器学习】分类器的不确定估计——决策函数

scikit-learn接口的分类器能够给出预测的不确定度估计&#xff0c;一般来说&#xff0c;分类器会预测一个测试点属于哪个类别&#xff0c;还包括它对这个预测的置信程度。 scikit-learn中有两个函数可以用于获取分类器的不确定度估计&#xff1a;decidion_function和predict_pr…...

云原生周刊:K8sGPT 加入 CNCF | 2024.1.8

开源项目推荐 VolSync VolSync 使用 rsync 或 rclone 在集群之间异步复制 Kubernetes 持久卷。它还支持通过 Restic 创建持久卷的备份。 KubeClarity KubeClarity 是一种用于检测和管理软件物料清单 (SBOM) 以及容器映像和文件系统漏洞的工具。它扫描运行时 K8s 集群和 CI/…...

LightGBM原理和调参

背景知识 LightGBM(Light Gradient Boosting Machine)是一个实现GBDT算法的框架&#xff0c;具有支持高效率的并行训练、更快的训练速度、更低的内存消耗、更好的准确率、支持分布式可以处理海量数据等优点。 普通的GBDT算法不支持用mini-batch的方式训练&#xff0c;在每一次…...

ROS无人机开发常见错误

飞控部分 一、解锁时飞控不闪红灯&#xff0c;无任何反应&#xff0c;地面站也无报错 解决办法&#xff1a; 打开地面站的遥控器一栏 首先检查右下角Channel Monitor是否有识别出遥控各通道的值&#xff0c;如果没有&#xff0c;检查遥控器是否打开&#xff0c;遥控器和接收…...

Baumer工业相机堡盟工业相机如何联合NEOAPI SDK和OpenCV实现相机图像转换为AVI视频格式(C#)

Baumer工业相机堡盟工业相机如何联合NEOAPI SDK和OpenCV实现相机图像转换为视频格式&#xff08;C#&#xff09; Baumer工业相机Baumer工业相机的图像转换为OpenCV的图像的技术背景在NEOAPI SDK里实现相机图像转换为视频格式 工业相机通过OpenCV实现相机图像转换为视频格式的优…...

第一次面试总结 - 迈瑞医疗 - 软件测试

&#x1f9f8;欢迎来到dream_ready的博客&#xff0c;&#x1f4dc;相信您对专栏 “本人真实面经” 很感兴趣o (ˉ▽ˉ&#xff1b;) 专栏 —— 本人真实面经&#xff0c;更多真实面试经验&#xff0c;中大厂面试总结等您挖掘 注&#xff1a;此次面经全靠小嘴八八&#xff0c;没…...

利用Qt输出XML文件

使用Qt输出xml文件 void PixelConversionLibrary::generateXML() {QFile file("D:/TEST.xml");//创建xml文件if (!file.open(QIODevice::WriteOnly | QIODevice::Text))//以只写方式&#xff0c;文本模式打开文件{qDebug() << "generateXML:Failed to op…...

OpenWrt智能路由器Wan PPPoE拨号配置方法

OpenWrt智能路由器的wan PPPoE拨号配置方法和我们常见的不太一样, 需要先找到wan网卡,然后将协议切换为 PPPoE然后才能看到输入上网账号和密码的地方. 首先登录路由器 http://openwrt.lan/ 然后找到 Network --> Interfaces 这里会显示你当前的路由器的所有接口, 选择 …...

(十一)IIC总线-AT24C02-EEPROM

文章目录 IIC总线篇AT24C02-EEPROM篇主要特性引脚说明AT24Cxx用几位数据地址随机寻址的(存储器组织)AT24C02设备操作AT24CXX设备寻址EEPROM写操作的种类EEPROM读操作的种类实现单字节写实现任意读读写应用 IIC总线篇 前面介绍过了&#xff0c;请参考 (十)IIC总线-PCF8591-ADC/…...

现在做电商还有发展空间吗?哪个平台的盈利比较大?

我是电商珠珠 对于部分人来说&#xff0c;实体店的投入太大&#xff0c;一上来就是十几w&#xff0c;有时候还看不到结果。 所以有的人就瞄准了电商这个圈子&#xff0c;做线上平台。 大家都知道&#xff0c;近年来直播电商很火&#xff0c;所以很多商家都会去找达人带货&am…...

多节点 docker 部署 elastic 集群

参考 Install Elasticsearch with Docker Images 环境 docker # docker version Client: Docker Engine - CommunityVersion: 24.0.7API version: 1.43Go version: go1.20.10Git commit: afdd53bBuilt: Thu Oct 26 09:08:01 202…...

2023年全国职业院校技能大赛软件测试赛题—单元测试卷⑨

单元测试 一、任务要求 题目1&#xff1a;根据下列流程图编写程序实现相应分析处理并显示结果。返回文字“xa*a*b的值&#xff1a;”和x的值&#xff1b;返回文字“xa-b的值&#xff1a;”和x的值&#xff1b;返回文字“xab的值&#xff1a;”和x的值。其中变量a、b均须为整型…...

铭豹扩展坞 USB转网口 突然无法识别解决方法

当 USB 转网口扩展坞在一台笔记本上无法识别,但在其他电脑上正常工作时,问题通常出在笔记本自身或其与扩展坞的兼容性上。以下是系统化的定位思路和排查步骤,帮助你快速找到故障原因: 背景: 一个M-pard(铭豹)扩展坞的网卡突然无法识别了,扩展出来的三个USB接口正常。…...

【人工智能】神经网络的优化器optimizer(二):Adagrad自适应学习率优化器

一.自适应梯度算法Adagrad概述 Adagrad&#xff08;Adaptive Gradient Algorithm&#xff09;是一种自适应学习率的优化算法&#xff0c;由Duchi等人在2011年提出。其核心思想是针对不同参数自动调整学习率&#xff0c;适合处理稀疏数据和不同参数梯度差异较大的场景。Adagrad通…...

Xshell远程连接Kali(默认 | 私钥)Note版

前言:xshell远程连接&#xff0c;私钥连接和常规默认连接 任务一 开启ssh服务 service ssh status //查看ssh服务状态 service ssh start //开启ssh服务 update-rc.d ssh enable //开启自启动ssh服务 任务二 修改配置文件 vi /etc/ssh/ssh_config //第一…...

最新SpringBoot+SpringCloud+Nacos微服务框架分享

文章目录 前言一、服务规划二、架构核心1.cloud的pom2.gateway的异常handler3.gateway的filter4、admin的pom5、admin的登录核心 三、code-helper分享总结 前言 最近有个活蛮赶的&#xff0c;根据Excel列的需求预估的工时直接打骨折&#xff0c;不要问我为什么&#xff0c;主要…...

Spring Boot+Neo4j知识图谱实战:3步搭建智能关系网络!

一、引言 在数据驱动的背景下&#xff0c;知识图谱凭借其高效的信息组织能力&#xff0c;正逐步成为各行业应用的关键技术。本文聚焦 Spring Boot与Neo4j图数据库的技术结合&#xff0c;探讨知识图谱开发的实现细节&#xff0c;帮助读者掌握该技术栈在实际项目中的落地方法。 …...

QT: `long long` 类型转换为 `QString` 2025.6.5

在 Qt 中&#xff0c;将 long long 类型转换为 QString 可以通过以下两种常用方法实现&#xff1a; 方法 1&#xff1a;使用 QString::number() 直接调用 QString 的静态方法 number()&#xff0c;将数值转换为字符串&#xff1a; long long value 1234567890123456789LL; …...

Device Mapper 机制

Device Mapper 机制详解 Device Mapper&#xff08;简称 DM&#xff09;是 Linux 内核中的一套通用块设备映射框架&#xff0c;为 LVM、加密磁盘、RAID 等提供底层支持。本文将详细介绍 Device Mapper 的原理、实现、内核配置、常用工具、操作测试流程&#xff0c;并配以详细的…...

A2A JS SDK 完整教程:快速入门指南

目录 什么是 A2A JS SDK?A2A JS 安装与设置A2A JS 核心概念创建你的第一个 A2A JS 代理A2A JS 服务端开发A2A JS 客户端使用A2A JS 高级特性A2A JS 最佳实践A2A JS 故障排除 什么是 A2A JS SDK? A2A JS SDK 是一个专为 JavaScript/TypeScript 开发者设计的强大库&#xff…...

数学建模-滑翔伞伞翼面积的设计,运动状态计算和优化 !

我们考虑滑翔伞的伞翼面积设计问题以及运动状态描述。滑翔伞的性能主要取决于伞翼面积、气动特性以及飞行员的重量。我们的目标是建立数学模型来描述滑翔伞的运动状态,并优化伞翼面积的设计。 一、问题分析 滑翔伞在飞行过程中受到重力、升力和阻力的作用。升力和阻力与伞翼面…...

uniapp 集成腾讯云 IM 富媒体消息(地理位置/文件)

UniApp 集成腾讯云 IM 富媒体消息全攻略&#xff08;地理位置/文件&#xff09; 一、功能实现原理 腾讯云 IM 通过 消息扩展机制 支持富媒体类型&#xff0c;核心实现方式&#xff1a; 标准消息类型&#xff1a;直接使用 SDK 内置类型&#xff08;文件、图片等&#xff09;自…...