大数据Flink(一百一十八):SQL水印操作(Watermark)
文章目录
SQL水印操作(Watermark)
一、为什么要有WaterMark
二、Watermark解决的问题
三、代码演示
SQL水印操作(Watermark)
一、为什么要有WaterMark
当 flink 以 EventTime 模式处理流数据时,它会根据数据里的时间戳来处理基于时间的算子。但是由于网络、分布式等原因,会导致数据乱序的情况。如下图所示:
假设在一个5秒的Tumble窗口,有一个EventTime是 11秒的数据,在第16秒时候到来了。图示第11秒的数据,在16秒到来了,如下图:该如何处理迟到数据
二、Watermark解决的问题
上面的问题在于如何将迟来的EventTime 为11的元素正确处理?
当Watermark的时间戳等于Event中携带的EventTime时候,上面场景(Watermark=EventTime)的计算结果如下:
如果想正确处理迟来的数据可以定义Watermark生成策略为 Watermark = EventTime -5s, 如下:
通过watermark来解决,简单来说就是延迟窗口关闭的时间,等一会迟到的数据,窗口关闭不在依据数据的时间,而是到达的watermark的时间。
watermark可以理解为一个特殊的数据,这个数据不参与计算,仅仅是对窗口的触发关闭起作用。
三、代码演示
- 使用Socket模拟接收数据
- 设置WaterMark
- 设置的逻辑:在第一条数据进来时,设置WaterMark为0,指定第一条数据的时间戳后,获取该时间戳与当前 WaterMark的最大值,并将最大值设置为下一条数据的WaterMark,以此类推
- 使用滚动Event Time窗口,将5秒内的同组数据,进行聚合输出
CREATE TABLE watermark_zero (
item STRING,
ts TIMESTAMP(3), -- TIMESTAMP 类型的时间戳
WATERMARK FOR ts AS ts - INTERVAL '0' SECOND
) WITH (
'connector' = 'socket',
'hostname' = '178.23.142.233',
'port' = '9999',
'format' = 'csv'
);SELECT
date_format(TUMBLE_START(ts, INTERVAL '5' SECOND),'yyyy-MM-dd hh:mm:ss.SSS') AS window_start,
date_format(TUMBLE_END(ts, INTERVAL '5' SECOND),'yyyy-MM-dd hh:mm:ss.SSS') AS window_end,
date_format(TUMBLE_ROWTIME(ts, INTERVAL '5' SECOND),'yyyy-MM-dd hh:mm:ss.SSS') as window_rowtime,
item,count(item) as total_item
FROM watermark_zero
GROUP BY TUMBLE(ts, INTERVAL '5' SECOND), item;
若输入第一条数据:hello,2022-03-25 16:39:45
那么,我先假设后续的数据Event Time间隔为1秒,推断一下WaterMark的设定,如下图所示
1.第一条数据的Event Time为1648197585000,那么当前窗口时间为:1648197585000-> 1648197589000,即下图中红色框线
2.第一条数据进来时,这条数据之前的WaterMark为0,当第一条数据已经进入后,指定Event Time位置,并与现在的WaterMark比较,将两者中大的那个值设置为新的WaterMark,那么当前数据的WaterMark为1648197585000
3.第二条数据进来时,前一条数据的WaterMark为1648197585000,第二条数据的Event Time比之前的WaterMark大,于是更新WaterMark,将当前的WaterMark更新为1648197586000,但还没到窗口触发时间,不进行计算
4.后面几个以此类推,直到Event Time为:1648197590000的数据进来的时候,前一条数据的WaterMark为1648197589000,于是更新当前的WaterMark为1648197590000,Flink认为1648197590000之前的数据都已经到达,且达到了窗口的触发条件,开始进行计算
根据上面的推断,启动程序验证一下,向9999端口监听终端输入以下内容:
hello,2022-03-25 16:39:45
hello,2022-03-25 16:39:46
hello,2022-03-25 16:39:47
hello,2022-03-25 16:39:48
hello,2022-03-25 16:39:49
hello,2022-03-25 16:39:50
Flink输出结果:
Rowtime列在经过窗口操作后,其Event Time属性将丢失。可以使用辅助函数TUMBLE_ROWTIME、HOP_ROWTIME或SESSION_ROWTIME,获取窗口中的Rowtime列的最大值max(rowtime)作为时间窗口的Rowtime,其类型是具有Rowtime属性的TIMESTAMP,取值为 window_end - 1 。 例如[00:00, 00:15) 的窗口,返回值为00:14:59.999 。
数据乱序的场景
上面的实例,Event Time是有序,现在来做一下数据乱序的场景模拟启动程序(注意要关闭之前的查询,重新运行查询语句),在监听终端中输入如下数据:
其中,在触发了了第一个窗口计算后,又来了两条迟到数据hello,2022-03-25 16:39:47,hello,2022-03-25 16:39:46
hello,2022-03-25 16:39:45
hello,2022-03-25 16:39:46
hello,2022-03-25 16:39:47
hello,2022-03-25 16:39:48
hello,2022-03-25 16:39:49
hello,2022-03-25 16:39:50
hello,2022-03-25 16:39:47
hello,2022-03-25 16:39:46
hello,2022-03-25 16:39:51
hello,2022-03-25 16:39:52
hello,2022-03-25 16:39:53
hello,2022-03-25 16:39:54
hello,2022-03-25 16:39:55
Flink结果:
从结果中可以看到,在第二个窗口中,那两条迟到数据并没有进行处理,这个就是迟到丢弃。
乱序时间的设置:
为了解决上面的问题,我们允许Flink处理延迟在5秒内的迟到数据
修改最大乱序时间(新建的表仅水印与之前不同)
CREATE TABLE watermark_five (
item STRING,
ts TIMESTAMP(3), -- TIMESTAMP 类型的时间戳
WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
'connector' = 'socket',
'hostname' = '178.23.142.233',
'port' = '9999',
'format' = 'csv'
);SELECT
date_format(TUMBLE_START(ts, INTERVAL '5' SECOND),'yyyy-MM-dd hh:mm:ss.SSS') AS window_start,
date_format(TUMBLE_END(ts, INTERVAL '5' SECOND),'yyyy-MM-dd hh:mm:ss.SSS') AS window_end,
date_format(TUMBLE_ROWTIME(ts, INTERVAL '5' SECOND),'yyyy-MM-dd hh:mm:ss.SSS') as window_rowtime,
item,count(item) as total_item
FROM watermark_five
GROUP BY TUMBLE(ts, INTERVAL '5' SECOND), item;
在监听终端中,输入数据
hello,2022-03-25 16:39:45
hello,2022-03-25 16:39:46
hello,2022-03-25 16:39:47
hello,2022-03-25 16:39:48
hello,2022-03-25 16:39:49
hello,2022-03-25 16:39:50
hello,2022-03-25 16:39:47
hello,2022-03-25 16:39:46
hello,2022-03-25 16:39:51
hello,2022-03-25 16:39:52
hello,2022-03-25 16:39:53
hello,2022-03-25 16:39:54
hello,2022-03-25 16:39:55
Flink输出结果:
可以看到,之前迟到的两条数据在第一个窗口中进行了处理。因为设置了最大允许乱序时间后,WaterMark要比原来低5秒,可以对延迟5秒内的数据进行处理,窗口的触发条件也同样会往后延迟关于延迟时间,请结合业务场景进行设置。
- 📢博客主页:https://lansonli.blog.csdn.net
- 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
- 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
- 📢停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨
相关文章:

大数据Flink(一百一十八):SQL水印操作(Watermark)
文章目录 SQL水印操作(Watermark) 一、为什么要有WaterMark 二、Watermark解决的问题 三、代码演示 SQL水印操作(Watermark) 一、为什么要…...

【QGC】把QGroundControl地面站添加到Ubuntu侧边菜单栏启动
把QGroundControl地面站添加到Ubuntu侧边菜单栏启动 简介准备工作步骤 1: 创建 Desktop Entry 文件步骤 2: 编辑 Desktop Entry 文件步骤 3: 刷新应用程序菜单步骤 4: 将 QGroundControl 固定到侧边栏 环境: Ubuntu :20.04 LTS 简介 QGroundControl 是…...
PostgreSQL配置主从同步
PostgreSQL配置主从同步 1 主、备库安装postgresql软件 su - pg12 cd /home/pg12/resource tar -zxvf postgresql-12.9.tar.gz cd postgresql-12.9/ ./configure --prefix/home/pg12/soft/ make -j 16 && make install2 主、备库配置环境变量 vi ~/.bash_profile…...

基于python+django+vue的鲜花商城系统
作者:计算机学姐 开发技术:SpringBoot、SSM、Vue、MySQL、JSP、ElementUI、Python、小程序等,“文末源码”。 专栏推荐:前后端分离项目源码、SpringBoot项目源码、SSM项目源码 系统展示 【2025最新】基于pythondjangovueMySQL的线…...

李飞飞任CEO,空间智能公司World Labs亮相,全明星阵容曝光
人工智能的下个大方向已经出现,标志性学者决定下场创业。 本周五,一个重磅消息引爆了 AI 圈:斯坦福大学计算机科学家李飞飞正式宣布创办 AI 初创公司 ——World Labs,旨在向人工智能系统传授有关物理现实的深入知识。 李飞飞说道&…...
PyTorch详解-可视化模块
PyTorch详解-可视化模块 Tensorboard 基础与使用启动 TensorBoard访问 TensorBoard使用 TensorBoardSummaryWriter类介绍参数说明常用方法 CNN卷积核与特征图可视化参数说明返回值 混淆矩阵与训练曲线可视化混淆矩阵可视化训练曲线绘制 模型参数打印参数说明输出解释 Tensorboa…...

Bootstrap 警告信息(Alerts)使用介绍
本章将讲解警告(Alerts)以及 Bootstrap 所提供的用于警告的 class。警告(Alerts)向用户提供了一种定义消息样式的方式。它们为典型的用户操作提供了上下文信息反馈。 您可以为警告框添加一个可选的关闭按钮。为了创建一个内联的可…...

uniapp(H5)设置反向代理,设置成功后页面报错
设置反向代理后,页面报错图: 反向代理代码:devServer下面就是配置对应的代理,一般这样就没问题了 "h5": {"router": {"mode": "hash"},"devServer": {"port": 517…...
define、typedef和using的使用
define、typedef 和 using 是 C(以及 C 语言中的 define)中用于定义别名或简化复杂类型的三个关键字,但它们各自有着不同的用途和行为。下面将分别对比这三个关键字: 1. #define 定义方式:#define 是预处理指令&…...

vue element时间选择不能超过今天 时间选中长度不能超过7天
背景: 使用elenmet plus 组件实现时间选择;且日期时间选择不能超过今天;连续选中时间的长度范围不能超过7天 效果展示: 实现思路: 一、使用element组件自带的属性和方法; :disabled-date"disabledDate…...
如何 吧一个 一维数组 切分成相同等分,一维数组作为lstm的输入(三维数据)的数据预处理 collate_fn的应用
要将一个一维数组切分成相同等分,你可以使用 Python 的内置功能或者 NumPy 库(如果你处理的是数值数据)。以下是几种不同的方法: 方法3 pad_sequence 结合dataloader 应该是最佳方案 ### 方法 1: 使用 Python 的内置切片功能 如果…...
Remix 学习 - @remix-run/react 中主要的 hooks
在 remix-run/react 中,有几个常用的 hooks,它们帮助你在 Remix 应用中处理路由、数据加载和其他功能。以下是一些主要的 hooks: useLoaderData: 用于获取从 loader 函数中返回的数据。 通常在组件中调用,以便访问路由加载的数据…...

STL之stack
stack容器 - 先进后出” - stack是堆栈容器,是一种的容器。 - 头文件:#include <stack> stack的push()与pop()方法 stack.push(elem);//往栈头添加元素 stack.pop();//从栈头移除第一个元素 stack<int> stkInt; stkInt.push(1);stkInt…...

如何用3个月零基础入门网络安全?_网络安全零基础怎么学习
前 言 写这篇教程的初衷是很多朋友都想了解如何入门/转行网络安全,实现自己的“黑客梦”。文章的宗旨是: 1.指出一些自学的误区 2.提供客观可行的学习表 3.推荐我认为适合小白学习的资源.大佬绕道哈! →点击获取网络安全资料攻略← 一、自学…...

适合学生党开学买的蓝牙耳机?分享开放式耳机排行榜前十名
学生党开学想买耳机的话,我觉得比较适合入手开放式耳机,因为这类耳机佩戴舒适度高,长时间使用也不会感到不适或疲劳,同时保持耳道干爽透气,更加健康卫生,还能提供自然、开阔的音场,音质表现优秀…...

汽车租赁系统1.0版本
汽车租赁系统1.0版本比较简陋,以后还会有2.0、3.0……就像《我爱发明》里面的一代机器二代机器,三代机器一样,是一个迭代更新的过程(最近比较忙,可能会很久),这个1.0版本很简陋,也请…...
DockerDocker Compose安装(离线+在线)
Docker&Docker Compose安装(离线在线) Docker离线安装 下载想要安装的docker软件版本:https://download.docker.com/linux/static/stable/x86_64/ 如目标机无法从链接下载,可以在本机下载后 scp docker版本压缩包[如docker-20.10.9.tgz] usernameh…...
【泰克生物】酵母展示建库技术解析:构建高质量抗体文库的实用指南
酵母展示库是抗体酵母展示服务的核心组成部分。酵母展示技术利用酵母细胞表面的展示系统,将目标蛋白质(如抗体)展示在细胞膜上。这一过程首先涉及到将抗体基因克隆到酵母表达载体中。随后,表达载体被转化到酵母细胞中,…...

QT Mode/View之View
目录 概念 使用已存在的视图 使用模型 使用模型的多个视图 处理元素的选择 视图间共享选择 概念 在模型/视图架构中,视图从模型中获取数据项并将它们呈现给用户。数据的表示方式不必与模型提供的数据表示形式相似,而且可能与用于存储数据项的底层数…...

URP 线性空间 ui资源制作规范
前言: 关于颜色空间的介绍,可参阅 unity 文档 Color space URP实现了基于物理的渲染,为了保证光照计算的准确,需要使用线性空间; 使用线性空间会带来一个问题,ui资源在unity中进行透明度混合时ÿ…...

循环冗余码校验CRC码 算法步骤+详细实例计算
通信过程:(白话解释) 我们将原始待发送的消息称为 M M M,依据发送接收消息双方约定的生成多项式 G ( x ) G(x) G(x)(意思就是 G ( x ) G(x) G(x) 是已知的)࿰…...
可靠性+灵活性:电力载波技术在楼宇自控中的核心价值
可靠性灵活性:电力载波技术在楼宇自控中的核心价值 在智能楼宇的自动化控制中,电力载波技术(PLC)凭借其独特的优势,正成为构建高效、稳定、灵活系统的核心解决方案。它利用现有电力线路传输数据,无需额外布…...

微信小程序云开发平台MySQL的连接方式
注:微信小程序云开发平台指的是腾讯云开发 先给结论:微信小程序云开发平台的MySQL,无法通过获取数据库连接信息的方式进行连接,连接只能通过云开发的SDK连接,具体要参考官方文档: 为什么? 因为…...

(转)什么是DockerCompose?它有什么作用?
一、什么是DockerCompose? DockerCompose可以基于Compose文件帮我们快速的部署分布式应用,而无需手动一个个创建和运行容器。 Compose文件是一个文本文件,通过指令定义集群中的每个容器如何运行。 DockerCompose就是把DockerFile转换成指令去运行。 …...

在WSL2的Ubuntu镜像中安装Docker
Docker官网链接: https://docs.docker.com/engine/install/ubuntu/ 1、运行以下命令卸载所有冲突的软件包: for pkg in docker.io docker-doc docker-compose docker-compose-v2 podman-docker containerd runc; do sudo apt-get remove $pkg; done2、设置Docker…...

什么是Ansible Jinja2
理解 Ansible Jinja2 模板 Ansible 是一款功能强大的开源自动化工具,可让您无缝地管理和配置系统。Ansible 的一大亮点是它使用 Jinja2 模板,允许您根据变量数据动态生成文件、配置设置和脚本。本文将向您介绍 Ansible 中的 Jinja2 模板,并通…...

SAP学习笔记 - 开发26 - 前端Fiori开发 OData V2 和 V4 的差异 (Deepseek整理)
上一章用到了V2 的概念,其实 Fiori当中还有 V4,咱们这一章来总结一下 V2 和 V4。 SAP学习笔记 - 开发25 - 前端Fiori开发 Remote OData Service(使用远端Odata服务),代理中间件(ui5-middleware-simpleproxy)-CSDN博客…...
服务器--宝塔命令
一、宝塔面板安装命令 ⚠️ 必须使用 root 用户 或 sudo 权限执行! sudo su - 1. CentOS 系统: yum install -y wget && wget -O install.sh http://download.bt.cn/install/install_6.0.sh && sh install.sh2. Ubuntu / Debian 系统…...

sipsak:SIP瑞士军刀!全参数详细教程!Kali Linux教程!
简介 sipsak 是一个面向会话初始协议 (SIP) 应用程序开发人员和管理员的小型命令行工具。它可以用于对 SIP 应用程序和设备进行一些简单的测试。 sipsak 是一款 SIP 压力和诊断实用程序。它通过 sip-uri 向服务器发送 SIP 请求,并检查收到的响应。它以以下模式之一…...

深入浅出深度学习基础:从感知机到全连接神经网络的核心原理与应用
文章目录 前言一、感知机 (Perceptron)1.1 基础介绍1.1.1 感知机是什么?1.1.2 感知机的工作原理 1.2 感知机的简单应用:基本逻辑门1.2.1 逻辑与 (Logic AND)1.2.2 逻辑或 (Logic OR)1.2.3 逻辑与非 (Logic NAND) 1.3 感知机的实现1.3.1 简单实现 (基于阈…...