FLINK SQL时间属性
Flink三种时间属性简介
在Flink SQL中,时间属性是一个核心概念,它主要用于处理与时间相关的数据流。Flink支持三种时间属性:事件时间(event time)、处理时间(processing time)和摄入时间(ingestion time)。以下是对这三种时间属性的详细解释:
一、事件时间(Event Time)
- 定义:事件时间指的是数据本身携带的时间,即数据在产生时的时间戳。
- 特点:
- 反映了数据实际发生的时间。
- 需要从数据中提取时间戳,并可能需要生成watermark来处理乱序数据。
- 在Flink SQL触发计算时,使用数据本身携带的时间。
- 应用场景:适用于需要基于数据实际发生时间进行计算的场景,如实时日志分析、交易系统等。
二、处理时间(Processing Time)
- 定义:处理时间指的是具体算子计算数据执行时的机器时间,即在Flink集群中处理数据的节点所在机器的本地时间。
- 特点:
- 是最简单的一种时间概念,不需要从数据里获取时间,也不需要生成watermark。
- 提供了较低的时间精度和确定性,因为不同节点的处理时间可能不同。
- 应用场景:适用于对时间精度要求不高,或者数据不需要基于事件时间进行处理的场景。
- 定义方式:
- 在DataStream转换时直接指定。
- 在定义Table Schema时指定,使用.proctime后缀。
- 在创建表的DDL中指定,使用PROCTIME()函数。
三、摄入时间(Ingestion Time)
- 定义:摄入时间指的是数据从数据源进入Flink的时间。
- 特点:
- 反映了数据被Flink系统接收的时间。
- 适用于数据源与Flink集群之间存在较大时间差的场景。
- 应用场景:在Flink SQL中,摄入时间的使用相对较少,因为大多数场景更关注数据实际发生的时间(事件时间)或处理时间。
四、时间属性的应用
在Flink SQL中,时间属性主要用于时间窗口的计算、自定义时间语义的计算等。通过定义时间属性,可以方便地实现基于时间的聚合、过滤、连接等操作。
注意事项
- 在一个Flink任务中,通常只会选择一个时间属性作为全局时间属性。
- 时间属性的定义方式取决于具体的应用场景和需求。
- 在使用事件时间时,需要注意处理乱序数据的问题,并合理设置watermark的生成策略。
Flink三种时间属性应用场景
一、事件时间(Event Time)应用场景:
- 实时日志分析:在实时日志分析中,通常使用事件时间作为分析的基础。例如,需要统计某个时间段内的日志数量或类型,使用事件时间可以确保统计结果基于日志实际发生的时间。
- 交易系统:在交易系统中,事件时间用于处理交易数据的实时分析。例如,计算某支股票在特定时间段内的价格波动,需要确保时间戳与交易发生的时间一致。
- 实时推荐系统:在实时推荐系统中,用户行为数据的时间戳是事件时间。通过基于事件时间的分析,可以了解用户在不同时间段的行为模式,从而提供更加个性化的推荐。
二、处理时间(Processing Time)应用场景:
- 非实时数据分析:对于不需要严格基于事件时间进行分析的场景,可以使用处理时间。例如,进行批处理任务时,不关心数据实际发生的时间,只关注任务开始和结束的时间。
- 本地开发和测试:在本地开发和测试环境中,由于无法模拟真实的事件时间,可以使用处理时间进行简化处理。
三、摄入时间(Ingestion Time)应用场景:
- 数据源与Flink集群时间差较大:当数据源与Flink集群之间存在较大的时间差时,使用摄入时间可以确保数据在Flink集群中处理的一致性。然而,在实际应用中,摄入时间的使用相对较少,因为大多数场景更关注数据实际发生的时间(事件时间)或处理时间。
SQL指定时间属性两种方式
在Flink SQL中,指定时间属性主要有两种方式,以下是对这两种方式的详细解释:
一、在创建表的DDL中指定时间属性
- 事件时间(Event Time):
- 在创建表的DDL语句中,可以通过增加一个时间戳字段并使用WATERMARK语句来定义事件时间属性。
- 事件时间列的字段类型必须是TIMESTAMP或TIMESTAMP_LTZ类型。
- WATERMARK语句用于生成水印(watermark),以处理乱序数据。水印是一个延迟阈值,表示在该时间戳之前的所有数据都已经到达。
示例代码:
CREATE TABLE user_actions ( user_name STRING, data STRING, user_action_time TIMESTAMP(3), WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
) WITH (...);
在这个例子中,user_action_time被声明为事件时间,并且设置了5秒的水印延迟。
2. 处理时间(Processing Time):
- 在创建表的DDL语句中,可以通过增加一个字段并使用PROCTIME()函数来定义处理时间属性。
- PROCTIME()函数是Flink SQL内置的函数,用于获取当前处理时间。
示例代码:
CREATE TABLE EventTable ( user STRING, url STRING, ts AS PROCTIME()
) WITH (...);
在这个例子中,ts字段被定义为处理时间属性。
二、在DataStream转换时指定时间属性
- 事件时间(Event Time):
- 在DataStream API中,可以通过assignTimestampsAndWatermarks方法来为数据流分配时间戳和水印。
- 这种方法通常用于从外部数据源(如Kafka)读取数据时,为数据分配事件时间。
示例代码(伪代码):
DataStream<MyEvent> stream = env.addSource(new FlinkKafkaConsumer<MyEvent>(...)) .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<MyEvent>(Time.seconds(5)) { @Override public long extractTimestamp(MyEvent event) { return event.getTimestamp(); // 从事件中提取时间戳 } });
在这个例子中,使用BoundedOutOfOrdernessTimestampExtractor为数据流分配了事件时间,并设置了5秒的最大乱序时间。
2. 处理时间(Processing Time):
- 在DataStream API中,处理时间是默认的时间属性,不需要显式指定。
- 但是,如果需要在后续操作中引用处理时间,可以通过在Table API中使用.proctime后缀来访问。
示例代码(伪代码):
Table table = tableEnv.fromDataStream(stream, "user, temperature, timestamp, pt.proctime as processingTime");
在这个例子中,pt.proctime被用作处理时间属性,并在Table API中进行了访问。
需要注意的是,在实际应用中,选择哪种方式指定时间属性取决于具体的应用场景和需求。在Flink SQL中,通常更倾向于在创建表的DDL中指定时间属性,因为这样可以更直观地定义表的模式结构(schema),并且方便后续的时间相关操作。而在DataStream API中指定时间属性则更灵活,适用于需要从外部数据源读取数据并为其分配时间戳的场景。
SQL事件时间案例
以下是一个关于Flink SQL事件时间的案例,用于展示如何在Flink SQL中使用事件时间属性进行窗口聚合操作。
案例背景
假设有一个数据流,其中包含了用户的点击事件。每个事件都有一个事件时间戳,表示用户点击的时间。任务是计算每个用户在每10分钟窗口内的点击次数。
步骤一:创建数据源表
首先,需要创建一个数据源表,并声明事件时间属性。在这个例子中,假设数据源是一个Kafka主题,并且事件时间戳存储在名为eventTime的字段中。
CREATE TABLE clicks ( userId STRING, eventTime TIMESTAMP(3), -- 事件时间戳 WATERMARK FOR eventTime AS eventTime - INTERVAL '5' SECOND -- 声明水印,用于处理乱序数据
) WITH ( 'connector' = 'kafka', 'topic' = 'clicks_topic', 'properties.bootstrap.servers' = 'localhost:9092', 'format' = 'json', 'scan.startup.mode' = 'earliest-offset'
);
步骤二:使用窗口聚合操作
接下来,可以使用Flink SQL的窗口聚合操作来计算每个用户在每10分钟窗口内的点击次数。在这个例子中,将使用滚动窗口(TUMBLE)进行聚合。
SELECT userId, TUMBLE_START(eventTime, INTERVAL '10' MINUTE) AS windowStart, -- 窗口开始时间 TUMBLE_END(eventTime, INTERVAL '10' MINUTE) AS windowEnd, -- 窗口结束时间 COUNT(*) AS clickCount -- 点击次数
FROM clicks
GROUP BY userId, TUMBLE(eventTime, INTERVAL '10' MINUTE);
解释
- 创建数据源表:在创建表时,指定了eventTime字段为事件时间属性,并设置了5秒的水印延迟。这意味着Flink将等待最多5秒以处理可能到达的乱序数据。
- 窗口聚合操作:使用TUMBLE函数定义了一个滚动窗口,窗口大小为10分钟。然后,按用户ID和窗口进行分组,并计算每个分组中的点击次数。
- 结果:查询结果将包含用户ID、窗口开始时间、窗口结束时间和点击次数。
注意事项
- 水印:在处理事件时间时,水印是非常重要的。它们允许Flink处理乱序数据,并确保在窗口聚合时不会遗漏任何数据。
- 时间属性类型:事件时间列的字段类型必须是TIMESTAMP或TIMESTAMP_LTZ类型。如果数据源中的时间戳是BIGINT类型(表示毫秒或秒),则需要在创建表时将其转换为TIMESTAMP类型。
- 窗口类型:Flink SQL支持多种类型的窗口,如滚动窗口(TUMBLE)、滑动窗口(SLIDE)和会话窗口(SESSION)等。根据具体需求选择合适的窗口类型。
SQL处理时间案例
在Flink SQL中,处理时间(Processing Time)是指数据被具体算子处理时的系统时间。以下是一个基于处理时间的Flink SQL案例,用于展示如何使用处理时间属性进行窗口聚合操作。
案例背景
假设有一个数据流,其中包含了传感器读取的数据。每个数据都有一个读取时间戳,但这个时间戳不是事件发生时的时间,而是数据被读取到系统的时间。任务是计算每5分钟内读取的数据量。
步骤一:创建数据源表
首先,需要创建一个数据源表,并声明处理时间属性(在Flink SQL中,处理时间属性是隐式的,不需要显式声明,但可以通过特定的函数来引用)。在这个例子中,假设数据源是一个Socket流。
-- 假设有一个Socket数据源,数据格式为:id,value,timestamp(这里的timestamp是读取时间戳)
CREATE TABLE sensor_data ( id STRING, value DOUBLE, timestamp BIGINT -- 读取时间戳,单位为毫秒
) WITH ( 'connector' = 'socket', 'hostname' = 'localhost', 'port' = '9999', 'format' = 'csv'
);
步骤二:转换时间戳并创建处理时间窗口
由于Flink SQL中的处理时间属性是隐式的,不能直接对其进行操作。但是,可以通过将读取时间戳转换为TIMESTAMP类型(尽管这不是必要的,因为处理时间窗口不需要显式的时间戳字段),然后使用Flink SQL提供的窗口函数来创建处理时间窗口。不过,在这个例子中,将直接使用处理时间窗口函数,而不进行显式的转换。
-- 使用处理时间滚动窗口计算每5分钟内的数据量
SELECT TUMBLE_START(PROCTIME()) AS window_start, -- 窗口开始时间(处理时间) TUMBLE_END(PROCTIME()) AS window_end, -- 窗口结束时间(处理时间) COUNT(*) AS data_count -- 数据量
FROM sensor_data
GROUP BY TUMBLE(PROCTIME(), INTERVAL '5' MINUTE); -- 使用处理时间滚动窗口,窗口大小为5分钟
解释
- 创建数据源表:创建了一个名为sensor_data的数据源表,它接收来自Socket流的数据。数据包含id、value和timestamp字段,其中timestamp是数据被读取到系统的时间戳(以毫秒为单位)。
- 转换时间戳并创建窗口:在这个例子中,实际上没有显式地将timestamp字段转换为TIMESTAMP类型,因为处理时间窗口不需要这样做。相反,直接使用了PROCTIME()函数来获取处理时间,并使用TUMBLE函数创建了一个滚动窗口。窗口大小为5分钟,意味着每5分钟将计算一次窗口内的数据量。
- 结果:查询结果将包含窗口开始时间、窗口结束时间和窗口内的数据量。
注意事项
- 处理时间属性:在Flink SQL中,处理时间属性是隐式的,不需要显式声明。它表示数据被具体算子处理时的系统时间。
- 窗口函数:Flink SQL提供了多种窗口函数,如TUMBLE(滚动窗口)、SLIDE(滑动窗口)和SESSION(会话窗口)等。根据具体需求选择合适的窗口函数。
- 数据源:在这个例子中,使用了Socket作为数据源。在实际应用中,数据源可能是Kafka、文件系统或其他数据源。
相关文章:
FLINK SQL时间属性
Flink三种时间属性简介 在Flink SQL中,时间属性是一个核心概念,它主要用于处理与时间相关的数据流。Flink支持三种时间属性:事件时间(event time)、处理时间(processing time)和摄入时间&#…...
android——Groovy gralde 脚本迁移到DSL
1、implementation的转换 implementation com.github.CymChad:BaseRecyclerViewAdapterHelper:*** 转换为 implementation ("com.github.CymChad:BaseRecyclerViewAdapterHelper:***") 2、plugin的转换 apply plugin: kotlin-android-extensions 转换为&#x…...
工程项目管理中的最常见概念!蓝燕云总结!
01 怎么理解工程项目管理? 建设工程项目管理指的是专业性的管理,并非行政事务管理。建设工程项目管理是对工程项目全生命周期的管理,确保项目能够按计划的进度、成本和质量完成。 建设工程项目不同阶段管理的主要内容不同,通常…...
PostgreSQL AUTO INCREMENT
PostgreSQL AUTO INCREMENT 在数据库管理系统中,自动递增(AUTO INCREMENT)是一种常见特性,用于在插入新记录时自动生成唯一的标识符。PostgreSQL,作为一个功能强大的开源关系数据库管理系统,也提供了类似的…...

24-10-13-读书笔记(二十五)-《一只特立独行的猪》([中] 王小波)用一生来学习艺术
文章目录 《一只特立独行的猪》([中] 王小波)目录阅读笔记记录总结 《一只特立独行的猪》([中] 王小波) 十月第五篇,放慢脚步,秋季快要过去了,要步入冬季了,心中也是有些跌宕起伏&am…...

Java—继承性与多态性
目录 一、this关键字 1. 理解this 2. this练习 二、继承性 2.1 继承性的理解 2.1.1 多层继承 2.2 继承性的使用练习 2.2.1 练习1 2.2.2 练习2 2.3 方法的重写 2.4 super关键字 2.4.1 子类对象实例化 三、多态性 3.1 多态性的理解 3.2 向下转型与多态练习 四、Ob…...

打通华为认证实验考试“痛点”:备考指南全解析
华为认证体系中的实验考试环节,尤其是针对高端的HCIE认证,是评估考生实践技能的关键部分。这一环节的核心目标是检验考生对华为设备和解决方案的操作熟练度、技术实施技能以及面对现实工作挑战时的问题解决能力。通过在真实环境中进行的实践操作…...
【软考】子系统划分
目录 1. 子系统划分的原则1.1 子系统要具有相对独立性1.2 子系统之间数据的依赖性尽量小1.3 子系统划分的结果应使数据几余较小1.4 子系统的设置应考虑今后管理发展的需要1.5 子系统的划分应便于系统分阶段实现1.6 子系统的划分应考虑到各类资源的充分利用 2. 子系统结构设计3.…...

【Python】selenium获取鼠标在网页上的位置,并定位到网页位置模拟点击的方法
在使用Selenium写自动化爬虫时,遇到验证码是常事了。我在写爬取测试的时候,遇到了点击型的验证码,例如下图这种: 这种看似很简单,但是它居然卡爬虫?用简单的点触验证码的方法来做也没法实现 平常的点触的方…...
【C++ 真题】B2078 含 k 个 3 的数
含 k 个 3 的数 题目描述 输入两个正整数 m m m 和 k k k,其中 1 < m ≤ 1 0 15 1 \lt m \leq 10^{15} 1<m≤1015, 1 < k ≤ 15 1 \lt k \leq 15 1<k≤15 ,判断 m m m 是否恰好含有 k k k 个 3 3 3,如果满足条…...
蓝桥杯省赛真题——冶炼金属
问题描述 小蓝有一个神奇的炉子用于将普通金属 O 冶炼成为一种特殊金属 X。这个炉子有一个称作转换率的属性 V,V 是一个正整数,这意味着消耗 V 个普通金属 O 恰好可以冶炼出一个特殊金属 X,当普通金属 O 的数目不足 V 时,无法继续…...

【Mac苹果电脑安装】DBeaverEE for Mac 数据库管理工具软件教程【保姆级教程】
Mac分享吧 文章目录 DBeaverEE 数据库管理工具 软件安装完成,打开效果图片Mac电脑 DBeaverEE 数据库管理工具 软件安装——v24.21️⃣:下载软件2️⃣:安装JDK,根据下图操作步骤提示完成安装3️⃣:安装DBeaverEE&#…...
数据仓库中的维度建模:深入理解与案例分析
数据仓库中的维度建模:深入理解与案例分析 维度建模是数据仓库设计中最常用的一种方法,旨在简化数据访问、提高查询效率,特别适用于需要对数据进行多维分析的场景。本文将深入探讨维度建模的核心概念、设计步骤以及如何将其应用于实际案例中…...

前端打印功能(vue +springboot)
后端 后端依赖生成pdf的方法pdf转图片使用(用的打印模版是带参数的 ,参数是aaa)总结 前端页面 效果 后端 依赖 依赖 一个是用模版生成对应的pdf,一个是用来将pdf转成图片需要的 <!--打印的--><dependency><groupId>net.sf.jasperreports</groupId>&l…...

中间件有哪些分类?
中间件的分类 中间件是位于操作系统和应用程序之间的软件,它提供了一系列服务来简化分布式系统中的应用程序开发和集成。中间件可以根据其功能和用途被分为不同的类别。以下是中间件的一些主要分类: 1. 通信处理(消息)中间件&am…...

开始新征程__10.13
好久没有更新 csdn 了,身边的人都说 csdn 水,但是在我看来,它在我大一这一年里对我的帮助很大,最近上账号看看,看见了网友评论,哈哈,决定以后还是继续更新,分享自己的学习心得。...
SAP 联合创始人谈Home Office
软件公司 SAP 的家庭办公室规定继续引发激烈争论,其联合创始人哈索-普拉特纳(Hasso Plattner)对此也有明确看法。 沃尔多夫--年初,SAP 首席执行官克里斯蒂安-克莱因(Christian Klein)向员工宣誓 "努力…...

基于Jenkins+K8S构建DevOps自动化运维管理平台
目录 1.k8s助力DevOps在企业落地实践 1.1 传统方式部署项目为什么发布慢,效率低? 1.2 上线一个功能,有多少时间被浪费了? 1.3 如何解决发布慢,效率低的问题呢? 1.5 什么是DevOps? 1.5.1 敏…...
【OpenCV】(一)—— 安装opencv环境
【OpenCV】(一)—— 安装opencv环境 OpenCV(Open Source Computer Vision Library)是一个开源的计算机视觉和机器学习软件库。OpenCV 是用 C 编写的,但它也有 Python、Java 和 MATLAB 接口,并支持 Windows…...
MybatisPlus操作符和运算值
好久没有更新了,这次更新一个当前端需要对运算符和运算值都需要前端传递给后端,动态拼接运算条件时的处理方法。 1、踩雷 查询年龄 >20,其中>前端下拉框选择,20值前端下拉框选择 1)用户表: CREAT…...

python打卡day49
知识点回顾: 通道注意力模块复习空间注意力模块CBAM的定义 作业:尝试对今天的模型检查参数数目,并用tensorboard查看训练过程 import torch import torch.nn as nn# 定义通道注意力 class ChannelAttention(nn.Module):def __init__(self,…...

黑马Mybatis
Mybatis 表现层:页面展示 业务层:逻辑处理 持久层:持久数据化保存 在这里插入图片描述 Mybatis快速入门 
【网络安全产品大调研系列】2. 体验漏洞扫描
前言 2023 年漏洞扫描服务市场规模预计为 3.06(十亿美元)。漏洞扫描服务市场行业预计将从 2024 年的 3.48(十亿美元)增长到 2032 年的 9.54(十亿美元)。预测期内漏洞扫描服务市场 CAGR(增长率&…...

Python爬虫(一):爬虫伪装
一、网站防爬机制概述 在当今互联网环境中,具有一定规模或盈利性质的网站几乎都实施了各种防爬措施。这些措施主要分为两大类: 身份验证机制:直接将未经授权的爬虫阻挡在外反爬技术体系:通过各种技术手段增加爬虫获取数据的难度…...
Axios请求超时重发机制
Axios 超时重新请求实现方案 在 Axios 中实现超时重新请求可以通过以下几种方式: 1. 使用拦截器实现自动重试 import axios from axios;// 创建axios实例 const instance axios.create();// 设置超时时间 instance.defaults.timeout 5000;// 最大重试次数 cons…...

ABAP设计模式之---“简单设计原则(Simple Design)”
“Simple Design”(简单设计)是软件开发中的一个重要理念,倡导以最简单的方式实现软件功能,以确保代码清晰易懂、易维护,并在项目需求变化时能够快速适应。 其核心目标是避免复杂和过度设计,遵循“让事情保…...
Python 包管理器 uv 介绍
Python 包管理器 uv 全面介绍 uv 是由 Astral(热门工具 Ruff 的开发者)推出的下一代高性能 Python 包管理器和构建工具,用 Rust 编写。它旨在解决传统工具(如 pip、virtualenv、pip-tools)的性能瓶颈,同时…...

在Mathematica中实现Newton-Raphson迭代的收敛时间算法(一般三次多项式)
考察一般的三次多项式,以r为参数: p[z_, r_] : z^3 (r - 1) z - r; roots[r_] : z /. Solve[p[z, r] 0, z]; 此多项式的根为: 尽管看起来这个多项式是特殊的,其实一般的三次多项式都是可以通过线性变换化为这个形式…...
Qt 事件处理中 return 的深入解析
Qt 事件处理中 return 的深入解析 在 Qt 事件处理中,return 语句的使用是另一个关键概念,它与 event->accept()/event->ignore() 密切相关但作用不同。让我们详细分析一下它们之间的关系和工作原理。 核心区别:不同层级的事件处理 方…...

一些实用的chrome扩展0x01
简介 浏览器扩展程序有助于自动化任务、查找隐藏的漏洞、隐藏自身痕迹。以下列出了一些必备扩展程序,无论是测试应用程序、搜寻漏洞还是收集情报,它们都能提升工作流程。 FoxyProxy 代理管理工具,此扩展简化了使用代理(如 Burp…...