当前位置: 首页 > news >正文

【flink状态管理(四)】MemoryStateBackend的实现

文章目录

  • 1.基于MemoryStateBackend创建KeyedStateBackend
    • 1.1. 状态初始化
    • 1.2. 创建状态
  • 2. 基于MemoryStateBackend创建OperatorStateBackend
  • 3.基于MemoryStateBackend创建CheckpointStorage

在Flink中,默认的StateBackend实现为MemoryStateBackend,本文以MemoryStateBackend为例说明StateBackend的设计与实现。

 
本文介绍MemoryStateBackend中如下三个主要组件的创建过程:

  • HeapKeyedStateBackend
  • OperatorStateBackend
  • MemoryBackendCheckpointStorage

FsStateBackend和RocksDBStateBackend这两种状态后端存储的实现,功能和MemoryStateBackend类似,区别在于内部创建的KeyedStateBackend和CheckpointStorage。

 

1.基于MemoryStateBackend创建KeyedStateBackend

1.1. 状态初始化

AbstractStreamOperator.keyedStatedBackend()方法定义了创建和初始化KeyedStatedBackend的逻辑,具体如下。

protected <K> AbstractKeyedStateBackend<K> keyedStateBackend(TypeSerializer<K> keySerializer,String operatorIdentifierText,PrioritizedOperatorSubtaskState prioritizedOperatorSubtaskStates,CloseableRegistry backendCloseableRegistry,MetricGroup metricGroup) throws Exception {if (keySerializer == null) {return null;}String logDescription = "keyed state backend for " + operatorIdentifierText;//1. TaskInfo taskInfo = environment.getTaskInfo();final KeyGroupRange keyGroupRange = KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(taskInfo.getMaxNumberOfParallelSubtasks(),taskInfo.getNumberOfParallelSubtasks(),taskInfo.getIndexOfThisSubtask());// 确保恢复状态过程中构建的数据流被关闭CloseableRegistry cancelStreamRegistryForRestore = new CloseableRegistry();backendCloseableRegistry.registerCloseable(cancelStreamRegistryForRestore);// 创建BackendRestorerProcedureBackendRestorerProcedure<AbstractKeyedStateBackend<K>, KeyedStateHandle> backendRestorer =new BackendRestorerProcedure<>((stateHandles) -> stateBackend.createKeyedStateBackend(environment,environment.getJobID(),operatorIdentifierText,keySerializer,taskInfo.getMaxNumberOfParallelSubtasks(),keyGroupRange,environment.getTaskKvStateRegistry(),TtlTimeProvider.DEFAULT,metricGroup,stateHandles,cancelStreamRegistryForRestore),backendCloseableRegistry,logDescription);try {return backendRestorer.createAndRestore(prioritizedOperatorSubtaskStates.getPrioritizedManagedKeyedState());} finally {if (backendCloseableRegistry.unregisterCloseable(cancelStreamRegistryForRestore)) {IOUtils.closeQuietly(cancelStreamRegistryForRestore);}}
}
  1. 获取当前Task的TaskInfo,并基于TaskInfo的参数创建KeyGroupRange,表示当前Task实例中存储的Key分组区间
  2. 创建CloseableRegistry并注册到backendCloseableRegistry中,用于确保在任务取消的情况下关闭在恢复状态过程中构造的数据流。
  3. 创建BackendRestorerProcedure,提供了stateBackend.createKeyedStateBackend()方法,也包含恢复历史状态数据的方法。
  4. 创建KeyedStateBackend,同时对状态数据进行恢复。prioritizedOperatorSubtaskStates是从TaskStateManager中根据OperatorID获取的算子历史状态,通过prioritizedOperatorSubtaskStates获取当前算子的PrioritizedManagedKeyedState,并基于这些状态数据恢复算子的状态。

 

1.2. 创建状态

接下来我们看MemoryStateBackend.createKeyedStateBackend()方法的具体实现。

public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(Environment env,JobID jobID,String operatorIdentifier,TypeSerializer<K> keySerializer,int numberOfKeyGroups,KeyGroupRange keyGroupRange,TaskKvStateRegistry kvStateRegistry,TtlTimeProvider ttlTimeProvider,MetricGroup metricGroup,@Nonnull Collection<KeyedStateHandle> stateHandles,CloseableRegistry cancelStreamRegistry) throws BackendBuildingException {// 获取TaskStateManager实例TaskStateManager taskStateManager = env.getTaskStateManager();// 创建HeapPriorityQueueSetFactory实例HeapPriorityQueueSetFactory priorityQueueSetFactory =new HeapPriorityQueueSetFactory(keyGroupRange, numberOfKeyGroups, 128);// 创建HeapKeyedStateBackendBuilder实例HeapKeyedStateBackendreturn new HeapKeyedStateBackendBuilder<>(kvStateRegistry,keySerializer,env.getUserClassLoader(),numberOfKeyGroups,keyGroupRange,env.getExecutionConfig(),ttlTimeProvider,stateHandles,AbstractStateBackend.getCompressionDecorator(env.getExecutionConfig()),taskStateManager.createLocalRecoveryConfig(),priorityQueueSetFactory,isUsingAsynchronousSnapshots(),cancelStreamRegistry).build();
}
  1. 从environment参数中获取TaskStateManager实例
  2. 创建HeapPriorityQueueSetFactory实例,用于生成HeapPriorityQueueSet优先级队列,存储TimerHeapInternalTimer等数据。
  3. 调用HeapKeyedStateBackendBuilder.build()方法创建HeapKeyedStateBackend。

 

2. 基于MemoryStateBackend创建OperatorStateBackend

和创建KeyedStateBackend的过程相似,AbstractStreamOperator.operatorStateBackend()方法实现了创建OperatorStateBackend的方法。

protected OperatorStateBackend operatorStateBackend(String operatorIdentifierText,PrioritizedOperatorSubtaskState prioritizedOperatorSubtaskStates,CloseableRegistry backendCloseableRegistry) throws Exception {String logDescription = "operator state backend for " + operatorIdentifierText;CloseableRegistry cancelStreamRegistryForRestore = new CloseableRegistry();backendCloseableRegistry.registerCloseable(cancelStreamRegistryForRestore);BackendRestorerProcedure<OperatorStateBackend, OperatorStateHandle> backendRestorer =new BackendRestorerProcedure<>((stateHandles) -> stateBackend.createOperatorStateBackend(environment,operatorIdentifierText,stateHandles,cancelStreamRegistryForRestore),backendCloseableRegistry,logDescription);try {return backendRestorer.createAndRestore(prioritizedOperatorSubtaskStates.getPrioritizedManagedOperatorState());} finally {if (backendCloseableRegistry.unregisterCloseable(cancelStreamRegistryForRestore)) {IOUtils.closeQuietly(cancelStreamRegistryForRestore);}}
}
  1. 创建CloseableRegisty,确保在任务取消的情况下能够关闭在恢复状态时构造的数据流。
  2. 创建BackendRestorerProcedure,封装了stateBackend.createOperatorStateBackend()方法,并包含恢复历史状态数据的操作。
  3. 创建OperatorStateBackend,并恢复状态数据。

其中prioritizedOperatorSubtaskStates是从TaskStateManager中根据OperatorID获取的算子专有历史状态,可以通过prioritizedOperatorSubtaskStates获取当前算子中的PrioritizedManagedOperatorState,并基于这些状态数据恢复OperatorStateBackend中算子的状态。

 

3.基于MemoryStateBackend创建CheckpointStorage

在createCheckpointStorage()方法中,直接创建MemoryBackendCheckpointStorage实例并返回,没有涉及太多的流程

public CheckpointStorage createCheckpointStorage(JobID jobId) throws IOException {return new MemoryBackendCheckpointStorage(jobId, getCheckpointPath(), getSavepointPath(), maxStateSize);
}

 

参考:《Flink设计与实现:核心原理与源码解析》–张利兵

相关文章:

【flink状态管理(四)】MemoryStateBackend的实现

文章目录 1.基于MemoryStateBackend创建KeyedStateBackend1.1. 状态初始化1.2. 创建状态 2. 基于MemoryStateBackend创建OperatorStateBackend3.基于MemoryStateBackend创建CheckpointStorage 在Flink中&#xff0c;默认的StateBackend实现为MemoryStateBackend&#xff0c;本文…...

前端架构: 脚手架在前端研发流程中的意义

关于脚手架 脚手架又被成为 CLI (command-line interface)基于文本界面&#xff0c;通过中断输入命令执行常见的脚手架&#xff1a;npm, webpack-cli, vue-cli拿 npm 这个脚手架来说 在终端当中输入 npm 命令, 系统就会通过文本方式返回 npm 的使用方法它这种通过命令行执行的…...

Qt网络编程-QTcpServer的封装

简单封装Tcp服务器类&#xff0c;将QTcpServer移入线程 头文件&#xff1a; #ifndef TCPSERVER_H #define TCPSERVER_H#include <QObject>class QTcpSocket; class QTcpServer; class QThread; class TcpServer : public QObject {Q_OBJECT public:explicit TcpServer(…...

【MySQL】_JDBC编程

目录 1. JDBC原理 2. 导入JDBC驱动包 3. 编写JDBC代码实现Insert 3.1 创建并初始化一个数据源 3.2 和数据库服务器建立连接 3.3 构造SQL语句 3.4 执行SQL语句 3.5 释放必要的资源 4. JDBC代码的优化 4.1 从控制台输入 4.2 避免SQL注入的SQL语句 5. 编写JDBC代码实现…...

微信小程序编译出现 project.config.json 文件内容错误

问题描述&#xff1a; 更新微信开发工具后&#xff0c;使用微信开发工具编译时出现project.config.json 文件内容错误。 原因&#xff1a;当前使用的微信开发工具非稳定版本。 解决方法&#xff1a; 在 manifest.json中加入以下代码&#xff1a; "mp-weixin" : …...

一周学会Django5 Python Web开发-Django5创建项目(用命令方式)

锋哥原创的Python Web开发 Django5视频教程&#xff1a; 2024版 Django5 Python web开发 视频教程(无废话版) 玩命更新中~_哔哩哔哩_bilibili2024版 Django5 Python web开发 视频教程(无废话版) 玩命更新中~共计11条视频&#xff0c;包括&#xff1a;2024版 Django5 Python we…...

DockerUI如何部署结合内网穿透实现公网环境管理本地docker容器

文章目录 前言1. 安装部署DockerUI2. 安装cpolar内网穿透3. 配置DockerUI公网访问地址4. 公网远程访问DockerUI5. 固定DockerUI公网地址 前言 DockerUI是一个docker容器镜像的可视化图形化管理工具。DockerUI可以用来轻松构建、管理和维护docker环境。它是完全开源且免费的。基…...

UML之在Markdown中使用Mermaid绘制类图

1.UML概述 UML&#xff08;Unified modeling language UML&#xff09;统一建模语言&#xff0c;是一种用于软件系统分析和设计的语言工具&#xff0c;它用于帮助软件开发人员进行思考和记录思路。 类图是描述类与类之间的关系的&#xff0c;是UML图中最核心的。类图的是用于…...

Spring Boot + 七牛OSS: 简化云存储集成

引言 Spring Boot 是一个非常流行的、快速搭建应用的框架&#xff0c;它无需大量的配置即可运行起来&#xff0c;而七牛云OSS提供了稳定高效的云端对象存储服务。利用两者的优势&#xff0c;可以为应用提供强大的文件存储功能。 为什么选择七牛云OSS? 七牛云OSS提供了高速的…...

C++:二叉搜索树模拟实现(KV模型)

C&#xff1a;二叉搜索树模拟实现&#xff08;KV模型&#xff09; 前言模拟实现KV模型1. 节点封装2、前置工作&#xff08;默认构造、拷贝构造、赋值重载、析构函数等&#xff09;2. 数据插入&#xff08;递归和非递归版本&#xff09;3、数据删除&#xff08;递归和非递归版本…...

npm淘宝镜像源换新地址

新的淘宝npm镜像源地址&#xff1a;https://registry.npmmirror.com 切换新的镜像源 npm config set registry https://registry.npmmirror.com然后再执行以下操作查看是否成功 npm config list如果没安装过淘宝镜像源的&#xff0c;则直接安装 npm install -g cnpm --regi…...

十大排序算法之线性时间非比较类排序

线性时间非比较类排序 线性时间的算法执行效率也较高&#xff0c;从时间占用上看&#xff0c;线性时间非比较类排序要优于非线性时间排序&#xff0c;但其空间复杂度较非线性时间排序要大一些。因为线性时间非比较类排序算法会额外申请一定的空间进行分配排序&#xff0c;这也…...

容器基础:Docker 镜像如何保证部署的一致性?

Docker 镜像如何通过固化基础环境、固化依赖性和固化软件启动流程保证部署的一致性 Docker 镜像通过以下三个方面保证部署的一致性&#xff1a; 1. 固化基础环境: 镜像包含构建应用程序所需的所有环境依赖项&#xff0c;例如操作系统、库和工具。构建镜像时&#xff0c;所有…...

爪哇部落算法组2024新生赛热身赛题解

第一题&#xff08;签到&#xff09;&#xff1a; 1、题意&#xff1a; 2、题解: 我们观察到happynewyear的长度是12个字符&#xff0c;我们直接从前往后遍历0到n - 12的位置&#xff08;这里索引从0开始&#xff09;&#xff0c;使用C的substr()函数找到以i开头的长度为12的字…...

1123. 铲雪车(欧拉回路)

活动 - AcWing 随着白天越来越短夜晚越来越长&#xff0c;我们不得不考虑铲雪问题了。 整个城市所有的道路都是双向车道,道路的两个方向均需要铲雪。因为城市预算的削减&#xff0c;整个城市只有 1 辆铲雪车。 铲雪车只能把它开过的地方&#xff08;车道&#xff09;的雪铲干…...

网络协议与攻击模拟_15FTP协议

了解FTP协议 在Windows操作系统上使用serv-U软件搭建FTP服务 分析FTP流量 一、FTP协议 1、FTP概念 FTP&#xff08;文件传输协议&#xff09;由两部分组成&#xff1a;客户端/服务端&#xff08;C/S架构&#xff09; 应用场景&#xff1a;企业内部存放公司文件、开发网站时利…...

「效果图渲染」效果图与3D影视动画渲染平台

效果图渲染和3D影视动画渲染都是视觉图像渲染的领域应用。效果图渲染主要服务于建筑、室内设计和产品设计等行业&#xff0c;这些领域通常对视觉呈现的精度和细节有较高要求。与之相比&#xff0c;3D影视动画渲染则普遍应用于电影、电视、视频游戏和广告等媒体领域&#xff0c;…...

Blender_查看版本

Blender_查看版本 烦人的烦恼&#xff0c;没找见哪儿可以查看版本&#xff1f; 算是个隐蔽的角落&#xff01;...

node.js 读目录.txt文件,用 xml2js 转换为json数据,生成jstree所需的文件

请参阅&#xff1a;java : pdfbox 读取 PDF文件内书签 请注意&#xff1a;书的目录.txt 编码&#xff1a;UTF-8&#xff0c;推荐用 Notepad 转换编码。 npm install elementtree ; npm install xml2js ; node.js 用 elementtree读目录.txt文件&#xff0c;用 xml2js 转换为…...

【Docker】02 镜像管理

文章目录 一、Images镜像二、管理操作2.1 搜索镜像2.1.1 命令行搜索2.1.2 页面搜索2.1.3 搜索条件 2.2 下载镜像2.3 查看本地镜像2.3.1 docker images2.3.2 --help2.3.3 repository name2.3.4 --filter2.3.5 -q2.3.6 --format 2.4 给镜像打标签2.5 推送镜像2.6 删除镜像2.7 导出…...

TypeGPT:全局AI助手实现原理与配置指南,让大模型无缝融入工作流

1. 项目概述&#xff1a;一个全局AI助手&#xff0c;如何让大模型无处不在 如果你和我一样&#xff0c;每天的工作流里充斥着各种文本输入场景——写代码、回邮件、在文档里做笔记、甚至在聊天软件里跟同事讨论问题&#xff0c;那你肯定也想过&#xff1a;要是能让AI助手随时待…...

ServerPackCreator终极指南:3分钟自动化创建Minecraft服务器包 [特殊字符]

ServerPackCreator终极指南&#xff1a;3分钟自动化创建Minecraft服务器包 &#x1f680; 【免费下载链接】ServerPackCreator Create a server pack from a Minecraft Forge, NeoForge, Fabric, LegacyFabric or Quilt modpack! 项目地址: https://gitcode.com/gh_mirrors/s…...

多波束声呐接收机与信号处理算法【附程序】

✨ 长期致力于多通道声呐接收机、电路设计、FPGA、数字信号处理、波束形成研究工作&#xff0c;擅长数据搜集与处理、建模仿真、程序编写、仿真设计。 ✅ 专业定制毕设、代码 ✅ 如需沟通交流&#xff0c;点击《获取方式》 &#xff08;1&#xff09;小型化96通道接收机硬件电路…...

FPGA硬件在环验证:GateRocket方案加速系统级调试

1. 项目概述&#xff1a;为什么FPGA验证需要“硬件在环”&#xff1f;在FPGA设计领域&#xff0c;尤其是当项目规模膨胀到数百万甚至上千万门级时&#xff0c;纯软件仿真&#xff08;Simulation&#xff09;会变成一个令人头疼的瓶颈。想象一下&#xff0c;你写了一段新的RTL代…...

AI辅助编程工具Cursor在经济学研究中的应用与实战指南

1. 从零开始&#xff1a;为什么经济学家需要AI辅助编程工具 如果你是一名经济学研究者、研究生或者研究助理&#xff0c;我猜你肯定经历过这样的场景&#xff1a;为了清洗一份来自世界银行或国家统计局的复杂面板数据&#xff0c;你对着Stata或者R的代码文档反复调试&#xff0…...

终极指南:如何用React JSON Schema Form快速构建专业表单设计语言

终极指南&#xff1a;如何用React JSON Schema Form快速构建专业表单设计语言 【免费下载链接】react-jsonschema-form A React component for building Web forms from JSON Schema. 项目地址: https://gitcode.com/gh_mirrors/re/react-jsonschema-form React JSON Sc…...

STM32H7网络通信避坑指南:CubeMX配置LWIP 2.1.2时,这几个DCache和ETH的坑你别踩

STM32H7网络通信深度优化&#xff1a;LWIP 2.1.2配置与Cache一致性实战解析 当你在CubeMX中勾选了ETH和LWIP组件&#xff0c;生成代码后却发现设备无法稳定响应ping请求&#xff0c;或者传输大文件时出现数据错乱——这很可能与STM32H7独特的Cache架构有关。本文将带你深入理解…...

客户受电工程图纸审核|全网独家复现,多模态+知识图谱创新改进篇 引入MM-KG融合架构,多模态感知+知识关联助力图纸全检、隐患精准定位、审核效率翻倍

目录 一、行业痛点:人工抽检模式的致命瓶颈(附真实场景痛点) 1.1 审核效率极低,无法适配规模化需求 1.2 漏判误判率高,审核质量依赖个人经验 1.3 审核标准不统一,追溯难度大 1.4 人力成本高昂,专业人才缺口大 二、创新突破:多模态+知识图谱融合架构(核心改进解析…...

全球扩张加剧法律复杂性,但仅有7%的企业实现全面合规

• 47%的总法律顾问表示&#xff0c;实际控制人规则对法律运营构成了最大的风险 • 44%的企业对能否满足跨境数据安全要求缺乏信心 随着企业在2026年加速全球扩张&#xff0c;合规工作却未能跟上步伐。事实上&#xff0c;根据全球领先的商业管理与合规解决方案提供商CSC的一项最…...

PS2021神经滤镜离线包保姆级安装指南(附文件夹显示与路径详解)

PS2021神经滤镜离线包安装全流程实战手册 第一次打开Photoshop 2021的神经滤镜功能时&#xff0c;那个漫长的下载进度条简直让人崩溃。特别是当网络环境不稳定时&#xff0c;下载失败的概率直线上升。其实Adobe官方提供了完整的离线安装方案&#xff0c;只是隐藏得比较深——就…...