当前位置: 首页 > news >正文

Flink之窗口聚合算子

1.窗口聚合算子

在Flink中窗口聚合算子主要分类两类

  • 滚动聚合算子(增量聚合)
  • 全窗口聚合算子(全量聚合)
1.1 滚动聚合算子

滚动聚合算子一次只处理一条数据,通过算子中的累加器对聚合结果进行更新,当窗口触发时再从累加器中取结果数据,一般使用算子如下:

  • aggregate
  • max
  • maxBy
  • min
  • minBy
  • reduce
  • sum

这里以aggregate算子作为示例

// ... 
// 每10s统计一次每个用户最近30s的行为条数
SingleOutputStreamOperator<Tuple2<String, Integer>> result = watermarked.keyBy(userEvent -> userEvent.getUId()).window(SlidingEventTimeWindows.of(Time.seconds(30), Time.seconds(10))) // 参数1:窗口长度 参数2:滑动步长即计算频率.aggregate(new AggregateFunction<UserEvent2, Tuple2<String, Integer>, Tuple2<String, Integer>>() {// 这里给一个初始值@Overridepublic Tuple2<String, Integer> createAccumulator() {return Tuple2.of("", 0);}// 在累加器中统计每个用户行为条数(来一条更新一次)@Overridepublic Tuple2<String, Integer> add(UserEvent2 value, Tuple2<String, Integer> accumulator) {Tuple2<String, Integer> result = Tuple2.of(value.getUId() + "-" + value.getName(), accumulator.f1 + 1);return result;}// 将累加器中的更新结果给到getResult方法,输出@Overridepublic Tuple2<String, Integer> getResult(Tuple2<String, Integer> accumulator) {return accumulator;}// 这个方法在流式计算中可以不用实现,在上下游数据进行合并时需要用到,以spark为例,上有map和下游reduce的计算结果需要合并时需要实现这个方法@Overridepublic Tuple2<String, Integer> merge(Tuple2<String, Integer> a, Tuple2<String, Integer> b) {Tuple2<String, Integer> merged = Tuple2.of(a.f0, a.f1 + b.f1);return merged;}});
// ...

只展示部分代码,冗余代码已省略.
图解如下:
image-20231012101658054

1.2 全窗口聚合算子

全窗口聚合算子会将数据记录在状态容器中,当窗口触发时会将整个窗口中的数据交给聚合函数,根据具体逻辑将这些数据进行计算,常用算子如下:

  • apply
  • process

这里以apply算子为例

// ... 
// 每10s统计一次最近30s每个用户行为发生事件最大两条数据
SingleOutputStreamOperator<UserEvent2> userEventTimeTop2 = keyedStream.window(SlidingEventTimeWindows.of(Time.seconds(30), Time.seconds(10)))// 泛型1: 数据数据类型 泛型2: 输出数据类型 泛型3: key类型 泛型4: 窗口类型.apply(new WindowFunction<UserEvent2, UserEvent2, String, TimeWindow>() {/***@Param s 本次传入的key*@Param window 本次传入窗口的各种元信息*@Param input 本次输入的所有数据*@Param out 输出数据**/@Overridepublic void apply(String s, TimeWindow window, Iterable<UserEvent2> input, Collector<UserEvent2> out) throws Exception {// 创建集合接收迭代器中的数据ArrayList<UserEvent2> userEvent2List = new ArrayList<>();// 遍历迭代器,也就是输入数据for (UserEvent2 userEvent2 : input) {// 将数据添加到集合中userEvent2List.add(userEvent2);}// 将集合中的数据根据用户行为发生事件进行排序Collections.sort(userEvent2List, new Comparator<UserEvent2>() {@Overridepublic int compare(UserEvent2 o1, UserEvent2 o2) {// 倒序排序return Integer.parseInt(o2.getTime()) - Integer.parseInt(o1.getTime());}});// 将每个用户行为发生时间最大的两条数据输出for (int i = 0; i < Math.min(userEvent2List.size(), 2); i++) {out.collect(userEvent2List.get(i));}}});
// ...

只展示部分代码,冗余代码已省略.
图解如下:
image-20231012101658054

相关文章:

Flink之窗口聚合算子

1.窗口聚合算子 在Flink中窗口聚合算子主要分类两类 滚动聚合算子(增量聚合)全窗口聚合算子(全量聚合) 1.1 滚动聚合算子 滚动聚合算子一次只处理一条数据,通过算子中的累加器对聚合结果进行更新,当窗口触发时再从累加器中取结果数据,一般使用算子如下: aggregatemaxmaxBy…...

K8S:Rancher管理 Kubernetes 集群

文章目录 一.Rancher 简介1.Rancher概念2.Rancher 和 k8s 的区别 二.Rancher 安装及配置1.安装 rancher2.登录 Rancher 平台3.Rancher 管理已存在的 k8s 集群4.Rancher 部署监控系统5.使用 Rancher 仪表盘管理 k8s 集群 三.拓展1.Rancher和kubesphere相比较2.K3S和K8S相比较 一…...

后台运行python程序并查看运行的python 进程

nohup python -u Job.py > log.log 2>&1 &说明&#xff1a; 末尾的“&”&#xff1a;表示后台运行程序 “nohup” &#xff1a;保证程序不被挂起 “python”&#xff1a;是执行python代码的命令 “-u”&#xff1a;表示不启用缓存&#xff0c;实时输出打印…...

树莓派部署.net core网站程序

1、发布你的项目 使用mobaxterm上传程序 回到mobaxterm,f进入目录输入&#xff1a; cd webpublish 运行程序&#xff1a;dotnet WebApplication1.dll 访问地址为&#xff1a;http://localhost:5000,尝访问如下&#xff1a; 已经出现 返回的json&#xff0c;证明是可以访问的…...

淘宝商品评论数据接口,淘宝商品评论API接口

淘宝商品评论数据接口可以通过淘宝开放平台API获取。 通过构建合理的请求URL&#xff0c;可以向淘宝服务器发起HTTP请求&#xff0c;获取商品评论数据。接口返回的数据一般为JSON格式&#xff0c;包含了商品的各种评价信息。获取到商品评论数据后&#xff0c;可以对其进行处理…...

455. 分发饼干

假设你是一位很棒的家长&#xff0c;想要给你的孩子们一些小饼干。但是&#xff0c;每个孩子最多只能给一块饼干。 对每个孩子 i&#xff0c;都有一个胃口值 g[i]&#xff0c;这是能让孩子们满足胃口的饼干的最小尺寸&#xff1b;并且每块饼干 j&#xff0c;都有一个尺寸 s[j] …...

GEE:数据预处理的细节(处理顺序。比如, select() 和 filter() 要优先于 map())

作者:CSDN @ _养乐多_ 大家在数据预处理的时候,是不是随意进行处理,并没有考虑 Google Earth Engine(GEE)性能的问题?比如选择数据集的时候,先执行map函数,再按时间选择数据?不同的处理顺序会导致不同的计算成本。 因此,本文将探讨如何在 GEE 中筛选和选择数据集合…...

【AHK】任务栏调节音量/边缘滚动调节/边缘触发

通过ahk实现类似mouseinc的边缘滚动调节音量的功能&#xff0c;有两个思路。 任务栏调节音量 #If MouseIsOver("ahk_class Shell_TrayWnd") WheelUp::Send {Volume_Up} WheelDown::Send {Volume_Down} return #IfMouseIsOver(WinTitle) {MouseGetPos,,, Winreturn …...

Chrome插件 — ReRes

ReRes插件是一款可以帮助Web开发人员进行开发和测试的Chrome浏览器扩展。它可以模拟网页请求&#xff0c;并返回指定的响应。 该插件可以用于多种情况&#xff0c;例如&#xff1a; 测试网站功能&#xff0c;调试程序等&#xff1b;本地开发Web应用时&#xff0c;模拟远程API…...

前端面试基础面试题——9

1.js 延迟加载的方式有哪些&#xff1f; 2.js同步和异步的区别&#xff1f; 3.什么是浏览器的同源政策&#xff1f; 4.介绍一下 js 的节流与防抖&#xff1f; 5.js 中的深浅拷贝实现&#xff1f; 6.Js 动画与 CSS 动画区别及相应实现 7.观察者模式和发布订阅模式有什么不同…...

tomcat 问题

一、start up.bat 闪退 在命令窗口run 看看是缺少了哪个环境变量 二、控制台输出乱码 logging.properties 底部添加 java.util.logging.ConsoleHandler.encoding GBK 三、缓存不足 context.xml配置 <Resources cachingAllowed"false" cacheMaxSize"100…...

小程序首页如何进行装修设置

小程序首页是展示给用户的第一屏&#xff0c;它的装修直接影响到用户对小程序的第一印象。小程序首页的设置在小程序管理员后台->页面设置->首页&#xff0c;下图是小程序首页默认的设置。 下图&#xff0c;是小程序首页的具体表现形式。下面具体解释小程序首页各个设置项…...

npm安装依赖报错npm ERR! code ENOTFOUND npm ERR! errno ENOTFOUND、npm run dev报错记录

npm安装依赖报错npm ERR! code ENOTFOUND npm ERR! errno ENOTFOUND_得我所得&#xff0c;爱我所爱的博客-CSDN博客npm安装依赖报错今天在学习webpack的时候&#xff0c;在使用npm install来安装一个局部的webpack时候&#xff0c;报出一下错误:npm ERR! code ENOTFOUNDnpm ERR…...

堆叠注入([强网杯 2019]随便注1)

详解&#xff1a; 堆叠注入&#xff08;Stack Injection&#xff09;是一种计算机安全概念&#xff0c;涉及攻击者向程序的堆栈内存中插入恶意代码&#xff0c;以便在程序执行期间执行非预期的操作。 堆栈注入攻击通常利用程序在处理函数调用时使用的堆栈机制。当一个函数被调…...

零基础Linux_15(基础IO_文件)软硬链接+动静态库详解

目录 1. 软硬链接 1.1 创建软链接 1.2 创建硬链接 1.3 硬链接数和unlink 2. 动静态库 2.1 制作静态库 2.2 查看和打包静态库 2.3 使用静态库 2.3.1 安装在默认搜索路径 2.3.2 告知路径库路径库名 2.4 制作动态库 2.5 使用动态库 2.5.1 安装在默认搜索路径 2.5.2 …...

计算机毕业设计选什么题目好?springboot 健身房管理系统

✍✍计算机编程指导师 ⭐⭐个人介绍&#xff1a;自己非常喜欢研究技术问题&#xff01;专业做Java、Python、微信小程序、安卓、大数据、爬虫、Golang、大屏等实战项目。 ⛽⛽实战项目&#xff1a;有源码或者技术上的问题欢迎在评论区一起讨论交流&#xff01; ⚡⚡ Java实战 |…...

两台linux 之间传输文件 (详细+bash脚本)

两台linux设备文件直接传输&#xff0c;有很多应用场景 一、可能的方案 &#xff08;一&#xff09;先下载再上传 从linux通过ssh下载到windows下&#xff0c;然后再通过ssh上传到另一台linux。 1.优点&#xff1a;简单 2.缺点&#xff1a;效率低&#xff0c;需要额外的设备…...

嵌入式系统开发【深入浅出】 EXTI 与 NVIC

目录 CPU 感知外部事件变化的三种方式 中断分三个级别 中断控制器 STM32 的中断和异常 NVIC 中断控制器 NVIC 结构体成员 抢占优先级和响应优先级 简单配置NVIC中断控制器 EXTI 外部中断【中断源级】​ STM32系列微控制器实际上最多有23根外部中断线&#xff08;EXT…...

【Kali】简单记录

文章目录 信息收集DNS记录分析hostdigdnsenum 路由信息tcptraceroutetctrace 搜索引擎 目标识别arpingfping 识别操作系统p0f 服务枚举端口扫描nmap识别VPN服务器 漏洞映射exploitdbmsfconsole 提权arpspoofDsniff 信息收集 DNS记录分析 host host www.example.com host -a …...

【数据结构】:队列的实现

队列 队列的概念及结构 队列&#xff1a;只允许在一端进行插入数据操作&#xff0c;在另一端进行删除数据操作的特殊线性表&#xff0c;队列具有先进先出 FIFO(First In First Out) 入队列&#xff1a;进行插入操作的一端称为队尾 出队列&#xff1a;进行删除操作的一端称为队…...

【WiFi帧结构】

文章目录 帧结构MAC头部管理帧 帧结构 Wi-Fi的帧分为三部分组成&#xff1a;MAC头部frame bodyFCS&#xff0c;其中MAC是固定格式的&#xff0c;frame body是可变长度。 MAC头部有frame control&#xff0c;duration&#xff0c;address1&#xff0c;address2&#xff0c;addre…...

在鸿蒙HarmonyOS 5中实现抖音风格的点赞功能

下面我将详细介绍如何使用HarmonyOS SDK在HarmonyOS 5中实现类似抖音的点赞功能&#xff0c;包括动画效果、数据同步和交互优化。 1. 基础点赞功能实现 1.1 创建数据模型 // VideoModel.ets export class VideoModel {id: string "";title: string ""…...

Debian系统简介

目录 Debian系统介绍 Debian版本介绍 Debian软件源介绍 软件包管理工具dpkg dpkg核心指令详解 安装软件包 卸载软件包 查询软件包状态 验证软件包完整性 手动处理依赖关系 dpkg vs apt Debian系统介绍 Debian 和 Ubuntu 都是基于 Debian内核 的 Linux 发行版&#xff…...

centos 7 部署awstats 网站访问检测

一、基础环境准备&#xff08;两种安装方式都要做&#xff09; bash # 安装必要依赖 yum install -y httpd perl mod_perl perl-Time-HiRes perl-DateTime systemctl enable httpd # 设置 Apache 开机自启 systemctl start httpd # 启动 Apache二、安装 AWStats&#xff0…...

大语言模型如何处理长文本?常用文本分割技术详解

为什么需要文本分割? 引言:为什么需要文本分割?一、基础文本分割方法1. 按段落分割(Paragraph Splitting)2. 按句子分割(Sentence Splitting)二、高级文本分割策略3. 重叠分割(Sliding Window)4. 递归分割(Recursive Splitting)三、生产级工具推荐5. 使用LangChain的…...

学习STC51单片机31(芯片为STC89C52RCRC)OLED显示屏1

每日一言 生活的美好&#xff0c;总是藏在那些你咬牙坚持的日子里。 硬件&#xff1a;OLED 以后要用到OLED的时候找到这个文件 OLED的设备地址 SSD1306"SSD" 是品牌缩写&#xff0c;"1306" 是产品编号。 驱动 OLED 屏幕的 IIC 总线数据传输格式 示意图 …...

学习STC51单片机32(芯片为STC89C52RCRC)OLED显示屏2

每日一言 今天的每一份坚持&#xff0c;都是在为未来积攒底气。 案例&#xff1a;OLED显示一个A 这边观察到一个点&#xff0c;怎么雪花了就是都是乱七八糟的占满了屏幕。。 解释 &#xff1a; 如果代码里信号切换太快&#xff08;比如 SDA 刚变&#xff0c;SCL 立刻变&#…...

基于Java Swing的电子通讯录设计与实现:附系统托盘功能代码详解

JAVASQL电子通讯录带系统托盘 一、系统概述 本电子通讯录系统采用Java Swing开发桌面应用&#xff0c;结合SQLite数据库实现联系人管理功能&#xff0c;并集成系统托盘功能提升用户体验。系统支持联系人的增删改查、分组管理、搜索过滤等功能&#xff0c;同时可以最小化到系统…...

Git 3天2K星标:Datawhale 的 Happy-LLM 项目介绍(附教程)

引言 在人工智能飞速发展的今天&#xff0c;大语言模型&#xff08;Large Language Models, LLMs&#xff09;已成为技术领域的焦点。从智能写作到代码生成&#xff0c;LLM 的应用场景不断扩展&#xff0c;深刻改变了我们的工作和生活方式。然而&#xff0c;理解这些模型的内部…...

c++第七天 继承与派生2

这一篇文章主要内容是 派生类构造函数与析构函数 在派生类中重写基类成员 以及多继承 第一部分&#xff1a;派生类构造函数与析构函数 当创建一个派生类对象时&#xff0c;基类成员是如何初始化的&#xff1f; 1.当派生类对象创建的时候&#xff0c;基类成员的初始化顺序 …...