Flink笔记整理(五)
Flink笔记整理(五)
文章目录
- Flink笔记整理(五)
- 七、处理函数(最底层最常用最灵活)
- 7.1基本处理函数(ProcessFunction)
- 处理函数的功能和使用
- ProcessFunction解析
- 7.2按键分区处理函数(KeyedProcessFunction)
- 定时器(Timer)和定时服务(TimeService)
- 7.3 窗口处理函数
- 窗口处理函数的使用
- ProcessWindowFunction解析
- 7.4 应用案例——Top N
- 总结
七、处理函数(最底层最常用最灵活)
之前所介绍的流处理API,无论是基本的转换、聚合,还是更为复杂的窗口操作,其实都是基于DataStream进行转换的,所以可以统称为DataStream API。
在Flink更底层,我们可以不定义任何具体的算子(比如map,filter,或者window),而只是提炼出一个统一的“处理”(process)操作——它是所有转换算子的一个概括性的表达,可以自定义处理逻辑,所以这一层接口就被叫作“处理函数”(process function)。

7.1基本处理函数(ProcessFunction)
处理函数的功能和使用
之前学习的转换算子,一般只是针对某种具体操作来定义的,能够拿到的信息比较有限。如果我们想要访问事件的时间戳,或者当前的水位线信息,都是完全做不到的。跟时间相关的操作,目前我们只会用窗口来处理。而在很多应用需求中,要求我们对时间有更精细的控制,需要能够获取水位线,甚至要“把控时间”、定义什么时候做什么事,这就不是基本的时间窗口能够实现的了。
这时就需要使用底层的处理函数。处理函数提供了一个“定时服务”(TimerService),我们可以通过它访问流中的事件(event)、时间戳(timestamp)、水位线(watermark),甚至可以注册“定时事件”。而且处理函数继承了AbstractRichFunction抽象类,所以拥有富函数类的所有特性,同样可以访问状态(state)和其他运行时信息。此外,处理函数还可以直接将数据输出到侧输出流(side output)中。所以,处理函数是最为灵活的处理方法,可以实现各种自定义的业务逻辑。
处理函数的使用与基本的转换操作类似,只需要直接基于DataStream调用.process()方法就可以了。方法需要传入一个ProcessFunction作为参数,用来定义处理逻辑。
stream.process(new MyProcessFunction())
这里ProcessFunction不是接口,而是一个抽象类,继承了AbstractRichFunction;MyProcessFunction是它的一个具体实现。所以所有的处理函数,都是富函数(RichFunction),富函数可以调用的东西这里同样都可以调用。
ProcessFunction解析
在源码中我们可以看到,抽象类ProcessFunction继承了AbstractRichFunction,有两个泛型类型参数:I表示Input,也就是输入的数据类型;O表示Output,也就是处理完成之后输出的数据类型。内部单独定义了两个方法:一个是必须要实现的抽象方法.processElement();另一个是非抽象方法.onTimer()。
public abstract class ProcessFunction<I, O> extends AbstractRichFunction {...public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}...
}
ProcessFunction解析
7.2按键分区处理函数(KeyedProcessFunction)
在上节中提到,只有在KeyedStream中才支持使用TimerService设置定时器的操作。所以一般情况下,我们都是先做了keyBy分区之后,再去定义处理操作;代码中更加常见的处理函数是KeyedProcessFunction。
ProcessFunction解析
定时器(Timer)和定时服务(TimeService)
定时器(Timer)和定时服务(TimeService)及例子
7.3 窗口处理函数
除了KeyedProcessFunction,另外一大类常用的处理函数,就是基于窗口的ProcessWindowFunction和ProcessAllWindowFunction了。在第六章窗口函数的介绍中,我们之前已经简单地使用过窗口处理函数了。
窗口处理函数的使用
进行窗口计算,我们可以直接调用现成的简单聚合方法(sum/max/min),也可以通过调用.reduce()或.aggregate()来自定义一般的增量聚合函数(ReduceFunction/AggregateFucntion);而对于更加复杂、需要窗口信息和额外状态的一些场景,我们还可以直接使用全窗口函数、把数据全部收集保存在窗口内,等到触发窗口计算时再统一处理。窗口处理函数就是一种典型的全窗口函数。
窗口处理函数ProcessWindowFunction的使用与其他窗口函数类似,也是基于WindowedStream直接调用方法就可以,只不过这时调用的是.process()。
stream.keyBy( t -> t.f0 ).window( TumblingEventTimeWindows.of(Time.seconds(10)) ).process(new MyProcessWindowFunction())
ProcessWindowFunction解析
ProcessWindowFunction既是处理函数又是全窗口函数。从名字上也可以推测出,它的本质似乎更倾向于“窗口函数”一些。事实上它的用法也确实跟其他处理函数有很大不同。我们可以从源码中的定义看到这一点:
ProcessWindowFunction解析
7.4 应用案例——Top N
案例需求:实时统计一段时间内的出现次数最多的水位。例如,统计最近10秒钟内出现次数最多的两个水位,并且每5秒钟更新一次。我们知道,这可以用一个滑动窗口来实现。于是就需要开滑动窗口收集传感器的数据,按照不同的水位进行统计,而后汇总排序并最终输出前两名。这其实就是著名的“Top N”问题。
案例实现代码
总结

相关文章:
Flink笔记整理(五)
Flink笔记整理(五) 文章目录 Flink笔记整理(五)七、处理函数(最底层最常用最灵活)7.1基本处理函数(ProcessFunction)处理函数的功能和使用ProcessFunction解析 7.2按键分区处理函数&…...
数据分析概要【数据分析---偏企业】
各位大佬好 ,这里是阿川的博客,祝您变得更强 个人主页:在线OJ的阿川 大佬的支持和鼓励,将是我成长路上最大的动力 阿川水平有限,如有错误,欢迎大佬指正 数据分析概要前 必看 Python 初阶 Python–语言基础…...
PDF编辑器大分享,这三款加速PDF编辑!
嘿,各位办公室的小伙伴们,今儿咱们来聊聊那些让咱们文员生活变得更加轻松愉快的神器——PDF编辑器!作为每天跟文档打交道的“文字魔术师”,选对工具那可真是事半功倍啊。今天,我就从我的亲身体验出发,给大伙…...
Python --Pandas库基础方法(2)
文章目录 Pandas 变量类型的转换查看各列数据类型改变数据类型 重置索引删除行索引和切片seriesDataFrame取列按行列索引选择loc与iloc获取 isin()选择query()的使用排序用索引排序使用变量值排序 修改替换变量值对应数值的替换 数据分组基于拆分进行筛选 分组汇总引用自定义函…...
《Programming from the Ground Up》阅读笔记:p75-p87
《Programming from the Ground Up》学习第4天,p75-p87总结,总计13页。 一、技术总结 1.persistent data p75, Data which is stored in files is called persistent data, because it persists in files that remain on disk even when the program …...
Python面试整理-常用标准库
Python的标准库包含了大量的模块和包,支持各种编程任务,从文件处理、数据序列化,到网络编程等。这些模块预安装在Python中,无需额外安装就可以使用。以下是一些非常有用且常用的标准库模块: 1. os 用于与操作系统进行交互,包括文件和目录管理操作。 import os # 获取当前…...
halcon_C#联合halcon打开摄像头
1. 创建halcon项目 -> 2.测试连接 -> 3. 在halcon中打开摄像头成功 -> 4. 插入代码 -> 5. 导出为.cs文件 6. 创建VS项目 -> 7.将action部分代码嵌入winform -> 8. 编写代码 -> // 导入HalconDotNet命名空间,这是用于Halcon图像处理的…...
无标题栏窗口通过消息模拟拖动窗口时,无法拖动的一个原因
在使用DUI库或者web控件来做窗口和UI时,常常遇到一个问题:整个窗口如果设置了CAPTION区域,那么在CAPTION区域中,web页面的内容无法正常响应鼠标事件,如果不设置CAPTION区域,那么对于窗口的拖动又有影响。在…...
每天一个数据分析题(四百五十四)- 调研问卷
选择题是设计市场调查问卷时常用的题目类型,关于多选题和单选题的优缺点,以下说法不正确的是? A. 多选题相比单选题提供的信息量大。 B. 单选题提供的信息量相对较少,但比较便于后期编码和统计分析。 C. 单选题和多选题可以同时…...
红酒与家居:打造优雅生活空间
在繁忙的都市生活中,我们渴望拥有一处宁静而优雅的家居空间,那里不仅是我们休憩的港湾,更是我们品味生活、享受时光的地方。当定制红酒与家居设计相遇,它们便共同绘制出一幅充满韵味与格调的生活画卷。今天,就让我们一…...
未来生成式 AI 的发展方向,是 Chat 还是 Agent?
什么是生成式AI? 生成式人工智能(Generative AI)是一种人工智能技术,它能够基于已有的数据模式和结构生成新的数据实例,这些实例可以是文本、图像、音频、视频或任何其他类型的数据。这种技术通常依赖于复杂的算法&am…...
powershell@日期和时间命令和对象
文章目录 abstract获取当前日期和时间格式化日期和时间日期计算👺创建自定义日期和时间👺**[datetime] 类型**及其构造函数缺省值计算日期差异获取特定部分的日期和时间比较日期和时间 常用日期操作总结表时间间隔 TimeSpan 👺创建TimeSpan对…...
【Golang 面试 - 基础题】每日 5 题(八)
✍个人博客:Pandaconda-CSDN博客 📣专栏地址:http://t.csdnimg.cn/UWz06 📚专栏简介:在这个专栏中,我将会分享 Golang 面试中常见的面试题给大家~ ❤️如果有收获的话,欢迎点赞👍收藏…...
LeetCode 算法:在排序数组中查找元素的第一个和最后一个位置 c++
原题链接🔗:在排序数组中查找元素的第一个和最后一个位置 难度:中等⭐️⭐️ 题目 给你一个按照非递减顺序排列的整数数组 nums,和一个目标值 target。请你找出给定目标值在数组中的开始位置和结束位置。 如果数组中不存在目标…...
会话存储、本地存储,路由导航守卫、web会话跟踪、JWT生成token、axios请求拦截、响应拦截
1、会话存储、本地存储 前端浏览器中存储用户信息,会话存储、本地存储、cookie 会话存储(sessionStorage):会话期间存储,关闭浏览器后,数据就会销毁 sessionStorage.setItem("account",resp.d…...
strcmp库函数原型
int strcmp(const char *str1, const char *str2) {unsigned const char *s1 (unsigned const char *) str1;unsigned const char *s2 (unsigned const char *) str2;while (*s1 && *s1 *s2) {s1;s2;}return *s1 - *s2; }while (*s1 && *s1 *s2) 一直循环&…...
在 Vue.js 项目中延迟加载子组件
在 Vue.js 中,当父组件渲染时,子组件的生命周期钩子函数会立即执行,即使这些子组件并未显示。这是因为 Vue.js 会在渲染父组件时实例化所有引用的子组件。为了避免不必要的函数执行,我们可以通过使用 v-if 指令和异步组件延迟加载…...
何时会用到设计模式、七大设计原则介绍
以下关于b站尚硅谷相关设计模式视频的总结 设计模式的重要性: 代码重用性(相同的代码,不用编写很多次)、 可读性(编程规范,便于其他程序员阅读和理解)、 可扩展性(增加新功能时&am…...
编程语言发展历史:赋值与相等运算符的变迁历程
本文摘取自笔者书稿《编程语言发展历史》 赋值运算符是编程语言最基础的运算符,其发展历史也非常有趣。最早的赋值语句就是使用等号“”来表示,一些语言为了让赋值运算在数学形式上更加严谨(形如“x x 1”的表达式在数学上不成立࿰…...
求职Leetcode题目(2)
1.柱状图中最大的矩形 据说这是2024年字节二面的题目,我感觉这道题跟接雨水有点类似,最重要的思路还是要找到什么时候能形成矩形的这么个情况,某个范围的矩形的高度,是由最短的柱形来决定的。 我们先整理一下,解决这道…...
【JVM】- 内存结构
引言 JVM:Java Virtual Machine 定义:Java虚拟机,Java二进制字节码的运行环境好处: 一次编写,到处运行自动内存管理,垃圾回收的功能数组下标越界检查(会抛异常,不会覆盖到其他代码…...
Opencv中的addweighted函数
一.addweighted函数作用 addweighted()是OpenCV库中用于图像处理的函数,主要功能是将两个输入图像(尺寸和类型相同)按照指定的权重进行加权叠加(图像融合),并添加一个标量值&#x…...
Module Federation 和 Native Federation 的比较
前言 Module Federation 是 Webpack 5 引入的微前端架构方案,允许不同独立构建的应用在运行时动态共享模块。 Native Federation 是 Angular 官方基于 Module Federation 理念实现的专为 Angular 优化的微前端方案。 概念解析 Module Federation (模块联邦) Modul…...
学习STC51单片机32(芯片为STC89C52RCRC)OLED显示屏2
每日一言 今天的每一份坚持,都是在为未来积攒底气。 案例:OLED显示一个A 这边观察到一个点,怎么雪花了就是都是乱七八糟的占满了屏幕。。 解释 : 如果代码里信号切换太快(比如 SDA 刚变,SCL 立刻变&#…...
Unsafe Fileupload篇补充-木马的详细教程与木马分享(中国蚁剑方式)
在之前的皮卡丘靶场第九期Unsafe Fileupload篇中我们学习了木马的原理并且学了一个简单的木马文件 本期内容是为了更好的为大家解释木马(服务器方面的)的原理,连接,以及各种木马及连接工具的分享 文件木马:https://w…...
免费数学几何作图web平台
光锐软件免费数学工具,maths,数学制图,数学作图,几何作图,几何,AR开发,AR教育,增强现实,软件公司,XR,MR,VR,虚拟仿真,虚拟现实,混合现实,教育科技产品,职业模拟培训,高保真VR场景,结构互动课件,元宇宙http://xaglare.c…...
省略号和可变参数模板
本文主要介绍如何展开可变参数的参数包 1.C语言的va_list展开可变参数 #include <iostream> #include <cstdarg>void printNumbers(int count, ...) {// 声明va_list类型的变量va_list args;// 使用va_start将可变参数写入变量argsva_start(args, count);for (in…...
云原生周刊:k0s 成为 CNCF 沙箱项目
开源项目推荐 HAMi HAMi(原名 k8s‑vGPU‑scheduler)是一款 CNCF Sandbox 级别的开源 K8s 中间件,通过虚拟化 GPU/NPU 等异构设备并支持内存、计算核心时间片隔离及共享调度,为容器提供统一接口,实现细粒度资源配额…...
沙箱虚拟化技术虚拟机容器之间的关系详解
问题 沙箱、虚拟化、容器三者分开一一介绍的话我知道他们各自都是什么东西,但是如果把三者放在一起,它们之间到底什么关系?又有什么联系呢?我不是很明白!!! 就比如说: 沙箱&#…...
智能职业发展系统:AI驱动的职业规划平台技术解析
智能职业发展系统:AI驱动的职业规划平台技术解析 引言:数字时代的职业革命 在当今瞬息万变的就业市场中,传统的职业规划方法已无法满足个人和企业的需求。据统计,全球每年有超过2亿人面临职业转型困境,而企业也因此遭…...
