聊聊Flink:这次把Flink的window分类(滚动、滑动、会话、全局)、窗口函数讲透
一、窗口
窗口(Window)是处理无界流的关键所在。窗口将流分成有限大小的“桶”,我们可以在其上应用算子计算。Flink可以使用window()和windowAll()定义一个窗口,二者都需要传入一个窗口分配器WindowAssigner,WindowAssigner负责分配事件到相应的窗口。
window()作用于KeyedStream上,即keyBy()之后,这样可以多任务并行计算,对窗口内的多组数据分别进行聚合;windowAll()作用于非KeyedStream上(通常指DataStream),由于所有元素都必须通过相同的算子实例,因此该操作本质上是非并行的,仅在特殊情况下(例如对齐的时间窗口)才可以并行执行。假设要计算24小时内每个用户的订单平均消费额,就需要使用window()定义窗口;如果要计算24小时内的所有订单平均消费额,则需要使用windowAll()定义窗口。
一个Flink窗口程序的大致骨架结构如下:
对KeyedStream应用window()函数进行窗口计算:
对非KeyedStream应用windowAll()函数进行窗口计算:
上面方括号([…])中的命令是可选的。也就是说,Flink 允许你自定义多样化的窗口操作来满足你的需求。
首先必须要在定义窗口前确定的是你的 stream 是 keyed 还是 non-keyed。 keyBy(…) 会将你的无界 stream 分割为逻辑上的 keyed stream。 如果 keyBy(…) 没有被调用,你的 stream 就不是 keyed。
对于 keyed stream,其中数据的任何属性都可以作为 key。 使用 keyed stream 允许你的窗口计算由多个 task 并行,因为每个逻辑上的 keyed stream 都可以被单独处理。 属于同一个 key 的元素会被发送到同一个 task。
对于 non-keyed stream,原始的 stream 不会被分割为多个逻辑上的 stream, 所以所有的窗口计算会被同一个 task 完成,也就是 parallelism 为 1。
二、窗口的分类
Flink的窗口可以分为滚动窗口、滑动窗口、会话窗口、全局窗口,且每种窗口又可分别根据事件时间和处理时间进行创建。
2.1 滚动窗口(Tumbling Windows)
滚动窗口的 assigner 分发元素到指定大小的窗口。滚动窗口的大小是固定的,且各自范围之间不重叠。 比如说,如果你指定了滚动窗口的大小为 5 分钟,那么每 5 分钟就会有一个窗口被计算,且一个新的窗口被创建(如下图所示)。
滚动窗口使用时需要指定窗口大小参数,下面的代码片段展示了如何使用滚动窗口:
滚动窗口分配器还可以使用可选的偏移(Offset)参数,该参数可用于更改窗口的对齐方式。例如,在没有偏移的情况下,时间窗口会做一个对齐,那么1小时窗口的起止时间可以是[0:00:00.000~0:59:59.999)。如果你想要一个以小时为单位的窗口流,但是窗口需要从每个小时的第15分钟开始,则可以使用偏移量,代码如下:
TumblingEventTimeWindows.of(Time.hours(1),Time.minutes(15))
那么窗口的起止时间将变为[0:15:00.000~1:14:59.999),这样你将得到起始时间在0:15:00,1:15:00,2:15:00的窗口。与此相反,如果你生活在不使用UTC±00:00时间(世界标准时间)的地方,例如中国使用UTC+08:00,中国的当地时间要设置偏移量为Time.hours(-8)。你想要一个一天大小的时间窗口,并且窗口从当地时间的每一个00:00:00开始,可以使用
TumblingEventTimeWindows.of(Time.days(1),Time.hours(-8))
因为UTC+08:00比UTC时间早8小时。
2.2 滑动窗口(Sliding Windows)
与滚动窗口类似,滑动窗口的 assigner 分发元素到指定大小的窗口,窗口大小通过 window size 参数设置。 滑动窗口需要一个额外的滑动距离(window slide)参数来控制生成新窗口的频率。 因此,如果 slide 小于窗口大小,滑动窗口可以允许窗口重叠。这种情况下,一个元素可能会被分发到多个窗口。
比如说,你设置了大小为 10 分钟,滑动距离 5 分钟的窗口,你会在每 5 分钟得到一个新的窗口, 里面包含之前 10 分钟到达的数据(如下图所示)。
使用滑动窗口时,需要设置窗口大小和滑动步长两个参数。滑动步长决定了Flink以多大的频率来创建新的窗口,步长较小,窗口的个数会很多。步长小于窗口的大小时,相邻窗口会重叠,一个事件会被分配到多个窗口;步长大于窗口大小时,有些事件可能会丢失。
下面的代码片段展示了如何使用滑动窗口:
如上一个例子所示,滚动窗口的 assigners 也可以传入可选的 offset 参数。这个参数可以用来对齐窗口。 比如说,不设置 offset 时,长度为一小时、滑动距离为 30 分钟的滑动窗口会与 linux 的 epoch 对齐。 你会得到如 1:00:00.000 - 1:59:59.999, 1:30:00.000 - 2:29:59.999 等。 如果你想改变对齐方式,你可以设置一个 offset。 如果设置了 15 分钟的 offset,你会得到 1:15:00.000 - 2:14:59.999、1:45:00.000 - 2:44:59.999 等。 一个重要的 offset 用例是根据 UTC-0 调整窗口的时差。比如说,在中国你可能会设置 offset 为 Time.hours(-8)。
2.3 会话窗口(Session Windows)
会话窗口的 assigner 会把数据按活跃的会话分组。 与滚动窗口和滑动窗口不同,会话窗口不会相互重叠,且没有固定的开始或结束时间。 会话窗口在一段时间没有收到数据之后会关闭,即在一段不活跃的间隔之后。 会话窗口的 assigner 可以设置固定的会话间隔(session gap)或 用 session gap extractor 函数来动态地定义多长时间算作不活跃。 当超出了不活跃的时间段,当前的会话就会关闭,并且将接下来的数据分发到新的会话窗口。
下面的代码展示了如何使用会话窗口。
创建动态间隔会话窗口,需要实现SessionWindowTimeGapExtractor接口,并实现其中的extract()方法,可以在extract()方法中加入相应的业务逻辑来动态控制会话间隔。
2.4 全局窗口(Global Windows)
全局窗口的 assigner 将拥有相同 key 的所有数据分发到一个全局窗口。 这样的窗口模式仅在你指定了自定义的 trigger 时有用。 否则,计算不会发生,因为全局窗口没有天然的终点去触发其中积累的数据。
下面的代码片段展示了如何使用全局窗口:
三、窗口函数(Window Functions)
事件被窗口分配器分配到窗口后,接下来需要指定想要在每个窗口上执行的计算函数(即窗口函数),以便对窗口内的数据进行处理。Flink提供的窗口函数有ReduceFunction、AggregateFunction、ProcessWindowFunction。
ReduceFunction和AggregateFunction是增量计算函数,都可以基于中间状态对窗口中的元素进行递增聚合。例如,窗口每流入一个新元素,新元素就会与中间数据进行合并,生成新的中间数据,再保存到窗口中。
ProcessWindowFunction是全量计算函数,如果需要依赖窗口中的所有数据或需要获取窗口中的状态数据和窗口元数据(窗口开始时间、窗口结束时间等),就需要使用ProcessWindowFunction。例如对整个窗口数据排序取TopN,使用ProcessWindowFunction就非常灵活。
3.1 ReduceFunction
ReduceFunction 指定两条输入数据如何合并起来产生一条输出数据,输入和输出数据的类型必须相同。 Flink 使用 ReduceFunction 对窗口中的数据进行增量聚合。
ReduceFunction 可以像下面这样定义:
上面的例子是对窗口内元组的第二个属性求和。
3.2 AggregateFunction
AggregateFunction是聚合函数的基本接口,也是ReduceFunction的通用版本。与ReduceFunction相同,Flink将在窗口输入元素到达时对其进行增量聚合。
AggregateFunction的泛型具有3种类型:输入类型(IN)、累加器类型(ACC)和输出类型(OUT)。输入类型指的是输入流中元素的类型。AggregateFunction具有一种将一个输入元素添加到累加器的方法,还具有创建初始累加器,将两个累加器合并为一个累加器以及从累加器提取输出(OUT类型)的方法。
AggregateFunction是一种灵活的聚合函数,具有以下特点:
- 可以对输入值、中间聚合和结果使用不同的类型,以支持广泛的聚合类型。
- 支持分布式聚合,不同的中间聚合可以合并在一起,以允许预聚合/最终聚合优化。
AggregateFunction的中间聚合(正在进行的聚合状态)称为累加器。将值添加到累加器,并通过结束累加器状态获得最终的聚合。中间聚合的数据类型可能与最终的聚合结果类型不同,例如求平均值时,需要保存计数和总和作为中间聚合。合并中间聚合(部分聚合)意味着合并累加器。
AggregationFunction本身是无状态的。为了允许单个AggregationFunction实例维护多个聚合(例如每个Key一个聚合),AggregationFunction在新聚合启动时会创建一个新的累加器。此外,聚合函数必须是可序列化的,因为它们在分布式执行期间会在分布式进程之间发送。
AggregationFunction接口的Java定义源码如下:
例如,计算窗口中每组元素的第二个字段的平均值,定义和使用AggregateFunction的代码片段如下:
注:
AggregateFunction的merge()方法用于会话窗口。当会话窗口彼此之间的实际间隔比已定义的间隔小时,它们将合并在一起。为了可合并,对会话窗口计算时也需要相应的触发器和窗口函数,例如ReduceFunction,AggregateFunction或ProcessWindowFunction。当需要合并两个会话窗口时,merge()方法会被调用,通过该方法合并两个窗口的结果。对于滚动窗口和滑动窗口不会调用merge()方法。
3.3 .ProcessWindowFunction
ProcessWindowFunction 有能获取包含窗口内所有元素的 Iterable, 以及用来获取时间和状态信息的 Context 对象,比其他窗口函数更加灵活。 ProcessWindowFunction 的灵活性是以性能和资源消耗为代价的, 因为窗口中的数据无法被增量聚合,而需要在窗口触发前缓存所有数据。因此,使用ProcessWindowFunction需要注意数据量不应太大,否则会造成内存溢出。
抽象类ProcessWindowFunction的源码如下:
@PublicEvolving
public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window>extends AbstractRichFunction {private static final long serialVersionUID = 1L;/*** Evaluates the window and outputs none or several elements.** @param key The key for which this window is evaluated.* @param context The context in which the window is being evaluated.* @param elements The elements in the window being evaluated.* @param out A collector for emitting elements.* @throws Exception The function may throw exceptions to fail the program and trigger recovery.*/public abstract void process(KEY key, Context context, Iterable<IN> elements, Collector<OUT> out) throws Exception;/*** Deletes any state in the {@code Context} when the Window expires (the watermark passes its* {@code maxTimestamp} + {@code allowedLateness}).** @param context The context to which the window is being evaluated* @throws Exception The function may throw exceptions to fail the program and trigger recovery.*/public void clear(Context context) throws Exception {}/** The context holding window metadata. */public abstract class Context implements java.io.Serializable {/** Returns the window that is being evaluated. */public abstract W window();/** Returns the current processing time. */public abstract long currentProcessingTime();/** Returns the current event-time watermark. */public abstract long currentWatermark();/*** State accessor for per-key and per-window state.** <p><b>NOTE:</b>If you use per-window state you have to ensure that you clean it up by* implementing {@link ProcessWindowFunction#clear(Context)}.*/public abstract KeyedStateStore windowState();/** State accessor for per-key global state. */public abstract KeyedStateStore globalState();/*** Emits a record to the side output identified by the {@link OutputTag}.** @param outputTag the {@code OutputTag} that identifies the side output to emit to.* @param value The record to emit.*/public abstract <X> void output(OutputTag<X> outputTag, X value);}
}
key 参数由 keyBy() 中指定的 KeySelector 选出。 如果是给出 key 在 tuple 中的 index 或用属性名的字符串形式指定 key,这个 key 的类型将总是 Tuple, 并且你需要手动将它转换为正确大小的 tuple 才能提取 key。
ProcessWindowFunction 可以像下面这样定义:
使用ProcessWindowFunction来处理简单的聚合(例如计算元素数量)是非常低效的。接下来讲解如何将ReduceFunction或AggregateFunction与ProcessWindowFunction结合起来,以便实现增量聚合并通过ProcessWindowFunction获得额外的窗口信息等。
3.4 带增量聚合的ProcessWindowFunction
由于ProcessWindowFunction是全量计算函数,如果既要获得窗口信息又要进行增量聚合,则可以将ProcessWindowFunction与ReduceFunction或AggregateFunction结合使用。
ProcessWindowFunction可以与ReduceFunction或AggregateFunction组合在一起,以便在元素到达窗口时增量地聚合。当窗口关闭时,ProcessWindowFunction将提供聚合的结果。
3.4.1 结合ReduceFunction实现增量聚合
下例展示了如何将 ReduceFunction 与 ProcessWindowFunction 组合,返回窗口中的最小元素和窗口的开始时间。
3.4.2 结合AggregateFunction实现增量聚合
下例展示了如何将 AggregateFunction 与 ProcessWindowFunction 组合,计算平均值并与窗口对应的 key 一同输出。
相关文章:

聊聊Flink:这次把Flink的window分类(滚动、滑动、会话、全局)、窗口函数讲透
一、窗口 窗口(Window)是处理无界流的关键所在。窗口将流分成有限大小的“桶”,我们可以在其上应用算子计算。Flink可以使用window()和windowAll()定义一个窗口,二者都需要传入一个窗口分配器WindowAssigner,WindowAs…...

mysql-分析MVCC原理
一、MVCC简介 MVCC是一种用来解决读写冲读的无锁并发控制,也就是为事务分配单增长的时间戳,为每个修改保存一个版本,版本与事务时间戳关联,读操作只读该事务开始前的数据库的快照,所以MVCC可以为数据库解决一些问题。…...

由于答案过大,请对a取模。取模后的答案不是原问题的答案 取模有何意义呢 详解
在许多情况下,处理大数时会将 a 取模,即用 a m o d m a \mod m amodm的结果代替 a a a,然后继续计算。这种做法的核心问题是:取模后的值与原问题之间的关系是否保持一致。取模后的意义在于,它在不改变问题核心特性的前…...

【c++篇】掌握动态内存的奥妙
【C篇】动态内存 一、Static 关键字1.1函数内部的静态变量1.2 全局静态变量1.3静态成员变量1.4静态成员函数 二、内存管理2.1栈区(Stack)2.2堆区(Heap) 三、动态内存分配机制3.1、动态内存分配的两种方法c语言c 3.2new 和delete的用法3.3语法和类型安全性…...

5.4.2-3 编写Java程序读取HDFS文件
在本次实战中,我们通过Java程序实现了从Hadoop分布式文件系统(HDFS)读取文件的功能。首先,我们创建了ReadFileOnHDFS类,并在其中实现了两个方法:read1()和read1_()。read1()方法展示了如何打开HDFS文件并逐…...

@EnableConfigurationProperties @ConfigurationProperties
EnableConfigurationProperties && ConfigurationProperties的使用时机 今天在写properties时想到了这个问题,为什么有时候我需要写EnableConfigurationProperties有时候又不需要呢?下面就详细讲讲。 Data Component ConfigurationProperties(pr…...

RK3588适配MTK7921 USB接口WiFi驱动开发
在当前RK原厂提供的SDK里面已经适配的WiFi模组有不少,但是支持的模组大部分集中在realtek、正基、英飞凌等厂家。主要型号有Realtek的RTL8188系列、RTL8723系列、RTL8812系列、RTL8821系列、RTL8822系列和支持WiFi 6 的RTL8852系列,正基的AP6275系列、AP6276系列等。接下来将…...

【数据结构OJ】【图论】图综合练习--拓扑排序
题目描述 已知有向图,顶点从0开始编号,求它的求拓扑有序序列。 拓扑排序算法:给出有向图邻接矩阵 1.逐列扫描矩阵,找出入度为0且编号最小的顶点v 2.输出v,并标识v已访问 3.把矩阵第v行全清0 重复上述步骤࿰…...

模型 I/O 与 LangChain 实践
模型 I/O 与 LangChain 实践 本文是《LangChain 实战课》第 4 节——模型 I/O:输入提示、调用模型、解析输出的一些学习笔记与总结。这篇文章将围绕模型 I/O 的基本概念、LangChain 提供的最佳实践以及如何通过 LangChain 实现高效的结构化数据处理展开。 什么是模…...

C++:用红黑树封装map与set-1
文章目录 前言一、STL源码分析二、红黑树的构建三、map与set整体框架的搭建与解析四、如何取出进行比较?1. met与set的数据是不同的2. 取出数据进行比较1)问题发现2)仿函数解决 五、封装插入六、迭代器的实现1. operator* 与operator->2. …...

HBU算法设计与分析 贪心算法
1.最优会场调度 #include <bits/stdc.h> using namespace std; const int N1e55; typedef pair<int,int> PII; PII p[N]; priority_queue<int,vector<int>,greater<int>> q; //最小堆 存储最早结束的会场的结束时间 int n; //其实这个题可以理…...

python pycharm安装教程及基本使用,超详细
一.PyCharm下载及安装 1.1 进入pycharm官网,点击下载,下载社区版本(日常学习使用够用了),专业版是收费的哦(功能更强大) Download PyCharm: The Python IDE for data science and web development by Jet…...

变量提升函数提升
示例 1:变量提升 原始代码: console.log(x); // 输出: undefined var x 5; console.log(x); // 输出: 5提升后的代码(理解为): var x; // 变量声明被提升 console.log(x); // 输出: undefined x 5; // 赋值 conso…...

el-table vue3统计计算数字
固定合计在最下列 父组件 <template><el-tablev-loading"loading"tooltip-effect"light":data"list"style"width: 100%":max-height"maxHeight"element-loading-text"拼命加载中...":header-cell-styl…...

IDE应当具备的功能
IDE 是辅助编程的工具,应当具备以下功能 语法高亮 显示注释 显示光键词 显示括号 matlab 自带的 IDE 没有这个功能 显示缩进 matlab 自带的 IDE 没有这个功能 显示字符串 显示数字常量 定位到函数的定义位置 Matlab 自带的集成开发环境(IDE&am…...

Stable Diffusion初步见解(二)
Stable Diffusion 是一种先进的深度学习模型,用于生成高质量的图像和艺术作品。它基于扩散模型(Diffusion Models),并结合了潜在扩散模型(Latent Diffusion Models)以及条件生成技术(如文本到图…...

前端框架 react 性能优化
目录 一、不使用任何性能优化API进行优化 二、通过性能优化API优化 1、React.memo 2、useCallback 3、useMemo 4、PureComponent 三、总结 总览:react的优化核心思想就是让react跳过重新渲染那个些没有改变的Component,而只重新渲染发生变化的C…...

RK3568平台开发系列讲解(Input子系统篇)输入子系统介绍
🚀返回专栏总目录 文章目录 一、什么是输入子系统?二、输入设备和节点的关系沉淀、分享、成长,让自己和他人都能有所收获!😄 一、什么是输入子系统? 在 Linux 中,input 子系统是专门为处理输入类设备而设计的一个子系统或框架。它提供 了一套通用的接口和机制,用于驱…...

准备阶段 Profiler性能分析工具的使用(一)
Unity 性能分析器 (Unity Profiler) 性能分析器记录应用程序性能的多个方面并显示相关信息。使用此信息可以做出有关应用程序中可能需要优化的事项的明智决策,并确认所做的优化是否产生预期结果。 默认情况下,性能分析器记录并保留游戏的最后 300 帧&a…...

go-rod vs Selenium:自动化测试工具的比较与选择
自动化测试是软件开发过程中的关键环节,它能够帮助我们发现缺陷、验证功能并提高软件质量。随着Web技术的快速发展,市场上出现了多种自动化测试工具,其中Selenium和go-rod是两个备受关注的选择。本文将从多个维度对这两个工具进行比较&#x…...

探索免费的Figma中文版:开启高效设计之旅
在当今数字化设计的浪潮中,Figma以其强大的云端协作功能和出色的设计能力,成为了众多设计师的心头好。而对于国内的设计师来说,能够免费使用Figma中文版更是一大福音,下面就来一起探索一下吧。 一、Figma中文版的获取途径 虽然F…...

功能齐全,支持协作 | Docker部署一款支持多人共享的私密浏览器『n.eko』
功能齐全,支持协作 | Docker部署一款支持多人共享的私密浏览器『n.eko』 哈喽小伙伴们好,我是Stark-C~ 玩NAS的朋友基本都会在本地部署一款浏览器用来远程访问内网的网络设备,或者偶尔拿来浏览一些私密网站都是很方便的。 今天为大家分享的…...

部署实战(二)--修改jar中的文件并重新打包成jar文件
一.jar文件 JAR 文件就是 Java Archive ( Java 档案文件),它是 Java 的一种文档格式JAR 文件与 ZIP 文件唯一的区别就是在 JAR 文件的内容中,多出了一个META-INF/MANIFEST.MF 文件META-INF/MANIFEST.MF 文件在生成 JAR 文件的时候…...

Ubuntu24.04——软件包系统已损坏
如果你在使用 Ubuntu 时遇到“软件包系统已损坏”的问题,可以尝试以下步骤来修复它: 更新软件包列表: 打开终端,运行以下命令以更新软件包列表: sudo apt update修复损坏的软件包: 运行以下命令来修复损坏的…...

2024年华为OD机试真题-空栈压数-C++-OD统一考试(E卷)
最新华为OD机试考点合集:华为OD机试2024年真题题库(E卷+D卷+C卷)_华为od机试题库-CSDN博客 每一题都含有详细的解题思路和代码注释,精编c++、JAVA、Python三种语言解法。帮助每一位考生轻松、高效刷题。订阅后永久可看,发现新题及时跟新。 题目描述: 向一个空栈压入…...

嵌入式Linux基于IMX6ULL tslib学习总结
目录 1. tslib开源库介绍1.1 tslib主要功能1.2 架构 2. tslib代码简单分析2.1 ts_print_mt.c分析代码2.2 ts_setup代码分析2.3 ts_open代码分析2.4 ts_config代码分析2.5 ts_read_mt代码分析2.6 tslib中4个模块的含义 3. 使用tslib库打印触摸屏2点之间的距离 基于韦东山IMX6ULL…...

go中的参数传递是值传递还是引用传递?
在Go语言中,参数传递机制是一个重要的概念,它决定了函数内部对参数的修改是否会影响到原始数据。关于Go中的参数传递是值传递还是引用传递的问题,可以从以下几个方面进行解答。 一、值传递与引用传递的定义 值传递:在值传递中&a…...

记录一种在内核空间向用户空间通知中断的方法
记录一种在内核空间向用户空间通知中断的方法 0.前言1.代码实现1)内核设备驱动实现2)消息通知实现3)测试程序 2.解析 参考文章:Linux驱动实践:中断处理函数如何【发送信号】给应用层? 0.前言 最近在项目中遇到一个需求,需要将一个…...

.NetCore 过滤器和拦截器 的区别
Asp.NET Core 中的过滤器(Filter)和拦截器(Interceptor)是两个不同的概念,但它们在某些方面有相似之处,也有明显的区别。 🔑过滤器(Filter) 过滤器是Asp.NET Core中用于…...

【笔记】自动驾驶预测与决策规划_Part7_数据驱动的预测方法
文章目录 0. 前言1. 多模态传感器的编码方式1.1 栅格化表示1.2 向量化表示 Vectornet1.3 基于点云或者多模态输入的预测1.4 基于Transformer的方法 2. 网络输出的表达形式2.1 多模态轨迹回归2.2 轨迹分类2.3 轨迹回归轨迹分类2.4 目标点预测 3.场景级别的预测和决策3.1 论文&am…...