大数据Flink(一百二十):Flink SQL自定义函数(UDF)
文章目录
Flink SQL自定义函数(UDF)
一、概述
二、自定义标量函数(UDSF)
三、自定义聚合函数(UDAF)
四、 自定义表值函数(UDTF)
Flink SQL自定义函数(UDF)
Flink全托管支持在SQL作业中使用Python自定义函数,Flink支持以下3类自定义函数:UDSF(User Defined Scalar Function)、UDAF(User Defined Aggregation Function)、UDTF(User Defined Table-valued Function)。
一、概述
在资料udf函数中可以看到udx.zip压缩包,将其解压后可以看到有以下文件:
其中udfs.py udafs.py udtfs.py分别对应了UDSF、UDAF、UDTF三个函数的示例。
进入阿里云Flink开发平台,点击左侧导航栏SQL开发,点击左侧的函数页签,单击注册UDF,将udx.zip上传,如下图所示。
点击确定后,Flink开发控制台会解析UDF文件中是否使用了Flink UDF、UDAF和UDTF接口的类,并自动提取类名,填充到Function Name字段中。可以看到这里识别出了三个函数。
点击创建函数,可以看到函数页签下出现了udx目录,下面有三个自定义函数,此时自定义函数创建完成。
二、自定义标量函数(UDSF)
- 自定义标量函数(UDSF)将0个、1个或多个标量值映射到一个新的标量值。输入与输出是一对一的关系,即读入一行数据,写出一条输出值。
udfs.py内容如下:
from pyflink.table import DataTypes
from pyflink.table.udf import udf@udf(result_type=DataTypes.STRING())
def sub_string(s: str, begin: int, end: int):return s[begin:end]
说明:
- sub_string定义了获取每条数据中从begin~end位的字符的代码;
- 需要通过名字为 “ udf ” 的装饰器,声明这是一个 scalar function;
- 需要通过装饰器中的 result_type 参数,声明 scalar function 的结果类型;
进入阿里云Flink开发平台,在test作业草稿下,进行建表,语句如下:
CREATE TABLE function_udf(a VARCHAR,b INT,c INT
) WITH ('connector' = 'socket','hostname' = '178.23.146.213','port' = '9999','format' = 'csv'
);
- 查询语句如下
SELECT sub_string(a,2,5)
FROM function_udf;
- 在ecs监听9999端口:nc -lk 9999,然后选中查询语句,点击调试.
- 在ecs向9999发送数据
123|456,4,2
12|3456,7,1
结果如下:
查询结果是function_udf表中a字段每行字符串的第3-5个字符。
三、自定义聚合函数(UDAF)
- 自定义聚合函数(UDAF),将多条记录聚合成1条记录。其输入与输出是多对一的关系,即将多条输入记录聚合成一条输出值。
udafs.py内容如下:
from pyflink.common import Row
from pyflink.table import AggregateFunction, DataTypes
from pyflink.table.udf import udafclass WeightedAvg(AggregateFunction):def create_accumulator(self):# Row(sum, count)return Row(0, 0)def get_value(self, accumulator: Row) -> float:if accumulator[1] == 0:return 0else:return accumulator[0] / accumulator[1]def accumulate(self, accumulator: Row, value, weight):accumulator[0] += value * weightaccumulator[1] += weightdef retract(self, accumulator: Row, value, weight):accumulator[0] -= value * weightaccumulator[1] -= weightweighted_avg = udaf(f=WeightedAvg(),result_type=DataTypes.DOUBLE(),accumulator_type=DataTypes.ROW([DataTypes.FIELD("f0", DataTypes.BIGINT()),DataTypes.FIELD("f1", DataTypes.BIGINT())]))
说明:
- 该示例中,weighted_avg定义了当前数据和历史数据求含权重的均值的代码。
- 需要通过名字为 “ udaf ” 的装饰器,声明这是一个 aggregate function,
- 需要分别通过装饰器中的 result_type 及 accumulator_type 参数,声明 aggregate function 的结果类型及 accumulator 类型;
- create_accumulator,get_value 和 accumulate 这 3 个方法必须要定义,retract 方法可以根据需要定义;需要注意的是,由于必须定义 create_accumulator,get_value 和 accumulate 这 3 个方法,Python UDAF 只能通过继承AggregateFunction 的方式进行定义。
仍然使用function_udf表,查询语句如下:
SELECT weighted_avg(b,c)
FROM function_udf;
选中查询语句运行之后,向9999端口依次发送数据,如下:
123|456,4,2
12|3456,7,1
查询结果是以c字段为权重的b字段当前数据和历史数据的均值。
四、 自定义表值函数(UDTF)
自定义表值函数(UDTF),将0个、1个或多个标量值作为输入参数(可以是变长参数)。表值函数可以返回任意数量的行作为输出,而不仅是1个值。返回的行可以由1个或多个列组成。调用一次函数输出多行或多列数据。与自定义的标量函数类似,但与标量函数不同。
udtfs.py内容如下:
from pyflink.table import DataTypes
from pyflink.table.udf import udtf@udtf(result_types=[DataTypes.STRING(), DataTypes.STRING()])
def split(s: str):splits = s.split("|")yield splits[0], splits[1]
说明:
- 该示例中,split定义了将一行字符串按照竖线(|)分割成多列字符串的代码。
- 需要通过名字为 “ udtf ” 的装饰器,声明这是一个 table function。
- 需要通过装饰器中的 result_types 参数,声明 table function 的结果类型。由于 table function 每条输出可以包含多个列,result_types 需要指定所有输出列的类型。
仍然使用function_udf表,查询语句如下:
SELECT a,b,c,d,e
FROM function_udf,lateral table(split(a)) as T(d,e);
- 选中查询语句运行之后,向9999端口发送数据,如下
123|456,4,2
12|3456,7,1
结果:
查询结果中,会将function_udf表中每行字符串的a字段按照竖线(|)分割成d,e两列。
- 📢博客主页:https://lansonli.blog.csdn.net
- 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
- 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
- 📢停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨
相关文章:

大数据Flink(一百二十):Flink SQL自定义函数(UDF)
文章目录 Flink SQL自定义函数(UDF) 一、概述 二、自定义标量函数(UDSF) 三、自定义聚合函数(UDAF) 四、 自定义表值函数(UDTF) Flink SQL自定义函数…...

【图像检索】基于灰度共生矩的纹理图像检索,matlab实现
博主简介:matlab图像代码项目合作(扣扣:3249726188) ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 本次案例是基于灰度共生矩的纹理图像检索,用matlab实现。 一、案例背景和算法介绍 …...

【操作系统】02.深入理解操作系统
一、操作系统的定位 任何计算机系统都包含一个基本的程序集合,称为操作系统(OS)。笼统的理解,操作系统包括操作系统内核和其他程序。 由上述的宏观图其实我们就知道:操作系统是一款进行软硬件资源管理的软件。 二、设计操作系统的目的 操…...

【Python】探索 Errbot:多功能聊天机器人框架
不是旅行治愈了你,是你在路上放过了自己。 在当今的数字化时代,聊天机器人已成为企业与客户互动、提升工作效率和增加乐趣的重要工具。Errbot是一个高度可扩展的聊天机器人框架,它允许开发者使用Python轻松创建和定制机器人。本文将介绍Errb…...

Linux 调试器 GDB 使用指南
在Linux环境下开发和调试程序时,GNU调试器(GDB)是一个强大的工具。它支持多种编程语言(如C、C、Fortran等),并且可以帮助开发人员检测、排除和修复程序中的错误。GDB能够让你在程序运行时暂停,查…...

MiniCPM3-4B | 笔记本电脑运行端侧大模型OpenBMB/MiniCPM3-4B-GPTQ-Int4量化版 | PyCharm环境
MiniCPM3-4B,轻松在笔记本电脑上运行大模型? 背景一、选择模型二、模型下载三、模型运行四、总结 背景 2024年9月5日,面壁智能发布了MiniCPM3-4B,面壁的测试结果声称MiniCPM3-4B表现超越 Phi-3.5-mini-instruct 和 GPT-3.5-Turbo-…...

【chromedriver编译-绕过selenium机器人检测】
有小伙伴说使用selenium没能绕过机器人检测,盘他。 selenium机器人检测有2种,一是cdp检测,二是webdriver特征检测。cdp检测前面的博客已写过,这里就提下webdriver特征检测。一、selenium简介 Selenium 是一个强大的工具ÿ…...

【JavaEE精炼宝库】HTTP | HTTPS 协议详解
文章目录 一、HTTP 简介二、HTTP 协议格式:2.1 抓包工具的使用:2.2 HTTP 请求报文格式:2.3 HTTP 响应报文格式:2.4 HTTP 协议格式总结: 三、HTTP 请求详解:3.1 刨析 URL:3.2 方法(method)&#…...

Go语言基础学习01
目录 Linux环境下配置安装VScode并配置Go语言开发环境工作区和GOPATH 之前学习过Go语言,学习的时候没有记录笔记,最近找了个极客时间的Go语言36讲,打算时间学习并记录学习过程。 自己抽空看了一点内容,发现这个内容对于0基础解除G…...

基于SSM+Vue+MySQL的酒店管理系统
系统展示 用户前台界面 管理员后台界面 系统背景 随着旅游业的蓬勃发展,酒店业作为旅游产业链中的重要一环,面临着日益增长的客户需求和激烈的市场竞争。传统的人工酒店管理模式已难以满足高效、精准、个性化的服务要求。因此,开发一套基于SS…...

在WPF中保存控件内容为图片
在WPF中保存控件内容为图片 实现代码如下 1 private void SaveControlContentAsImage(FrameworkElement element,string fileName)2 {3 var render new RenderTargetBitmap((int)element.ActualWidth, (int)element.ActualHeight, 96, 96, PixelFormats…...

C#用SDK打开海康工业相机,callback取图Bitmap格式,并保存
上次写了python版本的,但是python虽好不方便发布,她带着重重的解释器有时候不方便玩耍.于是C#来了哦. C#图像一般用Bitmap表示,所以完全C#就够,别的格式可以自行想转换. 命令行哦,没界面. MVCamera.cs从MVS示例里面添加到项目中,using MvCamCtrl.NET; 就可以,不需要添加mvca…...

C语言字符学习初级优先看这个就够了
1. 字符的基本概念 在C语言中,字符(char)是一个基本的数据类型,用来表示单个字符。字符用单引号( )括起来,例如 a、1 等。字符在内存中实际上是以整数的形式存储的,即 ASCII 码。例…...

Python JSON
JSON 函数 json.dumps 语法 实例 json.loads 语法 实例 使用第三方库:Demjson 环境配置 JSON 函数 encode 语法 实例 decode 语法 实例 JSON 函数 使用 JSON 函数需要导入 json 库:import json。 函数描述json.dumps将 Python 对象编码…...

【华为杯】2024华为杯数模研赛F题 解题思路
题目 X射线脉冲星光子到达时间建模 问题背景 高速公路拥堵现象的原因众多,除了交通事故外,最典型的就是部分路段出现瓶颈现象,主要原因是车辆汇聚,而拥堵后又容易蔓延。高速公路一些特定的路段容易形成堵点,如匝道出…...

Object Pascal 结构化程序设计
Object Pascal 关系运算符 运算符名称等于<>不等于>大于<小于>大于等于<小于等于< (属于元素的)包含于> (属于元素的)包含in (属于元素的)属于 # Object Pascal 逻辑运算符 运算符名称含义Not逻辑非单目运算符,进行取反操作,由T…...

机器学习算法与实践_03概率论与贝叶斯算法笔记
1、概率论基础知识介绍 人工智能项目本质上是一个统计学项目,是通过对 样本 的分析,来评估/估计 总体 的情况,与数学知识相关联 高等数学 ——> 模型优化 概率论与数理统计 ——> 建模思想 线性代数 ——> 高性能计算 在机器学…...

如何使用Privoxy将SOCKS5代理转换为HTTP代理?
在这篇博客中,我将介绍如何使用Privoxy将SOCKS5代理转换为HTTP代理。我们将从下载和安装Privoxy开始,接着配置Privoxy,最后配置Windows以便浏览器使用该代理。 1. 下载并安装Privoxy 首先,您需要下载并安装Privoxy。您可以从Pri…...

AJAX(一)HTTP协议(请求响应报文),AJAX发送请求,请求问题处理
文章目录 一、AJAX二、HTTP协议1. 请求报文2. 响应报文 三、AJAX案例准备1. 安装node2. Express搭建服务器3. 安装nodemon实现自动重启 四、AJAX发送请求1. GET请求2. POST请求(1) 配置请求体(2) 配置请求头 3. 响应JSON数据的两种方式(1) 手动,JSON.parse()(2) 设置…...

Git进阶(十五):Git LFS 使用详解
文章目录 一、介绍二、Git LFS 使用步骤三、场景示例四、拓展阅读 一、介绍 Git LFS (Large File Storage) 是一个 Git 扩展,它使 Git 更适合处理大型文件,如音频、视频、图像或任何其他二进制大文件。Git LFS 替换仓库中的大文件为文本指针文件&#x…...

操作系统 | 学习笔记 | | 王道 | 5.1 I/O管理概述
5.1 I/O管理概述 5.1.1 I/O设备 注:块设备可以寻址,但是字符设备是不可寻址的 I/O设备是将数据输入到计算机中,或者可以接收计算机输出数据的外部设备,属于计算机中的硬件部件; 设备的分类 按使用特性分类ÿ…...

关于es的一个多集群、多索引切换的实现
首先是封装了一个类里定义了关于集群名称和集群节点;以及关于索引的名称和集群的名称做一个关联;将多个集群封装存储得到类中 /*** es集群类*/ Data public class EsClusterConfig implements Serializable {/*** 集群名称*/private String name;/*** 集…...

Linux系统编程(基础指令)上
1.Linux常见目录介绍 Linux目录为树形结构 /:根目录,一般根目录下只存放目录,在Linux下有且只有一个根目录。所有的东西都是从这里开始。当你在终端里输入“/home”,你其实是在告诉电脑,先从/(根目录&…...

【STM32 Blue Pill编程】-定时器PWM模式
定时器PWM模式 文章目录 定时器PWM模式1、定时器PWM模式介绍2、硬件准备及接线3、模块配置4、代码实现在文中,我们将介绍如何使用 STM32 Blue Pill 定时器的PWM模式以及如何配置它们以生成具有不同占空比和频率的信号。 我们将使用 LED调光器示例来演示如何使用 STM32Cube IDE…...

数字英文验证码识别 API 对接说明
数字英文验证码识别 API 对接说明 本文将介绍一种 数字英文验证码识别 API 对接说明,它是基于深度学习技术,可用于识别变长英文数字验证码。输入验证码图像的内容,输出验证码结果。 接下来介绍下 数字英文验证码识别 API 的对接说明。 注册…...

稳了,搭建Docker国内源图文教程
大家好,之前分享了我的开源作品 Cloudflare Workers Proxy,它的作用是代理被屏蔽的地址,理论上支持代理任何被屏蔽的域名,使用方式也很简单,只需要设置环境变量 PROXY_HOSTNAME 为被屏蔽的域名,最后通过你的…...

零工市场小程序:推动零工市场建设
人力资源和社会保障部在2024年4月发布了标题为《地方推进零工市场建设经验做法》的文章。 零工市场小程序的功能 信息登记与发布 精准匹配、推送 在线沟通 权益保障 零工市场小程序作为一个找零工的渠道,在往后随着技术的发展和政策的支持下,功能必然…...

回归预测 | Matlab实现SSA-HKELM麻雀算法优化混合核极限学习机多变量回归预测
回归预测 | Matlab实现SSA-HKELM麻雀算法优化混合核极限学习机多变量回归预测 目录 回归预测 | Matlab实现SSA-HKELM麻雀算法优化混合核极限学习机多变量回归预测效果一览基本介绍程序设计参考资料 效果一览 基本介绍 1.Matlab实现SSA-HKELM麻雀算法优化混合核极限学习机多变量…...

VCNet论文阅读笔记
VCNet论文阅读笔记 0、基本信息 信息细节英文题目VCNet and Functional Targeted Regularization For Learning Causal Effects of Continuous Treatments翻译VCNet和功能目标正则化用于学习连续处理的因果效应单位芝加哥大学年份2021论文链接[2103.07861] VCNet和功能定向正…...

Python 装饰器使用详解
文章目录 0. 引言1. 什么是装饰器?2. 装饰器的基本语法3. 装饰器的工作原理4. 常见装饰器应用场景4.1. 日志记录4.2. 权限校验4.3. 缓存 5. 多重装饰器的执行顺序6. 装饰器的高级用法6.1. 带参数的装饰器6.2. 使用 functools.wraps6.3. 类装饰器 7. 图示说明7.1. 单…...