Flink窗口分配器WindowAssigner
前言
Flink 数据流经过 keyBy 分组后,下一步就是 WindowAssigner。
WindowAssigner 定义了 stream 中的元素如何被分发到各个窗口,元素可以被分发到一个或多个窗口中,Flink 内置了常用的窗口分配器,包括:tumbling windows、 sliding windows、 session windows 和 global windows。除了 global windows ,其它分配器都是基于时间来分发数据的。
当然,你也可以继承 WindowAssigner 抽象类实现自定义的窗口分配逻辑。
WindowAssigner
先看一下 WindowAssigner 抽象类的定义:
@PublicEvolving
public abstract class WindowAssigner<T, W extends Window> implements Serializable {private static final long serialVersionUID = 1L;public WindowAssigner() {}public abstract Collection<W> assignWindows(T var1, long var2, WindowAssignerContext var4);public Trigger<T, W> getDefaultTrigger() {return this.getDefaultTrigger(new StreamExecutionEnvironment());}/** @deprecated */@Deprecatedpublic abstract Trigger<T, W> getDefaultTrigger(StreamExecutionEnvironment var1);public abstract TypeSerializer<W> getWindowSerializer(ExecutionConfig var1);public abstract boolean isEventTime();@PublicEvolvingpublic abstract static class WindowAssignerContext {public WindowAssignerContext() {}public abstract long getCurrentProcessingTime();}
}
四个方法,作用如下:
- assignWindows 将元素 element 分发到一个或多个窗口,返回值是窗口集合
- getDefaultTrigger 返回默认的窗口触发器 Trigger
- getWindowSerializer 返回窗口序列化器(窗口也要在算子间传输)
- isEventTime 是否基于事件时间语义
Flink 内置的 WindowAssigner 实现类关系图如下:

首先,可以按照基于何种时间语义划分出三大类:
- 基于事件时间语义
- 基于处理时间语义
- 不基于时间语义 --> GlobalWindows
在基于时间语义的大类下面,又可以按照时间窗口算法划分为三个具体实现:
- 滚动窗口分配算法 tumbling windows
- 滑动窗口分配算法 sliding windows
- 会话窗口分配算法 session windows
定义窗口Window
窗口对象被 Flink 统一封装为抽象类org.apache.flink.streaming.api.windowing.windows.Window,Flink 内置了两种实现,分别是:
- TimeWindow 基于时间范围的窗口,包含开始时间戳和结束时间戳
- GlobalWindow 全局窗口,与时间无关的窗口
如果内置的这两种窗口无法满足你的需求,你也可以自定义窗口。需要注意的是,窗口本身是要在算子间传输的,所以你在自定义窗口的同时,还必须提供一个窗口序列化器,以便于 Flink 可以将你的窗口对象序列化传输。
如下示例,我们定义了一个基于数字范围的 NumberWindow,可以将一个数字划分到对应的数字范围窗口内。
public class NumberWindow extends Window {private final int min;private final int max;public NumberWindow(int min, int max) {this.min = min;this.max = max;}public int getMin() {return min;}public int getMax() {return max;}@Overridepublic boolean equals(Object o) {if (this == o) return true;if (o == null || getClass() != o.getClass()) return false;NumberWindow that = (NumberWindow) o;return min == that.min && max == that.max;}@Overridepublic int hashCode() {return Objects.hash(min, max);}@Overridepublic long maxTimestamp() {return Long.MAX_VALUE;}
}
Window 实现还必须配套一个序列化器,主要是实现 两个int变量到窗口对象的转换。
public static class Serializer extends TypeSerializerSingleton<NumberWindow> {@Overridepublic boolean isImmutableType() {return true;}@Overridepublic NumberWindow createInstance() {return new NumberWindow(0, 0);}@Overridepublic NumberWindow copy(NumberWindow numberWindow) {return numberWindow;}@Overridepublic NumberWindow copy(NumberWindow numberWindow, NumberWindow t1) {return numberWindow;}@Overridepublic int getLength() {return 8;}@Overridepublic void serialize(NumberWindow numberWindow, DataOutputView dataOutputView) throws IOException {dataOutputView.writeInt(numberWindow.getMin());dataOutputView.writeInt(numberWindow.getMax());}@Overridepublic NumberWindow deserialize(DataInputView dataInputView) throws IOException {return new NumberWindow(dataInputView.readInt(), dataInputView.readInt());}@Overridepublic NumberWindow deserialize(NumberWindow numberWindow, DataInputView dataInputView) throws IOException {return this.deserialize(dataInputView);}@Overridepublic void copy(DataInputView dataInputView, DataOutputView dataOutputView) throws IOException {dataOutputView.writeInt(dataInputView.readInt());dataOutputView.writeInt(dataInputView.readInt());}@Overridepublic TypeSerializerSnapshot<NumberWindow> snapshotConfiguration() {return new TimeWindowSerializerSnapshot();}public static final class TimeWindowSerializerSnapshot extends SimpleTypeSerializerSnapshot<NumberWindow> {public TimeWindowSerializerSnapshot() {super(Serializer::new);}}
}
自定义WindowAssigner
窗口对象定义好了,接下来就是定义窗口分配对象。
简单原则,我们把数字划分为三个窗口,分别是:小数窗口、中位数窗口、大数窗口。
如下示例,继承 WindowAssigner 类,重写 assignWindows 方法,把数字划分到对应的窗口中。
public static class MyWindowAssigner extends WindowAssigner<Integer, NumberWindow> {private final int startingMedian;private final int startingLarge;public MyWindowAssigner(int startingMedian, int startingLarge) {this.startingMedian = startingMedian;this.startingLarge = startingLarge;}@Overridepublic Collection<NumberWindow> assignWindows(Integer element, long timestamp, WindowAssignerContext windowAssignerContext) {// 将数字划分到 小数、中位数、大数 窗口NumberWindow window;if (element < startingMedian) {window = new NumberWindow(Integer.MIN_VALUE, startingMedian - 1);} else if (element < startingLarge) {window = new NumberWindow(startingMedian, startingLarge - 1);} else {window = new NumberWindow(startingLarge, Integer.MAX_VALUE);}return List.of(window);}@Overridepublic Trigger<Integer, NumberWindow> getDefaultTrigger(StreamExecutionEnvironment streamExecutionEnvironment) {return null;}@Overridepublic TypeSerializer<NumberWindow> getWindowSerializer(ExecutionConfig executionConfig) {return new NumberWindow.Serializer();}@Overridepublic boolean isEventTime() {return false;}
}
把流程串起来
窗口对象和窗口分配的逻辑都有了,接下来就是把整个流程给串起来。
如下示例程序,我们定义了一个一秒内生成10个一百以内随机数的数据源Source,然后将这些数字流分为一组,并为其指定我们自定义的 MyWindowAssigner 窗口分配策略,策略中划分了三个窗口,数字小于20的归为小数一档、20到80的归为中位数一档、大于80的归为大数一档,根本数字分配对应的窗口。然后我们自定义了 Trigger,当窗口内积攒的数字达到十个,就触发窗口计算并关闭窗口。最终 ProcessWindowFunction 打印窗口内的数字并求和。
public static void main(String[] args) throws Exception {StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();environment.addSource(new SourceFunction<Integer>() {@Overridepublic void run(SourceContext<Integer> sourceContext) throws Exception {while (true) {Threads.sleep(100);sourceContext.collect(ThreadLocalRandom.current().nextInt(100));}}@Overridepublic void cancel() {}}).keyBy(i -> "all").window(new MyWindowAssigner(20, 80)).trigger(new Trigger<Integer, NumberWindow>() {@Overridepublic TriggerResult onElement(Integer element, long timestamp, NumberWindow numberWindow, TriggerContext triggerContext) throws Exception {ValueState<Integer> countState = triggerContext.getPartitionedState(new ValueStateDescriptor<>("count", Integer.class));Integer count = Optional.ofNullable(countState.value()).orElse(0) + 1;if (count < 10) {countState.update(count);return TriggerResult.CONTINUE;}countState.update(0);return TriggerResult.FIRE_AND_PURGE;}@Overridepublic TriggerResult onProcessingTime(long timestamp, NumberWindow numberWindow, TriggerContext triggerContext) throws Exception {return null;}@Overridepublic TriggerResult onEventTime(long timestamp, NumberWindow numberWindow, TriggerContext triggerContext) throws Exception {return null;}@Overridepublic void clear(NumberWindow numberWindow, TriggerContext triggerContext) throws Exception {}}).process(new ProcessWindowFunction<Integer, Object, String, NumberWindow>() {@Overridepublic void process(String key, ProcessWindowFunction<Integer, Object, String, NumberWindow>.Context context, Iterable<Integer> iterable, Collector<Object> collector) throws Exception {StringBuilder builder = new StringBuilder("[" + context.window().getMin() + " - " + context.window().getMax() + "] [");int sum = 0;for (Integer value : iterable) {builder.append(value + ",");sum += value;}builder.append("] sum=" + sum);System.err.println(builder.toString());}});environment.execute();
}
运行 Flink 作业,控制台输出:
[20 - 79] [30,32,24,66,63,37,] sum=252
[20 - 79] [71,48,41,55,75,79,] sum=369
[80 - 2147483647] [99,90,88,98,85,99,] sum=559
[20 - 79] [74,30,56,70,36,78,] sum=344
尾巴
Flink 的 WindowAssigner 在数据处理中发挥着关键作用。它决定了如何将源源不断的数据流切分成不同的窗口,以便进行有针对性的聚合、计算和分析。
通过合理配置 WindowAssigner,我们能够根据时间、数量或自定义的逻辑来划分数据,灵活地适应各种业务场景。这使得 Flink 能够对海量的实时数据进行高效且精准的处理,帮助我们从数据中提取有价值的信息和洞察。
相关文章:
Flink窗口分配器WindowAssigner
前言 Flink 数据流经过 keyBy 分组后,下一步就是 WindowAssigner。 WindowAssigner 定义了 stream 中的元素如何被分发到各个窗口,元素可以被分发到一个或多个窗口中,Flink 内置了常用的窗口分配器,包括:tumbling wi…...
【Tinymce】富文本编辑器在vue项目中的使用;引入付费格式刷,上传视频、图片
引言 富文本编辑器有很多,对比了一下,还是决定用tinymce(号称宇宙最强),基础的插件确实好用,但是一些更好用的插件,比如格式刷等都是高级版(付费),当然也有人…...
Java实现简单的5阶m序列密钥生成
选择5阶本原多项式:x^5 x^2 1,初始值为{1,0,0,1,1},易得,递推公式为:ak ak-5 ⊕ ak-2 ,其中k≥5。于是可以写出下面这段代码: class BitsEncode {public static void main(String[] args) {//初始化数组…...
013_django基于大数据的高血压人群分析系统2024_dcb7986h_055
目录 系统展示 开发背景 代码实现 项目案例 获取源码 博主介绍:CodeMentor毕业设计领航者、全网关注者30W群落,InfoQ特邀专栏作家、技术博客领航者、InfoQ新星培育计划导师、Web开发领域杰出贡献者,博客领航之星、开发者头条/腾讯云/AW…...
OpenCV高级图形用户界面(21)暂停程序执行并等待用户按键输入函数waitKey()的使用
操作系统:ubuntu22.04 OpenCV版本:OpenCV4.9 IDE:Visual Studio Code 编程语言:C11 算法描述 等待按键 该函数 waitKey 在 delay≤0 时无限等待按键事件,或者在 delay 为正数时等待 delay 毫秒。由于操作系统在切换线程时有最小…...
其他css的用途
1.animation-fill-mode: backwards; //避免了在动画开始前元素的突然显现,动画必要。 2.用rem响应式字体大小,可以在html样式定义font-size?(例10px,62.5%(100%是16px))。然后样式就可以用rem代替px。 3.color: transparent;: 这行代码将文…...
json路径 [‘a‘].b.c[0].d[‘1‘].f,具体代表什么意思
JSON路径是一种用于从JSON对象中提取数据的表达方式。你给出的路径 [a].b.c.d[1].f 代表了如何逐层访问JSON对象中的数据。让我们逐步解析这个路径: [a]: 表示访问JSON对象的根元素中键为 a 的值。使用方括号 [] 通常意味着这个键是一个字符串&#…...
等保测评:如何进行有效的安全合规性审查
等保测评(信息安全等级保护测评)是一项至关重要的安全合规性审查工作,旨在帮助组织保障信息系统的安全性、合规性,有效应对安全风险,提升整体安全防护水平。下面将从等保测评的流程、意义、应用场景,以及实…...
FFmpeg 4.3 音视频-多路H265监控录放C++开发二 : 18.04ubuntu安装,linux 下build ffmpeg 4.3 源码 并测试
测试环境 ubuntu 18.04 64 位,安装vmware and ubuntu 安装后调整 分辨率: 让windows 可以和 linux 互相复制黏贴 sudo apt-get autoremove open-vm-tools sudo apt-get update sudo apt-get install open-vm-tools-desktop 一直Y reboot 依赖安装 sud…...
将两张图片的不同标记出来
差异过于细微,阈值设置不当:您的差异可能是颜色或位置的微小变化,当前的阈值和处理方式可能不足以检测到这些细微差异。 图像配准不够精确:由于两张图片内容高度相似,特征点匹配可能存在误差,导致图像对齐…...
HarmonyOS开发(State模型)
一、State模型概述 FA(Feature Ability)模型:从API 7开始支持的模型,已经不再主推。 Stage模型:从API 9开始新增的模型,是目前主推且会长期演进的模型。在该模型中,由于提供了AbilityStage、Wi…...
在 WPF 中使用 OpenTK:从入门到进阶
一、引言 WPF(Windows Presentation Foundation)是微软推出的用于创建丰富的桌面应用程序用户界面的框架。OpenTK 则为我们提供了强大的图形处理能力,包括 3D 图形渲染、数学计算等功能。将两者结合起来,可以在 WPF 应用程序中实…...
【最新华为OD机试E卷-支持在线评测】水仙花数(100分)多语言题解-(Python/C/JavaScript/Java/Cpp)
🍭 大家好这里是春秋招笔试突围 ,一枚热爱算法的程序员 💻 ACM金牌🏅️团队 | 大厂实习经历 | 多年算法竞赛经历 ✨ 本系列打算持续跟新华为OD-E/D卷的多语言AC题解 🧩 大部分包含 Python / C / Javascript / Java / Cpp 多语言代码 👏 感谢大家的订阅➕ 和 喜欢�…...
C# WinForm 用名字name字符串查找子控件
工作上遇到界面控件太多,需要对一些控件批量处理。虽然可以用代码批量控制,但要么是建立数组集合把所有要处理的控件放进去循环处理,要么是一个一个列出来修改属性。 但我大多数要求改的控件命名上是有规律的,所有只需要循环拼接字…...
Ubuntu下安装并初始化Git同时添加SSH密钥
在 Ubuntu 上可以使用以下命令安装git: sudo apt-get update sudo apt-get install git 在 Ubuntu 下安装好 Git 之后,接下来可以进行一些基本的配置和操作,以便更好地使用 Git。 1. 配置 Git 用户信息 在使用 Git 进行版本控制前&#x…...
好用的AI工具:探索智能生活的无限可能
💓 博客主页:倔强的石头的CSDN主页 📝Gitee主页:倔强的石头的gitee主页 ⏩ 文章专栏:《热点时事》 期待您的关注 目录 引言 一:常用AI工具 1. 语音助手(如Siri、小爱同学) 2. 智…...
-bash: conda: command not found
-bash: conda: command not found 说明当前的终端环境中没有找到 conda 命令,可能是因为 Conda 没有安装,或者当前的环境变量中没有包含 Conda 的路径。 解决方法 确保 Conda 已安装 确认 Conda 路径是否添加到环境变量 如果 Conda 已安装,…...
STM32-CubeIDE用串口通讯
USART串口通讯 一、轮询模式 1.设置所接引脚为UART异步模式 选择完成CTRLS保存。 2.编写测试代码(自动发送hello world) 在mian函数里面编写代码 原函数 调用函数,需要数据类型一致,使用函数通过串口发送数组里面的数据 打开串…...
FloodFill 算法(DFS)
文章目录 FloodFill 算法(DFS)图像渲染岛屿数量岛屿的最大面积被围绕的区域太平洋大西洋水流问题扫雷游戏衣橱整理 FloodFill 算法(DFS) 漫水填充(Flood Fi)算法是一种图像处理算法,在计算机图形学和计算机视觉中被广泛…...
计算机通信与网络实验笔记
1.LINUX通过版本号判断是否为稳定版本 2.计网基础 (CD),默认二层以太网交换机。 (10)物理层是均分(除以),数据链路层及以上是不除的。 3.传输介质: (1&…...
JavaScript this 关键字详解
JavaScript this 关键字详解 引言 在JavaScript中,this 是一个非常重要的关键字,它用来指代当前执行上下文中的对象。理解 this 的行为和作用域对于编写高效、可维护的JavaScript代码至关重要。本文将深入探讨 this 的概念、用法以及在不同场景下的表现。 什么是 this? …...
告别手动计算!用EB工具链高效配置S32K144的Dio与Port模块
告别手动计算!用EB工具链高效配置S32K144的Dio与Port模块 在汽车电子开发中,S32K1XX系列MCU因其出色的实时性和可靠性成为主流选择。但面对数百个引脚配置,传统手动计算PCR值、逐项填写寄存器的方式不仅效率低下,还容易引入人为错…...
华为元老许映童下周敲钟:思格新能开启招股:估值超100亿美元 高瓴是基石
雷递网 雷建平 4月8日思格新能源(上海)股份有限公司(简称:“思格新能”,股票代码:“06656”)今日开启招股,准备2026年4月16日在港交所上市。思格新能计划发售1357.39万股,…...
OpenCV踩坑记:为什么cv2.imread读‘坏图’不返回None?深度解析JPEG文件结构与解码陷阱
OpenCV图像读取陷阱:JPEG文件损坏时cv2.imread为何不返回None? 在计算机视觉项目开发中,处理JPEG图像时经常会遇到这样的场景:明明系统提示"Premature end of JPEG file"警告,但cv2.imread()却依然返回了一个…...
DS3232 Arduino轻量RTC库:嵌入式时间管理与I²C优化实践
1. DS3232 Arduino库深度解析:面向嵌入式工程师的精简型RTC驱动实践指南1.1 库定位与工程设计哲学DS3232 Arduino库是一个专为嵌入式实时系统优化的轻量级IC实时时钟(RTC)驱动,其核心设计目标并非功能堆砌,而是在资源受…...
VsCode插件避坑指南:我为什么卸载了这些热门插件(附替代方案)
VSCode插件避坑指南:我为什么卸载了这些热门插件(附替代方案) 第一次打开VSCode的插件市场时,那种感觉就像走进了一家琳琅满目的糖果店——每个插件都包装精美,下载量动辄百万,五星好评如潮。但当我真正开始…...
【限时开源】:我们刚交付的三级医院FHIR适配引擎源码(C#/.NET 6+),含动态Profile加载、术语服务桥接、差量同步模块——仅开放72小时
第一章:FHIR适配引擎在三级医院信息系统的战略定位与开源意义FHIR适配引擎并非简单的协议转换中间件,而是三级医院实现跨系统互操作、支撑国家健康医疗大数据平台对接、满足《医疗卫生机构网络安全管理办法》与《电子病历系统功能应用水平分级评价标准》…...
用C语言和EasyX库写一个五子棋,我踩过的这些坑你别再踩了
用C语言和EasyX库写五子棋:那些教科书不会告诉你的实战陷阱 第一次用EasyX库写五子棋时,我以为三天就能搞定,结果花了三周时间调试各种奇葩问题。坐标计算差1个像素导致棋子永远对不齐、鼠标点击识别区域偏差、二维数组越界导致程序崩溃...这…...
IDE战争:VSCode凭什么成为开发者最爱?
——一位软件测试工程师的深度剖析在软件开发工具(IDE)的激烈战场上,Visual Studio Code(简称VSCode)的崛起堪称一个现象。它从一众重量级对手中脱颖而出,俘获了全球超过七成开发者的心。作为一名软件测试工…...
Windows任务栏美化终极指南:如何使用TranslucentTB实现透明化效果
Windows任务栏美化终极指南:如何使用TranslucentTB实现透明化效果 【免费下载链接】TranslucentTB A lightweight utility that makes the Windows taskbar translucent/transparent. 项目地址: https://gitcode.com/gh_mirrors/tr/TranslucentTB 你是否厌倦…...
