【大数据学习 | Spark-SQL】定义UDF和DUAF,UDTF函数
1. UDF函数(用户自定义函数)
一般指的是用户自己定义的单行函数。一进一出,函数接受的是一行中的一个或者多个字段值,返回一个值。比如MySQL中的,日期相关的dateDiff函数,字符串相关的substring函数。
先准备数据:
1.1 导入必要的包
首先,确保导入必要的Spark包:
import org.apache.spark.sql.SparkSession
1.2 创建SparkSession
创建一个SparkSession对象,这是与Spark交互的入口。
1.3 定义UDF并注册到SparkSQL
定义一个Scala函数,并将其注册为UDF。示例
1.4 使用UDF在SQL查询中:
调用udf的register方法,第一个参数是udf函数的函数名,第二个参数是要注册为UDF的函数。
session.udf.register("all_income",(sal:Int,bonus:Int)=>{sal*12 + bonus})
1.5 代码:
尽量使用SparkSQL的sql形式的写法,api写法太麻烦了。
object TestUDF{def main(args: Array[String]): Unit = {val session = SparkSession.builder().master("local[*]").appName("testUDF").getOrCreate()import session.implicits._val df = session.sparkContext.textFile("D:\\software\\Spark\\SparkProgram1\\atguigu-classes\\data\\a.txt").map(t => {val strs = t.split(" ")(strs(0), strs(1), strs(2).toInt, strs(3).toInt)}).toDF("id", "name", "salary", "bonus")session.udf.register("all_income",(sal:Int,bonus:Int)=>{sal*12 + bonus})import org.apache.spark.sql.functions
// df.withColumn("all",functions.callUDF("all_income",$"salary",$"bonus"))
// .select("id","name","all")
// .show()df.createTempView("salary")session.sql("""|select id,name,all_income(salary,bonus) all from salary|""".stripMargin).show()}
}
输出:
2. UDAF(用户自定义的聚合函数)
指的是用户自定义的聚合函数,多进一出,比如MySQL中的,count函数,avg函数。
以学生信息为主进行统计,所有人员的年龄的总和
或者每个性别的年龄的平均值
计算所有人的年龄之和:
package com.atguigu.bigdata.testimport org.apache.spark.sql.{Encoder, Encoders, SparkSession, functions}
import org.apache.spark.sql.expressions.Aggregator/*** ClassName : TestUDAF* Package : com.atguigu.bigdata.test* Description** @Author HeXua* @Create 2024/11/29 19:09* Version 1.0*/
object TestUDAF {def main(args: Array[String]): Unit = {val session = SparkSession.builder().appName("test udaf").master("local[*]").getOrCreate()import session.implicits._val df = session.sparkContext.textFile("D:\\software\\Spark\\SparkProgram1\\atguigu-classes\\data\\a.txt").map(t => {val strs = t.split(" ")(strs(0), strs(1), strs(2).toInt, strs(3))}).toDF("id", "name", "age", "gender")import org.apache.spark.sql.functions._// 注册udaf函数session.udf.register("mysum",udaf(new MySum))df.createTempView("student")session.sql("""|select mysum(age) from student|""".stripMargin).show()}
}
// udaf的类继承Aggregator抽象类
class MySum extends Aggregator[Int,Int,Int]{//初始化def zero: Int = 0//聚合逻辑def reduce(b: Int, a: Int): Int = a+b//整体聚合def merge(b1: Int, b2: Int): Int = b1+b2//最终返回值def finish(reduction: Int): Int = reduction//累加值的类型def bufferEncoder: Encoder[Int] = Encoders.scalaInt//输出结果的类型def outputEncoder: Encoder[Int] = Encoders.scalaInt
}
定义用户自定义聚合函数时,继承Aggregator类需要指定三个泛型参数。这三个泛型参数分别代表不同的概念。
泛型参数解释:
1. 输入类型(IN)
这是聚合函数的输入类型,即每次调用reduce方法时传入的单个元素的类型。例如你要计算一组整数的平均值,输入类型就是int。
2. 缓冲区类型(BUFFER)
这是聚合函数的中间状态类型,也称为缓冲区类型。
例如你要计算一组整数的平均值,缓冲区可能包含两个字段:总和和计数,因为iBUF可能是一个元组。
3. 输出类型(OUT)
这是聚合函数的最终输出类型,即finish方法返回的类型。例如你要计算平均值,最终输出类型是Double。
方法解释:
zero:初始化缓冲区的值,对于平均值计算,初始化和计数都是0。
reduce:更新缓冲区,每次传入一个新的输入值时,更新总和和计数。
finish:计算最终结果,根据缓冲区中的总和和计数,计算平均值。
bufferEncoder:定义缓冲区类型的编码器,用于序列化和反序列化缓冲区。
outputEncoder:定义最终输出类型的编码器,用于序列化和反序列化输出结果。
计算每个性别的年龄的平均值:
case class AggragateVo(var cnt:Int,var sum:Int)
object MyAvg extends Aggregator[Int,AggragateVo,Double]{override def zero: AggragateVo = AggragateVo(0,0)override def reduce(b: AggragateVo, a: Int): AggragateVo = {b.cnt += 1b.sum += ab}override def merge(b1: AggragateVo, b2: AggragateVo): AggragateVo = {b1.cnt += b2.cntb1.sum += b2.sumb1}override def finish(reduction: AggragateVo): Double = {reduction.sum.toDouble /reduction.cnt}override def bufferEncoder: Encoder[AggragateVo] = Encoders.productoverride def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}
3. UDTF(用户自定义炸裂函数)
拆分函数,进入的是一行内容出现的结果是多行内容。
spark中并不直接支持UDTF函数。但可以使用hive中的炸裂函数达到效果。
import org.apache.spark.sql.SparkSessionobject TestUDTF {def main(args: Array[String]): Unit = {val session = SparkSession.builder().appName("test udtf").master("local[*]").getOrCreate()import session.implicits._val df = session.sparkContext.textFile("file:///headless/workspace/spark/data/m.txt").map(t => {val strs = t.split(",")(strs(0), strs(1), strs(2))}).toDF("id", "name", "actors")//explode map arraydf.createTempView("movies")session.sql("""|select id,name,actor from movies lateral view explode(split(actors,'\\|')) t as actor|""".stripMargin).createTempView("movies1")session.sql("""|select count(1),actor from movies1 group by actor|""".stripMargin).show()}
}
相关文章:

【大数据学习 | Spark-SQL】定义UDF和DUAF,UDTF函数
1. UDF函数(用户自定义函数) 一般指的是用户自己定义的单行函数。一进一出,函数接受的是一行中的一个或者多个字段值,返回一个值。比如MySQL中的,日期相关的dateDiff函数,字符串相关的substring函数。 先…...

#Java-JDK7、8的时间相关类,包装类
1. JDK7-Date类 我们先来看时间的相关知识点 世界标准时间: 格林尼治时间/格林威治时间(Greenwich Mean Time)简称GMT。目前世界标准时间(UTC)已经替换为:原子钟中国标准时间: 世界标准时间8小时 时间单位换算: 1秒1000毫秒 1毫秒1000微秒 1微秒1000纳秒 Date类 Date类…...
tc 命令
Windows Network Shaper目前只能在win10及以下版本使用,在github上有源码。 iperf 是一个网络性能测试工具,可以测试网络带宽和延迟。 webrtc M96版本的GCC sudo tc qdisc del dev eth1 root //关闭限速 sudo tc qdisc add dev eth1 root handle 1: ht…...

基于Java Springboot 协同过滤算法音乐推荐系统
一、作品包含 源码数据库设计文档万字全套环境和工具资源部署教程 二、项目技术 前端技术:Html、Css、Js、Vue2、Element-ui 数据库:MySQL 后端技术:Java、Spring Boot、MyBatis 三、运行环境 开发工具:IDEA 数据库&#x…...
MyBatis框架-关联映射
MyBatis关联映射-一对一 1.1 实体关系 实体–数据实体,实体关系指的就是数据与数据之间的关系 例如:订单和商品,用户和角色 实体关系分为以下四种: **一对一关联:**用户表和用户详情表 数据表关系: 主键关…...

Web开发技术栈选择指南
互联网时代的蓬勃发展,让越来越多人投身软件开发领域。面对前端和后端的选择,很多初学者往往陷入迷茫。让我们一起深入了解这两个领域的特点,帮助你做出最适合自己的选择。 在互联网发展的早期,前端开发主要负责页面布局和简单的…...
工具类的魔力:深入理解 Java 的 String、Math 和 Arrays
Java 提供了许多实用的工具类,帮助开发者简化代码,提升效率。这些工具类包含了各种常见的操作,比如字符串处理、数学计算、数组操作等。掌握这些工具类的高效使用方法,不仅能让你写出更简洁、优雅的代码,还能在性能上有…...
Linux下一次性关闭多个同名进程
要一次性关闭多个同名的 Python 进程,例如: 你可以使用以下几种方法。在执行这些操作之前,请务必确认这些进程确实是你希望终止的,以避免意外关闭其他重要的进程。 方法一:使用 pkill 命令 pkill 是一个用于根据名称…...
记录一些虚拟机桥接网络,windows网络遇到的小问题
1 virtual box 桥接的虚拟系统无 ipv4 地址 https://blog.csdn.net/qq_44847649/article/details/122582954 原因是 wlan 无线网卡没开共享给 virtual box host only (之前用过 vmware 也类似) 2 无法两台 windows10 物理机无法相互 ping 通 https://blog.csdn.net/qq_35…...
MATLAB —— 机械臂工作空间,可达性分析
系列文章目录 前言 本示例展示了如何使用可操作性指数对不同类型的机械手进行工作空间分析。工作空间分析是一种有用的工具,可用于确定机器人工作空间中最容易改变末端效应器位置和方向的区域。本示例的重点是利用不同的可操控性指数类型来分析各种机械手的工作空间。了解工作…...

18:(标准库)DMA二:DMA+串口收发数据
DMA串口收发数据 1、DMA串口发送数据2、DMA中断串口接收定长数据包3、串口空闲中断DMA接收不定长数据包4、串口空闲中断DMA接收不定长数据包DMA发送数据包 1、DMA串口发送数据 当串口的波特率大于115200时,可以通过DMA1进行数据搬运,以防止数据的丢失。如…...

【C++】 算术操作符与数据类型溢出详解
博客主页: [小ᶻ☡꙳ᵃⁱᵍᶜ꙳] 本文专栏: C 文章目录 💯前言💯C 算术操作符详解基本算术操作符整数除法与取模行为类型转换在算术运算中的作用自增与自减操作符 💯数值溢出:当值超出类型范围时数据类型的取值范围…...

柔性芯片:实现万物互联的催化剂
物联网 (IoT) 市场已经非常成熟,麦肯锡预测,物联网将再创高峰,到 2030 年将达到 12.5 万亿美元的估值。然而,万物互联 (IoE) 的愿景尚未实现,即由数十亿台智能互联设备组成,提供大规模洞察和效率。 究竟是…...

FFmpeg 简介与编译
1. ffmpeg 简介: FFmpeg是一套可以用来记录、转换数字音频、视频,并能将其转化为流的开源计算机程序。采用LGPL或GPL许可证。它提供了录制、转换以及流化音视频的完整解决方案。它包含了非常先进的音频/视频编解码库libavcodec,为了保证高可移…...

低代码与微服务融合在医疗集团中的补充应用探究
摘要 本论文深入探讨了低代码与微服务融合在医疗系统集群中的应用。分析了其优势,包括提高开发效率、降低技术门槛、灵活适应需求变化和易于维护扩展等;阐述了面临的挑战,如数据安全与隐私保护、技术应用复杂性等;并展望了其在医…...
速盾:高防cdn的搜索引擎回源是什么?
高防CDN(Content Delivery Network)是一种用于加速网站访问速度和增加安全性的服务,它通过将静态和动态内容缓存在全球分布的服务器上,从而将用户请求的响应时间降至最低,并提供有效的防御攻击的能力。在实际使用过程中…...

减少电路和配电系统谐波的五种方法
K 级变压器 ANSI 标准 C57.110-1986 定义了 K 系数来评估电路消耗多少谐波电流并确定该谐波电流的热效应。根据电路 K 系数,变压器按 K 等级制造。值得注意的是,K 级变压器不会减少谐波。K 等级表示变压器承受谐波有害影响的相对能力。K级变压器增加了铁…...

基于Java Springboot Vue3图书管理系统
一、作品包含 源码数据库设计文档万字全套环境和工具资源部署教程 二、项目技术 前端技术:Html、Css、Js、Vue3、Element-ui 数据库:MySQL 后端技术:Java、Spring Boot、MyBatis 三、运行环境 开发工具:IDEA 数据库&#x…...

vue-cli项目质量约束配置
同步发布于我的网站 🚀 package.json scriptslint-stageddevDependencies git-hooksno-eslintdevDependencies - scssdevDependencies - lessengines pre-commit.eslintrc.js.stylelintrc scssless vue.config.jsREADME.md package.json scripts "scripts&…...

第七课 Unity编辑器创建的资源优化_UI篇(UGUI)
上期我们学习了简单的Scene优化,接下来我们继续编辑器创建资源的UGUI优化 UI篇(UGUI) 优化UGUI应从哪些方面入手? 可以从CPU和GPU两方面考虑,CPU方面,避免触发或减少Canvas的Rebuild和Rebatch,…...
变量 varablie 声明- Rust 变量 let mut 声明与 C/C++ 变量声明对比分析
一、变量声明设计:let 与 mut 的哲学解析 Rust 采用 let 声明变量并通过 mut 显式标记可变性,这种设计体现了语言的核心哲学。以下是深度解析: 1.1 设计理念剖析 安全优先原则:默认不可变强制开发者明确声明意图 let x 5; …...
Java 语言特性(面试系列2)
一、SQL 基础 1. 复杂查询 (1)连接查询(JOIN) 内连接(INNER JOIN):返回两表匹配的记录。 SELECT e.name, d.dept_name FROM employees e INNER JOIN departments d ON e.dept_id d.dept_id; 左…...
脑机新手指南(八):OpenBCI_GUI:从环境搭建到数据可视化(下)
一、数据处理与分析实战 (一)实时滤波与参数调整 基础滤波操作 60Hz 工频滤波:勾选界面右侧 “60Hz” 复选框,可有效抑制电网干扰(适用于北美地区,欧洲用户可调整为 50Hz)。 平滑处理&…...

聊聊 Pulsar:Producer 源码解析
一、前言 Apache Pulsar 是一个企业级的开源分布式消息传递平台,以其高性能、可扩展性和存储计算分离架构在消息队列和流处理领域独树一帜。在 Pulsar 的核心架构中,Producer(生产者) 是连接客户端应用与消息队列的第一步。生产者…...
Qwen3-Embedding-0.6B深度解析:多语言语义检索的轻量级利器
第一章 引言:语义表示的新时代挑战与Qwen3的破局之路 1.1 文本嵌入的核心价值与技术演进 在人工智能领域,文本嵌入技术如同连接自然语言与机器理解的“神经突触”——它将人类语言转化为计算机可计算的语义向量,支撑着搜索引擎、推荐系统、…...

成都鼎讯硬核科技!雷达目标与干扰模拟器,以卓越性能制胜电磁频谱战
在现代战争中,电磁频谱已成为继陆、海、空、天之后的 “第五维战场”,雷达作为电磁频谱领域的关键装备,其干扰与抗干扰能力的较量,直接影响着战争的胜负走向。由成都鼎讯科技匠心打造的雷达目标与干扰模拟器,凭借数字射…...
uniapp 字符包含的相关方法
在uniapp中,如果你想检查一个字符串是否包含另一个子字符串,你可以使用JavaScript中的includes()方法或者indexOf()方法。这两种方法都可以达到目的,但它们在处理方式和返回值上有所不同。 使用includes()方法 includes()方法用于判断一个字…...
【Android】Android 开发 ADB 常用指令
查看当前连接的设备 adb devices 连接设备 adb connect 设备IP 断开已连接的设备 adb disconnect 设备IP 安装应用 adb install 安装包的路径 卸载应用 adb uninstall 应用包名 查看已安装的应用包名 adb shell pm list packages 查看已安装的第三方应用包名 adb shell pm list…...

逻辑回归暴力训练预测金融欺诈
简述 「使用逻辑回归暴力预测金融欺诈,并不断增加特征维度持续测试」的做法,体现了一种逐步建模与迭代验证的实验思路,在金融欺诈检测中非常有价值,本文作为一篇回顾性记录了早年间公司给某行做反欺诈预测用到的技术和思路。百度…...
Bean 作用域有哪些?如何答出技术深度?
导语: Spring 面试绕不开 Bean 的作用域问题,这是面试官考察候选人对 Spring 框架理解深度的常见方式。本文将围绕“Spring 中的 Bean 作用域”展开,结合典型面试题及实战场景,帮你厘清重点,打破模板式回答,…...