etcd选举源码分析和例子
本文主要介绍etcd在分布式多节点服务中如何实现选主。
1、基础知识
在开始之前,先介绍etcd中 Version, Revision, ModRevision, CreateRevision 几个基本概念。
1、version
作用域为key,表示某个key的版本,每个key刚创建的version为1,每次更新key这个值都会自增,表示这个key自创建以来更新的次数。
2、revision
作用域为集群,单调递增,集群内任何key的增删改都会使它自增。可以把它理解为集群的一个逻辑状态标志。记录了每一次集群内的增删改操作。
3、ModRevision
作用域为key,表示某个key修改时的版本,它等于修改这个key时的revision的值。
4、CreateRevision
作用域为key,表示某个key创建时的版本,等于创建这个key时revision的值,再删除之前会保持不变。
现在在etcd里面放一个key:
可以看到,Version是1,CreateRevision与Modrevision和Revision相等,都是7677720。
在修改了key的值以后:
version自增变为2,CreateRevision没有变动。modRevision等于修改时的Revsion。Revision已经跑到前面去了,因为此时还有其他的程序在修改etcd里面的key。
2、选举
先初始化一个session和选举:
electionKey := "/my-election"// Create an election sessionsession, err := concurrency.NewSession(client, concurrency.WithTTL(10))if err != nil {log.Fatal(err)}defer session.Close()election := concurrency.NewElection(session, electionKey)
在启动了三台服务后,在etcd里面找到以下三个key:
现在的leader是第一台:
去看下compaign的源码:
func (e *Election) Campaign(ctx context.Context, val string) error {s := e.sessionclient := e.session.Client()//用leaseID与前面的key前缀拼成keyk := fmt.Sprintf("%s%x", e.keyPrefix, s.Lease())//判断当前key的createRevision是否是0,也就是否创建txn := client.Txn(ctx).If(v3.Compare(v3.CreateRevision(k), "=", 0))txn = txn.Then(v3.OpPut(k, val, v3.WithLease(s.Lease())))txn = txn.Else(v3.OpGet(k))resp, err := txn.Commit()if err != nil {return err}//获取key的Revisione.leaderKey, e.leaderRev, e.leaderSession = k, resp.Header.Revision, sif !resp.Succeeded {kv := resp.Responses[0].GetResponseRange().Kvs[0]e.leaderRev = kv.CreateRevisionif string(kv.Value) != val {if err = e.Proclaim(ctx, val); err != nil {e.Resign(ctx)return err}}}_, err = waitDeletes(ctx, client, e.keyPrefix, e.leaderRev-1)if err != nil {// clean up in case of context cancelselect {case <-ctx.Done():e.Resign(client.Ctx())default:e.leaderSession = nil}return err}e.hdr = resp.Headerreturn nil
}
用当前key和leaseid在etcd中创建一个key,并获取到key此时的Revision。
// waitDeletes efficiently waits until all keys matching the prefix and no greater
// than the create revision.
func waitDeletes(ctx context.Context, client *v3.Client, pfx string, maxCreateRev int64) (*pb.ResponseHeader, error) {getOpts := append(v3.WithLastCreate(), v3.WithMaxCreateRev(maxCreateRev))for {resp, err := client.Get(ctx, pfx, getOpts...)if err != nil {return nil, err}if len(resp.Kvs) == 0 {return resp.Header, nil}lastKey := string(resp.Kvs[0].Key)if err = waitDelete(ctx, client, lastKey, resp.Header.Revision); err != nil {return nil, err}}
}
func waitDelete(ctx context.Context, client *v3.Client, key string, rev int64) error {cctx, cancel := context.WithCancel(ctx)defer cancel()var wr v3.WatchResponsewch := client.Watch(cctx, key, v3.WithRev(rev))for wr = range wch {for _, ev := range wr.Events {if ev.Type == mvccpb.DELETE {return nil}}}if err := wr.Err(); err != nil {return err}if err := ctx.Err(); err != nil {return err}return fmt.Errorf("lost watcher waiting for delete")
}
这里get的option,找到/my-ection前缀下最新创建的key,并且createRev的值小于等于当前key创建的createRev-1.
// WithLastCreate gets the key with the latest creation revision in the request range.
func WithLastCreate() []OpOption { return withTop(SortByCreateRevision, SortDescend) }
// WithMaxCreateRev filters out keys for Get with creation revisions greater than the given revision.
func WithMaxCreateRev(rev int64) OpOption { return func(op *Op) { op.maxCreateRev = rev } }
此时/my-election,在这个revision之前没有任何key,所以node2启动直接竞选到了主。
紧接着node3启动,在这个get,它能获取到node2创建的key,所以node3去watchnode2的的删除事件。
同理,node4启动以后,在这里的get,它获取的是node3的key,它去wachnode3的删除事件。
当node2释放后,node3获取到node2的删除事件变成主。node3释放以后,node4变成watch到node3
的删除事件变成主。
3、观察者
在上面的例子中,如果node4挂了以后,node2能继续变回为leader吗?
在引入observe以后,可以做到。
找到/my-election这个前缀下最开始创建的key,也就是node2。
然后把这个key放入返回ch中。
func (e *Election) observe(ctx context.Context, ch chan<- v3.GetResponse) {client := e.session.Client()defer close(ch)for {resp, err := client.Get(ctx, e.keyPrefix, v3.WithFirstCreate()...)if err != nil {return}var kv *mvccpb.KeyValuevar hdr *pb.ResponseHeaderif len(resp.Kvs) == 0 {cctx, cancel := context.WithCancel(ctx)// wait for first key put on prefixopts := []v3.OpOption{v3.WithRev(resp.Header.Revision), v3.WithPrefix()}wch := client.Watch(cctx, e.keyPrefix, opts...)for kv == nil {wr, ok := <-wchif !ok || wr.Err() != nil {cancel()return}// only accept puts; a delete will make observe() spinfor _, ev := range wr.Events {if ev.Type == mvccpb.PUT {hdr, kv = &wr.Header, ev.Kv// may have multiple revs; hdr.rev = the last rev// set to kv's rev in case batch has multiple Putshdr.Revision = kv.ModRevisionbreak}}}cancel()} else {hdr, kv = resp.Header, resp.Kvs[0]}select {case ch <- v3.GetResponse{Header: hdr, Kvs: []*mvccpb.KeyValue{kv}}:case <-ctx.Done():return}cctx, cancel := context.WithCancel(ctx)wch := client.Watch(cctx, string(kv.Key), v3.WithRev(hdr.Revision+1))keyDeleted := falsefor !keyDeleted {wr, ok := <-wchif !ok {cancel()return}for _, ev := range wr.Events {if ev.Type == mvccpb.DELETE {keyDeleted = truebreak}resp.Header = &wr.Headerresp.Kvs = []*mvccpb.KeyValue{ev.Kv}select {case ch <- *resp:case <-cctx.Done():cancel()return}}}cancel()}
}
wch := client.Watch(cctx, string(kv.Key), v3.WithRev(hdr.Revision+1))
接下里watch/my-election下第一个创建的key,在当前revision以后的delete事件。如果有delete事件则把这个消息通知出去。也就是发现了当前主的节点发生了delete事件,主发生了变化。
同理,当node2 down掉,它的key被删除,node3变成了主,observe此时watch的就是node3的key,因为此时node3创建的key是/my-election下的第一个key。
完整代码:
package mainimport ("context""fmt""log""os""time"clientv3 "go.etcd.io/etcd/client/v3""go.etcd.io/etcd/client/v3/concurrency"
)const (FOLLOWER = "follower"LEADER = "leader"
)var state string = FOLLOWER
var preState string = FOLLOWERfunc compaign(election *concurrency.Election, val string) {// Campaign for leadershipfmt.Println("start compaign!")if err := election.Campaign(context.Background(), val); err != nil {log.Fatal(err)}fmt.Println("Became leader!")preState = statestate = LEADER// Hold leadership until a key pressfmt.Println("Press Enter to release leadership...")fmt.Scanln()// Resign leadershipif err := election.Resign(context.Background()); err != nil {log.Fatal(err)}//preState = state//state = FOLLOWERfmt.Println("Released leadership.")
}func observe(election *concurrency.Election, val string) {ch := election.Observe(context.Background())for {select {case rsp, ok := <-ch:if !ok {fmt.Println("now I am follower")//election.Campaign(context.Background(), args[1])//重新开始观察go observe(election, val)return} else {fmt.Printf("leader now is:%s\n", string(rsp.Kvs[0].Value))if string(rsp.Kvs[0].Value) == val {fmt.Println("still be leader")preState = statestate = LEADER} else {fmt.Println("now become follower")preState = statestate = FOLLOWERif preState == LEADER {go compaign(election, val)}}}}}
}func main() {// Connect to etcdargs := os.Argsclient, err := clientv3.New(clientv3.Config{Endpoints: []string{"ipdizhi"}, // Replace with your etcd endpointsDialTimeout: 5 * time.Second,Username: "user",Password: "password",})if err != nil {log.Fatal(err)}defer client.Close()// Key for leader electionelectionKey := "/my-election"// Create an election sessionsession, err := concurrency.NewSession(client, concurrency.WithTTL(10))if err != nil {log.Fatal(err)}defer session.Close()election := concurrency.NewElection(session, electionKey)go compaign(election, args[1])go observe(election, args[1])for {}
}
相关文章:

etcd选举源码分析和例子
本文主要介绍etcd在分布式多节点服务中如何实现选主。 1、基础知识 在开始之前,先介绍etcd中 Version, Revision, ModRevision, CreateRevision 几个基本概念。 1、version 作用域为key,表示某个key的版本,每个key刚创建的version为1&#…...

Android 网络配置
ip tables 和 ip route 是两个不同的工具,它们在不同的阶段执行不同的功能。ip route 是用来管理和控制路由表的,它决定了数据包应该从哪个网卡或网关发送出去。ip tables 是用来配置、管理和控制网络数据包的过滤、转发和转换的,它根据用户定…...

【网络通信 -- WebRTC】Open WebRTC Toolkit 环境搭建指南
【网络通信 -- WebRTC】Open WebRTC Toolkit -- OWT-Server 编译安装指南 【1】OWT Server 与 Web Demo 视频会议环境搭建 【1.1】编译 OWT Server 安装依赖 ./scripts/installDepsUnattended.sh编译 scripts/build.js -t all --check 注意若不支持硬件加速则采用如下命令 s…...

文件上传漏洞(CVE-2022-30887)
简介 多语言药房管理系统(MPMS)是用PHP和MySQL开发的,该软件的主要目的是在药房和客户之间提供一套接口,客户是该软件的主要用户。该软件有助于为药房业务创建一个综合数据库,并根据到期、产品等各种参数提供各种报告…...

LeetCode-77-组合
一:题目描述: 给定两个整数 n 和 k,返回范围 [1, n] 中所有可能的 k 个数的组合。 你可以按 任何顺序 返回答案。 二:示例与提示 示例 1: 输入:n 4, k 2 输出: [[2,4],[3,4],[2,3],[1,2],[1,3],[1,4…...
Oracle中instr,rtrim,XMLPARSE,XMLAGG,GETCLOBVAL函数的使用
1:INSTR()函数 INSTR 是一个字符串函数,用于查找子字符串在源字符串中的位置。 它的语法如下: INSTR(source_string, search_string)source_string 是源字符串,即要在其中进行搜索的字符串。search_string 是要查找的子字符串。…...

java接入apiv3微信小程序支付(以java的eladmin框架为例)
一、需要准备的资料 1.小程序AppID 如:wx2e56f5****** 2.商户号 如:1641****** 3.商户API私钥路径:什么是商户API证书?如何获取商户API证书? 获取文件如下图: 如: 本地路径:E:\Env\e…...
第19节-PhotoShop基础课程-历史记录画笔工具
文章目录 前言1.历史记录画笔工具1.从当前状态创建文档2.创建新快照 2.历史记录艺术画笔工具 前言 任何记录都会被记录下来,并且可以拍快照,从历史中恢复,特别适合艺术创作的孩子 1.历史记录画笔工具 不只是画笔,所有操作记录都…...

MongoDB常用的比较符号和一些功能符号
比较符号 results collection.find({age: {$gt: 20}})功能符号 results collection.find({name: {$regex: ^M.*}})...

网络安全(黑客)技术自学
前言 一、什么是网络安全 网络安全可以基于攻击和防御视角来分类,我们经常听到的 “红队”、“渗透测试” 等就是研究攻击技术,而“蓝队”、“安全运营”、“安全运维”则研究防御技术。 无论网络、Web、移动、桌面、云等哪个领域,都有攻与防…...
C++ 引用
C 引用 引用变量是一个别名,也就是说,它是某个已存在变量的另一个名字。一旦把引用初始化为某个变量,就可以使用该引用名称或变量名称来指向变量。 C 引用 vs 指针 引用很容易与指针混淆,它们之间有三个主要的不同:…...

9.1.tensorRT高级(4)封装系列-自动驾驶案例项目self-driving-道路分割分析
目录 前言1. 道路分割总结 前言 杜老师推出的 tensorRT从零起步高性能部署 课程,之前有看过一遍,但是没有做笔记,很多东西也忘了。这次重新撸一遍,顺便记记笔记。 本次课程学习 tensorRT 高级-自动驾驶案例项目self-driving-道路分…...

稳定的 Glance 来了,安卓小部件有救了!
稳定的 Glance 来了,安卓小部件有救了! 稳定版本的 Glance 终于发布了,来一起看看吧,看看这一路的旅程,看看好用么,再看看如何使用! 前世今生 故事发生在两年的一天吧,其实夸张了…...

用友U8与MES系统API接口对接案例分析
企业数字化转型:轻易云数据集成平台助力 U8 ERPMES 系统集成 为什么选择数字化转型? 领导层对企业资源规划(ERP)的深刻理解促使了数字化转型的启动。采用精确的“N5”滚动计划,为供应商提供充分的预期信息,…...

web UI自动化介绍
文章目录 一、web UI自动化介绍1.1 执行UI自动化测试前提1.2 Selenium介绍以及知识点梳理 二、Selenium 学习2.1 基础2.1.1 环境安装与基础使用2.1.2 web浏览器控制2.1.3 常见控件的八大定位方式2.1.3.1 八大定位方式介绍2.1.3.2 NAME、ID定位2.1.3.3 css_selector定位2.1.3.4 …...

小米13Pro/13Ultra刷面具ROOT后激活LSPosed框架微X模块详细教程
喜欢买小米手机,很多是因为小米手机的开放,支持root权限,而ROOT对普通用户来说更多的是刷入DIY模块功能,今天ROM乐园小编就教大家如何使用面具ROOT,实现大家日常情况下非常依赖的微X模块功能,体验微X模块的…...
文盘Rust -- 给程序加个日志 | 京东云技术团队
日志是应用程序的重要组成部分。无论是服务端程序还是客户端程序都需要日志做为错误输出或者业务记录。在这篇文章中,我们结合log4rs聊聊rust 程序中如何使用日志。 log4rs类似java生态中的log4j,使用方式也很相似 log4rs中的基本概念 log4rs 的功能组件也由 appe…...

C语言深入理解指针(非常详细)(五)
目录 回调函数qsort使用举例qsort函数的模拟实现sizeof和strlen的对比sizeofstrlensizeof和strlen的对比一道关于sizeof的题 回调函数 回调函数就是一个通过函数指针调用的函数 如果你把函数的指针(地址)作为参数传递给另一个函数,当这个指…...

[docker]笔记-portainer的安装
1、portainer是一款可视化的容器管理软件,利用portainer可以轻松方便的管理和创建容器。portainer本身是一个容器,完全免费并且具有汉化版。本文介绍portainer的安装和使用。 2、安装好容器并配置好容器环境,可参照https://blog.csdn.net/bl…...

详解TCP/IP的三次握手和四次挥手
文章目录 前言一、TCP/IP协议的三次握手1.1 三次握手流程 二、TCP/IP的四次挥手2.1 四次挥手流程 三、主要字段3.1、标志位(Flags)3.2、序号(sequence number)3.3、确认号(acknowledgement number) 四、状态…...
浅谈 React Hooks
React Hooks 是 React 16.8 引入的一组 API,用于在函数组件中使用 state 和其他 React 特性(例如生命周期方法、context 等)。Hooks 通过简洁的函数接口,解决了状态与 UI 的高度解耦,通过函数式编程范式实现更灵活 Rea…...
Python爬虫(二):爬虫完整流程
爬虫完整流程详解(7大核心步骤实战技巧) 一、爬虫完整工作流程 以下是爬虫开发的完整流程,我将结合具体技术点和实战经验展开说明: 1. 目标分析与前期准备 网站技术分析: 使用浏览器开发者工具(F12&…...

Java-41 深入浅出 Spring - 声明式事务的支持 事务配置 XML模式 XML+注解模式
点一下关注吧!!!非常感谢!!持续更新!!! 🚀 AI篇持续更新中!(长期更新) 目前2025年06月05日更新到: AI炼丹日志-28 - Aud…...
【Go】3、Go语言进阶与依赖管理
前言 本系列文章参考自稀土掘金上的 【字节内部课】公开课,做自我学习总结整理。 Go语言并发编程 Go语言原生支持并发编程,它的核心机制是 Goroutine 协程、Channel 通道,并基于CSP(Communicating Sequential Processes࿰…...
什么?连接服务器也能可视化显示界面?:基于X11 Forwarding + CentOS + MobaXterm实战指南
文章目录 什么是X11?环境准备实战步骤1️⃣ 服务器端配置(CentOS)2️⃣ 客户端配置(MobaXterm)3️⃣ 验证X11 Forwarding4️⃣ 运行自定义GUI程序(Python示例)5️⃣ 成功效果
GC1808高性能24位立体声音频ADC芯片解析
1. 芯片概述 GC1808是一款24位立体声音频模数转换器(ADC),支持8kHz~96kHz采样率,集成Δ-Σ调制器、数字抗混叠滤波器和高通滤波器,适用于高保真音频采集场景。 2. 核心特性 高精度:24位分辨率,…...

关于easyexcel动态下拉选问题处理
前些日子突然碰到一个问题,说是客户的导入文件模版想支持部分导入内容的下拉选,于是我就找了easyexcel官网寻找解决方案,并没有找到合适的方案,没办法只能自己动手并分享出来,针对Java生成Excel下拉菜单时因选项过多导…...

抽象类和接口(全)
一、抽象类 1.概念:如果⼀个类中没有包含⾜够的信息来描绘⼀个具体的对象,这样的类就是抽象类。 像是没有实际⼯作的⽅法,我们可以把它设计成⼀个抽象⽅法,包含抽象⽅法的类我们称为抽象类。 2.语法 在Java中,⼀个类如果被 abs…...
Python 训练营打卡 Day 47
注意力热力图可视化 在day 46代码的基础上,对比不同卷积层热力图可视化的结果 import torch import torch.nn as nn import torch.optim as optim from torchvision import datasets, transforms from torch.utils.data import DataLoader import matplotlib.pypl…...
全面解析数据库:从基础概念到前沿应用
在数字化时代,数据已成为企业和社会发展的核心资产,而数据库作为存储、管理和处理数据的关键工具,在各个领域发挥着举足轻重的作用。从电商平台的商品信息管理,到社交网络的用户数据存储,再到金融行业的交易记录处理&a…...