mongodb源码分析session执行handleRequest命令find过程
mongo/transport/service_state_machine.cpp已经分析startSession创建ASIOSession过程,并且验证connection是否超过限制ASIOSession和connection是循环接受客户端命令,把数据流转换成Message,状态转变流程是:State::Created 》 State::Source 》State::SourceWait 》 State::Process 》 State::SinkWait 》 State::Source 》State::SourceWait。
State::Process将命令发送给mongodb数据库,数据库执行命令find等。
mongodb源码分析session执行handleRequest命令find过程代码调用链如下:
- mongo/transport/service_state_machine.cpp中的_processMessage
- mongo/db/service_entry_point_mongod.cpp中的handleRequest
- mongo/db/service_entry_point_common.cpp中的handleRequest
- mongo/db/service_entry_point_common.cpp中的receivedCommands
- mongo/db/ commands.cpp中的findCommand,返回FindCommand对象
- mongo/db/service_entry_point_common.cpp中的execCommandDatabase
- mongo/db/service_entry_point_common.cpp中的runCommandImpl
- mongo/db/service_entry_point_common.cpp中的invocation->run
- mongo/db/commands/find_cmd.cpp中的run
mongo/transport/service_state_machine.cpp的_processMessage代码
void ServiceStateMachine::_processMessage(ThreadGuard guard) {invariant(!_inMessage.empty());LOG(1) << "conca _processMessage";TrafficRecorder::get(_serviceContext).observe(_sessionHandle, _serviceContext->getPreciseClockSource()->now(), _inMessage);auto& compressorMgr = MessageCompressorManager::forSession(_session());_compressorId = boost::none;if (_inMessage.operation() == dbCompressed) {MessageCompressorId compressorId;auto swm = compressorMgr.decompressMessage(_inMessage, &compressorId);uassertStatusOK(swm.getStatus());_inMessage = swm.getValue();_compressorId = compressorId;}networkCounter.hitLogicalIn(_inMessage.size());// Pass sourced Message to handler to generate response.auto opCtx = Client::getCurrent()->makeOperationContext();// The handleRequest is implemented in a subclass for mongod/mongos and actually all the// database work for this request.DbResponse dbresponse = _sep->handleRequest(opCtx.get(), _inMessage);// opCtx must be destroyed here so that the operation cannot show// up in currentOp results after the response reaches the clientopCtx.reset();// Format our response, if we have oneMessage& toSink = dbresponse.response;if (!toSink.empty()) {invariant(!OpMsg::isFlagSet(_inMessage, OpMsg::kMoreToCome));invariant(!OpMsg::isFlagSet(toSink, OpMsg::kChecksumPresent));// Update the header for the response message.toSink.header().setId(nextMessageId());toSink.header().setResponseToMsgId(_inMessage.header().getId());if (OpMsg::isFlagSet(_inMessage, OpMsg::kChecksumPresent)) {
#ifdef MONGO_CONFIG_SSLif (!SSLPeerInfo::forSession(_session()).isTLS) {OpMsg::appendChecksum(&toSink);}
#elseOpMsg::appendChecksum(&toSink);
#endif}// If the incoming message has the exhaust flag set and is a 'getMore' command, then we// bypass the normal RPC behavior. We will sink the response to the network, but we also// synthesize a new 'getMore' request, as if we sourced a new message from the network. This// new request is sent to the database once again to be processed. This cycle repeats as// long as the associated cursor is not exhausted. Once it is exhausted, we will send a// final response, terminating the exhaust stream._inMessage = makeExhaustMessage(_inMessage, &dbresponse);_inExhaust = !_inMessage.empty();networkCounter.hitLogicalOut(toSink.size());if (_compressorId) {auto swm = compressorMgr.compressMessage(toSink, &_compressorId.value());uassertStatusOK(swm.getStatus());toSink = swm.getValue();}TrafficRecorder::get(_serviceContext).observe(_sessionHandle, _serviceContext->getPreciseClockSource()->now(), toSink);_sinkMessage(std::move(guard), std::move(toSink));LOG(1) << "conca _processMessage _sinkMessage";} else {_state.store(State::Source);_inMessage.reset();LOG(1) << "conca _processMessage store(State::Source)";return _scheduleNextWithGuard(std::move(guard),ServiceExecutor::kDeferredTask,transport::ServiceExecutorTaskName::kSSMSourceMessage);}
}
session接受到客户端发送的命令Message,把接受到的Message信息发送给mongodb数据库执行,DbResponse dbresponse = _sep->handleRequest(opCtx.get(), _inMessage);核心方法。_sep声明的变量是ServiceEntryPoint* _sep; ServiceEntryPoint实例化对象是mongod或mongos,mongod对应的是ServiceEntryPointMongod。
/mongo/db/service_entry_point_mongod.cpp的handleRequest方法代码:
DbResponse ServiceEntryPointMongod::handleRequest(OperationContext* opCtx, const Message& m) {return ServiceEntryPointCommon::handleRequest(opCtx, m, Hooks{});
}
/mongo/db/service_entry_point_common.cpp中的handleRequest方法代码:
DbResponse ServiceEntryPointCommon::handleRequest(OperationContext* opCtx,const Message& m,const Hooks& behaviors) {// before we lock...NetworkOp op = m.operation();bool isCommand = false;DbMessage dbmsg(m);Client& c = *opCtx->getClient();if (c.isInDirectClient()) {if (!opCtx->getLogicalSessionId() || !opCtx->getTxnNumber()) {invariant(!opCtx->inMultiDocumentTransaction() &&!opCtx->lockState()->inAWriteUnitOfWork());}} else {LastError::get(c).startRequest();AuthorizationSession::get(c)->startRequest(opCtx);// We should not be holding any locks at this pointinvariant(!opCtx->lockState()->isLocked());}const char* ns = dbmsg.messageShouldHaveNs() ? dbmsg.getns() : nullptr;const NamespaceString nsString = ns ? NamespaceString(ns) : NamespaceString();LOG(1) << "conca " << nsString;if (op == dbQuery) {if (nsString.isCommand()) {isCommand = true;}} else if (op == dbMsg) {isCommand = true;}LOG(1) << "conca isCommand is " << isCommand;CurOp& currentOp = *CurOp::get(opCtx);{stdx::lock_guard<Client> lk(*opCtx->getClient());// Commands handling code will reset this if the operation is a command// which is logically a basic CRUD operation like query, insert, etc.currentOp.setNetworkOp_inlock(op);currentOp.setLogicalOp_inlock(networkOpToLogicalOp(op));}OpDebug& debug = currentOp.debug();boost::optional<long long> slowMsOverride;bool forceLog = false;DbResponse dbresponse;if (op == dbMsg || (op == dbQuery && isCommand)) {dbresponse = receivedCommands(opCtx, m, behaviors);} else if (op == dbQuery) {invariant(!isCommand);opCtx->markKillOnClientDisconnect();dbresponse = receivedQuery(opCtx, nsString, c, m, behaviors);} else if (op == dbGetMore) {dbresponse = receivedGetMore(opCtx, m, currentOp, &forceLog);} else {// The remaining operations do not return any response. They are fire-and-forget.try {if (op == dbKillCursors) {currentOp.ensureStarted();slowMsOverride = 10;receivedKillCursors(opCtx, m);} else if (op != dbInsert && op != dbUpdate && op != dbDelete) {log() << " operation isn't supported: " << static_cast<int>(op);currentOp.done();forceLog = true;} else {if (!opCtx->getClient()->isInDirectClient()) {uassert(18663,str::stream() << "legacy writeOps not longer supported for "<< "versioned connections, ns: " << nsString.ns()<< ", op: " << networkOpToString(op),!ShardedConnectionInfo::get(&c, false));}if (!nsString.isValid()) {uassert(16257, str::stream() << "Invalid ns [" << ns << "]", false);} else if (op == dbInsert) {receivedInsert(opCtx, nsString, m);} else if (op == dbUpdate) {receivedUpdate(opCtx, nsString, m);} else if (op == dbDelete) {receivedDelete(opCtx, nsString, m);} else {MONGO_UNREACHABLE;}}} catch (const AssertionException& ue) {LastError::get(c).setLastError(ue.code(), ue.reason());LOG(3) << " Caught Assertion in " << networkOpToString(op) << ", continuing "<< redact(ue);debug.errInfo = ue.toStatus();}// A NotMaster error can be set either within receivedInsert/receivedUpdate/receivedDelete// or within the AssertionException handler above. Either way, we want to throw an// exception here, which will cause the client to be disconnected.if (LastError::get(opCtx->getClient()).hadNotMasterError()) {notMasterLegacyUnackWrites.increment();uasserted(ErrorCodes::NotMaster,str::stream()<< "Not-master error while processing '" << networkOpToString(op)<< "' operation on '" << nsString << "' namespace via legacy "<< "fire-and-forget command execution.");}}// Mark the op as complete, and log it if appropriate. Returns a boolean indicating whether// this op should be sampled for profiling.const bool shouldSample = currentOp.completeAndLogOperation(opCtx, MONGO_LOG_DEFAULT_COMPONENT, dbresponse.response.size(), slowMsOverride, forceLog);Top::get(opCtx->getServiceContext()).incrementGlobalLatencyStats(opCtx,durationCount<Microseconds>(currentOp.elapsedTimeExcludingPauses()),currentOp.getReadWriteType());if (currentOp.shouldDBProfile(shouldSample)) {// Performance profiling is onif (opCtx->lockState()->isReadLocked()) {LOG(1) << "note: not profiling because recursive read lock";} else if (c.isInDirectClient()) {LOG(1) << "note: not profiling because we are in DBDirectClient";} else if (behaviors.lockedForWriting()) {// TODO SERVER-26825: Fix race condition where fsyncLock is acquired post// lockedForWriting() call but prior to profile collection lock acquisition.LOG(1) << "note: not profiling because doing fsync+lock";} else if (storageGlobalParams.readOnly) {LOG(1) << "note: not profiling because server is read-only";} else {invariant(!opCtx->lockState()->inAWriteUnitOfWork());profile(opCtx, op);}}recordCurOpMetrics(opCtx);return dbresponse;
}
find、count、update等命令执行方法receivedCommands(opCtx, m, behaviors),如果是游标读取更多数据,调用的是getMore命令,执行方法receivedGetMore(opCtx, m, currentOp, &forceLog)。 对每个命令统计调用的是recordCurOpMetrics(opCtx);
/mongo/db/service_entry_point_common.cpp中receivedCommands方法代码:
DbResponse receivedCommands(OperationContext* opCtx,const Message& message,const ServiceEntryPointCommon::Hooks& behaviors) {auto replyBuilder = rpc::makeReplyBuilder(rpc::protocolForMessage(message));OpMsgRequest request;[&] {try { // Parse.request = rpc::opMsgRequestFromAnyProtocol(message);} catch (const DBException& ex) {// If this error needs to fail the connection, propagate it out.if (ErrorCodes::isConnectionFatalMessageParseError(ex.code()))throw;BSONObjBuilder metadataBob;behaviors.appendReplyMetadataOnError(opCtx, &metadataBob);BSONObjBuilder extraFieldsBuilder;appendClusterAndOperationTime(opCtx, &extraFieldsBuilder, &metadataBob, LogicalTime::kUninitialized);// Otherwise, reply with the parse error. This is useful for cases where parsing fails// due to user-supplied input, such as the document too deep error. Since we failed// during parsing, we can't log anything about the command.LOG(1) << "assertion while parsing command: " << ex.toString();generateErrorResponse(opCtx, replyBuilder.get(), ex, metadataBob.obj(), extraFieldsBuilder.obj());return; // From lambda. Don't try executing if parsing failed.}try { // Execute.curOpCommandSetup(opCtx, request);Command* c = nullptr;// In the absence of a Command object, no redaction is possible. Therefore// to avoid displaying potentially sensitive information in the logs,// we restrict the log message to the name of the unrecognized command.// However, the complete command object will still be echoed to the client.if (!(c = CommandHelpers::findCommand(request.getCommandName()))) {globalCommandRegistry()->incrementUnknownCommands();std::string msg = str::stream()<< "no such command: '" << request.getCommandName() << "'";LOG(2) << msg;uasserted(ErrorCodes::CommandNotFound, str::stream() << msg);}LOG(1) << "run command " << request.getDatabase() << ".$cmd" << ' '<< redact(ServiceEntryPointCommon::getRedactedCopyForLogging(c, request.body));{// Try to set this as early as possible, as soon as we have figured out the command.stdx::lock_guard<Client> lk(*opCtx->getClient());CurOp::get(opCtx)->setLogicalOp_inlock(c->getLogicalOp());}execCommandDatabase(opCtx, c, request, replyBuilder.get(), behaviors);} catch (const DBException& ex) {BSONObjBuilder metadataBob;behaviors.appendReplyMetadataOnError(opCtx, &metadataBob);BSONObjBuilder extraFieldsBuilder;appendClusterAndOperationTime(opCtx, &extraFieldsBuilder, &metadataBob, LogicalTime::kUninitialized);LOG(1) << "assertion while executing command '" << request.getCommandName() << "' "<< "on database '" << request.getDatabase() << "': " << ex.toString();generateErrorResponse(opCtx, replyBuilder.get(), ex, metadataBob.obj(), extraFieldsBuilder.obj());}}();if (OpMsg::isFlagSet(message, OpMsg::kMoreToCome)) {// Close the connection to get client to go through server selection again.if (LastError::get(opCtx->getClient()).hadNotMasterError()) {notMasterUnackWrites.increment();uasserted(ErrorCodes::NotMaster,str::stream()<< "Not-master error while processing '" << request.getCommandName()<< "' operation on '" << request.getDatabase() << "' database via "<< "fire-and-forget command execution.");}return {}; // Don't reply.}DbResponse dbResponse;if (OpMsg::isFlagSet(message, OpMsg::kExhaustSupported)) {auto responseObj = replyBuilder->getBodyBuilder().asTempObj();auto cursorObj = responseObj.getObjectField("cursor");if (responseObj.getField("ok").trueValue() && !cursorObj.isEmpty()) {dbResponse.exhaustNS = cursorObj.getField("ns").String();dbResponse.exhaustCursorId = cursorObj.getField("id").numberLong();}}dbResponse.response = replyBuilder->done();CurOp::get(opCtx)->debug().responseLength = dbResponse.response.header().dataLen();return dbResponse;
}
mongo/db/commands.cpp中findCommand方法:根据命令名字(find、count、update)找到对应Command对象(FindCommand、CmdFindAndModify、CmdCount、WriteCommand)。MongoDB内核支持的command命令信息保存在一个全局map表_commands中,从命令请求bson中解析出command命令字符串后,就是从该全局map表查找,如果找到该命令则说明MongoDB支持该命令,找不到则说明不支持
Command* CommandHelpers::findCommand(StringData name) {return globalCommandRegistry()->findCommand(name);
}
CommandRegistry* globalCommandRegistry() {static auto reg = new CommandRegistry();return reg;
}CommandMap _commands;Command* CommandRegistry::findCommand(StringData name) const {auto it = _commands.find(name);if (it == _commands.end())return nullptr;return it->second;
}
execCommandDatabase函数的主要作用是在 MongoDB 中执行数据库命令。它会对命令进行解析、授权检查、权限验证、处理各种参数和错误情况,最后执行命令并处理可能出现的异常。auto invocation = command->parse(opCtx, request)解析命令请求,得到命令调用对象。调用 runCommandImpl
函数执行命令。
/mongo/db/service_entry_point_common.cpp中execCommandDatabase代码如下:
void execCommandDatabase(OperationContext* opCtx,Command* command,const OpMsgRequest& request,rpc::ReplyBuilderInterface* replyBuilder,const ServiceEntryPointCommon::Hooks& behaviors) {CommandHelpers::uassertShouldAttemptParse(opCtx, command, request);BSONObjBuilder extraFieldsBuilder;auto startOperationTime = getClientOperationTime(opCtx);auto invocation = command->parse(opCtx, request);OperationSessionInfoFromClient sessionOptions;try {{stdx::lock_guard<Client> lk(*opCtx->getClient());CurOp::get(opCtx)->setCommand_inlock(command);}sleepMillisAfterCommandExecutionBegins.execute([&](const BSONObj& data) {auto numMillis = data["millis"].numberInt();auto commands = data["commands"].Obj().getFieldNames<std::set<std::string>>();// Only sleep for one of the specified commands.if (commands.find(command->getName()) != commands.end()) {mongo::sleepmillis(numMillis);}});// TODO: move this back to runCommands when mongos supports OperationContext// see SERVER-18515 for details.rpc::readRequestMetadata(opCtx, request.body, command->requiresAuth());rpc::TrackingMetadata::get(opCtx).initWithOperName(command->getName());auto const replCoord = repl::ReplicationCoordinator::get(opCtx);sessionOptions = initializeOperationSessionInfo(opCtx,request.body,command->requiresAuth(),command->attachLogicalSessionsToOpCtx(),replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet,opCtx->getServiceContext()->getStorageEngine()->supportsDocLocking());CommandHelpers::evaluateFailCommandFailPoint(opCtx, command->getName(), invocation->ns());const auto dbname = request.getDatabase().toString();uassert(ErrorCodes::InvalidNamespace,str::stream() << "Invalid database name: '" << dbname << "'",NamespaceString::validDBName(dbname, NamespaceString::DollarInDbNameBehavior::Allow));const auto allowTransactionsOnConfigDatabase =(serverGlobalParams.clusterRole == ClusterRole::ConfigServer ||serverGlobalParams.clusterRole == ClusterRole::ShardServer);validateSessionOptions(sessionOptions,command->getName(),invocation->ns(),allowTransactionsOnConfigDatabase);std::unique_ptr<MaintenanceModeSetter> mmSetter;BSONElement cmdOptionMaxTimeMSField;BSONElement allowImplicitCollectionCreationField;BSONElement helpField;StringMap<int> topLevelFields;for (auto&& element : request.body) {StringData fieldName = element.fieldNameStringData();if (fieldName == QueryRequest::cmdOptionMaxTimeMS) {cmdOptionMaxTimeMSField = element;} else if (fieldName == "allowImplicitCollectionCreation") {allowImplicitCollectionCreationField = element;} else if (fieldName == CommandHelpers::kHelpFieldName) {helpField = element;} else if (fieldName == "comment") {opCtx->setComment(element.wrap());} else if (fieldName == QueryRequest::queryOptionMaxTimeMS) {uasserted(ErrorCodes::InvalidOptions,"no such command option $maxTimeMs; use maxTimeMS instead");}uassert(ErrorCodes::FailedToParse,str::stream() << "Parsed command object contains duplicate top level key: "<< fieldName,topLevelFields[fieldName]++ == 0);}if (CommandHelpers::isHelpRequest(helpField)) {CurOp::get(opCtx)->ensureStarted();// We disable last-error for help requests due to SERVER-11492, because config servers// use help requests to determine which commands are database writes, and so must be// forwarded to all config servers.LastError::get(opCtx->getClient()).disable();Command::generateHelpResponse(opCtx, replyBuilder, *command);return;}ImpersonationSessionGuard guard(opCtx);invocation->checkAuthorization(opCtx, request);const bool iAmPrimary = replCoord->canAcceptWritesForDatabase_UNSAFE(opCtx, dbname);if (!opCtx->getClient()->isInDirectClient() &&!MONGO_unlikely(skipCheckingForNotMasterInCommandDispatch.shouldFail())) {const bool inMultiDocumentTransaction = (sessionOptions.getAutocommit() == false);auto allowed = command->secondaryAllowed(opCtx->getServiceContext());bool alwaysAllowed = allowed == Command::AllowedOnSecondary::kAlways;bool couldHaveOptedIn =allowed == Command::AllowedOnSecondary::kOptIn && !inMultiDocumentTransaction;bool optedIn =couldHaveOptedIn && ReadPreferenceSetting::get(opCtx).canRunOnSecondary();bool canRunHere = commandCanRunHere(opCtx, dbname, command, inMultiDocumentTransaction);if (!canRunHere && couldHaveOptedIn) {uasserted(ErrorCodes::NotMasterNoSlaveOk, "not master and slaveOk=false");}if (MONGO_unlikely(respondWithNotPrimaryInCommandDispatch.shouldFail())) {uassert(ErrorCodes::NotMaster, "not primary", canRunHere);} else {uassert(ErrorCodes::NotMaster, "not master", canRunHere);}if (!command->maintenanceOk() &&replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet &&!replCoord->canAcceptWritesForDatabase_UNSAFE(opCtx, dbname) &&!replCoord->getMemberState().secondary()) {uassert(ErrorCodes::NotMasterOrSecondary,"node is recovering",!replCoord->getMemberState().recovering());uassert(ErrorCodes::NotMasterOrSecondary,"node is not in primary or recovering state",replCoord->getMemberState().primary());// Check ticket SERVER-21432, slaveOk commands are allowed in drain modeuassert(ErrorCodes::NotMasterOrSecondary,"node is in drain mode",optedIn || alwaysAllowed);}}if (command->adminOnly()) {LOG(2) << "command: " << request.getCommandName();}if (command->maintenanceMode()) {mmSetter.reset(new MaintenanceModeSetter(opCtx));}if (command->shouldAffectCommandCounter()) {OpCounters* opCounters = &globalOpCounters;opCounters->gotCommand();}// Parse the 'maxTimeMS' command option, and use it to set a deadline for the operation on// the OperationContext. The 'maxTimeMS' option unfortunately has a different meaning for a// getMore command, where it is used to communicate the maximum time to wait for new inserts// on tailable cursors, not as a deadline for the operation.// TODO SERVER-34277 Remove the special handling for maxTimeMS for getMores. This will// require introducing a new 'max await time' parameter for getMore, and eventually banning// maxTimeMS altogether on a getMore command.const int maxTimeMS =uassertStatusOK(QueryRequest::parseMaxTimeMS(cmdOptionMaxTimeMSField));if (maxTimeMS > 0 && command->getLogicalOp() != LogicalOp::opGetMore) {uassert(40119,"Illegal attempt to set operation deadline within DBDirectClient",!opCtx->getClient()->isInDirectClient());opCtx->setDeadlineAfterNowBy(Milliseconds{maxTimeMS}, ErrorCodes::MaxTimeMSExpired);}auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx);// If the parent operation runs in a transaction, we don't override the read concern.auto skipReadConcern =opCtx->getClient()->isInDirectClient() && opCtx->inMultiDocumentTransaction();bool startTransaction = static_cast<bool>(sessionOptions.getStartTransaction());if (!skipReadConcern) {auto newReadConcernArgs = uassertStatusOK(_extractReadConcern(opCtx, invocation.get(), request.body, startTransaction));{// We must obtain the client lock to set the ReadConcernArgs on the operation// context as it may be concurrently read by CurrentOp.stdx::lock_guard<Client> lk(*opCtx->getClient());readConcernArgs = std::move(newReadConcernArgs);}}uassert(ErrorCodes::InvalidOptions,"read concern level snapshot is only valid in a transaction",opCtx->inMultiDocumentTransaction() ||readConcernArgs.getLevel() != repl::ReadConcernLevel::kSnapshotReadConcern);if (startTransaction) {opCtx->lockState()->setSharedLocksShouldTwoPhaseLock(true);opCtx->lockState()->setShouldConflictWithSecondaryBatchApplication(false);}auto& oss = OperationShardingState::get(opCtx);if (!opCtx->getClient()->isInDirectClient() &&readConcernArgs.getLevel() != repl::ReadConcernLevel::kAvailableReadConcern &&(iAmPrimary ||(readConcernArgs.hasLevel() || readConcernArgs.getArgsAfterClusterTime()))) {oss.initializeClientRoutingVersions(invocation->ns(), request.body);auto const shardingState = ShardingState::get(opCtx);if (oss.hasShardVersion() || oss.hasDbVersion()) {uassertStatusOK(shardingState->canAcceptShardedCommands());}behaviors.advanceConfigOpTimeFromRequestMetadata(opCtx);}oss.setAllowImplicitCollectionCreation(allowImplicitCollectionCreationField);auto scoped = behaviors.scopedOperationCompletionShardingActions(opCtx);// This may trigger the maxTimeAlwaysTimeOut failpoint.auto status = opCtx->checkForInterruptNoAssert();// We still proceed if the primary stepped down, but accept other kinds of interruptions.// We defer to individual commands to allow themselves to be interruptible by stepdowns,// since commands like 'voteRequest' should conversely continue executing.if (status != ErrorCodes::PrimarySteppedDown &&status != ErrorCodes::InterruptedDueToReplStateChange) {uassertStatusOK(status);}CurOp::get(opCtx)->ensureStarted();command->incrementCommandsExecuted();if (logger::globalLogDomain()->shouldLog(logger::LogComponent::kTracking,logger::LogSeverity::Debug(1)) &&rpc::TrackingMetadata::get(opCtx).getParentOperId()) {MONGO_LOG_COMPONENT(1, logger::LogComponent::kTracking)<< rpc::TrackingMetadata::get(opCtx).toString();rpc::TrackingMetadata::get(opCtx).setIsLogged(true);}behaviors.waitForReadConcern(opCtx, invocation.get(), request);behaviors.setPrepareConflictBehaviorForReadConcern(opCtx, invocation.get());try {if (!runCommandImpl(opCtx,invocation.get(),request,replyBuilder,startOperationTime,behaviors,&extraFieldsBuilder,sessionOptions)) {command->incrementCommandsFailed();}} catch (const DBException& e) {command->incrementCommandsFailed();if (e.code() == ErrorCodes::Unauthorized) {CommandHelpers::auditLogAuthEvent(opCtx, invocation.get(), request, e.code());}throw;}} catch (const DBException& e) {...}
}
runCommandImpl函数在 MongoDB 中承担着执行命令的核心任务,其作用是解析命令请求、处理写关注、会话管理、错误处理等,最终执行命令并构建响应。
/mongo/db/service_entry_point_common.cpp中runCommandImpl代码:invocation->run 调用具体Command函数,find命令就是FindCmd,最终执行的FindCmd的run方法。
bool runCommandImpl(OperationContext* opCtx,CommandInvocation* invocation,const OpMsgRequest& request,rpc::ReplyBuilderInterface* replyBuilder,LogicalTime startOperationTime,const ServiceEntryPointCommon::Hooks& behaviors,BSONObjBuilder* extraFieldsBuilder,const OperationSessionInfoFromClient& sessionOptions) {const Command* command = invocation->definition();auto bytesToReserve = command->reserveBytesForReply();
// SERVER-22100: In Windows DEBUG builds, the CRT heap debugging overhead, in conjunction with the
// additional memory pressure introduced by reply buffer pre-allocation, causes the concurrency
// suite to run extremely slowly. As a workaround we do not pre-allocate in Windows DEBUG builds.
#ifdef _WIN32if (kDebugBuild)bytesToReserve = 0;
#endifreplyBuilder->reserveBytes(bytesToReserve);const bool shouldCheckOutSession =sessionOptions.getTxnNumber() && !shouldCommandSkipSessionCheckout(command->getName());// getMore operations inherit a WriteConcern from their originating cursor. For example, if the// originating command was an aggregate with a $out and batchSize: 0. Note that if the command// only performed reads then we will not need to wait at all.const bool shouldWaitForWriteConcern =invocation->supportsWriteConcern() || command->getLogicalOp() == LogicalOp::opGetMore;if (shouldWaitForWriteConcern) {auto lastOpBeforeRun = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();// Change the write concern while running the command.const auto oldWC = opCtx->getWriteConcern();ON_BLOCK_EXIT([&] { opCtx->setWriteConcern(oldWC); });boost::optional<WriteConcernOptions> extractedWriteConcern;if (command->getLogicalOp() == LogicalOp::opGetMore) {// WriteConcern will be set up during command processing, it must not be specified on// the command body.behaviors.uassertCommandDoesNotSpecifyWriteConcern(request.body);} else {// WriteConcern should always be explicitly specified by operations received on shard// and config servers, even if it is empty (ie. writeConcern: {}). In this context// (shard/config servers) an empty WC indicates the operation should use the implicit// server defaults. So, warn if the operation has not specified writeConcern and is on// a shard/config server.if (!opCtx->getClient()->isInDirectClient() &&(serverGlobalParams.clusterRole == ClusterRole::ShardServer ||serverGlobalParams.clusterRole == ClusterRole::ConfigServer) &&!request.body.hasField(WriteConcernOptions::kWriteConcernField)) {// TODO: Disabled until after SERVER-43712, to avoid log spam.// log() << "Missing writeConcern on " << command->getName();}extractedWriteConcern.emplace(uassertStatusOK(extractWriteConcern(opCtx, request.body)));if (sessionOptions.getAutocommit()) {validateWriteConcernForTransaction(*extractedWriteConcern,invocation->definition()->getName());}opCtx->setWriteConcern(*extractedWriteConcern);}auto waitForWriteConcern = [&](auto&& bb) {bool reallyWait = true;failCommand.executeIf([&](const BSONObj& data) {bb.append(data["writeConcernError"]);reallyWait = false;if (data.hasField(kErrorLabelsFieldName) &&data[kErrorLabelsFieldName].type() == Array) {// Propagate error labels specified in the failCommand failpoint to the// OperationContext decoration to override getErrorLabels() behaviors.invariant(!errorLabelsOverride(opCtx));errorLabelsOverride(opCtx).emplace(data.getObjectField(kErrorLabelsFieldName).getOwned());}},[&](const BSONObj& data) {return CommandHelpers::shouldActivateFailCommandFailPoint(data,request.getCommandName(),opCtx->getClient(),invocation->ns()) &&data.hasField("writeConcernError");});if (reallyWait) {behaviors.waitForWriteConcern(opCtx, invocation, lastOpBeforeRun, bb);}};try {if (shouldCheckOutSession) {invokeWithSessionCheckedOut(opCtx, invocation, sessionOptions, replyBuilder);} else {LOG(1) << "conca invocation->run" ;invocation->run(opCtx, replyBuilder);}} catch (const DBException& ex) {// Do no-op write before returning NoSuchTransaction if command has writeConcern.if (ex.toStatus().code() == ErrorCodes::NoSuchTransaction &&!opCtx->getWriteConcern().usedDefault) {TransactionParticipant::performNoopWrite(opCtx, "NoSuchTransaction error");}waitForWriteConcern(*extraFieldsBuilder);throw;}waitForWriteConcern(replyBuilder->getBodyBuilder());// With the exception of getMores inheriting the WriteConcern from the originating command,// nothing in run() should change the writeConcern.dassert(command->getLogicalOp() == LogicalOp::opGetMore? !extractedWriteConcern: (extractedWriteConcern &&SimpleBSONObjComparator::kInstance.evaluate(opCtx->getWriteConcern().toBSON() == extractedWriteConcern->toBSON())));} else {behaviors.uassertCommandDoesNotSpecifyWriteConcern(request.body);if (shouldCheckOutSession) {invokeWithSessionCheckedOut(opCtx, invocation, sessionOptions, replyBuilder);} else {invocation->run(opCtx, replyBuilder);}}// This fail point blocks all commands which are running on the specified namespace, or which// are present in the given list of commands.If no namespace or command list are provided,then// the failpoint will block all commands.waitAfterCommandFinishesExecution.execute([&](const BSONObj& data) {auto ns = data["ns"].valueStringDataSafe();auto commands =data.hasField("commands") ? data["commands"].Array() : std::vector<BSONElement>();// If 'ns' or 'commands' is not set, block for all the namespaces or commands respectively.if ((ns.empty() || invocation->ns().ns() == ns) &&(commands.empty() ||std::any_of(commands.begin(), commands.end(), [&request](auto& element) {return element.valueStringDataSafe() == request.getCommandName();}))) {CurOpFailpointHelpers::waitWhileFailPointEnabled(&waitAfterCommandFinishesExecution, opCtx, "waitAfterCommandFinishesExecution");}});behaviors.waitForLinearizableReadConcern(opCtx);// Wait for data to satisfy the read concern level, if necessary.behaviors.waitForSpeculativeMajorityReadConcern(opCtx);const bool ok = [&] {auto body = replyBuilder->getBodyBuilder();return CommandHelpers::extractOrAppendOk(body);}();behaviors.attachCurOpErrInfo(opCtx, replyBuilder->getBodyBuilder().asTempObj());{boost::optional<ErrorCodes::Error> wcCode;boost::optional<ErrorCodes::Error> code;auto response = replyBuilder->getBodyBuilder().asTempObj();auto codeField = response["code"];if (!ok && codeField.isNumber()) {code = ErrorCodes::Error(codeField.numberInt());}if (response.hasField("writeConcernError")) {wcCode = ErrorCodes::Error(response["writeConcernError"]["code"].numberInt());}auto isInternalClient = opCtx->getClient()->session() &&(opCtx->getClient()->session()->getTags() & transport::Session::kInternalClient);auto errorLabels = getErrorLabels(opCtx, sessionOptions, command->getName(), code, wcCode, isInternalClient);replyBuilder->getBodyBuilder().appendElements(errorLabels);}auto commandBodyBob = replyBuilder->getBodyBuilder();behaviors.appendReplyMetadata(opCtx, request, &commandBodyBob);appendClusterAndOperationTime(opCtx, &commandBodyBob, &commandBodyBob, startOperationTime);return ok;
}
mongo/db/commands/find_cmd.cpp中FindCmd的run方法执行数据库逻辑。
class FindCmd final : public Command {
public:FindCmd() : Command("find") {}std::unique_ptr<CommandInvocation> parse(OperationContext* opCtx,const OpMsgRequest& opMsgRequest) override {// TODO: Parse into a QueryRequest here.return std::make_unique<Invocation>(this, opMsgRequest, opMsgRequest.getDatabase());}AllowedOnSecondary secondaryAllowed(ServiceContext* context) const override {return AllowedOnSecondary::kOptIn;}class Invocation final : public CommandInvocation {public:Invocation(const FindCmd* definition, const OpMsgRequest& request, StringData dbName): CommandInvocation(definition), _request(request), _dbName(dbName) {}private:bool supportsWriteConcern() const override {return false;}ReadConcernSupportResult supportsReadConcern(repl::ReadConcernLevel level) const final {return {ReadConcernSupportResult::ReadConcern::kSupported,ReadConcernSupportResult::DefaultReadConcern::kPermitted};}void explain(OperationContext* opCtx,ExplainOptions::Verbosity verbosity,rpc::ReplyBuilderInterface* result) override {...
}void run(OperationContext* opCtx, rpc::ReplyBuilderInterface* result) {
...
}}
相关文章:
mongodb源码分析session执行handleRequest命令find过程
mongo/transport/service_state_machine.cpp已经分析startSession创建ASIOSession过程,并且验证connection是否超过限制ASIOSession和connection是循环接受客户端命令,把数据流转换成Message,状态转变流程是:State::Created 》 St…...

安宝特方案丨XRSOP人员作业标准化管理平台:AR智慧点检验收套件
在选煤厂、化工厂、钢铁厂等过程生产型企业,其生产设备的运行效率和非计划停机对工业制造效益有较大影响。 随着企业自动化和智能化建设的推进,需提前预防假检、错检、漏检,推动智慧生产运维系统数据的流动和现场赋能应用。同时,…...

如何在看板中体现优先级变化
在看板中有效体现优先级变化的关键措施包括:采用颜色或标签标识优先级、设置任务排序规则、使用独立的优先级列或泳道、结合自动化规则同步优先级变化、建立定期的优先级审查流程。其中,设置任务排序规则尤其重要,因为它让看板视觉上直观地体…...
IGP(Interior Gateway Protocol,内部网关协议)
IGP(Interior Gateway Protocol,内部网关协议) 是一种用于在一个自治系统(AS)内部传递路由信息的路由协议,主要用于在一个组织或机构的内部网络中决定数据包的最佳路径。与用于自治系统之间通信的 EGP&…...

PPT|230页| 制造集团企业供应链端到端的数字化解决方案:从需求到结算的全链路业务闭环构建
制造业采购供应链管理是企业运营的核心环节,供应链协同管理在供应链上下游企业之间建立紧密的合作关系,通过信息共享、资源整合、业务协同等方式,实现供应链的全面管理和优化,提高供应链的效率和透明度,降低供应链的成…...
ssc377d修改flash分区大小
1、flash的分区默认分配16M、 / # df -h Filesystem Size Used Available Use% Mounted on /dev/root 1.9M 1.9M 0 100% / /dev/mtdblock4 3.0M...
线程同步:确保多线程程序的安全与高效!
全文目录: 开篇语前序前言第一部分:线程同步的概念与问题1.1 线程同步的概念1.2 线程同步的问题1.3 线程同步的解决方案 第二部分:synchronized关键字的使用2.1 使用 synchronized修饰方法2.2 使用 synchronized修饰代码块 第三部分ÿ…...
uni-app学习笔记二十二---使用vite.config.js全局导入常用依赖
在前面的练习中,每个页面需要使用ref,onShow等生命周期钩子函数时都需要像下面这样导入 import {onMounted, ref} from "vue" 如果不想每个页面都导入,需要使用node.js命令npm安装unplugin-auto-import npm install unplugin-au…...

Mybatis逆向工程,动态创建实体类、条件扩展类、Mapper接口、Mapper.xml映射文件
今天呢,博主的学习进度也是步入了Java Mybatis 框架,目前正在逐步杨帆旗航。 那么接下来就给大家出一期有关 Mybatis 逆向工程的教学,希望能对大家有所帮助,也特别欢迎大家指点不足之处,小生很乐意接受正确的建议&…...
java 实现excel文件转pdf | 无水印 | 无限制
文章目录 目录 文章目录 前言 1.项目远程仓库配置 2.pom文件引入相关依赖 3.代码破解 二、Excel转PDF 1.代码实现 2.Aspose.License.xml 授权文件 总结 前言 java处理excel转pdf一直没找到什么好用的免费jar包工具,自己手写的难度,恐怕高级程序员花费一年的事件,也…...

centos 7 部署awstats 网站访问检测
一、基础环境准备(两种安装方式都要做) bash # 安装必要依赖 yum install -y httpd perl mod_perl perl-Time-HiRes perl-DateTime systemctl enable httpd # 设置 Apache 开机自启 systemctl start httpd # 启动 Apache二、安装 AWStats࿰…...

Docker 运行 Kafka 带 SASL 认证教程
Docker 运行 Kafka 带 SASL 认证教程 Docker 运行 Kafka 带 SASL 认证教程一、说明二、环境准备三、编写 Docker Compose 和 jaas文件docker-compose.yml代码说明:server_jaas.conf 四、启动服务五、验证服务六、连接kafka服务七、总结 Docker 运行 Kafka 带 SASL 认…...

《从零掌握MIPI CSI-2: 协议精解与FPGA摄像头开发实战》-- CSI-2 协议详细解析 (一)
CSI-2 协议详细解析 (一) 1. CSI-2层定义(CSI-2 Layer Definitions) 分层结构 :CSI-2协议分为6层: 物理层(PHY Layer) : 定义电气特性、时钟机制和传输介质(导线&#…...

基于uniapp+WebSocket实现聊天对话、消息监听、消息推送、聊天室等功能,多端兼容
基于 UniApp + WebSocket实现多端兼容的实时通讯系统,涵盖WebSocket连接建立、消息收发机制、多端兼容性配置、消息实时监听等功能,适配微信小程序、H5、Android、iOS等终端 目录 技术选型分析WebSocket协议优势UniApp跨平台特性WebSocket 基础实现连接管理消息收发连接…...

Swift 协议扩展精进之路:解决 CoreData 托管实体子类的类型不匹配问题(下)
概述 在 Swift 开发语言中,各位秃头小码农们可以充分利用语法本身所带来的便利去劈荆斩棘。我们还可以恣意利用泛型、协议关联类型和协议扩展来进一步简化和优化我们复杂的代码需求。 不过,在涉及到多个子类派生于基类进行多态模拟的场景下,…...
深入浅出:JavaScript 中的 `window.crypto.getRandomValues()` 方法
深入浅出:JavaScript 中的 window.crypto.getRandomValues() 方法 在现代 Web 开发中,随机数的生成看似简单,却隐藏着许多玄机。无论是生成密码、加密密钥,还是创建安全令牌,随机数的质量直接关系到系统的安全性。Jav…...
【SpringBoot】100、SpringBoot中使用自定义注解+AOP实现参数自动解密
在实际项目中,用户注册、登录、修改密码等操作,都涉及到参数传输安全问题。所以我们需要在前端对账户、密码等敏感信息加密传输,在后端接收到数据后能自动解密。 1、引入依赖 <dependency><groupId>org.springframework.boot</groupId><artifactId...
【位运算】消失的两个数字(hard)
消失的两个数字(hard) 题⽬描述:解法(位运算):Java 算法代码:更简便代码 题⽬链接:⾯试题 17.19. 消失的两个数字 题⽬描述: 给定⼀个数组,包含从 1 到 N 所有…...

无法与IP建立连接,未能下载VSCode服务器
如题,在远程连接服务器的时候突然遇到了这个提示。 查阅了一圈,发现是VSCode版本自动更新惹的祸!!! 在VSCode的帮助->关于这里发现前几天VSCode自动更新了,我的版本号变成了1.100.3 才导致了远程连接出…...
STM32+rt-thread判断是否联网
一、根据NETDEV_FLAG_INTERNET_UP位判断 static bool is_conncected(void) {struct netdev *dev RT_NULL;dev netdev_get_first_by_flags(NETDEV_FLAG_INTERNET_UP);if (dev RT_NULL){printf("wait netdev internet up...");return false;}else{printf("loc…...

基于Flask实现的医疗保险欺诈识别监测模型
基于Flask实现的医疗保险欺诈识别监测模型 项目截图 项目简介 社会医疗保险是国家通过立法形式强制实施,由雇主和个人按一定比例缴纳保险费,建立社会医疗保险基金,支付雇员医疗费用的一种医疗保险制度, 它是促进社会文明和进步的…...

SCAU期末笔记 - 数据分析与数据挖掘题库解析
这门怎么题库答案不全啊日 来简单学一下子来 一、选择题(可多选) 将原始数据进行集成、变换、维度规约、数值规约是在以下哪个步骤的任务?(C) A. 频繁模式挖掘 B.分类和预测 C.数据预处理 D.数据流挖掘 A. 频繁模式挖掘:专注于发现数据中…...
《Playwright:微软的自动化测试工具详解》
Playwright 简介:声明内容来自网络,将内容拼接整理出来的文档 Playwright 是微软开发的自动化测试工具,支持 Chrome、Firefox、Safari 等主流浏览器,提供多语言 API(Python、JavaScript、Java、.NET)。它的特点包括&a…...

vscode(仍待补充)
写于2025 6.9 主包将加入vscode这个更权威的圈子 vscode的基本使用 侧边栏 vscode还能连接ssh? debug时使用的launch文件 1.task.json {"tasks": [{"type": "cppbuild","label": "C/C: gcc.exe 生成活动文件"…...
可靠性+灵活性:电力载波技术在楼宇自控中的核心价值
可靠性灵活性:电力载波技术在楼宇自控中的核心价值 在智能楼宇的自动化控制中,电力载波技术(PLC)凭借其独特的优势,正成为构建高效、稳定、灵活系统的核心解决方案。它利用现有电力线路传输数据,无需额外布…...
Qt Widget类解析与代码注释
#include "widget.h" #include "ui_widget.h"Widget::Widget(QWidget *parent): QWidget(parent), ui(new Ui::Widget) {ui->setupUi(this); }Widget::~Widget() {delete ui; }//解释这串代码,写上注释 当然可以!这段代码是 Qt …...

解决Ubuntu22.04 VMware失败的问题 ubuntu入门之二十八
现象1 打开VMware失败 Ubuntu升级之后打开VMware上报需要安装vmmon和vmnet,点击确认后如下提示 最终上报fail 解决方法 内核升级导致,需要在新内核下重新下载编译安装 查看版本 $ vmware -v VMware Workstation 17.5.1 build-23298084$ lsb_release…...
线程与协程
1. 线程与协程 1.1. “函数调用级别”的切换、上下文切换 1. 函数调用级别的切换 “函数调用级别的切换”是指:像函数调用/返回一样轻量地完成任务切换。 举例说明: 当你在程序中写一个函数调用: funcA() 然后 funcA 执行完后返回&…...

(二)TensorRT-LLM | 模型导出(v0.20.0rc3)
0. 概述 上一节 对安装和使用有个基本介绍。根据这个 issue 的描述,后续 TensorRT-LLM 团队可能更专注于更新和维护 pytorch backend。但 tensorrt backend 作为先前一直开发的工作,其中包含了大量可以学习的地方。本文主要看看它导出模型的部分&#x…...

CMake基础:构建流程详解
目录 1.CMake构建过程的基本流程 2.CMake构建的具体步骤 2.1.创建构建目录 2.2.使用 CMake 生成构建文件 2.3.编译和构建 2.4.清理构建文件 2.5.重新配置和构建 3.跨平台构建示例 4.工具链与交叉编译 5.CMake构建后的项目结构解析 5.1.CMake构建后的目录结构 5.2.构…...