Skip to content

Commit

Permalink
SERVER-85164 Modifies TransactionRouter to add txn metadata for Start…
Browse files Browse the repository at this point in the history
…OrContinue (#18704)

GitOrigin-RevId: c122b6fbe6ec565222e509f1ae6336528cb45edd
  • Loading branch information
israelhsu authored and MongoDB Bot committed Feb 8, 2024
1 parent 1726a84 commit cbe87b3
Show file tree
Hide file tree
Showing 6 changed files with 97 additions and 40 deletions.
14 changes: 9 additions & 5 deletions src/mongo/db/service_entry_point_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -946,11 +946,15 @@ void CheckoutSessionAndInvokeCommand::_checkOutSession() {
try {
auto opObserver = opCtx->getServiceContext()->getOpObserver();
opObserver->onTransactionStart(opCtx);
auto transactionAction = sessionOptions.getStartTransaction()
? TransactionParticipant::TransactionActions::kStart
: (sessionOptions.getAutocommit()
? TransactionParticipant::TransactionActions::kContinue
: TransactionParticipant::TransactionActions::kNone);
auto transactionAction = TransactionParticipant::TransactionActions::kNone;
if (sessionOptions.getStartTransaction()) {
transactionAction = TransactionParticipant::TransactionActions::kStart;
} else if (sessionOptions.getStartOrContinueTransaction()) {
transactionAction =
TransactionParticipant::TransactionActions::kStartOrContinue;
} else if (sessionOptions.getAutocommit()) {
transactionAction = TransactionParticipant::TransactionActions::kContinue;
}
txnParticipant.beginOrContinue(
opCtx,
{*sessionOptions.getTxnNumber(), sessionOptions.getTxnRetryCounter()},
Expand Down
10 changes: 10 additions & 0 deletions src/mongo/db/session/logical_session_id.idl
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,11 @@ structs:
transaction."
type: bool
optional: true
startOrContinueTransaction:
description: "Used to indicate that recipient should be added as a new participant if not
already part of this transaction, or continue the transaction otherwise."
type: bool
optional: true

OperationSessionInfoFromClientBase:
description: "Parser for pulling out session transaction metadata from commands, as opposed to
Expand Down Expand Up @@ -192,6 +197,11 @@ structs:
transaction."
type: bool
optional: true
startOrContinueTransaction:
description: "Used to indicate that recipient should be added as a new participant if not
already part of this transaction, or continue the transaction otherwise."
type: bool
optional: true
coordinator:
description: "Indicates that this shard is the coordinator shard for the transaction two-phase commit."
type: bool
Expand Down
2 changes: 0 additions & 2 deletions src/mongo/s/multi_statement_transaction_requests_sender.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,6 @@ std::vector<AsyncRequestsSender::Request> attachTxnDetails(
for (const auto& request : requests) {
newRequests.emplace_back(
request.shardId,
// TODO SERVER-85164 txnRouter should attach startOrContinue if
// activeTxnParticipantAddParticipants is true
txnRouter.attachTxnFieldsIfNeeded(opCtx, request.shardId, request.cmdObj));
}

Expand Down
27 changes: 22 additions & 5 deletions src/mongo/s/transaction_router.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,7 @@ BSONObj TransactionRouter::Participant::attachTxnFieldsIfNeeded(
OperationContext* opCtx,
BSONObj cmd,
bool isFirstStatementInThisParticipant,
bool addingParticipantViaSubRouter,
bool hasTxnCreatedAnyDatabase) const {
bool hasStartTxn = false;
bool hasAutoCommit = false;
Expand Down Expand Up @@ -491,7 +492,8 @@ BSONObj TransactionRouter::Participant::attachTxnFieldsIfNeeded(
auto cmdName = cmd.firstElement().fieldNameStringData();
auto service = opCtx->getService();
bool mustStartTransaction =
isFirstStatementInThisParticipant && !isTransactionCommand(service, cmdName);
(isFirstStatementInThisParticipant || addingParticipantViaSubRouter) &&
!isTransactionCommand(service, cmdName);

// Strip the command of its read concern if it should not have one.
if (!mustStartTransaction) {
Expand All @@ -509,6 +511,7 @@ BSONObj TransactionRouter::Participant::attachTxnFieldsIfNeeded(
sharedOptions.atClusterTimeForSnapshotReadConcern,
sharedOptions.placementConflictTimeForNonSnapshotReadConcern,
!hasStartTxn,
addingParticipantViaSubRouter,
hasTxnCreatedAnyDatabase)
: appendFieldsForContinueTransaction(
std::move(cmd),
Expand Down Expand Up @@ -716,7 +719,11 @@ BSONObj TransactionRouter::Router::attachTxnFieldsIfNeeded(OperationContext* opC
"txnRetryCounter"_attr = o().txnNumberAndRetryCounter.getTxnRetryCounter(),
"shardId"_attr = shardId,
"request"_attr = redact(cmdObj));
return txnPart->attachTxnFieldsIfNeeded(opCtx, cmdObj, false, hasTxnCreatedAnyDatabase);
return txnPart->attachTxnFieldsIfNeeded(opCtx,
cmdObj,
false /* isFirstStatementInThisParticipant */,
false /* addingParticipantViaSubRouter */,
hasTxnCreatedAnyDatabase);
}

auto txnPart = _createParticipant(opCtx, shardId);
Expand All @@ -733,7 +740,11 @@ BSONObj TransactionRouter::Router::attachTxnFieldsIfNeeded(OperationContext* opC
RouterTransactionsMetrics::get(opCtx)->incrementTotalContactedParticipants();
}

return txnPart.attachTxnFieldsIfNeeded(opCtx, cmdObj, true, hasTxnCreatedAnyDatabase);
return txnPart.attachTxnFieldsIfNeeded(opCtx,
cmdObj,
true /* isFirstStatementInThisParticipant */,
o().subRouter,
hasTxnCreatedAnyDatabase);
}

const TransactionRouter::Participant* TransactionRouter::Router::getParticipant(
Expand Down Expand Up @@ -1087,7 +1098,7 @@ void TransactionRouter::Router::_beginTxn(OperationContext* opCtx,
o().txnNumberAndRetryCounter.getTxnNumber());

switch (action) {
case TransactionActions::kStartOrContinue: // fall through to case kStart
case TransactionActions::kStartOrContinue:
case TransactionActions::kStart: {
_resetRouterStateForStartTransaction(opCtx, txnNumberAndRetryCounter);
if (action == TransactionActions::kStartOrContinue) {
Expand Down Expand Up @@ -1155,6 +1166,7 @@ void TransactionRouter::Router::beginOrContinueTxn(OperationContext* opCtx,
uassert(ErrorCodes::InterruptedAtShutdown,
"New transaction cannot be started at shutdown.",
!SessionCatalog::get(opCtx)->getDisallowNewTransactions());

_beginTxn(opCtx, txnNumberAndRetryCounter, action);
}

Expand Down Expand Up @@ -2161,6 +2173,7 @@ BSONObj TransactionRouter::appendFieldsForStartTransaction(
const boost::optional<LogicalTime>& atClusterTimeForSnapshotReadConcern,
const boost::optional<LogicalTime>& placementConflictTimeForNonSnapshotReadConcern,
bool doAppendStartTransaction,
bool doAppendStartOrContinueTransaction,
bool hasTxnCreatedAnyDatabase) {
BSONObjBuilder cmdBob;

Expand Down Expand Up @@ -2195,10 +2208,14 @@ BSONObj TransactionRouter::appendFieldsForStartTransaction(
databaseVersion->serialize(&dbvBuilder);
}

if (doAppendStartTransaction) {
if (doAppendStartOrContinueTransaction) {
cmdBob.append(OperationSessionInfo::kStartOrContinueTransactionFieldName,
doAppendStartOrContinueTransaction);
} else if (doAppendStartTransaction) {
cmdBob.append(OperationSessionInfoFromClient::kStartTransactionFieldName, true);
}


return cmdBob.obj();
}

Expand Down
2 changes: 2 additions & 0 deletions src/mongo/s/transaction_router.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ class TransactionRouter {
BSONObj attachTxnFieldsIfNeeded(OperationContext* opCtx,
BSONObj cmd,
bool isFirstStatementInThisParticipant,
bool addingParticipantViaSubRouter,
bool hasTxnCreatedAnyDatabase) const;

// True if the participant has been chosen as the coordinator for its transaction
Expand Down Expand Up @@ -783,6 +784,7 @@ class TransactionRouter {
const boost::optional<LogicalTime>& atClusterTimeForSnapshotReadConcern,
const boost::optional<LogicalTime>& placementConflictTimeForNonSnapshotReadConcern,
bool doAppendStartTransaction,
bool startOrContinueTransaction,
bool hasTxnCreatedAnyDatabase);

/**
Expand Down
82 changes: 54 additions & 28 deletions src/mongo/s/transaction_router_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -389,20 +389,30 @@ TEST_F(TransactionRouterTestWithDefaultSession, StartTxnWithStartOrContinue) {
txnRouter.beginOrContinueTxn(
operationContext(), txnNum + 1, TransactionRouter::TransactionActions::kStartOrContinue);

BSONObj expectedNewObj = BSON("insert"
<< "test"
<< "readConcern"
<< BSON("level"
<< "snapshot"
<< "atClusterTime" << kInMemoryLogicalTime.asTimestamp())
<< "startTransaction" << true << "coordinator" << true
<< "autocommit" << false << "txnNumber" << txnNum + 1);
const auto expectedShardVersion = exampleShardVersion();
const auto expectedDatabaseVersion = exampleDatabaseVersion();

{
BSONObj expectedNewObj = BSON("insert"
<< "test"
<< "readConcern"
<< BSON("level"
<< "snapshot"
<< "atClusterTime"
<< kInMemoryLogicalTime.asTimestamp())
<< "shardVersion" << expectedShardVersion.toBSON()
<< "databaseVersion" << expectedDatabaseVersion.toBSON()
<< "startOrContinueTransaction" << true << "coordinator"
<< true << "autocommit" << false << "txnNumber"
<< txnNum + 1);
auto newCmd = txnRouter.attachTxnFieldsIfNeeded(operationContext(),
shard1,
BSON("insert"
<< "test"));
<< "test"
<< "databaseVersion"
<< expectedDatabaseVersion.toBSON()
<< "shardVersion"
<< expectedShardVersion.toBSON()));
ASSERT_BSONOBJ_EQ(expectedNewObj, newCmd);
}
}
Expand All @@ -421,7 +431,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, SubRouterCannotCommit) {
<< BSON("level"
<< "snapshot"
<< "atClusterTime" << kInMemoryLogicalTime.asTimestamp())
<< "startTransaction" << true << "coordinator" << true
<< "startOrContinueTransaction" << true << "coordinator" << true
<< "autocommit" << false << "txnNumber" << txnNum + 1);

{
Expand Down Expand Up @@ -453,7 +463,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, SubRouterCannotAbort) {
<< BSON("level"
<< "snapshot"
<< "atClusterTime" << kInMemoryLogicalTime.asTimestamp())
<< "startTransaction" << true << "coordinator" << true
<< "startOrContinueTransaction" << true << "coordinator" << true
<< "autocommit" << false << "txnNumber" << txnNum + 1);

{
Expand Down Expand Up @@ -482,7 +492,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, SubRouterCannotImplicitlyAbort)
<< BSON("level"
<< "snapshot"
<< "atClusterTime" << kInMemoryLogicalTime.asTimestamp())
<< "startTransaction" << true << "coordinator" << true
<< "startOrContinueTransaction" << true << "coordinator" << true
<< "autocommit" << false << "txnNumber" << txnNum + 1);

{
Expand Down Expand Up @@ -517,21 +527,32 @@ TEST_F(TransactionRouterTestWithDefaultSession, StartOrContinueWithMatchingReadC
txnRouter.beginOrContinueTxn(
operationContext(), txnNum + 1, TransactionRouter::TransactionActions::kStartOrContinue);

BSONObj expectedNewObj = BSON("insert"
<< "test"
<< "readConcern"
<< BSON("level"
<< "majority"
<< "afterClusterTime"
<< kInMemoryLogicalTime.asTimestamp())
<< "startTransaction" << true << "coordinator" << true
<< "autocommit" << false << "txnNumber" << txnNum + 1);

const auto shardVersion = exampleShardVersion();
auto expectedShardVersion = shardVersion;
expectedShardVersion.setPlacementConflictTime(kInMemoryLogicalTime);
const auto databaseVersion = exampleDatabaseVersion();
auto expectedDatabaseVersion = databaseVersion;
expectedDatabaseVersion.setPlacementConflictTime(kInMemoryLogicalTime);
{
auto newCmd = txnRouter.attachTxnFieldsIfNeeded(operationContext(),
shard1,
BSON("insert"
<< "test"));
BSONObj expectedNewObj = BSON("insert"
<< "test"
<< "readConcern"
<< BSON("level"
<< "majority"
<< "afterClusterTime"
<< kInMemoryLogicalTime.asTimestamp())
<< "shardVersion" << expectedShardVersion.toBSON()
<< "databaseVersion" << expectedDatabaseVersion.toBSON()
<< "startOrContinueTransaction" << true << "coordinator"
<< true << "autocommit" << false << "txnNumber"
<< txnNum + 1);
auto newCmd =
txnRouter.attachTxnFieldsIfNeeded(operationContext(),
shard1,
BSON("insert"
<< "test"
<< "databaseVersion" << databaseVersion.toBSON()
<< "shardVersion" << shardVersion.toBSON()));
ASSERT_BSONOBJ_EQ(expectedNewObj, newCmd);
}

Expand Down Expand Up @@ -580,7 +601,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, StartOrContinueWithMismatchedRea
<< "majority"
<< "afterClusterTime"
<< kInMemoryLogicalTime.asTimestamp())
<< "startTransaction" << true << "coordinator" << true
<< "startOrContinueTransaction" << true << "coordinator" << true
<< "autocommit" << false << "txnNumber" << txnNum + 1);

{
Expand Down Expand Up @@ -648,7 +669,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, StartOrContinueWithSnapshotRCSet
<< BSON("level"
<< "snapshot"
<< "atClusterTime" << kInMemoryLogicalTime.asTimestamp())
<< "startTransaction" << true << "coordinator" << true
<< "startOrContinueTransaction" << true << "coordinator" << true
<< "autocommit" << false << "txnNumber" << txnNum + 1);

{
Expand Down Expand Up @@ -1670,6 +1691,7 @@ TEST_F(TransactionRouterTest, AppendFieldsForStartTransactionDefaultRC) {
boost::none,
LogicalTime{Timestamp(10, 1)},
true /* doAppendStartTransaction */,
false /* doAppendStartOrContinueTransaction */,
hasTxnCreatedAnyDatabase);

ASSERT_EQ(result["MyCmd"].numberLong(), 1);
Expand Down Expand Up @@ -1707,6 +1729,7 @@ TEST_F(TransactionRouterTest, AppendFieldsForStartTransactionDefaultRCMajority)
boost::none,
LogicalTime{Timestamp(10, 1)},
true /* doAppendStartTransaction */,
false /* doAppendStartOrContinueTransaction */,
hasTxnCreatedAnyDatabase);

ASSERT_EQ(result["MyCmd"].numberLong(), 1);
Expand Down Expand Up @@ -1748,6 +1771,7 @@ TEST_F(TransactionRouterTest, AppendFieldsForStartTransactionDefaultRCCommandSpe
boost::none,
LogicalTime{Timestamp(10, 1)},
true /* doAppendStartTransaction */,
false /* doAppendStartOrContinueTransaction */,
hasTxnCreatedAnyDatabase);

ASSERT_EQ(result["MyCmd"].numberLong(), 1);
Expand Down Expand Up @@ -1789,6 +1813,7 @@ TEST_F(TransactionRouterTest, AppendFieldsForStartTransactionDefaultRCCommandSpe
LogicalTime(Timestamp(1, 2)),
boost::none,
false /* doAppendStartTransaction */,
false /* doAppendStartOrContinueTransaction */,
hasTxnCreatedAnyDatabase);

ASSERT_EQ(result["MyCmd"].numberLong(), 1);
Expand Down Expand Up @@ -1829,6 +1854,7 @@ TEST_F(TransactionRouterTest,
LogicalTime(Timestamp(1, 2)),
boost::none,
false /* doAppendStartTransaction */,
false /* doAppendStartOrContinueTransaction */,
hasTxnCreatedAnyDatabase);

ASSERT_EQ(result["MyCmd"].numberLong(), 1);
Expand Down

0 comments on commit cbe87b3

Please sign in to comment.