FlinkSQL聚合函数(Aggregate Function)详解
使用场景: 聚合函数即 UDAF,常⽤于进多条数据,出⼀条数据的场景。

上图展示了⼀个 聚合函数的例⼦ 以及 聚合函数包含的重要⽅法。
案例场景:
关于饮料的表,有三个字段,分别是 id、name、price,表⾥有 5 ⾏数据,找到所有饮料⾥最贵的饮料的价格,即执⾏⼀个 max() 聚合拿到结果,遍历所有 5 ⾏数据,最终结果就只有⼀个数值。
开发流程:
实现 AggregateFunction 接⼝,其中所有的⽅法必须是 public 的、⾮ static 的;
必须实现以下⽅法:
- Acc聚合中间结果 createAccumulator() : 为当前 Key 初始化⼀个空的 accumulator,其存储了聚合的中间结果,⽐如在执⾏ max() 时会存储当前的 max 值;
- accumulate(Acc accumulator, Input输⼊参数) : 每⼀⾏数据,调⽤ accumulate() ⽅法更新 accumulator,处理每⼀条输⼊数据,方法必须声明为 public 和⾮ static 的,accumulate ⽅法可以重载,⽅法的参数类型可以不同,并且⽀持变⻓参数。
- Output输出参数 getValue(Acc accumulator) : 通过调⽤ getValue ⽅法来计算和返回最终的结果。
某些场景下必须实现:
- retract(Acc accumulator, Input输⼊参数) : 在回撤流的场景下必须实现,在计算回撤数据时调⽤,如果没有实现会直接报错。
- merge(Acc accumulator, Iterable it) : 在批式聚合以及流式聚合中的 Session、Hop 窗⼝聚合场景下必须要实现,此外,这个⽅法对于优化也有帮助,例如,打开了两阶段聚合优化,需要 AggregateFunction 实现 merge ⽅法,在数据 shuffle 前先进⾏⼀次聚合计算。
- resetAccumulator() : 在批式聚合中是必须实现的。
关于⼊参、出参数据类型信息的⽅法:
默认情况下,⽤户的 Input 输⼊参数( accumulate(Acc accumulator, Input输⼊参数) 的⼊参 Input输⼊参数 )、accumulator( Acc聚合中间结果 createAccumulator() 的返回结果)、 Output输出参数数据类型( Output输出参数 getValue(Acc accumulator) 的 Output输出参数 )会被 Flink 使⽤反射获取到。
对于 accumulator 和 Output 输出参数类型,Flink SQL 的类型推导在遇到复杂类型时会推导出错误的结果(注意:Input输⼊参数 因为是上游算⼦传⼊的,类型信息是确认的,不会出现推导错误),⽐如⾮基本类型 POJO 的复杂类型。
同 ScalarFunction 和 TableFunction, AggregateFunction 提供了 AggregateFunction#getResultType() 和AggregateFunction#getAccumulatorType() 指定最终返回值类型和 accumulator 的类型,两个函数的返回值类型是TypeInformation。
- getResultType() : 即 Output 输出参数 getValue(Acc accumulator) 的输出结果数据类型;
- getAccumulatorType() : 即 Acc聚合中间结果 createAccumulator() 的返回结果数据类型。
案例: 加权平均值
- 定义⼀个聚合函数来计算某⼀列的加权平均
- 在 TableEnvironment 中注册函数
- 在查询中使⽤函数
实现思路:
为了计算加权平均值,accumulator 需要存储加权总和以及数据的条数,定义了类 WeightedAvgAccumulator 作为 accumulator,Flink 的 checkpoint 机制会⾃动保存 accumulator,在失败时进⾏恢复,保证精确⼀次的语义。
WeightedAvg(聚合函数)的 accumulate ⽅法有三个输⼊参数,第⼀个是 WeightedAvgAccum accumulator,另外两个是⽤户⾃定义的输⼊:输⼊的值 ivalue 和 输⼊的权重 iweight,尽管 retract()、merge()、resetAccumulator() ⽅法在⼤多数聚合类型中都不是必须实现的,但在样例中提供了他们的实现,并且定义了 getResultType() 和 getAccumulatorType()。
代码案例:
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.AggregateFunction;import java.io.Serializable;import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;/*** 输入数据:* a,1,1* a,10,2** 输出结果:* res1=>:1> +I[a, 1.0]* res2=>:1> +I[a, 1.0]* res3=>:1> +I[a, 1.0]** res1=>:1> -U[a, 1.0]* res1=>:1> +U[a, 7.0]* res3=>:1> -U[a, 1.0]* res3=>:1> +U[a, 7.0]* res2=>:1> -U[a, 1.0]* res2=>:1> +U[a, 7.0]*/
public class AggregateFunctionTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);DataStreamSource<String> source = env.socketTextStream("localhost", 8888);SingleOutputStreamOperator<Tuple3<String, Double, Double>> tpStream = source.map(new MapFunction<String, Tuple3<String, Double, Double>>() {@Overridepublic Tuple3<String, Double, Double> map(String input) throws Exception {return new Tuple3<>(input.split(",")[0],Double.parseDouble(input.split(",")[1]),Double.parseDouble(input.split(",")[2]));}});Table table = tEnv.fromDataStream(tpStream, "field,iValue,iWeight");tEnv.createTemporaryView("SourceTable", table);Table res1 = tEnv.from("SourceTable").groupBy($("field")).select($("field"), call(WeightedAvg.class, $("iValue"), $("iWeight")));// 注册函数tEnv.createTemporarySystemFunction("WeightedAvg", WeightedAvg.class);// Table API 调⽤函数Table res2 = tEnv.from("SourceTable").groupBy($("field")).select($("field"), call("WeightedAvg", $("iValue"), $("iWeight")));// SQL API 调⽤函数Table res3 = tEnv.sqlQuery("SELECT field, WeightedAvg(`iValue`, iWeight) FROM SourceTable GROUP BY field");tEnv.toChangelogStream(res1).print("res1=>");tEnv.toChangelogStream(res2).print("res2=>");tEnv.toChangelogStream(res3).print("res3=>");env.execute();}// ⾃定义⼀个计算权重 avg 的 accmulatorpublic static class WeightedAvgAccumulator implements Serializable {public Double sum = 0.0;public Double count = 0.0;}// 输⼊:Long iValue, Integer iWeightpublic static class WeightedAvg extends AggregateFunction<Double, WeightedAvgAccumulator> {// 创建⼀个 accumulator@Overridepublic WeightedAvgAccumulator createAccumulator() {return new WeightedAvgAccumulator();}public void accumulate(WeightedAvgAccumulator acc, Double iValue, Double iWeight) {acc.sum += iValue * iWeight;acc.count += iWeight;}public void retract(WeightedAvgAccumulator acc, Double iValue, Double iWeight) {acc.sum -= iValue * iWeight;acc.count -= iWeight;}// 获取返回结果@Overridepublic Double getValue(WeightedAvgAccumulator acc) {if (acc.count == 0) {return null;} else {return acc.sum / acc.count;}}// Session window 使⽤这个⽅法将⼏个单独窗⼝的结果合并public void merge(WeightedAvgAccumulator acc, Iterable<WeightedAvgAccumulator> it) {for (WeightedAvgAccumulator a : it) {acc.count += a.count;acc.sum += a.sum;}}public void resetAccumulator(WeightedAvgAccumulator acc) {acc.count = 0.0;acc.sum = 0.0;}}
}
测试结果:

相关文章:
FlinkSQL聚合函数(Aggregate Function)详解
使用场景: 聚合函数即 UDAF,常⽤于进多条数据,出⼀条数据的场景。 上图展示了⼀个 聚合函数的例⼦ 以及 聚合函数包含的重要⽅法。 案例场景: 关于饮料的表,有三个字段,分别是 id、name、price࿰…...
TensorFlow学习笔记--(3)张量的常用运算函数
损失函数及求偏导 通过 tf.GradientTape 函数来指定损失函数的变量以及表达式 最后通过 gradient(%损失函数%,%偏导对象%) 来获取求偏导的结果 独热编码 给出一组特征值 来对图像进行分类 可以用独热编码 0的概率是第0种 1的概率是第1种 0的概率是第二种 tf.one_hot(%某标签…...
RT-Thread:嵌入式实时操作系统的设计与应用
RT-Thread(Real-Time Thread)是一个开源的嵌入式实时操作系统,其设计和应用在嵌入式领域具有重要意义。本文将从RT-Thread的设计理念、核心特性,以及在嵌入式系统中的应用等方面进行探讨,对其进行全面的介绍。 首先&a…...
SpringBoot学习笔记-创建菜单与游戏页面(下)
笔记内容转载自 AcWing 的 SpringBoot 框架课讲义,课程链接:AcWing SpringBoot 框架课。 CONTENTS 1. 地图优化改进2. 绘制玩家的起始位置3. 实现玩家移动4. 优化蛇的身体效果5. 碰撞检测实现 本节实现两名玩家即两条蛇的绘制与人工操作移动功能。 1. 地…...
STM32一
0.前言 在B站经常看见有人用stm32做出了有趣的电子小玩艺儿,感到很羡慕,于是想了解一下。 1.什么是stm32 STM32 是一系列由STMicroelectronics(意法半导体)公司设计和制造的32位ARM Cortex-M微控制器。这一系列的微控制器广泛用…...
GPT-4 Turbo Assistants API
Assistants API Assistants API 允许您在自己的应用程序中构建 AI 助手。助手有指令,可以利用模型、工具和知识来响应用户查询。Assistants API 目前支持三种类型的工具:代码解释器、检索和函数调用。未来,我们计划发布更多 OpenAI 构建的工…...
day08_回顾与课程概括
回顾与课程概括 一、上节课复习 一、上节课复习 1、osi七层与数据传输 2、socketsocket是对传输层以下的封装ipport标识唯一一个基于网络通讯的软件3、tcp与udptcp:因为在通信之前必须建立双向连接,通常都是客户端主动连接服务端的,所以必须…...
iptables、netfilter、firewalld、ufd简单介绍
参考:...
Python基础入门例程53-NP53 前10个偶数(循环语句)
最近的博文: Python基础入门例程52-NP52 累加数与平均值(循环语句)-CSDN博客 Python基础入门例程51-NP51 列表的最大与最小(循环语句)-CSDN博客 Python基础入门例程50-NP50 程序员节(循环语句)-CSDN博客 目录 最近的博文: 描…...
v-bind和v-model
目录 前言 v-bind 作用 语法格式 编译原理 简写 v-model 作用 使用方法 v-bind和v-model的区别和联系 前言 本文我们来了解一下模板语法之指令语法中的v-bind和v-model v-bind 作用 v-bind可以让html标签的某个属性的值产生动态的效果 语法格式 <html标签 v-bin…...
Adobe premiere裁剪视频尺寸并转为GIF格式
第 1 步:裁剪视频 修改序列设置以适应裁剪之后的图像区域;序列中的编辑模式不能使用默认的,这里使用的是“ProRes RAW” 第 2 步:设置背景色 需要设置“颜色遮罩”的大小和颜色,颜色遮罩放在下面。 第 3 步࿱…...
关于react输入框回显问题
绑定表单元素的值到组件状态中。例如,对于一个文本框,可以使用onChange事件将用户输入的值绑定到组件状态中。 创建一个处理表单提交的函数。这个函数通常会使用组件状态中的值来更新页面上的数据。 在handleSubmit函数中,防止默认表单提交…...
案例续集留言板
前端没有保存数据的功能,后端把数据保存下来(内存,数据库等等......) 前端代码如下 : <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initia…...
72 内网安全-域横向CSMSF联动及应急响应初识
目录 演示案例:MSF&CobaltStrike联动ShellWEB攻击应急响应朔源-后门,日志WIN系统攻击应急响应朔源-后门,日志,流量临时给大家看看学的好的怎么干对应CTF比赛 涉及资源 权限维持留到后面在补充,先把后面的知识点给大家讲起来,因为权限维持它是我们前期…...
Leetcode—20.有效的括号【简单】
2023每日刷题(二十七) Leetcode—20.有效的括号 C实现代码 class Solution { public:bool isValid(string s) {stack<char> arr;int len s.size();if(len 1) {return false;}for(int i 0; i < len; i) {if(s[i] ( || s[i] [ || s[i] {)…...
Leetcode—剑指OfferII LCR 019.验证回文串II【简单】
2023每日刷题(二十七) Leetcode—剑指OfferII LCR 019.验证回文串II 实现代码 class Solution { public:bool judgeFunc(string s, int left, int right) {while(left < right) {if(s[left] ! s[right]) {return false;}left;right--;}return true;…...
Mac电脑配置Flutter开发环境
1.进入官网下载页: Flutter SDK releases | Flutter 可以看到有 Windows、macOS、Linux三种系统的下载包 选择macOS,然后点击下载 Stable channel(稳定版)中的最新版本,下载完成后可以移动到资源库Library中。 2.下载…...
QTableView如何清空数据保留表头
QTableView如何清空数据保留表头 调用QAbstractItemModel中的removeRows或者removeColumns方法。 方法原型 bool removeRows(int column, int count, const QModelIndex &parent QModelIndex())在支持此功能的模型上,从模型中删除从父级父级下给定行开始的计…...
[工业自动化-17]:西门子S7-15xxx编程 - 软件编程 - PLC编程语言以及与嵌入式编程的比较
目录 一、博图编程语言 1.1 概述 1.2 三种编程语言之间的关系 二、PLC与嵌入式系统的类比 三、PLC编程与嵌入式系统编程的比较 3.1 不同点 3.2 相同点 3.3 PLC是一种专门用于工业控制系统的嵌入式系统 一、博图编程语言 1.1 概述 西门子(Siemens࿰…...
云原生微服务架构及实现技术
云原生是一种技术理念和架构方法,它充分利用云计算的优势,将应用程序和基础设施进行优化,以适应云环境的特性。云原生的设计原则主要包括弹性、韧性、安全性、可观测性、灰度等,旨在让企业在云环境中实现轻量、敏捷、高度自动化的…...
深入剖析AI大模型:大模型时代的 Prompt 工程全解析
今天聊的内容,我认为是AI开发里面非常重要的内容。它在AI开发里无处不在,当你对 AI 助手说 "用李白的风格写一首关于人工智能的诗",或者让翻译模型 "将这段合同翻译成商务日语" 时,输入的这句话就是 Prompt。…...
day52 ResNet18 CBAM
在深度学习的旅程中,我们不断探索如何提升模型的性能。今天,我将分享我在 ResNet18 模型中插入 CBAM(Convolutional Block Attention Module)模块,并采用分阶段微调策略的实践过程。通过这个过程,我不仅提升…...
.Net框架,除了EF还有很多很多......
文章目录 1. 引言2. Dapper2.1 概述与设计原理2.2 核心功能与代码示例基本查询多映射查询存储过程调用 2.3 性能优化原理2.4 适用场景 3. NHibernate3.1 概述与架构设计3.2 映射配置示例Fluent映射XML映射 3.3 查询示例HQL查询Criteria APILINQ提供程序 3.4 高级特性3.5 适用场…...
Golang dig框架与GraphQL的完美结合
将 Go 的 Dig 依赖注入框架与 GraphQL 结合使用,可以显著提升应用程序的可维护性、可测试性以及灵活性。 Dig 是一个强大的依赖注入容器,能够帮助开发者更好地管理复杂的依赖关系,而 GraphQL 则是一种用于 API 的查询语言,能够提…...
【OSG学习笔记】Day 16: 骨骼动画与蒙皮(osgAnimation)
骨骼动画基础 骨骼动画是 3D 计算机图形中常用的技术,它通过以下两个主要组件实现角色动画。 骨骼系统 (Skeleton):由层级结构的骨头组成,类似于人体骨骼蒙皮 (Mesh Skinning):将模型网格顶点绑定到骨骼上,使骨骼移动…...
[Java恶补day16] 238.除自身以外数组的乘积
给你一个整数数组 nums,返回 数组 answer ,其中 answer[i] 等于 nums 中除 nums[i] 之外其余各元素的乘积 。 题目数据 保证 数组 nums之中任意元素的全部前缀元素和后缀的乘积都在 32 位 整数范围内。 请 不要使用除法,且在 O(n) 时间复杂度…...
Typeerror: cannot read properties of undefined (reading ‘XXX‘)
最近需要在离线机器上运行软件,所以得把软件用docker打包起来,大部分功能都没问题,出了一个奇怪的事情。同样的代码,在本机上用vscode可以运行起来,但是打包之后在docker里出现了问题。使用的是dialog组件,…...
Mysql中select查询语句的执行过程
目录 1、介绍 1.1、组件介绍 1.2、Sql执行顺序 2、执行流程 2.1. 连接与认证 2.2. 查询缓存 2.3. 语法解析(Parser) 2.4、执行sql 1. 预处理(Preprocessor) 2. 查询优化器(Optimizer) 3. 执行器…...
毫米波雷达基础理论(3D+4D)
3D、4D毫米波雷达基础知识及厂商选型 PreView : https://mp.weixin.qq.com/s/bQkju4r6med7I3TBGJI_bQ 1. FMCW毫米波雷达基础知识 主要参考博文: 一文入门汽车毫米波雷达基本原理 :https://mp.weixin.qq.com/s/_EN7A5lKcz2Eh8dLnjE19w 毫米波雷达基础…...
LangFlow技术架构分析
🔧 LangFlow 的可视化技术栈 前端节点编辑器 底层框架:基于 (一个现代化的 React 节点绘图库) 功能: 拖拽式构建 LangGraph 状态机 实时连线定义节点依赖关系 可视化调试循环和分支逻辑 与 LangGraph 的深…...
