【Flink状态管理(二)各状态初始化入口】状态初始化流程详解与源码剖析
文章目录
- 1. 状态初始化总流程梳理
- 2.创建StreamOperatorStateContext
- 3. StateInitializationContext的接口设计。
- 4. 状态初始化举例:UDF状态初始化
在TaskManager中启动Task线程后,会调用StreamTask.invoke()方法触发当前Task中算子的执行,在invoke()方法中会调用restoreInternal()方法,这中间包括创建和初始化算子中的状态数据。
另外在invoke中,可以通过判断任务状态来判断是否需要初始化状态。
// Allow invoking method 'invoke' without having to call 'restore' before it.if (!isRunning) {LOG.debug("Restoring during invoke will be called.");restoreInternal();}
StreamTask调用initializeStateAndOpenOperators()方法对当前Task中所有算子的状态数据进行初始化。
RegularOperatorChain.
public void initializeStateAndOpenOperators(StreamTaskStateInitializer streamTaskStateInitializer) throws Exception { Iterator var2 = this.getAllOperators(true).iterator(); while(var2.hasNext()) { StreamOperatorWrapper<?, ?> operatorWrapper = (StreamOperatorWrapper)var2.next(); StreamOperator<?> operator = operatorWrapper.getStreamOperator(); operator.initializeState(streamTaskStateInitializer); operator.open(); } }
找到了算子状态初始化的位置,我们继续了解状态是如何初始化的。
1. 状态初始化总流程梳理
AbstractStreamOperator.initializeState中描述了状态初始化的总体流程,如下代码以及注释:
# AbstractStreamOperator.initializeStatepublic final void initializeState(StreamTaskStateInitializer streamTaskStateManager) throws Exception { //1. 获取类型序列化器final TypeSerializer<?> keySerializer = config.getStateKeySerializer(getUserCodeClassloader()); //2. get containingTaskfinal StreamTask<?, ?> containingTask = Preconditions.checkNotNull(getContainingTask()); final CloseableRegistry streamTaskCloseableRegistry = Preconditions.checkNotNull(containingTask.getCancelables()); //3. create StreamOperatorStateContextfinal StreamOperatorStateContext context = streamTaskStateManager.streamOperatorStateContext( getOperatorID(), getClass().getSimpleName(), getProcessingTimeService(), this, keySerializer, streamTaskCloseableRegistry, metrics, config.getManagedMemoryFractionOperatorUseCaseOfSlot( ManagedMemoryUseCase.STATE_BACKEND, runtimeContext.getTaskManagerRuntimeInfo().getConfiguration(), runtimeContext.getUserCodeClassLoader()), isUsingCustomRawKeyedState()); //4. create stateHandlerstateHandler = new StreamOperatorStateHandler( context, getExecutionConfig(), streamTaskCloseableRegistry); timeServiceManager = context.internalTimerServiceManager(); //5. initialize OperatorStatestateHandler.initializeOperatorState(this); //6. set KeyedStateStore in runtimeContextruntimeContext.setKeyedStateStore(stateHandler.getKeyedStateStore().orElse(null));
}
在StreamOperator初始化状态数据的过程中,首先从StreamTask中获取创建状态需要的组件,例如托管状态的管理后端KeyedStateBackend、OperatorStateBackend以及原生状态管理的KeyedStateInputs和OperatorStateInputs组件。
状态数据操作过程中使用的管理组件最终都会封装成StateInitializationContext并传递给子类使用,例如在AbstractUdfStreamOperator中,就会使用StateInitializationContext中的信息初始化用户定义的UDF中的状态数据。
2.创建StreamOperatorStateContext
接下来看如何在Task实例初始化时创建这些组件,并将其存储在StreamOperatorStateContext中供算子使用,如下代码:
StreamTaskStateInitializerImpl
@Override
public StreamOperatorStateContext streamOperatorStateContext( @Nonnull OperatorID operatorID, @Nonnull String operatorClassName, @Nonnull ProcessingTimeService processingTimeService, @Nonnull KeyContext keyContext, @Nullable TypeSerializer<?> keySerializer, @Nonnull CloseableRegistry streamTaskCloseableRegistry, @Nonnull MetricGroup metricGroup, double managedMemoryFraction, boolean isUsingCustomRawKeyedState) throws Exception { //1. 获取task实例信息TaskInfo taskInfo = environment.getTaskInfo(); OperatorSubtaskDescriptionText operatorSubtaskDescription = new OperatorSubtaskDescriptionText( operatorID, operatorClassName, taskInfo.getIndexOfThisSubtask(), taskInfo.getNumberOfParallelSubtasks()); final String operatorIdentifierText = operatorSubtaskDescription.toString(); final PrioritizedOperatorSubtaskState prioritizedOperatorSubtaskStates = taskStateManager.prioritizedOperatorState(operatorID); CheckpointableKeyedStateBackend<?> keyedStatedBackend = null; OperatorStateBackend operatorStateBackend = null; CloseableIterable<KeyGroupStatePartitionStreamProvider> rawKeyedStateInputs = null; CloseableIterable<StatePartitionStreamProvider> rawOperatorStateInputs = null; InternalTimeServiceManager<?> timeServiceManager; try { // 创建keyed类型的状态后端// -------------- Keyed State Backend -------------- keyedStatedBackend = keyedStatedBackend( keySerializer, operatorIdentifierText, prioritizedOperatorSubtaskStates, streamTaskCloseableRegistry, metricGroup, managedMemoryFraction); //创建operator类型的状态后端// -------------- Operator State Backend -------------- operatorStateBackend = operatorStateBackend( operatorIdentifierText, prioritizedOperatorSubtaskStates, streamTaskCloseableRegistry); //创建原生类型状态后端// -------------- Raw State Streams -------------- rawKeyedStateInputs = rawKeyedStateInputs( prioritizedOperatorSubtaskStates .getPrioritizedRawKeyedState() .iterator()); streamTaskCloseableRegistry.registerCloseable(rawKeyedStateInputs); rawOperatorStateInputs = rawOperatorStateInputs( prioritizedOperatorSubtaskStates .getPrioritizedRawOperatorState() .iterator()); streamTaskCloseableRegistry.registerCloseable(rawOperatorStateInputs); //创建Internal Timer Service Manager// -------------- Internal Timer Service Manager -------------- if (keyedStatedBackend != null) { // if the operator indicates that it is using custom raw keyed state, // then whatever was written in the raw keyed state snapshot was NOT written // by the internal timer services (because there is only ever one user of raw keyed // state); // in this case, timers should not attempt to restore timers from the raw keyed // state. final Iterable<KeyGroupStatePartitionStreamProvider> restoredRawKeyedStateTimers = (prioritizedOperatorSubtaskStates.isRestored() && !isUsingCustomRawKeyedState) ? rawKeyedStateInputs : Collections.emptyList(); timeServiceManager = timeServiceManagerProvider.create( keyedStatedBackend, environment.getUserCodeClassLoader().asClassLoader(), keyContext, processingTimeService, restoredRawKeyedStateTimers); } else { timeServiceManager = null; } // -------------- Preparing return value -------------- return new StreamOperatorStateContextImpl( prioritizedOperatorSubtaskStates.getRestoredCheckpointId(), operatorStateBackend, keyedStatedBackend, timeServiceManager, rawOperatorStateInputs, rawKeyedStateInputs); } catch (Exception ex) { 。。。。
}
流程梳理:
- 从environment中获取TaskInfo,并基于Task实例创建OperatorSubtaskDescriptionText。Operator中Task实例的描述信息包含OperatorID、OperatorClassName等,最终用于创建OperatorStateBackend的状态存储后端。
- 创建KeyedStateBackend,KeyedStateBackend是KeyedState的状态管理后端,提供创建和管理KeyedState的方法。
- 创建OperatorStateBackend,OperatorStateBackend是OperatorState的状态管理后端,提供获取和管理OperatorState的接口。
- 创建KeyGroupStatePartitionStreamProvider实例,提供创建和获取原生KeyedState的方法。
- 创建StatePartitionStreamProvider实例,提供创建和获取原生OperatorState的方法。
- 将所有创建出来的托管状态管理后端
keyedStatedBackend和operatorStateBackend
、原生状态存储后端rawKeyedStateInputs和rawOperatorStateInputs及timeServiceManager实例
,全部封装在StreamOperatorStateContextImpl上下文对象中,并返回给AbstractStreamOperator使用。
小结
StreamTaskStateInitializer.streamOperatorStateContext()方法包含创建托管状态和原生状态管理后端的全过程。StreamOperator的实现类能够从StreamOperatorStateContext中获取这些状态管理组件,并使用它们创建指定类型的状态,最终状态数据会存储在状态管理后端指定的物理介质上,例如堆内存或RocksDB。
StateInitializationContext会被用于算子和UserDefinedFunction中,实现算子或函数中的状态数据操作。
3. StateInitializationContext的接口设计。
StateInitializationContext接口同时继承了ManagedInitializationContext接口和FunctionInitializationContext接口。StateInitializationContext接口的默认实现类为StateInitializationContextImpl。
ManagedInitializationContext接口提供了托管状态使用的KeyedStateStore和OperatorStateStore获取方法,即KeyedStateBackend和OperatorStateBackend的封装类。算子进行初始化时,会通过KeyedStateStore和OperatorStateStore提供的方法创建和管理指定类型的托管状态。
FunctionInitializationContext提供了用户自定义函数状态数据初始化需要的方法。它和ManagedInitializationContext保持一致,这主要是为了和算子使用的上下文进行区分,但两者的操作基本一致。
StateInitializationContext提供了对托管状态数据的管理,并在内部继承和拓展了获取及管理原生状态数据的方法,如getRawOperatorStateInputs()、getRawKeyedStateInputs()等
StateInitializationContextImpl具备操作管理状态和原生状态的能力。基于它可以获取不同类型的状态管理后端,并基于状态管理操作状态数据。
4. 状态初始化举例:UDF状态初始化
在AbstractStreamOperator中调用initializeState(StateInitializationContext context)抽象方法初始化Operator中的状态。这里以AbstractUdfStreamOperator为例说明具体算子、UDF是如何进行状态初始化的。
AbstractUdfStreamOperator.initializeState()方法实际上调用了StreamingFunctionUtils.restoreFunctionState()方法对User-Defined Function中的状态数据进行初始化和恢复,实际上就是将上文创建的StateInitializationContext上下文信息提供给Function接口使用。
public void initializeState(StateInitializationContext context) throws Exception {super.initializeState(context);StreamingFunctionUtils.restoreFunctionState(context, userFunction);
}
恢复函数内部的状态数据涉及Checkpoint的实现,我们会在之后介绍如何在StreamingFunctionUtils.restoreFunctionState()方法中恢复函数中的状态数据。
《Flink设计与实现:核心原理与源码解析》张利兵
相关文章:

【Flink状态管理(二)各状态初始化入口】状态初始化流程详解与源码剖析
文章目录 1. 状态初始化总流程梳理2.创建StreamOperatorStateContext3. StateInitializationContext的接口设计。4. 状态初始化举例:UDF状态初始化 在TaskManager中启动Task线程后,会调用StreamTask.invoke()方法触发当前Task中算子的执行,在…...

python+flask人口普查数据的应用研究及实现django
作为一款人口普查数据的应用研究及实现,面向的是大多数学者,软件的界面设计简洁清晰,用户可轻松掌握使用技巧。在调查之后,获得用户以下需求: (1)用户注册登录后,可进入系统解锁更多…...

C语言:函数
C语言:函数 函数的概念库函数自定义函数实参与形参return语句数组做参数声明与定义externstatic 嵌套调用 函数的概念 在C语言中,存在一个函数的概念,有人也将其翻译为子程序。 在数学中,函数是一个完成特定功能的公式࿰…...

jmeter-问题一:关于线程组,线程数,用户数详解
文章目录 jmeter参数介绍1.线程数2.准备时长(Ramp-up)3.循环次数4.same user on each iteratio5.调度器 场景一:当你的线程组中线程数为1,循环为1场景二:当你的线程组中线程数为2,循环为1场景三:当你的线程组中线程数为1ÿ…...

golang 通过 cgo 调用 C++ 库
思路 将 C 库包装成 C 库 -> golang 通过 cgo 调用 C 库 C 相关文件 目录列表 include/ some.h C 库头文件some_wrapper.h < 用于将 C 库包装成 C 库的头文件 lib/ libsome.a C 库 src/ some_wrapper.cpp < 用于将 C 库包装成 C 库的源码文件 源码示例 some.h…...

使用 IDEA 开发一个简单易用的 SDK
目录 一、什么是 SDK 二、为什么要开发 SDK 三、开发 SDK 的详细步骤 四、导入 SDK 进行测试 附:ConfigurationProperties 注解的介绍及使用 一、什么是 SDK 1. 定义:软件开发工具包 Software Development Kit 2. 用于开发特定软件或应用程序的工…...

CSS transition(过渡效果)详解
CSS过渡效果(Transition)是一种在CSS3中引入的动画效果,它允许开发者在元素状态变化时(如鼠标悬停、类更改等)平滑地改变CSS属性值,从而创建出平滑的动画效果。过渡效果可以应用于多种CSS属性,如…...

Android13多媒体框架概览
Android13多媒体框架概览 Android 多媒体框架 Android 多媒体框架旨在为 Java 服务提供可靠的接口。它是一个系统,包括多媒体应用程序、框架、OpenCore 引擎、音频/视频/输入的硬件设备,输出设备以及一些核心动态库,比如 libmedia、libmedi…...

一文读懂:MybatisPlus从入门到进阶
快速入门 简介 在项目开发中,Mybatis已经为我们简化了代码编写。 但是我们仍需要编写很多单表CURD语句,MybatisPlus可以进一步简化Mybatis。 MybatisPlus官方文档:https://www.baomidou.com/,感谢苞米豆和黑马程序员。 Mybat…...

C语言--------指针(1)
0.指针&指针变量 32位平台,指针变量是4个字节(32bit/84)--------x86 64位平台,指针变量是8个字节(64bit/88)--------x64 编号指针地址;我们平常讲的p是指针就是说p是一个指针变量; ************只要…...

Vite 下一代的前端工具链,前端开发与构建工具
一、Vite 简介 官方中文网站:Vite | 下一代的前端工具链 官方定义: Vite,下一代的前端工具链,为开发提供极速响应。 Vue3.4版本,Vue新版本使用Vite构建、开发、调试、编译。 Vite的优势 极速的服务启动 使用原生…...

【SpringBoot】FreeMarker视图渲染
目录 一、FreeMarker 简介 1.1 什么是FreeMarker? 1.2 Freemarker模板组成部分 1.3 为什么要使用FreeMarker 二、Springboot集成FreeMarker 2.1 配置 2.2 数据类型 2.2.1 字符串 2.2.2 数值 2.2.3 布尔值 2.2.4 日期 2.3 常见指令 2.3.2 assign 2.3…...

巴尔加瓦算法图解:算法运用。
树 如果能将用户名插入到数组的正确位置就好了,这样就无需在插入后再排序。为此,有人设计了一种名为二叉查找树(binary search tree)的数据结构。 每个node的children 都不大于两个。对于其中的每个节点,左子节点的值都比它小,…...

Docker的镜像和容器的区别
1 Docker镜像 假设Linux内核是第0层,那么无论怎么运行Docker,它都是运行于内核层之上的。这个Docker镜像,是一个只读的镜像,位于第1层,它不能被修改或不能保存状态。 一个Docker镜像可以构建于另一个Docker镜像之上&…...

忘记 RAG:拥抱Agent设计,让 ChatGPT 更智能更贴近实际
RAG(检索增强生成)设计模式通常用于开发特定数据领域的基于实际情况的ChatGPT。 然而,重点主要是改进检索工具的效率,如嵌入式搜索、混合搜索和微调嵌入,而不是智能搜索。 这篇文章介绍了一种新的方法,灵感…...

利用路由懒加载和CDN分发策略,对Vue项目进行性能优化
目录 一、Vue项目 二、路由懒加载 三、CDN分发策略 四、如何对Vue项目进行性能优化 一、Vue项目 Vue是一种用于构建用户界面的JavaScript框架,它是一种渐进式框架,可以用于构建单页应用(SPA)和多页应用。Vue具有简单易学、灵…...

【Scala】1. 变量和数据类型
1. 变量和数据类型 1.1 for begining —— hello world 新建hello.scala文件,注意object名字与文件名一致。 object hello { def main(args:Array[String]): Unit { println("hello world!") } }运行后打印结果如下: hello world!Pr…...

何时以及如何选择制动电阻
制动电阻的选择是优化变频器应用的关键因素 制动电阻器在变频器中是如何工作的? 制动电阻器在 VFD 应用中的工作原理是将电机减速到驱动器设定的精确速度。它们对于电机的快速减速特别有用。制动电阻还可以将任何多余的能量馈入 VFD,以提升直流母线上的…...

消息中间件:Puslar、Kafka、RabbigMQ、ActiveMQ
消息队列 消息队列:它主要用来暂存生产者生产的消息,供后续其他消费者来消费。 它的功能主要有两个: 暂存(存储)队列(有序:先进先出 从目前互联网应用中使用消息队列的场景来看,…...

Rust开发WASM,浏览器运行WASM
首先需要安装wasm-pack cargo install wasm-pack 使用cargo创建工程 cargo new --lib mywasm 编辑Cargo.toml文件,修改lib的类型为cdylib,并且添加依赖wasm-bindgen [package] name "mywasm" version "0.1.0" edition "…...

Vue3编写简单的App组件(二)
一、Vue3页面渲染基本流程 1、入口文件 <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8"><link rel"icon" href"/favicon.ico"><meta name"viewport" content"widthde…...

java Servlet 云平台教学系统myeclipse定制开发SQLServer数据库网页模式java编程jdbc
一、源码特点 JSP 云平台教学系统是一套完善的web设计系统,对理解JSP java编程开发语言有帮助 系统采用serlvet dao bean,系统具有完整的源代码和数据库 ,系统主要采用B/S模式开发。开发 环境为TOMCAT7.0,Myeclipse8.5开发,数据…...

QT初始程序
#include "widget.h"#include <QApplication>int main(int argc, char *argv[]){QApplication a(argc, argv);Widget w;w.show();return a.exec();} 解释: Qt系统提供的类头文件没有.h后缀Qt一个类对应一个头文件,类名和头文件名一致QA…...

ubuntu22.04@laptop OpenCV Get Started: 001_reading_displaying_write_image
ubuntu22.04laptop OpenCV Get Started: 001_reading_displaying_write_image 1. 源由2. Read/Display/Write应用Demo2.1 C应用Demo2.2 Python应用Demo 3. 过程分析3.1 导入OpenCV库3.2 读取图像文件3.3 显示图像3.4 保存图像文件 4. 总结5. 参考资料 1. 源由 读、写、显示图像…...

51单片机之LED灯模块篇
御风以翔 破浪以飏 🎥个人主页 🔥个人专栏 目录 点亮一盏LED灯 LED的组成原理 LED的硬件模型 点亮一盏LED灯的程序设计 LED灯闪烁 LED流水灯 独立按键控制LED灯亮灭 独立按键的组成原理 独立按键的硬件模型 独立按键控制LED灯状态 按键的抖动 独立按键…...

springboo冬奥会科普平台源码和论文
随着信息技术和网络技术的飞速发展,人类已进入全新信息化时代,传统管理技术已无法高效,便捷地管理信息。为了迎合时代需求,优化管理效率,各种各样的管理平台应运而生,各行各业相继进入信息管理时代…...

改进神经网络
Improve NN 文章目录 Improve NNtrain/dev/test setBias/Variancebasic recipeRegularizationLogistic RegressionNeural networkother ways optimization problemNormalizing inputsvanishing/exploding gradientsweight initializegradient checkNumerical approximationgrad…...

HarmonyOS 开发学习笔记
HarmonyOS 开发学习笔记 一、开发准备1.1、了解ArkTs语言1.2、TypeScript语法1.2.1、变量声明1.2.2、条件控制1.2.3、函数1.2.4、类和接口1.2.5、模块开发 1.3、快速入门 二、ArkUI组件2.1、Image组件2.2、Text文本显示组件2.3、TextInput文本输入框组件2.4、Button按钮组件2.5…...

maven java 如何打纯源码zip包
一、背景 打纯源码包给第三方进行安全漏洞扫描 二、maven插件 项目中加入下面的maven 插件 <!-- 要将源码放上去,需要加入这个插件 --><plugin><artifactId>maven-source-plugin</artifactId><version>2.4</version><con…...

Altium Designer(AD)原理图库添加阵列管脚图文教程及视频演示
🏡《专栏目录》 目录 视频演示1,概述2,添加方法3,总结视频演示 Altium Designer(AD24)原理图库添加阵列管脚 欢迎点击浏览更多高清视频演示 1,概述...