Flink SQL时间属性和窗口介绍
(1)概述
时间属性(time attributes),其实就是每个表模式结构(schema)的一部分。它可以在创建表的 DDL 里直接定义为一个字段,也可以在 DataStream 转换成表时定义。
一旦定义了时间属性,它就可以作为一个普通字段引用,并且可以在基于时间的操作中使用。
时间属性的数据类型为 TIMESTAMP,它的行为类似于常规时间戳,可以直接访问并且进行计算。
按照时间语义的不同,可以把时间属性的定义分成事件时间(event time)和处理时间(processing time)两种情况。
(2)事件时间
1)在创建表的 DDL 中定义
在创建表的 DDL(CREATE TABLE 语句)中,可以增加一个字段,通过 WATERMARK语句来定义事件时间属性。
WATERMARK 语句主要用来定义水位线(watermark)的生成表达式,这个表达式会将带有事件时间戳的字段标记为事件时间属性,并在它基础上给出水位线的延迟时间。
具体定义方式如下:
CREATE TABLE EventTable(user STRING,url STRING,ts TIMESTAMP(3),WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (...
);
把 ts 字段定义为事件时间属性,而且基于 ts 设置了 5 秒的水位线延迟。这里的“5 秒”是以“时间间隔”的形式定义的,格式是 INTERVAL <数值> <时间单位>:INTERVAL '5’ SECOND,这里的数值必须用单引号引起来,而单位用 SECOND 和 SECONDS 是等效的。
Flink 中支持的事件时间属性数据类型必须为 TIMESTAMP 或者 TIMESTAMP_LTZ。这里TIMESTAMP_LTZ 是指带有本地时区信息的时间戳(TIMESTAMP WITH LOCAL TIME ZONE);
如数据中的时间戳是“年-月-日-时-分-秒”形式,那就是不带时区信息的,可以将事件时间属性定义为 TIMESTAMP 类型。
而如果原始的时间戳就是一个长整型的毫秒数,这时就需要另外定义一个字段来表示事件时间属性,类型定义为 TIMESTAMP_LTZ 会更方便:
CREATE TABLE events (user STRING,url STRING,ts BIGINT,ts_ltz AS TO_TIMESTAMP_LTZ(ts, 3),WATERMARK FOR ts_ltz AS ts_ltz - INTERVAL '5' SECOND
) WITH (...
);
这里我们另外定义了一个字段 ts_ltz,是把长整型的 ts 转换为 TIMESTAMP_LTZ 得到的;进而使用 WATERMARK 语句将它设为事件时间属性,并设置 5 秒的水位线延迟。
2)在数据流转换为表时定义
调用 fromDataStream()方法创建表时,可以追加参数来定义表中的字段结构;这时可以给某个字段加上.rowtime() 后缀,就表示将当前字段指定为事件时间属性。
这个字段可以是数据中本不存在、额外追加上去的“逻辑字段”,就像之前 DDL 中定义的第二种情况;也可以是本身固有的字段,那么这个字段就会被事件时间属性所覆盖,类型也会被转换为 TIMESTAMP。
不论哪种方式,时间属性字段中保存的都是事件的时间戳(TIMESTAMP 类型)。
需要注意的是,这种方式只负责指定时间属性,而时间戳的提取和水位线的生成应该之前就在 DataStream 上定义好了。
由于 DataStream 中没有时区概念,因此 Flink 会将事件时间属性解析成不带时区的 TIMESTAMP 类型,所有的时间值都被当作 UTC 标准时间。
在代码中的定义方式如下:
// 方法一:
// 流中数据类型为二元组 Tuple2,包含两个字段;需要自定义提取时间戳并生成水位线
DataStream<Tuple2<String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);// 声明一个额外的逻辑字段作为事件时间属性
Table table = tEnv.fromDataStream(stream, $("user"), $("url"),$("ts").rowtime());// 方法二:
// 流中数据类型为三元组 Tuple3,最后一个字段就是事件时间戳
DataStream<Tuple3<String, String, Long>> stream = inputStream.assignTimestampsAndWatermarks(...);// 不再声明额外字段,直接用最后一个字段作为事件时间属性
Table table = tEnv.fromDataStream(stream, $("user"), $("url"),$("ts").rowtime());
(3)处理时间
系统时间,使用时不需要提取时间戳(timestamp)和生成水位线(watermark)。因此在定义处理时间属性时,必须要额外声明一个字段,专门用来保存当前的处理时间。
类似地,处理时间属性的定义也有两种方式:创建表 DDL 中定义,或者在数据流转换成表时定义。
1)在创建表的 DDL 中定义
在创建表的 DDL(CREATE TABLE 语句)中,可以增加一个额外的字段,通过调用系统内置的 PROCTIME()函数来指定当前的处理时间属性,返回的类型是 TIMESTAMP_LTZ。
CREATE TABLE EventTable(user STRING,url STRING,ts AS PROCTIME()
) WITH (...
);
时间属性,以“计算列”(computed column)的形式定义出来的。所谓的计算列是 Flink SQL 中引入的特殊概念,可以用一个 AS 语句来在表中产生数据中不存在的列,并且可以利用原有的列、各种运算符及内置函数。
在前面事件时间属性的定义中,将 ts 字段转换成 TIMESTAMP_LTZ 类型的 ts_ltz,也是计算列的定义方式。
2)在数据流转换为表时定义
处理时间属性同样可以在将 DataStream 转换为表的时候来定义。我们调用 fromDataStream()方法创建表时,可以用.proctime()后缀来指定处理时间属性字段。
由于处理时间是系统时间,原始数据中并没有这个字段,所以处理时间属性一定不能定义在一个已有字段上,只能定义在表结构所有字段的最后,作为额外的逻辑字段出现。
代码中定义处理时间属性的方法如下:
DataStream<Tuple2<String, String>> stream = ...;// 声明一个额外的字段作为处理时间属性字段
Table table = tEnv.fromDataStream(stream, $("user"), $("url"),$("ts").proctime());
(4)窗口(Window)
1)分组窗口(Group Window,老版本)
在 Flink 1.12 之前的版本中,Table API 和 SQL 提供了一组“分组窗口”(Group Window)函数,常用的时间窗口如滚动窗口、滑动窗口、会话窗口都有对应的实现;具体在 SQL 中就是调用 TUMBLE()、HOP()、SESSION(),传入时间属性字段、窗口大小等参数就可以了。
以滚动窗口为例:
TUMBLE(ts, INTERVAL '1' HOUR)
这里的 ts 是定义好的时间属性字段,窗口大小用“时间间隔”INTERVAL 来定义。在进行窗口计算时,分组窗口是将窗口本身当作一个字段对数据进行分组的,可以对组内的数据进行聚合。
基本使用方式如下:
Table result = tableEnv.sqlQuery("SELECT " +"user, " +
"TUMBLE_END(ts, INTERVAL '1' HOUR) as endT, " +"COUNT(url) AS cnt " +"FROM EventTable " +"GROUP BY " + // 使用窗口和用户名进行分组"user, " +"TUMBLE(ts, INTERVAL '1' HOUR)" // 定义 1 小时滚动窗口);
这里定义了 1 小时的滚动窗口,将窗口和用户 user 一起作为分组的字段。用聚合函数COUNT()对分组数据的个数进行了聚合统计,并将结果字段重命名为cnt;用TUPMBLE_END() 函数获取滚动窗口的结束时间,重命名为 endT 提取出来。
分组窗口的功能比较有限,只支持窗口聚合,所以目前已经处于弃用(deprecated)的状态。
2)窗口表值函数(Windowing TVFs,新版本)
从 1.13 版本开始,Flink 开始使用窗口表值函数(Windowing table-valued functions,Windowing TVFs)来定义窗口。
窗口表值函数是 Flink 定义的多态表函数(PTF),可以将表进行扩展后返回。表函数(table function)可以看作是返回一个表的函数。
目前 Flink 提供了以下几个窗口 TVF:
1.滚动窗口(Tumbling Windows);
2.滑动窗口(Hop Windows,跳跃窗口);
3.累积窗口(Cumulate Windows);
4.会话窗口(Session Windows,目前尚未完全支持)。
1.概述
窗口表值函数可以完全替代传统的分组窗口函数。窗口 TVF 更符合 SQL 标准,性能得到了优化,拥有更强大的功能;可以支持基于窗口的复杂计算,例如窗口 Top-N、窗口联结(window join)等等。
在窗口 TVF 的返回值中,除去原始表中的所有列,还增加了用来描述窗口的额外 3 个列:“窗口起始点(window_start)、“窗口结束点”(window_end)、“窗口时间”(window_time)。
起始点和结束点比较好理解,这里的“窗口时间”指的是窗口中的时间属性,它的值等于 window_end - 1ms,所以相当于是窗口中能够包含数据的最大时间戳。
在 SQL 中的声明方式,与以前的分组窗口是类似的,直接调用 TUMBLE()、HOP()、CUMULATE()就可以实现滚动、滑动和累积窗口,不过传入的参数会有所不同。
2.滚动窗口(TUMBLE)
滚动窗口在 SQL 中的概念与 DataStream API 中的定义完全一样,是长度固定、时间对齐、无重叠的窗口,一般用于周期性的统计计算。
在 SQL 中通过调用 TUMBLE()函数就可以声明一个滚动窗口,只有一个核心参数就是窗口大小(size)。
在 SQL 中不考虑计数窗口,所以滚动窗口就是滚动时间窗口,参数中还需要将当前的时间属性字段传入;另外,窗口 TVF 本质上是表函数,可以对表进行扩展,所以还应该把当前查询的表作为参数整体传入。具体声明如下:
TUMBLE(TABLE EventTable, DESCRIPTOR(ts), INTERVAL '1' HOUR)
这里基于时间字段 ts,对表 EventTable 中的数据开了大小为 1 小时的滚动窗口。窗口会将表中的每一行数据,按照它们 ts 的值分配到一个指定的窗口中。
(2)滑动窗口(HOP)
滑动窗口的使用与滚动窗口类似,可以通过设置滑动步长来控制统计输出的频率。在 SQL中通过调用 HOP()来声明滑动窗口;除了也要传入表名、时间属性外,还需要传入窗口大小(size)和滑动步长(slide)两个参数。
HOP(TABLE EventTable, DESCRIPTOR(ts), INTERVAL '5' MINUTES, INTERVAL '1' HOURS));
这里我们基于时间属性 ts,在表 EventTable 上创建了大小为 1 小时的滑动窗口,每 5 分钟滑动一次。需要注意的是,紧跟在时间属性字段后面的第三个参数是步长(slide),第四个参数才是窗口大小(size)。
(3)累积窗口(CUMULATE)
滚动窗口和滑动窗口,可以用来计算大多数周期性的统计指标。
在实际应用中还会遇到这样一类需求:我们的统计周期可能较长,因此希望中间每隔一段时间就输出一次当前的统计值;
与滑动窗口不同的是,在一个统计周期内,我们会多次输出统计值,它们应该是不断叠加累积的。
例如,我们按天来统计网站的 PV(Page View,页面浏览量),如果用 1 天的滚动窗口,那需要到每天 24 点才会计算一次,输出频率太低;如果用滑动窗口,计算频率可以更高,但统计的就变成了“过去 24 小时的 PV”。
所以我们真正希望的是,还是按照自然日统计每天的PV,不过需要每隔 1 小时就输出一次当天到目前为止的 PV 值。这种特殊的窗口就叫作“累积窗口”(Cumulate Window)。
累积窗口是窗口 TVF 中新增的窗口功能,它会在一定的统计周期内进行累积计算。
累积窗口中有两个核心的参数:最大窗口长度(max window size)和累积步长(step)。
所谓的最大窗口长度其实就是我们所说的“统计周期”,最终目的就是统计这段时间内的数据。开始时,创建的第一个窗口大小就是步长 step;之后的每个窗口都会在之前的基础上再扩展 step 的长度,直到达到最大窗口长度。
在 SQL 中可以用 CUMULATE()函数来定义,具体如下:
CUMULATE(TABLE EventTable, DESCRIPTOR(ts), INTERVAL '1' HOURS, INTERVAL '1' DAYS))
这里我们基于时间属性 ts,在表 EventTable 上定义了一个统计周期为 1 天、累积步长为 1小时的累积窗口。注意第三个参数为步长 step,第四个参数则是最大窗口长度。
上面所有的语句只是定义了窗口,类似于 DataStream API 中的窗口分配器;在 SQL 中窗口的完整调用,还需要配合聚合操作和其它操作。
相关文章:

Flink SQL时间属性和窗口介绍
(1)概述 时间属性(time attributes),其实就是每个表模式结构(schema)的一部分。它可以在创建表的 DDL 里直接定义为一个字段,也可以在 DataStream 转换成表时定义。 一旦定义了时间…...

Tomcat免安装版修改标题名称和进程
tomcat免安装版启动后闪退问题 问题描述 在官网下载的tomcat免安装版的你安装完环境后发现启动闪退,tomcat启动依赖环境是JDK,所以需要tomcat对应版本的JDK支持。 tomcat8官网下载地址:https://tomcat.apache.org/ JDK环境官网下载地址&…...
vim搜索、替换tab
bibtex 中的缩进可能不一致,强迫症犯了想将: 缩进空格改 tab;行首的多个 tab 改为单个 参考 [1],空格换 tab 可以: :set noexpandtab :%retab!行首的多个 tab 换单个: :%s/^\t\/\t/gReferences Replac…...

一文读懂ARM安全性架构和可信系统构建要素
一文读懂ARM安全性架构和可信系统构建要素 所谓可信系统(trusted system),即能够用于保护密码和加密密钥等资产(assets)免受一系列的可信攻击,防止其被复制、损坏或不可用(unavailable…...

Voice vlan、ICMP、单臂路由、mux-vlan
目录 一,Voice VLAN Voice vlan配置命令 一,问:已知网络中一台服务器的IP地址,如何找到这太服务器在哪台交换机的哪个接口上编辑 思路: 二,ICMP协议 三,ICMP案例分析编辑 四…...
TCP IP 网络编程(七) 理解select和epoll的使用
文章目录 理解select函数select函数的功能和调用顺序设置文件描述符设置监视范围及超时select函数调用示例 优于select的epoll基于select的I/O复用速度慢实现epoll时必要的函数和结构体epoll_createepoll_ctlepoll_wait基于epoll的服务器端 边缘触发和水平触发 理解select函数 …...
Linux accept和FD_xxx的使用
Linux socket accept功能的作用是在服务器端等待并接受客户端的连接请求。当有客户端尝试连接服务器时,服务器调用accept函数来接受该连接请求,并创建一个新的socket来与该客户端进行通信。 具体来说,accept函数被动监听客户端的三次握手连接…...

树结构及其算法-二叉运算树
目录 树结构及其算法-二叉运算树 C代码 树结构及其算法-二叉运算树 二叉树的应用实际上相当广泛,例如表达式之间的转换。可以把中序表达式按运算符优先级的顺序建成一棵二叉运算树(Binary Expression Tree,或称为二叉表达式树)…...
vue的rules验证失效,部分可以部分又失效的原因
vue的rules验证失效,部分可以部分又失效的原因 很多百度都有,但是我这里遇到了一个特别的,那就是prop没有写全,导致验证某一个失效 例子: 正常写法 el-form-item....多个省略<el-form-item label"胶币" prop"cost"><el-input v-model"form.…...
c#字符串转整数类型
将字符串转换为整数类型。为了方便,C#提供了一个内置的方法TryParse来实现这个功能 字符串(String):表示一串字符的数据类型。整数(Integer):表示不带小数点的数字。解析(Parsing&a…...

【LeetCode】118. 杨辉三角
118. 杨辉三角 难度:简单 题目 给定一个非负整数 *numRows,*生成「杨辉三角」的前 numRows 行。 在「杨辉三角」中,每个数是它左上方和右上方的数的和。 示例 1: 输入: numRows 5 输出: [[1],[1,1],[1,2,1],[1,3,3,1],[1,4,6,4,1]]示例…...

【Vue.js】Vue3全局配置Axios并解决跨域请求问题
系列文章目录 文章目录 系列文章目录背景一、部署Axios1. npm 安装 axios2. 创建 request.js,创建axios实例3. 在main.js中全局注册axios4. 在页面中使用axios 二、后端解决跨域请求问题方法一 解决单Contoller跨域访问方法二 全局解决跨域问题 背景 对于前后端分离…...
【车载开发系列】CRC循环冗余校验码原理
【车载开发系列】CRC循环冗余校验码原理 CRC循环冗余校验码原理 【车载开发系列】CRC循环冗余校验码原理一. CRC算法原理二. 生成多项式三. 多项式与其对应代码四. CRC码校验原理1)发送端2)接收端 五. CRC码原理方法1)发送端生成CRC码方法2&a…...

数据库实验:SQL的数据更新
目录 实验目的实验内容实验要求实验步骤实验过程总结 再次书接上文,sql基础的增删改查 实验目的 (1) 掌握DBMS的数据查询功能 (2) 掌握SQL语言的数据更新功能 实验内容 (1) update 语句用于对表进行更新 (2) delete 语句用于对表进行删除 (3) insert 语句用于对表…...

3.线性神经网络-3GPT版
#pic_center R 1 R_1 R1 R 2 R^2 R2 目录 知识框架No.1 线性回归基础优化算法一、线性回归1、买房案例2、买房模型简化3、线性模型4、神经网络5、损失函数6、训练数据7、参数学习8、显示解9、总结 二、 基础优化算法1、梯度下降2、学习率3、小批量随机梯度下降4、批量大小5、…...

大语言模型对齐技术 最新论文及源码合集(外部对齐、内部对齐、可解释性)
大语言模型对齐(Large Language Model Alignment)是利用大规模预训练语言模型来理解它们内部的语义表示和计算过程的研究领域。主要目的是避免大语言模型可见的或可预见的风险,比如固有存在的幻觉问题、生成不符合人类期望的文本、容易被用来执行恶意行为等。 从必…...

x264交叉编译(ubuntu+arm)
1.下载源码 https://code.videolan.org/videolan/x264 在windows下解压;复制到ubuntu; 2.进入源码文件夹-新建脚本文件 touch sp_run.sh 3.在sp_run.sh文件中输入 #!/bin/sh./configure --prefix/home/alientek/sp_test/x264/sp_install --enable-…...
SpringMVC 处理后端日期格式
通过扩展Spring MVC框架的消息转化器 在WebMvcConfiguration中扩展SpringMVC的消息转换器,统一对日期类型进行格式处理 WebMvcConfiguration /*** 扩展Spring MVC框架的消息转化器* param converters*/protected void extendMessageConverters(List<HttpMessag…...

Servlet详解
一.Servlet生命周期 初始化提供服务销毁 1.测试生命周期 package com.demo.servlet;import javax.servlet.*; import java.io.IOException;public class LifeServlet implements Servlet {Overridepublic void init(ServletConfig servletConfig) throws ServletException {…...

遥遥领先,免费开源的django4-vue3前后端分离项目
星域后台管理系统前端介绍 🌿项目简介 本项目前端基于当下流行且常用的vue3作为主要技术栈进行开发,融合了typescript和element-plus-ui,提供暗黑模式和白昼模式两种主题以及全屏切换,开发bug少,简单易学,…...

理解 MCP 工作流:使用 Ollama 和 LangChain 构建本地 MCP 客户端
🌟 什么是 MCP? 模型控制协议 (MCP) 是一种创新的协议,旨在无缝连接 AI 模型与应用程序。 MCP 是一个开源协议,它标准化了我们的 LLM 应用程序连接所需工具和数据源并与之协作的方式。 可以把它想象成你的 AI 模型 和想要使用它…...
ffmpeg(四):滤镜命令
FFmpeg 的滤镜命令是用于音视频处理中的强大工具,可以完成剪裁、缩放、加水印、调色、合成、旋转、模糊、叠加字幕等复杂的操作。其核心语法格式一般如下: ffmpeg -i input.mp4 -vf "滤镜参数" output.mp4或者带音频滤镜: ffmpeg…...

现代密码学 | 椭圆曲线密码学—附py代码
Elliptic Curve Cryptography 椭圆曲线密码学(ECC)是一种基于有限域上椭圆曲线数学特性的公钥加密技术。其核心原理涉及椭圆曲线的代数性质、离散对数问题以及有限域上的运算。 椭圆曲线密码学是多种数字签名算法的基础,例如椭圆曲线数字签…...

IT供电系统绝缘监测及故障定位解决方案
随着新能源的快速发展,光伏电站、储能系统及充电设备已广泛应用于现代能源网络。在光伏领域,IT供电系统凭借其持续供电性好、安全性高等优势成为光伏首选,但在长期运行中,例如老化、潮湿、隐裂、机械损伤等问题会影响光伏板绝缘层…...
全面解析各类VPN技术:GRE、IPsec、L2TP、SSL与MPLS VPN对比
目录 引言 VPN技术概述 GRE VPN 3.1 GRE封装结构 3.2 GRE的应用场景 GRE over IPsec 4.1 GRE over IPsec封装结构 4.2 为什么使用GRE over IPsec? IPsec VPN 5.1 IPsec传输模式(Transport Mode) 5.2 IPsec隧道模式(Tunne…...

C# 求圆面积的程序(Program to find area of a circle)
给定半径r,求圆的面积。圆的面积应精确到小数点后5位。 例子: 输入:r 5 输出:78.53982 解释:由于面积 PI * r * r 3.14159265358979323846 * 5 * 5 78.53982,因为我们只保留小数点后 5 位数字。 输…...
PAN/FPN
import torch import torch.nn as nn import torch.nn.functional as F import mathclass LowResQueryHighResKVAttention(nn.Module):"""方案 1: 低分辨率特征 (Query) 查询高分辨率特征 (Key, Value).输出分辨率与低分辨率输入相同。"""def __…...
4. TypeScript 类型推断与类型组合
一、类型推断 (一) 什么是类型推断 TypeScript 的类型推断会根据变量、函数返回值、对象和数组的赋值和使用方式,自动确定它们的类型。 这一特性减少了显式类型注解的需要,在保持类型安全的同时简化了代码。通过分析上下文和初始值,TypeSc…...

打手机检测算法AI智能分析网关V4守护公共/工业/医疗等多场景安全应用
一、方案背景 在现代生产与生活场景中,如工厂高危作业区、医院手术室、公共场景等,人员违规打手机的行为潜藏着巨大风险。传统依靠人工巡查的监管方式,存在效率低、覆盖面不足、判断主观性强等问题,难以满足对人员打手机行为精…...

[大语言模型]在个人电脑上部署ollama 并进行管理,最后配置AI程序开发助手.
ollama官网: 下载 https://ollama.com/ 安装 查看可以使用的模型 https://ollama.com/search 例如 https://ollama.com/library/deepseek-r1/tags # deepseek-r1:7bollama pull deepseek-r1:7b改token数量为409622 16384 ollama命令说明 ollama serve #:…...