0基础学习PyFlink——不可以用UDTAF装饰器装饰function的原因分析
在研究Flink的“用户自定义方法”(UserDefinedFunction)时,我们看到存在如下几种类型的装饰器:
- UDF:User Defined Scalar Function
- UDTF:User Defined Table Function
- UDAF:User Defined Aggregate Function
- UDTAF:User Defined Table Aggregate Function
在很多案例中,我们看到udf、udtf和udaf几个装饰器修饰function
@udf(result_type=DataTypes.BIGINT())
def add(i, j):return i + j@udtf(result_types=[DataTypes.BIGINT(), DataTypes.BIGINT()])
def range_emit(s, e):for i in range(e):yield s, i@udaf(result_type=DataTypes.FLOAT(), func_type="pandas")
def mean_udaf(v):return v.mean()
但是没有见到udtaf修饰function的案例,比如
# 错误的
@udtaf(result_type=DataTypes.ROW([DataTypes.FIELD("word", DataTypes.STRING()) , DataTypes.FIELD("count", DataTypes.BIGINT())]), accumulator_type=DataTypes.ROW([DataTypes.FIELD("word", DataTypes.STRING())]), func_type="general")
def lower(line):yield Row('a', 1)
这是因为这儿存在一个悖论
udtaf要求func_type必须是general
def udtaf(f: Union[Callable, TableAggregateFunction, Type] = None,input_types: Union[List[DataType], DataType, str, List[str]] = None,result_type: Union[DataType, str] = None,accumulator_type: Union[DataType, str] = None,deterministic: bool = None, name: str = None,func_type: str = 'general') -> Union[UserDefinedAggregateFunctionWrapper, Callable]:"""Helper method for creating a user-defined table aggregate function.:param f: user-defined table aggregate function.:param input_types: optional, the input data types.:param result_type: the result data type.:param accumulator_type: optional, the accumulator data type.:param deterministic: the determinism of the function's results. True if and only if a call tothis function is guaranteed to always return the same result given thesame parameters. (default True):param name: the function name.:param func_type: the type of the python function, available value: general(default: general):return: UserDefinedAggregateFunctionWrapper or function... versionadded:: 1.13.0"""if func_type != 'general':raise ValueError("The func_type must be 'general', got %s."% func_type)if f is None:return functools.partial(_create_udtaf, input_types=input_types, result_type=result_type,accumulator_type=accumulator_type, func_type=func_type,deterministic=deterministic, name=name)else:return _create_udtaf(f, input_types, result_type, accumulator_type, func_type,deterministic, name)
如果func_type不是’general’,则会抛出错误,所以func_type="pandas"是不可以的。
udtaf修饰方法后的返回类型是UserDefinedAggregateFunctionWrapper。
def _create_udtaf(f, input_types, result_type, accumulator_type, func_type, deterministic, name):return UserDefinedAggregateFunctionWrapper(f, input_types, result_type, accumulator_type, func_type, deterministic, name, True)
delegate function要求非func_type必须是pandas
Table API下只有这些方法接受udtaf修饰function返回的UserDefinedAggregateFunctionWrapper。
- def aggregate(self, func: Union[Expression, UserDefinedAggregateFunctionWrapper]) -> ‘AggregatedTable’
- def flat_aggregate(self, func: Union[Expression, UserDefinedAggregateFunctionWrapper]) -> ‘FlatAggregateTable’
这些方法的在底层会调用被修饰的UserDefinedFunctionWrapper。
def aggregate(self, func: Union[Expression, UserDefinedAggregateFunctionWrapper]) \-> 'AggregatedTable':"""Performs a global aggregate operation with an aggregate function. You have to close theaggregate with a select statement... versionadded:: 1.13.0"""if isinstance(func, Expression):return AggregatedTable(self._j_table.aggregate(func._j_expr), self._t_env)else:func._set_takes_row_as_input()if hasattr(func, "_alias_names"):alias_names = getattr(func, "_alias_names")func = func(with_columns(col("*"))).alias(*alias_names)else:func = func(with_columns(col("*")))return AggregatedTable(self._j_table.aggregate(func._j_expr), self._t_env)
进而会调用到_java_user_defined_function。由于udtaf修饰的方法不是UserDefinedFunction对象,而是一个function,所以它会通过_create_delegate_function创建新的func 。
class UserDefinedFunctionWrapper(object):
……def _java_user_defined_function(self):……if not isinstance(self._func, UserDefinedFunction):func = self._create_delegate_function()……
而_create_delegate_function则要求udtaf中的function的func_type必须是pandas
def _create_delegate_function(self) -> UserDefinedFunction:assert self._func_type == 'pandas'return DelegatingPandasAggregateFunction(self._func)
这就和之前udtaf中要求func_type必须是general相背。
所以我们没看到udtaf修饰function的案例。
相关文章:
0基础学习PyFlink——不可以用UDTAF装饰器装饰function的原因分析
在研究Flink的“用户自定义方法”(UserDefinedFunction)时,我们看到存在如下几种类型的装饰器: UDF:User Defined Scalar FunctionUDTF:User Defined Table FunctionUDAF:User Defined Aggrega…...
Spring Boot Endpoints:端点
Spring Boot 内置端点以及暴露端点列表: 端点被启用后,并不一定能够被访问,还要看端点是否被暴露,并且暴露的方式是怎样的。因为端点可能会包含敏感信息,所以需要谨慎暴露相关端点。Spring Boot 3.0.0 更改了默认暴露…...
漏洞复现--用友 畅捷通T+ .net反序列化RCE
免责声明: 文章中涉及的漏洞均已修复,敏感信息均已做打码处理,文章仅做经验分享用途,切勿当真,未授权的攻击属于非法行为!文章中敏感信息均已做多层打马处理。传播、利用本文章所提供的信息而造成的任何直…...
PHP 共享茶室棋牌室无人软硬件结合开发小程序系统的开发优势
随着科技的发展和人们生活方式的改变,共享经济和智能化成为了越来越受欢迎的趋势。在这样一个背景下,PHP共享茶室棋牌室无人软硬件结合开发小程序系统的出现,为人们提供了一种全新的娱乐和生活方式。本文将详细介绍PHP共享茶室棋牌室无人软硬…...
kibana监控
采取方式 Elastic Agent :更完善的功能 Metricbeat:轻量级指标收集(采用) 传统收集方法:使用内部导出器收集指标,已不建议 安装 metricbeat Download Metricbeat • Ship Metrics to Elasticsearch | E…...
基于 ARM+FPGA+AD平台的多类型同步信号采集仪开发及试验验证(二)板卡总体设计
2.2 板卡总体设计 本章开发了一款基于 AD7193RJ45 的多类型传感信号同步调理板卡,如图 2.4 所 示,负责将传感器传来的模拟电信号转化为数字信号,以供数据采集系统采集,实现了 单通道自由切换传感信号类型与同步采集多类型传…...
uniapp: 本应用使用HBuilderX x.x.xx 或对应的cli版本编译,而手机端SDK版本是 x.x.xx。不匹配的版本可能造成应用异常。
文章目录 前言一、原因分析二、解决方案2.1、方案一:更新HbuilderX版本2.2、方案二:设置固定的版本2.3、方案三:忽略版本(不推荐) 三、总结四、感谢 前言 项目场景:示例:通过使用HbuilderX打包…...
sqoop和flume简单安装配置使用
1. Sqoop 1.1 Sqoop介绍 Sqoop 是一个在结构化数据和 Hadoop 之间进行批量数据迁移的工具 结构化数据可以是MySQL、Oracle等关系型数据库 把关系型数据库的数据导入到 Hadoop 与其相关的系统 把数据从 Hadoop 系统里抽取并导出到关系型数据库里 底层用 MapReduce 实现数据 …...
什么是React Router?它的作用是什么?
聚沙成塔每天进步一点点 ⭐ 专栏简介 前端入门之旅:探索Web开发的奇妙世界 欢迎来到前端入门之旅!感兴趣的可以订阅本专栏哦!这个专栏是为那些对Web开发感兴趣、刚刚踏入前端领域的朋友们量身打造的。无论你是完全的新手还是有一些基础的开发…...
界面控件DevExtreme v23.1 - UI组件 UI模板库增强
DevExtreme拥有高性能的HTML5 / JavaScript小部件集合,使您可以利用现代Web开发堆栈(包括React,Angular,ASP.NET Core,jQuery,Knockout等)构建交互式的Web应用程序。从Angular和Reac,…...
Fedora Linux 38下Mariadb数据库设置utf8mb4字符编码
Fedora操作系统之下最好使用开源免费的MySQL替代品Mariadb来学习MySQL的知识,一点也不会耽搁。 连接上互联网后,打开shell命令行界面,Sudo dnf install mariadb-server mariadb -y就可以安装好 mariadb-server和 mariadb࿰…...
【单元测试】--高级主题
一、模拟与存根深入 在单元测试中,模拟(Mock)和存根(Stub)是两种常用的测试替代品,用于模拟外部依赖或模拟特定行为,以便测试能够独立运行。以下是深入了解模拟与存根的概念,以NUni…...
面向对象程序设计(2023年10月)
设计模式:参考下文 23 种设计模式详解(全23种)-CSDN博客...
常用正在表达式
function isIP(s) //by zergling { var patrn/^[0-9.]{1,20}$/; if (!patrn.exec(s)) return false return true } //校验密码:只能输入6-20个字母、数字、下划线 function isPasswd(s) { var patrn/^(\w){6,20}$/; if (!patrn.exec(s)) return false return true …...
ES6初步了解Map对象(含十种方法)
ES6提供了 Map数据结构。它类似于对象,也是键值对的集合。但是“键”的范围不限于字符串,各种类型的值(包括对象)都可以当作键。 创建方法 let m new Map()console.log(m)Map的方法 1.set( ) 添加元素 接收两个参数,…...
推荐一款可以识别m3u8格式ts流批量下载并且合成mp4视频的chrome插件——猫抓
https://chrome.google.com/webstore/detail/%E7%8C%AB%E6%8A%93/jfedfbgedapdagkghmgibemcoggfppbb?utm_sourceext_app_menuhttps://chrome.google.com/webstore/detail/%E7%8C%AB%E6%8A%93/jfedfbgedapdagkghmgibemcoggfppbb?utm_sourceext_app_menu 网页媒体嗅探工具 一…...
文本处理方法及其在NLP中的应用
文本处理方法及其在NLP中的应用 了解 在自然语言处理(NLP)领域,文本处理是一个至关重要的环节。 本篇博文将介绍几种常用的文本处理方法,并重点讨论了其中两种:One-Hot编码和停用词过滤。这些方法对于将文本转化为计…...
html文字一行时靠右,多行时靠左
html文字一行时靠右,多行时靠左 元素居中 display: block; margin: auto; 文字居中 text-align: center; 文字下划线 text-decoration: underline; 边框线 border: 1px #1D6AF8 double; 圆弧角 border-radius: 10px; <!DOCTYPE html> <html><hea…...
Stable-diffusion-webui
AI 画图,之前整理的 AI换脸 CSDN不给通过,说是换脸之类的不给通过,只能自己看了。 GitHub:https://github.com/AUTOMATIC1111/stable-diffusion-webuihttps://github.com/AUTOMATIC1111/stable-diffusion-webui 安装完毕跑起来大概…...
Python中的文件操作和异常处理
在Python编程中,文件操作和异常处理是非常重要的概念。本文将介绍如何使用Python进行文件读写操作,并展示如何处理可能出现的错误和异常情况。 文件读写操作 Python提供了简单而强大的文件读写功能,让我们能够轻松地处理各种文件类型。下面…...
【网络】每天掌握一个Linux命令 - iftop
在Linux系统中,iftop是网络管理的得力助手,能实时监控网络流量、连接情况等,帮助排查网络异常。接下来从多方面详细介绍它。 目录 【网络】每天掌握一个Linux命令 - iftop工具概述安装方式核心功能基础用法进阶操作实战案例面试题场景生产场景…...
智慧工地云平台源码,基于微服务架构+Java+Spring Cloud +UniApp +MySql
智慧工地管理云平台系统,智慧工地全套源码,java版智慧工地源码,支持PC端、大屏端、移动端。 智慧工地聚焦建筑行业的市场需求,提供“平台网络终端”的整体解决方案,提供劳务管理、视频管理、智能监测、绿色施工、安全管…...
大型活动交通拥堵治理的视觉算法应用
大型活动下智慧交通的视觉分析应用 一、背景与挑战 大型活动(如演唱会、马拉松赛事、高考中考等)期间,城市交通面临瞬时人流车流激增、传统摄像头模糊、交通拥堵识别滞后等问题。以演唱会为例,暖城商圈曾因观众集中离场导致周边…...
React Native在HarmonyOS 5.0阅读类应用开发中的实践
一、技术选型背景 随着HarmonyOS 5.0对Web兼容层的增强,React Native作为跨平台框架可通过重新编译ArkTS组件实现85%以上的代码复用率。阅读类应用具有UI复杂度低、数据流清晰的特点。 二、核心实现方案 1. 环境配置 (1)使用React Native…...
2025 后端自学UNIAPP【项目实战:旅游项目】6、我的收藏页面
代码框架视图 1、先添加一个获取收藏景点的列表请求 【在文件my_api.js文件中添加】 // 引入公共的请求封装 import http from ./my_http.js// 登录接口(适配服务端返回 Token) export const login async (code, avatar) > {const res await http…...
自然语言处理——循环神经网络
自然语言处理——循环神经网络 循环神经网络应用到基于机器学习的自然语言处理任务序列到类别同步的序列到序列模式异步的序列到序列模式 参数学习和长程依赖问题基于门控的循环神经网络门控循环单元(GRU)长短期记忆神经网络(LSTM)…...
Spring AI与Spring Modulith核心技术解析
Spring AI核心架构解析 Spring AI(https://spring.io/projects/spring-ai)作为Spring生态中的AI集成框架,其核心设计理念是通过模块化架构降低AI应用的开发复杂度。与Python生态中的LangChain/LlamaIndex等工具类似,但特别为多语…...
【碎碎念】宝可梦 Mesh GO : 基于MESH网络的口袋妖怪 宝可梦GO游戏自组网系统
目录 游戏说明《宝可梦 Mesh GO》 —— 局域宝可梦探索Pokmon GO 类游戏核心理念应用场景Mesh 特性 宝可梦玩法融合设计游戏构想要素1. 地图探索(基于物理空间 广播范围)2. 野生宝可梦生成与广播3. 对战系统4. 道具与通信5. 延伸玩法 安全性设计 技术选…...
鸿蒙DevEco Studio HarmonyOS 5跑酷小游戏实现指南
1. 项目概述 本跑酷小游戏基于鸿蒙HarmonyOS 5开发,使用DevEco Studio作为开发工具,采用Java语言实现,包含角色控制、障碍物生成和分数计算系统。 2. 项目结构 /src/main/java/com/example/runner/├── MainAbilitySlice.java // 主界…...
#Uniapp篇:chrome调试unapp适配
chrome调试设备----使用Android模拟机开发调试移动端页面 Chrome://inspect/#devices MuMu模拟器Edge浏览器:Android原生APP嵌入的H5页面元素定位 chrome://inspect/#devices uniapp单位适配 根路径下 postcss.config.js 需要装这些插件 “postcss”: “^8.5.…...
