大数据-玩转数据-Flink窗口
一、Flink 窗口 理解
在流处理应用中,数据是连续不断的,因此我们不可能等到所有数据都到了才开始处理。当然我们可以每来一个消息就处理一次,但是有时我们需要做一些聚合类的处理,例如:在过去的1分钟内有多少用户点击了我们的网页。在这种情况下,我们必须定义一个窗口,用来收集最近一分钟内的数据,并对这个窗口内的数据进行计算。
流式计算是一种被设计用于处理无限数据集的数据处理引擎,而无限数据集是指一种不断增长的本质上无限的数据集,而Window窗口是一种切割无限数据为有限块进行处理的手段。
在Flink中, 窗口(window)是处理无界流的核心. 窗口把流切割成有限大小的多个"存储桶"(bucket), 我们在这些桶上进行计算.
时间窗口
时间窗口包含一个开始时间戳(包括)和结束时间戳(不包括), 这两个时间戳一起限制了窗口的尺寸。在代码中, Flink使用TimeWindow这个类来表示基于时间的窗口. 这个类提供了key查询开始时间戳和结束时间戳的方法, 还提供了针对给定的窗口获取它允许的最大时间戳的方法(maxTimestamp()),时间窗口又分3种:滚动窗口、滑动窗口、会话窗口。
二、数据准备
准备一个WaterSensor类方便演示
package com.lyh.bean;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;@Data
@NoArgsConstructor
@AllArgsConstructor
public class WaterSensor {private String id;private Long ts;private Integer vc;
}
三、时间滚动窗口
滚动窗口有固定的大小, 窗口与窗口之间不会重叠也没有缝隙.比如,如果指定一个长度为5分钟的滚动窗口, 当前窗口开始计算, 每5分钟启动一个新的窗口。滚动窗口能将数据流切分成不重叠的窗口,每一个事件只能属于一个窗口。
滚动窗口有固定的大小, 窗口与窗口之间不会重叠也没有缝隙.比如,如果指定一个长度为5分钟的滚动窗口, 当前窗口开始计算, 每5分钟启动一个新的窗口.
滚动窗口能将数据流切分成不重叠的窗口,每一个事件只能属于一个窗口
1.时间间隔可以通过: Time.milliseconds(x), Time.seconds(x), Time.minutes(x),等等来指定,2.我们传递给window函数的对象叫窗口分配器.
时间滚动窗口代码
package com.lyh.flink07;import com.lyh.bean.WaterSensor;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;import java.util.ArrayList;
import java.util.List;public class Window_s {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.socketTextStream("hadoop100",9999).map(line -> {String[] data = line.split(",");return new WaterSensor(data[0],Long.valueOf(data[1]),Integer.valueOf(data[2]));}).keyBy(WaterSensor::getId).window(TumblingProcessingTimeWindows.of(Time.seconds(5))).process(new ProcessWindowFunction<WaterSensor, String,String, TimeWindow>() {@Overridepublic void process(String key,Context ctx,Iterable<WaterSensor> elements,Collector<String> out) throws Exception {List<WaterSensor> list = toList(elements);long starttime = ctx.window().getStart();long endtime = ctx.window().getEnd();out.collect("窗口:" + starttime + " " + endtime + " " + "key:" + key + " " + "list:" + list);}}).print();env.execute();}private static <T>List<T> toList(Iterable<T> it) {List<T> list = new ArrayList<>();for (T t : it) {list.add(t);}return list;}
}
运行结果
在hadoop100 服务器
输入nc -lk 999 启动socket
消费结果:
四、时间滑动窗口
与滚动窗口一样, 滑动窗口也是有固定的长度. 另外一个参数我们叫滑动步长, 用来控制滑动窗口启动的频率.
所以, 如果滑动步长小于窗口长度, 滑动窗口会重叠. 这种情况下, 一个元素可能会被分配到多个窗口中
例如, 滑动窗口长度10分钟, 滑动步长5分钟, 则, 每5分钟会得到一个包含最近10分钟的数据。
时间滑动窗口代码
package com.lyh.flink07;import com.lyh.bean.WaterSensor;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;import java.util.ArrayList;
import java.util.List;public class Window_s {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.socketTextStream("hadoop100",9999).map(line -> {String[] data = line.split(",");return new WaterSensor(data[0],Long.valueOf(data[1]),Integer.valueOf(data[2]));}).keyBy(WaterSensor::getId)
// .window(TumblingProcessingTimeWindows.of(Time.seconds(5))).window(SlidingProcessingTimeWindows.of(Time.seconds(5),Time.seconds(2))).process(new ProcessWindowFunction<WaterSensor, String,String, TimeWindow>() {@Overridepublic void process(String key,Context ctx,Iterable<WaterSensor> elements,Collector<String> out) throws Exception {List<WaterSensor> list = toList(elements);long starttime = ctx.window().getStart();long endtime = ctx.window().getEnd();out.collect("窗口:" + starttime + " " + endtime + " " + "key:" + key + " " + "list:" + list);}}).print();env.execute();}private static <T>List<T> toList(Iterable<T> it) {List<T> list = new ArrayList<>();for (T t : it) {list.add(t);}return list;}
}
执行结果
在hadoop100 服务器
输入nc -lk 999 启动socket
消费结果
五、时间会话窗口
会话窗口分配器会根据活动的元素进行分组. 会话窗口不会有重叠, 与滚动窗口和滑动窗口相比, 会话窗口也没有固定的开启和关闭时间.
如果会话窗口有一段时间没有收到数据, 会话窗口会自动关闭, 这段没有收到数据的时间就是会话窗口的gap(间隔)
我们可以配置静态的gap, 也可以通过一个gap extractor 函数来定义gap的长度. 当时间超过了这个gap, 当前的会话窗口就会关闭, 后序的元素会被分配到一个新的会话窗口。
时间会话窗口代码
package com.lyh.flink07;import com.lyh.bean.WaterSensor;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;import java.util.ArrayList;
import java.util.List;public class Window_s {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.socketTextStream("hadoop100",9999).map(line -> {String[] data = line.split(",");return new WaterSensor(data[0],Long.valueOf(data[1]),Integer.valueOf(data[2]));}).keyBy(WaterSensor::getId)
// .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
// .window(SlidingProcessingTimeWindows.of(Time.seconds(5),Time.seconds(2))).window(ProcessingTimeSessionWindows.withGap(Time.seconds(3))).process(new ProcessWindowFunction<WaterSensor, String,String, TimeWindow>() {@Overridepublic void process(String key,Context ctx,Iterable<WaterSensor> elements,Collector<String> out) throws Exception {List<WaterSensor> list = toList(elements);long starttime = ctx.window().getStart();long endtime = ctx.window().getEnd();out.collect("窗口:" + starttime + " " + endtime + " " + "key:" + key + " " + "list:" + list);}}).print();env.execute();}private static <T>List<T> toList(Iterable<T> it) {List<T> list = new ArrayList<>();for (T t : it) {list.add(t);}return list;}
}
运行结果
在hadoop100 服务器
输入nc -lk 999 启动socket
消费结果
因为会话窗口没有固定的开启和关闭时间, 所以会话窗口的创建和关闭与滚动,滑动窗口不同. 在Flink内部, 每到达一个新的元素都会创建一个新的会话窗口, 如果这些窗口彼此相距比较定义的gap小, 则会对他们进行合并. 为了能够合并, 会话窗口算子需要合并触发器和合并窗口函数: ReduceFunction, AggregateFunction, or ProcessWindowFunction 。
六、基于元素个数的滚动窗口
默认的CountWindow是一个滚动窗口,只需要指定窗口大小即可,当元素数量达到窗口大小时,就会触发窗口的执行。
实例代码
.countWindow(3)
说明:哪个窗口先达到3个元素, 哪个窗口就关闭. 不影响其他的窗口.
基于元素个数的滚动窗口代码
package com.lyh.flink07;import com.lyh.bean.WaterSensor;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;import java.util.ArrayList;
import java.util.List;public class Window_s_n {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.socketTextStream("hadoop100",9999).map(line -> {String[] data = line.split(",");return new WaterSensor(data[0],Long.valueOf(data[1]),Integer.valueOf(data[2]));}).keyBy(WaterSensor::getId).countWindow(2).process(new ProcessWindowFunction<WaterSensor, String, String, GlobalWindow>() {@Overridepublic void process(String key,Context ctx,Iterable<WaterSensor> elements,Collector<String> out) throws Exception {List<WaterSensor> list = toList(elements);out.collect("窗口:" + "key:" + key + " " + "list:" + list);}}).print();env.execute();}private static <T>List<T> toList(Iterable<T> it) {List<T> list = new ArrayList<>();for (T t : it) {list.add(t);}return list;}
}
运行结果
七、基于元素个数的滑动窗口
滑动窗口和滚动窗口的函数名是完全一致的,只是在传参数时需要传入两个参数,一个是window_size,一个是sliding_size。下面代码中的sliding_size设置为了2,也就是说,每收到两个相同key的数据就计算一次,每一次计算的window范围最多是3个元素。
实例代码
.countWindow(3, 2)
package com.lyh.flink07;import com.lyh.bean.WaterSensor;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;import java.util.ArrayList;
import java.util.List;public class Window_s_n {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.socketTextStream("hadoop100",9999).map(line -> {String[] data = line.split(",");return new WaterSensor(data[0],Long.valueOf(data[1]),Integer.valueOf(data[2]));}).keyBy(WaterSensor::getId)
// .countWindow(2).countWindow(3,2).process(new ProcessWindowFunction<WaterSensor, String, String, GlobalWindow>() {@Overridepublic void process(String key,Context ctx,Iterable<WaterSensor> elements,Collector<String> out) throws Exception {List<WaterSensor> list = toList(elements);out.collect("窗口:" + "key:" + key + " " + "list:" + list);}}).print();env.execute();}private static <T>List<T> toList(Iterable<T> it) {List<T> list = new ArrayList<>();for (T t : it) {list.add(t);}return list;}
}
运行结果
相关文章:

大数据-玩转数据-Flink窗口
一、Flink 窗口 理解 在流处理应用中,数据是连续不断的,因此我们不可能等到所有数据都到了才开始处理。当然我们可以每来一个消息就处理一次,但是有时我们需要做一些聚合类的处理,例如:在过去的1分钟内有多少用户点击…...

【python爬虫】—豆瓣电影Top250
豆瓣电影Top250 豆瓣榜单简介需求描述Python实现 豆瓣榜单简介 豆瓣电影 Top 250 榜单是豆瓣网站上列出的评分最高、受观众喜爱的电影作品。这个榜单包含了一系列优秀的影片,涵盖了各种类型、不同国家和时期的电影。 需求描述 使用python爬取top250电影ÿ…...

【跟小嘉学 Rust 编程】十五、智能指针
系列文章目录 【跟小嘉学 Rust 编程】一、Rust 编程基础 【跟小嘉学 Rust 编程】二、Rust 包管理工具使用 【跟小嘉学 Rust 编程】三、Rust 的基本程序概念 【跟小嘉学 Rust 编程】四、理解 Rust 的所有权概念 【跟小嘉学 Rust 编程】五、使用结构体关联结构化数据 【跟小嘉学…...

Python爬虫基础之正则表达式
目录 一、什么是正则表达式? 二、re.compile()编译函数 三、group()获取匹配结果函数 四、常用匹配规则 4.1匹配单个字符 4.2匹配前字符次数 4.3匹配原生字符串 4.4匹配字符串开头和结尾 4.5分组匹配 五、re.match()开头匹配函数 六、re.search()全文搜索…...

【LeetCode】双指针妙解有效三角形的个数
Problem: 611. 有效三角形的个数 文章目录 题目分析讲解算法原理复杂度Code 题目分析 首先我们来分析一下本题的思路 看到题目中给出的示例 题目的意思很简单,就是将给到的数字去做一个组合,然后看看这三条边是否可以构成三角形。那判断的方法不用我说&a…...

mysql 计算两点之间距离
先说一下我们可能会用到的一些场景,这样同学们可以先评估,该篇文章是否对你有帮助! 场景: 假设 美团,我点外卖时,系统会让我先进行定位,比如我定位在了 A 点,系统就会给我推荐&…...

c语言自定义头文件是什么情况下使用?一般在什么情况下引用自定义的头文件?一般在自定义头文件中写什么代码?
c语言自定义头文件是什么情况下使用?一般在什么情况下引用自定义的头文件?一般在自定义头文件中写什么代码? C语言自定义头文件是一种用来封装函数和变量声明的文件,它通常用于将一组相关的函数和变量的声明集中在一个地方&#…...

electron应用打包成功纪念一下
electron应用打包成功纪念一下,以前曾经行过后来打包各种报错,现在有空就尝试解决一下 首先安装nvm能够方便切换node版本 curl -o- https://raw.githubusercontent.com/nvm-sh/nvm/v0.34.0/install.sh | bash 顺利安装后你用nvm list查看node列表时会…...

远程办公中安全远程访问解决方案
什么是安全远程访问 安全的远程访问是一个至关重要的过程,可让您使用互联网从远处完全控制某人的设备。为了确保安全,为受保护的远程访问采取了额外的身份验证和加密措施。 为什么安全远程访问解决方案很重要 当 IT 技术人员从远处帮助人们解决计算机…...

StartUp启动框架-Android启动性能
简述 当谈论Android应用程序的启动性能时,StartUp启动框架是一个不可忽视的关键工具。它旨在优化应用程序的启动过程,确保用户在打开应用时能够迅速获得流畅、高效的体验。让我们来深入了解StartUp框架的作用和重要性,以及它是如何改善Andro…...

Positive Technologies:五分之四的网络攻击具有针对性
Positive Technologies 对 2023 年第二季度的相关网络威胁进行了分析。报告显示,自今年年初以来,有针对性的攻击数量增加了 10%,目前占 78%。专家们注意到利用漏洞的大规模攻击和大量用户个人数据的泄露。此外,在此期间࿰…...

clickhouse的另类表引擎
clickhouse常用的MergeTree引擎外,还有特殊的引擎 1,memory引擎,顾名思义,数据是存储在内存中,数据不会被压缩也不会倍格式化转换数据在内存中保存的形态与查询时看到的如出一辙,重启ck数据丢失 2ÿ…...

Uniapp新版本打包后覆盖安装,新增的页面无法跳转,需退出重新启动才可以打开的解决方案
最近写uniapp项目,发现一个坑,在新版本覆盖安装后直接打开APP,新增的页面竟然无法跳转,需要重新启动才可以正常打开,在网上查了很多方法,最终总结下来有以下几点: 1.看打的是debug包还是releas…...

系统架构设计高级技能 · 面向服务架构设计理论与实践
点击进入系列文章目录 系统架构设计高级技能 面向服务架构设计理论与实践 一、SOA的相关概念1.1SOA的定义1.2 业务流程与业务流程执行语言 二、SOA的发展史三、SOA与微服务的区别三、SOA的参考架构四、SOA的主要协议规范五、SOA的设计标准要求六、SOA的作用与设计原则七、SOA的…...

QT注册界面练习(信号与槽实现页面跳转)
一、注册界面练习思路以及具体代码 在完成注册页面搭建的前提下,通过信号与槽机制实现多组件之间的相互通信,实现页面跳转。 基本步骤: 首先,将注册页面的登录按钮与成功登陆信号绑定,当用户名与密码均匹配时…...

MySQL从入门到精通【进阶篇】之 主从复制详解
文章目录 0.前言1. 主从复制简介2. 主从复制的工作流程主从复制过程中的日志文件作用(Binary Log)和中继日志(Relay Log) 3. MySQL主从复制的配置4. 参考资料 0.前言 MySQL的主从复制和读写分离是数据库领域的基本概念࿰…...

vue使用qrcodejs2生成二维码
目录 概要 构建展示的vue组件qrcode.vue 组件的使用 概要 项目中用到需要展示二维码的样式,想到了qrcode 例如: 前提:安装包 npm install qrcodejs2 --save 构建展示的vue组件qrcode.vue <template><div style"width: …...

python注释
任何编程语言都少不了注释,Python也不例外,以下是Python注释的具体用法: 单行注释 Python编程语言的单行注释常以#开头,单行注释可以作为单独的一行放在被注释代码行之上,也可以放在语句或者表达式之后。 实例&…...

update-alternatives详解
1.功能作用 update-alternatives是dpkg的实用工具,用来维护系统命令的符号链接,以决定系统默认使用什么命令。 在Debian系统中,我们可能会同时安装有很多功能类似的程序和可选配置,如Web浏览器程序(firefox,konquero…...

JavaScript 编写更好的条件语句
在任何编程语言中,代码需要根据不同的条件在给定的输入中做不同的决定和执行相应的动作。 例如,在一个游戏中,如果玩家生命点为0,游戏结束。在天气应用中,如果在早上被查看,显示一个日出图片,如…...

聊聊PBE算法
序 本文主要研究一下PBE算法 PBE PBE即Password Based Encryption,基于口令的加密,它是一种组合算法,即一般是哈希对称算法,比如PBEWithMD5AndDES,就是用MD5做哈希,用DES做加解密,而其密钥则…...

用MFC打开外部程序
在MFC(Microsoft Foundation Classes)中,你可以使用ShellExecute函数来打开Notepad并加载指定的文件。ShellExecute函数是Windows API的一部分,它可以执行与操作系统相关的操作,例如打开文件、运行程序等。 以下是在M…...

基于全新电脑环境安装pytorch的GPU版本
前言: 距离第一次安装深度学习的GPU环境已经过去了4年多(当时TensorFlow特别麻烦),现在发现安装pytorch的GPU版本还是很简单方便的,流程记录如下。 安装步骤: 步骤一:官网下载Anaconda Free…...

[当前就业]2023年8月25日-计算机视觉就业现状分析
计算机视觉就业现状分析 前言:超越YOLO:计算机视觉市场蓬勃发展 如今,YOLO(You Only Look Once)新版本的发布周期很快,每次迭代的性能都优于其前身。每 3 到 4 个月就会推出一个升级版 YOLO 变体…...

虚拟化技术原理
计算虚拟化 介绍 把物理主机上物理资源(CPU,内存,IO外设),通过虚拟化层抽象成超量、等量的逻辑资源(虚拟CPU,虚拟内存,虚拟IO设备),然后重新组合形成新的虚…...

opencv-答题卡识别判卷
#导入工具包 import numpy as np import argparse import imutils import cv2# 设置参数 ap = argparse.ArgumentParser() ap.add_argument("-i", "--image", required=True,help="path to the input image") args = vars(ap.parse_args())# 正确…...

【Linux】基础IO
目录 一、回顾C语言文件操作二、文件系统调用接口1. open2.write3.read 三、文件描述符四、重定向1.输出重定向2.输入重定向 五、dup2 一、回顾C语言文件操作 1 #include<stdio.h>2 #include<stdlib.h>3 4 #define LOG "log.txt"5 6 int main()7 {8 //…...

【Go 基础篇】深入探索:Go语言中的二维数组
在计算机编程中,数组是一种基本的数据结构,用于存储相同类型的元素。而二维数组作为数组的一种扩展,允许我们以类似表格的方式存储和处理数据。在Go语言中,二维数组是一个重要的概念,本文将深入探讨Go语言中的二维数组…...

IntelliJ IDEA 2023.2.1使用Git时弹出“使用访问令牌登录”问题解决
这里写目录标题 一、内网Git环境GitLabGogsGitea 二、外网Git环境GitHubGitee 升级为IntelliJ IDEA 2023.2.1后,使用Git时弹出“使用访问令牌登录”的窗口,习惯使用Git帐号密码登录的用户,面对这个突如其来的弹窗真的很懵。 一、内网Git环境 …...

前端开发学习路线
无前端基础学习路线: B站免费视频1 B站免费视频2 有HTML、CSS、JavaScript基础,可直接通过以上视频中Vue2Vue3中实战项目学习Vue。...