当前位置: 首页 > news >正文

大数据-玩转数据-Flink窗口函数

一、Flink窗口函数

前面指定了窗口的分配器, 接着我们需要来指定如何计算, 这事由window function来负责. 一旦窗口关闭, window function 去计算处理窗口中的每个元素.
window function 可以是ReduceFunction,AggregateFunction,or ProcessWindowFunction中的任意一种.
ReduceFunction,AggregateFunction更加高效, 原因就是Flink可以对到来的元素进行增量聚合 . ProcessWindowFunction 可以得到一个包含这个窗口中所有元素的迭代器, 以及这些元素所属窗口的一些元数据信息.
ProcessWindowFunction不能被高效执行的原因是Flink在执行这个函数之前, 需要在内部缓存这个窗口上所有的元素。
除了一些简单聚合,比如 sum,max,min,maxBay,minBay ,有以下窗口聚合函数。

二、ReduceFunction(增量聚合函数)

输入和输出必须一致

package com.lyh.flink07;import com.lyh.bean.WaterSensor;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;public class Window_s_function {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.socketTextStream("hadoop100",9999).map(line -> {String[] data = line.split(",");return new WaterSensor(data[0],Long.valueOf(data[1]),Integer.valueOf(data[2]));}).keyBy(WaterSensor::getId).window(TumblingProcessingTimeWindows.of(Time.seconds(5))).reduce(new ReduceFunction<WaterSensor>() {@Overridepublic WaterSensor reduce(WaterSensor value1,WaterSensor value2) throws Exception {System.out.println("Window_s_function.reduce");value1.setVc ( value1.getVc() + value2.getVc());return (value1);}}).print();env.execute();}
}

运行结果
在这里插入图片描述
在这里插入图片描述

三、AggregateFunction(增量聚合函数)

输入和输出可以不一致

package com.lyh.flink07;import com.lyh.bean.WaterSensor;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.kafka.common.metrics.stats.Avg;import java.util.List;public class Window_s_function_2 {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.socketTextStream("hadoop100",9999).map(line -> {String[] data = line.split(",");return new WaterSensor(data[0],Long.valueOf(data[1]),Integer.valueOf(data[2]));}).keyBy(WaterSensor::getId).window(TumblingProcessingTimeWindows.of(Time.seconds(5))).aggregate(new AggregateFunction<WaterSensor, Avg, Double>() {@Overridepublic Avg createAccumulator() {return new Avg();}@Overridepublic Avg add(WaterSensor value, Avg acc) {acc.sum += value.getVc();acc.couunt++;return acc;}@Overridepublic Double getResult(Avg acc) {return acc.sum * 1.0 / acc.couunt;}@Overridepublic Avg merge(Avg avg, Avg acc1) {return null;}},new ProcessWindowFunction<Double, String, String, TimeWindow>() {@Overridepublic void process(String key,Context ctx,Iterable<Double> elements,Collector<String> out) throws Exception {Double result = elements.iterator().next();long starttime = ctx.window().getStart();long endtime = ctx.window().getEnd();out.collect("窗口:" + starttime + "  " + endtime +  " key: " + key + " result: " + result);}}).print();env.execute();}public static class Avg {public Integer sum = 0;public Long couunt = 0L;};
}

运行结果
在这里插入图片描述
在这里插入图片描述

四、ProcessWindowFunction(全窗口函数)

上面例子里已经用到

new ProcessWindowFunction<Double, String, String, TimeWindow>() {@Overridepublic void process(String key,Context ctx,Iterable<Double> elements,Collector<String> out) throws Exception {Double result = elements.iterator().next();long starttime = ctx.window().getStart();long endtime = ctx.window().getEnd();out.collect("窗口:" + starttime + "  " + endtime +  " key: " + key + " result: " + result);}}

相关文章:

大数据-玩转数据-Flink窗口函数

一、Flink窗口函数 前面指定了窗口的分配器, 接着我们需要来指定如何计算, 这事由window function来负责. 一旦窗口关闭, window function 去计算处理窗口中的每个元素. window function 可以是ReduceFunction,AggregateFunction,or ProcessWindowFunction中的任意一种. Reduc…...

Docker网络-探索容器网络如何相互通信

当今世界&#xff0c;企业热衷于容器化&#xff0c;这需要强大的网络技能来正确配置容器架构&#xff0c;因此引入了 Docker Networking 的概念。Docker 是一种容器化平台&#xff0c;允许您在独立、轻量级的容器中运行应用程序和服务。Docker 提供了一套强大的网络功能&#x…...

ESP32-CAM模块Arduino环境搭建测试

ESP32-CAM模块Arduino环境搭建测试 一.ESP32OV2640摄像头模块CameraWebServer视频查看 二.测试ESP32-CAM(后续称cam模块)代码是否上传执行成功测试 const int led0 12; const int led1 13;void setup() {// put your setup code here, to run once:pinMode(led0, OUTPUT);pin…...

webassembly001 webassembly简述

WebAssembly 官方地址:https://webassembly.org/相关历史 https://en.wikipedia.org/wiki/WebAssembly https://brendaneich.com/2015/06/from-asm-js-to-webassembly/WebAssembly&#xff08;缩写为Wasm&#xff09;是一种基于堆栈的虚拟机的二进制指令格式。Wasm 被设计为编…...

vue 使用C-Lodop打印小票

先从官网下载js文件 https://www.lodop.net/LodopDemo.html 打开安装程序&#xff0c;一直下一步既可&#xff0c;我这边已经安装过就不演示了。 // 引入 import { getLodop } from /utils/CLodopfuncs.js;// 使用 let LODOP getLodop()let Count LODOP.GET_PRINTER_COUNT…...

【C++进阶(二)】STL大法--vector的深度剖析以及模拟实现

&#x1f493;博主CSDN主页:杭电码农-NEO&#x1f493;   ⏩专栏分类:C从入门到精通⏪   &#x1f69a;代码仓库:NEO的学习日记&#x1f69a;   &#x1f339;关注我&#x1faf5;带你学习C   &#x1f51d;&#x1f51d; vector 1. 前言2. 熟悉vector的接口函数2.1 vec…...

1. import pandas as pd 导入库

【目录】 文章目录 1. import pandas as pd 导入库1. pandas库的概念2. 导入pandas库2.1 常规导入2.2 别名导入 3. 别名的作用4. 课堂练习 【正文】 1. import pandas as pd 导入库 【学习时间】 10分钟 1. pandas库的概念 pandas&#xff1a;熊猫panda的复数&#xff0c; …...

DMK5框选变量之后不显示其他位置的此变量高亮

使用软件MDK5.3.8版本 如下在2的位置选择之后&#xff0c;其他同样的变量没有高亮&#xff0c;因为1的原因折叠了&#xff1b; 展开折叠之后就可以了...

0061__Appium

Appium Documentation - Appium Documentation APP自动化测试&#xff08;3&#xff09;-Appium Inspector介绍_六天测试工程师的博客-CSDN博客 https://github.com/appium/appium-inspector https://github.com/appium/appium-desktop https://github.com/appium/appium...

【DEVOPS】需求跟踪管理全面落地

0. 目录 1. 现状/背景2. 需求管理存在的问题3. 改进思路/措施4. 所谓"禅道尚未普及/铺开"5. 最后6. 相关 1. 现状/背景 近期又被领导问到"如何对项目过程中的需求进行量化和跟踪管理"。这真是一个狗皮膏药似的问题&#xff0c;反反复复地&#xff0c;隔一…...

算法修炼Day57|647. 回文子串 ● 516.最长回文子序列

LeetCode:647. 回文子串 647. 回文子串 - 力扣&#xff08;LeetCode&#xff09; 1.思路 暴力思路见对应代码… 动规解法&#xff1a;画图推导动规公式&#xff0c;当前状态由左侧和左下角推出&#xff0c;所以首层应该采用倒序的方式&#xff0c;内部采用正序的方式。 2.…...

呈现数据的精妙之道:选择合适的可视化方法

在当今数据时代&#xff0c;数据可视化已成为理解和传达信息的重要手段。然而&#xff0c;选择适合的数据可视化方法对于有效地呈现数据至关重要。不同的数据和目标需要不同的可视化方法&#xff0c;下面我们将探讨如何选择最佳的数据可视化方法来呈现数据。 1. 理解数据类型&a…...

数据结构(Java实现)-java对象的比较

元素的比较 基本类型的比较 在Java中&#xff0c;基本类型的对象可以直接比较大小。 对象比较的问题 Java中引用类型的变量不能直接按照 > 或者 < 方式进行比较 默认情况下调用的就是equal方法&#xff0c;但是该方法的比较规则是&#xff1a;没有比较引用变量引用对象的…...

Wolfram Mathematica 13 for Mac 数学计算工具

Wolfram Mathematica for Mac是一款功能强大、划时代的科学计算软件。它结合了数字和符号计算引擎、图形系统、编程语言、文本系统以及与其他应用程序的高级连接&#xff0c;在许多功能方面处于世界领先地位&#xff0c;截至2009年&#xff0c;它是使用最广泛的数学软件之一。人…...

系统架构设计高级技能 · Web架构

现在的一切都是为将来的梦想编织翅膀&#xff0c;让梦想在现实中展翅高飞。 Now everything is for the future of dream weaving wings, let the dream fly in reality. 点击进入系列文章目录 系统架构设计高级技能 Web架构 一、Web架构介绍1.1 Web架构涉及技术1.2 单台服务…...

再写CentOS7升级OpenSSL-1.0.1U

本文在CentOS7.4以及TencentOS 2.4上测试通过。 原系统自带OpenSSL 1.0.2k-fips。 编译安装方法跟之前的没啥区别。 从官网下载1.0.1u版https://www.openssl.org/source/ 使用tar解包 tar xfz openssl-1.0.1u.tar.gz 依次执行如下&#xff1a; cd openssl-1.0.1u ./con…...

HBase--技术文档--基本概念--《快速扫盲》

官网 Apache HBase – Apache HBase™ Home 阿里云hbase 云数据库HBase_大数据存储_订单风控_数据库-阿里云 云数据库 HBase-阿里云帮助中心 基本概念 HBase是一种分布式、可扩展、支持海量数据存储的NoSQL数据库。它基于Hadoop&#xff0c;采用列式存储方式&#xff0c;可…...

如何利用SFTP协议远程实现更安全的文件传输 ——【内网穿透】

&#x1f3ac; 鸽芷咕&#xff1a;个人主页 &#x1f525; 个人专栏: 《高效编程技巧》《cpolar》 ⛺️生活的理想&#xff0c;就是为了理想的生活! 文章目录 1. 安装openSSH1.1 安装SSH1.2 启动ssh 2. 安装cpolar2.1 配置termux服务 3. 远程SFTP连接配置3.1 查看生成的随机公…...

深度学习8:详解生成对抗网络原理

目录 大纲 生成随机变量 可以伪随机生成均匀随机变量 随机变量表示为操作或过程的结果 逆变换方法 生成模型 我们试图生成非常复杂的随机变量…… …所以让我们使用神经网络的变换方法作为函数&#xff01; 生成匹配网络 培养生成模型 比较基于样本的两个概率分布 …...

sql入门-多表查询

案例涉及表 ----------------------------------建表语句之前翻看之前博客文章 多表查询 -- 学生表 create table studen ( id int primary key auto_increment comment id, name varchar(50) comment 姓名, no varchar(10) comment 学号 ) comment 学生表; insert…...

Lombok 的 @Data 注解失效,未生成 getter/setter 方法引发的HTTP 406 错误

HTTP 状态码 406 (Not Acceptable) 和 500 (Internal Server Error) 是两类完全不同的错误&#xff0c;它们的含义、原因和解决方法都有显著区别。以下是详细对比&#xff1a; 1. HTTP 406 (Not Acceptable) 含义&#xff1a; 客户端请求的内容类型与服务器支持的内容类型不匹…...

第 86 场周赛:矩阵中的幻方、钥匙和房间、将数组拆分成斐波那契序列、猜猜这个单词

Q1、[中等] 矩阵中的幻方 1、题目描述 3 x 3 的幻方是一个填充有 从 1 到 9 的不同数字的 3 x 3 矩阵&#xff0c;其中每行&#xff0c;每列以及两条对角线上的各数之和都相等。 给定一个由整数组成的row x col 的 grid&#xff0c;其中有多少个 3 3 的 “幻方” 子矩阵&am…...

基于IDIG-GAN的小样本电机轴承故障诊断

目录 🔍 核心问题 一、IDIG-GAN模型原理 1. 整体架构 2. 核心创新点 (1) ​梯度归一化(Gradient Normalization)​​ (2) ​判别器梯度间隙正则化(Discriminator Gradient Gap Regularization)​​ (3) ​自注意力机制(Self-Attention)​​ 3. 完整损失函数 二…...

push [特殊字符] present

push &#x1f19a; present 前言present和dismiss特点代码演示 push和pop特点代码演示 前言 在 iOS 开发中&#xff0c;push 和 present 是两种不同的视图控制器切换方式&#xff0c;它们有着显著的区别。 present和dismiss 特点 在当前控制器上方新建视图层级需要手动调用…...

CRMEB 中 PHP 短信扩展开发:涵盖一号通、阿里云、腾讯云、创蓝

目前已有一号通短信、阿里云短信、腾讯云短信扩展 扩展入口文件 文件目录 crmeb\services\sms\Sms.php 默认驱动类型为&#xff1a;一号通 namespace crmeb\services\sms;use crmeb\basic\BaseManager; use crmeb\services\AccessTokenServeService; use crmeb\services\sms\…...

HubSpot推出与ChatGPT的深度集成引发兴奋与担忧

上周三&#xff0c;HubSpot宣布已构建与ChatGPT的深度集成&#xff0c;这一消息在HubSpot用户和营销技术观察者中引发了极大的兴奋&#xff0c;但同时也存在一些关于数据安全的担忧。 许多网络声音声称&#xff0c;这对SaaS应用程序和人工智能而言是一场范式转变。 但向任何技…...

上位机开发过程中的设计模式体会(1):工厂方法模式、单例模式和生成器模式

简介 在我的 QT/C 开发工作中&#xff0c;合理运用设计模式极大地提高了代码的可维护性和可扩展性。本文将分享我在实际项目中应用的三种创造型模式&#xff1a;工厂方法模式、单例模式和生成器模式。 1. 工厂模式 (Factory Pattern) 应用场景 在我的 QT 项目中曾经有一个需…...

针对药品仓库的效期管理问题,如何利用WMS系统“破局”

案例&#xff1a; 某医药分销企业&#xff0c;主要经营各类药品的批发与零售。由于药品的特殊性&#xff0c;效期管理至关重要&#xff0c;但该企业一直面临效期问题的困扰。在未使用WMS系统之前&#xff0c;其药品入库、存储、出库等环节的效期管理主要依赖人工记录与检查。库…...

ffmpeg(三):处理原始数据命令

FFmpeg 可以直接处理原始音频和视频数据&#xff08;Raw PCM、YUV 等&#xff09;&#xff0c;常见场景包括&#xff1a; 将原始 YUV 图像编码为 H.264 视频将 PCM 音频编码为 AAC 或 MP3对原始音视频数据进行封装&#xff08;如封装为 MP4、TS&#xff09; 处理原始 YUV 视频…...

Qt 按钮类控件(Push Button 与 Radio Button)(1)

文章目录 Push Button前提概要API接口给按钮添加图标给按钮添加快捷键 Radio ButtonAPI接口性别选择 Push Button&#xff08;鼠标点击不放连续移动快捷键&#xff09; Radio Button Push Button 前提概要 1. 之前文章中所提到的各种跟QWidget有关的各种属性/函数/方法&#…...