窗口延时、侧输出流数据处理
一 、 AllowedLateness API 延时关闭窗口
AllowedLateness 方法需要基于 WindowedStream 调用。AllowedLateness 需要设置一个延时时间,注意这个时间决定了窗口真正关闭的时间,而且是加上WaterMark的时间,例如 WaterMark的延时时间为2s,AllowedLateness 的时间为2s,那一个10的滚动窗口,0-10这个单位窗口正常的关窗时间应该是超过12s的数据到达之后就关窗。而AllowedLateness 是在12s的基础上继续延长了2s,也就是在14s的时候才真正去关闭 0-10s的窗口,但是在12s的时候会触发窗口计算,从12s之后到14s的数据每到达一个就会触发一次窗口计算。
二 、 OutputTag API 侧输出流
使用 OutputTag API 保证窗口关闭的数据依然可以获取,窗口到达AllowedLateness 时间后将彻底关闭,此时再属于该窗口范围内的数据将会流向 OutputTag 。
context.collect(new Event("A", "/user", 1000L));Thread.sleep(3000);context.collect(new Event("B", "/prod", 6500L));Thread.sleep(3000);context.collect(new Event("C", "/cart", 4000L));Thread.sleep(3000);context.collect(new Event("D", "/user", 7500L));System.out.println("窗口关闭 ~ ");Thread.sleep(3000);context.collect(new Event("E", "/cente", 8500L));Thread.sleep(3000);context.collect(new Event("F", "/cente", 4000L));Thread.sleep(3000);context.collect(new Event("G", "/cente", 9200L));Thread.sleep(3000);context.collect(new Event("H", "/cente", 1000L));Thread.sleep(3000);context.collect(new Event("I", "/cente", 1500L));Thread.sleep(3000);
如果现在定义一个 5s的
滚动窗口,WaterMark延时时间为2s,AllowedLateness 延时时间为2s,此时相当于是 WaterMark到达9s的时候才会关闭0-5的窗口,也就是说最后两条数据会流向OutputTag . 当4000L数据到达后,会再次触发一次窗口计算。
完全与预期一致。
完整代码:
public class WindowOutputTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = Env.getEnv();DataStreamSource<Event> dataStreamSource = env.addSource(new SourceFunction<Event>() {@Overridepublic void run(SourceContext<Event> context) throws Exception {context.collect(new Event("A", "/user", 1000L));Thread.sleep(3000);context.collect(new Event("B", "/prod", 6500L));Thread.sleep(3000);context.collect(new Event("C", "/cart", 4000L));Thread.sleep(3000);context.collect(new Event("D", "/user", 7500L));System.out.println("窗口关闭 ~ ");Thread.sleep(3000);context.collect(new Event("E", "/cente", 8500L));Thread.sleep(3000);context.collect(new Event("F", "/cente", 4000L));Thread.sleep(3000);context.collect(new Event("G", "/cente", 9200L));Thread.sleep(3000);context.collect(new Event("H", "/cente", 1000L));Thread.sleep(3000);context.collect(new Event("I", "/cente", 1500L));Thread.sleep(3000);}@Overridepublic void cancel() {}});//operatorSingleOutputStreamOperator<Event> operator = dataStreamSource.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2))// 水位线延时2s.withTimestampAssigner(new SerializableTimestampAssigner<Event>() {@Overridepublic long extractTimestamp(Event event, long l) {return event.timestamp;}}));OutputTag<Event> eventOutputTag = new OutputTag<Event>("late") {};WindowedStream<Event, Boolean, TimeWindow> windowedStream = operator.keyBy(d -> true).window(TumblingEventTimeWindows.of(Time.of(5, TimeUnit.SECONDS))).allowedLateness(Time.of(2, TimeUnit.SECONDS)).sideOutputLateData(eventOutputTag);SingleOutputStreamOperator<String> windowAgg = windowedStream.aggregate(new AggregateFunction<Event, Long, Long>() {@Overridepublic Long createAccumulator() {return 0L;}@Overridepublic Long add(Event event, Long acc) {return acc + 1;}@Overridepublic Long getResult(Long acc) {return acc;}@Overridepublic Long merge(Long aLong, Long acc1) {return null;}}, new ProcessWindowFunction<Long, String, Boolean, TimeWindow>() {@Overridepublic void process(Boolean key, Context context, Iterable<Long> iterable, Collector<String> collector) throws Exception {long start = context.window().getStart();long end = context.window().getEnd();collector.collect(new Timestamp(start) + " ~ " + new Timestamp(end) + " ===> " + iterable.iterator().next());}});windowAgg.print("窗口数据 ");//获取测输出流中的延时数据DataStream<Event> sideOutput = windowAgg.getSideOutput(eventOutputTag);sideOutput.print("测输出流:-> ");env.execute();}}
相关文章:

窗口延时、侧输出流数据处理
一 、 AllowedLateness API 延时关闭窗口 AllowedLateness 方法需要基于 WindowedStream 调用。AllowedLateness 需要设置一个延时时间,注意这个时间决定了窗口真正关闭的时间,而且是加上WaterMark的时间,例如 WaterMark的延时时间为2s&…...
发送HTTP请求
HTTP请求是一种客户端向服务器发送请求的协议。它是基于TCP/IP协议的应用层协议,用于在Web浏览器和Web服务器之间传输数据。 HTTP请求由以下几个部分组成: 请求行:包含请求方法、请求的URL和HTTP协议的版本。常见的请求方法有GET、POST、PUT、…...

高等工程数学张韵华版第四章课后题答案
下面答案仅供参考! 章节目录 第4章 欧氏空间和二次型 4.1内积和欧氏空间 4.1.1内积的定义 4.1.2欧氏空间的性质 4.1.3 正交投影 4.1.4 施密特正交化 4.2 正交变换和对称变换 4.2.1 正交变换 4.2.2 正交矩阵 4.2.3 对称变换 4.2.4 对称矩阵 4.3 二…...

wpf C# 用USB虚拟串口最高速下载大文件 每包400万字节 平均0.7s/M,支持批量多设备同时下载。自动识别串口。源码示例可自由定制。
C# 用USB虚拟串口下载大文件 每包400万字节 平均0.7s/M。支持批量多设备同时下载。自动识别串口。可自由定制。 int 32位有符号整数 -2147483648~2147483647 但500万字节时 write时报端口IO异常。可能是驱动限制的。 之前用这个助手发文件,连续发送࿰…...
代码随想录二刷day20
提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 前言一、力扣654. 最大二叉树二、力扣617. 合并二叉树三、力扣700. 二叉搜索树中的搜索四、力扣98. 验证二叉搜索树 前言 一、力扣654. 最大二叉树 /*** Definitio…...

Yolov5如何训练自定义的数据集,以及使用GPU训练,涵盖报错解决
本文主要讲述了Yolov5如何训练自定义的数据集,以及使用GPU训练,涵盖报错解决,案例是检测图片中是否有救生圈。 最后的效果图大致如下: 效果图1效果图2 前言 系列文章 1、详细讲述Yolov5从下载、配置及如何使用GPU运行 2、…...

设计模式之单列模式
单列模式是一种经典的设计模式,在校招中最乐意考的设计模式之一~ 设计模式就是软件开发中的棋谱,大佬们针对一些常见的场景,总结出来的代码的编写套路,按照套路来写,不说你写的多好,至少不会太差~ 在校招中…...

linux内核模块编译方法详解
文章目录 前言一、静态加载法1.1 编写驱动程序1.2 将新功能配置在内核中1.3为新功能代码改写Makefile1.4 make menuconfig界面里将新功能对应的那项选择为<*> 二、动态加载法2.1 新功能源码与Linux内核源码在同一目录结构下2.2 新功能源码与Linux内核源码不在同一目录结构…...
简介shell的关联数组与普通数组
本文首先介绍shell的关联数组,然后介绍shell的普通数组,最后总结它们的共同语法。 shell的关联数组 定义一个关联数组,并打印它的key-value对 #!/bin/sh# 声明一个关联数组 declare -A HASH_MAP# 给关联数组赋值 HASH_MAP["Tom"…...
玩转Mysql系列 - 第17篇:存储过程自定义函数详解
这是Mysql系列第17篇。 环境:mysql5.7.25,cmd命令中进行演示。 代码中被[]包含的表示可选,|符号分开的表示可选其一。 需求背景介绍 线上程序有时候出现问题导致数据错误的时候,如果比较紧急,我们可以写一个存储来…...
自动驾驶:轨迹预测综述
自动驾驶:轨迹预测综述 轨迹预测的定义轨迹预测的分类基于物理的方法(Physics-based)基于机器学习的方法(Classic Machine Learning-based)基于深度学习的方法(Deep Learning-based)基于强化学习…...

【uniapp/uview】u-datetime-picker 选择器的过滤器用法
引入:要求日期选择的下拉框在分钟显示时,只显示 0 和 30 分钟; <u-datetime-picker :show"dateShow" :filter"timeFilter" confirm"selDateConfirm" cancel"dateCancel" v-model"value1&qu…...

如何使用Docker部署Nacos服务?Nacos Docker 快速部署指南: 一站式部署与配置教程
🌷🍁 博主猫头虎(🐅🐾)带您 Go to New World✨🍁 🦄 博客首页——🐅🐾猫头虎的博客🎐 🐳 《面试题大全专栏》 🦕 文章图文…...

yocto stm32mp1集成ros
yocto stm32mp1集成ros yocto集成ros下载meta-rosyocto集成rosrootfs验证 yocto集成ros 本章节介绍yocto如何集成ros系统用来作机器人开发。 下载meta-ros 第一步首先需要下载meta-ros layer,meta-ros的链接如下:https://github.com/ros/meta-ros/tre…...

Linux 中的 chroot 命令及示例
Linux/Unix系统中的chroot命令用于更改根目录。Linux/Unix 类系统中的每个进程/命令都有一个称为root 目录的当前工作目录。它更改当前正在运行的进程及其子进程的根目录。 在此类修改的环境中运行的进程/命令无法访问根目录之外的文件。这种修改后的环境称为“ chroot监狱”或…...
oracle的redo与postgreSQL的WAL以及MySQL的binlog区别
Oracle的redo日志、PostgreSQL的WAL(Write-Ahead Log)以及MySQL的binlog(二进制日志)都是数据库的事务日志,但它们在实现和功能上有一些区别。 1. 实现方式: - Oracle的redo日志是通过在事务提交前将事务操作记录到磁盘上的重做日志文件中来实现的。 - PostgreSQL…...

进入低功耗和唤醒
休眠模式 进入休眠模式 如果使用 WFI 指令进入睡眠模式,则嵌套向量中断控制器 (NVIC) 确认的任意外设中断都会 将器件从睡眠模式唤醒。 如果使用 WFE 指令进入睡眠模式,MCU 将在有事件发生时立即退出睡眠模式。唤醒事件可 通过以下方式产生ÿ…...

【多线程】volatile 关键字
volatile 关键字 1. 保证内存可见性2. 禁止指令重排序3. 不保证原子性 1. 保证内存可见性 内存可见性问题: 一个线程针对一个变量进行读取操作,另一个线程针对这个变量进行修改操作, 此时读到的值,不一定是修改后的值,即这个读线…...
【Windows注册表内容详解】
Windows注册表内容详解 第一章节 注册表基础 一、什么是注册表 注册表是windows操作系统、硬件设备以及客户应用程序得以正常运行和保存设置的核心“数据库”,也可以说是一个非常巨大的树状分层结构的数据库系统。 注册表记录了用户安装在计算机上的软件和每个程…...

大数据Hadoop入门之集群的搭建
hadoop的三种运行模式 本地模式:测试本地的hadoop是否能够运行,用来运行官方的代码。伪分布模式:原先有人拿来测试,目前测试都不用这个模式了。完全分布模式:多台服务器组成分布式环境,生产环境使用 分布式主机文件同步命令 sc…...
Linux链表操作全解析
Linux C语言链表深度解析与实战技巧 一、链表基础概念与内核链表优势1.1 为什么使用链表?1.2 Linux 内核链表与用户态链表的区别 二、内核链表结构与宏解析常用宏/函数 三、内核链表的优点四、用户态链表示例五、双向循环链表在内核中的实现优势5.1 插入效率5.2 安全…...
Java 语言特性(面试系列1)
一、面向对象编程 1. 封装(Encapsulation) 定义:将数据(属性)和操作数据的方法绑定在一起,通过访问控制符(private、protected、public)隐藏内部实现细节。示例: public …...
多场景 OkHttpClient 管理器 - Android 网络通信解决方案
下面是一个完整的 Android 实现,展示如何创建和管理多个 OkHttpClient 实例,分别用于长连接、普通 HTTP 请求和文件下载场景。 <?xml version"1.0" encoding"utf-8"?> <LinearLayout xmlns:android"http://schemas…...
深入浅出:JavaScript 中的 `window.crypto.getRandomValues()` 方法
深入浅出:JavaScript 中的 window.crypto.getRandomValues() 方法 在现代 Web 开发中,随机数的生成看似简单,却隐藏着许多玄机。无论是生成密码、加密密钥,还是创建安全令牌,随机数的质量直接关系到系统的安全性。Jav…...

从零开始打造 OpenSTLinux 6.6 Yocto 系统(基于STM32CubeMX)(九)
设备树移植 和uboot设备树修改的内容同步到kernel将设备树stm32mp157d-stm32mp157daa1-mx.dts复制到内核源码目录下 源码修改及编译 修改arch/arm/boot/dts/st/Makefile,新增设备树编译 stm32mp157f-ev1-m4-examples.dtb \stm32mp157d-stm32mp157daa1-mx.dtb修改…...

QT: `long long` 类型转换为 `QString` 2025.6.5
在 Qt 中,将 long long 类型转换为 QString 可以通过以下两种常用方法实现: 方法 1:使用 QString::number() 直接调用 QString 的静态方法 number(),将数值转换为字符串: long long value 1234567890123456789LL; …...

SAP学习笔记 - 开发26 - 前端Fiori开发 OData V2 和 V4 的差异 (Deepseek整理)
上一章用到了V2 的概念,其实 Fiori当中还有 V4,咱们这一章来总结一下 V2 和 V4。 SAP学习笔记 - 开发25 - 前端Fiori开发 Remote OData Service(使用远端Odata服务),代理中间件(ui5-middleware-simpleproxy)-CSDN博客…...

给网站添加live2d看板娘
给网站添加live2d看板娘 参考文献: stevenjoezhang/live2d-widget: 把萌萌哒的看板娘抱回家 (ノ≧∇≦)ノ | Live2D widget for web platformEikanya/Live2d-model: Live2d model collectionzenghongtu/live2d-model-assets 前言 网站环境如下,文章也主…...

从“安全密码”到测试体系:Gitee Test 赋能关键领域软件质量保障
关键领域软件测试的"安全密码":Gitee Test如何破解行业痛点 在数字化浪潮席卷全球的今天,软件系统已成为国家关键领域的"神经中枢"。从国防军工到能源电力,从金融交易到交通管控,这些关乎国计民生的关键领域…...

uniapp 小程序 学习(一)
利用Hbuilder 创建项目 运行到内置浏览器看效果 下载微信小程序 安装到Hbuilder 下载地址 :开发者工具默认安装 设置服务端口号 在Hbuilder中设置微信小程序 配置 找到运行设置,将微信开发者工具放入到Hbuilder中, 打开后出现 如下 bug 解…...