Flink checkpoint 源码分析- Checkpoint snapshot 处理流程
背景
在上一篇博客中我们分析了代码中barrier的是如何流动传递的。Flink checkpoint 源码分析- Checkpoint barrier 传递源码分析-CSDN博客
最后跟踪到了代码org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate#handleEvent
现在我们接着跟踪相应代码,观察算子接受到了barrier是如何进行下一步代码处理的。以及了解flink应对不同的消费语义(At least once, exactly once)对于checkpoint的影响是怎样的。
代码分析
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate#handleEvent 中我们主要关注对于checkpointBarrier的处理流程。
processBarrier方法实现上就可以看出,flink barrier的处理分成两种。
在这里我们需要跟踪一下barrierHandler 是如何生成的才能知道后面所要走的流程是哪一步。
通过往上追溯barrierHandler的生成,我们跟踪到方法:org.apache.flink.streaming.runtime.io.checkpointing.InputProcessorUtil#createCheckpointBarrierHandler 从代码中我们可以看到 如果是 EXACTLY_ONCE 那么生成的就SingleCheckpointBarrierHandler, 如果checkpoint 模式是AT_LEAST_ONCE, 生成对应的handler就是CheckpointBarrierTracker。 但是从代码中,EXACTLY_ONCE似乎不是简单的new 一个SingleCheckpointBarrierHandler, 而是通过一个方法来生成。因此需要进一步的观察这个方法是如何实现的。
org.apache.flink.streaming.runtime.io.checkpointing.InputProcessorUtil#createBarrierHandler
这里针对checkpoint类型做了区分,主要是分为aligned checkpoint 和 unaliged checkpoint的差异。这里可以进一步观察一下这两类checkpoint之前的差异。
对比这两个方法参数的差异,发现主要就是两处处参数有差异。subTaskCheckpointCoordinator、barrierHandlerState。这两个的差异主要体现在flink 在aligned checkpoint超时,会切换为unaligned checkpoint。这里可以先按下不表,回到最开始的处理过程。
总结一下就是如果是flink 设置了at least once是使用的是CheckpointBarrierTracker, 当flink模式为exactly once时是SingleCheckpointBarrierHandler。 当为exactly once时checkpoint 类型又可以分为是aligned checkpoint还是unaligned checkpoint。
At least once 下 barrier是如何处理的
at least once 下对于barrier的处理是在以下的方法中实现的。
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierTracker#processBarrier
public void processBarrier(CheckpointBarrier receivedBarrier, InputChannelInfo channelInfo) throws IOException {final long barrierId = receivedBarrier.getId();// fast path for single channel trackersif (totalNumberOfInputChannels == 1) {markAlignmentStartAndEnd(receivedBarrier.getTimestamp());notifyCheckpoint(receivedBarrier);return;}// general path for multiple input channelsif (LOG.isDebugEnabled()) {LOG.debug("Received barrier for checkpoint {} from channel {}", barrierId, channelInfo);}// find the checkpoint barrier in the queue of pending barriersCheckpointBarrierCount barrierCount = null;int pos = 0;for (CheckpointBarrierCount next : pendingCheckpoints) {if (next.checkpointId == barrierId) {barrierCount = next;break;}pos++;}if (barrierCount != null) {// add one to the count to that barrier and check for completionint numBarriersNew = barrierCount.incrementBarrierCount();if (numBarriersNew == totalNumberOfInputChannels) {// checkpoint can be triggered (or is aborted and all barriers have been seen)// first, remove this checkpoint and all all prior pending// checkpoints (which are now subsumed)for (int i = 0; i <= pos; i++) {pendingCheckpoints.pollFirst();}// notify the listenerif (!barrierCount.isAborted()) {if (LOG.isDebugEnabled()) {LOG.debug("Received all barriers for checkpoint {}", barrierId);}markAlignmentEnd();notifyCheckpoint(receivedBarrier);}}}else {// first barrier for that checkpoint ID// add it only if it is newer than the latest checkpoint.// if it is not newer than the latest checkpoint ID, then there cannot be a// successful checkpoint for that ID anywaysif (barrierId > latestPendingCheckpointID) {markAlignmentStart(receivedBarrier.getTimestamp());latestPendingCheckpointID = barrierId;pendingCheckpoints.addLast(new CheckpointBarrierCount(barrierId));// make sure we do not track too many checkpointsif (pendingCheckpoints.size() > MAX_CHECKPOINTS_TO_TRACK) {pendingCheckpoints.pollFirst();}}}}
如果只有一个inputchannel的情况下,在收到这一个barrier的时候,就可以做snapshot.
在这个中间会经过triggerCheckpointOnBarrier 等方法, 最后实际还是调到了org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl#checkpointState ,看到这里其实这很长的链路实际是一个循环,下一个算子会生成barrier,接着传递这个barrier。
实际情况是作业并行度不唯一,一个subtask往往是有多个inputchannel. 可以继续看看是如何处理的。
这里面当收取到第一个barrier,会将这个barrier信息存在一个队列中。
每当收到一个barrier的时候会进行计数,当收取到的是最后一个barrier的时候把之前的barrier全部清除,之后就可以通知做checkpoint snapshot, 这个流程就和之前的一个信道的checkpoint流程是一致的。
总结而言:at least 类型的checkpoint是在收到最后一个barrier的时候开始做snapshot的。
Exactly once checkpoint是如何处理的
首先看这一段的代码
@Overridepublic void processBarrier(CheckpointBarrier barrier, InputChannelInfo channelInfo) throws IOException {long barrierId = barrier.getId();LOG.debug("{}: Received barrier from channel {} @ {}.", taskName, channelInfo, barrierId);if (currentCheckpointId > barrierId|| (currentCheckpointId == barrierId && !isCheckpointPending())) {if (!barrier.getCheckpointOptions().isUnalignedCheckpoint()) {inputs[channelInfo.getGateIdx()].resumeConsumption(channelInfo);}return;}checkNewCheckpoint(barrier);checkState(currentCheckpointId == barrierId);if (numBarriersReceived++ == 0) {if (getNumOpenChannels() == 1) {markAlignmentStartAndEnd(barrier.getTimestamp());} else {markAlignmentStart(barrier.getTimestamp());}}// we must mark alignment end before calling currentState.barrierReceived which might// trigger a checkpoint with unfinished future for alignment durationif (numBarriersReceived == numOpenChannels) {if (getNumOpenChannels() > 1) {markAlignmentEnd();}}try {currentState = currentState.barrierReceived(context, channelInfo, barrier);} catch (CheckpointException e) {abortInternal(barrier.getId(), e);} catch (Exception e) {ExceptionUtils.rethrowIOException(e);}if (numBarriersReceived == numOpenChannels) {numBarriersReceived = 0;lastCancelledOrCompletedCheckpointId = currentCheckpointId;LOG.debug("{}: Received all barriers for checkpoint {}.", taskName, currentCheckpointId);resetAlignmentTimer();allBarriersReceivedFuture.complete(null);}}
这里需要关注一下currentState, 在最开始我们看了他的构造函数AlternatingWaitingForFirstBarrier, 因此可以可以看这个方法具体是现实。
这里可以看到这里会block 住收到barrier的信道,如果barrier 都收齐了,之后会检查是不是unaligned的checkpoint, 如果不是可以直接做一次checkpoint。这个checkpoint和之前的流程是一致的。
这里的下一个分支是超时转化,比如设置为30s,前30s是做aligned checkpoint, 如果30s还没有完成,就会转化为unaligned checkpoint。 当然,你如果不想有超时时间,可以直接设置为0.
如果是unaligned checkpoint, 会将channel 里面的数据也写会到远端。
这个中间会有一些状态转化,每次barrier的到达都会触发不同的状态变化。其中我们看到对于uc来说,uc的第一个barrier到达了,就会触发一次global checkpoint,所以这个时候是不会block住信道的。org.apache.flink.streaming.runtime.io.checkpointing.AlternatingWaitingForFirstBarrierUnaligned#barrierReceived
org.apache.flink.streaming.runtime.io.checkpointing.AlternatingCollectingBarriersUnaligned#barrierReceived
最后如果收到所有的barrier之后会finished checkpoint。状态恢复到原位。
总结一下:在exactly once的语义下,aligned checkpoint的做法是,收到一个barrier的时候会将对应的channel block住。当收到最后一个barrier的时候再做一次checkpoint。
unaligned的做法是,收到barrier的时候,第一步就会触发一次checkpoint, 之后会不断上传channel state, 当收到最后一个barrier则表示checkpoint结束。
在这一篇文章我们主要介绍了ssubtask级别的snapshot的,后面再进一步介绍一下整体流程,就可以结束相关介绍了。
相关文章:

Flink checkpoint 源码分析- Checkpoint snapshot 处理流程
背景 在上一篇博客中我们分析了代码中barrier的是如何流动传递的。Flink checkpoint 源码分析- Checkpoint barrier 传递源码分析-CSDN博客 最后跟踪到了代码org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate#handleEvent 现在我们接着跟踪相应…...

Leaflet.canvaslabel在Ajax异步请求时bindPopup无效的解决办法
目录 前言 一、场景重现 1、遇到问题的代码 2、问题排查 二、通过实验验证猜想 1、排查LayerGroup和FeatureGroup 2、排查Leaflet.canvaslabel.js 三、柳暗花明又一村 1、点聚类的办法 2、歪打正着 总结 前言 在上一篇博客中介绍了基于SpringBoot的全国风景区WebGIS按…...

Go 处理错误
如果你习惯了 try catch 这样的语法后,会觉得处理错误真简单,然后你再来接触 Go 的错误异常,你会发现他好复杂啊,怎么到处都是 error,到处都需要处理 error。 首先咱们需要知道 Go 语言里面有个约定,就是一…...

python读取excel数据写入mysql
概述 业务中有时会需要解析excel中的数据,按照要求处理后,写入到db中; 用python处理这个正好简便快捷 demo 没有依赖就 pip install pymysql一下 import pymysql from pymysql.converters import escape_string from openpyxl import loa…...

flutter日期选择器仅选择年、月
引入包:flutter_datetime_picker: 1.5.0 封装 import package:flutter/cupertino.dart; import package:flutter/material.dart; import package:flutter_datetime_picker/flutter_datetime_picker.dart;class ATuiDateTimePicker {static Future<DateTime> …...

素数筛详解c++
一、埃式筛法 代码 二、线性筛法(欧拉筛法) 主要的思想就是一个质数的倍数(倍数为1除外)肯定是合数,那么我们利用这个质数算出合数,然后划掉这个合数,下次就可以不用判断它是不是质数,节省了大量的时间。 …...

【Python超详细的学习笔记】Python超详细的学习笔记,涉及多个领域,是个很不错的笔记
获取笔记链接 Python超详细的学习笔记 一,逆向加密模块 1,Python中运行JS代码 1.1 解决中文乱码或者报错问题 import subprocess from functools import partial subprocess.Popen partial(subprocess.Popen, encodingutf-8) import execjs1.2 常用…...

TINA 使用教程
常用功能 分析-电气规则检查:短路,断路等分析- 直流分析 交流分析 瞬态分析 视图-分离曲线 由于输出的容性负载导致的振荡 增加5欧电阻后OK 横扫参数 添加横扫曲线的电阻,选择R3:8K-20K PWL和WAV文件的支持 示例一:…...

weblogic 任意文件上传 CVE-2018-2894
一、漏洞简介 在 Weblogic Web Service Test Page 中存在一处任意文件上传漏洞, Web Service Test Page 在"生产模式"下默认不开启,所以该漏洞有一定限制。利用该 漏洞,可以上传任意 jsp 文件,进而获取服务器权限。 二…...

我的第一个网页:武理天协
1. html代码 1.1 首页.html <!DOCTYPE html> <html lang"zh"> <head><meta charset"UTF-8"><title>武理天协</title><link rel"stylesheet" href"./style.css"><link rel"stylesh…...

机器学习笔记 KAN网络架构简述(Kolmogorov-Arnold Networks)
一、简述 在最近的研究中,出现了号称传统多层感知器 (MLP) 的突破性替代方案,重塑了人工神经网络 (ANN) 的格局。这种创新架构被称为柯尔莫哥洛夫-阿诺德网络 (KAN),它提出了一种受柯尔莫哥洛夫-阿诺德表示定理启发的函数逼近的方法。 与 MLP 不同,MLP 依赖于各个节…...

基于网络爬虫技术的网络新闻分析(二)
目录 2 系统需求分析 2.1 系统需求概述 2.2 系统需求分析 2.2.1 系统功能要求 2.2.2 系统IPO图 2.2 系统非功能性需求分析 3 系统概要设计 3.1 设计约束 3.1.1 需求约束 3.1.2 设计策略 3.1.3 技术实现 3.3 模块结构 3.3.1 模块结构图 3.3.2 系统层次图 3.3.3…...

Java--初识类和对象
前言 本篇讲解Java类和对象的入门版本。 学习目的: 1.理解什么是类和对象。 2.引入面向对象程序设计的概念 3.学会如何定义类和创建对象。 4.理解this引用。 5.了解构造方法的概念并学会使用 考虑到篇幅过长问题,作者决定分多次发布。 面向对象的引入 J…...

SpringBoot如何实现动态数据源?
在Spring Boot中实现动态数据源主要涉及到创建和管理不同的数据源,并在运行时根据需要切换。这可以通过编程方式配置Spring的AbstractRoutingDataSource来完成。下面我会逐步介绍如何实现动态数据源,并给出代码示例。 第1步:添加依赖 首先&…...

win10安装mysql8.0+汉化
一、官网安装 MySQL 1. 在mysql官网进行下载页面 2. 下滑页面,选择 MySQL community download 3.下载windows版本 4.选择第二个download 5.不用登陆,no thanks,just start my download. 6.下载 二、安装 1. 双击安装 2. 选 Full->next 3…...

全网最全的Postman接口自动化测试!
该篇文章针对已经掌握 Postman 基本用法的读者,即对接口相关概念有一定了解、已经会使用 Postman 进行模拟请求的操作。 当前环境: Window 7 - 64 Postman 版本(免费版):Chrome App v5.5.3 不同版本页面 UI 和部分…...

Spring:了解@Import注解的三种用法
一、前言 在 Spring 框架中,Import 注解用于导入配置类,使得你可以在一个配置类中引入另一个或多个配置类,从而实现配置的模块化。这对于组织大型应用程序的配置非常有用,因为它允许你将配置分散到多个类中,然后再将它…...

简要介绍三大脚本语言 Shell、Python 和 Lua
🍉 CSDN 叶庭云:https://yetingyun.blog.csdn.net/ 脚本语言是一种用于自动化操作系统任务和应用程序功能的编程语言。它们通常用于编写小到中等规模的程序,以提高任务执行的速度和效率。在众多脚本语言中,Shell、Python 和 Lua 是…...

第 397 场 LeetCode 周赛题解
A 两个字符串的排列差 模拟:遍历 s s s 记录各字符出现的位置,然后遍历 t t t 计算排列差 class Solution {public:int findPermutationDifference(string s, string t) {int n s.size();vector<int> loc(26);for (int i 0; i < n; i)loc[s…...

文件存储解决方案-阿里云OSS
文章目录 1.菜单分级显示问题1.问题引出1.苹果灯,放到节能灯下面也就是id大于1272.查看菜单,并没有出现苹果灯3.放到灯具下面id42,就可以显示 2.问题分析和解决1.判断可能出现问题的位置2.找到递归返回树形菜单数据的位置3.这里出现问题的原因…...

基于Java的飞机大战游戏的设计与实现(论文 + 源码)
关于基于Java的飞机大战游戏.zip资源-CSDN文库https://download.csdn.net/download/JW_559/89313362 基于Java的飞机大战游戏的设计与实现 摘 要 现如今,随着智能手机的兴起与普及,加上4G(the 4th Generation mobile communication &#x…...

Vue路由开启步骤
1.在控制台输入命令 //控制台下载安装npm add vue-router3.6.5 2.在main.js下导入并注册组件 import Vue from vue import App from ./App.vue//控制台下载安装npm add vue-router3.6.5 //导入 import VueRouter from "vue-router";//注册 Vue.use(VueRouter) con…...

【碎片知识】2024_05_15
char int long float double运算的时候是从低转到高的,表达式的类型会自动提升或者转 换为参与表达式求值的最上级类型. 关于代码的说法正确的是( ) #include <stdio.h> int main() {int x -1;unsigned int y 2;if (x > y){printf…...

彩虹聚合DNS管理系统
聚合DNS管理系统可以实现在一个网站内管理多个平台的域名解析,目前已支持的域名平台有:阿里云、腾讯云、华为云、西部数码、CloudFlare。本系统支持多用户,每个用户可分配不同的域名解析权限;支持API接口,支持获取域名…...

服务网格 SolarMesh v1.13 重磅发布
SolarMesh是行云创新推出的流量治理平台,它基于Istio,为部署在K8s集群上的应用提供全面的流量治理能力。 在之前的版本中,SolarMesh提供的能力有:流量视图,流量控制策略批量配置,API级别的流量数据采集和展…...

三大平台直播视频下载保存方法
终于解决了视频号下载的问题,2024年5月15日亲测可用。 而且免费。 教程第二部分,有本地电脑无法下载的解决方案。 第一部分:使用教程(正常) 第1步:下载安装包 下载迅雷网盘搜索:大海福利合集…...

OpenAI GPT-4o - 介绍
本文翻译整理自: Hello GPT-4o https://openai.com/index/hello-gpt-4o/ 文章目录 一、关于 GPT-4o二、模型能力三、能力探索四、模型评估1、文本评价2、音频 ASR 性能3、音频翻译性能4、M3Exam 零样本结果5、视觉理解评估6、语言 tokenization 六、模型安全性和局限…...

QTreeView学习 branch 虚线设置
1、方法一: #include <QStyleFactory> ui.treeView->setStyle(QStyleFactory::create("windows")); 2、方法二: QString strtyle2 R"( QTreeView::branch:has-siblings:!adjoins-item { border-image: url(:/TreeViewDe…...

C++ 日志库 log4cpp 编译、压测及其范例代码 [全流程手工实践]
文章目录 一、 log4cpp官网二、下载三、编译1.目录结构如下2.configure 编译3.cmake 编译 四、测试五、压测源码及结果1.运行环境信息2.压测源码3.压测结果 文章内容:包含了对其linux上的完整使用流程,下载、编译、安装、测试用例尝试、以及一份自己写好…...

python数据处理与分析入门-pandas使用(4)
往期文章: pandas使用1pandas使用2pandas使用3 pandas使用技巧 创建一个DF对象 # 首先创建一个时间序列 dates pd.date_range(20180101, periods6) print(dates)# 创建DataFrame对象,指定index和columns标签 df pd.DataFrame(np.random.randn(6,4), …...