当前位置: 首页 > article >正文

从零实现Json-Rpc框架】- 项目实现 - 服务端主题实现及整体封装

📢博客主页:https://blog.csdn.net/2301_779549673
📢博客仓库:https://gitee.com/JohnKingW/linux_test/tree/master/lesson
📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
📢本文由 JohnKi 原创,首发于 CSDN🙉
📢未来很长,值得我们全力奔赴更美好的生活✨

在这里插入图片描述

在这里插入图片描述

文章目录

  • 📢前言
  • 🏳️‍🌈一、服务端主题管理模块​
    • 1.1 核心功能
    • 1.2 核心设计思路
    • 1.3 主题 结构构造
    • 1.4 订阅者 结构构造
    • 1.5 主题 和 订阅者 统筹管理
      • (1) onTopicRequest
      • (2) onShutdown
      • (3) 细节实现方法
    • 1.8 框架代码
    • 1.7 整体代码
  • 🏳️‍🌈二、服务端整体实现
    • 2.1 逻辑框架
    • 2.1.1 注册中心(RegistryServer)​
    • 2.1.2 RPC 服务核心(RpcServer)​
      • 2.1.3 主题服务(TopicServer)​
    • 2.2 逻辑代码
    • 2.3 整体代码
  • 👥总结


📢前言

截至现在,在项目实现上,我们已经封装好了 零碎接口,对 各种消息 及 常用结构体 进行了封装

也实现了 dispatcher 路由转发的功能。

服务端 方面,完成了

  • 业务函数回调总结 - rpc_route.hpp
  • 服务的提供、发现、注册 - rpc_registry.hpp

客户端 方面,完成了

  • 消息请求及其回调 - requestor.hpp
  • 消息请求发送 - rpc_caller.hpp

服务端 未完成部分

  • 服务端主题管理模块 - rpc_topic.hpp
  • 服务端功能整合 - rpc_server.hpp

客户端 未完成部分

  • 客户的提供、发现、注册 - rpc_registry.hpp
  • 客户端主题管理模块 - rpc_topic.hpp
  • 客户端功能整合 - rpc_client.hpp

这一篇文章,笔者就为服务端的 主题实现整体封装 画上句号


🏳️‍🌈一、服务端主题管理模块​

1.1 核心功能

这段代码实现了一个 ​服务端主题管理模块​(TopicManager),支持 ​发布-订阅模式 的核心功能,包括:

  • 主题的创建与删除
  • 客户端的订阅与取消订阅
  • 消息的发布与推送
  • 连接断开时的自动清理

1.2 核心设计思路

​线程安全:通过 std::mutex 保护共享数据(_topics 和 _subscribers)。
​数据映射

  • _topics:维护主题名称到 Topic 对象的映射。
  • _subscribers:维护客户端连接到 Subscriber 对象的映射。

​操作统一入口:通过 onTopicRequest 分发不同类型的主题操作请求。

1.3 主题 结构构造

作用:表示一个主题,管理其所有订阅者。
​成员

  • _subscribers:存储所有订阅者的集合(Subscriber::ptr)。

​方法

  • appendSubscriber / removeSubscriber:增删订阅者。
  • pushMessage:向所有订阅者发送消息。
// 主题名称 和 其订阅者连接 的映射
struct Topic {using ptr = std::shared_ptr<Topic>;std::mutex _mutex;std::string _topic_name;std::unordered_set<Subscriber::ptr> _subscribers; // 当前主题的订阅者的连接Topic(const std::string& topic_name) : _topic_name(topic_name) {}// 增加订阅者void appendSubscriber(const Subscriber::ptr& subscriber);// 删除订阅者void removeSubscriber(const Subscriber::ptr& subscriber);// 推送消息void pushMessage(const BaseMessage::ptr& msg);
};

1.4 订阅者 结构构造

作用:表示一个订阅者,记录其订阅的主题。
​成员

  • _conn:订阅者的网络连接对象。
  • _topics:订阅者当前订阅的所有主题名称集合。

​方法

  • appendTopic / removeTopic:增删订阅的主题。
// 定义一个订阅者对象,记录其订阅的 所有主题名称
struct Subscriber {using ptr = std::shared_ptr<Subscriber>;std::mutex _mutex;BaseConnection::ptr _conn;std::unordered_set<std::string> _topics; // 该订阅者所订阅的主题名称Subscriber(const BaseConnection::ptr& conn) : _conn(conn) {};// 增加订阅的主题void appendTopic(const std::string& topic);// 删除订阅的主题void removeTopic(const std::string& topic);
};

1.5 主题 和 订阅者 统筹管理

我们需要建立两个映射关系

  • 主题名 -> 主题结构
  • 订阅者连接 -> 订阅者结构
std::mutex _mutex;
std::unordered_map<std::string, Topic::ptr> _topics;
std::unordered_map<BaseConnection::ptr, Subscriber::ptr> _subscribers;

核心接口解析

(1) onTopicRequest

​功能:处理客户端发送的主题操作请求(总入口)。
​操作类型

  • TOPIC_CREATE:调用 topicCreate 创建主题。
  • TOPIC_REMOVE:调用 topicRemove 删除主题。
  • TOPIC_SUBSCRIBE:调用 topicSubscribe 订阅主题。
  • TOPIC_CANCEL:调用 topicCancel 取消订阅。
  • TOPIC_PUBLISH:调用 topicPublish 发布消息。
  • 错误处理:若操作失败,返回错误响应(errorResponse)。

(2) onShutdown

​功能:处理客户端连接断开时的清理逻辑。
​步骤

  • _subscribers 中移除订阅者。
  • 遍历订阅者的所有主题,从主题的订阅列表中移除该订阅者。

(3) 细节实现方法

变更类

// 构造一个主题对象,添加映射关系的管理
void topicCreate(const BaseConnection::ptr& conn, const TopicRequest::ptr& msg);// 删除一个主题对象,删除映射关系的管理
void topicRemove(const BaseConnection::ptr& conn, const TopicRequest::ptr& msg);// 将订阅者订阅到指定主题。
bool topicSubscribe(const BaseConnection::ptr& conn, const TopicRequest::ptr& msg);// 取消该主题的订阅者
void topicCancel(const BaseConnection::ptr& conn, const TopicRequest::ptr& msg);// 向各个该主题的 订阅者 发布主题消息
bool topicPublish(const BaseConnection::ptr& conn, const TopicRequest::ptr& msg);

返回类

//  返回一个错误响应
void errorResponse(const BaseConnection::ptr& conn, const TopicRequest::ptr& msg, RCode rcode);//  返回一个主题响应
void topicResponse(const BaseConnection::ptr& conn, const TopicRequest::ptr& msg);

1.8 框架代码

#pragma once
#include "../common/net.hpp"
#include "../common/message.hpp"
#include <unordered_set>namespace rpc
{namespace server{class TopicManager{public:using ptr = std::shared_ptr<TopicManager>;// 统一处理有关主题的请求void onTopicRequest(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg);// 关闭连接void onShutdown(const BaseConnection::ptr &conn);private://  返回一个错误响应void errorResponse(const BaseConnection::ptr& conn, const TopicRequest::ptr& msg, RCode rcode);//  返回一个主题响应void topicResponse(const BaseConnection::ptr& conn, const TopicRequest::ptr& msg);private:// 构造一个主题对象,添加映射关系的管理void topicCreate(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg);// 删除一个主题对象,删除映射关系的管理void topicRemove(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg);// 将订阅者订阅到指定主题。bool topicSubscribe(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg);// 取消该主题的订阅者 void topicCancel(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg);// 向各个该主题的 订阅者 发布主题消息bool topicPublish(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg);private:// 定义一个订阅者对象,记录其订阅的 所有主题名称struct Subscriber{using ptr = std::shared_ptr<Subscriber>;std::mutex _mutex;BaseConnection::ptr _conn;std::unordered_set<std::string> _topics; // 该订阅者所订阅的主题名称Subscriber(const BaseConnection::ptr &conn) : _conn(conn) {};// 增加订阅的主题void appendTopic(const std::string &topic);// 删除订阅的主题void removeTopic(const std::string &topic);};// 主题名称 和 其订阅者连接 的映射struct Topic{using ptr = std::shared_ptr<Topic>;std::mutex _mutex;std::string _topic_name;std::unordered_set<Subscriber::ptr> _subscribers; // 当前主题的订阅者的连接Topic(const std::string &topic_name) : _topic_name(topic_name) {}// 增加订阅者void appendSubscriber(const Subscriber::ptr &subscriber);// 删除订阅者void removeSubscriber(const Subscriber::ptr &subscriber);// 推送消息void pushMessage(const BaseMessage::ptr &msg);};private:std::mutex _mutex;std::unordered_map<std::string, Topic::ptr> _topics;std::unordered_map<BaseConnection::ptr, Subscriber::ptr> _subscribers;};}
}

1.7 整体代码

#pragma once
#include "../common/net.hpp"
#include "../common/message.hpp"
#include <unordered_set>namespace rpc
{namespace server{class TopicManager{public:using ptr = std::shared_ptr<TopicManager>;// 统一处理有关主题的请求void onTopicRequest(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg){TopicOptype topic_optype = msg->optype();bool ret = true;switch(topic_optype){case TopicOptype::TOPIC_CREATE: topicCreate(conn, msg); break;              // 主题创建case TopicOptype::TOPIC_REMOVE: topicRemove(conn, msg); break;              // 主题删除case TopicOptype::TOPIC_SUBSCRIBE: ret = topicSubscribe(conn, msg); break;  // 主题订阅case TopicOptype::TOPIC_CANCEL: topicCancel(conn, msg); break;              // 主题取消订阅case TopicOptype::TOPIC_PUBLISH: ret = topicPublish(conn, msg); break;      // 主题发布default: return errorResponse(conn, msg, RCode::RCODE_INVALID_OPTYPE); break;}if(!ret) return errorResponse(conn, msg, RCode::RCODE_NOT_FOUND_TOPIC);return topicResponse(conn, msg);}// 关闭连接void onShutdown(const BaseConnection::ptr &conn){// 消息发布者断开连接,不需要任何操作// 1. 判断断开连接的是否为订阅者,不是的话直接返回// 2. 获取到订阅者退出,受影响的主题对象// 3. 从主题对象中,移除订阅者// 4. 从订阅者映射信息中,删除订阅者std::vector<Topic::ptr> topics;Subscriber::ptr subscriber;{std::unique_lock<std::mutex> lock(_mutex);auto sub_it = _subscribers.find(conn);if(sub_it == _subscribers.end()){ELOG("该订阅者连接不存在");return;}// 获取该订阅者锁定月的主题subscriber = sub_it->second;// 2. 获取到订阅者退出,受影响的主题对象for(auto& topic_name : subscriber->_topics){auto topic_it = _topics.find(topic_name);if(topic_it == _topics.end())continue;topics.push_back(topic_it->second);}_subscribers.erase(sub_it);}}private:void errorResponse(const BaseConnection::ptr& conn, const TopicRequest::ptr& msg, RCode rcode){auto msg_rsp = MessageFactory::create<TopicResponse>();msg_rsp->setId(msg->rid());msg_rsp->setMType(MType::RSP_TOPIC);msg_rsp->setRcode(rcode);return conn->send(msg_rsp);}void topicResponse(const BaseConnection::ptr& conn, const TopicRequest::ptr& msg){auto msg_rsp = MessageFactory::create<TopicResponse>();msg_rsp->setId(msg->rid());msg_rsp->setMType(MType::RSP_TOPIC);msg_rsp->setRcode(RCode::RCODE_OK);return conn->send(msg_rsp);}private:// 构造一个主题对象,添加映射关系的管理void topicCreate(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg){std::unique_lock<std::mutex> lock(_mutex);// 获取主题名字std::string topic_name = msg->topicKey();// 构造一个主题对象auto topic = std::make_shared<Topic>(topic_name);// 增加订阅者_topics.insert(std::make_pair(topic_name, topic));std::cout << "创建主题" <<  topic_name << std::endl;}// 删除一个主题对象,删除映射关系的管理void topicRemove(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg){// 1. 查看当前主题,有哪些订阅者,然后从订阅者中将主题信息删掉// 2. 删除主题的数据 -- 主题名称余出题对象的映射std::string topic_name = msg->topicKey();std::unordered_set<Subscriber::ptr> subscribers; // 记录 当前主题 的 所有订阅者连接{std::unique_lock<std::mutex> lock(_mutex);auto it = _topics.find(msg->topicKey());if (it == _topics.end()){ELOG("没有找到 %s 主题的订阅者", msg->topicKey().c_str());return;}subscribers = it->second->_subscribers;_topics.erase(it); // 删除主题对象}for (auto &subscriber : subscribers){subscriber->removeTopic(topic_name);}}// 将订阅者订阅到指定主题。bool topicSubscribe(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg){Topic::ptr topic;Subscriber::ptr subscriber;{std::unique_lock<std::mutex> lock(_mutex);// 1. 查找或创建订阅者对象(Subscriber)auto topic_it = _topics.find(msg->topicKey());if (topic_it == _topics.end()){return false;}topic = topic_it->second;auto sub_it = _subscribers.find(conn);if (sub_it != _subscribers.end()){subscriber = sub_it->second;}else{subscriber = std::make_shared<Subscriber>(conn);_subscribers.insert(std::make_pair(conn, subscriber));}}// 2. 在主题对象中,新增一个订阅者对象关联的连接;  在订阅者对象中新增一个订阅的主题topic->appendSubscriber(subscriber);subscriber->appendTopic(msg->topicKey());return true;}// 取消该主题的订阅者 void topicCancel(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg){// 1. 先找出主题对象,和订阅者对象Topic::ptr topic;Subscriber::ptr subscriber;{std::unique_lock<std::mutex> lock(_mutex);auto topic_it = _topics.find(msg->topicKey());if (topic_it != _topics.end()){topic = topic_it->second;}auto sub_it = _subscribers.find(conn);if (sub_it != _subscribers.end()){subscriber = sub_it->second;}}// 2. 从主对象中删除当前的订阅者连接if(subscriber)subscriber->removeTopic(msg->topicKey());if(topic && subscriber)topic->removeSubscriber(subscriber);}// 向各个该主题的 订阅者 发布主题消息bool topicPublish(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg){Topic::ptr topic;{std::unique_lock<std::mutex> lock(_mutex);auto topic_it = _topics.find(msg->topicKey());if (topic_it == _topics.end()){ELOG("没有找到 %s 主题的订阅者", msg->topicKey().c_str());return false;}topic = topic_it->second;}topic->pushMessage(msg);return true;}private:// 定义一个订阅者对象,记录其订阅的 所有主题名称struct Subscriber{using ptr = std::shared_ptr<Subscriber>;std::mutex _mutex;BaseConnection::ptr _conn;std::unordered_set<std::string> _topics; // 该订阅者所订阅的主题名称Subscriber(const BaseConnection::ptr &conn) : _conn(conn) {};// 增加订阅的主题void appendTopic(const std::string &topic){std::unique_lock<std::mutex> lock(_mutex);_topics.insert(topic);}void removeTopic(const std::string &topic){std::unique_lock<std::mutex> lock(_mutex);_topics.erase(topic);}};// 主题名称 和 其订阅者连接 的映射struct Topic{using ptr = std::shared_ptr<Topic>;std::mutex _mutex;std::string _topic_name;std::unordered_set<Subscriber::ptr> _subscribers; // 当前主题的订阅者的连接Topic(const std::string &topic_name) : _topic_name(topic_name) {}// 增加订阅者void appendSubscriber(const Subscriber::ptr &subscriber){std::unique_lock<std::mutex> lock(_mutex);_subscribers.insert(subscriber);}// 删除订阅者void removeSubscriber(const Subscriber::ptr &subscriber){std::unique_lock<std::mutex> lock(_mutex);_subscribers.erase(subscriber);}// 推送消息void pushMessage(const BaseMessage::ptr &msg){std::unique_lock<std::mutex> lock(_mutex);for (auto &subscriber : _subscribers){subscriber->_conn->send(msg);}}};private:std::mutex _mutex;std::unordered_map<std::string, Topic::ptr> _topics;std::unordered_map<BaseConnection::ptr, Subscriber::ptr> _subscribers;};}
}

🏳️‍🌈二、服务端整体实现

2.1 逻辑框架

一个 ​分布式 RPC 服务端系统

包含三个核心模块:​服务注册中心RPC 服务核心 和 ​主题服务

以下是各模块的作用及整体架构:

2.1.1 注册中心(RegistryServer)​

​功能

  • ​服务注册与发现:接收服务提供者(Provider)注册的服务信息,供消费者(Consumer)查询可用服务。
  • 连接管理:在客户端断开时清理相关资源(如服务下线通知)。

​关键成员

  • _pd_manager:管理服务注册与发现的业务逻辑(如维护服务列表)。
  • _dispatcher:分发客户端请求到对应的处理逻辑。
  • _server:底层网络服务器,监听端口并处理连接。

​使用场景

  • 服务提供者启动时向注册中心注册自身服务。
  • 消费者通过注册中心查询可用的服务地址和方法。

2.1.2 RPC 服务核心(RpcServer)​

​功能

  • ​RPC 服务管理:启动 RPC 服务,处理客户端调用请求。
  • 服务注册(可选)​:将服务方法注册到注册中心(若启用)。
  • 请求路由:将 RPC 请求路由到对应的业务处理逻辑。

​关键成员

  • _router:路由请求到具体的服务方法(如根据方法名匹配处理函数)。
  • _req_client:与注册中心通信的客户端(用于服务注册或发现)。
  • _dispatcher:分发网络消息到业务逻辑。

    配置选项
    _enableRegistry:是否启用注册中心(决定是否自动注册服务)。

2.1.3 主题服务(TopicServer)​

​功能

  • ​发布-订阅模式:管理主题的创建、订阅和消息推送。
  • ​消息广播:向订阅特定主题的客户端推送消息。

​关键成员

  • _topic_manager:管理主题和订阅者(如 TopicManager 类)。
  • _dispatcher:处理客户端的订阅/发布请求。

2.2 逻辑代码

#pragma once#include "../common/dispatcher.hpp"
#include "../client/rpc_client.hpp"#include "rpc_route.hpp"
#include "rpc_registry.hpp"
#include "rpc_topic.hpp"namespace rpc
{namespace server{// 注册中心的服务端实现// 启动服务:监听指定端口,接收客户端(服务提供者/发现者)连接。// ​消息路由:将服务注册/发现请求分发给业务处理器(PDManager)。// ​连接管理:在客户端断开时清理相关资源(如服务下线通知)。class RegistryServer{// 注册中心服务端,只需要针对服务注册与发现请求进行处理即可public:using ptr = std::shared_ptr<RegistryServer>;RegistryServer(int port);void start();private:void onConnShutdown(const BaseConnection::ptr &conn);private:ProviderDiscovererManager::ptr _pd_manager;Dispatcher::ptr _dispatcher;BaseServer::ptr _server;};// ​RPC 服务端核心类,负责管理 RPC 服务的生命周期// 启动 RPC 服务:监听指定端口,处理客户端 RPC 请求。// ​服务注册​(可选):将服务方法注册到注册中心,供客户端发现。// ​请求路由:将接收到的 RPC 请求分发给对应的业务处理逻辑。class RpcServer{public:using ptr = std::shared_ptr<RpcServer>;RpcServer(const Address &access_addr, bool enableRegistry = false, const Address &registry_server_addr = Address());// 注册服务到注册中心void registerMethod(const ServiceDescribe::ptr &service);void start();private:bool _enableRegistry;Address _access_addr;rpc::client::RegistryClient::ptr _req_client;RpcRouter::ptr _router;Dispatcher::ptr _dispatcher;BaseServer::ptr _server;};class TopicServer{public:using ptr = std::shared_ptr<TopicServer>;TopicServer(int port);void start();private:void onConnShutdown(const BaseConnection::ptr &conn);private:TopicManager::ptr _topic_manager;Dispatcher::ptr _dispatcher;BaseServer::ptr _server;};}
}

2.3 整体代码

#pragma once#include "../common/dispatcher.hpp"
#include "../client/rpc_client.hpp" #include "rpc_route.hpp"
#include "rpc_registry.hpp"
#include "rpc_topic.hpp"namespace rpc{namespace server{// 注册中心的服务端实现// 启动服务:监听指定端口,接收客户端(服务提供者/发现者)连接。// ​消息路由:将服务注册/发现请求分发给业务处理器(PDManager)。// ​连接管理:在客户端断开时清理相关资源(如服务下线通知)。class RegistryServer{// 注册中心服务端,只需要针对服务注册与发现请求进行处理即可public:using ptr = std::shared_ptr<RegistryServer>;RegistryServer(int port): _pd_manager(std::make_shared<ProviderDiscovererManager>()),_dispatcher(std::make_shared<Dispatcher>()){ // 1. 注册服务请求处理器// 将 PDManager::onServiceRequest 绑定到 MType::REQ_SERVICE 消息类型。当收到服务注册或发现请求时,调用此方法处理auto service_cb = std::bind(&ProviderDiscovererManager::onServiceRequest, _pd_manager.get(), std::placeholders::_1, std::placeholders::_2);_dispatcher->registerHandler<ServiceRequest>(MType::REQ_SERVICE, service_cb);// 2. 创建底层服务器并设置回调// 通过 ServerFactory 创建底层服务器,设置消息总入口为 Dispatcher::onMessage_server = rpc::ServerFactory::create(port);auto message_cb = std::bind(&Dispatcher::onMessage, _dispatcher.get(), std::placeholders::_1, std::placeholders::_2);_server->setMessageCallback(message_cb);// 3. 设置连接关闭回调// 当客户端断开连接时,调用 onConnShutdown 清理相关资源auto close_cb = std::bind(&RegistryServer::onConnShutdown, this, std::placeholders::_1);_server->setCloseCallback(close_cb);}void start(){_server->start();   // 启动服务器,开始监听端口}private:void onConnShutdown(const BaseConnection::ptr& conn){_pd_manager->onConnShutdown(conn);  // 通知 PDManager 处理连接断开}private:ProviderDiscovererManager::ptr _pd_manager;Dispatcher::ptr _dispatcher;BaseServer::ptr _server;};// ​RPC 服务端核心类,负责管理 RPC 服务的生命周期// 启动 RPC 服务:监听指定端口,处理客户端 RPC 请求。// ​服务注册​(可选):将服务方法注册到注册中心,供客户端发现。// ​请求路由:将接收到的 RPC 请求分发给对应的业务处理逻辑。class RpcServer{public:using ptr = std::shared_ptr<RpcServer>;RpcServer(const Address& access_addr, bool enableRegistry = false, const Address& registry_server_addr = Address()): _enableRegistry(enableRegistry),_access_addr(access_addr),_router(std::make_shared<rpc::server::RpcRouter>()),_dispatcher(std::make_shared<Dispatcher>()){// 1. ​创建注册客户端​(若启用注册)if(enableRegistry){_req_client = std::make_shared<client::RegistryClient>(registry_server_addr.first, registry_server_addr.second);}// 2. 注册 RPC 请求处理回调auto rpc_cb = std::bind(&RpcRouter::onRpcRequest, _router.get(), std::placeholders::_1, std::placeholders::_2);_dispatcher->registerHandler<RpcRequest>(MType::REQ_RPC, rpc_cb);// 3. 创建底层服务器并设置回调_server = rpc::ServerFactory::create(access_addr.second);auto message_cb = std::bind(&Dispatcher::onMessage, _dispatcher.get(), std::placeholders::_1, std::placeholders::_2);_server->setMessageCallback(message_cb);}// 注册服务到注册中心void registerMethod(const ServiceDescribe::ptr& service){if(_enableRegistry)_req_client->registryMethod(service->method(), _access_addr);_router->registerMethod(service);}void start(){_server->start();}private:bool _enableRegistry;Address _access_addr;rpc::client::RegistryClient::ptr _req_client;RpcRouter::ptr _router;Dispatcher::ptr _dispatcher;BaseServer::ptr _server;};class TopicServer{public:using ptr = std::shared_ptr<TopicServer>;TopicServer(int port): _topic_manager(std::make_shared<TopicManager>()),_dispatcher(std::make_shared<Dispatcher>()){// 1. 注册主题请求处理器auto topic_cb = std::bind(&TopicManager::onTopicRequest, _topic_manager.get(), std::placeholders::_1, std::placeholders::_2);_dispatcher->registerHandler<TopicRequest>(MType::REQ_TOPIC, topic_cb);// 2. 创建底层服务器并设置回调_server = ServerFactory::create(port);auto message_cb = std::bind(&Dispatcher::onMessage, _dispatcher.get(), std::placeholders::_1, std::placeholders::_2);_server->setMessageCallback(message_cb);// 3. 设置连接关闭回调auto close_cb = std::bind(&TopicServer::onConnShutdown, this, std::placeholders::_1);_server->setCloseCallback(close_cb);}void start(){_server->start();}private:void onConnShutdown(const BaseConnection::ptr& conn){_topic_manager->onShutdown(conn);}private:TopicManager::ptr _topic_manager;Dispatcher::ptr _dispatcher;BaseServer::ptr _server;};}
}

👥总结

本篇博文对 从零实现Json-Rpc框架】- 项目实现 - 服务端主题实现及整体封装 做了一个较为详细的介绍,不知道对你有没有帮助呢

觉得博主写得还不错的三连支持下吧!会继续努力的~

请添加图片描述

相关文章:

从零实现Json-Rpc框架】- 项目实现 - 服务端主题实现及整体封装

&#x1f4e2;博客主页&#xff1a;https://blog.csdn.net/2301_779549673 &#x1f4e2;博客仓库&#xff1a;https://gitee.com/JohnKingW/linux_test/tree/master/lesson &#x1f4e2;欢迎点赞 &#x1f44d; 收藏 ⭐留言 &#x1f4dd; 如有错误敬请指正&#xff01; &…...

AI助力PPT制作,让演示变得轻松高效

AI助力PPT制作&#xff0c;让演示变得轻松高效&#xff01;随着科技的进步&#xff0c;AI技术早已渗透到各行各业&#xff0c;特别是在办公领域&#xff0c;AI制作PPT已不再是未来的梦想&#xff0c;而是现实的工具。以前你可能需要花费数小时来制作一个完美的PPT&#xff0c;如…...

React-01React创建第一个项目(npm install -g create-react-app)

1. React特点 JSX是javaScript语法的扩展&#xff0c;React开发不一定使用JSX。单向响应的数据流&#xff0c;React实现单向数据流&#xff0c;减少重复代码&#xff0c;比传统数据绑定更简单。等等 JSX是js的语法扩展&#xff0c;允许在js中编写类似HTML的代码 const …...

HTML应用指南:利用POST请求获取三大运营商5G基站位置信息(二)

在当前信息技术迅猛发展的背景下,第五代移动通信(5G)技术作为新一代的无线通信标准,正逐步成为推动社会进步和产业升级的关键驱动力。三大电信运营商(中国移动、中国联通、中国电信)在全国范围内的5G基站部署,不仅极大地提升了网络性能,也为智能城市、物联网、自动驾驶…...

C++学习笔记之内存管理

仅用于记录学习理解 选择题答案及解析 globalVar&#xff1a;C&#xff08;数据段 (静态区)&#xff09; 解析&#xff1a;全局变量存放在数据段&#xff08;静态区&#xff09;&#xff0c;生命周期从程序开始到结束&#xff0c;程序运行期间一直存在。 staticGlobalVar&…...

针对 MySQL 数据库中 主键/唯一约束的更新方法 和 ON DUPLICATE KEY UPDATE 语法的详细说明及示例,并以表格总结

以下是针对 MySQL 数据库中 主键/唯一约束的更新方法 和 ON DUPLICATE KEY UPDATE 语法的详细说明及示例&#xff0c;并以表格总结&#xff1a; 一、主键的更新 1. 更新主键的条件 允许更新&#xff1a;MySQL 允许更新主键列&#xff0c;但需满足以下条件&#xff1a; 唯一性…...

day21 学习笔记

文章目录 前言一、删除数据二、索引操作1.loc方法2.iloc方法 三、添加数据1.loc方法添加数据2.concat方法拼接数据 四、重置索引 前言 通过今天的学习&#xff0c;我掌握了对Pandas对象数据元素进行增删操作以及重置索引的操作 一、删除数据 DataFrame.drop(labelsNone, axis…...

【MyBatis】深入解析 MyBatis XML 开发:增删改查操作和方法命名规范、@Param 重命名参数、XML 返回自增主键方法

增删改查操作 接下来&#xff0c;我们来实现一下用户的增加、删除和修改的操作。 增( Insert ) UserInfoMapper接口&#xff1a; 我们写好UserInfoMapper接口后&#xff0c;自动生成 XML 代码&#xff1b; UserInfoMapper.xml实现&#xff1a; 增删改查方法命名规范 如果我们…...

【Pandas】pandas DataFrame select_dtypes

Pandas2.2 DataFrame Attributes and underlying data 方法描述DataFrame.index用于获取 DataFrame 的行索引DataFrame.columns用于获取 DataFrame 的列标签DataFrame.dtypes用于获取 DataFrame 中每一列的数据类型DataFrame.info([verbose, buf, max_cols, …])用于提供 Dat…...

使用Python构建Kafka示例项目

新建项目 mkdir python-kafka-test cd python-kafka-test 安装依赖 pip install confluent_kafka 创建配置文件 # Kafka配置文件# Kafka服务器配置 KAFKA_CONFIG {bootstrap.servers: localhost:9092,# 生产者特定配置producer: {client.id: python-kafka-producer,acks:…...

本地化部署DeepSeek-R1蒸馏大模型:基于飞桨PaddleNLP 3.0的实战指南

目录 一、飞桨框架3.0&#xff1a;大模型推理新范式的开启1.1 自动并行机制革新&#xff1a;解放多卡推理1.2 推理-训练统一设计&#xff1a;一套代码全流程复用 二、本地部署DeepSeek-R1-Distill-Llama-8B的实战流程2.1 机器环境说明2.2 模型与推理脚本准备2.3 启动 Docker 容…...

VBA 64位API声明语句第008讲

跟我学VBA&#xff0c;我这里专注VBA, 授人以渔。我98年开始&#xff0c;从源码接触VBA已经20余年了&#xff0c;随着年龄的增长&#xff0c;越来越觉得有必要把这项技能传递给需要这项技术的职场人员。希望职场和数据打交道的朋友&#xff0c;都来学习VBA,利用VBA,起码可以提高…...

Linux信号——信号的保存(2)

关于core和term两种终止方式 core是什么&#xff1f; 将进程在内存中的核心数据&#xff08;与调试有关&#xff09;转存到磁盘中形成core,core.pid的文件。 core dump&#xff1a;核心转储。 core与term的区别&#xff1a; term只是普通的终止&#xff0c;而core终止方式还要…...

PyQt6实例_A股日数据维护工具_权息数据增量更新线程

目录 前置&#xff1a; 代码&#xff1a; 1 工作类 2 数据库交互 3 主界面启用子线程 视频&#xff1a; 前置&#xff1a; 1 本系列将以 “PyQt6实例_A股日数据维护工具” 开头放置在“PyQt6实例”专栏 专栏地址 https://blog.csdn.net/m0_37967652/category_12929760.h…...

【蓝桥杯嵌入式——学习笔记一】2016年第七届省赛真题重难点解析记录,闭坑指南(文末附完整代码)

在读题过程中发现本次使用的是串口2&#xff0c;需要配置串口2。 但在查看产品手册时发现PA14同时也是SWCLK。 所以在使用串口2时需要拔下跳线帽去连接CH340。 可能是用到串口2的缘故&#xff0c;在烧录时发现报了一个错误。这时我们要想烧录得按着复位键去点击烧录&#xff0c…...

基础常问 (概念、代码)

读源码 代码题 Void方法 &#xff0c;也可以提前rerun;结束 RandomAccessFile类&#xff08;随机访问文件&#xff09; 在 Java 中&#xff0c;可以使用RandomAccessFile类来实现文件指针操作。RandomAccessFile提供了对文件内容的随机访问功能&#xff0c;它的文件指针可以通…...

大学生机器人比赛实战(一)综述篇

大学生机器人比赛实战 参加机器人比赛是大学生提升工程实践能力的绝佳机会。本指南将全面介绍如何从零开始准备华北五省机器人大赛、ROBOCAN、RoboMaster等主流机器人赛事&#xff0c;涵盖硬件设计、软件开发、算法实现和团队协作等关键知识。 一、比赛选择与准备策略 1.1 主…...

什么是宽带拨号?

宽带拨号&#xff08;PPPoE拨号&#xff09;是一种通过账号密码认证接入互联网的方式&#xff0c;常见于家庭宽带、企业专线等场景。用户需要通过路由器或电脑进行拨号连接&#xff0c;运营商验证身份后分配IP地址&#xff0c;才能正常上网。 1. 宽带拨号的工作原理 PPPoE协议&…...

J1 ResNet-50算法实战与解析

&#x1f368; 本文為&#x1f517;365天深度學習訓練營 中的學習紀錄博客&#x1f356; 原作者&#xff1a;K同学啊 | 接輔導、項目定制 一、理论知识储备 1. 残差网络的由来 ResNet主要解决了CNN在深度加深时的退化问题&#xff08;梯度消失与梯度爆炸&#xff09;。 虽然B…...

[MySQL初阶]MySQL(8)索引机制:下

标题&#xff1a;[MySQL初阶]MySQL&#xff08;8&#xff09;索引机制&#xff1a;下 水墨不写bug 文章目录 四、从问题到底层&#xff0c;从现象到本质1.为什么插入的数据默认排好序2.MySQL的Page&#xff08;1&#xff09;为什么选择用Page&#xff1f;&#xff08;2&#x…...

Muduo网络库实现 [九] - EventLoopThread模块

目录 设计思路 类的设计 模块的实现 私有接口 公有接口 设计思路 我们说过一个EventLoop要绑定一个线程&#xff0c;未来该EventLoop所管理的所有的连接的操作都需要在这个EventLoop绑定的线程中进行&#xff0c;所以我们该如何实现将EventLoop和线程绑定呢&#xff1f;…...

Vim操作指令全解析

Vim是我们在Linux日常工作中不可或缺的文本编辑器。它强大的功能和高效的编辑方式可以极大提升工作效率。本文将全面解析Vim的各种操作指令&#xff0c;从基础操作到高级技巧。 一、Vim模式解析 Vim是一个模式化编辑器&#xff0c;理解不同模式是掌握Vim的关键&#xff1a; …...

《K230 从熟悉到...》识别机器码(AprilTag)

《K230 从熟悉到...》识别机器码&#xff08;aprirltag&#xff09; tag id 《庐山派 K230 从熟悉到...》 识别机器码&#xff08;AprilTag&#xff09; AprilTag是一种基于二维码的视觉标记系统&#xff0c;最早是由麻省理工学院&#xff08;MIT&#xff09;在2008年开发的。A…...

VMware ESXi:企业级虚拟化平台详解

VMware ESXi&#xff1a;企业级虚拟化平台详解 目录 什么是VMware ESXi&#xff1f; ESXi的发展历史 ESXi的核心特性 3.1 裸机架构&#xff08;Type-1 Hypervisor&#xff09; 3.2 轻量化与高性能 3.3 集中管理&#xff08;vCenter集成&#xff09; ESXi的架构与工作原理…...

使用 PyTorch 的 `optim.lr_scheduler.CosineAnnealingLR` 学习率调度器

使用 PyTorch 的 optim.lr_scheduler.CosineAnnealingLR 学习率调度器 在深度学习中,学习率(Learning Rate, LR)是影响模型训练效果的一个关键超参数。一个合适的学习率调度策略可以帮助模型更快地收敛,同时避免陷入局部最优或振荡。PyTorch 提供了多种学习率调度器,其中…...

栈和队列的概念

1.栈的概念 只允许在固定的一端进行插入和删除&#xff0c;进行数据的插入和数据的删除操作的一端数栈顶&#xff0c;另一端称为栈底。 栈中数据元素遵循后进先出LIFO (Last In First Out) 压栈&#xff1a;栈的插入。 出栈&#xff1a;栈的删除。出入数据在栈顶。 那么下面…...

常用的元素操作API

click 触发当前元素的点击事件 clear() 清空内容 sendKeys(...) 往文本框一类元素中写入内容 getTagName() 获取元素的的标签名 getAttribute(属性名) 根据属性名获取元素属性值 getText() 获取当前元素的文本值 isDisplayed() 查看元素是否显示 get(String url) 访…...

红日靶场一实操笔记

一&#xff0c;网络拓扑图 二&#xff0c;信息搜集 1.kali机地址&#xff1a;192.168.50.129 2.探测靶机 注&#xff1a;需要win7开启c盘里面的phpstudy的服务。 nmap -sV -Pn 192.168.50.128 或者扫 nmap -PO 192.168.50.0/24 可以看出来win7(ip为192.168.50.128)的靶机开…...

SpringBoot集成Redis 灵活使用 TypedTuple 和 DefaultTypedTuple 实现 Redis ZSet 的复杂操作

以下是 Spring Boot 集成 Redis 中 TypedTuple 和 DefaultTypedTuple 的详细使用说明&#xff0c;包含代码示例和场景说明&#xff1a; 1. 什么是 TypedTuple 和 DefaultTypedTuple&#xff1f; TypedTuple<T> 接口&#xff1a; 定义了 Redis 中有序集合&#xff08;ZSet…...

7-4 BCD解密

BCD数是用一个字节来表达两位十进制的数&#xff0c;每四个比特表示一位。所以如果一个BCD数的十六进制是0x12&#xff0c;它表达的就是十进制的12。但是小明没学过BCD&#xff0c;把所有的BCD数都当作二进制数转换成十进制输出了。于是BCD的0x12被输出成了十进制的18了&#x…...