Raft 一致性算法
Raft
Raft提供了一种在计算系统集群中分布状态机的通用方法,确保集群中的每个节点都同意一系列相同的状态转换。
一个Raft集群包含若干个服务器节点,通常为5个,这允许整个系统容忍2个节点的失效。每个节点处于以下三种状态之一:
- follower(跟随者):所有节点都以follower的状态开始。如果没有收到leader的消息则会变成candidate状态。
- candidate(候选人):会向其他节点“拉选票”,如果得到大部分的票则成为leader,这个过程叫做Leader选举(Leader Election)。未当选Leader的节点会将状态转换为follower。
- leader(领导者):所有对系统的修改都会先经过leader。
raft 是 etcd 和consoul的核心算法。
Raft 一致性算法
- Raft通过选出一个leader来简化日志副本的管理,例如,日志项(log entry)只允许从leader流向follower。
- 基于leader的方法,Raft算法可以分解成三个子问题。
- Leader election(领导选举):原来的leader挂掉后,必须选出一个新的leader。
- Log replication(日志复制):leader从客户端接收日志,并复制到整个集群中。
- Safety(安全性):如果有任意的server将日志项回放到状态机中了,那么其他的server只会回放相同的日志项。
动画演示
地址:http://thesecretlivesofdata.com/raft/
网络节点必须是奇数个。
动画主要包含三部分内容:
- 第一部分介绍简单版的领导者选举和日志复制的过程
- 第二部分介绍详细版的领导者选举和日志复制过程
- 第三部分介绍如果遇到网络分区(脑裂),raft算法是如何回复网络一致的。
Leader election(领导选举)
- Raft 使用一种心跳机制来触发领导人选举
- 当服务器程序启动时,节点都是 follower(跟随者) 身份
- 如果一个跟随者在一段时间里没有接收到任何消息,也就是选举超时,然后他就会认为系统中没有可用的领导者然后开始进行选举以选出新的领导者
- 要开始一次选举过程,follower 会给当前term加1并且转换成candidate状态,然后它会并行的向集群中的其他服务器节点发送请求投票的 RPCs 来给自己投票。
- 候选人的状态维持直到发生以下任何一个条件发生的时候
- 他自己赢得了这次的选举
- 其他的服务器成为领导者
- 一段时间之后没有任何一个获胜的人
Log replication(日志复制)
- 当选出 leader 后,它会开始接收客户端请求,每个请求会带有一个指令,可以被回放到状态机中
- leader 把指令追加成一个log entry,然后通过AppendEntries RPC并行地发送给其他的server,当该entry被多数server复制后,leader 会把该entry回放到状态机中,然后把结果返回给客户端
- 当 follower 宕机或者运行较慢时,leader 会无限地重发AppendEntries给这些follower,直到所有的follower都复制了该log entry
- raft的log replication要保证如果两个log entry有相同的index和term,那么它们存储相同的指令
- leader在一个特定的term和index下,只会创建一个log entry
代码
package mainimport ("os""strconv""fmt""sync""net/rpc""net/http""math/rand""time""log"
)//对每个节点id和端口的封装类型
type nodeInfo struct {id stringport string
}//声明节点对象类型Raft
type Raft struct {node nodeInfomu sync.Mutex//当前节点编号me intcurrentTerm intvotedFor intstate inttimeout intcurrentLeader int//该节点最后一次处理数据的时间lastMessageTime int64message chan booleclectCh chan boolheartbeat chan bool//子节点给主节点返回心跳信号heartbeatRe chan bool
}//声明leader对象
type Leader struct {//任期Term int//leader 编号LeaderId int
}//设置节点个数
const raftCount = 2var leader = Leader{0, -1}
//存储缓存信息
var bufferMessage = make(map[string]string)
//处理数据库信息
var mysqlMessage = make(map[string]string)
//操作消息数组下标
var messageId = 1
//用nodeTable存储每个节点中的键值对
var nodeTable map[string]stringfunc main() {//终端接收来的是数组if len(os.Args) > 1 {//接收终端输入的信息userId := os.Args[1]//字符串转换整型id, _ := strconv.Atoi(userId)fmt.Println(id)//定义节点id和端口号nodeTable = map[string]string{"1": ":8000","2": ":8001",}//封装nodeInfo对象node := nodeInfo{id: userId, port: nodeTable[userId]}//创建节点对象rf := Make(id)//确保每个新建立的节点都有端口对应//127.0.0.1:8000rf.node = node//注册rpcgo func() {//注册rpc,为了实现远程链接rf.raftRegisterRPC(node.port)}()if userId == "1" {go func() {//回调方法http.HandleFunc("/req", rf.getRequest)fmt.Println("监听8080")if err := http.ListenAndServe(":8080", nil); err != nil {fmt.Println(err)return}}()}}for {;}
}var clientWriter http.ResponseWriterfunc (rf *Raft) getRequest(writer http.ResponseWriter, request *http.Request) {request.ParseForm()if len(request.Form["age"]) > 0 {clientWriter = writerfmt.Println("主节点广播客户端请求age:", request.Form["age"][0])param := Param{Msg: request.Form["age"][0], MsgId: strconv.Itoa(messageId)}messageId++if leader.LeaderId == rf.me {rf.sendMessageToOtherNodes(param)} else {//将消息转发给leaderleaderId := nodeTable[strconv.Itoa(leader.LeaderId)]//连接远程rpc服务rpc, err := rpc.DialHTTP("tcp", "127.0.0.1"+leaderId)if err != nil {log.Fatal("\nrpc转发连接server错误:", leader.LeaderId, err)}var bo = false//首先给leader传递err = rpc.Call("Raft.ForwardingMessage", param, &bo)if err != nil {log.Fatal("\nrpc转发调用server错误:", leader.LeaderId, err)}}}
}func (rf *Raft) sendMessageToOtherNodes(param Param) {bufferMessage[param.MsgId] = param.Msg// 只有领导才能给其它服务器发送消息if rf.currentLeader == rf.me {var success_count = 0fmt.Printf("领导者发送数据中 。。。\n")go func() {rf.broadcast(param, "Raft.LogDataCopy", func(ok bool) {//需要其它服务端回应rf.message <- ok})}()for i := 0; i < raftCount-1; i++ {fmt.Println("等待其它服务端回应")select {case ok := <-rf.message:if ok {success_count++if success_count >= raftCount/2 {rf.mu.Lock()rf.lastMessageTime = milliseconds()mysqlMessage[param.MsgId] = bufferMessage[param.MsgId]delete(bufferMessage, param.MsgId)if clientWriter != nil {fmt.Fprintf(clientWriter, "OK")}fmt.Printf("\n领导者发送数据结束\n")rf.mu.Unlock()}}}}}
}//注册Raft对象,注册后的目的为确保每个节点(raft) 可以远程接收
func (node *Raft) raftRegisterRPC(port string) {//注册一个服务器rpc.Register(node)//把服务绑定到http协议上rpc.HandleHTTP()err := http.ListenAndServe(port, nil)if err != nil {fmt.Println("注册rpc服务失败", err)}
}//创建节点对象
func Make(me int) *Raft {rf := &Raft{}rf.me = merf.votedFor = -1//0 follower ,1 candidate ,2 leaderrf.state = 0rf.timeout = 0rf.currentLeader = -1rf.setTerm(0)//初始化通道rf.message = make(chan bool)rf.heartbeat = make(chan bool)rf.heartbeatRe = make(chan bool)rf.eclectCh = make(chan bool)//每个节点都有选举权go rf.election()//每个节点都有心跳功能go rf.sendLeaderHeartBeat()return rf
}//选举成功后,应该广播所有的节点,本节点成为了leader
func (rf *Raft) sendLeaderHeartBeat() {for {select {case <-rf.heartbeat:rf.sendAppendEntriesImpl()}}
}func (rf *Raft) sendAppendEntriesImpl() {if rf.currentLeader == rf.me {var success_count = 0go func() {param := Param{Msg: "leader heartbeat",Arg: Leader{rf.currentTerm, rf.me}}rf.broadcast(param, "Raft.Heartbeat", func(ok bool) {rf.heartbeatRe <- ok})}()for i := 0; i < raftCount-1; i++ {select {case ok := <-rf.heartbeatRe:if ok {success_count++if success_count >= raftCount/2 {rf.mu.Lock()rf.lastMessageTime = milliseconds()fmt.Println("接收到了子节点们的返回信息")rf.mu.Unlock()}}}}}
}func randomRange(min, max int64) int64 {//设置随机时间rand.Seed(time.Now().UnixNano())return rand.Int63n(max-min) + min
}//获得当前时间(毫秒)
func milliseconds() int64 {return time.Now().UnixNano() / int64(time.Millisecond)
}func (rf *Raft) election() {var result bool//每隔一段时间发一次心跳for {//延时时间timeout := randomRange(1500, 3000)//设置该节点最有一次处理消息的时间rf.lastMessageTime = milliseconds()select {//间隔时间为1500-3000ms的随机值case <-time.After(time.Duration(timeout) * time.Millisecond):}result = falsefor !result {//选择leaderresult = rf.election_one_round(&leader)}}
}func (rf *Raft) election_one_round(args *Leader) bool {//已经有了leader,并且不是自己,那么returnif args.LeaderId > -1 && args.LeaderId != rf.me {fmt.Printf("%d已是leader,终止%d选举\n", args.LeaderId, rf.me)return true}var timeout int64var vote intvar triggerHeartbeat booltimeout = 2000last := milliseconds()success := falserf.mu.Lock()rf.becomeCandidate()rf.mu.Unlock()fmt.Printf("candidate=%d start electing leader\n", rf.me)for {fmt.Printf("candidate=%d send request vote to server\n", rf.me)go func() {rf.broadcast(Param{Msg: "send request vote"}, "Raft.ElectingLeader", func(ok bool) {//无论成功失败都需要发送到通道 避免堵塞rf.eclectCh <- ok})}()vote = 0triggerHeartbeat = falsefor i := 0; i < raftCount-1; i++ {fmt.Printf("candidate=%d waiting for select for i=%d\n", rf.me, i)select {case ok := <-rf.eclectCh:if ok {vote++success = vote >= raftCount/2 || rf.currentLeader > -1if success && !triggerHeartbeat {fmt.Println("okok", args)triggerHeartbeat = truerf.mu.Lock()rf.becomeLeader()args.Term = rf.currentTerm + 1args.LeaderId = rf.merf.mu.Unlock()fmt.Printf("candidate=%d becomes leader\n", rf.currentLeader)rf.heartbeat <- true}}}fmt.Printf("candidate=%d complete for select for i=%d\n", rf.me, i)}if (timeout+last < milliseconds()) || (vote >= raftCount/2 || rf.currentLeader > -1) {break} else {select {case <-time.After(time.Duration(5000) * time.Millisecond):}}}fmt.Printf("candidate=%d receive votes status=%t\n", rf.me, success)return success
}func (rf *Raft) becomeLeader() {rf.state = 2fmt.Println(rf.me, "成为了leader")rf.currentLeader = rf.me
}//设置发送参数的数据类型
type Param struct {Msg stringMsgId stringArg Leader
}func (rf *Raft) broadcast(msg Param, path string, fun func(ok bool)) {//设置不要自己给自己广播for nodeID, port := range nodeTable {if nodeID == rf.node.id {continue;}//链接远程rpcrp, err := rpc.DialHTTP("tcp", "127.0.0.1"+port)if err != nil {fun(false)continue}var bo = falseerr = rp.Call(path, msg, &bo)if err != nil {fun(false)continue}fun(bo)}}func (rf *Raft) becomeCandidate() {if rf.state == 0 || rf.currentLeader == -1 {//候选人状态rf.state = 1rf.votedFor = rf.merf.setTerm(rf.currentTerm + 1)rf.currentLeader = -1}
}func (rf *Raft) setTerm(term int) {rf.currentTerm = term
}//Rpc处理
func (rf *Raft) ElectingLeader(param Param, a *bool) error {//给leader投票*a = truerf.lastMessageTime = milliseconds()return nil
}func (rf *Raft) Heartbeat(param Param, a *bool) error {fmt.Println("\nrpc:heartbeat:", rf.me, param.Msg)if param.Arg.Term < rf.currentTerm {*a = false} else {leader = param.Argfmt.Printf("%d收到leader%d心跳\n", rf.currentLeader, leader.LeaderId)*a = truerf.mu.Lock()rf.currentLeader = leader.LeaderIdrf.votedFor = leader.LeaderIdrf.state = 0rf.lastMessageTime = milliseconds()fmt.Printf("server = %d learned that leader = %d\n", rf.me, rf.currentLeader)rf.mu.Unlock()}return nil
}//连接到leader节点
func (rf *Raft) ForwardingMessage(param Param, a *bool) error {fmt.Println("\nrpc:forwardingMessage:", rf.me, param.Msg)rf.sendMessageToOtherNodes(param)*a = truerf.lastMessageTime = milliseconds()return nil
}//接收leader传过来的日志
func (r *Raft) LogDataCopy(param Param, a *bool) error {fmt.Println("\nrpc:LogDataCopy:", r.me, param.Msg)bufferMessage[param.MsgId] = param.Msg*a = truereturn nil
}
Reference
老男孩 Go 5期
相关文章:
Raft 一致性算法
Raft Raft提供了一种在计算系统集群中分布状态机的通用方法,确保集群中的每个节点都同意一系列相同的状态转换。 一个Raft集群包含若干个服务器节点,通常为5个,这允许整个系统容忍2个节点的失效。每个节点处于以下三种状态之一: …...
驱动程序开发:基于EC20 4G模块自动拨号联网的两种方式(GobiNet工具拨号和PPP工具拨号)
目录一、EC20 4G模块简介二、根据移远官方文档修改EC20 4G模组驱动 1、因为EC20 4G模组min-pice接口其实就是usb接口,因此需要修改Linux内核源码drivers/usb/serial/option.c文件,如下图: 2、根据USB协议的要求,需要在drive…...
Web自动化测试——常见问题篇
文章目录一、什么是自动化测试二、为啥进行自动化测试(优点)三、Webdriver 的工作原理四、显示等待和隐式等待的区别五、什么样的项目适合做自动化六、自动化测试的流程七、如何分析生成的自动化测试报告一、什么是自动化测试 所谓的自动化测试就是使用…...
快速实现Modbus TCP转BACnet IP协议的方案
一、需求背景 BACnet是用于智能楼宇自控系统的主流通信协议,可用在暖通空调系统(HVAC,包括暖气、通风、空气调节),也可以用在照明控制、门禁系统、火警侦测系统及其相关的设备。楼宇中的受控设备都通过BACnet协议连接到…...
Unity CircleLayoutGroup 如何实现一个圆形自动布局组件
文章目录简介实现原理Editor 编辑器简介 Unity中提供了三种类型的自动布局组件,分别是Grid Layou Group、Horizontal Layout Group、Vertical Layout Group,本文自定义了一个圆形的自动布局组件Circle Layout Group,如图所示: Ra…...
springcloud+nacos+gateway案例
一、先搭建好springcloudnacos项目地址:https://javazhong.blog.csdn.net/article/details/128899999二、spring cloud gateway简述Spring Cloud Gateway 是Spring Cloud家族中的一款API网关。Gateway 建立在 Spring Webflux上,目标是提供一个简洁、高效的API网关&a…...
实习这么久,你知道Maven是如何从代码仓库中找到需要的依赖吗?
目录 碎碎念 Maven是如何找到代码仓库里需要的依赖的? 如何根据坐标在本地仓库中寻找所需要的依赖? 如何根据坐标在远程仓库中寻找所需要的依赖? Maven 如何使用 HTTP 或 HTTPS 协议从远程仓库中获取依赖项,请详细解释其原理…...
低代码/零代码的快速开发框架
目前国内主流的低代码开发平台有:宜搭、简道云、明道云、云程、氚云、伙伴云、道一云、JEPaaS、华炎魔方、搭搭云、JeecgBoot 、RuoYi等。这些平台各有优劣势,定位也不同,用户可以根据自己需求选择。 一、阿里云宜搭 宜搭是阿里巴巴集团在20…...
C# 中常见的设计模式
设计模式是一套被广泛应用于软件设计的最佳实践,它们可以帮助开发者解决特定的问题,提高代码的可重用性、可读性和可维护性。本文将介绍 C# 中常见的几种设计模式,并提供相应的示例代码。 工厂模式 工厂模式是一种创建型设计模式,…...
promethues/servicemonitor
目录 1.promethues 能保证源源不断地采集/metrics 信息吗?每次都是最新的吗 2.部署servicemonitor 的作用是什么? 3.pod 部署采集数据直接上报promthues ,不通过servicemonitor 可以吗? 4.你说的"此外,如果部署…...
postman使用简介
1、介绍 postman是一款功能强大的网页调试和模拟发送HTTP请求的Chrome插件,支持几乎所有类型的HTTP请求 2、下载及安装 官方文档:https://www.getpostman.com/docs/v6/ chrome插件:chrome浏览器应用商店直接搜索添加即可(需墙&…...
@DS注解在事务中实现数据源的切换@DS在事务中失效【已解决】
在Springboot的application.yml中的配置: spring:datasource:url: jdbc:mysql://localhost:3306/test2?serverTimezoneUTC&useUnicodetrue&characterEncodingutf8driver-class-name: com.mysql.cj.jdbc.Driverusername: rootpassword: rootdynamic:primar…...
Java I/O之文件系统
一、全文概览 在学习文件系统之前,需要了解下Java在I/O上的发展史:在Java7之前,打开和读取文件需要编写特别笨拙的代码,涉及到很多的InputStream、OutputStream等组合起来使用,每次在使用时或许都需要查一下文档才能记…...
Mysql元数据获取方法(information_schema绕过方法)
前提:如果waf或其它过滤了information_schema关键字,那我们该如何获取元数据呢?能够代替information_schema的有:sys.schema_auto_increment_columnssys.schema_table_statistics_with_bufferx$schema_table_statistics_with_buff…...
Eclipse快捷键
* 1.补全代码的声明:alt /* 2.快速修复: ctrl 1 * 3.批量导包:ctrl shift o* 4.使用单行注释:ctrl /* 5.使用多行注释: ctrl shift / * 6.取消多行注释:ctrl shift \* 7.复制指定行的代码:ctrl a…...
java ssm自习室选座预约系统开发springmvc
人工管理显然已无法应对时代的变化,而自习室选座预约系统开发能很好地解决这一问题,既能提高人力物力,又能提高预约选座的知名度,取代人工管理是必然趋势。 本自习室选座预约系统开发以SSM作为框架,JSP技术,…...
分享我从功能测试转型到测试开发的真实故事
由于这段时间我面试了很多家公司,也经历了之前公司的不愉快。所以我想写一篇文章来分享一下自己的面试体会。希望能对我在之后的工作或者面试中有一些帮助,也希望能帮助到正在找工作的你。 找工作 我们总是草率地进入一个自己不了解的公司工作…...
TypeScript快速入门———(二)TypeScript常用类型
文章目录概述1 类型注解2 常用基础类型概述3.原始类型4 数组类型5 类型别名6.函数类型7 对象类型8 接口9 元组10 类型推论11 类型断言12 字面量类型13 枚举14 any 类型15 typeof概述 TypeScript 是 JS 的超集,TS 提供了 JS 的所有功能,并且额外的增加了…...
Mac M1 使用Centos8➕VMware Fusion进行静态网络配置
大部分的流程网络上面都有当我们已经下载好mac m1版的Centos8链接: https://pan.baidu.com/s/1UTl4Lo-_c17s-PDj3dA6kA 提取码: 7xh2 和VMware Fusionhttps://www.vmware.com/cn/products/fusion.html之后就可以进行安装了在导入过后 记得将硬盘和内存都设置好了 记得在关机状态…...
RadGraph: Extracting Clinical Entities and Relations from Radiology Reports代码
文章来源:NeurIPS 文章类别:IE(Information Extraction) RadGraph主要基于dygie,主要文件为inference.py。 inference.py: 1、get_file_list(data_path) def get_file_list(path):file_list [item for item in glob.glob(f&q…...
[ICLR 2022]How Much Can CLIP Benefit Vision-and-Language Tasks?
论文网址:pdf 英文是纯手打的!论文原文的summarizing and paraphrasing。可能会出现难以避免的拼写错误和语法错误,若有发现欢迎评论指正!文章偏向于笔记,谨慎食用 目录 1. 心得 2. 论文逐段精读 2.1. Abstract 2…...
Linux云原生安全:零信任架构与机密计算
Linux云原生安全:零信任架构与机密计算 构建坚不可摧的云原生防御体系 引言:云原生安全的范式革命 随着云原生技术的普及,安全边界正在从传统的网络边界向工作负载内部转移。Gartner预测,到2025年,零信任架构将成为超…...
GitHub 趋势日报 (2025年06月08日)
📊 由 TrendForge 系统生成 | 🌐 https://trendforge.devlive.org/ 🌐 本日报中的项目描述已自动翻译为中文 📈 今日获星趋势图 今日获星趋势图 884 cognee 566 dify 414 HumanSystemOptimization 414 omni-tools 321 note-gen …...
微信小程序云开发平台MySQL的连接方式
注:微信小程序云开发平台指的是腾讯云开发 先给结论:微信小程序云开发平台的MySQL,无法通过获取数据库连接信息的方式进行连接,连接只能通过云开发的SDK连接,具体要参考官方文档: 为什么? 因为…...
数据库分批入库
今天在工作中,遇到一个问题,就是分批查询的时候,由于批次过大导致出现了一些问题,一下是问题描述和解决方案: 示例: // 假设已有数据列表 dataList 和 PreparedStatement pstmt int batchSize 1000; // …...
在WSL2的Ubuntu镜像中安装Docker
Docker官网链接: https://docs.docker.com/engine/install/ubuntu/ 1、运行以下命令卸载所有冲突的软件包: for pkg in docker.io docker-doc docker-compose docker-compose-v2 podman-docker containerd runc; do sudo apt-get remove $pkg; done2、设置Docker…...
算法笔记2
1.字符串拼接最好用StringBuilder,不用String 2.创建List<>类型的数组并创建内存 List arr[] new ArrayList[26]; Arrays.setAll(arr, i -> new ArrayList<>()); 3.去掉首尾空格...
VM虚拟机网络配置(ubuntu24桥接模式):配置静态IP
编辑-虚拟网络编辑器-更改设置 选择桥接模式,然后找到相应的网卡(可以查看自己本机的网络连接) windows连接的网络点击查看属性 编辑虚拟机设置更改网络配置,选择刚才配置的桥接模式 静态ip设置: 我用的ubuntu24桌…...
Java数值运算常见陷阱与规避方法
整数除法中的舍入问题 问题现象 当开发者预期进行浮点除法却误用整数除法时,会出现小数部分被截断的情况。典型错误模式如下: void process(int value) {double half = value / 2; // 整数除法导致截断// 使用half变量 }此时...
【JavaSE】多线程基础学习笔记
多线程基础 -线程相关概念 程序(Program) 是为完成特定任务、用某种语言编写的一组指令的集合简单的说:就是我们写的代码 进程 进程是指运行中的程序,比如我们使用QQ,就启动了一个进程,操作系统就会为该进程分配内存…...
