Raft 一致性算法
Raft
Raft提供了一种在计算系统集群中分布状态机的通用方法,确保集群中的每个节点都同意一系列相同的状态转换。
一个Raft集群包含若干个服务器节点,通常为5个,这允许整个系统容忍2个节点的失效。每个节点处于以下三种状态之一:
- follower(跟随者):所有节点都以follower的状态开始。如果没有收到leader的消息则会变成candidate状态。
- candidate(候选人):会向其他节点“拉选票”,如果得到大部分的票则成为leader,这个过程叫做Leader选举(Leader Election)。未当选Leader的节点会将状态转换为follower。
- leader(领导者):所有对系统的修改都会先经过leader。
raft 是 etcd 和consoul的核心算法。
Raft 一致性算法
- Raft通过选出一个leader来简化日志副本的管理,例如,日志项(log entry)只允许从leader流向follower。
- 基于leader的方法,Raft算法可以分解成三个子问题。
- Leader election(领导选举):原来的leader挂掉后,必须选出一个新的leader。
- Log replication(日志复制):leader从客户端接收日志,并复制到整个集群中。
- Safety(安全性):如果有任意的server将日志项回放到状态机中了,那么其他的server只会回放相同的日志项。
动画演示
地址:http://thesecretlivesofdata.com/raft/
网络节点必须是奇数个。
动画主要包含三部分内容:
- 第一部分介绍简单版的领导者选举和日志复制的过程
- 第二部分介绍详细版的领导者选举和日志复制过程
- 第三部分介绍如果遇到网络分区(脑裂),raft算法是如何回复网络一致的。
Leader election(领导选举)
- Raft 使用一种心跳机制来触发领导人选举
- 当服务器程序启动时,节点都是 follower(跟随者) 身份
- 如果一个跟随者在一段时间里没有接收到任何消息,也就是选举超时,然后他就会认为系统中没有可用的领导者然后开始进行选举以选出新的领导者
- 要开始一次选举过程,follower 会给当前term加1并且转换成candidate状态,然后它会并行的向集群中的其他服务器节点发送请求投票的 RPCs 来给自己投票。
- 候选人的状态维持直到发生以下任何一个条件发生的时候
- 他自己赢得了这次的选举
- 其他的服务器成为领导者
- 一段时间之后没有任何一个获胜的人
Log replication(日志复制)
- 当选出 leader 后,它会开始接收客户端请求,每个请求会带有一个指令,可以被回放到状态机中
- leader 把指令追加成一个log entry,然后通过AppendEntries RPC并行地发送给其他的server,当该entry被多数server复制后,leader 会把该entry回放到状态机中,然后把结果返回给客户端
- 当 follower 宕机或者运行较慢时,leader 会无限地重发AppendEntries给这些follower,直到所有的follower都复制了该log entry
- raft的log replication要保证如果两个log entry有相同的index和term,那么它们存储相同的指令
- leader在一个特定的term和index下,只会创建一个log entry
代码
package mainimport ("os""strconv""fmt""sync""net/rpc""net/http""math/rand""time""log"
)//对每个节点id和端口的封装类型
type nodeInfo struct {id stringport string
}//声明节点对象类型Raft
type Raft struct {node nodeInfomu sync.Mutex//当前节点编号me intcurrentTerm intvotedFor intstate inttimeout intcurrentLeader int//该节点最后一次处理数据的时间lastMessageTime int64message chan booleclectCh chan boolheartbeat chan bool//子节点给主节点返回心跳信号heartbeatRe chan bool
}//声明leader对象
type Leader struct {//任期Term int//leader 编号LeaderId int
}//设置节点个数
const raftCount = 2var leader = Leader{0, -1}
//存储缓存信息
var bufferMessage = make(map[string]string)
//处理数据库信息
var mysqlMessage = make(map[string]string)
//操作消息数组下标
var messageId = 1
//用nodeTable存储每个节点中的键值对
var nodeTable map[string]stringfunc main() {//终端接收来的是数组if len(os.Args) > 1 {//接收终端输入的信息userId := os.Args[1]//字符串转换整型id, _ := strconv.Atoi(userId)fmt.Println(id)//定义节点id和端口号nodeTable = map[string]string{"1": ":8000","2": ":8001",}//封装nodeInfo对象node := nodeInfo{id: userId, port: nodeTable[userId]}//创建节点对象rf := Make(id)//确保每个新建立的节点都有端口对应//127.0.0.1:8000rf.node = node//注册rpcgo func() {//注册rpc,为了实现远程链接rf.raftRegisterRPC(node.port)}()if userId == "1" {go func() {//回调方法http.HandleFunc("/req", rf.getRequest)fmt.Println("监听8080")if err := http.ListenAndServe(":8080", nil); err != nil {fmt.Println(err)return}}()}}for {;}
}var clientWriter http.ResponseWriterfunc (rf *Raft) getRequest(writer http.ResponseWriter, request *http.Request) {request.ParseForm()if len(request.Form["age"]) > 0 {clientWriter = writerfmt.Println("主节点广播客户端请求age:", request.Form["age"][0])param := Param{Msg: request.Form["age"][0], MsgId: strconv.Itoa(messageId)}messageId++if leader.LeaderId == rf.me {rf.sendMessageToOtherNodes(param)} else {//将消息转发给leaderleaderId := nodeTable[strconv.Itoa(leader.LeaderId)]//连接远程rpc服务rpc, err := rpc.DialHTTP("tcp", "127.0.0.1"+leaderId)if err != nil {log.Fatal("\nrpc转发连接server错误:", leader.LeaderId, err)}var bo = false//首先给leader传递err = rpc.Call("Raft.ForwardingMessage", param, &bo)if err != nil {log.Fatal("\nrpc转发调用server错误:", leader.LeaderId, err)}}}
}func (rf *Raft) sendMessageToOtherNodes(param Param) {bufferMessage[param.MsgId] = param.Msg// 只有领导才能给其它服务器发送消息if rf.currentLeader == rf.me {var success_count = 0fmt.Printf("领导者发送数据中 。。。\n")go func() {rf.broadcast(param, "Raft.LogDataCopy", func(ok bool) {//需要其它服务端回应rf.message <- ok})}()for i := 0; i < raftCount-1; i++ {fmt.Println("等待其它服务端回应")select {case ok := <-rf.message:if ok {success_count++if success_count >= raftCount/2 {rf.mu.Lock()rf.lastMessageTime = milliseconds()mysqlMessage[param.MsgId] = bufferMessage[param.MsgId]delete(bufferMessage, param.MsgId)if clientWriter != nil {fmt.Fprintf(clientWriter, "OK")}fmt.Printf("\n领导者发送数据结束\n")rf.mu.Unlock()}}}}}
}//注册Raft对象,注册后的目的为确保每个节点(raft) 可以远程接收
func (node *Raft) raftRegisterRPC(port string) {//注册一个服务器rpc.Register(node)//把服务绑定到http协议上rpc.HandleHTTP()err := http.ListenAndServe(port, nil)if err != nil {fmt.Println("注册rpc服务失败", err)}
}//创建节点对象
func Make(me int) *Raft {rf := &Raft{}rf.me = merf.votedFor = -1//0 follower ,1 candidate ,2 leaderrf.state = 0rf.timeout = 0rf.currentLeader = -1rf.setTerm(0)//初始化通道rf.message = make(chan bool)rf.heartbeat = make(chan bool)rf.heartbeatRe = make(chan bool)rf.eclectCh = make(chan bool)//每个节点都有选举权go rf.election()//每个节点都有心跳功能go rf.sendLeaderHeartBeat()return rf
}//选举成功后,应该广播所有的节点,本节点成为了leader
func (rf *Raft) sendLeaderHeartBeat() {for {select {case <-rf.heartbeat:rf.sendAppendEntriesImpl()}}
}func (rf *Raft) sendAppendEntriesImpl() {if rf.currentLeader == rf.me {var success_count = 0go func() {param := Param{Msg: "leader heartbeat",Arg: Leader{rf.currentTerm, rf.me}}rf.broadcast(param, "Raft.Heartbeat", func(ok bool) {rf.heartbeatRe <- ok})}()for i := 0; i < raftCount-1; i++ {select {case ok := <-rf.heartbeatRe:if ok {success_count++if success_count >= raftCount/2 {rf.mu.Lock()rf.lastMessageTime = milliseconds()fmt.Println("接收到了子节点们的返回信息")rf.mu.Unlock()}}}}}
}func randomRange(min, max int64) int64 {//设置随机时间rand.Seed(time.Now().UnixNano())return rand.Int63n(max-min) + min
}//获得当前时间(毫秒)
func milliseconds() int64 {return time.Now().UnixNano() / int64(time.Millisecond)
}func (rf *Raft) election() {var result bool//每隔一段时间发一次心跳for {//延时时间timeout := randomRange(1500, 3000)//设置该节点最有一次处理消息的时间rf.lastMessageTime = milliseconds()select {//间隔时间为1500-3000ms的随机值case <-time.After(time.Duration(timeout) * time.Millisecond):}result = falsefor !result {//选择leaderresult = rf.election_one_round(&leader)}}
}func (rf *Raft) election_one_round(args *Leader) bool {//已经有了leader,并且不是自己,那么returnif args.LeaderId > -1 && args.LeaderId != rf.me {fmt.Printf("%d已是leader,终止%d选举\n", args.LeaderId, rf.me)return true}var timeout int64var vote intvar triggerHeartbeat booltimeout = 2000last := milliseconds()success := falserf.mu.Lock()rf.becomeCandidate()rf.mu.Unlock()fmt.Printf("candidate=%d start electing leader\n", rf.me)for {fmt.Printf("candidate=%d send request vote to server\n", rf.me)go func() {rf.broadcast(Param{Msg: "send request vote"}, "Raft.ElectingLeader", func(ok bool) {//无论成功失败都需要发送到通道 避免堵塞rf.eclectCh <- ok})}()vote = 0triggerHeartbeat = falsefor i := 0; i < raftCount-1; i++ {fmt.Printf("candidate=%d waiting for select for i=%d\n", rf.me, i)select {case ok := <-rf.eclectCh:if ok {vote++success = vote >= raftCount/2 || rf.currentLeader > -1if success && !triggerHeartbeat {fmt.Println("okok", args)triggerHeartbeat = truerf.mu.Lock()rf.becomeLeader()args.Term = rf.currentTerm + 1args.LeaderId = rf.merf.mu.Unlock()fmt.Printf("candidate=%d becomes leader\n", rf.currentLeader)rf.heartbeat <- true}}}fmt.Printf("candidate=%d complete for select for i=%d\n", rf.me, i)}if (timeout+last < milliseconds()) || (vote >= raftCount/2 || rf.currentLeader > -1) {break} else {select {case <-time.After(time.Duration(5000) * time.Millisecond):}}}fmt.Printf("candidate=%d receive votes status=%t\n", rf.me, success)return success
}func (rf *Raft) becomeLeader() {rf.state = 2fmt.Println(rf.me, "成为了leader")rf.currentLeader = rf.me
}//设置发送参数的数据类型
type Param struct {Msg stringMsgId stringArg Leader
}func (rf *Raft) broadcast(msg Param, path string, fun func(ok bool)) {//设置不要自己给自己广播for nodeID, port := range nodeTable {if nodeID == rf.node.id {continue;}//链接远程rpcrp, err := rpc.DialHTTP("tcp", "127.0.0.1"+port)if err != nil {fun(false)continue}var bo = falseerr = rp.Call(path, msg, &bo)if err != nil {fun(false)continue}fun(bo)}}func (rf *Raft) becomeCandidate() {if rf.state == 0 || rf.currentLeader == -1 {//候选人状态rf.state = 1rf.votedFor = rf.merf.setTerm(rf.currentTerm + 1)rf.currentLeader = -1}
}func (rf *Raft) setTerm(term int) {rf.currentTerm = term
}//Rpc处理
func (rf *Raft) ElectingLeader(param Param, a *bool) error {//给leader投票*a = truerf.lastMessageTime = milliseconds()return nil
}func (rf *Raft) Heartbeat(param Param, a *bool) error {fmt.Println("\nrpc:heartbeat:", rf.me, param.Msg)if param.Arg.Term < rf.currentTerm {*a = false} else {leader = param.Argfmt.Printf("%d收到leader%d心跳\n", rf.currentLeader, leader.LeaderId)*a = truerf.mu.Lock()rf.currentLeader = leader.LeaderIdrf.votedFor = leader.LeaderIdrf.state = 0rf.lastMessageTime = milliseconds()fmt.Printf("server = %d learned that leader = %d\n", rf.me, rf.currentLeader)rf.mu.Unlock()}return nil
}//连接到leader节点
func (rf *Raft) ForwardingMessage(param Param, a *bool) error {fmt.Println("\nrpc:forwardingMessage:", rf.me, param.Msg)rf.sendMessageToOtherNodes(param)*a = truerf.lastMessageTime = milliseconds()return nil
}//接收leader传过来的日志
func (r *Raft) LogDataCopy(param Param, a *bool) error {fmt.Println("\nrpc:LogDataCopy:", r.me, param.Msg)bufferMessage[param.MsgId] = param.Msg*a = truereturn nil
}
Reference
老男孩 Go 5期
相关文章:
Raft 一致性算法
Raft Raft提供了一种在计算系统集群中分布状态机的通用方法,确保集群中的每个节点都同意一系列相同的状态转换。 一个Raft集群包含若干个服务器节点,通常为5个,这允许整个系统容忍2个节点的失效。每个节点处于以下三种状态之一: …...
驱动程序开发:基于EC20 4G模块自动拨号联网的两种方式(GobiNet工具拨号和PPP工具拨号)
目录一、EC20 4G模块简介二、根据移远官方文档修改EC20 4G模组驱动 1、因为EC20 4G模组min-pice接口其实就是usb接口,因此需要修改Linux内核源码drivers/usb/serial/option.c文件,如下图: 2、根据USB协议的要求,需要在drive…...
Web自动化测试——常见问题篇
文章目录一、什么是自动化测试二、为啥进行自动化测试(优点)三、Webdriver 的工作原理四、显示等待和隐式等待的区别五、什么样的项目适合做自动化六、自动化测试的流程七、如何分析生成的自动化测试报告一、什么是自动化测试 所谓的自动化测试就是使用…...
快速实现Modbus TCP转BACnet IP协议的方案
一、需求背景 BACnet是用于智能楼宇自控系统的主流通信协议,可用在暖通空调系统(HVAC,包括暖气、通风、空气调节),也可以用在照明控制、门禁系统、火警侦测系统及其相关的设备。楼宇中的受控设备都通过BACnet协议连接到…...
Unity CircleLayoutGroup 如何实现一个圆形自动布局组件
文章目录简介实现原理Editor 编辑器简介 Unity中提供了三种类型的自动布局组件,分别是Grid Layou Group、Horizontal Layout Group、Vertical Layout Group,本文自定义了一个圆形的自动布局组件Circle Layout Group,如图所示: Ra…...
springcloud+nacos+gateway案例
一、先搭建好springcloudnacos项目地址:https://javazhong.blog.csdn.net/article/details/128899999二、spring cloud gateway简述Spring Cloud Gateway 是Spring Cloud家族中的一款API网关。Gateway 建立在 Spring Webflux上,目标是提供一个简洁、高效的API网关&a…...
实习这么久,你知道Maven是如何从代码仓库中找到需要的依赖吗?
目录 碎碎念 Maven是如何找到代码仓库里需要的依赖的? 如何根据坐标在本地仓库中寻找所需要的依赖? 如何根据坐标在远程仓库中寻找所需要的依赖? Maven 如何使用 HTTP 或 HTTPS 协议从远程仓库中获取依赖项,请详细解释其原理…...
低代码/零代码的快速开发框架
目前国内主流的低代码开发平台有:宜搭、简道云、明道云、云程、氚云、伙伴云、道一云、JEPaaS、华炎魔方、搭搭云、JeecgBoot 、RuoYi等。这些平台各有优劣势,定位也不同,用户可以根据自己需求选择。 一、阿里云宜搭 宜搭是阿里巴巴集团在20…...
C# 中常见的设计模式
设计模式是一套被广泛应用于软件设计的最佳实践,它们可以帮助开发者解决特定的问题,提高代码的可重用性、可读性和可维护性。本文将介绍 C# 中常见的几种设计模式,并提供相应的示例代码。 工厂模式 工厂模式是一种创建型设计模式,…...
promethues/servicemonitor
目录 1.promethues 能保证源源不断地采集/metrics 信息吗?每次都是最新的吗 2.部署servicemonitor 的作用是什么? 3.pod 部署采集数据直接上报promthues ,不通过servicemonitor 可以吗? 4.你说的"此外,如果部署…...
postman使用简介
1、介绍 postman是一款功能强大的网页调试和模拟发送HTTP请求的Chrome插件,支持几乎所有类型的HTTP请求 2、下载及安装 官方文档:https://www.getpostman.com/docs/v6/ chrome插件:chrome浏览器应用商店直接搜索添加即可(需墙&…...
@DS注解在事务中实现数据源的切换@DS在事务中失效【已解决】
在Springboot的application.yml中的配置: spring:datasource:url: jdbc:mysql://localhost:3306/test2?serverTimezoneUTC&useUnicodetrue&characterEncodingutf8driver-class-name: com.mysql.cj.jdbc.Driverusername: rootpassword: rootdynamic:primar…...
Java I/O之文件系统
一、全文概览 在学习文件系统之前,需要了解下Java在I/O上的发展史:在Java7之前,打开和读取文件需要编写特别笨拙的代码,涉及到很多的InputStream、OutputStream等组合起来使用,每次在使用时或许都需要查一下文档才能记…...
Mysql元数据获取方法(information_schema绕过方法)
前提:如果waf或其它过滤了information_schema关键字,那我们该如何获取元数据呢?能够代替information_schema的有:sys.schema_auto_increment_columnssys.schema_table_statistics_with_bufferx$schema_table_statistics_with_buff…...
Eclipse快捷键
* 1.补全代码的声明:alt /* 2.快速修复: ctrl 1 * 3.批量导包:ctrl shift o* 4.使用单行注释:ctrl /* 5.使用多行注释: ctrl shift / * 6.取消多行注释:ctrl shift \* 7.复制指定行的代码:ctrl a…...
java ssm自习室选座预约系统开发springmvc
人工管理显然已无法应对时代的变化,而自习室选座预约系统开发能很好地解决这一问题,既能提高人力物力,又能提高预约选座的知名度,取代人工管理是必然趋势。 本自习室选座预约系统开发以SSM作为框架,JSP技术,…...
分享我从功能测试转型到测试开发的真实故事
由于这段时间我面试了很多家公司,也经历了之前公司的不愉快。所以我想写一篇文章来分享一下自己的面试体会。希望能对我在之后的工作或者面试中有一些帮助,也希望能帮助到正在找工作的你。 找工作 我们总是草率地进入一个自己不了解的公司工作…...
TypeScript快速入门———(二)TypeScript常用类型
文章目录概述1 类型注解2 常用基础类型概述3.原始类型4 数组类型5 类型别名6.函数类型7 对象类型8 接口9 元组10 类型推论11 类型断言12 字面量类型13 枚举14 any 类型15 typeof概述 TypeScript 是 JS 的超集,TS 提供了 JS 的所有功能,并且额外的增加了…...
Mac M1 使用Centos8➕VMware Fusion进行静态网络配置
大部分的流程网络上面都有当我们已经下载好mac m1版的Centos8链接: https://pan.baidu.com/s/1UTl4Lo-_c17s-PDj3dA6kA 提取码: 7xh2 和VMware Fusionhttps://www.vmware.com/cn/products/fusion.html之后就可以进行安装了在导入过后 记得将硬盘和内存都设置好了 记得在关机状态…...
RadGraph: Extracting Clinical Entities and Relations from Radiology Reports代码
文章来源:NeurIPS 文章类别:IE(Information Extraction) RadGraph主要基于dygie,主要文件为inference.py。 inference.py: 1、get_file_list(data_path) def get_file_list(path):file_list [item for item in glob.glob(f&q…...
告别小白屏!树莓派3.5寸/5寸屏幕驱动安装全攻略(含HDMI/GPIO款区分与镜像下载)
树莓派外接屏幕终极指南:从驱动安装到故障排查一站式解决 树莓派爱好者们常常会遇到一个令人头疼的问题——当你兴冲冲地连接上一块3.5寸或5寸的小屏幕,期待立即开始项目开发时,迎接你的却是一片刺眼的白屏。这种情况在非官方屏幕中尤为常见&…...
OpenClaw+百川2-13B-4bits:自媒体人的内容创作流水线搭建
OpenClaw百川2-13B-4bits:自媒体人的内容创作流水线搭建 1. 为什么需要自动化内容流水线 作为一个长期运营科技类自媒体的创作者,我每天需要完成热点追踪、大纲构思、初稿撰写、排版发布等一系列重复性工作。最痛苦的不是写作本身,而是大量…...
深度学习训练中loss震荡与不收敛的常见原因及实战调优策略
1. 为什么你的模型loss像过山车?先看懂这些典型症状 第一次打开TensorBoard看到自己的loss曲线像心电图一样上蹿下跳,那种感觉就像新手司机开车时方向盘失控。其实loss震荡和不收敛是深度学习中再常见不过的问题,但不同表现背后藏着完全不同的…...
手把手教你用STM32CubeMX配置LCD1602显示:HAL库驱动移植+Proteus 8.12仿真
STM32CubeMX与Proteus联合开发:LCD1602显示实战指南 在嵌入式开发领域,STM32CubeMX和Proteus的组合为开发者提供了从硬件配置到软件仿真的完整解决方案。本文将深入探讨如何利用这两个工具链实现LCD1602液晶显示屏的驱动与显示功能,特别针对从…...
计算机毕业设计springboot研友帮系统设计与实现 基于SpringBoot的考研互助社区平台开发与实现 SpringBoot框架下研究生学术协作系统的设计与应用
计算机毕业设计springboot研友帮系统设计与实现w2zpm5oh (配套有源码 程序 mysql数据库 论文) 本套源码可以在文本联xi,先看具体系统功能演示视频领取,可分享源码参考。 随着研究生招生规模的持续扩大,考研竞争日益激烈࿰…...
从猫狗识别到工业质检:深入理解PyTorch中的sample_weight,让模型更‘关注’关键样本
从猫狗识别到工业质检:深入理解PyTorch中的sample_weight,让模型更‘关注’关键样本 在工业质检和医疗影像分析中,某些样本的误判代价可能比其他样本高出一个数量级。想象一下,在半导体缺陷检测中漏判一个微小裂纹,或在…...
LuatOS扩展库API——【airlbs 】airlbs 定位服务
LuatOS 是物联网终端开发的常用工具,为轻量级嵌入式 Lua 脚本运行框架兼实时系统,基于 Lua 5.3 深度优化,适配 4G-Cat.1、MCU 等物联网终端硬件。其以 Lua 脚本开发,采用协程多任务架构,配套完善开发资源,含…...
告别特征点!FAST-LIVO2的‘直接法’融合:如何用原始点云和图像块实现更快的SLAM?
FAST-LIVO2:直接法SLAM的革命性突破与工程实践指南 1. 直接法SLAM的技术演进与核心价值 当波士顿动力的Atlas机器人完成后空翻动作时,其核心定位系统正面临着与人类体操运动员相似的挑战——如何在高速运动中维持对环境的精确感知。这正是FAST-LIVO2这类…...
基于FreeSWITCH ESL构建高并发智能客服系统的实战指南
在构建智能客服系统时,通信层的稳定与高效是基石。传统的WebSocket或直接SIP处理在高并发场景下,常常面临连接管理复杂、事件处理混乱、资源消耗大等问题。FreeSWITCH作为成熟的软交换平台,其ESL(Event Socket Library)…...
智能写作工坊:OpenClaw+Qwen3.5-9B辅助小说创作
智能写作工坊:OpenClawQwen3.5-9B辅助小说创作 1. 为什么需要AI辅助写作? 作为一个业余小说创作者,我长期面临三个核心痛点:世界观设定碎片化、人物关系维护困难和情节发展缺乏新意。传统写作软件如Scrivener虽然提供了素材管理…...
