当前位置: 首页 > news >正文

一文弄明白KeyedProcessFunction函数

引言

KeyedProcessFunction是Flink用于处理KeyedStream的数据集合,它比ProcessFunction拥有更多特性,例如状态处理和定时器功能等。接下来就一起来了解下这个函数吧

正文

了解一个函数怎么用最权威的地方就是 官方文档 以及注解,KeyedProcessFunction的注解如下

/*** A keyed function that processes elements of a stream.** <p>For every element in the input stream {@link #processElement(Object, Context, Collector)} is* invoked. This can produce zero or more elements as output. Implementations can also query the* time and set timers through the provided {@link Context}. For firing timers {@link #onTimer(long,* OnTimerContext, Collector)} will be invoked. This can again produce zero or more elements as* output and register further timers.** <p><b>NOTE:</b> Access to keyed state and timers (which are also scoped to a key) is only* available if the {@code KeyedProcessFunction} is applied on a {@code KeyedStream}.** <p><b>NOTE:</b> A {@code KeyedProcessFunction} is always a {@link* org.apache.flink.api.common.functions.RichFunction}. Therefore, access to the {@link* org.apache.flink.api.common.functions.RuntimeContext} is always available and setup and teardown* methods can be implemented. See {@link* org.apache.flink.api.common.functions.RichFunction#open(org.apache.flink.configuration.Configuration)}* and {@link org.apache.flink.api.common.functions.RichFunction#close()}.*/

上面简单来说就是以下四点

  1. Flink中输入流中的每一条数据都会触发KeyedProcessFunction类的processElement方法调用
  2. 通过这个方法的Context参数可以设置定时器,在开启定时器后会程序会定时调用onTimer方法
  3. 由于KeyedProcessFunction实现了RichFunction接口,因此是可以通过RuntimeContext上下文对象管理状态state的开启和释放
  4. 需要注意的是,只有在KeyedStream里才能够访问state和定时器,通俗点来说就是这个函数要用在keyBy这个函数的后面

processElement方法解析

  1. Flink会调用processElement方法处理输入流中的每一条数据
  2. KeyedProcessFunction.Context参数可以用来读取以及更新内部状态state
  3. 这个KeyedProcessFunction跟其他function一样通过参数中的Collector对象以回写的方式返回数据

onTimer方法解析:在启用TimerService服务时会定时触发此方法,一般会在processElement方法中开启TimerService服务

以上就是这个函数的基本知识,接下来就通过实战来熟悉下它的使用

实战简介

本次实战的目标是学习KeyedProcessFunction,内容如下:

  1. 监听本机7777端口读取字符串
  2. 将每个字符串用空格分隔,转成Tuple2实例,f0是分隔后的单词,f1等于1
  3. 将Tuple2实例集合通过f0字段分区,得到KeyedStream
  4. KeyedSteam通过自定义KeyedProcessFunction处理
  5. 自定义KeyedProcessFunction的作用,是记录每个单词最新一次出现的时间,然后建一个十秒的定时器进行触发

使用代码例子

首先定义pojo类

public class CountWithTimestampNew {private String key;private long count;private long lastQuestTimestamp;public long getAndIncrementCount() {return ++count;}public String getKey() {return key;}public void setKey(String key) {this.key = key;}public long getCount() {return count;}public void setCount(long count) {this.count = count;}public long getLastQuestTimestamp() {return lastQuestTimestamp;}public void setLastQuestTimestamp(long lastQuestTimestamp) {this.lastQuestTimestamp = lastQuestTimestamp;}
}

接着实现KeyedProcessFunction类

public class CountWithTimeoutKeyProcessFunctionNew extends KeyedProcessFunction<Tuple, Tuple2<String, Integer>, Tuple2<String, Long>> {private ValueState<CountWithTimestampNew> state;@Overridepublic void open(Configuration parameters) throws Exception {state = getRuntimeContext().getState(new ValueStateDescriptor<CountWithTimestampNew>("sherlock-state", CountWithTimestampNew.class));}// 实现数据处理逻辑的地方@Overridepublic void processElement(Tuple2<String, Integer> value, Context ctx, Collector<Tuple2<String, Long>> out) throws Exception {Tuple currentKey = ctx.getCurrentKey();CountWithTimestampNew countWithTimestampNew = state.value();if (countWithTimestampNew == null) {countWithTimestampNew = new CountWithTimestampNew();countWithTimestampNew.setKey(value.f0);}countWithTimestampNew.getAndIncrementCount();//更新这个单词最后一次出现的时间countWithTimestampNew.setLastQuestTimestamp(ctx.timestamp());//单词之间不会互相覆盖吗?推测state对象是跟key绑定,针对每一个不同的key KeyedProcessFunction会创建其对应的state对象state.update(countWithTimestampNew);//给当前单词创建定时器,十秒后触发long timer = countWithTimestampNew.getLastQuestTimestamp()+10000;//尝试注释掉看看是否还会触发onTimer方法ctx.timerService().registerProcessingTimeTimer(timer);//打印所有信息,用于确保数据准确性System.out.println(String.format(" 触发processElement方法,当前的key是 %s, 这个单词累加次数是 %d, 上次请求的时间是:%s, timer的时间是: %s",currentKey.getField(0),countWithTimestampNew.getCount(),time(countWithTimestampNew.getLastQuestTimestamp()),time(timer)));}@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Long>> out) throws Exception {Tuple currentKey = ctx.getCurrentKey();CountWithTimestampNew countWithTimestampNew = state.value();//标记当前元素是否已经连续10s未出现boolean isTimeout = false;if (timestamp >= countWithTimestampNew.getLastQuestTimestamp()+10000 ) {//out.collect(new Tuple2<>(countWithTimestampNew.getKey(), countWithTimestampNew.getCount()));isTimeout = true;}//打印所有信息,用于确保数据准确性System.out.println(String.format(" 触发onTimer方法,当前的key是 %s, 这个单词累加次数是 %d, 上次请求的时间是:%s, timer的时间是: %s, 当前单词是否已超过10秒没有再请求: %s",currentKey.getField(0),countWithTimestampNew.getCount(),time(countWithTimestampNew.getLastQuestTimestamp()),time(timestamp),String.valueOf(isTimeout)));}public static String time(long timeStamp) {return new SimpleDateFormat("yyyy-MM-dd hh:mm:ss").format(new Date(timeStamp));}
}

最后是启动类

public class KeyedProcessFunctionDemo2 {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 并行度1env.setParallelism(1);// 处理时间env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);// 监听本地9999端口,读取字符串DataStream<String> socketDataStream = env.socketTextStream("localhost", 7777);// 所有输入的单词,如果超过10秒没有再次出现,都可以通过CountWithTimeoutFunction得到DataStream<Tuple2<String, Long>> timeOutWord = socketDataStream// 对收到的字符串用空格做分割,得到多个单词.flatMap(new SplitterFlatMapFunction())// 设置时间戳分配器,用当前时间作为时间戳.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple2<String, Integer>>() {@Overridepublic long extractTimestamp(Tuple2<String, Integer> element, long previousElementTimestamp) {// 使用当前系统时间作为时间戳return System.currentTimeMillis();}@Overridepublic Watermark getCurrentWatermark() {// 本例不需要watermark,返回nullreturn null;}})// 将单词作为key分区.keyBy(0)// 按单词分区后的数据,交给自定义KeyedProcessFunction处理.process(new CountWithTimeoutKeyProcessFunctionNew());// 所有输入的单词,如果超过10秒没有再次出现,就在此打印出来timeOutWord.print();env.execute("ProcessFunction demo : KeyedProcessFunction");}
}

演示

在启动服务前,先通过linux指令监听端口 nc -lk 7777

  1. 启动Flink服务后,往7777端口里面发送数据
    在这里插入图片描述

  2. 通过IDEA的终端可以看到有日志输出,可以看到在发送消息的时候第一条日志立马打印出来并在10秒后输出第二条日志
    在这里插入图片描述

  3. 那么咱们尝试连续发送两条Hello呢,可以看到累加器会持续累加,并且会触发两次onTimer方法,也就是每一条消息都会触发一次。由于连续发送两条,因此可以看得到第三行日志的末尾是false,说明收到第一条后的10秒内又有相同的消息进来。第二条是ture说明在收到第二条消息后的10秒内没有消息进来
    在这里插入图片描述

  4. 再输入点其他的试试
    在这里插入图片描述

  5. 通过输出可以看到这些单词的计数器又从0开始,说明每一个Key都对应一个状态
    在这里插入图片描述

思考题

  1. open方法会在哪里进行调用,KeyedProcessFunction整个类的完整调用逻辑是怎么样的
  2. registerProcessingTimeTimer和registerEventTimeTimer的差异是什么

参考资料

  1. https://blog.csdn.net/boling_cavalry/article/details/106299167
  2. https://blog.csdn.net/lujisen/article/details/105510532
  3. https://blog.csdn.net/qq_31866793/article/details/102831731

相关文章:

一文弄明白KeyedProcessFunction函数

引言 KeyedProcessFunction是Flink用于处理KeyedStream的数据集合&#xff0c;它比ProcessFunction拥有更多特性&#xff0c;例如状态处理和定时器功能等。接下来就一起来了解下这个函数吧 正文 了解一个函数怎么用最权威的地方就是 官方文档 以及注解&#xff0c;KeyedProc…...

alibabacloud学习笔记06(小滴课堂)

讲Sentinel流量控制详细操作 基于并发线程进行限流配置实操 在浏览器打开快速刷新会报错 基于并发线程进行限流配置实操 讲解 微服务高可用利器Sentinel熔断降级规则 讲解服务调用常见的熔断状态和恢复 讲解服务调用熔断例子 我们写一个带异常的接口&#xff1a;...

Code Composer Studio (CCS) - Licensing Information

Code Composer Studio [CCS] - Licensing Information 1. Help -> Code Composer Studio Licensing Information2. Upgrade3. Specify a license fileReferences 1. Help -> Code Composer Studio Licensing Information 2. Upgrade ​​​ 3. Specify a license file …...

uniapp引入微信小程序直播组件

方法1.小程序跳转视频号直播 微信小程序跳转到视频号 1.1微信开放平台注册 https://open.weixin.qq.com/ 2.2 方法2.使用小程序提供的直播组件 参考 微信小程序跳转视频号直播 小程序直播官方文档 https://developers.weixin.qq.com/miniprogram/dev/component/live-play…...

五个简单的C#编程案例

案例一&#xff1a;Hello, World! csharp using System; class Program { static void Main() { Console.WriteLine("Hello, World!"); } } 这个案例是最基础的C#程序&#xff0c;它打印出“Hello, World!”到控制台。每个C#程…...

Zlibrary低调官宣2024年最新网址,国内可直接访问,免费下载海量电子书籍

最近过节&#xff0c;文章也没怎么写&#xff0c;明天要上班了&#xff0c;今天写篇文章做个预热。 春节期间&#xff0c;“知识大航海”群里&#xff0c;有位群友分享了一个Zlibrary的最新地址&#xff0c;感谢这位群友妹妹的热心分享&#xff0c;这个地址国内可以直接访问。 …...

Android 开机启动

一、添加权限 <uses-permission android:name"android.permission.RECEIVE_BOOT_COMPLETED"/> 二、写一个广播接收器 public class BootReceiver extends BroadcastReceiver {Overridepublic void onReceive(Context context, Intent intent) {if(Intent.ACT…...

二叉树相关算法需了解汇总-基础算法操作

文章目录 144.二叉树的前序遍历145.二叉树的后序遍历94.二叉树的中序遍历102.二叉树的层序遍历107.二叉树的层次遍历倒序199.二叉树的右视图637.二叉树的层平均值429.N叉树的层序遍历515.在每个树行中找最大值116.填充每个节点的下一个右侧节点指针104.二叉树的最大深度111.二叉…...

万字干货-京东零售数据资产能力升级与实践

开篇 京东自营和商家自运营模式&#xff0c;以及伴随的多种运营视角、多种组合计算、多种销售属性等数据维度&#xff0c;相较于行业同等量级&#xff0c;数据处理的难度与复杂度都显著增加。如何从海量的数据模型与数据指标中提升检索数据的效率&#xff0c;降低数据存算的成…...

探索前端框架的世界:一场前端之旅

在网络世界中&#xff0c;网页开发领域的一颗明星是前端框架。这些框架为开发者提供了丰富的工具和技术&#xff0c;帮助他们构建出漂亮、高效的网页应用。现在&#xff0c;让我们随着小明的故事一起来探索一下吧。 小明的梦想 小明是一位年轻有为的前端开发者&#xff0c;他…...

class complex

class complex from C_OOP_base1_houjie complex.h #ifndef __COMPLEX__ // 防卫式声明 guard; 名称自定义 #define __COMPLEX__// 0. forward declarations class complex;complex& __doapl (complex* ths, const complex& r);// 1. class declarations class compl…...

数据库系统概论整理与总结

数据库系统概论 第一章&#xff1a;绪论 四个基本概念 四个概念 数据&#xff1a;Data 数据库&#xff1a;DataBase 数据库管理系统:DBMS 数据库系统:DBS 打个比喻&#xff0c;比如说菜鸟物流&#xff1a; Data&#xff1a;快递 DB&#xff1a;物流厂库 DBMS&#xff1a;对…...

打通新势力NAS权限壁垒,绿联私有云安装Portainer,实现更强大的Docker功能

打通新势力NAS权限壁垒&#xff0c;绿联私有云安装Portainer&#xff0c;实现更强大的Docker功能 对于国产新势力NAS来说&#xff0c;因为安全问题并没有完全开放SSH权限&#xff0c;所以还不能和传统NAS那样直接通过Docker run命令来部署容器&#xff0c;同时&#xff0c;对于…...

前端基础自学整理|DOM树

DOM&#xff0c;文档对象模型&#xff08;Document Object Model&#xff09;&#xff0c;简单的说&#xff0c;DOM是一种理念&#xff0c;一种思想&#xff0c;一个与系统平台和编程语言无关的接口&#xff0c;一种方法, 使 Web开发人员可以访问HTML元素&#xff01;不是具体方…...

RedisDesktopManager无法远程连接到Linux虚拟机中Redis的docker容器的一种解决方案

1.问题描述 除了RedisDesktopManager以外&#xff0c;使用java代码也无法连接到centos7虚拟机中的docker容器中的Redis &#xff0c;按照网上其他博主的解决方案&#xff0c;在排除Linux防火墙问题&#xff0c;端口映射问题&#xff0c;redis.conf配置文件问题以后&#xff0c…...

HarmonyOS 权限 介绍

权限说明 权限等级 根据权限对于不同等级应用有不同的开放范围&#xff0c;权限类型对应分为以下三种&#xff0c;等级依次提高。 normal权限 normal 权限允许应用访问超出默认规则外的普通系统资源。 这些系统资源的开放&#xff08;包括数据和功能&#xff09;对用户隐私以及…...

算法训练营day33(补),复习二叉树1

// 889. 根据前序和后序遍历构造二叉树 // 前序中左右 后序遍历左右中 func constructFromPrePost(preorder []int, postorder []int) *TreeNode { if len(preorder) 0 { return nil } root : &TreeNode{} root.Val preorder[0] //前序数组去掉root节点 preorder pre…...

k8s-权限管理

1. 身份认证 我们在目前的k8s集群环境里面&#xff0c;只能在master节点上执行kubectl的一些命令&#xff0c;在其他节点上执行就会报错 # 看一下是不是 [rootnode1 ~]# kubectl get nodes E0220 12:50:15.695133 6091 memcache.go:238] couldnt get current server API gro…...

四.QT5工具安装和环境变量的配置

1.以管理员身份运行安装包 2.登录qt账号&#xff0c;点击【next】 3.选中同意 4.选择安装目录&#xff0c;注意不能有中文和空格 5.勾选 64位 mingw。点击【next】&#xff0c;等待安装完成 6.配置环境变量...

为什么需要MDL锁

点击上方蓝字关注我 在数据库管理中&#xff0c;元数据&#xff08;metadata&#xff09;的保护至关重要&#xff0c;而MySQL中的"元数据锁"&#xff08;MDL锁&#xff09;就是它的守护者。 1. 什么是MDL锁MDL锁&#xff0c;全名Metadata Lock&#xff0c;是MySQL中…...

BGE-Reranker-v2-m3批量处理优化:提升高并发排序效率

BGE-Reranker-v2-m3批量处理优化&#xff1a;提升高并发排序效率 你是不是也遇到过这样的问题&#xff1f;在搭建RAG系统时&#xff0c;向量检索返回了一大堆文档&#xff0c;但真正相关的却没几个。大模型拿着这些“噪音”文档生成答案&#xff0c;结果要么答非所问&#xff…...

3个关键步骤解决INAV VTOL模式切换抖动问题

3个关键步骤解决INAV VTOL模式切换抖动问题 【免费下载链接】inav INAV: Navigation-enabled flight control software 项目地址: https://gitcode.com/gh_mirrors/in/inav 垂直起降&#xff08;VTOL&#xff09;无人机融合了固定翼的续航优势与多旋翼的起降灵活性&…...

完整指南:为什么选择WeChatMsg开源工具解决你的微信聊天记录备份与分析难题

完整指南&#xff1a;为什么选择WeChatMsg开源工具解决你的微信聊天记录备份与分析难题 【免费下载链接】WeChatMsg 提取微信聊天记录&#xff0c;将其导出成HTML、Word、CSV文档永久保存&#xff0c;对聊天记录进行分析生成年度聊天报告 项目地址: https://gitcode.com/GitH…...

3个ONNX运行时故障的系统性修复方案:ComfyUI DWPose预处理器实战指南

3个ONNX运行时故障的系统性修复方案&#xff1a;ComfyUI DWPose预处理器实战指南 【免费下载链接】comfyui_controlnet_aux 项目地址: https://gitcode.com/gh_mirrors/co/comfyui_controlnet_aux 在ComfyUI工作流中&#xff0c;DWPose预处理器作为姿态估计的核心组件&…...

通义千问3-4B-Instruct-2507调优技巧:提高指令遵循准确率

通义千问3-4B-Instruct-2507调优技巧&#xff1a;提高指令遵循准确率 通义千问3-4B-Instruct-2507&#xff0c;这个听起来有点长的名字&#xff0c;其实是一个特别适合我们普通开发者和爱好者玩转的AI小模型。它只有40亿参数&#xff0c;但阿里在2025年8月把它开源出来的时候&…...

Element Plus表格滚动卡顿?试试这个Vue3封装方案,性能提升明显

Vue3Element Plus表格性能优化实战&#xff1a;平滑滚动与内存管理 Element Plus的el-table组件在企业级后台系统中广泛应用&#xff0c;但当数据量达到500行以上时&#xff0c;滚动卡顿、内存飙升的问题开始显现。本文将分享一套经过生产环境验证的优化方案&#xff0c;通过数…...

如何用3层智能架构构建你的AI开发助手:从零到精通的完整指南

如何用3层智能架构构建你的AI开发助手&#xff1a;从零到精通的完整指南 【免费下载链接】superpowers Claude Code superpowers: core skills library 项目地址: https://gitcode.com/GitHub_Trending/su/superpowers 你是否曾想过&#xff0c;为什么有些开发者能快速完…...

3步实现专业级语音克隆:GPT-SoVITS技术原理与实践指南

3步实现专业级语音克隆&#xff1a;GPT-SoVITS技术原理与实践指南 【免费下载链接】GPT-SoVITS 项目地址: https://gitcode.com/GitHub_Trending/gp/GPT-SoVITS GPT-SoVITS是一款基于GPT架构的少样本语音合成系统&#xff0c;通过结合SoVITS声学模型&#xff0c;仅需5秒…...

Windows下用rclone挂载S3存储到本地磁盘的完整指南(含MinIO/Ceph配置)

Windows下用rclone挂载S3存储到本地磁盘的完整指南&#xff08;含MinIO/Ceph配置&#xff09; 在数据驱动的现代开发环境中&#xff0c;对象存储已成为基础设施的重要组成部分。无论是个人开发者处理海量数据集&#xff0c;还是企业团队协作处理云端资源&#xff0c;将S3兼容存…...

从Proteus仿真到普中开发板烧录:51单片机抢答器完整开发流程避坑指南

从Proteus仿真到普中开发板烧录&#xff1a;51单片机抢答器完整开发流程避坑指南 在电子设计的学习道路上&#xff0c;51单片机项目开发是一个经典的入门实践。抢答器作为典型的互动式电子系统&#xff0c;涵盖了输入检测、逻辑控制、显示输出等核心知识点&#xff0c;是检验学…...