当前位置: 首页 > news >正文

Flink笔记整理(五)

Flink笔记整理(五)

文章目录

  • Flink笔记整理(五)
  • 七、处理函数(最底层最常用最灵活)
    • 7.1基本处理函数(ProcessFunction)
      • 处理函数的功能和使用
      • ProcessFunction解析
    • 7.2按键分区处理函数(KeyedProcessFunction)
      • 定时器(Timer)和定时服务(TimeService)
    • 7.3 窗口处理函数
      • 窗口处理函数的使用
      • ProcessWindowFunction解析
    • 7.4 应用案例——Top N
  • 总结


七、处理函数(最底层最常用最灵活)

之前所介绍的流处理API,无论是基本的转换、聚合,还是更为复杂的窗口操作,其实都是基于DataStream进行转换的,所以可以统称为DataStream API。

在Flink更底层,我们可以不定义任何具体的算子(比如map,filter,或者window),而只是提炼出一个统一的“处理”(process)操作——它是所有转换算子的一个概括性的表达,可以自定义处理逻辑,所以这一层接口就被叫作“处理函数”(process function)。

在这里插入图片描述

7.1基本处理函数(ProcessFunction)

处理函数的功能和使用

之前学习的转换算子,一般只是针对某种具体操作来定义的,能够拿到的信息比较有限。如果我们想要访问事件的时间戳,或者当前的水位线信息,都是完全做不到的。跟时间相关的操作,目前我们只会用窗口来处理。而在很多应用需求中,要求我们对时间有更精细的控制,需要能够获取水位线,甚至要“把控时间”、定义什么时候做什么事,这就不是基本的时间窗口能够实现的了。

这时就需要使用底层的处理函数。处理函数提供了一个“定时服务”(TimerService),我们可以通过它访问流中的事件(event)、时间戳(timestamp)、水位线(watermark),甚至可以注册“定时事件”。而且处理函数继承了AbstractRichFunction抽象类,所以拥有富函数类的所有特性,同样可以访问状态(state)和其他运行时信息。此外,处理函数还可以直接将数据输出到侧输出流(side output)中。所以,处理函数是最为灵活的处理方法,可以实现各种自定义的业务逻辑。

处理函数的使用与基本的转换操作类似,只需要直接基于DataStream调用.process()方法就可以了。方法需要传入一个ProcessFunction作为参数,用来定义处理逻辑。

stream.process(new MyProcessFunction())

这里ProcessFunction不是接口,而是一个抽象类,继承了AbstractRichFunction;MyProcessFunction是它的一个具体实现。所以所有的处理函数,都是富函数(RichFunction),富函数可以调用的东西这里同样都可以调用。

ProcessFunction解析

在源码中我们可以看到,抽象类ProcessFunction继承了AbstractRichFunction,有两个泛型类型参数:I表示Input,也就是输入的数据类型;O表示Output,也就是处理完成之后输出的数据类型。内部单独定义了两个方法:一个是必须要实现的抽象方法.processElement();另一个是非抽象方法.onTimer()。


public abstract class ProcessFunction<I, O> extends AbstractRichFunction {...public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}...
}

ProcessFunction解析

7.2按键分区处理函数(KeyedProcessFunction)

在上节中提到,只有在KeyedStream中才支持使用TimerService设置定时器的操作。所以一般情况下,我们都是先做了keyBy分区之后,再去定义处理操作;代码中更加常见的处理函数是KeyedProcessFunction。

ProcessFunction解析

定时器(Timer)和定时服务(TimeService)

定时器(Timer)和定时服务(TimeService)及例子

7.3 窗口处理函数

除了KeyedProcessFunction,另外一大类常用的处理函数,就是基于窗口的ProcessWindowFunction和ProcessAllWindowFunction了。在第六章窗口函数的介绍中,我们之前已经简单地使用过窗口处理函数了。

窗口处理函数的使用

进行窗口计算,我们可以直接调用现成的简单聚合方法(sum/max/min),也可以通过调用.reduce()或.aggregate()来自定义一般的增量聚合函数(ReduceFunction/AggregateFucntion);而对于更加复杂、需要窗口信息和额外状态的一些场景,我们还可以直接使用全窗口函数、把数据全部收集保存在窗口内,等到触发窗口计算时再统一处理。窗口处理函数就是一种典型的全窗口函数。

窗口处理函数ProcessWindowFunction的使用与其他窗口函数类似,也是基于WindowedStream直接调用方法就可以,只不过这时调用的是.process()。

stream.keyBy( t -> t.f0 ).window( TumblingEventTimeWindows.of(Time.seconds(10)) ).process(new MyProcessWindowFunction())

ProcessWindowFunction解析

ProcessWindowFunction既是处理函数又是全窗口函数。从名字上也可以推测出,它的本质似乎更倾向于“窗口函数”一些。事实上它的用法也确实跟其他处理函数有很大不同。我们可以从源码中的定义看到这一点:
ProcessWindowFunction解析

7.4 应用案例——Top N

案例需求:实时统计一段时间内的出现次数最多的水位。例如,统计最近10秒钟内出现次数最多的两个水位,并且每5秒钟更新一次。我们知道,这可以用一个滑动窗口来实现。于是就需要开滑动窗口收集传感器的数据,按照不同的水位进行统计,而后汇总排序并最终输出前两名。这其实就是著名的“Top N”问题。
案例实现代码


总结

在这里插入图片描述

相关文章:

Flink笔记整理(五)

Flink笔记整理&#xff08;五&#xff09; 文章目录 Flink笔记整理&#xff08;五&#xff09;七、处理函数&#xff08;最底层最常用最灵活&#xff09;7.1基本处理函数&#xff08;ProcessFunction&#xff09;处理函数的功能和使用ProcessFunction解析 7.2按键分区处理函数&…...

数据分析概要【数据分析---偏企业】

各位大佬好 &#xff0c;这里是阿川的博客&#xff0c;祝您变得更强 个人主页&#xff1a;在线OJ的阿川 大佬的支持和鼓励&#xff0c;将是我成长路上最大的动力 阿川水平有限&#xff0c;如有错误&#xff0c;欢迎大佬指正 数据分析概要前 必看 Python 初阶 Python–语言基础…...

PDF编辑器大分享,这三款加速PDF编辑!

嘿&#xff0c;各位办公室的小伙伴们&#xff0c;今儿咱们来聊聊那些让咱们文员生活变得更加轻松愉快的神器——PDF编辑器&#xff01;作为每天跟文档打交道的“文字魔术师”&#xff0c;选对工具那可真是事半功倍啊。今天&#xff0c;我就从我的亲身体验出发&#xff0c;给大伙…...

Python --Pandas库基础方法(2)

文章目录 Pandas 变量类型的转换查看各列数据类型改变数据类型 重置索引删除行索引和切片seriesDataFrame取列按行列索引选择loc与iloc获取 isin()选择query()的使用排序用索引排序使用变量值排序 修改替换变量值对应数值的替换 数据分组基于拆分进行筛选 分组汇总引用自定义函…...

《Programming from the Ground Up》阅读笔记:p75-p87

《Programming from the Ground Up》学习第4天&#xff0c;p75-p87总结&#xff0c;总计13页。 一、技术总结 1.persistent data p75, Data which is stored in files is called persistent data, because it persists in files that remain on disk even when the program …...

Python面试整理-常用标准库

Python的标准库包含了大量的模块和包,支持各种编程任务,从文件处理、数据序列化,到网络编程等。这些模块预安装在Python中,无需额外安装就可以使用。以下是一些非常有用且常用的标准库模块: 1. os 用于与操作系统进行交互,包括文件和目录管理操作。 import os # 获取当前…...

halcon_C#联合halcon打开摄像头

1. 创建halcon项目 -> 2.测试连接 -> 3. 在halcon中打开摄像头成功 -> 4. 插入代码 -> 5. 导出为.cs文件 6. 创建VS项目 -> 7.将action部分代码嵌入winform -> 8. 编写代码 -> // 导入HalconDotNet命名空间&#xff0c;这是用于Halcon图像处理的…...

无标题栏窗口通过消息模拟拖动窗口时,无法拖动的一个原因

在使用DUI库或者web控件来做窗口和UI时&#xff0c;常常遇到一个问题&#xff1a;整个窗口如果设置了CAPTION区域&#xff0c;那么在CAPTION区域中&#xff0c;web页面的内容无法正常响应鼠标事件&#xff0c;如果不设置CAPTION区域&#xff0c;那么对于窗口的拖动又有影响。在…...

每天一个数据分析题(四百五十四)- 调研问卷

选择题是设计市场调查问卷时常用的题目类型&#xff0c;关于多选题和单选题的优缺点&#xff0c;以下说法不正确的是&#xff1f; A. 多选题相比单选题提供的信息量大。 B. 单选题提供的信息量相对较少&#xff0c;但比较便于后期编码和统计分析。 C. 单选题和多选题可以同时…...

红酒与家居:打造优雅生活空间

在繁忙的都市生活中&#xff0c;我们渴望拥有一处宁静而优雅的家居空间&#xff0c;那里不仅是我们休憩的港湾&#xff0c;更是我们品味生活、享受时光的地方。当定制红酒与家居设计相遇&#xff0c;它们便共同绘制出一幅充满韵味与格调的生活画卷。今天&#xff0c;就让我们一…...

未来生成式 AI 的发展方向,是 Chat 还是 Agent?

什么是生成式AI&#xff1f; 生成式人工智能&#xff08;Generative AI&#xff09;是一种人工智能技术&#xff0c;它能够基于已有的数据模式和结构生成新的数据实例&#xff0c;这些实例可以是文本、图像、音频、视频或任何其他类型的数据。这种技术通常依赖于复杂的算法&am…...

powershell@日期和时间命令和对象

文章目录 abstract获取当前日期和时间格式化日期和时间日期计算&#x1f47a;创建自定义日期和时间&#x1f47a;**[datetime] 类型**及其构造函数缺省值计算日期差异获取特定部分的日期和时间比较日期和时间 常用日期操作总结表时间间隔 TimeSpan &#x1f47a;创建TimeSpan对…...

【Golang 面试 - 基础题】每日 5 题(八)

✍个人博客&#xff1a;Pandaconda-CSDN博客 &#x1f4e3;专栏地址&#xff1a;http://t.csdnimg.cn/UWz06 &#x1f4da;专栏简介&#xff1a;在这个专栏中&#xff0c;我将会分享 Golang 面试中常见的面试题给大家~ ❤️如果有收获的话&#xff0c;欢迎点赞&#x1f44d;收藏…...

LeetCode 算法:在排序数组中查找元素的第一个和最后一个位置 c++

原题链接&#x1f517;&#xff1a;在排序数组中查找元素的第一个和最后一个位置 难度&#xff1a;中等⭐️⭐️ 题目 给你一个按照非递减顺序排列的整数数组 nums&#xff0c;和一个目标值 target。请你找出给定目标值在数组中的开始位置和结束位置。 如果数组中不存在目标…...

会话存储、本地存储,路由导航守卫、web会话跟踪、JWT生成token、axios请求拦截、响应拦截

1、会话存储、本地存储 前端浏览器中存储用户信息&#xff0c;会话存储、本地存储、cookie 会话存储&#xff08;sessionStorage&#xff09;&#xff1a;会话期间存储&#xff0c;关闭浏览器后&#xff0c;数据就会销毁 sessionStorage.setItem("account",resp.d…...

strcmp库函数原型

int strcmp(const char *str1, const char *str2) {unsigned const char *s1 (unsigned const char *) str1;unsigned const char *s2 (unsigned const char *) str2;while (*s1 && *s1 *s2) {s1;s2;}return *s1 - *s2; }while (*s1 && *s1 *s2) 一直循环&…...

在 Vue.js 项目中延迟加载子组件

在 Vue.js 中&#xff0c;当父组件渲染时&#xff0c;子组件的生命周期钩子函数会立即执行&#xff0c;即使这些子组件并未显示。这是因为 Vue.js 会在渲染父组件时实例化所有引用的子组件。为了避免不必要的函数执行&#xff0c;我们可以通过使用 v-if 指令和异步组件延迟加载…...

何时会用到设计模式、七大设计原则介绍

以下关于b站尚硅谷相关设计模式视频的总结 设计模式的重要性&#xff1a; 代码重用性&#xff08;相同的代码&#xff0c;不用编写很多次&#xff09;、 可读性&#xff08;编程规范&#xff0c;便于其他程序员阅读和理解&#xff09;、 可扩展性&#xff08;增加新功能时&am…...

编程语言发展历史:赋值与相等运算符的变迁历程

本文摘取自笔者书稿《编程语言发展历史》 赋值运算符是编程语言最基础的运算符&#xff0c;其发展历史也非常有趣。最早的赋值语句就是使用等号“”来表示&#xff0c;一些语言为了让赋值运算在数学形式上更加严谨&#xff08;形如“x x 1”的表达式在数学上不成立&#xff0…...

求职Leetcode题目(2)

1.柱状图中最大的矩形 据说这是2024年字节二面的题目&#xff0c;我感觉这道题跟接雨水有点类似&#xff0c;最重要的思路还是要找到什么时候能形成矩形的这么个情况&#xff0c;某个范围的矩形的高度&#xff0c;是由最短的柱形来决定的。 我们先整理一下&#xff0c;解决这道…...

Java 语言特性(面试系列1)

一、面向对象编程 1. 封装&#xff08;Encapsulation&#xff09; 定义&#xff1a;将数据&#xff08;属性&#xff09;和操作数据的方法绑定在一起&#xff0c;通过访问控制符&#xff08;private、protected、public&#xff09;隐藏内部实现细节。示例&#xff1a; public …...

MySQL 隔离级别:脏读、幻读及不可重复读的原理与示例

一、MySQL 隔离级别 MySQL 提供了四种隔离级别,用于控制事务之间的并发访问以及数据的可见性,不同隔离级别对脏读、幻读、不可重复读这几种并发数据问题有着不同的处理方式,具体如下: 隔离级别脏读不可重复读幻读性能特点及锁机制读未提交(READ UNCOMMITTED)允许出现允许…...

Objective-C常用命名规范总结

【OC】常用命名规范总结 文章目录 【OC】常用命名规范总结1.类名&#xff08;Class Name)2.协议名&#xff08;Protocol Name)3.方法名&#xff08;Method Name)4.属性名&#xff08;Property Name&#xff09;5.局部变量/实例变量&#xff08;Local / Instance Variables&…...

06 Deep learning神经网络编程基础 激活函数 --吴恩达

深度学习激活函数详解 一、核心作用 引入非线性:使神经网络可学习复杂模式控制输出范围:如Sigmoid将输出限制在(0,1)梯度传递:影响反向传播的稳定性二、常见类型及数学表达 Sigmoid σ ( x ) = 1 1 +...

华为云Flexus+DeepSeek征文|DeepSeek-V3/R1 商用服务开通全流程与本地部署搭建

华为云FlexusDeepSeek征文&#xff5c;DeepSeek-V3/R1 商用服务开通全流程与本地部署搭建 前言 如今大模型其性能出色&#xff0c;华为云 ModelArts Studio_MaaS大模型即服务平台华为云内置了大模型&#xff0c;能助力我们轻松驾驭 DeepSeek-V3/R1&#xff0c;本文中将分享如何…...

AspectJ 在 Android 中的完整使用指南

一、环境配置&#xff08;Gradle 7.0 适配&#xff09; 1. 项目级 build.gradle // 注意&#xff1a;沪江插件已停更&#xff0c;推荐官方兼容方案 buildscript {dependencies {classpath org.aspectj:aspectjtools:1.9.9.1 // AspectJ 工具} } 2. 模块级 build.gradle plu…...

《C++ 模板》

目录 函数模板 类模板 非类型模板参数 模板特化 函数模板特化 类模板的特化 模板&#xff0c;就像一个模具&#xff0c;里面可以将不同类型的材料做成一个形状&#xff0c;其分为函数模板和类模板。 函数模板 函数模板可以简化函数重载的代码。格式&#xff1a;templa…...

关于uniapp展示PDF的解决方案

在 UniApp 的 H5 环境中使用 pdf-vue3 组件可以实现完整的 PDF 预览功能。以下是详细实现步骤和注意事项&#xff1a; 一、安装依赖 安装 pdf-vue3 和 PDF.js 核心库&#xff1a; npm install pdf-vue3 pdfjs-dist二、基本使用示例 <template><view class"con…...

django blank 与 null的区别

1.blank blank控制表单验证时是否允许字段为空 2.null null控制数据库层面是否为空 但是&#xff0c;要注意以下几点&#xff1a; Django的表单验证与null无关&#xff1a;null参数控制的是数据库层面字段是否可以为NULL&#xff0c;而blank参数控制的是Django表单验证时字…...

解析奥地利 XARION激光超声检测系统:无膜光学麦克风 + 无耦合剂的技术协同优势及多元应用

在工业制造领域&#xff0c;无损检测&#xff08;NDT)的精度与效率直接影响产品质量与生产安全。奥地利 XARION开发的激光超声精密检测系统&#xff0c;以非接触式光学麦克风技术为核心&#xff0c;打破传统检测瓶颈&#xff0c;为半导体、航空航天、汽车制造等行业提供了高灵敏…...