当前位置: 首页 > news >正文

flink watermark 实例分析

WATERMARK 定义了表的事件时间属性,其形式为:

 WATERMARK FOR rowtime_column_name AS watermark_strategy_expression 

rowtime_column_name 把一个现有的列定义为一个为表标记事件时间的属性。该列的类型必须为 TIMESTAMP(3)/TIMESTAMP_LTZ(3),且是 schema 中的顶层列,它也可以是一个计算列。
watermark是触发计算的机制,只要事件时间<= watermark,就会触发当前行数据的计算,watermark的形象描述如下:
在这里插入图片描述

watermark的窗口触发机制

watermark会根据数据流中event的时间戳发生变化。通常情况下,event都是乱序的,不按时间排序的。watermark的计算逻辑为:当前最大的 event time - 最大允许延迟时间(MaxOutOfOrderness)。在同一个分区内部,当watermark大于或者等于窗口的结束时间时,才能触发该窗口的计算,即watermark>=windows endtime。如下图所示:
在这里插入图片描述
根据上图分析:
MaxOutOfOrderness = 5s,窗口的大小为:10s。
watermark分别为:12:08、12:15、12:30
计算逻辑为:WM(12:08)=12:13 - 5s;WM(12:15)=12:20 - 5s;WM(12:30)=12:35 - 5s

  • 对于 [12:00,12:10) 窗口,需要在WM=12:15时,才能被触发计算,参与计算的event为:event(12:07)/event(12:01)/event(12:07)/event(12:09),event(12:10)/event(12:12)/event(12:12)/event(12:13)/event(12:20)/event(12:14)/event(12:15)不参与计算,因为还未到窗口时间,也就是event time 为 [12:00,12:10] 窗口内的event才能参与计算。
    注意,如果过了这个窗口期,再收到 [12:00,12:10] 窗口内的event,就算超过了最大允许延迟时间(MaxOutOfOrderness),不会再参与计算,也就是数据被强制丢掉了。
  • 对于 [12:10,12:20][12:20,12:30] 窗口,会在WM=12:30时,被同时触发计算,参与**[12:10,12:20]** 窗口计算的event为:event(12:10)/event(12:12)/event(12:12)/event(12:13)/event(12:14)/event(12:15)/event(12:15)/event(12:18);参与 [12:20,12:30] 窗口计算的event为:event(12:20)/event(12:20);在这个过程中event(12:05)会被丢弃,不会参与计算,因为已经超了最大允许延迟时间(MaxOutOfOrderness)

迟到的事件的处理,在介绍watermark时,提到了现实中往往处理的是乱序event,即当event处于某些原因而延后到达时,往往会发生该event time < watermark的情况,所以flink对处理乱序event的watermark有一个允许延迟的机制,这个机制就是最大允许延迟时间(MaxOutOfOrderness),允许在一定时间内迟到的event仍然视为有效event。

WATERMARK rowtime_column_name 取值两种方式

rowtime_column_name为计算列

CREATE TABLE pageviews (mid bigint,db string,sch string,tab string,opt string,ts bigint,ddl string,err string,src map < string, string >,cur map < string, string >,cus map < string, string >,event_time as cast(TO_TIMESTAMP_LTZ(ts,3) AS TIMESTAMP(3)), --计算列,必须为TIMESTAMP(3)/TIMESTAMP_LTZ(3)类型WATERMARK FOR event_time AS event_time - INTERVAL '60' SECOND
) WITH ('connector' = 'kafka','properties.bootstrap.servers' = '***','topic' = 'topic1','format' = 'json','properties.group.id' = '*****','scan.startup.mode' = 'earliest-offset'-- 取值 : group-offsets    latest-offset     earliest-offset
);

rowtime_column_name为事件时间属性

CREATE TABLE dataGen(uuid VARCHAR(20),name INT,age INT,ts TIMESTAMP(3), --事件时间属性,字段类型为TIMESTAMP(3)WATERMARK FOR ts AS ts
)with('connector' = 'datagen','rows-per-second' = '10','number-of-rows' = '100','fields.age.kind' = 'random','fields.age.min' = '1','fields.age.max' = '10','fields.name.kind' = 'random','fields.name.min' = '1','fields.name.max' = '10');

watermark使用demo

CREATE TABLE kafka_table(mid bigint,db string,sch string,tab string,opt string,ts bigint,ddl string,err string,src map < string, string >,cur map < string, string >,cus map < string, string >,group_name as COALESCE(cur['group_name'], src['group_name']),batch_number as COALESCE(cur['batch_number'], src['batch_number']),event_time as cast(TO_TIMESTAMP_LTZ(ts,3) AS TIMESTAMP(3)), -- TIMESTAMP(3)/TIMESTAMP_LTZ(3)WATERMARK FOR event_time AS event_time - INTERVAL '2' MINUTE     --SECOND
) WITH ('connector' = 'kafka','properties.bootstrap.servers' = '***','topic' = 'topic1','format' = 'json','properties.group.id' = '*****','scan.startup.mode' = 'earliest-offset'-- 取值 : group-offsets    latest-offset     earliest-offset
);

watermark在over聚合中的使用

--RANGE:每个group_name计算当前group_name前10分钟内收到的同一group_name的所有总数
selectgroup_name
,event_time
,COUNT(group_name) OVER w1 as cnt
from kafka_table
where UPPER(opt) <> 'DELETE'
WINDOW w1 AS (PARTITION BY group_nameORDER BY event_timeRANGE BETWEEN INTERVAL '10' MINUTE PRECEDING AND CURRENT ROW)

watermark在windows聚合中的使用

--求每10分钟的滚动窗口内同一group_name的所有总数
create view tmp as
SELECT group_name,event_time FROM kafka_table where UPPER(opt) <> 'DELETE';select window_start,window_end,window_time,group_name,count(*) as cnt from
TABLE(TUMBLE(TABLE tmp, DESCRIPTOR(event_time), INTERVAL '10' MINUTES))
group by window_start,window_end,window_time,group_name

参考:
Window Aggregation
Over Aggregation

相关文章:

flink watermark 实例分析

WATERMARK 定义了表的事件时间属性&#xff0c;其形式为: WATERMARK FOR rowtime_column_name AS watermark_strategy_expression rowtime_column_name 把一个现有的列定义为一个为表标记事件时间的属性。该列的类型必须为 TIMESTAMP(3)/TIMESTAMP_LTZ(3)&#xff0c;且是 sche…...

系列十二(面试)、Java中的GC回收类型有哪些?

一、Java中的GC回收类型 1.1、概述 Java中的GC回收类型主要包含以下几种&#xff0c;即&#xff1a;UseSerialGC、UseParallelGC、UseConcMarkSweepGC、UseParNewGC、UseParallelOldGC、UseG1GC。 1.2、源码...

华为数通方向HCIP-DataCom H12-831题库(多选题:201-220)

第201题 在多集群RR组网中,每个集群中部署了一台RR设备及其客户机,各集群的RR与为非客户机关系,并建立IBGP全连接。以下关于BGP路由反射器发布路由规则的描述,正确的有哪些? A、若某RR从EBGP对等体学到的路由,此RR会传递给其他集群的RR B、若某RR从非客户机IBGP对等体学…...

NLP论文阅读记录 - | 使用GPT对大型文档集合进行抽象总结

文章目录 前言0、论文摘要一、Introduction二.相关工作2.1Summarization2.2 神经网络抽象概括2.2.1训练和测试数据集。2.2.2 评估。 2.3 最先进的抽象摘要器 三.本文方法3.1 查询支持3.2 文档聚类3.3主题句提取3.4 语义分块3.5 GPT 零样本总结 四 实验效果4.1数据集4.2 对比模型…...

华为全屋wifi6蜂鸟套装标准

华为政企42 华为政企 目录 上一篇华为安防监控摄像头下一篇华为企业级无线路由器...

系列二十八、如何在Oracle官网下载JDK的api文档

一、官网下载JDK的api文档 1.1、官网地址 https://www.oracle.com/java/technologies/javase-jdk21-doc-downloads.html 1.2、我分享的api.chm 链接&#xff1a;https://pan.baidu.com/s/1Bf55Fz-eMTErmQDtZZcewQ?pwdyyds 提取码&#xff1a;yyds 1.3、参考 https://ww…...

STM32-ADC模数转换器

目录 一、ADC简介 二、逐次逼近型ADC内部结构 三、STM32内部ADC转换结构 四、ADC基本结构 五、输入通道 六、转换模式 6.1单次转换&#xff0c;非扫描模式 6.2连续转换&#xff0c;非扫描模式 6.3单次转换&#xff0c;扫描模式 6.4连续转换&#xff0c;扫描模式 七、…...

谷歌手机安装证书到根目录

1、前提你已经root&#xff0c;安装好面具 2&#xff0c;下载movecert模块&#xff0c;自动帮你把证书从用户证书移动成系统证书 视频教程&#xff0c;手机为谷歌手机 https://www.bilibili.com/video/BV1pG4y1A7Cj?p11&vd_source9c0a32b00d6d59fecae05b4133f22f06 软件下…...

代码随想录 322. 零钱兑换

题目 给你一个整数数组 coins &#xff0c;表示不同面额的硬币&#xff1b;以及一个整数 amount &#xff0c;表示总金额。 计算并返回可以凑成总金额所需的 最少的硬币个数 。如果没有任何一种硬币组合能组成总金额&#xff0c;返回 -1 。 你可以认为每种硬币的数量是无限的。…...

【图的应用二:最短路径】- 用 C 语言实现迪杰斯特拉算法和弗洛伊德算法

目录 一、最短路径 二、迪杰斯特拉算法 三、弗洛伊德算法 一、最短路径 假若要在计算机上建立一个交通咨询系统&#xff0c;则可以采用图的结构来表示实际的交通网络。如下图所示&#xff0c;图中顶点表示城市&#xff0c;边表示城市间的交通联系。 这个咨询系统可以回答旅…...

Qt之判断一个点是否在多边形内部(射线法)

算法思想: 以被测点Q为端点,向任意方向作射线(一般水平向右作射线),统计该射线与多边形的交点数。如果为奇数,Q在多边形内;如果为偶数,Q在多边形外。计数的时候会有一些特殊情况。这种方法适用于任意多边形,不需要考虑精度误差和多边形点给出的顺序,时间复杂度为O(n)…...

压力测试过程中内存溢出(堆溢出、栈溢出、持久代溢出)情况如何解决

在压力测试过程中&#xff0c;可能会遇到内存溢出的问题&#xff0c;其中常见的包括堆内存溢出、栈内存溢出和持久代溢出。解决这类问题需要首先理解各种内存溢出的原因和特点。 堆内存溢出&#xff1a;这种情况通常发生在稳定性压测一段时间后&#xff0c;系统报错&#xff0…...

【工业智能】音频信号相关场景

【工业智能】音频信号相关场景 DcaseDcase introduction&#xff1a;dcase2024有10个主题的任务&#xff1a; ASD硬件设备产品商 方法制造业应用场景 zenodo音频事件检测 与计算机视觉CV相对应&#xff0c;计算机听觉computer audition&#xff0c;简称CA。 Dcase 这里推荐一个…...

(PC+WAP)装修设计公司网站模板 家装公司网站源码下载

(PCWAP)装修设计公司网站模板 家装公司网站源码下载 PbootCMS内核开发的网站模板&#xff0c;该模板适用于装修设计、家装公司类等企业&#xff0c;当然其他行业也可以做&#xff0c;只需要把文字图片换成其他行业的即可&#xff1b; PCWAP&#xff0c;同一个后台&#xff0c…...

使用opencv实现图像中几何图形检测

1 几何图形检测介绍 1.1 轮廓(contours) 什么是轮廓&#xff0c;简单说轮廓就是一些列点相连组成形状、它们拥有同样的颜色、轮廓发现在图像的对象分析、对象检测等方面是非常有用的工具&#xff0c;在OpenCV 中使用轮廓发现相关函数时候要求输入图像是二值图像&#xff0c;这…...

补题与周总结:leetcode第 376 场周赛

文章目录 复盘与一周总结2967. 使数组成为等数数组的最小代价&#xff08;中位数贪心 回文数判断&#xff09;2968. 执行操作使频率分数最大&#xff08;中位数贪心 前缀和 滑窗&#xff09; 复盘与一周总结 wa穿了第3题&#xff0c;赛时其实想到了思路&#xff1a;中位数贪心…...

js指纹库,可跟踪用户唯一性

fingerprintjs官网 资料&#xff1a; Browserleaks - Check your browser for privacy leaks...

Shell三剑客:awk(内部变量)

一、$0 &#xff1a;完整的输入记录 [rootlocalhost ~]# awk -F: {print $0} passwd.txt root:x:0:0:root:/root:/bin/bash bin:x:1:1:bin:/bin:/sbin/nologin daemon:x:2:2:daemon:/sbin:/sbin/nologin adm:x:3:4:adm:/var/adm:/sbin/nologin lp:x:4:7:lp:/var/spool/lpd:/s…...

JVM中的虚拟机栈的动态链接部分存放到底是什么

在Java虚拟机&#xff08;JVM&#xff09;中&#xff0c;每个线程在执行一个方法时都会创建一个栈帧&#xff08;Stack Frame&#xff09;&#xff0c;栈帧中包含了方法的运行时数据。栈帧通常包括局部变量表、操作数栈、动态链接、方法返回地址等部分。 动态链接 动态链接&a…...

Leetcode 55 跳跃游戏

题意理解&#xff1a; 非负整数数组 nums, 最初位于数组的 第一个下标 。 数组中的每个元素代表你在该位置可以跳跃的最大长度。 需要跳到nums最后一个元素即为成功。 目标&#xff1a;是否能够跳到最后一个元素。 解题思路&#xff1a; 使用贪心算法来解题&#xff0c;需要理解…...

OpenLayers 可视化之热力图

注&#xff1a;当前使用的是 ol 5.3.0 版本&#xff0c;天地图使用的key请到天地图官网申请&#xff0c;并替换为自己的key 热力图&#xff08;Heatmap&#xff09;又叫热点图&#xff0c;是一种通过特殊高亮显示事物密度分布、变化趋势的数据可视化技术。采用颜色的深浅来显示…...

在鸿蒙HarmonyOS 5中实现抖音风格的点赞功能

下面我将详细介绍如何使用HarmonyOS SDK在HarmonyOS 5中实现类似抖音的点赞功能&#xff0c;包括动画效果、数据同步和交互优化。 1. 基础点赞功能实现 1.1 创建数据模型 // VideoModel.ets export class VideoModel {id: string "";title: string ""…...

Python:操作 Excel 折叠

💖亲爱的技术爱好者们,热烈欢迎来到 Kant2048 的博客!我是 Thomas Kant,很开心能在CSDN上与你们相遇~💖 本博客的精华专栏: 【自动化测试】 【测试经验】 【人工智能】 【Python】 Python 操作 Excel 系列 读取单元格数据按行写入设置行高和列宽自动调整行高和列宽水平…...

【HarmonyOS 5.0】DevEco Testing:鸿蒙应用质量保障的终极武器

——全方位测试解决方案与代码实战 一、工具定位与核心能力 DevEco Testing是HarmonyOS官方推出的​​一体化测试平台​​&#xff0c;覆盖应用全生命周期测试需求&#xff0c;主要提供五大核心能力&#xff1a; ​​测试类型​​​​检测目标​​​​关键指标​​功能体验基…...

基于数字孪生的水厂可视化平台建设:架构与实践

分享大纲&#xff1a; 1、数字孪生水厂可视化平台建设背景 2、数字孪生水厂可视化平台建设架构 3、数字孪生水厂可视化平台建设成效 近几年&#xff0c;数字孪生水厂的建设开展的如火如荼。作为提升水厂管理效率、优化资源的调度手段&#xff0c;基于数字孪生的水厂可视化平台的…...

用docker来安装部署freeswitch记录

今天刚才测试一个callcenter的项目&#xff0c;所以尝试安装freeswitch 1、使用轩辕镜像 - 中国开发者首选的专业 Docker 镜像加速服务平台 编辑下面/etc/docker/daemon.json文件为 {"registry-mirrors": ["https://docker.xuanyuan.me"] }同时可以进入轩…...

什么?连接服务器也能可视化显示界面?:基于X11 Forwarding + CentOS + MobaXterm实战指南

文章目录 什么是X11?环境准备实战步骤1️⃣ 服务器端配置(CentOS)2️⃣ 客户端配置(MobaXterm)3️⃣ 验证X11 Forwarding4️⃣ 运行自定义GUI程序(Python示例)5️⃣ 成功效果![在这里插入图片描述](https://i-blog.csdnimg.cn/direct/55aefaea8a9f477e86d065227851fe3d.pn…...

图表类系列各种样式PPT模版分享

图标图表系列PPT模版&#xff0c;柱状图PPT模版&#xff0c;线状图PPT模版&#xff0c;折线图PPT模版&#xff0c;饼状图PPT模版&#xff0c;雷达图PPT模版&#xff0c;树状图PPT模版 图表类系列各种样式PPT模版分享&#xff1a;图表系列PPT模板https://pan.quark.cn/s/20d40aa…...

Reasoning over Uncertain Text by Generative Large Language Models

https://ojs.aaai.org/index.php/AAAI/article/view/34674/36829https://ojs.aaai.org/index.php/AAAI/article/view/34674/36829 1. 概述 文本中的不确定性在许多语境中传达,从日常对话到特定领域的文档(例如医学文档)(Heritage 2013;Landmark、Gulbrandsen 和 Svenevei…...

NXP S32K146 T-Box 携手 SD NAND(贴片式TF卡):驱动汽车智能革新的黄金组合

在汽车智能化的汹涌浪潮中&#xff0c;车辆不再仅仅是传统的交通工具&#xff0c;而是逐步演变为高度智能的移动终端。这一转变的核心支撑&#xff0c;来自于车内关键技术的深度融合与协同创新。车载远程信息处理盒&#xff08;T-Box&#xff09;方案&#xff1a;NXP S32K146 与…...