33、Flink 的Table API 和 SQL 中的时区
Flink 系列文章
1、Flink 部署、概念介绍、source、transformation、sink使用示例、四大基石介绍和示例等系列综合文章链接
13、Flink 的table api与sql的基本概念、通用api介绍及入门示例
14、Flink 的table api与sql之数据类型: 内置数据类型以及它们的属性
15、Flink 的table api与sql之流式概念-详解的介绍了动态表、时间属性配置(如何处理更新结果)、时态表、流上的join、流上的确定性以及查询配置
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及FileSystem示例(1)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Elasticsearch示例(2)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Kafka示例(3)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及JDBC示例(4)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Hive示例(6)
17、Flink 之Table API: Table API 支持的操作(1)
17、Flink 之Table API: Table API 支持的操作(2)
18、Flink的SQL 支持的操作和语法
19、Flink 的Table API 和 SQL 中的内置函数及示例(1)
19、Flink 的Table API 和 SQL 中的自定义函数及示例(2)
19、Flink 的Table API 和 SQL 中的自定义函数及示例(3)
19、Flink 的Table API 和 SQL 中的自定义函数及示例(4)
20、Flink SQL之SQL Client: 不用编写代码就可以尝试 Flink SQL,可以直接提交 SQL 任务到集群上
22、Flink 的table api与sql之创建表的DDL
24、Flink 的table api与sql之Catalogs(介绍、类型、java api和sql实现ddl、java api和sql操作catalog)-1
24、Flink 的table api与sql之Catalogs(java api操作数据库、表)-2
24、Flink 的table api与sql之Catalogs(java api操作视图)-3
24、Flink 的table api与sql之Catalogs(java api操作分区与函数)-4
25、Flink 的table api与sql之函数(自定义函数示例)
26、Flink 的SQL之概览与入门示例
27、Flink 的SQL之SELECT (select、where、distinct、order by、limit、集合操作和去重)介绍及详细示例(1)
27、Flink 的SQL之SELECT (SQL Hints 和 Joins)介绍及详细示例(2)
27、Flink 的SQL之SELECT (窗口函数)介绍及详细示例(3)
27、Flink 的SQL之SELECT (窗口聚合)介绍及详细示例(4)
27、Flink 的SQL之SELECT (Group Aggregation分组聚合、Over Aggregation Over聚合 和 Window Join 窗口关联)介绍及详细示例(5)
27、Flink 的SQL之SELECT (Top-N、Window Top-N 窗口 Top-N 和 Window Deduplication 窗口去重)介绍及详细示例(6)
27、Flink 的SQL之SELECT (Pattern Recognition 模式检测)介绍及详细示例(7)
28、Flink 的SQL之DROP 、ALTER 、INSERT 、ANALYZE 语句
29、Flink SQL之DESCRIBE、EXPLAIN、USE、SHOW、LOAD、UNLOAD、SET、RESET、JAR、JOB Statements、UPDATE、DELETE(1)
29、Flink SQL之DESCRIBE、EXPLAIN、USE、SHOW、LOAD、UNLOAD、SET、RESET、JAR、JOB Statements、UPDATE、DELETE(2)
30、Flink SQL之SQL 客户端(通过kafka和filesystem的例子介绍了配置文件使用-表、视图等)
32、Flink table api和SQL 之用户自定义 Sources & Sinks实现及详细示例
33、Flink 的Table API 和 SQL 中的时区
41、Flink之Hive 方言介绍及详细示例
42、Flink 的table api与sql之Hive Catalog
43、Flink之Hive 读写及详细验证示例
44、Flink之module模块介绍及使用示例和Flink SQL使用hive内置函数及自定义函数详细示例–网上有些说法好像是错误的
文章目录
- Flink 系列文章
- 一、时区
- 1、TIMESTAMP vs TIMESTAMP_LTZ
- 1)、TIMESTAMP 类型
- 2)、TIMESTAMP_LTZ 类型
- 2、时区的作用
- 1)、确定时间函数的返回值
- 2)、TIMESTAMP_LTZ 字符串表示
- 3、时间属性和时区
- 1)、处理时间和时区
- 2)、事件时间和时区
- 1、TIMESTAMP 上的事件时间属性
- 2、TIMESTAMP_LTZ 上的事件时间属性
- 4、夏令时支持
- 5、Batch 模式和 Streaming 模式的区别
本文简单的介绍了Flink 中关于时区的概念,并以具体的示例进行说明。
本文依赖flink、kafka集群能正常使用。
本文分为5个部分,即TIMESTAMP vs TIMESTAMP_LTZ介绍、时区的作用、时区属性与时区、夏令时支持与流批关于时间的处理区别。
本文的示例是在Flink 1.17版本中运行。
一、时区
Flink 为日期和时间提供了丰富的数据类型, 包括 DATE, TIME, TIMESTAMP, TIMESTAMP_LTZ, INTERVAL YEAR TO MONTH, INTERVAL DAY TO SECOND (更多详情请参考 14、Flink 的table api与sql之数据类型: 内置数据类型以及它们的属性 中的 Date and Time)。 Flink 支持在 session (会话)级别设置时区(更多详情请参考 15、Flink 的table api与sql之流式概念-详解的介绍了动态表、时间属性配置(如何处理更新结果)、时态表、流上的join、流上的确定性以及查询配置 中的 Planner 配置 table.local-time-zone 部分)。 Flink 对多种时间类型和时区的支持使得跨时区的数据处理变得非常容易。
1、TIMESTAMP vs TIMESTAMP_LTZ
1)、TIMESTAMP 类型
- TIMESTAMP§ 是 TIMESTAMP§ WITHOUT TIME ZONE 的简写, 精度 p 支持的范围是0-9, 默认是6。
- TIMESTAMP 用于描述年, 月, 日, 小时, 分钟, 秒 和 小数秒对应的时间戳。
- TIMESTAMP 可以通过一个字符串来指定,例如:
Flink SQL> SELECT TIMESTAMP '1970-01-01 00:00:04.001';
+-------------------------+
| 1970-01-01 00:00:04.001 |
+-------------------------+
2)、TIMESTAMP_LTZ 类型
- TIMESTAMP_LTZ(p) 是 TIMESTAMP(p) WITH LOCAL TIME ZONE 的简写, 精度 p 支持的范围是0-9, 默认是6。
- TIMESTAMP_LTZ 用于描述时间线上的绝对时间点, 使用 long 保存从 epoch 至今的毫秒数, 使用int保存毫秒中的纳秒数。 epoch 时间是从 java 的标准 epoch 时间 1970-01-01T00:00:00Z 开始计算。 在计算和可视化时, 每个 TIMESTAMP_LTZ 类型的数据都是使用的 session (会话)中配置的时区。
- TIMESTAMP_LTZ 没有字符串表达形式因此无法通过字符串来指定, 可以通过一个 long 类型的 epoch 时间来转化(例如: 通过 Java 来产生一个 long 类型的 epoch 时间 System.currentTimeMillis())
Flink SQL> CREATE VIEW T1 AS SELECT TO_TIMESTAMP_LTZ(4001, 3);
Flink SQL> SET 'table.local-time-zone' = 'UTC';
Flink SQL> SELECT * FROM T1;
+---------------------------+
| TO_TIMESTAMP_LTZ(4001, 3) |
+---------------------------+
| 1970-01-01 00:00:04.001 |
+---------------------------+Flink SQL> SET 'table.local-time-zone' = 'Asia/Shanghai';
Flink SQL> SELECT * FROM T1;
+---------------------------+
| TO_TIMESTAMP_LTZ(4001, 3) |
+---------------------------+
| 1970-01-01 08:00:04.001 |
+---------------------------+
- TIMESTAMP_LTZ 可以用于跨时区的计算,因为它是一个基于 epoch 的绝对时间点(比如上例中的 4001 毫秒)代表的就是不同时区的同一个绝对时间点。 补充一个背景知识:在同一个时间点, 全世界所有的机器上执行 System.currentTimeMillis() 都会返回同样的值。 (比如上例中的 4001 milliseconds), 这就是绝对时间的定义。
2、时区的作用
本地时区定义了当前 session(会话)所在的时区, 你可以在 Sql client 或者应用程序中配置。
- java
代码片段示例
EnvironmentSettings envSetting = EnvironmentSettings.inStreamingMode();TableEnvironment tEnv = TableEnvironment.create(envSetting);// 设置为 UTC 时区tEnv.getConfig().setLocalTimeZone(ZoneId.of("UTC"));// 设置为上海时区tEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai"));// 设置为 Los_Angeles 时区tEnv.getConfig().setLocalTimeZone(ZoneId.of("America/Los_Angeles"));
- sql client
-- 设置为 UTC 时区
Flink SQL> SET 'table.local-time-zone' = 'UTC';
[INFO] Execute statement succeed.-- 设置为上海时区
Flink SQL> SET 'table.local-time-zone' = 'America/Los_Angeles';
[INFO] Execute statement succeed.-- 设置为Los_Angeles时区
Flink SQL> SET 'table.local-time-zone' = 'Asia/Shanghai';
[INFO] Execute statement succeed.
session(会话)的时区设置在 Flink SQL 中非常有用, 它的主要用法如下:
1)、确定时间函数的返回值
session (会话)中配置的时区会对以下函数生效。
- LOCALTIME
- LOCALTIMESTAMP
- CURRENT_DATE
- CURRENT_TIME
- CURRENT_TIMESTAMP
- CURRENT_ROW_TIMESTAMP()
- NOW()
- PROCTIME()
Flink SQL> SET 'sql-client.execution.result-mode' = 'tableau';
[INFO] Execute statement succeed.Flink SQL> CREATE VIEW MyView1 AS
> SELECT LOCALTIME, LOCALTIMESTAMP, CURRENT_DATE, CURRENT_TIME, CURRENT_TIMESTAMP, CURRENT_ROW_TIMESTAMP(), NOW(), PROCTIME();
[INFO] Execute statement succeed.Flink SQL> DESC MyView1;
+-------------------+-----------------------------+-------+-----+--------+-----------+
| name | type | null | key | extras | watermark |
+-------------------+-----------------------------+-------+-----+--------+-----------+
| LOCALTIME | TIME(0) | FALSE | | | |
| LOCALTIMESTAMP | TIMESTAMP(3) | FALSE | | | |
| CURRENT_DATE | DATE | FALSE | | | |
| CURRENT_TIME | TIME(0) | FALSE | | | |
| CURRENT_TIMESTAMP | TIMESTAMP_LTZ(3) | FALSE | | | |
| EXPR$5 | TIMESTAMP_LTZ(3) | FALSE | | | |
| EXPR$6 | TIMESTAMP_LTZ(3) | FALSE | | | |
| EXPR$7 | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE | | | |
+-------------------+-----------------------------+-------+-----+--------+-----------+
8 rows in setFlink SQL> SET 'table.local-time-zone' = 'UTC';
[INFO] Execute statement succeed.Flink SQL> SELECT * FROM MyView1;
+----+-----------+-------------------------+--------------+--------------+-------------------------+-------------------------+-------------------------+-------------------------+
| op | LOCALTIME | LOCALTIMESTAMP | CURRENT_DATE | CURRENT_TIME | CURRENT_TIMESTAMP | EXPR$5 | EXPR$6 | EXPR$7 |
+----+-----------+-------------------------+--------------+--------------+-------------------------+-------------------------+-------------------------+-------------------------+
| +I | 06:52:14 | 2023-11-10 06:52:14.144 | 2023-11-10 | 06:52:14 | 2023-11-10 06:52:14.144 | 2023-11-10 06:52:14.144 | 2023-11-10 06:52:14.144 | 2023-11-10 06:52:14.145 |
+----+-----------+-------------------------+--------------+--------------+-------------------------+-------------------------+-------------------------+-------------------------+
Received a total of 1 rowFlink SQL> SET 'table.local-time-zone' = 'Asia/Shanghai';
[INFO] Execute statement succeed.Flink SQL> SELECT * FROM MyView1;
+----+-----------+-------------------------+--------------+--------------+-------------------------+-------------------------+-------------------------+-------------------------+
| op | LOCALTIME | LOCALTIMESTAMP | CURRENT_DATE | CURRENT_TIME | CURRENT_TIMESTAMP | EXPR$5 | EXPR$6 | EXPR$7 |
+----+-----------+-------------------------+--------------+--------------+-------------------------+-------------------------+-------------------------+-------------------------+
| +I | 14:52:52 | 2023-11-10 14:52:52.305 | 2023-11-10 | 14:52:52 | 2023-11-10 14:52:52.305 | 2023-11-10 14:52:52.305 | 2023-11-10 14:52:52.305 | 2023-11-10 14:52:52.305 |
+----+-----------+-------------------------+--------------+--------------+-------------------------+-------------------------+-------------------------+-------------------------+
Received a total of 1 row
2)、TIMESTAMP_LTZ 字符串表示
当一个 TIMESTAMP_LTZ 值转为 string 格式时, session 中配置的时区会生效。 例如打印这个值,将类型强制转化为 STRING 类型, 将类型强制转换为 TIMESTAMP ,将 TIMESTAMP 的值转化为 TIMESTAMP_LTZ 类型:
Flink SQL> CREATE VIEW MyView2 AS
> SELECT TO_TIMESTAMP_LTZ(4001, 3) AS ltz, TIMESTAMP '1970-01-01 00:00:01.001' AS ntz;
[INFO] Execute statement succeed.Flink SQL> DESC MyView2;
+------+------------------+-------+-----+--------+-----------+
| name | type | null | key | extras | watermark |
+------+------------------+-------+-----+--------+-----------+
| ltz | TIMESTAMP_LTZ(3) | TRUE | | | |
| ntz | TIMESTAMP(3) | FALSE | | | |
+------+------------------+-------+-----+--------+-----------+
2 rows in setFlink SQL> SET 'table.local-time-zone' = 'UTC';
[INFO] Execute statement succeed.Flink SQL> SELECT * FROM MyView2;
+----+-------------------------+-------------------------+
| op | ltz | ntz |
+----+-------------------------+-------------------------+
| +I | 1970-01-01 00:00:04.001 | 1970-01-01 00:00:01.001 |
+----+-------------------------+-------------------------+
Received a total of 1 rowFlink SQL> SET 'table.local-time-zone' = 'Asia/Shanghai';
[INFO] Execute statement succeed.Flink SQL> SELECT * FROM MyView2;
+----+-------------------------+-------------------------+
| op | ltz | ntz |
+----+-------------------------+-------------------------+
| +I | 1970-01-01 08:00:04.001 | 1970-01-01 00:00:01.001 |
+----+-------------------------+-------------------------+
Received a total of 1 rowFlink SQL> CREATE VIEW MyView3 AS
> SELECT ltz, CAST(ltz AS TIMESTAMP(3)), CAST(ltz AS STRING), ntz, CAST(ntz AS TIMESTAMP_LTZ(3)) FROM MyView2;
[INFO] Execute statement succeed.Flink SQL> SELECT * FROM MyView3;
+----+-------------------------+-------------------------+--------------------------------+-------------------------+-------------------------+
| op | ltz | EXPR$1 | EXPR$2 | ntz | EXPR$4 |
+----+-------------------------+-------------------------+--------------------------------+-------------------------+-------------------------+
| +I | 1970-01-01 08:00:04.001 | 1970-01-01 08:00:04.001 | 1970-01-01 08:00:04.001 | 1970-01-01 00:00:01.001 | 1970-01-01 00:00:01.001 |
+----+-------------------------+-------------------------+--------------------------------+-------------------------+-------------------------+
Received a total of 1 row
3、时间属性和时区
更多时间属性相关的详细介绍, 请参考:15、Flink 的table api与sql之流式概念-详解的介绍了动态表、时间属性配置(如何处理更新结果)、时态表、流上的join、流上的确定性以及查询配置 中的时间属性配置部分。
1)、处理时间和时区
Flink SQL 使用函数 PROCTIME() 来定义处理时间属性, 该函数返回的类型是 TIMESTAMP_LTZ 。
在 Flink1.13 之前, PROCTIME() 函数返回的类型是 TIMESTAMP , 返回值是UTC时区下的 TIMESTAMP 。
例如: 当上海的时间为 2021-11-11 12:00:00 时, PROCTIME() 显示的时间却是错误的 2021-11-11 04:00:00 。 这个问题在 Flink 1.13 中修复了, 因此用户不用再去处理时区的问题了。
PROCTIME() 返回的是本地时区的时间, 使用 TIMESTAMP_LTZ 类型也可以支持夏令时时间。
Flink SQL> SET 'table.local-time-zone' = 'UTC';
[INFO] Execute statement succeed.Flink SQL> SELECT PROCTIME();
+----+-------------------------+
| op | EXPR$0 |
+----+-------------------------+
| +I | 2023-11-10 06:59:30.998 |
+----+-------------------------+
Received a total of 1 rowFlink SQL> SET 'table.local-time-zone' = 'Asia/Shanghai';
[INFO] Execute statement succeed.Flink SQL> SELECT PROCTIME();
+----+-------------------------+
| op | EXPR$0 |
+----+-------------------------+
| +I | 2023-11-10 14:59:54.031 |
+----+-------------------------+
Received a total of 1 rowFlink SQL> CREATE TABLE MyTable1 (
> item STRING,
> price DOUBLE,
> proctime as PROCTIME()
> ) WITH (
> 'connector' = 'kafka',
> 'topic' = 'MyTable1',
> 'properties.bootstrap.servers' = '192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092',
> 'properties.group.id' = 'testGroup',
> 'scan.startup.mode' = 'earliest-offset',
> 'format' = 'csv'
> );
[INFO] Execute statement succeed.Flink SQL> CREATE VIEW MyView3 AS
> SELECT
> TUMBLE_START(proctime, INTERVAL '10' MINUTES) AS window_start,
> TUMBLE_END(proctime, INTERVAL '10' MINUTES) AS window_end,
> TUMBLE_PROCTIME(proctime, INTERVAL '10' MINUTES) as window_proctime,
> item,
> MAX(price) as max_price
> FROM MyTable1
> GROUP BY TUMBLE(proctime, INTERVAL '10' MINUTES), item;
[INFO] Execute statement succeed.Flink SQL> DESC MyView3;
+-----------------+-----------------------------+-------+-----+--------+-----------+
| name | type | null | key | extras | watermark |
+-----------------+-----------------------------+-------+-----+--------+-----------+
| window_start | TIMESTAMP(3) | FALSE | | | |
| window_end | TIMESTAMP(3) | FALSE | | | |
| window_proctime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE | | | |
| item | STRING | TRUE | | | |
| max_price | DOUBLE | TRUE | | | |
+-----------------+-----------------------------+-------+-----+--------+-----------+
5 rows in set
在终端执行以下命令写入数据到 MyTable1 :
[alanchan@server1 bin]$ kafka-console-producer.sh --broker-list server1:9092 --topic MyTable1
>A,1.1
>B,1.2
>A,1.8
>B,2.5
>C,3.8
>
Flink SQL> SET 'table.local-time-zone' = 'UTC';
[INFO] Execute statement succeed.Flink SQL> SELECT * FROM MyView3;
+----+-------------------------+-------------------------+-------------------------+--------------------------------+--------------------------------+
| op | window_start | window_end | window_proctime | item | max_price |
+----+-------------------------+-------------------------+-------------------------+--------------------------------+--------------------------------+
| +I | 2023-11-10 07:10:00.000 | 2023-11-10 07:20:00.000 | 2023-11-10 07:20:00.000 | A | 1.8 |
| +I | 2023-11-10 07:10:00.000 | 2023-11-10 07:20:00.000 | 2023-11-10 07:20:00.000 | C | 3.8 |
| +I | 2023-11-10 07:10:00.000 | 2023-11-10 07:20:00.000 | 2023-11-10 07:20:00.001 | B | 2.5 |
received a total of 3 rows
Flink SQL> SET 'table.local-time-zone' = 'Asia/Shanghai';
[INFO] Execute statement succeed.
相比在 UTC 时区下的计算结果, 在 Asia/Shanghai 时区下计算的窗口开始时间, 窗口结束时间和窗口处理时间是不同的。
Flink SQL> SELECT * FROM MyView3;
+----+-------------------------+-------------------------+-------------------------+--------------------------------+--------------------------------+
| op | window_start | window_end | window_proctime | item | max_price |
+----+-------------------------+-------------------------+-------------------------+--------------------------------+--------------------------------+
| +I | 2023-11-10 15:10:00.000 | 2023-11-10 15:20:00.000 | 2023-11-10 15:20:00.000 | A | 1.8 |
| +I | 2023-11-10 15:10:00.000 | 2023-11-10 15:20:00.000 | 2023-11-10 15:20:00.000 | C | 3.8 |
| +I | 2023-11-10 15:10:00.000 | 2023-11-10 15:20:00.000 | 2023-11-10 15:20:00.001 | B | 2.5 |
received a total of 3 rows
处理时间窗口是不确定的, 每次运行都会返回不同的窗口和聚合结果。 以上的示例只用于说明时区如何影响处理时间窗口。
2)、事件时间和时区
Flink 支持在 TIMESTAMP 列和 TIMESTAMP_LTZ 列上定义时间属性。
1、TIMESTAMP 上的事件时间属性
如果 source 中的时间用于表示年-月-日-小时-分钟-秒, 通常是一个不带时区的字符串,
例如: 2023-11-13 08:13:40.564。 推荐在 TIMESTAMP 列上定义事件时间属性。
- 准备测试环境,即表、视图和数据
Flink SQL> CREATE TABLE MyTable2 (
> item STRING,
> price DOUBLE,
> ts TIMESTAMP(3), -- TIMESTAMP data type
> WATERMARK FOR ts AS ts - INTERVAL '10' SECOND
> ) WITH (
> 'connector' = 'kafka',
> 'topic' = 'MyTable2',
> 'properties.bootstrap.servers' = '192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092',
> 'properties.group.id' = 'testGroup',
> 'scan.startup.mode' = 'earliest-offset',
> 'format' = 'csv'
> );
[INFO] Execute statement succeed.Flink SQL> CREATE VIEW MyView4 AS
> SELECT
> TUMBLE_START(ts, INTERVAL '10' MINUTES) AS window_start,
> TUMBLE_END(ts, INTERVAL '10' MINUTES) AS window_end,
> TUMBLE_ROWTIME(ts, INTERVAL '10' MINUTES) as window_rowtime,
> item,
> MAX(price) as max_price
> FROM MyTable2
> GROUP BY TUMBLE(ts, INTERVAL '10' MINUTES), item;
[INFO] Execute statement succeed.Flink SQL> DESC MyView4;
+----------------+------------------------+-------+-----+--------+-----------+
| name | type | null | key | extras | watermark |
+----------------+------------------------+-------+-----+--------+-----------+
| window_start | TIMESTAMP(3) | FALSE | | | |
| window_end | TIMESTAMP(3) | FALSE | | | |
| window_rowtime | TIMESTAMP(3) *ROWTIME* | TRUE | | | |
| item | STRING | TRUE | | | |
| max_price | DOUBLE | TRUE | | | |
+----------------+------------------------+-------+-----+--------+-----------+
5 rows in set
在终端执行以下命令用于写入数据到 MyTable2 :
[alanchan@server1 bin]$ kafka-console-producer.sh --broker-list server1:9092 --topic alan_MyTable2
>A,1.1,2023-11-13 08:21:00
>B,1.2,2023-11-13 08:22:00
>A,1.8,2023-11-13 08:23:00
>B,2.5,2023-11-13 08:24:00
>C,3.8,2023-11-13 08:25:00
>C,3.8,2023-11-13 08:41:00
- 查看UTC与Asia/Shanghai的查询结果
Flink SQL> SET 'table.local-time-zone' = 'UTC';
[INFO] Execute statement succeed.Flink SQL> SELECT * FROM MyView4;
+----+-------------------------+-------------------------+-------------------------+--------------------------------+--------------------------------+
| op | window_start | window_end | window_rowtime | item | max_price |
+----+-------------------------+-------------------------+-------------------------+--------------------------------+--------------------------------+
| +I | 2023-11-13 08:20:00.000 | 2023-11-13 08:30:00.000 | 2023-11-13 08:29:59.999 | A | 1.8 |
| +I | 2023-11-13 08:20:00.000 | 2023-11-13 08:30:00.000 | 2023-11-13 08:29:59.999 | B | 2.5 |
| +I | 2023-11-13 08:20:00.000 | 2023-11-13 08:30:00.000 | 2023-11-13 08:29:59.999 | C | 3.8 |
相比在 UTC 时区下的计算结果, 在 Asia/Shanghai 时区下计算的窗口开始时间, 窗口结束时间和窗口的 rowtime 是相同的。
Flink SQL> SET 'table.local-time-zone' = 'Asia/Shanghai';
[INFO] Execute statement succeed.Flink SQL> SELECT * FROM MyView4;
+----+-------------------------+-------------------------+-------------------------+--------------------------------+--------------------------------+
| op | window_start | window_end | window_rowtime | item | max_price |
+----+-------------------------+-------------------------+-------------------------+--------------------------------+--------------------------------+
| +I | 2023-11-13 08:20:00.000 | 2023-11-13 08:30:00.000 | 2023-11-13 08:29:59.999 | A | 1.8 |
| +I | 2023-11-13 08:20:00.000 | 2023-11-13 08:30:00.000 | 2023-11-13 08:29:59.999 | B | 2.5 |
| +I | 2023-11-13 08:20:00.000 | 2023-11-13 08:30:00.000 | 2023-11-13 08:29:59.999 | C | 3.8 |
2、TIMESTAMP_LTZ 上的事件时间属性
如果源数据中的时间为一个 epoch 时间, 通常是一个 long 值, 例如: 1618989564564 ,推荐将事件时间属性定义在 TIMESTAMP_LTZ 列上。
- 准备测试环境,即准备表、视图和数据
Flink SQL> CREATE TABLE MyTable3 (
> item STRING,
> price DOUBLE,
> ts BIGINT, -- long time value in epoch milliseconds
> ts_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
> WATERMARK FOR ts_ltz AS ts_ltz - INTERVAL '10' SECOND
> ) WITH (
> 'connector' = 'kafka',
> 'topic' = 'alan_MyTable3',
> 'properties.bootstrap.servers' = '192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092',
> 'properties.group.id' = 'testGroup',
> 'scan.startup.mode' = 'earliest-offset',
> 'format' = 'csv'
> );
[INFO] Execute statement succeed.Flink SQL> CREATE VIEW MyView5 AS
> SELECT
> TUMBLE_START(ts_ltz, INTERVAL '10' MINUTES) AS window_start,
> TUMBLE_END(ts_ltz, INTERVAL '10' MINUTES) AS window_end,
> TUMBLE_ROWTIME(ts_ltz, INTERVAL '10' MINUTES) as window_rowtime,
> item,
> MAX(price) as max_price
> FROM MyTable3
> GROUP BY TUMBLE(ts_ltz, INTERVAL '10' MINUTES), item;
[INFO] Execute statement succeed.Flink SQL> DESC MyView5;
+----------------+----------------------------+-------+-----+--------+-----------+
| name | type | null | key | extras | watermark |
+----------------+----------------------------+-------+-----+--------+-----------+
| window_start | TIMESTAMP(3) | FALSE | | | |
| window_end | TIMESTAMP(3) | FALSE | | | |
| window_rowtime | TIMESTAMP_LTZ(3) *ROWTIME* | TRUE | | | |
| item | STRING | TRUE | | | |
| max_price | DOUBLE | TRUE | | | |
+----------------+----------------------------+-------+-----+--------+-----------+
5 rows in set
在终端执行以下命令用于写入数据到 MyTable3 :
[alanchan@server1 bin]$ kafka-console-producer.sh --broker-list server1:9092 --topic alan_MyTable3
>A,1.1,1699836971034 # The corresponding utc timestamp is 2023-11-13 08:56:xx
>B,1.2,1699837031044 # The corresponding utc timestamp is 2023-11-13 08:57:xx
>A,1.8,1699837091052 # The corresponding utc timestamp is 2023-11-13 08:58:xx
>B,2.5,1699837091052 # The corresponding utc timestamp is 2023-11-13 08:59:xx
>C,3.8,1699837211069 # The corresponding utc timestamp is 2023-11-13 09:00:xx
>C,3.8,1699837271070 # The corresponding utc timestamp is 2023-11-13 09:01:xx
- 查看UTC与Asia/Shanghai的查询结果
Flink SQL> SET 'table.local-time-zone' = 'UTC';
[INFO] Execute statement succeed.Flink SQL> SELECT * FROM MyView5;
+----+-------------------------+-------------------------+-------------------------+--------------------------------+--------------------------------+
| op | window_start | window_end | window_rowtime | item | max_price |
+----+-------------------------+-------------------------+-------------------------+--------------------------------+--------------------------------+
| +I | 2023-11-13 00:50:00.000 | 2023-11-13 01:00:00.000 | 2023-11-13 00:59:59.999 | A | 1.8 |
| +I | 2023-11-13 00:50:00.000 | 2023-11-13 01:00:00.000 | 2023-11-13 00:59:59.999 | B | 2.5 |
相比在 UTC 时区下的计算结果, 在 Asia/Shanghai 时区下计算的窗口开始时间, 窗口结束时间和窗口的 rowtime 是不同的。
Flink SQL> SET 'table.local-time-zone' = 'Asia/Shanghai';
[INFO] Execute statement succeed.Flink SQL> SELECT * FROM MyView5;
+----+-------------------------+-------------------------+-------------------------+--------------------------------+--------------------------------+
| op | window_start | window_end | window_rowtime | item | max_price |
+----+-------------------------+-------------------------+-------------------------+--------------------------------+--------------------------------+
| +I | 2023-11-13 08:50:00.000 | 2023-11-13 09:00:00.000 | 2023-11-13 08:59:59.999 | A | 1.8 |
| +I | 2023-11-13 08:50:00.000 | 2023-11-13 09:00:00.000 | 2023-11-13 08:59:59.999 | B | 2.5 |
4、夏令时支持
Flink SQL支持在 TIMESTAMP_LTZ列上定义时间属性, 基于这一特征,Flink SQL 在窗口中使用 TIMESTAMP 和 TIMESTAMP_LTZ 类型优雅地支持了夏令时。
Flink 使用时间戳的字符格式来分割窗口并通过每条记录对应的 epoch 时间来分配窗口。 这意味着 Flink 窗口开始时间和窗口结束时间使用的是 TIMESTAMP 类型(例如: TUMBLE_START 和 TUMBLE_END), 窗口的时间属性使用的是 TIMESTAMP_LTZ 类型(例如: TUMBLE_PROCTIME, TUMBLE_ROWTIME)。
给定一个 tumble window示例, 在 Los_Angeles 时区下夏令时从 2021-03-14 02:00:00 开始:
long epoch1 = 1615708800000L; // 2021-03-14 00:00:00
long epoch2 = 1615712400000L; // 2021-03-14 01:00:00
long epoch3 = 1615716000000L; // 2021-03-14 03:00:00, 手表往前拨一小时,跳过 (2021-03-14 02:00:00)
long epoch4 = 1615719600000L; // 2021-03-14 04:00:00
在 Los_angele 时区下, tumble window [2021-03-14 00:00:00, 2021-03-14 00:04:00] 将会收集3个小时的数据, 在其他非夏令时的时区下将会收集4个小时的数据,用户只需要在 TIMESTAMP_LTZ 列上声明时间属性即可。
Flink 的所有窗口(如 Hop window, Session window, Cumulative window)都会遵循这种方式, Flink SQL 中的所有操作都很好地支持了 TIMESTAMP_LTZ 类型,因此Flink可以非常优雅的支持夏令时。
5、Batch 模式和 Streaming 模式的区别
以下函数:
- LOCALTIME
- LOCALTIMESTAMP
- CURRENT_DATE
- CURRENT_TIME
- CURRENT_TIMESTAMP
- NOW()
Flink 会根据执行模式来进行不同计算,在 Streaming 模式下这些函数是每条记录都会计算一次,但在 Batch 模式下,只会在 query 开始时计算一次,所有记录都使用相同的结果。
以下时间函数无论是在 Streaming 模式还是 Batch 模式下,都会为每条记录计算一次结果:
- CURRENT_ROW_TIMESTAMP()
- PROCTIME()
以上,简单的介绍了Flink 中关于时区的概念,并以具体的示例进行说明。
相关文章:
33、Flink 的Table API 和 SQL 中的时区
Flink 系列文章 1、Flink 部署、概念介绍、source、transformation、sink使用示例、四大基石介绍和示例等系列综合文章链接 13、Flink 的table api与sql的基本概念、通用api介绍及入门示例 14、Flink 的table api与sql之数据类型: 内置数据类型以及它们的属性 15、Flink 的ta…...
Origin:科研绘图与学术图表绘制从入门到精通
文章目录 一、引言二、安装和启动Origin三、创建和保存图表四、深入学习Origin绘图功能五、应用Origin进行科研绘图和学术图表绘制六、总结与建议《Origin科研绘图与学术图表绘制从入门到精通》亮点内容简介作者简介目录获取方式 一、引言 Origin是一款功能强大的数据分析和科…...
腾讯云标准型SA4服务器AMD处理器性能测评
腾讯云服务器标准型SA4实例CPU采用AMD处理器,新一代腾讯云自研星星海双路服务器,搭配AMD EPYC Genoa处理器,内存采用最新 DDR5,默认网络优化,最高内网收发能力达4500万pps,最高内网带宽可支持100Gbps。阿腾…...
LeetCode 2656. K 个元素的最大和:一次遍历(附Python一行版代码)
【LetMeFly】2656.K 个元素的最大和:一次遍历(附Python一行版代码) 力扣题目链接:https://leetcode.cn/problems/maximum-sum-with-exactly-k-elements/ 给你一个下标从 0 开始的整数数组 nums 和一个整数 k 。你需要执行以下操…...
element-ui中Form表单使用自定义验证规则
data() {const validatePass (rule, value, callback) > {if (value.length < 3) {callback(new Error("密码不能小于3位"));} else {callback();}};return {rules: {password: [{ required: true, trigger: "blur", validator: validatePass },]}}…...
android源码添加adb host支持
本文开始参考在 android 上使用 adb client-CSDN博客,在shell中已经可以使用。但当我想在app中用 String command "/data/local/tmp/adb -s 307ef90dc8128844 shell ls";StringBuilder output new StringBuilder();try {Process process Runtime.getR…...
学习c#的第二天
目录 C# 基本语法 using 关键字 class 关键字 C# 中的注释 成员变量 成员函数 类的实例化 标识符 C# 关键字 C# 基本语法 C# 是一种面向对象的编程语言。在面向对象的程序设计方法中,程序由各种相互交互的对象组成。相同种类的对象通常具有相同的类型&…...
CodeWhisperer 使用经验分享
今天给大家分享一下 Amazon CodeWhisperer 编程工具(免费哦),使用这个软件后我的编码质量提升不少,给大家分享一下我的经验。希望大家支持哦。 Amazon CodeWhisperer 是亚⻢逊出品的一款基于机器学习的 AI 编程助手,可…...
数据结构与算法之美学习笔记:18 | 散列表(上):Word文档中的单词拼写检查功能是如何实现的?
目录 前言散列思想散列函数散列冲突解答开篇 前言 本节课程思维导图: Word 的单词拼写检查功能,虽然很小但却非常实用。你有没有想过,这个功能是如何实现的呢?其实啊,一点儿都不难。只要你学完今天的内容,…...
解决java发邮件错误javax.net.ssl.SSLHandshakeException: No appropriate protocol
java发送邮件时报以下错误信息: javax.net.ssl.SSLHandshakeException: No appropriate protocol (protocol is disabled or cipher [com.bm6api.controller.v1.AppUserController] - sendLoginAuthCodeMail 发送登录验证码邮件 : {"code":200,"inf…...
杭电oj 2035 人见人爱A^B C语言
#include<stdio.h>void main() {int a, b, i,num;while (~scanf_s("%d%d", &a, &b) && (a ! 0 || b ! 0)){num a;for (i 1; i < b; i){num * a;num % 1000;}printf("%d\n", num);} }...
[量化投资-学习笔记017]Python+TDengine从零开始搭建量化分析平台-异常处理
一个完成的程序一定少不了对异常的处理,以及错误日志的输出。 在之前章节的程序中对这两部分没有进行说明,以下用两个单独的章节进行介绍。 [量化投资-学习笔记016]PythonTDengine从零开始搭建量化分析平台-日志输出 异常处理 Python 通常使用 try .. except 和防…...
Mysql中的索引与事务和B树的知识补充
索引与事务和B树的知识补充 一.索引1.概念2.作用3.使用场景4.使用 二.事务1.为什么使用事务2.事务的概念3.使用3.1脏读问题3.2不可重复读3.3 幻读问题3.4解决3.5 使用代码 三.B树的知识补充1.B树2.B树 一.索引 1.概念 索引是一种特殊的文件,包含着对数据表里所有记…...
2024上海国际智能驾驶技术展览会(自动驾驶展)
2024上海国际智能驾驶技术展览会 2024 Shanghai International Autonomous driving Expo 时间:2024年3月26-28日 地点:上海跨国采购会展中心 随着科技的飞速发展,智能驾驶已经成为了汽车行业的重要趋势。在这个时代背景下,汽车不…...
嵌入式Linux开发,NFS文件系统挂载
在嵌入式linix的开发中,经常会需要在pc端和板端互相传输文件,优先可选择ftp传输,但是有些嵌入式板端不支持,只能使用nfs这种方式,即pc端作为服务端,板端作为客户端,将pc端的某个文件夹挂载到板端…...
什么是3D建模中的“高模”和“低模”?
3D建模中什么是高多边形和低多边形? 高多边形建模和低多边形建模之间的主要区别正如其名称所暗示的那样:您是否在模型中使用大量多边形或少量多边形。 然而,在决定每个模型的细节和多边形级别时,还需要考虑其他事项。最值得注意的…...
python数据结构与算法-04_队列
队列和栈 前面讲了线性和链式结构,如果你顺利掌握了,下边的队列和栈就小菜一碟了。因为我们会用前两章讲到的东西来实现队列和栈。 之所以放到一起讲是因为这两个东西很类似,队列是先进先出结构(FIFO, first in first out), 栈是…...
从理论到实践:深度解读BIO、NIO、AIO的优缺点及使用场景
文章目录 BIO优缺点示例代码 NIO优缺点示例代码 AIO优缺点示例代码 总结 前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到网站。 BIO、NIO和AIO是Java编程语言中用于处理输入输出(IO…...
Mysql Innodb Cluster集群搭建 - docker
Mysql Innodb Cluster集群搭建 - docker 背景搭建环境架构图3台机器如下:修改三台机器的ip域名映射如下,并重启网络使其生效部署mysql server实例通过docker启动三台mysql server实例,需要映射数据请自行更改配置加入-v启动第一台mysql-server启动第二台mysql-server启动第三…...
如何在 macOS 中删除 Time Machine 本地快照
看到这个可用82GB(458.3MB可清除) 顿时感觉清爽,之前的还是可用82GB(65GB可清除),安装个xcode都安装不上,费解半天,怎么都解决不了这个问题,就是买磁盘情理软件也解决不了…...
多云管理“拦路虎”:深入解析网络互联、身份同步与成本可视化的技术复杂度
一、引言:多云环境的技术复杂性本质 企业采用多云策略已从技术选型升维至生存刚需。当业务系统分散部署在多个云平台时,基础设施的技术债呈现指数级积累。网络连接、身份认证、成本管理这三大核心挑战相互嵌套:跨云网络构建数据…...
[ICLR 2022]How Much Can CLIP Benefit Vision-and-Language Tasks?
论文网址:pdf 英文是纯手打的!论文原文的summarizing and paraphrasing。可能会出现难以避免的拼写错误和语法错误,若有发现欢迎评论指正!文章偏向于笔记,谨慎食用 目录 1. 心得 2. 论文逐段精读 2.1. Abstract 2…...
React19源码系列之 事件插件系统
事件类别 事件类型 定义 文档 Event Event 接口表示在 EventTarget 上出现的事件。 Event - Web API | MDN UIEvent UIEvent 接口表示简单的用户界面事件。 UIEvent - Web API | MDN KeyboardEvent KeyboardEvent 对象描述了用户与键盘的交互。 KeyboardEvent - Web…...
第25节 Node.js 断言测试
Node.js的assert模块主要用于编写程序的单元测试时使用,通过断言可以提早发现和排查出错误。 稳定性: 5 - 锁定 这个模块可用于应用的单元测试,通过 require(assert) 可以使用这个模块。 assert.fail(actual, expected, message, operator) 使用参数…...
sqlserver 根据指定字符 解析拼接字符串
DECLARE LotNo NVARCHAR(50)A,B,C DECLARE xml XML ( SELECT <x> REPLACE(LotNo, ,, </x><x>) </x> ) DECLARE ErrorCode NVARCHAR(50) -- 提取 XML 中的值 SELECT value x.value(., VARCHAR(MAX))…...
基于Docker Compose部署Java微服务项目
一. 创建根项目 根项目(父项目)主要用于依赖管理 一些需要注意的点: 打包方式需要为 pom<modules>里需要注册子模块不要引入maven的打包插件,否则打包时会出问题 <?xml version"1.0" encoding"UTF-8…...
让AI看见世界:MCP协议与服务器的工作原理
让AI看见世界:MCP协议与服务器的工作原理 MCP(Model Context Protocol)是一种创新的通信协议,旨在让大型语言模型能够安全、高效地与外部资源进行交互。在AI技术快速发展的今天,MCP正成为连接AI与现实世界的重要桥梁。…...
全面解析各类VPN技术:GRE、IPsec、L2TP、SSL与MPLS VPN对比
目录 引言 VPN技术概述 GRE VPN 3.1 GRE封装结构 3.2 GRE的应用场景 GRE over IPsec 4.1 GRE over IPsec封装结构 4.2 为什么使用GRE over IPsec? IPsec VPN 5.1 IPsec传输模式(Transport Mode) 5.2 IPsec隧道模式(Tunne…...
Web 架构之 CDN 加速原理与落地实践
文章目录 一、思维导图二、正文内容(一)CDN 基础概念1. 定义2. 组成部分 (二)CDN 加速原理1. 请求路由2. 内容缓存3. 内容更新 (三)CDN 落地实践1. 选择 CDN 服务商2. 配置 CDN3. 集成到 Web 架构 …...
laravel8+vue3.0+element-plus搭建方法
创建 laravel8 项目 composer create-project --prefer-dist laravel/laravel laravel8 8.* 安装 laravel/ui composer require laravel/ui 修改 package.json 文件 "devDependencies": {"vue/compiler-sfc": "^3.0.7","axios": …...
