Skip to content

Commit

Permalink
SERVER-85907 Make replica set endpoint reject writes on secondary (#1…
Browse files Browse the repository at this point in the history
…8679)

GitOrigin-RevId: 61da798c4915fcfc6e220f7535699b3ee72b050d
  • Loading branch information
cheahuychou authored and MongoDB Bot committed Feb 6, 2024
1 parent cec86a9 commit 5f0f53e
Show file tree
Hide file tree
Showing 6 changed files with 210 additions and 15 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Tests that the replica set endpoint skips readPreference re-targeting for reads.
* Tests that the replica set endpoint skips readPreference re-targeting.
*
* @tags: [
* requires_fcv_73,
Expand Down Expand Up @@ -208,11 +208,46 @@ assert.commandWorked(secondary1TestDB.setProfilingLevel(2));
}

{
jsTest.log("Testing that the replica set endpoint does re-target writes");
jsTest.log("Testing that the replica set endpoint does not re-target writes");

assert.commandWorked(secondary0TestColl.insert({x: 3}));
rst.awaitReplication();
assert.neq(secondary0TestColl.findOne({x: 3}), null);
assert.commandFailedWithCode(secondary0TestColl.insert({x: 3}), ErrorCodes.NotWritablePrimary);
assert.eq(primaryTestColl.findOne({x: 3}), null);

assert.commandFailedWithCode(secondary1TestColl.update({x: 1}, {$set: {y: 1}}),
ErrorCodes.NotWritablePrimary);
assert.eq(primaryTestColl.findOne({x: 1}).y, null);

assert.commandFailedWithCode(secondary0TestColl.remove({x: 1}), ErrorCodes.NotWritablePrimary);
assert.neq(primaryTestColl.findOne({x: 1}), null);

assert.throwsWithCode(
() => secondary1TestColl.findAndModify({query: {x: 1}, update: {$set: {y: 1}}}),
ErrorCodes.NotWritablePrimary);
assert.eq(primaryTestColl.findOne({x: 1}).y, null);

assert.commandFailedWithCode(secondary0TestColl.createIndex({x: 1}),
ErrorCodes.NotWritablePrimary);
// The collection should only have the _id index.
assert.eq(primaryTestColl.getIndexes().length, 1);

assert.commandWorked(primaryTestColl.createIndex({x: 1}));
assert.eq(primaryTestColl.getIndexes().length, 2);
assert.commandFailedWithCode(secondary1TestColl.dropIndex({x: 1}),
ErrorCodes.NotWritablePrimary);
assert.eq(primaryTestColl.getIndexes().length, 2);

assert.throwsWithCode(() => secondary0TestColl.drop(), ErrorCodes.NotWritablePrimary);
assert.eq(primaryTestDB.getCollectionInfos({name: collName}).length, 1);

const newCollName = collName + "Other";
assert.commandFailedWithCode(secondary1TestDB.createCollection(newCollName),
ErrorCodes.NotWritablePrimary);
assert.eq(primaryTestDB.getCollectionInfos({name: newCollName}).length, 0);

assert.commandFailedWithCode(secondary0TestColl.renameCollection(newCollName),
ErrorCodes.NotWritablePrimary);
assert.eq(primaryTestDB.getCollectionInfos({name: collName}).length, 1);
assert.eq(primaryTestDB.getCollectionInfos({name: newCollName}).length, 0);
}

jsTest.log("Disabling profiler");
Expand Down
1 change: 1 addition & 0 deletions src/mongo/db/SConscript
Original file line number Diff line number Diff line change
Expand Up @@ -1030,6 +1030,7 @@ env.CppUnitTest(
'auth/authmocks',
'commands',
'commands_test_example',
'repl/replmocks',
's/sharding_catalog_manager',
'service_context_d_test_fixture',
'shard_role_api',
Expand Down
38 changes: 37 additions & 1 deletion src/mongo/db/commands_test_example.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ namespace commands_test_example {
class ExampleIncrementCommand final : public TypedCommand<ExampleIncrementCommand> {
private:
AllowedOnSecondary secondaryAllowed(ServiceContext*) const override {
return Command::AllowedOnSecondary::kNever;
return Command::AllowedOnSecondary::kAlways;
}

std::string help() const override {
Expand Down Expand Up @@ -178,6 +178,42 @@ class ExampleVoidCommand final : public TypedCommand<ExampleVoidCommand> {
mutable std::int32_t iCapture = 0;
};

class ExampleVoidCommandNeverAllowedOnSecondary
: public TypedCommand<ExampleVoidCommandNeverAllowedOnSecondary> {
private:
AllowedOnSecondary secondaryAllowed(ServiceContext*) const override {
return Command::AllowedOnSecondary::kNever;
}

public:
using Request = ExampleVoidNeverAllowedOnSecondary;
using Invocation = ExampleVoidCommand::Invocation;
};

class ExampleVoidCommandAlwaysAllowedOnSecondary
: public TypedCommand<ExampleVoidCommandAlwaysAllowedOnSecondary> {
private:
AllowedOnSecondary secondaryAllowed(ServiceContext*) const override {
return Command::AllowedOnSecondary::kAlways;
}

public:
using Request = ExampleVoidAlwaysAllowedOnSecondary;
using Invocation = ExampleVoidCommand::Invocation;
};

class ExampleVoidCommandAllowedOnSecondaryIfOptedIn
: public TypedCommand<ExampleVoidCommandAllowedOnSecondaryIfOptedIn> {
private:
AllowedOnSecondary secondaryAllowed(ServiceContext*) const override {
return Command::AllowedOnSecondary::kOptIn;
}

public:
using Request = ExampleVoidAllowedOnSecondaryIfOptedIn;
using Invocation = ExampleVoidCommand::Invocation;
};

/**
* ExampleVoid command defined using TypedCommand with Derived.
*/
Expand Down
21 changes: 21 additions & 0 deletions src/mongo/db/commands_test_example.idl
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,27 @@ commands:
api_version: ""
fields:
i: int
exampleVoidNeverAllowedOnSecondary:
description: "Similar to exampleVoid, but is not allowed to run on secondaries."
command_name: exampleVoidNeverAllowedOnSecondary
namespace: concatenate_with_db
api_version: ""
fields:
i: int
exampleVoidAlwaysAllowedOnSecondary:
description: "Similar to exampleVoid, but is always allowed to run on secondaries."
command_name: exampleVoidAlwaysAllowedOnSecondary
namespace: concatenate_with_db
api_version: ""
fields:
i: int
exampleVoidAllowedOnSecondaryIfOptedIn:
description: "Similar to exampleVoid, but is allowed secondaries if opted in."
command_name: exampleVoidAllowedOnSecondaryIfOptedIn
namespace: concatenate_with_db
api_version: ""
fields:
i: int
exampleMinimal:
description: "like exampleIncrement, but use MinimalInvocationBase"
command_name: exampleMinimal
Expand Down
41 changes: 38 additions & 3 deletions src/mongo/db/replica_set_endpoint_util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "mongo/db/replica_set_endpoint_util.h"

#include "mongo/bson/bsontypes.h"
#include "mongo/db/cluster_role.h"
#include "mongo/db/commands.h"
#include "mongo/db/multitenancy_gen.h"
#include "mongo/db/namespace_string.h"
Expand Down Expand Up @@ -82,13 +83,27 @@ bool isTargetedCommandRequest(OperationContext* opCtx, const OpMsgRequest& opMsg
isCurrentOpAggregateCommandRequest(opMsgReq);
}

/**
* Returns the service for the specified role.
*/
Service* getService(OperationContext* opCtx, ClusterRole role) {
auto service = opCtx->getServiceContext()->getService(role);
invariant(service);
return service;
}

/**
* Returns the router service.
*/
Service* getRouterService(OperationContext* opCtx) {
auto routerService = opCtx->getServiceContext()->getService(ClusterRole::RouterServer);
invariant(routerService);
return routerService;
return getService(opCtx, ClusterRole::RouterServer);
}

/**
* Returns the shard service.
*/
Service* getShardService(OperationContext* opCtx) {
return getService(opCtx, ClusterRole::ShardServer);
}

/**
Expand Down Expand Up @@ -143,6 +158,26 @@ bool shouldRouteRequest(OperationContext* opCtx, const OpMsgRequest& opMsgReq) {
return false;
}


auto shardCommand =
CommandHelpers::findCommand(getShardService(opCtx), opMsgReq.getCommandName());
if (shardCommand &&
shardCommand->secondaryAllowed(opCtx->getServiceContext()) ==
BasicCommand::AllowedOnSecondary::kNever) {
// On the shard service, this is a primary-only command. Make it fail with a
// NotWritablePrimary error if this mongod is not the primary. This check is necessary for
// providing replica set user experience (i.e. writes should fail on secondaries) since by
// going through the router code paths the command would get routed to the primary and
// succeed whether or not this mongod is the primary. This check only needs to be
// best-effort since if this mongod steps down after the check, the write would be routed
// to the new primary. For this reason, just use canAcceptWritesForDatabase_UNSAFE to
// avoid taking the RSTL lock or the ReplicationCoordinator's mutex.
uassert(ErrorCodes::NotWritablePrimary,
"This command is only allowed on a primary",
repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesForDatabase_UNSAFE(
opCtx, opMsgReq.getDbName()));
}

// There is nothing that will prevent the cluster from becoming multi-shard (i.e. no longer
// supporting as replica set endpoint) after the check here is done. However, the contract is
// that users must have transitioned to the sharded connection string (i.e. connect to mongoses
Expand Down
79 changes: 73 additions & 6 deletions src/mongo/db/replica_set_endpoint_util_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,20 @@
* it in the license file.
*/

#include <memory>

#include "mongo/bson/json.h"
#include "mongo/db/cluster_role.h"
#include "mongo/db/database_name.h"
#include "mongo/db/replica_set_endpoint_util.h"

#include "mongo/util/database_name_util.h"
#include <memory>

#include "mongo/bson/json.h"
#include "mongo/db/auth/authz_manager_external_state_mock.h"
#include "mongo/db/cluster_role.h"
#include "mongo/db/commands.h"
#include "mongo/db/commands_test_example.h"
#include "mongo/db/commands_test_example_gen.h"
#include "mongo/db/database_name.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/repl/member_state.h"
#include "mongo/db/repl/replication_coordinator_mock.h"
#include "mongo/db/replica_set_endpoint_sharding_state.h"
#include "mongo/db/replica_set_endpoint_test_fixture.h"
#include "mongo/db/service_context_d_test_fixture.h"
Expand All @@ -51,6 +51,7 @@
#include "mongo/rpc/op_msg.h"
#include "mongo/s/grid.h"
#include "mongo/transport/transport_layer_mock.h"
#include "mongo/unittest/assert.h"
#include "mongo/unittest/death_test.h"

#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest
Expand All @@ -74,12 +75,20 @@ class ReplicaSetEndpointUtilTest : public ServiceContextMongoDTest, public Repli
ReplicaSetEndpointShardingState::get(getServiceContext())->setIsConfigShard(true);
setHasTwoOrShardsClusterParameter(false);
ASSERT_FALSE(getHasTwoOrShardsClusterParameter());

auto replCoord = std::make_unique<repl::ReplicationCoordinatorMock>(getServiceContext());
ASSERT_OK(replCoord->setFollowerMode(repl::MemberState::RS_SECONDARY));
repl::ReplicationCoordinator::set(getServiceContext(), std::move(replCoord));
}

transport::TransportLayerMock& getTransportLayer() {
return _transportLayer;
}

repl::ReplicationCoordinator* getReplicationCoordinator() {
return repl::ReplicationCoordinator::get(getServiceContext());
}

const std::string kTestDbName = "testDb";
const std::string kTestCollName = "testColl";
const TenantId kTestTenantId{OID::gen()};
Expand Down Expand Up @@ -143,6 +152,15 @@ TEST_F(ReplicaSetEndpointUtilTest, IsReplicaSetEndpointClient_FeatureFlagDisable
MONGO_REGISTER_COMMAND(commands_test_example::ExampleIncrementCommand).forShard().forRouter();
MONGO_REGISTER_COMMAND(commands_test_example::ExampleMinimalCommand).forShard();
MONGO_REGISTER_COMMAND(commands_test_example::ExampleVoidCommand).forRouter();
MONGO_REGISTER_COMMAND(commands_test_example::ExampleVoidCommandNeverAllowedOnSecondary)
.forRouter()
.forShard();
MONGO_REGISTER_COMMAND(commands_test_example::ExampleVoidCommandAlwaysAllowedOnSecondary)
.forRouter()
.forShard();
MONGO_REGISTER_COMMAND(commands_test_example::ExampleVoidCommandAllowedOnSecondaryIfOptedIn)
.forRouter()
.forShard();

TEST_F(ReplicaSetEndpointUtilTest, ShouldRoute_RouterAndShardCommand) {
std::shared_ptr<transport::Session> session = getTransportLayer().createSession();
Expand Down Expand Up @@ -180,6 +198,55 @@ TEST_F(ReplicaSetEndpointUtilTest, ShouldRoute_RouterOnlyCommand) {
ASSERT(shouldRouteRequest(opCtx.get(), opMsgRequest));
}

TEST_F(ReplicaSetEndpointUtilTest, ShouldRoute_RouterAndShardCommand_NeverAllowedOnSecondary) {
std::shared_ptr<transport::Session> session = getTransportLayer().createSession();
auto client = getServiceContext()->getService()->makeClient(
"RouterAndShardCommand_NeverAllowedOnSecondary", session);
auto opCtx = client->makeOperationContext();

auto ns = NamespaceString::createNamespaceString_forTest(kTestDbName, kTestCollName);
commands_test_example::ExampleVoidNeverAllowedOnSecondary voidCmd(ns, 0);
auto opMsgRequest = mongo::OpMsgRequest::fromDBAndBody(ns.dbName(), voidCmd.toBSON({}));

ASSERT_OK(getReplicationCoordinator()->setFollowerMode(repl::MemberState::RS_PRIMARY));
ASSERT(shouldRouteRequest(opCtx.get(), opMsgRequest));
ASSERT_OK(getReplicationCoordinator()->setFollowerMode(repl::MemberState::RS_SECONDARY));
ASSERT_THROWS_CODE(
shouldRouteRequest(opCtx.get(), opMsgRequest), DBException, ErrorCodes::NotWritablePrimary);
}

TEST_F(ReplicaSetEndpointUtilTest, ShouldRoute_RouterAndShardCommand_AlwaysAllowedOnSecondary) {
std::shared_ptr<transport::Session> session = getTransportLayer().createSession();
auto client = getServiceContext()->getService()->makeClient(
"RouterAndShardCommand_AlwaysAllowedOnSecondary", session);
auto opCtx = client->makeOperationContext();

auto ns = NamespaceString::createNamespaceString_forTest(kTestDbName, kTestCollName);
commands_test_example::ExampleVoidAlwaysAllowedOnSecondary voidCmd(ns, 0);
auto opMsgRequest = mongo::OpMsgRequest::fromDBAndBody(ns.dbName(), voidCmd.toBSON({}));

ASSERT_OK(getReplicationCoordinator()->setFollowerMode(repl::MemberState::RS_PRIMARY));
ASSERT(shouldRouteRequest(opCtx.get(), opMsgRequest));
ASSERT_OK(getReplicationCoordinator()->setFollowerMode(repl::MemberState::RS_SECONDARY));
ASSERT(shouldRouteRequest(opCtx.get(), opMsgRequest));
}

TEST_F(ReplicaSetEndpointUtilTest, ShouldRoute_RouterAndShardCommand_AllowedOnSecondaryIfOptedIn) {
std::shared_ptr<transport::Session> session = getTransportLayer().createSession();
auto client = getServiceContext()->getService()->makeClient(
"RouterAndShardCommand_AllowedOnSecondaryIfOptedIn", session);
auto opCtx = client->makeOperationContext();

auto ns = NamespaceString::createNamespaceString_forTest(kTestDbName, kTestCollName);
commands_test_example::ExampleVoidAllowedOnSecondaryIfOptedIn voidCmd(ns, 0);
auto opMsgRequest = mongo::OpMsgRequest::fromDBAndBody(ns.dbName(), voidCmd.toBSON({}));

ASSERT_OK(getReplicationCoordinator()->setFollowerMode(repl::MemberState::RS_PRIMARY));
ASSERT(shouldRouteRequest(opCtx.get(), opMsgRequest));
ASSERT_OK(getReplicationCoordinator()->setFollowerMode(repl::MemberState::RS_SECONDARY));
ASSERT(shouldRouteRequest(opCtx.get(), opMsgRequest));
}

TEST_F(ReplicaSetEndpointUtilTest, ShouldNotRoute_LocalDatabase) {
std::shared_ptr<transport::Session> session = getTransportLayer().createSession();
auto client = getServiceContext()->getService()->makeClient("LocalDatabase", session);
Expand Down

0 comments on commit 5f0f53e

Please sign in to comment.