当前位置: 首页 > news >正文

【入门Flink】- 09Flink水位线Watermark

窗口的处理过程中,基于数据的时间戳,自定义一个“逻辑时钟”。这个时钟的时间不会自动流逝;它的时间进展,就是靠着新到数据的时间戳来推动的。

什么是水位线

用来衡量事件时间进展的标记,就被称作“水位线”(Watermark)

具体实现上,水位线可以看作一条特殊的数据记录,它是插入到数据流中的一个标记点,主要内容就是一个时间戳,用来指示当前的事件时间。而它插入流中的位置,就应该是在某个数据到来之后;这样就可以从这个数据中提取时间戳,作为当前水位线的时间戳了。

有序流中水位线

(1)理想状态(数据量小),数据应该按照生成的先后顺序进入流中,每条数据产生一个水位线

(2)实际应用中,如果当前数据量非常大,且同时涌来的数据时间差会非常小(比如几毫秒),往
往对处理计算也没什么影响。所以为了提高效率,会每隔一段时间生成一个水位线

乱序流中水位线

分布式系统中,数据在节点间传输,会因为网络传输延迟的不确定性,导致顺序发生改变,这就是
所谓的“乱序数据”。

(1)乱序+数据量小:还是靠数据来驱动,每来一个数据就提取它的时间戳、插入一个水位线。乱序数据,插入新的水位线时,要先判断一下时间戳是否比之前的大,否则就不再生成新的水位线。也就是说,只有数据的时间戳比当前时钟大,才能推动时钟前进,这时才插入水位线。

(2)乱序+数据量大:考虑到大量数据同时到来的处理效率,可以周期性地生成水位线。这时
只需要保存一下之前所有数据中的最大时间截,需要插入水位线时,就直接以它作为时间戳生成新的水位线。

(3)乱序+迟到数据:无法正确处理“迟到”的数据。为了让窗口能够正确收集到迟到的数据设置迟到时间,比如2秒:也就是用当前已有数据的最大时间戳减去2秒,就是要插入的水位线的时间戳。9秒的数据到来之后,事件时钟不会直接推进到9秒,而是进展到了7秒;必须等到11秒的数据到来之后,事件时钟才会进展到9秒,此时迟到2秒的数据也会被正确收集处理。【迟到时间不能设置过长,否则会对实时性会有所影响】

水位线的特性

  • 水位线是插入到数据流中的一个标记,可以认为是一个特殊的数据
  • 水位线主要的内容是一个时间戳,用来表示当前事件时间的进展
  • 水位线是基于数据的时间戳生成的
  • 水位线的时间戳必须单调递增,以确保任务的事件时间时钟一直向前推进
  • 水位线可以通过设置延迟,来保证正确处理乱序数据
  • 一个水位线Watermark(t),表示在当前流中事件时间已经达到了时间戳t,代表t之前的所 有数据都到齐了,之后流中不会出现时间t'≤t的数据

水位线与窗口配合,完成对乱序数据的正确处理

水位线是流处理中对低延迟和结果正确性的一个权衡机制。

水位线生成策略

生成水位线的方法:.assignTimestampsAndWatermarks(),主要用来为流中的数据分配时间戳,并生成水位线来指示事件时间。【指定水位线生成策略】

stream.assignTimestampsAndWatermarks(<watermark strategy>);

WatermarkStrategy 水位线策略是一个接口,里面内置一些生成策略:

image-20231111224149038

有序流中内置水位线设置

时间戳单调增长,所以永远不会出现迟到数据的问题。WatermarkStrategy.forMonotonousTimestamps()

        WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy.<WaterSensor>forMonotonousTimestamps()// 指定时间戳分配器,从数据中提取 单位毫秒.withTimestampAssigner((SerializableTimestampAssigner<WaterSensor>) (element, recordTimestamp) -> {System.out.println(" 数据 =" + element + ",recordTs=" + recordTimestamp);return element.getTs() * 1000L;});

乱序流中内置水位线设置

由于乱序流中需要等待迟到数据到齐,必须设置一个固定量的延迟时间。WatermarkStrategy. forBoundedOutOfOrderness()

        WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy// 乱序数据,等待3s.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))// 指定时间戳分配器,从数据中提取 单位毫秒.withTimestampAssigner((SerializableTimestampAssigner<WaterSensor>) (element, recordTimestamp) -> {System.out.println(" 数据 =" + element + ",recordTs=" + recordTimestamp);return element.getTs() * 1000L;});

自定义水位线生成器

(1)周期性水位线生成器(Periodic Generator)

周期性生成器一般是通过 onEvent()观察判断输入的事件,而在onPeriodicEmit()里发出水位线。
模仿该类BoundedOutOfOrdernessWatermarks

public class CustomBoundedOutOfOrdernessGenerator<T> implements WatermarkGenerator<T> {private Long delayTime = 5000L; // 延迟时间private Long maxTs = Long.MIN_VALUE + delayTime + 1L; // 观察到的最大时间戳@Overridepublic void onEvent(T event, long eventTimestamp, WatermarkOutput output) {// 每来一条数据就调用一次maxTs = Math.max(eventTimestamp, maxTs); // 更新最大时间戳}@Overridepublic void onPeriodicEmit(WatermarkOutput output) {// 发射水位线,默认 200ms 调用一次output.emitWatermark(new Watermark(maxTs - delayTime - 1L));}
}

在 onPeriodicEmit()里调用 output.emitWatermark(),就可以发出水位线了;方法由系统框架周期性地调用,默认 200ms 一次。【不建议修改】

 env.getConfig().setAutoWatermarkInterval(400L);

(2)断点式水位线生成器(Punctuated Generator)

断点式生成器会不停地检测 onEvent()中的事件,当发现带有水位线信息的事件时,就立即发出水位线。

如下:只要有数据来就直接发射水位线

    @Overridepublic void onEvent(T event, long eventTimestamp, WatermarkOutput output) {// 每来一条数据就调用一次maxTs = Math.max(eventTimestamp, maxTs); // 更新最大时间戳output.emitWatermark(new Watermark(maxTs - delayTime - 1L));}

(3)在数据源中发送水位线

可以在自定义的数据源中抽取事件时间,然后发送水位线。

自定义数据源中生成水位线和在程序中使用assignTimestampsAndWatermarks 方法生成水位线二者只能取其一

env.fromSource(
kafkaSource,
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)), 
// WatermarkStrategy.noWatermarks() 或者不发送水位线
"kafkasource"
)

水位线的传递(空闲等待withIdleness)

一个任务接收到多个上游并行任务传递来的水位线时,应该以最小的作为当前任务的事件时钟

如下案例:当程序并行度设置为2时,自定义分区器导致一个分区一直拿不到数据(最小时钟一直为null),此时如不加以干预,事件时钟将永远不会推进,存在问题。设置空闲时间,当超过空闲时间一直收不到该分区数据,直接忽略该分区,还是会依旧推进时间时钟

        env.setParallelism(2);// 自定义分区器:数据%分区数,只输入奇数,都只会去往map 的一个子任务SingleOutputStreamOperator<Integer> socketDS = env.socketTextStream("xxxx", 7777).partitionCustom(new MyPartitioner(), r -> r).map(Integer::parseInt).assignTimestampsAndWatermarks(WatermarkStrategy.<Integer>forMonotonousTimestamps().withTimestampAssigner((r, ts) -> r * 1000L).withIdleness(Duration.ofSeconds(5)) // 空闲等待 5s);// 分成两组:奇数一组,偶数一组,开 10s 的事件时间滚动窗口socketDS.keyBy(r -> r % 2).window(TumblingEventTimeWindows.of(Time.seconds(10)))...

迟到数据的处理

1)推迟水印推进(设置延迟时间)

水印产生时,设置一个乱序容忍度,推迟系统时间的推进,保证窗口计算被延迟执行,为乱序的数据争取更多的时间进入窗口。

WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10));

2)设置窗口延迟关闭

Flink 的窗口,也允许迟到数据。当触发了窗口计算后,会先计算当前的结果,但是此时并不会关闭窗口。当达到设置延迟关闭时间之后,才会真正关闭窗口,关闭窗口后再迟到的数据就不会再处理。

.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.allowedLateness(Time.seconds(3))

3)使用侧流接收迟到的数据

最后兜底,窗口关闭之后的迟到数据,使用侧输出流输出。

完整方案:

public class WatermarkLateDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("124.222.253.33", 7777).map(new WaterSensorMapFunction());WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy// 1.设置迟到时间 3s.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((element, recordTimestamp) -> element.getTs() * 1000L)// .withIdleness(Duration.ofSeconds(5)); // 空闲等待 5s;SingleOutputStreamOperator<WaterSensor>sensorDSWithWatermark = sensorDS.assignTimestampsAndWatermarks(watermarkStrategy);OutputTag<WaterSensor> lateTag = new OutputTag<>("latedata", Types.POJO(WaterSensor.class));SingleOutputStreamOperator<String> process = sensorDSWithWatermark.keyBy(WaterSensor::getId).window(TumblingEventTimeWindows.of(Time.seconds(10))) .allowedLateness(Time.seconds(2)) // 2.推迟2s关窗.sideOutputLateData(lateTag) // 3.关窗后的迟到数据,放入侧输出流.process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {@Overridepublic void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {long startTs = context.window().getStart();long endTs = context.window().getEnd();String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");long count = elements.spliterator().estimateSize();out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ") 包含 " + count + " 条数据===>" + elements);}});process.print();// 从主流获取侧输出流,打印process.getSideOutput(lateTag).printToErr("关窗后的迟到数据");env.execute();}
}

相关文章:

【入门Flink】- 09Flink水位线Watermark

在窗口的处理过程中&#xff0c;基于数据的时间戳&#xff0c;自定义一个“逻辑时钟”。这个时钟的时间不会自动流逝&#xff1b;它的时间进展&#xff0c;就是靠着新到数据的时间戳来推动的。 什么是水位线 用来衡量事件时间进展的标记&#xff0c;就被称作“水位线”&#x…...

华为交换机的基本配置,看完秒懂

一、 交换机的基本配置 交换机连接方式 本地&#xff1a;计算机COM口/USB口 --> Console线 --> 交换机Console口 远程&#xff1a;Putty、SecureCRT、Xshell远程管理工具 华为VRP网络操作系统 1&#xff09;华为的视图模式 <Huawei> //用户视图&#x…...

spark与scala的对应版本查看

仓库地址 https://mvnrepository.com/artifact/org.apache.spark/spark-core 总结 spark3.0 以后&#xff0c;不再支持 scala2.11spark3.0 以后&#xff0c;只能用 scala2.12以上...

影响力|子芽首创代码疫苗技术获评“年度技术突破者”

近日&#xff0c;由业界权威机构嘶吼产业研究院主办的2023网络安全“金帽子”年度评选结果已正式公布。本届评选由网安产业监管机构及相关委办局领导坐镇、行业资深专家、头部网安企业负责人权威加持。凭借首创的专利级代码疫苗技术创新突破了数字供应链安全领域关键核心技术&a…...

还在为忘记BIOS密码担心?至少有五种方法可以重置或删除BIOS密码

忘记密码是一个我们都非常熟悉的问题。虽然在大多数情况下,只需单击“忘记密码”选项,然后按照几个简单的步骤即可恢复访问权限,但情况并非总是如此。忘记BIOS密码(通常为避免进入BIOS设置或避免个人计算机启动而设置的密码)意味着你将无法完全启动系统。 幸运的是,就像…...

部署百川大语言模型Baichuan2

Baichuan2是百川智能推出的新一代开源大语言模型&#xff0c;采用 2.6 万亿 Tokens 的高质量语料训练。在多个权威的中文、英文和多语言的通用、领域 benchmark 上取得同尺寸最佳的效果。包含有 7B、13B 的 Base 和 Chat 版本&#xff0c;并提供了 Chat 版本的 4bits 量化。 模…...

java面试常问

文章目录 java 基础1、JDK 和 JRE的区别2、 和equals的区别3、String、StringBuffer、StringBuilder4、String str “a”、 new String(“a”)一样吗&#xff1f;5、ArrayList 和 LinkedList的区别&#xff1f;6、HashMap的原理与实现6.1、容量与扩容6.2、扩容机制 7、HashMa…...

关于nginx一个域名,配置多个端口https的方法

假如我有一个域名 abc.com。在这个域名下&#xff0c;部署了两个应用&#xff0c;分别对应端口&#xff1a;8081&#xff0c;8082 想要给两个应用接口都开启https访问。 nginx配置如下&#xff1a; server { #监听443端口 listen 443 ssl;…...

IntelliJ IDEA插件开发入门实战

介绍 IntelliJ IDEA是备受赞誉的Java开发工具&#xff0c;提供了丰富的功能和工具。通过使用插件&#xff0c;可以扩展和增强这个集成开发环境。IntelliJ IDEA拥有庞大的插件生态系统&#xff0c;涵盖了代码分析、格式化工具和完整的框架等各个领域。开发人员还可以创建自己的…...

站群服务器如何选择

站群服务器如何选择 1.站群服务器线路 双线服务器在访问网站不受线路影响&#xff0c;较稳定。 2.站群服务器的稳定性 选择站群服务器的时候&#xff0c;服务器的稳定性是非常重要的。 3.站群服务器带宽大小 站群服务器网站在日常使用时&#xff0c;主要的目的是为了集中网…...

【vue】AntDV组件库中a-upload实现文件上传:

文章目录 一、文档&#xff1a;二、使用(以Jeecg为例)&#xff1a;【1】template&#xff1a;【2】script&#xff1a; 三、效果图&#xff1a; 一、文档&#xff1a; Upload 上传–Ant Design Vue 二、使用(以Jeecg为例)&#xff1a; 【1】template&#xff1a; <a-uploa…...

JSP在Scriptlet中编写java代码的形式

我们想在jsp界面中去写java代码&#xff0c;就需要将java代码写在Scriptlet中 虽然说 有这种方式 但是 目前 大部分都会不建议你往jsp中去写java代码 因为 目前都在推广前后端分离 这也是jsp使用面有没有少的原因 jsp也建议解耦 不要让你的程序耦合性太高 还是前端是前端 后端是…...

btree,hash,fulltext,Rtree索引类型区别及使用场景

当涉及到数据库索引类型的选择时&#xff0c;理解其特点和适用场景非常重要。下面是对B树、哈希索引、全文索引和R树的详细介绍&#xff0c;以及它们在不同数据场景下的使用示例&#xff1a; B树&#xff08;B-tree&#xff09;&#xff1a;特点&#xff1a;B树是一种多路搜索…...

掌握这个技巧,你也能成为资产管理高手!

资产管理是企业管理中至关重要的一环&#xff0c;涉及到对公司财务、物资和信息等各个方面的有效监控和管理。 随着企业规模的扩大和业务复杂性的增加&#xff0c;采用先进的资产管理系统成为确保企业高效运营的必要条件之一。 客户案例 医疗机构 温州某医疗机构拥有大量的医…...

前端安全策略保障

文章目录 前言后台管理系统网络安全XSSCSRFSQL注入 后言 前言 hello world欢迎来到前端的新世界 &#x1f61c;当前文章系列专栏&#xff1a;前端系列文章 &#x1f431;‍&#x1f453;博主在前端领域还有很多知识和技术需要掌握&#xff0c;正在不断努力填补技术短板。(如果出…...

【实施】Sentry-self-hosted部署

Sentry-self-hosted部署 介绍 Sentry 是一个开源的错误追踪&#xff08;error tracking&#xff09;平台。它主要用于监控和追踪应用程序中的错误、异常和崩溃。Sentry允许开发人员实时地收集和分析错误&#xff0c;并提供了强大的工具来排查和修复问题&#xff0c;研发最近是…...

Django多表查询

目录 一.多表查询引入 1.数据准备 2.外键的增删改查 &#xff08;1&#xff09;一对多外键的增删改查 1.1外键的增加 1.2外键的删除 1.3外键的修改 (2)多对多外键的增删改查 2.1增加 2.2删除 2.3更改 2.4清空 3.正反向概念 二.多表查询 1.子查询&#xff08;基于…...

基于Springboot的非物质文化网站(有报告)。Javaee项目,springboot项目。

演示视频&#xff1a; 基于Springboot的非物质文化网站&#xff08;有报告&#xff09;。Javaee项目&#xff0c;springboot项目。 前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下给大家。点击跳转到网站。 项目介…...

1亿美元投资!加拿大量子公司Photonic告别隐身状态

​&#xff08;图片来源&#xff1a;网络&#xff09; 至今加拿大量子公司Photonic总融资额已达1.4亿美元&#xff0c;将推动可扩展、容错的量子计算和网络平台的快速开发。 官宣完成1亿美元新一轮融资 Photonic总部位于加拿大不列颠哥伦比亚省温哥华市&#xff0c;是一家基…...

Allegro的引流方式有哪些?Allegro买家号测评提高店铺的权重和排名

为了让更多的人发现你的平台并提高转化率&#xff0c;正确的引流是至关重要的。那么Allegro的引流方式有哪些&#xff1f; 首先&#xff0c;对于Allegro平台来说&#xff0c;一个有效且常用的引流方式就是通过搜索引擎优化&#xff08;SEO&#xff09;。通过合理地选择关键词、…...

Pytorch多GPU并行训练: DistributedDataParallel

1 模型并行化训练 1.1 为什么要并行训练 在训练大型数据集或者很大的模型时一块GPU很难放下&#xff0c;例如最初的AlexNet就是在两块GPU上计算的。并行计算一般采取两个策略&#xff1a;一个是模型并行&#xff0c;一个是数据并行。左图中是将模型的不同部分放在不同GPU上进…...

802.11ax-2021协议学习__$27-HE-PHY__$27.5-Parameters-for-HE-MCSs

802.11ax-2021协议学习__$27-HE-PHY__$27.5-Parameters-for-HE-MCSs 27.3.7 Modulation and coding scheme (HE-MCSs)27.3.8 HE-SIG-B modulation and coding schemes (HE-SIG-B-MCSs)27.5 Parameters for HE-MCSs27.5.1 General27.5.2 HE-MCSs for 26-tone RU27.5.3 HE-MCSs f…...

假如我是AI Agent专家,你会问什么来测试我的水平

1. 假如我是AI Agent专家&#xff0c;你会问什么来测试我的水平 作为AI Agent专家&#xff0c;您可能需要回答一系列关于AI代理的设计、实现和优化方面的问题。以下是一些可能的问题&#xff1a; AI代理的基本原理&#xff1a;AI代理的基本工作原理是什么&#xff1f;它们如何…...

github 私人仓库clone的问题

github 私人仓库clone的问题 公共仓库直接克隆就可以&#xff0c;私人仓库需要权限验证&#xff0c;要先申请token 1、登录到github&#xff0c;点击setting 打开的页面最底下&#xff0c;有一个developer setting 这里申请到token之后&#xff0c;注意要保存起来&#xff…...

基于 React 的 HT for Web ,由厦门图扑团队开发和维护 - 用于 2D/3D 图形渲染和交互

本心、输入输出、结果 文章目录 基于 React 的 HT for Web &#xff0c;由厦门图扑团队开发和维护 - 用于 2D/3D 图形渲染和交互前言什么是 HT for WebHT for Web 的特点如何使用 HT for Web相关链接弘扬爱国精神 基于 React 的 HT for Web &#xff0c;由厦门图扑团队开发和维…...

我把微信群聊机器人项目开源

▍PART 序 开源项目地址》InsCode - 让你的灵感立刻落地 目前支持的回复 ["抽签", "天气", "讲笑话", "讲情话", "梦到", "解第", "动漫图", "去水印-", "历史今天", "星座-…...

数据可视化在监控易中的艺术与实践

在数字化运维管理中&#xff0c;数据可视化成为一种日益重要的工具&#xff0c;它将复杂的数据通过图形化的方式呈现&#xff0c;帮助运维团队更加直观和快速地理解系统的运行状况。监控易&#xff08;MeiXin Era&#xff09;将数据可视化引入到运维监控中&#xff0c;通过科学…...

贪心 455.分发饼干

455.分发饼干 题目&#xff1a; 小朋友胃口值数组g[i]&#xff0c;饼干尺寸数组 s[j]&#xff0c;当饼干尺寸s[j]大于等于g[i]的时候&#xff0c;对应小朋友被满足&#xff0c;小朋友每一个最多一块饼干 &#xff0c;求给定条件下最多被满足的小朋友数量。 思路&#xff1a;…...

前后端分离项目在Linux的部署方法、一台Nginx如何部署多个Web应用

需求场景:目前有三个前后端分离项目(vue+springboot),Linux服务器一台,nginx一个,比如服务器地址为www.xxxxxxx.com 我想通过80端口访问服务①(即访问www.xxxxxxx.com);通过81端口访问服务②(即www.xxxxxxx.com:81);通过82端口访问服务③(即www.xxxxxxx.com:82) ①部…...

python之 flask 框架(2)项目拆分的 执行逻辑

项目的结构图 app.py # 导入__init__.py 比较特殊 from APP import create_appapp create_app() if __name__ __main__:app.run(debugTrue)init.py # __inti__.py # 初始化文件&#xff0c;创建Flask应用 from flask import Flask from .views import bluedef create_ap…...