【Flink】窗口(Window)
窗口理解
窗口(Window)是处理无界流的关键所在。窗口可以将数据流装入大小有限的“桶”中,再对每个“桶”加以处理。 本文的重心将放在 Flink 如何进行窗口操作以及开发者如何尽可能地利用 Flink 所提供的功能。
对窗口的正确理解:
我们将窗口理解为一个一个的水桶,数据流(stream)就像水流,每个数据都会分发到对应的桶中,当达到结束时间时,对每个桶中收集的数据进行计算处理

注:
Flink中窗口并不是静态准备好的,而是动态创建——当有落在这个窗口区间范围的数据达到时,才创建对应的窗口
窗口的分类
按照驱动类型分
时间窗口(Time Window)
以时间来定义窗口的开始和结束,获取某一段时间内的数据(类比于我们的定时发车)
计数窗口(Count Window)
计数窗口是基于元素的个数来获取窗口,达到固定个数时就计算并关闭窗口。(类比于我们的人齐才发车)
按照窗口分配数据的规则分类
滚动窗口(Tumbling Window)
窗口之间没有重叠,也不会有间隔的首尾相撞状态,这样,每个数据都会被分到一个窗口,而且只会属于一个窗口。
滚动窗口的应用非常广泛,它可以对每个时间段做聚合统计,很多BI分析指标都可以用它来实现。

DataStream<T> input = ...;// 滚动 event-time 窗口
input.keyBy(<key selector>).window(TumblingEventTimeWindows.of(Time.seconds(5))).<windowed transformation>(<window function>);// 滚动 processing-time 窗口
input.keyBy(<key selector>).window(TumblingProcessingTimeWindows.of(Time.seconds(5))).<windowed transformation>(<window function>);// 长度为一天的滚动 event-time 窗口, 偏移量为 -8 小时。
input.keyBy(<key selector>).window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8))).<windowed transformation>(<window function>);
滑动窗口(Sliding Windows)
滑动窗口大小也是固定的,但是窗口之间并不是首尾相接的,而是重叠的。

DataStream<T> input = ...;// 滑动 event-time 窗口
input.keyBy(<key selector>).window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))).<windowed transformation>(<window function>);// 滑动 processing-time 窗口
input.keyBy(<key selector>).window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))).<windowed transformation>(<window function>);// 滑动 processing-time 窗口,偏移量为 -8 小时
input.keyBy(<key selector>).window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8))).<windowed transformation>(<window function>);
会话窗口(Session Windows)
会话窗口,是基于“会话”(session)来对数据进行分组的,会话窗口只能基于时间来定义。

DataStream<T> input = ...;// 设置了固定间隔的 event-time 会话窗口
input.keyBy(<key selector>).window(EventTimeSessionWindows.withGap(Time.minutes(10))).<windowed transformation>(<window function>);// 设置了动态间隔的 event-time 会话窗口
input.keyBy(<key selector>).window(EventTimeSessionWindows.withDynamicGap((element) -> {// 决定并返回会话间隔})).<windowed transformation>(<window function>);// 设置了固定间隔的 processing-time session 窗口
input.keyBy(<key selector>).window(ProcessingTimeSessionWindows.withGap(Time.minutes(10))).<windowed transformation>(<window function>);// 设置了动态间隔的 processing-time 会话窗口
input.keyBy(<key selector>).window(ProcessingTimeSessionWindows.withDynamicGap((element) -> {// 决定并返回会话间隔})).<windowed transformation>(<window function>);
全局窗口
这种窗口对全局有效,会把相同的key的所有数据分配到同一个窗口中,这种窗口没有结束时间,默认不会触发计算,如果希望对数据进行处理,需要自定义“触发器”。

DataStream<T> input = ...;input.keyBy(<key selector>).window(GlobalWindows.create()).<windowed transformation>(<window function>);
计数窗口
计数窗口概念非常简单,本身底层是基于全局窗口(Global Window)实现的。Flink为我们提供了非常方便的接口:直接调用.countWindow()方法
滚动计数窗口
滚动计数窗口只需要传入一个长整型的参数size,表示窗口的大小。
stream.keyBy(...).countWindow(10)
滑动计数窗口
与滚动计数窗口类似,不过需要在.countWindow()调用时传入两个参数:size和slide,前者表示窗口大小,后者表示滑动步长。
stream.keyBy(...).countWindow(10,3)
窗口函数(Window Functions)
定义了 window assigner 之后,我们需要指定当窗口触发之后,我们如何计算每个窗口中的数据, 这就是 window function 的职责了
窗口函数有三种:ReduceFunction、AggregateFunction 或 ProcessWindowFunction。
ReduceFunction
ReduceFunction 指定两条输入数据如何合并起来产生一条输出数据,输入和输出数据的类型必须相同。 Flink 使用 ReduceFunction 对窗口中的数据进行增量聚合。
DataStream<Tuple2<String, Long>> input = ...;input.keyBy(<key selector>).window(<window assigner>).reduce(new ReduceFunction<Tuple2<String, Long>>() {//v1 和v2是 2个相同类型的输入参数public Tuple2<String, Long> reduce(Tuple2<String, Long> v1, Tuple2<String, Long> v2) {return new Tuple2<>(v1.f0, v1.f1 + v2.f1);}});
AggregateFunction
ReduceFunction 是 AggregateFunction 的特殊情况。 AggregateFunction 接收三个类型:输入数据的类型(IN)、累加器的类型(ACC)和输出数据的类型(OUT)。
/*** The accumulator is used to keep a running sum and a count. The {@code getResult} method* computes the average.*/
private static class AverageAggregateimplements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {@Overridepublic Tuple2<Long, Long> createAccumulator() {return new Tuple2<>(0L, 0L);}@Overridepublic Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L);}@Overridepublic Double getResult(Tuple2<Long, Long> accumulator) {return ((double) accumulator.f0) / accumulator.f1;}@Overridepublic Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);}
}DataStream<Tuple2<String, Long>> input = ...;input.keyBy(<key selector>).window(<window assigner>).aggregate(new AverageAggregate());
接口中有四个方法:
- createAccumulator():创建一个累加器,这就是为聚合创建了一个初始状态,每个聚合任务只会调用一次。
- add():将输入的元素添加到累加器中。
- getResult():从累加器中提取聚合的输出结果。
- merge():合并两个累加器,并将合并后的状态作为一个累加器返回。
可以看到,AggregateFunction的工作原理是:首先调用createAccumulator()为任务初始化一个状态(累加器);而后每来一个数据就调用一次add()方法,对数据进行聚合,得到的结果保存在状态中;等到了窗口需要输出时,再调用getResult()方法得到计算结果。很明显,与ReduceFunction相同,AggregateFunction也是增量式的聚合;而由于输入、中间状态、输出的类型可以不同,使得应用更加灵活方便。
ProcessWindowFunction
ProcessWindowFunction 有能获取包含窗口内所有元素的 Iterable, 以及用来获取时间和状态信息的 Context 对象,比其他窗口函数更加灵活。 ProcessWindowFunction 的灵活性是以性能和资源消耗为代价的, 因为窗口中的数据无法被增量聚合,而需要在窗口触发前缓存所有数据。
public class WindowProcessDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("127.0.0.1", 7777).map(new WaterSensorMapFunction());KeyedStream<WaterSensor, String> keyedStream = sensorDS.keyBy(WaterSensor::getId);WindowedStream<WaterSensor, String, TimeWindow> sensorWS = keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(5)));sensorWS.process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {@Overridepublic void process(String key, Context context, Iterable<WaterSensor> elements, Collector<String> out) {// 上下文可以拿到window对象,还有其他东西:侧输出流 等等long startTs = context.window().getStart();long endTs = context.window().getEnd();String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");long count = elements.spliterator().estimateSize();out.collect("key=" + key + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements);}}).print();env.execute();}
}
增量聚合和全窗口函数的结合使用
在实际应用中,我们往往希望兼具这两者的优点,把它们结合在一起使用。
我们之前在调用WindowedStream的.reduce()和.aggregate()方法时,只是简单地直接传入了一个ReduceFunction或AggregateFunction进行增量聚合。除此之外,其实还可以传入第二个参数:一个全窗口函数,可以是WindowFunction或者ProcessWindowFunction。
// ReduceFunction与WindowFunction结合
public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction,WindowFunction<T,R,K,W> function) // ReduceFunction与ProcessWindowFunction结合
public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction,ProcessWindowFunction<T,R,K,W> function)// AggregateFunction与WindowFunction结合
public <ACC,V,R> SingleOutputStreamOperator<R> aggregate(AggregateFunction<T,ACC,V> aggFunction,WindowFunction<V,R,K,W> windowFunction)// AggregateFunction与ProcessWindowFunction结合
public <ACC,V,R> SingleOutputStreamOperator<R> aggregate(AggregateFunction<T,ACC,V> aggFunction,ProcessWindowFunction<V,R,K,W> windowFunction)
相关文章:
【Flink】窗口(Window)
窗口理解 窗口(Window)是处理无界流的关键所在。窗口可以将数据流装入大小有限的“桶”中,再对每个“桶”加以处理。 本文的重心将放在 Flink 如何进行窗口操作以及开发者如何尽可能地利用 Flink 所提供的功能。 对窗口的正确理解ÿ…...
读像火箭科学家一样思考笔记03_第一性原理(上)
1. 思维的两种障碍 1.1. 为什么知识会成为一种缺陷而非一种美德 1.1.1. 知识是一种美德 1.1.2. 知识同样的特质也会把它变成一种缺点 1.1.3. 知识确实是个好东西,但知识的作用应该是给人们提供信息,而不是起约束作用 1.1.4. 知识应该启发智慧&#…...
npm私有云
安装node时npm会自动安装,npm也可以单独安装。 package.json 在使用npm时,package.json文件是非常重要的,因为它包含了关于项目的必要信息,比如名称、版本、依赖项等。在初始化新项目时,通常会使用npm init命令生成一…...
莹莹API管理系统源码附带两套模板
这是一个API后台管理系统的源码,可以自定义添加接口,并自带两个模板。 环境要求 PHP版本要求高于5.6且低于8.0,已测试通过的版本为7.4。 需要安装PHPSG11加密扩展。 已测试:宝塔/主机亲测成功搭建! 安装说明 &am…...
【Kingbase FlySync】命令模式:安装部署同步软件,实现KES到KES实现同步
【Kingbase FlySync】命令模式:安装部署同步软件,实现KES到KES实现同步迁移 概述准备环境目标资源1.测试虚拟机下载地址包含node1,node22.同步工具下载地址3.临时授权下载地址4.ruby工具下载地址5.EXAMv0.11.sql下载地址 实操:同步软件安装部署1.node1准…...
python使用selenium webDriver时 报错
可能原因和解决: 1. python 解释器 ----> 设置 2. 浏览器版本 与 浏览器驱动版本不一致 ----> 安装同一版本的 (下载chromedriver | 谷歌驱动更高版本的测试版) 参考:Python使用Selenium WebDriver的入门介绍及安装教程-CSDN博客 Selenium安…...
【ROS2机器人入门到实战】
ROS2机器人入门到实战教程(鱼香ROS) 写在前面 当前平台文章汇总地址:ROS2机器人从入门到实战获取完整教程及配套资料代码,请关注公众号<鱼香ROS>获取教程配套机器人开发平台:两驱版| 四驱版为方便交流,搭建了机器人技术问…...
Nuxt3框架局部文件引用外部JS/CSS文件的相关配置方法
引入外部JS: <script setup>useHead({script: [ {type: "text/javascript",src: https://cdnjs.cloudflare.com/ajax/libs/jquery/3.7.0/jquery.min.js}]}) </script>useHead只能与组件的setup和生命周期钩子一起使用 如果需要将js放置body区…...
Docker 可视化面板 ——Portainer
Portainer 是一个非常好用的 Docker 可视化面板,可以让你轻松地管理你的 Docker 容器。 官网:Portainer: Container Management Software for Kubernetes and Docker 【Docker系列】超级好用的Docker可视化工具——Portainer_哔哩哔哩_bilibili 环境 …...
Java 教育局民办教育信息服务与监管平台
1) 项目背景 按照《中华人民共和国民办教育促进法》和《中华人民共和国政府信息公开条例》的相关规定,为满足学生和家长、社会各界获取权威信息的需求,着力解决服务老百姓最后一公里问题,达到宣传民办教育和引导家长择校的效果࿰…...
小迪笔记(1)——操作系统文件下载反弹SHELL防火墙绕过
名词解释 POC:验证漏洞存在的代码; EXP:利用漏洞的代码; payload:漏洞利用载荷, shellcode:漏洞代码, webshell:特指网站后门; 木马:强调控制…...
Pytorch D2L Subplots方法对画图、图片处理
问题代码 def show_images(imgs, num_rows, num_cols, titlesNone, scale1.5): #save """绘制图像列表""" figsize (num_cols * scale, num_rows * scale) _, axes d2l.plt.subplots(num_rows, num_cols, figsizefigsize) axes axes.flatten…...
MATLAB算法实战应用案例精讲-【目标检测】YOLOV5(补充篇)
目录 算法原理 YOLOv5数据集训练 软硬件背景: 数据集准备 配置文件 模型训练...
WPF中可视化树和逻辑树的区别是什么
在WPF中,用户界面元素被组织成树形结构。这种结构主要分为两种:逻辑树(Logical Tree)和可视化树(Visual Tree)。它们在设计上各有特点和用途。 逻辑树(Logical Tree) 逻辑树是WPF中…...
小迪安全笔记(2)——web应用架构搭建漏洞HTTP数据包代理服务器
Web应用环境架构类 开发语言:php、java、python、ASP、ASPX等程序源码:用的人多了,就成CMS了。中间件容器:IIS、Apache、Nginx、Tomcat、Weblogic、Jboos、glasshfish等数据库类型:Access、Mysql、Mssql、Oracle、Redi…...
[AI]ChatGPT4 与 ChatGPT3.5 区别有多大
ChatGPT 3.5 注册已经不需要手机了,直接邮箱认证就可以,这可真算是好消息,坏消息是 ChatGPT 4 还是要收费。 那么 GPT-3.5 与 GPT-4 区别有多大呢,下面简单测试一下。 以从 TDengine 订阅数据为例,TDengine 算是不太小…...
node实战——koa实现文件上传
文章目录 ⭐前言⭐koa实现文件上传⭐foxapi测试⭐总结⭐结束⭐前言 大家好,我是yma16,本文分享关于node实战——node实战——koa实现文件上传。 本文适用对象:前端初学者转node方向,在校大学生,即将毕业的同学,计算机爱好者。 node系列往期文章 node_windows环境变量配置…...
C++中的this指针
C中的this指针 this 实际上是成员函数的一个形参,在调用成员函数时将对象的地址作为实参传递给 this。不过 this 这个形参是隐式的,它并不出现在代码中,而是在编译阶段由编译器默默地将它添加到参数列表中。 this指针是类的指针,…...
分析日志的一般套路
日志文件很多怎么快速查看? 整机日志一般会有统一的文件名命名规则(如包含时间点),可以根据问题现象时间点大致定位到相应的文件根据日志文件的修改时间属性,定位到相应的文件根据时间点全文件夹搜索内容,…...
使用Flink处理Kafka中的数据_题库子任务_Java语言实现
2024年职业院校技术大赛-高职大数据应用开发赛项专题。 使用Flink处理Kafka中的数据_题库子任务1、2、3_Java语言实现使用Flink处理Kafka中的数据_题库子任务4、5、6_Java语言实现使用Flink处理Kafka中的数据_题库子任务7、8、9_Java语言实现...
RNA Clean-Up and Concentration Kits:适用于小RNA测序的RNA纯化与浓缩方案
在分子生物学研究中,RNA的纯度与浓度直接影响下游实验的成败。无论是从TRIzol等酚类试剂中提取的RNA,还是经过体外转录、DNase处理、标记反应等酶促步骤的样本,均可能残留影响后续实验的杂质。由艾美捷代理的Norgen Biotek推出的RNA Clean-Up…...
OpenClaw环境隔离方案:Phi-3-vision-128k-instruct多模态任务专用沙箱配置
OpenClaw环境隔离方案:Phi-3-vision-128k-instruct多模态任务专用沙箱配置 1. 为什么需要环境隔离? 去年我在尝试用OpenClaw处理一批包含敏感客户数据的PDF文件时,曾因为一个错误的鼠标操作指令导致系统临时文件被意外删除。那次经历让我意…...
OpenClaw与Qwen3-14b_int4_awq联动:低成本实现个人自动化办公
OpenClaw与Qwen3-14b_int4_awq联动:低成本实现个人自动化办公 1. 为什么选择OpenClawQwen3-14b_int4_awq组合 去年夏天,当我第一次尝试用AI自动化处理周报时,发现商业API的token消耗速度远超预期——生成5份周报就花掉了近50元。这促使我开…...
OpenClaw开发环境配置:千问3.5-9B辅助的IDE插件管理
OpenClaw开发环境配置:千问3.5-9B辅助的IDE插件管理 1. 为什么需要AI辅助的IDE管理 作为一个长期在多个项目间切换的全栈开发者,我深受开发环境配置问题的困扰。每次换新电脑或者重装系统,光是配置VSCode插件和项目依赖就要耗费大半天时间。…...
Vodafone K4606 USB调制解调器Linux内核驱动适配
1. Vodafone USB Modem 驱动适配技术解析:K4606 型号的底层支持实现 1.1 项目背景与工程定位 VodafoneUSBModem 是一个面向嵌入式 Linux 系统的 USB 串行通信驱动增强项目,其核心目标并非开发全新协议栈,而是对上游 Linux 内核中已有的 op…...
04_RAGFlow之知识图谱与Text2SQL
RAGFlow之知识图谱与Text2SQL:构建智能检索的双引擎 知识体系结构 RAGFlow技术栈 │ ├── 知识图谱层 │ ├── 实体识别与关系提取(NER Relation Extraction) │ ├── 图谱查询与推理(Graph Query & Reasoning&a…...
云端开发新选择:星图OpenClaw镜像+千问3.5-9B联调
云端开发新选择:星图OpenClaw镜像千问3.5-9B联调 1. 为什么选择云端联调方案? 去年尝试在MacBook Pro上本地部署OpenClaw时,风扇狂转的噪音让我意识到一个问题:个人设备跑大模型自动化框架的组合实在太吃资源。当时为了调试一个…...
用Python+ddddocr搞定某税网滑块验证码,再拆解SM2/SM4/HMacSHA256加密全流程
Python实战:国密算法与滑块验证的自动化登录全解析 当开发者遇到集成了滑块验证和国密加密的复杂登录系统时,传统爬虫手段往往束手无策。本文将完整演示如何用Python构建一个从滑块识别到加密处理的自动化登录系统,重点解决SM2/SM4加密和HMac…...
LangChain-AI应用开发框架(四)
目录 一.LangChain软件包安装 二.LangChain能力详解 1.本章节环境说明 2.目标与内容 三.详细过程 1.步骤1: a.申请API key并配置环境变量 b.配置环境变量 步骤2:定义大模型 a.安装OpenAI包 b.定义大模型 步骤3:定义消息列表 步骤4ÿ…...
如何用Dify API和GPT-4o高效识别图片?附避坑指南
如何用Dify API和GPT-4o高效识别图片?附避坑指南 在当今数字化时代,图片识别技术已成为众多应用场景中的核心需求。从电商平台的商品自动分类到社交媒体内容审核,再到医疗影像分析,高效准确的图片识别能力正变得越来越重要。Dify作…...
