当前位置: 首页 > news >正文

玩转数据-大数据-Flink SQL 中的时间属性

一、说明

时间属性是大数据中的一个重要方面,像窗口(在 Table API 和 SQL )这种基于时间的操作,需要有时间信息。我们可以通过时间属性来更加灵活高效地处理数据,下面我们通过处理时间和事件时间来探讨一下Flink SQL 时间属性。

二、处理时间

2.1、准备WaterSensor类,方便使用

package com.lyh.bean;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;@Data
@NoArgsConstructor
@AllArgsConstructor
public class WaterSensor {private String id;private Long ts;private Integer vc;
}

2.2、DataStream 到 Table 转换时定义

处理时间属性可以在 schema 定义的时候用 .proctime 后缀来定义。时间属性一定不能定义在一个已有字段上,所以它新增一个字段。
代码段:

package com.lyh.flink12;import com.lyh.bean.WaterSensor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import static org.apache.flink.table.api.Expressions.$;public class Flink_Sql_Proctime {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<WaterSensor> waterSensorStream =env.fromElements(new WaterSensor("sensor_1", 1000L, 10),new WaterSensor("sensor_1", 2000L, 20),new WaterSensor("sensor_2", 3000L, 30),new WaterSensor("sensor_1", 4000L, 40),new WaterSensor("sensor_1", 5000L, 50),new WaterSensor("sensor_2", 6000L, 60));
// 1. 创建表的执行环境StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 声明一个额外的字段来作为处理时间字段Table sensorTable = tableEnv.fromDataStream(waterSensorStream, $("id"), $("ts"), $("vc"), $("pt").proctime());sensorTable.execute().print();}
}

执行结果:
在这里插入图片描述

2.3、创建数据文件sensor.txt 数据,方便使用

sensor_1,1,10
sensor_1,2,20
sensor_2,4,30
sensor_1,4,400
sensor_2,5,50
sensor_2,6,60

2.4、在创建表的 DDL 中定义

package com.lyh.flink12;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;public class Flink_Sql_ddl_Procetime {public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);tableEnv.executeSql("create table sensor(id string,ts bigint,vc int,pt_time as PROCTIME()) with("+ "'connector' = 'filesystem',"+ "'path' = 'input/sensor.txt',"+ "'format' = 'csv'"+ ")");Table table = tableEnv.sqlQuery("select * from sensor");table.execute().print();}
}

运行结果:
在这里插入图片描述

三、事件时间

事件时间允许程序按照数据中包含的时间来处理,这样可以在有乱序或者晚到的数据的情况下产生一致的处理结果。它可以保证从外部存储读取数据后产生可以复现(replayable)的结果。
除此之外,事件时间可以让程序在流式和批式作业中使用同样的语法。在流式程序中的事件时间属性,在批式程序中就是一个正常的时间字段。
为了能够处理乱序的事件,并且区分正常到达和晚到的事件,Flink 需要从事件中获取事件时间并且产生 watermark(watermarks)。

3.1、DataStream 到 Table 转换时定义

事件时间属性可以用 .rowtime 后缀在定义 DataStream schema 的时候来定义。时间戳和 watermark 在这之前一定是在 DataStream 上已经定义好了。
在从 DataStream 到 Table 转换时定义事件时间属性有两种方式。取决于用 .rowtime 后缀修饰的字段名字是否是已有字段,事件时间字段可以是:
1、在 schema 的结尾追加一个新的字段
2、替换一个已经存在的字段。
不管在哪种情况下,事件时间字段都表示 DataStream 中定义的事件的时间戳。
代码:
援用上面WaterSensor类

package com.lyh.flink12;import com.lyh.bean.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;import java.time.Duration;import static org.apache.flink.table.api.Expressions.$;public class Flink_Sql_EventTime {public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> waterSensorSource = env.fromElements(new WaterSensor("sensor_1", 1000L, 100),new WaterSensor("sensor_1", 1000L, 100),new WaterSensor("sensor_2", 1000L, 200),new WaterSensor("sensor_2", 1000L, 200)).assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2)).withTimestampAssigner((element, recordtime) -> element.getTs()));StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);tableEnv.fromDataStream(waterSensorSource,$("id"),$("ts"),$("vc"),$("pt").rowtime()).execute().print();}
}

运行结果:
在这里插入图片描述

3.2、使用已有的字段作为时间属性

.fromDataStream(waterSensorStream, $("id"), $("ts").rowtime(), $("vc"));

3.3、在创建表的 DDL 中定义

事件时间属性可以用 WATERMARK 语句在 CREATE TABLE DDL 中进行定义。WATERMARK 语句在一个已有字段上定义一个 watermark 生成表达式,同时标记这个已有字段为时间属性字段.

package com.lyh.flink12;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;public class Flink_Sql_ddl_EventTime {public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);tableEnv.executeSql("create table sensor(" +"id string," +"ts bigint," +"vc int, " +"t as to_timestamp(from_unixtime(ts/1000,'yyyy-MM-dd HH:mm:ss'))," +"watermark for t as t - interval '5' second)" +"with("+ "'connector' = 'filesystem',"+ "'path' = 'input/sensor.txt',"+ "'format' = 'csv'"+ ")");tableEnv.sqlQuery("select * from sensor").execute().print();}
}

运行结果:
在这里插入图片描述
说明:
1.把一个现有的列定义为一个为表标记事件时间的属性。该列的类型必须为 TIMESTAMP(3),且是 schema 中的顶层列,它也可以是一个计算列。
2.严格递增时间戳: WATERMARK FOR rowtime_column AS rowtime_column。
3.递增时间戳: WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL ‘0.001’ SECOND。
乱序时间戳: WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL ‘string’ timeUnit。

相关文章:

玩转数据-大数据-Flink SQL 中的时间属性

一、说明 时间属性是大数据中的一个重要方面&#xff0c;像窗口&#xff08;在 Table API 和 SQL &#xff09;这种基于时间的操作&#xff0c;需要有时间信息。我们可以通过时间属性来更加灵活高效地处理数据&#xff0c;下面我们通过处理时间和事件时间来探讨一下Flink SQL …...

【论文笔记】A Review of Motion Planning for Highway Autonomous Driving

文章目录 I. INTRODUCTIONII. CONSIDERATIONS FOR HIGHWAY MOTION PLANNINGA. TerminologyB. Motion Planning SchemeC. Specificities of Highway DrivingD. Constraints on Highway DrivingE. What Is at Stake in this Paper III. STATE OF THE ARTA. Taxonomy DescriptionB…...

YOLOv8改进算法之添加CA注意力机制

1. CA注意力机制 CA&#xff08;Coordinate Attention&#xff09;注意力机制是一种用于加强深度学习模型对输入数据的空间结构理解的注意力机制。CA 注意力机制的核心思想是引入坐标信息&#xff0c;以便模型可以更好地理解不同位置之间的关系。如下图&#xff1a; 1. 输入特…...

2023年10月腾讯云优惠活动汇总:腾讯云最新优惠、代金券整理

腾讯云作为国内领先的云服务提供商&#xff0c;致力于为用户提供优质、稳定的云服务。为了更好地满足用户需求&#xff0c;腾讯云推出了各种优惠活动。本文将给大家分享腾讯云最新优惠活动&#xff0c;帮助用户充分利用腾讯云提供的优惠。 一、腾讯云优惠券领取【点此领取】 腾…...

BUUCTF reverse wp 65 - 70

[SWPU2019]ReverseMe 反编译的伪码看不明白, 直接动调 这里显示"Please input your flag", 然后接受输入, 再和32进行比较, 应该是flag长度要求32位, 符合要求则跳转到loc_E528EE分支继续执行 动调之后伪码可以读了 int __cdecl main(int argc, const char **arg…...

xorm数据库操作之Join、Union

golang的数据库操作xorm使用起来非常方便&#xff0c;不用再自己写SQl语句&#xff0c;而且xorm自己给我们做了SQL防注入等操作&#xff0c;用起来既方便又安全。此次文章我不会记录xorm的基本操作&#xff0c;我值记录一些特殊用法问题&#xff0c;包括动态创建表单、基于xorm…...

排序:基数排序算法分析

1.算法思想 假设长度为n的线性表中每个结点aj的关键字由d元组 ( k j d − 1 , k j d − 2 , k j d − 3 , . . . , k j 1 , k j 0 ) (k_{j}^{d-1},k_{j}^{d-2},k_{j}^{d-3},... ,k_{j}^{1} ,k_{j}^{0}) (kjd−1​,kjd−2​,kjd−3​,...,kj1​,kj0​)组成&#xff0c; 其中&am…...

用go实现http服务端和请求端

一、概述 本文旨在学习记录下如何用go实现建立一个http服务器&#xff0c;同时构造一个专用格式的http客户端。 二、代码实现 2.1 构造http服务端 1、http服务处理流程 基于HTTP构建的服务标准模型包括两个端&#xff0c;客户端(Client)和服务端(Server)。HTTP 请求从客户端…...

幂级数和幂级数的和函数有什么关系?

幂级数和幂级数的和函数有什么关系&#xff1f; 本文例子引用自&#xff1a;80_1幂级数运算&#xff0c;逐项积分、求导【小元老师】高等数学&#xff0c;考研数学 求幂级数 ∑ n 1 ∞ 1 n x n \sum\limits_{n1}^{\infty}\frac{1}{n}x^n n1∑∞​n1​xn 的和函数 &#xff…...

Git多账号管理通过ssh 公钥的方式,git,gitlab,gitee

按照目前国内访问git&#xff0c;如果不科学上网&#xff0c;我们很大可能访问会超时。基于这个&#xff0c;所以我现在的git 配置已经增加到了3个了 一个公司gitlab&#xff0c;一个git&#xff0c;一个gitee. 以下基于这个环境&#xff0c;我们来说明下如何创建配置ssh公钥。…...

在nodejs常见的不良做法及其优化解决方案

在nodejs常见的不良做法及其优化解决方案 当涉及到在express和nodejs中开发应用程序时。遵循最佳实践对于确保项目的健壮性、可维护性和安全性至关重要。 在本文中&#xff0c;我们将探索开发人员经常遇到的几种常见的错误做法&#xff0c;并通过代码示例研究优化的最佳做法&…...

关于layui upload上传组件上传文件无反应的问题

最近使用layui upload组件时&#xff0c;碰到了上传文件无反应的问题&#xff0c;感到非常困惑。 因为使用layui upload组件不是一次两次了&#xff0c;之前每次都可以&#xff0c;这次使用同样的配方&#xff0c;同样的姿势&#xff0c;为什么就不行了呢&#xff1f; 照例先…...

容器网络之Flannel

​ 第一个问题位置变化&#xff0c;往往是通过一个称为注册中心的地方统一管理的&#xff0c;这个是应用自己做的。当一个应用启动的时候&#xff0c;将自己所在环境的 IP 地址和端口&#xff0c;注册到注册中心指挥部&#xff0c;这样其他的应用请求它的时候&#xff0c;到指挥…...

SVM(下):如何进行乳腺癌检测?

⭐️⭐️⭐️⭐️⭐️欢迎来到我的博客⭐️⭐️⭐️⭐️⭐️ 🐴作者:秋无之地 🐴简介:CSDN爬虫、后端、大数据领域创作者。目前从事python爬虫、后端和大数据等相关工作,主要擅长领域有:爬虫、后端、大数据开发、数据分析等。 🐴欢迎小伙伴们点赞👍🏻、收藏⭐️、…...

嵌入式Linux应用开发-第十五章具体单板的按键驱动程序

嵌入式Linux应用开发-第十五章具体单板的按键驱动程序 第十五章 具体单板的按键驱动程序(查询方式)15.1 GPIO操作回顾15.2 AM335X的按键驱动程序(查询方式)15.2.1 先看原理图确定引脚及操作方法15.2.2 再看芯片手册确定寄存器及操作方法15.2.3 编程15.2.3.1 程序框架15.2.3.2 硬…...

MySQL体系结构和四层架构介绍

MySQL体系结构图如下&#xff1a; 四层介绍 1. 连接层&#xff1a; 它的主要功能是处理客户端与MySQL服务器之间的连接(比如Java应用程序通过JDBC连接MySQL)。当客户端应用程序连接到MySQL服务器时&#xff0c;连接层对用户进行身份验证、建立安全连接并管理会话状态。它还处理…...

【产品运营】如何做好B端产品规划

产品规划是基于当下掌握的多维度信息&#xff0c;为追求特定目的&#xff0c;而制定的产品资源投入计划。 产品规划是基于当下掌握的多维度信息&#xff08;客户需求、市场趋势、竞争对手、竞争策略等&#xff09;&#xff0c;为追求特定目的&#xff08;商业增长、客户满意等&…...

ruoyi-启动

1 springboot 版本 git 地址 ruoyi-vue-pro: &#x1f525; 官方推荐 &#x1f525; RuoYi-Vue 全新 Pro 版本&#xff0c;优化重构所有功能。基于 Spring Boot MyBatis Plus Vue & Element 实现的后台管理系统 微信小程序&#xff0c;支持 RBAC 动态权限、数据权限…...

select完成服务器并发

服务器 #include <myhead.h>#define PORT 4399 //端口号 #define IP "192.168.0.191"//IP地址//键盘输入事件 int keybord_events(fd_set readfds); //客户端交互事件 int cliRcvSnd_events(int , struct sockaddr_in*, fd_set *, int *); //客户端连接事件 …...

初级篇—第四章聚合函数

文章目录 聚合函数介绍聚合函数介绍COUNT函数AVG和SUM函数MIN和MAX函数 GROUP BY语法基本使用使用多个列分组WITH ROLLUP HAVING基本使用WHERE和HAVING的对比开发中的选择 SELECT的执行过程查询的结构SQL 的执行原理 练习流程函数 聚合函数介绍 聚合函数作用于一组数据&#x…...

Java 语言特性(面试系列2)

一、SQL 基础 1. 复杂查询 &#xff08;1&#xff09;连接查询&#xff08;JOIN&#xff09; 内连接&#xff08;INNER JOIN&#xff09;&#xff1a;返回两表匹配的记录。 SELECT e.name, d.dept_name FROM employees e INNER JOIN departments d ON e.dept_id d.dept_id; 左…...

工业安全零事故的智能守护者:一体化AI智能安防平台

前言&#xff1a; 通过AI视觉技术&#xff0c;为船厂提供全面的安全监控解决方案&#xff0c;涵盖交通违规检测、起重机轨道安全、非法入侵检测、盗窃防范、安全规范执行监控等多个方面&#xff0c;能够实现对应负责人反馈机制&#xff0c;并最终实现数据的统计报表。提升船厂…...

8k长序列建模,蛋白质语言模型Prot42仅利用目标蛋白序列即可生成高亲和力结合剂

蛋白质结合剂&#xff08;如抗体、抑制肽&#xff09;在疾病诊断、成像分析及靶向药物递送等关键场景中发挥着不可替代的作用。传统上&#xff0c;高特异性蛋白质结合剂的开发高度依赖噬菌体展示、定向进化等实验技术&#xff0c;但这类方法普遍面临资源消耗巨大、研发周期冗长…...

1.3 VSCode安装与环境配置

进入网址Visual Studio Code - Code Editing. Redefined下载.deb文件&#xff0c;然后打开终端&#xff0c;进入下载文件夹&#xff0c;键入命令 sudo dpkg -i code_1.100.3-1748872405_amd64.deb 在终端键入命令code即启动vscode 需要安装插件列表 1.Chinese简化 2.ros …...

基于数字孪生的水厂可视化平台建设:架构与实践

分享大纲&#xff1a; 1、数字孪生水厂可视化平台建设背景 2、数字孪生水厂可视化平台建设架构 3、数字孪生水厂可视化平台建设成效 近几年&#xff0c;数字孪生水厂的建设开展的如火如荼。作为提升水厂管理效率、优化资源的调度手段&#xff0c;基于数字孪生的水厂可视化平台的…...

如何为服务器生成TLS证书

TLS&#xff08;Transport Layer Security&#xff09;证书是确保网络通信安全的重要手段&#xff0c;它通过加密技术保护传输的数据不被窃听和篡改。在服务器上配置TLS证书&#xff0c;可以使用户通过HTTPS协议安全地访问您的网站。本文将详细介绍如何在服务器上生成一个TLS证…...

论文解读:交大港大上海AI Lab开源论文 | 宇树机器人多姿态起立控制强化学习框架(一)

宇树机器人多姿态起立控制强化学习框架论文解析 论文解读&#xff1a;交大&港大&上海AI Lab开源论文 | 宇树机器人多姿态起立控制强化学习框架&#xff08;一&#xff09; 论文解读&#xff1a;交大&港大&上海AI Lab开源论文 | 宇树机器人多姿态起立控制强化…...

NLP学习路线图(二十三):长短期记忆网络(LSTM)

在自然语言处理(NLP)领域,我们时刻面临着处理序列数据的核心挑战。无论是理解句子的结构、分析文本的情感,还是实现语言的翻译,都需要模型能够捕捉词语之间依时序产生的复杂依赖关系。传统的神经网络结构在处理这种序列依赖时显得力不从心,而循环神经网络(RNN) 曾被视为…...

今日科技热点速览

&#x1f525; 今日科技热点速览 &#x1f3ae; 任天堂Switch 2 正式发售 任天堂新一代游戏主机 Switch 2 今日正式上线发售&#xff0c;主打更强图形性能与沉浸式体验&#xff0c;支持多模态交互&#xff0c;受到全球玩家热捧 。 &#x1f916; 人工智能持续突破 DeepSeek-R1&…...

Spring数据访问模块设计

前面我们已经完成了IoC和web模块的设计&#xff0c;聪明的码友立马就知道了&#xff0c;该到数据访问模块了&#xff0c;要不就这俩玩个6啊&#xff0c;查库势在必行&#xff0c;至此&#xff0c;它来了。 一、核心设计理念 1、痛点在哪 应用离不开数据&#xff08;数据库、No…...