Raft 一致性算法
Raft
Raft提供了一种在计算系统集群中分布状态机的通用方法,确保集群中的每个节点都同意一系列相同的状态转换。
一个Raft集群包含若干个服务器节点,通常为5个,这允许整个系统容忍2个节点的失效。每个节点处于以下三种状态之一:
- follower(跟随者):所有节点都以follower的状态开始。如果没有收到leader的消息则会变成candidate状态。
- candidate(候选人):会向其他节点“拉选票”,如果得到大部分的票则成为leader,这个过程叫做Leader选举(Leader Election)。未当选Leader的节点会将状态转换为follower。
- leader(领导者):所有对系统的修改都会先经过leader。
raft 是 etcd 和consoul的核心算法。
Raft 一致性算法
- Raft通过选出一个leader来简化日志副本的管理,例如,日志项(log entry)只允许从leader流向follower。
- 基于leader的方法,Raft算法可以分解成三个子问题。
- Leader election(领导选举):原来的leader挂掉后,必须选出一个新的leader。
- Log replication(日志复制):leader从客户端接收日志,并复制到整个集群中。
- Safety(安全性):如果有任意的server将日志项回放到状态机中了,那么其他的server只会回放相同的日志项。
动画演示
地址:http://thesecretlivesofdata.com/raft/
网络节点必须是奇数个。
动画主要包含三部分内容:
- 第一部分介绍简单版的领导者选举和日志复制的过程
- 第二部分介绍详细版的领导者选举和日志复制过程
- 第三部分介绍如果遇到网络分区(脑裂),raft算法是如何回复网络一致的。
Leader election(领导选举)
- Raft 使用一种心跳机制来触发领导人选举
- 当服务器程序启动时,节点都是 follower(跟随者) 身份
- 如果一个跟随者在一段时间里没有接收到任何消息,也就是选举超时,然后他就会认为系统中没有可用的领导者然后开始进行选举以选出新的领导者
- 要开始一次选举过程,follower 会给当前term加1并且转换成candidate状态,然后它会并行的向集群中的其他服务器节点发送请求投票的 RPCs 来给自己投票。
- 候选人的状态维持直到发生以下任何一个条件发生的时候
- 他自己赢得了这次的选举
- 其他的服务器成为领导者
- 一段时间之后没有任何一个获胜的人
Log replication(日志复制)
- 当选出 leader 后,它会开始接收客户端请求,每个请求会带有一个指令,可以被回放到状态机中
- leader 把指令追加成一个log entry,然后通过AppendEntries RPC并行地发送给其他的server,当该entry被多数server复制后,leader 会把该entry回放到状态机中,然后把结果返回给客户端
- 当 follower 宕机或者运行较慢时,leader 会无限地重发AppendEntries给这些follower,直到所有的follower都复制了该log entry
- raft的log replication要保证如果两个log entry有相同的index和term,那么它们存储相同的指令
- leader在一个特定的term和index下,只会创建一个log entry
代码
package mainimport ("os""strconv""fmt""sync""net/rpc""net/http""math/rand""time""log"
)//对每个节点id和端口的封装类型
type nodeInfo struct {id stringport string
}//声明节点对象类型Raft
type Raft struct {node nodeInfomu sync.Mutex//当前节点编号me intcurrentTerm intvotedFor intstate inttimeout intcurrentLeader int//该节点最后一次处理数据的时间lastMessageTime int64message chan booleclectCh chan boolheartbeat chan bool//子节点给主节点返回心跳信号heartbeatRe chan bool
}//声明leader对象
type Leader struct {//任期Term int//leader 编号LeaderId int
}//设置节点个数
const raftCount = 2var leader = Leader{0, -1}
//存储缓存信息
var bufferMessage = make(map[string]string)
//处理数据库信息
var mysqlMessage = make(map[string]string)
//操作消息数组下标
var messageId = 1
//用nodeTable存储每个节点中的键值对
var nodeTable map[string]stringfunc main() {//终端接收来的是数组if len(os.Args) > 1 {//接收终端输入的信息userId := os.Args[1]//字符串转换整型id, _ := strconv.Atoi(userId)fmt.Println(id)//定义节点id和端口号nodeTable = map[string]string{"1": ":8000","2": ":8001",}//封装nodeInfo对象node := nodeInfo{id: userId, port: nodeTable[userId]}//创建节点对象rf := Make(id)//确保每个新建立的节点都有端口对应//127.0.0.1:8000rf.node = node//注册rpcgo func() {//注册rpc,为了实现远程链接rf.raftRegisterRPC(node.port)}()if userId == "1" {go func() {//回调方法http.HandleFunc("/req", rf.getRequest)fmt.Println("监听8080")if err := http.ListenAndServe(":8080", nil); err != nil {fmt.Println(err)return}}()}}for {;}
}var clientWriter http.ResponseWriterfunc (rf *Raft) getRequest(writer http.ResponseWriter, request *http.Request) {request.ParseForm()if len(request.Form["age"]) > 0 {clientWriter = writerfmt.Println("主节点广播客户端请求age:", request.Form["age"][0])param := Param{Msg: request.Form["age"][0], MsgId: strconv.Itoa(messageId)}messageId++if leader.LeaderId == rf.me {rf.sendMessageToOtherNodes(param)} else {//将消息转发给leaderleaderId := nodeTable[strconv.Itoa(leader.LeaderId)]//连接远程rpc服务rpc, err := rpc.DialHTTP("tcp", "127.0.0.1"+leaderId)if err != nil {log.Fatal("\nrpc转发连接server错误:", leader.LeaderId, err)}var bo = false//首先给leader传递err = rpc.Call("Raft.ForwardingMessage", param, &bo)if err != nil {log.Fatal("\nrpc转发调用server错误:", leader.LeaderId, err)}}}
}func (rf *Raft) sendMessageToOtherNodes(param Param) {bufferMessage[param.MsgId] = param.Msg// 只有领导才能给其它服务器发送消息if rf.currentLeader == rf.me {var success_count = 0fmt.Printf("领导者发送数据中 。。。\n")go func() {rf.broadcast(param, "Raft.LogDataCopy", func(ok bool) {//需要其它服务端回应rf.message <- ok})}()for i := 0; i < raftCount-1; i++ {fmt.Println("等待其它服务端回应")select {case ok := <-rf.message:if ok {success_count++if success_count >= raftCount/2 {rf.mu.Lock()rf.lastMessageTime = milliseconds()mysqlMessage[param.MsgId] = bufferMessage[param.MsgId]delete(bufferMessage, param.MsgId)if clientWriter != nil {fmt.Fprintf(clientWriter, "OK")}fmt.Printf("\n领导者发送数据结束\n")rf.mu.Unlock()}}}}}
}//注册Raft对象,注册后的目的为确保每个节点(raft) 可以远程接收
func (node *Raft) raftRegisterRPC(port string) {//注册一个服务器rpc.Register(node)//把服务绑定到http协议上rpc.HandleHTTP()err := http.ListenAndServe(port, nil)if err != nil {fmt.Println("注册rpc服务失败", err)}
}//创建节点对象
func Make(me int) *Raft {rf := &Raft{}rf.me = merf.votedFor = -1//0 follower ,1 candidate ,2 leaderrf.state = 0rf.timeout = 0rf.currentLeader = -1rf.setTerm(0)//初始化通道rf.message = make(chan bool)rf.heartbeat = make(chan bool)rf.heartbeatRe = make(chan bool)rf.eclectCh = make(chan bool)//每个节点都有选举权go rf.election()//每个节点都有心跳功能go rf.sendLeaderHeartBeat()return rf
}//选举成功后,应该广播所有的节点,本节点成为了leader
func (rf *Raft) sendLeaderHeartBeat() {for {select {case <-rf.heartbeat:rf.sendAppendEntriesImpl()}}
}func (rf *Raft) sendAppendEntriesImpl() {if rf.currentLeader == rf.me {var success_count = 0go func() {param := Param{Msg: "leader heartbeat",Arg: Leader{rf.currentTerm, rf.me}}rf.broadcast(param, "Raft.Heartbeat", func(ok bool) {rf.heartbeatRe <- ok})}()for i := 0; i < raftCount-1; i++ {select {case ok := <-rf.heartbeatRe:if ok {success_count++if success_count >= raftCount/2 {rf.mu.Lock()rf.lastMessageTime = milliseconds()fmt.Println("接收到了子节点们的返回信息")rf.mu.Unlock()}}}}}
}func randomRange(min, max int64) int64 {//设置随机时间rand.Seed(time.Now().UnixNano())return rand.Int63n(max-min) + min
}//获得当前时间(毫秒)
func milliseconds() int64 {return time.Now().UnixNano() / int64(time.Millisecond)
}func (rf *Raft) election() {var result bool//每隔一段时间发一次心跳for {//延时时间timeout := randomRange(1500, 3000)//设置该节点最有一次处理消息的时间rf.lastMessageTime = milliseconds()select {//间隔时间为1500-3000ms的随机值case <-time.After(time.Duration(timeout) * time.Millisecond):}result = falsefor !result {//选择leaderresult = rf.election_one_round(&leader)}}
}func (rf *Raft) election_one_round(args *Leader) bool {//已经有了leader,并且不是自己,那么returnif args.LeaderId > -1 && args.LeaderId != rf.me {fmt.Printf("%d已是leader,终止%d选举\n", args.LeaderId, rf.me)return true}var timeout int64var vote intvar triggerHeartbeat booltimeout = 2000last := milliseconds()success := falserf.mu.Lock()rf.becomeCandidate()rf.mu.Unlock()fmt.Printf("candidate=%d start electing leader\n", rf.me)for {fmt.Printf("candidate=%d send request vote to server\n", rf.me)go func() {rf.broadcast(Param{Msg: "send request vote"}, "Raft.ElectingLeader", func(ok bool) {//无论成功失败都需要发送到通道 避免堵塞rf.eclectCh <- ok})}()vote = 0triggerHeartbeat = falsefor i := 0; i < raftCount-1; i++ {fmt.Printf("candidate=%d waiting for select for i=%d\n", rf.me, i)select {case ok := <-rf.eclectCh:if ok {vote++success = vote >= raftCount/2 || rf.currentLeader > -1if success && !triggerHeartbeat {fmt.Println("okok", args)triggerHeartbeat = truerf.mu.Lock()rf.becomeLeader()args.Term = rf.currentTerm + 1args.LeaderId = rf.merf.mu.Unlock()fmt.Printf("candidate=%d becomes leader\n", rf.currentLeader)rf.heartbeat <- true}}}fmt.Printf("candidate=%d complete for select for i=%d\n", rf.me, i)}if (timeout+last < milliseconds()) || (vote >= raftCount/2 || rf.currentLeader > -1) {break} else {select {case <-time.After(time.Duration(5000) * time.Millisecond):}}}fmt.Printf("candidate=%d receive votes status=%t\n", rf.me, success)return success
}func (rf *Raft) becomeLeader() {rf.state = 2fmt.Println(rf.me, "成为了leader")rf.currentLeader = rf.me
}//设置发送参数的数据类型
type Param struct {Msg stringMsgId stringArg Leader
}func (rf *Raft) broadcast(msg Param, path string, fun func(ok bool)) {//设置不要自己给自己广播for nodeID, port := range nodeTable {if nodeID == rf.node.id {continue;}//链接远程rpcrp, err := rpc.DialHTTP("tcp", "127.0.0.1"+port)if err != nil {fun(false)continue}var bo = falseerr = rp.Call(path, msg, &bo)if err != nil {fun(false)continue}fun(bo)}}func (rf *Raft) becomeCandidate() {if rf.state == 0 || rf.currentLeader == -1 {//候选人状态rf.state = 1rf.votedFor = rf.merf.setTerm(rf.currentTerm + 1)rf.currentLeader = -1}
}func (rf *Raft) setTerm(term int) {rf.currentTerm = term
}//Rpc处理
func (rf *Raft) ElectingLeader(param Param, a *bool) error {//给leader投票*a = truerf.lastMessageTime = milliseconds()return nil
}func (rf *Raft) Heartbeat(param Param, a *bool) error {fmt.Println("\nrpc:heartbeat:", rf.me, param.Msg)if param.Arg.Term < rf.currentTerm {*a = false} else {leader = param.Argfmt.Printf("%d收到leader%d心跳\n", rf.currentLeader, leader.LeaderId)*a = truerf.mu.Lock()rf.currentLeader = leader.LeaderIdrf.votedFor = leader.LeaderIdrf.state = 0rf.lastMessageTime = milliseconds()fmt.Printf("server = %d learned that leader = %d\n", rf.me, rf.currentLeader)rf.mu.Unlock()}return nil
}//连接到leader节点
func (rf *Raft) ForwardingMessage(param Param, a *bool) error {fmt.Println("\nrpc:forwardingMessage:", rf.me, param.Msg)rf.sendMessageToOtherNodes(param)*a = truerf.lastMessageTime = milliseconds()return nil
}//接收leader传过来的日志
func (r *Raft) LogDataCopy(param Param, a *bool) error {fmt.Println("\nrpc:LogDataCopy:", r.me, param.Msg)bufferMessage[param.MsgId] = param.Msg*a = truereturn nil
}
Reference
老男孩 Go 5期
相关文章:
Raft 一致性算法
Raft Raft提供了一种在计算系统集群中分布状态机的通用方法,确保集群中的每个节点都同意一系列相同的状态转换。 一个Raft集群包含若干个服务器节点,通常为5个,这允许整个系统容忍2个节点的失效。每个节点处于以下三种状态之一: …...
驱动程序开发:基于EC20 4G模块自动拨号联网的两种方式(GobiNet工具拨号和PPP工具拨号)
目录一、EC20 4G模块简介二、根据移远官方文档修改EC20 4G模组驱动 1、因为EC20 4G模组min-pice接口其实就是usb接口,因此需要修改Linux内核源码drivers/usb/serial/option.c文件,如下图: 2、根据USB协议的要求,需要在drive…...
Web自动化测试——常见问题篇
文章目录一、什么是自动化测试二、为啥进行自动化测试(优点)三、Webdriver 的工作原理四、显示等待和隐式等待的区别五、什么样的项目适合做自动化六、自动化测试的流程七、如何分析生成的自动化测试报告一、什么是自动化测试 所谓的自动化测试就是使用…...
快速实现Modbus TCP转BACnet IP协议的方案
一、需求背景 BACnet是用于智能楼宇自控系统的主流通信协议,可用在暖通空调系统(HVAC,包括暖气、通风、空气调节),也可以用在照明控制、门禁系统、火警侦测系统及其相关的设备。楼宇中的受控设备都通过BACnet协议连接到…...
Unity CircleLayoutGroup 如何实现一个圆形自动布局组件
文章目录简介实现原理Editor 编辑器简介 Unity中提供了三种类型的自动布局组件,分别是Grid Layou Group、Horizontal Layout Group、Vertical Layout Group,本文自定义了一个圆形的自动布局组件Circle Layout Group,如图所示: Ra…...
springcloud+nacos+gateway案例
一、先搭建好springcloudnacos项目地址:https://javazhong.blog.csdn.net/article/details/128899999二、spring cloud gateway简述Spring Cloud Gateway 是Spring Cloud家族中的一款API网关。Gateway 建立在 Spring Webflux上,目标是提供一个简洁、高效的API网关&a…...
实习这么久,你知道Maven是如何从代码仓库中找到需要的依赖吗?
目录 碎碎念 Maven是如何找到代码仓库里需要的依赖的? 如何根据坐标在本地仓库中寻找所需要的依赖? 如何根据坐标在远程仓库中寻找所需要的依赖? Maven 如何使用 HTTP 或 HTTPS 协议从远程仓库中获取依赖项,请详细解释其原理…...
低代码/零代码的快速开发框架
目前国内主流的低代码开发平台有:宜搭、简道云、明道云、云程、氚云、伙伴云、道一云、JEPaaS、华炎魔方、搭搭云、JeecgBoot 、RuoYi等。这些平台各有优劣势,定位也不同,用户可以根据自己需求选择。 一、阿里云宜搭 宜搭是阿里巴巴集团在20…...
C# 中常见的设计模式
设计模式是一套被广泛应用于软件设计的最佳实践,它们可以帮助开发者解决特定的问题,提高代码的可重用性、可读性和可维护性。本文将介绍 C# 中常见的几种设计模式,并提供相应的示例代码。 工厂模式 工厂模式是一种创建型设计模式,…...
promethues/servicemonitor
目录 1.promethues 能保证源源不断地采集/metrics 信息吗?每次都是最新的吗 2.部署servicemonitor 的作用是什么? 3.pod 部署采集数据直接上报promthues ,不通过servicemonitor 可以吗? 4.你说的"此外,如果部署…...
postman使用简介
1、介绍 postman是一款功能强大的网页调试和模拟发送HTTP请求的Chrome插件,支持几乎所有类型的HTTP请求 2、下载及安装 官方文档:https://www.getpostman.com/docs/v6/ chrome插件:chrome浏览器应用商店直接搜索添加即可(需墙&…...
@DS注解在事务中实现数据源的切换@DS在事务中失效【已解决】
在Springboot的application.yml中的配置: spring:datasource:url: jdbc:mysql://localhost:3306/test2?serverTimezoneUTC&useUnicodetrue&characterEncodingutf8driver-class-name: com.mysql.cj.jdbc.Driverusername: rootpassword: rootdynamic:primar…...
Java I/O之文件系统
一、全文概览 在学习文件系统之前,需要了解下Java在I/O上的发展史:在Java7之前,打开和读取文件需要编写特别笨拙的代码,涉及到很多的InputStream、OutputStream等组合起来使用,每次在使用时或许都需要查一下文档才能记…...
Mysql元数据获取方法(information_schema绕过方法)
前提:如果waf或其它过滤了information_schema关键字,那我们该如何获取元数据呢?能够代替information_schema的有:sys.schema_auto_increment_columnssys.schema_table_statistics_with_bufferx$schema_table_statistics_with_buff…...
Eclipse快捷键
* 1.补全代码的声明:alt /* 2.快速修复: ctrl 1 * 3.批量导包:ctrl shift o* 4.使用单行注释:ctrl /* 5.使用多行注释: ctrl shift / * 6.取消多行注释:ctrl shift \* 7.复制指定行的代码:ctrl a…...
java ssm自习室选座预约系统开发springmvc
人工管理显然已无法应对时代的变化,而自习室选座预约系统开发能很好地解决这一问题,既能提高人力物力,又能提高预约选座的知名度,取代人工管理是必然趋势。 本自习室选座预约系统开发以SSM作为框架,JSP技术,…...
分享我从功能测试转型到测试开发的真实故事
由于这段时间我面试了很多家公司,也经历了之前公司的不愉快。所以我想写一篇文章来分享一下自己的面试体会。希望能对我在之后的工作或者面试中有一些帮助,也希望能帮助到正在找工作的你。 找工作 我们总是草率地进入一个自己不了解的公司工作…...
TypeScript快速入门———(二)TypeScript常用类型
文章目录概述1 类型注解2 常用基础类型概述3.原始类型4 数组类型5 类型别名6.函数类型7 对象类型8 接口9 元组10 类型推论11 类型断言12 字面量类型13 枚举14 any 类型15 typeof概述 TypeScript 是 JS 的超集,TS 提供了 JS 的所有功能,并且额外的增加了…...
Mac M1 使用Centos8➕VMware Fusion进行静态网络配置
大部分的流程网络上面都有当我们已经下载好mac m1版的Centos8链接: https://pan.baidu.com/s/1UTl4Lo-_c17s-PDj3dA6kA 提取码: 7xh2 和VMware Fusionhttps://www.vmware.com/cn/products/fusion.html之后就可以进行安装了在导入过后 记得将硬盘和内存都设置好了 记得在关机状态…...
RadGraph: Extracting Clinical Entities and Relations from Radiology Reports代码
文章来源:NeurIPS 文章类别:IE(Information Extraction) RadGraph主要基于dygie,主要文件为inference.py。 inference.py: 1、get_file_list(data_path) def get_file_list(path):file_list [item for item in glob.glob(f&q…...
华为云AI开发平台ModelArts
华为云ModelArts:重塑AI开发流程的“智能引擎”与“创新加速器”! 在人工智能浪潮席卷全球的2025年,企业拥抱AI的意愿空前高涨,但技术门槛高、流程复杂、资源投入巨大的现实,却让许多创新构想止步于实验室。数据科学家…...
web vue 项目 Docker化部署
Web 项目 Docker 化部署详细教程 目录 Web 项目 Docker 化部署概述Dockerfile 详解 构建阶段生产阶段 构建和运行 Docker 镜像 1. Web 项目 Docker 化部署概述 Docker 化部署的主要步骤分为以下几个阶段: 构建阶段(Build Stage):…...
Debian系统简介
目录 Debian系统介绍 Debian版本介绍 Debian软件源介绍 软件包管理工具dpkg dpkg核心指令详解 安装软件包 卸载软件包 查询软件包状态 验证软件包完整性 手动处理依赖关系 dpkg vs apt Debian系统介绍 Debian 和 Ubuntu 都是基于 Debian内核 的 Linux 发行版ÿ…...
【SpringBoot】100、SpringBoot中使用自定义注解+AOP实现参数自动解密
在实际项目中,用户注册、登录、修改密码等操作,都涉及到参数传输安全问题。所以我们需要在前端对账户、密码等敏感信息加密传输,在后端接收到数据后能自动解密。 1、引入依赖 <dependency><groupId>org.springframework.boot</groupId><artifactId...
【JVM】- 内存结构
引言 JVM:Java Virtual Machine 定义:Java虚拟机,Java二进制字节码的运行环境好处: 一次编写,到处运行自动内存管理,垃圾回收的功能数组下标越界检查(会抛异常,不会覆盖到其他代码…...
pam_env.so模块配置解析
在PAM(Pluggable Authentication Modules)配置中, /etc/pam.d/su 文件相关配置含义如下: 配置解析 auth required pam_env.so1. 字段分解 字段值说明模块类型auth认证类模块,负责验证用户身份&am…...
TRS收益互换:跨境资本流动的金融创新工具与系统化解决方案
一、TRS收益互换的本质与业务逻辑 (一)概念解析 TRS(Total Return Swap)收益互换是一种金融衍生工具,指交易双方约定在未来一定期限内,基于特定资产或指数的表现进行现金流交换的协议。其核心特征包括&am…...
ios苹果系统,js 滑动屏幕、锚定无效
现象:window.addEventListener监听touch无效,划不动屏幕,但是代码逻辑都有执行到。 scrollIntoView也无效。 原因:这是因为 iOS 的触摸事件处理机制和 touch-action: none 的设置有关。ios有太多得交互动作,从而会影响…...
均衡后的SNRSINR
本文主要摘自参考文献中的前两篇,相关文献中经常会出现MIMO检测后的SINR不过一直没有找到相关数学推到过程,其中文献[1]中给出了相关原理在此仅做记录。 1. 系统模型 复信道模型 n t n_t nt 根发送天线, n r n_r nr 根接收天线的 MIMO 系…...
Netty从入门到进阶(二)
二、Netty入门 1. 概述 1.1 Netty是什么 Netty is an asynchronous event-driven network application framework for rapid development of maintainable high performance protocol servers & clients. Netty是一个异步的、基于事件驱动的网络应用框架,用于…...
