当前位置: 首页 > news >正文

flink的ProcessWindowFunction函数的三种状态

背景

在处理窗口函数时,ProcessWindowFunction处理函数可以定义三个状态: 富函数getRuntimeContext.getState,
每个key+每个窗口的状态context.windowState(),每个key的状态context.globalState,那么这几个状态之间有什么关系呢?

ProcessWindowFunction处理函数三种状态之间的关系:

1.getRuntimeContext.getState这个定义的状态是每个key维度的,也就是可以跨时间窗口并维持状态的
2.context.windowState()这个定义的状态是和每个key以及窗口相关的,也就是虽然key相同,但是时间窗口不同,他们的值也不一样.
3.context.globalState这个定义的状态是和每个key相关的,也就是和getRuntimeContext.getState的定义一样,可以跨窗口维护状态
验证代码如下所示:

package wikiedits.func;import org.apache.flink.api.common.state.ValueState;import org.apache.flink.api.common.state.ValueStateDescriptor;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.configuration.Configuration;import org.apache.flink.streaming.api.TimeCharacteristic;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.functions.source.SourceFunction;import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;import org.apache.flink.streaming.api.windowing.time.Time;import org.apache.flink.streaming.api.windowing.windows.TimeWindow;import org.apache.flink.util.Collector;
import wikiedits.func.model.KeyCount;import java.text.SimpleDateFormat;import java.util.Date;public class ProcessWindowFunctionDemo {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 使用处理时间env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);// 并行度为1env.setParallelism(1);// 设置数据源,一共三个元素DataStream<Tuple2<String, Integer>> dataStream = env.addSource(new SourceFunction<Tuple2<String, Integer>>() {@Overridepublic void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {int xxxNum = 0;int yyyNum = 0;for (int i = 1; i < Integer.MAX_VALUE; i++) {// 只有XXX和YYY两种nameString name = (0 == i % 2) ? "XXX" : "YYY";//更新aaa和bbb元素的总数if (0 == i % 2) {xxxNum++;} else {yyyNum++;}// 使用当前时间作为时间戳long timeStamp = System.currentTimeMillis();// 将数据和时间戳打印出来,用来验证数据System.out.println(String.format("source,%s, %s,    XXX total : %d,    YYY total : %d\n",name,time(timeStamp),xxxNum,yyyNum));// 发射一个元素,并且戴上了时间戳ctx.collectWithTimestamp(new Tuple2<String, Integer>(name, 1), timeStamp);// 每发射一次就延时1秒Thread.sleep(1000);}}@Overridepublic void cancel() {}});// 将数据用5秒的滚动窗口做划分,再用ProcessWindowFunctionSingleOutputStreamOperator<String> mainDataStream = dataStream// 以Tuple2的f0字段作为key,本例中实际上key只有aaa和bbb两种.keyBy(value -> value.f0)// 5秒一次的滚动窗口.timeWindow(Time.seconds(5))// 统计每个key当前窗口内的元素数量,然后把key、数量、窗口起止时间整理成字符串发送给下游算子.process(new ProcessWindowFunction<Tuple2<String, Integer>, String, String, TimeWindow>() {// 自定义状态private ValueState<KeyCount> state;@Overridepublic void open(Configuration parameters) throws Exception {// 初始化状态,name是myStatestate = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", KeyCount.class));}public void clear(Context context){ValueState<KeyCount> contextWindowValueState = context.windowState().getState(new ValueStateDescriptor<>("myWindowState", KeyCount.class));contextWindowValueState.clear();}@Overridepublic void process(String s, Context context, Iterable<Tuple2<String, Integer>> iterable,Collector<String> collector) throws Exception {// 从backend取得当前单词的myState状态KeyCount current = state.value();// 如果myState还从未没有赋值过,就在此初始化if (current == null) {current = new KeyCount();current.key = s;current.count = 0;}int count = 0;// iterable可以访问该key当前窗口内的所有数据,// 这里简单处理,只统计了元素数量for (Tuple2<String, Integer> tuple2 : iterable) {count++;}// 更新当前key的元素总数current.count += count;// 更新状态到backendstate.update(current);System.out.println("getRuntimeContext() == context :" + (getRuntimeContext() == context));ValueState<KeyCount> contextWindowValueState = context.windowState().getState(new ValueStateDescriptor<>("myWindowState", KeyCount.class));ValueState<KeyCount> contextGlobalValueState = context.globalState().getState(new ValueStateDescriptor<>("myGlobalState", KeyCount.class));KeyCount windowValue = contextWindowValueState.value();if (windowValue == null) {windowValue = new KeyCount();windowValue.key = s;windowValue.count = 0;}windowValue.count += count;contextWindowValueState.update(windowValue);KeyCount globalValue = contextGlobalValueState.value();if (globalValue == null) {globalValue = new KeyCount();globalValue.key = s;globalValue.count = 0;}globalValue.count += count;contextGlobalValueState.update(globalValue);ValueState<KeyCount> contextWindowSameNameState =context.windowState().getState(new ValueStateDescriptor<>("myState", KeyCount.class));ValueState<KeyCount> contextGlobalSameNameState =context.globalState().getState(new ValueStateDescriptor<>("myState", KeyCount.class));System.out.println("contextWindowSameNameState == contextGlobalSameNameState :" + (contextWindowSameNameState == contextGlobalSameNameState));System.out.println("state == contextGlobalSameNameState :" + (state == contextGlobalSameNameState));// 将当前key及其窗口的元素数量,还有窗口的起止时间整理成字符串String value = String.format("window, %s, %s - %s, %d,    total : %d, windowStateCount :%s, globalStateCount :%s\n",// 当前keys,// 当前窗口的起始时间time(context.window().getStart()),// 当前窗口的结束时间time(context.window().getEnd()),// 当前key在当前窗口内元素总数count,// 当前key出现的总数current.count,contextWindowValueState.value(),contextGlobalValueState.value());// 发射到下游算子collector.collect(value);}});// 打印结果,通过分析打印信息,检查ProcessWindowFunction中可以处理所有key的整个窗口的数据mainDataStream.print();env.execute("processfunction demo : processwindowfunction");}public static String time(long timeStamp) {return new SimpleDateFormat("hh:mm:ss").format(new Date(timeStamp));}}

输出结果:

window, XXX, 08:34:45 - 08:34:50, 3,    total : 22, windowStateCount :KeyCount{key='XXX', count=3}, globalStateCount :KeyCount{key='XXX', count=22}
window, YYY, 08:34:45 - 08:34:50, 2,    total : 22, windowStateCount :KeyCount{key='YYY', count=2}, globalStateCount :KeyCount{key='YYY', count=22}

从结果可以验证以上的结论,此外需要特别注意的一点是context.windowState()的状态需要在clear方法中清理掉,因为一旦时间窗口结束,就再也没有机会清理了
从这个例子中还发现一个比较有趣的现象:

ValueState<KeyCount> state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", KeyCount.class));
ValueState<KeyCount> contextWindowSameNameState =context.windowState().getState(new ValueStateDescriptor<>("myState", KeyCount.class));
ValueState<KeyCount> contextGlobalSameNameState =context.globalState().getState(new ValueStateDescriptor<>("myState", KeyCount.class));

在open中通过getRuntimeContext().getState定义的状态竟然可以通过 context.windowState()/ context.globalState()访问到,并且他们指向的都是同一个变量,可以参见代码的输出:

System.out.println("contextWindowSameNameState == contextGlobalSameNameState :" + (contextWindowSameNameState == contextGlobalSameNameState));
System.out.println("state == contextGlobalSameNameState :" + (state == contextGlobalSameNameState));

结果如下:

contextWindowSameNameState == contextGlobalSameNameState :true
state == contextGlobalSameNameState :true

参考文献:
https://cloud.tencent.com/developer/article/1815079

相关文章:

flink的ProcessWindowFunction函数的三种状态

背景 在处理窗口函数时&#xff0c;ProcessWindowFunction处理函数可以定义三个状态&#xff1a; 富函数getRuntimeContext.getState, 每个key每个窗口的状态context.windowState(),每个key的状态context.globalState&#xff0c;那么这几个状态之间有什么关系呢&#xff1f; …...

day50-springboot+ajax分页

分页依赖&#xff1a; <dependency> <groupId>com.github.pagehelper</groupId> <artifactId>pagehelper-spring-boot-starter</artifactId> <version>1.0.0</version> </dependency> 配置&#xff1a; …...

Win7 专业版Windows time w32time服务电脑重启后老是已停止

环境&#xff1a; Win7 专业版 问题描述&#xff1a; Win7 专业版Windows time w32time服务电脑重启后老是已停止 解决方案&#xff1a; 1.检查启动Remote Procedure Call (RPC)、Remote Procedure Call (RPC) Locator&#xff0c;DCOM Server Process Launcher这三个服务是…...

全网最强,接口自动化测试-token登录关联实战总结(超详细)

目录&#xff1a;导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09; 前言 在PC端登录公司的…...

OLAP ModelKit Crack,ADO.NET和IList

OLAP ModelKit Crack,ADO.NET和IList OLAP ModelKit是一个多功能的.NET OLAP组件&#xff0c;用C#编写&#xff0c;只包含100%托管代码。它具有XP主题的外观&#xff0c;并能够使用任何.NET数据源(ADO.NET和IList)。借助任何第三方组件(尤其是图表组件)呈现数据的能力扩展了产品…...

4 三组例子,用OpenCV玩转图像-AI-python

读取&#xff0c;缩放&#xff0c;旋转&#xff0c;写入图像 首先导入包&#xff0c;为了显示导入matplotlib/为了在matplotlib显示 导入CV2/查看版本 导入图片/查看图片类型 图片数组 数组大小 对于opencv通道顺序蓝色B、绿色G、红色R matplotlib通道顺序为 红色R、绿色G、蓝…...

计算机网络-三种交换方式

计算机网络-三种交换方式 电路交换(Circuit Switching) 电话交换机接通电话线的方式称为电路交换从通信资源分配的角度来看&#xff0c;交换(Switching)就是按照某种方式动态的分配传输线路的资源 电话交换机 为了解决电话之间通信两两之间连线过多&#xff0c;所以产生了电话…...

03 制作Ubuntu启动盘

1 软碟通 我是用软碟通制作启动盘。安装软碟通时一定要把虚拟光驱给勾选上&#xff0c;其余两个可以看你心情。 2 镜像文件 我使用清华镜像网站找到的Ubuntu镜像文件。 Index of /ubuntu-releases/ | 清华大学开源软件镜像站 | Tsinghua Open Source Mirror 请自己选择镜像…...

【JavaSE】String类中常用的字符串方法(超全)

目录 1.求字符串的长度 2.判断字符串是否为空 3.String对象的比较 3.1 判断字符串是否相同 3.2 比较字符串大小 3.3 忽略大小写比较 4.字符串查找 5.转化 5.1 数值和字符串转化 5.1.1 数字转字符串 valueof 5.1.2 valueOf的其他用法 5.1.3 字符串转数字 5.2 大小写转…...

Bootload U-Boot分析

Bootloader是在操作系统运行之前执行的一段小程序。通过这段小程序可以初始化硬件设备、建立内存空间的映射表&#xff0c;从而建立适当的系统软硬件环境&#xff0c;为最终调用操作系统内核做好准备。 对于嵌入式系统&#xff0c;Bootloader是基于特定硬件平台来实现的。因此…...

以公益之行,筑责任之心——2023年中创算力爱心公益助学活动

捐资助学是一项功在当代、利在千秋的义举。 高考录取工作已经开始&#xff0c;一张张高校录取通知书也陆续送达各位准大学生手中。当他们怀揣着对大学的好奇与憧憬&#xff0c;准备迈进理想的大学时&#xff0c;还有一群人&#xff0c;他们渴望知识&#xff0c;却因经济困难而…...

【机器学习】处理样本不平衡的问题

文章目录 样本不均衡的概念及影响样本不均衡的解决方法样本层面欠采样 &#xff08;undersampling&#xff09;过采样数据增强 损失函数层面模型层面采样集成学习 决策及评估指标 样本不均衡的概念及影响 机器学习中&#xff0c;样本不均衡问题经常遇到&#xff0c;比如在金融…...

Android前沿技术?Jetpack如何?

Jetpack Compose是Android开发领域的一项前沿技术&#xff0c;它提供了一种全新的方式来构建用户界面。近年来&#xff0c;Jetpack Compose在各大招聘等网站上的招聘岗位逐渐增多&#xff0c;薪资待遇也相应提高。本文将从招聘岗位的薪资与技术要求入手&#xff0c;分析Jetpack…...

为react项目添加开发/提交规范(前端工程化、eslint、prettier、husky、commitlint、stylelint)

因历史遗留原因&#xff0c;接手的项目没有代码提醒/格式化&#xff0c;包括 eslint、pretttier&#xff0c;也没有 commit 提交校验&#xff0c;如 husky、commitlint、stylelint&#xff0c;与其期待自己或者同事的代码写得完美无缺&#xff0c;不如通过一些工具来进行规范和…...

小研究 - MySQL 数据库安全加固技术的研究(一)

随着信息系统的日益普及&#xff0c;后台数据库的安全问题逐步被人们重视起来。以当下热门的MySQL 数据库为例&#xff0c;通过分析数据库的安全机制以及总结数据库面临的安全风险&#xff0c;针对性地提出了相应的加固策略&#xff0c;为数据库的安全加固工作提供了技术支撑。…...

linux安装redis带图详细

如何在Linux系统中卸载Redis 一、使用apt-get卸载Redis sudo apt-get purge redis-server如果使用apt-get安装Redis&#xff0c;可以使用apt-get purge命令完全卸载Redis。其中&#xff0c;purge命令会不仅仅删除Redis二进制文件&#xff0c;还会删除配置文件、数据文件和日志…...

MySql——数据库常用命令

一、关于数据库的操作 查看mysql中有哪些数据库 show databases;显示创建指定数据库MySQL语句 SHOW CREATE DATABASE 数据库名&#xff1a;使用指定数据库 use 数据库名;查看当前使用的是哪个数据库 select database();查看指定数据库下有哪些表 use 数据库名; -- 先选择…...

如何通过 WordPress 数据库启用插件?【进不去后台可用】

如果您无法访问 WordPress 后台并需要激活插件以恢复访问权限&#xff0c;则可以通过 WordPress 数据库来实现。本文将向您展示如何使用数据库轻松激活 WordPress 插件。 何时使用数据库激活 WordPress 插件&#xff1f; 许多常见的 WordPress 错误会阻止网站所有者访问 WordP…...

芯片热处理设备 HTR-4立式4寸快速退火炉

HTR-4立式4寸快速退火炉 HTR-4立式4寸快速退火炉&#xff08;芯片热处理设备&#xff09;广泛应用在IC晶圆、LED晶圆、MEMS、化合物半导体和功率器件等多种芯片产品的生产&#xff0c;和欧姆接触快速合金、离子注入退火、氧化物生长、消除应力和致密化等工艺当中&#xff0c;通…...

小研究 - 基于 MySQL 数据库的数据安全应用设计(一)

信息系统工程领域对数据安全的要求比较高&#xff0c;MySQL 数据库管理系统普遍应用于各种信息系统应用软件的开发之中&#xff0c;而角色与权限设计不仅关乎数据库中数据保密性的性能高低&#xff0c;也关系到用户使用数据库的最低要求。在对数据库的安全性进行设计时&#xf…...

D2RML终极指南:5分钟掌握暗黑2重制版多开技巧

D2RML终极指南&#xff1a;5分钟掌握暗黑2重制版多开技巧 【免费下载链接】D2RML Diablo 2 Resurrected Multilauncher 项目地址: https://gitcode.com/gh_mirrors/d2/D2RML 想要在《暗黑破坏神2&#xff1a;重制版》中同时管理多个游戏账户&#xff0c;却苦于繁琐的登录…...

LinkSwift:开源网盘直链解析引擎的技术解析与部署指南

LinkSwift&#xff1a;开源网盘直链解析引擎的技术解析与部署指南 【免费下载链接】Online-disk-direct-link-download-assistant 一个基于 JavaScript 的网盘文件下载地址获取工具。基于【网盘直链下载助手】修改 &#xff0c;支持 百度网盘 / 阿里云盘 / 中国移动云盘 / 天翼…...

如何用3步永久保存QQ空间回忆?GetQzonehistory全攻略

如何用3步永久保存QQ空间回忆&#xff1f;GetQzonehistory全攻略 【免费下载链接】GetQzonehistory 获取QQ空间发布的历史说说 项目地址: https://gitcode.com/GitHub_Trending/ge/GetQzonehistory 你是否担心过QQ空间里那些承载青春记忆的说说会突然消失&#xff1f;Ge…...

端侧AI 模型部署实战二(云端、PC 本地、手机端侧主流大模型及部署工具 )

AI的大模型部署主要有云端、PC 本地、手机端侧 三大场景。* 云端大模型&#xff08;在线 API / 网页&#xff0c;最强能力&#xff09;* PC 本地大模型&#xff08;Windows/macOS&#xff0c;GGUF 优先&#xff09;* 消费电子&#xff08;手机端侧大模型Android/iOS&#xff0c…...

告别乱码!Win11下Bandizip+Notepad++组合拳完美解决中文压缩包问题

告别乱码&#xff01;Win11下BandizipNotepad组合拳完美解决中文压缩包问题 每次解压中文压缩包时看到满屏的"锟斤拷"和"烫烫烫"&#xff0c;是不是瞬间血压飙升&#xff1f;作为开发者&#xff0c;我们每天要处理大量压缩文件&#xff0c;而编码问题就像隐…...

AdGuard浏览器扩展终极指南:3步打造无广告浏览体验

AdGuard浏览器扩展终极指南&#xff1a;3步打造无广告浏览体验 【免费下载链接】AdguardBrowserExtension AdGuard browser extension 项目地址: https://gitcode.com/gh_mirrors/ad/AdguardBrowserExtension 你是否厌倦了网页上无处不在的广告弹窗&#xff1f;是否担心…...

ABAQUS 蜜蜂飞行仿真:翅膀与空气、水域交替接触的奇妙之旅

ABAQUS蜜蜂飞行仿真分析&#xff0c;翅膀与空气和水域交替接触在科学研究和工程模拟领域&#xff0c;ABAQUS 是一款强大的有限元分析软件。今天咱们就来聊聊用 ABAQUS 进行蜜蜂飞行仿真分析&#xff0c;特别是翅膀与空气和水域交替接触这种独特场景。 蜜蜂飞行仿真的意义 蜜蜂作…...

软件评测师基础知识专项刷题:软件测试过程

前言软考软件评测师备考之路&#xff0c;基础刷题必不可少。本文围绕软件测试过程模块整理经典习题 核心考点梳理&#xff0c;系列内容长期连载更新&#xff0c;慢慢积累、逐个突破&#xff0c;轻松夯实应试功底。考点测试过程模型1.组织级测试过程组织级测试过程用于开发和管…...

5分钟快速上手:用LeaguePrank打造你的专属英雄联盟游戏形象

5分钟快速上手&#xff1a;用LeaguePrank打造你的专属英雄联盟游戏形象 【免费下载链接】LeaguePrank 项目地址: https://gitcode.com/gh_mirrors/le/LeaguePrank LeaguePrank是一款基于官方LCU API开发的开源工具&#xff0c;让你能够安全、合规地修改英雄联盟游戏界面…...

璀璨星河在艺术教育落地:中小学美育课AI创作教学案例

璀璨星河在艺术教育落地&#xff1a;中小学美育课AI创作教学案例 1. 引言&#xff1a;当AI艺术遇见美育课堂 在传统的美术课堂上&#xff0c;老师常常面临这样的困境&#xff1a;学生艺术基础参差不齐&#xff0c;创作工具有限&#xff0c;很多有创意的想法难以实现。而今天&…...