说说FLINK细粒度滑动窗口如何处理
分析&回答
Flink的窗口机制是其底层核心之一,也是高效流处理的关键。Flink窗口分配的基类是WindowAssigner抽象类,下面的类图示出了Flink能够提供的所有窗口类型。

Flink窗口分为滚动(tumbling)、滑动(sliding)和会话(session)窗口三大类,本文要说的是滑动窗口。
下图示出一个典型的统计用户访问的滑动窗口,来自官方文档。

假设每两条虚线之间代表1分钟时间差,那么窗口大小(size)就是2分钟,滑动步长(slide)是1分钟。若时间特征为事件时间,代码如下。
dataStream .keyBy("userId") .window(SlidingEventTimeWindows.of(Time.minutes(2), Time.minutes(1))); 由图可知,当前滑动窗口与上一个滑动窗口会有重叠。在窗口大小size是步长slide的2倍的情况下,(几乎)每个DataStream元素都会处于2个窗口内。
我们简单参考一下相关的Flink源码,以加深理解。以下是窗口算子WindowOperator的processElement()方法的部分源码。
@Overridepublic void processElement(StreamRecord<IN> element) throws Exception {final Collection<W> elementWindows = windowAssigner.assignWindows(element.getValue(), element.getTimestamp(), windowAssignerContext);boolean isSkippedElement = true;final K key = this.<K>getKeyedStateBackend().getCurrentKey();if (windowAssigner instanceof MergingWindowAssigner) {// 会话窗口的处理逻辑,略去} else {for (W window : elementWindows) {if (isWindowLate(window)) {continue;}isSkippedElement = false;windowState.setCurrentNamespace(window);windowState.add(element.getValue());triggerContext.key = key;triggerContext.window = window;TriggerResult triggerResult = triggerContext.onElement(element);if (triggerResult.isFire()) {ACC contents = windowState.get();if (contents == null) {continue;}emitWindowContents(window, contents);}if (triggerResult.isPurge()) {windowState.clear();}registerCleanupTimer(window);}}// 最后是侧输出迟到数据的逻辑,略去}
复制代码
该方法先调用WindowAssigner.assignWindows()方法,根据输入元素的时间戳判断它应该属于哪些窗口。接着遍历所有窗口,将该元素加入对应的窗口状态(即缓存)中,并根据触发器返回的TriggerResult决定是输出(fire)还是清除(purge)窗口的内容,emitWindowContents()方法会调用用户函数。最后,还要调用registerCleanupTimer()方法注册计时器用来在窗口彻底过期时清除窗口状态。
以下是SlidingEventTimeWindows.assignWindows()方法的源码。
@Overridepublic Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {if (timestamp > Long.MIN_VALUE) {List<TimeWindow> windows = new ArrayList<>((int) (size / slide));long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, slide);for (long start = lastStart;start > timestamp - size;start -= slide) {windows.add(new TimeWindow(start, start + size));}return windows;} else {throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +"Is the time characteristic set to 'ProcessingTime', or did you forget to call " +"'DataStream.assignTimestampsAndWatermarks(...)'?");}}public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {return timestamp - (timestamp - offset + windowSize) % windowSize;}
复制代码
这段代码就不难理解了,先调用getWindowStartWithOffset()方法根据元素的时间戳计算出其窗口的起点时间戳,再逐次循环向后滑动,产生size / slide个窗口。我们可以将size / slide叫做“粒度”,亦即上述代码中返回的Collection集合的大小。粒度越大(“细”),滑动窗口之间的重合也越大。
代码读完了,有一个貌似稀松平常的需求:
以3分钟的频率实时计算App内各个子模块近24小时的PV和UV。
直觉上我们需要用粒度为1440 / 3 = 480的滑动窗口来实现它,但是细粒度的滑动窗口会带来性能问题,有两点:
状态 由代码可知,WindowOperator内维护了窗口本身的内部状态windowState(类型为InternalAppendingState)。对于一个元素,会将其写入对应的(key, window)二元组所圈定的状态中。可见,如果粒度为480,那么每个元素到来,更新windowState时都要遍历480个窗口并写入,开销是非常大的。在采用HDFS/RocksDB作为状态后端时,checkpoint的瓶颈也尤其明显。
定时器 在Flink中,定时器的实际实现是TimerHeapInternalTimer类,并且是用Flink自己实现的优先队列维护在堆内存中的。而在WindowOperator中,每一个(key, window)二元组都需要注册两个定时器:一是触发器注册的定时器,用于决定窗口数据何时输出;二是registerCleanupTimer()方法注册的清理定时器,用于在窗口彻底过期(如allowedLateness过期)之后及时清理掉窗口的内部状态。细粒度滑动窗口会造成维护的定时器增多,内存负担加重。
在官方文档Windows最后一节的最后,也有如下的提醒:
Flink creates one copy of each element per window to which it belongs. Given this, tumbling windows keep one copy of each element (an element belongs to exactly one window unless it is dropped late). In contrast, sliding windows create several of each element, as explained in the Window Assigners section. Hence, a sliding window of size 1 day and slide 1 second might not be a good idea.
可能有看官会问:预聚合不能解决细粒度窗口的问题吗?答案是不能。预聚合只是让AggregateFunction/ReduceFunction之后的数据量降低,但是进入WindowOperator的窗口状态的数据还是没变的。换句话说,就算触发器实现为FIRE_AND_PURGE,遍历大量窗口并写入状态的开销也是无法消除的。
扯了这么多,有解决方案吗?
当然是有的,办法总比困难多。我们一般使用 滚动窗口+在线存储+读时聚合 的思路作为workaround。简单来讲就是:
弃用滑动窗口,用长度等于原滑动窗口步长的滚动窗口代替; 每个滚动窗口将其周期内的数据做聚合,打入外部在线存储(内存数据库如Redis,LSM-based NoSQL存储如HBase); 扫描在线存储中对应时间区间(可以灵活指定)的所有行,并将计算结果返回给前端展示。 针对上面的PV/UV问题,如果采用Redis作为在线存储,我们可以将时间戳放在key内,并设定24小时过期时间。用数字字符串存储3分钟周期内的PV量,用HyperLogLog存储3分钟周期内的UV量。近24小时的PV和UV就分别可以通过简单加减和HyperLogLog的pfmerge/pfcount命令得出了。当然,实际操作起来还是要根据需求和服务器资源而定。
喵呜面试助手:一站式解决面试问题,你可以搜索微信小程序 [喵呜面试助手] 或关注 [喵呜刷题] -> 面试助手 免费刷题。如有好的面试知识或技巧期待您的共享!
相关文章:
说说FLINK细粒度滑动窗口如何处理
分析&回答 Flink的窗口机制是其底层核心之一,也是高效流处理的关键。Flink窗口分配的基类是WindowAssigner抽象类,下面的类图示出了Flink能够提供的所有窗口类型。 Flink窗口分为滚动(tumbling)、滑动(sliding&am…...
记一次反弹shell的操作【非常简单】
#什么是反弹shell 通常我们对一个开启了80端口的服务器进行访问时,就会建立起与服务器Web服务链接,从而获取到服务器相应的Web服务。而反弹shell是我们开启一个端口进行监听,转而让服务器主动反弹一个shell来连接我们的主机,我们再…...
如何排查 Flink Checkpoint 失败问题?
分析&回答 这是 Flink 相关工作中最常出现的问题,值得大家搞明白。 1. 先找到超时的subtask序号 图有点问题,因为都是成功没失败的,尴尬了。 借图: 2. 找到对应的机器和任务 方法很多,这里看自己习惯和公司提供…...
lazarus(pascal)和c语言读日志文件筛选保存为新文件
lazarus(pascal)和c语言读日志文件筛选保存为新文件,源于看日志每次从一个很多内容文件里查找不方便,写个代码输入时分秒参数,然后按行读取比较日志时间,当前秒和上一秒的输出保存为新文件,只保存2秒钟文件小多了&…...
学习JAVA打卡第四十九天
Random类 尽管可以使用math类调用static方法random()返回一个0~1之间的随机数。(包括0.0但不包括0.1),即随机数的取值范围是[0.0,1.0]的左闭右开区间。 例如,下列代码得到1~100之间…...
Golang数据结构和算法
Golang数据结构和算法 数据的逻辑结构和物理结构常见数据结构及其特点算法的时间复杂度和空间复杂度Golang冒泡排序Golang选择排序Golang插入排序Golang快速排序Golang归并排序Golang二分查找Golang sort包Golang链表Golang container/list标准库Golang栈stackGolang二叉搜索树…...
python 装饰器
装饰器是 Python 中一种功能强大的语法特性,它可以用于在不修改原函数代码的情况下,动态地扩展或修改函数的行为。装饰器本质上是一个函数或类,它接受一个函数作为参数,并返回一个新的函数或类。 下面是装饰器的详细解释和示例&a…...
iOS如何获取设备型号的最新方法总结
每一种 iOS 设备型号都有对应的一个或多个硬件编码/标识符,称为 device model 或者叫 machine name 通常的做法是,先获取设备的 device model 值,再手动映射为具体的设备型号(或者直接把 device model 值传给后端,让后…...
SpringBoot之RestTemplate使用Apache的HttpClient连接池
SpringBoot自带的RestTemplate是没有使用连接池的,只是SimpleClientHttpRequestFactory实现了ClientHttpRequestFactory、AsyncClientHttpRequestFactory 2个工厂接口,因此每次调用接口都会创建连接和销毁连接,如果是高并发场景下会大大降低性…...
第49节:cesium 倾斜模型osgb转3dtiles,并加载(含源码+视频)
结果示例: 完整步骤: 1、启动并登陆cesiumlab 2、准备OSGB模型数据(含下载地址) 链接:https://pan.quark.cn/s/46ac7b0b2bed 提取码:TvWL3、倾斜模型切片 选择倾斜模型data文件夹 空间参考、零点坐标 默认 强制双面关闭、无光照 打开...
零信任安全模型详解:探讨零信任安全策略的原理、实施方法和最佳实践,确保在网络中实现最小特权原则
在当今日益复杂和危险的网络环境中,传统的网络安全模型已经不再能够满足对抗不断进化的威胁。零信任安全模型应运而生,以其强调“不信任,始终验证”的理念,成为了当今信息技术领域中的热门话题。本文将深入探讨零信任安全模型&…...
01_nodejs简介
01 【nodejs简介】 1.前言 Node 的重要性已经不言而喻,很多互联网公司都已经有大量的高性能系统运行在 Node 之上。Node 凭借其单线程、异步等举措实现了极高的性能基准。此外,目前最为流行的 Web 开发模式是前后端分离的形式,即前端开发者…...
企业架构LNMP学习笔记4
企业服务器LNMP环境搭建: 常见的软件架构: 1)C/S: client/server 2)B/S: browser/server 不管是C还是B,都是属于客户端属于前端。那么运维人员主要是负责和管理的Server端,也统称为服务器端。为了快速的…...
探索UniApp分包
目录 什么是UniApp分包? UniApp分包的原理 优势 如何使用UniApp分包 1.manifest.json文件配置 2.静态图片资源分包注意事项 3.pages.json配置 结论 探索UniApp分包:优化移动应用性能与用户体验 在移动应用开发领域,性能和用户体验是至…...
uniapp 支持图片放大
<view class"list" v-for"(item, index) in urls" :key"index"><image :src"item" click"viewImg(item, index)" disabled></image></view> js // 预览大图 viewImg(data, index) {uni.previewImag…...
Oracle数据泵备份恢复(导出导入)详细语句
数据泵备份 查询已存在备份目录 select * from dba_directories;新建备份目录 create directory dbbak as /u01/dbbak;注意:在本地新建对应的物理目录 给指定用户赋权 grant read, write on directory dbbak to testuser; 或者直接把目录的权限设置为公开 g…...
【JS案例】JS实现积分抽奖(内附源码)
JS案例实现积分抽奖 🌟效果展示 🌟HTML结构 🌟CSS样式 🌟实现思路 🌟具体实现 1.定义抽奖次数渲染 2.点击抽奖按钮,实现滚动抽奖效果 3.弹窗处理 🌟完整代码 🌟写在最后 dz…...
angular抛出 ExpressionChangedAfterItHasBeenCheckedError错误分析
当变更检测完成后又更改了表达式值时,Angular 就会抛出 ExpressionChangedAfterItHasBeenCheckedError 错误。Angular 只会在开发模式下抛出此错误。 在开发模式下,Angular 在每次变更检测运行后都会执行一次附加检查,以确保绑定没有更改。这…...
动态链接库的__declspec(dllexport)关键字的概念
在 Windows 操作系统下,创建一个动态链接库(DLL)项目时,您需要通过 __declspec(dllexport) 关键字来显式地标记希望在 DLL 中 公开 的函数、类、变量等符号。这是因为在默认情况下,编译器会将函数和符号视为 私有&…...
群晖NAS:DS Video、Jellyfin等视频电影电视剧海报、背景墙搜刮器
群晖NAS:DS Video、Jellyfin等视频电影电视剧海报、背景墙搜刮器 本文只使用豆瓣插件方式,系统默认的 The Movie Database 好注册,但是授权码输入后域名不通过,很麻烦。 1、插件地址: https://www.aliyundrive.com/s…...
网络编程(Modbus进阶)
思维导图 Modbus RTU(先学一点理论) 概念 Modbus RTU 是工业自动化领域 最广泛应用的串行通信协议,由 Modicon 公司(现施耐德电气)于 1979 年推出。它以 高效率、强健性、易实现的特点成为工业控制系统的通信标准。 包…...
云原生核心技术 (7/12): K8s 核心概念白话解读(上):Pod 和 Deployment 究竟是什么?
大家好,欢迎来到《云原生核心技术》系列的第七篇! 在上一篇,我们成功地使用 Minikube 或 kind 在自己的电脑上搭建起了一个迷你但功能完备的 Kubernetes 集群。现在,我们就像一个拥有了一块崭新数字土地的农场主,是时…...
Day131 | 灵神 | 回溯算法 | 子集型 子集
Day131 | 灵神 | 回溯算法 | 子集型 子集 78.子集 78. 子集 - 力扣(LeetCode) 思路: 笔者写过很多次这道题了,不想写题解了,大家看灵神讲解吧 回溯算法套路①子集型回溯【基础算法精讲 14】_哔哩哔哩_bilibili 完…...
自然语言处理——Transformer
自然语言处理——Transformer 自注意力机制多头注意力机制Transformer 虽然循环神经网络可以对具有序列特性的数据非常有效,它能挖掘数据中的时序信息以及语义信息,但是它有一个很大的缺陷——很难并行化。 我们可以考虑用CNN来替代RNN,但是…...
pikachu靶场通关笔记22-1 SQL注入05-1-insert注入(报错法)
目录 一、SQL注入 二、insert注入 三、报错型注入 四、updatexml函数 五、源码审计 六、insert渗透实战 1、渗透准备 2、获取数据库名database 3、获取表名table 4、获取列名column 5、获取字段 本系列为通过《pikachu靶场通关笔记》的SQL注入关卡(共10关࿰…...
Go 并发编程基础:通道(Channel)的使用
在 Go 中,Channel 是 Goroutine 之间通信的核心机制。它提供了一个线程安全的通信方式,用于在多个 Goroutine 之间传递数据,从而实现高效的并发编程。 本章将介绍 Channel 的基本概念、用法、缓冲、关闭机制以及 select 的使用。 一、Channel…...
C#学习第29天:表达式树(Expression Trees)
目录 什么是表达式树? 核心概念 1.表达式树的构建 2. 表达式树与Lambda表达式 3.解析和访问表达式树 4.动态条件查询 表达式树的优势 1.动态构建查询 2.LINQ 提供程序支持: 3.性能优化 4.元数据处理 5.代码转换和重写 适用场景 代码复杂性…...
通过MicroSip配置自己的freeswitch服务器进行调试记录
之前用docker安装的freeswitch的,启动是正常的, 但用下面的Microsip连接不上 主要原因有可能一下几个 1、通过下面命令可以看 [rootlocalhost default]# docker exec -it freeswitch fs_cli -x "sofia status profile internal"Name …...
Unity中的transform.up
2025年6月8日,周日下午 在Unity中,transform.up是Transform组件的一个属性,表示游戏对象在世界空间中的“上”方向(Y轴正方向),且会随对象旋转动态变化。以下是关键点解析: 基本定义 transfor…...
VisualXML全新升级 | 新增数据库编辑功能
VisualXML是一个功能强大的网络总线设计工具,专注于简化汽车电子系统中复杂的网络数据设计操作。它支持多种主流总线网络格式的数据编辑(如DBC、LDF、ARXML、HEX等),并能够基于Excel表格的方式生成和转换多种数据库文件。由此&…...
