Skip to content

Commit

Permalink
chore(spanner): preserving lock order - R/W mux session (#3348)
Browse files Browse the repository at this point in the history
This PR introduces changes to support the lock order preservation protocol for multiplexed sessions in read/write transactions. According to this protocol, when a read/write transaction on a multiplexed session is retried, the transaction ID from the previous abort must be passed when creating a new transaction. This ensures that the retried transaction is recognized as older, rather than being treated as a new one.

The transaction context object is structured as follows,
```
txn {
    transactionId: The transaction ID.
    previousTransactionId: The transaction ID of the most recently failed transaction.
}
```
  • Loading branch information
harshachinta authored Oct 16, 2024
1 parent 0836101 commit f370394
Show file tree
Hide file tree
Showing 10 changed files with 405 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;

/** Implementation of {@link AsyncTransactionManager}. */
final class AsyncTransactionManagerImpl
Expand Down Expand Up @@ -80,7 +81,19 @@ public TransactionContextFutureImpl beginAsync() {

private ApiFuture<TransactionContext> internalBeginAsync(boolean firstAttempt) {
txnState = TransactionState.STARTED;
txn = session.newTransaction(options);

// Determine the latest transactionId when using a multiplexed session.
ByteString multiplexedSessionPreviousTransactionId = ByteString.EMPTY;
if (txn != null && session.getIsMultiplexed() && !firstAttempt) {
// Use the current transactionId if available, otherwise fallback to the previous aborted
// transactionId.
multiplexedSessionPreviousTransactionId =
txn.transactionId != null ? txn.transactionId : txn.getPreviousTransactionId();
}

txn =
session.newTransaction(
options, /* previousTransactionId = */ multiplexedSessionPreviousTransactionId);
if (firstAttempt) {
session.setActive(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class DatabaseClientImpl implements DatabaseClient {
/* useMultiplexedSessionBlindWrite = */ false,
/* multiplexedSessionDatabaseClient = */ null,
tracer,
false);
/* useMultiplexedSessionForRW = */ false);
}

DatabaseClientImpl(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ static void throwIfTransactionsPending() {
}
}

static TransactionOptions createReadWriteTransactionOptions(Options options) {
static TransactionOptions createReadWriteTransactionOptions(
Options options, ByteString previousTransactionId) {
TransactionOptions.Builder transactionOptions = TransactionOptions.newBuilder();
if (options.withExcludeTxnFromChangeStreams() == Boolean.TRUE) {
transactionOptions.setExcludeTxnFromChangeStreams(true);
Expand All @@ -78,6 +79,10 @@ static TransactionOptions createReadWriteTransactionOptions(Options options) {
if (options.withOptimisticLock() == Boolean.TRUE) {
readWrite.setReadLockMode(TransactionOptions.ReadWrite.ReadLockMode.OPTIMISTIC);
}
if (previousTransactionId != null
&& previousTransactionId != com.google.protobuf.ByteString.EMPTY) {
readWrite.setMultiplexedSessionPreviousTransactionId(previousTransactionId);
}
transactionOptions.setReadWrite(readWrite);
return transactionOptions.build();
}
Expand Down Expand Up @@ -427,13 +432,17 @@ public void close() {
}

ApiFuture<ByteString> beginTransactionAsync(
Options transactionOptions, boolean routeToLeader, Map<SpannerRpc.Option, ?> channelHint) {
Options transactionOptions,
boolean routeToLeader,
Map<SpannerRpc.Option, ?> channelHint,
ByteString previousTransactionId) {
final SettableApiFuture<ByteString> res = SettableApiFuture.create();
final ISpan span = tracer.spanBuilder(SpannerImpl.BEGIN_TRANSACTION);
final BeginTransactionRequest request =
BeginTransactionRequest.newBuilder()
.setSession(getName())
.setOptions(createReadWriteTransactionOptions(transactionOptions))
.setOptions(
createReadWriteTransactionOptions(transactionOptions, previousTransactionId))
.build();
final ApiFuture<Transaction> requestFuture;
try (IScope ignore = tracer.withSpan(span)) {
Expand Down Expand Up @@ -469,11 +478,12 @@ ApiFuture<ByteString> beginTransactionAsync(
return res;
}

TransactionContextImpl newTransaction(Options options) {
TransactionContextImpl newTransaction(Options options, ByteString previousTransactionId) {
return TransactionContextImpl.newBuilder()
.setSession(this)
.setOptions(options)
.setTransactionId(null)
.setPreviousTransactionId(previousTransactionId)
.setOptions(options)
.setTrackTransactionStarter(spanner.getOptions().isTrackTransactionStarter())
.setRpc(spanner.getRpc())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.google.cloud.spanner.Options.TransactionOption;
import com.google.cloud.spanner.SessionImpl.SessionTransaction;
import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;

/** Implementation of {@link TransactionManager}. */
final class TransactionManagerImpl implements TransactionManager, SessionTransaction {
Expand Down Expand Up @@ -53,7 +54,7 @@ public void setSpan(ISpan span) {
public TransactionContext begin() {
Preconditions.checkState(txn == null, "begin can only be called once");
try (IScope s = tracer.withSpan(span)) {
txn = session.newTransaction(options);
txn = session.newTransaction(options, /* previousTransactionId = */ ByteString.EMPTY);
session.setActive(this);
txnState = TransactionState.STARTED;
return txn;
Expand Down Expand Up @@ -102,7 +103,18 @@ public TransactionContext resetForRetry() {
}
try (IScope s = tracer.withSpan(span)) {
boolean useInlinedBegin = txn.transactionId != null;
txn = session.newTransaction(options);

// Determine the latest transactionId when using a multiplexed session.
ByteString multiplexedSessionPreviousTransactionId = ByteString.EMPTY;
if (session.getIsMultiplexed()) {
// Use the current transactionId if available, otherwise fallback to the previous aborted
// transactionId.
multiplexedSessionPreviousTransactionId =
txn.transactionId != null ? txn.transactionId : txn.getPreviousTransactionId();
}
txn =
session.newTransaction(
options, /* previousTransactionId = */ multiplexedSessionPreviousTransactionId);
if (!useInlinedBegin) {
txn.ensureTxn();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ static class Builder extends AbstractReadContext.Builder<Builder, TransactionCon

private Clock clock = new Clock();
private ByteString transactionId;
// This field is set only when the transaction is created during a retry and uses a
// multiplexed session.
private ByteString previousTransactionId;
private Options options;
private boolean trackTransactionStarter;

Expand All @@ -118,6 +121,11 @@ Builder setTrackTransactionStarter(boolean trackTransactionStarter) {
return self();
}

Builder setPreviousTransactionId(ByteString previousTransactionId) {
this.previousTransactionId = previousTransactionId;
return self();
}

@Override
TransactionContextImpl build() {
Preconditions.checkState(this.options != null, "Options must be set");
Expand Down Expand Up @@ -201,6 +209,8 @@ public void removeListener(Runnable listener) {

volatile ByteString transactionId;

final ByteString previousTransactionId;

private CommitResponse commitResponse;
private final Clock clock;

Expand All @@ -216,6 +226,7 @@ private TransactionContextImpl(Builder builder) {
this.channelHint =
getChannelHintOptions(
session.getOptions(), ThreadLocalRandom.current().nextLong(Long.MAX_VALUE));
this.previousTransactionId = builder.previousTransactionId;
}

@Override
Expand Down Expand Up @@ -246,6 +257,10 @@ private void decreaseAsyncOperations() {
}
}

ByteString getPreviousTransactionId() {
return this.previousTransactionId;
}

@Override
public void close() {
// Only mark the context as closed, but do not end the tracer span, as that is done by the
Expand Down Expand Up @@ -283,7 +298,8 @@ ApiFuture<Void> ensureTxnAsync() {
private void createTxnAsync(final SettableApiFuture<Void> res) {
span.addAnnotation("Creating Transaction");
final ApiFuture<ByteString> fut =
session.beginTransactionAsync(options, isRouteToLeader(), getTransactionChannelHint());
session.beginTransactionAsync(
options, isRouteToLeader(), getTransactionChannelHint(), getPreviousTransactionId());
fut.addListener(
() -> {
try {
Expand Down Expand Up @@ -558,7 +574,9 @@ TransactionSelector getTransactionSelector() {
}
if (tx == null) {
return TransactionSelector.newBuilder()
.setBegin(SessionImpl.createReadWriteTransactionOptions(options))
.setBegin(
SessionImpl.createReadWriteTransactionOptions(
options, getPreviousTransactionId()))
.build();
} else {
// Wait for the transaction to come available. The tx.get() call will fail with an
Expand Down Expand Up @@ -1079,7 +1097,7 @@ public TransactionRunner allowNestedTransaction() {
TransactionRunnerImpl(SessionImpl session, TransactionOption... options) {
this.session = session;
this.options = Options.fromTransactionOptions(options);
this.txn = session.newTransaction(this.options);
this.txn = session.newTransaction(this.options, /* previousTransactionId = */ ByteString.EMPTY);
this.tracer = session.getTracer();
}

Expand Down Expand Up @@ -1118,7 +1136,19 @@ private <T> T runInternal(final TransactionCallable<T> txCallable) {
// Do not inline the BeginTransaction during a retry if the initial attempt did not
// actually start a transaction.
useInlinedBegin = txn.transactionId != null;
txn = session.newTransaction(options);

// Determine the latest transactionId when using a multiplexed session.
ByteString multiplexedSessionPreviousTransactionId = ByteString.EMPTY;
if (session.getIsMultiplexed()) {
// Use the current transactionId if available, otherwise fallback to the previous
// transactionId.
multiplexedSessionPreviousTransactionId =
txn.transactionId != null ? txn.transactionId : txn.getPreviousTransactionId();
}

txn =
session.newTransaction(
options, /* previousTransactionId = */ multiplexedSessionPreviousTransactionId);
}
checkState(
isValid, "TransactionRunner has been invalidated by a new operation on the session");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,18 @@

package com.google.cloud.spanner;

import static org.junit.Assert.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.google.api.core.ApiFutures;
import com.google.cloud.Timestamp;
import com.google.protobuf.ByteString;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Scope;
import org.junit.Test;
Expand All @@ -42,7 +48,7 @@ public void testCommitReturnsCommitStats() {
when(oTspan.makeCurrent()).thenReturn(mock(Scope.class));
try (AsyncTransactionManagerImpl manager =
new AsyncTransactionManagerImpl(session, span, Options.commitStats())) {
when(session.newTransaction(Options.fromTransactionOptions(Options.commitStats())))
when(session.newTransaction(eq(Options.fromTransactionOptions(Options.commitStats())), any()))
.thenReturn(transaction);
when(transaction.ensureTxnAsync()).thenReturn(ApiFutures.immediateFuture(null));
Timestamp commitTimestamp = Timestamp.ofTimeMicroseconds(1);
Expand All @@ -54,4 +60,67 @@ public void testCommitReturnsCommitStats() {
verify(transaction).commitAsync();
}
}

@Test
public void testRetryUsesPreviousTransactionIdOnMultiplexedSession() {
// Set up mock transaction IDs
final ByteString mockTransactionId = ByteString.copyFromUtf8("mockTransactionId");
final ByteString mockPreviousTransactionId =
ByteString.copyFromUtf8("mockPreviousTransactionId");

Span oTspan = mock(Span.class);
ISpan span = new OpenTelemetrySpan(oTspan);
when(oTspan.makeCurrent()).thenReturn(mock(Scope.class));
// Mark the session as multiplexed.
when(session.getIsMultiplexed()).thenReturn(true);

// Initialize a mock transaction with transactionId = null, previousTransactionId = null.
transaction = mock(TransactionRunnerImpl.TransactionContextImpl.class);
when(transaction.ensureTxnAsync()).thenReturn(ApiFutures.immediateFuture(null));
when(session.newTransaction(eq(Options.fromTransactionOptions(Options.commitStats())), any()))
.thenReturn(transaction);

// Simulate an ABORTED error being thrown when `commitAsync()` is called.
doThrow(SpannerExceptionFactory.newSpannerException(ErrorCode.ABORTED, ""))
.when(transaction)
.commitAsync();

try (AsyncTransactionManagerImpl manager =
new AsyncTransactionManagerImpl(session, span, Options.commitStats())) {
manager.beginAsync();

// Verify that for the first transaction attempt, the `previousTransactionId` is
// ByteString.EMPTY.
// This is because no transaction has been previously aborted at this point.
verify(session)
.newTransaction(Options.fromTransactionOptions(Options.commitStats()), ByteString.EMPTY);
assertThrows(AbortedException.class, manager::commitAsync);
clearInvocations(session);

// Mock the transaction object to contain transactionID=null and
// previousTransactionId=mockPreviousTransactionId
when(transaction.getPreviousTransactionId()).thenReturn(mockPreviousTransactionId);
manager.resetForRetryAsync();
// Verify that in the first retry attempt, the `previousTransactionId`
// (mockPreviousTransactionId) is passed to the new transaction.
// This allows Spanner to retry the transaction using the ID of the aborted transaction.
verify(session)
.newTransaction(
Options.fromTransactionOptions(Options.commitStats()), mockPreviousTransactionId);
assertThrows(AbortedException.class, manager::commitAsync);
clearInvocations(session);

// Mock the transaction object to contain transactionID=mockTransactionId and
// previousTransactionId=mockPreviousTransactionId and transactionID = null
transaction.transactionId = mockTransactionId;
manager.resetForRetryAsync();
// Verify that the latest `transactionId` (mockTransactionId) is used in the retry.
// This ensures the retry logic is working as expected with the latest transaction ID.
verify(session)
.newTransaction(Options.fromTransactionOptions(Options.commitStats()), mockTransactionId);

when(transaction.rollbackAsync()).thenReturn(ApiFutures.immediateFuture(null));
manager.closeAsync();
}
}
}
Loading

0 comments on commit f370394

Please sign in to comment.