当前位置: 首页 > news >正文

Flink笔记整理(五)

Flink笔记整理(五)

文章目录

  • Flink笔记整理(五)
  • 七、处理函数(最底层最常用最灵活)
    • 7.1基本处理函数(ProcessFunction)
      • 处理函数的功能和使用
      • ProcessFunction解析
    • 7.2按键分区处理函数(KeyedProcessFunction)
      • 定时器(Timer)和定时服务(TimeService)
    • 7.3 窗口处理函数
      • 窗口处理函数的使用
      • ProcessWindowFunction解析
    • 7.4 应用案例——Top N
  • 总结


七、处理函数(最底层最常用最灵活)

之前所介绍的流处理API,无论是基本的转换、聚合,还是更为复杂的窗口操作,其实都是基于DataStream进行转换的,所以可以统称为DataStream API。

在Flink更底层,我们可以不定义任何具体的算子(比如map,filter,或者window),而只是提炼出一个统一的“处理”(process)操作——它是所有转换算子的一个概括性的表达,可以自定义处理逻辑,所以这一层接口就被叫作“处理函数”(process function)。

在这里插入图片描述

7.1基本处理函数(ProcessFunction)

处理函数的功能和使用

之前学习的转换算子,一般只是针对某种具体操作来定义的,能够拿到的信息比较有限。如果我们想要访问事件的时间戳,或者当前的水位线信息,都是完全做不到的。跟时间相关的操作,目前我们只会用窗口来处理。而在很多应用需求中,要求我们对时间有更精细的控制,需要能够获取水位线,甚至要“把控时间”、定义什么时候做什么事,这就不是基本的时间窗口能够实现的了。

这时就需要使用底层的处理函数。处理函数提供了一个“定时服务”(TimerService),我们可以通过它访问流中的事件(event)、时间戳(timestamp)、水位线(watermark),甚至可以注册“定时事件”。而且处理函数继承了AbstractRichFunction抽象类,所以拥有富函数类的所有特性,同样可以访问状态(state)和其他运行时信息。此外,处理函数还可以直接将数据输出到侧输出流(side output)中。所以,处理函数是最为灵活的处理方法,可以实现各种自定义的业务逻辑。

处理函数的使用与基本的转换操作类似,只需要直接基于DataStream调用.process()方法就可以了。方法需要传入一个ProcessFunction作为参数,用来定义处理逻辑。

stream.process(new MyProcessFunction())

这里ProcessFunction不是接口,而是一个抽象类,继承了AbstractRichFunction;MyProcessFunction是它的一个具体实现。所以所有的处理函数,都是富函数(RichFunction),富函数可以调用的东西这里同样都可以调用。

ProcessFunction解析

在源码中我们可以看到,抽象类ProcessFunction继承了AbstractRichFunction,有两个泛型类型参数:I表示Input,也就是输入的数据类型;O表示Output,也就是处理完成之后输出的数据类型。内部单独定义了两个方法:一个是必须要实现的抽象方法.processElement();另一个是非抽象方法.onTimer()。


public abstract class ProcessFunction<I, O> extends AbstractRichFunction {...public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}...
}

ProcessFunction解析

7.2按键分区处理函数(KeyedProcessFunction)

在上节中提到,只有在KeyedStream中才支持使用TimerService设置定时器的操作。所以一般情况下,我们都是先做了keyBy分区之后,再去定义处理操作;代码中更加常见的处理函数是KeyedProcessFunction。

ProcessFunction解析

定时器(Timer)和定时服务(TimeService)

定时器(Timer)和定时服务(TimeService)及例子

7.3 窗口处理函数

除了KeyedProcessFunction,另外一大类常用的处理函数,就是基于窗口的ProcessWindowFunction和ProcessAllWindowFunction了。在第六章窗口函数的介绍中,我们之前已经简单地使用过窗口处理函数了。

窗口处理函数的使用

进行窗口计算,我们可以直接调用现成的简单聚合方法(sum/max/min),也可以通过调用.reduce()或.aggregate()来自定义一般的增量聚合函数(ReduceFunction/AggregateFucntion);而对于更加复杂、需要窗口信息和额外状态的一些场景,我们还可以直接使用全窗口函数、把数据全部收集保存在窗口内,等到触发窗口计算时再统一处理。窗口处理函数就是一种典型的全窗口函数。

窗口处理函数ProcessWindowFunction的使用与其他窗口函数类似,也是基于WindowedStream直接调用方法就可以,只不过这时调用的是.process()。

stream.keyBy( t -> t.f0 ).window( TumblingEventTimeWindows.of(Time.seconds(10)) ).process(new MyProcessWindowFunction())

ProcessWindowFunction解析

ProcessWindowFunction既是处理函数又是全窗口函数。从名字上也可以推测出,它的本质似乎更倾向于“窗口函数”一些。事实上它的用法也确实跟其他处理函数有很大不同。我们可以从源码中的定义看到这一点:
ProcessWindowFunction解析

7.4 应用案例——Top N

案例需求:实时统计一段时间内的出现次数最多的水位。例如,统计最近10秒钟内出现次数最多的两个水位,并且每5秒钟更新一次。我们知道,这可以用一个滑动窗口来实现。于是就需要开滑动窗口收集传感器的数据,按照不同的水位进行统计,而后汇总排序并最终输出前两名。这其实就是著名的“Top N”问题。
案例实现代码


总结

在这里插入图片描述

相关文章:

Flink笔记整理(五)

Flink笔记整理&#xff08;五&#xff09; 文章目录 Flink笔记整理&#xff08;五&#xff09;七、处理函数&#xff08;最底层最常用最灵活&#xff09;7.1基本处理函数&#xff08;ProcessFunction&#xff09;处理函数的功能和使用ProcessFunction解析 7.2按键分区处理函数&…...

数据分析概要【数据分析---偏企业】

各位大佬好 &#xff0c;这里是阿川的博客&#xff0c;祝您变得更强 个人主页&#xff1a;在线OJ的阿川 大佬的支持和鼓励&#xff0c;将是我成长路上最大的动力 阿川水平有限&#xff0c;如有错误&#xff0c;欢迎大佬指正 数据分析概要前 必看 Python 初阶 Python–语言基础…...

PDF编辑器大分享,这三款加速PDF编辑!

嘿&#xff0c;各位办公室的小伙伴们&#xff0c;今儿咱们来聊聊那些让咱们文员生活变得更加轻松愉快的神器——PDF编辑器&#xff01;作为每天跟文档打交道的“文字魔术师”&#xff0c;选对工具那可真是事半功倍啊。今天&#xff0c;我就从我的亲身体验出发&#xff0c;给大伙…...

Python --Pandas库基础方法(2)

文章目录 Pandas 变量类型的转换查看各列数据类型改变数据类型 重置索引删除行索引和切片seriesDataFrame取列按行列索引选择loc与iloc获取 isin()选择query()的使用排序用索引排序使用变量值排序 修改替换变量值对应数值的替换 数据分组基于拆分进行筛选 分组汇总引用自定义函…...

《Programming from the Ground Up》阅读笔记:p75-p87

《Programming from the Ground Up》学习第4天&#xff0c;p75-p87总结&#xff0c;总计13页。 一、技术总结 1.persistent data p75, Data which is stored in files is called persistent data, because it persists in files that remain on disk even when the program …...

Python面试整理-常用标准库

Python的标准库包含了大量的模块和包,支持各种编程任务,从文件处理、数据序列化,到网络编程等。这些模块预安装在Python中,无需额外安装就可以使用。以下是一些非常有用且常用的标准库模块: 1. os 用于与操作系统进行交互,包括文件和目录管理操作。 import os # 获取当前…...

halcon_C#联合halcon打开摄像头

1. 创建halcon项目 -> 2.测试连接 -> 3. 在halcon中打开摄像头成功 -> 4. 插入代码 -> 5. 导出为.cs文件 6. 创建VS项目 -> 7.将action部分代码嵌入winform -> 8. 编写代码 -> // 导入HalconDotNet命名空间&#xff0c;这是用于Halcon图像处理的…...

无标题栏窗口通过消息模拟拖动窗口时,无法拖动的一个原因

在使用DUI库或者web控件来做窗口和UI时&#xff0c;常常遇到一个问题&#xff1a;整个窗口如果设置了CAPTION区域&#xff0c;那么在CAPTION区域中&#xff0c;web页面的内容无法正常响应鼠标事件&#xff0c;如果不设置CAPTION区域&#xff0c;那么对于窗口的拖动又有影响。在…...

每天一个数据分析题(四百五十四)- 调研问卷

选择题是设计市场调查问卷时常用的题目类型&#xff0c;关于多选题和单选题的优缺点&#xff0c;以下说法不正确的是&#xff1f; A. 多选题相比单选题提供的信息量大。 B. 单选题提供的信息量相对较少&#xff0c;但比较便于后期编码和统计分析。 C. 单选题和多选题可以同时…...

红酒与家居:打造优雅生活空间

在繁忙的都市生活中&#xff0c;我们渴望拥有一处宁静而优雅的家居空间&#xff0c;那里不仅是我们休憩的港湾&#xff0c;更是我们品味生活、享受时光的地方。当定制红酒与家居设计相遇&#xff0c;它们便共同绘制出一幅充满韵味与格调的生活画卷。今天&#xff0c;就让我们一…...

未来生成式 AI 的发展方向,是 Chat 还是 Agent?

什么是生成式AI&#xff1f; 生成式人工智能&#xff08;Generative AI&#xff09;是一种人工智能技术&#xff0c;它能够基于已有的数据模式和结构生成新的数据实例&#xff0c;这些实例可以是文本、图像、音频、视频或任何其他类型的数据。这种技术通常依赖于复杂的算法&am…...

powershell@日期和时间命令和对象

文章目录 abstract获取当前日期和时间格式化日期和时间日期计算&#x1f47a;创建自定义日期和时间&#x1f47a;**[datetime] 类型**及其构造函数缺省值计算日期差异获取特定部分的日期和时间比较日期和时间 常用日期操作总结表时间间隔 TimeSpan &#x1f47a;创建TimeSpan对…...

【Golang 面试 - 基础题】每日 5 题(八)

✍个人博客&#xff1a;Pandaconda-CSDN博客 &#x1f4e3;专栏地址&#xff1a;http://t.csdnimg.cn/UWz06 &#x1f4da;专栏简介&#xff1a;在这个专栏中&#xff0c;我将会分享 Golang 面试中常见的面试题给大家~ ❤️如果有收获的话&#xff0c;欢迎点赞&#x1f44d;收藏…...

LeetCode 算法:在排序数组中查找元素的第一个和最后一个位置 c++

原题链接&#x1f517;&#xff1a;在排序数组中查找元素的第一个和最后一个位置 难度&#xff1a;中等⭐️⭐️ 题目 给你一个按照非递减顺序排列的整数数组 nums&#xff0c;和一个目标值 target。请你找出给定目标值在数组中的开始位置和结束位置。 如果数组中不存在目标…...

会话存储、本地存储,路由导航守卫、web会话跟踪、JWT生成token、axios请求拦截、响应拦截

1、会话存储、本地存储 前端浏览器中存储用户信息&#xff0c;会话存储、本地存储、cookie 会话存储&#xff08;sessionStorage&#xff09;&#xff1a;会话期间存储&#xff0c;关闭浏览器后&#xff0c;数据就会销毁 sessionStorage.setItem("account",resp.d…...

strcmp库函数原型

int strcmp(const char *str1, const char *str2) {unsigned const char *s1 (unsigned const char *) str1;unsigned const char *s2 (unsigned const char *) str2;while (*s1 && *s1 *s2) {s1;s2;}return *s1 - *s2; }while (*s1 && *s1 *s2) 一直循环&…...

在 Vue.js 项目中延迟加载子组件

在 Vue.js 中&#xff0c;当父组件渲染时&#xff0c;子组件的生命周期钩子函数会立即执行&#xff0c;即使这些子组件并未显示。这是因为 Vue.js 会在渲染父组件时实例化所有引用的子组件。为了避免不必要的函数执行&#xff0c;我们可以通过使用 v-if 指令和异步组件延迟加载…...

何时会用到设计模式、七大设计原则介绍

以下关于b站尚硅谷相关设计模式视频的总结 设计模式的重要性&#xff1a; 代码重用性&#xff08;相同的代码&#xff0c;不用编写很多次&#xff09;、 可读性&#xff08;编程规范&#xff0c;便于其他程序员阅读和理解&#xff09;、 可扩展性&#xff08;增加新功能时&am…...

编程语言发展历史:赋值与相等运算符的变迁历程

本文摘取自笔者书稿《编程语言发展历史》 赋值运算符是编程语言最基础的运算符&#xff0c;其发展历史也非常有趣。最早的赋值语句就是使用等号“”来表示&#xff0c;一些语言为了让赋值运算在数学形式上更加严谨&#xff08;形如“x x 1”的表达式在数学上不成立&#xff0…...

求职Leetcode题目(2)

1.柱状图中最大的矩形 据说这是2024年字节二面的题目&#xff0c;我感觉这道题跟接雨水有点类似&#xff0c;最重要的思路还是要找到什么时候能形成矩形的这么个情况&#xff0c;某个范围的矩形的高度&#xff0c;是由最短的柱形来决定的。 我们先整理一下&#xff0c;解决这道…...

如何让抓取手机日志---ADB 从入门到实战:小米14日志抓包与连接详解

一、ADB 是什么&#xff1f; ADB 的全称是 Android Debug Bridge&#xff08;安卓调试桥&#xff09;。顾名思义&#xff0c;它就像一座桥梁&#xff0c;连接你的电脑和安卓手机。 Debug&#xff08;调试&#xff09;&#xff1a;它的核心用途是帮助开发者调试应用、分析问题。…...

快速上手:ClaudeCode安装全攻略

以下是从零开始安装 Claude Code 的详细操作步骤&#xff0c;涵盖环境准备、安装过程与验证方法。请根据你的操作系统选择对应的分支操作。 (PS: 官方文档&#xff1a; 接入 Claude Code | DeepSeek API Docs) 一、安装 Node.js 18 或更高版本 Claude Code 基于 Node.js 运行…...

PdrER算法:扩展解析在模型检查中的高效应用

1. PdrER算法核心原理与技术突破1.1 传统PDR算法的局限性分析Property Directed Reachability&#xff08;PDR&#xff0c;也称为IC3&#xff09;是当前最先进的模型检查算法之一&#xff0c;广泛应用于硬件和软件系统的安全属性验证。该算法通过构建归纳不变量&#xff08;ind…...

不跨界,现有的地盘就会被别人用跨界的方式蚕食掉

微软这么多员工养着&#xff0c;有时也不得不多个行业发展&#xff0c;就像是美团一样&#xff0c;不得不电商也做起来和京东抢生意。阿里也同时多个行业做着&#xff0c;影视&#xff0c;外卖&#xff0c;生鲜。否则纯电商做不下去就完了。就像是华为一样本来可以卖AI服务器&a…...

Unity脚本修改源资源的底层机制与高危避坑指南

1. 这不是“改个文件”那么简单&#xff1a;Unity里脚本动源资源的真实边界与风险认知很多人第一次在Unity里写AssetDatabase.SaveAssets()时&#xff0c;心里想的是&#xff1a;“不就是保存一下修改嘛&#xff0c;跟编辑器里点CtrlS一样简单。”我当年也是这么想的——直到上…...

别再手动拖拽了!用CodeWave自由布局5分钟搞定一个高还原度后台管理页

5分钟高保真还原设计稿&#xff1a;CodeWave自由布局实战指南 每次拿到设计师发来的Figma稿子&#xff0c;你是不是也经历过这样的痛苦&#xff1f;在传统开发工具里手动调整像素级间距&#xff0c;反复比对色值&#xff0c;调试响应式效果到深夜…上周我接手一个电商后台改版项…...

7步搞定MASA全家桶汉化包:让你的Minecraft模组说中文

7步搞定MASA全家桶汉化包&#xff1a;让你的Minecraft模组说中文 【免费下载链接】masa-mods-chinese 一个masa mods的汉化资源包 项目地址: https://gitcode.com/gh_mirrors/ma/masa-mods-chinese 还在为MASA模组的英文界面而烦恼吗&#xff1f;作为中文Minecraft玩家&…...

LoftQ量化技术终极指南:如何在4bit精度下高效微调大语言模型

LoftQ量化技术终极指南&#xff1a;如何在4bit精度下高效微调大语言模型 【免费下载链接】peft &#x1f917; PEFT: State-of-the-art Parameter-Efficient Fine-Tuning. 项目地址: https://gitcode.com/gh_mirrors/pe/peft 在大语言模型(LLM)微调的实践中&#xff0c;…...

城市交通气候适应:从生物滞留池到透水铺装的工程实践

1. 项目概述&#xff1a;当城市交通遇上极端天气干了十几年市政工程&#xff0c;我越来越觉得&#xff0c;现在的城市交通系统就像个“玻璃人”——看着钢筋铁骨&#xff0c;实则脆弱得很。一场暴雨&#xff0c;主干道就能变成“主干河”&#xff1b;连续高温&#xff0c;沥青路…...

DataStore vs SharedPreferences 迁移指南:告别 ANR,拥抱类型安全

DataStore vs SharedPreferences 迁移指南&#xff1a;告别 ANR&#xff0c;拥抱类型安全 一句话收益&#xff1a;掌握从 SharedPreferences 迁移到 Jetpack DataStore 的完整路径&#xff0c;彻底消除主线程 I/O 阻塞与类型安全隐患。 适用版本&#xff1a;Android API 21&…...