【ETCD】【源码阅读】深入解析 EtcdServer.applySnapshot方法
今天我们来一步步分析ETCD中applySnapshot函数
一、函数完整代码
函数的完整代码如下:
func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {if raft.IsEmptySnap(apply.snapshot) {return}applySnapshotInProgress.Inc()lg := s.Logger()lg.Info("applying snapshot",zap.Uint64("current-snapshot-index", ep.snapi),zap.Uint64("current-applied-index", ep.appliedi),zap.Uint64("incoming-leader-snapshot-index", apply.snapshot.Metadata.Index),zap.Uint64("incoming-leader-snapshot-term", apply.snapshot.Metadata.Term),)defer func() {lg.Info("applied snapshot",zap.Uint64("current-snapshot-index", ep.snapi),zap.Uint64("current-applied-index", ep.appliedi),zap.Uint64("incoming-leader-snapshot-index", apply.snapshot.Metadata.Index),zap.Uint64("incoming-leader-snapshot-term", apply.snapshot.Metadata.Term),)applySnapshotInProgress.Dec()}()if apply.snapshot.Metadata.Index <= ep.appliedi {lg.Panic("unexpected leader snapshot from outdated index",zap.Uint64("current-snapshot-index", ep.snapi),zap.Uint64("current-applied-index", ep.appliedi),zap.Uint64("incoming-leader-snapshot-index", apply.snapshot.Metadata.Index),zap.Uint64("incoming-leader-snapshot-term", apply.snapshot.Metadata.Term),)}// wait for raftNode to persist snapshot onto the disk<-apply.notifycnewbe, err := openSnapshotBackend(s.Cfg, s.snapshotter, apply.snapshot, s.beHooks)if err != nil {lg.Panic("failed to open snapshot backend", zap.Error(err))}// We need to set the backend to consistIndex before recovering the lessor,// because lessor.Recover will commit the boltDB transaction, accordingly it// will get the old consistent_index persisted into the db in OnPreCommitUnsafe.// Eventually the new consistent_index value coming from snapshot is overwritten// by the old value.s.consistIndex.SetBackend(newbe)// always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases.// If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers.if s.lessor != nil {lg.Info("restoring lease store")s.lessor.Recover(newbe, func() lease.TxnDelete { return s.kv.Write(traceutil.TODO()) })lg.Info("restored lease store")}lg.Info("restoring mvcc store")if err := s.kv.Restore(newbe); err != nil {lg.Panic("failed to restore mvcc store", zap.Error(err))}newbe.SetTxPostLockInsideApplyHook(s.getTxPostLockInsideApplyHook())lg.Info("restored mvcc store", zap.Uint64("consistent-index", s.consistIndex.ConsistentIndex()))// Closing old backend might block until all the txns// on the backend are finished.// We do not want to wait on closing the old backend.s.bemu.Lock()oldbe := s.bego func() {lg.Info("closing old backend file")defer func() {lg.Info("closed old backend file")}()if err := oldbe.Close(); err != nil {lg.Panic("failed to close old backend", zap.Error(err))}}()s.be = newbes.bemu.Unlock()lg.Info("restoring alarm store")if err := s.restoreAlarms(); err != nil {lg.Panic("failed to restore alarm store", zap.Error(err))}lg.Info("restored alarm store")if s.authStore != nil {lg.Info("restoring auth store")s.authStore.Recover(newbe)lg.Info("restored auth store")}lg.Info("restoring v2 store")if err := s.v2store.Recovery(apply.snapshot.Data); err != nil {lg.Panic("failed to restore v2 store", zap.Error(err))}if err := assertNoV2StoreContent(lg, s.v2store, s.Cfg.V2Deprecation); err != nil {lg.Panic("illegal v2store content", zap.Error(err))}lg.Info("restored v2 store")s.cluster.SetBackend(newbe)lg.Info("restoring cluster configuration")s.cluster.Recover(api.UpdateCapability)lg.Info("restored cluster configuration")lg.Info("removing old peers from network")// recover raft transports.r.transport.RemoveAllPeers()lg.Info("removed old peers from network")lg.Info("adding peers from new cluster configuration")for _, m := range s.cluster.Members() {if m.ID == s.ID() {continue}s.r.transport.AddPeer(m.ID, m.PeerURLs)}lg.Info("added peers from new cluster configuration")ep.appliedt = apply.snapshot.Metadata.Termep.appliedi = apply.snapshot.Metadata.Indexep.snapi = ep.appliediep.confState = apply.snapshot.Metadata.ConfState
}
二、函数功能概览
上述函数的核心功能是 应用一个来自 Raft 协议的快照,并在应用过程中恢复系统的各个关键组件,以确保系统的状态与最新的快照一致。具体来说,函数完成了以下核心任务:
-
检查快照有效性:判断快照是否为空或过时,如果无效则提前退出。
-
记录快照应用的开始和结束:通过日志记录快照应用的开始和结束,同时更新相关的监控指标。
-
等待并加载快照数据:等待 Raft 节点将快照持久化到磁盘,并打开快照后端。
-
恢复一致性索引:将新的快照后端设置为一致性索引的后端,确保一致性。
-
恢复存储组件:
- 租约存储(lease store):恢复与租约相关的数据。
- MVCC 存储:恢复多版本并发控制(MVCC)存储。
- 报警存储:恢复报警数据。
- 认证存储:恢复认证相关数据(如果存在)。
- V2 存储:恢复 V2 存储,并进行合法性检查。
-
恢复集群配置:更新集群配置,并确保集群的一致性。
-
更新 Raft 网络成员:移除旧的集群成员并添加新的集群成员到网络中。
-
更新应用进度:更新快照的任期、索引等应用进度信息。
三、函数详细分析
好的,接下来我将逐步解析这段代码,并用中文进行解释。
1. 检查快照是否为空
if raft.IsEmptySnap(apply.snapshot) {return
}
- 这段代码判断传入的快照是否为空。如果快照为空(即没有数据需要恢复),则直接返回,结束该函数的执行。
2. 增加快照应用中的度量
applySnapshotInProgress.Inc()
- 这行代码会将
applySnapshotInProgress
计数器增加 1,表示当前有一个快照正在被应用。这个计数器通常用于监控系统中,帮助跟踪正在进行的操作。
3. 日志记录快照应用的开始
lg := s.Logger()
lg.Info("applying snapshot",zap.Uint64("current-snapshot-index", ep.snapi),zap.Uint64("current-applied-index", ep.appliedi),zap.Uint64("incoming-leader-snapshot-index", apply.snapshot.Metadata.Index),zap.Uint64("incoming-leader-snapshot-term", apply.snapshot.Metadata.Term),
)
- 获取日志记录器
lg
,并记录一条信息级别的日志,说明快照正在被应用。 - 日志中包括当前的快照索引、已应用的索引以及来自领导者的快照的索引和任期(
term
)。
4. 使用 defer
确保快照应用完成后记录日志
defer func() {lg.Info("applied snapshot",zap.Uint64("current-snapshot-index", ep.snapi),zap.Uint64("current-applied-index", ep.appliedi),zap.Uint64("incoming-leader-snapshot-index", apply.snapshot.Metadata.Index),zap.Uint64("incoming-leader-snapshot-term", apply.snapshot.Metadata.Term),)applySnapshotInProgress.Dec()
}()
- 使用
defer
语句确保在函数结束时,记录一条日志表示快照应用已经完成。 - 同时,减少
applySnapshotInProgress
计数器,表示快照应用过程结束。
5. 检查快照的索引是否过时
if apply.snapshot.Metadata.Index <= ep.appliedi {lg.Panic("unexpected leader snapshot from outdated index",zap.Uint64("current-snapshot-index", ep.snapi),zap.Uint64("current-applied-index", ep.appliedi),zap.Uint64("incoming-leader-snapshot-index", apply.snapshot.Metadata.Index),zap.Uint64("incoming-leader-snapshot-term", apply.snapshot.Metadata.Term),)
}
- 这里会检查传入的快照索引是否小于等于已应用的索引
ep.appliedi
。如果是,说明接收到的快照来自一个过时的领导者,这会导致系统崩溃(通过lg.Panic
打印错误日志并触发 panic)。
6. 等待 Raft 节点将快照持久化到磁盘
<-apply.notifyc
- 等待一个信号,确保 Raft 节点已将快照持久化到磁盘。
apply.notifyc
是一个通道,程序会在此处阻塞,直到 Raft 节点完成快照的持久化操作。
7. 打开新的快照后端
newbe, err := openSnapshotBackend(s.Cfg, s.snapshotter, apply.snapshot, s.beHooks)
if err != nil {lg.Panic("failed to open snapshot backend", zap.Error(err))
}
- 这行代码通过
openSnapshotBackend
函数打开新的快照后端(即读取快照的存储),如果打开失败,则记录错误日志并触发 panic。
8. 设置新的后端为一致性索引
s.consistIndex.SetBackend(newbe)
- 将新的快照后端设置为一致性索引的后端。这是为了确保一致性索引能够正确地与新的快照数据同步。
9. 恢复租约存储(lease store)
if s.lessor != nil {lg.Info("restoring lease store")s.lessor.Recover(newbe, func() lease.TxnDelete { return s.kv.Write(traceutil.TODO()) })lg.Info("restored lease store")
}
- 如果系统中有
lessor
(负责管理租约的组件),则会从新快照后端恢复租约存储。 - 恢复过程中,会在事务回滚时写入 KV 存储。
10. 恢复 MVCC 存储
lg.Info("restoring mvcc store")
if err := s.kv.Restore(newbe); err != nil {lg.Panic("failed to restore mvcc store", zap.Error(err))
}
lg.Info("restored mvcc store", zap.Uint64("consistent-index", s.consistIndex.ConsistentIndex()))
- 接下来,恢复 MVCC(多版本并发控制)存储,如果恢复失败,则触发 panic。
- 恢复成功后,记录恢复的日志,包括一致性索引。
11. 关闭旧的后端
s.bemu.Lock()
oldbe := s.be
go func() {lg.Info("closing old backend file")defer func() {lg.Info("closed old backend file")}()if err := oldbe.Close(); err != nil {lg.Panic("failed to close old backend", zap.Error(err))}
}()
s.be = newbe
s.bemu.Unlock()
- 使用锁 (
bemu.Lock
) 来确保线程安全地切换后端文件。 - 在一个新的 goroutine 中关闭旧的快照后端,防止在关闭过程中阻塞主线程。
- 更新
s.be
为新的快照后端,并解锁。
12. 恢复报警存储
lg.Info("restoring alarm store")
if err := s.restoreAlarms(); err != nil {lg.Panic("failed to restore alarm store", zap.Error(err))
}
lg.Info("restored alarm store")
- 恢复报警存储,如果恢复失败,则触发 panic。
13. 恢复认证存储
if s.authStore != nil {lg.Info("restoring auth store")s.authStore.Recover(newbe)lg.Info("restored auth store")
}
- 如果存在认证存储(
authStore
),则恢复认证存储。
14. 恢复 V2 存储
lg.Info("restoring v2 store")
if err := s.v2store.Recovery(apply.snapshot.Data); err != nil {lg.Panic("failed to restore v2 store", zap.Error(err))
}
if err := assertNoV2StoreContent(lg, s.v2store, s.Cfg.V2Deprecation); err != nil {lg.Panic("illegal v2store content", zap.Error(err))
}
lg.Info("restored v2 store")
- 恢复 V2 存储,并进行检查,确保没有非法的 V2 存储内容。
15. 恢复集群配置
s.cluster.SetBackend(newbe)
lg.Info("restoring cluster configuration")
s.cluster.Recover(api.UpdateCapability)
lg.Info("restored cluster configuration")
- 将集群配置恢复到新的快照后端,并恢复集群配置。
16. 移除旧的网络成员
lg.Info("removing old peers from network")
s.r.transport.RemoveAllPeers()
lg.Info("removed old peers from network")
- 从网络中移除旧的集群成员。
17. 添加新的集群成员
lg.Info("adding peers from new cluster configuration")
for _, m := range s.cluster.Members() {if m.ID == s.ID() {continue}s.r.transport.AddPeer(m.ID, m.PeerURLs)
}
lg.Info("added peers from new cluster configuration")
- 将新的集群成员添加到网络中。
18. 更新应用进度
ep.appliedt = apply.snapshot.Metadata.Term
ep.appliedi = apply.snapshot.Metadata.Index
ep.snapi = ep.appliedi
ep.confState = apply.snapshot.Metadata.ConfState
- 更新应用进度(包括任期、索引等信息),确保与新的快照数据一致。
总结:这段代码的核心任务是应用一个来自 Raft 协议的快照。它通过多个步骤确保快照数据被正确地恢复到系统的各个存储组件中,同时进行了一系列的检查和恢复操作,确保系统的一致性和健康。
相关文章:
【ETCD】【源码阅读】深入解析 EtcdServer.applySnapshot方法
今天我们来一步步分析ETCD中applySnapshot函数 一、函数完整代码 函数的完整代码如下: func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {if raft.IsEmptySnap(apply.snapshot) {return}applySnapshotInProgress.Inc()lg : s.Logger()lg.In…...

HBase是什么,HBase介绍
官方网站:Apache HBase – Apache HBase Home HBase是一个分布式的、面向列的NoSQL数据库,主要用于存储和处理海量数据。它起源于Google的BigTable论文,是Apache Hadoop项目的子项目。HBase设计用于高可靠性、高性能和可伸…...
【Rust自学】3.3. 数据类型:复合类型
3.3.0. 写在正文之前 欢迎来到Rust自学的第三章,一共有6个小节,分别是: 变量与可变性数据类型:标量类型数据类型:复合类型(本文)函数和注释控制流:if else控制流:循环 通过第二章…...

【C++】小乐乐求和问题的高效求解与算法对比分析
博客主页: [小ᶻ☡꙳ᵃⁱᵍᶜ꙳] 本文专栏: C 文章目录 💯前言💯问题描述与数学模型1.1 题目概述1.2 输入输出要求1.3 数学建模 💯方法一:朴素循环求和法2.1 实现原理2.2 分析与问题2.3 改进方案2.4 性能瓶颈与结论…...

configure错误:“C compiler cannot create executables“
执行./configure命令出现如下奇怪的错误,百思不得姐: ./configure命令的日志文件为config.log,发生错误时,该文件的内容: This file contains any messages produced by compilers while running configure, to aid d…...

PAT乙级 锤子剪刀布 巩固巩固map的使用
主要是想借这题巩固巩固c map的使用方法。 大家应该都会玩“锤子剪刀布”的游戏:两人同时给出手势,胜负规则如图所示: 现给出两人的交锋记录,请统计双方的胜、平、负次数,并且给出双方分别出什么手势的胜算最大。 输…...

Webpack学习笔记(1)
1.为什么使用webpack? webpack不仅可以打包js代码,并且那个且支持es模块化和commonjs,支持其他静态资源打包,如图片、字体。。。 2.如何解决作用域问题? 作用域问题:例如loadsh等库,会绑定window对象,会…...

使用xpath规则进行提取数据并存储
下载lxml !pip install lxmlimport requests headers{"user-agent":"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.6261.95 Safari/537.36" } url"https://movie.douban.com/chart" respon…...

【物联网技术与应用】实验3:七彩LED灯闪烁
实验3 七彩LED灯闪烁 【实验介绍】 七彩LED灯上电后,7色动闪光LED模块可自动闪烁内置颜色。它可以用来制作相当吸引人的灯光效果。 【实验组件】 ● Arduino Uno主板* 1 ● USB数据线* 1 ● 7彩LED模块*1 ● 面包板*1 ● 9V方型电池*1 ● 跳线若干 【实验原…...

素数回文数的个数
素数回文数的个数 C语言代码C 代码Java代码Python代码 💐The Begin💐点点关注,收藏不迷路💐 求11到n之间(包括n),既是素数又是回文数的整数有多少个。 输入 一个大于11小于1000的整数n。 输出…...
车辆重识别代码笔记12.18
1、实例归一化(Instance Normalization)和批量归一化(Batch Normalization) 实例归一化(Instance Normalization): 计算步骤: 对于每个输入样本,在每个通道上分别计算均…...

selenium 在已打开浏览器上继续调试
关闭浏览器,终端执行如下指令,--user-data-dir换成自己的User Data路径 chrome.exe --remote-debugging-port9222 --user-data-dir"C:\Users\xxx\AppData\Local\Google\Chrome\User Data" 会打开浏览器,打开百度,如下状…...

Sentry日志管理thinkphp8 tp8 sentry9 sentry8 php8.x配置步骤, tp8自定义异常处理类使用方法
tp8的默认使用的就是composer来管理第三方包, 所以直接使用 composer 来安装 sentry9 即可. 同时tp8和tp5的配置方式不太一样, 这里我们直接使用自定义异常类来处理Sentry的异常. 1. 安装 sentry9 包 # 安装 sentry9 包 composer require "tekintian/sentry9-php" …...

【经验分享】容器云搭建的知识点
最近忙于备考没关注,有次点进某小黄鱼发现首页出现了我的笔记还被人收费了 虽然我也卖了一些资源,但我以交流、交换为主,笔记都是免费给别人看的 由于当时刚刚接触写的并不成熟,为了避免更多人花没必要的钱,所以决定公…...
Java对集合的操作方法
1. 数组转集合 //数组转集合 String[] split quickRechargeAmount.split(","); List<String> stringList Stream.of(split).collect(Collectors.toList()); 2. 对List集合数据内容进行分组 //对List集合数据内容进行分组 Map<String, List<LiveAppGi…...

FreeRTOS--基础知识
FreeRTOS基础知识 裸机与RTOS的特点: 裸机: 裸机又称为前后台系统,前台系统指的是中断服务函数,后台系统指的是大循环,即应用程序。 1、实时性差:应用程序轮流执行 2、delay:空等待ÿ…...

Node的学习以及学习通过Node书写接口并简单操作数据库
Node的学习 Node的基础上述是关于Node的一些基础,总结的还行; 利用Node书写接口并操作数据库 1. 初始化项目 创建新的项目文件夹,并初始化 package.json mkdir my-backend cd my-backend npm init -y2. 安装必要的依赖 安装Express.js&…...

【Linux探索学习】第二十二弹——用户缓冲区:深入解析操作系统中数据交互时的缓冲区机制
Linux学习笔记: https://blog.csdn.net/2301_80220607/category_12805278.html?spm1001.2014.3001.5482 前言: 前面两章我们已经讲了一些文件操作和文件重定向问题,以及一些相关的知识点,比如文件在内存中的存储位置࿰…...

Cesium-(Primitive)-(CylinderOutlineGeometry)
CylinderOutlineGeometry 以下是 CylinderOutlineGeometry 类的构造函数属性,以表格形式展示: 属性名类型默认值描述lengthnumber圆柱体的长度。topRadiusnumber圆柱体顶部的半径。bottomRadiusnumber圆柱体底部的半径。slicesnumber128可选,圆柱体周长的边数。numberOfVert…...
【ETCD】【源码阅读】深入分析 storeTxnWrite.Put方法源码
该方法是 storeTxnWrite 类型中的核心方法,负责将键值对存储到数据库,同时处理键的元数据(如版本、修订号、租约)并管理租约关联。 目录 一、完整代码二、方法详解方法签名1. 计算修订号并初始化变量2. 检查键是否已存在3. 生成索…...

超短脉冲激光自聚焦效应
前言与目录 强激光引起自聚焦效应机理 超短脉冲激光在脆性材料内部加工时引起的自聚焦效应,这是一种非线性光学现象,主要涉及光学克尔效应和材料的非线性光学特性。 自聚焦效应可以产生局部的强光场,对材料产生非线性响应,可能…...
rknn优化教程(二)
文章目录 1. 前述2. 三方库的封装2.1 xrepo中的库2.2 xrepo之外的库2.2.1 opencv2.2.2 rknnrt2.2.3 spdlog 3. rknn_engine库 1. 前述 OK,开始写第二篇的内容了。这篇博客主要能写一下: 如何给一些三方库按照xmake方式进行封装,供调用如何按…...
k8s从入门到放弃之Ingress七层负载
k8s从入门到放弃之Ingress七层负载 在Kubernetes(简称K8s)中,Ingress是一个API对象,它允许你定义如何从集群外部访问集群内部的服务。Ingress可以提供负载均衡、SSL终结和基于名称的虚拟主机等功能。通过Ingress,你可…...
DockerHub与私有镜像仓库在容器化中的应用与管理
哈喽,大家好,我是左手python! Docker Hub的应用与管理 Docker Hub的基本概念与使用方法 Docker Hub是Docker官方提供的一个公共镜像仓库,用户可以在其中找到各种操作系统、软件和应用的镜像。开发者可以通过Docker Hub轻松获取所…...

聊聊 Pulsar:Producer 源码解析
一、前言 Apache Pulsar 是一个企业级的开源分布式消息传递平台,以其高性能、可扩展性和存储计算分离架构在消息队列和流处理领域独树一帜。在 Pulsar 的核心架构中,Producer(生产者) 是连接客户端应用与消息队列的第一步。生产者…...

零基础设计模式——行为型模式 - 责任链模式
第四部分:行为型模式 - 责任链模式 (Chain of Responsibility Pattern) 欢迎来到行为型模式的学习!行为型模式关注对象之间的职责分配、算法封装和对象间的交互。我们将学习的第一个行为型模式是责任链模式。 核心思想:使多个对象都有机会处…...

Map相关知识
数据结构 二叉树 二叉树,顾名思义,每个节点最多有两个“叉”,也就是两个子节点,分别是左子 节点和右子节点。不过,二叉树并不要求每个节点都有两个子节点,有的节点只 有左子节点,有的节点只有…...
在Ubuntu24上采用Wine打开SourceInsight
1. 安装wine sudo apt install wine 2. 安装32位库支持,SourceInsight是32位程序 sudo dpkg --add-architecture i386 sudo apt update sudo apt install wine32:i386 3. 验证安装 wine --version 4. 安装必要的字体和库(解决显示问题) sudo apt install fonts-wqy…...
Java + Spring Boot + Mybatis 实现批量插入
在 Java 中使用 Spring Boot 和 MyBatis 实现批量插入可以通过以下步骤完成。这里提供两种常用方法:使用 MyBatis 的 <foreach> 标签和批处理模式(ExecutorType.BATCH)。 方法一:使用 XML 的 <foreach> 标签ÿ…...
Mysql8 忘记密码重置,以及问题解决
1.使用免密登录 找到配置MySQL文件,我的文件路径是/etc/mysql/my.cnf,有的人的是/etc/mysql/mysql.cnf 在里最后加入 skip-grant-tables重启MySQL服务 service mysql restartShutting down MySQL… SUCCESS! Starting MySQL… SUCCESS! 重启成功 2.登…...