当前位置: 首页 > news >正文

【flink状态管理(四)】MemoryStateBackend的实现

文章目录

  • 1.基于MemoryStateBackend创建KeyedStateBackend
    • 1.1. 状态初始化
    • 1.2. 创建状态
  • 2. 基于MemoryStateBackend创建OperatorStateBackend
  • 3.基于MemoryStateBackend创建CheckpointStorage

在Flink中,默认的StateBackend实现为MemoryStateBackend,本文以MemoryStateBackend为例说明StateBackend的设计与实现。

 
本文介绍MemoryStateBackend中如下三个主要组件的创建过程:

  • HeapKeyedStateBackend
  • OperatorStateBackend
  • MemoryBackendCheckpointStorage

FsStateBackend和RocksDBStateBackend这两种状态后端存储的实现,功能和MemoryStateBackend类似,区别在于内部创建的KeyedStateBackend和CheckpointStorage。

 

1.基于MemoryStateBackend创建KeyedStateBackend

1.1. 状态初始化

AbstractStreamOperator.keyedStatedBackend()方法定义了创建和初始化KeyedStatedBackend的逻辑,具体如下。

protected <K> AbstractKeyedStateBackend<K> keyedStateBackend(TypeSerializer<K> keySerializer,String operatorIdentifierText,PrioritizedOperatorSubtaskState prioritizedOperatorSubtaskStates,CloseableRegistry backendCloseableRegistry,MetricGroup metricGroup) throws Exception {if (keySerializer == null) {return null;}String logDescription = "keyed state backend for " + operatorIdentifierText;//1. TaskInfo taskInfo = environment.getTaskInfo();final KeyGroupRange keyGroupRange = KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(taskInfo.getMaxNumberOfParallelSubtasks(),taskInfo.getNumberOfParallelSubtasks(),taskInfo.getIndexOfThisSubtask());// 确保恢复状态过程中构建的数据流被关闭CloseableRegistry cancelStreamRegistryForRestore = new CloseableRegistry();backendCloseableRegistry.registerCloseable(cancelStreamRegistryForRestore);// 创建BackendRestorerProcedureBackendRestorerProcedure<AbstractKeyedStateBackend<K>, KeyedStateHandle> backendRestorer =new BackendRestorerProcedure<>((stateHandles) -> stateBackend.createKeyedStateBackend(environment,environment.getJobID(),operatorIdentifierText,keySerializer,taskInfo.getMaxNumberOfParallelSubtasks(),keyGroupRange,environment.getTaskKvStateRegistry(),TtlTimeProvider.DEFAULT,metricGroup,stateHandles,cancelStreamRegistryForRestore),backendCloseableRegistry,logDescription);try {return backendRestorer.createAndRestore(prioritizedOperatorSubtaskStates.getPrioritizedManagedKeyedState());} finally {if (backendCloseableRegistry.unregisterCloseable(cancelStreamRegistryForRestore)) {IOUtils.closeQuietly(cancelStreamRegistryForRestore);}}
}
  1. 获取当前Task的TaskInfo,并基于TaskInfo的参数创建KeyGroupRange,表示当前Task实例中存储的Key分组区间
  2. 创建CloseableRegistry并注册到backendCloseableRegistry中,用于确保在任务取消的情况下关闭在恢复状态过程中构造的数据流。
  3. 创建BackendRestorerProcedure,提供了stateBackend.createKeyedStateBackend()方法,也包含恢复历史状态数据的方法。
  4. 创建KeyedStateBackend,同时对状态数据进行恢复。prioritizedOperatorSubtaskStates是从TaskStateManager中根据OperatorID获取的算子历史状态,通过prioritizedOperatorSubtaskStates获取当前算子的PrioritizedManagedKeyedState,并基于这些状态数据恢复算子的状态。

 

1.2. 创建状态

接下来我们看MemoryStateBackend.createKeyedStateBackend()方法的具体实现。

public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(Environment env,JobID jobID,String operatorIdentifier,TypeSerializer<K> keySerializer,int numberOfKeyGroups,KeyGroupRange keyGroupRange,TaskKvStateRegistry kvStateRegistry,TtlTimeProvider ttlTimeProvider,MetricGroup metricGroup,@Nonnull Collection<KeyedStateHandle> stateHandles,CloseableRegistry cancelStreamRegistry) throws BackendBuildingException {// 获取TaskStateManager实例TaskStateManager taskStateManager = env.getTaskStateManager();// 创建HeapPriorityQueueSetFactory实例HeapPriorityQueueSetFactory priorityQueueSetFactory =new HeapPriorityQueueSetFactory(keyGroupRange, numberOfKeyGroups, 128);// 创建HeapKeyedStateBackendBuilder实例HeapKeyedStateBackendreturn new HeapKeyedStateBackendBuilder<>(kvStateRegistry,keySerializer,env.getUserClassLoader(),numberOfKeyGroups,keyGroupRange,env.getExecutionConfig(),ttlTimeProvider,stateHandles,AbstractStateBackend.getCompressionDecorator(env.getExecutionConfig()),taskStateManager.createLocalRecoveryConfig(),priorityQueueSetFactory,isUsingAsynchronousSnapshots(),cancelStreamRegistry).build();
}
  1. 从environment参数中获取TaskStateManager实例
  2. 创建HeapPriorityQueueSetFactory实例,用于生成HeapPriorityQueueSet优先级队列,存储TimerHeapInternalTimer等数据。
  3. 调用HeapKeyedStateBackendBuilder.build()方法创建HeapKeyedStateBackend。

 

2. 基于MemoryStateBackend创建OperatorStateBackend

和创建KeyedStateBackend的过程相似,AbstractStreamOperator.operatorStateBackend()方法实现了创建OperatorStateBackend的方法。

protected OperatorStateBackend operatorStateBackend(String operatorIdentifierText,PrioritizedOperatorSubtaskState prioritizedOperatorSubtaskStates,CloseableRegistry backendCloseableRegistry) throws Exception {String logDescription = "operator state backend for " + operatorIdentifierText;CloseableRegistry cancelStreamRegistryForRestore = new CloseableRegistry();backendCloseableRegistry.registerCloseable(cancelStreamRegistryForRestore);BackendRestorerProcedure<OperatorStateBackend, OperatorStateHandle> backendRestorer =new BackendRestorerProcedure<>((stateHandles) -> stateBackend.createOperatorStateBackend(environment,operatorIdentifierText,stateHandles,cancelStreamRegistryForRestore),backendCloseableRegistry,logDescription);try {return backendRestorer.createAndRestore(prioritizedOperatorSubtaskStates.getPrioritizedManagedOperatorState());} finally {if (backendCloseableRegistry.unregisterCloseable(cancelStreamRegistryForRestore)) {IOUtils.closeQuietly(cancelStreamRegistryForRestore);}}
}
  1. 创建CloseableRegisty,确保在任务取消的情况下能够关闭在恢复状态时构造的数据流。
  2. 创建BackendRestorerProcedure,封装了stateBackend.createOperatorStateBackend()方法,并包含恢复历史状态数据的操作。
  3. 创建OperatorStateBackend,并恢复状态数据。

其中prioritizedOperatorSubtaskStates是从TaskStateManager中根据OperatorID获取的算子专有历史状态,可以通过prioritizedOperatorSubtaskStates获取当前算子中的PrioritizedManagedOperatorState,并基于这些状态数据恢复OperatorStateBackend中算子的状态。

 

3.基于MemoryStateBackend创建CheckpointStorage

在createCheckpointStorage()方法中,直接创建MemoryBackendCheckpointStorage实例并返回,没有涉及太多的流程

public CheckpointStorage createCheckpointStorage(JobID jobId) throws IOException {return new MemoryBackendCheckpointStorage(jobId, getCheckpointPath(), getSavepointPath(), maxStateSize);
}

 

参考:《Flink设计与实现:核心原理与源码解析》–张利兵

相关文章:

【flink状态管理(四)】MemoryStateBackend的实现

文章目录 1.基于MemoryStateBackend创建KeyedStateBackend1.1. 状态初始化1.2. 创建状态 2. 基于MemoryStateBackend创建OperatorStateBackend3.基于MemoryStateBackend创建CheckpointStorage 在Flink中&#xff0c;默认的StateBackend实现为MemoryStateBackend&#xff0c;本文…...

前端架构: 脚手架在前端研发流程中的意义

关于脚手架 脚手架又被成为 CLI (command-line interface)基于文本界面&#xff0c;通过中断输入命令执行常见的脚手架&#xff1a;npm, webpack-cli, vue-cli拿 npm 这个脚手架来说 在终端当中输入 npm 命令, 系统就会通过文本方式返回 npm 的使用方法它这种通过命令行执行的…...

Qt网络编程-QTcpServer的封装

简单封装Tcp服务器类&#xff0c;将QTcpServer移入线程 头文件&#xff1a; #ifndef TCPSERVER_H #define TCPSERVER_H#include <QObject>class QTcpSocket; class QTcpServer; class QThread; class TcpServer : public QObject {Q_OBJECT public:explicit TcpServer(…...

【MySQL】_JDBC编程

目录 1. JDBC原理 2. 导入JDBC驱动包 3. 编写JDBC代码实现Insert 3.1 创建并初始化一个数据源 3.2 和数据库服务器建立连接 3.3 构造SQL语句 3.4 执行SQL语句 3.5 释放必要的资源 4. JDBC代码的优化 4.1 从控制台输入 4.2 避免SQL注入的SQL语句 5. 编写JDBC代码实现…...

微信小程序编译出现 project.config.json 文件内容错误

问题描述&#xff1a; 更新微信开发工具后&#xff0c;使用微信开发工具编译时出现project.config.json 文件内容错误。 原因&#xff1a;当前使用的微信开发工具非稳定版本。 解决方法&#xff1a; 在 manifest.json中加入以下代码&#xff1a; "mp-weixin" : …...

一周学会Django5 Python Web开发-Django5创建项目(用命令方式)

锋哥原创的Python Web开发 Django5视频教程&#xff1a; 2024版 Django5 Python web开发 视频教程(无废话版) 玩命更新中~_哔哩哔哩_bilibili2024版 Django5 Python web开发 视频教程(无废话版) 玩命更新中~共计11条视频&#xff0c;包括&#xff1a;2024版 Django5 Python we…...

DockerUI如何部署结合内网穿透实现公网环境管理本地docker容器

文章目录 前言1. 安装部署DockerUI2. 安装cpolar内网穿透3. 配置DockerUI公网访问地址4. 公网远程访问DockerUI5. 固定DockerUI公网地址 前言 DockerUI是一个docker容器镜像的可视化图形化管理工具。DockerUI可以用来轻松构建、管理和维护docker环境。它是完全开源且免费的。基…...

UML之在Markdown中使用Mermaid绘制类图

1.UML概述 UML&#xff08;Unified modeling language UML&#xff09;统一建模语言&#xff0c;是一种用于软件系统分析和设计的语言工具&#xff0c;它用于帮助软件开发人员进行思考和记录思路。 类图是描述类与类之间的关系的&#xff0c;是UML图中最核心的。类图的是用于…...

Spring Boot + 七牛OSS: 简化云存储集成

引言 Spring Boot 是一个非常流行的、快速搭建应用的框架&#xff0c;它无需大量的配置即可运行起来&#xff0c;而七牛云OSS提供了稳定高效的云端对象存储服务。利用两者的优势&#xff0c;可以为应用提供强大的文件存储功能。 为什么选择七牛云OSS? 七牛云OSS提供了高速的…...

C++:二叉搜索树模拟实现(KV模型)

C&#xff1a;二叉搜索树模拟实现&#xff08;KV模型&#xff09; 前言模拟实现KV模型1. 节点封装2、前置工作&#xff08;默认构造、拷贝构造、赋值重载、析构函数等&#xff09;2. 数据插入&#xff08;递归和非递归版本&#xff09;3、数据删除&#xff08;递归和非递归版本…...

npm淘宝镜像源换新地址

新的淘宝npm镜像源地址&#xff1a;https://registry.npmmirror.com 切换新的镜像源 npm config set registry https://registry.npmmirror.com然后再执行以下操作查看是否成功 npm config list如果没安装过淘宝镜像源的&#xff0c;则直接安装 npm install -g cnpm --regi…...

十大排序算法之线性时间非比较类排序

线性时间非比较类排序 线性时间的算法执行效率也较高&#xff0c;从时间占用上看&#xff0c;线性时间非比较类排序要优于非线性时间排序&#xff0c;但其空间复杂度较非线性时间排序要大一些。因为线性时间非比较类排序算法会额外申请一定的空间进行分配排序&#xff0c;这也…...

容器基础:Docker 镜像如何保证部署的一致性?

Docker 镜像如何通过固化基础环境、固化依赖性和固化软件启动流程保证部署的一致性 Docker 镜像通过以下三个方面保证部署的一致性&#xff1a; 1. 固化基础环境: 镜像包含构建应用程序所需的所有环境依赖项&#xff0c;例如操作系统、库和工具。构建镜像时&#xff0c;所有…...

爪哇部落算法组2024新生赛热身赛题解

第一题&#xff08;签到&#xff09;&#xff1a; 1、题意&#xff1a; 2、题解: 我们观察到happynewyear的长度是12个字符&#xff0c;我们直接从前往后遍历0到n - 12的位置&#xff08;这里索引从0开始&#xff09;&#xff0c;使用C的substr()函数找到以i开头的长度为12的字…...

1123. 铲雪车(欧拉回路)

活动 - AcWing 随着白天越来越短夜晚越来越长&#xff0c;我们不得不考虑铲雪问题了。 整个城市所有的道路都是双向车道,道路的两个方向均需要铲雪。因为城市预算的削减&#xff0c;整个城市只有 1 辆铲雪车。 铲雪车只能把它开过的地方&#xff08;车道&#xff09;的雪铲干…...

网络协议与攻击模拟_15FTP协议

了解FTP协议 在Windows操作系统上使用serv-U软件搭建FTP服务 分析FTP流量 一、FTP协议 1、FTP概念 FTP&#xff08;文件传输协议&#xff09;由两部分组成&#xff1a;客户端/服务端&#xff08;C/S架构&#xff09; 应用场景&#xff1a;企业内部存放公司文件、开发网站时利…...

「效果图渲染」效果图与3D影视动画渲染平台

效果图渲染和3D影视动画渲染都是视觉图像渲染的领域应用。效果图渲染主要服务于建筑、室内设计和产品设计等行业&#xff0c;这些领域通常对视觉呈现的精度和细节有较高要求。与之相比&#xff0c;3D影视动画渲染则普遍应用于电影、电视、视频游戏和广告等媒体领域&#xff0c;…...

Blender_查看版本

Blender_查看版本 烦人的烦恼&#xff0c;没找见哪儿可以查看版本&#xff1f; 算是个隐蔽的角落&#xff01;...

node.js 读目录.txt文件,用 xml2js 转换为json数据,生成jstree所需的文件

请参阅&#xff1a;java : pdfbox 读取 PDF文件内书签 请注意&#xff1a;书的目录.txt 编码&#xff1a;UTF-8&#xff0c;推荐用 Notepad 转换编码。 npm install elementtree ; npm install xml2js ; node.js 用 elementtree读目录.txt文件&#xff0c;用 xml2js 转换为…...

【Docker】02 镜像管理

文章目录 一、Images镜像二、管理操作2.1 搜索镜像2.1.1 命令行搜索2.1.2 页面搜索2.1.3 搜索条件 2.2 下载镜像2.3 查看本地镜像2.3.1 docker images2.3.2 --help2.3.3 repository name2.3.4 --filter2.3.5 -q2.3.6 --format 2.4 给镜像打标签2.5 推送镜像2.6 删除镜像2.7 导出…...

Xshell远程连接Kali(默认 | 私钥)Note版

前言:xshell远程连接&#xff0c;私钥连接和常规默认连接 任务一 开启ssh服务 service ssh status //查看ssh服务状态 service ssh start //开启ssh服务 update-rc.d ssh enable //开启自启动ssh服务 任务二 修改配置文件 vi /etc/ssh/ssh_config //第一…...

汽车生产虚拟实训中的技能提升与生产优化​

在制造业蓬勃发展的大背景下&#xff0c;虚拟教学实训宛如一颗璀璨的新星&#xff0c;正发挥着不可或缺且日益凸显的关键作用&#xff0c;源源不断地为企业的稳健前行与创新发展注入磅礴强大的动力。就以汽车制造企业这一极具代表性的行业主体为例&#xff0c;汽车生产线上各类…...

如何为服务器生成TLS证书

TLS&#xff08;Transport Layer Security&#xff09;证书是确保网络通信安全的重要手段&#xff0c;它通过加密技术保护传输的数据不被窃听和篡改。在服务器上配置TLS证书&#xff0c;可以使用户通过HTTPS协议安全地访问您的网站。本文将详细介绍如何在服务器上生成一个TLS证…...

Springcloud:Eureka 高可用集群搭建实战(服务注册与发现的底层原理与避坑指南)

引言&#xff1a;为什么 Eureka 依然是存量系统的核心&#xff1f; 尽管 Nacos 等新注册中心崛起&#xff0c;但金融、电力等保守行业仍有大量系统运行在 Eureka 上。理解其高可用设计与自我保护机制&#xff0c;是保障分布式系统稳定的必修课。本文将手把手带你搭建生产级 Eur…...

C++课设:简易日历程序(支持传统节假日 + 二十四节气 + 个人纪念日管理)

名人说:路漫漫其修远兮,吾将上下而求索。—— 屈原《离骚》 创作者:Code_流苏(CSDN)(一个喜欢古诗词和编程的Coder😊) 专栏介绍:《编程项目实战》 目录 一、为什么要开发一个日历程序?1. 深入理解时间算法2. 练习面向对象设计3. 学习数据结构应用二、核心算法深度解析…...

9-Oracle 23 ai Vector Search 特性 知识准备

很多小伙伴是不是参加了 免费认证课程&#xff08;限时至2025/5/15&#xff09; Oracle AI Vector Search 1Z0-184-25考试&#xff0c;都顺利拿到certified了没。 各行各业的AI 大模型的到来&#xff0c;传统的数据库中的SQL还能不能打&#xff0c;结构化和非结构的话数据如何和…...

【WebSocket】SpringBoot项目中使用WebSocket

1. 导入坐标 如果springboot父工程没有加入websocket的起步依赖&#xff0c;添加它的坐标的时候需要带上版本号。 <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId> </dep…...

「Java基本语法」变量的使用

变量定义 变量是程序中存储数据的容器&#xff0c;用于保存可变的数据值。在Java中&#xff0c;变量必须先声明后使用&#xff0c;声明时需指定变量的数据类型和变量名。 语法 数据类型 变量名 [ 初始值]; 示例&#xff1a;声明与初始化 public class VariableDemo {publi…...

EEG-fNIRS联合成像在跨频率耦合研究中的创新应用

摘要 神经影像技术对医学科学产生了深远的影响&#xff0c;推动了许多神经系统疾病研究的进展并改善了其诊断方法。在此背景下&#xff0c;基于神经血管耦合现象的多模态神经影像方法&#xff0c;通过融合各自优势来提供有关大脑皮层神经活动的互补信息。在这里&#xff0c;本研…...

欢乐熊大话蓝牙知识17:多连接 BLE 怎么设计服务不会乱?分层思维来救场!

多连接 BLE 怎么设计服务不会乱&#xff1f;分层思维来救场&#xff01; 作者按&#xff1a; 你是不是也遇到过 BLE 多连接时&#xff0c;调试现场像网吧“掉线风暴”&#xff1f; 温度传感器连上了&#xff0c;心率带丢了&#xff1b;一边 OTA 更新&#xff0c;一边通知卡壳。…...