20250124 Flink中 窗口开始时间和結束時間
增量聚合的 ProcessWindowFunction #
ProcessWindowFunction
可以与 ReduceFunction
或 AggregateFunction
搭配使用, 使其能够在数据到达窗口的时候进行增量聚合。当窗口关闭时,ProcessWindowFunction
将会得到聚合的结果。 这样它就可以增量聚合窗口的元素并且从 ProcessWindowFunction` 中获得窗口的元数据。
你也可以对过时的 WindowFunction
使用增量聚合。
使用 ReduceFunction 增量聚合 #
- 下例展示了如何将
ReduceFunction
与ProcessWindowFunction
组合,返回窗口中的最小元素和窗口的开始时间。
DataStream<SensorReading> input = ...;input.keyBy(<key selector>).window(<window assigner>).reduce(new MyReduceFunction(), new MyProcessWindowFunction());// Function definitionsprivate static class MyReduceFunction implements ReduceFunction<SensorReading> {public SensorReading reduce(SensorReading r1, SensorReading r2) {return r1.value() > r2.value() ? r2 : r1;}
}private static class MyProcessWindowFunctionextends ProcessWindowFunction<SensorReading, Tuple2<Long, SensorReading>, String, TimeWindow> {public void process(String key,Context context,Iterable<SensorReading> minReadings,Collector<Tuple2<Long, SensorReading>> out) {SensorReading min = minReadings.iterator().next();out.collect(new Tuple2<Long, SensorReading>(context.window().getStart(), min));}
}
通俗解释:窗口开始时间的作用
我们可以用一个更贴近生活的例子来理解 窗口开始时间 的意义。
场景比喻:每天上午的「温度统计报告
假设你有一个气象站,每5分钟记录一次户外温度。现在需要 每小时(例如8:00-9:00)统计一次该时段内的最低温度,并在报告中标注这个小时段的起始时间(如“8:00-9:00的最低温度是15°C”)。
关键点
-
窗口开始时间:就是时间段的起点(如8:00)。
-
窗口结束时间:就是时间段的终点(如9:00)。
-
为什么要记录开始时间?
方便人类理解数据属于哪个时段(比如“8点档”的数据)。
代码示例解析
1. 窗口如何划分?
假设使用 滚动窗口(Tumbling Window),每1小时划分一次:
Copy
8:00-9:00 → 窗口1 9:00-10:00 → 窗口2
所有时间戳在8:00≤t<9:00的数据会被分配到窗口1。
2. 窗口触发计算的时机
当系统时间(或事件时间)到达9:00时,窗口1关闭,触发计算:
-
调用
MyReduceFunction
找出该窗口内的最低温度。 -
调用
MyProcessWindowFunction
将结果与窗口开始时间(8:00)绑定。
3. 为什么输出的是开始时间(8:00)而不是结束时间(9:00)?
-
业务需求:通常更关心数据所属时段的起点(例如“8点档的数据”)。
-
避免歧义:如果输出9:00,可能被误解为“9点档的数据”(实际是8:00-9:00的数据)。
代码中具体如何获取开始时间?
在 MyProcessWindowFunction
中:
context.window().getStart(); // 返回窗口的起始时间戳(如8:00对应的毫秒值)
-
context
对象:包含窗口的元信息(起止时间、触发时间等)。 -
实际输出时:将时间戳转换为人类可读格式(如
8:00
)。
常见疑问解答
Q1:如果数据延迟到达(比如8:59的数据在9:05才到),会进入哪个窗口?
-
取决于时间语义:
-
若使用 事件时间(Event Time):按数据自带的时间戳分配到8:00-9:00窗口。
-
若使用 处理时间(Processing Time):按到达系统的时间分配到9:00-10:00窗口。
-
(示例代码未显式设置时间语义,默认可能是处理时间)
-
Q2:窗口开始时间是如何计算的?
-
由窗口分配器(Window Assigner)决定:
-
滚动窗口按固定间隔对齐(如整点)。
-
滑动窗口按步长对齐(如每30分钟滑动一次的1小时窗口)。
-
会话窗口根据数据活跃度动态划分。
-
Q3:可以同时输出开始时间和结束时间吗?
可以!修改 ProcessWindowFunction
:
out.collect(new Tuple3<>(context.window().getStart(), context.window().getEnd(), min));
总结
-
窗口开始时间 标记了数据所属时间段的起点(如“8:00档”)。
-
在 Flink 中,通过
ProcessWindowFunction
的context
可以轻松获取这一信息。 -
这种设计让数据处理结果更易理解(如统计报告、监控仪表盘)。
相关文章:

20250124 Flink中 窗口开始时间和結束時間
增量聚合的 ProcessWindowFunction # ProcessWindowFunction 可以与 ReduceFunction 或 AggregateFunction 搭配使用, 使其能够在数据到达窗口的时候进行增量聚合。当窗口关闭时,ProcessWindowFunction 将会得到聚合的结果。 这样它就可以增量聚合窗口的…...

Android Studio安装配置
一、注意事项 想做安卓app和开发板通信,踩了大坑,Android 开发不是下载了就能直接开发的,对于新手需要注意的如下: 1、Android Studio版本,根据自己的Android Studio版本对应决定了你所兼容的AGP(Android…...

设计模式Python版 单例模式
文章目录 前言一、单例模式二、单例模式实现方式三、单例模式示例四、单例模式在Django框架的应用 前言 GOF设计模式分三大类: 创建型模式:关注对象的创建过程,包括单例模式、简单工厂模式、工厂方法模式、抽象工厂模式、原型模式和建造者模…...

7-Zip高危漏洞CVE-2025-0411:解析与修复
7-Zip高危漏洞CVE-2025-0411:解析与修复 免责声明 本系列工具仅供安全专业人员进行已授权环境使用,此工具所提供的功能只为网络安全人员对自己所负责的网站、服务器等(包括但不限于)进行检测或维护参考,未经授权请勿利…...

python实现http文件服务器访问下载
//1.py import http.server import socketserver import os import threading import sys# 获取当前脚本所在的目录 DIRECTORY os.path.dirname(os.path.abspath(__file__))# 设置服务器的端口 PORT 8000# 自定义Handler,将根目录设置为脚本所在目录 class MyHTT…...

《一文讲透》第4期:KWDB 数据库运维(6)—— 容灾与备份
一、KWDB 容灾 WAL 概述 KWDB 采用预写式日志(Write-Ahead Logging,WAL),记录每个时序表的模式变更和数据变更,以实现时序数据库的数据灾难恢复、时序数据的一致性和原子性。 KWDB 默认会将保存在 WAL 日志缓存中的…...

ArcGIS10.2 许可License点击始终启动无响应的解决办法及正常启动的前提
1、问题描述 在ArcGIS License Administrator中,手动点击“启动”无响应;且在计算机管理-服务中,无ArcGIS License 或者License的启动、停止、禁止等均为灰色,无法操作。 2、解决方法 ①通过cmd对service.txt进行手动服务的启动…...

Level2逐笔成交逐笔委托毫秒记录:今日分享优质股票数据20250124
逐笔成交逐笔委托下载 链接: https://pan.baidu.com/s/1UWVY11Q1IOfME9itDN5aZA?pwdhgeg 提取码: hgeg Level2逐笔成交逐笔委托数据分享下载 通过Level2逐笔成交与逐笔委托的详细数据,这种以毫秒为单位的信息能揭示许多关键点,如庄家意图、误导性行为…...

概率密度函数(PDF)分布函数(CDF)——直方图累积直方图——直方图规定化的数学基础
对于连续型随机变量,分布函数(Cumulative Distribution Function, CDF)是概率密度函数(Probability Density Function, PDF)的变上限积分,概率密度函数是分布函数的导函数。 如果我们有一个连续型随机变量…...

YOLOv5训练自己的数据及rknn部署
YOLOv5训练自己的数据及rknn部署 一、下载源码二、准备自己的数据集2.1 标注图像2.2 数据集结构 三、配置YOLOv5训练3.1 修改配置文件3.2 模型选择 四、训练五、测试六、部署6.1 pt转onnx6.2 onnx转rknn 七、常见错误7.1 训练过程中的错误7.1.1 cuda: out of memory7.1.2 train…...

计算机图形学:实验四 带纹理的OBJ文件读取和显示
一、程序功能设计 在程序中读取带纹理的obj文件,载入相应的纹理图片文件,将带纹理的模型显示在程序窗口中。实现带纹理的OBJ文件读取与显示功能,具体设计如下: OBJ文件解析与数据存储 通过实现TriMesh类中的readObj函数&#x…...

SQL Server 使用SELECT INTO实现表备份
在数据库管理过程中,有时我们需要对表进行备份,以防数据丢失或修改错误。在 SQL Server 中,可以使用 SELECT INTO 语句将数据从一个表备份到另一个表。 备份表的 SQL 语法: SELECT * INTO 【备份表名】 FROM 【要备份的表】 SEL…...

【线性代数】基础版本的高斯消元法
[精确算法] 高斯消元法求线性方程组 线性方程组 考虑线性方程组, 已知 A ∈ R n , n , b ∈ R n A\in \mathbb{R}^{n,n},b\in \mathbb{R}^n A∈Rn,n,b∈Rn, 求未知 x ∈ R n x\in \mathbb{R}^n x∈Rn A 1 , 1 x 1 A 1 , 2 x 2 ⋯ A 1 , n x n b 1…...

Python标准库 threading 的 start 和 join 的使用
python 的多线程机制可以的适用场景不适合与计算密集型的,因为 GIL 的存在,多线程在处理计算密集型时,实际上也是串行的,因为每个时刻只有一个线程可以获得 GIL,但是对于 IO 处理来说,不管是网络IO还是文件…...

无公网IP 外网访问媒体服务器 Emby
Emby 是一款多媒体服务器软件,用户可以在 Emby 创建自己的个人多媒体娱乐中心,并且可以跨多个设备访问自己的媒体库。它允许用户管理传输自己的媒体内容,比如电影、电视节目、音乐和照片等。 本文将详细的介绍如何利用 Docker 在本地部署 Emb…...

【数据结构】_顺序表
目录 1. 概念与结构 1.1 静态顺序表 1.2 动态顺序表 2. 动态顺序表实现 2.1 SeqList.h 2.2 SeqList.c 2.3 Test_SeqList.c 3. 顺序表性能分析 线性表是n个具有相同特性的数据元素的有限序列。 常见的线性表有:顺序表、链表、栈、队列、字符串等;…...

[MySQL]数据库表内容的增删查改操作大全
目录 一、增加表数据 1.全列插入与指定列插入 2.多行数据插入 3.更新与替换插入 二、查看表数据 1.全列查询与指定列查询 2.查询表达式字段 3.为查询结果起别名 4.结果去重 5.WHERE条件 6.结果排序 7.筛选分页结果 8.插入查询的结果 9.group by子句 三、修改表数…...

解决双系统引导问题:Ubuntu 启动时不显示 Windows 选项的处理方法
方法 1:检查 GRUB 引导菜单是否隐藏 启动进入 Ubuntu 系统。打开终端,输入以下命令编辑 GRUB 配置文件:sudo nano /etc/default/grub检查以下配置项: GRUB_TIMEOUT0:如果是 0,将其改为一个较大的值&#x…...

Java面试题2025-Spring
讲师:邓澎波 Spring面试专题 1.Spring应该很熟悉吧?来介绍下你的Spring的理解 1.1 Spring的发展历程 先介绍Spring是怎么来的,发展中有哪些核心的节点,当前的最新版本是什么等 通过上图可以比较清晰的看到Spring的各个时间版本对…...

CentOS7安装使用containerd
一,安装 1.1、安装containerd 下载 https://github.com/containerd/containerd/releases/download/v1.7.24/cri-containerd-cni-1.7.24-linux-amd64.tar.gz wget https://github.com/containerd/containerd/releases/download/v1.7.24/cri-containerd-cni-1.7.24-…...

Redis 集群模式入门
Redis 集群模式入门 一、简介 Redis 有三种集群模式:主从模式、Sentinel 哨兵模式、cluster 分片模式 主从复制(Master-Slave Replication): 在这种模式下,数据可以从一个 Redis 实例(主节点 Master)复…...

WinDBG查找C++句柄泄露
C代码(频繁点击About按钮导致Mutex句柄泄露) HANDLE _mutexHandle;LRESULT CALLBACK WndProc(HWND hWnd, UINT message, WPARAM wParam, LPARAM lParam) {switch (message){case WM_COMMAND:{int wmId LOWORD(wParam);// 分析菜单选择:switch (wmId){c…...

Linux查看服务器的内外网地址
目录: 1、内网地址2、外网地址3、ping时显示地址与真实不一致 1、内网地址 ifconfig2、外网地址 curl ifconfig.me3、ping时显示地址与真实不一致 原因是dns缓存导致的,ping这种方法也是不准确的,有弊端不建议使用,只适用于测试…...

深入MapReduce——引入
引入 前面我们已经深入了HDFS的设计与实现,对于分布式系统也有了不错的理解。 但HDFS仅仅解决了海量数据存储和读写的问题。要想让数据产生价值,一定是需要从数据中挖掘出价值才行,这就需要我们拥有海量数据的计算处理能力。 下面我们还是…...

Oracle之开窗函数使用
Oracle中的开窗函数(Window Functions)是一种强大的工具,用于在SQL查询中对数据进行复杂的分析和聚合操作,而无需改变原始查询结果的行数或顺序。以下是关于Oracle开窗函数的使用方法和常见示例: 1. 开窗函数的基本语法…...

航空客户价值的数据挖掘与分析(numpy+pandas+matplotlib+scikit-learn)
文章目录 航空客户价值的数据挖掘与分析(numpy+pandas+matplotlib+scikit-learn)写在前面背景与挖掘目标1.1 需求背景1.2 挖掘目标1.3 项目概述项目分析方法规划2.1 RFM模型2.2 LRFMC模型指标2.3 分析总体流程图数据抽取探索及预处理3.1 数据抽取3.2 数据探索分析3.3 数据预处…...

云原生时代,如何构建高效分布式监控系统
文章目录 一.监控现状二.Thanos原理分析SidecarQuerierStoreCompactor 三.Sidecar or ReceiverThanos Receiver工作原理 四.分布式运维架构 一.监控现状 Prometheus是CNCF基金会管理的一个开源监控项目,由于其良好的架构设计和完善的生态,迅速成为了监控…...

什么是CIDR技术? 它是如何解决路由缩放问题的
什么是CIDR技术? 它是如何解决路由缩放问题的 一. 什么是 CIDR?二. CIDR 是如何工作的?1. 高效地址分配2. 路由聚合(Route Aggregation)3. 精确满足需求 三. CIDR 的计算详解1. 子网掩码计算2. 地址范围计算3. 可用 IP…...

Unity URP 获取/设置 Light-Indirect Multiplier
Unity URP 获取/设置 Light-Indirect Multiplier 他喵的代码的字段名称叫:bounceIntensity ~~~~~~...

用Python和Tkinter标准模块建立密码管理器
用Python和Tkinter标准模块建立密码管理器 创建一个简单的密码管理器应用程序,帮助用户存储和管理他们的密码。使用Python的tkinter模块来创建一个图形用户界面(GUI)。 本程序支持 添加、查看、搜索、复制、修改、删除 功能。 本程序使用 …...