当前位置: 首页 > news >正文

Spark高级用法-自定义函数

用户可以根据需求自己封装计算的逻辑,对字段数据进行计算

内置函数,是spark提供的对字段操作的方法 ,split(字段) 对字段中的数进行切割,F.sum(字段) 会将该字段下的数据进行求和

实际业务中又能内置函数不满足计算需求,此时就需要自定义行数,完成字段数据的业务处

 函数分类

  • udf
    • 自定义
    • 一进一出
  • udaf
    • 聚合
    • 自定义
    • 多进一出
  • udtf
    • 爆炸
    • 一进多出

UDF函数

对每一行数据以此进行计算,返回每一行的结果 

1)不带装饰器

# UDF函数
from pyspark.sql import SparkSession,functions as F
from pyspark.sql.types import *ss = SparkSession.builder.getOrCreate()# 读取文件数据转为df
df = ss.read.csv('hdfs://node1:8020/data/students.csv',header=True,sep=',')df.show()# 自定义字符串长度计算函数
# @F.udf(returnType=IntegerType())  # 使用装饰器注册函数,只能在DSL方法中使用,不能在SQL中使用
def len_func(field):"""自定义函数,函数名可以自己指定:param field: 是用来结构处理的字段数据,可以定义多个。根据实际处理的字段数量决定定义多少个接收参数:return: 返回处理后的数据"""if field is None:return 0else:data = len(field)return data# 将自定义的函数注册到spark中使用
# 参数一 指定spark中使用函数的名
# 参数二  指定自定义函数的名
# 参数三  指定函数的返回值类型
# 接收参数  定义和函数名一样的值
len_func = ss.udf.register('len_func',len_func,returnType = IntegerType())# 在spark中使用
df2 = df.select('id','name','gender',len_func('name'))
df2.show()# 使用sql语句
df.createTempView('stu')df3 = ss.sql('select * ,len_FUNC(name) from stu')
df3.show()

2)带有装饰器

# UDF函数
from pyspark.sql import SparkSession,functions as F
from pyspark.sql.types import *ss = SparkSession.builder.getOrCreate()# 读取文件数据转为df
df = ss.read.csv('hdfs://node1:8020/data/students.csv',header=True,sep=',')df.show()# 自定义字符串长度计算函数
@F.udf(returnType=IntegerType())  # 使用装饰器注册函数,只能在DSL方法中使用,不能在SQL中使用
def len_func(field):"""自定义函数,函数名可以自己指定:param field: 是用来结构处理的字段数据,可以定义多个。根据实际处理的字段数量决定定义多少个接收参数:return: 返回处理后的数据"""if field is None:return 0else:data = len(field)return data# 在spark中使用
df2 = df.select('id','name','gender',len_func('name'))
df2.show()

装饰器注册

  • 只能在DSL方法中使用,在sql语句中无法使用

UDAF函数

多进一出 主要是聚合

使用pandas中的series实现,可以读取一列数据存储在pandas的seriess中进行数据的聚合

# 读取文件数据转为df
df = ss.read.csv('hdfs://node1:8020/data/students.csv',header=True,sep=',',schema='id int,name string,gender string,age int,cls string')df.show()# 自定义udaf函数
# 装饰器注册
@F.pandas_udf(returnType=IntegerType())
# 自定义udaf函数
# fileds:pd.Series 给数据字段指定一个类型
#  -> float 指定返回值类型
# udaf函数注册需要两步,第一步现指定装饰器
def sub(filed:pd.Series) -> int:"""自定义udaf函数,实现累减:param field: 接收的字段列数据  pd.Series声明字段数据的类型,接收一列数据可以使用pandas的series类型:return:"""# field是series,就按照series方式操作n = filed[0] # 取出第一个值作为初始值for i in filed[1::]:n-=ireturn n# regidter方法注册
sub = ss.udf.register('sub',sub)# 使用udaf函数  缺少  PyArrow  pandas中series类型交个spark程序无法识别,spark是有scala实现,scala中没有对应的series类型
# 可以使用 PyArrow框架将series转为scale能识别的数据类型
df2 = df.select(sub('age'))
df2.show()

  • arrow框架 pyarrow
    • Apache Arrow 是一种内存中的列式数据格式,用于Spark中,以在JVM和Python进程之间有效地传输数据。目前这对使用 Pandas/NumPy 数据的 Python 用户最有益,提升传输速度。

    • 在线安装 三台机器安装

      • 进入虚拟环境 conda activate base

      • 在线安装 pip install pyspark[sql] -i Verifying - USTC Mirrors

    • 离线安装 三台机器安装

      • pip install pyarrow-10.0.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl

 

 安装pyarrow

conda activate base
pip install pyspark[sql] -i  https://pypi.mirrors.ustc.edu.cn/simple/

相关文章:

Spark高级用法-自定义函数

用户可以根据需求自己封装计算的逻辑,对字段数据进行计算 内置函数,是spark提供的对字段操作的方法 ,split(字段) 对字段中的数进行切割,F.sum(字段) 会将该字段下的数据进行求和 实际业务中又能内置函数不满足计算需求&#xff0…...

『Mysql进阶』Mysql explain详解(五)

目录 Explain 介绍 Explain分析示例 explain中的列 1. id 列 2. select_type 列 3. table 列 4. partitions 列 5. type 列 6. possible_keys 列 7. key 列 8. key_len 列 9. ref 列 10. rows 列 11. filtered 列 12. Extra 列 Explain 介绍 EXPLAIN 语句提供有…...

【工具】音视频翻译工具基于Whisper+ChatGPT

OpenAI推出的开源语音识别工具Whisper,以其卓越的语音识别能力,在音频和视频文件处理领域大放异彩。与此同时,ChatGPT也在翻译领域崭露头角,其强大的翻译能力备受赞誉。因此,一些字幕制作团队敏锐地捕捉到了这两者的结…...

学成在线——关于nacos配置优先级的坑

出错: 本地要起两个微服务,一个是content-api,另一个是gateway网关服务。 发现通过网关服务请求content微服务时,怎么请求都请求不到。 配置如下: content-api-dev.yaml的配置: server:servlet:context-p…...

Nginx在Windows Server下的启动脚本

Nginx在Windows Server下的快捷运行脚本 使用时记得修改NGINX_DIR路径 ECHO OFF CHCP 65001 SET NGINX_DIRD:\software\Nginx\ color 0a TITLE Nginx Management GOTO MENU :MENU CLS ECHO. ECHO. * * * * Nginx Management * * * * * * * * * * * ECHO. * * EC…...

【国科大】C++程序设计秋季——五子棋

【国科大】C程序设计秋季 —— 五子棋程序 下载地址:https://mbd.pub/o/bread/Zp2Ukptx...

Docker 环境下多节点服务器监控实战:从 Prometheus 到 Grafana 的完整部署指南

Docker 环境下多节点服务器监控实战:从 Prometheus 到 Grafana 的完整部署指南 文章目录 Docker 环境下多节点服务器监控实战:从 Prometheus 到 Grafana 的完整部署指南一 多节点部署1 节点一2 节点二3 节点三 二 监控节点部署三 配置 prometheus.yml四 …...

【动手学深度学习】6.3 填充与步幅(个人向笔记)

卷积的输出形状取决于输入形状和卷积核的形状在应用连续的卷积后,我们最终得到的输出大小远小于输入大小,这是由于卷积核的宽度和高度通常大于1导致的比如,一个 240 240 240240 240240像素的图像,经过10层 5 5 55 55的卷积后&am…...

【宝可梦】游戏

pokemmo https://pokemmo.com/zh/ 写在最后:若本文章对您有帮助,请点个赞啦 ٩(๑•̀ω•́๑)۶...

docker启动的rabbitmq如何启动其SSL功能

docker run --hostname my-rabbit --name my-rabbit -p 5671:5671 -p 15671:15671 -p 15672:15672 -e RABBITMQ_DEFAULT_USERabc -e RABBITMQ_DEFAULT_PASSabc -d rabbitmq:4.0-management 使用docker的复制命令将ca.crt、server.crt和server.key文件复制到容器的/etc/server_s…...

易基因: cfMeDIP-seq揭示cfDNA甲基化高效区分原发性和转移性前列腺|Nat Commun

大家好,这里是专注表观组学十余年,领跑多组学科研服务的易基因。 前列腺癌(Prostate cancer,PCa)是男性中第二常见的恶性肿瘤,也是全球癌症相关死亡的第三大原因。虽然大多数原发性前列腺癌可以治愈&#…...

CMake 教程跟做与翻译 4

目录 添加一个option! 添加一个option! option,正如其意,就是选项的意思。我们这里需要演示一下option的做法。 option对于大型的工程必然是非常常见的:一些模块会被要求编译,另一些客户不准备需要这些模块。option就是将这种需…...

MySQL面试题分享

慢日志(了解) 慢日志开启的变量:slow_query_logON; 如果值为 OFF ,那就是没有开启慢日志 耗时: long_query_time,默认是10秒 redis 和 mysql 慢日志的区别 redis 慢日志默认是没有开启的 mysql 慢日志默认是开启的…...

vue路由缓存问题

什么是路由缓存问题 解决方案&#xff1a; 让组件实例不再复用&#xff0c;强制销毁重建监听路由变化&#xff0c;变化之后执行数据更新操作 方法一 给 routerv-view 添加key属性&#xff0c;强制不添加缓存&#xff0c;破坏缓存&#xff0c;所以这个方法性能会比较差 <Ro…...

RabbitMQ中如何解决消息堆积问题,如何保证消息有序性

RabbitMQ中如何解决消息堆积问题 如何保证消息有序性 只需要让一个消息队列只对应一个消费者即可...

python爬虫案例——selenium爬取淘宝商品信息,实现翻页抓取(14)

文章目录 1、任务目标2、网页分析3、代码编写3.1 代码分析3.2 完整代码1、任务目标 目标网站:淘宝(https://www.taobao.com/) 任务要求:通过selenium实现自动化抓取 淘宝美食 板块下的所有商品信息,并实现翻页抓取,最后以csv格式将数据保存至本地;如: 2、网页分析 首先…...

在VSCode中使用Excalidraw

概述 Excalidraw是一款非常不错的示意图绘制软件&#xff0c;没想到在VSCode中有其扩展&#xff0c;可以在VScode中直接使用。 安装扩展 使用 需要创建.excalidraw.svg、.excalidraw或.excalidraw.png等名称的文件。 搭配手写版使用 自由画笔工具可以配合手写板&#xff0c…...

25中国投资中投笔试测评秋招校招SHL笔试题型分享

✅中投公司不必过多介绍&#xff0c;和建总都位于金融央企第一档&#xff0c;但是招人更少&#xff0c;竞争更为激烈&#xff0c;看公示录用名单都是清北的金融硕士&#xff0c;投资岗难度更大。 ✅中投公司的笔试往年都是shl系统&#xff0c;但考察范围非常广&#xff0c;包含…...

【LeetCode热题100】分治-快排

本篇博客记录分治快排的4道题目&#xff1a;颜色分类、排序数组、数组中的第K个最大元素、数组中最小的N个元素&#xff08;库存管理&#xff09;。 class Solution { public:void sortColors(vector<int>& nums) {int n nums.size();int left -1,right n;for(int…...

Docker 教程四 (Docker 镜像加速)

Docker 镜像加速 国内从 DockerHub 拉取镜像有时会遇到困难&#xff0c;此时可以配置镜像加速器。 目前国内 Docker 镜像源出现了一些问题&#xff0c;基本不能用了&#xff0c;后期能用我再更新下。* Docker 官方和国内很多云服务商都提供了国内加速器服务&#xff0c;例如…...

2025盘古石杯决赛【手机取证】

前言 第三届盘古石杯国际电子数据取证大赛决赛 最后一题没有解出来&#xff0c;实在找不到&#xff0c;希望有大佬教一下我。 还有就会议时间&#xff0c;我感觉不是图片时间&#xff0c;因为在电脑看到是其他时间用老会议系统开的会。 手机取证 1、分析鸿蒙手机检材&#x…...

【JavaWeb】Docker项目部署

引言 之前学习了Linux操作系统的常见命令&#xff0c;在Linux上安装软件&#xff0c;以及如何在Linux上部署一个单体项目&#xff0c;大多数同学都会有相同的感受&#xff0c;那就是麻烦。 核心体现在三点&#xff1a; 命令太多了&#xff0c;记不住 软件安装包名字复杂&…...

return this;返回的是谁

一个审批系统的示例来演示责任链模式的实现。假设公司需要处理不同金额的采购申请&#xff0c;不同级别的经理有不同的审批权限&#xff1a; // 抽象处理者&#xff1a;审批者 abstract class Approver {protected Approver successor; // 下一个处理者// 设置下一个处理者pub…...

C++.OpenGL (20/64)混合(Blending)

混合(Blending) 透明效果核心原理 #mermaid-svg-SWG0UzVfJms7Sm3e {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-SWG0UzVfJms7Sm3e .error-icon{fill:#552222;}#mermaid-svg-SWG0UzVfJms7Sm3e .error-text{fill…...

Java求职者面试指南:计算机基础与源码原理深度解析

Java求职者面试指南&#xff1a;计算机基础与源码原理深度解析 第一轮提问&#xff1a;基础概念问题 1. 请解释什么是进程和线程的区别&#xff1f; 面试官&#xff1a;进程是程序的一次执行过程&#xff0c;是系统进行资源分配和调度的基本单位&#xff1b;而线程是进程中的…...

【Linux】自动化构建-Make/Makefile

前言 上文我们讲到了Linux中的编译器gcc/g 【Linux】编译器gcc/g及其库的详细介绍-CSDN博客 本来我们将一个对于编译来说很重要的工具&#xff1a;make/makfile 1.背景 在一个工程中源文件不计其数&#xff0c;其按类型、功能、模块分别放在若干个目录中&#xff0c;mak…...

LOOI机器人的技术实现解析:从手势识别到边缘检测

LOOI机器人作为一款创新的AI硬件产品&#xff0c;通过将智能手机转变为具有情感交互能力的桌面机器人&#xff0c;展示了前沿AI技术与传统硬件设计的完美结合。作为AI与玩具领域的专家&#xff0c;我将全面解析LOOI的技术实现架构&#xff0c;特别是其手势识别、物体识别和环境…...

ubuntu22.04 安装docker 和docker-compose

首先你要确保没有docker环境或者使用命令删掉docker sudo apt-get remove docker docker-engine docker.io containerd runc安装docker 更新软件环境 sudo apt update sudo apt upgrade下载docker依赖和GPG 密钥 # 依赖 apt-get install ca-certificates curl gnupg lsb-rel…...

使用SSE解决获取状态不一致问题

使用SSE解决获取状态不一致问题 1. 问题描述2. SSE介绍2.1 SSE 的工作原理2.2 SSE 的事件格式规范2.3 SSE与其他技术对比2.4 SSE 的优缺点 3. 实战代码 1. 问题描述 目前做的一个功能是上传多个文件&#xff0c;这个上传文件是整体功能的一部分&#xff0c;文件在上传的过程中…...

Kubernetes 节点自动伸缩(Cluster Autoscaler)原理与实践

在 Kubernetes 集群中&#xff0c;如何在保障应用高可用的同时有效地管理资源&#xff0c;一直是运维人员和开发者关注的重点。随着微服务架构的普及&#xff0c;集群内各个服务的负载波动日趋明显&#xff0c;传统的手动扩缩容方式已无法满足实时性和弹性需求。 Cluster Auto…...