当前位置: 首页 > news >正文

flink的窗口

目录

窗口分类

1.按照驱动类型分类

1. 时间窗口(Time window)

2.计数窗口(Count window)

2.按照窗口分配数据的规则分类

窗口API分类

API调用

窗口分配器器:

窗口函数

增量聚合函数:

全窗口函数

flink sql 窗口函数

窗口 | Apache Flink

窗口分类

1.按照驱动类型分类

1. 时间窗口(Time window)

    时间窗口以时间点定义窗口的开始和结束,因此截取出就是某一段时间的数据。当到达结束时间时窗口不在接受数据,触发计算输出结果,并关闭销毁窗口。

flink有一个专门的类用来表示时间窗口TimeWindow,这个类只有两个私有属性;窗口的方法获取最大时间戳为end-1,因此窗口[start,end)  左开右闭;

@PublicEvolving
public class TimeWindow extends Window {private final long start;private final long end;public TimeWindow(long start, long end) {this.start = start;this.end = end;}@Overridepublic long maxTimestamp() {return end - 1;}
2.计数窗口(Count window)

计数窗口是基于元素个数截取,在到达固定个数是就触发计算并关闭窗口。

3.全局窗口(Global Windows)

是计数窗口的底层实现,窗口分配器由GlobalWindows类提供,需要自定义触发器实现窗口的计算;

 stream.keyBy(data -> true).window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(2)))
//                .max().aggregate(new AvgPv()).print();查看源代码,windou函数后见windowStrream时获取默认的触发器
@PublicEvolvingpublic WindowedStream(KeyedStream<T, K> input, WindowAssigner<? super T, W> windowAssigner) {this.input = input;this.builder =new WindowOperatorBuilder<>(windowAssigner,windowAssigner.getDefaultTrigger(input.getExecutionEnvironment()),  //湖区触发器input.getExecutionConfig(),input.getType(),input.getKeySelector(),input.getKeyType());}// 计数窗口底层采用全局窗口加计数器来实现public WindowedStream<T, KEY, GlobalWindow> countWindow(long size) {return window(GlobalWindows.create()).trigger(PurgingTrigger.of(CountTrigger.of(size)));}public WindowedStream<T, KEY, GlobalWindow> countWindow(long size, long slide) {return window(GlobalWindows.create()).evictor(CountEvictor.of(size)).trigger(CountTrigger.of(slide));}

2.按照窗口分配数据的规则分类

滚动窗口(Tumbling Window):窗口大小固定,窗口没有重叠;

滑动窗口 (Sliding Window):滑动窗口有重叠,也可以没有重叠,如果窗口size和滑动size相等,等于滚动窗口;

会话窗口 (Session Window):基于会话对窗口进行分组,与其他两个不同的是,会话窗口是借用会话窗口的超时失效机触发窗口计算,当数据到来后会开启一个窗口,如果在超时时间内有数据陆续到来,窗口不会关闭,反之会关闭;极端情况,如果数据总能在窗口超时时间到达前远远不断的到来,该窗口会一直开启不会关闭;

全局窗口 (Global Window):比较通用的窗口,该窗口会把数据分配到一个窗口中,窗口为全局有效,会把相同key的数据分配到同一个窗口中,默认不会触发计算,跟没有窗口一样,需要自定义触发器才能使用;

窗口API分类

窗口大的分类可以分为按键分区和非按键分区两种:按键分需要经过keyby操作,会把数据进行分发,实现负载均分,可以并行处理更大的数据量。而非按键分区窗口,相当于并行度为1,使用上直接调用windowall(),因此一般并不推荐使用;

stream
.keyby(...)  //流按键分区
.window(...)  //定义窗口分配器
[.trigger()] //设置出发器
[.evictor()]   //设置移除器
[.allowedLateness()]  // 设置延迟时间
[.sideOutputLateData()]  //设置侧输出流
.reduce/aggregate/fold/apply()  //处理函数
[.getSideOutput()] //获取侧输出流stream
.windowAll(...)  //定义窗口分配器
[.trigger()] //设置出发器
[.evictor()]   //设置移除器
[.allowedLateness()]  // 设置延迟时间
[.sideOutputLateData()]  //设置侧输出流
.reduce/aggregate/fold/apply()  //处理函数
[.getSideOutput()] //获取侧输出流

API调用

窗口操作包含两个重要的概念:窗口分配器(window Assigners)和窗口函数(window function)两部分;

窗口分配器用于构建窗口,确定窗口类型,确定数据划分哪一个窗口,窗口函数制定数据的计算规则;

窗口分配器器:

作用:窗口分配器用来划分窗口属于哪一个窗口;

窗口按照时间可以划分为:滚动、滑动和session,三种类型窗口;

窗口计数划分:滚动和滑动两种类型;

  eventStream.keyBy(data -> data.url).window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))).aggregate();
窗口函数

窗口函数按照计算特点可以分为增量计算和全量计算;

增量聚合函数:数据到达后立即计算,窗口只保存中间结果。效率高,性能好,但不够灵活。

全量聚合函数:缓存窗口的所有元素,触发后统一计算,效率低,但计算灵活。

增量聚合函数:

数据进入窗口会参与计算,窗口结束前只需要保留一个聚合后的状态值,内存压力小。

1.规约函数(ReduceFunction):数据保存留一个状态,输入类型和输出类型必须一致,来一条数据会处理将数据合并到状态中;

 stream.keyBy(r -> r.f0)// 设置滚动事件时间窗口.window(TumblingEventTimeWindows.of(Time.seconds(5))).reduce(new ReduceFunction<Tuple2<String, Long>>() {@Overridepublic Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception {// 定义累加规则,窗口闭合时,向下游发送累加结果return Tuple2.of(value1.f0, value1.f1 + value2.f1);}}).print();

sum、max、min等底层都是通过同名AggregateFunction实现(非下面的聚合函数),本质还是实现ReduceFunction结构重写了reduce方法;

2.聚合函数(AggrateFunction):在规约函数基础上进行完善。解决输出和输入类型必须一致的限制问题。实现应用更灵活;

  // 所有数据设置相同的key,发送到同一个分区统计PV和UV,再相除stream.keyBy(data -> true).window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(2))).aggregate(new AvgPv()).print();public static class AvgPv implements AggregateFunction<Event, Tuple2<HashSet<String>, Long>, Double> {@Overridepublic Tuple2<HashSet<String>, Long> createAccumulator() {// 创建累加器return Tuple2.of(new HashSet<String>(), 0L);}@Overridepublic Tuple2<HashSet<String>, Long> add(Event value, Tuple2<HashSet<String>, Long> accumulator) {// 属于本窗口的数据来一条累加一次,并返回累加器accumulator.f0.add(value.user);return Tuple2.of(accumulator.f0, accumulator.f1 + 1L);}@Overridepublic Double getResult(Tuple2<HashSet<String>, Long> accumulator) {// 窗口闭合时,增量聚合结束,将计算结果发送到下游return (double) accumulator.f1 / accumulator.f0.size();}@Overridepublic Tuple2<HashSet<String>, Long> merge(Tuple2<HashSet<String>, Long> a, Tuple2<HashSet<String>, Long> b) {return null;}}
全窗口函数

全窗口函数会将进入窗口的数据先进行缓存,然后在窗口关闭时一起计算,缓存数据会占用内存资源,如果一个窗口数据量太大时,可能出现内存溢出的问题;

全窗口函数可以划分窗口函数(windowFunction)和处理窗口函数(processWindowFunction)两种;

窗口函数(windowFunction):老版本通用窗口接口,window()后调用apply(),传入实现windowFunction接口; 缺点是不能获取上下文信息,也没有更高级的功能。因为在功能上可以被processWindowFunction全覆盖,因此主键被弃用

DataStream<Tuple2<String, Long>> input = ...;input.keyBy(<key selector>).window(<window assigner>).apply(new MyWindowFunction());@Public
public interface WindowFunction<IN, OUT, KEY, W extends Window> extends Function, Serializable {/*** Evaluates the window and outputs none or several elements.** @param key The key for which this window is evaluated.* @param window The window that is being evaluated.* @param input The elements in the window being evaluated.* @param out A collector for emitting elements.* @throws Exception The function may throw exceptions to fail the program and trigger recovery.*/void apply(KEY key, W window, Iterable<IN> input, Collector<OUT> out) throws Exception;
}

处理窗口函数(processWindowFunction):是窗口API中最底层通用的窗口函数接口,可以获取到上问对象(context),实现为调用process方法传入自定义继承ProcessWindowFunction类;

input.keyBy(t -> t.f0).window(TumblingEventTimeWindows.of(Time.minutes(5))).process(new MyProcessWindowFunction());/* ... */public class MyProcessWindowFunction extends ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow> {@Overridepublic void process(String key, Context context, Iterable<Tuple2<String, Long>> input, Collector<String> out) {long count = 0;for (Tuple2<String, Long> in: input) {count++;}out.collect("Window: " + context.window() + "count: " + count);}
}

注意:一般增量窗口函数和全量窗口函数可以一起使用,window().aggregate()方法可以传入两个函数,第一个采用增量聚合函数,第二个传入全量函数,这样数据在进入窗口会触发增量计算,窗口不会缓存数据。当窗口关闭触发计算时,结果数据穿度到全量计算,参数Iterable中一般只有一个数据;

aggregate(acct1,acct2)

flink sql 窗口函数

flink sql 窗口也包含常见的滚动窗口、滑动窗口、session窗口,但还有一种累计窗口。

在flink1.13版本后flinksql支持累计窗口CUMULATE,可以实现没5分钟触发一次计算,输出当天的累计数据,使用样例

SELECT cast(PROCTIME() as timestamp_ltz) as window_end_time,manufacturer_name,event_id,case when state is null then -1 else state end ,cast(sum(agg)as string ) as agg
FROM TABLE(CUMULATE(TABLE dm_cumulate, DESCRIPTOR(ts1), INTERVAL '5' MINUTES, INTERVAL '1' DAY(9)))
GROUP BYwindow_end,window_start,manufacturer_name,event_id,case when state is null then -1 else state end

相关文章:

flink的窗口

目录 窗口分类 1.按照驱动类型分类 1. 时间窗口&#xff08;Time window&#xff09; 2.计数窗口&#xff08;Count window&#xff09; 2.按照窗口分配数据的规则分类 窗口API分类 API调用 窗口分配器器&#xff1a; 窗口函数 增量聚合函数&#xff1a; 全窗口函数…...

lodash.js 工具库

lodash 是什么? Lodash是一个流行的JavaScript实用工具库,提供了许多高效、高兼容性的工具函数,能够方便地处理集合、字符串、数值、函数等多种数据类型,大大提高工作效率。 lodash官网 文档参见:Lodash Documentation lodash 在Vue中怎么使用? 1、首先安装 lodash np…...

使用ElementUI组件库

引入ElementUI组件库 1.安装插件 npm i element-ui -S 2.引入组件库 import ElementUI from element-ui; 3.引入全部样式 import element-ui/lib/theme-chalk/index.css; 4.使用 Vue.use(ElementUI); 5.在官网寻找所需样式 饿了么组件官网 我这里以button为例 6.在组件中使用…...

【SkiaSharp绘图14】SKCanvas方法详解(三)URL注释、按顶点绘制、 是否裁切区域之外、旋转、缩放、倾斜、平移、保存/恢复画布

文章目录 SKCanvas方法DrawUrlAnnotation 绘制URL注释DrawVertices 按顶点绘制Flush 立即绘制QuickReject 判断区域是否在裁切区域之外ResetMatrix重置矩阵Restore、RestoreToCountRotateDegrees按角度旋转画布RotateRadians按弧度旋转画布SaveLayer保存并新建图层Scale 缩放画…...

WebDriver API (2)

本文将继续上文对WebDriver API的功能使用进行介绍。 一、浏览器操作 1. 浏览器前进forward与后退back 浏览器前进操作是指导航到前一个页面&#xff0c;在浏览器的历史记录中向前移动一页。 浏览器后退操作是指导航到前一个页面&#xff0c;在浏览器的历史记录中向后移动一…...

GCP FrontendConfig 详解:优化您的云负载均衡

目录 1. 什么是GCP FrontendConfig? 2. FrontendConfig的主要功能 2.1 协议选择 2.2 SSL/TLS配置 2.3 重定向配置 2.4 自定义响应头 3. 配置FrontendConfig 4. FrontendConfig的高级特性 4.1 智能路由 4.2 流量控制 4.3 日志和监控 5. FrontendConfig最佳实践 5.…...

TensorFlow代码逻辑 vs PyTorch代码逻辑

文章目录 一、TensorFlow&#xff08;一&#xff09;导入必要的库&#xff08;二&#xff09;加载MNIST数据集&#xff08;三&#xff09;数据预处理&#xff08;四&#xff09;构建神经网络模型&#xff08;五&#xff09;编译模型&#xff08;六&#xff09;训练模型&#xf…...

boost asio异步服务器(4)处理粘包

粘包的产生 当客户端发送多个数据包给服务器时&#xff0c;服务器底层的tcp接收缓冲区收到的数据为粘连在一起的。这种情况的产生通常是服务器端处理数据的速率不如客户端的发送速率的情况。比如&#xff1a;客户端1s内连续发送了两个hello world&#xff01;,服务器过了2s才接…...

【QT】常用控件|widget|QPushButton|RadioButton|核心属性

目录 ​编辑 概念 信号与槽机制 控件的多样性和定制性 核心属性 enabled geometry ​编辑 windowTiltle windowIcon toolTip styleSheet PushButton RadioButton 概念 QT 控件是构成图形用户界面&#xff08;GUI&#xff09;的基础组件&#xff0c;它们是实现与…...

【C++ Primer Plus学习记录】函数参数和按值传递

函数可以有多个参数。在调用函数时&#xff0c;只需使用都逗号将这些参数分开即可&#xff1a; n_chars(R,25); 上述函数调用将两个参数传递给函数n_chars()&#xff0c;我们将稍后定义该函数。 同样&#xff0c;在定义函数时&#xff0c;也在函数头中使用由逗号分隔的参数声…...

MySQL:设计数据库与操作

设计数据库 1. 数据建模1.1 概念模型1.2 逻辑模型1.3 实体模型主键外键外键约束 2. 标准化2.1 第一范式2.2 链接表2.3 第二范式2.4 第三范式 3. 数据库模型修改3.1 模型的正向工程3.2 同步数据库模型3.3 模型的逆向工程3.4 实际应用建议 4. 数据库实体模型4.1 创建和删除数据库…...

OBS 免费的录屏软件

一、下载 obs 【OBS】OBS Studio 的安装、参数设置和录屏、摄像头使用教程-CSDN博客 二、使用 obs & 输出无黑屏 【OBS任意指定区域录屏的方法-哔哩哔哩】 https://b23.tv/aM0hj8A OBS任意指定区域录屏的方法_哔哩哔哩_bilibili 步骤&#xff1a; 1&#xff09;获取区域…...

uniapp微信小程序使用xr加载模型

1.在根目录与pages同级创建如下目录结构和文件&#xff1a; // index.js Component({properties: {modelPath: { // vue页面传过来的模型type: String,value: }},data: {},methods: {} }) { // index.json"component": true,"renderer": "xr-frame&q…...

机器人运动范围检测 c++

地上有一个m行n列的方格&#xff0c;一个机器人从坐标&#xff08;0&#xff0c;0&#xff09;的格子开始移动&#xff0c;它每次可以向上下左右移动一个格子&#xff0c;但不能进入行坐标和列坐标的位数之和大于k的格子&#xff0c;请问机器人能够到达多少个格子 #include &l…...

kettle从入门到精通 第七十四课 ETL之kettle kettle调用https接口教程,忽略SSL校验

场景&#xff1a;kettle调用https接口&#xff0c;跳过校验SSL。&#xff08;有些公司内部系统之间的https的接口是没有SSL校验这一说&#xff0c;无需使用用证书的&#xff09; 解决方案&#xff1a;自定义插件或者自定义jar包通过javascript调用https接口。 1、http post 步…...

C++轻量级 线程间异步消息架构(向曾经工作的ROSA-RB以及共事的DOPRA的老兄弟们致敬)

1 啰嗦一番背景 这么多年&#xff0c;换着槽位做牛做马&#xff0c;没有什么钱途 手艺仍然很潮&#xff0c;唯有对于第一线的码农工作&#xff0c;孜孜不倦&#xff0c;其实没有啥进步&#xff0c;就是在不断地重复&#xff0c;刷熟练度&#xff0c;和同期的老兄弟们&#xf…...

Kotlin中的类

类初始化顺序 constructor 里的参数列表是首先被执行的&#xff0c;紧接着是 init 块和属性初始化器&#xff0c;最后是次构造函数的函数体。 主构造函数参数列表firstProperty 初始化第一个 init 块secondProperty 初始化第二个 init 块次构造函数函数体 class Example const…...

VSCode中常用的快捷键

通用操作快捷键 显示命令面板&#xff1a;Ctrl Shift P or F1&#xff0c;用于快速访问VSCode的各种命令。 快速打开&#xff1a;Ctrl P&#xff0c;可以快速打开文件、跳转到某个行号或搜索项目内容。 新建窗口/实例&#xff1a;Ctrl Shift N&#xff0c;用于打开一个新的…...

代码随想录-Day45

198. 打家劫舍 你是一个专业的小偷&#xff0c;计划偷窃沿街的房屋。每间房内都藏有一定的现金&#xff0c;影响你偷窃的唯一制约因素就是相邻的房屋装有相互连通的防盗系统&#xff0c;如果两间相邻的房屋在同一晚上被小偷闯入&#xff0c;系统会自动报警。 给定一个代表每个…...

Rust Eq 和 PartialEq

Eq 和 PartialEq 在 Rust 中&#xff0c;想要重载操作符&#xff0c;你就需要实现对应的特征。 例如 <、<、> 和 > 需要实现 PartialOrd 特征: use std::fmt::Display;struct Pair<T> {x: T,y: T, }impl<T> Pair<T> {fn new(x: T, y: T) ->…...

Vim 调用外部命令学习笔记

Vim 外部命令集成完全指南 文章目录 Vim 外部命令集成完全指南核心概念理解命令语法解析语法对比 常用外部命令详解文本排序与去重文本筛选与搜索高级 grep 搜索技巧文本替换与编辑字符处理高级文本处理编程语言处理其他实用命令 范围操作示例指定行范围处理复合命令示例 实用技…...

Vue记事本应用实现教程

文章目录 1. 项目介绍2. 开发环境准备3. 设计应用界面4. 创建Vue实例和数据模型5. 实现记事本功能5.1 添加新记事项5.2 删除记事项5.3 清空所有记事 6. 添加样式7. 功能扩展&#xff1a;显示创建时间8. 功能扩展&#xff1a;记事项搜索9. 完整代码10. Vue知识点解析10.1 数据绑…...

地震勘探——干扰波识别、井中地震时距曲线特点

目录 干扰波识别反射波地震勘探的干扰波 井中地震时距曲线特点 干扰波识别 有效波&#xff1a;可以用来解决所提出的地质任务的波&#xff1b;干扰波&#xff1a;所有妨碍辨认、追踪有效波的其他波。 地震勘探中&#xff0c;有效波和干扰波是相对的。例如&#xff0c;在反射波…...

Linux链表操作全解析

Linux C语言链表深度解析与实战技巧 一、链表基础概念与内核链表优势1.1 为什么使用链表&#xff1f;1.2 Linux 内核链表与用户态链表的区别 二、内核链表结构与宏解析常用宏/函数 三、内核链表的优点四、用户态链表示例五、双向循环链表在内核中的实现优势5.1 插入效率5.2 安全…...

PHP和Node.js哪个更爽?

先说结论&#xff0c;rust完胜。 php&#xff1a;laravel&#xff0c;swoole&#xff0c;webman&#xff0c;最开始在苏宁的时候写了几年php&#xff0c;当时觉得php真的是世界上最好的语言&#xff0c;因为当初活在舒适圈里&#xff0c;不愿意跳出来&#xff0c;就好比当初活在…...

【力扣数据库知识手册笔记】索引

索引 索引的优缺点 优点1. 通过创建唯一性索引&#xff0c;可以保证数据库表中每一行数据的唯一性。2. 可以加快数据的检索速度&#xff08;创建索引的主要原因&#xff09;。3. 可以加速表和表之间的连接&#xff0c;实现数据的参考完整性。4. 可以在查询过程中&#xff0c;…...

智慧工地云平台源码,基于微服务架构+Java+Spring Cloud +UniApp +MySql

智慧工地管理云平台系统&#xff0c;智慧工地全套源码&#xff0c;java版智慧工地源码&#xff0c;支持PC端、大屏端、移动端。 智慧工地聚焦建筑行业的市场需求&#xff0c;提供“平台网络终端”的整体解决方案&#xff0c;提供劳务管理、视频管理、智能监测、绿色施工、安全管…...

el-switch文字内置

el-switch文字内置 效果 vue <div style"color:#ffffff;font-size:14px;float:left;margin-bottom:5px;margin-right:5px;">自动加载</div> <el-switch v-model"value" active-color"#3E99FB" inactive-color"#DCDFE6"…...

Springcloud:Eureka 高可用集群搭建实战(服务注册与发现的底层原理与避坑指南)

引言&#xff1a;为什么 Eureka 依然是存量系统的核心&#xff1f; 尽管 Nacos 等新注册中心崛起&#xff0c;但金融、电力等保守行业仍有大量系统运行在 Eureka 上。理解其高可用设计与自我保护机制&#xff0c;是保障分布式系统稳定的必修课。本文将手把手带你搭建生产级 Eur…...

TRS收益互换:跨境资本流动的金融创新工具与系统化解决方案

一、TRS收益互换的本质与业务逻辑 &#xff08;一&#xff09;概念解析 TRS&#xff08;Total Return Swap&#xff09;收益互换是一种金融衍生工具&#xff0c;指交易双方约定在未来一定期限内&#xff0c;基于特定资产或指数的表现进行现金流交换的协议。其核心特征包括&am…...