大数据Flink(一百二十):Flink SQL自定义函数(UDF)
文章目录
Flink SQL自定义函数(UDF)
一、概述
二、自定义标量函数(UDSF)
三、自定义聚合函数(UDAF)
四、 自定义表值函数(UDTF)
Flink SQL自定义函数(UDF)
Flink全托管支持在SQL作业中使用Python自定义函数,Flink支持以下3类自定义函数:UDSF(User Defined Scalar Function)、UDAF(User Defined Aggregation Function)、UDTF(User Defined Table-valued Function)。
一、概述
在资料udf函数中可以看到udx.zip压缩包,将其解压后可以看到有以下文件:
其中udfs.py udafs.py udtfs.py分别对应了UDSF、UDAF、UDTF三个函数的示例。
进入阿里云Flink开发平台,点击左侧导航栏SQL开发,点击左侧的函数页签,单击注册UDF,将udx.zip上传,如下图所示。
点击确定后,Flink开发控制台会解析UDF文件中是否使用了Flink UDF、UDAF和UDTF接口的类,并自动提取类名,填充到Function Name字段中。可以看到这里识别出了三个函数。
点击创建函数,可以看到函数页签下出现了udx目录,下面有三个自定义函数,此时自定义函数创建完成。
二、自定义标量函数(UDSF)
- 自定义标量函数(UDSF)将0个、1个或多个标量值映射到一个新的标量值。输入与输出是一对一的关系,即读入一行数据,写出一条输出值。
udfs.py内容如下:
from pyflink.table import DataTypes
from pyflink.table.udf import udf@udf(result_type=DataTypes.STRING())
def sub_string(s: str, begin: int, end: int):return s[begin:end]
说明:
- sub_string定义了获取每条数据中从begin~end位的字符的代码;
- 需要通过名字为 “ udf ” 的装饰器,声明这是一个 scalar function;
- 需要通过装饰器中的 result_type 参数,声明 scalar function 的结果类型;
进入阿里云Flink开发平台,在test作业草稿下,进行建表,语句如下:
CREATE TABLE function_udf(a VARCHAR,b INT,c INT
) WITH ('connector' = 'socket','hostname' = '178.23.146.213','port' = '9999','format' = 'csv'
);
- 查询语句如下
SELECT sub_string(a,2,5)
FROM function_udf;
- 在ecs监听9999端口:nc -lk 9999,然后选中查询语句,点击调试.
- 在ecs向9999发送数据
123|456,4,2
12|3456,7,1
结果如下:
查询结果是function_udf表中a字段每行字符串的第3-5个字符。
三、自定义聚合函数(UDAF)
- 自定义聚合函数(UDAF),将多条记录聚合成1条记录。其输入与输出是多对一的关系,即将多条输入记录聚合成一条输出值。
udafs.py内容如下:
from pyflink.common import Row
from pyflink.table import AggregateFunction, DataTypes
from pyflink.table.udf import udafclass WeightedAvg(AggregateFunction):def create_accumulator(self):# Row(sum, count)return Row(0, 0)def get_value(self, accumulator: Row) -> float:if accumulator[1] == 0:return 0else:return accumulator[0] / accumulator[1]def accumulate(self, accumulator: Row, value, weight):accumulator[0] += value * weightaccumulator[1] += weightdef retract(self, accumulator: Row, value, weight):accumulator[0] -= value * weightaccumulator[1] -= weightweighted_avg = udaf(f=WeightedAvg(),result_type=DataTypes.DOUBLE(),accumulator_type=DataTypes.ROW([DataTypes.FIELD("f0", DataTypes.BIGINT()),DataTypes.FIELD("f1", DataTypes.BIGINT())]))
说明:
- 该示例中,weighted_avg定义了当前数据和历史数据求含权重的均值的代码。
- 需要通过名字为 “ udaf ” 的装饰器,声明这是一个 aggregate function,
- 需要分别通过装饰器中的 result_type 及 accumulator_type 参数,声明 aggregate function 的结果类型及 accumulator 类型;
- create_accumulator,get_value 和 accumulate 这 3 个方法必须要定义,retract 方法可以根据需要定义;需要注意的是,由于必须定义 create_accumulator,get_value 和 accumulate 这 3 个方法,Python UDAF 只能通过继承AggregateFunction 的方式进行定义。
仍然使用function_udf表,查询语句如下:
SELECT weighted_avg(b,c)
FROM function_udf;
选中查询语句运行之后,向9999端口依次发送数据,如下:
123|456,4,2
12|3456,7,1
查询结果是以c字段为权重的b字段当前数据和历史数据的均值。
四、 自定义表值函数(UDTF)
自定义表值函数(UDTF),将0个、1个或多个标量值作为输入参数(可以是变长参数)。表值函数可以返回任意数量的行作为输出,而不仅是1个值。返回的行可以由1个或多个列组成。调用一次函数输出多行或多列数据。与自定义的标量函数类似,但与标量函数不同。
udtfs.py内容如下:
from pyflink.table import DataTypes
from pyflink.table.udf import udtf@udtf(result_types=[DataTypes.STRING(), DataTypes.STRING()])
def split(s: str):splits = s.split("|")yield splits[0], splits[1]
说明:
- 该示例中,split定义了将一行字符串按照竖线(|)分割成多列字符串的代码。
- 需要通过名字为 “ udtf ” 的装饰器,声明这是一个 table function。
- 需要通过装饰器中的 result_types 参数,声明 table function 的结果类型。由于 table function 每条输出可以包含多个列,result_types 需要指定所有输出列的类型。
仍然使用function_udf表,查询语句如下:
SELECT a,b,c,d,e
FROM function_udf,lateral table(split(a)) as T(d,e);
- 选中查询语句运行之后,向9999端口发送数据,如下
123|456,4,2
12|3456,7,1
结果:
查询结果中,会将function_udf表中每行字符串的a字段按照竖线(|)分割成d,e两列。
- 📢博客主页:https://lansonli.blog.csdn.net
- 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
- 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
- 📢停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨
相关文章:

大数据Flink(一百二十):Flink SQL自定义函数(UDF)
文章目录 Flink SQL自定义函数(UDF) 一、概述 二、自定义标量函数(UDSF) 三、自定义聚合函数(UDAF) 四、 自定义表值函数(UDTF) Flink SQL自定义函数…...

【图像检索】基于灰度共生矩的纹理图像检索,matlab实现
博主简介:matlab图像代码项目合作(扣扣:3249726188) ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 本次案例是基于灰度共生矩的纹理图像检索,用matlab实现。 一、案例背景和算法介绍 …...

【操作系统】02.深入理解操作系统
一、操作系统的定位 任何计算机系统都包含一个基本的程序集合,称为操作系统(OS)。笼统的理解,操作系统包括操作系统内核和其他程序。 由上述的宏观图其实我们就知道:操作系统是一款进行软硬件资源管理的软件。 二、设计操作系统的目的 操…...

【Python】探索 Errbot:多功能聊天机器人框架
不是旅行治愈了你,是你在路上放过了自己。 在当今的数字化时代,聊天机器人已成为企业与客户互动、提升工作效率和增加乐趣的重要工具。Errbot是一个高度可扩展的聊天机器人框架,它允许开发者使用Python轻松创建和定制机器人。本文将介绍Errb…...
Linux 调试器 GDB 使用指南
在Linux环境下开发和调试程序时,GNU调试器(GDB)是一个强大的工具。它支持多种编程语言(如C、C、Fortran等),并且可以帮助开发人员检测、排除和修复程序中的错误。GDB能够让你在程序运行时暂停,查…...

MiniCPM3-4B | 笔记本电脑运行端侧大模型OpenBMB/MiniCPM3-4B-GPTQ-Int4量化版 | PyCharm环境
MiniCPM3-4B,轻松在笔记本电脑上运行大模型? 背景一、选择模型二、模型下载三、模型运行四、总结 背景 2024年9月5日,面壁智能发布了MiniCPM3-4B,面壁的测试结果声称MiniCPM3-4B表现超越 Phi-3.5-mini-instruct 和 GPT-3.5-Turbo-…...

【chromedriver编译-绕过selenium机器人检测】
有小伙伴说使用selenium没能绕过机器人检测,盘他。 selenium机器人检测有2种,一是cdp检测,二是webdriver特征检测。cdp检测前面的博客已写过,这里就提下webdriver特征检测。一、selenium简介 Selenium 是一个强大的工具ÿ…...

【JavaEE精炼宝库】HTTP | HTTPS 协议详解
文章目录 一、HTTP 简介二、HTTP 协议格式:2.1 抓包工具的使用:2.2 HTTP 请求报文格式:2.3 HTTP 响应报文格式:2.4 HTTP 协议格式总结: 三、HTTP 请求详解:3.1 刨析 URL:3.2 方法(method)&#…...
Go语言基础学习01
目录 Linux环境下配置安装VScode并配置Go语言开发环境工作区和GOPATH 之前学习过Go语言,学习的时候没有记录笔记,最近找了个极客时间的Go语言36讲,打算时间学习并记录学习过程。 自己抽空看了一点内容,发现这个内容对于0基础解除G…...

基于SSM+Vue+MySQL的酒店管理系统
系统展示 用户前台界面 管理员后台界面 系统背景 随着旅游业的蓬勃发展,酒店业作为旅游产业链中的重要一环,面临着日益增长的客户需求和激烈的市场竞争。传统的人工酒店管理模式已难以满足高效、精准、个性化的服务要求。因此,开发一套基于SS…...
在WPF中保存控件内容为图片
在WPF中保存控件内容为图片 实现代码如下 1 private void SaveControlContentAsImage(FrameworkElement element,string fileName)2 {3 var render new RenderTargetBitmap((int)element.ActualWidth, (int)element.ActualHeight, 96, 96, PixelFormats…...
C#用SDK打开海康工业相机,callback取图Bitmap格式,并保存
上次写了python版本的,但是python虽好不方便发布,她带着重重的解释器有时候不方便玩耍.于是C#来了哦. C#图像一般用Bitmap表示,所以完全C#就够,别的格式可以自行想转换. 命令行哦,没界面. MVCamera.cs从MVS示例里面添加到项目中,using MvCamCtrl.NET; 就可以,不需要添加mvca…...
C语言字符学习初级优先看这个就够了
1. 字符的基本概念 在C语言中,字符(char)是一个基本的数据类型,用来表示单个字符。字符用单引号( )括起来,例如 a、1 等。字符在内存中实际上是以整数的形式存储的,即 ASCII 码。例…...
Python JSON
JSON 函数 json.dumps 语法 实例 json.loads 语法 实例 使用第三方库:Demjson 环境配置 JSON 函数 encode 语法 实例 decode 语法 实例 JSON 函数 使用 JSON 函数需要导入 json 库:import json。 函数描述json.dumps将 Python 对象编码…...

【华为杯】2024华为杯数模研赛F题 解题思路
题目 X射线脉冲星光子到达时间建模 问题背景 高速公路拥堵现象的原因众多,除了交通事故外,最典型的就是部分路段出现瓶颈现象,主要原因是车辆汇聚,而拥堵后又容易蔓延。高速公路一些特定的路段容易形成堵点,如匝道出…...
Object Pascal 结构化程序设计
Object Pascal 关系运算符 运算符名称等于<>不等于>大于<小于>大于等于<小于等于< (属于元素的)包含于> (属于元素的)包含in (属于元素的)属于 # Object Pascal 逻辑运算符 运算符名称含义Not逻辑非单目运算符,进行取反操作,由T…...

机器学习算法与实践_03概率论与贝叶斯算法笔记
1、概率论基础知识介绍 人工智能项目本质上是一个统计学项目,是通过对 样本 的分析,来评估/估计 总体 的情况,与数学知识相关联 高等数学 ——> 模型优化 概率论与数理统计 ——> 建模思想 线性代数 ——> 高性能计算 在机器学…...
如何使用Privoxy将SOCKS5代理转换为HTTP代理?
在这篇博客中,我将介绍如何使用Privoxy将SOCKS5代理转换为HTTP代理。我们将从下载和安装Privoxy开始,接着配置Privoxy,最后配置Windows以便浏览器使用该代理。 1. 下载并安装Privoxy 首先,您需要下载并安装Privoxy。您可以从Pri…...

AJAX(一)HTTP协议(请求响应报文),AJAX发送请求,请求问题处理
文章目录 一、AJAX二、HTTP协议1. 请求报文2. 响应报文 三、AJAX案例准备1. 安装node2. Express搭建服务器3. 安装nodemon实现自动重启 四、AJAX发送请求1. GET请求2. POST请求(1) 配置请求体(2) 配置请求头 3. 响应JSON数据的两种方式(1) 手动,JSON.parse()(2) 设置…...
Git进阶(十五):Git LFS 使用详解
文章目录 一、介绍二、Git LFS 使用步骤三、场景示例四、拓展阅读 一、介绍 Git LFS (Large File Storage) 是一个 Git 扩展,它使 Git 更适合处理大型文件,如音频、视频、图像或任何其他二进制大文件。Git LFS 替换仓库中的大文件为文本指针文件&#x…...

XCTF-web-easyupload
试了试php,php7,pht,phtml等,都没有用 尝试.user.ini 抓包修改将.user.ini修改为jpg图片 在上传一个123.jpg 用蚁剑连接,得到flag...
【Linux】shell脚本忽略错误继续执行
在 shell 脚本中,可以使用 set -e 命令来设置脚本在遇到错误时退出执行。如果你希望脚本忽略错误并继续执行,可以在脚本开头添加 set e 命令来取消该设置。 举例1 #!/bin/bash# 取消 set -e 的设置 set e# 执行命令,并忽略错误 rm somefile…...
React Native 开发环境搭建(全平台详解)
React Native 开发环境搭建(全平台详解) 在开始使用 React Native 开发移动应用之前,正确设置开发环境是至关重要的一步。本文将为你提供一份全面的指南,涵盖 macOS 和 Windows 平台的配置步骤,如何在 Android 和 iOS…...

关于nvm与node.js
1 安装nvm 安装过程中手动修改 nvm的安装路径, 以及修改 通过nvm安装node后正在使用的node的存放目录【这句话可能难以理解,但接着往下看你就了然了】 2 修改nvm中settings.txt文件配置 nvm安装成功后,通常在该文件中会出现以下配置&…...

【Redis技术进阶之路】「原理分析系列开篇」分析客户端和服务端网络诵信交互实现(服务端执行命令请求的过程 - 初始化服务器)
服务端执行命令请求的过程 【专栏简介】【技术大纲】【专栏目标】【目标人群】1. Redis爱好者与社区成员2. 后端开发和系统架构师3. 计算机专业的本科生及研究生 初始化服务器1. 初始化服务器状态结构初始化RedisServer变量 2. 加载相关系统配置和用户配置参数定制化配置参数案…...

Nuxt.js 中的路由配置详解
Nuxt.js 通过其内置的路由系统简化了应用的路由配置,使得开发者可以轻松地管理页面导航和 URL 结构。路由配置主要涉及页面组件的组织、动态路由的设置以及路由元信息的配置。 自动路由生成 Nuxt.js 会根据 pages 目录下的文件结构自动生成路由配置。每个文件都会对…...
Neo4j 集群管理:原理、技术与最佳实践深度解析
Neo4j 的集群技术是其企业级高可用性、可扩展性和容错能力的核心。通过深入分析官方文档,本文将系统阐述其集群管理的核心原理、关键技术、实用技巧和行业最佳实践。 Neo4j 的 Causal Clustering 架构提供了一个强大而灵活的基石,用于构建高可用、可扩展且一致的图数据库服务…...

HBuilderX安装(uni-app和小程序开发)
下载HBuilderX 访问官方网站:https://www.dcloud.io/hbuilderx.html 根据您的操作系统选择合适版本: Windows版(推荐下载标准版) Windows系统安装步骤 运行安装程序: 双击下载的.exe安装文件 如果出现安全提示&…...
根据万维钢·精英日课6的内容,使用AI(2025)可以参考以下方法:
根据万维钢精英日课6的内容,使用AI(2025)可以参考以下方法: 四个洞见 模型已经比人聪明:以ChatGPT o3为代表的AI非常强大,能运用高级理论解释道理、引用最新学术论文,生成对顶尖科学家都有用的…...
稳定币的深度剖析与展望
一、引言 在当今数字化浪潮席卷全球的时代,加密货币作为一种新兴的金融现象,正以前所未有的速度改变着我们对传统货币和金融体系的认知。然而,加密货币市场的高度波动性却成为了其广泛应用和普及的一大障碍。在这样的背景下,稳定…...