当前位置: 首页 > news >正文

Flink 处理函数(1)—— 基本处理函数

在 Flink 的多层 API中,处理函数是最底层的API,是所有转换算子的一个概括性的表达,可以自定义处理逻辑

在处理函数中,我们直面的就是数据流中最基本的元素:数据事件(event)、状态(state)以及时间(time)。这就相当于对流有了完全的控制权

基本处理函数主要是定义数据流的转换操作,其所对应的函数类为ProcessFunction


处理函数的功能和使用

对于常用的转换算子来说:

  • MapFunction只能获取到当前的数据;
  • AggregateFunction 中除数据外,还可以获取到当前的状态(以累加器 Accumulator 形式出现);
  • RichMapFunction提供了获取运行时上下文的方法 getRuntimeContext();

但是无论那种算子,如果我们想要访问事件的时间戳,或者当前的水位线信息,都是完全做不到的

与时间相关的操作只能用时间窗口去处理,但如果要求对时间有更精细的控制,需要能够获取水位线,甚至要“把控时间”、定义什么时候做什么事,这就不是基本的时间窗口能够实现的了

因此需要使用处理函数

  • 处理函数提供了一个“定时服务”(TimerService),我们可以通过它访问流中的事件(event)、时间戳(timestamp)、水位线(watermark),甚至可以注册“定时事件”
  • 处理函数继承了 AbstractRichFunction 抽象类,所以拥有富函数类的所有特性,同样可以访问状态(state)和其他运行时信息
  • 处理函数还可以直接将数据输出到侧输出流(side output)中

处理函数的简单使用:基于 DataStream 调用.process()方法就;方法需要传入一个 ProcessFunction 作为参数,用来定义处理逻辑:

stream.process(new MyProcessFunction())

简单示例:

public class ProcessFunctionTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.addSource(new ClickSource()).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<Event>() {@Overridepublic long extractTimestamp(Event event, long l) {return event.timestamp;}})).process(new ProcessFunction<Event, String>() {@Overridepublic void processElement(Event value, Context ctx, Collector<String> out) throws Exception {if (value.user.equals("Mary")) {out.collect(value.user);} else if (value.user.equals("Bob")) {out.collect(value.user);out.collect(value.user);}System.out.println(ctx.timerService().currentWatermark());}}).print();env.execute();}
}

ProcessFunction 中重写了.processElement()方法(参数:输入,上下文对象,输出),自定义处理逻辑

ProcessFunction 解析

源码解析

源码如下:

public abstract class ProcessFunction<I, O> extends AbstractRichFunction {private static final long serialVersionUID = 1L;/*** Process one element from the input stream.** <p>This function can output zero or more elements using the {@link Collector} parameter and* also update internal state or set timers using the {@link Context} parameter.** @param value The input value.* @param ctx A {@link Context} that allows querying the timestamp of the element and getting a*     {@link TimerService} for registering timers and querying the time. The context is only*     valid during the invocation of this method, do not store it.* @param out The collector for returning result values.* @throws Exception This method may throw exceptions. Throwing an exception will cause the*     operation to fail and may trigger recovery.*/public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;/*** Called when a timer set using {@link TimerService} fires.** @param timestamp The timestamp of the firing timer.* @param ctx An {@link OnTimerContext} that allows querying the timestamp of the firing timer,*     querying the {@link TimeDomain} of the firing timer and getting a {@link TimerService}*     for registering timers and querying the time. The context is only valid during the*     invocation of this method, do not store it.* @param out The collector for returning result values.* @throws Exception This method may throw exceptions. Throwing an exception will cause the*     operation to fail and may trigger recovery.*/public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}/*** Information available in an invocation of {@link #processElement(Object, Context, Collector)}* or {@link #onTimer(long, OnTimerContext, Collector)}.*/public abstract class Context {/*** Timestamp of the element currently being processed or timestamp of a firing timer.** <p>This might be {@code null}, for example if the time characteristic of your program is* set to {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime}.*/public abstract Long timestamp();/** A {@link TimerService} for querying time and registering timers. */public abstract TimerService timerService();/*** Emits a record to the side output identified by the {@link OutputTag}.** @param outputTag the {@code OutputTag} that identifies the side output to emit to.* @param value The record to emit.*/public abstract <X> void output(OutputTag<X> outputTag, X value);}/*** Information available in an invocation of {@link #onTimer(long, OnTimerContext, Collector)}.*/public abstract class OnTimerContext extends Context {/** The {@link TimeDomain} of the firing timer. */public abstract TimeDomain timeDomain();}
}

可以看到抽象类 ProcessFunction 继承了 AbstractRichFunction,有两个泛型类型参数:

I 表示 Input,也就是输入的数据类型;O 表示 Output,也就是处理完成之后输出的数据类型

其内部单独定义了两个方法:一个是必须要实现的抽象方法.processElement();另一个是非抽象方法.onTimer()

  • .processElement():用于“处理元素”,定义了处理的核心逻辑。这个方法对于流中的每个元素都会调用一次,参数包括三个:输入数据值 value,上下文 ctx,以及“收集器”(Collector)out。方法没有返回值,处理之后的输出数据是通过收集器 out 来定义
    • value:当前流中的输入元素,也就是正在处理的数据,类型与流中数据类型一致
    • ctx:类型是 ProcessFunction 中定义的内部抽象类 Context,表示当前运行的上下文,可以获取到当前的时间戳,并提供了用于查询时间和注册定时器的“定时服务”(TimerService),以及可以将数据发送到“侧输出流”(side output)的方法.output()
    • out:“收集器”(类型为 Collector),用于返回输出数据。使用方式与 flatMap算子中的收集器完全一样,直接调用 out.collect()方法就可以向下游发出一个数据。这个方法可以多次调用,也可以不调用
  • .onTimer():用于定义定时触发的操作;这个方法只有在注册好的定时器触发的时候才会调用(在 Flink 中,只有“按键分区流”KeyedStream才支持设置定时器的操作),而定时器是通过“定时服务”TimerService 来注册的
    • 参数:时间戳(timestamp),上下文(ctx),收集器(out)【这里的时间戳是指设置好的触发时间,在事件时间语义下就是水位线

利用onTimer,可以自定义数据按照时间分组、定时触发计算输出结果,这样就实现了窗口的功能

处理函数分类

  1. ProcessFunction:最基本的处理函数,基于 DataStream 直接调用.process()时作为参数传入
  2. KeyedProcessFunction:对流按键分区后的处理函数,基于 KeyedStream 调用.process()时作为参数传入(要想使用定时器必须基于 KeyedStream )
  3. ProcessWindowFunction:开窗之后的处理函数,也是全窗口函数的代表。基于 WindowedStream 调用.process()时作为参数传入
  4. ProcessAllWindowFunction:开窗之后的处理函数,基于 AllWindowedStream 调用.process()时作为参数传入
  5. CoProcessFunction:合并(connect)两条流之后的处理函数,基于 ConnectedStreams 调用.process()时作为参数传入
  6. ProcessJoinFunction:间隔连接(interval join)两条流之后的处理函数,基于 IntervalJoined 调用.process()时作为参数传入
  7. BroadcastProcessFunction:广播连接流处理函数,基于 BroadcastConnectedStream 调用.process()时作为参数传入(这里的“广播连接流”BroadcastConnectedStream,是一个未 keyBy 的普通 DataStream 与一个广播流(BroadcastStream)做连接(conncet)之后的产物)
  8. KeyedBroadcastProcessFunction:按键分区的广播连接流处理函数,同样是基于BroadcastConnectedStream 调用.process()时作为参数传入(这时的广播连接流,是一个 KeyedStream与广播流(BroadcastStream)做连接之后的产物)

学习课程链接:【尚硅谷】Flink1.13实战教程(涵盖所有flink-Java知识点)_哔哩哔哩_bilibili

相关文章:

Flink 处理函数(1)—— 基本处理函数

在 Flink 的多层 API中&#xff0c;处理函数是最底层的API&#xff0c;是所有转换算子的一个概括性的表达&#xff0c;可以自定义处理逻辑 在处理函数中&#xff0c;我们直面的就是数据流中最基本的元素&#xff1a;数据事件&#xff08;event&#xff09;、状态&#xff08;st…...

Linux系统下编译MPlayer

一、编译MPlayer 在 http://www.mplayerhq.hu/design7/dload.html 下载MPlayer源码 执行命令&#xff1a; tar -xf MPlayer-1.5.tar.xz cd MPlayer-1.5 ./configure --prefix$(pwd)/install --yasm make make install 然后在install/bin目录下即会生成mplayer的可执行文件 二…...

事务的ACID属性是什么?为什么它们很重要?

引言 在现代的数据库和事务处理系统中&#xff0c;事务处理是一项非常重要的技术。在数据库中&#xff0c;事务是指一组被视为单个逻辑操作单元的SQL语句序列&#xff0c;它们要么全部成功执行&#xff0c;要么全部不执行。事务可以确保数据库在执行时保持一致性和可靠性。ACI…...

计算机毕业设计 基于Java的手机销售网站的设计与实现 Java实战项目 附源码+文档+视频讲解

博主介绍&#xff1a;✌从事软件开发10年之余&#xff0c;专注于Java技术领域、Python人工智能及数据挖掘、小程序项目开发和Android项目开发等。CSDN、掘金、华为云、InfoQ、阿里云等平台优质作者✌ &#x1f345;文末获取源码联系&#x1f345; &#x1f447;&#x1f3fb; 精…...

Redis相关命令详解及其原理

Redis概念 Redis&#xff0c;英文全称是remote dictionary service&#xff0c;也就是远程字典服务。这是kv存储数据库。Redis&#xff0c;包括所有的数据库&#xff0c;都是请求-回应模式&#xff0c;通俗来说就是数据库不会主动地要给前台推送数据&#xff0c;只有前台发送了…...

go语言中的GoMock

GoMock是一个Go框架。它与内置的测试包整合得很好&#xff0c;并在单元测试时提供了灵活性。正如我们所知&#xff0c;对具有外部资源&#xff08;数据库、网络和文件&#xff09;或依赖关系的代码进行单元测试总是很麻烦。 安装 为了使用GoMock&#xff0c;我们需要安装gomo…...

DIFFWAVE: A VERSATILE DIFFUSION MODEL FOR AUDIO SYNTHESIS (Paper reading)

DIFFWAVE: A VERSATILE DIFFUSION MODEL FOR AUDIO SYNTHESIS Zhifeng Kong, Computer Science and Engineering, UCSD, ICLR2021, Code, Paper 1. 前言 在这项工作中&#xff0c;我们提出了DiffWave&#xff0c;这是一种用于条件和无条件波形生成的多功能扩散概率模型。该模…...

排序算法8----归并排序(非递归)(C)

1、介绍 归并排序既可以是内排序&#xff08;在内存上的数据排序&#xff09;&#xff0c;也可以是外排序&#xff08;磁盘上&#xff09;&#xff08;硬盘&#xff09;&#xff08;在文件中的数据排序&#xff09;。 其他排序一般都是内排序。 区别于快速排序的非递归&#xf…...

Golang 里的 context

context 的作用 go 的编程中&#xff0c;常常会在一个 goroutine 中启动多个 goroutine&#xff0c;然后有可能在这些 goroutine 中又启动多个 goroutine。 如上图&#xff0c;在 main 函数中&#xff0c;启动了一个 goroutine A 和 goroutine B&#xff0c;然后 goroutine A …...

PHP短链接url还原成长链接

在开发过程中&#xff0c;碰到了需要校验用户回填的短链接是不是系统所需要的&#xff0c;于是就需要还原找出短链接所对应的长链接。 长链接转短链接 在百度上搜索程序员&#xff0c;跳转页面后的url就是一个长链接。当然你可以从任何地方复制一个长链接过来。 长链接 http…...

redis原理(三)redis命令

一、字符串命令&#xff1a; 1、字符串基本操作&#xff1a; 2、自增自减 &#xff1a;如果一个值可以被解释为十进制整数或者浮点数&#xff0c;redis允许用户对这个字符串进行INCR*、DECR*操作。 &#xff08;1&#xff09;INCR key&#xff1a;将键存储的值的值加1。 &a…...

教程:在Django中实现微信授权登录

教程&#xff1a;在Django中实现微信授权登录 本教程将引导您如何在Django项目中实现微信授权登录。在本教程中&#xff0c;我们将使用自定义的用户模型User&#xff0c;并通过微信提供的API来进行用户认证。 在进行以下教程之前&#xff0c;请确保你已经在微信开放平台添加了…...

YOLOv5改进 | 主干篇 | 12月份最新成果TransNeXt特征提取网络(全网首发)

一、本文介绍 本文给大家带来的改进机制是TransNeXt特征提取网络,其发表于2023年的12月份是一个最新最前沿的网络模型&#xff0c;将其应用在我们的特征提取网络来提取特征&#xff0c;同时本文给大家解决其自带的一个报错&#xff0c;通过结合聚合的像素聚焦注意力和卷积GLU&…...

【java八股文】之计算机网络系列篇

1、TCP/IP和UDP模型 TCP/IP分层&#xff08;4层&#xff09;&#xff1a;应用层&#xff0c;传输层&#xff0c;网络层&#xff0c;数据链路层 网络的七层架构 &#xff08;7层&#xff09;&#xff1a;应用层&#xff0c;表示层&#xff0c;会话层&#xff0c;传输层&#xff…...

SpringAMQP的使用

1. 简介&#xff1a; SpringAMQP是基于RabbitMQ封装的一套模板&#xff0c;并且还利用SpringBoot对其实现了自动装配&#xff0c;使用起来非常方便。 SpringAmqp的官方地址&#xff1a;https://spring.io/projects/spring-amqp SpringAMQP提供了三个功能&#xff1a; 自动声…...

MATLAB - 使用运动学 DH 参数构建机械臂

系列文章目录 前言 一、 使用 Puma560 机械手机器人的 Denavit-Hartenberg (DH) 参数&#xff0c;逐步建立刚体树形机器人模型。在连接每个关节时&#xff0c;指定其相对 DH 参数。可视化机器人坐标系&#xff0c;并与最终模型进行交互。 DH 参数定义了每个刚体通过关节与其父…...

2024年腾讯云新用户优惠云服务器价格多少?

腾讯云服务器租用价格表&#xff1a;轻量应用服务器2核2G3M价格62元一年、2核2G4M价格118元一年&#xff0c;540元三年、2核4G5M带宽218元一年&#xff0c;2核4G5M带宽756元三年、轻量4核8G12M服务器446元一年、646元15个月&#xff0c;云服务器CVM S5实例2核2G配置280.8元一年…...

如何在原型中实现继承和多态

在JavaScript中&#xff0c;我们可以通过原型链来实现继承。以下是如何在原型中实现继承的例子&#xff1a; // 定义一个动物原型 var Animal function() {}; Animal.prototype.move function() { console.log(‘This animal can move.’); }; // 定义一个狗的原型&#xf…...

MySQL/Oracle 的 字符串拼接

目录 MySQL、Oracle 的 字符串拼接1、MySQL 的字符串拼接1.1 CONCAT(str1,str2,...) : 可以拼接多个字符串1.2 CONCAT_WS(separator,str1,str2,...) : 指定分隔符拼接多个字符串1.3 GROUP_CONCAT(expr) : 聚合函数&#xff0c;用于将多行的值连接成一个字符串。 2、Oracle 的字…...

【Java SE语法篇】10.String类

&#x1f4da;博客主页&#xff1a;爱敲代码的小杨. ✨专栏&#xff1a;《Java SE语法》 ❤️感谢大家点赞&#x1f44d;&#x1f3fb;收藏⭐评论✍&#x1f3fb;&#xff0c;您的三连就是我持续更新的动力❤️ 文章目录 前言1. String类1.1 字符串的构造1.2 String对象的比…...

基于单片机的心率及跌倒检测系统设计(有完整资料)

资料查找方式&#xff1a;特纳斯电子&#xff08;电子校园网&#xff09;&#xff1a;搜索下面编号即可编号&#xff1a;T4192205M设计简介&#xff1a;本设计是基于单片机的心率及跌倒检测系统&#xff0c;主要实现以下功能&#xff1a;1、可通过心率模块检测当前的心率 2、可…...

苹果内购Java后端避坑指南:收据验证、状态码处理和防重复消费实战

苹果内购Java后端深度防御指南&#xff1a;从收据验收到分布式幂等设计 当你的应用内购收入突然出现异常波动&#xff0c;或是用户投诉被重复扣款时&#xff0c;背后往往隐藏着苹果内购接口的"暗礁"。作为经历过百万级内购交易的老兵&#xff0c;我想分享几个真实生产…...

告别复制粘贴:用影刀RPA+飞书多维表格,我把每周的销售数据汇总从2小时缩到5分钟

告别复制粘贴&#xff1a;用影刀RPA飞书多维表格实现销售数据自动化革命 每周五下午&#xff0c;市场部的张经理总要面对同样的噩梦&#xff1a;从七个不同渠道导出销售数据&#xff0c;手动核对格式差异&#xff0c;复制粘贴到汇总表&#xff0c;再计算各类指标。这个重复劳动…...

GHelper终极指南:用轻量化工具彻底替代Armoury Crate,释放华硕ROG笔记本全部性能!

GHelper终极指南&#xff1a;用轻量化工具彻底替代Armoury Crate&#xff0c;释放华硕ROG笔记本全部性能&#xff01; 【免费下载链接】g-helper Lightweight, open-source control tool for ASUS laptops and ROG Ally. Manage performance modes, fans, GPU, battery, and RG…...

2026上海紧固件专业展6月24-26日国家会展中心(上海)举办

2026第十六届上海紧固件专业展&#xff08;Fastener Expo Shanghai 2026&#xff09;将于6月24日至26日在国家会展中心&#xff08;上海&#xff09;举办。本届展会围绕紧固件全产业链展开&#xff0c;涵盖紧固件成品、冷镦成型设备、模具耗材、检测包装、表面处理以及原材料供…...

PADS VX2.8 极坐标布局技巧:圆形灯板LED高效排列指南

1. 极坐标布局在圆形灯板设计中的核心价值 第一次接触圆形LED灯板设计时&#xff0c;我被密密麻麻的元件排列搞得头晕眼花。传统直角坐标系下&#xff0c;要精确控制每个LED灯珠的间距和角度&#xff0c;需要反复计算XY坐标&#xff0c;效率极低。直到发现PADS VX2.8的极坐标功…...

leetcode 困难题 1591. 奇怪的打印机 II-Strange Printer II

Problem: 1591. 奇怪的打印机 II-Strange Printer II 通过观察可以发现&#xff0c;像Example 2&#xff0c;3的最大外接矩形内包括了3和4&#xff0c;所以先3后4&#xff0c;也就是 3->4 同样的&#xff0c;若1的外接矩形内包括了2&#xff0c; 3&#xff0c;4&#xff0c…...

S-UI前端工程化:ESLint与Prettier代码质量保障

S-UI前端工程化&#xff1a;ESLint与Prettier代码质量保障 还在为代码风格混乱、团队协作困难而头疼吗&#xff1f;S-UI作为专业的代理面板项目&#xff0c;通过完善的工程化配置确保了代码质量。本文将为你解析如何在类似项目中配置ESLint和Prettier&#xff0c;打造规范的开…...

联络中心支付软件市场最新数据披露:规模达41.37亿元,行业格局加速显现

在全球企业数字化转型浪潮汹涌以及客户对便捷支付体验需求日益增长的背景下&#xff0c;联络中心支付软件市场正迎来前所未有的发展机遇。据恒州诚思调研统计&#xff0c;2025年全球联络中心支付软件市场规模约41.37亿元&#xff0c;预计未来将持续保持平稳增长态势&#xff0c…...

基于Matlab的轴承-空心转轴-飞轮不同耦合类型动力学分析

基于Matlab的轴承-空心转轴-飞轮不同耦合类型动力学分析 保持轴承类型不变&#xff0c;变换飞轮和转轴耦合方式&#xff0c;分固有频率的变化趋势 可自行定义轴承、飞轮、转轴参数 程序高度模块化&#xff0c;修改十分方便 程序已调通&#xff0c;可直接运行最近做了一个关于轴…...