大数据-玩转数据-Flink窗口函数
一、Flink窗口函数
前面指定了窗口的分配器, 接着我们需要来指定如何计算, 这事由window function来负责. 一旦窗口关闭, window function 去计算处理窗口中的每个元素.
window function 可以是ReduceFunction,AggregateFunction,or ProcessWindowFunction中的任意一种.
ReduceFunction,AggregateFunction更加高效, 原因就是Flink可以对到来的元素进行增量聚合 . ProcessWindowFunction 可以得到一个包含这个窗口中所有元素的迭代器, 以及这些元素所属窗口的一些元数据信息.
ProcessWindowFunction不能被高效执行的原因是Flink在执行这个函数之前, 需要在内部缓存这个窗口上所有的元素。
除了一些简单聚合,比如 sum,max,min,maxBay,minBay ,有以下窗口聚合函数。
二、ReduceFunction(增量聚合函数)
输入和输出必须一致
package com.lyh.flink07;import com.lyh.bean.WaterSensor;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;public class Window_s_function {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.socketTextStream("hadoop100",9999).map(line -> {String[] data = line.split(",");return new WaterSensor(data[0],Long.valueOf(data[1]),Integer.valueOf(data[2]));}).keyBy(WaterSensor::getId).window(TumblingProcessingTimeWindows.of(Time.seconds(5))).reduce(new ReduceFunction<WaterSensor>() {@Overridepublic WaterSensor reduce(WaterSensor value1,WaterSensor value2) throws Exception {System.out.println("Window_s_function.reduce");value1.setVc ( value1.getVc() + value2.getVc());return (value1);}}).print();env.execute();}
}
运行结果


三、AggregateFunction(增量聚合函数)
输入和输出可以不一致
package com.lyh.flink07;import com.lyh.bean.WaterSensor;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.kafka.common.metrics.stats.Avg;import java.util.List;public class Window_s_function_2 {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.socketTextStream("hadoop100",9999).map(line -> {String[] data = line.split(",");return new WaterSensor(data[0],Long.valueOf(data[1]),Integer.valueOf(data[2]));}).keyBy(WaterSensor::getId).window(TumblingProcessingTimeWindows.of(Time.seconds(5))).aggregate(new AggregateFunction<WaterSensor, Avg, Double>() {@Overridepublic Avg createAccumulator() {return new Avg();}@Overridepublic Avg add(WaterSensor value, Avg acc) {acc.sum += value.getVc();acc.couunt++;return acc;}@Overridepublic Double getResult(Avg acc) {return acc.sum * 1.0 / acc.couunt;}@Overridepublic Avg merge(Avg avg, Avg acc1) {return null;}},new ProcessWindowFunction<Double, String, String, TimeWindow>() {@Overridepublic void process(String key,Context ctx,Iterable<Double> elements,Collector<String> out) throws Exception {Double result = elements.iterator().next();long starttime = ctx.window().getStart();long endtime = ctx.window().getEnd();out.collect("窗口:" + starttime + " " + endtime + " key: " + key + " result: " + result);}}).print();env.execute();}public static class Avg {public Integer sum = 0;public Long couunt = 0L;};
}
运行结果


四、ProcessWindowFunction(全窗口函数)
上面例子里已经用到
new ProcessWindowFunction<Double, String, String, TimeWindow>() {@Overridepublic void process(String key,Context ctx,Iterable<Double> elements,Collector<String> out) throws Exception {Double result = elements.iterator().next();long starttime = ctx.window().getStart();long endtime = ctx.window().getEnd();out.collect("窗口:" + starttime + " " + endtime + " key: " + key + " result: " + result);}}
相关文章:
大数据-玩转数据-Flink窗口函数
一、Flink窗口函数 前面指定了窗口的分配器, 接着我们需要来指定如何计算, 这事由window function来负责. 一旦窗口关闭, window function 去计算处理窗口中的每个元素. window function 可以是ReduceFunction,AggregateFunction,or ProcessWindowFunction中的任意一种. Reduc…...
Docker网络-探索容器网络如何相互通信
当今世界,企业热衷于容器化,这需要强大的网络技能来正确配置容器架构,因此引入了 Docker Networking 的概念。Docker 是一种容器化平台,允许您在独立、轻量级的容器中运行应用程序和服务。Docker 提供了一套强大的网络功能&#x…...
ESP32-CAM模块Arduino环境搭建测试
ESP32-CAM模块Arduino环境搭建测试 一.ESP32OV2640摄像头模块CameraWebServer视频查看 二.测试ESP32-CAM(后续称cam模块)代码是否上传执行成功测试 const int led0 12; const int led1 13;void setup() {// put your setup code here, to run once:pinMode(led0, OUTPUT);pin…...
webassembly001 webassembly简述
WebAssembly 官方地址:https://webassembly.org/相关历史 https://en.wikipedia.org/wiki/WebAssembly https://brendaneich.com/2015/06/from-asm-js-to-webassembly/WebAssembly(缩写为Wasm)是一种基于堆栈的虚拟机的二进制指令格式。Wasm 被设计为编…...
vue 使用C-Lodop打印小票
先从官网下载js文件 https://www.lodop.net/LodopDemo.html 打开安装程序,一直下一步既可,我这边已经安装过就不演示了。 // 引入 import { getLodop } from /utils/CLodopfuncs.js;// 使用 let LODOP getLodop()let Count LODOP.GET_PRINTER_COUNT…...
【C++进阶(二)】STL大法--vector的深度剖析以及模拟实现
💓博主CSDN主页:杭电码农-NEO💓 ⏩专栏分类:C从入门到精通⏪ 🚚代码仓库:NEO的学习日记🚚 🌹关注我🫵带你学习C 🔝🔝 vector 1. 前言2. 熟悉vector的接口函数2.1 vec…...
1. import pandas as pd 导入库
【目录】 文章目录 1. import pandas as pd 导入库1. pandas库的概念2. 导入pandas库2.1 常规导入2.2 别名导入 3. 别名的作用4. 课堂练习 【正文】 1. import pandas as pd 导入库 【学习时间】 10分钟 1. pandas库的概念 pandas:熊猫panda的复数, …...
DMK5框选变量之后不显示其他位置的此变量高亮
使用软件MDK5.3.8版本 如下在2的位置选择之后,其他同样的变量没有高亮,因为1的原因折叠了; 展开折叠之后就可以了...
0061__Appium
Appium Documentation - Appium Documentation APP自动化测试(3)-Appium Inspector介绍_六天测试工程师的博客-CSDN博客 https://github.com/appium/appium-inspector https://github.com/appium/appium-desktop https://github.com/appium/appium...
【DEVOPS】需求跟踪管理全面落地
0. 目录 1. 现状/背景2. 需求管理存在的问题3. 改进思路/措施4. 所谓"禅道尚未普及/铺开"5. 最后6. 相关 1. 现状/背景 近期又被领导问到"如何对项目过程中的需求进行量化和跟踪管理"。这真是一个狗皮膏药似的问题,反反复复地,隔一…...
算法修炼Day57|647. 回文子串 ● 516.最长回文子序列
LeetCode:647. 回文子串 647. 回文子串 - 力扣(LeetCode) 1.思路 暴力思路见对应代码… 动规解法:画图推导动规公式,当前状态由左侧和左下角推出,所以首层应该采用倒序的方式,内部采用正序的方式。 2.…...
呈现数据的精妙之道:选择合适的可视化方法
在当今数据时代,数据可视化已成为理解和传达信息的重要手段。然而,选择适合的数据可视化方法对于有效地呈现数据至关重要。不同的数据和目标需要不同的可视化方法,下面我们将探讨如何选择最佳的数据可视化方法来呈现数据。 1. 理解数据类型&a…...
数据结构(Java实现)-java对象的比较
元素的比较 基本类型的比较 在Java中,基本类型的对象可以直接比较大小。 对象比较的问题 Java中引用类型的变量不能直接按照 > 或者 < 方式进行比较 默认情况下调用的就是equal方法,但是该方法的比较规则是:没有比较引用变量引用对象的…...
Wolfram Mathematica 13 for Mac 数学计算工具
Wolfram Mathematica for Mac是一款功能强大、划时代的科学计算软件。它结合了数字和符号计算引擎、图形系统、编程语言、文本系统以及与其他应用程序的高级连接,在许多功能方面处于世界领先地位,截至2009年,它是使用最广泛的数学软件之一。人…...
系统架构设计高级技能 · Web架构
现在的一切都是为将来的梦想编织翅膀,让梦想在现实中展翅高飞。 Now everything is for the future of dream weaving wings, let the dream fly in reality. 点击进入系列文章目录 系统架构设计高级技能 Web架构 一、Web架构介绍1.1 Web架构涉及技术1.2 单台服务…...
再写CentOS7升级OpenSSL-1.0.1U
本文在CentOS7.4以及TencentOS 2.4上测试通过。 原系统自带OpenSSL 1.0.2k-fips。 编译安装方法跟之前的没啥区别。 从官网下载1.0.1u版https://www.openssl.org/source/ 使用tar解包 tar xfz openssl-1.0.1u.tar.gz 依次执行如下: cd openssl-1.0.1u ./con…...
HBase--技术文档--基本概念--《快速扫盲》
官网 Apache HBase – Apache HBase™ Home 阿里云hbase 云数据库HBase_大数据存储_订单风控_数据库-阿里云 云数据库 HBase-阿里云帮助中心 基本概念 HBase是一种分布式、可扩展、支持海量数据存储的NoSQL数据库。它基于Hadoop,采用列式存储方式,可…...
如何利用SFTP协议远程实现更安全的文件传输 ——【内网穿透】
🎬 鸽芷咕:个人主页 🔥 个人专栏: 《高效编程技巧》《cpolar》 ⛺️生活的理想,就是为了理想的生活! 文章目录 1. 安装openSSH1.1 安装SSH1.2 启动ssh 2. 安装cpolar2.1 配置termux服务 3. 远程SFTP连接配置3.1 查看生成的随机公…...
深度学习8:详解生成对抗网络原理
目录 大纲 生成随机变量 可以伪随机生成均匀随机变量 随机变量表示为操作或过程的结果 逆变换方法 生成模型 我们试图生成非常复杂的随机变量…… …所以让我们使用神经网络的变换方法作为函数! 生成匹配网络 培养生成模型 比较基于样本的两个概率分布 …...
sql入门-多表查询
案例涉及表 ----------------------------------建表语句之前翻看之前博客文章 多表查询 -- 学生表 create table studen ( id int primary key auto_increment comment id, name varchar(50) comment 姓名, no varchar(10) comment 学号 ) comment 学生表; insert…...
大模型压缩实战:量化、剪枝与蒸馏技术解析与AngelSlim应用
1. 项目概述:从“大”到“小”的模型压缩革命最近在模型部署和推理优化的圈子里,Tencent/AngelSlim 这个项目被讨论得挺多。简单来说,它不是一个全新的模型,而是一套由腾讯开源的、专门用于大语言模型(LLM)…...
告别TwinCAT:手把手教你用LinuxCNC+IGH搭建开源EtherCAT运动控制平台
告别商业软件束缚:LinuxCNCIGH开源运动控制平台实战指南 在工业自动化和运动控制领域,商业软件长期占据主导地位,但高昂的授权费用和封闭的生态系统让许多工程师和创客望而却步。开源运动控制平台的出现打破了这一局面,为追求灵活…...
收藏!小白程序员必备:2026年AI大模型就业新机遇与学习路线指南
根据世界经济论坛报告,到2030年科技、数据、AI等领域将创造1.7亿工作机会,同时淘汰9200万个岗位。AI市场规模预计到2034年达36804.7亿美元,年复合增长率19.20%。中国AI人才需求将远超供应。文章介绍了AI运营/AIGC内容创作者、算法工程师、大模…...
乔布斯产品哲学对硬件工程师的启示:从参数到体验的转变
1. 项目概述:一次对乔布斯遗产的技术性致敬2011年10月6日,当史蒂夫乔布斯逝世的消息传来,整个科技界陷入了一种复杂的情绪。作为一名长期在电子工程与消费电子领域工作的人,我的感受尤为深刻。那天,我和我的同事们&…...
全网没人敢说,关于中小企业AI营销一体机到底是卖硬件还是卖落地闭环的屎盆子,我先扣为敬。
[实话] 干这行十年,我拍着桌子定过一条死规矩。三个不做:不做只卖盒子不管结果的,不做签完合同就消失的,不做让你自己研究三个月才能用的。[实话] 现在的“AI营销一体机”,90%都是在收智商税。我见过太多老板ÿ…...
Fawkes踩过的坑以及如何解决非常详细
首先我是用anaconda创建的一个虚拟环境 fawkes_env后续的所有操作都是在该环境中实现 不使用anaconda 可直接看第一步 坑:直接用 conda create -n fawkes python3.9 后,pip install -e . 可能因为 TensorFlow 版本过新导致不兼容(Keras 2.3…...
AI智能体审批系统设计:从规则到价值网络的动态决策引擎
1. 项目概述:为什么AI需要“举手提问”?在AI智能体(Agent)日益深入业务流程自动化的今天,一个核心的、却常被忽视的问题浮出水面:这个拥有一定自主决策能力的“数字员工”,在什么情况下应该停下…...
如何解决QQ音乐下载的歌曲在其他设备上无法播放的问题
如何解决QQ音乐下载的歌曲在其他设备上无法播放的问题 【免费下载链接】qmcflac2mp3 直接将qmcflac文件转换成mp3文件,突破QQ音乐的格式限制 项目地址: https://gitcode.com/gh_mirrors/qm/qmcflac2mp3 你是否曾经在QQ音乐下载了喜欢的歌曲,却发现…...
基于双链笔记构建个人消费知识系统:从记录到生活策展
1. 项目概述与核心价值看到“SimonsTang/xiaofei-liberal-arts”这个项目标题,我的第一反应是,这应该是一个关于“消费”与“文科”交叉领域的知识库或工具集。作为一名长期关注效率工具和知识管理的从业者,我深知在信息爆炸的时代࿰…...
图神经网络与图Transformer在计算机视觉中的原理、应用与实战
1. 引言:当视觉任务遇上“关系”思维在计算机视觉领域,我们早已习惯了卷积神经网络(CNN)的统治地位。从ImageNet的图像分类,到Mask R-CNN的实例分割,CNN凭借其强大的局部特征提取能力,在像素网格…...
