当前位置: 首页 > news >正文

0基础学习PyFlink——不可以用UDTAF装饰器装饰function的原因分析

在研究Flink的“用户自定义方法”(UserDefinedFunction)时,我们看到存在如下几种类型的装饰器:

  1. UDF:User Defined Scalar Function
  2. UDTF:User Defined Table Function
  3. UDAF:User Defined Aggregate Function
  4. UDTAF:User Defined Table Aggregate Function

在很多案例中,我们看到udf、udtf和udaf几个装饰器修饰function

@udf(result_type=DataTypes.BIGINT())
def add(i, j):return i + j@udtf(result_types=[DataTypes.BIGINT(), DataTypes.BIGINT()])
def range_emit(s, e):for i in range(e):yield s, i@udaf(result_type=DataTypes.FLOAT(), func_type="pandas")
def mean_udaf(v):return v.mean()

但是没有见到udtaf修饰function的案例,比如

# 错误的
@udtaf(result_type=DataTypes.ROW([DataTypes.FIELD("word", DataTypes.STRING()) , DataTypes.FIELD("count", DataTypes.BIGINT())]), accumulator_type=DataTypes.ROW([DataTypes.FIELD("word", DataTypes.STRING())]), func_type="general")
def lower(line):yield Row('a', 1)

这是因为这儿存在一个悖论

udtaf要求func_type必须是general

def udtaf(f: Union[Callable, TableAggregateFunction, Type] = None,input_types: Union[List[DataType], DataType, str, List[str]] = None,result_type: Union[DataType, str] = None,accumulator_type: Union[DataType, str] = None,deterministic: bool = None, name: str = None,func_type: str = 'general') -> Union[UserDefinedAggregateFunctionWrapper, Callable]:"""Helper method for creating a user-defined table aggregate function.:param f: user-defined table aggregate function.:param input_types: optional, the input data types.:param result_type: the result data type.:param accumulator_type: optional, the accumulator data type.:param deterministic: the determinism of the function's results. True if and only if a call tothis function is guaranteed to always return the same result given thesame parameters. (default True):param name: the function name.:param func_type: the type of the python function, available value: general(default: general):return: UserDefinedAggregateFunctionWrapper or function... versionadded:: 1.13.0"""if func_type != 'general':raise ValueError("The func_type must be 'general', got %s."% func_type)if f is None:return functools.partial(_create_udtaf, input_types=input_types, result_type=result_type,accumulator_type=accumulator_type, func_type=func_type,deterministic=deterministic, name=name)else:return _create_udtaf(f, input_types, result_type, accumulator_type, func_type,deterministic, name)

如果func_type不是’general’,则会抛出错误,所以func_type="pandas"是不可以的。
udtaf修饰方法后的返回类型是UserDefinedAggregateFunctionWrapper。

def _create_udtaf(f, input_types, result_type, accumulator_type, func_type, deterministic, name):return UserDefinedAggregateFunctionWrapper(f, input_types, result_type, accumulator_type, func_type, deterministic, name, True)

delegate function要求非func_type必须是pandas

Table API下只有这些方法接受udtaf修饰function返回的UserDefinedAggregateFunctionWrapper。

  • def aggregate(self, func: Union[Expression, UserDefinedAggregateFunctionWrapper]) -> ‘AggregatedTable’
  • def flat_aggregate(self, func: Union[Expression, UserDefinedAggregateFunctionWrapper]) -> ‘FlatAggregateTable’

这些方法的在底层会调用被修饰的UserDefinedFunctionWrapper。

    def aggregate(self, func: Union[Expression, UserDefinedAggregateFunctionWrapper]) \-> 'AggregatedTable':"""Performs a global aggregate operation with an aggregate function. You have to close theaggregate with a select statement... versionadded:: 1.13.0"""if isinstance(func, Expression):return AggregatedTable(self._j_table.aggregate(func._j_expr), self._t_env)else:func._set_takes_row_as_input()if hasattr(func, "_alias_names"):alias_names = getattr(func, "_alias_names")func = func(with_columns(col("*"))).alias(*alias_names)else:func = func(with_columns(col("*")))return AggregatedTable(self._j_table.aggregate(func._j_expr), self._t_env)

进而会调用到_java_user_defined_function。由于udtaf修饰的方法不是UserDefinedFunction对象,而是一个function,所以它会通过_create_delegate_function创建新的func 。

class UserDefinedFunctionWrapper(object):
……def _java_user_defined_function(self):……if not isinstance(self._func, UserDefinedFunction):func = self._create_delegate_function()……

而_create_delegate_function则要求udtaf中的function的func_type必须是pandas

    def _create_delegate_function(self) -> UserDefinedFunction:assert self._func_type == 'pandas'return DelegatingPandasAggregateFunction(self._func)

这就和之前udtaf中要求func_type必须是general相背。
所以我们没看到udtaf修饰function的案例。

相关文章:

0基础学习PyFlink——不可以用UDTAF装饰器装饰function的原因分析

在研究Flink的“用户自定义方法”(UserDefinedFunction)时,我们看到存在如下几种类型的装饰器: UDF:User Defined Scalar FunctionUDTF:User Defined Table FunctionUDAF:User Defined Aggrega…...

Spring Boot Endpoints:端点

Spring Boot 内置端点以及暴露端点列表: 端点被启用后,并不一定能够被访问,还要看端点是否被暴露,并且暴露的方式是怎样的。因为端点可能会包含敏感信息,所以需要谨慎暴露相关端点。Spring Boot 3.0.0 更改了默认暴露…...

漏洞复现--用友 畅捷通T+ .net反序列化RCE

免责声明: 文章中涉及的漏洞均已修复,敏感信息均已做打码处理,文章仅做经验分享用途,切勿当真,未授权的攻击属于非法行为!文章中敏感信息均已做多层打马处理。传播、利用本文章所提供的信息而造成的任何直…...

PHP 共享茶室棋牌室无人软硬件结合开发小程序系统的开发优势

随着科技的发展和人们生活方式的改变,共享经济和智能化成为了越来越受欢迎的趋势。在这样一个背景下,PHP共享茶室棋牌室无人软硬件结合开发小程序系统的出现,为人们提供了一种全新的娱乐和生活方式。本文将详细介绍PHP共享茶室棋牌室无人软硬…...

kibana监控

采取方式 Elastic Agent :更完善的功能 Metricbeat:轻量级指标收集(采用) 传统收集方法:使用内部导出器收集指标,已不建议 安装 metricbeat Download Metricbeat • Ship Metrics to Elasticsearch | E…...

基于 ARM+FPGA+AD平台的多类型同步信号采集仪开发及试验验证(二)板卡总体设计

2.2 板卡总体设计 本章开发了一款基于 AD7193RJ45 的多类型传感信号同步调理板卡,如图 2.4 所 示,负责将传感器传来的模拟电信号转化为数字信号,以供数据采集系统采集,实现了 单通道自由切换传感信号类型与同步采集多类型传…...

uniapp: 本应用使用HBuilderX x.x.xx 或对应的cli版本编译,而手机端SDK版本是 x.x.xx。不匹配的版本可能造成应用异常。

文章目录 前言一、原因分析二、解决方案2.1、方案一:更新HbuilderX版本2.2、方案二:设置固定的版本2.3、方案三:忽略版本(不推荐) 三、总结四、感谢 前言 项目场景:示例:通过使用HbuilderX打包…...

sqoop和flume简单安装配置使用

1. Sqoop 1.1 Sqoop介绍 Sqoop 是一个在结构化数据和 Hadoop 之间进行批量数据迁移的工具 结构化数据可以是MySQL、Oracle等关系型数据库 把关系型数据库的数据导入到 Hadoop 与其相关的系统 把数据从 Hadoop 系统里抽取并导出到关系型数据库里 底层用 MapReduce 实现数据 …...

什么是React Router?它的作用是什么?

聚沙成塔每天进步一点点 ⭐ 专栏简介 前端入门之旅:探索Web开发的奇妙世界 欢迎来到前端入门之旅!感兴趣的可以订阅本专栏哦!这个专栏是为那些对Web开发感兴趣、刚刚踏入前端领域的朋友们量身打造的。无论你是完全的新手还是有一些基础的开发…...

界面控件DevExtreme v23.1 - UI组件 UI模板库增强

DevExtreme拥有高性能的HTML5 / JavaScript小部件集合,使您可以利用现代Web开发堆栈(包括React,Angular,ASP.NET Core,jQuery,Knockout等)构建交互式的Web应用程序。从Angular和Reac&#xff0c…...

Fedora Linux 38下Mariadb数据库设置utf8mb4字符编码

Fedora操作系统之下最好使用开源免费的MySQL替代品Mariadb来学习MySQL的知识,一点也不会耽搁。 连接上互联网后,打开shell命令行界面,Sudo dnf install mariadb-server mariadb -y就可以安装好 mariadb-server和 mariadb&#xff0…...

【单元测试】--高级主题

一、模拟与存根深入 在单元测试中,模拟(Mock)和存根(Stub)是两种常用的测试替代品,用于模拟外部依赖或模拟特定行为,以便测试能够独立运行。以下是深入了解模拟与存根的概念,以NUni…...

面向对象程序设计(2023年10月)

设计模式:参考下文 23 种设计模式详解(全23种)-CSDN博客...

常用正在表达式

function isIP(s) //by zergling { var patrn/^[0-9.]{1,20}$/; if (!patrn.exec(s)) return false return true } //校验密码:只能输入6-20个字母、数字、下划线 function isPasswd(s) { var patrn/^(\w){6,20}$/; if (!patrn.exec(s)) return false return true …...

ES6初步了解Map对象(含十种方法)

ES6提供了 Map数据结构。它类似于对象,也是键值对的集合。但是“键”的范围不限于字符串,各种类型的值(包括对象)都可以当作键。 创建方法 let m new Map()console.log(m)Map的方法 1.set( ) 添加元素 接收两个参数&#xff0c…...

推荐一款可以识别m3u8格式ts流批量下载并且合成mp4视频的chrome插件——猫抓

https://chrome.google.com/webstore/detail/%E7%8C%AB%E6%8A%93/jfedfbgedapdagkghmgibemcoggfppbb?utm_sourceext_app_menuhttps://chrome.google.com/webstore/detail/%E7%8C%AB%E6%8A%93/jfedfbgedapdagkghmgibemcoggfppbb?utm_sourceext_app_menu 网页媒体嗅探工具 一…...

文本处理方法及其在NLP中的应用

文本处理方法及其在NLP中的应用 了解 在自然语言处理(NLP)领域,文本处理是一个至关重要的环节。 本篇博文将介绍几种常用的文本处理方法,并重点讨论了其中两种:One-Hot编码和停用词过滤。这些方法对于将文本转化为计…...

html文字一行时靠右,多行时靠左

html文字一行时靠右&#xff0c;多行时靠左 元素居中 display: block; margin: auto; 文字居中 text-align: center; 文字下划线 text-decoration: underline; 边框线 border: 1px #1D6AF8 double; 圆弧角 border-radius: 10px; <!DOCTYPE html> <html><hea…...

Stable-diffusion-webui

AI 画图&#xff0c;之前整理的 AI换脸 CSDN不给通过&#xff0c;说是换脸之类的不给通过&#xff0c;只能自己看了。 GitHub&#xff1a;https://github.com/AUTOMATIC1111/stable-diffusion-webuihttps://github.com/AUTOMATIC1111/stable-diffusion-webui 安装完毕跑起来大概…...

Python中的文件操作和异常处理

在Python编程中&#xff0c;文件操作和异常处理是非常重要的概念。本文将介绍如何使用Python进行文件读写操作&#xff0c;并展示如何处理可能出现的错误和异常情况。 文件读写操作 Python提供了简单而强大的文件读写功能&#xff0c;让我们能够轻松地处理各种文件类型。下面…...

Python爬虫实战:研究MechanicalSoup库相关技术

一、MechanicalSoup 库概述 1.1 库简介 MechanicalSoup 是一个 Python 库,专为自动化交互网站而设计。它结合了 requests 的 HTTP 请求能力和 BeautifulSoup 的 HTML 解析能力,提供了直观的 API,让我们可以像人类用户一样浏览网页、填写表单和提交请求。 1.2 主要功能特点…...

Java 语言特性(面试系列2)

一、SQL 基础 1. 复杂查询 &#xff08;1&#xff09;连接查询&#xff08;JOIN&#xff09; 内连接&#xff08;INNER JOIN&#xff09;&#xff1a;返回两表匹配的记录。 SELECT e.name, d.dept_name FROM employees e INNER JOIN departments d ON e.dept_id d.dept_id; 左…...

【Python】 -- 趣味代码 - 小恐龙游戏

文章目录 文章目录 00 小恐龙游戏程序设计框架代码结构和功能游戏流程总结01 小恐龙游戏程序设计02 百度网盘地址00 小恐龙游戏程序设计框架 这段代码是一个基于 Pygame 的简易跑酷游戏的完整实现,玩家控制一个角色(龙)躲避障碍物(仙人掌和乌鸦)。以下是代码的详细介绍:…...

【力扣数据库知识手册笔记】索引

索引 索引的优缺点 优点1. 通过创建唯一性索引&#xff0c;可以保证数据库表中每一行数据的唯一性。2. 可以加快数据的检索速度&#xff08;创建索引的主要原因&#xff09;。3. 可以加速表和表之间的连接&#xff0c;实现数据的参考完整性。4. 可以在查询过程中&#xff0c;…...

Linux云原生安全:零信任架构与机密计算

Linux云原生安全&#xff1a;零信任架构与机密计算 构建坚不可摧的云原生防御体系 引言&#xff1a;云原生安全的范式革命 随着云原生技术的普及&#xff0c;安全边界正在从传统的网络边界向工作负载内部转移。Gartner预测&#xff0c;到2025年&#xff0c;零信任架构将成为超…...

NFT模式:数字资产确权与链游经济系统构建

NFT模式&#xff1a;数字资产确权与链游经济系统构建 ——从技术架构到可持续生态的范式革命 一、确权技术革新&#xff1a;构建可信数字资产基石 1. 区块链底层架构的进化 跨链互操作协议&#xff1a;基于LayerZero协议实现以太坊、Solana等公链资产互通&#xff0c;通过零知…...

深度学习习题2

1.如果增加神经网络的宽度&#xff0c;精确度会增加到一个特定阈值后&#xff0c;便开始降低。造成这一现象的可能原因是什么&#xff1f; A、即使增加卷积核的数量&#xff0c;只有少部分的核会被用作预测 B、当卷积核数量增加时&#xff0c;神经网络的预测能力会降低 C、当卷…...

Git常用命令完全指南:从入门到精通

Git常用命令完全指南&#xff1a;从入门到精通 一、基础配置命令 1. 用户信息配置 # 设置全局用户名 git config --global user.name "你的名字"# 设置全局邮箱 git config --global user.email "你的邮箱example.com"# 查看所有配置 git config --list…...

从 GreenPlum 到镜舟数据库:杭银消费金融湖仓一体转型实践

作者&#xff1a;吴岐诗&#xff0c;杭银消费金融大数据应用开发工程师 本文整理自杭银消费金融大数据应用开发工程师在StarRocks Summit Asia 2024的分享 引言&#xff1a;融合数据湖与数仓的创新之路 在数字金融时代&#xff0c;数据已成为金融机构的核心竞争力。杭银消费金…...

Caliper 配置文件解析:fisco-bcos.json

config.yaml 文件 config.yaml 是 Caliper 的主配置文件,通常包含以下内容: test:name: fisco-bcos-test # 测试名称description: Performance test of FISCO-BCOS # 测试描述workers:type: local # 工作进程类型number: 5 # 工作进程数量monitor:type: - docker- pro…...