Flink四大基石之CheckPoint(检查点) 的使用详解
目录
一、Checkpoint 剖析
State 与 Checkpoint 概念区分
设置 Checkpoint 实战
执行代码所需的服务与遇到的问题
二、重启策略解读
重启策略意义
代码示例与效果展示
三、SavePoint
与 Checkpoint 异同
操作步骤详解
四、总结
在大数据流式处理领域,Apache Flink 凭借其卓越的性能和强大的功能占据重要地位。而理解 Flink 中的 Checkpoint(检查点)、重启策略以及 SavePoint(保存点)这些关键概念,对于保障流处理任务的稳定性、容错性以及可维护性至关重要。本文将深入剖析它们的原理、用法,并结合实际代码示例展示其效果,希望能帮助大家更好地掌握 Flink 相关知识。

一、Checkpoint 剖析
State 与 Checkpoint 概念区分
State(状态)
在 Flink 中,State 代表某一个 Operator(算子)在某一时刻的状态,像常见的聚合算子 maxBy、sum 等操作过程中就会维护状态信息。比如在对数据流按某个字段做 sum 聚合时,它需要记住历史数据以便持续累加计算,并且这些状态数据默认存于内存之中,为算子的持续、准确运行提供依据。
Checkpoint(检查点 / 快照点)
它是 Flink 中所有有状态的 Operator 在某一个特定时刻的 State 快照信息汇总,也就是 State 的存档记录。可以简单理解为对整个作业运行时状态拍了一张 “照片”,定格所有相关算子彼时的状态,方便后续在故障恢复等场景使用。
设置 Checkpoint 实战
以下是一段设置 Checkpoint 的 Flink Java 代码示例:
package com.bigdata.day06;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class _01CheckPointDemo {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 在windows运行,将数据提交hdfs,会出现权限问题,使用这个语句解决。System.setProperty("HADOOP_USER_NAME", "root");// 在这个基础之上,添加快照// 第一句:开启快照,每隔1s保存一次快照env.enableCheckpointing(1000);// 第二句:设置快照保存的位置env.setStateBackend(new FsStateBackend("hdfs://bigdata01:9820/flink/checkpoint"));// 第三句: 通过webui的cancel按钮,取消flink的job时,不删除HDFS的checkpoint目录env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);//2. source-加载数据DataStreamSource<String> dataStreamSource = env.socketTextStream("localhost", 9999);SingleOutputStreamOperator<Tuple2<String, Integer>> mapStream = dataStreamSource.map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(String s) throws Exception {String[] arr = s.split(",");return Tuple2.of(arr[0], Integer.valueOf(arr[1]));}});//3. transformation-数据处理转换SingleOutputStreamOperator<Tuple2<String, Integer>> result = mapStream.keyBy(0).sum(1);result.print();//4. sink-数据输出//5. execute-执行env.execute();}
}
执行代码所需的服务与遇到的问题
启动本地的nc, 启动hdfs服务。
启动代码,发现有权限问题:
Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.AccessControlException): Permission denied: user=admin, access=WRITE, inode="/flink":root:supergroup:drwxr-xr-x
解决方案:
System.setProperty("HADOOP_USER_NAME", "root");
在设置检查点之前,设置一句这样带权限的语句,如果是集群运行中,不存在该问题。可以不设置!!!

查看快照情况:
运行,刷新查看checkpoint保存的数据,它会先生成一个新的文件夹,然后再删除老的文件夹,在某一时刻,会出现两个文件夹同时存在的情况。


启动HDFS、Flink
start-dfs.sh
start-cluster.sh
数据是保存了,但是并没有起作用,想起作用需要在集群上运行,以下演示集群上的效果:
第一次运行的时候
在本地先clean, 再package ,再Wagon一下:

flink run -c 全类名 /opt/app/flink-test-1.0-SNAPSHOT.jarflink run -c com.bigdata.day06._01CheckPointDemo /opt/app/FlinkDemo-1.0-SNAPSHOT.jar记得,先启动nc ,再启动任务,否则报错!
通过nc -lk 9999 输入以下内容:


想查看运行结果,可以通过使用的slot数量判断一下:


取消flink job的运行

查看一下这次的单词统计到哪个数字了:

第二次运行的时候
flink run -c 全类名 -s hdfs://hadoop10:8020/flink-checkpoint/293395ef7e496bda2eddd153a18d5212/chk-34 /opt/app/flink-test-1.0-SNAPSHOT.jar启动:
flink run -c com.bigdata.day06._01CheckPointDemo -s hdfs://bigdata01:9820/flink/checkpoint/bf416df7225b264fc34f8ff7e3746efe/chk-603 /opt/app/FlinkDemo-1.0-SNAPSHOT.jar
-s 指定从checkpoint目录恢复状态数据 ,注意每个人都不一样
从上一次离开时,截止的checkpoint目录


观察数据:输入一个hello,1 得到新的结果hello,8

二、重启策略解读
重启策略意义
流式数据如同永不干涸的河流持续流淌,一旦因某条错误数据致使程序异常退出,后续海量数据丢失风险极高,对企业而言,这意味着数据资产受损、业务分析结果偏差等严重后果,重启策略应运而生。它作为独立策略,与 Checkpoint 虽无必然绑定关系(即便没配置 Checkpoint 也能单独配置重启策略),却在保障程序持续运行层面协同发挥关键作用。
一个流在运行过程中,假如出现了程序异常问题,可以进行重启,比如,在代码中人为添加一些异常:

进行wordcount时,输入了一个bug,1 人为触发异常。

注意:此时如果有checkpoint ,是不会出现异常的,需要将checkpoint的代码关闭,再重启程序。会发现打印了异常,那为什么checkpoint的时候不打印,因为并没有log4j的配置文件,需要搞一个这样的配置文件才行。
程序中添加log4j.properties的代码:
# Global logging configuration
# Debug info warn error
log4j.rootLogger=debug, stdout
# MyBatis logging configuration...
log4j.logger.org.mybatis.example.BlogMapper=TRACE
# Console output...
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%5p [%t] - %m%n

开启检查点之后,报错了程序还在运行是因为开启检查点之后,程序会进行自动重启(无限重启【程序错了才重启】)。
//开启checkpoint,默认是无限重启,可以设置为不重启
//env.setRestartStrategy(RestartStrategies.noRestart());//重启3次,重启时间间隔是10s
//env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));//2分钟内重启3次,重启时间间隔是5s
env.setRestartStrategy(RestartStrategies.failureRateRestart(3,Time.of(2,TimeUnit.MINUTES),Time.of(5,TimeUnit.SECONDS))
);env.execute("checkpoint自动重启"); //最后一句execute可以设置jobName,显示在8081界面
程序如果上传至服务器端运行,可以看到重启状态

代码示例与效果展示
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.util.concurrent.TimeUnit;public class Demo02 {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 代码中不能有checkpoint,不是说checkpoint不好,而是太好了,它已经自带重试机制了。而且是无限重启的// 通过如下方式可将重试机制关掉// env.setRestartStrategy(RestartStrategies.noRestart());//// 两种办法// 第一种办法:重试3次,每一次间隔10S//env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));// 第二种写法:在2分钟内,重启3次,每次间隔10senv.setRestartStrategy(RestartStrategies.failureRateRestart(3,Time.of(2,TimeUnit.MINUTES),Time.of(5,TimeUnit.SECONDS)));//2. source-加载数据DataStreamSource<String> streamSource = env.socketTextStream("bigdata01", 8899);streamSource.map(new MapFunction<String, Tuple2<String,Integer>>() {@Overridepublic Tuple2<String, Integer> map(String value) throws Exception {String[] arr = value.split(",");String word = arr[0];if(word.equals("bug")){throw new Exception("有异常,服务会挂掉.....");}// 将一个字符串变为int类型int num = Integer.valueOf(arr[1]);// 第二种将字符串变为数字的方法System.out.println(Integer.parseInt(arr[1]));Tuple2<String, Integer> tuple2 = new Tuple2<>(word,num);// 还有什么方法? 第二种创建tuple的方法Tuple2<String, Integer> tuple2_2 = Tuple2.of(word,num);return tuple2;}}).keyBy(tuple->tuple.f0).sum(1).print();//3. transformation-数据处理转换//4. sink-数据输出//5. execute-执行env.execute();}
}
在此代码中人为在 map 函数里设置异常触发点(输入包含 “bug” 的数据时抛出异常)。若开启 Checkpoint,因它自带重试机制(默认无限重启),异常可能被掩盖,需关闭 Checkpoint 相关代码才能看到异常打印情况。同时,要完整看到重启策略效果(如按设定的次数、间隔重启),需打包代码上传至集群运行,本地测试难以呈现完整现象,且提交时务必确认使用的类名准确无误。
三、SavePoint
与 Checkpoint 异同
相同点
本质都是对 Flink 作业状态的一种保存方式,以便后续恢复作业时复用状态,保障数据处理连贯性。
不同点
Checkpoint 是 Flink 自动按设定规则周期性完成 State 快照保存,旨在应对故障自动恢复场景;而 SavePoint 是手动触发的快照操作,提供更灵活的作业状态管理时机,比如在版本升级、业务规则调整需暂停并后续重启作业场景发挥优势。
操作步骤详解
提交作业并输入数据
提交含重启策略代码打包成的 jar 包运行作业(类似 flink run -c 全类名 /opt/app/flink-test-1.0-SNAPSHOT.jar),输入数据观察单词对应数字变化。

执行 SavePoint 操作
以下是 --> 停止flink job,并且触发savepoint操作
flink stop --savepointPath hdfs://bigdata01:9820/flink-savepoint 152e493da9cdeb327f6cbbad5a7f8e41后面的序号为Job 的ID以下是 --> 不会停止flink的job,只是完成savepoint操作
flink savepoint 79f53c5c0bb3563b6b6ed3011176c411 hdfs://bigdata01:9820/flink-savepoint
备注:如何正确停止一个 flink 的任务
flink stop 6a27b580aa5c6b57766ae6241d9270ce(任务编号)
查看与重启作业

查看最近完成作业对应的 SavePoint,之后依据之前保存路径重启作业(flink run -c 全类名 -s hdfs://hadoop10:8020/flink-savepoint/savepoint-79f53c-64b5d94771eb /opt/app/flink-test-1.0-SNAPSHOT.jar),再次输入数据可看到基于之前状态的累加效果。

此外,在集群运行 Flink 程序时,默认并行度常为 1,它不会按照机器的CPU核数,而是按照配置文件中的一个默认值运行的。比如:flink-conf.yaml

web-ui 界面提交作业:


这个图形化界面,跟我们使用如下命令是一个效果:
flink run -c com.bigdata.day06._01CheckPointDemo -s hdfs://bigdata01:9820/flink/checkpoint/bf416df7225b264fc34f8ff7e3746efe/chk-603 /opt/app/FlinkDemo-1.0-SNAPSHOT.jar
四、总结
通过对 Flink 中 Checkpoint、重启策略和 SavePoint 的详细解读与代码实践展示,我们明晰它们各自在保障流处理任务稳定、容错与灵活运维层面的独特价值。合理运用这些机制,能助我们打造更健壮、高效的 Flink 大数据处理应用,从容应对复杂多变的业务需求与运行环境挑战,后续大家可在实际项目中深入实践优化,挖掘其更大潜力。
相关文章:
Flink四大基石之CheckPoint(检查点) 的使用详解
目录 一、Checkpoint 剖析 State 与 Checkpoint 概念区分 设置 Checkpoint 实战 执行代码所需的服务与遇到的问题 二、重启策略解读 重启策略意义 代码示例与效果展示 三、SavePoint 与 Checkpoint 异同 操作步骤详解 四、总结 在大数据流式处理领域,Ap…...
JVM 常见面试题及解析(2024)
目录 一、JVM 基础概念 二、JVM 内存结构 三、类加载机制 四、垃圾回收机制 五、性能调优 六、实战问题 七、JVM 与其他技术结合 八、JVM 内部机制深化 九、JVM 相关概念拓展 十、故障排查与异常处理 一、JVM 基础概念 1、什么是 JVM?它的主要作用是…...
Python 调用 Umi-OCR API 批量识别图片/PDF文档数据
目录 一、需求分析 二、方案设计(概要/详细) 三、技术选型 四、OCR 测试 Demo 五、批量文件识别完整代码实现 六、总结 一、需求分析 市场部同事进行采购或给客户报价时,往往基于过往采购合同数据,给出现在采购或报价的金额…...
K8S资源之secret资源
secret资源介绍 secret用于敏感数据存储,底层基于base64编码,数据存储在etcd数据库中 应用场景举例: 数据库的用户名,密码,tls的证书ssh等服务的相关证书 secret的基础管理 1 在命令行响应式创建 1.响应式创建 …...
QT:信号和槽01
QT中什么是信号和槽 概念解释 在 Qt 中,信号(Signals)和槽(Slots)是一种用于对象间通信的机制。信号是对象发出的事件通知,而槽是接收并处理这些通知的函数。 例如,当用户点击一个按钮时&#…...
针对Qwen-Agent框架的Function Call及ReAct的源码阅读与解析:Agent基类篇
文章目录 Agent继承链Agent类总体架构初始化方法`__init__` 方法:`_init_tool` 方法:对话生成方法`_call_llm` 方法:工具调用方法`_call_tool` 方法:`_detect_tool` 方法:整体执行方法`run` 方法:`_run` 方法:`run_nonstream` 方法总结回顾本文在 基于Qwen-Agent框架的Functio…...
XML 查看器:深入理解与高效使用
XML 查看器:深入理解与高效使用 XML(可扩展标记语言)是一种用于存储和传输数据的标记语言。它通过使用标签来定义数据结构,使得数据既易于人类阅读,也易于机器解析。在本文中,我们将探讨 XML 查看器的功能、重要性以及如何高效使用它们。 什么是 XML 查看器? XML 查看…...
《Vue零基础入门教程》第十五课:样式绑定
往期内容 《Vue零基础入门教程》第六课:基本选项 《Vue零基础入门教程》第八课:模板语法 《Vue零基础入门教程》第九课:插值语法细节 《Vue零基础入门教程》第十课:属性绑定指令 《Vue零基础入门教程》第十一课:事…...
以AI算力助推转型升级,暴雨亮相CCF中国存储大会
2024年11月29日-12月1日,CCF中国存储大会(CCF ChinaStorage 2024)在广州市长隆国际会展中心召开。本次会议以“存力、算力、智力”为主题,由中国计算机学会(CCF)主办,中山大学计算机学院、CCF信…...
【VMware】Ubuntu 虚拟机硬盘扩容教程(Ubuntu 22.04)
引言 想装个 Anaconda,发现 Ubuntu 硬盘空间不足。 步骤 虚拟机关机 编辑虚拟机设置 扩展硬盘容量 虚拟机开机 安装 gparted sudo apt install gparted启动 gparted sudo gparted右键sda3,调整分区大小 新大小拉满 应用全部操作 调整完成...
3D Bounce Ball Game 有什么技巧吗?
关于3D Bounce Ball Game(3D弹球游戏)的开发,以下是一些具体的技巧和实践建议: 1. 物理引擎的使用: 在Unity中,使用Rigidbody组件来为游戏对象添加物理属性,这样可以让物体受到重力影响并发…...
【SQL】实战--组合两个表
题目描述 表: Person ---------------------- | 列名 | 类型 | ---------------------- | PersonId | int | | FirstName | varchar | | LastName | varchar | ---------------------- personId 是该表的主键(具有唯一值的列)…...
Spring基于注解实现 AOP 切面功能
前言 在Spring AOP(Aspect-Oriented Programming)中,动态代理是常用的技术之一,用于在运行时动态地为目标对象生成代理对象, 并拦截其方法调用。Spring AOP 默认使用两种类型的动态代理机制:JDK 动态代理和…...
设计模式 更新ing
设计模式 1、六大原则1.1 单一设计原则 SRP1.2 开闭原则1.3 里氏替换原则1.4 迪米特法则1.5 接口隔离原则1.6 依赖倒置原则 2、工厂模式 1、六大原则 1.1 单一设计原则 SRP 一个类应该只有一个变化的原因 比如一个视频软件,区分不同的用户级别 包括访客࿰…...
Elasticsearch 进阶
核心概念 索引(Index) 一个索引就是一个拥有几分相似特征的文档的集合。比如说,你可以有一个客户数据的索引,另一个产品目录的索引,还有一个订单数据的索引。一个索引由一个名字来标识(必须全部是小写字母),并且当我们要对这个索…...
【AI】Sklearn
长期更新,建议关注、收藏、点赞。 友情链接: AI中的数学_线代微积分概率论最优化 Python numpy_pandas_matplotlib_spicy 建议路线:机器学习->深度学习->强化学习 目录 预处理模型选择分类实例: 二分类比赛 网格搜索实例&…...
通过 JNI 实现 Java 与 Rust 的 Channel 消息传递
做纯粹的自己。“你要搞清楚自己人生的剧本——不是父母的续集,不是子女的前传,更不是朋友的外篇。对待生命你不妨再大胆一点,因为你好歹要失去它。如果这世上真有奇迹,那只是努力的另一个名字”。 一、crossbeam_channel 参考 crossbeam_channel - Rust crossbeam_channel…...
【老白学 Java】对象的起源 Object
对象的起源 Object 文章来源:《Head First Java》修炼感悟。 上一篇文章中,老白学习了抽象类和抽象方法,不禁感慨,原来 Java 还可以这样玩。 同时又有了新的疑问,这些父类从何而来的? 本篇文章老白来聊一聊…...
Ubuntu Linux操作系统
一、 安装和搭建 Thank you for downloading Ubuntu Desktop | Ubuntu (这里我们只提供一个下载地址,详细的下载安装可以参考其他博客) 二、ubuntu的用户使用 2.1 常规用户登陆方式 在系统root用户是无法直接登录的,因为root用户的权限过…...
SpringBoot 打造的新冠密接者跟踪系统:企业复工复产防疫保障利器
摘 要 信息数据从传统到当代,是一直在变革当中,突如其来的互联网让传统的信息管理看到了革命性的曙光,因为传统信息管理从时效性,还是安全性,还是可操作性等各个方面来讲,遇到了互联网时代才发现能补上自古…...
观成科技:隐蔽隧道工具Ligolo-ng加密流量分析
1.工具介绍 Ligolo-ng是一款由go编写的高效隧道工具,该工具基于TUN接口实现其功能,利用反向TCP/TLS连接建立一条隐蔽的通信信道,支持使用Let’s Encrypt自动生成证书。Ligolo-ng的通信隐蔽性体现在其支持多种连接方式,适应复杂网…...
C++:std::is_convertible
C++标志库中提供is_convertible,可以测试一种类型是否可以转换为另一只类型: template <class From, class To> struct is_convertible; 使用举例: #include <iostream> #include <string>using namespace std;struct A { }; struct B : A { };int main…...
五年级数学知识边界总结思考-下册
目录 一、背景二、过程1.观察物体小学五年级下册“观察物体”知识点详解:由来、作用与意义**一、知识点核心内容****二、知识点的由来:从生活实践到数学抽象****三、知识的作用:解决实际问题的工具****四、学习的意义:培养核心素养…...
LLM基础1_语言模型如何处理文本
基于GitHub项目:https://github.com/datawhalechina/llms-from-scratch-cn 工具介绍 tiktoken:OpenAI开发的专业"分词器" torch:Facebook开发的强力计算引擎,相当于超级计算器 理解词嵌入:给词语画"…...
OpenLayers 分屏对比(地图联动)
注:当前使用的是 ol 5.3.0 版本,天地图使用的key请到天地图官网申请,并替换为自己的key 地图分屏对比在WebGIS开发中是很常见的功能,和卷帘图层不一样的是,分屏对比是在各个地图中添加相同或者不同的图层进行对比查看。…...
mysql已经安装,但是通过rpm -q 没有找mysql相关的已安装包
文章目录 现象:mysql已经安装,但是通过rpm -q 没有找mysql相关的已安装包遇到 rpm 命令找不到已经安装的 MySQL 包时,可能是因为以下几个原因:1.MySQL 不是通过 RPM 包安装的2.RPM 数据库损坏3.使用了不同的包名或路径4.使用其他包…...
让回归模型不再被异常值“带跑偏“,MSE和Cauchy损失函数在噪声数据环境下的实战对比
在机器学习的回归分析中,损失函数的选择对模型性能具有决定性影响。均方误差(MSE)作为经典的损失函数,在处理干净数据时表现优异,但在面对包含异常值的噪声数据时,其对大误差的二次惩罚机制往往导致模型参数…...
JavaScript 数据类型详解
JavaScript 数据类型详解 JavaScript 数据类型分为 原始类型(Primitive) 和 对象类型(Object) 两大类,共 8 种(ES11): 一、原始类型(7种) 1. undefined 定…...
MySQL:分区的基本使用
目录 一、什么是分区二、有什么作用三、分类四、创建分区五、删除分区 一、什么是分区 MySQL 分区(Partitioning)是一种将单张表的数据逻辑上拆分成多个物理部分的技术。这些物理部分(分区)可以独立存储、管理和优化,…...
离线语音识别方案分析
随着人工智能技术的不断发展,语音识别技术也得到了广泛的应用,从智能家居到车载系统,语音识别正在改变我们与设备的交互方式。尤其是离线语音识别,由于其在没有网络连接的情况下仍然能提供稳定、准确的语音处理能力,广…...
