flink watermark 实例分析
WATERMARK 定义了表的事件时间属性,其形式为:
WATERMARK FOR rowtime_column_name AS watermark_strategy_expression
rowtime_column_name 把一个现有的列定义为一个为表标记事件时间的属性。该列的类型必须为 TIMESTAMP(3)/TIMESTAMP_LTZ(3),且是 schema 中的顶层列,它也可以是一个计算列。
watermark是触发计算的机制,只要事件时间<= watermark,就会触发当前行数据的计算,watermark的形象描述如下:

watermark的窗口触发机制
watermark会根据数据流中event的时间戳发生变化。通常情况下,event都是乱序的,不按时间排序的。watermark的计算逻辑为:当前最大的 event time - 最大允许延迟时间(MaxOutOfOrderness)。在同一个分区内部,当watermark大于或者等于窗口的结束时间时,才能触发该窗口的计算,即watermark>=windows endtime。如下图所示:

根据上图分析:
MaxOutOfOrderness = 5s,窗口的大小为:10s。
watermark分别为:12:08、12:15、12:30
计算逻辑为:WM(12:08)=12:13 - 5s;WM(12:15)=12:20 - 5s;WM(12:30)=12:35 - 5s
- 对于 [12:00,12:10) 窗口,需要在WM=12:15时,才能被触发计算,参与计算的event为:event(12:07)/event(12:01)/event(12:07)/event(12:09),event(12:10)/event(12:12)/event(12:12)/event(12:13)/event(12:20)/event(12:14)/event(12:15)不参与计算,因为还未到窗口时间,也就是event time 为 [12:00,12:10] 窗口内的event才能参与计算。
注意,如果过了这个窗口期,再收到 [12:00,12:10] 窗口内的event,就算超过了最大允许延迟时间(MaxOutOfOrderness),不会再参与计算,也就是数据被强制丢掉了。 - 对于 [12:10,12:20] 和 [12:20,12:30] 窗口,会在WM=12:30时,被同时触发计算,参与**[12:10,12:20]** 窗口计算的event为:event(12:10)/event(12:12)/event(12:12)/event(12:13)/event(12:14)/event(12:15)/event(12:15)/event(12:18);参与 [12:20,12:30] 窗口计算的event为:event(12:20)/event(12:20);在这个过程中event(12:05)会被丢弃,不会参与计算,因为已经超了最大允许延迟时间(MaxOutOfOrderness)
迟到的事件的处理,在介绍watermark时,提到了现实中往往处理的是乱序event,即当event处于某些原因而延后到达时,往往会发生该event time < watermark的情况,所以flink对处理乱序event的watermark有一个允许延迟的机制,这个机制就是最大允许延迟时间(MaxOutOfOrderness),允许在一定时间内迟到的event仍然视为有效event。
WATERMARK rowtime_column_name 取值两种方式
rowtime_column_name为计算列
CREATE TABLE pageviews (mid bigint,db string,sch string,tab string,opt string,ts bigint,ddl string,err string,src map < string, string >,cur map < string, string >,cus map < string, string >,event_time as cast(TO_TIMESTAMP_LTZ(ts,3) AS TIMESTAMP(3)), --计算列,必须为TIMESTAMP(3)/TIMESTAMP_LTZ(3)类型WATERMARK FOR event_time AS event_time - INTERVAL '60' SECOND
) WITH ('connector' = 'kafka','properties.bootstrap.servers' = '***','topic' = 'topic1','format' = 'json','properties.group.id' = '*****','scan.startup.mode' = 'earliest-offset'-- 取值 : group-offsets latest-offset earliest-offset
);
rowtime_column_name为事件时间属性
CREATE TABLE dataGen(uuid VARCHAR(20),name INT,age INT,ts TIMESTAMP(3), --事件时间属性,字段类型为TIMESTAMP(3)WATERMARK FOR ts AS ts
)with('connector' = 'datagen','rows-per-second' = '10','number-of-rows' = '100','fields.age.kind' = 'random','fields.age.min' = '1','fields.age.max' = '10','fields.name.kind' = 'random','fields.name.min' = '1','fields.name.max' = '10');
watermark使用demo
CREATE TABLE kafka_table(mid bigint,db string,sch string,tab string,opt string,ts bigint,ddl string,err string,src map < string, string >,cur map < string, string >,cus map < string, string >,group_name as COALESCE(cur['group_name'], src['group_name']),batch_number as COALESCE(cur['batch_number'], src['batch_number']),event_time as cast(TO_TIMESTAMP_LTZ(ts,3) AS TIMESTAMP(3)), -- TIMESTAMP(3)/TIMESTAMP_LTZ(3)WATERMARK FOR event_time AS event_time - INTERVAL '2' MINUTE --SECOND
) WITH ('connector' = 'kafka','properties.bootstrap.servers' = '***','topic' = 'topic1','format' = 'json','properties.group.id' = '*****','scan.startup.mode' = 'earliest-offset'-- 取值 : group-offsets latest-offset earliest-offset
);
watermark在over聚合中的使用
--RANGE:每个group_name计算当前group_name前10分钟内收到的同一group_name的所有总数
selectgroup_name
,event_time
,COUNT(group_name) OVER w1 as cnt
from kafka_table
where UPPER(opt) <> 'DELETE'
WINDOW w1 AS (PARTITION BY group_nameORDER BY event_timeRANGE BETWEEN INTERVAL '10' MINUTE PRECEDING AND CURRENT ROW)
watermark在windows聚合中的使用
--求每10分钟的滚动窗口内同一group_name的所有总数
create view tmp as
SELECT group_name,event_time FROM kafka_table where UPPER(opt) <> 'DELETE';select window_start,window_end,window_time,group_name,count(*) as cnt from
TABLE(TUMBLE(TABLE tmp, DESCRIPTOR(event_time), INTERVAL '10' MINUTES))
group by window_start,window_end,window_time,group_name
参考:
Window Aggregation
Over Aggregation
相关文章:
flink watermark 实例分析
WATERMARK 定义了表的事件时间属性,其形式为: WATERMARK FOR rowtime_column_name AS watermark_strategy_expression rowtime_column_name 把一个现有的列定义为一个为表标记事件时间的属性。该列的类型必须为 TIMESTAMP(3)/TIMESTAMP_LTZ(3),且是 sche…...
系列十二(面试)、Java中的GC回收类型有哪些?
一、Java中的GC回收类型 1.1、概述 Java中的GC回收类型主要包含以下几种,即:UseSerialGC、UseParallelGC、UseConcMarkSweepGC、UseParNewGC、UseParallelOldGC、UseG1GC。 1.2、源码...
华为数通方向HCIP-DataCom H12-831题库(多选题:201-220)
第201题 在多集群RR组网中,每个集群中部署了一台RR设备及其客户机,各集群的RR与为非客户机关系,并建立IBGP全连接。以下关于BGP路由反射器发布路由规则的描述,正确的有哪些? A、若某RR从EBGP对等体学到的路由,此RR会传递给其他集群的RR B、若某RR从非客户机IBGP对等体学…...
NLP论文阅读记录 - | 使用GPT对大型文档集合进行抽象总结
文章目录 前言0、论文摘要一、Introduction二.相关工作2.1Summarization2.2 神经网络抽象概括2.2.1训练和测试数据集。2.2.2 评估。 2.3 最先进的抽象摘要器 三.本文方法3.1 查询支持3.2 文档聚类3.3主题句提取3.4 语义分块3.5 GPT 零样本总结 四 实验效果4.1数据集4.2 对比模型…...
华为全屋wifi6蜂鸟套装标准
华为政企42 华为政企 目录 上一篇华为安防监控摄像头下一篇华为企业级无线路由器...
系列二十八、如何在Oracle官网下载JDK的api文档
一、官网下载JDK的api文档 1.1、官网地址 https://www.oracle.com/java/technologies/javase-jdk21-doc-downloads.html 1.2、我分享的api.chm 链接:https://pan.baidu.com/s/1Bf55Fz-eMTErmQDtZZcewQ?pwdyyds 提取码:yyds 1.3、参考 https://ww…...
STM32-ADC模数转换器
目录 一、ADC简介 二、逐次逼近型ADC内部结构 三、STM32内部ADC转换结构 四、ADC基本结构 五、输入通道 六、转换模式 6.1单次转换,非扫描模式 6.2连续转换,非扫描模式 6.3单次转换,扫描模式 6.4连续转换,扫描模式 七、…...
谷歌手机安装证书到根目录
1、前提你已经root,安装好面具 2,下载movecert模块,自动帮你把证书从用户证书移动成系统证书 视频教程,手机为谷歌手机 https://www.bilibili.com/video/BV1pG4y1A7Cj?p11&vd_source9c0a32b00d6d59fecae05b4133f22f06 软件下…...
代码随想录 322. 零钱兑换
题目 给你一个整数数组 coins ,表示不同面额的硬币;以及一个整数 amount ,表示总金额。 计算并返回可以凑成总金额所需的 最少的硬币个数 。如果没有任何一种硬币组合能组成总金额,返回 -1 。 你可以认为每种硬币的数量是无限的。…...
【图的应用二:最短路径】- 用 C 语言实现迪杰斯特拉算法和弗洛伊德算法
目录 一、最短路径 二、迪杰斯特拉算法 三、弗洛伊德算法 一、最短路径 假若要在计算机上建立一个交通咨询系统,则可以采用图的结构来表示实际的交通网络。如下图所示,图中顶点表示城市,边表示城市间的交通联系。 这个咨询系统可以回答旅…...
Qt之判断一个点是否在多边形内部(射线法)
算法思想: 以被测点Q为端点,向任意方向作射线(一般水平向右作射线),统计该射线与多边形的交点数。如果为奇数,Q在多边形内;如果为偶数,Q在多边形外。计数的时候会有一些特殊情况。这种方法适用于任意多边形,不需要考虑精度误差和多边形点给出的顺序,时间复杂度为O(n)…...
压力测试过程中内存溢出(堆溢出、栈溢出、持久代溢出)情况如何解决
在压力测试过程中,可能会遇到内存溢出的问题,其中常见的包括堆内存溢出、栈内存溢出和持久代溢出。解决这类问题需要首先理解各种内存溢出的原因和特点。 堆内存溢出:这种情况通常发生在稳定性压测一段时间后,系统报错࿰…...
【工业智能】音频信号相关场景
【工业智能】音频信号相关场景 DcaseDcase introduction:dcase2024有10个主题的任务: ASD硬件设备产品商 方法制造业应用场景 zenodo音频事件检测 与计算机视觉CV相对应,计算机听觉computer audition,简称CA。 Dcase 这里推荐一个…...
(PC+WAP)装修设计公司网站模板 家装公司网站源码下载
(PCWAP)装修设计公司网站模板 家装公司网站源码下载 PbootCMS内核开发的网站模板,该模板适用于装修设计、家装公司类等企业,当然其他行业也可以做,只需要把文字图片换成其他行业的即可; PCWAP,同一个后台,…...
使用opencv实现图像中几何图形检测
1 几何图形检测介绍 1.1 轮廓(contours) 什么是轮廓,简单说轮廓就是一些列点相连组成形状、它们拥有同样的颜色、轮廓发现在图像的对象分析、对象检测等方面是非常有用的工具,在OpenCV 中使用轮廓发现相关函数时候要求输入图像是二值图像,这…...
补题与周总结:leetcode第 376 场周赛
文章目录 复盘与一周总结2967. 使数组成为等数数组的最小代价(中位数贪心 回文数判断)2968. 执行操作使频率分数最大(中位数贪心 前缀和 滑窗) 复盘与一周总结 wa穿了第3题,赛时其实想到了思路:中位数贪心…...
js指纹库,可跟踪用户唯一性
fingerprintjs官网 资料: Browserleaks - Check your browser for privacy leaks...
Shell三剑客:awk(内部变量)
一、$0 :完整的输入记录 [rootlocalhost ~]# awk -F: {print $0} passwd.txt root:x:0:0:root:/root:/bin/bash bin:x:1:1:bin:/bin:/sbin/nologin daemon:x:2:2:daemon:/sbin:/sbin/nologin adm:x:3:4:adm:/var/adm:/sbin/nologin lp:x:4:7:lp:/var/spool/lpd:/s…...
JVM中的虚拟机栈的动态链接部分存放到底是什么
在Java虚拟机(JVM)中,每个线程在执行一个方法时都会创建一个栈帧(Stack Frame),栈帧中包含了方法的运行时数据。栈帧通常包括局部变量表、操作数栈、动态链接、方法返回地址等部分。 动态链接 动态链接&a…...
Leetcode 55 跳跃游戏
题意理解: 非负整数数组 nums, 最初位于数组的 第一个下标 。 数组中的每个元素代表你在该位置可以跳跃的最大长度。 需要跳到nums最后一个元素即为成功。 目标:是否能够跳到最后一个元素。 解题思路: 使用贪心算法来解题,需要理解…...
高通平台USB充电背后的秘密:从SBL1阶段到Kernel的电池ID识别全解析
高通平台USB充电与电池ID识别的深度技术解析 在Android设备开发中,电源管理系统的稳定性直接影响用户体验。作为底层驱动工程师,理解高通平台从硬件到软件的完整充电流程至关重要。本文将深入剖析从XBL阶段到Kernel层的电池识别机制,揭示BATT…...
9大核心优势!Outfit字体全方位应用指南:从安装到精通
9大核心优势!Outfit字体全方位应用指南:从安装到精通 【免费下载链接】Outfit-Fonts The most on-brand typeface 项目地址: https://gitcode.com/gh_mirrors/ou/Outfit-Fonts Outfit字体作为一款专业开源无衬线字体,凭借9种完整字重体…...
python-flask-djangol框架的关爱空巢老人和孩子留守儿童管理系统的设计和实现
目录需求分析与规划技术选型核心模块设计数据安全与权限开发与测试计划社区与可持续性项目技术支持源码获取详细视频演示 :文章底部获取博主联系方式!同行可合作需求分析与规划 明确系统核心功能模块:空巢老人健康监测、留守儿童学习与心理辅…...
【开发工具】Trae IDE 解决 Windows 下 C 工程无法跳转定义问题
1. 概要 👋 作为 Trae IDE 使用者,在 Windows 环境打开本地 C 工程时,习惯用 Ctrl 鼠标左键 快速跳转函数 / 变量定义却失效,仅能做文本匹配,无法精准定位语义定义。核心原因是 Trae 依赖 LSP(语言服务器协…...
计算机毕业设计springboot众筹系统 基于SpringBoot的校园项目众筹融资平台设计与实现 高校创新创业众筹服务与资金管理系统构建研究
计算机毕业设计springboot众筹系统(配套有源码 程序 mysql数据库 论文) 本套源码可以在文本联xi,先看具体系统功能演示视频领取,可分享源码参考。 随着我国经济的高速发展与人们生活水平的日益提高,人们对生活质量的追求也多种多样…...
Spring PetClinic实战解析:从单体应用到云原生部署的5大架构亮点
Spring PetClinic实战解析:从单体应用到云原生部署的5大架构亮点 【免费下载链接】spring-petclinic A sample Spring-based application 项目地址: https://gitcode.com/gh_mirrors/sp/spring-petclinic 你是否遇到过这样的困境:在学习Spring框架…...
PETRV2-BEV模型的高精度3D车道检测效果展示
PETRV2-BEV模型的高精度3D车道检测效果展示 1. 引言 想象一下,一辆自动驾驶汽车在复杂的城市道路中行驶,需要实时识别车道线、判断可行驶区域、预测周围车辆轨迹。这背后离不开一项关键技术——3D车道检测。传统的2D检测方法在复杂道路场景中往往力不从…...
4个关键步骤解决Calibre中文路径乱码难题
4个关键步骤解决Calibre中文路径乱码难题 【免费下载链接】calibre-do-not-translate-my-path Switch my calibre library from ascii path to plain Unicode path. 将我的书库从拼音目录切换至非纯英文(中文)命名 项目地址: https://gitcode.com/gh_m…...
打造轻量级Windows系统:Tiny11Builder深度应用指南
打造轻量级Windows系统:Tiny11Builder深度应用指南 【免费下载链接】tiny11builder Scripts to build a trimmed-down Windows 11 image. 项目地址: https://gitcode.com/GitHub_Trending/ti/tiny11builder 价值定位:解决三大系统痛点 你的Windo…...
Redis监听Key过期事件报错?教你两种绕过CONFIG命令的实用方案
Redis监听Key过期事件的两种安全实践方案 Redis的Key过期事件监听是许多业务场景中的核心需求,比如订单超时处理、会话管理、缓存刷新等。但在云服务环境中,开发者常会遇到ERR unknown command CONFIG的报错,这通常是因为云服务提供商出于安全…...
