Flink 处理函数(1)—— 基本处理函数
在 Flink 的多层 API中,处理函数是最底层的API,是所有转换算子的一个概括性的表达,可以自定义处理逻辑
在处理函数中,我们直面的就是数据流中最基本的元素:数据事件(event)、状态(state)以及时间(time)。这就相当于对流有了完全的控制权
基本处理函数主要是定义数据流的转换操作,其所对应的函数类为ProcessFunction
处理函数的功能和使用
对于常用的转换算子来说:
- MapFunction只能获取到当前的数据;
- AggregateFunction 中除数据外,还可以获取到当前的状态(以累加器 Accumulator 形式出现);
- RichMapFunction提供了获取运行时上下文的方法 getRuntimeContext();
但是无论那种算子,如果我们想要访问事件的时间戳,或者当前的水位线信息,都是完全做不到的
与时间相关的操作只能用时间窗口去处理,但如果要求对时间有更精细的控制,需要能够获取水位线,甚至要“把控时间”、定义什么时候做什么事,这就不是基本的时间窗口能够实现的了
因此需要使用处理函数
:
- 处理函数提供了一个“定时服务”(TimerService),我们可以通过它访问流中的事件(event)、时间戳(timestamp)、水位线(watermark),甚至可以注册“定时事件”
- 处理函数继承了
AbstractRichFunction
抽象类,所以拥有富函数类的所有特性,同样可以访问状态(state)和其他运行时信息 - 处理函数还可以直接将数据输出到侧输出流(side output)中
处理函数的简单使用:基于 DataStream 调用.process()
方法就;方法需要传入一个 ProcessFunction
作为参数,用来定义处理逻辑:
stream.process(new MyProcessFunction())
简单示例:
public class ProcessFunctionTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.addSource(new ClickSource()).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<Event>() {@Overridepublic long extractTimestamp(Event event, long l) {return event.timestamp;}})).process(new ProcessFunction<Event, String>() {@Overridepublic void processElement(Event value, Context ctx, Collector<String> out) throws Exception {if (value.user.equals("Mary")) {out.collect(value.user);} else if (value.user.equals("Bob")) {out.collect(value.user);out.collect(value.user);}System.out.println(ctx.timerService().currentWatermark());}}).print();env.execute();}
}
在 ProcessFunction
中重写了.processElement()
方法(参数:输入,上下文对象,输出),自定义处理逻辑
ProcessFunction 解析
源码解析
源码如下:
public abstract class ProcessFunction<I, O> extends AbstractRichFunction {private static final long serialVersionUID = 1L;/*** Process one element from the input stream.** <p>This function can output zero or more elements using the {@link Collector} parameter and* also update internal state or set timers using the {@link Context} parameter.** @param value The input value.* @param ctx A {@link Context} that allows querying the timestamp of the element and getting a* {@link TimerService} for registering timers and querying the time. The context is only* valid during the invocation of this method, do not store it.* @param out The collector for returning result values.* @throws Exception This method may throw exceptions. Throwing an exception will cause the* operation to fail and may trigger recovery.*/public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;/*** Called when a timer set using {@link TimerService} fires.** @param timestamp The timestamp of the firing timer.* @param ctx An {@link OnTimerContext} that allows querying the timestamp of the firing timer,* querying the {@link TimeDomain} of the firing timer and getting a {@link TimerService}* for registering timers and querying the time. The context is only valid during the* invocation of this method, do not store it.* @param out The collector for returning result values.* @throws Exception This method may throw exceptions. Throwing an exception will cause the* operation to fail and may trigger recovery.*/public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}/*** Information available in an invocation of {@link #processElement(Object, Context, Collector)}* or {@link #onTimer(long, OnTimerContext, Collector)}.*/public abstract class Context {/*** Timestamp of the element currently being processed or timestamp of a firing timer.** <p>This might be {@code null}, for example if the time characteristic of your program is* set to {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime}.*/public abstract Long timestamp();/** A {@link TimerService} for querying time and registering timers. */public abstract TimerService timerService();/*** Emits a record to the side output identified by the {@link OutputTag}.** @param outputTag the {@code OutputTag} that identifies the side output to emit to.* @param value The record to emit.*/public abstract <X> void output(OutputTag<X> outputTag, X value);}/*** Information available in an invocation of {@link #onTimer(long, OnTimerContext, Collector)}.*/public abstract class OnTimerContext extends Context {/** The {@link TimeDomain} of the firing timer. */public abstract TimeDomain timeDomain();}
}
可以看到抽象类 ProcessFunction
继承了 AbstractRichFunction
,有两个泛型类型参数:
I 表示 Input,也就是输入的数据类型;O 表示 Output,也就是处理完成之后输出的数据类型
其内部单独定义了两个方法:一个是必须要实现的抽象方法.processElement()
;另一个是非抽象方法.onTimer()
.processElement()
:用于“处理元素”,定义了处理的核心逻辑。这个方法对于流中的每个元素都会调用一次,参数包括三个:输入数据值 value,上下文 ctx,以及“收集器”(Collector)out
。方法没有返回值,处理之后的输出数据是通过收集器 out 来定义- value:当前流中的输入元素,也就是正在处理的数据,类型与流中数据类型一致
- ctx:类型是
ProcessFunction
中定义的内部抽象类Context
,表示当前运行的上下文,可以获取到当前的时间戳,并提供了用于查询时间和注册定时器的“定时服务”(TimerService
),以及可以将数据发送到“侧输出流”(side output)的方法.output() - out:“收集器”(类型为 Collector),用于返回输出数据。使用方式与 flatMap算子中的收集器完全一样,直接调用
out.collect()
方法就可以向下游发出一个数据。这个方法可以多次调用,也可以不调用
.onTimer()
:用于定义定时触发的操作;这个方法只有在注册好的定时器触发的时候才会调用(在 Flink 中,只有“按键分区流”KeyedStream
才支持设置定时器的操作),而定时器是通过“定时服务”TimerService
来注册的- 参数:时间戳(timestamp),上下文(ctx),收集器(out)【这里的时间戳是指设置好的触发时间,在事件时间语义下就是水位线】
利用onTimer,可以自定义数据按照时间分组、定时触发计算输出结果,这样就实现了窗口的功能
处理函数分类
- ProcessFunction:最基本的处理函数,基于
DataStream
直接调用.process()时作为参数传入 - KeyedProcessFunction:对流按键分区后的处理函数,基于
KeyedStream
调用.process()时作为参数传入(要想使用定时器必须基于 KeyedStream ) - ProcessWindowFunction:开窗之后的处理函数,也是全窗口函数的代表。基于
WindowedStream
调用.process()时作为参数传入 - ProcessAllWindowFunction:开窗之后的处理函数,基于
AllWindowedStream
调用.process()时作为参数传入 - CoProcessFunction:合并(connect)两条流之后的处理函数,基于
ConnectedStreams
调用.process()时作为参数传入 - ProcessJoinFunction:间隔连接(interval join)两条流之后的处理函数,基于
IntervalJoined
调用.process()时作为参数传入 - BroadcastProcessFunction:广播连接流处理函数,基于
BroadcastConnectedStream
调用.process()时作为参数传入(这里的“广播连接流”BroadcastConnectedStream,是一个未 keyBy 的普通 DataStream 与一个广播流(BroadcastStream)做连接(conncet)之后的产物) - KeyedBroadcastProcessFunction:按键分区的广播连接流处理函数,同样是基于
BroadcastConnectedStream
调用.process()时作为参数传入(这时的广播连接流,是一个KeyedStream
与广播流(BroadcastStream)做连接之后的产物)
学习课程链接:【尚硅谷】Flink1.13实战教程(涵盖所有flink-Java知识点)_哔哩哔哩_bilibili
相关文章:

Flink 处理函数(1)—— 基本处理函数
在 Flink 的多层 API中,处理函数是最底层的API,是所有转换算子的一个概括性的表达,可以自定义处理逻辑 在处理函数中,我们直面的就是数据流中最基本的元素:数据事件(event)、状态(st…...

Linux系统下编译MPlayer
一、编译MPlayer 在 http://www.mplayerhq.hu/design7/dload.html 下载MPlayer源码 执行命令: tar -xf MPlayer-1.5.tar.xz cd MPlayer-1.5 ./configure --prefix$(pwd)/install --yasm make make install 然后在install/bin目录下即会生成mplayer的可执行文件 二…...
事务的ACID属性是什么?为什么它们很重要?
引言 在现代的数据库和事务处理系统中,事务处理是一项非常重要的技术。在数据库中,事务是指一组被视为单个逻辑操作单元的SQL语句序列,它们要么全部成功执行,要么全部不执行。事务可以确保数据库在执行时保持一致性和可靠性。ACI…...

计算机毕业设计 基于Java的手机销售网站的设计与实现 Java实战项目 附源码+文档+视频讲解
博主介绍:✌从事软件开发10年之余,专注于Java技术领域、Python人工智能及数据挖掘、小程序项目开发和Android项目开发等。CSDN、掘金、华为云、InfoQ、阿里云等平台优质作者✌ 🍅文末获取源码联系🍅 👇🏻 精…...

Redis相关命令详解及其原理
Redis概念 Redis,英文全称是remote dictionary service,也就是远程字典服务。这是kv存储数据库。Redis,包括所有的数据库,都是请求-回应模式,通俗来说就是数据库不会主动地要给前台推送数据,只有前台发送了…...
go语言中的GoMock
GoMock是一个Go框架。它与内置的测试包整合得很好,并在单元测试时提供了灵活性。正如我们所知,对具有外部资源(数据库、网络和文件)或依赖关系的代码进行单元测试总是很麻烦。 安装 为了使用GoMock,我们需要安装gomo…...

DIFFWAVE: A VERSATILE DIFFUSION MODEL FOR AUDIO SYNTHESIS (Paper reading)
DIFFWAVE: A VERSATILE DIFFUSION MODEL FOR AUDIO SYNTHESIS Zhifeng Kong, Computer Science and Engineering, UCSD, ICLR2021, Code, Paper 1. 前言 在这项工作中,我们提出了DiffWave,这是一种用于条件和无条件波形生成的多功能扩散概率模型。该模…...

排序算法8----归并排序(非递归)(C)
1、介绍 归并排序既可以是内排序(在内存上的数据排序),也可以是外排序(磁盘上)(硬盘)(在文件中的数据排序)。 其他排序一般都是内排序。 区别于快速排序的非递归…...

Golang 里的 context
context 的作用 go 的编程中,常常会在一个 goroutine 中启动多个 goroutine,然后有可能在这些 goroutine 中又启动多个 goroutine。 如上图,在 main 函数中,启动了一个 goroutine A 和 goroutine B,然后 goroutine A …...

PHP短链接url还原成长链接
在开发过程中,碰到了需要校验用户回填的短链接是不是系统所需要的,于是就需要还原找出短链接所对应的长链接。 长链接转短链接 在百度上搜索程序员,跳转页面后的url就是一个长链接。当然你可以从任何地方复制一个长链接过来。 长链接 http…...
redis原理(三)redis命令
一、字符串命令: 1、字符串基本操作: 2、自增自减 :如果一个值可以被解释为十进制整数或者浮点数,redis允许用户对这个字符串进行INCR*、DECR*操作。 (1)INCR key:将键存储的值的值加1。 &a…...
教程:在Django中实现微信授权登录
教程:在Django中实现微信授权登录 本教程将引导您如何在Django项目中实现微信授权登录。在本教程中,我们将使用自定义的用户模型User,并通过微信提供的API来进行用户认证。 在进行以下教程之前,请确保你已经在微信开放平台添加了…...

YOLOv5改进 | 主干篇 | 12月份最新成果TransNeXt特征提取网络(全网首发)
一、本文介绍 本文给大家带来的改进机制是TransNeXt特征提取网络,其发表于2023年的12月份是一个最新最前沿的网络模型,将其应用在我们的特征提取网络来提取特征,同时本文给大家解决其自带的一个报错,通过结合聚合的像素聚焦注意力和卷积GLU&…...

【java八股文】之计算机网络系列篇
1、TCP/IP和UDP模型 TCP/IP分层(4层):应用层,传输层,网络层,数据链路层 网络的七层架构 (7层):应用层,表示层,会话层,传输层ÿ…...

SpringAMQP的使用
1. 简介: SpringAMQP是基于RabbitMQ封装的一套模板,并且还利用SpringBoot对其实现了自动装配,使用起来非常方便。 SpringAmqp的官方地址:https://spring.io/projects/spring-amqp SpringAMQP提供了三个功能: 自动声…...

MATLAB - 使用运动学 DH 参数构建机械臂
系列文章目录 前言 一、 使用 Puma560 机械手机器人的 Denavit-Hartenberg (DH) 参数,逐步建立刚体树形机器人模型。在连接每个关节时,指定其相对 DH 参数。可视化机器人坐标系,并与最终模型进行交互。 DH 参数定义了每个刚体通过关节与其父…...

2024年腾讯云新用户优惠云服务器价格多少?
腾讯云服务器租用价格表:轻量应用服务器2核2G3M价格62元一年、2核2G4M价格118元一年,540元三年、2核4G5M带宽218元一年,2核4G5M带宽756元三年、轻量4核8G12M服务器446元一年、646元15个月,云服务器CVM S5实例2核2G配置280.8元一年…...
如何在原型中实现继承和多态
在JavaScript中,我们可以通过原型链来实现继承。以下是如何在原型中实现继承的例子: // 定义一个动物原型 var Animal function() {}; Animal.prototype.move function() { console.log(‘This animal can move.’); }; // 定义一个狗的原型…...

MySQL/Oracle 的 字符串拼接
目录 MySQL、Oracle 的 字符串拼接1、MySQL 的字符串拼接1.1 CONCAT(str1,str2,...) : 可以拼接多个字符串1.2 CONCAT_WS(separator,str1,str2,...) : 指定分隔符拼接多个字符串1.3 GROUP_CONCAT(expr) : 聚合函数,用于将多行的值连接成一个字符串。 2、Oracle 的字…...

【Java SE语法篇】10.String类
📚博客主页:爱敲代码的小杨. ✨专栏:《Java SE语法》 ❤️感谢大家点赞👍🏻收藏⭐评论✍🏻,您的三连就是我持续更新的动力❤️ 文章目录 前言1. String类1.1 字符串的构造1.2 String对象的比…...
synchronized 学习
学习源: https://www.bilibili.com/video/BV1aJ411V763?spm_id_from333.788.videopod.episodes&vd_source32e1c41a9370911ab06d12fbc36c4ebc 1.应用场景 不超卖,也要考虑性能问题(场景) 2.常见面试问题: sync出…...

智慧工地云平台源码,基于微服务架构+Java+Spring Cloud +UniApp +MySql
智慧工地管理云平台系统,智慧工地全套源码,java版智慧工地源码,支持PC端、大屏端、移动端。 智慧工地聚焦建筑行业的市场需求,提供“平台网络终端”的整体解决方案,提供劳务管理、视频管理、智能监测、绿色施工、安全管…...

python/java环境配置
环境变量放一起 python: 1.首先下载Python Python下载地址:Download Python | Python.org downloads ---windows -- 64 2.安装Python 下面两个,然后自定义,全选 可以把前4个选上 3.环境配置 1)搜高级系统设置 2…...
什么是EULA和DPA
文章目录 EULA(End User License Agreement)DPA(Data Protection Agreement)一、定义与背景二、核心内容三、法律效力与责任四、实际应用与意义 EULA(End User License Agreement) 定义: EULA即…...
C++中string流知识详解和示例
一、概览与类体系 C 提供三种基于内存字符串的流,定义在 <sstream> 中: std::istringstream:输入流,从已有字符串中读取并解析。std::ostringstream:输出流,向内部缓冲区写入内容,最终取…...
C# SqlSugar:依赖注入与仓储模式实践
C# SqlSugar:依赖注入与仓储模式实践 在 C# 的应用开发中,数据库操作是必不可少的环节。为了让数据访问层更加简洁、高效且易于维护,许多开发者会选择成熟的 ORM(对象关系映射)框架,SqlSugar 就是其中备受…...
.Net Framework 4/C# 关键字(非常用,持续更新...)
一、is 关键字 is 关键字用于检查对象是否于给定类型兼容,如果兼容将返回 true,如果不兼容则返回 false,在进行类型转换前,可以先使用 is 关键字判断对象是否与指定类型兼容,如果兼容才进行转换,这样的转换是安全的。 例如有:首先创建一个字符串对象,然后将字符串对象隐…...

中医有效性探讨
文章目录 西医是如何发展到以生物化学为药理基础的现代医学?传统医学奠基期(远古 - 17 世纪)近代医学转型期(17 世纪 - 19 世纪末)现代医学成熟期(20世纪至今) 中医的源远流长和一脉相承远古至…...
Java求职者面试指南:Spring、Spring Boot、MyBatis框架与计算机基础问题解析
Java求职者面试指南:Spring、Spring Boot、MyBatis框架与计算机基础问题解析 一、第一轮提问(基础概念问题) 1. 请解释Spring框架的核心容器是什么?它在Spring中起到什么作用? Spring框架的核心容器是IoC容器&#…...

Netty从入门到进阶(二)
二、Netty入门 1. 概述 1.1 Netty是什么 Netty is an asynchronous event-driven network application framework for rapid development of maintainable high performance protocol servers & clients. Netty是一个异步的、基于事件驱动的网络应用框架,用于…...