当前位置: 首页 > news >正文

Flink Watermark和时间语义

Flink 中的时间语义

[点击并拖拽以移动] ​

时间语义: EventTime:事件创建时间;Ingestion Time:数据进入Flink的时间;Processing Time:执行操作算子的本地系统时间,与机器无关。不同的时间语义有不同的应用场合,我们往往更关系事件时间Event Time。数据生成的时候就会自动注入时间戳,Event Time可以从日志数据的时间戳timestamp)中提取。

设置 Event Time

我们可以直接在代码中,对执行环境调用setStreamTimeCharacteristic方法,设置流的时间特性。具体的时间,还需要从数据中提取时间戳timestamp

val env = StreamExecutionEnvironment.getExecutionEnvironment
//从调用时刻开始给 env 创建的每一个 stream 追加时间特性
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

乱序数据的影响

[点击并拖拽以移动] ​

FlinkEvent Time模式处理数据流时,它会根据数据里的时间戳来处理基于时间的算子。由于网络、分布式等原因,会导致乱序数据的产生。如上图所示,理想情况与实际情况会存在差异,乱序数据会让窗口计算不准确。解决方案是让窗口等几分钟。

水位线 Watermark

怎么避免乱序数据带来计算不正确?
遇到一个时间戳到达了窗口关闭时间,不应该立刻触发窗口计算,而是等待一段时间,等迟到的数据来了再关闭窗口。Watermark是一种衡量Event Time进展的机制,可以设置延迟触发。Watermark是用于处理乱序事件的,而正确的处理乱序事件,通常用Watermark机制结合window来实现。数据流中的Watermark用于表示timestamp小于Watermark的数据,都已经达到了,因此,window的执行也是由Watermark触发的。Watermark用来让程序自己延迟和结果正确性。

Watermark 的特点: Watermark是一条特殊的数据记录,必须单调递增,以确保任务的事件时间时钟在向前推进,而不是在后退。Watermark与数据的时间戳有关。
[点击并拖拽以移动] ​

watermark 的传递、引入和设定

watermark的传递: 一个Task输入可以并行多个,如下有4个并行度,输出也可能存在多个并行,如下有3个。每个任务Task内部都有一个事件时钟,且每个分区也维护了对应的WM,如下的Partition WM。当事件流流进Partition时会判断新事件流的WM是否大于当前的Partition WM,当大于时就更新Partition的时间戳WM为新流入的WM(取最大值),如下1->2象限Partition WM的变化。同时,如下Task也维护了一个全局的WM表示事件时钟,该值取分区中最小的WM作为输出的时间戳,如下第二象限的输出选择最小的WM=3向下传递。当第二个(横线)分区Partition WM流进来WM=7的事件流时,就会出现第三象限的情景,但是最小的WM还是=3,因此不更新Task全局的WM。当第三个分区Partition WM流进来WM=6的事件流时,就会出现第四象限的情景,此时分区Partition WM的最小值=4,因此Task全局WM=4
[点击并拖拽以移动] ​

watermark的引入: Event Time的使用一定要指定数据源中的时间戳。对于排好序的数据,只需要指定时间戳就够了,不需要延迟触发。

import org.apache.flink.streaming.api.windowing.time.Time
//同时分配时间戳和水位线
dataStream.assignTimestampsAndWatermarks(
//无序数据       Time.milliseconds(1000)=延迟时间
new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.milliseconds(1000)) {//提取事件戳 = timestamp * 1000是因为出入的毫秒override def extractTimestamp(t: SensorReading): Long = {t.timestamp * 1000}
})

【1】对于排好序的数据,不需要延迟触发,可以只指定事件戳就行了

dataStream.assignTimestampsAndWatermarks(_.timestamp * 1000)

【2】Flink暴露了TimestampAssigner接口供我们实现,使我们可以自定义如何从事件数据中抽取时间戳和生成 watermarkMyAssigner可以有两种类型,都继承自TimestampAssigner

dataStream.assignTimestampsAndWatermarks(new MyAssigner())

TimestampAssigner:定义了抽取时间戳,以及生成watermark的方法,有两种类型:
【1】AssignerWithPeriodicWatermarks 系统会周期性的将Watermark插入到流中。默认周期是200毫秒(如果是processingTimeWatermark = 0 ),可以使用ExecutionConfig.setAutoWatermarkInterval()方法进行设置。升序和前面乱序的处理BoundedOutOfOrderness,都是基于周期性watermark的。举例:如下产生watermark的逻辑:每隔5秒,Flink调用AssignerWithPeriodicWatermarksgetCurrentWatermark()方法。如果方法返回一个时间戳大于之前水位的时间戳,新的water会被插入到流中。这个检查保证了水位线是单调递增的。如果方法返回的时间戳小于之前水位的时间戳,则不会产生新的watermark

//方案一:
//EventTime是以数据自带的时间戳字段为准,应用程序需要指定如何从record中抽取时间戳字段
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
//每隔 5秒产生一个 watermark
env.getConfig.setAutoWatermarkInterval(5000);//方案二
//自定义一个周期性的时间戳
class PeriodicAssigner extends AssignerWithPeriodicWatermarks[SensorReading]{val bound: Long = 60 * 1000 //延时为 1 分钟var maxTs: Long = Long.MinValue //观察到的最大时间戳//生成水位线override def getCurrentWatermark: Watermark = {new Watermark(maxTs - bound)}//抽取时间戳的方法override def extractTimestamp(t: SensorReading, l: Long): Long = {maxTs = maxTs.max(t.timestamp)t.timestamp}
}

【2】AssignerWithPunctuatedWatermarks 没有时间周期规律,可打断的生成watermark

class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks[SensorReading]{val bound: Long = 60 * 1000//获取水位线,根据数据触发override def checkAndGetNextWatermark(t: SensorReading, l: Long): Watermark = {if(t.id == "sensor_1"){new Watermark(l - bound)}else{null}}//抽取时间戳的方法override def extractTimestamp(t: SensorReading, l: Long): Long = {t.timestamp}
}

watermark 的设定:
【1】在Flink中,watermark由应用程序开发人员生成,这通常需要对相应的领域有一定的了解。
【2】如果watermark设置的延迟太久,收到结果的速度可能就会很慢,解决办法是在水位线到达之前输出一个近似结果。
【3】而如果watermark到达得太早,则可能收到错误结果,不过Flink处理迟到数据的机制可以解决这个问题。

相关文章:

Flink Watermark和时间语义

Flink 中的时间语义 时间语义: EventTime:事件创建时间;Ingestion Time:数据进入Flink的时间;Processing Time:执行操作算子的本地系统时间,与机器无关。不同的时间语义有不同的应用场合&#x…...

HarmonyOS UI框架简介

HarmonyOS UI框架介绍 HarmonyOSUI框架是一个用于构建跨设备应用的开发框架,它属于HarmonyOS系统架构的上层框架。该框架通过提供一系列的开发模型、声明式UI范式、系统API等,帮助开发者更高效地构建用户界面。 在HarmonyOSUI框架中,开发语…...

编程羔手解决Maven引入多个版本的依赖包,导致包冲突了

最近升级了些依赖发现有个hutool的方法老报错,java.lang.NoSuchMethodError: cn.hutool.core.util.ObjectUtil.defaultIfNull(Ljava/lang/Object;Ljava/util/function/Supplier;) 在 Maven 项目中,当不同的依赖模块引入 Hutool 的不同版本时&#xff0c…...

C#,入门教程(08)——基本数据类型及使用的基础知识

上一篇: C#,入门教程(07)——软件项目的源文件与目录结构https://blog.csdn.net/beijinghorn/article/details/124139947 数据类型用于指定数据体(DataEntity,包括但不限于类或结构体的属性、变量、常量、函数返回值)…...

分类预测 | Matlab实现DBO-SVM蜣螂算法优化支持向量机多特征分类预测

分类预测 | Matlab实现DBO-SVM蜣螂算法优化支持向量机多特征分类预测 目录 分类预测 | Matlab实现DBO-SVM蜣螂算法优化支持向量机多特征分类预测分类效果基本描述程序设计参考资料 分类效果 基本描述 1.Matlab实现DBO-SVM蜣螂算法优化支持向量机多特征分类预测(完整…...

计算机二级Python选择题考点——公共基础部分

计算机完成一条指令所花费的时间称为一个指令周期。(指令周期越短,指令执行就越快)顺序程序不具有并发性。(具有顺序性、封闭性和可再现性)结构化程序设计强调程序的易读性。系统软件:操作系统、编译程序、数据库管理系统 应用软件:杀毒软件在…...

《微机原理与应用》期末考试题库(附答案解析)

第1章 微型计算机概述 1.微型计算机的硬件系统包括___A _____。 A.控制器、运算器、存储器和输入输出设备 B.控制器、主机、键盘和显示器 C.主机、电源、CPU和输入输出 D.CPU、键盘、显示器和打印机 2.微处…...

如何在Android Glide中结合使用CenterCrop和自定义圆角变换(图片部分圆角矩形)

如何在Android Glide中结合使用CenterCrop和自定义圆角变换(图片部分圆角矩形) 在Android开发中,使用Glide加载图片时,我们经常需要对图片进行特定的处理,比如裁剪和圆角变换,特别是一些设计稿,…...

华为机考-手拍球游戏

【手拍手计算次数和总数】游戏规则:左手和右手拍球初始数为0,首先左手第一次拍球数1下,右手拍球1下,接下来左手在拍球时是上一次左手上一次右手的总和,右手也是上一次左手上一次右手拍球的总和,最后拍球总数…...

【线上问题】两台服务器的时间不一致导致jwt解析错误

目录 一、问题描述二、解决方法 一、问题描述 1.线上生产问题,本地和测试环境均无问题 2.本地和测试由于网关和登录服务均在同一台机器 3.线上的登录服务和网关部署不在一起,登录服务的时间正常,网关服务的服务器时间比实际快5秒 4.登录服务j…...

58.网游逆向分析与插件开发-游戏增加自动化助手接口-游戏菜单文字资源读取的逆向分析

内容来源于:易道云信息技术研究院VIP课 之前的内容:接管游戏的自动药水设定功能-CSDN博客 码云地址(master分支):https://gitee.com/dye_your_fingers/sro_-ex.git 码云版本号:34b9c1d43b512d0b4a3c395b…...

Vue-2、初识Vue

1、helloword小案列 代码 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>初始Vue</title><!--引入vue--><script type"text/javascript" src"https://cdn.jsdelivr.n…...

机器学习项目标记图像数据 - 安装LabelImg及功能介绍

什么是LabelImg&#xff1f; LabelImg 是一款流行的图像标注工具&#xff0c;主要用于计算机视觉领域。它允许用户为机器学习项目标记图像数据&#xff0c;特别是用于训练目标检测模型。 如何安装LabelImg pip install PyQt5 pip install pyqt5-tools pip install lxml pip …...

12.15 log 122.买卖股票的最佳时机 II,55. 跳跃游戏

122.买卖股票的最佳时机 II class Solution { public:int maxProfit(vector<int>& prices) {int result0;for(int i0;i<prices.size();i){if(i>0&&prices[i]-prices[i-1]>0){resultprices[i]-prices[i-1];}}return result;} }; 这道题贪心贪的时每…...

Redis - 挖矿病毒 db0 库 backup 反复出现解决方案

问题描述 腾讯云的服务器&#xff0c;使用 Docker 部署了 Redis 之后&#xff0c;发现 DB0 中总是出现 4 条 key&#xff0c;分别是 backup01backup02backup03backup04 而自己每次存入 db0 中的数据过一会就会被无缘无故删除掉。 原因分析 挖矿病毒 解决方案 在启动的时候…...

LiveGBS流媒体平台GB/T28181常见问题-国标编号是什么设备编号和通道国标编号标记唯一的摄像头|视频|镜头通道

LiveGBS国标GB28181中国标编号是什么设备编号和通道国标编号标记唯一的摄像头|视频|镜头通道 1、什么是国标编号&#xff1f;2、国标设备ID和通道ID3、ID 统一编码规则4、搭建GB28181视频直播平台 1、什么是国标编号&#xff1f; 国标GB28181对接过程中&#xff0c;可能有的小…...

Unity ShaderGraph 技能冷却转圈效果

Unity ShaderGraph 技能冷却转圈效果 前言项目场景布置代码编写ShaderGraph 连线总结 参考 前言 遇到一个需求&#xff0c;要展示技能冷却的圆形遮罩效果。 项目 场景布置 代码编写 Shader核心的就两句 // 将uv坐标系的原点移到纹理中心 float2 uv i.uv - float2(0.5, 0…...

C++上位软件通过Snap7开源库访问西门子S7-1200/S7-1500数据块的方法

前言 本人一直从事C上位软件开发工作较多&#xff0c;在之前的项目中通过C访问西门子PLC S7-200/S7-1200/S7-1500并进行数据交互的应用中一直使用的是ModbusTCP/ModbusRTU协议进行。Modbus上位开源库采用的LibModbus。经过实际应用发现Modbus开源库单次发送和接受的数据不能超过…...

如何正确安装Axure插件?详细步骤分享

产品经理在使用Axure导出html文件时&#xff0c;如果选择“完成后打开浏览器”&#xff0c;浏览器往往无法识别。此时&#xff0c;我们需要使用Axure官方谷歌浏览器插件直接访问浏览器中的本地html项目&#xff0c;否则我们需要上传到AxureCloud或使用软件本身的预览功能。接下…...

[SwiftUI]工程最低适配iOS13

问题&#xff1a; 新建工程&#xff0c;选择最低支持iOS13报错&#xff1a; main() is only available in iOS 14.0 or newer Scene is only available in iOS 14.0 or newer WindowGroup is only available in iOS 14.0 or newer 解决&#xff1a; 注释掉上面代码&#x…...

1999-2025.4汽车之家、懂车帝汽车配置信息数据库

汽车配置信息数据是连接汽车生产、销售、使用及后市场服务的核心纽带&#xff0c;对不同主体均具有不可替代的价值。对消费者可辅助决策&#xff0c;规避风险&#xff0c;对车企可指导研发&#xff0c;优化生产&#xff0c;对经销商可精准销售&#xff0c;提升转化&#xff0c;…...

别光重启!Ping域名失败但nslookup能通?一个注册表键值引发的血案(附排查脚本)

当Ping域名失败但nslookup正常&#xff1a;深入解析Windows注册表键值缺失的连锁反应 那天凌晨三点&#xff0c;运维工程师李明在机房盯着屏幕&#xff0c;额头渗出细密的汗珠。客户的核心业务系统刚刚完成迁移&#xff0c;却在最后验收阶段出现诡异现象——所有服务器都能通过…...

GPEN图像修复新手入门:界面介绍与功能详解

GPEN图像修复新手入门&#xff1a;界面介绍与功能详解 1. 认识GPEN图像修复工具 你是否遇到过这样的情况&#xff1a;翻出老照片想分享给亲友&#xff0c;却发现照片已经泛黄、模糊甚至出现划痕&#xff1f;GPEN图像修复工具就是为解决这些问题而生的专业解决方案。这个由科哥…...

保姆级教程:用LayoutLMv3和CDLA数据集搞定文档版面分析(附完整代码)

从零构建文档智能分析系统&#xff1a;基于LayoutLMv3与CDLA的实战指南 当一份复杂的合同或报告需要快速解析时&#xff0c;传统OCR技术往往只能提供杂乱无章的文本碎片。而现代文档智能系统已经能够理解文档的逻辑结构——自动识别标题、段落、表格的位置关系&#xff0c;就像…...

BMN31K522 UART雾化控制协议深度解析与跨平台移植

1. BMN31K522 原子化雾化适配器模块&#xff1a;嵌入式UART控制全解析BMN31K522 是由 Flextron 公司推出的专用原子化雾化适配器模块&#xff0c;面向工业加湿、农业喷雾、实验室气溶胶生成及医疗雾化等场景设计。该模块不直接驱动压电陶瓷或超声换能器&#xff0c;而是作为智能…...

GitHub中文界面插件:3分钟告别英文障碍,专注代码协作

GitHub中文界面插件&#xff1a;3分钟告别英文障碍&#xff0c;专注代码协作 【免费下载链接】github-chinese GitHub 汉化插件&#xff0c;GitHub 中文化界面。 (GitHub Translation To Chinese) 项目地址: https://gitcode.com/gh_mirrors/gi/github-chinese 你是否曾…...

学术专著不用愁!AI专著写作工具,为你打造专属学术大作

一、研究者专著写作困境与AI工具的出现 对于很多研究人员来说&#xff0c;写学术专著时面临的最大难题就是“有限的精力”与“无限的需求”之间的矛盾。专著的写作通常需要花费3到5年甚至更久的时间&#xff0c;但研究者们在日常工作中&#xff0c;除了教学和科研项目外&#…...

UNet全维度改进模型库重磅发布

突破边界&#xff0c;赋能工业质检&#xff1a;UNet全维度改进模型库重磅发布 在工业缺陷检测领域&#xff0c;分割精度与效率的平衡始终是技术落地的核心命题。我们倾力打造**「UNet全维度改进模型库」&#xff0c;以37项原创性结构创新为引擎&#xff0c;深度融合注意力机制…...

告别右键菜单臃肿困境:ContextMenuManager如何实现40%效率提升

告别右键菜单臃肿困境&#xff1a;ContextMenuManager如何实现40%效率提升 【免费下载链接】ContextMenuManager &#x1f5b1;️ 纯粹的Windows右键菜单管理程序 项目地址: https://gitcode.com/gh_mirrors/co/ContextMenuManager 当你右键点击文件时&#xff0c;是否遇…...

若依分离版集成Activiti7:从零构建企业级流程中心

1. 环境准备与版本兼容性检查 在开始整合之前&#xff0c;我们需要先确认几个关键点。若依分离版是基于SpringBoot的前后端分离架构&#xff0c;而Activiti7作为新一代工作流引擎&#xff0c;两者整合最需要注意的就是版本兼容性。我去年在金融项目里就遇到过因为版本不匹配导致…...