0基础学习PyFlink——不可以用UDTAF装饰器装饰function的原因分析
在研究Flink的“用户自定义方法”(UserDefinedFunction)时,我们看到存在如下几种类型的装饰器:
- UDF:User Defined Scalar Function
- UDTF:User Defined Table Function
- UDAF:User Defined Aggregate Function
- UDTAF:User Defined Table Aggregate Function
在很多案例中,我们看到udf、udtf和udaf几个装饰器修饰function
@udf(result_type=DataTypes.BIGINT())
def add(i, j):return i + j@udtf(result_types=[DataTypes.BIGINT(), DataTypes.BIGINT()])
def range_emit(s, e):for i in range(e):yield s, i@udaf(result_type=DataTypes.FLOAT(), func_type="pandas")
def mean_udaf(v):return v.mean()
但是没有见到udtaf修饰function的案例,比如
# 错误的
@udtaf(result_type=DataTypes.ROW([DataTypes.FIELD("word", DataTypes.STRING()) , DataTypes.FIELD("count", DataTypes.BIGINT())]), accumulator_type=DataTypes.ROW([DataTypes.FIELD("word", DataTypes.STRING())]), func_type="general")
def lower(line):yield Row('a', 1)
这是因为这儿存在一个悖论
udtaf要求func_type必须是general
def udtaf(f: Union[Callable, TableAggregateFunction, Type] = None,input_types: Union[List[DataType], DataType, str, List[str]] = None,result_type: Union[DataType, str] = None,accumulator_type: Union[DataType, str] = None,deterministic: bool = None, name: str = None,func_type: str = 'general') -> Union[UserDefinedAggregateFunctionWrapper, Callable]:"""Helper method for creating a user-defined table aggregate function.:param f: user-defined table aggregate function.:param input_types: optional, the input data types.:param result_type: the result data type.:param accumulator_type: optional, the accumulator data type.:param deterministic: the determinism of the function's results. True if and only if a call tothis function is guaranteed to always return the same result given thesame parameters. (default True):param name: the function name.:param func_type: the type of the python function, available value: general(default: general):return: UserDefinedAggregateFunctionWrapper or function... versionadded:: 1.13.0"""if func_type != 'general':raise ValueError("The func_type must be 'general', got %s."% func_type)if f is None:return functools.partial(_create_udtaf, input_types=input_types, result_type=result_type,accumulator_type=accumulator_type, func_type=func_type,deterministic=deterministic, name=name)else:return _create_udtaf(f, input_types, result_type, accumulator_type, func_type,deterministic, name)
如果func_type不是’general’,则会抛出错误,所以func_type="pandas"是不可以的。
udtaf修饰方法后的返回类型是UserDefinedAggregateFunctionWrapper。
def _create_udtaf(f, input_types, result_type, accumulator_type, func_type, deterministic, name):return UserDefinedAggregateFunctionWrapper(f, input_types, result_type, accumulator_type, func_type, deterministic, name, True)
delegate function要求非func_type必须是pandas
Table API下只有这些方法接受udtaf修饰function返回的UserDefinedAggregateFunctionWrapper。
- def aggregate(self, func: Union[Expression, UserDefinedAggregateFunctionWrapper]) -> ‘AggregatedTable’
- def flat_aggregate(self, func: Union[Expression, UserDefinedAggregateFunctionWrapper]) -> ‘FlatAggregateTable’
这些方法的在底层会调用被修饰的UserDefinedFunctionWrapper。
def aggregate(self, func: Union[Expression, UserDefinedAggregateFunctionWrapper]) \-> 'AggregatedTable':"""Performs a global aggregate operation with an aggregate function. You have to close theaggregate with a select statement... versionadded:: 1.13.0"""if isinstance(func, Expression):return AggregatedTable(self._j_table.aggregate(func._j_expr), self._t_env)else:func._set_takes_row_as_input()if hasattr(func, "_alias_names"):alias_names = getattr(func, "_alias_names")func = func(with_columns(col("*"))).alias(*alias_names)else:func = func(with_columns(col("*")))return AggregatedTable(self._j_table.aggregate(func._j_expr), self._t_env)
进而会调用到_java_user_defined_function。由于udtaf修饰的方法不是UserDefinedFunction对象,而是一个function,所以它会通过_create_delegate_function创建新的func 。
class UserDefinedFunctionWrapper(object):
……def _java_user_defined_function(self):……if not isinstance(self._func, UserDefinedFunction):func = self._create_delegate_function()……
而_create_delegate_function则要求udtaf中的function的func_type必须是pandas
def _create_delegate_function(self) -> UserDefinedFunction:assert self._func_type == 'pandas'return DelegatingPandasAggregateFunction(self._func)
这就和之前udtaf中要求func_type必须是general相背。
所以我们没看到udtaf修饰function的案例。
相关文章:
0基础学习PyFlink——不可以用UDTAF装饰器装饰function的原因分析
在研究Flink的“用户自定义方法”(UserDefinedFunction)时,我们看到存在如下几种类型的装饰器: UDF:User Defined Scalar FunctionUDTF:User Defined Table FunctionUDAF:User Defined Aggrega…...
Spring Boot Endpoints:端点
Spring Boot 内置端点以及暴露端点列表: 端点被启用后,并不一定能够被访问,还要看端点是否被暴露,并且暴露的方式是怎样的。因为端点可能会包含敏感信息,所以需要谨慎暴露相关端点。Spring Boot 3.0.0 更改了默认暴露…...
漏洞复现--用友 畅捷通T+ .net反序列化RCE
免责声明: 文章中涉及的漏洞均已修复,敏感信息均已做打码处理,文章仅做经验分享用途,切勿当真,未授权的攻击属于非法行为!文章中敏感信息均已做多层打马处理。传播、利用本文章所提供的信息而造成的任何直…...
PHP 共享茶室棋牌室无人软硬件结合开发小程序系统的开发优势
随着科技的发展和人们生活方式的改变,共享经济和智能化成为了越来越受欢迎的趋势。在这样一个背景下,PHP共享茶室棋牌室无人软硬件结合开发小程序系统的出现,为人们提供了一种全新的娱乐和生活方式。本文将详细介绍PHP共享茶室棋牌室无人软硬…...
kibana监控
采取方式 Elastic Agent :更完善的功能 Metricbeat:轻量级指标收集(采用) 传统收集方法:使用内部导出器收集指标,已不建议 安装 metricbeat Download Metricbeat • Ship Metrics to Elasticsearch | E…...
基于 ARM+FPGA+AD平台的多类型同步信号采集仪开发及试验验证(二)板卡总体设计
2.2 板卡总体设计 本章开发了一款基于 AD7193RJ45 的多类型传感信号同步调理板卡,如图 2.4 所 示,负责将传感器传来的模拟电信号转化为数字信号,以供数据采集系统采集,实现了 单通道自由切换传感信号类型与同步采集多类型传…...
uniapp: 本应用使用HBuilderX x.x.xx 或对应的cli版本编译,而手机端SDK版本是 x.x.xx。不匹配的版本可能造成应用异常。
文章目录 前言一、原因分析二、解决方案2.1、方案一:更新HbuilderX版本2.2、方案二:设置固定的版本2.3、方案三:忽略版本(不推荐) 三、总结四、感谢 前言 项目场景:示例:通过使用HbuilderX打包…...
sqoop和flume简单安装配置使用
1. Sqoop 1.1 Sqoop介绍 Sqoop 是一个在结构化数据和 Hadoop 之间进行批量数据迁移的工具 结构化数据可以是MySQL、Oracle等关系型数据库 把关系型数据库的数据导入到 Hadoop 与其相关的系统 把数据从 Hadoop 系统里抽取并导出到关系型数据库里 底层用 MapReduce 实现数据 …...
什么是React Router?它的作用是什么?
聚沙成塔每天进步一点点 ⭐ 专栏简介 前端入门之旅:探索Web开发的奇妙世界 欢迎来到前端入门之旅!感兴趣的可以订阅本专栏哦!这个专栏是为那些对Web开发感兴趣、刚刚踏入前端领域的朋友们量身打造的。无论你是完全的新手还是有一些基础的开发…...
界面控件DevExtreme v23.1 - UI组件 UI模板库增强
DevExtreme拥有高性能的HTML5 / JavaScript小部件集合,使您可以利用现代Web开发堆栈(包括React,Angular,ASP.NET Core,jQuery,Knockout等)构建交互式的Web应用程序。从Angular和Reac,…...
Fedora Linux 38下Mariadb数据库设置utf8mb4字符编码
Fedora操作系统之下最好使用开源免费的MySQL替代品Mariadb来学习MySQL的知识,一点也不会耽搁。 连接上互联网后,打开shell命令行界面,Sudo dnf install mariadb-server mariadb -y就可以安装好 mariadb-server和 mariadb࿰…...
【单元测试】--高级主题
一、模拟与存根深入 在单元测试中,模拟(Mock)和存根(Stub)是两种常用的测试替代品,用于模拟外部依赖或模拟特定行为,以便测试能够独立运行。以下是深入了解模拟与存根的概念,以NUni…...
面向对象程序设计(2023年10月)
设计模式:参考下文 23 种设计模式详解(全23种)-CSDN博客...
常用正在表达式
function isIP(s) //by zergling { var patrn/^[0-9.]{1,20}$/; if (!patrn.exec(s)) return false return true } //校验密码:只能输入6-20个字母、数字、下划线 function isPasswd(s) { var patrn/^(\w){6,20}$/; if (!patrn.exec(s)) return false return true …...
ES6初步了解Map对象(含十种方法)
ES6提供了 Map数据结构。它类似于对象,也是键值对的集合。但是“键”的范围不限于字符串,各种类型的值(包括对象)都可以当作键。 创建方法 let m new Map()console.log(m)Map的方法 1.set( ) 添加元素 接收两个参数,…...
推荐一款可以识别m3u8格式ts流批量下载并且合成mp4视频的chrome插件——猫抓
https://chrome.google.com/webstore/detail/%E7%8C%AB%E6%8A%93/jfedfbgedapdagkghmgibemcoggfppbb?utm_sourceext_app_menuhttps://chrome.google.com/webstore/detail/%E7%8C%AB%E6%8A%93/jfedfbgedapdagkghmgibemcoggfppbb?utm_sourceext_app_menu 网页媒体嗅探工具 一…...
文本处理方法及其在NLP中的应用
文本处理方法及其在NLP中的应用 了解 在自然语言处理(NLP)领域,文本处理是一个至关重要的环节。 本篇博文将介绍几种常用的文本处理方法,并重点讨论了其中两种:One-Hot编码和停用词过滤。这些方法对于将文本转化为计…...
html文字一行时靠右,多行时靠左
html文字一行时靠右,多行时靠左 元素居中 display: block; margin: auto; 文字居中 text-align: center; 文字下划线 text-decoration: underline; 边框线 border: 1px #1D6AF8 double; 圆弧角 border-radius: 10px; <!DOCTYPE html> <html><hea…...
Stable-diffusion-webui
AI 画图,之前整理的 AI换脸 CSDN不给通过,说是换脸之类的不给通过,只能自己看了。 GitHub:https://github.com/AUTOMATIC1111/stable-diffusion-webuihttps://github.com/AUTOMATIC1111/stable-diffusion-webui 安装完毕跑起来大概…...
Python中的文件操作和异常处理
在Python编程中,文件操作和异常处理是非常重要的概念。本文将介绍如何使用Python进行文件读写操作,并展示如何处理可能出现的错误和异常情况。 文件读写操作 Python提供了简单而强大的文件读写功能,让我们能够轻松地处理各种文件类型。下面…...
Python爬虫实战:研究MechanicalSoup库相关技术
一、MechanicalSoup 库概述 1.1 库简介 MechanicalSoup 是一个 Python 库,专为自动化交互网站而设计。它结合了 requests 的 HTTP 请求能力和 BeautifulSoup 的 HTML 解析能力,提供了直观的 API,让我们可以像人类用户一样浏览网页、填写表单和提交请求。 1.2 主要功能特点…...
Java 语言特性(面试系列2)
一、SQL 基础 1. 复杂查询 (1)连接查询(JOIN) 内连接(INNER JOIN):返回两表匹配的记录。 SELECT e.name, d.dept_name FROM employees e INNER JOIN departments d ON e.dept_id d.dept_id; 左…...
【Python】 -- 趣味代码 - 小恐龙游戏
文章目录 文章目录 00 小恐龙游戏程序设计框架代码结构和功能游戏流程总结01 小恐龙游戏程序设计02 百度网盘地址00 小恐龙游戏程序设计框架 这段代码是一个基于 Pygame 的简易跑酷游戏的完整实现,玩家控制一个角色(龙)躲避障碍物(仙人掌和乌鸦)。以下是代码的详细介绍:…...
【力扣数据库知识手册笔记】索引
索引 索引的优缺点 优点1. 通过创建唯一性索引,可以保证数据库表中每一行数据的唯一性。2. 可以加快数据的检索速度(创建索引的主要原因)。3. 可以加速表和表之间的连接,实现数据的参考完整性。4. 可以在查询过程中,…...
Linux云原生安全:零信任架构与机密计算
Linux云原生安全:零信任架构与机密计算 构建坚不可摧的云原生防御体系 引言:云原生安全的范式革命 随着云原生技术的普及,安全边界正在从传统的网络边界向工作负载内部转移。Gartner预测,到2025年,零信任架构将成为超…...
NFT模式:数字资产确权与链游经济系统构建
NFT模式:数字资产确权与链游经济系统构建 ——从技术架构到可持续生态的范式革命 一、确权技术革新:构建可信数字资产基石 1. 区块链底层架构的进化 跨链互操作协议:基于LayerZero协议实现以太坊、Solana等公链资产互通,通过零知…...
深度学习习题2
1.如果增加神经网络的宽度,精确度会增加到一个特定阈值后,便开始降低。造成这一现象的可能原因是什么? A、即使增加卷积核的数量,只有少部分的核会被用作预测 B、当卷积核数量增加时,神经网络的预测能力会降低 C、当卷…...
Git常用命令完全指南:从入门到精通
Git常用命令完全指南:从入门到精通 一、基础配置命令 1. 用户信息配置 # 设置全局用户名 git config --global user.name "你的名字"# 设置全局邮箱 git config --global user.email "你的邮箱example.com"# 查看所有配置 git config --list…...
从 GreenPlum 到镜舟数据库:杭银消费金融湖仓一体转型实践
作者:吴岐诗,杭银消费金融大数据应用开发工程师 本文整理自杭银消费金融大数据应用开发工程师在StarRocks Summit Asia 2024的分享 引言:融合数据湖与数仓的创新之路 在数字金融时代,数据已成为金融机构的核心竞争力。杭银消费金…...
Caliper 配置文件解析:fisco-bcos.json
config.yaml 文件 config.yaml 是 Caliper 的主配置文件,通常包含以下内容: test:name: fisco-bcos-test # 测试名称description: Performance test of FISCO-BCOS # 测试描述workers:type: local # 工作进程类型number: 5 # 工作进程数量monitor:type: - docker- pro…...
