Flink checkpoint 源码分析- Checkpoint snapshot 处理流程
背景
在上一篇博客中我们分析了代码中barrier的是如何流动传递的。Flink checkpoint 源码分析- Checkpoint barrier 传递源码分析-CSDN博客
最后跟踪到了代码org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate#handleEvent
现在我们接着跟踪相应代码,观察算子接受到了barrier是如何进行下一步代码处理的。以及了解flink应对不同的消费语义(At least once, exactly once)对于checkpoint的影响是怎样的。
代码分析
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate#handleEvent 中我们主要关注对于checkpointBarrier的处理流程。
processBarrier方法实现上就可以看出,flink barrier的处理分成两种。
在这里我们需要跟踪一下barrierHandler 是如何生成的才能知道后面所要走的流程是哪一步。
通过往上追溯barrierHandler的生成,我们跟踪到方法:org.apache.flink.streaming.runtime.io.checkpointing.InputProcessorUtil#createCheckpointBarrierHandler 从代码中我们可以看到 如果是 EXACTLY_ONCE 那么生成的就SingleCheckpointBarrierHandler, 如果checkpoint 模式是AT_LEAST_ONCE, 生成对应的handler就是CheckpointBarrierTracker。 但是从代码中,EXACTLY_ONCE似乎不是简单的new 一个SingleCheckpointBarrierHandler, 而是通过一个方法来生成。因此需要进一步的观察这个方法是如何实现的。
org.apache.flink.streaming.runtime.io.checkpointing.InputProcessorUtil#createBarrierHandler
这里针对checkpoint类型做了区分,主要是分为aligned checkpoint 和 unaliged checkpoint的差异。这里可以进一步观察一下这两类checkpoint之前的差异。
对比这两个方法参数的差异,发现主要就是两处处参数有差异。subTaskCheckpointCoordinator、barrierHandlerState。这两个的差异主要体现在flink 在aligned checkpoint超时,会切换为unaligned checkpoint。这里可以先按下不表,回到最开始的处理过程。
总结一下就是如果是flink 设置了at least once是使用的是CheckpointBarrierTracker, 当flink模式为exactly once时是SingleCheckpointBarrierHandler。 当为exactly once时checkpoint 类型又可以分为是aligned checkpoint还是unaligned checkpoint。
At least once 下 barrier是如何处理的
at least once 下对于barrier的处理是在以下的方法中实现的。
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierTracker#processBarrier
public void processBarrier(CheckpointBarrier receivedBarrier, InputChannelInfo channelInfo) throws IOException {final long barrierId = receivedBarrier.getId();// fast path for single channel trackersif (totalNumberOfInputChannels == 1) {markAlignmentStartAndEnd(receivedBarrier.getTimestamp());notifyCheckpoint(receivedBarrier);return;}// general path for multiple input channelsif (LOG.isDebugEnabled()) {LOG.debug("Received barrier for checkpoint {} from channel {}", barrierId, channelInfo);}// find the checkpoint barrier in the queue of pending barriersCheckpointBarrierCount barrierCount = null;int pos = 0;for (CheckpointBarrierCount next : pendingCheckpoints) {if (next.checkpointId == barrierId) {barrierCount = next;break;}pos++;}if (barrierCount != null) {// add one to the count to that barrier and check for completionint numBarriersNew = barrierCount.incrementBarrierCount();if (numBarriersNew == totalNumberOfInputChannels) {// checkpoint can be triggered (or is aborted and all barriers have been seen)// first, remove this checkpoint and all all prior pending// checkpoints (which are now subsumed)for (int i = 0; i <= pos; i++) {pendingCheckpoints.pollFirst();}// notify the listenerif (!barrierCount.isAborted()) {if (LOG.isDebugEnabled()) {LOG.debug("Received all barriers for checkpoint {}", barrierId);}markAlignmentEnd();notifyCheckpoint(receivedBarrier);}}}else {// first barrier for that checkpoint ID// add it only if it is newer than the latest checkpoint.// if it is not newer than the latest checkpoint ID, then there cannot be a// successful checkpoint for that ID anywaysif (barrierId > latestPendingCheckpointID) {markAlignmentStart(receivedBarrier.getTimestamp());latestPendingCheckpointID = barrierId;pendingCheckpoints.addLast(new CheckpointBarrierCount(barrierId));// make sure we do not track too many checkpointsif (pendingCheckpoints.size() > MAX_CHECKPOINTS_TO_TRACK) {pendingCheckpoints.pollFirst();}}}}
如果只有一个inputchannel的情况下,在收到这一个barrier的时候,就可以做snapshot.
在这个中间会经过triggerCheckpointOnBarrier 等方法, 最后实际还是调到了org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl#checkpointState ,看到这里其实这很长的链路实际是一个循环,下一个算子会生成barrier,接着传递这个barrier。
实际情况是作业并行度不唯一,一个subtask往往是有多个inputchannel. 可以继续看看是如何处理的。
这里面当收取到第一个barrier,会将这个barrier信息存在一个队列中。
每当收到一个barrier的时候会进行计数,当收取到的是最后一个barrier的时候把之前的barrier全部清除,之后就可以通知做checkpoint snapshot, 这个流程就和之前的一个信道的checkpoint流程是一致的。
总结而言:at least 类型的checkpoint是在收到最后一个barrier的时候开始做snapshot的。
Exactly once checkpoint是如何处理的
首先看这一段的代码
@Overridepublic void processBarrier(CheckpointBarrier barrier, InputChannelInfo channelInfo) throws IOException {long barrierId = barrier.getId();LOG.debug("{}: Received barrier from channel {} @ {}.", taskName, channelInfo, barrierId);if (currentCheckpointId > barrierId|| (currentCheckpointId == barrierId && !isCheckpointPending())) {if (!barrier.getCheckpointOptions().isUnalignedCheckpoint()) {inputs[channelInfo.getGateIdx()].resumeConsumption(channelInfo);}return;}checkNewCheckpoint(barrier);checkState(currentCheckpointId == barrierId);if (numBarriersReceived++ == 0) {if (getNumOpenChannels() == 1) {markAlignmentStartAndEnd(barrier.getTimestamp());} else {markAlignmentStart(barrier.getTimestamp());}}// we must mark alignment end before calling currentState.barrierReceived which might// trigger a checkpoint with unfinished future for alignment durationif (numBarriersReceived == numOpenChannels) {if (getNumOpenChannels() > 1) {markAlignmentEnd();}}try {currentState = currentState.barrierReceived(context, channelInfo, barrier);} catch (CheckpointException e) {abortInternal(barrier.getId(), e);} catch (Exception e) {ExceptionUtils.rethrowIOException(e);}if (numBarriersReceived == numOpenChannels) {numBarriersReceived = 0;lastCancelledOrCompletedCheckpointId = currentCheckpointId;LOG.debug("{}: Received all barriers for checkpoint {}.", taskName, currentCheckpointId);resetAlignmentTimer();allBarriersReceivedFuture.complete(null);}}
这里需要关注一下currentState, 在最开始我们看了他的构造函数AlternatingWaitingForFirstBarrier, 因此可以可以看这个方法具体是现实。
这里可以看到这里会block 住收到barrier的信道,如果barrier 都收齐了,之后会检查是不是unaligned的checkpoint, 如果不是可以直接做一次checkpoint。这个checkpoint和之前的流程是一致的。
这里的下一个分支是超时转化,比如设置为30s,前30s是做aligned checkpoint, 如果30s还没有完成,就会转化为unaligned checkpoint。 当然,你如果不想有超时时间,可以直接设置为0.
如果是unaligned checkpoint, 会将channel 里面的数据也写会到远端。
这个中间会有一些状态转化,每次barrier的到达都会触发不同的状态变化。其中我们看到对于uc来说,uc的第一个barrier到达了,就会触发一次global checkpoint,所以这个时候是不会block住信道的。org.apache.flink.streaming.runtime.io.checkpointing.AlternatingWaitingForFirstBarrierUnaligned#barrierReceived
org.apache.flink.streaming.runtime.io.checkpointing.AlternatingCollectingBarriersUnaligned#barrierReceived
最后如果收到所有的barrier之后会finished checkpoint。状态恢复到原位。
总结一下:在exactly once的语义下,aligned checkpoint的做法是,收到一个barrier的时候会将对应的channel block住。当收到最后一个barrier的时候再做一次checkpoint。
unaligned的做法是,收到barrier的时候,第一步就会触发一次checkpoint, 之后会不断上传channel state, 当收到最后一个barrier则表示checkpoint结束。
在这一篇文章我们主要介绍了ssubtask级别的snapshot的,后面再进一步介绍一下整体流程,就可以结束相关介绍了。
相关文章:

Flink checkpoint 源码分析- Checkpoint snapshot 处理流程
背景 在上一篇博客中我们分析了代码中barrier的是如何流动传递的。Flink checkpoint 源码分析- Checkpoint barrier 传递源码分析-CSDN博客 最后跟踪到了代码org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate#handleEvent 现在我们接着跟踪相应…...

Leaflet.canvaslabel在Ajax异步请求时bindPopup无效的解决办法
目录 前言 一、场景重现 1、遇到问题的代码 2、问题排查 二、通过实验验证猜想 1、排查LayerGroup和FeatureGroup 2、排查Leaflet.canvaslabel.js 三、柳暗花明又一村 1、点聚类的办法 2、歪打正着 总结 前言 在上一篇博客中介绍了基于SpringBoot的全国风景区WebGIS按…...
Go 处理错误
如果你习惯了 try catch 这样的语法后,会觉得处理错误真简单,然后你再来接触 Go 的错误异常,你会发现他好复杂啊,怎么到处都是 error,到处都需要处理 error。 首先咱们需要知道 Go 语言里面有个约定,就是一…...
python读取excel数据写入mysql
概述 业务中有时会需要解析excel中的数据,按照要求处理后,写入到db中; 用python处理这个正好简便快捷 demo 没有依赖就 pip install pymysql一下 import pymysql from pymysql.converters import escape_string from openpyxl import loa…...

flutter日期选择器仅选择年、月
引入包:flutter_datetime_picker: 1.5.0 封装 import package:flutter/cupertino.dart; import package:flutter/material.dart; import package:flutter_datetime_picker/flutter_datetime_picker.dart;class ATuiDateTimePicker {static Future<DateTime> …...

素数筛详解c++
一、埃式筛法 代码 二、线性筛法(欧拉筛法) 主要的思想就是一个质数的倍数(倍数为1除外)肯定是合数,那么我们利用这个质数算出合数,然后划掉这个合数,下次就可以不用判断它是不是质数,节省了大量的时间。 …...

【Python超详细的学习笔记】Python超详细的学习笔记,涉及多个领域,是个很不错的笔记
获取笔记链接 Python超详细的学习笔记 一,逆向加密模块 1,Python中运行JS代码 1.1 解决中文乱码或者报错问题 import subprocess from functools import partial subprocess.Popen partial(subprocess.Popen, encodingutf-8) import execjs1.2 常用…...

TINA 使用教程
常用功能 分析-电气规则检查:短路,断路等分析- 直流分析 交流分析 瞬态分析 视图-分离曲线 由于输出的容性负载导致的振荡 增加5欧电阻后OK 横扫参数 添加横扫曲线的电阻,选择R3:8K-20K PWL和WAV文件的支持 示例一:…...

weblogic 任意文件上传 CVE-2018-2894
一、漏洞简介 在 Weblogic Web Service Test Page 中存在一处任意文件上传漏洞, Web Service Test Page 在"生产模式"下默认不开启,所以该漏洞有一定限制。利用该 漏洞,可以上传任意 jsp 文件,进而获取服务器权限。 二…...
我的第一个网页:武理天协
1. html代码 1.1 首页.html <!DOCTYPE html> <html lang"zh"> <head><meta charset"UTF-8"><title>武理天协</title><link rel"stylesheet" href"./style.css"><link rel"stylesh…...

机器学习笔记 KAN网络架构简述(Kolmogorov-Arnold Networks)
一、简述 在最近的研究中,出现了号称传统多层感知器 (MLP) 的突破性替代方案,重塑了人工神经网络 (ANN) 的格局。这种创新架构被称为柯尔莫哥洛夫-阿诺德网络 (KAN),它提出了一种受柯尔莫哥洛夫-阿诺德表示定理启发的函数逼近的方法。 与 MLP 不同,MLP 依赖于各个节…...

基于网络爬虫技术的网络新闻分析(二)
目录 2 系统需求分析 2.1 系统需求概述 2.2 系统需求分析 2.2.1 系统功能要求 2.2.2 系统IPO图 2.2 系统非功能性需求分析 3 系统概要设计 3.1 设计约束 3.1.1 需求约束 3.1.2 设计策略 3.1.3 技术实现 3.3 模块结构 3.3.1 模块结构图 3.3.2 系统层次图 3.3.3…...

Java--初识类和对象
前言 本篇讲解Java类和对象的入门版本。 学习目的: 1.理解什么是类和对象。 2.引入面向对象程序设计的概念 3.学会如何定义类和创建对象。 4.理解this引用。 5.了解构造方法的概念并学会使用 考虑到篇幅过长问题,作者决定分多次发布。 面向对象的引入 J…...
SpringBoot如何实现动态数据源?
在Spring Boot中实现动态数据源主要涉及到创建和管理不同的数据源,并在运行时根据需要切换。这可以通过编程方式配置Spring的AbstractRoutingDataSource来完成。下面我会逐步介绍如何实现动态数据源,并给出代码示例。 第1步:添加依赖 首先&…...

win10安装mysql8.0+汉化
一、官网安装 MySQL 1. 在mysql官网进行下载页面 2. 下滑页面,选择 MySQL community download 3.下载windows版本 4.选择第二个download 5.不用登陆,no thanks,just start my download. 6.下载 二、安装 1. 双击安装 2. 选 Full->next 3…...

全网最全的Postman接口自动化测试!
该篇文章针对已经掌握 Postman 基本用法的读者,即对接口相关概念有一定了解、已经会使用 Postman 进行模拟请求的操作。 当前环境: Window 7 - 64 Postman 版本(免费版):Chrome App v5.5.3 不同版本页面 UI 和部分…...

Spring:了解@Import注解的三种用法
一、前言 在 Spring 框架中,Import 注解用于导入配置类,使得你可以在一个配置类中引入另一个或多个配置类,从而实现配置的模块化。这对于组织大型应用程序的配置非常有用,因为它允许你将配置分散到多个类中,然后再将它…...

简要介绍三大脚本语言 Shell、Python 和 Lua
🍉 CSDN 叶庭云:https://yetingyun.blog.csdn.net/ 脚本语言是一种用于自动化操作系统任务和应用程序功能的编程语言。它们通常用于编写小到中等规模的程序,以提高任务执行的速度和效率。在众多脚本语言中,Shell、Python 和 Lua 是…...

第 397 场 LeetCode 周赛题解
A 两个字符串的排列差 模拟:遍历 s s s 记录各字符出现的位置,然后遍历 t t t 计算排列差 class Solution {public:int findPermutationDifference(string s, string t) {int n s.size();vector<int> loc(26);for (int i 0; i < n; i)loc[s…...

文件存储解决方案-阿里云OSS
文章目录 1.菜单分级显示问题1.问题引出1.苹果灯,放到节能灯下面也就是id大于1272.查看菜单,并没有出现苹果灯3.放到灯具下面id42,就可以显示 2.问题分析和解决1.判断可能出现问题的位置2.找到递归返回树形菜单数据的位置3.这里出现问题的原因…...

AI-调查研究-01-正念冥想有用吗?对健康的影响及科学指南
点一下关注吧!!!非常感谢!!持续更新!!! 🚀 AI篇持续更新中!(长期更新) 目前2025年06月05日更新到: AI炼丹日志-28 - Aud…...

【JavaEE】-- HTTP
1. HTTP是什么? HTTP(全称为"超文本传输协议")是一种应用非常广泛的应用层协议,HTTP是基于TCP协议的一种应用层协议。 应用层协议:是计算机网络协议栈中最高层的协议,它定义了运行在不同主机上…...
大语言模型如何处理长文本?常用文本分割技术详解
为什么需要文本分割? 引言:为什么需要文本分割?一、基础文本分割方法1. 按段落分割(Paragraph Splitting)2. 按句子分割(Sentence Splitting)二、高级文本分割策略3. 重叠分割(Sliding Window)4. 递归分割(Recursive Splitting)三、生产级工具推荐5. 使用LangChain的…...
MVC 数据库
MVC 数据库 引言 在软件开发领域,Model-View-Controller(MVC)是一种流行的软件架构模式,它将应用程序分为三个核心组件:模型(Model)、视图(View)和控制器(Controller)。这种模式有助于提高代码的可维护性和可扩展性。本文将深入探讨MVC架构与数据库之间的关系,以…...

(二)原型模式
原型的功能是将一个已经存在的对象作为源目标,其余对象都是通过这个源目标创建。发挥复制的作用就是原型模式的核心思想。 一、源型模式的定义 原型模式是指第二次创建对象可以通过复制已经存在的原型对象来实现,忽略对象创建过程中的其它细节。 📌 核心特点: 避免重复初…...
oracle与MySQL数据库之间数据同步的技术要点
Oracle与MySQL数据库之间的数据同步是一个涉及多个技术要点的复杂任务。由于Oracle和MySQL的架构差异,它们的数据同步要求既要保持数据的准确性和一致性,又要处理好性能问题。以下是一些主要的技术要点: 数据结构差异 数据类型差异ÿ…...

跨链模式:多链互操作架构与性能扩展方案
跨链模式:多链互操作架构与性能扩展方案 ——构建下一代区块链互联网的技术基石 一、跨链架构的核心范式演进 1. 分层协议栈:模块化解耦设计 现代跨链系统采用分层协议栈实现灵活扩展(H2Cross架构): 适配层…...
C# SqlSugar:依赖注入与仓储模式实践
C# SqlSugar:依赖注入与仓储模式实践 在 C# 的应用开发中,数据库操作是必不可少的环节。为了让数据访问层更加简洁、高效且易于维护,许多开发者会选择成熟的 ORM(对象关系映射)框架,SqlSugar 就是其中备受…...
大语言模型(LLM)中的KV缓存压缩与动态稀疏注意力机制设计
随着大语言模型(LLM)参数规模的增长,推理阶段的内存占用和计算复杂度成为核心挑战。传统注意力机制的计算复杂度随序列长度呈二次方增长,而KV缓存的内存消耗可能高达数十GB(例如Llama2-7B处理100K token时需50GB内存&a…...
Python ROS2【机器人中间件框架】 简介
销量过万TEEIS德国护膝夏天用薄款 优惠券冠生园 百花蜂蜜428g 挤压瓶纯蜂蜜巨奇严选 鞋子除臭剂360ml 多芬身体磨砂膏280g健70%-75%酒精消毒棉片湿巾1418cm 80片/袋3袋大包清洁食品用消毒 优惠券AIMORNY52朵红玫瑰永生香皂花同城配送非鲜花七夕情人节生日礼物送女友 热卖妙洁棉…...