C++ Json-Rpc框架-3项目实现(2)
一.消息分发Dispatcher实现

Dispatcher 就是“消息分发中枢”:根据消息类型 MType,把消息派发给对应的处理函数(Handler)执行。
初版:
#pragma once
#include "net.hpp"
#include "message.hpp"namespace wws
{class Dispatcher {public:using ptr=std::shared_ptr<Dispatcher>;void registerHandler(MType mtype,const MessageCallback &handler){std::unique_lock<std::mutex> lock(_mutex);_handlers.insert(std::make_pair(mtype,handler));}void onMessage(const BaseConnection::ptr&conn,BaseMessage::ptr &msg){//找到消息类型对应的业务处理函数,进行调用std::unique_lock<std::mutex> lock(_mutex);auto it=_handlers.find(msg->mtype());if(it!=_handlers.end()){it->second(conn,msg);return;}//没有找到指定类型的处理回调 但我们客户端和服务端都是我们自己设计的 因此不可能出现这种情况ELOG("收到未知类型的消息");conn->shutdown();}private:std::mutex _mutex;std::unordered_map<MType,MessageCallback> _handlers;};
}
使用方法 :服务端为例
#include "message.hpp"
#include "net.hpp"
#include "dispatcher.hpp"void onRpcRequest(const wws::BaseConnection::ptr conn,wws::BaseMessage::ptr& msg)
{std::cout<<"收到了Rpc请求:";std::string body =msg->serialize();std::cout<<body<<std::endl;auto rpc_rsp=wws::MessageFactory::create<wws::RpcResponse>();rpc_rsp->setId("111");rpc_rsp->setMType(wws::MType::RSP_RPC);rpc_rsp->setRcode(wws::RCode::RCODE_OK);rpc_rsp->setResult(33);conn->send(rpc_rsp);
}
void onTopicRequest(const wws::BaseConnection::ptr conn,wws::BaseMessage::ptr& msg)
{std::cout<<"收到了Topic请求:";std::string body =msg->serialize();std::cout<<body<<std::endl;auto rpc_rsp=wws::MessageFactory::create<wws::TopicResponse>();rpc_rsp->setId("111");rpc_rsp->setMType(wws::MType::RSP_RPC);rpc_rsp->setRcode(wws::RCode::RCODE_OK);conn->send(rpc_rsp);
}int main()
{//server建立收到rpc topic请求时对应的调用函数auto dispatcher=std::make_shared<wws::Dispatcher>();dispatcher->registerHandler(wws::MType::REQ_RPC,onRpcRequest);//注册映射关系dispatcher->registerHandler(wws::MType::REQ_TOPIC,onTopicRequest);//注册映射关系auto server=wws::ServerFactory::create(9090);auto message_cb=std::bind(&wws::Dispatcher::onMessage,dispatcher.get(),std::placeholders::_1,std::placeholders::_2);server->setMessageCallback(message_cb);server->start();return 0;
}
回调函数调用过程:
1.服务端定义了两个函数onRpcRequest收到rpc请求的回调函数,onTopicRequest收到topic请求的回调函数(两个函数的参数部分都是一样的)。
2.创建Dispatcher类对象,调用registerHandler函数把请求类型(mtype)和对应的回调函数建立映射(因为上面回调函数的类型都是一样的,所以可以用map进行同一管理)。
3.把Dispatcher::onMessage函数设置成消息回调函数。在Dispatcher::onMessage函数内部会根据不同的消息类型,找到对应的回调函数并进行调用。
但是设置的两个回调函数中onRpcRequest,onTopicRequest。它们第二个参数类型都是基类BaseMessage,虽然传入的对象是子类对象RpcResponse TopicResponse,但仍无法访问它们对应子类的成员函数。
如果把第二个参数基类Base换成它们对应的子类,能访问了,但这就会导致函数的类型不一样了,就不能用map进行统一管理。
有没有什么办法即能用map进行统一管理,还能在回调函数中调用到子类的函数。
方法一:直接在回调函数中通过
dynamic_cast/ std::dynamic_pointer_cast将父类->子类可以通过
dynamic_cast将基类指针(或引用)转换为子类类型,以便访问子类特有的成员函数。前提是你使用的是多态(polymorphic)类,也就是基类至少要有一个虚函数//Base基类 Derived子类 void test(Base* basePtr) {// 裸指针写法Derived* derivedPtr = dynamic_cast<Derived*>(basePtr);if (derivedPtr) {derivedPtr->specialFunction(); // 成功转换,调用子类函数} else {cout << "dynamic_cast failed!" << endl;} } //智能指针写法 std::shared_ptr<Base> basePtr = std::make_shared<Derived>(); // 用智能指针创建对象 std::shared_ptr<Derived> derivedPtr = std::dynamic_pointer_cast<Derived>(basePtr); // 类型安全转换
项目 裸指针写法 智能指针写法 创建方式 new Derived()std::make_shared<Derived>()类型转换 dynamic_cast<Derived*>(Base*)std::dynamic_pointer_cast<Derived>(shared_ptr<Base>)返回值类型 Derived*std::shared_ptr<Derived>生命周期管理 手动 delete 自动释放,无内存泄漏风险 安全检查 ✅ 运行时类型检查,失败返回 nullptr✅ 同样运行时类型检查,失败返回空智能指针 是否影响引用计数 ❌ 不涉及引用计数 ✅ 新建了一个共享控制块引用
缺点 说明 ❌ 调用方需要知道消息类型 调用者必须手动 dynamic_cast到对应子类,否则不能访问子类内容❌ 有一定运行时开销 dynamic_cast需要在运行时检查类型,会略有性能损失(但一般能接受)❌ 容易出错 如果类型错了,就返回 nullptr,还要额外判断、处理错误❌ 易破坏封装 上层代码要知道并显式转换为子类,增加了耦合度和类型暴露
方法二:模板 + 继承 + 多态
先看代码实现:
#pragma once #include "net.hpp" #include "message.hpp"namespace wws {//让统一的父类指针指向不同的子类对象//通过调用父类虚函数,调用不同子类onMessage类型转换(dynamic<>)完成后的函数调用class Callback{public:using ptr=std::shared_ptr<Callback>;virtual void onMessage(const BaseConnection::ptr &conn,BaseMessage::ptr &msg)=0;};template<typename T>class CallbackT:public Callback{public:using ptr=std::shared_ptr<CallbackT<T>>;//根据消息类型重新定义出函数类型using MessageCallback =std::function<void(const BaseConnection::ptr&conn,std::shared_ptr<T> &msg)>;CallbackT(const MessageCallback &handler):_handler(handler){}void onMessage(const BaseConnection::ptr &conn,BaseMessage::ptr &msg)override{auto type_msg=std::dynamic_pointer_cast<T>(msg);_handler(conn,type_msg);}private:MessageCallback _handler;};class Dispatcher {public:using ptr=std::shared_ptr<Dispatcher>;template<typename T> //加typename表示这是一个类型void registerHandler(MType mtype,const typename CallbackT<T>::MessageCallback &handler)//传的是类对象 T是消息类型(BaseMessage子类){std::unique_lock<std::mutex> lock(_mutex);auto cb = std::make_shared<CallbackT<T>>(handler);_handlers.insert(std::make_pair(mtype,cb));}void onMessage(const BaseConnection::ptr&conn,BaseMessage::ptr &msg){//找到消息类型对应的业务处理函数,进行调用std::unique_lock<std::mutex> lock(_mutex);auto it=_handlers.find(msg->mtype());if(it!=_handlers.end()){it->second->onMessage(conn,msg);//调用类中的回调函数return;}//没有找到指定类型的处理回调 但我们客户端和服务端都是我们自己设计的 因此不可能出现这种情况ELOG("收到未知类型的消息");conn->shutdown();}private:std::mutex _mutex;std::unordered_map<MType,Callback::ptr> _handlers;//second是一个基类指针}; }使用方法:
不用传BaseMessage基类,直接传子类。但在设置时指明Typed消息类型。
因为回调函数hadler类型不同,我们就用模板类根据T(消息类型)生成不同的类,里面是保存的是不同类型的回调函数。但根据模板类生成的类,类型不一样还是不能统一放入map中。
所以再定义一个父类 里面的回调函数设置为虚函数,这些不同的模板类作为子类继承父类并实现各自的回调函数。map中的handler存的不再是回调函数,而是一个父类指针,通过父类指针调用子类的回调函数。
具体过程:
使用模板类
CallbackT<T>根据消息类型T生成不同的子类,它们内部包装了各自类型的回调函数。(因为子类传入的是一个函数类型,保存的一个函数类型,而T是一个消息类型。还需要MessageCallback将T消息类型和回调函数类型绑定在一起)这些模板类继承自统一的基类
Callback,并重写了其onMessage()虚函数。当我们在调用registerHandler注册回调函数时,会创建一个
CallbackT<T>的对象并获取其指针,最后设置到map中。在
Dispatcher中,map<MType, Callback::ptr>存储的是基类指针,但实际指向的是不同子类CallbackT<T>的对象。当调用
onMessage()时,基类指针会通过虚函数机制调用对应子类的实现,再通过dynamic_pointer_cast将消息转换为正确的类型,最终调用具体的业务回调函数。
方法一和方法二对比:
相比直接在 handler 里 dynamic_cast,现在的设计通过模板和多态封装了类型转换逻辑,使回调函数 更简洁、更安全、更可维护,Dispatcher 也更具通用性和扩展性。
对比点 ✅ 当前封装方式<br>(模板 + 多态) ❌ 直接在每个 handler 里手动 dynamic_cast 💡 代码复用性 回调逻辑封装在模板中,避免重复写类型转换代码 每个回调都要重复写一次 dynamic_pointer_cast ✅ 类型安全 编译器自动根据模板类型 T限定传入的函数签名由开发者自己保证类型正确,容易写错 🎯 接口统一 Dispatcher 接收统一的 BaseMessage::ptr,自动调用类型对应的 handler接口统一,但每次回调前都得“手动猜”消息类型 🧼 代码整洁性 Handler 业务函数专注于处理业务,不掺杂类型转换代码 handler 代码里混入了类型转换、错误判断等杂项 🔄 可扩展性 新增消息类型只需 registerHandler<T>一行,无需修改 dispatcher 逻辑每新增一种类型,都要写新回调 + 自己处理类型转换 🔒 类型封装 类型转换封装在 CallbackT<T>::onMessage内,调用者无感知显式暴露类型细节,破坏封装性 🧠 可维护性 Dispatcher 管理逻辑集中、结构清晰 回调函数多时,容易混乱、出错
二.服务端-RpcRouter实现

组织和处理客户端发来的 RPC 请求,并调用对应的业务逻辑进行响应。
这个模块主要由 4 个类构成:
类名 作用 VType参数类型的枚举,例如整数、字符串、对象等 ServiceDescribe描述一个服务方法的参数、返回值、回调函数等 ServiceManager管理多个服务(增删查) RpcRouter处理客户端发来的 RPC 请求,协调调用服务
#include "../common/net.hpp"
#include "../common/message.hpp"namespace wws
{
namespace server
{//枚举类 VType 定义参数与返回值的类型enum class VType{BOOL = 0,INTEGRAL,NUMERIC,STRING,ARRAY,OBJECT,};// 服务描述类class ServiceDescribe{public:using ptr=std::shared_ptr<ServiceDescribe>;using ServiceCallback=std::function<void(const Json::Value&,Json::Value&)>;using ParamDescribe=std::pair<std::string,VType>;//参数名称 类型ServiceDescribe(std::string &&method_name,ServiceCallback &&callback,std::vector<ParamDescribe>&& params_desc,VType return_type):_method_name(std::move(method_name)),_callback(std::move(callback)),_params_desc(std::move(params_desc)),_return_type(return_type){}//返回名称const std::string & method(){return _method_name;}//校验传入参数是否符合要求(1.字段完整 + 2.类型匹配)bool paramCheck(const Json::Value¶ms)//{"nums1",11}{for(auto&desc:_params_desc){//1.判断是否有该字段if(params.isMember(desc.first)==false){ELOG("没有 %s 参数字段",desc.first.c_str());return false;}//2.判断该字段类型是否正确if(check(desc.second,params[desc.first])==false){ELOG("%s参数字段类型错误",desc.first.c_str());return false;}}return true;}bool call(const Json::Value& params,Json::Value&result){_callback(params,result);if(rtypeCheck(result)==false){ELOG("回调处理函数中响应信息类型错误");return false;}return true;}private:// 判断return类型是否正确bool rtypeCheck(const Json::Value &val){return check(_return_type, val);}//判断val对象的类型是否和vtype一致 Json::Value兼容任何JSON类型(int、string、array、object 等)bool check(VType vtype,const Json::Value &val){switch(vtype){case VType::BOOL :return val.isBool();case VType::INTEGRAL : return val.isIntegral();case VType::NUMERIC : return val.isNumeric();case VType::STRING : return val.isString();case VType::ARRAY : return val.isArray();case VType::OBJECT : return val.isObject();}return false;}private:std::string _method_name;//方法名称ServiceCallback _callback;//实际的业务回调函数std::vector<ParamDescribe> _params_desc;//参数字段格式描述vector<参数名称,对应的类型>VType _return_type;//结果类型描述 };//对比 直接在ServiceDescribe中set各参数 的优点:构造完后ServiceDescribe 的成员就不再修改,仅读取 天然线程安全。//若多个线程同时调用 setXxx() 方法 会出现线程安全的问题 需要在每个set函数中加锁class SDescribeFactory{public:void setMethodName(const std::string&name){_method_name=name;}void setReturnType(VType vtype){_return_type=vtype;}void setParamDesc(const std::string &pname,VType vtype){_params_desc.push_back(ServiceDescribe::ParamDescribe(pname,vtype));}void setCallback(const ServiceDescribe::ServiceCallback&cb){_callback=cb;}ServiceDescribe::ptr build(){return std::make_shared<ServiceDescribe>(_method_name,_callback,_params_desc,_return_type);}private:std::string _method_name;//方法名称ServiceDescribe::ServiceCallback _callback; // 实际的业务回调函数std::vector<ServiceDescribe::ParamDescribe> _params_desc; // 参数字段格式描述vector<参数名称,对应的类型>VType _return_type; // 结果类型描述};//服务管理类 增删查class ServiceManager{public:using ptr=std::shared_ptr<ServiceManager>;void insert(const ServiceDescribe::ptr&desc)//增{std::unique_lock<std::mutex> lock(_mutex);_service.insert(std::make_pair(desc->method(),desc));}ServiceDescribe::ptr select(const std::string &method_name)//查{std::unique_lock<std::mutex> lock(_mutex);auto it=_service.find(method_name);if(it==_service.end()){return ServiceDescribe::ptr();}return it->second;}void remove(const std::string &method_name)//删{_service.erase(method_name);}private:std::mutex _mutex;std::unordered_map<std::string,ServiceDescribe::ptr> _service;//函数名称 对应服务};class RpcRouter{public:using ptr=std::shared_ptr<ServiceDescribe>;//对注册到Dispatcher模块针对rpc请求进行回调处理的业务函数void onRpcRequest(const BaseConnection::ptr&conn,RpcRequest::ptr &request){//1.根据用户请求的方法描述 判断当前服务端能否提供对应的服务auto service=_service_manager->select(request->method());if(service.get()==nullptr){ELOG("未找到%s服务",request->method().c_str());return response(conn,request,Json::Value(),RCode::RCODE_NOT_FOUND_SERVICE);}//2.进行参数校验 确定能否提供服务if(service->paramCheck(request->params())==false){ELOG("%s服务参数校验失败",request->method().c_str());return response(conn,request,Json::Value(),RCode::RCODE_INVALID_PARAMS);}//3.调用业务回调接口进行处理Json::Value result;bool ret=service->call(request->params(),result);if(ret==false){ELOG("%s服务参调用出错",request->method().c_str());return response(conn,request,Json::Value(),RCode::RCODE_INTERNAL_ERROR);}//4.向客户端发送结果return response(conn,request,result,RCode::RCODE_OK);}//提供服务注册接口void registerMethod(const ServiceDescribe::ptr &service ){_service_manager->insert(service);}private://响应对象void response(const BaseConnection::ptr&conn,RpcRequest::ptr&req,const Json::Value&res,RCode rcode){auto msg=MessageFactory::create<RpcResponse>();msg->setId(req->rid());msg->setMType(wws::MType::RSP_RPC);msg->setRcode(rcode);msg->setResult(res);conn->send(msg);}private:ServiceManager::ptr _service_manager;};
}
}
ServiceDescribe:服务描述类每一个服务方法(如
Add、Translate)都有一个ServiceDescribe对象,它记录:
方法名
_method_name参数信息
_params_desc(字段名 + 类型)返回值类型
_return_type实际处理逻辑
_callback功能:
paramCheck():检查客户端传来的参数是否完整且类型匹配。
call():调用业务函数,处理请求,并校验响应类型,输出型参数Json::Value&result获取结果。
RpcRouter:RPC 请求核心调度器处理流程(onRpcRequest):
1. 获取客户端请求的方法名:request->method() 2. 查找是否有对应的服务:_service_manager->select() 3. 参数检查:service->paramCheck() 4. 调用回调函数处理:service->call() 5. 构建响应消息:RpcResponse 6. 通过 conn->send(msg) 返回结果注册流程(registerMethod()):
RpcRouter router; router.registerMethod(service); // 将服务注册到服务管理器
为什么要用 SDescribeFactory 工厂模式而不是在 ServiceDescribe 中直接使用 setXxx() 方法进行设置ServiceDescribe的各个参数?
通过DescribeFactory 工厂模式,造完后ServiceDescribe 的成员就不再修改,仅读取 天然线程安全。
如果在ServiceDescribe 设置set(),若多个线程同时调用 setXxx() 方法 会出现线程安全的问题 需要在每个set函数中加锁。
客户端发送 RpcRequest↓RpcRouter::onRpcRequest()//进行处理↓_service_manager->select(method)//查找方法↓ServiceDescribe::paramCheck(params)//校验参数↓ServiceDescribe::call() → 执行业务逻辑//执行回调↓RpcRouter::response() → 发送响应
三.客户端-RpcRouter实现
🔹 1.
describe(请求描述体)
封装一个请求的基本信息:
request: 包含 RID(请求 ID)、MType(消息类型)、Body(请求体)
std::promise<response>:用于 future 异步
callback:用于 callback 异步
RType: 标识异步类型(ASYNC / CALLBACK)🔹 2.
Requestor::send(...)
send(connection, request, callback)→ 异步 callback 模式
send(connection, request, std::future<response>)→ future 模式将
describe存入map<rid, describe>中,等待响应回调🔹 3.
onResponse(connection, response)
收到响应后,根据 RID 从
map<rid, describe>查找对应的describe按照
RType分发:
如果是
CALLBACK,调用 callback如果是
ASYNC,通过promise.set_value(...)实现 future 结果🔹 4.
Dispatcher<mt, handler>
对
MType进行派发处理(主要针对订阅/通知类型的消息,不含 RID)

#include "../common/net.hpp"
#include "../common/message.hpp"
#include <future>namespace wws
{
namespace client
{class Requestor{public:using ptr=std::shared_ptr<Requestor>;using RequestCallback=std::function<void(const BaseMessage::ptr&)>;using AsyncResponse=std::future<BaseMessage::ptr>;//请求描述的结构体struct RequestDescribe{using ptr=std::shared_ptr<RequestDescribe>;BaseMessage::ptr request;//请求消息指针RType rtype; //请求类型 异步/回调std::promise<BaseMessage::ptr> response;//用于 async 模式,设置结果set_value 给 future 返回值RequestCallback callback;//用于 callback 模式,响应到来时触发用户逻辑};//收到应答 根据rid找到对应的请求设置结果 或者 调用回调 void onReponse(const BaseConnection::ptr&conn,const BaseMessage::ptr&msg){std::string rid=msg->rid();RequestDescribe::ptr rdp=getDescribe(rid);//根据id进行查找if(rdp.get()==nullptr){ELOG("收到响应%s,但是未找到对应的请求描述!",rid.c_str());return;}//异步请求把应答msg作为结果设置到promise中,让future就绪if(rdp->rtype==RType::REQ_ASYNC){rdp->response.set_value(msg);//promise.set_value(value); 手动设置值 让std::future<BaseMessage::ptr>变为就绪。}//回调请求 有回调函数就进行调用else if(rdp->rtype==RType::REQ_CALLBACK){if(rdp->callback) rdp->callback(msg);}elseELOG("请求类型未知");//收到应答 删除rid对应的请求描述delDescribe(rid);}//1.异步请求发送bool send(const BaseConnection::ptr&conn,const BaseMessage::ptr&req,AsyncResponse&async_rsp){//创建请求描述对象(newDescribe内部完成插入map)RequestDescribe::ptr rdp=newDescribe(req,RType::REQ_ASYNC);if(rdp.get()==nullptr){ELOG("构建请描述对象失败");return false;}//get_future()关联std::future<>async_rsp 和 std::promise<>response//promise.set_value(value) 被调用,就能async_rsp.get()获取值async_rsp=rdp->response.get_future();return true;}//2.同步请求发送(发送完请求后,立刻调用get()阻塞等待set_value()设置后获取结果)//可以在上层进行get()阻塞等待,也是同样效果bool send(const BaseConnection::ptr&conn,const BaseMessage::ptr&req,BaseMessage::ptr&rsp){AsyncResponse rsp_future;bool ret=send(conn,req,rsp_future);if(ret==false) return false;rsp=rsp_future.get();//阻塞等待值就绪return true;}//3.回调请求发送bool send(const BaseConnection::ptr&conn,const BaseMessage::ptr&rep,RequestCallback&cb){//创建请求描述对象(newDescribe内部完成插入map)RequestDescribe::ptr rdp=newDescribe(rep,RType::REQ_CALLBACK,cb);if(rdp.get()==nullptr){ELOG("构建请描述对象失败");return false;}conn->send(rep);return true;}private://1.新增RequestDescribe::ptr newDescribe(const BaseMessage::ptr&req,RType rtype,const RequestCallback&cb=RequestCallback()){std::unique_lock<std::mutex> lock(_mutex);//构建请求描述对象 并插入到mapRequestDescribe::ptr rd=std::make_shared<RequestDescribe>();rd->request=req;rd->rtype=rtype;if(rtype==RType::REQ_CALLBACK&&cb)rd->callback=cb;_request_desc.insert(std::make_pair(req->rid(),rd));//插入到mapreturn rd;}//2.查找RequestDescribe::ptr getDescribe(const std::string&rid){std::unique_lock<std::mutex> lock(_mutex);auto it=_request_desc.find(rid);if(it==_request_desc.end()){return RequestDescribe::ptr();}return it->second;}//3.删除void delDescribe(const std::string&rid){std::unique_lock<std::mutex> lock(_mutex);_request_desc.erase(rid);}private:std::mutex _mutex;std::unordered_map<std::string,RequestDescribe::ptr> _request_desc;//id->请求消息};
}
}
用户发起请求│▼
Requestor::send(...)│▼
创建 RequestDescribe (含回调 or promise)│▼
加入 map<rid, describe>│▼
发送消息给服务器⬇(服务器响应)Requestor::onResponse(...)│▼找回 describe -> 判断 rtype├── CALLBACK → 执行 callback└── ASYNC → set promise▼清除 map<rid, describe>
四.客户端RpcCaller实现

1. 构造函数
RpcCaller(const Requestor::ptr&)
初始化传入一个
Requestor实例;
Requestor负责发送消息、注册回调、接收服务端响应等;
RpcCaller是调用层,Requestor是通信层。2. 同步调用接口
bool call(const BaseConnection::ptr& conn,const std::string& method,const Json::Value& params,Json::Value& result)1.先构建RpcRequest请求
2.调用requestor->同步send(),同步阻塞发送请求,并拿到rsp_msg。
3.将rsp_msg的Basemessage类型转换为 RpcResponse,取出正文结果 result();
3. 异步 Future 调用接口
bool call(const BaseConnection::ptr& conn,const std::string& method,const Json::Value& params,std::future<Json::Value>& result)
创建
promise对象,用于异步回填响应;通过
shared_ptr管理promise生命周期,绑定到回调函数中;通过
_requestor->send(...)注册异步回调;回调触发后由
Callback()设置promise.set_value();调用方使用返回的
future进行.get()即可拿到结果。4. 异步回调接口
bool call(const BaseConnection::ptr& conn,const std::string& method,const Json::Value& params,JsonResponseCallback& cb)
直接注册用户定义的回调
cb到请求;内部通过
Callback1()做包装处理:
类型转换为
RpcResponse错误处理
最后调用用户传入的
cb(result)传回结果。
#include "requestor.hpp"namespace wws
{
namespace client
{class RpcCaller{public:using ptr=std::shared_ptr<RpcCaller>;using JsonAsyncResponse=std::future<Json::Value>;using JsonResponseCallback=std::function<void(const Json::Value&)>;//requestor中的处理是针对BaseMessage进行处理的(因为要对所有请求进行处理,不单单对Rpc请求处理)//用于在rpc caller中针对结果的处理是对RpcResponse里边的result进行的RpcCaller(const Requestor::ptr &requestor):_requestor(requestor){}//1.同步调用 1.连接conn 2.方法名method 3.方法参数params 4.结果resultbool call(const BaseConnection::ptr&conn,const std::string&method,const Json::Value¶ms,Json::Value&result){//1.组织请求auto req_msg=MessageFactory::create<RpcRequest>();req_msg->setId(UUID::uuid());req_msg->setMType(MType::REQ_RPC);req_msg->setMethod(method);req_msg->setParams(params);BaseMessage::ptr rsp_msg;//2.发送请求 因为send()是重载函数 参数的类型必须保持一致(req_msg子类RpcRequest->基类BaseMessage)bool ret=_requestor->send(conn,std::dynamic_pointer_cast<BaseMessage>(req_msg),rsp_msg);if(ret==false){ELOG("同步Rpc请求失败");return false;}//3.等待响应 响应信息存放在rsp_msg此时是Base基类,需要转成RpcResponse应答auto rpc_rsp_msg=std::dynamic_pointer_cast<RpcResponse>(rsp_msg);if(!rpc_rsp_msg){ELOG("Rpc响应 向下类型转换失败");return false;}if(rpc_rsp_msg->rcode() != RCode::RCODE_OK){ELOG("Rpc同步请求出错:%s",errReason(rpc_rsp_msg->rcode()));return false;}result=rpc_rsp_msg->result();//返回响应消息里面的正文return true;}//2.异步 Future 调用 向服务器发送异步回调请求 设置回调函数 //回调函数中传入一个promise对象 在回调函数中对promise设置数据//异步请求返回的是BaseMessage对象,用户想要的是message里面正文的结果Valuebool call(const BaseConnection::ptr&conn,const std::string&method,const Json::Value¶ms,std::future<Json::Value>&result){auto req_msg=MessageFactory::create<RpcRequest>();req_msg->setId(UUID::uuid());req_msg->setMType(MType::REQ_RPC);req_msg->setMethod(method);req_msg->setParams(params);// //这个json_promise对象是一个局部变量,等出了call作用域就会消失// //与它关联的future result在外部get()获取结果时,会异常// std::promise<Json::Value> json_promise;// result=json_promise.get_future();//future和promise建立关联 以后future.get()获取结果auto json_promise=std::make_shared<std::promise<Json::Value>>();result=json_promise->get_future();//std::bind() 对json_promise传值传参,shared_ptr引用计数 +1 此时引用计数==2//退出call作用域 引用计数-- 再等callback被触发完毕并释放后引用计数--,才会析构//shared_ptr引用计数是否加1,只和bind对json_promise指针的捕获方式有关,与函数的参数声明是否引用json_promise指针无关Requestor::RequestCallback cb=std::bind(&RpcCaller::Callback,this,json_promise,std::placeholders::_1);bool ret=_requestor->send(conn,std::dynamic_pointer_cast<BaseMessage>(req_msg),cb);if(ret==false){ELOG("异步Rpc请求失败");return false;}return true;}//3.异步回调bool call(const BaseConnection::ptr&conn,const std::string&method,const Json::Value¶ms,JsonResponseCallback&cb){auto req_msg=MessageFactory::create<RpcRequest>();req_msg->setId(UUID::uuid());req_msg->setMType(MType::REQ_RPC);req_msg->setMethod(method);req_msg->setParams(params);Requestor::RequestCallback req_cb = std::bind(&RpcCaller::Callback1,this, cb, std::placeholders::_1);bool ret = _requestor->send(conn, std::dynamic_pointer_cast<BaseMessage>(req_msg), req_cb);if (ret == false){ELOG("异步Rpc请求失败");return false;}return true;}private:void Callback1(const JsonResponseCallback&cb,const BaseMessage::ptr&msg){//先判断结果对不对auto rpc_rsp_msg=std::dynamic_pointer_cast<RpcResponse>(msg);if(!rpc_rsp_msg){ELOG("Rpc响应 向下类型转换失败");return ;}if(rpc_rsp_msg->rcode() != RCode::RCODE_OK){ELOG("Rpc回调请求出错:%s",errReason(rpc_rsp_msg->rcode()));return ;}cb(rpc_rsp_msg->result());}//const BaseMessage::ptr&msg Request参数是拿到响应后传递的void Callback(std::shared_ptr<std::promise<Json::Value>>result,const BaseMessage::ptr&msg){auto rpc_rsp_msg=std::dynamic_pointer_cast<RpcResponse>(msg);if(!rpc_rsp_msg){ELOG("Rpc响应 向下类型转换失败");return ;}if(rpc_rsp_msg->rcode() != RCode::RCODE_OK){ELOG("Rpc异步请求出错:%s",errReason(rpc_rsp_msg->rcode()));return ;}//promise.set_value()设置正文结果 result->set_value(rpc_rsp_msg->result());}private:Requestor::ptr _requestor;};
}
}
为什么不让 Requestor::send() 直接处理 RpcRequest,而是让它只处理 BaseMessage,由 RpcCaller 来封装具体业务(如 RpcRequest/RpcResponse)?
1.因为要对所有请求进行处理,不单单对Rpc请求处理。还可以处理主题 服务等消息类型
2.解耦业务协议与通信通道,只做消息传递与回调处理。
模块 职责 Requestor发送消息、注册和触发回调,处理响应派发 RpcCaller构造 RPC 请求、解析 RPC 响应,组织业务逻辑
为什么 bind 捕获 shared_ptr 能延长 promise 的生命周期?
shared_ptr+bind(值捕获)+外部拷贝保存
本质就是:
只要某个 shared_ptr 指向的对象,引用计数 不为 0,那它就不会被销毁;
所以我们通过创建另一个生命周期更长的 shared_ptr(比如通过 bind() 值捕获并在外部保存),来延长这段资源的生命周期。
(不建议std::ref()引用捕获 如果外部shared_ptr_obj的被销毁,cb 里的引用就变成悬垂指针)
auto cb = std::bind(&func, std::ref(shared_ptr_obj));
如果std::promise<Json::Value> json_promise;直接创建一个promise对象,这是一个局部对象等出了call函数(离开作用域),就会析构。
这就会导致与它关联的future result在外部get()获取结果时,会异常。
auto json_promise=std::make_shared<std::promise<Json::Value>>();创建promise对象并用shared指针来管理它,虽然出了call函数 引用计数-- 为0还是会析构。
但我们在bind绑定的时候,对json_promise进行了传值,拷贝了一份
shared_ptr对象 进入bind内部的闭包,引用计数+1.auto cb = std::bind(&RpcCaller::Callback,this,json_promise, // 👈 这里复制了一份 shared_ptr,引用计数 +1std::placeholders::_1);写成void Callback(const std::shared_ptr<std::promise<Json::Value>>&result,const BaseMessage::ptr&msg) 接收引用(不增加引用计数)。对bind传值引用计数+1无影响
如果是下面这种拷贝引用计数会再+1,但仅限Callback运行时,调用结束后立刻-1,只是暂时的。
所以说现在,在call函数中有被shared指针指向的promise对象引用计数为2,一个make_shared返回的,一个bind闭包值捕获的。
bind闭包对象cb(auto cb=std::bind(...)),它本质不也是一个局部对象吗?call函数结束不还是和另一个局部变量一样会销毁,promise的生命周期怎么会延长呢?
call函数中的cb是否销毁已经无关,因为在函数的外部已经拷贝保存了一份了。
它通常会被传出
call()函数,交给Requestor::send()并被保存在异步响应回调表里!send()会调用newDescibe,把闭包对象存入rd请求描述对象中,再插入到map进行统一管理。
(注意newDescribe虽然是const&获取的cb 不增加引用计数,但这根本无关紧要,因为rd->callback=cb;会把它拷贝到了 RequestDescribe::callback 中,这是才生命周期延长的关键)
等到onResponse收到应答并处理完,就delDescribe()删除对应请求描述信息,保存的回调函数cb也析构,里面保存的json_promise也会析构,所以说ptr+bind值捕获+外部保存
保证了 json_promise 会存活到 callback 被调用。
总结:
1.Callback(param) 中的 shared_ptr 是按值传递,因此会导致引用计数 +1,
但这个 +1 的生命周期仅限于函数执行期间,Callback() 结束后 param 被销毁,引用计数立即 -1。
2.而 bind(...) 捕获 shared_ptr 时(通过值捕获),会将其拷贝一份保存在闭包对象中(如 auto cb),
这会导致引用计数 +1,无论是否调用 Callback,引用计数都存在,直到闭包对象销毁为止。
那闭包对象什么时候销毁呢?delDescribe()
3.需要注意的是,cb 本身是 call() 函数内的局部变量,但它并不会随着 call() 结束而失效。
虽然 call() 返回后 cb 变量本身会被销毁,但在此之前 cb 已经被作为参数传入 _requestor->send(),
并被内部保存到了 _request_desc 表中,作为 RequestDescribe 的一部分长期持有。
4.也就是说:call() 中定义的 cb 虽然是局部变量,但它在作用域结束前被拷贝并传出,生命周期已经被延长。
所以 cb 中捕获的 json_promise(shared_ptr)依然活着,call 函数中的 cb 就算析构,也不影响闭包内部捕获的 promise 的生命周期。
5.只有当服务端响应到达,回调被触发后,执行 cb(msg),再通过 delDescribe(rid) 删除请求描述对象,rd保存的回调函数cb闭包进行析构,值捕获的json_promise被释放,引用计数==0 promise 被析构.
6.相比之下,如果只是创建了一个局部的 shared_ptr(比如 json_promise),没有通过 bind、lambda、线程等传出去,
它就会在函数结束时被立即析构,future 将失效,调用 .get() 会抛出异常(broken promise)。
✅ 因此,虽然 cb 是局部变量,但它在 call() 结束前被拷贝传出,生命周期被框架托管,保证了回调所需的 promise 安全存活。
这是异步通信框架中延长资源生命周期的关键机制,确保异步流程完整闭环。
局部变量当作参数传参,并不会延长它的生命周期; 它的生命周期仍然只属于它原来的作用域; 如果你想延长生命周期,必须使用 shared_ptr,并在外部再持有一份拷贝!
举个例子:
std::shared_ptr<int> create() {auto p = std::make_shared<int>(100); // p 是局部变量return p; // ✅ 返回值是“拷贝一份”,原来的 p 会被销毁,但返回值还持有控制块 }auto x = create(); // 返回值拷贝给 x,资源不会被销毁
情况 局部变量会不会被销毁 解释 普通局部变量(无指针托管) ✅ 会 作用域结束立即销毁 传值作为函数参数 ✅ 会 参数是拷贝,原变量不延长 返回 shared_ptr ✅ 原变量销毁,但返回值持有副本,资源不析构 shared_ptr 被 bind 捕获 ❌ 不会(+1) 延长生命周期直到闭包结束
服务端测试代码
相较于之前实现的客户端,我们不再是直接给dispatcher传业务层函数,而是传RpcRouter的处理对应请求的回调函数,由该函数再从注册到RpcRouter中的具体实现函数中找用户需要的函数,并进行回调返回响应结果。
#include "../common/message.hpp"
#include "../common/net.hpp"#include "../common/dispatcher.hpp"
#include "../server/rpc_router.hpp"void Add(const Json::Value&req,Json::Value&rsp)
{int nums1=req["nums1"].asInt();int nums2=req["nums2"].asInt();rsp=nums1+nums2;
}
int main()
{auto router=std::make_shared<wws::server::RpcRouter>();std::unique_ptr<wws::server::SDescribeFactory> desc_factory(new wws::server::SDescribeFactory());desc_factory->setMethodName("Add");desc_factory->setParamsDesc("nums1",wws::server::VType::INTEGRAL);desc_factory->setParamsDesc("nums2",wws::server::VType::INTEGRAL);desc_factory->setReturnType(wws::server::VType::INTEGRAL);desc_factory->setCallback(Add);//1.注册Add函数到RpcRouterrouter->registerMethod(desc_factory->build());//bind绑定RpcRouter收到消息的回调函数->cbauto cb=std::bind(&wws::server::RpcRouter::onRpcRequest,router.get(),std::placeholders::_1,std::placeholders::_2);auto dispatcher=std::make_shared<wws::Dispatcher>();//2.把RpcRouter回调函数onRpcRequest设置到dispatcher中dispatcher->registerHandler<wws::RpcRequest>(wws::MType::REQ_RPC,cb);auto server=wws::ServerFactory::create(9090);auto message_cb=std::bind(&wws::Dispatcher::onMessage,dispatcher.get(),std::placeholders::_1,std::placeholders::_2);//3.把Dispatcher的回调函数onMessage设置到server中server->setMessageCallback(message_cb);server->start();return 0;
}

从图示上可以看到,整个链路依次是:
server 收到消息
调用 messageCallback
触发 onMessage 进入 dispatcher
Dispatcher 发现消息类型是
REQ_RPC,调用 RpcRouter::onRpcRequestRouter 找到 Add 方法并调用其回调函数
Add 函数执行计算并返回结果
server->setMessageCallback(message_cb);
这里的
message_cb是通过std::bind(&wws::Dispatcher::onMessage, dispatcher.get(), ...)绑定的。当服务器收到任何消息时,就会调用
dispatcher->onMessage(...)。
dispatcher->onMessage(...)
Dispatcher根据消息的类型(这里为REQ_RPC)找到先前注册的回调处理函数。这一步对应代码
dispatcher->registerHandler<wws::RpcRequest>(wws::MType::REQ_RPC, cb);。所以当消息类型匹配到
REQ_RPC时,就执行cb。
cb->std::bind(&wws::server::RpcRouter::onRpcRequest, router.get(), ...)
这个
cb本质上就是对RpcRouter::onRpcRequest的一次包装。当调用
cb时,实际上就是执行router->onRpcRequest(...)。
RpcRouter::onRpcRequest(...)
在路由器里,根据请求中的“方法名称”(比如
"Add")找到对应的回调函数。此处就是在之前
router->registerMethod(...)时注册的Add方法回调。调用
Add(const Json::Value& req, Json::Value& rsp)
最终执行我们自定义的逻辑(如取
nums1、nums2,相加后存入rsp)。
客户端测试代码
#include "../common/dispatcher.hpp"
#include "../client/requestor.hpp"
#include "../client/rpc_caller.hpp"
#include <thread>
#include <chrono>void callback(const Json::Value &result) {DLOG("callback result: %d", result.asInt());
}int main()
{auto requestor=std::make_shared<wws::client::Requestor>();auto caller=std::make_shared<wws::client::RpcCaller>(requestor);auto dispatcher=std::make_shared<wws::Dispatcher>();auto rsp_cb=std::bind(&wws::client::Requestor::onResponse,requestor.get(),std::placeholders::_1,std::placeholders::_2);//wws::RpcResponse->wws::BaseMessage //rsp_cb绑定的函数参数为Requestor::onResponse(const BaseConnection::ptr&conn,const BaseMessage::ptr&msg)//而registerHandler注册需要的函数类型 std::function<void(const BaseConnection::ptr&conn,std::shared_ptr<T> &msg)>; 第二个参数必须也为BaseMessage::ptr,所以T传BaseMessagedispatcher->registerHandler<wws::BaseMessage>(wws::MType::RSP_RPC,rsp_cb);auto client=wws::ClientFactory::create("127.0.0.1",9090);auto message_cb=std::bind(&wws::Dispatcher::onMessage,dispatcher.get(),std::placeholders::_1,std::placeholders::_2);client->setMessageCallback(message_cb);client->connect();auto conn=client->connection();//1.同步调用Json::Value params,result;params["nums1"]=11;params["nums2"]=22;bool ret=caller->call(conn,"Add",params,result);if(ret!=false){DLOG("result: %d", result.asInt());}//2.异步Futurewws::client::RpcCaller::JsonAsyncResponse res_future;params["nums1"]=33;params["nums2"]=44;ret=caller->call(conn,"Add",params,res_future);if(ret!=false){result=res_future.get();DLOG("result: %d", result.asInt());} //3.回调params["nums1"]=55;params["nums2"]=66;ret = caller->call(conn,"Add",params, callback);DLOG("-------\n");std::this_thread::sleep_for(std::chrono::seconds(1));client->shutdown();return 0;
}
RpcCaller 调用
call("Add", params, rsp)
用户在业务代码里直接写
caller->call(...),想要调用服务端的 “Add” 方法并等待结果。RpcCaller 内部执行 “AddDesc(...)”
call(...)方法内部会先构造一个带唯一请求ID的RpcRequest,然后调用 Requestor 的相关方法(示意中称作AddDesc)来“登记”该请求:
存储该请求ID
记录调用类型(同步/异步/回调)
如果是回调式,还会记录用户传入的回调函数
发送请求到服务端
Requestor 记完请求描述后,就会通过网络连接将
RpcRequest发送给远程服务端。服务端处理并返回
RpcResponse
当服务端收到 “Add” 请求后,进行实际的加法运算或其他业务逻辑,然后打包
RpcResponse返回给客户端。客户端接收响应 -> 分发到 Requestor::onResponse()
客户端网络层读到响应后,先通过 Dispatcher 分发,根据消息类型(如
RSP_RPC)找到之前绑定的回调,即Requestor::onResponse(...)。
Requestor::onResponse()根据响应里的 “请求ID=111” 查到对应的“请求描述 (desc)”,确定是哪个请求、用什么方式处理(同步阻塞唤醒或执行用户回调等),并把结果交给调用方。RpcCaller 最终返回结果给用户
对于异步调用
call(...),当响应到来时,onResponse() 会调用 rdp->response.set_value(msg),把响应 msg 设置到 promise 中。用户future.get()获取结果。若是回调式调用,则
Requestor::onResponse()会直接执行用户的回调函数,把结果带给用户。
Requestor 和 RpcCaller 的关系:
Requestor 负责管理请求–响应映射。它用一个请求描述(包含唯一 ID、请求类型和处理机制)来记录每次发送的请求。当响应到来时,根据请求 ID 查找描述,
如果是异步请求(REQ_ASYNC),则调用 promise.set_value(msg) 使关联 future 就绪;
如果是回调请求(REQ_CALLBACK),则直接调用用户注册的回调函数。
RpcCaller 则对外提供 RPC 调用接口。它负责构造 RPC 请求(设置方法名、参数、生成请求 ID),并调用 Requestor 的 send() 方法来登记请求并发送消息。用户可以选择同步(阻塞等待 future.get())、异步(返回 future)或回调式调用。
二者协同工作:RpcCaller 构造并发送请求,而 Requestor 负责匹配响应并将结果传递给上层。
RpcCaller用户调用的接口,用户传入要调用的对象 参数 方式,根据方式(同步 异步 回调)的不同选择不同的call进行调用,1.先根据参数构建请求消息2.调用Requester里面对应的send()。
Requester中send()会先构建请求描述对象(call传入的请求消息 请求类型...) 并建立请求-id间的映射(等收到应答时根据id找到对应的请求描述对象),再完成发送。
等服务端返回应答,Dispatcher根据消息的类型,找到client的Requestor中onResponse处理应答的回调函数,它根据id找到对应的请求描述,再根据请求描述中的类型,进行set_value设置结果或者callback()调用回调。最后删除该请求描述
set_value后,get()获取到结果,阻塞结束返回上层
再返回到call进行检查并返回结果
RpcCaller::call() 的三种调用方式流程
同步调用 (
call(conn, method, params, result))客户端调用call,把连接 函数名 参数 用于获取结果的Value对象
进入同步调用call. 先根据传入的参数组织请求(里面设置了请求类型REQ_RPC)
调用Requester中send()发送请求
using AsyncResponse=std::future<BaseMessage::ptr>;先创建一个future对象用于后面get()阻塞获取结果。再调用异步send(),因为异步和同步的区别只是在于用户什么时候get()阻塞获取结果,异步+立刻get()==同步。
再看看异步send()
先调用newDescribe,里面会创建请求描述对象(传入回调函数会设置回调函数)和UUID建立映射关系用map管理起来。
conn->send()发送请求。
之后服务端进行处理,返回应答,server::messageCallback->Disoatcher::onMessage->根据请求类型找到对应的回调函数,RSP_RPC对应的就是requestor::onResponse()处理应答的回调函数。
进入onResponse,先根据UUID找到对应的请求描述,根据请求描述的类型,看是异步(同步里面调用的异步),还是回调,进行相对的处理。
是同步,就set_value设置结果。
设置完结果,futrue就绪get()获取结果
上层的result就获取到了结果,进行输出。
RpcCaller::call(conn, method, params, result) // 同步调用└──> Requestor::send(conn, req, rsp) // 调用同步版本 send()└──> send(conn, req, rsp_future) // 调用异步 Future 版本 send()└──> 创建 RequestDescribe 并存储└──> 阻塞等待 future.get()└──> Dispatcher::onMessage(conn, msg)└──> Requestor::onResponse(conn, msg)└──> rdp->response.set_value(msg) // future.set_value() 解除阻塞└──> 解析 RpcResponse 并返回 result // get() 获取结果并返回经过的关键函数:
RpcCaller::call(conn, method, params, result)
Requestor::send(conn, req, rsp)
Requestor::send(conn, req, rsp_future)(异步版本)
Dispatcher::onMessage(conn, msg)
Requestor::onResponse(conn, msg)
rdp->response.set_value(msg)
future.get()解除阻塞
异步 Future 调用 (
call(conn, method, params, future))不传Json::Value result直接获取结果,std::future<Json::Value> res_futre,让用户自己get()获取结果。
result=json_promise->get_future();管理promise,set_value后用户就可以get()获取结果。
还绑定了Callback,传入回调函数cb,创建请求描述对象时设置回调函数cb
此时设置的类型为REQ_CALLBACK,收到应答,找到对应请求描述,会调用设置的回调函数cb
cb bind绑定的是Callbcak函数 它会set_value设置结果,让用户的future就绪,get()获取结果
RpcCaller::call(conn, method, params, future) // 异步 Future 调用└──> 创建 RpcRequest 并设置参数└──> 创建 std::promise<Json::Value> 和 future 关联└──> 绑定 Callback (关联 promise 和 result)└──> Requestor::send(conn, req, cb) // 传入回调函数 cb,onResponse() 解析后触发└──> 创建 RequestDescribe 并存储└──> 发送请求,不阻塞└──> Dispatcher::onMessage(conn, msg) // 收到服务端响应└──> Requestor::onResponse(conn, msg)└──> 通过 rid 查找 RequestDescribe└──> Callback 触发 set_value(msg) // 解析结果并设置 promise└──> future.get() 解除阻塞,获取结果 // 用户上层调用 future.get() 阻塞获取结果经过的关键函数:
RpcCaller::call(conn, method, params, res_future)
Requestor::send(conn, req, cb)
Requestor::onResponse(conn, msg)rdp->callback(msg)
Callback解析msg并set_value()
res_future.get()解除阻塞
异步回调调用 (
call(conn, method, params, cb))回调,不直接设置结果,而是调用用户传来的函数(callback)。
call异步回调,和异步futrue一样设置回调函数Callback1.(但回调函数不同)
也是调用的这个send()
接下来也是 调用请求描述对象中设置的回调函数即Callback1
而Callback1不和Callback一样set_value设置结果,而是调用上层传来的函数,它返回结果给用户。
RpcCaller::call(conn, method, params, cb) // 异步回调调用└──> 创建 RpcRequest 并设置参数└──> 绑定 Callback1└──> Requestor::send(conn, req, cb) // 调用异步回调版本 send()└──> 创建 RequestDescribe 并存储└──> 发送请求,不阻塞└──> Dispatcher::onMessage(conn, msg) // 收到服务端返回消息└──> Requestor::onResponse(conn, msg)└──> 通过 rid 查找 RequestDescribe└──> Callback1 触发用户自定义的 cb(msg)└──> 用户自定义的回调函数解析结果
RpcCaller::call(conn, method, params, cb)
Requestor::send(conn, req, cb)
Requestor::onResponse(conn, msg)
rdp->callback(msg)触发Callback1用户自定义的
callback(msg)解析结果
五.注册中心---服务端rpc_registry
服务端如何实现服务信息的管理:
服务端需要1.提供注册服务2.发现的请求业务处理
1.需要将 哪个服务 能够由 哪个主机提供 管理起来 hash<method,vector<provide>>
进行服务发现时,能返回谁能提供指定服务
2.需要将 哪个主机 发现过 哪个服务 管理起来
当进行服务通知的时候,能通知给对应发现者 <method,vector<discoverer>>
3.需要 哪个连接 对应 哪个服务提供者 管理起来 hash<conn,provider>
当一个连接断开时 能知道哪个主机的哪些服务下线了,然后才能给发现者通知xxx的xxx服务下线了。
4.需要 哪个连接 对应 哪个服务发现者 管理起来 hash<conn.discoverer>
当一个连接断开时 如果有服务上下线 就不需要给它进行通知了
1️⃣
ProviderManager(服务提供者管理)维护服务提供者信息,进行服务注册、删除和查询。
提供
addProvider()、delProvider()、getProvider()和methodHosts()等方法。
2️⃣
DiscovererManager(服务发现者管理)维护服务发现者信息,进行服务发现、删除和通知。
提供
addDisecoverer()、delDisoverer()、onlineNotify()和offlineNotify()等方法。
3️⃣
PDManager(核心管理器)处理服务请求、注册、发现、上线/下线通知以及连接断开后的清理逻辑。
提供
onServiceRequest()、onConnShutdown()等核心逻辑。处理服务的响应,包括:
registryResponse():服务注册应答
discoverResponse():服务发现应答
errorResponse():错误处理
#pragma once
#include "../common/net.hpp"
#include "../common/message.hpp"
#include <set>
namespace wws
{
namespace server
{//服务提供者class ProviderManager{public:using ptr=std::shared_ptr<ProviderManager>;struct Provider{using ptr=std::shared_ptr<Provider>;std::mutex _mutex; BaseConnection::ptr conn;Address host; //主机信息ip+portstd::vector<std::string> methods; //所有提供的方法Provider(const BaseConnection::ptr&c,const Address&h):conn(c),host(h){}void appendMethod(const std::string&method){std::unique_lock<std::mutex> lock(_mutex);methods.emplace_back(method);}};//新的服务提供者进行服务注册void addProvider(const BaseConnection::ptr&c,const Address &h,const std::string&method){Provider::ptr provider;{std::unique_lock<std::mutex> lock(_mutex);auto it=_conns.find(c);//找连接对应的提供者if(it!=_conns.end()){provider=it->second;}//找不到就创建 并新增连接->提供者else{provider=std::make_shared<Provider>(c,h);_conns.insert(std::make_pair(c,provider));}//method方法被哪些提供者提供 增加提供者auto &providers=_providers[method];providers.insert(provider);}//提供者内更新记录 能提供的方法provider->appendMethod(method);}//服务提供者断开连接时 获取它的信息 用于服务下线通知Provider::ptr getProvider(const BaseConnection::ptr&c){std::unique_lock<std::mutex> _mutex;auto it=_conns.find(c);if(it==_conns.end())return Provider::ptr();return it->second;}//服务提供者断开连接时 删除它的关联信息void delProvider(const BaseConnection::ptr&c)//连接->提供者->所有提供方法{std::unique_lock<std::mutex> _mutex;auto it=_conns.find(c);if(it==_conns.end())return;for(auto&method:it->second->methods)//找到该服务提供者的所有方法{auto&providers=_providers[method];//根据方法 从提供该方法的提供者中再进行删除//1.providers容器是vectro版本// //手动查找并按迭代器删除 providers.erase(it->second)// //但是 erase(it->second) 并不会起到按值删除的作用,因为 erase() 按值删除时,只有 std::vector 在 C++20 之后才引入 erase 和 erase_if// auto provider_it = std::find(providers.begin(), providers.end(), it->second);// if (provider_it != providers.end())// {// providers.erase(provider_it);// }//set 直接支持 erase(it->second)providers.erase(it->second);}//删除连接与服务提供者的关系_conns.erase(it);}//返回 method对应的提供者std::vector<Address> methodHosts(const std::string &method){std::unique_lock<std::mutex> lock(_mutex);auto it = _providers.find(method);if (it == _providers.end())return std::vector<Address>();std::vector<Address> result(it->second.begin(), it->second.end());return result;}private:std::mutex _mutex;std::unordered_map<std::string,std::set<Provider::ptr>> _providers;//方法->提供者主机std::unordered_map<BaseConnection::ptr,Provider::ptr> _conns;//连接->提供者}; //服务发现者class DiscovererManager{public:using ptr=std::shared_ptr<DiscovererManager>;struct Discoverer{using ptr=std::shared_ptr<Discoverer>;std::mutex _mutex;BaseConnection::ptr conn;//发现者关联的客户端std::vector<std::string> methods;//发现过的服务Discoverer(const BaseConnection::ptr&c):conn(c){}void appendMethod(const std::string&method){std::unique_lock<std::mutex> lock(_mutex);methods.push_back(method);}};//当客户端进行服务发现的时候新增发现者 新增服务名称?Discoverer::ptr addDisecoverer(const BaseConnection::ptr&c,const std::string &method){Discoverer::ptr discoverer;{std::unique_lock<std::mutex> lock(_mutex);//找连接对应的服务发现者auto it=_conns.find(c);if(it!=_conns.end()){discoverer=it->second;}else{discoverer=std::make_shared<Discoverer>(c);_conns.insert(std::make_pair(c,discoverer));}//method方法被哪些发现者发现了 增加发现者auto &discoverers=_discoverers[method];discoverers.insert(discoverer);}//在发现者中 增加已经发现的方法discoverer->appendMethod(method);return discoverer;}//发现者不需要被get() 发现者下线不需要通知 所以不需要进行get()获取对象后进行下线通知//发现者客户端断开连接时 找到发现者信息 删除关联信息void delDisoverer(const BaseConnection::ptr&c){std::unique_lock<std::mutex> _mutex;auto it=_conns.find(c);if(it==_conns.end())return;//找到发现过的方法for(auto&method:it->second->methods){//从发现过method的所有发现者 中找要进行删除的发现者auto&discoverers=_discoverers[method];discoverers.erase(it->second);}//删除conn->discoverer_conns.erase(it);}//新的服务提供者上线 上线通知void onlineNotify(const std::string &method,const Address&host){notify(method,host,ServiceOptype::SERVICE_ONLINE);}//服务提供者断开连接 下线通知void offlineNotify(const std::string &method,const Address&host){notify(method,host,ServiceOptype::SERVICE_OFFLINE);}private:void notify(const std::string &method,const Address&host,wws::ServiceOptype optype){std::unique_lock<std::mutex> _mutex;//先判断该方法有没有被人发现过 auto it=_discoverers.find(method);//没有就不用进行任何处理if(it==_discoverers.end())return;//对发现过该方法的发现者一个个进行通知auto msg_req=MessageFactory::create<ServiceRequest>();msg_req->setHost(host);msg_req->setId(UUID::uuid());msg_req->setMType(wws::MType::REQ_SERVICE);// 服务请求(注册、发现、上线、下线)msg_req->setMethod(method);msg_req->setOptype(optype);//服务操作类型 for(auto&discoverers:it->second){discoverers->conn->send(msg_req);}}private:std::mutex _mutex;std::unordered_map<std::string,std::set<Discoverer::ptr>> _discoverers;//该方法被哪些发现者发现了std::unordered_map<BaseConnection::ptr,Discoverer::ptr> _conns;//连接->发现者(连接断开->对应发现者->删除vector中的发现者)};class PDManager{public:using ptr=std::shared_ptr<PDManager>;//处理服务请求 并返回应答void onServiceRequest(const BaseConnection::ptr&conn,const ServiceRequest::ptr &msg) {//先判断服务操作请求:服务注册/服务发现auto optype=msg->optype();if(optype==ServiceOptype::SERVICE_REGISTRY)//服务注册{//1.新增服务提供者 _providers->addProvider(conn,msg->host(),msg->method());//2.对该方法的发现者进行服务上线通知_discoverers->onlineNotify(msg->method(),msg->host());//3.返回应答return registryResponse(conn,msg);}else if(optype==ServiceOptype::SERVICE_DISCOVERY)//服务发现{//新增服务发现者 _discoverers->addDisecoverer(conn,msg->method());return discoverResponse(conn,msg);}else{ELOG("收到服务操作请求,但操作类型错误"); return errorResponse(conn,msg);}}//连接断开void onConnShutdown(const BaseConnection::ptr&conn)//{//这个要断开的连接1.提供者下线 2.发现者下线//1.获取提供者信息 为空说明不是提供者auto provider=_providers->getProvider(conn);if(provider.get()!=nullptr){//提供者下线//1.提供者的每个方法都要下线 通知对应发现者for(auto&method:provider->methods){_discoverers->offlineNotify(method,provider->host);}//2.删除对该提供者的管理_providers->delProvider(conn);}//2.到这 可能是发现者 就算不是会直接返回空//直接删除对该发现者的管理_discoverers->delDisoverer(conn);}private://错误响应void errorResponse(const BaseConnection::ptr&conn,const ServiceRequest::ptr&msg){auto msg_rsp=MessageFactory::create<ServiceResponse>();msg_rsp->setId(msg->rid());msg_rsp->setMType(wws::MType::RSP_SERVICE);// 服务请求(注册、发现、上线、下线)msg_rsp->setRcode(RCode::RCODE_INVALID_OPTYPE); //无效 msg_rsp->setOptype(ServiceOptype::SERVICE_UNKNOW);//服务操作类型 未知 conn->send(msg_rsp);}//注册应答void registryResponse(const BaseConnection::ptr&conn,const ServiceRequest::ptr&msg){auto msg_rsp=MessageFactory::create<ServiceResponse>();msg_rsp->setId(msg->rid());msg_rsp->setMType(wws::MType::RSP_SERVICE);// 服务请求(注册、发现、上线、下线)msg_rsp->setRcode(RCode::RCODE_OK);msg_rsp->setOptype(ServiceOptype::SERVICE_REGISTRY);//服务操作类型 注册conn->send(msg_rsp);}//发现应答 method方法有哪些主机可以提供 void discoverResponse(const BaseConnection::ptr&conn,const ServiceRequest::ptr&msg){auto msg_rsp=MessageFactory::create<ServiceResponse>();msg_rsp->setId(msg->rid()); msg_rsp->setOptype(ServiceOptype::SERVICE_DISCOVERY);//服务操作类型 发现msg_rsp->setMType(wws::MType::RSP_SERVICE);// 服务请求(注册、发现、上线、下线)std::vector<Address> hosts=_providers->methodHosts(msg->method());if(hosts.empty()){msg_rsp->setRcode(RCode::RCODE_NOT_FOUND_SERVICE);return conn->send(msg_rsp);}msg_rsp->setRcode(RCode::RCODE_OK);msg_rsp->setMethod(msg->method());msg_rsp->setHost(hosts);return conn->send(msg_rsp);}private:ProviderManager::ptr _providers;DiscovererManager::ptr _discoverers;};
}
}
📡 服务注册流程
1️⃣ 客户端(Provider)通过 `registryMethod()` 发送 `SERVICE_REGISTRY` 请求 2️⃣ `PDManager::onServiceRequest()` 处理注册请求 3️⃣ `ProviderManager::addProvider()` 注册服务 4️⃣ `DiscovererManager::onlineNotify()` 通知发现者服务上线 5️⃣ `PDManager::registryResponse()` 发送注册结果🔍 服务发现流程
1️⃣ 客户端(Discoverer)通过 `addDisecoverer()` 发送 `SERVICE_DISCOVERY` 请求 2️⃣ `PDManager::onServiceRequest()` 处理发现请求 3️⃣ `DiscovererManager::addDisecoverer()` 记录发现者 4️⃣ `ProviderManager::methodHosts()` 获取 `method` 对应的主机 5️⃣ `PDManager::discoverResponse()` 发送发现结果🔥 服务下线流程
1️⃣ 连接断开时触发 `PDManager::onConnShutdown()` 2️⃣ 通过 `ProviderManager::getProvider()` 获取 `Provider` 3️⃣ `DiscovererManager::offlineNotify()` 通知发现者服务下线 4️⃣ `ProviderManager::delProvider()` 删除 `Provider`🕵️♂️ 发现者下线流程
1️⃣ 连接断开时触发 `PDManager::onConnShutdown()` 2️⃣ 通过 `DiscovererManager::delDisoverer()` 删除 `Discoverer` 3️⃣ 清理 `_conns` 和 `_discoverers` 的映射
六.注册中心---客户端rpc_registry
客户端的功能比较分离,注册端和发现端根本就不在同一个主机上。
因此客户端的注册和发现功能是完全分开的。
1.作为服务提供者 需要一个能进行服务注册的接口
连接注册中心 进行服务注册
2.作为服务发现者 需要一个能进行服务发现的接口,需要将获取到的提供对应服务的主机信息管理起来 hash<method,vector<host>> 一次发现,多次使用,没有的话再次进行发现。
需要进行服务上线/下线通知请求的处理(需要向dispatcher提供一个请求处理的回调函数)
#pragma
#include"requestor.hpp"namespace wws
{
namespace client
{// 服务提供者类:负责将服务注册到服务注册中心class Provider{public:using ptr=std::shared_ptr<Provider>;Provider(const Requestor::ptr&requestor):_requestor(requestor){}//进行服务注册的接口bool registryMethod(const BaseConnection::ptr&conn,const std::string &method,const Address&host){// 1. 创建 ServiceRequest 请求消息auto msg_req=MessageFactory::create<ServiceRequest>();msg_req->setHost(host);msg_req->setId(UUID::uuid());msg_req->setMType(wws::MType::REQ_SERVICE);// 服务请求(注册、发现、上线、下线)msg_req->setMethod(method);msg_req->setOptype(ServiceOptype::SERVICE_REGISTRY);//服务操作类型 注册// 2. 发送请求并同步等待响应BaseMessage::ptr msg_rsp;bool ret=_requestor->send(conn,msg_req,msg_rsp);//同步请求if(ret==false){ELOG("%s服务注册失败",method.c_str());return false;}auto service_rsp=std::dynamic_pointer_cast<ServiceResponse>(msg_rsp);if(service_rsp.get()==nullptr){ELOG("响应类型向下转换失败");return false;}if(service_rsp->rcode()!=RCode::RCODE_OK){ELOG("服务注册失败 原因%s",errReason(service_rsp->rcode()));return false;}return true;}private:Requestor::ptr _requestor;//负责发送请求、接收响应};class MethodHost{public:using ptr=std::shared_ptr<MethodHost>;MethodHost(const std::vector<Address>&hosts):_hosts(hosts.begin(),hosts.end()),_idx(0){}void appendHost(const Address&host){//新的服务上线后进行调用std::unique_lock<std::mutex> lock(_mutex);_hosts.push_back(host);}void removeHost(const Address&host){//服务下线进行调用std::unique_lock<std::mutex> lock(_mutex);//vector删除效率O(n)效率低,但更多的操作还是 随机访问[] 进行RR轮转,所以vector是最合适的for(auto it=_hosts.begin();it!=_hosts.end();it++){if(*it!=host){_hosts.erase(it);break;}}}Address chooseHost(){std::unique_lock<std::mutex> lock(_mutex);size_t pos=_idx++ %_hosts.size();//1.pos=_idx%size 2._idx+=1return _hosts[pos];}bool empty(){std::unique_lock<std::mutex> lock(_mutex);return _hosts.empty();}private:std::mutex _mutex;size_t _idx;//当前 RR 轮转索引//vector 提供了 O(1) 的索引访问,可以快速实现RR轮转机制。std::vector<Address> _hosts;};class Discoverer{public:Discoverer(const Requestor::ptr &requestor){}//服务发现的接口bool serviceDiscovery(const BaseConnection::ptr&conn,const std::string&method,Address&host){//当前有method方法对应的提供服务者 直接返回host地址{std::unique_lock<std::mutex> lock(_mutex);auto it=_method_hosts.find(method);if(it!=_method_hosts.end()){if (it->second->empty() == false){host = it->second->chooseHost();return true;}}}//当前没有对应的服务者//1.构建服务发现请求auto msg_req=MessageFactory::create<ServiceRequest>();msg_req->setId(UUID::uuid());msg_req->setMType(MType::REQ_SERVICE);//消息类型msg_req->setMethod(method);msg_req->setOptype(ServiceOptype::SERVICE_DISCOVERY);BaseMessage::ptr msg_rsp;bool ret=_requestor->send(conn,msg_req,msg_rsp);if(ret==false){ELOG("服务发现失败!");return false;}auto service_rsp=std::dynamic_pointer_cast<ServiceResponse>(msg_rsp);if(!service_rsp.get()){ELOG("服务发现失败! 响应类型转换失败");return false;}if(service_rsp->rcode()!=RCode::RCODE_OK){ELOG("服务发现失败! 错误原因%s",errReason(service_rsp->rcode()).c_str());return false;}//2.看服务发现完后有没有新提供者增加std::unique_lock<std::mutex> _mutex;auto method_host=std::make_shared<MethodHost>(service_rsp->hosts());if(method_host->empty()){ELOG("%s服务发现失败!没有能提供服务的主机",method.c_str());return false;}host=method_host->chooseHost();_method_hosts[method]=method_host;//更新method方法->提供者address(可能method方法已经在map中所以=赋值)return true;}//给Dispathcer模块进行服务上线下线请求处理的回调函数void onServiceRequest(const BaseConnection::ptr&conn,const ServiceRequest::ptr&msg){//1.先判断是上线/下线 都不是就不处理auto optype=msg->optype();std::string method=msg->method();std::unique_lock<std::mutex> lock(_mutex);//2.上线通知if(optype==ServiceOptype::SERVICE_ONLINE){auto it=_method_hosts.find(method);if(it==_method_hosts.end()){//该method方法不存在 创建MethodHost初始化并添加入map中auto method_host=std::make_shared<MethodHost>();method_host->appendHost(msg->host());_method_hosts[method]=method_host;}else{//存在直接加it->second->appendHost(msg->host());}}//3.下线通知else if(optype==ServiceOptype::SERVICE_OFFLINE){auto it=_method_hosts.find(method);if(it==_method_hosts.end()){//该method方法不存在 直接return//不需要把method方法从map中移除 前面已经判断过map中method方法为空的情况return;}else{//存在直接删除it->second->removeHost(msg->host());}}}private:std::mutex _mutex;std::unordered_map<std::string,MethodHost::ptr> _method_hosts;Requestor::ptr _requestor;};
}
}
1.客户端中的provider服务提供者,有服务注册接口registryMethod,构建服务注册请求借助Requestor send()到服务端的的注册接口。收到应答后判断应用是否正确。
2.客户端的discoverer服务发现者,有服务发现接口serviceDiscovery,先开当前有没有method方法对应的提供者,有就返回一个提供者。没有就构建服务发现请求,并检查是否有误,无错误后 再判断是否有新增的提供者,有就返回提供者并更新method->提供者的映射,没有就直接返回.
3.MethodHost类 管理一个method方法对应的所以提供者的主机信息,并提供RR轮转功能,按顺序返回method方法对应的提供者。
1️⃣
Provider类:服务提供者
作用:
负责将服务注册到服务注册中心。
主要功能:
registryMethod():
发送
ServiceRequest注册请求到服务中心。检查
msg_rsp响应是否成功注册。内部成员:
_requestor:
Requestor::ptr类型,负责请求发送和接收响应。
2️⃣
MethodHost类:服务主机管理
作用:
维护多个
Address主机地址,并提供负载均衡(RR 轮转)功能。主要功能:
appendHost():新增一个服务主机地址。
removeHost():移除一个服务主机地址。
chooseHost():
通过 RR 轮转选择一个主机地址。
empty():判断是否为空。内部成员:
_mutex:互斥锁保护_hosts访问安全。
_idx:当前 RR 轮转索引。
_hosts:std::vector<Address>存储主机地址列表。
3️⃣
Discoverer类:服务发现者
作用:
发现服务并维护
MethodHost对象,进行服务上线/下线通知的管理。主要功能:
serviceDiscovery():
发现服务并更新
_method_hosts缓存。选择主机地址
host并返回。
onServiceRequest():
处理服务上线/下线的请求。
动态添加/删除
MethodHost及其地址信息。内部成员:
_mutex:互斥锁保护_method_hosts。
_method_hosts:
std::unordered_map<std::string, MethodHost::ptr>,映射method -> host。
_requestor:
Requestor::ptr发送发现请求。
4️⃣
Requestor类:请求器
作用:
负责向服务中心发送请求并获取响应。
核心功能:
send():
向服务中心同步发送
msg_req请求。接收
msg_rsp响应并返回bool标志位。
🔥 类之间的关系
Provider通过_requestor发送ServiceRequest进行服务注册。
Discoverer通过_requestor发送ServiceRequest进行服务发现。
MethodHost由Discoverer维护,并提供 RR 轮转机制选择主机。
Discoverer监听onServiceRequest()来处理服务上线/下线。
RR轮询(Round-Robin)
一个method方法可以对应多个提供者,用户请求method方法,我们应该返回哪一个提供者,才能实现最大资源利用呢?
我们可以通过RR轮询按照固定顺序轮流将请求分配到不同的主机。
1.维护一个递增索引 _idx。
2.每次请求时,选择 _hosts[_idx % _hosts.size()] 作为当前主机。
3._idx 自增,当 _idx >= _hosts.size() 时自动重置。
RR轮询需要随机访问[ ],所以管理提供者的容器最好选择vector< >。
RR 轮询机制的价值
✅ 1. 负载均衡: 均匀分配请求,防止主机过载,提高系统吞吐量。
✅ 2. 实现简单: 只需维护一个_idx递增索引,选择主机只需O(1)时间复杂度。
✅ 3. 故障规避: 结合removeHost()机制,可以自动剔除故障主机,提升可靠性。
✅ 4. 提升系统吞吐量: 通过多个主机并行处理请求,提升系统的整体性能。
✅ 5. 维护成本低: 逻辑简单,维护成本极低,不需要监控主机状态。
为什么选择
vector<Address>作为主机管理容器
✅ 1. 访问速度快
vector提供 O(1) 的随机访问能力,可以通过pos索引直接获取provider。轮询核心逻辑依赖于
vector[pos]进行主机选择,比map的 O(log n) 查找速度更快,适合高频访问场景。
✅ 2. 内存布局紧凑
vector采用 连续内存存储,有利于 CPU 缓存命中,提升访问效率。在轮询过程中,只需访问固定内存位置,避免了内存跳转带来的性能损耗。
✅ 3. 删除主机速度适中
删除主机时虽然
removeHost()的复杂度是 O(n),但主机上下线事件发生频率远低于请求频率。即使主机上下线处理稍慢,但
chooseHost()仍然保持 O(1) 的快速访问。
⚠️ 注意:
vector删除主机时会触发内存移动,导致性能下降,因此不适合频繁上下线的场景。但在主机变更较少的场景下,
vector的整体性能优于list或map。
七.对服务发现与注册的封装
一.客户端 rpc_client
封装客户端:三大功能模块
一、业务功能:
基础 RPC 功能
服务注册功能
服务发现功能
二、基础底层模块:
网络通信客户端模块(由
BaseClient封装)
🧱 类结构封装解析
1.
RegistryClient:服务注册客户端
构造时连接注册中心地址
提供
registryMethod()方法:业务提供者向注册中心注册服务成员模块包含:
_provider:服务提供者
_requestor:发送请求组件
_dispatcher:调度器
_client:基础通信客户端2.
DiscoveryClient:服务发现客户端
构造时连接注册中心地址
提供
registryDiscovery()方法:业务调用方向注册中心发现服务成员模块包含:
_discoverer:服务发现器
_requestor、_dispatcher、_client:同上3.
RpcClient:RPC 核心客户端
构造参数
enableDiscovery决定是否开启服务发现模式:
若为
true:连接的是注册中心若为
false:连接的是具体的服务提供者提供多种调用方式:
同步调用(返回
result)异步 future 调用(返回
std::future)异步 callback 调用(传入回调函数)
内部组合:
_discovery_client:可选服务发现客户端
_caller:RPC 调用管理器
_requestor、_dispatcher、_client:同样是基础通信组件
在构建rpc客户端时,我们用长连接还是短连接?
1.当客户端调用call()请求对应方法时,RpcClient 内部调用 DiscoveryClient 进行服务发现 向 注册中心 Registry Server 发送服务发现请求
2.注册中心再返回对应方法的提供者主机地址。
3.RpcClient 用RR轮询从中选出来一个地址创建rpc client客户端并连接对应方法的提供者主机
4.服务提供者 Provider 接收到请求处理完返回结果给客户端
5.客户端回调触发,返回响应给用户。
短链接:创建一个rpc client客户端对象,连接服务提供者,进行rpc调用,调用结束后就销毁关闭客户端。
✅ 优点:
实现简单,按需连接、按需释放;
没有资源长期占用问题。
❌ 缺点:
性能差:每次调用都要建立和销毁 TCP 连接,连接成本高;
不利于高频 RPC 场景;
异步处理时管理麻烦:可能连接刚断,回调结果还没处理。
长连接:调用完后并不会销毁关闭客户端,而是将客户端放入连接池。后续还需要访问该主机的该方法,就会从连接池中找到原本的客户端对象,进行rpc调用。若该主机的该服务下线,需要从池中删除对应客户端连接。
✅ 优点:
高性能:避免频繁连接/断开,尤其是重复调用同一服务时;
适合高并发、低延迟系统。
❌ 缺点:
管理复杂,需要处理:
服务下线、连接失效的自动剔除;
异步/并发安全;
池容量、连接空闲策略等。
在我们这个项目中我们选择长连接。
主要还是短连接异步处理时管理麻烦。
短链接,客户端进行完rpc调用就会关闭,后面服务提供者返回结果给客户端,客户端没了收到收到应答的回调函数onResponse 也就不能把结果设置道promise中,上层futrue就不能就绪。
回调不触发,业务逻辑“卡住”
尤其是你用std::future或promise等异步等待对象,结果永远收不到。触发回调时访问已被释放的连接对象,导致崩溃
比如回调中引用了RpcClient或connection,但连接已经析构。异步响应结果丢失,日志无记录,bug 难排查
你会觉得“调用失败了但程序没报错”,其实是 TCP 在你没注意时被关掉了。异步+短连接的问题在于连接的生命周期和异步结果不一致,导致回调无法安全执行。
解决方法:在rpc调用结束后不关闭客户端,而是设置一个回调函数,确保收到收到响应处理完再关闭客户端。
#include "../common/dispatcher.hpp"
#include "requestor.hpp"
#include "rpc_caller.hpp"
#include "rpc_registry.hpp"namespace wws
{
namespace client
{//服务注册客户端class RegistryClient{public:using ptr=std::shared_ptr<RegistryClient>;//构造函数传入注册中心的地址信息 用于连接注册中心RegistryClient(const std::string&ip,int port):_requestor(std::make_shared<Requestor>()),_provider(std::make_shared<client::Provider>(_requestor)),_dispatcher(std::make_shared<Dispatcher>()){//注册中心返回响应消息时触发的回调函数auto rsp_cb=std::bind(&client::Requestor::onResponse,_requestor.get(),std::placeholders::_1,std::placeholders::_2);_dispatcher->registerHandler<BaseMessage>(MType::RSP_SERVICE,rsp_cb);auto message_cb=std::bind(&Dispatcher::onMessage,_dispatcher.get(),std::placeholders::_1,std::placeholders::_2);_client=ClientFactory::create(ip,port);_client->setMessageCallback(message_cb);_client->connect();}//向外提供的服务注册接口bool registryMethod(const std::string&method, Address&host){return _provider->registryMethod(_client->connection(), method, host);}private:Requestor::ptr _requestor;client::Provider::ptr _provider;Dispatcher::ptr _dispatcher;BaseClient::ptr _client;};//服务发现客户端class DiscoveryClient{public:using ptr=std::shared_ptr<DiscoveryClient>;//构造函数传入注册中心的地址信息 用于连接注册中心DiscoveryClient(const std::string&ip,int port,const Discoverer::OfflineCallback &cb):_requestor(std::make_shared<Requestor>()),_discoverer(std::make_shared<client::Discoverer>(_requestor,cb)),_dispatcher(std::make_shared<Dispatcher>()){//注册中心返回响应消息时触发的回调函数auto rsp_cb=std::bind(&client::Requestor::onResponse,_requestor.get(),std::placeholders::_1,std::placeholders::_2);_dispatcher->registerHandler<BaseMessage>(MType::RSP_SERVICE,rsp_cb);//当注册中心向客户端进行上线/下线通知时触发的回调函数auto req_cb=std::bind(&client::Discoverer::onServiceRequest,_discoverer.get(),std::placeholders::_1,std::placeholders::_2);_dispatcher->registerHandler<ServiceRequest>(MType::REQ_SERVICE,req_cb);//消息回调函数 所有收到的消息,统一交由 Dispatcher::onMessage 分发给对应 handlerauto message_cb=std::bind(&Dispatcher::onMessage,_dispatcher.get(),std::placeholders::_1,std::placeholders::_2);_client=ClientFactory::create(ip,port);_client->setMessageCallback(message_cb);_client->connect();}//向外提供的服务发现接口bool registryDiscovery(const std::string&method, Address&host){return _discoverer->serviceDiscovery(_client->connection(), method, host);}private:Requestor::ptr _requestor;client::Discoverer::ptr _discoverer;Dispatcher::ptr _dispatcher;BaseClient::ptr _client;};//rpc客户端class RpcClient{public:using ptr=std::shared_ptr<RpcClient>;// enableDiscovery--是否启用服务发现功能 也决定了传入地址信息是注册中心地址 还是提供者的地址RpcClient(bool enableDiscovery, const std::string &ip, int port):_enableDiscovery(enableDiscovery),_requestor(std::make_shared<Requestor>()),_dispatcher(std::make_shared<Dispatcher>()),_caller(std::make_shared<wws::client::RpcCaller>(_requestor)){//注册中心返回响应消息时触发的回调函数auto rsp_cb=std::bind(&client::Requestor::onResponse,_requestor.get(),std::placeholders::_1,std::placeholders::_2);_dispatcher->registerHandler<BaseMessage>(MType::RSP_SERVICE,rsp_cb);//1.如果启用的服务发现 地址信息是注册中心的地址 是服务发现客户端需要连接的地址 则通过地址信息实例化discover_client//需要经过服务发现获取提供者address再获取对应的clientif(_enableDiscovery){//设置服务下线回调auto offline_cb=std::bind(&RpcClient::delClient,this,std::placeholders::_1);_discovery_client=std::make_shared<DiscoveryClient>(ip,port,offline_cb);}//2.如果没有启用服务发现 则地址信息是服务提供者的地址 则直接创建客户端实例化好rpc_client//直接根据提供的ip+port创建对应的clientelse{auto message_cb=std::bind(&Dispatcher::onMessage,_dispatcher.get(),std::placeholders::_1,std::placeholders::_2);_rpc_client=ClientFactory::create(ip,port);_rpc_client->setMessageCallback(message_cb);_rpc_client->connect();}}//1.同步bool call( const std::string &method,const Json::Value ¶ms, Json::Value &result){//获取clientBaseClient::ptr client=getClient(method);if(client.get()==nullptr)return false;//通过客户端连接 发送rpc请求return _caller->call(client->connection(),method,params,result);}//2.异步futurebool call( const std::string &method,const Json::Value ¶ms, std::future<Json::Value> &result){BaseClient::ptr client = getClient(method);if (client.get() == nullptr)return false;return _caller->call(client->connection(), method, params, result); }//3.异步回调bool call(const std::string &method,const Json::Value ¶ms, const RpcCaller::JsonResponseCallback &cb){BaseClient::ptr client = getClient(method);if (client.get() == nullptr)return false;return _caller->call(client->connection(), method, params, cb); }private://创建clientBaseClient::ptr newClient(const Address &host){auto message_cb = std::bind(&Dispatcher::onMessage, _dispatcher.get(),std::placeholders::_1, std::placeholders::_2);auto client = ClientFactory::create(host.first, host.second);client->setMessageCallback(message_cb);client->connect();//添加到连接池putClient(host,client);return client;}//根据address从连接池查找client 没有返回空BaseClient::ptr getClient(const Address&host){std::unique_lock<std::mutex> lock(_mutex);auto it=_rpc_clients.find(host);if(it==_rpc_clients.end()){return BaseClient::ptr();}return it->second;}//根据method获取client://1.method->服务发现->获取目标Address->从连接池获取/没有直接创建//2.用户传入的ip+port->直接获取已经创建的clientBaseClient::ptr getClient(const std::string method){//1.服务发现获取的ip+port BaseClient::ptr client;if(_enableDiscovery){//1.通过服务发现 获取服务提供者地址信息Address host;bool ret=_discovery_client->registryDiscovery(method,host);if(ret==false){ELOG("当前%s服务 没找到服务提供者",method.c_str());return BaseClient::ptr();}//2.查看连接池中是否有对应的客户端 有就直接用 没有就创建client=getClient(host);if(client.get()==nullptr){client=newClient(host);}}//2.用户提供的ip+port创建的clientelse{client=_rpc_client;}return client;}void putClient(const Address&host,BaseClient::ptr&client){std::unique_lock<std::mutex> lock(_mutex);_rpc_clients.insert(std::make_pair(host,client));}void delClient(const Address&host,BaseClient::ptr&client){std::unique_lock<std::mutex> lock(_mutex);_rpc_clients.erase(host);}private:struct AddressHash{size_t operator()(const Address&host){std::string addr=host.first+std::to_string(host.second);return std::hash<std::string>{}(addr);}};bool _enableDiscovery;DiscoveryClient::ptr _discovery_client;//网络服务发现 用户传方法名method client用方法名找提供者地址进行连接RpcCaller::ptr _caller;Requestor::ptr _requestor;Dispatcher::ptr _dispatcher;BaseClient::ptr _rpc_client;//未启用服务发现// RpcClient rpc(false, "127.0.0.1", 8080);// rpc.call("Add", ...); // 直接用 _rpc_client 调用std::mutex _mutex;//<"127.0.0.1",client1>std::unordered_map<Address,BaseClient::ptr,AddressHash>_rpc_clients;//服务发现的客户端连接池// RpcClient rpc(true, registry_ip, port);// rpc.call("Add", ...); // 自动发现、自动连接、自动发请求};
}
}
unordered_map<>自定义类型作key
我们用哈希表来管理客户端连接池时:
我们知道在哈希表中是通过key值找到对应的val值的,但并不是直接用我们传过去的数当key,需要进行哈希值计算。(1.int size_t bool直接强转 2.char* string
h = h * 131 + static_cast<unsigned char>(c); // 类似 BKDR hash)
而库中实现了string、int、float 等基本类型的哈希值计算,但这个Address pair<string,int>是个自定义类型,需要我们自己重载哈希值计算。其实就算把string+int融合成string,再套用库中对string类型计算的哈希函数。
STL中 pair 的哈希组合方法
template <typename T1, typename T2> struct hash<std::pair<T1, T2>> {size_t operator()(const std::pair<T1, T2>& p) const {size_t h1 = std::hash<T1>()(p.first);size_t h2 = std::hash<T2>()(p.second);return h1 ^ (h2 << 1); } };
操作 目的 std::hash<X>()获取单个字段的 hash 值 << 1左移扰动哈希位,使两个字段 hash 分布更开 ^异或混合两个 hash,避免简单叠加导致冲突 效果 更均匀、稳定、不易冲突的哈希组合值
既然选择长连接+连接池的做法,那就要处理当服务下线时 在连接池中删除对应的client
怎么做?设置回调函数,在服务下线时进行调用。
1.RpcClinet初始化DiscoveryClient时传入回调函数
2.DiscoveryClient 传给-> Discoverer
3.Discoverer再把回调函数cb设置到成员变量中
4.Client::onServiceRequest处理服务下线时,除了删除该方法中下线的主机地址Address,还要删除连接池中连接它的client
二.服务端rpc_server (包含注册中心服务端 rpc服务端)
🌐 服务端实现业务功能:
提供 RPC 服务
服务注册与发现机制中提供者的管理(服务注册)和消费者的管理(服务发现)
📦 封装的三类服务端组件:
RPC 服务端
负责接收并处理 RPC 请求。
注册中心服务端
负责管理服务提供者与消费者的注册信息。
发布订阅服务端(后续实现)
用于实现基于事件的通信机制(如 pub-sub 模式),暂未实现。
🛠 实现细节说明:
1. 注册中心服务端
是一个纯粹的服务端,用于管理提供者和消费者信息。
核心功能是处理服务注册与发现的请求。
2. RPC 服务端
实际由两部分组成:
RPC 服务端:用于接收和响应 RPC 请求。
服务注册客户端:启动后自动连接注册中心,并将自己能提供的服务注册上去。
#pragma once
#include "../common/dispatcher.hpp"
#include "../client/rpc_client.hpp"
#include "rpc_router.hpp"
#include "rpc_registry.hpp"#include <set>
namespace wws
{
namespace server
{//服务注册服务端class RegistryServer{public:using ptr=std::shared_ptr<RegistryServer>;RegistryServer(int port):_pd_manager(std::make_shared<PDManager>()),_dispatcher(std::make_shared<Dispatcher>()){auto service_cb = std::bind(&PDManager::onServiceRequest, _pd_manager.get(),std::placeholders::_1, std::placeholders::_2);_dispatcher->registerHandler<ServiceRequest>(MType::REQ_SERVICE, service_cb);_server = wws::ServerFactory::create(port);auto message_cb = std::bind(&Dispatcher::onMessage, _dispatcher.get(),std::placeholders::_1, std::placeholders::_2);_server->setMessageCallback(message_cb);auto close_cb=std::bind(&RegistryServer::onConnShutdown,this,std::placeholders::_1);_server->setCloseCallback(close_cb);}void start(){_server->start();}private:void onConnShutdown(const BaseConnection::ptr&conn){_pd_manager->onConnShutdown(conn);}private:PDManager::ptr _pd_manager;Dispatcher::ptr _dispatcher;BaseServer::ptr _server;};//rpc服务端class RpcServer{public:using ptr=std::shared_ptr<RpcServer>;RpcServer(const Address&access_addr,bool enableRegistry=false,const Address®istry_server_addr=Address()):_enableRegistry(enableRegistry),_access_addr(access_addr),_router(std::make_shared<wws::server::RpcRouter>()),_dispatcher(std::make_shared<wws::Dispatcher>()){//启用服务注册if(_enableRegistry){_reg_client=std::make_shared<client::RegistryClient>(registry_server_addr.first,registry_server_addr.second);}//成员server是一个rpcserver 用于提供rpc服务auto rpc_cb = std::bind(&RpcRouter::onRpcRequest, _router.get(),std::placeholders::_1, std::placeholders::_2);_dispatcher->registerHandler<wws::RpcRequest>(wws::MType::REQ_RPC, rpc_cb);//创建一个监听指定端口(如 8080)的 RPC 服务器对象_server = wws::ServerFactory::create(access_addr.second);//默认监听 0.0.0.0:port 我监听所有可用 IPauto message_cb = std::bind(&wws::Dispatcher::onMessage, _dispatcher.get(),std::placeholders::_1, std::placeholders::_2);_server->setMessageCallback(message_cb);}void registerMethod(const ServiceDescribe::ptr &service)//服务描述类{if(_enableRegistry){_reg_client->registryMethod(service->method(),_access_addr);//把 method->本主机地址 发给注册中心 表示自己可以提供method方法}_router->registerMethod(service);//本地注册 把服务描述service注册到RpcRouter中的服务管理类 onRpcRequest接收到客户端请求时进行路由分发调用}void start(){_server->start();}private:bool _enableRegistry;Address _access_addr;//自己的对外服务地址(客户端要连接我就来这)client::RegistryClient::ptr _reg_client;//注册客户端,用于连接注册中心并注册本地服务RpcRouter::ptr _router;Dispatcher::ptr _dispatcher;BaseServer::ptr _server;};}
}
注册中心 服务端 客户端代码测试
1.先启动注册中心服务端 并设置port. 处理服务注册请求 服务发现请求 以及服务上下线通知
2.再启动rpc服务端 ,rpc服务端可以通过_reg_client注册中心客户端进行服务注册,但需要启动服务注册_enableRegistry=ture,提供注册中心客户端的Address。
还可以注册本地服务方法,内部注册到
RpcRouter路由器中,用于请求到来时快速查找并调用对应的回调函数。3.启动rpc客户端,rpc客户端有两种连接方式 启动服务发现 根据方法名向注册中心查询服务地址,然后连接对应的服务端。不启动 直接连用户传入的服务提供者的地址
用户再调用call 通过客户端连接 发送rpc请求
rpc_server.cpp
#include "../../server/rpc_server.hpp"
#include "../../common/detail.hpp"void Add(const Json::Value&req,Json::Value&rsp)
{int nums1=req["nums1"].asInt();int nums2=req["nums2"].asInt();rsp=nums1+nums2;
}
int main()
{//build生产一个 服务描述类 函数名 + 参数类型+结果类型 + 函数地址(进行回调)std::unique_ptr<wws::server::SDescribeFactory> desc_factory(new wws::server::SDescribeFactory());desc_factory->setMethodName("Add");desc_factory->setParamsDesc("nums1",wws::server::VType::INTEGRAL);desc_factory->setParamsDesc("nums2",wws::server::VType::INTEGRAL);desc_factory->setReturnType(wws::server::VType::INTEGRAL);desc_factory->setCallback(Add);//wws::Address("127.0.0.1",9090) 监听9090 在9090端口提供服务 true 表示启动服务注册 "127.0.0.1",8080 注册中心的地址信息,创建client用于连接注册中心wws::server::RpcServer server(wws::Address("127.0.0.1",9090),true,wws::Address("127.0.0.1",8080));//server.registerMethod(desc_factory->build());server.start();return 0;
}
registry_sever.cpp
#include "../../server/rpc_server.hpp"
#include "../../common/detail.hpp"int main()
{//实例化服务端wws::server::RegistryServer reg_server(8080);reg_server.start();return 0;
}
rpc_client.cpp
#include "../../client/rpc_client.hpp"
#include "../../common/detail.hpp"
#include <thread>void callback(const Json::Value &result) {DLOG("callback result: %d", result.asInt());
}int main()
{wws::client::RpcClient client(true,"127.0.0.1",8080);//1.同步调用Json::Value params,result;params["nums1"]=11;params["nums2"]=22;bool ret=client.call("Add",params,result);//client内找对应Add方法对应提供者的连接if(ret!=false){DLOG("result: %d", result.asInt());}//2.异步Futurewws::client::RpcCaller::JsonAsyncResponse res_future;params["nums1"]=33;params["nums2"]=44;ret=client.call("Add",params,res_future);if(ret!=false){result=res_future.get();DLOG("result: %d", result.asInt());} //3.回调params["nums1"]=55;params["nums2"]=66;ret = client.call("Add",params, callback);DLOG("-------\n");std::this_thread::sleep_for(std::chrono::seconds(1));return 0;
}
八.发布订阅服务端实现rpc_topic

1. Dispatcher 模块(右上角)
作用:
JSON-RPC 框架中的请求分发核心
判断 RPC 消息类型 → 调用对应业务模块(如 PubSubManager)
Dispatcher::registerMethod("topic", &PubSubManager::onTopicRequest)2. onTopicRequest 回调函数
位置:
PubSubManager中的统一入口函数
作用:
接收来自 Dispatcher 的请求
根据操作类型调用对应处理函数:
类型 功能函数调用 创建主题 topicCreate()删除主题 topicRemove()订阅主题 topicSubscribe()取消订阅 topicCancel()发布消息 topicPublish()3. PubSubManager 核心模块(图中心)
职责:
管理两个核心 map
操作对应的
Topic和Subscriber数据结构std::unordered_map<std::string, Topic::ptr> _topics; std::unordered_map<BaseConnection::ptr, Subscriber::ptr> _subscribers;4.Subscriber 结构(图左下)
struct Subscriber {BaseConnection::ptr conn;std::unordered_set<std::string> topics; };图示结构:
每个订阅者关联一个连接
还记录了自己订阅的所有主题名
5. topic 结构(图左上)
struct Topic {std::string topic_name;std::unordered_set<Subscriber::ptr> subscribers; };图示结构:
每个主题有一个名称
内部维护一组订阅它的订阅者指针
主要用途:
当消息发布到该主题时: → 遍历
set<subscriber>并调用conn->send(msg)6. 两张 map 映射关系(图中左)
map<topic_name, topic> map<Connection, Subscriber>构成了典型的双向映射系统:
topic_name -> topic -> subscribers
conn -> subscriber -> topics
#pragma once
#include "../common/net.hpp"
#include "../common/message.hpp"
#include <unordered_set>
namespace wws
{
namespace server
{class TopicManager{public:using ptr=std::shared_ptr<TopicManager>;TopicManager();// 给Dispathcer模块进行服务上线下线请求处理的回调函数void onTopicRequest(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg) {TopicOptype topic_optype=msg->optype();bool ret=true;switch(topic_optype){//主题创建case TopicOptype::TOPIC_CREATE: topicCreate(conn,msg);break;//主题删除case TopicOptype::TOPIC_REMOVE: topicRemove(conn,msg);break;//主题订阅case TopicOptype::TOPIC_SUBSCRIBE: topicSubscribe(conn,msg);break;//取消主题订阅case TopicOptype::TOPIC_CANCEL: topicCancel(conn,msg);break;//主题消息发布case TopicOptype::TOPIC_PUBLISH: topicPublish(conn,msg);break;//返回应答 无效操作类型default: return errorResponse(conn,msg,RCode::RCODE_INVALID_OPTYPE);}if(!ret) return errorResponse(conn,msg,RCode::RCODE_NOT_FOUND_TOPIC);return topicResponse(conn,msg);}//一个订阅者在连接断开时的处理 删除其关联的数据void onShutdown(const BaseConnection::ptr&conn){//消息发布者断开连接 不处理; 消息订阅者断开连接需要删除管理数据//1.判断断开连接的是否是订阅者 不是直接返回Subscriber::ptr subscriber;//断开连接的订阅者对象std::vector<Topic::ptr> topics;//受影响的主题对象{auto it = _subscribers.find(conn);if (it == _subscribers.end())return;// 2.获取受影响的主题对象subscriber=it->second;for(auto&topic_name:subscriber->topics){auto topic_it=_topics.find(topic_name);if(topic_it==_topics.end())continue;topics.push_back(topic_it->second);}//3.从订阅者映射map中删除 订阅者_subscribers.erase(it);}//4.从对应主题对象中删除订阅者for(auto&topic:topics){topic->removeSubscriber(subscriber);}}private://错误响应void errorResponse(const BaseConnection::ptr&conn,const TopicRequest::ptr&msg,RCode rcode){auto msg_rsp=MessageFactory::create<ServiceResponse>();msg_rsp->setId(msg->rid());msg_rsp->setMType(wws::MType::RSP_TOPIC);//主题响应 msg_rsp->setRcode(rcode);conn->send(msg_rsp);}//注册应答void topicResponse(const BaseConnection::ptr&conn,const TopicRequest::ptr&msg){auto msg_rsp=MessageFactory::create<ServiceResponse>();msg_rsp->setId(msg->rid());msg_rsp->setMType(wws::MType::RSP_TOPIC);msg_rsp->setRcode(RCode::RCODE_OK);conn->send(msg_rsp);}//创建主题void topicCreate(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg) {std::unique_lock<std::mutex> lock(_mutex);//构建一个主题对象 添加映射关系的管理std::string topic_name=msg->topicKey();//主题名称auto topic=std::make_shared<Topic>(topic_name);_topics.insert(std::make_pair(topic_name,topic));}//删除主题void topicRemove(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg) {//1.查看当前主题 有哪些订阅者 再从订阅者中删除主题信息//_topics[topic_name]->subscribers subscribers->topics订阅的所有主题名称//2.删除主题数据 _topics[topic_name]->Subscriber 主题名称和主题对象的映射关系std::string topic_name=msg->topicKey();std::unordered_set<Subscriber::ptr> subscribers;{std::unique_lock<std::mutex> lock(_mutex);//删除主题前 先找出订阅该主题的订阅者auto it=_topics.find(topic_name);if(it==_topics.end())return;subscribers=it->second->subscribers;_topics.erase(it);//删除主题名称->topic} for(auto&subscriber:subscribers)subscriber->removeSTopic(topic_name);}// 主题订阅bool topicSubscribe(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg){// 1.先找出主题对象topic 订阅者对象subscriber// 没有主题对象就报错 没有订阅者对象就构建Topic::ptr topic;Subscriber::ptr subscriber;{std::unique_lock<std::mutex> lock(_mutex);auto topic_it = _topics.find(msg->topicKey());if (topic_it == _topics.end())return false;topic = topic_it->second;auto sub_it = _subscribers.find(conn);if (sub_it != _subscribers.end()){subscriber = sub_it->second;}else{subscriber = std::make_shared<Subscriber>(conn);_subscribers.insert(std::make_pair(conn, subscriber));}// 2.在主题对象中 新增一个订阅者对象管理的连接; 在订阅者对象中新增一个订阅的主题topic->appendSubscriber(subscriber);subscriber->appendTopic(msg->topicKey());return true;}}//取消主题订阅void topicCancel(const BaseConnection::ptr&conn,const TopicRequest::ptr&msg){// 1.先找出主题对象topic 订阅者对象subscriberTopic::ptr topic;Subscriber::ptr subscriber;{std::unique_lock<std::mutex> lock(_mutex);auto topic_it = _topics.find(msg->topicKey());if (topic_it != _topics.end())topic = topic_it->second;auto sub_it = _subscribers.find(conn);if (sub_it != _subscribers.end())subscriber = sub_it->second;// 2.在主题对象中 删除当前订阅者对象管理的连接; 在订阅者对象中删除对应订阅的主题if(subscriber) subscriber->removeSTopic(msg->topicKey());if(subscriber && topic) topic->removeSubscriber(subscriber);}}// 主题发布bool topicPublish(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg){Topic::ptr topic;{std::unique_lock<std::mutex> lock(_mutex);auto topic_it = _topics.find(msg->topicKey());if (topic_it == _topics.end())return false;topic=topic_it->second;}topic->pushMessage(msg);return true;}private:// 每个客户端连接会对应一个订阅者对象,记录它当前订阅了哪些主题struct Subscriber{using ptr = std::shared_ptr<Subscriber>;std::mutex _mutex;BaseConnection::ptr conn;std::unordered_set<std::string> topics; // 订阅者订阅的主题名称Subscriber(const BaseConnection::ptr &c): conn(c){}// 增加主题void appendTopic(const std::string &topic_name){std::unique_lock<std::mutex> lock(_mutex);topics.insert(topic_name);}// 删除主题void removeSTopic(const std::string &topic_name){std::unique_lock<std::mutex> lock(_mutex);topics.erase(topic_name);}};// 每个主题包含一个主题名 + 当前所有的订阅者struct Topic{using ptr = std::shared_ptr<Topic>;std::mutex _mutex;std::string topic_name;std::unordered_set<Subscriber::ptr> subscribers; // 当前主题订阅者Topic(const std::string &name): topic_name(name){}// 增加订阅者void appendSubscriber(const Subscriber::ptr &subscriber){std::unique_lock<std::mutex> lock(_mutex);subscribers.insert(subscriber);}// 删除订阅者void removeSubscriber(const Subscriber::ptr &subscriber){std::unique_lock<std::mutex> lock(_mutex);subscribers.erase(subscriber);}// 给该主题的所有订阅者发消息void pushMessage(const BaseMessage::ptr &msg){std::unique_lock<std::mutex> lock(_mutex);for (auto &subscriber : subscribers){subscriber->conn->send(msg);}}};private:std::mutex _mutex;std::unordered_map<std::string, Topic::ptr> _topics;std::unordered_map<BaseConnection::ptr, Subscriber::ptr> _subscribers;};
}
}
九.发布订阅客户端实现rpc_topic
一、发布订阅客户端的角色划分
消息发布客户端
创建主题
删除主题
发布消息(向某个主题发布)
消息订阅客户端
创建主题
删除主题
订阅某主题的消息
取消订阅某主题
二、整体模块设计思路
对外的五个操作接口
针对“主题”的操作,包括:
创建
删除
订阅
取消订阅
发布
对外的一个消息处理接口
提供给 dispatcher 模块,进行消息分发处理
相当于 dispatcher 收到消息发布请求后,查找有哪些订阅者,并调用对应的回调函数将消息推送过去
内部的数据管理
管理“主题名称”与“消息处理回调函数”的映射关系
#pragma once
#include"requestor.hpp"namespace wws
{
namespace client
{class TopicManager{public:using ptr=std::shared_ptr<TopicManager>;using SubCallback=std::function<void(const std::string &key,const std::string&msg)>;//主题创建bool create(const BaseConnection::ptr&conn,const std::string &key){return commonRequest(conn,key,TopicOptype::TOPIC_CREATE);}//删除bool remove(const BaseConnection::ptr&conn,const std::string &key){return commonRequest(conn,key,TopicOptype::TOPIC_REMOVE);}//订阅主题 SubCallback收到主题新消息的进行的回调bool subscribe(const BaseConnection::ptr&conn,const std::string &key,const SubCallback&cb){//当我们订阅了主题 可能发布者会马上发布该主题的内容 //这时候如果cb还没有设置到map中就无法执行回调函数 所以先设置回调函数到map中addSubscribe(key,cb);bool ret=commonRequest(conn,key,TopicOptype::TOPIC_SUBSCRIBE);if(ret==false){//请求发送失败 删除map中对应的cbdelSubscribe(key);return false;}return true;}//取消订阅bool cancel(const BaseConnection::ptr&conn,const std::string &key){delSubscribe(key);return commonRequest(conn,key,TopicOptype::TOPIC_CANCEL);}//发布消息(向某个主题发布)bool publish(const BaseConnection::ptr&conn,const std::string &key,const std::string &msg){return commonRequest(conn,key,TopicOptype::TOPIC_PUBLISH,msg);}// 当收到服务端推送的消息时调用,触发对应订阅者的回调处理逻辑 (设置给dispatcher收到对应主题消息 进行回调处理)void onPublish(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg){//1.先判断该消息的操作类型是否为 发布消息auto type=msg->optype();if(type!=TopicOptype::TOPIC_PUBLISH){ELOG("收到了错误类型的主题操作");return;}//2.取出主题名称 以及消息内容 后面调用cbstd::string topic_key=msg->topicKey();std::string topic_msg=msg->topicMsg();//3.调用cbauto callback=getSubscribe(topic_key);if(!callback){ELOG("收到了%s主题信息 但该主题无对应回调",topic_key.c_str());return;}callback(topic_key,topic_msg);}private:void addSubscribe(const std::string &key,const SubCallback&cb){std::unique_lock<std::mutex> lock(_mutex);_topic_callbacks.insert(std::make_pair(key,cb));}void delSubscribe(const std::string &key){std::unique_lock<std::mutex> lock(_mutex);_topic_callbacks.erase(key);}const SubCallback& getSubscribe(const std::string &key){std::unique_lock<std::mutex> lock(_mutex);auto it=_topic_callbacks.find(key);if(it==_topic_callbacks.end())return SubCallback();return it->second;}bool commonRequest(const BaseConnection::ptr&conn,const std::string &key,TopicOptype type,const std::string &msg=""){//1.构造请求对象 并填充数据auto msg_req=MessageFactory::create<TopicRequest>();msg_req->setId(UUID::uuid());msg_req->setMType(MType::REQ_TOPIC);msg_req->setTopicKey(key);msg_req->setOptype(type);if(type==TopicOptype::TOPIC_PUBLISH)msg_req->setTopicMsg(msg);//2.向服务端发送请求 等待响应BaseMessage::ptr msg_rsp;//发请求 + 等响应 + 反序列化响应 + 返回响应对象msg_rspbool ret=_requestor->send(conn,msg_req,msg_rsp);if(ret==false){ELOG("主题创建请求失败");return false;}//3.判断请求处理是否成功auto topic_rsp_msg=std::dynamic_pointer_cast<TopicResponse>(msg_rsp);if(!topic_rsp_msg){ELOG("主题响应 向下类型转换失败");return false;}if(topic_rsp_msg->rcode()!=RCode::RCODE_OK);{ELOG("主题创建请求出错:%s",errReason(topic_rsp_msg->rcode()).c_str());return false;}return true;}private:std::mutex _mutex;//根据主题查找对应的回调函数执行std::unordered_map<std::string,SubCallback> _topic_callbacks;Requestor::ptr _requestor;};
}
}
十.topicServer topicClient封装
相关文章:
C++ Json-Rpc框架-3项目实现(2)
一.消息分发Dispatcher实现 Dispatcher 就是“消息分发中枢”:根据消息类型 MType,把消息派发给对应的处理函数(Handler)执行。 初版: #pragma once #include "net.hpp" #include "message.hpp"n…...
youtube视频和telegram视频加载原理差异分析
1. 客户侧缓存与流式播放机制 流式视频应用(如 Netflix、YouTube)通过边下载边播放实现流畅体验,其核心依赖以下技术: 缓存预加载:客户端在后台持续下载视频片段(如 DASH/HLS 协议的…...
LLM小白自学笔记:1.两种指令微调
一、LoRA 简单来说,LoRA不直接调整个大模型的全部参数(那样太费资源),而是在模型的某些层(通常是注意力层)加个“旁路”——两个小的矩阵(低秩矩阵)。训练时只更新这俩小矩阵&#x…...
【NLP】 19. Tokenlisation 分词 BPE, WordPiece, Unigram/SentencePiece
1. 翻译系统性能评价方法 在机器翻译系统性能评估中,通常既有人工评价也有自动评价方法: 1.1 人工评价 人工评价主要关注以下几点: 流利度(Fluency): 判断翻译结果是否符合目标语言的语法和习惯。充分性…...
OpenAI发布GPT-4.1系列模型——开发者可免费使用
OpenAI刚刚推出GPT-4.1模型家族,包含GPT-4.1、GPT-4.1 Mini和GPT-4.1 Nano三款模型。重点是——现在全部免费开放! 虽然技术升级值得关注,但真正具有变革意义的是开发者能通过Cursor、Windsurf和GitHub Copilot等平台立即免费调用这些模型。…...
各地物价和生活成本 东欧篇
东欧地区的物价差异相对较大,一些国家的物价较高,而另一些国家则相对便宜。这些差异主要受当地经济发展水平、工资水平、旅游业发展以及国际关系等因素影响。以下是一些典型的东欧国家,按物价高低进行分类: 🌍 物价较高…...
Vue —— 实用的工具函数
目录 响应式数据管理1. toRef 和 torefs2. shallowRef 和 shallowReactive3. markRaw 依赖追踪与副作用1. computed2. watch 和 watchEffect 类型判断与优化1. unref2. isRef 、isReactive 和 isProxy 组件通信与生命周期1. provide 和 inject2. nextTick 高级工具1. useAttrs …...
flex布局(笔记)
弹性布局(Flex布局)是一种现代的CSS布局方式,通过使用display: flex属性来创建一个弹性容器,并在其中使用灵活的盒子模型来进行元素的排列和定位。 主轴与交叉轴:弹性容器具有主轴(main axis)和…...
第二阶段:数据结构与函数
模块4:常用数据结构 (Organizing Lots of Data) 在前面的模块中,我们学习了如何使用变量来存储单个数据,比如一个数字、一个名字或一个布尔值。但很多时候,我们需要处理一组相关的数据,比如班级里所有学生的名字、一本…...
云函数采集架构:Serverless模式下的动态IP与冷启动优化
在 Serverless 架构中使用云函数进行网页数据采集,不仅能大幅降低运维成本,还能根据任务负载动态扩展。然而,由于云函数的无状态特性及冷启动问题,加上目标网站对采集行为的反制措施(如 IP 限制、Cookie 校验等&#x…...
Linux笔记---动静态库(原理篇)
1. ELF文件格式 动静态库文件的构成是什么样的呢?或者说二者的内容是什么? 实际上,可执行文件,目标文件,静态库文件,动态库文件都是使用ELF文件格式进行组织的。 ELF(Executable and Linkable…...
string的模拟实现 (6)
目录 1.string.h 2.string.cpp 3.test.cpp 4.一些注意点 本篇博客就学习下如何模拟实现简易版的string类,学好string类后面学习其他容器也会更轻松些。 代码实现如下: 1.string.h #define _CRT_SECURE_NO_WARNINGS 1 #pragma once #include <…...
【野火模型】利用深度神经网络替代 ELMv1 野火参数化:机制、实现与性能评估
目录 一、ELMv1 野火过程表示法(BASE-Fire)关键机制野火模拟的核心过程 二、采用神经网络模拟野火过程三、总结参考 一、ELMv1 野火过程表示法(BASE-Fire) ELMv1 中的野火模型(称为 BASE-Fire)源自 Commun…...
红宝书第四十七讲:Node.js服务器框架解析:Express vs Koa 完全指南
红宝书第四十七讲:Node.js服务器框架解析:Express vs Koa 完全指南 资料取自《JavaScript高级程序设计(第5版)》。 查看总目录:红宝书学习大纲 一、框架定位:HTTP服务器的工具箱 共同功能: 快…...
嵌入式Linux设备使用Go语言快速构建Web服务,实现设备参数配置管理方案探究
本文探讨,利用Go语言及gin框架在嵌入式Linux设备上高效搭建Web服务器,以实现设备参数的网页配置。通过gin框架,我们可以在几分钟内创建一个功能完善的管理界面,方便对诸如集中器,集线器等没有界面的嵌入式设备的管理。…...
【NLP 59、大模型应用 —— 字节对编码 bpe 算法】
目录 一、词表的构造问题 二、bpe(byte pair encoding) 压缩算法 算法步骤 示例: 步骤 1:初始化符号表和频率统计 步骤 2:统计相邻符号对的频率 步骤 3:合并最高频的符号对 步骤 4:重复合并直至终止条件 三、bpe在NLP中…...
Python对ppt进行文本替换、插入图片、生成表格
目录 1. 安装pptx2. 文本替换和插入图片3. 生成表格 1. 安装pptx pip install python-pptx2. 文本替换和插入图片 文本通过占位符例如{{$xxx}}进行标记,然后进行替换;图片通过ppt中的图形和图片中的占位符进行标记ppt如下 具体实现 from pptx import …...
AI(学习笔记第一课) 在vscode中配置continue
文章目录 AI(学习笔记第一课) 在vscode中配置continue学习内容:1. 使用背景2. 在vscode中配置continue2.1 vscode版本2.2 在vscode中下载continue插件2.2.1 直接进行安装2.2.2 在左下角就会有continue的按钮2.2.3 可以移动到右上角2.2.3 使用的时候需要login 2.3 配…...
C++ (初始面向对象之继承,实现继承,组合,修饰权限)
初始面向对象之继承 根据面向对象的编程思路,我们可以把共性抽象出来封装成类,然后让不同的角色去继承这些类,从而避免大量重复代码的编写 实现继承 继承机制是面向对象程序设计中使代码可以复用的最重要的手段,它允许程序员在保…...
vmcore分析锁问题实例(x86-64)
问题描述:系统出现panic,dmesg有如下打印: [122061.197311] task:irq/181-ice-enp state:D stack:0 pid:3134 ppid:2 flags:0x00004000 [122061.197315] Call Trace: [122061.197317] <TASK> [122061.197318] __schedule0…...
21、c#中“?”的用途
在C#中,? 是一个多用途的符号,具有多种不同的用途,具体取决于上下文。以下是一些常见的用法: 1、可空类型(Nullable Types) ? 可以用于将值类型(如 int、bool 等)变为可空类型。…...
每日搜索--12月
12.1 1. urlencode是一种编码方式,用于将字符串以URL编码的形式进行转换。 urlencode也称为百分号编码(Percent-encoding),是特定上下文的统一资源定位符(URL)的编码机制。它适用于统一资源标识符(URI)的编码,也用于为application/x-www-form-urlencoded MIME准备数…...
一天一个java知识点----Tomcat与Servlet
认识BS架构 静态资源:服务器上存储的不会改变的数据,通常不会根据用户的请求而变化。比如:HTML、CSS、JS、图片、视频等(负责页面展示) 动态资源:服务器端根据用户请求和其他数据动态生成的,内容可能会在每次请求时都…...
游戏报错?MFC140.dll怎么安装才能解决问题?提供多种MFC140.dll丢失修复方案
MFC140.dll 是 Microsoft Visual C 2015 运行库的重要组成部分,许多软件和游戏依赖它才能正常运行。如果你的电脑提示 "MFC140.dll 丢失" 或 "MFC140.dll 未找到",说明系统缺少该文件,导致程序无法启动。本文将详细介绍 …...
TDengine 3.3.6.3 虚拟表简单验证
涛思新出的版本提供虚拟表功能,完美解决了多值窄表查询时需要写程序把窄表变成宽表的处理过程,更加优雅。 超级表定义如下: CREATE STABLE st01 (ts TIMESTAMP,v0 INT,v1 BIGINT,v2 FLOAT,v3 BOOL) TAGS (device VARCHAR(32),vtype VARCHAR(…...
小白如何从0学习php
学习 PHP 可以从零开始逐步深入,以下是针对小白的系统学习路径和建议: 1. 了解 PHP 是什么 定义:PHP 是一种开源的服务器端脚本语言,主要用于 Web 开发(如动态网页、API、后台系统)。 用途:构建…...
常见的 14 个 HTTP 状态码详解
文章目录 一、2xx 成功1、200 OK2、204 No Content3、206 Partial Content 二、3xx 重定向1、301 Moved Permanently2、302 Found3、303 See Other注意4、Not Modified5、307 Temporary Redirect 三、4xx 客户端错误1、400 Bad Request2、401 Unauthorized3、403 Forbidden4、4…...
【Java学习笔记】DOS基本指令
DOS 基本指令 基本原理 接受指令 解析指令 执行指令 常用命令 查看当前目录有什么:dir 使用绝对路径查看特定目录下文件:dir 绝对路径 切换到其他盘:直接输入C: 或 D:直接切换到根目录 返回上一级目录:cd.. 切换到根目录…...
Linux Kernel 8
可编程中断控制器(Programmable Interrupt Controller,PIC) 支持中断(interrupt)的设备通常会有一个专门用于发出中断请求Interrupt ReQuest,IRQ的输出引脚(IRQ pin)。这些IRQ引脚连…...
原子操作CAS(Compare-And-Swap)和锁
目录 原子操作 优缺点 锁 互斥锁(Mutex) 自旋锁(Spin Lock) 原子性 单核单CPU 多核多CPU 存储体系结构 缓存一致性 写传播(Write Propagation) 事务串行化(Transaction Serialization&#…...




















RpcCaller 调用 



再返回到call进行检查并返回结果







上层的result就获取到了结果,进行输出。






也是调用的这个send()
接下来也是 调用请求描述对象中设置的回调函数即Callback1
而Callback1不和Callback一样set_value设置结果,而是调用上层传来的函数,它返回结果给用户。











