当前位置: 首页 > news >正文

0基础学习PyFlink——用户自定义函数之UDF

大纲

  • 标量函数
    • 入参并非表中一行(Row)
    • 入参是表中一行(Row)
    • alias

PyFlink中关于用户定义方法有:

  • UDF:用户自定义函数。
  • UDTF:用户自定义表值函数。
  • UDAF:用户自定义聚合函数。
  • UDTAF:用户自定义表值聚合函数。

这些字母可以拆解如下:

  • UD表示User Defined(用户自定义);
  • F表示Function(方法);
  • T表示Table(表);
  • A表示Aggregate(聚合);
    在这里插入图片描述
    Aggregate(聚合)函数是指:以多行数据为输入,计算出一个新的值的函数。这块我们会在后续的章节介绍,本文我们主要介绍非聚合类型的用户自定义方法的简单使用。

标量函数

即我们常见的UDF。

def udf(f: Union[Callable, ScalarFunction, Type] = None,input_types: Union[List[DataType], DataType, str, List[str]] = None,result_type: Union[DataType, str] = None,deterministic: bool = None, name: str = None, func_type: str = "general",udf_type: str = None) -> Union[UserDefinedScalarFunctionWrapper, Callable]:

我们主要关注result_type和input_types,它们分别用于确定函数的输入和输出。
input_types可以是List[DataType], DataType, str, List[str]之一任何一种,这个要视使用者决定。UDTF也是这种类型,它们没啥区别。
result_type只能是DataType或str;而UDTF可以是List[DataType], DataType, str, List[str]任意之一。这也是UDF和UDTF最大的区别。
我们以一个例子来介绍它的用法。这个例子会将大写字符转换成小写字符,然后统计字符出现的次数。
在介绍例子之前,我们先构造Execute之前的准备环境

from pyflink.common import Configuration
from pyflink.table import (EnvironmentSettings, TableEnvironment, Schema)
from pyflink.table.types import DataTypes
from pyflink.table.table_descriptor import TableDescriptor
from pyflink.table.expressions import lit, col
from pyflink.common import Row
from pyflink.table.udf import udf,udtf,udaf,udtaf
import pandas as pd
from pyflink.table.udf import UserDefinedFunctionword_count_data = ["A", "B", "C", "D", "E", "F", "G", "H", "I", "J", "A", "G"]  def word_count():config = Configuration()# write all the data to one fileconfig.set_string('parallelism.default', '1')env_settings = EnvironmentSettings \.new_instance() \.in_batch_mode() \.with_configuration(config) \.build()t_env = TableEnvironment.create(env_settings)row_type_tab_source = DataTypes.ROW([DataTypes.FIELD('word', DataTypes.STRING())])tab_source = t_env.from_elements(map(lambda i: Row(i), word_count_data), row_type_tab_source)# define the sink schemasink_schema = Schema.new_builder() \.column("word", DataTypes.STRING().not_null()) \.column("count", DataTypes.BIGINT()) \.primary_key("word") \.build()# Create a sink descriptorsink_descriptor = TableDescriptor.for_connector('print')\.schema(sink_schema) \.build()t_env.create_temporary_table("WordsCountTableSink", sink_descriptor)

这段代码从读取数据word_count_data,并构造出tab_source作为输入数据暂存的表。下面我们看下入参不同时,UDF怎么写

入参并非表中一行(Row)

    @udf(result_type=DataTypes.ROW([DataTypes.FIELD("lower_word", DataTypes.STRING())]), input_types=[DataTypes.STRING()])def colFunc(oneCol):return Row(oneCol.lower())

input_types我们设置成[DataTypes.STRING()],即该数组中只有一个参数,也表示修饰的方法只有一个参数,类型是String。如果觉得input_types写起来麻烦,这个参数可以不设置。
result_type我们设置为一个DataTypes.ROW([DataTypes.FIELD(“lower_word”, DataTypes.STRING())])。我们可以把它看成是一个新表的结构描述,即一行只有一个字段——lower_word,它的类型也是String。

    tab_lower=tab_source.map(colFunc(col('word')))

map方法中,我们会给UDF修饰的方法传入原始表tab_source每行中的word字段的值。然后构造出一个新的表tab_lower。这个新的表没有word字段,只有UDF中result_type定义的lower_word。

def map(self, func: Union[Expression, UserDefinedScalarFunctionWrapper]) -> 'Table':

后续只要使用这个新表,新字段即可。

    tab_lower.group_by(col('lower_word')) \.select(col('lower_word'), lit(1).count) \.execute_insert("WordsCountTableSink") \.wait()

完整代码

from pyflink.common import Configuration
from pyflink.table import (EnvironmentSettings, TableEnvironment, Schema)
from pyflink.table.types import DataTypes
from pyflink.table.table_descriptor import TableDescriptor
from pyflink.table.expressions import lit, col
from pyflink.common import Row
from pyflink.table.udf import udf,udtf,udaf,udtaf
import pandas as pd
from pyflink.table.udf import UserDefinedFunctionword_count_data = ["A", "B", "C", "D", "E", "F", "G", "H", "I", "J", "A", "G"]  def word_count():config = Configuration()# write all the data to one fileconfig.set_string('parallelism.default', '1')env_settings = EnvironmentSettings \.new_instance() \.in_batch_mode() \.with_configuration(config) \.build()t_env = TableEnvironment.create(env_settings)row_type_tab_source = DataTypes.ROW([DataTypes.FIELD('word', DataTypes.STRING())])tab_source = t_env.from_elements(map(lambda i: Row(i), word_count_data), row_type_tab_source )# define the sink schemasink_schema = Schema.new_builder() \.column("word", DataTypes.STRING().not_null()) \.column("count", DataTypes.BIGINT()) \.primary_key("word") \.build()# Create a sink descriptorsink_descriptor = TableDescriptor.for_connector('print')\.schema(sink_schema) \.build()t_env.create_temporary_table("WordsCountTableSink", sink_descriptor)@udf(result_type=DataTypes.ROW([DataTypes.FIELD("lower_word", DataTypes.STRING())]), input_types=[DataTypes.STRING()])def colFunc(oneCol):return Row(oneCol.lower())tab_lower=tab_source.map(colFunc(col('word')))   tab_lower.group_by(col('lower_word')) \.select(col('lower_word'), lit(1).count) \.execute_insert("WordsCountTableSink") \.wait()if __name__ == '__main__':word_count()

入参是表中一行(Row)

    @udf(result_type=DataTypes.ROW([DataTypes.FIELD("lower_word", DataTypes.STRING())]), input_types=row_type_tab_source)def rowFunc(row):return Row(row[0].lower())tab_lower=tab_source.map(rowFunc) tab_lower.group_by(col('lower_word')) \.select(col('lower_word'), lit(1).count) \.execute_insert("WordsCountTableSink") \.wait()

主要的区别是map方法直接传递udf修饰的方法,而不是直接其调用返回值。input_types是原始表的行结构——RowType,而不是一个参数数组。
map方法给rowFunc传递原始表tab_source的每行数据,然后构造出一个新表tab_lower。新表的字段也在udf的result_type中定义了,它是String类型的lower_word。后面我们对新表就要聚合统计这个新的字段,而不是老表中的字段。

alias

前面两个案例,在定义UDF时,我们严格设置了result_type和input_types。实际input_types可以不用设置,但是result_type必须设置。上面例子中,result_type我们都设置为RowType,即表行的结构。如果觉得这样写很麻烦,可以考虑使用alias来实现。

    @udf(result_type=DataTypes.STRING())def colFunc(oneCol):return oneCol.lower()tab_lower=tab_source.map(colFunc(col('word'))).alias('lower_word')tab_lower.group_by(col('lower_word')) \.select(col('lower_word'), lit(1).count) \.execute_insert("WordsCountTableSink") \.wait()
    @udf(result_type=DataTypes.STRING())def rowFunc(row):return row[0].lower()tab_lower=tab_source.map(rowFunc).alias('lower_word')tab_lower.group_by(col('lower_word')) \.select(col('lower_word'), lit(1).count) \.execute_insert("WordsCountTableSink") \.wait()

这样我们在定义udf时,只是指定了返回类型是个字符串,也不知道它在新表中叫啥名字(实际叫f0)。但是为了便于后续使用,我们使用alias给它取了一个别名lower_word。这样就可以让其参与后续的计算了。

相关文章:

0基础学习PyFlink——用户自定义函数之UDF

大纲 标量函数入参并非表中一行(Row)入参是表中一行(Row)alias PyFlink中关于用户定义方法有: UDF:用户自定义函数。UDTF:用户自定义表值函数。UDAF:用户自定义聚合函数。UDTAF&…...

英语小作文模板(06求助+描述;07描述+建议)

06 求助描述: 题目背景及要求 第一段 第二段 第三段 翻译成中文 07 描述+建议: 题目背景及要求 第一段 第二段...

为什么感觉假期有时候比上班还累?

假期比上班还累的感觉可能由以下几个原因造成: 计划过度:在假期里,人们往往会制定各种计划,如旅游、聚会、休息等,以充分利用这段时间。然而,如果这些计划过于紧张或安排得过于紧密,就会导致身…...

推理还是背诵?通过反事实任务探索语言模型的能力和局限性

推理还是背诵?通过反事实任务探索语言模型的能力和局限性 摘要1 引言2 反事实任务2.1 反事实理解检测 3 任务3.1 算术3.2 编程3.3 基本的句法推理3.4 带有一阶逻辑的自然语言推理3.5 空间推理3.6 绘图3.7 音乐3.8 国际象棋 4 结果5 分析5.1 反事实条件的“普遍性”5…...

《利息理论》指导 TCP 拥塞控制

欧文费雪《利息原理》第 10 章,第 11 章对利息的几何说明是普适的,任何一个负反馈系统都能引申出新结论。给出原书图示,本文依据于此,详情参考原书: 将 burst 看作借贷是合理的,它包含成本(报文)&#xf…...

Bsdiff,Bspatch 的差分增量升级(基于Win和Linux)

目录 背景 内容 准备工作 在windows平台上 在linux平台上 正式工作 生成差分文件思路 作用差分文件思路 在保持相同目录结构进行差分增量升级 服务端(生成差分文件) 客户端(作用差分文件) 背景 像常见的Android 的linux平台,游戏,系统更新都…...

【3妹教我学历史-秦朝史】2 秦穆公-韩原之战

插: 前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到网站。 坚持不懈,越努力越幸运,大家一起学习鸭~~~ 3妹:2哥,今天下班这么早&#…...

车载控制器

文章目录 车载控制器电动汽车上都有什么ECU 车载控制器 智能汽车上的控制器数量因车型和制造商而异。一般来说,现代汽车可能有50到100个电子控制单元(ECU)或控制器。这些控制器负责管理各种系统,如发动机管理、刹车、转向、空调、…...

回归预测 | Matlab实现RIME-CNN-SVM霜冰优化算法优化卷积神经网络-支持向量机的多变量回归预测

回归预测 | Matlab实现RIME-CNN-SVM霜冰优化算法优化卷积神经网络-支持向量机的多变量回归预测 目录 回归预测 | Matlab实现RIME-CNN-SVM霜冰优化算法优化卷积神经网络-支持向量机的多变量回归预测效果一览基本介绍程序设计参考资料 效果一览 基本介绍 1.RIME-CNN-SVM霜冰优化算…...

使用Jaeger进行分布式跟踪:学习如何在服务网格中使用Jaeger来监控和分析请求的跟踪信息

🌷🍁 博主猫头虎 带您 Go to New World.✨🍁 🦄 博客首页——猫头虎的博客🎐 🐳《面试题大全专栏》 文章图文并茂🦕生动形象🦖简单易学!欢迎大家来踩踩~🌺 &a…...

添加多个单元对象

开发环境: Windows 11 家庭中文版Microsoft Visual Studio Community 2019VTK-9.3.0.rc0vtk-example参考代码 demo解决问题:不同阶段添加多个单元对象。 定义一个点集和一个单元集合,单元的类型可以是点、三角形、矩形、多边形等基本图形。只…...

十八、模型构建器(ModelBuilder)快速提取城市建成区——批量掩膜提取夜光数据、夜光数据转面、面数据融合、要素转Excel(基于参考比较法)

一、前言 前文实现批量投影栅格、转为整型,接下来重点实现批量提取夜光数据,夜光数据转面、夜光数据面数据融合、要素转Excel。将相关结果转为Excel,接下来就是在Excel中进行阈值的确定,阈值确定无法通过批量操作,除非采用其他方式,但是那样的学习成本较高,对于参考比较…...

HarmonyOS开发:基于http开源一个网络请求库

前言 网络封装的目的,在于简洁,使用起来更加的方便,也易于我们进行相关动作的设置,如果,我们不封装,那么每次请求,就会重复大量的代码逻辑,如下代码,是官方给出的案例&am…...

【杂记】Ubuntu20.04装系统,安装CUDA等

装20.04系统 安装系统的过程中,ROG的B660G主板,即使不关掉Secure boot也是可以的,不会影响正常安装,我这边出现问题的主要原因是使用了Ventoy制作的系统安装盘,导致每次一选择使用U盘的UEFI启动,就会跳回到…...

040-第三代软件开发-全新波形抓取算法

第三代软件开发-全新波形抓取算法 文章目录 第三代软件开发-全新波形抓取算法项目介绍全新波形抓取算法代码小解 关键字: Qt、 Qml、 抓波、 截获、 波形 项目介绍 欢迎来到我们的 QML & C 项目!这个项目结合了 QML(Qt Meta-Object …...

分享一个基于asp.net的供销社农产品商品销售系统的设计与实现(源码调试 lw开题报告ppt)

💕💕作者:计算机源码社 💕💕个人简介:本人七年开发经验,擅长Java、Python、PHP、.NET、微信小程序、爬虫、大数据等,大家有这一块的问题可以一起交流! 💕&…...

Java基于SpringBoot的线上考试系统

1 摘 要 基于 SpringBoot 的在线考试系统网站,功能模块具有课程管理、成绩管理、教师管理、学生管理、考试管理以及基本信息的管理等,通过将系统分为管理员、授课教师以及学生,从不同的身份角度来对用户提供便利,将科技与教学模式…...

flask socketio 实时传值至html上【需补充实例】

目前版本如下 Flask-Cors 4.0.0 Flask-SocketIO 5.3.6from flask_socketio import SocketIO, emit 跨域问题网上的普通方法无法解决。 参考这篇文章解决 Flask教程(十九)SocketIO - 迷途小书童的Note迷途小书童的Note (xugaoxiang.com) app Flask(__name__) socketio Sock…...

C# Onnx P2PNet 人群检测和计数

效果 项目 代码 using Microsoft.ML.OnnxRuntime; using Microsoft.ML.OnnxRuntime.Tensors; using OpenCvSharp; using System; using System.Collections.Generic; using System.Drawing; using System.Linq; using System.Text; using System.Windows.Forms;namespace Onnx…...

idea提交代码一直提示 log into gitee

解决idea提交代码一直提示 log into gitee问题 文章目录 打开setting->Version control->gitee,删除旧账号,重新配置账号,删除重新登录就好 打开setting->Version control->gitee,删除旧账号,重新配置账号,删除重新登…...

LBE-LEX系列工业语音播放器|预警播报器|喇叭蜂鸣器的上位机配置操作说明

LBE-LEX系列工业语音播放器|预警播报器|喇叭蜂鸣器专为工业环境精心打造,完美适配AGV和无人叉车。同时,集成以太网与语音合成技术,为各类高级系统(如MES、调度系统、库位管理、立库等)提供高效便捷的语音交互体验。 L…...

深入剖析AI大模型:大模型时代的 Prompt 工程全解析

今天聊的内容,我认为是AI开发里面非常重要的内容。它在AI开发里无处不在,当你对 AI 助手说 "用李白的风格写一首关于人工智能的诗",或者让翻译模型 "将这段合同翻译成商务日语" 时,输入的这句话就是 Prompt。…...

vscode(仍待补充)

写于2025 6.9 主包将加入vscode这个更权威的圈子 vscode的基本使用 侧边栏 vscode还能连接ssh? debug时使用的launch文件 1.task.json {"tasks": [{"type": "cppbuild","label": "C/C: gcc.exe 生成活动文件"…...

如何将联系人从 iPhone 转移到 Android

从 iPhone 换到 Android 手机时,你可能需要保留重要的数据,例如通讯录。好在,将通讯录从 iPhone 转移到 Android 手机非常简单,你可以从本文中学习 6 种可靠的方法,确保随时保持连接,不错过任何信息。 第 1…...

视频字幕质量评估的大规模细粒度基准

大家读完觉得有帮助记得关注和点赞!!! 摘要 视频字幕在文本到视频生成任务中起着至关重要的作用,因为它们的质量直接影响所生成视频的语义连贯性和视觉保真度。尽管大型视觉-语言模型(VLMs)在字幕生成方面…...

【Oracle】分区表

个人主页:Guiat 归属专栏:Oracle 文章目录 1. 分区表基础概述1.1 分区表的概念与优势1.2 分区类型概览1.3 分区表的工作原理 2. 范围分区 (RANGE Partitioning)2.1 基础范围分区2.1.1 按日期范围分区2.1.2 按数值范围分区 2.2 间隔分区 (INTERVAL Partit…...

Mac下Android Studio扫描根目录卡死问题记录

环境信息 操作系统: macOS 15.5 (Apple M2芯片)Android Studio版本: Meerkat Feature Drop | 2024.3.2 Patch 1 (Build #AI-243.26053.27.2432.13536105, 2025年5月22日构建) 问题现象 在项目开发过程中,提示一个依赖外部头文件的cpp源文件需要同步,点…...

GruntJS-前端自动化任务运行器从入门到实战

Grunt 完全指南:从入门到实战 一、Grunt 是什么? Grunt是一个基于 Node.js 的前端自动化任务运行器,主要用于自动化执行项目开发中重复性高的任务,例如文件压缩、代码编译、语法检查、单元测试、文件合并等。通过配置简洁的任务…...

Mysql8 忘记密码重置,以及问题解决

1.使用免密登录 找到配置MySQL文件,我的文件路径是/etc/mysql/my.cnf,有的人的是/etc/mysql/mysql.cnf 在里最后加入 skip-grant-tables重启MySQL服务 service mysql restartShutting down MySQL… SUCCESS! Starting MySQL… SUCCESS! 重启成功 2.登…...

在Mathematica中实现Newton-Raphson迭代的收敛时间算法(一般三次多项式)

考察一般的三次多项式,以r为参数: p[z_, r_] : z^3 (r - 1) z - r; roots[r_] : z /. Solve[p[z, r] 0, z]; 此多项式的根为: 尽管看起来这个多项式是特殊的,其实一般的三次多项式都是可以通过线性变换化为这个形式…...