【入门Flink】- 09Flink水位线Watermark
在窗口的处理过程中,基于数据的时间戳,自定义一个
“逻辑时钟”。这个时钟的时间不会自动流逝;它的时间进展,就是靠着新到数据的时间戳来推动的。
什么是水位线
用来衡量事件时间进展的标记,就被称作“水位线”(Watermark)。
具体实现上,水位线可以看作一条特殊的数据记录,它是插入到数据流中的一个标记点,主要内容就是一个时间戳,用来指示当前的事件时间。而它插入流中的位置,就应该是在某个数据到来之后;这样就可以从这个数据中提取时间戳,作为当前水位线的时间戳了。
有序流中水位线
(1)理想状态(数据量小),数据应该按照生成的先后顺序进入流中,每条数据产生一个水位线;
(2)实际应用中,如果当前数据量非常大,且同时涌来的数据时间差会非常小(比如几毫秒),往
往对处理计算也没什么影响。所以为了提高效率,会每隔一段时间生成一个水位线。
乱序流中水位线
在分布式系统中,数据在节点间传输,会因为网络传输延迟的不确定性,导致顺序发生改变,这就是
所谓的“乱序数据”。
(1)乱序+数据量小:还是靠数据来驱动,每来一个数据就提取它的时间戳、插入一个水位线。乱序数据,插入新的水位线时,要先判断一下时间戳是否比之前的大,否则就不再生成新的水位线。也就是说,只有数据的时间戳比当前时钟大,才能推动时钟前进,这时才插入水位线。
(2)乱序+数据量大:考虑到大量数据同时到来的处理效率,可以周期性地生成水位线。这时
只需要保存一下之前所有数据中的最大时间截,需要插入水位线时,就直接以它作为时间戳生成新的水位线。
(3)乱序+迟到数据:无法正确处理“迟到”的数据。为了让窗口能够正确收集到迟到的数据,设置迟到时间,比如2秒:也就是用当前已有数据的最大时间戳减去2秒,就是要插入的水位线的时间戳。9秒的数据到来之后,事件时钟不会直接推进到9秒,而是进展到了7秒;必须等到11秒的数据到来之后,事件时钟才会进展到9秒,此时迟到2秒的数据也会被正确收集处理。【迟到时间不能设置过长,否则会对实时性会有所影响】
水位线的特性
- 水位线是插入到
数据流中的一个标记,可以认为是一个特殊的数据 - 水位线主要的内容是一个
时间戳,用来表示当前事件时间的进展 - 水位线是
基于数据的时间戳生成的 - 水位线的时间戳
必须单调递增,以确保任务的事件时间时钟一直向前推进 - 水位线可以通过
设置延迟,来保证正确处理乱序数据 - 一个水位线Watermark(t),表示在当前流中事件时间已经达到了时间戳t,
代表t之前的所 有数据都到齐了,之后流中不会出现时间t'≤t的数据
水位线与窗口配合,完成对乱序数据的正确处理。
水位线是流处理中对低延迟和结果正确性的一个权衡机制。
水位线生成策略
生成水位线的方法:.assignTimestampsAndWatermarks(),主要用来为流中的数据分配时间戳,并生成水位线来指示事件时间。【指定水位线生成策略】
stream.assignTimestampsAndWatermarks(<watermark strategy>);
WatermarkStrategy 水位线策略是一个接口,里面内置一些生成策略:

有序流中内置水位线设置
时间戳单调增长,所以永远不会出现迟到数据的问题。WatermarkStrategy.forMonotonousTimestamps()
WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy.<WaterSensor>forMonotonousTimestamps()// 指定时间戳分配器,从数据中提取 单位毫秒.withTimestampAssigner((SerializableTimestampAssigner<WaterSensor>) (element, recordTimestamp) -> {System.out.println(" 数据 =" + element + ",recordTs=" + recordTimestamp);return element.getTs() * 1000L;});
乱序流中内置水位线设置
由于乱序流中需要等待迟到数据到齐,必须设置一个固定量的延迟时间。WatermarkStrategy. forBoundedOutOfOrderness()
WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy// 乱序数据,等待3s.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))// 指定时间戳分配器,从数据中提取 单位毫秒.withTimestampAssigner((SerializableTimestampAssigner<WaterSensor>) (element, recordTimestamp) -> {System.out.println(" 数据 =" + element + ",recordTs=" + recordTimestamp);return element.getTs() * 1000L;});
自定义水位线生成器
(1)周期性水位线生成器(Periodic Generator)
周期性生成器一般是通过 onEvent()观察判断输入的事件,而在onPeriodicEmit()里发出水位线。
模仿该类BoundedOutOfOrdernessWatermarks
public class CustomBoundedOutOfOrdernessGenerator<T> implements WatermarkGenerator<T> {private Long delayTime = 5000L; // 延迟时间private Long maxTs = Long.MIN_VALUE + delayTime + 1L; // 观察到的最大时间戳@Overridepublic void onEvent(T event, long eventTimestamp, WatermarkOutput output) {// 每来一条数据就调用一次maxTs = Math.max(eventTimestamp, maxTs); // 更新最大时间戳}@Overridepublic void onPeriodicEmit(WatermarkOutput output) {// 发射水位线,默认 200ms 调用一次output.emitWatermark(new Watermark(maxTs - delayTime - 1L));}
}
在 onPeriodicEmit()里调用 output.emitWatermark(),就可以发出水位线了;方法由系统框架周期性地调用,默认 200ms 一次。【不建议修改】
env.getConfig().setAutoWatermarkInterval(400L);
(2)断点式水位线生成器(Punctuated Generator)
断点式生成器会不停地检测 onEvent()中的事件,当发现带有水位线信息的事件时,就立即发出水位线。
如下:只要有数据来就直接发射水位线
@Overridepublic void onEvent(T event, long eventTimestamp, WatermarkOutput output) {// 每来一条数据就调用一次maxTs = Math.max(eventTimestamp, maxTs); // 更新最大时间戳output.emitWatermark(new Watermark(maxTs - delayTime - 1L));}
(3)在数据源中发送水位线
可以在自定义的数据源中抽取事件时间,然后发送水位线。
自定义数据源中生成水位线和在程序中使用assignTimestampsAndWatermarks 方法生成水位线二者只能取其一。
env.fromSource(
kafkaSource,
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)),
// WatermarkStrategy.noWatermarks() 或者不发送水位线
"kafkasource"
)
水位线的传递(空闲等待withIdleness)
一个任务接收到多个上游并行任务传递来的水位线时,应该以最小的作为当前任务的事件时钟。
如下案例:当程序并行度设置为2时,自定义分区器导致一个分区一直拿不到数据(最小时钟一直为null),此时如不加以干预,事件时钟将永远不会推进,存在问题。设置空闲时间,当超过空闲时间一直收不到该分区数据,直接忽略该分区,还是会依旧推进时间时钟。
env.setParallelism(2);// 自定义分区器:数据%分区数,只输入奇数,都只会去往map 的一个子任务SingleOutputStreamOperator<Integer> socketDS = env.socketTextStream("xxxx", 7777).partitionCustom(new MyPartitioner(), r -> r).map(Integer::parseInt).assignTimestampsAndWatermarks(WatermarkStrategy.<Integer>forMonotonousTimestamps().withTimestampAssigner((r, ts) -> r * 1000L).withIdleness(Duration.ofSeconds(5)) // 空闲等待 5s);// 分成两组:奇数一组,偶数一组,开 10s 的事件时间滚动窗口socketDS.keyBy(r -> r % 2).window(TumblingEventTimeWindows.of(Time.seconds(10)))...
迟到数据的处理
1)推迟水印推进(设置延迟时间)
水印产生时,设置一个乱序容忍度,推迟系统时间的推进,保证窗口计算被延迟执行,为乱序的数据争取更多的时间进入窗口。
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10));
2)设置窗口延迟关闭
Flink 的窗口,也允许迟到数据。当触发了窗口计算后,会先计算当前的结果,但是此时并不会关闭窗口。当达到设置延迟关闭时间之后,才会真正关闭窗口,关闭窗口后再迟到的数据就不会再处理。
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.allowedLateness(Time.seconds(3))
3)使用侧流接收迟到的数据
最后兜底,窗口关闭之后的迟到数据,使用侧输出流输出。
完整方案:
public class WatermarkLateDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("124.222.253.33", 7777).map(new WaterSensorMapFunction());WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy// 1.设置迟到时间 3s.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((element, recordTimestamp) -> element.getTs() * 1000L)// .withIdleness(Duration.ofSeconds(5)); // 空闲等待 5s;SingleOutputStreamOperator<WaterSensor>sensorDSWithWatermark = sensorDS.assignTimestampsAndWatermarks(watermarkStrategy);OutputTag<WaterSensor> lateTag = new OutputTag<>("latedata", Types.POJO(WaterSensor.class));SingleOutputStreamOperator<String> process = sensorDSWithWatermark.keyBy(WaterSensor::getId).window(TumblingEventTimeWindows.of(Time.seconds(10))) .allowedLateness(Time.seconds(2)) // 2.推迟2s关窗.sideOutputLateData(lateTag) // 3.关窗后的迟到数据,放入侧输出流.process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {@Overridepublic void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {long startTs = context.window().getStart();long endTs = context.window().getEnd();String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");long count = elements.spliterator().estimateSize();out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ") 包含 " + count + " 条数据===>" + elements);}});process.print();// 从主流获取侧输出流,打印process.getSideOutput(lateTag).printToErr("关窗后的迟到数据");env.execute();}
}
相关文章:
【入门Flink】- 09Flink水位线Watermark
在窗口的处理过程中,基于数据的时间戳,自定义一个“逻辑时钟”。这个时钟的时间不会自动流逝;它的时间进展,就是靠着新到数据的时间戳来推动的。 什么是水位线 用来衡量事件时间进展的标记,就被称作“水位线”&#x…...
华为交换机的基本配置,看完秒懂
一、 交换机的基本配置 交换机连接方式 本地:计算机COM口/USB口 --> Console线 --> 交换机Console口 远程:Putty、SecureCRT、Xshell远程管理工具 华为VRP网络操作系统 1)华为的视图模式 <Huawei> //用户视图&#x…...
spark与scala的对应版本查看
仓库地址 https://mvnrepository.com/artifact/org.apache.spark/spark-core 总结 spark3.0 以后,不再支持 scala2.11spark3.0 以后,只能用 scala2.12以上...
影响力|子芽首创代码疫苗技术获评“年度技术突破者”
近日,由业界权威机构嘶吼产业研究院主办的2023网络安全“金帽子”年度评选结果已正式公布。本届评选由网安产业监管机构及相关委办局领导坐镇、行业资深专家、头部网安企业负责人权威加持。凭借首创的专利级代码疫苗技术创新突破了数字供应链安全领域关键核心技术&a…...
还在为忘记BIOS密码担心?至少有五种方法可以重置或删除BIOS密码
忘记密码是一个我们都非常熟悉的问题。虽然在大多数情况下,只需单击“忘记密码”选项,然后按照几个简单的步骤即可恢复访问权限,但情况并非总是如此。忘记BIOS密码(通常为避免进入BIOS设置或避免个人计算机启动而设置的密码)意味着你将无法完全启动系统。 幸运的是,就像…...
部署百川大语言模型Baichuan2
Baichuan2是百川智能推出的新一代开源大语言模型,采用 2.6 万亿 Tokens 的高质量语料训练。在多个权威的中文、英文和多语言的通用、领域 benchmark 上取得同尺寸最佳的效果。包含有 7B、13B 的 Base 和 Chat 版本,并提供了 Chat 版本的 4bits 量化。 模…...
java面试常问
文章目录 java 基础1、JDK 和 JRE的区别2、 和equals的区别3、String、StringBuffer、StringBuilder4、String str “a”、 new String(“a”)一样吗?5、ArrayList 和 LinkedList的区别?6、HashMap的原理与实现6.1、容量与扩容6.2、扩容机制 7、HashMa…...
关于nginx一个域名,配置多个端口https的方法
假如我有一个域名 abc.com。在这个域名下,部署了两个应用,分别对应端口:8081,8082 想要给两个应用接口都开启https访问。 nginx配置如下: server { #监听443端口 listen 443 ssl;…...
IntelliJ IDEA插件开发入门实战
介绍 IntelliJ IDEA是备受赞誉的Java开发工具,提供了丰富的功能和工具。通过使用插件,可以扩展和增强这个集成开发环境。IntelliJ IDEA拥有庞大的插件生态系统,涵盖了代码分析、格式化工具和完整的框架等各个领域。开发人员还可以创建自己的…...
站群服务器如何选择
站群服务器如何选择 1.站群服务器线路 双线服务器在访问网站不受线路影响,较稳定。 2.站群服务器的稳定性 选择站群服务器的时候,服务器的稳定性是非常重要的。 3.站群服务器带宽大小 站群服务器网站在日常使用时,主要的目的是为了集中网…...
【vue】AntDV组件库中a-upload实现文件上传:
文章目录 一、文档:二、使用(以Jeecg为例):【1】template:【2】script: 三、效果图: 一、文档: Upload 上传–Ant Design Vue 二、使用(以Jeecg为例): 【1】template: <a-uploa…...
JSP在Scriptlet中编写java代码的形式
我们想在jsp界面中去写java代码,就需要将java代码写在Scriptlet中 虽然说 有这种方式 但是 目前 大部分都会不建议你往jsp中去写java代码 因为 目前都在推广前后端分离 这也是jsp使用面有没有少的原因 jsp也建议解耦 不要让你的程序耦合性太高 还是前端是前端 后端是…...
btree,hash,fulltext,Rtree索引类型区别及使用场景
当涉及到数据库索引类型的选择时,理解其特点和适用场景非常重要。下面是对B树、哈希索引、全文索引和R树的详细介绍,以及它们在不同数据场景下的使用示例: B树(B-tree):特点:B树是一种多路搜索…...
掌握这个技巧,你也能成为资产管理高手!
资产管理是企业管理中至关重要的一环,涉及到对公司财务、物资和信息等各个方面的有效监控和管理。 随着企业规模的扩大和业务复杂性的增加,采用先进的资产管理系统成为确保企业高效运营的必要条件之一。 客户案例 医疗机构 温州某医疗机构拥有大量的医…...
前端安全策略保障
文章目录 前言后台管理系统网络安全XSSCSRFSQL注入 后言 前言 hello world欢迎来到前端的新世界 😜当前文章系列专栏:前端系列文章 🐱👓博主在前端领域还有很多知识和技术需要掌握,正在不断努力填补技术短板。(如果出…...
【实施】Sentry-self-hosted部署
Sentry-self-hosted部署 介绍 Sentry 是一个开源的错误追踪(error tracking)平台。它主要用于监控和追踪应用程序中的错误、异常和崩溃。Sentry允许开发人员实时地收集和分析错误,并提供了强大的工具来排查和修复问题,研发最近是…...
Django多表查询
目录 一.多表查询引入 1.数据准备 2.外键的增删改查 (1)一对多外键的增删改查 1.1外键的增加 1.2外键的删除 1.3外键的修改 (2)多对多外键的增删改查 2.1增加 2.2删除 2.3更改 2.4清空 3.正反向概念 二.多表查询 1.子查询(基于…...
基于Springboot的非物质文化网站(有报告)。Javaee项目,springboot项目。
演示视频: 基于Springboot的非物质文化网站(有报告)。Javaee项目,springboot项目。 前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到网站。 项目介…...
1亿美元投资!加拿大量子公司Photonic告别隐身状态
(图片来源:网络) 至今加拿大量子公司Photonic总融资额已达1.4亿美元,将推动可扩展、容错的量子计算和网络平台的快速开发。 官宣完成1亿美元新一轮融资 Photonic总部位于加拿大不列颠哥伦比亚省温哥华市,是一家基…...
Allegro的引流方式有哪些?Allegro买家号测评提高店铺的权重和排名
为了让更多的人发现你的平台并提高转化率,正确的引流是至关重要的。那么Allegro的引流方式有哪些? 首先,对于Allegro平台来说,一个有效且常用的引流方式就是通过搜索引擎优化(SEO)。通过合理地选择关键词、…...
挑战杯推荐项目
“人工智能”创意赛 - 智能艺术创作助手:借助大模型技术,开发能根据用户输入的主题、风格等要求,生成绘画、音乐、文学作品等多种形式艺术创作灵感或初稿的应用,帮助艺术家和创意爱好者激发创意、提高创作效率。 - 个性化梦境…...
label-studio的使用教程(导入本地路径)
文章目录 1. 准备环境2. 脚本启动2.1 Windows2.2 Linux 3. 安装label-studio机器学习后端3.1 pip安装(推荐)3.2 GitHub仓库安装 4. 后端配置4.1 yolo环境4.2 引入后端模型4.3 修改脚本4.4 启动后端 5. 标注工程5.1 创建工程5.2 配置图片路径5.3 配置工程类型标签5.4 配置模型5.…...
Spark 之 入门讲解详细版(1)
1、简介 1.1 Spark简介 Spark是加州大学伯克利分校AMP实验室(Algorithms, Machines, and People Lab)开发通用内存并行计算框架。Spark在2013年6月进入Apache成为孵化项目,8个月后成为Apache顶级项目,速度之快足见过人之处&…...
渗透实战PortSwigger靶场-XSS Lab 14:大多数标签和属性被阻止
<script>标签被拦截 我们需要把全部可用的 tag 和 event 进行暴力破解 XSS cheat sheet: https://portswigger.net/web-security/cross-site-scripting/cheat-sheet 通过爆破发现body可以用 再把全部 events 放进去爆破 这些 event 全部可用 <body onres…...
STM32标准库-DMA直接存储器存取
文章目录 一、DMA1.1简介1.2存储器映像1.3DMA框图1.4DMA基本结构1.5DMA请求1.6数据宽度与对齐1.7数据转运DMA1.8ADC扫描模式DMA 二、数据转运DMA2.1接线图2.2代码2.3相关API 一、DMA 1.1简介 DMA(Direct Memory Access)直接存储器存取 DMA可以提供外设…...
基于数字孪生的水厂可视化平台建设:架构与实践
分享大纲: 1、数字孪生水厂可视化平台建设背景 2、数字孪生水厂可视化平台建设架构 3、数字孪生水厂可视化平台建设成效 近几年,数字孪生水厂的建设开展的如火如荼。作为提升水厂管理效率、优化资源的调度手段,基于数字孪生的水厂可视化平台的…...
涂鸦T5AI手搓语音、emoji、otto机器人从入门到实战
“🤖手搓TuyaAI语音指令 😍秒变表情包大师,让萌系Otto机器人🔥玩出智能新花样!开整!” 🤖 Otto机器人 → 直接点明主体 手搓TuyaAI语音 → 强调 自主编程/自定义 语音控制(TuyaAI…...
Maven 概述、安装、配置、仓库、私服详解
目录 1、Maven 概述 1.1 Maven 的定义 1.2 Maven 解决的问题 1.3 Maven 的核心特性与优势 2、Maven 安装 2.1 下载 Maven 2.2 安装配置 Maven 2.3 测试安装 2.4 修改 Maven 本地仓库的默认路径 3、Maven 配置 3.1 配置本地仓库 3.2 配置 JDK 3.3 IDEA 配置本地 Ma…...
laravel8+vue3.0+element-plus搭建方法
创建 laravel8 项目 composer create-project --prefer-dist laravel/laravel laravel8 8.* 安装 laravel/ui composer require laravel/ui 修改 package.json 文件 "devDependencies": {"vue/compiler-sfc": "^3.0.7","axios": …...
Linux C语言网络编程详细入门教程:如何一步步实现TCP服务端与客户端通信
文章目录 Linux C语言网络编程详细入门教程:如何一步步实现TCP服务端与客户端通信前言一、网络通信基础概念二、服务端与客户端的完整流程图解三、每一步的详细讲解和代码示例1. 创建Socket(服务端和客户端都要)2. 绑定本地地址和端口&#x…...
