当前位置: 首页 > news >正文

spark.sql

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, mean, rank, row_number, desc
from pyspark.sql.window import Window
from pyspark.sql.types import StructType, StructField, StringType, IntegerType# 初始化 SparkSession 对象
spark = SparkSession.builder \.appName("Example PySpark Script with TempView and SQL") \.getOrCreate()# 定义数据结构
schema = StructType([StructField("name", StringType(), True),StructField("age", IntegerType(), True),StructField("city", StringType(), True)
])# 创建第一个 DataFrame
data1 = [("Alice", 34, "New York"),("Bob", 45, "Los Angeles"),("Cathy", 29, "San Francisco"),("David", 32, "Chicago"),("Eve", 27, "Seattle")
]
df1 = spark.createDataFrame(data=data1, schema=schema)# 创建第二个 DataFrame
data2 = [("Frank", 30, "New York"),("Grace", 38, "Los Angeles"),("Hannah", 25, "San Francisco"),("Ian", 42, "Chicago"),("Jack", 28, "Seattle")
]
df2 = spark.createDataFrame(data=data2, schema=schema)# 查看 DataFrame 结构
df1.printSchema()
df2.printSchema()# 使用 filter 过滤年龄大于等于 30 的记录
filtered_df1 = df1.filter(col("age") >= 30)
filtered_df2 = df2.filter(col("age") >= 30)# 使用 group by 计算每个城市的平均年龄
grouped_df1 = filtered_df1.groupBy("city").agg(count("name").alias("count"),mean("age").alias("avg_age")
)grouped_df2 = filtered_df2.groupBy("city").agg(count("name").alias("count"),mean("age").alias("avg_age")
)# 合并两个 DataFrame
merged_df = grouped_df1.union(grouped_df2)# 从合并后的 DataFrame 中随机抽取 50% 的样本
sampled_df = merged_df.sample(withReplacement=False, fraction=0.5)# 限制结果集的大小为 10 条记录
limited_df = sampled_df.limit(10)# 使用窗口函数进行排名
window_spec = Window.partitionBy("city").orderBy(desc("avg_age"))
ranked_df = limited_df.withColumn("rank", rank().over(window_spec)).withColumn("row_number", row_number().over(window_spec))# 将 DataFrame 注册为临时视图
ranked_df.createOrReplaceTempView("ranked_cities")# 使用 SQL 查询
sql_query = """
SELECT city, count, avg_age, rank, row_number
FROM ranked_cities
WHERE rank <= 2
"""# 执行 SQL 查询
sql_results = spark.sql(sql_query)# 显示结果
sql_results.show(truncate=False)# 关闭 SparkSession
spark.stop()

在 PySpark 中,createOrReplaceTempView 方法可以将 DataFrame 注册为临时视图(temporary view),这样就可以使用 SQL 查询来操作 DataFrame。临时视图只在当前 SparkSession 的生命周期内有效,并且在同一 SparkSession 中可以被多次替换。

我们可以在之前的示例中加入 createOrReplaceTempView,以便使用 SQL 查询来完成一些操作。

代码解释

  1. 创建 DataFrame:定义数据结构,并创建两个 DataFrame。
  2. 使用 filter:过滤符合条件的记录。
  3. 使用 group by:按字段进行分组聚合。
  4. 使用 union:将两个 DataFrame 合并。
  5. 使用 sample:从 DataFrame 中随机抽取样本。
  6. 使用 limit:限制结果集的大小。
  7. 使用窗口函数:添加窗口函数来执行复杂的分析。
  8. 使用 createOrReplaceTempView:注册临时视图。
  9. 使用 SQL 查询:执行 SQL 查询。 

在 PySpark 中,执行 SQL 查询可能会比直接使用 DataFrame API 慢一些,原因在于以下几个方面:

  1. SQL 解析和优化:当使用 SQL 查询时,PySpark 需要解析 SQL 语句,将其转换成逻辑计划,然后进行优化,最终生成物理执行计划。这个过程可能需要一些时间,尤其是在复杂的查询中。

  2. Shuffle 操作:如果 SQL 查询涉及 shuffle 操作(例如 group by、join 等),那么数据需要重新分区和排序,这会导致额外的计算开销和磁盘 I/O。在你的例子中,虽然没有涉及 shuffle 操作,但如果查询复杂度增加,shuffle 可能成为瓶颈。

  3. 数据序列化和反序列化:在执行 SQL 查询时,数据可能需要多次序列化和反序列化,这也会影响性能。

  4. 执行计划缓存:对于重复执行的查询,执行计划可以被缓存,从而加速后续执行。但是,对于一次性查询,这种缓存带来的好处有限。

  5. 数据量:如果数据量很大,即使是简单的筛选操作也可能花费一定的时间。

优化建议

为了提高 SQL 查询的性能,可以考虑以下几个优化策略:

  1. 减少 Shuffle:尽量减少涉及 shuffle 的操作,例如使用广播 join 而不是普通的 join。

  2. 缓存 DataFrame:如果你反复使用同一个 DataFrame,可以将其缓存(persist 或 cache)以减少重复计算。

  3. 使用 DataFrame API:尽可能使用 DataFrame API 替代 SQL 查询,因为 DataFrame API 通常更高效。

  4. 索引:虽然 PySpark 本身没有索引的概念,但可以通过预处理数据来减少查询时的数据扫描范围。

  5. 调整配置:调整 Spark 的配置参数,例如增加内存分配、调整 shuffle 的参数等。

相关文章:

spark.sql

from pyspark.sql import SparkSession from pyspark.sql.functions import col, count, mean, rank, row_number, desc from pyspark.sql.window import Window from pyspark.sql.types import StructType, StructField, StringType, IntegerType# 初始化 SparkSession 对象 s…...

2024 数学建模高教社杯 国赛(A题)| “板凳龙”舞龙队 | 建模秘籍文章代码思路大全

铛铛&#xff01;小秘籍来咯&#xff01; 小秘籍团队独辟蹊径&#xff0c;运用等距螺线&#xff0c;多目标规划等强大工具&#xff0c;构建了这一题的详细解答哦&#xff01; 为大家量身打造创新解决方案。小秘籍团队&#xff0c;始终引领着建模问题求解的风潮。 抓紧小秘籍&am…...

kaggle注册收不到验证码、插件如何下载安装

综合这三个来看&#xff0c; 1.插件下载用的大佬给的分享链接 2.下载好压缩包以后需要解压缩 Header Editor插件网盘下载安装教程 - 哔哩哔哩 (bilibili.com) 3.安装插件时没找到crx文件&#xff0c;在浏览器插件界面点击“加载解压缩的扩展” 4.复制网址到插件里&#xff…...

k8s相关技术栈

文章目录 一、k8s技术栈核心组件常见工具和服务生态系统 二、k8s服务组件控制平面组件节点组件附加组件和服务 三、k8s 常见资源核心资源扩展资源 四、系列文档其他参考 一、k8s技术栈 Kubernetes&#xff08;常被简称为 K8s&#xff0c;其中 “K” 代表 “Kubernetes” 的首字…...

uniapp h5项目页面中使用了iframe导致浏览器返回按键无法使用, 返回不了上一页.

uniapp h5项目页面中使用了iframe导致浏览器返回按键无法使用, 返回不了上一页. 在 UniApp 中使用 iframe 加载外部页面时&#xff0c;可能会遇到返回键行为不符合预期的问题。这是因为 iframe 本身可以包含多个页面的历史记录&#xff0c;而默认情况下&#xff0c;浏览器的返…...

《2024网络安全十大创新方向》

网络安全是创新驱动型产业&#xff0c;技术创新可以有效应对新的网络安全挑战&#xff1b;或是通过技术创新降低人力成本投入&#xff0c;提升企业运营效率。为推动行业技术创新、产品创新与应用创新&#xff0c;数说安全发布《2024年中国网络安全十大创新方向》&#xff0c;涵…...

深入解析反射型 XSS 与存储型 XSS:原理、危害与防范

在网络安全领域&#xff0c;跨站脚本攻击&#xff08;XSS&#xff09;是一种常见的安全漏洞。XSS 攻击可以分为反射型 XSS 和存储型 XSS 两种类型。本文将详细介绍这两种类型的 XSS 攻击的原理、危害和防范措施。 一、反射型 XSS 1、原理 反射型 XSS 攻击也称为非持久性 XSS …...

【STM32+HAL库】---- 驱动MAX30102心率血氧传感器

硬件开发板&#xff1a;STM32F407VET6 软件平台&#xff1a;cubemaxkeilVScode1 MAX30102心率血氧传感器工作原理 MAX30102传感器是一种集成了红外光源、光电检测器和信号处理电路的高度集成传感器&#xff0c;主要用于心率和血氧饱和度的测量。以下是MAX30102传感器的主要特点…...

InstantX团队新作!基于端到端训练的风格转换模型CSGO

由InstantX团队、南京理工大学、北京航空航天大学以及北京大学联合提出了一种基于端到端训练的风格转换模型 CSGO&#xff0c;它采用独立的特征注入明确地解耦内容和风格特征。统一的 CSGO 实现了图像驱动的风格转换、文本驱动的风格化合成和文本编辑驱动的风格化合成。大量实验…...

Nginx安全性配置

文章目录 引言I Nginx简单的安全性配置禁止特定的HTTP方法限制URL长度禁止某些用户代理限制请求速率连接限制禁止访问某些文件类型II 常见的安全规则防御CC攻击User-Agent过滤GET-URL过滤GET-参数过滤POST过滤(sql注入、xss攻击 )引言 Nginx本身并不具备复杂的防火墙规则定制…...

k8s单master多node环境搭建-k8s版本低于1.24,容器运行时为docker

k8s 1.20.6单master多node环境搭建 1.环境规划2.初始化服务器1&#xff09;配置主机名2&#xff09;设置IP为静态IP3&#xff09;关闭selinux4&#xff09;配置主机hosts文件5&#xff09;配置三台主机之间免密登录6&#xff09;关闭交换分区swap&#xff0c;提升性能7&#xf…...

taro ui 小程序at-calendar日历组件自定义样式+选择范围日历崩溃处理

taro ui 日历文档 目录 单选标记时间&#xff1a; 效果&#xff1a; template&#xff1a; data&#xff1a; methods: 日历--范围选择&#xff1a; 效果&#xff1a; template&#xff1a; data&#xff1a; methods&#xff1a; 日历--间隔多选&#xff1a;利用标…...

ARM发布新一代高性能处理器N3

简介 就在2月21日&#xff0c;ARM发布了新一代面向服务器的高性能处理器N3和V3&#xff0c;N系列平衡性能和功耗&#xff0c;而V系列则注重更高的性能。此次发布的N3&#xff0c;单个die最高32核&#xff08;并加入到CCS&#xff0c;Compute Subsystems&#xff0c;包含Core&a…...

基于Pytorch框架的深度学习U2Net网络天空语义精细分割系统源码

第一步&#xff1a;准备数据 头发分割数据&#xff0c;总共有10276张图片&#xff0c;里面的像素值为0和1&#xff0c;所以看起来全部是黑的&#xff0c;不影响使用 第二步&#xff1a;搭建模型 级联模式 通常多个类似U-Net按顺序堆叠&#xff0c;以建立级联模型&#xff0c…...

50ETF期权和股指期权有什么区别?ETF期权应该怎么做?

今天期权懂带你了解50ETF期权和股指期权有什么区别&#xff1f;ETF期权应该怎么做&#xff1f;ETF是对个股期权&#xff0c;股指期权是对应该股指期货的&#xff0c;那么股指期权和etf期权有什么区别&#xff1f; 股指期权怎么交易 股指期权交易要开通股指期货账户&#xff0…...

JS设计模式之“神奇的魔术师” - 简单工厂模式

引言 在JavaScript开发中&#xff0c;我们经常需要创建和管理各种对象&#xff0c;而简单工厂模式就是一种最简单的用来创建对象的设计模式。 简单工厂模式通过一个工厂类来创建相似的对象&#xff0c;而无需直接使用具体类来实例化对象。这样可以将对象的创建过程与使用过程…...

【河北航空-注册安全分析报告-无验证方式导致安全隐患】

前言 由于网站注册入口容易被黑客攻击&#xff0c;存在如下安全问题&#xff1a; 1. 暴力破解密码&#xff0c;造成用户信息泄露 2. 短信盗刷的安全问题&#xff0c;影响业务及导致用户投诉 3. 带来经济损失&#xff0c;尤其是后付费客户&#xff0c;风险巨大&#xff0c;造…...

亚信安慧AntDB-T数据库内核之MVCC机制

本文主要介绍AntDB数据库内核中的一个很重要的机制——MVCC机制。 MVCC简介 MVCC&#xff08;多版本并发控制&#xff09;是AntDB数据库中实现事务隔离级别的一种机制。它允许多个事务同时对数据进行读写和修改操作&#xff0c;而不会相互干扰。在MVCC中&#xff0c;每个数据…...

【python】socket 入门以及多线程tcp链接

Socket 入门 及 多线程tcp链接 网络基础知识三要素 Socket是套接字的意思,是网络编程的核心对象,通信两端都独有自己的Socket对象, 数据在两个Socket之间通过 字节流(TCP协议) 或者 数据报包(UDP协议)的形式进行传输. 本文主要针对tcp流程进行讲解 socket-tcp流程图 1.创建服…...

【ZYNQ MPSoC开发】lwIP TCP发送用于数据缓存的软件FIFO设计

设计背景 任务是在ZYNQ的PS上使用裸机运行lwIP协议栈使用TCP把PL端通过AXI DMA传来的将近100K采样率的ADC数据发送出去&#xff0c;但由于数据带宽很大&#xff0c;有853.3mbps&#xff0c;所以在每一次AXI DMA简单传输结束后&#xff0c;lwIP未必有足够的发送buffer立即把数据…...

UE5 学习系列(二)用户操作界面及介绍

这篇博客是 UE5 学习系列博客的第二篇&#xff0c;在第一篇的基础上展开这篇内容。博客参考的 B 站视频资料和第一篇的链接如下&#xff1a; 【Note】&#xff1a;如果你已经完成安装等操作&#xff0c;可以只执行第一篇博客中 2. 新建一个空白游戏项目 章节操作&#xff0c;重…...

7.4.分块查找

一.分块查找的算法思想&#xff1a; 1.实例&#xff1a; 以上述图片的顺序表为例&#xff0c; 该顺序表的数据元素从整体来看是乱序的&#xff0c;但如果把这些数据元素分成一块一块的小区间&#xff0c; 第一个区间[0,1]索引上的数据元素都是小于等于10的&#xff0c; 第二…...

基于当前项目通过npm包形式暴露公共组件

1.package.sjon文件配置 其中xh-flowable就是暴露出去的npm包名 2.创建tpyes文件夹&#xff0c;并新增内容 3.创建package文件夹...

postgresql|数据库|只读用户的创建和删除(备忘)

CREATE USER read_only WITH PASSWORD 密码 -- 连接到xxx数据库 \c xxx -- 授予对xxx数据库的只读权限 GRANT CONNECT ON DATABASE xxx TO read_only; GRANT USAGE ON SCHEMA public TO read_only; GRANT SELECT ON ALL TABLES IN SCHEMA public TO read_only; GRANT EXECUTE O…...

【论文笔记】若干矿井粉尘检测算法概述

总的来说&#xff0c;传统机器学习、传统机器学习与深度学习的结合、LSTM等算法所需要的数据集来源于矿井传感器测量的粉尘浓度&#xff0c;通过建立回归模型来预测未来矿井的粉尘浓度。传统机器学习算法性能易受数据中极端值的影响。YOLO等计算机视觉算法所需要的数据集来源于…...

令牌桶 滑动窗口->限流 分布式信号量->限并发的原理 lua脚本分析介绍

文章目录 前言限流限制并发的实际理解限流令牌桶代码实现结果分析令牌桶lua的模拟实现原理总结&#xff1a; 滑动窗口代码实现结果分析lua脚本原理解析 限并发分布式信号量代码实现结果分析lua脚本实现原理 双注解去实现限流 并发结果分析&#xff1a; 实际业务去理解体会统一注…...

今日科技热点速览

&#x1f525; 今日科技热点速览 &#x1f3ae; 任天堂Switch 2 正式发售 任天堂新一代游戏主机 Switch 2 今日正式上线发售&#xff0c;主打更强图形性能与沉浸式体验&#xff0c;支持多模态交互&#xff0c;受到全球玩家热捧 。 &#x1f916; 人工智能持续突破 DeepSeek-R1&…...

RNN避坑指南:从数学推导到LSTM/GRU工业级部署实战流程

本文较长&#xff0c;建议点赞收藏&#xff0c;以免遗失。更多AI大模型应用开发学习视频及资料&#xff0c;尽在聚客AI学院。 本文全面剖析RNN核心原理&#xff0c;深入讲解梯度消失/爆炸问题&#xff0c;并通过LSTM/GRU结构实现解决方案&#xff0c;提供时间序列预测和文本生成…...

【网络安全】开源系统getshell漏洞挖掘

审计过程&#xff1a; 在入口文件admin/index.php中&#xff1a; 用户可以通过m,c,a等参数控制加载的文件和方法&#xff0c;在app/system/entrance.php中存在重点代码&#xff1a; 当M_TYPE system并且M_MODULE include时&#xff0c;会设置常量PATH_OWN_FILE为PATH_APP.M_T…...

深度剖析 DeepSeek 开源模型部署与应用:策略、权衡与未来走向

在人工智能技术呈指数级发展的当下&#xff0c;大模型已然成为推动各行业变革的核心驱动力。DeepSeek 开源模型以其卓越的性能和灵活的开源特性&#xff0c;吸引了众多企业与开发者的目光。如何高效且合理地部署与运用 DeepSeek 模型&#xff0c;成为释放其巨大潜力的关键所在&…...