大数据Flink(一百二十):Flink SQL自定义函数(UDF)

文章目录
Flink SQL自定义函数(UDF)
一、概述
二、自定义标量函数(UDSF)
三、自定义聚合函数(UDAF)
四、 自定义表值函数(UDTF)
Flink SQL自定义函数(UDF)
Flink全托管支持在SQL作业中使用Python自定义函数,Flink支持以下3类自定义函数:UDSF(User Defined Scalar Function)、UDAF(User Defined Aggregation Function)、UDTF(User Defined Table-valued Function)。
一、概述
在资料udf函数中可以看到udx.zip压缩包,将其解压后可以看到有以下文件:
其中udfs.py udafs.py udtfs.py分别对应了UDSF、UDAF、UDTF三个函数的示例。
进入阿里云Flink开发平台,点击左侧导航栏SQL开发,点击左侧的函数页签,单击注册UDF,将udx.zip上传,如下图所示。

点击确定后,Flink开发控制台会解析UDF文件中是否使用了Flink UDF、UDAF和UDTF接口的类,并自动提取类名,填充到Function Name字段中。可以看到这里识别出了三个函数。
点击创建函数,可以看到函数页签下出现了udx目录,下面有三个自定义函数,此时自定义函数创建完成。

二、自定义标量函数(UDSF)
- 自定义标量函数(UDSF)将0个、1个或多个标量值映射到一个新的标量值。输入与输出是一对一的关系,即读入一行数据,写出一条输出值。
udfs.py内容如下:
from pyflink.table import DataTypes
from pyflink.table.udf import udf@udf(result_type=DataTypes.STRING())
def sub_string(s: str, begin: int, end: int):return s[begin:end]
说明:
- sub_string定义了获取每条数据中从begin~end位的字符的代码;
- 需要通过名字为 “ udf ” 的装饰器,声明这是一个 scalar function;
- 需要通过装饰器中的 result_type 参数,声明 scalar function 的结果类型;
进入阿里云Flink开发平台,在test作业草稿下,进行建表,语句如下:
CREATE TABLE function_udf(a VARCHAR,b INT,c INT
) WITH ('connector' = 'socket','hostname' = '178.23.146.213','port' = '9999','format' = 'csv'
);
- 查询语句如下
SELECT sub_string(a,2,5)
FROM function_udf;
- 在ecs监听9999端口:nc -lk 9999,然后选中查询语句,点击调试.
- 在ecs向9999发送数据
123|456,4,2
12|3456,7,1
结果如下:

查询结果是function_udf表中a字段每行字符串的第3-5个字符。
三、自定义聚合函数(UDAF)
- 自定义聚合函数(UDAF),将多条记录聚合成1条记录。其输入与输出是多对一的关系,即将多条输入记录聚合成一条输出值。
udafs.py内容如下:
from pyflink.common import Row
from pyflink.table import AggregateFunction, DataTypes
from pyflink.table.udf import udafclass WeightedAvg(AggregateFunction):def create_accumulator(self):# Row(sum, count)return Row(0, 0)def get_value(self, accumulator: Row) -> float:if accumulator[1] == 0:return 0else:return accumulator[0] / accumulator[1]def accumulate(self, accumulator: Row, value, weight):accumulator[0] += value * weightaccumulator[1] += weightdef retract(self, accumulator: Row, value, weight):accumulator[0] -= value * weightaccumulator[1] -= weightweighted_avg = udaf(f=WeightedAvg(),result_type=DataTypes.DOUBLE(),accumulator_type=DataTypes.ROW([DataTypes.FIELD("f0", DataTypes.BIGINT()),DataTypes.FIELD("f1", DataTypes.BIGINT())]))
说明:
- 该示例中,weighted_avg定义了当前数据和历史数据求含权重的均值的代码。
- 需要通过名字为 “ udaf ” 的装饰器,声明这是一个 aggregate function,
- 需要分别通过装饰器中的 result_type 及 accumulator_type 参数,声明 aggregate function 的结果类型及 accumulator 类型;
- create_accumulator,get_value 和 accumulate 这 3 个方法必须要定义,retract 方法可以根据需要定义;需要注意的是,由于必须定义 create_accumulator,get_value 和 accumulate 这 3 个方法,Python UDAF 只能通过继承AggregateFunction 的方式进行定义。
仍然使用function_udf表,查询语句如下:
SELECT weighted_avg(b,c)
FROM function_udf;
选中查询语句运行之后,向9999端口依次发送数据,如下:
123|456,4,2

12|3456,7,1
查询结果是以c字段为权重的b字段当前数据和历史数据的均值。
四、 自定义表值函数(UDTF)
自定义表值函数(UDTF),将0个、1个或多个标量值作为输入参数(可以是变长参数)。表值函数可以返回任意数量的行作为输出,而不仅是1个值。返回的行可以由1个或多个列组成。调用一次函数输出多行或多列数据。与自定义的标量函数类似,但与标量函数不同。
udtfs.py内容如下:
from pyflink.table import DataTypes
from pyflink.table.udf import udtf@udtf(result_types=[DataTypes.STRING(), DataTypes.STRING()])
def split(s: str):splits = s.split("|")yield splits[0], splits[1]
说明:
- 该示例中,split定义了将一行字符串按照竖线(|)分割成多列字符串的代码。
- 需要通过名字为 “ udtf ” 的装饰器,声明这是一个 table function。
- 需要通过装饰器中的 result_types 参数,声明 table function 的结果类型。由于 table function 每条输出可以包含多个列,result_types 需要指定所有输出列的类型。
仍然使用function_udf表,查询语句如下:
SELECT a,b,c,d,e
FROM function_udf,lateral table(split(a)) as T(d,e);
- 选中查询语句运行之后,向9999端口发送数据,如下
123|456,4,2
12|3456,7,1
结果:

查询结果中,会将function_udf表中每行字符串的a字段按照竖线(|)分割成d,e两列。
- 📢博客主页:https://lansonli.blog.csdn.net
- 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
- 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
- 📢停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨
相关文章:
大数据Flink(一百二十):Flink SQL自定义函数(UDF)
文章目录 Flink SQL自定义函数(UDF) 一、概述 二、自定义标量函数(UDSF) 三、自定义聚合函数(UDAF) 四、 自定义表值函数(UDTF) Flink SQL自定义函数…...
【图像检索】基于灰度共生矩的纹理图像检索,matlab实现
博主简介:matlab图像代码项目合作(扣扣:3249726188) ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 本次案例是基于灰度共生矩的纹理图像检索,用matlab实现。 一、案例背景和算法介绍 …...
【操作系统】02.深入理解操作系统
一、操作系统的定位 任何计算机系统都包含一个基本的程序集合,称为操作系统(OS)。笼统的理解,操作系统包括操作系统内核和其他程序。 由上述的宏观图其实我们就知道:操作系统是一款进行软硬件资源管理的软件。 二、设计操作系统的目的 操…...
【Python】探索 Errbot:多功能聊天机器人框架
不是旅行治愈了你,是你在路上放过了自己。 在当今的数字化时代,聊天机器人已成为企业与客户互动、提升工作效率和增加乐趣的重要工具。Errbot是一个高度可扩展的聊天机器人框架,它允许开发者使用Python轻松创建和定制机器人。本文将介绍Errb…...
Linux 调试器 GDB 使用指南
在Linux环境下开发和调试程序时,GNU调试器(GDB)是一个强大的工具。它支持多种编程语言(如C、C、Fortran等),并且可以帮助开发人员检测、排除和修复程序中的错误。GDB能够让你在程序运行时暂停,查…...
MiniCPM3-4B | 笔记本电脑运行端侧大模型OpenBMB/MiniCPM3-4B-GPTQ-Int4量化版 | PyCharm环境
MiniCPM3-4B,轻松在笔记本电脑上运行大模型? 背景一、选择模型二、模型下载三、模型运行四、总结 背景 2024年9月5日,面壁智能发布了MiniCPM3-4B,面壁的测试结果声称MiniCPM3-4B表现超越 Phi-3.5-mini-instruct 和 GPT-3.5-Turbo-…...
【chromedriver编译-绕过selenium机器人检测】
有小伙伴说使用selenium没能绕过机器人检测,盘他。 selenium机器人检测有2种,一是cdp检测,二是webdriver特征检测。cdp检测前面的博客已写过,这里就提下webdriver特征检测。一、selenium简介 Selenium 是一个强大的工具ÿ…...
【JavaEE精炼宝库】HTTP | HTTPS 协议详解
文章目录 一、HTTP 简介二、HTTP 协议格式:2.1 抓包工具的使用:2.2 HTTP 请求报文格式:2.3 HTTP 响应报文格式:2.4 HTTP 协议格式总结: 三、HTTP 请求详解:3.1 刨析 URL:3.2 方法(method)&#…...
Go语言基础学习01
目录 Linux环境下配置安装VScode并配置Go语言开发环境工作区和GOPATH 之前学习过Go语言,学习的时候没有记录笔记,最近找了个极客时间的Go语言36讲,打算时间学习并记录学习过程。 自己抽空看了一点内容,发现这个内容对于0基础解除G…...
基于SSM+Vue+MySQL的酒店管理系统
系统展示 用户前台界面 管理员后台界面 系统背景 随着旅游业的蓬勃发展,酒店业作为旅游产业链中的重要一环,面临着日益增长的客户需求和激烈的市场竞争。传统的人工酒店管理模式已难以满足高效、精准、个性化的服务要求。因此,开发一套基于SS…...
在WPF中保存控件内容为图片
在WPF中保存控件内容为图片 实现代码如下 1 private void SaveControlContentAsImage(FrameworkElement element,string fileName)2 {3 var render new RenderTargetBitmap((int)element.ActualWidth, (int)element.ActualHeight, 96, 96, PixelFormats…...
C#用SDK打开海康工业相机,callback取图Bitmap格式,并保存
上次写了python版本的,但是python虽好不方便发布,她带着重重的解释器有时候不方便玩耍.于是C#来了哦. C#图像一般用Bitmap表示,所以完全C#就够,别的格式可以自行想转换. 命令行哦,没界面. MVCamera.cs从MVS示例里面添加到项目中,using MvCamCtrl.NET; 就可以,不需要添加mvca…...
C语言字符学习初级优先看这个就够了
1. 字符的基本概念 在C语言中,字符(char)是一个基本的数据类型,用来表示单个字符。字符用单引号( )括起来,例如 a、1 等。字符在内存中实际上是以整数的形式存储的,即 ASCII 码。例…...
Python JSON
JSON 函数 json.dumps 语法 实例 json.loads 语法 实例 使用第三方库:Demjson 环境配置 JSON 函数 encode 语法 实例 decode 语法 实例 JSON 函数 使用 JSON 函数需要导入 json 库:import json。 函数描述json.dumps将 Python 对象编码…...
【华为杯】2024华为杯数模研赛F题 解题思路
题目 X射线脉冲星光子到达时间建模 问题背景 高速公路拥堵现象的原因众多,除了交通事故外,最典型的就是部分路段出现瓶颈现象,主要原因是车辆汇聚,而拥堵后又容易蔓延。高速公路一些特定的路段容易形成堵点,如匝道出…...
Object Pascal 结构化程序设计
Object Pascal 关系运算符 运算符名称等于<>不等于>大于<小于>大于等于<小于等于< (属于元素的)包含于> (属于元素的)包含in (属于元素的)属于 # Object Pascal 逻辑运算符 运算符名称含义Not逻辑非单目运算符,进行取反操作,由T…...
机器学习算法与实践_03概率论与贝叶斯算法笔记
1、概率论基础知识介绍 人工智能项目本质上是一个统计学项目,是通过对 样本 的分析,来评估/估计 总体 的情况,与数学知识相关联 高等数学 ——> 模型优化 概率论与数理统计 ——> 建模思想 线性代数 ——> 高性能计算 在机器学…...
如何使用Privoxy将SOCKS5代理转换为HTTP代理?
在这篇博客中,我将介绍如何使用Privoxy将SOCKS5代理转换为HTTP代理。我们将从下载和安装Privoxy开始,接着配置Privoxy,最后配置Windows以便浏览器使用该代理。 1. 下载并安装Privoxy 首先,您需要下载并安装Privoxy。您可以从Pri…...
AJAX(一)HTTP协议(请求响应报文),AJAX发送请求,请求问题处理
文章目录 一、AJAX二、HTTP协议1. 请求报文2. 响应报文 三、AJAX案例准备1. 安装node2. Express搭建服务器3. 安装nodemon实现自动重启 四、AJAX发送请求1. GET请求2. POST请求(1) 配置请求体(2) 配置请求头 3. 响应JSON数据的两种方式(1) 手动,JSON.parse()(2) 设置…...
Git进阶(十五):Git LFS 使用详解
文章目录 一、介绍二、Git LFS 使用步骤三、场景示例四、拓展阅读 一、介绍 Git LFS (Large File Storage) 是一个 Git 扩展,它使 Git 更适合处理大型文件,如音频、视频、图像或任何其他二进制大文件。Git LFS 替换仓库中的大文件为文本指针文件&#x…...
SkyWalking 10.2.0 SWCK 配置过程
SkyWalking 10.2.0 & SWCK 配置过程 skywalking oap-server & ui 使用Docker安装在K8S集群以外,K8S集群中的微服务使用initContainer按命名空间将skywalking-java-agent注入到业务容器中。 SWCK有整套的解决方案,全安装在K8S群集中。 具体可参…...
OkHttp 中实现断点续传 demo
在 OkHttp 中实现断点续传主要通过以下步骤完成,核心是利用 HTTP 协议的 Range 请求头指定下载范围: 实现原理 Range 请求头:向服务器请求文件的特定字节范围(如 Range: bytes1024-) 本地文件记录:保存已…...
Psychopy音频的使用
Psychopy音频的使用 本文主要解决以下问题: 指定音频引擎与设备;播放音频文件 本文所使用的环境: Python3.10 numpy2.2.6 psychopy2025.1.1 psychtoolbox3.0.19.14 一、音频配置 Psychopy文档链接为Sound - for audio playback — Psy…...
Module Federation 和 Native Federation 的比较
前言 Module Federation 是 Webpack 5 引入的微前端架构方案,允许不同独立构建的应用在运行时动态共享模块。 Native Federation 是 Angular 官方基于 Module Federation 理念实现的专为 Angular 优化的微前端方案。 概念解析 Module Federation (模块联邦) Modul…...
Spring Boot+Neo4j知识图谱实战:3步搭建智能关系网络!
一、引言 在数据驱动的背景下,知识图谱凭借其高效的信息组织能力,正逐步成为各行业应用的关键技术。本文聚焦 Spring Boot与Neo4j图数据库的技术结合,探讨知识图谱开发的实现细节,帮助读者掌握该技术栈在实际项目中的落地方法。 …...
Spring AI与Spring Modulith核心技术解析
Spring AI核心架构解析 Spring AI(https://spring.io/projects/spring-ai)作为Spring生态中的AI集成框架,其核心设计理念是通过模块化架构降低AI应用的开发复杂度。与Python生态中的LangChain/LlamaIndex等工具类似,但特别为多语…...
前端高频面试题2:浏览器/计算机网络
本专栏相关链接 前端高频面试题1:HTML/CSS 前端高频面试题2:浏览器/计算机网络 前端高频面试题3:JavaScript 1.什么是强缓存、协商缓存? 强缓存: 当浏览器请求资源时,首先检查本地缓存是否命中。如果命…...
深度解析:etcd 在 Milvus 向量数据库中的关键作用
目录 🚀 深度解析:etcd 在 Milvus 向量数据库中的关键作用 💡 什么是 etcd? 🧠 Milvus 架构简介 📦 etcd 在 Milvus 中的核心作用 🔧 实际工作流程示意 ⚠️ 如果 etcd 出现问题会怎样&am…...
简单介绍C++中 string与wstring
在C中,string和wstring是两种用于处理不同字符编码的字符串类型,分别基于char和wchar_t字符类型。以下是它们的详细说明和对比: 1. 基础定义 string 类型:std::string 字符类型:char(通常为8位)…...
ZYNQ学习记录FPGA(二)Verilog语言
一、Verilog简介 1.1 HDL(Hardware Description language) 在解释HDL之前,先来了解一下数字系统设计的流程:逻辑设计 -> 电路实现 -> 系统验证。 逻辑设计又称前端,在这个过程中就需要用到HDL,正文…...
