当前位置: 首页 > article >正文

简易的分布式kv设计

1. 前言在 Raft KV 系统中每个节点Node都是对等的。一个典型的请求流向是Client-Leader Node-Raft 日志同步-大多数节点确认-应用到状态机 (KV Store)-返回 Client。2. 设计步骤Raft 核心组件包括一致性结点模块RPC 通信日志模块。2.1 日志写日志 → 复制日志 → commit → apply 【leader应用顺序】 细分一下的话就如下 Client ↓ Leader ↓ append log (本地) ↓ 发送 AppendEntries ↓ Followers append log ↓ 多数成功 ↓ Leader commit ↓ Leader apply ↓ 返回客户端成功 ↓ Leader 下次心跳带 commitIndex ↓ Followers apply首先看一下客户端发送一个请求涉及到的大致东西有哪些从后往前看我们需要设计的就是如何写入日志文件以及日志文件的格式该如何设计呢此处我们就弄简单一点儿就好了totalLength inttermlongindexlongcommandLengthintcommandbyte[]整条log entry 长度Raft term日志 index命令长度真正命令这个肯定是变长的totalLength 8 (term) 8 (index) 4 (commandLength) commandLengthpublic class LogEntry { private final long term; private final long index; private final String command; .... }日志存储与管理/** * Description: 日志存储与管理 * Author: txf * Date: 2026/2/9 */ public class LogManager { // 日志文件路径 private static final String LOG_FILE_PATH easy_kv_log.dat; // 内存映射的分段大小128MB可根据内存调整,这里先调整为两兆 private static final int MAPPED_SIZE 2 * 1024 * 1024; // 文件打开模式rw 读写 private static final String FILE_MODE rw; private final File logFile; private RandomAccessFile raf; private FileChannel fileChannel; // 当前映射的内存缓冲区 private MappedByteBuffer currentMappedBuffer; // 当前映射段的起始偏移文件偏移 private long currentMappedOffset 0; // 当前写入的位置相对于文件的总偏移 private long writePosition 0; public LogManager() { this.logFile new File(LOG_FILE_PATH); initFileChannel(); initMappedBuffer(); // 初始化时定位到文件末尾继续追加写 try { this.writePosition fileChannel.size(); } catch (IOException e) { throw new RuntimeException(获取文件大小失败, e); } } /** * 追加写入单条日志核心高性能写入 * param term Raft任期 * param index 日志索引 * param command KV操作命令如PUT key value */ public void appendLogEntry(long term, long index, String command) { // 1. 准备命令字节数组 byte[] commandBytes command.getBytes(StandardCharsets.UTF_8); int commandLength commandBytes.length; // 2. 计算总长度 int totalLength 4 8 8 4 commandLength; // 3. 准备直接缓冲区堆外内存避免拷贝 ByteBuffer directBuffer ByteBuffer.allocateDirect(totalLength); // 按格式写入缓冲区 -- 这里是写入二进制的文件内容我们人类就读不懂了 directBuffer.putInt(totalLength); directBuffer.putLong(term); directBuffer.putLong(index); directBuffer.putInt(commandLength); directBuffer.put(commandBytes); // 这里是写入字符串的 // directBuffer.put((term index command).getBytes(StandardCharsets.UTF_8)); // 翻转缓冲区从写模式转为读模式 directBuffer.flip(); // 4. 写入到内存映射缓冲区核心零拷贝 writeToMappedBuffer(directBuffer); // 5. 更新全局写入位置 writePosition totalLength; } /** * 将缓冲区数据写入内存映射区自动扩容映射段 */ private void writeToMappedBuffer(ByteBuffer buffer) { while (buffer.hasRemaining()) { // 检查当前映射缓冲区是否有足够剩余空间 if (currentMappedBuffer.remaining() buffer.remaining()) { // 先写入当前映射区的剩余空间 int remaining currentMappedBuffer.remaining(); byte[] temp new byte[remaining]; buffer.get(temp); currentMappedBuffer.put(temp); // 强制刷盘将映射内存的数据同步到磁盘可选批量刷盘可提升性能 currentMappedBuffer.force(); // 扩容映射段 initMappedBuffer(); } else { // 直接写入映射缓冲区 currentMappedBuffer.put(buffer); } } } /** * 读取指定索引的日志条目高性能读取 */ public LogEntry readLogEntry(long index) { try { // 使用FileChannel 直接缓冲区读取 ByteBuffer directBuffer ByteBuffer.allocateDirect(1024 * 1024); // 1MB直接缓冲区 long fileOffset 0; long fileSize fileChannel.size(); while (fileOffset fileSize) { // 重置缓冲区 directBuffer.clear(); // 从文件指定偏移读取数据到缓冲区 int readBytes fileChannel.read(directBuffer, fileOffset); if (readBytes -1) break; directBuffer.flip(); // 解析缓冲区中的日志条目 while (directBuffer.hasRemaining()) { // 检查剩余字节是否足够读取固定头部488424字节 if (directBuffer.remaining() 24) break; // 读取固定字段 int totalLength directBuffer.getInt(); long term directBuffer.getLong(); long currentIndex directBuffer.getLong(); int commandLength directBuffer.getInt(); // 检查剩余字节是否足够读取command if (directBuffer.remaining() commandLength) { // 回退缓冲区position下次继续解析 directBuffer.position(directBuffer.position() - 24); break; } // 读取command byte[] commandBytes new byte[commandLength]; directBuffer.get(commandBytes); String command new String(commandBytes, StandardCharsets.UTF_8); // 校验总长度 int actualLength 4 8 8 4 commandLength; if (totalLength ! actualLength) { throw new RuntimeException(日志文件损坏总长度不匹配); } // 找到目标索引则返回 if (currentIndex index) { return new LogEntry(term, index, command); } // 更新文件偏移 fileOffset totalLength; } } } catch (IOException e) { throw new RuntimeException(读取日志条目失败 [index index ], e); } return null; } /** * 加载所有日志条目节点启动时恢复 */ public ListLogEntry loadAllLogEntries() { ListLogEntry logEntries new ArrayList(); try { ByteBuffer directBuffer ByteBuffer.allocateDirect(1024 * 1024); long fileOffset 0; long fileSize fileChannel.size(); while (fileOffset fileSize) { directBuffer.clear(); int readBytes fileChannel.read(directBuffer, fileOffset); if (readBytes -1) break; directBuffer.flip(); while (directBuffer.hasRemaining()) { if (directBuffer.remaining() 24) break; int totalLength directBuffer.getInt(); long term directBuffer.getLong(); long index directBuffer.getLong(); int commandLength directBuffer.getInt(); if (directBuffer.remaining() commandLength) { directBuffer.position(directBuffer.position() - 24); break; } byte[] commandBytes new byte[commandLength]; directBuffer.get(commandBytes); String command new String(commandBytes, StandardCharsets.UTF_8); int actualLength 4 8 8 4 commandLength; if (totalLength ! actualLength) { throw new RuntimeException(日志文件损坏总长度不匹配); } logEntries.add(new LogEntry(term, index, command)); fileOffset totalLength; } } } catch (IOException e) { throw new RuntimeException(加载所有日志条目失败, e); } return logEntries; } /** * 强制刷盘将映射内存的数据同步到磁盘 */ public void forceFlush() { if (currentMappedBuffer ! null) { currentMappedBuffer.force(); // 同步映射内存到磁盘 } try { fileChannel.force(true); // 强制刷盘包含元数据 } catch (IOException e) { throw new RuntimeException(刷盘失败, e); } } /** * 关闭资源必须调用否则会导致文件句柄泄漏 */ public void close() { try { forceFlush(); if (fileChannel ! null) { fileChannel.close(); } if (raf ! null) { raf.close(); } } catch (IOException e) { throw new RuntimeException(关闭资源失败, e); } } /** * 初始化/扩容内存映射缓冲区 */ private void initMappedBuffer() { try { // 计算需要映射的起始位置和大小 long fileSize fileChannel.size(); // 如果当前映射段已写满或首次初始化创建新的映射 if (currentMappedBuffer null || writePosition currentMappedOffset MAPPED_SIZE) { currentMappedOffset (writePosition / MAPPED_SIZE) * MAPPED_SIZE; // 映射文件的指定区间到内存FileChannel.MapMode.READ_WRITE读写模式 currentMappedBuffer fileChannel.map( FileChannel.MapMode.READ_WRITE, currentMappedOffset, MAPPED_SIZE ); // 将缓冲区的position定位到当前写入位置相对于映射段的偏移 currentMappedBuffer.position((int) (writePosition - currentMappedOffset)); } } catch (IOException e) { throw new RuntimeException(初始化内存映射缓冲区失败, e); } } /** * 初始化FileChannel核心高性能通道 */ private void initFileChannel() { try { // 不存在则创建文件 if (!logFile.exists()) { boolean newFile logFile.createNewFile(); if (!newFile) { throw new RuntimeException(创建文件失败); } } this.raf new RandomAccessFile(logFile, FILE_MODE); this.fileChannel raf.getChannel(); } catch (IOException e) { throw new RuntimeException(初始化FileChannel失败, e); } } }2.2 服务端设计2.2.1 消息设计日志设计好了之后接下来看服务端如何设计。我们使用的是netty框架要求有netty基础。然后序列化协议采用的是protobuf读者可以参考这篇文章https://mp.weixin.qq.com/s/kg_-AMHRn_DzFbfBnkK4VQ 这篇文章大致讲解了一下该序列化协议并且也是采用netty整合的。根据proto文件生成类的命令如下也可以用idea的插件自动生成。protoc --proto_pathxxxx目录 --java_outxxx目录 具体的proto文件消息模板如下syntax proto3; option java_outer_classname KvRaftProto; // 生成的外层类名 option java_multiple_files false; // 生成多个独立的Java类而非内部类 // 1. 通用消息封装体核心 // Netty传输时只传这个消息通过type识别具体消息类型 message RaftKvMessage { // 消息类型枚举覆盖所有交互场景 enum MessageType { UNKNOWN 0; // 未知类型兜底 // 客户端 ↔ 节点KV操作 KV_REQUEST 1; // 客户端发起KV请求PUT/GET/DELETE KV_RESPONSE 2; // 节点响应客户端KV请求 // 节点 ↔ 节点Raft共识 VOTE_REQUEST 3; // 选举请求Candidate→Follower VOTE_RESPONSE 4; // 选举响应Follower→Candidate APPEND_ENTRIES_REQUEST 5; // 日志追加/心跳Leader→Follower APPEND_ENTRIES_RESPONSE 6; // 日志追加响应Follower→Leader } MessageType type 1; // 消息类型必传 string node_id 2; // 发送方节点ID用于识别节点 // 具体消息体根据type选择其中一个 KvRequest kv_request 3; KvResponse kv_response 4; VoteRequest vote_request 5; VoteResponse vote_response 6; AppendEntriesRequest append_entries_request 7; AppendEntriesResponse append_entries_response 8; } // 2. 客户端KV操作相关 // 客户端发起的KV请求PUT/GET/DELETE message KvRequest { enum OpType { PUT 0; // 写入/更新 GET 1; // 读取 DELETE 2; // 删除 } OpType op_type 1; // 操作类型必传 string key 2; // KV的key必传 string value 3; // KV的value仅PUT时传 // 可选请求ID用于幂等性防止重复请求 string request_id 4; } // 节点响应客户端的KV结果 message KvResponse { bool success 1; // 操作是否成功 string message 2; // 错误信息/提示失败时必传 string value 3; // 返回的value仅GET成功时传 string request_id 4; // 对应请求的ID幂等性 } // 3. Raft选举相关 // Candidate向Follower发起的投票请求 message VoteRequest { int64 term 1; // Candidate的当前任期必传 string candidate_id 2; // Candidate的节点ID必传 int64 last_log_index 3; // Candidate最后一条日志的索引用于日志一致性检查 int64 last_log_term 4; // Candidate最后一条日志的任期用于日志一致性检查 } // Follower响应Candidate的投票结果 message VoteResponse { int64 term 1; // Follower的当前任期必传用于更新Candidate的任期 bool vote_granted 2; // 是否投赞成票必传 } // 4. Raft日志追加/心跳相关 // 日志条目与你设计的日志格式对齐序列化后可直接写入日志文件 message LogEntry { int64 term 1; // Raft任期对应你日志格式的term int64 index 2; // 日志索引对应你日志格式的index string command 3; // KV操作命令如PUT key1 value1对应你日志格式的command } // Leader向Follower发送的日志追加/心跳请求 message AppendEntriesRequest { int64 term 1; // Leader的当前任期必传 string leader_id 2; // Leader的节点ID必传 int64 prev_log_index 3; // 前一条日志的索引用于日志一致性检查 int64 prev_log_term 4; // 前一条日志的任期用于日志一致性检查 repeated LogEntry entries 5; // 待追加的日志条目心跳时为空 int64 leader_commit 6; // Leader已提交的日志索引Follower据此更新自己的提交索引 } // Follower响应Leader的日志追加结果 message AppendEntriesResponse { int64 term 1; // Follower的当前任期必传用于更新Leader的任期 bool success 2; // 日志追加是否成功必传 int64 match_index 3; // Follower已匹配的日志索引Leader据此更新nextIndex }上面是消息的大致格式。接下来看服务端的节点设计我们从netty服务启动开始往下看在kv-core的app包里面public static void main(String[] args) { int port getPort(args); RaftNettyServer raftNettyServer new RaftNettyServer(port); try { raftNettyServer.start(); } catch (InterruptedException e) { throw new RuntimeException(e); } } private static int getPort(String[] args) { for (String arg : args) { if (arg.startsWith(node.port)) { return Integer.parseInt(arg.substring(10)); } } return 7777; }从java程序启动的命令行读取结点的端口参数我们可以用一台电脑开启多个应用在idea中这样配置就可以了如下图所示从上图中可以看到配置了三个节点然后本项目的jdk是采用的21这个版本。配置好了之后在idea的services里面可以把这些配置一起加进去然后就可以同时启动多个节点了。2.2.2 连接设计RaftNettyServer raftNettyServer new RaftNettyServer(port); ... raftNettyServer.start();从上一节的启动类看出来主要就是new了一个server对象然后调用start方法。我们顺着这两个看就可以了。首先是构造方法public class RaftNettyServer { .... private final int port; private final RaftNode node; public RaftNettyServer(int port) { this.port port; // 这里创建了一个raft结点对象 this.node new RaftNode(port); } } // 这个是RaftNode类 public RaftNode(int port) { this.port port; // 从配置文件中找到自己 this.nodesConfig new NodesConfig(); this.nodeId nodesConfig.findSelf(port); // 需要把自身结点 this.rpcPeers nodesConfig.getNodeList().stream() // node的格式是ip:port .map(node - new RpcPeer(node, node.split(:)[0], Integer.parseInt(node.split(:)[1]), this)) .toList(); this.logManager new LogManager(); this.storage new MemoryStorage(); // 把两个时间先初始化咯 this.electionTimeout 8000 ThreadLocalRandom.current().nextInt(4000); this.lastHeartbeatTime System.currentTimeMillis(); log.info(初始化选举超时{}, electionTimeout); // 定时器 scheduler Executors.newScheduledThreadPool(2); scheduler.scheduleAtFixedRate(this::tick, 2, 2000, TimeUnit.MILLISECONDS); }Raft类是最核心的一个类。上面的构造方法其实很简单。读者自行理解。需要说明一下的就是nodeId的格式【ip:端口】127.0.0.1:8888 // 就是这种字符串的格式还有要说明的就是rpcPeers这个List的构建可以看出来先是从配置文件读取到了集群节点列表然后遍历这个列表创建了对象这个具体是什么意思呢首先看一下Netty的客户端发送请求到服务端服务端处理后在返回给客户端客户端根据响应结果进行逻辑处理。这样一个示意图再看一下下面的连接示意图NettyClient不仅仅是给客户用的集群结点内部互相通信也要用到这个rpcPeers就是集群节点内部通信使用的。如上图所示每个节点都有一个NettyServer启动并监听着端口同时Leader结点需要给所有的Follower结点发送心跳请求此时这个Leader相当于其他两个Follower结点就相当于Client了所以在结点一里面有client的部分在项目归到了rpc的包下也就是上面那个rpcPeers的由来了。在集群启动的时候都是Follower结点只有等到超时了才会开始选主在此之前每个节点都有成为Candidate的可能也就是说每个节点都有向其他结点发送投票请求的可能VoteRequest那么每个节点里面都要有一套Netty Client及处理流程。就如下图所示这样就有一个问题了三个节点我就要六个tcp连接了四个节点就要12个连接十个节点呢就要90个连接这也太多了吧那也确实是的。有一个思路是搞一个中间层叫做routeCenter所有结点连向它然后消息都经过这个路由中心来转发这样连接数就会少很多了。还有一个思路反正NettyClient 一个NettyServer构建出一个Channel理论上我只需要三个tcp连接就可以了啊。如下图所示但是这样的话节点间通信需要转发了代码逻辑就太复杂了况且还有一个问题那就是如何确定这个tcp的连接顺序呢这个可以按照nodeId的字典顺序来嘛。反正就是麻烦就完事儿了。。。综上所述还是最开始的方案最简单直接了反正这就是一个简易的案例不要考虑太多了。如果有一些其他合适的思路欢迎读者给出。节点之间的连接设计就是上面的样子了。接下来看RaftNode的设计。2.2.3 RaftNode设计那就从构造器开始看吧public RaftNode(int port) { this.port port; // 从配置文件中找到自己 this.nodesConfig new NodesConfig(); this.nodeId nodesConfig.findSelf(port); // 需要把自身结点 this.rpcPeers nodesConfig.getNodeList().stream() // node的格式是ip:port .map(node - new RpcPeer(node, node.split(:)[0], Integer.parseInt(node.split(:)[1]), this)) .toList(); this.logManager new LogManager(); this.storage new MemoryStorage(); // 把两个时间先初始化咯 this.electionTimeout 8000 ThreadLocalRandom.current().nextInt(4000); this.lastHeartbeatTime System.currentTimeMillis(); log.info(初始化选举超时{}, electionTimeout); // 定时器 scheduler Executors.newScheduledThreadPool(2); scheduler.scheduleAtFixedRate(this::tick, 2, 2000, TimeUnit.MILLISECONDS); }可以看到都是做一些初始化的工作然后下面是开启了一个定时任务tickprivate void tick() { log.info(检查是否超时{} 状态: {}, nodeId, state); try { if (state ! NodeState.LEADER isTimeout()) { becomeCandidate(); } else if (state NodeState.LEADER) { // sendHeartbeats(); } } catch ( Exception e ) { log.error({} 节点tick定时任务异常, nodeId, e); } } private void becomeCandidate() { log.info({} 选举超时转为 Candidate开始任期: {}, nodeId, currentTerm.get() 1); state NodeState.CANDIDATE; currentTerm.getAndIncrement(); // 任期1 votedFor nodeId; // 给自己投一票 resetElectionTimeout(); // 集群发送投票请求 requestVotes(); } private boolean isTimeout() { return System.currentTimeMillis() - lastHeartbeatTime electionTimeout; } private void resetElectionTimeout() { // 8000ms ~ 12000ms 随机超时避免平票 this.electionTimeout 8000 ThreadLocalRandom.current().nextInt(4000); log.info(重置 {} 节点选举时间随机超时{} ms, nodeId, electionTimeout); this.lastHeartbeatTime System.currentTimeMillis(); }主要就是看becomeCandidate这个方法最后的向集群发送投票请求。private void requestVotes() { // 1. 初始化票数自己的一票 AtomicInteger grantedVotes new AtomicInteger(1); long count rpcPeers.stream().filter(peer - !peer.isSelf()).count(); // 不包含自己的结点数 int majority (int) ((count 1) / 2 1); // 总节点数(包含自己)的半数以上 // 2.构造投票消息 KvRaftProto.VoteRequest voteRequest KvRaftProto.VoteRequest.newBuilder() .setTerm(currentTerm.get()) .setCandidateId(nodeId) .setLastLogIndex(logManager.getLastLogIndex()) .setLastLogTerm(logManager.getLastLogTerm()) .build(); // 3. 发送投票请求 // 构建一个对象表示当前投票请求的状态 // String voteId UUID.randomUUID().toString().replaceAll(-, ); // 这里为什么可以用term因为 Raft 规定一个节点在一个 term 内只能投一张票。 // 所以只要 term 匹配这个响应就一定是针对你当前发起的这一轮选举的。 GlobalVoteManager.setVoteState(currentTerm.get(), new VoteState(nodeId, currentTerm.get(), majority)); int countSend 0; for (RpcPeer peer : rpcPeers) { if (!peer.isSelf()) { // 不是自身结点就发送投票请求 // send方法就很简单了请读者自行查看 boolean send peer.send(KvRaftProto.RaftKvMessage.newBuilder() .setType(KvRaftProto.RaftKvMessage.MessageType.VOTE_REQUEST) .setVoteRequest(voteRequest) .build()); if ( send ) countSend; } } log.info(发送投票请求{}已发送给了 {} 个结点.., voteRequest, countSend); }这样投票请求就发送出去了此时结点是作为客户端发送给其他节点的接下来的逻辑就是其他节点接收到voteRequest请求然后做逻辑处理所以就要在server包下面去查看具体逻辑。// 在kv-core的server包下面的KvBusinessHandler.java // 1.如果是投票请求 if ( raftKvMessage.getType() KvRaftProto.RaftKvMessage.MessageType.VOTE_REQUEST) { log.info(receive vote request.........); KvRaftProto.VoteRequest voteRequest raftKvMessage.getVoteRequest(); // 可以看到交给了node去处理 KvRaftProto.VoteResponse voteResponse node.tackleVoteRequest(voteRequest); ctx.writeAndFlush(KvRaftProto.RaftKvMessage.newBuilder() .setType(KvRaftProto.RaftKvMessage.MessageType.VOTE_RESPONSE) .setVoteResponse(voteResponse) .build()); } // 又回到了RaftNode类了 public KvRaftProto.VoteResponse tackleVoteRequest(KvRaftProto.VoteRequest voteRequest) { // 比较任期 if (voteRequest.getTerm() currentTerm.get()) { log.info({} 投票请求任期太小拒绝投票, nodeId); return buildVoteResponse(false, currentTerm.get()); } if ( votedFor ! null !voteRequest.getCandidateId().equals(votedFor) ) { log.info({}已投给其他人拒绝该投票请求, nodeId); return buildVoteResponse(false, currentTerm.get()); } // 再比较日志情况 if ( voteRequest.getLastLogIndex() logManager.getLastLogIndex() voteRequest.getLastLogTerm() logManager.getLastLogTerm() ) { log.info({} 投票请求ok赞成投票, nodeId); currentTerm.set(voteRequest.getTerm()); // 更新自己的任期 votedFor voteRequest.getCandidateId(); // 投票给该节点 return buildVoteResponse(true, voteRequest.getTerm()); } log.info({} 投票请求日志太旧拒绝该投票请求, nodeId); return buildVoteResponse(false, currentTerm.get()); }其他节点收到了拉票请求会返回response给candidate结点candidate结点是作为Client发送的拉票请求收到的响应肯定是在客户端的处理器handler接下来的逻辑就要在rpc包下面的RpcClientHandler去查看了Override protected void channelRead0(ChannelHandlerContext ctx, KvRaftProto.RaftKvMessage msg) { // 2.VOTE_RESPONSE 投票请求回来的响应【投票请求是结点作为客户端发出的应该在客户端的handler处理响应】 if (msg.getType() KvRaftProto.RaftKvMessage.MessageType.VOTE_RESPONSE) { log.info(receive vote response.........); KvRaftProto.VoteResponse voteResponse msg.getVoteResponse(); raftNode.tackleVoteResponse(voteResponse); // 又回到了RaftNode } } // RaftNode.java // 投票结果处理,【投票请求是结点作为客户端发出的要在客户端的handler处理响应】 public synchronized void tackleVoteResponse(KvRaftProto.VoteResponse voteResponse) { long term voteResponse.getTerm(); // 2. 发现更高任期立即降级并更新 // 1. 任期检查对方比我大我立即认输 if (term currentTerm.get()) { stepDown(term); return; } // 2. 状态检查如果我已经不是 Candidate 了比如已经超时重选或收到心跳忽略 if (state ! NodeState.CANDIDATE) return; // 3. 任期匹配检查确保这是对“当前这一轮”选举的回复 // 如果收到的响应任期比当前小说明是之前过期的选举回复直接丢弃 if (term currentTerm.get()) { return; } // 4. 从全局管理器获取当前选举的投票状态 VoteState voteState GlobalVoteManager.getVoteState(term); if (voteState null) { log.error(未找到任期 {} 的投票记录状态, term); return; } // 5. 如果对方投了赞成票 if (voteResponse.getVoteGranted()) { // 增加票数这里 AtomicInteger 在 VoteState 内部保证了线程安全 // 但由于本方法加了 synchronized其实双重保险 int currentVotes voteState.addVote(); int majority voteState.getMajority(); log.info(赞成票当前票数: {}/{}, currentVotes, nodesConfig.getNodeList().size()); // 6. 检查是否达到多数派 if (currentVotes majority) { log.info(节点 {} 获得过半选票 ({})准备晋升为 Leader, nodeId, currentVotes); becomeLeader(); } } else { log.info(拒绝了我的投票请求); } } private synchronized void becomeLeader() { if (state ! NodeState.CANDIDATE) return; if (state NodeState.LEADER) return; this.state NodeState.LEADER; log.info(Node {} 赢得选举即将成为 Leader, Term: {}, nodeId, currentTerm.get()); // 1. 清理上一任期的残留状态 this.votedFor null; // 2. 立即发送第一波心跳宣示主权 (防止其他节点又超时) sendHeartbeats(); // 3. 启动定时心跳任务 (比如每 2 秒一次) if (heartbeatTask ! null) heartbeatTask.cancel(true); heartbeatTask scheduler.scheduleAtFixedRate(this::sendHeartbeats, 0, 1000, TimeUnit.MILLISECONDS); log.info( 节点 {} 正式成为 Term {} 的 Leader , nodeId, currentTerm.get()); }3.启动测试选主把配置好的三个节点启动一下看看结果

相关文章:

简易的分布式kv设计

1. 前言 在 Raft KV 系统中,每个节点(Node)都是对等的。一个典型的请求流向是: Client -> Leader Node -> Raft 日志同步 -> 大多数节点确认 -> 应用到状态机 (KV Store) -> 返回 Client。 2. 设计步骤 Raft 核…...

《信号完整性》专栏简介

大家好,我是一只豌豆象,一名长期从事信号完整性设计分析的电子工程师,凭着对技术知识的无尽渴望和对技术工作的不断追求,再辅以极高的学习热情,使得我能够十年如一日的高效深耕于电子产品的设计研发领域。 在已过去的…...

ADC过采样技术提升嵌入式系统测量精度

1. ADC过采样技术概述在嵌入式系统开发中,ADC(模数转换器)的性能往往直接决定了整个系统的测量精度。标准的10位ADC在很多场合已经足够使用,但当我们需要更高精度的测量时,过采样技术就成为了一个经济有效的解决方案。…...

Docker容器优化全攻略

Docker容器优化全攻略 引言:Docker的效率革命 哥们,别整那些花里胡哨的!作为一个前端开发兼摇滚鼓手,我最烦的就是容器体积大、启动慢、运行卡。Docker容器的优化直接关系到部署效率、运行性能和资源消耗。今天,我就给…...

Kubernetes集群快速搭建指南

Kubernetes集群快速搭建指南 引言:Kubernetes的时代 哥们,别整那些花里胡哨的!作为一个前端开发兼摇滚鼓手,我最烦的就是复杂的环境搭建。但Kubernetes作为云原生时代的基础设施,你不得不掌握它。今天,我就…...

云原生时代的前端部署最佳实践

云原生时代的前端部署最佳实践 引言:前端部署的进化 哥们,别整那些花里胡哨的!作为一个前端开发兼摇滚鼓手,我最烦的就是部署时的各种幺蛾子。从传统的FTP上传,到现在的云原生部署,前端部署已经发生了天翻地…...

微信小程序助力老年智能评估,Pillow高级实战案例:图像处理的进阶应用。

基于微信小程序的关爱老年人在线能力评估系统设计 系统背景与意义 随着老龄化社会进程加速,老年人能力评估成为养老服务的重要环节。传统纸质评估方式效率低、数据难留存。基于微信小程序的在线评估系统可实现便捷化、标准化评估,提升养老服务智能化水平…...

LIS302DL加速度计I²C驱动库LS302i2c详解

1. LS302i2c 库概述:面向嵌入式系统的 LIS302DL IC 加速度计驱动实现LS302i2c 是一个专为 STM32 及兼容 Cortex-M 微控制器设计的轻量级、可移植 IC 接口加速度计驱动库,其核心目标是为 STMicroelectronics 的 LIS302DL 三轴数字加速度传感器提供稳定、低…...

隐私优先方案:OpenClaw+本地化Qwen3.5-9B处理敏感数据

隐私优先方案:OpenClaw本地化Qwen3.5-9B处理敏感数据 1. 为什么我们需要隐私优先的AI方案 去年我在帮一家诊所做数字化改造时,遇到了一个棘手问题:他们需要自动化处理患者病历,但又担心使用云端AI服务会导致数据泄露。这让我意识…...

Tach库:嵌入式单通道转速测量轻量实现

1. Tach库概述:单通道编码器转速测量的嵌入式实现方案 Tach库是一个轻量级、高精度的嵌入式转速测量工具,专为单通道数字脉冲信号设计,典型应用场景包括红外对射式槽型光电开关(slotted wheel)、霍尔效应转速传感器、磁…...

PN7150/PN7160 NFC控制器I²C驱动库详解

1. 项目概述Electronic Cats PN7150/PN7160 库是一个面向嵌入式平台的轻量级 IC 驱动库,专为 NXP 公司推出的 PN7150 和 PN7160 NFC 控制器芯片设计。该库并非简单封装,而是基于 NCI(NFC Controller Interface)1.0 协议规范实现的…...

(23)ArcGIS Pro 空间连接与缓冲区分析:属性传递、多环缓冲区实战全攻略

点赞+关注送: 1、天地图GS(2024)0650号_2025.9版; 2、全国土地覆盖数据CLCD2025年; 注:其他数据也可私信或留言,看是否有 前言 在 ArcGIS Pro 空间分析中,缓冲区分析与空…...

从工业5.0到实战:一个智能仓库管理系统的设计与Flutter优化

引言 工业5.0并非对工业4.0的颠覆,而是一次“人性的回归”与“价值的重塑”。它强调以人为本(Human-centric)、可持续(Sustainable)与韧性(Resilient)。作为一名计算机专业的毕业生,…...

OpenClaw多模态技能扩展:用Qwen3.5-9B实现截图OCR自动归档

OpenClaw多模态技能扩展:用Qwen3.5-9B实现截图OCR自动归档 1. 为什么需要智能截图归档 作为一个长期依赖截图保存信息的用户,我的桌面常年堆积着数百张未命名的截图文件。传统的解决方案无非两种:手动重命名(耗时费力&#xff0…...

AI Agent学习日记 Day3

今天没怎么搞,只做了一点小优化。之前我是用 agent.stream(invoke_input,stream_mode["messages", "updates"],config {"configurable": {"thread_id": "1"}}) 通过mode "messages"来获取并流式输…...

OpenClaw学习助手:Qwen3.5-9B-AWQ-4bit自动整理网课截图笔记

OpenClaw学习助手:Qwen3.5-9B-AWQ-4bit自动整理网课截图笔记 1. 为什么需要自动化学习助手 作为一名经常通过网课充电的技术从业者,我长期被一个痛点困扰:每次听完两小时的课程,手机相册里会堆满几十张截图,里面有老…...

探索混合动力汽车Simulink整车模型:并联P2构型与基于规则的控制策略

混合动力汽车simulink整车模型,并联P2构型 基于规则的控制策略,可以直接进行CTC,WTLC,NEDC等工况仿真。嘿,各位技术爱好者!今天咱来聊聊混合动力汽车Simulink整车模型,特别是并联P2构型以及基于…...

2026年4月3日 理论基石:数据量与模型参数量的关系

文章目录1. 理论基石:数据量与模型参数量的关系Kaplan Scaling Laws (OpenAI, 2020)Chinchilla Scaling Laws (DeepMind, 2022)2. 实战计算:针对你的 nanoGPT 实验第一步:估算总 Token 数第二步:计算训练步数 (max_iters)第三步&a…...

基于Python的毕业生实习管理系统

项目介绍:基于Python的毕业生实习管理系统技术栈 项目编号:本课题采用 Python 语言进行开发,系统整体基于 Web 平台实现。前端页面主要使用 HTML、CSS、JavaScript 进行构建,并结合 Bootstrap 提升页面布局与交互效果;…...

seo推广外包需要多少投入_seo推广外包如何避免被算法惩罚

SEO推广外包需要多少投入_SEO推广外包如何避免被算法惩罚 在当今数字化经济时代,SEO(搜索引擎优化)推广已经成为企业提升网站流量和品牌知名度的重要手段。随着搜索引擎算法的不断更新,企业在进行SEO推广外包时,不仅需…...

客户和采购都在用豆包、deepseek查资料,怎么才能让这些国内头部大模型在回答时优先推荐公司的产品?

随着人工智能技术的爆发,企业获客与消费者决策的路径正在发生深刻的重构。据近期的公开市场调研与行业报告显示,包括豆包、DeepSeek、文心一言在内的国内头部大模型,其月活跃用户数正呈现指数级增长。一个不可忽视的趋势是:无论是…...

expected_conditions(EC)与元素相关的常用方法

与元素(Element)相关的 expected_conditions,分为存在、可见、可点击、不可见/消失、属性/文本、选中状态等几类引用:from selenium.webdriver.support import expected_conditions as EC1. 元素存在(Presence&#xf…...

MySQL的HAVING:掌握分组过滤的高级用法(实战详解)

本文全面讲解MySQL的HAVING用法,从基础语法到高级技巧,包括分组过滤、聚合查询优化与实战应用。 文章目录一、什么是MySQL的HAVINGHAVING的定义与作用HAVING与WHERE的本质区别二、HAVING的基本语法详解标准语法结构执行顺序解析三、MySQL的HAVING与GROUP…...

javascript之Dom查询操作1

1.通过Id获取单个元素假定要获取下面html代码里面id是div1的div标签内容语法是document.getElementById(Id值)<div id"div1">div1</div>let a document.getElementById("div1") console.log(a)2.根据name属性值获取语法是document.getElement…...

Windows下OpenClaw避坑指南:千问3.5-35B-A3B-FP8接口配置全流程

Windows下OpenClaw避坑指南&#xff1a;千问3.5-35B-A3B-FP8接口配置全流程 1. 为什么选择OpenClaw千问3.5组合&#xff1f; 去年我在尝试自动化处理大量PDF报告时&#xff0c;发现市面上的RPA工具要么太笨重&#xff0c;要么无法处理复杂语义。直到遇到OpenClaw这个开源智能…...

告别token焦虑,Claude Code 本地免费运行

零API无限次100%离线&#xff01;5分钟把专属AI程序员装进电脑&#xff0c;告别API烧钱与代码泄露焦虑 有没有开发者和我一样&#xff0c;被云端 AI 编码工具搞得心力交瘁&#xff1f; Claude Code 写代码是真的顺手&#xff0c;但动辄要绑定 API 密钥、按调用量付费烧钱&#…...

前端测试吐槽:别再写那些没用的测试了!

前端测试吐槽&#xff1a;别再写那些没用的测试了&#xff01; 毒舌时刻 前端测试就像体检——每个人都知道要做&#xff0c;但真正认真做的没几个。Jest、React Testing Library、Cypress... 一堆测试工具让你挑花了眼&#xff0c;结果你的测试还是写得像一坨屎。 我就想不明白…...

【数据结构】线索二叉树之中序遍历线索化详解与实现

在二叉树的遍历过程中&#xff0c;我们会发现大量的空指针域被浪费&#xff0c;而线索二叉树的核心思想就是利用这些空指针&#xff0c;将其指向节点的前驱或后继节点&#xff0c;从而实现二叉树的非递归遍历无需借助栈&#xff0c;提升遍历效率。本文将详细讲解中序遍历线索化…...

2026-04-02 打卡第 2 天

# 2026-04-02 打卡第 2 天 # 列表 """ li [1,2,a] print(li) # 输出结果&#xff1a;[1, 2, a] """# 列表中添加元素 # 整体添加 append """ li [a,b,c] li.append(d) print(li) # 输出结果&#xff1a;[a, b, c, d] "&qu…...

【数据结构与算法】第24篇:哈夫曼树与哈夫曼编码

一、基本概念1.1 带权路径长度在二叉树中&#xff1a;路径长度&#xff1a;从一个节点到另一个节点经过的边数带权路径长度(WPL)&#xff1a;所有叶子节点的权重 路径长度 之和示例&#xff1a;text叶子节点&#xff1a;A(7), B(5), C(2), D(4)普通树&#xff1a;15/ \7 8/…...