当前位置: 首页 > news >正文

208.Flink(三):窗口的使用,处理函数的使用

目录

一、窗口

1.窗口的概念

2.窗口的分类

(1)按照驱动类型分

(2)按照窗口分配数据的规则分类

3.窗口api概览

(1)按键分区(Keyed)和非按键分区(Non-Keyed)

*1)按键分区窗口(Keyed Windows)

*2)非按键分区(Non-Keyed Windows)

(2)代码中窗口API的调用

(3)窗口分配器

(4)窗口函数

*1)增量聚合函数

^1)归约函数(ReduceFunction)

^2)聚合函数(AggregateFunction)

*2)全窗口函数(full window functions)

*3)增量聚合和全窗口函数的结合使用

(5)触发器(Trigger)

(6)移除器(Evictor)

(7)窗口的简单原理

*1)一个数据来了,怎么认为他是哪个窗口内的数据?

*2)窗口特性

*3)窗口的生命周期

4.时间语义

(1)Flink中的时间语义

(2)Flink以事件时间为默认时间语义

5.水位线(Watermark)

(1)水位线的概念

*1)有序流中的水位线

*2)乱序流中的水位线

(2)水位线和窗口的工作原理

(3) 生成水位线

*1)总体原则

*2)有序流中内置水位线设置

*3)乱序流中内置水位线设置

*4)自定义水位线生成器(周期式、断点式)

*5)在数据源中发送水位线

(6)迟到数据的处理

*1)设置乱序容忍度

*2)设置窗口延迟关闭

*3)侧输出流

(7)基于时间的合流——双流联结(Join)

*1)窗口联结(Window Join)

*2)间隔联结(Interval Join)

二、处理函数

1.基本处理函数(ProcessFunction)

(1)处理函数的功能和使用

(2)ProcessFunction解析

(3)处理函数的分类

2.按键分区处理函数(KeyedProcessFunction)

(1)定时器(Timer)和定时服务(TimerService)

(2)KeyedProcessFunction注意点及实现

3.应用案例:Top N

(1)方法一:ProcessAllWindowFunction

(2)方法二:

4.侧输出流


一、窗口

在批处理统计中,我们可以等待一批数据都到齐后,统一处理。但是在实时处理统计中,我们是来一条就得处理一条,那么我们怎么统计最近一段时间内的数据呢?引入“窗口”。

1.窗口的概念

Flink是一种流式计算引擎,主要是来处理无界数据流的,数据源源不断、无穷无尽。想要更加方便高效地处理无界流,一种方式就是将无限数据切割成有限的“数据块”进行处理,这就是所谓的“窗口”(Window)。

Flink中窗口并不是静态准备好的,而是动态创建——当有落在这个窗口区间范围的数据达到时,才创建对应的窗口。

到达窗口结束时间时,窗口就触发计算并关闭,事实上“触发计算”和“窗口关闭”两个行为也可以分开。

2.窗口的分类

(1)按照驱动类型分

*1)时间窗口

一定时间作为一个窗口

*2)计数窗口

达到多少数量作为一个窗口

(2)按照窗口分配数据的规则分类

*1)滚动窗口

以一个固定时间为窗口,第一个窗口结束的时间就是下一个窗口开始的时间。

*2)滑动窗口

窗口大小 + 步长。

如果步长 = 窗口大小,其实就是滚动窗口的情况。

步长 > 窗口大小,会有数据被漏掉。

步长 < 窗口大小,窗口会有重叠

*3)会话窗口

基于会话对数据分组

*4)全局窗口

全局有效,没有结束时间

3.窗口api概览

(1)按键分区(Keyed)和非按键分区(Non-Keyed)

定义窗口前,需要确认数据流是基于keyBy还是没有keyBy的。

*1)按键分区窗口(Keyed Windows)

经过按键分区keyBy操作后,数据流会按照key被分为多条逻辑流,窗口计算会在多个并行子任务上同时执行。相同key的数据会被发送到同一个并行子任务,而窗口操作会基于每个key进行单独的处理。

stream.keyBy(...).window(...)

*2)非按键分区(Non-Keyed Windows)

窗口逻辑只能在一个任务(task)上执行,就相当于并行度变成了1。

对于非按键分区的窗口操作,手动调大窗口算子的并行度也是无效的,windowAll本身就是一个非并行的操作。

stream.windowAll(...)

(2)代码中窗口API的调用

窗口操作主要有两个部分:窗口分配器(Window Assigners)和窗口函数(Window Functions)。

stream.keyBy(<key selector>).window(<window assigner>).aggregate(<window function>)

.window()方法需要传入一个窗口分配器,它指明了窗口的类型。

.aggregate()方法传入一个窗口函数作为参数,它用来定义窗口具体的处理逻辑。

(3)窗口分配器

窗口分配器指定窗口的类型。窗口分配器最通用的定义方式,就是调用.window()方法。

(4)窗口函数

窗口函数定义了要对窗口中收集的数据做的计算操作,根据处理的方式可以分为两类:增量聚合函数全窗口函数

package com.atguigu.window;import com.atguigu.bean.WaterSensor;
import com.atguigu.functions.WaterSensorMapFunction;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;/*** TODO** @author cjp* @version 1.0*/
public class WindowApiDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("hadoop102", 7777).map(new WaterSensorMapFunction());KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(sensor -> sensor.getId());// TODO 1. 指定 窗口分配器: 指定 用 哪一种窗口 ---  时间 or 计数? 滚动、滑动、会话?// 1.1 没有keyby的窗口: 窗口内的 所有数据 进入同一个 子任务,并行度只能为1
//        sensorDS.windowAll()// 1.2 有keyby的窗口: 每个key上都定义了一组窗口,各自独立地进行统计计算// 基于时间的
//        sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10))) // 滚动窗口,窗口长度10s
//        sensorKS.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(2))) // 滑动窗口,窗口长度10s,滑动步长2s
//        sensorKS.window(ProcessingTimeSessionWindows.withGap(Time.seconds(5))) // 会话窗口,超时间隔5s
//        sensorKS.window(GlobalWindows.create())  // 全局窗口,计数窗口的底层就是用的这个,需要自定义的时候才会用// 基于计数的
//        sensorKS.countWindow(5)  // 滚动窗口,窗口长度=5个元素
//        sensorKS.countWindow(5,2) // 滑动窗口,窗口长度=5个元素,滑动步长=2个元素// TODO 2. 指定 窗口函数 : 窗口内数据的 计算逻辑WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));// 增量聚合: 来一条数据,计算一条数据,窗口触发的时候输出计算结果
//        sensorWS
//                .reduce()
//        .aggregate(, )// 全窗口函数:数据来了不计算,存起来,窗口触发的时候,计算并输出结果
//        sensorWS.process()env.execute();}
}

*1)增量聚合函数
^1)归约函数(ReduceFunction)
package com.atguigu.window;import com.atguigu.bean.WaterSensor;
import com.atguigu.functions.WaterSensorMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;/*** TODO** @author cjp* @version 1.0*/
public class WindowReduceDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);

相关文章:

208.Flink(三):窗口的使用,处理函数的使用

目录 一、窗口 1.窗口的概念 2.窗口的分类 (1)按照驱动类型分 (2)按照窗口分配数据的规则分类 3.窗口api概览 (1)按键分区(Keyed)和非按键分区(Non-Keyed) *1)按键分区窗口(Keyed Windows) *2)非按键分区(Non-Keyed Windows) (2)代码中窗口API的调…...

时序预测 | MATLAB实现POA-CNN-BiLSTM鹈鹕算法优化卷积双向长短期记忆神经网络时间序列预测

时序预测 | MATLAB实现POA-CNN-BiLSTM鹈鹕算法优化卷积双向长短期记忆神经网络时间序列预测 目录 时序预测 | MATLAB实现POA-CNN-BiLSTM鹈鹕算法优化卷积双向长短期记忆神经网络时间序列预测预测效果基本介绍程序设计参考资料 预测效果 基本介绍 MATLAB实现POA-CNN-BiLSTM鹈鹕算…...

【知识点】增量学习、在线学习、离线学习的区别

参考链接&#xff1a;https://www.6aiq.com/article/1613258706447?p1&m0 离线学习 常见的学习方式&#xff0c;一次性将所有数据参与进训练。 离线学习完成了目标函数的优化将不会在改变了离线学习需要一次提供整个训练集时间和空间成本效率低发生数据变更或模型漂移需…...

c++ 学习 之 运算符重载 之 前置++和后置++

前言 int a1;cout << (a) << endl;cout << a << endl;int b1;cout << (b) << endl; // 这个是错误的cout << b << endl;上面样例中&#xff0c; 前置 返回的是引用&#xff0c;所以a 的值变成了3 后置 返回的不是可以改变的…...

K8s Kubelet 垃圾回收机制

前言 Kubelet 垃圾回收(Garbage Collection)是一个非常有用的功能,它负责自动清理节点上的无用镜像和容器。Kubelet 每隔 1 分钟进行一次容器清理,每隔 5 分钟进行一次镜像清理(截止到 v1.15 版本,垃圾回收间隔时间还都是在源码中固化的,不可自定义配置)。如果节点上已…...

docker安装高斯数据库openGauss数据库

1.创建容器 #创建数据没有挂在的容器 docker run --name opengauss --privilegedtrue -d -e GS_PASSWORDEnmo123 -p 8090:5432 enmotech/opengauss:latest 2. 进入容器&#xff0c;并切换omm用户&#xff0c;使用gsql连接高斯数据库 [rootansible ~]# docker ps -a CONTAIN…...

新手学习:ArcGIS 提取SHP 路网数据、节点

新手学习&#xff1a;ArcGIS 提取SHP 路网数据、节点 参考连接 OSM路网提取道路节点 ArcGIS&#xff1a;如何创建地理数据库、创建要素类数据集、导入要素类、表&#xff1f; 1. 导入开源路网SHP文件 2. 在交点处打断路网数据 未打断路网数据 有一些路径很长&#xff0c;…...

性能测试 —— Tomcat监控与调优:Jconsole监控

JConsole的图形用户界面是一个符合Java管理扩展(JMX)规范的监测工具&#xff0c;JConsole使用Java虚拟机(Java VM)&#xff0c;提供在Java平台上运行的应用程序的性能和资源消耗的信息。在Java平台&#xff0c;标准版(Java SE平台)6&#xff0c;JConsole的已经更新到目前的外观…...

刷题笔记26——图论二分图判定

世界上的事情,最忌讳的就是个十全十美,你看那天上的月亮,一旦圆满了,马上就要亏厌;树上的果子,一旦熟透了,马上就要坠落。凡事总要稍留欠缺,才能持恒。 ——莫言 visited数组是在如果有环的情况下&#xff0c;防止在图中一直绕圈设置的&#xff0c;类似于剪枝操作&#xff0c;走…...

网站整站优化-网站整站优化工具

您是否曾为您的网站在搜索引擎中的排名而感到焦虑&#xff1f;是否苦苦思考如何提高流量、吸引更多用户&#xff1f; 什么是整站优化。简而言之&#xff0c;它是一项用于提升网站在搜索引擎中排名的策略和技巧。通过对网站的内容、结构、速度等方面进行优化&#xff0c;可以使…...

冲刺十五届蓝桥杯P0001阶乘求和

文章目录 题目描述思路分析代码解析 题目描述 思路分析 阶乘是蓝桥杯中常考的知识。 首先我们需要知道 int 和long的最大值是多少。 我们可以知道19的阶乘就已经超过了long的最大值&#xff0c;所以让我们直接计算202320232023&#xff01;的阶乘是不现实的。 所以我们需要…...

c++ 学习 之 运算符重载

前言 运算符重载的概念&#xff1a; 对已有的运算符重新进行定义&#xff0c;赋予其另外一种功能&#xff0c;以适应不同的数据类型 加号运算符重载 作用&#xff1a;定义两个自定义的数据类型相加的运算 正常情况下&#xff0c;如果想要实现类中两个int 类型的相加&#xf…...

各种数据库表名长度限制整理

因为工作原因&#xff0c;需要整理下系统支持的数据库的表名长度限制&#xff0c;现发出来&#xff0c;以节省大家的整理时间&#xff0c;如有不对的敬请斧正&#xff01; 数据库类型长度ORACLE 30GreenPlum40KINGBASEES63PostgreSql63Gbase63瀚高63OSCAR64MYSQL 64HBASE64Mar…...

Go 里的超时控制

前言 日常开发中我们大概率会遇到超时控制的场景&#xff0c;比如一个批量耗时任务、网络请求等&#xff1b;一个良好的超时控制可以有效的避免一些问题&#xff08;比如 goroutine 泄露、资源不释放等&#xff09;。 Timer 在 go 中实现超时控制的方法非常简单&#xff0c;…...

一文彻底搞清楚Spark Schema

前言 Spark Schema定义了DataFrame的结构,可以通过对DataFrame对象调用printSchema()方法来获得该结构。Spark SQL提供了StructType和StructField类以编程方式指定架构。 默认情况下,Spark从数据中推断schema,但有时我们可能需要定义自己的schema(列名和数据类型),尤其…...

Nginx多出口IP解决代理端口数量限制,CentOS安装Nginx并开启https2.0

Nginx多出口IP解决代理端口数量限制,CentOS安装Nginx并开启https2.0。 配置文件如下: http {...upstream test {server www.test.com;}server {listen 80 default_server;server_name _;location / {proxy_pass http://test;proxy_bind $split_ip...

SpringBoot项目(百度AI整合)——如何在Springboot中使用语音文件识别 ffmpeg的安装和使用

前言 前言&#xff1a;在实际使用中&#xff0c;经常要参考官方的案例&#xff0c;但有时候因为工具的不一样&#xff0c;比如idea 和 eclipse&#xff0c;普通项目和spring项目等的差别&#xff1b;还有时候因为水平有限&#xff0c;难以在散布于官方的各个文档读懂&#xff…...

探索古彝文AI识别技术:助力中国传统文化的传承与发扬

目录 ⭐️ 写在前面 ⭐️ 一、什么是古彝文 1.1 古彝文介绍 1.2 古彝文与其他古文字示例 1.3 古彝文的重要性 ⭐️二、AI识别技术的挑战与前景 2.1 挑战 2.2 前景 ⭐️三、合合信息AI识别技术 3.1 智能文字识别技术&#x1f44d;&#x1f44d; 3.2 古文识别应用 ⭐…...

mysql面试题2:说一说MySQL的架构设计?一条 MySQL 语句执行的步骤?

该文章专注于面试,面试只要回答关键点即可,不需要对框架有非常深入的回答,如果你想应付面试,是足够了,抓住关键点 面试官:说一说MySQL的架构设计? MySQL的架构设计主要包括以下几个组件: 连接器(Connector):负责与客户端建立连接,并进行身份验证和授权。 查询缓存…...

UPnP协议和SSDP协议

1、两种协议 UPnP协议&#xff1a;Universal Plug and Play&#xff0c;广义的即插即用。UPnP协议的目的&#xff1a;当有新设备连接上网络&#xff0c;网络上的其他设备能够马上知道有新设备加入&#xff0c;然后这些设备能互相宣传和发现彼此&#xff0c;以便能使用和控制彼…...

从零到一:基于LLaMA-Factory的微调实战与核心参数精讲

1. 环境准备与LLaMA-Factory初探 第一次接触LLaMA-Factory时&#xff0c;我对着官方文档发呆了半小时——这个工具链实在太强大了&#xff0c;但新手很容易被各种依赖项劝退。这里分享我的踩坑经验&#xff1a;不要一上来就追求最新版本。去年12月我在RTX 3090上折腾v0.4.0时&a…...

新版药典解读:生物制品生产用动物细胞基质的质量控制修订重点

2025年版《中国药典》已正式实施2个多月&#xff0c;其对生物制品生产用动物细胞基质的质量控制要求进行了重要修订。本次修订对生物制品生产企业和检测机构的影响路径和深度虽有差异&#xff0c;但都指向一个核心转变&#xff1a;从“遵循规定”到“证明科学性”。接下来&…...

从零搭建企业级开源大模型平台:Ollama+Llama3+open-webui实战指南

1. 为什么选择OllamaLlama3open-webui组合&#xff1f; 最近两年大语言模型的发展速度简直让人瞠目结舌&#xff0c;从最初的GPT-3到现在的Llama3&#xff0c;模型能力突飞猛进的同时&#xff0c;部署门槛也在不断降低。作为一个在AI领域摸爬滚打多年的老手&#xff0c;我实测过…...

swoole方案 实时监控大盘推送中心

业务服务 --写--> Kafka ---> Swoole消费 --WebSocket推--> 浏览器ECharts实时刷新Kafka 当缓冲层&#xff0c;业务打点不管推送快不快&#xff0c;Swoole 从 Kafka 拉数据&#xff0c;有新数据就推给所有看板页面。---代码<?php// composer require longlang/php…...

73.基于matlab的weber能量法求解齿轮时变啮合刚度的能够跑出刚度图,通过求解轮齿部分...

73.基于matlab的weber能量法求解齿轮时变啮合刚度的能够跑出刚度图&#xff0c;通过求解轮齿部分变形、基体变形及局部接触变形这三部分的变形&#xff0c;进而求得综合弹性变形&#xff0c;最终求出时变啮合刚度 程序已调通&#xff0c;可直接运行齿轮传动系统的时变啮合刚度计…...

终极指南:如何在Windows电脑上直接安装Android应用

终极指南&#xff1a;如何在Windows电脑上直接安装Android应用 【免费下载链接】APK-Installer An Android Application Installer for Windows 项目地址: https://gitcode.com/GitHub_Trending/ap/APK-Installer APK Installer是一款专为Windows系统设计的Android应用安…...

【linux】Xorg与X Window System的交互机制解析

1. X Window System与Xorg的关系 当你打开Linux电脑看到图形界面时&#xff0c;背后默默工作的就是X Window System。这个诞生于1984年的图形系统至今仍是Linux桌面环境的基石&#xff0c;而Xorg则是它的现代实现版本。简单来说&#xff0c;X Window System定义了图形显示的标准…...

机器学习期末考突击指南:从线性回归到SVM的实战解题技巧

机器学习期末考突击指南&#xff1a;从线性回归到SVM的实战解题技巧 期末考试临近&#xff0c;面对机器学习课程中纷繁复杂的算法和公式&#xff0c;许多同学感到无从下手。本文将从实际考题出发&#xff0c;手把手带你攻克线性回归、朴素贝叶斯和SVM三大核心考点&#xff0c;不…...

4步突破AI算法学习瓶颈:用Excel可视化打开深度学习黑箱

4步突破AI算法学习瓶颈&#xff1a;用Excel可视化打开深度学习黑箱 【免费下载链接】ai-by-hand-excel 项目地址: https://gitcode.com/gh_mirrors/ai/ai-by-hand-excel 传统AI算法学习常陷入"公式理解难、数据流向抽象、参数调整盲目"的三重困境&#xff0c…...

突破数字阅读壁垒:bypass-paywalls-chrome-clean工具深度实战指南

突破数字阅读壁垒&#xff1a;bypass-paywalls-chrome-clean工具深度实战指南 【免费下载链接】bypass-paywalls-chrome-clean 项目地址: https://gitcode.com/GitHub_Trending/by/bypass-paywalls-chrome-clean 在信息获取成本日益增高的今天&#xff0c;优质内容常常…...