玩转数据-大数据-Flink SQL 中的时间属性
一、说明
时间属性是大数据中的一个重要方面,像窗口(在 Table API 和 SQL )这种基于时间的操作,需要有时间信息。我们可以通过时间属性来更加灵活高效地处理数据,下面我们通过处理时间和事件时间来探讨一下Flink SQL 时间属性。
二、处理时间
2.1、准备WaterSensor类,方便使用
package com.lyh.bean;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;@Data
@NoArgsConstructor
@AllArgsConstructor
public class WaterSensor {private String id;private Long ts;private Integer vc;
}
2.2、DataStream 到 Table 转换时定义
处理时间属性可以在 schema 定义的时候用 .proctime 后缀来定义。时间属性一定不能定义在一个已有字段上,所以它新增一个字段。
代码段:
package com.lyh.flink12;import com.lyh.bean.WaterSensor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import static org.apache.flink.table.api.Expressions.$;public class Flink_Sql_Proctime {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<WaterSensor> waterSensorStream =env.fromElements(new WaterSensor("sensor_1", 1000L, 10),new WaterSensor("sensor_1", 2000L, 20),new WaterSensor("sensor_2", 3000L, 30),new WaterSensor("sensor_1", 4000L, 40),new WaterSensor("sensor_1", 5000L, 50),new WaterSensor("sensor_2", 6000L, 60));
// 1. 创建表的执行环境StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 声明一个额外的字段来作为处理时间字段Table sensorTable = tableEnv.fromDataStream(waterSensorStream, $("id"), $("ts"), $("vc"), $("pt").proctime());sensorTable.execute().print();}
}
执行结果:

2.3、创建数据文件sensor.txt 数据,方便使用
sensor_1,1,10
sensor_1,2,20
sensor_2,4,30
sensor_1,4,400
sensor_2,5,50
sensor_2,6,60
2.4、在创建表的 DDL 中定义
package com.lyh.flink12;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;public class Flink_Sql_ddl_Procetime {public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);tableEnv.executeSql("create table sensor(id string,ts bigint,vc int,pt_time as PROCTIME()) with("+ "'connector' = 'filesystem',"+ "'path' = 'input/sensor.txt',"+ "'format' = 'csv'"+ ")");Table table = tableEnv.sqlQuery("select * from sensor");table.execute().print();}
}
运行结果:

三、事件时间
事件时间允许程序按照数据中包含的时间来处理,这样可以在有乱序或者晚到的数据的情况下产生一致的处理结果。它可以保证从外部存储读取数据后产生可以复现(replayable)的结果。
除此之外,事件时间可以让程序在流式和批式作业中使用同样的语法。在流式程序中的事件时间属性,在批式程序中就是一个正常的时间字段。
为了能够处理乱序的事件,并且区分正常到达和晚到的事件,Flink 需要从事件中获取事件时间并且产生 watermark(watermarks)。
3.1、DataStream 到 Table 转换时定义
事件时间属性可以用 .rowtime 后缀在定义 DataStream schema 的时候来定义。时间戳和 watermark 在这之前一定是在 DataStream 上已经定义好了。
在从 DataStream 到 Table 转换时定义事件时间属性有两种方式。取决于用 .rowtime 后缀修饰的字段名字是否是已有字段,事件时间字段可以是:
1、在 schema 的结尾追加一个新的字段
2、替换一个已经存在的字段。
不管在哪种情况下,事件时间字段都表示 DataStream 中定义的事件的时间戳。
代码:
援用上面WaterSensor类
package com.lyh.flink12;import com.lyh.bean.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;import java.time.Duration;import static org.apache.flink.table.api.Expressions.$;public class Flink_Sql_EventTime {public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> waterSensorSource = env.fromElements(new WaterSensor("sensor_1", 1000L, 100),new WaterSensor("sensor_1", 1000L, 100),new WaterSensor("sensor_2", 1000L, 200),new WaterSensor("sensor_2", 1000L, 200)).assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2)).withTimestampAssigner((element, recordtime) -> element.getTs()));StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);tableEnv.fromDataStream(waterSensorSource,$("id"),$("ts"),$("vc"),$("pt").rowtime()).execute().print();}
}
运行结果:

3.2、使用已有的字段作为时间属性
.fromDataStream(waterSensorStream, $("id"), $("ts").rowtime(), $("vc"));
3.3、在创建表的 DDL 中定义
事件时间属性可以用 WATERMARK 语句在 CREATE TABLE DDL 中进行定义。WATERMARK 语句在一个已有字段上定义一个 watermark 生成表达式,同时标记这个已有字段为时间属性字段.
package com.lyh.flink12;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;public class Flink_Sql_ddl_EventTime {public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);tableEnv.executeSql("create table sensor(" +"id string," +"ts bigint," +"vc int, " +"t as to_timestamp(from_unixtime(ts/1000,'yyyy-MM-dd HH:mm:ss'))," +"watermark for t as t - interval '5' second)" +"with("+ "'connector' = 'filesystem',"+ "'path' = 'input/sensor.txt',"+ "'format' = 'csv'"+ ")");tableEnv.sqlQuery("select * from sensor").execute().print();}
}
运行结果:

说明:
1.把一个现有的列定义为一个为表标记事件时间的属性。该列的类型必须为 TIMESTAMP(3),且是 schema 中的顶层列,它也可以是一个计算列。
2.严格递增时间戳: WATERMARK FOR rowtime_column AS rowtime_column。
3.递增时间戳: WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL ‘0.001’ SECOND。
乱序时间戳: WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL ‘string’ timeUnit。
相关文章:
玩转数据-大数据-Flink SQL 中的时间属性
一、说明 时间属性是大数据中的一个重要方面,像窗口(在 Table API 和 SQL )这种基于时间的操作,需要有时间信息。我们可以通过时间属性来更加灵活高效地处理数据,下面我们通过处理时间和事件时间来探讨一下Flink SQL …...
【论文笔记】A Review of Motion Planning for Highway Autonomous Driving
文章目录 I. INTRODUCTIONII. CONSIDERATIONS FOR HIGHWAY MOTION PLANNINGA. TerminologyB. Motion Planning SchemeC. Specificities of Highway DrivingD. Constraints on Highway DrivingE. What Is at Stake in this Paper III. STATE OF THE ARTA. Taxonomy DescriptionB…...
YOLOv8改进算法之添加CA注意力机制
1. CA注意力机制 CA(Coordinate Attention)注意力机制是一种用于加强深度学习模型对输入数据的空间结构理解的注意力机制。CA 注意力机制的核心思想是引入坐标信息,以便模型可以更好地理解不同位置之间的关系。如下图: 1. 输入特…...
2023年10月腾讯云优惠活动汇总:腾讯云最新优惠、代金券整理
腾讯云作为国内领先的云服务提供商,致力于为用户提供优质、稳定的云服务。为了更好地满足用户需求,腾讯云推出了各种优惠活动。本文将给大家分享腾讯云最新优惠活动,帮助用户充分利用腾讯云提供的优惠。 一、腾讯云优惠券领取【点此领取】 腾…...
BUUCTF reverse wp 65 - 70
[SWPU2019]ReverseMe 反编译的伪码看不明白, 直接动调 这里显示"Please input your flag", 然后接受输入, 再和32进行比较, 应该是flag长度要求32位, 符合要求则跳转到loc_E528EE分支继续执行 动调之后伪码可以读了 int __cdecl main(int argc, const char **arg…...
xorm数据库操作之Join、Union
golang的数据库操作xorm使用起来非常方便,不用再自己写SQl语句,而且xorm自己给我们做了SQL防注入等操作,用起来既方便又安全。此次文章我不会记录xorm的基本操作,我值记录一些特殊用法问题,包括动态创建表单、基于xorm…...
排序:基数排序算法分析
1.算法思想 假设长度为n的线性表中每个结点aj的关键字由d元组 ( k j d − 1 , k j d − 2 , k j d − 3 , . . . , k j 1 , k j 0 ) (k_{j}^{d-1},k_{j}^{d-2},k_{j}^{d-3},... ,k_{j}^{1} ,k_{j}^{0}) (kjd−1,kjd−2,kjd−3,...,kj1,kj0)组成, 其中&am…...
用go实现http服务端和请求端
一、概述 本文旨在学习记录下如何用go实现建立一个http服务器,同时构造一个专用格式的http客户端。 二、代码实现 2.1 构造http服务端 1、http服务处理流程 基于HTTP构建的服务标准模型包括两个端,客户端(Client)和服务端(Server)。HTTP 请求从客户端…...
幂级数和幂级数的和函数有什么关系?
幂级数和幂级数的和函数有什么关系? 本文例子引用自:80_1幂级数运算,逐项积分、求导【小元老师】高等数学,考研数学 求幂级数 ∑ n 1 ∞ 1 n x n \sum\limits_{n1}^{\infty}\frac{1}{n}x^n n1∑∞n1xn 的和函数 ÿ…...
Git多账号管理通过ssh 公钥的方式,git,gitlab,gitee
按照目前国内访问git,如果不科学上网,我们很大可能访问会超时。基于这个,所以我现在的git 配置已经增加到了3个了 一个公司gitlab,一个git,一个gitee. 以下基于这个环境,我们来说明下如何创建配置ssh公钥。…...
在nodejs常见的不良做法及其优化解决方案
在nodejs常见的不良做法及其优化解决方案 当涉及到在express和nodejs中开发应用程序时。遵循最佳实践对于确保项目的健壮性、可维护性和安全性至关重要。 在本文中,我们将探索开发人员经常遇到的几种常见的错误做法,并通过代码示例研究优化的最佳做法&…...
关于layui upload上传组件上传文件无反应的问题
最近使用layui upload组件时,碰到了上传文件无反应的问题,感到非常困惑。 因为使用layui upload组件不是一次两次了,之前每次都可以,这次使用同样的配方,同样的姿势,为什么就不行了呢? 照例先…...
容器网络之Flannel
第一个问题位置变化,往往是通过一个称为注册中心的地方统一管理的,这个是应用自己做的。当一个应用启动的时候,将自己所在环境的 IP 地址和端口,注册到注册中心指挥部,这样其他的应用请求它的时候,到指挥…...
SVM(下):如何进行乳腺癌检测?
⭐️⭐️⭐️⭐️⭐️欢迎来到我的博客⭐️⭐️⭐️⭐️⭐️ 🐴作者:秋无之地 🐴简介:CSDN爬虫、后端、大数据领域创作者。目前从事python爬虫、后端和大数据等相关工作,主要擅长领域有:爬虫、后端、大数据开发、数据分析等。 🐴欢迎小伙伴们点赞👍🏻、收藏⭐️、…...
嵌入式Linux应用开发-第十五章具体单板的按键驱动程序
嵌入式Linux应用开发-第十五章具体单板的按键驱动程序 第十五章 具体单板的按键驱动程序(查询方式)15.1 GPIO操作回顾15.2 AM335X的按键驱动程序(查询方式)15.2.1 先看原理图确定引脚及操作方法15.2.2 再看芯片手册确定寄存器及操作方法15.2.3 编程15.2.3.1 程序框架15.2.3.2 硬…...
MySQL体系结构和四层架构介绍
MySQL体系结构图如下: 四层介绍 1. 连接层: 它的主要功能是处理客户端与MySQL服务器之间的连接(比如Java应用程序通过JDBC连接MySQL)。当客户端应用程序连接到MySQL服务器时,连接层对用户进行身份验证、建立安全连接并管理会话状态。它还处理…...
【产品运营】如何做好B端产品规划
产品规划是基于当下掌握的多维度信息,为追求特定目的,而制定的产品资源投入计划。 产品规划是基于当下掌握的多维度信息(客户需求、市场趋势、竞争对手、竞争策略等),为追求特定目的(商业增长、客户满意等&…...
ruoyi-启动
1 springboot 版本 git 地址 ruoyi-vue-pro: 🔥 官方推荐 🔥 RuoYi-Vue 全新 Pro 版本,优化重构所有功能。基于 Spring Boot MyBatis Plus Vue & Element 实现的后台管理系统 微信小程序,支持 RBAC 动态权限、数据权限…...
select完成服务器并发
服务器 #include <myhead.h>#define PORT 4399 //端口号 #define IP "192.168.0.191"//IP地址//键盘输入事件 int keybord_events(fd_set readfds); //客户端交互事件 int cliRcvSnd_events(int , struct sockaddr_in*, fd_set *, int *); //客户端连接事件 …...
初级篇—第四章聚合函数
文章目录 聚合函数介绍聚合函数介绍COUNT函数AVG和SUM函数MIN和MAX函数 GROUP BY语法基本使用使用多个列分组WITH ROLLUP HAVING基本使用WHERE和HAVING的对比开发中的选择 SELECT的执行过程查询的结构SQL 的执行原理 练习流程函数 聚合函数介绍 聚合函数作用于一组数据&#x…...
高效实现Windows任务栏个性化的5个极简方案:轻量级透明化工具TranslucentTB全指南
高效实现Windows任务栏个性化的5个极简方案:轻量级透明化工具TranslucentTB全指南 【免费下载链接】TranslucentTB A lightweight utility that makes the Windows taskbar translucent/transparent. 项目地址: https://gitcode.com/gh_mirrors/tr/TranslucentTB …...
League-Toolkit英雄联盟工具集启动故障解决方案
League-Toolkit英雄联盟工具集启动故障解决方案 【免费下载链接】League-Toolkit 兴趣使然的、简单易用的英雄联盟工具集。支持战绩查询、自动秒选等功能。基于 LCU API。 项目地址: https://gitcode.com/gh_mirrors/le/League-Toolkit League-Toolkit作为一款基于LCU A…...
2026年主流接口测试平台慢因分析与选型参考
2026年主流接口测试平台慢因分析与选型参考 核心观点摘要 2026年接口测试响应慢核心诱因可归为三类:工具本身并发调度能力不足、协议适配不全导致额外转码开销、缺少AI智能链路优化能力,多数企业接口测试效率低与工具选型不当直接相关。本次盘点覆盖当前…...
别再只盯着Midjourney了!2025年,这5款文生图模型更适合你的具体业务场景
2025年五大文生图模型实战指南:如何为你的业务精准匹配AI工具 当Midjourney成为文生图领域的"网红"时,真正懂行的从业者已经在根据具体业务需求选择更合适的工具了。就像专业摄影师不会只用一款镜头拍所有题材,明智的AI应用者需要建…...
从单变量到多变量:ODE与PDE的核心差异与应用场景解析
1. 从自变量数量看本质差异 第一次接触微分方程时,我也曾被ODE和PDE搞得晕头转向。直到有天导师用了个特别形象的比喻:ODE就像观察单车道上的车流,而PDE则是分析整个立交桥的交通网络。这个比方一下子点醒了我——核心差异就在于自变量数量这…...
保姆级避坑指南:手把手教你搞定CARLA 0.9.11与Autoware的ROS话题转发(附完整代码)
深度解析CARLA与Autoware联合仿真中的ROS话题转发实战 在自动驾驶仿真开发领域,CARLA与Autoware的联合使用已成为研究热点。许多开发者在尝试将两者结合时,往往会在ROS话题转发环节遇到各种"坑"。本文将聚焦这一关键环节,提供一份详…...
ai辅助c语言开发:让快马智能生成复杂格式文件读写代码
最近在开发一个C语言程序时需要处理自定义数据包格式,正好体验了用AI辅助开发的便捷。这个数据包格式包含包头标识、包体长度和JSON格式的包体数据,需要实现读写功能。下面分享我的实现过程和AI辅助开发的实用技巧。 数据包结构分析 首先明确数据包由三部…...
RVC 技术指南:从问题解决到效率提升
RVC 技术指南:从问题解决到效率提升 【免费下载链接】rvc RVC is a Linux console UI for vSphere, built on the RbVmomi bindings to the vSphere API. 项目地址: https://gitcode.com/gh_mirrors/rvc/rvc 问题场景→核心原理→分步方案→进阶技巧 一、环…...
Chatbot Arena排行榜单实战指南:从数据采集到模型优化
Chatbot Arena排行榜单实战指南:从数据采集到模型优化 在构建和优化自己的对话AI时,我们常常面临一个核心问题:如何客观、全面地评估它的性能?闭门造车式的测试往往带有主观偏见,而Chatbot Arena这类公开的排行榜单&a…...
从座舱芯片到指尖触控:聊聊高通8155/8295上那个你可能没注意到的Virtio Touch框架
从座舱芯片到指尖触控:高通8155/8295中的Virtio Touch框架解析 当你的手指在车载中控屏上滑动时,一组坐标数据正以微秒级速度穿越两个操作系统——这背后是高通座舱芯片中鲜为人知的Virtio Touch框架在发挥作用。作为连接QNX Hypervisor与Android系统的神…...
