大数据Flink(一百一十八):SQL水印操作(Watermark)
文章目录
SQL水印操作(Watermark)
一、为什么要有WaterMark
二、Watermark解决的问题
三、代码演示
SQL水印操作(Watermark)
一、为什么要有WaterMark
当 flink 以 EventTime 模式处理流数据时,它会根据数据里的时间戳来处理基于时间的算子。但是由于网络、分布式等原因,会导致数据乱序的情况。如下图所示:
假设在一个5秒的Tumble窗口,有一个EventTime是 11秒的数据,在第16秒时候到来了。图示第11秒的数据,在16秒到来了,如下图:该如何处理迟到数据
二、Watermark解决的问题
上面的问题在于如何将迟来的EventTime 为11的元素正确处理?
当Watermark的时间戳等于Event中携带的EventTime时候,上面场景(Watermark=EventTime)的计算结果如下:
如果想正确处理迟来的数据可以定义Watermark生成策略为 Watermark = EventTime -5s, 如下:
通过watermark来解决,简单来说就是延迟窗口关闭的时间,等一会迟到的数据,窗口关闭不在依据数据的时间,而是到达的watermark的时间。
watermark可以理解为一个特殊的数据,这个数据不参与计算,仅仅是对窗口的触发关闭起作用。
三、代码演示
- 使用Socket模拟接收数据
- 设置WaterMark
- 设置的逻辑:在第一条数据进来时,设置WaterMark为0,指定第一条数据的时间戳后,获取该时间戳与当前 WaterMark的最大值,并将最大值设置为下一条数据的WaterMark,以此类推
- 使用滚动Event Time窗口,将5秒内的同组数据,进行聚合输出
CREATE TABLE watermark_zero (
item STRING,
ts TIMESTAMP(3), -- TIMESTAMP 类型的时间戳
WATERMARK FOR ts AS ts - INTERVAL '0' SECOND
) WITH (
'connector' = 'socket',
'hostname' = '178.23.142.233',
'port' = '9999',
'format' = 'csv'
);SELECT
date_format(TUMBLE_START(ts, INTERVAL '5' SECOND),'yyyy-MM-dd hh:mm:ss.SSS') AS window_start,
date_format(TUMBLE_END(ts, INTERVAL '5' SECOND),'yyyy-MM-dd hh:mm:ss.SSS') AS window_end,
date_format(TUMBLE_ROWTIME(ts, INTERVAL '5' SECOND),'yyyy-MM-dd hh:mm:ss.SSS') as window_rowtime,
item,count(item) as total_item
FROM watermark_zero
GROUP BY TUMBLE(ts, INTERVAL '5' SECOND), item;
若输入第一条数据:hello,2022-03-25 16:39:45
那么,我先假设后续的数据Event Time间隔为1秒,推断一下WaterMark的设定,如下图所示
1.第一条数据的Event Time为1648197585000,那么当前窗口时间为:1648197585000-> 1648197589000,即下图中红色框线
2.第一条数据进来时,这条数据之前的WaterMark为0,当第一条数据已经进入后,指定Event Time位置,并与现在的WaterMark比较,将两者中大的那个值设置为新的WaterMark,那么当前数据的WaterMark为1648197585000
3.第二条数据进来时,前一条数据的WaterMark为1648197585000,第二条数据的Event Time比之前的WaterMark大,于是更新WaterMark,将当前的WaterMark更新为1648197586000,但还没到窗口触发时间,不进行计算
4.后面几个以此类推,直到Event Time为:1648197590000的数据进来的时候,前一条数据的WaterMark为1648197589000,于是更新当前的WaterMark为1648197590000,Flink认为1648197590000之前的数据都已经到达,且达到了窗口的触发条件,开始进行计算
根据上面的推断,启动程序验证一下,向9999端口监听终端输入以下内容:
hello,2022-03-25 16:39:45
hello,2022-03-25 16:39:46
hello,2022-03-25 16:39:47
hello,2022-03-25 16:39:48
hello,2022-03-25 16:39:49
hello,2022-03-25 16:39:50
Flink输出结果:
Rowtime列在经过窗口操作后,其Event Time属性将丢失。可以使用辅助函数TUMBLE_ROWTIME、HOP_ROWTIME或SESSION_ROWTIME,获取窗口中的Rowtime列的最大值max(rowtime)作为时间窗口的Rowtime,其类型是具有Rowtime属性的TIMESTAMP,取值为 window_end - 1 。 例如[00:00, 00:15) 的窗口,返回值为00:14:59.999 。
数据乱序的场景
上面的实例,Event Time是有序,现在来做一下数据乱序的场景模拟启动程序(注意要关闭之前的查询,重新运行查询语句),在监听终端中输入如下数据:
其中,在触发了了第一个窗口计算后,又来了两条迟到数据hello,2022-03-25 16:39:47,hello,2022-03-25 16:39:46
hello,2022-03-25 16:39:45
hello,2022-03-25 16:39:46
hello,2022-03-25 16:39:47
hello,2022-03-25 16:39:48
hello,2022-03-25 16:39:49
hello,2022-03-25 16:39:50
hello,2022-03-25 16:39:47
hello,2022-03-25 16:39:46
hello,2022-03-25 16:39:51
hello,2022-03-25 16:39:52
hello,2022-03-25 16:39:53
hello,2022-03-25 16:39:54
hello,2022-03-25 16:39:55
Flink结果:
从结果中可以看到,在第二个窗口中,那两条迟到数据并没有进行处理,这个就是迟到丢弃。
乱序时间的设置:
为了解决上面的问题,我们允许Flink处理延迟在5秒内的迟到数据
修改最大乱序时间(新建的表仅水印与之前不同)
CREATE TABLE watermark_five (
item STRING,
ts TIMESTAMP(3), -- TIMESTAMP 类型的时间戳
WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
'connector' = 'socket',
'hostname' = '178.23.142.233',
'port' = '9999',
'format' = 'csv'
);SELECT
date_format(TUMBLE_START(ts, INTERVAL '5' SECOND),'yyyy-MM-dd hh:mm:ss.SSS') AS window_start,
date_format(TUMBLE_END(ts, INTERVAL '5' SECOND),'yyyy-MM-dd hh:mm:ss.SSS') AS window_end,
date_format(TUMBLE_ROWTIME(ts, INTERVAL '5' SECOND),'yyyy-MM-dd hh:mm:ss.SSS') as window_rowtime,
item,count(item) as total_item
FROM watermark_five
GROUP BY TUMBLE(ts, INTERVAL '5' SECOND), item;
在监听终端中,输入数据
hello,2022-03-25 16:39:45
hello,2022-03-25 16:39:46
hello,2022-03-25 16:39:47
hello,2022-03-25 16:39:48
hello,2022-03-25 16:39:49
hello,2022-03-25 16:39:50
hello,2022-03-25 16:39:47
hello,2022-03-25 16:39:46
hello,2022-03-25 16:39:51
hello,2022-03-25 16:39:52
hello,2022-03-25 16:39:53
hello,2022-03-25 16:39:54
hello,2022-03-25 16:39:55
Flink输出结果:
可以看到,之前迟到的两条数据在第一个窗口中进行了处理。因为设置了最大允许乱序时间后,WaterMark要比原来低5秒,可以对延迟5秒内的数据进行处理,窗口的触发条件也同样会往后延迟关于延迟时间,请结合业务场景进行设置。
- 📢博客主页:https://lansonli.blog.csdn.net
- 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
- 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
- 📢停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨
相关文章:

大数据Flink(一百一十八):SQL水印操作(Watermark)
文章目录 SQL水印操作(Watermark) 一、为什么要有WaterMark 二、Watermark解决的问题 三、代码演示 SQL水印操作(Watermark) 一、为什么要…...

【QGC】把QGroundControl地面站添加到Ubuntu侧边菜单栏启动
把QGroundControl地面站添加到Ubuntu侧边菜单栏启动 简介准备工作步骤 1: 创建 Desktop Entry 文件步骤 2: 编辑 Desktop Entry 文件步骤 3: 刷新应用程序菜单步骤 4: 将 QGroundControl 固定到侧边栏 环境: Ubuntu :20.04 LTS 简介 QGroundControl 是…...

PostgreSQL配置主从同步
PostgreSQL配置主从同步 1 主、备库安装postgresql软件 su - pg12 cd /home/pg12/resource tar -zxvf postgresql-12.9.tar.gz cd postgresql-12.9/ ./configure --prefix/home/pg12/soft/ make -j 16 && make install2 主、备库配置环境变量 vi ~/.bash_profile…...

基于python+django+vue的鲜花商城系统
作者:计算机学姐 开发技术:SpringBoot、SSM、Vue、MySQL、JSP、ElementUI、Python、小程序等,“文末源码”。 专栏推荐:前后端分离项目源码、SpringBoot项目源码、SSM项目源码 系统展示 【2025最新】基于pythondjangovueMySQL的线…...

李飞飞任CEO,空间智能公司World Labs亮相,全明星阵容曝光
人工智能的下个大方向已经出现,标志性学者决定下场创业。 本周五,一个重磅消息引爆了 AI 圈:斯坦福大学计算机科学家李飞飞正式宣布创办 AI 初创公司 ——World Labs,旨在向人工智能系统传授有关物理现实的深入知识。 李飞飞说道&…...

PyTorch详解-可视化模块
PyTorch详解-可视化模块 Tensorboard 基础与使用启动 TensorBoard访问 TensorBoard使用 TensorBoardSummaryWriter类介绍参数说明常用方法 CNN卷积核与特征图可视化参数说明返回值 混淆矩阵与训练曲线可视化混淆矩阵可视化训练曲线绘制 模型参数打印参数说明输出解释 Tensorboa…...

Bootstrap 警告信息(Alerts)使用介绍
本章将讲解警告(Alerts)以及 Bootstrap 所提供的用于警告的 class。警告(Alerts)向用户提供了一种定义消息样式的方式。它们为典型的用户操作提供了上下文信息反馈。 您可以为警告框添加一个可选的关闭按钮。为了创建一个内联的可…...

uniapp(H5)设置反向代理,设置成功后页面报错
设置反向代理后,页面报错图: 反向代理代码:devServer下面就是配置对应的代理,一般这样就没问题了 "h5": {"router": {"mode": "hash"},"devServer": {"port": 517…...

define、typedef和using的使用
define、typedef 和 using 是 C(以及 C 语言中的 define)中用于定义别名或简化复杂类型的三个关键字,但它们各自有着不同的用途和行为。下面将分别对比这三个关键字: 1. #define 定义方式:#define 是预处理指令&…...

vue element时间选择不能超过今天 时间选中长度不能超过7天
背景: 使用elenmet plus 组件实现时间选择;且日期时间选择不能超过今天;连续选中时间的长度范围不能超过7天 效果展示: 实现思路: 一、使用element组件自带的属性和方法; :disabled-date"disabledDate…...

如何 吧一个 一维数组 切分成相同等分,一维数组作为lstm的输入(三维数据)的数据预处理 collate_fn的应用
要将一个一维数组切分成相同等分,你可以使用 Python 的内置功能或者 NumPy 库(如果你处理的是数值数据)。以下是几种不同的方法: 方法3 pad_sequence 结合dataloader 应该是最佳方案 ### 方法 1: 使用 Python 的内置切片功能 如果…...

Remix 学习 - @remix-run/react 中主要的 hooks
在 remix-run/react 中,有几个常用的 hooks,它们帮助你在 Remix 应用中处理路由、数据加载和其他功能。以下是一些主要的 hooks: useLoaderData: 用于获取从 loader 函数中返回的数据。 通常在组件中调用,以便访问路由加载的数据…...

STL之stack
stack容器 - 先进后出” - stack是堆栈容器,是一种的容器。 - 头文件:#include <stack> stack的push()与pop()方法 stack.push(elem);//往栈头添加元素 stack.pop();//从栈头移除第一个元素 stack<int> stkInt; stkInt.push(1);stkInt…...

如何用3个月零基础入门网络安全?_网络安全零基础怎么学习
前 言 写这篇教程的初衷是很多朋友都想了解如何入门/转行网络安全,实现自己的“黑客梦”。文章的宗旨是: 1.指出一些自学的误区 2.提供客观可行的学习表 3.推荐我认为适合小白学习的资源.大佬绕道哈! →点击获取网络安全资料攻略← 一、自学…...

适合学生党开学买的蓝牙耳机?分享开放式耳机排行榜前十名
学生党开学想买耳机的话,我觉得比较适合入手开放式耳机,因为这类耳机佩戴舒适度高,长时间使用也不会感到不适或疲劳,同时保持耳道干爽透气,更加健康卫生,还能提供自然、开阔的音场,音质表现优秀…...

汽车租赁系统1.0版本
汽车租赁系统1.0版本比较简陋,以后还会有2.0、3.0……就像《我爱发明》里面的一代机器二代机器,三代机器一样,是一个迭代更新的过程(最近比较忙,可能会很久),这个1.0版本很简陋,也请…...

DockerDocker Compose安装(离线+在线)
Docker&Docker Compose安装(离线在线) Docker离线安装 下载想要安装的docker软件版本:https://download.docker.com/linux/static/stable/x86_64/ 如目标机无法从链接下载,可以在本机下载后 scp docker版本压缩包[如docker-20.10.9.tgz] usernameh…...

【泰克生物】酵母展示建库技术解析:构建高质量抗体文库的实用指南
酵母展示库是抗体酵母展示服务的核心组成部分。酵母展示技术利用酵母细胞表面的展示系统,将目标蛋白质(如抗体)展示在细胞膜上。这一过程首先涉及到将抗体基因克隆到酵母表达载体中。随后,表达载体被转化到酵母细胞中,…...

QT Mode/View之View
目录 概念 使用已存在的视图 使用模型 使用模型的多个视图 处理元素的选择 视图间共享选择 概念 在模型/视图架构中,视图从模型中获取数据项并将它们呈现给用户。数据的表示方式不必与模型提供的数据表示形式相似,而且可能与用于存储数据项的底层数…...

URP 线性空间 ui资源制作规范
前言: 关于颜色空间的介绍,可参阅 unity 文档 Color space URP实现了基于物理的渲染,为了保证光照计算的准确,需要使用线性空间; 使用线性空间会带来一个问题,ui资源在unity中进行透明度混合时ÿ…...

如何精确统计Pytorch模型推理时间
文章目录 0 背景1 精确统计方法2 手动synchronize和Event适用场景 0 背景 在分析模型性能时需要精确地统计出模型的推理时间,但仅仅通过在模型推理前后打时间戳然后相减得到的时间其实是Host侧向Device侧下发指令的时间。如下图所示,Host侧下发指令与De…...

Mybatis-plus-Generator 3.5.5 自定义模板支持 (DTO/VO 等) 配置
随着项目节奏越来越快,为了减少把时间浪费在新建DTO 、VO 等地方,直接直接基于Mybatis-plus 这颗大树稍微扩展一下,在原来生成PO、 DAO、Service、ServiceImpl、Controller 基础新增。为了解决这个问题,网上找了一堆资料ÿ…...

C#环境下MAC地址获取方法解析
在C#中,获取MAC地址并不是直接支持的,因为出于安全和隐私的考虑,操作系统通常会限制对这类硬件信息的直接访问。不过,仍然可以通过一些方法间接地获取到本地网络接口(比如以太网接口)的MAC地址。 以下是几…...

(k8s)Kubernetes 从0到1容器编排之旅
一、引言 在当今数字化的浪潮中,Kubernetes 如同一艘强大的航船,引领着容器化应用的部署与管理。它以其卓越的灵活性、可扩展性和可靠性,成为众多企业和开发者的首选。然而,要真正发挥 Kubernetes 的强大威力,仅仅掌握…...

Rust Web开发框架对比:Warp与Actix-web
文章目录 Rust Web开发框架对比:Warp与Actix-web引言框架概述Warp框架简介Actix-web框架简介 设计理念Warp的设计理念Actix-web的设计理念 性能比较可扩展性和生态插件和中间件支持社区和文档 使用示例使用Warp构建简单的HTTP服务使用Actix-web构建简单的HTTP服务 学…...

F12抓包12:Performance(性能)前端性能分析
课程大纲 使用场景: ① 前端界面加载性能测试。 ② 导出性能报告给前端开发。 复习:后端(接口)性能分析 ① 所有请求耗时时间轴:“网络”(Network) - 概览。 ② 单个请求耗时:“网络”(Network…...

数据结构(Day13)
一、学习内容 内存空间划分 1、一个进程启动后,计算机会给该进程分配4G的虚拟内存 2、其中0G-3G是用户空间【程序员写代码操作部分】【应用层】 3、3G-4G是内核空间【与底层驱动有关】 4、所有进程共享3G-4G的内核空间,每个进程独立拥有0G-3G的用户空间 …...

链表的快速排序(C/C++实现)
一、前言 大家在做需要排名的项目的时候,需要把各种数据从高到低排序。如果用的快速排序的话,处理数组是十分简单的。因为数组的存储空间的连续的,可以通过下标就可以简单的实现。但如果是链表的话,内存地址是随机分配的…...

css总结(记录一下...)
文字 语法说明word-wrapword-wrap:normal| break-word normal:使用浏览器默认的换行 break-word:允许在单词内换行 text-overflow clip:修剪文本 ellipsis:显示省略符号来代表被修剪的文本 text-shadow可向文本应用的阴影。能够规定水平阴影、垂直阴影、模糊距离,以…...

SpringBoot 处理 @KafkaListener 消息
消息监听容器 1、KafkaMessageListenerContainer 由spring提供用于监听以及拉取消息,并将这些消息按指定格式转换后交给由KafkaListener注解的方法处理,相当于一个消费者; 看看其整体代码结构: 可以发现其入口方法为doStart(),…...