Flink源码之Checkpoint执行流程

Checkpoint完整流程如上图所示:
- JobMaster的CheckpointCoordinator向所有SourceTask发送RPC触发一次CheckPoint
- SourceTask向下游广播CheckpointBarrier
- SouceTask完成状态快照后向JobMaster发送快照结果
- 非SouceTask在Barrier对齐后完成状态快照向JobMaster发送快照结果
- JobMaster保存SubTask快照结果
- JobMaster收到所有SubTask快照结果后保存快照信息,想SubTask通知Checkpoint完成
以下对整个流程具体说明。
CheckpointCoordinator
JobMaster将JobGraph转换为ExecutionGraph时,如果开启Checkpoint,会为ExecutionGraph生成一个CheckpointCoordinator
DefaultExecutionGraphBuilder.buildGraph//在此会将JobGraph转换为ExecutionGraphDefaultExecutionGraph::newDefaultExecutionGraph::attachJobGraph //创建ExecutionJobVertexDefaultExecutionTopology.fromExecutionGraph //创建ExecutionTopologyDefaultExecutionGraph::enableCheckpointing //创建CheckpointCoordinatorDefaultExecutionGraph::createCheckpointPlanCalculator//创建DefaultCheckpointPlanCalculatorCheckpointCoordinator::new
CheckpointCoordinator封装了StateBackend和CheckpointStorage
StateBackend负责管理状态:
- HashMapStateBackend //内存
- EmbeddedRocksDBStateBackend //内存+磁盘
CheckpointStorage则是负责存储StateBackend管理的状态:
- JobManagerCheckpointStorage //checkpoint state放入JobManager内存
- FileSystemCheckpointStorage //配置state.checkpoints.dir时
在为StreamTask构造SubtaskCheckpointCoordinatorImpl时会调用:
CheckpointStorage::createCheckpointStorage
创建CheckpointStorageAccess用于执行Checkpoint时解析状态存储位置
- MemoryBackendCheckpointStorageAccess //对应JobManagerCheckpointStorage
- FsCheckpointStorageAccess //对应FileSystemCheckpointStorage
CheckpointCoordinator在执行状态快照时会调用
CheckpointStorageAccess::resolveCheckpointStorageLocation
生成CheckpointStreamFactory用于生成读写状态数据流
- MemCheckpointStreamFactory //对应JobManagerCheckpointStorage
- FsCheckpointStreamFactory //对应FileSystemCheckpointStorage
Checkpoint触发流程
JobMaster状态转换为running后,通过CheckpointCoordinator向SourceTask发送TriggerCheckpoint
JobMaster端触发流程
JobMaster::start //RPCServer启动
JobMaster::onStart
JobMaster::startJobExecution
JobMaster::startJobMasterServices //获取RM地址后与RM建立连接
JobMaster::startScheduling
SchedulerBase::startScheduling
DefaultScheduler::startSchedulingInternal
SchedulerBase::transitionToRunningDefaultExecutionGraph::transitionToRunning //调用ExecutionGraph监听器通知状态变化CheckpointCoordinatorDeActivator::jobStatusChanges//触发checkpointCheckpointCoordinator::startCheckpointSchedulerCheckpointCoordinator::scheduleTriggerWithDelay //定时不断触发CheckpointCheckpointCoordinator::triggerCheckpointCheckpointCoordinator::startTriggeringCheckpointDefaultCheckpointPlanCalculator::calculateCheckpointPlan//Plan中会隔离出SourceTask作为作为Trigger Checkpoint的入口CheckpointCoordinator::createPendingCheckpointCheckpointCoordinator::triggerCheckpointRequestCheckpointCoordinator::triggerTasks Execution::triggerCheckpoint //向每个SourceTask发送TriggerCheckpoint请求Execution::triggerCheckpointHelperTaskManagerGateway::triggerCheckpoint//向TaskExecutor发RPC
StreamTask端执行流程
SourceTask
SourceTask由JobMaster RPC直接触发,执行时先广播CheckpointBarrier,然后对状态执行异步快照
TaskExecutor::triggerCheckpoint
Task::triggerCheckpointBarrier
AbstractInvokable::triggerCheckpointAsync
SourceStreamTask::triggerCheckpointAsync
StreamTask::triggerCheckpointAsync
StreamTask::triggerCheckpointAsyncInMailbox
StreamTask::performCheckpoint
SubtaskCheckpointCoordinatorImpl::checkpointStateOperatorChain.broadcastEvent //广播CheckpointBarrier
CheckpointStorage::createCheckpointStorage//为JobId创建CheckpointStorageAccess
SubtaskCheckpointCoordinatorImpl::takeSnapshotSync
CheckpointStorageWorkerView::resolveCheckpointStorageLocation//CheckpointStorageAccess创建 CheckpointStreamFactoryOperatorChain::snapshotState //对每个OperatorRegularOperatorChain::buildOperatorSnapshotFuturesRegularOperatorChain::checkpointStreamOperatorAbstractStreamOperator::snapshotStateStreamOperatorStateHandler::snapshotState//调用Operator/Keyed Backend的snapshotStateSnapshotContextSynchronousImpl::newAbstractUdfStreamOperator::snapshotState //调用UDF中snapshotState方法,一般用于更新OperatorStateDefaultOperatorStateBackend::snapshotSnapshotStrategyRunner::snapshotDefaultOperatorStateBackendSnapshotStrategy::syncPrepareResources//深copy operator state,便于后续进行异步快照DefaultOperatorStateBackendSnapshotStrategy::asyncSnapshot//异步快照 CheckpointStateOutputStream::closeAndGetHandleOperatorStreamStateHandle::new //包装元信息及数据StreamStateHandleHeapKeyedStateBackend::snapshotSnapshotStrategyRunner::snapshotHeapSnapshotStrategy::syncPrepareResourcesHeapSnapshotStrategy::asyncSnapshot //采用COWSateTable异步快照CheckpointStateOutputStream::closeAndGetHandleKeyGroupsStateHandle::new //包装KeyGroup及数据StreamStateHandle
SubtaskCheckpointCoordinatorImpl::finishAndReportAsync //向JobMaster发送checkpoint的结果AsyncCheckpointRunnable::new AsyncCheckpointRunnable::runAsyncCheckpointRunnable::finalizeNonFinishedSnapshotsOperatorSnapshotFinalizer::new //等待TaskSnapshot状态信息序列化完成AsyncCheckpointRunnable::reportCompletedSnapshotStatesTaskStateManagerImpl::reportTaskStateSnapshotsRpcCheckpointResponder::acknowledgeCheckpoint//向JobMaster发送Ack,带上State信息
非SourceTask
在StreamTask启动后调用StreamTask::processInput不断读取数据进行处理, 非SourceTask在收到上游的CheckpointBarrier对齐后触发Checkpoint,
StreamTask::processInput
StreamOneInputProcessor::processInput
StreamTaskNetworkInput::emitNext(StreamTaskNetworkOutput)
AbstractStreamTaskNetworkInput::emitNext //循环不断从buffer中读取StreamElement
处理CheckpointedInputGate::pollNextCheckpointedInputGate::handleEventSingleCheckpointBarrierHandler::processBarrierSingleCheckpointBarrierHandler::markCheckpointAlignedAndTransformStateWaitingForFirstBarrier::barrierReceivedAbstractAlignedBarrierHandlerState::barrierReceivedSingleCheckpointBarrierHandler.ControllerImpl::allBarriersReceived//判断对齐AbstractAlignedBarrierHandlerState::triggerGlobalCheckpointSingleCheckpointBarrierHandler.ControllerImpl::triggerGlobalCheckpointSingleCheckpointBarrierHandler::triggerCheckpointCheckpointBarrierHandler::notifyCheckpoint //触发StreamTask CheckpointStreamTask::triggerCheckpointOnBarrierStreamTask::performCheckpoint //后续调用过程与SourceTask一样SubtaskCheckpointCoordinatorImpl::checkpointState
根据调用栈看出,非SourceStreamTask执行Checkpoint只是触发时机不同,SourceTask由JobMaster RPC定时不断触发,非SourceTask则是在上游的CheckpointBarrier对齐后触发Checkpoint,最终执行逻辑都是将当前算子的信息写入CheckpointStorage后向JobMaster发送确认信息。
StreamTask向JobMaster ACK信息中包含状态元信息及StreamStateHandle,根据状态存储位置分为:
- ByteStreamStateHandle //对应JobManagerCheckpointStorage,将状态序列化为byte[]发送给JobMaster
- FileStateHandle //对应FileSystemCheckpointStorage,将状态写入文件系统后将文件路径发送给JobMaster
JobMaster端完成流程
JobMaster收到StreamTask的acknowledgeCheckpoint后:
JobMaster::acknowledgeCheckpoint
SchedulerBase::acknowledgeCheckpoint
ExecutionGraphHandler::acknowledgeCheckpoint
CheckpointCoordinator::receiveAcknowledgeMessagePendingCheckpoint::acknowledgeTask //某一个Task的确认PendingCheckpoint::updateOperatorState//更新SubTask状态信息CheckpointCoordinator::completePendingCheckpoint//所有Task Ack后PendingCheckpoint::finalizeCheckpointCheckpoints.storeCheckpointMetadata//保存CheckpointMetadataCompletedCheckpoint::newCheckpointCoordinator::sendAcknowledgeMessages//向Task通知Checkpoint完成消息ExecutionVertex::notifyCheckpointCompleteTaskManagerGateway.notifyCheckpointComplete
JobMaster收到所有StreamTask的Checkpoint状态信息后,标志一次Checkpoint完成,这时会通知StreamTask CheckPoint完成消息,便于SubTask监听Checkpoint完成后做后续动作。
相关文章:
Flink源码之Checkpoint执行流程
Checkpoint完整流程如上图所示: JobMaster的CheckpointCoordinator向所有SourceTask发送RPC触发一次CheckPointSourceTask向下游广播CheckpointBarrierSouceTask完成状态快照后向JobMaster发送快照结果非SouceTask在Barrier对齐后完成状态快照向JobMaster发送快照结…...
【工具使用】Git的使用
dev代表开发版 1. git clone 命令 通过 git add <name> 对文件进行跟踪,把<name>加入到暂存区 git commit -m XXXXXXX 提交修改并补充XXXXX作为注释 “暂存”状态:出现了一些修改,但是还没有提交 对于Java来说,.cl…...
无涯教程-PHP Installation on Windows NT/2000/XP with IIS函数
在Windows Server上运行IIS的PHP的安装比在Unix上简单得多,因为它涉及的是预编译的二进制文件而不是源代码。 如果您打算在Windows上安装PHP,那么这是先决条件列表- 运行中的PHP支持的Web服务器。一个正确安装的PHP支持的数据库,如MySQL或Oracle等。(如果您打算使用的话) PHP…...
EureKa快速入门
EureKa快速入门 远程调用的问题 多个服务有多个端口,这样的话服务有多个,硬编码不太适合 eureKa的作用 将service的所有服务的端口全部记录下来 想要的话 直接从注册中心查询对于所有服务 每隔一段时间需要想eureKa发送请求 保证服务还存活 动手实践 …...
Sectigo EV代码签名申请步骤
一、EV代码签名申请前提 1、单位成立时间不低于:3个月 2、单位工商及企查查可查 3、单位经营正常 4、注册地址真实存在,禁止使用集中注册地址 5、企查查登记电话和邮箱,确定查询结果的电话可以接听、邮箱可以接收邮件,如果信…...
生信学院|08月25日《SOLIDWORKS PDM帮助企业对设计数据版本的管理应用》
课程主题:SOLIDWORKS PDM帮助企业对设计数据版本的管理应用 课程时间:2023年08月25日 14:00-14:30 主讲人:车立洋 生信科技 PDM专家 1、图纸&文档的版本管理对于企业的重要性 2、SolidWorks PDM对图纸&文档版本的管理 3、SolidW…...
vue页面转pdf后分页时文字被横向割裂
效果 预期效果 //避免分页被截断async outPutPdfFn (id, title) {const _t this;const A4_WIDTH 592.28;const A4_HEIGHT 841.89;// dom的id。let target document.getElementById(pdf);let pageHeight target.scrollWidth / A4_WIDTH * A4_HEIGHT;// 获取分割dom…...
数据结构——队列(C语言)
需求:无 本篇文章将解决一下几个问题: 队列是什么?如何实现一个队列?什么场景下会用队列? 队列的概念: 队列:一种只允许一端进行插入数据操作,在另一端进行删除操作的特殊线性表。…...
WGS84地球坐标系,GCJ02火星坐标系,BD09百度坐标系简介与转换 资料收集
野火 ATGM332D简介 高性能、低功耗 GPS、北斗双模定位模块 STM32 GPS定位_为了维护世界和平_的博客-CSDN博客 秉火多功能调试助手上位机开源!共六款软件,学到你吐... , - 电脑上位机 - 野火电子论坛 - Powered by Discuz! https://www.firebbs.cn/for…...
【面试题】前端面试复习6---性能优化
前端面试题库 (面试必备) 推荐:★★★★★ 地址:前端面试题库 性能优化 一、性能指标 要在 Chrome 中查看性能指标,可以按照以下步骤操作: 打开 Chrome 浏览器,并访问你想要测试…...
隧道HTTP具备的条件
作为一名专业的爬虫代理供应商,我们都知道使用代理是保证爬虫的高效性和稳定性的重要手段之一。而隧道代理则是近年来备受推崇的一种代理形式,它通过将请求通过隧道传输,可以有效地隐藏爬虫的真实IP地址,提高爬虫的反爬能力。 在…...
部署FTP服务(二)
目录 2.访问FTP服务 1.使用ftp命令行工具 2.使用浏览器 3.使用FileZilla Client 3.Serv-U 1.定义新域 2.创建用户 4. windowsserver搭建ftp服务器 一、FTP工具 二、Windows资源管理器 三、IE浏览器访问 2.访问FTP服务 下面在一台装有Windows10操作系统的计算机中&#…...
缓存的变更(JVM本地缓存->Redis分布式缓存)
在一次需求修改中,下游的服务附加提出了,针对某个业务数据缓存的生效时间的要求 原JVM设计方案: 采用jvm本地缓存机制,定时任务30秒刷新一次 现在redis方案: 因为很多地方使用了这个业务数据缓存,使用方…...
springMVC Unix 文件参数变更漏洞修复
错误信息如下: 解决方案: 原因:未对用户输入正确执行危险字符清理 未检查用户输入中是否包含“…”(两个点)字符串,比如 url 为 /login?action…/webapps/RTJEKSWTN26635&typerandomCode cookie为Coo…...
【LeetCode】494.目标和
题目 给你一个非负整数数组 nums 和一个整数 target 。 向数组中的每个整数前添加 或 - ,然后串联起所有整数,可以构造一个 表达式 : 例如,nums [2, 1] ,可以在 2 之前添加 ,在 1 之前添加 - &#x…...
KaiwuDB 荣获哈佛商业评论 2023“高能韧性团队奖”
8月18日,《哈佛商业评论》中文版携手 FESCO 成功举办“第九届人才经济论坛”暨“2022-2023 高能团队奖颁奖典礼”。论坛秉承前沿的全球视野及权威的管理理念,发掘并展示本土企业组织管理的最佳实践,并重磅揭晓第二届“高能团队奖”评选结果。…...
删除ubuntu开始菜单中的图标
背景 本来是很好看干净的界面 更新谷歌浏览器后出现了Gmail,幻灯片,谷歌硬盘等跟谷歌相关的乱七八糟东西搞得界面就很丑 解决问题 删掉那个图标 输入命令 sudo nautilus /usr/share/applicationssudo nautilus ~/.local/share/applications可以…...
信息系统项目管理基础知识学习笔记 - IT 治理基础 - IT治理的驱动因素
信息系统项目管理基础知识学习笔记 - IT 治理基础 - IT治理的驱动因素 IT治理的驱动因素组织的IT战略驱动组织开展高质量IT治理因素IT治理的内涵IT 治理体系信息系统项目管理基础知识学习笔记 - IT 治理基础 - IT治理的驱动因素 IT治理的驱动因素 组织信息系统建设和运行需要…...
8月21-22日上课内容 第一章 MySQL数据库初始
本章结构 数据库的基本概念 概述(总览) 结构: 数据 表 数据库 数据库管理系统 数据库系统原理 数据 (Data) 描述事物的符号记录 包括数字,文字、图形、图像、声音、档案记录等以“记录”形式按统一的格式进行存储表 将不同…...
等级查询发布助手
考试成绩的发布是学校教学工作中的一项重要任务,传统的手工录入、统计和发布成绩的方式既耗时又容易出错。为了提高老师的工作效率和准确性,推荐老师们试一试易查分考试等级发布系统。 易查分是一个查询/发布发布平台 1. 快速高效:老师只需将…...
AC鸭的温度墙
题目描述AC鸭在实验室里看到了一面很长的温度墙,这面墙从左到右一共有 n 个位置。一开始,每个位置的温度都是 0。接下来 AC鸭会进行 m 次加热操作。每次操作给出 l,r,v表示把第l个位置到第r个位置的温度都加上上v。所有操作结束后,AC鸭想知道…...
【Gemini赋能Google Maps路线优化实战指南】:20年导航算法专家亲授5大降本增效核心策略
更多请点击: https://intelliparadigm.com 第一章:Gemini赋能Google Maps路线优化的底层逻辑与演进脉络 Google Maps 路线规划正经历从传统图算法向多模态智能推理的范式迁移。Gemini 模型并非简单替代 Dijkstra 或 A*,而是作为实时决策中枢…...
三菱FX3U串口通讯无协议编程与RS指令实现Modbus协议
引言 在工业自动化系统中,PLC与上位机之间的通讯至关重要。Modbus RTU协议 作为一种广泛应用的通讯协议,通常用于不同设备之间的数据交换。 对于三菱 FX3U系列PLC 来说,虽然它没有直接内置完整的Modbus RTU从站功能(早期型号需通过…...
BT33F双基二极管:从负阻特性到张弛振荡的实战测试
1. BT33F双基二极管初探:认识这个神奇的小东西 第一次见到BT33F双基二极管时,我完全被它小巧的外形迷惑了——这个看起来和普通二极管差不多的器件,居然能产生如此有趣的负阻特性。记得当时实验室的老师傅神秘兮兮地跟我说:"…...
不止于校验:用HashMyFiles命令行玩转文件批量管理与VirusTotal联动
从本地到云端:HashMyFiles命令行与VirusTotal联动的安全自动化实践 在数字化时代,文件完整性校验和安全检测已成为IT运维、安全分析乃至日常开发中不可或缺的环节。传统图形界面工具虽然直观,但在处理大批量文件或需要自动化集成的场景下显得…...
VisualCppRedist AIO:一站式高效解决Windows运行库依赖问题
VisualCppRedist AIO:一站式高效解决Windows运行库依赖问题 【免费下载链接】vcredist AIO Repack for latest Microsoft Visual C Redistributable Runtimes 项目地址: https://gitcode.com/gh_mirrors/vc/vcredist VisualCppRedist AIO是一个专门为Windows…...
Icarus Verilog终极指南:3分钟掌握开源Verilog仿真工具
Icarus Verilog终极指南:3分钟掌握开源Verilog仿真工具 【免费下载链接】iverilog Icarus Verilog 项目地址: https://gitcode.com/gh_mirrors/iv/iverilog 你是否正在寻找一个完全免费、跨平台的Verilog仿真解决方案?Icarus Verilog(…...
光刻热点修复技术:提升芯片良率的关键方法
1. 光刻热点修复技术概述在45nm及更先进工艺节点下,光刻热点(Litho hotspot)已成为制约集成电路良率提升的关键因素之一。这类问题区域在传统设计规则检查(DRC)中往往难以被完全捕捉,因为它们本质上是由复杂…...
从入门到精通:泉盛UV-K5/K6开源固件的无线通信革命
从入门到精通:泉盛UV-K5/K6开源固件的无线通信革命 【免费下载链接】uv-k5-firmware-custom 全功能泉盛UV-K5/K6固件 Quansheng UV-K5/K6 Firmware 项目地址: https://gitcode.com/gh_mirrors/uvk5f/uv-k5-firmware-custom 想象一下,你手中的百元…...
League Akari终极指南:英雄联盟玩家的智能游戏助手完整教程
League Akari终极指南:英雄联盟玩家的智能游戏助手完整教程 【免费下载链接】League-Toolkit An all-in-one toolkit for LeagueClient. Gathering power 🚀. 项目地址: https://gitcode.com/gh_mirrors/le/League-Toolkit 还在为英雄联盟的繁琐操…...
