flink watermark 实例分析
WATERMARK 定义了表的事件时间属性,其形式为:
WATERMARK FOR rowtime_column_name AS watermark_strategy_expression
rowtime_column_name 把一个现有的列定义为一个为表标记事件时间的属性。该列的类型必须为 TIMESTAMP(3)/TIMESTAMP_LTZ(3),且是 schema 中的顶层列,它也可以是一个计算列。
watermark是触发计算的机制,只要事件时间<= watermark,就会触发当前行数据的计算,watermark的形象描述如下:

watermark的窗口触发机制
watermark会根据数据流中event的时间戳发生变化。通常情况下,event都是乱序的,不按时间排序的。watermark的计算逻辑为:当前最大的 event time - 最大允许延迟时间(MaxOutOfOrderness)。在同一个分区内部,当watermark大于或者等于窗口的结束时间时,才能触发该窗口的计算,即watermark>=windows endtime。如下图所示:

根据上图分析:
MaxOutOfOrderness = 5s,窗口的大小为:10s。
watermark分别为:12:08、12:15、12:30
计算逻辑为:WM(12:08)=12:13 - 5s;WM(12:15)=12:20 - 5s;WM(12:30)=12:35 - 5s
- 对于 [12:00,12:10) 窗口,需要在WM=12:15时,才能被触发计算,参与计算的event为:event(12:07)/event(12:01)/event(12:07)/event(12:09),event(12:10)/event(12:12)/event(12:12)/event(12:13)/event(12:20)/event(12:14)/event(12:15)不参与计算,因为还未到窗口时间,也就是event time 为 [12:00,12:10] 窗口内的event才能参与计算。
注意,如果过了这个窗口期,再收到 [12:00,12:10] 窗口内的event,就算超过了最大允许延迟时间(MaxOutOfOrderness),不会再参与计算,也就是数据被强制丢掉了。 - 对于 [12:10,12:20] 和 [12:20,12:30] 窗口,会在WM=12:30时,被同时触发计算,参与**[12:10,12:20]** 窗口计算的event为:event(12:10)/event(12:12)/event(12:12)/event(12:13)/event(12:14)/event(12:15)/event(12:15)/event(12:18);参与 [12:20,12:30] 窗口计算的event为:event(12:20)/event(12:20);在这个过程中event(12:05)会被丢弃,不会参与计算,因为已经超了最大允许延迟时间(MaxOutOfOrderness)
迟到的事件的处理,在介绍watermark时,提到了现实中往往处理的是乱序event,即当event处于某些原因而延后到达时,往往会发生该event time < watermark的情况,所以flink对处理乱序event的watermark有一个允许延迟的机制,这个机制就是最大允许延迟时间(MaxOutOfOrderness),允许在一定时间内迟到的event仍然视为有效event。
WATERMARK rowtime_column_name 取值两种方式
rowtime_column_name为计算列
CREATE TABLE pageviews (mid bigint,db string,sch string,tab string,opt string,ts bigint,ddl string,err string,src map < string, string >,cur map < string, string >,cus map < string, string >,event_time as cast(TO_TIMESTAMP_LTZ(ts,3) AS TIMESTAMP(3)), --计算列,必须为TIMESTAMP(3)/TIMESTAMP_LTZ(3)类型WATERMARK FOR event_time AS event_time - INTERVAL '60' SECOND
) WITH ('connector' = 'kafka','properties.bootstrap.servers' = '***','topic' = 'topic1','format' = 'json','properties.group.id' = '*****','scan.startup.mode' = 'earliest-offset'-- 取值 : group-offsets latest-offset earliest-offset
);
rowtime_column_name为事件时间属性
CREATE TABLE dataGen(uuid VARCHAR(20),name INT,age INT,ts TIMESTAMP(3), --事件时间属性,字段类型为TIMESTAMP(3)WATERMARK FOR ts AS ts
)with('connector' = 'datagen','rows-per-second' = '10','number-of-rows' = '100','fields.age.kind' = 'random','fields.age.min' = '1','fields.age.max' = '10','fields.name.kind' = 'random','fields.name.min' = '1','fields.name.max' = '10');
watermark使用demo
CREATE TABLE kafka_table(mid bigint,db string,sch string,tab string,opt string,ts bigint,ddl string,err string,src map < string, string >,cur map < string, string >,cus map < string, string >,group_name as COALESCE(cur['group_name'], src['group_name']),batch_number as COALESCE(cur['batch_number'], src['batch_number']),event_time as cast(TO_TIMESTAMP_LTZ(ts,3) AS TIMESTAMP(3)), -- TIMESTAMP(3)/TIMESTAMP_LTZ(3)WATERMARK FOR event_time AS event_time - INTERVAL '2' MINUTE --SECOND
) WITH ('connector' = 'kafka','properties.bootstrap.servers' = '***','topic' = 'topic1','format' = 'json','properties.group.id' = '*****','scan.startup.mode' = 'earliest-offset'-- 取值 : group-offsets latest-offset earliest-offset
);
watermark在over聚合中的使用
--RANGE:每个group_name计算当前group_name前10分钟内收到的同一group_name的所有总数
selectgroup_name
,event_time
,COUNT(group_name) OVER w1 as cnt
from kafka_table
where UPPER(opt) <> 'DELETE'
WINDOW w1 AS (PARTITION BY group_nameORDER BY event_timeRANGE BETWEEN INTERVAL '10' MINUTE PRECEDING AND CURRENT ROW)
watermark在windows聚合中的使用
--求每10分钟的滚动窗口内同一group_name的所有总数
create view tmp as
SELECT group_name,event_time FROM kafka_table where UPPER(opt) <> 'DELETE';select window_start,window_end,window_time,group_name,count(*) as cnt from
TABLE(TUMBLE(TABLE tmp, DESCRIPTOR(event_time), INTERVAL '10' MINUTES))
group by window_start,window_end,window_time,group_name
参考:
Window Aggregation
Over Aggregation
相关文章:
flink watermark 实例分析
WATERMARK 定义了表的事件时间属性,其形式为: WATERMARK FOR rowtime_column_name AS watermark_strategy_expression rowtime_column_name 把一个现有的列定义为一个为表标记事件时间的属性。该列的类型必须为 TIMESTAMP(3)/TIMESTAMP_LTZ(3),且是 sche…...
系列十二(面试)、Java中的GC回收类型有哪些?
一、Java中的GC回收类型 1.1、概述 Java中的GC回收类型主要包含以下几种,即:UseSerialGC、UseParallelGC、UseConcMarkSweepGC、UseParNewGC、UseParallelOldGC、UseG1GC。 1.2、源码...
华为数通方向HCIP-DataCom H12-831题库(多选题:201-220)
第201题 在多集群RR组网中,每个集群中部署了一台RR设备及其客户机,各集群的RR与为非客户机关系,并建立IBGP全连接。以下关于BGP路由反射器发布路由规则的描述,正确的有哪些? A、若某RR从EBGP对等体学到的路由,此RR会传递给其他集群的RR B、若某RR从非客户机IBGP对等体学…...
NLP论文阅读记录 - | 使用GPT对大型文档集合进行抽象总结
文章目录 前言0、论文摘要一、Introduction二.相关工作2.1Summarization2.2 神经网络抽象概括2.2.1训练和测试数据集。2.2.2 评估。 2.3 最先进的抽象摘要器 三.本文方法3.1 查询支持3.2 文档聚类3.3主题句提取3.4 语义分块3.5 GPT 零样本总结 四 实验效果4.1数据集4.2 对比模型…...
华为全屋wifi6蜂鸟套装标准
华为政企42 华为政企 目录 上一篇华为安防监控摄像头下一篇华为企业级无线路由器...
系列二十八、如何在Oracle官网下载JDK的api文档
一、官网下载JDK的api文档 1.1、官网地址 https://www.oracle.com/java/technologies/javase-jdk21-doc-downloads.html 1.2、我分享的api.chm 链接:https://pan.baidu.com/s/1Bf55Fz-eMTErmQDtZZcewQ?pwdyyds 提取码:yyds 1.3、参考 https://ww…...
STM32-ADC模数转换器
目录 一、ADC简介 二、逐次逼近型ADC内部结构 三、STM32内部ADC转换结构 四、ADC基本结构 五、输入通道 六、转换模式 6.1单次转换,非扫描模式 6.2连续转换,非扫描模式 6.3单次转换,扫描模式 6.4连续转换,扫描模式 七、…...
谷歌手机安装证书到根目录
1、前提你已经root,安装好面具 2,下载movecert模块,自动帮你把证书从用户证书移动成系统证书 视频教程,手机为谷歌手机 https://www.bilibili.com/video/BV1pG4y1A7Cj?p11&vd_source9c0a32b00d6d59fecae05b4133f22f06 软件下…...
代码随想录 322. 零钱兑换
题目 给你一个整数数组 coins ,表示不同面额的硬币;以及一个整数 amount ,表示总金额。 计算并返回可以凑成总金额所需的 最少的硬币个数 。如果没有任何一种硬币组合能组成总金额,返回 -1 。 你可以认为每种硬币的数量是无限的。…...
【图的应用二:最短路径】- 用 C 语言实现迪杰斯特拉算法和弗洛伊德算法
目录 一、最短路径 二、迪杰斯特拉算法 三、弗洛伊德算法 一、最短路径 假若要在计算机上建立一个交通咨询系统,则可以采用图的结构来表示实际的交通网络。如下图所示,图中顶点表示城市,边表示城市间的交通联系。 这个咨询系统可以回答旅…...
Qt之判断一个点是否在多边形内部(射线法)
算法思想: 以被测点Q为端点,向任意方向作射线(一般水平向右作射线),统计该射线与多边形的交点数。如果为奇数,Q在多边形内;如果为偶数,Q在多边形外。计数的时候会有一些特殊情况。这种方法适用于任意多边形,不需要考虑精度误差和多边形点给出的顺序,时间复杂度为O(n)…...
压力测试过程中内存溢出(堆溢出、栈溢出、持久代溢出)情况如何解决
在压力测试过程中,可能会遇到内存溢出的问题,其中常见的包括堆内存溢出、栈内存溢出和持久代溢出。解决这类问题需要首先理解各种内存溢出的原因和特点。 堆内存溢出:这种情况通常发生在稳定性压测一段时间后,系统报错࿰…...
【工业智能】音频信号相关场景
【工业智能】音频信号相关场景 DcaseDcase introduction:dcase2024有10个主题的任务: ASD硬件设备产品商 方法制造业应用场景 zenodo音频事件检测 与计算机视觉CV相对应,计算机听觉computer audition,简称CA。 Dcase 这里推荐一个…...
(PC+WAP)装修设计公司网站模板 家装公司网站源码下载
(PCWAP)装修设计公司网站模板 家装公司网站源码下载 PbootCMS内核开发的网站模板,该模板适用于装修设计、家装公司类等企业,当然其他行业也可以做,只需要把文字图片换成其他行业的即可; PCWAP,同一个后台,…...
使用opencv实现图像中几何图形检测
1 几何图形检测介绍 1.1 轮廓(contours) 什么是轮廓,简单说轮廓就是一些列点相连组成形状、它们拥有同样的颜色、轮廓发现在图像的对象分析、对象检测等方面是非常有用的工具,在OpenCV 中使用轮廓发现相关函数时候要求输入图像是二值图像,这…...
补题与周总结:leetcode第 376 场周赛
文章目录 复盘与一周总结2967. 使数组成为等数数组的最小代价(中位数贪心 回文数判断)2968. 执行操作使频率分数最大(中位数贪心 前缀和 滑窗) 复盘与一周总结 wa穿了第3题,赛时其实想到了思路:中位数贪心…...
js指纹库,可跟踪用户唯一性
fingerprintjs官网 资料: Browserleaks - Check your browser for privacy leaks...
Shell三剑客:awk(内部变量)
一、$0 :完整的输入记录 [rootlocalhost ~]# awk -F: {print $0} passwd.txt root:x:0:0:root:/root:/bin/bash bin:x:1:1:bin:/bin:/sbin/nologin daemon:x:2:2:daemon:/sbin:/sbin/nologin adm:x:3:4:adm:/var/adm:/sbin/nologin lp:x:4:7:lp:/var/spool/lpd:/s…...
JVM中的虚拟机栈的动态链接部分存放到底是什么
在Java虚拟机(JVM)中,每个线程在执行一个方法时都会创建一个栈帧(Stack Frame),栈帧中包含了方法的运行时数据。栈帧通常包括局部变量表、操作数栈、动态链接、方法返回地址等部分。 动态链接 动态链接&a…...
Leetcode 55 跳跃游戏
题意理解: 非负整数数组 nums, 最初位于数组的 第一个下标 。 数组中的每个元素代表你在该位置可以跳跃的最大长度。 需要跳到nums最后一个元素即为成功。 目标:是否能够跳到最后一个元素。 解题思路: 使用贪心算法来解题,需要理解…...
OpenLayers 可视化之热力图
注:当前使用的是 ol 5.3.0 版本,天地图使用的key请到天地图官网申请,并替换为自己的key 热力图(Heatmap)又叫热点图,是一种通过特殊高亮显示事物密度分布、变化趋势的数据可视化技术。采用颜色的深浅来显示…...
在鸿蒙HarmonyOS 5中实现抖音风格的点赞功能
下面我将详细介绍如何使用HarmonyOS SDK在HarmonyOS 5中实现类似抖音的点赞功能,包括动画效果、数据同步和交互优化。 1. 基础点赞功能实现 1.1 创建数据模型 // VideoModel.ets export class VideoModel {id: string "";title: string ""…...
Python:操作 Excel 折叠
💖亲爱的技术爱好者们,热烈欢迎来到 Kant2048 的博客!我是 Thomas Kant,很开心能在CSDN上与你们相遇~💖 本博客的精华专栏: 【自动化测试】 【测试经验】 【人工智能】 【Python】 Python 操作 Excel 系列 读取单元格数据按行写入设置行高和列宽自动调整行高和列宽水平…...
【HarmonyOS 5.0】DevEco Testing:鸿蒙应用质量保障的终极武器
——全方位测试解决方案与代码实战 一、工具定位与核心能力 DevEco Testing是HarmonyOS官方推出的一体化测试平台,覆盖应用全生命周期测试需求,主要提供五大核心能力: 测试类型检测目标关键指标功能体验基…...
基于数字孪生的水厂可视化平台建设:架构与实践
分享大纲: 1、数字孪生水厂可视化平台建设背景 2、数字孪生水厂可视化平台建设架构 3、数字孪生水厂可视化平台建设成效 近几年,数字孪生水厂的建设开展的如火如荼。作为提升水厂管理效率、优化资源的调度手段,基于数字孪生的水厂可视化平台的…...
用docker来安装部署freeswitch记录
今天刚才测试一个callcenter的项目,所以尝试安装freeswitch 1、使用轩辕镜像 - 中国开发者首选的专业 Docker 镜像加速服务平台 编辑下面/etc/docker/daemon.json文件为 {"registry-mirrors": ["https://docker.xuanyuan.me"] }同时可以进入轩…...
什么?连接服务器也能可视化显示界面?:基于X11 Forwarding + CentOS + MobaXterm实战指南
文章目录 什么是X11?环境准备实战步骤1️⃣ 服务器端配置(CentOS)2️⃣ 客户端配置(MobaXterm)3️⃣ 验证X11 Forwarding4️⃣ 运行自定义GUI程序(Python示例)5️⃣ 成功效果(Heritage 2013;Landmark、Gulbrandsen 和 Svenevei…...
NXP S32K146 T-Box 携手 SD NAND(贴片式TF卡):驱动汽车智能革新的黄金组合
在汽车智能化的汹涌浪潮中,车辆不再仅仅是传统的交通工具,而是逐步演变为高度智能的移动终端。这一转变的核心支撑,来自于车内关键技术的深度融合与协同创新。车载远程信息处理盒(T-Box)方案:NXP S32K146 与…...
