当前位置: 首页 > news >正文

sparkSQL的UDF,最常用的regeister方式自定义函数和udf注册方式定义UDF函数 (详细讲解)

- UDF:一对一的函数【User Defined Functions】
  - substr、split、concat、instr、length、from_unixtime
- UDAF:多对一的函数【User Defined Aggregation Functions】 聚合函数
  - count、sum、max、min、avg、collect_set/list
- UDTF:一对多的函数【User Defined Tabular Functions】
  - explode、json_tuple【解析JSON格式】、parse_url_tuple【解析URL函数】

Spark中支持UDF和UDAF两种,支持直接使用Hive中的UDF、UDAF、UDTF.

pyspark中自定义函数的三种写法:

使用最常用的regeister方式自定义函数 

 最常用的方式,这种方式编写的函数,既能用于SQL中,也能用于DSL中

语法:

UDF变量名 = spark.udf.register(UDF函数名, 函数的处理逻辑)

定义:spark.udf.register()
UDF变量名:DSL中调用UDF使用的
UDF函数名:SQL中调用UDF使用

案例:

查看以下数据

id    name    msg
01    周杰伦    150/175
02    周杰    130/185
03    周华健    148/178
04    周星驰    130/175
05    闫妮    110/180

将以上数据,通过自定义函数,变为如下数据:
01    周杰伦     150斤/175cm
02    周杰      130斤/185cm
03    周华健  148斤/178cm

 

第一步 :自定义函数 

# 编写一个普通的函数,用于写逻辑
def get_data(str1):
    list1 = str1.split("/")
    return list1[0] + "斤/" + list1[1] + "cm" 

 第二步:注册函数

# 定义一个UDF:变量名-dsl = spark.udf.register(函数名-sql, 处理逻辑, 返回值)
    # get_new_info 用于 sql 中
    # get_info 用于DSL
    get_info = spark.udf.register(name="get_new_info", f=lambda oldinfo: get_data(oldinfo))

第三步:使用函数

#使用sql的方式调用
    spark.sql("select id,name,get_new_info(msg) from star").show()

    # 使用dsl的方式调用
    # DSL:用变量名
    import pyspark.sql.functions as F

    new_df.select(F.col("id"), F.col("name"), F.col("msg"), get_info(F.col("msg")).alias("newinfo")).show()

代码演示 以及解释

import osfrom pyspark.sql import SparkSessionimport pyspark.sql.functions as F
if __name__ == '__main__':# 配置环境os.environ['JAVA_HOME'] = 'D:/java/jdk'# 配置Hadoop的路径,就是前面解压的那个路径os.environ['HADOOP_HOME'] = 'D:/hadoop3.3.1/hadoop-3.3.1'# 配置base环境Python解析器的路径os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base环境Python解析器的路径os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'spark = SparkSession.builder.master("local[2]").appName("").config("spark.sql.shuffle.partitions", 2).getOrCreate()# 第一种方案# format("csv") 是读取文件格式,后面是文件路径  toDF("id","name","msg")是把文件里面的数据源变成指定的字段df = spark.read.format("csv").option("sep", "\t").load("../../datas/function/udf.txt").toDF("id", "name","msg") df.createOrReplaceTempView("t") #给数据源起个名字# 编写sqlspark.sql("""select id,name,concat(split(msg,"/")[0],'斤/',split(msg,"/")[1],'cm')msg from t   """).show()# 第二种方案 自定义函数#第一步 定义函数def my_function(msg):return msg.split("/")[0] + "斤/" + msg.split("/")[1] + "cm"# 第二步注册函数my_function2 = spark.udf.register("my_function",my_function)# 第三步调用函数spark.sql("""select id,name,my_function(msg) msg from t""").show()# 自定义函数DSL使用 registerdf.select(F.col("id"),F.col("name"),my_function2(F.col("msg"))).show()spark.stop()

 

 

相关文章:

sparkSQL的UDF,最常用的regeister方式自定义函数和udf注册方式定义UDF函数 (详细讲解)

- UDF:一对一的函数【User Defined Functions】 - substr、split、concat、instr、length、from_unixtime - UDAF:多对一的函数【User Defined Aggregation Functions】 聚合函数 - count、sum、max、min、avg、collect_set/list - UDTF:…...

【Ubuntu20】VSCode Python代码规范工具配置 Pylint + Black + MyPy + isort

​ 常用工具: 在 Ubuntu20 下,有以下常见的 Python 代码工具: 静态分析工具: Pylint 和 Flake8 功能范围:Pylint功能非常强大,能够检查代码质量、潜在错误、代码风格、复杂度等多个方面, 并生成详细的报…...

游戏提示错误:xinput1_3.dll缺失?四种修复错误的xinput1_3.dll文件

在计算机的运行过程中,我们可能会遇到各种各样的问题,其中与“xinput1_3.dll”相关的问题也并不罕见。“xinput1_3.dll”是一个在许多游戏和多媒体应用程序运行过程中可能会用到的动态链接库文件。当我们启动某些游戏时,可能会突然弹出一个错…...

YOLOv11融合IncepitonNeXt[CVPR2024]及相关改进思路

YOLOv11v10v8使用教程: YOLOv11入门到入土使用教程 一、 模块介绍 论文链接:https://arxiv.org/abs/2303.16900 代码链接:https://github.com/sail-sg/inceptionnext 论文速览:受 ViT 长距离建模能力的启发,大核卷积…...

[Web安全 网络安全]-学习文章汇总导航(持续更新中)

文章目录: 一:学习路线资源 1.路线 2.资源 二:工具 三:学习笔记 1.基础阶段 2.进阶阶段 四:好的参考 五:靶场 博主对网络安全很感兴趣,但是不知道如何取学习,自己一步一步…...

Docker Compose部署Rabbitmq(Docker file安装延迟队列)

整个工具的代码都在Gitee或者Github地址内 gitee:solomon-parent: 这个项目主要是总结了工作上遇到的问题以及学习一些框架用于整合例如:rabbitMq、reids、Mqtt、S3协议的文件服务器、mongodb github:GitHub - ZeroNing/solomon-parent: 这个项目主要是…...

SpringBoot+FileBeat+ELK8.x版本收集日志

一、准备环境 1、ElasticSearch:8.1.0 2、FileBeat:8.1.0 3、Kibana:8.1.0 4、logstach:8.1.0 本次统一版本:8.1.0,4个组件,划分目录,保持版本一致。 说明:elasticsearch和kib…...

本地模型导入ollama

文章目录 Modelfile模板导入到 ollama Modelfile模板 在本地模型目录下创建 Modelfile FROM ./qwen2.5-7b-instruct-q4_k_m.gguf# 设定温度参数为1 [更高的更具有创新性,更低的更富有连贯性] PARAMETER temperature 1 # 将上下文窗口大小设置为4096,这…...

scala Map训练

Map实训内容: 1.创建一个可变Map,用于存储图书馆中的书籍信息(键为书籍编号,值为包含书籍名称、作者、库存数量的元组),初始化为包含几本你喜欢的书籍信息。 2.使用 操作符添加两本新的书籍到图书馆集合中。 3.根据书籍编号查询某一本特定的书籍信息&…...

WorkFlow源码剖析——Communicator之TCPServer(下)

WorkFlow源码剖析——Communicator之TCPServer(下) 前言 系列链接如下: WorkFlow源码剖析——GO-Task 源码分析 WorkFlow源码剖析——Communicator之TCPServer(上) WorkFlow源码剖析——Communicator之TCPServer&…...

数据结构与算法分析:专题内容——动态规划2之例题讲解(代码详解+万字长文+算法导论+力扣题)

一、最长公共子序列 在生物应用中,经常需要比较两个(或多个)不同生物体的 DNA。一个 DNA 串由一串称为碱基(base)的分子组成,碱基有腺嘌呤、鸟嘌呤、胞嘧啶和胸腺嘧啶 4 种类型。我们用英文单词首字母表示 4 种碱基,这样就可以将一个 DNA 串…...

【Qt】QTreeView 和 QStandardItemModel的关系

QTreeView 和 QAbstractItemModel(通常是其子类,如 QStandardItemModel 或自定义模型)是 Qt 框架中的两个关键组件,它们之间存在密切的关系。 关系概述 QTreeView: QTreeView 是一个用于显示和编辑层次数据的视图小部…...

containerd配置私有仓库registry

机器ip端口regtisry192.168.0.725000k8s-*-------k8s集群 1、镜像上传 rootadmin:~# docker push 192.168.0.72:5000/nginx:1.26.1-alpine The push refers to repository [192.168.0.72:5000/nginx] 6961f0b8531c: Pushed 3112cd521249: Pushed d3f50ce9b5b5: Pushed 9efaf2eb…...

深入解析语音识别中的关键技术:GMM、HMM、DNN和语言模型

目录 一、高斯混合模型(GMM)与期望最大化(EM)算法二、隐马尔可夫模型(HMM)三、深度神经网络(DNN)四、语言模型(LM)五、ASR系统的整体工作流程结论 在现代语音…...

C++循环引用

C循环引用‌指的是两个或多个类之间互相引用对方,形成一个循环的引用关系。 循环引用的问题: 编译错误‌:编译器在编译过程中会按照包含关系依次编译每个文件,当编译ClassA时,它会尝试包含ClassB.h文件,而…...

dayseven-因果分析-图模型与结构因果模型

在数学上,​“图”(graph)是顶点(vertex,也可以称为节点)和边(edge)的集合,表示为图G(V,E),其中V是节点的集合,E是边的集合,图中的节点之间通过边相连(也可以不相连&…...

并发编程(8)—— std::async、std::future 源码解析

文章目录 八、day81. std::async2. std::future2.1 wait()2.2 get() 八、day8 之前说过,std::async内部的处理逻辑和std::thread相似,而且std::async和std::future有密不可分的联系。今天,通过对std::async和std::future源码进行解析&#x…...

稻米分类和病害检测数据集(猫脸码客 第237期)

稻米分类图像数据集:推动农业智能化发展的关键资源 在农业领域,稻米作为世界上最重要的粮食作物之一,其品种繁多,各具特色。然而,传统的稻米分类方法往往依赖于人工观察和经验判断,不仅耗时费力&#xff0…...

HANDLINK ISS-7000v2 网关 login_handler.cgi 未授权RCE漏洞复现

0x01 产品简介 瀚霖科技股份有限公司ISS-7000 v2网络网关服务器是台高性能的网关,提供各类酒店网络认证计费的完整解决方案。由于智慧手机与平板电脑日渐普及,人们工作之时开始使用随身携带的设备,因此无线网络也成为网络使用者基本服务的项目。ISS-7000 v2可登录300至1000…...

基于Multisim串联型连续可调直流稳压正电源电路设计与仿真

设计任务和要求: (1)输出直流电压 1.5∽10V 可调; (2)输出电流 IOm300mA;(有电流扩展功能) (3)稳压系数 Sr≤0.05; (4&…...

Flask RESTful 示例

目录 1. 环境准备2. 安装依赖3. 修改main.py4. 运行应用5. API使用示例获取所有任务获取单个任务创建新任务更新任务删除任务 中文乱码问题: 下面创建一个简单的Flask RESTful API示例。首先,我们需要创建环境,安装必要的依赖,然后…...

css实现圆环展示百分比,根据值动态展示所占比例

代码如下 <view class""><view class"circle-chart"><view v-if"!!num" class"pie-item" :style"{background: conic-gradient(var(--one-color) 0%,#E9E6F1 ${num}%),}"></view><view v-else …...

AI Agent与Agentic AI:原理、应用、挑战与未来展望

文章目录 一、引言二、AI Agent与Agentic AI的兴起2.1 技术契机与生态成熟2.2 Agent的定义与特征2.3 Agent的发展历程 三、AI Agent的核心技术栈解密3.1 感知模块代码示例&#xff1a;使用Python和OpenCV进行图像识别 3.2 认知与决策模块代码示例&#xff1a;使用OpenAI GPT-3进…...

多场景 OkHttpClient 管理器 - Android 网络通信解决方案

下面是一个完整的 Android 实现&#xff0c;展示如何创建和管理多个 OkHttpClient 实例&#xff0c;分别用于长连接、普通 HTTP 请求和文件下载场景。 <?xml version"1.0" encoding"utf-8"?> <LinearLayout xmlns:android"http://schemas…...

循环冗余码校验CRC码 算法步骤+详细实例计算

通信过程&#xff1a;&#xff08;白话解释&#xff09; 我们将原始待发送的消息称为 M M M&#xff0c;依据发送接收消息双方约定的生成多项式 G ( x ) G(x) G(x)&#xff08;意思就是 G &#xff08; x ) G&#xff08;x) G&#xff08;x) 是已知的&#xff09;&#xff0…...

JVM垃圾回收机制全解析

Java虚拟机&#xff08;JVM&#xff09;中的垃圾收集器&#xff08;Garbage Collector&#xff0c;简称GC&#xff09;是用于自动管理内存的机制。它负责识别和清除不再被程序使用的对象&#xff0c;从而释放内存空间&#xff0c;避免内存泄漏和内存溢出等问题。垃圾收集器在Ja…...

最新SpringBoot+SpringCloud+Nacos微服务框架分享

文章目录 前言一、服务规划二、架构核心1.cloud的pom2.gateway的异常handler3.gateway的filter4、admin的pom5、admin的登录核心 三、code-helper分享总结 前言 最近有个活蛮赶的&#xff0c;根据Excel列的需求预估的工时直接打骨折&#xff0c;不要问我为什么&#xff0c;主要…...

江苏艾立泰跨国资源接力:废料变黄金的绿色供应链革命

在华东塑料包装行业面临限塑令深度调整的背景下&#xff0c;江苏艾立泰以一场跨国资源接力的创新实践&#xff0c;重新定义了绿色供应链的边界。 跨国回收网络&#xff1a;废料变黄金的全球棋局 艾立泰在欧洲、东南亚建立再生塑料回收点&#xff0c;将海外废弃包装箱通过标准…...

SpringTask-03.入门案例

一.入门案例 启动类&#xff1a; package com.sky;import lombok.extern.slf4j.Slf4j; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cache.annotation.EnableCach…...

C++使用 new 来创建动态数组

问题&#xff1a; 不能使用变量定义数组大小 原因&#xff1a; 这是因为数组在内存中是连续存储的&#xff0c;编译器需要在编译阶段就确定数组的大小&#xff0c;以便正确地分配内存空间。如果允许使用变量来定义数组的大小&#xff0c;那么编译器就无法在编译时确定数组的大…...