FlinkSQL聚合函数(Aggregate Function)详解
使用场景: 聚合函数即 UDAF,常⽤于进多条数据,出⼀条数据的场景。
上图展示了⼀个 聚合函数的例⼦ 以及 聚合函数包含的重要⽅法。
案例场景:
关于饮料的表,有三个字段,分别是 id、name、price,表⾥有 5 ⾏数据,找到所有饮料⾥最贵的饮料的价格,即执⾏⼀个 max() 聚合拿到结果,遍历所有 5 ⾏数据,最终结果就只有⼀个数值。
开发流程:
实现 AggregateFunction 接⼝,其中所有的⽅法必须是 public 的、⾮ static 的;
必须实现以下⽅法:
- Acc聚合中间结果 createAccumulator() : 为当前 Key 初始化⼀个空的 accumulator,其存储了聚合的中间结果,⽐如在执⾏ max() 时会存储当前的 max 值;
- accumulate(Acc accumulator, Input输⼊参数) : 每⼀⾏数据,调⽤ accumulate() ⽅法更新 accumulator,处理每⼀条输⼊数据,方法必须声明为 public 和⾮ static 的,accumulate ⽅法可以重载,⽅法的参数类型可以不同,并且⽀持变⻓参数。
- Output输出参数 getValue(Acc accumulator) : 通过调⽤ getValue ⽅法来计算和返回最终的结果。
某些场景下必须实现:
- retract(Acc accumulator, Input输⼊参数) : 在回撤流的场景下必须实现,在计算回撤数据时调⽤,如果没有实现会直接报错。
- merge(Acc accumulator, Iterable it) : 在批式聚合以及流式聚合中的 Session、Hop 窗⼝聚合场景下必须要实现,此外,这个⽅法对于优化也有帮助,例如,打开了两阶段聚合优化,需要 AggregateFunction 实现 merge ⽅法,在数据 shuffle 前先进⾏⼀次聚合计算。
- resetAccumulator() : 在批式聚合中是必须实现的。
关于⼊参、出参数据类型信息的⽅法:
默认情况下,⽤户的 Input 输⼊参数( accumulate(Acc accumulator, Input输⼊参数) 的⼊参 Input输⼊参数 )、accumulator( Acc聚合中间结果 createAccumulator() 的返回结果)、 Output输出参数数据类型( Output输出参数 getValue(Acc accumulator) 的 Output输出参数 )会被 Flink 使⽤反射获取到。
对于 accumulator 和 Output 输出参数类型,Flink SQL 的类型推导在遇到复杂类型时会推导出错误的结果(注意:Input输⼊参数 因为是上游算⼦传⼊的,类型信息是确认的,不会出现推导错误),⽐如⾮基本类型 POJO 的复杂类型。
同 ScalarFunction 和 TableFunction, AggregateFunction 提供了 AggregateFunction#getResultType() 和AggregateFunction#getAccumulatorType() 指定最终返回值类型和 accumulator 的类型,两个函数的返回值类型是TypeInformation。
- getResultType() : 即 Output 输出参数 getValue(Acc accumulator) 的输出结果数据类型;
- getAccumulatorType() : 即 Acc聚合中间结果 createAccumulator() 的返回结果数据类型。
案例: 加权平均值
- 定义⼀个聚合函数来计算某⼀列的加权平均
- 在 TableEnvironment 中注册函数
- 在查询中使⽤函数
实现思路:
为了计算加权平均值,accumulator 需要存储加权总和以及数据的条数,定义了类 WeightedAvgAccumulator 作为 accumulator,Flink 的 checkpoint 机制会⾃动保存 accumulator,在失败时进⾏恢复,保证精确⼀次的语义。
WeightedAvg(聚合函数)的 accumulate ⽅法有三个输⼊参数,第⼀个是 WeightedAvgAccum accumulator,另外两个是⽤户⾃定义的输⼊:输⼊的值 ivalue 和 输⼊的权重 iweight,尽管 retract()、merge()、resetAccumulator() ⽅法在⼤多数聚合类型中都不是必须实现的,但在样例中提供了他们的实现,并且定义了 getResultType() 和 getAccumulatorType()。
代码案例:
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.AggregateFunction;import java.io.Serializable;import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;/*** 输入数据:* a,1,1* a,10,2** 输出结果:* res1=>:1> +I[a, 1.0]* res2=>:1> +I[a, 1.0]* res3=>:1> +I[a, 1.0]** res1=>:1> -U[a, 1.0]* res1=>:1> +U[a, 7.0]* res3=>:1> -U[a, 1.0]* res3=>:1> +U[a, 7.0]* res2=>:1> -U[a, 1.0]* res2=>:1> +U[a, 7.0]*/
public class AggregateFunctionTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);DataStreamSource<String> source = env.socketTextStream("localhost", 8888);SingleOutputStreamOperator<Tuple3<String, Double, Double>> tpStream = source.map(new MapFunction<String, Tuple3<String, Double, Double>>() {@Overridepublic Tuple3<String, Double, Double> map(String input) throws Exception {return new Tuple3<>(input.split(",")[0],Double.parseDouble(input.split(",")[1]),Double.parseDouble(input.split(",")[2]));}});Table table = tEnv.fromDataStream(tpStream, "field,iValue,iWeight");tEnv.createTemporaryView("SourceTable", table);Table res1 = tEnv.from("SourceTable").groupBy($("field")).select($("field"), call(WeightedAvg.class, $("iValue"), $("iWeight")));// 注册函数tEnv.createTemporarySystemFunction("WeightedAvg", WeightedAvg.class);// Table API 调⽤函数Table res2 = tEnv.from("SourceTable").groupBy($("field")).select($("field"), call("WeightedAvg", $("iValue"), $("iWeight")));// SQL API 调⽤函数Table res3 = tEnv.sqlQuery("SELECT field, WeightedAvg(`iValue`, iWeight) FROM SourceTable GROUP BY field");tEnv.toChangelogStream(res1).print("res1=>");tEnv.toChangelogStream(res2).print("res2=>");tEnv.toChangelogStream(res3).print("res3=>");env.execute();}// ⾃定义⼀个计算权重 avg 的 accmulatorpublic static class WeightedAvgAccumulator implements Serializable {public Double sum = 0.0;public Double count = 0.0;}// 输⼊:Long iValue, Integer iWeightpublic static class WeightedAvg extends AggregateFunction<Double, WeightedAvgAccumulator> {// 创建⼀个 accumulator@Overridepublic WeightedAvgAccumulator createAccumulator() {return new WeightedAvgAccumulator();}public void accumulate(WeightedAvgAccumulator acc, Double iValue, Double iWeight) {acc.sum += iValue * iWeight;acc.count += iWeight;}public void retract(WeightedAvgAccumulator acc, Double iValue, Double iWeight) {acc.sum -= iValue * iWeight;acc.count -= iWeight;}// 获取返回结果@Overridepublic Double getValue(WeightedAvgAccumulator acc) {if (acc.count == 0) {return null;} else {return acc.sum / acc.count;}}// Session window 使⽤这个⽅法将⼏个单独窗⼝的结果合并public void merge(WeightedAvgAccumulator acc, Iterable<WeightedAvgAccumulator> it) {for (WeightedAvgAccumulator a : it) {acc.count += a.count;acc.sum += a.sum;}}public void resetAccumulator(WeightedAvgAccumulator acc) {acc.count = 0.0;acc.sum = 0.0;}}
}
测试结果:
相关文章:

FlinkSQL聚合函数(Aggregate Function)详解
使用场景: 聚合函数即 UDAF,常⽤于进多条数据,出⼀条数据的场景。 上图展示了⼀个 聚合函数的例⼦ 以及 聚合函数包含的重要⽅法。 案例场景: 关于饮料的表,有三个字段,分别是 id、name、price࿰…...

TensorFlow学习笔记--(3)张量的常用运算函数
损失函数及求偏导 通过 tf.GradientTape 函数来指定损失函数的变量以及表达式 最后通过 gradient(%损失函数%,%偏导对象%) 来获取求偏导的结果 独热编码 给出一组特征值 来对图像进行分类 可以用独热编码 0的概率是第0种 1的概率是第1种 0的概率是第二种 tf.one_hot(%某标签…...

RT-Thread:嵌入式实时操作系统的设计与应用
RT-Thread(Real-Time Thread)是一个开源的嵌入式实时操作系统,其设计和应用在嵌入式领域具有重要意义。本文将从RT-Thread的设计理念、核心特性,以及在嵌入式系统中的应用等方面进行探讨,对其进行全面的介绍。 首先&a…...
SpringBoot学习笔记-创建菜单与游戏页面(下)
笔记内容转载自 AcWing 的 SpringBoot 框架课讲义,课程链接:AcWing SpringBoot 框架课。 CONTENTS 1. 地图优化改进2. 绘制玩家的起始位置3. 实现玩家移动4. 优化蛇的身体效果5. 碰撞检测实现 本节实现两名玩家即两条蛇的绘制与人工操作移动功能。 1. 地…...
STM32一
0.前言 在B站经常看见有人用stm32做出了有趣的电子小玩艺儿,感到很羡慕,于是想了解一下。 1.什么是stm32 STM32 是一系列由STMicroelectronics(意法半导体)公司设计和制造的32位ARM Cortex-M微控制器。这一系列的微控制器广泛用…...
GPT-4 Turbo Assistants API
Assistants API Assistants API 允许您在自己的应用程序中构建 AI 助手。助手有指令,可以利用模型、工具和知识来响应用户查询。Assistants API 目前支持三种类型的工具:代码解释器、检索和函数调用。未来,我们计划发布更多 OpenAI 构建的工…...
day08_回顾与课程概括
回顾与课程概括 一、上节课复习 一、上节课复习 1、osi七层与数据传输 2、socketsocket是对传输层以下的封装ipport标识唯一一个基于网络通讯的软件3、tcp与udptcp:因为在通信之前必须建立双向连接,通常都是客户端主动连接服务端的,所以必须…...

iptables、netfilter、firewalld、ufd简单介绍
参考:...

Python基础入门例程53-NP53 前10个偶数(循环语句)
最近的博文: Python基础入门例程52-NP52 累加数与平均值(循环语句)-CSDN博客 Python基础入门例程51-NP51 列表的最大与最小(循环语句)-CSDN博客 Python基础入门例程50-NP50 程序员节(循环语句)-CSDN博客 目录 最近的博文: 描…...

v-bind和v-model
目录 前言 v-bind 作用 语法格式 编译原理 简写 v-model 作用 使用方法 v-bind和v-model的区别和联系 前言 本文我们来了解一下模板语法之指令语法中的v-bind和v-model v-bind 作用 v-bind可以让html标签的某个属性的值产生动态的效果 语法格式 <html标签 v-bin…...

Adobe premiere裁剪视频尺寸并转为GIF格式
第 1 步:裁剪视频 修改序列设置以适应裁剪之后的图像区域;序列中的编辑模式不能使用默认的,这里使用的是“ProRes RAW” 第 2 步:设置背景色 需要设置“颜色遮罩”的大小和颜色,颜色遮罩放在下面。 第 3 步࿱…...
关于react输入框回显问题
绑定表单元素的值到组件状态中。例如,对于一个文本框,可以使用onChange事件将用户输入的值绑定到组件状态中。 创建一个处理表单提交的函数。这个函数通常会使用组件状态中的值来更新页面上的数据。 在handleSubmit函数中,防止默认表单提交…...

案例续集留言板
前端没有保存数据的功能,后端把数据保存下来(内存,数据库等等......) 前端代码如下 : <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initia…...

72 内网安全-域横向CSMSF联动及应急响应初识
目录 演示案例:MSF&CobaltStrike联动ShellWEB攻击应急响应朔源-后门,日志WIN系统攻击应急响应朔源-后门,日志,流量临时给大家看看学的好的怎么干对应CTF比赛 涉及资源 权限维持留到后面在补充,先把后面的知识点给大家讲起来,因为权限维持它是我们前期…...

Leetcode—20.有效的括号【简单】
2023每日刷题(二十七) Leetcode—20.有效的括号 C实现代码 class Solution { public:bool isValid(string s) {stack<char> arr;int len s.size();if(len 1) {return false;}for(int i 0; i < len; i) {if(s[i] ( || s[i] [ || s[i] {)…...

Leetcode—剑指OfferII LCR 019.验证回文串II【简单】
2023每日刷题(二十七) Leetcode—剑指OfferII LCR 019.验证回文串II 实现代码 class Solution { public:bool judgeFunc(string s, int left, int right) {while(left < right) {if(s[left] ! s[right]) {return false;}left;right--;}return true;…...

Mac电脑配置Flutter开发环境
1.进入官网下载页: Flutter SDK releases | Flutter 可以看到有 Windows、macOS、Linux三种系统的下载包 选择macOS,然后点击下载 Stable channel(稳定版)中的最新版本,下载完成后可以移动到资源库Library中。 2.下载…...
QTableView如何清空数据保留表头
QTableView如何清空数据保留表头 调用QAbstractItemModel中的removeRows或者removeColumns方法。 方法原型 bool removeRows(int column, int count, const QModelIndex &parent QModelIndex())在支持此功能的模型上,从模型中删除从父级父级下给定行开始的计…...

[工业自动化-17]:西门子S7-15xxx编程 - 软件编程 - PLC编程语言以及与嵌入式编程的比较
目录 一、博图编程语言 1.1 概述 1.2 三种编程语言之间的关系 二、PLC与嵌入式系统的类比 三、PLC编程与嵌入式系统编程的比较 3.1 不同点 3.2 相同点 3.3 PLC是一种专门用于工业控制系统的嵌入式系统 一、博图编程语言 1.1 概述 西门子(Siemens࿰…...

云原生微服务架构及实现技术
云原生是一种技术理念和架构方法,它充分利用云计算的优势,将应用程序和基础设施进行优化,以适应云环境的特性。云原生的设计原则主要包括弹性、韧性、安全性、可观测性、灰度等,旨在让企业在云环境中实现轻量、敏捷、高度自动化的…...
vscode里如何用git
打开vs终端执行如下: 1 初始化 Git 仓库(如果尚未初始化) git init 2 添加文件到 Git 仓库 git add . 3 使用 git commit 命令来提交你的更改。确保在提交时加上一个有用的消息。 git commit -m "备注信息" 4 …...
设计模式和设计原则回顾
设计模式和设计原则回顾 23种设计模式是设计原则的完美体现,设计原则设计原则是设计模式的理论基石, 设计模式 在经典的设计模式分类中(如《设计模式:可复用面向对象软件的基础》一书中),总共有23种设计模式,分为三大类: 一、创建型模式(5种) 1. 单例模式(Sing…...
在四层代理中还原真实客户端ngx_stream_realip_module
一、模块原理与价值 PROXY Protocol 回溯 第三方负载均衡(如 HAProxy、AWS NLB、阿里 SLB)发起上游连接时,将真实客户端 IP/Port 写入 PROXY Protocol v1/v2 头。Stream 层接收到头部后,ngx_stream_realip_module 从中提取原始信息…...
相机Camera日志分析之三十一:高通Camx HAL十种流程基础分析关键字汇总(后续持续更新中)
【关注我,后续持续新增专题博文,谢谢!!!】 上一篇我们讲了:有对最普通的场景进行各个日志注释讲解,但相机场景太多,日志差异也巨大。后面将展示各种场景下的日志。 通过notepad++打开场景下的日志,通过下列分类关键字搜索,即可清晰的分析不同场景的相机运行流程差异…...
根据万维钢·精英日课6的内容,使用AI(2025)可以参考以下方法:
根据万维钢精英日课6的内容,使用AI(2025)可以参考以下方法: 四个洞见 模型已经比人聪明:以ChatGPT o3为代表的AI非常强大,能运用高级理论解释道理、引用最新学术论文,生成对顶尖科学家都有用的…...

Java面试专项一-准备篇
一、企业简历筛选规则 一般企业的简历筛选流程:首先由HR先筛选一部分简历后,在将简历给到对应的项目负责人后再进行下一步的操作。 HR如何筛选简历 例如:Boss直聘(招聘方平台) 直接按照条件进行筛选 例如:…...

均衡后的SNRSINR
本文主要摘自参考文献中的前两篇,相关文献中经常会出现MIMO检测后的SINR不过一直没有找到相关数学推到过程,其中文献[1]中给出了相关原理在此仅做记录。 1. 系统模型 复信道模型 n t n_t nt 根发送天线, n r n_r nr 根接收天线的 MIMO 系…...

2025季度云服务器排行榜
在全球云服务器市场,各厂商的排名和地位并非一成不变,而是由其独特的优势、战略布局和市场适应性共同决定的。以下是根据2025年市场趋势,对主要云服务器厂商在排行榜中占据重要位置的原因和优势进行深度分析: 一、全球“三巨头”…...
LangChain知识库管理后端接口:数据库操作详解—— 构建本地知识库系统的基础《二》
这段 Python 代码是一个完整的 知识库数据库操作模块,用于对本地知识库系统中的知识库进行增删改查(CRUD)操作。它基于 SQLAlchemy ORM 框架 和一个自定义的装饰器 with_session 实现数据库会话管理。 📘 一、整体功能概述 该模块…...

零知开源——STM32F103RBT6驱动 ICM20948 九轴传感器及 vofa + 上位机可视化教程
STM32F1 本教程使用零知标准板(STM32F103RBT6)通过I2C驱动ICM20948九轴传感器,实现姿态解算,并通过串口将数据实时发送至VOFA上位机进行3D可视化。代码基于开源库修改优化,适合嵌入式及物联网开发者。在基础驱动上新增…...