Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(spanner): preserving lock order - R/W mux session #3348

Merged
merged 18 commits into from
Oct 16, 2024
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;

/** Implementation of {@link AsyncTransactionManager}. */
final class AsyncTransactionManagerImpl
Expand Down Expand Up @@ -80,7 +81,19 @@ public TransactionContextFutureImpl beginAsync() {

private ApiFuture<TransactionContext> internalBeginAsync(boolean firstAttempt) {
txnState = TransactionState.STARTED;
txn = session.newTransaction(options);

// Determine the latest transactionId when using a multiplexed session.
ByteString multiplexedSessionPreviousTransactionId = ByteString.EMPTY;
if (txn != null && session.getIsMultiplexed() && !firstAttempt) {
// Use the current transactionId if available, otherwise fallback to the previous aborted
// transactionId.
multiplexedSessionPreviousTransactionId =
txn.transactionId != null ? txn.transactionId : txn.previousTransactionId;
}

txn =
session.newTransaction(
options, /* previousTransactionId = */ multiplexedSessionPreviousTransactionId);
if (firstAttempt) {
session.setActive(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class DatabaseClientImpl implements DatabaseClient {
/* useMultiplexedSessionBlindWrite = */ false,
/* multiplexedSessionDatabaseClient = */ null,
tracer,
false);
/* useMultiplexedSessionForRW = */ false);
}

DatabaseClientImpl(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ static void throwIfTransactionsPending() {
}
}

static TransactionOptions createReadWriteTransactionOptions(Options options) {
static TransactionOptions createReadWriteTransactionOptions(
Options options, ByteString previousTransactionId) {
TransactionOptions.Builder transactionOptions = TransactionOptions.newBuilder();
if (options.withExcludeTxnFromChangeStreams() == Boolean.TRUE) {
transactionOptions.setExcludeTxnFromChangeStreams(true);
Expand All @@ -78,6 +79,10 @@ static TransactionOptions createReadWriteTransactionOptions(Options options) {
if (options.withOptimisticLock() == Boolean.TRUE) {
readWrite.setReadLockMode(TransactionOptions.ReadWrite.ReadLockMode.OPTIMISTIC);
}
if (previousTransactionId != null
harshachinta marked this conversation as resolved.
Show resolved Hide resolved
&& previousTransactionId != com.google.protobuf.ByteString.EMPTY) {
readWrite.setMultiplexedSessionPreviousTransactionId(previousTransactionId);
}
transactionOptions.setReadWrite(readWrite);
return transactionOptions.build();
}
Expand Down Expand Up @@ -427,13 +432,17 @@ public void close() {
}

ApiFuture<ByteString> beginTransactionAsync(
Options transactionOptions, boolean routeToLeader, Map<SpannerRpc.Option, ?> channelHint) {
Options transactionOptions,
boolean routeToLeader,
Map<SpannerRpc.Option, ?> channelHint,
ByteString previousTransactionId) {
final SettableApiFuture<ByteString> res = SettableApiFuture.create();
final ISpan span = tracer.spanBuilder(SpannerImpl.BEGIN_TRANSACTION);
final BeginTransactionRequest request =
BeginTransactionRequest.newBuilder()
.setSession(getName())
.setOptions(createReadWriteTransactionOptions(transactionOptions))
.setOptions(
createReadWriteTransactionOptions(transactionOptions, previousTransactionId))
.build();
final ApiFuture<Transaction> requestFuture;
try (IScope ignore = tracer.withSpan(span)) {
Expand Down Expand Up @@ -469,11 +478,12 @@ ApiFuture<ByteString> beginTransactionAsync(
return res;
}

TransactionContextImpl newTransaction(Options options) {
TransactionContextImpl newTransaction(Options options, ByteString previousTransactionId) {
return TransactionContextImpl.newBuilder()
.setSession(this)
.setOptions(options)
.setTransactionId(null)
.setPreviousTransactionId(previousTransactionId)
.setOptions(options)
.setTrackTransactionStarter(spanner.getOptions().isTrackTransactionStarter())
.setRpc(spanner.getRpc())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.google.cloud.spanner.Options.TransactionOption;
import com.google.cloud.spanner.SessionImpl.SessionTransaction;
import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;

/** Implementation of {@link TransactionManager}. */
final class TransactionManagerImpl implements TransactionManager, SessionTransaction {
Expand Down Expand Up @@ -53,7 +54,7 @@ public void setSpan(ISpan span) {
public TransactionContext begin() {
Preconditions.checkState(txn == null, "begin can only be called once");
try (IScope s = tracer.withSpan(span)) {
txn = session.newTransaction(options);
txn = session.newTransaction(options, /* previousTransactionId = */ ByteString.EMPTY);
session.setActive(this);
txnState = TransactionState.STARTED;
return txn;
Expand Down Expand Up @@ -102,7 +103,18 @@ public TransactionContext resetForRetry() {
}
try (IScope s = tracer.withSpan(span)) {
boolean useInlinedBegin = txn.transactionId != null;
txn = session.newTransaction(options);

// Determine the latest transactionId when using a multiplexed session.
ByteString multiplexedSessionPreviousTransactionId = ByteString.EMPTY;
if (session.getIsMultiplexed()) {
// Use the current transactionId if available, otherwise fallback to the previous aborted
// transactionId.
multiplexedSessionPreviousTransactionId =
txn.transactionId != null ? txn.transactionId : txn.previousTransactionId;
}
txn =
session.newTransaction(
options, /* previousTransactionId = */ multiplexedSessionPreviousTransactionId);
if (!useInlinedBegin) {
txn.ensureTxn();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ static class Builder extends AbstractReadContext.Builder<Builder, TransactionCon

private Clock clock = new Clock();
private ByteString transactionId;
// This field is set only when the transaction is created during a retry and uses a
// multiplexed session.
private ByteString previousTransactionId;
private Options options;
private boolean trackTransactionStarter;

Expand All @@ -118,6 +121,11 @@ Builder setTrackTransactionStarter(boolean trackTransactionStarter) {
return self();
}

Builder setPreviousTransactionId(ByteString previousTransactionId) {
this.previousTransactionId = previousTransactionId;
return self();
}

@Override
TransactionContextImpl build() {
Preconditions.checkState(this.options != null, "Options must be set");
Expand Down Expand Up @@ -201,6 +209,8 @@ public void removeListener(Runnable listener) {

volatile ByteString transactionId;

ByteString previousTransactionId;
harshachinta marked this conversation as resolved.
Show resolved Hide resolved

private CommitResponse commitResponse;
private final Clock clock;

Expand All @@ -216,6 +226,7 @@ private TransactionContextImpl(Builder builder) {
this.channelHint =
getChannelHintOptions(
session.getOptions(), ThreadLocalRandom.current().nextLong(Long.MAX_VALUE));
this.previousTransactionId = builder.previousTransactionId;
}

@Override
Expand Down Expand Up @@ -283,7 +294,8 @@ ApiFuture<Void> ensureTxnAsync() {
private void createTxnAsync(final SettableApiFuture<Void> res) {
span.addAnnotation("Creating Transaction");
final ApiFuture<ByteString> fut =
session.beginTransactionAsync(options, isRouteToLeader(), getTransactionChannelHint());
session.beginTransactionAsync(
options, isRouteToLeader(), getTransactionChannelHint(), previousTransactionId);
fut.addListener(
() -> {
try {
Expand Down Expand Up @@ -558,7 +570,8 @@ TransactionSelector getTransactionSelector() {
}
if (tx == null) {
return TransactionSelector.newBuilder()
.setBegin(SessionImpl.createReadWriteTransactionOptions(options))
.setBegin(
SessionImpl.createReadWriteTransactionOptions(options, previousTransactionId))
.build();
} else {
// Wait for the transaction to come available. The tx.get() call will fail with an
Expand Down Expand Up @@ -1079,7 +1092,7 @@ public TransactionRunner allowNestedTransaction() {
TransactionRunnerImpl(SessionImpl session, TransactionOption... options) {
this.session = session;
this.options = Options.fromTransactionOptions(options);
this.txn = session.newTransaction(this.options);
this.txn = session.newTransaction(this.options, /* previousTransactionId = */ ByteString.EMPTY);
this.tracer = session.getTracer();
}

Expand Down Expand Up @@ -1118,7 +1131,19 @@ private <T> T runInternal(final TransactionCallable<T> txCallable) {
// Do not inline the BeginTransaction during a retry if the initial attempt did not
// actually start a transaction.
useInlinedBegin = txn.transactionId != null;
txn = session.newTransaction(options);

// Determine the latest transactionId when using a multiplexed session.
ByteString multiplexedSessionPreviousTransactionId = ByteString.EMPTY;
if (session.getIsMultiplexed()) {
// Use the current transactionId if available, otherwise fallback to the previous
// transactionId.
harshachinta marked this conversation as resolved.
Show resolved Hide resolved
multiplexedSessionPreviousTransactionId =
txn.transactionId != null ? txn.transactionId : txn.previousTransactionId;
}

txn =
session.newTransaction(
options, /* previousTransactionId = */ multiplexedSessionPreviousTransactionId);
}
checkState(
isValid, "TransactionRunner has been invalidated by a new operation on the session");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,18 @@

package com.google.cloud.spanner;

import static org.junit.Assert.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.google.api.core.ApiFutures;
import com.google.cloud.Timestamp;
import com.google.protobuf.ByteString;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Scope;
import org.junit.Test;
Expand All @@ -42,7 +48,7 @@ public void testCommitReturnsCommitStats() {
when(oTspan.makeCurrent()).thenReturn(mock(Scope.class));
try (AsyncTransactionManagerImpl manager =
new AsyncTransactionManagerImpl(session, span, Options.commitStats())) {
when(session.newTransaction(Options.fromTransactionOptions(Options.commitStats())))
when(session.newTransaction(eq(Options.fromTransactionOptions(Options.commitStats())), any()))
.thenReturn(transaction);
when(transaction.ensureTxnAsync()).thenReturn(ApiFutures.immediateFuture(null));
Timestamp commitTimestamp = Timestamp.ofTimeMicroseconds(1);
Expand All @@ -54,4 +60,67 @@ public void testCommitReturnsCommitStats() {
verify(transaction).commitAsync();
}
}

@Test
public void testRetryUsesPreviousTransactionIdOnMultiplexedSession() {
// Set up mock transaction IDs
final ByteString mockTransactionId = ByteString.copyFromUtf8("mockTransactionId");
final ByteString mockPreviousTransactionId =
ByteString.copyFromUtf8("mockPreviousTransactionId");

Span oTspan = mock(Span.class);
ISpan span = new OpenTelemetrySpan(oTspan);
when(oTspan.makeCurrent()).thenReturn(mock(Scope.class));
// Mark the session as multiplexed.
when(session.getIsMultiplexed()).thenReturn(true);

// Initialize a mock transaction with transactionId = null, previousTransactionId = null.
transaction = mock(TransactionRunnerImpl.TransactionContextImpl.class);
when(transaction.ensureTxnAsync()).thenReturn(ApiFutures.immediateFuture(null));
when(session.newTransaction(eq(Options.fromTransactionOptions(Options.commitStats())), any()))
.thenReturn(transaction);

// Simulate an ABORTED error being thrown when `commitAsync()` is called.
doThrow(SpannerExceptionFactory.newSpannerException(ErrorCode.ABORTED, ""))
.when(transaction)
.commitAsync();

try (AsyncTransactionManagerImpl manager =
new AsyncTransactionManagerImpl(session, span, Options.commitStats())) {
manager.beginAsync();

// Verify that for the first transaction attempt, the `previousTransactionId` is
// ByteString.EMPTY.
// This is because no transaction has been previously aborted at this point.
verify(session)
.newTransaction(Options.fromTransactionOptions(Options.commitStats()), ByteString.EMPTY);
assertThrows(AbortedException.class, manager::commitAsync);
clearInvocations(session);

// Mock the transaction object to contain transactionID=null and
// previousTransactionId=mockPreviousTransactionId
transaction.previousTransactionId = mockPreviousTransactionId;
manager.resetForRetryAsync();
// Verify that in the first retry attempt, the `previousTransactionId`
// (mockPreviousTransactionId) is passed to the new transaction.
// This allows Spanner to retry the transaction using the ID of the aborted transaction.
verify(session)
.newTransaction(
Options.fromTransactionOptions(Options.commitStats()), mockPreviousTransactionId);
assertThrows(AbortedException.class, manager::commitAsync);
clearInvocations(session);

// Mock the transaction object to contain transactionID=mockTransactionId and
// previousTransactionId=mockPreviousTransactionId and transactionID = null
transaction.transactionId = mockTransactionId;
manager.resetForRetryAsync();
// Verify that the latest `transactionId` (mockTransactionId) is used in the retry.
// This ensures the retry logic is working as expected with the latest transaction ID.
verify(session)
.newTransaction(Options.fromTransactionOptions(Options.commitStats()), mockTransactionId);

when(transaction.rollbackAsync()).thenReturn(ApiFutures.immediateFuture(null));
manager.closeAsync();
}
}
}
Loading
Loading