0基础学习PyFlink——时间滑动窗口(Sliding Time Windows)
在《0基础学习PyFlink——时间滚动窗口(Tumbling Time Windows)》我们介绍了不会有重复数据的时间滚动窗口。本节我们将介绍存在重复计算数据的时间滑动窗口。
关于滑动窗口,可以先看下《0基础学习PyFlink——个数滑动窗口(Sliding Count Windows)》。下图就是个数滑动窗口示意图。

我们看到个数滑动窗口也会因为窗口内数据不够而不被触发。但是时间滑动窗口则可以解决这个问题,我们只要把窗口改成时间类型即可。
相应的代码我们参考《0基础学习PyFlink——时间滚动窗口(Tumbling Time Windows)》,只要把TumblingProcessingTimeWindows改成SlidingProcessingTimeWindows,并增加一个偏移参数(Time.milliseconds(1))即可。这意味着我们将运行一个时间长度为2毫秒,每次递进1毫秒的窗口。
完整代码
from typing import Iterable
import time
from pyflink.common import Types, Time
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode, WindowFunction
from pyflink.datastream.window import TimeWindow, SlidingProcessingTimeWindowsclass SumWindowFunction(WindowFunction[tuple, tuple, str, TimeWindow]):def apply(self, key: str, window: TimeWindow, inputs: Iterable[tuple]):print(*inputs, window)return [(key, len([e for e in inputs]))]word_count_data = [("A",2),("A",1),("A",4),("A",3),("A",6),("A",5),("A",7),("A",8),("A",9),("A",10),("A",11),("A",12),("A",13),("A",14),("A",15),("A",16),("A",17),("A",18),("A",19),("A",20)]def word_count():env = StreamExecutionEnvironment.get_execution_environment()env.set_runtime_mode(RuntimeExecutionMode.STREAMING)# write all the data to one fileenv.set_parallelism(1)source_type_info = Types.TUPLE([Types.STRING(), Types.INT()])# define the source# mappgingsource = env.from_collection(word_count_data, source_type_info)# source.print()# keyingkeyed=source.key_by(lambda i: i[0]) # reducingreduced=keyed.window(SlidingProcessingTimeWindows.of(Time.milliseconds(2), Time.milliseconds(1))) \.apply(SumWindowFunction(),Types.TUPLE([Types.STRING(), Types.INT()]))# # define the sinkreduced.print()# submit for executionenv.execute()if __name__ == '__main__':word_count()
运行结果
运行两次上述代码,我们发现每次都不同,而且有重叠计算的元素。
(‘A’, 2) (‘A’, 1) (‘A’, 4) TimeWindow(start=1698773292650, end=1698773292652)
(‘A’, 2) (‘A’, 1) (‘A’, 4) (‘A’, 3) (‘A’, 6) (‘A’, 5) (‘A’, 7) (‘A’, 8) (‘A’, 9) (‘A’, 10) (‘A’, 11) TimeWindow(start=1698773292651, end=1698773292653)
(A,3)
(A,11)
(‘A’, 3) (‘A’, 6) (‘A’, 5) (‘A’, 7) (‘A’, 8) (‘A’, 9) (‘A’, 10) (‘A’, 11) (‘A’, 12) (‘A’, 13) (‘A’, 14) (‘A’, 15) (‘A’, 16) (‘A’, 17) (‘A’, 18) (‘A’, 19) (‘A’, 20) TimeWindow(start=1698773292652, end=1698773292654)
(A,17)

(‘A’, 2) (‘A’, 1) (‘A’, 4) TimeWindow(start=1698773319933, end=1698773319935)
(‘A’, 2) (‘A’, 1) (‘A’, 4) (‘A’, 3) (‘A’, 6) (‘A’, 5) (‘A’, 7) (‘A’, 8) (‘A’, 9) (‘A’, 10) (‘A’, 11) (‘A’, 12) TimeWindow(start=1698773319934, end=1698773319936)
(A,3)
(A,12)
(‘A’, 3) (‘A’, 6) (‘A’, 5) (‘A’, 7) (‘A’, 8) (‘A’, 9) (‘A’, 10) (‘A’, 11) (‘A’, 12) (‘A’, 13) (‘A’, 14) (‘A’, 15) (‘A’, 16) (‘A’, 17) (‘A’, 18) (‘A’, 19) (‘A’, 20) TimeWindow(start=1698773319935, end=1698773319937)
(A,17)

参考资料
- https://nightlies.apache.org/flink/flink-docs-master/api/python/reference/pyflink.datastream/api/pyflink.datastream.window.SlidingProcessingTimeWindows.html#pyflink.datastream.window.SlidingProcessingTimeWindows
相关文章:
0基础学习PyFlink——时间滑动窗口(Sliding Time Windows)
在《0基础学习PyFlink——时间滚动窗口(Tumbling Time Windows)》我们介绍了不会有重复数据的时间滚动窗口。本节我们将介绍存在重复计算数据的时间滑动窗口。 关于滑动窗口,可以先看下《0基础学习PyFlink——个数滑动窗口(Sliding Count Windows&#x…...
API安全之《大话:API的前世今生》
写在前面:本文结合API使用的业界现状,系统性地阐述API的基本概念、发展历史、表现形式等基础内容,主要包含以下内容: 1.什么是API 2.API的发展历史 3.现代API常用消息格式 4.top N 互联网企业API 使用现状 当前的世界是一个信…...
H5或者Vue实现二维码识别
前言 1、扫码识别库采用开源的zxing/library 2、支持js,Vue,lit等实现 原文章地址和代码仓库地址 1、在界面创建video标签用来显示摄像头内容 <!-- 视区 --><!-- lit写法 --> <video ${ref(this.videoRef)} class"xy-scan-wrap…...
stm32整理(三)ADC
1 ADC简介 1.1 ADC 简介 12 位 ADC 是逐次趋近型模数转换器。它具有多达 19 个复用通道,可测量来自 16 个外部 源、两个内部源和 VBAT 通道的信号。这些通道的 A/D 转换可在单次、连续、扫描或不连续 采样模式下进行。ADC 的结果存储在一个左对齐或右对齐的 16 位…...
Redis-持久化+主从架构
文章目录 Redis的持久化RDB模式异步持久化的实现AOF模式总结 Redis的主从架构1.端口以及文件调试测试2.主从配置3.数据同步原理(第一次同步为全局同步)4.增量同步5.主从配置优化6.问:master主机怎么判断从机slave是不是第一次同步数据? Redis…...
STM32H750之FreeRTOS学习--------(四)中断管理
四、FreeRTOS中断管理 中断的概念不再过多叙述,学习过逻辑的都知道 中断的执行过程 中断请求 外设产生中断请求(GPIO外部中断、定时器中断等)响应中断 CPU停止执行当前程序,转而去执行中断处理程序(ISR)…...
Macroscope安全漏洞检测工具简介
学习目标: 本介绍旨在帮助感兴趣者尽快了解 Macroscope,这是一款用于安全测试自动化和漏洞管理的企业工具。 全覆盖应用程序安全测试: 如下图所示,如果使用多种互补工具(SAST/DAST/SCA 等)来检测应用程序…...
【Linux】Nignx的入门使用负载均衡动静分离(前后端项目部署)---超详细
一,Nignx入门 1.1 Nignx是什么 Nginx是一个高性能的开源Web服务器和反向代理服务器。它使用事件驱动的异步框架,可同时处理大量请求,支持负载均衡、反向代理、HTTP缓存等常见Web服务场景。Nginx可以作为一个前端的Web服务器,也可…...
【入门Flink】- 04Flink部署模式和运行模式【偏概念】
部署模式 在一些应用场景中,对于集群资源分配和占用的方式,可能会有特定的需求。Flink为各种场景提供了不同的部署模式,主要有以下三种:会话模式(Session Mode)、单作业模式(Per-Job Mode&…...
react面试要点
# React面试知识点 ## React是什么?谈一谈你对react的理解 1 React是一个网页UI库 2 react的特点是 声明式 组件化 通用性 3 react优点: 简单,低耦合高内聚,由于虚拟dom概念,可以做到一次学习到处使用。 …...
在Google Kubernetes集群创建分布式Jenkins(一)
因为项目需要,在GKE的集群上需要创建一个CICD的环境,记录一下安装部署一个分布式Jenkins集群的过程。 分布式Jenkins由一个主服务器和多个Agent组成,Agent可以执行主服务器分派的任务。如下图所示: 如上图,Jenkins Ag…...
【Python全栈_公开课学习记录】
一、初识python (一).Python起源 Python创始人为吉多范罗苏姆(荷兰),Python崇尚优美、清晰、简明的编辑风格。Python语言结构清晰简单、数据库丰富、运行成熟稳定,科学计算统计分析领先。目前广泛应用于云计算、Web开发、科学运算…...
uniapp循环列表单选框实现单选
目录 图片源码参考最后 图片 源码 参考 大佬 最后 感觉文章好的话记得点个心心和关注和收藏,有错的地方麻烦指正一下,如果需要转载,请标明出处,多谢!!!...
【强化学习】14 —— A3C(Asynchronous Advantage Actor Critic)
A3C算法( Asynchronous Methods for Deep Reinforcement Learning)于2016年被谷歌DeepMind团队提出。A3C是一种非常有效的深度强化学习算法,在围棋、星际争霸等复杂任务上已经取得了很好的效果。接下来,我们先从A3C的名称入手&…...
Google单元测试sample分析(四)
GoogleTest单元测试可用实现在每个测试用例结束后监控其内存使用情况, 可以通过GoogleTest提供的事件侦听器EmptyTestEventListener 来实现,下面通过官方提供的sample例子,路径在samples文件夹下的sample10_unittest.cpp // Copyright 2009…...
网络套接字编程(二)
网络套接字编程(二) 文章目录 网络套接字编程(二)简易TCP网络程序服务端创建套接字服务端绑定IP地址和端口号服务端监听服务端运行服务端网络服务服务端启动客户端创建套接字客户端的绑定和监听问题客户端建立连接并通信客户端启动程序测试单执行流服务器的弊端 多进程版TCP网络…...
LLaMA-Adapter源码解析
LLaMA-Adapter源码解析 伪代码 def transformer_block_with_llama_adapter(x, gating_factor, soft_prompt):residual xy zero_init_attention(soft_prompt, x) # llama-adapter: prepend prefixx self_attention(x)x x gating_factor * y # llama-adapter: apply zero_init…...
JavaScript设计模式之发布-订阅模式
发布者和订阅者完全解耦(通过消息队列进行通信) 适用场景:功能模块间进行通信,如Vue的事件总线。 ES6实现方式: class eventManager {constructor() {this.eventList {};}on(eventName, callback) {if (this.eventL…...
mysql---索引
概要 索引:排序的列表,列表当中存储的是索引的值和包含这个值的数据所在的行的物理地址 作用:加快查找速度 注:索引要在创建表时尽量创建完全,后期添加影响变动大。 索引也需要占用磁盘空间,innodb表数据…...
微信小程序——简易复制文本
在微信小程序中,可以使用wx.setClipboardData()方法来实现复制文本内容的功能。以下是一个示例代码: // 点击按钮触发复制事件 copyText: function() {var that this;wx.setClipboardData({data: 要复制的文本内容,success: function(res) {wx.showToa…...
微信小程序之bind和catch
这两个呢,都是绑定事件用的,具体使用有些小区别。 官方文档: 事件冒泡处理不同 bind:绑定的事件会向上冒泡,即触发当前组件的事件后,还会继续触发父组件的相同事件。例如,有一个子视图绑定了b…...
SCAU期末笔记 - 数据分析与数据挖掘题库解析
这门怎么题库答案不全啊日 来简单学一下子来 一、选择题(可多选) 将原始数据进行集成、变换、维度规约、数值规约是在以下哪个步骤的任务?(C) A. 频繁模式挖掘 B.分类和预测 C.数据预处理 D.数据流挖掘 A. 频繁模式挖掘:专注于发现数据中…...
JVM暂停(Stop-The-World,STW)的原因分类及对应排查方案
JVM暂停(Stop-The-World,STW)的完整原因分类及对应排查方案,结合JVM运行机制和常见故障场景整理而成: 一、GC相关暂停 1. 安全点(Safepoint)阻塞 现象:JVM暂停但无GC日志,日志显示No GCs detected。原因:JVM等待所有线程进入安全点(如…...
优选算法第十二讲:队列 + 宽搜 优先级队列
优选算法第十二讲:队列 宽搜 && 优先级队列 1.N叉树的层序遍历2.二叉树的锯齿型层序遍历3.二叉树最大宽度4.在每个树行中找最大值5.优先级队列 -- 最后一块石头的重量6.数据流中的第K大元素7.前K个高频单词8.数据流的中位数 1.N叉树的层序遍历 2.二叉树的锯…...
安宝特方案丨船舶智造的“AR+AI+作业标准化管理解决方案”(装配)
船舶制造装配管理现状:装配工作依赖人工经验,装配工人凭借长期实践积累的操作技巧完成零部件组装。企业通常制定了装配作业指导书,但在实际执行中,工人对指导书的理解和遵循程度参差不齐。 船舶装配过程中的挑战与需求 挑战 (1…...
提升移动端网页调试效率:WebDebugX 与常见工具组合实践
在日常移动端开发中,网页调试始终是一个高频但又极具挑战的环节。尤其在面对 iOS 与 Android 的混合技术栈、各种设备差异化行为时,开发者迫切需要一套高效、可靠且跨平台的调试方案。过去,我们或多或少使用过 Chrome DevTools、Remote Debug…...
作为点的对象CenterNet论文阅读
摘要 检测器将图像中的物体表示为轴对齐的边界框。大多数成功的目标检测方法都会枚举几乎完整的潜在目标位置列表,并对每一个位置进行分类。这种做法既浪费又低效,并且需要额外的后处理。在本文中,我们采取了不同的方法。我们将物体建模为单…...
无头浏览器技术:Python爬虫如何精准模拟搜索点击
1. 无头浏览器技术概述 1.1 什么是无头浏览器? 无头浏览器是一种没有图形用户界面(GUI)的浏览器,它通过程序控制浏览器内核(如Chromium、Firefox)执行页面加载、JavaScript渲染、表单提交等操作。由于不渲…...
Go 并发编程基础:select 多路复用
select 是 Go 并发编程中非常强大的语法结构,它允许程序同时等待多个通道操作的完成,从而实现多路复用机制,是协程调度、超时控制、通道竞争等场景的核心工具。 一、什么是 select select 类似于 switch 语句,但它用于监听多个通…...
基于Java的离散数学题库系统设计与实现:附完整源码与论文
JAVASQL离散数学题库管理系统 一、系统概述 本系统采用Java Swing开发桌面应用,结合SQL Server数据库实现离散数学题库的高效管理。系统支持题型分类(选择题、填空题、判断题等)、难度分级、知识点关联,并提供智能组卷、在线测试…...
