当前位置: 首页 > news >正文

20250124 Flink中 窗口开始时间和結束時間

增量聚合的 ProcessWindowFunction #

ProcessWindowFunction 可以与 ReduceFunction 或 AggregateFunction 搭配使用, 使其能够在数据到达窗口的时候进行增量聚合。当窗口关闭时,ProcessWindowFunction 将会得到聚合的结果。 这样它就可以增量聚合窗口的元素并且从 ProcessWindowFunction` 中获得窗口的元数据。

你也可以对过时的 WindowFunction 使用增量聚合。

使用 ReduceFunction 增量聚合 #
  • 下例展示了如何将 ReduceFunction 与 ProcessWindowFunction 组合,返回窗口中的最小元素和窗口的开始时间。

 

DataStream<SensorReading> input = ...;input.keyBy(<key selector>).window(<window assigner>).reduce(new MyReduceFunction(), new MyProcessWindowFunction());// Function definitionsprivate static class MyReduceFunction implements ReduceFunction<SensorReading> {public SensorReading reduce(SensorReading r1, SensorReading r2) {return r1.value() > r2.value() ? r2 : r1;}
}private static class MyProcessWindowFunctionextends ProcessWindowFunction<SensorReading, Tuple2<Long, SensorReading>, String, TimeWindow> {public void process(String key,Context context,Iterable<SensorReading> minReadings,Collector<Tuple2<Long, SensorReading>> out) {SensorReading min = minReadings.iterator().next();out.collect(new Tuple2<Long, SensorReading>(context.window().getStart(), min));}
}

通俗解释:窗口开始时间的作用

我们可以用一个更贴近生活的例子来理解 窗口开始时间 的意义。


场景比喻:每天上午的「温度统计报告

假设你有一个气象站,每5分钟记录一次户外温度。现在需要 每小时(例如8:00-9:00)统计一次该时段内的最低温度,并在报告中标注这个小时段的起始时间(如“8:00-9:00的最低温度是15°C”)。

关键点
  • 窗口开始时间:就是时间段的起点(如8:00)。

  • 窗口结束时间:就是时间段的终点(如9:00)。

  • 为什么要记录开始时间?
    方便人类理解数据属于哪个时段(比如“8点档”的数据)。


代码示例解析

1. 窗口如何划分?

假设使用 滚动窗口(Tumbling Window),每1小时划分一次:

Copy

8:00-9:00  → 窗口1  
9:00-10:00 → 窗口2  

所有时间戳在8:00≤t<9:00的数据会被分配到窗口1。

2. 窗口触发计算的时机

当系统时间(或事件时间)到达9:00时,窗口1关闭,触发计算:

  • 调用 MyReduceFunction 找出该窗口内的最低温度。

  • 调用 MyProcessWindowFunction 将结果与窗口开始时间(8:00)绑定。

3. 为什么输出的是开始时间(8:00)而不是结束时间(9:00)?
  • 业务需求:通常更关心数据所属时段的起点(例如“8点档的数据”)。

  • 避免歧义:如果输出9:00,可能被误解为“9点档的数据”(实际是8:00-9:00的数据)。


代码中具体如何获取开始时间?

在 MyProcessWindowFunction 中:

context.window().getStart(); // 返回窗口的起始时间戳(如8:00对应的毫秒值)
  • context 对象:包含窗口的元信息(起止时间、触发时间等)。

  • 实际输出时:将时间戳转换为人类可读格式(如 8:00)。


常见疑问解答

Q1:如果数据延迟到达(比如8:59的数据在9:05才到),会进入哪个窗口?
  • 取决于时间语义

    • 若使用 事件时间(Event Time):按数据自带的时间戳分配到8:00-9:00窗口。

    • 若使用 处理时间(Processing Time):按到达系统的时间分配到9:00-10:00窗口。

    • (示例代码未显式设置时间语义,默认可能是处理时间)

Q2:窗口开始时间是如何计算的?
  • 由窗口分配器(Window Assigner)决定

    • 滚动窗口按固定间隔对齐(如整点)。

    • 滑动窗口按步长对齐(如每30分钟滑动一次的1小时窗口)。

    • 会话窗口根据数据活跃度动态划分。

Q3:可以同时输出开始时间和结束时间吗?

可以!修改 ProcessWindowFunction

out.collect(new Tuple3<>(context.window().getStart(), context.window().getEnd(), min));
 

总结

  • 窗口开始时间 标记了数据所属时间段的起点(如“8:00档”)。

  • 在 Flink 中,通过 ProcessWindowFunction 的 context 可以轻松获取这一信息。

  • 这种设计让数据处理结果更易理解(如统计报告、监控仪表盘)。

相关文章:

20250124 Flink中 窗口开始时间和結束時間

增量聚合的 ProcessWindowFunction # ProcessWindowFunction 可以与 ReduceFunction 或 AggregateFunction 搭配使用&#xff0c; 使其能够在数据到达窗口的时候进行增量聚合。当窗口关闭时&#xff0c;ProcessWindowFunction 将会得到聚合的结果。 这样它就可以增量聚合窗口的…...

Android Studio安装配置

一、注意事项 想做安卓app和开发板通信&#xff0c;踩了大坑&#xff0c;Android 开发不是下载了就能直接开发的&#xff0c;对于新手需要注意的如下&#xff1a; 1、Android Studio版本&#xff0c;根据自己的Android Studio版本对应决定了你所兼容的AGP&#xff08;Android…...

设计模式Python版 单例模式

文章目录 前言一、单例模式二、单例模式实现方式三、单例模式示例四、单例模式在Django框架的应用 前言 GOF设计模式分三大类&#xff1a; 创建型模式&#xff1a;关注对象的创建过程&#xff0c;包括单例模式、简单工厂模式、工厂方法模式、抽象工厂模式、原型模式和建造者模…...

7-Zip高危漏洞CVE-2025-0411:解析与修复

7-Zip高危漏洞CVE-2025-0411&#xff1a;解析与修复 免责声明 本系列工具仅供安全专业人员进行已授权环境使用&#xff0c;此工具所提供的功能只为网络安全人员对自己所负责的网站、服务器等&#xff08;包括但不限于&#xff09;进行检测或维护参考&#xff0c;未经授权请勿利…...

python实现http文件服务器访问下载

//1.py import http.server import socketserver import os import threading import sys# 获取当前脚本所在的目录 DIRECTORY os.path.dirname(os.path.abspath(__file__))# 设置服务器的端口 PORT 8000# 自定义Handler&#xff0c;将根目录设置为脚本所在目录 class MyHTT…...

《一文讲透》第4期:KWDB 数据库运维(6)—— 容灾与备份

一、KWDB 容灾 WAL 概述 KWDB 采用预写式日志&#xff08;Write-Ahead Logging&#xff0c;WAL&#xff09;&#xff0c;记录每个时序表的模式变更和数据变更&#xff0c;以实现时序数据库的数据灾难恢复、时序数据的一致性和原子性。 KWDB 默认会将保存在 WAL 日志缓存中的…...

ArcGIS10.2 许可License点击始终启动无响应的解决办法及正常启动的前提

1、问题描述 在ArcGIS License Administrator中&#xff0c;手动点击“启动”无响应&#xff1b;且在计算机管理-服务中&#xff0c;无ArcGIS License 或者License的启动、停止、禁止等均为灰色&#xff0c;无法操作。 2、解决方法 ①通过cmd对service.txt进行手动服务的启动…...

Level2逐笔成交逐笔委托毫秒记录:今日分享优质股票数据20250124

逐笔成交逐笔委托下载 链接: https://pan.baidu.com/s/1UWVY11Q1IOfME9itDN5aZA?pwdhgeg 提取码: hgeg Level2逐笔成交逐笔委托数据分享下载 通过Level2逐笔成交与逐笔委托的详细数据&#xff0c;这种以毫秒为单位的信息能揭示许多关键点&#xff0c;如庄家意图、误导性行为…...

概率密度函数(PDF)分布函数(CDF)——直方图累积直方图——直方图规定化的数学基础

对于连续型随机变量&#xff0c;分布函数&#xff08;Cumulative Distribution Function, CDF&#xff09;是概率密度函数&#xff08;Probability Density Function, PDF&#xff09;的变上限积分&#xff0c;概率密度函数是分布函数的导函数。 如果我们有一个连续型随机变量…...

YOLOv5训练自己的数据及rknn部署

YOLOv5训练自己的数据及rknn部署 一、下载源码二、准备自己的数据集2.1 标注图像2.2 数据集结构 三、配置YOLOv5训练3.1 修改配置文件3.2 模型选择 四、训练五、测试六、部署6.1 pt转onnx6.2 onnx转rknn 七、常见错误7.1 训练过程中的错误7.1.1 cuda: out of memory7.1.2 train…...

计算机图形学:实验四 带纹理的OBJ文件读取和显示

一、程序功能设计 在程序中读取带纹理的obj文件&#xff0c;载入相应的纹理图片文件&#xff0c;将带纹理的模型显示在程序窗口中。实现带纹理的OBJ文件读取与显示功能&#xff0c;具体设计如下&#xff1a; OBJ文件解析与数据存储 通过实现TriMesh类中的readObj函数&#x…...

SQL Server 使用SELECT INTO实现表备份

在数据库管理过程中&#xff0c;有时我们需要对表进行备份&#xff0c;以防数据丢失或修改错误。在 SQL Server 中&#xff0c;可以使用 SELECT INTO 语句将数据从一个表备份到另一个表。 备份表的 SQL 语法&#xff1a; SELECT * INTO 【备份表名】 FROM 【要备份的表】 SEL…...

【线性代数】基础版本的高斯消元法

[精确算法] 高斯消元法求线性方程组 线性方程组 考虑线性方程组&#xff0c; 已知 A ∈ R n , n , b ∈ R n A\in \mathbb{R}^{n,n},b\in \mathbb{R}^n A∈Rn,n,b∈Rn&#xff0c; 求未知 x ∈ R n x\in \mathbb{R}^n x∈Rn A 1 , 1 x 1 A 1 , 2 x 2 ⋯ A 1 , n x n b 1…...

Python标准库 threading 的 start 和 join 的使用

python 的多线程机制可以的适用场景不适合与计算密集型的&#xff0c;因为 GIL 的存在&#xff0c;多线程在处理计算密集型时&#xff0c;实际上也是串行的&#xff0c;因为每个时刻只有一个线程可以获得 GIL&#xff0c;但是对于 IO 处理来说&#xff0c;不管是网络IO还是文件…...

无公网IP 外网访问媒体服务器 Emby

Emby 是一款多媒体服务器软件&#xff0c;用户可以在 Emby 创建自己的个人多媒体娱乐中心&#xff0c;并且可以跨多个设备访问自己的媒体库。它允许用户管理传输自己的媒体内容&#xff0c;比如电影、电视节目、音乐和照片等。 本文将详细的介绍如何利用 Docker 在本地部署 Emb…...

【数据结构】_顺序表

目录 1. 概念与结构 1.1 静态顺序表 1.2 动态顺序表 2. 动态顺序表实现 2.1 SeqList.h 2.2 SeqList.c 2.3 Test_SeqList.c 3. 顺序表性能分析 线性表是n个具有相同特性的数据元素的有限序列。 常见的线性表有&#xff1a;顺序表、链表、栈、队列、字符串等&#xff1b…...

[MySQL]数据库表内容的增删查改操作大全

目录 一、增加表数据 1.全列插入与指定列插入 2.多行数据插入 3.更新与替换插入 二、查看表数据 1.全列查询与指定列查询 2.查询表达式字段 3.为查询结果起别名 4.结果去重 5.WHERE条件 6.结果排序 7.筛选分页结果 8.插入查询的结果 9.group by子句 三、修改表数…...

解决双系统引导问题:Ubuntu 启动时不显示 Windows 选项的处理方法

方法 1&#xff1a;检查 GRUB 引导菜单是否隐藏 启动进入 Ubuntu 系统。打开终端&#xff0c;输入以下命令编辑 GRUB 配置文件&#xff1a;sudo nano /etc/default/grub检查以下配置项&#xff1a; GRUB_TIMEOUT0&#xff1a;如果是 0&#xff0c;将其改为一个较大的值&#x…...

Java面试题2025-Spring

讲师&#xff1a;邓澎波 Spring面试专题 1.Spring应该很熟悉吧&#xff1f;来介绍下你的Spring的理解 1.1 Spring的发展历程 先介绍Spring是怎么来的&#xff0c;发展中有哪些核心的节点&#xff0c;当前的最新版本是什么等 通过上图可以比较清晰的看到Spring的各个时间版本对…...

CentOS7安装使用containerd

一&#xff0c;安装 1.1、安装containerd 下载 https://github.com/containerd/containerd/releases/download/v1.7.24/cri-containerd-cni-1.7.24-linux-amd64.tar.gz wget https://github.com/containerd/containerd/releases/download/v1.7.24/cri-containerd-cni-1.7.24-…...

网络六边形受到攻击

大家读完觉得有帮助记得关注和点赞&#xff01;&#xff01;&#xff01; 抽象 现代智能交通系统 &#xff08;ITS&#xff09; 的一个关键要求是能够以安全、可靠和匿名的方式从互联车辆和移动设备收集地理参考数据。Nexagon 协议建立在 IETF 定位器/ID 分离协议 &#xff08;…...

盘古信息PCB行业解决方案:以全域场景重构,激活智造新未来

一、破局&#xff1a;PCB行业的时代之问 在数字经济蓬勃发展的浪潮中&#xff0c;PCB&#xff08;印制电路板&#xff09;作为 “电子产品之母”&#xff0c;其重要性愈发凸显。随着 5G、人工智能等新兴技术的加速渗透&#xff0c;PCB行业面临着前所未有的挑战与机遇。产品迭代…...

【HarmonyOS 5.0】DevEco Testing:鸿蒙应用质量保障的终极武器

——全方位测试解决方案与代码实战 一、工具定位与核心能力 DevEco Testing是HarmonyOS官方推出的​​一体化测试平台​​&#xff0c;覆盖应用全生命周期测试需求&#xff0c;主要提供五大核心能力&#xff1a; ​​测试类型​​​​检测目标​​​​关键指标​​功能体验基…...

工业自动化时代的精准装配革新:迁移科技3D视觉系统如何重塑机器人定位装配

AI3D视觉的工业赋能者 迁移科技成立于2017年&#xff0c;作为行业领先的3D工业相机及视觉系统供应商&#xff0c;累计完成数亿元融资。其核心技术覆盖硬件设计、算法优化及软件集成&#xff0c;通过稳定、易用、高回报的AI3D视觉系统&#xff0c;为汽车、新能源、金属制造等行…...

4. TypeScript 类型推断与类型组合

一、类型推断 (一) 什么是类型推断 TypeScript 的类型推断会根据变量、函数返回值、对象和数组的赋值和使用方式&#xff0c;自动确定它们的类型。 这一特性减少了显式类型注解的需要&#xff0c;在保持类型安全的同时简化了代码。通过分析上下文和初始值&#xff0c;TypeSc…...

深入浅出Diffusion模型:从原理到实践的全方位教程

I. 引言&#xff1a;生成式AI的黎明 – Diffusion模型是什么&#xff1f; 近年来&#xff0c;生成式人工智能&#xff08;Generative AI&#xff09;领域取得了爆炸性的进展&#xff0c;模型能够根据简单的文本提示创作出逼真的图像、连贯的文本&#xff0c;乃至更多令人惊叹的…...

鸿蒙(HarmonyOS5)实现跳一跳小游戏

下面我将介绍如何使用鸿蒙的ArkUI框架&#xff0c;实现一个简单的跳一跳小游戏。 1. 项目结构 src/main/ets/ ├── MainAbility │ ├── pages │ │ ├── Index.ets // 主页面 │ │ └── GamePage.ets // 游戏页面 │ └── model │ …...

规则与人性的天平——由高考迟到事件引发的思考

当那位身着校服的考生在考场关闭1分钟后狂奔而至&#xff0c;他涨红的脸上写满绝望。铁门内秒针划过的弧度&#xff0c;成为改变人生的残酷抛物线。家长声嘶力竭的哀求与考务人员机械的"这是规定"&#xff0c;构成当代中国教育最尖锐的隐喻。 一、刚性规则的必要性 …...

rm视觉学习1-自瞄部分

首先先感谢中南大学的开源&#xff0c;提供了很全面的思路&#xff0c;减少了很多基础性的开发研究 我看的阅读的是中南大学FYT战队开源视觉代码 链接&#xff1a;https://github.com/CSU-FYT-Vision/FYT2024_vision.git 1.框架&#xff1a; 代码框架结构&#xff1a;readme有…...

React父子组件通信:Props怎么用?如何从父组件向子组件传递数据?

系列回顾&#xff1a; 在上一篇《React核心概念&#xff1a;State是什么&#xff1f;》中&#xff0c;我们学习了如何使用useState让一个组件拥有自己的内部数据&#xff08;State&#xff09;&#xff0c;并通过一个计数器案例&#xff0c;实现了组件的自我更新。这很棒&#…...