当前位置: 首页 > news >正文

Spark 7:Spark SQL 函数定义

SparkSQL 定义UDF函数

8e9d7ddcc2f04a4fa3d0b7e78921b4f8.png

21cc4f7f05aa4c4d8e9db15c1d271cb1.png

方式1语法:
udf对象 = sparksession.udf.register(参数1,参数2,参数3)
参数1:UDF名称,可用于SQL风格
参数2:被注册成UDF的方法名
参数3:声明UDF的返回值类型
udf对象: 返回值对象,是一个UDF对象,可用于DSL风格
方式2语法:
udf对象 = F.udf(参数1, 参数2)
参数1:被注册成UDF的方法名
参数2:声明UDF的返回值类型
udf对象: 返回值对象,是一个UDF对象,可用于DSL风格
其中F是:
from pyspark.sql import functions as F
其中,被注册成UDF的方法名是指具体的计算方法,如:
def add(x, y): x + y
add就是将要被注册成UDF的方法名

8dee25a8da254eec809eb0c8a5d95baf.png  

# coding:utf8
import timefrom pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType
import pandas as pd
from pyspark.sql import functions as Fif __name__ == '__main__':# 0. 构建执行环境入口对象SparkSessionspark = SparkSession.builder.\appName("test").\master("local[*]").\config("spark.sql.shuffle.partitions", 2).\getOrCreate()sc = spark.sparkContext# 构建一个RDDrdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7]).map(lambda x:[x])df = rdd.toDF(["num"])# TODO 1: 方式1 sparksession.udf.register(), DSL和SQL风格均可以使用# UDF的处理函数def num_ride_10(num):return num * 10# 参数1: 注册的UDF的名称, 这个udf名称, 仅可以用于 SQL风格# 参数2: UDF的处理逻辑, 是一个单独的方法# 参数3: 声明UDF的返回值类型, 注意: UDF注册时候, 必须声明返回值类型, 并且UDF的真实返回值一定要和声明的返回值一致# 返回值对象: 这是一个UDF对象, 仅可以用于 DSL 语法# 当前这种方式定义的UDF, 可以通过参数1的名称用于SQL风格, 通过返回值对象用户DSL风格udf2 = spark.udf.register("udf1", num_ride_10, IntegerType())# SQL风格中使用# selectExpr 以SELECT的表达式执行, 表达式 SQL风格的表达式(字符串)# select方法, 接受普通的字符串字段名, 或者返回值是Column对象的计算df.selectExpr("udf1(num)").show()# DSL 风格中使用# 返回值UDF对象 如果作为方法使用, 传入的参数 一定是Column对象df.select(udf2(df['num'])).show()# TODO 2: 方式2注册, 仅能用于DSL风格udf3 = F.udf(num_ride_10, IntegerType())df.select(udf3(df['num'])).show()df.selectExpr("udf3(num)").show()

2562d46c19ee47c3b20f47fea545fb36.png

af9b0504fc474fd8896cb20f93094922.png 

# coding:utf8
import timefrom pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType, ArrayType
import pandas as pd
from pyspark.sql import functions as Fif __name__ == '__main__':# 0. 构建执行环境入口对象SparkSessionspark = SparkSession.builder.\appName("test").\master("local[*]").\config("spark.sql.shuffle.partitions", 2).\getOrCreate()sc = spark.sparkContext# 构建一个RDDrdd = sc.parallelize([["hadoop spark flink"], ["hadoop flink java"]])df = rdd.toDF(["line"])# 注册UDF, UDF的执行函数定义def split_line(data):return data.split(" ")  # 返回值是一个Array对象# TODO1 方式1 构建UDFudf2 = spark.udf.register("udf1", split_line, ArrayType(StringType()))# DLS风格df.select(udf2(df['line'])).show()# SQL风格df.createTempView("lines")spark.sql("SELECT udf1(line) FROM lines").show(truncate=False)# TODO 2 方式2的形式构建UDFudf3 = F.udf(split_line, ArrayType(StringType()))df.select(udf3(df['line'])).show(truncate=False)

0e2c268c421848cfbc639a80fc067838.png

fe0861021c4d4be5a8b0566697bf5905.png 

# coding:utf8
import string
import timefrom pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType, ArrayType
import pandas as pd
from pyspark.sql import functions as Fif __name__ == '__main__':# 0. 构建执行环境入口对象SparkSessionspark = SparkSession.builder.\appName("test").\master("local[*]").\config("spark.sql.shuffle.partitions", 2).\getOrCreate()sc = spark.sparkContext# 假设 有三个数字  1 2 3  我们传入数字 ,返回数字所在序号对应的 字母 然后和数字结合形成dict返回# 比如传入1 我们返回 {"num":1, "letters": "a"}rdd = sc.parallelize([[1], [2], [3]])df = rdd.toDF(["num"])# 注册UDFdef process(data):return {"num": data, "letters": string.ascii_letters[data]}"""UDF的返回值是字典的话, 需要用StructType来接收"""udf1 = spark.udf.register("udf1", process, StructType().add("num", IntegerType(), nullable=True).\add("letters", StringType(), nullable=True))df.selectExpr("udf1(num)").show(truncate=False)df.select(udf1(df['num'])).show(truncate=False)

2034e0d567664867a1c76c153620c99e.png  

c5a05c06e0c24009abe4b20cdc81ed7e.png

 SparkSQL 使用窗口函数

f3cf9802e8084fc4bf2518765b500397.png

7d43ea9fba3c479094ff69bb62cd99da.png

# coding:utf8
# 演示sparksql 窗口函数(开窗函数)
import string
from pyspark.sql import SparkSession
# 导入StructType对象
from pyspark.sql.types import ArrayType, StringType, StructType, IntegerType
import pandas as pd
from pyspark.sql import functions as Fif __name__ == '__main__':spark = SparkSession.builder. \appName("create df"). \master("local[*]"). \config("spark.sql.shuffle.partitions", "2"). \getOrCreate()
sc = spark.sparkContext
rdd = sc.parallelize([('张三', 'class_1', 99),('王五', 'class_2', 35),('王三', 'class_3', 57),('王久', 'class_4', 12),('王丽', 'class_5', 99),('王娟', 'class_1', 90),('王军', 'class_2', 91),('王俊', 'class_3', 33),('王君', 'class_4', 55),('王珺', 'class_5', 66),('郑颖', 'class_1', 11),('郑辉', 'class_2', 33),('张丽', 'class_3', 36),('张张', 'class_4', 79),('黄凯', 'class_5', 90),('黄开', 'class_1', 90),('黄恺', 'class_2', 90),('王凯', 'class_3', 11),('王凯杰', 'class_1', 11),('王开杰', 'class_2', 3),('王景亮', 'class_3', 99)
])
schema = StructType().add("name", StringType()). \add("class", StringType()). \add("score", IntegerType())
df = rdd.toDF(schema)
# 窗口函数只用于SQL风格, 所以注册表先
df.createTempView("stu")
# TODO 聚合窗口
spark.sql("""
SELECT *, AVG(score) OVER() AS avg_score FROM stu
""").show()
# SELECT *, AVG(score) OVER() AS avg_score FROM stu 等同于
# SELECT * FROM stu
# SELECT AVG(score) FROM stu
# 两个SQL的结果集进行整合而来
spark.sql("""
SELECT *, AVG(score) OVER(PARTITION BY class) AS avg_score FROM stu
""").show()
# SELECT *, AVG(score) OVER(PARTITION BY class) AS avg_score FROM stu 等同于
# SELECT * FROM stu
# SELECT AVG(score) FROM stu GROUP BY class
# 两个SQL的结果集进行整合而来
# TODO 排序窗口
spark.sql("""
SELECT *, ROW_NUMBER() OVER(ORDER BY score DESC) AS row_number_rank, 
DENSE_RANK() OVER(PARTITION BY class ORDER BY score DESC) AS dense_rank, 
RANK() OVER(ORDER BY score) AS rank
FROM stu
""").show()
# TODO NTILE
spark.sql("""
SELECT *, NTILE(6) OVER(ORDER BY score DESC) FROM stu
""").show()

SparkSQL支持UDF和UDAF定义,但在Python中,暂时只能定义UDF
UDF定义支持2种方式, 1:使用SparkSession对象构建. 2: 使用functions包中提供的UDF API构建. 要注意, 方式1可用DSL和SQL风格, 方式2 仅可用于DSL风格
SparkSQL支持窗口函数使用, 常用SQL中的窗口函数均支持, 如聚合窗口\排序窗口\NTILE分组窗口等

 

相关文章:

Spark 7:Spark SQL 函数定义

SparkSQL 定义UDF函数 方式1语法: udf对象 sparksession.udf.register(参数1,参数2,参数3) 参数1:UDF名称,可用于SQL风格 参数2:被注册成UDF的方法名 参数3:声明UDF的返回值类型 ud…...

ThinkPHP 文件上传 fileSystem 扩展的使用

ThinkPHP 文件上传 ThinkPHP 文件上传 扩展 filesystem一、安装 FileSystem 扩展二、认识 filesystem 配置文件 config/filesystem.php三、上传验证(涉及到验证器的知识点)四、文件上传demo ThinkPHP 文件上传 扩展 filesystem ThinkPHP 为我们 提供了 …...

液体神经网络LLN:通过动态信息流彻底改变人工智能

巴乌米克泰吉 一、说明 在在人工智能领域,神经网络已被证明是解决复杂问题的非常强大的工具。多年来,研究人员不断寻求创新方法来提高其性能并扩展其能力。其中一种方法是液体神经网络(LNN)的概念,这是一个利用动态计算…...

2023年的今天,PMP项目管理认证还值得考吗?

首先我肯定它值得考,PMP认证的教材和考纲都会随着项目管理工具和市场趋势而更新,不用担心会过时。 PMP项目管理认证是什么? 英文全称是Project Management Professional,中文全称叫做项目管理专业人士资格认证。它是由美国项目管…...

【JavaSE专栏91】Java如何主动发起Http、Https请求?

作者主页:Designer 小郑 作者简介:3年JAVA全栈开发经验,专注JAVA技术、系统定制、远程指导,致力于企业数字化转型,CSDN学院、蓝桥云课认证讲师。 主打方向:Vue、SpringBoot、微信小程序 本文讲解了如何使用…...

给oracle逻辑导出clob大字段、大数据量表提提速

文章目录 前言一、大表数据附:查询大表 二、解题思路1.导出排除大表的数据2.rowid切片导出大表数据Linux代码如下(示例):Windows代码如下(示例):手工执行代码如下(示例)&…...

研发规范第九讲:通用类命名规范(重点)

研发规范第九讲:通用类命名规范(重点) 无规范不成方圆。我自己非常注重搭建项目结构的起步过程,应用命名规范、模块的划分、目录(包)的命名,我觉得非常重要,如果做的足够好&#xff…...

python+django+协同过滤算法-基于爬虫的个性化书籍推荐系统(包含报告+源码+开题)

为了提高个性化书籍推荐信息管理的效率;充分利用现有资源;减少不必要的人力、物力和财政支出来实现管理人员更充分掌握个性化书籍推荐信息的管理;开发设计专用系统--基于爬虫的个性化书籍推荐系统来进行管理个性化书籍推荐信息,以…...

系统架构:软件工程

文章目录 资源知识点自顶向下与自底向上形式化方法结构化方法敏捷方法净室软件工程面向服务的方法面向对象的方法快速应用开发螺旋模型软件过程和活动开放式源码开发方法功用驱动开发方法统一过程模型RUP基于构件的软件开发UML 资源 信息系统开发方法 知识点 自顶向下与自底…...

泰迪大数据实训平台产品介绍

大数据产品包括:大数据实训管理平台、大数据开发实训平台、大数据编程实训平台等 大数据实训管理平台 泰迪大数据实训平台从课程管理、资源管理、实训管理等方面出发,主要解决现有实验室无法满足教学需求、传统教学流程和工具低效耗时和内部教学…...

Linux- 文件夹相关的常用指令

1. 统计文件夹下的文件数量 在 Linux 下&#xff0c;有几种方法可以统计文件夹下的文件数量&#xff1a; 使用 ls 和 wc 命令&#xff1a; 这种方式可以统计目录下的直接子文件&#xff08;不包括子目录里的文件&#xff09;。 ls -l <目录路径> | wc -l注意&#xff1a…...

在 macOS 中安装 TensorFlow 1g

tensorflow 需要多大空间 pip install tensorflow pip install tensorflow Looking in indexes: https://pypi.douban.com/simple/ Collecting tensorflowDownloading https://pypi.doubanio.com/packages/1a/c1/9c14df0625836af8ba6628585c6d3c3bf8f1e1101cafa2435eb28a7764…...

数学建模:CRITIC赋权法

&#x1f506; 文章首发于我的个人博客&#xff1a;欢迎大佬们来逛逛 CRITIC赋权法 算法流程 构建原始数据矩阵 X X X&#xff0c;他是一个 m ∗ n m * n m∗n 的矩阵&#xff0c; m m m 表示评价对象个数&#xff0c; n n n 表示指标个数对原始数据矩阵进行正向化处理计算…...

Facebook message tag 使用攻略

Messenger 讯息传不出去&#xff1f;无法发送FB 讯息给非好友&#xff1f; 2020年3月&#xff0c;Facebook 为了防止用户被过多的推广或垃圾讯息困扰而更新使用条款&#xff0c;现在商家要用FB传讯息给所有人&#xff08;包括非好友&#xff09;&#xff0c;应该使用 Facebook …...

气传导耳机哪个品牌比较好?综合表现很不错的气传导耳机推荐

​气传导耳机不仅能够提升幸福感还能听到周围环境声&#xff0c;大大提高安全性。如果你在寻找一款高品质的气传导耳机&#xff0c;又不知从何入手时&#xff0c;不要担心&#xff0c;我已经为你精心挑选了四款市面上综合表现很不错的气传导耳机&#xff0c;让你享受更好的音质…...

Rabbitmq的消息转换器

Spring会把你发送的消息序列化为字节发送给MQ&#xff0c;接收消息的时候&#xff0c;还会把字节反序列化为Java对象 ,只不过&#xff0c;默认情况下Spring采用的序列化方式是JDK序列化。众所周知&#xff0c;JDK序列化存在下列问题&#xff1a; 数据体积过大 有安全漏洞 可读…...

nvidia-docker的使用

拉取镜像 docker pull nvidia/cuda可能出现的问题 问题描述 Error response from daemon: manifest for nvidia/cuda:latest not found: manifest unknown: manifest解决方法&#xff1a; 为找到正确且合适的docker镜像版本 在supported-tags中找到与自己系统对应的cuda版本…...

C++新经典 | C语言

目录 一、基础之查漏补缺 1.float精度问题 2.字符型数据 3.变量初值问题 4.赋值&初始化 5.头文件之<> VS " " 6.逻辑运算 7.数组 7.1 二维数组初始化 7.2 字符数组 8.字符串处理函数 8.1 strcat 8.2 strcpy 8.3 strcmp 8.4 strlen 9.函数 …...

物联网智慧种植农业大棚系统

一、项目背景 智慧农业是是将物联网技术和农业生产箱管理的新型农业&#xff0c;依托部署在农业生产现场的各种传感节点&#xff0c;以物联网网关为通道形成数据传输网络&#xff0c;可以实现控制柜、环境监测传感器、气象监测机器等设备的远程监控&#xff0c;达到及时高校的…...

TabBar组件如何跳转页面?

1、先引入 2、假数据 const tabs [{key: home,title: 首页,icon: <AppOutline />,badge: Badge.dot,},{key: todo,title: 待办,icon: <UnorderedListOutline />,badge: 5,},{key: message,title: 消息,icon: (active: boolean) >active ? <MessageFill /&…...

KubeSphere 容器平台高可用:环境搭建与可视化操作指南

Linux_k8s篇 欢迎来到Linux的世界&#xff0c;看笔记好好学多敲多打&#xff0c;每个人都是大神&#xff01; 题目&#xff1a;KubeSphere 容器平台高可用&#xff1a;环境搭建与可视化操作指南 版本号: 1.0,0 作者: 老王要学习 日期: 2025.06.05 适用环境: Ubuntu22 文档说…...

VB.net复制Ntag213卡写入UID

本示例使用的发卡器&#xff1a;https://item.taobao.com/item.htm?ftt&id615391857885 一、读取旧Ntag卡的UID和数据 Private Sub Button15_Click(sender As Object, e As EventArgs) Handles Button15.Click轻松读卡技术支持:网站:Dim i, j As IntegerDim cardidhex, …...

Redis相关知识总结(缓存雪崩,缓存穿透,缓存击穿,Redis实现分布式锁,如何保持数据库和缓存一致)

文章目录 1.什么是Redis&#xff1f;2.为什么要使用redis作为mysql的缓存&#xff1f;3.什么是缓存雪崩、缓存穿透、缓存击穿&#xff1f;3.1缓存雪崩3.1.1 大量缓存同时过期3.1.2 Redis宕机 3.2 缓存击穿3.3 缓存穿透3.4 总结 4. 数据库和缓存如何保持一致性5. Redis实现分布式…...

IGP(Interior Gateway Protocol,内部网关协议)

IGP&#xff08;Interior Gateway Protocol&#xff0c;内部网关协议&#xff09; 是一种用于在一个自治系统&#xff08;AS&#xff09;内部传递路由信息的路由协议&#xff0c;主要用于在一个组织或机构的内部网络中决定数据包的最佳路径。与用于自治系统之间通信的 EGP&…...

Python爬虫(二):爬虫完整流程

爬虫完整流程详解&#xff08;7大核心步骤实战技巧&#xff09; 一、爬虫完整工作流程 以下是爬虫开发的完整流程&#xff0c;我将结合具体技术点和实战经验展开说明&#xff1a; 1. 目标分析与前期准备 网站技术分析&#xff1a; 使用浏览器开发者工具&#xff08;F12&…...

【C++从零实现Json-Rpc框架】第六弹 —— 服务端模块划分

一、项目背景回顾 前五弹完成了Json-Rpc协议解析、请求处理、客户端调用等基础模块搭建。 本弹重点聚焦于服务端的模块划分与架构设计&#xff0c;提升代码结构的可维护性与扩展性。 二、服务端模块设计目标 高内聚低耦合&#xff1a;各模块职责清晰&#xff0c;便于独立开发…...

laravel8+vue3.0+element-plus搭建方法

创建 laravel8 项目 composer create-project --prefer-dist laravel/laravel laravel8 8.* 安装 laravel/ui composer require laravel/ui 修改 package.json 文件 "devDependencies": {"vue/compiler-sfc": "^3.0.7","axios": …...

HarmonyOS运动开发:如何用mpchart绘制运动配速图表

##鸿蒙核心技术##运动开发##Sensor Service Kit&#xff08;传感器服务&#xff09;# 前言 在运动类应用中&#xff0c;运动数据的可视化是提升用户体验的重要环节。通过直观的图表展示运动过程中的关键数据&#xff0c;如配速、距离、卡路里消耗等&#xff0c;用户可以更清晰…...

(一)单例模式

一、前言 单例模式属于六大创建型模式,即在软件设计过程中,主要关注创建对象的结果,并不关心创建对象的过程及细节。创建型设计模式将类对象的实例化过程进行抽象化接口设计,从而隐藏了类对象的实例是如何被创建的,封装了软件系统使用的具体对象类型。 六大创建型模式包括…...

如何应对敏捷转型中的团队阻力

应对敏捷转型中的团队阻力需要明确沟通敏捷转型目的、提升团队参与感、提供充分的培训与支持、逐步推进敏捷实践、建立清晰的奖励和反馈机制。其中&#xff0c;明确沟通敏捷转型目的尤为关键&#xff0c;团队成员只有清晰理解转型背后的原因和利益&#xff0c;才能降低对变化的…...