Skip to content

Commit

Permalink
SERVER-94165 Merge remote command response structs into one (#27601)
Browse files Browse the repository at this point in the history
GitOrigin-RevId: be19189d49679c0aaf38cbcde115c99e776fde5c
  • Loading branch information
josephdprince authored and MongoDB Bot committed Oct 1, 2024
1 parent 6e8528c commit f7197bc
Show file tree
Hide file tree
Showing 49 changed files with 827 additions and 696 deletions.
13 changes: 9 additions & 4 deletions src/mongo/client/async_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,8 @@ Future<void> AsyncDBClient::initWireVersion(const std::string& appName,
auto cmdReply = rpc::makeReply(&response);
_parseHelloResponse(requestObj, cmdReply);
if (hook) {
executor::RemoteCommandResponse cmdResp(*cmdReply, timer.elapsed());
executor::RemoteCommandResponse cmdResp(
_peer, cmdReply->getCommandReply(), timer.elapsed());
uassertStatusOK(hook->validateHost(_peer, requestObj, cmdResp));
}
});
Expand Down Expand Up @@ -401,7 +402,8 @@ Future<executor::RemoteCommandResponse> AsyncDBClient::runCommandRequest(
std::move(fromConnAcquiredTimer),
token)
.then([this, startTimer = std::move(startTimer)](rpc::UniqueReply response) {
return executor::RemoteCommandResponse(*response, startTimer.elapsed());
return executor::RemoteCommandResponse(
_peer, response->getCommandReply(), startTimer.elapsed());
});
}

Expand All @@ -414,8 +416,11 @@ Future<executor::RemoteCommandResponse> AsyncDBClient::_continueReceiveExhaustRe
.then([stopwatch, msgId, baton, this](Message responseMsg) mutable {
bool isMoreToComeSet = OpMsg::isFlagSet(responseMsg, OpMsg::kMoreToCome);
rpc::UniqueReply response = rpc::UniqueReply(responseMsg, rpc::makeReply(&responseMsg));
auto rcResponse = executor::RemoteCommandResponse(
*response, duration_cast<Milliseconds>(stopwatch.elapsed()), isMoreToComeSet);
auto rcResponse =
executor::RemoteCommandResponse(_peer,
response->getCommandReply(),
duration_cast<Milliseconds>(stopwatch.elapsed()),
isMoreToComeSet);
return rcResponse;
});
}
Expand Down
7 changes: 4 additions & 3 deletions src/mongo/client/dbclient_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ executor::RemoteCommandResponse initWireVersion(

if (auto status = DBClientSession::appendClientMetadata(applicationName, &bob);
!status.isOK()) {
return status;
return {conn->getServerHostAndPort(), status};
}

conn->getCompressorManager().clientBegin(&bob);
Expand Down Expand Up @@ -229,10 +229,11 @@ executor::RemoteCommandResponse initWireVersion(

conn->getCompressorManager().clientFinish(helloObj);

return executor::RemoteCommandResponse{std::move(helloObj), finish - start};
return executor::RemoteCommandResponse{
conn->getServerHostAndPort(), std::move(helloObj), finish - start};

} catch (...) {
return exceptionToStatus();
return {conn->getServerHostAndPort(), exceptionToStatus()};
}

boost::optional<Milliseconds> clampTimeout(double timeoutInSec) {
Expand Down
20 changes: 13 additions & 7 deletions src/mongo/client/fetcher_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ void FetcherTest::processNetworkResponse(const BSONObj& obj,
ReadyQueueState readyQueueStateAfterProcessing,
FetcherState fetcherStateAfterProcessing) {
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
getNet()->scheduleSuccessfulResponse({obj, elapsed});
getNet()->scheduleSuccessfulResponse(ResponseStatus::make_forTest(obj, elapsed));
finishProcessingNetworkResponse(readyQueueStateAfterProcessing, fetcherStateAfterProcessing);
}

Expand Down Expand Up @@ -563,7 +563,8 @@ TEST_F(FetcherTest, ScheduleButShutdown) {
TEST_F(FetcherTest, ScheduleAfterCompletionReturnsShutdownInProgress) {
ASSERT_EQUALS(Fetcher::State::kPreStart, fetcher->getState_forTest());
ASSERT_OK(fetcher->schedule());
auto rs = ResponseStatus(ErrorCodes::OperationFailed, "find command failed", Milliseconds(0));
auto rs = ResponseStatus::make_forTest(
Status(ErrorCodes::OperationFailed, "find command failed"), Milliseconds(0));
processNetworkResponse(rs, ReadyQueueState::kEmpty, FetcherState::kInactive);
ASSERT_EQUALS(ErrorCodes::OperationFailed, status.code());

Expand All @@ -573,7 +574,8 @@ TEST_F(FetcherTest, ScheduleAfterCompletionReturnsShutdownInProgress) {

TEST_F(FetcherTest, FindCommandFailed1) {
ASSERT_OK(fetcher->schedule());
auto rs = ResponseStatus(ErrorCodes::BadValue, "bad hint", Milliseconds(0));
auto rs =
ResponseStatus::make_forTest(Status(ErrorCodes::BadValue, "bad hint"), Milliseconds(0));
processNetworkResponse(rs, ReadyQueueState::kEmpty, FetcherState::kInactive);
ASSERT_EQUALS(ErrorCodes::BadValue, status.code());
ASSERT_EQUALS("bad hint", status.reason());
Expand Down Expand Up @@ -1089,7 +1091,8 @@ TEST_F(FetcherTest, UpdateNextActionAfterSecondBatch) {
ASSERT_EQUALS(cursorId, cursors.front().numberLong());

// Failed killCursors command response should be logged.
getNet()->scheduleSuccessfulResponse(noi, {BSON("ok" << false), Milliseconds(0)});
getNet()->scheduleSuccessfulResponse(
noi, ResponseStatus::make_forTest(BSON("ok" << false), Milliseconds(0)));
getNet()->runReadyNetworkOperations();
}

Expand Down Expand Up @@ -1197,9 +1200,11 @@ TEST_F(FetcherTest, FetcherAppliesRetryPolicyToFirstCommandButNotToGetMoreReques

// Retry policy is applied to find command.
const BSONObj doc = BSON("_id" << 1);
auto rs = ResponseStatus(ErrorCodes::HostUnreachable, "first", Milliseconds(0));
auto rs =
ResponseStatus::make_forTest(Status(ErrorCodes::HostUnreachable, "first"), Milliseconds(0));
processNetworkResponse(rs, ReadyQueueState::kHasReadyRequests, FetcherState::kActive);
rs = ResponseStatus(ErrorCodes::SocketException, "second", Milliseconds(0));
rs = ResponseStatus::make_forTest(Status(ErrorCodes::SocketException, "second"),
Milliseconds(0));
processNetworkResponse(rs, ReadyQueueState::kHasReadyRequests, FetcherState::kActive);
processNetworkResponse(BSON("cursor" << BSON("id" << 1LL << "ns"
<< "db.coll"
Expand All @@ -1214,7 +1219,8 @@ TEST_F(FetcherTest, FetcherAppliesRetryPolicyToFirstCommandButNotToGetMoreReques
ASSERT_BSONOBJ_EQ(doc, documents.front());
ASSERT_TRUE(Fetcher::NextAction::kGetMore == nextAction);

rs = ResponseStatus(ErrorCodes::OperationFailed, "getMore failed", Milliseconds(0));
rs = ResponseStatus::make_forTest(Status(ErrorCodes::OperationFailed, "getMore failed"),
Milliseconds(0));
// No retry policy for subsequent getMore commands.
processNetworkResponse(rs, ReadyQueueState::kEmpty, FetcherState::kInactive);
ASSERT_EQUALS(ErrorCodes::OperationFailed, status);
Expand Down
5 changes: 4 additions & 1 deletion src/mongo/client/remote_command_retry_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,10 @@ void RemoteCommandRetryScheduler::_remoteCommandCallback(
}();

if (!scheduleStatus.isOK()) {
_onComplete({rcba.executor, rcba.myHandle, rcba.request, scheduleStatus});
_onComplete({rcba.executor,
rcba.myHandle,
rcba.request,
executor::RemoteCommandResponse(rcba.request.target, scheduleStatus)});
return;
}
}
Expand Down
65 changes: 42 additions & 23 deletions src/mongo/client/remote_command_retry_scheduler_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,9 @@ TEST_F(RemoteCommandRetrySchedulerTest,

runReadyNetworkOperations();
checkCompletionStatus(
&scheduler, callback, {ErrorCodes::CallbackCanceled, "executor shutdown"});
&scheduler,
callback,
ResponseStatus::make_forTest(Status(ErrorCodes::CallbackCanceled, "executor shutdown")));
}

TEST_F(RemoteCommandRetrySchedulerTest,
Expand All @@ -379,7 +381,9 @@ TEST_F(RemoteCommandRetrySchedulerTest,

runReadyNetworkOperations();
checkCompletionStatus(
&scheduler, callback, {ErrorCodes::CallbackCanceled, "scheduler shutdown"});
&scheduler,
callback,
ResponseStatus::make_forTest(Status(ErrorCodes::CallbackCanceled, "scheduler shutdown")));
}

TEST_F(RemoteCommandRetrySchedulerTest, SchedulerInvokesCallbackOnNonRetryableErrorInResponse) {
Expand All @@ -393,7 +397,8 @@ TEST_F(RemoteCommandRetrySchedulerTest, SchedulerInvokesCallbackOnNonRetryableEr
start(&scheduler);

// This should match one of the non-retryable error codes in the policy.
ResponseStatus rs(ErrorCodes::OperationFailed, "injected error", Milliseconds(0));
ResponseStatus rs = ResponseStatus::make_forTest(
Status(ErrorCodes::OperationFailed, "injected error"), Milliseconds(0));
processNetworkResponse(rs);
checkCompletionStatus(&scheduler, callback, rs);

Expand All @@ -412,7 +417,8 @@ TEST_F(RemoteCommandRetrySchedulerTest, SchedulerInvokesCallbackOnFirstSuccessfu
start(&scheduler);

// Elapsed time in response is ignored on successful responses.
ResponseStatus response(BSON("ok" << 1 << "x" << 123 << "z" << 456), Milliseconds(100));
ResponseStatus response = ResponseStatus::make_forTest(
BSON("ok" << 1 << "x" << 123 << "z" << 456), Milliseconds(100));

processNetworkResponse(response);
checkCompletionStatus(&scheduler, callback, response);
Expand All @@ -435,10 +441,11 @@ TEST_F(RemoteCommandRetrySchedulerTest, SchedulerIgnoresEmbeddedErrorInSuccessfu
// Scheduler does not parse document in a successful response for embedded errors.
// This is the case with some commands (e.g. find) which do not always return errors using the
// wire protocol.
ResponseStatus response(BSON("ok" << 0 << "code" << int(ErrorCodes::FailedToParse) << "errmsg"
<< "injected error"
<< "z" << 456),
Milliseconds(100));
ResponseStatus response = ResponseStatus::make_forTest(
BSON("ok" << 0 << "code" << int(ErrorCodes::FailedToParse) << "errmsg"
<< "injected error"
<< "z" << 456),
Milliseconds(100));

processNetworkResponse(response);
checkCompletionStatus(&scheduler, callback, response);
Expand All @@ -456,15 +463,19 @@ TEST_F(RemoteCommandRetrySchedulerTest,
badExecutor.get(), request, std::ref(callback), std::move(policy));
start(&scheduler);

processNetworkResponse({ErrorCodes::HostNotFound, "first", Milliseconds(0)});
processNetworkResponse(
ResponseStatus::make_forTest(Status(ErrorCodes::HostNotFound, "first"), Milliseconds(0)));

// scheduleRemoteCommand() will fail with ErrorCodes::ShutdownInProgress when trying to send
// third remote command request after processing second failed response.
badExecutor->scheduleRemoteCommandFailPoint = true;
processNetworkResponse({ErrorCodes::HostNotFound, "second", Milliseconds(0)});
processNetworkResponse(
ResponseStatus::make_forTest(Status(ErrorCodes::HostNotFound, "second"), Milliseconds(0)));

checkCompletionStatus(
&scheduler, callback, {ErrorCodes::ShutdownInProgress, "", Milliseconds(0)});
&scheduler,
callback,
ResponseStatus::make_forTest(Status(ErrorCodes::ShutdownInProgress, ""), Milliseconds(0)));
}

TEST_F(RemoteCommandRetrySchedulerTest,
Expand All @@ -478,10 +489,13 @@ TEST_F(RemoteCommandRetrySchedulerTest,
&getExecutor(), request, std::ref(callback), std::move(policy));
start(&scheduler);

processNetworkResponse({ErrorCodes::HostNotFound, "first", Milliseconds(0)});
processNetworkResponse({ErrorCodes::HostUnreachable, "second", Milliseconds(0)});
processNetworkResponse(
ResponseStatus::make_forTest(Status(ErrorCodes::HostNotFound, "first"), Milliseconds(0)));
processNetworkResponse(ResponseStatus::make_forTest(
Status(ErrorCodes::HostUnreachable, "second"), Milliseconds(0)));

ResponseStatus response(ErrorCodes::NetworkTimeout, "last", Milliseconds(0));
ResponseStatus response =
ResponseStatus::make_forTest(Status(ErrorCodes::NetworkTimeout, "last"), Milliseconds(0));
processNetworkResponse(response);
checkCompletionStatus(&scheduler, callback, response);
}
Expand All @@ -496,9 +510,11 @@ TEST_F(RemoteCommandRetrySchedulerTest, SchedulerShouldRetryUntilSuccessfulRespo
&getExecutor(), request, std::ref(callback), std::move(policy));
start(&scheduler);

processNetworkResponse({ErrorCodes::HostNotFound, "first", Milliseconds(0)});
processNetworkResponse(
ResponseStatus::make_forTest(Status(ErrorCodes::HostNotFound, "first"), Milliseconds(0)));

ResponseStatus response(BSON("ok" << 1 << "x" << 123 << "z" << 456), Milliseconds(100));
ResponseStatus response = ResponseStatus::make_forTest(
BSON("ok" << 1 << "x" << 123 << "z" << 456), Milliseconds(100));
processNetworkResponse(response);
checkCompletionStatus(&scheduler, callback, response);
}
Expand Down Expand Up @@ -546,13 +562,15 @@ TEST_F(RemoteCommandRetrySchedulerTest,
policyPtr->scheduler = &scheduler;
start(&scheduler);

processNetworkResponse({ErrorCodes::HostNotFound, "first", Milliseconds(0)});
processNetworkResponse(
ResponseStatus::make_forTest(Status(ErrorCodes::HostNotFound, "first"), Milliseconds(0)));

checkCompletionStatus(&scheduler,
callback,
{ErrorCodes::CallbackCanceled,
"scheduler was shut down before retrying command",
Milliseconds(0)});
checkCompletionStatus(
&scheduler,
callback,
ResponseStatus::make_forTest(
Status(ErrorCodes::CallbackCanceled, "scheduler was shut down before retrying command"),
Milliseconds(0)));
}

bool sharedCallbackStateDestroyed = false;
Expand Down Expand Up @@ -590,7 +608,8 @@ TEST_F(RemoteCommandRetrySchedulerTest,
sharedCallbackData.reset();
ASSERT_FALSE(sharedCallbackStateDestroyed);

processNetworkResponse({ErrorCodes::OperationFailed, "command failed", Milliseconds(0)});
processNetworkResponse(ResponseStatus::make_forTest(
Status(ErrorCodes::OperationFailed, "command failed"), Milliseconds(0)));

scheduler.join();
ASSERT_EQUALS(ErrorCodes::OperationFailed, result);
Expand Down
3 changes: 2 additions & 1 deletion src/mongo/client/server_discovery_monitor_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,8 @@ class ServerDiscoveryMonitorTestFixture : public unittest::Test {
const auto opmsg = OpMsgRequestBuilder::create(
auth::ValidatedTenancyScope::kNotRequired, request.dbname, request.cmdObj);
const auto reply = node->runCommand(request.id, opmsg)->getCommandReply();
_net->scheduleSuccessfulResponse(noi, RemoteCommandResponse(reply, Milliseconds(0)));
_net->scheduleSuccessfulResponse(
noi, RemoteCommandResponse::make_forTest(reply, Milliseconds(0)));
} else {
_net->scheduleErrorResponse(noi, Status(ErrorCodes::HostUnreachable, ""));
}
Expand Down
3 changes: 2 additions & 1 deletion src/mongo/client/server_ping_monitor_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,8 @@ class ServerPingMonitorTestFixture : public unittest::Test {
if (node->isRunning()) {
const auto opmsg = static_cast<OpMsgRequest>(request);
const auto reply = node->runCommand(request.id, opmsg)->getCommandReply();
_net->scheduleSuccessfulResponse(noi, RemoteCommandResponse(reply, Milliseconds(0)));
_net->scheduleSuccessfulResponse(
noi, RemoteCommandResponse::make_forTest(reply, Milliseconds(0)));
} else {
_net->scheduleErrorResponse(noi, Status(ErrorCodes::HostUnreachable, ""));
}
Expand Down
18 changes: 12 additions & 6 deletions src/mongo/db/query/search/mongot_cursor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -155,15 +155,20 @@ long long computeInitialBatchSize(const boost::intrusive_ptr<ExpressionContext>&
}
} // namespace

HostAndPort getMongotAddress() {
auto swHostAndPort = HostAndPort::parse(globalMongotParams.host);
// This host and port string is configured and validated at startup.
invariant(swHostAndPort.getStatus());

return swHostAndPort.getValue();
}

executor::RemoteCommandRequest getRemoteCommandRequest(OperationContext* opCtx,
const NamespaceString& nss,
const BSONObj& cmdObj) {
doThrowIfNotRunningWithMongotHostConfigured();
auto swHostAndPort = HostAndPort::parse(globalMongotParams.host);
// This host and port string is configured and validated at startup.
invariant(swHostAndPort.getStatus());
executor::RemoteCommandRequest rcr(
executor::RemoteCommandRequest(swHostAndPort.getValue(), nss.dbName(), cmdObj, opCtx));
executor::RemoteCommandRequest(getMongotAddress(), nss.dbName(), cmdObj, opCtx));
rcr.sslMode = transport::ConnectSSLMode::kDisableSSL;
return rcr;
}
Expand Down Expand Up @@ -385,8 +390,9 @@ executor::RemoteCommandResponse runSearchCommandWithRetries(
std::function<bool(Status)> retryPolicy) {
using namespace fmt::literals;
auto taskExecutor = executor::getMongotTaskExecutor(expCtx->opCtx->getServiceContext());
executor::RemoteCommandResponse response =
Status(ErrorCodes::InternalError, "Internal error running search command");
executor::RemoteCommandResponse response = {
getMongotAddress(),
Status(ErrorCodes::InternalError, "Internal error running search command")};
for (;;) {
Status err = Status::OK();
do {
Expand Down
Loading

0 comments on commit f7197bc

Please sign in to comment.