p2p、分布式,区块链笔记: 通过libp2p的Kademlia网络协议实现kv-store
Kademlia 网络协议
-
Kademlia 是一种分布式哈希表协议和算法,用于构建去中心化的对等网络,核心思想是通过分布式的网络结构来实现高效的数据查找和存储。在这个学习项目里,Kademlia 作为 libp2p 中的 NetworkBehaviour的组成。
-
以下这些函数或方法是根据 Kademlia 网络协议设计的,它们实现了基本的网络操作,包括获取数据记录、获取数据提供者、存储数据记录和开始提供数据等功能(这里只展示了项目中用到的函数,常用函数可以看libp2p Kademlia DHT 规范,更多函数可见如下图中的源码部分)。
1. get_record
kademlia.get_record(key, Quorum::One);
- 作用: 从 Kademlia 网络中获取与指定
key
相关的记录。 - 参数:
key
: 要获取记录的键。Quorum::One
: 获取记录时所需的一致性要求,这里是指只需要从一个节点获取记录即可。
- 实现逻辑:
- 根据 Kademlia 协议,节点首先根据
key
计算出其对应的 K-bucket 或者具体的节点 ID,然后向网络中查找负责该key
的节点。 - 节点通过网络查询和消息传递机制,从负责节点处获取存储的记录。
- 返回获取到的记录或者执行相应的处理逻辑。
- 根据 Kademlia 协议,节点首先根据
2. get_providers
kademlia.get_providers(key);
- 作用: 获取能够提供与指定
key
相关数据的节点信息(即数据的提供者)。 - 参数:
key
: 要获取提供者信息的数据的键。
- 实现逻辑:
- 类似于
get_record
,节点根据key
计算出其对应的 K-bucket 或者节点 ID。 - 节点向网络发送查询请求,询问哪些节点能够提供与
key
相关的数据。 - 返回能够提供数据的节点列表或者执行相应的处理逻辑。
- 类似于
3. put_record
let record = Record {key,value,publisher: None,expires: None,
};
kademlia.put_record(record, Quorum::One).expect("Failed to store record locally.");
- 作用: 将指定的记录存储到 Kademlia 网络中。
- 参数:
record
: 包含要存储的数据信息的记录对象,包括key
(键)、value
(值)、publisher
(发布者,可能为空)、expires
(过期时间,可能为空)等字段。Quorum::One
: 存储记录时的一致性要求,这里是指只需要将记录存储在一个节点即可。
- 实现逻辑:
- 节点根据
key
计算出对应的 K-bucket 或节点 ID。 - 节点将
record
发送给负责存储该key
的节点,并根据指定的一致性要求存储副本。 - 返回存储成功或失败的结果,或者执行相应的处理逻辑。
- 节点根据
4. start_providing
kademlia.start_providing(key).expect("Failed to start providing key");
- 作用: 在 Kademlia 网络中开始提供指定
key
的数据。 - 参数:
key
: 要开始提供的数据的键。
- 实现逻辑:
- 节点将
key
注册为它可以提供的数据标识。 - 当其他节点查询或需要该
key
的数据时,该节点将响应并提供相应的数据。 - 返回启动提供成功或失败的结果,或者执行相应的处理逻辑。
- 节点将
kv数据库主体代码及注释
use async_std::io;
use futures::{prelude::*, select};
use libp2p::kad::record::store::MemoryStore;
use libp2p::kad::{record::Key, AddProviderOk, Kademlia, KademliaEvent, PeerRecord, PutRecordOk, QueryResult,Quorum, Record,
};
use libp2p::{development_transport, identity,mdns::{Mdns, MdnsConfig, MdnsEvent},swarm::SwarmEvent,NetworkBehaviour, PeerId, Swarm,
};
use std::error::Error;#[async_std::main]
async fn main() -> Result<(), Box<dyn Error>> {env_logger::init();// 创建本地密钥,本地peer id和传输控制组件let local_key = identity::Keypair::generate_ed25519();let local_peer_id = PeerId::from(local_key.public());let transport = development_transport(local_key).await?;// 事件行为控制// We create a custom network behaviour that combines Kademlia and mDNS.#[derive(NetworkBehaviour)]// https://docs.rs/libp2p/latest/libp2p/swarm/trait.NetworkBehaviour.html#[behaviour(out_event = "MyBehaviourEvent")]//这个 "MyBehaviourEvent" 定义在下边的代码中// NetworkBehaviour这个trait将对所描述的结构体中的每个成员依次进行操作,例如 NetworkBehavior::poll它将首先轮询第一个结构成员,直到返回poll::Pending,然后再转到后面的成员。// 关于 #[behaviour(out_event = "MyBehaviourEvent")]中的out_event :The final out event. If we find a `#[behaviour(out_event = "Foo")]` attribute on the struct, we set `Foo` as the out event. Otherwise we use `()`.struct MyBehaviour {kademlia: Kademlia<MemoryStore>,mdns: Mdns,}#[allow(clippy::large_enum_variant)] // #[allow()为Lint语法属性检查控制,https://doc.rust-lang.org/reference/attributes/diagnostics.html#lint-check-attributes //关于large_enum_variant 详见https://rust-lang.github.io/rust-clippy/master/index.html#/large_enum_variantenum MyBehaviourEvent {Kademlia(KademliaEvent),Mdns(MdnsEvent),}// 实现(impl)块,用于为类型KademliaEvent实现了From trait,使其能够被转换为类型MyBehaviourEvent。impl From<KademliaEvent> for MyBehaviourEvent {fn from(event: KademliaEvent) -> Self {MyBehaviourEvent::Kademlia(event)}}// 实现(impl)块,用于为类型 MdnsEvent 实现了From trait,使其能够被转换为类型MyBehaviourEvent。impl From<MdnsEvent> for MyBehaviourEvent {fn from(event: MdnsEvent) -> Self {MyBehaviourEvent::Mdns(event)}}// Create a swarm to manage peers and events.let mut swarm = {// Create a Kademlia behaviour.let store = MemoryStore::new(local_peer_id);let kademlia = Kademlia::new(local_peer_id, store);let mdns = Mdns::new(MdnsConfig::default()).await?;let behaviour = MyBehaviour { kademlia, mdns };Swarm::new(transport, behaviour, local_peer_id)};// 从命令行读取指令并赋值给可变变量"stdin"let mut stdin = io::BufReader::new(io::stdin()).lines().fuse();// Listen on all interfaces and whatever port the OS assigns.swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?;// Kick it off.loop {select! {line = stdin.select_next_some() => handle_input_line(&mut swarm.behaviour_mut().kademlia, line.expect("Stdin not to close")),event = swarm.select_next_some() => match event { // swarm.select_next_some() 是一个方法,用于从一个事件流中获取下一个事件,后续送到match进行匹配SwarmEvent::NewListenAddr { address, .. } => {//当发生新的监听地址事件时println!("Listening in {:?}", address);},SwarmEvent::Behaviour(MyBehaviourEvent::Mdns(MdnsEvent::Discovered(list))) => {// 发生mDNS服务发现事件时for (peer_id, multiaddr) in list {swarm.behaviour_mut().kademlia.add_address(&peer_id, multiaddr);}}SwarmEvent::Behaviour(MyBehaviourEvent::Kademlia(KademliaEvent::OutboundQueryCompleted { result, ..})) => {// 当发出的 Kademlia 查询完成时handle_query_result(&result);}_ => {} // 通配符模式,执行一个空的代码块}}}
}// 下面是两个辅助函数,一个根据不同的查询结果类型执行不同的逻辑,另一个处理从命令行输入的命令
fn handle_query_result(result: &QueryResult) {match result {...}
}fn handle_input_line(kademlia: &mut Kademlia<MemoryStore>, line: String) {let mut args = line.split(' ');match args.next() {...}
}
两个辅助函数
处理从命令行输入的命令
- 这段 Rust 代码定义了一个函数 handle_input_line,用于处理从命令行读取的输入 line,并根据命令执行相应的操作。函数通过分割输入行来解析命令和参数,处理缺少参数的错误情况,并根据命令调用传入的 Kademlia 网络实例 (kademlia) 的相应方法。
fn handle_input_line(kademlia: &mut Kademlia<MemoryStore>, line: String) {// 将输入行按空格分割为多个参数let mut args = line.split(' ');// 匹配第一个参数(命令)match args.next() {Some("GET") => {// 如果命令是 "GET"let key = {// 尝试获取下一个参数作为键match args.next() {Some(key) => Key::new(&key), // 从字符串创建 Key 对象None => {// 如果未提供键,则打印错误并从函数返回eprintln!("缺少键");return;}}};// 调用 Kademlia 网络的 get_record 方法,传入指定的键和 Quorum::Onekademlia.get_record(key, Quorum::One);}Some("GET_PROVIDERS") => {// 如果命令是 "GET_PROVIDERS"let key = {// 尝试获取下一个参数作为键match args.next() {Some(key) => Key::new(&key), // 从字符串创建 Key 对象None => {// 如果未提供键,则打印错误并从函数返回eprintln!("缺少键");return;}}};// 调用 Kademlia 网络的 get_providers 方法,传入指定的键kademlia.get_providers(key);}Some("PUT") => {// 如果命令是 "PUT"let key = {// 尝试获取下一个参数作为键match args.next() {Some(key) => Key::new(&key), // 从字符串创建 Key 对象None => {// 如果未提供键,则打印错误并从函数返回eprintln!("缺少键");return;}}};let value = {// 尝试获取下一个参数作为值match args.next() {Some(value) => value.as_bytes().to_vec(), // 将值转换为字节向量None => {// 如果未提供值,则打印错误并从函数返回eprintln!("缺少值");return;}}};// 创建一个包含指定键、值及可选字段的 Record 对象let record = Record {key,value,publisher: None,expires: None,};// 在 Kademlia 网络中以 Quorum::One 一致性存储记录kademlia.put_record(record, Quorum::One).expect("本地存储记录失败。");}Some("PUT_PROVIDER") => {// 如果命令是 "PUT_PROVIDER"let key = {// 尝试获取下一个参数作为键match args.next() {Some(key) => Key::new(&key), // 从字符串创建 Key 对象None => {// 如果未提供键,则打印错误并从函数返回eprintln!("缺少键");return;}}};// 在 Kademlia 网络中开始提供指定的键kademlia.start_providing(key).expect("启动提供键失败");}_ => {// 如果命令不匹配预期的任何命令eprintln!("期望命令为 GET、GET_PROVIDERS、PUT 或 PUT_PROVIDER");}}
根据不同的查询结果类型执行不同的逻辑
fn handle_query_result(result: &QueryResult) {match result {QueryResult::GetProviders(Ok(ok)) => {for peer in &ok.providers {println!("Peer {:?} provides key {:?}",peer,std::str::from_utf8(ok.key.as_ref()).unwrap());}}QueryResult::GetProviders(Err(err)) => {eprintln!("Failed to get providers: {:?}", err);}QueryResult::GetRecord(Ok(ok)) => {for PeerRecord {record: Record { key, value, .. },..} in &ok.records{println!("Got record {:?} {:?}",std::str::from_utf8(key.as_ref()).unwrap(),std::str::from_utf8(&value).unwrap(),);}}QueryResult::GetRecord(Err(err)) => {eprintln!("Failed to get record: {:?}", err);}QueryResult::PutRecord(Ok(PutRecordOk { key })) => {println!("Successfully put record {:?}",std::str::from_utf8(key.as_ref()).unwrap());}QueryResult::PutRecord(Err(err)) => {eprintln!("Failed to put record: {:?}", err);}QueryResult::StartProviding(Ok(AddProviderOk { key })) => {println!("Successfully put provider record {:?}",std::str::from_utf8(key.as_ref()).unwrap());}QueryResult::StartProviding(Err(err)) => {eprintln!("Failed to put provider record: {:?}", err);}_ => {}}
}fn handle_input_line(kademlia: &mut Kademlia<MemoryStore>, line: String) {let mut args = line.split(' ');match args.next() {Some("GET") => {let key = {match args.next() {Some(key) => Key::new(&key),None => {eprintln!("Expected key");return;}}};kademlia.get_record(key, Quorum::One);}Some("GET_PROVIDERS") => {let key = {match args.next() {Some(key) => Key::new(&key),None => {eprintln!("Expected key");return;}}};kademlia.get_providers(key);}Some("PUT") => {let key = {match args.next() {Some(key) => Key::new(&key),None => {eprintln!("Expected key");return;}}};let value = {match args.next() {Some(value) => value.as_bytes().to_vec(),None => {eprintln!("Expected value");return;}}};let record = Record {key,value,publisher: None,expires: None,};kademlia.put_record(record, Quorum::One).expect("Failed to store record locally.");}Some("PUT_PROVIDER") => {let key = {match args.next() {Some(key) => Key::new(&key),None => {eprintln!("Expected key");return;}}};kademlia.start_providing(key).expect("Failed to start providing key");}_ => {eprintln!("expected GET, GET_PROVIDERS, PUT or PUT_PROVIDER");}}
}
运行示例
PS C:\Users\kingchuxing\Documents\learning-libp2p-main\rust> cargo run --example 04-kv-store
Listening in "/ip4/172.23.118.182/tcp/65055"
Listening in "/ip4/192.168.0.104/tcp/65055"
Listening in "/ip4/127.0.0.1/tcp/65055"
GET 123
Failed to get record: NotFound { key: Key(b"123"), closest_peers: [] }
PUT 123
缺少值
PUT 123 123456789
Failed to put record: QuorumFailed { key: Key(b"123"), success: [], quorum: 1 }
GET 123
Got record "123" "123456789"
PUT_PROVIDER 234 //输入提供者
Successfully put provider record "234"
GET_PROVIDERS 234 //获取提供者
Peer PeerId("12D3KooWB7CFnrmeH5gzRxA4CYR2YTg2K3NMvNHP5dWDPFwAHY38") provides key "234"
GET 234
Failed to get record: NotFound { key: Key(b"234"), closest_peers: [] }
相关文章:

p2p、分布式,区块链笔记: 通过libp2p的Kademlia网络协议实现kv-store
Kademlia 网络协议 Kademlia 是一种分布式哈希表协议和算法,用于构建去中心化的对等网络,核心思想是通过分布式的网络结构来实现高效的数据查找和存储。在这个学习项目里,Kademlia 作为 libp2p 中的 NetworkBehaviour的组成。 以下这些函数或…...

ShareSDK iOS端如何实现小红书分享
下载SDK 请登陆官网 ,找到SDK下载,勾选需要的平台下载 导入SDK (1)离线导入将上述下载到的SDK,直接将整个SDK资源文件拖进项目里,如下图: 并且勾选以下3个选项 在点击Finish,…...

算法day1 两数之和 两数相加 冒泡排序 快速排序
两数之和 最简单的思维方式肯定是去凑两个数,两个数的和是目标值就ok。这里两遍for循环解决。 两数相加 敲了一晚上哈哈,结果超过int范围捏,难受捏。 public class Test2 {public static void main(String[] args) { // ListNode l1 …...

Rust监控可观测性
可观测性 在监控章节的引言中,我们提到了老板、前端、后端眼中的监控是各不相同的,那么有没有办法将监控模型进行抽象、统一呢? 来简单分析一下: 业务指标实时展示,这是一个指标型的数据( metric )手机 APP 上传的数…...

SVN 的忽略(Ignore)和递归(Recursively)以及忽略部分
SVN中忽略大家经常用到,但总是似懂非懂,下面就详细展开说明一下忽略如何设置。 两个忽略 通常设置忽略都是文件夹和里面的文件都忽略。 设置忽略我们通常只需要鼠标右键点击忽略就可以了,如图: 第一个忽略用的最多,…...

vue3开发过程中遇到的一些问题记录
问题: vue3在使用 defineProps、defineEmits、defineExpose 时不需要import,但是 eslint会报错error defineProps is not defined no-undef 解决方法: 安装 vue-eslint-parser 插件,在 .eslintrc.js 文件中添加配置 parser: vue-e…...

Jedis、Lettuce、RedisTemplate连接中间件
jedis就像jdbc一样,用于两个端直接的连接。 1.创建Spring项目 这里不过多赘述... 2.导入连接工具jedis 在pom文件中导入jedis的依赖。 <dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId><version&…...

【C++】继承(详解)
前言:今天我们正式的步入C进阶内容的学习了,当然了既然是进阶意味着学习难度的不断提升,各位一起努力呐。 💖 博主CSDN主页:卫卫卫的个人主页 💞 👉 专栏分类:高质量C学习 👈 &#…...

网络io与select,poll,epoll
前言 网络 IO,会涉及到两个系统对象,一个是用户空间调用 IO 的进程或者线程,另一个是内核空间的内核系统,比如发生 IO 操作 read 时,它会经历两个阶段: 1. 等待数据准备就绪 2. 将数据从内核拷贝到进程或…...

【Linux】多线程(一万六千字)
提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 目录 文章目录 前言 线程的概念 线程的理解(Linux系统为例) 在Linux系统里如何保证让正文部分的代码可以并发的去跑呢? 为什么要有多进程呢? 为…...
sh脚本笔记2
test条件测试 语法 条件测试语法说明语法1:test <测试表达式>这是利用test命令进行条件测试表达式的方法。test命令和“<测试表达式>”之间至少有一个空格语法2:[ <测试表达式> ]这是通过[](单中括号)进行条件…...

js替换对象里面的对象名称
data为数组,val为修改前的名称,name为修改后的名称 JSON.parse(JSON.stringify(data).replace(/val/g, name)) ; 1.替换data里面的对象tenantInfoRespVO名称替换成tenantInfoUpdateReqVO 2.替换语句: 代码可复制 let tenantInf…...

鸿蒙开发设备管理:【@ohos.settings (设置数据项名称)】
设置数据项名称 说明: 本模块首批接口从API version 8开始支持。后续版本如有新增内容,则采用上角标单独标记该内容的起始版本。 本模块提供设置数据项的访问功能相关接口的说明及示例。 导入模块 import settings from ohos.settings;settings.getUri…...

STM32之五:TIM定时器(2-通用定时器)
目录 通用定时器(TIM2~5)框图 1、 输入时钟源选择 2、 时基单元 3 、输入捕获:(IC—Input Capture) 3.1 输入捕获通道框图(TI1为例) 3.1.1 滤波器: 3.1.2 边沿检测器…...

【分布式系统】监控平台Zabbix对接grafana
以前两篇博客为基础 【分布式系统】监控平台Zabbix介绍与部署(命令截图版)-CSDN博客 【分布式系统】监控平台Zabbix自定义模版配置-CSDN博客 一.安装grafana并启动 添加一台服务器192.168.80.104 初始化操作 systemctl disable --now firewalld set…...

操作系统真象还原:编写硬盘驱动程序
第13章-编写硬盘驱动程序 这是一个网站有所有小节的代码实现,同时也包含了Bochs等文件 13.1 硬盘及分区表 13.1.1 创建从盘及获取安装的磁盘数 要实现文件系统,必须先有个磁盘介质,虽然咱们己经有个虚拟磁盘 hd60M.img,但它只…...
firewalld防火墙(二)
一:firewalld高级配置 1:关于iptables的知识 iptables 是Linux系统中传统的命令行防火墙管理工具,它基于内核的netfilter框架工作,用于配置和管理网络规则集,比如过滤(允许/拒绝)进出的数据包…...
Android-悬浮窗口
在Android系统中,如果应用需要弹出一个悬浮窗口,就需要申请一项特殊权限 <uses-permission android:name"android.permission.SYSTEM_ALERT_WINDOW"/>在Android O之前的系统中申请了该权限后,再给对应的window设置 WindowM…...
打破僵局:Foxit Reader无法打开的终极解决方案
打破僵局:Foxit Reader无法打开的终极解决方案 在数字化阅读时代,Foxit Reader作为一款广受欢迎的PDF阅读器,其打不开的问题无疑会给用户带来诸多不便。本文将为您提供全面的解决方案,从基础检查到高级技巧,确保您能够…...

[调试] JTAG下运行正常,从QSPI或者SD卡启动则无响应,如何查找问题
[调试] JTAG下运行正常,从QSPI或者SD卡启动则无响应,如何查找问题 一、问题现象二、用自定义fsbl替代系统默认的fsbl1. 新建fsbl_new2. 如果提示缺少xilffs库3. 使能调试信息输出 三. 启动成功和失败情况下的典型输出1. JTAG启动模式: 正常加载2. QSPI启…...

测试微信模版消息推送
进入“开发接口管理”--“公众平台测试账号”,无需申请公众账号、可在测试账号中体验并测试微信公众平台所有高级接口。 获取access_token: 自定义模版消息: 关注测试号:扫二维码关注测试号。 发送模版消息: import requests da…...

《Qt C++ 与 OpenCV:解锁视频播放程序设计的奥秘》
引言:探索视频播放程序设计之旅 在当今数字化时代,多媒体应用已渗透到我们生活的方方面面,从日常的视频娱乐到专业的视频监控、视频会议系统,视频播放程序作为多媒体应用的核心组成部分,扮演着至关重要的角色。无论是在个人电脑、移动设备还是智能电视等平台上,用户都期望…...

大型活动交通拥堵治理的视觉算法应用
大型活动下智慧交通的视觉分析应用 一、背景与挑战 大型活动(如演唱会、马拉松赛事、高考中考等)期间,城市交通面临瞬时人流车流激增、传统摄像头模糊、交通拥堵识别滞后等问题。以演唱会为例,暖城商圈曾因观众集中离场导致周边…...
1688商品列表API与其他数据源的对接思路
将1688商品列表API与其他数据源对接时,需结合业务场景设计数据流转链路,重点关注数据格式兼容性、接口调用频率控制及数据一致性维护。以下是具体对接思路及关键技术点: 一、核心对接场景与目标 商品数据同步 场景:将1688商品信息…...
渲染学进阶内容——模型
最近在写模组的时候发现渲染器里面离不开模型的定义,在渲染的第二篇文章中简单的讲解了一下关于模型部分的内容,其实不管是方块还是方块实体,都离不开模型的内容 🧱 一、CubeListBuilder 功能解析 CubeListBuilder 是 Minecraft Java 版模型系统的核心构建器,用于动态创…...
数据链路层的主要功能是什么
数据链路层(OSI模型第2层)的核心功能是在相邻网络节点(如交换机、主机)间提供可靠的数据帧传输服务,主要职责包括: 🔑 核心功能详解: 帧封装与解封装 封装: 将网络层下发…...

C# 类和继承(抽象类)
抽象类 抽象类是指设计为被继承的类。抽象类只能被用作其他类的基类。 不能创建抽象类的实例。抽象类使用abstract修饰符声明。 抽象类可以包含抽象成员或普通的非抽象成员。抽象类的成员可以是抽象成员和普通带 实现的成员的任意组合。抽象类自己可以派生自另一个抽象类。例…...
C# SqlSugar:依赖注入与仓储模式实践
C# SqlSugar:依赖注入与仓储模式实践 在 C# 的应用开发中,数据库操作是必不可少的环节。为了让数据访问层更加简洁、高效且易于维护,许多开发者会选择成熟的 ORM(对象关系映射)框架,SqlSugar 就是其中备受…...
【python异步多线程】异步多线程爬虫代码示例
claude生成的python多线程、异步代码示例,模拟20个网页的爬取,每个网页假设要0.5-2秒完成。 代码 Python多线程爬虫教程 核心概念 多线程:允许程序同时执行多个任务,提高IO密集型任务(如网络请求)的效率…...

【OSG学习笔记】Day 16: 骨骼动画与蒙皮(osgAnimation)
骨骼动画基础 骨骼动画是 3D 计算机图形中常用的技术,它通过以下两个主要组件实现角色动画。 骨骼系统 (Skeleton):由层级结构的骨头组成,类似于人体骨骼蒙皮 (Mesh Skinning):将模型网格顶点绑定到骨骼上,使骨骼移动…...