【Flink SQL】Flink SQL 基础概念(四):SQL 的时间属性
《Flink SQL 基础概念》系列,共包含以下 5 篇文章:
- Flink SQL 基础概念(一):SQL & Table 运行环境、基本概念及常用 API
- Flink SQL 基础概念(二):数据类型
- Flink SQL 基础概念(三):SQL 动态表 & 连续查询
- Flink SQL 基础概念(四):SQL 的时间属性
- Flink SQL 基础概念(五):SQL 时区问题
😊 如果您觉得这篇文章有用 ✔️ 的话,请给博主一个一键三连 🚀🚀🚀 吧 (点赞 🧡、关注 💛、收藏 💚)!!!您的支持 💖💖💖 将激励 🔥 博主输出更多优质内容!!!
Flink SQL 基础概念(四):SQL 的时间属性
- 1.Flink 三种时间属性简介
- 2.Flink 三种时间属性的应用场景
- 2.1 事件时间案例
- 2.2 处理时间案例
- 2.3 摄入时间案例
- 3.SQL 指定时间属性的两种方式
- 4.SQL 事件时间案例
- 5.SQL 处理时间案例
与离线处理中常见的时间分区字段一样,在实时处理中,时间属性也是一个核心概念。Flink 支持 处理时间、事件时间、摄入时间 三种时间语义。
三种时间在生产环境的使用频次 事件时间(SQL 常用) > > > 处理时间(SQL 几乎不用,DataStream 少用) > > > 摄入时间(不用)。
1.Flink 三种时间属性简介
- 事件时间:指的是数据本身携带的时间,这个时间是在事件产生时的时间,而且在 Flink SQL 触发计算时,也使用数据本身携带的时间。这就叫做事件时间。目前生产环境中用的最多。
- 处理时间:指的是具体算子计算数据执行时的机器时间(例如在算子中 Java 取
System.currentTimeMillis()
),在生产环境中用的次多。 - 摄入时间:指的是数据从数据源进入 Flink 的时间。摄入时间用的最少,可以说基本不使用。
小伙伴们要注意到:
- 上述的三种时间概念不是由于有了数据而诞生的,而是有了 Flink 之后根据实际的应用场景而诞生的。以事件时间举个例子,如果只是数据携带了时间,Flink 也消费了这个数据,但是在 Flink 中没有使用数据的这个时间作为计算的触发条件,也不能把这个 Flink 任务叫做事件时间的任务。
- 其次,要认识到,一般一个 Flink 任务只会有一个时间属性,所以时间属性通常认为是一个任务粒度的。举例:我们可以说 A 任务是事件时间语义的任务,B 任务是处理时间语义的任务。当然了,一个任务也可以存在多个时间属性。
2.Flink 三种时间属性的应用场景
讲到这里,有人会问,博主上面写的 3 种时间属性到底对我们的任务有啥影响呢?3 种时间属性的应用场景是啥?
先说结论,在 Flink 中时间的作用:
- 主要体现在包含时间窗口的计算中:用于标识任务的时间进度,来判断是否需要触发窗口的计算。比如常用的滚动窗口、滑动窗口等都需要时间推动触发。这些窗口的应用场景后续会详细介绍。
- 次要体现在自定义时间语义的计算中:举个例子,比如用户可以自定义每隔 10s 的本地时间,或者消费到的数据的时间戳每增大 10s,就把计算结果输出一次,时间在此类应用中也是一种标识任务进度的作用。
博主以 滚动窗口 的聚合任务为例来介绍一下事件时间和处理时间的对比区别。
2.1 事件时间案例
还是以之前的 clicks
表拿来举例。
上面这个案例的窗口大小是 1 小时,需求方需要按照用户点击时间戳 cTime
划分数据(划分滚动窗口),然后计算出 Count 聚合结果(这样计算能反映出事件的真实发生时间),那么就需要把 cTime
设置为窗口的划分时间戳,即代码中 tumble(cTime, interval '1' hour)
。
上面这种就叫做事件时间。即用数据中自带的时间戳进行窗口的划分(点击操作真实的发生时间)。
后续 Flink SQL 任务在运行的过程中也会实际按照 cTime
的当前时间作为一小时窗口结束触发条件并计算一个小时窗口内的数据。
2.2 处理时间案例
还是以之前的 clicks
表拿来举例。
还是上面那个案例,但是这次需求方不需要按照数据上的时间戳划分数据(划分滚动窗口),只需要数据来了之后, 在 Flink 机器上的时间作为一小时窗口结束的触发条件并计算。
那么这种触发机制就是处理时间。
2.3 摄入时间案例
在 Flink 从外部数据源读取到数据时,给这条数据带上的当前数据源算子的本地时间戳。下游可以用这个时间戳进行窗口聚合,不过这种几乎不使用。
3.SQL 指定时间属性的两种方式
如果要满足 Flink SQL 时间窗口类的聚合操作,SQL 或 Table API 中的 数据源表 就需要提供时间属性(相当于我们把这个时间属性在 数据源表 上面进行声明),以及支持时间相关的操作。
那么来看看 Flink SQL 为我们提供的两种指定时间戳的方式:
CREATE TABLE DDL
创建表的时候指定- 可以在
DataStream
中指定,在后续的 DataStream 转的 Table 中使用
一旦时间属性定义好,它就可以像普通列一样使用,也可以在时间相关的操作中使用。
4.SQL 事件时间案例
来看看 Flink 中如何指定事件时间。
CREATE TABLE DDL
指定时间戳的方式。
CREATE TABLE user_actions (user_name STRING,data STRING,user_action_time TIMESTAMP(3),-- 使用下面这句来将 user_action_time 声明为事件时间,并且声明 watermark 的生成规则,即 user_action_time 减 5 秒-- 事件时间列的字段类型必须是 TIMESTAMP 或者 TIMESTAMP_LTZ 类型WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
) WITH (...
);SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
-- 然后就可以在窗口算子中使用 user_action_time
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);
从上面这条语句可以看到,如果想使用事件时间,那么我们的时间戳类型必须是 TIMESTAMP
或者 TIMESTAMP_LTZ
类型。很多小伙伴会想到,我们的时间戳一般不都是秒或者是毫秒(BIGINT
类型)嘛,那这种情况怎么办?
解决方案必须要有啊,如下。
CREATE TABLE user_actions (user_name STRING,data STRING,-- 1. 这个 ts 就是常见的毫秒级别时间戳ts BIGINT,-- 2. 将毫秒时间戳转换成 TIMESTAMP_LTZ 类型time_ltz AS TO_TIMESTAMP_LTZ(ts, 3),-- 3. 使用下面这句来将 user_action_time 声明为事件时间,并且声明 watermark 的生成规则,即 user_action_time 减 5 秒-- 事件时间列的字段类型必须是 TIMESTAMP 或者 TIMESTAMP_LTZ 类型WATERMARK FOR time_ltz AS time_ltz - INTERVAL '5' SECOND
) WITH (...
);SELECT TUMBLE_START(time_ltz, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(time_ltz, INTERVAL '10' MINUTE);
- DataStream 中指定事件时间。
之前介绍了 Table
和 DataStream
可以互转,那么 Flink 也提供了一个能力,就是在 Table
转为 DataStream
时,指定时间戳字段。如下案例:
public class DataStreamSourceEventTimeTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env =StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);// 1. 分配 watermarkDataStream<Row> r = env.addSource(new UserDefinedSource()).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Row>(Time.minutes(0L)) {@Overridepublic long extractTimestamp(Row element) {return (long) element.getField("f2");}});// 2. 使用 f2.rowtime 的方式将 f2 字段指为事件时间时间戳Table sourceTable = tEnv.fromDataStream(r, "f0, f1, f2.rowtime");tEnv.createTemporaryView("source_table", sourceTable);// 3. 在 tumble window 中使用 f2String tumbleWindowSql ="SELECT TUMBLE_START(f2, INTERVAL '5' SECOND), COUNT(DISTINCT f0)\n"+ "FROM source_table\n"+ "GROUP BY TUMBLE(f2, INTERVAL '5' SECOND)";Table resultTable = tEnv.sqlQuery(tumbleWindowSql);tEnv.toDataStream(resultTable, Row.class).print();env.execute();}private static class UserDefinedSource implements SourceFunction<Row>, ResultTypeQueryable<Row> {private volatile boolean isCancel;@Overridepublic void run(SourceContext<Row> sourceContext) throws Exception {int i = 0;while (!this.isCancel) {sourceContext.collect(Row.of("a" + i, "b", System.currentTimeMillis()));Thread.sleep(10L);i++;}}@Overridepublic void cancel() {this.isCancel = true;}@Overridepublic TypeInformation<Row> getProducedType() {return new RowTypeInfo(TypeInformation.of(String.class), TypeInformation.of(String.class),TypeInformation.of(Long.class));}}
}
5.SQL 处理时间案例
来看看 Flink SQL 中如何指定处理时间。
CREATE TABLE DDL
指定时间戳的方式。
CREATE TABLE user_actions (user_name STRING,data STRING,-- 使用下面这句来将 user_action_time 声明为处理时间user_action_time AS PROCTIME()
) WITH (...
);SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
-- 然后就可以在窗口算子中使用 user_action_time
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);
- DataStream 中指定处理时间。
public class DataStreamSourceProcessingTimeTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env =StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);// 1. 分配 watermarkDataStream<Row> r = env.addSource(new UserDefinedSource());// 2. 使用 proctime.proctime 的方式将 f2 字段指为处理时间时间戳Table sourceTable = tEnv.fromDataStream(r, "f0, f1, f2, proctime.proctime");tEnv.createTemporaryView("source_table", sourceTable);// 3. 在 tumble window 中使用 f2String tumbleWindowSql ="SELECT TUMBLE_START(proctime, INTERVAL '5' SECOND), COUNT(DISTINCT f0)\n"+ "FROM source_table\n"+ "GROUP BY TUMBLE(proctime, INTERVAL '5' SECOND)";Table resultTable = tEnv.sqlQuery(tumbleWindowSql);tEnv.toDataStream(resultTable, Row.class).print();env.execute();}private static class UserDefinedSource implements SourceFunction<Row>, ResultTypeQueryable<Row> {private volatile boolean isCancel;@Overridepublic void run(SourceContext<Row> sourceContext) throws Exception {int i = 0;while (!this.isCancel) {sourceContext.collect(Row.of("a" + i, "b", System.currentTimeMillis()));Thread.sleep(10L);i++;}}@Overridepublic void cancel() {this.isCancel = true;}@Overridepublic TypeInformation<Row> getProducedType() {return new RowTypeInfo(TypeInformation.of(String.class), TypeInformation.of(String.class),TypeInformation.of(Long.class));}}
}
相关文章:

【Flink SQL】Flink SQL 基础概念(四):SQL 的时间属性
《Flink SQL 基础概念》系列,共包含以下 5 篇文章: Flink SQL 基础概念(一):SQL & Table 运行环境、基本概念及常用 APIFlink SQL 基础概念(二):数据类型Flink SQL 基础概念&am…...

文字弹性跳动CSS3代码
文字弹性跳动CSS3代码,源码由HTMLCSSJS组成,记事本打开源码文件可以进行内容文字之类的修改,双击html文件可以本地运行效果,也可以上传到服务器里面,重定向这个界面 下载地址 文字弹性跳动CSS3代码...
前端小白的学习之路(事件流)
提示:事件捕获,事件冒泡,事件委托 目录 事件模型(DOM事件流) 1.事件是什么 2.事件流 1).事件流的三个阶段 3.参考代码 二、事件委托 1.概念 2.使用案例 3.阻止冒泡行为 事件模型(DOM事件流) 1.事件是什么 1). 事件是HTML和Javascr…...

电脑文件误删除如何恢复?分享三个简单数据恢复方法
在日常使用电脑的过程中,文件误删除的情况时有发生。无论是由于操作失误还是病毒感染,丢失的文件都可能对我们的工作和学习造成极大的影响。因此,掌握文件恢复的方法显得尤为重要。下面围绕“电脑文件误删除如何恢复”这一主题,给…...

MySQL实战:监控
监控指标 性能类指标 名称说明QPS数据库每秒处理的请求数量TPS数据库每秒处理的事务数量并发数数据库实例当前并行处理的会话数量连接数连接到数据库会话的数量缓存命中率Innodb的缓存命中率 功能类指标 名称说明可用性数据库是否正常对外提供服务阻塞当前是否有阻塞的会话…...

MySQL自增主键自动生成的主键重置
需求描述: 从主键1开始,insert操作自增了五个,库里五条数主键是1、2、3、4、5; 然后把主键是3、4、5的三条数据给删了,再继续insert,主键就是6了 因为这里表会把最大的数即5记住,下次自增即为…...
reverse_iterator实现
对于实现reverse_iterator,我们可以学栈和队列的实现过程,利用适配器,实现如下; #pragma oncetemplate<class Iterator,class Ref,class Ptr> class reverse_Iterator { public://构造函数:reverse_Iterator(Iterator it):…...
C++:什么情况下函数应该声明为纯虚函数
在C中,函数应该在以下情况下声明为纯虚函数: 抽象基类:当你希望定义一个基类,该基类不能被实例化,只能作为其他类的基类时,你应该在基类中声明至少一个纯虚函数。这样的基类被称为抽象基类。纯虚函数通过在…...

【全面了解自然语言处理三大特征提取器】RNN(LSTM)、transformer(注意力机制)、CNN
目录 一 、RNN1.RNN单个cell的结构2.RNN工作原理3.RNN优缺点 二、LSTM1.LSTM单个cell的结构2. LSTM工作原理 三、transformer1 Encoder(1)position encoding(2)multi-head-attention(3)add&norm 残差链…...

区块链推广海外市场怎么做,CloudNEO服务商免费为您定制个性化营销方案
随着区块链技术的不断发展和应用场景的扩大,区块链项目希望能够进入海外市场并取得成功已成为越来越多公司的目标之一。然而,要在海外市场推广区块链项目,需要采取有效的营销策略和措施。作为您的区块链项目营销服务商,CloudNEO将…...
【S5PV210】 | ARM的指令集合
【S5PV210】 | ARM的指令集合 时间:2024年3月17日23:32:06 目录 文章目录 【S5PV210】 | ARM的指令集合目录 ARM指令集具有一系列显著的特点。首先,它属于RISC(精简指令集计算机)架构,这意味着译码机制相对简单。在AR…...
2024-3-17Go语言入门
在Go语言中: var a chan int 定义了一个名为 a 的变量,其类型为 chan int。这意味着 a 是一个整型值的通道(channel)。通道是Go语言中用于goroutine之间通信的一种机制,你可以通过通道发送和接收特定类型的值。在这个例…...
AJAX-XMLHttpRequest
XMLHttpRequest 定义: XMLHttpRequest对象用于与服务器交互。通过XMLHttpRequest可以在不断刷新页面的情况下请求特定URL,获取数据。这允许网页在不影响用户操作的情况下,更新页面的局部内容。 关系: axios内部采用XMLHttpReques…...

【GPT-SOVITS-04】SOVITS 模块-鉴别模型解析
说明:该系列文章从本人知乎账号迁入,主要原因是知乎图片附件过于模糊。 知乎专栏地址: 语音生成专栏 系列文章地址: 【GPT-SOVITS-01】源码梳理 【GPT-SOVITS-02】GPT模块解析 【GPT-SOVITS-03】SOVITS 模块-生成模型解析 【G…...

论文阅读_时序模型_iTransformer
1 2 3 4 5 6 7 8英文名称: ITRANSFORMER: INVERTED TRANSFORMERS ARE EFFECTIVE FOR TIME SERIES FORECASTING 中文名称: ITRANSFORMER:倒置Transformers在时间序列预测中的有效性 链接: https://openreview.net/forum?idX6ZmOsTYVs 代码: https://github.com/thum…...

Docker 哲学 - 容器操作 -cp
1、拷贝 容器绑定的 volume的 数据,到指定目录 2、匿名挂载 volume 只定义一个数据咋在容器内的path,docker自动生成一个 sha256 的key作为 volume 名字。这个 sha256 跟 commitID 一致都是唯一的所以 ,docker利用这个机制,可以…...

作品展示ETL
1、ETL 作业定义、作业导入、控件拖拽、执行、监控、稽核、告警、报告导出、定时设定 欧洲某国电信系统数据割接作业定义中文页面(作业顶层,可切英文,按F1弹当前页面帮助) 涉及文件拆分、文件到mysql、库到库、数据清洗、数据转…...
python的集合应用
在Python中,集合是一种无序、可变的数据类型,用于存储不重复的元素。Python提供了内置的集合类型 set,以及 frozenset(不可变的集合)。以下是一些Python集合的常见应用场景: 去重: 集合是存储唯…...

盒子IM开源仿微信聊天程序源码,可以商用
安装教程 1.安装运行环境 安装node:v14.16.0安装jdk:1.8安装maven:3.6.3安装mysql:5.7,密码分别为root/root,运行sql脚本(脚本在im-platfrom的resources/db目录)安装redis:5.0安装minio,命令端口使用9001,并创建一个名为”box-im”的bucket,…...

鸿蒙Harmony应用开发—ArkTS声明式开发(基础手势:Web)中篇
onBeforeUnload onBeforeUnload(callback: (event?: { url: string; message: string; result: JsResult }) > boolean) 刷新或关闭场景下,在即将离开当前页面时触发此回调。刷新或关闭当前页面应先通过点击等方式获取焦点,才会触发此回调。 参数…...
【杂谈】-递归进化:人工智能的自我改进与监管挑战
递归进化:人工智能的自我改进与监管挑战 文章目录 递归进化:人工智能的自我改进与监管挑战1、自我改进型人工智能的崛起2、人工智能如何挑战人类监管?3、确保人工智能受控的策略4、人类在人工智能发展中的角色5、平衡自主性与控制力6、总结与…...

23-Oracle 23 ai 区块链表(Blockchain Table)
小伙伴有没有在金融强合规的领域中遇见,必须要保持数据不可变,管理员都无法修改和留痕的要求。比如医疗的电子病历中,影像检查检验结果不可篡改行的,药品追溯过程中数据只可插入无法删除的特性需求;登录日志、修改日志…...
条件运算符
C中的三目运算符(也称条件运算符,英文:ternary operator)是一种简洁的条件选择语句,语法如下: 条件表达式 ? 表达式1 : 表达式2• 如果“条件表达式”为true,则整个表达式的结果为“表达式1”…...
Java多线程实现之Callable接口深度解析
Java多线程实现之Callable接口深度解析 一、Callable接口概述1.1 接口定义1.2 与Runnable接口的对比1.3 Future接口与FutureTask类 二、Callable接口的基本使用方法2.1 传统方式实现Callable接口2.2 使用Lambda表达式简化Callable实现2.3 使用FutureTask类执行Callable任务 三、…...

UR 协作机器人「三剑客」:精密轻量担当(UR7e)、全能协作主力(UR12e)、重型任务专家(UR15)
UR协作机器人正以其卓越性能在现代制造业自动化中扮演重要角色。UR7e、UR12e和UR15通过创新技术和精准设计满足了不同行业的多样化需求。其中,UR15以其速度、精度及人工智能准备能力成为自动化领域的重要突破。UR7e和UR12e则在负载规格和市场定位上不断优化…...
c# 局部函数 定义、功能与示例
C# 局部函数:定义、功能与示例 1. 定义与功能 局部函数(Local Function)是嵌套在另一个方法内部的私有方法,仅在包含它的方法内可见。 • 作用:封装仅用于当前方法的逻辑,避免污染类作用域,提升…...

第一篇:Liunx环境下搭建PaddlePaddle 3.0基础环境(Liunx Centos8.5安装Python3.10+pip3.10)
第一篇:Liunx环境下搭建PaddlePaddle 3.0基础环境(Liunx Centos8.5安装Python3.10pip3.10) 一:前言二:安装编译依赖二:安装Python3.10三:安装PIP3.10四:安装Paddlepaddle基础框架4.1…...

基于江科大stm32屏幕驱动,实现OLED多级菜单(动画效果),结构体链表实现(独创源码)
引言 在嵌入式系统中,用户界面的设计往往直接影响到用户体验。本文将以STM32微控制器和OLED显示屏为例,介绍如何实现一个多级菜单系统。该系统支持用户通过按键导航菜单,执行相应操作,并提供平滑的滚动动画效果。 本文设计了一个…...

鸿蒙Navigation路由导航-基本使用介绍
1. Navigation介绍 Navigation组件是路由导航的根视图容器,一般作为Page页面的根容器使用,其内部默认包含了标题栏、内容区和工具栏,其中内容区默认首页显示导航内容(Navigation的子组件)或非首页显示(Nav…...

【技巧】dify前端源代码修改第一弹-增加tab页
回到目录 【技巧】dify前端源代码修改第一弹-增加tab页 尝试修改dify的前端源代码,在知识库增加一个tab页"HELLO WORLD",完成后的效果如下 [gif01] 1. 前端代码进入调试模式 参考 【部署】win10的wsl环境下启动dify的web前端服务 启动调试…...