Flink窗口分配器WindowAssigner
前言
Flink 数据流经过 keyBy 分组后,下一步就是 WindowAssigner。
WindowAssigner 定义了 stream 中的元素如何被分发到各个窗口,元素可以被分发到一个或多个窗口中,Flink 内置了常用的窗口分配器,包括:tumbling windows、 sliding windows、 session windows 和 global windows。除了 global windows ,其它分配器都是基于时间来分发数据的。
当然,你也可以继承 WindowAssigner 抽象类实现自定义的窗口分配逻辑。
WindowAssigner
先看一下 WindowAssigner 抽象类的定义:
@PublicEvolving
public abstract class WindowAssigner<T, W extends Window> implements Serializable {private static final long serialVersionUID = 1L;public WindowAssigner() {}public abstract Collection<W> assignWindows(T var1, long var2, WindowAssignerContext var4);public Trigger<T, W> getDefaultTrigger() {return this.getDefaultTrigger(new StreamExecutionEnvironment());}/** @deprecated */@Deprecatedpublic abstract Trigger<T, W> getDefaultTrigger(StreamExecutionEnvironment var1);public abstract TypeSerializer<W> getWindowSerializer(ExecutionConfig var1);public abstract boolean isEventTime();@PublicEvolvingpublic abstract static class WindowAssignerContext {public WindowAssignerContext() {}public abstract long getCurrentProcessingTime();}
}
四个方法,作用如下:
- assignWindows 将元素 element 分发到一个或多个窗口,返回值是窗口集合
- getDefaultTrigger 返回默认的窗口触发器 Trigger
- getWindowSerializer 返回窗口序列化器(窗口也要在算子间传输)
- isEventTime 是否基于事件时间语义
Flink 内置的 WindowAssigner 实现类关系图如下:

首先,可以按照基于何种时间语义划分出三大类:
- 基于事件时间语义
- 基于处理时间语义
- 不基于时间语义 --> GlobalWindows
在基于时间语义的大类下面,又可以按照时间窗口算法划分为三个具体实现:
- 滚动窗口分配算法 tumbling windows
- 滑动窗口分配算法 sliding windows
- 会话窗口分配算法 session windows
定义窗口Window
窗口对象被 Flink 统一封装为抽象类org.apache.flink.streaming.api.windowing.windows.Window,Flink 内置了两种实现,分别是:
- TimeWindow 基于时间范围的窗口,包含开始时间戳和结束时间戳
- GlobalWindow 全局窗口,与时间无关的窗口
如果内置的这两种窗口无法满足你的需求,你也可以自定义窗口。需要注意的是,窗口本身是要在算子间传输的,所以你在自定义窗口的同时,还必须提供一个窗口序列化器,以便于 Flink 可以将你的窗口对象序列化传输。
如下示例,我们定义了一个基于数字范围的 NumberWindow,可以将一个数字划分到对应的数字范围窗口内。
public class NumberWindow extends Window {private final int min;private final int max;public NumberWindow(int min, int max) {this.min = min;this.max = max;}public int getMin() {return min;}public int getMax() {return max;}@Overridepublic boolean equals(Object o) {if (this == o) return true;if (o == null || getClass() != o.getClass()) return false;NumberWindow that = (NumberWindow) o;return min == that.min && max == that.max;}@Overridepublic int hashCode() {return Objects.hash(min, max);}@Overridepublic long maxTimestamp() {return Long.MAX_VALUE;}
}
Window 实现还必须配套一个序列化器,主要是实现 两个int变量到窗口对象的转换。
public static class Serializer extends TypeSerializerSingleton<NumberWindow> {@Overridepublic boolean isImmutableType() {return true;}@Overridepublic NumberWindow createInstance() {return new NumberWindow(0, 0);}@Overridepublic NumberWindow copy(NumberWindow numberWindow) {return numberWindow;}@Overridepublic NumberWindow copy(NumberWindow numberWindow, NumberWindow t1) {return numberWindow;}@Overridepublic int getLength() {return 8;}@Overridepublic void serialize(NumberWindow numberWindow, DataOutputView dataOutputView) throws IOException {dataOutputView.writeInt(numberWindow.getMin());dataOutputView.writeInt(numberWindow.getMax());}@Overridepublic NumberWindow deserialize(DataInputView dataInputView) throws IOException {return new NumberWindow(dataInputView.readInt(), dataInputView.readInt());}@Overridepublic NumberWindow deserialize(NumberWindow numberWindow, DataInputView dataInputView) throws IOException {return this.deserialize(dataInputView);}@Overridepublic void copy(DataInputView dataInputView, DataOutputView dataOutputView) throws IOException {dataOutputView.writeInt(dataInputView.readInt());dataOutputView.writeInt(dataInputView.readInt());}@Overridepublic TypeSerializerSnapshot<NumberWindow> snapshotConfiguration() {return new TimeWindowSerializerSnapshot();}public static final class TimeWindowSerializerSnapshot extends SimpleTypeSerializerSnapshot<NumberWindow> {public TimeWindowSerializerSnapshot() {super(Serializer::new);}}
}
自定义WindowAssigner
窗口对象定义好了,接下来就是定义窗口分配对象。
简单原则,我们把数字划分为三个窗口,分别是:小数窗口、中位数窗口、大数窗口。
如下示例,继承 WindowAssigner 类,重写 assignWindows 方法,把数字划分到对应的窗口中。
public static class MyWindowAssigner extends WindowAssigner<Integer, NumberWindow> {private final int startingMedian;private final int startingLarge;public MyWindowAssigner(int startingMedian, int startingLarge) {this.startingMedian = startingMedian;this.startingLarge = startingLarge;}@Overridepublic Collection<NumberWindow> assignWindows(Integer element, long timestamp, WindowAssignerContext windowAssignerContext) {// 将数字划分到 小数、中位数、大数 窗口NumberWindow window;if (element < startingMedian) {window = new NumberWindow(Integer.MIN_VALUE, startingMedian - 1);} else if (element < startingLarge) {window = new NumberWindow(startingMedian, startingLarge - 1);} else {window = new NumberWindow(startingLarge, Integer.MAX_VALUE);}return List.of(window);}@Overridepublic Trigger<Integer, NumberWindow> getDefaultTrigger(StreamExecutionEnvironment streamExecutionEnvironment) {return null;}@Overridepublic TypeSerializer<NumberWindow> getWindowSerializer(ExecutionConfig executionConfig) {return new NumberWindow.Serializer();}@Overridepublic boolean isEventTime() {return false;}
}
把流程串起来
窗口对象和窗口分配的逻辑都有了,接下来就是把整个流程给串起来。
如下示例程序,我们定义了一个一秒内生成10个一百以内随机数的数据源Source,然后将这些数字流分为一组,并为其指定我们自定义的 MyWindowAssigner 窗口分配策略,策略中划分了三个窗口,数字小于20的归为小数一档、20到80的归为中位数一档、大于80的归为大数一档,根本数字分配对应的窗口。然后我们自定义了 Trigger,当窗口内积攒的数字达到十个,就触发窗口计算并关闭窗口。最终 ProcessWindowFunction 打印窗口内的数字并求和。
public static void main(String[] args) throws Exception {StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();environment.addSource(new SourceFunction<Integer>() {@Overridepublic void run(SourceContext<Integer> sourceContext) throws Exception {while (true) {Threads.sleep(100);sourceContext.collect(ThreadLocalRandom.current().nextInt(100));}}@Overridepublic void cancel() {}}).keyBy(i -> "all").window(new MyWindowAssigner(20, 80)).trigger(new Trigger<Integer, NumberWindow>() {@Overridepublic TriggerResult onElement(Integer element, long timestamp, NumberWindow numberWindow, TriggerContext triggerContext) throws Exception {ValueState<Integer> countState = triggerContext.getPartitionedState(new ValueStateDescriptor<>("count", Integer.class));Integer count = Optional.ofNullable(countState.value()).orElse(0) + 1;if (count < 10) {countState.update(count);return TriggerResult.CONTINUE;}countState.update(0);return TriggerResult.FIRE_AND_PURGE;}@Overridepublic TriggerResult onProcessingTime(long timestamp, NumberWindow numberWindow, TriggerContext triggerContext) throws Exception {return null;}@Overridepublic TriggerResult onEventTime(long timestamp, NumberWindow numberWindow, TriggerContext triggerContext) throws Exception {return null;}@Overridepublic void clear(NumberWindow numberWindow, TriggerContext triggerContext) throws Exception {}}).process(new ProcessWindowFunction<Integer, Object, String, NumberWindow>() {@Overridepublic void process(String key, ProcessWindowFunction<Integer, Object, String, NumberWindow>.Context context, Iterable<Integer> iterable, Collector<Object> collector) throws Exception {StringBuilder builder = new StringBuilder("[" + context.window().getMin() + " - " + context.window().getMax() + "] [");int sum = 0;for (Integer value : iterable) {builder.append(value + ",");sum += value;}builder.append("] sum=" + sum);System.err.println(builder.toString());}});environment.execute();
}
运行 Flink 作业,控制台输出:
[20 - 79] [30,32,24,66,63,37,] sum=252
[20 - 79] [71,48,41,55,75,79,] sum=369
[80 - 2147483647] [99,90,88,98,85,99,] sum=559
[20 - 79] [74,30,56,70,36,78,] sum=344
尾巴
Flink 的 WindowAssigner 在数据处理中发挥着关键作用。它决定了如何将源源不断的数据流切分成不同的窗口,以便进行有针对性的聚合、计算和分析。
通过合理配置 WindowAssigner,我们能够根据时间、数量或自定义的逻辑来划分数据,灵活地适应各种业务场景。这使得 Flink 能够对海量的实时数据进行高效且精准的处理,帮助我们从数据中提取有价值的信息和洞察。
相关文章:
Flink窗口分配器WindowAssigner
前言 Flink 数据流经过 keyBy 分组后,下一步就是 WindowAssigner。 WindowAssigner 定义了 stream 中的元素如何被分发到各个窗口,元素可以被分发到一个或多个窗口中,Flink 内置了常用的窗口分配器,包括:tumbling wi…...
【Tinymce】富文本编辑器在vue项目中的使用;引入付费格式刷,上传视频、图片
引言 富文本编辑器有很多,对比了一下,还是决定用tinymce(号称宇宙最强),基础的插件确实好用,但是一些更好用的插件,比如格式刷等都是高级版(付费),当然也有人…...
Java实现简单的5阶m序列密钥生成
选择5阶本原多项式:x^5 x^2 1,初始值为{1,0,0,1,1},易得,递推公式为:ak ak-5 ⊕ ak-2 ,其中k≥5。于是可以写出下面这段代码: class BitsEncode {public static void main(String[] args) {//初始化数组…...
013_django基于大数据的高血压人群分析系统2024_dcb7986h_055
目录 系统展示 开发背景 代码实现 项目案例 获取源码 博主介绍:CodeMentor毕业设计领航者、全网关注者30W群落,InfoQ特邀专栏作家、技术博客领航者、InfoQ新星培育计划导师、Web开发领域杰出贡献者,博客领航之星、开发者头条/腾讯云/AW…...
OpenCV高级图形用户界面(21)暂停程序执行并等待用户按键输入函数waitKey()的使用
操作系统:ubuntu22.04 OpenCV版本:OpenCV4.9 IDE:Visual Studio Code 编程语言:C11 算法描述 等待按键 该函数 waitKey 在 delay≤0 时无限等待按键事件,或者在 delay 为正数时等待 delay 毫秒。由于操作系统在切换线程时有最小…...
其他css的用途
1.animation-fill-mode: backwards; //避免了在动画开始前元素的突然显现,动画必要。 2.用rem响应式字体大小,可以在html样式定义font-size?(例10px,62.5%(100%是16px))。然后样式就可以用rem代替px。 3.color: transparent;: 这行代码将文…...
json路径 [‘a‘].b.c[0].d[‘1‘].f,具体代表什么意思
JSON路径是一种用于从JSON对象中提取数据的表达方式。你给出的路径 [a].b.c.d[1].f 代表了如何逐层访问JSON对象中的数据。让我们逐步解析这个路径: [a]: 表示访问JSON对象的根元素中键为 a 的值。使用方括号 [] 通常意味着这个键是一个字符串&#…...
等保测评:如何进行有效的安全合规性审查
等保测评(信息安全等级保护测评)是一项至关重要的安全合规性审查工作,旨在帮助组织保障信息系统的安全性、合规性,有效应对安全风险,提升整体安全防护水平。下面将从等保测评的流程、意义、应用场景,以及实…...
FFmpeg 4.3 音视频-多路H265监控录放C++开发二 : 18.04ubuntu安装,linux 下build ffmpeg 4.3 源码 并测试
测试环境 ubuntu 18.04 64 位,安装vmware and ubuntu 安装后调整 分辨率: 让windows 可以和 linux 互相复制黏贴 sudo apt-get autoremove open-vm-tools sudo apt-get update sudo apt-get install open-vm-tools-desktop 一直Y reboot 依赖安装 sud…...
将两张图片的不同标记出来
差异过于细微,阈值设置不当:您的差异可能是颜色或位置的微小变化,当前的阈值和处理方式可能不足以检测到这些细微差异。 图像配准不够精确:由于两张图片内容高度相似,特征点匹配可能存在误差,导致图像对齐…...
HarmonyOS开发(State模型)
一、State模型概述 FA(Feature Ability)模型:从API 7开始支持的模型,已经不再主推。 Stage模型:从API 9开始新增的模型,是目前主推且会长期演进的模型。在该模型中,由于提供了AbilityStage、Wi…...
在 WPF 中使用 OpenTK:从入门到进阶
一、引言 WPF(Windows Presentation Foundation)是微软推出的用于创建丰富的桌面应用程序用户界面的框架。OpenTK 则为我们提供了强大的图形处理能力,包括 3D 图形渲染、数学计算等功能。将两者结合起来,可以在 WPF 应用程序中实…...
【最新华为OD机试E卷-支持在线评测】水仙花数(100分)多语言题解-(Python/C/JavaScript/Java/Cpp)
🍭 大家好这里是春秋招笔试突围 ,一枚热爱算法的程序员 💻 ACM金牌🏅️团队 | 大厂实习经历 | 多年算法竞赛经历 ✨ 本系列打算持续跟新华为OD-E/D卷的多语言AC题解 🧩 大部分包含 Python / C / Javascript / Java / Cpp 多语言代码 👏 感谢大家的订阅➕ 和 喜欢�…...
C# WinForm 用名字name字符串查找子控件
工作上遇到界面控件太多,需要对一些控件批量处理。虽然可以用代码批量控制,但要么是建立数组集合把所有要处理的控件放进去循环处理,要么是一个一个列出来修改属性。 但我大多数要求改的控件命名上是有规律的,所有只需要循环拼接字…...
Ubuntu下安装并初始化Git同时添加SSH密钥
在 Ubuntu 上可以使用以下命令安装git: sudo apt-get update sudo apt-get install git 在 Ubuntu 下安装好 Git 之后,接下来可以进行一些基本的配置和操作,以便更好地使用 Git。 1. 配置 Git 用户信息 在使用 Git 进行版本控制前&#x…...
好用的AI工具:探索智能生活的无限可能
💓 博客主页:倔强的石头的CSDN主页 📝Gitee主页:倔强的石头的gitee主页 ⏩ 文章专栏:《热点时事》 期待您的关注 目录 引言 一:常用AI工具 1. 语音助手(如Siri、小爱同学) 2. 智…...
-bash: conda: command not found
-bash: conda: command not found 说明当前的终端环境中没有找到 conda 命令,可能是因为 Conda 没有安装,或者当前的环境变量中没有包含 Conda 的路径。 解决方法 确保 Conda 已安装 确认 Conda 路径是否添加到环境变量 如果 Conda 已安装,…...
STM32-CubeIDE用串口通讯
USART串口通讯 一、轮询模式 1.设置所接引脚为UART异步模式 选择完成CTRLS保存。 2.编写测试代码(自动发送hello world) 在mian函数里面编写代码 原函数 调用函数,需要数据类型一致,使用函数通过串口发送数组里面的数据 打开串…...
FloodFill 算法(DFS)
文章目录 FloodFill 算法(DFS)图像渲染岛屿数量岛屿的最大面积被围绕的区域太平洋大西洋水流问题扫雷游戏衣橱整理 FloodFill 算法(DFS) 漫水填充(Flood Fi)算法是一种图像处理算法,在计算机图形学和计算机视觉中被广泛…...
计算机通信与网络实验笔记
1.LINUX通过版本号判断是否为稳定版本 2.计网基础 (CD),默认二层以太网交换机。 (10)物理层是均分(除以),数据链路层及以上是不除的。 3.传输介质: (1&…...
浏览器访问 AWS ECS 上部署的 Docker 容器(监听 80 端口)
✅ 一、ECS 服务配置 Dockerfile 确保监听 80 端口 EXPOSE 80 CMD ["nginx", "-g", "daemon off;"]或 EXPOSE 80 CMD ["python3", "-m", "http.server", "80"]任务定义(Task Definition&…...
uniapp 对接腾讯云IM群组成员管理(增删改查)
UniApp 实战:腾讯云IM群组成员管理(增删改查) 一、前言 在社交类App开发中,群组成员管理是核心功能之一。本文将基于UniApp框架,结合腾讯云IM SDK,详细讲解如何实现群组成员的增删改查全流程。 权限校验…...
【大模型RAG】拍照搜题技术架构速览:三层管道、两级检索、兜底大模型
摘要 拍照搜题系统采用“三层管道(多模态 OCR → 语义检索 → 答案渲染)、两级检索(倒排 BM25 向量 HNSW)并以大语言模型兜底”的整体框架: 多模态 OCR 层 将题目图片经过超分、去噪、倾斜校正后,分别用…...
大数据学习栈记——Neo4j的安装与使用
本文介绍图数据库Neofj的安装与使用,操作系统:Ubuntu24.04,Neofj版本:2025.04.0。 Apt安装 Neofj可以进行官网安装:Neo4j Deployment Center - Graph Database & Analytics 我这里安装是添加软件源的方法 最新版…...
遍历 Map 类型集合的方法汇总
1 方法一 先用方法 keySet() 获取集合中的所有键。再通过 gey(key) 方法用对应键获取值 import java.util.HashMap; import java.util.Set;public class Test {public static void main(String[] args) {HashMap hashMap new HashMap();hashMap.put("语文",99);has…...
音视频——I2S 协议详解
I2S 协议详解 I2S (Inter-IC Sound) 协议是一种串行总线协议,专门用于在数字音频设备之间传输数字音频数据。它由飞利浦(Philips)公司开发,以其简单、高效和广泛的兼容性而闻名。 1. 信号线 I2S 协议通常使用三根或四根信号线&a…...
20个超级好用的 CSS 动画库
分享 20 个最佳 CSS 动画库。 它们中的大多数将生成纯 CSS 代码,而不需要任何外部库。 1.Animate.css 一个开箱即用型的跨浏览器动画库,可供你在项目中使用。 2.Magic Animations CSS3 一组简单的动画,可以包含在你的网页或应用项目中。 3.An…...
【笔记】WSL 中 Rust 安装与测试完整记录
#工作记录 WSL 中 Rust 安装与测试完整记录 1. 运行环境 系统:Ubuntu 24.04 LTS (WSL2)架构:x86_64 (GNU/Linux)Rust 版本:rustc 1.87.0 (2025-05-09)Cargo 版本:cargo 1.87.0 (2025-05-06) 2. 安装 Rust 2.1 使用 Rust 官方安…...
【Redis】笔记|第8节|大厂高并发缓存架构实战与优化
缓存架构 代码结构 代码详情 功能点: 多级缓存,先查本地缓存,再查Redis,最后才查数据库热点数据重建逻辑使用分布式锁,二次查询更新缓存采用读写锁提升性能采用Redis的发布订阅机制通知所有实例更新本地缓存适用读多…...
LLMs 系列实操科普(1)
写在前面: 本期内容我们继续 Andrej Karpathy 的《How I use LLMs》讲座内容,原视频时长 ~130 分钟,以实操演示主流的一些 LLMs 的使用,由于涉及到实操,实际上并不适合以文字整理,但还是决定尽量整理一份笔…...
