Spark高级用法-自定义函数
用户可以根据需求自己封装计算的逻辑,对字段数据进行计算
内置函数,是spark提供的对字段操作的方法 ,split(字段) 对字段中的数进行切割,F.sum(字段) 会将该字段下的数据进行求和
实际业务中又能内置函数不满足计算需求,此时就需要自定义行数,完成字段数据的业务处
函数分类
- udf
- 自定义
- 一进一出
- udaf
- 聚合
- 自定义
- 多进一出
- udtf
- 爆炸
- 一进多出
UDF函数
对每一行数据以此进行计算,返回每一行的结果
1)不带装饰器
# UDF函数
from pyspark.sql import SparkSession,functions as F
from pyspark.sql.types import *ss = SparkSession.builder.getOrCreate()# 读取文件数据转为df
df = ss.read.csv('hdfs://node1:8020/data/students.csv',header=True,sep=',')df.show()# 自定义字符串长度计算函数
# @F.udf(returnType=IntegerType()) # 使用装饰器注册函数,只能在DSL方法中使用,不能在SQL中使用
def len_func(field):"""自定义函数,函数名可以自己指定:param field: 是用来结构处理的字段数据,可以定义多个。根据实际处理的字段数量决定定义多少个接收参数:return: 返回处理后的数据"""if field is None:return 0else:data = len(field)return data# 将自定义的函数注册到spark中使用
# 参数一 指定spark中使用函数的名
# 参数二 指定自定义函数的名
# 参数三 指定函数的返回值类型
# 接收参数 定义和函数名一样的值
len_func = ss.udf.register('len_func',len_func,returnType = IntegerType())# 在spark中使用
df2 = df.select('id','name','gender',len_func('name'))
df2.show()# 使用sql语句
df.createTempView('stu')df3 = ss.sql('select * ,len_FUNC(name) from stu')
df3.show()
2)带有装饰器
# UDF函数
from pyspark.sql import SparkSession,functions as F
from pyspark.sql.types import *ss = SparkSession.builder.getOrCreate()# 读取文件数据转为df
df = ss.read.csv('hdfs://node1:8020/data/students.csv',header=True,sep=',')df.show()# 自定义字符串长度计算函数
@F.udf(returnType=IntegerType()) # 使用装饰器注册函数,只能在DSL方法中使用,不能在SQL中使用
def len_func(field):"""自定义函数,函数名可以自己指定:param field: 是用来结构处理的字段数据,可以定义多个。根据实际处理的字段数量决定定义多少个接收参数:return: 返回处理后的数据"""if field is None:return 0else:data = len(field)return data# 在spark中使用
df2 = df.select('id','name','gender',len_func('name'))
df2.show()
装饰器注册
-
只能在DSL方法中使用,在sql语句中无法使用
UDAF函数
多进一出 主要是聚合
使用pandas中的series实现,可以读取一列数据存储在pandas的seriess中进行数据的聚合
# 读取文件数据转为df
df = ss.read.csv('hdfs://node1:8020/data/students.csv',header=True,sep=',',schema='id int,name string,gender string,age int,cls string')df.show()# 自定义udaf函数
# 装饰器注册
@F.pandas_udf(returnType=IntegerType())
# 自定义udaf函数
# fileds:pd.Series 给数据字段指定一个类型
# -> float 指定返回值类型
# udaf函数注册需要两步,第一步现指定装饰器
def sub(filed:pd.Series) -> int:"""自定义udaf函数,实现累减:param field: 接收的字段列数据 pd.Series声明字段数据的类型,接收一列数据可以使用pandas的series类型:return:"""# field是series,就按照series方式操作n = filed[0] # 取出第一个值作为初始值for i in filed[1::]:n-=ireturn n# regidter方法注册
sub = ss.udf.register('sub',sub)# 使用udaf函数 缺少 PyArrow pandas中series类型交个spark程序无法识别,spark是有scala实现,scala中没有对应的series类型
# 可以使用 PyArrow框架将series转为scale能识别的数据类型
df2 = df.select(sub('age'))
df2.show()
- arrow框架 pyarrow
-
Apache Arrow 是一种内存中的列式数据格式,用于Spark中,以在JVM和Python进程之间有效地传输数据。目前这对使用 Pandas/NumPy 数据的 Python 用户最有益,提升传输速度。
-
在线安装 三台机器安装
-
进入虚拟环境 conda activate base
-
在线安装 pip install pyspark[sql] -i Verifying - USTC Mirrors
-
-
离线安装 三台机器安装
-
pip install pyarrow-10.0.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl
-
-
安装pyarrow
conda activate base
pip install pyspark[sql] -i https://pypi.mirrors.ustc.edu.cn/simple/
相关文章:

Spark高级用法-自定义函数
用户可以根据需求自己封装计算的逻辑,对字段数据进行计算 内置函数,是spark提供的对字段操作的方法 ,split(字段) 对字段中的数进行切割,F.sum(字段) 会将该字段下的数据进行求和 实际业务中又能内置函数不满足计算需求࿰…...

『Mysql进阶』Mysql explain详解(五)
目录 Explain 介绍 Explain分析示例 explain中的列 1. id 列 2. select_type 列 3. table 列 4. partitions 列 5. type 列 6. possible_keys 列 7. key 列 8. key_len 列 9. ref 列 10. rows 列 11. filtered 列 12. Extra 列 Explain 介绍 EXPLAIN 语句提供有…...

【工具】音视频翻译工具基于Whisper+ChatGPT
OpenAI推出的开源语音识别工具Whisper,以其卓越的语音识别能力,在音频和视频文件处理领域大放异彩。与此同时,ChatGPT也在翻译领域崭露头角,其强大的翻译能力备受赞誉。因此,一些字幕制作团队敏锐地捕捉到了这两者的结…...

学成在线——关于nacos配置优先级的坑
出错: 本地要起两个微服务,一个是content-api,另一个是gateway网关服务。 发现通过网关服务请求content微服务时,怎么请求都请求不到。 配置如下: content-api-dev.yaml的配置: server:servlet:context-p…...

Nginx在Windows Server下的启动脚本
Nginx在Windows Server下的快捷运行脚本 使用时记得修改NGINX_DIR路径 ECHO OFF CHCP 65001 SET NGINX_DIRD:\software\Nginx\ color 0a TITLE Nginx Management GOTO MENU :MENU CLS ECHO. ECHO. * * * * Nginx Management * * * * * * * * * * * ECHO. * * EC…...

【国科大】C++程序设计秋季——五子棋
【国科大】C程序设计秋季 —— 五子棋程序 下载地址:https://mbd.pub/o/bread/Zp2Ukptx...

Docker 环境下多节点服务器监控实战:从 Prometheus 到 Grafana 的完整部署指南
Docker 环境下多节点服务器监控实战:从 Prometheus 到 Grafana 的完整部署指南 文章目录 Docker 环境下多节点服务器监控实战:从 Prometheus 到 Grafana 的完整部署指南一 多节点部署1 节点一2 节点二3 节点三 二 监控节点部署三 配置 prometheus.yml四 …...

【动手学深度学习】6.3 填充与步幅(个人向笔记)
卷积的输出形状取决于输入形状和卷积核的形状在应用连续的卷积后,我们最终得到的输出大小远小于输入大小,这是由于卷积核的宽度和高度通常大于1导致的比如,一个 240 240 240240 240240像素的图像,经过10层 5 5 55 55的卷积后&am…...

【宝可梦】游戏
pokemmo https://pokemmo.com/zh/ 写在最后:若本文章对您有帮助,请点个赞啦 ٩(๑•̀ω•́๑)۶...

docker启动的rabbitmq如何启动其SSL功能
docker run --hostname my-rabbit --name my-rabbit -p 5671:5671 -p 15671:15671 -p 15672:15672 -e RABBITMQ_DEFAULT_USERabc -e RABBITMQ_DEFAULT_PASSabc -d rabbitmq:4.0-management 使用docker的复制命令将ca.crt、server.crt和server.key文件复制到容器的/etc/server_s…...

易基因: cfMeDIP-seq揭示cfDNA甲基化高效区分原发性和转移性前列腺|Nat Commun
大家好,这里是专注表观组学十余年,领跑多组学科研服务的易基因。 前列腺癌(Prostate cancer,PCa)是男性中第二常见的恶性肿瘤,也是全球癌症相关死亡的第三大原因。虽然大多数原发性前列腺癌可以治愈&#…...

CMake 教程跟做与翻译 4
目录 添加一个option! 添加一个option! option,正如其意,就是选项的意思。我们这里需要演示一下option的做法。 option对于大型的工程必然是非常常见的:一些模块会被要求编译,另一些客户不准备需要这些模块。option就是将这种需…...

MySQL面试题分享
慢日志(了解) 慢日志开启的变量:slow_query_logON; 如果值为 OFF ,那就是没有开启慢日志 耗时: long_query_time,默认是10秒 redis 和 mysql 慢日志的区别 redis 慢日志默认是没有开启的 mysql 慢日志默认是开启的…...

vue路由缓存问题
什么是路由缓存问题 解决方案: 让组件实例不再复用,强制销毁重建监听路由变化,变化之后执行数据更新操作 方法一 给 routerv-view 添加key属性,强制不添加缓存,破坏缓存,所以这个方法性能会比较差 <Ro…...

RabbitMQ中如何解决消息堆积问题,如何保证消息有序性
RabbitMQ中如何解决消息堆积问题 如何保证消息有序性 只需要让一个消息队列只对应一个消费者即可...

python爬虫案例——selenium爬取淘宝商品信息,实现翻页抓取(14)
文章目录 1、任务目标2、网页分析3、代码编写3.1 代码分析3.2 完整代码1、任务目标 目标网站:淘宝(https://www.taobao.com/) 任务要求:通过selenium实现自动化抓取 淘宝美食 板块下的所有商品信息,并实现翻页抓取,最后以csv格式将数据保存至本地;如: 2、网页分析 首先…...

在VSCode中使用Excalidraw
概述 Excalidraw是一款非常不错的示意图绘制软件,没想到在VSCode中有其扩展,可以在VScode中直接使用。 安装扩展 使用 需要创建.excalidraw.svg、.excalidraw或.excalidraw.png等名称的文件。 搭配手写版使用 自由画笔工具可以配合手写板,…...

25中国投资中投笔试测评秋招校招SHL笔试题型分享
✅中投公司不必过多介绍,和建总都位于金融央企第一档,但是招人更少,竞争更为激烈,看公示录用名单都是清北的金融硕士,投资岗难度更大。 ✅中投公司的笔试往年都是shl系统,但考察范围非常广,包含…...

【LeetCode热题100】分治-快排
本篇博客记录分治快排的4道题目:颜色分类、排序数组、数组中的第K个最大元素、数组中最小的N个元素(库存管理)。 class Solution { public:void sortColors(vector<int>& nums) {int n nums.size();int left -1,right n;for(int…...

Docker 教程四 (Docker 镜像加速)
Docker 镜像加速 国内从 DockerHub 拉取镜像有时会遇到困难,此时可以配置镜像加速器。 目前国内 Docker 镜像源出现了一些问题,基本不能用了,后期能用我再更新下。* Docker 官方和国内很多云服务商都提供了国内加速器服务,例如…...

各类排序详解
前言 本篇博客将为大家介绍各类排序算法,大家知道,在我们生活中,排序其实是一件很重要的事,我们在网上购物,需要根据不同的需求进行排序,异或是我们在高考完报志愿时,需要看看院校的排名&#…...

【c语言——指针详解(4)】
文章目录 一、回调函数是什么?二、qsort的使⽤1、使⽤qsort函数排序整型数据2、使⽤qsort排序结构数据 三、qsort函数的模拟实现 作者主页 一、回调函数是什么? 回调函数就是⼀个通过函数指针调⽤的函数。 如果你把函数的指针(地址…...

C# (.net6)实现Redis发布和订阅简单案例
概念: 在 .NET 6 中使用 Redis 的/订发布阅模式。发布/订阅(Pub/Sub)是 Redis 支持的一种消息传递模式,其中一个或多个发布者向一个或多个订阅者发送消息,Redis 客户端可以订阅任意数量的频道。 多个客户端可以订阅一个相同的频道…...

【golang】gorm 使用map实现in 条件查询用法
当 where 字典的值为数组时 gorm 会自动转换为条件 IN 查询 where : map[string]interface{}{} where["id"] [1,2,3] where["name"] "zhangsan"type userList struct {Id int "gorm:id"Name string "gorm:name" } Table.…...

理论篇| 移动端爬虫
移动应用的快速发展和广泛普及带来了海量的数据,这些数据对于市场分析、用户行为洞察和业务优化具有重要价值。然而,由于移动应用的特殊性和防护措施,传统的爬虫技术在采集移动应用数据方面面临许多挑战。因此,App爬虫采集与逆向在爬虫领域的重要性不可低估 然而,App采集…...

systemd实现seatunnel自动化启停
在 systemd 中,您可以通过配置服务单元文件来设置服务在失败或退出后自动重启。这对于确保关键服务在意外退出时能够自动恢复运行非常有用。下面是实现 systemd 自动重启服务的步骤: 通用操作 1. 创建或编辑服务单元文件 假设服务单元文件位于 /etc/systemd/system/my-ser…...

MySQL-08.DDL-表结构操作-创建-案例
一.MySQL创建表的方式 1.首先根据需求文档定义出原型字段,即从需求文档中可以直接设计出来的字段 2.再在原型字段的基础上加上一些基础字段,构成整个表结构的设计 我们采用基于图形化界面的方式来创建表结构 二.案例 原型字段 各字段设计如下&…...

完成Sentinel-Dashboard控制台数据的持久化-同步到Nacos
本次案例采用的是Sentinel1.8.8版本 一、Sentinel源码环境搭建 1、下载Sentinel源码工程 git clone https://github.com/alibaba/Sentinel.git 2、导入到idea 这里可以先运行DashboardApplication.java试一下是否运行成功,若成功,源码环境搭建完毕&a…...

RocketMq详解:三、RocketMq通用生产和消费方法改造
文章目录 1.背景2.通用方法改造2.1添加maven依赖2.2 RocketMq基础配置2.3 配置类2.5 消息传输的对象和结果2.4 消息生产者2.5 消息消费者2.6 功能测试 1.背景 在第二章:《RocketMq详解:二、SpringBoot集成RocketMq》中我们已经实现了消费基本生产和消费…...

基于SpringBoot+Vue+Uniapp的仓库点单小程序的详细设计和实现
2. 详细视频演示 文章底部名片,联系我获取更详细的演示视频 3. 论文参考 4. 项目运行截图 代码运行效果图 代码运行效果图 代码运行效果图 代码运行效果图代码运行效果图 代码运行效果图 5. 技术框架 5.1 后端采用SpringBoot框架 Spring Boot 是一个用于快速开发…...