【FLink】水位线(Watermark)
目录
1、关于时间语义
1.1事件时间
1.2处理时间编辑
2、什么是水位线
2.1 顺序流和乱序流
2.2乱序数据的处理
2.3 水位线的特性
3 、水位线的生成
3.1 生成水位线的总体原则
3.2 水位线生成策略
3.3 Flink内置水位线
3.3.1 有序流中内置水位线设置
3.4.2 断点式水位线生成器(Punctuated Generator)
3.4.3 在数据源中发送水位线
4、水位线的传递
5、迟到数据的处理
1、关于时间语义
1.1事件时间
一般情况下,业务日志数据中都会记录数据生成的时间戳(timestamp),它就可以作为事件时间的判断基础。从Flink1.12版本开始,Flink已经将事件时间作为默认的时间语义了。
1.2处理时间
2、什么是水位线
在Flink中,用来衡量事件时间进展的标记,就被称作“水位线”(Watermark)。说白了就是事件时间戳。
2.1 顺序流和乱序流
有序流就是指数据按照生成的先后顺序,每条数据产生一个有先后顺序的水位线

这是一种理想的状态(数据量较小),而在实际中,我们产生的数据量往往非常庞大,而数据之间的时间间隔非常之小,所以为了提高效率,一般会每隔一段时间生成一个水位线。

在实际生产中,由于多服务之间网络传输等的因素,往往我们的数据流,并不是我们所想的顺序结果,而是数据先后错乱,这就是乱序流。

2.2乱序数据的处理
由于数据是乱序的,我们无法正确处理“迟到”的数据,为了让窗口能够正确的收集到迟到的数据,我们也可以让窗口等上一段时间,比如2秒。也就是说,我们可以在数据的时间戳基础上加上一些延迟来尽量保证不丢数据。

2.3 水位线的特性
3
3 、水位线的生成
3.1 生成水位线的总体原则
完美的水位线是“绝对正确”的,也就是一个水位线一旦出现,就表示这个时间之前的数据已经全部到齐、之后再也不会出现了。不过如果要保证绝对正确,就必须等足够长的时间,这会带来更高的延迟。
如果我们希望处理得更快、实时性更强,那么可以将水位线延迟设得低一些。这种情况下,可能很多迟到数据会在水位线之后才到达,就会导致窗口遗漏数据,计算结果不准确。当然,如果我们对准确性完全不考虑、一味地追求处理速度,可以直接使用处理时间语义,这在理论上可以得到最低的延迟。
所以Flink中的水位线,其实是流处理中对低延迟和结果正确性的一个权衡机制,而且把控制的权力交给了程序员,我们可以在代码中定义水位线的生成策略。
3.2 水位线生成策略
在Flink的DataStream API中,有一个单独用于生成水位线的方法:.assignTimestampsAndWatermarks(),它主要用来为流中的数据分配时间戳,并生成水位线来指示事件时间。
DataStream<Event> stream = env.addSource(new ClickSource());DataStream<Event> withTimestampsAndWatermarks =
stream.assignTimestampsAndWatermarks(<watermark strategy>);
WatermarkStrategy作为参数,这就是所谓的“水位线生成策略”。WatermarkStrategy是一个接口,该接口中包含了一个“时间戳分配器”TimestampAssigner和一个“水位线生成器”WatermarkGenerator。
public interface WatermarkStrategy<T> extends TimestampAssignerSupplier<T>,WatermarkGeneratorSupplier<T>{// 负责从流中数据元素的某个字段中提取时间戳,并分配给元素。时间戳的分配是生成水位线的基础。@OverrideTimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context);// 主要负责按照既定的方式,基于时间戳生成水位线@OverrideWatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
}
3.3 Flink内置水位线
3.3.1 有序流中内置水位线设置
对于有序流,主要特点就是时间戳单调增长,所以永远不会出现迟到数据的问题。这是周期性生成水位线的最简单的场景,直接调用WatermarkStrategy.forMonotonousTimestamps()方法就可以实现。
public class WatermarkMonoDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("hadoop102", 7777).map(new WaterSensorMapFunction());// TODO 1.定义Watermark策略WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy// 1.1 指定watermark生成:升序的watermark,没有等待时间.<WaterSensor>forMonotonousTimestamps()// 1.2 指定 时间戳分配器,从数据中提取.withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {@Overridepublic long extractTimestamp(WaterSensor element, long recordTimestamp) {// 返回的时间戳,要 毫秒System.out.println("数据=" + element + ",recordTs=" + recordTimestamp);return element.getTs() * 1000L;}});// TODO 2. 指定 watermark策略SingleOutputStreamOperator<WaterSensor> sensorDSwithWatermark = sensorDS.assignTimestampsAndWatermarks(watermarkStrategy);sensorDSwithWatermark.keyBy(sensor -> sensor.getId())// TODO 3.使用 事件时间语义 的窗口.window(TumblingEventTimeWindows.of(Time.seconds(10))).process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {@Overridepublic void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {long startTs = context.window().getStart();long endTs = context.window().getEnd();String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");long count = elements.spliterator().estimateSize();out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString());}}).print();env.execute();}
}
3.3.2 乱序流中内置水位线设置
调用WatermarkStrategy. forBoundedOutOfOrderness()方法就可以实现。
这个方法需要传入一个maxOutOfOrderness参数,表示“最大乱序程度”,它表示数据流中乱序数据时间戳的最大差值
public class WatermarkOutOfOrdernessDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("hadoop102", 7777).map(new WaterSensorMapFunction());// TODO 1.定义Watermark策略WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy// 1.1 指定watermark生成:乱序的,等待3s.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))// 1.2 指定 时间戳分配器,从数据中提取.withTimestampAssigner((element, recordTimestamp) -> {// 返回的时间戳,要 毫秒System.out.println("数据=" + element + ",recordTs=" + recordTimestamp);return element.getTs() * 1000L;});// TODO 2. 指定 watermark策略SingleOutputStreamOperator<WaterSensor> sensorDSwithWatermark = sensorDS.assignTimestampsAndWatermarks(watermarkStrategy);sensorDSwithWatermark.keyBy(sensor -> sensor.getId())// TODO 3.使用 事件时间语义 的窗口.window(TumblingEventTimeWindows.of(Time.seconds(10))).process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {@Overridepublic void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {long startTs = context.window().getStart();long endTs = context.window().getEnd();String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");long count = elements.spliterator().estimateSize();out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString());}}).print();env.execute();}
}
3.4 自定义水位线生成器
3.4.1 周期性水位线生成器(Periodic Generator)
周期性生成器一般是通过onEvent()观察判断输入的事件,而在onPeriodicEmit()里发出水位线。
import com.atguigu.bean.Event;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;// 自定义水位线的产生
public class CustomPeriodicWatermarkExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.addSource(new ClickSource()).assignTimestampsAndWatermarks(new CustomWatermarkStrategy()).print();env.execute();}public static class CustomWatermarkStrategy implements WatermarkStrategy<Event> {@Overridepublic TimestampAssigner<Event> createTimestampAssigner(TimestampAssignerSupplier.Context context) {return new SerializableTimestampAssigner<Event>() {@Overridepublic long extractTimestamp(Event element,long recordTimestamp) {return element.timestamp; // 告诉程序数据源里的时间戳是哪一个字段}};}@Overridepublic WatermarkGenerator<Event> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {return new CustomBoundedOutOfOrdernessGenerator();}}public static class CustomBoundedOutOfOrdernessGenerator implements WatermarkGenerator<Event> {private Long delayTime = 5000L; // 延迟时间private Long maxTs = -Long.MAX_VALUE + delayTime + 1L; // 观察到的最大时间戳@Overridepublic void onEvent(Event event,long eventTimestamp,WatermarkOutput output) {// 每来一条数据就调用一次maxTs = Math.max(event.timestamp,maxTs); // 更新最大时间戳}@Overridepublic void onPeriodicEmit(WatermarkOutput output) {// 发射水位线,默认200ms调用一次output.emitWatermark(new Watermark(maxTs - delayTime - 1L));}}
}
如果想修改默认周期时间,可以通过下面方法修改。
//修改默认周期为400ms
env.getConfig().setAutoWatermarkInterval(400L);
3.4.2 断点式水位线生成器(Punctuated Generator)
断点式生成器会不停地检测onEvent()中的事件,当发现带有水位线信息的事件时,就立即发出水位线。我们把发射水位线的逻辑写在onEvent方法当中即可。
3.4.3 在数据源中发送水位线
我们也可以在自定义的数据源中抽取事件时间,然后发送水位线。这里要注意的是,在自定义数据源中发送了水位线以后,就不能再在程序中使用assignTimestampsAndWatermarks方法来生成水位线了。在自定义数据源中生成水位线和在程序中使用assignTimestampsAndWatermarks方法生成水位线二者只能取其一。
env.fromSource(
kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)), "kafkasource"
)
4、水位线的传递

在流处理中,上游任务处理完水位线、时钟改变之后,要把当前的水位线再次发出,广播给所有的下游子任务。而当一个任务接收到多个上游并行任务传递来的水位线时,应该以最小的那个作为当前任务的事件时钟。
水位线在上下游任务之间的传递,非常巧妙地避免了分布式系统中没有统一时钟的问题,每个任务都以“处理完之前所有数据”为标准来确定自己的时钟。
也就是说:水位线的传递是以最小事件时间为准则。
5、迟到数据的处理
5.1 推迟水印推进
在水印产生时,设置一个乱序容忍度,推迟系统时间的推进,保证窗口计算被延迟执行,为乱序的数据争取更多的时间进入窗口。
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10));
5.2 设置窗口延迟关闭
当触发了窗口计算后,会先计算当前的结果,但是此时并不会关闭窗口。直到wartermark 超过了窗口结束时间+推迟时间,此时窗口会真正关闭。
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.allowedLateness(Time.seconds(3))
5.3 使用侧流接收迟到的数据
.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
.allowedLateness(Time.seconds(3))
.sideOutputLateData(lateWS)
完整示例:
public class WatermarkLateDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("hadoop102", 7777).map(new WaterSensorMapFunction());WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((element, recordTimestamp) -> element.getTs() * 1000L);SingleOutputStreamOperator<WaterSensor> sensorDSwithWatermark = sensorDS.assignTimestampsAndWatermarks(watermarkStrategy);OutputTag<WaterSensor> lateTag = new OutputTag<>("late-data", Types.POJO(WaterSensor.class));SingleOutputStreamOperator<String> process = sensorDSwithWatermark.keyBy(sensor -> sensor.getId()).window(TumblingEventTimeWindows.of(Time.seconds(10))).allowedLateness(Time.seconds(2)) // 推迟2s关窗.sideOutputLateData(lateTag) // 关窗后的迟到数据,放入侧输出流.process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {@Overridepublic void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {long startTs = context.window().getStart();long endTs = context.window().getEnd();String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");long count = elements.spliterator().estimateSize();out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString());}});process.print();// 从主流获取侧输出流,打印process.getSideOutput(lateTag).printToErr("关窗后的迟到数据");env.execute();}
}
相关文章:
【FLink】水位线(Watermark)
目录 1、关于时间语义 1.1事件时间 1.2处理时间编辑 2、什么是水位线 2.1 顺序流和乱序流 2.2乱序数据的处理 2.3 水位线的特性 3 、水位线的生成 3.1 生成水位线的总体原则 3.2 水位线生成策略 3.3 Flink内置水位线 3.3.1 有序流中内置水位线设置 3.4.2 断点式…...
github访问不了问题
git clone github上的项目的时候,不是访问不了,就是克隆过程被中断了 最近找到一个代理,从代理那里clone而不是github上 GitHub代理 – 初果编程...
【Java】认识String类
文章目录 一、String类的重要性二、String类中的常用方法1.字符串构造2.String对象的比较3.字符串查找4.转换5.字符串替换6.字符串拆分7.字符串截取8.其他操作方法9.字符串的不可变性10.字符串修改 三、StringBuilder和StringBuffer 一、String类的重要性 在C语言中已经涉及到…...
算法——滑动窗口(Sliding Window)
一、背景知识 滑动窗口算法(Sliding Window): 在给定数组 / 字符串上维护一个固定长度或不定长度的窗口。可以对窗口进行滑动操作、缩放操作,以及维护最优解操作。题型一:固定长度题型二:不固定长度 二、例…...
Android异步之旅:探索AsyncTask
前言: 在Android应用程序开发中,异步操作是非常常见的需求。比如,我们可能需要在后台线程中执行网络请求、数据库操作或者其他耗时的任务,而不阻塞UI线程。为了实现这些异步操作,Android提供了多种方式,其…...
kibana 7安装
手动安装 下载 wget https://artifacts.elastic.co/downloads/kibana/kibana-7.17.15-linux-x86_64.tar.gz 解压 mv kibana-7.17.15-linux-x86_64.tar.gz /usr/local tar -zxvf kibana-7.17.15-linux-x86_64.tar.gz chown -R es:es kibana-7.17.15-linux-x86_64修改配置 s…...
为何内存不够用?微服务改造启动多个Spring Boot的陷阱与解决方案
在生产环境中我们会遇到一些问题,此文主要记录并复盘一下当时项目中的实际问题及解决过程。 背景简述 最初系统上线后都比较正常风平浪静的。在系统运行了一段时间后,业务量上升后,生产上发现java应用内存占用过高,服务器总共64…...
大模型变身双面人:虚假新闻制造机VS假新闻鉴别大师!
大家是怎样看待大型语言模型生成信息的可靠性呢? 尽管大语言模型生成的内容“像模像样”,但这些模型偶尔的失误揭示了一个关键问题:它们生成的内容并不总是真实可靠的。 那么,这种“不保真”特性能否被用来制造虚假信息呢&#x…...
WordPress网站如何修复数千个帖子的SEO错误
在本教程中,我们将向您展示如何解决您经常犯的SEO错误。 最好的是您不必花费太多时间,因为您不需要打开并编辑每个帖子。 相反,我们将向您展示如何使用 WordPress 内的电子表格来修复 WordPress 帖子的 SEO。 在这里,我们为您提…...
Mac如何搭建Vue项目
目录 一、安装node 二、安装NPM 1、本地安装和全局安装 2、通过Node.js官方安装程序安装 3、通过Homebrew安装 三、NPM常用命令 1、查看模块的版本号 2、安装指定版本 3、卸载模块 4、更新模块 5、查看模块信息 6、查看模块地址 7、更新命令 8、卸载NPM 四、安装…...
深入 Django 的 URL 分发器
概要 在 Django 的 MVC 架构中,URL 分发器扮演着至关重要的角色,它负责将用户的请求路由到相应的视图函数或类。这一机制不仅保证了 Django 应用的高度可扩展性,还为开发者提供了灵活的 URL 设计能力。本文将详细介绍 Django 中的 URL 分发器…...
基于单片机设计的气压与海拔高度检测计(采用MPL3115A2芯片实现)
一、前言 随着科技的不断发展,在许多领域中,对气压与海拔高度的测量变得越来越重要。例如,对于航空和航天工业、气象预报、气候研究等领域,都需要高精度、可靠的气压与海拔高度检测装置。针对这一需求,基于单片机设计…...
云原生入门系列(背景和驱动力)
做任何一件事,或者学习、应用一个领域的技术,莫过于先要想好阶段的目标和理解、学习它的意义是什么?解决了什么问题? 这部分,就尝试来探讨下这个阶段需要理解并达成的目标以及践行云原生的意义在哪里。 1.历程 任何阶…...
Django中间件
目录 一.介绍 1.什么是Django中间件 2.作用: 3.示例 二.Django请求生命周期流程图 三.Django中间件是Django的门户 四.中间件方法 1.必须掌握的中间件方法 (1)process_request: 示例: 2.需要了解的中间件方法 &#x…...
redis运维(十九)redis 的扩展应用 lua(一)
一 redis 的扩展应用 lua redis如何保证原子操作 说明:引入lua脚本,核心解决原子性问题 ① redis为什么引入lua? lua脚本本身体积小,启动速度快 ② redis引入lua的优势 小结: 类似自定义redis命令 ③ redis中如何使用lua ④ EVAL 说明&#…...
SpringBoot——MVC原理
优质博文:IT-BLOG-CN 一、SpringMVC自动配置 SpringMVC auto-configuration:SpringBoot自动配置好了SpringMVC。以下是SpringBoot对SpringMVC的默认配置:[WebMvcAutoConfiguration] 【1】包括ContentNegotiatingViewResolver和BeanNameView…...
[Linux] shell条件语句和if语句
一、条件语句 1.1 测试 test 测试文件的表达式是否成立 格式:test 条件表达式 [ 条件表达式 ] 选项作用-d测试是否为目录-e测试目录或文件是否存在-a测试目录或文件是否存在-f测试是否为文件-r测试当前用户是否有权限读取-w测试当前用户是否有权限写入-x测试当前…...
【陈老板赠书活动 - 18期】-如何成为架构师这几本书推荐给你
陈老老老板🦸 👨💻本文专栏:赠书活动专栏(为大家争取的福利,免费送书) 👨💻本文简述:生活就像海洋,只有意志坚强的人,才能到达彼岸。 👨&am…...
chrome 插件 Mobile simulator
谷歌浏览器插件Mobile simulator v3.8.2.0-2023-4-27(做屏幕适应的前端工具)-(Chrome插件)谷歌浏览器插件网 百度网盘:https://pan.baidu.com/s/1xVyny8CtlMjSchhTIlfRAA 提取码:cj5c...
JavaScript框架 Angular、React、Vue.js 的全栈解决方案比较
在 Web 开发领域,JavaScript 提供大量技术栈可供选择。其中最典型的三套组合,分别是 MERN、MEAN 和 MEVN。前端框架(React、Angular 和 Vue)进行简化比较。 MERN 技术栈详解 MERN 技术栈包含四大具体组件: MongoDB&am…...
SkyWalking 10.2.0 SWCK 配置过程
SkyWalking 10.2.0 & SWCK 配置过程 skywalking oap-server & ui 使用Docker安装在K8S集群以外,K8S集群中的微服务使用initContainer按命名空间将skywalking-java-agent注入到业务容器中。 SWCK有整套的解决方案,全安装在K8S群集中。 具体可参…...
ubuntu搭建nfs服务centos挂载访问
在Ubuntu上设置NFS服务器 在Ubuntu上,你可以使用apt包管理器来安装NFS服务器。打开终端并运行: sudo apt update sudo apt install nfs-kernel-server创建共享目录 创建一个目录用于共享,例如/shared: sudo mkdir /shared sud…...
基于uniapp+WebSocket实现聊天对话、消息监听、消息推送、聊天室等功能,多端兼容
基于 UniApp + WebSocket实现多端兼容的实时通讯系统,涵盖WebSocket连接建立、消息收发机制、多端兼容性配置、消息实时监听等功能,适配微信小程序、H5、Android、iOS等终端 目录 技术选型分析WebSocket协议优势UniApp跨平台特性WebSocket 基础实现连接管理消息收发连接…...
React Native在HarmonyOS 5.0阅读类应用开发中的实践
一、技术选型背景 随着HarmonyOS 5.0对Web兼容层的增强,React Native作为跨平台框架可通过重新编译ArkTS组件实现85%以上的代码复用率。阅读类应用具有UI复杂度低、数据流清晰的特点。 二、核心实现方案 1. 环境配置 (1)使用React Native…...
相机从app启动流程
一、流程框架图 二、具体流程分析 1、得到cameralist和对应的静态信息 目录如下: 重点代码分析: 启动相机前,先要通过getCameraIdList获取camera的个数以及id,然后可以通过getCameraCharacteristics获取对应id camera的capabilities(静态信息)进行一些openCamera前的…...
拉力测试cuda pytorch 把 4070显卡拉满
import torch import timedef stress_test_gpu(matrix_size16384, duration300):"""对GPU进行压力测试,通过持续的矩阵乘法来最大化GPU利用率参数:matrix_size: 矩阵维度大小,增大可提高计算复杂度duration: 测试持续时间(秒&…...
Map相关知识
数据结构 二叉树 二叉树,顾名思义,每个节点最多有两个“叉”,也就是两个子节点,分别是左子 节点和右子节点。不过,二叉树并不要求每个节点都有两个子节点,有的节点只 有左子节点,有的节点只有…...
Device Mapper 机制
Device Mapper 机制详解 Device Mapper(简称 DM)是 Linux 内核中的一套通用块设备映射框架,为 LVM、加密磁盘、RAID 等提供底层支持。本文将详细介绍 Device Mapper 的原理、实现、内核配置、常用工具、操作测试流程,并配以详细的…...
SQL慢可能是触发了ring buffer
简介 最近在进行 postgresql 性能排查的时候,发现 PG 在某一个时间并行执行的 SQL 变得特别慢。最后通过监控监观察到并行发起得时间 buffers_alloc 就急速上升,且低水位伴随在整个慢 SQL,一直是 buferIO 的等待事件,此时也没有其他会话的争抢。SQL 虽然不是高效 SQL ,但…...
C语言中提供的第三方库之哈希表实现
一. 简介 前面一篇文章简单学习了C语言中第三方库(uthash库)提供对哈希表的操作,文章如下: C语言中提供的第三方库uthash常用接口-CSDN博客 本文简单学习一下第三方库 uthash库对哈希表的操作。 二. uthash库哈希表操作示例 u…...
