【Flink状态管理(二)各状态初始化入口】状态初始化流程详解与源码剖析
文章目录
- 1. 状态初始化总流程梳理
- 2.创建StreamOperatorStateContext
- 3. StateInitializationContext的接口设计。
- 4. 状态初始化举例:UDF状态初始化
在TaskManager中启动Task线程后,会调用StreamTask.invoke()方法触发当前Task中算子的执行,在invoke()方法中会调用restoreInternal()方法,这中间包括创建和初始化算子中的状态数据。
另外在invoke中,可以通过判断任务状态来判断是否需要初始化状态。
// Allow invoking method 'invoke' without having to call 'restore' before it.if (!isRunning) {LOG.debug("Restoring during invoke will be called.");restoreInternal();}
StreamTask调用initializeStateAndOpenOperators()方法对当前Task中所有算子的状态数据进行初始化。
RegularOperatorChain.
public void initializeStateAndOpenOperators(StreamTaskStateInitializer streamTaskStateInitializer) throws Exception { Iterator var2 = this.getAllOperators(true).iterator(); while(var2.hasNext()) { StreamOperatorWrapper<?, ?> operatorWrapper = (StreamOperatorWrapper)var2.next(); StreamOperator<?> operator = operatorWrapper.getStreamOperator(); operator.initializeState(streamTaskStateInitializer); operator.open(); } }
找到了算子状态初始化的位置,我们继续了解状态是如何初始化的。
1. 状态初始化总流程梳理
AbstractStreamOperator.initializeState中描述了状态初始化的总体流程,如下代码以及注释:
# AbstractStreamOperator.initializeStatepublic final void initializeState(StreamTaskStateInitializer streamTaskStateManager) throws Exception { //1. 获取类型序列化器final TypeSerializer<?> keySerializer = config.getStateKeySerializer(getUserCodeClassloader()); //2. get containingTaskfinal StreamTask<?, ?> containingTask = Preconditions.checkNotNull(getContainingTask()); final CloseableRegistry streamTaskCloseableRegistry = Preconditions.checkNotNull(containingTask.getCancelables()); //3. create StreamOperatorStateContextfinal StreamOperatorStateContext context = streamTaskStateManager.streamOperatorStateContext( getOperatorID(), getClass().getSimpleName(), getProcessingTimeService(), this, keySerializer, streamTaskCloseableRegistry, metrics, config.getManagedMemoryFractionOperatorUseCaseOfSlot( ManagedMemoryUseCase.STATE_BACKEND, runtimeContext.getTaskManagerRuntimeInfo().getConfiguration(), runtimeContext.getUserCodeClassLoader()), isUsingCustomRawKeyedState()); //4. create stateHandlerstateHandler = new StreamOperatorStateHandler( context, getExecutionConfig(), streamTaskCloseableRegistry); timeServiceManager = context.internalTimerServiceManager(); //5. initialize OperatorStatestateHandler.initializeOperatorState(this); //6. set KeyedStateStore in runtimeContextruntimeContext.setKeyedStateStore(stateHandler.getKeyedStateStore().orElse(null));
}
在StreamOperator初始化状态数据的过程中,首先从StreamTask中获取创建状态需要的组件,例如托管状态的管理后端KeyedStateBackend、OperatorStateBackend以及原生状态管理的KeyedStateInputs和OperatorStateInputs组件。
状态数据操作过程中使用的管理组件最终都会封装成StateInitializationContext并传递给子类使用,例如在AbstractUdfStreamOperator中,就会使用StateInitializationContext中的信息初始化用户定义的UDF中的状态数据。
2.创建StreamOperatorStateContext
接下来看如何在Task实例初始化时创建这些组件,并将其存储在StreamOperatorStateContext中供算子使用,如下代码:
StreamTaskStateInitializerImpl
@Override
public StreamOperatorStateContext streamOperatorStateContext( @Nonnull OperatorID operatorID, @Nonnull String operatorClassName, @Nonnull ProcessingTimeService processingTimeService, @Nonnull KeyContext keyContext, @Nullable TypeSerializer<?> keySerializer, @Nonnull CloseableRegistry streamTaskCloseableRegistry, @Nonnull MetricGroup metricGroup, double managedMemoryFraction, boolean isUsingCustomRawKeyedState) throws Exception { //1. 获取task实例信息TaskInfo taskInfo = environment.getTaskInfo(); OperatorSubtaskDescriptionText operatorSubtaskDescription = new OperatorSubtaskDescriptionText( operatorID, operatorClassName, taskInfo.getIndexOfThisSubtask(), taskInfo.getNumberOfParallelSubtasks()); final String operatorIdentifierText = operatorSubtaskDescription.toString(); final PrioritizedOperatorSubtaskState prioritizedOperatorSubtaskStates = taskStateManager.prioritizedOperatorState(operatorID); CheckpointableKeyedStateBackend<?> keyedStatedBackend = null; OperatorStateBackend operatorStateBackend = null; CloseableIterable<KeyGroupStatePartitionStreamProvider> rawKeyedStateInputs = null; CloseableIterable<StatePartitionStreamProvider> rawOperatorStateInputs = null; InternalTimeServiceManager<?> timeServiceManager; try { // 创建keyed类型的状态后端// -------------- Keyed State Backend -------------- keyedStatedBackend = keyedStatedBackend( keySerializer, operatorIdentifierText, prioritizedOperatorSubtaskStates, streamTaskCloseableRegistry, metricGroup, managedMemoryFraction); //创建operator类型的状态后端// -------------- Operator State Backend -------------- operatorStateBackend = operatorStateBackend( operatorIdentifierText, prioritizedOperatorSubtaskStates, streamTaskCloseableRegistry); //创建原生类型状态后端// -------------- Raw State Streams -------------- rawKeyedStateInputs = rawKeyedStateInputs( prioritizedOperatorSubtaskStates .getPrioritizedRawKeyedState() .iterator()); streamTaskCloseableRegistry.registerCloseable(rawKeyedStateInputs); rawOperatorStateInputs = rawOperatorStateInputs( prioritizedOperatorSubtaskStates .getPrioritizedRawOperatorState() .iterator()); streamTaskCloseableRegistry.registerCloseable(rawOperatorStateInputs); //创建Internal Timer Service Manager// -------------- Internal Timer Service Manager -------------- if (keyedStatedBackend != null) { // if the operator indicates that it is using custom raw keyed state, // then whatever was written in the raw keyed state snapshot was NOT written // by the internal timer services (because there is only ever one user of raw keyed // state); // in this case, timers should not attempt to restore timers from the raw keyed // state. final Iterable<KeyGroupStatePartitionStreamProvider> restoredRawKeyedStateTimers = (prioritizedOperatorSubtaskStates.isRestored() && !isUsingCustomRawKeyedState) ? rawKeyedStateInputs : Collections.emptyList(); timeServiceManager = timeServiceManagerProvider.create( keyedStatedBackend, environment.getUserCodeClassLoader().asClassLoader(), keyContext, processingTimeService, restoredRawKeyedStateTimers); } else { timeServiceManager = null; } // -------------- Preparing return value -------------- return new StreamOperatorStateContextImpl( prioritizedOperatorSubtaskStates.getRestoredCheckpointId(), operatorStateBackend, keyedStatedBackend, timeServiceManager, rawOperatorStateInputs, rawKeyedStateInputs); } catch (Exception ex) { 。。。。
}
流程梳理:
- 从environment中获取TaskInfo,并基于Task实例创建OperatorSubtaskDescriptionText。Operator中Task实例的描述信息包含OperatorID、OperatorClassName等,最终用于创建OperatorStateBackend的状态存储后端。
- 创建KeyedStateBackend,KeyedStateBackend是KeyedState的状态管理后端,提供创建和管理KeyedState的方法。
- 创建OperatorStateBackend,OperatorStateBackend是OperatorState的状态管理后端,提供获取和管理OperatorState的接口。
- 创建KeyGroupStatePartitionStreamProvider实例,提供创建和获取原生KeyedState的方法。
- 创建StatePartitionStreamProvider实例,提供创建和获取原生OperatorState的方法。
- 将所有创建出来的托管状态管理后端
keyedStatedBackend和operatorStateBackend、原生状态存储后端rawKeyedStateInputs和rawOperatorStateInputs及timeServiceManager实例,全部封装在StreamOperatorStateContextImpl上下文对象中,并返回给AbstractStreamOperator使用。
小结
StreamTaskStateInitializer.streamOperatorStateContext()方法包含创建托管状态和原生状态管理后端的全过程。StreamOperator的实现类能够从StreamOperatorStateContext中获取这些状态管理组件,并使用它们创建指定类型的状态,最终状态数据会存储在状态管理后端指定的物理介质上,例如堆内存或RocksDB。
StateInitializationContext会被用于算子和UserDefinedFunction中,实现算子或函数中的状态数据操作。
3. StateInitializationContext的接口设计。
StateInitializationContext接口同时继承了ManagedInitializationContext接口和FunctionInitializationContext接口。StateInitializationContext接口的默认实现类为StateInitializationContextImpl。
ManagedInitializationContext接口提供了托管状态使用的KeyedStateStore和OperatorStateStore获取方法,即KeyedStateBackend和OperatorStateBackend的封装类。算子进行初始化时,会通过KeyedStateStore和OperatorStateStore提供的方法创建和管理指定类型的托管状态。
FunctionInitializationContext提供了用户自定义函数状态数据初始化需要的方法。它和ManagedInitializationContext保持一致,这主要是为了和算子使用的上下文进行区分,但两者的操作基本一致。
StateInitializationContext提供了对托管状态数据的管理,并在内部继承和拓展了获取及管理原生状态数据的方法,如getRawOperatorStateInputs()、getRawKeyedStateInputs()等
StateInitializationContextImpl具备操作管理状态和原生状态的能力。基于它可以获取不同类型的状态管理后端,并基于状态管理操作状态数据。

4. 状态初始化举例:UDF状态初始化
在AbstractStreamOperator中调用initializeState(StateInitializationContext context)抽象方法初始化Operator中的状态。这里以AbstractUdfStreamOperator为例说明具体算子、UDF是如何进行状态初始化的。
AbstractUdfStreamOperator.initializeState()方法实际上调用了StreamingFunctionUtils.restoreFunctionState()方法对User-Defined Function中的状态数据进行初始化和恢复,实际上就是将上文创建的StateInitializationContext上下文信息提供给Function接口使用。
public void initializeState(StateInitializationContext context) throws Exception {super.initializeState(context);StreamingFunctionUtils.restoreFunctionState(context, userFunction);
}
恢复函数内部的状态数据涉及Checkpoint的实现,我们会在之后介绍如何在StreamingFunctionUtils.restoreFunctionState()方法中恢复函数中的状态数据。
《Flink设计与实现:核心原理与源码解析》张利兵
相关文章:
【Flink状态管理(二)各状态初始化入口】状态初始化流程详解与源码剖析
文章目录 1. 状态初始化总流程梳理2.创建StreamOperatorStateContext3. StateInitializationContext的接口设计。4. 状态初始化举例:UDF状态初始化 在TaskManager中启动Task线程后,会调用StreamTask.invoke()方法触发当前Task中算子的执行,在…...
python+flask人口普查数据的应用研究及实现django
作为一款人口普查数据的应用研究及实现,面向的是大多数学者,软件的界面设计简洁清晰,用户可轻松掌握使用技巧。在调查之后,获得用户以下需求: (1)用户注册登录后,可进入系统解锁更多…...
C语言:函数
C语言:函数 函数的概念库函数自定义函数实参与形参return语句数组做参数声明与定义externstatic 嵌套调用 函数的概念 在C语言中,存在一个函数的概念,有人也将其翻译为子程序。 在数学中,函数是一个完成特定功能的公式࿰…...
jmeter-问题一:关于线程组,线程数,用户数详解
文章目录 jmeter参数介绍1.线程数2.准备时长(Ramp-up)3.循环次数4.same user on each iteratio5.调度器 场景一:当你的线程组中线程数为1,循环为1场景二:当你的线程组中线程数为2,循环为1场景三:当你的线程组中线程数为1ÿ…...
golang 通过 cgo 调用 C++ 库
思路 将 C 库包装成 C 库 -> golang 通过 cgo 调用 C 库 C 相关文件 目录列表 include/ some.h C 库头文件some_wrapper.h < 用于将 C 库包装成 C 库的头文件 lib/ libsome.a C 库 src/ some_wrapper.cpp < 用于将 C 库包装成 C 库的源码文件 源码示例 some.h…...
使用 IDEA 开发一个简单易用的 SDK
目录 一、什么是 SDK 二、为什么要开发 SDK 三、开发 SDK 的详细步骤 四、导入 SDK 进行测试 附:ConfigurationProperties 注解的介绍及使用 一、什么是 SDK 1. 定义:软件开发工具包 Software Development Kit 2. 用于开发特定软件或应用程序的工…...
CSS transition(过渡效果)详解
CSS过渡效果(Transition)是一种在CSS3中引入的动画效果,它允许开发者在元素状态变化时(如鼠标悬停、类更改等)平滑地改变CSS属性值,从而创建出平滑的动画效果。过渡效果可以应用于多种CSS属性,如…...
Android13多媒体框架概览
Android13多媒体框架概览 Android 多媒体框架 Android 多媒体框架旨在为 Java 服务提供可靠的接口。它是一个系统,包括多媒体应用程序、框架、OpenCore 引擎、音频/视频/输入的硬件设备,输出设备以及一些核心动态库,比如 libmedia、libmedi…...
一文读懂:MybatisPlus从入门到进阶
快速入门 简介 在项目开发中,Mybatis已经为我们简化了代码编写。 但是我们仍需要编写很多单表CURD语句,MybatisPlus可以进一步简化Mybatis。 MybatisPlus官方文档:https://www.baomidou.com/,感谢苞米豆和黑马程序员。 Mybat…...
C语言--------指针(1)
0.指针&指针变量 32位平台,指针变量是4个字节(32bit/84)--------x86 64位平台,指针变量是8个字节(64bit/88)--------x64 编号指针地址;我们平常讲的p是指针就是说p是一个指针变量; ************只要…...
Vite 下一代的前端工具链,前端开发与构建工具
一、Vite 简介 官方中文网站:Vite | 下一代的前端工具链 官方定义: Vite,下一代的前端工具链,为开发提供极速响应。 Vue3.4版本,Vue新版本使用Vite构建、开发、调试、编译。 Vite的优势 极速的服务启动 使用原生…...
【SpringBoot】FreeMarker视图渲染
目录 一、FreeMarker 简介 1.1 什么是FreeMarker? 1.2 Freemarker模板组成部分 1.3 为什么要使用FreeMarker 二、Springboot集成FreeMarker 2.1 配置 2.2 数据类型 2.2.1 字符串 2.2.2 数值 2.2.3 布尔值 2.2.4 日期 2.3 常见指令 2.3.2 assign 2.3…...
巴尔加瓦算法图解:算法运用。
树 如果能将用户名插入到数组的正确位置就好了,这样就无需在插入后再排序。为此,有人设计了一种名为二叉查找树(binary search tree)的数据结构。 每个node的children 都不大于两个。对于其中的每个节点,左子节点的值都比它小,…...
Docker的镜像和容器的区别
1 Docker镜像 假设Linux内核是第0层,那么无论怎么运行Docker,它都是运行于内核层之上的。这个Docker镜像,是一个只读的镜像,位于第1层,它不能被修改或不能保存状态。 一个Docker镜像可以构建于另一个Docker镜像之上&…...
忘记 RAG:拥抱Agent设计,让 ChatGPT 更智能更贴近实际
RAG(检索增强生成)设计模式通常用于开发特定数据领域的基于实际情况的ChatGPT。 然而,重点主要是改进检索工具的效率,如嵌入式搜索、混合搜索和微调嵌入,而不是智能搜索。 这篇文章介绍了一种新的方法,灵感…...
利用路由懒加载和CDN分发策略,对Vue项目进行性能优化
目录 一、Vue项目 二、路由懒加载 三、CDN分发策略 四、如何对Vue项目进行性能优化 一、Vue项目 Vue是一种用于构建用户界面的JavaScript框架,它是一种渐进式框架,可以用于构建单页应用(SPA)和多页应用。Vue具有简单易学、灵…...
【Scala】1. 变量和数据类型
1. 变量和数据类型 1.1 for begining —— hello world 新建hello.scala文件,注意object名字与文件名一致。 object hello { def main(args:Array[String]): Unit { println("hello world!") } }运行后打印结果如下: hello world!Pr…...
何时以及如何选择制动电阻
制动电阻的选择是优化变频器应用的关键因素 制动电阻器在变频器中是如何工作的? 制动电阻器在 VFD 应用中的工作原理是将电机减速到驱动器设定的精确速度。它们对于电机的快速减速特别有用。制动电阻还可以将任何多余的能量馈入 VFD,以提升直流母线上的…...
消息中间件:Puslar、Kafka、RabbigMQ、ActiveMQ
消息队列 消息队列:它主要用来暂存生产者生产的消息,供后续其他消费者来消费。 它的功能主要有两个: 暂存(存储)队列(有序:先进先出 从目前互联网应用中使用消息队列的场景来看,…...
Rust开发WASM,浏览器运行WASM
首先需要安装wasm-pack cargo install wasm-pack 使用cargo创建工程 cargo new --lib mywasm 编辑Cargo.toml文件,修改lib的类型为cdylib,并且添加依赖wasm-bindgen [package] name "mywasm" version "0.1.0" edition "…...
cool-admin(midway版)数据库索引维护:高级实践指南
cool-admin(midway版)数据库索引维护:高级实践指南 【免费下载链接】cool-admin-midway 🔥 cool-admin(midway版)一个很酷的后台权限管理框架,模块化、插件化、CRUD极速开发,永久开源免费,基于midway.js 3.x、typescri…...
前端进阶 课程二十六、:Flex布局进阶与实战(复杂布局)
一、学习目标 掌握Flex布局嵌套规则,实现容器内多层Flex嵌套; 运用Flex完成头部+内容区+底部、卡片详情、响应式导航三大复杂布局; 解决Flex项目溢出、对齐失效、高度自适应等常见问题; 区分Flex与float布局,明确Flex的现代布局优势。 二、核心知识点+实战代码 1. Fl…...
多模态场景:头巾误判为厨师帽 — 问题分析与调优指南
多模态场景:头巾误判为厨师帽 — 问题分析与调优指南适用对象:使用 Qwen-VL 等多模态大模型做「厨师帽 / 头饰」相关识别时的面试问答、方案设计与落地调优参考。1. 问题本质:为什么会把头巾当成厨师帽 这通常不是「模型坏了」,而…...
Electron 14+ 开发必看:WebContentsView 实战指南(含与 BrowserView 对比)
Electron 14 开发实战:WebContentsView 深度解析与性能优化 如果你正在使用 Electron 14 开发跨平台桌面应用,那么 WebContentsView 绝对是你需要重点掌握的核心组件。作为 Electron 团队在 14 版本引入的全新视图系统,WebContentsView 不仅解…...
能源在线监测管理系统平台[fu源码]
EMS能源管理系统 基于 Vue3 / Spring Boot/Spring Cloud & Alibaba 微服务架构 项目技术框架 RuoYi-Cloud 基础框架上开发而成 源智优控AI能源大脑,能源AI版,即将上线 仓库地址: https://gitee.com/guangdong122/energy-management …...
3D元器件库在PCB设计中的关键作用与应用
1. 为什么你需要一套完整的3D元器件库作为一名电子工程师,我深知在PCB设计过程中,3D元器件库的重要性。传统的2D设计虽然能满足基本需求,但在实际生产装配时往往会遇到各种意想不到的机械干涉问题。记得我刚开始做硬件设计时,就曾…...
KT0803K FM发射芯片Arduino驱动开发与射频工程实践
1. KT0803系列FM发射芯片Arduino库深度解析与工程实践指南1.1 芯片定位与系统级约束KT0803及其衍生型号(KT0803K/L/M)是高度集成的单芯片FM广播发射器,专为低功耗、小体积音频广播应用设计。该系列芯片内部集成了PLL频率合成器、立体声编码器…...
2025届学术党必备的五大AI写作网站解析与推荐
Ai论文网站排名(开题报告、文献综述、降aigc率、降重综合对比) TOP1. 千笔AI TOP2. aipasspaper TOP3. 清北论文 TOP4. 豆包 TOP5. kimi TOP6. deepseek DeepSeek身为新一代人工智能辅助写作工具,于学术论文撰写的整个流程里࿰…...
Ant Design X:AI赋能前端开发的革命性工具
1. Ant Design X:当设计系统遇上AI会发生什么? 第一次听说Ant Design X时,我正在为一个电商项目焦头烂额地调试聊天机器人组件。传统方案需要自己对接NLP服务、处理对话状态、设计交互逻辑...直到同事扔给我一个链接:"试试这…...
ESP32 RMT硬件驱动RF遥控库:替代rc-switch的异步OOK方案
1. 项目概述RCSwitchRmt 是一款专为 ESP32 系列微控制器设计的射频(RF)OOK(On-Off Keying,开关键控)通信库,其核心目标是提供一种现代、异步、非阻塞的硬件驱动型替代方案,以取代广为人知但已显…...

