当前位置: 首页 > news >正文

33、Flink 的Table API 和 SQL 中的时区

Flink 系列文章

1、Flink 部署、概念介绍、source、transformation、sink使用示例、四大基石介绍和示例等系列综合文章链接

13、Flink 的table api与sql的基本概念、通用api介绍及入门示例
14、Flink 的table api与sql之数据类型: 内置数据类型以及它们的属性
15、Flink 的table api与sql之流式概念-详解的介绍了动态表、时间属性配置(如何处理更新结果)、时态表、流上的join、流上的确定性以及查询配置
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及FileSystem示例(1)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Elasticsearch示例(2)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Kafka示例(3)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及JDBC示例(4)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Hive示例(6)
17、Flink 之Table API: Table API 支持的操作(1)
17、Flink 之Table API: Table API 支持的操作(2)
18、Flink的SQL 支持的操作和语法
19、Flink 的Table API 和 SQL 中的内置函数及示例(1)
19、Flink 的Table API 和 SQL 中的自定义函数及示例(2)
19、Flink 的Table API 和 SQL 中的自定义函数及示例(3)
19、Flink 的Table API 和 SQL 中的自定义函数及示例(4)
20、Flink SQL之SQL Client: 不用编写代码就可以尝试 Flink SQL,可以直接提交 SQL 任务到集群上

22、Flink 的table api与sql之创建表的DDL
24、Flink 的table api与sql之Catalogs(介绍、类型、java api和sql实现ddl、java api和sql操作catalog)-1
24、Flink 的table api与sql之Catalogs(java api操作数据库、表)-2
24、Flink 的table api与sql之Catalogs(java api操作视图)-3
24、Flink 的table api与sql之Catalogs(java api操作分区与函数)-4
25、Flink 的table api与sql之函数(自定义函数示例)
26、Flink 的SQL之概览与入门示例
27、Flink 的SQL之SELECT (select、where、distinct、order by、limit、集合操作和去重)介绍及详细示例(1)
27、Flink 的SQL之SELECT (SQL Hints 和 Joins)介绍及详细示例(2)
27、Flink 的SQL之SELECT (窗口函数)介绍及详细示例(3)
27、Flink 的SQL之SELECT (窗口聚合)介绍及详细示例(4)
27、Flink 的SQL之SELECT (Group Aggregation分组聚合、Over Aggregation Over聚合 和 Window Join 窗口关联)介绍及详细示例(5)
27、Flink 的SQL之SELECT (Top-N、Window Top-N 窗口 Top-N 和 Window Deduplication 窗口去重)介绍及详细示例(6)
27、Flink 的SQL之SELECT (Pattern Recognition 模式检测)介绍及详细示例(7)
28、Flink 的SQL之DROP 、ALTER 、INSERT 、ANALYZE 语句
29、Flink SQL之DESCRIBE、EXPLAIN、USE、SHOW、LOAD、UNLOAD、SET、RESET、JAR、JOB Statements、UPDATE、DELETE(1)
29、Flink SQL之DESCRIBE、EXPLAIN、USE、SHOW、LOAD、UNLOAD、SET、RESET、JAR、JOB Statements、UPDATE、DELETE(2)
30、Flink SQL之SQL 客户端(通过kafka和filesystem的例子介绍了配置文件使用-表、视图等)
32、Flink table api和SQL 之用户自定义 Sources & Sinks实现及详细示例
33、Flink 的Table API 和 SQL 中的时区
41、Flink之Hive 方言介绍及详细示例
42、Flink 的table api与sql之Hive Catalog
43、Flink之Hive 读写及详细验证示例
44、Flink之module模块介绍及使用示例和Flink SQL使用hive内置函数及自定义函数详细示例–网上有些说法好像是错误的


文章目录

  • Flink 系列文章
  • 一、时区
    • 1、TIMESTAMP vs TIMESTAMP_LTZ
      • 1)、TIMESTAMP 类型
      • 2)、TIMESTAMP_LTZ 类型
    • 2、时区的作用
      • 1)、确定时间函数的返回值
      • 2)、TIMESTAMP_LTZ 字符串表示
    • 3、时间属性和时区
      • 1)、处理时间和时区
      • 2)、事件时间和时区
        • 1、TIMESTAMP 上的事件时间属性
        • 2、TIMESTAMP_LTZ 上的事件时间属性
    • 4、夏令时支持
    • 5、Batch 模式和 Streaming 模式的区别


本文简单的介绍了Flink 中关于时区的概念,并以具体的示例进行说明。
本文依赖flink、kafka集群能正常使用。
本文分为5个部分,即TIMESTAMP vs TIMESTAMP_LTZ介绍、时区的作用、时区属性与时区、夏令时支持与流批关于时间的处理区别。
本文的示例是在Flink 1.17版本中运行。

一、时区

Flink 为日期和时间提供了丰富的数据类型, 包括 DATE, TIME, TIMESTAMP, TIMESTAMP_LTZ, INTERVAL YEAR TO MONTH, INTERVAL DAY TO SECOND (更多详情请参考 14、Flink 的table api与sql之数据类型: 内置数据类型以及它们的属性 中的 Date and Time)。 Flink 支持在 session (会话)级别设置时区(更多详情请参考 15、Flink 的table api与sql之流式概念-详解的介绍了动态表、时间属性配置(如何处理更新结果)、时态表、流上的join、流上的确定性以及查询配置 中的 Planner 配置 table.local-time-zone 部分)。 Flink 对多种时间类型和时区的支持使得跨时区的数据处理变得非常容易。

1、TIMESTAMP vs TIMESTAMP_LTZ

1)、TIMESTAMP 类型

  • TIMESTAMP§ 是 TIMESTAMP§ WITHOUT TIME ZONE 的简写, 精度 p 支持的范围是0-9, 默认是6。
  • TIMESTAMP 用于描述年, 月, 日, 小时, 分钟, 秒 和 小数秒对应的时间戳。
  • TIMESTAMP 可以通过一个字符串来指定,例如:
Flink SQL> SELECT TIMESTAMP '1970-01-01 00:00:04.001';
+-------------------------+
| 1970-01-01 00:00:04.001 |
+-------------------------+

2)、TIMESTAMP_LTZ 类型

  • TIMESTAMP_LTZ(p) 是 TIMESTAMP(p) WITH LOCAL TIME ZONE 的简写, 精度 p 支持的范围是0-9, 默认是6。
  • TIMESTAMP_LTZ 用于描述时间线上的绝对时间点, 使用 long 保存从 epoch 至今的毫秒数, 使用int保存毫秒中的纳秒数。 epoch 时间是从 java 的标准 epoch 时间 1970-01-01T00:00:00Z 开始计算。 在计算和可视化时, 每个 TIMESTAMP_LTZ 类型的数据都是使用的 session (会话)中配置的时区。
  • TIMESTAMP_LTZ 没有字符串表达形式因此无法通过字符串来指定, 可以通过一个 long 类型的 epoch 时间来转化(例如: 通过 Java 来产生一个 long 类型的 epoch 时间 System.currentTimeMillis())
Flink SQL> CREATE VIEW T1 AS SELECT TO_TIMESTAMP_LTZ(4001, 3);
Flink SQL> SET 'table.local-time-zone' = 'UTC';
Flink SQL> SELECT * FROM T1;
+---------------------------+
| TO_TIMESTAMP_LTZ(4001, 3) |
+---------------------------+
|   1970-01-01 00:00:04.001 |
+---------------------------+Flink SQL> SET 'table.local-time-zone' = 'Asia/Shanghai';
Flink SQL> SELECT * FROM T1;
+---------------------------+
| TO_TIMESTAMP_LTZ(4001, 3) |
+---------------------------+
|   1970-01-01 08:00:04.001 |
+---------------------------+
  • TIMESTAMP_LTZ 可以用于跨时区的计算,因为它是一个基于 epoch 的绝对时间点(比如上例中的 4001 毫秒)代表的就是不同时区的同一个绝对时间点。 补充一个背景知识:在同一个时间点, 全世界所有的机器上执行 System.currentTimeMillis() 都会返回同样的值。 (比如上例中的 4001 milliseconds), 这就是绝对时间的定义。

2、时区的作用

本地时区定义了当前 session(会话)所在的时区, 你可以在 Sql client 或者应用程序中配置。

  • java
    代码片段示例
EnvironmentSettings envSetting = EnvironmentSettings.inStreamingMode();TableEnvironment tEnv = TableEnvironment.create(envSetting);// 设置为 UTC 时区tEnv.getConfig().setLocalTimeZone(ZoneId.of("UTC"));// 设置为上海时区tEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai"));// 设置为 Los_Angeles 时区tEnv.getConfig().setLocalTimeZone(ZoneId.of("America/Los_Angeles"));
  • sql client
-- 设置为 UTC 时区
Flink SQL> SET 'table.local-time-zone' = 'UTC';
[INFO] Execute statement succeed.-- 设置为上海时区
Flink SQL> SET 'table.local-time-zone' = 'America/Los_Angeles';
[INFO] Execute statement succeed.-- 设置为Los_Angeles时区
Flink SQL> SET 'table.local-time-zone' = 'Asia/Shanghai';
[INFO] Execute statement succeed.

session(会话)的时区设置在 Flink SQL 中非常有用, 它的主要用法如下:

1)、确定时间函数的返回值

session (会话)中配置的时区会对以下函数生效。

  • LOCALTIME
  • LOCALTIMESTAMP
  • CURRENT_DATE
  • CURRENT_TIME
  • CURRENT_TIMESTAMP
  • CURRENT_ROW_TIMESTAMP()
  • NOW()
  • PROCTIME()
Flink SQL> SET 'sql-client.execution.result-mode' = 'tableau';
[INFO] Execute statement succeed.Flink SQL> CREATE VIEW MyView1 AS 
> SELECT LOCALTIME, LOCALTIMESTAMP, CURRENT_DATE, CURRENT_TIME, CURRENT_TIMESTAMP, CURRENT_ROW_TIMESTAMP(), NOW(), PROCTIME();
[INFO] Execute statement succeed.Flink SQL> DESC MyView1;
+-------------------+-----------------------------+-------+-----+--------+-----------+
|              name |                        type |  null | key | extras | watermark |
+-------------------+-----------------------------+-------+-----+--------+-----------+
|         LOCALTIME |                     TIME(0) | FALSE |     |        |           |
|    LOCALTIMESTAMP |                TIMESTAMP(3) | FALSE |     |        |           |
|      CURRENT_DATE |                        DATE | FALSE |     |        |           |
|      CURRENT_TIME |                     TIME(0) | FALSE |     |        |           |
| CURRENT_TIMESTAMP |            TIMESTAMP_LTZ(3) | FALSE |     |        |           |
|            EXPR$5 |            TIMESTAMP_LTZ(3) | FALSE |     |        |           |
|            EXPR$6 |            TIMESTAMP_LTZ(3) | FALSE |     |        |           |
|            EXPR$7 | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE |     |        |           |
+-------------------+-----------------------------+-------+-----+--------+-----------+
8 rows in setFlink SQL> SET 'table.local-time-zone' = 'UTC';
[INFO] Execute statement succeed.Flink SQL> SELECT * FROM MyView1;
+----+-----------+-------------------------+--------------+--------------+-------------------------+-------------------------+-------------------------+-------------------------+
| op | LOCALTIME |          LOCALTIMESTAMP | CURRENT_DATE | CURRENT_TIME |       CURRENT_TIMESTAMP |                  EXPR$5 |                  EXPR$6 |                  EXPR$7 |
+----+-----------+-------------------------+--------------+--------------+-------------------------+-------------------------+-------------------------+-------------------------+
| +I |  06:52:14 | 2023-11-10 06:52:14.144 |   2023-11-10 |     06:52:14 | 2023-11-10 06:52:14.144 | 2023-11-10 06:52:14.144 | 2023-11-10 06:52:14.144 | 2023-11-10 06:52:14.145 |
+----+-----------+-------------------------+--------------+--------------+-------------------------+-------------------------+-------------------------+-------------------------+
Received a total of 1 rowFlink SQL> SET 'table.local-time-zone' = 'Asia/Shanghai';
[INFO] Execute statement succeed.Flink SQL> SELECT * FROM MyView1;
+----+-----------+-------------------------+--------------+--------------+-------------------------+-------------------------+-------------------------+-------------------------+
| op | LOCALTIME |          LOCALTIMESTAMP | CURRENT_DATE | CURRENT_TIME |       CURRENT_TIMESTAMP |                  EXPR$5 |                  EXPR$6 |                  EXPR$7 |
+----+-----------+-------------------------+--------------+--------------+-------------------------+-------------------------+-------------------------+-------------------------+
| +I |  14:52:52 | 2023-11-10 14:52:52.305 |   2023-11-10 |     14:52:52 | 2023-11-10 14:52:52.305 | 2023-11-10 14:52:52.305 | 2023-11-10 14:52:52.305 | 2023-11-10 14:52:52.305 |
+----+-----------+-------------------------+--------------+--------------+-------------------------+-------------------------+-------------------------+-------------------------+
Received a total of 1 row

2)、TIMESTAMP_LTZ 字符串表示

当一个 TIMESTAMP_LTZ 值转为 string 格式时, session 中配置的时区会生效。 例如打印这个值,将类型强制转化为 STRING 类型, 将类型强制转换为 TIMESTAMP ,将 TIMESTAMP 的值转化为 TIMESTAMP_LTZ 类型:

Flink SQL> CREATE VIEW MyView2 AS 
> SELECT TO_TIMESTAMP_LTZ(4001, 3) AS ltz, TIMESTAMP '1970-01-01 00:00:01.001'  AS ntz;
[INFO] Execute statement succeed.Flink SQL> DESC MyView2;
+------+------------------+-------+-----+--------+-----------+
| name |             type |  null | key | extras | watermark |
+------+------------------+-------+-----+--------+-----------+
|  ltz | TIMESTAMP_LTZ(3) |  TRUE |     |        |           |
|  ntz |     TIMESTAMP(3) | FALSE |     |        |           |
+------+------------------+-------+-----+--------+-----------+
2 rows in setFlink SQL> SET 'table.local-time-zone' = 'UTC';
[INFO] Execute statement succeed.Flink SQL> SELECT * FROM MyView2;
+----+-------------------------+-------------------------+
| op |                     ltz |                     ntz |
+----+-------------------------+-------------------------+
| +I | 1970-01-01 00:00:04.001 | 1970-01-01 00:00:01.001 |
+----+-------------------------+-------------------------+
Received a total of 1 rowFlink SQL> SET 'table.local-time-zone' = 'Asia/Shanghai';
[INFO] Execute statement succeed.Flink SQL> SELECT * FROM MyView2;
+----+-------------------------+-------------------------+
| op |                     ltz |                     ntz |
+----+-------------------------+-------------------------+
| +I | 1970-01-01 08:00:04.001 | 1970-01-01 00:00:01.001 |
+----+-------------------------+-------------------------+
Received a total of 1 rowFlink SQL> CREATE VIEW MyView3 AS 
> SELECT ltz, CAST(ltz AS TIMESTAMP(3)), CAST(ltz AS STRING), ntz, CAST(ntz AS TIMESTAMP_LTZ(3)) FROM MyView2;
[INFO] Execute statement succeed.Flink SQL> SELECT * FROM MyView3;
+----+-------------------------+-------------------------+--------------------------------+-------------------------+-------------------------+
| op |                     ltz |                  EXPR$1 |                         EXPR$2 |                     ntz |                  EXPR$4 |
+----+-------------------------+-------------------------+--------------------------------+-------------------------+-------------------------+
| +I | 1970-01-01 08:00:04.001 | 1970-01-01 08:00:04.001 |        1970-01-01 08:00:04.001 | 1970-01-01 00:00:01.001 | 1970-01-01 00:00:01.001 |
+----+-------------------------+-------------------------+--------------------------------+-------------------------+-------------------------+
Received a total of 1 row

3、时间属性和时区

更多时间属性相关的详细介绍, 请参考:15、Flink 的table api与sql之流式概念-详解的介绍了动态表、时间属性配置(如何处理更新结果)、时态表、流上的join、流上的确定性以及查询配置 中的时间属性配置部分。

1)、处理时间和时区

Flink SQL 使用函数 PROCTIME() 来定义处理时间属性, 该函数返回的类型是 TIMESTAMP_LTZ 。

在 Flink1.13 之前, PROCTIME() 函数返回的类型是 TIMESTAMP , 返回值是UTC时区下的 TIMESTAMP 。
例如: 当上海的时间为 2021-11-11 12:00:00 时, PROCTIME() 显示的时间却是错误的 2021-11-11 04:00:00 。 这个问题在 Flink 1.13 中修复了, 因此用户不用再去处理时区的问题了。

PROCTIME() 返回的是本地时区的时间, 使用 TIMESTAMP_LTZ 类型也可以支持夏令时时间。

Flink SQL> SET 'table.local-time-zone' = 'UTC';
[INFO] Execute statement succeed.Flink SQL> SELECT PROCTIME();
+----+-------------------------+
| op |                  EXPR$0 |
+----+-------------------------+
| +I | 2023-11-10 06:59:30.998 |
+----+-------------------------+
Received a total of 1 rowFlink SQL> SET 'table.local-time-zone' = 'Asia/Shanghai';
[INFO] Execute statement succeed.Flink SQL> SELECT PROCTIME();
+----+-------------------------+
| op |                  EXPR$0 |
+----+-------------------------+
| +I | 2023-11-10 14:59:54.031 |
+----+-------------------------+
Received a total of 1 rowFlink SQL> CREATE TABLE MyTable1 (
>                   item STRING,
>                   price DOUBLE,
>                   proctime as PROCTIME()
> ) WITH (
>   'connector' = 'kafka',
>   'topic' = 'MyTable1',
>   'properties.bootstrap.servers' = '192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092',
>   'properties.group.id' = 'testGroup',
>   'scan.startup.mode' = 'earliest-offset',
>   'format' = 'csv'
> );
[INFO] Execute statement succeed.Flink SQL> CREATE VIEW MyView3 AS
>             SELECT
>                 TUMBLE_START(proctime, INTERVAL '10' MINUTES) AS window_start,
>                 TUMBLE_END(proctime, INTERVAL '10' MINUTES) AS window_end,
>                 TUMBLE_PROCTIME(proctime, INTERVAL '10' MINUTES) as window_proctime,
>                 item,
>                 MAX(price) as max_price
>             FROM MyTable1
>                 GROUP BY TUMBLE(proctime, INTERVAL '10' MINUTES), item;
[INFO] Execute statement succeed.Flink SQL> DESC MyView3;
+-----------------+-----------------------------+-------+-----+--------+-----------+
|            name |                        type |  null | key | extras | watermark |
+-----------------+-----------------------------+-------+-----+--------+-----------+
|    window_start |                TIMESTAMP(3) | FALSE |     |        |           |
|      window_end |                TIMESTAMP(3) | FALSE |     |        |           |
| window_proctime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE |     |        |           |
|            item |                      STRING |  TRUE |     |        |           |
|       max_price |                      DOUBLE |  TRUE |     |        |           |
+-----------------+-----------------------------+-------+-----+--------+-----------+
5 rows in set

在终端执行以下命令写入数据到 MyTable1 :

[alanchan@server1 bin]$ kafka-console-producer.sh --broker-list server1:9092 --topic MyTable1
>A,1.1
>B,1.2
>A,1.8
>B,2.5
>C,3.8
>
Flink SQL> SET 'table.local-time-zone' = 'UTC';
[INFO] Execute statement succeed.Flink SQL> SELECT * FROM MyView3;
+----+-------------------------+-------------------------+-------------------------+--------------------------------+--------------------------------+
| op |            window_start |              window_end |         window_proctime |                           item |                      max_price |
+----+-------------------------+-------------------------+-------------------------+--------------------------------+--------------------------------+
| +I | 2023-11-10 07:10:00.000 | 2023-11-10 07:20:00.000 | 2023-11-10 07:20:00.000 |                              A |                            1.8 |
| +I | 2023-11-10 07:10:00.000 | 2023-11-10 07:20:00.000 | 2023-11-10 07:20:00.000 |                              C |                            3.8 |
| +I | 2023-11-10 07:10:00.000 | 2023-11-10 07:20:00.000 | 2023-11-10 07:20:00.001 |                              B |                            2.5 |
received a total of 3 rows
Flink SQL> SET 'table.local-time-zone' = 'Asia/Shanghai';
[INFO] Execute statement succeed.

相比在 UTC 时区下的计算结果, 在 Asia/Shanghai 时区下计算的窗口开始时间, 窗口结束时间和窗口处理时间是不同的。

Flink SQL> SELECT * FROM MyView3;
+----+-------------------------+-------------------------+-------------------------+--------------------------------+--------------------------------+
| op |            window_start |              window_end |         window_proctime |                           item |                      max_price |
+----+-------------------------+-------------------------+-------------------------+--------------------------------+--------------------------------+
| +I | 2023-11-10 15:10:00.000 | 2023-11-10 15:20:00.000 | 2023-11-10 15:20:00.000 |                              A |                            1.8 |
| +I | 2023-11-10 15:10:00.000 | 2023-11-10 15:20:00.000 | 2023-11-10 15:20:00.000 |                              C |                            3.8 |
| +I | 2023-11-10 15:10:00.000 | 2023-11-10 15:20:00.000 | 2023-11-10 15:20:00.001 |                              B |                            2.5 |
received a total of 3 rows

处理时间窗口是不确定的, 每次运行都会返回不同的窗口和聚合结果。 以上的示例只用于说明时区如何影响处理时间窗口。

2)、事件时间和时区

Flink 支持在 TIMESTAMP 列和 TIMESTAMP_LTZ 列上定义时间属性。

1、TIMESTAMP 上的事件时间属性

如果 source 中的时间用于表示年-月-日-小时-分钟-秒, 通常是一个不带时区的字符串,
例如: 2023-11-13 08:13:40.564。 推荐在 TIMESTAMP 列上定义事件时间属性。

  • 准备测试环境,即表、视图和数据
Flink SQL> CREATE TABLE MyTable2 (
>   item STRING,
>   price DOUBLE,
>   ts TIMESTAMP(3), -- TIMESTAMP data type
>   WATERMARK FOR ts AS ts - INTERVAL '10' SECOND
>    ) WITH (
>   'connector' = 'kafka',
>   'topic' = 'MyTable2',
>   'properties.bootstrap.servers' = '192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092',
>   'properties.group.id' = 'testGroup',
>   'scan.startup.mode' = 'earliest-offset',
>   'format' = 'csv'
> );
[INFO] Execute statement succeed.Flink SQL> CREATE VIEW MyView4 AS
> SELECT
> TUMBLE_START(ts, INTERVAL '10' MINUTES) AS window_start,
> TUMBLE_END(ts, INTERVAL '10' MINUTES) AS window_end,
> TUMBLE_ROWTIME(ts, INTERVAL '10' MINUTES) as window_rowtime,
> item,
> MAX(price) as max_price
> FROM MyTable2
> GROUP BY TUMBLE(ts, INTERVAL '10' MINUTES), item;
[INFO] Execute statement succeed.Flink SQL> DESC MyView4;
+----------------+------------------------+-------+-----+--------+-----------+
|           name |                   type |  null | key | extras | watermark |
+----------------+------------------------+-------+-----+--------+-----------+
|   window_start |           TIMESTAMP(3) | FALSE |     |        |           |
|     window_end |           TIMESTAMP(3) | FALSE |     |        |           |
| window_rowtime | TIMESTAMP(3) *ROWTIME* |  TRUE |     |        |           |
|           item |                 STRING |  TRUE |     |        |           |
|      max_price |                 DOUBLE |  TRUE |     |        |           |
+----------------+------------------------+-------+-----+--------+-----------+
5 rows in set

在终端执行以下命令用于写入数据到 MyTable2 :

[alanchan@server1 bin]$ kafka-console-producer.sh --broker-list server1:9092 --topic alan_MyTable2
>A,1.1,2023-11-13 08:21:00
>B,1.2,2023-11-13 08:22:00
>A,1.8,2023-11-13 08:23:00
>B,2.5,2023-11-13 08:24:00
>C,3.8,2023-11-13 08:25:00
>C,3.8,2023-11-13 08:41:00
  • 查看UTC与Asia/Shanghai的查询结果
Flink SQL> SET 'table.local-time-zone' = 'UTC'; 
[INFO] Execute statement succeed.Flink SQL> SELECT * FROM MyView4;
+----+-------------------------+-------------------------+-------------------------+--------------------------------+--------------------------------+
| op |            window_start |              window_end |          window_rowtime |                           item |                      max_price |
+----+-------------------------+-------------------------+-------------------------+--------------------------------+--------------------------------+
| +I | 2023-11-13 08:20:00.000 | 2023-11-13 08:30:00.000 | 2023-11-13 08:29:59.999 |                              A |                            1.8 |
| +I | 2023-11-13 08:20:00.000 | 2023-11-13 08:30:00.000 | 2023-11-13 08:29:59.999 |                              B |                            2.5 |
| +I | 2023-11-13 08:20:00.000 | 2023-11-13 08:30:00.000 | 2023-11-13 08:29:59.999 |                              C |                            3.8 |

相比在 UTC 时区下的计算结果, 在 Asia/Shanghai 时区下计算的窗口开始时间, 窗口结束时间和窗口的 rowtime 是相同的。

Flink SQL> SET 'table.local-time-zone' = 'Asia/Shanghai'; 
[INFO] Execute statement succeed.Flink SQL> SELECT * FROM MyView4;
+----+-------------------------+-------------------------+-------------------------+--------------------------------+--------------------------------+
| op |            window_start |              window_end |          window_rowtime |                           item |                      max_price |
+----+-------------------------+-------------------------+-------------------------+--------------------------------+--------------------------------+
| +I | 2023-11-13 08:20:00.000 | 2023-11-13 08:30:00.000 | 2023-11-13 08:29:59.999 |                              A |                            1.8 |
| +I | 2023-11-13 08:20:00.000 | 2023-11-13 08:30:00.000 | 2023-11-13 08:29:59.999 |                              B |                            2.5 |
| +I | 2023-11-13 08:20:00.000 | 2023-11-13 08:30:00.000 | 2023-11-13 08:29:59.999 |                              C |                            3.8 |
2、TIMESTAMP_LTZ 上的事件时间属性

如果源数据中的时间为一个 epoch 时间, 通常是一个 long 值, 例如: 1618989564564 ,推荐将事件时间属性定义在 TIMESTAMP_LTZ 列上。

  • 准备测试环境,即准备表、视图和数据
Flink SQL> CREATE TABLE MyTable3 (
>  item STRING,
>  price DOUBLE,
>  ts BIGINT, -- long time value in epoch milliseconds
>  ts_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
>  WATERMARK FOR ts_ltz AS ts_ltz - INTERVAL '10' SECOND
> ) WITH (
>   'connector' = 'kafka',
>   'topic' = 'alan_MyTable3',
>   'properties.bootstrap.servers' = '192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092',
>   'properties.group.id' = 'testGroup',
>   'scan.startup.mode' = 'earliest-offset',
>   'format' = 'csv'
> );
[INFO] Execute statement succeed.Flink SQL> CREATE VIEW MyView5 AS 
>     SELECT 
> TUMBLE_START(ts_ltz, INTERVAL '10' MINUTES) AS window_start,        
> TUMBLE_END(ts_ltz, INTERVAL '10' MINUTES) AS window_end,
> TUMBLE_ROWTIME(ts_ltz, INTERVAL '10' MINUTES) as window_rowtime,
> item,
> MAX(price) as max_price
>     FROM MyTable3
> GROUP BY TUMBLE(ts_ltz, INTERVAL '10' MINUTES), item;
[INFO] Execute statement succeed.Flink SQL> DESC MyView5;
+----------------+----------------------------+-------+-----+--------+-----------+
|           name |                       type |  null | key | extras | watermark |
+----------------+----------------------------+-------+-----+--------+-----------+
|   window_start |               TIMESTAMP(3) | FALSE |     |        |           |
|     window_end |               TIMESTAMP(3) | FALSE |     |        |           |
| window_rowtime | TIMESTAMP_LTZ(3) *ROWTIME* |  TRUE |     |        |           |
|           item |                     STRING |  TRUE |     |        |           |
|      max_price |                     DOUBLE |  TRUE |     |        |           |
+----------------+----------------------------+-------+-----+--------+-----------+
5 rows in set

在终端执行以下命令用于写入数据到 MyTable3 :

[alanchan@server1 bin]$ kafka-console-producer.sh --broker-list server1:9092 --topic alan_MyTable3
>A,1.1,1699836971034 # The corresponding utc timestamp is 2023-11-13 08:56:xx
>B,1.2,1699837031044 # The corresponding utc timestamp is 2023-11-13 08:57:xx
>A,1.8,1699837091052 # The corresponding utc timestamp is 2023-11-13 08:58:xx
>B,2.5,1699837091052 # The corresponding utc timestamp is 2023-11-13 08:59:xx
>C,3.8,1699837211069 # The corresponding utc timestamp is 2023-11-13 09:00:xx
>C,3.8,1699837271070 # The corresponding utc timestamp is 2023-11-13 09:01:xx
  • 查看UTC与Asia/Shanghai的查询结果
Flink SQL> SET 'table.local-time-zone' = 'UTC'; 
[INFO] Execute statement succeed.Flink SQL> SELECT * FROM MyView5;
+----+-------------------------+-------------------------+-------------------------+--------------------------------+--------------------------------+
| op |            window_start |              window_end |          window_rowtime |                           item |                      max_price |
+----+-------------------------+-------------------------+-------------------------+--------------------------------+--------------------------------+
| +I | 2023-11-13 00:50:00.000 | 2023-11-13 01:00:00.000 | 2023-11-13 00:59:59.999 |                              A |                            1.8 |
| +I | 2023-11-13 00:50:00.000 | 2023-11-13 01:00:00.000 | 2023-11-13 00:59:59.999 |                              B |                            2.5 |

相比在 UTC 时区下的计算结果, 在 Asia/Shanghai 时区下计算的窗口开始时间, 窗口结束时间和窗口的 rowtime 是不同的。

Flink SQL> SET 'table.local-time-zone' = 'Asia/Shanghai'; 
[INFO] Execute statement succeed.Flink SQL> SELECT * FROM MyView5;
+----+-------------------------+-------------------------+-------------------------+--------------------------------+--------------------------------+
| op |            window_start |              window_end |          window_rowtime |                           item |                      max_price |
+----+-------------------------+-------------------------+-------------------------+--------------------------------+--------------------------------+
| +I | 2023-11-13 08:50:00.000 | 2023-11-13 09:00:00.000 | 2023-11-13 08:59:59.999 |                              A |                            1.8 |
| +I | 2023-11-13 08:50:00.000 | 2023-11-13 09:00:00.000 | 2023-11-13 08:59:59.999 |                              B |                            2.5 |

4、夏令时支持

Flink SQL支持在 TIMESTAMP_LTZ列上定义时间属性, 基于这一特征,Flink SQL 在窗口中使用 TIMESTAMP 和 TIMESTAMP_LTZ 类型优雅地支持了夏令时。

Flink 使用时间戳的字符格式来分割窗口并通过每条记录对应的 epoch 时间来分配窗口。 这意味着 Flink 窗口开始时间和窗口结束时间使用的是 TIMESTAMP 类型(例如: TUMBLE_START 和 TUMBLE_END), 窗口的时间属性使用的是 TIMESTAMP_LTZ 类型(例如: TUMBLE_PROCTIME, TUMBLE_ROWTIME)。

给定一个 tumble window示例, 在 Los_Angeles 时区下夏令时从 2021-03-14 02:00:00 开始:

long epoch1 = 1615708800000L; // 2021-03-14 00:00:00
long epoch2 = 1615712400000L; // 2021-03-14 01:00:00
long epoch3 = 1615716000000L; // 2021-03-14 03:00:00, 手表往前拨一小时,跳过 (2021-03-14 02:00:00)
long epoch4 = 1615719600000L; // 2021-03-14 04:00:00 

在 Los_angele 时区下, tumble window [2021-03-14 00:00:00, 2021-03-14 00:04:00] 将会收集3个小时的数据, 在其他非夏令时的时区下将会收集4个小时的数据,用户只需要在 TIMESTAMP_LTZ 列上声明时间属性即可。

Flink 的所有窗口(如 Hop window, Session window, Cumulative window)都会遵循这种方式, Flink SQL 中的所有操作都很好地支持了 TIMESTAMP_LTZ 类型,因此Flink可以非常优雅的支持夏令时。

5、Batch 模式和 Streaming 模式的区别

以下函数:

  • LOCALTIME
  • LOCALTIMESTAMP
  • CURRENT_DATE
  • CURRENT_TIME
  • CURRENT_TIMESTAMP
  • NOW()

Flink 会根据执行模式来进行不同计算,在 Streaming 模式下这些函数是每条记录都会计算一次,但在 Batch 模式下,只会在 query 开始时计算一次,所有记录都使用相同的结果。

以下时间函数无论是在 Streaming 模式还是 Batch 模式下,都会为每条记录计算一次结果:

  • CURRENT_ROW_TIMESTAMP()
  • PROCTIME()

以上,简单的介绍了Flink 中关于时区的概念,并以具体的示例进行说明。

相关文章:

33、Flink 的Table API 和 SQL 中的时区

Flink 系列文章 1、Flink 部署、概念介绍、source、transformation、sink使用示例、四大基石介绍和示例等系列综合文章链接 13、Flink 的table api与sql的基本概念、通用api介绍及入门示例 14、Flink 的table api与sql之数据类型: 内置数据类型以及它们的属性 15、Flink 的ta…...

Origin:科研绘图与学术图表绘制从入门到精通

文章目录 一、引言二、安装和启动Origin三、创建和保存图表四、深入学习Origin绘图功能五、应用Origin进行科研绘图和学术图表绘制六、总结与建议《Origin科研绘图与学术图表绘制从入门到精通》亮点内容简介作者简介目录获取方式 一、引言 Origin是一款功能强大的数据分析和科…...

腾讯云标准型SA4服务器AMD处理器性能测评

腾讯云服务器标准型SA4实例CPU采用AMD处理器,新一代腾讯云自研星星海双路服务器,搭配AMD EPYC Genoa处理器,内存采用最新 DDR5,默认网络优化,最高内网收发能力达4500万pps,最高内网带宽可支持100Gbps。阿腾…...

LeetCode 2656. K 个元素的最大和:一次遍历(附Python一行版代码)

【LetMeFly】2656.K 个元素的最大和:一次遍历(附Python一行版代码) 力扣题目链接:https://leetcode.cn/problems/maximum-sum-with-exactly-k-elements/ 给你一个下标从 0 开始的整数数组 nums 和一个整数 k 。你需要执行以下操…...

element-ui中Form表单使用自定义验证规则

data() {const validatePass (rule, value, callback) > {if (value.length < 3) {callback(new Error("密码不能小于3位"));} else {callback();}};return {rules: {password: [{ required: true, trigger: "blur", validator: validatePass },]}}…...

android源码添加adb host支持

本文开始参考在 android 上使用 adb client-CSDN博客&#xff0c;在shell中已经可以使用。但当我想在app中用 String command "/data/local/tmp/adb -s 307ef90dc8128844 shell ls";StringBuilder output new StringBuilder();try {Process process Runtime.getR…...

学习c#的第二天

目录 C# 基本语法 using 关键字 class 关键字 C# 中的注释 成员变量 成员函数 类的实例化 标识符 C# 关键字 C# 基本语法 C# 是一种面向对象的编程语言。在面向对象的程序设计方法中&#xff0c;程序由各种相互交互的对象组成。相同种类的对象通常具有相同的类型&…...

CodeWhisperer 使用经验分享

今天给大家分享一下 Amazon CodeWhisperer 编程工具&#xff08;免费哦&#xff09;&#xff0c;使用这个软件后我的编码质量提升不少&#xff0c;给大家分享一下我的经验。希望大家支持哦。 Amazon CodeWhisperer 是亚⻢逊出品的一款基于机器学习的 AI 编程助手&#xff0c;可…...

数据结构与算法之美学习笔记:18 | 散列表(上):Word文档中的单词拼写检查功能是如何实现的?

目录 前言散列思想散列函数散列冲突解答开篇 前言 本节课程思维导图&#xff1a; Word 的单词拼写检查功能&#xff0c;虽然很小但却非常实用。你有没有想过&#xff0c;这个功能是如何实现的呢&#xff1f;其实啊&#xff0c;一点儿都不难。只要你学完今天的内容&#xff0c;…...

解决java发邮件错误javax.net.ssl.SSLHandshakeException: No appropriate protocol

java发送邮件时报以下错误信息&#xff1a; javax.net.ssl.SSLHandshakeException: No appropriate protocol (protocol is disabled or cipher [com.bm6api.controller.v1.AppUserController] - sendLoginAuthCodeMail 发送登录验证码邮件 : {"code":200,"inf…...

杭电oj 2035 人见人爱A^B C语言

#include<stdio.h>void main() {int a, b, i,num;while (~scanf_s("%d%d", &a, &b) && (a ! 0 || b ! 0)){num a;for (i 1; i < b; i){num * a;num % 1000;}printf("%d\n", num);} }...

[量化投资-学习笔记017]Python+TDengine从零开始搭建量化分析平台-异常处理

一个完成的程序一定少不了对异常的处理&#xff0c;以及错误日志的输出。 在之前章节的程序中对这两部分没有进行说明,以下用两个单独的章节进行介绍。 [量化投资-学习笔记016]PythonTDengine从零开始搭建量化分析平台-日志输出 异常处理 Python 通常使用 try .. except 和防…...

Mysql中的索引与事务和B树的知识补充

索引与事务和B树的知识补充 一.索引1.概念2.作用3.使用场景4.使用 二.事务1.为什么使用事务2.事务的概念3.使用3.1脏读问题3.2不可重复读3.3 幻读问题3.4解决3.5 使用代码 三.B树的知识补充1.B树2.B树 一.索引 1.概念 索引是一种特殊的文件&#xff0c;包含着对数据表里所有记…...

2024上海国际智能驾驶技术展览会(自动驾驶展)

2024上海国际智能驾驶技术展览会 2024 Shanghai International Autonomous driving Expo 时间&#xff1a;2024年3月26-28日 地点&#xff1a;上海跨国采购会展中心 随着科技的飞速发展&#xff0c;智能驾驶已经成为了汽车行业的重要趋势。在这个时代背景下&#xff0c;汽车不…...

嵌入式Linux开发,NFS文件系统挂载

在嵌入式linix的开发中&#xff0c;经常会需要在pc端和板端互相传输文件&#xff0c;优先可选择ftp传输&#xff0c;但是有些嵌入式板端不支持&#xff0c;只能使用nfs这种方式&#xff0c;即pc端作为服务端&#xff0c;板端作为客户端&#xff0c;将pc端的某个文件夹挂载到板端…...

什么是3D建模中的“高模”和“低模”?

3D建模中什么是高多边形和低多边形&#xff1f; 高多边形建模和低多边形建模之间的主要区别正如其名称所暗示的那样&#xff1a;您是否在模型中使用大量多边形或少量多边形。 然而&#xff0c;在决定每个模型的细节和多边形级别时&#xff0c;还需要考虑其他事项。最值得注意的…...

python数据结构与算法-04_队列

队列和栈 前面讲了线性和链式结构&#xff0c;如果你顺利掌握了&#xff0c;下边的队列和栈就小菜一碟了。因为我们会用前两章讲到的东西来实现队列和栈。 之所以放到一起讲是因为这两个东西很类似&#xff0c;队列是先进先出结构(FIFO, first in first out)&#xff0c; 栈是…...

从理论到实践:深度解读BIO、NIO、AIO的优缺点及使用场景

文章目录 BIO优缺点示例代码 NIO优缺点示例代码 AIO优缺点示例代码 总结 前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下给大家。点击跳转到网站。 BIO、NIO和AIO是Java编程语言中用于处理输入输出&#xff08;IO…...

Mysql Innodb Cluster集群搭建 - docker

Mysql Innodb Cluster集群搭建 - docker 背景搭建环境架构图3台机器如下:修改三台机器的ip域名映射如下,并重启网络使其生效部署mysql server实例通过docker启动三台mysql server实例,需要映射数据请自行更改配置加入-v启动第一台mysql-server启动第二台mysql-server启动第三…...

如何在 macOS 中删除 Time Machine 本地快照

看到这个可用82GB&#xff08;458.3MB可清除&#xff09; 顿时感觉清爽&#xff0c;之前的还是可用82GB&#xff08;65GB可清除&#xff09;&#xff0c;安装个xcode都安装不上&#xff0c;费解半天&#xff0c;怎么都解决不了这个问题&#xff0c;就是买磁盘情理软件也解决不了…...

mysql的sql_mode参数

msql修改了这个参数&#xff0c;首先mysql需要重新才能生效&#xff0c;还有就是java连接的springboot项目也需要重新启动。之前是遇到了下面的这个报错。只需要把sql_mode设置为空&#xff0c;重启mysql和服务就行 报错 In aggregated query without GROUP BY, expression #1…...

模拟业务流程+构造各种测试数据,一文带你测试效率提升80%

&#x1f4e2;专注于分享软件测试干货内容&#xff0c;欢迎点赞 &#x1f44d; 收藏 ⭐留言 &#x1f4dd; 如有错误敬请指正&#xff01;&#x1f4e2;交流讨论&#xff1a;欢迎加入我们一起学习&#xff01;&#x1f4e2;资源分享&#xff1a;耗时200小时精选的「软件测试」资…...

【linux】 Shell函数返回值

概述 return 返回 shell中通过return返回是有限制的&#xff0c;必须是数字&#xff0c;最大返回255&#xff0c;超过255&#xff0c;则从0开始计算。 通常仅返回0或1&#xff1b;0表示成功&#xff0c;1表示失败。通过echo 直接返回。 在没有return 语句&#xff0c;函数将以…...

面试:容器技术

目录 为什么需要 DevOpsDocker 是什么&#xff1f;Docker 与虚拟机有何不同&#xff1f;什么是 Docker 镜像&#xff1f;什么是 Docker 容器&#xff1f;Docker 容器有几种状态&#xff1f;解释一下 Dockerfile 的 ONBUILD 指令&#xff1f;什么是 Docker Swarm&#xff1f;如何…...

在Linux中nacos集群模式部署

一、安装 配置nacos 在Linux中建立一个nacos文件夹 mkdir nacos 把下载的压缩包拉入刚才创建好的nacos文件中 解压 tar -zxvf nacos-server-1.4.1\.tar.gz 修改配置文件 进入nacos文件中的conf文件的cluster.conf.example 修改cluster.conf.example文件 vim cluster.conf.exa…...

7天入门python系列之爬取热门小说项目实战,互联网的东西怎么算白嫖呢

第七天 Python项目实操 编者打算开一个python 初学主题的系列文章&#xff0c;用于指导想要学习python的同学。关于文章有任何疑问都可以私信作者。对于初学者想在7天内入门Python&#xff0c;这是一个紧凑的学习计划。但并不是不可完成的。 学到第7天说明你已经对python有了一…...

产品经理墨刀学习----注册页面

我们做的产品是一个校园论坛学习开发系统&#xff0c;目前才开始学习。 &#xff08;一&#xff09;流程图 &#xff08;二&#xff09;简单墨刀设计--注册页面 &#xff08;1&#xff09;有账号 &#xff08;a&#xff09;直接登录&#xff1a; &#xff08;b&#xff09;忘…...

算法通关村——归并排序

归并排序 1、归并排序原理 ​ 归并排序是一种很经典的分治策略。 ​ 归并排序(MERGE-SORT)简单来说就是将大的序列先视为若干小的数组&#xff0c;分成几个比较小的结构&#xff0c;然后是利用归并的思想实现的排序方法。将一个大的问题分解成一些小的问题分别求解&#xff…...

SDL2 播放音频数据(PCM)

1.简介 这里以常用的视频原始数据PCM数据为例&#xff0c;展示音频的播放。 SDL播放音频的流程如下&#xff1a; 初始化音频子系统&#xff1a;SDL_Init()。设置音频参数&#xff1a;SDL_AudioSpec。设置回调函数&#xff1a;SDL_AudioCallback。打开音频设备&#xff1a;SD…...

优秀智慧园区案例 - 重庆AI PARK智慧创意园区,先进智慧园区建设方案经验

一、项目背景 1、智慧园区是国家实现经济增长、产业升级的载体 智慧园区建设是城市智慧化创新发展的核心&#xff0c;在数智化升级和低碳化转型的经济发展双引擎的驱动下&#xff0c;十四五、数字经济的政策大力支持&#xff0c;以及人工智能、5G、大数据、区块链等技术的不断…...