窗口延时、侧输出流数据处理
一 、 AllowedLateness API 延时关闭窗口
AllowedLateness 方法需要基于 WindowedStream 调用。AllowedLateness 需要设置一个延时时间,注意这个时间决定了窗口真正关闭的时间,而且是加上WaterMark的时间,例如 WaterMark的延时时间为2s,AllowedLateness 的时间为2s,那一个10的滚动窗口,0-10这个单位窗口正常的关窗时间应该是超过12s的数据到达之后就关窗。而AllowedLateness 是在12s的基础上继续延长了2s,也就是在14s的时候才真正去关闭 0-10s的窗口,但是在12s的时候会触发窗口计算,从12s之后到14s的数据每到达一个就会触发一次窗口计算。
二 、 OutputTag API 侧输出流
使用 OutputTag API 保证窗口关闭的数据依然可以获取,窗口到达AllowedLateness 时间后将彻底关闭,此时再属于该窗口范围内的数据将会流向 OutputTag 。
context.collect(new Event("A", "/user", 1000L));Thread.sleep(3000);context.collect(new Event("B", "/prod", 6500L));Thread.sleep(3000);context.collect(new Event("C", "/cart", 4000L));Thread.sleep(3000);context.collect(new Event("D", "/user", 7500L));System.out.println("窗口关闭 ~ ");Thread.sleep(3000);context.collect(new Event("E", "/cente", 8500L));Thread.sleep(3000);context.collect(new Event("F", "/cente", 4000L));Thread.sleep(3000);context.collect(new Event("G", "/cente", 9200L));Thread.sleep(3000);context.collect(new Event("H", "/cente", 1000L));Thread.sleep(3000);context.collect(new Event("I", "/cente", 1500L));Thread.sleep(3000);
如果现在定义一个 5s的
滚动窗口,WaterMark延时时间为2s,AllowedLateness 延时时间为2s,此时相当于是 WaterMark到达9s的时候才会关闭0-5的窗口,也就是说最后两条数据会流向OutputTag . 当4000L数据到达后,会再次触发一次窗口计算。
完全与预期一致。

完整代码:
public class WindowOutputTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = Env.getEnv();DataStreamSource<Event> dataStreamSource = env.addSource(new SourceFunction<Event>() {@Overridepublic void run(SourceContext<Event> context) throws Exception {context.collect(new Event("A", "/user", 1000L));Thread.sleep(3000);context.collect(new Event("B", "/prod", 6500L));Thread.sleep(3000);context.collect(new Event("C", "/cart", 4000L));Thread.sleep(3000);context.collect(new Event("D", "/user", 7500L));System.out.println("窗口关闭 ~ ");Thread.sleep(3000);context.collect(new Event("E", "/cente", 8500L));Thread.sleep(3000);context.collect(new Event("F", "/cente", 4000L));Thread.sleep(3000);context.collect(new Event("G", "/cente", 9200L));Thread.sleep(3000);context.collect(new Event("H", "/cente", 1000L));Thread.sleep(3000);context.collect(new Event("I", "/cente", 1500L));Thread.sleep(3000);}@Overridepublic void cancel() {}});//operatorSingleOutputStreamOperator<Event> operator = dataStreamSource.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2))// 水位线延时2s.withTimestampAssigner(new SerializableTimestampAssigner<Event>() {@Overridepublic long extractTimestamp(Event event, long l) {return event.timestamp;}}));OutputTag<Event> eventOutputTag = new OutputTag<Event>("late") {};WindowedStream<Event, Boolean, TimeWindow> windowedStream = operator.keyBy(d -> true).window(TumblingEventTimeWindows.of(Time.of(5, TimeUnit.SECONDS))).allowedLateness(Time.of(2, TimeUnit.SECONDS)).sideOutputLateData(eventOutputTag);SingleOutputStreamOperator<String> windowAgg = windowedStream.aggregate(new AggregateFunction<Event, Long, Long>() {@Overridepublic Long createAccumulator() {return 0L;}@Overridepublic Long add(Event event, Long acc) {return acc + 1;}@Overridepublic Long getResult(Long acc) {return acc;}@Overridepublic Long merge(Long aLong, Long acc1) {return null;}}, new ProcessWindowFunction<Long, String, Boolean, TimeWindow>() {@Overridepublic void process(Boolean key, Context context, Iterable<Long> iterable, Collector<String> collector) throws Exception {long start = context.window().getStart();long end = context.window().getEnd();collector.collect(new Timestamp(start) + " ~ " + new Timestamp(end) + " ===> " + iterable.iterator().next());}});windowAgg.print("窗口数据 ");//获取测输出流中的延时数据DataStream<Event> sideOutput = windowAgg.getSideOutput(eventOutputTag);sideOutput.print("测输出流:-> ");env.execute();}}
相关文章:
窗口延时、侧输出流数据处理
一 、 AllowedLateness API 延时关闭窗口 AllowedLateness 方法需要基于 WindowedStream 调用。AllowedLateness 需要设置一个延时时间,注意这个时间决定了窗口真正关闭的时间,而且是加上WaterMark的时间,例如 WaterMark的延时时间为2s&…...
发送HTTP请求
HTTP请求是一种客户端向服务器发送请求的协议。它是基于TCP/IP协议的应用层协议,用于在Web浏览器和Web服务器之间传输数据。 HTTP请求由以下几个部分组成: 请求行:包含请求方法、请求的URL和HTTP协议的版本。常见的请求方法有GET、POST、PUT、…...
高等工程数学张韵华版第四章课后题答案
下面答案仅供参考! 章节目录 第4章 欧氏空间和二次型 4.1内积和欧氏空间 4.1.1内积的定义 4.1.2欧氏空间的性质 4.1.3 正交投影 4.1.4 施密特正交化 4.2 正交变换和对称变换 4.2.1 正交变换 4.2.2 正交矩阵 4.2.3 对称变换 4.2.4 对称矩阵 4.3 二…...
wpf C# 用USB虚拟串口最高速下载大文件 每包400万字节 平均0.7s/M,支持批量多设备同时下载。自动识别串口。源码示例可自由定制。
C# 用USB虚拟串口下载大文件 每包400万字节 平均0.7s/M。支持批量多设备同时下载。自动识别串口。可自由定制。 int 32位有符号整数 -2147483648~2147483647 但500万字节时 write时报端口IO异常。可能是驱动限制的。 之前用这个助手发文件,连续发送࿰…...
代码随想录二刷day20
提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 前言一、力扣654. 最大二叉树二、力扣617. 合并二叉树三、力扣700. 二叉搜索树中的搜索四、力扣98. 验证二叉搜索树 前言 一、力扣654. 最大二叉树 /*** Definitio…...
Yolov5如何训练自定义的数据集,以及使用GPU训练,涵盖报错解决
本文主要讲述了Yolov5如何训练自定义的数据集,以及使用GPU训练,涵盖报错解决,案例是检测图片中是否有救生圈。 最后的效果图大致如下: 效果图1效果图2 前言 系列文章 1、详细讲述Yolov5从下载、配置及如何使用GPU运行 2、…...
设计模式之单列模式
单列模式是一种经典的设计模式,在校招中最乐意考的设计模式之一~ 设计模式就是软件开发中的棋谱,大佬们针对一些常见的场景,总结出来的代码的编写套路,按照套路来写,不说你写的多好,至少不会太差~ 在校招中…...
linux内核模块编译方法详解
文章目录 前言一、静态加载法1.1 编写驱动程序1.2 将新功能配置在内核中1.3为新功能代码改写Makefile1.4 make menuconfig界面里将新功能对应的那项选择为<*> 二、动态加载法2.1 新功能源码与Linux内核源码在同一目录结构下2.2 新功能源码与Linux内核源码不在同一目录结构…...
简介shell的关联数组与普通数组
本文首先介绍shell的关联数组,然后介绍shell的普通数组,最后总结它们的共同语法。 shell的关联数组 定义一个关联数组,并打印它的key-value对 #!/bin/sh# 声明一个关联数组 declare -A HASH_MAP# 给关联数组赋值 HASH_MAP["Tom"…...
玩转Mysql系列 - 第17篇:存储过程自定义函数详解
这是Mysql系列第17篇。 环境:mysql5.7.25,cmd命令中进行演示。 代码中被[]包含的表示可选,|符号分开的表示可选其一。 需求背景介绍 线上程序有时候出现问题导致数据错误的时候,如果比较紧急,我们可以写一个存储来…...
自动驾驶:轨迹预测综述
自动驾驶:轨迹预测综述 轨迹预测的定义轨迹预测的分类基于物理的方法(Physics-based)基于机器学习的方法(Classic Machine Learning-based)基于深度学习的方法(Deep Learning-based)基于强化学习…...
【uniapp/uview】u-datetime-picker 选择器的过滤器用法
引入:要求日期选择的下拉框在分钟显示时,只显示 0 和 30 分钟; <u-datetime-picker :show"dateShow" :filter"timeFilter" confirm"selDateConfirm" cancel"dateCancel" v-model"value1&qu…...
如何使用Docker部署Nacos服务?Nacos Docker 快速部署指南: 一站式部署与配置教程
🌷🍁 博主猫头虎(🐅🐾)带您 Go to New World✨🍁 🦄 博客首页——🐅🐾猫头虎的博客🎐 🐳 《面试题大全专栏》 🦕 文章图文…...
yocto stm32mp1集成ros
yocto stm32mp1集成ros yocto集成ros下载meta-rosyocto集成rosrootfs验证 yocto集成ros 本章节介绍yocto如何集成ros系统用来作机器人开发。 下载meta-ros 第一步首先需要下载meta-ros layer,meta-ros的链接如下:https://github.com/ros/meta-ros/tre…...
Linux 中的 chroot 命令及示例
Linux/Unix系统中的chroot命令用于更改根目录。Linux/Unix 类系统中的每个进程/命令都有一个称为root 目录的当前工作目录。它更改当前正在运行的进程及其子进程的根目录。 在此类修改的环境中运行的进程/命令无法访问根目录之外的文件。这种修改后的环境称为“ chroot监狱”或…...
oracle的redo与postgreSQL的WAL以及MySQL的binlog区别
Oracle的redo日志、PostgreSQL的WAL(Write-Ahead Log)以及MySQL的binlog(二进制日志)都是数据库的事务日志,但它们在实现和功能上有一些区别。 1. 实现方式: - Oracle的redo日志是通过在事务提交前将事务操作记录到磁盘上的重做日志文件中来实现的。 - PostgreSQL…...
进入低功耗和唤醒
休眠模式 进入休眠模式 如果使用 WFI 指令进入睡眠模式,则嵌套向量中断控制器 (NVIC) 确认的任意外设中断都会 将器件从睡眠模式唤醒。 如果使用 WFE 指令进入睡眠模式,MCU 将在有事件发生时立即退出睡眠模式。唤醒事件可 通过以下方式产生ÿ…...
【多线程】volatile 关键字
volatile 关键字 1. 保证内存可见性2. 禁止指令重排序3. 不保证原子性 1. 保证内存可见性 内存可见性问题: 一个线程针对一个变量进行读取操作,另一个线程针对这个变量进行修改操作, 此时读到的值,不一定是修改后的值,即这个读线…...
【Windows注册表内容详解】
Windows注册表内容详解 第一章节 注册表基础 一、什么是注册表 注册表是windows操作系统、硬件设备以及客户应用程序得以正常运行和保存设置的核心“数据库”,也可以说是一个非常巨大的树状分层结构的数据库系统。 注册表记录了用户安装在计算机上的软件和每个程…...
大数据Hadoop入门之集群的搭建
hadoop的三种运行模式 本地模式:测试本地的hadoop是否能够运行,用来运行官方的代码。伪分布模式:原先有人拿来测试,目前测试都不用这个模式了。完全分布模式:多台服务器组成分布式环境,生产环境使用 分布式主机文件同步命令 sc…...
MAC动态库加载路径优化:从@rpath到install_name_tool实战解析
1. 动态库加载路径问题的本质 当你第一次在Mac上遇到"Library not loaded"错误时,那种感觉就像在陌生城市迷了路。我清楚地记得自己早期开发时,控制台突然抛出红色错误信息的场景: dyld: Library not loaded: libAwesome.dylibRefe…...
如何轻松实现专业音频低延迟:FlexASIO实用配置完全指南
如何轻松实现专业音频低延迟:FlexASIO实用配置完全指南 【免费下载链接】FlexASIO A flexible universal ASIO driver that uses the PortAudio sound I/O library. Supports WASAPI (shared and exclusive), KS, DirectSound and MME. 项目地址: https://gitcode…...
SGLang-v0.5.6实战体验:5种预装镜像,哪个最适合你的项目?
SGLang-v0.5.6实战体验:5种预装镜像,哪个最适合你的项目? 选型会上,技术负责人又抛出了那个经典问题:“我们到底用哪个环境来部署SGLang?” 会议室里立刻热闹起来。有人坚持用PyTorch 2.1,说它…...
美团、腾讯、字节怎么选?3个真实案例告诉你答案
美团、腾讯、字节怎么选?3个真实案例告诉你答案 2026校招季,三个朋友的不同选择 大厂直通车-校招大礼包:入口入口 写在前面 2026届秋招结束了。 我的三个朋友小A、小B、小C都拿到了心仪的offer。有意思的是,他们分别选了字节、腾…...
Apache Tomcat 在 IDEA 中配置完整教程(手把手保姆教程)
目录 文章内容简介 配置前提 IDEA 准备 IDEA 中的配置 文章内容简介 本文详细介绍了在IDEA中配置Apache Tomcat服务器的完整步骤。首先指导用户创建Maven Archetype项目。重点讲解了Tomcat服务器的配置过程,包括设置服务器路径、部署工件、修改HTTP端口等关键操…...
智能内容解锁工具:5分钟掌握付费墙突破技巧
智能内容解锁工具:5分钟掌握付费墙突破技巧 【免费下载链接】bypass-paywalls-chrome-clean 项目地址: https://gitcode.com/GitHub_Trending/by/bypass-paywalls-chrome-clean 在数字信息时代,优质内容常被付费墙阻隔,而bypass-payw…...
FreeRTOS+LwIP 2.2.0实战:手把手教你理解tcpip_thread的消息处理机制
FreeRTOSLwIP 2.2.0实战:深入解析tcpip_thread的消息驱动架构 在嵌入式网络开发中,理解协议栈的线程模型是构建稳定系统的关键。当FreeRTOS遇上LwIP,tcpip_thread就像一位不知疲倦的邮差,日夜处理着来自各方的网络报文。本文将带您…...
GNSS数据处理效率翻倍:FileZilla+crx2rnx自动化脚本一键下载转换RINEX观测值
GNSS数据处理效率革命:构建全自动RINEX观测值处理流水线 凌晨三点的实验室里,李工程师盯着屏幕上堆积如山的.crx文件叹了口气——这已经是本周第三次通宵处理GNSS观测数据了。对于需要处理多站点、长时间序列GNSS数据的科研人员和工程师而言,…...
5个维度解析:如何通过Excel可视化突破AI算法学习瓶颈
5个维度解析:如何通过Excel可视化突破AI算法学习瓶颈 【免费下载链接】ai-by-hand-excel 项目地址: https://gitcode.com/gh_mirrors/ai/ai-by-hand-excel 你是否也曾在学习AI算法时遇到这样的困境:面对满屏的数学公式感到无从下手,神…...
基于Coqui TTS的高质量语音合成实战:从模型部署到生产环境优化
最近在做一个需要语音播报功能的小项目,之前用的一些在线TTS服务,要么费用不低,要么音质和速度达不到要求。于是把目光投向了开源方案,一番折腾后,发现 Coqui TTS 真是个宝藏。它不仅音质好,支持的语言和声…...
