大数据-玩转数据-Flink窗口
一、Flink 窗口 理解
在流处理应用中,数据是连续不断的,因此我们不可能等到所有数据都到了才开始处理。当然我们可以每来一个消息就处理一次,但是有时我们需要做一些聚合类的处理,例如:在过去的1分钟内有多少用户点击了我们的网页。在这种情况下,我们必须定义一个窗口,用来收集最近一分钟内的数据,并对这个窗口内的数据进行计算。
流式计算是一种被设计用于处理无限数据集的数据处理引擎,而无限数据集是指一种不断增长的本质上无限的数据集,而Window窗口是一种切割无限数据为有限块进行处理的手段。
在Flink中, 窗口(window)是处理无界流的核心. 窗口把流切割成有限大小的多个"存储桶"(bucket), 我们在这些桶上进行计算.
时间窗口
时间窗口包含一个开始时间戳(包括)和结束时间戳(不包括), 这两个时间戳一起限制了窗口的尺寸。在代码中, Flink使用TimeWindow这个类来表示基于时间的窗口. 这个类提供了key查询开始时间戳和结束时间戳的方法, 还提供了针对给定的窗口获取它允许的最大时间戳的方法(maxTimestamp()),时间窗口又分3种:滚动窗口、滑动窗口、会话窗口。
二、数据准备
准备一个WaterSensor类方便演示
package com.lyh.bean;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;@Data
@NoArgsConstructor
@AllArgsConstructor
public class WaterSensor {private String id;private Long ts;private Integer vc;
}
三、时间滚动窗口
滚动窗口有固定的大小, 窗口与窗口之间不会重叠也没有缝隙.比如,如果指定一个长度为5分钟的滚动窗口, 当前窗口开始计算, 每5分钟启动一个新的窗口。滚动窗口能将数据流切分成不重叠的窗口,每一个事件只能属于一个窗口。
滚动窗口有固定的大小, 窗口与窗口之间不会重叠也没有缝隙.比如,如果指定一个长度为5分钟的滚动窗口, 当前窗口开始计算, 每5分钟启动一个新的窗口.
滚动窗口能将数据流切分成不重叠的窗口,每一个事件只能属于一个窗口
1.时间间隔可以通过: Time.milliseconds(x), Time.seconds(x), Time.minutes(x),等等来指定,2.我们传递给window函数的对象叫窗口分配器.
时间滚动窗口代码
package com.lyh.flink07;import com.lyh.bean.WaterSensor;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;import java.util.ArrayList;
import java.util.List;public class Window_s {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.socketTextStream("hadoop100",9999).map(line -> {String[] data = line.split(",");return new WaterSensor(data[0],Long.valueOf(data[1]),Integer.valueOf(data[2]));}).keyBy(WaterSensor::getId).window(TumblingProcessingTimeWindows.of(Time.seconds(5))).process(new ProcessWindowFunction<WaterSensor, String,String, TimeWindow>() {@Overridepublic void process(String key,Context ctx,Iterable<WaterSensor> elements,Collector<String> out) throws Exception {List<WaterSensor> list = toList(elements);long starttime = ctx.window().getStart();long endtime = ctx.window().getEnd();out.collect("窗口:" + starttime + " " + endtime + " " + "key:" + key + " " + "list:" + list);}}).print();env.execute();}private static <T>List<T> toList(Iterable<T> it) {List<T> list = new ArrayList<>();for (T t : it) {list.add(t);}return list;}
}
运行结果
在hadoop100 服务器
输入nc -lk 999 启动socket
消费结果:
四、时间滑动窗口
与滚动窗口一样, 滑动窗口也是有固定的长度. 另外一个参数我们叫滑动步长, 用来控制滑动窗口启动的频率.
所以, 如果滑动步长小于窗口长度, 滑动窗口会重叠. 这种情况下, 一个元素可能会被分配到多个窗口中
例如, 滑动窗口长度10分钟, 滑动步长5分钟, 则, 每5分钟会得到一个包含最近10分钟的数据。
时间滑动窗口代码
package com.lyh.flink07;import com.lyh.bean.WaterSensor;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;import java.util.ArrayList;
import java.util.List;public class Window_s {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.socketTextStream("hadoop100",9999).map(line -> {String[] data = line.split(",");return new WaterSensor(data[0],Long.valueOf(data[1]),Integer.valueOf(data[2]));}).keyBy(WaterSensor::getId)
// .window(TumblingProcessingTimeWindows.of(Time.seconds(5))).window(SlidingProcessingTimeWindows.of(Time.seconds(5),Time.seconds(2))).process(new ProcessWindowFunction<WaterSensor, String,String, TimeWindow>() {@Overridepublic void process(String key,Context ctx,Iterable<WaterSensor> elements,Collector<String> out) throws Exception {List<WaterSensor> list = toList(elements);long starttime = ctx.window().getStart();long endtime = ctx.window().getEnd();out.collect("窗口:" + starttime + " " + endtime + " " + "key:" + key + " " + "list:" + list);}}).print();env.execute();}private static <T>List<T> toList(Iterable<T> it) {List<T> list = new ArrayList<>();for (T t : it) {list.add(t);}return list;}
}
执行结果
在hadoop100 服务器
输入nc -lk 999 启动socket
消费结果
五、时间会话窗口
会话窗口分配器会根据活动的元素进行分组. 会话窗口不会有重叠, 与滚动窗口和滑动窗口相比, 会话窗口也没有固定的开启和关闭时间.
如果会话窗口有一段时间没有收到数据, 会话窗口会自动关闭, 这段没有收到数据的时间就是会话窗口的gap(间隔)
我们可以配置静态的gap, 也可以通过一个gap extractor 函数来定义gap的长度. 当时间超过了这个gap, 当前的会话窗口就会关闭, 后序的元素会被分配到一个新的会话窗口。
时间会话窗口代码
package com.lyh.flink07;import com.lyh.bean.WaterSensor;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;import java.util.ArrayList;
import java.util.List;public class Window_s {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.socketTextStream("hadoop100",9999).map(line -> {String[] data = line.split(",");return new WaterSensor(data[0],Long.valueOf(data[1]),Integer.valueOf(data[2]));}).keyBy(WaterSensor::getId)
// .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
// .window(SlidingProcessingTimeWindows.of(Time.seconds(5),Time.seconds(2))).window(ProcessingTimeSessionWindows.withGap(Time.seconds(3))).process(new ProcessWindowFunction<WaterSensor, String,String, TimeWindow>() {@Overridepublic void process(String key,Context ctx,Iterable<WaterSensor> elements,Collector<String> out) throws Exception {List<WaterSensor> list = toList(elements);long starttime = ctx.window().getStart();long endtime = ctx.window().getEnd();out.collect("窗口:" + starttime + " " + endtime + " " + "key:" + key + " " + "list:" + list);}}).print();env.execute();}private static <T>List<T> toList(Iterable<T> it) {List<T> list = new ArrayList<>();for (T t : it) {list.add(t);}return list;}
}
运行结果
在hadoop100 服务器
输入nc -lk 999 启动socket
消费结果
因为会话窗口没有固定的开启和关闭时间, 所以会话窗口的创建和关闭与滚动,滑动窗口不同. 在Flink内部, 每到达一个新的元素都会创建一个新的会话窗口, 如果这些窗口彼此相距比较定义的gap小, 则会对他们进行合并. 为了能够合并, 会话窗口算子需要合并触发器和合并窗口函数: ReduceFunction, AggregateFunction, or ProcessWindowFunction 。
六、基于元素个数的滚动窗口
默认的CountWindow是一个滚动窗口,只需要指定窗口大小即可,当元素数量达到窗口大小时,就会触发窗口的执行。
实例代码
.countWindow(3)
说明:哪个窗口先达到3个元素, 哪个窗口就关闭. 不影响其他的窗口.
基于元素个数的滚动窗口代码
package com.lyh.flink07;import com.lyh.bean.WaterSensor;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;import java.util.ArrayList;
import java.util.List;public class Window_s_n {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.socketTextStream("hadoop100",9999).map(line -> {String[] data = line.split(",");return new WaterSensor(data[0],Long.valueOf(data[1]),Integer.valueOf(data[2]));}).keyBy(WaterSensor::getId).countWindow(2).process(new ProcessWindowFunction<WaterSensor, String, String, GlobalWindow>() {@Overridepublic void process(String key,Context ctx,Iterable<WaterSensor> elements,Collector<String> out) throws Exception {List<WaterSensor> list = toList(elements);out.collect("窗口:" + "key:" + key + " " + "list:" + list);}}).print();env.execute();}private static <T>List<T> toList(Iterable<T> it) {List<T> list = new ArrayList<>();for (T t : it) {list.add(t);}return list;}
}
运行结果
七、基于元素个数的滑动窗口
滑动窗口和滚动窗口的函数名是完全一致的,只是在传参数时需要传入两个参数,一个是window_size,一个是sliding_size。下面代码中的sliding_size设置为了2,也就是说,每收到两个相同key的数据就计算一次,每一次计算的window范围最多是3个元素。
实例代码
.countWindow(3, 2)
package com.lyh.flink07;import com.lyh.bean.WaterSensor;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;import java.util.ArrayList;
import java.util.List;public class Window_s_n {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.socketTextStream("hadoop100",9999).map(line -> {String[] data = line.split(",");return new WaterSensor(data[0],Long.valueOf(data[1]),Integer.valueOf(data[2]));}).keyBy(WaterSensor::getId)
// .countWindow(2).countWindow(3,2).process(new ProcessWindowFunction<WaterSensor, String, String, GlobalWindow>() {@Overridepublic void process(String key,Context ctx,Iterable<WaterSensor> elements,Collector<String> out) throws Exception {List<WaterSensor> list = toList(elements);out.collect("窗口:" + "key:" + key + " " + "list:" + list);}}).print();env.execute();}private static <T>List<T> toList(Iterable<T> it) {List<T> list = new ArrayList<>();for (T t : it) {list.add(t);}return list;}
}
运行结果
相关文章:

大数据-玩转数据-Flink窗口
一、Flink 窗口 理解 在流处理应用中,数据是连续不断的,因此我们不可能等到所有数据都到了才开始处理。当然我们可以每来一个消息就处理一次,但是有时我们需要做一些聚合类的处理,例如:在过去的1分钟内有多少用户点击…...

【python爬虫】—豆瓣电影Top250
豆瓣电影Top250 豆瓣榜单简介需求描述Python实现 豆瓣榜单简介 豆瓣电影 Top 250 榜单是豆瓣网站上列出的评分最高、受观众喜爱的电影作品。这个榜单包含了一系列优秀的影片,涵盖了各种类型、不同国家和时期的电影。 需求描述 使用python爬取top250电影ÿ…...
【跟小嘉学 Rust 编程】十五、智能指针
系列文章目录 【跟小嘉学 Rust 编程】一、Rust 编程基础 【跟小嘉学 Rust 编程】二、Rust 包管理工具使用 【跟小嘉学 Rust 编程】三、Rust 的基本程序概念 【跟小嘉学 Rust 编程】四、理解 Rust 的所有权概念 【跟小嘉学 Rust 编程】五、使用结构体关联结构化数据 【跟小嘉学…...

Python爬虫基础之正则表达式
目录 一、什么是正则表达式? 二、re.compile()编译函数 三、group()获取匹配结果函数 四、常用匹配规则 4.1匹配单个字符 4.2匹配前字符次数 4.3匹配原生字符串 4.4匹配字符串开头和结尾 4.5分组匹配 五、re.match()开头匹配函数 六、re.search()全文搜索…...

【LeetCode】双指针妙解有效三角形的个数
Problem: 611. 有效三角形的个数 文章目录 题目分析讲解算法原理复杂度Code 题目分析 首先我们来分析一下本题的思路 看到题目中给出的示例 题目的意思很简单,就是将给到的数字去做一个组合,然后看看这三条边是否可以构成三角形。那判断的方法不用我说&a…...

mysql 计算两点之间距离
先说一下我们可能会用到的一些场景,这样同学们可以先评估,该篇文章是否对你有帮助! 场景: 假设 美团,我点外卖时,系统会让我先进行定位,比如我定位在了 A 点,系统就会给我推荐&…...
c语言自定义头文件是什么情况下使用?一般在什么情况下引用自定义的头文件?一般在自定义头文件中写什么代码?
c语言自定义头文件是什么情况下使用?一般在什么情况下引用自定义的头文件?一般在自定义头文件中写什么代码? C语言自定义头文件是一种用来封装函数和变量声明的文件,它通常用于将一组相关的函数和变量的声明集中在一个地方&#…...
electron应用打包成功纪念一下
electron应用打包成功纪念一下,以前曾经行过后来打包各种报错,现在有空就尝试解决一下 首先安装nvm能够方便切换node版本 curl -o- https://raw.githubusercontent.com/nvm-sh/nvm/v0.34.0/install.sh | bash 顺利安装后你用nvm list查看node列表时会…...

远程办公中安全远程访问解决方案
什么是安全远程访问 安全的远程访问是一个至关重要的过程,可让您使用互联网从远处完全控制某人的设备。为了确保安全,为受保护的远程访问采取了额外的身份验证和加密措施。 为什么安全远程访问解决方案很重要 当 IT 技术人员从远处帮助人们解决计算机…...
StartUp启动框架-Android启动性能
简述 当谈论Android应用程序的启动性能时,StartUp启动框架是一个不可忽视的关键工具。它旨在优化应用程序的启动过程,确保用户在打开应用时能够迅速获得流畅、高效的体验。让我们来深入了解StartUp框架的作用和重要性,以及它是如何改善Andro…...
Positive Technologies:五分之四的网络攻击具有针对性
Positive Technologies 对 2023 年第二季度的相关网络威胁进行了分析。报告显示,自今年年初以来,有针对性的攻击数量增加了 10%,目前占 78%。专家们注意到利用漏洞的大规模攻击和大量用户个人数据的泄露。此外,在此期间࿰…...
clickhouse的另类表引擎
clickhouse常用的MergeTree引擎外,还有特殊的引擎 1,memory引擎,顾名思义,数据是存储在内存中,数据不会被压缩也不会倍格式化转换数据在内存中保存的形态与查询时看到的如出一辙,重启ck数据丢失 2ÿ…...
Uniapp新版本打包后覆盖安装,新增的页面无法跳转,需退出重新启动才可以打开的解决方案
最近写uniapp项目,发现一个坑,在新版本覆盖安装后直接打开APP,新增的页面竟然无法跳转,需要重新启动才可以正常打开,在网上查了很多方法,最终总结下来有以下几点: 1.看打的是debug包还是releas…...

系统架构设计高级技能 · 面向服务架构设计理论与实践
点击进入系列文章目录 系统架构设计高级技能 面向服务架构设计理论与实践 一、SOA的相关概念1.1SOA的定义1.2 业务流程与业务流程执行语言 二、SOA的发展史三、SOA与微服务的区别三、SOA的参考架构四、SOA的主要协议规范五、SOA的设计标准要求六、SOA的作用与设计原则七、SOA的…...
QT注册界面练习(信号与槽实现页面跳转)
一、注册界面练习思路以及具体代码 在完成注册页面搭建的前提下,通过信号与槽机制实现多组件之间的相互通信,实现页面跳转。 基本步骤: 首先,将注册页面的登录按钮与成功登陆信号绑定,当用户名与密码均匹配时…...

MySQL从入门到精通【进阶篇】之 主从复制详解
文章目录 0.前言1. 主从复制简介2. 主从复制的工作流程主从复制过程中的日志文件作用(Binary Log)和中继日志(Relay Log) 3. MySQL主从复制的配置4. 参考资料 0.前言 MySQL的主从复制和读写分离是数据库领域的基本概念࿰…...

vue使用qrcodejs2生成二维码
目录 概要 构建展示的vue组件qrcode.vue 组件的使用 概要 项目中用到需要展示二维码的样式,想到了qrcode 例如: 前提:安装包 npm install qrcodejs2 --save 构建展示的vue组件qrcode.vue <template><div style"width: …...
python注释
任何编程语言都少不了注释,Python也不例外,以下是Python注释的具体用法: 单行注释 Python编程语言的单行注释常以#开头,单行注释可以作为单独的一行放在被注释代码行之上,也可以放在语句或者表达式之后。 实例&…...
update-alternatives详解
1.功能作用 update-alternatives是dpkg的实用工具,用来维护系统命令的符号链接,以决定系统默认使用什么命令。 在Debian系统中,我们可能会同时安装有很多功能类似的程序和可选配置,如Web浏览器程序(firefox,konquero…...
JavaScript 编写更好的条件语句
在任何编程语言中,代码需要根据不同的条件在给定的输入中做不同的决定和执行相应的动作。 例如,在一个游戏中,如果玩家生命点为0,游戏结束。在天气应用中,如果在早上被查看,显示一个日出图片,如…...

Vue3 + Element Plus + TypeScript中el-transfer穿梭框组件使用详解及示例
使用详解 Element Plus 的 el-transfer 组件是一个强大的穿梭框组件,常用于在两个集合之间进行数据转移,如权限分配、数据选择等场景。下面我将详细介绍其用法并提供一个完整示例。 核心特性与用法 基本属性 v-model:绑定右侧列表的值&…...

抖音增长新引擎:品融电商,一站式全案代运营领跑者
抖音增长新引擎:品融电商,一站式全案代运营领跑者 在抖音这个日活超7亿的流量汪洋中,品牌如何破浪前行?自建团队成本高、效果难控;碎片化运营又难成合力——这正是许多企业面临的增长困局。品融电商以「抖音全案代运营…...

什么是Ansible Jinja2
理解 Ansible Jinja2 模板 Ansible 是一款功能强大的开源自动化工具,可让您无缝地管理和配置系统。Ansible 的一大亮点是它使用 Jinja2 模板,允许您根据变量数据动态生成文件、配置设置和脚本。本文将向您介绍 Ansible 中的 Jinja2 模板,并通…...

AI病理诊断七剑下天山,医疗未来触手可及
一、病理诊断困局:刀尖上的医学艺术 1.1 金标准背后的隐痛 病理诊断被誉为"诊断的诊断",医生需通过显微镜观察组织切片,在细胞迷宫中捕捉癌变信号。某省病理质控报告显示,基层医院误诊率达12%-15%,专家会诊…...

mac 安装homebrew (nvm 及git)
mac 安装nvm 及git 万恶之源 mac 安装这些东西离不开Xcode。及homebrew 一、先说安装git步骤 通用: 方法一:使用 Homebrew 安装 Git(推荐) 步骤如下:打开终端(Terminal.app) 1.安装 Homebrew…...

【p2p、分布式,区块链笔记 MESH】Bluetooth蓝牙通信 BLE Mesh协议的拓扑结构 定向转发机制
目录 节点的功能承载层(GATT/Adv)局限性: 拓扑关系定向转发机制定向转发意义 CG 节点的功能 节点的功能由节点支持的特性和功能决定。所有节点都能够发送和接收网格消息。节点还可以选择支持一个或多个附加功能,如 Configuration …...

02.运算符
目录 什么是运算符 算术运算符 1.基本四则运算符 2.增量运算符 3.自增/自减运算符 关系运算符 逻辑运算符 &&:逻辑与 ||:逻辑或 !:逻辑非 短路求值 位运算符 按位与&: 按位或 | 按位取反~ …...

SOC-ESP32S3部分:30-I2S音频-麦克风扬声器驱动
飞书文档https://x509p6c8to.feishu.cn/wiki/SKZzwIRH3i7lsckUOlzcuJsdnVf I2S简介 I2S(Inter-Integrated Circuit Sound)是一种用于传输数字音频数据的通信协议,广泛应用于音频设备中。 ESP32-S3 包含 2 个 I2S 外设,通过配置…...

轻量安全的密码管理工具Vaultwarden
一、Vaultwarden概述 Vaultwarden主要作用是提供一个自托管的密码管理器服务。它是Bitwarden密码管理器的第三方轻量版,由国外开发者在Bitwarden的基础上,采用Rust语言重写而成。 (一)Vaultwarden镜像的作用及特点 轻量级与高性…...
DriveGPT4: Interpretable End-to-end Autonomous Driving via Large Language Model
一、研究背景与创新点 (一)现有方法的局限性 当前智驾系统面临两大核心挑战:一是长尾问题,即系统在遇到新场景时可能失效,例如突发交通状况或非常规道路环境;二是可解释性问题,传统方法无法解释智驾系统的决策过程,用户难以理解车辆行为的依据。传统语言模型(如 BERT…...