【Flink状态管理五】Checkpoint的设计与实现
文章目录
- 1. Checkpoint的整体设计
- 2. Checkpoint创建源码解析
- 2.1. DefaultExecutionGraphBuilder.buildGraph
- 2.2. ExecutionGraph.enableCheckpointing
由于系统原因导致Flink作业无法正常运行的情况非常多,且很多时候都是无法避免的。对于Flink集群来讲,能够快速从异常状态中恢复,同时保证处理数据的正确性和一致性非常重要。Flink主要借助Checkpoint的方式保障整个系统状态数据的一致性,也就是基于ABS算法实现轻量级快照服务。
本节我们详细了解Checkpoint的设计与实现。
1. Checkpoint的整体设计
Checkpoint的执行过程分为三个阶段:启动、执行以及确认完成。其中Checkpoint的启动过程由JobManager管理节点中的CheckpointCoordinator组件控制,该组件会周期性地向数据源节点发送执行Checkpoint的请求,执行频率取决于用户配置的CheckpointInterval参数。
执行过程:
- 在JobManager管理节点通过CheckpointCoordinator组件向每个数据源节点发送Checkpoint执行请求,此时数据源节点中的算子会将
消费数据对应的Position发送到JobManager管理节点中。- JobManager节点会存储Checkpoint元数据,用于记录每次执行Checkpoint操作过程中算子的元数据信息,例如在FlinkKafkaConsumer中会记录消费Kafka主题的偏移量,用于确认从Kafka主题中读取数据的位置。
- 在数据源节点执行完Checkpoint操作后,继续向下游节点发送CheckpointBarrier事件,下游算子通过对齐Barrier事件,触发该算子的Checkpoint操作。
当下游的map算子接收到数据源节点的Checkpoint
Barrier事件后,首先对当前算子的数据进行处理,并等待其他上游数据源节点的Barrier事件到达。该过程就是Checkpoint
Barrier对齐,目的是确保属于同一Checkpoint的数据能够全部到达当前节点。

Barrier事件的作用就是切分不同Checkpoint批次的数据。
当map算子接收到所有上游的Barrier事件后,就会触发当前算子的Checkpoint操作,并将状态数据快照到指定的外部持久化介质中,该操作主要借助状态后端存储实现。
当状态数据执行完毕后,继续将Barrier事件发送至下游的算子,进行后续算子的Checkpoint操作。
另外,在map算子中执行完Checkpoint操作后,也会向JobManager管理节点发送Ack消息,确认当前算子的Checkpoint操作正常执行。此时Checkpoint数据会存储该算子对应的状态数据,如果StateBackend为MemoryStateBackend,则主要会将状态数据存储在JobManager的堆内存中。
sink节点的ack
像map算子节点一样,当Barrier事件到达sink类型的节点后,sink节点也会进行Barrier对齐操作,确认上游节点的数据全部接入。然后对接入的数据进行处理,将结果输出到外部系统中。完成以上步骤后,sink节点会向JobManager管理节点发送Ack确认消息,确认当前Checkpoint中的状态数据都正常进行了持久化操作。(之后呢?当任务结束之后,cp会消失还是?)
2. Checkpoint创建源码解析
通过调用StreamExecutionEnvironment.enableCheckpointing(),开启Checkpoint。
此时Checkpoint的配置会被存储在StreamGraph中,然后将StreamGraph中的CheckpointConfig转换为JobCheckpointingSettings数据结构存储在JobGraph对象中,并伴随JobGraph提交到集群运行。启动JobMaster服务后,JobMaster调度和执行Checkpoint操作。
2.1. DefaultExecutionGraphBuilder.buildGraph
如下代码,通过JobGraph构建ExecutionGraph的过程中,获取JobGraph中存储的JobCheckpointingSettings配置,然后创建ExecutionGraph。
1)根据snapshotSettings配置获取triggerVertices、ackVertices以及confirmVertices节点集合,并转换为对应的ExecutionJobVertex集合。
- 其中triggerVertices集合存储了所有SourceOperator节点,这些节点通过CheckpointCoordinator主动触发Checkpoint操作。
- ackVertices和confirmVertices集合存储了StreamGraph中的全部节点,代表所有节点都需要返回Ack确认信息并确认Checkpoint执行成功。
2)创建CompletedCheckpointStore组件,用于存储Checkpoint过程中的元数据。
- 当对作业进行恢复操作时会在CompletedCheckpointStore中检索最新完成的Checkpoint元数据信息,然后基于元数据信息恢复Checkpoint中存储的状态数据。CompletedCheckpointStore有两种实现,分别为StandaloneCompletedCheckpointStore和ZooKeeperCompletedCheckpointStore。
- 在CompletedCheckpointStore中通过maxNumberOfCheckpointsToRetain参数配置以及结合checkpointIdCounter计数器保证只会存储固定数量的CompletedCheckpoint。
3)创建CheckpointStatsTracker实例
用于监控和追踪Checkpoint执行和更新的情况,包括Checkpoint执行的统计信息以及执行状况,WebUI中显示的Checkpoint监控数据主要来自CheckpointStatsTracker。4)创建StateBackend,从UserClassLoader中反序列化出应用指定的StateBackend并设定为applicationConfiguredBackend。
5)初始化用户自定义的Checkpoint Hook函数
6)最终调用executionGraph.enableCheckpointing()方法,在作业的执行和调度过程中开启Checkpoint。
// 配置状态数据checkpointing
// 从jobGraph中获取JobCheckpointingSettings
JobCheckpointingSettings snapshotSettings = jobGraph.getCheckpointingSettings();
//如果snapshotSettings不为空,则开启checkpoint功能
if (snapshotSettings != null) {List<ExecutionJobVertex> triggerVertices =idToVertex(snapshotSettings.getVerticesToTrigger(), executionGraph);List<ExecutionJobVertex> ackVertices =idToVertex(snapshotSettings.getVerticesToAcknowledge(), executionGraph);List<ExecutionJobVertex> confirmVertices =idToVertex(snapshotSettings.getVerticesToConfirm(), executionGraph);//创建CompletedCheckpointStoreCompletedCheckpointStore completedCheckpoints;CheckpointIDCounter checkpointIdCounter;try {int maxNumberOfCheckpointsToRetain = jobManagerConfig.getInteger(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS);if (maxNumberOfCheckpointsToRetain <= 0) {maxNumberOfCheckpointsToRetain = CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.defaultValue();}// 通过recoveryFactory创建CheckpointStorecompletedCheckpoints = recoveryFactory.createCheckpointStore(jobId, maxNumberOfCheckpointsToRetain, classLoader); // 通过recoveryFactory创建CheckpointIDCountercheckpointIdCounter = recoveryFactory.createCheckpointIDCounter(jobId);}catch (Exception e) {throw new JobExecutionException(jobId, "Failed to initialize high-availability checkpoint handler", e);}// 获取checkpoints最长的记录次数int historySize = jobManagerConfig.getInteger(WebOptions.CHECKPOINTS_HISTORY_SIZE);// 创建CheckpointStatsTracker实例CheckpointStatsTracker checkpointStatsTracker = new CheckpointStatsTracker(historySize,ackVertices,snapshotSettings.getCheckpointCoordinatorConfiguration(),metrics);// 从application中获取StateBackendfinal StateBackend applicationConfiguredBackend;final SerializedValue<StateBackend> serializedAppConfigured = snapshotSettings.getDefaultStateBackend();if (serializedAppConfigured == null) {applicationConfiguredBackend = null;}else {try {applicationConfiguredBackend = serializedAppConfigured.deserializeValue(classLoader);} catch (IOException | ClassNotFoundException e) {throw new JobExecutionException(jobId,"Could not deserialize application-defined state backend.", e);}}// 获取最终的rootBackendfinal StateBackend rootBackend;try {rootBackend = StateBackendLoader.fromApplicationOrConfigOrDefault(applicationConfiguredBackend, jobManagerConfig, classLoader, log);}catch (IllegalConfigurationException | IOException | DynamicCodeLoadingException e) {throw new JobExecutionException(jobId, "Could not instantiate configured state backend", e);}// 初始化用户自定义的checkpoint Hooks函数final SerializedValue<MasterTriggerRestoreHook.Factory[]> serializedHooks = snapshotSettings.getMasterHooks();final List<MasterTriggerRestoreHook<?>> hooks;// 如果serializedHooks为空,则hooks为空if (serializedHooks == null) {hooks = Collections.emptyList();}else {// 加载MasterTriggerRestoreHookfinal MasterTriggerRestoreHook.Factory[] hookFactories;try {hookFactories = serializedHooks.deserializeValue(classLoader);}catch (IOException | ClassNotFoundException e) {throw new JobExecutionException(jobId, "Could not instantiate user-defined checkpoint hooks", e);}// 设定ClassLoader为UserClassLoaderfinal Thread thread = Thread.currentThread();final ClassLoader originalClassLoader = thread.getContextClassLoader();thread.setContextClassLoader(classLoader);// 创建hooks函数try {hooks = new ArrayList<>(hookFactories.length);for (MasterTriggerRestoreHook.Factory factory : hookFactories) {hooks.add(MasterHooks.wrapHook(factory.create(), classLoader));}}// 将thread的ContextClassLoader设定为originalClassLoaderfinally {thread.setContextClassLoader(originalClassLoader);}}// 获取CheckpointCoordinatorConfigurationfinal CheckpointCoordinatorConfiguration chkConfig = snapshotSettings.getCheckpointCoordinatorConfiguration();// 开启executionGraph中的Checkpoint功能executionGraph.enableCheckpointing(chkConfig,triggerVertices,ackVertices,confirmVertices,hooks,checkpointIdCounter,completedCheckpoints,rootBackend,checkpointStatsTracker);
}
2.2. ExecutionGraph.enableCheckpointing
继续看ExecutionGraph.enableCheckpointing()方法的实现,包含如下逻辑。
- 将tasksToTrigger、tasksToWaitFor以及tasksToCommitTo三个ExecutionJobVertex集合转换为ExecutionVertex[]数组,每个ExecutionVertex代表ExecutionJobVertex中的一个SubTask节点。
- 容错管理:创建CheckpointFailureManager,用于Checkpoint执行过程中的
容错管理,包含failJob和failJobDueToTaskFailure两个处理方法。- 定时调度和执行:创建checkpointCoordinatorTimer,用于Checkpoint异步线程的定时调度和执行。
- 协调和管理作业中的Checkpoint:创建CheckpointCoordinator组件,通过CheckpointCoordinator协调和管理作业中的Checkpoint,同时收集各Task节点中Checkpoint的执行状况等信息。
- Hook:将Master Hook注册到CheckpointCoordinator中,实现用户自定义Hook代码的调用。
- 控制CheckpointCoordinator的启停:将JobStatusListener的实现类CheckpointCoordinatorDeActivator注册到JobManager中,此时系统会根据作业的运行状态控制CheckpointCoordinator的启停,当作业的状态为Running时会触发启动CheckpointCoordinator组件。
public void enableCheckpointing(CheckpointCoordinatorConfiguration chkConfig,List<ExecutionJobVertex> verticesToTrigger,List<ExecutionJobVertex> verticesToWaitFor,List<ExecutionJobVertex> verticesToCommitTo,List<MasterTriggerRestoreHook<?>> masterHooks,CheckpointIDCounter checkpointIDCounter,CompletedCheckpointStore checkpointStore,StateBackend checkpointStateBackend,CheckpointStatsTracker statsTracker) {checkState(state == JobStatus.CREATED, "Job must be in CREATED state");checkState(checkpointCoordinator == null, "checkpointing already enabled");ExecutionVertex[] tasksToTrigger = collectExecutionVertices(verticesToTrigger);ExecutionVertex[] tasksToWaitFor = collectExecutionVertices(verticesToWaitFor);ExecutionVertex[] tasksToCommitTo = collectExecutionVertices(verticesToCommitTo);checkpointStatsTracker = checkNotNull(statsTracker, "CheckpointStatsTracker");// 创建CheckpointFailureManagerCheckpointFailureManager failureManager = new CheckpointFailureManager(chkConfig.getTolerableCheckpointFailureNumber(),new CheckpointFailureManager.FailJobCallback() {@Overridepublic void failJob(Throwable cause) {getJobMasterMainThreadExecutor().execute(() -> failGlobal(cause));}@Overridepublic void failJobDueToTaskFailure(Throwable cause, ExecutionAttemptID failingTask) {getJobMasterMainThreadExecutor().execute(() -> failGlobalIfExecutionIsStillRunning(cause, failingTask));}});// 创建checkpointCoordinatorTimercheckState(checkpointCoordinatorTimer == null);checkpointCoordinatorTimer = Executors.newSingleThreadScheduledExecutor(new DispatcherThreadFactory(Thread.currentThread().getThreadGroup(), "Checkpoint Timer"));// 创建checkpointCoordinatorcheckpointCoordinator = new CheckpointCoordinator(jobInformation.getJobId(),chkConfig,tasksToTrigger,tasksToWaitFor,tasksToCommitTo,checkpointIDCounter,checkpointStore,checkpointStateBackend,ioExecutor,new ScheduledExecutorServiceAdapter(checkpointCoordinatorTimer),SharedStateRegistry.DEFAULT_FACTORY,failureManager);// 向checkpoint Coordinator中注册master Hooksfor (MasterTriggerRestoreHook<?> hook : masterHooks) {if (!checkpointCoordinator.addMasterHook(hook)) {LOG.warn("Trying to register multiple checkpoint hooks with the name: {}",hook.getIdentifier());}}//向checkpointCoordinator中设定checkpointStatsTrackercheckpointCoordinator.setCheckpointStatsTracker(checkpointStatsTracker);// 注册JobStatusListener,用于自动启动CheckpointCoordinatorif (chkConfig.getCheckpointInterval() != Long.MAX_VALUE) {registerJobStatusListener(checkpointCoordinator.createActivatorDeactivator());}this.stateBackendName = checkpointStateBackend.getClass().getSimpleName();
}
参考:《Flink设计与实现:核心原理与源码解析》–张利兵
相关文章:
【Flink状态管理五】Checkpoint的设计与实现
文章目录 1. Checkpoint的整体设计2. Checkpoint创建源码解析2.1. DefaultExecutionGraphBuilder.buildGraph2.2. ExecutionGraph.enableCheckpointing 由于系统原因导致Flink作业无法正常运行的情况非常多,且很多时候都是无法避免的。对于Flink集群来讲,…...
How to install a specific version of a package in R
How to install a specific version of a package in R 在使用R语言完成数据分析的过程中,很多时候,因为项目实际需要,我们应该指定某些库文件的安装包的版本,这个时候,我们可以基于devtools包中的函数install_version…...
SIGSEGV 段错误
SIGSEGV是在Unix/Linux系统中表示"Segmentation Fault"(分段错误)的信号。当一个进程访问未分配给它的内存、访问超出其内存边界或者访问不允许的内存区域时,就会产生SIGSEGV信号,导致进程被操作系统终止。 SIGSEGV的常…...
OpenCV 4基础篇| OpenCV简介
目录 1. 什么是OpenCV2. OpenCV的发展历程3. 为什么用OpenCV4. OpenCV应用领域5. OpenCV的功能模块5.1 基本模块5.2 扩展模块5.3 常用函数目录 1. 什么是OpenCV OpenCV(Open Source Computer Vision Library)是一个开源的计算机视觉和机器学习软件库。它…...
Vue常用内置指令,代码Demo演示和讲解
文章目录 一、v-text二、v-html三、v-bind四、v-model五、v-on六、v-show七、v-if、v-else八、v-for九、v-cloak 一、v-text 作用,更改标签的显示值使用 v-text 会读取 data中的变量值如果变量存在替换原来的值,如果不存在则报错 <!DOCTYPE html>…...
Spring设计模式之工厂模式创建Bean对象
BeanFactory和Application是Spring容器中创建和管理Bean对象的接口,但是它们的实现方式不同。 BeanFactory: BeanFactory采用延迟初始化策略,只有应用程序向容器请求特定的Bean时才创建该Bean对象。它的启动速度很快,但在程序运…...
Elasticsearch 别名(Aliases)的作用
Elasticsearch 8.4.3 别名(Aliases) 一. 介绍二. 别名的优势三. 别名的基本操作3.1 创建别名3.2 查询别名关联的索引3.3 删除别名3.4 更新别名3.5 通过别名查询数据 前言 这是我在这个网站整理的笔记,有错误的地方请指出,关注我,接…...
基于vue的个性化推荐餐饮系统Springboot
项目:基于vue的个性化推荐餐饮系统Springboot 摘要 现代信息化社会下的数据管理对活动的重要性越来越为明显,人们出门可以通过网络进行交流、信息咨询、查询等操作。网络化生活对人们通过网上购物也有了非常大的考验,通过网上进行点餐的人也…...
量子计算:数据安全难题
当今数字技术面临的最大挑战之一是安全系统和数据。为此,人们设计了复杂的算法来加密数据并通过称为对称加密的框架来保护数据。虽然这已被证明是成功的,但量子计算的进步(利用量子力学比传统计算机更快地解决复杂问题)可能会彻底…...
CCF-B类SGP‘24 4月10日截稿!速速行动!
会议之眼 快讯 第22届SGP(Eurographics Symposium on Geometry Processing)即欧洲图形学几何处理专题讨论会将于 2024 年 6月24 -日至26日在美国麻省理工学院举行!SGP是传播几何处理新研究想法和尖端成果的首要学术会议。作为该领域的重要学术盛事,SGP会…...
阿里云服务器安装MySQL、Apache、PHP
节日期间突然想要自己搭建一个个人网站,于是在阿里云申请了一个可以免费使用3个月的服务器,申请的云市场产品Wordpress平台( ALinux3 LNMP PHP7.4)。官方教程使用的CentOs系统,和我申请的ALinux3操作有一些差异&#x…...
Rust基础拾遗--并发和异步编程
Rust基础拾遗 前言1.并发1.1 分叉与合并并行1.1.1 启动与联结1.1.2 跨线程错误处理1.1.3 跨线程共享不可变数据1.1.4 rayon 1.2 通道1.2.1 发送值1.2.2 接收值1.2.3 运行管道1.2.4 通道的特性与性能1.2.5 线程安全:Send与Sync 1.3 共享可变状态 2.异步编…...
Javascript怎么输出内容?两种常见方式以及控制台介绍
javascript是一种非常重要的编程语言,在许多网页中它被广泛使用,可以实现许多交互效果和动态效果。输出是javascript中最基本的操作之一,下面将介绍两种常见的输出方式。 一、使用console.log()函数输出 console.log()函数是常用的输出函数…...
机器人路径平滑——线性插值
C++代码 //要实现平滑二维曲线的算法,你可以使用贝塞尔曲线或B样条曲线。下面是一个使用B样条曲线的C++算法的示例:#include <iostream> #include <vector> #include <fstream> #include <iomanip>...
2024-2-21-多线程基础作业
作业: 源代码: #include <myhead.h> #define MAXSIZE 64 //定义要传递的结构体类型 struct Info {const char *src;const char *dest;int len; }; int get_file_len(const char *srcfile, const char *destfile) {//以只读的形式打开源文件int sr…...
MySQL8的ONLY_FULL_GROUP_BY SQL模式兼容问题
文章目录 1. 问题描述2. 解决方法1. 修改查询2. 修改SQL模式3. 使用ANY_VALUE()函数 1. 问题描述 Cause: java.sql.SQLSyntaxErrorException: Expression #1 of SELECT list is not in GROUP BY clause and contains nonaggregated column btc-cloud.t1.id which is not funct…...
Django使用Celery异步
安装包 pip install celerypip install eventlet 1.在项目文件的根目录下创建目录结果 2. 在main.py文件中 # !/usr/bin/env python # -*-coding:utf-8 -*-""" # Author :skyTree # version :python 3.11 # Description&#…...
vue3 + ts + echart 实现柱形图表
首先封装Echart一个文件 代码如下 <script setup lang"ts"> import { ECharts, EChartsOption, init } from echarts; import { ref, watch, onMounted, onBeforeUnmount } from vue;// 定义props interface Props {width?: string;height?: string;optio…...
c语言结构体与共用体
前面我们介绍了基本的数据类型 在c语言中 有一种特殊的数据类型 由程序员来定义类型 目录 一结构体 1.1概述 1.2定义结构体 1.3 结构体变量的初始化 1.4 访问结构体的成员 1.5结构体作为函数的参数 1.6指向结构的指针 1.7结构体大小的计算 二共用体 2.1概述 2.2 访…...
vue系列--vue封装拖拽指令v-drag
1.首先将下面的代码引入代码中 export const initVDrag (Vue) > {Vue.directive("drag", (el) > {const oDiv el // 当前元素const minTop oDiv.getAttribute("drag-min-top")const ifMoveSizeArea 20oDiv.onmousedown (e) > {let target …...
告别黑盒:用Python拆解OpenBCI GUI的滤波与可视化模块(附完整代码)
从零构建Python版OpenBCI数据处理引擎:解码脑电信号处理全流程 在脑机接口开发领域,OpenBCI以其开源特性和专业级性能成为众多研究者的首选硬件平台。然而,其官方GUI虽然功能完善,却像一座封闭的城堡——我们能看到华丽的城墙&…...
Chord视频分析工具实操手册:预览区播放控制与分析结果同步验证
Chord视频分析工具实操手册:预览区播放控制与分析结果同步验证 1. 工具概览与核心价值 Chord视频时空理解工具是一款基于Qwen2.5-VL架构开发的本地智能视频分析解决方案。这个工具专门针对视频内容分析需求设计,能够在完全离线的环境下对视频进行深度理…...
DeepSeekGEO生成式引擎优化技术方案
DeepSeekGEO生成式引擎优化技术方案技术支持:拓世网络技术开发工作室1 方案背景与技术范式转移随着生成式AI成为信息分发的主入口,用户获取信息的方式已从“搜索-点击”转变为“提问-答案”。据统计,超过60%的Z世代用户更倾向于通过AI助手获取…...
3行代码实现微信级扫码:OpenCV wechat_qrcode 实战全解(c++实现)
文章目录前言一、wechat_qrcode 核心优势1.模块定位2.核心技术优势二、环境准备与模块部署1.版本要求2.环境安装3.模型下载与路径配置三、核心代码实战(c)1.单张图片解码2.摄像头实时流解码总结前言 日常开发中,传统二维码解码方案总会遇到各类难题&…...
实战指南:利用快马ai为django项目生成开箱即用的vscode python开发环境
作为一个长期使用Python开发Django项目的程序员,我深知配置开发环境是个既基础又容易踩坑的环节。最近尝试用InsCode(快马)平台生成配置方案,发现能省去大量重复劳动。下面分享我的实战经验: 项目结构规范化 平台生成的Django项目骨架严格遵循…...
数据库表的性能优化过程
问题背景做一个数据库表查看、标注与分析的工具软件。是数据库中表的信息(information_schema.tables);是的数据字典文档,存储在本地文件中;是对的额外标注信息,存储在另一个数据库中。每一条,最…...
RMBG-2.0镜像免配置部署:无需配置Python环境,开箱即用Web交互界面
RMBG-2.0镜像免配置部署:无需配置Python环境,开箱即用Web交互界面 你是不是也遇到过这样的烦恼?想用AI模型给图片换个背景,结果光是安装Python环境、配置依赖库就折腾了大半天,最后还可能因为版本冲突、CUDA不兼容等问…...
基于Wan 3D Causal VAE(Show-o2)的模型,重新完整地分析 10分钟的视频 对应多少 vison token
可以。这次我按 Show-o2 官方 432432 配置 和 Wan 3D Causal VAE 的公开时间压缩规则,把 10B token 且全部都是 vision token 的情况重新完整算一遍。下面的“大小”我统一按 未压缩 RGB 原始数据量 来算;如果你问的是实际 JPG / PNG / MP4 落盘大小,那会随压缩格式、码率和…...
复旦微FMQL平台:memorytest工程实战指南与DDR稳定性验证
1. 从Procise导出memorytest工程 第一次接触复旦微FMQL平台时,我也被各种工程文件搞得晕头转向。memorytest工程作为内存测试的基础工具,其实导出过程比想象中简单得多。在Procise界面中找到memtest选项,就像在Windows资源管理器里找文件夹一…...
自动驾驶开发必备:Vscode+Git双神器组合的隐藏技巧(含分支管理秘籍)
自动驾驶开发必备:VscodeGit双神器组合的隐藏技巧(含分支管理秘籍) 在自动驾驶开发领域,高效的代码管理和协作流程是项目成功的关键因素。随着代码库规模不断扩大,团队规模持续增长,传统的版本控制方式往往…...
