Flink checkpoint 源码分析- Checkpoint snapshot 处理流程
背景
在上一篇博客中我们分析了代码中barrier的是如何流动传递的。Flink checkpoint 源码分析- Checkpoint barrier 传递源码分析-CSDN博客
最后跟踪到了代码org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate#handleEvent
现在我们接着跟踪相应代码,观察算子接受到了barrier是如何进行下一步代码处理的。以及了解flink应对不同的消费语义(At least once, exactly once)对于checkpoint的影响是怎样的。
代码分析
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate#handleEvent 中我们主要关注对于checkpointBarrier的处理流程。

processBarrier方法实现上就可以看出,flink barrier的处理分成两种。

在这里我们需要跟踪一下barrierHandler 是如何生成的才能知道后面所要走的流程是哪一步。
通过往上追溯barrierHandler的生成,我们跟踪到方法:org.apache.flink.streaming.runtime.io.checkpointing.InputProcessorUtil#createCheckpointBarrierHandler 从代码中我们可以看到 如果是 EXACTLY_ONCE 那么生成的就SingleCheckpointBarrierHandler, 如果checkpoint 模式是AT_LEAST_ONCE, 生成对应的handler就是CheckpointBarrierTracker。 但是从代码中,EXACTLY_ONCE似乎不是简单的new 一个SingleCheckpointBarrierHandler, 而是通过一个方法来生成。因此需要进一步的观察这个方法是如何实现的。

org.apache.flink.streaming.runtime.io.checkpointing.InputProcessorUtil#createBarrierHandler
这里针对checkpoint类型做了区分,主要是分为aligned checkpoint 和 unaliged checkpoint的差异。这里可以进一步观察一下这两类checkpoint之前的差异。

对比这两个方法参数的差异,发现主要就是两处处参数有差异。subTaskCheckpointCoordinator、barrierHandlerState。这两个的差异主要体现在flink 在aligned checkpoint超时,会切换为unaligned checkpoint。这里可以先按下不表,回到最开始的处理过程。

总结一下就是如果是flink 设置了at least once是使用的是CheckpointBarrierTracker, 当flink模式为exactly once时是SingleCheckpointBarrierHandler。 当为exactly once时checkpoint 类型又可以分为是aligned checkpoint还是unaligned checkpoint。
At least once 下 barrier是如何处理的
at least once 下对于barrier的处理是在以下的方法中实现的。
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierTracker#processBarrier
public void processBarrier(CheckpointBarrier receivedBarrier, InputChannelInfo channelInfo) throws IOException {final long barrierId = receivedBarrier.getId();// fast path for single channel trackersif (totalNumberOfInputChannels == 1) {markAlignmentStartAndEnd(receivedBarrier.getTimestamp());notifyCheckpoint(receivedBarrier);return;}// general path for multiple input channelsif (LOG.isDebugEnabled()) {LOG.debug("Received barrier for checkpoint {} from channel {}", barrierId, channelInfo);}// find the checkpoint barrier in the queue of pending barriersCheckpointBarrierCount barrierCount = null;int pos = 0;for (CheckpointBarrierCount next : pendingCheckpoints) {if (next.checkpointId == barrierId) {barrierCount = next;break;}pos++;}if (barrierCount != null) {// add one to the count to that barrier and check for completionint numBarriersNew = barrierCount.incrementBarrierCount();if (numBarriersNew == totalNumberOfInputChannels) {// checkpoint can be triggered (or is aborted and all barriers have been seen)// first, remove this checkpoint and all all prior pending// checkpoints (which are now subsumed)for (int i = 0; i <= pos; i++) {pendingCheckpoints.pollFirst();}// notify the listenerif (!barrierCount.isAborted()) {if (LOG.isDebugEnabled()) {LOG.debug("Received all barriers for checkpoint {}", barrierId);}markAlignmentEnd();notifyCheckpoint(receivedBarrier);}}}else {// first barrier for that checkpoint ID// add it only if it is newer than the latest checkpoint.// if it is not newer than the latest checkpoint ID, then there cannot be a// successful checkpoint for that ID anywaysif (barrierId > latestPendingCheckpointID) {markAlignmentStart(receivedBarrier.getTimestamp());latestPendingCheckpointID = barrierId;pendingCheckpoints.addLast(new CheckpointBarrierCount(barrierId));// make sure we do not track too many checkpointsif (pendingCheckpoints.size() > MAX_CHECKPOINTS_TO_TRACK) {pendingCheckpoints.pollFirst();}}}}
如果只有一个inputchannel的情况下,在收到这一个barrier的时候,就可以做snapshot.
在这个中间会经过triggerCheckpointOnBarrier 等方法, 最后实际还是调到了org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl#checkpointState ,看到这里其实这很长的链路实际是一个循环,下一个算子会生成barrier,接着传递这个barrier。

实际情况是作业并行度不唯一,一个subtask往往是有多个inputchannel. 可以继续看看是如何处理的。
这里面当收取到第一个barrier,会将这个barrier信息存在一个队列中。
每当收到一个barrier的时候会进行计数,当收取到的是最后一个barrier的时候把之前的barrier全部清除,之后就可以通知做checkpoint snapshot, 这个流程就和之前的一个信道的checkpoint流程是一致的。

总结而言:at least 类型的checkpoint是在收到最后一个barrier的时候开始做snapshot的。
Exactly once checkpoint是如何处理的
首先看这一段的代码
@Overridepublic void processBarrier(CheckpointBarrier barrier, InputChannelInfo channelInfo) throws IOException {long barrierId = barrier.getId();LOG.debug("{}: Received barrier from channel {} @ {}.", taskName, channelInfo, barrierId);if (currentCheckpointId > barrierId|| (currentCheckpointId == barrierId && !isCheckpointPending())) {if (!barrier.getCheckpointOptions().isUnalignedCheckpoint()) {inputs[channelInfo.getGateIdx()].resumeConsumption(channelInfo);}return;}checkNewCheckpoint(barrier);checkState(currentCheckpointId == barrierId);if (numBarriersReceived++ == 0) {if (getNumOpenChannels() == 1) {markAlignmentStartAndEnd(barrier.getTimestamp());} else {markAlignmentStart(barrier.getTimestamp());}}// we must mark alignment end before calling currentState.barrierReceived which might// trigger a checkpoint with unfinished future for alignment durationif (numBarriersReceived == numOpenChannels) {if (getNumOpenChannels() > 1) {markAlignmentEnd();}}try {currentState = currentState.barrierReceived(context, channelInfo, barrier);} catch (CheckpointException e) {abortInternal(barrier.getId(), e);} catch (Exception e) {ExceptionUtils.rethrowIOException(e);}if (numBarriersReceived == numOpenChannels) {numBarriersReceived = 0;lastCancelledOrCompletedCheckpointId = currentCheckpointId;LOG.debug("{}: Received all barriers for checkpoint {}.", taskName, currentCheckpointId);resetAlignmentTimer();allBarriersReceivedFuture.complete(null);}}
这里需要关注一下currentState, 在最开始我们看了他的构造函数AlternatingWaitingForFirstBarrier, 因此可以可以看这个方法具体是现实。

这里可以看到这里会block 住收到barrier的信道,如果barrier 都收齐了,之后会检查是不是unaligned的checkpoint, 如果不是可以直接做一次checkpoint。这个checkpoint和之前的流程是一致的。

这里的下一个分支是超时转化,比如设置为30s,前30s是做aligned checkpoint, 如果30s还没有完成,就会转化为unaligned checkpoint。 当然,你如果不想有超时时间,可以直接设置为0.

如果是unaligned checkpoint, 会将channel 里面的数据也写会到远端。

这个中间会有一些状态转化,每次barrier的到达都会触发不同的状态变化。其中我们看到对于uc来说,uc的第一个barrier到达了,就会触发一次global checkpoint,所以这个时候是不会block住信道的。org.apache.flink.streaming.runtime.io.checkpointing.AlternatingWaitingForFirstBarrierUnaligned#barrierReceived

org.apache.flink.streaming.runtime.io.checkpointing.AlternatingCollectingBarriersUnaligned#barrierReceived

最后如果收到所有的barrier之后会finished checkpoint。状态恢复到原位。

总结一下:在exactly once的语义下,aligned checkpoint的做法是,收到一个barrier的时候会将对应的channel block住。当收到最后一个barrier的时候再做一次checkpoint。
unaligned的做法是,收到barrier的时候,第一步就会触发一次checkpoint, 之后会不断上传channel state, 当收到最后一个barrier则表示checkpoint结束。
在这一篇文章我们主要介绍了ssubtask级别的snapshot的,后面再进一步介绍一下整体流程,就可以结束相关介绍了。
相关文章:
Flink checkpoint 源码分析- Checkpoint snapshot 处理流程
背景 在上一篇博客中我们分析了代码中barrier的是如何流动传递的。Flink checkpoint 源码分析- Checkpoint barrier 传递源码分析-CSDN博客 最后跟踪到了代码org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate#handleEvent 现在我们接着跟踪相应…...
Leaflet.canvaslabel在Ajax异步请求时bindPopup无效的解决办法
目录 前言 一、场景重现 1、遇到问题的代码 2、问题排查 二、通过实验验证猜想 1、排查LayerGroup和FeatureGroup 2、排查Leaflet.canvaslabel.js 三、柳暗花明又一村 1、点聚类的办法 2、歪打正着 总结 前言 在上一篇博客中介绍了基于SpringBoot的全国风景区WebGIS按…...
Go 处理错误
如果你习惯了 try catch 这样的语法后,会觉得处理错误真简单,然后你再来接触 Go 的错误异常,你会发现他好复杂啊,怎么到处都是 error,到处都需要处理 error。 首先咱们需要知道 Go 语言里面有个约定,就是一…...
python读取excel数据写入mysql
概述 业务中有时会需要解析excel中的数据,按照要求处理后,写入到db中; 用python处理这个正好简便快捷 demo 没有依赖就 pip install pymysql一下 import pymysql from pymysql.converters import escape_string from openpyxl import loa…...
flutter日期选择器仅选择年、月
引入包:flutter_datetime_picker: 1.5.0 封装 import package:flutter/cupertino.dart; import package:flutter/material.dart; import package:flutter_datetime_picker/flutter_datetime_picker.dart;class ATuiDateTimePicker {static Future<DateTime> …...
素数筛详解c++
一、埃式筛法 代码 二、线性筛法(欧拉筛法) 主要的思想就是一个质数的倍数(倍数为1除外)肯定是合数,那么我们利用这个质数算出合数,然后划掉这个合数,下次就可以不用判断它是不是质数,节省了大量的时间。 …...
【Python超详细的学习笔记】Python超详细的学习笔记,涉及多个领域,是个很不错的笔记
获取笔记链接 Python超详细的学习笔记 一,逆向加密模块 1,Python中运行JS代码 1.1 解决中文乱码或者报错问题 import subprocess from functools import partial subprocess.Popen partial(subprocess.Popen, encodingutf-8) import execjs1.2 常用…...
TINA 使用教程
常用功能 分析-电气规则检查:短路,断路等分析- 直流分析 交流分析 瞬态分析 视图-分离曲线 由于输出的容性负载导致的振荡 增加5欧电阻后OK 横扫参数 添加横扫曲线的电阻,选择R3:8K-20K PWL和WAV文件的支持 示例一:…...
weblogic 任意文件上传 CVE-2018-2894
一、漏洞简介 在 Weblogic Web Service Test Page 中存在一处任意文件上传漏洞, Web Service Test Page 在"生产模式"下默认不开启,所以该漏洞有一定限制。利用该 漏洞,可以上传任意 jsp 文件,进而获取服务器权限。 二…...
我的第一个网页:武理天协
1. html代码 1.1 首页.html <!DOCTYPE html> <html lang"zh"> <head><meta charset"UTF-8"><title>武理天协</title><link rel"stylesheet" href"./style.css"><link rel"stylesh…...
机器学习笔记 KAN网络架构简述(Kolmogorov-Arnold Networks)
一、简述 在最近的研究中,出现了号称传统多层感知器 (MLP) 的突破性替代方案,重塑了人工神经网络 (ANN) 的格局。这种创新架构被称为柯尔莫哥洛夫-阿诺德网络 (KAN),它提出了一种受柯尔莫哥洛夫-阿诺德表示定理启发的函数逼近的方法。 与 MLP 不同,MLP 依赖于各个节…...
基于网络爬虫技术的网络新闻分析(二)
目录 2 系统需求分析 2.1 系统需求概述 2.2 系统需求分析 2.2.1 系统功能要求 2.2.2 系统IPO图 2.2 系统非功能性需求分析 3 系统概要设计 3.1 设计约束 3.1.1 需求约束 3.1.2 设计策略 3.1.3 技术实现 3.3 模块结构 3.3.1 模块结构图 3.3.2 系统层次图 3.3.3…...
Java--初识类和对象
前言 本篇讲解Java类和对象的入门版本。 学习目的: 1.理解什么是类和对象。 2.引入面向对象程序设计的概念 3.学会如何定义类和创建对象。 4.理解this引用。 5.了解构造方法的概念并学会使用 考虑到篇幅过长问题,作者决定分多次发布。 面向对象的引入 J…...
SpringBoot如何实现动态数据源?
在Spring Boot中实现动态数据源主要涉及到创建和管理不同的数据源,并在运行时根据需要切换。这可以通过编程方式配置Spring的AbstractRoutingDataSource来完成。下面我会逐步介绍如何实现动态数据源,并给出代码示例。 第1步:添加依赖 首先&…...
win10安装mysql8.0+汉化
一、官网安装 MySQL 1. 在mysql官网进行下载页面 2. 下滑页面,选择 MySQL community download 3.下载windows版本 4.选择第二个download 5.不用登陆,no thanks,just start my download. 6.下载 二、安装 1. 双击安装 2. 选 Full->next 3…...
全网最全的Postman接口自动化测试!
该篇文章针对已经掌握 Postman 基本用法的读者,即对接口相关概念有一定了解、已经会使用 Postman 进行模拟请求的操作。 当前环境: Window 7 - 64 Postman 版本(免费版):Chrome App v5.5.3 不同版本页面 UI 和部分…...
Spring:了解@Import注解的三种用法
一、前言 在 Spring 框架中,Import 注解用于导入配置类,使得你可以在一个配置类中引入另一个或多个配置类,从而实现配置的模块化。这对于组织大型应用程序的配置非常有用,因为它允许你将配置分散到多个类中,然后再将它…...
简要介绍三大脚本语言 Shell、Python 和 Lua
🍉 CSDN 叶庭云:https://yetingyun.blog.csdn.net/ 脚本语言是一种用于自动化操作系统任务和应用程序功能的编程语言。它们通常用于编写小到中等规模的程序,以提高任务执行的速度和效率。在众多脚本语言中,Shell、Python 和 Lua 是…...
第 397 场 LeetCode 周赛题解
A 两个字符串的排列差 模拟:遍历 s s s 记录各字符出现的位置,然后遍历 t t t 计算排列差 class Solution {public:int findPermutationDifference(string s, string t) {int n s.size();vector<int> loc(26);for (int i 0; i < n; i)loc[s…...
文件存储解决方案-阿里云OSS
文章目录 1.菜单分级显示问题1.问题引出1.苹果灯,放到节能灯下面也就是id大于1272.查看菜单,并没有出现苹果灯3.放到灯具下面id42,就可以显示 2.问题分析和解决1.判断可能出现问题的位置2.找到递归返回树形菜单数据的位置3.这里出现问题的原因…...
51c自动驾驶~合集58
我自己的原文哦~ https://blog.51cto.com/whaosoft/13967107 #CCA-Attention 全局池化局部保留,CCA-Attention为LLM长文本建模带来突破性进展 琶洲实验室、华南理工大学联合推出关键上下文感知注意力机制(CCA-Attention),…...
【OSG学习笔记】Day 18: 碰撞检测与物理交互
物理引擎(Physics Engine) 物理引擎 是一种通过计算机模拟物理规律(如力学、碰撞、重力、流体动力学等)的软件工具或库。 它的核心目标是在虚拟环境中逼真地模拟物体的运动和交互,广泛应用于 游戏开发、动画制作、虚…...
【2025年】解决Burpsuite抓不到https包的问题
环境:windows11 burpsuite:2025.5 在抓取https网站时,burpsuite抓取不到https数据包,只显示: 解决该问题只需如下三个步骤: 1、浏览器中访问 http://burp 2、下载 CA certificate 证书 3、在设置--隐私与安全--…...
sqlserver 根据指定字符 解析拼接字符串
DECLARE LotNo NVARCHAR(50)A,B,C DECLARE xml XML ( SELECT <x> REPLACE(LotNo, ,, </x><x>) </x> ) DECLARE ErrorCode NVARCHAR(50) -- 提取 XML 中的值 SELECT value x.value(., VARCHAR(MAX))…...
反射获取方法和属性
Java反射获取方法 在Java中,反射(Reflection)是一种强大的机制,允许程序在运行时访问和操作类的内部属性和方法。通过反射,可以动态地创建对象、调用方法、改变属性值,这在很多Java框架中如Spring和Hiberna…...
解决本地部署 SmolVLM2 大语言模型运行 flash-attn 报错
出现的问题 安装 flash-attn 会一直卡在 build 那一步或者运行报错 解决办法 是因为你安装的 flash-attn 版本没有对应上,所以报错,到 https://github.com/Dao-AILab/flash-attention/releases 下载对应版本,cu、torch、cp 的版本一定要对…...
3403. 从盒子中找出字典序最大的字符串 I
3403. 从盒子中找出字典序最大的字符串 I 题目链接:3403. 从盒子中找出字典序最大的字符串 I 代码如下: class Solution { public:string answerString(string word, int numFriends) {if (numFriends 1) {return word;}string res;for (int i 0;i &…...
JavaScript基础-API 和 Web API
在学习JavaScript的过程中,理解API(应用程序接口)和Web API的概念及其应用是非常重要的。这些工具极大地扩展了JavaScript的功能,使得开发者能够创建出功能丰富、交互性强的Web应用程序。本文将深入探讨JavaScript中的API与Web AP…...
【 java 虚拟机知识 第一篇 】
目录 1.内存模型 1.1.JVM内存模型的介绍 1.2.堆和栈的区别 1.3.栈的存储细节 1.4.堆的部分 1.5.程序计数器的作用 1.6.方法区的内容 1.7.字符串池 1.8.引用类型 1.9.内存泄漏与内存溢出 1.10.会出现内存溢出的结构 1.内存模型 1.1.JVM内存模型的介绍 内存模型主要分…...
基于PHP的连锁酒店管理系统
有需要请加文章底部Q哦 可远程调试 基于PHP的连锁酒店管理系统 一 介绍 连锁酒店管理系统基于原生PHP开发,数据库mysql,前端bootstrap。系统角色分为用户和管理员。 技术栈 phpmysqlbootstrapphpstudyvscode 二 功能 用户 1 注册/登录/注销 2 个人中…...
