spark.sql
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, mean, rank, row_number, desc
from pyspark.sql.window import Window
from pyspark.sql.types import StructType, StructField, StringType, IntegerType# 初始化 SparkSession 对象
spark = SparkSession.builder \.appName("Example PySpark Script with TempView and SQL") \.getOrCreate()# 定义数据结构
schema = StructType([StructField("name", StringType(), True),StructField("age", IntegerType(), True),StructField("city", StringType(), True)
])# 创建第一个 DataFrame
data1 = [("Alice", 34, "New York"),("Bob", 45, "Los Angeles"),("Cathy", 29, "San Francisco"),("David", 32, "Chicago"),("Eve", 27, "Seattle")
]
df1 = spark.createDataFrame(data=data1, schema=schema)# 创建第二个 DataFrame
data2 = [("Frank", 30, "New York"),("Grace", 38, "Los Angeles"),("Hannah", 25, "San Francisco"),("Ian", 42, "Chicago"),("Jack", 28, "Seattle")
]
df2 = spark.createDataFrame(data=data2, schema=schema)# 查看 DataFrame 结构
df1.printSchema()
df2.printSchema()# 使用 filter 过滤年龄大于等于 30 的记录
filtered_df1 = df1.filter(col("age") >= 30)
filtered_df2 = df2.filter(col("age") >= 30)# 使用 group by 计算每个城市的平均年龄
grouped_df1 = filtered_df1.groupBy("city").agg(count("name").alias("count"),mean("age").alias("avg_age")
)grouped_df2 = filtered_df2.groupBy("city").agg(count("name").alias("count"),mean("age").alias("avg_age")
)# 合并两个 DataFrame
merged_df = grouped_df1.union(grouped_df2)# 从合并后的 DataFrame 中随机抽取 50% 的样本
sampled_df = merged_df.sample(withReplacement=False, fraction=0.5)# 限制结果集的大小为 10 条记录
limited_df = sampled_df.limit(10)# 使用窗口函数进行排名
window_spec = Window.partitionBy("city").orderBy(desc("avg_age"))
ranked_df = limited_df.withColumn("rank", rank().over(window_spec)).withColumn("row_number", row_number().over(window_spec))# 将 DataFrame 注册为临时视图
ranked_df.createOrReplaceTempView("ranked_cities")# 使用 SQL 查询
sql_query = """
SELECT city, count, avg_age, rank, row_number
FROM ranked_cities
WHERE rank <= 2
"""# 执行 SQL 查询
sql_results = spark.sql(sql_query)# 显示结果
sql_results.show(truncate=False)# 关闭 SparkSession
spark.stop()
在 PySpark 中,createOrReplaceTempView 方法可以将 DataFrame 注册为临时视图(temporary view),这样就可以使用 SQL 查询来操作 DataFrame。临时视图只在当前 SparkSession 的生命周期内有效,并且在同一 SparkSession 中可以被多次替换。
我们可以在之前的示例中加入 createOrReplaceTempView,以便使用 SQL 查询来完成一些操作。
代码解释
- 创建 DataFrame:定义数据结构,并创建两个 DataFrame。
- 使用
filter:过滤符合条件的记录。 - 使用
group by:按字段进行分组聚合。 - 使用
union:将两个 DataFrame 合并。 - 使用
sample:从 DataFrame 中随机抽取样本。 - 使用
limit:限制结果集的大小。 - 使用窗口函数:添加窗口函数来执行复杂的分析。
- 使用
createOrReplaceTempView:注册临时视图。 - 使用 SQL 查询:执行 SQL 查询。
在 PySpark 中,执行 SQL 查询可能会比直接使用 DataFrame API 慢一些,原因在于以下几个方面:
-
SQL 解析和优化:当使用 SQL 查询时,PySpark 需要解析 SQL 语句,将其转换成逻辑计划,然后进行优化,最终生成物理执行计划。这个过程可能需要一些时间,尤其是在复杂的查询中。
-
Shuffle 操作:如果 SQL 查询涉及 shuffle 操作(例如 group by、join 等),那么数据需要重新分区和排序,这会导致额外的计算开销和磁盘 I/O。在你的例子中,虽然没有涉及 shuffle 操作,但如果查询复杂度增加,shuffle 可能成为瓶颈。
-
数据序列化和反序列化:在执行 SQL 查询时,数据可能需要多次序列化和反序列化,这也会影响性能。
-
执行计划缓存:对于重复执行的查询,执行计划可以被缓存,从而加速后续执行。但是,对于一次性查询,这种缓存带来的好处有限。
-
数据量:如果数据量很大,即使是简单的筛选操作也可能花费一定的时间。
优化建议
为了提高 SQL 查询的性能,可以考虑以下几个优化策略:
-
减少 Shuffle:尽量减少涉及 shuffle 的操作,例如使用广播 join 而不是普通的 join。
-
缓存 DataFrame:如果你反复使用同一个 DataFrame,可以将其缓存(persist 或 cache)以减少重复计算。
-
使用 DataFrame API:尽可能使用 DataFrame API 替代 SQL 查询,因为 DataFrame API 通常更高效。
-
索引:虽然 PySpark 本身没有索引的概念,但可以通过预处理数据来减少查询时的数据扫描范围。
-
调整配置:调整 Spark 的配置参数,例如增加内存分配、调整 shuffle 的参数等。
相关文章:
spark.sql
from pyspark.sql import SparkSession from pyspark.sql.functions import col, count, mean, rank, row_number, desc from pyspark.sql.window import Window from pyspark.sql.types import StructType, StructField, StringType, IntegerType# 初始化 SparkSession 对象 s…...
2024 数学建模高教社杯 国赛(A题)| “板凳龙”舞龙队 | 建模秘籍文章代码思路大全
铛铛!小秘籍来咯! 小秘籍团队独辟蹊径,运用等距螺线,多目标规划等强大工具,构建了这一题的详细解答哦! 为大家量身打造创新解决方案。小秘籍团队,始终引领着建模问题求解的风潮。 抓紧小秘籍&am…...
kaggle注册收不到验证码、插件如何下载安装
综合这三个来看, 1.插件下载用的大佬给的分享链接 2.下载好压缩包以后需要解压缩 Header Editor插件网盘下载安装教程 - 哔哩哔哩 (bilibili.com) 3.安装插件时没找到crx文件,在浏览器插件界面点击“加载解压缩的扩展” 4.复制网址到插件里ÿ…...
k8s相关技术栈
文章目录 一、k8s技术栈核心组件常见工具和服务生态系统 二、k8s服务组件控制平面组件节点组件附加组件和服务 三、k8s 常见资源核心资源扩展资源 四、系列文档其他参考 一、k8s技术栈 Kubernetes(常被简称为 K8s,其中 “K” 代表 “Kubernetes” 的首字…...
uniapp h5项目页面中使用了iframe导致浏览器返回按键无法使用, 返回不了上一页.
uniapp h5项目页面中使用了iframe导致浏览器返回按键无法使用, 返回不了上一页. 在 UniApp 中使用 iframe 加载外部页面时,可能会遇到返回键行为不符合预期的问题。这是因为 iframe 本身可以包含多个页面的历史记录,而默认情况下,浏览器的返…...
《2024网络安全十大创新方向》
网络安全是创新驱动型产业,技术创新可以有效应对新的网络安全挑战;或是通过技术创新降低人力成本投入,提升企业运营效率。为推动行业技术创新、产品创新与应用创新,数说安全发布《2024年中国网络安全十大创新方向》,涵…...
深入解析反射型 XSS 与存储型 XSS:原理、危害与防范
在网络安全领域,跨站脚本攻击(XSS)是一种常见的安全漏洞。XSS 攻击可以分为反射型 XSS 和存储型 XSS 两种类型。本文将详细介绍这两种类型的 XSS 攻击的原理、危害和防范措施。 一、反射型 XSS 1、原理 反射型 XSS 攻击也称为非持久性 XSS …...
【STM32+HAL库】---- 驱动MAX30102心率血氧传感器
硬件开发板:STM32F407VET6 软件平台:cubemaxkeilVScode1 MAX30102心率血氧传感器工作原理 MAX30102传感器是一种集成了红外光源、光电检测器和信号处理电路的高度集成传感器,主要用于心率和血氧饱和度的测量。以下是MAX30102传感器的主要特点…...
InstantX团队新作!基于端到端训练的风格转换模型CSGO
由InstantX团队、南京理工大学、北京航空航天大学以及北京大学联合提出了一种基于端到端训练的风格转换模型 CSGO,它采用独立的特征注入明确地解耦内容和风格特征。统一的 CSGO 实现了图像驱动的风格转换、文本驱动的风格化合成和文本编辑驱动的风格化合成。大量实验…...
Nginx安全性配置
文章目录 引言I Nginx简单的安全性配置禁止特定的HTTP方法限制URL长度禁止某些用户代理限制请求速率连接限制禁止访问某些文件类型II 常见的安全规则防御CC攻击User-Agent过滤GET-URL过滤GET-参数过滤POST过滤(sql注入、xss攻击 )引言 Nginx本身并不具备复杂的防火墙规则定制…...
k8s单master多node环境搭建-k8s版本低于1.24,容器运行时为docker
k8s 1.20.6单master多node环境搭建 1.环境规划2.初始化服务器1)配置主机名2)设置IP为静态IP3)关闭selinux4)配置主机hosts文件5)配置三台主机之间免密登录6)关闭交换分区swap,提升性能7…...
taro ui 小程序at-calendar日历组件自定义样式+选择范围日历崩溃处理
taro ui 日历文档 目录 单选标记时间: 效果: template: data: methods: 日历--范围选择: 效果: template: data: methods: 日历--间隔多选:利用标…...
ARM发布新一代高性能处理器N3
简介 就在2月21日,ARM发布了新一代面向服务器的高性能处理器N3和V3,N系列平衡性能和功耗,而V系列则注重更高的性能。此次发布的N3,单个die最高32核(并加入到CCS,Compute Subsystems,包含Core&a…...
基于Pytorch框架的深度学习U2Net网络天空语义精细分割系统源码
第一步:准备数据 头发分割数据,总共有10276张图片,里面的像素值为0和1,所以看起来全部是黑的,不影响使用 第二步:搭建模型 级联模式 通常多个类似U-Net按顺序堆叠,以建立级联模型,…...
50ETF期权和股指期权有什么区别?ETF期权应该怎么做?
今天期权懂带你了解50ETF期权和股指期权有什么区别?ETF期权应该怎么做?ETF是对个股期权,股指期权是对应该股指期货的,那么股指期权和etf期权有什么区别? 股指期权怎么交易 股指期权交易要开通股指期货账户࿰…...
JS设计模式之“神奇的魔术师” - 简单工厂模式
引言 在JavaScript开发中,我们经常需要创建和管理各种对象,而简单工厂模式就是一种最简单的用来创建对象的设计模式。 简单工厂模式通过一个工厂类来创建相似的对象,而无需直接使用具体类来实例化对象。这样可以将对象的创建过程与使用过程…...
【河北航空-注册安全分析报告-无验证方式导致安全隐患】
前言 由于网站注册入口容易被黑客攻击,存在如下安全问题: 1. 暴力破解密码,造成用户信息泄露 2. 短信盗刷的安全问题,影响业务及导致用户投诉 3. 带来经济损失,尤其是后付费客户,风险巨大,造…...
亚信安慧AntDB-T数据库内核之MVCC机制
本文主要介绍AntDB数据库内核中的一个很重要的机制——MVCC机制。 MVCC简介 MVCC(多版本并发控制)是AntDB数据库中实现事务隔离级别的一种机制。它允许多个事务同时对数据进行读写和修改操作,而不会相互干扰。在MVCC中,每个数据…...
【python】socket 入门以及多线程tcp链接
Socket 入门 及 多线程tcp链接 网络基础知识三要素 Socket是套接字的意思,是网络编程的核心对象,通信两端都独有自己的Socket对象, 数据在两个Socket之间通过 字节流(TCP协议) 或者 数据报包(UDP协议)的形式进行传输. 本文主要针对tcp流程进行讲解 socket-tcp流程图 1.创建服…...
【ZYNQ MPSoC开发】lwIP TCP发送用于数据缓存的软件FIFO设计
设计背景 任务是在ZYNQ的PS上使用裸机运行lwIP协议栈使用TCP把PL端通过AXI DMA传来的将近100K采样率的ADC数据发送出去,但由于数据带宽很大,有853.3mbps,所以在每一次AXI DMA简单传输结束后,lwIP未必有足够的发送buffer立即把数据…...
观成科技:隐蔽隧道工具Ligolo-ng加密流量分析
1.工具介绍 Ligolo-ng是一款由go编写的高效隧道工具,该工具基于TUN接口实现其功能,利用反向TCP/TLS连接建立一条隐蔽的通信信道,支持使用Let’s Encrypt自动生成证书。Ligolo-ng的通信隐蔽性体现在其支持多种连接方式,适应复杂网…...
JavaSec-RCE
简介 RCE(Remote Code Execution),可以分为:命令注入(Command Injection)、代码注入(Code Injection) 代码注入 1.漏洞场景:Groovy代码注入 Groovy是一种基于JVM的动态语言,语法简洁,支持闭包、动态类型和Java互操作性,…...
51c自动驾驶~合集58
我自己的原文哦~ https://blog.51cto.com/whaosoft/13967107 #CCA-Attention 全局池化局部保留,CCA-Attention为LLM长文本建模带来突破性进展 琶洲实验室、华南理工大学联合推出关键上下文感知注意力机制(CCA-Attention),…...
【网络安全产品大调研系列】2. 体验漏洞扫描
前言 2023 年漏洞扫描服务市场规模预计为 3.06(十亿美元)。漏洞扫描服务市场行业预计将从 2024 年的 3.48(十亿美元)增长到 2032 年的 9.54(十亿美元)。预测期内漏洞扫描服务市场 CAGR(增长率&…...
【Oracle】分区表
个人主页:Guiat 归属专栏:Oracle 文章目录 1. 分区表基础概述1.1 分区表的概念与优势1.2 分区类型概览1.3 分区表的工作原理 2. 范围分区 (RANGE Partitioning)2.1 基础范围分区2.1.1 按日期范围分区2.1.2 按数值范围分区 2.2 间隔分区 (INTERVAL Partit…...
使用Matplotlib创建炫酷的3D散点图:数据可视化的新维度
文章目录 基础实现代码代码解析进阶技巧1. 自定义点的大小和颜色2. 添加图例和样式美化3. 真实数据应用示例实用技巧与注意事项完整示例(带样式)应用场景在数据科学和可视化领域,三维图形能为我们提供更丰富的数据洞察。本文将手把手教你如何使用Python的Matplotlib库创建引…...
HDFS分布式存储 zookeeper
hadoop介绍 狭义上hadoop是指apache的一款开源软件 用java语言实现开源框架,允许使用简单的变成模型跨计算机对大型集群进行分布式处理(1.海量的数据存储 2.海量数据的计算)Hadoop核心组件 hdfs(分布式文件存储系统)&a…...
【JavaSE】多线程基础学习笔记
多线程基础 -线程相关概念 程序(Program) 是为完成特定任务、用某种语言编写的一组指令的集合简单的说:就是我们写的代码 进程 进程是指运行中的程序,比如我们使用QQ,就启动了一个进程,操作系统就会为该进程分配内存…...
jmeter聚合报告中参数详解
sample、average、min、max、90%line、95%line,99%line、Error错误率、吞吐量Thoughput、KB/sec每秒传输的数据量 sample(样本数) 表示测试中发送的请求数量,即测试执行了多少次请求。 单位,以个或者次数表示。 示例:…...
嵌入式常见 CPU 架构
架构类型架构厂商芯片厂商典型芯片特点与应用场景PICRISC (8/16 位)MicrochipMicrochipPIC16F877A、PIC18F4550简化指令集,单周期执行;低功耗、CIP 独立外设;用于家电、小电机控制、安防面板等嵌入式场景8051CISC (8 位)Intel(原始…...
