当前位置: 首页 > news >正文

玩转数据-大数据-Flink SQL 中的时间属性

一、说明

时间属性是大数据中的一个重要方面,像窗口(在 Table API 和 SQL )这种基于时间的操作,需要有时间信息。我们可以通过时间属性来更加灵活高效地处理数据,下面我们通过处理时间和事件时间来探讨一下Flink SQL 时间属性。

二、处理时间

2.1、准备WaterSensor类,方便使用

package com.lyh.bean;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;@Data
@NoArgsConstructor
@AllArgsConstructor
public class WaterSensor {private String id;private Long ts;private Integer vc;
}

2.2、DataStream 到 Table 转换时定义

处理时间属性可以在 schema 定义的时候用 .proctime 后缀来定义。时间属性一定不能定义在一个已有字段上,所以它新增一个字段。
代码段:

package com.lyh.flink12;import com.lyh.bean.WaterSensor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import static org.apache.flink.table.api.Expressions.$;public class Flink_Sql_Proctime {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<WaterSensor> waterSensorStream =env.fromElements(new WaterSensor("sensor_1", 1000L, 10),new WaterSensor("sensor_1", 2000L, 20),new WaterSensor("sensor_2", 3000L, 30),new WaterSensor("sensor_1", 4000L, 40),new WaterSensor("sensor_1", 5000L, 50),new WaterSensor("sensor_2", 6000L, 60));
// 1. 创建表的执行环境StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 声明一个额外的字段来作为处理时间字段Table sensorTable = tableEnv.fromDataStream(waterSensorStream, $("id"), $("ts"), $("vc"), $("pt").proctime());sensorTable.execute().print();}
}

执行结果:
在这里插入图片描述

2.3、创建数据文件sensor.txt 数据,方便使用

sensor_1,1,10
sensor_1,2,20
sensor_2,4,30
sensor_1,4,400
sensor_2,5,50
sensor_2,6,60

2.4、在创建表的 DDL 中定义

package com.lyh.flink12;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;public class Flink_Sql_ddl_Procetime {public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);tableEnv.executeSql("create table sensor(id string,ts bigint,vc int,pt_time as PROCTIME()) with("+ "'connector' = 'filesystem',"+ "'path' = 'input/sensor.txt',"+ "'format' = 'csv'"+ ")");Table table = tableEnv.sqlQuery("select * from sensor");table.execute().print();}
}

运行结果:
在这里插入图片描述

三、事件时间

事件时间允许程序按照数据中包含的时间来处理,这样可以在有乱序或者晚到的数据的情况下产生一致的处理结果。它可以保证从外部存储读取数据后产生可以复现(replayable)的结果。
除此之外,事件时间可以让程序在流式和批式作业中使用同样的语法。在流式程序中的事件时间属性,在批式程序中就是一个正常的时间字段。
为了能够处理乱序的事件,并且区分正常到达和晚到的事件,Flink 需要从事件中获取事件时间并且产生 watermark(watermarks)。

3.1、DataStream 到 Table 转换时定义

事件时间属性可以用 .rowtime 后缀在定义 DataStream schema 的时候来定义。时间戳和 watermark 在这之前一定是在 DataStream 上已经定义好了。
在从 DataStream 到 Table 转换时定义事件时间属性有两种方式。取决于用 .rowtime 后缀修饰的字段名字是否是已有字段,事件时间字段可以是:
1、在 schema 的结尾追加一个新的字段
2、替换一个已经存在的字段。
不管在哪种情况下,事件时间字段都表示 DataStream 中定义的事件的时间戳。
代码:
援用上面WaterSensor类

package com.lyh.flink12;import com.lyh.bean.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;import java.time.Duration;import static org.apache.flink.table.api.Expressions.$;public class Flink_Sql_EventTime {public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> waterSensorSource = env.fromElements(new WaterSensor("sensor_1", 1000L, 100),new WaterSensor("sensor_1", 1000L, 100),new WaterSensor("sensor_2", 1000L, 200),new WaterSensor("sensor_2", 1000L, 200)).assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2)).withTimestampAssigner((element, recordtime) -> element.getTs()));StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);tableEnv.fromDataStream(waterSensorSource,$("id"),$("ts"),$("vc"),$("pt").rowtime()).execute().print();}
}

运行结果:
在这里插入图片描述

3.2、使用已有的字段作为时间属性

.fromDataStream(waterSensorStream, $("id"), $("ts").rowtime(), $("vc"));

3.3、在创建表的 DDL 中定义

事件时间属性可以用 WATERMARK 语句在 CREATE TABLE DDL 中进行定义。WATERMARK 语句在一个已有字段上定义一个 watermark 生成表达式,同时标记这个已有字段为时间属性字段.

package com.lyh.flink12;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;public class Flink_Sql_ddl_EventTime {public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);tableEnv.executeSql("create table sensor(" +"id string," +"ts bigint," +"vc int, " +"t as to_timestamp(from_unixtime(ts/1000,'yyyy-MM-dd HH:mm:ss'))," +"watermark for t as t - interval '5' second)" +"with("+ "'connector' = 'filesystem',"+ "'path' = 'input/sensor.txt',"+ "'format' = 'csv'"+ ")");tableEnv.sqlQuery("select * from sensor").execute().print();}
}

运行结果:
在这里插入图片描述
说明:
1.把一个现有的列定义为一个为表标记事件时间的属性。该列的类型必须为 TIMESTAMP(3),且是 schema 中的顶层列,它也可以是一个计算列。
2.严格递增时间戳: WATERMARK FOR rowtime_column AS rowtime_column。
3.递增时间戳: WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL ‘0.001’ SECOND。
乱序时间戳: WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL ‘string’ timeUnit。

相关文章:

玩转数据-大数据-Flink SQL 中的时间属性

一、说明 时间属性是大数据中的一个重要方面&#xff0c;像窗口&#xff08;在 Table API 和 SQL &#xff09;这种基于时间的操作&#xff0c;需要有时间信息。我们可以通过时间属性来更加灵活高效地处理数据&#xff0c;下面我们通过处理时间和事件时间来探讨一下Flink SQL …...

【论文笔记】A Review of Motion Planning for Highway Autonomous Driving

文章目录 I. INTRODUCTIONII. CONSIDERATIONS FOR HIGHWAY MOTION PLANNINGA. TerminologyB. Motion Planning SchemeC. Specificities of Highway DrivingD. Constraints on Highway DrivingE. What Is at Stake in this Paper III. STATE OF THE ARTA. Taxonomy DescriptionB…...

YOLOv8改进算法之添加CA注意力机制

1. CA注意力机制 CA&#xff08;Coordinate Attention&#xff09;注意力机制是一种用于加强深度学习模型对输入数据的空间结构理解的注意力机制。CA 注意力机制的核心思想是引入坐标信息&#xff0c;以便模型可以更好地理解不同位置之间的关系。如下图&#xff1a; 1. 输入特…...

2023年10月腾讯云优惠活动汇总:腾讯云最新优惠、代金券整理

腾讯云作为国内领先的云服务提供商&#xff0c;致力于为用户提供优质、稳定的云服务。为了更好地满足用户需求&#xff0c;腾讯云推出了各种优惠活动。本文将给大家分享腾讯云最新优惠活动&#xff0c;帮助用户充分利用腾讯云提供的优惠。 一、腾讯云优惠券领取【点此领取】 腾…...

BUUCTF reverse wp 65 - 70

[SWPU2019]ReverseMe 反编译的伪码看不明白, 直接动调 这里显示"Please input your flag", 然后接受输入, 再和32进行比较, 应该是flag长度要求32位, 符合要求则跳转到loc_E528EE分支继续执行 动调之后伪码可以读了 int __cdecl main(int argc, const char **arg…...

xorm数据库操作之Join、Union

golang的数据库操作xorm使用起来非常方便&#xff0c;不用再自己写SQl语句&#xff0c;而且xorm自己给我们做了SQL防注入等操作&#xff0c;用起来既方便又安全。此次文章我不会记录xorm的基本操作&#xff0c;我值记录一些特殊用法问题&#xff0c;包括动态创建表单、基于xorm…...

排序:基数排序算法分析

1.算法思想 假设长度为n的线性表中每个结点aj的关键字由d元组 ( k j d − 1 , k j d − 2 , k j d − 3 , . . . , k j 1 , k j 0 ) (k_{j}^{d-1},k_{j}^{d-2},k_{j}^{d-3},... ,k_{j}^{1} ,k_{j}^{0}) (kjd−1​,kjd−2​,kjd−3​,...,kj1​,kj0​)组成&#xff0c; 其中&am…...

用go实现http服务端和请求端

一、概述 本文旨在学习记录下如何用go实现建立一个http服务器&#xff0c;同时构造一个专用格式的http客户端。 二、代码实现 2.1 构造http服务端 1、http服务处理流程 基于HTTP构建的服务标准模型包括两个端&#xff0c;客户端(Client)和服务端(Server)。HTTP 请求从客户端…...

幂级数和幂级数的和函数有什么关系?

幂级数和幂级数的和函数有什么关系&#xff1f; 本文例子引用自&#xff1a;80_1幂级数运算&#xff0c;逐项积分、求导【小元老师】高等数学&#xff0c;考研数学 求幂级数 ∑ n 1 ∞ 1 n x n \sum\limits_{n1}^{\infty}\frac{1}{n}x^n n1∑∞​n1​xn 的和函数 &#xff…...

Git多账号管理通过ssh 公钥的方式,git,gitlab,gitee

按照目前国内访问git&#xff0c;如果不科学上网&#xff0c;我们很大可能访问会超时。基于这个&#xff0c;所以我现在的git 配置已经增加到了3个了 一个公司gitlab&#xff0c;一个git&#xff0c;一个gitee. 以下基于这个环境&#xff0c;我们来说明下如何创建配置ssh公钥。…...

在nodejs常见的不良做法及其优化解决方案

在nodejs常见的不良做法及其优化解决方案 当涉及到在express和nodejs中开发应用程序时。遵循最佳实践对于确保项目的健壮性、可维护性和安全性至关重要。 在本文中&#xff0c;我们将探索开发人员经常遇到的几种常见的错误做法&#xff0c;并通过代码示例研究优化的最佳做法&…...

关于layui upload上传组件上传文件无反应的问题

最近使用layui upload组件时&#xff0c;碰到了上传文件无反应的问题&#xff0c;感到非常困惑。 因为使用layui upload组件不是一次两次了&#xff0c;之前每次都可以&#xff0c;这次使用同样的配方&#xff0c;同样的姿势&#xff0c;为什么就不行了呢&#xff1f; 照例先…...

容器网络之Flannel

​ 第一个问题位置变化&#xff0c;往往是通过一个称为注册中心的地方统一管理的&#xff0c;这个是应用自己做的。当一个应用启动的时候&#xff0c;将自己所在环境的 IP 地址和端口&#xff0c;注册到注册中心指挥部&#xff0c;这样其他的应用请求它的时候&#xff0c;到指挥…...

SVM(下):如何进行乳腺癌检测?

⭐️⭐️⭐️⭐️⭐️欢迎来到我的博客⭐️⭐️⭐️⭐️⭐️ 🐴作者:秋无之地 🐴简介:CSDN爬虫、后端、大数据领域创作者。目前从事python爬虫、后端和大数据等相关工作,主要擅长领域有:爬虫、后端、大数据开发、数据分析等。 🐴欢迎小伙伴们点赞👍🏻、收藏⭐️、…...

嵌入式Linux应用开发-第十五章具体单板的按键驱动程序

嵌入式Linux应用开发-第十五章具体单板的按键驱动程序 第十五章 具体单板的按键驱动程序(查询方式)15.1 GPIO操作回顾15.2 AM335X的按键驱动程序(查询方式)15.2.1 先看原理图确定引脚及操作方法15.2.2 再看芯片手册确定寄存器及操作方法15.2.3 编程15.2.3.1 程序框架15.2.3.2 硬…...

MySQL体系结构和四层架构介绍

MySQL体系结构图如下&#xff1a; 四层介绍 1. 连接层&#xff1a; 它的主要功能是处理客户端与MySQL服务器之间的连接(比如Java应用程序通过JDBC连接MySQL)。当客户端应用程序连接到MySQL服务器时&#xff0c;连接层对用户进行身份验证、建立安全连接并管理会话状态。它还处理…...

【产品运营】如何做好B端产品规划

产品规划是基于当下掌握的多维度信息&#xff0c;为追求特定目的&#xff0c;而制定的产品资源投入计划。 产品规划是基于当下掌握的多维度信息&#xff08;客户需求、市场趋势、竞争对手、竞争策略等&#xff09;&#xff0c;为追求特定目的&#xff08;商业增长、客户满意等&…...

ruoyi-启动

1 springboot 版本 git 地址 ruoyi-vue-pro: &#x1f525; 官方推荐 &#x1f525; RuoYi-Vue 全新 Pro 版本&#xff0c;优化重构所有功能。基于 Spring Boot MyBatis Plus Vue & Element 实现的后台管理系统 微信小程序&#xff0c;支持 RBAC 动态权限、数据权限…...

select完成服务器并发

服务器 #include <myhead.h>#define PORT 4399 //端口号 #define IP "192.168.0.191"//IP地址//键盘输入事件 int keybord_events(fd_set readfds); //客户端交互事件 int cliRcvSnd_events(int , struct sockaddr_in*, fd_set *, int *); //客户端连接事件 …...

初级篇—第四章聚合函数

文章目录 聚合函数介绍聚合函数介绍COUNT函数AVG和SUM函数MIN和MAX函数 GROUP BY语法基本使用使用多个列分组WITH ROLLUP HAVING基本使用WHERE和HAVING的对比开发中的选择 SELECT的执行过程查询的结构SQL 的执行原理 练习流程函数 聚合函数介绍 聚合函数作用于一组数据&#x…...

保姆级教程:用Android 12新特性为你的App打造丝滑启动页(附完整代码示例)

Android 12启动页开发实战&#xff1a;从基础配置到高级动画优化 在移动应用体验中&#xff0c;启动页作为用户接触产品的第一印象&#xff0c;其流畅度直接影响用户留存率。Android 12引入的SplashScreen API为开发者提供了标准化且高度可定制的启动解决方案&#xff0c;本文将…...

英雄联盟智能助手如何解决游戏操作繁琐问题?提升游戏效率完全指南

英雄联盟智能助手如何解决游戏操作繁琐问题&#xff1f;提升游戏效率完全指南 【免费下载链接】League-Toolkit 兴趣使然的、简单易用的英雄联盟工具集。支持战绩查询、自动秒选等功能。基于 LCU API。 项目地址: https://gitcode.com/gh_mirrors/le/League-Toolkit 你是…...

Dify插件安装全攻略:从在线市场到离线部署的完整实践

1. Dify插件安装前的准备工作 在开始安装Dify插件之前&#xff0c;我们需要先了解几个关键概念。Dify 1.0.0版本之后&#xff0c;所有工具和模型供应商都改为了插件形式&#xff0c;这意味着我们需要掌握插件的安装方法才能充分发挥Dify的功能。插件主要分为五大类&#xff1a…...

嵌入式新手入门:用快马平台生成带详细注释的LED控制项目

作为一个嵌入式开发新手&#xff0c;刚开始接触STM32时确实有点懵。寄存器配置、时钟树、GPIO模式这些概念扑面而来&#xff0c;光看理论文档很容易失去方向。最近我发现用InsCode(快马)平台生成带详细注释的基础项目特别适合入门&#xff0c;今天就以最经典的LED流水灯为例&am…...

【故障】解决ssh连接linux卡着不动的问题

1、原因使用xshell连接一台linux机器&#xff0c;发现连接不上&#xff0c;一直都开在连接这个界面&#xff0c;最后超时才停止。2、排查&#xff08;1&#xff09;首先&#xff0c;检查下防火墙或者selinuxsystem status firewalld #检查服务是否处于非Running的状态getenforc…...

建行江门市分行:银发关爱在行动 暖心服务送到家

服务无边界。近日&#xff0c;建行广东江门分行辖内多家网点接连上演暖心一幕&#xff0c;员工们主动跨出柜台&#xff0c;将金融服务送到客户家中、病房前&#xff0c;用一次次“特事特办”的上门服务&#xff0c;化解客户的“燃眉之急”&#xff0c;生动诠释了“以客户为中心…...

【开题答辩全过程】以 基于大数据的智能推送系统设计与实现为例,包含答辩的问题和答案

个人简介一名14年经验的资深毕设内行人&#xff0c;语言擅长Java、php、微信小程序、Python、Golang、安卓Android等开发项目包括大数据、深度学习、网站、小程序、安卓、算法。平常会做一些项目定制化开发、代码讲解、答辩教学、文档编写、也懂一些降重方面的技巧。感谢大家的…...

CF1335E2 Three Blocks Palindrome (hard version)

本题解也可通过CF1335E1 Three Blocks Palindrome (easy version)。做法&#xff1a;值域很小。只有200&#xff0c;考虑从这里入手。我们设q[i][j]表示数i第j次出现的位置&#xff0c;sum[i][j]表示种类i在1到j范围内出现过多少次。枚举 a,b 具体的值&#xff0c;枚举 x&#…...

EasyAnimateV5-7b-zh-InP多GPU分布式训练指南

EasyAnimateV5-7b-zh-InP多GPU分布式训练指南 1. 引言 如果你正在训练EasyAnimateV5这样的大模型&#xff0c;可能会发现单块GPU的训练速度实在太慢了。一张图片可能需要几分钟&#xff0c;一个完整的训练周期可能要花上好几天。这时候&#xff0c;多GPU分布式训练就成了必备…...

vLLM-v0.17.1应用场景:跨境电商多语言商品描述生成系统

vLLM-v0.17.1应用场景&#xff1a;跨境电商多语言商品描述生成系统 1. 跨境电商面临的商品描述挑战 跨境电商企业每天需要为成千上万的商品生成多语言描述&#xff0c;传统人工编写方式面临三大痛点&#xff1a; 人力成本高&#xff1a;每个语种都需要专业翻译人员&#xff…...