大数据Flink(一百一十八):SQL水印操作(Watermark)

文章目录
SQL水印操作(Watermark)
一、为什么要有WaterMark
二、Watermark解决的问题
三、代码演示
SQL水印操作(Watermark)
一、为什么要有WaterMark
当 flink 以 EventTime 模式处理流数据时,它会根据数据里的时间戳来处理基于时间的算子。但是由于网络、分布式等原因,会导致数据乱序的情况。如下图所示:
假设在一个5秒的Tumble窗口,有一个EventTime是 11秒的数据,在第16秒时候到来了。图示第11秒的数据,在16秒到来了,如下图:该如何处理迟到数据

二、Watermark解决的问题
上面的问题在于如何将迟来的EventTime 为11的元素正确处理?
当Watermark的时间戳等于Event中携带的EventTime时候,上面场景(Watermark=EventTime)的计算结果如下:

如果想正确处理迟来的数据可以定义Watermark生成策略为 Watermark = EventTime -5s, 如下:

通过watermark来解决,简单来说就是延迟窗口关闭的时间,等一会迟到的数据,窗口关闭不在依据数据的时间,而是到达的watermark的时间。
watermark可以理解为一个特殊的数据,这个数据不参与计算,仅仅是对窗口的触发关闭起作用。
三、代码演示
- 使用Socket模拟接收数据
- 设置WaterMark
- 设置的逻辑:在第一条数据进来时,设置WaterMark为0,指定第一条数据的时间戳后,获取该时间戳与当前 WaterMark的最大值,并将最大值设置为下一条数据的WaterMark,以此类推
- 使用滚动Event Time窗口,将5秒内的同组数据,进行聚合输出
CREATE TABLE watermark_zero (
item STRING,
ts TIMESTAMP(3), -- TIMESTAMP 类型的时间戳
WATERMARK FOR ts AS ts - INTERVAL '0' SECOND
) WITH (
'connector' = 'socket',
'hostname' = '178.23.142.233',
'port' = '9999',
'format' = 'csv'
);SELECT
date_format(TUMBLE_START(ts, INTERVAL '5' SECOND),'yyyy-MM-dd hh:mm:ss.SSS') AS window_start,
date_format(TUMBLE_END(ts, INTERVAL '5' SECOND),'yyyy-MM-dd hh:mm:ss.SSS') AS window_end,
date_format(TUMBLE_ROWTIME(ts, INTERVAL '5' SECOND),'yyyy-MM-dd hh:mm:ss.SSS') as window_rowtime,
item,count(item) as total_item
FROM watermark_zero
GROUP BY TUMBLE(ts, INTERVAL '5' SECOND), item;
若输入第一条数据:hello,2022-03-25 16:39:45
那么,我先假设后续的数据Event Time间隔为1秒,推断一下WaterMark的设定,如下图所示
1.第一条数据的Event Time为1648197585000,那么当前窗口时间为:1648197585000-> 1648197589000,即下图中红色框线
2.第一条数据进来时,这条数据之前的WaterMark为0,当第一条数据已经进入后,指定Event Time位置,并与现在的WaterMark比较,将两者中大的那个值设置为新的WaterMark,那么当前数据的WaterMark为1648197585000
3.第二条数据进来时,前一条数据的WaterMark为1648197585000,第二条数据的Event Time比之前的WaterMark大,于是更新WaterMark,将当前的WaterMark更新为1648197586000,但还没到窗口触发时间,不进行计算
4.后面几个以此类推,直到Event Time为:1648197590000的数据进来的时候,前一条数据的WaterMark为1648197589000,于是更新当前的WaterMark为1648197590000,Flink认为1648197590000之前的数据都已经到达,且达到了窗口的触发条件,开始进行计算
根据上面的推断,启动程序验证一下,向9999端口监听终端输入以下内容:
hello,2022-03-25 16:39:45
hello,2022-03-25 16:39:46
hello,2022-03-25 16:39:47
hello,2022-03-25 16:39:48
hello,2022-03-25 16:39:49
hello,2022-03-25 16:39:50
Flink输出结果:

Rowtime列在经过窗口操作后,其Event Time属性将丢失。可以使用辅助函数TUMBLE_ROWTIME、HOP_ROWTIME或SESSION_ROWTIME,获取窗口中的Rowtime列的最大值max(rowtime)作为时间窗口的Rowtime,其类型是具有Rowtime属性的TIMESTAMP,取值为 window_end - 1 。 例如[00:00, 00:15) 的窗口,返回值为00:14:59.999 。
数据乱序的场景
上面的实例,Event Time是有序,现在来做一下数据乱序的场景模拟启动程序(注意要关闭之前的查询,重新运行查询语句),在监听终端中输入如下数据:
其中,在触发了了第一个窗口计算后,又来了两条迟到数据hello,2022-03-25 16:39:47,hello,2022-03-25 16:39:46
hello,2022-03-25 16:39:45
hello,2022-03-25 16:39:46
hello,2022-03-25 16:39:47
hello,2022-03-25 16:39:48
hello,2022-03-25 16:39:49
hello,2022-03-25 16:39:50
hello,2022-03-25 16:39:47
hello,2022-03-25 16:39:46
hello,2022-03-25 16:39:51
hello,2022-03-25 16:39:52
hello,2022-03-25 16:39:53
hello,2022-03-25 16:39:54
hello,2022-03-25 16:39:55
Flink结果:

从结果中可以看到,在第二个窗口中,那两条迟到数据并没有进行处理,这个就是迟到丢弃。
乱序时间的设置:
为了解决上面的问题,我们允许Flink处理延迟在5秒内的迟到数据
修改最大乱序时间(新建的表仅水印与之前不同)
CREATE TABLE watermark_five (
item STRING,
ts TIMESTAMP(3), -- TIMESTAMP 类型的时间戳
WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
'connector' = 'socket',
'hostname' = '178.23.142.233',
'port' = '9999',
'format' = 'csv'
);SELECT
date_format(TUMBLE_START(ts, INTERVAL '5' SECOND),'yyyy-MM-dd hh:mm:ss.SSS') AS window_start,
date_format(TUMBLE_END(ts, INTERVAL '5' SECOND),'yyyy-MM-dd hh:mm:ss.SSS') AS window_end,
date_format(TUMBLE_ROWTIME(ts, INTERVAL '5' SECOND),'yyyy-MM-dd hh:mm:ss.SSS') as window_rowtime,
item,count(item) as total_item
FROM watermark_five
GROUP BY TUMBLE(ts, INTERVAL '5' SECOND), item;
在监听终端中,输入数据
hello,2022-03-25 16:39:45
hello,2022-03-25 16:39:46
hello,2022-03-25 16:39:47
hello,2022-03-25 16:39:48
hello,2022-03-25 16:39:49
hello,2022-03-25 16:39:50
hello,2022-03-25 16:39:47
hello,2022-03-25 16:39:46
hello,2022-03-25 16:39:51
hello,2022-03-25 16:39:52
hello,2022-03-25 16:39:53
hello,2022-03-25 16:39:54
hello,2022-03-25 16:39:55
Flink输出结果:

可以看到,之前迟到的两条数据在第一个窗口中进行了处理。因为设置了最大允许乱序时间后,WaterMark要比原来低5秒,可以对延迟5秒内的数据进行处理,窗口的触发条件也同样会往后延迟关于延迟时间,请结合业务场景进行设置。
- 📢博客主页:https://lansonli.blog.csdn.net
- 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
- 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
- 📢停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨
相关文章:
大数据Flink(一百一十八):SQL水印操作(Watermark)
文章目录 SQL水印操作(Watermark) 一、为什么要有WaterMark 二、Watermark解决的问题 三、代码演示 SQL水印操作(Watermark) 一、为什么要…...
【QGC】把QGroundControl地面站添加到Ubuntu侧边菜单栏启动
把QGroundControl地面站添加到Ubuntu侧边菜单栏启动 简介准备工作步骤 1: 创建 Desktop Entry 文件步骤 2: 编辑 Desktop Entry 文件步骤 3: 刷新应用程序菜单步骤 4: 将 QGroundControl 固定到侧边栏 环境: Ubuntu :20.04 LTS 简介 QGroundControl 是…...
PostgreSQL配置主从同步
PostgreSQL配置主从同步 1 主、备库安装postgresql软件 su - pg12 cd /home/pg12/resource tar -zxvf postgresql-12.9.tar.gz cd postgresql-12.9/ ./configure --prefix/home/pg12/soft/ make -j 16 && make install2 主、备库配置环境变量 vi ~/.bash_profile…...
基于python+django+vue的鲜花商城系统
作者:计算机学姐 开发技术:SpringBoot、SSM、Vue、MySQL、JSP、ElementUI、Python、小程序等,“文末源码”。 专栏推荐:前后端分离项目源码、SpringBoot项目源码、SSM项目源码 系统展示 【2025最新】基于pythondjangovueMySQL的线…...
李飞飞任CEO,空间智能公司World Labs亮相,全明星阵容曝光
人工智能的下个大方向已经出现,标志性学者决定下场创业。 本周五,一个重磅消息引爆了 AI 圈:斯坦福大学计算机科学家李飞飞正式宣布创办 AI 初创公司 ——World Labs,旨在向人工智能系统传授有关物理现实的深入知识。 李飞飞说道&…...
PyTorch详解-可视化模块
PyTorch详解-可视化模块 Tensorboard 基础与使用启动 TensorBoard访问 TensorBoard使用 TensorBoardSummaryWriter类介绍参数说明常用方法 CNN卷积核与特征图可视化参数说明返回值 混淆矩阵与训练曲线可视化混淆矩阵可视化训练曲线绘制 模型参数打印参数说明输出解释 Tensorboa…...
Bootstrap 警告信息(Alerts)使用介绍
本章将讲解警告(Alerts)以及 Bootstrap 所提供的用于警告的 class。警告(Alerts)向用户提供了一种定义消息样式的方式。它们为典型的用户操作提供了上下文信息反馈。 您可以为警告框添加一个可选的关闭按钮。为了创建一个内联的可…...
uniapp(H5)设置反向代理,设置成功后页面报错
设置反向代理后,页面报错图: 反向代理代码:devServer下面就是配置对应的代理,一般这样就没问题了 "h5": {"router": {"mode": "hash"},"devServer": {"port": 517…...
define、typedef和using的使用
define、typedef 和 using 是 C(以及 C 语言中的 define)中用于定义别名或简化复杂类型的三个关键字,但它们各自有着不同的用途和行为。下面将分别对比这三个关键字: 1. #define 定义方式:#define 是预处理指令&…...
vue element时间选择不能超过今天 时间选中长度不能超过7天
背景: 使用elenmet plus 组件实现时间选择;且日期时间选择不能超过今天;连续选中时间的长度范围不能超过7天 效果展示: 实现思路: 一、使用element组件自带的属性和方法; :disabled-date"disabledDate…...
如何 吧一个 一维数组 切分成相同等分,一维数组作为lstm的输入(三维数据)的数据预处理 collate_fn的应用
要将一个一维数组切分成相同等分,你可以使用 Python 的内置功能或者 NumPy 库(如果你处理的是数值数据)。以下是几种不同的方法: 方法3 pad_sequence 结合dataloader 应该是最佳方案 ### 方法 1: 使用 Python 的内置切片功能 如果…...
Remix 学习 - @remix-run/react 中主要的 hooks
在 remix-run/react 中,有几个常用的 hooks,它们帮助你在 Remix 应用中处理路由、数据加载和其他功能。以下是一些主要的 hooks: useLoaderData: 用于获取从 loader 函数中返回的数据。 通常在组件中调用,以便访问路由加载的数据…...
STL之stack
stack容器 - 先进后出” - stack是堆栈容器,是一种的容器。 - 头文件:#include <stack> stack的push()与pop()方法 stack.push(elem);//往栈头添加元素 stack.pop();//从栈头移除第一个元素 stack<int> stkInt; stkInt.push(1);stkInt…...
如何用3个月零基础入门网络安全?_网络安全零基础怎么学习
前 言 写这篇教程的初衷是很多朋友都想了解如何入门/转行网络安全,实现自己的“黑客梦”。文章的宗旨是: 1.指出一些自学的误区 2.提供客观可行的学习表 3.推荐我认为适合小白学习的资源.大佬绕道哈! →点击获取网络安全资料攻略← 一、自学…...
适合学生党开学买的蓝牙耳机?分享开放式耳机排行榜前十名
学生党开学想买耳机的话,我觉得比较适合入手开放式耳机,因为这类耳机佩戴舒适度高,长时间使用也不会感到不适或疲劳,同时保持耳道干爽透气,更加健康卫生,还能提供自然、开阔的音场,音质表现优秀…...
汽车租赁系统1.0版本
汽车租赁系统1.0版本比较简陋,以后还会有2.0、3.0……就像《我爱发明》里面的一代机器二代机器,三代机器一样,是一个迭代更新的过程(最近比较忙,可能会很久),这个1.0版本很简陋,也请…...
DockerDocker Compose安装(离线+在线)
Docker&Docker Compose安装(离线在线) Docker离线安装 下载想要安装的docker软件版本:https://download.docker.com/linux/static/stable/x86_64/ 如目标机无法从链接下载,可以在本机下载后 scp docker版本压缩包[如docker-20.10.9.tgz] usernameh…...
【泰克生物】酵母展示建库技术解析:构建高质量抗体文库的实用指南
酵母展示库是抗体酵母展示服务的核心组成部分。酵母展示技术利用酵母细胞表面的展示系统,将目标蛋白质(如抗体)展示在细胞膜上。这一过程首先涉及到将抗体基因克隆到酵母表达载体中。随后,表达载体被转化到酵母细胞中,…...
QT Mode/View之View
目录 概念 使用已存在的视图 使用模型 使用模型的多个视图 处理元素的选择 视图间共享选择 概念 在模型/视图架构中,视图从模型中获取数据项并将它们呈现给用户。数据的表示方式不必与模型提供的数据表示形式相似,而且可能与用于存储数据项的底层数…...
URP 线性空间 ui资源制作规范
前言: 关于颜色空间的介绍,可参阅 unity 文档 Color space URP实现了基于物理的渲染,为了保证光照计算的准确,需要使用线性空间; 使用线性空间会带来一个问题,ui资源在unity中进行透明度混合时ÿ…...
(LeetCode 每日一题) 3442. 奇偶频次间的最大差值 I (哈希、字符串)
题目:3442. 奇偶频次间的最大差值 I 思路 :哈希,时间复杂度0(n)。 用哈希表来记录每个字符串中字符的分布情况,哈希表这里用数组即可实现。 C版本: class Solution { public:int maxDifference(string s) {int a[26]…...
从WWDC看苹果产品发展的规律
WWDC 是苹果公司一年一度面向全球开发者的盛会,其主题演讲展现了苹果在产品设计、技术路线、用户体验和生态系统构建上的核心理念与演进脉络。我们借助 ChatGPT Deep Research 工具,对过去十年 WWDC 主题演讲内容进行了系统化分析,形成了这份…...
day52 ResNet18 CBAM
在深度学习的旅程中,我们不断探索如何提升模型的性能。今天,我将分享我在 ResNet18 模型中插入 CBAM(Convolutional Block Attention Module)模块,并采用分阶段微调策略的实践过程。通过这个过程,我不仅提升…...
python如何将word的doc另存为docx
将 DOCX 文件另存为 DOCX 格式(Python 实现) 在 Python 中,你可以使用 python-docx 库来操作 Word 文档。不过需要注意的是,.doc 是旧的 Word 格式,而 .docx 是新的基于 XML 的格式。python-docx 只能处理 .docx 格式…...
2025盘古石杯决赛【手机取证】
前言 第三届盘古石杯国际电子数据取证大赛决赛 最后一题没有解出来,实在找不到,希望有大佬教一下我。 还有就会议时间,我感觉不是图片时间,因为在电脑看到是其他时间用老会议系统开的会。 手机取证 1、分析鸿蒙手机检材&#x…...
WEB3全栈开发——面试专业技能点P2智能合约开发(Solidity)
一、Solidity合约开发 下面是 Solidity 合约开发 的概念、代码示例及讲解,适合用作学习或写简历项目背景说明。 🧠 一、概念简介:Solidity 合约开发 Solidity 是一种专门为 以太坊(Ethereum)平台编写智能合约的高级编…...
汇编常见指令
汇编常见指令 一、数据传送指令 指令功能示例说明MOV数据传送MOV EAX, 10将立即数 10 送入 EAXMOV [EBX], EAX将 EAX 值存入 EBX 指向的内存LEA加载有效地址LEA EAX, [EBX4]将 EBX4 的地址存入 EAX(不访问内存)XCHG交换数据XCHG EAX, EBX交换 EAX 和 EB…...
AI病理诊断七剑下天山,医疗未来触手可及
一、病理诊断困局:刀尖上的医学艺术 1.1 金标准背后的隐痛 病理诊断被誉为"诊断的诊断",医生需通过显微镜观察组织切片,在细胞迷宫中捕捉癌变信号。某省病理质控报告显示,基层医院误诊率达12%-15%,专家会诊…...
动态 Web 开发技术入门篇
一、HTTP 协议核心 1.1 HTTP 基础 协议全称 :HyperText Transfer Protocol(超文本传输协议) 默认端口 :HTTP 使用 80 端口,HTTPS 使用 443 端口。 请求方法 : GET :用于获取资源,…...
招商蛇口 | 执笔CID,启幕低密生活新境
作为中国城市生长的力量,招商蛇口以“美好生活承载者”为使命,深耕全球111座城市,以央企担当匠造时代理想人居。从深圳湾的开拓基因到西安高新CID的战略落子,招商蛇口始终与城市发展同频共振,以建筑诠释对土地与生活的…...
