当前位置: 首页 > news >正文

Flink笔记整理(五)

Flink笔记整理(五)

文章目录

  • Flink笔记整理(五)
  • 七、处理函数(最底层最常用最灵活)
    • 7.1基本处理函数(ProcessFunction)
      • 处理函数的功能和使用
      • ProcessFunction解析
    • 7.2按键分区处理函数(KeyedProcessFunction)
      • 定时器(Timer)和定时服务(TimeService)
    • 7.3 窗口处理函数
      • 窗口处理函数的使用
      • ProcessWindowFunction解析
    • 7.4 应用案例——Top N
  • 总结


七、处理函数(最底层最常用最灵活)

之前所介绍的流处理API,无论是基本的转换、聚合,还是更为复杂的窗口操作,其实都是基于DataStream进行转换的,所以可以统称为DataStream API。

在Flink更底层,我们可以不定义任何具体的算子(比如map,filter,或者window),而只是提炼出一个统一的“处理”(process)操作——它是所有转换算子的一个概括性的表达,可以自定义处理逻辑,所以这一层接口就被叫作“处理函数”(process function)。

在这里插入图片描述

7.1基本处理函数(ProcessFunction)

处理函数的功能和使用

之前学习的转换算子,一般只是针对某种具体操作来定义的,能够拿到的信息比较有限。如果我们想要访问事件的时间戳,或者当前的水位线信息,都是完全做不到的。跟时间相关的操作,目前我们只会用窗口来处理。而在很多应用需求中,要求我们对时间有更精细的控制,需要能够获取水位线,甚至要“把控时间”、定义什么时候做什么事,这就不是基本的时间窗口能够实现的了。

这时就需要使用底层的处理函数。处理函数提供了一个“定时服务”(TimerService),我们可以通过它访问流中的事件(event)、时间戳(timestamp)、水位线(watermark),甚至可以注册“定时事件”。而且处理函数继承了AbstractRichFunction抽象类,所以拥有富函数类的所有特性,同样可以访问状态(state)和其他运行时信息。此外,处理函数还可以直接将数据输出到侧输出流(side output)中。所以,处理函数是最为灵活的处理方法,可以实现各种自定义的业务逻辑。

处理函数的使用与基本的转换操作类似,只需要直接基于DataStream调用.process()方法就可以了。方法需要传入一个ProcessFunction作为参数,用来定义处理逻辑。

stream.process(new MyProcessFunction())

这里ProcessFunction不是接口,而是一个抽象类,继承了AbstractRichFunction;MyProcessFunction是它的一个具体实现。所以所有的处理函数,都是富函数(RichFunction),富函数可以调用的东西这里同样都可以调用。

ProcessFunction解析

在源码中我们可以看到,抽象类ProcessFunction继承了AbstractRichFunction,有两个泛型类型参数:I表示Input,也就是输入的数据类型;O表示Output,也就是处理完成之后输出的数据类型。内部单独定义了两个方法:一个是必须要实现的抽象方法.processElement();另一个是非抽象方法.onTimer()。


public abstract class ProcessFunction<I, O> extends AbstractRichFunction {...public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}...
}

ProcessFunction解析

7.2按键分区处理函数(KeyedProcessFunction)

在上节中提到,只有在KeyedStream中才支持使用TimerService设置定时器的操作。所以一般情况下,我们都是先做了keyBy分区之后,再去定义处理操作;代码中更加常见的处理函数是KeyedProcessFunction。

ProcessFunction解析

定时器(Timer)和定时服务(TimeService)

定时器(Timer)和定时服务(TimeService)及例子

7.3 窗口处理函数

除了KeyedProcessFunction,另外一大类常用的处理函数,就是基于窗口的ProcessWindowFunction和ProcessAllWindowFunction了。在第六章窗口函数的介绍中,我们之前已经简单地使用过窗口处理函数了。

窗口处理函数的使用

进行窗口计算,我们可以直接调用现成的简单聚合方法(sum/max/min),也可以通过调用.reduce()或.aggregate()来自定义一般的增量聚合函数(ReduceFunction/AggregateFucntion);而对于更加复杂、需要窗口信息和额外状态的一些场景,我们还可以直接使用全窗口函数、把数据全部收集保存在窗口内,等到触发窗口计算时再统一处理。窗口处理函数就是一种典型的全窗口函数。

窗口处理函数ProcessWindowFunction的使用与其他窗口函数类似,也是基于WindowedStream直接调用方法就可以,只不过这时调用的是.process()。

stream.keyBy( t -> t.f0 ).window( TumblingEventTimeWindows.of(Time.seconds(10)) ).process(new MyProcessWindowFunction())

ProcessWindowFunction解析

ProcessWindowFunction既是处理函数又是全窗口函数。从名字上也可以推测出,它的本质似乎更倾向于“窗口函数”一些。事实上它的用法也确实跟其他处理函数有很大不同。我们可以从源码中的定义看到这一点:
ProcessWindowFunction解析

7.4 应用案例——Top N

案例需求:实时统计一段时间内的出现次数最多的水位。例如,统计最近10秒钟内出现次数最多的两个水位,并且每5秒钟更新一次。我们知道,这可以用一个滑动窗口来实现。于是就需要开滑动窗口收集传感器的数据,按照不同的水位进行统计,而后汇总排序并最终输出前两名。这其实就是著名的“Top N”问题。
案例实现代码


总结

在这里插入图片描述

相关文章:

Flink笔记整理(五)

Flink笔记整理&#xff08;五&#xff09; 文章目录 Flink笔记整理&#xff08;五&#xff09;七、处理函数&#xff08;最底层最常用最灵活&#xff09;7.1基本处理函数&#xff08;ProcessFunction&#xff09;处理函数的功能和使用ProcessFunction解析 7.2按键分区处理函数&…...

数据分析概要【数据分析---偏企业】

各位大佬好 &#xff0c;这里是阿川的博客&#xff0c;祝您变得更强 个人主页&#xff1a;在线OJ的阿川 大佬的支持和鼓励&#xff0c;将是我成长路上最大的动力 阿川水平有限&#xff0c;如有错误&#xff0c;欢迎大佬指正 数据分析概要前 必看 Python 初阶 Python–语言基础…...

PDF编辑器大分享,这三款加速PDF编辑!

嘿&#xff0c;各位办公室的小伙伴们&#xff0c;今儿咱们来聊聊那些让咱们文员生活变得更加轻松愉快的神器——PDF编辑器&#xff01;作为每天跟文档打交道的“文字魔术师”&#xff0c;选对工具那可真是事半功倍啊。今天&#xff0c;我就从我的亲身体验出发&#xff0c;给大伙…...

Python --Pandas库基础方法(2)

文章目录 Pandas 变量类型的转换查看各列数据类型改变数据类型 重置索引删除行索引和切片seriesDataFrame取列按行列索引选择loc与iloc获取 isin()选择query()的使用排序用索引排序使用变量值排序 修改替换变量值对应数值的替换 数据分组基于拆分进行筛选 分组汇总引用自定义函…...

《Programming from the Ground Up》阅读笔记:p75-p87

《Programming from the Ground Up》学习第4天&#xff0c;p75-p87总结&#xff0c;总计13页。 一、技术总结 1.persistent data p75, Data which is stored in files is called persistent data, because it persists in files that remain on disk even when the program …...

Python面试整理-常用标准库

Python的标准库包含了大量的模块和包,支持各种编程任务,从文件处理、数据序列化,到网络编程等。这些模块预安装在Python中,无需额外安装就可以使用。以下是一些非常有用且常用的标准库模块: 1. os 用于与操作系统进行交互,包括文件和目录管理操作。 import os # 获取当前…...

halcon_C#联合halcon打开摄像头

1. 创建halcon项目 -> 2.测试连接 -> 3. 在halcon中打开摄像头成功 -> 4. 插入代码 -> 5. 导出为.cs文件 6. 创建VS项目 -> 7.将action部分代码嵌入winform -> 8. 编写代码 -> // 导入HalconDotNet命名空间&#xff0c;这是用于Halcon图像处理的…...

无标题栏窗口通过消息模拟拖动窗口时,无法拖动的一个原因

在使用DUI库或者web控件来做窗口和UI时&#xff0c;常常遇到一个问题&#xff1a;整个窗口如果设置了CAPTION区域&#xff0c;那么在CAPTION区域中&#xff0c;web页面的内容无法正常响应鼠标事件&#xff0c;如果不设置CAPTION区域&#xff0c;那么对于窗口的拖动又有影响。在…...

每天一个数据分析题(四百五十四)- 调研问卷

选择题是设计市场调查问卷时常用的题目类型&#xff0c;关于多选题和单选题的优缺点&#xff0c;以下说法不正确的是&#xff1f; A. 多选题相比单选题提供的信息量大。 B. 单选题提供的信息量相对较少&#xff0c;但比较便于后期编码和统计分析。 C. 单选题和多选题可以同时…...

红酒与家居:打造优雅生活空间

在繁忙的都市生活中&#xff0c;我们渴望拥有一处宁静而优雅的家居空间&#xff0c;那里不仅是我们休憩的港湾&#xff0c;更是我们品味生活、享受时光的地方。当定制红酒与家居设计相遇&#xff0c;它们便共同绘制出一幅充满韵味与格调的生活画卷。今天&#xff0c;就让我们一…...

未来生成式 AI 的发展方向,是 Chat 还是 Agent?

什么是生成式AI&#xff1f; 生成式人工智能&#xff08;Generative AI&#xff09;是一种人工智能技术&#xff0c;它能够基于已有的数据模式和结构生成新的数据实例&#xff0c;这些实例可以是文本、图像、音频、视频或任何其他类型的数据。这种技术通常依赖于复杂的算法&am…...

powershell@日期和时间命令和对象

文章目录 abstract获取当前日期和时间格式化日期和时间日期计算&#x1f47a;创建自定义日期和时间&#x1f47a;**[datetime] 类型**及其构造函数缺省值计算日期差异获取特定部分的日期和时间比较日期和时间 常用日期操作总结表时间间隔 TimeSpan &#x1f47a;创建TimeSpan对…...

【Golang 面试 - 基础题】每日 5 题(八)

✍个人博客&#xff1a;Pandaconda-CSDN博客 &#x1f4e3;专栏地址&#xff1a;http://t.csdnimg.cn/UWz06 &#x1f4da;专栏简介&#xff1a;在这个专栏中&#xff0c;我将会分享 Golang 面试中常见的面试题给大家~ ❤️如果有收获的话&#xff0c;欢迎点赞&#x1f44d;收藏…...

LeetCode 算法:在排序数组中查找元素的第一个和最后一个位置 c++

原题链接&#x1f517;&#xff1a;在排序数组中查找元素的第一个和最后一个位置 难度&#xff1a;中等⭐️⭐️ 题目 给你一个按照非递减顺序排列的整数数组 nums&#xff0c;和一个目标值 target。请你找出给定目标值在数组中的开始位置和结束位置。 如果数组中不存在目标…...

会话存储、本地存储,路由导航守卫、web会话跟踪、JWT生成token、axios请求拦截、响应拦截

1、会话存储、本地存储 前端浏览器中存储用户信息&#xff0c;会话存储、本地存储、cookie 会话存储&#xff08;sessionStorage&#xff09;&#xff1a;会话期间存储&#xff0c;关闭浏览器后&#xff0c;数据就会销毁 sessionStorage.setItem("account",resp.d…...

strcmp库函数原型

int strcmp(const char *str1, const char *str2) {unsigned const char *s1 (unsigned const char *) str1;unsigned const char *s2 (unsigned const char *) str2;while (*s1 && *s1 *s2) {s1;s2;}return *s1 - *s2; }while (*s1 && *s1 *s2) 一直循环&…...

在 Vue.js 项目中延迟加载子组件

在 Vue.js 中&#xff0c;当父组件渲染时&#xff0c;子组件的生命周期钩子函数会立即执行&#xff0c;即使这些子组件并未显示。这是因为 Vue.js 会在渲染父组件时实例化所有引用的子组件。为了避免不必要的函数执行&#xff0c;我们可以通过使用 v-if 指令和异步组件延迟加载…...

何时会用到设计模式、七大设计原则介绍

以下关于b站尚硅谷相关设计模式视频的总结 设计模式的重要性&#xff1a; 代码重用性&#xff08;相同的代码&#xff0c;不用编写很多次&#xff09;、 可读性&#xff08;编程规范&#xff0c;便于其他程序员阅读和理解&#xff09;、 可扩展性&#xff08;增加新功能时&am…...

编程语言发展历史:赋值与相等运算符的变迁历程

本文摘取自笔者书稿《编程语言发展历史》 赋值运算符是编程语言最基础的运算符&#xff0c;其发展历史也非常有趣。最早的赋值语句就是使用等号“”来表示&#xff0c;一些语言为了让赋值运算在数学形式上更加严谨&#xff08;形如“x x 1”的表达式在数学上不成立&#xff0…...

求职Leetcode题目(2)

1.柱状图中最大的矩形 据说这是2024年字节二面的题目&#xff0c;我感觉这道题跟接雨水有点类似&#xff0c;最重要的思路还是要找到什么时候能形成矩形的这么个情况&#xff0c;某个范围的矩形的高度&#xff0c;是由最短的柱形来决定的。 我们先整理一下&#xff0c;解决这道…...

Mybatis逆向工程,动态创建实体类、条件扩展类、Mapper接口、Mapper.xml映射文件

今天呢&#xff0c;博主的学习进度也是步入了Java Mybatis 框架&#xff0c;目前正在逐步杨帆旗航。 那么接下来就给大家出一期有关 Mybatis 逆向工程的教学&#xff0c;希望能对大家有所帮助&#xff0c;也特别欢迎大家指点不足之处&#xff0c;小生很乐意接受正确的建议&…...

cf2117E

原题链接&#xff1a;https://codeforces.com/contest/2117/problem/E 题目背景&#xff1a; 给定两个数组a,b&#xff0c;可以执行多次以下操作&#xff1a;选择 i (1 < i < n - 1)&#xff0c;并设置 或&#xff0c;也可以在执行上述操作前执行一次删除任意 和 。求…...

srs linux

下载编译运行 git clone https:///ossrs/srs.git ./configure --h265on make 编译完成后即可启动SRS # 启动 ./objs/srs -c conf/srs.conf # 查看日志 tail -n 30 -f ./objs/srs.log 开放端口 默认RTMP接收推流端口是1935&#xff0c;SRS管理页面端口是8080&#xff0c;可…...

屋顶变身“发电站” ,中天合创屋面分布式光伏发电项目顺利并网!

5月28日&#xff0c;中天合创屋面分布式光伏发电项目顺利并网发电&#xff0c;该项目位于内蒙古自治区鄂尔多斯市乌审旗&#xff0c;项目利用中天合创聚乙烯、聚丙烯仓库屋面作为场地建设光伏电站&#xff0c;总装机容量为9.96MWp。 项目投运后&#xff0c;每年可节约标煤3670…...

使用van-uploader 的UI组件,结合vue2如何实现图片上传组件的封装

以下是基于 vant-ui&#xff08;适配 Vue2 版本 &#xff09;实现截图中照片上传预览、删除功能&#xff0c;并封装成可复用组件的完整代码&#xff0c;包含样式和逻辑实现&#xff0c;可直接在 Vue2 项目中使用&#xff1a; 1. 封装的图片上传组件 ImageUploader.vue <te…...

C++.OpenGL (10/64)基础光照(Basic Lighting)

基础光照(Basic Lighting) 冯氏光照模型(Phong Lighting Model) #mermaid-svg-GLdskXwWINxNGHso {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-GLdskXwWINxNGHso .error-icon{fill:#552222;}#mermaid-svg-GLd…...

AI病理诊断七剑下天山,医疗未来触手可及

一、病理诊断困局&#xff1a;刀尖上的医学艺术 1.1 金标准背后的隐痛 病理诊断被誉为"诊断的诊断"&#xff0c;医生需通过显微镜观察组织切片&#xff0c;在细胞迷宫中捕捉癌变信号。某省病理质控报告显示&#xff0c;基层医院误诊率达12%-15%&#xff0c;专家会诊…...

在Ubuntu24上采用Wine打开SourceInsight

1. 安装wine sudo apt install wine 2. 安装32位库支持,SourceInsight是32位程序 sudo dpkg --add-architecture i386 sudo apt update sudo apt install wine32:i386 3. 验证安装 wine --version 4. 安装必要的字体和库(解决显示问题) sudo apt install fonts-wqy…...

Java编程之桥接模式

定义 桥接模式&#xff08;Bridge Pattern&#xff09;属于结构型设计模式&#xff0c;它的核心意图是将抽象部分与实现部分分离&#xff0c;使它们可以独立地变化。这种模式通过组合关系来替代继承关系&#xff0c;从而降低了抽象和实现这两个可变维度之间的耦合度。 用例子…...

C++.OpenGL (20/64)混合(Blending)

混合(Blending) 透明效果核心原理 #mermaid-svg-SWG0UzVfJms7Sm3e {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-SWG0UzVfJms7Sm3e .error-icon{fill:#552222;}#mermaid-svg-SWG0UzVfJms7Sm3e .error-text{fill…...