0基础学习PyFlink——时间滑动窗口(Sliding Time Windows)
在《0基础学习PyFlink——时间滚动窗口(Tumbling Time Windows)》我们介绍了不会有重复数据的时间滚动窗口。本节我们将介绍存在重复计算数据的时间滑动窗口。
关于滑动窗口,可以先看下《0基础学习PyFlink——个数滑动窗口(Sliding Count Windows)》。下图就是个数滑动窗口示意图。

我们看到个数滑动窗口也会因为窗口内数据不够而不被触发。但是时间滑动窗口则可以解决这个问题,我们只要把窗口改成时间类型即可。
相应的代码我们参考《0基础学习PyFlink——时间滚动窗口(Tumbling Time Windows)》,只要把TumblingProcessingTimeWindows改成SlidingProcessingTimeWindows,并增加一个偏移参数(Time.milliseconds(1))即可。这意味着我们将运行一个时间长度为2毫秒,每次递进1毫秒的窗口。
完整代码
from typing import Iterable
import time
from pyflink.common import Types, Time
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode, WindowFunction
from pyflink.datastream.window import TimeWindow, SlidingProcessingTimeWindowsclass SumWindowFunction(WindowFunction[tuple, tuple, str, TimeWindow]):def apply(self, key: str, window: TimeWindow, inputs: Iterable[tuple]):print(*inputs, window)return [(key, len([e for e in inputs]))]word_count_data = [("A",2),("A",1),("A",4),("A",3),("A",6),("A",5),("A",7),("A",8),("A",9),("A",10),("A",11),("A",12),("A",13),("A",14),("A",15),("A",16),("A",17),("A",18),("A",19),("A",20)]def word_count():env = StreamExecutionEnvironment.get_execution_environment()env.set_runtime_mode(RuntimeExecutionMode.STREAMING)# write all the data to one fileenv.set_parallelism(1)source_type_info = Types.TUPLE([Types.STRING(), Types.INT()])# define the source# mappgingsource = env.from_collection(word_count_data, source_type_info)# source.print()# keyingkeyed=source.key_by(lambda i: i[0]) # reducingreduced=keyed.window(SlidingProcessingTimeWindows.of(Time.milliseconds(2), Time.milliseconds(1))) \.apply(SumWindowFunction(),Types.TUPLE([Types.STRING(), Types.INT()]))# # define the sinkreduced.print()# submit for executionenv.execute()if __name__ == '__main__':word_count()
运行结果
运行两次上述代码,我们发现每次都不同,而且有重叠计算的元素。
(‘A’, 2) (‘A’, 1) (‘A’, 4) TimeWindow(start=1698773292650, end=1698773292652)
(‘A’, 2) (‘A’, 1) (‘A’, 4) (‘A’, 3) (‘A’, 6) (‘A’, 5) (‘A’, 7) (‘A’, 8) (‘A’, 9) (‘A’, 10) (‘A’, 11) TimeWindow(start=1698773292651, end=1698773292653)
(A,3)
(A,11)
(‘A’, 3) (‘A’, 6) (‘A’, 5) (‘A’, 7) (‘A’, 8) (‘A’, 9) (‘A’, 10) (‘A’, 11) (‘A’, 12) (‘A’, 13) (‘A’, 14) (‘A’, 15) (‘A’, 16) (‘A’, 17) (‘A’, 18) (‘A’, 19) (‘A’, 20) TimeWindow(start=1698773292652, end=1698773292654)
(A,17)

(‘A’, 2) (‘A’, 1) (‘A’, 4) TimeWindow(start=1698773319933, end=1698773319935)
(‘A’, 2) (‘A’, 1) (‘A’, 4) (‘A’, 3) (‘A’, 6) (‘A’, 5) (‘A’, 7) (‘A’, 8) (‘A’, 9) (‘A’, 10) (‘A’, 11) (‘A’, 12) TimeWindow(start=1698773319934, end=1698773319936)
(A,3)
(A,12)
(‘A’, 3) (‘A’, 6) (‘A’, 5) (‘A’, 7) (‘A’, 8) (‘A’, 9) (‘A’, 10) (‘A’, 11) (‘A’, 12) (‘A’, 13) (‘A’, 14) (‘A’, 15) (‘A’, 16) (‘A’, 17) (‘A’, 18) (‘A’, 19) (‘A’, 20) TimeWindow(start=1698773319935, end=1698773319937)
(A,17)

参考资料
- https://nightlies.apache.org/flink/flink-docs-master/api/python/reference/pyflink.datastream/api/pyflink.datastream.window.SlidingProcessingTimeWindows.html#pyflink.datastream.window.SlidingProcessingTimeWindows
相关文章:
0基础学习PyFlink——时间滑动窗口(Sliding Time Windows)
在《0基础学习PyFlink——时间滚动窗口(Tumbling Time Windows)》我们介绍了不会有重复数据的时间滚动窗口。本节我们将介绍存在重复计算数据的时间滑动窗口。 关于滑动窗口,可以先看下《0基础学习PyFlink——个数滑动窗口(Sliding Count Windows&#x…...
API安全之《大话:API的前世今生》
写在前面:本文结合API使用的业界现状,系统性地阐述API的基本概念、发展历史、表现形式等基础内容,主要包含以下内容: 1.什么是API 2.API的发展历史 3.现代API常用消息格式 4.top N 互联网企业API 使用现状 当前的世界是一个信…...
H5或者Vue实现二维码识别
前言 1、扫码识别库采用开源的zxing/library 2、支持js,Vue,lit等实现 原文章地址和代码仓库地址 1、在界面创建video标签用来显示摄像头内容 <!-- 视区 --><!-- lit写法 --> <video ${ref(this.videoRef)} class"xy-scan-wrap…...
stm32整理(三)ADC
1 ADC简介 1.1 ADC 简介 12 位 ADC 是逐次趋近型模数转换器。它具有多达 19 个复用通道,可测量来自 16 个外部 源、两个内部源和 VBAT 通道的信号。这些通道的 A/D 转换可在单次、连续、扫描或不连续 采样模式下进行。ADC 的结果存储在一个左对齐或右对齐的 16 位…...
Redis-持久化+主从架构
文章目录 Redis的持久化RDB模式异步持久化的实现AOF模式总结 Redis的主从架构1.端口以及文件调试测试2.主从配置3.数据同步原理(第一次同步为全局同步)4.增量同步5.主从配置优化6.问:master主机怎么判断从机slave是不是第一次同步数据? Redis…...
STM32H750之FreeRTOS学习--------(四)中断管理
四、FreeRTOS中断管理 中断的概念不再过多叙述,学习过逻辑的都知道 中断的执行过程 中断请求 外设产生中断请求(GPIO外部中断、定时器中断等)响应中断 CPU停止执行当前程序,转而去执行中断处理程序(ISR)…...
Macroscope安全漏洞检测工具简介
学习目标: 本介绍旨在帮助感兴趣者尽快了解 Macroscope,这是一款用于安全测试自动化和漏洞管理的企业工具。 全覆盖应用程序安全测试: 如下图所示,如果使用多种互补工具(SAST/DAST/SCA 等)来检测应用程序…...
【Linux】Nignx的入门使用负载均衡动静分离(前后端项目部署)---超详细
一,Nignx入门 1.1 Nignx是什么 Nginx是一个高性能的开源Web服务器和反向代理服务器。它使用事件驱动的异步框架,可同时处理大量请求,支持负载均衡、反向代理、HTTP缓存等常见Web服务场景。Nginx可以作为一个前端的Web服务器,也可…...
【入门Flink】- 04Flink部署模式和运行模式【偏概念】
部署模式 在一些应用场景中,对于集群资源分配和占用的方式,可能会有特定的需求。Flink为各种场景提供了不同的部署模式,主要有以下三种:会话模式(Session Mode)、单作业模式(Per-Job Mode&…...
react面试要点
# React面试知识点 ## React是什么?谈一谈你对react的理解 1 React是一个网页UI库 2 react的特点是 声明式 组件化 通用性 3 react优点: 简单,低耦合高内聚,由于虚拟dom概念,可以做到一次学习到处使用。 …...
在Google Kubernetes集群创建分布式Jenkins(一)
因为项目需要,在GKE的集群上需要创建一个CICD的环境,记录一下安装部署一个分布式Jenkins集群的过程。 分布式Jenkins由一个主服务器和多个Agent组成,Agent可以执行主服务器分派的任务。如下图所示: 如上图,Jenkins Ag…...
【Python全栈_公开课学习记录】
一、初识python (一).Python起源 Python创始人为吉多范罗苏姆(荷兰),Python崇尚优美、清晰、简明的编辑风格。Python语言结构清晰简单、数据库丰富、运行成熟稳定,科学计算统计分析领先。目前广泛应用于云计算、Web开发、科学运算…...
uniapp循环列表单选框实现单选
目录 图片源码参考最后 图片 源码 参考 大佬 最后 感觉文章好的话记得点个心心和关注和收藏,有错的地方麻烦指正一下,如果需要转载,请标明出处,多谢!!!...
【强化学习】14 —— A3C(Asynchronous Advantage Actor Critic)
A3C算法( Asynchronous Methods for Deep Reinforcement Learning)于2016年被谷歌DeepMind团队提出。A3C是一种非常有效的深度强化学习算法,在围棋、星际争霸等复杂任务上已经取得了很好的效果。接下来,我们先从A3C的名称入手&…...
Google单元测试sample分析(四)
GoogleTest单元测试可用实现在每个测试用例结束后监控其内存使用情况, 可以通过GoogleTest提供的事件侦听器EmptyTestEventListener 来实现,下面通过官方提供的sample例子,路径在samples文件夹下的sample10_unittest.cpp // Copyright 2009…...
网络套接字编程(二)
网络套接字编程(二) 文章目录 网络套接字编程(二)简易TCP网络程序服务端创建套接字服务端绑定IP地址和端口号服务端监听服务端运行服务端网络服务服务端启动客户端创建套接字客户端的绑定和监听问题客户端建立连接并通信客户端启动程序测试单执行流服务器的弊端 多进程版TCP网络…...
LLaMA-Adapter源码解析
LLaMA-Adapter源码解析 伪代码 def transformer_block_with_llama_adapter(x, gating_factor, soft_prompt):residual xy zero_init_attention(soft_prompt, x) # llama-adapter: prepend prefixx self_attention(x)x x gating_factor * y # llama-adapter: apply zero_init…...
JavaScript设计模式之发布-订阅模式
发布者和订阅者完全解耦(通过消息队列进行通信) 适用场景:功能模块间进行通信,如Vue的事件总线。 ES6实现方式: class eventManager {constructor() {this.eventList {};}on(eventName, callback) {if (this.eventL…...
mysql---索引
概要 索引:排序的列表,列表当中存储的是索引的值和包含这个值的数据所在的行的物理地址 作用:加快查找速度 注:索引要在创建表时尽量创建完全,后期添加影响变动大。 索引也需要占用磁盘空间,innodb表数据…...
微信小程序——简易复制文本
在微信小程序中,可以使用wx.setClipboardData()方法来实现复制文本内容的功能。以下是一个示例代码: // 点击按钮触发复制事件 copyText: function() {var that this;wx.setClipboardData({data: 要复制的文本内容,success: function(res) {wx.showToa…...
【力扣数据库知识手册笔记】索引
索引 索引的优缺点 优点1. 通过创建唯一性索引,可以保证数据库表中每一行数据的唯一性。2. 可以加快数据的检索速度(创建索引的主要原因)。3. 可以加速表和表之间的连接,实现数据的参考完整性。4. 可以在查询过程中,…...
SCAU期末笔记 - 数据分析与数据挖掘题库解析
这门怎么题库答案不全啊日 来简单学一下子来 一、选择题(可多选) 将原始数据进行集成、变换、维度规约、数值规约是在以下哪个步骤的任务?(C) A. 频繁模式挖掘 B.分类和预测 C.数据预处理 D.数据流挖掘 A. 频繁模式挖掘:专注于发现数据中…...
AtCoder 第409场初级竞赛 A~E题解
A Conflict 【题目链接】 原题链接:A - Conflict 【考点】 枚举 【题目大意】 找到是否有两人都想要的物品。 【解析】 遍历两端字符串,只有在同时为 o 时输出 Yes 并结束程序,否则输出 No。 【难度】 GESP三级 【代码参考】 #i…...
selenium学习实战【Python爬虫】
selenium学习实战【Python爬虫】 文章目录 selenium学习实战【Python爬虫】一、声明二、学习目标三、安装依赖3.1 安装selenium库3.2 安装浏览器驱动3.2.1 查看Edge版本3.2.2 驱动安装 四、代码讲解4.1 配置浏览器4.2 加载更多4.3 寻找内容4.4 完整代码 五、报告文件爬取5.1 提…...
Selenium常用函数介绍
目录 一,元素定位 1.1 cssSeector 1.2 xpath 二,操作测试对象 三,窗口 3.1 案例 3.2 窗口切换 3.3 窗口大小 3.4 屏幕截图 3.5 关闭窗口 四,弹窗 五,等待 六,导航 七,文件上传 …...
【学习笔记】erase 删除顺序迭代器后迭代器失效的解决方案
目录 使用 erase 返回值继续迭代使用索引进行遍历 我们知道类似 vector 的顺序迭代器被删除后,迭代器会失效,因为顺序迭代器在内存中是连续存储的,元素删除后,后续元素会前移。 但一些场景中,我们又需要在执行删除操作…...
协议转换利器,profinet转ethercat网关的两大派系,各有千秋
随着工业以太网的发展,其高效、便捷、协议开放、易于冗余等诸多优点,被越来越多的工业现场所采用。西门子SIMATIC S7-1200/1500系列PLC集成有Profinet接口,具有实时性、开放性,使用TCP/IP和IT标准,符合基于工业以太网的…...
vue3 daterange正则踩坑
<el-form-item label"空置时间" prop"vacantTime"> <el-date-picker v-model"form.vacantTime" type"daterange" start-placeholder"开始日期" end-placeholder"结束日期" clearable :editable"fal…...
Linux中《基础IO》详细介绍
目录 理解"文件"狭义理解广义理解文件操作的归类认知系统角度文件类别 回顾C文件接口打开文件写文件读文件稍作修改,实现简单cat命令 输出信息到显示器,你有哪些方法stdin & stdout & stderr打开文件的方式 系统⽂件I/O⼀种传递标志位…...
Vue3 PC端 UI组件库我更推荐Naive UI
一、Vue3生态现状与UI库选择的重要性 随着Vue3的稳定发布和Composition API的广泛采用,前端开发者面临着UI组件库的重新选择。一个好的UI库不仅能提升开发效率,还能确保项目的长期可维护性。本文将对比三大主流Vue3 UI库(Naive UI、Element …...
