当前位置: 首页 > news >正文

Flink-DataStreamAPI-生成水印

下面我们将学习Flink提供的用于处理事件时间戳和水印的API,也会介绍有关事件时间、流转时长和摄取时间,下面就让我们跟着官网来学习吧

一、水印策略介绍

为了处理事件时间,Flink需要知道事件时间戳,这意味着流中的每个元素都需要分配其事件时间戳。这通常是通过使用TimestampAssigner从元素中的某个字段访问/提取时间戳来完成的。

时间戳分配与生成水印密切相关,水印告诉系统事件时间的进度。我们可以通过指定WatermarkGenerator来配置它。

Flink API需要一个包含TimestampAssigner和WatermarkGenerator的WatermarkStrategy。WatermarkStrategy上有许多开箱即用的常用策略作为静态方法,用户也可以在需要时构建自己的策略。

以下是一个接口示例:

public interface WatermarkStrategy<T> extends TimestampAssignerSupplier<T>,WatermarkGeneratorSupplier<T>{/***实例化一个{@link Timestamp Assigner},用于根据此策略分配时间戳。*/@OverrideTimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context);/*** 实例化一个水印生成器,根据此策略生成水印。*/@OverrideWatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
}

通常不会自己实现此接口,而是使用WatermarkStrategy上的静态帮助方法来实现常见的水印策略,或者将自定义TimestampAssigner与WatermarkGenerator捆绑在一起。例如,要将bounded-out-of-orderness水印和lambda函数用作时间戳赋值器,我们可以使用以下内容:

WatermarkStrategy.<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20)).withTimestampAssigner((event, timestamp) -> event.f0);

指定TimestampAssigner是可选的,比如当使用Kafka或Kinesis时,将直接从Kafka/Kinesis记录中获取时间戳。

注意:时间戳和水印都指定为自1970-01-01T00:00:00Z的Java纪元以来的毫秒。

二、使用水印策略

Flink应用程序中有两个地方可以使用WatermarkStrategy:

        1)直接在源代码上,

        2)在非源代码操作之后。

第一个选项更可取,因为它允许源利用水印逻辑中有关分片/分区/拆分的知识。然后,源通常可以在更精细的级别跟踪水印,源生成的整体水印将更准确。直接在源上指定水印策略通常意味着我们必须使用特定于源的接口,我们我们详细看下水印策略和Kafka连接器,了解它在Kafka连接器上的工作原理,以及有关每个分区水印如何工作的更多详细信息。

第二个选项(在任意操作后设置WatermarkStrategy)仅在您不能直接在源上设置策略时才应使用:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<MyEvent> stream = env.readFile(myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,FilePathFilter.createDefaultFilter(), typeInfo);DataStream<MyEvent> withTimestampsAndWatermarks = stream.filter( event -> event.severity() == WARNING ).assignTimestampsAndWatermarks(<watermark strategy>);withTimestampsAndWatermarks.keyBy( (event) -> event.getGroup() ).window(TumblingEventTimeWindows.of(Time.seconds(10))).reduce( (a, b) -> a.add(b) ).addSink(...);

以这种方式使用WatermarkStrategy获取一个流并生成一个带有时间戳元素和水印的新流。如果原始流已经有时间戳和/或水印,则时间戳分配程序会覆盖它们。

三、处理空闲源

如果其中一个输入拆分/分区/分片有一段时间没有携带事件,这意味着WatermarkGenerator也没有获得任何新信息来建立水印。我们称之为空闲输入或空闲源。这是一个问题,因为我们的一些分区可能仍然携带事件。在这种情况下,水印将被保留,因为它被计算为所有不同并行水印的最小值。

为了解决这个问题,我们可以使用WatermarkStrategy来检测空闲并将输入标记为空闲。WatermarkStrategy为此提供了一个方便的代码

WatermarkStrategy.<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20)).withIdleness(Duration.ofMinutes(1));

四、水印对齐

上面我知道拆分/分区/分片或源处于空闲状态并且可能会停止增加水印的情况。在光谱的另一方面,拆分/分区/分片或源可能会非常快地处理记录,从而相对更快地增加其水印。这本身不是问题。然而,对于使用水印发出一些数据的下游处理者来说,可能会成为一个问题。

在这种情况下,与空闲源相反,这种下游运算符的水印(如聚合上的窗口连接)可以继续。然而,这种运算符可能需要缓冲来自快速输入的过多数据,因为来自其所有输入的最小水印被滞后输入所抑制。因此,快速输入发出的所有记录都必须在所述下游运算符状态下进行缓冲,这可能导致运算符状态的不可控增长。

为了解决这个问题,我们可以启用水印对齐,这将确保没有源/拆分/分片/分区的水印比其他水印增加得太多。可以分别为每个源启用对齐:

WatermarkStrategy.<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20)).withWatermarkAlignment("alignment-group-1", Duration.ofSeconds(20), Duration.ofSeconds(1));

注意:我们只能为FLIP-27源启用水印对齐

启用对齐时,我们需要告诉Flink,源应该属于哪个组。通过提供一个标签(例如对齐组-1)来做到这一点,该标签将共享它的所有源绑定在一起。此外,我们必须告诉属于该组的所有源的当前最小水印的最大漂移。第三个参数描述了当前最大水印应该更新的频率。频繁更新的缺点是会有更多的RPC消息在TM和JM之间传输。

为了实现对齐,Flink将暂停从源/任务中的消费,这会产生太远的未来水印。与此同时,它将继续从其他来源/任务中读取记录,这些记录可以向前移动组合水印,从而解除屏蔽更快的水印。

注意:从Flink 1.17开始,FLIP-27源框架支持拆分级水印对齐。源连接器必须实现一个接口来恢复和暂停拆分,以便拆分/分区/分片可以在同一个任务中对齐。

如果从1.15. x和1.16.x之间的Flink版本升级,可以通过设置管道.水印对齐来禁用拆分级别对齐。allow-unaligned-source-splits为真。此外,可以通过检查源代码是否在运行时抛出UnsupportedOperationException或读取javadocs来判断它是否支持拆分级别对齐。在这种情况下,最好禁用拆分级别水印对齐以避免致命异常。

当将标志设置为true时,只有当拆分/分片/分区的数量等于源运算符的并行性时,水印对齐才会正常工作。这会导致每个子任务都被分配一个工作单元。另一方面,如果有两个Kafka分区,它们以不同的速度产生水印并被分配给同一个任务,那么水印可能不会按预期运行。幸运的是,即使在最坏的情况下,基本对齐的性能也不会比没有对齐差。

此外,Flink还支持跨相同源和/或不同源的任务对齐,这在我们有两个不同的源(例如Kafka和File)以不同的速度生成水印时很有用。

五、编写水印生成器

TimestampAssigner是一个从事件中提取字段的简单函数,因此我们不需要详细了解它们。另一方面,WatermarkGenerator的编写有点复杂。这是WatermarkGenerator接口:

/*** WatermarkGenerator 根据事件或定期生成水印**/
@Public
public interface WatermarkGenerator<T> {/*** 为每个事件调用,允许水印生成器检查和记住事件时间戳,或根据事件本身发出水印。*/void onEvent(T event, long eventTimestamp, WatermarkOutput output);/*** 定期调用,并且可能会发出新的水印,或者不** 调用此方法和生成水印的间隔取决于{@link ExecutionConfig#getAutoWatermarkInterval()}*/void onPeriodicEmit(WatermarkOutput output);
}

水印生成有两种不同的风格:周期性和标点

周期性生成器通常通过onEvent()观察传入的事件,然后在框架调用onperiodicEmit()时发出水印。

穿孔生成器将查看onEvent()中的事件,并等待流中携带水印信息的特殊标记事件或标点符号。当它看到这些事件之一时,它会立即发出水印。通常,标点生成器不会从onPeriodicEmit()发出水印。

接下来我们将看看如何为每种样式实现生成器。

1、编写周期性水印生成器

周期性生成器观察流事件并周期性地生成水印(可能取决于流元素,或者纯粹基于流转时长)。

生成水印的间隔(每n毫秒)是通过ExecutionConfig定义的。setAutoWatermarkInterval(…)。每次都会调用生成器的onperiodicEmit()方法,如果返回的水印非空且大于前一个水印,则会发出一个新的水印。

/*** 该生成器生成水印,假设元素到达时顺序错误,但仅在一定程度上。某个时间戳t的最新元素将在时间戳t最早元素之后最多n毫秒到达。*/
public class BoundedOutOfOrdernessGenerator implements WatermarkGenerator<MyEvent> {private final long maxOutOfOrderness = 3500; // 3.5 secondsprivate long currentMaxTimestamp;@Overridepublic void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);}@Overridepublic void onPeriodicEmit(WatermarkOutput output) {// 以当前最高时间戳减去无序界限的形式发出水印output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness - 1));}}/*** 此生成器生成的水印比处理时间滞后固定量。它假设元素在有界延迟后到达Flink。*/
public class TimeLagWatermarkGenerator implements WatermarkGenerator<MyEvent> {private final long maxTimeLag = 5000; // 5 seconds@Overridepublic void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {// 不需要做任何事情,因为我们需要处理时间}@Overridepublic void onPeriodicEmit(WatermarkOutput output) {output.emitWatermark(new Watermark(System.currentTimeMillis() - maxTimeLag));}
}

注意:可以在每个事件上生成水印。但是,由于每个水印都会导致下游的一些计算,过多的水印会降低性能。

六、水印策略和Kafka连接器

当使用Apache Kafka作为数据源时,每个Kafka分区可能有一个简单的事件时间模式(升序时间戳或有界无秩序)。然而,当使用来自Kafka的流时,多个分区通常会被并行使用,交错来自分区的事件并破坏每个分区的模式(这是Kafka的消费者客户端工作方式所固有的)。

在这种情况下,我们可以使用Flink的Kafka-partition-aware水印生成。使用该功能,水印在Kafka消费者内部,每个Kafka分区生成,每个分区的水印被合并,就像水印在流洗牌上合并一样。

例如,如果每个Kafka分区的事件时间戳严格升序,则使用升序时间戳水印生成器生成每个分区的水印将产生完美的整体水印。请注意,我们在示例中没有提供TimestampAssigner,而是使用Kafka记录本身的时间戳。

下面的插图展示了如何使用每个Kafka分区的水印生成,以及在这种情况下水印如何通过流数据流传播。

KafkaSource<String> kafkaSource = KafkaSource.<String>builder().setBootstrapServers(brokers).setTopics("my-topic").setGroupId("my-group").setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStream<String> stream = env.fromSource(kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(20)), "mySource");

七、如何处理水印

一般来说,需要在向下游转发之前去处理给定的水印。例如,WindowOperator将首先评估应触发的所有窗口,只有在产生水印触发的所有输出后,水印本身才会被发送到下游。换句话说,由于水印的出现而产生的所有元素都将在水印之前发出。

同样的规则也适用于TwoInputStreamOperator。但是,在这种情况下,运算符的当前水印定义为其两个输入的最小值。

 -------------------------------------------------------------------------------------------------------------------------------

大多数高校硕博生毕业要求需要参加学术会议,发表EI或者SCI检索的学术论文会议论文:
可访问艾思科蓝官网,浏览即将召开的学术会议列表。会议如下:

2025年人工智能、数字媒体技术与社会计算国际学术会议

https://ais.cn/u/byAVfu

第二届边缘计算与并行、分布式计算国际学术会议(ECPDC 2025)

https://ais.cn/u/77FJ3u

2025人工智能与计算机网络技术国际学术会议(ICAICN 2025)

https://ais.cn/u/jUfAVz

2025年数据挖掘与项目管理国际研讨会

https://ais.cn/u/nIbMvm

相关文章:

Flink-DataStreamAPI-生成水印

下面我们将学习Flink提供的用于处理事件时间戳和水印的API&#xff0c;也会介绍有关事件时间、流转时长和摄取时间&#xff0c;下面就让我们跟着官网来学习吧 一、水印策略介绍 为了处理事件时间&#xff0c;Flink需要知道事件时间戳&#xff0c;这意味着流中的每个元素都需要…...

【单片机】ARM 处理器简介

ARM 公司简介 ARM&#xff08;Advanced RISC Machine&#xff09; 是英国 ARM 公司&#xff08;原 Acorn RISC Machine&#xff09; 开发的一种精简指令集&#xff08;RISC&#xff09; 处理器架构。ARM 处理器因其低功耗、高性能、广泛适用性&#xff0c;成为嵌入式系统、移动…...

Flutter——最详细原生交互(MethodChannel、EventChannel、BasicMessageChannel)使用教程

MethodChannel&#xff08;方法通道&#xff09; 用途&#xff1a;实现 双向通信&#xff0c;用于调用原生平台提供的 API 并获取返回结果。 场景&#xff1a;适合一次性操作&#xff0c;如调用相机、获取设备信息等。 使用步骤&#xff1a; Flutter 端&#xff1a;通过 Meth…...

Kafka常用指令(详细)

Kafka常用指令&#xff08;详细&#xff09; 启停命令 前台启动 前台启动命令 ./bin/kafka-server-start.sh config/server.properties 后台启动方式1 后台启动命令加上参数-daemon&#xff0c;窗口关闭之后kafka后台程序继续运行 ./bin/kafka-server-start.sh -daemon co…...

供应链工作效率如何提升

提升供应链工作效率可以从以下几个关键方面入手&#xff1a; 1. 优化供应链管理 数据驱动决策&#xff1a;利用AI和大数据分析&#xff0c;提高预测准确性&#xff0c;优化库存管理。供应链可视化&#xff1a;采用ERP&#xff08;企业资源计划&#xff09;和SCM&#xff08;供…...

1-001:MySQL的存储引擎有哪些?它们之间有什么区别?

MySQL 存储引擎 ├── InnoDB&#xff08;默认引擎&#xff09; │ ├── 事务支持&#xff1a;支持 ACID 和事务&#xff08;事务日志、回滚、崩溃恢复&#xff09; │ ├── 锁机制&#xff1a;支持行级锁&#xff0c;提高并发性能 │ ├── 外键支持&#xff1a;支持外键…...

Java/Kotlin逆向基础与Smali语法精解

1. 法律警示与道德边界 1.1 司法判例深度剖析 案例一&#xff1a;2021年某游戏外挂团伙刑事案 犯罪手法&#xff1a;逆向《王者荣耀》通信协议&#xff0c;修改战斗数据包 技术细节&#xff1a;Hook libil2cpp.so的SendPacket函数 量刑依据&#xff1a;非法经营罪&#xff…...

用AI学编程4——swift学习1

一个swift文件&#xff0c; 整理出swift从入门到精通的所有知识点&#xff0c; 给出注释&#xff0c; 给出这样的swift文件 以下是一个整合了 Swift 从入门到精通的主要知识点的 Swift 文件&#xff0c;包含了基础语法、面向对象编程、异常处理、集合框架、多线程、Lambda 表达…...

数据集构建与训练前准备

训练数据集目录结构与格式 作者笨蛋学法&#xff0c;先将其公式化&#xff0c;后面逐步自己进行修改&#xff0c;读者觉得看不懂可以理解成&#xff0c;由结果去推过程&#xff0c;下面的这个yaml文件就是结果&#xff0c;我们去推需要的文件夹(名字可以不固定&#xff0c;但是…...

在大型语言模型的提示词设计中,system、user和assistant三个角色的区别与联系

在大型语言模型的提示词设计中,system、user和assistant三个角色承担不同的功能,其区别与联系如下: 1. 角色定义与功能 system(系统指令) 作用:设定模型的整体行为、角色定位和任务框架。例如,“你是一位专业的科技作家”或“仅回答与医疗相关的问题”。特点:在多轮对话…...

Zabbix监控进程报警(Zabbix Monitoring Process Alarm)

zabbix监控进程占cpu、内存、磁盘RAID情况 1、cpu达到90%时报警 名称: cpu user percent gt 90% 表达式&#xff1a;{Template OS Linux:system.cpu.util[,idle].avg(1m)}<10 2、内存达到80%时报警 配置—主机(选择监控主机)—监控项—创建监控项 1、创建监控项 名称&…...

p5.js:sound(音乐)可视化,动画显示音频高低变化

本文通过4个案例介绍了使用 p5.js 进行音乐可视化的实践&#xff0c;包括将音频振幅转化为图形、生成波形图。 承上一篇&#xff1a;vite&#xff1a;初学 p5.js demo 画圆圈 cd p5-demo copy .\node_modules\p5\lib\p5.min.js . copy .\node_modules\p5\lib\addons\p5.soun…...

HAL库常用函数

一、通用函数 系统初始化&#xff1a; HAL_Init(): 初始化HAL库和系统时钟&#xff08;调用前需配置系统时钟源&#xff09;。 HAL_Delay(uint32_t Delay): 毫秒级阻塞延时&#xff08;基于SysTick定时器&#xff09;。 HAL_GetTick(): 获取系统运行时间&#xff08;毫秒计数…...

【Zinx】Day5-Part3:Zinx 的连接管理

目录 Day5-Part3&#xff1a;Zinx 的连接管理创建连接管理模块将连接管理模块集成到 Zinx 当中将 ConnManager 集成到 Server 当中在 Connection 的工厂函数中将连接添加到 ConnManagerServer 中连接数量的判断连接的删除 补充&#xff1a;连接的带缓冲发包方式补充&#xff1a…...

C语言:6.20字符型数据练习题

编写程序,输人一行数字字符(用回车结束),每个数字字符 的前后都有空格。 把这一行中的数字转换成一个整数。 例如,若输入(<CR>代表 Enter键):2 4 8 3<CR>则输出 整数:2483。 #include <stdio.h>int main() {char ch;int number 0;printf("请输入一行…...

SpringBoot Test详解

目录 spring-boot-starter-test 1、概述2、常用注解 2.1、配置类型的注解2.2、Mock类型的注解2.3、自动配置类型的注解2.4、启动测试类型的注解2.5、相似注解的区别和联系 3、SpringBootTest和Junit的使用 3.1、单元测试3.2、集成测试 4、MockMvc 4.1、简单示例4.2、自动配置4…...

CDefView::_GetPIDL函数分析之ListView_GetItem函数的参数item的item.mask 为LVIF_PARAM

CDefView::_GetPIDL函数分析之ListView_GetItem函数的参数item的item.mask 为LVIF_PARAM 第一部分&#xff1a; 1: kd> t SHELL32!CDefView::_GetPIDL: 001b:77308013 55 push ebp 1: kd> dv this 0x00000015 i 0n21 …...

Android Retrofit 框架注解定义与解析模块深度剖析(一)

一、引言 在现代 Android 和 Java 开发中&#xff0c;网络请求是不可或缺的一部分。Retrofit 作为 Square 公司开源的一款强大的类型安全的 HTTP 客户端&#xff0c;凭借其简洁易用的 API 和高效的性能&#xff0c;在开发者社区中广受欢迎。Retrofit 的核心特性之一便是通过注…...

项目上传到Gitee过程

在gitee上新建一个仓库 点击“克隆/下载”获取仓库地址 电脑上要装好git 在电脑本地文件夹右键“Git Bash Here” 依次执行如下命令 git init git remote add origin https://gitee.com/qlexcel/stm32-simple.git git pull origin master git add . git commit -m ‘init’…...

DeepSeek R1在医学领域的应用与技术分析(Discuss V1版)

DeepSeek R1作为一款高性能、低成本的国产开源大模型,正在深刻重塑医学软件工程的开发逻辑与应用场景。其技术特性,如混合专家架构(MoE)和参数高效微调(PEFT),与医疗行业的实际需求紧密结合,推动医疗AI从“技术驱动”向“场景驱动”转型。以下从具体业务领域需求出发,…...

R语言AI模型部署方案:精准离线运行详解

R语言AI模型部署方案:精准离线运行详解 一、项目概述 本文将构建一个完整的R语言AI部署解决方案,实现鸢尾花分类模型的训练、保存、离线部署和预测功能。核心特点: 100%离线运行能力自包含环境依赖生产级错误处理跨平台兼容性模型版本管理# 文件结构说明 Iris_AI_Deployme…...

三维GIS开发cesium智慧地铁教程(5)Cesium相机控制

一、环境搭建 <script src"../cesium1.99/Build/Cesium/Cesium.js"></script> <link rel"stylesheet" href"../cesium1.99/Build/Cesium/Widgets/widgets.css"> 关键配置点&#xff1a; 路径验证&#xff1a;确保相对路径.…...

UDP(Echoserver)

网络命令 Ping 命令 检测网络是否连通 使用方法: ping -c 次数 网址ping -c 3 www.baidu.comnetstat 命令 netstat 是一个用来查看网络状态的重要工具. 语法&#xff1a;netstat [选项] 功能&#xff1a;查看网络状态 常用选项&#xff1a; n 拒绝显示别名&#…...

页面渲染流程与性能优化

页面渲染流程与性能优化详解&#xff08;完整版&#xff09; 一、现代浏览器渲染流程&#xff08;详细说明&#xff09; 1. 构建DOM树 浏览器接收到HTML文档后&#xff0c;会逐步解析并构建DOM&#xff08;Document Object Model&#xff09;树。具体过程如下&#xff1a; (…...

Reasoning over Uncertain Text by Generative Large Language Models

https://ojs.aaai.org/index.php/AAAI/article/view/34674/36829https://ojs.aaai.org/index.php/AAAI/article/view/34674/36829 1. 概述 文本中的不确定性在许多语境中传达,从日常对话到特定领域的文档(例如医学文档)(Heritage 2013;Landmark、Gulbrandsen 和 Svenevei…...

初探Service服务发现机制

1.Service简介 Service是将运行在一组Pod上的应用程序发布为网络服务的抽象方法。 主要功能&#xff1a;服务发现和负载均衡。 Service类型的包括ClusterIP类型、NodePort类型、LoadBalancer类型、ExternalName类型 2.Endpoints简介 Endpoints是一种Kubernetes资源&#xf…...

【Go语言基础【13】】函数、闭包、方法

文章目录 零、概述一、函数基础1、函数基础概念2、参数传递机制3、返回值特性3.1. 多返回值3.2. 命名返回值3.3. 错误处理 二、函数类型与高阶函数1. 函数类型定义2. 高阶函数&#xff08;函数作为参数、返回值&#xff09; 三、匿名函数与闭包1. 匿名函数&#xff08;Lambda函…...

现有的 Redis 分布式锁库(如 Redisson)提供了哪些便利?

现有的 Redis 分布式锁库&#xff08;如 Redisson&#xff09;相比于开发者自己基于 Redis 命令&#xff08;如 SETNX, EXPIRE, DEL&#xff09;手动实现分布式锁&#xff0c;提供了巨大的便利性和健壮性。主要体现在以下几个方面&#xff1a; 原子性保证 (Atomicity)&#xff…...

MySQL 主从同步异常处理

阅读原文&#xff1a;https://www.xiaozaoshu.top/articles/mysql-m-s-update-pk MySQL 做双主&#xff0c;遇到的这个错误&#xff1a; Could not execute Update_rows event on table ... Error_code: 1032是 MySQL 主从复制时的经典错误之一&#xff0c;通常表示&#xff…...

Java求职者面试指南:Spring、Spring Boot、Spring MVC与MyBatis技术解析

Java求职者面试指南&#xff1a;Spring、Spring Boot、Spring MVC与MyBatis技术解析 一、第一轮基础概念问题 1. Spring框架的核心容器是什么&#xff1f;它的作用是什么&#xff1f; Spring框架的核心容器是IoC&#xff08;控制反转&#xff09;容器。它的主要作用是管理对…...