当前位置: 首页 > article >正文

mongodb源码分析session异步接受asyncSourceMessage()客户端流变Message对象

mongo/transport/service_state_machine.cpp已经分析startSession创建ASIOSession过程,并且验证connection是否超过限制,ASIOSession和connection是循环接受客户端命令,状态转变流程是:State::Created 》 State::Source 》State::SourceWait 》 State::Process 》 State::SinkWait  》 State::Source 》State::SourceWait

State::Created,     //session刚刚创建,但是还没有接受任何命令
State::Source,      //去接受客户端新的命令
State::SourceWait,  // 等待客户端新的命令
State::Process,     // 将接受的命令发送给mongodb数据库
State:: SinkWait,    // 等待将命令的执行结果返回给客户端

session异步接受asyncSourceMessage()客户端流变Message对象代码调用链如下

  1. mongo/transport/service_state_machine.cpp的_sourceMessage方法,返回viod
  2. mongo/transport/session_asio.h的asyncSourceMessage方法,返回Future<Message>
  3. mongo/transport/session_asio.h的sourceMessageImpl方法,返回Future<Message>
  4. mongo/transport/session_asio.h的read方法,返回Future<void>
  5. mongo/transport/session_asio.h的opportunisticRead方法,返回Future<void>

mongo/transport/service_state_machine.cpp的方法_sourceMessage主要状态State::Source变State::SourceWait,TransportLayerASIO模式包含两种线程模型:adaptive(动态线程模型)和synchronous(同步线程模型)。adaptive模式线程设计采用动态线程方式,线程数和 mongodb压力直接相关。

同步线程模型调用_session()->sourceMessage()获取消息。

动态线程模型调用_session()->asyncSourceMessage()异步获取消息,后面重点分析动态线程异步获取消息逻辑。

void ServiceStateMachine::_sourceMessage(ThreadGuard guard) {invariant(_inMessage.empty());invariant(_state.load() == State::Source);LOG(1) << "conca _sourceMessage State::Source";_state.store(State::SourceWait);LOG(1) << "conca _sourceMessage store State::SourceWait";guard.release();auto sourceMsgImpl = [&] {if (_transportMode == transport::Mode::kSynchronous) {MONGO_IDLE_THREAD_BLOCK;return Future<Message>::makeReady(_session()->sourceMessage());} else {invariant(_transportMode == transport::Mode::kAsynchronous);return _session()->asyncSourceMessage();}};sourceMsgImpl().getAsync([this](StatusWith<Message> msg) {if (msg.isOK()) {_inMessage = std::move(msg.getValue());invariant(!_inMessage.empty());}_sourceCallback(msg.getStatus());});
}

一般来说,每条消息均包含一个标准消息头,并后跟特定于请求的数据。标准消息头的结构如下

struct MsgHeader {int32   messageLength; // total message size, including thisint32   requestID;     // identifier for this messageint32   responseTo;    // requestID from the original request//   (used in responses from the database)int32   opCode;        // message type
}

messageLength

消息的总大小(以字节为单位)。该总数包括保存消息长度的 4 个字节。

requestID

客户端或数据库生成的标识符,可用于唯一标识此消息。

responseTo

从客户端消息中获取的 requestID

opCode

消息类型。 有关详细信息,请参阅操作码。

Mongodb协议由msg header + msg body组成,一个完整的mongodb报文内容格式如下:

后面重点研究_session()->asyncSourceMessage()代码,_session()获取当前_session,对应的实现代码是mongo/transport/session_asio.h,mongo/transport/session_asio.h的asyncSourceMessage方法如下:

  Future<Message> asyncSourceMessage(const BatonHandle& baton = nullptr) override {ensureAsync();return sourceMessageImpl(baton);}

mongo/transport/session_asio.h的sourceMessageImpl方法如下:

Future<Message> sourceMessageImpl(const BatonHandle& baton = nullptr) {static constexpr auto kHeaderSize = sizeof(MSGHEADER::Value);std::cout << "conca sourceMessageImpl" << std::endl;auto headerBuffer = SharedBuffer::allocate(kHeaderSize);auto ptr = headerBuffer.get();return read(asio::buffer(ptr, kHeaderSize), baton).then([headerBuffer = std::move(headerBuffer), this, baton]() mutable {if (checkForHTTPRequest(asio::buffer(headerBuffer.get(), kHeaderSize))) {return sendHTTPResponse(baton);}const auto msgLen = size_t(MSGHEADER::View(headerBuffer.get()).getMessageLength());if (msgLen < kHeaderSize || msgLen > MaxMessageSizeBytes) {StringBuilder sb;sb << "recv(): message msgLen " << msgLen << " is invalid. "<< "Min " << kHeaderSize << " Max: " << MaxMessageSizeBytes;const auto str = sb.str();LOG(0) << str;return Future<Message>::makeReady(Status(ErrorCodes::ProtocolError, str));}LOG(1) << "msgLen:" << msgLen << " kHeaderSize " << kHeaderSize;if (msgLen == kHeaderSize) {// This probably isn't a real case since all (current) messages have bodies.if (_isIngressSession) {networkCounter.hitPhysicalIn(msgLen);}return Future<Message>::makeReady(Message(std::move(headerBuffer)));}auto buffer = SharedBuffer::allocate(msgLen);memcpy(buffer.get(), headerBuffer.get(), kHeaderSize);LOG(1) << " buffer.get() " << buffer.get();MsgData::View msgView(buffer.get());return read(asio::buffer(msgView.data(), msgView.dataLen()), baton).then([this, buffer = std::move(buffer), msgLen]() mutable {if (_isIngressSession) {networkCounter.hitPhysicalIn(msgLen);}return Message(std::move(buffer));});});}

mongo/transport/session_asio.h的sourceMessageImpl代码异步获取消息,先读取kHeaderSize长度数据,再读取Body具体信息。

 read(asio::buffer(ptr, kHeaderSize), baton)读取mongodb头部header数据,解析出header中的messageLength字段。

 if (msgLen < kHeaderSize || msgLen > MaxMessageSizeBytes)检查messageLength字段是否在指定的合理范围,该字段不能小于Header整个头部大小,也不能超过MaxMessageSizeBytes最大长度。

 if (msgLen == kHeaderSize)如果只有头部信息直接返回

Header len检查通过,说明读取header数据完成,read继续读取body信息。

最后将上面步骤读取的buffer封装成Message对象,返回给上级Message,后面再根据message具体调用MongoDB数据库。

mongo/transport/session_asio.h的read方法如下:

 Future<void> read(const MutableBufferSequence& buffers, const BatonHandle& baton = nullptr) {
#ifdef MONGO_CONFIG_SSLif (_sslSocket) {std::cout << "conca read _sslSocket" << std::endl;return opportunisticRead(*_sslSocket, buffers, baton);} else if (!_ranHandshake) {invariant(asio::buffer_size(buffers) >= sizeof(MSGHEADER::Value));std::cout << "conca read !_ranHandshake" << std::endl;return opportunisticRead(_socket, buffers, baton).then([this, buffers]() mutable {_ranHandshake = true;return maybeHandshakeSSLForIngress(buffers);}).then([this, buffers, baton](bool needsRead) mutable {if (needsRead) {return read(buffers, baton);} else {return Future<void>::makeReady();}});}
#endifreturn opportunisticRead(_socket, buffers, baton);}

mongo/transport/session_asio.h的opportunisticRead方法代码,来自 MongoDB 的网络层,是一个使用 Asio 库实现的异步读取函数。它的主要功能是尝试从流中读取数据到缓冲区。

    Future<void> opportunisticRead(Stream& stream,const MutableBufferSequence& buffers,const BatonHandle& baton = nullptr) {std::error_code ec;size_t size;if (MONGO_unlikely(transportLayerASIOshortOpportunisticReadWrite.shouldFail()) &&_blockingMode == Async) {asio::mutable_buffer localBuffer = buffers;std::cout << "conca opportunisticRead asio::read 11" << std::endl;if (buffers.size()) {localBuffer = asio::mutable_buffer(buffers.data(), 1);}size = asio::read(stream, localBuffer, ec);if (!ec && buffers.size() > 1) {ec = asio::error::would_block;}} else {std::cout << "conca opportunisticRead asio::read" << std::endl;size = asio::read(stream, buffers, ec);std::cout << "conca opportunisticRead asio::read size is " << size<< std::endl;}if (((ec == asio::error::would_block) || (ec == asio::error::try_again)) &&(_blockingMode == Async)) {// asio::read is a loop internally, so some of buffers may have been read into already.// So we need to adjust the buffers passed into async_read to be offset by size, if// size is > 0.MutableBufferSequence asyncBuffers(buffers);if (size > 0) {asyncBuffers += size;}std::cout << "conca opportunisticRead asyncBuffers" << std::endl;if (baton && baton->networking()) {return baton->networking()->addSession(*this, NetworkingBaton::Type::In).then([&stream, asyncBuffers, baton, this] {return opportunisticRead(stream, asyncBuffers, baton);});}return asio::async_read(stream, asyncBuffers, UseFuture{}).ignoreValue();} else {return futurize(ec);}}

相关文章:

mongodb源码分析session异步接受asyncSourceMessage()客户端流变Message对象

mongo/transport/service_state_machine.cpp已经分析startSession创建ASIOSession过程&#xff0c;并且验证connection是否超过限制&#xff0c;ASIOSession和connection是循环接受客户端命令&#xff0c;状态转变流程是&#xff1a;State::Created 》 State::Source 》State::…...

【数据分析】什么是鲁棒性?

引言 —— 为什么我们需要“抗折腾”的系统&#xff1f; 当你乘坐的飞机穿越雷暴区时机体剧烈颠簸&#xff0c;自动驾驶汽车在暴雨中稳稳避开障碍物&#xff0c;或是手机从口袋摔落后依然流畅运行——这些场景背后&#xff0c;都藏着一个工程领域的“隐形守护者”&#xff1a;…...

适老化场景重构:现代家政老年照护虚拟仿真实训室建设方案​

随着老龄化社会的深度发展&#xff0c;老年照护服务的专业化需求对人才培养提出了更高要求。 凯禾瑞华以现代家政管理理念为核心&#xff0c;推出老年照护虚拟仿真实训室建设方案&#xff0c;通过虚拟仿真技术重构适老化生活场景&#xff0c;融合数字课程、产教融合及搭载智能…...

Qt/C++学习系列之QGroupBox控件的简单使用

Qt/C学习系列之QGroupBox控件的简单使用 前言样式使用代码层面初始化控件事件过滤器点击事件处理 总结 前言 最近在练手一个项目&#xff0c;项目中有不同功能的划分&#xff0c;为了功能分区一目了然&#xff0c;我使用到QGroupBox控件&#xff0c;也是在界面排版布局中最常用…...

Ubuntu设置之初始化

安装SSH服务 # 安装 OpenSSH Server sudo apt update sudo apt install -y openssh-server# 检查 SSH 服务状态 sudo systemctl status ssh # Active: active (running) since Sat 2025-05-31 17:13:07 CST; 6s ago# 重启服务 sudo systemctl restart ssh自定义分辨率 新…...

如何轻松地将数据从 iPhone传输到iPhone 16

对升级到 iPhone 16 感到兴奋吗&#xff1f;恭喜&#xff01;然而&#xff0c;除了兴奋之外&#xff0c;学习如何将数据从 iPhone 传输到 iPhone 16 也很重要。毕竟&#xff0c;那些重要的联系人、笔记等都是不可或缺的。为了实现轻松的iPhone 到 iPhone 传输&#xff0c;我们总…...

开源供应链攻击持续发酵,多个软件包仓库惊现恶意组件

近期在npm、Python和Ruby软件包仓库中相继发现多组恶意组件&#xff0c;这些组件能够清空加密货币钱包资金、安装后删除整个代码库并窃取Telegram API令牌&#xff0c;再次印证了开源生态系统中潜伏的多样化供应链威胁。 多平台恶意组件集中曝光 Checkmarx、ReversingLabs、S…...

Docker Compose 备忘

1。docker-compose.yml services:air-web:build: .ports:- "1027:1027"volumes:- .:/codedepends_on:- air-redisair-redis:image: "redis:alpine" 2. DockerfileFROM python:3.12-slim-bookworm #设置工作目录 WORKDIR /code #将当前目录内容拷贝到容器…...

量子计算+AI:特征选择与神经网络优化创新应用

在由玻色量子协办的第二届APMCM“五岳杯”量子计算挑战赛中&#xff0c;来自北京理工大学的Q-Masterminds团队摘取了银奖。该团队由北京理工大学张玉利教授指导&#xff0c;依托玻色量子550计算量子比特的相干光量子计算机&#xff0c;将量子计算技术集成到特征选择和神经网络剪…...

算法分析与设计-动态规划、贪心算法

目录 第三章——动态规划 第四章——贪心算法 第三章——动态规划 /*【问题描述】 使用动态规划算法解矩阵连乘问题&#xff0c;具体来说就是&#xff0c;依据其递归式自底向上的方式进行计算&#xff0c;在计算过程中&#xff0c;保存子问题答案&#xff0c;每个子问题只解…...

光伏功率预测新突破:TCN-ECANet-GRU混合模型详解与复现

研究背景 ​背景与挑战​ 光伏发电受天气非线性影响,传统方法(统计模型、机器学习)难以处理高维时序数据,预测误差大。​创新模型提出​ 融合时序卷积网络(TCN)、高效通道注意力(ECANet)和门控循环单元(GRU)的混合架构。​方法论细节​ TCN:膨胀因果卷积提取长时序特…...

React组件基础

组件是什么&#xff1f; 概念&#xff1a;一个组件就是用户界面的一部分&#xff0c;它可以有自己的逻辑和外观&#xff0c;组件之间可以相互嵌套&#xff0c;也可以多次复用 组件化开发&#xff0c;可以让开发者像搭积木一样构建一个完整庞大的应用 react组件 在react中&a…...

2025年5月24日系统架构设计师考试题目回顾

当前仅仅是个人用于记录&#xff0c;还未做详细分析&#xff0c;待更新… 综合知识 设 x,y 满足约束条件&#xff1a;x-1>0, x-y<0, x-y-x<0, 则 y/x 的最大值是()。 A. 3 B. 2 C. 4 D. 1 申请软件著作权登记时应当向中国版本保护中心提交软件的鉴别材料&#xff…...

ABP 框架集成 EasyAbp.Abp.GraphQL 构建高性能 GraphQL API

&#x1f680; ABP 框架集成 EasyAbp.Abp.GraphQL 构建高性能 GraphQL API &#x1f4da; 目录 &#x1f680; ABP 框架集成 EasyAbp.Abp.GraphQL 构建高性能 GraphQL API&#x1f9ed; 背景与目标&#x1f6e0; 安装与依赖&#x1f4e6; 模块注册与启动MyProjectHttpApiHostMo…...

C# 用户控件(User Control)详解:创建、使用与最佳实践

在C#应用程序开发中&#xff0c;用户控件&#xff08;User Control&#xff09;是一种强大的工具&#xff0c;它允许开发者将多个标准控件组合成一个可复用的自定义组件。无论是Windows Forms还是WPF&#xff0c;用户控件都能显著提高UI开发的效率&#xff0c;减少重复代码&…...

OpenWrt 搭建 samba 服务器的方法并解决 Windows 不允许访问匿名服务器(0x80004005的错误)的方法

文章目录 一、安装所需要的软件二、配置自动挂载三、配置 Samba 服务器四、配置 Samba 访问用户和密码&#xff08;可选&#xff09;新建 Samba 专门的用户添加无密码的 Samba 账户使用root账户 五、解决 Windows 无法匿名访问Samba方案一 配置无密码的Samba账户并启用匿名访问…...

【 Redis | 完结篇 缓存优化 】

前言&#xff1a;本节包含常见redis缓存问题&#xff0c;包含缓存一致性问题&#xff0c;缓存雪崩&#xff0c;缓存穿透&#xff0c;缓存击穿问题及其解决方案 1. 缓存一致性 我们先看下目前企业用的最多的缓存模型。缓存的通用模型有三种&#xff1a; 缓存模型解释Cache Asi…...

AI数据集构建:从爬虫到标注的全流程指南

AI数据集构建&#xff1a;从爬虫到标注的全流程指南 系统化学习人工智能网站&#xff08;收藏&#xff09;&#xff1a;https://www.captainbed.cn/flu 文章目录 AI数据集构建&#xff1a;从爬虫到标注的全流程指南摘要引言流程图&#xff1a;数据集构建全生命周期一、数据采…...

Android 颜色百分比对照

本文就是简单写个demo,打印下颜色百分比的数值.方便以后使用. 1: 获取透明色 具体的代码如下: /*** 获取透明色* param percent* param red* param green* param blue* return*/public static int getTransparentColor(int percent, int red, int green, int blue) {int alp…...

AI破局:饿了么如何搅动即时零售江湖

最近&#xff0c;即时零售赛道打的火热&#xff0c;对我们的生活也产生了不少的影响。 美女同事小张就没少吐槽“他们咋样了我不知道&#xff0c;奶茶那么便宜&#xff0c;胖了五六斤不说&#xff0c;钱包也空了&#xff0c;在淘宝买奶茶的时候&#xff0c;换了个手机还买了不少…...

04 APP 自动化- Appium toast 元素定位列表滑动

文章目录 一、toast 元素的定位二、滑屏操作 一、toast 元素的定位 toast 元素就是简易的消息提示框&#xff0c;toast 显示窗口显示的时间有限&#xff0c;一般3秒左右 # -*- codingutf-8 -*- from time import sleep from appium import webdriver from appium.options.an…...

判断它是否引用了外部库

在一个 C# 项目中&#xff0c;要系统性地判断它是否引用了外部库&#xff08;包括 NuGet 包、引用的 DLL、项目间依赖等&#xff09;&#xff0c;你应从以下几个关键维度入手进行检查和分析&#xff1a; 1. 检查 .csproj 项目文件 C# 项目使用 .csproj 文件&#xff08;MSBuil…...

物流项目第十期(轨迹微服务)

本项目专栏&#xff1a; 物流项目_Auc23的博客-CSDN博客 建议先看这期&#xff1a; MongoDB入门之Java的使用-CSDN博客 物流项目第九期&#xff08;MongoDB的应用之作业范围&#xff09;-CSDN博客 业务需求 快递员取件成功后&#xff0c;需要将订单转成运单&#xff0c;用…...

Python 入门到进阶全指南:从语言特性到实战项目

一、Python 简介 Python 是一种高级、跨平台、解释型编程语言&#xff0c;以简洁语法和高可读性著称&#xff0c;既适合编程初学者快速入门&#xff0c;也能满足资深开发者的复杂需求。其核心特性与应用场景如下&#xff1a; 核心特性解析 解释型语言&#xff1a;无需编译即可…...

【数据库】关系数据理论--规范化

1.问题的提出 关系模式由五部分组成&#xff0c;是一个五元组&#xff1a; R(U, D, DOM, F) &#xff08;1&#xff09;关系名R是符号化的元组语义 &#xff08;2&#xff09;U为一组属性 &#xff08;3&#xff09;D为属性组U中的属性所来自的域 &#xff08;4&#xff09;DOM…...

SQL 中 JOIN 的执行顺序优化指南

SQL 中 JOIN 的执行顺序优化指南 一、JOIN 执行顺序基础原理 在 SQL 查询中,JOIN的执行顺序是查询优化的重要环节。数据库引擎会根据多种因素决定最优的 JOIN 顺序: 逻辑执行顺序:SQL 语句的书写顺序(如 FROM → WHERE → GROUP BY)并不代表实际执行顺序物理执行顺序:由查…...

Oracle双平面适用场景讨论会议

4月28日&#xff0c;我在杭州组织召开了Oracle双平面会议讨论沙龙。在国产化数据库浪潮的今天&#xff0c;Oracle数据库作为国产数据库的应急库&#xff0c;在国产数据库发生故障或者性能下降时&#xff0c;如何更好的使用Oracle。会议主题如下&#xff1a; 1、背景与痛点速览&…...

OD 算法题 B卷【矩阵稀疏扫描】

文章目录 矩阵稀疏扫描 矩阵稀疏扫描 如果矩阵中的很多系数都为零&#xff0c;则为稀疏矩阵&#xff0c;给定一个矩阵&#xff0c;如果某行、列存在0的个数超出&#xff08;包含&#xff09;了行宽、列宽的一半&#xff08;整除&#xff09;&#xff0c;则认为该行、列为稀疏的…...

使用BERT/BiLSTM + CRF 模型进行NER进展记录~

使用代码处理数据集&#xff0c;发现了一些问题&#xff0c;以及解决办法~ 下载了一组数据集&#xff0c;数据存放在CSV中&#xff0c;GBK格式。如下&#xff1a; 首先对每一列直接进行NER抽取&#xff0c;结果非常不好&#xff1a; 几乎是乱抽取的&#xff0c;解决办法是自己创…...

HarmonyOS运动开发:精准估算室内运动的距离、速度与步幅

##鸿蒙核心技术##运动开发##Sensor Service Kit&#xff08;传感器服务&#xff09;# 前言 在室内运动场景中&#xff0c;由于缺乏 GPS 信号&#xff0c;传统的基于卫星定位的运动数据追踪方法无法使用。因此&#xff0c;如何准确估算室内运动的距离、速度和步幅&#xff0c;…...