当前位置: 首页 > news >正文

spark.sql

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, mean, rank, row_number, desc
from pyspark.sql.window import Window
from pyspark.sql.types import StructType, StructField, StringType, IntegerType# 初始化 SparkSession 对象
spark = SparkSession.builder \.appName("Example PySpark Script with TempView and SQL") \.getOrCreate()# 定义数据结构
schema = StructType([StructField("name", StringType(), True),StructField("age", IntegerType(), True),StructField("city", StringType(), True)
])# 创建第一个 DataFrame
data1 = [("Alice", 34, "New York"),("Bob", 45, "Los Angeles"),("Cathy", 29, "San Francisco"),("David", 32, "Chicago"),("Eve", 27, "Seattle")
]
df1 = spark.createDataFrame(data=data1, schema=schema)# 创建第二个 DataFrame
data2 = [("Frank", 30, "New York"),("Grace", 38, "Los Angeles"),("Hannah", 25, "San Francisco"),("Ian", 42, "Chicago"),("Jack", 28, "Seattle")
]
df2 = spark.createDataFrame(data=data2, schema=schema)# 查看 DataFrame 结构
df1.printSchema()
df2.printSchema()# 使用 filter 过滤年龄大于等于 30 的记录
filtered_df1 = df1.filter(col("age") >= 30)
filtered_df2 = df2.filter(col("age") >= 30)# 使用 group by 计算每个城市的平均年龄
grouped_df1 = filtered_df1.groupBy("city").agg(count("name").alias("count"),mean("age").alias("avg_age")
)grouped_df2 = filtered_df2.groupBy("city").agg(count("name").alias("count"),mean("age").alias("avg_age")
)# 合并两个 DataFrame
merged_df = grouped_df1.union(grouped_df2)# 从合并后的 DataFrame 中随机抽取 50% 的样本
sampled_df = merged_df.sample(withReplacement=False, fraction=0.5)# 限制结果集的大小为 10 条记录
limited_df = sampled_df.limit(10)# 使用窗口函数进行排名
window_spec = Window.partitionBy("city").orderBy(desc("avg_age"))
ranked_df = limited_df.withColumn("rank", rank().over(window_spec)).withColumn("row_number", row_number().over(window_spec))# 将 DataFrame 注册为临时视图
ranked_df.createOrReplaceTempView("ranked_cities")# 使用 SQL 查询
sql_query = """
SELECT city, count, avg_age, rank, row_number
FROM ranked_cities
WHERE rank <= 2
"""# 执行 SQL 查询
sql_results = spark.sql(sql_query)# 显示结果
sql_results.show(truncate=False)# 关闭 SparkSession
spark.stop()

在 PySpark 中,createOrReplaceTempView 方法可以将 DataFrame 注册为临时视图(temporary view),这样就可以使用 SQL 查询来操作 DataFrame。临时视图只在当前 SparkSession 的生命周期内有效,并且在同一 SparkSession 中可以被多次替换。

我们可以在之前的示例中加入 createOrReplaceTempView,以便使用 SQL 查询来完成一些操作。

代码解释

  1. 创建 DataFrame:定义数据结构,并创建两个 DataFrame。
  2. 使用 filter:过滤符合条件的记录。
  3. 使用 group by:按字段进行分组聚合。
  4. 使用 union:将两个 DataFrame 合并。
  5. 使用 sample:从 DataFrame 中随机抽取样本。
  6. 使用 limit:限制结果集的大小。
  7. 使用窗口函数:添加窗口函数来执行复杂的分析。
  8. 使用 createOrReplaceTempView:注册临时视图。
  9. 使用 SQL 查询:执行 SQL 查询。 

在 PySpark 中,执行 SQL 查询可能会比直接使用 DataFrame API 慢一些,原因在于以下几个方面:

  1. SQL 解析和优化:当使用 SQL 查询时,PySpark 需要解析 SQL 语句,将其转换成逻辑计划,然后进行优化,最终生成物理执行计划。这个过程可能需要一些时间,尤其是在复杂的查询中。

  2. Shuffle 操作:如果 SQL 查询涉及 shuffle 操作(例如 group by、join 等),那么数据需要重新分区和排序,这会导致额外的计算开销和磁盘 I/O。在你的例子中,虽然没有涉及 shuffle 操作,但如果查询复杂度增加,shuffle 可能成为瓶颈。

  3. 数据序列化和反序列化:在执行 SQL 查询时,数据可能需要多次序列化和反序列化,这也会影响性能。

  4. 执行计划缓存:对于重复执行的查询,执行计划可以被缓存,从而加速后续执行。但是,对于一次性查询,这种缓存带来的好处有限。

  5. 数据量:如果数据量很大,即使是简单的筛选操作也可能花费一定的时间。

优化建议

为了提高 SQL 查询的性能,可以考虑以下几个优化策略:

  1. 减少 Shuffle:尽量减少涉及 shuffle 的操作,例如使用广播 join 而不是普通的 join。

  2. 缓存 DataFrame:如果你反复使用同一个 DataFrame,可以将其缓存(persist 或 cache)以减少重复计算。

  3. 使用 DataFrame API:尽可能使用 DataFrame API 替代 SQL 查询,因为 DataFrame API 通常更高效。

  4. 索引:虽然 PySpark 本身没有索引的概念,但可以通过预处理数据来减少查询时的数据扫描范围。

  5. 调整配置:调整 Spark 的配置参数,例如增加内存分配、调整 shuffle 的参数等。

相关文章:

spark.sql

from pyspark.sql import SparkSession from pyspark.sql.functions import col, count, mean, rank, row_number, desc from pyspark.sql.window import Window from pyspark.sql.types import StructType, StructField, StringType, IntegerType# 初始化 SparkSession 对象 s…...

2024 数学建模高教社杯 国赛(A题)| “板凳龙”舞龙队 | 建模秘籍文章代码思路大全

铛铛&#xff01;小秘籍来咯&#xff01; 小秘籍团队独辟蹊径&#xff0c;运用等距螺线&#xff0c;多目标规划等强大工具&#xff0c;构建了这一题的详细解答哦&#xff01; 为大家量身打造创新解决方案。小秘籍团队&#xff0c;始终引领着建模问题求解的风潮。 抓紧小秘籍&am…...

kaggle注册收不到验证码、插件如何下载安装

综合这三个来看&#xff0c; 1.插件下载用的大佬给的分享链接 2.下载好压缩包以后需要解压缩 Header Editor插件网盘下载安装教程 - 哔哩哔哩 (bilibili.com) 3.安装插件时没找到crx文件&#xff0c;在浏览器插件界面点击“加载解压缩的扩展” 4.复制网址到插件里&#xff…...

k8s相关技术栈

文章目录 一、k8s技术栈核心组件常见工具和服务生态系统 二、k8s服务组件控制平面组件节点组件附加组件和服务 三、k8s 常见资源核心资源扩展资源 四、系列文档其他参考 一、k8s技术栈 Kubernetes&#xff08;常被简称为 K8s&#xff0c;其中 “K” 代表 “Kubernetes” 的首字…...

uniapp h5项目页面中使用了iframe导致浏览器返回按键无法使用, 返回不了上一页.

uniapp h5项目页面中使用了iframe导致浏览器返回按键无法使用, 返回不了上一页. 在 UniApp 中使用 iframe 加载外部页面时&#xff0c;可能会遇到返回键行为不符合预期的问题。这是因为 iframe 本身可以包含多个页面的历史记录&#xff0c;而默认情况下&#xff0c;浏览器的返…...

《2024网络安全十大创新方向》

网络安全是创新驱动型产业&#xff0c;技术创新可以有效应对新的网络安全挑战&#xff1b;或是通过技术创新降低人力成本投入&#xff0c;提升企业运营效率。为推动行业技术创新、产品创新与应用创新&#xff0c;数说安全发布《2024年中国网络安全十大创新方向》&#xff0c;涵…...

深入解析反射型 XSS 与存储型 XSS:原理、危害与防范

在网络安全领域&#xff0c;跨站脚本攻击&#xff08;XSS&#xff09;是一种常见的安全漏洞。XSS 攻击可以分为反射型 XSS 和存储型 XSS 两种类型。本文将详细介绍这两种类型的 XSS 攻击的原理、危害和防范措施。 一、反射型 XSS 1、原理 反射型 XSS 攻击也称为非持久性 XSS …...

【STM32+HAL库】---- 驱动MAX30102心率血氧传感器

硬件开发板&#xff1a;STM32F407VET6 软件平台&#xff1a;cubemaxkeilVScode1 MAX30102心率血氧传感器工作原理 MAX30102传感器是一种集成了红外光源、光电检测器和信号处理电路的高度集成传感器&#xff0c;主要用于心率和血氧饱和度的测量。以下是MAX30102传感器的主要特点…...

InstantX团队新作!基于端到端训练的风格转换模型CSGO

由InstantX团队、南京理工大学、北京航空航天大学以及北京大学联合提出了一种基于端到端训练的风格转换模型 CSGO&#xff0c;它采用独立的特征注入明确地解耦内容和风格特征。统一的 CSGO 实现了图像驱动的风格转换、文本驱动的风格化合成和文本编辑驱动的风格化合成。大量实验…...

Nginx安全性配置

文章目录 引言I Nginx简单的安全性配置禁止特定的HTTP方法限制URL长度禁止某些用户代理限制请求速率连接限制禁止访问某些文件类型II 常见的安全规则防御CC攻击User-Agent过滤GET-URL过滤GET-参数过滤POST过滤(sql注入、xss攻击 )引言 Nginx本身并不具备复杂的防火墙规则定制…...

k8s单master多node环境搭建-k8s版本低于1.24,容器运行时为docker

k8s 1.20.6单master多node环境搭建 1.环境规划2.初始化服务器1&#xff09;配置主机名2&#xff09;设置IP为静态IP3&#xff09;关闭selinux4&#xff09;配置主机hosts文件5&#xff09;配置三台主机之间免密登录6&#xff09;关闭交换分区swap&#xff0c;提升性能7&#xf…...

taro ui 小程序at-calendar日历组件自定义样式+选择范围日历崩溃处理

taro ui 日历文档 目录 单选标记时间&#xff1a; 效果&#xff1a; template&#xff1a; data&#xff1a; methods: 日历--范围选择&#xff1a; 效果&#xff1a; template&#xff1a; data&#xff1a; methods&#xff1a; 日历--间隔多选&#xff1a;利用标…...

ARM发布新一代高性能处理器N3

简介 就在2月21日&#xff0c;ARM发布了新一代面向服务器的高性能处理器N3和V3&#xff0c;N系列平衡性能和功耗&#xff0c;而V系列则注重更高的性能。此次发布的N3&#xff0c;单个die最高32核&#xff08;并加入到CCS&#xff0c;Compute Subsystems&#xff0c;包含Core&a…...

基于Pytorch框架的深度学习U2Net网络天空语义精细分割系统源码

第一步&#xff1a;准备数据 头发分割数据&#xff0c;总共有10276张图片&#xff0c;里面的像素值为0和1&#xff0c;所以看起来全部是黑的&#xff0c;不影响使用 第二步&#xff1a;搭建模型 级联模式 通常多个类似U-Net按顺序堆叠&#xff0c;以建立级联模型&#xff0c…...

50ETF期权和股指期权有什么区别?ETF期权应该怎么做?

今天期权懂带你了解50ETF期权和股指期权有什么区别&#xff1f;ETF期权应该怎么做&#xff1f;ETF是对个股期权&#xff0c;股指期权是对应该股指期货的&#xff0c;那么股指期权和etf期权有什么区别&#xff1f; 股指期权怎么交易 股指期权交易要开通股指期货账户&#xff0…...

JS设计模式之“神奇的魔术师” - 简单工厂模式

引言 在JavaScript开发中&#xff0c;我们经常需要创建和管理各种对象&#xff0c;而简单工厂模式就是一种最简单的用来创建对象的设计模式。 简单工厂模式通过一个工厂类来创建相似的对象&#xff0c;而无需直接使用具体类来实例化对象。这样可以将对象的创建过程与使用过程…...

【河北航空-注册安全分析报告-无验证方式导致安全隐患】

前言 由于网站注册入口容易被黑客攻击&#xff0c;存在如下安全问题&#xff1a; 1. 暴力破解密码&#xff0c;造成用户信息泄露 2. 短信盗刷的安全问题&#xff0c;影响业务及导致用户投诉 3. 带来经济损失&#xff0c;尤其是后付费客户&#xff0c;风险巨大&#xff0c;造…...

亚信安慧AntDB-T数据库内核之MVCC机制

本文主要介绍AntDB数据库内核中的一个很重要的机制——MVCC机制。 MVCC简介 MVCC&#xff08;多版本并发控制&#xff09;是AntDB数据库中实现事务隔离级别的一种机制。它允许多个事务同时对数据进行读写和修改操作&#xff0c;而不会相互干扰。在MVCC中&#xff0c;每个数据…...

【python】socket 入门以及多线程tcp链接

Socket 入门 及 多线程tcp链接 网络基础知识三要素 Socket是套接字的意思,是网络编程的核心对象,通信两端都独有自己的Socket对象, 数据在两个Socket之间通过 字节流(TCP协议) 或者 数据报包(UDP协议)的形式进行传输. 本文主要针对tcp流程进行讲解 socket-tcp流程图 1.创建服…...

【ZYNQ MPSoC开发】lwIP TCP发送用于数据缓存的软件FIFO设计

设计背景 任务是在ZYNQ的PS上使用裸机运行lwIP协议栈使用TCP把PL端通过AXI DMA传来的将近100K采样率的ADC数据发送出去&#xff0c;但由于数据带宽很大&#xff0c;有853.3mbps&#xff0c;所以在每一次AXI DMA简单传输结束后&#xff0c;lwIP未必有足够的发送buffer立即把数据…...

在软件开发中正确使用MySQL日期时间类型的深度解析

在日常软件开发场景中&#xff0c;时间信息的存储是底层且核心的需求。从金融交易的精确记账时间、用户操作的行为日志&#xff0c;到供应链系统的物流节点时间戳&#xff0c;时间数据的准确性直接决定业务逻辑的可靠性。MySQL作为主流关系型数据库&#xff0c;其日期时间类型的…...

XCTF-web-easyupload

试了试php&#xff0c;php7&#xff0c;pht&#xff0c;phtml等&#xff0c;都没有用 尝试.user.ini 抓包修改将.user.ini修改为jpg图片 在上传一个123.jpg 用蚁剑连接&#xff0c;得到flag...

理解 MCP 工作流:使用 Ollama 和 LangChain 构建本地 MCP 客户端

&#x1f31f; 什么是 MCP&#xff1f; 模型控制协议 (MCP) 是一种创新的协议&#xff0c;旨在无缝连接 AI 模型与应用程序。 MCP 是一个开源协议&#xff0c;它标准化了我们的 LLM 应用程序连接所需工具和数据源并与之协作的方式。 可以把它想象成你的 AI 模型 和想要使用它…...

服务器硬防的应用场景都有哪些?

服务器硬防是指一种通过硬件设备层面的安全措施来防御服务器系统受到网络攻击的方式&#xff0c;避免服务器受到各种恶意攻击和网络威胁&#xff0c;那么&#xff0c;服务器硬防通常都会应用在哪些场景当中呢&#xff1f; 硬防服务器中一般会配备入侵检测系统和预防系统&#x…...

c++ 面试题(1)-----深度优先搜索(DFS)实现

操作系统&#xff1a;ubuntu22.04 IDE:Visual Studio Code 编程语言&#xff1a;C11 题目描述 地上有一个 m 行 n 列的方格&#xff0c;从坐标 [0,0] 起始。一个机器人可以从某一格移动到上下左右四个格子&#xff0c;但不能进入行坐标和列坐标的数位之和大于 k 的格子。 例…...

【配置 YOLOX 用于按目录分类的图片数据集】

现在的图标点选越来越多&#xff0c;如何一步解决&#xff0c;采用 YOLOX 目标检测模式则可以轻松解决 要在 YOLOX 中使用按目录分类的图片数据集&#xff08;每个目录代表一个类别&#xff0c;目录下是该类别的所有图片&#xff09;&#xff0c;你需要进行以下配置步骤&#x…...

【JavaWeb】Docker项目部署

引言 之前学习了Linux操作系统的常见命令&#xff0c;在Linux上安装软件&#xff0c;以及如何在Linux上部署一个单体项目&#xff0c;大多数同学都会有相同的感受&#xff0c;那就是麻烦。 核心体现在三点&#xff1a; 命令太多了&#xff0c;记不住 软件安装包名字复杂&…...

在鸿蒙HarmonyOS 5中使用DevEco Studio实现企业微信功能

1. 开发环境准备 ​​安装DevEco Studio 3.1​​&#xff1a; 从华为开发者官网下载最新版DevEco Studio安装HarmonyOS 5.0 SDK ​​项目配置​​&#xff1a; // module.json5 {"module": {"requestPermissions": [{"name": "ohos.permis…...

libfmt: 现代C++的格式化工具库介绍与酷炫功能

libfmt: 现代C的格式化工具库介绍与酷炫功能 libfmt 是一个开源的C格式化库&#xff0c;提供了高效、安全的文本格式化功能&#xff0c;是C20中引入的std::format的基础实现。它比传统的printf和iostream更安全、更灵活、性能更好。 基本介绍 主要特点 类型安全&#xff1a…...

uniapp 小程序 学习(一)

利用Hbuilder 创建项目 运行到内置浏览器看效果 下载微信小程序 安装到Hbuilder 下载地址 &#xff1a;开发者工具默认安装 设置服务端口号 在Hbuilder中设置微信小程序 配置 找到运行设置&#xff0c;将微信开发者工具放入到Hbuilder中&#xff0c; 打开后出现 如下 bug 解…...