SparkSql---用户自定义函数UDFUDAF
文章目录
- 1.UDF
- 2.UDAF
- 2.1 UDF函数实现原理
- 2.2需求:计算用户平均年龄
- 2.2.1 使用RDD实现
- 2.2.2 使用UDAF弱类型实现
- 2.2.3 使用UDAF强类型实现
1.UDF
用户可以通过 spark.udf 功能添加自定义函数,实现自定义功能。
如:实现需求在用户name前加上"Name:"字符串,并打印在控制台
def main(args: Array[String]): Unit = {//创建上下文环境配置对象val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQLDemo03")//创建 SparkSession 对象val sc: SparkSession = SparkSession.builder().config(conf).getOrCreate()import sc.implicits._//创建DataFrameval dataRDD: RDD[(String,Int)] = sc.sparkContext.makeRDD(List(("zhangsan",21),("lisi",24)))val dataframe = dataRDD.toDF("name","age")//注册udf函数sc.udf.register("addName",(x:String)=>"Name:"+x)//创建临时视图dataframe.createOrReplaceTempView("people")//对临时视图使用udf函数sc.sql("select addName(name) from people").show()sc.stop()}

2.UDAF
强类型的 Dataset 和弱类型的 DataFrame 都提供了相关的聚合函数, 如 count(),countDistinct(),avg(),max(),min()。除此之外,用户可以设定自己的自定义聚合函数。**通过继承 UserDefinedAggregateFunction 来实现用户自定义弱类型聚合函数。**从 Spark3.0 版本后,UserDefinedAggregateFunction 已经不推荐使用了。可以统一采用强类型聚合函数Aggregator。
2.1 UDF函数实现原理

在Spark中,UDF(用户自定义函数)在对表中的数据进行处理时,通常会将数据放入缓冲区中以便进行计算。这种缓冲策略可以提高数据处理的效率,特别是对于大数据集。
2.2需求:计算用户平均年龄
2.2.1 使用RDD实现
val dataRDD: RDD[(String,Int)] = sc.sparkContext.makeRDD(List(("zhangsan",21),("lisi",24),("wangwu",26)))val reduceResult: (Int, Int) = dataRDD.map({case (name, age) => {(age, 1)}}).reduce((t1, t2) => {(t1._1 + t2._1, t1._2 + t2._2)})println(reduceResult._1/reduceResult._2)

2.2.2 使用UDAF弱类型实现
需要用户自定义类实现UserDefinedAggregateFunction,并重写其中的方法,当前已不推荐使用。
package bigdata.wordcount.udfimport org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{DataType, DoubleType, IntegerType, LongType, StructField, StructType}
import org.apache.spark.util.AccumulatorV2/*** 用户自定义函数*/
object UDF_Demo02 {def main(args: Array[String]): Unit = {//创建上下文环境配置对象val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQLDemo03")//创建 SparkSession 对象val sc: SparkSession = SparkSession.builder().config(conf).getOrCreate()import sc.implicits._val dataRDD: RDD[(String, Int)] = sc.sparkContext.makeRDD(List(("zhangsan", 19), ("lisi", 21), ("wangwu", 22)))val dataFrame: DataFrame = dataRDD.toDF("name","age")dataFrame.createOrReplaceTempView("user")//创建聚合函数var myAvg=new MyAverageUDAF()//在Spark中注册自定义的聚合函数sc.udf.register("avgMy",myAvg)sc.sql("select avgMy(age) from user").show()sc.stop()}case class User(var name:String,var age:Int)}class MyAverageUDAF extends UserDefinedAggregateFunction{//输入的要进行聚合的参数的类型override def inputSchema: StructType = StructType(Array(StructField("age",IntegerType)))//聚合函数缓冲区中的值的数据类型override def bufferSchema: StructType = StructType(Array(StructField("sum",LongType),StructField("count",LongType)))//函数返回的值的数据类型override def dataType: DataType = DoubleType//判断函数的稳定性//对于相同类型的输入是否有相同类型的输出override def deterministic: Boolean = true//聚合函数缓冲区中值的初始化//因为数据是弱类型的,函数缓冲区中是根据索引来找到对应的变量override def initialize(buffer: MutableAggregationBuffer): Unit = {//年龄的总和buffer(0)=0L//年龄的个数buffer(1)=0L}//更新缓冲区中的数据(执行操作步骤)override def update(buffer: MutableAggregationBuffer, input: Row): Unit ={//第0个索引值是否为空if(!input.isNullAt(0)) {//更新年龄sum的值buffer(0)=buffer.getLong(0)+input.getInt(0)//更新年龄个数buffer(1)=buffer.getLong(1)+1;}}//合并缓冲区override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {buffer1(0)=buffer1.getLong(0)+buffer2.getLong(0)buffer1(1)=buffer1.getLong(1)+buffer2.getLong(1)}//计算最终结果override def evaluate(buffer: Row): Double = {buffer.getLong(0).toDouble / buffer.getLong(1)}
}

2.2.3 使用UDAF强类型实现
Spark3.0 版本可以采用强类型的 Aggregator 方式代替 UserDefinedAggregateFunction
package bigdata.wordcount.udfimport org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Encoder, Encoders, Row, SparkSession, TypedColumn}
import org.apache.spark.sql.expressions.{Aggregator, MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{DataType, DoubleType, IntegerType, LongType, StructField, StructType}
import org.apache.spark.util.AccumulatorV2/*** 用户自定义函数*/
object UDF_Demo03 {def main(args: Array[String]): Unit = {//创建上下文环境配置对象val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQLDemo03")//创建 SparkSession 对象val sc: SparkSession = SparkSession.builder().config(conf).getOrCreate()import sc.implicits._val dataRDD: RDD[(String, Int)] = sc.sparkContext.makeRDD(List(("zhangsan", 19), ("lisi", 21), ("wangwu", 22)))val dataFrame: DataFrame = dataRDD.toDF("name","age")val dataset: Dataset[User01] = dataFrame.as[User01]//创建聚合函数var myAvg=new MyAverageUDAF01()//将聚合函数转换为查询的列val col: TypedColumn[User01, Double] = myAvg.toColumn//执行查询操作dataset.select(col).show()sc.stop()}case class User(var name:String,var age:Int)}//输入数据类型
case class User01(var name:String,var age:Int)
//缓存中的数据类型
case class AgeBuffer(var sum:Long,var count:Long)class MyAverageUDAF01 extends Aggregator[User01,AgeBuffer,Double]{//设置初始值override def zero: AgeBuffer = {AgeBuffer(0L,0L)}//缓冲区实现聚合override def reduce(b: AgeBuffer, a: User01): AgeBuffer = {b.sum = b.sum + a.ageb.count = b.count + 1b}//合并缓冲区override def merge(b1: AgeBuffer, b2: AgeBuffer): AgeBuffer = {b1.sum+=b2.sumb1.count+=b2.countb1}//计算最终结果override def finish(buff: AgeBuffer): Double = {buff.sum.toDouble/buff.count}//设置编码器和解码器//自定义类型就是 product 自带类型根据类型选择override def bufferEncoder: Encoder[AgeBuffer] = {Encoders.product}override def outputEncoder: Encoder[Double] = {Encoders.scalaDouble}
}

相关文章:
SparkSql---用户自定义函数UDFUDAF
文章目录 1.UDF2.UDAF2.1 UDF函数实现原理2.2需求:计算用户平均年龄2.2.1 使用RDD实现2.2.2 使用UDAF弱类型实现2.2.3 使用UDAF强类型实现 1.UDF 用户可以通过 spark.udf 功能添加自定义函数,实现自定义功能。 如:实现需求在用户name前加上"Name:…...
系统架构15 - 软件工程(3)
软件过程模型 瀑布模型特点缺点 原型化模型特点两个阶段不同类型注意 螺旋模型V 模型特点 增量模型特点 喷泉模型基于构件的开发模型(CBSD)形式化方法模型敏捷模型特点“适应性” (adaptive) 而非“预设性” (predictive)“面向人的” (People-oriented) 而非“面向过程的” (P…...
两个近期的计算机领域国际学术会议(软件工程、计算机安全):欢迎投稿
近期,受邀担任两个国际学术会议的Special session共同主席及程序委员会成员(TPC member),欢迎广大学界同行踊跃投稿,分享最新研究成果。期待这个夏天能够在夏威夷檀香山或者加利福尼亚圣荷西与各位学者深入交流。 SERA…...
(二十一)Flask之上下文管理第二篇(细细扣一遍源码)
每篇前言: 🏆🏆作者介绍:【孤寒者】—CSDN全栈领域优质创作者、HDZ核心组成员、华为云享专家Python全栈领域博主、CSDN原力计划作者 🔥🔥本文已收录于Flask框架从入门到实战专栏:《Flask框架从入…...
Java项目:基于SSM框架实现的企业员工岗前培训管理系统(ssm+B/S架构+源码+数据库+毕业论文)
一、项目简介 本项目是一套ssm821基于ssm框架实现的企业员工岗前培训管理系统,主要针对计算机相关专业的正在做毕设的学生与需要项目实战练习的Java学习者。 包含:项目源码、数据库脚本等,该项目附带全部源码可作为毕设使用。 项目都经过严格…...
深入了解Redis:选择适用于你的场景的持久化方案
自然语言处理的发展 文章目录 自然语言处理的发展强烈推荐前言:Redis提供了几种主要的持久化方案:RDB快照持久化:工作原理: AOF日志文件持久化:混合持久化: 总结强烈推荐专栏集锦写在最后 强烈推荐 前些天…...
【Git配置代理】Failed to connect to github.com port 443 问题解决方法
前言: 在学习代码审计时,有时会需要使用git去拉取代码,然后就出现了如下错误 看过网上很多解决方法,觉得问题的关键还是因为命令行在拉取/推送代码时并没有使用VPN进行代理。 解决办法 : 配置http代理:…...
python提取word文档内容的示例
一、微软Word历史、背景: Word 的特异功能就是把那些应该写成简单的 TXT 或 PDF 格式的文件,变成了既大又慢且难以打开的怪兽,它们经常在系统切换和版本切换中出现格式不兼容,而且因为某些原因在文件内容已经定稿后仍处于可编辑的…...
MarkDown快速入门-以Obsidian编辑器为例
直接上图,左右对应。 首先是基础语法。 # 标题,几个就代表几级标题;* 单个是序号,两个在一起就是斜体;- [ ] 代表任务,注意其中的空格; 然后是表格按钮代码 | 使用中竖线代表表格,…...
【计算机网络】协议,电路交换,分组交换
定义了在两个或多个通信实体之间交换的报文格式和次序,以及报文发送和/或接收一个报文或其他事件所采取的动作.网络边缘: 端系统 (因为处在因特网的边缘) 主机 端系统 客户 client服务器 server今天大部分服务器都属于大型数据中心(data center)接入网(access network) 指将端…...
加速应用开发:低代码云SaaS和源码交付模式如何选
随着数字化转型的加速,企业对于快速开发和交付高质量应用的需求也越来越迫切。为了满足这一需求,开发者们开始探索采用低代码平台进行软件开发工作,以加速应用开发过程。 目前,市场上的低代码产品众多,但基本可分为简单…...
ATT汇编
指令后缀 AT&T格式的汇编指令有不同的后缀 其中 b表示byte,字节 w表示word,字/两字节 l表示long,32位系统下的long是4字节 q表示quad,意味四重,表示4个字/8字节 寄存器用途 参见 AT&T的汇编世界 - Gemfield…...
java split 拆分字符串
今天突然把java里split 跟,kotlin中的split 弄混了 kotlin中split 的用法跟python 中的split 用法是一样的,java中由于返回值是String[] 的数组,所以 在使用的时候需要注意下返回值如果要获取里面的内容,还是需要遍历下里面的内…...
【InternLM 大模型实战】作业与笔记汇总
笔记1:https://blog.csdn.net/weixin_42567071/article/details/135375937 笔记2:https://blog.csdn.net/weixin_42567071/article/details/135423120 作业2:https://github.com/xiaomile/InternLM-homework/tree/main/%E4%BD%9C%E4%B8%9A1 笔…...
解析PreMaint在石油化工设备预测性维护领域的卓越表现
石油化工行业一直在寻找能够确保设备高效运行的先进维护解决方案。在这个领域,PreMaint以其卓越的性能和创新的技术引起了广泛关注。 一、为何选择预测性维护? 传统的维护方法,基于固定的时间表,无法灵活应对设备的真实运行状况。…...
C++面试宝典第25题:阶乘末尾零的个数
题目 给定一个整数n,返回n!(n的阶乘)结果尾数中零的个数。 示例 1: 输入:3 输出:0 解释:3! = 6,尾数中没有零。 示例 2: 输入:5 输出:1 解释:5! = 120,尾数中有1个零。 解析 这道题主要考察应聘者对于数学问题的分析和理解能力,以及在多个解决方案中,寻求最优…...
PCIE 4.0 Equalizaiton(LTSSM 均衡流程)
1. 均衡 在Tx端有FFE(Feed Forward Equalizer,前馈均衡器);在Rx端有:CTLE(Continuous Time Linear Equalizer,连续时间线性均衡器)和DFE(Decision Feedback Equalizer&a…...
[libwebsockets]lighttpd+libwebsockets支持ws和wss配置方法说明
libwebsockets介绍 libwebsockets是一款轻量级用来开发服务器和客户端的C库。它不仅支持ws,wss还同时支持http与https,可以轻轻松松结合openssl等库来实现ssl加密。 官方参考链接: https://libwebsockets.org/ lighttpd版本 lighttpd/1.4.59 (ssl) - a light and fast w…...
常用软件安装
服务器版本为Centos7.8 x86_64 1.yum下载提速 1.wget -O /etc/yum.repos.d/CentOS-Base.repo http://mirrors.aliyun.com/repo/Centos-7.repo 2. yum clean all 3.yum makecache2.jdk yum install java-1.8.0-openjdk* -y # yum update 时自动更新jdk版本 1.yum -y install …...
翻译: GPT-4 Vision静态图表转换为动态数据可视化 升级Streamlit 三
GPT-4 Vision 系列: 翻译: GPT-4 with Vision 升级 Streamlit 应用程序的 7 种方式一翻译: GPT-4 with Vision 升级 Streamlit 应用程序的 7 种方式二 1. 将任何静态图表转换为动态数据可视化 ChatGPT Vision 不仅可以将涂鸦变成功能齐全的 Streamlit 应用程序,还…...
Java 语言特性(面试系列2)
一、SQL 基础 1. 复杂查询 (1)连接查询(JOIN) 内连接(INNER JOIN):返回两表匹配的记录。 SELECT e.name, d.dept_name FROM employees e INNER JOIN departments d ON e.dept_id d.dept_id; 左…...
【HarmonyOS 5.0】DevEco Testing:鸿蒙应用质量保障的终极武器
——全方位测试解决方案与代码实战 一、工具定位与核心能力 DevEco Testing是HarmonyOS官方推出的一体化测试平台,覆盖应用全生命周期测试需求,主要提供五大核心能力: 测试类型检测目标关键指标功能体验基…...
线程与协程
1. 线程与协程 1.1. “函数调用级别”的切换、上下文切换 1. 函数调用级别的切换 “函数调用级别的切换”是指:像函数调用/返回一样轻量地完成任务切换。 举例说明: 当你在程序中写一个函数调用: funcA() 然后 funcA 执行完后返回&…...
五年级数学知识边界总结思考-下册
目录 一、背景二、过程1.观察物体小学五年级下册“观察物体”知识点详解:由来、作用与意义**一、知识点核心内容****二、知识点的由来:从生活实践到数学抽象****三、知识的作用:解决实际问题的工具****四、学习的意义:培养核心素养…...
Keil 中设置 STM32 Flash 和 RAM 地址详解
文章目录 Keil 中设置 STM32 Flash 和 RAM 地址详解一、Flash 和 RAM 配置界面(Target 选项卡)1. IROM1(用于配置 Flash)2. IRAM1(用于配置 RAM)二、链接器设置界面(Linker 选项卡)1. 勾选“Use Memory Layout from Target Dialog”2. 查看链接器参数(如果没有勾选上面…...
Java 加密常用的各种算法及其选择
在数字化时代,数据安全至关重要,Java 作为广泛应用的编程语言,提供了丰富的加密算法来保障数据的保密性、完整性和真实性。了解这些常用加密算法及其适用场景,有助于开发者在不同的业务需求中做出正确的选择。 一、对称加密算法…...
Psychopy音频的使用
Psychopy音频的使用 本文主要解决以下问题: 指定音频引擎与设备;播放音频文件 本文所使用的环境: Python3.10 numpy2.2.6 psychopy2025.1.1 psychtoolbox3.0.19.14 一、音频配置 Psychopy文档链接为Sound - for audio playback — Psy…...
VM虚拟机网络配置(ubuntu24桥接模式):配置静态IP
编辑-虚拟网络编辑器-更改设置 选择桥接模式,然后找到相应的网卡(可以查看自己本机的网络连接) windows连接的网络点击查看属性 编辑虚拟机设置更改网络配置,选择刚才配置的桥接模式 静态ip设置: 我用的ubuntu24桌…...
推荐 github 项目:GeminiImageApp(图片生成方向,可以做一定的素材)
推荐 github 项目:GeminiImageApp(图片生成方向,可以做一定的素材) 这个项目能干嘛? 使用 gemini 2.0 的 api 和 google 其他的 api 来做衍生处理 简化和优化了文生图和图生图的行为(我的最主要) 并且有一些目标检测和切割(我用不到) 视频和 imagefx 因为没 a…...
处理vxe-table 表尾数据是单独一个接口,表格tableData数据更新后,需要点击两下,表尾才是正确的
修改bug思路: 分别把 tabledata 和 表尾相关数据 console.log() 发现 更新数据先后顺序不对 settimeout延迟查询表格接口 ——测试可行 升级↑:async await 等接口返回后再开始下一个接口查询 ________________________________________________________…...
