当前位置: 首页 > news >正文

Flink SQL 表值聚合函数(Table Aggregate Function)详解

使用场景: 表值聚合函数即 UDTAF,这个函数⽬前只能在 Table API 中使⽤,不能在 SQL API 中使⽤。

函数功能:

在 SQL 表达式中,如果想对数据先分组再进⾏聚合取值:

select max(xxx) from source_table group by key1, key2

上⾯ SQL 的 max 语义产出只有⼀条最终结果,如果想取聚合结果最⼤的 n 条数据,并且 n 条数据,每⼀条都要输出⼀次结果数据,上⾯的 SQL 就没有办法实现了。

所以 UDTAF 为了处理这种场景,可以⾃定义 怎么取 , 取多少条 最终的聚合结果,UDTAF 和 UDAF 是类似的。

在这里插入图片描述

案例场景: 有⼀个饮料表有 3 列,分别是 id、name 和 price,⼀共有 5 ⾏,需要找到价格最⾼的两个饮料,类似于 top2,表值聚合函数,需要遍历所有 5 ⾏数据,输出结果为 2 ⾏数据的⼀个表。

开发流程:

实现 TableAggregateFunction 接⼝,其中所有的⽅法必须是 public 的、⾮ static 的

必须实现以下⽅法:

Acc聚合中间结果 createAccumulator() : 为当前 Key 初始化⼀个空的 accumulator,存储了聚合的中间结果,⽐如在执⾏ max() 时会存储每⼀条中间结果的 max 值;

accumulate(Acc accumulator, Input输⼊参数) : 每⼀⾏数据,都会调⽤ accumulate() ⽅法更新 accumulator,⽅法对每⼀条输⼊数据执⾏,⽐如执⾏ max() 时,遍历每⼀条数据执⾏;这个⽅法必须声明为 public 和⾮ static 的,accumulate ⽅法可以重载,每个⽅法的参数类型可以不同,⽀持变⻓参数。

emitValue(Acc accumulator, Collector collector) 或者 emitUpdateWithRetract(Acc accumulator, RetractableCollector collector) :

当所有的数据处理完之后,调⽤ emit ⽅法来计算和输出最终结果,可以⾃定义输出多少条以及怎样输出结果。

对于 emitValue 以及 emitUpdateWithRetract 区别,以 TopN 举例,emitValue 每次都会发送所有的最⼤的 n 个值,⽽这在流式任务中会有性能问题,为提升性能,可以实现 emitUpdateWithRetract ⽅法,这个⽅法在 retract 模式下会增量输出结果,⽐如只在有数据更新时,做到撤回⽼数据,再发送新数据,⽽不需要每次都发出全量的最新数据。

如果同时定义了 emitUpdateWithRetract、emitValue ⽅法,那 emitUpdateWithRetract 会优先于 emitValue ⽅法被使⽤,因为引擎会认为 emitUpdateWithRetract 会更加⾼效,它的输出是增量的。

某些场景下必须实现:

  • retract(Acc accumulator, Input输⼊参数) : 回撤流的场景必须实现,在计算回撤数据时调⽤,如果没有实现则会直接报错。
  • merge(Acc accumulator, Iterable it) : 在批式聚合以及流式聚合中的 Session、Hop 窗⼝聚合场景必须实现,这个⽅法对优化也有帮助,例如,打开了两阶段聚合优化,需要 AggregateFunction 实现 merge ⽅法,从⽽在第⼀阶段先进⾏数据聚合。
  • resetAccumulator() : 在批式聚合中是必须实现的。

关于⼊参、出参数据类型:

默认情况下,⽤户的 Input输⼊参数( accumulate(Acc accumulator, Input输⼊参数) 的⼊参 Input输⼊参数 )、accumulator( Acc聚 合中间结果 createAccumulator() 的返回结果)、 Output输出参数 数据类型( emitValue(Acc acc,Collector<Output输出参数> out) 的 Output输出参数 )会被 Flink 反射获取,但对于accumulator 和 Output输出参数类型来说,Flink SQL 的类型推导在遇到复杂类型的时候可能会推导出错误的结果(注意: Input输⼊参数 因为是上游算⼦传⼊的,所以类型信息是确认的,不会出现推导错误的情况),⽐如那些⾮基本类型 POJO 的复杂类型,所以跟 ScalarFunction 和 TableFunction ⼀样, AggregateFunction 提供了TableAggregateFunction#getResultType() 和 TableAggregateFunction#getAccumulatorType() 来分别指定最终返回值类型和accumulator 的类型,两个函数的返回值类型都是 TypeInformation。

  • getResultType() : 即 emitValue(Acc acc, Collector<Output输出参数> out) 的输出结果数据类型;
  • getAccumulatorType() : 即 Acc聚合中间结果 createAccumulator() 的返回结果数据类型;

案例场景: Top2

定义⼀个 TableAggregateFunction 来计算给定列的最⼤的 2 个值

在 TableEnvironment 中注册函数

在 Table API 查询中使⽤函数(当前只在 Table API 中⽀持 TableAggregateFunction)

实现思路:

计算最⼤的 2 个值,accumulator 需要保存当前的最⼤的 2 个值,定义了类 Top2Accum 作为 accumulator,Flink 的 checkpoint 机制会⾃动保存 accumulator,在失败时进⾏恢复,来保证精确⼀次的语义。

Top2 表值聚合函数(TableAggregateFunction)的 accumulate() ⽅法有两个输⼊,第⼀个是 Top2Accum accumulator,另⼀个是⽤户定义的输⼊:输⼊的值 v,尽管 merge() ⽅法在⼤多数聚合类型中不是必须的,但在样例中提供了它的实现。并且定义了 getResultType() 和 getAccumulatorType() ⽅法。

代码案例:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TableAggregateFunction;
import org.apache.flink.util.Collector;/*** 输入数据:* a,1* a,2* a,3* * 输出结果:* res=>:1> +I[a, 1, 1]* res=>:1> -D[a, 1, 1]* res=>:1> +I[a, 2, 1]* res=>:1> +I[a, 1, 2]* res=>:1> -D[a, 2, 1]* res=>:1> -D[a, 1, 2]* res=>:1> +I[a, 3, 1]* res=>:1> +I[a, 2, 2]*/
public class TableAggregateFunctionTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);DataStreamSource<String> source = env.socketTextStream("localhost", 8888);SingleOutputStreamOperator<Tuple2<String,Integer>> tpStream = source.map(new MapFunction<String, Tuple2<String,Integer>>() {@Overridepublic Tuple2<String,Integer> map(String input) throws Exception {return new Tuple2<>(input.split(",")[0],Integer.parseInt(input.split(",")[1]));}});tEnv.registerFunction("top2", new Top2());Table table = tEnv.fromDataStream(tpStream, "key,value");tEnv.createTemporaryView("SourceTable", table);// 使⽤函数Table res = tEnv.from("SourceTable").groupBy("key").flatAggregate("top2(value) as (v, rank)").select("key, v, rank");tEnv.toChangelogStream(res).print("res=>");env.execute();}/*** Accumulator for Top2.*/public static class Top2Accum {public Integer first;public Integer second;}public static class Top2 extends TableAggregateFunction<Tuple2<Integer, Integer>, Top2Accum> {@Overridepublic Top2Accum createAccumulator() {Top2Accum acc = new Top2Accum();acc.first = Integer.MIN_VALUE;acc.second = Integer.MIN_VALUE;return acc;}public void accumulate(Top2Accum acc, Integer v) {if (v > acc.first) {acc.second = acc.first;acc.first = v;} else if (v > acc.second) {acc.second = v;}}public void merge(Top2Accum acc, java.lang.Iterable<Top2Accum> iterable) {for (Top2Accum otherAcc : iterable) {accumulate(acc, otherAcc.first);accumulate(acc, otherAcc.second);}}public void emitValue(Top2Accum acc, Collector<Tuple2<Integer, Integer>> out) {// emit the value and rankif (acc.first != Integer.MIN_VALUE) {out.collect(Tuple2.of(acc.first, 1));}if (acc.second != Integer.MIN_VALUE) {out.collect(Tuple2.of(acc.second, 2));}}}
}

测试结果:

在这里插入图片描述

相关文章:

Flink SQL 表值聚合函数(Table Aggregate Function)详解

使用场景&#xff1a; 表值聚合函数即 UDTAF&#xff0c;这个函数⽬前只能在 Table API 中使⽤&#xff0c;不能在 SQL API 中使⽤。 函数功能&#xff1a; 在 SQL 表达式中&#xff0c;如果想对数据先分组再进⾏聚合取值&#xff1a; select max(xxx) from source_table gr…...

pgsql_全文检索_使用空间换时间的方法支持中文搜索

pgsql_全文检索_使用空间换时间的方法支持中文搜索 一、环境 PostgreSQL 14.2, compiled by Visual C build 1914, 64-bit 二、引言 提到全文检索首先想到的就是ES(ElasticSearch)和Lucene&#xff0c;专业且强大。对于一些小众场景对于搜索要求不高&#xff0c;数据量也不…...

OpenGL_Learn10(颜色)

1. 颜色 我们在现实生活中看到某一物体的颜色并不是这个物体真正拥有的颜色&#xff0c;而是它所反射的(Reflected)颜色。换句话说&#xff0c;那些不能被物体所吸收(Absorb)的颜色&#xff08;被拒绝的颜色&#xff09;就是我们能够感知到的物体的颜色。例如&#xff0c;太阳光…...

使用Go语言抓取酒店价格数据的技术实现

目录 一、引言 二、准备工作 三、抓取数据 四、数据处理与存储 五、数据分析与可视化 六、结论与展望 一、引言 随着互联网的快速发展&#xff0c;酒店预订已经成为人们出行的重要环节。在选择酒店时&#xff0c;价格是消费者考虑的重要因素之一。因此&#xff0c;抓取酒…...

设计模式1

![在这里插入图片描述](https://img-blog.csdnimg.cn/c9fbecf1ae89436095885722380ea460.png)一、设计模式分类&#xff1a; 1、创建型模式&#xff1a;创建与使用分离&#xff0c;单例、原型、工厂、抽象、建造者。 2、结构型模式&#xff1a;用于描述如何将对象按某种更大的…...

数字人部署之VITS+Wav2lip数据流转处理问题

一、模型 VITS模型训练教程VITS-从零开始微调&#xff08;finetune&#xff09;训练并部署指南-支持本地云端 Wav2lip是2D数字人&#xff0c;可参考训练嘴型同步模型Wav2Lip PS:以上模型都是开源可用。 二. VITS数据处理问题 VITS模型的输出为一维的numpy类型数据&#xff…...

RK3568笔记五:基于Yolov5的训练及部署

若该文为原创文章&#xff0c;转载请注明原文出处。 一. 部署概述 环境&#xff1a;Ubuntu20.04、python3.8 芯片&#xff1a;RK3568 芯片系统&#xff1a;buildroot 开发板&#xff1a;ATK-DLRK3568 开发主要参考文档&#xff1a;《Rockchip_Quick_Start_RKNN_Toolkit2_C…...

VR虚拟现实:VR技术如何进行原型制作

VR虚拟现实原型制作 利用VR虚拟现实软件进行原型制作可以用于增强原型测试期间的沉浸感&#xff0c;减少产品设计迭代次数&#xff0c;并将与产品原型制作相关的成本降低40-65%。 VR虚拟现实原型制作市场规模 用于原型制作的虚拟现实 (VR) 市场在 2017 年估计为 2.104 亿美元…...

51单片机入门

一、单片机以及开发板介绍 写在前面&#xff1a;本文为作者自学笔记&#xff0c;课程为哔哩哔哩江协科技51单片机入门教程&#xff0c;感兴趣可以看看&#xff0c;适合普中A2开发板或者HC6800-ESV2.0江协科技课程所用开发板。 工具安装请另行搜索&#xff0c;这里不做介绍&…...

notes_质谱蛋白组学数据分析基础知识

目录 1. 蛋白组学方法学1.1 液相-质谱法1) 基本原理2) bottom-up策略的基本流程 1.2 PEA/Olink 2. 质谱数据分析2.1 原始数据格式2.2 分析过程1&#xff09;鉴定搜索引擎&#xff08;质谱组学&#xff09;重难点/潜在的研究方向 2&#xff09;定量3&#xff09;预处理 2.3 下游…...

【Python基础】一个简单的TCP通信程序

&#x1f308;欢迎来到Python专栏 &#x1f64b;&#x1f3fe;‍♀️作者介绍&#xff1a;前PLA队员 目前是一名普通本科大三的软件工程专业学生 &#x1f30f;IP坐标&#xff1a;湖北武汉 &#x1f349; 目前技术栈&#xff1a;C/C、Linux系统编程、计算机网络、数据结构、Mys…...

算法之双指针

双指针算法的作用 双指针算法是一种使用2个变量对线性结构(逻辑线性/物理线性)&#xff0c;进行操作的算法&#xff0c;双指针可以对线性结构进行时间复杂度优化&#xff0c;可以对空间进行记忆或达到某种目的。 双指针算法的分类 1.快慢指针 2.滑动窗口 3.左右指针 4.前后指…...

Redis被攻击纪实

一、前言 声明&#xff1a;本文仅供技术交流使用&#xff0c;严禁采用本文的方法进行任何非法活动。 上周新来的同事分享Redis的原理和机制&#xff0c;想起2017年的时候测试环境Redis被攻击&#xff0c;最后只能重新安装服务器&#xff0c;今天试验一把利用Redis漏洞进行攻击…...

AI工具-PPT-SlidesAI

SlidesAI 使用手册 https://tella.video/get-started-with-slidesai-tutorial-18yq 简介 SlidesAI 是一款快速创建演示文稿的AI工具&#xff0c;适用于无设计经验的用户。 开始使用 1. **安装与设置** - 访问 [SlidesAI官网](https://www.slidesai.io/zh)。 - 完成简单的设置…...

原型链污染攻击

想要很清楚了理解原型链污染我们首先必须要弄清楚原型链这个概念 可以看这篇文章&#xff1a;对象的继承和原型链 目录 prototype和__proto__分别是什么&#xff1f; 原型链继承 原型链污染是什么 哪些情况下原型链会被污染&#xff1f; 例题1&#xff1a;Code-Breaking 2…...

Android Glide transform圆形图CircleCrop动态代码描边绘制外框线并rotateImage旋转,Kotlin

Android Glide transform圆形图CircleCrop动态代码描边绘制外框线并rotateImage旋转&#xff0c;Kotlin <?xml version"1.0" encoding"utf-8"?> <FrameLayout xmlns:android"http://schemas.android.com/apk/res/android"xmlns:app&q…...

【ruoyi】微服务关闭登录验证码

登录本地的nacos服务&#xff0c;修改&#xff1a;配置管理-配置列表-ruoyi-gateway-dev.yml 将验证码的enabled设置成false&#xff0c;即可...

AI:78-基于深度学习的食物识别与营养分析

🚀 本文选自专栏:人工智能领域200例教程专栏 从基础到实践,深入学习。无论你是初学者还是经验丰富的老手,对于本专栏案例和项目实践都有参考学习意义。 ✨✨✨ 每一个案例都附带有在本地跑过的代码,详细讲解供大家学习,希望可以帮到大家。欢迎订阅支持,正在不断更新中,…...

日本it培训班,如何选择靠谱的赴日IT培训班?

随着科技的发展&#xff0c;信息技术行业在全球范围内迅速发展&#xff0c;并呈现出蓬勃的发展态势&#xff0c;在日本&#xff0c;IT行业也成为一种极为热门的职业选择。日本专门学校在这个领域内培养了许多IT从业者&#xff0c;成为了众多IT公司的培养基地。如果你对IT产业感…...

51单片机PCF8591数字电压表LCD1602液晶显示设计( proteus仿真+程序+设计报告+讲解视频)

51单片机PCF8591数字电压表LCD1602液晶设计 ( proteus仿真程序设计报告讲解视频&#xff09; 仿真图proteus7.8及以上 程序编译器&#xff1a;keil 4/keil 5 编程语言&#xff1a;C语言 设计编号&#xff1a;S0060 51单片机PCF8591数字电压表LCD1602液晶设计 1.主要功能&a…...

uni-app学习笔记二十二---使用vite.config.js全局导入常用依赖

在前面的练习中&#xff0c;每个页面需要使用ref&#xff0c;onShow等生命周期钩子函数时都需要像下面这样导入 import {onMounted, ref} from "vue" 如果不想每个页面都导入&#xff0c;需要使用node.js命令npm安装unplugin-auto-import npm install unplugin-au…...

Auto-Coder使用GPT-4o完成:在用TabPFN这个模型构建一个预测未来3天涨跌的分类任务

通过akshare库&#xff0c;获取股票数据&#xff0c;并生成TabPFN这个模型 可以识别、处理的格式&#xff0c;写一个完整的预处理示例&#xff0c;并构建一个预测未来 3 天股价涨跌的分类任务 用TabPFN这个模型构建一个预测未来 3 天股价涨跌的分类任务&#xff0c;进行预测并输…...

macOS多出来了:Google云端硬盘、YouTube、表格、幻灯片、Gmail、Google文档等应用

文章目录 问题现象问题原因解决办法 问题现象 macOS启动台&#xff08;Launchpad&#xff09;多出来了&#xff1a;Google云端硬盘、YouTube、表格、幻灯片、Gmail、Google文档等应用。 问题原因 很明显&#xff0c;都是Google家的办公全家桶。这些应用并不是通过独立安装的…...

在四层代理中还原真实客户端ngx_stream_realip_module

一、模块原理与价值 PROXY Protocol 回溯 第三方负载均衡&#xff08;如 HAProxy、AWS NLB、阿里 SLB&#xff09;发起上游连接时&#xff0c;将真实客户端 IP/Port 写入 PROXY Protocol v1/v2 头。Stream 层接收到头部后&#xff0c;ngx_stream_realip_module 从中提取原始信息…...

python爬虫:Newspaper3k 的详细使用(好用的新闻网站文章抓取和解析的Python库)

更多内容请见: 爬虫和逆向教程-专栏介绍和目录 文章目录 一、Newspaper3k 概述1.1 Newspaper3k 介绍1.2 主要功能1.3 典型应用场景1.4 安装二、基本用法2.2 提取单篇文章的内容2.2 处理多篇文档三、高级选项3.1 自定义配置3.2 分析文章情感四、实战案例4.1 构建新闻摘要聚合器…...

python如何将word的doc另存为docx

将 DOCX 文件另存为 DOCX 格式&#xff08;Python 实现&#xff09; 在 Python 中&#xff0c;你可以使用 python-docx 库来操作 Word 文档。不过需要注意的是&#xff0c;.doc 是旧的 Word 格式&#xff0c;而 .docx 是新的基于 XML 的格式。python-docx 只能处理 .docx 格式…...

ServerTrust 并非唯一

NSURLAuthenticationMethodServerTrust 只是 authenticationMethod 的冰山一角 要理解 NSURLAuthenticationMethodServerTrust, 首先要明白它只是 authenticationMethod 的选项之一, 并非唯一 1 先厘清概念 点说明authenticationMethodURLAuthenticationChallenge.protectionS…...

关于 WASM:1. WASM 基础原理

一、WASM 简介 1.1 WebAssembly 是什么&#xff1f; WebAssembly&#xff08;WASM&#xff09; 是一种能在现代浏览器中高效运行的二进制指令格式&#xff0c;它不是传统的编程语言&#xff0c;而是一种 低级字节码格式&#xff0c;可由高级语言&#xff08;如 C、C、Rust&am…...

tree 树组件大数据卡顿问题优化

问题背景 项目中有用到树组件用来做文件目录&#xff0c;但是由于这个树组件的节点越来越多&#xff0c;导致页面在滚动这个树组件的时候浏览器就很容易卡死。这种问题基本上都是因为dom节点太多&#xff0c;导致的浏览器卡顿&#xff0c;这里很明显就需要用到虚拟列表的技术&…...

【生成模型】视频生成论文调研

工作清单 上游应用方向&#xff1a;控制、速度、时长、高动态、多主体驱动 类型工作基础模型WAN / WAN-VACE / HunyuanVideo控制条件轨迹控制ATI~镜头控制ReCamMaster~多主体驱动Phantom~音频驱动Let Them Talk: Audio-Driven Multi-Person Conversational Video Generation速…...