0基础学习PyFlink——个数滚动窗口(Tumbling Count Windows)
大纲
- Tumbling Count Windows
- map
- reduce
- Window Size为2
- Window Size为3
- Window Size为4
- Window Size为5
- Window Size为6
- 完整代码
- 参考资料
之前的案例中,我们的Source都是确定内容的数据。而Flink是可以处理流式(Streaming)数据的,就是数据会源源不断输入。

对于这种数据,我们称之为无界流,即没有“终止的界限”。但是程序在底层一定不能等着无止境的数据都传递结束再处理,因为“无止境”就意味着“终止的界限”触发计算的条件是不存在的。那么我们可以人为的给它设置一个“界”,这就是我们本节介绍的窗口。
Tumbling Count Windows
Tumbling Count Windows是指按元素个数计数的滚动窗口。
滚动窗口是指没有元素重叠的窗口,比如下面图是个数为2的窗口。(元素重叠的窗口我们会在《0基础学习PyFlink——个数滑动窗口(Sliding Count Windows)》介绍)

个数为3的窗口

我们用代码探索下这个概念
map
word_count_data = [("A",2),("A",1),("B",3),("B",1),("B",2),("C",3),("C",1),("C",4),("C",2),("D",3),("D",1),("D",4),("D",2),("D",5),("E",3),("E",1),("E",4),("E",2),("E",6),("E",5)]def word_count():env = StreamExecutionEnvironment.get_execution_environment()env.set_runtime_mode(RuntimeExecutionMode.STREAMING)# write all the data to one fileenv.set_parallelism(1)source_type_info = Types.TUPLE([Types.STRING(), Types.INT()])# define the source# mappgingsource = env.from_collection(word_count_data, source_type_info)# source.print()# keyingkeyed=source.key_by(lambda i: i[0])
这段代码构造了一个KeyedStream,用于存储word_count_data中的数据。
我们并没有让Source是流的形式,是因为为了降低例子复杂度。但是我们将runntime mode设置为流(STREAMING)模式。

reduce
我们需要定义一个Reduce类,用于对元组中的数据进行计算。这个类需要继承于WindowFunction,并实现相应方法(本例中是apply)。
apply会计算一个相同key的元素个数。比如key是“E”的元组个数是6。
class SumWindowFunction(WindowFunction[tuple, tuple, str, CountWindow]):def apply(self, key: str, window: CountWindow, inputs: Iterable[tuple]):return [(key, len([e for e in inputs]))]
Window Size为2
# reducingreduced=keyed.count_window(2) \.apply(SumWindowFunction(),Types.TUPLE([Types.STRING(), Types.INT()]))# # define the sinkreduced.print()# submit for executionenv.execute()
(A,2)
(B,2)
(C,2)
(C,2)
(D,2)
(D,2)
(E,2)
(E,2)
(E,2)
- A的个数是2是因为A的确只有两个元组,而一个Size为2的Window正好承载了这两个元素。于是有(A,2)这个结果;
- B的个数是3。但是会产生两个窗口,第一个窗口承载了前两个元素,第二个窗口当前只有一个元素。于是第一个窗口进行了Reduce计算,得出一个(B,2);第二个窗口还没进行reduce计算,就没有展现出结果;
- C有4个,正好可以被2个窗口承载。这样我们就看到2个(C,2)。
- D有5个,情况和B类似。它被分成了3个窗口,只有2个窗口满足个数条件,于是就输出2个(D,2);最后一个窗口因为元素不够,就没尽兴reduce计算了。
- E有6个,正好被3个窗口承载。我们就看到3个(E,2)。

Window Size为3
# reducingreduced=keyed.count_window(3) \.apply(SumWindowFunction(),Types.TUPLE([Types.STRING(), Types.INT()]))
(B,3)
(C,3)
(D,3)
(E,3)
(E,3)

Window Size为4
# reducingreduced=keyed.count_window(4) \.apply(SumWindowFunction(),Types.TUPLE([Types.STRING(), Types.INT()]))
(C,4)
(D,4)
(E,4)

Window Size为5
# reducingreduced=keyed.count_window(5) \.apply(SumWindowFunction(),Types.TUPLE([Types.STRING(), Types.INT()]))
(D,5)
(E,5)

Window Size为6
# reducingreduced=keyed.count_window(6) \.apply(SumWindowFunction(),Types.TUPLE([Types.STRING(), Types.INT()]))
(E,6)

完整代码
from typing import Iterablefrom pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode, WindowFunction
from pyflink.datastream.window import CountWindowclass SumWindowFunction(WindowFunction[tuple, tuple, str, CountWindow]):def apply(self, key: str, window: CountWindow, inputs: Iterable[tuple]):return [(key, len([e for e in inputs]))]word_count_data = [("A",2),("A",1),("B",3),("B",1),("B",2),("C",3),("C",1),("C",4),("C",2),("D",3),("D",1),("D",4),("D",2),("D",5),("E",3),("E",1),("E",4),("E",2),("E",6),("E",5)]def word_count():env = StreamExecutionEnvironment.get_execution_environment()env.set_runtime_mode(RuntimeExecutionMode.STREAMING)# write all the data to one fileenv.set_parallelism(1)source_type_info = Types.TUPLE([Types.STRING(), Types.INT()])# define the source# mappgingsource = env.from_collection(word_count_data, source_type_info)# source.print()# keyingkeyed=source.key_by(lambda i: i[0]) # reducingreduced=keyed.count_window(2) \.apply(SumWindowFunction(),Types.TUPLE([Types.STRING(), Types.INT()]))# # define the sinkreduced.print()# submit for executionenv.execute()if __name__ == '__main__':word_count()
参考资料
- https://nightlies.apache.org/flink/flink-docs-release-1.18/zh/docs/learn-flink/streaming_analytics/
相关文章:
0基础学习PyFlink——个数滚动窗口(Tumbling Count Windows)
大纲 Tumbling Count WindowsmapreduceWindow Size为2Window Size为3Window Size为4Window Size为5Window Size为6 完整代码参考资料 之前的案例中,我们的Source都是确定内容的数据。而Flink是可以处理流式(Streaming)数据的,就是…...
车载终端构筑智慧工厂:无人配送车的高效物流体系
随着科技的不断进步和应用,智能化已经成为许多领域的关键词。在物流行业中,随着无人配送车的兴起和智慧工厂的崛起,车载终端正引领着无人配送车的科技变革之路。 文章同款:https://www.key-iot.com/iotlist/sv900.html 车载终端…...
插件_日期_lunar-calendar公历农历转换
现在存在某需求,需要将公历、农历日期进行相互转换,在此借助lunar-calendar插件完成。 下载 [1] 通过npm安装 npm install lunar-calendar[2]通过文件方式引入 <script type"text/javascript" src"lib/LunarCalendar.min.js">…...
【FreeRTOS】【STM32】08 FreeRTOS 消息队列
简单来说 消息队列是一种数据结构 任务操作队列的基本描述 1.如果队列未满或者允许覆盖入队,FreeRTOS会将任务需要发送的消息添加到队列尾。 2.如果队列满,任务会阻塞(等待)。 3.用户可以指定等待时间。 4.当其它任务从其等待的队列中读取入了数据(这时候队列未满…...
【计算机组成原理】CPU的工作原理
一.CPU的组成结构 CPU主要有运算器、控制器、寄存器和内部总线等组成,其大概的样子长这样: 看不懂没关系,我们将采用自顶而下的方法来讲解CPU的具体工作原理,我们首先来说一下什么叫寄存器,顾名思义,寄存器…...
部署ELK
一、elasticsearch #拉取镜像 docker pull elasticsearch:7.12.1 #创建ELK docker网络 docker network create elk #启动ELK docker run -d --name es --net elk -P -e "discovery.typesingle-node" elasticsearch:7.12.1 #拷贝配置文件 docker cp es:/usr/share/el…...
纯前端实现图片验证码
前言 之前业务系统中验证码一直是由后端返回base64与一个验证码的字符串来实现的,想了下,前端其实可以直接canvas实现,减轻服务器压力。 实现 子组件,允许自定义图片尺寸(默认尺寸为100 * 40)与验证码刷新时间(默认时间为60秒)…...
#django基本常识01#
1、manage.py 所有子命令的入口,比如: python3 manage.py runserver 启动服务 python3 manage.py startapp 创建应用 python3 manage.py migrate 数据库迁移 直接执行python3 manage.py 可显示所有子命令...
什么是物流RPA?物流RPA解决什么问题?物流RPA实施难点在哪里?
RPA指的是机器人流程自动化,它是一套模拟人类在计算机、平板电脑、移动设备等界面执行任务的软件。通过RPA,可以自动完成重复性、繁琐的工作,提高工作效率和质量,降低人力成本。RPA适用于各种行业和场景,例如财务、人力…...
乐鑫工程部署过程记录
一、获取编译环境 1、下载sdk,ESP-IDF 这里有很多发布版本,当前我选择的是4.4.6,可以选择下载压缩包,也可以git直接clone 2、配置编译环境 我选择的是Linux Ubuntu下部署开发环境 查看入门指南 选择对应的芯片,我…...
to 后接ing形式的情况
look forward to seeing you. (期待着见到你) She admitted to making a mistake. (承认犯了个错误) He is accustomed to working long hours. (习惯于长时间工作)...
我做云原生的那几年
背景介绍 在2020年6月,我加入了一家拥有超过500人的企业。彼时,前端团队人数众多,有二三十名成员。在这样的大团队中,每个人都要寻找自己的独特之处和核心竞争力。否则,你可能会沉没于常规的增删改查工作中࿰…...
@EventListener注解使用说明
在Java的Spring框架中,EventListener注解用于监听和处理应用程序中的各种事件。通过使用EventListener注解,开发人员可以方便地实现事件驱动的编程模型,提高代码的灵活性和可维护性。本文将详细探讨EventListener注解的使用方法和作用&#x…...
算法通关村第五关-白银挑战实现队列
大纲 队列基础队列的基本概念和基本特征实现队列队列的基本操作Java中的队列 队列基础 队列的基本概念和基本特征 队列的特点是节点的排队次序和出队次序按入队时间先后确定,即先入队者先出队,后入队者后出队,即我们常说的FIFO(first in fi…...
协力共创智能未来:乐鑫 ESP RainMaker 云方案线下研讨会圆满落幕
近日,乐鑫 ESP RainMaker 云方案线下研讨会(深圳)在亚马逊云科技与合作伙伴嘉宾的支持下成功举办,吸引了众多来自智能家电、照明电工、能源和宠物等行业的品牌客户、方案商和制造商。研讨会围绕如何基于乐鑫 ESP RainMaker 硬件连…...
读取谷歌地球的kml文件中的经纬度坐标
最近我在B站上传了如何获取研究边界的视频,下面分享一个可以读取kml中经纬度的matlab函数,如此一来就可以获取任意区域的经纬度坐标了。 1.谷歌地球中划分区域 2.matlab读取kml文件 function [sname,lon,lat] kml2xy(ip_kml) % ip_kml ocean_distubu…...
1深度学习李宏毅
目录 机器学习三件事:分类,预测和结构化生成 2、一般会有经常提到什么是标签label,label就是预测值,在机器学习领域的残差就是e和loss编辑3、一些计算loss的方法:编辑编辑 4、可以设置不同的b和w从而控制loss的…...
Flask_Login使用与源码解读
一、前言 用户登录后,验证状态需要记录在会话中,这样浏览不同页面时才能记住这个状态,Flask_Login是Flask的扩展,专门用于管理用户身份验证系统中的验证状态。 注:Flask是一个微框架,仅提供包含基本服务的…...
利用Graviton2和S3免费套餐搭建私人网盘
网盘是一种在线存储服务,提供文件存储,访问,备份,贡献等功能,是我们日常中不可或缺的一种服务。很多互联网公司都为个人和企业提供免费的网盘服务。但这些免费服务都有一些限制,比如限制下载速度࿰…...
跟着GPT学设计模式之单例模式
单例设计模式(Singleton Design Pattern)一个类只允许创建一个对象(或者实例),那这个类就是一个单例类,这种设计模式就叫作单例设计模式,简称单例模式。 单例有几种经典的实现方式,…...
MFC内存泄露
1、泄露代码示例 void X::SetApplicationBtn() {CMFCRibbonApplicationButton* pBtn GetApplicationButton();// 获取 Ribbon Bar 指针// 创建自定义按钮CCustomRibbonAppButton* pCustomButton new CCustomRibbonAppButton();pCustomButton->SetImage(IDB_BITMAP_Jdp26)…...
【HarmonyOS 5.0】DevEco Testing:鸿蒙应用质量保障的终极武器
——全方位测试解决方案与代码实战 一、工具定位与核心能力 DevEco Testing是HarmonyOS官方推出的一体化测试平台,覆盖应用全生命周期测试需求,主要提供五大核心能力: 测试类型检测目标关键指标功能体验基…...
全球首个30米分辨率湿地数据集(2000—2022)
数据简介 今天我们分享的数据是全球30米分辨率湿地数据集,包含8种湿地亚类,该数据以0.5X0.5的瓦片存储,我们整理了所有属于中国的瓦片名称与其对应省份,方便大家研究使用。 该数据集作为全球首个30米分辨率、覆盖2000–2022年时间…...
Java入门学习详细版(一)
大家好,Java 学习是一个系统学习的过程,核心原则就是“理论 实践 坚持”,并且需循序渐进,不可过于着急,本篇文章推出的这份详细入门学习资料将带大家从零基础开始,逐步掌握 Java 的核心概念和编程技能。 …...
如何在网页里填写 PDF 表格?
有时候,你可能希望用户能在你的网站上填写 PDF 表单。然而,这件事并不简单,因为 PDF 并不是一种原生的网页格式。虽然浏览器可以显示 PDF 文件,但原生并不支持编辑或填写它们。更糟的是,如果你想收集表单数据ÿ…...
NPOI Excel用OLE对象的形式插入文件附件以及插入图片
static void Main(string[] args) {XlsWithObjData();Console.WriteLine("输出完成"); }static void XlsWithObjData() {// 创建工作簿和单元格,只有HSSFWorkbook,XSSFWorkbook不可以HSSFWorkbook workbook new HSSFWorkbook();HSSFSheet sheet (HSSFSheet)workboo…...
DiscuzX3.5发帖json api
参考文章:PHP实现独立Discuz站外发帖(直连操作数据库)_discuz 发帖api-CSDN博客 简单改造了一下,适配我自己的需求 有一个站点存在多个采集站,我想通过主站拿标题,采集站拿内容 使用到的sql如下 CREATE TABLE pre_forum_post_…...
自然语言处理——文本分类
文本分类 传统机器学习方法文本表示向量空间模型 特征选择文档频率互信息信息增益(IG) 分类器设计贝叶斯理论:线性判别函数 文本分类性能评估P-R曲线ROC曲线 将文本文档或句子分类为预定义的类或类别, 有单标签多类别文本分类和多…...
Vue3中的computer和watch
computed的写法 在页面中 <div>{{ calcNumber }}</div>script中 写法1 常用 import { computed, ref } from vue; let price ref(100);const priceAdd () > { //函数方法 price 1price.value ; }//计算属性 let calcNumber computed(() > {return ${p…...
土建施工员考试:建筑施工技术重点知识有哪些?
《管理实务》是土建施工员考试中侧重实操应用与管理能力的科目,核心考查施工组织、质量安全、进度成本等现场管理要点。以下是结合考试大纲与高频考点整理的重点内容,附学习方向和应试技巧: 一、施工组织与进度管理 核心目标: 规…...
