【ETCD】【源码阅读】深入解析 EtcdServer.applySnapshot方法
今天我们来一步步分析ETCD中applySnapshot函数
一、函数完整代码
函数的完整代码如下:
func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {if raft.IsEmptySnap(apply.snapshot) {return}applySnapshotInProgress.Inc()lg := s.Logger()lg.Info("applying snapshot",zap.Uint64("current-snapshot-index", ep.snapi),zap.Uint64("current-applied-index", ep.appliedi),zap.Uint64("incoming-leader-snapshot-index", apply.snapshot.Metadata.Index),zap.Uint64("incoming-leader-snapshot-term", apply.snapshot.Metadata.Term),)defer func() {lg.Info("applied snapshot",zap.Uint64("current-snapshot-index", ep.snapi),zap.Uint64("current-applied-index", ep.appliedi),zap.Uint64("incoming-leader-snapshot-index", apply.snapshot.Metadata.Index),zap.Uint64("incoming-leader-snapshot-term", apply.snapshot.Metadata.Term),)applySnapshotInProgress.Dec()}()if apply.snapshot.Metadata.Index <= ep.appliedi {lg.Panic("unexpected leader snapshot from outdated index",zap.Uint64("current-snapshot-index", ep.snapi),zap.Uint64("current-applied-index", ep.appliedi),zap.Uint64("incoming-leader-snapshot-index", apply.snapshot.Metadata.Index),zap.Uint64("incoming-leader-snapshot-term", apply.snapshot.Metadata.Term),)}// wait for raftNode to persist snapshot onto the disk<-apply.notifycnewbe, err := openSnapshotBackend(s.Cfg, s.snapshotter, apply.snapshot, s.beHooks)if err != nil {lg.Panic("failed to open snapshot backend", zap.Error(err))}// We need to set the backend to consistIndex before recovering the lessor,// because lessor.Recover will commit the boltDB transaction, accordingly it// will get the old consistent_index persisted into the db in OnPreCommitUnsafe.// Eventually the new consistent_index value coming from snapshot is overwritten// by the old value.s.consistIndex.SetBackend(newbe)// always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases.// If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers.if s.lessor != nil {lg.Info("restoring lease store")s.lessor.Recover(newbe, func() lease.TxnDelete { return s.kv.Write(traceutil.TODO()) })lg.Info("restored lease store")}lg.Info("restoring mvcc store")if err := s.kv.Restore(newbe); err != nil {lg.Panic("failed to restore mvcc store", zap.Error(err))}newbe.SetTxPostLockInsideApplyHook(s.getTxPostLockInsideApplyHook())lg.Info("restored mvcc store", zap.Uint64("consistent-index", s.consistIndex.ConsistentIndex()))// Closing old backend might block until all the txns// on the backend are finished.// We do not want to wait on closing the old backend.s.bemu.Lock()oldbe := s.bego func() {lg.Info("closing old backend file")defer func() {lg.Info("closed old backend file")}()if err := oldbe.Close(); err != nil {lg.Panic("failed to close old backend", zap.Error(err))}}()s.be = newbes.bemu.Unlock()lg.Info("restoring alarm store")if err := s.restoreAlarms(); err != nil {lg.Panic("failed to restore alarm store", zap.Error(err))}lg.Info("restored alarm store")if s.authStore != nil {lg.Info("restoring auth store")s.authStore.Recover(newbe)lg.Info("restored auth store")}lg.Info("restoring v2 store")if err := s.v2store.Recovery(apply.snapshot.Data); err != nil {lg.Panic("failed to restore v2 store", zap.Error(err))}if err := assertNoV2StoreContent(lg, s.v2store, s.Cfg.V2Deprecation); err != nil {lg.Panic("illegal v2store content", zap.Error(err))}lg.Info("restored v2 store")s.cluster.SetBackend(newbe)lg.Info("restoring cluster configuration")s.cluster.Recover(api.UpdateCapability)lg.Info("restored cluster configuration")lg.Info("removing old peers from network")// recover raft transports.r.transport.RemoveAllPeers()lg.Info("removed old peers from network")lg.Info("adding peers from new cluster configuration")for _, m := range s.cluster.Members() {if m.ID == s.ID() {continue}s.r.transport.AddPeer(m.ID, m.PeerURLs)}lg.Info("added peers from new cluster configuration")ep.appliedt = apply.snapshot.Metadata.Termep.appliedi = apply.snapshot.Metadata.Indexep.snapi = ep.appliediep.confState = apply.snapshot.Metadata.ConfState
}
二、函数功能概览
上述函数的核心功能是 应用一个来自 Raft 协议的快照,并在应用过程中恢复系统的各个关键组件,以确保系统的状态与最新的快照一致。具体来说,函数完成了以下核心任务:
-
检查快照有效性:判断快照是否为空或过时,如果无效则提前退出。
-
记录快照应用的开始和结束:通过日志记录快照应用的开始和结束,同时更新相关的监控指标。
-
等待并加载快照数据:等待 Raft 节点将快照持久化到磁盘,并打开快照后端。
-
恢复一致性索引:将新的快照后端设置为一致性索引的后端,确保一致性。
-
恢复存储组件:
- 租约存储(lease store):恢复与租约相关的数据。
- MVCC 存储:恢复多版本并发控制(MVCC)存储。
- 报警存储:恢复报警数据。
- 认证存储:恢复认证相关数据(如果存在)。
- V2 存储:恢复 V2 存储,并进行合法性检查。
-
恢复集群配置:更新集群配置,并确保集群的一致性。
-
更新 Raft 网络成员:移除旧的集群成员并添加新的集群成员到网络中。
-
更新应用进度:更新快照的任期、索引等应用进度信息。
三、函数详细分析
好的,接下来我将逐步解析这段代码,并用中文进行解释。
1. 检查快照是否为空
if raft.IsEmptySnap(apply.snapshot) {return
}
- 这段代码判断传入的快照是否为空。如果快照为空(即没有数据需要恢复),则直接返回,结束该函数的执行。
2. 增加快照应用中的度量
applySnapshotInProgress.Inc()
- 这行代码会将
applySnapshotInProgress计数器增加 1,表示当前有一个快照正在被应用。这个计数器通常用于监控系统中,帮助跟踪正在进行的操作。
3. 日志记录快照应用的开始
lg := s.Logger()
lg.Info("applying snapshot",zap.Uint64("current-snapshot-index", ep.snapi),zap.Uint64("current-applied-index", ep.appliedi),zap.Uint64("incoming-leader-snapshot-index", apply.snapshot.Metadata.Index),zap.Uint64("incoming-leader-snapshot-term", apply.snapshot.Metadata.Term),
)
- 获取日志记录器
lg,并记录一条信息级别的日志,说明快照正在被应用。 - 日志中包括当前的快照索引、已应用的索引以及来自领导者的快照的索引和任期(
term)。
4. 使用 defer 确保快照应用完成后记录日志
defer func() {lg.Info("applied snapshot",zap.Uint64("current-snapshot-index", ep.snapi),zap.Uint64("current-applied-index", ep.appliedi),zap.Uint64("incoming-leader-snapshot-index", apply.snapshot.Metadata.Index),zap.Uint64("incoming-leader-snapshot-term", apply.snapshot.Metadata.Term),)applySnapshotInProgress.Dec()
}()
- 使用
defer语句确保在函数结束时,记录一条日志表示快照应用已经完成。 - 同时,减少
applySnapshotInProgress计数器,表示快照应用过程结束。
5. 检查快照的索引是否过时
if apply.snapshot.Metadata.Index <= ep.appliedi {lg.Panic("unexpected leader snapshot from outdated index",zap.Uint64("current-snapshot-index", ep.snapi),zap.Uint64("current-applied-index", ep.appliedi),zap.Uint64("incoming-leader-snapshot-index", apply.snapshot.Metadata.Index),zap.Uint64("incoming-leader-snapshot-term", apply.snapshot.Metadata.Term),)
}
- 这里会检查传入的快照索引是否小于等于已应用的索引
ep.appliedi。如果是,说明接收到的快照来自一个过时的领导者,这会导致系统崩溃(通过lg.Panic打印错误日志并触发 panic)。
6. 等待 Raft 节点将快照持久化到磁盘
<-apply.notifyc
- 等待一个信号,确保 Raft 节点已将快照持久化到磁盘。
apply.notifyc是一个通道,程序会在此处阻塞,直到 Raft 节点完成快照的持久化操作。
7. 打开新的快照后端
newbe, err := openSnapshotBackend(s.Cfg, s.snapshotter, apply.snapshot, s.beHooks)
if err != nil {lg.Panic("failed to open snapshot backend", zap.Error(err))
}
- 这行代码通过
openSnapshotBackend函数打开新的快照后端(即读取快照的存储),如果打开失败,则记录错误日志并触发 panic。
8. 设置新的后端为一致性索引
s.consistIndex.SetBackend(newbe)
- 将新的快照后端设置为一致性索引的后端。这是为了确保一致性索引能够正确地与新的快照数据同步。
9. 恢复租约存储(lease store)
if s.lessor != nil {lg.Info("restoring lease store")s.lessor.Recover(newbe, func() lease.TxnDelete { return s.kv.Write(traceutil.TODO()) })lg.Info("restored lease store")
}
- 如果系统中有
lessor(负责管理租约的组件),则会从新快照后端恢复租约存储。 - 恢复过程中,会在事务回滚时写入 KV 存储。
10. 恢复 MVCC 存储
lg.Info("restoring mvcc store")
if err := s.kv.Restore(newbe); err != nil {lg.Panic("failed to restore mvcc store", zap.Error(err))
}
lg.Info("restored mvcc store", zap.Uint64("consistent-index", s.consistIndex.ConsistentIndex()))
- 接下来,恢复 MVCC(多版本并发控制)存储,如果恢复失败,则触发 panic。
- 恢复成功后,记录恢复的日志,包括一致性索引。
11. 关闭旧的后端
s.bemu.Lock()
oldbe := s.be
go func() {lg.Info("closing old backend file")defer func() {lg.Info("closed old backend file")}()if err := oldbe.Close(); err != nil {lg.Panic("failed to close old backend", zap.Error(err))}
}()
s.be = newbe
s.bemu.Unlock()
- 使用锁 (
bemu.Lock) 来确保线程安全地切换后端文件。 - 在一个新的 goroutine 中关闭旧的快照后端,防止在关闭过程中阻塞主线程。
- 更新
s.be为新的快照后端,并解锁。
12. 恢复报警存储
lg.Info("restoring alarm store")
if err := s.restoreAlarms(); err != nil {lg.Panic("failed to restore alarm store", zap.Error(err))
}
lg.Info("restored alarm store")
- 恢复报警存储,如果恢复失败,则触发 panic。
13. 恢复认证存储
if s.authStore != nil {lg.Info("restoring auth store")s.authStore.Recover(newbe)lg.Info("restored auth store")
}
- 如果存在认证存储(
authStore),则恢复认证存储。
14. 恢复 V2 存储
lg.Info("restoring v2 store")
if err := s.v2store.Recovery(apply.snapshot.Data); err != nil {lg.Panic("failed to restore v2 store", zap.Error(err))
}
if err := assertNoV2StoreContent(lg, s.v2store, s.Cfg.V2Deprecation); err != nil {lg.Panic("illegal v2store content", zap.Error(err))
}
lg.Info("restored v2 store")
- 恢复 V2 存储,并进行检查,确保没有非法的 V2 存储内容。
15. 恢复集群配置
s.cluster.SetBackend(newbe)
lg.Info("restoring cluster configuration")
s.cluster.Recover(api.UpdateCapability)
lg.Info("restored cluster configuration")
- 将集群配置恢复到新的快照后端,并恢复集群配置。
16. 移除旧的网络成员
lg.Info("removing old peers from network")
s.r.transport.RemoveAllPeers()
lg.Info("removed old peers from network")
- 从网络中移除旧的集群成员。
17. 添加新的集群成员
lg.Info("adding peers from new cluster configuration")
for _, m := range s.cluster.Members() {if m.ID == s.ID() {continue}s.r.transport.AddPeer(m.ID, m.PeerURLs)
}
lg.Info("added peers from new cluster configuration")
- 将新的集群成员添加到网络中。
18. 更新应用进度
ep.appliedt = apply.snapshot.Metadata.Term
ep.appliedi = apply.snapshot.Metadata.Index
ep.snapi = ep.appliedi
ep.confState = apply.snapshot.Metadata.ConfState
- 更新应用进度(包括任期、索引等信息),确保与新的快照数据一致。
总结:这段代码的核心任务是应用一个来自 Raft 协议的快照。它通过多个步骤确保快照数据被正确地恢复到系统的各个存储组件中,同时进行了一系列的检查和恢复操作,确保系统的一致性和健康。
相关文章:
【ETCD】【源码阅读】深入解析 EtcdServer.applySnapshot方法
今天我们来一步步分析ETCD中applySnapshot函数 一、函数完整代码 函数的完整代码如下: func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {if raft.IsEmptySnap(apply.snapshot) {return}applySnapshotInProgress.Inc()lg : s.Logger()lg.In…...
HBase是什么,HBase介绍
官方网站:Apache HBase – Apache HBase Home HBase是一个分布式的、面向列的NoSQL数据库,主要用于存储和处理海量数据。它起源于Google的BigTable论文,是Apache Hadoop项目的子项目。HBase设计用于高可靠性、高性能和可伸…...
【Rust自学】3.3. 数据类型:复合类型
3.3.0. 写在正文之前 欢迎来到Rust自学的第三章,一共有6个小节,分别是: 变量与可变性数据类型:标量类型数据类型:复合类型(本文)函数和注释控制流:if else控制流:循环 通过第二章…...
【C++】小乐乐求和问题的高效求解与算法对比分析
博客主页: [小ᶻ☡꙳ᵃⁱᵍᶜ꙳] 本文专栏: C 文章目录 💯前言💯问题描述与数学模型1.1 题目概述1.2 输入输出要求1.3 数学建模 💯方法一:朴素循环求和法2.1 实现原理2.2 分析与问题2.3 改进方案2.4 性能瓶颈与结论…...
configure错误:“C compiler cannot create executables“
执行./configure命令出现如下奇怪的错误,百思不得姐: ./configure命令的日志文件为config.log,发生错误时,该文件的内容: This file contains any messages produced by compilers while running configure, to aid d…...
PAT乙级 锤子剪刀布 巩固巩固map的使用
主要是想借这题巩固巩固c map的使用方法。 大家应该都会玩“锤子剪刀布”的游戏:两人同时给出手势,胜负规则如图所示: 现给出两人的交锋记录,请统计双方的胜、平、负次数,并且给出双方分别出什么手势的胜算最大。 输…...
Webpack学习笔记(1)
1.为什么使用webpack? webpack不仅可以打包js代码,并且那个且支持es模块化和commonjs,支持其他静态资源打包,如图片、字体。。。 2.如何解决作用域问题? 作用域问题:例如loadsh等库,会绑定window对象,会…...
使用xpath规则进行提取数据并存储
下载lxml !pip install lxmlimport requests headers{"user-agent":"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.6261.95 Safari/537.36" } url"https://movie.douban.com/chart" respon…...
【物联网技术与应用】实验3:七彩LED灯闪烁
实验3 七彩LED灯闪烁 【实验介绍】 七彩LED灯上电后,7色动闪光LED模块可自动闪烁内置颜色。它可以用来制作相当吸引人的灯光效果。 【实验组件】 ● Arduino Uno主板* 1 ● USB数据线* 1 ● 7彩LED模块*1 ● 面包板*1 ● 9V方型电池*1 ● 跳线若干 【实验原…...
素数回文数的个数
素数回文数的个数 C语言代码C 代码Java代码Python代码 💐The Begin💐点点关注,收藏不迷路💐 求11到n之间(包括n),既是素数又是回文数的整数有多少个。 输入 一个大于11小于1000的整数n。 输出…...
车辆重识别代码笔记12.18
1、实例归一化(Instance Normalization)和批量归一化(Batch Normalization) 实例归一化(Instance Normalization): 计算步骤: 对于每个输入样本,在每个通道上分别计算均…...
selenium 在已打开浏览器上继续调试
关闭浏览器,终端执行如下指令,--user-data-dir换成自己的User Data路径 chrome.exe --remote-debugging-port9222 --user-data-dir"C:\Users\xxx\AppData\Local\Google\Chrome\User Data" 会打开浏览器,打开百度,如下状…...
Sentry日志管理thinkphp8 tp8 sentry9 sentry8 php8.x配置步骤, tp8自定义异常处理类使用方法
tp8的默认使用的就是composer来管理第三方包, 所以直接使用 composer 来安装 sentry9 即可. 同时tp8和tp5的配置方式不太一样, 这里我们直接使用自定义异常类来处理Sentry的异常. 1. 安装 sentry9 包 # 安装 sentry9 包 composer require "tekintian/sentry9-php" …...
【经验分享】容器云搭建的知识点
最近忙于备考没关注,有次点进某小黄鱼发现首页出现了我的笔记还被人收费了 虽然我也卖了一些资源,但我以交流、交换为主,笔记都是免费给别人看的 由于当时刚刚接触写的并不成熟,为了避免更多人花没必要的钱,所以决定公…...
Java对集合的操作方法
1. 数组转集合 //数组转集合 String[] split quickRechargeAmount.split(","); List<String> stringList Stream.of(split).collect(Collectors.toList()); 2. 对List集合数据内容进行分组 //对List集合数据内容进行分组 Map<String, List<LiveAppGi…...
FreeRTOS--基础知识
FreeRTOS基础知识 裸机与RTOS的特点: 裸机: 裸机又称为前后台系统,前台系统指的是中断服务函数,后台系统指的是大循环,即应用程序。 1、实时性差:应用程序轮流执行 2、delay:空等待ÿ…...
Node的学习以及学习通过Node书写接口并简单操作数据库
Node的学习 Node的基础上述是关于Node的一些基础,总结的还行; 利用Node书写接口并操作数据库 1. 初始化项目 创建新的项目文件夹,并初始化 package.json mkdir my-backend cd my-backend npm init -y2. 安装必要的依赖 安装Express.js&…...
【Linux探索学习】第二十二弹——用户缓冲区:深入解析操作系统中数据交互时的缓冲区机制
Linux学习笔记: https://blog.csdn.net/2301_80220607/category_12805278.html?spm1001.2014.3001.5482 前言: 前面两章我们已经讲了一些文件操作和文件重定向问题,以及一些相关的知识点,比如文件在内存中的存储位置࿰…...
Cesium-(Primitive)-(CylinderOutlineGeometry)
CylinderOutlineGeometry 以下是 CylinderOutlineGeometry 类的构造函数属性,以表格形式展示: 属性名类型默认值描述lengthnumber圆柱体的长度。topRadiusnumber圆柱体顶部的半径。bottomRadiusnumber圆柱体底部的半径。slicesnumber128可选,圆柱体周长的边数。numberOfVert…...
【ETCD】【源码阅读】深入分析 storeTxnWrite.Put方法源码
该方法是 storeTxnWrite 类型中的核心方法,负责将键值对存储到数据库,同时处理键的元数据(如版本、修订号、租约)并管理租约关联。 目录 一、完整代码二、方法详解方法签名1. 计算修订号并初始化变量2. 检查键是否已存在3. 生成索…...
多场景 OkHttpClient 管理器 - Android 网络通信解决方案
下面是一个完整的 Android 实现,展示如何创建和管理多个 OkHttpClient 实例,分别用于长连接、普通 HTTP 请求和文件下载场景。 <?xml version"1.0" encoding"utf-8"?> <LinearLayout xmlns:android"http://schemas…...
css3笔记 (1) 自用
outline: none 用于移除元素获得焦点时默认的轮廓线 broder:0 用于移除边框 font-size:0 用于设置字体不显示 list-style: none 消除<li> 标签默认样式 margin: xx auto 版心居中 width:100% 通栏 vertical-align 作用于行内元素 / 表格单元格ÿ…...
学习STC51单片机32(芯片为STC89C52RCRC)OLED显示屏2
每日一言 今天的每一份坚持,都是在为未来积攒底气。 案例:OLED显示一个A 这边观察到一个点,怎么雪花了就是都是乱七八糟的占满了屏幕。。 解释 : 如果代码里信号切换太快(比如 SDA 刚变,SCL 立刻变&#…...
Redis的发布订阅模式与专业的 MQ(如 Kafka, RabbitMQ)相比,优缺点是什么?适用于哪些场景?
Redis 的发布订阅(Pub/Sub)模式与专业的 MQ(Message Queue)如 Kafka、RabbitMQ 进行比较,核心的权衡点在于:简单与速度 vs. 可靠与功能。 下面我们详细展开对比。 Redis Pub/Sub 的核心特点 它是一个发后…...
管理学院权限管理系统开发总结
文章目录 🎓 管理学院权限管理系统开发总结 - 现代化Web应用实践之路📝 项目概述🏗️ 技术架构设计后端技术栈前端技术栈 💡 核心功能特性1. 用户管理模块2. 权限管理系统3. 统计报表功能4. 用户体验优化 🗄️ 数据库设…...
回溯算法学习
一、电话号码的字母组合 import java.util.ArrayList; import java.util.List;import javax.management.loading.PrivateClassLoader;public class letterCombinations {private static final String[] KEYPAD {"", //0"", //1"abc", //2"…...
面向无人机海岸带生态系统监测的语义分割基准数据集
描述:海岸带生态系统的监测是维护生态平衡和可持续发展的重要任务。语义分割技术在遥感影像中的应用为海岸带生态系统的精准监测提供了有效手段。然而,目前该领域仍面临一个挑战,即缺乏公开的专门面向海岸带生态系统的语义分割基准数据集。受…...
免费PDF转图片工具
免费PDF转图片工具 一款简单易用的PDF转图片工具,可以将PDF文件快速转换为高质量PNG图片。无需安装复杂的软件,也不需要在线上传文件,保护您的隐私。 工具截图 主要特点 🚀 快速转换:本地转换,无需等待上…...
BLEU评分:机器翻译质量评估的黄金标准
BLEU评分:机器翻译质量评估的黄金标准 1. 引言 在自然语言处理(NLP)领域,衡量一个机器翻译模型的性能至关重要。BLEU (Bilingual Evaluation Understudy) 作为一种自动化评估指标,自2002年由IBM的Kishore Papineni等人提出以来,…...
在 Spring Boot 项目里,MYSQL中json类型字段使用
前言: 因为程序特殊需求导致,需要mysql数据库存储json类型数据,因此记录一下使用流程 1.java实体中新增字段 private List<User> users 2.增加mybatis-plus注解 TableField(typeHandler FastjsonTypeHandler.class) private Lis…...
