当前位置: 首页 > news >正文

Flink 处理函数(1)—— 基本处理函数

在 Flink 的多层 API中,处理函数是最底层的API,是所有转换算子的一个概括性的表达,可以自定义处理逻辑

在处理函数中,我们直面的就是数据流中最基本的元素:数据事件(event)、状态(state)以及时间(time)。这就相当于对流有了完全的控制权

基本处理函数主要是定义数据流的转换操作,其所对应的函数类为ProcessFunction


处理函数的功能和使用

对于常用的转换算子来说:

  • MapFunction只能获取到当前的数据;
  • AggregateFunction 中除数据外,还可以获取到当前的状态(以累加器 Accumulator 形式出现);
  • RichMapFunction提供了获取运行时上下文的方法 getRuntimeContext();

但是无论那种算子,如果我们想要访问事件的时间戳,或者当前的水位线信息,都是完全做不到的

与时间相关的操作只能用时间窗口去处理,但如果要求对时间有更精细的控制,需要能够获取水位线,甚至要“把控时间”、定义什么时候做什么事,这就不是基本的时间窗口能够实现的了

因此需要使用处理函数

  • 处理函数提供了一个“定时服务”(TimerService),我们可以通过它访问流中的事件(event)、时间戳(timestamp)、水位线(watermark),甚至可以注册“定时事件”
  • 处理函数继承了 AbstractRichFunction 抽象类,所以拥有富函数类的所有特性,同样可以访问状态(state)和其他运行时信息
  • 处理函数还可以直接将数据输出到侧输出流(side output)中

处理函数的简单使用:基于 DataStream 调用.process()方法就;方法需要传入一个 ProcessFunction 作为参数,用来定义处理逻辑:

stream.process(new MyProcessFunction())

简单示例:

public class ProcessFunctionTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.addSource(new ClickSource()).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<Event>() {@Overridepublic long extractTimestamp(Event event, long l) {return event.timestamp;}})).process(new ProcessFunction<Event, String>() {@Overridepublic void processElement(Event value, Context ctx, Collector<String> out) throws Exception {if (value.user.equals("Mary")) {out.collect(value.user);} else if (value.user.equals("Bob")) {out.collect(value.user);out.collect(value.user);}System.out.println(ctx.timerService().currentWatermark());}}).print();env.execute();}
}

ProcessFunction 中重写了.processElement()方法(参数:输入,上下文对象,输出),自定义处理逻辑

ProcessFunction 解析

源码解析

源码如下:

public abstract class ProcessFunction<I, O> extends AbstractRichFunction {private static final long serialVersionUID = 1L;/*** Process one element from the input stream.** <p>This function can output zero or more elements using the {@link Collector} parameter and* also update internal state or set timers using the {@link Context} parameter.** @param value The input value.* @param ctx A {@link Context} that allows querying the timestamp of the element and getting a*     {@link TimerService} for registering timers and querying the time. The context is only*     valid during the invocation of this method, do not store it.* @param out The collector for returning result values.* @throws Exception This method may throw exceptions. Throwing an exception will cause the*     operation to fail and may trigger recovery.*/public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;/*** Called when a timer set using {@link TimerService} fires.** @param timestamp The timestamp of the firing timer.* @param ctx An {@link OnTimerContext} that allows querying the timestamp of the firing timer,*     querying the {@link TimeDomain} of the firing timer and getting a {@link TimerService}*     for registering timers and querying the time. The context is only valid during the*     invocation of this method, do not store it.* @param out The collector for returning result values.* @throws Exception This method may throw exceptions. Throwing an exception will cause the*     operation to fail and may trigger recovery.*/public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}/*** Information available in an invocation of {@link #processElement(Object, Context, Collector)}* or {@link #onTimer(long, OnTimerContext, Collector)}.*/public abstract class Context {/*** Timestamp of the element currently being processed or timestamp of a firing timer.** <p>This might be {@code null}, for example if the time characteristic of your program is* set to {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime}.*/public abstract Long timestamp();/** A {@link TimerService} for querying time and registering timers. */public abstract TimerService timerService();/*** Emits a record to the side output identified by the {@link OutputTag}.** @param outputTag the {@code OutputTag} that identifies the side output to emit to.* @param value The record to emit.*/public abstract <X> void output(OutputTag<X> outputTag, X value);}/*** Information available in an invocation of {@link #onTimer(long, OnTimerContext, Collector)}.*/public abstract class OnTimerContext extends Context {/** The {@link TimeDomain} of the firing timer. */public abstract TimeDomain timeDomain();}
}

可以看到抽象类 ProcessFunction 继承了 AbstractRichFunction,有两个泛型类型参数:

I 表示 Input,也就是输入的数据类型;O 表示 Output,也就是处理完成之后输出的数据类型

其内部单独定义了两个方法:一个是必须要实现的抽象方法.processElement();另一个是非抽象方法.onTimer()

  • .processElement():用于“处理元素”,定义了处理的核心逻辑。这个方法对于流中的每个元素都会调用一次,参数包括三个:输入数据值 value,上下文 ctx,以及“收集器”(Collector)out。方法没有返回值,处理之后的输出数据是通过收集器 out 来定义
    • value:当前流中的输入元素,也就是正在处理的数据,类型与流中数据类型一致
    • ctx:类型是 ProcessFunction 中定义的内部抽象类 Context,表示当前运行的上下文,可以获取到当前的时间戳,并提供了用于查询时间和注册定时器的“定时服务”(TimerService),以及可以将数据发送到“侧输出流”(side output)的方法.output()
    • out:“收集器”(类型为 Collector),用于返回输出数据。使用方式与 flatMap算子中的收集器完全一样,直接调用 out.collect()方法就可以向下游发出一个数据。这个方法可以多次调用,也可以不调用
  • .onTimer():用于定义定时触发的操作;这个方法只有在注册好的定时器触发的时候才会调用(在 Flink 中,只有“按键分区流”KeyedStream才支持设置定时器的操作),而定时器是通过“定时服务”TimerService 来注册的
    • 参数:时间戳(timestamp),上下文(ctx),收集器(out)【这里的时间戳是指设置好的触发时间,在事件时间语义下就是水位线

利用onTimer,可以自定义数据按照时间分组、定时触发计算输出结果,这样就实现了窗口的功能

处理函数分类

  1. ProcessFunction:最基本的处理函数,基于 DataStream 直接调用.process()时作为参数传入
  2. KeyedProcessFunction:对流按键分区后的处理函数,基于 KeyedStream 调用.process()时作为参数传入(要想使用定时器必须基于 KeyedStream )
  3. ProcessWindowFunction:开窗之后的处理函数,也是全窗口函数的代表。基于 WindowedStream 调用.process()时作为参数传入
  4. ProcessAllWindowFunction:开窗之后的处理函数,基于 AllWindowedStream 调用.process()时作为参数传入
  5. CoProcessFunction:合并(connect)两条流之后的处理函数,基于 ConnectedStreams 调用.process()时作为参数传入
  6. ProcessJoinFunction:间隔连接(interval join)两条流之后的处理函数,基于 IntervalJoined 调用.process()时作为参数传入
  7. BroadcastProcessFunction:广播连接流处理函数,基于 BroadcastConnectedStream 调用.process()时作为参数传入(这里的“广播连接流”BroadcastConnectedStream,是一个未 keyBy 的普通 DataStream 与一个广播流(BroadcastStream)做连接(conncet)之后的产物)
  8. KeyedBroadcastProcessFunction:按键分区的广播连接流处理函数,同样是基于BroadcastConnectedStream 调用.process()时作为参数传入(这时的广播连接流,是一个 KeyedStream与广播流(BroadcastStream)做连接之后的产物)

学习课程链接:【尚硅谷】Flink1.13实战教程(涵盖所有flink-Java知识点)_哔哩哔哩_bilibili

相关文章:

Flink 处理函数(1)—— 基本处理函数

在 Flink 的多层 API中&#xff0c;处理函数是最底层的API&#xff0c;是所有转换算子的一个概括性的表达&#xff0c;可以自定义处理逻辑 在处理函数中&#xff0c;我们直面的就是数据流中最基本的元素&#xff1a;数据事件&#xff08;event&#xff09;、状态&#xff08;st…...

Linux系统下编译MPlayer

一、编译MPlayer 在 http://www.mplayerhq.hu/design7/dload.html 下载MPlayer源码 执行命令&#xff1a; tar -xf MPlayer-1.5.tar.xz cd MPlayer-1.5 ./configure --prefix$(pwd)/install --yasm make make install 然后在install/bin目录下即会生成mplayer的可执行文件 二…...

事务的ACID属性是什么?为什么它们很重要?

引言 在现代的数据库和事务处理系统中&#xff0c;事务处理是一项非常重要的技术。在数据库中&#xff0c;事务是指一组被视为单个逻辑操作单元的SQL语句序列&#xff0c;它们要么全部成功执行&#xff0c;要么全部不执行。事务可以确保数据库在执行时保持一致性和可靠性。ACI…...

计算机毕业设计 基于Java的手机销售网站的设计与实现 Java实战项目 附源码+文档+视频讲解

博主介绍&#xff1a;✌从事软件开发10年之余&#xff0c;专注于Java技术领域、Python人工智能及数据挖掘、小程序项目开发和Android项目开发等。CSDN、掘金、华为云、InfoQ、阿里云等平台优质作者✌ &#x1f345;文末获取源码联系&#x1f345; &#x1f447;&#x1f3fb; 精…...

Redis相关命令详解及其原理

Redis概念 Redis&#xff0c;英文全称是remote dictionary service&#xff0c;也就是远程字典服务。这是kv存储数据库。Redis&#xff0c;包括所有的数据库&#xff0c;都是请求-回应模式&#xff0c;通俗来说就是数据库不会主动地要给前台推送数据&#xff0c;只有前台发送了…...

go语言中的GoMock

GoMock是一个Go框架。它与内置的测试包整合得很好&#xff0c;并在单元测试时提供了灵活性。正如我们所知&#xff0c;对具有外部资源&#xff08;数据库、网络和文件&#xff09;或依赖关系的代码进行单元测试总是很麻烦。 安装 为了使用GoMock&#xff0c;我们需要安装gomo…...

DIFFWAVE: A VERSATILE DIFFUSION MODEL FOR AUDIO SYNTHESIS (Paper reading)

DIFFWAVE: A VERSATILE DIFFUSION MODEL FOR AUDIO SYNTHESIS Zhifeng Kong, Computer Science and Engineering, UCSD, ICLR2021, Code, Paper 1. 前言 在这项工作中&#xff0c;我们提出了DiffWave&#xff0c;这是一种用于条件和无条件波形生成的多功能扩散概率模型。该模…...

排序算法8----归并排序(非递归)(C)

1、介绍 归并排序既可以是内排序&#xff08;在内存上的数据排序&#xff09;&#xff0c;也可以是外排序&#xff08;磁盘上&#xff09;&#xff08;硬盘&#xff09;&#xff08;在文件中的数据排序&#xff09;。 其他排序一般都是内排序。 区别于快速排序的非递归&#xf…...

Golang 里的 context

context 的作用 go 的编程中&#xff0c;常常会在一个 goroutine 中启动多个 goroutine&#xff0c;然后有可能在这些 goroutine 中又启动多个 goroutine。 如上图&#xff0c;在 main 函数中&#xff0c;启动了一个 goroutine A 和 goroutine B&#xff0c;然后 goroutine A …...

PHP短链接url还原成长链接

在开发过程中&#xff0c;碰到了需要校验用户回填的短链接是不是系统所需要的&#xff0c;于是就需要还原找出短链接所对应的长链接。 长链接转短链接 在百度上搜索程序员&#xff0c;跳转页面后的url就是一个长链接。当然你可以从任何地方复制一个长链接过来。 长链接 http…...

redis原理(三)redis命令

一、字符串命令&#xff1a; 1、字符串基本操作&#xff1a; 2、自增自减 &#xff1a;如果一个值可以被解释为十进制整数或者浮点数&#xff0c;redis允许用户对这个字符串进行INCR*、DECR*操作。 &#xff08;1&#xff09;INCR key&#xff1a;将键存储的值的值加1。 &a…...

教程:在Django中实现微信授权登录

教程&#xff1a;在Django中实现微信授权登录 本教程将引导您如何在Django项目中实现微信授权登录。在本教程中&#xff0c;我们将使用自定义的用户模型User&#xff0c;并通过微信提供的API来进行用户认证。 在进行以下教程之前&#xff0c;请确保你已经在微信开放平台添加了…...

YOLOv5改进 | 主干篇 | 12月份最新成果TransNeXt特征提取网络(全网首发)

一、本文介绍 本文给大家带来的改进机制是TransNeXt特征提取网络,其发表于2023年的12月份是一个最新最前沿的网络模型&#xff0c;将其应用在我们的特征提取网络来提取特征&#xff0c;同时本文给大家解决其自带的一个报错&#xff0c;通过结合聚合的像素聚焦注意力和卷积GLU&…...

【java八股文】之计算机网络系列篇

1、TCP/IP和UDP模型 TCP/IP分层&#xff08;4层&#xff09;&#xff1a;应用层&#xff0c;传输层&#xff0c;网络层&#xff0c;数据链路层 网络的七层架构 &#xff08;7层&#xff09;&#xff1a;应用层&#xff0c;表示层&#xff0c;会话层&#xff0c;传输层&#xff…...

SpringAMQP的使用

1. 简介&#xff1a; SpringAMQP是基于RabbitMQ封装的一套模板&#xff0c;并且还利用SpringBoot对其实现了自动装配&#xff0c;使用起来非常方便。 SpringAmqp的官方地址&#xff1a;https://spring.io/projects/spring-amqp SpringAMQP提供了三个功能&#xff1a; 自动声…...

MATLAB - 使用运动学 DH 参数构建机械臂

系列文章目录 前言 一、 使用 Puma560 机械手机器人的 Denavit-Hartenberg (DH) 参数&#xff0c;逐步建立刚体树形机器人模型。在连接每个关节时&#xff0c;指定其相对 DH 参数。可视化机器人坐标系&#xff0c;并与最终模型进行交互。 DH 参数定义了每个刚体通过关节与其父…...

2024年腾讯云新用户优惠云服务器价格多少?

腾讯云服务器租用价格表&#xff1a;轻量应用服务器2核2G3M价格62元一年、2核2G4M价格118元一年&#xff0c;540元三年、2核4G5M带宽218元一年&#xff0c;2核4G5M带宽756元三年、轻量4核8G12M服务器446元一年、646元15个月&#xff0c;云服务器CVM S5实例2核2G配置280.8元一年…...

如何在原型中实现继承和多态

在JavaScript中&#xff0c;我们可以通过原型链来实现继承。以下是如何在原型中实现继承的例子&#xff1a; // 定义一个动物原型 var Animal function() {}; Animal.prototype.move function() { console.log(‘This animal can move.’); }; // 定义一个狗的原型&#xf…...

MySQL/Oracle 的 字符串拼接

目录 MySQL、Oracle 的 字符串拼接1、MySQL 的字符串拼接1.1 CONCAT(str1,str2,...) : 可以拼接多个字符串1.2 CONCAT_WS(separator,str1,str2,...) : 指定分隔符拼接多个字符串1.3 GROUP_CONCAT(expr) : 聚合函数&#xff0c;用于将多行的值连接成一个字符串。 2、Oracle 的字…...

【Java SE语法篇】10.String类

&#x1f4da;博客主页&#xff1a;爱敲代码的小杨. ✨专栏&#xff1a;《Java SE语法》 ❤️感谢大家点赞&#x1f44d;&#x1f3fb;收藏⭐评论✍&#x1f3fb;&#xff0c;您的三连就是我持续更新的动力❤️ 文章目录 前言1. String类1.1 字符串的构造1.2 String对象的比…...

日语学习-日语知识点小记-构建基础-JLPT-N4阶段(33):にする

日语学习-日语知识点小记-构建基础-JLPT-N4阶段(33):にする 1、前言(1)情况说明(2)工程师的信仰2、知识点(1) にする1,接续:名词+にする2,接续:疑问词+にする3,(A)は(B)にする。(2)復習:(1)复习句子(2)ために & ように(3)そう(4)にする3、…...

【HarmonyOS 5.0】DevEco Testing:鸿蒙应用质量保障的终极武器

——全方位测试解决方案与代码实战 一、工具定位与核心能力 DevEco Testing是HarmonyOS官方推出的​​一体化测试平台​​&#xff0c;覆盖应用全生命周期测试需求&#xff0c;主要提供五大核心能力&#xff1a; ​​测试类型​​​​检测目标​​​​关键指标​​功能体验基…...

在四层代理中还原真实客户端ngx_stream_realip_module

一、模块原理与价值 PROXY Protocol 回溯 第三方负载均衡&#xff08;如 HAProxy、AWS NLB、阿里 SLB&#xff09;发起上游连接时&#xff0c;将真实客户端 IP/Port 写入 PROXY Protocol v1/v2 头。Stream 层接收到头部后&#xff0c;ngx_stream_realip_module 从中提取原始信息…...

从零实现STL哈希容器:unordered_map/unordered_set封装详解

本篇文章是对C学习的STL哈希容器自主实现部分的学习分享 希望也能为你带来些帮助~ 那咱们废话不多说&#xff0c;直接开始吧&#xff01; 一、源码结构分析 1. SGISTL30实现剖析 // hash_set核心结构 template <class Value, class HashFcn, ...> class hash_set {ty…...

涂鸦T5AI手搓语音、emoji、otto机器人从入门到实战

“&#x1f916;手搓TuyaAI语音指令 &#x1f60d;秒变表情包大师&#xff0c;让萌系Otto机器人&#x1f525;玩出智能新花样&#xff01;开整&#xff01;” &#x1f916; Otto机器人 → 直接点明主体 手搓TuyaAI语音 → 强调 自主编程/自定义 语音控制&#xff08;TuyaAI…...

UR 协作机器人「三剑客」:精密轻量担当(UR7e)、全能协作主力(UR12e)、重型任务专家(UR15)

UR协作机器人正以其卓越性能在现代制造业自动化中扮演重要角色。UR7e、UR12e和UR15通过创新技术和精准设计满足了不同行业的多样化需求。其中&#xff0c;UR15以其速度、精度及人工智能准备能力成为自动化领域的重要突破。UR7e和UR12e则在负载规格和市场定位上不断优化&#xf…...

【论文阅读28】-CNN-BiLSTM-Attention-(2024)

本文把滑坡位移序列拆开、筛优质因子&#xff0c;再用 CNN-BiLSTM-Attention 来动态预测每个子序列&#xff0c;最后重构出总位移&#xff0c;预测效果超越传统模型。 文章目录 1 引言2 方法2.1 位移时间序列加性模型2.2 变分模态分解 (VMD) 具体步骤2.3.1 样本熵&#xff08;S…...

【碎碎念】宝可梦 Mesh GO : 基于MESH网络的口袋妖怪 宝可梦GO游戏自组网系统

目录 游戏说明《宝可梦 Mesh GO》 —— 局域宝可梦探索Pokmon GO 类游戏核心理念应用场景Mesh 特性 宝可梦玩法融合设计游戏构想要素1. 地图探索&#xff08;基于物理空间 广播范围&#xff09;2. 野生宝可梦生成与广播3. 对战系统4. 道具与通信5. 延伸玩法 安全性设计 技术选…...

GruntJS-前端自动化任务运行器从入门到实战

Grunt 完全指南&#xff1a;从入门到实战 一、Grunt 是什么&#xff1f; Grunt是一个基于 Node.js 的前端自动化任务运行器&#xff0c;主要用于自动化执行项目开发中重复性高的任务&#xff0c;例如文件压缩、代码编译、语法检查、单元测试、文件合并等。通过配置简洁的任务…...

云原生安全实战:API网关Kong的鉴权与限流详解

&#x1f525;「炎码工坊」技术弹药已装填&#xff01; 点击关注 → 解锁工业级干货【工具实测|项目避坑|源码燃烧指南】 一、基础概念 1. API网关&#xff08;API Gateway&#xff09; API网关是微服务架构中的核心组件&#xff0c;负责统一管理所有API的流量入口。它像一座…...