【API篇】八、Flink窗口函数
文章目录
- 1、增量聚合之ReduceFunction
- 2、增量聚合之AggregateFunction
- 3、全窗口函数full window functions
- 4、增量聚合函数搭配全窗口函数
- 5、会话窗口动态获取间隔值
- 6、触发器和移除器
- 7、补充
//窗口操作
stream.keyBy(<key selector>).window(<window assigner>).aggregate(<window function>)
上一节的窗口分配器,指明了窗口类型,知道了数据属于哪个窗口并收集。而窗口函数,则是定义如何对这些数据做计算操作。
- 增量聚合:
来一条数据,计算一条数据
,窗口触发的时候输出计算结果 - 全窗口函数:
数据来了不计算,存起来
,窗口触发的时候,计算并输出计算结果
1、增量聚合之ReduceFunction
public class WindowReduceDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.socketTextStream("node01", 9527).map(new WaterSensorMapFunction()).keyBy(r -> r.getId())// 设置滚动事件时间窗口.window(TumblingProcessingTimeWindows.of(Time.seconds(30))).reduce(new ReduceFunction<WaterSensor>() {@Overridepublic WaterSensor reduce(WaterSensor value1, WaterSensor value2) throws Exception {System.out.println("调用reduce方法,value1=:"+value1 + ",value2=:"+value2);return new WaterSensor(value1.getId(), value2.getTs(), value1.getVc()+value2.getVc());}}).print();env.execute();}
}
运行,输入数据,查看控制台:
2、增量聚合之AggregateFunction
上面使用ReduceFunction的限制是,输入数据的类型、聚合中间状态的类型、输出结果的类型必须一致,AggregateFunction则没有这个限制。AggregateFunction接口有四个方法:
- createAccumulator:创建一个累加器,这就是为聚合创建了一个初始状态,每个聚合任务只会调用一次。
- add:将输入的元素添加到累加器中。
- getResult:从累加器中提取聚合的输出结果。
- merge:合并两个累加器,并将合并后的状态作为一个累加器返回
AggregateFunction的工作原理是:首先调用createAccumulator()为任务初始化一个状态(累加器);而后每来一个数据就调用一次add()方法,对数据进行聚合,得到的结果保存在状态中;等到了窗口需要输出时,再调用getResult()方法得到计算结果
public class WindowAggregateDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("node01", 9527).map(new WaterSensorMapFunction()); //自定义的实现类,String转自定义对象WaterSensorKeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(sensor -> sensor.getId());// 1. 窗口分配器WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));SingleOutputStreamOperator<String> aggregate = sensorWS.aggregate(new AggregateFunction<WaterSensor, Integer, String>() {@Overridepublic Integer createAccumulator() {System.out.println("创建累加器");return 0;}//value即输入的数据,accumulator即之前的计算结果@Overridepublic Integer add(WaterSensor value, Integer accumulator) {System.out.println("调用add方法,value="+value);return accumulator + value.getVc();}@Overridepublic String getResult(Integer accumulator) {System.out.println("调用getResult方法");return accumulator.toString();}@Overridepublic Integer merge(Integer a, Integer b) {System.out.println("调用merge方法");return null;}});aggregate.print();env.execute();}
}
运行,输入数据,查看控制台:
3、全窗口函数full window functions
全窗口函数,即数据来了不计算,存起来,窗口触发的时候,计算并输出计算结果Flink全窗口函数有两种,第一种为apply方法下的:
stream.keyBy(<key selector>).window(<window assigner>).apply(new MyWindowFunction());
传入一个WindowFunction的实现类,该方法已被第二种ProcessWindowFunction全覆盖,因而逐渐弃用。ProcessWindowFunction除了可以拿到窗口中的所有数据之外,还可以获取到一个“上下文对象”(Context),通过这个上下文对象,可以获取窗口对象、窗口处理时间、事件时间水位线
public class WindowProcessDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("node01", 9527).map(new WaterSensorMapFunction());KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(sensor -> sensor.getId());// 1. 窗口分配器WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));SingleOutputStreamOperator<String> process = sensorWS.process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {/*** 全窗口函数计算逻辑,窗口结束时触发才调用一次* s 分组的key* context 上下文对象* elements 窗口内存的所有数据* out 采集器对象*/@Overridepublic void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {long count = elements.spliterator().estimateSize();long windowStartTs = context.window().getStart();long windowEndTs = context.window().getEnd();String windowStart = DateFormatUtils.format(windowStartTs, "yyyy-MM-dd HH:mm:ss.SSS");String windowEnd = DateFormatUtils.format(windowEndTs, "yyyy-MM-dd HH:mm:ss.SSS");out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString());}});process.print();env.execute();}
}
效果:
4、增量聚合函数搭配全窗口函数
可以看出,增量和全窗口各有好处:
- 增量聚合下,来一条计算一条,只存储中间计算结果,占用空间少
- 全窗口函数则是可以通过上下文对象来实现灵活的功能
像同时拥有两者的优点,可以调用aggregate方法的另一个重载方法:
// ReduceFunction与WindowFunction结合
public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction,WindowFunction<T,R,K,W> function) // ReduceFunction与ProcessWindowFunction结合
public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction,ProcessWindowFunction<T,R,K,W> function)// AggregateFunction与WindowFunction结合
public <ACC,V,R> SingleOutputStreamOperator<R> aggregate(AggregateFunction<T,ACC,V> aggFunction,WindowFunction<V,R,K,W> windowFunction)// AggregateFunction与ProcessWindowFunction结合
public <ACC,V,R> SingleOutputStreamOperator<R> aggregate(AggregateFunction<T,ACC,V> aggFunction,ProcessWindowFunction<V,R,K,W> windowFunction)
此时:
- 基于第一个参数,即增量聚合函数,来处理数据,来一条聚合一条
- 窗口触发后,调用第二个参数的处理逻辑,此时,把增量聚合的结果(只有一条数据)再传递给全窗口函数,也就是说全窗口的Iterable<> elements,长度为1,注意全窗口不再缓存所有数据
- 经过全窗口,执行处理和包装,再输出
public class WindowAggregateAndProcessDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("node01", 9527).map(new WaterSensorMapFunction());KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(sensor -> sensor.getId());// 1. 窗口分配器WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));//sensorWS.reduce() //也可以传两个SingleOutputStreamOperator<String> result = sensorWS.aggregate(new MyAgg(),new MyProcess());result.print();env.execute();}}
public class MyAgg implements AggregateFunction<WaterSensor, Integer, String>{@Overridepublic Integer createAccumulator() {System.out.println("创建累加器");return 0;}@Overridepublic Integer add(WaterSensor value, Integer accumulator) {System.out.println("调用add方法,value="+value);return accumulator + value.getVc();}@Overridepublic String getResult(Integer accumulator) {System.out.println("调用getResult方法");return accumulator.toString();}@Overridepublic Integer merge(Integer a, Integer b) {System.out.println("调用merge方法");return null;}}// 全窗口函数的输入类型 = 增量聚合函数的输出类型
public class MyProcess extends ProcessWindowFunction<String,String,String,TimeWindow>{@Overridepublic void process(String s, Context context, Iterable<String> elements, Collector<String> out) throws Exception {long startTs = context.window().getStart();long endTs = context.window().getEnd();String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");long count = elements.spliterator().estimateSize();out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString());}
}
注意,二者搭配时,根据前面分析,可以知道,必有:增量聚合函数的输出类型 = 全窗口函数的输入类型
5、会话窗口动态获取间隔值
到此,窗口API需要的窗口分配器(见上一篇)和窗口函数都已整理完。上面demo中用的窗口分配器都是滚动窗口,但应该有以下这些:
- 时间滚动窗口
- 时间滑动窗口
- 时间会话窗口
- 计数滚动窗口
- 计数滑动窗口
这里再记录下时间会话窗口+动态获取会话间隔:
public class WindowSessionDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("node01", 9527).map(new WaterSensorMapFunction());KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(sensor -> sensor.getId());// 1. 窗口分配器WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(ProcessingTimeSessionWindows.withDynamicGap(t -> t.getTs() * 1000L));SingleOutputStreamOperator<String> process = sensorWS.process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {@Overridepublic void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {long count = elements.spliterator().estimateSize();long windowStartTs = context.window().getStart();long windowEndTs = context.window().getEnd();String windowStart = DateFormatUtils.format(windowStartTs, "yyyy-MM-dd HH:mm:ss.SSS");String windowEnd = DateFormatUtils.format(windowEndTs, "yyyy-MM-dd HH:mm:ss.SSS");out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString());}});process.print();env.execute();}
}
来一条数据,根据这条数据获取一个值做为会话间隔,到达这个间隔前,下条数据到来了,则会话间隔又成了另一个值,动态的。运行:
可以看到,会话间隔动态获取,到达间隔时下条数据还没来,则结束本窗户,窗口口结束时触发才调用一次process,和分析的一致。最后补充一点,展开demo代码里的Lambda表达式,其实是一个抓取会话间隔的方法,定义了会话窗口间隔的获取逻辑。
再贴个计数滑动窗口:
6、触发器和移除器
触发器主要是用来控制窗口什么时候触发计算,即什么时候执行窗口函数
//基于WindowedStream调用.trigger()方法,就可以传入一个自定义的窗口触发器(Trigger)
stream.keyBy(...).window(...).trigger(new MyTrigger())
移除器主要用来定义移除某些数据的逻辑
基于WindowedStream调用.evictor()方法,就可以传入一个自定义的移除器(Evictor)。Evictor是一个接口,不同的窗口类型都有各自预实现的移除器。
stream.keyBy(...).window(...).evictor(new MyEvictor())
Flink提供的几个窗口,比如滑动、滚动等,都有对触发器和移除器的默认实现,不用自定义。
7、补充
窗口的划分:
- 窗口开始时间start是窗口长度的整数倍,向下取整
- 窗口结束时间是start+窗口长度
- 窗口是左闭右开,因为属于本窗口的最大时间戳为end-1
- 窗口的生命周期,创建是属于本窗口的第一条数据来的时候,现new的,放入一个singleton单例的集合中
- 窗口的销毁是时间的进展 >= 窗口的最大时间戳(end-1ms) + 允许迟到的时间(默认0)
- 窗口什么时候触发输出:当时间进展 >= 窗口的最大时间戳(end -1ms)
相关文章:

【API篇】八、Flink窗口函数
文章目录 1、增量聚合之ReduceFunction2、增量聚合之AggregateFunction3、全窗口函数full window functions4、增量聚合函数搭配全窗口函数5、会话窗口动态获取间隔值6、触发器和移除器7、补充 //窗口操作 stream.keyBy(<key selector>).window(<window assigner>)…...

React JSX常用语法总结
React JSX语法 什么是React JSX JSX(javascript xml) 就是JavaScript和XML结合的一种格式,是JavaScript的语法扩展,只要把HTML代码写在JS中,就为JSX。用于动态构建用户界面的Javascript库,发送请求获取数据…...

DVWA-Cross Site Request Forgery (CSRF)
大部分网站都会要求用户登录后,使用相应的权限在网页中进行操作,比如发邮件、购物或者转账等都是基于特定用户权限的操作。浏览器会短期或长期地记住用户的登录信息,但是,如果这个登录信息被恶意利用呢?就有可能发生CSRF CSRF的英文全称为Cross Site Request Forgery,中文…...

浅谈安科瑞可编程电测仪表在老挝某项目的应用
摘要:本文介绍了安科瑞多功能电能表在老挝某项目的应用。AMC系列交流多功能仪表是一款专门为电力系统、工矿企业、公用事业和智能建筑用于电力监控而设计的智能电表。 Abstract:This article introduces the application of the multi-function energy …...

Java项目源码合集
以下只是源码合集的一部分,源码均已本地正常调试运行,如需请与我联系。 序号项目名称演示地址1springbootvue药店销售管理系统https://pan.baidu.com/s/1n-Vk5Pr5z7s3IcN3WsCkdg?pwdve6z 2基于ssm协同过滤技术的旅游景点购票系统https://pan.baidu.com…...

Python学习笔记--生成器
四、生成器 1、为什么需要生成器 通过上面的学习,可以知道列表生成式,我们可以直接创建一个列表。 但是,受到内存限制,列表容量肯定是有限的。而且,创建一个包含 1000 万个元素的列表,不仅占用很大的存储…...

【Python学习】—Python基础语法(五)
【Python学习】—Python基础语法(五) 一、循环的嵌套使用 二、九九乘法表 #外层循环表示行数 for i in range(1,10):#内层循环表示每一行的数据for j in range(1,i1):#输出每一行的内容print(f"{i} * {j} {i * j} \t",end"") #外层…...

【js】JavaScript清除所有(多个)定时器的方法:
文章目录 一、停止单个定时器二、暂停与恢复定时器三、使用Promise来管理定时器四、使用ES6特性管理定时器五、案例(定时获取页面列表数据) 一、停止单个定时器 #在某些情况下,我们可能只需要停止单个定时器。 #在JavaScript中,我…...

java实现周易64卦并返回对应的卦象(含百度百科链接)
《易经》是中华民族传统思想文化中自然哲学与人文实践的理论根源,是古代汉民族思想、智慧的结晶,被誉为“大道之源”,是古代帝王之学,政治家、军事家、商家的必修之术。 《易经》含盖万有,纲纪群伦,是中华…...

# 算法与程序的灵魂
文章目录 前言算法与程序的关系例子1:冒泡排序例子2:斐波那契数列算法优化与进阶总结 前言 大家好我是艾老虎尤,算法与程序是计算机科学中两个非常重要的概念。算法是解决问题的方法和步骤,而程序是算法的具体实现。在计算机科学…...

2023-10-21 美团2024秋招后端开发岗笔试题
1 考察dfs和拓扑排序 1.1 题目描述(如果拓扑排序不清楚可以去做一下lc 207. 课程表) 1.2 答案 import java.util.*;public class Meituan {static int m,n;public static void main(String[] args) {Scanner in new Scanner(System.in);m in.nextInt…...

汽车托运是怎样收费
汽车托运是如何收费的呢?一般来说,汽车托运的费用是会随着每公里来增加,目前的托运的每公里费用在1.2-1.8元之间,托运的距离越远那么它的托运单价费用就会越低,如果你运气好找到一家在搞活动的汽车托运公司,那么你就算…...

使用docker-compose私有化部署 GitLab
在软件开发和协作过程中,版本控制是至关重要的一环。GitLab 是一个功能强大的开源平台,提供了完整的代码管理功能,包括版本控制、问题跟踪以及持续集成等。这使得团队能够更高效地协作开发。前段时间翻阅笔记时,偶然发现了之前公司…...

Vue项目引入百度统计的正确操作步骤,亲测有效!
1、平台获取统计代码 2、在head和body中分别添加以下代码 head: <script>var _hmt _hmt || [];</script>body: <script>var _hmt _hmt || [];(function () {var hm document.createElement("script");hm.src "https://hm.baidu.com/hm.js…...

Keras中model.evaluate() 返回的是 loss value 和 metrics values
Keras官方文档: https://keras.io/models/model/#evaluate Keras中model.evaluate() 返回的是 损失值和训练时选定的指标值(例如,[AUC, , accuracy])。 训练时选定的指标值是指model.compile()里面metrics后面的值,ev…...

CSRF跨域请求伪造
1.SSRF服务端请求伪造(外网访问内网) SSRF(Server-Side Request Forgery:服务器端请求伪造) 是一种由攻击者构造形成由服务端发起请求的一个安全漏洞。一般情况下,SSRF是要目标网站的内部系统。(因为他是从内部系统访问的…...

LeetCode 1465. 切割后面积最大的蛋糕:纵横分别处理
【LetMeFly】1465.切割后面积最大的蛋糕:纵横分别处理 力扣题目链接:https://leetcode.cn/problems/maximum-area-of-a-piece-of-cake-after-horizontal-and-vertical-cuts/ 矩形蛋糕的高度为 h 且宽度为 w,给你两个整数数组 horizontalCut…...

YTM32的增强型定时器eTMR外设模块详解
文章目录 eTMR外设简介eTMR工作机制系统框图引脚与信号计数器与时钟源输出比较模式PWM模式通道配对通道对的互补输出(Complementary Mode)双缓冲输出PWM(Double Switch)错误检测机制(Fault Detection) 输入…...
40.查找练习题(王道2023数据结构第7章)
试题1(王道7.2.4节综合练习5): 写出折半查找的递归算法。 #include<stdio.h> #include<stdlib.h> #include<string.h>#define MAXSIZE 10 #define ElemType int #define Status inttypedef struct{int data[MAXSIZE]; /…...

Segmentation fault 的bug解决
一,Segmentation fault 的bug解决 问题描述:自己在使用CPU上调试完代码之后,可以稳定运行,有输出结果。 但是把数据和模型加载上GPU之后,出现了报错。 Segmentation fault (core dumped) 搜了一下可能存在的原因&…...

【Python机器学习】零基础掌握BaggingRegressor集成学习
如何提升回归模型的稳定性和准确性? 在实际生活中,比如房价预测,经常会遇到一种情况:有大量的特征和样本数据,但模型的预测准确度仍然不尽人意。这时候,单一的模型(如支持向量机回归)可能表现得并不够好。 考虑到这个问题,解决方案可能是使用集成方法,特别是Baggin…...

麒麟KYLINOS通过命令行配置kysec的防火墙
原文链接:麒麟KYLINOS通过命令行配置kysec的防火墙 hello,大家好啊,今天给大家带来一篇使用命令行配置kysec的防火墙的文章,通过本篇文章的学习,大家可以了解到图形化界面中的防火墙信息是如何生成的,为后期…...

磁盘监控:告警时发送邮件
1.配置邮箱 1.编辑邮箱配置文件 vim /etc/mail.rc2.在末尾输入自己的邮箱配置,以163邮箱为例 #开启ssl set ssl-verifyignore #证书目录,下方为centos系统证书默认位置,也自行生成证书并指定 set nss-config-dir/etc/pki/nssdb # 配置的第…...

【HarmonyOS】元服务卡片router实现跳转到指定页面并传动态参数
【关键字】 元服务卡片、router跳转不同页面、传递动态参数 【写在前面】 本篇文章主要介绍开发元服务卡片时,如何实现从卡片中点击事件跳转到指定的应用内页面,并传递参数接受参数功能。此处以JS UI开发服务卡片为例,JS卡片支持组件设置ac…...

Centos安装RabbitMQ,JavaSpring发送RabbitMQ延迟延时消息,JavaSpring消费RabbitMQ消息
1,版本说明 erlang 和 rabbitmq 版本说明 https://www.rabbitmq.com/which-erlang.html 确认需要安装的mq版本以及对应的erlang版本。 2,下载安装文件 RabbitMQ下载地址: https://packagecloud.io/rabbitmq/rabbitmq-server Erlang下载地…...

leetcode:1323. 6 和 9 组成的最大数字(python3解法)
难度:简单 给你一个仅由数字 6 和 9 组成的正整数 num。 你最多只能翻转一位数字,将 6 变成 9,或者把 9 变成 6 。 请返回你可以得到的最大数字。 示例 1: 输入:num 9669 输出:9969 解释: 改变…...

SpringBoot集成Redis Cluster集群(附带Linux部署Redis Cluster高可用集群)
目录 一、前言二、集成配置2.1、POM2.2、添加配置文件application.yml2.3、编写配置文件2.4、编写启动类2.5、编写测试类测试是否连接成功 一、前言 这里会使用到spring-boot-starter-data-redis包,spring boot 2的spring-boot-starter-data-redis中,默…...

LLaVA:visual instruction tuning
对近期一些MLLM(Multimodal Large Language Model)的总结 - 知乎本文将从模型结构,训练方法,训练数据,模型表现四个方面对近期的一些MLLM(Multi-modal Large Language Models)进行总结并探讨这四个方面对模型表现的影响…...

Python实现双目标定、畸变矫正、立体矫正
一,双目标定、畸变矫正、立体矫正的作用 双目目标定: 3D重建和测距:通过双目目标定,您可以确定两个摄像头之间的相对位置和朝向,从而能够根据视差信息计算物体的深度,进行三维重建和测距。姿态估计…...

showdoc 文件上传 (cnvd-2020-26585)
showdoc 文件上传 (cnvd-2020-26585) 描述 ShowDoc是一个非常适合IT团队的在线API文档、技术文档工具。通过showdoc,你可以方便地使用markdown语法来书写出美观的API文档、数据字典文档、技术文档、在线excel文档等等。 api_page存在任意文…...