当前位置: 首页 > news >正文

【博学谷学习记录】超强总结,用心分享|Spark的RDD算子分类

概念

RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象,代表一个不可变、可分区、里面的元素可并行计算的集合,它是一种抽象的数据模型,本身并不存储数据,仅仅是一个数据传输的管道,作为使用者,只需要告知RDD,数据从哪里读,中间需要进行什么样的转换逻辑,以及最后需要将结果输出到什么位置即可,RDD启动后,会根据用户设置的规则,完成整个处理操作

分类

所有的RDD算子,共分为2大类

  1. Transformation(转换算子)
    1. 所有的转换算子执行后,都会返回一个新的RDD
    2. 所有转换算子是惰性的,不会立即执行,可以认为只是此时只是定义了RDD的计算规则
    3. 转换算子必须遇到动作算子都会触发执行
    4. 常见转换算子
      1. map, filter, flatMap, mapPartitions, mapPartitionsWithIndex
  2. Action(动作算子)
    1. 动作算子执行后,不会返回一个RDD,要么没有返回值,要么返回其它的
    2. 动作算子都是立即执行,一个动作算子会产生一个Jo任务,运行动作算子所依赖的所有RDD
    3. 常见动作算子
      1. collect, count, first, take, reduce

转换算子

值类型的算子

map算子
  • 格式:rdd.map(fn)
  • 说明:根据传入的函数,对数据进行一对一的转换操作,传入一行,返回一行
rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10])# 需求: 请对每一个元素进行 +1 返回
rdd_collect = rdd.map(lambda num: num + 1).collect()
print(rdd_collect)结果:
[2, 3, 4, 5, 6, 7, 8, 9, 10, 11]

groupBy算子
  • 格式:rdd.groupBy(fn)
  • 说明:根据传入的函数对数据进行分组操作
rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10])# 需求: 请将数据分为奇数和偶数二部分
rdd_collect = rdd.groupBy(lambda num: 'o' if num % 2 == 0 else 'j').mapValues(list).collect()
print(rdd_collect)结果:[('j', [1, 3, 5, 7, 9]), ('o', [2, 4, 6, 8, 10])]

filter算子
  • 格式:rdd.filter(fn)
  • 说明:过滤算子, 可以根据函数中指定的过滤条件, 对数据进行过滤操作, 条件返回True表示保留, 返回False表示过滤掉
rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10])# 需求: 请将 <=3的数据过滤掉
rdd_collect = rdd.filter(lambda num: num > 3).collect()
print(rdd_collect)结果:[4, 5, 6, 7, 8, 9, 10]

flatMap算子
  • 格式:rdd.flatMap(fn)
  • 说明:在map算子的基础上, 在加入一个压扁的操作, 主要适用于一行中包含多个内容的操作, 实现一转多的操作
rdd = sc.parallelize(['张三 李四 王五 赵六','田七 周八 李九'])# 需求: 将其转换为一个个的姓名
rdd_collect = rdd.flatMap(lambda line: line.split()).collect()
print(rdd_collect)结果:['张三', '李四', '王五', '赵六', '田七', '周八', '李九']

双值类型的算子

union算子
  • 格式:rdd1.union(rdd2)
  • 说明:取两组数据的并集
rdd1 = sc.parallelize([3,1,5,7,9])
rdd2 = sc.parallelize([5,8,2,4,0])# 需求: 取两组数据的并集
rdd1.union(rdd2).collect()结果:[3, 1, 5, 7, 9, 5, 8, 2, 4, 0]# 去重操作: 
rdd1.union(rdd2).distinct().collect()结果:[8, 4, 0, 1, 5, 9, 2, 3, 7]

intersection算子
  • 格式:rdd1.intersection(rdd2)
  • 说明:取两组数据的交集
rdd1.intersection(rdd2).collect()结果:[5]

KV类型的算子

groupByKey算子:
  • 格式: groupByKey()
  • 说明: 根据key进行分组操作
rdd = sc.parallelize([('c01','张三'),('c02','李四'),('c02','王五'),('c03','赵六'),('c02','田七'),('c02','周八'),('c03','李九')])# 需求: 根据班级分组统计
rdd_collect = rdd.groupByKey().mapValues(list).collect()
print(rdd_collect)结果:[('c01', ['张三']), ('c02', ['李四', '王五', '田七', '周八']), ('c03', ['赵六', '李九'])]

reduceByKey()
  • 格式:  reduceByKey(fn)
  • 说明: 根据key进行分组, 将一个组内的value数据放置到一个列表中, 对这个列表基于 传入函数进行聚合计算操作
rdd = sc.parallelize([('c01','张三'),('c02','李四'),('c02','王五'),('c03','赵六'),('c02','田七'),('c02','周八'),('c03','李九')])# 需求: 统计每个班级有多少个人
rdd_collect = rdd.map(lambda kv: (kv[0],1)).reduceByKey(lambda agg, curr: agg + curr).collect()
print(rdd_collect)结果:[('c01', 1), ('c02', 4), ('c03', 2)]# 如果不转为1:
rdd.reduceByKey(lambda agg,curr: agg + curr).collect()    
结果: [('c01', '张三'), ('c02', '李四王五田七周八'), ('c03', '赵六李九')]

sortByKey()算子
  • 格式: sortByKey(ascending = True|False)
  • 说明: 根据key进行排序操作, 默认按照key进行升序排序, 如果需要倒序, 设置 ascending  为False
rdd = sc.parallelize([('c03','张三'),('c05','李四'),('c011','王五'),('c09','赵六'),('c02','田七'),('c07','周八'),('c06','李九')])# 根据班级序号排序
rdd.sortByKey().collect()结果: 字典序 如果key是字符串[('c011', '王五'), ('c02', '田七'), ('c03', '张三'), ('c05', '李四'), ('c06', '李九'), ('c07', '周八'), ('c09', '赵六')]

动作算子

collect() 算子

  • 格式: collect()
  • 作用: 收集各个分区的数据, 将数据汇总到一个大的列表返回

reduce() 算子

  • 格式: reduce(fn)
  • 作用: 根据传入的函数对数据进行聚合操作
rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10])# 求第1数累加到最后一个数的和
rdd.reduce(lambda agg,curr: agg + curr)结果:55

first()算子

  • 格式: first()
  • 说明: 获取第一个元素
rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10])# 获取数据集中的第一个元素
rdd.first()结果:1

take() 算子

  • 格式: take(N)
  • 说明: 获取前N个元素, 类似于limit操作
rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10])# 获取数据集中的前5个元素
rdd.take(5)结果
[1, 2, 3, 4, 5]

top() 算子

  • 格式: top(N, [fn])
  • 说明: 对数据集进行倒序排序操作, 如果是kv类型, 默认是针对key进行排序, 获取前N个元素
  • fn: 可以自定义排序, 根据谁来排序
rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10])rdd.top(3)
结果:
[10, 9, 8]rdd = sc.parallelize([('c03','张三'),('c05','李四'),('c011','王五'),('c09','赵六'),('c02','田七'),('c07','周八'),('c06','李九')])rdd.top(3)
结果:
[('c09', '赵六'), ('c07', '周八'), ('c06', '李九')]rdd = sc.parallelize([('c03',5),('c05',9),('c011',2),('c09',6),('c02',80),('c07',12),('c06',10)])rdd.top(3,lambda kv: kv[1])
结果:
[('c02', 80), ('c07', 12), ('c06', 10)]

count()算子

  • 格式: count()
  • 说明: 统计多少个
rdd = sc.parallelize([('c03',5),('c05',9),('c011',2),('c09',6),('c02',80),('c07',12),('c06',10)])rdd.count()
结果:7

foreach()算子

  • 格式: foreach(fn)
  • 说明: 对数据集进行遍历操作, 遍历后做什么, 取决于传入的函数
rdd = sc.parallelize([('c03',5),('c05',9),('c011',2),('c09',6),('c02',80),('c07',12),('c06',10)])rdd.foreach(lambda kv: print(kv))
结果:('c03', 5)('c05', 9)('c011', 2)('c09', 6)('c02', 80)('c07', 12)('c06', 10)

takeSample()算子

  • 格式: takeSample(True|False, N,seed(种子值))
    • 参数1: 是否允许重复采样
    • 参数2: 采样多少个, 如果允许重复采样, 采样个数不限制, 否则最多等于本身数量个数
    • 参数3: 设置种子值, 值可以随便写, 一旦写死了, 表示每次采样的内容也是固定的(可选的) 如果没有特殊需要, 一般不设置
  • 作用: 数据抽样
rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10])rdd.takeSample(True,5)
[9, 9, 4, 8, 9]
rdd.takeSample(True,5)
[3, 8, 1, 3, 9]
rdd.takeSample(False,5)
[6, 1, 8, 7, 3]
rdd.takeSample(False,5)
[5, 7, 6, 3, 8]
rdd.takeSample(False,20)
[2, 10, 7, 5, 8, 9, 3, 4, 6, 1]
rdd.takeSample(False,5) 
[8, 3, 10, 7, 9]rdd.takeSample(False,5,2)  
[6, 10, 4, 5, 7]
rdd.takeSample(False,5,2)
[6, 10, 4, 5, 7]
rdd.takeSample(False,5,2)
[6, 10, 4, 5, 7]
rdd.takeSample(False,3,2)
[6, 10, 4]

相关文章:

【博学谷学习记录】超强总结,用心分享|Spark的RDD算子分类

概念 RDD&#xff08;Resilient Distributed Dataset&#xff09;叫做弹性分布式数据集&#xff0c;是Spark中最基本的数据抽象&#xff0c;代表一个不可变、可分区、里面的元素可并行计算的集合&#xff0c;它是一种抽象的数据模型&#xff0c;本身并不存储数据&#xff0c;仅…...

云原生系列之使用 prometheus监控远程主机实战

文章目录前言一. 实验环境二. 安装node_exporter2.1 node_exporter的介绍2.2 node_exporter的安装三. 在prometheus服务端配置监控远程主机3.1 在server端配置拉取node的信息3.2 重启prometheus3.3 通过浏览器查看prometheus总结前言 大家好&#xff0c;又见面了&#xff0c;我…...

2023年地方两会政府工作报告汇总(各省市23年重点工作)

新年伊始&#xff0c;全国各地两会密集召开&#xff0c;各省、市、自治区2023年政府工作报告相继出炉&#xff0c;各地经济增长预期目标均已明确。相较于2022年&#xff0c;多地经济增长目标放缓&#xff0c;经济不断向“高质量”发展优化转型。今年是二十大后的开局之年&#…...

第一章 企业管理概论

目录 一、企业及其形式 二、企业管理概述 三、企业管理理论与实践的产生与发展 四、网络时代的企业环境 五、网络时代企业管理的变革 一、企业及其形式 1、企业的概念 企业以市场为导向&#xff0c;以价值增值作为经济活动的目的&#xff1b; 企业是从事商品生产和流通的…...

独立图片服务器有什么突出之处

服务器是网络中非常重要的设施&#xff0c;承载着不同流量的访问&#xff0c;这就要求服务器具有快速的吞吐量、高稳定性和高可靠性。独立图片服务器作为独立服务器的衍生品&#xff0c;在数据利用方面的应用可以为企业在数据处理和分析方面带来一场革命。本文就将介绍独立图片…...

Linux驱动开发基础__mmap

目录 1 引入 2 内存映射现象与数据结构 3 ARM 架构内存映射简介 3.1 一级页表映射过程 3.2 二级页表映射过程 4 怎么给 APP 新建一块内存映射 4.1 mmap 调用过程 ​编辑4.2 cache 和 buffer 4.3 驱动程序要做的事 5 编程 5.1 app编程 5.2 hello_drv_test…...

若依框架---为什么把添加和更新分成两个接口

&#x1f44f;作者简介&#xff1a;大家好&#xff0c;我是小童&#xff0c;Java开发工程师&#xff0c;CSDN博客博主&#xff0c;Java领域新星创作者 &#x1f4d5;系列专栏&#xff1a;前端、Java、Java中间件大全、微信小程序、微信支付、若依框架、Spring全家桶 &#x1f4…...

图论算法:Floyd算法

文章目录Floyd算法例题&#xff1a;灾后重建Floyd算法 Floyd算法用于求图中任意两点之间的最短路径&#xff0c;该算法主要运用了动态规划的思想。 思考&#xff1a; 给你几个点与边&#xff0c;可以组成一张图&#xff0c;那么如何求得任意两点之间的最短路径呢&#xff1f;…...

回顾 | .NET MAUI 跨平台应用开发 - 用 .NET MAUI 开发一个无人机应用(下)

点击蓝字关注我们编辑&#xff1a;Alan Wang排版&#xff1a;Rani Sun微软 Reactor 为帮助广开发者&#xff0c;技术爱好者&#xff0c;更好的学习 .NET Core, C#, Python&#xff0c;数据科学&#xff0c;机器学习&#xff0c;AI&#xff0c;区块链, IoT 等技术&#xff0c;将…...

部署有多个仓库的svn服务

centos7自带svn服务&#xff0c;现需要创建多个仓库&#xff0c;并实现用户读写功能 创建svn版本库 mkdir /home/svn mkdir /home/svn/confmkdir /home/svn/yk1 mkdir /home/svn/yk2 svnadmin create /home/svn/yk1 svnadmin create /home/svn/yk2 进入版本库yk1的配置文件路…...

Mapper文件注入问题

Mapper文件注入问题UserMapper that could not be found.原因分析解决方案程序正常运行&#xff0c;但是注入类爆红问题原因分析解决方法UserMapper’ that could not be found. 原因分析 撰写了mapper文件&#xff0c;但是没有注入spring容器 解决方案 添加mybatis.mapper-…...

基于微信小程序的国产动漫论坛小程序

文末联系获取源码 开发语言&#xff1a;Java 框架&#xff1a;ssm JDK版本&#xff1a;JDK1.8 服务器&#xff1a;tomcat7 数据库&#xff1a;mysql 5.7/8.0 数据库工具&#xff1a;Navicat11 开发软件&#xff1a;eclipse/myeclipse/idea Maven包&#xff1a;Maven3.3.9 浏览器…...

常用限流算法

简单时间窗口 算法逻辑&#xff1a;设置周期时间内的最大并发量问题&#xff1a;在周期尾端进去阈值并发后&#xff0c;进入下一周期时&#xff0c;又进入阈值并发量&#xff0c;则会出现瞬时并发量是阈值的2倍。 滑动时间窗口&#xff08;优化&#xff09; 算法逻辑&#xf…...

前端面经详解

目录 css 盒子充满屏幕 A.给div设置定位 B.设置html,body的宽高 C.相对当前屏幕高度&#xff08;强烈推荐&#xff09; 三列布局&#xff1a;左右固定&#xff0c;中间自适应 flex布局&#xff08;强烈推荐&#xff09; grid布局 magin负值法 自身浮动 绝对定位 圣…...

网页CAD开发快速入门

演示说明 提示:目前提供两种在网页中浏览编辑CAD图纸方案&#xff0c;详细说明见&#xff1a;MxDraw帮助 网页中打开CAD最简步骤&#xff1a; 第一步: 安装插件运行环境&#xff0c;下载安装(可能需要退杀毒软件)&#xff1a;https://demo.mxdraw3d.com:3562/MxDrawx86Setup…...

C#开发的OpenRA的mod.yaml文件

C#开发的OpenRA的mod.yaml文件 在OpenRA游戏里,会看到这样一段代码: Manifest LoadMod(string id, string path){IReadOnlyPackage package = null;try{if (!Directory.Exists(path)){Log.Write("debug", path + " is not a valid mod package");return …...

【ESP32+freeRTOS学习笔记-(七)中断管理】

目录1、概述2、在ISR中使用FreeRTOS中专用的API2.1 独立的用于ISR中的API2.2 关于xHigherPriorityTaskWoken 参数的初步理解3、延迟中断处理的方法-将中断中的处理推迟到任务中去4 方法一&#xff1a;用二进制信号量来同步ISR与”延时处理的任务“4.1 二进制信号量4.2 函数用法…...

【总结】1591- 从入门到精通:使用 TypeScript 开发超强的 CLI 工具

作为一名开发者&#xff0c;掌握 CLI 工具的开发能力是非常重要的。本文将指导你如何使用 TypeScript 和 CAC 库开发出功能强大的 CLI 工具。快速入门首先&#xff0c;需要先安装 Node.js 和 npm&#xff08;Node Package Manager&#xff09;&#xff0c;然后在项目目录中创建…...

【Java】int和Integer的区别?为什么有包装类?

int和Integer的区别&#xff1f;为什么有包装类&#xff1f; java是一种强类型的语言&#xff0c;所以所有的属性都必须要有一个数据类型。 PS&#xff1a;java10有了局部变量类型推导&#xff0c;可以使用var来代替某个具体的数据类型&#xff0c;但是在字节码阶段&#xff0…...

【LeetCode】石子游戏 IV [H](动态规划)

1510. 石子游戏 IV - 力扣&#xff08;LeetCode&#xff09; 一、题目 Alice 和 Bob 两个人轮流玩一个游戏&#xff0c;Alice 先手。 一开始&#xff0c;有 n 个石子堆在一起。每个人轮流操作&#xff0c;正在操作的玩家可以从石子堆里拿走 任意 非零 平方数 个石子。 如果石…...

CMake基础:构建流程详解

目录 1.CMake构建过程的基本流程 2.CMake构建的具体步骤 2.1.创建构建目录 2.2.使用 CMake 生成构建文件 2.3.编译和构建 2.4.清理构建文件 2.5.重新配置和构建 3.跨平台构建示例 4.工具链与交叉编译 5.CMake构建后的项目结构解析 5.1.CMake构建后的目录结构 5.2.构…...

【SQL学习笔记1】增删改查+多表连接全解析(内附SQL免费在线练习工具)

可以使用Sqliteviz这个网站免费编写sql语句&#xff0c;它能够让用户直接在浏览器内练习SQL的语法&#xff0c;不需要安装任何软件。 链接如下&#xff1a; sqliteviz 注意&#xff1a; 在转写SQL语法时&#xff0c;关键字之间有一个特定的顺序&#xff0c;这个顺序会影响到…...

Springcloud:Eureka 高可用集群搭建实战(服务注册与发现的底层原理与避坑指南)

引言&#xff1a;为什么 Eureka 依然是存量系统的核心&#xff1f; 尽管 Nacos 等新注册中心崛起&#xff0c;但金融、电力等保守行业仍有大量系统运行在 Eureka 上。理解其高可用设计与自我保护机制&#xff0c;是保障分布式系统稳定的必修课。本文将手把手带你搭建生产级 Eur…...

ElasticSearch搜索引擎之倒排索引及其底层算法

文章目录 一、搜索引擎1、什么是搜索引擎?2、搜索引擎的分类3、常用的搜索引擎4、搜索引擎的特点二、倒排索引1、简介2、为什么倒排索引不用B+树1.创建时间长,文件大。2.其次,树深,IO次数可怕。3.索引可能会失效。4.精准度差。三. 倒排索引四、算法1、Term Index的算法2、 …...

QT: `long long` 类型转换为 `QString` 2025.6.5

在 Qt 中&#xff0c;将 long long 类型转换为 QString 可以通过以下两种常用方法实现&#xff1a; 方法 1&#xff1a;使用 QString::number() 直接调用 QString 的静态方法 number()&#xff0c;将数值转换为字符串&#xff1a; long long value 1234567890123456789LL; …...

【论文阅读28】-CNN-BiLSTM-Attention-(2024)

本文把滑坡位移序列拆开、筛优质因子&#xff0c;再用 CNN-BiLSTM-Attention 来动态预测每个子序列&#xff0c;最后重构出总位移&#xff0c;预测效果超越传统模型。 文章目录 1 引言2 方法2.1 位移时间序列加性模型2.2 变分模态分解 (VMD) 具体步骤2.3.1 样本熵&#xff08;S…...

智能分布式爬虫的数据处理流水线优化:基于深度强化学习的数据质量控制

在数字化浪潮席卷全球的今天&#xff0c;数据已成为企业和研究机构的核心资产。智能分布式爬虫作为高效的数据采集工具&#xff0c;在大规模数据获取中发挥着关键作用。然而&#xff0c;传统的数据处理流水线在面对复杂多变的网络环境和海量异构数据时&#xff0c;常出现数据质…...

Pinocchio 库详解及其在足式机器人上的应用

Pinocchio 库详解及其在足式机器人上的应用 Pinocchio (Pinocchio is not only a nose) 是一个开源的 C 库&#xff0c;专门用于快速计算机器人模型的正向运动学、逆向运动学、雅可比矩阵、动力学和动力学导数。它主要关注效率和准确性&#xff0c;并提供了一个通用的框架&…...

NPOI Excel用OLE对象的形式插入文件附件以及插入图片

static void Main(string[] args) {XlsWithObjData();Console.WriteLine("输出完成"); }static void XlsWithObjData() {// 创建工作簿和单元格,只有HSSFWorkbook,XSSFWorkbook不可以HSSFWorkbook workbook new HSSFWorkbook();HSSFSheet sheet (HSSFSheet)workboo…...

Caliper 负载(Workload)详细解析

Caliper 负载(Workload)详细解析 负载(Workload)是 Caliper 性能测试的核心部分,它定义了测试期间要执行的具体合约调用行为和交易模式。下面我将全面深入地讲解负载的各个方面。 一、负载模块基本结构 一个典型的负载模块(如 workload.js)包含以下基本结构: use strict;/…...