窗口延时、侧输出流数据处理
一 、 AllowedLateness API 延时关闭窗口
AllowedLateness 方法需要基于 WindowedStream 调用。AllowedLateness 需要设置一个延时时间,注意这个时间决定了窗口真正关闭的时间,而且是加上WaterMark的时间,例如 WaterMark的延时时间为2s,AllowedLateness 的时间为2s,那一个10的滚动窗口,0-10这个单位窗口正常的关窗时间应该是超过12s的数据到达之后就关窗。而AllowedLateness 是在12s的基础上继续延长了2s,也就是在14s的时候才真正去关闭 0-10s的窗口,但是在12s的时候会触发窗口计算,从12s之后到14s的数据每到达一个就会触发一次窗口计算。
二 、 OutputTag API 侧输出流
使用 OutputTag API 保证窗口关闭的数据依然可以获取,窗口到达AllowedLateness 时间后将彻底关闭,此时再属于该窗口范围内的数据将会流向 OutputTag 。
context.collect(new Event("A", "/user", 1000L));Thread.sleep(3000);context.collect(new Event("B", "/prod", 6500L));Thread.sleep(3000);context.collect(new Event("C", "/cart", 4000L));Thread.sleep(3000);context.collect(new Event("D", "/user", 7500L));System.out.println("窗口关闭 ~ ");Thread.sleep(3000);context.collect(new Event("E", "/cente", 8500L));Thread.sleep(3000);context.collect(new Event("F", "/cente", 4000L));Thread.sleep(3000);context.collect(new Event("G", "/cente", 9200L));Thread.sleep(3000);context.collect(new Event("H", "/cente", 1000L));Thread.sleep(3000);context.collect(new Event("I", "/cente", 1500L));Thread.sleep(3000);
如果现在定义一个 5s的
滚动窗口,WaterMark延时时间为2s,AllowedLateness 延时时间为2s,此时相当于是 WaterMark到达9s的时候才会关闭0-5的窗口,也就是说最后两条数据会流向OutputTag . 当4000L数据到达后,会再次触发一次窗口计算。
完全与预期一致。

完整代码:
public class WindowOutputTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = Env.getEnv();DataStreamSource<Event> dataStreamSource = env.addSource(new SourceFunction<Event>() {@Overridepublic void run(SourceContext<Event> context) throws Exception {context.collect(new Event("A", "/user", 1000L));Thread.sleep(3000);context.collect(new Event("B", "/prod", 6500L));Thread.sleep(3000);context.collect(new Event("C", "/cart", 4000L));Thread.sleep(3000);context.collect(new Event("D", "/user", 7500L));System.out.println("窗口关闭 ~ ");Thread.sleep(3000);context.collect(new Event("E", "/cente", 8500L));Thread.sleep(3000);context.collect(new Event("F", "/cente", 4000L));Thread.sleep(3000);context.collect(new Event("G", "/cente", 9200L));Thread.sleep(3000);context.collect(new Event("H", "/cente", 1000L));Thread.sleep(3000);context.collect(new Event("I", "/cente", 1500L));Thread.sleep(3000);}@Overridepublic void cancel() {}});//operatorSingleOutputStreamOperator<Event> operator = dataStreamSource.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2))// 水位线延时2s.withTimestampAssigner(new SerializableTimestampAssigner<Event>() {@Overridepublic long extractTimestamp(Event event, long l) {return event.timestamp;}}));OutputTag<Event> eventOutputTag = new OutputTag<Event>("late") {};WindowedStream<Event, Boolean, TimeWindow> windowedStream = operator.keyBy(d -> true).window(TumblingEventTimeWindows.of(Time.of(5, TimeUnit.SECONDS))).allowedLateness(Time.of(2, TimeUnit.SECONDS)).sideOutputLateData(eventOutputTag);SingleOutputStreamOperator<String> windowAgg = windowedStream.aggregate(new AggregateFunction<Event, Long, Long>() {@Overridepublic Long createAccumulator() {return 0L;}@Overridepublic Long add(Event event, Long acc) {return acc + 1;}@Overridepublic Long getResult(Long acc) {return acc;}@Overridepublic Long merge(Long aLong, Long acc1) {return null;}}, new ProcessWindowFunction<Long, String, Boolean, TimeWindow>() {@Overridepublic void process(Boolean key, Context context, Iterable<Long> iterable, Collector<String> collector) throws Exception {long start = context.window().getStart();long end = context.window().getEnd();collector.collect(new Timestamp(start) + " ~ " + new Timestamp(end) + " ===> " + iterable.iterator().next());}});windowAgg.print("窗口数据 ");//获取测输出流中的延时数据DataStream<Event> sideOutput = windowAgg.getSideOutput(eventOutputTag);sideOutput.print("测输出流:-> ");env.execute();}}
相关文章:
窗口延时、侧输出流数据处理
一 、 AllowedLateness API 延时关闭窗口 AllowedLateness 方法需要基于 WindowedStream 调用。AllowedLateness 需要设置一个延时时间,注意这个时间决定了窗口真正关闭的时间,而且是加上WaterMark的时间,例如 WaterMark的延时时间为2s&…...
发送HTTP请求
HTTP请求是一种客户端向服务器发送请求的协议。它是基于TCP/IP协议的应用层协议,用于在Web浏览器和Web服务器之间传输数据。 HTTP请求由以下几个部分组成: 请求行:包含请求方法、请求的URL和HTTP协议的版本。常见的请求方法有GET、POST、PUT、…...
高等工程数学张韵华版第四章课后题答案
下面答案仅供参考! 章节目录 第4章 欧氏空间和二次型 4.1内积和欧氏空间 4.1.1内积的定义 4.1.2欧氏空间的性质 4.1.3 正交投影 4.1.4 施密特正交化 4.2 正交变换和对称变换 4.2.1 正交变换 4.2.2 正交矩阵 4.2.3 对称变换 4.2.4 对称矩阵 4.3 二…...
wpf C# 用USB虚拟串口最高速下载大文件 每包400万字节 平均0.7s/M,支持批量多设备同时下载。自动识别串口。源码示例可自由定制。
C# 用USB虚拟串口下载大文件 每包400万字节 平均0.7s/M。支持批量多设备同时下载。自动识别串口。可自由定制。 int 32位有符号整数 -2147483648~2147483647 但500万字节时 write时报端口IO异常。可能是驱动限制的。 之前用这个助手发文件,连续发送࿰…...
代码随想录二刷day20
提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 前言一、力扣654. 最大二叉树二、力扣617. 合并二叉树三、力扣700. 二叉搜索树中的搜索四、力扣98. 验证二叉搜索树 前言 一、力扣654. 最大二叉树 /*** Definitio…...
Yolov5如何训练自定义的数据集,以及使用GPU训练,涵盖报错解决
本文主要讲述了Yolov5如何训练自定义的数据集,以及使用GPU训练,涵盖报错解决,案例是检测图片中是否有救生圈。 最后的效果图大致如下: 效果图1效果图2 前言 系列文章 1、详细讲述Yolov5从下载、配置及如何使用GPU运行 2、…...
设计模式之单列模式
单列模式是一种经典的设计模式,在校招中最乐意考的设计模式之一~ 设计模式就是软件开发中的棋谱,大佬们针对一些常见的场景,总结出来的代码的编写套路,按照套路来写,不说你写的多好,至少不会太差~ 在校招中…...
linux内核模块编译方法详解
文章目录 前言一、静态加载法1.1 编写驱动程序1.2 将新功能配置在内核中1.3为新功能代码改写Makefile1.4 make menuconfig界面里将新功能对应的那项选择为<*> 二、动态加载法2.1 新功能源码与Linux内核源码在同一目录结构下2.2 新功能源码与Linux内核源码不在同一目录结构…...
简介shell的关联数组与普通数组
本文首先介绍shell的关联数组,然后介绍shell的普通数组,最后总结它们的共同语法。 shell的关联数组 定义一个关联数组,并打印它的key-value对 #!/bin/sh# 声明一个关联数组 declare -A HASH_MAP# 给关联数组赋值 HASH_MAP["Tom"…...
玩转Mysql系列 - 第17篇:存储过程自定义函数详解
这是Mysql系列第17篇。 环境:mysql5.7.25,cmd命令中进行演示。 代码中被[]包含的表示可选,|符号分开的表示可选其一。 需求背景介绍 线上程序有时候出现问题导致数据错误的时候,如果比较紧急,我们可以写一个存储来…...
自动驾驶:轨迹预测综述
自动驾驶:轨迹预测综述 轨迹预测的定义轨迹预测的分类基于物理的方法(Physics-based)基于机器学习的方法(Classic Machine Learning-based)基于深度学习的方法(Deep Learning-based)基于强化学习…...
【uniapp/uview】u-datetime-picker 选择器的过滤器用法
引入:要求日期选择的下拉框在分钟显示时,只显示 0 和 30 分钟; <u-datetime-picker :show"dateShow" :filter"timeFilter" confirm"selDateConfirm" cancel"dateCancel" v-model"value1&qu…...
如何使用Docker部署Nacos服务?Nacos Docker 快速部署指南: 一站式部署与配置教程
🌷🍁 博主猫头虎(🐅🐾)带您 Go to New World✨🍁 🦄 博客首页——🐅🐾猫头虎的博客🎐 🐳 《面试题大全专栏》 🦕 文章图文…...
yocto stm32mp1集成ros
yocto stm32mp1集成ros yocto集成ros下载meta-rosyocto集成rosrootfs验证 yocto集成ros 本章节介绍yocto如何集成ros系统用来作机器人开发。 下载meta-ros 第一步首先需要下载meta-ros layer,meta-ros的链接如下:https://github.com/ros/meta-ros/tre…...
Linux 中的 chroot 命令及示例
Linux/Unix系统中的chroot命令用于更改根目录。Linux/Unix 类系统中的每个进程/命令都有一个称为root 目录的当前工作目录。它更改当前正在运行的进程及其子进程的根目录。 在此类修改的环境中运行的进程/命令无法访问根目录之外的文件。这种修改后的环境称为“ chroot监狱”或…...
oracle的redo与postgreSQL的WAL以及MySQL的binlog区别
Oracle的redo日志、PostgreSQL的WAL(Write-Ahead Log)以及MySQL的binlog(二进制日志)都是数据库的事务日志,但它们在实现和功能上有一些区别。 1. 实现方式: - Oracle的redo日志是通过在事务提交前将事务操作记录到磁盘上的重做日志文件中来实现的。 - PostgreSQL…...
进入低功耗和唤醒
休眠模式 进入休眠模式 如果使用 WFI 指令进入睡眠模式,则嵌套向量中断控制器 (NVIC) 确认的任意外设中断都会 将器件从睡眠模式唤醒。 如果使用 WFE 指令进入睡眠模式,MCU 将在有事件发生时立即退出睡眠模式。唤醒事件可 通过以下方式产生ÿ…...
【多线程】volatile 关键字
volatile 关键字 1. 保证内存可见性2. 禁止指令重排序3. 不保证原子性 1. 保证内存可见性 内存可见性问题: 一个线程针对一个变量进行读取操作,另一个线程针对这个变量进行修改操作, 此时读到的值,不一定是修改后的值,即这个读线…...
【Windows注册表内容详解】
Windows注册表内容详解 第一章节 注册表基础 一、什么是注册表 注册表是windows操作系统、硬件设备以及客户应用程序得以正常运行和保存设置的核心“数据库”,也可以说是一个非常巨大的树状分层结构的数据库系统。 注册表记录了用户安装在计算机上的软件和每个程…...
大数据Hadoop入门之集群的搭建
hadoop的三种运行模式 本地模式:测试本地的hadoop是否能够运行,用来运行官方的代码。伪分布模式:原先有人拿来测试,目前测试都不用这个模式了。完全分布模式:多台服务器组成分布式环境,生产环境使用 分布式主机文件同步命令 sc…...
Docker 部署 XiuXianGame 文字修仙游戏:极空间 NAS 上随时挂机刷资源
前言 挂机刷资源,躺平修成仙。 这类文字修仙游戏,说白了就是佛系养成为主,不用时刻盯着,挂着就行。但问题是——大多数要么得在本地电脑跑,要么依赖第三方平台,体验受限。把这套东西跑在自己的 NAS 上&am…...
如何将Claude Code的配置无缝迁移至Taotoken平台以解决封号困扰
🚀 告别海外账号与网络限制!稳定直连全球优质大模型,限时半价接入中。 👉 点击领取海量免费额度 如何将Claude Code的配置无缝迁移至Taotoken平台以解决封号困扰 Claude Code 作为一款高效的编程助手,其核心能力依赖于…...
SRWE终极指南:5分钟学会游戏窗口分辨率自定义技巧
SRWE终极指南:5分钟学会游戏窗口分辨率自定义技巧 【免费下载链接】SRWE Simple Runtime Window Editor 项目地址: https://gitcode.com/gh_mirrors/sr/SRWE 想要在游戏中获得超高清截图,却受限于系统预设的分辨率?想要在窗口模式下享…...
谷歌seo如何发布外链? 新站首月发布的频率与节奏
域名注册后的前30天,谷歌爬虫会对新站点进行密集的抓取与记录。这个阶段的站点就像一张白纸,每一个外源信号都会被放大记录。很多站长习惯在上线首周就去购买几百条低质链接,试图拉高权重,但这往往会导致站点在沙盒期停留更久。根…...
如何做谷歌SEO排名优化?搞定高质量外链的4种高成功率技巧
很多刚接触谷歌SEO的朋友发现,自己的网站内容写了不少,可排名始终在搜索结果的五六页开外晃悠。排除掉网站技术层面的小毛病,最让大家头疼的往往就是外链。你可以把外链看作是其他网站给你的“信任投票”,如果投给你的都是些街边的…...
2026程序员危机:AI岗位暴涨12倍,传统开发即将“毕业”?转型AI大模型开发,才是破局关键!
2026年技术圈将面临巨大变革,AI岗位需求激增,传统编程岗位面临淘汰风险。企业更看重懂AI、能提效的复合型人才。程序员需转型AI大模型开发,掌握系统设计、代码审查及AI工具应用能力。北大青鸟推出AI大模型开发实战营,聚焦落地开发…...
安达发|自动排单软件:破工程机械困局,助智能制造升级
安达发APS高级生产计划智能排产排程自动排单软件系统推荐_MES 在工程机械制造领域,挖掘机、起重机、混凝土泵车等产品结构复杂,一台设备涉及成千上万个零部件,订单个性化程度高、生产周期长,生产排单一度成为困扰企业发展的核心痛…...
企业内如何通过 Taotoken 实现 API 访问权限的精细化控制与审计
🚀 告别海外账号与网络限制!稳定直连全球优质大模型,限时半价接入中。 👉 点击领取海量免费额度 企业内如何通过 Taotoken 实现 API 访问权限的精细化控制与审计 当企业将大模型能力引入内部工作流时,如何安全、可控地…...
基于ChatGPT与Telethon的Telegram频道智能评论机器人开发指南
1. 项目概述与核心价值 如果你在运营Telegram频道,或者需要管理多个社群,肯定遇到过这样的场景:频道里每天都有大量新消息,你想保持活跃度、引导讨论,但手动回复每一条消息不仅耗时耗力,还很难保证回复的质…...
大模型提示词驱动的工业图像标注流水线实战
1. 这不是“打标签”,而是让大模型替你做标注决策的整套工作流“Prompt-Based Automated Data Labeling and Annotation”——光看这个标题,很多人第一反应是:“哦,用大模型自动打标签”。但干过三年以上NLP数据工程、带过两个以上…...
