当前位置: 首页 > news >正文

spark.sql

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, mean, rank, row_number, desc
from pyspark.sql.window import Window
from pyspark.sql.types import StructType, StructField, StringType, IntegerType# 初始化 SparkSession 对象
spark = SparkSession.builder \.appName("Example PySpark Script with TempView and SQL") \.getOrCreate()# 定义数据结构
schema = StructType([StructField("name", StringType(), True),StructField("age", IntegerType(), True),StructField("city", StringType(), True)
])# 创建第一个 DataFrame
data1 = [("Alice", 34, "New York"),("Bob", 45, "Los Angeles"),("Cathy", 29, "San Francisco"),("David", 32, "Chicago"),("Eve", 27, "Seattle")
]
df1 = spark.createDataFrame(data=data1, schema=schema)# 创建第二个 DataFrame
data2 = [("Frank", 30, "New York"),("Grace", 38, "Los Angeles"),("Hannah", 25, "San Francisco"),("Ian", 42, "Chicago"),("Jack", 28, "Seattle")
]
df2 = spark.createDataFrame(data=data2, schema=schema)# 查看 DataFrame 结构
df1.printSchema()
df2.printSchema()# 使用 filter 过滤年龄大于等于 30 的记录
filtered_df1 = df1.filter(col("age") >= 30)
filtered_df2 = df2.filter(col("age") >= 30)# 使用 group by 计算每个城市的平均年龄
grouped_df1 = filtered_df1.groupBy("city").agg(count("name").alias("count"),mean("age").alias("avg_age")
)grouped_df2 = filtered_df2.groupBy("city").agg(count("name").alias("count"),mean("age").alias("avg_age")
)# 合并两个 DataFrame
merged_df = grouped_df1.union(grouped_df2)# 从合并后的 DataFrame 中随机抽取 50% 的样本
sampled_df = merged_df.sample(withReplacement=False, fraction=0.5)# 限制结果集的大小为 10 条记录
limited_df = sampled_df.limit(10)# 使用窗口函数进行排名
window_spec = Window.partitionBy("city").orderBy(desc("avg_age"))
ranked_df = limited_df.withColumn("rank", rank().over(window_spec)).withColumn("row_number", row_number().over(window_spec))# 将 DataFrame 注册为临时视图
ranked_df.createOrReplaceTempView("ranked_cities")# 使用 SQL 查询
sql_query = """
SELECT city, count, avg_age, rank, row_number
FROM ranked_cities
WHERE rank <= 2
"""# 执行 SQL 查询
sql_results = spark.sql(sql_query)# 显示结果
sql_results.show(truncate=False)# 关闭 SparkSession
spark.stop()

在 PySpark 中,createOrReplaceTempView 方法可以将 DataFrame 注册为临时视图(temporary view),这样就可以使用 SQL 查询来操作 DataFrame。临时视图只在当前 SparkSession 的生命周期内有效,并且在同一 SparkSession 中可以被多次替换。

我们可以在之前的示例中加入 createOrReplaceTempView,以便使用 SQL 查询来完成一些操作。

代码解释

  1. 创建 DataFrame:定义数据结构,并创建两个 DataFrame。
  2. 使用 filter:过滤符合条件的记录。
  3. 使用 group by:按字段进行分组聚合。
  4. 使用 union:将两个 DataFrame 合并。
  5. 使用 sample:从 DataFrame 中随机抽取样本。
  6. 使用 limit:限制结果集的大小。
  7. 使用窗口函数:添加窗口函数来执行复杂的分析。
  8. 使用 createOrReplaceTempView:注册临时视图。
  9. 使用 SQL 查询:执行 SQL 查询。 

在 PySpark 中,执行 SQL 查询可能会比直接使用 DataFrame API 慢一些,原因在于以下几个方面:

  1. SQL 解析和优化:当使用 SQL 查询时,PySpark 需要解析 SQL 语句,将其转换成逻辑计划,然后进行优化,最终生成物理执行计划。这个过程可能需要一些时间,尤其是在复杂的查询中。

  2. Shuffle 操作:如果 SQL 查询涉及 shuffle 操作(例如 group by、join 等),那么数据需要重新分区和排序,这会导致额外的计算开销和磁盘 I/O。在你的例子中,虽然没有涉及 shuffle 操作,但如果查询复杂度增加,shuffle 可能成为瓶颈。

  3. 数据序列化和反序列化:在执行 SQL 查询时,数据可能需要多次序列化和反序列化,这也会影响性能。

  4. 执行计划缓存:对于重复执行的查询,执行计划可以被缓存,从而加速后续执行。但是,对于一次性查询,这种缓存带来的好处有限。

  5. 数据量:如果数据量很大,即使是简单的筛选操作也可能花费一定的时间。

优化建议

为了提高 SQL 查询的性能,可以考虑以下几个优化策略:

  1. 减少 Shuffle:尽量减少涉及 shuffle 的操作,例如使用广播 join 而不是普通的 join。

  2. 缓存 DataFrame:如果你反复使用同一个 DataFrame,可以将其缓存(persist 或 cache)以减少重复计算。

  3. 使用 DataFrame API:尽可能使用 DataFrame API 替代 SQL 查询,因为 DataFrame API 通常更高效。

  4. 索引:虽然 PySpark 本身没有索引的概念,但可以通过预处理数据来减少查询时的数据扫描范围。

  5. 调整配置:调整 Spark 的配置参数,例如增加内存分配、调整 shuffle 的参数等。

相关文章:

spark.sql

from pyspark.sql import SparkSession from pyspark.sql.functions import col, count, mean, rank, row_number, desc from pyspark.sql.window import Window from pyspark.sql.types import StructType, StructField, StringType, IntegerType# 初始化 SparkSession 对象 s…...

2024 数学建模高教社杯 国赛(A题)| “板凳龙”舞龙队 | 建模秘籍文章代码思路大全

铛铛&#xff01;小秘籍来咯&#xff01; 小秘籍团队独辟蹊径&#xff0c;运用等距螺线&#xff0c;多目标规划等强大工具&#xff0c;构建了这一题的详细解答哦&#xff01; 为大家量身打造创新解决方案。小秘籍团队&#xff0c;始终引领着建模问题求解的风潮。 抓紧小秘籍&am…...

kaggle注册收不到验证码、插件如何下载安装

综合这三个来看&#xff0c; 1.插件下载用的大佬给的分享链接 2.下载好压缩包以后需要解压缩 Header Editor插件网盘下载安装教程 - 哔哩哔哩 (bilibili.com) 3.安装插件时没找到crx文件&#xff0c;在浏览器插件界面点击“加载解压缩的扩展” 4.复制网址到插件里&#xff…...

k8s相关技术栈

文章目录 一、k8s技术栈核心组件常见工具和服务生态系统 二、k8s服务组件控制平面组件节点组件附加组件和服务 三、k8s 常见资源核心资源扩展资源 四、系列文档其他参考 一、k8s技术栈 Kubernetes&#xff08;常被简称为 K8s&#xff0c;其中 “K” 代表 “Kubernetes” 的首字…...

uniapp h5项目页面中使用了iframe导致浏览器返回按键无法使用, 返回不了上一页.

uniapp h5项目页面中使用了iframe导致浏览器返回按键无法使用, 返回不了上一页. 在 UniApp 中使用 iframe 加载外部页面时&#xff0c;可能会遇到返回键行为不符合预期的问题。这是因为 iframe 本身可以包含多个页面的历史记录&#xff0c;而默认情况下&#xff0c;浏览器的返…...

《2024网络安全十大创新方向》

网络安全是创新驱动型产业&#xff0c;技术创新可以有效应对新的网络安全挑战&#xff1b;或是通过技术创新降低人力成本投入&#xff0c;提升企业运营效率。为推动行业技术创新、产品创新与应用创新&#xff0c;数说安全发布《2024年中国网络安全十大创新方向》&#xff0c;涵…...

深入解析反射型 XSS 与存储型 XSS:原理、危害与防范

在网络安全领域&#xff0c;跨站脚本攻击&#xff08;XSS&#xff09;是一种常见的安全漏洞。XSS 攻击可以分为反射型 XSS 和存储型 XSS 两种类型。本文将详细介绍这两种类型的 XSS 攻击的原理、危害和防范措施。 一、反射型 XSS 1、原理 反射型 XSS 攻击也称为非持久性 XSS …...

【STM32+HAL库】---- 驱动MAX30102心率血氧传感器

硬件开发板&#xff1a;STM32F407VET6 软件平台&#xff1a;cubemaxkeilVScode1 MAX30102心率血氧传感器工作原理 MAX30102传感器是一种集成了红外光源、光电检测器和信号处理电路的高度集成传感器&#xff0c;主要用于心率和血氧饱和度的测量。以下是MAX30102传感器的主要特点…...

InstantX团队新作!基于端到端训练的风格转换模型CSGO

由InstantX团队、南京理工大学、北京航空航天大学以及北京大学联合提出了一种基于端到端训练的风格转换模型 CSGO&#xff0c;它采用独立的特征注入明确地解耦内容和风格特征。统一的 CSGO 实现了图像驱动的风格转换、文本驱动的风格化合成和文本编辑驱动的风格化合成。大量实验…...

Nginx安全性配置

文章目录 引言I Nginx简单的安全性配置禁止特定的HTTP方法限制URL长度禁止某些用户代理限制请求速率连接限制禁止访问某些文件类型II 常见的安全规则防御CC攻击User-Agent过滤GET-URL过滤GET-参数过滤POST过滤(sql注入、xss攻击 )引言 Nginx本身并不具备复杂的防火墙规则定制…...

k8s单master多node环境搭建-k8s版本低于1.24,容器运行时为docker

k8s 1.20.6单master多node环境搭建 1.环境规划2.初始化服务器1&#xff09;配置主机名2&#xff09;设置IP为静态IP3&#xff09;关闭selinux4&#xff09;配置主机hosts文件5&#xff09;配置三台主机之间免密登录6&#xff09;关闭交换分区swap&#xff0c;提升性能7&#xf…...

taro ui 小程序at-calendar日历组件自定义样式+选择范围日历崩溃处理

taro ui 日历文档 目录 单选标记时间&#xff1a; 效果&#xff1a; template&#xff1a; data&#xff1a; methods: 日历--范围选择&#xff1a; 效果&#xff1a; template&#xff1a; data&#xff1a; methods&#xff1a; 日历--间隔多选&#xff1a;利用标…...

ARM发布新一代高性能处理器N3

简介 就在2月21日&#xff0c;ARM发布了新一代面向服务器的高性能处理器N3和V3&#xff0c;N系列平衡性能和功耗&#xff0c;而V系列则注重更高的性能。此次发布的N3&#xff0c;单个die最高32核&#xff08;并加入到CCS&#xff0c;Compute Subsystems&#xff0c;包含Core&a…...

基于Pytorch框架的深度学习U2Net网络天空语义精细分割系统源码

第一步&#xff1a;准备数据 头发分割数据&#xff0c;总共有10276张图片&#xff0c;里面的像素值为0和1&#xff0c;所以看起来全部是黑的&#xff0c;不影响使用 第二步&#xff1a;搭建模型 级联模式 通常多个类似U-Net按顺序堆叠&#xff0c;以建立级联模型&#xff0c…...

50ETF期权和股指期权有什么区别?ETF期权应该怎么做?

今天期权懂带你了解50ETF期权和股指期权有什么区别&#xff1f;ETF期权应该怎么做&#xff1f;ETF是对个股期权&#xff0c;股指期权是对应该股指期货的&#xff0c;那么股指期权和etf期权有什么区别&#xff1f; 股指期权怎么交易 股指期权交易要开通股指期货账户&#xff0…...

JS设计模式之“神奇的魔术师” - 简单工厂模式

引言 在JavaScript开发中&#xff0c;我们经常需要创建和管理各种对象&#xff0c;而简单工厂模式就是一种最简单的用来创建对象的设计模式。 简单工厂模式通过一个工厂类来创建相似的对象&#xff0c;而无需直接使用具体类来实例化对象。这样可以将对象的创建过程与使用过程…...

【河北航空-注册安全分析报告-无验证方式导致安全隐患】

前言 由于网站注册入口容易被黑客攻击&#xff0c;存在如下安全问题&#xff1a; 1. 暴力破解密码&#xff0c;造成用户信息泄露 2. 短信盗刷的安全问题&#xff0c;影响业务及导致用户投诉 3. 带来经济损失&#xff0c;尤其是后付费客户&#xff0c;风险巨大&#xff0c;造…...

亚信安慧AntDB-T数据库内核之MVCC机制

本文主要介绍AntDB数据库内核中的一个很重要的机制——MVCC机制。 MVCC简介 MVCC&#xff08;多版本并发控制&#xff09;是AntDB数据库中实现事务隔离级别的一种机制。它允许多个事务同时对数据进行读写和修改操作&#xff0c;而不会相互干扰。在MVCC中&#xff0c;每个数据…...

【python】socket 入门以及多线程tcp链接

Socket 入门 及 多线程tcp链接 网络基础知识三要素 Socket是套接字的意思,是网络编程的核心对象,通信两端都独有自己的Socket对象, 数据在两个Socket之间通过 字节流(TCP协议) 或者 数据报包(UDP协议)的形式进行传输. 本文主要针对tcp流程进行讲解 socket-tcp流程图 1.创建服…...

【ZYNQ MPSoC开发】lwIP TCP发送用于数据缓存的软件FIFO设计

设计背景 任务是在ZYNQ的PS上使用裸机运行lwIP协议栈使用TCP把PL端通过AXI DMA传来的将近100K采样率的ADC数据发送出去&#xff0c;但由于数据带宽很大&#xff0c;有853.3mbps&#xff0c;所以在每一次AXI DMA简单传输结束后&#xff0c;lwIP未必有足够的发送buffer立即把数据…...

Spring数据访问模块设计

前面我们已经完成了IoC和web模块的设计&#xff0c;聪明的码友立马就知道了&#xff0c;该到数据访问模块了&#xff0c;要不就这俩玩个6啊&#xff0c;查库势在必行&#xff0c;至此&#xff0c;它来了。 一、核心设计理念 1、痛点在哪 应用离不开数据&#xff08;数据库、No…...

【Java学习笔记】BigInteger 和 BigDecimal 类

BigInteger 和 BigDecimal 类 二者共有的常见方法 方法功能add加subtract减multiply乘divide除 注意点&#xff1a;传参类型必须是类对象 一、BigInteger 1. 作用&#xff1a;适合保存比较大的整型数 2. 使用说明 创建BigInteger对象 传入字符串 3. 代码示例 import j…...

基于Java+MySQL实现(GUI)客户管理系统

客户资料管理系统的设计与实现 第一章 需求分析 1.1 需求总体介绍 本项目为了方便维护客户信息为了方便维护客户信息&#xff0c;对客户进行统一管理&#xff0c;可以把所有客户信息录入系统&#xff0c;进行维护和统计功能。可通过文件的方式保存相关录入数据&#xff0c;对…...

李沐--动手学深度学习--GRU

1.GRU从零开始实现 #9.1.2GRU从零开始实现 import torch from torch import nn from d2l import torch as d2l#首先读取 8.5节中使用的时间机器数据集 batch_size,num_steps 32,35 train_iter,vocab d2l.load_data_time_machine(batch_size,num_steps) #初始化模型参数 def …...

EasyRTC音视频实时通话功能在WebRTC与智能硬件整合中的应用与优势

一、WebRTC与智能硬件整合趋势​ 随着物联网和实时通信需求的爆发式增长&#xff0c;WebRTC作为开源实时通信技术&#xff0c;为浏览器与移动应用提供免插件的音视频通信能力&#xff0c;在智能硬件领域的融合应用已成必然趋势。智能硬件不再局限于单一功能&#xff0c;对实时…...

RKNN开发环境搭建2-RKNN Model Zoo 环境搭建

目录 1.简介2.环境搭建2.1 启动 docker 环境2.2 安装依赖工具2.3 下载 RKNN Model Zoo2.4 RKNN模型转化2.5编译C++1.简介 RKNN Model Zoo基于 RKNPU SDK 工具链开发, 提供了目前主流算法的部署例程. 例程包含导出RKNN模型, 使用 Python API, CAPI 推理 RKNN 模型的流程.   本…...

20250609在荣品的PRO-RK3566开发板的Android13下解决串口可以执行命令但是脚本执行命令异常的问题

20250609在荣品的PRO-RK3566开发板的Android13下解决串口可以执行命令但是脚本执行命令异常的问题 2025/6/9 20:54 缘起&#xff0c;为了跨网段推流&#xff0c;千辛万苦配置好了网络参数。 但是命令iptables -t filter -F tetherctrl_FORWARD可以在调试串口/DEBUG口正确执行。…...

Spring Boot 中实现 HTTPS 加密通信及常见问题排查指南

Spring Boot 中实现 HTTPS 加密通信及常见问题排查指南 在金融行业安全审计中&#xff0c;未启用HTTPS的Web应用被列为高危漏洞。通过正确配置HTTPS&#xff0c;可将中间人攻击风险降低98%——本文将全面解析Spring Boot中HTTPS的实现方案与实战避坑指南。 一、HTTPS 核心原理与…...

python3GUI--基于PyQt5+DeepSort+YOLOv8智能人员入侵检测系统(详细图文介绍)

文章目录 一&#xff0e;前言二&#xff0e;技术介绍1.PyQt52.DeepSort3.卡尔曼滤波4.YOLOv85.SQLite36.多线程7.入侵人员检测8.ROI区域 三&#xff0e;核心功能1.登录注册1.登录2.注册 2.主界面1.主界面简介2.数据输入3.参数配置4.告警配置5.操作控制台6.核心内容显示区域7.检…...

.Net Framework 4/C# 面向对象编程进阶

一、继承 (一)使用继承 子类可以继承父类原有的属性和方法,也可以增加原来父类不具备的属性和方法,或者直接重写父类中的某些方法。 C# 中使用“:”来表示两个类的继承。子类不能访问父类的私有成员,但是可以访问其公有成员,即只要使用 public 声明类成员,就既可以让一…...