当前位置: 首页 > news >正文

FlinkSQL聚合函数(Aggregate Function)详解

使用场景: 聚合函数即 UDAF,常⽤于进多条数据,出⼀条数据的场景。

在这里插入图片描述

上图展示了⼀个 聚合函数的例⼦ 以及 聚合函数包含的重要⽅法

案例场景:

关于饮料的表,有三个字段,分别是 id、name、price,表⾥有 5 ⾏数据,找到所有饮料⾥最贵的饮料的价格,即执⾏⼀个 max() 聚合拿到结果,遍历所有 5 ⾏数据,最终结果就只有⼀个数值。

开发流程:

实现 AggregateFunction 接⼝,其中所有的⽅法必须是 public 的、⾮ static 的;

必须实现以下⽅法:

  • Acc聚合中间结果 createAccumulator() : 为当前 Key 初始化⼀个空的 accumulator,其存储了聚合的中间结果,⽐如在执⾏ max() 时会存储当前的 max 值;
  • accumulate(Acc accumulator, Input输⼊参数) : 每⼀⾏数据,调⽤ accumulate() ⽅法更新 accumulator,处理每⼀条输⼊数据,方法必须声明为 public 和⾮ static 的,accumulate ⽅法可以重载,⽅法的参数类型可以不同,并且⽀持变⻓参数。
  • Output输出参数 getValue(Acc accumulator) : 通过调⽤ getValue ⽅法来计算和返回最终的结果。

某些场景下必须实现:

  • retract(Acc accumulator, Input输⼊参数) : 在回撤流的场景下必须实现,在计算回撤数据时调⽤,如果没有实现会直接报错。
  • merge(Acc accumulator, Iterable it) : 在批式聚合以及流式聚合中的 Session、Hop 窗⼝聚合场景下必须要实现,此外,这个⽅法对于优化也有帮助,例如,打开了两阶段聚合优化,需要 AggregateFunction 实现 merge ⽅法,在数据 shuffle 前先进⾏⼀次聚合计算。
  • resetAccumulator() : 在批式聚合中是必须实现的。

关于⼊参、出参数据类型信息的⽅法:

默认情况下,⽤户的 Input 输⼊参数( accumulate(Acc accumulator, Input输⼊参数) 的⼊参 Input输⼊参数 )、accumulator( Acc聚合中间结果 createAccumulator() 的返回结果)、 Output输出参数数据类型( Output输出参数 getValue(Acc accumulator) 的 Output输出参数 )会被 Flink 使⽤反射获取到。

对于 accumulator 和 Output 输出参数类型,Flink SQL 的类型推导在遇到复杂类型时会推导出错误的结果(注意:Input输⼊参数 因为是上游算⼦传⼊的,类型信息是确认的,不会出现推导错误),⽐如⾮基本类型 POJO 的复杂类型。

同 ScalarFunction 和 TableFunction, AggregateFunction 提供了 AggregateFunction#getResultType() 和AggregateFunction#getAccumulatorType() 指定最终返回值类型和 accumulator 的类型,两个函数的返回值类型是TypeInformation。

  • getResultType() : 即 Output 输出参数 getValue(Acc accumulator) 的输出结果数据类型;
  • getAccumulatorType() : 即 Acc聚合中间结果 createAccumulator() 的返回结果数据类型。

案例: 加权平均值

  • 定义⼀个聚合函数来计算某⼀列的加权平均
  • 在 TableEnvironment 中注册函数
  • 在查询中使⽤函数

实现思路:

为了计算加权平均值,accumulator 需要存储加权总和以及数据的条数,定义了类 WeightedAvgAccumulator 作为 accumulator,Flink 的 checkpoint 机制会⾃动保存 accumulator,在失败时进⾏恢复,保证精确⼀次的语义。

WeightedAvg(聚合函数)的 accumulate ⽅法有三个输⼊参数,第⼀个是 WeightedAvgAccum accumulator,另外两个是⽤户⾃定义的输⼊:输⼊的值 ivalue 和 输⼊的权重 iweight,尽管 retract()、merge()、resetAccumulator() ⽅法在⼤多数聚合类型中都不是必须实现的,但在样例中提供了他们的实现,并且定义了 getResultType() 和 getAccumulatorType()。

代码案例:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.AggregateFunction;import java.io.Serializable;import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;/*** 输入数据:* a,1,1* a,10,2** 输出结果:* res1=>:1> +I[a, 1.0]* res2=>:1> +I[a, 1.0]* res3=>:1> +I[a, 1.0]** res1=>:1> -U[a, 1.0]* res1=>:1> +U[a, 7.0]* res3=>:1> -U[a, 1.0]* res3=>:1> +U[a, 7.0]* res2=>:1> -U[a, 1.0]* res2=>:1> +U[a, 7.0]*/
public class AggregateFunctionTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);DataStreamSource<String> source = env.socketTextStream("localhost", 8888);SingleOutputStreamOperator<Tuple3<String, Double, Double>> tpStream = source.map(new MapFunction<String, Tuple3<String, Double, Double>>() {@Overridepublic Tuple3<String, Double, Double> map(String input) throws Exception {return new Tuple3<>(input.split(",")[0],Double.parseDouble(input.split(",")[1]),Double.parseDouble(input.split(",")[2]));}});Table table = tEnv.fromDataStream(tpStream, "field,iValue,iWeight");tEnv.createTemporaryView("SourceTable", table);Table res1 = tEnv.from("SourceTable").groupBy($("field")).select($("field"), call(WeightedAvg.class, $("iValue"), $("iWeight")));// 注册函数tEnv.createTemporarySystemFunction("WeightedAvg", WeightedAvg.class);// Table API 调⽤函数Table res2 = tEnv.from("SourceTable").groupBy($("field")).select($("field"), call("WeightedAvg", $("iValue"), $("iWeight")));// SQL API 调⽤函数Table res3 = tEnv.sqlQuery("SELECT field, WeightedAvg(`iValue`, iWeight) FROM SourceTable GROUP BY field");tEnv.toChangelogStream(res1).print("res1=>");tEnv.toChangelogStream(res2).print("res2=>");tEnv.toChangelogStream(res3).print("res3=>");env.execute();}// ⾃定义⼀个计算权重 avg 的 accmulatorpublic static class WeightedAvgAccumulator implements Serializable {public Double sum = 0.0;public Double count = 0.0;}// 输⼊:Long iValue, Integer iWeightpublic static class WeightedAvg extends AggregateFunction<Double, WeightedAvgAccumulator> {// 创建⼀个 accumulator@Overridepublic WeightedAvgAccumulator createAccumulator() {return new WeightedAvgAccumulator();}public void accumulate(WeightedAvgAccumulator acc, Double iValue, Double iWeight) {acc.sum += iValue * iWeight;acc.count += iWeight;}public void retract(WeightedAvgAccumulator acc, Double iValue, Double iWeight) {acc.sum -= iValue * iWeight;acc.count -= iWeight;}// 获取返回结果@Overridepublic Double getValue(WeightedAvgAccumulator acc) {if (acc.count == 0) {return null;} else {return acc.sum / acc.count;}}// Session window 使⽤这个⽅法将⼏个单独窗⼝的结果合并public void merge(WeightedAvgAccumulator acc, Iterable<WeightedAvgAccumulator> it) {for (WeightedAvgAccumulator a : it) {acc.count += a.count;acc.sum += a.sum;}}public void resetAccumulator(WeightedAvgAccumulator acc) {acc.count = 0.0;acc.sum = 0.0;}}
}

测试结果:

在这里插入图片描述

相关文章:

FlinkSQL聚合函数(Aggregate Function)详解

使用场景&#xff1a; 聚合函数即 UDAF&#xff0c;常⽤于进多条数据&#xff0c;出⼀条数据的场景。 上图展示了⼀个 聚合函数的例⼦ 以及 聚合函数包含的重要⽅法。 案例场景&#xff1a; 关于饮料的表&#xff0c;有三个字段&#xff0c;分别是 id、name、price&#xff0…...

TensorFlow学习笔记--(3)张量的常用运算函数

损失函数及求偏导 通过 tf.GradientTape 函数来指定损失函数的变量以及表达式 最后通过 gradient(%损失函数%,%偏导对象%) 来获取求偏导的结果 独热编码 给出一组特征值 来对图像进行分类 可以用独热编码 0的概率是第0种 1的概率是第1种 0的概率是第二种 tf.one_hot(%某标签…...

RT-Thread:嵌入式实时操作系统的设计与应用

RT-Thread&#xff08;Real-Time Thread&#xff09;是一个开源的嵌入式实时操作系统&#xff0c;其设计和应用在嵌入式领域具有重要意义。本文将从RT-Thread的设计理念、核心特性&#xff0c;以及在嵌入式系统中的应用等方面进行探讨&#xff0c;对其进行全面的介绍。 首先&a…...

SpringBoot学习笔记-创建菜单与游戏页面(下)

笔记内容转载自 AcWing 的 SpringBoot 框架课讲义&#xff0c;课程链接&#xff1a;AcWing SpringBoot 框架课。 CONTENTS 1. 地图优化改进2. 绘制玩家的起始位置3. 实现玩家移动4. 优化蛇的身体效果5. 碰撞检测实现 本节实现两名玩家即两条蛇的绘制与人工操作移动功能。 1. 地…...

STM32一

0.前言 在B站经常看见有人用stm32做出了有趣的电子小玩艺儿&#xff0c;感到很羡慕&#xff0c;于是想了解一下。 1.什么是stm32 STM32 是一系列由STMicroelectronics&#xff08;意法半导体&#xff09;公司设计和制造的32位ARM Cortex-M微控制器。这一系列的微控制器广泛用…...

GPT-4 Turbo Assistants API

Assistants API Assistants API 允许您在自己的应用程序中构建 AI 助手。助手有指令&#xff0c;可以利用模型、工具和知识来响应用户查询。Assistants API 目前支持三种类型的工具&#xff1a;代码解释器、检索和函数调用。未来&#xff0c;我们计划发布更多 OpenAI 构建的工…...

day08_回顾与课程概括

回顾与课程概括 一、上节课复习 一、上节课复习 1、osi七层与数据传输 2、socketsocket是对传输层以下的封装ipport标识唯一一个基于网络通讯的软件3、tcp与udptcp&#xff1a;因为在通信之前必须建立双向连接&#xff0c;通常都是客户端主动连接服务端的&#xff0c;所以必须…...

iptables、netfilter、firewalld、ufd简单介绍

参考&#xff1a;...

Python基础入门例程53-NP53 前10个偶数(循环语句)

最近的博文&#xff1a; Python基础入门例程52-NP52 累加数与平均值(循环语句)-CSDN博客 Python基础入门例程51-NP51 列表的最大与最小(循环语句)-CSDN博客 Python基础入门例程50-NP50 程序员节&#xff08;循环语句&#xff09;-CSDN博客 目录 最近的博文&#xff1a; 描…...

v-bind和v-model

目录 前言 v-bind 作用 语法格式 编译原理 简写 v-model 作用 使用方法 v-bind和v-model的区别和联系 前言 本文我们来了解一下模板语法之指令语法中的v-bind和v-model v-bind 作用 v-bind可以让html标签的某个属性的值产生动态的效果 语法格式 <html标签 v-bin…...

Adobe premiere裁剪视频尺寸并转为GIF格式

第 1 步&#xff1a;裁剪视频 修改序列设置以适应裁剪之后的图像区域&#xff1b;序列中的编辑模式不能使用默认的&#xff0c;这里使用的是“ProRes RAW” 第 2 步&#xff1a;设置背景色 需要设置“颜色遮罩”的大小和颜色&#xff0c;颜色遮罩放在下面。 第 3 步&#xff1…...

关于react输入框回显问题

绑定表单元素的值到组件状态中。例如&#xff0c;对于一个文本框&#xff0c;可以使用onChange事件将用户输入的值绑定到组件状态中。 创建一个处理表单提交的函数。这个函数通常会使用组件状态中的值来更新页面上的数据。 在handleSubmit函数中&#xff0c;防止默认表单提交…...

案例续集留言板

前端没有保存数据的功能,后端把数据保存下来(内存,数据库等等......) 前端代码如下 : <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initia…...

72 内网安全-域横向CSMSF联动及应急响应初识

目录 演示案例:MSF&CobaltStrike联动ShellWEB攻击应急响应朔源-后门,日志WIN系统攻击应急响应朔源-后门,日志,流量临时给大家看看学的好的怎么干对应CTF比赛 涉及资源 权限维持留到后面在补充&#xff0c;先把后面的知识点给大家讲起来&#xff0c;因为权限维持它是我们前期…...

Leetcode—20.有效的括号【简单】

2023每日刷题&#xff08;二十七&#xff09; Leetcode—20.有效的括号 C实现代码 class Solution { public:bool isValid(string s) {stack<char> arr;int len s.size();if(len 1) {return false;}for(int i 0; i < len; i) {if(s[i] ( || s[i] [ || s[i] {)…...

Leetcode—剑指OfferII LCR 019.验证回文串II【简单】

2023每日刷题&#xff08;二十七&#xff09; Leetcode—剑指OfferII LCR 019.验证回文串II 实现代码 class Solution { public:bool judgeFunc(string s, int left, int right) {while(left < right) {if(s[left] ! s[right]) {return false;}left;right--;}return true;…...

Mac电脑配置Flutter开发环境

1.进入官网下载页&#xff1a; Flutter SDK releases | Flutter 可以看到有 Windows、macOS、Linux三种系统的下载包 选择macOS&#xff0c;然后点击下载 Stable channel&#xff08;稳定版&#xff09;中的最新版本&#xff0c;下载完成后可以移动到资源库Library中。 2.下载…...

QTableView如何清空数据保留表头

QTableView如何清空数据保留表头 调用QAbstractItemModel中的removeRows或者removeColumns方法。 方法原型 bool removeRows(int column, int count, const QModelIndex &parent QModelIndex())在支持此功能的模型上&#xff0c;从模型中删除从父级父级下给定行开始的计…...

[工业自动化-17]:西门子S7-15xxx编程 - 软件编程 - PLC编程语言以及与嵌入式编程的比较

目录 一、博图编程语言 1.1 概述 1.2 三种编程语言之间的关系 二、PLC与嵌入式系统的类比 三、PLC编程与嵌入式系统编程的比较 3.1 不同点 3.2 相同点 3.3 PLC是一种专门用于工业控制系统的嵌入式系统 一、博图编程语言 1.1 概述 西门子&#xff08;Siemens&#xff0…...

云原生微服务架构及实现技术

云原生是一种技术理念和架构方法&#xff0c;它充分利用云计算的优势&#xff0c;将应用程序和基础设施进行优化&#xff0c;以适应云环境的特性。云原生的设计原则主要包括弹性、韧性、安全性、可观测性、灰度等&#xff0c;旨在让企业在云环境中实现轻量、敏捷、高度自动化的…...

FastAPI 教程:从入门到实践

FastAPI 是一个现代、快速&#xff08;高性能&#xff09;的 Web 框架&#xff0c;用于构建 API&#xff0c;支持 Python 3.6。它基于标准 Python 类型提示&#xff0c;易于学习且功能强大。以下是一个完整的 FastAPI 入门教程&#xff0c;涵盖从环境搭建到创建并运行一个简单的…...

江苏艾立泰跨国资源接力:废料变黄金的绿色供应链革命

在华东塑料包装行业面临限塑令深度调整的背景下&#xff0c;江苏艾立泰以一场跨国资源接力的创新实践&#xff0c;重新定义了绿色供应链的边界。 跨国回收网络&#xff1a;废料变黄金的全球棋局 艾立泰在欧洲、东南亚建立再生塑料回收点&#xff0c;将海外废弃包装箱通过标准…...

拉力测试cuda pytorch 把 4070显卡拉满

import torch import timedef stress_test_gpu(matrix_size16384, duration300):"""对GPU进行压力测试&#xff0c;通过持续的矩阵乘法来最大化GPU利用率参数:matrix_size: 矩阵维度大小&#xff0c;增大可提高计算复杂度duration: 测试持续时间&#xff08;秒&…...

多模态大语言模型arxiv论文略读(108)

CROME: Cross-Modal Adapters for Efficient Multimodal LLM ➡️ 论文标题&#xff1a;CROME: Cross-Modal Adapters for Efficient Multimodal LLM ➡️ 论文作者&#xff1a;Sayna Ebrahimi, Sercan O. Arik, Tejas Nama, Tomas Pfister ➡️ 研究机构: Google Cloud AI Re…...

在web-view 加载的本地及远程HTML中调用uniapp的API及网页和vue页面是如何通讯的?

uni-app 中 Web-view 与 Vue 页面的通讯机制详解 一、Web-view 简介 Web-view 是 uni-app 提供的一个重要组件&#xff0c;用于在原生应用中加载 HTML 页面&#xff1a; 支持加载本地 HTML 文件支持加载远程 HTML 页面实现 Web 与原生的双向通讯可用于嵌入第三方网页或 H5 应…...

使用Matplotlib创建炫酷的3D散点图:数据可视化的新维度

文章目录 基础实现代码代码解析进阶技巧1. 自定义点的大小和颜色2. 添加图例和样式美化3. 真实数据应用示例实用技巧与注意事项完整示例(带样式)应用场景在数据科学和可视化领域,三维图形能为我们提供更丰富的数据洞察。本文将手把手教你如何使用Python的Matplotlib库创建引…...

Rust 开发环境搭建

环境搭建 1、开发工具RustRover 或者vs code 2、Cygwin64 安装 https://cygwin.com/install.html 在工具终端执行&#xff1a; rustup toolchain install stable-x86_64-pc-windows-gnu rustup default stable-x86_64-pc-windows-gnu ​ 2、Hello World fn main() { println…...

破解路内监管盲区:免布线低位视频桩重塑停车管理新标准

城市路内停车管理常因行道树遮挡、高位设备盲区等问题&#xff0c;导致车牌识别率低、逃费率高&#xff0c;传统模式在复杂路段束手无策。免布线低位视频桩凭借超低视角部署与智能算法&#xff0c;正成为破局关键。该设备安装于车位侧方0.5-0.7米高度&#xff0c;直接规避树枝遮…...

WPF八大法则:告别模态窗口卡顿

⚙️ 核心问题&#xff1a;阻塞式模态窗口的缺陷 原始代码中ShowDialog()会阻塞UI线程&#xff0c;导致后续逻辑无法执行&#xff1a; var result modalWindow.ShowDialog(); // 线程阻塞 ProcessResult(result); // 必须等待窗口关闭根本问题&#xff1a…...

Unity中的transform.up

2025年6月8日&#xff0c;周日下午 在Unity中&#xff0c;transform.up是Transform组件的一个属性&#xff0c;表示游戏对象在世界空间中的“上”方向&#xff08;Y轴正方向&#xff09;&#xff0c;且会随对象旋转动态变化。以下是关键点解析&#xff1a; 基本定义 transfor…...