当前位置: 首页 > news >正文

【入门Flink】- 09Flink水位线Watermark

窗口的处理过程中,基于数据的时间戳,自定义一个“逻辑时钟”。这个时钟的时间不会自动流逝;它的时间进展,就是靠着新到数据的时间戳来推动的。

什么是水位线

用来衡量事件时间进展的标记,就被称作“水位线”(Watermark)

具体实现上,水位线可以看作一条特殊的数据记录,它是插入到数据流中的一个标记点,主要内容就是一个时间戳,用来指示当前的事件时间。而它插入流中的位置,就应该是在某个数据到来之后;这样就可以从这个数据中提取时间戳,作为当前水位线的时间戳了。

有序流中水位线

(1)理想状态(数据量小),数据应该按照生成的先后顺序进入流中,每条数据产生一个水位线

(2)实际应用中,如果当前数据量非常大,且同时涌来的数据时间差会非常小(比如几毫秒),往
往对处理计算也没什么影响。所以为了提高效率,会每隔一段时间生成一个水位线

乱序流中水位线

分布式系统中,数据在节点间传输,会因为网络传输延迟的不确定性,导致顺序发生改变,这就是
所谓的“乱序数据”。

(1)乱序+数据量小:还是靠数据来驱动,每来一个数据就提取它的时间戳、插入一个水位线。乱序数据,插入新的水位线时,要先判断一下时间戳是否比之前的大,否则就不再生成新的水位线。也就是说,只有数据的时间戳比当前时钟大,才能推动时钟前进,这时才插入水位线。

(2)乱序+数据量大:考虑到大量数据同时到来的处理效率,可以周期性地生成水位线。这时
只需要保存一下之前所有数据中的最大时间截,需要插入水位线时,就直接以它作为时间戳生成新的水位线。

(3)乱序+迟到数据:无法正确处理“迟到”的数据。为了让窗口能够正确收集到迟到的数据设置迟到时间,比如2秒:也就是用当前已有数据的最大时间戳减去2秒,就是要插入的水位线的时间戳。9秒的数据到来之后,事件时钟不会直接推进到9秒,而是进展到了7秒;必须等到11秒的数据到来之后,事件时钟才会进展到9秒,此时迟到2秒的数据也会被正确收集处理。【迟到时间不能设置过长,否则会对实时性会有所影响】

水位线的特性

  • 水位线是插入到数据流中的一个标记,可以认为是一个特殊的数据
  • 水位线主要的内容是一个时间戳,用来表示当前事件时间的进展
  • 水位线是基于数据的时间戳生成的
  • 水位线的时间戳必须单调递增,以确保任务的事件时间时钟一直向前推进
  • 水位线可以通过设置延迟,来保证正确处理乱序数据
  • 一个水位线Watermark(t),表示在当前流中事件时间已经达到了时间戳t,代表t之前的所 有数据都到齐了,之后流中不会出现时间t'≤t的数据

水位线与窗口配合,完成对乱序数据的正确处理

水位线是流处理中对低延迟和结果正确性的一个权衡机制。

水位线生成策略

生成水位线的方法:.assignTimestampsAndWatermarks(),主要用来为流中的数据分配时间戳,并生成水位线来指示事件时间。【指定水位线生成策略】

stream.assignTimestampsAndWatermarks(<watermark strategy>);

WatermarkStrategy 水位线策略是一个接口,里面内置一些生成策略:

image-20231111224149038

有序流中内置水位线设置

时间戳单调增长,所以永远不会出现迟到数据的问题。WatermarkStrategy.forMonotonousTimestamps()

        WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy.<WaterSensor>forMonotonousTimestamps()// 指定时间戳分配器,从数据中提取 单位毫秒.withTimestampAssigner((SerializableTimestampAssigner<WaterSensor>) (element, recordTimestamp) -> {System.out.println(" 数据 =" + element + ",recordTs=" + recordTimestamp);return element.getTs() * 1000L;});

乱序流中内置水位线设置

由于乱序流中需要等待迟到数据到齐,必须设置一个固定量的延迟时间。WatermarkStrategy. forBoundedOutOfOrderness()

        WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy// 乱序数据,等待3s.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))// 指定时间戳分配器,从数据中提取 单位毫秒.withTimestampAssigner((SerializableTimestampAssigner<WaterSensor>) (element, recordTimestamp) -> {System.out.println(" 数据 =" + element + ",recordTs=" + recordTimestamp);return element.getTs() * 1000L;});

自定义水位线生成器

(1)周期性水位线生成器(Periodic Generator)

周期性生成器一般是通过 onEvent()观察判断输入的事件,而在onPeriodicEmit()里发出水位线。
模仿该类BoundedOutOfOrdernessWatermarks

public class CustomBoundedOutOfOrdernessGenerator<T> implements WatermarkGenerator<T> {private Long delayTime = 5000L; // 延迟时间private Long maxTs = Long.MIN_VALUE + delayTime + 1L; // 观察到的最大时间戳@Overridepublic void onEvent(T event, long eventTimestamp, WatermarkOutput output) {// 每来一条数据就调用一次maxTs = Math.max(eventTimestamp, maxTs); // 更新最大时间戳}@Overridepublic void onPeriodicEmit(WatermarkOutput output) {// 发射水位线,默认 200ms 调用一次output.emitWatermark(new Watermark(maxTs - delayTime - 1L));}
}

在 onPeriodicEmit()里调用 output.emitWatermark(),就可以发出水位线了;方法由系统框架周期性地调用,默认 200ms 一次。【不建议修改】

 env.getConfig().setAutoWatermarkInterval(400L);

(2)断点式水位线生成器(Punctuated Generator)

断点式生成器会不停地检测 onEvent()中的事件,当发现带有水位线信息的事件时,就立即发出水位线。

如下:只要有数据来就直接发射水位线

    @Overridepublic void onEvent(T event, long eventTimestamp, WatermarkOutput output) {// 每来一条数据就调用一次maxTs = Math.max(eventTimestamp, maxTs); // 更新最大时间戳output.emitWatermark(new Watermark(maxTs - delayTime - 1L));}

(3)在数据源中发送水位线

可以在自定义的数据源中抽取事件时间,然后发送水位线。

自定义数据源中生成水位线和在程序中使用assignTimestampsAndWatermarks 方法生成水位线二者只能取其一

env.fromSource(
kafkaSource,
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)), 
// WatermarkStrategy.noWatermarks() 或者不发送水位线
"kafkasource"
)

水位线的传递(空闲等待withIdleness)

一个任务接收到多个上游并行任务传递来的水位线时,应该以最小的作为当前任务的事件时钟

如下案例:当程序并行度设置为2时,自定义分区器导致一个分区一直拿不到数据(最小时钟一直为null),此时如不加以干预,事件时钟将永远不会推进,存在问题。设置空闲时间,当超过空闲时间一直收不到该分区数据,直接忽略该分区,还是会依旧推进时间时钟

        env.setParallelism(2);// 自定义分区器:数据%分区数,只输入奇数,都只会去往map 的一个子任务SingleOutputStreamOperator<Integer> socketDS = env.socketTextStream("xxxx", 7777).partitionCustom(new MyPartitioner(), r -> r).map(Integer::parseInt).assignTimestampsAndWatermarks(WatermarkStrategy.<Integer>forMonotonousTimestamps().withTimestampAssigner((r, ts) -> r * 1000L).withIdleness(Duration.ofSeconds(5)) // 空闲等待 5s);// 分成两组:奇数一组,偶数一组,开 10s 的事件时间滚动窗口socketDS.keyBy(r -> r % 2).window(TumblingEventTimeWindows.of(Time.seconds(10)))...

迟到数据的处理

1)推迟水印推进(设置延迟时间)

水印产生时,设置一个乱序容忍度,推迟系统时间的推进,保证窗口计算被延迟执行,为乱序的数据争取更多的时间进入窗口。

WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10));

2)设置窗口延迟关闭

Flink 的窗口,也允许迟到数据。当触发了窗口计算后,会先计算当前的结果,但是此时并不会关闭窗口。当达到设置延迟关闭时间之后,才会真正关闭窗口,关闭窗口后再迟到的数据就不会再处理。

.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.allowedLateness(Time.seconds(3))

3)使用侧流接收迟到的数据

最后兜底,窗口关闭之后的迟到数据,使用侧输出流输出。

完整方案:

public class WatermarkLateDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("124.222.253.33", 7777).map(new WaterSensorMapFunction());WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy// 1.设置迟到时间 3s.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((element, recordTimestamp) -> element.getTs() * 1000L)// .withIdleness(Duration.ofSeconds(5)); // 空闲等待 5s;SingleOutputStreamOperator<WaterSensor>sensorDSWithWatermark = sensorDS.assignTimestampsAndWatermarks(watermarkStrategy);OutputTag<WaterSensor> lateTag = new OutputTag<>("latedata", Types.POJO(WaterSensor.class));SingleOutputStreamOperator<String> process = sensorDSWithWatermark.keyBy(WaterSensor::getId).window(TumblingEventTimeWindows.of(Time.seconds(10))) .allowedLateness(Time.seconds(2)) // 2.推迟2s关窗.sideOutputLateData(lateTag) // 3.关窗后的迟到数据,放入侧输出流.process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {@Overridepublic void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {long startTs = context.window().getStart();long endTs = context.window().getEnd();String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");long count = elements.spliterator().estimateSize();out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ") 包含 " + count + " 条数据===>" + elements);}});process.print();// 从主流获取侧输出流,打印process.getSideOutput(lateTag).printToErr("关窗后的迟到数据");env.execute();}
}

相关文章:

【入门Flink】- 09Flink水位线Watermark

在窗口的处理过程中&#xff0c;基于数据的时间戳&#xff0c;自定义一个“逻辑时钟”。这个时钟的时间不会自动流逝&#xff1b;它的时间进展&#xff0c;就是靠着新到数据的时间戳来推动的。 什么是水位线 用来衡量事件时间进展的标记&#xff0c;就被称作“水位线”&#x…...

华为交换机的基本配置,看完秒懂

一、 交换机的基本配置 交换机连接方式 本地&#xff1a;计算机COM口/USB口 --> Console线 --> 交换机Console口 远程&#xff1a;Putty、SecureCRT、Xshell远程管理工具 华为VRP网络操作系统 1&#xff09;华为的视图模式 <Huawei> //用户视图&#x…...

spark与scala的对应版本查看

仓库地址 https://mvnrepository.com/artifact/org.apache.spark/spark-core 总结 spark3.0 以后&#xff0c;不再支持 scala2.11spark3.0 以后&#xff0c;只能用 scala2.12以上...

影响力|子芽首创代码疫苗技术获评“年度技术突破者”

近日&#xff0c;由业界权威机构嘶吼产业研究院主办的2023网络安全“金帽子”年度评选结果已正式公布。本届评选由网安产业监管机构及相关委办局领导坐镇、行业资深专家、头部网安企业负责人权威加持。凭借首创的专利级代码疫苗技术创新突破了数字供应链安全领域关键核心技术&a…...

还在为忘记BIOS密码担心?至少有五种方法可以重置或删除BIOS密码

忘记密码是一个我们都非常熟悉的问题。虽然在大多数情况下,只需单击“忘记密码”选项,然后按照几个简单的步骤即可恢复访问权限,但情况并非总是如此。忘记BIOS密码(通常为避免进入BIOS设置或避免个人计算机启动而设置的密码)意味着你将无法完全启动系统。 幸运的是,就像…...

部署百川大语言模型Baichuan2

Baichuan2是百川智能推出的新一代开源大语言模型&#xff0c;采用 2.6 万亿 Tokens 的高质量语料训练。在多个权威的中文、英文和多语言的通用、领域 benchmark 上取得同尺寸最佳的效果。包含有 7B、13B 的 Base 和 Chat 版本&#xff0c;并提供了 Chat 版本的 4bits 量化。 模…...

java面试常问

文章目录 java 基础1、JDK 和 JRE的区别2、 和equals的区别3、String、StringBuffer、StringBuilder4、String str “a”、 new String(“a”)一样吗&#xff1f;5、ArrayList 和 LinkedList的区别&#xff1f;6、HashMap的原理与实现6.1、容量与扩容6.2、扩容机制 7、HashMa…...

关于nginx一个域名,配置多个端口https的方法

假如我有一个域名 abc.com。在这个域名下&#xff0c;部署了两个应用&#xff0c;分别对应端口&#xff1a;8081&#xff0c;8082 想要给两个应用接口都开启https访问。 nginx配置如下&#xff1a; server { #监听443端口 listen 443 ssl;…...

IntelliJ IDEA插件开发入门实战

介绍 IntelliJ IDEA是备受赞誉的Java开发工具&#xff0c;提供了丰富的功能和工具。通过使用插件&#xff0c;可以扩展和增强这个集成开发环境。IntelliJ IDEA拥有庞大的插件生态系统&#xff0c;涵盖了代码分析、格式化工具和完整的框架等各个领域。开发人员还可以创建自己的…...

站群服务器如何选择

站群服务器如何选择 1.站群服务器线路 双线服务器在访问网站不受线路影响&#xff0c;较稳定。 2.站群服务器的稳定性 选择站群服务器的时候&#xff0c;服务器的稳定性是非常重要的。 3.站群服务器带宽大小 站群服务器网站在日常使用时&#xff0c;主要的目的是为了集中网…...

【vue】AntDV组件库中a-upload实现文件上传:

文章目录 一、文档&#xff1a;二、使用(以Jeecg为例)&#xff1a;【1】template&#xff1a;【2】script&#xff1a; 三、效果图&#xff1a; 一、文档&#xff1a; Upload 上传–Ant Design Vue 二、使用(以Jeecg为例)&#xff1a; 【1】template&#xff1a; <a-uploa…...

JSP在Scriptlet中编写java代码的形式

我们想在jsp界面中去写java代码&#xff0c;就需要将java代码写在Scriptlet中 虽然说 有这种方式 但是 目前 大部分都会不建议你往jsp中去写java代码 因为 目前都在推广前后端分离 这也是jsp使用面有没有少的原因 jsp也建议解耦 不要让你的程序耦合性太高 还是前端是前端 后端是…...

btree,hash,fulltext,Rtree索引类型区别及使用场景

当涉及到数据库索引类型的选择时&#xff0c;理解其特点和适用场景非常重要。下面是对B树、哈希索引、全文索引和R树的详细介绍&#xff0c;以及它们在不同数据场景下的使用示例&#xff1a; B树&#xff08;B-tree&#xff09;&#xff1a;特点&#xff1a;B树是一种多路搜索…...

掌握这个技巧,你也能成为资产管理高手!

资产管理是企业管理中至关重要的一环&#xff0c;涉及到对公司财务、物资和信息等各个方面的有效监控和管理。 随着企业规模的扩大和业务复杂性的增加&#xff0c;采用先进的资产管理系统成为确保企业高效运营的必要条件之一。 客户案例 医疗机构 温州某医疗机构拥有大量的医…...

前端安全策略保障

文章目录 前言后台管理系统网络安全XSSCSRFSQL注入 后言 前言 hello world欢迎来到前端的新世界 &#x1f61c;当前文章系列专栏&#xff1a;前端系列文章 &#x1f431;‍&#x1f453;博主在前端领域还有很多知识和技术需要掌握&#xff0c;正在不断努力填补技术短板。(如果出…...

【实施】Sentry-self-hosted部署

Sentry-self-hosted部署 介绍 Sentry 是一个开源的错误追踪&#xff08;error tracking&#xff09;平台。它主要用于监控和追踪应用程序中的错误、异常和崩溃。Sentry允许开发人员实时地收集和分析错误&#xff0c;并提供了强大的工具来排查和修复问题&#xff0c;研发最近是…...

Django多表查询

目录 一.多表查询引入 1.数据准备 2.外键的增删改查 &#xff08;1&#xff09;一对多外键的增删改查 1.1外键的增加 1.2外键的删除 1.3外键的修改 (2)多对多外键的增删改查 2.1增加 2.2删除 2.3更改 2.4清空 3.正反向概念 二.多表查询 1.子查询&#xff08;基于…...

基于Springboot的非物质文化网站(有报告)。Javaee项目,springboot项目。

演示视频&#xff1a; 基于Springboot的非物质文化网站&#xff08;有报告&#xff09;。Javaee项目&#xff0c;springboot项目。 前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下给大家。点击跳转到网站。 项目介…...

1亿美元投资!加拿大量子公司Photonic告别隐身状态

​&#xff08;图片来源&#xff1a;网络&#xff09; 至今加拿大量子公司Photonic总融资额已达1.4亿美元&#xff0c;将推动可扩展、容错的量子计算和网络平台的快速开发。 官宣完成1亿美元新一轮融资 Photonic总部位于加拿大不列颠哥伦比亚省温哥华市&#xff0c;是一家基…...

Allegro的引流方式有哪些?Allegro买家号测评提高店铺的权重和排名

为了让更多的人发现你的平台并提高转化率&#xff0c;正确的引流是至关重要的。那么Allegro的引流方式有哪些&#xff1f; 首先&#xff0c;对于Allegro平台来说&#xff0c;一个有效且常用的引流方式就是通过搜索引擎优化&#xff08;SEO&#xff09;。通过合理地选择关键词、…...

后进先出(LIFO)详解

LIFO 是 Last In, First Out 的缩写&#xff0c;中文译为后进先出。这是一种数据结构的工作原则&#xff0c;类似于一摞盘子或一叠书本&#xff1a; 最后放进去的元素最先出来 -想象往筒状容器里放盘子&#xff1a; &#xff08;1&#xff09;你放进的最后一个盘子&#xff08…...

Chapter03-Authentication vulnerabilities

文章目录 1. 身份验证简介1.1 What is authentication1.2 difference between authentication and authorization1.3 身份验证机制失效的原因1.4 身份验证机制失效的影响 2. 基于登录功能的漏洞2.1 密码爆破2.2 用户名枚举2.3 有缺陷的暴力破解防护2.3.1 如果用户登录尝试失败次…...

基于数字孪生的水厂可视化平台建设:架构与实践

分享大纲&#xff1a; 1、数字孪生水厂可视化平台建设背景 2、数字孪生水厂可视化平台建设架构 3、数字孪生水厂可视化平台建设成效 近几年&#xff0c;数字孪生水厂的建设开展的如火如荼。作为提升水厂管理效率、优化资源的调度手段&#xff0c;基于数字孪生的水厂可视化平台的…...

Java-41 深入浅出 Spring - 声明式事务的支持 事务配置 XML模式 XML+注解模式

点一下关注吧&#xff01;&#xff01;&#xff01;非常感谢&#xff01;&#xff01;持续更新&#xff01;&#xff01;&#xff01; &#x1f680; AI篇持续更新中&#xff01;&#xff08;长期更新&#xff09; 目前2025年06月05日更新到&#xff1a; AI炼丹日志-28 - Aud…...

数据链路层的主要功能是什么

数据链路层&#xff08;OSI模型第2层&#xff09;的核心功能是在相邻网络节点&#xff08;如交换机、主机&#xff09;间提供可靠的数据帧传输服务&#xff0c;主要职责包括&#xff1a; &#x1f511; 核心功能详解&#xff1a; 帧封装与解封装 封装&#xff1a; 将网络层下发…...

sqlserver 根据指定字符 解析拼接字符串

DECLARE LotNo NVARCHAR(50)A,B,C DECLARE xml XML ( SELECT <x> REPLACE(LotNo, ,, </x><x>) </x> ) DECLARE ErrorCode NVARCHAR(50) -- 提取 XML 中的值 SELECT value x.value(., VARCHAR(MAX))…...

VTK如何让部分单位不可见

最近遇到一个需求&#xff0c;需要让一个vtkDataSet中的部分单元不可见&#xff0c;查阅了一些资料大概有以下几种方式 1.通过颜色映射表来进行&#xff0c;是最正规的做法 vtkNew<vtkLookupTable> lut; //值为0不显示&#xff0c;主要是最后一个参数&#xff0c;透明度…...

【Java_EE】Spring MVC

目录 Spring Web MVC ​编辑注解 RestController RequestMapping RequestParam RequestParam RequestBody PathVariable RequestPart 参数传递 注意事项 ​编辑参数重命名 RequestParam ​编辑​编辑传递集合 RequestParam 传递JSON数据 ​编辑RequestBody ​…...

MySQL中【正则表达式】用法

MySQL 中正则表达式通过 REGEXP 或 RLIKE 操作符实现&#xff08;两者等价&#xff09;&#xff0c;用于在 WHERE 子句中进行复杂的字符串模式匹配。以下是核心用法和示例&#xff1a; 一、基础语法 SELECT column_name FROM table_name WHERE column_name REGEXP pattern; …...

html-<abbr> 缩写或首字母缩略词

定义与作用 <abbr> 标签用于表示缩写或首字母缩略词&#xff0c;它可以帮助用户更好地理解缩写的含义&#xff0c;尤其是对于那些不熟悉该缩写的用户。 title 属性的内容提供了缩写的详细说明。当用户将鼠标悬停在缩写上时&#xff0c;会显示一个提示框。 示例&#x…...