当前位置: 首页 > news >正文

0基础学习PyFlink——个数滑动窗口(Sliding Count Windows)

大纲

  • 滑动(Sliding)和滚动(Tumbling)的区别
  • 样例
    • 窗口为2,滑动距离为1
    • 窗口为3,滑动距离为1
    • 窗口为3,滑动距离为2
    • 窗口为3,滑动距离为3
  • 完整代码
  • 参考资料

在 《0基础学习PyFlink——个数滚动窗口(Tumbling Count Windows)》一文中,我们介绍了滚动窗口。本节我们要介绍滑动窗口。

滑动(Sliding)和滚动(Tumbling)的区别

正如其名,“滑动”是指这个窗口沿着一定的方向,按着一定的速度“滑行”。
在这里插入图片描述
而滚动窗口,则是一个个“衔接着”,而不是像上面那样交错着。
在这里插入图片描述
它们的相同之处就是:只有窗口内的事件数量到达窗口要求的数值时,这些窗口才会触发计算。

样例

我们只要对《0基础学习PyFlink——个数滚动窗口(Tumbling Count Windows)》中的代码做轻微的改动即可。为了简化样例,我们只看Key为E的元素的滑动。

word_count_data = [("E",3),("E",1),("E",4),("E",2),("E",6),("E",5)]def word_count():env = StreamExecutionEnvironment.get_execution_environment()env.set_runtime_mode(RuntimeExecutionMode.STREAMING)# write all the data to one fileenv.set_parallelism(1)source_type_info = Types.TUPLE([Types.STRING(), Types.INT()])# define the source# mappgingsource = env.from_collection(word_count_data, source_type_info)# source.print()# keyingkeyed=source.key_by(lambda i: i[0]) 

窗口为2,滑动距离为1

count_window会根据传入的第二参数决定是构建滚动(CountTumblingWindowAssigner)窗口还是滑动(CountSlidingWindowAssigner)窗口。

    def count_window(self, size: int, slide: int = 0):"""Windows this KeyedStream into tumbling or sliding count windows.:param size: The size of the windows in number of elements.:param slide: The slide interval in number of elements... versionadded:: 1.16.0"""if slide == 0:return WindowedStream(self, CountTumblingWindowAssigner(size))else:return WindowedStream(self, CountSlidingWindowAssigner(size, slide))

我们只要给count_window第二个参数传递一个不为0的值,即可达到滑动效果。

    # reducingwindows_size = 2sliding_size = 1reduced=keyed.count_window(windows_size, sliding_size) \.apply(SumWindowFunction(),Types.TUPLE([Types.STRING(), Types.INT()]))# # define the sinkreduced.print()# submit for executionenv.execute()

(E,2)
(E,2)
(E,2)
(E,2)
(E,2)

在这里插入图片描述

窗口为3,滑动距离为1

    # reducingwindows_size = 3sliding_size = 1reduced=keyed.count_window(windows_size, sliding_size) \.apply(SumWindowFunction(),Types.TUPLE([Types.STRING(), Types.INT()]))

(E,3)
(E,3)
(E,3)
(E,3)

在这里插入图片描述

窗口为3,滑动距离为2

    # reducingwindows_size = 3sliding_size = 2reduced=keyed.count_window(windows_size, sliding_size) \.apply(SumWindowFunction(),Types.TUPLE([Types.STRING(), Types.INT()]))

(E,3)
(E,3)

在这里插入图片描述

窗口为3,滑动距离为3

这个就等效于滚动窗口了,因为“滑”过了窗口大小。

    # reducingwindows_size = 3sliding_size = 3reduced=keyed.count_window(windows_size, sliding_size) \.apply(SumWindowFunction(),Types.TUPLE([Types.STRING(), Types.INT()]))

(E,3)
(E,3)

在这里插入图片描述

完整代码

from typing import Iterablefrom pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode, WindowFunction
from pyflink.datastream.window import CountWindowclass SumWindowFunction(WindowFunction[tuple, tuple, str, CountWindow]):def apply(self, key: str, window: CountWindow, inputs: Iterable[tuple]):return [(key,  len([e for e in inputs]))]word_count_data = [("E",3),("E",1),("E",4),("E",2),("E",6),("E",5)]def word_count():env = StreamExecutionEnvironment.get_execution_environment()env.set_runtime_mode(RuntimeExecutionMode.STREAMING)# write all the data to one fileenv.set_parallelism(1)source_type_info = Types.TUPLE([Types.STRING(), Types.INT()])# define the source# mappgingsource = env.from_collection(word_count_data, source_type_info)# source.print()# keyingkeyed=source.key_by(lambda i: i[0]) # reducingwindows_size = 3sliding_size = 1reduced=keyed.count_window(windows_size, sliding_size) \.apply(SumWindowFunction(),Types.TUPLE([Types.STRING(), Types.INT()]))# # define the sinkreduced.print()# submit for executionenv.execute()if __name__ == '__main__':word_count()

参考资料

  • https://nightlies.apache.org/flink/flink-docs-release-1.18/zh/docs/learn-flink/streaming_analytics/

相关文章:

0基础学习PyFlink——个数滑动窗口(Sliding Count Windows)

大纲 滑动(Sliding)和滚动(Tumbling)的区别样例窗口为2,滑动距离为1窗口为3,滑动距离为1窗口为3,滑动距离为2窗口为3,滑动距离为3 完整代码参考资料 在 《0基础学习PyFlink——个数…...

vue3+ts 提取公共方法

因为好多页面都会使用到这个效验规则,封装一个校检规则,方便维护 封装前 封装后...

C++ ->

C -> 是访问类或结构体对象的成员的运算符 注意这里不是直接的访问.是用于访问指向对象的指针的成员 下面的代码可以很好的理解如下&#xff1a; #include<iostream>using namespace std;class Func{public:int i,j;void myFunc(){cout<<"i"<&l…...

VR全景在医院的应用:缓和医患矛盾、提升医院形象

医患关系一直以来都是较为激烈的&#xff0c;包括制度的不完善、医疗资源紧张等问题也时有存在&#xff0c;为了缓解医患矛盾&#xff0c;不仅要提升患者以及家属对于医院的认知&#xff0c;还需要完善医疗制度&#xff0c;提高医疗资源的配置效率&#xff0c;提高服务质量。 因…...

【python基础】format格式化函数的使用

文章目录 前言一、format()内容匹配替换1、序号索引2、关键字3、列表索引4、字典索引5、通过类的属性6、通过魔法参数 二、format()数字格式化 前言 语法&#xff1a;str.format() 说明&#xff1a;一种格式化字符串的函数。 一、format()内容匹配替换 1、序号索引 在没有参…...

Java web(三):Http、Tomcat、Servlet

文章目录 一、Java web技术栈二、Http1.1 Http请求数据格式1.2 Http响应数据格式1.3 状态码 二、Tomcat2.1 介绍2.2 web项目结构2.3 IDEA中使用Tomcat 三、Servlet3.1 Servlet使用3.2 Servlet生命周期3.3 Servlet方法和体系结构3.4 urlPattern配置 四、Request4.1 获取请求数据…...

Java实现Hive UDF详细步骤 (Hive 3.x版本,IDEA开发)

这里写目录标题 前言1. 新建项目2.配置maven依赖3.编写代码4.打jar包5.上传服务器6.代码中引用 前言 老版本编写UDF时&#xff0c;需要继承 org.apache.hadoop.hive.ql.exec.UDF类&#xff0c;然后直接实现evaluate()方法即可。 由于公司hive版本比较高&#xff08;3.x&#x…...

Vue进阶(幺陆肆)Apache的Access.log分析总结

文章目录 一、前言二、常用指令 一、前言 前端项目排错阶段&#xff0c;可借助apache的Access.log进行请求日志查看。 二、常用指令 #查看80端口的tcp连接 #netstat -tan | grep "ESTABLISHED" | grep ":80" | wc -l #当前WEB服务器中联接次数最多的ip地…...

Apple 苹果发布 M3、M3 Pro 和 M3 Max 芯片

本心、输入输出、结果 文章目录 Apple 苹果发布 M3、M3 Pro 和 M3 Max 芯片前言M3、M3 Pro 和 M3 Max 芯片的性能相关资料图M3 Pro规格M3 Max规格弘扬爱国精神 Apple 苹果发布 M3、M3 Pro 和 M3 Max 芯片 编辑&#xff1a;简简单单 Online zuozuo 地址&#xff1a;https://blog…...

Linux常用命令及主流服务部署大全

目录 Linux 系统目录 一、常用操作命令 1、目录操作 2、文件内容操作&#xff08;查看日志&#xff0c;更改配置文件&#xff09; 3、压缩和解压缩 4、更改文件权限 二、各服务部署命令 1、增加虚拟内存 2、JDK 2.1 删除系统自带的openjdk 2.2 安装jdk 2.3 删除jd…...

list-watch集群调度

调度约束 Kubernetes 是通过 List-Watch **** 的机制进行每个组件的协作&#xff0c;保持数据同步的&#xff0c;每个组件之间的设计实现了解耦。 用户是通过 kubectl 根据配置文件&#xff0c;向 APIServer 发送命令&#xff0c;在 Node 节点上面建立 Pod 和 Container。…...

深度强化学习中的神经网络部分的作用是什么?一般如何选择合适的神经网络呢?

在深度强化学习中&#xff0c;神经网络部分通常用于实现值函数近似或策略近似&#xff0c;以帮助智能体学习如何在一个环境中做出决策以获得最大的累积奖励。这些神经网络在深度强化学习中扮演着重要的角色&#xff0c;具体作用如下&#xff1a; 1.值函数近似&#xff08;Valu…...

若依系统的数据导入功能设置

一、后端 Log(title "公交站牌", businessType BusinessType.IMPORT)PreAuthorize("ss.hasPermi(busStop:busStop:import)")PostMapping("/importData")public AjaxResult importData(MultipartFile file, boolean updateSupport) throws Exce…...

vue页面父组件与子组件相互调用方法和传递参数值

vue页面父组件与子组件相互调用方法和传递参数值 父组件页面定义 <el-button type"text" icon"el-icon-refresh" click"refreshClick" slot"label"></el-button> <leftList leftClick"loadModelClick" r…...

vim使用

概述 vi&#xff08;visual editor&#xff09;是Unix/Linux编辑器的一种。类似于win中notepad。vim&#xff08;vi improved&#xff09;加强版 安装vim&#xff1a; $ yum install vim -y四种模式 命令模式&#xff1a;快速进行复制、粘贴、删除等操作&#xff0c;还可以…...

人工智能基础_机器学习014_BGD批量梯度下降公式更新_进一步推导_SGD随机梯度下降和MBGD小批量梯度下降公式进一步推导---人工智能工作笔记0054

然后我们先来看BGD批量梯度下降,可以看到这里,其实这个公式来源于 梯度下降的公式对吧,其实就是对原始梯度下降公式求偏导以后的梯度下降公式,然后 使用所有样本进行梯度下降得来的,可以看到* 1/n 其实就是求了一个平均数对吧.所有样本的平均数. 然后我们看,我们这里* 1/n那么…...

Android STR研究之一

简介&#xff1a; 先上一段谷歌的介绍 谷歌的网站地址&#xff1a; 电源管理 | Android 开源项目 | Android Open Source Project (google.cn) 术语 STR&#xff1a; STR(Suspend To RAM)的意思是“挂起到内存”,它是一种瞬间开机技术(On Now)。 当系统进入“挂起”状态…...

单链表的详解实现

单链表 结构 单链表结构中有两个数据&#xff0c;一个是存储数据的&#xff0c;还有一个指针指向下一个节点。 该图就是一个简单单链表的结构图。 接口实现 SLNode* CreateNode(SLNDataType x);//申请节点 void SLTprint(SLNode* head);//打印链表 void SLTPushBack(SLNode*…...

抛弃 scp 改用 rsync,让 Linux 下文件传输高效无比

我们都使用过 scp 来传输文件。当传输在中途或甚至在 99% 时被中断时&#xff0c;&#xff08;每当我想起99%的中断传输时&#xff0c;我的心都很痛&#xff09;&#xff1b;让我们看看如何使用 rsync 来替代 scp&#xff0c;避免这样的不幸。 什么是rsync&#xff1f; Rsync…...

Leetcode 2919. Minimum Increment Operations to Make Array Beautiful

Leetcode 2919. Minimum Increment Operations to Make Array Beautiful 1. 解题思路2. 代码实现 题目链接&#xff1a;2919. Minimum Increment Operations to Make Array Beautiful 1. 解题思路 这一题就是一个动态规划的题目。 思路上来说&#xff0c;就是考察每一个没到…...

基于FPGA的PID算法学习———实现PID比例控制算法

基于FPGA的PID算法学习 前言一、PID算法分析二、PID仿真分析1. PID代码2.PI代码3.P代码4.顶层5.测试文件6.仿真波形 总结 前言 学习内容&#xff1a;参考网站&#xff1a; PID算法控制 PID即&#xff1a;Proportional&#xff08;比例&#xff09;、Integral&#xff08;积分&…...

23-Oracle 23 ai 区块链表(Blockchain Table)

小伙伴有没有在金融强合规的领域中遇见&#xff0c;必须要保持数据不可变&#xff0c;管理员都无法修改和留痕的要求。比如医疗的电子病历中&#xff0c;影像检查检验结果不可篡改行的&#xff0c;药品追溯过程中数据只可插入无法删除的特性需求&#xff1b;登录日志、修改日志…...

转转集团旗下首家二手多品类循环仓店“超级转转”开业

6月9日&#xff0c;国内领先的循环经济企业转转集团旗下首家二手多品类循环仓店“超级转转”正式开业。 转转集团创始人兼CEO黄炜、转转循环时尚发起人朱珠、转转集团COO兼红布林CEO胡伟琨、王府井集团副总裁祝捷等出席了开业剪彩仪式。 据「TMT星球」了解&#xff0c;“超级…...

DIY|Mac 搭建 ESP-IDF 开发环境及编译小智 AI

前一阵子在百度 AI 开发者大会上&#xff0c;看到基于小智 AI DIY 玩具的演示&#xff0c;感觉有点意思&#xff0c;想着自己也来试试。 如果只是想烧录现成的固件&#xff0c;乐鑫官方除了提供了 Windows 版本的 Flash 下载工具 之外&#xff0c;还提供了基于网页版的 ESP LA…...

【服务器压力测试】本地PC电脑作为服务器运行时出现卡顿和资源紧张(Windows/Linux)

要让本地PC电脑作为服务器运行时出现卡顿和资源紧张的情况&#xff0c;可以通过以下几种方式模拟或触发&#xff1a; 1. 增加CPU负载 运行大量计算密集型任务&#xff0c;例如&#xff1a; 使用多线程循环执行复杂计算&#xff08;如数学运算、加密解密等&#xff09;。运行图…...

VM虚拟机网络配置(ubuntu24桥接模式):配置静态IP

编辑-虚拟网络编辑器-更改设置 选择桥接模式&#xff0c;然后找到相应的网卡&#xff08;可以查看自己本机的网络连接&#xff09; windows连接的网络点击查看属性 编辑虚拟机设置更改网络配置&#xff0c;选择刚才配置的桥接模式 静态ip设置&#xff1a; 我用的ubuntu24桌…...

搭建DNS域名解析服务器(正向解析资源文件)

正向解析资源文件 1&#xff09;准备工作 服务端及客户端都关闭安全软件 [rootlocalhost ~]# systemctl stop firewalld [rootlocalhost ~]# setenforce 0 2&#xff09;服务端安装软件&#xff1a;bind 1.配置yum源 [rootlocalhost ~]# cat /etc/yum.repos.d/base.repo [Base…...

Spring AI Chat Memory 实战指南:Local 与 JDBC 存储集成

一个面向 Java 开发者的 Sring-Ai 示例工程项目&#xff0c;该项目是一个 Spring AI 快速入门的样例工程项目&#xff0c;旨在通过一些小的案例展示 Spring AI 框架的核心功能和使用方法。 项目采用模块化设计&#xff0c;每个模块都专注于特定的功能领域&#xff0c;便于学习和…...

wpf在image控件上快速显示内存图像

wpf在image控件上快速显示内存图像https://www.cnblogs.com/haodafeng/p/10431387.html 如果你在寻找能够快速在image控件刷新大图像&#xff08;比如分辨率3000*3000的图像&#xff09;的办法&#xff0c;尤其是想把内存中的裸数据&#xff08;只有图像的数据&#xff0c;不包…...

HybridVLA——让单一LLM同时具备扩散和自回归动作预测能力:训练时既扩散也回归,但推理时则扩散

前言 如上一篇文章《dexcap升级版之DexWild》中的前言部分所说&#xff0c;在叠衣服的过程中&#xff0c;我会带着团队对比各种模型、方法、策略&#xff0c;毕竟针对各个场景始终寻找更优的解决方案&#xff0c;是我个人和我司「七月在线」的职责之一 且个人认为&#xff0c…...