大数据-玩转数据-Flink窗口函数
一、Flink窗口函数
前面指定了窗口的分配器, 接着我们需要来指定如何计算, 这事由window function来负责. 一旦窗口关闭, window function 去计算处理窗口中的每个元素.
window function 可以是ReduceFunction,AggregateFunction,or ProcessWindowFunction中的任意一种.
ReduceFunction,AggregateFunction更加高效, 原因就是Flink可以对到来的元素进行增量聚合 . ProcessWindowFunction 可以得到一个包含这个窗口中所有元素的迭代器, 以及这些元素所属窗口的一些元数据信息.
ProcessWindowFunction不能被高效执行的原因是Flink在执行这个函数之前, 需要在内部缓存这个窗口上所有的元素。
除了一些简单聚合,比如 sum,max,min,maxBay,minBay ,有以下窗口聚合函数。
二、ReduceFunction(增量聚合函数)
输入和输出必须一致
package com.lyh.flink07;import com.lyh.bean.WaterSensor;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;public class Window_s_function {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.socketTextStream("hadoop100",9999).map(line -> {String[] data = line.split(",");return new WaterSensor(data[0],Long.valueOf(data[1]),Integer.valueOf(data[2]));}).keyBy(WaterSensor::getId).window(TumblingProcessingTimeWindows.of(Time.seconds(5))).reduce(new ReduceFunction<WaterSensor>() {@Overridepublic WaterSensor reduce(WaterSensor value1,WaterSensor value2) throws Exception {System.out.println("Window_s_function.reduce");value1.setVc ( value1.getVc() + value2.getVc());return (value1);}}).print();env.execute();}
}
运行结果
三、AggregateFunction(增量聚合函数)
输入和输出可以不一致
package com.lyh.flink07;import com.lyh.bean.WaterSensor;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.kafka.common.metrics.stats.Avg;import java.util.List;public class Window_s_function_2 {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.socketTextStream("hadoop100",9999).map(line -> {String[] data = line.split(",");return new WaterSensor(data[0],Long.valueOf(data[1]),Integer.valueOf(data[2]));}).keyBy(WaterSensor::getId).window(TumblingProcessingTimeWindows.of(Time.seconds(5))).aggregate(new AggregateFunction<WaterSensor, Avg, Double>() {@Overridepublic Avg createAccumulator() {return new Avg();}@Overridepublic Avg add(WaterSensor value, Avg acc) {acc.sum += value.getVc();acc.couunt++;return acc;}@Overridepublic Double getResult(Avg acc) {return acc.sum * 1.0 / acc.couunt;}@Overridepublic Avg merge(Avg avg, Avg acc1) {return null;}},new ProcessWindowFunction<Double, String, String, TimeWindow>() {@Overridepublic void process(String key,Context ctx,Iterable<Double> elements,Collector<String> out) throws Exception {Double result = elements.iterator().next();long starttime = ctx.window().getStart();long endtime = ctx.window().getEnd();out.collect("窗口:" + starttime + " " + endtime + " key: " + key + " result: " + result);}}).print();env.execute();}public static class Avg {public Integer sum = 0;public Long couunt = 0L;};
}
运行结果
四、ProcessWindowFunction(全窗口函数)
上面例子里已经用到
new ProcessWindowFunction<Double, String, String, TimeWindow>() {@Overridepublic void process(String key,Context ctx,Iterable<Double> elements,Collector<String> out) throws Exception {Double result = elements.iterator().next();long starttime = ctx.window().getStart();long endtime = ctx.window().getEnd();out.collect("窗口:" + starttime + " " + endtime + " key: " + key + " result: " + result);}}
相关文章:

大数据-玩转数据-Flink窗口函数
一、Flink窗口函数 前面指定了窗口的分配器, 接着我们需要来指定如何计算, 这事由window function来负责. 一旦窗口关闭, window function 去计算处理窗口中的每个元素. window function 可以是ReduceFunction,AggregateFunction,or ProcessWindowFunction中的任意一种. Reduc…...

Docker网络-探索容器网络如何相互通信
当今世界,企业热衷于容器化,这需要强大的网络技能来正确配置容器架构,因此引入了 Docker Networking 的概念。Docker 是一种容器化平台,允许您在独立、轻量级的容器中运行应用程序和服务。Docker 提供了一套强大的网络功能&#x…...

ESP32-CAM模块Arduino环境搭建测试
ESP32-CAM模块Arduino环境搭建测试 一.ESP32OV2640摄像头模块CameraWebServer视频查看 二.测试ESP32-CAM(后续称cam模块)代码是否上传执行成功测试 const int led0 12; const int led1 13;void setup() {// put your setup code here, to run once:pinMode(led0, OUTPUT);pin…...

webassembly001 webassembly简述
WebAssembly 官方地址:https://webassembly.org/相关历史 https://en.wikipedia.org/wiki/WebAssembly https://brendaneich.com/2015/06/from-asm-js-to-webassembly/WebAssembly(缩写为Wasm)是一种基于堆栈的虚拟机的二进制指令格式。Wasm 被设计为编…...

vue 使用C-Lodop打印小票
先从官网下载js文件 https://www.lodop.net/LodopDemo.html 打开安装程序,一直下一步既可,我这边已经安装过就不演示了。 // 引入 import { getLodop } from /utils/CLodopfuncs.js;// 使用 let LODOP getLodop()let Count LODOP.GET_PRINTER_COUNT…...

【C++进阶(二)】STL大法--vector的深度剖析以及模拟实现
💓博主CSDN主页:杭电码农-NEO💓 ⏩专栏分类:C从入门到精通⏪ 🚚代码仓库:NEO的学习日记🚚 🌹关注我🫵带你学习C 🔝🔝 vector 1. 前言2. 熟悉vector的接口函数2.1 vec…...

1. import pandas as pd 导入库
【目录】 文章目录 1. import pandas as pd 导入库1. pandas库的概念2. 导入pandas库2.1 常规导入2.2 别名导入 3. 别名的作用4. 课堂练习 【正文】 1. import pandas as pd 导入库 【学习时间】 10分钟 1. pandas库的概念 pandas:熊猫panda的复数, …...

DMK5框选变量之后不显示其他位置的此变量高亮
使用软件MDK5.3.8版本 如下在2的位置选择之后,其他同样的变量没有高亮,因为1的原因折叠了; 展开折叠之后就可以了...
0061__Appium
Appium Documentation - Appium Documentation APP自动化测试(3)-Appium Inspector介绍_六天测试工程师的博客-CSDN博客 https://github.com/appium/appium-inspector https://github.com/appium/appium-desktop https://github.com/appium/appium...
【DEVOPS】需求跟踪管理全面落地
0. 目录 1. 现状/背景2. 需求管理存在的问题3. 改进思路/措施4. 所谓"禅道尚未普及/铺开"5. 最后6. 相关 1. 现状/背景 近期又被领导问到"如何对项目过程中的需求进行量化和跟踪管理"。这真是一个狗皮膏药似的问题,反反复复地,隔一…...
算法修炼Day57|647. 回文子串 ● 516.最长回文子序列
LeetCode:647. 回文子串 647. 回文子串 - 力扣(LeetCode) 1.思路 暴力思路见对应代码… 动规解法:画图推导动规公式,当前状态由左侧和左下角推出,所以首层应该采用倒序的方式,内部采用正序的方式。 2.…...

呈现数据的精妙之道:选择合适的可视化方法
在当今数据时代,数据可视化已成为理解和传达信息的重要手段。然而,选择适合的数据可视化方法对于有效地呈现数据至关重要。不同的数据和目标需要不同的可视化方法,下面我们将探讨如何选择最佳的数据可视化方法来呈现数据。 1. 理解数据类型&a…...

数据结构(Java实现)-java对象的比较
元素的比较 基本类型的比较 在Java中,基本类型的对象可以直接比较大小。 对象比较的问题 Java中引用类型的变量不能直接按照 > 或者 < 方式进行比较 默认情况下调用的就是equal方法,但是该方法的比较规则是:没有比较引用变量引用对象的…...

Wolfram Mathematica 13 for Mac 数学计算工具
Wolfram Mathematica for Mac是一款功能强大、划时代的科学计算软件。它结合了数字和符号计算引擎、图形系统、编程语言、文本系统以及与其他应用程序的高级连接,在许多功能方面处于世界领先地位,截至2009年,它是使用最广泛的数学软件之一。人…...

系统架构设计高级技能 · Web架构
现在的一切都是为将来的梦想编织翅膀,让梦想在现实中展翅高飞。 Now everything is for the future of dream weaving wings, let the dream fly in reality. 点击进入系列文章目录 系统架构设计高级技能 Web架构 一、Web架构介绍1.1 Web架构涉及技术1.2 单台服务…...

再写CentOS7升级OpenSSL-1.0.1U
本文在CentOS7.4以及TencentOS 2.4上测试通过。 原系统自带OpenSSL 1.0.2k-fips。 编译安装方法跟之前的没啥区别。 从官网下载1.0.1u版https://www.openssl.org/source/ 使用tar解包 tar xfz openssl-1.0.1u.tar.gz 依次执行如下: cd openssl-1.0.1u ./con…...

HBase--技术文档--基本概念--《快速扫盲》
官网 Apache HBase – Apache HBase™ Home 阿里云hbase 云数据库HBase_大数据存储_订单风控_数据库-阿里云 云数据库 HBase-阿里云帮助中心 基本概念 HBase是一种分布式、可扩展、支持海量数据存储的NoSQL数据库。它基于Hadoop,采用列式存储方式,可…...

如何利用SFTP协议远程实现更安全的文件传输 ——【内网穿透】
🎬 鸽芷咕:个人主页 🔥 个人专栏: 《高效编程技巧》《cpolar》 ⛺️生活的理想,就是为了理想的生活! 文章目录 1. 安装openSSH1.1 安装SSH1.2 启动ssh 2. 安装cpolar2.1 配置termux服务 3. 远程SFTP连接配置3.1 查看生成的随机公…...

深度学习8:详解生成对抗网络原理
目录 大纲 生成随机变量 可以伪随机生成均匀随机变量 随机变量表示为操作或过程的结果 逆变换方法 生成模型 我们试图生成非常复杂的随机变量…… …所以让我们使用神经网络的变换方法作为函数! 生成匹配网络 培养生成模型 比较基于样本的两个概率分布 …...

sql入门-多表查询
案例涉及表 ----------------------------------建表语句之前翻看之前博客文章 多表查询 -- 学生表 create table studen ( id int primary key auto_increment comment id, name varchar(50) comment 姓名, no varchar(10) comment 学号 ) comment 学生表; insert…...
逻辑回归:给不确定性划界的分类大师
想象你是一名医生。面对患者的检查报告(肿瘤大小、血液指标),你需要做出一个**决定性判断**:恶性还是良性?这种“非黑即白”的抉择,正是**逻辑回归(Logistic Regression)** 的战场&a…...

【HarmonyOS 5.0】DevEco Testing:鸿蒙应用质量保障的终极武器
——全方位测试解决方案与代码实战 一、工具定位与核心能力 DevEco Testing是HarmonyOS官方推出的一体化测试平台,覆盖应用全生命周期测试需求,主要提供五大核心能力: 测试类型检测目标关键指标功能体验基…...

vscode(仍待补充)
写于2025 6.9 主包将加入vscode这个更权威的圈子 vscode的基本使用 侧边栏 vscode还能连接ssh? debug时使用的launch文件 1.task.json {"tasks": [{"type": "cppbuild","label": "C/C: gcc.exe 生成活动文件"…...
可靠性+灵活性:电力载波技术在楼宇自控中的核心价值
可靠性灵活性:电力载波技术在楼宇自控中的核心价值 在智能楼宇的自动化控制中,电力载波技术(PLC)凭借其独特的优势,正成为构建高效、稳定、灵活系统的核心解决方案。它利用现有电力线路传输数据,无需额外布…...
基于服务器使用 apt 安装、配置 Nginx
🧾 一、查看可安装的 Nginx 版本 首先,你可以运行以下命令查看可用版本: apt-cache madison nginx-core输出示例: nginx-core | 1.18.0-6ubuntu14.6 | http://archive.ubuntu.com/ubuntu focal-updates/main amd64 Packages ng…...
将对透视变换后的图像使用Otsu进行阈值化,来分离黑色和白色像素。这句话中的Otsu是什么意思?
Otsu 是一种自动阈值化方法,用于将图像分割为前景和背景。它通过最小化图像的类内方差或等价地最大化类间方差来选择最佳阈值。这种方法特别适用于图像的二值化处理,能够自动确定一个阈值,将图像中的像素分为黑色和白色两类。 Otsu 方法的原…...

页面渲染流程与性能优化
页面渲染流程与性能优化详解(完整版) 一、现代浏览器渲染流程(详细说明) 1. 构建DOM树 浏览器接收到HTML文档后,会逐步解析并构建DOM(Document Object Model)树。具体过程如下: (…...

如何理解 IP 数据报中的 TTL?
目录 前言理解 前言 面试灵魂一问:说说对 IP 数据报中 TTL 的理解?我们都知道,IP 数据报由首部和数据两部分组成,首部又分为两部分:固定部分和可变部分,共占 20 字节,而即将讨论的 TTL 就位于首…...
大数据学习(132)-HIve数据分析
🍋🍋大数据学习🍋🍋 🔥系列专栏: 👑哲学语录: 用力所能及,改变世界。 💖如果觉得博主的文章还不错的话,请点赞👍收藏⭐️留言Ǵ…...
大语言模型(LLM)中的KV缓存压缩与动态稀疏注意力机制设计
随着大语言模型(LLM)参数规模的增长,推理阶段的内存占用和计算复杂度成为核心挑战。传统注意力机制的计算复杂度随序列长度呈二次方增长,而KV缓存的内存消耗可能高达数十GB(例如Llama2-7B处理100K token时需50GB内存&a…...