当前位置: 首页 > news >正文

0基础学习PyFlink——个数滚动窗口(Tumbling Count Windows)

大纲

  • Tumbling Count Windows
    • map
    • reduce
      • Window Size为2
      • Window Size为3
      • Window Size为4
      • Window Size为5
      • Window Size为6
  • 完整代码
  • 参考资料

之前的案例中,我们的Source都是确定内容的数据。而Flink是可以处理流式(Streaming)数据的,就是数据会源源不断输入。
在这里插入图片描述
对于这种数据,我们称之为无界流,即没有“终止的界限”。但是程序在底层一定不能等着无止境的数据都传递结束再处理,因为“无止境”就意味着“终止的界限”触发计算的条件是不存在的。那么我们可以人为的给它设置一个“界”,这就是我们本节介绍的窗口。

Tumbling Count Windows

Tumbling Count Windows是指按元素个数计数的滚动窗口。
滚动窗口是指没有元素重叠的窗口,比如下面图是个数为2的窗口。(元素重叠的窗口我们会在《0基础学习PyFlink——个数滑动窗口(Sliding Count Windows)》介绍)
在这里插入图片描述
个数为3的窗口
在这里插入图片描述
我们用代码探索下这个概念

map

word_count_data = [("A",2),("A",1),("B",3),("B",1),("B",2),("C",3),("C",1),("C",4),("C",2),("D",3),("D",1),("D",4),("D",2),("D",5),("E",3),("E",1),("E",4),("E",2),("E",6),("E",5)]def word_count():env = StreamExecutionEnvironment.get_execution_environment()env.set_runtime_mode(RuntimeExecutionMode.STREAMING)# write all the data to one fileenv.set_parallelism(1)source_type_info = Types.TUPLE([Types.STRING(), Types.INT()])# define the source# mappgingsource = env.from_collection(word_count_data, source_type_info)# source.print()# keyingkeyed=source.key_by(lambda i: i[0]) 

这段代码构造了一个KeyedStream,用于存储word_count_data中的数据。
我们并没有让Source是流的形式,是因为为了降低例子复杂度。但是我们将runntime mode设置为流(STREAMING)模式。
在这里插入图片描述

reduce

我们需要定义一个Reduce类,用于对元组中的数据进行计算。这个类需要继承于WindowFunction,并实现相应方法(本例中是apply)。
apply会计算一个相同key的元素个数。比如key是“E”的元组个数是6。

class SumWindowFunction(WindowFunction[tuple, tuple, str, CountWindow]):def apply(self, key: str, window: CountWindow, inputs: Iterable[tuple]):return [(key,  len([e for e in inputs]))]

Window Size为2

    # reducingreduced=keyed.count_window(2) \.apply(SumWindowFunction(),Types.TUPLE([Types.STRING(), Types.INT()]))# # define the sinkreduced.print()# submit for executionenv.execute()

(A,2)
(B,2)
(C,2)
(C,2)
(D,2)
(D,2)
(E,2)
(E,2)
(E,2)

  • A的个数是2是因为A的确只有两个元组,而一个Size为2的Window正好承载了这两个元素。于是有(A,2)这个结果;
  • B的个数是3。但是会产生两个窗口,第一个窗口承载了前两个元素,第二个窗口当前只有一个元素。于是第一个窗口进行了Reduce计算,得出一个(B,2);第二个窗口还没进行reduce计算,就没有展现出结果;
  • C有4个,正好可以被2个窗口承载。这样我们就看到2个(C,2)。
  • D有5个,情况和B类似。它被分成了3个窗口,只有2个窗口满足个数条件,于是就输出2个(D,2);最后一个窗口因为元素不够,就没尽兴reduce计算了。
  • E有6个,正好被3个窗口承载。我们就看到3个(E,2)。
    在这里插入图片描述

Window Size为3

    # reducingreduced=keyed.count_window(3) \.apply(SumWindowFunction(),Types.TUPLE([Types.STRING(), Types.INT()]))

(B,3)
(C,3)
(D,3)
(E,3)
(E,3)

在这里插入图片描述

Window Size为4

    # reducingreduced=keyed.count_window(4) \.apply(SumWindowFunction(),Types.TUPLE([Types.STRING(), Types.INT()]))

(C,4)
(D,4)
(E,4)

在这里插入图片描述

Window Size为5

    # reducingreduced=keyed.count_window(5) \.apply(SumWindowFunction(),Types.TUPLE([Types.STRING(), Types.INT()]))

(D,5)
(E,5)

在这里插入图片描述

Window Size为6

    # reducingreduced=keyed.count_window(6) \.apply(SumWindowFunction(),Types.TUPLE([Types.STRING(), Types.INT()]))

(E,6)

在这里插入图片描述

完整代码

from typing import Iterablefrom pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode, WindowFunction
from pyflink.datastream.window import CountWindowclass SumWindowFunction(WindowFunction[tuple, tuple, str, CountWindow]):def apply(self, key: str, window: CountWindow, inputs: Iterable[tuple]):return [(key,  len([e for e in inputs]))]word_count_data = [("A",2),("A",1),("B",3),("B",1),("B",2),("C",3),("C",1),("C",4),("C",2),("D",3),("D",1),("D",4),("D",2),("D",5),("E",3),("E",1),("E",4),("E",2),("E",6),("E",5)]def word_count():env = StreamExecutionEnvironment.get_execution_environment()env.set_runtime_mode(RuntimeExecutionMode.STREAMING)# write all the data to one fileenv.set_parallelism(1)source_type_info = Types.TUPLE([Types.STRING(), Types.INT()])# define the source# mappgingsource = env.from_collection(word_count_data, source_type_info)# source.print()# keyingkeyed=source.key_by(lambda i: i[0]) # reducingreduced=keyed.count_window(2) \.apply(SumWindowFunction(),Types.TUPLE([Types.STRING(), Types.INT()]))# # define the sinkreduced.print()# submit for executionenv.execute()if __name__ == '__main__':word_count()

参考资料

  • https://nightlies.apache.org/flink/flink-docs-release-1.18/zh/docs/learn-flink/streaming_analytics/

相关文章:

0基础学习PyFlink——个数滚动窗口(Tumbling Count Windows)

大纲 Tumbling Count WindowsmapreduceWindow Size为2Window Size为3Window Size为4Window Size为5Window Size为6 完整代码参考资料 之前的案例中,我们的Source都是确定内容的数据。而Flink是可以处理流式(Streaming)数据的,就是…...

车载终端构筑智慧工厂:无人配送车的高效物流体系

​随着科技的不断进步和应用,智能化已经成为许多领域的关键词。在物流行业中,随着无人配送车的兴起和智慧工厂的崛起,车载终端正引领着无人配送车的科技变革之路。 文章同款:https://www.key-iot.com/iotlist/sv900.html 车载终端…...

插件_日期_lunar-calendar公历农历转换

现在存在某需求&#xff0c;需要将公历、农历日期进行相互转换&#xff0c;在此借助lunar-calendar插件完成。 下载 [1] 通过npm安装 npm install lunar-calendar[2]通过文件方式引入 <script type"text/javascript" src"lib/LunarCalendar.min.js">…...

【FreeRTOS】【STM32】08 FreeRTOS 消息队列

简单来说 消息队列是一种数据结构 任务操作队列的基本描述 1.如果队列未满或者允许覆盖入队,FreeRTOS会将任务需要发送的消息添加到队列尾。 2.如果队列满,任务会阻塞(等待)。 3.用户可以指定等待时间。 4.当其它任务从其等待的队列中读取入了数据&#xff08;这时候队列未满…...

【计算机组成原理】CPU的工作原理

一.CPU的组成结构 CPU主要有运算器、控制器、寄存器和内部总线等组成&#xff0c;其大概的样子长这样&#xff1a; 看不懂没关系&#xff0c;我们将采用自顶而下的方法来讲解CPU的具体工作原理&#xff0c;我们首先来说一下什么叫寄存器&#xff0c;顾名思义&#xff0c;寄存器…...

部署ELK

一、elasticsearch #拉取镜像 docker pull elasticsearch:7.12.1 #创建ELK docker网络 docker network create elk #启动ELK docker run -d --name es --net elk -P -e "discovery.typesingle-node" elasticsearch:7.12.1 #拷贝配置文件 docker cp es:/usr/share/el…...

纯前端实现图片验证码

前言 之前业务系统中验证码一直是由后端返回base64与一个验证码的字符串来实现的&#xff0c;想了下&#xff0c;前端其实可以直接canvas实现&#xff0c;减轻服务器压力。 实现 子组件&#xff0c;允许自定义图片尺寸(默认尺寸为100 * 40)与验证码刷新时间(默认时间为60秒)…...

#django基本常识01#

1、manage.py 所有子命令的入口&#xff0c;比如&#xff1a; python3 manage.py runserver 启动服务 python3 manage.py startapp 创建应用 python3 manage.py migrate 数据库迁移 直接执行python3 manage.py 可显示所有子命令...

什么是物流RPA?物流RPA解决什么问题?物流RPA实施难点在哪里?

RPA指的是机器人流程自动化&#xff0c;它是一套模拟人类在计算机、平板电脑、移动设备等界面执行任务的软件。通过RPA&#xff0c;可以自动完成重复性、繁琐的工作&#xff0c;提高工作效率和质量&#xff0c;降低人力成本。RPA适用于各种行业和场景&#xff0c;例如财务、人力…...

乐鑫工程部署过程记录

一、获取编译环境 1、下载sdk&#xff0c;ESP-IDF 这里有很多发布版本&#xff0c;当前我选择的是4.4.6&#xff0c;可以选择下载压缩包&#xff0c;也可以git直接clone 2、配置编译环境 我选择的是Linux Ubuntu下部署开发环境 查看入门指南 选择对应的芯片&#xff0c;我…...

to 后接ing形式的情况

look forward to seeing you. (期待着见到你) She admitted to making a mistake. (承认犯了个错误) He is accustomed to working long hours. (习惯于长时间工作)...

我做云原生的那几年

背景介绍 在2020年6月&#xff0c;我加入了一家拥有超过500人的企业。彼时&#xff0c;前端团队人数众多&#xff0c;有二三十名成员。在这样的大团队中&#xff0c;每个人都要寻找自己的独特之处和核心竞争力。否则&#xff0c;你可能会沉没于常规的增删改查工作中&#xff0…...

@EventListener注解使用说明

在Java的Spring框架中&#xff0c;EventListener注解用于监听和处理应用程序中的各种事件。通过使用EventListener注解&#xff0c;开发人员可以方便地实现事件驱动的编程模型&#xff0c;提高代码的灵活性和可维护性。本文将详细探讨EventListener注解的使用方法和作用&#x…...

算法通关村第五关-白银挑战实现队列

大纲 队列基础队列的基本概念和基本特征实现队列队列的基本操作Java中的队列 队列基础 队列的基本概念和基本特征 队列的特点是节点的排队次序和出队次序按入队时间先后确定&#xff0c;即先入队者先出队&#xff0c;后入队者后出队&#xff0c;即我们常说的FIFO(first in fi…...

协力共创智能未来:乐鑫 ESP RainMaker 云方案线下研讨会圆满落幕

近日&#xff0c;乐鑫 ESP RainMaker 云方案线下研讨会&#xff08;深圳&#xff09;在亚马逊云科技与合作伙伴嘉宾的支持下成功举办&#xff0c;吸引了众多来自智能家电、照明电工、能源和宠物等行业的品牌客户、方案商和制造商。研讨会围绕如何基于乐鑫 ESP RainMaker 硬件连…...

读取谷歌地球的kml文件中的经纬度坐标

最近我在B站上传了如何获取研究边界的视频&#xff0c;下面分享一个可以读取kml中经纬度的matlab函数&#xff0c;如此一来就可以获取任意区域的经纬度坐标了。 1.谷歌地球中划分区域 2.matlab读取kml文件 function [sname,lon,lat] kml2xy(ip_kml) % ip_kml ocean_distubu…...

1深度学习李宏毅

目录 机器学习三件事&#xff1a;分类&#xff0c;预测和结构化生成 2、一般会有经常提到什么是标签label&#xff0c;label就是预测值&#xff0c;在机器学习领域的残差就是e和loss​编辑3、一些计算loss的方法&#xff1a;​编辑​编辑 4、可以设置不同的b和w从而控制loss的…...

Flask_Login使用与源码解读

一、前言 用户登录后&#xff0c;验证状态需要记录在会话中&#xff0c;这样浏览不同页面时才能记住这个状态&#xff0c;Flask_Login是Flask的扩展&#xff0c;专门用于管理用户身份验证系统中的验证状态。 注&#xff1a;Flask是一个微框架&#xff0c;仅提供包含基本服务的…...

利用Graviton2和S3免费套餐搭建私人网盘

网盘是一种在线存储服务&#xff0c;提供文件存储&#xff0c;访问&#xff0c;备份&#xff0c;贡献等功能&#xff0c;是我们日常中不可或缺的一种服务。很多互联网公司都为个人和企业提供免费的网盘服务。但这些免费服务都有一些限制&#xff0c;比如限制下载速度&#xff0…...

跟着GPT学设计模式之单例模式

单例设计模式&#xff08;Singleton Design Pattern&#xff09;一个类只允许创建一个对象&#xff08;或者实例&#xff09;&#xff0c;那这个类就是一个单例类&#xff0c;这种设计模式就叫作单例设计模式&#xff0c;简称单例模式。 单例有几种经典的实现方式&#xff0c;…...

docker详细操作--未完待续

docker介绍 docker官网: Docker&#xff1a;加速容器应用程序开发 harbor官网&#xff1a;Harbor - Harbor 中文 使用docker加速器: Docker镜像极速下载服务 - 毫秒镜像 是什么 Docker 是一种开源的容器化平台&#xff0c;用于将应用程序及其依赖项&#xff08;如库、运行时环…...

论文浅尝 | 基于判别指令微调生成式大语言模型的知识图谱补全方法(ISWC2024)

笔记整理&#xff1a;刘治强&#xff0c;浙江大学硕士生&#xff0c;研究方向为知识图谱表示学习&#xff0c;大语言模型 论文链接&#xff1a;http://arxiv.org/abs/2407.16127 发表会议&#xff1a;ISWC 2024 1. 动机 传统的知识图谱补全&#xff08;KGC&#xff09;模型通过…...

【HTML-16】深入理解HTML中的块元素与行内元素

HTML元素根据其显示特性可以分为两大类&#xff1a;块元素(Block-level Elements)和行内元素(Inline Elements)。理解这两者的区别对于构建良好的网页布局至关重要。本文将全面解析这两种元素的特性、区别以及实际应用场景。 1. 块元素(Block-level Elements) 1.1 基本特性 …...

前端开发面试题总结-JavaScript篇(一)

文章目录 JavaScript高频问答一、作用域与闭包1.什么是闭包&#xff08;Closure&#xff09;&#xff1f;闭包有什么应用场景和潜在问题&#xff1f;2.解释 JavaScript 的作用域链&#xff08;Scope Chain&#xff09; 二、原型与继承3.原型链是什么&#xff1f;如何实现继承&a…...

Element Plus 表单(el-form)中关于正整数输入的校验规则

目录 1 单个正整数输入1.1 模板1.2 校验规则 2 两个正整数输入&#xff08;联动&#xff09;2.1 模板2.2 校验规则2.3 CSS 1 单个正整数输入 1.1 模板 <el-formref"formRef":model"formData":rules"formRules"label-width"150px"…...

AI书签管理工具开发全记录(十九):嵌入资源处理

1.前言 &#x1f4dd; 在上一篇文章中&#xff0c;我们完成了书签的导入导出功能。本篇文章我们研究如何处理嵌入资源&#xff0c;方便后续将资源打包到一个可执行文件中。 2.embed介绍 &#x1f3af; Go 1.16 引入了革命性的 embed 包&#xff0c;彻底改变了静态资源管理的…...

Unity | AmplifyShaderEditor插件基础(第七集:平面波动shader)

目录 一、&#x1f44b;&#x1f3fb;前言 二、&#x1f608;sinx波动的基本原理 三、&#x1f608;波动起来 1.sinx节点介绍 2.vertexPosition 3.集成Vector3 a.节点Append b.连起来 4.波动起来 a.波动的原理 b.时间节点 c.sinx的处理 四、&#x1f30a;波动优化…...

手机平板能效生态设计指令EU 2023/1670标准解读

手机平板能效生态设计指令EU 2023/1670标准解读 以下是针对欧盟《手机和平板电脑生态设计法规》(EU) 2023/1670 的核心解读&#xff0c;综合法规核心要求、最新修正及企业合规要点&#xff1a; 一、法规背景与目标 生效与强制时间 发布于2023年8月31日&#xff08;OJ公报&…...

安卓基础(Java 和 Gradle 版本)

1. 设置项目的 JDK 版本 方法1&#xff1a;通过 Project Structure File → Project Structure... (或按 CtrlAltShiftS) 左侧选择 SDK Location 在 Gradle Settings 部分&#xff0c;设置 Gradle JDK 方法2&#xff1a;通过 Settings File → Settings... (或 CtrlAltS)…...

华为OD机试-最短木板长度-二分法(A卷,100分)

此题是一个最大化最小值的典型例题&#xff0c; 因为搜索范围是有界的&#xff0c;上界最大木板长度补充的全部木料长度&#xff0c;下界最小木板长度&#xff1b; 即left0,right10^6; 我们可以设置一个候选值x(mid)&#xff0c;将木板的长度全部都补充到x&#xff0c;如果成功…...