当前位置: 首页 > news >正文

【API篇】十、生成Flink水位线

文章目录

  • 1、水位线的生成原则
  • 2、有序流内置水位线
  • 3、乱序流内置水位线
  • 4、自定义周期性水位线生成器
  • 5、自定义断点式水位线生成器
  • 6、从数据源中发送水位线

1、水位线的生成原则

水位线出现,即代表这个时间之前的数据已经全部到齐,之后不会再出现之前的数据了。参考前面的乱序流,可以得出:

  • 想要保证数据绝对正确,就得加足够大的延迟,但实时性就没保障了
  • 想要实时性强,就得把延迟设置小,但此时迟到数据可能遗漏,准确性降低

水位线的定义,是对低延迟和结果准确性的一个权衡。Flink生成水位线的方法是.assignTimestampsAndWatermarks(),它主要用来为流中的数据分配时间戳,并生成水位线来指示事件时间

DataStream<Event> stream = env.addSource(xxx);DataStream withTimestampsAndWatermarks = stream.assignTimestampsAndWatermarks(WatermarkStrategy对象);

WatermarkStrategy是一个接口,包含了一个时间戳分配器TimestampAssigner和一个水位线生成WatermarkGenerator:

public interface WatermarkStrategy<T> extends TimestampAssignerSupplier<T>,WatermarkGeneratorSupplier<T>{// 负责从流中数据元素的某个字段中提取时间戳,并分配给元素。时间戳的分配是生成水位线的基础。@OverrideTimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context);// 主要负责按照既定的方式,基于时间戳生成水位线@OverrideWatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
}

2、有序流内置水位线

有序流的时间戳全部单调递增,没有迟到数据,直接WatermarkStrategy.forMonotonousTimestamps()就可以拿到WatermarkStrategy对象

public class WatermarkMonoDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("node01", 9527).map(new WaterSensorMapFunction());// TODO 1.定义Watermark策略WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy// 1.1 指定watermark生成:升序的watermark,没有等待时间.<WaterSensor>forMonotonousTimestamps()// 1.2 指定 时间戳分配器,从数据中提取.withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {@Overridepublic long extractTimestamp(WaterSensor element, long recordTimestamp) {System.out.println("数据=" + element + ",recordTs=" + recordTimestamp);// 返回的时间戳,要毫秒,这里拿自定义对象的ts属性做为时间戳return element.getTs() * 1000L;}});// TODO 2. 指定 watermark策略SingleOutputStreamOperator<WaterSensor> sensorDSwithWatermark = sensorDS.assignTimestampsAndWatermarks(watermarkStrategy);sensorDSwithWatermark.keyBy(sensor -> sensor.getId())// TODO 3.使用事件时间语义的窗口,别再用处理时间TumblingProcessTime.window(TumblingEventTimeWindows.of(Time.seconds(10))).process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {@Overridepublic void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {long startTs = context.window().getStart();long endTs = context.window().getEnd();String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");long count = elements.spliterator().estimateSize();out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString());}}).print();env.execute();}
}

执行下,输入10时,逻辑时钟被推到了10s,到达区间,触发窗口,执行全窗口函数的process,输出当前窗口的数据:

在这里插入图片描述

3、乱序流内置水位线

调用WatermarkStrategy. forBoundedOutOfOrderness(),传入延迟时间:

public class WatermarkOutOfOrdernessDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("node01", 9527).map(new WaterSensorMapFunction());// TODO 1.定义Watermark策略WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy// 1.1 指定watermark生成:乱序的,等待3s.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))// 1.2 指定 时间戳分配器,从数据中提取.withTimestampAssigner((element, recordTimestamp) -> {// 返回的时间戳,要 毫秒System.out.println("数据=" + element + ",recordTs=" + recordTimestamp);return element.getTs() * 1000L;});// TODO 2. 指定 watermark策略SingleOutputStreamOperator<WaterSensor> sensorDSwithWatermark = sensorDS.assignTimestampsAndWatermarks(watermarkStrategy);sensorDSwithWatermark.keyBy(sensor -> sensor.getId())// TODO 3.使用 事件时间语义 的窗口.window(TumblingEventTimeWindows.of(Time.seconds(10))).process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {@Overridepublic void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {long startTs = context.window().getStart();long endTs = context.window().getEnd();String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");long count = elements.spliterator().estimateSize();out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString());}}).print();env.execute();}
}

执行:

在这里插入图片描述

简单分析下结果:

  • 第一条数据s1,1,1进来,创建窗口,水位线为1s-3s(延迟3s)
  • s1,10,10进来,水位线为10-3 =7s,还未到达10,窗口不触发(若是有序流,无等待下,此时窗口已被触发了)
  • 此时进来一条乱序数据,比如s1,6,6,6-3=3s,水位线保持上面的7不变,watermark不会推进,且6这条数据也会被统计在[0,10)的区间内
  • s1,11,11进来,11-3=8,也不会触发,但这条数据是属于[10,20)区间的那个桶的
  • s1,13,13进来,达到10,窗口触发

4、自定义周期性水位线生成器

上面只是定义了时间戳的提取逻辑,水位线的生成采用的默认内置策略。接下来自定义水位线生成器:周期性水位生成器。

周期性生成器是通过onEvent()观察判断输入的事件,而在onPeriodicEmit()里发射生成的水位线

// 自定义水位线的产生
public class CustomPeriodicWatermarkExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("node01", 9527).map(new WaterSensorMapFunction());// 定义Watermark策略WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy// 1.1 指定watermark生成器.<WaterSensor>forGenerator(context -> MyPeriodWatermarkGenerator<>(3000L))// 1.2 指定时间戳分配器,从数据中提取.withTimestampAssigner((element, recordTimestamp) -> {// 返回的时间戳,要 毫秒System.out.println("数据=" + element + ",recordTs=" + recordTimestamp);return element.getTs() * 1000L;});// TODO 2. 指定 watermark策略SingleOutputStreamOperator<WaterSensor> sensorDSwithWatermark = sensorDS.assignTimestampsAndWatermarks(watermarkStrategy);sensorDSwithWatermark.keyBy(sensor -> sensor.getId())// TODO 3.使用 事件时间语义 的窗口.window(TumblingEventTimeWindows.of(Time.seconds(10))).process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {@Overridepublic void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {long startTs = context.window().getStart();long endTs = context.window().getEnd();String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");long count = elements.spliterator().estimateSize();out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString());}}).print();env.execute();}}

模仿前面的内置生成器,定义自己的水位线生成器:

public class MyPeroidWatermarkGenerator implements WatermarkGenerator<Event> {private Long delayTime = 5000L; // 延迟时间private Long maxTs = -Long.MAX_VALUE + delayTime + 1L; // 观察到的最大时间戳//构造方法,传入延迟时间,构造水位线生成器对象public MyPeroidWatermarkGenerator(long delayTime){this.delayTime = delayTime;this.maxTs = Long.MIN_VALUE + this.delayTime + 1;}/*** 每条数据进来都调用一次,用来提取最大的事件事件*/@Overridepublic void onEvent(Event event,long eventTimestamp,WatermarkOutput output) {// 每来一条数据就调用一次maxTs = Math.max(event.timestamp,maxTs); // 更新最大时间戳System.out.println("调用了onEvent方法,获取目前为止最大的时间戳=" + maxTimestamp);}/*** 周期性调用,默认20ms*/@Overridepublic void onPeriodicEmit(WatermarkOutput output) {// 发射水位线,默认200ms调用一次output.emitWatermark(new Watermark(maxTs - delayTime - 1L));System,out,println("调用了onPeriodicEmit方法,生成watermark==" + (maxTimestamp - delayTs - 1) );}}

核心部分,指定水位线生成器的Lamdba表达式展开就是:

在这里插入图片描述

运行:

  • 数据没进来前,每200ms调用一次发射水位线的方法,此时的水位线是构造方法里Long.MIN_VALUE那个
  • 进来一条数据,调用onEvent,最大时间戳被更新,到周期后再发射水位线maxTs-delayTs-1
  • 继续周期性调用onPeriodicEmit方法

在这里插入图片描述

onPeriodicEmit()里调用output.emitWatermark(),就可以发出水位线了,这个方法由系统框架周期性地调用,默认200ms一次

修改默认的周期,比如改为400ms:

env.getConfig().setAutoWatermarkInterval(400L);

5、自定义断点式水位线生成器

断点式生成器会不停地检测onEvent()中的事件,发现带有水位线信息的当事件时,就立即发出水位线。改下代码,定义水位线生成器:

public class PointWatermarkGenerator implements WatermarkGenerator<Event> {private Long delayTime = 5000L; // 延迟时间private Long maxTs = -Long.MAX_VALUE + delayTime + 1L; // 观察到的最大时间戳//构造方法,传入延迟时间,构造水位线生成器对象public MyPeroidWatermarkGenerator(long delayTime){this.delayTime = delayTime;this.maxTs = Long.MIN_VALUE + this.delayTime + 1;}/*** 每条数据进来都调用一次,用来提取最大的事件事件*/@Overridepublic void onEvent(Event event,long eventTimestamp,WatermarkOutput output) {// 每来一条数据就调用一次maxTs = Math.max(event.timestamp,maxTs); // 更新最大时间戳// 发射水位线output.emitWatermark(new Watermark(maxTs - delayTime - 1L));System.out.println("调用了onEvent方法,获取目前为止最大的时间戳=" + maxTimestamp + ",生成watermark==" + (maxTimestamp - delayTs - 1));}/*** 周期性调用,默认20ms*/@Overridepublic void onPeriodicEmit(WatermarkOutput output) {}}

周期性代码改为:

//...// 定义Watermark策略WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy// 1.1 指定watermark生成器.<WaterSensor>forGenerator(context -> PointWatermarkGenerator<>(3000L))// 1.2 指定时间戳分配器,从数据中提取.withTimestampAssigner((element, recordTimestamp) -> {// 返回的时间戳,要 毫秒return element.getTs() * 1000L;});

运行:此时不再周期性的发射水位线

在这里插入图片描述

6、从数据源中发送水位线

在自定义的数据源中抽取事件时间,然后发送水位线:

env.fromSource(
kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)), "kafkasource" 
)//注意fromSorce方法的第二个传参,之前用的WatermarkStrategy.noWatermark()

注意此时不用再assignTimestampsAndWatermarks了,在自定义数据源中生成水位线和在程序中使用assignTimestampsAndWatermarks方法生成水位线二者只能取其一

相关文章:

【API篇】十、生成Flink水位线

文章目录 1、水位线的生成原则2、有序流内置水位线3、乱序流内置水位线4、自定义周期性水位线生成器5、自定义断点式水位线生成器6、从数据源中发送水位线 1、水位线的生成原则 水位线出现&#xff0c;即代表这个时间之前的数据已经全部到齐&#xff0c;之后不会再出现之前的数…...

【Javascript】弹出框

目录 警告框 确认框 提示框 警告框 alert(你好); 确认框 var isConfirm confirm(请确认) console.log( isConfirm); 提示框...

NSS [鹤城杯 2021]EasyP

NSS [鹤城杯 2021]EasyP 直接给了源码 <?php include utils.php;if (isset($_POST[guess])) {$guess (string) $_POST[guess];if ($guess $secret) {$message Congratulations! The flag is: . $flag;} else {$message Wrong. Try Again;} }if (preg_match(/utils\.p…...

mysql用户及权限管理(InsCode AI 创作助手)

MySQL是一个广泛使用的开源关系型数据库管理系统&#xff0c;用于存储和管理大量数据。对于那些需要使用MySQL的管理员和开发人员来说&#xff0c;用户权限管理是确保数据库安全性的至关重要的一环。在本篇技术博客中&#xff0c;我们将深入探讨MySQL的用户权限管理&#xff0c…...

命令模式——让程序舒畅执行

● 命令模式介绍 命令模式&#xff08;Command Pattern&#xff09;&#xff0c;是行为型设计模式之一。命令模式相对于其他的设计模式来说并没有那么多条条框框&#xff0c;其实并不是一个很“规矩”的模式&#xff0c;不过&#xff0c;就是基于一点&#xff0c;命令模式相对于…...

GZ035 5G组网与运维赛题第3套

2023年全国职业院校技能大赛 GZ035 5G组网与运维赛项&#xff08;高职组&#xff09; 赛题第3套 一、竞赛须知 1.竞赛内容分布 竞赛模块1--5G公共网络规划部署与开通&#xff08;35分&#xff09; 子任务1&#xff1a;5G公共网络部署与调试&#xff08;15分&#xff09; 子…...

071:mapboxGL上传含shp的zip文件,在map上解析显示图形

第071个 点击查看专栏目录 本示例是介绍演示如何在vue+mapbox中上传含有shp文件的zip,在地图上显示图形。这里先通过上传解压解析,转换生成geojson文件,然后在地图上渲染图形。 直接复制下面的 vue+mapbox源代码,操作2分钟即可运行实现效果 文章目录 示例效果所用的zip文…...

python下拉框选择测试

把下拉选择的值得打印出来&#xff1a; import tkinter as tk def on_select(event): # 当选择下拉框中的一项时&#xff0c;此函数将被调用 selected event.widget.cget("text") # 获取选中的文本 print(f"You selected: {selected}") # 打印选中…...

即时编译器JIT

类编译加载执行过程 如下图所示&#xff0c;一个Java代码从编译到运行大抵会经历以下几个过程。具体每个过程笔者会在下文站展开讨论。 类编译 首先是类编译阶段&#xff0c;这个阶段会将Java文件变为class文件&#xff0c;这个class文件包含一个常量池和方法表集合&#xf…...

npm更新包时This operation requires a one-time password.

[访问我的npm包](mhfwork/yt-ui - npm) 更新npm包时出现 This operation requires a one-time password.是因为需要认证 解决办法 1. 点击红线处的链接 2. 进入npm官网获取指定秘钥 3. 再次填入 one-time password 即可...

C++类模板再学习

之前已经学习了C类模板&#xff1b;类模板的写法和一般类的写法有很大的差别&#xff1b;不容易熟悉&#xff1b;下面再做一遍&#xff1b; 做一个椭圆类&#xff0c;成员有长轴长度和短轴长度&#xff1b; // ellipse.h: interface for the ellipse class. // //#if !define…...

华为终端智能家居应用方案

PLC-IoT概述 华为智能PLC-IoT工业物联网系列通信模块是基于电力线宽带载波技术的产品&#xff0c;实现数据在电力线上双向、高速、稳定的传输&#xff0c;广泛适用于电力、交通、工业制造、智能家居等领域&#xff0c;PLC-IoT通信模块包含头端和尾端两种类型&#xff0c;头端配…...

PHP下载文件

/***文件下载*param $filepath源文件路径 */function dwon_file($filepath){if(file_exists($filepath)){header(content-type:text/html;charsetutf8);header(Content-Description: File Transfer);header(Content-Type: application/octet-stream);header(Content-Dispositio…...

38基于matlab的期货预测,利用PSO优化SVM和未优化的SVM进行对比,得到实际输出和期望输出结果。

基于matlab的期货预测&#xff0c;利用PSO优化SVM和未优化的SVM进行对比&#xff0c;得到实际输出和期望输出结果。线性核函数、多项式、RBF核函数三种核函数任意可选&#xff0c;并给出均方根误差&#xff0c;相对误差等结果&#xff0c;程序已调通&#xff0c;可直接运行。 3…...

【Codeforces】 CF582D Number of Binominal Coefficients

题目链接 CF方向 Luogu方向 题目解法 看到 p α ∣ ( n k ) p^{\alpha} | \binom{n}{k} pα∣(kn​) &#xff0c;首先想到 k u m m e r kummer kummer 定理&#xff0c;那么限制即为 n − k n-k n−k 和 k k k 做加法在 p p p 进制下的进位数 ≥ α \ge \alpha ≥α …...

sql第二次上机作业

1查找借阅了ISBN为“4-6045-1023-4”的借书证号&#xff0c;读者姓名&#xff0c;专业名和借书时间 use tsgl go select Reader.Lno,Rname,Spec,Lend.Bordate FROM Reader,Lend WHERE Reader.LnoLend.Lno AND ISBN 4-6045-1023-42查找借阅了《数据库原理》一书的借阅信息&…...

辅助驾驶功能开发-功能规范篇(22)-3-L2级辅助驾驶方案功能规范

1.3.3 TLA系统功能定义 1.3.3.1 状态机 1.3.3.2 状态迁移图 1.3.3.3 功能定义 1.3.3.3.1 信号需求列表 1.3.3.3.2 系统开启关闭 1)初始化 车辆上电后,交通灯辅助系统(TLA)进行初始化,控制器需在 220ms 内发出第一帧报文,并在 3s 内完成内部自检,同时上电 3s 内不进行…...

Python基础入门例程16-NP16 发送offer(列表)

目录 描述 输入描述&#xff1a; 输出描述&#xff1a; 解答 &#xff1a; 说明&#xff1a; 描述 某公司在面试结束后&#xff0c;创建了一个依次包含字符串 Allen 和 Tom 的列表offer_list&#xff0c;作为通过面试的名单。 请你依次对列表中的名字发送类似 Allen, you…...

Web前端面试之Vue—对Vue的理解

目录 一、web发展历程 二、vue是什么 三、Vue核心特性 组件化 数据驱动 指令 四、Vue与Angular以及React的区别 一、web发展历程 Web是World Wide Web的简称&#xff0c;中文译为万维网 我们可以将它规划成如下的几个时代来进行理解 静态网页&#xff1a;最早的网页是没…...

C/C++晶晶赴约会 2020年12月电子学会青少年软件编程(C/C++)等级考试一级真题答案解析

目录 C/C晶晶赴约会 一、题目要求 1、编程实现 2、输入输出 二、算法分析 三、程序编写 四、程序说明 五、运行结果 六、考点分析 C/C晶晶赴约会 2020年12月 C/C编程等级考试一级编程题 一、题目要求 1、编程实现 晶晶的朋友贝贝约晶晶下周一起去看展览&#xff0…...

从电路振荡到种群竞争:常系数线性微分方程组在建模中的实战指南

从电路振荡到种群竞争&#xff1a;常系数线性微分方程组在建模中的实战指南微分方程是描述动态系统的数学语言&#xff0c;而常系数线性微分方程组则是其中最具工程实用价值的一类。不同于纯数学视角下的求解技巧&#xff0c;本文将带你穿越两个经典场景——电子工程中的RLC振荡…...

JDK常用类与工具(速览版)

JDK常用类与工具&#xff08;速览版&#xff09;JDK&#xff08;Java Development Kit&#xff09;提供了丰富的标准库和实用工具&#xff0c;它们构成了Java开发者日常工作的基石。掌握这些核心类、集合框架、并发工具、IO/NIO库、日期时间API、正则表达式、异常处理机制、日志…...

GPT-4稀疏激活机制解析:1.8万亿参数为何仅用2%

1. 项目概述&#xff1a;参数规模与稀疏激活的真相拆解“GPT-4 Has 1.8 Trillion Parameters. It Uses 2% of Them Per Token.”——这句话过去两年在技术社区被反复引用、误读、放大&#xff0c;甚至成为AI算力焦虑的具象化符号。但作为从2017年就开始部署LSTM语音模型、2019年…...

AI、机器学习、深度学习到底是什么关系?用‘模型’一词说清楚

1. 项目概述&#xff1a;为什么“人工智能”这个词让人越看越迷糊&#xff1f;你有没有过这种感觉&#xff1f;刷到一篇讲“AI赋能”的文章&#xff0c;开头说“大模型正在重塑生产力”&#xff0c;中间列了三个“基于Transformer架构的微调方案”&#xff0c;结尾呼吁“拥抱AG…...

从仿真曲线到实际性能:手把手教你用IPKISS分析MZI Lattice Filter的插损与带宽

从仿真曲线到实际性能&#xff1a;手把手教你用IPKISS分析MZI Lattice Filter的插损与带宽 在光子集成电路设计中&#xff0c;仿真结果往往只是第一步。真正考验工程师功力的&#xff0c;是如何从这些曲线中提取出有工程价值的性能指标。本文将带您深入解读MZI Lattice Filter的…...

Play Integrity API Checker:你的Android设备安全检测终极指南

Play Integrity API Checker&#xff1a;你的Android设备安全检测终极指南 【免费下载链接】play-integrity-checker-app Get info about your Device Integrity through the Play Intergrity API 项目地址: https://gitcode.com/gh_mirrors/pl/play-integrity-checker-app …...

权威深度指南:使用iperf3 Windows版进行网络性能评估与优化实战

权威深度指南&#xff1a;使用iperf3 Windows版进行网络性能评估与优化实战 【免费下载链接】iperf3-win-builds iperf3 binaries for Windows. Benchmark your network limits. 项目地址: https://gitcode.com/gh_mirrors/ip/iperf3-win-builds iperf3 Windows版是专业…...

3分钟搞定Windows 11系统优化:Win11Debloat开源工具完整指南

3分钟搞定Windows 11系统优化&#xff1a;Win11Debloat开源工具完整指南 【免费下载链接】Win11Debloat A simple, lightweight PowerShell script that allows you to remove pre-installed apps, disable telemetry, as well as perform various other changes to declutter …...

3分钟快速指南:如何使用Forza Painter将任何图片变成《极限竞速》专业涂装

3分钟快速指南&#xff1a;如何使用Forza Painter将任何图片变成《极限竞速》专业涂装 【免费下载链接】forza-painter Import images into Forza 项目地址: https://gitcode.com/gh_mirrors/fo/forza-painter 还在为《极限竞速&#xff1a;地平线》系列游戏中复杂的车辆…...

告别弃用参数:Kubelet连接containerd的正确姿势(附config.toml避坑指南)

告别弃用参数&#xff1a;Kubelet连接containerd的正确姿势&#xff08;附config.toml避坑指南&#xff09; 在Kubernetes集群的日常运维中&#xff0c;kubelet与容器运行时的连接配置是一个看似简单却暗藏玄机的环节。许多管理员习惯性地沿用旧版本参数&#xff0c;殊不知Kube…...