当前位置: 首页 > news >正文

Flink之窗口聚合算子

1.窗口聚合算子

在Flink中窗口聚合算子主要分类两类

  • 滚动聚合算子(增量聚合)
  • 全窗口聚合算子(全量聚合)
1.1 滚动聚合算子

滚动聚合算子一次只处理一条数据,通过算子中的累加器对聚合结果进行更新,当窗口触发时再从累加器中取结果数据,一般使用算子如下:

  • aggregate
  • max
  • maxBy
  • min
  • minBy
  • reduce
  • sum

这里以aggregate算子作为示例

// ... 
// 每10s统计一次每个用户最近30s的行为条数
SingleOutputStreamOperator<Tuple2<String, Integer>> result = watermarked.keyBy(userEvent -> userEvent.getUId()).window(SlidingEventTimeWindows.of(Time.seconds(30), Time.seconds(10))) // 参数1:窗口长度 参数2:滑动步长即计算频率.aggregate(new AggregateFunction<UserEvent2, Tuple2<String, Integer>, Tuple2<String, Integer>>() {// 这里给一个初始值@Overridepublic Tuple2<String, Integer> createAccumulator() {return Tuple2.of("", 0);}// 在累加器中统计每个用户行为条数(来一条更新一次)@Overridepublic Tuple2<String, Integer> add(UserEvent2 value, Tuple2<String, Integer> accumulator) {Tuple2<String, Integer> result = Tuple2.of(value.getUId() + "-" + value.getName(), accumulator.f1 + 1);return result;}// 将累加器中的更新结果给到getResult方法,输出@Overridepublic Tuple2<String, Integer> getResult(Tuple2<String, Integer> accumulator) {return accumulator;}// 这个方法在流式计算中可以不用实现,在上下游数据进行合并时需要用到,以spark为例,上有map和下游reduce的计算结果需要合并时需要实现这个方法@Overridepublic Tuple2<String, Integer> merge(Tuple2<String, Integer> a, Tuple2<String, Integer> b) {Tuple2<String, Integer> merged = Tuple2.of(a.f0, a.f1 + b.f1);return merged;}});
// ...

只展示部分代码,冗余代码已省略.
图解如下:
image-20231012101658054

1.2 全窗口聚合算子

全窗口聚合算子会将数据记录在状态容器中,当窗口触发时会将整个窗口中的数据交给聚合函数,根据具体逻辑将这些数据进行计算,常用算子如下:

  • apply
  • process

这里以apply算子为例

// ... 
// 每10s统计一次最近30s每个用户行为发生事件最大两条数据
SingleOutputStreamOperator<UserEvent2> userEventTimeTop2 = keyedStream.window(SlidingEventTimeWindows.of(Time.seconds(30), Time.seconds(10)))// 泛型1: 数据数据类型 泛型2: 输出数据类型 泛型3: key类型 泛型4: 窗口类型.apply(new WindowFunction<UserEvent2, UserEvent2, String, TimeWindow>() {/***@Param s 本次传入的key*@Param window 本次传入窗口的各种元信息*@Param input 本次输入的所有数据*@Param out 输出数据**/@Overridepublic void apply(String s, TimeWindow window, Iterable<UserEvent2> input, Collector<UserEvent2> out) throws Exception {// 创建集合接收迭代器中的数据ArrayList<UserEvent2> userEvent2List = new ArrayList<>();// 遍历迭代器,也就是输入数据for (UserEvent2 userEvent2 : input) {// 将数据添加到集合中userEvent2List.add(userEvent2);}// 将集合中的数据根据用户行为发生事件进行排序Collections.sort(userEvent2List, new Comparator<UserEvent2>() {@Overridepublic int compare(UserEvent2 o1, UserEvent2 o2) {// 倒序排序return Integer.parseInt(o2.getTime()) - Integer.parseInt(o1.getTime());}});// 将每个用户行为发生时间最大的两条数据输出for (int i = 0; i < Math.min(userEvent2List.size(), 2); i++) {out.collect(userEvent2List.get(i));}}});
// ...

只展示部分代码,冗余代码已省略.
图解如下:
image-20231012101658054

相关文章:

Flink之窗口聚合算子

1.窗口聚合算子 在Flink中窗口聚合算子主要分类两类 滚动聚合算子(增量聚合)全窗口聚合算子(全量聚合) 1.1 滚动聚合算子 滚动聚合算子一次只处理一条数据,通过算子中的累加器对聚合结果进行更新,当窗口触发时再从累加器中取结果数据,一般使用算子如下: aggregatemaxmaxBy…...

K8S:Rancher管理 Kubernetes 集群

文章目录 一.Rancher 简介1.Rancher概念2.Rancher 和 k8s 的区别 二.Rancher 安装及配置1.安装 rancher2.登录 Rancher 平台3.Rancher 管理已存在的 k8s 集群4.Rancher 部署监控系统5.使用 Rancher 仪表盘管理 k8s 集群 三.拓展1.Rancher和kubesphere相比较2.K3S和K8S相比较 一…...

后台运行python程序并查看运行的python 进程

nohup python -u Job.py > log.log 2>&1 &说明&#xff1a; 末尾的“&”&#xff1a;表示后台运行程序 “nohup” &#xff1a;保证程序不被挂起 “python”&#xff1a;是执行python代码的命令 “-u”&#xff1a;表示不启用缓存&#xff0c;实时输出打印…...

树莓派部署.net core网站程序

1、发布你的项目 使用mobaxterm上传程序 回到mobaxterm,f进入目录输入&#xff1a; cd webpublish 运行程序&#xff1a;dotnet WebApplication1.dll 访问地址为&#xff1a;http://localhost:5000,尝访问如下&#xff1a; 已经出现 返回的json&#xff0c;证明是可以访问的…...

淘宝商品评论数据接口,淘宝商品评论API接口

淘宝商品评论数据接口可以通过淘宝开放平台API获取。 通过构建合理的请求URL&#xff0c;可以向淘宝服务器发起HTTP请求&#xff0c;获取商品评论数据。接口返回的数据一般为JSON格式&#xff0c;包含了商品的各种评价信息。获取到商品评论数据后&#xff0c;可以对其进行处理…...

455. 分发饼干

假设你是一位很棒的家长&#xff0c;想要给你的孩子们一些小饼干。但是&#xff0c;每个孩子最多只能给一块饼干。 对每个孩子 i&#xff0c;都有一个胃口值 g[i]&#xff0c;这是能让孩子们满足胃口的饼干的最小尺寸&#xff1b;并且每块饼干 j&#xff0c;都有一个尺寸 s[j] …...

GEE:数据预处理的细节(处理顺序。比如, select() 和 filter() 要优先于 map())

作者:CSDN @ _养乐多_ 大家在数据预处理的时候,是不是随意进行处理,并没有考虑 Google Earth Engine(GEE)性能的问题?比如选择数据集的时候,先执行map函数,再按时间选择数据?不同的处理顺序会导致不同的计算成本。 因此,本文将探讨如何在 GEE 中筛选和选择数据集合…...

【AHK】任务栏调节音量/边缘滚动调节/边缘触发

通过ahk实现类似mouseinc的边缘滚动调节音量的功能&#xff0c;有两个思路。 任务栏调节音量 #If MouseIsOver("ahk_class Shell_TrayWnd") WheelUp::Send {Volume_Up} WheelDown::Send {Volume_Down} return #IfMouseIsOver(WinTitle) {MouseGetPos,,, Winreturn …...

Chrome插件 — ReRes

ReRes插件是一款可以帮助Web开发人员进行开发和测试的Chrome浏览器扩展。它可以模拟网页请求&#xff0c;并返回指定的响应。 该插件可以用于多种情况&#xff0c;例如&#xff1a; 测试网站功能&#xff0c;调试程序等&#xff1b;本地开发Web应用时&#xff0c;模拟远程API…...

前端面试基础面试题——9

1.js 延迟加载的方式有哪些&#xff1f; 2.js同步和异步的区别&#xff1f; 3.什么是浏览器的同源政策&#xff1f; 4.介绍一下 js 的节流与防抖&#xff1f; 5.js 中的深浅拷贝实现&#xff1f; 6.Js 动画与 CSS 动画区别及相应实现 7.观察者模式和发布订阅模式有什么不同…...

tomcat 问题

一、start up.bat 闪退 在命令窗口run 看看是缺少了哪个环境变量 二、控制台输出乱码 logging.properties 底部添加 java.util.logging.ConsoleHandler.encoding GBK 三、缓存不足 context.xml配置 <Resources cachingAllowed"false" cacheMaxSize"100…...

小程序首页如何进行装修设置

小程序首页是展示给用户的第一屏&#xff0c;它的装修直接影响到用户对小程序的第一印象。小程序首页的设置在小程序管理员后台->页面设置->首页&#xff0c;下图是小程序首页默认的设置。 下图&#xff0c;是小程序首页的具体表现形式。下面具体解释小程序首页各个设置项…...

npm安装依赖报错npm ERR! code ENOTFOUND npm ERR! errno ENOTFOUND、npm run dev报错记录

npm安装依赖报错npm ERR! code ENOTFOUND npm ERR! errno ENOTFOUND_得我所得&#xff0c;爱我所爱的博客-CSDN博客npm安装依赖报错今天在学习webpack的时候&#xff0c;在使用npm install来安装一个局部的webpack时候&#xff0c;报出一下错误:npm ERR! code ENOTFOUNDnpm ERR…...

堆叠注入([强网杯 2019]随便注1)

详解&#xff1a; 堆叠注入&#xff08;Stack Injection&#xff09;是一种计算机安全概念&#xff0c;涉及攻击者向程序的堆栈内存中插入恶意代码&#xff0c;以便在程序执行期间执行非预期的操作。 堆栈注入攻击通常利用程序在处理函数调用时使用的堆栈机制。当一个函数被调…...

零基础Linux_15(基础IO_文件)软硬链接+动静态库详解

目录 1. 软硬链接 1.1 创建软链接 1.2 创建硬链接 1.3 硬链接数和unlink 2. 动静态库 2.1 制作静态库 2.2 查看和打包静态库 2.3 使用静态库 2.3.1 安装在默认搜索路径 2.3.2 告知路径库路径库名 2.4 制作动态库 2.5 使用动态库 2.5.1 安装在默认搜索路径 2.5.2 …...

计算机毕业设计选什么题目好?springboot 健身房管理系统

✍✍计算机编程指导师 ⭐⭐个人介绍&#xff1a;自己非常喜欢研究技术问题&#xff01;专业做Java、Python、微信小程序、安卓、大数据、爬虫、Golang、大屏等实战项目。 ⛽⛽实战项目&#xff1a;有源码或者技术上的问题欢迎在评论区一起讨论交流&#xff01; ⚡⚡ Java实战 |…...

两台linux 之间传输文件 (详细+bash脚本)

两台linux设备文件直接传输&#xff0c;有很多应用场景 一、可能的方案 &#xff08;一&#xff09;先下载再上传 从linux通过ssh下载到windows下&#xff0c;然后再通过ssh上传到另一台linux。 1.优点&#xff1a;简单 2.缺点&#xff1a;效率低&#xff0c;需要额外的设备…...

嵌入式系统开发【深入浅出】 EXTI 与 NVIC

目录 CPU 感知外部事件变化的三种方式 中断分三个级别 中断控制器 STM32 的中断和异常 NVIC 中断控制器 NVIC 结构体成员 抢占优先级和响应优先级 简单配置NVIC中断控制器 EXTI 外部中断【中断源级】​ STM32系列微控制器实际上最多有23根外部中断线&#xff08;EXT…...

【Kali】简单记录

文章目录 信息收集DNS记录分析hostdigdnsenum 路由信息tcptraceroutetctrace 搜索引擎 目标识别arpingfping 识别操作系统p0f 服务枚举端口扫描nmap识别VPN服务器 漏洞映射exploitdbmsfconsole 提权arpspoofDsniff 信息收集 DNS记录分析 host host www.example.com host -a …...

【数据结构】:队列的实现

队列 队列的概念及结构 队列&#xff1a;只允许在一端进行插入数据操作&#xff0c;在另一端进行删除数据操作的特殊线性表&#xff0c;队列具有先进先出 FIFO(First In First Out) 入队列&#xff1a;进行插入操作的一端称为队尾 出队列&#xff1a;进行删除操作的一端称为队…...

【大模型RAG】拍照搜题技术架构速览:三层管道、两级检索、兜底大模型

摘要 拍照搜题系统采用“三层管道&#xff08;多模态 OCR → 语义检索 → 答案渲染&#xff09;、两级检索&#xff08;倒排 BM25 向量 HNSW&#xff09;并以大语言模型兜底”的整体框架&#xff1a; 多模态 OCR 层 将题目图片经过超分、去噪、倾斜校正后&#xff0c;分别用…...

linux之kylin系统nginx的安装

一、nginx的作用 1.可做高性能的web服务器 直接处理静态资源&#xff08;HTML/CSS/图片等&#xff09;&#xff0c;响应速度远超传统服务器类似apache支持高并发连接 2.反向代理服务器 隐藏后端服务器IP地址&#xff0c;提高安全性 3.负载均衡服务器 支持多种策略分发流量…...

SciencePlots——绘制论文中的图片

文章目录 安装一、风格二、1 资源 安装 # 安装最新版 pip install githttps://github.com/garrettj403/SciencePlots.git# 安装稳定版 pip install SciencePlots一、风格 简单好用的深度学习论文绘图专用工具包–Science Plot 二、 1 资源 论文绘图神器来了&#xff1a;一行…...

MongoDB学习和应用(高效的非关系型数据库)

一丶 MongoDB简介 对于社交类软件的功能&#xff0c;我们需要对它的功能特点进行分析&#xff1a; 数据量会随着用户数增大而增大读多写少价值较低非好友看不到其动态信息地理位置的查询… 针对以上特点进行分析各大存储工具&#xff1a; mysql&#xff1a;关系型数据库&am…...

电脑插入多块移动硬盘后经常出现卡顿和蓝屏

当电脑在插入多块移动硬盘后频繁出现卡顿和蓝屏问题时&#xff0c;可能涉及硬件资源冲突、驱动兼容性、供电不足或系统设置等多方面原因。以下是逐步排查和解决方案&#xff1a; 1. 检查电源供电问题 问题原因&#xff1a;多块移动硬盘同时运行可能导致USB接口供电不足&#x…...

P3 QT项目----记事本(3.8)

3.8 记事本项目总结 项目源码 1.main.cpp #include "widget.h" #include <QApplication> int main(int argc, char *argv[]) {QApplication a(argc, argv);Widget w;w.show();return a.exec(); } 2.widget.cpp #include "widget.h" #include &q…...

鱼香ros docker配置镜像报错:https://registry-1.docker.io/v2/

使用鱼香ros一件安装docker时的https://registry-1.docker.io/v2/问题 一键安装指令 wget http://fishros.com/install -O fishros && . fishros出现问题&#xff1a;docker pull 失败 网络不同&#xff0c;需要使用镜像源 按照如下步骤操作 sudo vi /etc/docker/dae…...

C++使用 new 来创建动态数组

问题&#xff1a; 不能使用变量定义数组大小 原因&#xff1a; 这是因为数组在内存中是连续存储的&#xff0c;编译器需要在编译阶段就确定数组的大小&#xff0c;以便正确地分配内存空间。如果允许使用变量来定义数组的大小&#xff0c;那么编译器就无法在编译时确定数组的大…...

基于IDIG-GAN的小样本电机轴承故障诊断

目录 🔍 核心问题 一、IDIG-GAN模型原理 1. 整体架构 2. 核心创新点 (1) ​梯度归一化(Gradient Normalization)​​ (2) ​判别器梯度间隙正则化(Discriminator Gradient Gap Regularization)​​ (3) ​自注意力机制(Self-Attention)​​ 3. 完整损失函数 二…...

STM32---外部32.768K晶振(LSE)无法起振问题

晶振是否起振主要就检查两个1、晶振与MCU是否兼容&#xff1b;2、晶振的负载电容是否匹配 目录 一、判断晶振与MCU是否兼容 二、判断负载电容是否匹配 1. 晶振负载电容&#xff08;CL&#xff09;与匹配电容&#xff08;CL1、CL2&#xff09;的关系 2. 如何选择 CL1 和 CL…...