20250124 Flink中 窗口开始时间和結束時間
增量聚合的 ProcessWindowFunction #
ProcessWindowFunction 可以与 ReduceFunction 或 AggregateFunction 搭配使用, 使其能够在数据到达窗口的时候进行增量聚合。当窗口关闭时,ProcessWindowFunction 将会得到聚合的结果。 这样它就可以增量聚合窗口的元素并且从 ProcessWindowFunction` 中获得窗口的元数据。
你也可以对过时的 WindowFunction 使用增量聚合。
使用 ReduceFunction 增量聚合 #
- 下例展示了如何将
ReduceFunction与ProcessWindowFunction组合,返回窗口中的最小元素和窗口的开始时间。
DataStream<SensorReading> input = ...;input.keyBy(<key selector>).window(<window assigner>).reduce(new MyReduceFunction(), new MyProcessWindowFunction());// Function definitionsprivate static class MyReduceFunction implements ReduceFunction<SensorReading> {public SensorReading reduce(SensorReading r1, SensorReading r2) {return r1.value() > r2.value() ? r2 : r1;}
}private static class MyProcessWindowFunctionextends ProcessWindowFunction<SensorReading, Tuple2<Long, SensorReading>, String, TimeWindow> {public void process(String key,Context context,Iterable<SensorReading> minReadings,Collector<Tuple2<Long, SensorReading>> out) {SensorReading min = minReadings.iterator().next();out.collect(new Tuple2<Long, SensorReading>(context.window().getStart(), min));}
}
通俗解释:窗口开始时间的作用
我们可以用一个更贴近生活的例子来理解 窗口开始时间 的意义。
场景比喻:每天上午的「温度统计报告
假设你有一个气象站,每5分钟记录一次户外温度。现在需要 每小时(例如8:00-9:00)统计一次该时段内的最低温度,并在报告中标注这个小时段的起始时间(如“8:00-9:00的最低温度是15°C”)。
关键点
-
窗口开始时间:就是时间段的起点(如8:00)。
-
窗口结束时间:就是时间段的终点(如9:00)。
-
为什么要记录开始时间?
方便人类理解数据属于哪个时段(比如“8点档”的数据)。
代码示例解析
1. 窗口如何划分?
假设使用 滚动窗口(Tumbling Window),每1小时划分一次:
Copy
8:00-9:00 → 窗口1 9:00-10:00 → 窗口2
所有时间戳在8:00≤t<9:00的数据会被分配到窗口1。
2. 窗口触发计算的时机
当系统时间(或事件时间)到达9:00时,窗口1关闭,触发计算:
-
调用
MyReduceFunction找出该窗口内的最低温度。 -
调用
MyProcessWindowFunction将结果与窗口开始时间(8:00)绑定。
3. 为什么输出的是开始时间(8:00)而不是结束时间(9:00)?
-
业务需求:通常更关心数据所属时段的起点(例如“8点档的数据”)。
-
避免歧义:如果输出9:00,可能被误解为“9点档的数据”(实际是8:00-9:00的数据)。
代码中具体如何获取开始时间?
在 MyProcessWindowFunction 中:
context.window().getStart(); // 返回窗口的起始时间戳(如8:00对应的毫秒值)
-
context对象:包含窗口的元信息(起止时间、触发时间等)。 -
实际输出时:将时间戳转换为人类可读格式(如
8:00)。
常见疑问解答
Q1:如果数据延迟到达(比如8:59的数据在9:05才到),会进入哪个窗口?
-
取决于时间语义:
-
若使用 事件时间(Event Time):按数据自带的时间戳分配到8:00-9:00窗口。
-
若使用 处理时间(Processing Time):按到达系统的时间分配到9:00-10:00窗口。
-
(示例代码未显式设置时间语义,默认可能是处理时间)
-
Q2:窗口开始时间是如何计算的?
-
由窗口分配器(Window Assigner)决定:
-
滚动窗口按固定间隔对齐(如整点)。
-
滑动窗口按步长对齐(如每30分钟滑动一次的1小时窗口)。
-
会话窗口根据数据活跃度动态划分。
-
Q3:可以同时输出开始时间和结束时间吗?
可以!修改 ProcessWindowFunction:
out.collect(new Tuple3<>(context.window().getStart(), context.window().getEnd(), min));
总结
-
窗口开始时间 标记了数据所属时间段的起点(如“8:00档”)。
-
在 Flink 中,通过
ProcessWindowFunction的context可以轻松获取这一信息。 -
这种设计让数据处理结果更易理解(如统计报告、监控仪表盘)。
相关文章:
20250124 Flink中 窗口开始时间和結束時間
增量聚合的 ProcessWindowFunction # ProcessWindowFunction 可以与 ReduceFunction 或 AggregateFunction 搭配使用, 使其能够在数据到达窗口的时候进行增量聚合。当窗口关闭时,ProcessWindowFunction 将会得到聚合的结果。 这样它就可以增量聚合窗口的…...
Android Studio安装配置
一、注意事项 想做安卓app和开发板通信,踩了大坑,Android 开发不是下载了就能直接开发的,对于新手需要注意的如下: 1、Android Studio版本,根据自己的Android Studio版本对应决定了你所兼容的AGP(Android…...
设计模式Python版 单例模式
文章目录 前言一、单例模式二、单例模式实现方式三、单例模式示例四、单例模式在Django框架的应用 前言 GOF设计模式分三大类: 创建型模式:关注对象的创建过程,包括单例模式、简单工厂模式、工厂方法模式、抽象工厂模式、原型模式和建造者模…...
7-Zip高危漏洞CVE-2025-0411:解析与修复
7-Zip高危漏洞CVE-2025-0411:解析与修复 免责声明 本系列工具仅供安全专业人员进行已授权环境使用,此工具所提供的功能只为网络安全人员对自己所负责的网站、服务器等(包括但不限于)进行检测或维护参考,未经授权请勿利…...
python实现http文件服务器访问下载
//1.py import http.server import socketserver import os import threading import sys# 获取当前脚本所在的目录 DIRECTORY os.path.dirname(os.path.abspath(__file__))# 设置服务器的端口 PORT 8000# 自定义Handler,将根目录设置为脚本所在目录 class MyHTT…...
《一文讲透》第4期:KWDB 数据库运维(6)—— 容灾与备份
一、KWDB 容灾 WAL 概述 KWDB 采用预写式日志(Write-Ahead Logging,WAL),记录每个时序表的模式变更和数据变更,以实现时序数据库的数据灾难恢复、时序数据的一致性和原子性。 KWDB 默认会将保存在 WAL 日志缓存中的…...
ArcGIS10.2 许可License点击始终启动无响应的解决办法及正常启动的前提
1、问题描述 在ArcGIS License Administrator中,手动点击“启动”无响应;且在计算机管理-服务中,无ArcGIS License 或者License的启动、停止、禁止等均为灰色,无法操作。 2、解决方法 ①通过cmd对service.txt进行手动服务的启动…...
Level2逐笔成交逐笔委托毫秒记录:今日分享优质股票数据20250124
逐笔成交逐笔委托下载 链接: https://pan.baidu.com/s/1UWVY11Q1IOfME9itDN5aZA?pwdhgeg 提取码: hgeg Level2逐笔成交逐笔委托数据分享下载 通过Level2逐笔成交与逐笔委托的详细数据,这种以毫秒为单位的信息能揭示许多关键点,如庄家意图、误导性行为…...
概率密度函数(PDF)分布函数(CDF)——直方图累积直方图——直方图规定化的数学基础
对于连续型随机变量,分布函数(Cumulative Distribution Function, CDF)是概率密度函数(Probability Density Function, PDF)的变上限积分,概率密度函数是分布函数的导函数。 如果我们有一个连续型随机变量…...
YOLOv5训练自己的数据及rknn部署
YOLOv5训练自己的数据及rknn部署 一、下载源码二、准备自己的数据集2.1 标注图像2.2 数据集结构 三、配置YOLOv5训练3.1 修改配置文件3.2 模型选择 四、训练五、测试六、部署6.1 pt转onnx6.2 onnx转rknn 七、常见错误7.1 训练过程中的错误7.1.1 cuda: out of memory7.1.2 train…...
计算机图形学:实验四 带纹理的OBJ文件读取和显示
一、程序功能设计 在程序中读取带纹理的obj文件,载入相应的纹理图片文件,将带纹理的模型显示在程序窗口中。实现带纹理的OBJ文件读取与显示功能,具体设计如下: OBJ文件解析与数据存储 通过实现TriMesh类中的readObj函数&#x…...
SQL Server 使用SELECT INTO实现表备份
在数据库管理过程中,有时我们需要对表进行备份,以防数据丢失或修改错误。在 SQL Server 中,可以使用 SELECT INTO 语句将数据从一个表备份到另一个表。 备份表的 SQL 语法: SELECT * INTO 【备份表名】 FROM 【要备份的表】 SEL…...
【线性代数】基础版本的高斯消元法
[精确算法] 高斯消元法求线性方程组 线性方程组 考虑线性方程组, 已知 A ∈ R n , n , b ∈ R n A\in \mathbb{R}^{n,n},b\in \mathbb{R}^n A∈Rn,n,b∈Rn, 求未知 x ∈ R n x\in \mathbb{R}^n x∈Rn A 1 , 1 x 1 A 1 , 2 x 2 ⋯ A 1 , n x n b 1…...
Python标准库 threading 的 start 和 join 的使用
python 的多线程机制可以的适用场景不适合与计算密集型的,因为 GIL 的存在,多线程在处理计算密集型时,实际上也是串行的,因为每个时刻只有一个线程可以获得 GIL,但是对于 IO 处理来说,不管是网络IO还是文件…...
无公网IP 外网访问媒体服务器 Emby
Emby 是一款多媒体服务器软件,用户可以在 Emby 创建自己的个人多媒体娱乐中心,并且可以跨多个设备访问自己的媒体库。它允许用户管理传输自己的媒体内容,比如电影、电视节目、音乐和照片等。 本文将详细的介绍如何利用 Docker 在本地部署 Emb…...
【数据结构】_顺序表
目录 1. 概念与结构 1.1 静态顺序表 1.2 动态顺序表 2. 动态顺序表实现 2.1 SeqList.h 2.2 SeqList.c 2.3 Test_SeqList.c 3. 顺序表性能分析 线性表是n个具有相同特性的数据元素的有限序列。 常见的线性表有:顺序表、链表、栈、队列、字符串等;…...
[MySQL]数据库表内容的增删查改操作大全
目录 一、增加表数据 1.全列插入与指定列插入 2.多行数据插入 3.更新与替换插入 二、查看表数据 1.全列查询与指定列查询 2.查询表达式字段 3.为查询结果起别名 4.结果去重 5.WHERE条件 6.结果排序 7.筛选分页结果 8.插入查询的结果 9.group by子句 三、修改表数…...
解决双系统引导问题:Ubuntu 启动时不显示 Windows 选项的处理方法
方法 1:检查 GRUB 引导菜单是否隐藏 启动进入 Ubuntu 系统。打开终端,输入以下命令编辑 GRUB 配置文件:sudo nano /etc/default/grub检查以下配置项: GRUB_TIMEOUT0:如果是 0,将其改为一个较大的值&#x…...
Java面试题2025-Spring
讲师:邓澎波 Spring面试专题 1.Spring应该很熟悉吧?来介绍下你的Spring的理解 1.1 Spring的发展历程 先介绍Spring是怎么来的,发展中有哪些核心的节点,当前的最新版本是什么等 通过上图可以比较清晰的看到Spring的各个时间版本对…...
CentOS7安装使用containerd
一,安装 1.1、安装containerd 下载 https://github.com/containerd/containerd/releases/download/v1.7.24/cri-containerd-cni-1.7.24-linux-amd64.tar.gz wget https://github.com/containerd/containerd/releases/download/v1.7.24/cri-containerd-cni-1.7.24-…...
uniapp 对接腾讯云IM群组成员管理(增删改查)
UniApp 实战:腾讯云IM群组成员管理(增删改查) 一、前言 在社交类App开发中,群组成员管理是核心功能之一。本文将基于UniApp框架,结合腾讯云IM SDK,详细讲解如何实现群组成员的增删改查全流程。 权限校验…...
挑战杯推荐项目
“人工智能”创意赛 - 智能艺术创作助手:借助大模型技术,开发能根据用户输入的主题、风格等要求,生成绘画、音乐、文学作品等多种形式艺术创作灵感或初稿的应用,帮助艺术家和创意爱好者激发创意、提高创作效率。 - 个性化梦境…...
日语学习-日语知识点小记-构建基础-JLPT-N4阶段(33):にする
日语学习-日语知识点小记-构建基础-JLPT-N4阶段(33):にする 1、前言(1)情况说明(2)工程师的信仰2、知识点(1) にする1,接续:名词+にする2,接续:疑问词+にする3,(A)は(B)にする。(2)復習:(1)复习句子(2)ために & ように(3)そう(4)にする3、…...
STM32F4基本定时器使用和原理详解
STM32F4基本定时器使用和原理详解 前言如何确定定时器挂载在哪条时钟线上配置及使用方法参数配置PrescalerCounter ModeCounter Periodauto-reload preloadTrigger Event Selection 中断配置生成的代码及使用方法初始化代码基本定时器触发DCA或者ADC的代码讲解中断代码定时启动…...
基于Docker Compose部署Java微服务项目
一. 创建根项目 根项目(父项目)主要用于依赖管理 一些需要注意的点: 打包方式需要为 pom<modules>里需要注册子模块不要引入maven的打包插件,否则打包时会出问题 <?xml version"1.0" encoding"UTF-8…...
JAVA后端开发——多租户
数据隔离是多租户系统中的核心概念,确保一个租户(在这个系统中可能是一个公司或一个独立的客户)的数据对其他租户是不可见的。在 RuoYi 框架(您当前项目所使用的基础框架)中,这通常是通过在数据表中增加一个…...
C++.OpenGL (14/64)多光源(Multiple Lights)
多光源(Multiple Lights) 多光源渲染技术概览 #mermaid-svg-3L5e5gGn76TNh7Lq {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-3L5e5gGn76TNh7Lq .error-icon{fill:#552222;}#mermaid-svg-3L5e5gGn76TNh7Lq .erro…...
CVPR2025重磅突破:AnomalyAny框架实现单样本生成逼真异常数据,破解视觉检测瓶颈!
本文介绍了一种名为AnomalyAny的创新框架,该方法利用Stable Diffusion的强大生成能力,仅需单个正常样本和文本描述,即可生成逼真且多样化的异常样本,有效解决了视觉异常检测中异常样本稀缺的难题,为工业质检、医疗影像…...
【无标题】湖北理元理律师事务所:债务优化中的生活保障与法律平衡之道
文/法律实务观察组 在债务重组领域,专业机构的核心价值不仅在于减轻债务数字,更在于帮助债务人在履行义务的同时维持基本生活尊严。湖北理元理律师事务所的服务实践表明,合法债务优化需同步实现三重平衡: 法律刚性(债…...
简单聊下阿里云DNS劫持事件
阿里云域名被DNS劫持事件 事件总结 根据ICANN规则,域名注册商(Verisign)认定aliyuncs.com域名下的部分网站被用于非法活动(如传播恶意软件);顶级域名DNS服务器将aliyuncs.com域名的DNS记录统一解析到shado…...
