大数据-玩转数据-Flink窗口
一、Flink 窗口 理解
在流处理应用中,数据是连续不断的,因此我们不可能等到所有数据都到了才开始处理。当然我们可以每来一个消息就处理一次,但是有时我们需要做一些聚合类的处理,例如:在过去的1分钟内有多少用户点击了我们的网页。在这种情况下,我们必须定义一个窗口,用来收集最近一分钟内的数据,并对这个窗口内的数据进行计算。
流式计算是一种被设计用于处理无限数据集的数据处理引擎,而无限数据集是指一种不断增长的本质上无限的数据集,而Window窗口是一种切割无限数据为有限块进行处理的手段。
在Flink中, 窗口(window)是处理无界流的核心. 窗口把流切割成有限大小的多个"存储桶"(bucket), 我们在这些桶上进行计算.
时间窗口
时间窗口包含一个开始时间戳(包括)和结束时间戳(不包括), 这两个时间戳一起限制了窗口的尺寸。在代码中, Flink使用TimeWindow这个类来表示基于时间的窗口. 这个类提供了key查询开始时间戳和结束时间戳的方法, 还提供了针对给定的窗口获取它允许的最大时间戳的方法(maxTimestamp()),时间窗口又分3种:滚动窗口、滑动窗口、会话窗口。
二、数据准备
准备一个WaterSensor类方便演示
package com.lyh.bean;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;@Data
@NoArgsConstructor
@AllArgsConstructor
public class WaterSensor {private String id;private Long ts;private Integer vc;
}
三、时间滚动窗口
滚动窗口有固定的大小, 窗口与窗口之间不会重叠也没有缝隙.比如,如果指定一个长度为5分钟的滚动窗口, 当前窗口开始计算, 每5分钟启动一个新的窗口。滚动窗口能将数据流切分成不重叠的窗口,每一个事件只能属于一个窗口。
滚动窗口有固定的大小, 窗口与窗口之间不会重叠也没有缝隙.比如,如果指定一个长度为5分钟的滚动窗口, 当前窗口开始计算, 每5分钟启动一个新的窗口.
滚动窗口能将数据流切分成不重叠的窗口,每一个事件只能属于一个窗口
1.时间间隔可以通过: Time.milliseconds(x), Time.seconds(x), Time.minutes(x),等等来指定,2.我们传递给window函数的对象叫窗口分配器.
时间滚动窗口代码
package com.lyh.flink07;import com.lyh.bean.WaterSensor;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;import java.util.ArrayList;
import java.util.List;public class Window_s {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.socketTextStream("hadoop100",9999).map(line -> {String[] data = line.split(",");return new WaterSensor(data[0],Long.valueOf(data[1]),Integer.valueOf(data[2]));}).keyBy(WaterSensor::getId).window(TumblingProcessingTimeWindows.of(Time.seconds(5))).process(new ProcessWindowFunction<WaterSensor, String,String, TimeWindow>() {@Overridepublic void process(String key,Context ctx,Iterable<WaterSensor> elements,Collector<String> out) throws Exception {List<WaterSensor> list = toList(elements);long starttime = ctx.window().getStart();long endtime = ctx.window().getEnd();out.collect("窗口:" + starttime + " " + endtime + " " + "key:" + key + " " + "list:" + list);}}).print();env.execute();}private static <T>List<T> toList(Iterable<T> it) {List<T> list = new ArrayList<>();for (T t : it) {list.add(t);}return list;}
}
运行结果
在hadoop100 服务器
输入nc -lk 999 启动socket

消费结果:

四、时间滑动窗口
与滚动窗口一样, 滑动窗口也是有固定的长度. 另外一个参数我们叫滑动步长, 用来控制滑动窗口启动的频率.
所以, 如果滑动步长小于窗口长度, 滑动窗口会重叠. 这种情况下, 一个元素可能会被分配到多个窗口中
例如, 滑动窗口长度10分钟, 滑动步长5分钟, 则, 每5分钟会得到一个包含最近10分钟的数据。
时间滑动窗口代码
package com.lyh.flink07;import com.lyh.bean.WaterSensor;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;import java.util.ArrayList;
import java.util.List;public class Window_s {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.socketTextStream("hadoop100",9999).map(line -> {String[] data = line.split(",");return new WaterSensor(data[0],Long.valueOf(data[1]),Integer.valueOf(data[2]));}).keyBy(WaterSensor::getId)
// .window(TumblingProcessingTimeWindows.of(Time.seconds(5))).window(SlidingProcessingTimeWindows.of(Time.seconds(5),Time.seconds(2))).process(new ProcessWindowFunction<WaterSensor, String,String, TimeWindow>() {@Overridepublic void process(String key,Context ctx,Iterable<WaterSensor> elements,Collector<String> out) throws Exception {List<WaterSensor> list = toList(elements);long starttime = ctx.window().getStart();long endtime = ctx.window().getEnd();out.collect("窗口:" + starttime + " " + endtime + " " + "key:" + key + " " + "list:" + list);}}).print();env.execute();}private static <T>List<T> toList(Iterable<T> it) {List<T> list = new ArrayList<>();for (T t : it) {list.add(t);}return list;}
}
执行结果
在hadoop100 服务器
输入nc -lk 999 启动socket

消费结果

五、时间会话窗口
会话窗口分配器会根据活动的元素进行分组. 会话窗口不会有重叠, 与滚动窗口和滑动窗口相比, 会话窗口也没有固定的开启和关闭时间.
如果会话窗口有一段时间没有收到数据, 会话窗口会自动关闭, 这段没有收到数据的时间就是会话窗口的gap(间隔)
我们可以配置静态的gap, 也可以通过一个gap extractor 函数来定义gap的长度. 当时间超过了这个gap, 当前的会话窗口就会关闭, 后序的元素会被分配到一个新的会话窗口。
时间会话窗口代码
package com.lyh.flink07;import com.lyh.bean.WaterSensor;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;import java.util.ArrayList;
import java.util.List;public class Window_s {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.socketTextStream("hadoop100",9999).map(line -> {String[] data = line.split(",");return new WaterSensor(data[0],Long.valueOf(data[1]),Integer.valueOf(data[2]));}).keyBy(WaterSensor::getId)
// .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
// .window(SlidingProcessingTimeWindows.of(Time.seconds(5),Time.seconds(2))).window(ProcessingTimeSessionWindows.withGap(Time.seconds(3))).process(new ProcessWindowFunction<WaterSensor, String,String, TimeWindow>() {@Overridepublic void process(String key,Context ctx,Iterable<WaterSensor> elements,Collector<String> out) throws Exception {List<WaterSensor> list = toList(elements);long starttime = ctx.window().getStart();long endtime = ctx.window().getEnd();out.collect("窗口:" + starttime + " " + endtime + " " + "key:" + key + " " + "list:" + list);}}).print();env.execute();}private static <T>List<T> toList(Iterable<T> it) {List<T> list = new ArrayList<>();for (T t : it) {list.add(t);}return list;}
}
运行结果
在hadoop100 服务器
输入nc -lk 999 启动socket

消费结果

因为会话窗口没有固定的开启和关闭时间, 所以会话窗口的创建和关闭与滚动,滑动窗口不同. 在Flink内部, 每到达一个新的元素都会创建一个新的会话窗口, 如果这些窗口彼此相距比较定义的gap小, 则会对他们进行合并. 为了能够合并, 会话窗口算子需要合并触发器和合并窗口函数: ReduceFunction, AggregateFunction, or ProcessWindowFunction 。
六、基于元素个数的滚动窗口
默认的CountWindow是一个滚动窗口,只需要指定窗口大小即可,当元素数量达到窗口大小时,就会触发窗口的执行。
实例代码
.countWindow(3)
说明:哪个窗口先达到3个元素, 哪个窗口就关闭. 不影响其他的窗口.
基于元素个数的滚动窗口代码
package com.lyh.flink07;import com.lyh.bean.WaterSensor;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;import java.util.ArrayList;
import java.util.List;public class Window_s_n {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.socketTextStream("hadoop100",9999).map(line -> {String[] data = line.split(",");return new WaterSensor(data[0],Long.valueOf(data[1]),Integer.valueOf(data[2]));}).keyBy(WaterSensor::getId).countWindow(2).process(new ProcessWindowFunction<WaterSensor, String, String, GlobalWindow>() {@Overridepublic void process(String key,Context ctx,Iterable<WaterSensor> elements,Collector<String> out) throws Exception {List<WaterSensor> list = toList(elements);out.collect("窗口:" + "key:" + key + " " + "list:" + list);}}).print();env.execute();}private static <T>List<T> toList(Iterable<T> it) {List<T> list = new ArrayList<>();for (T t : it) {list.add(t);}return list;}
}
运行结果


七、基于元素个数的滑动窗口
滑动窗口和滚动窗口的函数名是完全一致的,只是在传参数时需要传入两个参数,一个是window_size,一个是sliding_size。下面代码中的sliding_size设置为了2,也就是说,每收到两个相同key的数据就计算一次,每一次计算的window范围最多是3个元素。
实例代码
.countWindow(3, 2)
package com.lyh.flink07;import com.lyh.bean.WaterSensor;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;import java.util.ArrayList;
import java.util.List;public class Window_s_n {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.socketTextStream("hadoop100",9999).map(line -> {String[] data = line.split(",");return new WaterSensor(data[0],Long.valueOf(data[1]),Integer.valueOf(data[2]));}).keyBy(WaterSensor::getId)
// .countWindow(2).countWindow(3,2).process(new ProcessWindowFunction<WaterSensor, String, String, GlobalWindow>() {@Overridepublic void process(String key,Context ctx,Iterable<WaterSensor> elements,Collector<String> out) throws Exception {List<WaterSensor> list = toList(elements);out.collect("窗口:" + "key:" + key + " " + "list:" + list);}}).print();env.execute();}private static <T>List<T> toList(Iterable<T> it) {List<T> list = new ArrayList<>();for (T t : it) {list.add(t);}return list;}
}
运行结果


相关文章:
大数据-玩转数据-Flink窗口
一、Flink 窗口 理解 在流处理应用中,数据是连续不断的,因此我们不可能等到所有数据都到了才开始处理。当然我们可以每来一个消息就处理一次,但是有时我们需要做一些聚合类的处理,例如:在过去的1分钟内有多少用户点击…...
【python爬虫】—豆瓣电影Top250
豆瓣电影Top250 豆瓣榜单简介需求描述Python实现 豆瓣榜单简介 豆瓣电影 Top 250 榜单是豆瓣网站上列出的评分最高、受观众喜爱的电影作品。这个榜单包含了一系列优秀的影片,涵盖了各种类型、不同国家和时期的电影。 需求描述 使用python爬取top250电影ÿ…...
【跟小嘉学 Rust 编程】十五、智能指针
系列文章目录 【跟小嘉学 Rust 编程】一、Rust 编程基础 【跟小嘉学 Rust 编程】二、Rust 包管理工具使用 【跟小嘉学 Rust 编程】三、Rust 的基本程序概念 【跟小嘉学 Rust 编程】四、理解 Rust 的所有权概念 【跟小嘉学 Rust 编程】五、使用结构体关联结构化数据 【跟小嘉学…...
Python爬虫基础之正则表达式
目录 一、什么是正则表达式? 二、re.compile()编译函数 三、group()获取匹配结果函数 四、常用匹配规则 4.1匹配单个字符 4.2匹配前字符次数 4.3匹配原生字符串 4.4匹配字符串开头和结尾 4.5分组匹配 五、re.match()开头匹配函数 六、re.search()全文搜索…...
【LeetCode】双指针妙解有效三角形的个数
Problem: 611. 有效三角形的个数 文章目录 题目分析讲解算法原理复杂度Code 题目分析 首先我们来分析一下本题的思路 看到题目中给出的示例 题目的意思很简单,就是将给到的数字去做一个组合,然后看看这三条边是否可以构成三角形。那判断的方法不用我说&a…...
mysql 计算两点之间距离
先说一下我们可能会用到的一些场景,这样同学们可以先评估,该篇文章是否对你有帮助! 场景: 假设 美团,我点外卖时,系统会让我先进行定位,比如我定位在了 A 点,系统就会给我推荐&…...
c语言自定义头文件是什么情况下使用?一般在什么情况下引用自定义的头文件?一般在自定义头文件中写什么代码?
c语言自定义头文件是什么情况下使用?一般在什么情况下引用自定义的头文件?一般在自定义头文件中写什么代码? C语言自定义头文件是一种用来封装函数和变量声明的文件,它通常用于将一组相关的函数和变量的声明集中在一个地方&#…...
electron应用打包成功纪念一下
electron应用打包成功纪念一下,以前曾经行过后来打包各种报错,现在有空就尝试解决一下 首先安装nvm能够方便切换node版本 curl -o- https://raw.githubusercontent.com/nvm-sh/nvm/v0.34.0/install.sh | bash 顺利安装后你用nvm list查看node列表时会…...
远程办公中安全远程访问解决方案
什么是安全远程访问 安全的远程访问是一个至关重要的过程,可让您使用互联网从远处完全控制某人的设备。为了确保安全,为受保护的远程访问采取了额外的身份验证和加密措施。 为什么安全远程访问解决方案很重要 当 IT 技术人员从远处帮助人们解决计算机…...
StartUp启动框架-Android启动性能
简述 当谈论Android应用程序的启动性能时,StartUp启动框架是一个不可忽视的关键工具。它旨在优化应用程序的启动过程,确保用户在打开应用时能够迅速获得流畅、高效的体验。让我们来深入了解StartUp框架的作用和重要性,以及它是如何改善Andro…...
Positive Technologies:五分之四的网络攻击具有针对性
Positive Technologies 对 2023 年第二季度的相关网络威胁进行了分析。报告显示,自今年年初以来,有针对性的攻击数量增加了 10%,目前占 78%。专家们注意到利用漏洞的大规模攻击和大量用户个人数据的泄露。此外,在此期间࿰…...
clickhouse的另类表引擎
clickhouse常用的MergeTree引擎外,还有特殊的引擎 1,memory引擎,顾名思义,数据是存储在内存中,数据不会被压缩也不会倍格式化转换数据在内存中保存的形态与查询时看到的如出一辙,重启ck数据丢失 2ÿ…...
Uniapp新版本打包后覆盖安装,新增的页面无法跳转,需退出重新启动才可以打开的解决方案
最近写uniapp项目,发现一个坑,在新版本覆盖安装后直接打开APP,新增的页面竟然无法跳转,需要重新启动才可以正常打开,在网上查了很多方法,最终总结下来有以下几点: 1.看打的是debug包还是releas…...
系统架构设计高级技能 · 面向服务架构设计理论与实践
点击进入系列文章目录 系统架构设计高级技能 面向服务架构设计理论与实践 一、SOA的相关概念1.1SOA的定义1.2 业务流程与业务流程执行语言 二、SOA的发展史三、SOA与微服务的区别三、SOA的参考架构四、SOA的主要协议规范五、SOA的设计标准要求六、SOA的作用与设计原则七、SOA的…...
QT注册界面练习(信号与槽实现页面跳转)
一、注册界面练习思路以及具体代码 在完成注册页面搭建的前提下,通过信号与槽机制实现多组件之间的相互通信,实现页面跳转。 基本步骤: 首先,将注册页面的登录按钮与成功登陆信号绑定,当用户名与密码均匹配时…...
MySQL从入门到精通【进阶篇】之 主从复制详解
文章目录 0.前言1. 主从复制简介2. 主从复制的工作流程主从复制过程中的日志文件作用(Binary Log)和中继日志(Relay Log) 3. MySQL主从复制的配置4. 参考资料 0.前言 MySQL的主从复制和读写分离是数据库领域的基本概念࿰…...
vue使用qrcodejs2生成二维码
目录 概要 构建展示的vue组件qrcode.vue 组件的使用 概要 项目中用到需要展示二维码的样式,想到了qrcode 例如: 前提:安装包 npm install qrcodejs2 --save 构建展示的vue组件qrcode.vue <template><div style"width: …...
python注释
任何编程语言都少不了注释,Python也不例外,以下是Python注释的具体用法: 单行注释 Python编程语言的单行注释常以#开头,单行注释可以作为单独的一行放在被注释代码行之上,也可以放在语句或者表达式之后。 实例&…...
update-alternatives详解
1.功能作用 update-alternatives是dpkg的实用工具,用来维护系统命令的符号链接,以决定系统默认使用什么命令。 在Debian系统中,我们可能会同时安装有很多功能类似的程序和可选配置,如Web浏览器程序(firefox,konquero…...
JavaScript 编写更好的条件语句
在任何编程语言中,代码需要根据不同的条件在给定的输入中做不同的决定和执行相应的动作。 例如,在一个游戏中,如果玩家生命点为0,游戏结束。在天气应用中,如果在早上被查看,显示一个日出图片,如…...
【大模型RAG】拍照搜题技术架构速览:三层管道、两级检索、兜底大模型
摘要 拍照搜题系统采用“三层管道(多模态 OCR → 语义检索 → 答案渲染)、两级检索(倒排 BM25 向量 HNSW)并以大语言模型兜底”的整体框架: 多模态 OCR 层 将题目图片经过超分、去噪、倾斜校正后,分别用…...
深入剖析AI大模型:大模型时代的 Prompt 工程全解析
今天聊的内容,我认为是AI开发里面非常重要的内容。它在AI开发里无处不在,当你对 AI 助手说 "用李白的风格写一首关于人工智能的诗",或者让翻译模型 "将这段合同翻译成商务日语" 时,输入的这句话就是 Prompt。…...
在四层代理中还原真实客户端ngx_stream_realip_module
一、模块原理与价值 PROXY Protocol 回溯 第三方负载均衡(如 HAProxy、AWS NLB、阿里 SLB)发起上游连接时,将真实客户端 IP/Port 写入 PROXY Protocol v1/v2 头。Stream 层接收到头部后,ngx_stream_realip_module 从中提取原始信息…...
屋顶变身“发电站” ,中天合创屋面分布式光伏发电项目顺利并网!
5月28日,中天合创屋面分布式光伏发电项目顺利并网发电,该项目位于内蒙古自治区鄂尔多斯市乌审旗,项目利用中天合创聚乙烯、聚丙烯仓库屋面作为场地建设光伏电站,总装机容量为9.96MWp。 项目投运后,每年可节约标煤3670…...
.Net Framework 4/C# 关键字(非常用,持续更新...)
一、is 关键字 is 关键字用于检查对象是否于给定类型兼容,如果兼容将返回 true,如果不兼容则返回 false,在进行类型转换前,可以先使用 is 关键字判断对象是否与指定类型兼容,如果兼容才进行转换,这样的转换是安全的。 例如有:首先创建一个字符串对象,然后将字符串对象隐…...
Device Mapper 机制
Device Mapper 机制详解 Device Mapper(简称 DM)是 Linux 内核中的一套通用块设备映射框架,为 LVM、加密磁盘、RAID 等提供底层支持。本文将详细介绍 Device Mapper 的原理、实现、内核配置、常用工具、操作测试流程,并配以详细的…...
webpack面试题
面试题:webpack介绍和简单使用 一、webpack(模块化打包工具)1. webpack是把项目当作一个整体,通过给定的一个主文件,webpack将从这个主文件开始找到你项目当中的所有依赖文件,使用loaders来处理它们&#x…...
boost::filesystem::path文件路径使用详解和示例
boost::filesystem::path 是 Boost 库中用于跨平台操作文件路径的类,封装了路径的拼接、分割、提取、判断等常用功能。下面是对它的使用详解,包括常用接口与完整示例。 1. 引入头文件与命名空间 #include <boost/filesystem.hpp> namespace fs b…...
路由基础-路由表
本篇将会向读者介绍路由的基本概念。 前言 在一个典型的数据通信网络中,往往存在多个不同的IP网段,数据在不同的IP网段之间交互是需要借助三层设备的,这些设备具备路由能力,能够实现数据的跨网段转发。 路由是数据通信网络中最基…...
Win系统权限提升篇UAC绕过DLL劫持未引号路径可控服务全检项目
应用场景: 1、常规某个机器被钓鱼后门攻击后,我们需要做更高权限操作或权限维持等。 2、内网域中某个机器被钓鱼后门攻击后,我们需要对后续内网域做安全测试。 #Win10&11-BypassUAC自动提权-MSF&UACME 为了远程执行目标的exe或者b…...
