当前位置: 首页 > news >正文

Flink -- 事件时间 Watermark

1、事件时间:

        指的是数据产生的时间或是说是数据发生的时间。

在Flink中有三种时间分别是:

                Event Time:事件时间,数据产生的时间,可以反应数据真实发生的时间

                Infestion Time:事件接收时间

                Processing Time:事件处理时间

为什么会提出事件时间这个概念?

        因为当使用Processing Time(事件的处理时间)来对数据进行处理,此时数据可能会乱序,没有办法还原数据本身的时间顺序,这种情况在Flink中会可能导致数据丢失,如果使用事件时间,它会根据事件真实发生的时间对数据排序,就不会出现数据乱序的情况。

总结来说,数据产生的时间就是事件时间,现实中实时的时间就是事件的处理时间

2、Processing Time 事件处理时间

处理时间是接收数据过后对数据操作的时间。处理时间的会按照实时的时间触发。

public class Demo03ProcessingTime {public static void main(String[] args)  throws Exception{/*** 数据处理时间:一般会结合窗口使用,一般值的是接受数据后对数据操作的时间* 需求:每过5秒中统计15秒内的单词的数量*///构建Flink的环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//使用socket模拟实时的操作DataStreamSource<String> wordDS = env.socketTextStream("master", 8888);//将接受的数据的转换成kv的格式SingleOutputStreamOperator<Tuple2<String, Integer>> kvDS = wordDS.map(word -> Tuple2.of(word, 1), Types.TUPLE(Types.STRING,Types.INT));//按照单词进行分组KeyedStream<Tuple2<String, Integer>, String> keyByDS = kvDS.keyBy(key -> key.f0);//划分窗口,窗口的大小是10秒钟,滑动的时间是5秒钟WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowDS = keyByDS .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)));//对统计的单词进行求和SingleOutputStreamOperator<Tuple2<String, Integer>> countDS = windowDS.sum(1);countDS.print();//启动Flinkenv.execute();}
}
3、事件时间:

数据产生的时间就是事件时间,不过在使用的时候使用的是时间戳。需要注意的是数据的时间与现实的时间是不一致的。

在使用事件时间的时候需要注意的是打入数据的数据时间是需要按照时间的顺序打入,否则数据就会丢失(也可以不按照顺序打入,后面有解决办法)

java,1699035731000
java,1699035732000
java,1699035735000
java,1699035733000
java,1699035736000
java,1699035737000
java,1699035740000例如上述:数据总共有两个部分组成,前面是单词,后面的是单词数据产生的时间戳


public class Demo04EventTime {public static void main(String[] args) throws Exception{/*** 需求:统计5秒内的单词的数量,使用的是事件时间滚动窗口* 触发的条件是事件时间5秒*///构建flink的环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//需要并行度改成一env.setParallelism(1);//使用socket模拟实时的环境DataStreamSource<String> lineDS = env.socketTextStream("master", 8888);/*** java,1699035731000* java,1699035732000* java,1699035735000* java,1699035733000* java,1699035736000* java,1699035737000* java,1699035740000*///此时的数据的格式并不是某一个单词,需要告诉flink哪一个是事件时间//首先对数据进行格式处理SingleOutputStreamOperator<Tuple2<String, Long>> kvDS = lineDS.map(line -> {String[] split = line.split(",");String word = split[0];
//            String time = split[1];long time1 = Long.parseLong(split[1]);return Tuple2.of(word, time1);}, Types.TUPLE(Types.STRING, Types.LONG));//告诉Flink,哪一个是事件的时间SingleOutputStreamOperator<Tuple2<String, Long>> assDS = kvDS.assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))//指定事件时间.withTimestampAssigner((kv, ts) -> kv.f1));//统计5秒钟的单词的数量DataStream<Tuple2<String, Integer>>keyByDS = assDS.map(kv -> Tuple2.of(kv.f0, 1),Types.TUPLE(Types.STRING,Types.INT));//按照单词进行分组KeyedStream<Tuple2<String, Integer>, String> keyByDS1= keyByDS.keyBy(kv -> kv.f0);//开窗WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowDS = keyByDS1.window(TumblingEventTimeWindows.of(Time.seconds(5)));//对单词的数量进行统计SingleOutputStreamOperator<Tuple2<String, Integer>> countDS = windowDS.sum(1);//打印数据countDS.print();//执行Flink的环境env.execute();}
}
        1、基于事件时间来说,触发窗口的条件:
    1、水位线需要大于等于窗口的结束时间2、窗口里面要存在数据3、窗口的划分时间是从1970年1月1日0时0分0秒开始的,按照窗口的大小轮替
4、水位线(watermark):默认是等于最新的一条数据的时间戳

5、在使用事件时间的时候需要注意的是打入数据的数据时间是需要按照时间的顺序打入,否则数据就会丢失(也可以不按照顺序打入,后面有解决办法)
        解决方法:将水位线向后推移

假设一个时间窗口是5秒,如果将此时的水位线向后推移5秒,假设4进入的时候,此时的水位线就变成-3,但是此时就不满足触发窗口的条件,此时假设遗漏的数据是3,此时的水位线依旧是小于窗口的时间,依旧不会触发窗口。

但是不能完全的保证数据不丢失,推移的时间越久,对于Flink的延迟就会越大。

        1、在Flink中是默认使用的是单调递增的时间戳分配器:在没有乱序情况下,默认水位线是等于最新的一条数据的时间戳
  //1、需要告诉flink哪一个字段是时间字段//设置时间字段和水位线DataStream<Tuple2<String, Long>> assDS = wordAndTsDS.assignTimestampsAndWatermarks(WatermarkStrategy//1、指定水位线等于时间最新一条数据的时间戳,数据不存在乱序的时候使用,如果数据乱序,可能会丢失数据.<Tuple2<String, Long>>forMonotonousTimestamps()//指定时间字段.withTimestampAssigner((kv, ts) -> kv.f1));
        2、数据之间存在最大固定延迟的时间戳分配器:在乱序的情况下,就水位线先后推移固定的时间(是以最新的一条数据的时间戳为标准的)
  //1、需要告诉flink哪一个字段是时间字段//设置时间字段和水位线DataStream<Tuple2<String, Long>> assDS = wordAndTsDS.assignTimestampsAndWatermarks(WatermarkStrategy//1、水位线生成方式:最新一条数据的时间戳减去5秒,会导致计算延迟触发.<Tuple2<String,Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))//指定时间字段.withTimestampAssigner((kv, ts) -> kv.f1));
6、水位线的生成:

上图表示的是以Flink的流程图,图中总共有两个并行度,每一个Task上面都带着任务的时间,在Flink中,会将任务的时间向后传递,当途中上游map(1)将任务时间传递给下游window(1)时,下面的上游map(2)也会任务时间传递给下游window(1)(上游的任务是并行的),此时下游window(1)就会产生两个任务时间,此时就会选择时间最小的时间的作为水位线。因为当选择时间大的作为水位线,那么对于时间较小的数据可能会丢失。

        1、水位线对齐:

                因为上游的任务是并行执行的,指的时对于上游的所有的Task的水位线都需要逐步的向后推移。

相关文章:

Flink -- 事件时间 Watermark

1、事件时间&#xff1a; 指的是数据产生的时间或是说是数据发生的时间。 在Flink中有三种时间分别是&#xff1a; Event Time&#xff1a;事件时间&#xff0c;数据产生的时间&#xff0c;可以反应数据真实发生的时间 Infestion Time&#xff1a;事件接收时间 Processing Tim…...

Django框架简介

文章目录 Django框架介绍MVC与MVT模型MVCMTV 版本问题运行django注意事项 Django的下载与基本命令下载Django方式一&#xff1a;在命令界面使用pip安装方式二&#xff1a;使用pycharm安装 Django的基础命令命令行操作pycharm操作 Django项目命令行操作与Pycharm操作的区别应用D…...

把wpf的窗体保存为png图片

昨晚在stack overflow刷问题时看到有这个问题&#xff0c;今天早上刚好来尝试学习一下 stack overflow的链接如下&#xff1a; c# - How to render a WPF UserControl to a bitmap without creating a window - Stack Overflow 测试步骤如下&#xff1a; 1 新建.net frame…...

2023NOIP A层联测28-大眼鸹猫

给你两个长度为 n n n 的序列 a , b a,b a,b&#xff0c;这两个序列都是单调不降的。 你可以对 a a a 进行不超过 m m m 次操作&#xff0c;每次操作你可以选择一个 i i i 满足 1 ≤ i ≤ n 1\le i\le n 1≤i≤n&#xff0c;然后选择一个整数&#xff08;可以是负数&…...

电机应用-直流有刷电机

目录 直流有刷电机 工作原理 直流有刷减速电机的重要参数 电路原理与分析 驱动芯片分析 L298N驱动芯片 直流有刷减速电机控制实现 控制速度原理 硬件设计 L298N 野火直流有刷电机驱动板-MOS管搭建板 软件设计1&#xff1a;两个直流有刷减速电机按键控制 开发设计 …...

BIM、建筑机器人、隧道工程施工关键技术

一、BIM简介 &#xff08;一&#xff09;BIM概念 BIM&#xff08;Building Information Modeling&#xff09;&#xff0c;建筑信息模型。该技术通过数字化手段&#xff0c;在计算机中建立虚拟建筑&#xff0c;该虚拟建筑提供从单一到完整、包含逻辑关系的建筑信息库。信息库…...

快速了解什么是跳跃表(skip list)

什么是跳跃表&#xff08;skip list&#xff09; 跳跃表&#xff08;Skip List&#xff09;是一种概率性的数据结构&#xff0c;它通过在多层链表的基础上添加“快速通道”来提高搜索效率。跳跃表的效率可以与平衡树相媲美&#xff0c;即在平均和最坏的情况下&#xff0c;查找…...

【Node.js入门】1.1Node.js 简介

Node.js入门之—1.1Node.js 简介 文章目录 Node.js入门之—1.1Node.js 简介什么是 Node.js错误说法 Node.js 的特点跨平台三方类库自带http服务器非阻塞I/O事件驱动单线程 Node.js 的应用场合适合用Node.js的场合不适合用Node.js的场合弥补Node.js不足的解决方案 什么是 Node.j…...

数据库 高阶语句

目录 数据库 高阶语句 使用select 语句&#xff0c;用order by来对进行排序 区间判断查询和去重查询 如何对结果进行分组查询group by语句 limit 限制输出的结果记录&#xff0c;查看表中的指定行 通配符 设置别名&#xff1a;alias 简写就是 as 使用select 语句&#x…...

jenkins Java heap space

jenkins Java heap space&#xff0c;是内存不够。 两个解决方案&#xff1a; 一&#xff0c;修改配置文件 windows系统中&#xff0c;找到Jenkins的安装路径&#xff0c; 修改jenkins.xml 将 -Xmx256m 改为 -Xmx1024m 或者更大 重启jenkins服务。 二&#xff0c;jenkins增…...

OpenCV校准棋盘集合

棋盘格可以与相机校准工具一起使用&#xff0c;例如ROS的camera_calibration包。您可以通过单击下面的任何链接免费下载 PDF 格式的各种棋盘&#xff0c;没有水印或广告。此外&#xff0c;还添加了基于 JavaScript 的棋盘生成器&#xff0c;允许您生成自定义尺寸。 提示&#…...

使用git将本地项目推送到远程仓库github

总结&#xff1a;本地项目通过git上传到github 1)、在本地创建一个版本库&#xff08;即文件夹&#xff09;&#xff0c;通过 git init 把它变成Git仓库&#xff1b; 2)、把项目复制到这个文件夹里面&#xff0c;再通过 git add . 把项目添加到仓库&#xff1b; 3)、再通过 gi…...

Mybatis-Plus使用Wrapper自定义SQL

文章目录 准备工作Mybatis-Plus使用Wrapper自定义SQL注意事项目录结构如下所示domain层Controller层Service层ServiceImplMapper层UserMapper.xml 结果如下所示&#xff1a;单表查询条件构造器单表查询&#xff0c;Mybatis-Plus使用Wrapper自定义SQL联表查询不用&#xff0c;My…...

仿mudou库one thread one loop式并发服务器

目录 1.实现目标 2.HTTP服务器 实现高性能服务器-Reactor模型 模块划分 SERVER模块&#xff1a; HTTP协议模块&#xff1a; 3.项目中的子功能 秒级定时任务实现 时间轮实现 正则库的简单使用 通⽤类型any类型的实现 4.SERVER服务器实现 日志宏的封装 缓冲区Buffer…...

二十三种设计模式全面解析-组合模式与装饰器模式的结合:实现动态功能扩展

在前文中&#xff0c;我们介绍了组合模式的基本原理和应用&#xff0c;以及它在构建对象结构中的价值和潜力。然而&#xff0c;组合模式的魅力远不止于此。在本文中&#xff0c;我们将继续探索组合模式的进阶应用&#xff0c;并展示它与其他设计模式的结合使用&#xff0c;以构…...

智慧城市建设解决方案分享【完整】

文章目录 第1章 前言第2章 智慧城市建设的背景2.1 智慧城市的发展现状2.2 智慧城市的发展趋势 第3章 智慧城市“十二五”规划要点3.1 国民经济和社会发展“十二五”规划要点3.2 “十二五”信息化发展规划要点 第4章 大数据&#xff1a;智慧城市的智慧引擎4.1 大数据技术—智慧城…...

unity - Blend Shape - 变形器 - 实践

文章目录 目的Blend Shape 逐顶点 多个混合思路Blender3Ds maxUnity 中使用Project 目的 拾遗&#xff0c;备份 Blend Shape 逐顶点 多个混合思路 blend shape 基于&#xff1a; vertex number, vertex sn 相同&#xff0c;才能正常混合、播放 也就是 vertex buffer 的顶点数…...

asp.net core mvc之路由

一、默认路由 &#xff08;Startup.cs文件&#xff09; routes.MapRoute(name: "default",template: "{controllerHome}/{actionIndex}/{id?}" ); 默认访问可以匹配到 https://localhost:44302/home/index/1 https://localhost:44302/home/index https:…...

前端设计模式之【访问者模式】

文章目录 前言介绍实现优缺点应用场景后言 前言 hello world欢迎来到前端的新世界 &#x1f61c;当前文章系列专栏&#xff1a;前端设计模式 &#x1f431;‍&#x1f453;博主在前端领域还有很多知识和技术需要掌握&#xff0c;正在不断努力填补技术短板。(如果出现错误&#…...

通过docker-compose部署elk日志系统,并使用springboot整合

ELK是一种强大的分布式日志管理解决方案&#xff0c;它由三个核心组件组成&#xff1a; Elasticsearch&#xff1a;作为分布式搜索和分析引擎&#xff0c;Elasticsearch能够快速地存储、搜索和分析大量的日志数据&#xff0c;帮助用户轻松地找到所需的信息。 Logstash&#xf…...

2025年能源电力系统与流体力学国际会议 (EPSFD 2025)

2025年能源电力系统与流体力学国际会议&#xff08;EPSFD 2025&#xff09;将于本年度在美丽的杭州盛大召开。作为全球能源、电力系统以及流体力学领域的顶级盛会&#xff0c;EPSFD 2025旨在为来自世界各地的科学家、工程师和研究人员提供一个展示最新研究成果、分享实践经验及…...

【磁盘】每天掌握一个Linux命令 - iostat

目录 【磁盘】每天掌握一个Linux命令 - iostat工具概述安装方式核心功能基础用法进阶操作实战案例面试题场景生产场景 注意事项 【磁盘】每天掌握一个Linux命令 - iostat 工具概述 iostat&#xff08;I/O Statistics&#xff09;是Linux系统下用于监视系统输入输出设备和CPU使…...

vue3 定时器-定义全局方法 vue+ts

1.创建ts文件 路径&#xff1a;src/utils/timer.ts 完整代码&#xff1a; import { onUnmounted } from vuetype TimerCallback (...args: any[]) > voidexport function useGlobalTimer() {const timers: Map<number, NodeJS.Timeout> new Map()// 创建定时器con…...

DeepSeek 技术赋能无人农场协同作业:用 AI 重构农田管理 “神经网”

目录 一、引言二、DeepSeek 技术大揭秘2.1 核心架构解析2.2 关键技术剖析 三、智能农业无人农场协同作业现状3.1 发展现状概述3.2 协同作业模式介绍 四、DeepSeek 的 “农场奇妙游”4.1 数据处理与分析4.2 作物生长监测与预测4.3 病虫害防治4.4 农机协同作业调度 五、实际案例大…...

OPENCV形态学基础之二腐蚀

一.腐蚀的原理 (图1) 数学表达式&#xff1a;dst(x,y) erode(src(x,y)) min(x,y)src(xx,yy) 腐蚀也是图像形态学的基本功能之一&#xff0c;腐蚀跟膨胀属于反向操作&#xff0c;膨胀是把图像图像变大&#xff0c;而腐蚀就是把图像变小。腐蚀后的图像变小变暗淡。 腐蚀…...

【7色560页】职场可视化逻辑图高级数据分析PPT模版

7种色调职场工作汇报PPT&#xff0c;橙蓝、黑红、红蓝、蓝橙灰、浅蓝、浅绿、深蓝七种色调模版 【7色560页】职场可视化逻辑图高级数据分析PPT模版&#xff1a;职场可视化逻辑图分析PPT模版https://pan.quark.cn/s/78aeabbd92d1...

【电力电子】基于STM32F103C8T6单片机双极性SPWM逆变(硬件篇)

本项目是基于 STM32F103C8T6 微控制器的 SPWM(正弦脉宽调制)电源模块,能够生成可调频率和幅值的正弦波交流电源输出。该项目适用于逆变器、UPS电源、变频器等应用场景。 供电电源 输入电压采集 上图为本设计的电源电路,图中 D1 为二极管, 其目的是防止正负极电源反接, …...

Java数值运算常见陷阱与规避方法

整数除法中的舍入问题 问题现象 当开发者预期进行浮点除法却误用整数除法时,会出现小数部分被截断的情况。典型错误模式如下: void process(int value) {double half = value / 2; // 整数除法导致截断// 使用half变量 }此时...

Vue ③-生命周期 || 脚手架

生命周期 思考&#xff1a;什么时候可以发送初始化渲染请求&#xff1f;&#xff08;越早越好&#xff09; 什么时候可以开始操作dom&#xff1f;&#xff08;至少dom得渲染出来&#xff09; Vue生命周期&#xff1a; 一个Vue实例从 创建 到 销毁 的整个过程。 生命周期四个…...

在golang中如何将已安装的依赖降级处理,比如:将 go-ansible/v2@v2.2.0 更换为 go-ansible/@v1.1.7

在 Go 项目中降级 go-ansible 从 v2.2.0 到 v1.1.7 具体步骤&#xff1a; 第一步&#xff1a; 修改 go.mod 文件 // 原 v2 版本声明 require github.com/apenella/go-ansible/v2 v2.2.0 替换为&#xff1a; // 改为 v…...