当前位置: 首页 > news >正文

20250124 Flink中 窗口开始时间和結束時間

增量聚合的 ProcessWindowFunction #

ProcessWindowFunction 可以与 ReduceFunction 或 AggregateFunction 搭配使用, 使其能够在数据到达窗口的时候进行增量聚合。当窗口关闭时,ProcessWindowFunction 将会得到聚合的结果。 这样它就可以增量聚合窗口的元素并且从 ProcessWindowFunction` 中获得窗口的元数据。

你也可以对过时的 WindowFunction 使用增量聚合。

使用 ReduceFunction 增量聚合 #
  • 下例展示了如何将 ReduceFunction 与 ProcessWindowFunction 组合,返回窗口中的最小元素和窗口的开始时间。

 

DataStream<SensorReading> input = ...;input.keyBy(<key selector>).window(<window assigner>).reduce(new MyReduceFunction(), new MyProcessWindowFunction());// Function definitionsprivate static class MyReduceFunction implements ReduceFunction<SensorReading> {public SensorReading reduce(SensorReading r1, SensorReading r2) {return r1.value() > r2.value() ? r2 : r1;}
}private static class MyProcessWindowFunctionextends ProcessWindowFunction<SensorReading, Tuple2<Long, SensorReading>, String, TimeWindow> {public void process(String key,Context context,Iterable<SensorReading> minReadings,Collector<Tuple2<Long, SensorReading>> out) {SensorReading min = minReadings.iterator().next();out.collect(new Tuple2<Long, SensorReading>(context.window().getStart(), min));}
}

通俗解释:窗口开始时间的作用

我们可以用一个更贴近生活的例子来理解 窗口开始时间 的意义。


场景比喻:每天上午的「温度统计报告

假设你有一个气象站,每5分钟记录一次户外温度。现在需要 每小时(例如8:00-9:00)统计一次该时段内的最低温度,并在报告中标注这个小时段的起始时间(如“8:00-9:00的最低温度是15°C”)。

关键点
  • 窗口开始时间:就是时间段的起点(如8:00)。

  • 窗口结束时间:就是时间段的终点(如9:00)。

  • 为什么要记录开始时间?
    方便人类理解数据属于哪个时段(比如“8点档”的数据)。


代码示例解析

1. 窗口如何划分?

假设使用 滚动窗口(Tumbling Window),每1小时划分一次:

Copy

8:00-9:00  → 窗口1  
9:00-10:00 → 窗口2  

所有时间戳在8:00≤t<9:00的数据会被分配到窗口1。

2. 窗口触发计算的时机

当系统时间(或事件时间)到达9:00时,窗口1关闭,触发计算:

  • 调用 MyReduceFunction 找出该窗口内的最低温度。

  • 调用 MyProcessWindowFunction 将结果与窗口开始时间(8:00)绑定。

3. 为什么输出的是开始时间(8:00)而不是结束时间(9:00)?
  • 业务需求:通常更关心数据所属时段的起点(例如“8点档的数据”)。

  • 避免歧义:如果输出9:00,可能被误解为“9点档的数据”(实际是8:00-9:00的数据)。


代码中具体如何获取开始时间?

在 MyProcessWindowFunction 中:

context.window().getStart(); // 返回窗口的起始时间戳(如8:00对应的毫秒值)
  • context 对象:包含窗口的元信息(起止时间、触发时间等)。

  • 实际输出时:将时间戳转换为人类可读格式(如 8:00)。


常见疑问解答

Q1:如果数据延迟到达(比如8:59的数据在9:05才到),会进入哪个窗口?
  • 取决于时间语义

    • 若使用 事件时间(Event Time):按数据自带的时间戳分配到8:00-9:00窗口。

    • 若使用 处理时间(Processing Time):按到达系统的时间分配到9:00-10:00窗口。

    • (示例代码未显式设置时间语义,默认可能是处理时间)

Q2:窗口开始时间是如何计算的?
  • 由窗口分配器(Window Assigner)决定

    • 滚动窗口按固定间隔对齐(如整点)。

    • 滑动窗口按步长对齐(如每30分钟滑动一次的1小时窗口)。

    • 会话窗口根据数据活跃度动态划分。

Q3:可以同时输出开始时间和结束时间吗?

可以!修改 ProcessWindowFunction

out.collect(new Tuple3<>(context.window().getStart(), context.window().getEnd(), min));
 

总结

  • 窗口开始时间 标记了数据所属时间段的起点(如“8:00档”)。

  • 在 Flink 中,通过 ProcessWindowFunction 的 context 可以轻松获取这一信息。

  • 这种设计让数据处理结果更易理解(如统计报告、监控仪表盘)。

相关文章:

20250124 Flink中 窗口开始时间和結束時間

增量聚合的 ProcessWindowFunction # ProcessWindowFunction 可以与 ReduceFunction 或 AggregateFunction 搭配使用&#xff0c; 使其能够在数据到达窗口的时候进行增量聚合。当窗口关闭时&#xff0c;ProcessWindowFunction 将会得到聚合的结果。 这样它就可以增量聚合窗口的…...

Android Studio安装配置

一、注意事项 想做安卓app和开发板通信&#xff0c;踩了大坑&#xff0c;Android 开发不是下载了就能直接开发的&#xff0c;对于新手需要注意的如下&#xff1a; 1、Android Studio版本&#xff0c;根据自己的Android Studio版本对应决定了你所兼容的AGP&#xff08;Android…...

设计模式Python版 单例模式

文章目录 前言一、单例模式二、单例模式实现方式三、单例模式示例四、单例模式在Django框架的应用 前言 GOF设计模式分三大类&#xff1a; 创建型模式&#xff1a;关注对象的创建过程&#xff0c;包括单例模式、简单工厂模式、工厂方法模式、抽象工厂模式、原型模式和建造者模…...

7-Zip高危漏洞CVE-2025-0411:解析与修复

7-Zip高危漏洞CVE-2025-0411&#xff1a;解析与修复 免责声明 本系列工具仅供安全专业人员进行已授权环境使用&#xff0c;此工具所提供的功能只为网络安全人员对自己所负责的网站、服务器等&#xff08;包括但不限于&#xff09;进行检测或维护参考&#xff0c;未经授权请勿利…...

python实现http文件服务器访问下载

//1.py import http.server import socketserver import os import threading import sys# 获取当前脚本所在的目录 DIRECTORY os.path.dirname(os.path.abspath(__file__))# 设置服务器的端口 PORT 8000# 自定义Handler&#xff0c;将根目录设置为脚本所在目录 class MyHTT…...

《一文讲透》第4期:KWDB 数据库运维(6)—— 容灾与备份

一、KWDB 容灾 WAL 概述 KWDB 采用预写式日志&#xff08;Write-Ahead Logging&#xff0c;WAL&#xff09;&#xff0c;记录每个时序表的模式变更和数据变更&#xff0c;以实现时序数据库的数据灾难恢复、时序数据的一致性和原子性。 KWDB 默认会将保存在 WAL 日志缓存中的…...

ArcGIS10.2 许可License点击始终启动无响应的解决办法及正常启动的前提

1、问题描述 在ArcGIS License Administrator中&#xff0c;手动点击“启动”无响应&#xff1b;且在计算机管理-服务中&#xff0c;无ArcGIS License 或者License的启动、停止、禁止等均为灰色&#xff0c;无法操作。 2、解决方法 ①通过cmd对service.txt进行手动服务的启动…...

Level2逐笔成交逐笔委托毫秒记录:今日分享优质股票数据20250124

逐笔成交逐笔委托下载 链接: https://pan.baidu.com/s/1UWVY11Q1IOfME9itDN5aZA?pwdhgeg 提取码: hgeg Level2逐笔成交逐笔委托数据分享下载 通过Level2逐笔成交与逐笔委托的详细数据&#xff0c;这种以毫秒为单位的信息能揭示许多关键点&#xff0c;如庄家意图、误导性行为…...

概率密度函数(PDF)分布函数(CDF)——直方图累积直方图——直方图规定化的数学基础

对于连续型随机变量&#xff0c;分布函数&#xff08;Cumulative Distribution Function, CDF&#xff09;是概率密度函数&#xff08;Probability Density Function, PDF&#xff09;的变上限积分&#xff0c;概率密度函数是分布函数的导函数。 如果我们有一个连续型随机变量…...

YOLOv5训练自己的数据及rknn部署

YOLOv5训练自己的数据及rknn部署 一、下载源码二、准备自己的数据集2.1 标注图像2.2 数据集结构 三、配置YOLOv5训练3.1 修改配置文件3.2 模型选择 四、训练五、测试六、部署6.1 pt转onnx6.2 onnx转rknn 七、常见错误7.1 训练过程中的错误7.1.1 cuda: out of memory7.1.2 train…...

计算机图形学:实验四 带纹理的OBJ文件读取和显示

一、程序功能设计 在程序中读取带纹理的obj文件&#xff0c;载入相应的纹理图片文件&#xff0c;将带纹理的模型显示在程序窗口中。实现带纹理的OBJ文件读取与显示功能&#xff0c;具体设计如下&#xff1a; OBJ文件解析与数据存储 通过实现TriMesh类中的readObj函数&#x…...

SQL Server 使用SELECT INTO实现表备份

在数据库管理过程中&#xff0c;有时我们需要对表进行备份&#xff0c;以防数据丢失或修改错误。在 SQL Server 中&#xff0c;可以使用 SELECT INTO 语句将数据从一个表备份到另一个表。 备份表的 SQL 语法&#xff1a; SELECT * INTO 【备份表名】 FROM 【要备份的表】 SEL…...

【线性代数】基础版本的高斯消元法

[精确算法] 高斯消元法求线性方程组 线性方程组 考虑线性方程组&#xff0c; 已知 A ∈ R n , n , b ∈ R n A\in \mathbb{R}^{n,n},b\in \mathbb{R}^n A∈Rn,n,b∈Rn&#xff0c; 求未知 x ∈ R n x\in \mathbb{R}^n x∈Rn A 1 , 1 x 1 A 1 , 2 x 2 ⋯ A 1 , n x n b 1…...

Python标准库 threading 的 start 和 join 的使用

python 的多线程机制可以的适用场景不适合与计算密集型的&#xff0c;因为 GIL 的存在&#xff0c;多线程在处理计算密集型时&#xff0c;实际上也是串行的&#xff0c;因为每个时刻只有一个线程可以获得 GIL&#xff0c;但是对于 IO 处理来说&#xff0c;不管是网络IO还是文件…...

无公网IP 外网访问媒体服务器 Emby

Emby 是一款多媒体服务器软件&#xff0c;用户可以在 Emby 创建自己的个人多媒体娱乐中心&#xff0c;并且可以跨多个设备访问自己的媒体库。它允许用户管理传输自己的媒体内容&#xff0c;比如电影、电视节目、音乐和照片等。 本文将详细的介绍如何利用 Docker 在本地部署 Emb…...

【数据结构】_顺序表

目录 1. 概念与结构 1.1 静态顺序表 1.2 动态顺序表 2. 动态顺序表实现 2.1 SeqList.h 2.2 SeqList.c 2.3 Test_SeqList.c 3. 顺序表性能分析 线性表是n个具有相同特性的数据元素的有限序列。 常见的线性表有&#xff1a;顺序表、链表、栈、队列、字符串等&#xff1b…...

[MySQL]数据库表内容的增删查改操作大全

目录 一、增加表数据 1.全列插入与指定列插入 2.多行数据插入 3.更新与替换插入 二、查看表数据 1.全列查询与指定列查询 2.查询表达式字段 3.为查询结果起别名 4.结果去重 5.WHERE条件 6.结果排序 7.筛选分页结果 8.插入查询的结果 9.group by子句 三、修改表数…...

解决双系统引导问题:Ubuntu 启动时不显示 Windows 选项的处理方法

方法 1&#xff1a;检查 GRUB 引导菜单是否隐藏 启动进入 Ubuntu 系统。打开终端&#xff0c;输入以下命令编辑 GRUB 配置文件&#xff1a;sudo nano /etc/default/grub检查以下配置项&#xff1a; GRUB_TIMEOUT0&#xff1a;如果是 0&#xff0c;将其改为一个较大的值&#x…...

Java面试题2025-Spring

讲师&#xff1a;邓澎波 Spring面试专题 1.Spring应该很熟悉吧&#xff1f;来介绍下你的Spring的理解 1.1 Spring的发展历程 先介绍Spring是怎么来的&#xff0c;发展中有哪些核心的节点&#xff0c;当前的最新版本是什么等 通过上图可以比较清晰的看到Spring的各个时间版本对…...

CentOS7安装使用containerd

一&#xff0c;安装 1.1、安装containerd 下载 https://github.com/containerd/containerd/releases/download/v1.7.24/cri-containerd-cni-1.7.24-linux-amd64.tar.gz wget https://github.com/containerd/containerd/releases/download/v1.7.24/cri-containerd-cni-1.7.24-…...

React 第五十五节 Router 中 useAsyncError的使用详解

前言 useAsyncError 是 React Router v6.4 引入的一个钩子&#xff0c;用于处理异步操作&#xff08;如数据加载&#xff09;中的错误。下面我将详细解释其用途并提供代码示例。 一、useAsyncError 用途 处理异步错误&#xff1a;捕获在 loader 或 action 中发生的异步错误替…...

C++_核心编程_多态案例二-制作饮品

#include <iostream> #include <string> using namespace std;/*制作饮品的大致流程为&#xff1a;煮水 - 冲泡 - 倒入杯中 - 加入辅料 利用多态技术实现本案例&#xff0c;提供抽象制作饮品基类&#xff0c;提供子类制作咖啡和茶叶*//*基类*/ class AbstractDr…...

深入理解JavaScript设计模式之单例模式

目录 什么是单例模式为什么需要单例模式常见应用场景包括 单例模式实现透明单例模式实现不透明单例模式用代理实现单例模式javaScript中的单例模式使用命名空间使用闭包封装私有变量 惰性单例通用的惰性单例 结语 什么是单例模式 单例模式&#xff08;Singleton Pattern&#…...

学校招生小程序源码介绍

基于ThinkPHPFastAdminUniApp开发的学校招生小程序源码&#xff0c;专为学校招生场景量身打造&#xff0c;功能实用且操作便捷。 从技术架构来看&#xff0c;ThinkPHP提供稳定可靠的后台服务&#xff0c;FastAdmin加速开发流程&#xff0c;UniApp则保障小程序在多端有良好的兼…...

基于Docker Compose部署Java微服务项目

一. 创建根项目 根项目&#xff08;父项目&#xff09;主要用于依赖管理 一些需要注意的点&#xff1a; 打包方式需要为 pom<modules>里需要注册子模块不要引入maven的打包插件&#xff0c;否则打包时会出问题 <?xml version"1.0" encoding"UTF-8…...

初学 pytest 记录

安装 pip install pytest用例可以是函数也可以是类中的方法 def test_func():print()class TestAdd: # def __init__(self): 在 pytest 中不可以使用__init__方法 # self.cc 12345 pytest.mark.api def test_str(self):res add(1, 2)assert res 12def test_int(self):r…...

Hive 存储格式深度解析:从 TextFile 到 ORC,如何选对数据存储方案?

在大数据处理领域&#xff0c;Hive 作为 Hadoop 生态中重要的数据仓库工具&#xff0c;其存储格式的选择直接影响数据存储成本、查询效率和计算资源消耗。面对 TextFile、SequenceFile、Parquet、RCFile、ORC 等多种存储格式&#xff0c;很多开发者常常陷入选择困境。本文将从底…...

视觉slam十四讲实践部分记录——ch2、ch3

ch2 一、使用g++编译.cpp为可执行文件并运行(P30) g++ helloSLAM.cpp ./a.out运行 二、使用cmake编译 mkdir build cd build cmake .. makeCMakeCache.txt 文件仍然指向旧的目录。这表明在源代码目录中可能还存在旧的 CMakeCache.txt 文件,或者在构建过程中仍然引用了旧的路…...

0x-3-Oracle 23 ai-sqlcl 25.1 集成安装-配置和优化

是不是受够了安装了oracle database之后sqlplus的简陋&#xff0c;无法删除无法上下翻页的苦恼。 可以安装readline和rlwrap插件的话&#xff0c;配置.bahs_profile后也能解决上下翻页这些&#xff0c;但是很多生产环境无法安装rpm包。 oracle提供了sqlcl免费许可&#xff0c…...

数据结构:递归的种类(Types of Recursion)

目录 尾递归&#xff08;Tail Recursion&#xff09; 什么是 Loop&#xff08;循环&#xff09;&#xff1f; 复杂度分析 头递归&#xff08;Head Recursion&#xff09; 树形递归&#xff08;Tree Recursion&#xff09; 线性递归&#xff08;Linear Recursion&#xff09;…...