当前位置: 首页 > news >正文

23、Flink 的 Savepoints 详解

Savepoints
1.什么是 Savepoints

Savepoint 是依据 Flink checkpointing 机制所创建的流作业执行状态的镜像,可以使用 Savepoint 进行 Flink 作业的停止、重启或更新。

Savepoint 由两部分组成:稳定存储(例如 HDFS,S3,…) 上包含二进制文件的目录(通常很大)和元数据文件(相对较小)。

  • 稳定存储上的文件表示作业执行状态的数据镜像。
  • Savepoint 的元数据文件以(相对路径)的形式主要包含指向作为 Savepoint 的稳定存储上的所有文件的指针。
2.分配算子 ID
a)概述

建议通过 uid(String) 方法手动指定算子 ID ,这些 ID 将用于恢复每个算子的状态。

DataStream<String> stream = env.// Stateful source (e.g. Kafka) with ID.addSource(new StatefulSource()).uid("source-id") // ID for the source operator.shuffle()// Stateful mapper with ID.map(new StatefulMapper()).uid("mapper-id") // ID for the mapper// Stateless printing sink.print(); // Auto-generated ID

如果不手动指定 ID ,则会自动生成 ID ;只要这些 ID 不变,就可以从 Savepoint 自动恢复;生成的 ID 取决于程序的结构,并且对程序更改很敏感;强烈建议手动分配这些 ID 。

b)Savepoint 状态

可以将 Savepoint 想象为,每个有状态的算子保存一个映射 “算子 ID ->状态”:

Operator ID | State
------------+------------------------
source-id   | State of StatefulSource
mapper-id   | State of StatefulMapper

示例中,print sink 是无状态的,因此不是 Savepoint 状态的一部分;默认情况下,尝试将 Savepoint 的每个条目映射回新程序。

3.算子
a)概述

可以使用 命令行客户端触发 Savepoint触发 Savepoint 并取消作业从 Savepoint 恢复,以及删除 Savepoint

从 Flink 1.2.0 开始,还可以使用 webui 从 Savepoint 恢复

b)触发 Savepoint

当触发 Savepoint 时,将创建一个新的 Savepoint 目录,用于存储数据和元数据;可以通过配置默认目标目录或使用触发器命令指定自定义目标目录来控制该目录的位置。

注意: 目标目录必须是 JobManager(s) 和 TaskManager(s) 都可以访问的位置,例如分布式文件系统(或者对象存储系统)上的位置。

FsStateBackendRocksDBStateBackend 为例:

# Savepoint 目标目录
/savepoint/# Savepoint 目录
/savepoint/savepoint-:shortjobid-:savepointid/# Savepoint 文件包含 Checkpoint元数据
/savepoint/savepoint-:shortjobid-:savepointid/_metadata# Savepoint 状态
/savepoint/savepoint-:shortjobid-:savepointid/...

从 1.11.0 开始,可以通过移动(拷贝)savepoint 目录到任意地方,然后再进行恢复。

如下两种情况不支持 savepoint 目录的移动:

1)启用了 entropy injection :此时 savepoint 目录不包含所有的数据文件,因为注入的路径会分散在各个路径中;由于缺乏一个共同的根目录,因此 savepoint 将包含绝对路径,从而导致无法支持 savepoint 目录的迁移。

2)作业包含了 task-owned state(比如 GenericWriteAhreadLog sink)。

和 savepoint 不同,checkpoint 不支持任意移动文件,因为 checkpoint 可能包含一些文件的绝对路径。

如果使用 MemoryStateBackend 的话,metadata 和 savepoint 的数据都会保存在 _metadata 文件中。

Savepoint 格式

可以在 savepoint 的两种二进制格式之间进行选择:

  • 标准格式 - 一种在所有 state backends 间统一的格式,允许使用一种状态后端创建 savepoint 后,使用另一种状态后端恢复这个 savepoint。这是最稳定的格式,旨在与之前的版本、模式、修改等保持最大兼容性。
  • 原生格式 - 标准格式的缺点是它的创建和恢复速度通常很慢。原生格式以特定于使用的状态后端的格式创建快照(例如 RocksDB 的 SST 文件)。

以原生格式创建 savepoint 的能力在 Flink 1.15 中引入,在那之前 savepoint 都是以标准格式创建的。

触发 Savepoint

$ bin/flink savepoint :jobId [:targetDirectory]

将触发 ID 为 :jobId 的作业的 Savepoint,并返回创建的 Savepoint 路径,此路径可用来恢复和删除 Savepoint ;也可以指定创建 Savepoint 的格式,如果没有指定,会采用标准格式创建 Savepoint。

$ bin/flink savepoint --type [native/canonical] :jobId [:targetDirectory]

使用上述命令触发 savepoint 时,client 需要等待 savepoint 制作完成,因此当任务的状态较大时,可能会导致 client 出现超时的情况,可以使用 detach 模式来触发savepoint。

$ bin/flink savepoint :jobId [:targetDirectory] -detached

使用该命令时,client 拿到本次 savepoint 的 trigger id 后立即返回,可以通过 REST API 来监控本次 savepoint 的制作情况。

使用 YARN 触发 Savepoint

$ bin/flink savepoint :jobId [:targetDirectory] -yid :yarnAppId

将触发 ID 为 :jobId 和 YARN 应用程序 ID :yarnAppId 的作业的 Savepoint,并返回创建的 Savepoint 的路径。

使用 Savepoint 停止作业

$ bin/flink stop --type [native/canonical] --savepointPath [:targetDirectory] :jobId

将自动触发 ID 为 :jobid 的作业的 Savepoint,并停止该作业;可以指定一个目标文件系统目录来存储 Savepoint ,该目录需要能被 JobManager(s) 和 TaskManager(s) 访问;可以指定创建 Savepoint 的格式,如果没有指定,会采用标准格式创建 Savepoint。

如果想使用 detach 模式触发 Savepoint,在命令行后添加选项-detached即可。

c)从 Savepoint 恢复
$ bin/flink run -s :savepointPath [:runArgs]

将提交作业并指定要从中恢复的 Savepoint ,可以给出 Savepoint 目录或 _metadata 文件的路径。

跳过无法映射的状态恢复

默认,resume 操作将尝试将 Savepoint 的所有状态映射回要还原的程序,如果删除了运算符,则可以通过 --allowNonRestoredState(short:-n)选项跳过无法映射到新程序的状态:

Restore 模式

Restore 模式 决定了在 restore 之后谁拥有 Savepoint 或者 externalized checkpoint 的文件的所有权。

快照可被用户或者 Flink 自身拥有,如果快照归用户所有,Flink 不会删除其中的文件,而且 Flink 不能依赖该快照中文件的存在,因为它可能在 Flink 的控制之外被删除。

每种 restore 模式都有特定的用途,默认的 NO_CLAIM 模式在大多数情况下是一个很好的折中方案,因为它在提供明确的所有权归属的同时只给恢复后第一个 checkpoint 带来较小的代价。

可以通过如下方式指定 restore 模式:

$ bin/flink run -s :savepointPath -restoreMode :mode -n [:runArgs]

NO_CLAIM (默认的)

NO_CLAIM 模式下,Flink 不会接管快照的所有权,它会将快照的文件置于用户的控制之中,并且永远不会删除其中的任何文件,该模式下可以从同一个快照上启动多个作业。

为保证 Flink 不会依赖于该快照的任何文件,它会强制第一个(成功的) checkpoint 为全量 checkpoint 而不是增量的,仅对state.backend: rocksdb 有影响,因为其他 backend 总是创建全量 checkpoint。

一旦第一个全量的 checkpoint 完成后,所有后续的 checkpoint 会照常创建,当第一个 checkpoint 成功制作,就可以删除原快照;在此之前不能删除原快照,因为没有任何完成的 checkpoint,Flink 会在故障时尝试从初始的快照恢复。

在这里插入图片描述

CLAIM

CLAIM 模式下 Flink 将声称拥有快照的所有权,并且本质上将其作为 checkpoint 对待:控制其生命周期并且可能会在其永远不会被用于恢复的时候删除它;手动删除快照和从同一个快照上启动两个作业都是不安全的,Flink 会保持配置数量的 checkpoint。

在这里插入图片描述

注意:

  1. Retained checkpoints 被存储在 //chk- 的目录中。Flink 不会接管 / 目录的所有权,而只会接管 chk- 的所有权。Flink 不会删除旧作业的目录。
  2. Native 格式支持增量的 RocksDB savepoints。对于这些 savepoints,Flink 将所有 SST 存储在 savepoints 目录中。即这些 savepoints 是自包含和目录可移动的。然而,在 CLAIM 模式下恢复时,后续的 checkpoints 可能会复用一些 SST 文件,这反过来会阻止在 savepoints 被清理时删除 savepoints 目录。 Flink 之后运行期间可能会删除复用的SST 文件,但不会删除 savepoints 目录。因此,如果在 CLAIM 模式下恢复,Flink 可能会留下一个空的 savepoints 目录。

LEGACY (已废弃)

Legacy 模式是 Flink 在 1.15 之前的工作方式。该模式下 Flink 永远不会删除初始恢复的 checkpoint。同时,用户也不清楚是否可以删除它,原因是 Flink 会在用来恢复的 checkpoint 之上创建增量的 checkpoint,因此后续的 checkpoint 都有可能会依赖于用于恢复的那个 checkpoint,即恢复的 checkpoint 的所有权没有明确的界定。

在这里插入图片描述

注意: LEGACY 模式已经被废弃,在 Flink 2.0 版本将会被移除。请使用 CLAIM 或 NO_CLAIM 模式。

d)删除 Savepoint
$ bin/flink savepoint -d :savepointPath

删除存储在 :savepointPath 中的 Savepoint。

注意:还可以通过常规文件系统操作手动删除 Savepoint ,而不会影响其他 Savepoint 或 Checkpoint。

e)配置

可以通过 state.savepoints.dir 配置 savepoint 的默认目录,触发 savepoint 时,将使用此目录来存储 savepoint;可以通过使用触发器命令指定自定义目标目录来覆盖缺省值。

# 默认 Savepoint 目标目录
state.savepoints.dir: hdfs:///flink/savepoints

如果既未配置缺省值也未指定自定义目标目录,则触发 Savepoint 将失败。

注意: 目标目录必须是 JobManager(s) 和 TaskManager(s) 可访问的位置,例如,分布式文件系统上的位置。

4.F.A.Q

应该为作业中的所有算子分配 ID 吗?

根据经验,是的。 严格来说,仅通过 uid 方法给有状态算子分配 ID 就足够了。Savepoint 仅包含这些有状态算子的状态,无状态算子不是 Savepoint 的一部分。

在实践中,建议给所有算子分配 ID,因为 Flink 的一些内置算子(如 Window 算子)也是有状态的,而内置算子是否有状态并不很明显。 如果完全确定算子是无状态的,则可以跳过 uid 方法。

如果在作业中添加一个需要状态的新算子,会发生什么?

当向作业添加新算子时,它将在没有任何状态的情况下进行初始化。 Savepoint 包含每个有状态算子的状态。 无状态算子根本不是 Savepoint 的一部分。 新算子的行为类似于无状态算子。

如果从作业中删除有状态的算子会发生什么?

默认情况下,从 Savepoint 恢复时将尝试将所有状态分配给新作业。如果有状态算子被删除,则无法从 Savepoint 恢复。

可以通过使用 run 命令设置 --allowNonRestoredState (简称:-n )来允许删除有状态算子:

$ bin/flink run -s :savepointPath -n [:runArgs]

如果在作业中重新排序有状态算子,会发生什么?

如果给这些算子分配了 ID,它们将像往常一样恢复。

如果没有分配 ID ,则有状态操作符自动生成的 ID 很可能在重新排序后发生更改。这将导致无法从以前的 Savepoint 恢复。

如果我添加、删除或重新排序作业中没有状态的算子,会发生什么?

如果将 ID 分配给有状态操作符,则无状态操作符不会影响 Savepoint 恢复。

如果没有分配 ID ,则有状态操作符自动生成的 ID 很可能在重新排序后发生更改。这将导致无法从以前的Savepoint 恢复。

当在恢复时改变程序的并行度时会发生什么?

如果 Savepoint 是用 Flink >= 1.2.0 触发的,并且没有使用像 Checkpointed 这样不推荐的状态 API,那么可以简单地从 Savepoint 恢复程序并指定新的并行度。

如果正在从 Flink < 1.2.0 触发的 Savepoint 恢复,或者使用现在已经废弃的 api,那么首先必须将作业和 Savepoint 迁移到 Flink >= 1.2.0,然后才能更改并行度。

可以将 savepoint 文件移动到稳定存储上吗?

从 Flink 1.11.0 版本开始,savepoint 是自包含的,可以按需迁移 savepoint 文件后进行恢复。

相关文章:

23、Flink 的 Savepoints 详解

Savepoints 1.什么是 Savepoints Savepoint 是依据 Flink checkpointing 机制所创建的流作业执行状态的镜像&#xff0c;可以使用 Savepoint 进行 Flink 作业的停止、重启或更新。 Savepoint 由两部分组成&#xff1a;稳定存储&#xff08;例如 HDFS&#xff0c;S3&#xff…...

【Unity】Unity项目转抖音小游戏(二)云数据库和云函数

业务需求&#xff0c;开始接触一下抖音小游戏相关的内容&#xff0c;开发过程中记录一下流程。 抖音云官方文档&#xff1a;https://developer.open-douyin.com/docs/resource/zh-CN/developer/tools/cloud/develop-guide/cloud-function-debug 1.开通抖音云环境 抖音云地址&a…...

SpringBoot集成jasypt对yml文件指定参数加密并自定义@bean隐藏密钥

1、查看SpringBoot和jasypt对应版本。 Jasypt 1.9.x 通常与 Spring Boot 1.5.x 相对应。 Jasypt 2.1.x 通常与 Spring Boot 2.0.x 相对应。 Jasypt 3.x 通常与 Spring Boot 2.1.x相对应。 2、引入maven <dependency><groupId>com.github.ulisesbocchio</groupI…...

GDB的使用

即目标机直接使用GDB调试 源码安装&#xff1a; Index of /gnu/gdb 或者 wget https://ftp.gnu.org/gnu/gdb/gdb-8.3.1.tar.gz ./configure make main install 编译报错解决方法&#xff1a; 解决编译安装gdb-10.1 unistd.h:663:3: error: #error “Please include con…...

Linux处理用户输入

目录 一、传递参数 1.1 读取参数 1.2 读取脚本名 二、跟踪参数 三、移动参数 四、处理选项 4.1 查找选项 4.1.1 处理简单选项 4.1.2 分离参数和选项 4.1.3 处理含值的选项 五、选项标准化 5.1 使用 getopt 命令 5.1.1 命令格式 5.1.2 在脚本中使用getopt 5.2 使用…...

【代码笔记】高并发场景下问题解决思路

高并发指的是在单位时间内&#xff0c;瞬时流量激增&#xff0c;系统需要同时处理大量并行的请求或操作。这种情况通常出现在面向大量用户或服务的分布式系统中&#xff0c;尤其是当用户请求高度集中时&#xff0c;比如促销活动、秒杀活动、注册抢课、热点事件、定时任务调度等…...

【Docker系列】Linux部署Docker Compose

&#x1f49d;&#x1f49d;&#x1f49d;欢迎来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐:kwan 的首页,持续学…...

基于SSM的文化遗产的保护与旅游开发系统(有报告)。Javaee项目。ssm项目。

演示视频&#xff1a; 基于SSM的文化遗产的保护与旅游开发系统&#xff08;有报告&#xff09;。Javaee项目。ssm项目。 项目介绍&#xff1a; 采用M&#xff08;model&#xff09;V&#xff08;view&#xff09;C&#xff08;controller&#xff09;三层体系结构&#xff0c;…...

整合springboot-mybatis时,MySQL数据库无法连接问题

整合springboot-mybatis时&#xff0c;MySQL数据库无法连接问题 解决步骤 先手动停止MySQL服务&#xff0c;在cmd后的控制台输入services.msc 找到MySql停止服务 修改配置文件&#xff0c;跳过验证 修改MySQL安装目录下的my.ini配置文件&#xff0c;使登录时跳过权限检查&a…...

C语言循环队列

以下是一个使用 C 语言实现的简单循环队列示例&#xff1a; #include <stdio.h> #include <stdlib.h>#define MAX_SIZE 5// 定义循环队列结构体 typedef struct {int items[MAX_SIZE];int front, rear; } Queue;// 初始化队列 void initQueue(Queue *q) {q->fr…...

Docker运行出现iptables: No chain/target/match by that name报错如何解决?

在尝试重启 Docker 容器时遇到的错误信息表明有关 iptables 的配置出了问题。这通常是因为 Docker 需要配置网络&#xff0c;而 iptables 规则没有正确设置或被意外删除。具体到你的错误信息中&#xff0c;报错 iptables: No chain/target/match by that name 表示 Docker 尝试…...

力扣 122. 买卖股票的最佳时机 II python AC

动态规划 class Solution:def maxProfit(self, prices):pre float(inf)ans 0for now in prices:if now > pre:ans now - prepre nowreturn ans定义一个变量保存上一步位置最小的数值&#xff0c;来模拟dp --遍历股票数值 --如果当前数值大于上一次&#xff0c;将股票卖…...

F5 BIG-IP Next Central Manager SQL注入漏洞(CVE-2024-26026、CVE-2024-21793)

0x01 产品简介 BIG-IP Next Central Manager是BIG-IP Next的原生默认用户界面,它可跨平台管理BIG-IP Next实例。BIG-IP Next是F5 Networks公司推出的一款下一代BIG-IP软件,提供了多云应用安全和应用交付服务。 0x02 漏洞概述 CVE-2024-26026:BIG-IP Next Central Manager…...

Python3 笔记:循环结构 for语句

for语句是Python语言中构造循环结构程序的语句之一。 Python中for语句是通过循环遍历某一序列对象&#xff08;字符串、列表、元组或字典&#xff09;来构建循环&#xff0c;循环结束的条件就是对象被遍历完。 for循环基本语法格式&#xff1a; for 循环变量 in 遍历对象: …...

信息化与数字化的区别在哪里?

信息化与数字化虽然密切相关&#xff0c;但它们在核心理念、实施范围、目标定位、以及对企业的影响上存在本质区别&#xff1a; 1.中心与目标不同&#xff1a; • 信息化通常以流程为中心&#xff0c;致力于提高工作效率&#xff0c;通过信息技术优化和自动化企业内部的流程。…...

记录MySQL数据库查询不等于xxx时的坑

目录 一、背景 二、需求 三、方法 四、示例 一、背景 在使用MySQL数据库查询数据时&#xff0c;需要查询字段name不等于xxx的记录&#xff0c;通过where name ! xxx查询出来的记录不符合预期&#xff0c;通过检查发现少了name字段为null的记录&#xff0c;后经查询得知在My…...

QT的创建,发现编译器有一个黄色三角形感叹号,提示说Cmake配置错误,该怎么办?

确保你安装了Cmake 2.如果你电脑之前已经装了Cmake&#xff0c;那么在qt安装中&#xff0c;即便你选择了Cmake版本&#xff0c;但依旧不会修改电脑的Cmake版本。这时候就会出现黄色箭头。在勾勾的地方会有一个黄色感叹符号&#xff08;我已经解决了&#xff0c;所以没有显示&a…...

0506libMaven项目

0506libMaven项目包-CSDN博客 数据库字段 界面需求...

HTML飘落的花瓣

目录 写在前面 HTML​​​​​​​简介 完整代码 代码分析 系列推荐 写在最后 写在前面 本期小编给大家推荐HTML实现的飘落的花瓣&#xff0c;无需安装软件&#xff0c;直接下载即可打开~ HTML​​​​​​​简介 HTML&#xff08;Hypertext Markup Language&#xff…...

一个小调整,竟然让交换机、路由器的CPU占用率降低了50%

号主&#xff1a;老杨丨11年资深网络工程师&#xff0c;更多网工提升干货&#xff0c;请关注公众号&#xff1a;网络工程师俱乐部 下午好&#xff0c;我的网工朋友。 在信息时代下&#xff0c;不仅仅在网络工程领域&#xff0c;高CPU占用率都是一个非常常见的问题&#xff0c;…...

Vim 调用外部命令学习笔记

Vim 外部命令集成完全指南 文章目录 Vim 外部命令集成完全指南核心概念理解命令语法解析语法对比 常用外部命令详解文本排序与去重文本筛选与搜索高级 grep 搜索技巧文本替换与编辑字符处理高级文本处理编程语言处理其他实用命令 范围操作示例指定行范围处理复合命令示例 实用技…...

观成科技:隐蔽隧道工具Ligolo-ng加密流量分析

1.工具介绍 Ligolo-ng是一款由go编写的高效隧道工具&#xff0c;该工具基于TUN接口实现其功能&#xff0c;利用反向TCP/TLS连接建立一条隐蔽的通信信道&#xff0c;支持使用Let’s Encrypt自动生成证书。Ligolo-ng的通信隐蔽性体现在其支持多种连接方式&#xff0c;适应复杂网…...

Linux应用开发之网络套接字编程(实例篇)

服务端与客户端单连接 服务端代码 #include <sys/socket.h> #include <sys/types.h> #include <netinet/in.h> #include <stdio.h> #include <stdlib.h> #include <string.h> #include <arpa/inet.h> #include <pthread.h> …...

java_网络服务相关_gateway_nacos_feign区别联系

1. spring-cloud-starter-gateway 作用&#xff1a;作为微服务架构的网关&#xff0c;统一入口&#xff0c;处理所有外部请求。 核心能力&#xff1a; 路由转发&#xff08;基于路径、服务名等&#xff09;过滤器&#xff08;鉴权、限流、日志、Header 处理&#xff09;支持负…...

反向工程与模型迁移:打造未来商品详情API的可持续创新体系

在电商行业蓬勃发展的当下&#xff0c;商品详情API作为连接电商平台与开发者、商家及用户的关键纽带&#xff0c;其重要性日益凸显。传统商品详情API主要聚焦于商品基本信息&#xff08;如名称、价格、库存等&#xff09;的获取与展示&#xff0c;已难以满足市场对个性化、智能…...

JVM垃圾回收机制全解析

Java虚拟机&#xff08;JVM&#xff09;中的垃圾收集器&#xff08;Garbage Collector&#xff0c;简称GC&#xff09;是用于自动管理内存的机制。它负责识别和清除不再被程序使用的对象&#xff0c;从而释放内存空间&#xff0c;避免内存泄漏和内存溢出等问题。垃圾收集器在Ja…...

JUC笔记(上)-复习 涉及死锁 volatile synchronized CAS 原子操作

一、上下文切换 即使单核CPU也可以进行多线程执行代码&#xff0c;CPU会给每个线程分配CPU时间片来实现这个机制。时间片非常短&#xff0c;所以CPU会不断地切换线程执行&#xff0c;从而让我们感觉多个线程是同时执行的。时间片一般是十几毫秒(ms)。通过时间片分配算法执行。…...

多模态大语言模型arxiv论文略读(108)

CROME: Cross-Modal Adapters for Efficient Multimodal LLM ➡️ 论文标题&#xff1a;CROME: Cross-Modal Adapters for Efficient Multimodal LLM ➡️ 论文作者&#xff1a;Sayna Ebrahimi, Sercan O. Arik, Tejas Nama, Tomas Pfister ➡️ 研究机构: Google Cloud AI Re…...

QT: `long long` 类型转换为 `QString` 2025.6.5

在 Qt 中&#xff0c;将 long long 类型转换为 QString 可以通过以下两种常用方法实现&#xff1a; 方法 1&#xff1a;使用 QString::number() 直接调用 QString 的静态方法 number()&#xff0c;将数值转换为字符串&#xff1a; long long value 1234567890123456789LL; …...

【学习笔记】深入理解Java虚拟机学习笔记——第4章 虚拟机性能监控,故障处理工具

第2章 虚拟机性能监控&#xff0c;故障处理工具 4.1 概述 略 4.2 基础故障处理工具 4.2.1 jps:虚拟机进程状况工具 命令&#xff1a;jps [options] [hostid] 功能&#xff1a;本地虚拟机进程显示进程ID&#xff08;与ps相同&#xff09;&#xff0c;可同时显示主类&#x…...