Flink窗口分配器WindowAssigner
前言
Flink 数据流经过 keyBy 分组后,下一步就是 WindowAssigner。
WindowAssigner 定义了 stream 中的元素如何被分发到各个窗口,元素可以被分发到一个或多个窗口中,Flink 内置了常用的窗口分配器,包括:tumbling windows、 sliding windows、 session windows 和 global windows。除了 global windows ,其它分配器都是基于时间来分发数据的。
当然,你也可以继承 WindowAssigner 抽象类实现自定义的窗口分配逻辑。
WindowAssigner
先看一下 WindowAssigner 抽象类的定义:
@PublicEvolving
public abstract class WindowAssigner<T, W extends Window> implements Serializable {private static final long serialVersionUID = 1L;public WindowAssigner() {}public abstract Collection<W> assignWindows(T var1, long var2, WindowAssignerContext var4);public Trigger<T, W> getDefaultTrigger() {return this.getDefaultTrigger(new StreamExecutionEnvironment());}/** @deprecated */@Deprecatedpublic abstract Trigger<T, W> getDefaultTrigger(StreamExecutionEnvironment var1);public abstract TypeSerializer<W> getWindowSerializer(ExecutionConfig var1);public abstract boolean isEventTime();@PublicEvolvingpublic abstract static class WindowAssignerContext {public WindowAssignerContext() {}public abstract long getCurrentProcessingTime();}
}
四个方法,作用如下:
- assignWindows 将元素 element 分发到一个或多个窗口,返回值是窗口集合
- getDefaultTrigger 返回默认的窗口触发器 Trigger
- getWindowSerializer 返回窗口序列化器(窗口也要在算子间传输)
- isEventTime 是否基于事件时间语义
Flink 内置的 WindowAssigner 实现类关系图如下:
首先,可以按照基于何种时间语义划分出三大类:
- 基于事件时间语义
- 基于处理时间语义
- 不基于时间语义 --> GlobalWindows
在基于时间语义的大类下面,又可以按照时间窗口算法划分为三个具体实现:
- 滚动窗口分配算法 tumbling windows
- 滑动窗口分配算法 sliding windows
- 会话窗口分配算法 session windows
定义窗口Window
窗口对象被 Flink 统一封装为抽象类org.apache.flink.streaming.api.windowing.windows.Window
,Flink 内置了两种实现,分别是:
- TimeWindow 基于时间范围的窗口,包含开始时间戳和结束时间戳
- GlobalWindow 全局窗口,与时间无关的窗口
如果内置的这两种窗口无法满足你的需求,你也可以自定义窗口。需要注意的是,窗口本身是要在算子间传输的,所以你在自定义窗口的同时,还必须提供一个窗口序列化器,以便于 Flink 可以将你的窗口对象序列化传输。
如下示例,我们定义了一个基于数字范围的 NumberWindow,可以将一个数字划分到对应的数字范围窗口内。
public class NumberWindow extends Window {private final int min;private final int max;public NumberWindow(int min, int max) {this.min = min;this.max = max;}public int getMin() {return min;}public int getMax() {return max;}@Overridepublic boolean equals(Object o) {if (this == o) return true;if (o == null || getClass() != o.getClass()) return false;NumberWindow that = (NumberWindow) o;return min == that.min && max == that.max;}@Overridepublic int hashCode() {return Objects.hash(min, max);}@Overridepublic long maxTimestamp() {return Long.MAX_VALUE;}
}
Window 实现还必须配套一个序列化器,主要是实现 两个int变量到窗口对象的转换。
public static class Serializer extends TypeSerializerSingleton<NumberWindow> {@Overridepublic boolean isImmutableType() {return true;}@Overridepublic NumberWindow createInstance() {return new NumberWindow(0, 0);}@Overridepublic NumberWindow copy(NumberWindow numberWindow) {return numberWindow;}@Overridepublic NumberWindow copy(NumberWindow numberWindow, NumberWindow t1) {return numberWindow;}@Overridepublic int getLength() {return 8;}@Overridepublic void serialize(NumberWindow numberWindow, DataOutputView dataOutputView) throws IOException {dataOutputView.writeInt(numberWindow.getMin());dataOutputView.writeInt(numberWindow.getMax());}@Overridepublic NumberWindow deserialize(DataInputView dataInputView) throws IOException {return new NumberWindow(dataInputView.readInt(), dataInputView.readInt());}@Overridepublic NumberWindow deserialize(NumberWindow numberWindow, DataInputView dataInputView) throws IOException {return this.deserialize(dataInputView);}@Overridepublic void copy(DataInputView dataInputView, DataOutputView dataOutputView) throws IOException {dataOutputView.writeInt(dataInputView.readInt());dataOutputView.writeInt(dataInputView.readInt());}@Overridepublic TypeSerializerSnapshot<NumberWindow> snapshotConfiguration() {return new TimeWindowSerializerSnapshot();}public static final class TimeWindowSerializerSnapshot extends SimpleTypeSerializerSnapshot<NumberWindow> {public TimeWindowSerializerSnapshot() {super(Serializer::new);}}
}
自定义WindowAssigner
窗口对象定义好了,接下来就是定义窗口分配对象。
简单原则,我们把数字划分为三个窗口,分别是:小数窗口、中位数窗口、大数窗口。
如下示例,继承 WindowAssigner 类,重写 assignWindows 方法,把数字划分到对应的窗口中。
public static class MyWindowAssigner extends WindowAssigner<Integer, NumberWindow> {private final int startingMedian;private final int startingLarge;public MyWindowAssigner(int startingMedian, int startingLarge) {this.startingMedian = startingMedian;this.startingLarge = startingLarge;}@Overridepublic Collection<NumberWindow> assignWindows(Integer element, long timestamp, WindowAssignerContext windowAssignerContext) {// 将数字划分到 小数、中位数、大数 窗口NumberWindow window;if (element < startingMedian) {window = new NumberWindow(Integer.MIN_VALUE, startingMedian - 1);} else if (element < startingLarge) {window = new NumberWindow(startingMedian, startingLarge - 1);} else {window = new NumberWindow(startingLarge, Integer.MAX_VALUE);}return List.of(window);}@Overridepublic Trigger<Integer, NumberWindow> getDefaultTrigger(StreamExecutionEnvironment streamExecutionEnvironment) {return null;}@Overridepublic TypeSerializer<NumberWindow> getWindowSerializer(ExecutionConfig executionConfig) {return new NumberWindow.Serializer();}@Overridepublic boolean isEventTime() {return false;}
}
把流程串起来
窗口对象和窗口分配的逻辑都有了,接下来就是把整个流程给串起来。
如下示例程序,我们定义了一个一秒内生成10个一百以内随机数的数据源Source,然后将这些数字流分为一组,并为其指定我们自定义的 MyWindowAssigner 窗口分配策略,策略中划分了三个窗口,数字小于20的归为小数一档、20到80的归为中位数一档、大于80的归为大数一档,根本数字分配对应的窗口。然后我们自定义了 Trigger,当窗口内积攒的数字达到十个,就触发窗口计算并关闭窗口。最终 ProcessWindowFunction 打印窗口内的数字并求和。
public static void main(String[] args) throws Exception {StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();environment.addSource(new SourceFunction<Integer>() {@Overridepublic void run(SourceContext<Integer> sourceContext) throws Exception {while (true) {Threads.sleep(100);sourceContext.collect(ThreadLocalRandom.current().nextInt(100));}}@Overridepublic void cancel() {}}).keyBy(i -> "all").window(new MyWindowAssigner(20, 80)).trigger(new Trigger<Integer, NumberWindow>() {@Overridepublic TriggerResult onElement(Integer element, long timestamp, NumberWindow numberWindow, TriggerContext triggerContext) throws Exception {ValueState<Integer> countState = triggerContext.getPartitionedState(new ValueStateDescriptor<>("count", Integer.class));Integer count = Optional.ofNullable(countState.value()).orElse(0) + 1;if (count < 10) {countState.update(count);return TriggerResult.CONTINUE;}countState.update(0);return TriggerResult.FIRE_AND_PURGE;}@Overridepublic TriggerResult onProcessingTime(long timestamp, NumberWindow numberWindow, TriggerContext triggerContext) throws Exception {return null;}@Overridepublic TriggerResult onEventTime(long timestamp, NumberWindow numberWindow, TriggerContext triggerContext) throws Exception {return null;}@Overridepublic void clear(NumberWindow numberWindow, TriggerContext triggerContext) throws Exception {}}).process(new ProcessWindowFunction<Integer, Object, String, NumberWindow>() {@Overridepublic void process(String key, ProcessWindowFunction<Integer, Object, String, NumberWindow>.Context context, Iterable<Integer> iterable, Collector<Object> collector) throws Exception {StringBuilder builder = new StringBuilder("[" + context.window().getMin() + " - " + context.window().getMax() + "] [");int sum = 0;for (Integer value : iterable) {builder.append(value + ",");sum += value;}builder.append("] sum=" + sum);System.err.println(builder.toString());}});environment.execute();
}
运行 Flink 作业,控制台输出:
[20 - 79] [30,32,24,66,63,37,] sum=252
[20 - 79] [71,48,41,55,75,79,] sum=369
[80 - 2147483647] [99,90,88,98,85,99,] sum=559
[20 - 79] [74,30,56,70,36,78,] sum=344
尾巴
Flink 的 WindowAssigner 在数据处理中发挥着关键作用。它决定了如何将源源不断的数据流切分成不同的窗口,以便进行有针对性的聚合、计算和分析。
通过合理配置 WindowAssigner,我们能够根据时间、数量或自定义的逻辑来划分数据,灵活地适应各种业务场景。这使得 Flink 能够对海量的实时数据进行高效且精准的处理,帮助我们从数据中提取有价值的信息和洞察。
相关文章:

Flink窗口分配器WindowAssigner
前言 Flink 数据流经过 keyBy 分组后,下一步就是 WindowAssigner。 WindowAssigner 定义了 stream 中的元素如何被分发到各个窗口,元素可以被分发到一个或多个窗口中,Flink 内置了常用的窗口分配器,包括:tumbling wi…...

【Tinymce】富文本编辑器在vue项目中的使用;引入付费格式刷,上传视频、图片
引言 富文本编辑器有很多,对比了一下,还是决定用tinymce(号称宇宙最强),基础的插件确实好用,但是一些更好用的插件,比如格式刷等都是高级版(付费),当然也有人…...

Java实现简单的5阶m序列密钥生成
选择5阶本原多项式:x^5 x^2 1,初始值为{1,0,0,1,1},易得,递推公式为:ak ak-5 ⊕ ak-2 ,其中k≥5。于是可以写出下面这段代码: class BitsEncode {public static void main(String[] args) {//初始化数组…...

013_django基于大数据的高血压人群分析系统2024_dcb7986h_055
目录 系统展示 开发背景 代码实现 项目案例 获取源码 博主介绍:CodeMentor毕业设计领航者、全网关注者30W群落,InfoQ特邀专栏作家、技术博客领航者、InfoQ新星培育计划导师、Web开发领域杰出贡献者,博客领航之星、开发者头条/腾讯云/AW…...

OpenCV高级图形用户界面(21)暂停程序执行并等待用户按键输入函数waitKey()的使用
操作系统:ubuntu22.04 OpenCV版本:OpenCV4.9 IDE:Visual Studio Code 编程语言:C11 算法描述 等待按键 该函数 waitKey 在 delay≤0 时无限等待按键事件,或者在 delay 为正数时等待 delay 毫秒。由于操作系统在切换线程时有最小…...

其他css的用途
1.animation-fill-mode: backwards; //避免了在动画开始前元素的突然显现,动画必要。 2.用rem响应式字体大小,可以在html样式定义font-size?(例10px,62.5%(100%是16px))。然后样式就可以用rem代替px。 3.color: transparent;: 这行代码将文…...

json路径 [‘a‘].b.c[0].d[‘1‘].f,具体代表什么意思
JSON路径是一种用于从JSON对象中提取数据的表达方式。你给出的路径 [a].b.c.d[1].f 代表了如何逐层访问JSON对象中的数据。让我们逐步解析这个路径: [a]: 表示访问JSON对象的根元素中键为 a 的值。使用方括号 [] 通常意味着这个键是一个字符串&#…...

等保测评:如何进行有效的安全合规性审查
等保测评(信息安全等级保护测评)是一项至关重要的安全合规性审查工作,旨在帮助组织保障信息系统的安全性、合规性,有效应对安全风险,提升整体安全防护水平。下面将从等保测评的流程、意义、应用场景,以及实…...

FFmpeg 4.3 音视频-多路H265监控录放C++开发二 : 18.04ubuntu安装,linux 下build ffmpeg 4.3 源码 并测试
测试环境 ubuntu 18.04 64 位,安装vmware and ubuntu 安装后调整 分辨率: 让windows 可以和 linux 互相复制黏贴 sudo apt-get autoremove open-vm-tools sudo apt-get update sudo apt-get install open-vm-tools-desktop 一直Y reboot 依赖安装 sud…...

将两张图片的不同标记出来
差异过于细微,阈值设置不当:您的差异可能是颜色或位置的微小变化,当前的阈值和处理方式可能不足以检测到这些细微差异。 图像配准不够精确:由于两张图片内容高度相似,特征点匹配可能存在误差,导致图像对齐…...

HarmonyOS开发(State模型)
一、State模型概述 FA(Feature Ability)模型:从API 7开始支持的模型,已经不再主推。 Stage模型:从API 9开始新增的模型,是目前主推且会长期演进的模型。在该模型中,由于提供了AbilityStage、Wi…...

在 WPF 中使用 OpenTK:从入门到进阶
一、引言 WPF(Windows Presentation Foundation)是微软推出的用于创建丰富的桌面应用程序用户界面的框架。OpenTK 则为我们提供了强大的图形处理能力,包括 3D 图形渲染、数学计算等功能。将两者结合起来,可以在 WPF 应用程序中实…...

【最新华为OD机试E卷-支持在线评测】水仙花数(100分)多语言题解-(Python/C/JavaScript/Java/Cpp)
🍭 大家好这里是春秋招笔试突围 ,一枚热爱算法的程序员 💻 ACM金牌🏅️团队 | 大厂实习经历 | 多年算法竞赛经历 ✨ 本系列打算持续跟新华为OD-E/D卷的多语言AC题解 🧩 大部分包含 Python / C / Javascript / Java / Cpp 多语言代码 👏 感谢大家的订阅➕ 和 喜欢�…...

C# WinForm 用名字name字符串查找子控件
工作上遇到界面控件太多,需要对一些控件批量处理。虽然可以用代码批量控制,但要么是建立数组集合把所有要处理的控件放进去循环处理,要么是一个一个列出来修改属性。 但我大多数要求改的控件命名上是有规律的,所有只需要循环拼接字…...

Ubuntu下安装并初始化Git同时添加SSH密钥
在 Ubuntu 上可以使用以下命令安装git: sudo apt-get update sudo apt-get install git 在 Ubuntu 下安装好 Git 之后,接下来可以进行一些基本的配置和操作,以便更好地使用 Git。 1. 配置 Git 用户信息 在使用 Git 进行版本控制前&#x…...

好用的AI工具:探索智能生活的无限可能
💓 博客主页:倔强的石头的CSDN主页 📝Gitee主页:倔强的石头的gitee主页 ⏩ 文章专栏:《热点时事》 期待您的关注 目录 引言 一:常用AI工具 1. 语音助手(如Siri、小爱同学) 2. 智…...

-bash: conda: command not found
-bash: conda: command not found 说明当前的终端环境中没有找到 conda 命令,可能是因为 Conda 没有安装,或者当前的环境变量中没有包含 Conda 的路径。 解决方法 确保 Conda 已安装 确认 Conda 路径是否添加到环境变量 如果 Conda 已安装,…...

STM32-CubeIDE用串口通讯
USART串口通讯 一、轮询模式 1.设置所接引脚为UART异步模式 选择完成CTRLS保存。 2.编写测试代码(自动发送hello world) 在mian函数里面编写代码 原函数 调用函数,需要数据类型一致,使用函数通过串口发送数组里面的数据 打开串…...

FloodFill 算法(DFS)
文章目录 FloodFill 算法(DFS)图像渲染岛屿数量岛屿的最大面积被围绕的区域太平洋大西洋水流问题扫雷游戏衣橱整理 FloodFill 算法(DFS) 漫水填充(Flood Fi)算法是一种图像处理算法,在计算机图形学和计算机视觉中被广泛…...

计算机通信与网络实验笔记
1.LINUX通过版本号判断是否为稳定版本 2.计网基础 (CD),默认二层以太网交换机。 (10)物理层是均分(除以),数据链路层及以上是不除的。 3.传输介质: (1&…...

闲聊【干龙头】的重要性
市场面临转势,我们不知道谁会先涨,资金量大的操作必然会提前布局,而我们需要做的就是睁大眼睛,等待最强的那只股票出现,然后闭着眼睛进入就可以了。 追涨操作为什么都出现在大盘大涨情况下。原因简单,不能确…...

Ubuntu22.04安装RTX3080
Ubuntu22.04安装RTX3080 1 安装基础环境 更新依赖包 sudo apt-get update sudo apt-get upgrade2 安装驱动 (1)查看适合的显卡驱动 # 查看可用的驱动 sudo ubuntu-drivers devices# 返回值,推荐版本:nvidia-driver-550 ERROR…...

嵌入式学习-IO进程-Day04
嵌入式学习-IO进程-Day04 进程的函数接口 fork和Vfork 回收进程资源 wait waitpid 退出进程 获取进程号(getpid,getppid) 守护进程 守护进程的特点 创建步骤 exec函数族 线程 概念 线程和进程的区别 线程资源 线程函数接口 创建线程ÿ…...

RAII - 安卓中的智能指针
RAII - 安卓中的智能指针 概念 sp wp RefBase 是什么 system/core/libutils/RefBase.cpp system/core/libutils/include/utils/RefBase.hsystem/core/libutils/StrongPointer.cpp system/core/libutils/include/utils/StrongPointer.hAndroid在标准库之外,自定义…...

linux--库指令
ldd ldd 可执行文件路径 显示依赖的库的查找路径以及是否查找到了。...

展讯方案-内置多张开机logo
1. 开机图片的资源存放在logo分区中,这个分区中可以存放一个xx.bmp文件,也可以存放一个bin文件(1logo.bin,包含多张压缩的图片集合) 2.平台代码中logo.bin是由mk_1ogo_img.py脚本打包,具体如下(…...

Stable Diffusion模型资源合集(附整合包)
(模型资源在ComfyUI、WebUI以及ForgeUI中都通用) 之前的Stable Diffusion笔记受到了不少小伙伴的关注,很感谢大家的建议和支持。有很多小伙伴私信我问我一些AI绘画的模型资源在哪来下载,一般来说有两个网站比较常用,分…...

机器学习|Pytorch实现天气预测
机器学习|Pytorch实现天气预测 🍨 本文为🔗365天深度学习训练营 中的学习记录博客🍖 原作者:K同学啊 电脑系统:Windows11 显卡型号:NVIDIA Quadro P620 语言环境:python 3.9.7 编译器&#x…...

【Kuberntes】k8s权限管理
文章目录 权限管理概述核心概念配置RBAC创建Role和ClusterRole创建RoleBinding和ClusterRoleBinding 默认角色和角色绑定权限的实现注意事项 如何在 Kubernetes 中实现 RBAC 的细粒度权限控制?1. Role和ClusterRole2. RoleBinding和ClusterRoleBinding3. 配置RBAC4.…...

C++,STL 033(24.10.15)
内容 queue容器(队列)的常用接口。 代码 #include <iostream> #include <string> #include <queue> // 注意包含queue容器(队列)的头文件using namespace std;class Person { public:string m_Name;int m_Age…...