Flink -- 事件时间 Watermark
1、事件时间:
指的是数据产生的时间或是说是数据发生的时间。
在Flink中有三种时间分别是:
Event Time:事件时间,数据产生的时间,可以反应数据真实发生的时间
Infestion Time:事件接收时间
Processing Time:事件处理时间

为什么会提出事件时间这个概念?
因为当使用Processing Time(事件的处理时间)来对数据进行处理,此时数据可能会乱序,没有办法还原数据本身的时间顺序,这种情况在Flink中会可能导致数据丢失,如果使用事件时间,它会根据事件真实发生的时间对数据排序,就不会出现数据乱序的情况。
总结来说,数据产生的时间就是事件时间,现实中实时的时间就是事件的处理时间
2、Processing Time 事件处理时间
处理时间是接收数据过后对数据操作的时间。处理时间的会按照实时的时间触发。
public class Demo03ProcessingTime {public static void main(String[] args) throws Exception{/*** 数据处理时间:一般会结合窗口使用,一般值的是接受数据后对数据操作的时间* 需求:每过5秒中统计15秒内的单词的数量*///构建Flink的环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//使用socket模拟实时的操作DataStreamSource<String> wordDS = env.socketTextStream("master", 8888);//将接受的数据的转换成kv的格式SingleOutputStreamOperator<Tuple2<String, Integer>> kvDS = wordDS.map(word -> Tuple2.of(word, 1), Types.TUPLE(Types.STRING,Types.INT));//按照单词进行分组KeyedStream<Tuple2<String, Integer>, String> keyByDS = kvDS.keyBy(key -> key.f0);//划分窗口,窗口的大小是10秒钟,滑动的时间是5秒钟WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowDS = keyByDS .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)));//对统计的单词进行求和SingleOutputStreamOperator<Tuple2<String, Integer>> countDS = windowDS.sum(1);countDS.print();//启动Flinkenv.execute();}
}
3、事件时间:
数据产生的时间就是事件时间,不过在使用的时候使用的是时间戳。需要注意的是数据的时间与现实的时间是不一致的。
在使用事件时间的时候需要注意的是打入数据的数据时间是需要按照时间的顺序打入,否则数据就会丢失(也可以不按照顺序打入,后面有解决办法)
java,1699035731000
java,1699035732000
java,1699035735000
java,1699035733000
java,1699035736000
java,1699035737000
java,1699035740000例如上述:数据总共有两个部分组成,前面是单词,后面的是单词数据产生的时间戳
public class Demo04EventTime {public static void main(String[] args) throws Exception{/*** 需求:统计5秒内的单词的数量,使用的是事件时间滚动窗口* 触发的条件是事件时间5秒*///构建flink的环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//需要并行度改成一env.setParallelism(1);//使用socket模拟实时的环境DataStreamSource<String> lineDS = env.socketTextStream("master", 8888);/*** java,1699035731000* java,1699035732000* java,1699035735000* java,1699035733000* java,1699035736000* java,1699035737000* java,1699035740000*///此时的数据的格式并不是某一个单词,需要告诉flink哪一个是事件时间//首先对数据进行格式处理SingleOutputStreamOperator<Tuple2<String, Long>> kvDS = lineDS.map(line -> {String[] split = line.split(",");String word = split[0];
// String time = split[1];long time1 = Long.parseLong(split[1]);return Tuple2.of(word, time1);}, Types.TUPLE(Types.STRING, Types.LONG));//告诉Flink,哪一个是事件的时间SingleOutputStreamOperator<Tuple2<String, Long>> assDS = kvDS.assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))//指定事件时间.withTimestampAssigner((kv, ts) -> kv.f1));//统计5秒钟的单词的数量DataStream<Tuple2<String, Integer>>keyByDS = assDS.map(kv -> Tuple2.of(kv.f0, 1),Types.TUPLE(Types.STRING,Types.INT));//按照单词进行分组KeyedStream<Tuple2<String, Integer>, String> keyByDS1= keyByDS.keyBy(kv -> kv.f0);//开窗WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowDS = keyByDS1.window(TumblingEventTimeWindows.of(Time.seconds(5)));//对单词的数量进行统计SingleOutputStreamOperator<Tuple2<String, Integer>> countDS = windowDS.sum(1);//打印数据countDS.print();//执行Flink的环境env.execute();}
}
1、基于事件时间来说,触发窗口的条件:
1、水位线需要大于等于窗口的结束时间2、窗口里面要存在数据3、窗口的划分时间是从1970年1月1日0时0分0秒开始的,按照窗口的大小轮替
4、水位线(watermark):默认是等于最新的一条数据的时间戳

5、在使用事件时间的时候需要注意的是打入数据的数据时间是需要按照时间的顺序打入,否则数据就会丢失(也可以不按照顺序打入,后面有解决办法)
解决方法:将水位线向后推移
假设一个时间窗口是5秒,如果将此时的水位线向后推移5秒,假设4进入的时候,此时的水位线就变成-3,但是此时就不满足触发窗口的条件,此时假设遗漏的数据是3,此时的水位线依旧是小于窗口的时间,依旧不会触发窗口。
但是不能完全的保证数据不丢失,推移的时间越久,对于Flink的延迟就会越大。
1、在Flink中是默认使用的是单调递增的时间戳分配器:在没有乱序情况下,默认水位线是等于最新的一条数据的时间戳
//1、需要告诉flink哪一个字段是时间字段//设置时间字段和水位线DataStream<Tuple2<String, Long>> assDS = wordAndTsDS.assignTimestampsAndWatermarks(WatermarkStrategy//1、指定水位线等于时间最新一条数据的时间戳,数据不存在乱序的时候使用,如果数据乱序,可能会丢失数据.<Tuple2<String, Long>>forMonotonousTimestamps()//指定时间字段.withTimestampAssigner((kv, ts) -> kv.f1));
2、数据之间存在最大固定延迟的时间戳分配器:在乱序的情况下,就水位线先后推移固定的时间(是以最新的一条数据的时间戳为标准的)
//1、需要告诉flink哪一个字段是时间字段//设置时间字段和水位线DataStream<Tuple2<String, Long>> assDS = wordAndTsDS.assignTimestampsAndWatermarks(WatermarkStrategy//1、水位线生成方式:最新一条数据的时间戳减去5秒,会导致计算延迟触发.<Tuple2<String,Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))//指定时间字段.withTimestampAssigner((kv, ts) -> kv.f1));
6、水位线的生成:

上图表示的是以Flink的流程图,图中总共有两个并行度,每一个Task上面都带着任务的时间,在Flink中,会将任务的时间向后传递,当途中上游map(1)将任务时间传递给下游window(1)时,下面的上游map(2)也会任务时间传递给下游window(1)(上游的任务是并行的),此时下游window(1)就会产生两个任务时间,此时就会选择时间最小的时间的作为水位线。因为当选择时间大的作为水位线,那么对于时间较小的数据可能会丢失。
1、水位线对齐:
因为上游的任务是并行执行的,指的时对于上游的所有的Task的水位线都需要逐步的向后推移。
相关文章:
Flink -- 事件时间 Watermark
1、事件时间: 指的是数据产生的时间或是说是数据发生的时间。 在Flink中有三种时间分别是: Event Time:事件时间,数据产生的时间,可以反应数据真实发生的时间 Infestion Time:事件接收时间 Processing Tim…...
Django框架简介
文章目录 Django框架介绍MVC与MVT模型MVCMTV 版本问题运行django注意事项 Django的下载与基本命令下载Django方式一:在命令界面使用pip安装方式二:使用pycharm安装 Django的基础命令命令行操作pycharm操作 Django项目命令行操作与Pycharm操作的区别应用D…...
把wpf的窗体保存为png图片
昨晚在stack overflow刷问题时看到有这个问题,今天早上刚好来尝试学习一下 stack overflow的链接如下: c# - How to render a WPF UserControl to a bitmap without creating a window - Stack Overflow 测试步骤如下: 1 新建.net frame…...
2023NOIP A层联测28-大眼鸹猫
给你两个长度为 n n n 的序列 a , b a,b a,b,这两个序列都是单调不降的。 你可以对 a a a 进行不超过 m m m 次操作,每次操作你可以选择一个 i i i 满足 1 ≤ i ≤ n 1\le i\le n 1≤i≤n,然后选择一个整数(可以是负数&…...
电机应用-直流有刷电机
目录 直流有刷电机 工作原理 直流有刷减速电机的重要参数 电路原理与分析 驱动芯片分析 L298N驱动芯片 直流有刷减速电机控制实现 控制速度原理 硬件设计 L298N 野火直流有刷电机驱动板-MOS管搭建板 软件设计1:两个直流有刷减速电机按键控制 开发设计 …...
BIM、建筑机器人、隧道工程施工关键技术
一、BIM简介 (一)BIM概念 BIM(Building Information Modeling),建筑信息模型。该技术通过数字化手段,在计算机中建立虚拟建筑,该虚拟建筑提供从单一到完整、包含逻辑关系的建筑信息库。信息库…...
快速了解什么是跳跃表(skip list)
什么是跳跃表(skip list) 跳跃表(Skip List)是一种概率性的数据结构,它通过在多层链表的基础上添加“快速通道”来提高搜索效率。跳跃表的效率可以与平衡树相媲美,即在平均和最坏的情况下,查找…...
【Node.js入门】1.1Node.js 简介
Node.js入门之—1.1Node.js 简介 文章目录 Node.js入门之—1.1Node.js 简介什么是 Node.js错误说法 Node.js 的特点跨平台三方类库自带http服务器非阻塞I/O事件驱动单线程 Node.js 的应用场合适合用Node.js的场合不适合用Node.js的场合弥补Node.js不足的解决方案 什么是 Node.j…...
数据库 高阶语句
目录 数据库 高阶语句 使用select 语句,用order by来对进行排序 区间判断查询和去重查询 如何对结果进行分组查询group by语句 limit 限制输出的结果记录,查看表中的指定行 通配符 设置别名:alias 简写就是 as 使用select 语句&#x…...
jenkins Java heap space
jenkins Java heap space,是内存不够。 两个解决方案: 一,修改配置文件 windows系统中,找到Jenkins的安装路径, 修改jenkins.xml 将 -Xmx256m 改为 -Xmx1024m 或者更大 重启jenkins服务。 二,jenkins增…...
OpenCV校准棋盘集合
棋盘格可以与相机校准工具一起使用,例如ROS的camera_calibration包。您可以通过单击下面的任何链接免费下载 PDF 格式的各种棋盘,没有水印或广告。此外,还添加了基于 JavaScript 的棋盘生成器,允许您生成自定义尺寸。 提示&#…...
使用git将本地项目推送到远程仓库github
总结:本地项目通过git上传到github 1)、在本地创建一个版本库(即文件夹),通过 git init 把它变成Git仓库; 2)、把项目复制到这个文件夹里面,再通过 git add . 把项目添加到仓库; 3)、再通过 gi…...
Mybatis-Plus使用Wrapper自定义SQL
文章目录 准备工作Mybatis-Plus使用Wrapper自定义SQL注意事项目录结构如下所示domain层Controller层Service层ServiceImplMapper层UserMapper.xml 结果如下所示:单表查询条件构造器单表查询,Mybatis-Plus使用Wrapper自定义SQL联表查询不用,My…...
仿mudou库one thread one loop式并发服务器
目录 1.实现目标 2.HTTP服务器 实现高性能服务器-Reactor模型 模块划分 SERVER模块: HTTP协议模块: 3.项目中的子功能 秒级定时任务实现 时间轮实现 正则库的简单使用 通⽤类型any类型的实现 4.SERVER服务器实现 日志宏的封装 缓冲区Buffer…...
二十三种设计模式全面解析-组合模式与装饰器模式的结合:实现动态功能扩展
在前文中,我们介绍了组合模式的基本原理和应用,以及它在构建对象结构中的价值和潜力。然而,组合模式的魅力远不止于此。在本文中,我们将继续探索组合模式的进阶应用,并展示它与其他设计模式的结合使用,以构…...
智慧城市建设解决方案分享【完整】
文章目录 第1章 前言第2章 智慧城市建设的背景2.1 智慧城市的发展现状2.2 智慧城市的发展趋势 第3章 智慧城市“十二五”规划要点3.1 国民经济和社会发展“十二五”规划要点3.2 “十二五”信息化发展规划要点 第4章 大数据:智慧城市的智慧引擎4.1 大数据技术—智慧城…...
unity - Blend Shape - 变形器 - 实践
文章目录 目的Blend Shape 逐顶点 多个混合思路Blender3Ds maxUnity 中使用Project 目的 拾遗,备份 Blend Shape 逐顶点 多个混合思路 blend shape 基于: vertex number, vertex sn 相同,才能正常混合、播放 也就是 vertex buffer 的顶点数…...
asp.net core mvc之路由
一、默认路由 (Startup.cs文件) routes.MapRoute(name: "default",template: "{controllerHome}/{actionIndex}/{id?}" ); 默认访问可以匹配到 https://localhost:44302/home/index/1 https://localhost:44302/home/index https:…...
前端设计模式之【访问者模式】
文章目录 前言介绍实现优缺点应用场景后言 前言 hello world欢迎来到前端的新世界 😜当前文章系列专栏:前端设计模式 🐱👓博主在前端领域还有很多知识和技术需要掌握,正在不断努力填补技术短板。(如果出现错误&#…...
通过docker-compose部署elk日志系统,并使用springboot整合
ELK是一种强大的分布式日志管理解决方案,它由三个核心组件组成: Elasticsearch:作为分布式搜索和分析引擎,Elasticsearch能够快速地存储、搜索和分析大量的日志数据,帮助用户轻松地找到所需的信息。 Logstash…...
Appium+python自动化(十六)- ADB命令
简介 Android 调试桥(adb)是多种用途的工具,该工具可以帮助你你管理设备或模拟器 的状态。 adb ( Android Debug Bridge)是一个通用命令行工具,其允许您与模拟器实例或连接的 Android 设备进行通信。它可为各种设备操作提供便利,如安装和调试…...
Qt Widget类解析与代码注释
#include "widget.h" #include "ui_widget.h"Widget::Widget(QWidget *parent): QWidget(parent), ui(new Ui::Widget) {ui->setupUi(this); }Widget::~Widget() {delete ui; }//解释这串代码,写上注释 当然可以!这段代码是 Qt …...
STM32F4基本定时器使用和原理详解
STM32F4基本定时器使用和原理详解 前言如何确定定时器挂载在哪条时钟线上配置及使用方法参数配置PrescalerCounter ModeCounter Periodauto-reload preloadTrigger Event Selection 中断配置生成的代码及使用方法初始化代码基本定时器触发DCA或者ADC的代码讲解中断代码定时启动…...
多模态商品数据接口:融合图像、语音与文字的下一代商品详情体验
一、多模态商品数据接口的技术架构 (一)多模态数据融合引擎 跨模态语义对齐 通过Transformer架构实现图像、语音、文字的语义关联。例如,当用户上传一张“蓝色连衣裙”的图片时,接口可自动提取图像中的颜色(RGB值&…...
HBuilderX安装(uni-app和小程序开发)
下载HBuilderX 访问官方网站:https://www.dcloud.io/hbuilderx.html 根据您的操作系统选择合适版本: Windows版(推荐下载标准版) Windows系统安装步骤 运行安装程序: 双击下载的.exe安装文件 如果出现安全提示&…...
unix/linux,sudo,其发展历程详细时间线、由来、历史背景
sudo 的诞生和演化,本身就是一部 Unix/Linux 系统管理哲学变迁的微缩史。来,让我们拨开时间的迷雾,一同探寻 sudo 那波澜壮阔(也颇为实用主义)的发展历程。 历史背景:su的时代与困境 ( 20 世纪 70 年代 - 80 年代初) 在 sudo 出现之前,Unix 系统管理员和需要特权操作的…...
JS手写代码篇----使用Promise封装AJAX请求
15、使用Promise封装AJAX请求 promise就有reject和resolve了,就不必写成功和失败的回调函数了 const BASEURL ./手写ajax/test.jsonfunction promiseAjax() {return new Promise((resolve, reject) > {const xhr new XMLHttpRequest();xhr.open("get&quo…...
GraphQL 实战篇:Apollo Client 配置与缓存
GraphQL 实战篇:Apollo Client 配置与缓存 上一篇:GraphQL 入门篇:基础查询语法 依旧和上一篇的笔记一样,主实操,没啥过多的细节讲解,代码具体在: https://github.com/GoldenaArcher/graphql…...
倒装芯片凸点成型工艺
UBM(Under Bump Metallization)与Bump(焊球)形成工艺流程。我们可以将整张流程图分为三大阶段来理解: 🔧 一、UBM(Under Bump Metallization)工艺流程(黄色区域ÿ…...
PLC入门【4】基本指令2(SET RST)
04 基本指令2 PLC编程第四课基本指令(2) 1、运用上接课所学的基本指令完成个简单的实例编程。 2、学习SET--置位指令 3、RST--复位指令 打开软件(FX-TRN-BEG-C),从 文件 - 主画面,“B: 让我们学习基本的”- “B-3.控制优先程序”。 点击“梯形图编辑”…...
