【Python】PySpark 数据计算 ⑤ ( RDD#sortBy方法 - 排序 RDD 中的元素 )
文章目录
- 一、RDD#sortBy 方法
- 1、RDD#sortBy 语法简介
- 2、RDD#sortBy 传入的函数参数分析
- 二、代码示例 - RDD#sortBy 示例
- 1、需求分析
- 2、代码示例
- 3、执行结果
一、RDD#sortBy 方法
1、RDD#sortBy 语法简介
RDD#sortBy 方法 用于 按照 指定的 键 对 RDD 中的元素进行排序 , 该方法 接受一个 函数 作为 参数 , 该函数从 RDD 中的每个元素提取 排序键 ;
根据 传入 sortBy 方法 的 函数参数 和 其它参数 , 将 RDD 中的元素按 升序 或 降序 进行排序 , 同时还可以指定 新的 RDD 对象的 分区数 ;
RDD#sortBy 语法 :
sortBy(f: (T) ⇒ U, ascending: Boolean, numPartitions: Int): RDD[T]
- 参数说明 :
- f: (T) ⇒ U 参数 : 函数 或 lambda 匿名函数 , 用于 指定 RDD 中的每个元素 的 排序键 ;
- ascending: Boolean 参数 : 排序的升降设置 , True 生序排序 , False 降序排序 ;
- numPartitions: Int 参数 : 设置 排序结果 ( 新的 RDD 对象 ) 中的 分区数 ;
- 当前没有接触到分布式 , 将该参数设置为 1 即可 , 排序完毕后是全局有序的 ;
- 返回值说明 : 返回一个新的 RDD 对象 , 其中的元素是 按照指定的 排序键 进行排序的结果 ;
2、RDD#sortBy 传入的函数参数分析
RDD#sortBy 传入的函数参数 类型为 :
(T) ⇒ U
T 是泛型 , 表示传入的参数类型可以是任意类型 ;
U 也是泛型 , 表示 函数 返回值 的类型 可以是任意类型 ;
T 类型的参数 和 U 类型的返回值 , 可以是相同的类型 , 也可以是不同的类型 ;
二、代码示例 - RDD#sortBy 示例
1、需求分析
统计 文本文件 word.txt 中出现的每个单词的个数 , 并且为每个单词出现的次数进行排序 ;
Tom Jerry
Tom Jerry Tom
Jack Jerry Jack Tom

读取文件中的内容 , 统计文件中单词的个数并排序 ;
思路 :
- 先 读取数据到 RDD 中 ,
- 然后 按照空格分割开 再展平 , 获取到每个单词 ,
- 根据上述单词列表 , 生成一个 二元元组 列表 , 列表中每个元素的 键 Key 为单词 , 值 Value 为 数字 1 ,
- 对上述 二元元组 列表 进行 聚合操作 , 相同的 键 Key 对应的 值 Value 进行相加 ;
- 将聚合后的结果的 单词出现次数作为 排序键 进行排序 , 按照升序进行排序 ;
2、代码示例
对 RDD 数据进行排序的核心代码如下 :
# 对 rdd4 中的数据进行排序
rdd5 = rdd4.sortBy(lambda element: element[1], ascending=True, numPartitions=1)
要排序的数据如下 :
[('Tom', 4), ('Jack', 2), ('Jerry', 3)]
按照上述二元元素的 第二个 元素 进行排序 , 对应的 lambda 表达式为 :
lambda element: element[1]
ascending=True 表示升序排序 ,
numPartitions=1 表示分区个数为 1 ;
排序后的结果为 :
[('Jack', 2), ('Jerry', 3), ('Tom', 4)]
代码示例 :
"""
PySpark 数据处理
"""# 导入 PySpark 相关包
from pyspark import SparkConf, SparkContext
# 为 PySpark 配置 Python 解释器
import os
os.environ['PYSPARK_PYTHON'] = "D:/001_Develop/022_Python/Python39/python.exe"# 创建 SparkConf 实例对象 , 该对象用于配置 Spark 任务
# setMaster("local[*]") 表示在单机模式下 本机运行
# setAppName("hello_spark") 是给 Spark 程序起一个名字
sparkConf = SparkConf() \.setMaster("local[*]") \.setAppName("hello_spark")# 创建 PySpark 执行环境 入口对象
sparkContext = SparkContext(conf=sparkConf)# 打印 PySpark 版本号
print("PySpark 版本号 : ", sparkContext.version)# 将 文件 转为 RDD 对象
rdd = sparkContext.textFile("word.txt")
print("查看文件内容 : ", rdd.collect())# 通过 flatMap 展平文件, 先按照 空格 切割每行数据为 字符串 列表
# 然后展平数据解除嵌套
rdd2 = rdd.flatMap(lambda element: element.split(" "))
print("查看文件内容展平效果 : ", rdd2.collect())# 将 rdd 数据 的 列表中的元素 转为二元元组, 第二个元素设置为 1
rdd3 = rdd2.map(lambda element: (element, 1))
print("转为二元元组效果 : ", rdd3.collect())# 应用 reduceByKey 操作,
# 将同一个 Key 下的 Value 相加, 也就是统计 键 Key 的个数
rdd4 = rdd3.reduceByKey(lambda a, b: a + b)
print("统计单词 : ", rdd4.collect())# 对 rdd4 中的数据进行排序
rdd5 = rdd4.sortBy(lambda element: element[1], ascending=True, numPartitions=1)
print("最终统计单词并排序 : ", rdd4.collect())# 停止 PySpark 程序
sparkContext.stop()
3、执行结果
执行结果 :
D:\001_Develop\022_Python\Python39\python.exe D:/002_Project/011_Python/HelloPython/Client.py
23/08/04 10:49:06 WARN Shell: Did not find winutils.exe: java.io.FileNotFoundException: Could not locate Hadoop executable: D:\001_Develop\052_Hadoop\hadoop-3.3.4\bin\winutils.exe -see https://wiki.apache.org/hadoop/WindowsProblems
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
PySpark 版本号 : 3.4.1
查看文件内容 : ['Tom Jerry', 'Tom Jerry Tom', 'Jack Jerry Jack Tom']
查看文件内容展平效果 : ['Tom', 'Jerry', 'Tom', 'Jerry', 'Tom', 'Jack', 'Jerry', 'Jack', 'Tom']
转为二元元组效果 : [('Tom', 1), ('Jerry', 1), ('Tom', 1), ('Jerry', 1), ('Tom', 1), ('Jack', 1), ('Jerry', 1), ('Jack', 1), ('Tom', 1)]
D:\001_Develop\022_Python\Python39\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\shuffle.py:65: UserWarning: Please install psutil to have better support with spilling
D:\001_Develop\022_Python\Python39\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\shuffle.py:65: UserWarning: Please install psutil to have better support with spilling
D:\001_Develop\022_Python\Python39\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\shuffle.py:65: UserWarning: Please install psutil to have better support with spilling
D:\001_Develop\022_Python\Python39\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\shuffle.py:65: UserWarning: Please install psutil to have better support with spilling
统计单词 : [('Tom', 4), ('Jack', 2), ('Jerry', 3)]
D:\001_Develop\022_Python\Python39\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\shuffle.py:65: UserWarning: Please install psutil to have better support with spilling
D:\001_Develop\022_Python\Python39\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\shuffle.py:65: UserWarning: Please install psutil to have better support with spilling
最终统计单词并排序 : [('Jack', 2), ('Jerry', 3), ('Tom', 4)]Process finished with exit code 0

相关文章:
【Python】PySpark 数据计算 ⑤ ( RDD#sortBy方法 - 排序 RDD 中的元素 )
文章目录 一、RDD#sortBy 方法1、RDD#sortBy 语法简介2、RDD#sortBy 传入的函数参数分析 二、代码示例 - RDD#sortBy 示例1、需求分析2、代码示例3、执行结果 一、RDD#sortBy 方法 1、RDD#sortBy 语法简介 RDD#sortBy 方法 用于 按照 指定的 键 对 RDD 中的元素进行排序 , 该方…...
Elasticsearch官方测试数据导入
一、数据准备 百度网盘链接 链接:https://pan.baidu.com/s/1rPZBvH-J0367yQDg9qHiwQ?pwd7n5n 提取码:7n5n文档格式 {"index":{"_id":"1"}} {"account_number":1,"balance":39225,"firstnam…...
uniapp项目的pdf文件下载与打开查看
最近写的uniapp项目需要新增一个pdf下载和打开查看功能,摸索了半天终于写了出来,现分享出来供有需要的同行参考,欢迎指正 async function DownloadSignature() {//请求后端接口,返回值为一个url地址let resawait req.flow.flowDo…...
DeepVO 论文阅读
论文信息 题目:DeepVO Towards End-to-End Visual Odometry with Deep Recurrent Convolutional Neural Networks 作者:Sen Wang, Ronald Clark, Hongkai Wen and Niki Trigoni 代码地址:http://senwang.gitlab.io/DeepVO/ (原作者并没有开源…...
HOT71-字符串解码
leetcode原题链接: 字符串解码 题目描述 给定一个经过编码的字符串,返回它解码后的字符串。 编码规则为: k[encoded_string],表示其中方括号内部的 encoded_string 正好重复 k 次。注意 k 保证为正整数。你可以认为输入字符串总是有效的;输…...
redis-server进程无法关闭终极解决方案
先使用命令查看6379端口情况: sudo lsof -i :6379 发现redis进程在占用,redis-server进程无论什么手段都杀不死,使用kill -9 pid杀掉pid后又卷土重来,最后找到了下面这个命令 sudo /etc/init.d/redis-server stop ok,…...
(5)将固件加载到没有ArduPilot固件的主板上
文章目录 前言 5.1 下载驱动程序和烧录工具 5.2 下载ArduPilot固件 5.3 使用测试版和开发版 5.3.1 测试版 5.3.2 最新开发版本 5.4 将固件上传到自动驾驶仪 5.5 替代方法 5.6 将固件加载到带有外部闪存的主板上 前言 ArduPilot 的最新版本(Copter-3.6, Pl…...
wpf画刷学习1
在这2篇博文有提到wpf画刷, https://blog.csdn.net/bcbobo21cn/article/details/109699703 https://blog.csdn.net/bcbobo21cn/article/details/107133703 下面单独学习一下画刷; wpf有五种画刷,也可以自定义画刷,画刷的基类都…...
Opencv C++实现yolov5部署onnx模型完成目标检测
代码分析: 头文件 #include <fstream> //文件 #include <sstream> //流 #include <iostream> #include <opencv2/dnn.hpp> //深度学习模块-仅提供推理功能 #include <opencv2/imgproc.hpp> //图像处理模块 #include &l…...
django bootstrap html实现左右布局,带折叠按钮,左侧可折叠隐藏
一、实现的效果 在django项目中,需要使用bootstrap 实现一个左右分布的布局,左侧区域可以折叠隐藏起来,使得右侧的显示区域变大。(为了区分区域,左右加了配色,不好看的修改颜色即可) 点击折叠按钮,左侧区域隐藏,右侧区域铺满: 二、实现思路 1、使用col-md属性,让左…...
Mapping温度分布验证选择数据记录仪时需要考虑的13件事
01 什么是温度分布验证? 温度分布验证是通过在规定的研究时间内测量定义区域内的多个点来确定特定温度控制环境或过程(如冷冻柜、冰箱、培养箱、稳定室、仓库或高压灭菌器)的温度分布的过程。温度分布验证的目标是确定每个测量点之间的差异&…...
【题解】 判断一个链表是否为回文结构
判断一个链表是否为回文结构 题目链接:判断一个链表是否为回文结构 解题思路1:借助数组 遍历链表将值都放在数组中,再遍历数组元素,判断该数组是否为一个回文结构 代码如下: bool isPail(ListNode* head) {ListNod…...
Microsoft Message Queuing Denial-of-Service Vulnerability
近期官方公布了一个MSMQ的拒绝服务漏洞,可能因为网络安全设备的更新,影响业务,值得大家关注。 漏洞具体描述参见如下: Name: Microsoft Message Queuing Denial-of-Service Vulnerability Description: Microsoft Message Queuing…...
软件设计师(五)软件工程基础知识
一、软件工程概述 软件开发和维护过程中所遇到的各种问题称为“软件危机”。 软件工程是指应用计算机科学、数学及管理科学等原理,以工程化的原则和方法来解决软件问题的工程,其目的是提高软件生产率、提高软件质量、降低软件成本。 #mermaid-svg-h3j6K…...
Java中的JUnit单元测试方法的使用
Java中的JUnit单元测试方法 使用步骤如下: 选中当前工程 - 右键选择:build path - add libraries - JUnit 4 - 下一步创建Java类,进行单元测试。 此时的Java类要求:① 此类是public的 ②此类提供公共的无参的构造器此类中声明单…...
一文学透设计模式——抽象工厂模式
创建者模式 抽象工厂模式 概念 抽象工厂模式是围绕一个超级工厂创建其他工厂。该超级工厂又称为其他工厂的工厂。这种类型的设计模式属于创建型模式,它提供了一种创建对象的最佳方式。 这是很多地方对于抽象工厂模式的描述,说实话感觉不是特别好懂。…...
Vue3与Vue2区别和总结(1)
在2020年9月18日,Vue.js发布3.0版本,代号:One Piece(海贼王) 既然vue2已经存在了六七年之久为什么还要研发vue3呢? 那就不得不提vue3带来的提升了 1.Vue3带来了什么 1.性能的提升 打包大小减少41% 初次…...
【华秋推荐】物联网入门学习模块 ESP8266
随着全球信息技术的不断进步和普及,物联网成为当今备受关注的技术热点之一。通过物理和数字设备之间的连接来实现自动化和互联互通的网络。无线传感器、云计算和大数据分析等技术,物联网使设备能够相互交流和共享信息,实现智能化的自动化操作…...
本科专科毕业论文如何选题-附1000多论文题目-论文选题--【毕业论文】
文章目录 本系列校训毕设的技术铺垫论文选题选题目的和意义:选题举例参考文献 配套资源 本系列校训 互相伤害互相卷,玩命学习要你管,天生我才必有用,我命由我不由天! 毕业论文不怕难,毕业设计来铺垫&#…...
pip安装jupyter notebook
之前电脑安装了anaconda,里面安装了jupyter notebook,用来做PPT之类的展示总让我觉得有点“炫酷”。 现在换了新电脑。没有anaconda,纯粹只是装了python3.11,然后突然也想手工安装下jupyter notebook,于是只能通过pip方…...
业务系统对接大模型的基础方案:架构设计与关键步骤
业务系统对接大模型:架构设计与关键步骤 在当今数字化转型的浪潮中,大语言模型(LLM)已成为企业提升业务效率和创新能力的关键技术之一。将大模型集成到业务系统中,不仅可以优化用户体验,还能为业务决策提供…...
[2025CVPR]DeepVideo-R1:基于难度感知回归GRPO的视频强化微调框架详解
突破视频大语言模型推理瓶颈,在多个视频基准上实现SOTA性能 一、核心问题与创新亮点 1.1 GRPO在视频任务中的两大挑战 安全措施依赖问题 GRPO使用min和clip函数限制策略更新幅度,导致: 梯度抑制:当新旧策略差异过大时梯度消失收敛困难:策略无法充分优化# 传统GRPO的梯…...
React 第五十五节 Router 中 useAsyncError的使用详解
前言 useAsyncError 是 React Router v6.4 引入的一个钩子,用于处理异步操作(如数据加载)中的错误。下面我将详细解释其用途并提供代码示例。 一、useAsyncError 用途 处理异步错误:捕获在 loader 或 action 中发生的异步错误替…...
Java 8 Stream API 入门到实践详解
一、告别 for 循环! 传统痛点: Java 8 之前,集合操作离不开冗长的 for 循环和匿名类。例如,过滤列表中的偶数: List<Integer> list Arrays.asList(1, 2, 3, 4, 5); List<Integer> evens new ArrayList…...
【入坑系列】TiDB 强制索引在不同库下不生效问题
文章目录 背景SQL 优化情况线上SQL运行情况分析怀疑1:执行计划绑定问题?尝试:SHOW WARNINGS 查看警告探索 TiDB 的 USE_INDEX 写法Hint 不生效问题排查解决参考背景 项目中使用 TiDB 数据库,并对 SQL 进行优化了,添加了强制索引。 UAT 环境已经生效,但 PROD 环境强制索…...
java调用dll出现unsatisfiedLinkError以及JNA和JNI的区别
UnsatisfiedLinkError 在对接硬件设备中,我们会遇到使用 java 调用 dll文件 的情况,此时大概率出现UnsatisfiedLinkError链接错误,原因可能有如下几种 类名错误包名错误方法名参数错误使用 JNI 协议调用,结果 dll 未实现 JNI 协…...
Objective-C常用命名规范总结
【OC】常用命名规范总结 文章目录 【OC】常用命名规范总结1.类名(Class Name)2.协议名(Protocol Name)3.方法名(Method Name)4.属性名(Property Name)5.局部变量/实例变量(Local / Instance Variables&…...
将对透视变换后的图像使用Otsu进行阈值化,来分离黑色和白色像素。这句话中的Otsu是什么意思?
Otsu 是一种自动阈值化方法,用于将图像分割为前景和背景。它通过最小化图像的类内方差或等价地最大化类间方差来选择最佳阈值。这种方法特别适用于图像的二值化处理,能够自动确定一个阈值,将图像中的像素分为黑色和白色两类。 Otsu 方法的原…...
python如何将word的doc另存为docx
将 DOCX 文件另存为 DOCX 格式(Python 实现) 在 Python 中,你可以使用 python-docx 库来操作 Word 文档。不过需要注意的是,.doc 是旧的 Word 格式,而 .docx 是新的基于 XML 的格式。python-docx 只能处理 .docx 格式…...
NFT模式:数字资产确权与链游经济系统构建
NFT模式:数字资产确权与链游经济系统构建 ——从技术架构到可持续生态的范式革命 一、确权技术革新:构建可信数字资产基石 1. 区块链底层架构的进化 跨链互操作协议:基于LayerZero协议实现以太坊、Solana等公链资产互通,通过零知…...
