当前位置: 首页 > news >正文

说说FLINK细粒度滑动窗口如何处理

分析&回答

Flink的窗口机制是其底层核心之一,也是高效流处理的关键。Flink窗口分配的基类是WindowAssigner抽象类,下面的类图示出了Flink能够提供的所有窗口类型。

Flink窗口分为滚动(tumbling)、滑动(sliding)和会话(session)窗口三大类,本文要说的是滑动窗口。

下图示出一个典型的统计用户访问的滑动窗口,来自官方文档。

假设每两条虚线之间代表1分钟时间差,那么窗口大小(size)就是2分钟,滑动步长(slide)是1分钟。若时间特征为事件时间,代码如下。

dataStream .keyBy("userId") .window(SlidingEventTimeWindows.of(Time.minutes(2), Time.minutes(1))); 由图可知,当前滑动窗口与上一个滑动窗口会有重叠。在窗口大小size是步长slide的2倍的情况下,(几乎)每个DataStream元素都会处于2个窗口内。

我们简单参考一下相关的Flink源码,以加深理解。以下是窗口算子WindowOperator的processElement()方法的部分源码。

    @Overridepublic void processElement(StreamRecord<IN> element) throws Exception {final Collection<W> elementWindows = windowAssigner.assignWindows(element.getValue(), element.getTimestamp(), windowAssignerContext);boolean isSkippedElement = true;final K key = this.<K>getKeyedStateBackend().getCurrentKey();if (windowAssigner instanceof MergingWindowAssigner) {// 会话窗口的处理逻辑,略去} else {for (W window : elementWindows) {if (isWindowLate(window)) {continue;}isSkippedElement = false;windowState.setCurrentNamespace(window);windowState.add(element.getValue());triggerContext.key = key;triggerContext.window = window;TriggerResult triggerResult = triggerContext.onElement(element);if (triggerResult.isFire()) {ACC contents = windowState.get();if (contents == null) {continue;}emitWindowContents(window, contents);}if (triggerResult.isPurge()) {windowState.clear();}registerCleanupTimer(window);}}// 最后是侧输出迟到数据的逻辑,略去}
复制代码

该方法先调用WindowAssigner.assignWindows()方法,根据输入元素的时间戳判断它应该属于哪些窗口。接着遍历所有窗口,将该元素加入对应的窗口状态(即缓存)中,并根据触发器返回的TriggerResult决定是输出(fire)还是清除(purge)窗口的内容,emitWindowContents()方法会调用用户函数。最后,还要调用registerCleanupTimer()方法注册计时器用来在窗口彻底过期时清除窗口状态。

以下是SlidingEventTimeWindows.assignWindows()方法的源码。

    @Overridepublic Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {if (timestamp > Long.MIN_VALUE) {List<TimeWindow> windows = new ArrayList<>((int) (size / slide));long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, slide);for (long start = lastStart;start > timestamp - size;start -= slide) {windows.add(new TimeWindow(start, start + size));}return windows;} else {throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +"Is the time characteristic set to 'ProcessingTime', or did you forget to call " +"'DataStream.assignTimestampsAndWatermarks(...)'?");}}public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {return timestamp - (timestamp - offset + windowSize) % windowSize;}
复制代码

这段代码就不难理解了,先调用getWindowStartWithOffset()方法根据元素的时间戳计算出其窗口的起点时间戳,再逐次循环向后滑动,产生size / slide个窗口。我们可以将size / slide叫做“粒度”,亦即上述代码中返回的Collection集合的大小。粒度越大(“细”),滑动窗口之间的重合也越大。

代码读完了,有一个貌似稀松平常的需求:

以3分钟的频率实时计算App内各个子模块近24小时的PV和UV。

直觉上我们需要用粒度为1440 / 3 = 480的滑动窗口来实现它,但是细粒度的滑动窗口会带来性能问题,有两点:

状态 由代码可知,WindowOperator内维护了窗口本身的内部状态windowState(类型为InternalAppendingState)。对于一个元素,会将其写入对应的(key, window)二元组所圈定的状态中。可见,如果粒度为480,那么每个元素到来,更新windowState时都要遍历480个窗口并写入,开销是非常大的。在采用HDFS/RocksDB作为状态后端时,checkpoint的瓶颈也尤其明显。

定时器 在Flink中,定时器的实际实现是TimerHeapInternalTimer类,并且是用Flink自己实现的优先队列维护在堆内存中的。而在WindowOperator中,每一个(key, window)二元组都需要注册两个定时器:一是触发器注册的定时器,用于决定窗口数据何时输出;二是registerCleanupTimer()方法注册的清理定时器,用于在窗口彻底过期(如allowedLateness过期)之后及时清理掉窗口的内部状态。细粒度滑动窗口会造成维护的定时器增多,内存负担加重。

在官方文档Windows最后一节的最后,也有如下的提醒:

Flink creates one copy of each element per window to which it belongs. Given this, tumbling windows keep one copy of each element (an element belongs to exactly one window unless it is dropped late). In contrast, sliding windows create several of each element, as explained in the Window Assigners section. Hence, a sliding window of size 1 day and slide 1 second might not be a good idea.

可能有看官会问:预聚合不能解决细粒度窗口的问题吗?答案是不能。预聚合只是让AggregateFunction/ReduceFunction之后的数据量降低,但是进入WindowOperator的窗口状态的数据还是没变的。换句话说,就算触发器实现为FIRE_AND_PURGE,遍历大量窗口并写入状态的开销也是无法消除的。

扯了这么多,有解决方案吗?

当然是有的,办法总比困难多。我们一般使用 滚动窗口+在线存储+读时聚合 的思路作为workaround。简单来讲就是:

弃用滑动窗口,用长度等于原滑动窗口步长的滚动窗口代替; 每个滚动窗口将其周期内的数据做聚合,打入外部在线存储(内存数据库如Redis,LSM-based NoSQL存储如HBase); 扫描在线存储中对应时间区间(可以灵活指定)的所有行,并将计算结果返回给前端展示。 针对上面的PV/UV问题,如果采用Redis作为在线存储,我们可以将时间戳放在key内,并设定24小时过期时间。用数字字符串存储3分钟周期内的PV量,用HyperLogLog存储3分钟周期内的UV量。近24小时的PV和UV就分别可以通过简单加减和HyperLogLog的pfmerge/pfcount命令得出了。当然,实际操作起来还是要根据需求和服务器资源而定。

喵呜面试助手:一站式解决面试问题,你可以搜索微信小程序 [喵呜面试助手] 或关注 [喵呜刷题] -> 面试助手 免费刷题。如有好的面试知识或技巧期待您的共享!

相关文章:

说说FLINK细粒度滑动窗口如何处理

分析&回答 Flink的窗口机制是其底层核心之一&#xff0c;也是高效流处理的关键。Flink窗口分配的基类是WindowAssigner抽象类&#xff0c;下面的类图示出了Flink能够提供的所有窗口类型。 Flink窗口分为滚动&#xff08;tumbling&#xff09;、滑动&#xff08;sliding&am…...

记一次反弹shell的操作【非常简单】

#什么是反弹shell 通常我们对一个开启了80端口的服务器进行访问时&#xff0c;就会建立起与服务器Web服务链接&#xff0c;从而获取到服务器相应的Web服务。而反弹shell是我们开启一个端口进行监听&#xff0c;转而让服务器主动反弹一个shell来连接我们的主机&#xff0c;我们再…...

如何排查 Flink Checkpoint 失败问题?

分析&回答 这是 Flink 相关工作中最常出现的问题&#xff0c;值得大家搞明白。 1. 先找到超时的subtask序号 图有点问题&#xff0c;因为都是成功没失败的&#xff0c;尴尬了。 借图&#xff1a; 2. 找到对应的机器和任务 方法很多&#xff0c;这里看自己习惯和公司提供…...

lazarus(pascal)和c语言读日志文件筛选保存为新文件

lazarus(pascal)和c语言读日志文件筛选保存为新文件&#xff0c;源于看日志每次从一个很多内容文件里查找不方便&#xff0c;写个代码输入时分秒参数&#xff0c;然后按行读取比较日志时间&#xff0c;当前秒和上一秒的输出保存为新文件&#xff0c;只保存2秒钟文件小多了&…...

学习JAVA打卡第四十九天

Random类 尽管可以使用math类调用static方法random&#xff08;&#xff09;返回一个0~1之间的随机数。&#xff08;包括0.0但不包括0.1&#xff09;&#xff0c;即随机数的取值范围是[0.0&#xff0c;1.0]的左闭右开区间。 例如&#xff0c;下列代码得到1&#xff5e;100之间…...

Golang数据结构和算法

Golang数据结构和算法 数据的逻辑结构和物理结构常见数据结构及其特点算法的时间复杂度和空间复杂度Golang冒泡排序Golang选择排序Golang插入排序Golang快速排序Golang归并排序Golang二分查找Golang sort包Golang链表Golang container/list标准库Golang栈stackGolang二叉搜索树…...

python 装饰器

装饰器是 Python 中一种功能强大的语法特性&#xff0c;它可以用于在不修改原函数代码的情况下&#xff0c;动态地扩展或修改函数的行为。装饰器本质上是一个函数或类&#xff0c;它接受一个函数作为参数&#xff0c;并返回一个新的函数或类。 下面是装饰器的详细解释和示例&a…...

iOS如何获取设备型号的最新方法总结

每一种 iOS 设备型号都有对应的一个或多个硬件编码/标识符&#xff0c;称为 device model 或者叫 machine name 通常的做法是&#xff0c;先获取设备的 device model 值&#xff0c;再手动映射为具体的设备型号&#xff08;或者直接把 device model 值传给后端&#xff0c;让后…...

SpringBoot之RestTemplate使用Apache的HttpClient连接池

SpringBoot自带的RestTemplate是没有使用连接池的&#xff0c;只是SimpleClientHttpRequestFactory实现了ClientHttpRequestFactory、AsyncClientHttpRequestFactory 2个工厂接口&#xff0c;因此每次调用接口都会创建连接和销毁连接&#xff0c;如果是高并发场景下会大大降低性…...

第49节:cesium 倾斜模型osgb转3dtiles,并加载(含源码+视频)

结果示例: 完整步骤: 1、启动并登陆cesiumlab 2、准备OSGB模型数据(含下载地址) 链接:https://pan.quark.cn/s/46ac7b0b2bed 提取码:TvWL3、倾斜模型切片 选择倾斜模型data文件夹 空间参考、零点坐标 默认 强制双面关闭、无光照 打开...

零信任安全模型详解:探讨零信任安全策略的原理、实施方法和最佳实践,确保在网络中实现最小特权原则

在当今日益复杂和危险的网络环境中&#xff0c;传统的网络安全模型已经不再能够满足对抗不断进化的威胁。零信任安全模型应运而生&#xff0c;以其强调“不信任&#xff0c;始终验证”的理念&#xff0c;成为了当今信息技术领域中的热门话题。本文将深入探讨零信任安全模型&…...

01_nodejs简介

01 【nodejs简介】 1.前言 Node 的重要性已经不言而喻&#xff0c;很多互联网公司都已经有大量的高性能系统运行在 Node 之上。Node 凭借其单线程、异步等举措实现了极高的性能基准。此外&#xff0c;目前最为流行的 Web 开发模式是前后端分离的形式&#xff0c;即前端开发者…...

企业架构LNMP学习笔记4

企业服务器LNMP环境搭建&#xff1a; 常见的软件架构&#xff1a; 1&#xff09;C/S: client/server 2&#xff09;B/S: browser/server 不管是C还是B&#xff0c;都是属于客户端属于前端。那么运维人员主要是负责和管理的Server端&#xff0c;也统称为服务器端。为了快速的…...

探索UniApp分包

目录 什么是UniApp分包&#xff1f; UniApp分包的原理 优势 如何使用UniApp分包 1.manifest.json文件配置 2.静态图片资源分包注意事项 3.pages.json配置 结论 探索UniApp分包&#xff1a;优化移动应用性能与用户体验 在移动应用开发领域&#xff0c;性能和用户体验是至…...

uniapp 支持图片放大

<view class"list" v-for"(item, index) in urls" :key"index"><image :src"item" click"viewImg(item, index)" disabled></image></view> js // 预览大图 viewImg(data, index) {uni.previewImag…...

Oracle数据泵备份恢复(导出导入)详细语句

数据泵备份 查询已存在备份目录 select * from dba_directories;新建备份目录 create directory dbbak as /u01/dbbak;注意&#xff1a;在本地新建对应的物理目录 给指定用户赋权 grant read, write on directory dbbak to testuser; 或者直接把目录的权限设置为公开 g…...

【JS案例】JS实现积分抽奖(内附源码)

JS案例实现积分抽奖 &#x1f31f;效果展示 &#x1f31f;HTML结构 &#x1f31f;CSS样式 &#x1f31f;实现思路 &#x1f31f;具体实现 1.定义抽奖次数渲染 2.点击抽奖按钮,实现滚动抽奖效果 3.弹窗处理 &#x1f31f;完整代码 &#x1f31f;写在最后 &#x1f3…...

angular抛出 ExpressionChangedAfterItHasBeenCheckedError错误分析

当变更检测完成后又更改了表达式值时&#xff0c;Angular 就会抛出 ExpressionChangedAfterItHasBeenCheckedError 错误。Angular 只会在开发模式下抛出此错误。 在开发模式下&#xff0c;Angular 在每次变更检测运行后都会执行一次附加检查&#xff0c;以确保绑定没有更改。这…...

动态链接库的__declspec(dllexport)关键字的概念

在 Windows 操作系统下&#xff0c;创建一个动态链接库&#xff08;DLL&#xff09;项目时&#xff0c;您需要通过 __declspec(dllexport) 关键字来显式地标记希望在 DLL 中 公开 的函数、类、变量等符号。这是因为在默认情况下&#xff0c;编译器会将函数和符号视为 私有&…...

群晖NAS:DS Video、Jellyfin等视频电影电视剧海报、背景墙搜刮器

群晖NAS&#xff1a;DS Video、Jellyfin等视频电影电视剧海报、背景墙搜刮器 本文只使用豆瓣插件方式&#xff0c;系统默认的 The Movie Database 好注册&#xff0c;但是授权码输入后域名不通过&#xff0c;很麻烦。 1、插件地址&#xff1a; https://www.aliyundrive.com/s…...

变量 varablie 声明- Rust 变量 let mut 声明与 C/C++ 变量声明对比分析

一、变量声明设计&#xff1a;let 与 mut 的哲学解析 Rust 采用 let 声明变量并通过 mut 显式标记可变性&#xff0c;这种设计体现了语言的核心哲学。以下是深度解析&#xff1a; 1.1 设计理念剖析 安全优先原则&#xff1a;默认不可变强制开发者明确声明意图 let x 5; …...

C++实现分布式网络通信框架RPC(3)--rpc调用端

目录 一、前言 二、UserServiceRpc_Stub 三、 CallMethod方法的重写 头文件 实现 四、rpc调用端的调用 实现 五、 google::protobuf::RpcController *controller 头文件 实现 六、总结 一、前言 在前边的文章中&#xff0c;我们已经大致实现了rpc服务端的各项功能代…...

渗透实战PortSwigger靶场-XSS Lab 14:大多数标签和属性被阻止

<script>标签被拦截 我们需要把全部可用的 tag 和 event 进行暴力破解 XSS cheat sheet&#xff1a; https://portswigger.net/web-security/cross-site-scripting/cheat-sheet 通过爆破发现body可以用 再把全部 events 放进去爆破 这些 event 全部可用 <body onres…...

【第二十一章 SDIO接口(SDIO)】

第二十一章 SDIO接口 目录 第二十一章 SDIO接口(SDIO) 1 SDIO 主要功能 2 SDIO 总线拓扑 3 SDIO 功能描述 3.1 SDIO 适配器 3.2 SDIOAHB 接口 4 卡功能描述 4.1 卡识别模式 4.2 卡复位 4.3 操作电压范围确认 4.4 卡识别过程 4.5 写数据块 4.6 读数据块 4.7 数据流…...

苍穹外卖--缓存菜品

1.问题说明 用户端小程序展示的菜品数据都是通过查询数据库获得&#xff0c;如果用户端访问量比较大&#xff0c;数据库访问压力随之增大 2.实现思路 通过Redis来缓存菜品数据&#xff0c;减少数据库查询操作。 缓存逻辑分析&#xff1a; ①每个分类下的菜品保持一份缓存数据…...

三体问题详解

从物理学角度&#xff0c;三体问题之所以不稳定&#xff0c;是因为三个天体在万有引力作用下相互作用&#xff0c;形成一个非线性耦合系统。我们可以从牛顿经典力学出发&#xff0c;列出具体的运动方程&#xff0c;并说明为何这个系统本质上是混沌的&#xff0c;无法得到一般解…...

工业自动化时代的精准装配革新:迁移科技3D视觉系统如何重塑机器人定位装配

AI3D视觉的工业赋能者 迁移科技成立于2017年&#xff0c;作为行业领先的3D工业相机及视觉系统供应商&#xff0c;累计完成数亿元融资。其核心技术覆盖硬件设计、算法优化及软件集成&#xff0c;通过稳定、易用、高回报的AI3D视觉系统&#xff0c;为汽车、新能源、金属制造等行…...

Git 3天2K星标:Datawhale 的 Happy-LLM 项目介绍(附教程)

引言 在人工智能飞速发展的今天&#xff0c;大语言模型&#xff08;Large Language Models, LLMs&#xff09;已成为技术领域的焦点。从智能写作到代码生成&#xff0c;LLM 的应用场景不断扩展&#xff0c;深刻改变了我们的工作和生活方式。然而&#xff0c;理解这些模型的内部…...

API网关Kong的鉴权与限流:高并发场景下的核心实践

&#x1f525;「炎码工坊」技术弹药已装填&#xff01; 点击关注 → 解锁工业级干货【工具实测|项目避坑|源码燃烧指南】 引言 在微服务架构中&#xff0c;API网关承担着流量调度、安全防护和协议转换的核心职责。作为云原生时代的代表性网关&#xff0c;Kong凭借其插件化架构…...

Ubuntu系统多网卡多相机IP设置方法

目录 1、硬件情况 2、如何设置网卡和相机IP 2.1 万兆网卡连接交换机&#xff0c;交换机再连相机 2.1.1 网卡设置 2.1.2 相机设置 2.3 万兆网卡直连相机 1、硬件情况 2个网卡n个相机 电脑系统信息&#xff0c;系统版本&#xff1a;Ubuntu22.04.5 LTS&#xff1b;内核版本…...