Flink 处理函数(1)—— 基本处理函数
在 Flink 的多层 API中,处理函数是最底层的API,是所有转换算子的一个概括性的表达,可以自定义处理逻辑

在处理函数中,我们直面的就是数据流中最基本的元素:数据事件(event)、状态(state)以及时间(time)。这就相当于对流有了完全的控制权
基本处理函数主要是定义数据流的转换操作,其所对应的函数类为ProcessFunction
处理函数的功能和使用
对于常用的转换算子来说:
- MapFunction只能获取到当前的数据;
- AggregateFunction 中除数据外,还可以获取到当前的状态(以累加器 Accumulator 形式出现);
- RichMapFunction提供了获取运行时上下文的方法 getRuntimeContext();
但是无论那种算子,如果我们想要访问事件的时间戳,或者当前的水位线信息,都是完全做不到的
与时间相关的操作只能用时间窗口去处理,但如果要求对时间有更精细的控制,需要能够获取水位线,甚至要“把控时间”、定义什么时候做什么事,这就不是基本的时间窗口能够实现的了
因此需要使用处理函数:
- 处理函数提供了一个“定时服务”(TimerService),我们可以通过它访问流中的事件(event)、时间戳(timestamp)、水位线(watermark),甚至可以注册“定时事件”
- 处理函数继承了
AbstractRichFunction抽象类,所以拥有富函数类的所有特性,同样可以访问状态(state)和其他运行时信息 - 处理函数还可以直接将数据输出到侧输出流(side output)中
处理函数的简单使用:基于 DataStream 调用.process()方法就;方法需要传入一个 ProcessFunction 作为参数,用来定义处理逻辑:
stream.process(new MyProcessFunction())
简单示例:
public class ProcessFunctionTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.addSource(new ClickSource()).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<Event>() {@Overridepublic long extractTimestamp(Event event, long l) {return event.timestamp;}})).process(new ProcessFunction<Event, String>() {@Overridepublic void processElement(Event value, Context ctx, Collector<String> out) throws Exception {if (value.user.equals("Mary")) {out.collect(value.user);} else if (value.user.equals("Bob")) {out.collect(value.user);out.collect(value.user);}System.out.println(ctx.timerService().currentWatermark());}}).print();env.execute();}
}
在 ProcessFunction 中重写了.processElement()方法(参数:输入,上下文对象,输出),自定义处理逻辑
ProcessFunction 解析
源码解析
源码如下:
public abstract class ProcessFunction<I, O> extends AbstractRichFunction {private static final long serialVersionUID = 1L;/*** Process one element from the input stream.** <p>This function can output zero or more elements using the {@link Collector} parameter and* also update internal state or set timers using the {@link Context} parameter.** @param value The input value.* @param ctx A {@link Context} that allows querying the timestamp of the element and getting a* {@link TimerService} for registering timers and querying the time. The context is only* valid during the invocation of this method, do not store it.* @param out The collector for returning result values.* @throws Exception This method may throw exceptions. Throwing an exception will cause the* operation to fail and may trigger recovery.*/public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;/*** Called when a timer set using {@link TimerService} fires.** @param timestamp The timestamp of the firing timer.* @param ctx An {@link OnTimerContext} that allows querying the timestamp of the firing timer,* querying the {@link TimeDomain} of the firing timer and getting a {@link TimerService}* for registering timers and querying the time. The context is only valid during the* invocation of this method, do not store it.* @param out The collector for returning result values.* @throws Exception This method may throw exceptions. Throwing an exception will cause the* operation to fail and may trigger recovery.*/public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}/*** Information available in an invocation of {@link #processElement(Object, Context, Collector)}* or {@link #onTimer(long, OnTimerContext, Collector)}.*/public abstract class Context {/*** Timestamp of the element currently being processed or timestamp of a firing timer.** <p>This might be {@code null}, for example if the time characteristic of your program is* set to {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime}.*/public abstract Long timestamp();/** A {@link TimerService} for querying time and registering timers. */public abstract TimerService timerService();/*** Emits a record to the side output identified by the {@link OutputTag}.** @param outputTag the {@code OutputTag} that identifies the side output to emit to.* @param value The record to emit.*/public abstract <X> void output(OutputTag<X> outputTag, X value);}/*** Information available in an invocation of {@link #onTimer(long, OnTimerContext, Collector)}.*/public abstract class OnTimerContext extends Context {/** The {@link TimeDomain} of the firing timer. */public abstract TimeDomain timeDomain();}
}
可以看到抽象类 ProcessFunction 继承了 AbstractRichFunction,有两个泛型类型参数:
I 表示 Input,也就是输入的数据类型;O 表示 Output,也就是处理完成之后输出的数据类型
其内部单独定义了两个方法:一个是必须要实现的抽象方法.processElement();另一个是非抽象方法.onTimer()
.processElement():用于“处理元素”,定义了处理的核心逻辑。这个方法对于流中的每个元素都会调用一次,参数包括三个:输入数据值 value,上下文 ctx,以及“收集器”(Collector)out。方法没有返回值,处理之后的输出数据是通过收集器 out 来定义- value:当前流中的输入元素,也就是正在处理的数据,类型与流中数据类型一致
- ctx:类型是
ProcessFunction中定义的内部抽象类Context,表示当前运行的上下文,可以获取到当前的时间戳,并提供了用于查询时间和注册定时器的“定时服务”(TimerService),以及可以将数据发送到“侧输出流”(side output)的方法.output() - out:“收集器”(类型为 Collector),用于返回输出数据。使用方式与 flatMap算子中的收集器完全一样,直接调用
out.collect()方法就可以向下游发出一个数据。这个方法可以多次调用,也可以不调用
.onTimer():用于定义定时触发的操作;这个方法只有在注册好的定时器触发的时候才会调用(在 Flink 中,只有“按键分区流”KeyedStream才支持设置定时器的操作),而定时器是通过“定时服务”TimerService来注册的- 参数:时间戳(timestamp),上下文(ctx),收集器(out)【这里的时间戳是指设置好的触发时间,在事件时间语义下就是水位线】
利用onTimer,可以自定义数据按照时间分组、定时触发计算输出结果,这样就实现了窗口的功能
处理函数分类
- ProcessFunction:最基本的处理函数,基于
DataStream直接调用.process()时作为参数传入 - KeyedProcessFunction:对流按键分区后的处理函数,基于
KeyedStream调用.process()时作为参数传入(要想使用定时器必须基于 KeyedStream ) - ProcessWindowFunction:开窗之后的处理函数,也是全窗口函数的代表。基于
WindowedStream调用.process()时作为参数传入 - ProcessAllWindowFunction:开窗之后的处理函数,基于
AllWindowedStream调用.process()时作为参数传入 - CoProcessFunction:合并(connect)两条流之后的处理函数,基于
ConnectedStreams调用.process()时作为参数传入 - ProcessJoinFunction:间隔连接(interval join)两条流之后的处理函数,基于
IntervalJoined调用.process()时作为参数传入 - BroadcastProcessFunction:广播连接流处理函数,基于
BroadcastConnectedStream调用.process()时作为参数传入(这里的“广播连接流”BroadcastConnectedStream,是一个未 keyBy 的普通 DataStream 与一个广播流(BroadcastStream)做连接(conncet)之后的产物) - KeyedBroadcastProcessFunction:按键分区的广播连接流处理函数,同样是基于
BroadcastConnectedStream调用.process()时作为参数传入(这时的广播连接流,是一个KeyedStream与广播流(BroadcastStream)做连接之后的产物)
学习课程链接:【尚硅谷】Flink1.13实战教程(涵盖所有flink-Java知识点)_哔哩哔哩_bilibili
相关文章:
Flink 处理函数(1)—— 基本处理函数
在 Flink 的多层 API中,处理函数是最底层的API,是所有转换算子的一个概括性的表达,可以自定义处理逻辑 在处理函数中,我们直面的就是数据流中最基本的元素:数据事件(event)、状态(st…...
Linux系统下编译MPlayer
一、编译MPlayer 在 http://www.mplayerhq.hu/design7/dload.html 下载MPlayer源码 执行命令: tar -xf MPlayer-1.5.tar.xz cd MPlayer-1.5 ./configure --prefix$(pwd)/install --yasm make make install 然后在install/bin目录下即会生成mplayer的可执行文件 二…...
事务的ACID属性是什么?为什么它们很重要?
引言 在现代的数据库和事务处理系统中,事务处理是一项非常重要的技术。在数据库中,事务是指一组被视为单个逻辑操作单元的SQL语句序列,它们要么全部成功执行,要么全部不执行。事务可以确保数据库在执行时保持一致性和可靠性。ACI…...
计算机毕业设计 基于Java的手机销售网站的设计与实现 Java实战项目 附源码+文档+视频讲解
博主介绍:✌从事软件开发10年之余,专注于Java技术领域、Python人工智能及数据挖掘、小程序项目开发和Android项目开发等。CSDN、掘金、华为云、InfoQ、阿里云等平台优质作者✌ 🍅文末获取源码联系🍅 👇🏻 精…...
Redis相关命令详解及其原理
Redis概念 Redis,英文全称是remote dictionary service,也就是远程字典服务。这是kv存储数据库。Redis,包括所有的数据库,都是请求-回应模式,通俗来说就是数据库不会主动地要给前台推送数据,只有前台发送了…...
go语言中的GoMock
GoMock是一个Go框架。它与内置的测试包整合得很好,并在单元测试时提供了灵活性。正如我们所知,对具有外部资源(数据库、网络和文件)或依赖关系的代码进行单元测试总是很麻烦。 安装 为了使用GoMock,我们需要安装gomo…...
DIFFWAVE: A VERSATILE DIFFUSION MODEL FOR AUDIO SYNTHESIS (Paper reading)
DIFFWAVE: A VERSATILE DIFFUSION MODEL FOR AUDIO SYNTHESIS Zhifeng Kong, Computer Science and Engineering, UCSD, ICLR2021, Code, Paper 1. 前言 在这项工作中,我们提出了DiffWave,这是一种用于条件和无条件波形生成的多功能扩散概率模型。该模…...
排序算法8----归并排序(非递归)(C)
1、介绍 归并排序既可以是内排序(在内存上的数据排序),也可以是外排序(磁盘上)(硬盘)(在文件中的数据排序)。 其他排序一般都是内排序。 区别于快速排序的非递归…...
Golang 里的 context
context 的作用 go 的编程中,常常会在一个 goroutine 中启动多个 goroutine,然后有可能在这些 goroutine 中又启动多个 goroutine。 如上图,在 main 函数中,启动了一个 goroutine A 和 goroutine B,然后 goroutine A …...
PHP短链接url还原成长链接
在开发过程中,碰到了需要校验用户回填的短链接是不是系统所需要的,于是就需要还原找出短链接所对应的长链接。 长链接转短链接 在百度上搜索程序员,跳转页面后的url就是一个长链接。当然你可以从任何地方复制一个长链接过来。 长链接 http…...
redis原理(三)redis命令
一、字符串命令: 1、字符串基本操作: 2、自增自减 :如果一个值可以被解释为十进制整数或者浮点数,redis允许用户对这个字符串进行INCR*、DECR*操作。 (1)INCR key:将键存储的值的值加1。 &a…...
教程:在Django中实现微信授权登录
教程:在Django中实现微信授权登录 本教程将引导您如何在Django项目中实现微信授权登录。在本教程中,我们将使用自定义的用户模型User,并通过微信提供的API来进行用户认证。 在进行以下教程之前,请确保你已经在微信开放平台添加了…...
YOLOv5改进 | 主干篇 | 12月份最新成果TransNeXt特征提取网络(全网首发)
一、本文介绍 本文给大家带来的改进机制是TransNeXt特征提取网络,其发表于2023年的12月份是一个最新最前沿的网络模型,将其应用在我们的特征提取网络来提取特征,同时本文给大家解决其自带的一个报错,通过结合聚合的像素聚焦注意力和卷积GLU&…...
【java八股文】之计算机网络系列篇
1、TCP/IP和UDP模型 TCP/IP分层(4层):应用层,传输层,网络层,数据链路层 网络的七层架构 (7层):应用层,表示层,会话层,传输层ÿ…...
SpringAMQP的使用
1. 简介: SpringAMQP是基于RabbitMQ封装的一套模板,并且还利用SpringBoot对其实现了自动装配,使用起来非常方便。 SpringAmqp的官方地址:https://spring.io/projects/spring-amqp SpringAMQP提供了三个功能: 自动声…...
MATLAB - 使用运动学 DH 参数构建机械臂
系列文章目录 前言 一、 使用 Puma560 机械手机器人的 Denavit-Hartenberg (DH) 参数,逐步建立刚体树形机器人模型。在连接每个关节时,指定其相对 DH 参数。可视化机器人坐标系,并与最终模型进行交互。 DH 参数定义了每个刚体通过关节与其父…...
2024年腾讯云新用户优惠云服务器价格多少?
腾讯云服务器租用价格表:轻量应用服务器2核2G3M价格62元一年、2核2G4M价格118元一年,540元三年、2核4G5M带宽218元一年,2核4G5M带宽756元三年、轻量4核8G12M服务器446元一年、646元15个月,云服务器CVM S5实例2核2G配置280.8元一年…...
如何在原型中实现继承和多态
在JavaScript中,我们可以通过原型链来实现继承。以下是如何在原型中实现继承的例子: // 定义一个动物原型 var Animal function() {}; Animal.prototype.move function() { console.log(‘This animal can move.’); }; // 定义一个狗的原型…...
MySQL/Oracle 的 字符串拼接
目录 MySQL、Oracle 的 字符串拼接1、MySQL 的字符串拼接1.1 CONCAT(str1,str2,...) : 可以拼接多个字符串1.2 CONCAT_WS(separator,str1,str2,...) : 指定分隔符拼接多个字符串1.3 GROUP_CONCAT(expr) : 聚合函数,用于将多行的值连接成一个字符串。 2、Oracle 的字…...
【Java SE语法篇】10.String类
📚博客主页:爱敲代码的小杨. ✨专栏:《Java SE语法》 ❤️感谢大家点赞👍🏻收藏⭐评论✍🏻,您的三连就是我持续更新的动力❤️ 文章目录 前言1. String类1.1 字符串的构造1.2 String对象的比…...
日语学习-日语知识点小记-构建基础-JLPT-N4阶段(33):にする
日语学习-日语知识点小记-构建基础-JLPT-N4阶段(33):にする 1、前言(1)情况说明(2)工程师的信仰2、知识点(1) にする1,接续:名词+にする2,接续:疑问词+にする3,(A)は(B)にする。(2)復習:(1)复习句子(2)ために & ように(3)そう(4)にする3、…...
【HarmonyOS 5.0】DevEco Testing:鸿蒙应用质量保障的终极武器
——全方位测试解决方案与代码实战 一、工具定位与核心能力 DevEco Testing是HarmonyOS官方推出的一体化测试平台,覆盖应用全生命周期测试需求,主要提供五大核心能力: 测试类型检测目标关键指标功能体验基…...
在四层代理中还原真实客户端ngx_stream_realip_module
一、模块原理与价值 PROXY Protocol 回溯 第三方负载均衡(如 HAProxy、AWS NLB、阿里 SLB)发起上游连接时,将真实客户端 IP/Port 写入 PROXY Protocol v1/v2 头。Stream 层接收到头部后,ngx_stream_realip_module 从中提取原始信息…...
从零实现STL哈希容器:unordered_map/unordered_set封装详解
本篇文章是对C学习的STL哈希容器自主实现部分的学习分享 希望也能为你带来些帮助~ 那咱们废话不多说,直接开始吧! 一、源码结构分析 1. SGISTL30实现剖析 // hash_set核心结构 template <class Value, class HashFcn, ...> class hash_set {ty…...
涂鸦T5AI手搓语音、emoji、otto机器人从入门到实战
“🤖手搓TuyaAI语音指令 😍秒变表情包大师,让萌系Otto机器人🔥玩出智能新花样!开整!” 🤖 Otto机器人 → 直接点明主体 手搓TuyaAI语音 → 强调 自主编程/自定义 语音控制(TuyaAI…...
UR 协作机器人「三剑客」:精密轻量担当(UR7e)、全能协作主力(UR12e)、重型任务专家(UR15)
UR协作机器人正以其卓越性能在现代制造业自动化中扮演重要角色。UR7e、UR12e和UR15通过创新技术和精准设计满足了不同行业的多样化需求。其中,UR15以其速度、精度及人工智能准备能力成为自动化领域的重要突破。UR7e和UR12e则在负载规格和市场定位上不断优化…...
【论文阅读28】-CNN-BiLSTM-Attention-(2024)
本文把滑坡位移序列拆开、筛优质因子,再用 CNN-BiLSTM-Attention 来动态预测每个子序列,最后重构出总位移,预测效果超越传统模型。 文章目录 1 引言2 方法2.1 位移时间序列加性模型2.2 变分模态分解 (VMD) 具体步骤2.3.1 样本熵(S…...
【碎碎念】宝可梦 Mesh GO : 基于MESH网络的口袋妖怪 宝可梦GO游戏自组网系统
目录 游戏说明《宝可梦 Mesh GO》 —— 局域宝可梦探索Pokmon GO 类游戏核心理念应用场景Mesh 特性 宝可梦玩法融合设计游戏构想要素1. 地图探索(基于物理空间 广播范围)2. 野生宝可梦生成与广播3. 对战系统4. 道具与通信5. 延伸玩法 安全性设计 技术选…...
GruntJS-前端自动化任务运行器从入门到实战
Grunt 完全指南:从入门到实战 一、Grunt 是什么? Grunt是一个基于 Node.js 的前端自动化任务运行器,主要用于自动化执行项目开发中重复性高的任务,例如文件压缩、代码编译、语法检查、单元测试、文件合并等。通过配置简洁的任务…...
云原生安全实战:API网关Kong的鉴权与限流详解
🔥「炎码工坊」技术弹药已装填! 点击关注 → 解锁工业级干货【工具实测|项目避坑|源码燃烧指南】 一、基础概念 1. API网关(API Gateway) API网关是微服务架构中的核心组件,负责统一管理所有API的流量入口。它像一座…...
