详解 Flink 的时间语义和 watermark
一、Flink 时间语义类型

- Event Time:是事件创建的时间。它通常由事件中的时间戳描述,例如采集的日志数据中,每一条日志都会记录自己的生成时间,Flink 通过时间戳分配器访问事件时间戳
- Ingestion Time :是数据进入 Flink 的时间
- Processing Time:是每一个执行基于时间操作的算子的本地系统时间,与机器相关,默认的时间属性就是 Processing Time
二、EventTime 引入
Flink 默认是按照 ProcessingTime 来处理数据的
/**在 Flink 的流式处理中,绝大部分情况推荐使用 eventTime,一般只在 eventTime 无法使用时,才会被迫使用 ProcessingTime 或者 Ing estionTime 。使用 EventTime ,需要先引入 EventTime 的时间属性
*/
public class EventTimeTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//引入 EvenetTime//TimeCharacteristic 是一个枚举类,有 ProcessingTime、IngestionTime 和 EventTime 三个属性env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);}
}
三、Watermark
1. 数据乱序情况

- 正常情况下,Flink 接收到的事件应该要是按照事件的产生时间 (EventTime) 的先后顺序排列的
- 实际情况下,事件从产生到进入 source 再到触发 operator,其中间是有一个过程和时间的,而且由于网络、分布式等原因会造成 Flink 接收到的事件的先后顺序不是严格按照事件的 EventTime 顺序排列的,即所谓的乱序数据
- 乱序数据的问题会造成窗口触发关闭的时间混乱,计算不准确
- Flink 处理乱序数据的机制:Watermark + allowedLateness + sideOutputLateData
2. Watermark 介绍
- Watermark 是一种使用延迟触发 window 执行来处理乱序数据的机制
- 原理:当设置 Watermark = t 时 (即延迟时长为 t),则 Flink 每一次都会获取已经到达的数据中的最大的 EventTime,然后判断 maxEventTime - t 是否等于某一个窗口的触发时间,如果相等则认为属于这个窗口的所有数据都已经到达,这个窗口被触发执行关闭,也可能存在数据丢失
- 在数据有序的流中,相当于 Watermark = 0,即已经到达的数据中的最大的 EventTime 等于某一个窗口的触发时间,则这个窗口被触发执行关闭
- 一般将 Watermark 设置为乱序数据流中最大的迟到时间差
3. Watermark 特点和行为
- 水位线 (Watermark) 是作为一个特殊的数据插入到数据流中的一个标记
- 水位线 (Watermark) 在 Flink 程序中是一个常量类,有一个时间戳属性,用来表示当前事件时间的进展
- 水位线 (Watermark) 是基于数据的 EventTime 时间戳生成的
- 水位线 (Watermark) 的时间戳必须单调递增,以确保任务的事件时间时钟一直向前推进
4. Watermark 在任务间的传递
任务并行度不为 1;Watermark 设置的位置越靠近 Source 端越好

- 一个任务会接收上游多个并行任务的数据,也会向下游多个并行任务发送数据
- 从上游多个并行任务接收 Watermark:使用 Partition WM 分别存储接收到的不同分区任务的 Watermark,并以其中最小的 Watermark 作为自己当前的事件时间
- 向下游多个并行任务发送 Watermark:采取广播的分区策略,向下游的每一个任务都发送一份 Watermark,如果后续 Watermark 没有变更则不会重复发送
5. Watermark 引入
5.1 核心代码
/**方法签名:DataStream.assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks<T>)DataStream.assignTimestampsAndWatermarks(AssignerWithPunctuatedWatermarks<T>)参数:1.AssignerWithPeriodicWatermarks:继承 TimestampAssigner 接口,周期性的生成 watermark,常用实现类为:BoundedOutOfOrdernessTimestampExtractor 和 AscendingTimestampExtractor2.AssignerWithPunctuatedWatermarks:继承 TimestampAssigner 接口,间断式地生成 watermark
*/
public class WatermarkTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//引入 EvenetTime env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);DataStream<String> dataStream = env.socketTextStream("localhost", 7777);DataStream<SensorReading> inputStream = dataStream.map(new MapFunction<SensorReading>() {@Overridepublic SensorReading map(String value) throws Exception {String[] fields = value.split(",");return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));}});//有序数据设置事件时间戳(毫秒数)和watermark//不需要传递watermark延迟时间,默认是当前事件时间戳 - 1ms 作为watermarkinputStream.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<SensorReading>() {@Overridepublic long extractAscendingTimestamp(SensorReading element) {return element.getTimestamp() * 1000L;}});//乱序数据设置事件时间戳(毫秒数)和watermark//BoundedOutOfOrdernessTimestampExtractor 构造方法必须传入watermark延迟时间//生成的watermark时间戳 = 当前所有事件的最大时间戳 - 延迟时间inputStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<SensorReading>(Time.seconds(2)) {@Overridepublic long extractTimestamp(SensorReading element) {return element.getTimestamp() * 1000L;}});env.execute();}
}
5.2 AssignerWithPeriodicWatermarks
系统会周期性地生成 watermark 并插入到数据流中,默认周期是 200 毫秒
/**设置watermark生成周期:env.getConfig.setAutoWatermarkInterval(milliseconds);产生watermark的逻辑:每隔 0.2 秒钟,Flink 会调用 AssignerWithPeriodicWatermarks 的 getCurrentWatermark() 方法获取一个时间戳,如果大于之前水位的时间戳,新的 watermark 会被插入到流中。这个检查保证了水位线是单调递增的。如果方法返回的时间戳小于等于之前水位的时间戳,则不会产生新的 watermark自定义watermark周期生成器:实现 AssignerWithPeriodicWatermarks 接口,并重写 getCurrentWatermark 和 extractTimestamp 方法
*/
public class MyPeriodicAssigner implements AssignerWithPeriodicWatermarks<SensorReading> {private Long bound = 60 * 1000L; // watermark延迟时间private Long maxTs = Long.MIN_VALUE; // 当前最大时间戳@Nullable@Overridepublic Watermark getCurrentWatermark() {return new Watermark(maxTs - bound);}@Overridepublic long extractTimestamp(SensorReading element, long previousElementTimestamp) {maxTs = Math.max(maxTs, element.getTimestamp()); //获取当前最大的事件时间戳return element.getTimestamp();}
}
5.3 AssignerWithPunctuatedWatermarks
间断式地生成 watermark,可以根据需要对每条数据进行条件判断筛选来确定是否生成 watermark
public class MyPunctuatedAssigner implements AssignerWithPunctuatedWatermarks<SensorReading> {private Long bound = 60 * 1000L; // 延迟时间@Nullable@Overridepublic Watermark checkAndGetNextWatermark(SensorReading lastElement, long extractedTimestamp) {if(lastElement.getId().equals("sensor_1")) {return new Watermark(extractedTimestamp - bound);} else {return null;}}@Overridepublic long extractTimestamp(SensorReading element, long previousElementTimestamp) {return element.getTimestamp();}
}
四、EventTime 的 window 操作
1. 滚动时间窗口操作
/**需求:统计 15 秒内的最小温度值,设置 2 秒的延迟
*/
public class TumblingEventTimeWindowTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);/*sensor_1,1547718199,35.8sensor_6,1547718201,15.4sensor_7,1547718202,6.7sensor_10,1547718205,38.1sensor_1,1547718207,36.3sensor_1,1547718209,32.8sensor_1,1547718212,37.1...*/DataStream<String> inputStream = env.socketTextStream("localhost", 7777);DataStream<SensorReading> dataStream = inputStream.map(new MapFunction<SensorReading>() {@Overridepublic SensorReading map(String value) {String[] fields = value.split(",");return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));}}).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<SensorReading>(Time.seconds(2)) {@Overridepublic long extractTimestamp(SensorReading element) {return element.getTimestamp() * 1000L;}});//开窗聚合SingleOutputStreamOperator<SensorReading> minTempStream = dataStream.keyBy("id").timeWindow(Time.seconds(15)).minBy("temperature");minTempStream.print("minTemp");/**输出的结果分析:1.在接收到 sensor_1,1547718212,37.1 时,触发了一个窗口关闭,此时数据的 EventTime 为 1547718212,由于 watermark 延迟时间设置为 2,所以该窗口触发关闭的时间戳为 1547718212 - 2 = 1547718210,该窗口的范围为 [1547718195,1547718210)2.当前第一个窗口是 [1547718195,1547718210),其起始点的确定规则为:2.1 滚动时间窗口使用的窗口分配器为 TumblingEventTimeWindows 类2.2 TumblingEventTimeWindows 的 assignWindows 方法中调用 getWindowStartWithOffset 方法获取起始点2.3 getWindowStartWithOffset(timestamp, offset, windowSize):方法逻辑为 timestamp - (timestamp - offset + windowSize) % windowSize,默认 offset 为 0,所以最终得到的起始点应该是 windowSize 的整数倍,在本例中的起始点为 1547718199 - (1547718199-0+15)%15 = 15477181953.偏移量 offset:一般是用来处理不同时区的数据*/env.execute();}
}
2. 迟到数据处理
/**需求:统计 15 秒内的最小温度值,设置 2 秒的延迟,并允许 1 分钟的迟到数据,1 分钟后的数据写入侧输出流
*/
public class TumblingEventTimeWindowDelayTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);DataStream<String> inputStream = env.socketTextStream("localhost", 7777);DataStream<SensorReading> dataStream = inputStream.map(new MapFunction<SensorReading>() {@Overridepublic SensorReading map(String value) {String[] fields = value.split(",");return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));}}).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<SensorReading>(Time.seconds(2)) {@Overridepublic long extractTimestamp(SensorReading element) {return element.getTimestamp() * 1000L;}});OutputTag<SensorReading> outputTag = new OutputTag<SensorReading>("late"){};//开窗聚合SingleOutputStreamOperator<SensorReading> minTempStream = dataStream.keyBy("id").timeWindow(Time.seconds(15)).allowedLateness(Time.minutes(1));.sideOutputLateData(outputTag).minBy("temperature");minTempStream.print("minTemp");minTempStream.getSideOutput(outputTag).print("late");/**依次输入数据:sensor_1,1547718199,35.8sensor_1,1547718206,36.3sensor_1,1547718210,34.7sensor_1,1547718211,31sensor_1,1547718209,34.9sensor_1,1547718212,37.1sensor_1,1547718213,33sensor_1,1547718206,34.2sensor_1,1547718202,36...sensor_1,1547718272,34sensor_1,1547718203,30.6输出的结果分析:1.在接收到 sensor_1,1547718212,37.1 时,触发 [1547718195,1547718210) 窗口执行,此时输出数据 sensor_1,1547718209,34.9,此时 2 秒内的延迟数据能被处理 2.在接收到 sensor_1,1547718206,34.2 时,由于设置了允许 1 分钟迟到,所以 [1547718195,1547718210) 窗口仍然没有关闭,此时会更新数据为 sensor_1,1547718206,34.2,此时的系统时间戳为 1547718213 - 2 = 1547718211 - 1547718210 < 603.在接收到 sensor_1,1547718202,36 时,[1547718195,1547718210) 窗口仍然会更新输出一次数据 sensor_1,1547718206,34.24.在接收到 sensor_1,1547718272,34 时,属于 [1547718210,1547718225) 窗口的数据会输出 sensor_1,1547718211,31,此时的系统时间戳为 1547718272 - 2 = 1547718270,由于 1547718270 - 1547718210 >= 60,所以 [1547718195,1547718210) 窗口会真正的关闭5.在之后接收到 sensor_1,1547718203,30.6 时,会把数据输出到侧输出流中*/env.execute();}
}
相关文章:
详解 Flink 的时间语义和 watermark
一、Flink 时间语义类型 Event Time:是事件创建的时间。它通常由事件中的时间戳描述,例如采集的日志数据中,每一条日志都会记录自己的生成时间,Flink 通过时间戳分配器访问事件时间戳Ingestion Time :是数据进入 Flink…...
Unreal Engine项目结构与关卡设置详解
引言 Unreal Engine 是一款功能强大的游戏引擎,为开发者提供了丰富的工具来创建和管理游戏项目。本文将详细介绍一个基本的 Unreal Engine 项目结构,并讲解如何在 Unreal 编辑器中进行关卡设置与操作。 Unreal Engine 项目结构 一个基本的 Unreal Eng…...
Access数据中的SQL偏移注入
使用场景: 目标数据表的字段较多,无法一一获取的时候,尝试使用偏移注入的方式实现SQL注入。 原理: 例如:一个表有6个字段,而你想获取的目标表admin的字段不知道,此时可以使用联合查询的方式获…...
Unity 编辑器扩展,获取目录下所有的预制件
先看演示效果 实现方案 1创建几个用于测试的cube 2,创建一个Editor脚本 3,编写脚本内容 附上源码 using UnityEditor; using UnityEngine;public class GetPrefeb : EditorWindow {private string folderPath "Assets/Resources"; // 指定预…...
【Python】解决Python报错:ValueError: not enough values to unpack (expected 2, got 1)
文章目录 引言1. 错误详解2. 常见的出错场景2.1 函数返回值解包2.2 遍历含有不同长度元组的列表 3. 解决方案3.1 检查和调整返回值3.2 安全的解包操作 4. 预防措施4.1 使用异常处理4.2 单元测试 结语 引言 在Python编程中,ValueError 是一个常见的异常类…...
政安晨【零基础玩转各类开源AI项目】解析开源:gradio:改进真实虚拟试穿的扩散模型
政安晨的个人主页:政安晨 欢迎 👍点赞✍评论⭐收藏 收录专栏: 零基础玩转各类开源AI项目 希望政安晨的博客能够对您有所裨益,如有不足之处,欢迎在评论区提出指正! Gradio 是一个开源 Python 软件包,可以让你…...
深入解读Prometheus Adapter:云原生监控的核心组件
一、引言 Prometheus Adapter的背景与重要性 在现代的云原生架构中,微服务和容器化技术得到了广泛的应用。这些技术带来了系统灵活性和扩展性的提升,但同时也增加了系统监控和管理的复杂度。Prometheus作为一款开源的监控系统,因其强大的指标…...
【计算机视觉】数字图像处理基础:以像素为单位的图像基本运算(点运算、代数运算、逻辑运算、几何运算、插值)
0、前言 在上篇文章中,我们对什么是数字图像、以及数字图像的组成(离散的像素点)进行了讲解🔗【计算机视觉】数字图像处理基础知识:模拟和数字图像、采样量化、像素的基本关系、灰度直方图、图像的分类。 我们知道&a…...
Spring Boot整合WebSocket和Redis实现直播间在线人数统计功能
😄 19年之后由于某些原因断更了三年,23年重新扬帆起航,推出更多优质博文,希望大家多多支持~ 🌷 古之立大事者,不惟有超世之才,亦必有坚忍不拔之志 🎐 个人CSND主页——Mi…...
uniapp自定义的下面导航
uniapp自定义的下面导航 看看效果图片吧 文章目录 uniapp自定义的下面导航 看看效果图片吧  前言一、写组件、我这里就没有写组件了直接写了一个页面?总结 前言 在…...
【Python】selenium使用find_element时解决【StaleElementReferenceException】问题的方法
StaleElementReferenceException 是 Selenium WebDriver 中的一种异常,通常在元素与当前页面的状态不同步时抛出,比如页面已经刷新或导航到另一个页面,但是尝试操作的元素引用仍然是旧页面上的元素。 以下是一些解决 StaleElementReferenceE…...
Apache IoTDB 分布式架构三部曲(三)副本与共识算法
IoTDB 首创并应用的共识协议统一框架,为用户提供了灵活选择不同共识算法的可能性。 对于一个分布式集群而言,为了使得海量数据场景下集群能够横向扩展,集群需要按照一定的规则将全部数据分成多个子集存储在不同的节点上,从而能够更…...
数据挖掘--聚类分析:基本概念和方法
数据挖掘--引论 数据挖掘--认识数据 数据挖掘--数据预处理 数据挖掘--数据仓库与联机分析处理 数据挖掘--挖掘频繁模式、关联和相关性:基本概念和方法 数据挖掘--分类 数据挖掘--聚类分析:基本概念和方法 聚类分析 聚类分析是把一个数据对象&…...
APP单页分发源码下载安卓苹果自动识别apk描述文件免签自动安装
下载地址:APP单页分发源码下载安卓苹果自动识别apk描述文件免签自动安装...
golang定时器使用示例
1.定时器创建与停止 //定时器使用t1 : time.NewTimer(2 * time.Second)<-t1.Cfmt.Println("timer1 fired")t2 : time.NewTimer(5 * time.Second)go func() {fmt.Println("go协程处理中,等待5秒后输出...")<-t2.Cfmt.Println("timer2 fired&quo…...
[FSCTF 2023]Tea_apk
得到密文和密钥 import base64 from ctypes import c_uint32import libnumDELTA 0x9E3779B9def decrypt(v, n, k):rounds 6 int(52 / n)sum c_uint32(rounds * DELTA)y v[0].valuewhile rounds > 0:e (sum.value >> 2) & 3p n - 1while p > 0:z v[p …...
分享一个用python写的本地WIFI密码查看器
本章教程,主要分享一个本地wifi密码查看器,用python实现的,感兴趣的可以试一试。 具体代码 import subprocess # 导入 subprocess 模块,用于执行系统命令 import tkinter as tk # 导入 tkinter 模块,用于创建图形用…...
【SkyWalking】启用apm-trace-ignore-plugin追踪忽略插件
背景 使用Agent采集追踪数据的时候,想排除某些路径,比如健康检查等,这样可以减少上报的数据,也可以去除一些不必要的干扰数据。 加载插件 在agent/optional-plugins目录中有个apm-trace-ignore-plugin-${version}.jar插件&…...
独立游戏之路 -- 获取OAID提升广告收益
Unity 之 获取手机:OAID、IMEI、ClientId、GUID 前言一、Oaid 介绍1.1 Oaid 说明1.2 移动安全联盟(MSA) 二、站在巨人的肩膀上2.1 本文实现参考2.2 本文实现效果2.3 本文相关插件 三、Unity 中获取Oaid3.1 查看实现源码3.2 工程配置3.3 代码实现3.4 场景搭建 四、总…...
反转链表 (oj题)
一、题目链接 https://leetcode.cn/problems/reverse-linked-list/submissions/538124207 二、题目思路 1.定义三个指针,p1先指向NULL p2指向头结点 p3指向第二个结点 2.p2的next指向p1。然后移动指针,p1来到p2的位置,p2来到p3的位置&…...
Linux应用开发之网络套接字编程(实例篇)
服务端与客户端单连接 服务端代码 #include <sys/socket.h> #include <sys/types.h> #include <netinet/in.h> #include <stdio.h> #include <stdlib.h> #include <string.h> #include <arpa/inet.h> #include <pthread.h> …...
国防科技大学计算机基础课程笔记02信息编码
1.机内码和国标码 国标码就是我们非常熟悉的这个GB2312,但是因为都是16进制,因此这个了16进制的数据既可以翻译成为这个机器码,也可以翻译成为这个国标码,所以这个时候很容易会出现这个歧义的情况; 因此,我们的这个国…...
Oracle查询表空间大小
1 查询数据库中所有的表空间以及表空间所占空间的大小 SELECTtablespace_name,sum( bytes ) / 1024 / 1024 FROMdba_data_files GROUP BYtablespace_name; 2 Oracle查询表空间大小及每个表所占空间的大小 SELECTtablespace_name,file_id,file_name,round( bytes / ( 1024 …...
基于Flask实现的医疗保险欺诈识别监测模型
基于Flask实现的医疗保险欺诈识别监测模型 项目截图 项目简介 社会医疗保险是国家通过立法形式强制实施,由雇主和个人按一定比例缴纳保险费,建立社会医疗保险基金,支付雇员医疗费用的一种医疗保险制度, 它是促进社会文明和进步的…...
2.Vue编写一个app
1.src中重要的组成 1.1main.ts // 引入createApp用于创建应用 import { createApp } from "vue"; // 引用App根组件 import App from ./App.vue;createApp(App).mount(#app)1.2 App.vue 其中要写三种标签 <template> <!--html--> </template>…...
12.找到字符串中所有字母异位词
🧠 题目解析 题目描述: 给定两个字符串 s 和 p,找出 s 中所有 p 的字母异位词的起始索引。 返回的答案以数组形式表示。 字母异位词定义: 若两个字符串包含的字符种类和出现次数完全相同,顺序无所谓,则互为…...
CRMEB 框架中 PHP 上传扩展开发:涵盖本地上传及阿里云 OSS、腾讯云 COS、七牛云
目前已有本地上传、阿里云OSS上传、腾讯云COS上传、七牛云上传扩展 扩展入口文件 文件目录 crmeb\services\upload\Upload.php namespace crmeb\services\upload;use crmeb\basic\BaseManager; use think\facade\Config;/*** Class Upload* package crmeb\services\upload* …...
全面解析各类VPN技术:GRE、IPsec、L2TP、SSL与MPLS VPN对比
目录 引言 VPN技术概述 GRE VPN 3.1 GRE封装结构 3.2 GRE的应用场景 GRE over IPsec 4.1 GRE over IPsec封装结构 4.2 为什么使用GRE over IPsec? IPsec VPN 5.1 IPsec传输模式(Transport Mode) 5.2 IPsec隧道模式(Tunne…...
使用 SymPy 进行向量和矩阵的高级操作
在科学计算和工程领域,向量和矩阵操作是解决问题的核心技能之一。Python 的 SymPy 库提供了强大的符号计算功能,能够高效地处理向量和矩阵的各种操作。本文将深入探讨如何使用 SymPy 进行向量和矩阵的创建、合并以及维度拓展等操作,并通过具体…...
零基础在实践中学习网络安全-皮卡丘靶场(第九期-Unsafe Fileupload模块)(yakit方式)
本期内容并不是很难,相信大家会学的很愉快,当然对于有后端基础的朋友来说,本期内容更加容易了解,当然没有基础的也别担心,本期内容会详细解释有关内容 本期用到的软件:yakit(因为经过之前好多期…...
