brpc之baidu_protocol
简介
是brpc默认使用的协议
初始化
Protocol baidu_protocol = { ParseRpcMessage,SerializeRequestDefault, PackRpcRequest,ProcessRpcRequest, ProcessRpcResponse,VerifyRpcRequest, NULL, NULL,CONNECTION_TYPE_ALL, "baidu_std" };
协议定义
定义在baidu_rpc_meta.proto文件中
message RpcMeta {optional RpcRequestMeta request = 1;optional RpcResponseMeta response = 2;optional int32 compress_type = 3;optional int64 correlation_id = 4;optional int32 attachment_size = 5;optional ChunkInfo chunk_info = 6;optional bytes authentication_data = 7;optional StreamSettings stream_settings = 8;map<string, string> user_fields = 9;
}message RpcRequestMeta {required string service_name = 1;required string method_name = 2;optional int64 log_id = 3;optional int64 trace_id = 4;optional int64 span_id = 5;optional int64 parent_span_id = 6;optional string request_id = 7; // correspond to x-request-id in http headeroptional int32 timeout_ms = 8; // client's timeout setting for current call
}message RpcResponseMeta {optional int32 error_code = 1;optional string error_text = 2;
}
处理
ProcessRpcRequest
是服务端收到一个完整包后的处理,其依赖ServerPrivateAccessor
(获取Server对应的服务名或者方法名)和ControllerPrivateAccessor
void ProcessRpcRequest(InputMessageBase* msg_base) {const int64_t start_parse_us = butil::cpuwide_time_us();DestroyingPtr<MostCommonMessage> msg(static_cast<MostCommonMessage*>(msg_base));SocketUniquePtr socket_guard(msg->ReleaseSocket());Socket* socket = socket_guard.get();const Server* server = static_cast<const Server*>(msg_base->arg());ScopedNonServiceError non_service_error(server);RpcMeta meta;if (!ParsePbFromIOBuf(&meta, msg->meta)) {LOG(WARNING) << "Fail to parse RpcMeta from " << *socket;socket->SetFailed(EREQUEST, "Fail to parse RpcMeta from %s",socket->description().c_str());return;}const RpcRequestMeta &request_meta = meta.request();SampledRequest* sample = AskToBeSampled();if (sample) {sample->meta.set_service_name(request_meta.service_name());sample->meta.set_method_name(request_meta.method_name());sample->meta.set_compress_type((CompressType)meta.compress_type());sample->meta.set_protocol_type(PROTOCOL_BAIDU_STD);sample->meta.set_attachment_size(meta.attachment_size());sample->meta.set_authentication_data(meta.authentication_data());sample->request = msg->payload;sample->submit(start_parse_us);}std::unique_ptr<Controller> cntl(new (std::nothrow) Controller);if (NULL == cntl.get()) {LOG(WARNING) << "Fail to new Controller";return;}std::unique_ptr<google::protobuf::Message> req;std::unique_ptr<google::protobuf::Message> res;ServerPrivateAccessor server_accessor(server);ControllerPrivateAccessor accessor(cntl.get());const bool security_mode = server->options().security_mode() &&socket->user() == server_accessor.acceptor();if (request_meta.has_log_id()) {cntl->set_log_id(request_meta.log_id());}if (request_meta.has_request_id()) {cntl->set_request_id(request_meta.request_id());}if (request_meta.has_timeout_ms()) {cntl->set_timeout_ms(request_meta.timeout_ms());}cntl->set_request_compress_type((CompressType)meta.compress_type());accessor.set_server(server).set_security_mode(security_mode).set_peer_id(socket->id()).set_remote_side(socket->remote_side()).set_local_side(socket->local_side()).set_auth_context(socket->auth_context()).set_request_protocol(PROTOCOL_BAIDU_STD).set_begin_time_us(msg->received_us()).move_in_server_receiving_sock(socket_guard);if (meta.has_stream_settings()) {accessor.set_remote_stream_settings(meta.release_stream_settings());}if (!meta.user_fields().empty()) {for (const auto& it : meta.user_fields()) {(*cntl->request_user_fields())[it.first] = it.second;}}// Tag the bthread with this server's key for thread_local_data().if (server->thread_local_options().thread_local_data_factory) {bthread_assign_data((void*)&server->thread_local_options());}Span* span = NULL;if (IsTraceable(request_meta.has_trace_id())) {span = Span::CreateServerSpan(request_meta.trace_id(), request_meta.span_id(),request_meta.parent_span_id(), msg->base_real_us());accessor.set_span(span);span->set_log_id(request_meta.log_id());span->set_remote_side(cntl->remote_side());span->set_protocol(PROTOCOL_BAIDU_STD);span->set_received_us(msg->received_us());span->set_start_parse_us(start_parse_us);span->set_request_size(msg->payload.size() + msg->meta.size() + 12);}MethodStatus* method_status = NULL;do {if (!server->IsRunning()) {cntl->SetFailed(ELOGOFF, "Server is stopping");break;}if (socket->is_overcrowded()) {cntl->SetFailed(EOVERCROWDED, "Connection to %s is overcrowded",butil::endpoint2str(socket->remote_side()).c_str());break;}if (!server_accessor.AddConcurrency(cntl.get())) {cntl->SetFailed(ELIMIT, "Reached server's max_concurrency=%d",server->options().max_concurrency);break;}if (FLAGS_usercode_in_pthread && TooManyUserCode()) {cntl->SetFailed(ELIMIT, "Too many user code to run when"" -usercode_in_pthread is on");break;}// NOTE(gejun): jprotobuf sends service names without packages. So the// name should be changed to full when it's not.butil::StringPiece svc_name(request_meta.service_name());if (svc_name.find('.') == butil::StringPiece::npos) {const Server::ServiceProperty* sp =server_accessor.FindServicePropertyByName(svc_name);if (NULL == sp) {cntl->SetFailed(ENOSERVICE, "Fail to find service=%s",request_meta.service_name().c_str());break;}svc_name = sp->service->GetDescriptor()->full_name();}const Server::MethodProperty* mp =server_accessor.FindMethodPropertyByFullName(svc_name, request_meta.method_name());if (NULL == mp) {cntl->SetFailed(ENOMETHOD, "Fail to find method=%s/%s",request_meta.service_name().c_str(),request_meta.method_name().c_str());break;} else if (mp->service->GetDescriptor()== BadMethodService::descriptor()) {BadMethodRequest breq;BadMethodResponse bres;breq.set_service_name(request_meta.service_name());mp->service->CallMethod(mp->method, cntl.get(), &breq, &bres, NULL);break;}// Switch to service-specific error.non_service_error.release();method_status = mp->status;if (method_status) {int rejected_cc = 0;if (!method_status->OnRequested(&rejected_cc, cntl.get())) {cntl->SetFailed(ELIMIT, "Rejected by %s's ConcurrencyLimiter, concurrency=%d",mp->method->full_name().c_str(), rejected_cc);break;}}google::protobuf::Service* svc = mp->service;const google::protobuf::MethodDescriptor* method = mp->method;accessor.set_method(method);if (!server->AcceptRequest(cntl.get())) {break;}if (span) {span->ResetServerSpanName(method->full_name());}const int req_size = static_cast<int>(msg->payload.size());butil::IOBuf req_buf;butil::IOBuf* req_buf_ptr = &msg->payload;if (meta.has_attachment_size()) {if (req_size < meta.attachment_size()) {cntl->SetFailed(EREQUEST,"attachment_size=%d is larger than request_size=%d",meta.attachment_size(), req_size);break;}int body_without_attachment_size = req_size - meta.attachment_size();msg->payload.cutn(&req_buf, body_without_attachment_size);req_buf_ptr = &req_buf;cntl->request_attachment().swap(msg->payload);}CompressType req_cmp_type = (CompressType)meta.compress_type();req.reset(svc->GetRequestPrototype(method).New());if (!ParseFromCompressedData(*req_buf_ptr, req.get(), req_cmp_type)) {cntl->SetFailed(EREQUEST, "Fail to parse request message, ""CompressType=%s, request_size=%d", CompressTypeToCStr(req_cmp_type), req_size);break;}res.reset(svc->GetResponsePrototype(method).New());// `socket' will be held until response has been sentgoogle::protobuf::Closure* done = ::brpc::NewCallback<int64_t, Controller*, const google::protobuf::Message*,const google::protobuf::Message*, const Server*,MethodStatus*, int64_t>(&SendRpcResponse, meta.correlation_id(), cntl.get(), req.get(), res.get(), server,method_status, msg->received_us());// optional, just release resource ASAPmsg.reset();req_buf.clear();if (span) {span->set_start_callback_us(butil::cpuwide_time_us());span->AsParent();}if (!FLAGS_usercode_in_pthread) {return svc->CallMethod(method, cntl.release(), req.release(), res.release(), done);}if (BeginRunningUserCode()) {svc->CallMethod(method, cntl.release(), req.release(), res.release(), done);return EndRunningUserCodeInPlace();} else {return EndRunningCallMethodInPool(svc, method, cntl.release(),req.release(), res.release(), done);}} while (false);// `cntl', `req' and `res' will be deleted inside `SendRpcResponse'// `socket' will be held until response has been sentSendRpcResponse(meta.correlation_id(), cntl.release(), req.release(), res.release(), server,method_status, msg->received_us());
}
对于开启了usercode_in_pthread
,如果计数超过限制,会放到UserCodeBackupPool
线程池中来执行
int UserCodeBackupPool::Init() {// Like bthread workers, these threads never quit (to avoid potential hang// during termination of program).for (int i = 0; i < FLAGS_usercode_backup_threads; ++i) {pthread_t th;if (pthread_create(&th, NULL, UserCodeRunner, this) != 0) {LOG(ERROR) << "Fail to create UserCodeRunner";return -1;}}return 0;
}static void* UserCodeRunner(void* args) {butil::PlatformThread::SetName("brpc_user_code_runner");static_cast<UserCodeBackupPool*>(args)->UserCodeRunningLoop();return NULL;
}// Entry of backup thread for running user code.
void UserCodeBackupPool::UserCodeRunningLoop() {bthread::run_worker_startfn();
#ifdef BAIDU_INTERNALlogging::ComlogInitializer comlog_initializer;
#endifint64_t last_time = butil::cpuwide_time_us();while (true) {bool blocked = false;UserCode usercode = { NULL, NULL };{BAIDU_SCOPED_LOCK(s_usercode_mutex);while (queue.empty()) {pthread_cond_wait(&s_usercode_cond, &s_usercode_mutex);blocked = true;}usercode = queue.front();queue.pop_front();if (g_too_many_usercode &&(int)queue.size() <= FLAGS_usercode_backup_threads) {g_too_many_usercode = false;}}const int64_t begin_time = (blocked ? butil::cpuwide_time_us() : last_time);usercode.fn(usercode.arg);const int64_t end_time = butil::cpuwide_time_us();inpool_count << 1;inpool_elapse_us << (end_time - begin_time);last_time = end_time;}
}
放入队列逻辑为
void EndRunningCallMethodInPool(::google::protobuf::Service* service,const ::google::protobuf::MethodDescriptor* method,::google::protobuf::RpcController* controller,const ::google::protobuf::Message* request,::google::protobuf::Message* response,::google::protobuf::Closure* done) {CallMethodInBackupThreadArgs* args = new CallMethodInBackupThreadArgs;args->service = service;args->method = method;args->controller = controller;args->request = request;args->response = response;args->done = done;return EndRunningUserCodeInPool(CallMethodInBackupThread, args);
};void EndRunningUserCodeInPool(void (*fn)(void*), void* arg) {InitUserCodeBackupPoolOnceOrDie();g_usercode_inplace.fetch_sub(1, butil::memory_order_relaxed);// Not enough idle workers, run the code in backup threads to prevent// all workers from being blocked and no responses will be processed// anymore (deadlocked).const UserCode usercode = { fn, arg };pthread_mutex_lock(&s_usercode_mutex);s_usercode_pool->queue.push_back(usercode);// If the queue has too many items, we can't drop the user code// directly which often must be run, for example: client-side done.// The solution is that we set a mark which is not cleared before// queue becomes short again. RPC code checks the mark before// submitting tasks that may generate more user code.if ((int)s_usercode_pool->queue.size() >=(FLAGS_usercode_backup_threads *FLAGS_max_pending_in_each_backup_thread)) {g_too_many_usercode = true;}pthread_mutex_unlock(&s_usercode_mutex);pthread_cond_signal(&s_usercode_cond);
}
闭包
在处理协议时,最终提供给Server的闭包为
google::protobuf::Closure* done = ::brpc::NewCallback<int64_t, Controller*, const google::protobuf::Message*,const google::protobuf::Message*, const Server*,MethodStatus*, int64_t>(&SendRpcResponse, meta.correlation_id(), cntl.get(), req.get(), res.get(), server,method_status, msg->received_us());template <typename Arg1, typename Arg2, typename Arg3, typename Arg4, typename Arg5, typename Arg6, typename Arg7>
inline ::google::protobuf::Closure* NewCallback(void (*function)(Arg1, Arg2, Arg3, Arg4, Arg5, Arg6, Arg7),Arg1 arg1, Arg2 arg2, Arg3 arg3, Arg4 arg4, Arg5 arg5, Arg6 arg6, Arg7 arg7) {return new internal::FunctionClosure7<Arg1, Arg2, Arg3, Arg4, Arg5, Arg6, Arg7>(function, true, arg1, arg2, arg3, arg4, arg5, arg6, arg7);
}
FunctionClosure7
闭包类关系为
function_
:为发送响应函数SendRpcResponse
arg1_
:为meta.correlation_id()
arg2_
:为cntl.get()
,类型为Controller*
arg3_
:为req.get()
,类型为const google::protobuf::Message*
,表示请求消息
arg4_
:为res.get()
,类型为const google::protobuf::Message*
,表示用于加响应的消息
arg5_
:为server
, 类型为const Server*
,表示具体的服务业务
arg6_
:为method_status
,类型为MethodStatus*
,表示方法状态
arg7_
:为msg->received_us()
,类型为int64_t
,表示消息的接收时间
相关文章:
brpc之baidu_protocol
简介 是brpc默认使用的协议 初始化 Protocol baidu_protocol { ParseRpcMessage,SerializeRequestDefault, PackRpcRequest,ProcessRpcRequest, ProcessRpcResponse,VerifyRpcRequest, NULL, NULL,CONNECTION_TYPE_ALL, "baidu_std" };协议定义 定义在baidu_rpc…...
LeetCode:39. 组合总和
跟着carl学算法,本系列博客仅做个人记录,建议大家都去看carl本人的博客,写的真的很好的! 代码随想录 LeetCode:39. 组合总和 给你一个 无重复元素 的整数数组 candidates 和一个目标整数 target ,找出 cand…...

SOLID原则学习,开闭原则(Open Closed Principle, OCP)
文章目录 1. 定义2. 开闭原则的详细解释3. 实现开闭原则的方法4. 总结 1. 定义 开闭原则(Open-Closed Principle,OCP)是面向对象设计中的五大原则(SOLID)之一,由Bertrand Meyer提出。开闭原则的核心思想是…...

Unreal Engine 5 C++ Advanced Action RPG 七章笔记
第七章 Ranged Enemy 2-Ranged Enemy Starting Weapon 制作新敌人的流程准备 新敌人的武器起始的状态数据自己的战斗能力投射能力自己的行为树 创建角色,添加武器,添加数据,就是继承之前的基类敌人的 运行结果 3-Glacer Starting Stats 看看就行,就是复制曲线表格更改数…...

自动连接校园网wifi脚本实践(自动网页认证)
目录 起因执行步骤分析校园网登录逻辑如何判断当前是否处于未登录状态? 书写代码打包设置开机自动启动 起因 我们一般通过远程控制的方式访问实验室电脑,但是最近实验室老是断电,但重启后也不会自动连接校园网账户认证,远程工具&…...

HTTP/HTTPS ⑤-CA证书 || 中间人攻击 || SSL/TLS
这里是Themberfue ✨上节课我们聊到了对称加密和非对称加密,实际上,单纯地非对称加密并不能保证数据不被窃取,我们还需要一个更加重要的东西——证书 中间人攻击 通过非对称加密生成私钥priKey和公钥pubKey用来加密对称加密生成的密钥&…...

traceroute原理探究
文章中有截图,看不清的话,可以把浏览器显示比例放大到200%后观看。 linux下traceroute的原理 本文通过抓包观察一下linux下traceroute的原理 环境:一台嵌入式linux设备,内网ip是192.168.186.195,其上有192.168.202.…...
50_Lua垃圾回收
1.Lua垃圾回收机制概述 Lua采用了一种自动内存管理机制,称为垃圾回收(Garbage Collection, GC)。垃圾回收的主要目的是回收程序中不再被使用的内存,从而避免内存泄漏。Lua的垃圾回收器负责回收动态分配的对象,如函数、用户数据、表、字符串、线程、内部结构等。Lua的垃圾…...

Git-2-:Cherry-Pick 的使用场景及使用流程
前面我们说了 Git合并、解决冲突、强行回退等解决方案 >> 点击查看 这里再说一下 Cherry-Pick功能,Cherry-Pick不是merge,只是把部分功能代码Cherry-Pick到远程的目标分支 git cherry-pick功能简介: git cherry-pick 是用来从一个分…...

【C++】21.map和set的使用
文章目录 1. 序列式容器和关联式容器2. set系列的使用2.1 set和multiset参考文档2.2 set类的介绍2.3 set的构造和迭代器构造函数:双向迭代器迭代器: 2.4 set的增删查2.5 insert和迭代器遍历使用样例:2.6 find和erase使用样例:2.7 …...

burpsiute的基础使用(2)
爆破模块(intruder): csrf请求伪造访问(模拟攻击): 方法一: 通过burp将修改,删除等行为的数据包压缩成一个可访问链接,通过本地浏览器访问(该浏览器用户处于登陆状态&a…...
ElasticSearch 同义词匹配
synonym.txt 电脑, 计算机, 主机 复印纸, 打印纸, A4纸, 纸, A3 平板电脑, Pad DELETE /es_sku_index_20_20250109 PUT /es_sku_index_20_20250109 {"settings": {"index": {"number_of_shards": "5","number_of_replicas&quo…...

linux RT-Preempt spin lock实现
一、spin_lock概述 Spinlock是linux内核中常用的一种互斥锁机制,和mutex不同,当无法持锁进入临界区的时候,当前执行线索不会阻塞,而是不断的自旋等待该锁释放。正因为如此,自旋锁也是可以用在中断上下文的。也正是因为…...
PySpark广播表连接解决数据倾斜的完整案例
使用PySpark解决数据倾斜问题的完整案例,通过广播表连接的方式来优化性能。 准备数据 假设我们有两张表,一张大表 big_table 和一张小表 small_table ,小表将作为广播表。 from pyspark.sql import SparkSession# 初始化SparkSession spar…...

Chromium CDP 开发(十二):为自己的Domain建立custom_config.json
引言 本章详细介绍了如何为自定义的 CDP Domain 创建 custom_config.json 文件,并通过修改 BUILD.gn 文件来确保自定义的配置文件参与编译。我们通过 inspector_protocol_generate 配置段自动生成自定义 Domain 的头文件和实现文件,并成功将其集成到构建…...

【Vue】全局/局部组件使用流程(Vue2为例)
全局组件和局部组件区别 如何使用 全局组件:全局注册后,可以在任意页面中直接使用。局部组件:在页面中需要先导入子组件路径,注册组件才能使用。 适用场景 全局组件:适用于高频使用的组件,如导航栏、业…...
Vue.js组件开发详解
在现代前端开发中,Vue.js 凭借其简洁、高效、灵活的特性,成为了众多开发者的首选框架之一,而组件化开发则是 Vue.js 的核心优势。组件可以将复杂的 UI 界面拆分成一个个独立的、可复用的小块,极大地提高了开发效率和代码的可维护性…...

解决:ubuntu22.04中IsaacGymEnv保存视频报错的问题
1. IsaacGymEnvs项目介绍 IsaacGymEnvs:基于NVIDIA Isaac Gym的高效机器人训练环境 IsaacGymEnvs 是一个基于 NVIDIA Isaac Gym 的开源 Python 环境库,专为机器人训练提供高效的仿真环境。Isaac Gym 是由 NVIDIA 开发的一个高性能物理仿真引擎…...
深度学习camp-第J7周:对于ResNeXt-50算法的思考
🍨 本文为🔗365天深度学习训练营 中的学习记录博客🍖 原作者:K同学啊 📌你需要解决的疑问:这个代码是否有错?对错与否都请给出你的思考 📌打卡要求:请查找相关资料、逐步…...
java: 错误: 无效的源发行版:17解决办法
遇到“java: 错误: 无效的源发行版:17”的问题,通常是因为项目设置中指定的Java版本与当前环境不一致导致的。以下是几种可能的解决方案: 检查并升级Java版本:确保你已经安装了支持Java 17的JDK版本。你可以通过命令行输入java -v…...

Linux应用开发之网络套接字编程(实例篇)
服务端与客户端单连接 服务端代码 #include <sys/socket.h> #include <sys/types.h> #include <netinet/in.h> #include <stdio.h> #include <stdlib.h> #include <string.h> #include <arpa/inet.h> #include <pthread.h> …...

测试微信模版消息推送
进入“开发接口管理”--“公众平台测试账号”,无需申请公众账号、可在测试账号中体验并测试微信公众平台所有高级接口。 获取access_token: 自定义模版消息: 关注测试号:扫二维码关注测试号。 发送模版消息: import requests da…...
HTML 语义化
目录 HTML 语义化HTML5 新特性HTML 语义化的好处语义化标签的使用场景最佳实践 HTML 语义化 HTML5 新特性 标准答案: 语义化标签: <header>:页头<nav>:导航<main>:主要内容<article>&#x…...
内存分配函数malloc kmalloc vmalloc
内存分配函数malloc kmalloc vmalloc malloc实现步骤: 1)请求大小调整:首先,malloc 需要调整用户请求的大小,以适应内部数据结构(例如,可能需要存储额外的元数据)。通常,这包括对齐调整,确保分配的内存地址满足特定硬件要求(如对齐到8字节或16字节边界)。 2)空闲…...
利用ngx_stream_return_module构建简易 TCP/UDP 响应网关
一、模块概述 ngx_stream_return_module 提供了一个极简的指令: return <value>;在收到客户端连接后,立即将 <value> 写回并关闭连接。<value> 支持内嵌文本和内置变量(如 $time_iso8601、$remote_addr 等)&a…...

Xshell远程连接Kali(默认 | 私钥)Note版
前言:xshell远程连接,私钥连接和常规默认连接 任务一 开启ssh服务 service ssh status //查看ssh服务状态 service ssh start //开启ssh服务 update-rc.d ssh enable //开启自启动ssh服务 任务二 修改配置文件 vi /etc/ssh/ssh_config //第一…...

Linux相关概念和易错知识点(42)(TCP的连接管理、可靠性、面临复杂网络的处理)
目录 1.TCP的连接管理机制(1)三次握手①握手过程②对握手过程的理解 (2)四次挥手(3)握手和挥手的触发(4)状态切换①挥手过程中状态的切换②握手过程中状态的切换 2.TCP的可靠性&…...
条件运算符
C中的三目运算符(也称条件运算符,英文:ternary operator)是一种简洁的条件选择语句,语法如下: 条件表达式 ? 表达式1 : 表达式2• 如果“条件表达式”为true,则整个表达式的结果为“表达式1”…...

抖音增长新引擎:品融电商,一站式全案代运营领跑者
抖音增长新引擎:品融电商,一站式全案代运营领跑者 在抖音这个日活超7亿的流量汪洋中,品牌如何破浪前行?自建团队成本高、效果难控;碎片化运营又难成合力——这正是许多企业面临的增长困局。品融电商以「抖音全案代运营…...

【单片机期末】单片机系统设计
主要内容:系统状态机,系统时基,系统需求分析,系统构建,系统状态流图 一、题目要求 二、绘制系统状态流图 题目:根据上述描述绘制系统状态流图,注明状态转移条件及方向。 三、利用定时器产生时…...