【Flink状态管理(二)各状态初始化入口】状态初始化流程详解与源码剖析
文章目录
- 1. 状态初始化总流程梳理
- 2.创建StreamOperatorStateContext
- 3. StateInitializationContext的接口设计。
- 4. 状态初始化举例:UDF状态初始化
在TaskManager中启动Task线程后,会调用StreamTask.invoke()方法触发当前Task中算子的执行,在invoke()方法中会调用restoreInternal()方法,这中间包括创建和初始化算子中的状态数据。
另外在invoke中,可以通过判断任务状态来判断是否需要初始化状态。
// Allow invoking method 'invoke' without having to call 'restore' before it.if (!isRunning) {LOG.debug("Restoring during invoke will be called.");restoreInternal();}
StreamTask调用initializeStateAndOpenOperators()方法对当前Task中所有算子的状态数据进行初始化。
RegularOperatorChain.
public void initializeStateAndOpenOperators(StreamTaskStateInitializer streamTaskStateInitializer) throws Exception { Iterator var2 = this.getAllOperators(true).iterator(); while(var2.hasNext()) { StreamOperatorWrapper<?, ?> operatorWrapper = (StreamOperatorWrapper)var2.next(); StreamOperator<?> operator = operatorWrapper.getStreamOperator(); operator.initializeState(streamTaskStateInitializer); operator.open(); } }
找到了算子状态初始化的位置,我们继续了解状态是如何初始化的。
1. 状态初始化总流程梳理
AbstractStreamOperator.initializeState中描述了状态初始化的总体流程,如下代码以及注释:
# AbstractStreamOperator.initializeStatepublic final void initializeState(StreamTaskStateInitializer streamTaskStateManager) throws Exception { //1. 获取类型序列化器final TypeSerializer<?> keySerializer = config.getStateKeySerializer(getUserCodeClassloader()); //2. get containingTaskfinal StreamTask<?, ?> containingTask = Preconditions.checkNotNull(getContainingTask()); final CloseableRegistry streamTaskCloseableRegistry = Preconditions.checkNotNull(containingTask.getCancelables()); //3. create StreamOperatorStateContextfinal StreamOperatorStateContext context = streamTaskStateManager.streamOperatorStateContext( getOperatorID(), getClass().getSimpleName(), getProcessingTimeService(), this, keySerializer, streamTaskCloseableRegistry, metrics, config.getManagedMemoryFractionOperatorUseCaseOfSlot( ManagedMemoryUseCase.STATE_BACKEND, runtimeContext.getTaskManagerRuntimeInfo().getConfiguration(), runtimeContext.getUserCodeClassLoader()), isUsingCustomRawKeyedState()); //4. create stateHandlerstateHandler = new StreamOperatorStateHandler( context, getExecutionConfig(), streamTaskCloseableRegistry); timeServiceManager = context.internalTimerServiceManager(); //5. initialize OperatorStatestateHandler.initializeOperatorState(this); //6. set KeyedStateStore in runtimeContextruntimeContext.setKeyedStateStore(stateHandler.getKeyedStateStore().orElse(null));
}
在StreamOperator初始化状态数据的过程中,首先从StreamTask中获取创建状态需要的组件,例如托管状态的管理后端KeyedStateBackend、OperatorStateBackend以及原生状态管理的KeyedStateInputs和OperatorStateInputs组件。
状态数据操作过程中使用的管理组件最终都会封装成StateInitializationContext并传递给子类使用,例如在AbstractUdfStreamOperator中,就会使用StateInitializationContext中的信息初始化用户定义的UDF中的状态数据。
2.创建StreamOperatorStateContext
接下来看如何在Task实例初始化时创建这些组件,并将其存储在StreamOperatorStateContext中供算子使用,如下代码:
StreamTaskStateInitializerImpl
@Override
public StreamOperatorStateContext streamOperatorStateContext( @Nonnull OperatorID operatorID, @Nonnull String operatorClassName, @Nonnull ProcessingTimeService processingTimeService, @Nonnull KeyContext keyContext, @Nullable TypeSerializer<?> keySerializer, @Nonnull CloseableRegistry streamTaskCloseableRegistry, @Nonnull MetricGroup metricGroup, double managedMemoryFraction, boolean isUsingCustomRawKeyedState) throws Exception { //1. 获取task实例信息TaskInfo taskInfo = environment.getTaskInfo(); OperatorSubtaskDescriptionText operatorSubtaskDescription = new OperatorSubtaskDescriptionText( operatorID, operatorClassName, taskInfo.getIndexOfThisSubtask(), taskInfo.getNumberOfParallelSubtasks()); final String operatorIdentifierText = operatorSubtaskDescription.toString(); final PrioritizedOperatorSubtaskState prioritizedOperatorSubtaskStates = taskStateManager.prioritizedOperatorState(operatorID); CheckpointableKeyedStateBackend<?> keyedStatedBackend = null; OperatorStateBackend operatorStateBackend = null; CloseableIterable<KeyGroupStatePartitionStreamProvider> rawKeyedStateInputs = null; CloseableIterable<StatePartitionStreamProvider> rawOperatorStateInputs = null; InternalTimeServiceManager<?> timeServiceManager; try { // 创建keyed类型的状态后端// -------------- Keyed State Backend -------------- keyedStatedBackend = keyedStatedBackend( keySerializer, operatorIdentifierText, prioritizedOperatorSubtaskStates, streamTaskCloseableRegistry, metricGroup, managedMemoryFraction); //创建operator类型的状态后端// -------------- Operator State Backend -------------- operatorStateBackend = operatorStateBackend( operatorIdentifierText, prioritizedOperatorSubtaskStates, streamTaskCloseableRegistry); //创建原生类型状态后端// -------------- Raw State Streams -------------- rawKeyedStateInputs = rawKeyedStateInputs( prioritizedOperatorSubtaskStates .getPrioritizedRawKeyedState() .iterator()); streamTaskCloseableRegistry.registerCloseable(rawKeyedStateInputs); rawOperatorStateInputs = rawOperatorStateInputs( prioritizedOperatorSubtaskStates .getPrioritizedRawOperatorState() .iterator()); streamTaskCloseableRegistry.registerCloseable(rawOperatorStateInputs); //创建Internal Timer Service Manager// -------------- Internal Timer Service Manager -------------- if (keyedStatedBackend != null) { // if the operator indicates that it is using custom raw keyed state, // then whatever was written in the raw keyed state snapshot was NOT written // by the internal timer services (because there is only ever one user of raw keyed // state); // in this case, timers should not attempt to restore timers from the raw keyed // state. final Iterable<KeyGroupStatePartitionStreamProvider> restoredRawKeyedStateTimers = (prioritizedOperatorSubtaskStates.isRestored() && !isUsingCustomRawKeyedState) ? rawKeyedStateInputs : Collections.emptyList(); timeServiceManager = timeServiceManagerProvider.create( keyedStatedBackend, environment.getUserCodeClassLoader().asClassLoader(), keyContext, processingTimeService, restoredRawKeyedStateTimers); } else { timeServiceManager = null; } // -------------- Preparing return value -------------- return new StreamOperatorStateContextImpl( prioritizedOperatorSubtaskStates.getRestoredCheckpointId(), operatorStateBackend, keyedStatedBackend, timeServiceManager, rawOperatorStateInputs, rawKeyedStateInputs); } catch (Exception ex) { 。。。。
}
流程梳理:
- 从environment中获取TaskInfo,并基于Task实例创建OperatorSubtaskDescriptionText。Operator中Task实例的描述信息包含OperatorID、OperatorClassName等,最终用于创建OperatorStateBackend的状态存储后端。
- 创建KeyedStateBackend,KeyedStateBackend是KeyedState的状态管理后端,提供创建和管理KeyedState的方法。
- 创建OperatorStateBackend,OperatorStateBackend是OperatorState的状态管理后端,提供获取和管理OperatorState的接口。
- 创建KeyGroupStatePartitionStreamProvider实例,提供创建和获取原生KeyedState的方法。
- 创建StatePartitionStreamProvider实例,提供创建和获取原生OperatorState的方法。
- 将所有创建出来的托管状态管理后端
keyedStatedBackend和operatorStateBackend、原生状态存储后端rawKeyedStateInputs和rawOperatorStateInputs及timeServiceManager实例,全部封装在StreamOperatorStateContextImpl上下文对象中,并返回给AbstractStreamOperator使用。
小结
StreamTaskStateInitializer.streamOperatorStateContext()方法包含创建托管状态和原生状态管理后端的全过程。StreamOperator的实现类能够从StreamOperatorStateContext中获取这些状态管理组件,并使用它们创建指定类型的状态,最终状态数据会存储在状态管理后端指定的物理介质上,例如堆内存或RocksDB。
StateInitializationContext会被用于算子和UserDefinedFunction中,实现算子或函数中的状态数据操作。
3. StateInitializationContext的接口设计。
StateInitializationContext接口同时继承了ManagedInitializationContext接口和FunctionInitializationContext接口。StateInitializationContext接口的默认实现类为StateInitializationContextImpl。
ManagedInitializationContext接口提供了托管状态使用的KeyedStateStore和OperatorStateStore获取方法,即KeyedStateBackend和OperatorStateBackend的封装类。算子进行初始化时,会通过KeyedStateStore和OperatorStateStore提供的方法创建和管理指定类型的托管状态。
FunctionInitializationContext提供了用户自定义函数状态数据初始化需要的方法。它和ManagedInitializationContext保持一致,这主要是为了和算子使用的上下文进行区分,但两者的操作基本一致。
StateInitializationContext提供了对托管状态数据的管理,并在内部继承和拓展了获取及管理原生状态数据的方法,如getRawOperatorStateInputs()、getRawKeyedStateInputs()等
StateInitializationContextImpl具备操作管理状态和原生状态的能力。基于它可以获取不同类型的状态管理后端,并基于状态管理操作状态数据。

4. 状态初始化举例:UDF状态初始化
在AbstractStreamOperator中调用initializeState(StateInitializationContext context)抽象方法初始化Operator中的状态。这里以AbstractUdfStreamOperator为例说明具体算子、UDF是如何进行状态初始化的。
AbstractUdfStreamOperator.initializeState()方法实际上调用了StreamingFunctionUtils.restoreFunctionState()方法对User-Defined Function中的状态数据进行初始化和恢复,实际上就是将上文创建的StateInitializationContext上下文信息提供给Function接口使用。
public void initializeState(StateInitializationContext context) throws Exception {super.initializeState(context);StreamingFunctionUtils.restoreFunctionState(context, userFunction);
}
恢复函数内部的状态数据涉及Checkpoint的实现,我们会在之后介绍如何在StreamingFunctionUtils.restoreFunctionState()方法中恢复函数中的状态数据。
《Flink设计与实现:核心原理与源码解析》张利兵
相关文章:
【Flink状态管理(二)各状态初始化入口】状态初始化流程详解与源码剖析
文章目录 1. 状态初始化总流程梳理2.创建StreamOperatorStateContext3. StateInitializationContext的接口设计。4. 状态初始化举例:UDF状态初始化 在TaskManager中启动Task线程后,会调用StreamTask.invoke()方法触发当前Task中算子的执行,在…...
python+flask人口普查数据的应用研究及实现django
作为一款人口普查数据的应用研究及实现,面向的是大多数学者,软件的界面设计简洁清晰,用户可轻松掌握使用技巧。在调查之后,获得用户以下需求: (1)用户注册登录后,可进入系统解锁更多…...
C语言:函数
C语言:函数 函数的概念库函数自定义函数实参与形参return语句数组做参数声明与定义externstatic 嵌套调用 函数的概念 在C语言中,存在一个函数的概念,有人也将其翻译为子程序。 在数学中,函数是一个完成特定功能的公式࿰…...
jmeter-问题一:关于线程组,线程数,用户数详解
文章目录 jmeter参数介绍1.线程数2.准备时长(Ramp-up)3.循环次数4.same user on each iteratio5.调度器 场景一:当你的线程组中线程数为1,循环为1场景二:当你的线程组中线程数为2,循环为1场景三:当你的线程组中线程数为1ÿ…...
golang 通过 cgo 调用 C++ 库
思路 将 C 库包装成 C 库 -> golang 通过 cgo 调用 C 库 C 相关文件 目录列表 include/ some.h C 库头文件some_wrapper.h < 用于将 C 库包装成 C 库的头文件 lib/ libsome.a C 库 src/ some_wrapper.cpp < 用于将 C 库包装成 C 库的源码文件 源码示例 some.h…...
使用 IDEA 开发一个简单易用的 SDK
目录 一、什么是 SDK 二、为什么要开发 SDK 三、开发 SDK 的详细步骤 四、导入 SDK 进行测试 附:ConfigurationProperties 注解的介绍及使用 一、什么是 SDK 1. 定义:软件开发工具包 Software Development Kit 2. 用于开发特定软件或应用程序的工…...
CSS transition(过渡效果)详解
CSS过渡效果(Transition)是一种在CSS3中引入的动画效果,它允许开发者在元素状态变化时(如鼠标悬停、类更改等)平滑地改变CSS属性值,从而创建出平滑的动画效果。过渡效果可以应用于多种CSS属性,如…...
Android13多媒体框架概览
Android13多媒体框架概览 Android 多媒体框架 Android 多媒体框架旨在为 Java 服务提供可靠的接口。它是一个系统,包括多媒体应用程序、框架、OpenCore 引擎、音频/视频/输入的硬件设备,输出设备以及一些核心动态库,比如 libmedia、libmedi…...
一文读懂:MybatisPlus从入门到进阶
快速入门 简介 在项目开发中,Mybatis已经为我们简化了代码编写。 但是我们仍需要编写很多单表CURD语句,MybatisPlus可以进一步简化Mybatis。 MybatisPlus官方文档:https://www.baomidou.com/,感谢苞米豆和黑马程序员。 Mybat…...
C语言--------指针(1)
0.指针&指针变量 32位平台,指针变量是4个字节(32bit/84)--------x86 64位平台,指针变量是8个字节(64bit/88)--------x64 编号指针地址;我们平常讲的p是指针就是说p是一个指针变量; ************只要…...
Vite 下一代的前端工具链,前端开发与构建工具
一、Vite 简介 官方中文网站:Vite | 下一代的前端工具链 官方定义: Vite,下一代的前端工具链,为开发提供极速响应。 Vue3.4版本,Vue新版本使用Vite构建、开发、调试、编译。 Vite的优势 极速的服务启动 使用原生…...
【SpringBoot】FreeMarker视图渲染
目录 一、FreeMarker 简介 1.1 什么是FreeMarker? 1.2 Freemarker模板组成部分 1.3 为什么要使用FreeMarker 二、Springboot集成FreeMarker 2.1 配置 2.2 数据类型 2.2.1 字符串 2.2.2 数值 2.2.3 布尔值 2.2.4 日期 2.3 常见指令 2.3.2 assign 2.3…...
巴尔加瓦算法图解:算法运用。
树 如果能将用户名插入到数组的正确位置就好了,这样就无需在插入后再排序。为此,有人设计了一种名为二叉查找树(binary search tree)的数据结构。 每个node的children 都不大于两个。对于其中的每个节点,左子节点的值都比它小,…...
Docker的镜像和容器的区别
1 Docker镜像 假设Linux内核是第0层,那么无论怎么运行Docker,它都是运行于内核层之上的。这个Docker镜像,是一个只读的镜像,位于第1层,它不能被修改或不能保存状态。 一个Docker镜像可以构建于另一个Docker镜像之上&…...
忘记 RAG:拥抱Agent设计,让 ChatGPT 更智能更贴近实际
RAG(检索增强生成)设计模式通常用于开发特定数据领域的基于实际情况的ChatGPT。 然而,重点主要是改进检索工具的效率,如嵌入式搜索、混合搜索和微调嵌入,而不是智能搜索。 这篇文章介绍了一种新的方法,灵感…...
利用路由懒加载和CDN分发策略,对Vue项目进行性能优化
目录 一、Vue项目 二、路由懒加载 三、CDN分发策略 四、如何对Vue项目进行性能优化 一、Vue项目 Vue是一种用于构建用户界面的JavaScript框架,它是一种渐进式框架,可以用于构建单页应用(SPA)和多页应用。Vue具有简单易学、灵…...
【Scala】1. 变量和数据类型
1. 变量和数据类型 1.1 for begining —— hello world 新建hello.scala文件,注意object名字与文件名一致。 object hello { def main(args:Array[String]): Unit { println("hello world!") } }运行后打印结果如下: hello world!Pr…...
何时以及如何选择制动电阻
制动电阻的选择是优化变频器应用的关键因素 制动电阻器在变频器中是如何工作的? 制动电阻器在 VFD 应用中的工作原理是将电机减速到驱动器设定的精确速度。它们对于电机的快速减速特别有用。制动电阻还可以将任何多余的能量馈入 VFD,以提升直流母线上的…...
消息中间件:Puslar、Kafka、RabbigMQ、ActiveMQ
消息队列 消息队列:它主要用来暂存生产者生产的消息,供后续其他消费者来消费。 它的功能主要有两个: 暂存(存储)队列(有序:先进先出 从目前互联网应用中使用消息队列的场景来看,…...
Rust开发WASM,浏览器运行WASM
首先需要安装wasm-pack cargo install wasm-pack 使用cargo创建工程 cargo new --lib mywasm 编辑Cargo.toml文件,修改lib的类型为cdylib,并且添加依赖wasm-bindgen [package] name "mywasm" version "0.1.0" edition "…...
【JavaSE】绘图与事件入门学习笔记
-Java绘图坐标体系 坐标体系-介绍 坐标原点位于左上角,以像素为单位。 在Java坐标系中,第一个是x坐标,表示当前位置为水平方向,距离坐标原点x个像素;第二个是y坐标,表示当前位置为垂直方向,距离坐标原点y个像素。 坐标体系-像素 …...
Spring AI与Spring Modulith核心技术解析
Spring AI核心架构解析 Spring AI(https://spring.io/projects/spring-ai)作为Spring生态中的AI集成框架,其核心设计理念是通过模块化架构降低AI应用的开发复杂度。与Python生态中的LangChain/LlamaIndex等工具类似,但特别为多语…...
Mac下Android Studio扫描根目录卡死问题记录
环境信息 操作系统: macOS 15.5 (Apple M2芯片)Android Studio版本: Meerkat Feature Drop | 2024.3.2 Patch 1 (Build #AI-243.26053.27.2432.13536105, 2025年5月22日构建) 问题现象 在项目开发过程中,提示一个依赖外部头文件的cpp源文件需要同步,点…...
MySQL 8.0 事务全面讲解
以下是一个结合两次回答的 MySQL 8.0 事务全面讲解,涵盖了事务的核心概念、操作示例、失败回滚、隔离级别、事务性 DDL 和 XA 事务等内容,并修正了查看隔离级别的命令。 MySQL 8.0 事务全面讲解 一、事务的核心概念(ACID) 事务是…...
逻辑回归暴力训练预测金融欺诈
简述 「使用逻辑回归暴力预测金融欺诈,并不断增加特征维度持续测试」的做法,体现了一种逐步建模与迭代验证的实验思路,在金融欺诈检测中非常有价值,本文作为一篇回顾性记录了早年间公司给某行做反欺诈预测用到的技术和思路。百度…...
LabVIEW双光子成像系统技术
双光子成像技术的核心特性 双光子成像通过双低能量光子协同激发机制,展现出显著的技术优势: 深层组织穿透能力:适用于活体组织深度成像 高分辨率观测性能:满足微观结构的精细研究需求 低光毒性特点:减少对样本的损伤…...
基于PHP的连锁酒店管理系统
有需要请加文章底部Q哦 可远程调试 基于PHP的连锁酒店管理系统 一 介绍 连锁酒店管理系统基于原生PHP开发,数据库mysql,前端bootstrap。系统角色分为用户和管理员。 技术栈 phpmysqlbootstrapphpstudyvscode 二 功能 用户 1 注册/登录/注销 2 个人中…...
Sklearn 机器学习 缺失值处理 获取填充失值的统计值
💖亲爱的技术爱好者们,热烈欢迎来到 Kant2048 的博客!我是 Thomas Kant,很开心能在CSDN上与你们相遇~💖 本博客的精华专栏: 【自动化测试】 【测试经验】 【人工智能】 【Python】 使用 Scikit-learn 处理缺失值并提取填充统计信息的完整指南 在机器学习项目中,数据清…...
EasyRTC音视频实时通话功能在WebRTC与智能硬件整合中的应用与优势
一、WebRTC与智能硬件整合趋势 随着物联网和实时通信需求的爆发式增长,WebRTC作为开源实时通信技术,为浏览器与移动应用提供免插件的音视频通信能力,在智能硬件领域的融合应用已成必然趋势。智能硬件不再局限于单一功能,对实时…...
虚幻基础:角色旋转
能帮到你的话,就给个赞吧 😘 文章目录 移动组件使用控制器所需旋转:组件 使用 控制器旋转将旋转朝向运动:组件 使用 移动方向旋转 控制器旋转和移动旋转 缺点移动旋转:必须移动才能旋转,不移动不旋转控制器…...

