当前位置: 首页 > news >正文

Flink之SQL客户端与DDL操作

SQL客户端与DDL操作

  • Flink SQL
  • SQL客户端
    • 1.启动Flink
    • 2.启动Flink的SQL客户端
    • 3.HELP命令
    • 4.验证连接
    • 5.结果显示模式
    • 6.执行配置
  • 数据库操作
    • 1.创建数据库
    • 2.查询数据库
    • 3.修改数据库
    • 4.删除数据库
  • 表操作
    • 1.创建表
      • 表列属性
      • 表Watermark属性
      • 列PRIMARY KEY属性
      • 列PARTITIONED BY属性
      • 列WITH选项属性
      • 列LIKE属性
      • 列AS select_statement属性
      • 创建一张基于 Print的简单表
    • 2.查看表
    • 3.修改表
    • 4.删除表
  • 其他
    • 动态表
      • 将流转换成动态表
      • 连续查询
      • 将动态表转换为流
    • 时间属性
      • 处理时间
      • 事件时间

Flink SQL

Flink SQL是Apache Flink框架中的一种查询语言,用于对数据流和批处理作业执行SQL查询和转换操作。它提供了一种声明性的方式来处理数据,使得开发人员能够使用熟悉的SQL语法来操作流式和批处理数据。

Flink的Table API和SQL是流批统一的API,具有相同的语义。

Table API是用于Scala和Java语言的查询API,它可以用一种非常直观的方式来组合使用选取、过滤、join等关系型算子。

Flink SQL是基于Apache Calcite来实现的标准SQL,这两种API中的查询对于批(DataSet)和流(DataStream)的输入有相同的语义,也会产生同样的计算结果。

SQL客户端

Flink的Table & SQL API可以处理SQL语言编写的查询语句,但是这些查询需要嵌入用Java或Scala编写的表程序中。此外,这些程序在提交到集群前需要用构建工具打包。这或多或少限制了Java/Scala程序员对Flink的使用。

SQL客户端的目的是提供一种简单的方式来编写、调试和提交表程序到Flink集群上,而无需写一行Java或Scala代码。SQL客户端命令行界面(CLI)能够在命令行中检索和可视化分布式应用中实时产生的结果。

SQL 客户端捆绑在常规 Flink 发行版中,因此可以直接运行。它仅需要一个正在运行的 Flink 集群就可以在其中执行表程序。

1.启动Flink

# 基于独立模式的会话模式部署
./bin/start-cluster.sh# 基于YARN运行模式的会话模式部署
./bin/yarn-session.sh -d

2.启动Flink的SQL客户端

SQL Client 脚本也位于 Flink 的 bin 目录中。用户可以通过启动嵌入式 standalone 进程或通过连接到远程 SQL Gateway 来启动 SQL 客户端命令行界面。

# SQL客户端默认使用embedded模式
./bin/sql-client.sh# 显式使用embedded 模式
./bin/sql-client.sh embedded# 使用gateway模式
./bin/sql-client.sh gateway --endpoint <gateway address># 基于YARN运行模式的embedded模式
./bin/sql-client.sh embedded -s yarn-session# 启动时,指定sql文件
./bin/sql-client.sh embedded -s yarn-session -i ./sql-client-init.sql

3.HELP命令

命令行界面启动后,使用 HELP 命令列出所有可用的 SQL 语句

HELP                            打印可用命令的帮助信息。
QUIT/EXIT                       退出 SQL CLI 客户端。
CLEAR                           清除当前终端的内容。
SET                             设置会话配置属性。语法:"SET '<key>'='<value>';". 使用 "SET;" 列出所有属性。
RESET                           重置会话配置属性。语法:"RESET '<key>';". 使用 "RESET;" 重置所有会话属性。
INSERT INTOSQL SELECT 查询的结果插入到指定的表中。
INSERT OVERWRITESQL SELECT 查询的结果覆盖插入到指定的表中,覆盖现有数据。
SELECTFlink 集群上执行 SQL SELECT 查询。
EXPLAIN                         描述给定名称的查询或表的执行计划。
BEGIN STATEMENT SET             开始一个语句集合。语法:"BEGIN STATEMENT SET;"
END                             结束一个语句集合。语法:"END;"
ADD JAR                         将指定的 jar 文件添加到提交的作业类加载器中。语法:"ADD JAR '<path_to_filename>.jar'"
REMOVE JAR                      从提交的作业类加载器中移除指定的 jar 文件。语法:"REMOVE JAR '<path_to_filename>.jar'"
SHOW JARS                       显示用户指定的 jar 依赖列表。该列表受到 --jar 和 --library 启动选项以及 ADD/REMOVE JAR 命令的影响。

4.验证连接

输入第一条 SQL 查询语句并按 Enter 键执行,可以验证设置及集群连接是否正确

[root@node01 flink]# ./bin/sql-client.sh▒▓██▓██▒▓████▒▒█▓▒▓███▓▒▓███▓░░        ▒▒▒▓██▒  ▒░██▒   ▒▒▓▓█▓▓▒░      ▒██████▒         ░▒▓███▒    ▒█▒█▒░▓█            ███   ▓░▒██▓█       ▒▒▒▒▒▓██▓░▒░▓▓██░ █   ▒▒░       ███▓▓█ ▒█▒▒▒████░   ▒▓█▓      ██▒▒▒ ▓███▒░▒█▓▓██       ▓█▒    ▓█▒▓██▓ ░█░▓░▒▓████▒ ██         ▒█    █▓░▒█▒░▒█▒███▓░██▓  ▓█           █   █▓ ▒▓█▓▓█▒░██▓  ░█░            █  █▒ ▒█████▓▒ ██▓░▒███░ ░ █░          ▓ ░█ █████▒░░    ░█░▓  ▓░██▓█ ▒▒▓▒          ▓███████▓░       ▒█▒ ▒▓ ▓██▓▒██▓ ▓█ █▓█       ░▒█████▓▓▒░         ██▒▒  █ ▒  ▓█▒▓█▓  ▓█ ██▓ ░▓▓▓▓▓▓▓▒              ▒██▓           ░█▒▓█    █ ▓███▓▒░              ░▓▓▓███▓          ░▒░ ▓███▓    ██▒    ░▒▓▓███▓▓▓▓▓██████▓▒            ▓███  █▓███▒ ███   ░▓▓▒░░   ░▓████▓░                  ░▒▓▒  █▓█▓▒▒▓▓██  ░▒▒░░░▒▒▒▒▓██▓░                            █▓██ ▓░▒█   ▓▓▓▓▒░░  ▒█▓       ▒▓▓██▓    ▓▒          ▒▒▓▓█▓ ▓▒█  █▓░  ░▒▓▓██▒            ░▓█▒   ▒▒▒░▒▒▓█████▒██░ ▓█▒█▒  ▒▓▓▒  ▓█                █░      ░░░░   ░█▒▓█   ▒█▓   ░     █░                ▒█              █▓█▓   ██         █░                 ▓▓        ▒█▓▓▓▒█░█▓ ░▓██░       ▓▒                  ▓█▓▒░░░▒▓█░    ▒███   ▓█▓░      ▒                    ░▒█▒██▒      ▓▓▓█▒   ▒█▓▒░                         ▒▒ █▒█▓▒▒░░▒██░██▒    ▒▓▓▒                     ▓██▓▒█▒ ░▓▓▓▓▒█▓░▓██▒                          ▓░  ▒█▓█  ░░▒▒▒▒▓▓▓▓▓▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒░░▓▓  ▓░▒█░______ _ _       _       _____  ____  _         _____ _ _            _  BETA|  ____| (_)     | |     / ____|/ __ \| |       / ____| (_)          | || |__  | |_ _ __ | | __ | (___ | |  | | |      | |    | |_  ___ _ __ | |_|  __| | | | '_ \| |/ /  \___ \| |  | | |      | |    | | |/ _ \ '_ \| __|| |    | | | | | |   <   ____) | |__| | |____  | |____| | |  __/ | | | |_|_|    |_|_|_| |_|_|\_\ |_____/ \___\_\______|  \_____|_|_|\___|_| |_|\__|Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit.Command history file path: /root/.flink-sql-historyFlink SQL>

输入SQL:SELECT 'Hello World';,该查询不需要 table source,并且只产生一行结果。CLI 将从集群中检索结果并将其可视化。按 Q 键退出结果视图。
在这里插入图片描述

5.结果显示模式

CLI为维护和可视化结果提供三种模式。默认table,还可以设置为tableau、changelog

1.表格模式(table mode)

在内存中实体化结果,并将结果用规则的分页表格可视化展示出来。

执行如下命令启用:

SET 'sql-client.execution.result-mode' = 'table';

执行SQL:

SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name) GROUP BY name;
                           name                  cntAlice                    1Greg                    1Bob                    2

2.变更日志模式(changelog mode

不会实体化和可视化结果,而是由插入(+)和撤销(-)组成的持续查询产生结果流。

SET 'sql-client.execution.result-mode' = 'changelog';
 op                           name                  cnt+I                            Bob                    1+I                          Alice                    1+I                           Greg                    1-U                            Bob                    1+U                            Bob                    2

3.Tableau模式(tableau mode)

更接近传统的数据库,会将执行的结果以制表的形式直接打在屏幕之上。具体显示的内容会取决于作业 执行模式的不同:

SET 'sql-client.execution.result-mode' = 'tableau';
+----+--------------------------------+----------------------+
| op |                           name |                  cnt |
+----+--------------------------------+----------------------+
| +I |                            Bob |                    1 |
| +I |                          Alice |                    1 |
| +I |                           Greg |                    1 |
| -U |                            Bob |                    1 |
| +U |                            Bob |                    2 |
+----+--------------------------------+----------------------+
Received a total of 5 rows

6.执行配置

执行环境默认streaming,也可以设置batch

SET execution.runtime-mode=streaming;

设置默认并行度

SET parallelism.default=1;

设置状态TTL

SET table.exec.state.ttl=1000;

数据库操作

1.创建数据库

根据给定的表属性创建数据库。若数据库中已存在同名表会抛出异常。

CREATE DATABASE [IF NOT EXISTS] [catalog_name.]db_name[COMMENT database_comment]WITH (key1=val1, key2=val2, ...)
IF NOT EXISTS:若数据库已经存在,则不会进行任何操作。WITH OPTIONS:数据库属性一般用于存储关于这个数据库额外的信息。 表达式 key1=val1 中的键和值都需要是字符串文本常量。
Flink SQL> CREATE DATABASE db_flink;
[INFO] Execute statement succeed.

2.查询数据库

查询所有数据库

Flink SQL> SHOW DATABASES;
+------------------+
|    database name |
+------------------+
| default_database |
|         db_flink |
+------------------+
2 rows in set

查询当前数据库

Flink SQL> SHOW CURRENT DATABASE;
+-----------------------+
| current database name |
+-----------------------+
|      default_database |
+-----------------------+
1 row in set

切换数据库

USE database_name;

3.修改数据库

在数据库中设置一个或多个属性。若个别属性已经在数据库中设定,将会使用新值覆盖旧值。

ALTER DATABASE [catalog_name.]db_name SET (key1=val1, key2=val2, ...)

4.删除数据库

根据给定的表名删除数据库。若需要删除的数据库不存在会抛出异常 。

DROP DATABASE [IF EXISTS] [catalog_name.]db_name [ (RESTRICT | CASCADE) ]
IF EXISTS:若数据库不存在,不执行任何操作。RESTRICT:当删除一个非空数据库时,会触发异常。(默认为开)CASCADE:删除一个非空数据库时,把相关联的表与函数一并删除。
Flink SQL> DROP DATABASE db_flink;
[INFO] Execute statement succeed.

表操作

1.创建表

根据指定的表名创建一个表,如果同名表已经在 catalog 中存在了,则无法注册。

CREATE TABLE [IF NOT EXISTS] [catalog_name.][db_name.]table_name({ <physical_column_definition> | <metadata_column_definition> | <computed_column_definition> }[ , ...n][ <watermark_definition> ][ <table_constraint> ][ , ...n])[COMMENT table_comment][PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]WITH (key1=val1, key2=val2, ...)[ LIKE source_table [( <like_options> )] | AS select_query ]

表列属性

1.物理/常规列

physical_column_definition:定义物理列,其定义了物理介质中存储的数据中字段的名称、类型和顺序

CREATE TABLE MyTable (`user_id` BIGINT,`name` STRING
) WITH (...
);

2.元数据列

metadata_column_definition: 定义元数据列

例如:元数据列可用于从 Kafka 记录中读取和写入时间戳,以进行基于时间的操作。简单说就是将Kafka数据中的一个时间戳作为表的一个字段

创建一个带有引用元数据字段的附加元数据列的表timestamp

CREATE TABLE MyTable (`user_id` BIGINT,`name` STRING,`record_time` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'    -- reads and writes a Kafka record's timestamp
) WITH ('connector' = 'kafka'...
);
# 如果自定义的列名称和定义metadata字段的名称一样, FROM子句可省略
`timestamp` TIMESTAMP_LTZ(3) METADATA# 如果自定义列的数据类型和定义的metadata字段的数据类型不一致,程序运行时会自动强转,但是要求两种数据类型是可以强转
-- 将时间戳强转为 BIGINT
`timestamp` BIGINT METADATA

3.计算列

computed_column_definition:定义计算列,将一些列经过自定义运算生成的新列,在物理上并不存储在表中,只能读不能写。

`money` AS price * quanitity

表Watermark属性

WATERMARK定义了表的事件时间属性,其形式为 :

WATERMARK FOR rowtime_column_name AS watermark_strategy_expression 

rowtime_column_name:把一个现有的列定义为一个为表标记事件时间的属性。该列的类型必须为 TIMESTAMP(3),且是 schema 中的顶层列,它也可以是一个计算列。

Flink SQL提供了几种 WATERMARK 生产策略:

1.严格升序:

Flink任务认为时间戳只会越来越大,也不存在相等的情况,只要相等或者小于之前的,就认为是迟到的数据。

WATERMARK FOR rowtime_column AS rowtime_column

2.递增:

一般基本不用这种方式。如果设置此类,则允许有相同的时间戳出现。

WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '0.001' SECOND 

3.有界无序:

用于设置最大乱序时间,此类Watermark 生成策略通常用于有数据乱序的场景,实际场景中,数据也都是会存在乱序,所以使用此类策略。

WATERMARK FOR rowtime_column AS rowtime_column – INTERVAL 'string' timeUnit 

假如设置为WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '5' SECOND ,则生成的是运行 5s 延迟的Watermark。

CREATE TABLE Orders (`user` BIGINT,product STRING,order_time TIMESTAMP(3),WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH ( . . . );

列PRIMARY KEY属性

主键约束表明表中的一列或一组列是唯一的,并且它们不包含NULL值。主键唯一地标识表中的一行,只支持 not enforced。

键可以和列的定义一起声明,也可以独立声明为表的限制属性,不管是哪种方式,主键都不可以重复定义,否则 Flink 会报错。

CREATE TABLE MyTable (
`user_id` BIGINT,
`name` STRING,
PARYMARY KEY(user_id) not enforced
) 

列PARTITIONED BY属性

创建分区表

根据指定的列对已经创建的表进行分区。若表使用 filesystem sink ,则将会为每个分区创建一个目录。

列WITH选项属性

创建表的表属性,表属性用于创建 table source/sink ,一般用于寻找和创建底层的连接器。用于指定外部存储系统的元数据信息。配置属性时,表达式key1=val1的键和值都应该是字符串字面值。

一般with中的配置项由Flink SQL 的 Connector(链接外部存储的连接器) 来定义,每种 Connector 提供的with 配置项都是不同的。

具体参考:Table & SQL Connectors

CREATE TABLE KafkaTable (`user_id` BIGINT,`item_id` BIGINT,`behavior` STRING,`ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH ('connector' = 'kafka','topic' = 'user_behavior','properties.bootstrap.servers' = 'localhost:9092','properties.group.id' = 'testGroup','scan.startup.mode' = 'earliest-offset','format' = 'csv'
)

列LIKE属性

LIKE子句可以基于现有表的定义去创建新表,并且可以扩展或排除原始表中的某些部分。

还可以使用该子句,重用(或改写)指定的连接器配置属性或者可以向外部表添加 watermark 定义,

CREATE TABLE Orders (`user` BIGINT,product STRING,order_time TIMESTAMP(3)
) WITH ( 'connector' = 'kafka','scan.startup.mode' = 'earliest-offset'
);CREATE TABLE Orders_with_watermark (-- 添加 watermark 定义WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND 
) WITH (-- 改写 startup-mode 属性'scan.startup.mode' = 'latest-offset'
)
LIKE Orders;

结果表 Orders_with_watermark 等效于使用以下语句创建的表:

CREATE TABLE Orders_with_watermark (`user` BIGINT,product STRING,order_time TIMESTAMP(3),WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND 
) WITH ('connector' = 'kafka','scan.startup.mode' = 'latest-offset'
);

列AS select_statement属性

通过查询的结果创建和填充表。CTAS是使用单个命令创建数据并向表中插入数据的最简单、最快速的方法。

CREATE TABLE my_ctas_table
WITH ('connector' = 'kafka',...
)
AS SELECT id, name, age FROM source_table WHERE mod(id, 10) = 0;

结果表 my_ctas_table 等效于使用以下语句创建表并写入数据:

CREATE TABLE my_ctas_table (id BIGINT,name STRING,age INT
) WITH ('connector' = 'kafka',...
);INSERT INTO my_ctas_table SELECT id, name, age FROM source_table WHERE mod(id, 10) = 0;

注意CTAS 有如下约束:

暂不支持创建临时表
暂不支持指定列信息
暂不支持指定 Watermark
暂不支持创建分区表
暂不支持主键约束

创建一张基于 Print的简单表

Print连接器允许将每一行写入标准输出流或者标准错误流。

CREATE TABLE print_table (f0 INT,f1 INT,f2 STRING,f3 DOUBLE
) WITH ('connector' = 'print'
)

也可以通过 LIKE子句 基于已有表的结构去创建新表

CREATE TABLE print_table 
WITH ('connector' = 'print')
LIKE source_table (EXCLUDING ALL)

2.查看表

查看所有表

展示指定库的所有表,如果没有指定库则展示当前库的所有表。另外返回的结果能被一个可选的匹配字符串过滤。

SHOW TABLES [ ( FROM | IN ) [catalog_name.]database_name ] [ [NOT] LIKE <sql_like_pattern> ]

注意:如果没有指定数据库,则从当前数据库返回表。

LIKE子句中 sql 正则式的语法与 MySQL 方言中的语法相同

% 匹配任意数量的字符, 也包括0数量字符, \% 匹配一个 % 字符_ 只匹配一个字符, \_ 匹配一个 _ 字符
Flink SQL> SHOW TABLES;
+-------------+
|  table name |
+-------------+
| print_table |
+-------------+
1 row in set

查看表信息

{ DESCRIBE | DESC } [catalog_name.][db_name.]table_name
Flink SQL> DESC print_table;
+------+--------+------+-----+--------+-----------+
| name |   type | null | key | extras | watermark |
+------+--------+------+-----+--------+-----------+
|   f0 |    INT | TRUE |     |        |           |
|   f1 |    INT | TRUE |     |        |           |
|   f2 | STRING | TRUE |     |        |           |
|   f3 | DOUBLE | TRUE |     |        |           |
+------+--------+------+-----+--------+-----------+
4 rows in set

3.修改表

修改表名

ALTER TABLE [catalog_name.][db_name.]table_name RENAME TO new_table_name

修改表属性

ALTER TABLE [catalog_name.][db_name.]table_name SET (key1=val1, key2=val2, ...)

4.删除表

DROP [TEMPORARY] TABLE [IF EXISTS] [catalog_name.][db_name.]table_name

其他

动态表

动态表是Flink的支持流数据的Table API和SQL的核心概念。与表示批处理数据的静态表不同,动态表是随时间变化的。可以像查询静态批处理表一样查询它们。查询动态表将生成一个 连续查询 。一个连续查询永远不会终止,结果会生成一个动态表。查询不断更新其(动态)结果表,以反映其(动态)输入表上的更改。本质上,动态表上的连续查询非常类似于定义物化视图的查询。

注意:

连续查询的结果在语义上总是等价于以批处理模式在输入表快照上执行的相同查询的结果。

流、动态表和连续查询之间的关系:

1.将流转换为动态表2.在动态表上计算一个连续查询,生成一个新的动态表3.生成的动态表被转换回流

在这里插入图片描述

将流转换成动态表

为了使用关系查询处理流,必须将其转换成 Table。流的每条记录都被解释为对结果表的 INSERT 操作

如下:单击事件流转换为表。当插入更多的单击流记录时,结果表将不断增长。
在这里插入图片描述

连续查询

在动态表上计算一个连续查询,并生成一个新的动态表。与批处理查询不同,连续查询从不终止,并根据其输入表上的更新更新其结果表。在任何时候,连续查询的结果在语义上与以批处理模式在输入表快照上执行的相同查询的结果相同。

在这里插入图片描述

1.更新查询

当原始动态表不停地插入新的数据时,查询得到的结果表会持续地进行更改。这里的更改操作可以是简单的插入,也可以是对之前数据的更新。这种持续查询被称为更新查询

2.追加查询

上述查询过程用到分组聚合,结果表中就会产生更新操作。如果执行一个简单的条件查询,结果表中就会像原始表一样,只有插入操作。那么这样的持续查询,就被称为追加查询,它定义的结果表的更新日志流中只有INSERT操作。

将动态表转换为流

动态表可以通过插入、更新和删除操作,进行持续的更改。在将动态表转换为流或将其写入外部系统时,需要对这些更改进行编码,通过发送编码消息的方式告诉外部系统要执行的操作。

Flink的 Table API 和 SQL 支持三种方式来编码一个动态表的变化:

1.仅追加流 Append-only

仅通过INSERT插入更改来修改的动态表,可以直接转换为仅追加流。这个流中发出的数据,其实就是动态表中新增的每一行。

2.撤回流 Retract

撤回流是包含两类消息的流,添加消息和撤回消息。

INSERT插入操作编码为add消息DELETE删除操作编码为retract消息UPDATE更新操作则编码为被更改行的retract消息和更新后行(新行)的add消息。

这样通过编码后的消息指明所有的增删改操作,一个动态表就可以转换为撤回流。

将动态表转换为 retract 流的过程。
在这里插入图片描述

3.更新插入流 Upsert

更新插入流中只包含两种类型的消息:更新插入消息和删除消息。

对于更新插入流来说,INSERT插入操作和UPDATE更新操作,统一被编码为upsert消息,而DELETE删除操作则被编码为delete消息。

在这里插入图片描述

时间属性

Flink 可以基于几种不同的 时间 概念来处理数据。

处理时间 指的是执行具体操作时的机器时间事件时间 指的是数据本身携带的时间。这个时间是在事件产生时的时间摄入时间 指的是数据进入 Flink 的时间;在系统内部,会把它当做事件时间来处理

处理时间

处理时间是基于机器的本地时间来处理数据,它是最简单的一种时间概念,但是它不能提供确定性。它既不需要从数据里获取时间,也不需要生成 watermark。

1.在创建表的DDL中定义

处理时间属性可以在创建表的 DDL 中用计算列的方式定义,用 PROCTIME() 就可以定义处理时间,函数 PROCTIME() 的返回类型是 TIMESTAMP_LTZ 。

CREATE TABLE user_actions (user_name STRING,data STRING,user_action_time AS PROCTIME() -- 声明一个额外的列作为处理时间属性
) WITH (...
);SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);

2.在DataStream到Table转换时定义

处理时间属性可以在 schema 定义的时候用 .proctime 后缀来定义。时间属性一定不能定义在一个已有字段上,所以它只能定义在 schema 定义的最后。

DataStream<Tuple2<String, String>> stream = ...;// 声明一个额外的字段作为时间属性字段
Table table = tEnv.fromDataStream(stream, $("user_name"), $("data"), $("user_action_time").proctime());WindowedTable windowedTable = table.window(Tumble.over(lit(10).minutes()).on($("user_action_time")).as("userActionWindow"));

3.使用TableSource定义
逻辑的时间属性会放在 TableSource 已有物理字段的最后

// 定义一个由处理时间属性的 table source
public class UserActionSource implements StreamTableSource<Row>, DefinedProctimeAttribute {@Overridepublic TypeInformation<Row> getReturnType() {String[] names = new String[] {"user_name" , "data"};TypeInformation[] types = new TypeInformation[] {Types.STRING(), Types.STRING()};return Types.ROW(names, types);}@Overridepublic DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {// create streamDataStream<Row> stream = ...;return stream;}@Overridepublic String getProctimeAttribute() {// 这个名字的列会被追加到最后,作为第三列return "user_action_time";}
}// register table source
tEnv.registerTableSource("user_actions", new UserActionSource());WindowedTable windowedTable = tEnv.from("user_actions").window(Tumble.over(lit(10).minutes()).on($("user_action_time")).as("userActionWindow"));

事件时间

事件时间允许程序按照数据中包含的时间来处理,这样可以在有乱序或者晚到的数据的情况下产生一致的处理结果。它可以保证从外部存储读取数据后产生可以复现的结果。

事件时间属性也有类似于处理时间的三种定义方式:

1.在DDL中定义2.在DataStream到Table转换时定义3.用TableSource定义

1.在DDL中定义

事件时间属性可以在创建表DDL中定义,增加一个字段,通过WATERMARK语句来定义事件时间属性。WATERMARK 语句在一个已有字段上定义一个 watermark 生成表达式,同时标记这个已有字段为时间属性字段。

CREATE TABLE user_actions (user_name STRING,data STRING,user_action_time TIMESTAMP(3),-- 声明 user_action_time 是事件时间属性,并且用 延迟 5 秒的策略来生成 watermarkWATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
) WITH (...
);SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);

时间戳类型必须是 TIMESTAMP 或者TIMESTAMP_LTZ 类型。但是时间戳一般都是秒或者是毫秒(BIGINT 类型),这种情况可以通过如下方式转换

ts BIGINT,
time_ltz AS TO_TIMESTAMP_LTZ(ts, 3),

2.在DataStream到Table转换时定义

在从 DataStream 到 Table 转换时定义事件时间属性有两种方式。取决于用 .rowtime 后缀修饰的字段名字是否是已有字段,事件时间字段可以是:

在schema的结尾追加一个新的字段替换一个已经存在的字段
// Option 1:
// 基于 stream 中的事件产生时间戳和 watermark
DataStream<Tuple2<String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);// 声明一个额外的逻辑字段作为事件时间属性
Table table = tEnv.fromDataStream(stream, $("user_name"), $("data"), $("user_action_time").rowtime());// Option 2:
// 从第一个字段获取事件时间,并且产生 watermark
DataStream<Tuple3<Long, String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);// 第一个字段已经用作事件时间抽取了,不用再用一个新字段来表示事件时间了
Table table = tEnv.fromDataStream(stream, $("user_action_time").rowtime(), $("user_name"), $("data"));// Usage:
WindowedTable windowedTable = table.window(Tumble.over(lit(10).minutes()).on($("user_action_time")).as("userActionWindow"));

3.使用TableSource定义

// 定义一个有事件时间属性的 table source
public class UserActionSource implements StreamTableSource<Row>, DefinedRowtimeAttributes {@Overridepublic TypeInformation<Row> getReturnType() {String[] names = new String[] {"user_name", "data", "user_action_time"};TypeInformation[] types =new TypeInformation[] {Types.STRING(), Types.STRING(), Types.LONG()};return Types.ROW(names, types);}@Overridepublic DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {// 构造 DataStream// ...// 基于 "user_action_time" 定义 watermarkDataStream<Row> stream = inputStream.assignTimestampsAndWatermarks(...);return stream;}@Overridepublic List<RowtimeAttributeDescriptor> getRowtimeAttributeDescriptors() {// 标记 "user_action_time" 字段是事件时间字段// 给 "user_action_time" 构造一个时间属性描述符RowtimeAttributeDescriptor rowtimeAttrDescr = new RowtimeAttributeDescriptor("user_action_time",new ExistingField("user_action_time"),new AscendingTimestamps());List<RowtimeAttributeDescriptor> listRowtimeAttrDescr = Collections.singletonList(rowtimeAttrDescr);return listRowtimeAttrDescr;}
}// register the table source
tEnv.registerTableSource("user_actions", new UserActionSource());WindowedTable windowedTable = tEnv.from("user_actions").window(Tumble.over(lit(10).minutes()).on($("user_action_time")).as("userActionWindow"));

相关文章:

Flink之SQL客户端与DDL操作

SQL客户端与DDL操作 Flink SQLSQL客户端1.启动Flink2.启动Flink的SQL客户端3.HELP命令4.验证连接5.结果显示模式6.执行配置 数据库操作1.创建数据库2.查询数据库3.修改数据库4.删除数据库 表操作1.创建表表列属性表Watermark属性列PRIMARY KEY属性列PARTITIONED BY属性列WITH选…...

记录第一次银行测试岗面试【总结几点面试不要犯得错误】

LZ在一个18线小城市做测试&#xff0c;近来想走出自己的舒适区&#xff0c;去做一点不一样的测试工作。 18线地区&#xff0c;测试工作并不多。最好的差不多就是LZ目前待着的公司了。遂决定去魔都闯荡几年&#xff0c;对一个在魔都无房无车无户口的人来讲&#xff0c;这意味着…...

一篇带你精通php

华子目录 什么是phpphp发展史平台支持和数据库支持网站静态网站和动态网站的区别静态网站动态网站的特点 关键名词解析服务器概念IP的概念域名DNS端口 web程序的访问流程静态网站访问流程动态网站访问流程 php标记脚本标记标准标记&#xff08;常用&#xff09; php注释 什么是…...

Go 语言函数

文章目录 Go 语言函数1. **函数的定义**&#xff1a;2. **参数和返回值**&#xff1a;3. **函数调用**&#xff1a;4. **多返回值**&#xff1a;5. **匿名函数**&#xff1a;6. **函数作为值**&#xff1a;7. **变参函数**&#xff1a;8. **递归函数**&#xff1a;9. **函数方法…...

前端小技巧: 拍平数组的6种常见方法

关于数组拍平 所谓数组拍平&#xff0c;就是按照顺序&#xff0c;把他们全放在一个数组中需要考虑多层级和嵌套的问题来彻底拍平数组 * 实现方案 1 &#xff09;一般思路, 先实现一级扁平化&#xff0c;然后递归&#xff0c;直到全部扁平 function flat(arr) {const res […...

c++day6

#include <iostream>using namespace std; class Animal { public:virtual void peform() 0; }; class Monekey:public Animal { public:void peform(){cout << "猴子黑桃A" << endl;} }; class Elepthant:public Animal {void peform(){cout &l…...

LeetCode(1)合并两个有序数组【数组/字符串】【简单】

目录 1.题目2.答案3.提交结果截图 链接&#xff1a; 88. 合并两个有序数组 1.题目 给你两个按 非递减顺序 排列的整数数组 nums1 和 nums2&#xff0c;另有两个整数 m 和 n &#xff0c;分别表示 nums1 和 nums2 中的元素数目。 请你 合并 nums2 到 nums1 中&#xff0c;使合…...

剪贴板管理软件 Paste Wizard mac中文版功能特色

Paste Wizard mac是一款剪贴板管理工具&#xff0c;它可以帮助用户更高效地管理剪贴板中的文本、图片、链接等内容。 Paste Wizard mac特色功能 提供了多种方式来保存和管理剪贴板中的内容。用户可以创建自定义的标签&#xff0c;将内容按照标签进行分类&#xff0c;方便快速查…...

【数据结构】树的基本性质(计算树的总结点数与叶结点数)

树的基本性质 ⭐️计算树的总结点与叶结点数&#x1f4ab;性质1&#x1f4ab;性质2&#x1f4ab;例题1&#x1f4ab;例题2 ⭐️计算树的总结点与叶结点数 &#x1f4ab;性质1 性质1 树中的结点数等于所有结点的度数之和加1 例如上面这棵树&#xff0c;A的孩子为B、C、D&…...

android手机平板拓展电脑屏幕

有这么两个软件 spacedesk_driver_Win_10_64_v1065_BETA.msi 安装在电脑上 spacedeskv0.91.1_chinese.apk 安装在android设备上 同一个局域网投屏就好了。 局域网无限投屏是很吃带宽的。 建议usb共享网络&#xff0c;不占用带宽、延迟低。 下载地址&#xff1a; https:/…...

接口测试的流程

接口通俗的理解就是不同部分之间的连接通道&#xff0c;可以是程序之内的&#xff0c;也可以是不同程序之间的。一般公司都会要求做接口测试&#xff0c;因为这是测试前移和测试左移的一种方式&#xff0c;会极大的解决bug的成本。 接口测试流程 接口测试的流程一般包括&…...

HMAC 详解:在 Golang 中实现消息认证码

目录 什么是 HMAC HMAC 的主要用途 HMAC 的工作原理 Golang 中的 crypto/hmac 包 如何选择合适的哈希函数和密钥长度 小结 什么是 HMAC HMAC&#xff08;Hash-based Message Authentication Code&#xff09;是一种基于 Hash 函数和密钥的消息认证码&#xff0c;由 H.Kr…...

阻塞队列和定时器的使用

阻塞队列 谈到队列,大家就能想到队列的先进先出原则,但有些特殊的队列,虽然也是先进先出的,但是带有阻塞功能,我们把这种队列叫做阻塞队列. ★如果队列为空,执行出队操作就会阻塞,阻塞到另外一个线程往队列里添加元素(队列不为空)为止. ★如果队列满了,执行入队操作时,也会阻…...

JavaScript脚本操作CSS

脚本化CSS就是使用JavaScript脚本操作CSS&#xff0c;配合HTML5、Ajax、jQuery等技术&#xff0c;可以设计出细腻、逼真的页面特效和交互行为&#xff0c;提升用户体验&#xff0c;如网页对象的显示/隐藏、定位、变形、运动等动态样式。 1、CSS脚本化基础 CSS样式有两种形式&…...

Rust4.1 Managing Growing Projects with Packages, Crates, and Modules

Rust学习笔记 Rust编程语言入门教程课程笔记 参考教材: The Rust Programming Language (by Steve Klabnik and Carol Nichols, with contributions from the Rust Community) Lecture 7: Managing Growing Projects with Packages, Crates, and Modules src/main.rs // s…...

RPA在财务预测和分析中的应用

在现代企业管理中&#xff0c;财务数据分析是决策制定和战略规划的关键环节。大数据的兴起带来财务数据的复杂性和数量不断增加&#xff0c;企业为此消耗了大量人力和物力。随着当今数字化、智能化时代的到来&#xff0c;越来越多企业引进RPA技术来提高工作效率&#xff0c;实现…...

无人机航拍技术基础入门,无人机拍摄的方法与技巧

一、教程描述 买了无人机&#xff0c;可是我不敢飞怎么办&#xff1f;禁飞区越来越多&#xff0c;到底哪儿才能飞&#xff1f;我的无人机跟你一样&#xff0c;为什么我拍不出大片&#xff1f;厂家的说明书看不进去&#xff0c;有没有一套无人机的课程&#xff0c;可以快速上手…...

PTA 哈密尔回路(建图搜索)

题目 著名的“汉密尔顿&#xff08;Hamilton&#xff09;回路问题”是要找一个能遍历图中所有顶点的简单回路&#xff08;即每个顶点只访问 1 次&#xff09;。本题就要求你判断任一给定的回路是否汉密尔顿回路。 输入格式&#xff1a; 首先第一行给出两个正整数&#xff1a…...

如何利用产品帮助中心提升用户体验

在当今竞争激烈的市场中&#xff0c;提供优秀的用户体验是吸引和保留客户的关键。而一个高效和易于使用的产品帮助中心&#xff0c;正成为越来越多企业用以提升用户体验的重要工具。产品帮助中心是一个集中的信息库&#xff0c;为用户提供关于产品功能、故障排除、常见问题解答…...

【Python大数据笔记_day05_Hive基础操作】

一.SQL,Hive和MapReduce的关系 用户在hive上编写sql语句,hive把sql语句转化为MapReduce程序去执行 二.Hive架构映射流程 用户接口: 包括CLI、JDBC/ODBC、WebGUI&#xff0c;CLI(command line interface&#xff09;为shell命令行&#xff1b;Hive中的Thrift服务器允许外部客户端…...

KubeSphere 容器平台高可用:环境搭建与可视化操作指南

Linux_k8s篇 欢迎来到Linux的世界&#xff0c;看笔记好好学多敲多打&#xff0c;每个人都是大神&#xff01; 题目&#xff1a;KubeSphere 容器平台高可用&#xff1a;环境搭建与可视化操作指南 版本号: 1.0,0 作者: 老王要学习 日期: 2025.06.05 适用环境: Ubuntu22 文档说…...

Java如何权衡是使用无序的数组还是有序的数组

在 Java 中,选择有序数组还是无序数组取决于具体场景的性能需求与操作特点。以下是关键权衡因素及决策指南: ⚖️ 核心权衡维度 维度有序数组无序数组查询性能二分查找 O(log n) ✅线性扫描 O(n) ❌插入/删除需移位维护顺序 O(n) ❌直接操作尾部 O(1) ✅内存开销与无序数组相…...

剑指offer20_链表中环的入口节点

链表中环的入口节点 给定一个链表&#xff0c;若其中包含环&#xff0c;则输出环的入口节点。 若其中不包含环&#xff0c;则输出null。 数据范围 节点 val 值取值范围 [ 1 , 1000 ] [1,1000] [1,1000]。 节点 val 值各不相同。 链表长度 [ 0 , 500 ] [0,500] [0,500]。 …...

今日学习:Spring线程池|并发修改异常|链路丢失|登录续期|VIP过期策略|数值类缓存

文章目录 优雅版线程池ThreadPoolTaskExecutor和ThreadPoolTaskExecutor的装饰器并发修改异常并发修改异常简介实现机制设计原因及意义 使用线程池造成的链路丢失问题线程池导致的链路丢失问题发生原因 常见解决方法更好的解决方法设计精妙之处 登录续期登录续期常见实现方式特…...

鸿蒙DevEco Studio HarmonyOS 5跑酷小游戏实现指南

1. 项目概述 本跑酷小游戏基于鸿蒙HarmonyOS 5开发&#xff0c;使用DevEco Studio作为开发工具&#xff0c;采用Java语言实现&#xff0c;包含角色控制、障碍物生成和分数计算系统。 2. 项目结构 /src/main/java/com/example/runner/├── MainAbilitySlice.java // 主界…...

回溯算法学习

一、电话号码的字母组合 import java.util.ArrayList; import java.util.List;import javax.management.loading.PrivateClassLoader;public class letterCombinations {private static final String[] KEYPAD {"", //0"", //1"abc", //2"…...

三分算法与DeepSeek辅助证明是单峰函数

前置 单峰函数有唯一的最大值&#xff0c;最大值左侧的数值严格单调递增&#xff0c;最大值右侧的数值严格单调递减。 单谷函数有唯一的最小值&#xff0c;最小值左侧的数值严格单调递减&#xff0c;最小值右侧的数值严格单调递增。 三分的本质 三分和二分一样都是通过不断缩…...

Caliper 负载(Workload)详细解析

Caliper 负载(Workload)详细解析 负载(Workload)是 Caliper 性能测试的核心部分,它定义了测试期间要执行的具体合约调用行为和交易模式。下面我将全面深入地讲解负载的各个方面。 一、负载模块基本结构 一个典型的负载模块(如 workload.js)包含以下基本结构: use strict;/…...

阿里云Ubuntu 22.04 64位搭建Flask流程(亲测)

cd /home 进入home盘 安装虚拟环境&#xff1a; 1、安装virtualenv pip install virtualenv 2.创建新的虚拟环境&#xff1a; virtualenv myenv 3、激活虚拟环境&#xff08;激活环境可以在当前环境下安装包&#xff09; source myenv/bin/activate 此时&#xff0c;终端…...

LangChain 中的文档加载器(Loader)与文本切分器(Splitter)详解《二》

&#x1f9e0; LangChain 中 TextSplitter 的使用详解&#xff1a;从基础到进阶&#xff08;附代码&#xff09; 一、前言 在处理大规模文本数据时&#xff0c;特别是在构建知识库或进行大模型训练与推理时&#xff0c;文本切分&#xff08;Text Splitting&#xff09; 是一个…...