【API篇】十、生成Flink水位线
文章目录
- 1、水位线的生成原则
- 2、有序流内置水位线
- 3、乱序流内置水位线
- 4、自定义周期性水位线生成器
- 5、自定义断点式水位线生成器
- 6、从数据源中发送水位线
1、水位线的生成原则
水位线出现,即代表这个时间之前的数据已经全部到齐,之后不会再出现之前的数据了。参考前面的乱序流,可以得出:
- 想要保证数据绝对正确,就得加足够大的延迟,但实时性就没保障了
- 想要实时性强,就得把延迟设置小,但此时迟到数据可能遗漏,准确性降低
水位线的定义,是对低延迟和结果准确性的一个权衡。Flink生成水位线的方法是.assignTimestampsAndWatermarks()
,它主要用来为流中的数据分配时间戳,并生成水位线来指示事件时间
DataStream<Event> stream = env.addSource(xxx);DataStream withTimestampsAndWatermarks = stream.assignTimestampsAndWatermarks(WatermarkStrategy对象);
WatermarkStrategy是一个接口,包含了一个时间戳分配器TimestampAssigner和一个水位线生成WatermarkGenerator:
public interface WatermarkStrategy<T> extends TimestampAssignerSupplier<T>,WatermarkGeneratorSupplier<T>{// 负责从流中数据元素的某个字段中提取时间戳,并分配给元素。时间戳的分配是生成水位线的基础。@OverrideTimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context);// 主要负责按照既定的方式,基于时间戳生成水位线@OverrideWatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
}
2、有序流内置水位线
有序流的时间戳全部单调递增,没有迟到数据,直接WatermarkStrategy.forMonotonousTimestamps()
就可以拿到WatermarkStrategy对象
public class WatermarkMonoDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("node01", 9527).map(new WaterSensorMapFunction());// TODO 1.定义Watermark策略WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy// 1.1 指定watermark生成:升序的watermark,没有等待时间.<WaterSensor>forMonotonousTimestamps()// 1.2 指定 时间戳分配器,从数据中提取.withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {@Overridepublic long extractTimestamp(WaterSensor element, long recordTimestamp) {System.out.println("数据=" + element + ",recordTs=" + recordTimestamp);// 返回的时间戳,要毫秒,这里拿自定义对象的ts属性做为时间戳return element.getTs() * 1000L;}});// TODO 2. 指定 watermark策略SingleOutputStreamOperator<WaterSensor> sensorDSwithWatermark = sensorDS.assignTimestampsAndWatermarks(watermarkStrategy);sensorDSwithWatermark.keyBy(sensor -> sensor.getId())// TODO 3.使用事件时间语义的窗口,别再用处理时间TumblingProcessTime.window(TumblingEventTimeWindows.of(Time.seconds(10))).process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {@Overridepublic void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {long startTs = context.window().getStart();long endTs = context.window().getEnd();String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");long count = elements.spliterator().estimateSize();out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString());}}).print();env.execute();}
}
执行下,输入10时,逻辑时钟被推到了10s,到达区间,触发窗口,执行全窗口函数的process,输出当前窗口的数据:
3、乱序流内置水位线
调用WatermarkStrategy. forBoundedOutOfOrderness()
,传入延迟时间:
public class WatermarkOutOfOrdernessDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("node01", 9527).map(new WaterSensorMapFunction());// TODO 1.定义Watermark策略WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy// 1.1 指定watermark生成:乱序的,等待3s.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))// 1.2 指定 时间戳分配器,从数据中提取.withTimestampAssigner((element, recordTimestamp) -> {// 返回的时间戳,要 毫秒System.out.println("数据=" + element + ",recordTs=" + recordTimestamp);return element.getTs() * 1000L;});// TODO 2. 指定 watermark策略SingleOutputStreamOperator<WaterSensor> sensorDSwithWatermark = sensorDS.assignTimestampsAndWatermarks(watermarkStrategy);sensorDSwithWatermark.keyBy(sensor -> sensor.getId())// TODO 3.使用 事件时间语义 的窗口.window(TumblingEventTimeWindows.of(Time.seconds(10))).process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {@Overridepublic void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {long startTs = context.window().getStart();long endTs = context.window().getEnd();String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");long count = elements.spliterator().estimateSize();out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString());}}).print();env.execute();}
}
执行:
简单分析下结果:
- 第一条数据s1,1,1进来,创建窗口,水位线为1s-3s(延迟3s)
- s1,10,10进来,水位线为10-3 =7s,还未到达10,窗口不触发(若是有序流,无等待下,此时窗口已被触发了)
- 此时进来一条乱序数据,比如s1,6,6,6-3=3s,水位线保持上面的7不变,watermark不会推进,且6这条数据也会被统计在
[0,10)
的区间内 - s1,11,11进来,11-3=8,也不会触发,但这条数据是属于
[10,20)
区间的那个桶的 - s1,13,13进来,达到10,窗口触发
4、自定义周期性水位线生成器
上面只是定义了时间戳的提取逻辑,水位线的生成采用的默认内置策略。接下来自定义水位线生成器:周期性水位生成器。
周期性生成器是通过onEvent()观察判断输入的事件,而在onPeriodicEmit()里发射生成的水位线
// 自定义水位线的产生
public class CustomPeriodicWatermarkExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("node01", 9527).map(new WaterSensorMapFunction());// 定义Watermark策略WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy// 1.1 指定watermark生成器.<WaterSensor>forGenerator(context -> MyPeriodWatermarkGenerator<>(3000L))// 1.2 指定时间戳分配器,从数据中提取.withTimestampAssigner((element, recordTimestamp) -> {// 返回的时间戳,要 毫秒System.out.println("数据=" + element + ",recordTs=" + recordTimestamp);return element.getTs() * 1000L;});// TODO 2. 指定 watermark策略SingleOutputStreamOperator<WaterSensor> sensorDSwithWatermark = sensorDS.assignTimestampsAndWatermarks(watermarkStrategy);sensorDSwithWatermark.keyBy(sensor -> sensor.getId())// TODO 3.使用 事件时间语义 的窗口.window(TumblingEventTimeWindows.of(Time.seconds(10))).process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {@Overridepublic void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {long startTs = context.window().getStart();long endTs = context.window().getEnd();String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");long count = elements.spliterator().estimateSize();out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString());}}).print();env.execute();}}
模仿前面的内置生成器,定义自己的水位线生成器:
public class MyPeroidWatermarkGenerator implements WatermarkGenerator<Event> {private Long delayTime = 5000L; // 延迟时间private Long maxTs = -Long.MAX_VALUE + delayTime + 1L; // 观察到的最大时间戳//构造方法,传入延迟时间,构造水位线生成器对象public MyPeroidWatermarkGenerator(long delayTime){this.delayTime = delayTime;this.maxTs = Long.MIN_VALUE + this.delayTime + 1;}/*** 每条数据进来都调用一次,用来提取最大的事件事件*/@Overridepublic void onEvent(Event event,long eventTimestamp,WatermarkOutput output) {// 每来一条数据就调用一次maxTs = Math.max(event.timestamp,maxTs); // 更新最大时间戳System.out.println("调用了onEvent方法,获取目前为止最大的时间戳=" + maxTimestamp);}/*** 周期性调用,默认20ms*/@Overridepublic void onPeriodicEmit(WatermarkOutput output) {// 发射水位线,默认200ms调用一次output.emitWatermark(new Watermark(maxTs - delayTime - 1L));System,out,println("调用了onPeriodicEmit方法,生成watermark==" + (maxTimestamp - delayTs - 1) );}}
核心部分,指定水位线生成器的Lamdba表达式展开就是:
运行:
- 数据没进来前,每200ms调用一次发射水位线的方法,此时的水位线是构造方法里Long.MIN_VALUE那个
- 进来一条数据,调用onEvent,最大时间戳被更新,到周期后再发射水位线
maxTs-delayTs-1
- 继续周期性调用onPeriodicEmit方法
onPeriodicEmit()里调用output.emitWatermark(),就可以发出水位线了,这个方法由系统框架周期性地调用,默认200ms一次
修改默认的周期,比如改为400ms:
env.getConfig().setAutoWatermarkInterval(400L);
5、自定义断点式水位线生成器
断点式生成器会不停地检测onEvent()中的事件,发现带有水位线信息的当事件时,就立即发出水位线。改下代码,定义水位线生成器:
public class PointWatermarkGenerator implements WatermarkGenerator<Event> {private Long delayTime = 5000L; // 延迟时间private Long maxTs = -Long.MAX_VALUE + delayTime + 1L; // 观察到的最大时间戳//构造方法,传入延迟时间,构造水位线生成器对象public MyPeroidWatermarkGenerator(long delayTime){this.delayTime = delayTime;this.maxTs = Long.MIN_VALUE + this.delayTime + 1;}/*** 每条数据进来都调用一次,用来提取最大的事件事件*/@Overridepublic void onEvent(Event event,long eventTimestamp,WatermarkOutput output) {// 每来一条数据就调用一次maxTs = Math.max(event.timestamp,maxTs); // 更新最大时间戳// 发射水位线output.emitWatermark(new Watermark(maxTs - delayTime - 1L));System.out.println("调用了onEvent方法,获取目前为止最大的时间戳=" + maxTimestamp + ",生成watermark==" + (maxTimestamp - delayTs - 1));}/*** 周期性调用,默认20ms*/@Overridepublic void onPeriodicEmit(WatermarkOutput output) {}}
周期性代码改为:
//...// 定义Watermark策略WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy// 1.1 指定watermark生成器.<WaterSensor>forGenerator(context -> PointWatermarkGenerator<>(3000L))// 1.2 指定时间戳分配器,从数据中提取.withTimestampAssigner((element, recordTimestamp) -> {// 返回的时间戳,要 毫秒return element.getTs() * 1000L;});
运行:此时不再周期性的发射水位线
6、从数据源中发送水位线
在自定义的数据源中抽取事件时间,然后发送水位线:
env.fromSource(
kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)), "kafkasource"
)//注意fromSorce方法的第二个传参,之前用的WatermarkStrategy.noWatermark()
注意此时不用再assignTimestampsAndWatermarks了,在自定义数据源中生成水位线和在程序中使用assignTimestampsAndWatermarks方法生成水位线二者只能取其一
相关文章:

【API篇】十、生成Flink水位线
文章目录 1、水位线的生成原则2、有序流内置水位线3、乱序流内置水位线4、自定义周期性水位线生成器5、自定义断点式水位线生成器6、从数据源中发送水位线 1、水位线的生成原则 水位线出现,即代表这个时间之前的数据已经全部到齐,之后不会再出现之前的数…...

【Javascript】弹出框
目录 警告框 确认框 提示框 警告框 alert(你好); 确认框 var isConfirm confirm(请确认) console.log( isConfirm); 提示框...

NSS [鹤城杯 2021]EasyP
NSS [鹤城杯 2021]EasyP 直接给了源码 <?php include utils.php;if (isset($_POST[guess])) {$guess (string) $_POST[guess];if ($guess $secret) {$message Congratulations! The flag is: . $flag;} else {$message Wrong. Try Again;} }if (preg_match(/utils\.p…...
mysql用户及权限管理(InsCode AI 创作助手)
MySQL是一个广泛使用的开源关系型数据库管理系统,用于存储和管理大量数据。对于那些需要使用MySQL的管理员和开发人员来说,用户权限管理是确保数据库安全性的至关重要的一环。在本篇技术博客中,我们将深入探讨MySQL的用户权限管理,…...

命令模式——让程序舒畅执行
● 命令模式介绍 命令模式(Command Pattern),是行为型设计模式之一。命令模式相对于其他的设计模式来说并没有那么多条条框框,其实并不是一个很“规矩”的模式,不过,就是基于一点,命令模式相对于…...

GZ035 5G组网与运维赛题第3套
2023年全国职业院校技能大赛 GZ035 5G组网与运维赛项(高职组) 赛题第3套 一、竞赛须知 1.竞赛内容分布 竞赛模块1--5G公共网络规划部署与开通(35分) 子任务1:5G公共网络部署与调试(15分) 子…...

071:mapboxGL上传含shp的zip文件,在map上解析显示图形
第071个 点击查看专栏目录 本示例是介绍演示如何在vue+mapbox中上传含有shp文件的zip,在地图上显示图形。这里先通过上传解压解析,转换生成geojson文件,然后在地图上渲染图形。 直接复制下面的 vue+mapbox源代码,操作2分钟即可运行实现效果 文章目录 示例效果所用的zip文…...

python下拉框选择测试
把下拉选择的值得打印出来: import tkinter as tk def on_select(event): # 当选择下拉框中的一项时,此函数将被调用 selected event.widget.cget("text") # 获取选中的文本 print(f"You selected: {selected}") # 打印选中…...

即时编译器JIT
类编译加载执行过程 如下图所示,一个Java代码从编译到运行大抵会经历以下几个过程。具体每个过程笔者会在下文站展开讨论。 类编译 首先是类编译阶段,这个阶段会将Java文件变为class文件,这个class文件包含一个常量池和方法表集合…...

npm更新包时This operation requires a one-time password.
[访问我的npm包](mhfwork/yt-ui - npm) 更新npm包时出现 This operation requires a one-time password.是因为需要认证 解决办法 1. 点击红线处的链接 2. 进入npm官网获取指定秘钥 3. 再次填入 one-time password 即可...

C++类模板再学习
之前已经学习了C类模板;类模板的写法和一般类的写法有很大的差别;不容易熟悉;下面再做一遍; 做一个椭圆类,成员有长轴长度和短轴长度; // ellipse.h: interface for the ellipse class. // //#if !define…...

华为终端智能家居应用方案
PLC-IoT概述 华为智能PLC-IoT工业物联网系列通信模块是基于电力线宽带载波技术的产品,实现数据在电力线上双向、高速、稳定的传输,广泛适用于电力、交通、工业制造、智能家居等领域,PLC-IoT通信模块包含头端和尾端两种类型,头端配…...
PHP下载文件
/***文件下载*param $filepath源文件路径 */function dwon_file($filepath){if(file_exists($filepath)){header(content-type:text/html;charsetutf8);header(Content-Description: File Transfer);header(Content-Type: application/octet-stream);header(Content-Dispositio…...

38基于matlab的期货预测,利用PSO优化SVM和未优化的SVM进行对比,得到实际输出和期望输出结果。
基于matlab的期货预测,利用PSO优化SVM和未优化的SVM进行对比,得到实际输出和期望输出结果。线性核函数、多项式、RBF核函数三种核函数任意可选,并给出均方根误差,相对误差等结果,程序已调通,可直接运行。 3…...
【Codeforces】 CF582D Number of Binominal Coefficients
题目链接 CF方向 Luogu方向 题目解法 看到 p α ∣ ( n k ) p^{\alpha} | \binom{n}{k} pα∣(kn) ,首先想到 k u m m e r kummer kummer 定理,那么限制即为 n − k n-k n−k 和 k k k 做加法在 p p p 进制下的进位数 ≥ α \ge \alpha ≥α …...

sql第二次上机作业
1查找借阅了ISBN为“4-6045-1023-4”的借书证号,读者姓名,专业名和借书时间 use tsgl go select Reader.Lno,Rname,Spec,Lend.Bordate FROM Reader,Lend WHERE Reader.LnoLend.Lno AND ISBN 4-6045-1023-42查找借阅了《数据库原理》一书的借阅信息&…...

辅助驾驶功能开发-功能规范篇(22)-3-L2级辅助驾驶方案功能规范
1.3.3 TLA系统功能定义 1.3.3.1 状态机 1.3.3.2 状态迁移图 1.3.3.3 功能定义 1.3.3.3.1 信号需求列表 1.3.3.3.2 系统开启关闭 1)初始化 车辆上电后,交通灯辅助系统(TLA)进行初始化,控制器需在 220ms 内发出第一帧报文,并在 3s 内完成内部自检,同时上电 3s 内不进行…...
Python基础入门例程16-NP16 发送offer(列表)
目录 描述 输入描述: 输出描述: 解答 : 说明: 描述 某公司在面试结束后,创建了一个依次包含字符串 Allen 和 Tom 的列表offer_list,作为通过面试的名单。 请你依次对列表中的名字发送类似 Allen, you…...
Web前端面试之Vue—对Vue的理解
目录 一、web发展历程 二、vue是什么 三、Vue核心特性 组件化 数据驱动 指令 四、Vue与Angular以及React的区别 一、web发展历程 Web是World Wide Web的简称,中文译为万维网 我们可以将它规划成如下的几个时代来进行理解 静态网页:最早的网页是没…...

C/C++晶晶赴约会 2020年12月电子学会青少年软件编程(C/C++)等级考试一级真题答案解析
目录 C/C晶晶赴约会 一、题目要求 1、编程实现 2、输入输出 二、算法分析 三、程序编写 四、程序说明 五、运行结果 六、考点分析 C/C晶晶赴约会 2020年12月 C/C编程等级考试一级编程题 一、题目要求 1、编程实现 晶晶的朋友贝贝约晶晶下周一起去看展览࿰…...
Python|GIF 解析与构建(5):手搓截屏和帧率控制
目录 Python|GIF 解析与构建(5):手搓截屏和帧率控制 一、引言 二、技术实现:手搓截屏模块 2.1 核心原理 2.2 代码解析:ScreenshotData类 2.2.1 截图函数:capture_screen 三、技术实现&…...

MPNet:旋转机械轻量化故障诊断模型详解python代码复现
目录 一、问题背景与挑战 二、MPNet核心架构 2.1 多分支特征融合模块(MBFM) 2.2 残差注意力金字塔模块(RAPM) 2.2.1 空间金字塔注意力(SPA) 2.2.2 金字塔残差块(PRBlock) 2.3 分类器设计 三、关键技术突破 3.1 多尺度特征融合 3.2 轻量化设计策略 3.3 抗噪声…...

如何将联系人从 iPhone 转移到 Android
从 iPhone 换到 Android 手机时,你可能需要保留重要的数据,例如通讯录。好在,将通讯录从 iPhone 转移到 Android 手机非常简单,你可以从本文中学习 6 种可靠的方法,确保随时保持连接,不错过任何信息。 第 1…...
【git】把本地更改提交远程新分支feature_g
创建并切换新分支 git checkout -b feature_g 添加并提交更改 git add . git commit -m “实现图片上传功能” 推送到远程 git push -u origin feature_g...

dify打造数据可视化图表
一、概述 在日常工作和学习中,我们经常需要和数据打交道。无论是分析报告、项目展示,还是简单的数据洞察,一个清晰直观的图表,往往能胜过千言万语。 一款能让数据可视化变得超级简单的 MCP Server,由蚂蚁集团 AntV 团队…...

初学 pytest 记录
安装 pip install pytest用例可以是函数也可以是类中的方法 def test_func():print()class TestAdd: # def __init__(self): 在 pytest 中不可以使用__init__方法 # self.cc 12345 pytest.mark.api def test_str(self):res add(1, 2)assert res 12def test_int(self):r…...
scikit-learn机器学习
# 同时添加如下代码, 这样每次环境(kernel)启动的时候只要运行下方代码即可: # Also add the following code, # so that every time the environment (kernel) starts, # just run the following code: import sys sys.path.append(/home/aistudio/external-libraries)机…...
【Android】Android 开发 ADB 常用指令
查看当前连接的设备 adb devices 连接设备 adb connect 设备IP 断开已连接的设备 adb disconnect 设备IP 安装应用 adb install 安装包的路径 卸载应用 adb uninstall 应用包名 查看已安装的应用包名 adb shell pm list packages 查看已安装的第三方应用包名 adb shell pm list…...
Caliper 配置文件解析:fisco-bcos.json
config.yaml 文件 config.yaml 是 Caliper 的主配置文件,通常包含以下内容: test:name: fisco-bcos-test # 测试名称description: Performance test of FISCO-BCOS # 测试描述workers:type: local # 工作进程类型number: 5 # 工作进程数量monitor:type: - docker- pro…...
前端工具库lodash与lodash-es区别详解
lodash 和 lodash-es 是同一工具库的两个不同版本,核心功能完全一致,主要区别在于模块化格式和优化方式,适合不同的开发环境。以下是详细对比: 1. 模块化格式 lodash 使用 CommonJS 模块格式(require/module.exports&a…...