当前位置: 首页 > news >正文

Flink-DataStreamAPI-生成水印

下面我们将学习Flink提供的用于处理事件时间戳和水印的API,也会介绍有关事件时间、流转时长和摄取时间,下面就让我们跟着官网来学习吧

一、水印策略介绍

为了处理事件时间,Flink需要知道事件时间戳,这意味着流中的每个元素都需要分配其事件时间戳。这通常是通过使用TimestampAssigner从元素中的某个字段访问/提取时间戳来完成的。

时间戳分配与生成水印密切相关,水印告诉系统事件时间的进度。我们可以通过指定WatermarkGenerator来配置它。

Flink API需要一个包含TimestampAssigner和WatermarkGenerator的WatermarkStrategy。WatermarkStrategy上有许多开箱即用的常用策略作为静态方法,用户也可以在需要时构建自己的策略。

以下是一个接口示例:

public interface WatermarkStrategy<T> extends TimestampAssignerSupplier<T>,WatermarkGeneratorSupplier<T>{/***实例化一个{@link Timestamp Assigner},用于根据此策略分配时间戳。*/@OverrideTimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context);/*** 实例化一个水印生成器,根据此策略生成水印。*/@OverrideWatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
}

通常不会自己实现此接口,而是使用WatermarkStrategy上的静态帮助方法来实现常见的水印策略,或者将自定义TimestampAssigner与WatermarkGenerator捆绑在一起。例如,要将bounded-out-of-orderness水印和lambda函数用作时间戳赋值器,我们可以使用以下内容:

WatermarkStrategy.<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20)).withTimestampAssigner((event, timestamp) -> event.f0);

指定TimestampAssigner是可选的,比如当使用Kafka或Kinesis时,将直接从Kafka/Kinesis记录中获取时间戳。

注意:时间戳和水印都指定为自1970-01-01T00:00:00Z的Java纪元以来的毫秒。

二、使用水印策略

Flink应用程序中有两个地方可以使用WatermarkStrategy:

        1)直接在源代码上,

        2)在非源代码操作之后。

第一个选项更可取,因为它允许源利用水印逻辑中有关分片/分区/拆分的知识。然后,源通常可以在更精细的级别跟踪水印,源生成的整体水印将更准确。直接在源上指定水印策略通常意味着我们必须使用特定于源的接口,我们我们详细看下水印策略和Kafka连接器,了解它在Kafka连接器上的工作原理,以及有关每个分区水印如何工作的更多详细信息。

第二个选项(在任意操作后设置WatermarkStrategy)仅在您不能直接在源上设置策略时才应使用:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<MyEvent> stream = env.readFile(myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,FilePathFilter.createDefaultFilter(), typeInfo);DataStream<MyEvent> withTimestampsAndWatermarks = stream.filter( event -> event.severity() == WARNING ).assignTimestampsAndWatermarks(<watermark strategy>);withTimestampsAndWatermarks.keyBy( (event) -> event.getGroup() ).window(TumblingEventTimeWindows.of(Time.seconds(10))).reduce( (a, b) -> a.add(b) ).addSink(...);

以这种方式使用WatermarkStrategy获取一个流并生成一个带有时间戳元素和水印的新流。如果原始流已经有时间戳和/或水印,则时间戳分配程序会覆盖它们。

三、处理空闲源

如果其中一个输入拆分/分区/分片有一段时间没有携带事件,这意味着WatermarkGenerator也没有获得任何新信息来建立水印。我们称之为空闲输入或空闲源。这是一个问题,因为我们的一些分区可能仍然携带事件。在这种情况下,水印将被保留,因为它被计算为所有不同并行水印的最小值。

为了解决这个问题,我们可以使用WatermarkStrategy来检测空闲并将输入标记为空闲。WatermarkStrategy为此提供了一个方便的代码

WatermarkStrategy.<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20)).withIdleness(Duration.ofMinutes(1));

四、水印对齐

上面我知道拆分/分区/分片或源处于空闲状态并且可能会停止增加水印的情况。在光谱的另一方面,拆分/分区/分片或源可能会非常快地处理记录,从而相对更快地增加其水印。这本身不是问题。然而,对于使用水印发出一些数据的下游处理者来说,可能会成为一个问题。

在这种情况下,与空闲源相反,这种下游运算符的水印(如聚合上的窗口连接)可以继续。然而,这种运算符可能需要缓冲来自快速输入的过多数据,因为来自其所有输入的最小水印被滞后输入所抑制。因此,快速输入发出的所有记录都必须在所述下游运算符状态下进行缓冲,这可能导致运算符状态的不可控增长。

为了解决这个问题,我们可以启用水印对齐,这将确保没有源/拆分/分片/分区的水印比其他水印增加得太多。可以分别为每个源启用对齐:

WatermarkStrategy.<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20)).withWatermarkAlignment("alignment-group-1", Duration.ofSeconds(20), Duration.ofSeconds(1));

注意:我们只能为FLIP-27源启用水印对齐

启用对齐时,我们需要告诉Flink,源应该属于哪个组。通过提供一个标签(例如对齐组-1)来做到这一点,该标签将共享它的所有源绑定在一起。此外,我们必须告诉属于该组的所有源的当前最小水印的最大漂移。第三个参数描述了当前最大水印应该更新的频率。频繁更新的缺点是会有更多的RPC消息在TM和JM之间传输。

为了实现对齐,Flink将暂停从源/任务中的消费,这会产生太远的未来水印。与此同时,它将继续从其他来源/任务中读取记录,这些记录可以向前移动组合水印,从而解除屏蔽更快的水印。

注意:从Flink 1.17开始,FLIP-27源框架支持拆分级水印对齐。源连接器必须实现一个接口来恢复和暂停拆分,以便拆分/分区/分片可以在同一个任务中对齐。

如果从1.15. x和1.16.x之间的Flink版本升级,可以通过设置管道.水印对齐来禁用拆分级别对齐。allow-unaligned-source-splits为真。此外,可以通过检查源代码是否在运行时抛出UnsupportedOperationException或读取javadocs来判断它是否支持拆分级别对齐。在这种情况下,最好禁用拆分级别水印对齐以避免致命异常。

当将标志设置为true时,只有当拆分/分片/分区的数量等于源运算符的并行性时,水印对齐才会正常工作。这会导致每个子任务都被分配一个工作单元。另一方面,如果有两个Kafka分区,它们以不同的速度产生水印并被分配给同一个任务,那么水印可能不会按预期运行。幸运的是,即使在最坏的情况下,基本对齐的性能也不会比没有对齐差。

此外,Flink还支持跨相同源和/或不同源的任务对齐,这在我们有两个不同的源(例如Kafka和File)以不同的速度生成水印时很有用。

五、编写水印生成器

TimestampAssigner是一个从事件中提取字段的简单函数,因此我们不需要详细了解它们。另一方面,WatermarkGenerator的编写有点复杂。这是WatermarkGenerator接口:

/*** WatermarkGenerator 根据事件或定期生成水印**/
@Public
public interface WatermarkGenerator<T> {/*** 为每个事件调用,允许水印生成器检查和记住事件时间戳,或根据事件本身发出水印。*/void onEvent(T event, long eventTimestamp, WatermarkOutput output);/*** 定期调用,并且可能会发出新的水印,或者不** 调用此方法和生成水印的间隔取决于{@link ExecutionConfig#getAutoWatermarkInterval()}*/void onPeriodicEmit(WatermarkOutput output);
}

水印生成有两种不同的风格:周期性和标点

周期性生成器通常通过onEvent()观察传入的事件,然后在框架调用onperiodicEmit()时发出水印。

穿孔生成器将查看onEvent()中的事件,并等待流中携带水印信息的特殊标记事件或标点符号。当它看到这些事件之一时,它会立即发出水印。通常,标点生成器不会从onPeriodicEmit()发出水印。

接下来我们将看看如何为每种样式实现生成器。

1、编写周期性水印生成器

周期性生成器观察流事件并周期性地生成水印(可能取决于流元素,或者纯粹基于流转时长)。

生成水印的间隔(每n毫秒)是通过ExecutionConfig定义的。setAutoWatermarkInterval(…)。每次都会调用生成器的onperiodicEmit()方法,如果返回的水印非空且大于前一个水印,则会发出一个新的水印。

/*** 该生成器生成水印,假设元素到达时顺序错误,但仅在一定程度上。某个时间戳t的最新元素将在时间戳t最早元素之后最多n毫秒到达。*/
public class BoundedOutOfOrdernessGenerator implements WatermarkGenerator<MyEvent> {private final long maxOutOfOrderness = 3500; // 3.5 secondsprivate long currentMaxTimestamp;@Overridepublic void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);}@Overridepublic void onPeriodicEmit(WatermarkOutput output) {// 以当前最高时间戳减去无序界限的形式发出水印output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness - 1));}}/*** 此生成器生成的水印比处理时间滞后固定量。它假设元素在有界延迟后到达Flink。*/
public class TimeLagWatermarkGenerator implements WatermarkGenerator<MyEvent> {private final long maxTimeLag = 5000; // 5 seconds@Overridepublic void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {// 不需要做任何事情,因为我们需要处理时间}@Overridepublic void onPeriodicEmit(WatermarkOutput output) {output.emitWatermark(new Watermark(System.currentTimeMillis() - maxTimeLag));}
}

注意:可以在每个事件上生成水印。但是,由于每个水印都会导致下游的一些计算,过多的水印会降低性能。

六、水印策略和Kafka连接器

当使用Apache Kafka作为数据源时,每个Kafka分区可能有一个简单的事件时间模式(升序时间戳或有界无秩序)。然而,当使用来自Kafka的流时,多个分区通常会被并行使用,交错来自分区的事件并破坏每个分区的模式(这是Kafka的消费者客户端工作方式所固有的)。

在这种情况下,我们可以使用Flink的Kafka-partition-aware水印生成。使用该功能,水印在Kafka消费者内部,每个Kafka分区生成,每个分区的水印被合并,就像水印在流洗牌上合并一样。

例如,如果每个Kafka分区的事件时间戳严格升序,则使用升序时间戳水印生成器生成每个分区的水印将产生完美的整体水印。请注意,我们在示例中没有提供TimestampAssigner,而是使用Kafka记录本身的时间戳。

下面的插图展示了如何使用每个Kafka分区的水印生成,以及在这种情况下水印如何通过流数据流传播。

KafkaSource<String> kafkaSource = KafkaSource.<String>builder().setBootstrapServers(brokers).setTopics("my-topic").setGroupId("my-group").setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStream<String> stream = env.fromSource(kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(20)), "mySource");

七、如何处理水印

一般来说,需要在向下游转发之前去处理给定的水印。例如,WindowOperator将首先评估应触发的所有窗口,只有在产生水印触发的所有输出后,水印本身才会被发送到下游。换句话说,由于水印的出现而产生的所有元素都将在水印之前发出。

同样的规则也适用于TwoInputStreamOperator。但是,在这种情况下,运算符的当前水印定义为其两个输入的最小值。

 -------------------------------------------------------------------------------------------------------------------------------

大多数高校硕博生毕业要求需要参加学术会议,发表EI或者SCI检索的学术论文会议论文:
可访问艾思科蓝官网,浏览即将召开的学术会议列表。会议如下:

2025年人工智能、数字媒体技术与社会计算国际学术会议

https://ais.cn/u/byAVfu

第二届边缘计算与并行、分布式计算国际学术会议(ECPDC 2025)

https://ais.cn/u/77FJ3u

2025人工智能与计算机网络技术国际学术会议(ICAICN 2025)

https://ais.cn/u/jUfAVz

2025年数据挖掘与项目管理国际研讨会

https://ais.cn/u/nIbMvm

相关文章:

Flink-DataStreamAPI-生成水印

下面我们将学习Flink提供的用于处理事件时间戳和水印的API&#xff0c;也会介绍有关事件时间、流转时长和摄取时间&#xff0c;下面就让我们跟着官网来学习吧 一、水印策略介绍 为了处理事件时间&#xff0c;Flink需要知道事件时间戳&#xff0c;这意味着流中的每个元素都需要…...

【单片机】ARM 处理器简介

ARM 公司简介 ARM&#xff08;Advanced RISC Machine&#xff09; 是英国 ARM 公司&#xff08;原 Acorn RISC Machine&#xff09; 开发的一种精简指令集&#xff08;RISC&#xff09; 处理器架构。ARM 处理器因其低功耗、高性能、广泛适用性&#xff0c;成为嵌入式系统、移动…...

Flutter——最详细原生交互(MethodChannel、EventChannel、BasicMessageChannel)使用教程

MethodChannel&#xff08;方法通道&#xff09; 用途&#xff1a;实现 双向通信&#xff0c;用于调用原生平台提供的 API 并获取返回结果。 场景&#xff1a;适合一次性操作&#xff0c;如调用相机、获取设备信息等。 使用步骤&#xff1a; Flutter 端&#xff1a;通过 Meth…...

Kafka常用指令(详细)

Kafka常用指令&#xff08;详细&#xff09; 启停命令 前台启动 前台启动命令 ./bin/kafka-server-start.sh config/server.properties 后台启动方式1 后台启动命令加上参数-daemon&#xff0c;窗口关闭之后kafka后台程序继续运行 ./bin/kafka-server-start.sh -daemon co…...

供应链工作效率如何提升

提升供应链工作效率可以从以下几个关键方面入手&#xff1a; 1. 优化供应链管理 数据驱动决策&#xff1a;利用AI和大数据分析&#xff0c;提高预测准确性&#xff0c;优化库存管理。供应链可视化&#xff1a;采用ERP&#xff08;企业资源计划&#xff09;和SCM&#xff08;供…...

1-001:MySQL的存储引擎有哪些?它们之间有什么区别?

MySQL 存储引擎 ├── InnoDB&#xff08;默认引擎&#xff09; │ ├── 事务支持&#xff1a;支持 ACID 和事务&#xff08;事务日志、回滚、崩溃恢复&#xff09; │ ├── 锁机制&#xff1a;支持行级锁&#xff0c;提高并发性能 │ ├── 外键支持&#xff1a;支持外键…...

Java/Kotlin逆向基础与Smali语法精解

1. 法律警示与道德边界 1.1 司法判例深度剖析 案例一&#xff1a;2021年某游戏外挂团伙刑事案 犯罪手法&#xff1a;逆向《王者荣耀》通信协议&#xff0c;修改战斗数据包 技术细节&#xff1a;Hook libil2cpp.so的SendPacket函数 量刑依据&#xff1a;非法经营罪&#xff…...

用AI学编程4——swift学习1

一个swift文件&#xff0c; 整理出swift从入门到精通的所有知识点&#xff0c; 给出注释&#xff0c; 给出这样的swift文件 以下是一个整合了 Swift 从入门到精通的主要知识点的 Swift 文件&#xff0c;包含了基础语法、面向对象编程、异常处理、集合框架、多线程、Lambda 表达…...

数据集构建与训练前准备

训练数据集目录结构与格式 作者笨蛋学法&#xff0c;先将其公式化&#xff0c;后面逐步自己进行修改&#xff0c;读者觉得看不懂可以理解成&#xff0c;由结果去推过程&#xff0c;下面的这个yaml文件就是结果&#xff0c;我们去推需要的文件夹(名字可以不固定&#xff0c;但是…...

在大型语言模型的提示词设计中,system、user和assistant三个角色的区别与联系

在大型语言模型的提示词设计中,system、user和assistant三个角色承担不同的功能,其区别与联系如下: 1. 角色定义与功能 system(系统指令) 作用:设定模型的整体行为、角色定位和任务框架。例如,“你是一位专业的科技作家”或“仅回答与医疗相关的问题”。特点:在多轮对话…...

Zabbix监控进程报警(Zabbix Monitoring Process Alarm)

zabbix监控进程占cpu、内存、磁盘RAID情况 1、cpu达到90%时报警 名称: cpu user percent gt 90% 表达式&#xff1a;{Template OS Linux:system.cpu.util[,idle].avg(1m)}<10 2、内存达到80%时报警 配置—主机(选择监控主机)—监控项—创建监控项 1、创建监控项 名称&…...

p5.js:sound(音乐)可视化,动画显示音频高低变化

本文通过4个案例介绍了使用 p5.js 进行音乐可视化的实践&#xff0c;包括将音频振幅转化为图形、生成波形图。 承上一篇&#xff1a;vite&#xff1a;初学 p5.js demo 画圆圈 cd p5-demo copy .\node_modules\p5\lib\p5.min.js . copy .\node_modules\p5\lib\addons\p5.soun…...

HAL库常用函数

一、通用函数 系统初始化&#xff1a; HAL_Init(): 初始化HAL库和系统时钟&#xff08;调用前需配置系统时钟源&#xff09;。 HAL_Delay(uint32_t Delay): 毫秒级阻塞延时&#xff08;基于SysTick定时器&#xff09;。 HAL_GetTick(): 获取系统运行时间&#xff08;毫秒计数…...

【Zinx】Day5-Part3:Zinx 的连接管理

目录 Day5-Part3&#xff1a;Zinx 的连接管理创建连接管理模块将连接管理模块集成到 Zinx 当中将 ConnManager 集成到 Server 当中在 Connection 的工厂函数中将连接添加到 ConnManagerServer 中连接数量的判断连接的删除 补充&#xff1a;连接的带缓冲发包方式补充&#xff1a…...

C语言:6.20字符型数据练习题

编写程序,输人一行数字字符(用回车结束),每个数字字符 的前后都有空格。 把这一行中的数字转换成一个整数。 例如,若输入(<CR>代表 Enter键):2 4 8 3<CR>则输出 整数:2483。 #include <stdio.h>int main() {char ch;int number 0;printf("请输入一行…...

SpringBoot Test详解

目录 spring-boot-starter-test 1、概述2、常用注解 2.1、配置类型的注解2.2、Mock类型的注解2.3、自动配置类型的注解2.4、启动测试类型的注解2.5、相似注解的区别和联系 3、SpringBootTest和Junit的使用 3.1、单元测试3.2、集成测试 4、MockMvc 4.1、简单示例4.2、自动配置4…...

CDefView::_GetPIDL函数分析之ListView_GetItem函数的参数item的item.mask 为LVIF_PARAM

CDefView::_GetPIDL函数分析之ListView_GetItem函数的参数item的item.mask 为LVIF_PARAM 第一部分&#xff1a; 1: kd> t SHELL32!CDefView::_GetPIDL: 001b:77308013 55 push ebp 1: kd> dv this 0x00000015 i 0n21 …...

Android Retrofit 框架注解定义与解析模块深度剖析(一)

一、引言 在现代 Android 和 Java 开发中&#xff0c;网络请求是不可或缺的一部分。Retrofit 作为 Square 公司开源的一款强大的类型安全的 HTTP 客户端&#xff0c;凭借其简洁易用的 API 和高效的性能&#xff0c;在开发者社区中广受欢迎。Retrofit 的核心特性之一便是通过注…...

项目上传到Gitee过程

在gitee上新建一个仓库 点击“克隆/下载”获取仓库地址 电脑上要装好git 在电脑本地文件夹右键“Git Bash Here” 依次执行如下命令 git init git remote add origin https://gitee.com/qlexcel/stm32-simple.git git pull origin master git add . git commit -m ‘init’…...

DeepSeek R1在医学领域的应用与技术分析(Discuss V1版)

DeepSeek R1作为一款高性能、低成本的国产开源大模型,正在深刻重塑医学软件工程的开发逻辑与应用场景。其技术特性,如混合专家架构(MoE)和参数高效微调(PEFT),与医疗行业的实际需求紧密结合,推动医疗AI从“技术驱动”向“场景驱动”转型。以下从具体业务领域需求出发,…...

数学之快速幂-数的幂次

题目描述 给定三个正整数 N,M,P&#xff0c;求 输入描述 第 1 行为一个整数 T&#xff0c;表示测试数据数量。 接下来的 T 行每行包含三个正整数 N,M,P。 输出描述 输出共 T 行&#xff0c;每行包含一个整数&#xff0c;表示答案。 输入输出样例 示例 1 输入 3 2 3 7 4…...

git subtree管理的仓库怎么删除子仓库

要删除通过 git subtree 管理的子仓库&#xff0c;可以按照以下步骤操作&#xff1a; 1. 确认子仓库路径 首先确认要删除的子仓库的路径&#xff0c;假设子仓库路径为 <subtree-path>。 2. 从主仓库中移除子仓库目录 使用 git rm 命令删除子仓库的目录&#xff1a; …...

学习资料电子版 免费下载的网盘网站(非常全!)

我分享一个私人收藏的电子书免费下载的网盘网站&#xff08;学习资料为主&#xff09;&#xff1a; link3.cc/sbook123 所有资料都保存在网盘了&#xff0c;直接转存即可&#xff0c;非常的便利&#xff01; 包括了少儿&#xff0c;小学&#xff0c;初中&#xff0c;中职&am…...

SpringMVC-全局异常处理

文章目录 1. 全局异常处理2. 项目异常处理方案2.1 异常分类2.2 异常解决方案2.3 异常解决方案具体实现 1. 全局异常处理 问题&#xff1a;当我们在SpingMVC代码中没有对异常进行处理时&#xff0c;三层架构的默认处理异常方案是将异常抛给上级调用者。也就是说Mapper层报错会将…...

基于Spring Boot的宠物健康顾问系统的设计与实现(LW+源码+讲解)

专注于大学生项目实战开发,讲解,毕业答疑辅导&#xff0c;欢迎高校老师/同行前辈交流合作✌。 技术范围&#xff1a;SpringBoot、Vue、SSM、HLMT、小程序、Jsp、PHP、Nodejs、Python、爬虫、数据可视化、安卓app、大数据、物联网、机器学习等设计与开发。 主要内容&#xff1a;…...

【Linux内核系列】:深入理解缓冲区

&#x1f525; 本文专栏&#xff1a;Linux &#x1f338;作者主页&#xff1a;努力努力再努力wz ★★★ 本文前置知识&#xff1a; 文件系统以及相关系统调用接口 输入以及输出重定向 那么在此前的学习中&#xff0c;我们了解了文件的概念以及相关的系统调用接口&#xff0c;并…...

Python开发Scikit-learn面试题及参考答案

目录 如何用 SimpleImputer 处理数据集中的缺失值? 使用 StandardScaler 对数据进行标准化的原理是什么?与 MinMaxScaler 有何区别? 如何用 OneHotEncoder 对类别型特征进行编码? 解释特征选择中 SelectKBest 与 VarianceThreshold 的应用场景。 如何通过 PolynomialFe…...

~(取反)在算法竞赛中的常见用法和注意事项

在算法竞赛中&#xff0c;取反符号 ~ 主要用于按位取反操作&#xff0c;其功能是对整数的二进制表示逐位取反&#xff08;0 变 1&#xff0c;1 变 0&#xff09;。以下是 ~ 在算法竞赛中的常见用法和注意事项&#xff1a; 1. 按位取反的基本用法 ~ 对整数的二进制表示进行取反…...

C++ MySQL 常用接口(基于 MySQL Connector/C++)

C MySQL 常用接口&#xff08;基于 MySQL Connector/C&#xff09; 1. 数据库连接 接口&#xff1a; sql::mysql::MySQL_Driver *driver; sql::Connection *con;作用&#xff1a; 用于创建 MySQL 连接对象。 示例&#xff1a; driver sql::mysql::get_mysql_driver_insta…...

本地部署 OpenManus 保姆级教程(Windows 版)

一、环境搭建 我的电脑是Windows 10版本&#xff0c;其他的没尝试&#xff0c;如果大家系统和我的不一致&#xff0c;请自行判断&#xff0c;基本上没什么大的出入啊。 openManus的Git地址&#xff1a;https://github.com/mannaandpoem/OpenManus 根据官网的两种安装推荐方式如…...