当前位置: 首页 > news >正文

大数据-玩转数据-Flink窗口函数

一、Flink窗口函数

前面指定了窗口的分配器, 接着我们需要来指定如何计算, 这事由window function来负责. 一旦窗口关闭, window function 去计算处理窗口中的每个元素.
window function 可以是ReduceFunction,AggregateFunction,or ProcessWindowFunction中的任意一种.
ReduceFunction,AggregateFunction更加高效, 原因就是Flink可以对到来的元素进行增量聚合 . ProcessWindowFunction 可以得到一个包含这个窗口中所有元素的迭代器, 以及这些元素所属窗口的一些元数据信息.
ProcessWindowFunction不能被高效执行的原因是Flink在执行这个函数之前, 需要在内部缓存这个窗口上所有的元素。
除了一些简单聚合,比如 sum,max,min,maxBay,minBay ,有以下窗口聚合函数。

二、ReduceFunction(增量聚合函数)

输入和输出必须一致

package com.lyh.flink07;import com.lyh.bean.WaterSensor;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;public class Window_s_function {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.socketTextStream("hadoop100",9999).map(line -> {String[] data = line.split(",");return new WaterSensor(data[0],Long.valueOf(data[1]),Integer.valueOf(data[2]));}).keyBy(WaterSensor::getId).window(TumblingProcessingTimeWindows.of(Time.seconds(5))).reduce(new ReduceFunction<WaterSensor>() {@Overridepublic WaterSensor reduce(WaterSensor value1,WaterSensor value2) throws Exception {System.out.println("Window_s_function.reduce");value1.setVc ( value1.getVc() + value2.getVc());return (value1);}}).print();env.execute();}
}

运行结果
在这里插入图片描述
在这里插入图片描述

三、AggregateFunction(增量聚合函数)

输入和输出可以不一致

package com.lyh.flink07;import com.lyh.bean.WaterSensor;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.kafka.common.metrics.stats.Avg;import java.util.List;public class Window_s_function_2 {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.socketTextStream("hadoop100",9999).map(line -> {String[] data = line.split(",");return new WaterSensor(data[0],Long.valueOf(data[1]),Integer.valueOf(data[2]));}).keyBy(WaterSensor::getId).window(TumblingProcessingTimeWindows.of(Time.seconds(5))).aggregate(new AggregateFunction<WaterSensor, Avg, Double>() {@Overridepublic Avg createAccumulator() {return new Avg();}@Overridepublic Avg add(WaterSensor value, Avg acc) {acc.sum += value.getVc();acc.couunt++;return acc;}@Overridepublic Double getResult(Avg acc) {return acc.sum * 1.0 / acc.couunt;}@Overridepublic Avg merge(Avg avg, Avg acc1) {return null;}},new ProcessWindowFunction<Double, String, String, TimeWindow>() {@Overridepublic void process(String key,Context ctx,Iterable<Double> elements,Collector<String> out) throws Exception {Double result = elements.iterator().next();long starttime = ctx.window().getStart();long endtime = ctx.window().getEnd();out.collect("窗口:" + starttime + "  " + endtime +  " key: " + key + " result: " + result);}}).print();env.execute();}public static class Avg {public Integer sum = 0;public Long couunt = 0L;};
}

运行结果
在这里插入图片描述
在这里插入图片描述

四、ProcessWindowFunction(全窗口函数)

上面例子里已经用到

new ProcessWindowFunction<Double, String, String, TimeWindow>() {@Overridepublic void process(String key,Context ctx,Iterable<Double> elements,Collector<String> out) throws Exception {Double result = elements.iterator().next();long starttime = ctx.window().getStart();long endtime = ctx.window().getEnd();out.collect("窗口:" + starttime + "  " + endtime +  " key: " + key + " result: " + result);}}

相关文章:

大数据-玩转数据-Flink窗口函数

一、Flink窗口函数 前面指定了窗口的分配器, 接着我们需要来指定如何计算, 这事由window function来负责. 一旦窗口关闭, window function 去计算处理窗口中的每个元素. window function 可以是ReduceFunction,AggregateFunction,or ProcessWindowFunction中的任意一种. Reduc…...

Docker网络-探索容器网络如何相互通信

当今世界&#xff0c;企业热衷于容器化&#xff0c;这需要强大的网络技能来正确配置容器架构&#xff0c;因此引入了 Docker Networking 的概念。Docker 是一种容器化平台&#xff0c;允许您在独立、轻量级的容器中运行应用程序和服务。Docker 提供了一套强大的网络功能&#x…...

ESP32-CAM模块Arduino环境搭建测试

ESP32-CAM模块Arduino环境搭建测试 一.ESP32OV2640摄像头模块CameraWebServer视频查看 二.测试ESP32-CAM(后续称cam模块)代码是否上传执行成功测试 const int led0 12; const int led1 13;void setup() {// put your setup code here, to run once:pinMode(led0, OUTPUT);pin…...

webassembly001 webassembly简述

WebAssembly 官方地址:https://webassembly.org/相关历史 https://en.wikipedia.org/wiki/WebAssembly https://brendaneich.com/2015/06/from-asm-js-to-webassembly/WebAssembly&#xff08;缩写为Wasm&#xff09;是一种基于堆栈的虚拟机的二进制指令格式。Wasm 被设计为编…...

vue 使用C-Lodop打印小票

先从官网下载js文件 https://www.lodop.net/LodopDemo.html 打开安装程序&#xff0c;一直下一步既可&#xff0c;我这边已经安装过就不演示了。 // 引入 import { getLodop } from /utils/CLodopfuncs.js;// 使用 let LODOP getLodop()let Count LODOP.GET_PRINTER_COUNT…...

【C++进阶(二)】STL大法--vector的深度剖析以及模拟实现

&#x1f493;博主CSDN主页:杭电码农-NEO&#x1f493;   ⏩专栏分类:C从入门到精通⏪   &#x1f69a;代码仓库:NEO的学习日记&#x1f69a;   &#x1f339;关注我&#x1faf5;带你学习C   &#x1f51d;&#x1f51d; vector 1. 前言2. 熟悉vector的接口函数2.1 vec…...

1. import pandas as pd 导入库

【目录】 文章目录 1. import pandas as pd 导入库1. pandas库的概念2. 导入pandas库2.1 常规导入2.2 别名导入 3. 别名的作用4. 课堂练习 【正文】 1. import pandas as pd 导入库 【学习时间】 10分钟 1. pandas库的概念 pandas&#xff1a;熊猫panda的复数&#xff0c; …...

DMK5框选变量之后不显示其他位置的此变量高亮

使用软件MDK5.3.8版本 如下在2的位置选择之后&#xff0c;其他同样的变量没有高亮&#xff0c;因为1的原因折叠了&#xff1b; 展开折叠之后就可以了...

0061__Appium

Appium Documentation - Appium Documentation APP自动化测试&#xff08;3&#xff09;-Appium Inspector介绍_六天测试工程师的博客-CSDN博客 https://github.com/appium/appium-inspector https://github.com/appium/appium-desktop https://github.com/appium/appium...

【DEVOPS】需求跟踪管理全面落地

0. 目录 1. 现状/背景2. 需求管理存在的问题3. 改进思路/措施4. 所谓"禅道尚未普及/铺开"5. 最后6. 相关 1. 现状/背景 近期又被领导问到"如何对项目过程中的需求进行量化和跟踪管理"。这真是一个狗皮膏药似的问题&#xff0c;反反复复地&#xff0c;隔一…...

算法修炼Day57|647. 回文子串 ● 516.最长回文子序列

LeetCode:647. 回文子串 647. 回文子串 - 力扣&#xff08;LeetCode&#xff09; 1.思路 暴力思路见对应代码… 动规解法&#xff1a;画图推导动规公式&#xff0c;当前状态由左侧和左下角推出&#xff0c;所以首层应该采用倒序的方式&#xff0c;内部采用正序的方式。 2.…...

呈现数据的精妙之道:选择合适的可视化方法

在当今数据时代&#xff0c;数据可视化已成为理解和传达信息的重要手段。然而&#xff0c;选择适合的数据可视化方法对于有效地呈现数据至关重要。不同的数据和目标需要不同的可视化方法&#xff0c;下面我们将探讨如何选择最佳的数据可视化方法来呈现数据。 1. 理解数据类型&a…...

数据结构(Java实现)-java对象的比较

元素的比较 基本类型的比较 在Java中&#xff0c;基本类型的对象可以直接比较大小。 对象比较的问题 Java中引用类型的变量不能直接按照 > 或者 < 方式进行比较 默认情况下调用的就是equal方法&#xff0c;但是该方法的比较规则是&#xff1a;没有比较引用变量引用对象的…...

Wolfram Mathematica 13 for Mac 数学计算工具

Wolfram Mathematica for Mac是一款功能强大、划时代的科学计算软件。它结合了数字和符号计算引擎、图形系统、编程语言、文本系统以及与其他应用程序的高级连接&#xff0c;在许多功能方面处于世界领先地位&#xff0c;截至2009年&#xff0c;它是使用最广泛的数学软件之一。人…...

系统架构设计高级技能 · Web架构

现在的一切都是为将来的梦想编织翅膀&#xff0c;让梦想在现实中展翅高飞。 Now everything is for the future of dream weaving wings, let the dream fly in reality. 点击进入系列文章目录 系统架构设计高级技能 Web架构 一、Web架构介绍1.1 Web架构涉及技术1.2 单台服务…...

再写CentOS7升级OpenSSL-1.0.1U

本文在CentOS7.4以及TencentOS 2.4上测试通过。 原系统自带OpenSSL 1.0.2k-fips。 编译安装方法跟之前的没啥区别。 从官网下载1.0.1u版https://www.openssl.org/source/ 使用tar解包 tar xfz openssl-1.0.1u.tar.gz 依次执行如下&#xff1a; cd openssl-1.0.1u ./con…...

HBase--技术文档--基本概念--《快速扫盲》

官网 Apache HBase – Apache HBase™ Home 阿里云hbase 云数据库HBase_大数据存储_订单风控_数据库-阿里云 云数据库 HBase-阿里云帮助中心 基本概念 HBase是一种分布式、可扩展、支持海量数据存储的NoSQL数据库。它基于Hadoop&#xff0c;采用列式存储方式&#xff0c;可…...

如何利用SFTP协议远程实现更安全的文件传输 ——【内网穿透】

&#x1f3ac; 鸽芷咕&#xff1a;个人主页 &#x1f525; 个人专栏: 《高效编程技巧》《cpolar》 ⛺️生活的理想&#xff0c;就是为了理想的生活! 文章目录 1. 安装openSSH1.1 安装SSH1.2 启动ssh 2. 安装cpolar2.1 配置termux服务 3. 远程SFTP连接配置3.1 查看生成的随机公…...

深度学习8:详解生成对抗网络原理

目录 大纲 生成随机变量 可以伪随机生成均匀随机变量 随机变量表示为操作或过程的结果 逆变换方法 生成模型 我们试图生成非常复杂的随机变量…… …所以让我们使用神经网络的变换方法作为函数&#xff01; 生成匹配网络 培养生成模型 比较基于样本的两个概率分布 …...

sql入门-多表查询

案例涉及表 ----------------------------------建表语句之前翻看之前博客文章 多表查询 -- 学生表 create table studen ( id int primary key auto_increment comment id, name varchar(50) comment 姓名, no varchar(10) comment 学号 ) comment 学生表; insert…...

浏览器访问 AWS ECS 上部署的 Docker 容器(监听 80 端口)

✅ 一、ECS 服务配置 Dockerfile 确保监听 80 端口 EXPOSE 80 CMD ["nginx", "-g", "daemon off;"]或 EXPOSE 80 CMD ["python3", "-m", "http.server", "80"]任务定义&#xff08;Task Definition&…...

【Linux】shell脚本忽略错误继续执行

在 shell 脚本中&#xff0c;可以使用 set -e 命令来设置脚本在遇到错误时退出执行。如果你希望脚本忽略错误并继续执行&#xff0c;可以在脚本开头添加 set e 命令来取消该设置。 举例1 #!/bin/bash# 取消 set -e 的设置 set e# 执行命令&#xff0c;并忽略错误 rm somefile…...

ESP32 I2S音频总线学习笔记(四): INMP441采集音频并实时播放

简介 前面两期文章我们介绍了I2S的读取和写入&#xff0c;一个是通过INMP441麦克风模块采集音频&#xff0c;一个是通过PCM5102A模块播放音频&#xff0c;那如果我们将两者结合起来&#xff0c;将麦克风采集到的音频通过PCM5102A播放&#xff0c;是不是就可以做一个扩音器了呢…...

Cinnamon修改面板小工具图标

Cinnamon开始菜单-CSDN博客 设置模块都是做好的&#xff0c;比GNOME简单得多&#xff01; 在 applet.js 里增加 const Settings imports.ui.settings;this.settings new Settings.AppletSettings(this, HTYMenusonichy, instance_id); this.settings.bind(menu-icon, menu…...

【论文笔记】若干矿井粉尘检测算法概述

总的来说&#xff0c;传统机器学习、传统机器学习与深度学习的结合、LSTM等算法所需要的数据集来源于矿井传感器测量的粉尘浓度&#xff0c;通过建立回归模型来预测未来矿井的粉尘浓度。传统机器学习算法性能易受数据中极端值的影响。YOLO等计算机视觉算法所需要的数据集来源于…...

Cloudflare 从 Nginx 到 Pingora:性能、效率与安全的全面升级

在互联网的快速发展中&#xff0c;高性能、高效率和高安全性的网络服务成为了各大互联网基础设施提供商的核心追求。Cloudflare 作为全球领先的互联网安全和基础设施公司&#xff0c;近期做出了一个重大技术决策&#xff1a;弃用长期使用的 Nginx&#xff0c;转而采用其内部开发…...

反射获取方法和属性

Java反射获取方法 在Java中&#xff0c;反射&#xff08;Reflection&#xff09;是一种强大的机制&#xff0c;允许程序在运行时访问和操作类的内部属性和方法。通过反射&#xff0c;可以动态地创建对象、调用方法、改变属性值&#xff0c;这在很多Java框架中如Spring和Hiberna…...

蓝桥杯3498 01串的熵

问题描述 对于一个长度为 23333333的 01 串, 如果其信息熵为 11625907.5798&#xff0c; 且 0 出现次数比 1 少, 那么这个 01 串中 0 出现了多少次? #include<iostream> #include<cmath> using namespace std;int n 23333333;int main() {//枚举 0 出现的次数//因…...

代理篇12|深入理解 Vite中的Proxy接口代理配置

在前端开发中,常常会遇到 跨域请求接口 的情况。为了解决这个问题,Vite 和 Webpack 都提供了 proxy 代理功能,用于将本地开发请求转发到后端服务器。 什么是代理(proxy)? 代理是在开发过程中,前端项目通过开发服务器,将指定的请求“转发”到真实的后端服务器,从而绕…...

ip子接口配置及删除

配置永久生效的子接口&#xff0c;2个IP 都可以登录你这一台服务器。重启不失效。 永久的 [应用] vi /etc/sysconfig/network-scripts/ifcfg-eth0修改文件内内容 TYPE"Ethernet" BOOTPROTO"none" NAME"eth0" DEVICE"eth0" ONBOOT&q…...