当前位置: 首页 > news >正文

flink学习(8)——窗口函数

增量聚合函数

——指窗口每进入一条数据就计算一次

例如:要计算数字之和,进去一个12 计算结果为20, 再进入一个7 ——结果为27

 reduce

aggregate(aggregateFunction)

package com.bigdata.day04;public class _04_agg函数 {public static final Tuple3[] ENGLISH = new Tuple3[] {Tuple3.of("class1", "张三", 100L),Tuple3.of("class1", "李四", 40L),Tuple3.of("class1", "王五", 60L),Tuple3.of("class2", "赵六", 20L),Tuple3.of("class2", "小七", 30L),Tuple3.of("class2", "小八", 50L),};public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2. source-加载数据DataStreamSource<Tuple3<String,String,Long>> dataStreamSource = env.fromElements(ENGLISH);// 此时我要获取每个班级的平均成绩// 输入数据的类型(IN)、累加器的类型(ACC)和输出数据的类型(OUT)// IN——Tuple3<String, String, Long>// ACC——Tuple3<String, Integer,Long> 第一个是班级(key)第二个是数量,第三个是总的成绩// OUT —— Tuple2<String,Double> 第一个是班级 第二个是平均成绩dataStreamSource.countWindowAll(3).aggregate(new AggregateFunction<Tuple3<String, String, Long>, Tuple3<String, Integer,Long>, Tuple2<String,Double>>() {// 初始化一个 累加器@Overridepublic Tuple3<String, Integer, Long> createAccumulator() {return Tuple3.of(null,0,0L);}// 累加器和输入的值进行累加// Tuple3<String, String, Long> value 第一个是传入的值// Tuple3<String, Integer, Long> accumulator 第二个是累加器的值@Overridepublic Tuple3<String, Integer, Long> add(Tuple3<String, String, Long> value, Tuple3<String, Integer, Long> accumulator) {return Tuple3.of(value.f0,accumulator.f1+1,accumulator.f2+value.f2);}// 获取结果——在不同节点的结果进行汇总后实现@Overridepublic Tuple2<String, Double> getResult(Tuple3<String, Integer, Long> accumulator) {return Tuple2.of(accumulator.f0, (double) accumulator.f2 / accumulator.f1);}// 由于flink是分布式,所以在别的节点也会进行累加 ,该方法是不同节点的结果进行汇总// 即累加器之间的累加@Overridepublic Tuple3<String, Integer, Long> merge(Tuple3<String, Integer, Long> a, Tuple3<String, Integer, Long> b) {return Tuple3.of(a.f0,a.f1+b.f1,a.f2+b.f2);}}).print();//4. sink-数据输出//5. execute-执行env.execute();}
}

sum()

min()

max()

 全量聚合函数

指在窗口触发的时候才会对窗口内的所有数据进行一次计算(等窗口的数据到齐,才开始进行聚合计算,可实现对窗口内的数据进行排序等需求)

全量聚合函数比较简单,但是会将所有的数据存放在内存中,因此会占用大量的内存空间

apply 

package com.bigdata.day04;public class _05_app函数 {public static final Tuple3[] ENGLISH = new Tuple3[] {Tuple3.of("class1", "张三", 100L),Tuple3.of("class1", "李四", 40L),Tuple3.of("class1", "王五", 60L),Tuple3.of("class2", "赵六", 20L),Tuple3.of("class2", "小七", 30L),Tuple3.of("class2", "小八", 50L),};public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);DataStreamSource<Tuple3<String,String,Long>> dataStreamSource = env.fromElements(ENGLISH);//2. source-加载数据dataStreamSource.countWindowAll(3).apply(new AllWindowFunction<Tuple3<String,String,Long>, Tuple2<String,Double>, GlobalWindow>() {@Overridepublic void apply(GlobalWindow window, Iterable<Tuple3<String, String, Long>> values, Collector<Tuple2<String,Double>> out) throws Exception {Long sum = 0L;int length = 0;String key = null;for (Tuple3<String, String, Long> value : values) {sum += value.f2;length++;key = value.f0;}out.collect(Tuple2.of(key,(double) sum/length));}}).print();env.execute();}
}// 总结// 接口
new AllWindowFunction<Tuple3<String,String,Long>, Tuple2<String,Double>, GlobalWindow>()
GlobalWindow 窗口对象  Tuple3<String,String,Long> 传入的值  Tuple2<String,Double> 结果// 重写的方法
public void apply(GlobalWindow window, Iterable<Tuple3<String, String, Long>> values, Collector<Tuple2<String,Double>> out) Iterable<Tuple3<String, String, Long>> values 传入值的迭代器 进行遍历
Collector<Tuple2<String,Double>> out 收集器 调用collect方法收集即可
window 窗口对象//使用窗口对象我们可以拿到窗口的起始时间long start = window.getStart();long end = window.getEnd();

process

使用方式一:在connect合流之后对两个类型不同的流进行处理

使用方式二:在分流的时候使用,可以通过context.output方法对每个数据添加一个标签

 使用方式一
new CoProcessFunction<Long, String, String>()  
// 第一个泛型是第一个流的类型 第二个泛型是第二个流的类型  第三个泛型是合并后流的类型@Overridepublic void processElement1(Long l, CoProcessFunction<Long, String, String>.Context context, Collector<String> collector) throws Exception {// Long 是数据类型 结果使用collector中的collect 收集collector.collect(String.valueOf(l));}@Overridepublic void processElement2(String s, CoProcessFunction<Long, String, String>.Context context, Collector<String> collector) throws Exception {//  String 是数据类型 结果使用collector中的collect 收集collector.collect(s);}
使用方式二
此时使用的是context中的context.output(odd, element); 方法
odd 是标签
element 是元素OutputTag<Long> odd = new OutputTag<>("奇数",TypeInformation.of(Long.class));
OutputTag<Long> even = new OutputTag<>("偶数", TypeInformation.of(Long.class));

相关文章:

flink学习(8)——窗口函数

增量聚合函数 ——指窗口每进入一条数据就计算一次 例如&#xff1a;要计算数字之和&#xff0c;进去一个12 计算结果为20&#xff0c; 再进入一个7 ——结果为27 reduce aggregate(aggregateFunction) package com.bigdata.day04;public class _04_agg函数 {public static …...

「实战应用」如何用图表控件LightningChart .NET实现散点图?(一)

LightningChart .NET完全由GPU加速&#xff0c;并且性能经过优化&#xff0c;可用于实时显示海量数据-超过10亿个数据点。 LightningChart包括广泛的2D&#xff0c;高级3D&#xff0c;Polar&#xff0c;Smith&#xff0c;3D饼/甜甜圈&#xff0c;地理地图和GIS图表以及适用于科…...

鸿蒙Native使用Demo

DevecoStudio使用Native 今天,给大家带来的是关于DevecoStudio中使用Native进行开发 个人拙见:为什么要使用Native?无论是JS还是TS在复杂的情况下运行速度,肯定不如直接操作内存的C/C的运行速度快,所以,会选择使用Native;这里面的过程是什么?通过映射转化,使用napi提供的接口…...

29.UE5蓝图的网络通讯,多人自定义事件,变量同步

3-9 蓝图的网络通讯、多人自定义事件、变量同步_哔哩哔哩_bilibili 目录 1.网络通讯 1.1玩家Pawn之间的同步 1.2事件同步 1.3UI同步 1.4组播 1.5变量同步 1.网络通讯 1.1玩家Pawn之间的同步 创建一个第三人称项目 将网络模式更改为监听服务器&#xff0c;即将房主作为…...

Scala—列表(可变ListBuffer、不可变List)用法详解

Scala集合概述-链接 大家可以点击上方链接&#xff0c;先对Scala的集合有一个整体的概念&#x1f923;&#x1f923;&#x1f923; 在 Scala 中&#xff0c;列表&#xff08;List&#xff09;分为不可变列表&#xff08;List&#xff09;和可变列表&#xff08;ListBuffer&…...

【论文复现】偏标记学习+图像分类

&#x1f4dd;个人主页&#x1f339;&#xff1a;Eternity._ &#x1f339;&#x1f339;期待您的关注 &#x1f339;&#x1f339; ❀ 偏标记学习图像分类 概述算法原理核心逻辑效果演示使用方式参考文献 概述 本文复现论文 Progressive Identification of True Labels for Pa…...

C嘎嘎探索篇:栈与队列的交响:C++中的结构艺术

C嘎嘎探索篇&#xff1a;栈与队列的交响&#xff1a;C中的结构艺术 前言&#xff1a; 小编在之前刚完成了C中栈和队列&#xff08;stack和queue&#xff09;的讲解&#xff0c;忘记的小伙伴可以去我上一篇文章看一眼的&#xff0c;今天小编将会带领大家吹奏栈和队列的交响&am…...

AIGC-----AIGC在虚拟现实中的应用前景

AIGC在虚拟现实中的应用前景 引言 随着人工智能生成内容&#xff08;AIGC&#xff09;的快速发展&#xff0c;虚拟现实&#xff08;VR&#xff09;技术的应用也迎来了新的契机。AIGC与VR的结合为创造沉浸式体验带来了全新的可能性&#xff0c;这种组合不仅极大地降低了VR内容的…...

Django 路由层

1. 路由基础概念 URLconf (URL 配置)&#xff1a;Django 的路由系统是基于 urls.py 文件定义的。路径匹配&#xff1a;通过模式匹配 URL&#xff0c;并将请求传递给对应的视图处理函数。命名路由&#xff1a;每个路由可以定义一个名称&#xff0c;用于反向解析。 2. 基本路由配…...

《硬件架构的艺术》笔记(八):消抖技术

简介 在电子设备中两个金属触点随着触点的断开闭合便产生了多个信号&#xff0c;这就是抖动。 消抖是用来确保每一次断开或闭合触点时只有一个信号起作用的硬件设备或软件。&#xff08;就是每次断开闭合只对应一个操作&#xff09;。 抖动在某些模拟和逻辑电路中可能产生问…...

Spring 与 Spring MVC 与 Spring Boot三者之间的区别与联系

一.什么是Spring&#xff1f;它解决了什么问题&#xff1f; 1.1什么是Spring&#xff1f; Spring&#xff0c;一般指代的是Spring Framework 它是一个开源的应用程序框架&#xff0c;提供了一个简易的开发方式&#xff0c;通过这种开发方式&#xff0c;将避免那些可能致使代码…...

【算法】连通块问题(C/C++)

目录 连通块问题 解决思路 步骤&#xff1a; 初始化&#xff1a; DFS函数&#xff1a; 复杂度分析 代码实现&#xff08;C&#xff09; 题目链接&#xff1a;2060. 奶牛选美 - AcWing题库 解题思路&#xff1a; AC代码&#xff1a; 题目链接&#xff1a;687. 扫雷 -…...

如何选择黑白相机和彩色相机

我们在选择成像解决方案时黑白相机很容易被忽略&#xff0c;因为许多新相机提供鲜艳的颜色&#xff0c;鲜明的对比度和改进的弱光性能。然而&#xff0c;有许多应用&#xff0c;选择黑白相机将是更好的选择&#xff0c;因为他们产生更清晰的图像&#xff0c;更好的分辨率&#…...

Rust 力扣 - 740. 删除并获得点数

文章目录 题目描述题解思路题解代码题目链接 题目描述 题解思路 首先对于这题我们如果将所有点数装入一个切片f中&#xff0c;该切片f中的i号下标表示所有点数为i的点数之和 那么这题就转换成了打家劫舍这道题&#xff0c;也就是求选择了切片中某个下标的元素后&#xff0c;该…...

OpenCV从入门到精通实战(七)——探索图像处理:自定义滤波与OpenCV卷积核

本文主要介绍如何使用Python和OpenCV库通过卷积操作来应用不同的图像滤波效果。主要分为几个步骤&#xff1a;图像的读取与处理、自定义卷积函数的实现、不同卷积核的应用&#xff0c;以及结果的展示。 卷积 在图像处理中&#xff0c;卷积是一种重要的操作&#xff0c;它通过…...

Docker核心概念总结

本文只是对 Docker 的概念做了较为详细的介绍&#xff0c;并不涉及一些像 Docker 环境的安装以及 Docker 的一些常见操作和命令。 容器介绍 Docker 是世界领先的软件容器平台&#xff0c;所以想要搞懂 Docker 的概念我们必须先从容器开始说起。 什么是容器? 先来看看容器较为…...

环形缓冲区

什么是环形缓冲区 环形缓冲区,也称为循环缓冲区或环形队列,是一种特殊的FIFO(先进先出)数据结构。它使用一块固定大小的内存空间来缓存数据,并通过两个指针(读指针和写指针)来管理数据的读写。当任意一个指针到达缓冲区末尾时,会自动回绕到缓冲区开头,形成一个"环"。…...

jQuery-Word-Export 使用记录及完整修正文件下载 jquery.wordexport.js

参考资料&#xff1a; jQuery-Word-Export导出word_jquery.wordexport.js下载-CSDN博客 近期又需要自己做个 Html2Doc 的解决方案&#xff0c;因为客户又不想要 Html2pdf 的下载了&#xff0c;当初还给我费尽心思解决Html转pdf时中文输出的问题&#xff08;html转pdf文件下载之…...

云服务器部署WebSocket项目

WebSocket是一种在单个TCP连接上进行全双工通信的协议&#xff0c;其设计的目的是在Web浏览器和Web服务器之间进行实时通信&#xff08;实时Web&#xff09; WebSocket协议的优点包括&#xff1a; 1. 更高效的网络利用率&#xff1a;与HTTP相比&#xff0c;WebSocket的握手只…...

C#+数据库 实现动态权限设置

将权限信息存储在数据库中&#xff0c;支持动态调整。根据用户所属的角色、特定的功能模块&#xff0c;动态加载权限” 1. 数据库设计 根据这种需求&#xff0c;可以通过以下表设计&#xff1a; 用户表 (Users)&#xff1a;存储用户信息。角色表 (Roles)&#xff1a;存储角色…...

Obsidian-Templates:卡片盒笔记法的终极模板库,构建你的第二大脑

Obsidian-Templates&#xff1a;卡片盒笔记法的终极模板库&#xff0c;构建你的第二大脑 【免费下载链接】Obsidian-Templates A repository containing templates and scripts for #Obsidian to support the #Zettelkasten method for note-taking. 项目地址: https://gitco…...

DeepSeek API Gateway安全防护体系(零信任网关落地指南)

更多请点击&#xff1a; https://intelliparadigm.com 第一章&#xff1a;DeepSeek API Gateway安全防护体系&#xff08;零信任网关落地指南&#xff09; DeepSeek API Gateway 作为面向大模型服务的统一入口&#xff0c;其安全架构严格遵循零信任原则——默认不信任任何网络…...

一键解决!VisualCppRedist AIO彻底告别Windows DLL错误困扰

一键解决&#xff01;VisualCppRedist AIO彻底告别Windows DLL错误困扰 【免费下载链接】vcredist AIO Repack for latest Microsoft Visual C Redistributable Runtimes 项目地址: https://gitcode.com/gh_mirrors/vc/vcredist 还记得那个令人抓狂的时刻吗&#xff1f;…...

如何快速掌握MRIcroGL:医学影像三维可视化的完整指南

如何快速掌握MRIcroGL&#xff1a;医学影像三维可视化的完整指南 【免费下载链接】MRIcroGL v1.2 GLSL volume rendering. Able to view NIfTI, DICOM, MGH, MHD, NRRD, AFNI format images. 项目地址: https://gitcode.com/gh_mirrors/mr/MRIcroGL MRIcroGL是一款功能强…...

别再百度了!工程师私藏的5个免费Datasheet查询网站(附使用技巧)

工程师必备&#xff1a;5个高效Datasheet查询工具与实战技巧 每次调试电路板时&#xff0c;最让人抓狂的莫过于找不到最新版的元器件规格书。上周我就遇到一个案例&#xff1a;某款MCU的旧版手册标注的引脚功能与实际芯片不符&#xff0c;导致整个通信模块无法工作。这种经历让…...

完整指南:3分钟解锁你的加密音乐文件

完整指南&#xff1a;3分钟解锁你的加密音乐文件 【免费下载链接】qmc-decoder Fastest & best convert qmc 2 mp3 | flac tools 项目地址: https://gitcode.com/gh_mirrors/qm/qmc-decoder 你是否曾经遇到过这样的情况&#xff1a;从音乐平台下载的歌曲只能在特定应…...

视频转文字软件免费的哪个最好用?2026年免费视频转文字软件对比指南

截至 2026 年&#xff0c;处理视频转文字需求的工具大致分为三类&#xff1a;桌面软件、在线网页版、微信小程序。不同类型的选择往往取决于你习惯的使用场景——有人倾向装软件后离线处理&#xff0c;有人则更喜欢打开就用不用卸载的方案。本文会重点拆解一款叫提词匠的微信小…...

3分钟快速上手TransNet V2:视频镜头检测的终极完整指南

3分钟快速上手TransNet V2&#xff1a;视频镜头检测的终极完整指南 【免费下载链接】TransNetV2 TransNet V2: Shot Boundary Detection Neural Network 项目地址: https://gitcode.com/gh_mirrors/tr/TransNetV2 在视频内容爆炸式增长的今天&#xff0c;如何快速准确地…...

当大模型认不出一个具体名字:MiniMax 回答失灵,问题未必只在模型本身

当大模型认不出一个具体名字&#xff1a;MiniMax 回答失灵&#xff0c;问题未必只在模型本身 围绕“为什么 MiniMax 大模型无法识别马嘉祺是谁”的一次能力拆解&#xff1a;真正暴露的&#xff0c;往往是知识覆盖、检索策略与风控边界的耦合问题 直接回答 先给结论。 如果 Mi…...

LaTeX2Word-Equation:三步实现网页公式到Word的精准转换

LaTeX2Word-Equation&#xff1a;三步实现网页公式到Word的精准转换 【免费下载链接】LaTeX2Word-Equation Copy LaTeX Equations as Word Equations, a Chrome Extension 项目地址: https://gitcode.com/gh_mirrors/la/LaTeX2Word-Equation 在学术写作和文档编辑过程中…...