当前位置: 首页 > news >正文

Flink -- 事件时间 Watermark

1、事件时间:

        指的是数据产生的时间或是说是数据发生的时间。

在Flink中有三种时间分别是:

                Event Time:事件时间,数据产生的时间,可以反应数据真实发生的时间

                Infestion Time:事件接收时间

                Processing Time:事件处理时间

为什么会提出事件时间这个概念?

        因为当使用Processing Time(事件的处理时间)来对数据进行处理,此时数据可能会乱序,没有办法还原数据本身的时间顺序,这种情况在Flink中会可能导致数据丢失,如果使用事件时间,它会根据事件真实发生的时间对数据排序,就不会出现数据乱序的情况。

总结来说,数据产生的时间就是事件时间,现实中实时的时间就是事件的处理时间

2、Processing Time 事件处理时间

处理时间是接收数据过后对数据操作的时间。处理时间的会按照实时的时间触发。

public class Demo03ProcessingTime {public static void main(String[] args)  throws Exception{/*** 数据处理时间:一般会结合窗口使用,一般值的是接受数据后对数据操作的时间* 需求:每过5秒中统计15秒内的单词的数量*///构建Flink的环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//使用socket模拟实时的操作DataStreamSource<String> wordDS = env.socketTextStream("master", 8888);//将接受的数据的转换成kv的格式SingleOutputStreamOperator<Tuple2<String, Integer>> kvDS = wordDS.map(word -> Tuple2.of(word, 1), Types.TUPLE(Types.STRING,Types.INT));//按照单词进行分组KeyedStream<Tuple2<String, Integer>, String> keyByDS = kvDS.keyBy(key -> key.f0);//划分窗口,窗口的大小是10秒钟,滑动的时间是5秒钟WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowDS = keyByDS .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)));//对统计的单词进行求和SingleOutputStreamOperator<Tuple2<String, Integer>> countDS = windowDS.sum(1);countDS.print();//启动Flinkenv.execute();}
}
3、事件时间:

数据产生的时间就是事件时间,不过在使用的时候使用的是时间戳。需要注意的是数据的时间与现实的时间是不一致的。

在使用事件时间的时候需要注意的是打入数据的数据时间是需要按照时间的顺序打入,否则数据就会丢失(也可以不按照顺序打入,后面有解决办法)

java,1699035731000
java,1699035732000
java,1699035735000
java,1699035733000
java,1699035736000
java,1699035737000
java,1699035740000例如上述:数据总共有两个部分组成,前面是单词,后面的是单词数据产生的时间戳


public class Demo04EventTime {public static void main(String[] args) throws Exception{/*** 需求:统计5秒内的单词的数量,使用的是事件时间滚动窗口* 触发的条件是事件时间5秒*///构建flink的环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//需要并行度改成一env.setParallelism(1);//使用socket模拟实时的环境DataStreamSource<String> lineDS = env.socketTextStream("master", 8888);/*** java,1699035731000* java,1699035732000* java,1699035735000* java,1699035733000* java,1699035736000* java,1699035737000* java,1699035740000*///此时的数据的格式并不是某一个单词,需要告诉flink哪一个是事件时间//首先对数据进行格式处理SingleOutputStreamOperator<Tuple2<String, Long>> kvDS = lineDS.map(line -> {String[] split = line.split(",");String word = split[0];
//            String time = split[1];long time1 = Long.parseLong(split[1]);return Tuple2.of(word, time1);}, Types.TUPLE(Types.STRING, Types.LONG));//告诉Flink,哪一个是事件的时间SingleOutputStreamOperator<Tuple2<String, Long>> assDS = kvDS.assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))//指定事件时间.withTimestampAssigner((kv, ts) -> kv.f1));//统计5秒钟的单词的数量DataStream<Tuple2<String, Integer>>keyByDS = assDS.map(kv -> Tuple2.of(kv.f0, 1),Types.TUPLE(Types.STRING,Types.INT));//按照单词进行分组KeyedStream<Tuple2<String, Integer>, String> keyByDS1= keyByDS.keyBy(kv -> kv.f0);//开窗WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowDS = keyByDS1.window(TumblingEventTimeWindows.of(Time.seconds(5)));//对单词的数量进行统计SingleOutputStreamOperator<Tuple2<String, Integer>> countDS = windowDS.sum(1);//打印数据countDS.print();//执行Flink的环境env.execute();}
}
        1、基于事件时间来说,触发窗口的条件:
    1、水位线需要大于等于窗口的结束时间2、窗口里面要存在数据3、窗口的划分时间是从1970年1月1日0时0分0秒开始的,按照窗口的大小轮替
4、水位线(watermark):默认是等于最新的一条数据的时间戳

5、在使用事件时间的时候需要注意的是打入数据的数据时间是需要按照时间的顺序打入,否则数据就会丢失(也可以不按照顺序打入,后面有解决办法)
        解决方法:将水位线向后推移

假设一个时间窗口是5秒,如果将此时的水位线向后推移5秒,假设4进入的时候,此时的水位线就变成-3,但是此时就不满足触发窗口的条件,此时假设遗漏的数据是3,此时的水位线依旧是小于窗口的时间,依旧不会触发窗口。

但是不能完全的保证数据不丢失,推移的时间越久,对于Flink的延迟就会越大。

        1、在Flink中是默认使用的是单调递增的时间戳分配器:在没有乱序情况下,默认水位线是等于最新的一条数据的时间戳
  //1、需要告诉flink哪一个字段是时间字段//设置时间字段和水位线DataStream<Tuple2<String, Long>> assDS = wordAndTsDS.assignTimestampsAndWatermarks(WatermarkStrategy//1、指定水位线等于时间最新一条数据的时间戳,数据不存在乱序的时候使用,如果数据乱序,可能会丢失数据.<Tuple2<String, Long>>forMonotonousTimestamps()//指定时间字段.withTimestampAssigner((kv, ts) -> kv.f1));
        2、数据之间存在最大固定延迟的时间戳分配器:在乱序的情况下,就水位线先后推移固定的时间(是以最新的一条数据的时间戳为标准的)
  //1、需要告诉flink哪一个字段是时间字段//设置时间字段和水位线DataStream<Tuple2<String, Long>> assDS = wordAndTsDS.assignTimestampsAndWatermarks(WatermarkStrategy//1、水位线生成方式:最新一条数据的时间戳减去5秒,会导致计算延迟触发.<Tuple2<String,Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))//指定时间字段.withTimestampAssigner((kv, ts) -> kv.f1));
6、水位线的生成:

上图表示的是以Flink的流程图,图中总共有两个并行度,每一个Task上面都带着任务的时间,在Flink中,会将任务的时间向后传递,当途中上游map(1)将任务时间传递给下游window(1)时,下面的上游map(2)也会任务时间传递给下游window(1)(上游的任务是并行的),此时下游window(1)就会产生两个任务时间,此时就会选择时间最小的时间的作为水位线。因为当选择时间大的作为水位线,那么对于时间较小的数据可能会丢失。

        1、水位线对齐:

                因为上游的任务是并行执行的,指的时对于上游的所有的Task的水位线都需要逐步的向后推移。

相关文章:

Flink -- 事件时间 Watermark

1、事件时间&#xff1a; 指的是数据产生的时间或是说是数据发生的时间。 在Flink中有三种时间分别是&#xff1a; Event Time&#xff1a;事件时间&#xff0c;数据产生的时间&#xff0c;可以反应数据真实发生的时间 Infestion Time&#xff1a;事件接收时间 Processing Tim…...

Django框架简介

文章目录 Django框架介绍MVC与MVT模型MVCMTV 版本问题运行django注意事项 Django的下载与基本命令下载Django方式一&#xff1a;在命令界面使用pip安装方式二&#xff1a;使用pycharm安装 Django的基础命令命令行操作pycharm操作 Django项目命令行操作与Pycharm操作的区别应用D…...

把wpf的窗体保存为png图片

昨晚在stack overflow刷问题时看到有这个问题&#xff0c;今天早上刚好来尝试学习一下 stack overflow的链接如下&#xff1a; c# - How to render a WPF UserControl to a bitmap without creating a window - Stack Overflow 测试步骤如下&#xff1a; 1 新建.net frame…...

2023NOIP A层联测28-大眼鸹猫

给你两个长度为 n n n 的序列 a , b a,b a,b&#xff0c;这两个序列都是单调不降的。 你可以对 a a a 进行不超过 m m m 次操作&#xff0c;每次操作你可以选择一个 i i i 满足 1 ≤ i ≤ n 1\le i\le n 1≤i≤n&#xff0c;然后选择一个整数&#xff08;可以是负数&…...

电机应用-直流有刷电机

目录 直流有刷电机 工作原理 直流有刷减速电机的重要参数 电路原理与分析 驱动芯片分析 L298N驱动芯片 直流有刷减速电机控制实现 控制速度原理 硬件设计 L298N 野火直流有刷电机驱动板-MOS管搭建板 软件设计1&#xff1a;两个直流有刷减速电机按键控制 开发设计 …...

BIM、建筑机器人、隧道工程施工关键技术

一、BIM简介 &#xff08;一&#xff09;BIM概念 BIM&#xff08;Building Information Modeling&#xff09;&#xff0c;建筑信息模型。该技术通过数字化手段&#xff0c;在计算机中建立虚拟建筑&#xff0c;该虚拟建筑提供从单一到完整、包含逻辑关系的建筑信息库。信息库…...

快速了解什么是跳跃表(skip list)

什么是跳跃表&#xff08;skip list&#xff09; 跳跃表&#xff08;Skip List&#xff09;是一种概率性的数据结构&#xff0c;它通过在多层链表的基础上添加“快速通道”来提高搜索效率。跳跃表的效率可以与平衡树相媲美&#xff0c;即在平均和最坏的情况下&#xff0c;查找…...

【Node.js入门】1.1Node.js 简介

Node.js入门之—1.1Node.js 简介 文章目录 Node.js入门之—1.1Node.js 简介什么是 Node.js错误说法 Node.js 的特点跨平台三方类库自带http服务器非阻塞I/O事件驱动单线程 Node.js 的应用场合适合用Node.js的场合不适合用Node.js的场合弥补Node.js不足的解决方案 什么是 Node.j…...

数据库 高阶语句

目录 数据库 高阶语句 使用select 语句&#xff0c;用order by来对进行排序 区间判断查询和去重查询 如何对结果进行分组查询group by语句 limit 限制输出的结果记录&#xff0c;查看表中的指定行 通配符 设置别名&#xff1a;alias 简写就是 as 使用select 语句&#x…...

jenkins Java heap space

jenkins Java heap space&#xff0c;是内存不够。 两个解决方案&#xff1a; 一&#xff0c;修改配置文件 windows系统中&#xff0c;找到Jenkins的安装路径&#xff0c; 修改jenkins.xml 将 -Xmx256m 改为 -Xmx1024m 或者更大 重启jenkins服务。 二&#xff0c;jenkins增…...

OpenCV校准棋盘集合

棋盘格可以与相机校准工具一起使用&#xff0c;例如ROS的camera_calibration包。您可以通过单击下面的任何链接免费下载 PDF 格式的各种棋盘&#xff0c;没有水印或广告。此外&#xff0c;还添加了基于 JavaScript 的棋盘生成器&#xff0c;允许您生成自定义尺寸。 提示&#…...

使用git将本地项目推送到远程仓库github

总结&#xff1a;本地项目通过git上传到github 1)、在本地创建一个版本库&#xff08;即文件夹&#xff09;&#xff0c;通过 git init 把它变成Git仓库&#xff1b; 2)、把项目复制到这个文件夹里面&#xff0c;再通过 git add . 把项目添加到仓库&#xff1b; 3)、再通过 gi…...

Mybatis-Plus使用Wrapper自定义SQL

文章目录 准备工作Mybatis-Plus使用Wrapper自定义SQL注意事项目录结构如下所示domain层Controller层Service层ServiceImplMapper层UserMapper.xml 结果如下所示&#xff1a;单表查询条件构造器单表查询&#xff0c;Mybatis-Plus使用Wrapper自定义SQL联表查询不用&#xff0c;My…...

仿mudou库one thread one loop式并发服务器

目录 1.实现目标 2.HTTP服务器 实现高性能服务器-Reactor模型 模块划分 SERVER模块&#xff1a; HTTP协议模块&#xff1a; 3.项目中的子功能 秒级定时任务实现 时间轮实现 正则库的简单使用 通⽤类型any类型的实现 4.SERVER服务器实现 日志宏的封装 缓冲区Buffer…...

二十三种设计模式全面解析-组合模式与装饰器模式的结合:实现动态功能扩展

在前文中&#xff0c;我们介绍了组合模式的基本原理和应用&#xff0c;以及它在构建对象结构中的价值和潜力。然而&#xff0c;组合模式的魅力远不止于此。在本文中&#xff0c;我们将继续探索组合模式的进阶应用&#xff0c;并展示它与其他设计模式的结合使用&#xff0c;以构…...

智慧城市建设解决方案分享【完整】

文章目录 第1章 前言第2章 智慧城市建设的背景2.1 智慧城市的发展现状2.2 智慧城市的发展趋势 第3章 智慧城市“十二五”规划要点3.1 国民经济和社会发展“十二五”规划要点3.2 “十二五”信息化发展规划要点 第4章 大数据&#xff1a;智慧城市的智慧引擎4.1 大数据技术—智慧城…...

unity - Blend Shape - 变形器 - 实践

文章目录 目的Blend Shape 逐顶点 多个混合思路Blender3Ds maxUnity 中使用Project 目的 拾遗&#xff0c;备份 Blend Shape 逐顶点 多个混合思路 blend shape 基于&#xff1a; vertex number, vertex sn 相同&#xff0c;才能正常混合、播放 也就是 vertex buffer 的顶点数…...

asp.net core mvc之路由

一、默认路由 &#xff08;Startup.cs文件&#xff09; routes.MapRoute(name: "default",template: "{controllerHome}/{actionIndex}/{id?}" ); 默认访问可以匹配到 https://localhost:44302/home/index/1 https://localhost:44302/home/index https:…...

前端设计模式之【访问者模式】

文章目录 前言介绍实现优缺点应用场景后言 前言 hello world欢迎来到前端的新世界 &#x1f61c;当前文章系列专栏&#xff1a;前端设计模式 &#x1f431;‍&#x1f453;博主在前端领域还有很多知识和技术需要掌握&#xff0c;正在不断努力填补技术短板。(如果出现错误&#…...

通过docker-compose部署elk日志系统,并使用springboot整合

ELK是一种强大的分布式日志管理解决方案&#xff0c;它由三个核心组件组成&#xff1a; Elasticsearch&#xff1a;作为分布式搜索和分析引擎&#xff0c;Elasticsearch能够快速地存储、搜索和分析大量的日志数据&#xff0c;帮助用户轻松地找到所需的信息。 Logstash&#xf…...

SpringBoot-17-MyBatis动态SQL标签之常用标签

文章目录 1 代码1.1 实体User.java1.2 接口UserMapper.java1.3 映射UserMapper.xml1.3.1 标签if1.3.2 标签if和where1.3.3 标签choose和when和otherwise1.4 UserController.java2 常用动态SQL标签2.1 标签set2.1.1 UserMapper.java2.1.2 UserMapper.xml2.1.3 UserController.ja…...

后进先出(LIFO)详解

LIFO 是 Last In, First Out 的缩写&#xff0c;中文译为后进先出。这是一种数据结构的工作原则&#xff0c;类似于一摞盘子或一叠书本&#xff1a; 最后放进去的元素最先出来 -想象往筒状容器里放盘子&#xff1a; &#xff08;1&#xff09;你放进的最后一个盘子&#xff08…...

深度学习在微纳光子学中的应用

深度学习在微纳光子学中的主要应用方向 深度学习与微纳光子学的结合主要集中在以下几个方向&#xff1a; 逆向设计 通过神经网络快速预测微纳结构的光学响应&#xff0c;替代传统耗时的数值模拟方法。例如设计超表面、光子晶体等结构。 特征提取与优化 从复杂的光学数据中自…...

超短脉冲激光自聚焦效应

前言与目录 强激光引起自聚焦效应机理 超短脉冲激光在脆性材料内部加工时引起的自聚焦效应&#xff0c;这是一种非线性光学现象&#xff0c;主要涉及光学克尔效应和材料的非线性光学特性。 自聚焦效应可以产生局部的强光场&#xff0c;对材料产生非线性响应&#xff0c;可能…...

RocketMQ延迟消息机制

两种延迟消息 RocketMQ中提供了两种延迟消息机制 指定固定的延迟级别 通过在Message中设定一个MessageDelayLevel参数&#xff0c;对应18个预设的延迟级别指定时间点的延迟级别 通过在Message中设定一个DeliverTimeMS指定一个Long类型表示的具体时间点。到了时间点后&#xf…...

label-studio的使用教程(导入本地路径)

文章目录 1. 准备环境2. 脚本启动2.1 Windows2.2 Linux 3. 安装label-studio机器学习后端3.1 pip安装(推荐)3.2 GitHub仓库安装 4. 后端配置4.1 yolo环境4.2 引入后端模型4.3 修改脚本4.4 启动后端 5. 标注工程5.1 创建工程5.2 配置图片路径5.3 配置工程类型标签5.4 配置模型5.…...

Zustand 状态管理库:极简而强大的解决方案

Zustand 是一个轻量级、快速和可扩展的状态管理库&#xff0c;特别适合 React 应用。它以简洁的 API 和高效的性能解决了 Redux 等状态管理方案中的繁琐问题。 核心优势对比 基本使用指南 1. 创建 Store // store.js import create from zustandconst useStore create((set)…...

逻辑回归:给不确定性划界的分类大师

想象你是一名医生。面对患者的检查报告&#xff08;肿瘤大小、血液指标&#xff09;&#xff0c;你需要做出一个**决定性判断**&#xff1a;恶性还是良性&#xff1f;这种“非黑即白”的抉择&#xff0c;正是**逻辑回归&#xff08;Logistic Regression&#xff09;** 的战场&a…...

vscode(仍待补充)

写于2025 6.9 主包将加入vscode这个更权威的圈子 vscode的基本使用 侧边栏 vscode还能连接ssh&#xff1f; debug时使用的launch文件 1.task.json {"tasks": [{"type": "cppbuild","label": "C/C: gcc.exe 生成活动文件"…...

第一篇:Agent2Agent (A2A) 协议——协作式人工智能的黎明

AI 领域的快速发展正在催生一个新时代&#xff0c;智能代理&#xff08;agents&#xff09;不再是孤立的个体&#xff0c;而是能够像一个数字团队一样协作。然而&#xff0c;当前 AI 生态系统的碎片化阻碍了这一愿景的实现&#xff0c;导致了“AI 巴别塔问题”——不同代理之间…...