当前位置: 首页 > news >正文

Flink Watermark和时间语义

Flink 中的时间语义

[点击并拖拽以移动] ​

时间语义: EventTime:事件创建时间;Ingestion Time:数据进入Flink的时间;Processing Time:执行操作算子的本地系统时间,与机器无关。不同的时间语义有不同的应用场合,我们往往更关系事件时间Event Time。数据生成的时候就会自动注入时间戳,Event Time可以从日志数据的时间戳timestamp)中提取。

设置 Event Time

我们可以直接在代码中,对执行环境调用setStreamTimeCharacteristic方法,设置流的时间特性。具体的时间,还需要从数据中提取时间戳timestamp

val env = StreamExecutionEnvironment.getExecutionEnvironment
//从调用时刻开始给 env 创建的每一个 stream 追加时间特性
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

乱序数据的影响

[点击并拖拽以移动] ​

FlinkEvent Time模式处理数据流时,它会根据数据里的时间戳来处理基于时间的算子。由于网络、分布式等原因,会导致乱序数据的产生。如上图所示,理想情况与实际情况会存在差异,乱序数据会让窗口计算不准确。解决方案是让窗口等几分钟。

水位线 Watermark

怎么避免乱序数据带来计算不正确?
遇到一个时间戳到达了窗口关闭时间,不应该立刻触发窗口计算,而是等待一段时间,等迟到的数据来了再关闭窗口。Watermark是一种衡量Event Time进展的机制,可以设置延迟触发。Watermark是用于处理乱序事件的,而正确的处理乱序事件,通常用Watermark机制结合window来实现。数据流中的Watermark用于表示timestamp小于Watermark的数据,都已经达到了,因此,window的执行也是由Watermark触发的。Watermark用来让程序自己延迟和结果正确性。

Watermark 的特点: Watermark是一条特殊的数据记录,必须单调递增,以确保任务的事件时间时钟在向前推进,而不是在后退。Watermark与数据的时间戳有关。
[点击并拖拽以移动] ​

watermark 的传递、引入和设定

watermark的传递: 一个Task输入可以并行多个,如下有4个并行度,输出也可能存在多个并行,如下有3个。每个任务Task内部都有一个事件时钟,且每个分区也维护了对应的WM,如下的Partition WM。当事件流流进Partition时会判断新事件流的WM是否大于当前的Partition WM,当大于时就更新Partition的时间戳WM为新流入的WM(取最大值),如下1->2象限Partition WM的变化。同时,如下Task也维护了一个全局的WM表示事件时钟,该值取分区中最小的WM作为输出的时间戳,如下第二象限的输出选择最小的WM=3向下传递。当第二个(横线)分区Partition WM流进来WM=7的事件流时,就会出现第三象限的情景,但是最小的WM还是=3,因此不更新Task全局的WM。当第三个分区Partition WM流进来WM=6的事件流时,就会出现第四象限的情景,此时分区Partition WM的最小值=4,因此Task全局WM=4
[点击并拖拽以移动] ​

watermark的引入: Event Time的使用一定要指定数据源中的时间戳。对于排好序的数据,只需要指定时间戳就够了,不需要延迟触发。

import org.apache.flink.streaming.api.windowing.time.Time
//同时分配时间戳和水位线
dataStream.assignTimestampsAndWatermarks(
//无序数据       Time.milliseconds(1000)=延迟时间
new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.milliseconds(1000)) {//提取事件戳 = timestamp * 1000是因为出入的毫秒override def extractTimestamp(t: SensorReading): Long = {t.timestamp * 1000}
})

【1】对于排好序的数据,不需要延迟触发,可以只指定事件戳就行了

dataStream.assignTimestampsAndWatermarks(_.timestamp * 1000)

【2】Flink暴露了TimestampAssigner接口供我们实现,使我们可以自定义如何从事件数据中抽取时间戳和生成 watermarkMyAssigner可以有两种类型,都继承自TimestampAssigner

dataStream.assignTimestampsAndWatermarks(new MyAssigner())

TimestampAssigner:定义了抽取时间戳,以及生成watermark的方法,有两种类型:
【1】AssignerWithPeriodicWatermarks 系统会周期性的将Watermark插入到流中。默认周期是200毫秒(如果是processingTimeWatermark = 0 ),可以使用ExecutionConfig.setAutoWatermarkInterval()方法进行设置。升序和前面乱序的处理BoundedOutOfOrderness,都是基于周期性watermark的。举例:如下产生watermark的逻辑:每隔5秒,Flink调用AssignerWithPeriodicWatermarksgetCurrentWatermark()方法。如果方法返回一个时间戳大于之前水位的时间戳,新的water会被插入到流中。这个检查保证了水位线是单调递增的。如果方法返回的时间戳小于之前水位的时间戳,则不会产生新的watermark

//方案一:
//EventTime是以数据自带的时间戳字段为准,应用程序需要指定如何从record中抽取时间戳字段
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
//每隔 5秒产生一个 watermark
env.getConfig.setAutoWatermarkInterval(5000);//方案二
//自定义一个周期性的时间戳
class PeriodicAssigner extends AssignerWithPeriodicWatermarks[SensorReading]{val bound: Long = 60 * 1000 //延时为 1 分钟var maxTs: Long = Long.MinValue //观察到的最大时间戳//生成水位线override def getCurrentWatermark: Watermark = {new Watermark(maxTs - bound)}//抽取时间戳的方法override def extractTimestamp(t: SensorReading, l: Long): Long = {maxTs = maxTs.max(t.timestamp)t.timestamp}
}

【2】AssignerWithPunctuatedWatermarks 没有时间周期规律,可打断的生成watermark

class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks[SensorReading]{val bound: Long = 60 * 1000//获取水位线,根据数据触发override def checkAndGetNextWatermark(t: SensorReading, l: Long): Watermark = {if(t.id == "sensor_1"){new Watermark(l - bound)}else{null}}//抽取时间戳的方法override def extractTimestamp(t: SensorReading, l: Long): Long = {t.timestamp}
}

watermark 的设定:
【1】在Flink中,watermark由应用程序开发人员生成,这通常需要对相应的领域有一定的了解。
【2】如果watermark设置的延迟太久,收到结果的速度可能就会很慢,解决办法是在水位线到达之前输出一个近似结果。
【3】而如果watermark到达得太早,则可能收到错误结果,不过Flink处理迟到数据的机制可以解决这个问题。

相关文章:

Flink Watermark和时间语义

Flink 中的时间语义 时间语义: EventTime:事件创建时间;Ingestion Time:数据进入Flink的时间;Processing Time:执行操作算子的本地系统时间,与机器无关。不同的时间语义有不同的应用场合&#x…...

HarmonyOS UI框架简介

HarmonyOS UI框架介绍 HarmonyOSUI框架是一个用于构建跨设备应用的开发框架,它属于HarmonyOS系统架构的上层框架。该框架通过提供一系列的开发模型、声明式UI范式、系统API等,帮助开发者更高效地构建用户界面。 在HarmonyOSUI框架中,开发语…...

编程羔手解决Maven引入多个版本的依赖包,导致包冲突了

最近升级了些依赖发现有个hutool的方法老报错,java.lang.NoSuchMethodError: cn.hutool.core.util.ObjectUtil.defaultIfNull(Ljava/lang/Object;Ljava/util/function/Supplier;) 在 Maven 项目中,当不同的依赖模块引入 Hutool 的不同版本时&#xff0c…...

C#,入门教程(08)——基本数据类型及使用的基础知识

上一篇: C#,入门教程(07)——软件项目的源文件与目录结构https://blog.csdn.net/beijinghorn/article/details/124139947 数据类型用于指定数据体(DataEntity,包括但不限于类或结构体的属性、变量、常量、函数返回值)…...

分类预测 | Matlab实现DBO-SVM蜣螂算法优化支持向量机多特征分类预测

分类预测 | Matlab实现DBO-SVM蜣螂算法优化支持向量机多特征分类预测 目录 分类预测 | Matlab实现DBO-SVM蜣螂算法优化支持向量机多特征分类预测分类效果基本描述程序设计参考资料 分类效果 基本描述 1.Matlab实现DBO-SVM蜣螂算法优化支持向量机多特征分类预测(完整…...

计算机二级Python选择题考点——公共基础部分

计算机完成一条指令所花费的时间称为一个指令周期。(指令周期越短,指令执行就越快)顺序程序不具有并发性。(具有顺序性、封闭性和可再现性)结构化程序设计强调程序的易读性。系统软件:操作系统、编译程序、数据库管理系统 应用软件:杀毒软件在…...

《微机原理与应用》期末考试题库(附答案解析)

第1章 微型计算机概述 1.微型计算机的硬件系统包括___A _____。 A.控制器、运算器、存储器和输入输出设备 B.控制器、主机、键盘和显示器 C.主机、电源、CPU和输入输出 D.CPU、键盘、显示器和打印机 2.微处…...

如何在Android Glide中结合使用CenterCrop和自定义圆角变换(图片部分圆角矩形)

如何在Android Glide中结合使用CenterCrop和自定义圆角变换(图片部分圆角矩形) 在Android开发中,使用Glide加载图片时,我们经常需要对图片进行特定的处理,比如裁剪和圆角变换,特别是一些设计稿,…...

华为机考-手拍球游戏

【手拍手计算次数和总数】游戏规则:左手和右手拍球初始数为0,首先左手第一次拍球数1下,右手拍球1下,接下来左手在拍球时是上一次左手上一次右手的总和,右手也是上一次左手上一次右手拍球的总和,最后拍球总数…...

【线上问题】两台服务器的时间不一致导致jwt解析错误

目录 一、问题描述二、解决方法 一、问题描述 1.线上生产问题,本地和测试环境均无问题 2.本地和测试由于网关和登录服务均在同一台机器 3.线上的登录服务和网关部署不在一起,登录服务的时间正常,网关服务的服务器时间比实际快5秒 4.登录服务j…...

58.网游逆向分析与插件开发-游戏增加自动化助手接口-游戏菜单文字资源读取的逆向分析

内容来源于:易道云信息技术研究院VIP课 之前的内容:接管游戏的自动药水设定功能-CSDN博客 码云地址(master分支):https://gitee.com/dye_your_fingers/sro_-ex.git 码云版本号:34b9c1d43b512d0b4a3c395b…...

Vue-2、初识Vue

1、helloword小案列 代码 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>初始Vue</title><!--引入vue--><script type"text/javascript" src"https://cdn.jsdelivr.n…...

机器学习项目标记图像数据 - 安装LabelImg及功能介绍

什么是LabelImg&#xff1f; LabelImg 是一款流行的图像标注工具&#xff0c;主要用于计算机视觉领域。它允许用户为机器学习项目标记图像数据&#xff0c;特别是用于训练目标检测模型。 如何安装LabelImg pip install PyQt5 pip install pyqt5-tools pip install lxml pip …...

12.15 log 122.买卖股票的最佳时机 II,55. 跳跃游戏

122.买卖股票的最佳时机 II class Solution { public:int maxProfit(vector<int>& prices) {int result0;for(int i0;i<prices.size();i){if(i>0&&prices[i]-prices[i-1]>0){resultprices[i]-prices[i-1];}}return result;} }; 这道题贪心贪的时每…...

Redis - 挖矿病毒 db0 库 backup 反复出现解决方案

问题描述 腾讯云的服务器&#xff0c;使用 Docker 部署了 Redis 之后&#xff0c;发现 DB0 中总是出现 4 条 key&#xff0c;分别是 backup01backup02backup03backup04 而自己每次存入 db0 中的数据过一会就会被无缘无故删除掉。 原因分析 挖矿病毒 解决方案 在启动的时候…...

LiveGBS流媒体平台GB/T28181常见问题-国标编号是什么设备编号和通道国标编号标记唯一的摄像头|视频|镜头通道

LiveGBS国标GB28181中国标编号是什么设备编号和通道国标编号标记唯一的摄像头|视频|镜头通道 1、什么是国标编号&#xff1f;2、国标设备ID和通道ID3、ID 统一编码规则4、搭建GB28181视频直播平台 1、什么是国标编号&#xff1f; 国标GB28181对接过程中&#xff0c;可能有的小…...

Unity ShaderGraph 技能冷却转圈效果

Unity ShaderGraph 技能冷却转圈效果 前言项目场景布置代码编写ShaderGraph 连线总结 参考 前言 遇到一个需求&#xff0c;要展示技能冷却的圆形遮罩效果。 项目 场景布置 代码编写 Shader核心的就两句 // 将uv坐标系的原点移到纹理中心 float2 uv i.uv - float2(0.5, 0…...

C++上位软件通过Snap7开源库访问西门子S7-1200/S7-1500数据块的方法

前言 本人一直从事C上位软件开发工作较多&#xff0c;在之前的项目中通过C访问西门子PLC S7-200/S7-1200/S7-1500并进行数据交互的应用中一直使用的是ModbusTCP/ModbusRTU协议进行。Modbus上位开源库采用的LibModbus。经过实际应用发现Modbus开源库单次发送和接受的数据不能超过…...

如何正确安装Axure插件?详细步骤分享

产品经理在使用Axure导出html文件时&#xff0c;如果选择“完成后打开浏览器”&#xff0c;浏览器往往无法识别。此时&#xff0c;我们需要使用Axure官方谷歌浏览器插件直接访问浏览器中的本地html项目&#xff0c;否则我们需要上传到AxureCloud或使用软件本身的预览功能。接下…...

[SwiftUI]工程最低适配iOS13

问题&#xff1a; 新建工程&#xff0c;选择最低支持iOS13报错&#xff1a; main() is only available in iOS 14.0 or newer Scene is only available in iOS 14.0 or newer WindowGroup is only available in iOS 14.0 or newer 解决&#xff1a; 注释掉上面代码&#x…...

LBE-LEX系列工业语音播放器|预警播报器|喇叭蜂鸣器的上位机配置操作说明

LBE-LEX系列工业语音播放器|预警播报器|喇叭蜂鸣器专为工业环境精心打造&#xff0c;完美适配AGV和无人叉车。同时&#xff0c;集成以太网与语音合成技术&#xff0c;为各类高级系统&#xff08;如MES、调度系统、库位管理、立库等&#xff09;提供高效便捷的语音交互体验。 L…...

ssc377d修改flash分区大小

1、flash的分区默认分配16M、 / # df -h Filesystem Size Used Available Use% Mounted on /dev/root 1.9M 1.9M 0 100% / /dev/mtdblock4 3.0M...

【CSS position 属性】static、relative、fixed、absolute 、sticky详细介绍,多层嵌套定位示例

文章目录 ★ position 的五种类型及基本用法 ★ 一、position 属性概述 二、position 的五种类型详解(初学者版) 1. static(默认值) 2. relative(相对定位) 3. absolute(绝对定位) 4. fixed(固定定位) 5. sticky(粘性定位) 三、定位元素的层级关系(z-i…...

k8s业务程序联调工具-KtConnect

概述 原理 工具作用是建立了一个从本地到集群的单向VPN&#xff0c;根据VPN原理&#xff0c;打通两个内网必然需要借助一个公共中继节点&#xff0c;ktconnect工具巧妙的利用k8s原生的portforward能力&#xff0c;简化了建立连接的过程&#xff0c;apiserver间接起到了中继节…...

是否存在路径(FIFOBB算法)

题目描述 一个具有 n 个顶点e条边的无向图&#xff0c;该图顶点的编号依次为0到n-1且不存在顶点与自身相连的边。请使用FIFOBB算法编写程序&#xff0c;确定是否存在从顶点 source到顶点 destination的路径。 输入 第一行两个整数&#xff0c;分别表示n 和 e 的值&#xff08;1…...

智能分布式爬虫的数据处理流水线优化:基于深度强化学习的数据质量控制

在数字化浪潮席卷全球的今天&#xff0c;数据已成为企业和研究机构的核心资产。智能分布式爬虫作为高效的数据采集工具&#xff0c;在大规模数据获取中发挥着关键作用。然而&#xff0c;传统的数据处理流水线在面对复杂多变的网络环境和海量异构数据时&#xff0c;常出现数据质…...

使用 SymPy 进行向量和矩阵的高级操作

在科学计算和工程领域&#xff0c;向量和矩阵操作是解决问题的核心技能之一。Python 的 SymPy 库提供了强大的符号计算功能&#xff0c;能够高效地处理向量和矩阵的各种操作。本文将深入探讨如何使用 SymPy 进行向量和矩阵的创建、合并以及维度拓展等操作&#xff0c;并通过具体…...

sipsak:SIP瑞士军刀!全参数详细教程!Kali Linux教程!

简介 sipsak 是一个面向会话初始协议 (SIP) 应用程序开发人员和管理员的小型命令行工具。它可以用于对 SIP 应用程序和设备进行一些简单的测试。 sipsak 是一款 SIP 压力和诊断实用程序。它通过 sip-uri 向服务器发送 SIP 请求&#xff0c;并检查收到的响应。它以以下模式之一…...

AI+无人机如何守护濒危物种?YOLOv8实现95%精准识别

【导读】 野生动物监测在理解和保护生态系统中发挥着至关重要的作用。然而&#xff0c;传统的野生动物观察方法往往耗时耗力、成本高昂且范围有限。无人机的出现为野生动物监测提供了有前景的替代方案&#xff0c;能够实现大范围覆盖并远程采集数据。尽管具备这些优势&#xf…...

FFmpeg:Windows系统小白安装及其使用

一、安装 1.访问官网 Download FFmpeg 2.点击版本目录 3.选择版本点击安装 注意这里选择的是【release buids】&#xff0c;注意左上角标题 例如我安装在目录 F:\FFmpeg 4.解压 5.添加环境变量 把你解压后的bin目录&#xff08;即exe所在文件夹&#xff09;加入系统变量…...