spark的学习-05
SparkSql
结构化数据与非结构化数据
结构化数据就类似于excel表中的数据(统计的都是结构化的数据)一般都使用sparkSql处理结构化的数据
结构化的文件:JSON、CSV【以逗号分隔】、TSV【以制表符分隔】、parquet、orc
结构化的表:数据库中表的数据:MySQL、Oracle、Hive
我们在sparkcore中导入数据使用的是textFile,而在sparksql中怎么导入数据呢
使用的是DataFrame进行数据的导入

将一些结构化的数据进行sql查询,需要将数据变为表,是表就必须有表结构,表结构就是Schema。
一个经典的wordcount案例:
代码如下:(里面有sql和dsl两种写法)
import osfrom pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
import pyspark.sql.functions as Fif __name__ == '__main__':os.environ['JAVA_HOME'] = 'C:/Program Files/Java/jdk1.8.0_131'# 配置Hadoop的路径,就是前面解压的那个路径os.environ['HADOOP_HOME'] = r'D:\BigDate\05-Hadoop\software\hadoop-3.3.1'# 配置base环境Python解析器的路径os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe' # 配置base环境Python解析器的路径os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'# 创建spark对象spark = SparkSession.builder.master("local[2]").appName("SparkSQL-wordcount案例").config("spark.sql.shuffle.partitions", 2).getOrCreate()print(spark)# 创建一个DataFrame对象,读取数据df = spark.read.text("../../datas/wordcount/data.txt")# 创建一个临时表,表名为 wordcountdf.createOrReplaceTempView("wordcount")# 第一种写法,使用sparksqlspark.sql("""with t as ( select word from wordcount lateral view explode(split(value," ")) wordtemp as word),t2 as (select trim(word) word from t where trim(word) != "")select word,count(1) countNum from t2 group by word order by countNum desc""").show()# 第二种写法,使用 dsldf.select(F.explode(F.split("value"," ")).alias("word")) \.where(" trim(word) != '' ").groupby("word").count().orderBy("count",ascending=False).show()#这里的where(F.trim("word") != "") 还可以写成 where(" trim(word) != '' ")# 还可以这样写df.select(F.explode(F.split("value"," ")).alias("word")) \.where(F.trim("word") != "").groupby(F.col("word")).agg(F.count(F.col("word")).alias("cou")).orderBy(F.col("cou"),ascending=False).show()spark.stop()
以上的代码还可以使用with进行优化
补充:
with的作用: 我们在创建对象的时候,经常需要关闭(close、stop) 如果忘记关闭,太多对象的话就会影响性能,使用with自动帮我们关闭
什么时候可以使用with呢
源码中有 __enter__ 和 __exit__ 的时候就可以使用with进行优化
优化过后的代码: (此时就不需要在手动stop关闭了)
import osfrom pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
import pyspark.sql.functions as Fif __name__ == '__main__':os.environ['JAVA_HOME'] = 'C:/Program Files/Java/jdk1.8.0_131'# 配置Hadoop的路径,就是前面解压的那个路径os.environ['HADOOP_HOME'] = r'D:\BigDate\05-Hadoop\software\hadoop-3.3.1'# 配置base环境Python解析器的路径os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe' # 配置base环境Python解析器的路径os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'# 创建spark对象with SparkSession.builder.master("local[2]").appName("SparkSQL-wordcount案例").config("spark.sql.shuffle.partitions", 2).getOrCreate() as spark:# 创建一个DataFrame对象,读取数据df = spark.read.text("../../datas/wordcount/data.txt")# 创建一个临时表,表名为 wordcountdf.createOrReplaceTempView("wordcount")# 第一种写法,使用sparksqlspark.sql("""with t as ( select word from wordcount lateral view explode(split(value," ")) wordtemp as word),t2 as (select trim(word) word from t where trim(word) != "")select word,count(1) countNum from t2 group by word order by countNum desc""").show()# 第二种写法,使用 dsldf.select(F.explode(F.split("value"," ")).alias("word")) \.where(" trim(word) != '' ").groupby("word").count().orderBy("count",ascending=False).show()#这里的where(F.trim("word") != "") 还可以写成 where(" trim(word) != '' ")# 还可以这样写df.select(F.explode(F.split("value"," ")).alias("word")) \.where(F.trim("word") != "").groupby(F.col("word")).agg(F.count(F.col("word")).alias("cou")).orderBy(F.col("cou"),ascending=False).show()
一个案例:
需求:统计评分次数大于2000次的所有电影中平均评分最高的Top10,结果显示电影名称、电影平均评分、电影评分次数。
-
电影评分数据:datas/movie/ratings.dat【用户id、电影id、评分、评分时间】
数据如下:
1::1193::5::978300760
1::661::3::978302109
1::914::3::978301968
1::3408::4::978300275
1::2355::5::978824291
1::1197::3::978302268
1::1287::5::978302039
1::2804::5::978300719
1::594::4::978302268
1::919::4::978301368
1::595::5::978824268
-
电影信息数据:datas/movie/movies.dat【电影id、电影名称、分类】
1::Toy Story (1995)::Animation|Children's|Comedy
2::Jumanji (1995)::Adventure|Children's|Fantasy
3::Grumpier Old Men (1995)::Comedy|Romance
4::Waiting to Exhale (1995)::Comedy|Drama
5::Father of the Bride Part II (1995)::Comedy
6::Heat (1995)::Action|Crime|Thriller
7::Sabrina (1995)::Comedy|Romance
8::Tom and Huck (1995)::Adventure|Children's
9::Sudden Death (1995)::Action
首先,给定的数据不是我们所经常使用的格式化数据,所以需要先将数据进行格式化
可以使用RDD的算子将数据改为我们想要的格式化数据
也可以直接利用sql,将非格式化的数据修改为我们需要的格式的数据
写这个案例我们可以利用前面所学的 RDD 和 sparkSQL一起完成这个案例
使用RDD+SparkSQL
代码如下:
import os
import refrom pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSessionif __name__ == '__main__':os.environ['JAVA_HOME'] = 'C:/Program Files/Java/jdk1.8.0_131'# 配置Hadoop的路径,就是前面解压的那个路径os.environ['HADOOP_HOME'] = r'D:\BigDate\05-Hadoop\software\hadoop-3.3.1'# 配置base环境Python解析器的路径os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe' # 配置base环境Python解析器的路径os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'# 创建spark对象with SparkSession.builder.master("local[2]").appName("MovieTop10").config("spark.sql.shuffle.partitions", 2).getOrCreate() as spark:print(spark)rating_df = spark.sparkContext.textFile("../../datas/movie/ratings.dat").map(lambda line:re.split("::",line)) \.filter(lambda item:len(item) == 4).map(lambda item:(item[0],item[1],item[2],item[3])) \.toDF(["user_id","movie_id","score","score_time"]).createOrReplaceTempView("rating")# spark.sql("""# select * from rating# """).show()movie_df = spark.sparkContext.textFile("../../datas/movie/movies.dat") \.map(lambda line:(line.split("::")[0],line.split("::")[1],line.split("::")[2])) \.toDF(["movie_id", "movie_name", "movie_categry"]).createOrReplaceTempView("movie")# spark.sql("""# select * from movie# """).show(truncate=False)#统计评分次数大于2000次的所有电影中平均评分最高的Top10,结果显示电影名称、电影平均评分、电影评分次数spark.sql("""select m.movie_name,round(avg(r.score),2) avgRate,count(1) countNum from movie m join rating r on m.movie_id = r.movie_idgroup by m.movie_name having countNum >2000 order by avgRate desc limit 10""").show(truncate=False)# 保留两位小数后,结果可能有重复的,想要获取重复排名也只算一位的可以使用排名函数,dense_rank()spark.sql("""with t as (select m.movie_name,round(avg(r.score),2) avgRate,count(1) countNum from movie m join rating r on m.movie_id = r.movie_idgroup by m.movie_name having countNum >2000),t2 as (select *,dense_rank() over(order by avgRate desc) paiming from t) select * from t2 where paiming <= 10""").show()
复习 排名函数:
1、row_number()
row_number从1开始,按照顺序,生成分组内记录的序列,row_number()的值不会存在重复,当排序的值相同时,按照表中记录的顺序进行排列
效果如下:
98 1
97 2
97 3
96 4
95 5
95 6没有并列名次情况,顺序递增
2、rank()
生成数据项在分组中的排名,排名相等会在名次中留下空位
效果如下:
98 1
97 2
97 2
96 4
95 5
95 5
94 7
有并列名次情况,顺序跳跃递增
3、dense_rank()
生成数据项在分组中的排名,排名相等会在名次中不会留下空位
效果如下:
98 1
97 2
97 2
96 3
95 4
95 4
94 5
有并列名次情况,顺序递增
只使用 SparkSQL:
以上是RDD + sparkSQL的写法, 还可以通过 sparkSQL的写法硬写出来
通过split()方法,根据非格式化数据的分隔符,将数据切成我们需要的DataFrame类型的数据
df1 = spark.read.text("../../datas/movie/movies.dat").createOrReplaceTempView("movie1")
df2 = spark.read.text("../../datas/movie/ratings.dat").createOrReplaceTempView("rating1")#统计评分次数大于2000次的所有电影中平均评分最高的Top10,结果显示电影名称、电影平均评分、电影评分次数
spark.sql("""with m1 as (select split(value,"::")[0] movie_id,split(value,"::")[1] movie_name,split(value,"::")[2] movie_categary from movie1),r1 as ( select split(value,"::")[0] user_id,split(value,"::")[1] movie_id,split(value,"::")[2] score,split(value,"::")[3] score_time from rating1)select m1.movie_name,round(avg(r1.score),2) avgRote,count(1) countNum from m1 join r1 on m1.movie_id = r1.movie_idgroup by m1.movie_name having countNum >2000 order by avgRote desc limit 10
""").show(truncate=False)# 同样也可以写成排名函数
spark.sql("""with m1 as (select split(value,"::")[0] movie_id,split(value,"::")[1] movie_name,split(value,"::")[2] movie_categary from movie1),r1 as ( select split(value,"::")[0] user_id,split(value,"::")[1] movie_id,split(value,"::")[2] score,split(value,"::")[3] score_time from rating1),t as ( select m1.movie_name,round(avg(r1.score),2) avgRote,count(1) countNum from m1 join r1 on m1.movie_id = r1.movie_idgroup by m1.movie_name having countNum >2000),t2 as ( select *,dense_rank() over(order by avgRote desc) paiming from t)select * from t2 where paiming <= 10
""").show(truncate=False)相关文章:
spark的学习-05
SparkSql 结构化数据与非结构化数据 结构化数据就类似于excel表中的数据(统计的都是结构化的数据)一般都使用sparkSql处理结构化的数据 结构化的文件:JSON、CSV【以逗号分隔】、TSV【以制表符分隔】、parquet、orc 结构化的表:…...
SQL注入(SQL Injection)详解
SQL注入(SQL Injection)是一种代码注入技术,它通过在应用程序的输入字段中插入或“注入”恶意的SQL语句,从而操控后端数据库服务器执行非预期的命令。这种攻击方式常用于绕过应用程序的安全措施,未经授权地访问、修改或…...
深入解析 OpenHarmony 构建系统-2-目录结构与核心组件
引言 OpenHarmony作为一款面向全场景的分布式操作系统,其构建系统在开发过程中扮演着至关重要的角色。本文将详细介绍OpenHarmony构建系统的目录结构和核心组件,帮助开发者更好地理解和使用这一强大的工具。 目录结构概览 以下是OpenHarmony构建系统的目录结构,每个目录和…...
网络安全应急响应(归纳)
目录 一、概述二、理论 系统排查 系统基本信息 windowsLinux用户信息 WindowsLinux启动项:开机系统在前台或者后台运行的程序,是病毒等实现持久化驻留的常用方法。 WindowsLinux任务计划:由于很多计算机都会自动加载“任务计划”,…...
【网络协议栈】网络层(上)网络层的基本理解、IP协议格式、网络层分组(内附手画分析图 简单易懂)
绪论 “It does not matter how slowly you go as long as you do not stop.”。本章是自上而下的进入网络协议栈的第三个篇幅–网络层–,本章我将带你了解网络层,以及网络层中非常重要的IP协议格式和网络层的分片组装问题,后面将持续更新网…...
数据库类型介绍
1. 关系型数据库(RDBMS) 关系型数据库是最常见的一类数据库,它们通过表(Table)来存储数据,表之间通过关系(如主键和外键)来关联。 • MySQL:开源的关系型数据库管理系统&…...
一步一步从asp.net core mvc中访问asp.net core WebApi
"从asp.net core mvc中访问asp.net core WebApi"看到这个标题是不是觉得很绕口啊,但的确就是要讲一讲这样的访问。前面我们介绍了微信小程序访问asp.net core webapi(感兴趣的童鞋可以看看前面的博文有关WEBAPI的搭建),这里我们重点不关心如何…...
linux中kubectl命令使用
一.命令介绍 kubectl 是 Kubernetes 集群管理的命令行工具,用于与 Kubernetes API 交互。你可以通过它来管理和操作 Kubernetes 集群中的资源,如 Pod、Deployment、Service 等。下面是如何在不同操作系统上下载和使用 kubectl 的方法。 二.下载 kubect…...
Linux 系统结构
Linux系统一般有4个主要部分:内核、shell、文件系统和应用程序。内核、shell和文件系统一起形成了基本的操作系统结构,它们使得用户可以运行程序、管理文件并使用系统。 1. linux内核 内核是操作系统的核心,具有很多最基本功能,它…...
ESP32-S3设备智能化升级,物联网无线AI语音交互,让生活更加便捷和有趣
在人工智能和物联网技术的推动下,无线AI语音交互技术正在成为智能设备的新选择。这种技术的发展,不仅改变了我们与设备的沟通方式,更开启了一个新的智能交互方案。 想象一下,通过简单的语音指令,就能控制家中的灯光、…...
Python的函数(补充浅拷贝和深拷贝)
一、定义 函数的定义:实现【特定功能】的代码块。 形参:函数定义时的参数,没有实际意义 实参:函数调用/使用时的参数,有实际意义 函数的作用: 简化代码提高代码重用性便于维护和修改提高代码的可扩展性…...
oracle查询字段类型长度等字段信息
1.查询oracle数据库的字符集 SELECT * FROM NLS_DATABASE_PARAMETERS WHERE PARAMETER NLS_CHARACTERSET; 2.查询字段长度类型 SELECT * FROM user_tab_columns WHERE table_name user AND COLUMN_NAME SNAME 请确保将user替换为您想要查询的表名。sname为字段名 这里的字…...
C语言 | Leetcode C语言题解之第559题N叉树的最大深度
题目: 题解: /*** Definition for a Node.* struct Node {* int val;* int numChildren;* struct Node** children;* };*/int maxDepth(struct Node* root) {if (!root) {return 0;}int depth 0;// 创建空队列const int qCap 10e4 1;str…...
光流法(Optical Flow)
一、简介 光流法(Optical Flow)是一种用于检测图像序列中像素运动的计算机视觉技术。其基于以下假设: 1.亮度恒定性假设:物体在运动过程中,其像素值在不同帧中保持不变。 2.空间和时间上的连续性:相邻像素之…...
Rancher的安装
1. 概览 1.1 用户界面优势 Rancher 提供了一个直观的图形用户界面(GUI)。对于不熟悉 Kubernetes 复杂的命令行操作(如使用kubectl)的用户来说,通过 Rancher 的界面可以方便地进行资源管理。例如,用户可以在…...
【Linux】获得同一子网下当前在线设备IP/Latency/MAC 通过nmap指定CIDR扫描当前在线设备
【Linux】获得同一子网下当前在线设备IP/Latency/MAC 通过nmap指定CIDR扫描当前在线设备 通过路由器的后台,查看当前在线设备,受到网卡版本的影响,有时会有部分设备看不见MAC和分配的IP。此时,可以借助命令行工具扫描子网下所有连…...
Ubuntu22.04安装DataEase
看到DataEase的驾驶舱,感觉比PowerBI要好用一点,于是搭建起来玩玩。Dataease推荐的操作系统是Ubuntu22.04/Centos 7。 下载了Ubuntu22.04和DataEase 最新版本的离线安装包 一.安装ubuntu22.04 在安装的时候,没有顺手设置IP地址信息ÿ…...
Taro React-Native IOS 打包发布
http网络请求不到 配置 fix react-native facebook::flipper::SocketCertificateProvider‘ (aka ‘int‘) is not a function or func_rn运行debug提示flipper-CSDN博客 Xcode 15(iOS17)编译适配报错_no template named function in namespace std-CS…...
【卷积神经网络CNN】基于深度学习动物图像识别系统(完整系统源码+数据库+开发笔记+详细部署教程+启动教程)✅
目录 【卷积神经网络CNN】基于深度学习动物图像识别系统(完整系统源码数据库开发笔记详细部署教程启动教程)✅ 一、项目背景 二、项目目标 三、项目创新点 四、项目功能 五、开发技术介绍 六、数据库设计 七、启动步骤 八、项目功能展示 九、开…...
图像处理椒盐噪声
椒盐噪声,也称为脉冲噪声,是图像中经常见到的一种噪声。它是一种随机出现的白点或者黑点,可能是亮的区域有黑色像素或是在暗的区域有白色像素(或是两者皆有)。这些白点和黑点会在图像中随机分布,导致图像中…...
阿里云ACP云计算备考笔记 (5)——弹性伸缩
目录 第一章 概述 第二章 弹性伸缩简介 1、弹性伸缩 2、垂直伸缩 3、优势 4、应用场景 ① 无规律的业务量波动 ② 有规律的业务量波动 ③ 无明显业务量波动 ④ 混合型业务 ⑤ 消息通知 ⑥ 生命周期挂钩 ⑦ 自定义方式 ⑧ 滚的升级 5、使用限制 第三章 主要定义 …...
Java - Mysql数据类型对应
Mysql数据类型java数据类型备注整型INT/INTEGERint / java.lang.Integer–BIGINTlong/java.lang.Long–––浮点型FLOATfloat/java.lang.FloatDOUBLEdouble/java.lang.Double–DECIMAL/NUMERICjava.math.BigDecimal字符串型CHARjava.lang.String固定长度字符串VARCHARjava.lang…...
自然语言处理——Transformer
自然语言处理——Transformer 自注意力机制多头注意力机制Transformer 虽然循环神经网络可以对具有序列特性的数据非常有效,它能挖掘数据中的时序信息以及语义信息,但是它有一个很大的缺陷——很难并行化。 我们可以考虑用CNN来替代RNN,但是…...
Android Bitmap治理全解析:从加载优化到泄漏防控的全生命周期管理
引言 Bitmap(位图)是Android应用内存占用的“头号杀手”。一张1080P(1920x1080)的图片以ARGB_8888格式加载时,内存占用高达8MB(192010804字节)。据统计,超过60%的应用OOM崩溃与Bitm…...
SAP学习笔记 - 开发26 - 前端Fiori开发 OData V2 和 V4 的差异 (Deepseek整理)
上一章用到了V2 的概念,其实 Fiori当中还有 V4,咱们这一章来总结一下 V2 和 V4。 SAP学习笔记 - 开发25 - 前端Fiori开发 Remote OData Service(使用远端Odata服务),代理中间件(ui5-middleware-simpleproxy)-CSDN博客…...
SpringAI实战:ChatModel智能对话全解
一、引言:Spring AI 与 Chat Model 的核心价值 🚀 在 Java 生态中集成大模型能力,Spring AI 提供了高效的解决方案 🤖。其中 Chat Model 作为核心交互组件,通过标准化接口简化了与大语言模型(LLM࿰…...
使用SSE解决获取状态不一致问题
使用SSE解决获取状态不一致问题 1. 问题描述2. SSE介绍2.1 SSE 的工作原理2.2 SSE 的事件格式规范2.3 SSE与其他技术对比2.4 SSE 的优缺点 3. 实战代码 1. 问题描述 目前做的一个功能是上传多个文件,这个上传文件是整体功能的一部分,文件在上传的过程中…...
Mysql故障排插与环境优化
前置知识点 最上层是一些客户端和连接服务,包含本 sock 通信和大多数jiyukehuduan/服务端工具实现的TCP/IP通信。主要完成一些简介处理、授权认证、及相关的安全方案等。在该层上引入了线程池的概念,为通过安全认证接入的客户端提供线程。同样在该层上可…...
高分辨率图像合成归一化流扩展
大家读完觉得有帮助记得关注和点赞!!! 1 摘要 我们提出了STARFlow,一种基于归一化流的可扩展生成模型,它在高分辨率图像合成方面取得了强大的性能。STARFlow的主要构建块是Transformer自回归流(TARFlow&am…...
DAY 45 超大力王爱学Python
来自超大力王的友情提示:在用tensordoard的时候一定一定要用绝对位置,例如:tensorboard --logdir"D:\代码\archive (1)\runs\cifar10_mlp_experiment_2" 不然读取不了数据 知识点回顾: tensorboard的发展历史和原理tens…...
