【ETCD】【源码阅读】深入解析 EtcdServer.applySnapshot方法
今天我们来一步步分析ETCD中applySnapshot函数
一、函数完整代码
函数的完整代码如下:
func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {if raft.IsEmptySnap(apply.snapshot) {return}applySnapshotInProgress.Inc()lg := s.Logger()lg.Info("applying snapshot",zap.Uint64("current-snapshot-index", ep.snapi),zap.Uint64("current-applied-index", ep.appliedi),zap.Uint64("incoming-leader-snapshot-index", apply.snapshot.Metadata.Index),zap.Uint64("incoming-leader-snapshot-term", apply.snapshot.Metadata.Term),)defer func() {lg.Info("applied snapshot",zap.Uint64("current-snapshot-index", ep.snapi),zap.Uint64("current-applied-index", ep.appliedi),zap.Uint64("incoming-leader-snapshot-index", apply.snapshot.Metadata.Index),zap.Uint64("incoming-leader-snapshot-term", apply.snapshot.Metadata.Term),)applySnapshotInProgress.Dec()}()if apply.snapshot.Metadata.Index <= ep.appliedi {lg.Panic("unexpected leader snapshot from outdated index",zap.Uint64("current-snapshot-index", ep.snapi),zap.Uint64("current-applied-index", ep.appliedi),zap.Uint64("incoming-leader-snapshot-index", apply.snapshot.Metadata.Index),zap.Uint64("incoming-leader-snapshot-term", apply.snapshot.Metadata.Term),)}// wait for raftNode to persist snapshot onto the disk<-apply.notifycnewbe, err := openSnapshotBackend(s.Cfg, s.snapshotter, apply.snapshot, s.beHooks)if err != nil {lg.Panic("failed to open snapshot backend", zap.Error(err))}// We need to set the backend to consistIndex before recovering the lessor,// because lessor.Recover will commit the boltDB transaction, accordingly it// will get the old consistent_index persisted into the db in OnPreCommitUnsafe.// Eventually the new consistent_index value coming from snapshot is overwritten// by the old value.s.consistIndex.SetBackend(newbe)// always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases.// If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers.if s.lessor != nil {lg.Info("restoring lease store")s.lessor.Recover(newbe, func() lease.TxnDelete { return s.kv.Write(traceutil.TODO()) })lg.Info("restored lease store")}lg.Info("restoring mvcc store")if err := s.kv.Restore(newbe); err != nil {lg.Panic("failed to restore mvcc store", zap.Error(err))}newbe.SetTxPostLockInsideApplyHook(s.getTxPostLockInsideApplyHook())lg.Info("restored mvcc store", zap.Uint64("consistent-index", s.consistIndex.ConsistentIndex()))// Closing old backend might block until all the txns// on the backend are finished.// We do not want to wait on closing the old backend.s.bemu.Lock()oldbe := s.bego func() {lg.Info("closing old backend file")defer func() {lg.Info("closed old backend file")}()if err := oldbe.Close(); err != nil {lg.Panic("failed to close old backend", zap.Error(err))}}()s.be = newbes.bemu.Unlock()lg.Info("restoring alarm store")if err := s.restoreAlarms(); err != nil {lg.Panic("failed to restore alarm store", zap.Error(err))}lg.Info("restored alarm store")if s.authStore != nil {lg.Info("restoring auth store")s.authStore.Recover(newbe)lg.Info("restored auth store")}lg.Info("restoring v2 store")if err := s.v2store.Recovery(apply.snapshot.Data); err != nil {lg.Panic("failed to restore v2 store", zap.Error(err))}if err := assertNoV2StoreContent(lg, s.v2store, s.Cfg.V2Deprecation); err != nil {lg.Panic("illegal v2store content", zap.Error(err))}lg.Info("restored v2 store")s.cluster.SetBackend(newbe)lg.Info("restoring cluster configuration")s.cluster.Recover(api.UpdateCapability)lg.Info("restored cluster configuration")lg.Info("removing old peers from network")// recover raft transports.r.transport.RemoveAllPeers()lg.Info("removed old peers from network")lg.Info("adding peers from new cluster configuration")for _, m := range s.cluster.Members() {if m.ID == s.ID() {continue}s.r.transport.AddPeer(m.ID, m.PeerURLs)}lg.Info("added peers from new cluster configuration")ep.appliedt = apply.snapshot.Metadata.Termep.appliedi = apply.snapshot.Metadata.Indexep.snapi = ep.appliediep.confState = apply.snapshot.Metadata.ConfState
}
二、函数功能概览
上述函数的核心功能是 应用一个来自 Raft 协议的快照,并在应用过程中恢复系统的各个关键组件,以确保系统的状态与最新的快照一致。具体来说,函数完成了以下核心任务:
-
检查快照有效性:判断快照是否为空或过时,如果无效则提前退出。
-
记录快照应用的开始和结束:通过日志记录快照应用的开始和结束,同时更新相关的监控指标。
-
等待并加载快照数据:等待 Raft 节点将快照持久化到磁盘,并打开快照后端。
-
恢复一致性索引:将新的快照后端设置为一致性索引的后端,确保一致性。
-
恢复存储组件:
- 租约存储(lease store):恢复与租约相关的数据。
- MVCC 存储:恢复多版本并发控制(MVCC)存储。
- 报警存储:恢复报警数据。
- 认证存储:恢复认证相关数据(如果存在)。
- V2 存储:恢复 V2 存储,并进行合法性检查。
-
恢复集群配置:更新集群配置,并确保集群的一致性。
-
更新 Raft 网络成员:移除旧的集群成员并添加新的集群成员到网络中。
-
更新应用进度:更新快照的任期、索引等应用进度信息。
三、函数详细分析
好的,接下来我将逐步解析这段代码,并用中文进行解释。
1. 检查快照是否为空
if raft.IsEmptySnap(apply.snapshot) {return
}
- 这段代码判断传入的快照是否为空。如果快照为空(即没有数据需要恢复),则直接返回,结束该函数的执行。
2. 增加快照应用中的度量
applySnapshotInProgress.Inc()
- 这行代码会将
applySnapshotInProgress
计数器增加 1,表示当前有一个快照正在被应用。这个计数器通常用于监控系统中,帮助跟踪正在进行的操作。
3. 日志记录快照应用的开始
lg := s.Logger()
lg.Info("applying snapshot",zap.Uint64("current-snapshot-index", ep.snapi),zap.Uint64("current-applied-index", ep.appliedi),zap.Uint64("incoming-leader-snapshot-index", apply.snapshot.Metadata.Index),zap.Uint64("incoming-leader-snapshot-term", apply.snapshot.Metadata.Term),
)
- 获取日志记录器
lg
,并记录一条信息级别的日志,说明快照正在被应用。 - 日志中包括当前的快照索引、已应用的索引以及来自领导者的快照的索引和任期(
term
)。
4. 使用 defer
确保快照应用完成后记录日志
defer func() {lg.Info("applied snapshot",zap.Uint64("current-snapshot-index", ep.snapi),zap.Uint64("current-applied-index", ep.appliedi),zap.Uint64("incoming-leader-snapshot-index", apply.snapshot.Metadata.Index),zap.Uint64("incoming-leader-snapshot-term", apply.snapshot.Metadata.Term),)applySnapshotInProgress.Dec()
}()
- 使用
defer
语句确保在函数结束时,记录一条日志表示快照应用已经完成。 - 同时,减少
applySnapshotInProgress
计数器,表示快照应用过程结束。
5. 检查快照的索引是否过时
if apply.snapshot.Metadata.Index <= ep.appliedi {lg.Panic("unexpected leader snapshot from outdated index",zap.Uint64("current-snapshot-index", ep.snapi),zap.Uint64("current-applied-index", ep.appliedi),zap.Uint64("incoming-leader-snapshot-index", apply.snapshot.Metadata.Index),zap.Uint64("incoming-leader-snapshot-term", apply.snapshot.Metadata.Term),)
}
- 这里会检查传入的快照索引是否小于等于已应用的索引
ep.appliedi
。如果是,说明接收到的快照来自一个过时的领导者,这会导致系统崩溃(通过lg.Panic
打印错误日志并触发 panic)。
6. 等待 Raft 节点将快照持久化到磁盘
<-apply.notifyc
- 等待一个信号,确保 Raft 节点已将快照持久化到磁盘。
apply.notifyc
是一个通道,程序会在此处阻塞,直到 Raft 节点完成快照的持久化操作。
7. 打开新的快照后端
newbe, err := openSnapshotBackend(s.Cfg, s.snapshotter, apply.snapshot, s.beHooks)
if err != nil {lg.Panic("failed to open snapshot backend", zap.Error(err))
}
- 这行代码通过
openSnapshotBackend
函数打开新的快照后端(即读取快照的存储),如果打开失败,则记录错误日志并触发 panic。
8. 设置新的后端为一致性索引
s.consistIndex.SetBackend(newbe)
- 将新的快照后端设置为一致性索引的后端。这是为了确保一致性索引能够正确地与新的快照数据同步。
9. 恢复租约存储(lease store)
if s.lessor != nil {lg.Info("restoring lease store")s.lessor.Recover(newbe, func() lease.TxnDelete { return s.kv.Write(traceutil.TODO()) })lg.Info("restored lease store")
}
- 如果系统中有
lessor
(负责管理租约的组件),则会从新快照后端恢复租约存储。 - 恢复过程中,会在事务回滚时写入 KV 存储。
10. 恢复 MVCC 存储
lg.Info("restoring mvcc store")
if err := s.kv.Restore(newbe); err != nil {lg.Panic("failed to restore mvcc store", zap.Error(err))
}
lg.Info("restored mvcc store", zap.Uint64("consistent-index", s.consistIndex.ConsistentIndex()))
- 接下来,恢复 MVCC(多版本并发控制)存储,如果恢复失败,则触发 panic。
- 恢复成功后,记录恢复的日志,包括一致性索引。
11. 关闭旧的后端
s.bemu.Lock()
oldbe := s.be
go func() {lg.Info("closing old backend file")defer func() {lg.Info("closed old backend file")}()if err := oldbe.Close(); err != nil {lg.Panic("failed to close old backend", zap.Error(err))}
}()
s.be = newbe
s.bemu.Unlock()
- 使用锁 (
bemu.Lock
) 来确保线程安全地切换后端文件。 - 在一个新的 goroutine 中关闭旧的快照后端,防止在关闭过程中阻塞主线程。
- 更新
s.be
为新的快照后端,并解锁。
12. 恢复报警存储
lg.Info("restoring alarm store")
if err := s.restoreAlarms(); err != nil {lg.Panic("failed to restore alarm store", zap.Error(err))
}
lg.Info("restored alarm store")
- 恢复报警存储,如果恢复失败,则触发 panic。
13. 恢复认证存储
if s.authStore != nil {lg.Info("restoring auth store")s.authStore.Recover(newbe)lg.Info("restored auth store")
}
- 如果存在认证存储(
authStore
),则恢复认证存储。
14. 恢复 V2 存储
lg.Info("restoring v2 store")
if err := s.v2store.Recovery(apply.snapshot.Data); err != nil {lg.Panic("failed to restore v2 store", zap.Error(err))
}
if err := assertNoV2StoreContent(lg, s.v2store, s.Cfg.V2Deprecation); err != nil {lg.Panic("illegal v2store content", zap.Error(err))
}
lg.Info("restored v2 store")
- 恢复 V2 存储,并进行检查,确保没有非法的 V2 存储内容。
15. 恢复集群配置
s.cluster.SetBackend(newbe)
lg.Info("restoring cluster configuration")
s.cluster.Recover(api.UpdateCapability)
lg.Info("restored cluster configuration")
- 将集群配置恢复到新的快照后端,并恢复集群配置。
16. 移除旧的网络成员
lg.Info("removing old peers from network")
s.r.transport.RemoveAllPeers()
lg.Info("removed old peers from network")
- 从网络中移除旧的集群成员。
17. 添加新的集群成员
lg.Info("adding peers from new cluster configuration")
for _, m := range s.cluster.Members() {if m.ID == s.ID() {continue}s.r.transport.AddPeer(m.ID, m.PeerURLs)
}
lg.Info("added peers from new cluster configuration")
- 将新的集群成员添加到网络中。
18. 更新应用进度
ep.appliedt = apply.snapshot.Metadata.Term
ep.appliedi = apply.snapshot.Metadata.Index
ep.snapi = ep.appliedi
ep.confState = apply.snapshot.Metadata.ConfState
- 更新应用进度(包括任期、索引等信息),确保与新的快照数据一致。
总结:这段代码的核心任务是应用一个来自 Raft 协议的快照。它通过多个步骤确保快照数据被正确地恢复到系统的各个存储组件中,同时进行了一系列的检查和恢复操作,确保系统的一致性和健康。
相关文章:
【ETCD】【源码阅读】深入解析 EtcdServer.applySnapshot方法
今天我们来一步步分析ETCD中applySnapshot函数 一、函数完整代码 函数的完整代码如下: func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {if raft.IsEmptySnap(apply.snapshot) {return}applySnapshotInProgress.Inc()lg : s.Logger()lg.In…...

HBase是什么,HBase介绍
官方网站:Apache HBase – Apache HBase Home HBase是一个分布式的、面向列的NoSQL数据库,主要用于存储和处理海量数据。它起源于Google的BigTable论文,是Apache Hadoop项目的子项目。HBase设计用于高可靠性、高性能和可伸…...
【Rust自学】3.3. 数据类型:复合类型
3.3.0. 写在正文之前 欢迎来到Rust自学的第三章,一共有6个小节,分别是: 变量与可变性数据类型:标量类型数据类型:复合类型(本文)函数和注释控制流:if else控制流:循环 通过第二章…...

【C++】小乐乐求和问题的高效求解与算法对比分析
博客主页: [小ᶻ☡꙳ᵃⁱᵍᶜ꙳] 本文专栏: C 文章目录 💯前言💯问题描述与数学模型1.1 题目概述1.2 输入输出要求1.3 数学建模 💯方法一:朴素循环求和法2.1 实现原理2.2 分析与问题2.3 改进方案2.4 性能瓶颈与结论…...

configure错误:“C compiler cannot create executables“
执行./configure命令出现如下奇怪的错误,百思不得姐: ./configure命令的日志文件为config.log,发生错误时,该文件的内容: This file contains any messages produced by compilers while running configure, to aid d…...

PAT乙级 锤子剪刀布 巩固巩固map的使用
主要是想借这题巩固巩固c map的使用方法。 大家应该都会玩“锤子剪刀布”的游戏:两人同时给出手势,胜负规则如图所示: 现给出两人的交锋记录,请统计双方的胜、平、负次数,并且给出双方分别出什么手势的胜算最大。 输…...

Webpack学习笔记(1)
1.为什么使用webpack? webpack不仅可以打包js代码,并且那个且支持es模块化和commonjs,支持其他静态资源打包,如图片、字体。。。 2.如何解决作用域问题? 作用域问题:例如loadsh等库,会绑定window对象,会…...

使用xpath规则进行提取数据并存储
下载lxml !pip install lxmlimport requests headers{"user-agent":"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.6261.95 Safari/537.36" } url"https://movie.douban.com/chart" respon…...

【物联网技术与应用】实验3:七彩LED灯闪烁
实验3 七彩LED灯闪烁 【实验介绍】 七彩LED灯上电后,7色动闪光LED模块可自动闪烁内置颜色。它可以用来制作相当吸引人的灯光效果。 【实验组件】 ● Arduino Uno主板* 1 ● USB数据线* 1 ● 7彩LED模块*1 ● 面包板*1 ● 9V方型电池*1 ● 跳线若干 【实验原…...

素数回文数的个数
素数回文数的个数 C语言代码C 代码Java代码Python代码 💐The Begin💐点点关注,收藏不迷路💐 求11到n之间(包括n),既是素数又是回文数的整数有多少个。 输入 一个大于11小于1000的整数n。 输出…...
车辆重识别代码笔记12.18
1、实例归一化(Instance Normalization)和批量归一化(Batch Normalization) 实例归一化(Instance Normalization): 计算步骤: 对于每个输入样本,在每个通道上分别计算均…...

selenium 在已打开浏览器上继续调试
关闭浏览器,终端执行如下指令,--user-data-dir换成自己的User Data路径 chrome.exe --remote-debugging-port9222 --user-data-dir"C:\Users\xxx\AppData\Local\Google\Chrome\User Data" 会打开浏览器,打开百度,如下状…...

Sentry日志管理thinkphp8 tp8 sentry9 sentry8 php8.x配置步骤, tp8自定义异常处理类使用方法
tp8的默认使用的就是composer来管理第三方包, 所以直接使用 composer 来安装 sentry9 即可. 同时tp8和tp5的配置方式不太一样, 这里我们直接使用自定义异常类来处理Sentry的异常. 1. 安装 sentry9 包 # 安装 sentry9 包 composer require "tekintian/sentry9-php" …...

【经验分享】容器云搭建的知识点
最近忙于备考没关注,有次点进某小黄鱼发现首页出现了我的笔记还被人收费了 虽然我也卖了一些资源,但我以交流、交换为主,笔记都是免费给别人看的 由于当时刚刚接触写的并不成熟,为了避免更多人花没必要的钱,所以决定公…...
Java对集合的操作方法
1. 数组转集合 //数组转集合 String[] split quickRechargeAmount.split(","); List<String> stringList Stream.of(split).collect(Collectors.toList()); 2. 对List集合数据内容进行分组 //对List集合数据内容进行分组 Map<String, List<LiveAppGi…...

FreeRTOS--基础知识
FreeRTOS基础知识 裸机与RTOS的特点: 裸机: 裸机又称为前后台系统,前台系统指的是中断服务函数,后台系统指的是大循环,即应用程序。 1、实时性差:应用程序轮流执行 2、delay:空等待ÿ…...

Node的学习以及学习通过Node书写接口并简单操作数据库
Node的学习 Node的基础上述是关于Node的一些基础,总结的还行; 利用Node书写接口并操作数据库 1. 初始化项目 创建新的项目文件夹,并初始化 package.json mkdir my-backend cd my-backend npm init -y2. 安装必要的依赖 安装Express.js&…...

【Linux探索学习】第二十二弹——用户缓冲区:深入解析操作系统中数据交互时的缓冲区机制
Linux学习笔记: https://blog.csdn.net/2301_80220607/category_12805278.html?spm1001.2014.3001.5482 前言: 前面两章我们已经讲了一些文件操作和文件重定向问题,以及一些相关的知识点,比如文件在内存中的存储位置࿰…...

Cesium-(Primitive)-(CylinderOutlineGeometry)
CylinderOutlineGeometry 以下是 CylinderOutlineGeometry 类的构造函数属性,以表格形式展示: 属性名类型默认值描述lengthnumber圆柱体的长度。topRadiusnumber圆柱体顶部的半径。bottomRadiusnumber圆柱体底部的半径。slicesnumber128可选,圆柱体周长的边数。numberOfVert…...
【ETCD】【源码阅读】深入分析 storeTxnWrite.Put方法源码
该方法是 storeTxnWrite 类型中的核心方法,负责将键值对存储到数据库,同时处理键的元数据(如版本、修订号、租约)并管理租约关联。 目录 一、完整代码二、方法详解方法签名1. 计算修订号并初始化变量2. 检查键是否已存在3. 生成索…...

Docker 离线安装指南
参考文章 1、确认操作系统类型及内核版本 Docker依赖于Linux内核的一些特性,不同版本的Docker对内核版本有不同要求。例如,Docker 17.06及之后的版本通常需要Linux内核3.10及以上版本,Docker17.09及更高版本对应Linux内核4.9.x及更高版本。…...

【JavaEE】-- HTTP
1. HTTP是什么? HTTP(全称为"超文本传输协议")是一种应用非常广泛的应用层协议,HTTP是基于TCP协议的一种应用层协议。 应用层协议:是计算机网络协议栈中最高层的协议,它定义了运行在不同主机上…...

Day131 | 灵神 | 回溯算法 | 子集型 子集
Day131 | 灵神 | 回溯算法 | 子集型 子集 78.子集 78. 子集 - 力扣(LeetCode) 思路: 笔者写过很多次这道题了,不想写题解了,大家看灵神讲解吧 回溯算法套路①子集型回溯【基础算法精讲 14】_哔哩哔哩_bilibili 完…...
Go 语言接口详解
Go 语言接口详解 核心概念 接口定义 在 Go 语言中,接口是一种抽象类型,它定义了一组方法的集合: // 定义接口 type Shape interface {Area() float64Perimeter() float64 } 接口实现 Go 接口的实现是隐式的: // 矩形结构体…...
VTK如何让部分单位不可见
最近遇到一个需求,需要让一个vtkDataSet中的部分单元不可见,查阅了一些资料大概有以下几种方式 1.通过颜色映射表来进行,是最正规的做法 vtkNew<vtkLookupTable> lut; //值为0不显示,主要是最后一个参数,透明度…...
LLM基础1_语言模型如何处理文本
基于GitHub项目:https://github.com/datawhalechina/llms-from-scratch-cn 工具介绍 tiktoken:OpenAI开发的专业"分词器" torch:Facebook开发的强力计算引擎,相当于超级计算器 理解词嵌入:给词语画"…...

Spring数据访问模块设计
前面我们已经完成了IoC和web模块的设计,聪明的码友立马就知道了,该到数据访问模块了,要不就这俩玩个6啊,查库势在必行,至此,它来了。 一、核心设计理念 1、痛点在哪 应用离不开数据(数据库、No…...
CMake控制VS2022项目文件分组
我们可以通过 CMake 控制源文件的组织结构,使它们在 VS 解决方案资源管理器中以“组”(Filter)的形式进行分类展示。 🎯 目标 通过 CMake 脚本将 .cpp、.h 等源文件分组显示在 Visual Studio 2022 的解决方案资源管理器中。 ✅ 支持的方法汇总(共4种) 方法描述是否推荐…...

有限自动机到正规文法转换器v1.0
1 项目简介 这是一个功能强大的有限自动机(Finite Automaton, FA)到正规文法(Regular Grammar)转换器,它配备了一个直观且完整的图形用户界面,使用户能够轻松地进行操作和观察。该程序基于编译原理中的经典…...
大语言模型(LLM)中的KV缓存压缩与动态稀疏注意力机制设计
随着大语言模型(LLM)参数规模的增长,推理阶段的内存占用和计算复杂度成为核心挑战。传统注意力机制的计算复杂度随序列长度呈二次方增长,而KV缓存的内存消耗可能高达数十GB(例如Llama2-7B处理100K token时需50GB内存&a…...