【API篇】八、Flink窗口函数
文章目录
- 1、增量聚合之ReduceFunction
- 2、增量聚合之AggregateFunction
- 3、全窗口函数full window functions
- 4、增量聚合函数搭配全窗口函数
- 5、会话窗口动态获取间隔值
- 6、触发器和移除器
- 7、补充
//窗口操作
stream.keyBy(<key selector>).window(<window assigner>).aggregate(<window function>)
上一节的窗口分配器,指明了窗口类型,知道了数据属于哪个窗口并收集。而窗口函数,则是定义如何对这些数据做计算操作。
- 增量聚合:
来一条数据,计算一条数据
,窗口触发的时候输出计算结果 - 全窗口函数:
数据来了不计算,存起来
,窗口触发的时候,计算并输出计算结果
1、增量聚合之ReduceFunction
public class WindowReduceDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.socketTextStream("node01", 9527).map(new WaterSensorMapFunction()).keyBy(r -> r.getId())// 设置滚动事件时间窗口.window(TumblingProcessingTimeWindows.of(Time.seconds(30))).reduce(new ReduceFunction<WaterSensor>() {@Overridepublic WaterSensor reduce(WaterSensor value1, WaterSensor value2) throws Exception {System.out.println("调用reduce方法,value1=:"+value1 + ",value2=:"+value2);return new WaterSensor(value1.getId(), value2.getTs(), value1.getVc()+value2.getVc());}}).print();env.execute();}
}
运行,输入数据,查看控制台:
2、增量聚合之AggregateFunction
上面使用ReduceFunction的限制是,输入数据的类型、聚合中间状态的类型、输出结果的类型必须一致,AggregateFunction则没有这个限制。AggregateFunction接口有四个方法:
- createAccumulator:创建一个累加器,这就是为聚合创建了一个初始状态,每个聚合任务只会调用一次。
- add:将输入的元素添加到累加器中。
- getResult:从累加器中提取聚合的输出结果。
- merge:合并两个累加器,并将合并后的状态作为一个累加器返回
AggregateFunction的工作原理是:首先调用createAccumulator()为任务初始化一个状态(累加器);而后每来一个数据就调用一次add()方法,对数据进行聚合,得到的结果保存在状态中;等到了窗口需要输出时,再调用getResult()方法得到计算结果
public class WindowAggregateDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("node01", 9527).map(new WaterSensorMapFunction()); //自定义的实现类,String转自定义对象WaterSensorKeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(sensor -> sensor.getId());// 1. 窗口分配器WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));SingleOutputStreamOperator<String> aggregate = sensorWS.aggregate(new AggregateFunction<WaterSensor, Integer, String>() {@Overridepublic Integer createAccumulator() {System.out.println("创建累加器");return 0;}//value即输入的数据,accumulator即之前的计算结果@Overridepublic Integer add(WaterSensor value, Integer accumulator) {System.out.println("调用add方法,value="+value);return accumulator + value.getVc();}@Overridepublic String getResult(Integer accumulator) {System.out.println("调用getResult方法");return accumulator.toString();}@Overridepublic Integer merge(Integer a, Integer b) {System.out.println("调用merge方法");return null;}});aggregate.print();env.execute();}
}
运行,输入数据,查看控制台:
3、全窗口函数full window functions
全窗口函数,即数据来了不计算,存起来,窗口触发的时候,计算并输出计算结果Flink全窗口函数有两种,第一种为apply方法下的:
stream.keyBy(<key selector>).window(<window assigner>).apply(new MyWindowFunction());
传入一个WindowFunction的实现类,该方法已被第二种ProcessWindowFunction全覆盖,因而逐渐弃用。ProcessWindowFunction除了可以拿到窗口中的所有数据之外,还可以获取到一个“上下文对象”(Context),通过这个上下文对象,可以获取窗口对象、窗口处理时间、事件时间水位线
public class WindowProcessDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("node01", 9527).map(new WaterSensorMapFunction());KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(sensor -> sensor.getId());// 1. 窗口分配器WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));SingleOutputStreamOperator<String> process = sensorWS.process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {/*** 全窗口函数计算逻辑,窗口结束时触发才调用一次* s 分组的key* context 上下文对象* elements 窗口内存的所有数据* out 采集器对象*/@Overridepublic void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {long count = elements.spliterator().estimateSize();long windowStartTs = context.window().getStart();long windowEndTs = context.window().getEnd();String windowStart = DateFormatUtils.format(windowStartTs, "yyyy-MM-dd HH:mm:ss.SSS");String windowEnd = DateFormatUtils.format(windowEndTs, "yyyy-MM-dd HH:mm:ss.SSS");out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString());}});process.print();env.execute();}
}
效果:
4、增量聚合函数搭配全窗口函数
可以看出,增量和全窗口各有好处:
- 增量聚合下,来一条计算一条,只存储中间计算结果,占用空间少
- 全窗口函数则是可以通过上下文对象来实现灵活的功能
像同时拥有两者的优点,可以调用aggregate方法的另一个重载方法:
// ReduceFunction与WindowFunction结合
public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction,WindowFunction<T,R,K,W> function) // ReduceFunction与ProcessWindowFunction结合
public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction,ProcessWindowFunction<T,R,K,W> function)// AggregateFunction与WindowFunction结合
public <ACC,V,R> SingleOutputStreamOperator<R> aggregate(AggregateFunction<T,ACC,V> aggFunction,WindowFunction<V,R,K,W> windowFunction)// AggregateFunction与ProcessWindowFunction结合
public <ACC,V,R> SingleOutputStreamOperator<R> aggregate(AggregateFunction<T,ACC,V> aggFunction,ProcessWindowFunction<V,R,K,W> windowFunction)
此时:
- 基于第一个参数,即增量聚合函数,来处理数据,来一条聚合一条
- 窗口触发后,调用第二个参数的处理逻辑,此时,把增量聚合的结果(只有一条数据)再传递给全窗口函数,也就是说全窗口的Iterable<> elements,长度为1,注意全窗口不再缓存所有数据
- 经过全窗口,执行处理和包装,再输出
public class WindowAggregateAndProcessDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("node01", 9527).map(new WaterSensorMapFunction());KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(sensor -> sensor.getId());// 1. 窗口分配器WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));//sensorWS.reduce() //也可以传两个SingleOutputStreamOperator<String> result = sensorWS.aggregate(new MyAgg(),new MyProcess());result.print();env.execute();}}
public class MyAgg implements AggregateFunction<WaterSensor, Integer, String>{@Overridepublic Integer createAccumulator() {System.out.println("创建累加器");return 0;}@Overridepublic Integer add(WaterSensor value, Integer accumulator) {System.out.println("调用add方法,value="+value);return accumulator + value.getVc();}@Overridepublic String getResult(Integer accumulator) {System.out.println("调用getResult方法");return accumulator.toString();}@Overridepublic Integer merge(Integer a, Integer b) {System.out.println("调用merge方法");return null;}}// 全窗口函数的输入类型 = 增量聚合函数的输出类型
public class MyProcess extends ProcessWindowFunction<String,String,String,TimeWindow>{@Overridepublic void process(String s, Context context, Iterable<String> elements, Collector<String> out) throws Exception {long startTs = context.window().getStart();long endTs = context.window().getEnd();String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");long count = elements.spliterator().estimateSize();out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString());}
}
注意,二者搭配时,根据前面分析,可以知道,必有:增量聚合函数的输出类型 = 全窗口函数的输入类型
5、会话窗口动态获取间隔值
到此,窗口API需要的窗口分配器(见上一篇)和窗口函数都已整理完。上面demo中用的窗口分配器都是滚动窗口,但应该有以下这些:
- 时间滚动窗口
- 时间滑动窗口
- 时间会话窗口
- 计数滚动窗口
- 计数滑动窗口
这里再记录下时间会话窗口+动态获取会话间隔:
public class WindowSessionDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("node01", 9527).map(new WaterSensorMapFunction());KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(sensor -> sensor.getId());// 1. 窗口分配器WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(ProcessingTimeSessionWindows.withDynamicGap(t -> t.getTs() * 1000L));SingleOutputStreamOperator<String> process = sensorWS.process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {@Overridepublic void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {long count = elements.spliterator().estimateSize();long windowStartTs = context.window().getStart();long windowEndTs = context.window().getEnd();String windowStart = DateFormatUtils.format(windowStartTs, "yyyy-MM-dd HH:mm:ss.SSS");String windowEnd = DateFormatUtils.format(windowEndTs, "yyyy-MM-dd HH:mm:ss.SSS");out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString());}});process.print();env.execute();}
}
来一条数据,根据这条数据获取一个值做为会话间隔,到达这个间隔前,下条数据到来了,则会话间隔又成了另一个值,动态的。运行:
可以看到,会话间隔动态获取,到达间隔时下条数据还没来,则结束本窗户,窗口口结束时触发才调用一次process,和分析的一致。最后补充一点,展开demo代码里的Lambda表达式,其实是一个抓取会话间隔的方法,定义了会话窗口间隔的获取逻辑。
再贴个计数滑动窗口:
6、触发器和移除器
触发器主要是用来控制窗口什么时候触发计算,即什么时候执行窗口函数
//基于WindowedStream调用.trigger()方法,就可以传入一个自定义的窗口触发器(Trigger)
stream.keyBy(...).window(...).trigger(new MyTrigger())
移除器主要用来定义移除某些数据的逻辑
基于WindowedStream调用.evictor()方法,就可以传入一个自定义的移除器(Evictor)。Evictor是一个接口,不同的窗口类型都有各自预实现的移除器。
stream.keyBy(...).window(...).evictor(new MyEvictor())
Flink提供的几个窗口,比如滑动、滚动等,都有对触发器和移除器的默认实现,不用自定义。
7、补充
窗口的划分:
- 窗口开始时间start是窗口长度的整数倍,向下取整
- 窗口结束时间是start+窗口长度
- 窗口是左闭右开,因为属于本窗口的最大时间戳为end-1
- 窗口的生命周期,创建是属于本窗口的第一条数据来的时候,现new的,放入一个singleton单例的集合中
- 窗口的销毁是时间的进展 >= 窗口的最大时间戳(end-1ms) + 允许迟到的时间(默认0)
- 窗口什么时候触发输出:当时间进展 >= 窗口的最大时间戳(end -1ms)
相关文章:

【API篇】八、Flink窗口函数
文章目录 1、增量聚合之ReduceFunction2、增量聚合之AggregateFunction3、全窗口函数full window functions4、增量聚合函数搭配全窗口函数5、会话窗口动态获取间隔值6、触发器和移除器7、补充 //窗口操作 stream.keyBy(<key selector>).window(<window assigner>)…...

React JSX常用语法总结
React JSX语法 什么是React JSX JSX(javascript xml) 就是JavaScript和XML结合的一种格式,是JavaScript的语法扩展,只要把HTML代码写在JS中,就为JSX。用于动态构建用户界面的Javascript库,发送请求获取数据…...

DVWA-Cross Site Request Forgery (CSRF)
大部分网站都会要求用户登录后,使用相应的权限在网页中进行操作,比如发邮件、购物或者转账等都是基于特定用户权限的操作。浏览器会短期或长期地记住用户的登录信息,但是,如果这个登录信息被恶意利用呢?就有可能发生CSRF CSRF的英文全称为Cross Site Request Forgery,中文…...

浅谈安科瑞可编程电测仪表在老挝某项目的应用
摘要:本文介绍了安科瑞多功能电能表在老挝某项目的应用。AMC系列交流多功能仪表是一款专门为电力系统、工矿企业、公用事业和智能建筑用于电力监控而设计的智能电表。 Abstract:This article introduces the application of the multi-function energy …...
Java项目源码合集
以下只是源码合集的一部分,源码均已本地正常调试运行,如需请与我联系。 序号项目名称演示地址1springbootvue药店销售管理系统https://pan.baidu.com/s/1n-Vk5Pr5z7s3IcN3WsCkdg?pwdve6z 2基于ssm协同过滤技术的旅游景点购票系统https://pan.baidu.com…...

Python学习笔记--生成器
四、生成器 1、为什么需要生成器 通过上面的学习,可以知道列表生成式,我们可以直接创建一个列表。 但是,受到内存限制,列表容量肯定是有限的。而且,创建一个包含 1000 万个元素的列表,不仅占用很大的存储…...

【Python学习】—Python基础语法(五)
【Python学习】—Python基础语法(五) 一、循环的嵌套使用 二、九九乘法表 #外层循环表示行数 for i in range(1,10):#内层循环表示每一行的数据for j in range(1,i1):#输出每一行的内容print(f"{i} * {j} {i * j} \t",end"") #外层…...
【js】JavaScript清除所有(多个)定时器的方法:
文章目录 一、停止单个定时器二、暂停与恢复定时器三、使用Promise来管理定时器四、使用ES6特性管理定时器五、案例(定时获取页面列表数据) 一、停止单个定时器 #在某些情况下,我们可能只需要停止单个定时器。 #在JavaScript中,我…...

java实现周易64卦并返回对应的卦象(含百度百科链接)
《易经》是中华民族传统思想文化中自然哲学与人文实践的理论根源,是古代汉民族思想、智慧的结晶,被誉为“大道之源”,是古代帝王之学,政治家、军事家、商家的必修之术。 《易经》含盖万有,纲纪群伦,是中华…...
# 算法与程序的灵魂
文章目录 前言算法与程序的关系例子1:冒泡排序例子2:斐波那契数列算法优化与进阶总结 前言 大家好我是艾老虎尤,算法与程序是计算机科学中两个非常重要的概念。算法是解决问题的方法和步骤,而程序是算法的具体实现。在计算机科学…...

2023-10-21 美团2024秋招后端开发岗笔试题
1 考察dfs和拓扑排序 1.1 题目描述(如果拓扑排序不清楚可以去做一下lc 207. 课程表) 1.2 答案 import java.util.*;public class Meituan {static int m,n;public static void main(String[] args) {Scanner in new Scanner(System.in);m in.nextInt…...
汽车托运是怎样收费
汽车托运是如何收费的呢?一般来说,汽车托运的费用是会随着每公里来增加,目前的托运的每公里费用在1.2-1.8元之间,托运的距离越远那么它的托运单价费用就会越低,如果你运气好找到一家在搞活动的汽车托运公司,那么你就算…...

使用docker-compose私有化部署 GitLab
在软件开发和协作过程中,版本控制是至关重要的一环。GitLab 是一个功能强大的开源平台,提供了完整的代码管理功能,包括版本控制、问题跟踪以及持续集成等。这使得团队能够更高效地协作开发。前段时间翻阅笔记时,偶然发现了之前公司…...

Vue项目引入百度统计的正确操作步骤,亲测有效!
1、平台获取统计代码 2、在head和body中分别添加以下代码 head: <script>var _hmt _hmt || [];</script>body: <script>var _hmt _hmt || [];(function () {var hm document.createElement("script");hm.src "https://hm.baidu.com/hm.js…...
Keras中model.evaluate() 返回的是 loss value 和 metrics values
Keras官方文档: https://keras.io/models/model/#evaluate Keras中model.evaluate() 返回的是 损失值和训练时选定的指标值(例如,[AUC, , accuracy])。 训练时选定的指标值是指model.compile()里面metrics后面的值,ev…...

CSRF跨域请求伪造
1.SSRF服务端请求伪造(外网访问内网) SSRF(Server-Side Request Forgery:服务器端请求伪造) 是一种由攻击者构造形成由服务端发起请求的一个安全漏洞。一般情况下,SSRF是要目标网站的内部系统。(因为他是从内部系统访问的…...

LeetCode 1465. 切割后面积最大的蛋糕:纵横分别处理
【LetMeFly】1465.切割后面积最大的蛋糕:纵横分别处理 力扣题目链接:https://leetcode.cn/problems/maximum-area-of-a-piece-of-cake-after-horizontal-and-vertical-cuts/ 矩形蛋糕的高度为 h 且宽度为 w,给你两个整数数组 horizontalCut…...

YTM32的增强型定时器eTMR外设模块详解
文章目录 eTMR外设简介eTMR工作机制系统框图引脚与信号计数器与时钟源输出比较模式PWM模式通道配对通道对的互补输出(Complementary Mode)双缓冲输出PWM(Double Switch)错误检测机制(Fault Detection) 输入…...
40.查找练习题(王道2023数据结构第7章)
试题1(王道7.2.4节综合练习5): 写出折半查找的递归算法。 #include<stdio.h> #include<stdlib.h> #include<string.h>#define MAXSIZE 10 #define ElemType int #define Status inttypedef struct{int data[MAXSIZE]; /…...
Segmentation fault 的bug解决
一,Segmentation fault 的bug解决 问题描述:自己在使用CPU上调试完代码之后,可以稳定运行,有输出结果。 但是把数据和模型加载上GPU之后,出现了报错。 Segmentation fault (core dumped) 搜了一下可能存在的原因&…...

Swift 协议扩展精进之路:解决 CoreData 托管实体子类的类型不匹配问题(下)
概述 在 Swift 开发语言中,各位秃头小码农们可以充分利用语法本身所带来的便利去劈荆斩棘。我们还可以恣意利用泛型、协议关联类型和协议扩展来进一步简化和优化我们复杂的代码需求。 不过,在涉及到多个子类派生于基类进行多态模拟的场景下,…...
AtCoder 第409场初级竞赛 A~E题解
A Conflict 【题目链接】 原题链接:A - Conflict 【考点】 枚举 【题目大意】 找到是否有两人都想要的物品。 【解析】 遍历两端字符串,只有在同时为 o 时输出 Yes 并结束程序,否则输出 No。 【难度】 GESP三级 【代码参考】 #i…...

k8s业务程序联调工具-KtConnect
概述 原理 工具作用是建立了一个从本地到集群的单向VPN,根据VPN原理,打通两个内网必然需要借助一个公共中继节点,ktconnect工具巧妙的利用k8s原生的portforward能力,简化了建立连接的过程,apiserver间接起到了中继节…...

嵌入式学习笔记DAY33(网络编程——TCP)
一、网络架构 C/S (client/server 客户端/服务器):由客户端和服务器端两个部分组成。客户端通常是用户使用的应用程序,负责提供用户界面和交互逻辑 ,接收用户输入,向服务器发送请求,并展示服务…...

【从零学习JVM|第三篇】类的生命周期(高频面试题)
前言: 在Java编程中,类的生命周期是指类从被加载到内存中开始,到被卸载出内存为止的整个过程。了解类的生命周期对于理解Java程序的运行机制以及性能优化非常重要。本文会深入探寻类的生命周期,让读者对此有深刻印象。 目录 …...

如何更改默认 Crontab 编辑器 ?
在 Linux 领域中,crontab 是您可能经常遇到的一个术语。这个实用程序在类 unix 操作系统上可用,用于调度在预定义时间和间隔自动执行的任务。这对管理员和高级用户非常有益,允许他们自动执行各种系统任务。 编辑 Crontab 文件通常使用文本编…...

并发编程 - go版
1.并发编程基础概念 进程和线程 A. 进程是程序在操作系统中的一次执行过程,系统进行资源分配和调度的一个独立单位。B. 线程是进程的一个执行实体,是CPU调度和分派的基本单位,它是比进程更小的能独立运行的基本单位。C.一个进程可以创建和撤销多个线程;同一个进程中…...
Leetcode33( 搜索旋转排序数组)
题目表述 整数数组 nums 按升序排列,数组中的值 互不相同 。 在传递给函数之前,nums 在预先未知的某个下标 k(0 < k < nums.length)上进行了 旋转,使数组变为 [nums[k], nums[k1], …, nums[n-1], nums[0], nu…...
js 设置3秒后执行
如何在JavaScript中延迟3秒执行操作 在JavaScript中,要设置一个操作在指定延迟后(例如3秒)执行,可以使用 setTimeout 函数。setTimeout 是JavaScript的核心计时器方法,它接受两个参数: 要执行的函数&…...

【版本控制】GitHub Desktop 入门教程与开源协作全流程解析
目录 0 引言1 GitHub Desktop 入门教程1.1 安装与基础配置1.2 核心功能使用指南仓库管理日常开发流程分支管理 2 GitHub 开源协作流程详解2.1 Fork & Pull Request 模型2.2 完整协作流程步骤步骤 1: Fork(创建个人副本)步骤 2: Clone(克隆…...