【Flink】窗口(Window)
窗口理解
窗口(Window)是处理无界流的关键所在。窗口可以将数据流装入大小有限的“桶”中,再对每个“桶”加以处理。 本文的重心将放在 Flink 如何进行窗口操作以及开发者如何尽可能地利用 Flink 所提供的功能。
对窗口的正确理解:
我们将窗口理解为一个一个的水桶,数据流(stream)就像水流,每个数据都会分发到对应的桶中,当达到结束时间时,对每个桶中收集的数据进行计算处理

注:
Flink中窗口并不是静态准备好的,而是动态创建——当有落在这个窗口区间范围的数据达到时,才创建对应的窗口
窗口的分类
按照驱动类型分
时间窗口(Time Window)
以时间来定义窗口的开始和结束,获取某一段时间内的数据(类比于我们的定时发车)
计数窗口(Count Window)
计数窗口是基于元素的个数来获取窗口,达到固定个数时就计算并关闭窗口。(类比于我们的人齐才发车)
按照窗口分配数据的规则分类
滚动窗口(Tumbling Window)
窗口之间没有重叠,也不会有间隔的首尾相撞状态,这样,每个数据都会被分到一个窗口,而且只会属于一个窗口。
滚动窗口的应用非常广泛,它可以对每个时间段做聚合统计,很多BI分析指标都可以用它来实现。

DataStream<T> input = ...;// 滚动 event-time 窗口
input.keyBy(<key selector>).window(TumblingEventTimeWindows.of(Time.seconds(5))).<windowed transformation>(<window function>);// 滚动 processing-time 窗口
input.keyBy(<key selector>).window(TumblingProcessingTimeWindows.of(Time.seconds(5))).<windowed transformation>(<window function>);// 长度为一天的滚动 event-time 窗口, 偏移量为 -8 小时。
input.keyBy(<key selector>).window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8))).<windowed transformation>(<window function>);
滑动窗口(Sliding Windows)
滑动窗口大小也是固定的,但是窗口之间并不是首尾相接的,而是重叠的。

DataStream<T> input = ...;// 滑动 event-time 窗口
input.keyBy(<key selector>).window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))).<windowed transformation>(<window function>);// 滑动 processing-time 窗口
input.keyBy(<key selector>).window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))).<windowed transformation>(<window function>);// 滑动 processing-time 窗口,偏移量为 -8 小时
input.keyBy(<key selector>).window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8))).<windowed transformation>(<window function>);
会话窗口(Session Windows)
会话窗口,是基于“会话”(session)来对数据进行分组的,会话窗口只能基于时间来定义。

DataStream<T> input = ...;// 设置了固定间隔的 event-time 会话窗口
input.keyBy(<key selector>).window(EventTimeSessionWindows.withGap(Time.minutes(10))).<windowed transformation>(<window function>);// 设置了动态间隔的 event-time 会话窗口
input.keyBy(<key selector>).window(EventTimeSessionWindows.withDynamicGap((element) -> {// 决定并返回会话间隔})).<windowed transformation>(<window function>);// 设置了固定间隔的 processing-time session 窗口
input.keyBy(<key selector>).window(ProcessingTimeSessionWindows.withGap(Time.minutes(10))).<windowed transformation>(<window function>);// 设置了动态间隔的 processing-time 会话窗口
input.keyBy(<key selector>).window(ProcessingTimeSessionWindows.withDynamicGap((element) -> {// 决定并返回会话间隔})).<windowed transformation>(<window function>);
全局窗口
这种窗口对全局有效,会把相同的key的所有数据分配到同一个窗口中,这种窗口没有结束时间,默认不会触发计算,如果希望对数据进行处理,需要自定义“触发器”。

DataStream<T> input = ...;input.keyBy(<key selector>).window(GlobalWindows.create()).<windowed transformation>(<window function>);
计数窗口
计数窗口概念非常简单,本身底层是基于全局窗口(Global Window)实现的。Flink为我们提供了非常方便的接口:直接调用.countWindow()方法
滚动计数窗口
滚动计数窗口只需要传入一个长整型的参数size,表示窗口的大小。
stream.keyBy(...).countWindow(10)
滑动计数窗口
与滚动计数窗口类似,不过需要在.countWindow()调用时传入两个参数:size和slide,前者表示窗口大小,后者表示滑动步长。
stream.keyBy(...).countWindow(10,3)
窗口函数(Window Functions)
定义了 window assigner 之后,我们需要指定当窗口触发之后,我们如何计算每个窗口中的数据, 这就是 window function 的职责了
窗口函数有三种:ReduceFunction、AggregateFunction 或 ProcessWindowFunction。
ReduceFunction
ReduceFunction 指定两条输入数据如何合并起来产生一条输出数据,输入和输出数据的类型必须相同。 Flink 使用 ReduceFunction 对窗口中的数据进行增量聚合。
DataStream<Tuple2<String, Long>> input = ...;input.keyBy(<key selector>).window(<window assigner>).reduce(new ReduceFunction<Tuple2<String, Long>>() {//v1 和v2是 2个相同类型的输入参数public Tuple2<String, Long> reduce(Tuple2<String, Long> v1, Tuple2<String, Long> v2) {return new Tuple2<>(v1.f0, v1.f1 + v2.f1);}});
AggregateFunction
ReduceFunction 是 AggregateFunction 的特殊情况。 AggregateFunction 接收三个类型:输入数据的类型(IN)、累加器的类型(ACC)和输出数据的类型(OUT)。
/*** The accumulator is used to keep a running sum and a count. The {@code getResult} method* computes the average.*/
private static class AverageAggregateimplements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {@Overridepublic Tuple2<Long, Long> createAccumulator() {return new Tuple2<>(0L, 0L);}@Overridepublic Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L);}@Overridepublic Double getResult(Tuple2<Long, Long> accumulator) {return ((double) accumulator.f0) / accumulator.f1;}@Overridepublic Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);}
}DataStream<Tuple2<String, Long>> input = ...;input.keyBy(<key selector>).window(<window assigner>).aggregate(new AverageAggregate());
接口中有四个方法:
- createAccumulator():创建一个累加器,这就是为聚合创建了一个初始状态,每个聚合任务只会调用一次。
- add():将输入的元素添加到累加器中。
- getResult():从累加器中提取聚合的输出结果。
- merge():合并两个累加器,并将合并后的状态作为一个累加器返回。
可以看到,AggregateFunction的工作原理是:首先调用createAccumulator()为任务初始化一个状态(累加器);而后每来一个数据就调用一次add()方法,对数据进行聚合,得到的结果保存在状态中;等到了窗口需要输出时,再调用getResult()方法得到计算结果。很明显,与ReduceFunction相同,AggregateFunction也是增量式的聚合;而由于输入、中间状态、输出的类型可以不同,使得应用更加灵活方便。
ProcessWindowFunction
ProcessWindowFunction 有能获取包含窗口内所有元素的 Iterable, 以及用来获取时间和状态信息的 Context 对象,比其他窗口函数更加灵活。 ProcessWindowFunction 的灵活性是以性能和资源消耗为代价的, 因为窗口中的数据无法被增量聚合,而需要在窗口触发前缓存所有数据。
public class WindowProcessDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("127.0.0.1", 7777).map(new WaterSensorMapFunction());KeyedStream<WaterSensor, String> keyedStream = sensorDS.keyBy(WaterSensor::getId);WindowedStream<WaterSensor, String, TimeWindow> sensorWS = keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(5)));sensorWS.process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {@Overridepublic void process(String key, Context context, Iterable<WaterSensor> elements, Collector<String> out) {// 上下文可以拿到window对象,还有其他东西:侧输出流 等等long startTs = context.window().getStart();long endTs = context.window().getEnd();String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");long count = elements.spliterator().estimateSize();out.collect("key=" + key + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements);}}).print();env.execute();}
}
增量聚合和全窗口函数的结合使用
在实际应用中,我们往往希望兼具这两者的优点,把它们结合在一起使用。
我们之前在调用WindowedStream的.reduce()和.aggregate()方法时,只是简单地直接传入了一个ReduceFunction或AggregateFunction进行增量聚合。除此之外,其实还可以传入第二个参数:一个全窗口函数,可以是WindowFunction或者ProcessWindowFunction。
// ReduceFunction与WindowFunction结合
public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction,WindowFunction<T,R,K,W> function) // ReduceFunction与ProcessWindowFunction结合
public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction,ProcessWindowFunction<T,R,K,W> function)// AggregateFunction与WindowFunction结合
public <ACC,V,R> SingleOutputStreamOperator<R> aggregate(AggregateFunction<T,ACC,V> aggFunction,WindowFunction<V,R,K,W> windowFunction)// AggregateFunction与ProcessWindowFunction结合
public <ACC,V,R> SingleOutputStreamOperator<R> aggregate(AggregateFunction<T,ACC,V> aggFunction,ProcessWindowFunction<V,R,K,W> windowFunction)
相关文章:
【Flink】窗口(Window)
窗口理解 窗口(Window)是处理无界流的关键所在。窗口可以将数据流装入大小有限的“桶”中,再对每个“桶”加以处理。 本文的重心将放在 Flink 如何进行窗口操作以及开发者如何尽可能地利用 Flink 所提供的功能。 对窗口的正确理解ÿ…...
读像火箭科学家一样思考笔记03_第一性原理(上)
1. 思维的两种障碍 1.1. 为什么知识会成为一种缺陷而非一种美德 1.1.1. 知识是一种美德 1.1.2. 知识同样的特质也会把它变成一种缺点 1.1.3. 知识确实是个好东西,但知识的作用应该是给人们提供信息,而不是起约束作用 1.1.4. 知识应该启发智慧&#…...
npm私有云
安装node时npm会自动安装,npm也可以单独安装。 package.json 在使用npm时,package.json文件是非常重要的,因为它包含了关于项目的必要信息,比如名称、版本、依赖项等。在初始化新项目时,通常会使用npm init命令生成一…...
莹莹API管理系统源码附带两套模板
这是一个API后台管理系统的源码,可以自定义添加接口,并自带两个模板。 环境要求 PHP版本要求高于5.6且低于8.0,已测试通过的版本为7.4。 需要安装PHPSG11加密扩展。 已测试:宝塔/主机亲测成功搭建! 安装说明 &am…...
【Kingbase FlySync】命令模式:安装部署同步软件,实现KES到KES实现同步
【Kingbase FlySync】命令模式:安装部署同步软件,实现KES到KES实现同步迁移 概述准备环境目标资源1.测试虚拟机下载地址包含node1,node22.同步工具下载地址3.临时授权下载地址4.ruby工具下载地址5.EXAMv0.11.sql下载地址 实操:同步软件安装部署1.node1准…...
python使用selenium webDriver时 报错
可能原因和解决: 1. python 解释器 ----> 设置 2. 浏览器版本 与 浏览器驱动版本不一致 ----> 安装同一版本的 (下载chromedriver | 谷歌驱动更高版本的测试版) 参考:Python使用Selenium WebDriver的入门介绍及安装教程-CSDN博客 Selenium安…...
【ROS2机器人入门到实战】
ROS2机器人入门到实战教程(鱼香ROS) 写在前面 当前平台文章汇总地址:ROS2机器人从入门到实战获取完整教程及配套资料代码,请关注公众号<鱼香ROS>获取教程配套机器人开发平台:两驱版| 四驱版为方便交流,搭建了机器人技术问…...
Nuxt3框架局部文件引用外部JS/CSS文件的相关配置方法
引入外部JS: <script setup>useHead({script: [ {type: "text/javascript",src: https://cdnjs.cloudflare.com/ajax/libs/jquery/3.7.0/jquery.min.js}]}) </script>useHead只能与组件的setup和生命周期钩子一起使用 如果需要将js放置body区…...
Docker 可视化面板 ——Portainer
Portainer 是一个非常好用的 Docker 可视化面板,可以让你轻松地管理你的 Docker 容器。 官网:Portainer: Container Management Software for Kubernetes and Docker 【Docker系列】超级好用的Docker可视化工具——Portainer_哔哩哔哩_bilibili 环境 …...
Java 教育局民办教育信息服务与监管平台
1) 项目背景 按照《中华人民共和国民办教育促进法》和《中华人民共和国政府信息公开条例》的相关规定,为满足学生和家长、社会各界获取权威信息的需求,着力解决服务老百姓最后一公里问题,达到宣传民办教育和引导家长择校的效果࿰…...
小迪笔记(1)——操作系统文件下载反弹SHELL防火墙绕过
名词解释 POC:验证漏洞存在的代码; EXP:利用漏洞的代码; payload:漏洞利用载荷, shellcode:漏洞代码, webshell:特指网站后门; 木马:强调控制…...
Pytorch D2L Subplots方法对画图、图片处理
问题代码 def show_images(imgs, num_rows, num_cols, titlesNone, scale1.5): #save """绘制图像列表""" figsize (num_cols * scale, num_rows * scale) _, axes d2l.plt.subplots(num_rows, num_cols, figsizefigsize) axes axes.flatten…...
MATLAB算法实战应用案例精讲-【目标检测】YOLOV5(补充篇)
目录 算法原理 YOLOv5数据集训练 软硬件背景: 数据集准备 配置文件 模型训练...
WPF中可视化树和逻辑树的区别是什么
在WPF中,用户界面元素被组织成树形结构。这种结构主要分为两种:逻辑树(Logical Tree)和可视化树(Visual Tree)。它们在设计上各有特点和用途。 逻辑树(Logical Tree) 逻辑树是WPF中…...
小迪安全笔记(2)——web应用架构搭建漏洞HTTP数据包代理服务器
Web应用环境架构类 开发语言:php、java、python、ASP、ASPX等程序源码:用的人多了,就成CMS了。中间件容器:IIS、Apache、Nginx、Tomcat、Weblogic、Jboos、glasshfish等数据库类型:Access、Mysql、Mssql、Oracle、Redi…...
[AI]ChatGPT4 与 ChatGPT3.5 区别有多大
ChatGPT 3.5 注册已经不需要手机了,直接邮箱认证就可以,这可真算是好消息,坏消息是 ChatGPT 4 还是要收费。 那么 GPT-3.5 与 GPT-4 区别有多大呢,下面简单测试一下。 以从 TDengine 订阅数据为例,TDengine 算是不太小…...
node实战——koa实现文件上传
文章目录 ⭐前言⭐koa实现文件上传⭐foxapi测试⭐总结⭐结束⭐前言 大家好,我是yma16,本文分享关于node实战——node实战——koa实现文件上传。 本文适用对象:前端初学者转node方向,在校大学生,即将毕业的同学,计算机爱好者。 node系列往期文章 node_windows环境变量配置…...
C++中的this指针
C中的this指针 this 实际上是成员函数的一个形参,在调用成员函数时将对象的地址作为实参传递给 this。不过 this 这个形参是隐式的,它并不出现在代码中,而是在编译阶段由编译器默默地将它添加到参数列表中。 this指针是类的指针,…...
分析日志的一般套路
日志文件很多怎么快速查看? 整机日志一般会有统一的文件名命名规则(如包含时间点),可以根据问题现象时间点大致定位到相应的文件根据日志文件的修改时间属性,定位到相应的文件根据时间点全文件夹搜索内容,…...
使用Flink处理Kafka中的数据_题库子任务_Java语言实现
2024年职业院校技术大赛-高职大数据应用开发赛项专题。 使用Flink处理Kafka中的数据_题库子任务1、2、3_Java语言实现使用Flink处理Kafka中的数据_题库子任务4、5、6_Java语言实现使用Flink处理Kafka中的数据_题库子任务7、8、9_Java语言实现...
【Linux】shell脚本忽略错误继续执行
在 shell 脚本中,可以使用 set -e 命令来设置脚本在遇到错误时退出执行。如果你希望脚本忽略错误并继续执行,可以在脚本开头添加 set e 命令来取消该设置。 举例1 #!/bin/bash# 取消 set -e 的设置 set e# 执行命令,并忽略错误 rm somefile…...
Zustand 状态管理库:极简而强大的解决方案
Zustand 是一个轻量级、快速和可扩展的状态管理库,特别适合 React 应用。它以简洁的 API 和高效的性能解决了 Redux 等状态管理方案中的繁琐问题。 核心优势对比 基本使用指南 1. 创建 Store // store.js import create from zustandconst useStore create((set)…...
Qt Widget类解析与代码注释
#include "widget.h" #include "ui_widget.h"Widget::Widget(QWidget *parent): QWidget(parent), ui(new Ui::Widget) {ui->setupUi(this); }Widget::~Widget() {delete ui; }//解释这串代码,写上注释 当然可以!这段代码是 Qt …...
java调用dll出现unsatisfiedLinkError以及JNA和JNI的区别
UnsatisfiedLinkError 在对接硬件设备中,我们会遇到使用 java 调用 dll文件 的情况,此时大概率出现UnsatisfiedLinkError链接错误,原因可能有如下几种 类名错误包名错误方法名参数错误使用 JNI 协议调用,结果 dll 未实现 JNI 协…...
Leetcode 3577. Count the Number of Computer Unlocking Permutations
Leetcode 3577. Count the Number of Computer Unlocking Permutations 1. 解题思路2. 代码实现 题目链接:3577. Count the Number of Computer Unlocking Permutations 1. 解题思路 这一题其实就是一个脑筋急转弯,要想要能够将所有的电脑解锁&#x…...
服务器硬防的应用场景都有哪些?
服务器硬防是指一种通过硬件设备层面的安全措施来防御服务器系统受到网络攻击的方式,避免服务器受到各种恶意攻击和网络威胁,那么,服务器硬防通常都会应用在哪些场景当中呢? 硬防服务器中一般会配备入侵检测系统和预防系统&#x…...
数据链路层的主要功能是什么
数据链路层(OSI模型第2层)的核心功能是在相邻网络节点(如交换机、主机)间提供可靠的数据帧传输服务,主要职责包括: 🔑 核心功能详解: 帧封装与解封装 封装: 将网络层下发…...
ios苹果系统,js 滑动屏幕、锚定无效
现象:window.addEventListener监听touch无效,划不动屏幕,但是代码逻辑都有执行到。 scrollIntoView也无效。 原因:这是因为 iOS 的触摸事件处理机制和 touch-action: none 的设置有关。ios有太多得交互动作,从而会影响…...
从“安全密码”到测试体系:Gitee Test 赋能关键领域软件质量保障
关键领域软件测试的"安全密码":Gitee Test如何破解行业痛点 在数字化浪潮席卷全球的今天,软件系统已成为国家关键领域的"神经中枢"。从国防军工到能源电力,从金融交易到交通管控,这些关乎国计民生的关键领域…...
消息队列系统设计与实践全解析
文章目录 🚀 消息队列系统设计与实践全解析🔍 一、消息队列选型1.1 业务场景匹配矩阵1.2 吞吐量/延迟/可靠性权衡💡 权衡决策框架 1.3 运维复杂度评估🔧 运维成本降低策略 🏗️ 二、典型架构设计2.1 分布式事务最终一致…...
