当前位置: 首页 > article >正文

C++实现分布式网络通信框架RPC(2)——rpc发布端

有了上篇文章的项目的基本知识的了解,现在我们就开始构建项目。

目录

一、构建工程目录

二、本地服务发布成RPC服务

2.1理解RPC发布

 2.2实现

三、Mprpc框架的基础类设计

3.1框架的初始化类 MprpcApplication

代码实现

3.2读取配置文件类 MprpcConfig

代码实现 

3.3Mprpc网络服务类  RpcProvider

代码实现

1、 NotifyService

2、Run()

 3、OnConnection

4、OnMessage

5、 SendRpcResponse

四、总结


一、构建工程目录

为了更规范化地做完一个工程项目,首先我们要标准化创建一系列目录如下

  • bin:放的是我们项目的可执行文件
  • build:存放的是构建该项目的Cmake编译出来的中间文件,由于中间文件比较杂乱,所以我们统一放在该目录下进行管理
  • example:存放的是我们我们对该项目框架的使用例子
  • lib:存放的是该项目框架最后编译成的静态库
  • src:存放的是该项目框架的所有相关代码
  • test:存放的是我们在构建项目时中间所做的一些测试代码
  • autobuild.sh:一件编译脚本
  • CMakeLists.txt:Cmake顶级目录中存放Cmake编译要寻找的文件
  • README.md:关于该项目的自我描述

二、本地服务发布成RPC服务

2.1理解RPC发布

对于服务端的某一个本地方法来说,客户端如何才能调用到这个方法呢?

显而易见,首先要做是服务端将自己的本地方法都发布成rpc远程调用方法,这样客户端才能通过rpc调用它。在客户端调用某一个服务的某一个方法的时候,总得有要传输服务对象的名字、方法的名字和参数,这中间涉及到的消息的序列化和反序列化我们都是使用protobuf来实现的,但是要注意的是 protobuf 并不支持什么rpc功能,他只是对rpc方法的一个描述,通过这个描述它就可以去做这个rpc请求所携带的参数的序列化和反序列化。

所以整个调用过程中的请求消息发送和响应消息返回都是通过rpc这个框架去一步步调用中间的各种技术去完成的。

假设客户端(Caller)调用了 Login(Loginrequest),即上图左边红色部分,那么这些服务和方法名字及参数都被进行序列化(上图左边黄色部分,然后经过框架调用muduo库发送(上图左边绿色部分)到了服务端(Callee)这一端(上图右边绿色部分)。

接着框架就根据从客户端发来的消息,得到了其想调用Login方法,它就将带有参数的这个Login(Loginrequest)交到protobuf自动生成的UserServiceRpc服务类中的我们重写之后的这个Login方法上了,如下

 在服务端,是由框架给业务上报了请求参数,也就是虚函数Login方法里的request参数,即框架已经将服务端发来的消息都放在了request参数中,且已经被反序列化好了,所以我们在服务端只需要直接使用就好了。

 2.2实现

所以我们想要发布一个RPC方法,我们就必须先定义一个 .proto 文件,相当于一种协议,也让rpc的调用方知道这个rpc方法是怎么发布的,方法名字叫什么,参数类型是什么,返回值类型是什么,这样两方都是遵照这个约束来进行rpc方法的发布和rpc方法的调用。

先定义 user.proto如下,接着对它进行protoc编译,生成user.pb.h和user.pb.cc文件

syntax ="proto3";package fixbug;
option cc_generic_services = true;message ResultCode{int32 errcode=1;bytes errmsg=2;}message LoginRequest{bytes name=1;bytes pwd=2;}message LoginResponse{ResultCode result=1;bool success=2;}message RegisterRequset{uint32 id=1;bytes name=2;bytes pwd=3;}message RegisterResponse{ResultCode result=1;bool success=2;//登陆是否成功}service UserServiceRpc{rpc Login(LoginRequest) returns(LoginResponse);rpc Register(RegisterRequset)returns(RegisterResponse);}

接着自己封装一个类:UserService继承自UserServiceRpc,且对其中的Login方法进行重写如下:

//userservice.cc
#include <iostream>
#include <string>
#include "user.pb.h"
//Userservice原来是一个本地服务,提供了进程内的本地方法,Login
class UserService : public fixbug::UserServiceRpc // 使用在rpc服务的发布端即rpc服务的提供者
{
public:bool Login(std::string name, std::string pwd)//本地方法{std::cout << "doing local service: Login" << std::endl;std::cout << "name:" << name << "  pwd:" << pwd << std::endl;return true;}void Login(::google::protobuf::RpcController *controller,const ::fixbug::LoginRequest *request,::fixbug::LoginResponse *response,::google::protobuf::Closure *done){std::string name=request->name();std::string pwd=request->pwd();//做本地业务bool login_result=Login(name,pwd);//函数重载,参数和返回值不同//把响应写入 包括错误码 错误消息 返回值fixbug::ResultCode *code=response->mutable_result();code->set_errcode(0);//没有错误时,错误码置为0code->set_errmsg("");//没有错误时,错误码为空response->set_success(login_result);//执行回调操作 执行响应对象数据的序列化和网络发送(都是由框架来完成的)done->Run(); }
};

代码解析:

  • Login方法的第一个参数controller,我们暂时不做解释,到后面再细说
  • 第二个参数就是框架已经给我们将客户端的请求消息(Loginrequest)都拿到了request里面,且已经反序列好了,我们直接使用protobuf自动生成的LoginRequest类中的方法拿取就可以了。
  • 将取到的参数在本地进行业务处理。
  • 处理完业务之后,再利用protobuf自动生成的LoginResponse类中的方法将处理结果填回去。但是此时在业务代码中,我们并不清楚怎么返回,这些响应消息的序列化和返回需要交给框架来做,所以框架提供了 LoginResponse *response这个参数,这里业务段只需要填好內容就行了,至于内容的序列化和发送就不需要操心了,全由框架完成。
  • 最后就是执行回调函数 done()了,它的作用就是执行响应对象数据的序列化和网络发送(这属于框架代码 ),其实Closure也是一个抽象类,也需要我们定义一个类来继承它然后进行重写。

我们上面的所实现的都是业务代码!

三、Mprpc框架的基础类设计

接下来我们就需要考虑框架的使用了,在我们服务端使用框架的时候都需要那些操作?

  1. 框架的初始化操作,因为Mprpc也是一个服务器,所以也要有ip地址和端口号,我们不能写死,这些东西都可以从配置文件中读取
  2. 初始化完框架之后,还应该定义一个类(可以理解为网络服务对象)专门来提供在Mprpc节点上发布服务的功能。且他还应该有网络功能,在发布完服务之后启动rpc服务发布节点,启动以后,进程进入阻塞状态,等待远程的rpc调用请求。
//userservice.cc
int main(int agrc,char** argv)
{//使用框架前,一般都需要先调用框架的初始化操作(做一些配置、日志的初始化)//以provider -i config.conf形式输入,从config.conf中读取网络服务器及配置中心的地址或者端口号MprpcApplication::Init(agrc,argv);//将来需要一些ip地址和端口号,这些不能写死,需要在环境变量中获取RpcProvider provider;//专门在框架上发布服务的,provider是一个rpc网络服务对象,将UserService对象发布到rpc节点上provider.NotifyService(new UserService());//启动一个rpc服务发布节点,Run以后,进程进入阻塞状态,等待远程的rpc调用请求provider.Run();return 0;
}

3.1框架的初始化类 MprpcApplication

根据上面的使用框架过程中需要的操作,我们先来实现框架的初始化类

  • Mprpc框架的初始化类,这里使用单例模式来设计,因为它包含了一些整个框架锁共享的一些信息,如配置信息、日志信息等,希望在框架其他地方使用的时候能以最简便的形式获取到框架的基础类共享的信息。
  • 由于框架的初始化只需要在使用框架时调用一次,所以设计成静态成员方法。
  • 由于是单例模式,所以我们要私有化它的构造函数删除拷贝构造和移动构造函数,并给他一个静态的创建实例的方法
  • 由于是框架的初始操作,所以我们还需要一个读取配置文件中信息的一个功能,这个功能涉及到判断配置文件中内容是否合法和读取配置文件內容的操作,所以我们也将读取配置文件封装成一个类

头文件如下

//mprpcapplication.h
class MprpcApplication
{public:static void Init(int argc,char** argv);//用于解析配置文件或启动参数。static MprpcApplication& GetInstance();//获取MprpcApplication实例static MprpcConfig& GetConfig();//获取配置文件內容private:static MprpcConfig m_config;MprpcApplication(){}MprpcApplication(const MprpcApplication&)=delete;//删除拷贝构造函数MprpcApplication(MprpcApplication&&)=delete;//删除移动构造函数
};

代码实现

//mprpcapplication.cc
#include "mprpcapplication.h"
#include<iostream>
#include<unistd.h>
#include<string>MprpcConfig MprpcApplication::m_config;//静态成员在类外初始化void ShowArgsHelp()
{ std::cout<<"format: command -i <configfile>"<<std::endl;
}
//我们将来希望以 ./provider -i test.conf 的命令形式去执行程序所以传入了可变参数
void MprpcApplication::Init(int argc, char **argv)
{if(argc < 2){ShowArgsHelp();exit(EXIT_FAILURE);}int c =0;std::string config_file;while((c = getopt(argc,argv,"i:"))!=-1){switch (c){case 'i':config_file = optarg;//optarg 是 getopt 提供的一个全局变量,指向当前选项对应的参数值;break;case '?'://用户输入了未定义的选项(比如 -x)ShowArgsHelp();exit(EXIT_FAILURE);case ':'://当某个需要参数的选项没有提供参数时(比如 -i 后面没跟文件名)ShowArgsHelp();exit(EXIT_FAILURE);default:break;}}m_config.LoadConfigFile(config_file.c_str());//从命令行上得到了配置文件,接下来就是读取}
MprpcApplication& MprpcApplication::GetInstance()
{static MprpcApplication app;return app;
}
MprpcConfig& MprpcApplication::GetConfig()
{return m_config;
}
#include <unistd.h>
int getopt(int argc, char * const argv[], const char *optstring);

该函数是来解析命令行参数的写法。

  • argcargv[]:主函数传入的命令行参数。
  • optstring:指定合法的选项字符串,例如 "i:" 表示:
    • 'i' 是一个有效选项;
    • 冒号 : 表示这个选项需要一个参数(argument);
  • getopt 会依次从 argv[] 中取出命令行中传入的选项(option);
  • 每次返回一个选项字符(如 'i'),当没有更多选项时返回 -1

3.2读取配置文件类 MprpcConfig

我们的配置文件內容如下

#test.conf
#rpc节点的ip地址
rpcserverip=127.0.0.1
#rpc节点的port端口号
rpcserverport=8081
#zk的ip地址
zookeeperip=127.0.0.1
#zk的port端口号
zookeeperport=2181

 可以看到配置文件中的内容就类似 key-value 的键值对,所以我们考虑在 读取配置文件类 中实现两个方法

  1. 第一个方法是将配置文件中的内容读出来以键值对的形式放入 unordered_map表中。
  2. 第二个方法是给定一个 key,查找 unordered_map表中有没有对应的value并返回。
//mprpcConfig.h
//框架读取配置文件类
//rpcserver_ip,rpcserver_port,zookeeper_ip,zookeeper_port
class MprpcConfig
{public://负责解析加载配置文件void LoadConfigFile(const char*config_file);//查询配置项信息,传入一个key,返回该key所对应的字符串std::string Load(const std::string &key);private:std::unordered_map<std::string,std::string> m_configMap;//去掉字符串前后的空格void Trim(std::string &src_buf);
};

代码实现 

//mprpcconfig.cc
//负责解析加载配置文件void MprpcConfig::LoadConfigFile(const char*config_file){FILE *pf = fopen(config_file,"r");if(nullptr == pf){std::cout<<config_file<<"is not exits"<<std::endl;exit(EXIT_FAILURE);}//三种情况 1、注释 2、正确的配置项 3、去掉多余的空格while(!feof(pf))//测试给定的文件流是否已经达到了文件结束符{char buf[512] = {0};fgets(buf,512,pf);//去掉字符串前边的空格std::string read_buf(buf);Trim(read_buf);//判断#的注释if(read_buf[0]=='#'||read_buf.empty()){continue;}//解析配置项std::string key;std::string value;int idx=read_buf.find('=');if(idx==-1){//配置不合法continue;}key=read_buf.substr(0,idx);Trim(key);//对于value来说还需要找到回车换行在那个地方int endidx=read_buf.find('\n',idx);value=read_buf.substr(idx+1,endidx-idx-1);Trim(value);m_configMap.insert({key,value});}}//查询配置项信息,传入一个key,返回该key所对应的字符串std::string MprpcConfig::Load(const std::string &key){//这里不能直接return m_configMap[key];因为如果key不存在的话,它会给map表里面增加东西的auto it=m_configMap.find(key);if(it == m_configMap.end()){return " ";}return it->second; }//去掉字符串前后的空格void MprpcConfig::Trim(std::string &src_buf){int idx=src_buf.find_first_not_of(' ');if(idx!=-1){//说明字符串前面有空格,即存在有效內容src_buf = src_buf.substr(idx,src_buf.size()-idx);}//去掉字符串后面多余的空格idx=src_buf.find_last_not_of(' ');if(idx!=-1){src_buf=src_buf.substr(0,idx+1);}}

代码解析:这里的功能就相当于是在配置文件中剔除掉那些注释、空格还有根据=找到对应的配置项。 

3.3Mprpc网络服务类  RpcProvider

这个类是框架提供的专门服务发布rpc服务的网络对象类,用户使用的我们的框架的时候是定义了一个Rpcprovide对象,把它当作rpc的一个节点,然后向它上面发布服务,作为网络服务类,所以肯定会存在很多人都请求这个服务的情况,所以它是必须做到高并发的,所以rpc的发布我们是要使用C++的Muduo库来实现的。该类中应该包含如下方法:

  1. 需要一个框架提供给外部使用的,可以发布rpc方法的函数接口 NotifyService 但是作为框架它的参数不能传具体的服务类,不然就将代码写死了,但是我们知道所有的服务都是继承自google::protobuf::Service 抽象类,所以我们可以用基类的指针来接受子类的传参。
  2. 还需要有一个启动rpc服务发布节点,开始提供rpc远程网络调用服务的方法 Run(),调用Run()方法进程进入阻塞状态等待用户的调用请求。
  3. 还要组合Event_Loop,需要一个  muduo::net::EventLoop m_eventLoop;成员,因为这个会需要在多个成员方法中调用,所以不能写成函数局部变量,得定义成成员变量。
  4. 在使用Muduo库时,我们还要关注muduo库的有没有新连接的连接回调方法(OnConnection)和已连接用户的读写事件的回调方法(OnMessage)。
  5. 为了在将来调用的时候好知道调用端调用的是什么服务的什么方法,我们还需要维护一张服务和方法对应表(服务信息表 ServiceInfo)
//rpcprovider.h
// 框架提供的专门服务发布rpc服务的网络对象类
class RpcProvider
{
public://这里是框架提供给外部使用的,可以发布rpc方法的函数接口,不能接收具体业务,而这些业务都继承自google::protobuf::Servicevoid NotifyService(google::protobuf::Service *service);//启动rpc服务节点,开始提供rpc远程网络服务调用void Run();private://关于rpc网络这一块,将tcpserver定义成一个指针,直接使用智能指针//组合了EventLoopmuduo::net::EventLoop m_eventLoop;//service服务类型信息,例如Userservice这个服务或者UserFriendList这个服务struct ServiceInfo{google::protobuf::Service *m_service;//保存服务对象,因为服务方法需要使用服务对象来调用std::unordered_map<std::string,const google::protobuf::MethodDescriptor*> m_methodMap;//保存服务方法<方法名字,方法描述>};//存储注册成功的服务对象和其服务方法的所有信息<服务名字,服务信息>std::unordered_map<std::string,ServiceInfo> m_serviceMap;//新的socket连接回调void OnConnection(const muduo::net::TcpConnectionPtr&);//已建立连接的用户的读写事件的回调void OnMessage(const muduo::net::TcpConnectionPtr&, muduo::net::Buffer* ,muduo::Timestamp);//Closure的回调操作,用于序列化rpc的响应和网络发送,所以需要Connection参数(有网络)和Message参数(发送的消息)void SendRpcResponse(const muduo::net::TcpConnectionPtr& conn,google::protobuf::Message*);
}; 

代码实现

1、 NotifyService

        到现在我们要清楚的是对于rpc发布端来说,用户发起调用的时候,我们的框架会根据调用的信息(服务的名字方法的名字参数等)来定位到用户调用的是本地的服务和方法,然后去调用它,比如用户需要调用UserService的Login方法,但是我们上面只是重写了这个Login方法,并没有调用它,这个调用操作应该是由框架来完成的,但是哦框架怎么知道要调用谁呢?所以在该类中我们需要一个表来记录服务对象和其发布的所有方法。当用户的请求过来的时候,框架就可以在表中查询对应的服务和方法,接着去调用我们重写好的方法.

在预备知识这一节中,我们在剖析 UserServiceRpcd的时候,提到了 ServiceDescriptor* GetDescriptor()方法和MethodDescriptor* descriptor()方法,它们分别是对服务对象的描述和服务对象方法的描述,所以通过这两个方法,我们就可以拿到服务端想要发布的rpc服务对象和其方法的信息。

因为NotifyService接口是一个可以发布rpc方法的接口,是框架提供给外部使用的,就像是一个站点,将想要发布的rpc服务和方法先放在这个站点上。

且我们已经定义了一个类成员用来表示服务的信息,它是一个结构体,其中保存了服务对象,和一个保存了服务对象名字和对服务对象方法描述的类型。还定义了一个存放注册成功的服务名字和服务信息的键值对容器。

所以在本接口中我们要做的事情就是拿出来想要发布的rpc服务的信息及其方法信息然后注册(填入)到我们定义的类型中。

// 这里是框架提供给外部使用的,可以发布rpc方法的函数接口,不能接收具体业务,而这些业务都继承自google::protobuf::Service
void RpcProvider::NotifyService(google::protobuf::Service *service) // 这里虽然接受的是UserService(),但是我们这是框架所以不能接收具体业务,使用UserService()的基类接收
{ServiceInfo service_info; // 实例化一个服务对象对象类型对象// 获取了服务对象的描述信息const google::protobuf::ServiceDescriptor *pserviceDesc = service->GetDescriptor(); // 现在描述的就是我们注册的UserService对象的信息,包括它的方法啊啥的// 获取服务的名字std::string service_name = pserviceDesc->name();// 获取服务对象方法的数量int methodCnt = pserviceDesc->method_count();// 测试// std::cout << "service_name: " << service_name << std::endl;LOG_INFO("service_name:%s", service_name.c_str());for (int i = 0; i < methodCnt; ++i){// 获取了服务对象指定下标的服务方法的描述,这是抽象描述,因为这是作为框架使用的,不可能直接把service变成Userservice// 把MethodDescriptor变成Login (在protobuf上就是用service和MethodDescriptor分别表示服务对象和其方法)const google::protobuf::MethodDescriptor *pmethodDesc = pserviceDesc->method(i);std::string method_name = pmethodDesc->name();// 接下来就可以给服务对象类型对象中添加东西了service_info.m_methodMap.insert({method_name, pmethodDesc});// 测试// std::cout << "method_name: " << method_name << std::endl;LOG_INFO("method_name:%s", method_name.c_str());}service_info.m_service = service;m_serviceMap.insert({service_name, service_info});
}
2、Run()

在这个方法中主要是启动rpc服务节点,利用muduo库提供rpc的远程网络服务调用,还要实现两个回调方法

void RpcProvider::Run()
{std::string ip = MprpcApplication::GetInstance().GetConfig().Load("rpcserverip");uint16_t port = atoi(MprpcApplication::GetInstance().GetConfig().Load("rpcserverport").c_str());muduo::net::InetAddress address(ip, port); // 将参数传入// 接下来创建tcpserver对象muduo::net::TcpServer server(&m_eventLoop, address, "RpcProvider");// 绑定连接回调和消息读写回调方法,muduo库的好处就是让网络模块和业务模块分离开来,// 我们只需要关注业务(有没有新用户连接和已连接用户的读写事件),网络的部分直接交给muduo库就可以server.setConnectionCallback(std::bind(&RpcProvider::OnConnection, this, std::placeholders::_1)); // 这里就可以调用OnConnection了,但是它是需要绑定当前的RpcProvider对象才能调用,就是需要实例化出一个RpcProvider对象才能调,所以// 这里我们使用绑定器,即将RpcProvider::OnConnection跟当前对象(this)绑定一下// OnConnection还有一个参数,所以我们需要预留一个参数的位置server.setMessageCallback(std::bind(&RpcProvider::OnMessage, this, std::placeholders::_1,std::placeholders::_2, std::placeholders::_3));// 设置muduo库的线程数量server.setThreadNum(4); // 一个线程是I/O线程,另外三个是工作线程,epoll+多线程(基于reactor模型的一个服务器)// std::cout << "RpcProvider start service at ip:" << ip << " port:" << port << std::endl;LOG_INFO("RpcProvider start service at ip:%s,port:%d", ip.c_str(), port);// 启动网络服务server.start();m_eventLoop.loop();
}
 3、OnConnection

该接口表示的是新的socket到达的回调接口,因为rpc请求也是一个和http请求一样的短链接请求,请求完,服务端响应了之后就可以主动关闭连接了。

void RpcProvider::OnConnection(const muduo::net::TcpConnectionPtr &conn)
{if (!conn->connected()){// 表示和rpc client的连接断开了conn->shutdown();//关闭socket文件描述符}
}
4、OnMessage

接下来实现的是当已连接用户的读写事件发生时侯的回调方法,当远程有一个rpc服务的调用请求,那么我们的 OnMessage 方法就会响应。也就是rpc工作原理图的右边的黄色部分就是当前接口需要做的。

包含请求的反序列化和响应的序列化。 

  • 我们要在这里进行数据的序列化和反序列化,对于请求的数据来说,我们要有service_name、method_name和参数args,所以我们还需要定义 proto的message类型进行数据的序列化和反序列化,相当于在rpc框架内部,RpcProvider和RpcConsumer需要协商好之间通信用的protobuf数据类型,且为了防止粘包问题,我们还需要带上数据头,表示名字长度和参数的大小长度,所以由调用方发过来的数据应该是 header_size +header_str +args_str,其中header_str包含了service_name、method_name、args_size
  • 所以该方法要做的事情就是,先将网络上接收的远程rpc调用的请求的字符流 即请求的服务和方法及其参数拿到,对他们进行解析,然后反序列化,再到框架之前定义的注册rpc服务表中查询有没有对应的方法,如果有则将该service对象传给基类,然后生成rpc方法调用的请求request和response参数,后面就该调用方法了,最终也就调到了我们在 userservice服务中重写的Login方法上了
  • 最后在调用我们重写后的Login方法后,该方法还有最后一个参数 ::google::protobuf::Closure *done ,当时我们说它是回调操作,,也就是rpc提供端响应完之后,调用它来执行响应对象数据的序列化和网络发送,这些都是由框架来完成的,所以我们就在RpcProvider这个类中实现这个回调方法,只需要在 OnMessage方法中绑定到protobuf中提供的newcallback中就可以了,最后也可以一起传给我们重写的Login方法。
void RpcProvider::OnMessage(const muduo::net::TcpConnectionPtr &conn,muduo::net::Buffer *buffer,muduo::Timestamp)
{// 网络上接收的远程rpc调用的请求的字符流 即请求的方法和其参数Login和argsstd::string recv_buf = buffer->retrieveAllAsString(); //***其内容就是网络接收到的原始字节流(二进制数据)本身,然后将二进制数据本身以字符串的形式返回,而不是对这些字节进行任何字符编码后形成的“有意义的字符串”。// 我们不能直接将所读到的內容转成字符串的格式来读取前四个字节来确定数据头的大小,所以我们需要string中的insert和copy这个接口来实现uint32_t header_size = 0;recv_buf.copy((char *)&header_size, 4, 0); // 从recv_buf的第0个字符开始,复制4字节的数据到header_size中// 根据header_size读取数据头的原始字符流,接着反序列化数据,得到rpc请求的详细信息std::string rpc_header_str = recv_buf.substr(4, header_size); // 这段就包含了 service_name method_name args_sizemprpc::RpcHeader rpcHeader;std::string service_name;std::string method_name;uint32_t args_size;if (rpcHeader.ParseFromString(rpc_header_str)){// 数据反序列化成功service_name = rpcHeader.service_name();method_name = rpcHeader.method_name();args_size = rpcHeader.args_size();}else{// 数据反序列化失败std::cout << "rpc_header_str" << rpc_header_str << "parse error!" << std::endl;return;}// 获取rpc方法参数的字符流数据std::string args_str = recv_buf.substr(4 + header_size, args_size);// 打印调试信息std::cout << "==================================================" << std::endl;std::cout << "recv_buf::" << recv_buf << std::endl;std::cout << "header_size::" << header_size << std::endl;std::cout << "rpc_header_str::" << rpc_header_str << std::endl;std::cout << "service_name::" << service_name << std::endl;std::cout << "method_name::" << method_name << std::endl;std::cout << "args_size::" << args_size << std::endl;std::cout << "args_str::" << args_str << std::endl;for (unsigned char c : args_str){printf("%02X ", c);}printf("\n");std::cout << "==================================================" << std::endl;// 接下来获取service对象和method对象// ServiceInfo sinfo=m_serviceMap,这里不能使用中括号不然找不到的话,对于容器来说会有副作用(插入新的)auto it = m_serviceMap.find(service_name);if (it == m_serviceMap.end()){// 判断请求的对象不在我本地std::cout << service_name << "is not exist" << std::endl;return;}// 判断请求的方法在不在我本地auto mit = it->second.m_methodMap.find(method_name);if (mit == it->second.m_methodMap.end()){// 表示方法不在std::cout << service_name << ":" << method_name << "is not exist" << std::endl;return;}// 此时就知道service了,然后将要调用的这个service对象传给基类指针准备调用(例如要调用userservice,后面就要调用我们重写的Login了)google::protobuf::Service *service = it->second.m_service;      // 对应的就是new出来的Userservice对象,整个模块中不止这一个对象还有比如获取列表好友的GetFriendList对象const google::protobuf::MethodDescriptor *method = mit->second; // 对应Login方法// 生成rpc方法调用的请求request和response参数// 相当于从抽象层面上获取了我们想请求对象的请求方法的一个请求类型和响应类型google::protobuf::Message *request = service->GetRequestPrototype(method).New();if (!request->ParseFromString(args_str))//拿到远端传过来的参数并且反序列化{// std::cout << "request parse error, content:" << args_str << std::endl;LOG_ERR("request parse error, content:%s =>%s:%s:%d", args_str.c_str(), __FILE__, __FUNCTION__, __LINE__);return;}google::protobuf::Message *response = service->GetResponsePrototype(method).New();// 给下面的method方法的调用,绑定一个Closure的回调函数,protobuf中提供了我们需要的绑定函数newcallback(生成一个新的回调)// 即产生了一个对象,它是通过newback回调的google::protobuf::Closure *done = google::protobuf::NewCallback<RpcProvider,const muduo::net::TcpConnectionPtr &,google::protobuf::Message *>(this, &RpcProvider::SendRpcResponse, conn, response);// 在框架上根据远端的rpc请求,调用当前rpc节点上发布的方法// 下面的就相当于是new UserService().Login(controller,request,response,done),但是这是框架调的不能用具体的对象//也就是最后框架调用的Login方法了service->CallMethod(method, nullptr, request, response, done);//根据参数就是调什么方法,nullptr所在的参数暂时不用关心,传入request、response,done是一个自己定义的回调函数//我们所希望的回调函数就是最后将响应发送回去
}
5、 SendRpcResponse

Closure的回调操作,用于序列化rpc的响应和网络发送,所以需要Connection参数(有网络)和Message参数(发送的消息)

void RpcProvider::SendRpcResponse(const muduo::net::TcpConnectionPtr &conn, google::protobuf::Message *response)
{// 这里先对回应的消息进行序列化,再通过网络进行发送给rpc的调用方std::string response_str;if (response->SerializeToString(&response_str)){conn->send(response_str);}else{std::cout << "serialize response_str error!" << std::endl; }conn->shutdown(); // 模拟http的短链接服务,由rpcprovider主动断开连接
}

四、总结

上面的全部就是我们mprpc服务端的全部代码了,走到这里,我们应该对mprpc在服务端的这部分有了很深的了解了吧,最后我再做一个总结,从大局上说一下当用户发起rpc调用的时候,在服务端整个过程是什么样子的。

  1. 首先,对于服务端来说,用户想要请求本端的某种服务下的某种方法的时候,这个服务及方法应该先由服务端发布成rpc方法。服务端先定义 .proto文件
    1. 服务端先定义 user.proto文件,在 .proto文件中定义方法的请求与响应的message类型,最后使用service关键字定义想要发布成rpc服务的 服务信息(包括服务名字和服务方法还有方法的请求与响应)。
    2. 接着假设我们要发布 userservice 服务,我们重写 userservice.cc 业务代码,即我们实现了继承自UserServiceRpc类(该类是由 user.proto文件自动生成的且继承自Service的类)UserService类,并重写了该类下的Login方法,在这个方法中我们可以通过request参数直接拿到由远端传过来的参数,并执行本地业务,执行完后我们将响应填进相应的response参数中,最后执行参数中的回调函数将响应结果发送回去。(在这部分中我们写的是业务代码,即拿到参数、做本地业务、填入回应、然后将结果返回,至于谁调用我们重写的Login方法,结果是如何通过网络发送回去的这都是我们的框架需要做的事)
  2. 然后我们初始化框架,这一步就是根据我们启动时的命令参数解析我们的配置文件中的内容(因为涉及到网络服务,配置文件中就是一些ip和port)
  3. 然后provider类的NotifyService()方法将想要发布成rpc的服务及其方法注册到我们创建的Map表中。
  4. 接着执行provider类的Run()方法,这一步就是启动了rpc的服务节点,也就是启动了网络服务,从第2步中读到的IP和Port启动Muduo库。当发生新的Socket连接时,回调OnConnection()方法,当发生消息读写时,就回调OnMessage()方法。
  5. 所以当第4步的rpc节点启动后就可以接收远端的rpc调用了,在调用前我们还需要调用方和服务方共同协定消息的格式,要考虑到粘包问题,这里还需定义一个消息的 rpcheader.proto文件。
  6. 当远端的调用请求到达时,会回调OnMessage()方法,该方法里先根据第5步共同协定的格式,从发送过来的数据中解析出调用方想要调用的服务及方法的名字还有参数,并进行反序列化,然后在我们第3步所构建的服务方法注册表中查找是否存在。
  7. 接着生成rpc方法调用的请求request和response参数,相当于从抽象层面上获取了我们想请求对象的请求方法的一个请求类型和响应类型,大白话就是获取了一个我们想要调用的服务对象的一个request请求类型和response响应类型,接着将发送过来的已经反序列化好的参数填进这个request中。
  8. 下来就是回调方法了,这一步就是前边第一步的第二小步里面回调我们重写的Login方法(在重写的这个方法中拿参数到执行本地业务再到返回一气呵成)。这就是框架要干的工作。
  9. 最后我们重写的Login方法中还有最后一个参数表示填完response回应消息之后将回应消息通过网络发送回去,这也是框架的工作,所以也在这部分完成,在SendRpcResponse()方法中实现。将这个方法作为哦第8步的参数一块返回给我们重写的Login()方法,这样就可以在Login()方法的最后一步调用该方法将response消息通过网络发送回去。


上面就是mprpc框架的发布方的大致内容了。感谢阅读!

相关文章:

C++实现分布式网络通信框架RPC(2)——rpc发布端

有了上篇文章的项目的基本知识的了解&#xff0c;现在我们就开始构建项目。 目录 一、构建工程目录 二、本地服务发布成RPC服务 2.1理解RPC发布 2.2实现 三、Mprpc框架的基础类设计 3.1框架的初始化类 MprpcApplication 代码实现 3.2读取配置文件类 MprpcConfig 代码实现…...

基于鸿蒙(HarmonyOS5)的打车小程序

1. 开发环境准备 安装DevEco Studio (鸿蒙官方IDE)配置HarmonyOS SDK申请开发者账号和必要的API密钥 2. 项目结构设计 ├── entry │ ├── src │ │ ├── main │ │ │ ├── ets │ │ │ │ ├── pages │ │ │ │ │ ├── H…...

消防一体化安全管控平台:构建消防“一张图”和APP统一管理

在城市的某个角落&#xff0c;一场突如其来的火灾打破了平静。熊熊烈火迅速蔓延&#xff0c;滚滚浓烟弥漫开来&#xff0c;周围群众的生命财产安全受到严重威胁。就在这千钧一发之际&#xff0c;消防救援队伍迅速行动&#xff0c;而豪越科技消防一体化安全管控平台构建的消防“…...

VisualXML全新升级 | 新增数据库编辑功能

VisualXML是一个功能强大的网络总线设计工具&#xff0c;专注于简化汽车电子系统中复杂的网络数据设计操作。它支持多种主流总线网络格式的数据编辑&#xff08;如DBC、LDF、ARXML、HEX等&#xff09;&#xff0c;并能够基于Excel表格的方式生成和转换多种数据库文件。由此&…...

Sklearn 机器学习 缺失值处理 获取填充失值的统计值

💖亲爱的技术爱好者们,热烈欢迎来到 Kant2048 的博客!我是 Thomas Kant,很开心能在CSDN上与你们相遇~💖 本博客的精华专栏: 【自动化测试】 【测试经验】 【人工智能】 【Python】 使用 Scikit-learn 处理缺失值并提取填充统计信息的完整指南 在机器学习项目中,数据清…...

FFmpeg avformat_open_input函数分析

函数内部的总体流程如下&#xff1a; avformat_open_input 精简后的代码如下&#xff1a; int avformat_open_input(AVFormatContext **ps, const char *filename,ff_const59 AVInputFormat *fmt, AVDictionary **options) {AVFormatContext *s *ps;int i, ret 0;AVDictio…...

如何配置一个sql server使得其它用户可以通过excel odbc获取数据

要让其他用户通过 Excel 使用 ODBC 连接到 SQL Server 获取数据&#xff0c;你需要完成以下配置步骤&#xff1a; ✅ 一、在 SQL Server 端配置&#xff08;服务器设置&#xff09; 1. 启用 TCP/IP 协议 打开 “SQL Server 配置管理器”。导航到&#xff1a;SQL Server 网络配…...

【无标题】湖北理元理律师事务所:债务优化中的生活保障与法律平衡之道

文/法律实务观察组 在债务重组领域&#xff0c;专业机构的核心价值不仅在于减轻债务数字&#xff0c;更在于帮助债务人在履行义务的同时维持基本生活尊严。湖北理元理律师事务所的服务实践表明&#xff0c;合法债务优化需同步实现三重平衡&#xff1a; 法律刚性&#xff08;债…...

嵌入式学习之系统编程(九)OSI模型、TCP/IP模型、UDP协议网络相关编程(6.3)

目录 一、网络编程--OSI模型 二、网络编程--TCP/IP模型 三、网络接口 四、UDP网络相关编程及主要函数 ​编辑​编辑 UDP的特征 socke函数 bind函数 recvfrom函数&#xff08;接收函数&#xff09; sendto函数&#xff08;发送函数&#xff09; 五、网络编程之 UDP 用…...

从物理机到云原生:全面解析计算虚拟化技术的演进与应用

前言&#xff1a;我的虚拟化技术探索之旅 我最早接触"虚拟机"的概念是从Java开始的——JVM&#xff08;Java Virtual Machine&#xff09;让"一次编写&#xff0c;到处运行"成为可能。这个软件层面的虚拟化让我着迷&#xff0c;但直到后来接触VMware和Doc…...

五子棋测试用例

一.项目背景 1.1 项目简介 传统棋类文化的推广 五子棋是一种古老的棋类游戏&#xff0c;有着深厚的文化底蕴。通过将五子棋制作成网页游戏&#xff0c;可以让更多的人了解和接触到这一传统棋类文化。无论是国内还是国外的玩家&#xff0c;都可以通过网页五子棋感受到东方棋类…...

GraphQL 实战篇:Apollo Client 配置与缓存

GraphQL 实战篇&#xff1a;Apollo Client 配置与缓存 上一篇&#xff1a;GraphQL 入门篇&#xff1a;基础查询语法 依旧和上一篇的笔记一样&#xff0c;主实操&#xff0c;没啥过多的细节讲解&#xff0c;代码具体在&#xff1a; https://github.com/GoldenaArcher/graphql…...

微服务通信安全:深入解析mTLS的原理与实践

&#x1f525;「炎码工坊」技术弹药已装填&#xff01; 点击关注 → 解锁工业级干货【工具实测|项目避坑|源码燃烧指南】 一、引言&#xff1a;微服务时代的通信安全挑战 随着云原生和微服务架构的普及&#xff0c;服务间的通信安全成为系统设计的核心议题。传统的单体架构中&…...

【Post-process】【VBA】ETABS VBA FrameObj.GetNameList and write to EXCEL

ETABS API实战:导出框架元素数据到Excel 在结构工程师的日常工作中,经常需要从ETABS模型中提取框架元素信息进行后续分析。手动复制粘贴不仅耗时,还容易出错。今天我们来用简单的VBA代码实现自动化导出。 🎯 我们要实现什么? 一键点击,就能将ETABS中所有框架元素的基…...

Python实现简单音频数据压缩与解压算法

Python实现简单音频数据压缩与解压算法 引言 在音频数据处理中&#xff0c;压缩算法是降低存储成本和传输效率的关键技术。Python作为一门灵活且功能强大的编程语言&#xff0c;提供了丰富的库和工具来实现音频数据的压缩与解压。本文将通过一个简单的音频数据压缩与解压算法…...

Axure 下拉框联动

实现选省、选完省之后选对应省份下的市区...

【UE5 C++】通过文件对话框获取选择文件的路径

目录 效果 步骤 源码 效果 步骤 1. 在“xxx.Build.cs”中添加需要使用的模块 &#xff0c;这里主要使用“DesktopPlatform”模块 2. 添加后闭UE编辑器&#xff0c;右键点击 .uproject 文件&#xff0c;选择 "Generate Visual Studio project files"&#xff0c;重…...

全面解析数据库:从基础概念到前沿应用​

在数字化时代&#xff0c;数据已成为企业和社会发展的核心资产&#xff0c;而数据库作为存储、管理和处理数据的关键工具&#xff0c;在各个领域发挥着举足轻重的作用。从电商平台的商品信息管理&#xff0c;到社交网络的用户数据存储&#xff0c;再到金融行业的交易记录处理&a…...

热烈祝贺埃文科技正式加入可信数据空间发展联盟

2025年4月29日&#xff0c;在福州举办的第八届数字中国建设峰会“可信数据空间分论坛”上&#xff0c;可信数据空间发展联盟正式宣告成立。国家数据局党组书记、局长刘烈宏出席并致辞&#xff0c;强调该联盟是推进全国一体化数据市场建设的关键抓手。 郑州埃文科技有限公司&am…...

rknn toolkit2搭建和推理

安装Miniconda Miniconda - Anaconda Miniconda 选择一个 新的 版本 &#xff0c;不用和RKNN的python版本保持一致 使用 ./xxx.sh进行安装 下面配置一下载源 # 清华大学源&#xff08;最常用&#xff09; conda config --add channels https://mirrors.tuna.tsinghua.edu.cn…...

ZYNQ学习记录FPGA(一)ZYNQ简介

一、知识准备 1.一些术语,缩写和概念&#xff1a; 1&#xff09;ZYNQ全称&#xff1a;ZYNQ7000 All Pgrammable SoC 2&#xff09;SoC:system on chips(片上系统)&#xff0c;对比集成电路的SoB&#xff08;system on board&#xff09; 3&#xff09;ARM&#xff1a;处理器…...

ubuntu22.04 安装docker 和docker-compose

首先你要确保没有docker环境或者使用命令删掉docker sudo apt-get remove docker docker-engine docker.io containerd runc安装docker 更新软件环境 sudo apt update sudo apt upgrade下载docker依赖和GPG 密钥 # 依赖 apt-get install ca-certificates curl gnupg lsb-rel…...

DiscuzX3.5发帖json api

参考文章&#xff1a;PHP实现独立Discuz站外发帖(直连操作数据库)_discuz 发帖api-CSDN博客 简单改造了一下&#xff0c;适配我自己的需求 有一个站点存在多个采集站&#xff0c;我想通过主站拿标题&#xff0c;采集站拿内容 使用到的sql如下 CREATE TABLE pre_forum_post_…...

第一篇:Liunx环境下搭建PaddlePaddle 3.0基础环境(Liunx Centos8.5安装Python3.10+pip3.10)

第一篇&#xff1a;Liunx环境下搭建PaddlePaddle 3.0基础环境&#xff08;Liunx Centos8.5安装Python3.10pip3.10&#xff09; 一&#xff1a;前言二&#xff1a;安装编译依赖二&#xff1a;安装Python3.10三&#xff1a;安装PIP3.10四&#xff1a;安装Paddlepaddle基础框架4.1…...

自然语言处理——文本分类

文本分类 传统机器学习方法文本表示向量空间模型 特征选择文档频率互信息信息增益&#xff08;IG&#xff09; 分类器设计贝叶斯理论&#xff1a;线性判别函数 文本分类性能评估P-R曲线ROC曲线 将文本文档或句子分类为预定义的类或类别&#xff0c; 有单标签多类别文本分类和多…...

高考志愿填报管理系统---开发介绍

高考志愿填报管理系统是一款专为教育机构、学校和教师设计的学生信息管理和志愿填报辅助平台。系统基于Django框架开发&#xff0c;采用现代化的Web技术&#xff0c;为教育工作者提供高效、安全、便捷的学生管理解决方案。 ## &#x1f4cb; 系统概述 ### &#x1f3af; 系统定…...

使用SSE解决获取状态不一致问题

使用SSE解决获取状态不一致问题 1. 问题描述2. SSE介绍2.1 SSE 的工作原理2.2 SSE 的事件格式规范2.3 SSE与其他技术对比2.4 SSE 的优缺点 3. 实战代码 1. 问题描述 目前做的一个功能是上传多个文件&#xff0c;这个上传文件是整体功能的一部分&#xff0c;文件在上传的过程中…...

【安全篇】金刚不坏之身:整合 Spring Security + JWT 实现无状态认证与授权

摘要 本文是《Spring Boot 实战派》系列的第四篇。我们将直面所有 Web 应用都无法回避的核心问题&#xff1a;安全。文章将详细阐述认证&#xff08;Authentication) 与授权&#xff08;Authorization的核心概念&#xff0c;对比传统 Session-Cookie 与现代 JWT&#xff08;JS…...

软件工程 期末复习

瀑布模型&#xff1a;计划 螺旋模型&#xff1a;风险低 原型模型: 用户反馈 喷泉模型:代码复用 高内聚 低耦合&#xff1a;模块内部功能紧密 模块之间依赖程度小 高内聚&#xff1a;指的是一个模块内部的功能应该紧密相关。换句话说&#xff0c;一个模块应当只实现单一的功能…...

spring Security对RBAC及其ABAC的支持使用

RBAC (基于角色的访问控制) RBAC (Role-Based Access Control) 是 Spring Security 中最常用的权限模型&#xff0c;它将权限分配给角色&#xff0c;再将角色分配给用户。 RBAC 核心实现 1. 数据库设计 users roles permissions ------- ------…...