大数据Flink(一百二十):Flink SQL自定义函数(UDF)

文章目录
Flink SQL自定义函数(UDF)
一、概述
二、自定义标量函数(UDSF)
三、自定义聚合函数(UDAF)
四、 自定义表值函数(UDTF)
Flink SQL自定义函数(UDF)
Flink全托管支持在SQL作业中使用Python自定义函数,Flink支持以下3类自定义函数:UDSF(User Defined Scalar Function)、UDAF(User Defined Aggregation Function)、UDTF(User Defined Table-valued Function)。
一、概述
在资料udf函数中可以看到udx.zip压缩包,将其解压后可以看到有以下文件:
其中udfs.py udafs.py udtfs.py分别对应了UDSF、UDAF、UDTF三个函数的示例。
进入阿里云Flink开发平台,点击左侧导航栏SQL开发,点击左侧的函数页签,单击注册UDF,将udx.zip上传,如下图所示。

点击确定后,Flink开发控制台会解析UDF文件中是否使用了Flink UDF、UDAF和UDTF接口的类,并自动提取类名,填充到Function Name字段中。可以看到这里识别出了三个函数。
点击创建函数,可以看到函数页签下出现了udx目录,下面有三个自定义函数,此时自定义函数创建完成。

二、自定义标量函数(UDSF)
- 自定义标量函数(UDSF)将0个、1个或多个标量值映射到一个新的标量值。输入与输出是一对一的关系,即读入一行数据,写出一条输出值。
udfs.py内容如下:
from pyflink.table import DataTypes
from pyflink.table.udf import udf@udf(result_type=DataTypes.STRING())
def sub_string(s: str, begin: int, end: int):return s[begin:end]
说明:
- sub_string定义了获取每条数据中从begin~end位的字符的代码;
- 需要通过名字为 “ udf ” 的装饰器,声明这是一个 scalar function;
- 需要通过装饰器中的 result_type 参数,声明 scalar function 的结果类型;
进入阿里云Flink开发平台,在test作业草稿下,进行建表,语句如下:
CREATE TABLE function_udf(a VARCHAR,b INT,c INT
) WITH ('connector' = 'socket','hostname' = '178.23.146.213','port' = '9999','format' = 'csv'
);
- 查询语句如下
SELECT sub_string(a,2,5)
FROM function_udf;
- 在ecs监听9999端口:nc -lk 9999,然后选中查询语句,点击调试.
- 在ecs向9999发送数据
123|456,4,2
12|3456,7,1
结果如下:

查询结果是function_udf表中a字段每行字符串的第3-5个字符。
三、自定义聚合函数(UDAF)
- 自定义聚合函数(UDAF),将多条记录聚合成1条记录。其输入与输出是多对一的关系,即将多条输入记录聚合成一条输出值。
udafs.py内容如下:
from pyflink.common import Row
from pyflink.table import AggregateFunction, DataTypes
from pyflink.table.udf import udafclass WeightedAvg(AggregateFunction):def create_accumulator(self):# Row(sum, count)return Row(0, 0)def get_value(self, accumulator: Row) -> float:if accumulator[1] == 0:return 0else:return accumulator[0] / accumulator[1]def accumulate(self, accumulator: Row, value, weight):accumulator[0] += value * weightaccumulator[1] += weightdef retract(self, accumulator: Row, value, weight):accumulator[0] -= value * weightaccumulator[1] -= weightweighted_avg = udaf(f=WeightedAvg(),result_type=DataTypes.DOUBLE(),accumulator_type=DataTypes.ROW([DataTypes.FIELD("f0", DataTypes.BIGINT()),DataTypes.FIELD("f1", DataTypes.BIGINT())]))
说明:
- 该示例中,weighted_avg定义了当前数据和历史数据求含权重的均值的代码。
- 需要通过名字为 “ udaf ” 的装饰器,声明这是一个 aggregate function,
- 需要分别通过装饰器中的 result_type 及 accumulator_type 参数,声明 aggregate function 的结果类型及 accumulator 类型;
- create_accumulator,get_value 和 accumulate 这 3 个方法必须要定义,retract 方法可以根据需要定义;需要注意的是,由于必须定义 create_accumulator,get_value 和 accumulate 这 3 个方法,Python UDAF 只能通过继承AggregateFunction 的方式进行定义。
仍然使用function_udf表,查询语句如下:
SELECT weighted_avg(b,c)
FROM function_udf;
选中查询语句运行之后,向9999端口依次发送数据,如下:
123|456,4,2

12|3456,7,1
查询结果是以c字段为权重的b字段当前数据和历史数据的均值。
四、 自定义表值函数(UDTF)
自定义表值函数(UDTF),将0个、1个或多个标量值作为输入参数(可以是变长参数)。表值函数可以返回任意数量的行作为输出,而不仅是1个值。返回的行可以由1个或多个列组成。调用一次函数输出多行或多列数据。与自定义的标量函数类似,但与标量函数不同。
udtfs.py内容如下:
from pyflink.table import DataTypes
from pyflink.table.udf import udtf@udtf(result_types=[DataTypes.STRING(), DataTypes.STRING()])
def split(s: str):splits = s.split("|")yield splits[0], splits[1]
说明:
- 该示例中,split定义了将一行字符串按照竖线(|)分割成多列字符串的代码。
- 需要通过名字为 “ udtf ” 的装饰器,声明这是一个 table function。
- 需要通过装饰器中的 result_types 参数,声明 table function 的结果类型。由于 table function 每条输出可以包含多个列,result_types 需要指定所有输出列的类型。
仍然使用function_udf表,查询语句如下:
SELECT a,b,c,d,e
FROM function_udf,lateral table(split(a)) as T(d,e);
- 选中查询语句运行之后,向9999端口发送数据,如下
123|456,4,2
12|3456,7,1
结果:

查询结果中,会将function_udf表中每行字符串的a字段按照竖线(|)分割成d,e两列。
- 📢博客主页:https://lansonli.blog.csdn.net
- 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
- 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
- 📢停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨
相关文章:
大数据Flink(一百二十):Flink SQL自定义函数(UDF)
文章目录 Flink SQL自定义函数(UDF) 一、概述 二、自定义标量函数(UDSF) 三、自定义聚合函数(UDAF) 四、 自定义表值函数(UDTF) Flink SQL自定义函数…...
【图像检索】基于灰度共生矩的纹理图像检索,matlab实现
博主简介:matlab图像代码项目合作(扣扣:3249726188) ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 本次案例是基于灰度共生矩的纹理图像检索,用matlab实现。 一、案例背景和算法介绍 …...
【操作系统】02.深入理解操作系统
一、操作系统的定位 任何计算机系统都包含一个基本的程序集合,称为操作系统(OS)。笼统的理解,操作系统包括操作系统内核和其他程序。 由上述的宏观图其实我们就知道:操作系统是一款进行软硬件资源管理的软件。 二、设计操作系统的目的 操…...
【Python】探索 Errbot:多功能聊天机器人框架
不是旅行治愈了你,是你在路上放过了自己。 在当今的数字化时代,聊天机器人已成为企业与客户互动、提升工作效率和增加乐趣的重要工具。Errbot是一个高度可扩展的聊天机器人框架,它允许开发者使用Python轻松创建和定制机器人。本文将介绍Errb…...
Linux 调试器 GDB 使用指南
在Linux环境下开发和调试程序时,GNU调试器(GDB)是一个强大的工具。它支持多种编程语言(如C、C、Fortran等),并且可以帮助开发人员检测、排除和修复程序中的错误。GDB能够让你在程序运行时暂停,查…...
MiniCPM3-4B | 笔记本电脑运行端侧大模型OpenBMB/MiniCPM3-4B-GPTQ-Int4量化版 | PyCharm环境
MiniCPM3-4B,轻松在笔记本电脑上运行大模型? 背景一、选择模型二、模型下载三、模型运行四、总结 背景 2024年9月5日,面壁智能发布了MiniCPM3-4B,面壁的测试结果声称MiniCPM3-4B表现超越 Phi-3.5-mini-instruct 和 GPT-3.5-Turbo-…...
【chromedriver编译-绕过selenium机器人检测】
有小伙伴说使用selenium没能绕过机器人检测,盘他。 selenium机器人检测有2种,一是cdp检测,二是webdriver特征检测。cdp检测前面的博客已写过,这里就提下webdriver特征检测。一、selenium简介 Selenium 是一个强大的工具ÿ…...
【JavaEE精炼宝库】HTTP | HTTPS 协议详解
文章目录 一、HTTP 简介二、HTTP 协议格式:2.1 抓包工具的使用:2.2 HTTP 请求报文格式:2.3 HTTP 响应报文格式:2.4 HTTP 协议格式总结: 三、HTTP 请求详解:3.1 刨析 URL:3.2 方法(method)&#…...
Go语言基础学习01
目录 Linux环境下配置安装VScode并配置Go语言开发环境工作区和GOPATH 之前学习过Go语言,学习的时候没有记录笔记,最近找了个极客时间的Go语言36讲,打算时间学习并记录学习过程。 自己抽空看了一点内容,发现这个内容对于0基础解除G…...
基于SSM+Vue+MySQL的酒店管理系统
系统展示 用户前台界面 管理员后台界面 系统背景 随着旅游业的蓬勃发展,酒店业作为旅游产业链中的重要一环,面临着日益增长的客户需求和激烈的市场竞争。传统的人工酒店管理模式已难以满足高效、精准、个性化的服务要求。因此,开发一套基于SS…...
在WPF中保存控件内容为图片
在WPF中保存控件内容为图片 实现代码如下 1 private void SaveControlContentAsImage(FrameworkElement element,string fileName)2 {3 var render new RenderTargetBitmap((int)element.ActualWidth, (int)element.ActualHeight, 96, 96, PixelFormats…...
C#用SDK打开海康工业相机,callback取图Bitmap格式,并保存
上次写了python版本的,但是python虽好不方便发布,她带着重重的解释器有时候不方便玩耍.于是C#来了哦. C#图像一般用Bitmap表示,所以完全C#就够,别的格式可以自行想转换. 命令行哦,没界面. MVCamera.cs从MVS示例里面添加到项目中,using MvCamCtrl.NET; 就可以,不需要添加mvca…...
C语言字符学习初级优先看这个就够了
1. 字符的基本概念 在C语言中,字符(char)是一个基本的数据类型,用来表示单个字符。字符用单引号( )括起来,例如 a、1 等。字符在内存中实际上是以整数的形式存储的,即 ASCII 码。例…...
Python JSON
JSON 函数 json.dumps 语法 实例 json.loads 语法 实例 使用第三方库:Demjson 环境配置 JSON 函数 encode 语法 实例 decode 语法 实例 JSON 函数 使用 JSON 函数需要导入 json 库:import json。 函数描述json.dumps将 Python 对象编码…...
【华为杯】2024华为杯数模研赛F题 解题思路
题目 X射线脉冲星光子到达时间建模 问题背景 高速公路拥堵现象的原因众多,除了交通事故外,最典型的就是部分路段出现瓶颈现象,主要原因是车辆汇聚,而拥堵后又容易蔓延。高速公路一些特定的路段容易形成堵点,如匝道出…...
Object Pascal 结构化程序设计
Object Pascal 关系运算符 运算符名称等于<>不等于>大于<小于>大于等于<小于等于< (属于元素的)包含于> (属于元素的)包含in (属于元素的)属于 # Object Pascal 逻辑运算符 运算符名称含义Not逻辑非单目运算符,进行取反操作,由T…...
机器学习算法与实践_03概率论与贝叶斯算法笔记
1、概率论基础知识介绍 人工智能项目本质上是一个统计学项目,是通过对 样本 的分析,来评估/估计 总体 的情况,与数学知识相关联 高等数学 ——> 模型优化 概率论与数理统计 ——> 建模思想 线性代数 ——> 高性能计算 在机器学…...
如何使用Privoxy将SOCKS5代理转换为HTTP代理?
在这篇博客中,我将介绍如何使用Privoxy将SOCKS5代理转换为HTTP代理。我们将从下载和安装Privoxy开始,接着配置Privoxy,最后配置Windows以便浏览器使用该代理。 1. 下载并安装Privoxy 首先,您需要下载并安装Privoxy。您可以从Pri…...
AJAX(一)HTTP协议(请求响应报文),AJAX发送请求,请求问题处理
文章目录 一、AJAX二、HTTP协议1. 请求报文2. 响应报文 三、AJAX案例准备1. 安装node2. Express搭建服务器3. 安装nodemon实现自动重启 四、AJAX发送请求1. GET请求2. POST请求(1) 配置请求体(2) 配置请求头 3. 响应JSON数据的两种方式(1) 手动,JSON.parse()(2) 设置…...
Git进阶(十五):Git LFS 使用详解
文章目录 一、介绍二、Git LFS 使用步骤三、场景示例四、拓展阅读 一、介绍 Git LFS (Large File Storage) 是一个 Git 扩展,它使 Git 更适合处理大型文件,如音频、视频、图像或任何其他二进制大文件。Git LFS 替换仓库中的大文件为文本指针文件&#x…...
Flask RESTful 示例
目录 1. 环境准备2. 安装依赖3. 修改main.py4. 运行应用5. API使用示例获取所有任务获取单个任务创建新任务更新任务删除任务 中文乱码问题: 下面创建一个简单的Flask RESTful API示例。首先,我们需要创建环境,安装必要的依赖,然后…...
【Oracle APEX开发小技巧12】
有如下需求: 有一个问题反馈页面,要实现在apex页面展示能直观看到反馈时间超过7天未处理的数据,方便管理员及时处理反馈。 我的方法:直接将逻辑写在SQL中,这样可以直接在页面展示 完整代码: SELECTSF.FE…...
ssc377d修改flash分区大小
1、flash的分区默认分配16M、 / # df -h Filesystem Size Used Available Use% Mounted on /dev/root 1.9M 1.9M 0 100% / /dev/mtdblock4 3.0M...
学习STC51单片机31(芯片为STC89C52RCRC)OLED显示屏1
每日一言 生活的美好,总是藏在那些你咬牙坚持的日子里。 硬件:OLED 以后要用到OLED的时候找到这个文件 OLED的设备地址 SSD1306"SSD" 是品牌缩写,"1306" 是产品编号。 驱动 OLED 屏幕的 IIC 总线数据传输格式 示意图 …...
微服务商城-商品微服务
数据表 CREATE TABLE product (id bigint(20) UNSIGNED NOT NULL AUTO_INCREMENT COMMENT 商品id,cateid smallint(6) UNSIGNED NOT NULL DEFAULT 0 COMMENT 类别Id,name varchar(100) NOT NULL DEFAULT COMMENT 商品名称,subtitle varchar(200) NOT NULL DEFAULT COMMENT 商…...
C# 类和继承(抽象类)
抽象类 抽象类是指设计为被继承的类。抽象类只能被用作其他类的基类。 不能创建抽象类的实例。抽象类使用abstract修饰符声明。 抽象类可以包含抽象成员或普通的非抽象成员。抽象类的成员可以是抽象成员和普通带 实现的成员的任意组合。抽象类自己可以派生自另一个抽象类。例…...
【决胜公务员考试】求职OMG——见面课测验1
2025最新版!!!6.8截至答题,大家注意呀! 博主码字不易点个关注吧,祝期末顺利~~ 1.单选题(2分) 下列说法错误的是:( B ) A.选调生属于公务员系统 B.公务员属于事业编 C.选调生有基层锻炼的要求 D…...
【OSG学习笔记】Day 16: 骨骼动画与蒙皮(osgAnimation)
骨骼动画基础 骨骼动画是 3D 计算机图形中常用的技术,它通过以下两个主要组件实现角色动画。 骨骼系统 (Skeleton):由层级结构的骨头组成,类似于人体骨骼蒙皮 (Mesh Skinning):将模型网格顶点绑定到骨骼上,使骨骼移动…...
浅谈不同二分算法的查找情况
二分算法原理比较简单,但是实际的算法模板却有很多,这一切都源于二分查找问题中的复杂情况和二分算法的边界处理,以下是博主对一些二分算法查找的情况分析。 需要说明的是,以下二分算法都是基于有序序列为升序有序的情况…...
【HarmonyOS 5 开发速记】如何获取用户信息(头像/昵称/手机号)
1.获取 authorizationCode: 2.利用 authorizationCode 获取 accessToken:文档中心 3.获取手机:文档中心 4.获取昵称头像:文档中心 首先创建 request 若要获取手机号,scope必填 phone,permissions 必填 …...
