0基础学习PyFlink——不可以用UDTAF装饰器装饰function的原因分析
在研究Flink的“用户自定义方法”(UserDefinedFunction)时,我们看到存在如下几种类型的装饰器:
- UDF:User Defined Scalar Function
- UDTF:User Defined Table Function
- UDAF:User Defined Aggregate Function
- UDTAF:User Defined Table Aggregate Function
在很多案例中,我们看到udf、udtf和udaf几个装饰器修饰function
@udf(result_type=DataTypes.BIGINT())
def add(i, j):return i + j@udtf(result_types=[DataTypes.BIGINT(), DataTypes.BIGINT()])
def range_emit(s, e):for i in range(e):yield s, i@udaf(result_type=DataTypes.FLOAT(), func_type="pandas")
def mean_udaf(v):return v.mean()
但是没有见到udtaf修饰function的案例,比如
# 错误的
@udtaf(result_type=DataTypes.ROW([DataTypes.FIELD("word", DataTypes.STRING()) , DataTypes.FIELD("count", DataTypes.BIGINT())]), accumulator_type=DataTypes.ROW([DataTypes.FIELD("word", DataTypes.STRING())]), func_type="general")
def lower(line):yield Row('a', 1)
这是因为这儿存在一个悖论
udtaf要求func_type必须是general
def udtaf(f: Union[Callable, TableAggregateFunction, Type] = None,input_types: Union[List[DataType], DataType, str, List[str]] = None,result_type: Union[DataType, str] = None,accumulator_type: Union[DataType, str] = None,deterministic: bool = None, name: str = None,func_type: str = 'general') -> Union[UserDefinedAggregateFunctionWrapper, Callable]:"""Helper method for creating a user-defined table aggregate function.:param f: user-defined table aggregate function.:param input_types: optional, the input data types.:param result_type: the result data type.:param accumulator_type: optional, the accumulator data type.:param deterministic: the determinism of the function's results. True if and only if a call tothis function is guaranteed to always return the same result given thesame parameters. (default True):param name: the function name.:param func_type: the type of the python function, available value: general(default: general):return: UserDefinedAggregateFunctionWrapper or function... versionadded:: 1.13.0"""if func_type != 'general':raise ValueError("The func_type must be 'general', got %s."% func_type)if f is None:return functools.partial(_create_udtaf, input_types=input_types, result_type=result_type,accumulator_type=accumulator_type, func_type=func_type,deterministic=deterministic, name=name)else:return _create_udtaf(f, input_types, result_type, accumulator_type, func_type,deterministic, name)
如果func_type不是’general’,则会抛出错误,所以func_type="pandas"是不可以的。
udtaf修饰方法后的返回类型是UserDefinedAggregateFunctionWrapper。
def _create_udtaf(f, input_types, result_type, accumulator_type, func_type, deterministic, name):return UserDefinedAggregateFunctionWrapper(f, input_types, result_type, accumulator_type, func_type, deterministic, name, True)
delegate function要求非func_type必须是pandas
Table API下只有这些方法接受udtaf修饰function返回的UserDefinedAggregateFunctionWrapper。
- def aggregate(self, func: Union[Expression, UserDefinedAggregateFunctionWrapper]) -> ‘AggregatedTable’
- def flat_aggregate(self, func: Union[Expression, UserDefinedAggregateFunctionWrapper]) -> ‘FlatAggregateTable’
这些方法的在底层会调用被修饰的UserDefinedFunctionWrapper。
def aggregate(self, func: Union[Expression, UserDefinedAggregateFunctionWrapper]) \-> 'AggregatedTable':"""Performs a global aggregate operation with an aggregate function. You have to close theaggregate with a select statement... versionadded:: 1.13.0"""if isinstance(func, Expression):return AggregatedTable(self._j_table.aggregate(func._j_expr), self._t_env)else:func._set_takes_row_as_input()if hasattr(func, "_alias_names"):alias_names = getattr(func, "_alias_names")func = func(with_columns(col("*"))).alias(*alias_names)else:func = func(with_columns(col("*")))return AggregatedTable(self._j_table.aggregate(func._j_expr), self._t_env)
进而会调用到_java_user_defined_function。由于udtaf修饰的方法不是UserDefinedFunction对象,而是一个function,所以它会通过_create_delegate_function创建新的func 。
class UserDefinedFunctionWrapper(object):
……def _java_user_defined_function(self):……if not isinstance(self._func, UserDefinedFunction):func = self._create_delegate_function()……
而_create_delegate_function则要求udtaf中的function的func_type必须是pandas
def _create_delegate_function(self) -> UserDefinedFunction:assert self._func_type == 'pandas'return DelegatingPandasAggregateFunction(self._func)
这就和之前udtaf中要求func_type必须是general相背。
所以我们没看到udtaf修饰function的案例。
相关文章:
0基础学习PyFlink——不可以用UDTAF装饰器装饰function的原因分析
在研究Flink的“用户自定义方法”(UserDefinedFunction)时,我们看到存在如下几种类型的装饰器: UDF:User Defined Scalar FunctionUDTF:User Defined Table FunctionUDAF:User Defined Aggrega…...
Spring Boot Endpoints:端点
Spring Boot 内置端点以及暴露端点列表: 端点被启用后,并不一定能够被访问,还要看端点是否被暴露,并且暴露的方式是怎样的。因为端点可能会包含敏感信息,所以需要谨慎暴露相关端点。Spring Boot 3.0.0 更改了默认暴露…...

漏洞复现--用友 畅捷通T+ .net反序列化RCE
免责声明: 文章中涉及的漏洞均已修复,敏感信息均已做打码处理,文章仅做经验分享用途,切勿当真,未授权的攻击属于非法行为!文章中敏感信息均已做多层打马处理。传播、利用本文章所提供的信息而造成的任何直…...
PHP 共享茶室棋牌室无人软硬件结合开发小程序系统的开发优势
随着科技的发展和人们生活方式的改变,共享经济和智能化成为了越来越受欢迎的趋势。在这样一个背景下,PHP共享茶室棋牌室无人软硬件结合开发小程序系统的出现,为人们提供了一种全新的娱乐和生活方式。本文将详细介绍PHP共享茶室棋牌室无人软硬…...

kibana监控
采取方式 Elastic Agent :更完善的功能 Metricbeat:轻量级指标收集(采用) 传统收集方法:使用内部导出器收集指标,已不建议 安装 metricbeat Download Metricbeat • Ship Metrics to Elasticsearch | E…...

基于 ARM+FPGA+AD平台的多类型同步信号采集仪开发及试验验证(二)板卡总体设计
2.2 板卡总体设计 本章开发了一款基于 AD7193RJ45 的多类型传感信号同步调理板卡,如图 2.4 所 示,负责将传感器传来的模拟电信号转化为数字信号,以供数据采集系统采集,实现了 单通道自由切换传感信号类型与同步采集多类型传…...

uniapp: 本应用使用HBuilderX x.x.xx 或对应的cli版本编译,而手机端SDK版本是 x.x.xx。不匹配的版本可能造成应用异常。
文章目录 前言一、原因分析二、解决方案2.1、方案一:更新HbuilderX版本2.2、方案二:设置固定的版本2.3、方案三:忽略版本(不推荐) 三、总结四、感谢 前言 项目场景:示例:通过使用HbuilderX打包…...

sqoop和flume简单安装配置使用
1. Sqoop 1.1 Sqoop介绍 Sqoop 是一个在结构化数据和 Hadoop 之间进行批量数据迁移的工具 结构化数据可以是MySQL、Oracle等关系型数据库 把关系型数据库的数据导入到 Hadoop 与其相关的系统 把数据从 Hadoop 系统里抽取并导出到关系型数据库里 底层用 MapReduce 实现数据 …...

什么是React Router?它的作用是什么?
聚沙成塔每天进步一点点 ⭐ 专栏简介 前端入门之旅:探索Web开发的奇妙世界 欢迎来到前端入门之旅!感兴趣的可以订阅本专栏哦!这个专栏是为那些对Web开发感兴趣、刚刚踏入前端领域的朋友们量身打造的。无论你是完全的新手还是有一些基础的开发…...

界面控件DevExtreme v23.1 - UI组件 UI模板库增强
DevExtreme拥有高性能的HTML5 / JavaScript小部件集合,使您可以利用现代Web开发堆栈(包括React,Angular,ASP.NET Core,jQuery,Knockout等)构建交互式的Web应用程序。从Angular和Reac,…...
Fedora Linux 38下Mariadb数据库设置utf8mb4字符编码
Fedora操作系统之下最好使用开源免费的MySQL替代品Mariadb来学习MySQL的知识,一点也不会耽搁。 连接上互联网后,打开shell命令行界面,Sudo dnf install mariadb-server mariadb -y就可以安装好 mariadb-server和 mariadb࿰…...

【单元测试】--高级主题
一、模拟与存根深入 在单元测试中,模拟(Mock)和存根(Stub)是两种常用的测试替代品,用于模拟外部依赖或模拟特定行为,以便测试能够独立运行。以下是深入了解模拟与存根的概念,以NUni…...
面向对象程序设计(2023年10月)
设计模式:参考下文 23 种设计模式详解(全23种)-CSDN博客...
常用正在表达式
function isIP(s) //by zergling { var patrn/^[0-9.]{1,20}$/; if (!patrn.exec(s)) return false return true } //校验密码:只能输入6-20个字母、数字、下划线 function isPasswd(s) { var patrn/^(\w){6,20}$/; if (!patrn.exec(s)) return false return true …...

ES6初步了解Map对象(含十种方法)
ES6提供了 Map数据结构。它类似于对象,也是键值对的集合。但是“键”的范围不限于字符串,各种类型的值(包括对象)都可以当作键。 创建方法 let m new Map()console.log(m)Map的方法 1.set( ) 添加元素 接收两个参数,…...
推荐一款可以识别m3u8格式ts流批量下载并且合成mp4视频的chrome插件——猫抓
https://chrome.google.com/webstore/detail/%E7%8C%AB%E6%8A%93/jfedfbgedapdagkghmgibemcoggfppbb?utm_sourceext_app_menuhttps://chrome.google.com/webstore/detail/%E7%8C%AB%E6%8A%93/jfedfbgedapdagkghmgibemcoggfppbb?utm_sourceext_app_menu 网页媒体嗅探工具 一…...
文本处理方法及其在NLP中的应用
文本处理方法及其在NLP中的应用 了解 在自然语言处理(NLP)领域,文本处理是一个至关重要的环节。 本篇博文将介绍几种常用的文本处理方法,并重点讨论了其中两种:One-Hot编码和停用词过滤。这些方法对于将文本转化为计…...
html文字一行时靠右,多行时靠左
html文字一行时靠右,多行时靠左 元素居中 display: block; margin: auto; 文字居中 text-align: center; 文字下划线 text-decoration: underline; 边框线 border: 1px #1D6AF8 double; 圆弧角 border-radius: 10px; <!DOCTYPE html> <html><hea…...

Stable-diffusion-webui
AI 画图,之前整理的 AI换脸 CSDN不给通过,说是换脸之类的不给通过,只能自己看了。 GitHub:https://github.com/AUTOMATIC1111/stable-diffusion-webuihttps://github.com/AUTOMATIC1111/stable-diffusion-webui 安装完毕跑起来大概…...
Python中的文件操作和异常处理
在Python编程中,文件操作和异常处理是非常重要的概念。本文将介绍如何使用Python进行文件读写操作,并展示如何处理可能出现的错误和异常情况。 文件读写操作 Python提供了简单而强大的文件读写功能,让我们能够轻松地处理各种文件类型。下面…...

网络编程(Modbus进阶)
思维导图 Modbus RTU(先学一点理论) 概念 Modbus RTU 是工业自动化领域 最广泛应用的串行通信协议,由 Modicon 公司(现施耐德电气)于 1979 年推出。它以 高效率、强健性、易实现的特点成为工业控制系统的通信标准。 包…...

AI Agent与Agentic AI:原理、应用、挑战与未来展望
文章目录 一、引言二、AI Agent与Agentic AI的兴起2.1 技术契机与生态成熟2.2 Agent的定义与特征2.3 Agent的发展历程 三、AI Agent的核心技术栈解密3.1 感知模块代码示例:使用Python和OpenCV进行图像识别 3.2 认知与决策模块代码示例:使用OpenAI GPT-3进…...
pam_env.so模块配置解析
在PAM(Pluggable Authentication Modules)配置中, /etc/pam.d/su 文件相关配置含义如下: 配置解析 auth required pam_env.so1. 字段分解 字段值说明模块类型auth认证类模块,负责验证用户身份&am…...

Cinnamon修改面板小工具图标
Cinnamon开始菜单-CSDN博客 设置模块都是做好的,比GNOME简单得多! 在 applet.js 里增加 const Settings imports.ui.settings;this.settings new Settings.AppletSettings(this, HTYMenusonichy, instance_id); this.settings.bind(menu-icon, menu…...

2025盘古石杯决赛【手机取证】
前言 第三届盘古石杯国际电子数据取证大赛决赛 最后一题没有解出来,实在找不到,希望有大佬教一下我。 还有就会议时间,我感觉不是图片时间,因为在电脑看到是其他时间用老会议系统开的会。 手机取证 1、分析鸿蒙手机检材&#x…...
CRMEB 框架中 PHP 上传扩展开发:涵盖本地上传及阿里云 OSS、腾讯云 COS、七牛云
目前已有本地上传、阿里云OSS上传、腾讯云COS上传、七牛云上传扩展 扩展入口文件 文件目录 crmeb\services\upload\Upload.php namespace crmeb\services\upload;use crmeb\basic\BaseManager; use think\facade\Config;/*** Class Upload* package crmeb\services\upload* …...
LangChain知识库管理后端接口:数据库操作详解—— 构建本地知识库系统的基础《二》
这段 Python 代码是一个完整的 知识库数据库操作模块,用于对本地知识库系统中的知识库进行增删改查(CRUD)操作。它基于 SQLAlchemy ORM 框架 和一个自定义的装饰器 with_session 实现数据库会话管理。 📘 一、整体功能概述 该模块…...
SQL慢可能是触发了ring buffer
简介 最近在进行 postgresql 性能排查的时候,发现 PG 在某一个时间并行执行的 SQL 变得特别慢。最后通过监控监观察到并行发起得时间 buffers_alloc 就急速上升,且低水位伴随在整个慢 SQL,一直是 buferIO 的等待事件,此时也没有其他会话的争抢。SQL 虽然不是高效 SQL ,但…...

mac 安装homebrew (nvm 及git)
mac 安装nvm 及git 万恶之源 mac 安装这些东西离不开Xcode。及homebrew 一、先说安装git步骤 通用: 方法一:使用 Homebrew 安装 Git(推荐) 步骤如下:打开终端(Terminal.app) 1.安装 Homebrew…...
Python Einops库:深度学习中的张量操作革命
Einops(爱因斯坦操作库)就像给张量操作戴上了一副"语义眼镜"——让你用人类能理解的方式告诉计算机如何操作多维数组。这个基于爱因斯坦求和约定的库,用类似自然语言的表达式替代了晦涩的API调用,彻底改变了深度学习工程…...