当前位置: 首页 > news >正文

【flink状态管理(四)】MemoryStateBackend的实现

文章目录

  • 1.基于MemoryStateBackend创建KeyedStateBackend
    • 1.1. 状态初始化
    • 1.2. 创建状态
  • 2. 基于MemoryStateBackend创建OperatorStateBackend
  • 3.基于MemoryStateBackend创建CheckpointStorage

在Flink中,默认的StateBackend实现为MemoryStateBackend,本文以MemoryStateBackend为例说明StateBackend的设计与实现。

 
本文介绍MemoryStateBackend中如下三个主要组件的创建过程:

  • HeapKeyedStateBackend
  • OperatorStateBackend
  • MemoryBackendCheckpointStorage

FsStateBackend和RocksDBStateBackend这两种状态后端存储的实现,功能和MemoryStateBackend类似,区别在于内部创建的KeyedStateBackend和CheckpointStorage。

 

1.基于MemoryStateBackend创建KeyedStateBackend

1.1. 状态初始化

AbstractStreamOperator.keyedStatedBackend()方法定义了创建和初始化KeyedStatedBackend的逻辑,具体如下。

protected <K> AbstractKeyedStateBackend<K> keyedStateBackend(TypeSerializer<K> keySerializer,String operatorIdentifierText,PrioritizedOperatorSubtaskState prioritizedOperatorSubtaskStates,CloseableRegistry backendCloseableRegistry,MetricGroup metricGroup) throws Exception {if (keySerializer == null) {return null;}String logDescription = "keyed state backend for " + operatorIdentifierText;//1. TaskInfo taskInfo = environment.getTaskInfo();final KeyGroupRange keyGroupRange = KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(taskInfo.getMaxNumberOfParallelSubtasks(),taskInfo.getNumberOfParallelSubtasks(),taskInfo.getIndexOfThisSubtask());// 确保恢复状态过程中构建的数据流被关闭CloseableRegistry cancelStreamRegistryForRestore = new CloseableRegistry();backendCloseableRegistry.registerCloseable(cancelStreamRegistryForRestore);// 创建BackendRestorerProcedureBackendRestorerProcedure<AbstractKeyedStateBackend<K>, KeyedStateHandle> backendRestorer =new BackendRestorerProcedure<>((stateHandles) -> stateBackend.createKeyedStateBackend(environment,environment.getJobID(),operatorIdentifierText,keySerializer,taskInfo.getMaxNumberOfParallelSubtasks(),keyGroupRange,environment.getTaskKvStateRegistry(),TtlTimeProvider.DEFAULT,metricGroup,stateHandles,cancelStreamRegistryForRestore),backendCloseableRegistry,logDescription);try {return backendRestorer.createAndRestore(prioritizedOperatorSubtaskStates.getPrioritizedManagedKeyedState());} finally {if (backendCloseableRegistry.unregisterCloseable(cancelStreamRegistryForRestore)) {IOUtils.closeQuietly(cancelStreamRegistryForRestore);}}
}
  1. 获取当前Task的TaskInfo,并基于TaskInfo的参数创建KeyGroupRange,表示当前Task实例中存储的Key分组区间
  2. 创建CloseableRegistry并注册到backendCloseableRegistry中,用于确保在任务取消的情况下关闭在恢复状态过程中构造的数据流。
  3. 创建BackendRestorerProcedure,提供了stateBackend.createKeyedStateBackend()方法,也包含恢复历史状态数据的方法。
  4. 创建KeyedStateBackend,同时对状态数据进行恢复。prioritizedOperatorSubtaskStates是从TaskStateManager中根据OperatorID获取的算子历史状态,通过prioritizedOperatorSubtaskStates获取当前算子的PrioritizedManagedKeyedState,并基于这些状态数据恢复算子的状态。

 

1.2. 创建状态

接下来我们看MemoryStateBackend.createKeyedStateBackend()方法的具体实现。

public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(Environment env,JobID jobID,String operatorIdentifier,TypeSerializer<K> keySerializer,int numberOfKeyGroups,KeyGroupRange keyGroupRange,TaskKvStateRegistry kvStateRegistry,TtlTimeProvider ttlTimeProvider,MetricGroup metricGroup,@Nonnull Collection<KeyedStateHandle> stateHandles,CloseableRegistry cancelStreamRegistry) throws BackendBuildingException {// 获取TaskStateManager实例TaskStateManager taskStateManager = env.getTaskStateManager();// 创建HeapPriorityQueueSetFactory实例HeapPriorityQueueSetFactory priorityQueueSetFactory =new HeapPriorityQueueSetFactory(keyGroupRange, numberOfKeyGroups, 128);// 创建HeapKeyedStateBackendBuilder实例HeapKeyedStateBackendreturn new HeapKeyedStateBackendBuilder<>(kvStateRegistry,keySerializer,env.getUserClassLoader(),numberOfKeyGroups,keyGroupRange,env.getExecutionConfig(),ttlTimeProvider,stateHandles,AbstractStateBackend.getCompressionDecorator(env.getExecutionConfig()),taskStateManager.createLocalRecoveryConfig(),priorityQueueSetFactory,isUsingAsynchronousSnapshots(),cancelStreamRegistry).build();
}
  1. 从environment参数中获取TaskStateManager实例
  2. 创建HeapPriorityQueueSetFactory实例,用于生成HeapPriorityQueueSet优先级队列,存储TimerHeapInternalTimer等数据。
  3. 调用HeapKeyedStateBackendBuilder.build()方法创建HeapKeyedStateBackend。

 

2. 基于MemoryStateBackend创建OperatorStateBackend

和创建KeyedStateBackend的过程相似,AbstractStreamOperator.operatorStateBackend()方法实现了创建OperatorStateBackend的方法。

protected OperatorStateBackend operatorStateBackend(String operatorIdentifierText,PrioritizedOperatorSubtaskState prioritizedOperatorSubtaskStates,CloseableRegistry backendCloseableRegistry) throws Exception {String logDescription = "operator state backend for " + operatorIdentifierText;CloseableRegistry cancelStreamRegistryForRestore = new CloseableRegistry();backendCloseableRegistry.registerCloseable(cancelStreamRegistryForRestore);BackendRestorerProcedure<OperatorStateBackend, OperatorStateHandle> backendRestorer =new BackendRestorerProcedure<>((stateHandles) -> stateBackend.createOperatorStateBackend(environment,operatorIdentifierText,stateHandles,cancelStreamRegistryForRestore),backendCloseableRegistry,logDescription);try {return backendRestorer.createAndRestore(prioritizedOperatorSubtaskStates.getPrioritizedManagedOperatorState());} finally {if (backendCloseableRegistry.unregisterCloseable(cancelStreamRegistryForRestore)) {IOUtils.closeQuietly(cancelStreamRegistryForRestore);}}
}
  1. 创建CloseableRegisty,确保在任务取消的情况下能够关闭在恢复状态时构造的数据流。
  2. 创建BackendRestorerProcedure,封装了stateBackend.createOperatorStateBackend()方法,并包含恢复历史状态数据的操作。
  3. 创建OperatorStateBackend,并恢复状态数据。

其中prioritizedOperatorSubtaskStates是从TaskStateManager中根据OperatorID获取的算子专有历史状态,可以通过prioritizedOperatorSubtaskStates获取当前算子中的PrioritizedManagedOperatorState,并基于这些状态数据恢复OperatorStateBackend中算子的状态。

 

3.基于MemoryStateBackend创建CheckpointStorage

在createCheckpointStorage()方法中,直接创建MemoryBackendCheckpointStorage实例并返回,没有涉及太多的流程

public CheckpointStorage createCheckpointStorage(JobID jobId) throws IOException {return new MemoryBackendCheckpointStorage(jobId, getCheckpointPath(), getSavepointPath(), maxStateSize);
}

 

参考:《Flink设计与实现:核心原理与源码解析》–张利兵

相关文章:

【flink状态管理(四)】MemoryStateBackend的实现

文章目录 1.基于MemoryStateBackend创建KeyedStateBackend1.1. 状态初始化1.2. 创建状态 2. 基于MemoryStateBackend创建OperatorStateBackend3.基于MemoryStateBackend创建CheckpointStorage 在Flink中&#xff0c;默认的StateBackend实现为MemoryStateBackend&#xff0c;本文…...

前端架构: 脚手架在前端研发流程中的意义

关于脚手架 脚手架又被成为 CLI (command-line interface)基于文本界面&#xff0c;通过中断输入命令执行常见的脚手架&#xff1a;npm, webpack-cli, vue-cli拿 npm 这个脚手架来说 在终端当中输入 npm 命令, 系统就会通过文本方式返回 npm 的使用方法它这种通过命令行执行的…...

Qt网络编程-QTcpServer的封装

简单封装Tcp服务器类&#xff0c;将QTcpServer移入线程 头文件&#xff1a; #ifndef TCPSERVER_H #define TCPSERVER_H#include <QObject>class QTcpSocket; class QTcpServer; class QThread; class TcpServer : public QObject {Q_OBJECT public:explicit TcpServer(…...

【MySQL】_JDBC编程

目录 1. JDBC原理 2. 导入JDBC驱动包 3. 编写JDBC代码实现Insert 3.1 创建并初始化一个数据源 3.2 和数据库服务器建立连接 3.3 构造SQL语句 3.4 执行SQL语句 3.5 释放必要的资源 4. JDBC代码的优化 4.1 从控制台输入 4.2 避免SQL注入的SQL语句 5. 编写JDBC代码实现…...

微信小程序编译出现 project.config.json 文件内容错误

问题描述&#xff1a; 更新微信开发工具后&#xff0c;使用微信开发工具编译时出现project.config.json 文件内容错误。 原因&#xff1a;当前使用的微信开发工具非稳定版本。 解决方法&#xff1a; 在 manifest.json中加入以下代码&#xff1a; "mp-weixin" : …...

一周学会Django5 Python Web开发-Django5创建项目(用命令方式)

锋哥原创的Python Web开发 Django5视频教程&#xff1a; 2024版 Django5 Python web开发 视频教程(无废话版) 玩命更新中~_哔哩哔哩_bilibili2024版 Django5 Python web开发 视频教程(无废话版) 玩命更新中~共计11条视频&#xff0c;包括&#xff1a;2024版 Django5 Python we…...

DockerUI如何部署结合内网穿透实现公网环境管理本地docker容器

文章目录 前言1. 安装部署DockerUI2. 安装cpolar内网穿透3. 配置DockerUI公网访问地址4. 公网远程访问DockerUI5. 固定DockerUI公网地址 前言 DockerUI是一个docker容器镜像的可视化图形化管理工具。DockerUI可以用来轻松构建、管理和维护docker环境。它是完全开源且免费的。基…...

UML之在Markdown中使用Mermaid绘制类图

1.UML概述 UML&#xff08;Unified modeling language UML&#xff09;统一建模语言&#xff0c;是一种用于软件系统分析和设计的语言工具&#xff0c;它用于帮助软件开发人员进行思考和记录思路。 类图是描述类与类之间的关系的&#xff0c;是UML图中最核心的。类图的是用于…...

Spring Boot + 七牛OSS: 简化云存储集成

引言 Spring Boot 是一个非常流行的、快速搭建应用的框架&#xff0c;它无需大量的配置即可运行起来&#xff0c;而七牛云OSS提供了稳定高效的云端对象存储服务。利用两者的优势&#xff0c;可以为应用提供强大的文件存储功能。 为什么选择七牛云OSS? 七牛云OSS提供了高速的…...

C++:二叉搜索树模拟实现(KV模型)

C&#xff1a;二叉搜索树模拟实现&#xff08;KV模型&#xff09; 前言模拟实现KV模型1. 节点封装2、前置工作&#xff08;默认构造、拷贝构造、赋值重载、析构函数等&#xff09;2. 数据插入&#xff08;递归和非递归版本&#xff09;3、数据删除&#xff08;递归和非递归版本…...

npm淘宝镜像源换新地址

新的淘宝npm镜像源地址&#xff1a;https://registry.npmmirror.com 切换新的镜像源 npm config set registry https://registry.npmmirror.com然后再执行以下操作查看是否成功 npm config list如果没安装过淘宝镜像源的&#xff0c;则直接安装 npm install -g cnpm --regi…...

十大排序算法之线性时间非比较类排序

线性时间非比较类排序 线性时间的算法执行效率也较高&#xff0c;从时间占用上看&#xff0c;线性时间非比较类排序要优于非线性时间排序&#xff0c;但其空间复杂度较非线性时间排序要大一些。因为线性时间非比较类排序算法会额外申请一定的空间进行分配排序&#xff0c;这也…...

容器基础:Docker 镜像如何保证部署的一致性?

Docker 镜像如何通过固化基础环境、固化依赖性和固化软件启动流程保证部署的一致性 Docker 镜像通过以下三个方面保证部署的一致性&#xff1a; 1. 固化基础环境: 镜像包含构建应用程序所需的所有环境依赖项&#xff0c;例如操作系统、库和工具。构建镜像时&#xff0c;所有…...

爪哇部落算法组2024新生赛热身赛题解

第一题&#xff08;签到&#xff09;&#xff1a; 1、题意&#xff1a; 2、题解: 我们观察到happynewyear的长度是12个字符&#xff0c;我们直接从前往后遍历0到n - 12的位置&#xff08;这里索引从0开始&#xff09;&#xff0c;使用C的substr()函数找到以i开头的长度为12的字…...

1123. 铲雪车(欧拉回路)

活动 - AcWing 随着白天越来越短夜晚越来越长&#xff0c;我们不得不考虑铲雪问题了。 整个城市所有的道路都是双向车道,道路的两个方向均需要铲雪。因为城市预算的削减&#xff0c;整个城市只有 1 辆铲雪车。 铲雪车只能把它开过的地方&#xff08;车道&#xff09;的雪铲干…...

网络协议与攻击模拟_15FTP协议

了解FTP协议 在Windows操作系统上使用serv-U软件搭建FTP服务 分析FTP流量 一、FTP协议 1、FTP概念 FTP&#xff08;文件传输协议&#xff09;由两部分组成&#xff1a;客户端/服务端&#xff08;C/S架构&#xff09; 应用场景&#xff1a;企业内部存放公司文件、开发网站时利…...

「效果图渲染」效果图与3D影视动画渲染平台

效果图渲染和3D影视动画渲染都是视觉图像渲染的领域应用。效果图渲染主要服务于建筑、室内设计和产品设计等行业&#xff0c;这些领域通常对视觉呈现的精度和细节有较高要求。与之相比&#xff0c;3D影视动画渲染则普遍应用于电影、电视、视频游戏和广告等媒体领域&#xff0c;…...

Blender_查看版本

Blender_查看版本 烦人的烦恼&#xff0c;没找见哪儿可以查看版本&#xff1f; 算是个隐蔽的角落&#xff01;...

node.js 读目录.txt文件,用 xml2js 转换为json数据,生成jstree所需的文件

请参阅&#xff1a;java : pdfbox 读取 PDF文件内书签 请注意&#xff1a;书的目录.txt 编码&#xff1a;UTF-8&#xff0c;推荐用 Notepad 转换编码。 npm install elementtree ; npm install xml2js ; node.js 用 elementtree读目录.txt文件&#xff0c;用 xml2js 转换为…...

【Docker】02 镜像管理

文章目录 一、Images镜像二、管理操作2.1 搜索镜像2.1.1 命令行搜索2.1.2 页面搜索2.1.3 搜索条件 2.2 下载镜像2.3 查看本地镜像2.3.1 docker images2.3.2 --help2.3.3 repository name2.3.4 --filter2.3.5 -q2.3.6 --format 2.4 给镜像打标签2.5 推送镜像2.6 删除镜像2.7 导出…...

JavaSec-RCE

简介 RCE(Remote Code Execution)&#xff0c;可以分为:命令注入(Command Injection)、代码注入(Code Injection) 代码注入 1.漏洞场景&#xff1a;Groovy代码注入 Groovy是一种基于JVM的动态语言&#xff0c;语法简洁&#xff0c;支持闭包、动态类型和Java互操作性&#xff0c…...

Oracle查询表空间大小

1 查询数据库中所有的表空间以及表空间所占空间的大小 SELECTtablespace_name,sum( bytes ) / 1024 / 1024 FROMdba_data_files GROUP BYtablespace_name; 2 Oracle查询表空间大小及每个表所占空间的大小 SELECTtablespace_name,file_id,file_name,round( bytes / ( 1024 …...

转转集团旗下首家二手多品类循环仓店“超级转转”开业

6月9日&#xff0c;国内领先的循环经济企业转转集团旗下首家二手多品类循环仓店“超级转转”正式开业。 转转集团创始人兼CEO黄炜、转转循环时尚发起人朱珠、转转集团COO兼红布林CEO胡伟琨、王府井集团副总裁祝捷等出席了开业剪彩仪式。 据「TMT星球」了解&#xff0c;“超级…...

OkHttp 中实现断点续传 demo

在 OkHttp 中实现断点续传主要通过以下步骤完成&#xff0c;核心是利用 HTTP 协议的 Range 请求头指定下载范围&#xff1a; 实现原理 Range 请求头&#xff1a;向服务器请求文件的特定字节范围&#xff08;如 Range: bytes1024-&#xff09; 本地文件记录&#xff1a;保存已…...

什么是EULA和DPA

文章目录 EULA&#xff08;End User License Agreement&#xff09;DPA&#xff08;Data Protection Agreement&#xff09;一、定义与背景二、核心内容三、法律效力与责任四、实际应用与意义 EULA&#xff08;End User License Agreement&#xff09; 定义&#xff1a; EULA即…...

CRMEB 框架中 PHP 上传扩展开发:涵盖本地上传及阿里云 OSS、腾讯云 COS、七牛云

目前已有本地上传、阿里云OSS上传、腾讯云COS上传、七牛云上传扩展 扩展入口文件 文件目录 crmeb\services\upload\Upload.php namespace crmeb\services\upload;use crmeb\basic\BaseManager; use think\facade\Config;/*** Class Upload* package crmeb\services\upload* …...

R 语言科研绘图第 55 期 --- 网络图-聚类

在发表科研论文的过程中&#xff0c;科研绘图是必不可少的&#xff0c;一张好看的图形会是文章很大的加分项。 为了便于使用&#xff0c;本系列文章介绍的所有绘图都已收录到了 sciRplot 项目中&#xff0c;获取方式&#xff1a; R 语言科研绘图模板 --- sciRplothttps://mp.…...

BLEU评分:机器翻译质量评估的黄金标准

BLEU评分&#xff1a;机器翻译质量评估的黄金标准 1. 引言 在自然语言处理(NLP)领域&#xff0c;衡量一个机器翻译模型的性能至关重要。BLEU (Bilingual Evaluation Understudy) 作为一种自动化评估指标&#xff0c;自2002年由IBM的Kishore Papineni等人提出以来&#xff0c;…...

【LeetCode】3309. 连接二进制表示可形成的最大数值(递归|回溯|位运算)

LeetCode 3309. 连接二进制表示可形成的最大数值&#xff08;中等&#xff09; 题目描述解题思路Java代码 题目描述 题目链接&#xff1a;LeetCode 3309. 连接二进制表示可形成的最大数值&#xff08;中等&#xff09; 给你一个长度为 3 的整数数组 nums。 现以某种顺序 连接…...

OD 算法题 B卷【正整数到Excel编号之间的转换】

文章目录 正整数到Excel编号之间的转换 正整数到Excel编号之间的转换 excel的列编号是这样的&#xff1a;a b c … z aa ab ac… az ba bb bc…yz za zb zc …zz aaa aab aac…; 分别代表以下的编号1 2 3 … 26 27 28 29… 52 53 54 55… 676 677 678 679 … 702 703 704 705;…...