说说FLINK细粒度滑动窗口如何处理
分析&回答
Flink的窗口机制是其底层核心之一,也是高效流处理的关键。Flink窗口分配的基类是WindowAssigner抽象类,下面的类图示出了Flink能够提供的所有窗口类型。

Flink窗口分为滚动(tumbling)、滑动(sliding)和会话(session)窗口三大类,本文要说的是滑动窗口。
下图示出一个典型的统计用户访问的滑动窗口,来自官方文档。

假设每两条虚线之间代表1分钟时间差,那么窗口大小(size)就是2分钟,滑动步长(slide)是1分钟。若时间特征为事件时间,代码如下。
dataStream .keyBy("userId") .window(SlidingEventTimeWindows.of(Time.minutes(2), Time.minutes(1))); 由图可知,当前滑动窗口与上一个滑动窗口会有重叠。在窗口大小size是步长slide的2倍的情况下,(几乎)每个DataStream元素都会处于2个窗口内。
我们简单参考一下相关的Flink源码,以加深理解。以下是窗口算子WindowOperator的processElement()方法的部分源码。
@Overridepublic void processElement(StreamRecord<IN> element) throws Exception {final Collection<W> elementWindows = windowAssigner.assignWindows(element.getValue(), element.getTimestamp(), windowAssignerContext);boolean isSkippedElement = true;final K key = this.<K>getKeyedStateBackend().getCurrentKey();if (windowAssigner instanceof MergingWindowAssigner) {// 会话窗口的处理逻辑,略去} else {for (W window : elementWindows) {if (isWindowLate(window)) {continue;}isSkippedElement = false;windowState.setCurrentNamespace(window);windowState.add(element.getValue());triggerContext.key = key;triggerContext.window = window;TriggerResult triggerResult = triggerContext.onElement(element);if (triggerResult.isFire()) {ACC contents = windowState.get();if (contents == null) {continue;}emitWindowContents(window, contents);}if (triggerResult.isPurge()) {windowState.clear();}registerCleanupTimer(window);}}// 最后是侧输出迟到数据的逻辑,略去}
复制代码
该方法先调用WindowAssigner.assignWindows()方法,根据输入元素的时间戳判断它应该属于哪些窗口。接着遍历所有窗口,将该元素加入对应的窗口状态(即缓存)中,并根据触发器返回的TriggerResult决定是输出(fire)还是清除(purge)窗口的内容,emitWindowContents()方法会调用用户函数。最后,还要调用registerCleanupTimer()方法注册计时器用来在窗口彻底过期时清除窗口状态。
以下是SlidingEventTimeWindows.assignWindows()方法的源码。
@Overridepublic Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {if (timestamp > Long.MIN_VALUE) {List<TimeWindow> windows = new ArrayList<>((int) (size / slide));long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, slide);for (long start = lastStart;start > timestamp - size;start -= slide) {windows.add(new TimeWindow(start, start + size));}return windows;} else {throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +"Is the time characteristic set to 'ProcessingTime', or did you forget to call " +"'DataStream.assignTimestampsAndWatermarks(...)'?");}}public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {return timestamp - (timestamp - offset + windowSize) % windowSize;}
复制代码
这段代码就不难理解了,先调用getWindowStartWithOffset()方法根据元素的时间戳计算出其窗口的起点时间戳,再逐次循环向后滑动,产生size / slide个窗口。我们可以将size / slide叫做“粒度”,亦即上述代码中返回的Collection集合的大小。粒度越大(“细”),滑动窗口之间的重合也越大。
代码读完了,有一个貌似稀松平常的需求:
以3分钟的频率实时计算App内各个子模块近24小时的PV和UV。
直觉上我们需要用粒度为1440 / 3 = 480的滑动窗口来实现它,但是细粒度的滑动窗口会带来性能问题,有两点:
状态 由代码可知,WindowOperator内维护了窗口本身的内部状态windowState(类型为InternalAppendingState)。对于一个元素,会将其写入对应的(key, window)二元组所圈定的状态中。可见,如果粒度为480,那么每个元素到来,更新windowState时都要遍历480个窗口并写入,开销是非常大的。在采用HDFS/RocksDB作为状态后端时,checkpoint的瓶颈也尤其明显。
定时器 在Flink中,定时器的实际实现是TimerHeapInternalTimer类,并且是用Flink自己实现的优先队列维护在堆内存中的。而在WindowOperator中,每一个(key, window)二元组都需要注册两个定时器:一是触发器注册的定时器,用于决定窗口数据何时输出;二是registerCleanupTimer()方法注册的清理定时器,用于在窗口彻底过期(如allowedLateness过期)之后及时清理掉窗口的内部状态。细粒度滑动窗口会造成维护的定时器增多,内存负担加重。
在官方文档Windows最后一节的最后,也有如下的提醒:
Flink creates one copy of each element per window to which it belongs. Given this, tumbling windows keep one copy of each element (an element belongs to exactly one window unless it is dropped late). In contrast, sliding windows create several of each element, as explained in the Window Assigners section. Hence, a sliding window of size 1 day and slide 1 second might not be a good idea.
可能有看官会问:预聚合不能解决细粒度窗口的问题吗?答案是不能。预聚合只是让AggregateFunction/ReduceFunction之后的数据量降低,但是进入WindowOperator的窗口状态的数据还是没变的。换句话说,就算触发器实现为FIRE_AND_PURGE,遍历大量窗口并写入状态的开销也是无法消除的。
扯了这么多,有解决方案吗?
当然是有的,办法总比困难多。我们一般使用 滚动窗口+在线存储+读时聚合 的思路作为workaround。简单来讲就是:
弃用滑动窗口,用长度等于原滑动窗口步长的滚动窗口代替; 每个滚动窗口将其周期内的数据做聚合,打入外部在线存储(内存数据库如Redis,LSM-based NoSQL存储如HBase); 扫描在线存储中对应时间区间(可以灵活指定)的所有行,并将计算结果返回给前端展示。 针对上面的PV/UV问题,如果采用Redis作为在线存储,我们可以将时间戳放在key内,并设定24小时过期时间。用数字字符串存储3分钟周期内的PV量,用HyperLogLog存储3分钟周期内的UV量。近24小时的PV和UV就分别可以通过简单加减和HyperLogLog的pfmerge/pfcount命令得出了。当然,实际操作起来还是要根据需求和服务器资源而定。
喵呜面试助手:一站式解决面试问题,你可以搜索微信小程序 [喵呜面试助手] 或关注 [喵呜刷题] -> 面试助手 免费刷题。如有好的面试知识或技巧期待您的共享!
相关文章:
说说FLINK细粒度滑动窗口如何处理
分析&回答 Flink的窗口机制是其底层核心之一,也是高效流处理的关键。Flink窗口分配的基类是WindowAssigner抽象类,下面的类图示出了Flink能够提供的所有窗口类型。 Flink窗口分为滚动(tumbling)、滑动(sliding&am…...
记一次反弹shell的操作【非常简单】
#什么是反弹shell 通常我们对一个开启了80端口的服务器进行访问时,就会建立起与服务器Web服务链接,从而获取到服务器相应的Web服务。而反弹shell是我们开启一个端口进行监听,转而让服务器主动反弹一个shell来连接我们的主机,我们再…...
如何排查 Flink Checkpoint 失败问题?
分析&回答 这是 Flink 相关工作中最常出现的问题,值得大家搞明白。 1. 先找到超时的subtask序号 图有点问题,因为都是成功没失败的,尴尬了。 借图: 2. 找到对应的机器和任务 方法很多,这里看自己习惯和公司提供…...
lazarus(pascal)和c语言读日志文件筛选保存为新文件
lazarus(pascal)和c语言读日志文件筛选保存为新文件,源于看日志每次从一个很多内容文件里查找不方便,写个代码输入时分秒参数,然后按行读取比较日志时间,当前秒和上一秒的输出保存为新文件,只保存2秒钟文件小多了&…...
学习JAVA打卡第四十九天
Random类 尽管可以使用math类调用static方法random()返回一个0~1之间的随机数。(包括0.0但不包括0.1),即随机数的取值范围是[0.0,1.0]的左闭右开区间。 例如,下列代码得到1~100之间…...
Golang数据结构和算法
Golang数据结构和算法 数据的逻辑结构和物理结构常见数据结构及其特点算法的时间复杂度和空间复杂度Golang冒泡排序Golang选择排序Golang插入排序Golang快速排序Golang归并排序Golang二分查找Golang sort包Golang链表Golang container/list标准库Golang栈stackGolang二叉搜索树…...
python 装饰器
装饰器是 Python 中一种功能强大的语法特性,它可以用于在不修改原函数代码的情况下,动态地扩展或修改函数的行为。装饰器本质上是一个函数或类,它接受一个函数作为参数,并返回一个新的函数或类。 下面是装饰器的详细解释和示例&a…...
iOS如何获取设备型号的最新方法总结
每一种 iOS 设备型号都有对应的一个或多个硬件编码/标识符,称为 device model 或者叫 machine name 通常的做法是,先获取设备的 device model 值,再手动映射为具体的设备型号(或者直接把 device model 值传给后端,让后…...
SpringBoot之RestTemplate使用Apache的HttpClient连接池
SpringBoot自带的RestTemplate是没有使用连接池的,只是SimpleClientHttpRequestFactory实现了ClientHttpRequestFactory、AsyncClientHttpRequestFactory 2个工厂接口,因此每次调用接口都会创建连接和销毁连接,如果是高并发场景下会大大降低性…...
第49节:cesium 倾斜模型osgb转3dtiles,并加载(含源码+视频)
结果示例: 完整步骤: 1、启动并登陆cesiumlab 2、准备OSGB模型数据(含下载地址) 链接:https://pan.quark.cn/s/46ac7b0b2bed 提取码:TvWL3、倾斜模型切片 选择倾斜模型data文件夹 空间参考、零点坐标 默认 强制双面关闭、无光照 打开...
零信任安全模型详解:探讨零信任安全策略的原理、实施方法和最佳实践,确保在网络中实现最小特权原则
在当今日益复杂和危险的网络环境中,传统的网络安全模型已经不再能够满足对抗不断进化的威胁。零信任安全模型应运而生,以其强调“不信任,始终验证”的理念,成为了当今信息技术领域中的热门话题。本文将深入探讨零信任安全模型&…...
01_nodejs简介
01 【nodejs简介】 1.前言 Node 的重要性已经不言而喻,很多互联网公司都已经有大量的高性能系统运行在 Node 之上。Node 凭借其单线程、异步等举措实现了极高的性能基准。此外,目前最为流行的 Web 开发模式是前后端分离的形式,即前端开发者…...
企业架构LNMP学习笔记4
企业服务器LNMP环境搭建: 常见的软件架构: 1)C/S: client/server 2)B/S: browser/server 不管是C还是B,都是属于客户端属于前端。那么运维人员主要是负责和管理的Server端,也统称为服务器端。为了快速的…...
探索UniApp分包
目录 什么是UniApp分包? UniApp分包的原理 优势 如何使用UniApp分包 1.manifest.json文件配置 2.静态图片资源分包注意事项 3.pages.json配置 结论 探索UniApp分包:优化移动应用性能与用户体验 在移动应用开发领域,性能和用户体验是至…...
uniapp 支持图片放大
<view class"list" v-for"(item, index) in urls" :key"index"><image :src"item" click"viewImg(item, index)" disabled></image></view> js // 预览大图 viewImg(data, index) {uni.previewImag…...
Oracle数据泵备份恢复(导出导入)详细语句
数据泵备份 查询已存在备份目录 select * from dba_directories;新建备份目录 create directory dbbak as /u01/dbbak;注意:在本地新建对应的物理目录 给指定用户赋权 grant read, write on directory dbbak to testuser; 或者直接把目录的权限设置为公开 g…...
【JS案例】JS实现积分抽奖(内附源码)
JS案例实现积分抽奖 🌟效果展示 🌟HTML结构 🌟CSS样式 🌟实现思路 🌟具体实现 1.定义抽奖次数渲染 2.点击抽奖按钮,实现滚动抽奖效果 3.弹窗处理 🌟完整代码 🌟写在最后 dz…...
angular抛出 ExpressionChangedAfterItHasBeenCheckedError错误分析
当变更检测完成后又更改了表达式值时,Angular 就会抛出 ExpressionChangedAfterItHasBeenCheckedError 错误。Angular 只会在开发模式下抛出此错误。 在开发模式下,Angular 在每次变更检测运行后都会执行一次附加检查,以确保绑定没有更改。这…...
动态链接库的__declspec(dllexport)关键字的概念
在 Windows 操作系统下,创建一个动态链接库(DLL)项目时,您需要通过 __declspec(dllexport) 关键字来显式地标记希望在 DLL 中 公开 的函数、类、变量等符号。这是因为在默认情况下,编译器会将函数和符号视为 私有&…...
群晖NAS:DS Video、Jellyfin等视频电影电视剧海报、背景墙搜刮器
群晖NAS:DS Video、Jellyfin等视频电影电视剧海报、背景墙搜刮器 本文只使用豆瓣插件方式,系统默认的 The Movie Database 好注册,但是授权码输入后域名不通过,很麻烦。 1、插件地址: https://www.aliyundrive.com/s…...
告别繁琐标注:Windows上最轻量级实时屏幕画笔工具完全指南
告别繁琐标注:Windows上最轻量级实时屏幕画笔工具完全指南 【免费下载链接】gInk An easy to use on-screen annotation software inspired by Epic Pen. 项目地址: https://gitcode.com/gh_mirrors/gi/gInk 你是否曾在视频会议中手忙脚乱地寻找标注工具&…...
终极Python金融数据接口:3步掌握免费高效的A股数据获取方案
终极Python金融数据接口:3步掌握免费高效的A股数据获取方案 【免费下载链接】mootdx 通达信数据读取的一个简便使用封装 项目地址: https://gitcode.com/GitHub_Trending/mo/mootdx 在金融数据分析和量化交易领域,获取准确、及时且成本可控的市场…...
NXP LPC2000中断向量校验和机制与Keil实现
1. NXP LPC2000设备向量校验和机制解析在嵌入式开发领域,NXP LPC2000系列微控制器以其ARM7内核和丰富的外设资源广受欢迎。这类设备有一个独特的启动要求——中断向量表的校验和验证机制。具体来说,地址0x00000014处(ARM保留的中断向量位置&a…...
嵌入式Linux UVC驱动开发:DWC2控制器与处理单元数据流详解
1. 项目概述:从DWC2控制器到UVC处理单元在嵌入式Linux系统里搞USB摄像头驱动开发,尤其是用DWC2这种集成在SoC里的USB控制器,UVC(USB Video Class)驱动的“处理单元”绝对是个绕不开的核心。很多朋友在移植或调试摄像头…...
Claude Desktop for Linux桌面集成:.desktop文件与MIME类型配置
Claude Desktop for Linux桌面集成:.desktop文件与MIME类型配置 【免费下载链接】claude-desktop-debian Claude Desktop for Linux 项目地址: https://gitcode.com/GitHub_Trending/cl/claude-desktop-debian Claude Desktop for Linux是一款强大的桌面应用…...
兄弟反目成仇?《易经》深挖人性:猜疑才是最大祸根
你有没有过这样的经历?关系最好的朋友或同事,因为一个误会,突然就成了“最熟悉的陌生人”。你解释,他觉得你掩饰;你沉默,他觉得你默认。最后,好好的关系,硬生生被“猜疑”这把刀&…...
技术人的英语能力如何影响薪资?数据说话
打开任何一个招聘平台,搜索“软件测试工程师”,你会发现一个越来越普遍的现象。对于那些薪资范围宽、技术描述详尽、公司名号响亮的岗位,末尾往往会附上一句:“英语可作为工作语言”、“英文读写能力优异”、“CET-6以上优先”。这…...
Solaar 4.0:解锁罗技设备的完整Linux管理体验
Solaar 4.0:解锁罗技设备的完整Linux管理体验 【免费下载链接】Solaar Linux device manager for Logitech devices 项目地址: https://gitcode.com/gh_mirrors/so/Solaar 你是否曾为管理多款罗技无线设备而烦恼?不同设备需要不同的配置工具&…...
神作《盲视》,最硬核的反人类科幻,二十年前预言了AI的冰冷本质
哎呀好久不更新了,半夜睡不着起来随便写点,免得账号被回收了。《盲视》是是加拿大科幻作家彼得沃茨的一部硬科幻经典,入围雨果奖、轨迹奖、坎贝尔奖。但它也是一本阅读门槛很高阅读体验很差的小说。其不适感一部分来自它晦涩的文风和叙事方式…...
Topit:Mac窗口置顶终极指南 - 三步打造高效多任务工作环境
Topit:Mac窗口置顶终极指南 - 三步打造高效多任务工作环境 【免费下载链接】Topit Pin any window to the top of your screen / 在Mac上将你的任何窗口强制置顶 项目地址: https://gitcode.com/gh_mirrors/to/Topit 还在为Mac上频繁切换窗口而烦恼吗&#x…...
