当前位置: 首页 > news >正文

0基础学习PyFlink——个数滚动窗口(Tumbling Count Windows)

大纲

  • Tumbling Count Windows
    • map
    • reduce
      • Window Size为2
      • Window Size为3
      • Window Size为4
      • Window Size为5
      • Window Size为6
  • 完整代码
  • 参考资料

之前的案例中,我们的Source都是确定内容的数据。而Flink是可以处理流式(Streaming)数据的,就是数据会源源不断输入。
在这里插入图片描述
对于这种数据,我们称之为无界流,即没有“终止的界限”。但是程序在底层一定不能等着无止境的数据都传递结束再处理,因为“无止境”就意味着“终止的界限”触发计算的条件是不存在的。那么我们可以人为的给它设置一个“界”,这就是我们本节介绍的窗口。

Tumbling Count Windows

Tumbling Count Windows是指按元素个数计数的滚动窗口。
滚动窗口是指没有元素重叠的窗口,比如下面图是个数为2的窗口。(元素重叠的窗口我们会在《0基础学习PyFlink——个数滑动窗口(Sliding Count Windows)》介绍)
在这里插入图片描述
个数为3的窗口
在这里插入图片描述
我们用代码探索下这个概念

map

word_count_data = [("A",2),("A",1),("B",3),("B",1),("B",2),("C",3),("C",1),("C",4),("C",2),("D",3),("D",1),("D",4),("D",2),("D",5),("E",3),("E",1),("E",4),("E",2),("E",6),("E",5)]def word_count():env = StreamExecutionEnvironment.get_execution_environment()env.set_runtime_mode(RuntimeExecutionMode.STREAMING)# write all the data to one fileenv.set_parallelism(1)source_type_info = Types.TUPLE([Types.STRING(), Types.INT()])# define the source# mappgingsource = env.from_collection(word_count_data, source_type_info)# source.print()# keyingkeyed=source.key_by(lambda i: i[0]) 

这段代码构造了一个KeyedStream,用于存储word_count_data中的数据。
我们并没有让Source是流的形式,是因为为了降低例子复杂度。但是我们将runntime mode设置为流(STREAMING)模式。
在这里插入图片描述

reduce

我们需要定义一个Reduce类,用于对元组中的数据进行计算。这个类需要继承于WindowFunction,并实现相应方法(本例中是apply)。
apply会计算一个相同key的元素个数。比如key是“E”的元组个数是6。

class SumWindowFunction(WindowFunction[tuple, tuple, str, CountWindow]):def apply(self, key: str, window: CountWindow, inputs: Iterable[tuple]):return [(key,  len([e for e in inputs]))]

Window Size为2

    # reducingreduced=keyed.count_window(2) \.apply(SumWindowFunction(),Types.TUPLE([Types.STRING(), Types.INT()]))# # define the sinkreduced.print()# submit for executionenv.execute()

(A,2)
(B,2)
(C,2)
(C,2)
(D,2)
(D,2)
(E,2)
(E,2)
(E,2)

  • A的个数是2是因为A的确只有两个元组,而一个Size为2的Window正好承载了这两个元素。于是有(A,2)这个结果;
  • B的个数是3。但是会产生两个窗口,第一个窗口承载了前两个元素,第二个窗口当前只有一个元素。于是第一个窗口进行了Reduce计算,得出一个(B,2);第二个窗口还没进行reduce计算,就没有展现出结果;
  • C有4个,正好可以被2个窗口承载。这样我们就看到2个(C,2)。
  • D有5个,情况和B类似。它被分成了3个窗口,只有2个窗口满足个数条件,于是就输出2个(D,2);最后一个窗口因为元素不够,就没尽兴reduce计算了。
  • E有6个,正好被3个窗口承载。我们就看到3个(E,2)。
    在这里插入图片描述

Window Size为3

    # reducingreduced=keyed.count_window(3) \.apply(SumWindowFunction(),Types.TUPLE([Types.STRING(), Types.INT()]))

(B,3)
(C,3)
(D,3)
(E,3)
(E,3)

在这里插入图片描述

Window Size为4

    # reducingreduced=keyed.count_window(4) \.apply(SumWindowFunction(),Types.TUPLE([Types.STRING(), Types.INT()]))

(C,4)
(D,4)
(E,4)

在这里插入图片描述

Window Size为5

    # reducingreduced=keyed.count_window(5) \.apply(SumWindowFunction(),Types.TUPLE([Types.STRING(), Types.INT()]))

(D,5)
(E,5)

在这里插入图片描述

Window Size为6

    # reducingreduced=keyed.count_window(6) \.apply(SumWindowFunction(),Types.TUPLE([Types.STRING(), Types.INT()]))

(E,6)

在这里插入图片描述

完整代码

from typing import Iterablefrom pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode, WindowFunction
from pyflink.datastream.window import CountWindowclass SumWindowFunction(WindowFunction[tuple, tuple, str, CountWindow]):def apply(self, key: str, window: CountWindow, inputs: Iterable[tuple]):return [(key,  len([e for e in inputs]))]word_count_data = [("A",2),("A",1),("B",3),("B",1),("B",2),("C",3),("C",1),("C",4),("C",2),("D",3),("D",1),("D",4),("D",2),("D",5),("E",3),("E",1),("E",4),("E",2),("E",6),("E",5)]def word_count():env = StreamExecutionEnvironment.get_execution_environment()env.set_runtime_mode(RuntimeExecutionMode.STREAMING)# write all the data to one fileenv.set_parallelism(1)source_type_info = Types.TUPLE([Types.STRING(), Types.INT()])# define the source# mappgingsource = env.from_collection(word_count_data, source_type_info)# source.print()# keyingkeyed=source.key_by(lambda i: i[0]) # reducingreduced=keyed.count_window(2) \.apply(SumWindowFunction(),Types.TUPLE([Types.STRING(), Types.INT()]))# # define the sinkreduced.print()# submit for executionenv.execute()if __name__ == '__main__':word_count()

参考资料

  • https://nightlies.apache.org/flink/flink-docs-release-1.18/zh/docs/learn-flink/streaming_analytics/

相关文章:

0基础学习PyFlink——个数滚动窗口(Tumbling Count Windows)

大纲 Tumbling Count WindowsmapreduceWindow Size为2Window Size为3Window Size为4Window Size为5Window Size为6 完整代码参考资料 之前的案例中,我们的Source都是确定内容的数据。而Flink是可以处理流式(Streaming)数据的,就是…...

车载终端构筑智慧工厂:无人配送车的高效物流体系

​随着科技的不断进步和应用,智能化已经成为许多领域的关键词。在物流行业中,随着无人配送车的兴起和智慧工厂的崛起,车载终端正引领着无人配送车的科技变革之路。 文章同款:https://www.key-iot.com/iotlist/sv900.html 车载终端…...

插件_日期_lunar-calendar公历农历转换

现在存在某需求&#xff0c;需要将公历、农历日期进行相互转换&#xff0c;在此借助lunar-calendar插件完成。 下载 [1] 通过npm安装 npm install lunar-calendar[2]通过文件方式引入 <script type"text/javascript" src"lib/LunarCalendar.min.js">…...

【FreeRTOS】【STM32】08 FreeRTOS 消息队列

简单来说 消息队列是一种数据结构 任务操作队列的基本描述 1.如果队列未满或者允许覆盖入队,FreeRTOS会将任务需要发送的消息添加到队列尾。 2.如果队列满,任务会阻塞(等待)。 3.用户可以指定等待时间。 4.当其它任务从其等待的队列中读取入了数据&#xff08;这时候队列未满…...

【计算机组成原理】CPU的工作原理

一.CPU的组成结构 CPU主要有运算器、控制器、寄存器和内部总线等组成&#xff0c;其大概的样子长这样&#xff1a; 看不懂没关系&#xff0c;我们将采用自顶而下的方法来讲解CPU的具体工作原理&#xff0c;我们首先来说一下什么叫寄存器&#xff0c;顾名思义&#xff0c;寄存器…...

部署ELK

一、elasticsearch #拉取镜像 docker pull elasticsearch:7.12.1 #创建ELK docker网络 docker network create elk #启动ELK docker run -d --name es --net elk -P -e "discovery.typesingle-node" elasticsearch:7.12.1 #拷贝配置文件 docker cp es:/usr/share/el…...

纯前端实现图片验证码

前言 之前业务系统中验证码一直是由后端返回base64与一个验证码的字符串来实现的&#xff0c;想了下&#xff0c;前端其实可以直接canvas实现&#xff0c;减轻服务器压力。 实现 子组件&#xff0c;允许自定义图片尺寸(默认尺寸为100 * 40)与验证码刷新时间(默认时间为60秒)…...

#django基本常识01#

1、manage.py 所有子命令的入口&#xff0c;比如&#xff1a; python3 manage.py runserver 启动服务 python3 manage.py startapp 创建应用 python3 manage.py migrate 数据库迁移 直接执行python3 manage.py 可显示所有子命令...

什么是物流RPA?物流RPA解决什么问题?物流RPA实施难点在哪里?

RPA指的是机器人流程自动化&#xff0c;它是一套模拟人类在计算机、平板电脑、移动设备等界面执行任务的软件。通过RPA&#xff0c;可以自动完成重复性、繁琐的工作&#xff0c;提高工作效率和质量&#xff0c;降低人力成本。RPA适用于各种行业和场景&#xff0c;例如财务、人力…...

乐鑫工程部署过程记录

一、获取编译环境 1、下载sdk&#xff0c;ESP-IDF 这里有很多发布版本&#xff0c;当前我选择的是4.4.6&#xff0c;可以选择下载压缩包&#xff0c;也可以git直接clone 2、配置编译环境 我选择的是Linux Ubuntu下部署开发环境 查看入门指南 选择对应的芯片&#xff0c;我…...

to 后接ing形式的情况

look forward to seeing you. (期待着见到你) She admitted to making a mistake. (承认犯了个错误) He is accustomed to working long hours. (习惯于长时间工作)...

我做云原生的那几年

背景介绍 在2020年6月&#xff0c;我加入了一家拥有超过500人的企业。彼时&#xff0c;前端团队人数众多&#xff0c;有二三十名成员。在这样的大团队中&#xff0c;每个人都要寻找自己的独特之处和核心竞争力。否则&#xff0c;你可能会沉没于常规的增删改查工作中&#xff0…...

@EventListener注解使用说明

在Java的Spring框架中&#xff0c;EventListener注解用于监听和处理应用程序中的各种事件。通过使用EventListener注解&#xff0c;开发人员可以方便地实现事件驱动的编程模型&#xff0c;提高代码的灵活性和可维护性。本文将详细探讨EventListener注解的使用方法和作用&#x…...

算法通关村第五关-白银挑战实现队列

大纲 队列基础队列的基本概念和基本特征实现队列队列的基本操作Java中的队列 队列基础 队列的基本概念和基本特征 队列的特点是节点的排队次序和出队次序按入队时间先后确定&#xff0c;即先入队者先出队&#xff0c;后入队者后出队&#xff0c;即我们常说的FIFO(first in fi…...

协力共创智能未来:乐鑫 ESP RainMaker 云方案线下研讨会圆满落幕

近日&#xff0c;乐鑫 ESP RainMaker 云方案线下研讨会&#xff08;深圳&#xff09;在亚马逊云科技与合作伙伴嘉宾的支持下成功举办&#xff0c;吸引了众多来自智能家电、照明电工、能源和宠物等行业的品牌客户、方案商和制造商。研讨会围绕如何基于乐鑫 ESP RainMaker 硬件连…...

读取谷歌地球的kml文件中的经纬度坐标

最近我在B站上传了如何获取研究边界的视频&#xff0c;下面分享一个可以读取kml中经纬度的matlab函数&#xff0c;如此一来就可以获取任意区域的经纬度坐标了。 1.谷歌地球中划分区域 2.matlab读取kml文件 function [sname,lon,lat] kml2xy(ip_kml) % ip_kml ocean_distubu…...

1深度学习李宏毅

目录 机器学习三件事&#xff1a;分类&#xff0c;预测和结构化生成 2、一般会有经常提到什么是标签label&#xff0c;label就是预测值&#xff0c;在机器学习领域的残差就是e和loss​编辑3、一些计算loss的方法&#xff1a;​编辑​编辑 4、可以设置不同的b和w从而控制loss的…...

Flask_Login使用与源码解读

一、前言 用户登录后&#xff0c;验证状态需要记录在会话中&#xff0c;这样浏览不同页面时才能记住这个状态&#xff0c;Flask_Login是Flask的扩展&#xff0c;专门用于管理用户身份验证系统中的验证状态。 注&#xff1a;Flask是一个微框架&#xff0c;仅提供包含基本服务的…...

利用Graviton2和S3免费套餐搭建私人网盘

网盘是一种在线存储服务&#xff0c;提供文件存储&#xff0c;访问&#xff0c;备份&#xff0c;贡献等功能&#xff0c;是我们日常中不可或缺的一种服务。很多互联网公司都为个人和企业提供免费的网盘服务。但这些免费服务都有一些限制&#xff0c;比如限制下载速度&#xff0…...

跟着GPT学设计模式之单例模式

单例设计模式&#xff08;Singleton Design Pattern&#xff09;一个类只允许创建一个对象&#xff08;或者实例&#xff09;&#xff0c;那这个类就是一个单例类&#xff0c;这种设计模式就叫作单例设计模式&#xff0c;简称单例模式。 单例有几种经典的实现方式&#xff0c;…...

MySQL 隔离级别:脏读、幻读及不可重复读的原理与示例

一、MySQL 隔离级别 MySQL 提供了四种隔离级别,用于控制事务之间的并发访问以及数据的可见性,不同隔离级别对脏读、幻读、不可重复读这几种并发数据问题有着不同的处理方式,具体如下: 隔离级别脏读不可重复读幻读性能特点及锁机制读未提交(READ UNCOMMITTED)允许出现允许…...

如何为服务器生成TLS证书

TLS&#xff08;Transport Layer Security&#xff09;证书是确保网络通信安全的重要手段&#xff0c;它通过加密技术保护传输的数据不被窃听和篡改。在服务器上配置TLS证书&#xff0c;可以使用户通过HTTPS协议安全地访问您的网站。本文将详细介绍如何在服务器上生成一个TLS证…...

css的定位(position)详解:相对定位 绝对定位 固定定位

在 CSS 中&#xff0c;元素的定位通过 position 属性控制&#xff0c;共有 5 种定位模式&#xff1a;static&#xff08;静态定位&#xff09;、relative&#xff08;相对定位&#xff09;、absolute&#xff08;绝对定位&#xff09;、fixed&#xff08;固定定位&#xff09;和…...

Android15默认授权浮窗权限

我们经常有那种需求&#xff0c;客户需要定制的apk集成在ROM中&#xff0c;并且默认授予其【显示在其他应用的上层】权限&#xff0c;也就是我们常说的浮窗权限&#xff0c;那么我们就可以通过以下方法在wms、ams等系统服务的systemReady()方法中调用即可实现预置应用默认授权浮…...

Python Ovito统计金刚石结构数量

大家好,我是小马老师。 本文介绍python ovito方法统计金刚石结构的方法。 Ovito Identify diamond structure命令可以识别和统计金刚石结构,但是无法直接输出结构的变化情况。 本文使用python调用ovito包的方法,可以持续统计各步的金刚石结构,具体代码如下: from ovito…...

STM32---外部32.768K晶振(LSE)无法起振问题

晶振是否起振主要就检查两个1、晶振与MCU是否兼容&#xff1b;2、晶振的负载电容是否匹配 目录 一、判断晶振与MCU是否兼容 二、判断负载电容是否匹配 1. 晶振负载电容&#xff08;CL&#xff09;与匹配电容&#xff08;CL1、CL2&#xff09;的关系 2. 如何选择 CL1 和 CL…...

redis和redission的区别

Redis 和 Redisson 是两个密切相关但又本质不同的技术&#xff0c;它们扮演着完全不同的角色&#xff1a; Redis: 内存数据库/数据结构存储 本质&#xff1a; 它是一个开源的、高性能的、基于内存的 键值存储数据库。它也可以将数据持久化到磁盘。 核心功能&#xff1a; 提供丰…...

Neko虚拟浏览器远程协作方案:Docker+内网穿透技术部署实践

前言&#xff1a;本文将向开发者介绍一款创新性协作工具——Neko虚拟浏览器。在数字化协作场景中&#xff0c;跨地域的团队常需面对实时共享屏幕、协同编辑文档等需求。通过本指南&#xff0c;你将掌握在Ubuntu系统中使用容器化技术部署该工具的具体方案&#xff0c;并结合内网…...

webpack面试题

面试题&#xff1a;webpack介绍和简单使用 一、webpack&#xff08;模块化打包工具&#xff09;1. webpack是把项目当作一个整体&#xff0c;通过给定的一个主文件&#xff0c;webpack将从这个主文件开始找到你项目当中的所有依赖文件&#xff0c;使用loaders来处理它们&#x…...

STM32 低功耗设计全攻略:PWR 模块原理 + 睡眠 / 停止 / 待机模式实战(串口 + 红外 + RTC 应用全解析)

文章目录 PWRPWR&#xff08;电源控制模块&#xff09;核心功能 电源框图上电复位和掉电复位可编程电压监测器低功耗模式模式选择睡眠模式停止模式待机模式 修改主频一、准备工作二、修改主频的核心步骤&#xff1a;宏定义配置三、程序流程&#xff1a;时钟配置函数解析四、注意…...