【FLink】水位线(Watermark)
目录
1、关于时间语义
1.1事件时间
1.2处理时间编辑
2、什么是水位线
2.1 顺序流和乱序流
2.2乱序数据的处理
2.3 水位线的特性
3 、水位线的生成
3.1 生成水位线的总体原则
3.2 水位线生成策略
3.3 Flink内置水位线
3.3.1 有序流中内置水位线设置
3.4.2 断点式水位线生成器(Punctuated Generator)
3.4.3 在数据源中发送水位线
4、水位线的传递
5、迟到数据的处理
1、关于时间语义
1.1事件时间
一般情况下,业务日志数据中都会记录数据生成的时间戳(timestamp),它就可以作为事件时间的判断基础。从Flink1.12版本开始,Flink已经将事件时间作为默认的时间语义了。
1.2处理时间
2、什么是水位线
在Flink中,用来衡量事件时间进展的标记,就被称作“水位线”(Watermark)。说白了就是事件时间戳。
2.1 顺序流和乱序流
有序流就是指数据按照生成的先后顺序,每条数据产生一个有先后顺序的水位线

这是一种理想的状态(数据量较小),而在实际中,我们产生的数据量往往非常庞大,而数据之间的时间间隔非常之小,所以为了提高效率,一般会每隔一段时间生成一个水位线。

在实际生产中,由于多服务之间网络传输等的因素,往往我们的数据流,并不是我们所想的顺序结果,而是数据先后错乱,这就是乱序流。

2.2乱序数据的处理
由于数据是乱序的,我们无法正确处理“迟到”的数据,为了让窗口能够正确的收集到迟到的数据,我们也可以让窗口等上一段时间,比如2秒。也就是说,我们可以在数据的时间戳基础上加上一些延迟来尽量保证不丢数据。

2.3 水位线的特性
3
3 、水位线的生成
3.1 生成水位线的总体原则
完美的水位线是“绝对正确”的,也就是一个水位线一旦出现,就表示这个时间之前的数据已经全部到齐、之后再也不会出现了。不过如果要保证绝对正确,就必须等足够长的时间,这会带来更高的延迟。
如果我们希望处理得更快、实时性更强,那么可以将水位线延迟设得低一些。这种情况下,可能很多迟到数据会在水位线之后才到达,就会导致窗口遗漏数据,计算结果不准确。当然,如果我们对准确性完全不考虑、一味地追求处理速度,可以直接使用处理时间语义,这在理论上可以得到最低的延迟。
所以Flink中的水位线,其实是流处理中对低延迟和结果正确性的一个权衡机制,而且把控制的权力交给了程序员,我们可以在代码中定义水位线的生成策略。
3.2 水位线生成策略
在Flink的DataStream API中,有一个单独用于生成水位线的方法:.assignTimestampsAndWatermarks(),它主要用来为流中的数据分配时间戳,并生成水位线来指示事件时间。
DataStream<Event> stream = env.addSource(new ClickSource());DataStream<Event> withTimestampsAndWatermarks =
stream.assignTimestampsAndWatermarks(<watermark strategy>);
WatermarkStrategy作为参数,这就是所谓的“水位线生成策略”。WatermarkStrategy是一个接口,该接口中包含了一个“时间戳分配器”TimestampAssigner和一个“水位线生成器”WatermarkGenerator。
public interface WatermarkStrategy<T> extends TimestampAssignerSupplier<T>,WatermarkGeneratorSupplier<T>{// 负责从流中数据元素的某个字段中提取时间戳,并分配给元素。时间戳的分配是生成水位线的基础。@OverrideTimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context);// 主要负责按照既定的方式,基于时间戳生成水位线@OverrideWatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
}
3.3 Flink内置水位线
3.3.1 有序流中内置水位线设置
对于有序流,主要特点就是时间戳单调增长,所以永远不会出现迟到数据的问题。这是周期性生成水位线的最简单的场景,直接调用WatermarkStrategy.forMonotonousTimestamps()方法就可以实现。
public class WatermarkMonoDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("hadoop102", 7777).map(new WaterSensorMapFunction());// TODO 1.定义Watermark策略WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy// 1.1 指定watermark生成:升序的watermark,没有等待时间.<WaterSensor>forMonotonousTimestamps()// 1.2 指定 时间戳分配器,从数据中提取.withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {@Overridepublic long extractTimestamp(WaterSensor element, long recordTimestamp) {// 返回的时间戳,要 毫秒System.out.println("数据=" + element + ",recordTs=" + recordTimestamp);return element.getTs() * 1000L;}});// TODO 2. 指定 watermark策略SingleOutputStreamOperator<WaterSensor> sensorDSwithWatermark = sensorDS.assignTimestampsAndWatermarks(watermarkStrategy);sensorDSwithWatermark.keyBy(sensor -> sensor.getId())// TODO 3.使用 事件时间语义 的窗口.window(TumblingEventTimeWindows.of(Time.seconds(10))).process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {@Overridepublic void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {long startTs = context.window().getStart();long endTs = context.window().getEnd();String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");long count = elements.spliterator().estimateSize();out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString());}}).print();env.execute();}
}
3.3.2 乱序流中内置水位线设置
调用WatermarkStrategy. forBoundedOutOfOrderness()方法就可以实现。
这个方法需要传入一个maxOutOfOrderness参数,表示“最大乱序程度”,它表示数据流中乱序数据时间戳的最大差值
public class WatermarkOutOfOrdernessDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("hadoop102", 7777).map(new WaterSensorMapFunction());// TODO 1.定义Watermark策略WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy// 1.1 指定watermark生成:乱序的,等待3s.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))// 1.2 指定 时间戳分配器,从数据中提取.withTimestampAssigner((element, recordTimestamp) -> {// 返回的时间戳,要 毫秒System.out.println("数据=" + element + ",recordTs=" + recordTimestamp);return element.getTs() * 1000L;});// TODO 2. 指定 watermark策略SingleOutputStreamOperator<WaterSensor> sensorDSwithWatermark = sensorDS.assignTimestampsAndWatermarks(watermarkStrategy);sensorDSwithWatermark.keyBy(sensor -> sensor.getId())// TODO 3.使用 事件时间语义 的窗口.window(TumblingEventTimeWindows.of(Time.seconds(10))).process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {@Overridepublic void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {long startTs = context.window().getStart();long endTs = context.window().getEnd();String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");long count = elements.spliterator().estimateSize();out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString());}}).print();env.execute();}
}
3.4 自定义水位线生成器
3.4.1 周期性水位线生成器(Periodic Generator)
周期性生成器一般是通过onEvent()观察判断输入的事件,而在onPeriodicEmit()里发出水位线。
import com.atguigu.bean.Event;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;// 自定义水位线的产生
public class CustomPeriodicWatermarkExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.addSource(new ClickSource()).assignTimestampsAndWatermarks(new CustomWatermarkStrategy()).print();env.execute();}public static class CustomWatermarkStrategy implements WatermarkStrategy<Event> {@Overridepublic TimestampAssigner<Event> createTimestampAssigner(TimestampAssignerSupplier.Context context) {return new SerializableTimestampAssigner<Event>() {@Overridepublic long extractTimestamp(Event element,long recordTimestamp) {return element.timestamp; // 告诉程序数据源里的时间戳是哪一个字段}};}@Overridepublic WatermarkGenerator<Event> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {return new CustomBoundedOutOfOrdernessGenerator();}}public static class CustomBoundedOutOfOrdernessGenerator implements WatermarkGenerator<Event> {private Long delayTime = 5000L; // 延迟时间private Long maxTs = -Long.MAX_VALUE + delayTime + 1L; // 观察到的最大时间戳@Overridepublic void onEvent(Event event,long eventTimestamp,WatermarkOutput output) {// 每来一条数据就调用一次maxTs = Math.max(event.timestamp,maxTs); // 更新最大时间戳}@Overridepublic void onPeriodicEmit(WatermarkOutput output) {// 发射水位线,默认200ms调用一次output.emitWatermark(new Watermark(maxTs - delayTime - 1L));}}
}
如果想修改默认周期时间,可以通过下面方法修改。
//修改默认周期为400ms
env.getConfig().setAutoWatermarkInterval(400L);
3.4.2 断点式水位线生成器(Punctuated Generator)
断点式生成器会不停地检测onEvent()中的事件,当发现带有水位线信息的事件时,就立即发出水位线。我们把发射水位线的逻辑写在onEvent方法当中即可。
3.4.3 在数据源中发送水位线
我们也可以在自定义的数据源中抽取事件时间,然后发送水位线。这里要注意的是,在自定义数据源中发送了水位线以后,就不能再在程序中使用assignTimestampsAndWatermarks方法来生成水位线了。在自定义数据源中生成水位线和在程序中使用assignTimestampsAndWatermarks方法生成水位线二者只能取其一。
env.fromSource(
kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)), "kafkasource"
)
4、水位线的传递

在流处理中,上游任务处理完水位线、时钟改变之后,要把当前的水位线再次发出,广播给所有的下游子任务。而当一个任务接收到多个上游并行任务传递来的水位线时,应该以最小的那个作为当前任务的事件时钟。
水位线在上下游任务之间的传递,非常巧妙地避免了分布式系统中没有统一时钟的问题,每个任务都以“处理完之前所有数据”为标准来确定自己的时钟。
也就是说:水位线的传递是以最小事件时间为准则。
5、迟到数据的处理
5.1 推迟水印推进
在水印产生时,设置一个乱序容忍度,推迟系统时间的推进,保证窗口计算被延迟执行,为乱序的数据争取更多的时间进入窗口。
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10));
5.2 设置窗口延迟关闭
当触发了窗口计算后,会先计算当前的结果,但是此时并不会关闭窗口。直到wartermark 超过了窗口结束时间+推迟时间,此时窗口会真正关闭。
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.allowedLateness(Time.seconds(3))
5.3 使用侧流接收迟到的数据
.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
.allowedLateness(Time.seconds(3))
.sideOutputLateData(lateWS)
完整示例:
public class WatermarkLateDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("hadoop102", 7777).map(new WaterSensorMapFunction());WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((element, recordTimestamp) -> element.getTs() * 1000L);SingleOutputStreamOperator<WaterSensor> sensorDSwithWatermark = sensorDS.assignTimestampsAndWatermarks(watermarkStrategy);OutputTag<WaterSensor> lateTag = new OutputTag<>("late-data", Types.POJO(WaterSensor.class));SingleOutputStreamOperator<String> process = sensorDSwithWatermark.keyBy(sensor -> sensor.getId()).window(TumblingEventTimeWindows.of(Time.seconds(10))).allowedLateness(Time.seconds(2)) // 推迟2s关窗.sideOutputLateData(lateTag) // 关窗后的迟到数据,放入侧输出流.process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {@Overridepublic void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {long startTs = context.window().getStart();long endTs = context.window().getEnd();String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");long count = elements.spliterator().estimateSize();out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString());}});process.print();// 从主流获取侧输出流,打印process.getSideOutput(lateTag).printToErr("关窗后的迟到数据");env.execute();}
}
相关文章:
【FLink】水位线(Watermark)
目录 1、关于时间语义 1.1事件时间 1.2处理时间编辑 2、什么是水位线 2.1 顺序流和乱序流 2.2乱序数据的处理 2.3 水位线的特性 3 、水位线的生成 3.1 生成水位线的总体原则 3.2 水位线生成策略 3.3 Flink内置水位线 3.3.1 有序流中内置水位线设置 3.4.2 断点式…...
github访问不了问题
git clone github上的项目的时候,不是访问不了,就是克隆过程被中断了 最近找到一个代理,从代理那里clone而不是github上 GitHub代理 – 初果编程...
【Java】认识String类
文章目录 一、String类的重要性二、String类中的常用方法1.字符串构造2.String对象的比较3.字符串查找4.转换5.字符串替换6.字符串拆分7.字符串截取8.其他操作方法9.字符串的不可变性10.字符串修改 三、StringBuilder和StringBuffer 一、String类的重要性 在C语言中已经涉及到…...
算法——滑动窗口(Sliding Window)
一、背景知识 滑动窗口算法(Sliding Window): 在给定数组 / 字符串上维护一个固定长度或不定长度的窗口。可以对窗口进行滑动操作、缩放操作,以及维护最优解操作。题型一:固定长度题型二:不固定长度 二、例…...
Android异步之旅:探索AsyncTask
前言: 在Android应用程序开发中,异步操作是非常常见的需求。比如,我们可能需要在后台线程中执行网络请求、数据库操作或者其他耗时的任务,而不阻塞UI线程。为了实现这些异步操作,Android提供了多种方式,其…...
kibana 7安装
手动安装 下载 wget https://artifacts.elastic.co/downloads/kibana/kibana-7.17.15-linux-x86_64.tar.gz 解压 mv kibana-7.17.15-linux-x86_64.tar.gz /usr/local tar -zxvf kibana-7.17.15-linux-x86_64.tar.gz chown -R es:es kibana-7.17.15-linux-x86_64修改配置 s…...
为何内存不够用?微服务改造启动多个Spring Boot的陷阱与解决方案
在生产环境中我们会遇到一些问题,此文主要记录并复盘一下当时项目中的实际问题及解决过程。 背景简述 最初系统上线后都比较正常风平浪静的。在系统运行了一段时间后,业务量上升后,生产上发现java应用内存占用过高,服务器总共64…...
大模型变身双面人:虚假新闻制造机VS假新闻鉴别大师!
大家是怎样看待大型语言模型生成信息的可靠性呢? 尽管大语言模型生成的内容“像模像样”,但这些模型偶尔的失误揭示了一个关键问题:它们生成的内容并不总是真实可靠的。 那么,这种“不保真”特性能否被用来制造虚假信息呢&#x…...
WordPress网站如何修复数千个帖子的SEO错误
在本教程中,我们将向您展示如何解决您经常犯的SEO错误。 最好的是您不必花费太多时间,因为您不需要打开并编辑每个帖子。 相反,我们将向您展示如何使用 WordPress 内的电子表格来修复 WordPress 帖子的 SEO。 在这里,我们为您提…...
Mac如何搭建Vue项目
目录 一、安装node 二、安装NPM 1、本地安装和全局安装 2、通过Node.js官方安装程序安装 3、通过Homebrew安装 三、NPM常用命令 1、查看模块的版本号 2、安装指定版本 3、卸载模块 4、更新模块 5、查看模块信息 6、查看模块地址 7、更新命令 8、卸载NPM 四、安装…...
深入 Django 的 URL 分发器
概要 在 Django 的 MVC 架构中,URL 分发器扮演着至关重要的角色,它负责将用户的请求路由到相应的视图函数或类。这一机制不仅保证了 Django 应用的高度可扩展性,还为开发者提供了灵活的 URL 设计能力。本文将详细介绍 Django 中的 URL 分发器…...
基于单片机设计的气压与海拔高度检测计(采用MPL3115A2芯片实现)
一、前言 随着科技的不断发展,在许多领域中,对气压与海拔高度的测量变得越来越重要。例如,对于航空和航天工业、气象预报、气候研究等领域,都需要高精度、可靠的气压与海拔高度检测装置。针对这一需求,基于单片机设计…...
云原生入门系列(背景和驱动力)
做任何一件事,或者学习、应用一个领域的技术,莫过于先要想好阶段的目标和理解、学习它的意义是什么?解决了什么问题? 这部分,就尝试来探讨下这个阶段需要理解并达成的目标以及践行云原生的意义在哪里。 1.历程 任何阶…...
Django中间件
目录 一.介绍 1.什么是Django中间件 2.作用: 3.示例 二.Django请求生命周期流程图 三.Django中间件是Django的门户 四.中间件方法 1.必须掌握的中间件方法 (1)process_request: 示例: 2.需要了解的中间件方法 &#x…...
redis运维(十九)redis 的扩展应用 lua(一)
一 redis 的扩展应用 lua redis如何保证原子操作 说明:引入lua脚本,核心解决原子性问题 ① redis为什么引入lua? lua脚本本身体积小,启动速度快 ② redis引入lua的优势 小结: 类似自定义redis命令 ③ redis中如何使用lua ④ EVAL 说明&#…...
SpringBoot——MVC原理
优质博文:IT-BLOG-CN 一、SpringMVC自动配置 SpringMVC auto-configuration:SpringBoot自动配置好了SpringMVC。以下是SpringBoot对SpringMVC的默认配置:[WebMvcAutoConfiguration] 【1】包括ContentNegotiatingViewResolver和BeanNameView…...
[Linux] shell条件语句和if语句
一、条件语句 1.1 测试 test 测试文件的表达式是否成立 格式:test 条件表达式 [ 条件表达式 ] 选项作用-d测试是否为目录-e测试目录或文件是否存在-a测试目录或文件是否存在-f测试是否为文件-r测试当前用户是否有权限读取-w测试当前用户是否有权限写入-x测试当前…...
【陈老板赠书活动 - 18期】-如何成为架构师这几本书推荐给你
陈老老老板🦸 👨💻本文专栏:赠书活动专栏(为大家争取的福利,免费送书) 👨💻本文简述:生活就像海洋,只有意志坚强的人,才能到达彼岸。 👨&am…...
chrome 插件 Mobile simulator
谷歌浏览器插件Mobile simulator v3.8.2.0-2023-4-27(做屏幕适应的前端工具)-(Chrome插件)谷歌浏览器插件网 百度网盘:https://pan.baidu.com/s/1xVyny8CtlMjSchhTIlfRAA 提取码:cj5c...
JavaScript框架 Angular、React、Vue.js 的全栈解决方案比较
在 Web 开发领域,JavaScript 提供大量技术栈可供选择。其中最典型的三套组合,分别是 MERN、MEAN 和 MEVN。前端框架(React、Angular 和 Vue)进行简化比较。 MERN 技术栈详解 MERN 技术栈包含四大具体组件: MongoDB&am…...
Win11Debloat:三步轻松解决Windows 11臃肿问题,让你的电脑重获新生
Win11Debloat:三步轻松解决Windows 11臃肿问题,让你的电脑重获新生 【免费下载链接】Win11Debloat A simple, lightweight PowerShell script that allows you to remove pre-installed apps, disable telemetry, as well as perform various other chan…...
绿色循环经济下的农业新范式:让每一株蔬菜的“遗骸”化作新生
在山东临沂的兰陵县,一场关于农业废弃物资源化利用的变革正在发生。曾经令人头疼的农业秸秆和牛粪,如今正成为驱动当地蔬菜育苗产业的全新动力。这一变化的起点,是2023年9月正式投产的生升鸿强基质工厂。这家总投资1.1亿元的工厂,…...
终极指南:如何在5分钟内将图片转换为3D打印模型
终极指南:如何在5分钟内将图片转换为3D打印模型 【免费下载链接】ImageToSTL This tool allows you to easily convert any image into a 3D print-ready STL model. The surface of the model will display the image when illuminated from the left side. 项目…...
Windows激活终极指南:KMS_VL_ALL_AIO智能激活解决方案
Windows激活终极指南:KMS_VL_ALL_AIO智能激活解决方案 【免费下载链接】KMS_VL_ALL_AIO Smart Activation Script 项目地址: https://gitcode.com/gh_mirrors/km/KMS_VL_ALL_AIO 还在为Windows激活弹窗烦恼吗?每次重装系统后都要四处寻找激活工具…...
Windows 10系统臃肿不堪?这个开源工具让你3步重获清爽体验
Windows 10系统臃肿不堪?这个开源工具让你3步重获清爽体验 【免费下载链接】Win10BloatRemover Configurable CLI tool to easily and aggressively debloat and tweak Windows 10 by removing preinstalled UWP apps, services and more. Originally based on the …...
用游戏化思维学Python循环:从ICode训练场代码反推关卡设计思路
游戏化Python教学:从ICode训练场代码反推关卡设计艺术 在编程教育领域,游戏化学习已经成为激发学生兴趣的有效手段。ICode国际青少年编程竞赛的训练场关卡,巧妙地将Python循环概念转化为一系列趣味挑战。本文将通过逆向工程的方法࿰…...
现在不配,下周就掉队!VS Code Copilot Next 2024.9新特性强制依赖项解析,3个必须升级的扩展版本号
更多请点击: https://intelliparadigm.com 第一章:VS Code Copilot Next 自动化工作流配置 如何实现快速接入 VS Code Copilot Next 是微软推出的增强型 AI 编程助手,支持上下文感知补全、跨文件推理与轻量级工作流编排。要实现快速接入&…...
密码安全最佳实践:结合password_compat构建健壮认证系统
密码安全最佳实践:结合password_compat构建健壮认证系统 【免费下载链接】password_compat Compatibility with the password_* functions that ship with PHP 5.5 项目地址: https://gitcode.com/gh_mirrors/pa/password_compat 在当今数字化时代࿰…...
OpenWrt 23.05版本解析:路由器与嵌入式系统升级
1. OpenWrt 23.05版本深度解析:从路由器到嵌入式系统的全面升级作为一名长期使用OpenWrt的网络工程师,每次新版本发布都像拆盲盒一样充满期待。这次23.05版本的更新幅度之大,让我不得不连夜刷机测试。这个专为路由器和资源受限设备打造的Linu…...
机器人二次开发机器人动作定制?定制化舞蹈
在机器人二次开发领域,不少项目因开发周期长、算法泛化不足而陷于停滞。行业数据显示,传统方案依赖人工标定,场景微调即需重新部署,项目平均周期常超6个月。同时,实验室模型在真实环境中性能骤降,测试表明跨…...
