0基础学习PyFlink——个数滑动窗口(Sliding Count Windows)
大纲
- 滑动(Sliding)和滚动(Tumbling)的区别
- 样例
- 窗口为2,滑动距离为1
- 窗口为3,滑动距离为1
- 窗口为3,滑动距离为2
- 窗口为3,滑动距离为3
- 完整代码
- 参考资料
在 《0基础学习PyFlink——个数滚动窗口(Tumbling Count Windows)》一文中,我们介绍了滚动窗口。本节我们要介绍滑动窗口。
滑动(Sliding)和滚动(Tumbling)的区别
正如其名,“滑动”是指这个窗口沿着一定的方向,按着一定的速度“滑行”。

而滚动窗口,则是一个个“衔接着”,而不是像上面那样交错着。

它们的相同之处就是:只有窗口内的事件数量到达窗口要求的数值时,这些窗口才会触发计算。
样例
我们只要对《0基础学习PyFlink——个数滚动窗口(Tumbling Count Windows)》中的代码做轻微的改动即可。为了简化样例,我们只看Key为E的元素的滑动。
word_count_data = [("E",3),("E",1),("E",4),("E",2),("E",6),("E",5)]def word_count():env = StreamExecutionEnvironment.get_execution_environment()env.set_runtime_mode(RuntimeExecutionMode.STREAMING)# write all the data to one fileenv.set_parallelism(1)source_type_info = Types.TUPLE([Types.STRING(), Types.INT()])# define the source# mappgingsource = env.from_collection(word_count_data, source_type_info)# source.print()# keyingkeyed=source.key_by(lambda i: i[0])
窗口为2,滑动距离为1
count_window会根据传入的第二参数决定是构建滚动(CountTumblingWindowAssigner)窗口还是滑动(CountSlidingWindowAssigner)窗口。
def count_window(self, size: int, slide: int = 0):"""Windows this KeyedStream into tumbling or sliding count windows.:param size: The size of the windows in number of elements.:param slide: The slide interval in number of elements... versionadded:: 1.16.0"""if slide == 0:return WindowedStream(self, CountTumblingWindowAssigner(size))else:return WindowedStream(self, CountSlidingWindowAssigner(size, slide))
我们只要给count_window第二个参数传递一个不为0的值,即可达到滑动效果。
# reducingwindows_size = 2sliding_size = 1reduced=keyed.count_window(windows_size, sliding_size) \.apply(SumWindowFunction(),Types.TUPLE([Types.STRING(), Types.INT()]))# # define the sinkreduced.print()# submit for executionenv.execute()
(E,2)
(E,2)
(E,2)
(E,2)
(E,2)

窗口为3,滑动距离为1
# reducingwindows_size = 3sliding_size = 1reduced=keyed.count_window(windows_size, sliding_size) \.apply(SumWindowFunction(),Types.TUPLE([Types.STRING(), Types.INT()]))
(E,3)
(E,3)
(E,3)
(E,3)

窗口为3,滑动距离为2
# reducingwindows_size = 3sliding_size = 2reduced=keyed.count_window(windows_size, sliding_size) \.apply(SumWindowFunction(),Types.TUPLE([Types.STRING(), Types.INT()]))
(E,3)
(E,3)

窗口为3,滑动距离为3
这个就等效于滚动窗口了,因为“滑”过了窗口大小。
# reducingwindows_size = 3sliding_size = 3reduced=keyed.count_window(windows_size, sliding_size) \.apply(SumWindowFunction(),Types.TUPLE([Types.STRING(), Types.INT()]))
(E,3)
(E,3)

完整代码
from typing import Iterablefrom pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode, WindowFunction
from pyflink.datastream.window import CountWindowclass SumWindowFunction(WindowFunction[tuple, tuple, str, CountWindow]):def apply(self, key: str, window: CountWindow, inputs: Iterable[tuple]):return [(key, len([e for e in inputs]))]word_count_data = [("E",3),("E",1),("E",4),("E",2),("E",6),("E",5)]def word_count():env = StreamExecutionEnvironment.get_execution_environment()env.set_runtime_mode(RuntimeExecutionMode.STREAMING)# write all the data to one fileenv.set_parallelism(1)source_type_info = Types.TUPLE([Types.STRING(), Types.INT()])# define the source# mappgingsource = env.from_collection(word_count_data, source_type_info)# source.print()# keyingkeyed=source.key_by(lambda i: i[0]) # reducingwindows_size = 3sliding_size = 1reduced=keyed.count_window(windows_size, sliding_size) \.apply(SumWindowFunction(),Types.TUPLE([Types.STRING(), Types.INT()]))# # define the sinkreduced.print()# submit for executionenv.execute()if __name__ == '__main__':word_count()
参考资料
- https://nightlies.apache.org/flink/flink-docs-release-1.18/zh/docs/learn-flink/streaming_analytics/
相关文章:
0基础学习PyFlink——个数滑动窗口(Sliding Count Windows)
大纲 滑动(Sliding)和滚动(Tumbling)的区别样例窗口为2,滑动距离为1窗口为3,滑动距离为1窗口为3,滑动距离为2窗口为3,滑动距离为3 完整代码参考资料 在 《0基础学习PyFlink——个数…...
vue3+ts 提取公共方法
因为好多页面都会使用到这个效验规则,封装一个校检规则,方便维护 封装前 封装后...
C++ ->
C -> 是访问类或结构体对象的成员的运算符 注意这里不是直接的访问.是用于访问指向对象的指针的成员 下面的代码可以很好的理解如下: #include<iostream>using namespace std;class Func{public:int i,j;void myFunc(){cout<<"i"<&l…...
VR全景在医院的应用:缓和医患矛盾、提升医院形象
医患关系一直以来都是较为激烈的,包括制度的不完善、医疗资源紧张等问题也时有存在,为了缓解医患矛盾,不仅要提升患者以及家属对于医院的认知,还需要完善医疗制度,提高医疗资源的配置效率,提高服务质量。 因…...
【python基础】format格式化函数的使用
文章目录 前言一、format()内容匹配替换1、序号索引2、关键字3、列表索引4、字典索引5、通过类的属性6、通过魔法参数 二、format()数字格式化 前言 语法:str.format() 说明:一种格式化字符串的函数。 一、format()内容匹配替换 1、序号索引 在没有参…...
Java web(三):Http、Tomcat、Servlet
文章目录 一、Java web技术栈二、Http1.1 Http请求数据格式1.2 Http响应数据格式1.3 状态码 二、Tomcat2.1 介绍2.2 web项目结构2.3 IDEA中使用Tomcat 三、Servlet3.1 Servlet使用3.2 Servlet生命周期3.3 Servlet方法和体系结构3.4 urlPattern配置 四、Request4.1 获取请求数据…...
Java实现Hive UDF详细步骤 (Hive 3.x版本,IDEA开发)
这里写目录标题 前言1. 新建项目2.配置maven依赖3.编写代码4.打jar包5.上传服务器6.代码中引用 前言 老版本编写UDF时,需要继承 org.apache.hadoop.hive.ql.exec.UDF类,然后直接实现evaluate()方法即可。 由于公司hive版本比较高(3.x&#x…...
Vue进阶(幺陆肆)Apache的Access.log分析总结
文章目录 一、前言二、常用指令 一、前言 前端项目排错阶段,可借助apache的Access.log进行请求日志查看。 二、常用指令 #查看80端口的tcp连接 #netstat -tan | grep "ESTABLISHED" | grep ":80" | wc -l #当前WEB服务器中联接次数最多的ip地…...
Apple 苹果发布 M3、M3 Pro 和 M3 Max 芯片
本心、输入输出、结果 文章目录 Apple 苹果发布 M3、M3 Pro 和 M3 Max 芯片前言M3、M3 Pro 和 M3 Max 芯片的性能相关资料图M3 Pro规格M3 Max规格弘扬爱国精神 Apple 苹果发布 M3、M3 Pro 和 M3 Max 芯片 编辑:简简单单 Online zuozuo 地址:https://blog…...
Linux常用命令及主流服务部署大全
目录 Linux 系统目录 一、常用操作命令 1、目录操作 2、文件内容操作(查看日志,更改配置文件) 3、压缩和解压缩 4、更改文件权限 二、各服务部署命令 1、增加虚拟内存 2、JDK 2.1 删除系统自带的openjdk 2.2 安装jdk 2.3 删除jd…...
list-watch集群调度
调度约束 Kubernetes 是通过 List-Watch **** 的机制进行每个组件的协作,保持数据同步的,每个组件之间的设计实现了解耦。 用户是通过 kubectl 根据配置文件,向 APIServer 发送命令,在 Node 节点上面建立 Pod 和 Container。…...
深度强化学习中的神经网络部分的作用是什么?一般如何选择合适的神经网络呢?
在深度强化学习中,神经网络部分通常用于实现值函数近似或策略近似,以帮助智能体学习如何在一个环境中做出决策以获得最大的累积奖励。这些神经网络在深度强化学习中扮演着重要的角色,具体作用如下: 1.值函数近似(Valu…...
若依系统的数据导入功能设置
一、后端 Log(title "公交站牌", businessType BusinessType.IMPORT)PreAuthorize("ss.hasPermi(busStop:busStop:import)")PostMapping("/importData")public AjaxResult importData(MultipartFile file, boolean updateSupport) throws Exce…...
vue页面父组件与子组件相互调用方法和传递参数值
vue页面父组件与子组件相互调用方法和传递参数值 父组件页面定义 <el-button type"text" icon"el-icon-refresh" click"refreshClick" slot"label"></el-button> <leftList leftClick"loadModelClick" r…...
vim使用
概述 vi(visual editor)是Unix/Linux编辑器的一种。类似于win中notepad。vim(vi improved)加强版 安装vim: $ yum install vim -y四种模式 命令模式:快速进行复制、粘贴、删除等操作,还可以…...
人工智能基础_机器学习014_BGD批量梯度下降公式更新_进一步推导_SGD随机梯度下降和MBGD小批量梯度下降公式进一步推导---人工智能工作笔记0054
然后我们先来看BGD批量梯度下降,可以看到这里,其实这个公式来源于 梯度下降的公式对吧,其实就是对原始梯度下降公式求偏导以后的梯度下降公式,然后 使用所有样本进行梯度下降得来的,可以看到* 1/n 其实就是求了一个平均数对吧.所有样本的平均数. 然后我们看,我们这里* 1/n那么…...
Android STR研究之一
简介: 先上一段谷歌的介绍 谷歌的网站地址: 电源管理 | Android 开源项目 | Android Open Source Project (google.cn) 术语 STR: STR(Suspend To RAM)的意思是“挂起到内存”,它是一种瞬间开机技术(On Now)。 当系统进入“挂起”状态…...
单链表的详解实现
单链表 结构 单链表结构中有两个数据,一个是存储数据的,还有一个指针指向下一个节点。 该图就是一个简单单链表的结构图。 接口实现 SLNode* CreateNode(SLNDataType x);//申请节点 void SLTprint(SLNode* head);//打印链表 void SLTPushBack(SLNode*…...
抛弃 scp 改用 rsync,让 Linux 下文件传输高效无比
我们都使用过 scp 来传输文件。当传输在中途或甚至在 99% 时被中断时,(每当我想起99%的中断传输时,我的心都很痛);让我们看看如何使用 rsync 来替代 scp,避免这样的不幸。 什么是rsync? Rsync…...
Leetcode 2919. Minimum Increment Operations to Make Array Beautiful
Leetcode 2919. Minimum Increment Operations to Make Array Beautiful 1. 解题思路2. 代码实现 题目链接:2919. Minimum Increment Operations to Make Array Beautiful 1. 解题思路 这一题就是一个动态规划的题目。 思路上来说,就是考察每一个没到…...
ISTA 3B-2013 全解析|零担货物 (LTL) 综合模拟运输测试标准(CSDN 完整版)前言
前言 ISTA 3B-2013 是 ISTA 3 系列高级综合模拟测试,专门针对零担货物运输(LTL) 的包装件。 零担运输的特点是多货混装、多次中转、人工 / 叉车交叉搬运、环境复杂,因此 3B 是工业、设备、家电、汽配、大型包装最贴近真实物流的测…...
MVVMFramework性能优化:让你的iOS应用运行如飞的10个技巧
MVVMFramework性能优化:让你的iOS应用运行如飞的10个技巧 【免费下载链接】MVVMFramework (OC版)总结整理下一个快速开发框架,以更优雅的方式写代码,做一个代码艺术家。分离控制器中的代码,已加入cell自适应高度,自动缓…...
3分钟告别Windows桌面混乱:这款免费工具让你的图标瞬间变整齐
3分钟告别Windows桌面混乱:这款免费工具让你的图标瞬间变整齐 【免费下载链接】NoFences 🚧 Open Source Stardock Fences alternative 项目地址: https://gitcode.com/gh_mirrors/no/NoFences 还在为Windows桌面上那些杂乱无章的图标头疼吗&…...
物流物联网降本增效:LoRa、NB-IoT等低功耗无线技术选型与实战
1. 项目概述:当“省电”成为物流降本增效的隐形王牌最近和几个做仓储和车队管理的朋友聊天,大家不约而同都在吐槽同一个问题:设备电费和管理成本。一个大型仓库里,成千上万个传感器、电子标签、手持终端,光是电池更换和…...
让Windows 11任务栏唱歌:Taskbar-Lyrics插件如何改变你的音乐体验
让Windows 11任务栏唱歌:Taskbar-Lyrics插件如何改变你的音乐体验 【免费下载链接】Taskbar-Lyrics BetterNCM插件,在任务栏上嵌入歌词,目前仅建议Windows 11 项目地址: https://gitcode.com/gh_mirrors/ta/Taskbar-Lyrics 还在为切换…...
API调用总失败?ChatGPT官方Rate Limit机制深度拆解,4类高频报错代码级诊断手册
更多请点击: https://kaifayun.com 第一章:API调用总失败?ChatGPT官方Rate Limit机制深度拆解,4类高频报错代码级诊断手册 ChatGPT API 的速率限制(Rate Limit)并非黑盒策略,而是由 OpenAI 明确…...
FlexNeRFer架构:动态精度MAC与稀疏计算优化解析
1. FlexNeRFer架构设计解析 1.1 多精度MAC单元设计原理 FlexNeRFer的核心创新在于其可动态调整精度的MAC(乘加运算单元)架构。传统固定精度MAC在面对NeRF这类需要混合精度计算的场景时,要么存在计算资源浪费(高精度模式ÿ…...
用知识图谱重构搜索引擎
一、传统搜索:关键词的“机械匹配”时代你输入词,它找文档我们熟悉的搜索引擎,无论是早期的Google还是百度的首页,核心逻辑都是关键词匹配。你输入“苹果热量”,它就把互联网里包含“苹果”和“热量”两个词的网页抓出…...
在自动化脚本中集成Taotoken API并观察其长时间运行的可靠性
🚀 告别海外账号与网络限制!稳定直连全球优质大模型,限时半价接入中。 👉 点击领取海量免费额度 在自动化脚本中集成Taotoken API并观察其长时间运行的可靠性 对于需要长时间、周期性调用大模型API的自动化任务而言,服…...
OpenCV鼠标事件避坑指南:setMouseCallback() 中 userdata 参数的正确用法与内存管理
OpenCV鼠标事件高阶实践:setMouseCallback()中userdata参数的安全使用与多线程陷阱 在计算机视觉开发中,交互式图像处理是一个常见需求。OpenCV提供的setMouseCallback()函数看似简单,但当开发者需要传递复杂数据结构或在多线程环境下使用时…...
