窗口延时、侧输出流数据处理
一 、 AllowedLateness API 延时关闭窗口
AllowedLateness 方法需要基于 WindowedStream 调用。AllowedLateness 需要设置一个延时时间,注意这个时间决定了窗口真正关闭的时间,而且是加上WaterMark的时间,例如 WaterMark的延时时间为2s,AllowedLateness 的时间为2s,那一个10的滚动窗口,0-10这个单位窗口正常的关窗时间应该是超过12s的数据到达之后就关窗。而AllowedLateness 是在12s的基础上继续延长了2s,也就是在14s的时候才真正去关闭 0-10s的窗口,但是在12s的时候会触发窗口计算,从12s之后到14s的数据每到达一个就会触发一次窗口计算。
二 、 OutputTag API 侧输出流
使用 OutputTag API 保证窗口关闭的数据依然可以获取,窗口到达AllowedLateness 时间后将彻底关闭,此时再属于该窗口范围内的数据将会流向 OutputTag 。
context.collect(new Event("A", "/user", 1000L));Thread.sleep(3000);context.collect(new Event("B", "/prod", 6500L));Thread.sleep(3000);context.collect(new Event("C", "/cart", 4000L));Thread.sleep(3000);context.collect(new Event("D", "/user", 7500L));System.out.println("窗口关闭 ~ ");Thread.sleep(3000);context.collect(new Event("E", "/cente", 8500L));Thread.sleep(3000);context.collect(new Event("F", "/cente", 4000L));Thread.sleep(3000);context.collect(new Event("G", "/cente", 9200L));Thread.sleep(3000);context.collect(new Event("H", "/cente", 1000L));Thread.sleep(3000);context.collect(new Event("I", "/cente", 1500L));Thread.sleep(3000);
如果现在定义一个 5s的
滚动窗口,WaterMark延时时间为2s,AllowedLateness 延时时间为2s,此时相当于是 WaterMark到达9s的时候才会关闭0-5的窗口,也就是说最后两条数据会流向OutputTag . 当4000L数据到达后,会再次触发一次窗口计算。
完全与预期一致。

完整代码:
public class WindowOutputTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = Env.getEnv();DataStreamSource<Event> dataStreamSource = env.addSource(new SourceFunction<Event>() {@Overridepublic void run(SourceContext<Event> context) throws Exception {context.collect(new Event("A", "/user", 1000L));Thread.sleep(3000);context.collect(new Event("B", "/prod", 6500L));Thread.sleep(3000);context.collect(new Event("C", "/cart", 4000L));Thread.sleep(3000);context.collect(new Event("D", "/user", 7500L));System.out.println("窗口关闭 ~ ");Thread.sleep(3000);context.collect(new Event("E", "/cente", 8500L));Thread.sleep(3000);context.collect(new Event("F", "/cente", 4000L));Thread.sleep(3000);context.collect(new Event("G", "/cente", 9200L));Thread.sleep(3000);context.collect(new Event("H", "/cente", 1000L));Thread.sleep(3000);context.collect(new Event("I", "/cente", 1500L));Thread.sleep(3000);}@Overridepublic void cancel() {}});//operatorSingleOutputStreamOperator<Event> operator = dataStreamSource.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2))// 水位线延时2s.withTimestampAssigner(new SerializableTimestampAssigner<Event>() {@Overridepublic long extractTimestamp(Event event, long l) {return event.timestamp;}}));OutputTag<Event> eventOutputTag = new OutputTag<Event>("late") {};WindowedStream<Event, Boolean, TimeWindow> windowedStream = operator.keyBy(d -> true).window(TumblingEventTimeWindows.of(Time.of(5, TimeUnit.SECONDS))).allowedLateness(Time.of(2, TimeUnit.SECONDS)).sideOutputLateData(eventOutputTag);SingleOutputStreamOperator<String> windowAgg = windowedStream.aggregate(new AggregateFunction<Event, Long, Long>() {@Overridepublic Long createAccumulator() {return 0L;}@Overridepublic Long add(Event event, Long acc) {return acc + 1;}@Overridepublic Long getResult(Long acc) {return acc;}@Overridepublic Long merge(Long aLong, Long acc1) {return null;}}, new ProcessWindowFunction<Long, String, Boolean, TimeWindow>() {@Overridepublic void process(Boolean key, Context context, Iterable<Long> iterable, Collector<String> collector) throws Exception {long start = context.window().getStart();long end = context.window().getEnd();collector.collect(new Timestamp(start) + " ~ " + new Timestamp(end) + " ===> " + iterable.iterator().next());}});windowAgg.print("窗口数据 ");//获取测输出流中的延时数据DataStream<Event> sideOutput = windowAgg.getSideOutput(eventOutputTag);sideOutput.print("测输出流:-> ");env.execute();}}
相关文章:
窗口延时、侧输出流数据处理
一 、 AllowedLateness API 延时关闭窗口 AllowedLateness 方法需要基于 WindowedStream 调用。AllowedLateness 需要设置一个延时时间,注意这个时间决定了窗口真正关闭的时间,而且是加上WaterMark的时间,例如 WaterMark的延时时间为2s&…...
发送HTTP请求
HTTP请求是一种客户端向服务器发送请求的协议。它是基于TCP/IP协议的应用层协议,用于在Web浏览器和Web服务器之间传输数据。 HTTP请求由以下几个部分组成: 请求行:包含请求方法、请求的URL和HTTP协议的版本。常见的请求方法有GET、POST、PUT、…...
高等工程数学张韵华版第四章课后题答案
下面答案仅供参考! 章节目录 第4章 欧氏空间和二次型 4.1内积和欧氏空间 4.1.1内积的定义 4.1.2欧氏空间的性质 4.1.3 正交投影 4.1.4 施密特正交化 4.2 正交变换和对称变换 4.2.1 正交变换 4.2.2 正交矩阵 4.2.3 对称变换 4.2.4 对称矩阵 4.3 二…...
wpf C# 用USB虚拟串口最高速下载大文件 每包400万字节 平均0.7s/M,支持批量多设备同时下载。自动识别串口。源码示例可自由定制。
C# 用USB虚拟串口下载大文件 每包400万字节 平均0.7s/M。支持批量多设备同时下载。自动识别串口。可自由定制。 int 32位有符号整数 -2147483648~2147483647 但500万字节时 write时报端口IO异常。可能是驱动限制的。 之前用这个助手发文件,连续发送࿰…...
代码随想录二刷day20
提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 前言一、力扣654. 最大二叉树二、力扣617. 合并二叉树三、力扣700. 二叉搜索树中的搜索四、力扣98. 验证二叉搜索树 前言 一、力扣654. 最大二叉树 /*** Definitio…...
Yolov5如何训练自定义的数据集,以及使用GPU训练,涵盖报错解决
本文主要讲述了Yolov5如何训练自定义的数据集,以及使用GPU训练,涵盖报错解决,案例是检测图片中是否有救生圈。 最后的效果图大致如下: 效果图1效果图2 前言 系列文章 1、详细讲述Yolov5从下载、配置及如何使用GPU运行 2、…...
设计模式之单列模式
单列模式是一种经典的设计模式,在校招中最乐意考的设计模式之一~ 设计模式就是软件开发中的棋谱,大佬们针对一些常见的场景,总结出来的代码的编写套路,按照套路来写,不说你写的多好,至少不会太差~ 在校招中…...
linux内核模块编译方法详解
文章目录 前言一、静态加载法1.1 编写驱动程序1.2 将新功能配置在内核中1.3为新功能代码改写Makefile1.4 make menuconfig界面里将新功能对应的那项选择为<*> 二、动态加载法2.1 新功能源码与Linux内核源码在同一目录结构下2.2 新功能源码与Linux内核源码不在同一目录结构…...
简介shell的关联数组与普通数组
本文首先介绍shell的关联数组,然后介绍shell的普通数组,最后总结它们的共同语法。 shell的关联数组 定义一个关联数组,并打印它的key-value对 #!/bin/sh# 声明一个关联数组 declare -A HASH_MAP# 给关联数组赋值 HASH_MAP["Tom"…...
玩转Mysql系列 - 第17篇:存储过程自定义函数详解
这是Mysql系列第17篇。 环境:mysql5.7.25,cmd命令中进行演示。 代码中被[]包含的表示可选,|符号分开的表示可选其一。 需求背景介绍 线上程序有时候出现问题导致数据错误的时候,如果比较紧急,我们可以写一个存储来…...
自动驾驶:轨迹预测综述
自动驾驶:轨迹预测综述 轨迹预测的定义轨迹预测的分类基于物理的方法(Physics-based)基于机器学习的方法(Classic Machine Learning-based)基于深度学习的方法(Deep Learning-based)基于强化学习…...
【uniapp/uview】u-datetime-picker 选择器的过滤器用法
引入:要求日期选择的下拉框在分钟显示时,只显示 0 和 30 分钟; <u-datetime-picker :show"dateShow" :filter"timeFilter" confirm"selDateConfirm" cancel"dateCancel" v-model"value1&qu…...
如何使用Docker部署Nacos服务?Nacos Docker 快速部署指南: 一站式部署与配置教程
🌷🍁 博主猫头虎(🐅🐾)带您 Go to New World✨🍁 🦄 博客首页——🐅🐾猫头虎的博客🎐 🐳 《面试题大全专栏》 🦕 文章图文…...
yocto stm32mp1集成ros
yocto stm32mp1集成ros yocto集成ros下载meta-rosyocto集成rosrootfs验证 yocto集成ros 本章节介绍yocto如何集成ros系统用来作机器人开发。 下载meta-ros 第一步首先需要下载meta-ros layer,meta-ros的链接如下:https://github.com/ros/meta-ros/tre…...
Linux 中的 chroot 命令及示例
Linux/Unix系统中的chroot命令用于更改根目录。Linux/Unix 类系统中的每个进程/命令都有一个称为root 目录的当前工作目录。它更改当前正在运行的进程及其子进程的根目录。 在此类修改的环境中运行的进程/命令无法访问根目录之外的文件。这种修改后的环境称为“ chroot监狱”或…...
oracle的redo与postgreSQL的WAL以及MySQL的binlog区别
Oracle的redo日志、PostgreSQL的WAL(Write-Ahead Log)以及MySQL的binlog(二进制日志)都是数据库的事务日志,但它们在实现和功能上有一些区别。 1. 实现方式: - Oracle的redo日志是通过在事务提交前将事务操作记录到磁盘上的重做日志文件中来实现的。 - PostgreSQL…...
进入低功耗和唤醒
休眠模式 进入休眠模式 如果使用 WFI 指令进入睡眠模式,则嵌套向量中断控制器 (NVIC) 确认的任意外设中断都会 将器件从睡眠模式唤醒。 如果使用 WFE 指令进入睡眠模式,MCU 将在有事件发生时立即退出睡眠模式。唤醒事件可 通过以下方式产生ÿ…...
【多线程】volatile 关键字
volatile 关键字 1. 保证内存可见性2. 禁止指令重排序3. 不保证原子性 1. 保证内存可见性 内存可见性问题: 一个线程针对一个变量进行读取操作,另一个线程针对这个变量进行修改操作, 此时读到的值,不一定是修改后的值,即这个读线…...
【Windows注册表内容详解】
Windows注册表内容详解 第一章节 注册表基础 一、什么是注册表 注册表是windows操作系统、硬件设备以及客户应用程序得以正常运行和保存设置的核心“数据库”,也可以说是一个非常巨大的树状分层结构的数据库系统。 注册表记录了用户安装在计算机上的软件和每个程…...
大数据Hadoop入门之集群的搭建
hadoop的三种运行模式 本地模式:测试本地的hadoop是否能够运行,用来运行官方的代码。伪分布模式:原先有人拿来测试,目前测试都不用这个模式了。完全分布模式:多台服务器组成分布式环境,生产环境使用 分布式主机文件同步命令 sc…...
Chapter03-Authentication vulnerabilities
文章目录 1. 身份验证简介1.1 What is authentication1.2 difference between authentication and authorization1.3 身份验证机制失效的原因1.4 身份验证机制失效的影响 2. 基于登录功能的漏洞2.1 密码爆破2.2 用户名枚举2.3 有缺陷的暴力破解防护2.3.1 如果用户登录尝试失败次…...
如何在看板中体现优先级变化
在看板中有效体现优先级变化的关键措施包括:采用颜色或标签标识优先级、设置任务排序规则、使用独立的优先级列或泳道、结合自动化规则同步优先级变化、建立定期的优先级审查流程。其中,设置任务排序规则尤其重要,因为它让看板视觉上直观地体…...
【git】把本地更改提交远程新分支feature_g
创建并切换新分支 git checkout -b feature_g 添加并提交更改 git add . git commit -m “实现图片上传功能” 推送到远程 git push -u origin feature_g...
第 86 场周赛:矩阵中的幻方、钥匙和房间、将数组拆分成斐波那契序列、猜猜这个单词
Q1、[中等] 矩阵中的幻方 1、题目描述 3 x 3 的幻方是一个填充有 从 1 到 9 的不同数字的 3 x 3 矩阵,其中每行,每列以及两条对角线上的各数之和都相等。 给定一个由整数组成的row x col 的 grid,其中有多少个 3 3 的 “幻方” 子矩阵&am…...
JVM 内存结构 详解
内存结构 运行时数据区: Java虚拟机在运行Java程序过程中管理的内存区域。 程序计数器: 线程私有,程序控制流的指示器,分支、循环、跳转、异常处理、线程恢复等基础功能都依赖这个计数器完成。 每个线程都有一个程序计数…...
Linux 内存管理实战精讲:核心原理与面试常考点全解析
Linux 内存管理实战精讲:核心原理与面试常考点全解析 Linux 内核内存管理是系统设计中最复杂但也最核心的模块之一。它不仅支撑着虚拟内存机制、物理内存分配、进程隔离与资源复用,还直接决定系统运行的性能与稳定性。无论你是嵌入式开发者、内核调试工…...
mac 安装homebrew (nvm 及git)
mac 安装nvm 及git 万恶之源 mac 安装这些东西离不开Xcode。及homebrew 一、先说安装git步骤 通用: 方法一:使用 Homebrew 安装 Git(推荐) 步骤如下:打开终端(Terminal.app) 1.安装 Homebrew…...
LRU 缓存机制详解与实现(Java版) + 力扣解决
📌 LRU 缓存机制详解与实现(Java版) 一、📖 问题背景 在日常开发中,我们经常会使用 缓存(Cache) 来提升性能。但由于内存有限,缓存不可能无限增长,于是需要策略决定&am…...
脑机新手指南(七):OpenBCI_GUI:从环境搭建到数据可视化(上)
一、OpenBCI_GUI 项目概述 (一)项目背景与目标 OpenBCI 是一个开源的脑电信号采集硬件平台,其配套的 OpenBCI_GUI 则是专为该硬件设计的图形化界面工具。对于研究人员、开发者和学生而言,首次接触 OpenBCI 设备时,往…...
针对药品仓库的效期管理问题,如何利用WMS系统“破局”
案例: 某医药分销企业,主要经营各类药品的批发与零售。由于药品的特殊性,效期管理至关重要,但该企业一直面临效期问题的困扰。在未使用WMS系统之前,其药品入库、存储、出库等环节的效期管理主要依赖人工记录与检查。库…...
