当前位置: 首页 > news >正文

208.Flink(三):窗口的使用,处理函数的使用

目录

一、窗口

1.窗口的概念

2.窗口的分类

(1)按照驱动类型分

(2)按照窗口分配数据的规则分类

3.窗口api概览

(1)按键分区(Keyed)和非按键分区(Non-Keyed)

*1)按键分区窗口(Keyed Windows)

*2)非按键分区(Non-Keyed Windows)

(2)代码中窗口API的调用

(3)窗口分配器

(4)窗口函数

*1)增量聚合函数

^1)归约函数(ReduceFunction)

^2)聚合函数(AggregateFunction)

*2)全窗口函数(full window functions)

*3)增量聚合和全窗口函数的结合使用

(5)触发器(Trigger)

(6)移除器(Evictor)

(7)窗口的简单原理

*1)一个数据来了,怎么认为他是哪个窗口内的数据?

*2)窗口特性

*3)窗口的生命周期

4.时间语义

(1)Flink中的时间语义

(2)Flink以事件时间为默认时间语义

5.水位线(Watermark)

(1)水位线的概念

*1)有序流中的水位线

*2)乱序流中的水位线

(2)水位线和窗口的工作原理

(3) 生成水位线

*1)总体原则

*2)有序流中内置水位线设置

*3)乱序流中内置水位线设置

*4)自定义水位线生成器(周期式、断点式)

*5)在数据源中发送水位线

(6)迟到数据的处理

*1)设置乱序容忍度

*2)设置窗口延迟关闭

*3)侧输出流

(7)基于时间的合流——双流联结(Join)

*1)窗口联结(Window Join)

*2)间隔联结(Interval Join)

二、处理函数

1.基本处理函数(ProcessFunction)

(1)处理函数的功能和使用

(2)ProcessFunction解析

(3)处理函数的分类

2.按键分区处理函数(KeyedProcessFunction)

(1)定时器(Timer)和定时服务(TimerService)

(2)KeyedProcessFunction注意点及实现

3.应用案例:Top N

(1)方法一:ProcessAllWindowFunction

(2)方法二:

4.侧输出流


一、窗口

在批处理统计中,我们可以等待一批数据都到齐后,统一处理。但是在实时处理统计中,我们是来一条就得处理一条,那么我们怎么统计最近一段时间内的数据呢?引入“窗口”。

1.窗口的概念

Flink是一种流式计算引擎,主要是来处理无界数据流的,数据源源不断、无穷无尽。想要更加方便高效地处理无界流,一种方式就是将无限数据切割成有限的“数据块”进行处理,这就是所谓的“窗口”(Window)。

Flink中窗口并不是静态准备好的,而是动态创建——当有落在这个窗口区间范围的数据达到时,才创建对应的窗口。

到达窗口结束时间时,窗口就触发计算并关闭,事实上“触发计算”和“窗口关闭”两个行为也可以分开。

2.窗口的分类

(1)按照驱动类型分

*1)时间窗口

一定时间作为一个窗口

*2)计数窗口

达到多少数量作为一个窗口

(2)按照窗口分配数据的规则分类

*1)滚动窗口

以一个固定时间为窗口,第一个窗口结束的时间就是下一个窗口开始的时间。

*2)滑动窗口

窗口大小 + 步长。

如果步长 = 窗口大小,其实就是滚动窗口的情况。

步长 > 窗口大小,会有数据被漏掉。

步长 < 窗口大小,窗口会有重叠

*3)会话窗口

基于会话对数据分组

*4)全局窗口

全局有效,没有结束时间

3.窗口api概览

(1)按键分区(Keyed)和非按键分区(Non-Keyed)

定义窗口前,需要确认数据流是基于keyBy还是没有keyBy的。

*1)按键分区窗口(Keyed Windows)

经过按键分区keyBy操作后,数据流会按照key被分为多条逻辑流,窗口计算会在多个并行子任务上同时执行。相同key的数据会被发送到同一个并行子任务,而窗口操作会基于每个key进行单独的处理。

stream.keyBy(...).window(...)

*2)非按键分区(Non-Keyed Windows)

窗口逻辑只能在一个任务(task)上执行,就相当于并行度变成了1。

对于非按键分区的窗口操作,手动调大窗口算子的并行度也是无效的,windowAll本身就是一个非并行的操作。

stream.windowAll(...)

(2)代码中窗口API的调用

窗口操作主要有两个部分:窗口分配器(Window Assigners)和窗口函数(Window Functions)。

stream.keyBy(<key selector>).window(<window assigner>).aggregate(<window function>)

.window()方法需要传入一个窗口分配器,它指明了窗口的类型。

.aggregate()方法传入一个窗口函数作为参数,它用来定义窗口具体的处理逻辑。

(3)窗口分配器

窗口分配器指定窗口的类型。窗口分配器最通用的定义方式,就是调用.window()方法。

(4)窗口函数

窗口函数定义了要对窗口中收集的数据做的计算操作,根据处理的方式可以分为两类:增量聚合函数全窗口函数

package com.atguigu.window;import com.atguigu.bean.WaterSensor;
import com.atguigu.functions.WaterSensorMapFunction;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;/*** TODO** @author cjp* @version 1.0*/
public class WindowApiDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("hadoop102", 7777).map(new WaterSensorMapFunction());KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(sensor -> sensor.getId());// TODO 1. 指定 窗口分配器: 指定 用 哪一种窗口 ---  时间 or 计数? 滚动、滑动、会话?// 1.1 没有keyby的窗口: 窗口内的 所有数据 进入同一个 子任务,并行度只能为1
//        sensorDS.windowAll()// 1.2 有keyby的窗口: 每个key上都定义了一组窗口,各自独立地进行统计计算// 基于时间的
//        sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10))) // 滚动窗口,窗口长度10s
//        sensorKS.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(2))) // 滑动窗口,窗口长度10s,滑动步长2s
//        sensorKS.window(ProcessingTimeSessionWindows.withGap(Time.seconds(5))) // 会话窗口,超时间隔5s
//        sensorKS.window(GlobalWindows.create())  // 全局窗口,计数窗口的底层就是用的这个,需要自定义的时候才会用// 基于计数的
//        sensorKS.countWindow(5)  // 滚动窗口,窗口长度=5个元素
//        sensorKS.countWindow(5,2) // 滑动窗口,窗口长度=5个元素,滑动步长=2个元素// TODO 2. 指定 窗口函数 : 窗口内数据的 计算逻辑WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));// 增量聚合: 来一条数据,计算一条数据,窗口触发的时候输出计算结果
//        sensorWS
//                .reduce()
//        .aggregate(, )// 全窗口函数:数据来了不计算,存起来,窗口触发的时候,计算并输出结果
//        sensorWS.process()env.execute();}
}

*1)增量聚合函数
^1)归约函数(ReduceFunction)
package com.atguigu.window;import com.atguigu.bean.WaterSensor;
import com.atguigu.functions.WaterSensorMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;/*** TODO** @author cjp* @version 1.0*/
public class WindowReduceDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);

相关文章:

208.Flink(三):窗口的使用,处理函数的使用

目录 一、窗口 1.窗口的概念 2.窗口的分类 (1)按照驱动类型分 (2)按照窗口分配数据的规则分类 3.窗口api概览 (1)按键分区(Keyed)和非按键分区(Non-Keyed) *1)按键分区窗口(Keyed Windows) *2)非按键分区(Non-Keyed Windows) (2)代码中窗口API的调…...

时序预测 | MATLAB实现POA-CNN-BiLSTM鹈鹕算法优化卷积双向长短期记忆神经网络时间序列预测

时序预测 | MATLAB实现POA-CNN-BiLSTM鹈鹕算法优化卷积双向长短期记忆神经网络时间序列预测 目录 时序预测 | MATLAB实现POA-CNN-BiLSTM鹈鹕算法优化卷积双向长短期记忆神经网络时间序列预测预测效果基本介绍程序设计参考资料 预测效果 基本介绍 MATLAB实现POA-CNN-BiLSTM鹈鹕算…...

【知识点】增量学习、在线学习、离线学习的区别

参考链接&#xff1a;https://www.6aiq.com/article/1613258706447?p1&m0 离线学习 常见的学习方式&#xff0c;一次性将所有数据参与进训练。 离线学习完成了目标函数的优化将不会在改变了离线学习需要一次提供整个训练集时间和空间成本效率低发生数据变更或模型漂移需…...

c++ 学习 之 运算符重载 之 前置++和后置++

前言 int a1;cout << (a) << endl;cout << a << endl;int b1;cout << (b) << endl; // 这个是错误的cout << b << endl;上面样例中&#xff0c; 前置 返回的是引用&#xff0c;所以a 的值变成了3 后置 返回的不是可以改变的…...

K8s Kubelet 垃圾回收机制

前言 Kubelet 垃圾回收(Garbage Collection)是一个非常有用的功能,它负责自动清理节点上的无用镜像和容器。Kubelet 每隔 1 分钟进行一次容器清理,每隔 5 分钟进行一次镜像清理(截止到 v1.15 版本,垃圾回收间隔时间还都是在源码中固化的,不可自定义配置)。如果节点上已…...

docker安装高斯数据库openGauss数据库

1.创建容器 #创建数据没有挂在的容器 docker run --name opengauss --privilegedtrue -d -e GS_PASSWORDEnmo123 -p 8090:5432 enmotech/opengauss:latest 2. 进入容器&#xff0c;并切换omm用户&#xff0c;使用gsql连接高斯数据库 [rootansible ~]# docker ps -a CONTAIN…...

新手学习:ArcGIS 提取SHP 路网数据、节点

新手学习&#xff1a;ArcGIS 提取SHP 路网数据、节点 参考连接 OSM路网提取道路节点 ArcGIS&#xff1a;如何创建地理数据库、创建要素类数据集、导入要素类、表&#xff1f; 1. 导入开源路网SHP文件 2. 在交点处打断路网数据 未打断路网数据 有一些路径很长&#xff0c;…...

性能测试 —— Tomcat监控与调优:Jconsole监控

JConsole的图形用户界面是一个符合Java管理扩展(JMX)规范的监测工具&#xff0c;JConsole使用Java虚拟机(Java VM)&#xff0c;提供在Java平台上运行的应用程序的性能和资源消耗的信息。在Java平台&#xff0c;标准版(Java SE平台)6&#xff0c;JConsole的已经更新到目前的外观…...

刷题笔记26——图论二分图判定

世界上的事情,最忌讳的就是个十全十美,你看那天上的月亮,一旦圆满了,马上就要亏厌;树上的果子,一旦熟透了,马上就要坠落。凡事总要稍留欠缺,才能持恒。 ——莫言 visited数组是在如果有环的情况下&#xff0c;防止在图中一直绕圈设置的&#xff0c;类似于剪枝操作&#xff0c;走…...

网站整站优化-网站整站优化工具

您是否曾为您的网站在搜索引擎中的排名而感到焦虑&#xff1f;是否苦苦思考如何提高流量、吸引更多用户&#xff1f; 什么是整站优化。简而言之&#xff0c;它是一项用于提升网站在搜索引擎中排名的策略和技巧。通过对网站的内容、结构、速度等方面进行优化&#xff0c;可以使…...

冲刺十五届蓝桥杯P0001阶乘求和

文章目录 题目描述思路分析代码解析 题目描述 思路分析 阶乘是蓝桥杯中常考的知识。 首先我们需要知道 int 和long的最大值是多少。 我们可以知道19的阶乘就已经超过了long的最大值&#xff0c;所以让我们直接计算202320232023&#xff01;的阶乘是不现实的。 所以我们需要…...

c++ 学习 之 运算符重载

前言 运算符重载的概念&#xff1a; 对已有的运算符重新进行定义&#xff0c;赋予其另外一种功能&#xff0c;以适应不同的数据类型 加号运算符重载 作用&#xff1a;定义两个自定义的数据类型相加的运算 正常情况下&#xff0c;如果想要实现类中两个int 类型的相加&#xf…...

各种数据库表名长度限制整理

因为工作原因&#xff0c;需要整理下系统支持的数据库的表名长度限制&#xff0c;现发出来&#xff0c;以节省大家的整理时间&#xff0c;如有不对的敬请斧正&#xff01; 数据库类型长度ORACLE 30GreenPlum40KINGBASEES63PostgreSql63Gbase63瀚高63OSCAR64MYSQL 64HBASE64Mar…...

Go 里的超时控制

前言 日常开发中我们大概率会遇到超时控制的场景&#xff0c;比如一个批量耗时任务、网络请求等&#xff1b;一个良好的超时控制可以有效的避免一些问题&#xff08;比如 goroutine 泄露、资源不释放等&#xff09;。 Timer 在 go 中实现超时控制的方法非常简单&#xff0c;…...

一文彻底搞清楚Spark Schema

前言 Spark Schema定义了DataFrame的结构,可以通过对DataFrame对象调用printSchema()方法来获得该结构。Spark SQL提供了StructType和StructField类以编程方式指定架构。 默认情况下,Spark从数据中推断schema,但有时我们可能需要定义自己的schema(列名和数据类型),尤其…...

Nginx多出口IP解决代理端口数量限制,CentOS安装Nginx并开启https2.0

Nginx多出口IP解决代理端口数量限制,CentOS安装Nginx并开启https2.0。 配置文件如下: http {...upstream test {server www.test.com;}server {listen 80 default_server;server_name _;location / {proxy_pass http://test;proxy_bind $split_ip...

SpringBoot项目(百度AI整合)——如何在Springboot中使用语音文件识别 ffmpeg的安装和使用

前言 前言&#xff1a;在实际使用中&#xff0c;经常要参考官方的案例&#xff0c;但有时候因为工具的不一样&#xff0c;比如idea 和 eclipse&#xff0c;普通项目和spring项目等的差别&#xff1b;还有时候因为水平有限&#xff0c;难以在散布于官方的各个文档读懂&#xff…...

探索古彝文AI识别技术:助力中国传统文化的传承与发扬

目录 ⭐️ 写在前面 ⭐️ 一、什么是古彝文 1.1 古彝文介绍 1.2 古彝文与其他古文字示例 1.3 古彝文的重要性 ⭐️二、AI识别技术的挑战与前景 2.1 挑战 2.2 前景 ⭐️三、合合信息AI识别技术 3.1 智能文字识别技术&#x1f44d;&#x1f44d; 3.2 古文识别应用 ⭐…...

mysql面试题2:说一说MySQL的架构设计?一条 MySQL 语句执行的步骤?

该文章专注于面试,面试只要回答关键点即可,不需要对框架有非常深入的回答,如果你想应付面试,是足够了,抓住关键点 面试官:说一说MySQL的架构设计? MySQL的架构设计主要包括以下几个组件: 连接器(Connector):负责与客户端建立连接,并进行身份验证和授权。 查询缓存…...

UPnP协议和SSDP协议

1、两种协议 UPnP协议&#xff1a;Universal Plug and Play&#xff0c;广义的即插即用。UPnP协议的目的&#xff1a;当有新设备连接上网络&#xff0c;网络上的其他设备能够马上知道有新设备加入&#xff0c;然后这些设备能互相宣传和发现彼此&#xff0c;以便能使用和控制彼…...

C++_核心编程_多态案例二-制作饮品

#include <iostream> #include <string> using namespace std;/*制作饮品的大致流程为&#xff1a;煮水 - 冲泡 - 倒入杯中 - 加入辅料 利用多态技术实现本案例&#xff0c;提供抽象制作饮品基类&#xff0c;提供子类制作咖啡和茶叶*//*基类*/ class AbstractDr…...

【WiFi帧结构】

文章目录 帧结构MAC头部管理帧 帧结构 Wi-Fi的帧分为三部分组成&#xff1a;MAC头部frame bodyFCS&#xff0c;其中MAC是固定格式的&#xff0c;frame body是可变长度。 MAC头部有frame control&#xff0c;duration&#xff0c;address1&#xff0c;address2&#xff0c;addre…...

shell脚本--常见案例

1、自动备份文件或目录 2、批量重命名文件 3、查找并删除指定名称的文件&#xff1a; 4、批量删除文件 5、查找并替换文件内容 6、批量创建文件 7、创建文件夹并移动文件 8、在文件夹中查找文件...

【入坑系列】TiDB 强制索引在不同库下不生效问题

文章目录 背景SQL 优化情况线上SQL运行情况分析怀疑1:执行计划绑定问题?尝试:SHOW WARNINGS 查看警告探索 TiDB 的 USE_INDEX 写法Hint 不生效问题排查解决参考背景 项目中使用 TiDB 数据库,并对 SQL 进行优化了,添加了强制索引。 UAT 环境已经生效,但 PROD 环境强制索…...

使用分级同态加密防御梯度泄漏

抽象 联邦学习 &#xff08;FL&#xff09; 支持跨分布式客户端进行协作模型训练&#xff0c;而无需共享原始数据&#xff0c;这使其成为在互联和自动驾驶汽车 &#xff08;CAV&#xff09; 等领域保护隐私的机器学习的一种很有前途的方法。然而&#xff0c;最近的研究表明&…...

蓝牙 BLE 扫描面试题大全(2):进阶面试题与实战演练

前文覆盖了 BLE 扫描的基础概念与经典问题蓝牙 BLE 扫描面试题大全(1)&#xff1a;从基础到实战的深度解析-CSDN博客&#xff0c;但实际面试中&#xff0c;企业更关注候选人对复杂场景的应对能力&#xff08;如多设备并发扫描、低功耗与高发现率的平衡&#xff09;和前沿技术的…...

leetcodeSQL解题:3564. 季节性销售分析

leetcodeSQL解题&#xff1a;3564. 季节性销售分析 题目&#xff1a; 表&#xff1a;sales ---------------------- | Column Name | Type | ---------------------- | sale_id | int | | product_id | int | | sale_date | date | | quantity | int | | price | decimal | -…...

uniapp中使用aixos 报错

问题&#xff1a; 在uniapp中使用aixos&#xff0c;运行后报如下错误&#xff1a; AxiosError: There is no suitable adapter to dispatch the request since : - adapter xhr is not supported by the environment - adapter http is not available in the build 解决方案&…...

OpenLayers 分屏对比(地图联动)

注&#xff1a;当前使用的是 ol 5.3.0 版本&#xff0c;天地图使用的key请到天地图官网申请&#xff0c;并替换为自己的key 地图分屏对比在WebGIS开发中是很常见的功能&#xff0c;和卷帘图层不一样的是&#xff0c;分屏对比是在各个地图中添加相同或者不同的图层进行对比查看。…...

微软PowerBI考试 PL300-在 Power BI 中清理、转换和加载数据

微软PowerBI考试 PL300-在 Power BI 中清理、转换和加载数据 Power Query 具有大量专门帮助您清理和准备数据以供分析的功能。 您将了解如何简化复杂模型、更改数据类型、重命名对象和透视数据。 您还将了解如何分析列&#xff0c;以便知晓哪些列包含有价值的数据&#xff0c;…...