当前位置: 首页 > news >正文

大数据-玩转数据-Flink窗口函数

一、Flink窗口函数

前面指定了窗口的分配器, 接着我们需要来指定如何计算, 这事由window function来负责. 一旦窗口关闭, window function 去计算处理窗口中的每个元素.
window function 可以是ReduceFunction,AggregateFunction,or ProcessWindowFunction中的任意一种.
ReduceFunction,AggregateFunction更加高效, 原因就是Flink可以对到来的元素进行增量聚合 . ProcessWindowFunction 可以得到一个包含这个窗口中所有元素的迭代器, 以及这些元素所属窗口的一些元数据信息.
ProcessWindowFunction不能被高效执行的原因是Flink在执行这个函数之前, 需要在内部缓存这个窗口上所有的元素。
除了一些简单聚合,比如 sum,max,min,maxBay,minBay ,有以下窗口聚合函数。

二、ReduceFunction(增量聚合函数)

输入和输出必须一致

package com.lyh.flink07;import com.lyh.bean.WaterSensor;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;public class Window_s_function {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.socketTextStream("hadoop100",9999).map(line -> {String[] data = line.split(",");return new WaterSensor(data[0],Long.valueOf(data[1]),Integer.valueOf(data[2]));}).keyBy(WaterSensor::getId).window(TumblingProcessingTimeWindows.of(Time.seconds(5))).reduce(new ReduceFunction<WaterSensor>() {@Overridepublic WaterSensor reduce(WaterSensor value1,WaterSensor value2) throws Exception {System.out.println("Window_s_function.reduce");value1.setVc ( value1.getVc() + value2.getVc());return (value1);}}).print();env.execute();}
}

运行结果
在这里插入图片描述
在这里插入图片描述

三、AggregateFunction(增量聚合函数)

输入和输出可以不一致

package com.lyh.flink07;import com.lyh.bean.WaterSensor;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.kafka.common.metrics.stats.Avg;import java.util.List;public class Window_s_function_2 {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.socketTextStream("hadoop100",9999).map(line -> {String[] data = line.split(",");return new WaterSensor(data[0],Long.valueOf(data[1]),Integer.valueOf(data[2]));}).keyBy(WaterSensor::getId).window(TumblingProcessingTimeWindows.of(Time.seconds(5))).aggregate(new AggregateFunction<WaterSensor, Avg, Double>() {@Overridepublic Avg createAccumulator() {return new Avg();}@Overridepublic Avg add(WaterSensor value, Avg acc) {acc.sum += value.getVc();acc.couunt++;return acc;}@Overridepublic Double getResult(Avg acc) {return acc.sum * 1.0 / acc.couunt;}@Overridepublic Avg merge(Avg avg, Avg acc1) {return null;}},new ProcessWindowFunction<Double, String, String, TimeWindow>() {@Overridepublic void process(String key,Context ctx,Iterable<Double> elements,Collector<String> out) throws Exception {Double result = elements.iterator().next();long starttime = ctx.window().getStart();long endtime = ctx.window().getEnd();out.collect("窗口:" + starttime + "  " + endtime +  " key: " + key + " result: " + result);}}).print();env.execute();}public static class Avg {public Integer sum = 0;public Long couunt = 0L;};
}

运行结果
在这里插入图片描述
在这里插入图片描述

四、ProcessWindowFunction(全窗口函数)

上面例子里已经用到

new ProcessWindowFunction<Double, String, String, TimeWindow>() {@Overridepublic void process(String key,Context ctx,Iterable<Double> elements,Collector<String> out) throws Exception {Double result = elements.iterator().next();long starttime = ctx.window().getStart();long endtime = ctx.window().getEnd();out.collect("窗口:" + starttime + "  " + endtime +  " key: " + key + " result: " + result);}}

相关文章:

大数据-玩转数据-Flink窗口函数

一、Flink窗口函数 前面指定了窗口的分配器, 接着我们需要来指定如何计算, 这事由window function来负责. 一旦窗口关闭, window function 去计算处理窗口中的每个元素. window function 可以是ReduceFunction,AggregateFunction,or ProcessWindowFunction中的任意一种. Reduc…...

Docker网络-探索容器网络如何相互通信

当今世界&#xff0c;企业热衷于容器化&#xff0c;这需要强大的网络技能来正确配置容器架构&#xff0c;因此引入了 Docker Networking 的概念。Docker 是一种容器化平台&#xff0c;允许您在独立、轻量级的容器中运行应用程序和服务。Docker 提供了一套强大的网络功能&#x…...

ESP32-CAM模块Arduino环境搭建测试

ESP32-CAM模块Arduino环境搭建测试 一.ESP32OV2640摄像头模块CameraWebServer视频查看 二.测试ESP32-CAM(后续称cam模块)代码是否上传执行成功测试 const int led0 12; const int led1 13;void setup() {// put your setup code here, to run once:pinMode(led0, OUTPUT);pin…...

webassembly001 webassembly简述

WebAssembly 官方地址:https://webassembly.org/相关历史 https://en.wikipedia.org/wiki/WebAssembly https://brendaneich.com/2015/06/from-asm-js-to-webassembly/WebAssembly&#xff08;缩写为Wasm&#xff09;是一种基于堆栈的虚拟机的二进制指令格式。Wasm 被设计为编…...

vue 使用C-Lodop打印小票

先从官网下载js文件 https://www.lodop.net/LodopDemo.html 打开安装程序&#xff0c;一直下一步既可&#xff0c;我这边已经安装过就不演示了。 // 引入 import { getLodop } from /utils/CLodopfuncs.js;// 使用 let LODOP getLodop()let Count LODOP.GET_PRINTER_COUNT…...

【C++进阶(二)】STL大法--vector的深度剖析以及模拟实现

&#x1f493;博主CSDN主页:杭电码农-NEO&#x1f493;   ⏩专栏分类:C从入门到精通⏪   &#x1f69a;代码仓库:NEO的学习日记&#x1f69a;   &#x1f339;关注我&#x1faf5;带你学习C   &#x1f51d;&#x1f51d; vector 1. 前言2. 熟悉vector的接口函数2.1 vec…...

1. import pandas as pd 导入库

【目录】 文章目录 1. import pandas as pd 导入库1. pandas库的概念2. 导入pandas库2.1 常规导入2.2 别名导入 3. 别名的作用4. 课堂练习 【正文】 1. import pandas as pd 导入库 【学习时间】 10分钟 1. pandas库的概念 pandas&#xff1a;熊猫panda的复数&#xff0c; …...

DMK5框选变量之后不显示其他位置的此变量高亮

使用软件MDK5.3.8版本 如下在2的位置选择之后&#xff0c;其他同样的变量没有高亮&#xff0c;因为1的原因折叠了&#xff1b; 展开折叠之后就可以了...

0061__Appium

Appium Documentation - Appium Documentation APP自动化测试&#xff08;3&#xff09;-Appium Inspector介绍_六天测试工程师的博客-CSDN博客 https://github.com/appium/appium-inspector https://github.com/appium/appium-desktop https://github.com/appium/appium...

【DEVOPS】需求跟踪管理全面落地

0. 目录 1. 现状/背景2. 需求管理存在的问题3. 改进思路/措施4. 所谓"禅道尚未普及/铺开"5. 最后6. 相关 1. 现状/背景 近期又被领导问到"如何对项目过程中的需求进行量化和跟踪管理"。这真是一个狗皮膏药似的问题&#xff0c;反反复复地&#xff0c;隔一…...

算法修炼Day57|647. 回文子串 ● 516.最长回文子序列

LeetCode:647. 回文子串 647. 回文子串 - 力扣&#xff08;LeetCode&#xff09; 1.思路 暴力思路见对应代码… 动规解法&#xff1a;画图推导动规公式&#xff0c;当前状态由左侧和左下角推出&#xff0c;所以首层应该采用倒序的方式&#xff0c;内部采用正序的方式。 2.…...

呈现数据的精妙之道:选择合适的可视化方法

在当今数据时代&#xff0c;数据可视化已成为理解和传达信息的重要手段。然而&#xff0c;选择适合的数据可视化方法对于有效地呈现数据至关重要。不同的数据和目标需要不同的可视化方法&#xff0c;下面我们将探讨如何选择最佳的数据可视化方法来呈现数据。 1. 理解数据类型&a…...

数据结构(Java实现)-java对象的比较

元素的比较 基本类型的比较 在Java中&#xff0c;基本类型的对象可以直接比较大小。 对象比较的问题 Java中引用类型的变量不能直接按照 > 或者 < 方式进行比较 默认情况下调用的就是equal方法&#xff0c;但是该方法的比较规则是&#xff1a;没有比较引用变量引用对象的…...

Wolfram Mathematica 13 for Mac 数学计算工具

Wolfram Mathematica for Mac是一款功能强大、划时代的科学计算软件。它结合了数字和符号计算引擎、图形系统、编程语言、文本系统以及与其他应用程序的高级连接&#xff0c;在许多功能方面处于世界领先地位&#xff0c;截至2009年&#xff0c;它是使用最广泛的数学软件之一。人…...

系统架构设计高级技能 · Web架构

现在的一切都是为将来的梦想编织翅膀&#xff0c;让梦想在现实中展翅高飞。 Now everything is for the future of dream weaving wings, let the dream fly in reality. 点击进入系列文章目录 系统架构设计高级技能 Web架构 一、Web架构介绍1.1 Web架构涉及技术1.2 单台服务…...

再写CentOS7升级OpenSSL-1.0.1U

本文在CentOS7.4以及TencentOS 2.4上测试通过。 原系统自带OpenSSL 1.0.2k-fips。 编译安装方法跟之前的没啥区别。 从官网下载1.0.1u版https://www.openssl.org/source/ 使用tar解包 tar xfz openssl-1.0.1u.tar.gz 依次执行如下&#xff1a; cd openssl-1.0.1u ./con…...

HBase--技术文档--基本概念--《快速扫盲》

官网 Apache HBase – Apache HBase™ Home 阿里云hbase 云数据库HBase_大数据存储_订单风控_数据库-阿里云 云数据库 HBase-阿里云帮助中心 基本概念 HBase是一种分布式、可扩展、支持海量数据存储的NoSQL数据库。它基于Hadoop&#xff0c;采用列式存储方式&#xff0c;可…...

如何利用SFTP协议远程实现更安全的文件传输 ——【内网穿透】

&#x1f3ac; 鸽芷咕&#xff1a;个人主页 &#x1f525; 个人专栏: 《高效编程技巧》《cpolar》 ⛺️生活的理想&#xff0c;就是为了理想的生活! 文章目录 1. 安装openSSH1.1 安装SSH1.2 启动ssh 2. 安装cpolar2.1 配置termux服务 3. 远程SFTP连接配置3.1 查看生成的随机公…...

深度学习8:详解生成对抗网络原理

目录 大纲 生成随机变量 可以伪随机生成均匀随机变量 随机变量表示为操作或过程的结果 逆变换方法 生成模型 我们试图生成非常复杂的随机变量…… …所以让我们使用神经网络的变换方法作为函数&#xff01; 生成匹配网络 培养生成模型 比较基于样本的两个概率分布 …...

sql入门-多表查询

案例涉及表 ----------------------------------建表语句之前翻看之前博客文章 多表查询 -- 学生表 create table studen ( id int primary key auto_increment comment id, name varchar(50) comment 姓名, no varchar(10) comment 学号 ) comment 学生表; insert…...

C++实现分布式网络通信框架RPC(3)--rpc调用端

目录 一、前言 二、UserServiceRpc_Stub 三、 CallMethod方法的重写 头文件 实现 四、rpc调用端的调用 实现 五、 google::protobuf::RpcController *controller 头文件 实现 六、总结 一、前言 在前边的文章中&#xff0c;我们已经大致实现了rpc服务端的各项功能代…...

在HarmonyOS ArkTS ArkUI-X 5.0及以上版本中,手势开发全攻略:

在 HarmonyOS 应用开发中&#xff0c;手势交互是连接用户与设备的核心纽带。ArkTS 框架提供了丰富的手势处理能力&#xff0c;既支持点击、长按、拖拽等基础单一手势的精细控制&#xff0c;也能通过多种绑定策略解决父子组件的手势竞争问题。本文将结合官方开发文档&#xff0c…...

基于Uniapp开发HarmonyOS 5.0旅游应用技术实践

一、技术选型背景 1.跨平台优势 Uniapp采用Vue.js框架&#xff0c;支持"一次开发&#xff0c;多端部署"&#xff0c;可同步生成HarmonyOS、iOS、Android等多平台应用。 2.鸿蒙特性融合 HarmonyOS 5.0的分布式能力与原子化服务&#xff0c;为旅游应用带来&#xf…...

MySQL账号权限管理指南:安全创建账户与精细授权技巧

在MySQL数据库管理中&#xff0c;合理创建用户账号并分配精确权限是保障数据安全的核心环节。直接使用root账号进行所有操作不仅危险且难以审计操作行为。今天我们来全面解析MySQL账号创建与权限分配的专业方法。 一、为何需要创建独立账号&#xff1f; 最小权限原则&#xf…...

VM虚拟机网络配置(ubuntu24桥接模式):配置静态IP

编辑-虚拟网络编辑器-更改设置 选择桥接模式&#xff0c;然后找到相应的网卡&#xff08;可以查看自己本机的网络连接&#xff09; windows连接的网络点击查看属性 编辑虚拟机设置更改网络配置&#xff0c;选择刚才配置的桥接模式 静态ip设置&#xff1a; 我用的ubuntu24桌…...

HTML前端开发:JavaScript 获取元素方法详解

作为前端开发者&#xff0c;高效获取 DOM 元素是必备技能。以下是 JS 中核心的获取元素方法&#xff0c;分为两大系列&#xff1a; 一、getElementBy... 系列 传统方法&#xff0c;直接通过 DOM 接口访问&#xff0c;返回动态集合&#xff08;元素变化会实时更新&#xff09;。…...

DeepSeek源码深度解析 × 华为仓颉语言编程精粹——从MoE架构到全场景开发生态

前言 在人工智能技术飞速发展的今天&#xff0c;深度学习与大模型技术已成为推动行业变革的核心驱动力&#xff0c;而高效、灵活的开发工具与编程语言则为技术创新提供了重要支撑。本书以两大前沿技术领域为核心&#xff0c;系统性地呈现了两部深度技术著作的精华&#xff1a;…...

【堆垛策略】设计方法

堆垛策略的设计是积木堆叠系统的核心&#xff0c;直接影响堆叠的稳定性、效率和容错能力。以下是分层次的堆垛策略设计方法&#xff0c;涵盖基础规则、优化算法和容错机制&#xff1a; 1. 基础堆垛规则 (1) 物理稳定性优先 重心原则&#xff1a; 大尺寸/重量积木在下&#xf…...

macOS 终端智能代理检测

&#x1f9e0; 终端智能代理检测&#xff1a;自动判断是否需要设置代理访问 GitHub 在开发中&#xff0c;使用 GitHub 是非常常见的需求。但有时候我们会发现某些命令失败、插件无法更新&#xff0c;例如&#xff1a; fatal: unable to access https://github.com/ohmyzsh/oh…...

PostgreSQL 对 IPv6 的支持情况

PostgreSQL 对 IPv6 的支持情况 PostgreSQL 全面支持 IPv6 网络协议&#xff0c;包括连接、存储和操作 IPv6 地址。以下是详细说明&#xff1a; 一、网络连接支持 1. 监听 IPv6 连接 在 postgresql.conf 中配置&#xff1a; listen_addresses 0.0.0.0,:: # 监听所有IPv4…...