当前位置: 首页 > news >正文

Flink之窗口聚合算子

1.窗口聚合算子

在Flink中窗口聚合算子主要分类两类

  • 滚动聚合算子(增量聚合)
  • 全窗口聚合算子(全量聚合)
1.1 滚动聚合算子

滚动聚合算子一次只处理一条数据,通过算子中的累加器对聚合结果进行更新,当窗口触发时再从累加器中取结果数据,一般使用算子如下:

  • aggregate
  • max
  • maxBy
  • min
  • minBy
  • reduce
  • sum

这里以aggregate算子作为示例

// ... 
// 每10s统计一次每个用户最近30s的行为条数
SingleOutputStreamOperator<Tuple2<String, Integer>> result = watermarked.keyBy(userEvent -> userEvent.getUId()).window(SlidingEventTimeWindows.of(Time.seconds(30), Time.seconds(10))) // 参数1:窗口长度 参数2:滑动步长即计算频率.aggregate(new AggregateFunction<UserEvent2, Tuple2<String, Integer>, Tuple2<String, Integer>>() {// 这里给一个初始值@Overridepublic Tuple2<String, Integer> createAccumulator() {return Tuple2.of("", 0);}// 在累加器中统计每个用户行为条数(来一条更新一次)@Overridepublic Tuple2<String, Integer> add(UserEvent2 value, Tuple2<String, Integer> accumulator) {Tuple2<String, Integer> result = Tuple2.of(value.getUId() + "-" + value.getName(), accumulator.f1 + 1);return result;}// 将累加器中的更新结果给到getResult方法,输出@Overridepublic Tuple2<String, Integer> getResult(Tuple2<String, Integer> accumulator) {return accumulator;}// 这个方法在流式计算中可以不用实现,在上下游数据进行合并时需要用到,以spark为例,上有map和下游reduce的计算结果需要合并时需要实现这个方法@Overridepublic Tuple2<String, Integer> merge(Tuple2<String, Integer> a, Tuple2<String, Integer> b) {Tuple2<String, Integer> merged = Tuple2.of(a.f0, a.f1 + b.f1);return merged;}});
// ...

只展示部分代码,冗余代码已省略.
图解如下:
image-20231012101658054

1.2 全窗口聚合算子

全窗口聚合算子会将数据记录在状态容器中,当窗口触发时会将整个窗口中的数据交给聚合函数,根据具体逻辑将这些数据进行计算,常用算子如下:

  • apply
  • process

这里以apply算子为例

// ... 
// 每10s统计一次最近30s每个用户行为发生事件最大两条数据
SingleOutputStreamOperator<UserEvent2> userEventTimeTop2 = keyedStream.window(SlidingEventTimeWindows.of(Time.seconds(30), Time.seconds(10)))// 泛型1: 数据数据类型 泛型2: 输出数据类型 泛型3: key类型 泛型4: 窗口类型.apply(new WindowFunction<UserEvent2, UserEvent2, String, TimeWindow>() {/***@Param s 本次传入的key*@Param window 本次传入窗口的各种元信息*@Param input 本次输入的所有数据*@Param out 输出数据**/@Overridepublic void apply(String s, TimeWindow window, Iterable<UserEvent2> input, Collector<UserEvent2> out) throws Exception {// 创建集合接收迭代器中的数据ArrayList<UserEvent2> userEvent2List = new ArrayList<>();// 遍历迭代器,也就是输入数据for (UserEvent2 userEvent2 : input) {// 将数据添加到集合中userEvent2List.add(userEvent2);}// 将集合中的数据根据用户行为发生事件进行排序Collections.sort(userEvent2List, new Comparator<UserEvent2>() {@Overridepublic int compare(UserEvent2 o1, UserEvent2 o2) {// 倒序排序return Integer.parseInt(o2.getTime()) - Integer.parseInt(o1.getTime());}});// 将每个用户行为发生时间最大的两条数据输出for (int i = 0; i < Math.min(userEvent2List.size(), 2); i++) {out.collect(userEvent2List.get(i));}}});
// ...

只展示部分代码,冗余代码已省略.
图解如下:
image-20231012101658054

相关文章:

Flink之窗口聚合算子

1.窗口聚合算子 在Flink中窗口聚合算子主要分类两类 滚动聚合算子(增量聚合)全窗口聚合算子(全量聚合) 1.1 滚动聚合算子 滚动聚合算子一次只处理一条数据,通过算子中的累加器对聚合结果进行更新,当窗口触发时再从累加器中取结果数据,一般使用算子如下: aggregatemaxmaxBy…...

K8S:Rancher管理 Kubernetes 集群

文章目录 一.Rancher 简介1.Rancher概念2.Rancher 和 k8s 的区别 二.Rancher 安装及配置1.安装 rancher2.登录 Rancher 平台3.Rancher 管理已存在的 k8s 集群4.Rancher 部署监控系统5.使用 Rancher 仪表盘管理 k8s 集群 三.拓展1.Rancher和kubesphere相比较2.K3S和K8S相比较 一…...

后台运行python程序并查看运行的python 进程

nohup python -u Job.py > log.log 2>&1 &说明&#xff1a; 末尾的“&”&#xff1a;表示后台运行程序 “nohup” &#xff1a;保证程序不被挂起 “python”&#xff1a;是执行python代码的命令 “-u”&#xff1a;表示不启用缓存&#xff0c;实时输出打印…...

树莓派部署.net core网站程序

1、发布你的项目 使用mobaxterm上传程序 回到mobaxterm,f进入目录输入&#xff1a; cd webpublish 运行程序&#xff1a;dotnet WebApplication1.dll 访问地址为&#xff1a;http://localhost:5000,尝访问如下&#xff1a; 已经出现 返回的json&#xff0c;证明是可以访问的…...

淘宝商品评论数据接口,淘宝商品评论API接口

淘宝商品评论数据接口可以通过淘宝开放平台API获取。 通过构建合理的请求URL&#xff0c;可以向淘宝服务器发起HTTP请求&#xff0c;获取商品评论数据。接口返回的数据一般为JSON格式&#xff0c;包含了商品的各种评价信息。获取到商品评论数据后&#xff0c;可以对其进行处理…...

455. 分发饼干

假设你是一位很棒的家长&#xff0c;想要给你的孩子们一些小饼干。但是&#xff0c;每个孩子最多只能给一块饼干。 对每个孩子 i&#xff0c;都有一个胃口值 g[i]&#xff0c;这是能让孩子们满足胃口的饼干的最小尺寸&#xff1b;并且每块饼干 j&#xff0c;都有一个尺寸 s[j] …...

GEE:数据预处理的细节(处理顺序。比如, select() 和 filter() 要优先于 map())

作者:CSDN @ _养乐多_ 大家在数据预处理的时候,是不是随意进行处理,并没有考虑 Google Earth Engine(GEE)性能的问题?比如选择数据集的时候,先执行map函数,再按时间选择数据?不同的处理顺序会导致不同的计算成本。 因此,本文将探讨如何在 GEE 中筛选和选择数据集合…...

【AHK】任务栏调节音量/边缘滚动调节/边缘触发

通过ahk实现类似mouseinc的边缘滚动调节音量的功能&#xff0c;有两个思路。 任务栏调节音量 #If MouseIsOver("ahk_class Shell_TrayWnd") WheelUp::Send {Volume_Up} WheelDown::Send {Volume_Down} return #IfMouseIsOver(WinTitle) {MouseGetPos,,, Winreturn …...

Chrome插件 — ReRes

ReRes插件是一款可以帮助Web开发人员进行开发和测试的Chrome浏览器扩展。它可以模拟网页请求&#xff0c;并返回指定的响应。 该插件可以用于多种情况&#xff0c;例如&#xff1a; 测试网站功能&#xff0c;调试程序等&#xff1b;本地开发Web应用时&#xff0c;模拟远程API…...

前端面试基础面试题——9

1.js 延迟加载的方式有哪些&#xff1f; 2.js同步和异步的区别&#xff1f; 3.什么是浏览器的同源政策&#xff1f; 4.介绍一下 js 的节流与防抖&#xff1f; 5.js 中的深浅拷贝实现&#xff1f; 6.Js 动画与 CSS 动画区别及相应实现 7.观察者模式和发布订阅模式有什么不同…...

tomcat 问题

一、start up.bat 闪退 在命令窗口run 看看是缺少了哪个环境变量 二、控制台输出乱码 logging.properties 底部添加 java.util.logging.ConsoleHandler.encoding GBK 三、缓存不足 context.xml配置 <Resources cachingAllowed"false" cacheMaxSize"100…...

小程序首页如何进行装修设置

小程序首页是展示给用户的第一屏&#xff0c;它的装修直接影响到用户对小程序的第一印象。小程序首页的设置在小程序管理员后台->页面设置->首页&#xff0c;下图是小程序首页默认的设置。 下图&#xff0c;是小程序首页的具体表现形式。下面具体解释小程序首页各个设置项…...

npm安装依赖报错npm ERR! code ENOTFOUND npm ERR! errno ENOTFOUND、npm run dev报错记录

npm安装依赖报错npm ERR! code ENOTFOUND npm ERR! errno ENOTFOUND_得我所得&#xff0c;爱我所爱的博客-CSDN博客npm安装依赖报错今天在学习webpack的时候&#xff0c;在使用npm install来安装一个局部的webpack时候&#xff0c;报出一下错误:npm ERR! code ENOTFOUNDnpm ERR…...

堆叠注入([强网杯 2019]随便注1)

详解&#xff1a; 堆叠注入&#xff08;Stack Injection&#xff09;是一种计算机安全概念&#xff0c;涉及攻击者向程序的堆栈内存中插入恶意代码&#xff0c;以便在程序执行期间执行非预期的操作。 堆栈注入攻击通常利用程序在处理函数调用时使用的堆栈机制。当一个函数被调…...

零基础Linux_15(基础IO_文件)软硬链接+动静态库详解

目录 1. 软硬链接 1.1 创建软链接 1.2 创建硬链接 1.3 硬链接数和unlink 2. 动静态库 2.1 制作静态库 2.2 查看和打包静态库 2.3 使用静态库 2.3.1 安装在默认搜索路径 2.3.2 告知路径库路径库名 2.4 制作动态库 2.5 使用动态库 2.5.1 安装在默认搜索路径 2.5.2 …...

计算机毕业设计选什么题目好?springboot 健身房管理系统

✍✍计算机编程指导师 ⭐⭐个人介绍&#xff1a;自己非常喜欢研究技术问题&#xff01;专业做Java、Python、微信小程序、安卓、大数据、爬虫、Golang、大屏等实战项目。 ⛽⛽实战项目&#xff1a;有源码或者技术上的问题欢迎在评论区一起讨论交流&#xff01; ⚡⚡ Java实战 |…...

两台linux 之间传输文件 (详细+bash脚本)

两台linux设备文件直接传输&#xff0c;有很多应用场景 一、可能的方案 &#xff08;一&#xff09;先下载再上传 从linux通过ssh下载到windows下&#xff0c;然后再通过ssh上传到另一台linux。 1.优点&#xff1a;简单 2.缺点&#xff1a;效率低&#xff0c;需要额外的设备…...

嵌入式系统开发【深入浅出】 EXTI 与 NVIC

目录 CPU 感知外部事件变化的三种方式 中断分三个级别 中断控制器 STM32 的中断和异常 NVIC 中断控制器 NVIC 结构体成员 抢占优先级和响应优先级 简单配置NVIC中断控制器 EXTI 外部中断【中断源级】​ STM32系列微控制器实际上最多有23根外部中断线&#xff08;EXT…...

【Kali】简单记录

文章目录 信息收集DNS记录分析hostdigdnsenum 路由信息tcptraceroutetctrace 搜索引擎 目标识别arpingfping 识别操作系统p0f 服务枚举端口扫描nmap识别VPN服务器 漏洞映射exploitdbmsfconsole 提权arpspoofDsniff 信息收集 DNS记录分析 host host www.example.com host -a …...

【数据结构】:队列的实现

队列 队列的概念及结构 队列&#xff1a;只允许在一端进行插入数据操作&#xff0c;在另一端进行删除数据操作的特殊线性表&#xff0c;队列具有先进先出 FIFO(First In First Out) 入队列&#xff1a;进行插入操作的一端称为队尾 出队列&#xff1a;进行删除操作的一端称为队…...

浅谈 React Hooks

React Hooks 是 React 16.8 引入的一组 API&#xff0c;用于在函数组件中使用 state 和其他 React 特性&#xff08;例如生命周期方法、context 等&#xff09;。Hooks 通过简洁的函数接口&#xff0c;解决了状态与 UI 的高度解耦&#xff0c;通过函数式编程范式实现更灵活 Rea…...

深入剖析AI大模型:大模型时代的 Prompt 工程全解析

今天聊的内容&#xff0c;我认为是AI开发里面非常重要的内容。它在AI开发里无处不在&#xff0c;当你对 AI 助手说 "用李白的风格写一首关于人工智能的诗"&#xff0c;或者让翻译模型 "将这段合同翻译成商务日语" 时&#xff0c;输入的这句话就是 Prompt。…...

三维GIS开发cesium智慧地铁教程(5)Cesium相机控制

一、环境搭建 <script src"../cesium1.99/Build/Cesium/Cesium.js"></script> <link rel"stylesheet" href"../cesium1.99/Build/Cesium/Widgets/widgets.css"> 关键配置点&#xff1a; 路径验证&#xff1a;确保相对路径.…...

基于ASP.NET+ SQL Server实现(Web)医院信息管理系统

医院信息管理系统 1. 课程设计内容 在 visual studio 2017 平台上&#xff0c;开发一个“医院信息管理系统”Web 程序。 2. 课程设计目的 综合运用 c#.net 知识&#xff0c;在 vs 2017 平台上&#xff0c;进行 ASP.NET 应用程序和简易网站的开发&#xff1b;初步熟悉开发一…...

AtCoder 第409​场初级竞赛 A~E题解

A Conflict 【题目链接】 原题链接&#xff1a;A - Conflict 【考点】 枚举 【题目大意】 找到是否有两人都想要的物品。 【解析】 遍历两端字符串&#xff0c;只有在同时为 o 时输出 Yes 并结束程序&#xff0c;否则输出 No。 【难度】 GESP三级 【代码参考】 #i…...

STM32F4基本定时器使用和原理详解

STM32F4基本定时器使用和原理详解 前言如何确定定时器挂载在哪条时钟线上配置及使用方法参数配置PrescalerCounter ModeCounter Periodauto-reload preloadTrigger Event Selection 中断配置生成的代码及使用方法初始化代码基本定时器触发DCA或者ADC的代码讲解中断代码定时启动…...

最新SpringBoot+SpringCloud+Nacos微服务框架分享

文章目录 前言一、服务规划二、架构核心1.cloud的pom2.gateway的异常handler3.gateway的filter4、admin的pom5、admin的登录核心 三、code-helper分享总结 前言 最近有个活蛮赶的&#xff0c;根据Excel列的需求预估的工时直接打骨折&#xff0c;不要问我为什么&#xff0c;主要…...

将对透视变换后的图像使用Otsu进行阈值化,来分离黑色和白色像素。这句话中的Otsu是什么意思?

Otsu 是一种自动阈值化方法&#xff0c;用于将图像分割为前景和背景。它通过最小化图像的类内方差或等价地最大化类间方差来选择最佳阈值。这种方法特别适用于图像的二值化处理&#xff0c;能够自动确定一个阈值&#xff0c;将图像中的像素分为黑色和白色两类。 Otsu 方法的原…...

ardupilot 开发环境eclipse 中import 缺少C++

目录 文章目录 目录摘要1.修复过程摘要 本节主要解决ardupilot 开发环境eclipse 中import 缺少C++,无法导入ardupilot代码,会引起查看不方便的问题。如下图所示 1.修复过程 0.安装ubuntu 软件中自带的eclipse 1.打开eclipse—Help—install new software 2.在 Work with中…...

力扣-35.搜索插入位置

题目描述 给定一个排序数组和一个目标值&#xff0c;在数组中找到目标值&#xff0c;并返回其索引。如果目标值不存在于数组中&#xff0c;返回它将会被按顺序插入的位置。 请必须使用时间复杂度为 O(log n) 的算法。 class Solution {public int searchInsert(int[] nums, …...