20250124 Flink中 窗口开始时间和結束時間
增量聚合的 ProcessWindowFunction #
ProcessWindowFunction 可以与 ReduceFunction 或 AggregateFunction 搭配使用, 使其能够在数据到达窗口的时候进行增量聚合。当窗口关闭时,ProcessWindowFunction 将会得到聚合的结果。 这样它就可以增量聚合窗口的元素并且从 ProcessWindowFunction` 中获得窗口的元数据。
你也可以对过时的 WindowFunction 使用增量聚合。
使用 ReduceFunction 增量聚合 #
- 下例展示了如何将
ReduceFunction与ProcessWindowFunction组合,返回窗口中的最小元素和窗口的开始时间。
DataStream<SensorReading> input = ...;input.keyBy(<key selector>).window(<window assigner>).reduce(new MyReduceFunction(), new MyProcessWindowFunction());// Function definitionsprivate static class MyReduceFunction implements ReduceFunction<SensorReading> {public SensorReading reduce(SensorReading r1, SensorReading r2) {return r1.value() > r2.value() ? r2 : r1;}
}private static class MyProcessWindowFunctionextends ProcessWindowFunction<SensorReading, Tuple2<Long, SensorReading>, String, TimeWindow> {public void process(String key,Context context,Iterable<SensorReading> minReadings,Collector<Tuple2<Long, SensorReading>> out) {SensorReading min = minReadings.iterator().next();out.collect(new Tuple2<Long, SensorReading>(context.window().getStart(), min));}
}
通俗解释:窗口开始时间的作用
我们可以用一个更贴近生活的例子来理解 窗口开始时间 的意义。
场景比喻:每天上午的「温度统计报告
假设你有一个气象站,每5分钟记录一次户外温度。现在需要 每小时(例如8:00-9:00)统计一次该时段内的最低温度,并在报告中标注这个小时段的起始时间(如“8:00-9:00的最低温度是15°C”)。
关键点
-
窗口开始时间:就是时间段的起点(如8:00)。
-
窗口结束时间:就是时间段的终点(如9:00)。
-
为什么要记录开始时间?
方便人类理解数据属于哪个时段(比如“8点档”的数据)。
代码示例解析
1. 窗口如何划分?
假设使用 滚动窗口(Tumbling Window),每1小时划分一次:
Copy
8:00-9:00 → 窗口1 9:00-10:00 → 窗口2
所有时间戳在8:00≤t<9:00的数据会被分配到窗口1。
2. 窗口触发计算的时机
当系统时间(或事件时间)到达9:00时,窗口1关闭,触发计算:
-
调用
MyReduceFunction找出该窗口内的最低温度。 -
调用
MyProcessWindowFunction将结果与窗口开始时间(8:00)绑定。
3. 为什么输出的是开始时间(8:00)而不是结束时间(9:00)?
-
业务需求:通常更关心数据所属时段的起点(例如“8点档的数据”)。
-
避免歧义:如果输出9:00,可能被误解为“9点档的数据”(实际是8:00-9:00的数据)。
代码中具体如何获取开始时间?
在 MyProcessWindowFunction 中:
context.window().getStart(); // 返回窗口的起始时间戳(如8:00对应的毫秒值)
-
context对象:包含窗口的元信息(起止时间、触发时间等)。 -
实际输出时:将时间戳转换为人类可读格式(如
8:00)。
常见疑问解答
Q1:如果数据延迟到达(比如8:59的数据在9:05才到),会进入哪个窗口?
-
取决于时间语义:
-
若使用 事件时间(Event Time):按数据自带的时间戳分配到8:00-9:00窗口。
-
若使用 处理时间(Processing Time):按到达系统的时间分配到9:00-10:00窗口。
-
(示例代码未显式设置时间语义,默认可能是处理时间)
-
Q2:窗口开始时间是如何计算的?
-
由窗口分配器(Window Assigner)决定:
-
滚动窗口按固定间隔对齐(如整点)。
-
滑动窗口按步长对齐(如每30分钟滑动一次的1小时窗口)。
-
会话窗口根据数据活跃度动态划分。
-
Q3:可以同时输出开始时间和结束时间吗?
可以!修改 ProcessWindowFunction:
out.collect(new Tuple3<>(context.window().getStart(), context.window().getEnd(), min));
总结
-
窗口开始时间 标记了数据所属时间段的起点(如“8:00档”)。
-
在 Flink 中,通过
ProcessWindowFunction的context可以轻松获取这一信息。 -
这种设计让数据处理结果更易理解(如统计报告、监控仪表盘)。
相关文章:
20250124 Flink中 窗口开始时间和結束時間
增量聚合的 ProcessWindowFunction # ProcessWindowFunction 可以与 ReduceFunction 或 AggregateFunction 搭配使用, 使其能够在数据到达窗口的时候进行增量聚合。当窗口关闭时,ProcessWindowFunction 将会得到聚合的结果。 这样它就可以增量聚合窗口的…...
Android Studio安装配置
一、注意事项 想做安卓app和开发板通信,踩了大坑,Android 开发不是下载了就能直接开发的,对于新手需要注意的如下: 1、Android Studio版本,根据自己的Android Studio版本对应决定了你所兼容的AGP(Android…...
设计模式Python版 单例模式
文章目录 前言一、单例模式二、单例模式实现方式三、单例模式示例四、单例模式在Django框架的应用 前言 GOF设计模式分三大类: 创建型模式:关注对象的创建过程,包括单例模式、简单工厂模式、工厂方法模式、抽象工厂模式、原型模式和建造者模…...
7-Zip高危漏洞CVE-2025-0411:解析与修复
7-Zip高危漏洞CVE-2025-0411:解析与修复 免责声明 本系列工具仅供安全专业人员进行已授权环境使用,此工具所提供的功能只为网络安全人员对自己所负责的网站、服务器等(包括但不限于)进行检测或维护参考,未经授权请勿利…...
python实现http文件服务器访问下载
//1.py import http.server import socketserver import os import threading import sys# 获取当前脚本所在的目录 DIRECTORY os.path.dirname(os.path.abspath(__file__))# 设置服务器的端口 PORT 8000# 自定义Handler,将根目录设置为脚本所在目录 class MyHTT…...
《一文讲透》第4期:KWDB 数据库运维(6)—— 容灾与备份
一、KWDB 容灾 WAL 概述 KWDB 采用预写式日志(Write-Ahead Logging,WAL),记录每个时序表的模式变更和数据变更,以实现时序数据库的数据灾难恢复、时序数据的一致性和原子性。 KWDB 默认会将保存在 WAL 日志缓存中的…...
ArcGIS10.2 许可License点击始终启动无响应的解决办法及正常启动的前提
1、问题描述 在ArcGIS License Administrator中,手动点击“启动”无响应;且在计算机管理-服务中,无ArcGIS License 或者License的启动、停止、禁止等均为灰色,无法操作。 2、解决方法 ①通过cmd对service.txt进行手动服务的启动…...
Level2逐笔成交逐笔委托毫秒记录:今日分享优质股票数据20250124
逐笔成交逐笔委托下载 链接: https://pan.baidu.com/s/1UWVY11Q1IOfME9itDN5aZA?pwdhgeg 提取码: hgeg Level2逐笔成交逐笔委托数据分享下载 通过Level2逐笔成交与逐笔委托的详细数据,这种以毫秒为单位的信息能揭示许多关键点,如庄家意图、误导性行为…...
概率密度函数(PDF)分布函数(CDF)——直方图累积直方图——直方图规定化的数学基础
对于连续型随机变量,分布函数(Cumulative Distribution Function, CDF)是概率密度函数(Probability Density Function, PDF)的变上限积分,概率密度函数是分布函数的导函数。 如果我们有一个连续型随机变量…...
YOLOv5训练自己的数据及rknn部署
YOLOv5训练自己的数据及rknn部署 一、下载源码二、准备自己的数据集2.1 标注图像2.2 数据集结构 三、配置YOLOv5训练3.1 修改配置文件3.2 模型选择 四、训练五、测试六、部署6.1 pt转onnx6.2 onnx转rknn 七、常见错误7.1 训练过程中的错误7.1.1 cuda: out of memory7.1.2 train…...
计算机图形学:实验四 带纹理的OBJ文件读取和显示
一、程序功能设计 在程序中读取带纹理的obj文件,载入相应的纹理图片文件,将带纹理的模型显示在程序窗口中。实现带纹理的OBJ文件读取与显示功能,具体设计如下: OBJ文件解析与数据存储 通过实现TriMesh类中的readObj函数&#x…...
SQL Server 使用SELECT INTO实现表备份
在数据库管理过程中,有时我们需要对表进行备份,以防数据丢失或修改错误。在 SQL Server 中,可以使用 SELECT INTO 语句将数据从一个表备份到另一个表。 备份表的 SQL 语法: SELECT * INTO 【备份表名】 FROM 【要备份的表】 SEL…...
【线性代数】基础版本的高斯消元法
[精确算法] 高斯消元法求线性方程组 线性方程组 考虑线性方程组, 已知 A ∈ R n , n , b ∈ R n A\in \mathbb{R}^{n,n},b\in \mathbb{R}^n A∈Rn,n,b∈Rn, 求未知 x ∈ R n x\in \mathbb{R}^n x∈Rn A 1 , 1 x 1 A 1 , 2 x 2 ⋯ A 1 , n x n b 1…...
Python标准库 threading 的 start 和 join 的使用
python 的多线程机制可以的适用场景不适合与计算密集型的,因为 GIL 的存在,多线程在处理计算密集型时,实际上也是串行的,因为每个时刻只有一个线程可以获得 GIL,但是对于 IO 处理来说,不管是网络IO还是文件…...
无公网IP 外网访问媒体服务器 Emby
Emby 是一款多媒体服务器软件,用户可以在 Emby 创建自己的个人多媒体娱乐中心,并且可以跨多个设备访问自己的媒体库。它允许用户管理传输自己的媒体内容,比如电影、电视节目、音乐和照片等。 本文将详细的介绍如何利用 Docker 在本地部署 Emb…...
【数据结构】_顺序表
目录 1. 概念与结构 1.1 静态顺序表 1.2 动态顺序表 2. 动态顺序表实现 2.1 SeqList.h 2.2 SeqList.c 2.3 Test_SeqList.c 3. 顺序表性能分析 线性表是n个具有相同特性的数据元素的有限序列。 常见的线性表有:顺序表、链表、栈、队列、字符串等;…...
[MySQL]数据库表内容的增删查改操作大全
目录 一、增加表数据 1.全列插入与指定列插入 2.多行数据插入 3.更新与替换插入 二、查看表数据 1.全列查询与指定列查询 2.查询表达式字段 3.为查询结果起别名 4.结果去重 5.WHERE条件 6.结果排序 7.筛选分页结果 8.插入查询的结果 9.group by子句 三、修改表数…...
解决双系统引导问题:Ubuntu 启动时不显示 Windows 选项的处理方法
方法 1:检查 GRUB 引导菜单是否隐藏 启动进入 Ubuntu 系统。打开终端,输入以下命令编辑 GRUB 配置文件:sudo nano /etc/default/grub检查以下配置项: GRUB_TIMEOUT0:如果是 0,将其改为一个较大的值&#x…...
Java面试题2025-Spring
讲师:邓澎波 Spring面试专题 1.Spring应该很熟悉吧?来介绍下你的Spring的理解 1.1 Spring的发展历程 先介绍Spring是怎么来的,发展中有哪些核心的节点,当前的最新版本是什么等 通过上图可以比较清晰的看到Spring的各个时间版本对…...
CentOS7安装使用containerd
一,安装 1.1、安装containerd 下载 https://github.com/containerd/containerd/releases/download/v1.7.24/cri-containerd-cni-1.7.24-linux-amd64.tar.gz wget https://github.com/containerd/containerd/releases/download/v1.7.24/cri-containerd-cni-1.7.24-…...
地震勘探——干扰波识别、井中地震时距曲线特点
目录 干扰波识别反射波地震勘探的干扰波 井中地震时距曲线特点 干扰波识别 有效波:可以用来解决所提出的地质任务的波;干扰波:所有妨碍辨认、追踪有效波的其他波。 地震勘探中,有效波和干扰波是相对的。例如,在反射波…...
工业安全零事故的智能守护者:一体化AI智能安防平台
前言: 通过AI视觉技术,为船厂提供全面的安全监控解决方案,涵盖交通违规检测、起重机轨道安全、非法入侵检测、盗窃防范、安全规范执行监控等多个方面,能够实现对应负责人反馈机制,并最终实现数据的统计报表。提升船厂…...
day52 ResNet18 CBAM
在深度学习的旅程中,我们不断探索如何提升模型的性能。今天,我将分享我在 ResNet18 模型中插入 CBAM(Convolutional Block Attention Module)模块,并采用分阶段微调策略的实践过程。通过这个过程,我不仅提升…...
Leetcode 3577. Count the Number of Computer Unlocking Permutations
Leetcode 3577. Count the Number of Computer Unlocking Permutations 1. 解题思路2. 代码实现 题目链接:3577. Count the Number of Computer Unlocking Permutations 1. 解题思路 这一题其实就是一个脑筋急转弯,要想要能够将所有的电脑解锁&#x…...
剑指offer20_链表中环的入口节点
链表中环的入口节点 给定一个链表,若其中包含环,则输出环的入口节点。 若其中不包含环,则输出null。 数据范围 节点 val 值取值范围 [ 1 , 1000 ] [1,1000] [1,1000]。 节点 val 值各不相同。 链表长度 [ 0 , 500 ] [0,500] [0,500]。 …...
令牌桶 滑动窗口->限流 分布式信号量->限并发的原理 lua脚本分析介绍
文章目录 前言限流限制并发的实际理解限流令牌桶代码实现结果分析令牌桶lua的模拟实现原理总结: 滑动窗口代码实现结果分析lua脚本原理解析 限并发分布式信号量代码实现结果分析lua脚本实现原理 双注解去实现限流 并发结果分析: 实际业务去理解体会统一注…...
NFT模式:数字资产确权与链游经济系统构建
NFT模式:数字资产确权与链游经济系统构建 ——从技术架构到可持续生态的范式革命 一、确权技术革新:构建可信数字资产基石 1. 区块链底层架构的进化 跨链互操作协议:基于LayerZero协议实现以太坊、Solana等公链资产互通,通过零知…...
【JavaWeb】Docker项目部署
引言 之前学习了Linux操作系统的常见命令,在Linux上安装软件,以及如何在Linux上部署一个单体项目,大多数同学都会有相同的感受,那就是麻烦。 核心体现在三点: 命令太多了,记不住 软件安装包名字复杂&…...
python执行测试用例,allure报乱码且未成功生成报告
allure执行测试用例时显示乱码:‘allure’ �����ڲ����ⲿ���Ҳ���ǿ�&am…...
【生成模型】视频生成论文调研
工作清单 上游应用方向:控制、速度、时长、高动态、多主体驱动 类型工作基础模型WAN / WAN-VACE / HunyuanVideo控制条件轨迹控制ATI~镜头控制ReCamMaster~多主体驱动Phantom~音频驱动Let Them Talk: Audio-Driven Multi-Person Conversational Video Generation速…...
