brpc之baidu_protocol
简介
是brpc默认使用的协议
初始化
Protocol baidu_protocol = { ParseRpcMessage,SerializeRequestDefault, PackRpcRequest,ProcessRpcRequest, ProcessRpcResponse,VerifyRpcRequest, NULL, NULL,CONNECTION_TYPE_ALL, "baidu_std" };
协议定义
定义在baidu_rpc_meta.proto文件中
message RpcMeta {optional RpcRequestMeta request = 1;optional RpcResponseMeta response = 2;optional int32 compress_type = 3;optional int64 correlation_id = 4;optional int32 attachment_size = 5;optional ChunkInfo chunk_info = 6;optional bytes authentication_data = 7;optional StreamSettings stream_settings = 8;map<string, string> user_fields = 9;
}message RpcRequestMeta {required string service_name = 1;required string method_name = 2;optional int64 log_id = 3;optional int64 trace_id = 4;optional int64 span_id = 5;optional int64 parent_span_id = 6;optional string request_id = 7; // correspond to x-request-id in http headeroptional int32 timeout_ms = 8; // client's timeout setting for current call
}message RpcResponseMeta {optional int32 error_code = 1;optional string error_text = 2;
}
处理
ProcessRpcRequest是服务端收到一个完整包后的处理,其依赖ServerPrivateAccessor(获取Server对应的服务名或者方法名)和ControllerPrivateAccessor
void ProcessRpcRequest(InputMessageBase* msg_base) {const int64_t start_parse_us = butil::cpuwide_time_us();DestroyingPtr<MostCommonMessage> msg(static_cast<MostCommonMessage*>(msg_base));SocketUniquePtr socket_guard(msg->ReleaseSocket());Socket* socket = socket_guard.get();const Server* server = static_cast<const Server*>(msg_base->arg());ScopedNonServiceError non_service_error(server);RpcMeta meta;if (!ParsePbFromIOBuf(&meta, msg->meta)) {LOG(WARNING) << "Fail to parse RpcMeta from " << *socket;socket->SetFailed(EREQUEST, "Fail to parse RpcMeta from %s",socket->description().c_str());return;}const RpcRequestMeta &request_meta = meta.request();SampledRequest* sample = AskToBeSampled();if (sample) {sample->meta.set_service_name(request_meta.service_name());sample->meta.set_method_name(request_meta.method_name());sample->meta.set_compress_type((CompressType)meta.compress_type());sample->meta.set_protocol_type(PROTOCOL_BAIDU_STD);sample->meta.set_attachment_size(meta.attachment_size());sample->meta.set_authentication_data(meta.authentication_data());sample->request = msg->payload;sample->submit(start_parse_us);}std::unique_ptr<Controller> cntl(new (std::nothrow) Controller);if (NULL == cntl.get()) {LOG(WARNING) << "Fail to new Controller";return;}std::unique_ptr<google::protobuf::Message> req;std::unique_ptr<google::protobuf::Message> res;ServerPrivateAccessor server_accessor(server);ControllerPrivateAccessor accessor(cntl.get());const bool security_mode = server->options().security_mode() &&socket->user() == server_accessor.acceptor();if (request_meta.has_log_id()) {cntl->set_log_id(request_meta.log_id());}if (request_meta.has_request_id()) {cntl->set_request_id(request_meta.request_id());}if (request_meta.has_timeout_ms()) {cntl->set_timeout_ms(request_meta.timeout_ms());}cntl->set_request_compress_type((CompressType)meta.compress_type());accessor.set_server(server).set_security_mode(security_mode).set_peer_id(socket->id()).set_remote_side(socket->remote_side()).set_local_side(socket->local_side()).set_auth_context(socket->auth_context()).set_request_protocol(PROTOCOL_BAIDU_STD).set_begin_time_us(msg->received_us()).move_in_server_receiving_sock(socket_guard);if (meta.has_stream_settings()) {accessor.set_remote_stream_settings(meta.release_stream_settings());}if (!meta.user_fields().empty()) {for (const auto& it : meta.user_fields()) {(*cntl->request_user_fields())[it.first] = it.second;}}// Tag the bthread with this server's key for thread_local_data().if (server->thread_local_options().thread_local_data_factory) {bthread_assign_data((void*)&server->thread_local_options());}Span* span = NULL;if (IsTraceable(request_meta.has_trace_id())) {span = Span::CreateServerSpan(request_meta.trace_id(), request_meta.span_id(),request_meta.parent_span_id(), msg->base_real_us());accessor.set_span(span);span->set_log_id(request_meta.log_id());span->set_remote_side(cntl->remote_side());span->set_protocol(PROTOCOL_BAIDU_STD);span->set_received_us(msg->received_us());span->set_start_parse_us(start_parse_us);span->set_request_size(msg->payload.size() + msg->meta.size() + 12);}MethodStatus* method_status = NULL;do {if (!server->IsRunning()) {cntl->SetFailed(ELOGOFF, "Server is stopping");break;}if (socket->is_overcrowded()) {cntl->SetFailed(EOVERCROWDED, "Connection to %s is overcrowded",butil::endpoint2str(socket->remote_side()).c_str());break;}if (!server_accessor.AddConcurrency(cntl.get())) {cntl->SetFailed(ELIMIT, "Reached server's max_concurrency=%d",server->options().max_concurrency);break;}if (FLAGS_usercode_in_pthread && TooManyUserCode()) {cntl->SetFailed(ELIMIT, "Too many user code to run when"" -usercode_in_pthread is on");break;}// NOTE(gejun): jprotobuf sends service names without packages. So the// name should be changed to full when it's not.butil::StringPiece svc_name(request_meta.service_name());if (svc_name.find('.') == butil::StringPiece::npos) {const Server::ServiceProperty* sp =server_accessor.FindServicePropertyByName(svc_name);if (NULL == sp) {cntl->SetFailed(ENOSERVICE, "Fail to find service=%s",request_meta.service_name().c_str());break;}svc_name = sp->service->GetDescriptor()->full_name();}const Server::MethodProperty* mp =server_accessor.FindMethodPropertyByFullName(svc_name, request_meta.method_name());if (NULL == mp) {cntl->SetFailed(ENOMETHOD, "Fail to find method=%s/%s",request_meta.service_name().c_str(),request_meta.method_name().c_str());break;} else if (mp->service->GetDescriptor()== BadMethodService::descriptor()) {BadMethodRequest breq;BadMethodResponse bres;breq.set_service_name(request_meta.service_name());mp->service->CallMethod(mp->method, cntl.get(), &breq, &bres, NULL);break;}// Switch to service-specific error.non_service_error.release();method_status = mp->status;if (method_status) {int rejected_cc = 0;if (!method_status->OnRequested(&rejected_cc, cntl.get())) {cntl->SetFailed(ELIMIT, "Rejected by %s's ConcurrencyLimiter, concurrency=%d",mp->method->full_name().c_str(), rejected_cc);break;}}google::protobuf::Service* svc = mp->service;const google::protobuf::MethodDescriptor* method = mp->method;accessor.set_method(method);if (!server->AcceptRequest(cntl.get())) {break;}if (span) {span->ResetServerSpanName(method->full_name());}const int req_size = static_cast<int>(msg->payload.size());butil::IOBuf req_buf;butil::IOBuf* req_buf_ptr = &msg->payload;if (meta.has_attachment_size()) {if (req_size < meta.attachment_size()) {cntl->SetFailed(EREQUEST,"attachment_size=%d is larger than request_size=%d",meta.attachment_size(), req_size);break;}int body_without_attachment_size = req_size - meta.attachment_size();msg->payload.cutn(&req_buf, body_without_attachment_size);req_buf_ptr = &req_buf;cntl->request_attachment().swap(msg->payload);}CompressType req_cmp_type = (CompressType)meta.compress_type();req.reset(svc->GetRequestPrototype(method).New());if (!ParseFromCompressedData(*req_buf_ptr, req.get(), req_cmp_type)) {cntl->SetFailed(EREQUEST, "Fail to parse request message, ""CompressType=%s, request_size=%d", CompressTypeToCStr(req_cmp_type), req_size);break;}res.reset(svc->GetResponsePrototype(method).New());// `socket' will be held until response has been sentgoogle::protobuf::Closure* done = ::brpc::NewCallback<int64_t, Controller*, const google::protobuf::Message*,const google::protobuf::Message*, const Server*,MethodStatus*, int64_t>(&SendRpcResponse, meta.correlation_id(), cntl.get(), req.get(), res.get(), server,method_status, msg->received_us());// optional, just release resource ASAPmsg.reset();req_buf.clear();if (span) {span->set_start_callback_us(butil::cpuwide_time_us());span->AsParent();}if (!FLAGS_usercode_in_pthread) {return svc->CallMethod(method, cntl.release(), req.release(), res.release(), done);}if (BeginRunningUserCode()) {svc->CallMethod(method, cntl.release(), req.release(), res.release(), done);return EndRunningUserCodeInPlace();} else {return EndRunningCallMethodInPool(svc, method, cntl.release(),req.release(), res.release(), done);}} while (false);// `cntl', `req' and `res' will be deleted inside `SendRpcResponse'// `socket' will be held until response has been sentSendRpcResponse(meta.correlation_id(), cntl.release(), req.release(), res.release(), server,method_status, msg->received_us());
}
对于开启了usercode_in_pthread,如果计数超过限制,会放到UserCodeBackupPool线程池中来执行
int UserCodeBackupPool::Init() {// Like bthread workers, these threads never quit (to avoid potential hang// during termination of program).for (int i = 0; i < FLAGS_usercode_backup_threads; ++i) {pthread_t th;if (pthread_create(&th, NULL, UserCodeRunner, this) != 0) {LOG(ERROR) << "Fail to create UserCodeRunner";return -1;}}return 0;
}static void* UserCodeRunner(void* args) {butil::PlatformThread::SetName("brpc_user_code_runner");static_cast<UserCodeBackupPool*>(args)->UserCodeRunningLoop();return NULL;
}// Entry of backup thread for running user code.
void UserCodeBackupPool::UserCodeRunningLoop() {bthread::run_worker_startfn();
#ifdef BAIDU_INTERNALlogging::ComlogInitializer comlog_initializer;
#endifint64_t last_time = butil::cpuwide_time_us();while (true) {bool blocked = false;UserCode usercode = { NULL, NULL };{BAIDU_SCOPED_LOCK(s_usercode_mutex);while (queue.empty()) {pthread_cond_wait(&s_usercode_cond, &s_usercode_mutex);blocked = true;}usercode = queue.front();queue.pop_front();if (g_too_many_usercode &&(int)queue.size() <= FLAGS_usercode_backup_threads) {g_too_many_usercode = false;}}const int64_t begin_time = (blocked ? butil::cpuwide_time_us() : last_time);usercode.fn(usercode.arg);const int64_t end_time = butil::cpuwide_time_us();inpool_count << 1;inpool_elapse_us << (end_time - begin_time);last_time = end_time;}
}
放入队列逻辑为
void EndRunningCallMethodInPool(::google::protobuf::Service* service,const ::google::protobuf::MethodDescriptor* method,::google::protobuf::RpcController* controller,const ::google::protobuf::Message* request,::google::protobuf::Message* response,::google::protobuf::Closure* done) {CallMethodInBackupThreadArgs* args = new CallMethodInBackupThreadArgs;args->service = service;args->method = method;args->controller = controller;args->request = request;args->response = response;args->done = done;return EndRunningUserCodeInPool(CallMethodInBackupThread, args);
};void EndRunningUserCodeInPool(void (*fn)(void*), void* arg) {InitUserCodeBackupPoolOnceOrDie();g_usercode_inplace.fetch_sub(1, butil::memory_order_relaxed);// Not enough idle workers, run the code in backup threads to prevent// all workers from being blocked and no responses will be processed// anymore (deadlocked).const UserCode usercode = { fn, arg };pthread_mutex_lock(&s_usercode_mutex);s_usercode_pool->queue.push_back(usercode);// If the queue has too many items, we can't drop the user code// directly which often must be run, for example: client-side done.// The solution is that we set a mark which is not cleared before// queue becomes short again. RPC code checks the mark before// submitting tasks that may generate more user code.if ((int)s_usercode_pool->queue.size() >=(FLAGS_usercode_backup_threads *FLAGS_max_pending_in_each_backup_thread)) {g_too_many_usercode = true;}pthread_mutex_unlock(&s_usercode_mutex);pthread_cond_signal(&s_usercode_cond);
}
闭包
在处理协议时,最终提供给Server的闭包为
google::protobuf::Closure* done = ::brpc::NewCallback<int64_t, Controller*, const google::protobuf::Message*,const google::protobuf::Message*, const Server*,MethodStatus*, int64_t>(&SendRpcResponse, meta.correlation_id(), cntl.get(), req.get(), res.get(), server,method_status, msg->received_us());template <typename Arg1, typename Arg2, typename Arg3, typename Arg4, typename Arg5, typename Arg6, typename Arg7>
inline ::google::protobuf::Closure* NewCallback(void (*function)(Arg1, Arg2, Arg3, Arg4, Arg5, Arg6, Arg7),Arg1 arg1, Arg2 arg2, Arg3 arg3, Arg4 arg4, Arg5 arg5, Arg6 arg6, Arg7 arg7) {return new internal::FunctionClosure7<Arg1, Arg2, Arg3, Arg4, Arg5, Arg6, Arg7>(function, true, arg1, arg2, arg3, arg4, arg5, arg6, arg7);
}
FunctionClosure7闭包类关系为
function_:为发送响应函数SendRpcResponse
arg1_:为meta.correlation_id()
arg2_:为cntl.get(),类型为Controller*
arg3_:为req.get(),类型为const google::protobuf::Message*,表示请求消息
arg4_:为res.get(),类型为const google::protobuf::Message*,表示用于加响应的消息
arg5_:为server, 类型为const Server*,表示具体的服务业务
arg6_ :为method_status,类型为MethodStatus*,表示方法状态
arg7_:为msg->received_us(),类型为int64_t,表示消息的接收时间
相关文章:
brpc之baidu_protocol
简介 是brpc默认使用的协议 初始化 Protocol baidu_protocol { ParseRpcMessage,SerializeRequestDefault, PackRpcRequest,ProcessRpcRequest, ProcessRpcResponse,VerifyRpcRequest, NULL, NULL,CONNECTION_TYPE_ALL, "baidu_std" };协议定义 定义在baidu_rpc…...
LeetCode:39. 组合总和
跟着carl学算法,本系列博客仅做个人记录,建议大家都去看carl本人的博客,写的真的很好的! 代码随想录 LeetCode:39. 组合总和 给你一个 无重复元素 的整数数组 candidates 和一个目标整数 target ,找出 cand…...
SOLID原则学习,开闭原则(Open Closed Principle, OCP)
文章目录 1. 定义2. 开闭原则的详细解释3. 实现开闭原则的方法4. 总结 1. 定义 开闭原则(Open-Closed Principle,OCP)是面向对象设计中的五大原则(SOLID)之一,由Bertrand Meyer提出。开闭原则的核心思想是…...
Unreal Engine 5 C++ Advanced Action RPG 七章笔记
第七章 Ranged Enemy 2-Ranged Enemy Starting Weapon 制作新敌人的流程准备 新敌人的武器起始的状态数据自己的战斗能力投射能力自己的行为树 创建角色,添加武器,添加数据,就是继承之前的基类敌人的 运行结果 3-Glacer Starting Stats 看看就行,就是复制曲线表格更改数…...
自动连接校园网wifi脚本实践(自动网页认证)
目录 起因执行步骤分析校园网登录逻辑如何判断当前是否处于未登录状态? 书写代码打包设置开机自动启动 起因 我们一般通过远程控制的方式访问实验室电脑,但是最近实验室老是断电,但重启后也不会自动连接校园网账户认证,远程工具&…...
HTTP/HTTPS ⑤-CA证书 || 中间人攻击 || SSL/TLS
这里是Themberfue ✨上节课我们聊到了对称加密和非对称加密,实际上,单纯地非对称加密并不能保证数据不被窃取,我们还需要一个更加重要的东西——证书 中间人攻击 通过非对称加密生成私钥priKey和公钥pubKey用来加密对称加密生成的密钥&…...
traceroute原理探究
文章中有截图,看不清的话,可以把浏览器显示比例放大到200%后观看。 linux下traceroute的原理 本文通过抓包观察一下linux下traceroute的原理 环境:一台嵌入式linux设备,内网ip是192.168.186.195,其上有192.168.202.…...
50_Lua垃圾回收
1.Lua垃圾回收机制概述 Lua采用了一种自动内存管理机制,称为垃圾回收(Garbage Collection, GC)。垃圾回收的主要目的是回收程序中不再被使用的内存,从而避免内存泄漏。Lua的垃圾回收器负责回收动态分配的对象,如函数、用户数据、表、字符串、线程、内部结构等。Lua的垃圾…...
Git-2-:Cherry-Pick 的使用场景及使用流程
前面我们说了 Git合并、解决冲突、强行回退等解决方案 >> 点击查看 这里再说一下 Cherry-Pick功能,Cherry-Pick不是merge,只是把部分功能代码Cherry-Pick到远程的目标分支 git cherry-pick功能简介: git cherry-pick 是用来从一个分…...
【C++】21.map和set的使用
文章目录 1. 序列式容器和关联式容器2. set系列的使用2.1 set和multiset参考文档2.2 set类的介绍2.3 set的构造和迭代器构造函数:双向迭代器迭代器: 2.4 set的增删查2.5 insert和迭代器遍历使用样例:2.6 find和erase使用样例:2.7 …...
burpsiute的基础使用(2)
爆破模块(intruder): csrf请求伪造访问(模拟攻击): 方法一: 通过burp将修改,删除等行为的数据包压缩成一个可访问链接,通过本地浏览器访问(该浏览器用户处于登陆状态&a…...
ElasticSearch 同义词匹配
synonym.txt 电脑, 计算机, 主机 复印纸, 打印纸, A4纸, 纸, A3 平板电脑, Pad DELETE /es_sku_index_20_20250109 PUT /es_sku_index_20_20250109 {"settings": {"index": {"number_of_shards": "5","number_of_replicas&quo…...
linux RT-Preempt spin lock实现
一、spin_lock概述 Spinlock是linux内核中常用的一种互斥锁机制,和mutex不同,当无法持锁进入临界区的时候,当前执行线索不会阻塞,而是不断的自旋等待该锁释放。正因为如此,自旋锁也是可以用在中断上下文的。也正是因为…...
PySpark广播表连接解决数据倾斜的完整案例
使用PySpark解决数据倾斜问题的完整案例,通过广播表连接的方式来优化性能。 准备数据 假设我们有两张表,一张大表 big_table 和一张小表 small_table ,小表将作为广播表。 from pyspark.sql import SparkSession# 初始化SparkSession spar…...
Chromium CDP 开发(十二):为自己的Domain建立custom_config.json
引言 本章详细介绍了如何为自定义的 CDP Domain 创建 custom_config.json 文件,并通过修改 BUILD.gn 文件来确保自定义的配置文件参与编译。我们通过 inspector_protocol_generate 配置段自动生成自定义 Domain 的头文件和实现文件,并成功将其集成到构建…...
【Vue】全局/局部组件使用流程(Vue2为例)
全局组件和局部组件区别 如何使用 全局组件:全局注册后,可以在任意页面中直接使用。局部组件:在页面中需要先导入子组件路径,注册组件才能使用。 适用场景 全局组件:适用于高频使用的组件,如导航栏、业…...
Vue.js组件开发详解
在现代前端开发中,Vue.js 凭借其简洁、高效、灵活的特性,成为了众多开发者的首选框架之一,而组件化开发则是 Vue.js 的核心优势。组件可以将复杂的 UI 界面拆分成一个个独立的、可复用的小块,极大地提高了开发效率和代码的可维护性…...
解决:ubuntu22.04中IsaacGymEnv保存视频报错的问题
1. IsaacGymEnvs项目介绍 IsaacGymEnvs:基于NVIDIA Isaac Gym的高效机器人训练环境 IsaacGymEnvs 是一个基于 NVIDIA Isaac Gym 的开源 Python 环境库,专为机器人训练提供高效的仿真环境。Isaac Gym 是由 NVIDIA 开发的一个高性能物理仿真引擎…...
深度学习camp-第J7周:对于ResNeXt-50算法的思考
🍨 本文为🔗365天深度学习训练营 中的学习记录博客🍖 原作者:K同学啊 📌你需要解决的疑问:这个代码是否有错?对错与否都请给出你的思考 📌打卡要求:请查找相关资料、逐步…...
java: 错误: 无效的源发行版:17解决办法
遇到“java: 错误: 无效的源发行版:17”的问题,通常是因为项目设置中指定的Java版本与当前环境不一致导致的。以下是几种可能的解决方案: 检查并升级Java版本:确保你已经安装了支持Java 17的JDK版本。你可以通过命令行输入java -v…...
业务系统对接大模型的基础方案:架构设计与关键步骤
业务系统对接大模型:架构设计与关键步骤 在当今数字化转型的浪潮中,大语言模型(LLM)已成为企业提升业务效率和创新能力的关键技术之一。将大模型集成到业务系统中,不仅可以优化用户体验,还能为业务决策提供…...
docker详细操作--未完待续
docker介绍 docker官网: Docker:加速容器应用程序开发 harbor官网:Harbor - Harbor 中文 使用docker加速器: Docker镜像极速下载服务 - 毫秒镜像 是什么 Docker 是一种开源的容器化平台,用于将应用程序及其依赖项(如库、运行时环…...
循环冗余码校验CRC码 算法步骤+详细实例计算
通信过程:(白话解释) 我们将原始待发送的消息称为 M M M,依据发送接收消息双方约定的生成多项式 G ( x ) G(x) G(x)(意思就是 G ( x ) G(x) G(x) 是已知的)࿰…...
centos 7 部署awstats 网站访问检测
一、基础环境准备(两种安装方式都要做) bash # 安装必要依赖 yum install -y httpd perl mod_perl perl-Time-HiRes perl-DateTime systemctl enable httpd # 设置 Apache 开机自启 systemctl start httpd # 启动 Apache二、安装 AWStats࿰…...
测试markdown--肇兴
day1: 1、去程:7:04 --11:32高铁 高铁右转上售票大厅2楼,穿过候车厅下一楼,上大巴车 ¥10/人 **2、到达:**12点多到达寨子,买门票,美团/抖音:¥78人 3、中饭&a…...
Qt Http Server模块功能及架构
Qt Http Server 是 Qt 6.0 中引入的一个新模块,它提供了一个轻量级的 HTTP 服务器实现,主要用于构建基于 HTTP 的应用程序和服务。 功能介绍: 主要功能 HTTP服务器功能: 支持 HTTP/1.1 协议 简单的请求/响应处理模型 支持 GET…...
鱼香ros docker配置镜像报错:https://registry-1.docker.io/v2/
使用鱼香ros一件安装docker时的https://registry-1.docker.io/v2/问题 一键安装指令 wget http://fishros.com/install -O fishros && . fishros出现问题:docker pull 失败 网络不同,需要使用镜像源 按照如下步骤操作 sudo vi /etc/docker/dae…...
QT: `long long` 类型转换为 `QString` 2025.6.5
在 Qt 中,将 long long 类型转换为 QString 可以通过以下两种常用方法实现: 方法 1:使用 QString::number() 直接调用 QString 的静态方法 number(),将数值转换为字符串: long long value 1234567890123456789LL; …...
算法:模拟
1.替换所有的问号 1576. 替换所有的问号 - 力扣(LeetCode) 遍历字符串:通过外层循环逐一检查每个字符。遇到 ? 时处理: 内层循环遍历小写字母(a 到 z)。对每个字母检查是否满足: 与…...
Linux 中如何提取压缩文件 ?
Linux 是一种流行的开源操作系统,它提供了许多工具来管理、压缩和解压缩文件。压缩文件有助于节省存储空间,使数据传输更快。本指南将向您展示如何在 Linux 中提取不同类型的压缩文件。 1. Unpacking ZIP Files ZIP 文件是非常常见的,要在 …...
