当前位置: 首页 > news >正文

从0-1实现简易Raft分布式共识算法

一、Raft前置简介

Raft目前是最著名的分布式共识性算法,被广泛的应用在各种分布式框架、组件中,如Redis、RocketMq、Kafka、Nacos(CP)等

根据Raft论文,可将Raft拆分为如下4个功能模块

  • 领导者选举
  • 日志同步、心跳
  • 持久化
  • 日志压缩,快照(本文未实现)

这4个模块彼此并不完全独立,如日志的同步情况左右着领导者选举,快照也影响着日志同步等等;为了前后的递进性,对于一些功能的实现,可能会出现改动和优化,比如日志同步实现后,在数据持久化部分又会对同步做一些优化,提高主、从节点日志冲突解决的性能。

这里就不再过多的介绍了,看本文之前请先简单了解一下Raft算法,提供如下资料:

  • Raft算法在线动画演示
  • 图解分布式共识算法Paxos协议
  • 浅谈分布式一致性:Raft 与 SOFAJRaft
  • 深入剖析共识性算法 Raft
  • 深入解读Raft算法与etcd工程实现
  • Raft一致性算法论文-译文
  • SOFA-JRaft:蚂蚁金服的Raft算法实现库(JAVA版)

本文实现不完全和Raft论文一致,做了不少改动,核心思想不变,请悉知!!

二、功能流程简介

你看完上述资料,应该对Raft有一个基本了解了,本文我们实现了一个Raft算法下的简易版的KV存储,我将它拆分成一下几个角色:

RPC模块:复制各节点间的信息传递,如心跳、日志、选举等等

节点模块:节点有三种状态leader、follow、candidate,每种状态下所要做的事是不一样的

状态机:负责节点状态的变更,日志持久化一致性处理,投票一致性处理

定时任务:leader需要定时发送心跳,follow需要定时检测leader是否存活等

日志模块:日志需要持久化在本地文件,还需要给其他节点同步

以上几个角色相互配合,实现以下几个主要功能流程:

1.选举流程

实现细节下面深究,这里暂不过多介绍,简单了解一下大致流程,大体就是:

  1. Follow节点发现Leader节点挂了,则升级为Candidate节点发起投票
  2. 其他Follow节点收到投票请求后,根据条件判断是否投票给它,True或者False
  3. Candidate一旦收到的投票通过请求过半,则升级为Leader
  4. 升级Leader后发送心跳,阻止其他Follow变成Candidate

在这里插入图片描述

2.心跳流程

注意:这里和原文有区别,我将心跳和日志做了拆分,不再耦合了,因为我觉得在没有客户端请求的情况下,记录这些心跳日志没有意义,在没有数据日志或者说数据日志水平都是一样的情况下,谁做Leader我觉得都OK

实现细节下面深究,这里暂不过多介绍,简单了解一下大致流程,大体就是:

  1. Leader会定时发送心跳请求给Follow,告诉它我还活着,防止它篡位
  2. Follow收到心跳后返回一个心跳响应
  3. Leader收到的心跳响应没有过半则自动降级成为Follow停止对外服务

(为什么要心跳响应,还要自动降级?后面咱们细说)

在这里插入图片描述

3.KV客户端请求流程

因为我们要做的是一个简易版KV嘛,那肯定有客户端发送命令嘛对不对:

  1. 客户端发送SET或者GET命令,集群返回成功或者数据
  2. 发送SET命令,只有Leader会处理,同步给其他Follow,然后根据结果返回成功还是失败
  3. 发送GET命令,目前也只有Leader会处理,返回对应数据,没有就null(GET没有日志产生)

节点间日志的同步持久化后面细说,这里也看的出来为什么分布式体系下CAP不能共存了,你想要高可用,性能好,就必须在请求leader刷盘成功后返回甚至异步刷盘,这就必然导致可能存在数据丢失或者主从数据不一致的情况,如果你想要一致性,就必须在节点日志都同步完成后才返回(下面我们将日志同步流程)

在这里插入图片描述

4.日志同步流程

上面说过了,我们将心跳和日志做了拆分,只有客户端请求SET命令才会产生日志

  1. Leader收到客户端请求后,先预提交到内存中,后发送预提交命令给所有Follow
  2. Follow收到Leader的预提交命令同样先提交到内存,然后响应Leader
  3. Leader一旦收到超过半数的Follow响应则执行刷盘持久化,否则给客户端响应失败
  4. Leader刷盘成功后,给所有Follow发送刷盘请求,然后给客户端响应成功(无需关心Follow刷盘结果)

这就是很典型的CP流程,保持了一致性和数据不丢失,但大大降低了性能(发现没有尽管这样做,依旧可能存在Follow数据丢失的情况,比如:我是新加入的Follow节点、Follow节点刷盘失败等等情况,那该怎么办呢,我们下面接着来补充)

在这里插入图片描述

5.日志校验流程

正如上所说,日志依旧存在丢失的风险,我们需要做一个日志校验定时任务,定时校验日志是否丢失,由于这个和日志的设计息息相关,所以我们后面在细说,这里简单过一遍流程

  1. follow会有一个定时任务,定时Check日志文件,寻找缺失的日志
  2. 如果有则拿到缺失的日志发送拉取请求到Leader,获取对应的日志
  3. 然后填充进日志文件,这样就一定保持了和Leader日志数据对齐了

难道每次都要从头到尾扫描一次文件吗?当然不是,扫描过的不需要扫描,有checkPoint,每次只是从checkPoint扫描到lastLogIndex

在这里插入图片描述

三、模块简介

1.RPC模块

这里我们采用Netty框架来做,每个节点即是Client又是Server

按原Raft算法来说,一共有以下几种RPC类型的通信:

RequestVote RPC - 请求投票 RPC,由 Candidate 在选举期间发起。
AppendEntries RPC - 附加条目 RPC,由 Leader 发起,用来复制日志和提供一种心跳机制。

但是我将它进行了一个拆分,拆分的更细了:

  • RequestVoteRPC-请求投票 RPC,由 Candidate 在选举期间发起。
  • RequestVoteResult-投票响应RPC,由follow投票
  • HeartBeatRequest-心跳RPC,由leader定时不间断发起
  • HeartBeatResult-心跳响应RPC,由follow响应
  • AppendEntriesPreCommit-日志预提交RPC,由leader发起预提交
  • AppendEntriesPreCommitResult-日志预提交响应RPC,由follow响应
  • AppendEntriesCommit-日志提交RPC,预提交成功后,leader会发起真正提交的命令
  • LogIndexPull-日志拉取RPC,follow定时检测发现自身存在日志丢失,向leader主动拉取日志
  • LogIndexPullResult-日志拉取响应RPC,leader发现follow存在日志缺失,把日志发给follow
  • ClientRequest-客户端请求RPC,KV存储的客户端,向集群发出的命令
  • ClientResponse-客户端请求响应RPC,对客户端的响应

分别对应着一个实体类:

在这里插入图片描述

RPC整体的编解码设计,序列化等等,都和我之前写的RPC框架差不多,这里就不在过多的介绍了,有兴趣可以看看我的:如何从0-1手写一个RPC框架

这里只介绍一下相比原来做出的调整,原来RPC框架传输的实体是固定的,而现在多了很多,而且大量涉及到同步请求返回,所以相比原来新增了泛型的处理,如下示例,两行代码就搞定了一次请求:

RpcSession<ClientResponse, ClientRequest> rpcSession = RpcSessionFactory.<ClientResponse, ClientRequest>openSession(serverConfig, clientRequest);
ClientResponse clientResponse = rpcSession.<ClientResponse>syncSend(4000L);

同时支持:同步等待、超时等待、异步三种请求方式:

public interface RpcSession<R,T>{<R> R syncSend();<R> R syncSend(long timeout);void asyncSend();
}

感兴趣的建议自己看看,RPC所在目录和Netty所有Handler如下:

在这里插入图片描述

2.节点模块

节点有三种类型,leader、candidate、follow,所以我这抽象出一个节点接口,三种实现,一个统一对外服务,一个全局节点信息类

一个节点接口

public interface RaftNode {/** 客户端的请求,会产生日志 : 只有leader才会处理,follow返回leader地址,candidate拒绝 */ClientResponse clientRequestHandler(ClientRequest command,List<ServerConfig> serverConfigs) throws ExecutionException, InterruptedException;/** leader发来的log预处理:会先缓存 */AppendEntriesPreCommitResult logPreCommitHandler(AppendEntriesPreCommit appendEntriesPreCommit);/** leader发来的log提交请求 */void logCommitHandler(AppendEntriesCommit appendEntriesCommit);/** follow发来的log拉取请求 */LogIndexPullResult sendLogPullRequest(List<Long> pullLogIndex);/** leader要处理follow的拉取请求 */LogIndexPullResult logPullRequestHandler(LogIndexPull logIndexPull);/** 发起投票 : 只有候选者 才会发起 */boolean callVoteRequest(List<ServerConfig> serverConfigs) throws ExecutionException, InterruptedException;/** 投票请求处理 */RequestVoteResult voteRequestHandler(RequestVoteRPC voteRPC);/** 发起心跳 : 只有领导才会发起心跳 阻止其他节点成为候选人*/boolean callHeartBeatRequest(List<ServerConfig>serverConfigs) throws ExecutionException, InterruptedException;/** 心跳请求处理 : 只有追随者/候选人才会处理*/HeartBeatResult heartBeatHandler(HeartBeatRequest heartBeatRequest);}

三种实现

在这里插入图片描述

一个对外服务

public class RaftNodeService {private static final Logger log = LoggerFactory.getLogger(RaftNodeService.class);// 心跳间隔时间private final static long INTERVAL_TIME = 1500L;private static Map<NodeStatusEnums, RaftNode> raftNodeMap = new ConcurrentHashMap<>(8);static {raftNodeMap.put(NodeStatusEnums.LEADER, new LeaderRaftNode());raftNodeMap.put(NodeStatusEnums.CANDIDATE, new CandidateRaftNode());raftNodeMap.put(NodeStatusEnums.FOLLOW, new FollowRaftNode());}/*** 节点信息初始化*/public static void raftNodeInit(ServerConfig self, List<ServerConfig> clusterConfig) {RaftNodeInfo.getInstance().setSelf(self);RaftNodeInfo.getInstance().setClusterConfig(clusterConfig);RaftNodeInfo.getInstance().setCurrentNodeStatus(NodeStatusEnums.FOLLOW);createElectionTask();}/*** 发送心跳*/public synchronized static void sendHeartBeat() {RaftNode raftNode = raftNodeMap.get(RaftNodeInfo.getInstance().getCurrentNodeStatus());boolean result = false;try {result = raftNode.callHeartBeatRequest(RaftNodeInfo.getInstance().getClusterConfig());} catch (ExecutionException e) {log.debug(" {}: 完了,作为leader发送心跳失败了:{}", RaftNodeInfo.getInstance().getSelf().toString(), e.getMessage(), e);} catch (InterruptedException e) {log.debug(" {}: 完了,作为leader发送心跳失败了:{}", RaftNodeInfo.getInstance().getSelf().toString(), e.getMessage(), e);}if (!result) {// 代表心跳失败了,状态已经变更了// 需要停止心跳,开启心跳检测heartBeatTestDestroy();createElectionTask();}}/*** 心跳处理*/public static void heartBeatHandler(HeartBeatRequest request, Channel channel) {ThreadPoolUtils.nettyServerAsyncPool.execute(() -> {RaftNode raftNode = raftNodeMap.get(RaftNodeInfo.getInstance().getCurrentNodeStatus());channel.writeAndFlush(new RpcRemoteMsg<HeartBeatResult>(raftNode.heartBeatHandler(request)));// 收到了心跳,所以就要停止当前心跳的检测,然后重新开启一个检测任务createElectionTask();});}// 命令合规性校验 目前就get set 随便校验一下public static boolean commandCheck(String command) {if (command == null || !"SET_GET".contains(command.split(" ")[0]) || command.split(" ").length < 2) {return false;}return true;}/*** 客户端的请求,以KV为例 就是set命令 , 这里请求返回就简陋一点*/public static void clientRequestHandler(ClientRequest request, Channel channel) {ThreadPoolUtils.nettyServerAsyncPool.execute(() -> {ClientResponse clientResponse = ClientResponse.builder().build();clientResponse.setRequestId(request.getRequestId());if (!commandCheck(request.getCommand())) {clientResponse.setCode(401);clientResponse.setMsg("命令格式不正确");channel.writeAndFlush(new RpcRemoteMsg<ClientResponse>(clientResponse));return;}// 只有set命令才需要发送日志,get命令直接取数据就行了String[] command = request.getCommand().split(" ");if (command[0].equals("SET")) {RaftNode raftNode = raftNodeMap.get(RaftNodeInfo.getInstance().getCurrentNodeStatus());try {channel.writeAndFlush(new RpcRemoteMsg<ClientResponse>(raftNode.clientRequestHandler(request, RaftNodeInfo.getInstance().getClusterConfig())));return;} catch (ExecutionException e) {log.debug(" {}: 日志提交失败了:{}", request.getCommand(), e.getMessage(), e);clientResponse.setCode(500);clientResponse.setMsg(e.getMessage());channel.writeAndFlush(new RpcRemoteMsg<ClientResponse>(clientResponse));} catch (InterruptedException e) {log.debug(" {}: 日志提交失败了:{}", RaftNodeInfo.getInstance().getSelf().toString(), e.getMessage(), e);clientResponse.setCode(500);clientResponse.setMsg(e.getMessage());channel.writeAndFlush(new RpcRemoteMsg<ClientResponse>(clientResponse));}} else {// get命令直接取值clientResponse.setCode(200);clientResponse.setData(RaftNodeInfo.getInstance().getLogManage().getDataByKey(command[1]));channel.writeAndFlush(new RpcRemoteMsg<ClientResponse>(clientResponse));}});}/*** Log预提交请求*/public static void logPreCommitHandler(AppendEntriesPreCommit request, Channel channel) {ThreadPoolUtils.nettyServerAsyncPool.execute(() -> {RaftNode raftNode = raftNodeMap.get(RaftNodeInfo.getInstance().getCurrentNodeStatus());channel.writeAndFlush(new RpcRemoteMsg<AppendEntriesPreCommitResult>(raftNode.logPreCommitHandler(request)));});}/*** Log提交请求*/public static void logCommitHandler(AppendEntriesCommit request) {ThreadPoolUtils.nettyServerAsyncPool.execute(() -> {RaftNode raftNode = raftNodeMap.get(RaftNodeInfo.getInstance().getCurrentNodeStatus());raftNode.logCommitHandler(request);// 收到了日志,所以就要停止当前心跳的检测,然后重新开启一个检测任务createElectionTask();});}/*** 发起投票*/public synchronized static void sendCallVote() {RaftNode raftNode = raftNodeMap.get(RaftNodeInfo.getInstance().getCurrentNodeStatus());boolean result = false;try {result = raftNode.callVoteRequest(RaftNodeInfo.getInstance().getClusterConfig());} catch (ExecutionException e) {StateMachines.becomeFollow(RaftNodeInfo.getInstance().getCurrentTerm(), null, null);log.debug(" {}: 完了,作为candidate发起投票失败了:{}", RaftNodeInfo.getInstance().getSelf().toString(), e.getMessage(), e);} catch (InterruptedException e) {StateMachines.becomeFollow(RaftNodeInfo.getInstance().getCurrentTerm(), null, null);log.debug(" {}: 完了,作为candidate发起投票失败了:{}", RaftNodeInfo.getInstance().getSelf().toString(), e.getMessage(), e);}if (!result) {// 代表发起投票失败了,状态已经变更了// 需要重新开启一个检测任务createElectionTask();return;}// 投票成功了 需要开启心跳任务createHearBeatTask();}/*** 发起投票请求处理*/public synchronized static void callVoteHandler(RequestVoteRPC requestVoteRPC, Channel channel) {ThreadPoolUtils.nettyServerAsyncPool.execute(() -> {RaftNode raftNode = raftNodeMap.get(RaftNodeInfo.getInstance().getCurrentNodeStatus());channel.writeAndFlush(new RpcRemoteMsg<RequestVoteResult>(raftNode.voteRequestHandler(requestVoteRPC)));});}/*** 发起Log拉取请求*/public synchronized static LogIndexPullResult sendLogPullRequest(List<Long> pullLogIndex) {if (CollectionUtil.isNotEmpty(pullLogIndex)) {RaftNode raftNode = raftNodeMap.get(RaftNodeInfo.getInstance().getCurrentNodeStatus());return raftNode.sendLogPullRequest(pullLogIndex);}return null;}/*** 发起Log拉取请求处理*/public synchronized static void logPullRequestHandler(LogIndexPull logIndexPull, Channel channel) {ThreadPoolUtils.nettyServerAsyncPool.execute(() -> {RaftNode raftNode = raftNodeMap.get(RaftNodeInfo.getInstance().getCurrentNodeStatus());log.debug("{}:拉取日志:{}",channel.remoteAddress(),JSONObject.toJSON(logIndexPull));channel.writeAndFlush(new RpcRemoteMsg<LogIndexPullResult>(raftNode.logPullRequestHandler(logIndexPull)));});}/*** 销毁并创建心跳检测任务*/public static void createElectionTask() {long randomTime = getRandomTime();final long intervalTime = INTERVAL_TIME + randomTime;// 先销毁之前的electionTaskDestroy();//开启新的ScheduledFuture<?> schedule = ThreadPoolUtils.hearBeatAsyncPool.schedule(new ElectionTask(intervalTime), intervalTime, TimeUnit.MILLISECONDS);RaftNodeInfo.getInstance().setElectionTask(schedule);}/*** 销毁并创建心跳任务*/public static void createHearBeatTask() {// 先销毁之前的heartBeatTestDestroy();//开启新的ScheduledFuture<?> schedule = ThreadPoolUtils.hearBeatAsyncPool.scheduleAtFixedRate(new HeartBeatTask(), 0L, INTERVAL_TIME, TimeUnit.MILLISECONDS);RaftNodeInfo.getInstance().setElectionTask(schedule);}public static long getRandomTime() {// 要比心跳慢一点return RandomUtil.randomLong(250L, 1000L);}/*** 销毁心跳检测任务*/public static void electionTaskDestroy() {if (null != RaftNodeInfo.getInstance().getElectionTask()) {RaftNodeInfo.getInstance().getElectionTask().cancel(true);RaftNodeInfo.getInstance().setElectionTask(null);}}/*** 销毁心跳任务*/public static void heartBeatTestDestroy() {if (null != RaftNodeInfo.getInstance().getHeartBeatTask()) {RaftNodeInfo.getInstance().getHeartBeatTask().cancel(true);RaftNodeInfo.getInstance().setHeartBeatTask(null);}}

一个全局节点信息类

public class RaftNodeInfo {/*** 自己*/private ServerConfig self;/*** 集群其他节点信息*/private List<ServerConfig> clusterConfig;/*** 当前节点状态 默认FOLLOW*/private volatile NodeStatusEnums currentNodeStatus = NodeStatusEnums.FOLLOW;/*** 当前节点任期*/private volatile long currentTerm = 0L;/*** 当前leader*/private volatile String currentLeaderId;/*** 最后日志索引 已提交的*/private volatile long lastLogIndex = 0L;/*** 最后的日志任期 这我这没用到*/private volatile long lastLogTerm = 0L;/*** 当前任期给谁投过票*/private volatile String voteFor;/*** 最近更新时间  心跳或者日志更新**/private volatile long lastUpdateTime = 0L;/*** 心跳任务**/private ScheduledFuture heartBeatTask;/*** 心跳检测任务**/private ScheduledFuture electionTask;/*** 日志管理**/private LogManage logManage;/*** 日志文件**/private String logPath;
}

3.状态机

提供节点状态变更、心跳结果处理、投票结果处理、日志一致性处理

public class StateMachines {private static final Logger log = LoggerFactory.getLogger(StateMachines.class);/** 候选人-》leader */public static void becomeLeader(){// 变为leaderRaftNodeInfo.getInstance().setCurrentNodeStatus(NodeStatusEnums.LEADER);// leader设置为自己RaftNodeInfo.getInstance().setCurrentLeader(RaftNodeInfo.getInstance().getSelf().toString());// 票清了RaftNodeInfo.getInstance().setVoteFor(null);}/** follow-》候选人 */public static void becomeCandidate(){// 变为候选人RaftNodeInfo.getInstance().setCurrentNodeStatus(NodeStatusEnums.CANDIDATE);// 任期+1RaftNodeInfo.getInstance().setCallVoteTerm();// 给自己投一票RaftNodeInfo.getInstance().setVoteFor(RaftNodeInfo.getInstance().getSelf().toString());}/** 候选人、leader->follow */public static void becomeFollow(long term,String leaderId,String voteFor){RaftNodeInfo.getInstance().setCurrentNodeStatus(NodeStatusEnums.FOLLOW);RaftNodeInfo.getInstance().setCurrentLeader(leaderId);RaftNodeInfo.getInstance().setCurrentTerm(term);RaftNodeInfo.getInstance().setVoteFor(voteFor);RaftNodeInfo.getInstance().setLastUpdateTime(System.currentTimeMillis());}/** 投票结果一致性处理 */public static boolean voteResultHandler(List<Future<RequestVoteResult>> taskList,Integer nodeNum) throws ExecutionException, InterruptedException {int voteNum = 0;for (Future<RequestVoteResult> future : taskList) {RequestVoteResult voteResult = future.get();// 判断leader是否还存活 存活的话肯定要把我给否了呀if (leaderIsLive(voteResult)) {return false;}if(voteResult!=null){log.debug("投票结果,我的term:{} ,结果:{}",RaftNodeInfo.getInstance().getCurrentTerm(), JSONObject.toJSON(voteResult));}if (null != voteResult && voteResult.isVoteGranted()) {voteNum++;}}if (voteNum != 0 && voteNum >= (nodeNum / 2)) {// 投票通过 升级为leaderStateMachines.becomeLeader();log.debug(" {}: 哈哈哈,我升级为leader啦", RaftNodeInfo.getInstance().getSelf().toString());return true;} else {// 投票不通过,退成follow 继续苟着StateMachines.becomeFollow(RaftNodeInfo.getInstance().getCurrentTerm(), null, null);log.debug(" {}: 完了,这帮人不支持我,等待机会再试", RaftNodeInfo.getInstance().getSelf().toString());return false;}}// 判断leader是否还存活 存活的话肯定要把我给否了呀private static boolean leaderIsLive(RequestVoteResult voteResult) {if (null != voteResult && StrUtil.isNotEmpty(voteResult.getLeaderId())) {// 被leader一票否决,退成follow 继续苟着StateMachines.becomeFollow(voteResult.getTerm(), voteResult.getLeaderId(), null);return true;}return false;}/** 心跳结果一致性处理 */public static boolean heartBeatResultHandler(List<Future<HeartBeatResult>> taskList,Integer nodeNum) throws ExecutionException, InterruptedException {int responseNum = 0;for (Future<HeartBeatResult> future : taskList) {HeartBeatResult heartBeatResult = future.get();if (null != heartBeatResult) {responseNum++;}}if (responseNum != 0 && responseNum >= (nodeNum / 2)) {log.debug("{}: 万众一心,我再接再厉", RaftNodeInfo.getInstance().getSelf().toString());return true;} else {// 没有应答或者应答数量小于一半 就退化为候选者,并停止对外提供服务// 状态变更StateMachines.becomeFollow(RaftNodeInfo.getInstance().getCurrentTerm(), null, null);log.debug("{}: 我找不到追随者了,我暂时停止对外服务", RaftNodeInfo.getInstance().getSelf().toString());return false;}}/** 日志预提交结果 */public static boolean logPreCommitHandler(List<Future<AppendEntriesPreCommitResult>> taskList, Integer nodeNum) throws ExecutionException, InterruptedException {int responseNum = 0;for (Future<AppendEntriesPreCommitResult> future : taskList) {AppendEntriesPreCommitResult preCommitResult = future.get();if (null != preCommitResult && preCommitResult.isSuccess()) {responseNum++;}}return responseNum != 0 && responseNum >= (nodeNum / 2);}
}

4.日志模块

public interface LogManage extends ResourceLifeCycle{/** leader预提交 */long preCommitLog(LogEntity logEntity);/** follow预提交 */void preCommitLog(long preCommitLogId,LogEntity logEntity);/** 缓存移除 */void cacheLogRemove(long cacheLogId);/** leader日志提交 */long commitLog(long cacheLogId);/** follow日志提交 */void commitLog(long cacheLogId,long logIndex);/** follow日志Check */void logIndexCheck();/** 根据日志索引获取日志内容 */LogEntity getLogEntityByIndex(long logIndex, RandomAccessFile file);/** 命令数据处理 */void dataHandler(String command);/** 根据Key获取数据 */String getDataByKey(String key);
}

5.定时任务

在这里插入图片描述

  • ElectionTask:心跳检测任务,不通过则升级为Candidate
  • HeartBeatTask:心跳任务,不断给Follow发送心跳,阻止其成为Candidate
  • LogIndexCheckTask:Follow日志Check定时任务

四、核心流程介绍

其实流程图已经很清楚了,这里挑部分来聊聊

1.选举

目前心跳设置的时间为1500ms,心跳检测的时间为1750ms+0-750ms随机数(之前随机数设置的很短,算上网络延迟等因素,导致两个Candidate同任期的几率非常之高),follow收到心跳会更新lastUpdateTIme,而心跳检测则会检测这个时间到当前时间是否超过检测时间间隔,超过了则会变成candidate发起选举

在这里插入图片描述

CandidateRaftNode:发起选举RPC

选举RPC实体类

public class RequestVoteRPC extends RpcMsgId implements Serializable {/** 候选人的任期号  */private long term;/** 请求选票的候选人的 Id(ip:selfPort) */private String candidateId;/** 候选人的最后日志条目的索引值 */private long lastLogIndex;/** 候选人最后日志条目的任期号  */private long lastLogTerm;}

选举方法

    public boolean callVoteRequest(List<ServerConfig> serverConfigs) throws ExecutionException, InterruptedException {if (CollectionUtil.isEmpty(serverConfigs)) {StateMachines.becomeFollow(RaftNodeInfo.getInstance().getCurrentTerm(), null, null);log.error("只有一个节点,还发起什么投票?");return false;}// candidate 会发起投票请求RaftNodeInfo instance = RaftNodeInfo.getInstance();// 投票过程中 可能又收到了心跳或者日志,状态已经变为followif (!NODE_TYPE.equals(RaftNodeInfo.getInstance().getCurrentNodeStatus())) {return false;}log.debug(" {}: 哈哈哈,我发起了投票", RaftNodeInfo.getInstance().getSelf().toString());List<Future<RequestVoteResult>> taskList = new ArrayList<>(serverConfigs.size());// 加上自己的一票 需要 大于= n/2+1// 所以直接 >= n/2 就算通过了// 但是注意此时如果已经存在leader,日志数又不比当前leader大,所以leader还是leader 具有一票否决权for (ServerConfig serverConfig : serverConfigs) {Future<RequestVoteResult> voteResultFuture = ThreadPoolUtils.sendAsyncMsgPool.submit(() -> {// 构建投票RequestVoteRPC voteRPC = RequestVoteRPC.builder().candidateId(instance.getSelf().toString()).term(instance.getCurrentTerm())  // 成为候选 的时候任期就+1了.lastLogIndex(instance.getLastLogIndex()).build();RpcSession<RequestVoteResult, RequestVoteRPC> voteRPCRpcSession = RpcSessionFactory.<RequestVoteResult, RequestVoteRPC>openSession(serverConfig, voteRPC);return voteRPCRpcSession == null ? null : voteRPCRpcSession.syncSend(1000L);});taskList.add(voteResultFuture);}// 投票过程中 可能状态又已经变为followif (!NODE_TYPE.equals(RaftNodeInfo.getInstance().getCurrentNodeStatus())) {return false;}return StateMachines.voteResultHandler(taskList, serverConfigs.size());}

Follow选举响应

  1. 任期比我大我就同意
  2. 任期跟我一样,记录的日志比我多而且我没有投过票我也同意

(Follow同一个任期内只能投一票)

    public RequestVoteResult voteRequestHandler(RequestVoteRPC voteRPC) {// follow 需要处理投票请求RaftNodeInfo instance = RaftNodeInfo.getInstance();RequestVoteResult voteResult = RequestVoteResult.builder().term(instance.getCurrentTerm()).build();voteResult.setRequestId(voteRPC.getRequestId());// 1.任期比我大,我直接就同意if (voteRPC.getTerm() > instance.getCurrentTerm()) {return agreeVote(voteResult, voteRPC);}// 2.任期跟我一样,记录的日志比我多 而且 我没有投过票// 我只能投一票if ((voteRPC.getTerm() == instance.getCurrentTerm() && voteRPC.getLastLogIndex() >= instance.getLastLogIndex())&& (instance.getVoteFor() == null || instance.getVoteFor().equals(voteRPC.getCandidateId()))) {return agreeVote(voteResult, voteRPC);}voteResult.setTerm(instance.getCurrentTerm());voteResult.setVoteGranted(false);log.info(" {}: 我身为现任Follow,我不认可你的实力,我不能给你投票:{}", instance.getSelf().toString(), voteRPC.getCandidateId());return voteResult;}private RequestVoteResult agreeVote(RequestVoteResult voteResult, RequestVoteRPC voteRPC) {voteResult.setTerm(RaftNodeInfo.getInstance().getCurrentTerm());voteResult.setVoteGranted(true);RaftNodeInfo.getInstance().setCurrentTerm(voteRPC.getTerm());RaftNodeInfo.getInstance().setVoteFor(voteRPC.getCandidateId());log.info(" {}: 我身为现任Follow,我认可你的实力,我给你投票:{}", RaftNodeInfo.getInstance().getSelf().toString(), voteRPC.getCandidateId());return voteResult;}

Leader响应

leader有没有可能收到投票?有可能!假设某一个Follow延迟收到心跳或者没有收到心跳就会发起,那leader就会收到它发起的投票,那怎么办?判断任期和日志,任期和日志都比Leader大则Leader需要退位,否则Leader应该具有一票否决权(这样就防止了某个follow无限发起投票,任期无限+1这种情况)

一个candidate任期非常大的时候,其他follow必然会给他投票,那这样就升为leader就导致了同时存在两个leader的情况,所以这时候的当期leader应该具有一票否决权

    public RequestVoteResult voteRequestHandler(RequestVoteRPC voteRPC) {// leader 有可能收到 候选者的投票申请RaftNodeInfo instance = RaftNodeInfo.getInstance();RequestVoteResult requestVoteResult = RequestVoteResult.builder().build();requestVoteResult.setRequestId(voteRPC.getRequestId());// 候选人的任期比我大 而且日志还比我大 说明我已经out了,我需要退位if (voteRPC.getTerm() >= instance.getCurrentTerm() && voteRPC.getLastLogIndex() > instance.getLastLogIndex()) {// 状态变更StateMachines.becomeFollow(voteRPC.getTerm(), voteRPC.getCandidateId(), null);requestVoteResult.setTerm(voteRPC.getTerm());requestVoteResult.setVoteGranted(true);log.info(" {}: 我身为现任leader,我认可你的实力,我下位让贤:{}", instance.getSelf().toString(), voteRPC.getCandidateId());return requestVoteResult;}log.info(" {}: 我身为现任leader,不同你的上任请求:{}", instance.getSelf().toString(), voteRPC.getCandidateId());// 否则就不同意,而且你还得给我老实点requestVoteResult.setTerm(instance.getCurrentTerm());requestVoteResult.setVoteGranted(false);requestVoteResult.setLeaderId(instance.getSelf().toString());return requestVoteResult;}

2.心跳

在这里插入图片描述

心跳这里我做了一个响应降级的操作,其实正常是不需要的,我这的目的是防止网络分区!

假设原本是这样:

在这里插入图片描述

一旦网络分区则会变成这样,导致两个leader的出现,所以这时候心跳的响应就至关重要,一旦响应少于半数,则leader应该自动降级

在这里插入图片描述

LeaderRaftNode:发起心跳

public boolean callHeartBeatRequest(List<ServerConfig> serverConfigs) throws ExecutionException, InterruptedException {if (CollectionUtil.isEmpty(serverConfigs)) {StateMachines.becomeFollow(RaftNodeInfo.getInstance().getCurrentTerm(), null, null);log.debug(" {}: 只有一个leader,还发什么心跳?", RaftNodeInfo.getInstance().getSelf().toString());return false;}List<Future<HeartBeatResult>> taskList = new ArrayList<>(serverConfigs.size());// leader 需要发送心跳 防止网络分区,一旦心跳返回不足 n/2 则自动降级for (ServerConfig serverConfig : serverConfigs) {Future<HeartBeatResult> heartBeatResultFuture = ThreadPoolUtils.sendAsyncMsgPool.submit(() -> {HeartBeatRequest build = HeartBeatRequest.builder().leaderId(RaftNodeInfo.getInstance().getSelf().toString()).leaderLastCommitIndex(RaftNodeInfo.getInstance().getLastLogIndex()).term(RaftNodeInfo.getInstance().getCurrentTerm()).build();RpcSession<HeartBeatResult, HeartBeatRequest> heartBeatRequestRpcSession = RpcSessionFactory.<HeartBeatResult, HeartBeatRequest>openSession(serverConfig, build);return heartBeatRequestRpcSession == null ? null : heartBeatRequestRpcSession.syncSend(200L);});taskList.add(heartBeatResultFuture);}// 响应结果处理return StateMachines.heartBeatResultHandler(taskList, serverConfigs.size());}

3.日志

日志设计的非常之简陋,就不做过多的介绍了,本文目的还是以实现Raft为主,性能问题暂不考虑,不过还是说一下测试结果,因为KV存储,项目启动需要读取数据放入内存,目前读取50m左右文件10w条日志需要8s左右,肯定是不合理的,目前并没有做日志压缩和快照,也没有用零拷贝技术,因为不想搞的太过复杂

在这里插入图片描述

关于日志check,这里放上两种测试常见的结果

在这里插入图片描述

1.新的节点加入,需要拉取一次所有数据

在这里插入图片描述

2.日志中间缺失

在这里插入图片描述

两种情况都是没问题的!

五、遗留的问题

注意:尽管这样还是有几率导致数据丢失的!!!!

再次强调:本文不完全和Raft论文对标,加了不少个人的想法进去,所以在这个过程中都是遇到问题、思考问题、解决问题,这本就是一个学习的过程,目前最大的一个问题就是:

新加入的节点已经收到了Leader的数据,更新的lastCommitIndex,但是还没来得及向Leader同步以前的数据,而这时Leader挂了,所以这时候这个节点就有几率通过投票成为Leader,这时候数据就有几率丢失文章中可能看不太出来,具体得看看代码,这算是一个很严重的BUG,各位想想可以怎么解决,而Raft又是怎么解决的?

当然可能还有其他问题,各位大佬如果知道的也可以提出来

六、总结

只有深入本质才能顺应发展,在分布式体系下,共识算法是必不可少的,光看不实践就容易眼高手低,当初我看Raft的时候也感觉挺简单的,不就是三种状态做不同的事,然后状态变更嘛,真正一做起来就发现好多细节都需要考虑,这还只是个demo,回头想想RocketMq和kafka的存储设计是真的厉害,做完这个又收获不少

七、项目\个人博客地址

项目地址

个人博客 : 无八股,全干货

相关文章:

从0-1实现简易Raft分布式共识算法

一、Raft前置简介 Raft目前是最著名的分布式共识性算法&#xff0c;被广泛的应用在各种分布式框架、组件中&#xff0c;如Redis、RocketMq、Kafka、Nacos&#xff08;CP&#xff09;等 根据Raft论文&#xff0c;可将Raft拆分为如下4个功能模块&#xff1a; 领导者选举日志同…...

Spring 创建和使用

Spring 是⼀个包含了众多⼯具⽅法的 IoC 容器。既然是容器那么它就具备两个最基本的功能&#xff1a; 将对象存储到容器&#xff08;Spring&#xff09;中&#xff1b; 从容器中将对象取出来。 在 Java 语⾔中对象也叫做 Bean 1.创建 Spring 项目 接下来使⽤ Maven ⽅式来创…...

Javadoc comment自动生成

光标放在第二行 按下Alt Shift j 下面是Java doc的生成 Next Next-> Finish...

vue3 +ts 报错 index.vue 不是模块

那是因为index.vue中创建了一个空的script标签&#xff0c;而且语法使用的是ts语法。vue-cli会用ts语法解析和校验 如果是无状态组件&#xff0c;删掉 如果是有状态组件&#xff0c;导出该组件的实例 去掉null的script后&#xff1a;...

win10 hadoop报错 unable to load native-hadoop library

win10 安装hadoop执行hdfs -namenode format 和运行hadoop的start-all报错 unable to load native-hadoop library 验证&#xff1a; hadoop checknative -a 这个命令返回都是false是错的 返回下图是正确的 winutils: true D:\soft\hadoop-3.0.0\bin\winutils.exe Native li…...

前端(九)——探索微信小程序、Vue、React和Uniapp生命周期

&#x1f642;博主&#xff1a;小猫娃来啦 &#x1f642;文章核心&#xff1a;探索微信小程序、Vue、React和Uniapp生命周期 文章目录 微信小程序、Vue、React和Uniapp的基本定义和应用领域微信小程序生命周期生命周期概述页面生命周期应用生命周期组件和API的生命周期钩子 Vu…...

MyBatis查询数据库(2)

目录 前言&#x1f36d; 一、增删查改操作 1、查 Ⅰ、mapper接口&#xff1a; Ⅱ、UserMapper.xml 查询所有用户的具体实现 SQL&#xff1a; Ⅲ、进行单元测试 2、增、删、改操作 Ⅰ、增 添加用户 添加用户并且返回自增 id Ⅱ、改 根据id修改用户名 开启 MyBatis …...

Jenkins构建完成后发送消息至钉钉

钉钉群的最终效果&#xff1a; 1、jenkins安装DingTalk插件&#xff0c;安装完成后重启 2、配置钉钉插件 参考官网文档&#xff1a;快速开始 | 钉钉机器人插件 系统管理 拉到最下面&#xff0c;可以看到钉钉配置 按照如下配置钉钉机器人 配置完成可以点击测试按钮&#xff0…...

从浏览器输入url到页面加载(六)前端必须了解的路由器和光纤小知识

前言 上一章我们说到了数据包在网线中的故事&#xff0c;说到了双绞线&#xff0c;还说到了麻花。这一章继续沿着这条线路往下走&#xff0c;说一些和cdn以及路由器相关&#xff0c;运营商以及光纤相关的小知识&#xff0c;前端同学应该了解一下的 目录 前言 1. CDN和路由器…...

C语言假期作业 DAY 06

题目 一、选择题 1、以下叙述中正确的是&#xff08; &#xff09; A: 只能在循环体内和 switch 语句体内使用 break 语句 B: 当 break 出现在循环体中的 switch 语句体内时&#xff0c;其作用是跳出该 switch 语句体&#xff0c;并中止循环体的执行 C: continue 语句的作用是&…...

[nlp] tokenizer加速:fast_tokenizer=True

fast_tokenizer 是一个布尔值参数,用于指定是否使用快速的 tokenizer。在某些情况下,使用快速的 tokenizer 可以加快模型训练和推理速度。如果 fast_tokenizer 参数为 True,则会使用快速的 tokenizer;否则,将使用默认的 tokenizer。 快速的 tokenizer 通常使用一些技巧来减…...

基于OpenCV solvePnP函数估计头部姿势

人脸识别 文章目录 人脸识别一、姿势估计概述1、概述2、姿态估计3、在数学上表示相机运动4、姿势估计需要什么5、姿势估计算法6、Levenberg-Marquardt 优化 二、solvePnP函数1、函数原型2、参数详解 三、OpenCV源码1、源码路径 四、效果图像示例参考链接 一、姿势估计概述 1、…...

STC12C5A系列单片机内部 EEPROM 的应用

参考范例程序。 eeprom.c #include "eeprom.h"/*---------------------------- Disable ISP/IAP/EEPROM function Make MCU in a safe state ----------------------------*/ void IapIdle() {IAP_CONTR 0; //Close IAP functionIAP_CMD 0; …...

搭建测试平台开发(一):Django基本配置与项目创建

一、安装Django最新版本 1 pip install django 二、创建Django项目 首先进入要存放项目的目录&#xff0c;再执行创建项目的命令 1 django-admin startproject testplatform 三、Django项目目录详解 1 testplatform 2 ├── testplatform  # 项目的容器 3 │ ├──…...

JavaWeb教程笔记

JavaWeb Java Web 1、基本概念 1.1、前言 web开发&#xff1a; web&#xff0c;网页的意思 &#xff0c; www.baidu.com静态web html&#xff0c;css提供给所有人看的数据始终不会发生变化&#xff01; 动态web 淘宝&#xff0c;几乎是所有的网站&#xff1b;提供给所有人…...

数据库压力测试方法小结

一、前言 在前面的压力测试过程中&#xff0c;主要关注的是对接口以及服务器硬件性能进行压力测试&#xff0c;评估请求接口和硬件性能对服务的影响。但是对于多数Web应用来说&#xff0c;整个系统的瓶颈在于数据库。 原因很简单&#xff1a;Web应用中的其他因素&#xff0c;…...

Spring Boot——Spring Boot自动配置原理

系列文章目录 Spring Boot启动原理 Spring Boot自动配置原理 系列文章目录前言一、Spring Boot自动配置原理剖析二、自动配置生效三、总结&#xff1a; 前言 一直在使用Spring Boot特别好奇的是为什么Spring Boot比Spring在项目构建和开发过程中要方便很多&#xff0c;无需编…...

深度学习:Pytorch最全面学习率调整策略lr_scheduler

深度学习&#xff1a;Pytorch最全面学习率调整策略lr_scheduler lr_scheduler.LambdaLRlr_scheduler.MultiplicativeLRlr_scheduler.StepLRlr_scheduler.MultiStepLRlr_scheduler.ConstantLRlr_scheduler.LinearLRlr_scheduler.ExponentialLRlr_scheduler.PolynomialLRlr_sched…...

【uniapp】更改富文本编辑器图片大小

代码块 //<view v-html"productDetails"></view><rich-text :nodes"productDetails"></rich-text>// 假设htmlContent字段是后台返回的富文本字段var htmlContent res.result.productDetailsconst regex new RegExp(<img, gi…...

数据结构和算法一(空间复杂度、时间复杂度等算法入门)

时间复杂度&#xff1a; 空间复杂度&#xff1a; 时间比空间重要 递归&#xff1a; 递归特征&#xff1a; 递归案例&#xff1a; 汉诺塔问题&#xff1a; def hanoi(n,A,B,C):if n>0:hanoi(n-1,A,C,B)print("moving from %s to %s"%(A,C))hanoi(n-1,B,A,C)hanoi…...

Pytorch深度学习-----神经网络的基本骨架-nn.Module的使用

系列文章目录 PyTorch深度学习——Anaconda和PyTorch安装 Pytorch深度学习-----数据模块Dataset类 Pytorch深度学习------TensorBoard的使用 Pytorch深度学习------Torchvision中Transforms的使用&#xff08;ToTensor&#xff0c;Normalize&#xff0c;Resize &#xff0c;Co…...

QT开发快捷键

QT开发快捷键 alt enter // 自动创建类的定义 Ctrl / 注释当前行 或者选中的区域 Ctrl R 运行程序 Ctrl B Build 项目 CtrlShiftF 查找内容 F5 开始调试 ShiftF5 停止调试 F9 设置和取消断点 F10 单步前进 F11 单步进入函数 Shift F11 单步跳出函数 F1 // 查看帮助&#…...

RabbitMQ 教程 | RabbitMQ 入门

&#x1f468;&#x1f3fb;‍&#x1f4bb; 热爱摄影的程序员 &#x1f468;&#x1f3fb;‍&#x1f3a8; 喜欢编码的设计师 &#x1f9d5;&#x1f3fb; 擅长设计的剪辑师 &#x1f9d1;&#x1f3fb;‍&#x1f3eb; 一位高冷无情的编码爱好者 大家好&#xff0c;我是 DevO…...

【雕爷学编程】MicroPython动手做(10)——零基础学MaixPy之神经网络KPU2

KPU的基础架构 让我们回顾下经典神经网络的基础运算操作&#xff1a; 卷积&#xff08;Convolution&#xff09;:1x1卷积&#xff0c;3x3卷积&#xff0c;5x5及更高的卷积 批归一化&#xff08;Batch Normalization&#xff09; 激活&#xff08;Activate&#xff09; 池化&…...

BUG分析以及BUG定位

一般来说bug大多数存在于3个模块&#xff1a; 1、前台界面&#xff0c;包括界面的显示&#xff0c;兼容性&#xff0c;数据提交的判断&#xff0c;页面的跳转等等&#xff0c;这些bug基本都是一眼可见的&#xff0c;不太需要定位&#xff0c;当然也不排除一些特殊情况&#xf…...

Day46 算法记录| 动态规划 13(子序列)

这里写目录标题 300.最长递增子序列 674. 最长连续递增序列718. 最长重复子数组 300.最长递增子序列 视频解析&#xff1a; 第一层for循环遍历每一个元素&#xff0c; ------- 第二层for循环找到当前元素前面有几个小于该值的元素 结尾需要统计最多的个数 class Solution {pu…...

结构型-桥接模式(Bridge Pattern)

概述 桥接模式&#xff08;Bridge Pattern&#xff09;是一种结构型设计模式&#xff0c;将抽象部分和实现部分分离&#xff0c;使它们可以独立地变化。桥接模式通过将继承关系转化为关联关系&#xff0c;将抽象部分和实现部分分离开来&#xff0c;从而使它们可以独立地变化。…...

基于小波哈尔法(WHM)的一维非线性IVP测试问题的求解(Matlab代码实现)

目录 &#x1f4a5;1 概述 &#x1f4da;2 运行结果 &#x1f389;3 参考文献 &#x1f308;4 Matlab代码实现 &#x1f4a5;1 概述 小波哈尔法&#xff08;WHM&#xff09;是一种求解一维非线性初值问题&#xff08;IVP&#xff09;的数值方法。它基于小波分析的思想&#xf…...

前端(Electron Nodejs)如何读取本地配置文件

使用electron封装了前端界面之后&#xff0c;最终打包为一个客户端&#xff08;exe&#xff09;。但是&#xff0c;最近项目组内做CS&#xff08;c开发&#xff09;的&#xff0c;想把所有的配置都放进安装目录的配置文件中&#xff08;比如config.json&#xff09;。这做法&am…...

没有 telnet 不能测试端口?容器化部署最佳的端口测试方式

写在前面 生产中遇到&#xff0c;整理笔记在容器中没有 telnet &#xff0c;如何测试远程端口理解不足小伙伴帮忙指正 他的一生告诉我们&#xff0c;不能自爱就不能爱人&#xff0c;憎恨自己也必憎恨他人&#xff0c;最后也会像可恶的自私一样&#xff0c;使人变得极度孤独和悲…...