etcd选举源码分析和例子
本文主要介绍etcd在分布式多节点服务中如何实现选主。
1、基础知识
在开始之前,先介绍etcd中 Version, Revision, ModRevision, CreateRevision 几个基本概念。
1、version
作用域为key,表示某个key的版本,每个key刚创建的version为1,每次更新key这个值都会自增,表示这个key自创建以来更新的次数。
2、revision
作用域为集群,单调递增,集群内任何key的增删改都会使它自增。可以把它理解为集群的一个逻辑状态标志。记录了每一次集群内的增删改操作。
3、ModRevision
作用域为key,表示某个key修改时的版本,它等于修改这个key时的revision的值。
4、CreateRevision
作用域为key,表示某个key创建时的版本,等于创建这个key时revision的值,再删除之前会保持不变。
现在在etcd里面放一个key:

可以看到,Version是1,CreateRevision与Modrevision和Revision相等,都是7677720。
在修改了key的值以后:

version自增变为2,CreateRevision没有变动。modRevision等于修改时的Revsion。Revision已经跑到前面去了,因为此时还有其他的程序在修改etcd里面的key。
2、选举
先初始化一个session和选举:
electionKey := "/my-election"// Create an election sessionsession, err := concurrency.NewSession(client, concurrency.WithTTL(10))if err != nil {log.Fatal(err)}defer session.Close()election := concurrency.NewElection(session, electionKey)
在启动了三台服务后,在etcd里面找到以下三个key:

现在的leader是第一台:

去看下compaign的源码:
func (e *Election) Campaign(ctx context.Context, val string) error {s := e.sessionclient := e.session.Client()//用leaseID与前面的key前缀拼成keyk := fmt.Sprintf("%s%x", e.keyPrefix, s.Lease())//判断当前key的createRevision是否是0,也就是否创建txn := client.Txn(ctx).If(v3.Compare(v3.CreateRevision(k), "=", 0))txn = txn.Then(v3.OpPut(k, val, v3.WithLease(s.Lease())))txn = txn.Else(v3.OpGet(k))resp, err := txn.Commit()if err != nil {return err}//获取key的Revisione.leaderKey, e.leaderRev, e.leaderSession = k, resp.Header.Revision, sif !resp.Succeeded {kv := resp.Responses[0].GetResponseRange().Kvs[0]e.leaderRev = kv.CreateRevisionif string(kv.Value) != val {if err = e.Proclaim(ctx, val); err != nil {e.Resign(ctx)return err}}}_, err = waitDeletes(ctx, client, e.keyPrefix, e.leaderRev-1)if err != nil {// clean up in case of context cancelselect {case <-ctx.Done():e.Resign(client.Ctx())default:e.leaderSession = nil}return err}e.hdr = resp.Headerreturn nil
}
用当前key和leaseid在etcd中创建一个key,并获取到key此时的Revision。
// waitDeletes efficiently waits until all keys matching the prefix and no greater
// than the create revision.
func waitDeletes(ctx context.Context, client *v3.Client, pfx string, maxCreateRev int64) (*pb.ResponseHeader, error) {getOpts := append(v3.WithLastCreate(), v3.WithMaxCreateRev(maxCreateRev))for {resp, err := client.Get(ctx, pfx, getOpts...)if err != nil {return nil, err}if len(resp.Kvs) == 0 {return resp.Header, nil}lastKey := string(resp.Kvs[0].Key)if err = waitDelete(ctx, client, lastKey, resp.Header.Revision); err != nil {return nil, err}}
}
func waitDelete(ctx context.Context, client *v3.Client, key string, rev int64) error {cctx, cancel := context.WithCancel(ctx)defer cancel()var wr v3.WatchResponsewch := client.Watch(cctx, key, v3.WithRev(rev))for wr = range wch {for _, ev := range wr.Events {if ev.Type == mvccpb.DELETE {return nil}}}if err := wr.Err(); err != nil {return err}if err := ctx.Err(); err != nil {return err}return fmt.Errorf("lost watcher waiting for delete")
}
这里get的option,找到/my-ection前缀下最新创建的key,并且createRev的值小于等于当前key创建的createRev-1.
// WithLastCreate gets the key with the latest creation revision in the request range.
func WithLastCreate() []OpOption { return withTop(SortByCreateRevision, SortDescend) }
// WithMaxCreateRev filters out keys for Get with creation revisions greater than the given revision.
func WithMaxCreateRev(rev int64) OpOption { return func(op *Op) { op.maxCreateRev = rev } }
此时/my-election,在这个revision之前没有任何key,所以node2启动直接竞选到了主。
紧接着node3启动,在这个get,它能获取到node2创建的key,所以node3去watchnode2的的删除事件。
同理,node4启动以后,在这里的get,它获取的是node3的key,它去wachnode3的删除事件。
当node2释放后,node3获取到node2的删除事件变成主。node3释放以后,node4变成watch到node3
的删除事件变成主。



3、观察者
在上面的例子中,如果node4挂了以后,node2能继续变回为leader吗?
在引入observe以后,可以做到。

找到/my-election这个前缀下最开始创建的key,也就是node2。
然后把这个key放入返回ch中。
func (e *Election) observe(ctx context.Context, ch chan<- v3.GetResponse) {client := e.session.Client()defer close(ch)for {resp, err := client.Get(ctx, e.keyPrefix, v3.WithFirstCreate()...)if err != nil {return}var kv *mvccpb.KeyValuevar hdr *pb.ResponseHeaderif len(resp.Kvs) == 0 {cctx, cancel := context.WithCancel(ctx)// wait for first key put on prefixopts := []v3.OpOption{v3.WithRev(resp.Header.Revision), v3.WithPrefix()}wch := client.Watch(cctx, e.keyPrefix, opts...)for kv == nil {wr, ok := <-wchif !ok || wr.Err() != nil {cancel()return}// only accept puts; a delete will make observe() spinfor _, ev := range wr.Events {if ev.Type == mvccpb.PUT {hdr, kv = &wr.Header, ev.Kv// may have multiple revs; hdr.rev = the last rev// set to kv's rev in case batch has multiple Putshdr.Revision = kv.ModRevisionbreak}}}cancel()} else {hdr, kv = resp.Header, resp.Kvs[0]}select {case ch <- v3.GetResponse{Header: hdr, Kvs: []*mvccpb.KeyValue{kv}}:case <-ctx.Done():return}cctx, cancel := context.WithCancel(ctx)wch := client.Watch(cctx, string(kv.Key), v3.WithRev(hdr.Revision+1))keyDeleted := falsefor !keyDeleted {wr, ok := <-wchif !ok {cancel()return}for _, ev := range wr.Events {if ev.Type == mvccpb.DELETE {keyDeleted = truebreak}resp.Header = &wr.Headerresp.Kvs = []*mvccpb.KeyValue{ev.Kv}select {case ch <- *resp:case <-cctx.Done():cancel()return}}}cancel()}
}
wch := client.Watch(cctx, string(kv.Key), v3.WithRev(hdr.Revision+1))
接下里watch/my-election下第一个创建的key,在当前revision以后的delete事件。如果有delete事件则把这个消息通知出去。也就是发现了当前主的节点发生了delete事件,主发生了变化。

同理,当node2 down掉,它的key被删除,node3变成了主,observe此时watch的就是node3的key,因为此时node3创建的key是/my-election下的第一个key。
完整代码:
package mainimport ("context""fmt""log""os""time"clientv3 "go.etcd.io/etcd/client/v3""go.etcd.io/etcd/client/v3/concurrency"
)const (FOLLOWER = "follower"LEADER = "leader"
)var state string = FOLLOWER
var preState string = FOLLOWERfunc compaign(election *concurrency.Election, val string) {// Campaign for leadershipfmt.Println("start compaign!")if err := election.Campaign(context.Background(), val); err != nil {log.Fatal(err)}fmt.Println("Became leader!")preState = statestate = LEADER// Hold leadership until a key pressfmt.Println("Press Enter to release leadership...")fmt.Scanln()// Resign leadershipif err := election.Resign(context.Background()); err != nil {log.Fatal(err)}//preState = state//state = FOLLOWERfmt.Println("Released leadership.")
}func observe(election *concurrency.Election, val string) {ch := election.Observe(context.Background())for {select {case rsp, ok := <-ch:if !ok {fmt.Println("now I am follower")//election.Campaign(context.Background(), args[1])//重新开始观察go observe(election, val)return} else {fmt.Printf("leader now is:%s\n", string(rsp.Kvs[0].Value))if string(rsp.Kvs[0].Value) == val {fmt.Println("still be leader")preState = statestate = LEADER} else {fmt.Println("now become follower")preState = statestate = FOLLOWERif preState == LEADER {go compaign(election, val)}}}}}
}func main() {// Connect to etcdargs := os.Argsclient, err := clientv3.New(clientv3.Config{Endpoints: []string{"ipdizhi"}, // Replace with your etcd endpointsDialTimeout: 5 * time.Second,Username: "user",Password: "password",})if err != nil {log.Fatal(err)}defer client.Close()// Key for leader electionelectionKey := "/my-election"// Create an election sessionsession, err := concurrency.NewSession(client, concurrency.WithTTL(10))if err != nil {log.Fatal(err)}defer session.Close()election := concurrency.NewElection(session, electionKey)go compaign(election, args[1])go observe(election, args[1])for {}
}相关文章:
etcd选举源码分析和例子
本文主要介绍etcd在分布式多节点服务中如何实现选主。 1、基础知识 在开始之前,先介绍etcd中 Version, Revision, ModRevision, CreateRevision 几个基本概念。 1、version 作用域为key,表示某个key的版本,每个key刚创建的version为1&#…...
Android 网络配置
ip tables 和 ip route 是两个不同的工具,它们在不同的阶段执行不同的功能。ip route 是用来管理和控制路由表的,它决定了数据包应该从哪个网卡或网关发送出去。ip tables 是用来配置、管理和控制网络数据包的过滤、转发和转换的,它根据用户定…...
【网络通信 -- WebRTC】Open WebRTC Toolkit 环境搭建指南
【网络通信 -- WebRTC】Open WebRTC Toolkit -- OWT-Server 编译安装指南 【1】OWT Server 与 Web Demo 视频会议环境搭建 【1.1】编译 OWT Server 安装依赖 ./scripts/installDepsUnattended.sh编译 scripts/build.js -t all --check 注意若不支持硬件加速则采用如下命令 s…...
文件上传漏洞(CVE-2022-30887)
简介 多语言药房管理系统(MPMS)是用PHP和MySQL开发的,该软件的主要目的是在药房和客户之间提供一套接口,客户是该软件的主要用户。该软件有助于为药房业务创建一个综合数据库,并根据到期、产品等各种参数提供各种报告…...
LeetCode-77-组合
一:题目描述: 给定两个整数 n 和 k,返回范围 [1, n] 中所有可能的 k 个数的组合。 你可以按 任何顺序 返回答案。 二:示例与提示 示例 1: 输入:n 4, k 2 输出: [[2,4],[3,4],[2,3],[1,2],[1,3],[1,4…...
Oracle中instr,rtrim,XMLPARSE,XMLAGG,GETCLOBVAL函数的使用
1:INSTR()函数 INSTR 是一个字符串函数,用于查找子字符串在源字符串中的位置。 它的语法如下: INSTR(source_string, search_string)source_string 是源字符串,即要在其中进行搜索的字符串。search_string 是要查找的子字符串。…...
java接入apiv3微信小程序支付(以java的eladmin框架为例)
一、需要准备的资料 1.小程序AppID 如:wx2e56f5****** 2.商户号 如:1641****** 3.商户API私钥路径:什么是商户API证书?如何获取商户API证书? 获取文件如下图: 如: 本地路径:E:\Env\e…...
第19节-PhotoShop基础课程-历史记录画笔工具
文章目录 前言1.历史记录画笔工具1.从当前状态创建文档2.创建新快照 2.历史记录艺术画笔工具 前言 任何记录都会被记录下来,并且可以拍快照,从历史中恢复,特别适合艺术创作的孩子 1.历史记录画笔工具 不只是画笔,所有操作记录都…...
MongoDB常用的比较符号和一些功能符号
比较符号 results collection.find({age: {$gt: 20}})功能符号 results collection.find({name: {$regex: ^M.*}})...
网络安全(黑客)技术自学
前言 一、什么是网络安全 网络安全可以基于攻击和防御视角来分类,我们经常听到的 “红队”、“渗透测试” 等就是研究攻击技术,而“蓝队”、“安全运营”、“安全运维”则研究防御技术。 无论网络、Web、移动、桌面、云等哪个领域,都有攻与防…...
C++ 引用
C 引用 引用变量是一个别名,也就是说,它是某个已存在变量的另一个名字。一旦把引用初始化为某个变量,就可以使用该引用名称或变量名称来指向变量。 C 引用 vs 指针 引用很容易与指针混淆,它们之间有三个主要的不同:…...
9.1.tensorRT高级(4)封装系列-自动驾驶案例项目self-driving-道路分割分析
目录 前言1. 道路分割总结 前言 杜老师推出的 tensorRT从零起步高性能部署 课程,之前有看过一遍,但是没有做笔记,很多东西也忘了。这次重新撸一遍,顺便记记笔记。 本次课程学习 tensorRT 高级-自动驾驶案例项目self-driving-道路分…...
稳定的 Glance 来了,安卓小部件有救了!
稳定的 Glance 来了,安卓小部件有救了! 稳定版本的 Glance 终于发布了,来一起看看吧,看看这一路的旅程,看看好用么,再看看如何使用! 前世今生 故事发生在两年的一天吧,其实夸张了…...
用友U8与MES系统API接口对接案例分析
企业数字化转型:轻易云数据集成平台助力 U8 ERPMES 系统集成 为什么选择数字化转型? 领导层对企业资源规划(ERP)的深刻理解促使了数字化转型的启动。采用精确的“N5”滚动计划,为供应商提供充分的预期信息,…...
web UI自动化介绍
文章目录 一、web UI自动化介绍1.1 执行UI自动化测试前提1.2 Selenium介绍以及知识点梳理 二、Selenium 学习2.1 基础2.1.1 环境安装与基础使用2.1.2 web浏览器控制2.1.3 常见控件的八大定位方式2.1.3.1 八大定位方式介绍2.1.3.2 NAME、ID定位2.1.3.3 css_selector定位2.1.3.4 …...
小米13Pro/13Ultra刷面具ROOT后激活LSPosed框架微X模块详细教程
喜欢买小米手机,很多是因为小米手机的开放,支持root权限,而ROOT对普通用户来说更多的是刷入DIY模块功能,今天ROM乐园小编就教大家如何使用面具ROOT,实现大家日常情况下非常依赖的微X模块功能,体验微X模块的…...
文盘Rust -- 给程序加个日志 | 京东云技术团队
日志是应用程序的重要组成部分。无论是服务端程序还是客户端程序都需要日志做为错误输出或者业务记录。在这篇文章中,我们结合log4rs聊聊rust 程序中如何使用日志。 log4rs类似java生态中的log4j,使用方式也很相似 log4rs中的基本概念 log4rs 的功能组件也由 appe…...
C语言深入理解指针(非常详细)(五)
目录 回调函数qsort使用举例qsort函数的模拟实现sizeof和strlen的对比sizeofstrlensizeof和strlen的对比一道关于sizeof的题 回调函数 回调函数就是一个通过函数指针调用的函数 如果你把函数的指针(地址)作为参数传递给另一个函数,当这个指…...
[docker]笔记-portainer的安装
1、portainer是一款可视化的容器管理软件,利用portainer可以轻松方便的管理和创建容器。portainer本身是一个容器,完全免费并且具有汉化版。本文介绍portainer的安装和使用。 2、安装好容器并配置好容器环境,可参照https://blog.csdn.net/bl…...
详解TCP/IP的三次握手和四次挥手
文章目录 前言一、TCP/IP协议的三次握手1.1 三次握手流程 二、TCP/IP的四次挥手2.1 四次挥手流程 三、主要字段3.1、标志位(Flags)3.2、序号(sequence number)3.3、确认号(acknowledgement number) 四、状态…...
eNSP-Cloud(实现本地电脑与eNSP内设备之间通信)
说明: 想象一下,你正在用eNSP搭建一个虚拟的网络世界,里面有虚拟的路由器、交换机、电脑(PC)等等。这些设备都在你的电脑里面“运行”,它们之间可以互相通信,就像一个封闭的小王国。 但是&#…...
MPNet:旋转机械轻量化故障诊断模型详解python代码复现
目录 一、问题背景与挑战 二、MPNet核心架构 2.1 多分支特征融合模块(MBFM) 2.2 残差注意力金字塔模块(RAPM) 2.2.1 空间金字塔注意力(SPA) 2.2.2 金字塔残差块(PRBlock) 2.3 分类器设计 三、关键技术突破 3.1 多尺度特征融合 3.2 轻量化设计策略 3.3 抗噪声…...
在四层代理中还原真实客户端ngx_stream_realip_module
一、模块原理与价值 PROXY Protocol 回溯 第三方负载均衡(如 HAProxy、AWS NLB、阿里 SLB)发起上游连接时,将真实客户端 IP/Port 写入 PROXY Protocol v1/v2 头。Stream 层接收到头部后,ngx_stream_realip_module 从中提取原始信息…...
在Ubuntu中设置开机自动运行(sudo)指令的指南
在Ubuntu系统中,有时需要在系统启动时自动执行某些命令,特别是需要 sudo权限的指令。为了实现这一功能,可以使用多种方法,包括编写Systemd服务、配置 rc.local文件或使用 cron任务计划。本文将详细介绍这些方法,并提供…...
反射获取方法和属性
Java反射获取方法 在Java中,反射(Reflection)是一种强大的机制,允许程序在运行时访问和操作类的内部属性和方法。通过反射,可以动态地创建对象、调用方法、改变属性值,这在很多Java框架中如Spring和Hiberna…...
unix/linux,sudo,其发展历程详细时间线、由来、历史背景
sudo 的诞生和演化,本身就是一部 Unix/Linux 系统管理哲学变迁的微缩史。来,让我们拨开时间的迷雾,一同探寻 sudo 那波澜壮阔(也颇为实用主义)的发展历程。 历史背景:su的时代与困境 ( 20 世纪 70 年代 - 80 年代初) 在 sudo 出现之前,Unix 系统管理员和需要特权操作的…...
【服务器压力测试】本地PC电脑作为服务器运行时出现卡顿和资源紧张(Windows/Linux)
要让本地PC电脑作为服务器运行时出现卡顿和资源紧张的情况,可以通过以下几种方式模拟或触发: 1. 增加CPU负载 运行大量计算密集型任务,例如: 使用多线程循环执行复杂计算(如数学运算、加密解密等)。运行图…...
浅谈不同二分算法的查找情况
二分算法原理比较简单,但是实际的算法模板却有很多,这一切都源于二分查找问题中的复杂情况和二分算法的边界处理,以下是博主对一些二分算法查找的情况分析。 需要说明的是,以下二分算法都是基于有序序列为升序有序的情况…...
什么是Ansible Jinja2
理解 Ansible Jinja2 模板 Ansible 是一款功能强大的开源自动化工具,可让您无缝地管理和配置系统。Ansible 的一大亮点是它使用 Jinja2 模板,允许您根据变量数据动态生成文件、配置设置和脚本。本文将向您介绍 Ansible 中的 Jinja2 模板,并通…...
tauri项目,如何在rust端读取电脑环境变量
如果想在前端通过调用来获取环境变量的值,可以通过标准的依赖: std::env::var(name).ok() 想在前端通过调用来获取,可以写一个command函数: #[tauri::command] pub fn get_env_var(name: String) -> Result<String, Stri…...
