大数据-玩转数据-Flink窗口
一、Flink 窗口 理解
在流处理应用中,数据是连续不断的,因此我们不可能等到所有数据都到了才开始处理。当然我们可以每来一个消息就处理一次,但是有时我们需要做一些聚合类的处理,例如:在过去的1分钟内有多少用户点击了我们的网页。在这种情况下,我们必须定义一个窗口,用来收集最近一分钟内的数据,并对这个窗口内的数据进行计算。
流式计算是一种被设计用于处理无限数据集的数据处理引擎,而无限数据集是指一种不断增长的本质上无限的数据集,而Window窗口是一种切割无限数据为有限块进行处理的手段。
在Flink中, 窗口(window)是处理无界流的核心. 窗口把流切割成有限大小的多个"存储桶"(bucket), 我们在这些桶上进行计算.
时间窗口
时间窗口包含一个开始时间戳(包括)和结束时间戳(不包括), 这两个时间戳一起限制了窗口的尺寸。在代码中, Flink使用TimeWindow这个类来表示基于时间的窗口. 这个类提供了key查询开始时间戳和结束时间戳的方法, 还提供了针对给定的窗口获取它允许的最大时间戳的方法(maxTimestamp()),时间窗口又分3种:滚动窗口、滑动窗口、会话窗口。
二、数据准备
准备一个WaterSensor类方便演示
package com.lyh.bean;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;@Data
@NoArgsConstructor
@AllArgsConstructor
public class WaterSensor {private String id;private Long ts;private Integer vc;
}
三、时间滚动窗口
滚动窗口有固定的大小, 窗口与窗口之间不会重叠也没有缝隙.比如,如果指定一个长度为5分钟的滚动窗口, 当前窗口开始计算, 每5分钟启动一个新的窗口。滚动窗口能将数据流切分成不重叠的窗口,每一个事件只能属于一个窗口。
滚动窗口有固定的大小, 窗口与窗口之间不会重叠也没有缝隙.比如,如果指定一个长度为5分钟的滚动窗口, 当前窗口开始计算, 每5分钟启动一个新的窗口.
滚动窗口能将数据流切分成不重叠的窗口,每一个事件只能属于一个窗口
1.时间间隔可以通过: Time.milliseconds(x), Time.seconds(x), Time.minutes(x),等等来指定,2.我们传递给window函数的对象叫窗口分配器.
时间滚动窗口代码
package com.lyh.flink07;import com.lyh.bean.WaterSensor;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;import java.util.ArrayList;
import java.util.List;public class Window_s {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.socketTextStream("hadoop100",9999).map(line -> {String[] data = line.split(",");return new WaterSensor(data[0],Long.valueOf(data[1]),Integer.valueOf(data[2]));}).keyBy(WaterSensor::getId).window(TumblingProcessingTimeWindows.of(Time.seconds(5))).process(new ProcessWindowFunction<WaterSensor, String,String, TimeWindow>() {@Overridepublic void process(String key,Context ctx,Iterable<WaterSensor> elements,Collector<String> out) throws Exception {List<WaterSensor> list = toList(elements);long starttime = ctx.window().getStart();long endtime = ctx.window().getEnd();out.collect("窗口:" + starttime + " " + endtime + " " + "key:" + key + " " + "list:" + list);}}).print();env.execute();}private static <T>List<T> toList(Iterable<T> it) {List<T> list = new ArrayList<>();for (T t : it) {list.add(t);}return list;}
}
运行结果
在hadoop100 服务器
输入nc -lk 999 启动socket

消费结果:

四、时间滑动窗口
与滚动窗口一样, 滑动窗口也是有固定的长度. 另外一个参数我们叫滑动步长, 用来控制滑动窗口启动的频率.
所以, 如果滑动步长小于窗口长度, 滑动窗口会重叠. 这种情况下, 一个元素可能会被分配到多个窗口中
例如, 滑动窗口长度10分钟, 滑动步长5分钟, 则, 每5分钟会得到一个包含最近10分钟的数据。
时间滑动窗口代码
package com.lyh.flink07;import com.lyh.bean.WaterSensor;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;import java.util.ArrayList;
import java.util.List;public class Window_s {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.socketTextStream("hadoop100",9999).map(line -> {String[] data = line.split(",");return new WaterSensor(data[0],Long.valueOf(data[1]),Integer.valueOf(data[2]));}).keyBy(WaterSensor::getId)
// .window(TumblingProcessingTimeWindows.of(Time.seconds(5))).window(SlidingProcessingTimeWindows.of(Time.seconds(5),Time.seconds(2))).process(new ProcessWindowFunction<WaterSensor, String,String, TimeWindow>() {@Overridepublic void process(String key,Context ctx,Iterable<WaterSensor> elements,Collector<String> out) throws Exception {List<WaterSensor> list = toList(elements);long starttime = ctx.window().getStart();long endtime = ctx.window().getEnd();out.collect("窗口:" + starttime + " " + endtime + " " + "key:" + key + " " + "list:" + list);}}).print();env.execute();}private static <T>List<T> toList(Iterable<T> it) {List<T> list = new ArrayList<>();for (T t : it) {list.add(t);}return list;}
}
执行结果
在hadoop100 服务器
输入nc -lk 999 启动socket

消费结果

五、时间会话窗口
会话窗口分配器会根据活动的元素进行分组. 会话窗口不会有重叠, 与滚动窗口和滑动窗口相比, 会话窗口也没有固定的开启和关闭时间.
如果会话窗口有一段时间没有收到数据, 会话窗口会自动关闭, 这段没有收到数据的时间就是会话窗口的gap(间隔)
我们可以配置静态的gap, 也可以通过一个gap extractor 函数来定义gap的长度. 当时间超过了这个gap, 当前的会话窗口就会关闭, 后序的元素会被分配到一个新的会话窗口。
时间会话窗口代码
package com.lyh.flink07;import com.lyh.bean.WaterSensor;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;import java.util.ArrayList;
import java.util.List;public class Window_s {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.socketTextStream("hadoop100",9999).map(line -> {String[] data = line.split(",");return new WaterSensor(data[0],Long.valueOf(data[1]),Integer.valueOf(data[2]));}).keyBy(WaterSensor::getId)
// .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
// .window(SlidingProcessingTimeWindows.of(Time.seconds(5),Time.seconds(2))).window(ProcessingTimeSessionWindows.withGap(Time.seconds(3))).process(new ProcessWindowFunction<WaterSensor, String,String, TimeWindow>() {@Overridepublic void process(String key,Context ctx,Iterable<WaterSensor> elements,Collector<String> out) throws Exception {List<WaterSensor> list = toList(elements);long starttime = ctx.window().getStart();long endtime = ctx.window().getEnd();out.collect("窗口:" + starttime + " " + endtime + " " + "key:" + key + " " + "list:" + list);}}).print();env.execute();}private static <T>List<T> toList(Iterable<T> it) {List<T> list = new ArrayList<>();for (T t : it) {list.add(t);}return list;}
}
运行结果
在hadoop100 服务器
输入nc -lk 999 启动socket

消费结果

因为会话窗口没有固定的开启和关闭时间, 所以会话窗口的创建和关闭与滚动,滑动窗口不同. 在Flink内部, 每到达一个新的元素都会创建一个新的会话窗口, 如果这些窗口彼此相距比较定义的gap小, 则会对他们进行合并. 为了能够合并, 会话窗口算子需要合并触发器和合并窗口函数: ReduceFunction, AggregateFunction, or ProcessWindowFunction 。
六、基于元素个数的滚动窗口
默认的CountWindow是一个滚动窗口,只需要指定窗口大小即可,当元素数量达到窗口大小时,就会触发窗口的执行。
实例代码
.countWindow(3)
说明:哪个窗口先达到3个元素, 哪个窗口就关闭. 不影响其他的窗口.
基于元素个数的滚动窗口代码
package com.lyh.flink07;import com.lyh.bean.WaterSensor;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;import java.util.ArrayList;
import java.util.List;public class Window_s_n {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.socketTextStream("hadoop100",9999).map(line -> {String[] data = line.split(",");return new WaterSensor(data[0],Long.valueOf(data[1]),Integer.valueOf(data[2]));}).keyBy(WaterSensor::getId).countWindow(2).process(new ProcessWindowFunction<WaterSensor, String, String, GlobalWindow>() {@Overridepublic void process(String key,Context ctx,Iterable<WaterSensor> elements,Collector<String> out) throws Exception {List<WaterSensor> list = toList(elements);out.collect("窗口:" + "key:" + key + " " + "list:" + list);}}).print();env.execute();}private static <T>List<T> toList(Iterable<T> it) {List<T> list = new ArrayList<>();for (T t : it) {list.add(t);}return list;}
}
运行结果


七、基于元素个数的滑动窗口
滑动窗口和滚动窗口的函数名是完全一致的,只是在传参数时需要传入两个参数,一个是window_size,一个是sliding_size。下面代码中的sliding_size设置为了2,也就是说,每收到两个相同key的数据就计算一次,每一次计算的window范围最多是3个元素。
实例代码
.countWindow(3, 2)
package com.lyh.flink07;import com.lyh.bean.WaterSensor;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;import java.util.ArrayList;
import java.util.List;public class Window_s_n {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.socketTextStream("hadoop100",9999).map(line -> {String[] data = line.split(",");return new WaterSensor(data[0],Long.valueOf(data[1]),Integer.valueOf(data[2]));}).keyBy(WaterSensor::getId)
// .countWindow(2).countWindow(3,2).process(new ProcessWindowFunction<WaterSensor, String, String, GlobalWindow>() {@Overridepublic void process(String key,Context ctx,Iterable<WaterSensor> elements,Collector<String> out) throws Exception {List<WaterSensor> list = toList(elements);out.collect("窗口:" + "key:" + key + " " + "list:" + list);}}).print();env.execute();}private static <T>List<T> toList(Iterable<T> it) {List<T> list = new ArrayList<>();for (T t : it) {list.add(t);}return list;}
}
运行结果


相关文章:
大数据-玩转数据-Flink窗口
一、Flink 窗口 理解 在流处理应用中,数据是连续不断的,因此我们不可能等到所有数据都到了才开始处理。当然我们可以每来一个消息就处理一次,但是有时我们需要做一些聚合类的处理,例如:在过去的1分钟内有多少用户点击…...
【python爬虫】—豆瓣电影Top250
豆瓣电影Top250 豆瓣榜单简介需求描述Python实现 豆瓣榜单简介 豆瓣电影 Top 250 榜单是豆瓣网站上列出的评分最高、受观众喜爱的电影作品。这个榜单包含了一系列优秀的影片,涵盖了各种类型、不同国家和时期的电影。 需求描述 使用python爬取top250电影ÿ…...
【跟小嘉学 Rust 编程】十五、智能指针
系列文章目录 【跟小嘉学 Rust 编程】一、Rust 编程基础 【跟小嘉学 Rust 编程】二、Rust 包管理工具使用 【跟小嘉学 Rust 编程】三、Rust 的基本程序概念 【跟小嘉学 Rust 编程】四、理解 Rust 的所有权概念 【跟小嘉学 Rust 编程】五、使用结构体关联结构化数据 【跟小嘉学…...
Python爬虫基础之正则表达式
目录 一、什么是正则表达式? 二、re.compile()编译函数 三、group()获取匹配结果函数 四、常用匹配规则 4.1匹配单个字符 4.2匹配前字符次数 4.3匹配原生字符串 4.4匹配字符串开头和结尾 4.5分组匹配 五、re.match()开头匹配函数 六、re.search()全文搜索…...
【LeetCode】双指针妙解有效三角形的个数
Problem: 611. 有效三角形的个数 文章目录 题目分析讲解算法原理复杂度Code 题目分析 首先我们来分析一下本题的思路 看到题目中给出的示例 题目的意思很简单,就是将给到的数字去做一个组合,然后看看这三条边是否可以构成三角形。那判断的方法不用我说&a…...
mysql 计算两点之间距离
先说一下我们可能会用到的一些场景,这样同学们可以先评估,该篇文章是否对你有帮助! 场景: 假设 美团,我点外卖时,系统会让我先进行定位,比如我定位在了 A 点,系统就会给我推荐&…...
c语言自定义头文件是什么情况下使用?一般在什么情况下引用自定义的头文件?一般在自定义头文件中写什么代码?
c语言自定义头文件是什么情况下使用?一般在什么情况下引用自定义的头文件?一般在自定义头文件中写什么代码? C语言自定义头文件是一种用来封装函数和变量声明的文件,它通常用于将一组相关的函数和变量的声明集中在一个地方&#…...
electron应用打包成功纪念一下
electron应用打包成功纪念一下,以前曾经行过后来打包各种报错,现在有空就尝试解决一下 首先安装nvm能够方便切换node版本 curl -o- https://raw.githubusercontent.com/nvm-sh/nvm/v0.34.0/install.sh | bash 顺利安装后你用nvm list查看node列表时会…...
远程办公中安全远程访问解决方案
什么是安全远程访问 安全的远程访问是一个至关重要的过程,可让您使用互联网从远处完全控制某人的设备。为了确保安全,为受保护的远程访问采取了额外的身份验证和加密措施。 为什么安全远程访问解决方案很重要 当 IT 技术人员从远处帮助人们解决计算机…...
StartUp启动框架-Android启动性能
简述 当谈论Android应用程序的启动性能时,StartUp启动框架是一个不可忽视的关键工具。它旨在优化应用程序的启动过程,确保用户在打开应用时能够迅速获得流畅、高效的体验。让我们来深入了解StartUp框架的作用和重要性,以及它是如何改善Andro…...
Positive Technologies:五分之四的网络攻击具有针对性
Positive Technologies 对 2023 年第二季度的相关网络威胁进行了分析。报告显示,自今年年初以来,有针对性的攻击数量增加了 10%,目前占 78%。专家们注意到利用漏洞的大规模攻击和大量用户个人数据的泄露。此外,在此期间࿰…...
clickhouse的另类表引擎
clickhouse常用的MergeTree引擎外,还有特殊的引擎 1,memory引擎,顾名思义,数据是存储在内存中,数据不会被压缩也不会倍格式化转换数据在内存中保存的形态与查询时看到的如出一辙,重启ck数据丢失 2ÿ…...
Uniapp新版本打包后覆盖安装,新增的页面无法跳转,需退出重新启动才可以打开的解决方案
最近写uniapp项目,发现一个坑,在新版本覆盖安装后直接打开APP,新增的页面竟然无法跳转,需要重新启动才可以正常打开,在网上查了很多方法,最终总结下来有以下几点: 1.看打的是debug包还是releas…...
系统架构设计高级技能 · 面向服务架构设计理论与实践
点击进入系列文章目录 系统架构设计高级技能 面向服务架构设计理论与实践 一、SOA的相关概念1.1SOA的定义1.2 业务流程与业务流程执行语言 二、SOA的发展史三、SOA与微服务的区别三、SOA的参考架构四、SOA的主要协议规范五、SOA的设计标准要求六、SOA的作用与设计原则七、SOA的…...
QT注册界面练习(信号与槽实现页面跳转)
一、注册界面练习思路以及具体代码 在完成注册页面搭建的前提下,通过信号与槽机制实现多组件之间的相互通信,实现页面跳转。 基本步骤: 首先,将注册页面的登录按钮与成功登陆信号绑定,当用户名与密码均匹配时…...
MySQL从入门到精通【进阶篇】之 主从复制详解
文章目录 0.前言1. 主从复制简介2. 主从复制的工作流程主从复制过程中的日志文件作用(Binary Log)和中继日志(Relay Log) 3. MySQL主从复制的配置4. 参考资料 0.前言 MySQL的主从复制和读写分离是数据库领域的基本概念࿰…...
vue使用qrcodejs2生成二维码
目录 概要 构建展示的vue组件qrcode.vue 组件的使用 概要 项目中用到需要展示二维码的样式,想到了qrcode 例如: 前提:安装包 npm install qrcodejs2 --save 构建展示的vue组件qrcode.vue <template><div style"width: …...
python注释
任何编程语言都少不了注释,Python也不例外,以下是Python注释的具体用法: 单行注释 Python编程语言的单行注释常以#开头,单行注释可以作为单独的一行放在被注释代码行之上,也可以放在语句或者表达式之后。 实例&…...
update-alternatives详解
1.功能作用 update-alternatives是dpkg的实用工具,用来维护系统命令的符号链接,以决定系统默认使用什么命令。 在Debian系统中,我们可能会同时安装有很多功能类似的程序和可选配置,如Web浏览器程序(firefox,konquero…...
JavaScript 编写更好的条件语句
在任何编程语言中,代码需要根据不同的条件在给定的输入中做不同的决定和执行相应的动作。 例如,在一个游戏中,如果玩家生命点为0,游戏结束。在天气应用中,如果在早上被查看,显示一个日出图片,如…...
3分钟快速上手:Buzz音频转录软件完整使用指南
3分钟快速上手:Buzz音频转录软件完整使用指南 【免费下载链接】buzz Buzz transcribes and translates audio offline on your personal computer. Powered by OpenAIs Whisper. 项目地址: https://gitcode.com/GitHub_Trending/buz/buzz 还在为音频转录烦恼…...
Solidity 知识点速记整理 - (2026年) (75 - 94)
文章目录前言Solidity 知识点速记整理 - (2026年) (75 - 94)前言 如果您觉得有用的话,记得给博主点个赞,评论,收藏一键三连啊,写作不易啊^ _ ^。 而且听说点赞的人每天的运气都不会太差,实在白嫖的话,那…...
收藏!程序员转AI工程师的3条死路+3条真路(内含2026年最新就业方向)
本文揭示了2026年程序员转AI工程师的3条死路和3条真路。死路包括从零学ML训练想做研究员、靠Prompt工程当主修、装AI App做评测自媒体,这些路径因入门方向被误导而难以成功。真路则包括用现有领域跳板转AI应用工程、AI Infra/MLOps方向、AI Agent工程师方向…...
从暗房到云端:一位古籍修复师用Midjourney蓝晒法重制《天工开物》插图的72小时实录(含失败21次原始日志)
更多请点击: https://intelliparadigm.com 第一章:从暗房到云端:一场古籍图像的跨时空显影 在胶片时代,古籍修复师需在幽暗的暗房中,借助红光灯与显影液,一帧一帧唤醒沉睡于纸背的墨痕;而今&am…...
B站视频下载终极指南:如何一键获取无水印高清视频
B站视频下载终极指南:如何一键获取无水印高清视频 【免费下载链接】BiliDownload B站视频下载工具 项目地址: https://gitcode.com/gh_mirrors/bil/BiliDownload 你是否曾为下载B站视频而烦恼?想要保存喜欢的视频却找不到合适的工具?B…...
终极指南:如何使用IDM激活脚本实现永久免费下载体验
终极指南:如何使用IDM激活脚本实现永久免费下载体验 【免费下载链接】IDM-Activation-Script IDM Activation & Trail Reset Script 项目地址: https://gitcode.com/gh_mirrors/id/IDM-Activation-Script Internet Download Manager(IDM&…...
汽车底盘松散?别忽视!成因与排查养护指南
对于每一位车主而言,汽车驾驶质感藏于细节,而底盘状态则是决定这份质感的核心。刚提新车时,驾驶紧致利落,过减速带悬挂反馈干脆,转弯车身平稳。然而,随着用车时间增长,底盘可能出现“松散感”&a…...
如何高效实现STL到STEP格式转换?专业工具stltostp实战指南
如何高效实现STL到STEP格式转换?专业工具stltostp实战指南 【免费下载链接】stltostp Convert stl files to STEP brep files 项目地址: https://gitcode.com/gh_mirrors/st/stltostp 你是否曾遇到这样的困境:精心设计的3D模型在STL格式下无法导入…...
UV-UI框架终极指南:如何快速构建跨平台应用
UV-UI框架终极指南:如何快速构建跨平台应用 【免费下载链接】uv-ui uv-ui 破釜沉舟之兼容vue32、app、h5、小程序等多端基于uni-app和uView2.x的生态框架,支持单独导入,开箱即用,利剑出击。 项目地址: https://gitcode.com/gh_m…...
实战指南:在Cortex-A53/A57平台上配置与调试AMBA AXI/ACE总线
Cortex-A53/A57平台AMBA总线实战:从寄存器配置到性能调优 1. AMBA总线架构与Cortex-A系列核心的深度适配 在嵌入式系统开发领域,AMBA总线作为ARM处理器生态的核心互联架构,其性能表现直接决定了SoC整体效能。Cortex-A53/A57作为经典的big.LIT…...
