当前位置: 首页 > news >正文

Flink -- 事件时间 Watermark

1、事件时间:

        指的是数据产生的时间或是说是数据发生的时间。

在Flink中有三种时间分别是:

                Event Time:事件时间,数据产生的时间,可以反应数据真实发生的时间

                Infestion Time:事件接收时间

                Processing Time:事件处理时间

为什么会提出事件时间这个概念?

        因为当使用Processing Time(事件的处理时间)来对数据进行处理,此时数据可能会乱序,没有办法还原数据本身的时间顺序,这种情况在Flink中会可能导致数据丢失,如果使用事件时间,它会根据事件真实发生的时间对数据排序,就不会出现数据乱序的情况。

总结来说,数据产生的时间就是事件时间,现实中实时的时间就是事件的处理时间

2、Processing Time 事件处理时间

处理时间是接收数据过后对数据操作的时间。处理时间的会按照实时的时间触发。

public class Demo03ProcessingTime {public static void main(String[] args)  throws Exception{/*** 数据处理时间:一般会结合窗口使用,一般值的是接受数据后对数据操作的时间* 需求:每过5秒中统计15秒内的单词的数量*///构建Flink的环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//使用socket模拟实时的操作DataStreamSource<String> wordDS = env.socketTextStream("master", 8888);//将接受的数据的转换成kv的格式SingleOutputStreamOperator<Tuple2<String, Integer>> kvDS = wordDS.map(word -> Tuple2.of(word, 1), Types.TUPLE(Types.STRING,Types.INT));//按照单词进行分组KeyedStream<Tuple2<String, Integer>, String> keyByDS = kvDS.keyBy(key -> key.f0);//划分窗口,窗口的大小是10秒钟,滑动的时间是5秒钟WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowDS = keyByDS .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)));//对统计的单词进行求和SingleOutputStreamOperator<Tuple2<String, Integer>> countDS = windowDS.sum(1);countDS.print();//启动Flinkenv.execute();}
}
3、事件时间:

数据产生的时间就是事件时间,不过在使用的时候使用的是时间戳。需要注意的是数据的时间与现实的时间是不一致的。

在使用事件时间的时候需要注意的是打入数据的数据时间是需要按照时间的顺序打入,否则数据就会丢失(也可以不按照顺序打入,后面有解决办法)

java,1699035731000
java,1699035732000
java,1699035735000
java,1699035733000
java,1699035736000
java,1699035737000
java,1699035740000例如上述:数据总共有两个部分组成,前面是单词,后面的是单词数据产生的时间戳


public class Demo04EventTime {public static void main(String[] args) throws Exception{/*** 需求:统计5秒内的单词的数量,使用的是事件时间滚动窗口* 触发的条件是事件时间5秒*///构建flink的环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//需要并行度改成一env.setParallelism(1);//使用socket模拟实时的环境DataStreamSource<String> lineDS = env.socketTextStream("master", 8888);/*** java,1699035731000* java,1699035732000* java,1699035735000* java,1699035733000* java,1699035736000* java,1699035737000* java,1699035740000*///此时的数据的格式并不是某一个单词,需要告诉flink哪一个是事件时间//首先对数据进行格式处理SingleOutputStreamOperator<Tuple2<String, Long>> kvDS = lineDS.map(line -> {String[] split = line.split(",");String word = split[0];
//            String time = split[1];long time1 = Long.parseLong(split[1]);return Tuple2.of(word, time1);}, Types.TUPLE(Types.STRING, Types.LONG));//告诉Flink,哪一个是事件的时间SingleOutputStreamOperator<Tuple2<String, Long>> assDS = kvDS.assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))//指定事件时间.withTimestampAssigner((kv, ts) -> kv.f1));//统计5秒钟的单词的数量DataStream<Tuple2<String, Integer>>keyByDS = assDS.map(kv -> Tuple2.of(kv.f0, 1),Types.TUPLE(Types.STRING,Types.INT));//按照单词进行分组KeyedStream<Tuple2<String, Integer>, String> keyByDS1= keyByDS.keyBy(kv -> kv.f0);//开窗WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowDS = keyByDS1.window(TumblingEventTimeWindows.of(Time.seconds(5)));//对单词的数量进行统计SingleOutputStreamOperator<Tuple2<String, Integer>> countDS = windowDS.sum(1);//打印数据countDS.print();//执行Flink的环境env.execute();}
}
        1、基于事件时间来说,触发窗口的条件:
    1、水位线需要大于等于窗口的结束时间2、窗口里面要存在数据3、窗口的划分时间是从1970年1月1日0时0分0秒开始的,按照窗口的大小轮替
4、水位线(watermark):默认是等于最新的一条数据的时间戳

5、在使用事件时间的时候需要注意的是打入数据的数据时间是需要按照时间的顺序打入,否则数据就会丢失(也可以不按照顺序打入,后面有解决办法)
        解决方法:将水位线向后推移

假设一个时间窗口是5秒,如果将此时的水位线向后推移5秒,假设4进入的时候,此时的水位线就变成-3,但是此时就不满足触发窗口的条件,此时假设遗漏的数据是3,此时的水位线依旧是小于窗口的时间,依旧不会触发窗口。

但是不能完全的保证数据不丢失,推移的时间越久,对于Flink的延迟就会越大。

        1、在Flink中是默认使用的是单调递增的时间戳分配器:在没有乱序情况下,默认水位线是等于最新的一条数据的时间戳
  //1、需要告诉flink哪一个字段是时间字段//设置时间字段和水位线DataStream<Tuple2<String, Long>> assDS = wordAndTsDS.assignTimestampsAndWatermarks(WatermarkStrategy//1、指定水位线等于时间最新一条数据的时间戳,数据不存在乱序的时候使用,如果数据乱序,可能会丢失数据.<Tuple2<String, Long>>forMonotonousTimestamps()//指定时间字段.withTimestampAssigner((kv, ts) -> kv.f1));
        2、数据之间存在最大固定延迟的时间戳分配器:在乱序的情况下,就水位线先后推移固定的时间(是以最新的一条数据的时间戳为标准的)
  //1、需要告诉flink哪一个字段是时间字段//设置时间字段和水位线DataStream<Tuple2<String, Long>> assDS = wordAndTsDS.assignTimestampsAndWatermarks(WatermarkStrategy//1、水位线生成方式:最新一条数据的时间戳减去5秒,会导致计算延迟触发.<Tuple2<String,Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))//指定时间字段.withTimestampAssigner((kv, ts) -> kv.f1));
6、水位线的生成:

上图表示的是以Flink的流程图,图中总共有两个并行度,每一个Task上面都带着任务的时间,在Flink中,会将任务的时间向后传递,当途中上游map(1)将任务时间传递给下游window(1)时,下面的上游map(2)也会任务时间传递给下游window(1)(上游的任务是并行的),此时下游window(1)就会产生两个任务时间,此时就会选择时间最小的时间的作为水位线。因为当选择时间大的作为水位线,那么对于时间较小的数据可能会丢失。

        1、水位线对齐:

                因为上游的任务是并行执行的,指的时对于上游的所有的Task的水位线都需要逐步的向后推移。

相关文章:

Flink -- 事件时间 Watermark

1、事件时间&#xff1a; 指的是数据产生的时间或是说是数据发生的时间。 在Flink中有三种时间分别是&#xff1a; Event Time&#xff1a;事件时间&#xff0c;数据产生的时间&#xff0c;可以反应数据真实发生的时间 Infestion Time&#xff1a;事件接收时间 Processing Tim…...

Django框架简介

文章目录 Django框架介绍MVC与MVT模型MVCMTV 版本问题运行django注意事项 Django的下载与基本命令下载Django方式一&#xff1a;在命令界面使用pip安装方式二&#xff1a;使用pycharm安装 Django的基础命令命令行操作pycharm操作 Django项目命令行操作与Pycharm操作的区别应用D…...

把wpf的窗体保存为png图片

昨晚在stack overflow刷问题时看到有这个问题&#xff0c;今天早上刚好来尝试学习一下 stack overflow的链接如下&#xff1a; c# - How to render a WPF UserControl to a bitmap without creating a window - Stack Overflow 测试步骤如下&#xff1a; 1 新建.net frame…...

2023NOIP A层联测28-大眼鸹猫

给你两个长度为 n n n 的序列 a , b a,b a,b&#xff0c;这两个序列都是单调不降的。 你可以对 a a a 进行不超过 m m m 次操作&#xff0c;每次操作你可以选择一个 i i i 满足 1 ≤ i ≤ n 1\le i\le n 1≤i≤n&#xff0c;然后选择一个整数&#xff08;可以是负数&…...

电机应用-直流有刷电机

目录 直流有刷电机 工作原理 直流有刷减速电机的重要参数 电路原理与分析 驱动芯片分析 L298N驱动芯片 直流有刷减速电机控制实现 控制速度原理 硬件设计 L298N 野火直流有刷电机驱动板-MOS管搭建板 软件设计1&#xff1a;两个直流有刷减速电机按键控制 开发设计 …...

BIM、建筑机器人、隧道工程施工关键技术

一、BIM简介 &#xff08;一&#xff09;BIM概念 BIM&#xff08;Building Information Modeling&#xff09;&#xff0c;建筑信息模型。该技术通过数字化手段&#xff0c;在计算机中建立虚拟建筑&#xff0c;该虚拟建筑提供从单一到完整、包含逻辑关系的建筑信息库。信息库…...

快速了解什么是跳跃表(skip list)

什么是跳跃表&#xff08;skip list&#xff09; 跳跃表&#xff08;Skip List&#xff09;是一种概率性的数据结构&#xff0c;它通过在多层链表的基础上添加“快速通道”来提高搜索效率。跳跃表的效率可以与平衡树相媲美&#xff0c;即在平均和最坏的情况下&#xff0c;查找…...

【Node.js入门】1.1Node.js 简介

Node.js入门之—1.1Node.js 简介 文章目录 Node.js入门之—1.1Node.js 简介什么是 Node.js错误说法 Node.js 的特点跨平台三方类库自带http服务器非阻塞I/O事件驱动单线程 Node.js 的应用场合适合用Node.js的场合不适合用Node.js的场合弥补Node.js不足的解决方案 什么是 Node.j…...

数据库 高阶语句

目录 数据库 高阶语句 使用select 语句&#xff0c;用order by来对进行排序 区间判断查询和去重查询 如何对结果进行分组查询group by语句 limit 限制输出的结果记录&#xff0c;查看表中的指定行 通配符 设置别名&#xff1a;alias 简写就是 as 使用select 语句&#x…...

jenkins Java heap space

jenkins Java heap space&#xff0c;是内存不够。 两个解决方案&#xff1a; 一&#xff0c;修改配置文件 windows系统中&#xff0c;找到Jenkins的安装路径&#xff0c; 修改jenkins.xml 将 -Xmx256m 改为 -Xmx1024m 或者更大 重启jenkins服务。 二&#xff0c;jenkins增…...

OpenCV校准棋盘集合

棋盘格可以与相机校准工具一起使用&#xff0c;例如ROS的camera_calibration包。您可以通过单击下面的任何链接免费下载 PDF 格式的各种棋盘&#xff0c;没有水印或广告。此外&#xff0c;还添加了基于 JavaScript 的棋盘生成器&#xff0c;允许您生成自定义尺寸。 提示&#…...

使用git将本地项目推送到远程仓库github

总结&#xff1a;本地项目通过git上传到github 1)、在本地创建一个版本库&#xff08;即文件夹&#xff09;&#xff0c;通过 git init 把它变成Git仓库&#xff1b; 2)、把项目复制到这个文件夹里面&#xff0c;再通过 git add . 把项目添加到仓库&#xff1b; 3)、再通过 gi…...

Mybatis-Plus使用Wrapper自定义SQL

文章目录 准备工作Mybatis-Plus使用Wrapper自定义SQL注意事项目录结构如下所示domain层Controller层Service层ServiceImplMapper层UserMapper.xml 结果如下所示&#xff1a;单表查询条件构造器单表查询&#xff0c;Mybatis-Plus使用Wrapper自定义SQL联表查询不用&#xff0c;My…...

仿mudou库one thread one loop式并发服务器

目录 1.实现目标 2.HTTP服务器 实现高性能服务器-Reactor模型 模块划分 SERVER模块&#xff1a; HTTP协议模块&#xff1a; 3.项目中的子功能 秒级定时任务实现 时间轮实现 正则库的简单使用 通⽤类型any类型的实现 4.SERVER服务器实现 日志宏的封装 缓冲区Buffer…...

二十三种设计模式全面解析-组合模式与装饰器模式的结合:实现动态功能扩展

在前文中&#xff0c;我们介绍了组合模式的基本原理和应用&#xff0c;以及它在构建对象结构中的价值和潜力。然而&#xff0c;组合模式的魅力远不止于此。在本文中&#xff0c;我们将继续探索组合模式的进阶应用&#xff0c;并展示它与其他设计模式的结合使用&#xff0c;以构…...

智慧城市建设解决方案分享【完整】

文章目录 第1章 前言第2章 智慧城市建设的背景2.1 智慧城市的发展现状2.2 智慧城市的发展趋势 第3章 智慧城市“十二五”规划要点3.1 国民经济和社会发展“十二五”规划要点3.2 “十二五”信息化发展规划要点 第4章 大数据&#xff1a;智慧城市的智慧引擎4.1 大数据技术—智慧城…...

unity - Blend Shape - 变形器 - 实践

文章目录 目的Blend Shape 逐顶点 多个混合思路Blender3Ds maxUnity 中使用Project 目的 拾遗&#xff0c;备份 Blend Shape 逐顶点 多个混合思路 blend shape 基于&#xff1a; vertex number, vertex sn 相同&#xff0c;才能正常混合、播放 也就是 vertex buffer 的顶点数…...

asp.net core mvc之路由

一、默认路由 &#xff08;Startup.cs文件&#xff09; routes.MapRoute(name: "default",template: "{controllerHome}/{actionIndex}/{id?}" ); 默认访问可以匹配到 https://localhost:44302/home/index/1 https://localhost:44302/home/index https:…...

前端设计模式之【访问者模式】

文章目录 前言介绍实现优缺点应用场景后言 前言 hello world欢迎来到前端的新世界 &#x1f61c;当前文章系列专栏&#xff1a;前端设计模式 &#x1f431;‍&#x1f453;博主在前端领域还有很多知识和技术需要掌握&#xff0c;正在不断努力填补技术短板。(如果出现错误&#…...

通过docker-compose部署elk日志系统,并使用springboot整合

ELK是一种强大的分布式日志管理解决方案&#xff0c;它由三个核心组件组成&#xff1a; Elasticsearch&#xff1a;作为分布式搜索和分析引擎&#xff0c;Elasticsearch能够快速地存储、搜索和分析大量的日志数据&#xff0c;帮助用户轻松地找到所需的信息。 Logstash&#xf…...

Snap.Hutao终极使用指南:免费开源的原神工具箱完全攻略

Snap.Hutao终极使用指南&#xff1a;免费开源的原神工具箱完全攻略 【免费下载链接】Snap.Hutao 实用的开源多功能原神工具箱 &#x1f9f0; / Multifunctional Open-Source Genshin Impact Toolkit &#x1f9f0; 项目地址: https://gitcode.com/GitHub_Trending/sn/Snap.Hu…...

从需求到界面:Phi-3-mini-128k-instruct辅助Qt桌面应用开发实战

从需求到界面&#xff1a;Phi-3-mini-128k-instruct辅助Qt桌面应用开发实战 最近在捣鼓一个Qt桌面小应用&#xff0c;想做个简单的音乐播放器。从画界面到写逻辑&#xff0c;虽说Qt的文档很全&#xff0c;但有时候对着各种Widget和布局管理器&#xff0c;还是免不了要反复查资…...

深度解析Neper高级功能:多晶体建模实战配置指南

深度解析Neper高级功能&#xff1a;多晶体建模实战配置指南 【免费下载链接】neper Polycrystal generation and meshing 项目地址: https://gitcode.com/gh_mirrors/nep/neper Neper是一款专业的多晶体生成与网格划分开源软件&#xff0c;专为材料科学研究提供高效的多…...

Stata实操:用xtreg命令搞定面板数据,固定效应和随机效应到底怎么选?

Stata面板数据分析实战&#xff1a;从数据清洗到模型选择的完整指南 当面对一份包含多个实体&#xff08;如公司、国家或个人&#xff09;在不同时间点观测值的数据集时&#xff0c;面板数据分析方法成为揭示深层规律的有力工具。不同于单纯的横截面或时间序列数据&#xff0c;…...

如何让Windows 10/11重新拥抱PL2303老芯片

如何让Windows 10/11重新拥抱PL2303老芯片 【免费下载链接】pl2303-win10 Windows 10 driver for end-of-life PL-2303 chipsets. 项目地址: https://gitcode.com/gh_mirrors/pl/pl2303-win10 还记得抽屉角落里那些积灰的串口设备吗&#xff1f;那些曾经陪伴你调试单片机…...

QMK Toolbox:解锁机械键盘自定义潜能的终极工具

QMK Toolbox&#xff1a;解锁机械键盘自定义潜能的终极工具 【免费下载链接】qmk_toolbox A Toolbox companion for QMK Firmware 项目地址: https://gitcode.com/gh_mirrors/qm/qmk_toolbox 想让你心爱的机械键盘拥有超乎想象的功能吗&#xff1f;厌倦了千篇一律的按键…...

从官方Demo工程到实际项目:手把手教你如何安全地‘魔改’FreeRTOS配置(避坑configUSE_TICK_HOOK)

从官方Demo工程到实际项目&#xff1a;手把手教你如何安全地‘魔改’FreeRTOS配置&#xff08;避坑configUSE_TICK_HOOK&#xff09; 当你第一次拿到FreeRTOS官方Demo工程时&#xff0c;那种感觉就像获得了一个功能齐全的"瑞士军刀"——它展示了各种RTOS特性的使用方…...

用51单片机+DAC0832做个简易信号发生器:手把手教你生成方波、三角波和锯齿波(附完整汇编代码)

51单片机与DAC0832实战&#xff1a;三波形信号发生器的设计与实现 在电子工程和嵌入式系统开发中&#xff0c;信号发生器是一个基础但极其重要的工具。无论是用于电路测试、教学演示还是原型验证&#xff0c;一个可靠的信号源都能大大提升工作效率。本文将带你从零开始&#xf…...

告别单机!用FinalShell和朋友联机玩DNF台服,完整配置与授权文件生成指南

告别单机&#xff01;用FinalShell和朋友联机玩DNF台服&#xff0c;完整配置与授权文件生成指南 和朋友一起重温DNF台服的经典版本&#xff0c;是许多老玩家的共同愿望。与单机版相比&#xff0c;联机玩法能带来更丰富的社交体验和团队协作乐趣。本文将详细介绍如何从零开始搭建…...

DS4Windows终极指南:让PS手柄在PC上完美运行的5个秘密技巧

DS4Windows终极指南&#xff1a;让PS手柄在PC上完美运行的5个秘密技巧 【免费下载链接】DS4Windows Like those other ds4tools, but sexier 项目地址: https://gitcode.com/gh_mirrors/ds/DS4Windows 你是否曾经想过&#xff0c;为什么PS4/PS5手柄在PC上总是"水土…...