当前位置: 首页 > news >正文

spark的学习-05

SparkSql

结构化数据与非结构化数据

结构化数据就类似于excel表中的数据(统计的都是结构化的数据)一般都使用sparkSql处理结构化的数据

结构化的文件:JSON、CSV【以逗号分隔】、TSV【以制表符分隔】、parquet、orc

结构化的表:数据库中表的数据:MySQL、Oracle、Hive

我们在sparkcore中导入数据使用的是textFile,而在sparksql中怎么导入数据呢

使用的是DataFrame进行数据的导入

将一些结构化的数据进行sql查询,需要将数据变为表,是表就必须有表结构,表结构就是Schema

一个经典的wordcount案例:

代码如下:(里面有sql和dsl两种写法)

import osfrom pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
import pyspark.sql.functions as Fif __name__ == '__main__':os.environ['JAVA_HOME'] = 'C:/Program Files/Java/jdk1.8.0_131'# 配置Hadoop的路径,就是前面解压的那个路径os.environ['HADOOP_HOME'] = r'D:\BigDate\05-Hadoop\software\hadoop-3.3.1'# 配置base环境Python解析器的路径os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base环境Python解析器的路径os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'# 创建spark对象spark = SparkSession.builder.master("local[2]").appName("SparkSQL-wordcount案例").config("spark.sql.shuffle.partitions", 2).getOrCreate()print(spark)# 创建一个DataFrame对象,读取数据df = spark.read.text("../../datas/wordcount/data.txt")# 创建一个临时表,表名为 wordcountdf.createOrReplaceTempView("wordcount")# 第一种写法,使用sparksqlspark.sql("""with t as ( select word from wordcount lateral view explode(split(value," ")) wordtemp as word),t2 as (select trim(word) word from t where trim(word) != "")select word,count(1) countNum from t2 group by word order by countNum desc""").show()# 第二种写法,使用 dsldf.select(F.explode(F.split("value"," ")).alias("word")) \.where(" trim(word) != '' ").groupby("word").count().orderBy("count",ascending=False).show()#这里的where(F.trim("word") != "") 还可以写成 where(" trim(word) != '' ")# 还可以这样写df.select(F.explode(F.split("value"," ")).alias("word")) \.where(F.trim("word") != "").groupby(F.col("word")).agg(F.count(F.col("word")).alias("cou")).orderBy(F.col("cou"),ascending=False).show()spark.stop()

以上的代码还可以使用with进行优化

补充:

with的作用: 我们在创建对象的时候,经常需要关闭(close、stop) 如果忘记关闭,太多对象的话就会影响性能,使用with自动帮我们关闭

什么时候可以使用with呢

源码中有 __enter__ 和 __exit__ 的时候就可以使用with进行优化

优化过后的代码: (此时就不需要在手动stop关闭了)

import osfrom pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
import pyspark.sql.functions as Fif __name__ == '__main__':os.environ['JAVA_HOME'] = 'C:/Program Files/Java/jdk1.8.0_131'# 配置Hadoop的路径,就是前面解压的那个路径os.environ['HADOOP_HOME'] = r'D:\BigDate\05-Hadoop\software\hadoop-3.3.1'# 配置base环境Python解析器的路径os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base环境Python解析器的路径os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'# 创建spark对象with SparkSession.builder.master("local[2]").appName("SparkSQL-wordcount案例").config("spark.sql.shuffle.partitions", 2).getOrCreate() as spark:# 创建一个DataFrame对象,读取数据df = spark.read.text("../../datas/wordcount/data.txt")# 创建一个临时表,表名为 wordcountdf.createOrReplaceTempView("wordcount")# 第一种写法,使用sparksqlspark.sql("""with t as ( select word from wordcount lateral view explode(split(value," ")) wordtemp as word),t2 as (select trim(word) word from t where trim(word) != "")select word,count(1) countNum from t2 group by word order by countNum desc""").show()# 第二种写法,使用 dsldf.select(F.explode(F.split("value"," ")).alias("word")) \.where(" trim(word) != '' ").groupby("word").count().orderBy("count",ascending=False).show()#这里的where(F.trim("word") != "") 还可以写成 where(" trim(word) != '' ")# 还可以这样写df.select(F.explode(F.split("value"," ")).alias("word")) \.where(F.trim("word") != "").groupby(F.col("word")).agg(F.count(F.col("word")).alias("cou")).orderBy(F.col("cou"),ascending=False).show()

一个案例:

需求:统计评分次数大于2000次的所有电影中平均评分最高的Top10,结果显示电影名称、电影平均评分、电影评分次数。

  • 电影评分数据:datas/movie/ratings.dat【用户id、电影id、评分、评分时间】

数据如下:

1::1193::5::978300760
1::661::3::978302109
1::914::3::978301968
1::3408::4::978300275
1::2355::5::978824291
1::1197::3::978302268
1::1287::5::978302039
1::2804::5::978300719
1::594::4::978302268
1::919::4::978301368
1::595::5::978824268
  • 电影信息数据:datas/movie/movies.dat【电影id、电影名称、分类】

1::Toy Story (1995)::Animation|Children's|Comedy
2::Jumanji (1995)::Adventure|Children's|Fantasy
3::Grumpier Old Men (1995)::Comedy|Romance
4::Waiting to Exhale (1995)::Comedy|Drama
5::Father of the Bride Part II (1995)::Comedy
6::Heat (1995)::Action|Crime|Thriller
7::Sabrina (1995)::Comedy|Romance
8::Tom and Huck (1995)::Adventure|Children's
9::Sudden Death (1995)::Action

首先,给定的数据不是我们所经常使用的格式化数据,所以需要先将数据进行格式化

可以使用RDD的算子将数据改为我们想要的格式化数据

也可以直接利用sql,将非格式化的数据修改为我们需要的格式的数据

写这个案例我们可以利用前面所学的 RDD 和 sparkSQL一起完成这个案例

使用RDD+SparkSQL

代码如下:

import os
import refrom pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSessionif __name__ == '__main__':os.environ['JAVA_HOME'] = 'C:/Program Files/Java/jdk1.8.0_131'# 配置Hadoop的路径,就是前面解压的那个路径os.environ['HADOOP_HOME'] = r'D:\BigDate\05-Hadoop\software\hadoop-3.3.1'# 配置base环境Python解析器的路径os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base环境Python解析器的路径os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'# 创建spark对象with SparkSession.builder.master("local[2]").appName("MovieTop10").config("spark.sql.shuffle.partitions", 2).getOrCreate() as spark:print(spark)rating_df = spark.sparkContext.textFile("../../datas/movie/ratings.dat").map(lambda line:re.split("::",line)) \.filter(lambda item:len(item) == 4).map(lambda item:(item[0],item[1],item[2],item[3])) \.toDF(["user_id","movie_id","score","score_time"]).createOrReplaceTempView("rating")# spark.sql("""#     select * from rating# """).show()movie_df = spark.sparkContext.textFile("../../datas/movie/movies.dat") \.map(lambda line:(line.split("::")[0],line.split("::")[1],line.split("::")[2])) \.toDF(["movie_id", "movie_name", "movie_categry"]).createOrReplaceTempView("movie")# spark.sql("""#     select * from movie# """).show(truncate=False)#统计评分次数大于2000次的所有电影中平均评分最高的Top10,结果显示电影名称、电影平均评分、电影评分次数spark.sql("""select m.movie_name,round(avg(r.score),2) avgRate,count(1) countNum from movie m join rating r on m.movie_id = r.movie_idgroup by m.movie_name having countNum >2000 order by avgRate desc limit 10""").show(truncate=False)# 保留两位小数后,结果可能有重复的,想要获取重复排名也只算一位的可以使用排名函数,dense_rank()spark.sql("""with t as (select m.movie_name,round(avg(r.score),2) avgRate,count(1) countNum from movie m join rating r on m.movie_id = r.movie_idgroup by m.movie_name having countNum >2000),t2 as (select *,dense_rank() over(order by avgRate desc) paiming from t) select * from t2 where paiming <= 10""").show()
复习 排名函数:
1、row_number()

row_number从1开始,按照顺序,生成分组内记录的序列,row_number()的值不会存在重复,当排序的值相同时,按照表中记录的顺序进行排列

效果如下:
98                1
97                2
97                3
96                4
95                5
95                6没有并列名次情况,顺序递增
2、rank()

生成数据项在分组中的排名,排名相等会在名次中留下空位

效果如下:
98                1
97                2
97                2
96                4
95                5
95                5
94                7
有并列名次情况,顺序跳跃递增
3、dense_rank()

生成数据项在分组中的排名,排名相等会在名次中不会留下空位

效果如下:
98                1
97                2
97                2
96                3
95                4
95                4
94                5
有并列名次情况,顺序递增
只使用 SparkSQL:

以上是RDD + sparkSQL的写法, 还可以通过 sparkSQL的写法硬写出来

通过split()方法,根据非格式化数据的分隔符,将数据切成我们需要的DataFrame类型的数据

df1 = spark.read.text("../../datas/movie/movies.dat").createOrReplaceTempView("movie1")
df2 = spark.read.text("../../datas/movie/ratings.dat").createOrReplaceTempView("rating1")#统计评分次数大于2000次的所有电影中平均评分最高的Top10,结果显示电影名称、电影平均评分、电影评分次数
spark.sql("""with m1 as (select split(value,"::")[0] movie_id,split(value,"::")[1] movie_name,split(value,"::")[2] movie_categary from movie1),r1 as ( select split(value,"::")[0] user_id,split(value,"::")[1] movie_id,split(value,"::")[2] score,split(value,"::")[3] score_time from rating1)select m1.movie_name,round(avg(r1.score),2) avgRote,count(1) countNum from m1 join r1 on m1.movie_id = r1.movie_idgroup by m1.movie_name having countNum >2000 order by avgRote desc limit 10
""").show(truncate=False)# 同样也可以写成排名函数
spark.sql("""with m1 as (select split(value,"::")[0] movie_id,split(value,"::")[1] movie_name,split(value,"::")[2] movie_categary from movie1),r1 as ( select split(value,"::")[0] user_id,split(value,"::")[1] movie_id,split(value,"::")[2] score,split(value,"::")[3] score_time from rating1),t as ( select m1.movie_name,round(avg(r1.score),2) avgRote,count(1) countNum from m1 join r1 on m1.movie_id = r1.movie_idgroup by m1.movie_name having countNum >2000),t2 as ( select *,dense_rank() over(order by avgRote desc) paiming from t)select * from t2 where paiming <= 10
""").show(truncate=False)

相关文章:

spark的学习-05

SparkSql 结构化数据与非结构化数据 结构化数据就类似于excel表中的数据&#xff08;统计的都是结构化的数据&#xff09;一般都使用sparkSql处理结构化的数据 结构化的文件&#xff1a;JSON、CSV【以逗号分隔】、TSV【以制表符分隔】、parquet、orc 结构化的表&#xff1a;…...

SQL注入(SQL Injection)详解

SQL注入&#xff08;SQL Injection&#xff09;是一种代码注入技术&#xff0c;它通过在应用程序的输入字段中插入或“注入”恶意的SQL语句&#xff0c;从而操控后端数据库服务器执行非预期的命令。这种攻击方式常用于绕过应用程序的安全措施&#xff0c;未经授权地访问、修改或…...

深入解析 OpenHarmony 构建系统-2-目录结构与核心组件

引言 OpenHarmony作为一款面向全场景的分布式操作系统,其构建系统在开发过程中扮演着至关重要的角色。本文将详细介绍OpenHarmony构建系统的目录结构和核心组件,帮助开发者更好地理解和使用这一强大的工具。 目录结构概览 以下是OpenHarmony构建系统的目录结构,每个目录和…...

网络安全应急响应(归纳)

目录 一、概述二、理论 系统排查 系统基本信息 windowsLinux用户信息 WindowsLinux启动项&#xff1a;开机系统在前台或者后台运行的程序&#xff0c;是病毒等实现持久化驻留的常用方法。 WindowsLinux任务计划&#xff1a;由于很多计算机都会自动加载“任务计划”&#xff0c…...

【网络协议栈】网络层(上)网络层的基本理解、IP协议格式、网络层分组(内附手画分析图 简单易懂)

绪论​ “It does not matter how slowly you go as long as you do not stop.”。本章是自上而下的进入网络协议栈的第三个篇幅–网络层–&#xff0c;本章我将带你了解网络层&#xff0c;以及网络层中非常重要的IP协议格式和网络层的分片组装问题&#xff0c;后面将持续更新网…...

数据库类型介绍

1. 关系型数据库&#xff08;RDBMS&#xff09; 关系型数据库是最常见的一类数据库&#xff0c;它们通过表&#xff08;Table&#xff09;来存储数据&#xff0c;表之间通过关系&#xff08;如主键和外键&#xff09;来关联。 • MySQL&#xff1a;开源的关系型数据库管理系统&…...

一步一步从asp.net core mvc中访问asp.net core WebApi

"从asp.net core mvc中访问asp.net core WebApi"看到这个标题是不是觉得很绕口啊&#xff0c;但的确就是要讲一讲这样的访问。前面我们介绍了微信小程序访问asp.net core webapi(感兴趣的童鞋可以看看前面的博文有关WEBAPI的搭建)&#xff0c;这里我们重点不关心如何…...

linux中kubectl命令使用

一.命令介绍 kubectl 是 Kubernetes 集群管理的命令行工具&#xff0c;用于与 Kubernetes API 交互。你可以通过它来管理和操作 Kubernetes 集群中的资源&#xff0c;如 Pod、Deployment、Service 等。下面是如何在不同操作系统上下载和使用 kubectl 的方法。 二.下载 kubect…...

Linux 系统结构

Linux系统一般有4个主要部分&#xff1a;内核、shell、文件系统和应用程序。内核、shell和文件系统一起形成了基本的操作系统结构&#xff0c;它们使得用户可以运行程序、管理文件并使用系统。 1. linux内核 内核是操作系统的核心&#xff0c;具有很多最基本功能&#xff0c;它…...

ESP32-S3设备智能化升级,物联网无线AI语音交互,让生活更加便捷和有趣

在人工智能和物联网技术的推动下&#xff0c;无线AI语音交互技术正在成为智能设备的新选择。这种技术的发展&#xff0c;不仅改变了我们与设备的沟通方式&#xff0c;更开启了一个新的智能交互方案。 想象一下&#xff0c;通过简单的语音指令&#xff0c;就能控制家中的灯光、…...

Python的函数(补充浅拷贝和深拷贝)

一、定义 函数的定义&#xff1a;实现【特定功能】的代码块。 形参&#xff1a;函数定义时的参数&#xff0c;没有实际意义 实参&#xff1a;函数调用/使用时的参数&#xff0c;有实际意义 函数的作用&#xff1a; 简化代码提高代码重用性便于维护和修改提高代码的可扩展性…...

oracle查询字段类型长度等字段信息

1.查询oracle数据库的字符集 SELECT * FROM NLS_DATABASE_PARAMETERS WHERE PARAMETER NLS_CHARACTERSET; 2.查询字段长度类型 SELECT * FROM user_tab_columns WHERE table_name user AND COLUMN_NAME SNAME 请确保将user替换为您想要查询的表名。sname为字段名 这里的字…...

C语言 | Leetcode C语言题解之第559题N叉树的最大深度

题目&#xff1a; 题解&#xff1a; /*** Definition for a Node.* struct Node {* int val;* int numChildren;* struct Node** children;* };*/int maxDepth(struct Node* root) {if (!root) {return 0;}int depth 0;// 创建空队列const int qCap 10e4 1;str…...

光流法(Optical Flow)

一、简介 光流法&#xff08;Optical Flow&#xff09;是一种用于检测图像序列中像素运动的计算机视觉技术。其基于以下假设&#xff1a; 1.亮度恒定性假设&#xff1a;物体在运动过程中&#xff0c;其像素值在不同帧中保持不变。 2.空间和时间上的连续性&#xff1a;相邻像素之…...

Rancher的安装

1. 概览 1.1 用户界面优势 Rancher 提供了一个直观的图形用户界面&#xff08;GUI&#xff09;。对于不熟悉 Kubernetes 复杂的命令行操作&#xff08;如使用kubectl&#xff09;的用户来说&#xff0c;通过 Rancher 的界面可以方便地进行资源管理。例如&#xff0c;用户可以在…...

【Linux】获得同一子网下当前在线设备IP/Latency/MAC 通过nmap指定CIDR扫描当前在线设备

【Linux】获得同一子网下当前在线设备IP/Latency/MAC 通过nmap指定CIDR扫描当前在线设备 通过路由器的后台&#xff0c;查看当前在线设备&#xff0c;受到网卡版本的影响&#xff0c;有时会有部分设备看不见MAC和分配的IP。此时&#xff0c;可以借助命令行工具扫描子网下所有连…...

Ubuntu22.04安装DataEase

看到DataEase的驾驶舱&#xff0c;感觉比PowerBI要好用一点&#xff0c;于是搭建起来玩玩。Dataease推荐的操作系统是Ubuntu22.04/Centos 7。 下载了Ubuntu22.04和DataEase 最新版本的离线安装包 一.安装ubuntu22.04 在安装的时候&#xff0c;没有顺手设置IP地址信息&#xff…...

Taro React-Native IOS 打包发布

http网络请求不到 配置 fix react-native facebook::flipper::SocketCertificateProvider‘ (aka ‘int‘) is not a function or func_rn运行debug提示flipper-CSDN博客 Xcode 15&#xff08;iOS17&#xff09;编译适配报错_no template named function in namespace std-CS…...

【卷积神经网络CNN】基于深度学习动物图像识别系统(完整系统源码+数据库+开发笔记+详细部署教程+启动教程)✅

目录 【卷积神经网络CNN】基于深度学习动物图像识别系统&#xff08;完整系统源码数据库开发笔记详细部署教程启动教程&#xff09;✅ 一、项目背景 二、项目目标 三、项目创新点 四、项目功能 五、开发技术介绍 六、数据库设计 七、启动步骤 八、项目功能展示 九、开…...

图像处理椒盐噪声

椒盐噪声&#xff0c;也称为脉冲噪声&#xff0c;是图像中经常见到的一种噪声。它是一种随机出现的白点或者黑点&#xff0c;可能是亮的区域有黑色像素或是在暗的区域有白色像素&#xff08;或是两者皆有&#xff09;。这些白点和黑点会在图像中随机分布&#xff0c;导致图像中…...

[ICLR 2022]How Much Can CLIP Benefit Vision-and-Language Tasks?

论文网址&#xff1a;pdf 英文是纯手打的&#xff01;论文原文的summarizing and paraphrasing。可能会出现难以避免的拼写错误和语法错误&#xff0c;若有发现欢迎评论指正&#xff01;文章偏向于笔记&#xff0c;谨慎食用 目录 1. 心得 2. 论文逐段精读 2.1. Abstract 2…...

Keil 中设置 STM32 Flash 和 RAM 地址详解

文章目录 Keil 中设置 STM32 Flash 和 RAM 地址详解一、Flash 和 RAM 配置界面(Target 选项卡)1. IROM1(用于配置 Flash)2. IRAM1(用于配置 RAM)二、链接器设置界面(Linker 选项卡)1. 勾选“Use Memory Layout from Target Dialog”2. 查看链接器参数(如果没有勾选上面…...

vue3 定时器-定义全局方法 vue+ts

1.创建ts文件 路径&#xff1a;src/utils/timer.ts 完整代码&#xff1a; import { onUnmounted } from vuetype TimerCallback (...args: any[]) > voidexport function useGlobalTimer() {const timers: Map<number, NodeJS.Timeout> new Map()// 创建定时器con…...

Rust 异步编程

Rust 异步编程 引言 Rust 是一种系统编程语言,以其高性能、安全性以及零成本抽象而著称。在多核处理器成为主流的今天,异步编程成为了一种提高应用性能、优化资源利用的有效手段。本文将深入探讨 Rust 异步编程的核心概念、常用库以及最佳实践。 异步编程基础 什么是异步…...

Go 语言并发编程基础:无缓冲与有缓冲通道

在上一章节中&#xff0c;我们了解了 Channel 的基本用法。本章将重点分析 Go 中通道的两种类型 —— 无缓冲通道与有缓冲通道&#xff0c;它们在并发编程中各具特点和应用场景。 一、通道的基本分类 类型定义形式特点无缓冲通道make(chan T)发送和接收都必须准备好&#xff0…...

【Linux手册】探秘系统世界:从用户交互到硬件底层的全链路工作之旅

目录 前言 操作系统与驱动程序 是什么&#xff0c;为什么 怎么做 system call 用户操作接口 总结 前言 日常生活中&#xff0c;我们在使用电子设备时&#xff0c;我们所输入执行的每一条指令最终大多都会作用到硬件上&#xff0c;比如下载一款软件最终会下载到硬盘上&am…...

Ubuntu系统复制(U盘-电脑硬盘)

所需环境 电脑自带硬盘&#xff1a;1块 (1T) U盘1&#xff1a;Ubuntu系统引导盘&#xff08;用于“U盘2”复制到“电脑自带硬盘”&#xff09; U盘2&#xff1a;Ubuntu系统盘&#xff08;1T&#xff0c;用于被复制&#xff09; &#xff01;&#xff01;&#xff01;建议“电脑…...

Linux中《基础IO》详细介绍

目录 理解"文件"狭义理解广义理解文件操作的归类认知系统角度文件类别 回顾C文件接口打开文件写文件读文件稍作修改&#xff0c;实现简单cat命令 输出信息到显示器&#xff0c;你有哪些方法stdin & stdout & stderr打开文件的方式 系统⽂件I/O⼀种传递标志位…...

在鸿蒙HarmonyOS 5中使用DevEco Studio实现指南针功能

指南针功能是许多位置服务应用的基础功能之一。下面我将详细介绍如何在HarmonyOS 5中使用DevEco Studio实现指南针功能。 1. 开发环境准备 确保已安装DevEco Studio 3.1或更高版本确保项目使用的是HarmonyOS 5.0 SDK在项目的module.json5中配置必要的权限 2. 权限配置 在mo…...

【若依】框架项目部署笔记

参考【SpringBoot】【Vue】项目部署_no main manifest attribute, in springboot-0.0.1-sn-CSDN博客 多一个redis安装 准备工作&#xff1a; 压缩包下载&#xff1a;http://download.redis.io/releases 1. 上传压缩包&#xff0c;并进入压缩包所在目录&#xff0c;解压到目标…...