spark的学习-05
SparkSql
结构化数据与非结构化数据
结构化数据就类似于excel表中的数据(统计的都是结构化的数据)一般都使用sparkSql处理结构化的数据
结构化的文件:JSON、CSV【以逗号分隔】、TSV【以制表符分隔】、parquet、orc
结构化的表:数据库中表的数据:MySQL、Oracle、Hive
我们在sparkcore中导入数据使用的是textFile,而在sparksql中怎么导入数据呢
使用的是DataFrame进行数据的导入
将一些结构化的数据进行sql查询,需要将数据变为表,是表就必须有表结构,表结构就是Schema。
一个经典的wordcount案例:
代码如下:(里面有sql和dsl两种写法)
import osfrom pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
import pyspark.sql.functions as Fif __name__ == '__main__':os.environ['JAVA_HOME'] = 'C:/Program Files/Java/jdk1.8.0_131'# 配置Hadoop的路径,就是前面解压的那个路径os.environ['HADOOP_HOME'] = r'D:\BigDate\05-Hadoop\software\hadoop-3.3.1'# 配置base环境Python解析器的路径os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe' # 配置base环境Python解析器的路径os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'# 创建spark对象spark = SparkSession.builder.master("local[2]").appName("SparkSQL-wordcount案例").config("spark.sql.shuffle.partitions", 2).getOrCreate()print(spark)# 创建一个DataFrame对象,读取数据df = spark.read.text("../../datas/wordcount/data.txt")# 创建一个临时表,表名为 wordcountdf.createOrReplaceTempView("wordcount")# 第一种写法,使用sparksqlspark.sql("""with t as ( select word from wordcount lateral view explode(split(value," ")) wordtemp as word),t2 as (select trim(word) word from t where trim(word) != "")select word,count(1) countNum from t2 group by word order by countNum desc""").show()# 第二种写法,使用 dsldf.select(F.explode(F.split("value"," ")).alias("word")) \.where(" trim(word) != '' ").groupby("word").count().orderBy("count",ascending=False).show()#这里的where(F.trim("word") != "") 还可以写成 where(" trim(word) != '' ")# 还可以这样写df.select(F.explode(F.split("value"," ")).alias("word")) \.where(F.trim("word") != "").groupby(F.col("word")).agg(F.count(F.col("word")).alias("cou")).orderBy(F.col("cou"),ascending=False).show()spark.stop()
以上的代码还可以使用with进行优化
补充:
with的作用: 我们在创建对象的时候,经常需要关闭(close、stop) 如果忘记关闭,太多对象的话就会影响性能,使用with自动帮我们关闭
什么时候可以使用with呢
源码中有 __enter__ 和 __exit__ 的时候就可以使用with进行优化
优化过后的代码: (此时就不需要在手动stop关闭了)
import osfrom pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
import pyspark.sql.functions as Fif __name__ == '__main__':os.environ['JAVA_HOME'] = 'C:/Program Files/Java/jdk1.8.0_131'# 配置Hadoop的路径,就是前面解压的那个路径os.environ['HADOOP_HOME'] = r'D:\BigDate\05-Hadoop\software\hadoop-3.3.1'# 配置base环境Python解析器的路径os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe' # 配置base环境Python解析器的路径os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'# 创建spark对象with SparkSession.builder.master("local[2]").appName("SparkSQL-wordcount案例").config("spark.sql.shuffle.partitions", 2).getOrCreate() as spark:# 创建一个DataFrame对象,读取数据df = spark.read.text("../../datas/wordcount/data.txt")# 创建一个临时表,表名为 wordcountdf.createOrReplaceTempView("wordcount")# 第一种写法,使用sparksqlspark.sql("""with t as ( select word from wordcount lateral view explode(split(value," ")) wordtemp as word),t2 as (select trim(word) word from t where trim(word) != "")select word,count(1) countNum from t2 group by word order by countNum desc""").show()# 第二种写法,使用 dsldf.select(F.explode(F.split("value"," ")).alias("word")) \.where(" trim(word) != '' ").groupby("word").count().orderBy("count",ascending=False).show()#这里的where(F.trim("word") != "") 还可以写成 where(" trim(word) != '' ")# 还可以这样写df.select(F.explode(F.split("value"," ")).alias("word")) \.where(F.trim("word") != "").groupby(F.col("word")).agg(F.count(F.col("word")).alias("cou")).orderBy(F.col("cou"),ascending=False).show()
一个案例:
需求:统计评分次数大于2000次的所有电影中平均评分最高的Top10,结果显示电影名称、电影平均评分、电影评分次数。
-
电影评分数据:datas/movie/ratings.dat【用户id、电影id、评分、评分时间】
数据如下:
1::1193::5::978300760
1::661::3::978302109
1::914::3::978301968
1::3408::4::978300275
1::2355::5::978824291
1::1197::3::978302268
1::1287::5::978302039
1::2804::5::978300719
1::594::4::978302268
1::919::4::978301368
1::595::5::978824268
-
电影信息数据:datas/movie/movies.dat【电影id、电影名称、分类】
1::Toy Story (1995)::Animation|Children's|Comedy
2::Jumanji (1995)::Adventure|Children's|Fantasy
3::Grumpier Old Men (1995)::Comedy|Romance
4::Waiting to Exhale (1995)::Comedy|Drama
5::Father of the Bride Part II (1995)::Comedy
6::Heat (1995)::Action|Crime|Thriller
7::Sabrina (1995)::Comedy|Romance
8::Tom and Huck (1995)::Adventure|Children's
9::Sudden Death (1995)::Action
首先,给定的数据不是我们所经常使用的格式化数据,所以需要先将数据进行格式化
可以使用RDD的算子将数据改为我们想要的格式化数据
也可以直接利用sql,将非格式化的数据修改为我们需要的格式的数据
写这个案例我们可以利用前面所学的 RDD 和 sparkSQL一起完成这个案例
使用RDD+SparkSQL
代码如下:
import os
import refrom pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSessionif __name__ == '__main__':os.environ['JAVA_HOME'] = 'C:/Program Files/Java/jdk1.8.0_131'# 配置Hadoop的路径,就是前面解压的那个路径os.environ['HADOOP_HOME'] = r'D:\BigDate\05-Hadoop\software\hadoop-3.3.1'# 配置base环境Python解析器的路径os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe' # 配置base环境Python解析器的路径os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'# 创建spark对象with SparkSession.builder.master("local[2]").appName("MovieTop10").config("spark.sql.shuffle.partitions", 2).getOrCreate() as spark:print(spark)rating_df = spark.sparkContext.textFile("../../datas/movie/ratings.dat").map(lambda line:re.split("::",line)) \.filter(lambda item:len(item) == 4).map(lambda item:(item[0],item[1],item[2],item[3])) \.toDF(["user_id","movie_id","score","score_time"]).createOrReplaceTempView("rating")# spark.sql("""# select * from rating# """).show()movie_df = spark.sparkContext.textFile("../../datas/movie/movies.dat") \.map(lambda line:(line.split("::")[0],line.split("::")[1],line.split("::")[2])) \.toDF(["movie_id", "movie_name", "movie_categry"]).createOrReplaceTempView("movie")# spark.sql("""# select * from movie# """).show(truncate=False)#统计评分次数大于2000次的所有电影中平均评分最高的Top10,结果显示电影名称、电影平均评分、电影评分次数spark.sql("""select m.movie_name,round(avg(r.score),2) avgRate,count(1) countNum from movie m join rating r on m.movie_id = r.movie_idgroup by m.movie_name having countNum >2000 order by avgRate desc limit 10""").show(truncate=False)# 保留两位小数后,结果可能有重复的,想要获取重复排名也只算一位的可以使用排名函数,dense_rank()spark.sql("""with t as (select m.movie_name,round(avg(r.score),2) avgRate,count(1) countNum from movie m join rating r on m.movie_id = r.movie_idgroup by m.movie_name having countNum >2000),t2 as (select *,dense_rank() over(order by avgRate desc) paiming from t) select * from t2 where paiming <= 10""").show()
复习 排名函数:
1、row_number()
row_number从1开始,按照顺序,生成分组内记录的序列,row_number()的值不会存在重复,当排序的值相同时,按照表中记录的顺序进行排列
效果如下:
98 1
97 2
97 3
96 4
95 5
95 6没有并列名次情况,顺序递增
2、rank()
生成数据项在分组中的排名,排名相等会在名次中留下空位
效果如下:
98 1
97 2
97 2
96 4
95 5
95 5
94 7
有并列名次情况,顺序跳跃递增
3、dense_rank()
生成数据项在分组中的排名,排名相等会在名次中不会留下空位
效果如下:
98 1
97 2
97 2
96 3
95 4
95 4
94 5
有并列名次情况,顺序递增
只使用 SparkSQL:
以上是RDD + sparkSQL的写法, 还可以通过 sparkSQL的写法硬写出来
通过split()方法,根据非格式化数据的分隔符,将数据切成我们需要的DataFrame类型的数据
df1 = spark.read.text("../../datas/movie/movies.dat").createOrReplaceTempView("movie1")
df2 = spark.read.text("../../datas/movie/ratings.dat").createOrReplaceTempView("rating1")#统计评分次数大于2000次的所有电影中平均评分最高的Top10,结果显示电影名称、电影平均评分、电影评分次数
spark.sql("""with m1 as (select split(value,"::")[0] movie_id,split(value,"::")[1] movie_name,split(value,"::")[2] movie_categary from movie1),r1 as ( select split(value,"::")[0] user_id,split(value,"::")[1] movie_id,split(value,"::")[2] score,split(value,"::")[3] score_time from rating1)select m1.movie_name,round(avg(r1.score),2) avgRote,count(1) countNum from m1 join r1 on m1.movie_id = r1.movie_idgroup by m1.movie_name having countNum >2000 order by avgRote desc limit 10
""").show(truncate=False)# 同样也可以写成排名函数
spark.sql("""with m1 as (select split(value,"::")[0] movie_id,split(value,"::")[1] movie_name,split(value,"::")[2] movie_categary from movie1),r1 as ( select split(value,"::")[0] user_id,split(value,"::")[1] movie_id,split(value,"::")[2] score,split(value,"::")[3] score_time from rating1),t as ( select m1.movie_name,round(avg(r1.score),2) avgRote,count(1) countNum from m1 join r1 on m1.movie_id = r1.movie_idgroup by m1.movie_name having countNum >2000),t2 as ( select *,dense_rank() over(order by avgRote desc) paiming from t)select * from t2 where paiming <= 10
""").show(truncate=False)
相关文章:

spark的学习-05
SparkSql 结构化数据与非结构化数据 结构化数据就类似于excel表中的数据(统计的都是结构化的数据)一般都使用sparkSql处理结构化的数据 结构化的文件:JSON、CSV【以逗号分隔】、TSV【以制表符分隔】、parquet、orc 结构化的表:…...
SQL注入(SQL Injection)详解
SQL注入(SQL Injection)是一种代码注入技术,它通过在应用程序的输入字段中插入或“注入”恶意的SQL语句,从而操控后端数据库服务器执行非预期的命令。这种攻击方式常用于绕过应用程序的安全措施,未经授权地访问、修改或…...
深入解析 OpenHarmony 构建系统-2-目录结构与核心组件
引言 OpenHarmony作为一款面向全场景的分布式操作系统,其构建系统在开发过程中扮演着至关重要的角色。本文将详细介绍OpenHarmony构建系统的目录结构和核心组件,帮助开发者更好地理解和使用这一强大的工具。 目录结构概览 以下是OpenHarmony构建系统的目录结构,每个目录和…...
网络安全应急响应(归纳)
目录 一、概述二、理论 系统排查 系统基本信息 windowsLinux用户信息 WindowsLinux启动项:开机系统在前台或者后台运行的程序,是病毒等实现持久化驻留的常用方法。 WindowsLinux任务计划:由于很多计算机都会自动加载“任务计划”,…...

【网络协议栈】网络层(上)网络层的基本理解、IP协议格式、网络层分组(内附手画分析图 简单易懂)
绪论 “It does not matter how slowly you go as long as you do not stop.”。本章是自上而下的进入网络协议栈的第三个篇幅–网络层–,本章我将带你了解网络层,以及网络层中非常重要的IP协议格式和网络层的分片组装问题,后面将持续更新网…...

数据库类型介绍
1. 关系型数据库(RDBMS) 关系型数据库是最常见的一类数据库,它们通过表(Table)来存储数据,表之间通过关系(如主键和外键)来关联。 • MySQL:开源的关系型数据库管理系统&…...

一步一步从asp.net core mvc中访问asp.net core WebApi
"从asp.net core mvc中访问asp.net core WebApi"看到这个标题是不是觉得很绕口啊,但的确就是要讲一讲这样的访问。前面我们介绍了微信小程序访问asp.net core webapi(感兴趣的童鞋可以看看前面的博文有关WEBAPI的搭建),这里我们重点不关心如何…...
linux中kubectl命令使用
一.命令介绍 kubectl 是 Kubernetes 集群管理的命令行工具,用于与 Kubernetes API 交互。你可以通过它来管理和操作 Kubernetes 集群中的资源,如 Pod、Deployment、Service 等。下面是如何在不同操作系统上下载和使用 kubectl 的方法。 二.下载 kubect…...

Linux 系统结构
Linux系统一般有4个主要部分:内核、shell、文件系统和应用程序。内核、shell和文件系统一起形成了基本的操作系统结构,它们使得用户可以运行程序、管理文件并使用系统。 1. linux内核 内核是操作系统的核心,具有很多最基本功能,它…...

ESP32-S3设备智能化升级,物联网无线AI语音交互,让生活更加便捷和有趣
在人工智能和物联网技术的推动下,无线AI语音交互技术正在成为智能设备的新选择。这种技术的发展,不仅改变了我们与设备的沟通方式,更开启了一个新的智能交互方案。 想象一下,通过简单的语音指令,就能控制家中的灯光、…...

Python的函数(补充浅拷贝和深拷贝)
一、定义 函数的定义:实现【特定功能】的代码块。 形参:函数定义时的参数,没有实际意义 实参:函数调用/使用时的参数,有实际意义 函数的作用: 简化代码提高代码重用性便于维护和修改提高代码的可扩展性…...

oracle查询字段类型长度等字段信息
1.查询oracle数据库的字符集 SELECT * FROM NLS_DATABASE_PARAMETERS WHERE PARAMETER NLS_CHARACTERSET; 2.查询字段长度类型 SELECT * FROM user_tab_columns WHERE table_name user AND COLUMN_NAME SNAME 请确保将user替换为您想要查询的表名。sname为字段名 这里的字…...

C语言 | Leetcode C语言题解之第559题N叉树的最大深度
题目: 题解: /*** Definition for a Node.* struct Node {* int val;* int numChildren;* struct Node** children;* };*/int maxDepth(struct Node* root) {if (!root) {return 0;}int depth 0;// 创建空队列const int qCap 10e4 1;str…...

光流法(Optical Flow)
一、简介 光流法(Optical Flow)是一种用于检测图像序列中像素运动的计算机视觉技术。其基于以下假设: 1.亮度恒定性假设:物体在运动过程中,其像素值在不同帧中保持不变。 2.空间和时间上的连续性:相邻像素之…...

Rancher的安装
1. 概览 1.1 用户界面优势 Rancher 提供了一个直观的图形用户界面(GUI)。对于不熟悉 Kubernetes 复杂的命令行操作(如使用kubectl)的用户来说,通过 Rancher 的界面可以方便地进行资源管理。例如,用户可以在…...
【Linux】获得同一子网下当前在线设备IP/Latency/MAC 通过nmap指定CIDR扫描当前在线设备
【Linux】获得同一子网下当前在线设备IP/Latency/MAC 通过nmap指定CIDR扫描当前在线设备 通过路由器的后台,查看当前在线设备,受到网卡版本的影响,有时会有部分设备看不见MAC和分配的IP。此时,可以借助命令行工具扫描子网下所有连…...

Ubuntu22.04安装DataEase
看到DataEase的驾驶舱,感觉比PowerBI要好用一点,于是搭建起来玩玩。Dataease推荐的操作系统是Ubuntu22.04/Centos 7。 下载了Ubuntu22.04和DataEase 最新版本的离线安装包 一.安装ubuntu22.04 在安装的时候,没有顺手设置IP地址信息ÿ…...

Taro React-Native IOS 打包发布
http网络请求不到 配置 fix react-native facebook::flipper::SocketCertificateProvider‘ (aka ‘int‘) is not a function or func_rn运行debug提示flipper-CSDN博客 Xcode 15(iOS17)编译适配报错_no template named function in namespace std-CS…...

【卷积神经网络CNN】基于深度学习动物图像识别系统(完整系统源码+数据库+开发笔记+详细部署教程+启动教程)✅
目录 【卷积神经网络CNN】基于深度学习动物图像识别系统(完整系统源码数据库开发笔记详细部署教程启动教程)✅ 一、项目背景 二、项目目标 三、项目创新点 四、项目功能 五、开发技术介绍 六、数据库设计 七、启动步骤 八、项目功能展示 九、开…...
图像处理椒盐噪声
椒盐噪声,也称为脉冲噪声,是图像中经常见到的一种噪声。它是一种随机出现的白点或者黑点,可能是亮的区域有黑色像素或是在暗的区域有白色像素(或是两者皆有)。这些白点和黑点会在图像中随机分布,导致图像中…...

K8S认证|CKS题库+答案| 11. AppArmor
目录 11. AppArmor 免费获取并激活 CKA_v1.31_模拟系统 题目 开始操作: 1)、切换集群 2)、切换节点 3)、切换到 apparmor 的目录 4)、执行 apparmor 策略模块 5)、修改 pod 文件 6)、…...
测试markdown--肇兴
day1: 1、去程:7:04 --11:32高铁 高铁右转上售票大厅2楼,穿过候车厅下一楼,上大巴车 ¥10/人 **2、到达:**12点多到达寨子,买门票,美团/抖音:¥78人 3、中饭&a…...

cf2117E
原题链接:https://codeforces.com/contest/2117/problem/E 题目背景: 给定两个数组a,b,可以执行多次以下操作:选择 i (1 < i < n - 1),并设置 或,也可以在执行上述操作前执行一次删除任意 和 。求…...
生成 Git SSH 证书
🔑 1. 生成 SSH 密钥对 在终端(Windows 使用 Git Bash,Mac/Linux 使用 Terminal)执行命令: ssh-keygen -t rsa -b 4096 -C "your_emailexample.com" 参数说明: -t rsa&#x…...

【Zephyr 系列 10】实战项目:打造一个蓝牙传感器终端 + 网关系统(完整架构与全栈实现)
🧠关键词:Zephyr、BLE、终端、网关、广播、连接、传感器、数据采集、低功耗、系统集成 📌目标读者:希望基于 Zephyr 构建 BLE 系统架构、实现终端与网关协作、具备产品交付能力的开发者 📊篇幅字数:约 5200 字 ✨ 项目总览 在物联网实际项目中,**“终端 + 网关”**是…...

WordPress插件:AI多语言写作与智能配图、免费AI模型、SEO文章生成
厌倦手动写WordPress文章?AI自动生成,效率提升10倍! 支持多语言、自动配图、定时发布,让内容创作更轻松! AI内容生成 → 不想每天写文章?AI一键生成高质量内容!多语言支持 → 跨境电商必备&am…...

CMake 从 GitHub 下载第三方库并使用
有时我们希望直接使用 GitHub 上的开源库,而不想手动下载、编译和安装。 可以利用 CMake 提供的 FetchContent 模块来实现自动下载、构建和链接第三方库。 FetchContent 命令官方文档✅ 示例代码 我们将以 fmt 这个流行的格式化库为例,演示如何: 使用 FetchContent 从 GitH…...

3-11单元格区域边界定位(End属性)学习笔记
返回一个Range 对象,只读。该对象代表包含源区域的区域上端下端左端右端的最后一个单元格。等同于按键 End 向上键(End(xlUp))、End向下键(End(xlDown))、End向左键(End(xlToLeft)End向右键(End(xlToRight)) 注意:它移动的位置必须是相连的有内容的单元格…...
Hive 存储格式深度解析:从 TextFile 到 ORC,如何选对数据存储方案?
在大数据处理领域,Hive 作为 Hadoop 生态中重要的数据仓库工具,其存储格式的选择直接影响数据存储成本、查询效率和计算资源消耗。面对 TextFile、SequenceFile、Parquet、RCFile、ORC 等多种存储格式,很多开发者常常陷入选择困境。本文将从底…...
C++.OpenGL (14/64)多光源(Multiple Lights)
多光源(Multiple Lights) 多光源渲染技术概览 #mermaid-svg-3L5e5gGn76TNh7Lq {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-3L5e5gGn76TNh7Lq .error-icon{fill:#552222;}#mermaid-svg-3L5e5gGn76TNh7Lq .erro…...