当前位置: 首页 > news >正文

【入门Flink】- 09Flink水位线Watermark

窗口的处理过程中,基于数据的时间戳,自定义一个“逻辑时钟”。这个时钟的时间不会自动流逝;它的时间进展,就是靠着新到数据的时间戳来推动的。

什么是水位线

用来衡量事件时间进展的标记,就被称作“水位线”(Watermark)

具体实现上,水位线可以看作一条特殊的数据记录,它是插入到数据流中的一个标记点,主要内容就是一个时间戳,用来指示当前的事件时间。而它插入流中的位置,就应该是在某个数据到来之后;这样就可以从这个数据中提取时间戳,作为当前水位线的时间戳了。

有序流中水位线

(1)理想状态(数据量小),数据应该按照生成的先后顺序进入流中,每条数据产生一个水位线

(2)实际应用中,如果当前数据量非常大,且同时涌来的数据时间差会非常小(比如几毫秒),往
往对处理计算也没什么影响。所以为了提高效率,会每隔一段时间生成一个水位线

乱序流中水位线

分布式系统中,数据在节点间传输,会因为网络传输延迟的不确定性,导致顺序发生改变,这就是
所谓的“乱序数据”。

(1)乱序+数据量小:还是靠数据来驱动,每来一个数据就提取它的时间戳、插入一个水位线。乱序数据,插入新的水位线时,要先判断一下时间戳是否比之前的大,否则就不再生成新的水位线。也就是说,只有数据的时间戳比当前时钟大,才能推动时钟前进,这时才插入水位线。

(2)乱序+数据量大:考虑到大量数据同时到来的处理效率,可以周期性地生成水位线。这时
只需要保存一下之前所有数据中的最大时间截,需要插入水位线时,就直接以它作为时间戳生成新的水位线。

(3)乱序+迟到数据:无法正确处理“迟到”的数据。为了让窗口能够正确收集到迟到的数据设置迟到时间,比如2秒:也就是用当前已有数据的最大时间戳减去2秒,就是要插入的水位线的时间戳。9秒的数据到来之后,事件时钟不会直接推进到9秒,而是进展到了7秒;必须等到11秒的数据到来之后,事件时钟才会进展到9秒,此时迟到2秒的数据也会被正确收集处理。【迟到时间不能设置过长,否则会对实时性会有所影响】

水位线的特性

  • 水位线是插入到数据流中的一个标记,可以认为是一个特殊的数据
  • 水位线主要的内容是一个时间戳,用来表示当前事件时间的进展
  • 水位线是基于数据的时间戳生成的
  • 水位线的时间戳必须单调递增,以确保任务的事件时间时钟一直向前推进
  • 水位线可以通过设置延迟,来保证正确处理乱序数据
  • 一个水位线Watermark(t),表示在当前流中事件时间已经达到了时间戳t,代表t之前的所 有数据都到齐了,之后流中不会出现时间t'≤t的数据

水位线与窗口配合,完成对乱序数据的正确处理

水位线是流处理中对低延迟和结果正确性的一个权衡机制。

水位线生成策略

生成水位线的方法:.assignTimestampsAndWatermarks(),主要用来为流中的数据分配时间戳,并生成水位线来指示事件时间。【指定水位线生成策略】

stream.assignTimestampsAndWatermarks(<watermark strategy>);

WatermarkStrategy 水位线策略是一个接口,里面内置一些生成策略:

image-20231111224149038

有序流中内置水位线设置

时间戳单调增长,所以永远不会出现迟到数据的问题。WatermarkStrategy.forMonotonousTimestamps()

        WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy.<WaterSensor>forMonotonousTimestamps()// 指定时间戳分配器,从数据中提取 单位毫秒.withTimestampAssigner((SerializableTimestampAssigner<WaterSensor>) (element, recordTimestamp) -> {System.out.println(" 数据 =" + element + ",recordTs=" + recordTimestamp);return element.getTs() * 1000L;});

乱序流中内置水位线设置

由于乱序流中需要等待迟到数据到齐,必须设置一个固定量的延迟时间。WatermarkStrategy. forBoundedOutOfOrderness()

        WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy// 乱序数据,等待3s.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))// 指定时间戳分配器,从数据中提取 单位毫秒.withTimestampAssigner((SerializableTimestampAssigner<WaterSensor>) (element, recordTimestamp) -> {System.out.println(" 数据 =" + element + ",recordTs=" + recordTimestamp);return element.getTs() * 1000L;});

自定义水位线生成器

(1)周期性水位线生成器(Periodic Generator)

周期性生成器一般是通过 onEvent()观察判断输入的事件,而在onPeriodicEmit()里发出水位线。
模仿该类BoundedOutOfOrdernessWatermarks

public class CustomBoundedOutOfOrdernessGenerator<T> implements WatermarkGenerator<T> {private Long delayTime = 5000L; // 延迟时间private Long maxTs = Long.MIN_VALUE + delayTime + 1L; // 观察到的最大时间戳@Overridepublic void onEvent(T event, long eventTimestamp, WatermarkOutput output) {// 每来一条数据就调用一次maxTs = Math.max(eventTimestamp, maxTs); // 更新最大时间戳}@Overridepublic void onPeriodicEmit(WatermarkOutput output) {// 发射水位线,默认 200ms 调用一次output.emitWatermark(new Watermark(maxTs - delayTime - 1L));}
}

在 onPeriodicEmit()里调用 output.emitWatermark(),就可以发出水位线了;方法由系统框架周期性地调用,默认 200ms 一次。【不建议修改】

 env.getConfig().setAutoWatermarkInterval(400L);

(2)断点式水位线生成器(Punctuated Generator)

断点式生成器会不停地检测 onEvent()中的事件,当发现带有水位线信息的事件时,就立即发出水位线。

如下:只要有数据来就直接发射水位线

    @Overridepublic void onEvent(T event, long eventTimestamp, WatermarkOutput output) {// 每来一条数据就调用一次maxTs = Math.max(eventTimestamp, maxTs); // 更新最大时间戳output.emitWatermark(new Watermark(maxTs - delayTime - 1L));}

(3)在数据源中发送水位线

可以在自定义的数据源中抽取事件时间,然后发送水位线。

自定义数据源中生成水位线和在程序中使用assignTimestampsAndWatermarks 方法生成水位线二者只能取其一

env.fromSource(
kafkaSource,
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)), 
// WatermarkStrategy.noWatermarks() 或者不发送水位线
"kafkasource"
)

水位线的传递(空闲等待withIdleness)

一个任务接收到多个上游并行任务传递来的水位线时,应该以最小的作为当前任务的事件时钟

如下案例:当程序并行度设置为2时,自定义分区器导致一个分区一直拿不到数据(最小时钟一直为null),此时如不加以干预,事件时钟将永远不会推进,存在问题。设置空闲时间,当超过空闲时间一直收不到该分区数据,直接忽略该分区,还是会依旧推进时间时钟

        env.setParallelism(2);// 自定义分区器:数据%分区数,只输入奇数,都只会去往map 的一个子任务SingleOutputStreamOperator<Integer> socketDS = env.socketTextStream("xxxx", 7777).partitionCustom(new MyPartitioner(), r -> r).map(Integer::parseInt).assignTimestampsAndWatermarks(WatermarkStrategy.<Integer>forMonotonousTimestamps().withTimestampAssigner((r, ts) -> r * 1000L).withIdleness(Duration.ofSeconds(5)) // 空闲等待 5s);// 分成两组:奇数一组,偶数一组,开 10s 的事件时间滚动窗口socketDS.keyBy(r -> r % 2).window(TumblingEventTimeWindows.of(Time.seconds(10)))...

迟到数据的处理

1)推迟水印推进(设置延迟时间)

水印产生时,设置一个乱序容忍度,推迟系统时间的推进,保证窗口计算被延迟执行,为乱序的数据争取更多的时间进入窗口。

WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10));

2)设置窗口延迟关闭

Flink 的窗口,也允许迟到数据。当触发了窗口计算后,会先计算当前的结果,但是此时并不会关闭窗口。当达到设置延迟关闭时间之后,才会真正关闭窗口,关闭窗口后再迟到的数据就不会再处理。

.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.allowedLateness(Time.seconds(3))

3)使用侧流接收迟到的数据

最后兜底,窗口关闭之后的迟到数据,使用侧输出流输出。

完整方案:

public class WatermarkLateDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("124.222.253.33", 7777).map(new WaterSensorMapFunction());WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy// 1.设置迟到时间 3s.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((element, recordTimestamp) -> element.getTs() * 1000L)// .withIdleness(Duration.ofSeconds(5)); // 空闲等待 5s;SingleOutputStreamOperator<WaterSensor>sensorDSWithWatermark = sensorDS.assignTimestampsAndWatermarks(watermarkStrategy);OutputTag<WaterSensor> lateTag = new OutputTag<>("latedata", Types.POJO(WaterSensor.class));SingleOutputStreamOperator<String> process = sensorDSWithWatermark.keyBy(WaterSensor::getId).window(TumblingEventTimeWindows.of(Time.seconds(10))) .allowedLateness(Time.seconds(2)) // 2.推迟2s关窗.sideOutputLateData(lateTag) // 3.关窗后的迟到数据,放入侧输出流.process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {@Overridepublic void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {long startTs = context.window().getStart();long endTs = context.window().getEnd();String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");long count = elements.spliterator().estimateSize();out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ") 包含 " + count + " 条数据===>" + elements);}});process.print();// 从主流获取侧输出流,打印process.getSideOutput(lateTag).printToErr("关窗后的迟到数据");env.execute();}
}

相关文章:

【入门Flink】- 09Flink水位线Watermark

在窗口的处理过程中&#xff0c;基于数据的时间戳&#xff0c;自定义一个“逻辑时钟”。这个时钟的时间不会自动流逝&#xff1b;它的时间进展&#xff0c;就是靠着新到数据的时间戳来推动的。 什么是水位线 用来衡量事件时间进展的标记&#xff0c;就被称作“水位线”&#x…...

华为交换机的基本配置,看完秒懂

一、 交换机的基本配置 交换机连接方式 本地&#xff1a;计算机COM口/USB口 --> Console线 --> 交换机Console口 远程&#xff1a;Putty、SecureCRT、Xshell远程管理工具 华为VRP网络操作系统 1&#xff09;华为的视图模式 <Huawei> //用户视图&#x…...

spark与scala的对应版本查看

仓库地址 https://mvnrepository.com/artifact/org.apache.spark/spark-core 总结 spark3.0 以后&#xff0c;不再支持 scala2.11spark3.0 以后&#xff0c;只能用 scala2.12以上...

影响力|子芽首创代码疫苗技术获评“年度技术突破者”

近日&#xff0c;由业界权威机构嘶吼产业研究院主办的2023网络安全“金帽子”年度评选结果已正式公布。本届评选由网安产业监管机构及相关委办局领导坐镇、行业资深专家、头部网安企业负责人权威加持。凭借首创的专利级代码疫苗技术创新突破了数字供应链安全领域关键核心技术&a…...

还在为忘记BIOS密码担心?至少有五种方法可以重置或删除BIOS密码

忘记密码是一个我们都非常熟悉的问题。虽然在大多数情况下,只需单击“忘记密码”选项,然后按照几个简单的步骤即可恢复访问权限,但情况并非总是如此。忘记BIOS密码(通常为避免进入BIOS设置或避免个人计算机启动而设置的密码)意味着你将无法完全启动系统。 幸运的是,就像…...

部署百川大语言模型Baichuan2

Baichuan2是百川智能推出的新一代开源大语言模型&#xff0c;采用 2.6 万亿 Tokens 的高质量语料训练。在多个权威的中文、英文和多语言的通用、领域 benchmark 上取得同尺寸最佳的效果。包含有 7B、13B 的 Base 和 Chat 版本&#xff0c;并提供了 Chat 版本的 4bits 量化。 模…...

java面试常问

文章目录 java 基础1、JDK 和 JRE的区别2、 和equals的区别3、String、StringBuffer、StringBuilder4、String str “a”、 new String(“a”)一样吗&#xff1f;5、ArrayList 和 LinkedList的区别&#xff1f;6、HashMap的原理与实现6.1、容量与扩容6.2、扩容机制 7、HashMa…...

关于nginx一个域名,配置多个端口https的方法

假如我有一个域名 abc.com。在这个域名下&#xff0c;部署了两个应用&#xff0c;分别对应端口&#xff1a;8081&#xff0c;8082 想要给两个应用接口都开启https访问。 nginx配置如下&#xff1a; server { #监听443端口 listen 443 ssl;…...

IntelliJ IDEA插件开发入门实战

介绍 IntelliJ IDEA是备受赞誉的Java开发工具&#xff0c;提供了丰富的功能和工具。通过使用插件&#xff0c;可以扩展和增强这个集成开发环境。IntelliJ IDEA拥有庞大的插件生态系统&#xff0c;涵盖了代码分析、格式化工具和完整的框架等各个领域。开发人员还可以创建自己的…...

站群服务器如何选择

站群服务器如何选择 1.站群服务器线路 双线服务器在访问网站不受线路影响&#xff0c;较稳定。 2.站群服务器的稳定性 选择站群服务器的时候&#xff0c;服务器的稳定性是非常重要的。 3.站群服务器带宽大小 站群服务器网站在日常使用时&#xff0c;主要的目的是为了集中网…...

【vue】AntDV组件库中a-upload实现文件上传:

文章目录 一、文档&#xff1a;二、使用(以Jeecg为例)&#xff1a;【1】template&#xff1a;【2】script&#xff1a; 三、效果图&#xff1a; 一、文档&#xff1a; Upload 上传–Ant Design Vue 二、使用(以Jeecg为例)&#xff1a; 【1】template&#xff1a; <a-uploa…...

JSP在Scriptlet中编写java代码的形式

我们想在jsp界面中去写java代码&#xff0c;就需要将java代码写在Scriptlet中 虽然说 有这种方式 但是 目前 大部分都会不建议你往jsp中去写java代码 因为 目前都在推广前后端分离 这也是jsp使用面有没有少的原因 jsp也建议解耦 不要让你的程序耦合性太高 还是前端是前端 后端是…...

btree,hash,fulltext,Rtree索引类型区别及使用场景

当涉及到数据库索引类型的选择时&#xff0c;理解其特点和适用场景非常重要。下面是对B树、哈希索引、全文索引和R树的详细介绍&#xff0c;以及它们在不同数据场景下的使用示例&#xff1a; B树&#xff08;B-tree&#xff09;&#xff1a;特点&#xff1a;B树是一种多路搜索…...

掌握这个技巧,你也能成为资产管理高手!

资产管理是企业管理中至关重要的一环&#xff0c;涉及到对公司财务、物资和信息等各个方面的有效监控和管理。 随着企业规模的扩大和业务复杂性的增加&#xff0c;采用先进的资产管理系统成为确保企业高效运营的必要条件之一。 客户案例 医疗机构 温州某医疗机构拥有大量的医…...

前端安全策略保障

文章目录 前言后台管理系统网络安全XSSCSRFSQL注入 后言 前言 hello world欢迎来到前端的新世界 &#x1f61c;当前文章系列专栏&#xff1a;前端系列文章 &#x1f431;‍&#x1f453;博主在前端领域还有很多知识和技术需要掌握&#xff0c;正在不断努力填补技术短板。(如果出…...

【实施】Sentry-self-hosted部署

Sentry-self-hosted部署 介绍 Sentry 是一个开源的错误追踪&#xff08;error tracking&#xff09;平台。它主要用于监控和追踪应用程序中的错误、异常和崩溃。Sentry允许开发人员实时地收集和分析错误&#xff0c;并提供了强大的工具来排查和修复问题&#xff0c;研发最近是…...

Django多表查询

目录 一.多表查询引入 1.数据准备 2.外键的增删改查 &#xff08;1&#xff09;一对多外键的增删改查 1.1外键的增加 1.2外键的删除 1.3外键的修改 (2)多对多外键的增删改查 2.1增加 2.2删除 2.3更改 2.4清空 3.正反向概念 二.多表查询 1.子查询&#xff08;基于…...

基于Springboot的非物质文化网站(有报告)。Javaee项目,springboot项目。

演示视频&#xff1a; 基于Springboot的非物质文化网站&#xff08;有报告&#xff09;。Javaee项目&#xff0c;springboot项目。 前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下给大家。点击跳转到网站。 项目介…...

1亿美元投资!加拿大量子公司Photonic告别隐身状态

​&#xff08;图片来源&#xff1a;网络&#xff09; 至今加拿大量子公司Photonic总融资额已达1.4亿美元&#xff0c;将推动可扩展、容错的量子计算和网络平台的快速开发。 官宣完成1亿美元新一轮融资 Photonic总部位于加拿大不列颠哥伦比亚省温哥华市&#xff0c;是一家基…...

Allegro的引流方式有哪些?Allegro买家号测评提高店铺的权重和排名

为了让更多的人发现你的平台并提高转化率&#xff0c;正确的引流是至关重要的。那么Allegro的引流方式有哪些&#xff1f; 首先&#xff0c;对于Allegro平台来说&#xff0c;一个有效且常用的引流方式就是通过搜索引擎优化&#xff08;SEO&#xff09;。通过合理地选择关键词、…...

MPNet:旋转机械轻量化故障诊断模型详解python代码复现

目录 一、问题背景与挑战 二、MPNet核心架构 2.1 多分支特征融合模块(MBFM) 2.2 残差注意力金字塔模块(RAPM) 2.2.1 空间金字塔注意力(SPA) 2.2.2 金字塔残差块(PRBlock) 2.3 分类器设计 三、关键技术突破 3.1 多尺度特征融合 3.2 轻量化设计策略 3.3 抗噪声…...

设计模式和设计原则回顾

设计模式和设计原则回顾 23种设计模式是设计原则的完美体现,设计原则设计原则是设计模式的理论基石, 设计模式 在经典的设计模式分类中(如《设计模式:可复用面向对象软件的基础》一书中),总共有23种设计模式,分为三大类: 一、创建型模式(5种) 1. 单例模式(Sing…...

连锁超市冷库节能解决方案:如何实现超市降本增效

在连锁超市冷库运营中&#xff0c;高能耗、设备损耗快、人工管理低效等问题长期困扰企业。御控冷库节能解决方案通过智能控制化霜、按需化霜、实时监控、故障诊断、自动预警、远程控制开关六大核心技术&#xff0c;实现年省电费15%-60%&#xff0c;且不改动原有装备、安装快捷、…...

WEB3全栈开发——面试专业技能点P2智能合约开发(Solidity)

一、Solidity合约开发 下面是 Solidity 合约开发 的概念、代码示例及讲解&#xff0c;适合用作学习或写简历项目背景说明。 &#x1f9e0; 一、概念简介&#xff1a;Solidity 合约开发 Solidity 是一种专门为 以太坊&#xff08;Ethereum&#xff09;平台编写智能合约的高级编…...

OpenPrompt 和直接对提示词的嵌入向量进行训练有什么区别

OpenPrompt 和直接对提示词的嵌入向量进行训练有什么区别 直接训练提示词嵌入向量的核心区别 您提到的代码: prompt_embedding = initial_embedding.clone().requires_grad_(True) optimizer = torch.optim.Adam([prompt_embedding...

CMake控制VS2022项目文件分组

我们可以通过 CMake 控制源文件的组织结构,使它们在 VS 解决方案资源管理器中以“组”(Filter)的形式进行分类展示。 🎯 目标 通过 CMake 脚本将 .cpp、.h 等源文件分组显示在 Visual Studio 2022 的解决方案资源管理器中。 ✅ 支持的方法汇总(共4种) 方法描述是否推荐…...

Python 包管理器 uv 介绍

Python 包管理器 uv 全面介绍 uv 是由 Astral&#xff08;热门工具 Ruff 的开发者&#xff09;推出的下一代高性能 Python 包管理器和构建工具&#xff0c;用 Rust 编写。它旨在解决传统工具&#xff08;如 pip、virtualenv、pip-tools&#xff09;的性能瓶颈&#xff0c;同时…...

【JVM面试篇】高频八股汇总——类加载和类加载器

目录 1. 讲一下类加载过程&#xff1f; 2. Java创建对象的过程&#xff1f; 3. 对象的生命周期&#xff1f; 4. 类加载器有哪些&#xff1f; 5. 双亲委派模型的作用&#xff08;好处&#xff09;&#xff1f; 6. 讲一下类的加载和双亲委派原则&#xff1f; 7. 双亲委派模…...

宇树科技,改名了!

提到国内具身智能和机器人领域的代表企业&#xff0c;那宇树科技&#xff08;Unitree&#xff09;必须名列其榜。 最近&#xff0c;宇树科技的一项新变动消息在业界引发了不少关注和讨论&#xff0c;即&#xff1a; 宇树向其合作伙伴发布了一封公司名称变更函称&#xff0c;因…...

代码规范和架构【立芯理论一】(2025.06.08)

1、代码规范的目标 代码简洁精炼、美观&#xff0c;可持续性好高效率高复用&#xff0c;可移植性好高内聚&#xff0c;低耦合没有冗余规范性&#xff0c;代码有规可循&#xff0c;可以看出自己当时的思考过程特殊排版&#xff0c;特殊语法&#xff0c;特殊指令&#xff0c;必须…...