p2p、分布式,区块链笔记: 通过libp2p的Kademlia网络协议实现kv-store
Kademlia 网络协议
-
Kademlia 是一种分布式哈希表协议和算法,用于构建去中心化的对等网络,核心思想是通过分布式的网络结构来实现高效的数据查找和存储。在这个学习项目里,Kademlia 作为 libp2p 中的 NetworkBehaviour的组成。
-
以下这些函数或方法是根据 Kademlia 网络协议设计的,它们实现了基本的网络操作,包括获取数据记录、获取数据提供者、存储数据记录和开始提供数据等功能(这里只展示了项目中用到的函数,常用函数可以看libp2p Kademlia DHT 规范,更多函数可见如下图中的源码部分)。

1. get_record
kademlia.get_record(key, Quorum::One);
- 作用: 从 Kademlia 网络中获取与指定
key相关的记录。 - 参数:
key: 要获取记录的键。Quorum::One: 获取记录时所需的一致性要求,这里是指只需要从一个节点获取记录即可。
- 实现逻辑:
- 根据 Kademlia 协议,节点首先根据
key计算出其对应的 K-bucket 或者具体的节点 ID,然后向网络中查找负责该key的节点。 - 节点通过网络查询和消息传递机制,从负责节点处获取存储的记录。
- 返回获取到的记录或者执行相应的处理逻辑。
- 根据 Kademlia 协议,节点首先根据
2. get_providers
kademlia.get_providers(key);
- 作用: 获取能够提供与指定
key相关数据的节点信息(即数据的提供者)。 - 参数:
key: 要获取提供者信息的数据的键。
- 实现逻辑:
- 类似于
get_record,节点根据key计算出其对应的 K-bucket 或者节点 ID。 - 节点向网络发送查询请求,询问哪些节点能够提供与
key相关的数据。 - 返回能够提供数据的节点列表或者执行相应的处理逻辑。
- 类似于
3. put_record
let record = Record {key,value,publisher: None,expires: None,
};
kademlia.put_record(record, Quorum::One).expect("Failed to store record locally.");
- 作用: 将指定的记录存储到 Kademlia 网络中。
- 参数:
record: 包含要存储的数据信息的记录对象,包括key(键)、value(值)、publisher(发布者,可能为空)、expires(过期时间,可能为空)等字段。Quorum::One: 存储记录时的一致性要求,这里是指只需要将记录存储在一个节点即可。
- 实现逻辑:
- 节点根据
key计算出对应的 K-bucket 或节点 ID。 - 节点将
record发送给负责存储该key的节点,并根据指定的一致性要求存储副本。 - 返回存储成功或失败的结果,或者执行相应的处理逻辑。
- 节点根据
4. start_providing
kademlia.start_providing(key).expect("Failed to start providing key");
- 作用: 在 Kademlia 网络中开始提供指定
key的数据。 - 参数:
key: 要开始提供的数据的键。
- 实现逻辑:
- 节点将
key注册为它可以提供的数据标识。 - 当其他节点查询或需要该
key的数据时,该节点将响应并提供相应的数据。 - 返回启动提供成功或失败的结果,或者执行相应的处理逻辑。
- 节点将
kv数据库主体代码及注释
use async_std::io;
use futures::{prelude::*, select};
use libp2p::kad::record::store::MemoryStore;
use libp2p::kad::{record::Key, AddProviderOk, Kademlia, KademliaEvent, PeerRecord, PutRecordOk, QueryResult,Quorum, Record,
};
use libp2p::{development_transport, identity,mdns::{Mdns, MdnsConfig, MdnsEvent},swarm::SwarmEvent,NetworkBehaviour, PeerId, Swarm,
};
use std::error::Error;#[async_std::main]
async fn main() -> Result<(), Box<dyn Error>> {env_logger::init();// 创建本地密钥,本地peer id和传输控制组件let local_key = identity::Keypair::generate_ed25519();let local_peer_id = PeerId::from(local_key.public());let transport = development_transport(local_key).await?;// 事件行为控制// We create a custom network behaviour that combines Kademlia and mDNS.#[derive(NetworkBehaviour)]// https://docs.rs/libp2p/latest/libp2p/swarm/trait.NetworkBehaviour.html#[behaviour(out_event = "MyBehaviourEvent")]//这个 "MyBehaviourEvent" 定义在下边的代码中// NetworkBehaviour这个trait将对所描述的结构体中的每个成员依次进行操作,例如 NetworkBehavior::poll它将首先轮询第一个结构成员,直到返回poll::Pending,然后再转到后面的成员。// 关于 #[behaviour(out_event = "MyBehaviourEvent")]中的out_event :The final out event. If we find a `#[behaviour(out_event = "Foo")]` attribute on the struct, we set `Foo` as the out event. Otherwise we use `()`.struct MyBehaviour {kademlia: Kademlia<MemoryStore>,mdns: Mdns,}#[allow(clippy::large_enum_variant)] // #[allow()为Lint语法属性检查控制,https://doc.rust-lang.org/reference/attributes/diagnostics.html#lint-check-attributes //关于large_enum_variant 详见https://rust-lang.github.io/rust-clippy/master/index.html#/large_enum_variantenum MyBehaviourEvent {Kademlia(KademliaEvent),Mdns(MdnsEvent),}// 实现(impl)块,用于为类型KademliaEvent实现了From trait,使其能够被转换为类型MyBehaviourEvent。impl From<KademliaEvent> for MyBehaviourEvent {fn from(event: KademliaEvent) -> Self {MyBehaviourEvent::Kademlia(event)}}// 实现(impl)块,用于为类型 MdnsEvent 实现了From trait,使其能够被转换为类型MyBehaviourEvent。impl From<MdnsEvent> for MyBehaviourEvent {fn from(event: MdnsEvent) -> Self {MyBehaviourEvent::Mdns(event)}}// Create a swarm to manage peers and events.let mut swarm = {// Create a Kademlia behaviour.let store = MemoryStore::new(local_peer_id);let kademlia = Kademlia::new(local_peer_id, store);let mdns = Mdns::new(MdnsConfig::default()).await?;let behaviour = MyBehaviour { kademlia, mdns };Swarm::new(transport, behaviour, local_peer_id)};// 从命令行读取指令并赋值给可变变量"stdin"let mut stdin = io::BufReader::new(io::stdin()).lines().fuse();// Listen on all interfaces and whatever port the OS assigns.swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?;// Kick it off.loop {select! {line = stdin.select_next_some() => handle_input_line(&mut swarm.behaviour_mut().kademlia, line.expect("Stdin not to close")),event = swarm.select_next_some() => match event { // swarm.select_next_some() 是一个方法,用于从一个事件流中获取下一个事件,后续送到match进行匹配SwarmEvent::NewListenAddr { address, .. } => {//当发生新的监听地址事件时println!("Listening in {:?}", address);},SwarmEvent::Behaviour(MyBehaviourEvent::Mdns(MdnsEvent::Discovered(list))) => {// 发生mDNS服务发现事件时for (peer_id, multiaddr) in list {swarm.behaviour_mut().kademlia.add_address(&peer_id, multiaddr);}}SwarmEvent::Behaviour(MyBehaviourEvent::Kademlia(KademliaEvent::OutboundQueryCompleted { result, ..})) => {// 当发出的 Kademlia 查询完成时handle_query_result(&result);}_ => {} // 通配符模式,执行一个空的代码块}}}
}// 下面是两个辅助函数,一个根据不同的查询结果类型执行不同的逻辑,另一个处理从命令行输入的命令
fn handle_query_result(result: &QueryResult) {match result {...}
}fn handle_input_line(kademlia: &mut Kademlia<MemoryStore>, line: String) {let mut args = line.split(' ');match args.next() {...}
}
两个辅助函数
处理从命令行输入的命令
- 这段 Rust 代码定义了一个函数 handle_input_line,用于处理从命令行读取的输入 line,并根据命令执行相应的操作。函数通过分割输入行来解析命令和参数,处理缺少参数的错误情况,并根据命令调用传入的 Kademlia 网络实例 (kademlia) 的相应方法。
fn handle_input_line(kademlia: &mut Kademlia<MemoryStore>, line: String) {// 将输入行按空格分割为多个参数let mut args = line.split(' ');// 匹配第一个参数(命令)match args.next() {Some("GET") => {// 如果命令是 "GET"let key = {// 尝试获取下一个参数作为键match args.next() {Some(key) => Key::new(&key), // 从字符串创建 Key 对象None => {// 如果未提供键,则打印错误并从函数返回eprintln!("缺少键");return;}}};// 调用 Kademlia 网络的 get_record 方法,传入指定的键和 Quorum::Onekademlia.get_record(key, Quorum::One);}Some("GET_PROVIDERS") => {// 如果命令是 "GET_PROVIDERS"let key = {// 尝试获取下一个参数作为键match args.next() {Some(key) => Key::new(&key), // 从字符串创建 Key 对象None => {// 如果未提供键,则打印错误并从函数返回eprintln!("缺少键");return;}}};// 调用 Kademlia 网络的 get_providers 方法,传入指定的键kademlia.get_providers(key);}Some("PUT") => {// 如果命令是 "PUT"let key = {// 尝试获取下一个参数作为键match args.next() {Some(key) => Key::new(&key), // 从字符串创建 Key 对象None => {// 如果未提供键,则打印错误并从函数返回eprintln!("缺少键");return;}}};let value = {// 尝试获取下一个参数作为值match args.next() {Some(value) => value.as_bytes().to_vec(), // 将值转换为字节向量None => {// 如果未提供值,则打印错误并从函数返回eprintln!("缺少值");return;}}};// 创建一个包含指定键、值及可选字段的 Record 对象let record = Record {key,value,publisher: None,expires: None,};// 在 Kademlia 网络中以 Quorum::One 一致性存储记录kademlia.put_record(record, Quorum::One).expect("本地存储记录失败。");}Some("PUT_PROVIDER") => {// 如果命令是 "PUT_PROVIDER"let key = {// 尝试获取下一个参数作为键match args.next() {Some(key) => Key::new(&key), // 从字符串创建 Key 对象None => {// 如果未提供键,则打印错误并从函数返回eprintln!("缺少键");return;}}};// 在 Kademlia 网络中开始提供指定的键kademlia.start_providing(key).expect("启动提供键失败");}_ => {// 如果命令不匹配预期的任何命令eprintln!("期望命令为 GET、GET_PROVIDERS、PUT 或 PUT_PROVIDER");}}
根据不同的查询结果类型执行不同的逻辑
fn handle_query_result(result: &QueryResult) {match result {QueryResult::GetProviders(Ok(ok)) => {for peer in &ok.providers {println!("Peer {:?} provides key {:?}",peer,std::str::from_utf8(ok.key.as_ref()).unwrap());}}QueryResult::GetProviders(Err(err)) => {eprintln!("Failed to get providers: {:?}", err);}QueryResult::GetRecord(Ok(ok)) => {for PeerRecord {record: Record { key, value, .. },..} in &ok.records{println!("Got record {:?} {:?}",std::str::from_utf8(key.as_ref()).unwrap(),std::str::from_utf8(&value).unwrap(),);}}QueryResult::GetRecord(Err(err)) => {eprintln!("Failed to get record: {:?}", err);}QueryResult::PutRecord(Ok(PutRecordOk { key })) => {println!("Successfully put record {:?}",std::str::from_utf8(key.as_ref()).unwrap());}QueryResult::PutRecord(Err(err)) => {eprintln!("Failed to put record: {:?}", err);}QueryResult::StartProviding(Ok(AddProviderOk { key })) => {println!("Successfully put provider record {:?}",std::str::from_utf8(key.as_ref()).unwrap());}QueryResult::StartProviding(Err(err)) => {eprintln!("Failed to put provider record: {:?}", err);}_ => {}}
}fn handle_input_line(kademlia: &mut Kademlia<MemoryStore>, line: String) {let mut args = line.split(' ');match args.next() {Some("GET") => {let key = {match args.next() {Some(key) => Key::new(&key),None => {eprintln!("Expected key");return;}}};kademlia.get_record(key, Quorum::One);}Some("GET_PROVIDERS") => {let key = {match args.next() {Some(key) => Key::new(&key),None => {eprintln!("Expected key");return;}}};kademlia.get_providers(key);}Some("PUT") => {let key = {match args.next() {Some(key) => Key::new(&key),None => {eprintln!("Expected key");return;}}};let value = {match args.next() {Some(value) => value.as_bytes().to_vec(),None => {eprintln!("Expected value");return;}}};let record = Record {key,value,publisher: None,expires: None,};kademlia.put_record(record, Quorum::One).expect("Failed to store record locally.");}Some("PUT_PROVIDER") => {let key = {match args.next() {Some(key) => Key::new(&key),None => {eprintln!("Expected key");return;}}};kademlia.start_providing(key).expect("Failed to start providing key");}_ => {eprintln!("expected GET, GET_PROVIDERS, PUT or PUT_PROVIDER");}}
}
运行示例
PS C:\Users\kingchuxing\Documents\learning-libp2p-main\rust> cargo run --example 04-kv-store
Listening in "/ip4/172.23.118.182/tcp/65055"
Listening in "/ip4/192.168.0.104/tcp/65055"
Listening in "/ip4/127.0.0.1/tcp/65055"
GET 123
Failed to get record: NotFound { key: Key(b"123"), closest_peers: [] }
PUT 123
缺少值
PUT 123 123456789
Failed to put record: QuorumFailed { key: Key(b"123"), success: [], quorum: 1 }
GET 123
Got record "123" "123456789"
PUT_PROVIDER 234 //输入提供者
Successfully put provider record "234"
GET_PROVIDERS 234 //获取提供者
Peer PeerId("12D3KooWB7CFnrmeH5gzRxA4CYR2YTg2K3NMvNHP5dWDPFwAHY38") provides key "234"
GET 234
Failed to get record: NotFound { key: Key(b"234"), closest_peers: [] }
相关文章:
p2p、分布式,区块链笔记: 通过libp2p的Kademlia网络协议实现kv-store
Kademlia 网络协议 Kademlia 是一种分布式哈希表协议和算法,用于构建去中心化的对等网络,核心思想是通过分布式的网络结构来实现高效的数据查找和存储。在这个学习项目里,Kademlia 作为 libp2p 中的 NetworkBehaviour的组成。 以下这些函数或…...
ShareSDK iOS端如何实现小红书分享
下载SDK 请登陆官网 ,找到SDK下载,勾选需要的平台下载 导入SDK (1)离线导入将上述下载到的SDK,直接将整个SDK资源文件拖进项目里,如下图: 并且勾选以下3个选项 在点击Finish,…...
算法day1 两数之和 两数相加 冒泡排序 快速排序
两数之和 最简单的思维方式肯定是去凑两个数,两个数的和是目标值就ok。这里两遍for循环解决。 两数相加 敲了一晚上哈哈,结果超过int范围捏,难受捏。 public class Test2 {public static void main(String[] args) { // ListNode l1 …...
Rust监控可观测性
可观测性 在监控章节的引言中,我们提到了老板、前端、后端眼中的监控是各不相同的,那么有没有办法将监控模型进行抽象、统一呢? 来简单分析一下: 业务指标实时展示,这是一个指标型的数据( metric )手机 APP 上传的数…...
SVN 的忽略(Ignore)和递归(Recursively)以及忽略部分
SVN中忽略大家经常用到,但总是似懂非懂,下面就详细展开说明一下忽略如何设置。 两个忽略 通常设置忽略都是文件夹和里面的文件都忽略。 设置忽略我们通常只需要鼠标右键点击忽略就可以了,如图: 第一个忽略用的最多,…...
vue3开发过程中遇到的一些问题记录
问题: vue3在使用 defineProps、defineEmits、defineExpose 时不需要import,但是 eslint会报错error defineProps is not defined no-undef 解决方法: 安装 vue-eslint-parser 插件,在 .eslintrc.js 文件中添加配置 parser: vue-e…...
Jedis、Lettuce、RedisTemplate连接中间件
jedis就像jdbc一样,用于两个端直接的连接。 1.创建Spring项目 这里不过多赘述... 2.导入连接工具jedis 在pom文件中导入jedis的依赖。 <dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId><version&…...
【C++】继承(详解)
前言:今天我们正式的步入C进阶内容的学习了,当然了既然是进阶意味着学习难度的不断提升,各位一起努力呐。 💖 博主CSDN主页:卫卫卫的个人主页 💞 👉 专栏分类:高质量C学习 👈 &#…...
网络io与select,poll,epoll
前言 网络 IO,会涉及到两个系统对象,一个是用户空间调用 IO 的进程或者线程,另一个是内核空间的内核系统,比如发生 IO 操作 read 时,它会经历两个阶段: 1. 等待数据准备就绪 2. 将数据从内核拷贝到进程或…...
【Linux】多线程(一万六千字)
提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 目录 文章目录 前言 线程的概念 线程的理解(Linux系统为例) 在Linux系统里如何保证让正文部分的代码可以并发的去跑呢? 为什么要有多进程呢? 为…...
sh脚本笔记2
test条件测试 语法 条件测试语法说明语法1:test <测试表达式>这是利用test命令进行条件测试表达式的方法。test命令和“<测试表达式>”之间至少有一个空格语法2:[ <测试表达式> ]这是通过[](单中括号)进行条件…...
js替换对象里面的对象名称
data为数组,val为修改前的名称,name为修改后的名称 JSON.parse(JSON.stringify(data).replace(/val/g, name)) ; 1.替换data里面的对象tenantInfoRespVO名称替换成tenantInfoUpdateReqVO 2.替换语句: 代码可复制 let tenantInf…...
鸿蒙开发设备管理:【@ohos.settings (设置数据项名称)】
设置数据项名称 说明: 本模块首批接口从API version 8开始支持。后续版本如有新增内容,则采用上角标单独标记该内容的起始版本。 本模块提供设置数据项的访问功能相关接口的说明及示例。 导入模块 import settings from ohos.settings;settings.getUri…...
STM32之五:TIM定时器(2-通用定时器)
目录 通用定时器(TIM2~5)框图 1、 输入时钟源选择 2、 时基单元 3 、输入捕获:(IC—Input Capture) 3.1 输入捕获通道框图(TI1为例) 3.1.1 滤波器: 3.1.2 边沿检测器…...
【分布式系统】监控平台Zabbix对接grafana
以前两篇博客为基础 【分布式系统】监控平台Zabbix介绍与部署(命令截图版)-CSDN博客 【分布式系统】监控平台Zabbix自定义模版配置-CSDN博客 一.安装grafana并启动 添加一台服务器192.168.80.104 初始化操作 systemctl disable --now firewalld set…...
操作系统真象还原:编写硬盘驱动程序
第13章-编写硬盘驱动程序 这是一个网站有所有小节的代码实现,同时也包含了Bochs等文件 13.1 硬盘及分区表 13.1.1 创建从盘及获取安装的磁盘数 要实现文件系统,必须先有个磁盘介质,虽然咱们己经有个虚拟磁盘 hd60M.img,但它只…...
firewalld防火墙(二)
一:firewalld高级配置 1:关于iptables的知识 iptables 是Linux系统中传统的命令行防火墙管理工具,它基于内核的netfilter框架工作,用于配置和管理网络规则集,比如过滤(允许/拒绝)进出的数据包…...
Android-悬浮窗口
在Android系统中,如果应用需要弹出一个悬浮窗口,就需要申请一项特殊权限 <uses-permission android:name"android.permission.SYSTEM_ALERT_WINDOW"/>在Android O之前的系统中申请了该权限后,再给对应的window设置 WindowM…...
打破僵局:Foxit Reader无法打开的终极解决方案
打破僵局:Foxit Reader无法打开的终极解决方案 在数字化阅读时代,Foxit Reader作为一款广受欢迎的PDF阅读器,其打不开的问题无疑会给用户带来诸多不便。本文将为您提供全面的解决方案,从基础检查到高级技巧,确保您能够…...
[调试] JTAG下运行正常,从QSPI或者SD卡启动则无响应,如何查找问题
[调试] JTAG下运行正常,从QSPI或者SD卡启动则无响应,如何查找问题 一、问题现象二、用自定义fsbl替代系统默认的fsbl1. 新建fsbl_new2. 如果提示缺少xilffs库3. 使能调试信息输出 三. 启动成功和失败情况下的典型输出1. JTAG启动模式: 正常加载2. QSPI启…...
使用VSCode开发Django指南
使用VSCode开发Django指南 一、概述 Django 是一个高级 Python 框架,专为快速、安全和可扩展的 Web 开发而设计。Django 包含对 URL 路由、页面模板和数据处理的丰富支持。 本文将创建一个简单的 Django 应用,其中包含三个使用通用基本模板的页面。在此…...
Lombok 的 @Data 注解失效,未生成 getter/setter 方法引发的HTTP 406 错误
HTTP 状态码 406 (Not Acceptable) 和 500 (Internal Server Error) 是两类完全不同的错误,它们的含义、原因和解决方法都有显著区别。以下是详细对比: 1. HTTP 406 (Not Acceptable) 含义: 客户端请求的内容类型与服务器支持的内容类型不匹…...
Debian系统简介
目录 Debian系统介绍 Debian版本介绍 Debian软件源介绍 软件包管理工具dpkg dpkg核心指令详解 安装软件包 卸载软件包 查询软件包状态 验证软件包完整性 手动处理依赖关系 dpkg vs apt Debian系统介绍 Debian 和 Ubuntu 都是基于 Debian内核 的 Linux 发行版ÿ…...
PPT|230页| 制造集团企业供应链端到端的数字化解决方案:从需求到结算的全链路业务闭环构建
制造业采购供应链管理是企业运营的核心环节,供应链协同管理在供应链上下游企业之间建立紧密的合作关系,通过信息共享、资源整合、业务协同等方式,实现供应链的全面管理和优化,提高供应链的效率和透明度,降低供应链的成…...
深入浅出:JavaScript 中的 `window.crypto.getRandomValues()` 方法
深入浅出:JavaScript 中的 window.crypto.getRandomValues() 方法 在现代 Web 开发中,随机数的生成看似简单,却隐藏着许多玄机。无论是生成密码、加密密钥,还是创建安全令牌,随机数的质量直接关系到系统的安全性。Jav…...
AtCoder 第409场初级竞赛 A~E题解
A Conflict 【题目链接】 原题链接:A - Conflict 【考点】 枚举 【题目大意】 找到是否有两人都想要的物品。 【解析】 遍历两端字符串,只有在同时为 o 时输出 Yes 并结束程序,否则输出 No。 【难度】 GESP三级 【代码参考】 #i…...
linux 错误码总结
1,错误码的概念与作用 在Linux系统中,错误码是系统调用或库函数在执行失败时返回的特定数值,用于指示具体的错误类型。这些错误码通过全局变量errno来存储和传递,errno由操作系统维护,保存最近一次发生的错误信息。值得注意的是,errno的值在每次系统调用或函数调用失败时…...
laravel8+vue3.0+element-plus搭建方法
创建 laravel8 项目 composer create-project --prefer-dist laravel/laravel laravel8 8.* 安装 laravel/ui composer require laravel/ui 修改 package.json 文件 "devDependencies": {"vue/compiler-sfc": "^3.0.7","axios": …...
VM虚拟机网络配置(ubuntu24桥接模式):配置静态IP
编辑-虚拟网络编辑器-更改设置 选择桥接模式,然后找到相应的网卡(可以查看自己本机的网络连接) windows连接的网络点击查看属性 编辑虚拟机设置更改网络配置,选择刚才配置的桥接模式 静态ip设置: 我用的ubuntu24桌…...
A2A JS SDK 完整教程:快速入门指南
目录 什么是 A2A JS SDK?A2A JS 安装与设置A2A JS 核心概念创建你的第一个 A2A JS 代理A2A JS 服务端开发A2A JS 客户端使用A2A JS 高级特性A2A JS 最佳实践A2A JS 故障排除 什么是 A2A JS SDK? A2A JS SDK 是一个专为 JavaScript/TypeScript 开发者设计的强大库ÿ…...
