Skip to content

Commit

Permalink
chore(spanner): track precommit token for R/W multiplexed session (#3411
Browse files Browse the repository at this point in the history
)

When a read-write transaction is executed on a multiplexed session, the RPC responses of that transaction return a `MultiplexedSessionPrecommitToken`. In client library, the precommit token with the highest sequence number is tracked at the transaction context level. During the commit, this latest precommit token is fetched and set in the CommitRequest. If the precommit token is not set during the commit, the backend will throw an `INVALID_ARGUMENT` error.

Including the latest token in the CommitRequest is essential to prevent latency regression, though it does not impact the correctness of the transaction.

This PR tracks the precommit token from the following RPC responses,
1. ResultSet
2. PartialResultSet
3. ExecuteBatchDmlResponse
  • Loading branch information
harshachinta authored Oct 18, 2024
1 parent 1e8b82c commit aeeea3c
Show file tree
Hide file tree
Showing 10 changed files with 322 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import com.google.spanner.v1.ExecuteSqlRequest;
import com.google.spanner.v1.ExecuteSqlRequest.QueryMode;
import com.google.spanner.v1.ExecuteSqlRequest.QueryOptions;
import com.google.spanner.v1.MultiplexedSessionPrecommitToken;
import com.google.spanner.v1.PartialResultSet;
import com.google.spanner.v1.ReadRequest;
import com.google.spanner.v1.RequestOptions;
Expand Down Expand Up @@ -893,6 +894,13 @@ public void onDone(boolean withBeginTransaction) {
this.session.onReadDone();
}

/**
* For transactions other than read-write, the MultiplexedSessionPrecommitToken will not be
* present in the RPC response. In such cases, this method will be a no-op.
*/
@Override
public void onPrecommitToken(MultiplexedSessionPrecommitToken token) {}

private ResultSet readInternal(
String table,
@Nullable String index,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.google.protobuf.ListValue;
import com.google.protobuf.ProtocolMessageEnum;
import com.google.protobuf.Value.KindCase;
import com.google.spanner.v1.MultiplexedSessionPrecommitToken;
import com.google.spanner.v1.Transaction;
import java.io.IOException;
import java.io.Serializable;
Expand Down Expand Up @@ -57,6 +58,12 @@ void onTransactionMetadata(Transaction transaction, boolean shouldIncludeId)

/** Called when the read finishes normally. */
void onDone(boolean withBeginTransaction);

/**
* Called when the RPC response contains a MultiplexedSessionPrecommitToken. A precommit token
* will be included if the read-write transaction is executed on a multiplexed session.
*/
void onPrecommitToken(MultiplexedSessionPrecommitToken token);
}

static final class LazyByteArray implements Serializable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class GrpcResultSet extends AbstractResultSet<List<Object>> implements ProtobufR

GrpcResultSet(
CloseableIterator<PartialResultSet> iterator, Listener listener, DecodeMode decodeMode) {
this.iterator = new GrpcValueIterator(iterator);
this.iterator = new GrpcValueIterator(iterator, listener);
this.listener = listener;
this.decodeMode = decodeMode;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static com.google.common.base.Preconditions.checkState;

import com.google.cloud.spanner.AbstractResultSet.CloseableIterator;
import com.google.cloud.spanner.AbstractResultSet.Listener;
import com.google.common.collect.AbstractIterator;
import com.google.protobuf.ListValue;
import com.google.protobuf.Value.KindCase;
Expand All @@ -44,9 +45,11 @@ private enum StreamValue {
private PartialResultSet current;
private int pos;
private ResultSetStats statistics;
private final Listener listener;

GrpcValueIterator(CloseableIterator<PartialResultSet> stream) {
GrpcValueIterator(CloseableIterator<PartialResultSet> stream, Listener listener) {
this.stream = stream;
this.listener = listener;
}

@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -154,6 +157,10 @@ private boolean ensureReady(StreamValue requiredValue) throws SpannerException {
ErrorCode.INTERNAL, "Invalid type metadata: " + e.getMessage(), e);
}
}
// collect the precommit token from each PartialResultSet
if (current.hasPrecommitToken()) {
listener.onPrecommitToken(current.getPrecommitToken());
}
if (current.hasStats()) {
statistics = current.getStats();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import com.google.spanner.v1.ExecuteBatchDmlResponse;
import com.google.spanner.v1.ExecuteSqlRequest;
import com.google.spanner.v1.ExecuteSqlRequest.QueryMode;
import com.google.spanner.v1.MultiplexedSessionPrecommitToken;
import com.google.spanner.v1.RequestOptions;
import com.google.spanner.v1.ResultSet;
import com.google.spanner.v1.ResultSetStats;
Expand Down Expand Up @@ -179,6 +180,11 @@ public void removeListener(Runnable listener) {
@GuardedBy("committingLock")
private volatile boolean committing;

private final Object precommitTokenLock = new Object();

@GuardedBy("precommitTokenLock")
private MultiplexedSessionPrecommitToken latestPrecommitToken;

@GuardedBy("lock")
private volatile SettableApiFuture<Void> finishedAsyncOperations = SettableApiFuture.create();

Expand Down Expand Up @@ -439,6 +445,10 @@ public void run() {
}
requestBuilder.setRequestOptions(requestOptionsBuilder.build());
}
if (session.getIsMultiplexed() && getLatestPrecommitToken() != null) {
// Set the precommit token in the CommitRequest for multiplexed sessions.
requestBuilder.setPrecommitToken(getLatestPrecommitToken());
}
final CommitRequest commitRequest = requestBuilder.build();
span.addAnnotation("Starting Commit");
final ApiFuture<com.google.spanner.v1.CommitResponse> commitFuture;
Expand Down Expand Up @@ -643,6 +653,25 @@ public void onTransactionMetadata(Transaction transaction, boolean shouldInclude
}
}

/**
* In read-write transactions, the precommit token with the highest sequence number from this
* transaction attempt will be tracked and included in the
* [Commit][google.spanner.v1.Spanner.Commit] request for the transaction.
*/
@Override
public void onPrecommitToken(MultiplexedSessionPrecommitToken token) {
if (token == null) {
return;
}
synchronized (precommitTokenLock) {
if (this.latestPrecommitToken == null
|| token.getSeqNum() > this.latestPrecommitToken.getSeqNum()) {
this.latestPrecommitToken = token;
txnLogger.log(Level.FINE, "Updating precommit token to " + this.latestPrecommitToken);
}
}
}

@Nullable
String getTransactionTag() {
if (this.options.hasTag()) {
Expand All @@ -651,6 +680,13 @@ String getTransactionTag() {
return null;
}

@Nullable
MultiplexedSessionPrecommitToken getLatestPrecommitToken() {
synchronized (precommitTokenLock) {
return this.latestPrecommitToken;
}
}

@Override
public SpannerException onError(SpannerException e, boolean withBeginTransaction) {
e = super.onError(e, withBeginTransaction);
Expand Down Expand Up @@ -829,6 +865,9 @@ private ResultSet internalExecuteUpdate(
throw new IllegalArgumentException(
"DML response missing stats possibly due to non-DML statement as input");
}
if (resultSet.hasPrecommitToken()) {
onPrecommitToken(resultSet.getPrecommitToken());
}
return resultSet;
} catch (Throwable t) {
throw onError(
Expand Down Expand Up @@ -903,6 +942,9 @@ public ApiFuture<Long> executeUpdateAsync(Statement statement, UpdateOption... u
resultSet.get().getMetadata().getTransaction(),
builder.getTransaction().hasBegin());
}
if (resultSet.get().hasPrecommitToken()) {
onPrecommitToken(resultSet.get().getPrecommitToken());
}
} catch (Throwable e) {
// Ignore this error here as it is handled by the future that is returned by the
// executeUpdateAsync method.
Expand Down Expand Up @@ -958,6 +1000,10 @@ public long[] batchUpdate(Iterable<Statement> statements, UpdateOption... update
}
}

if (response.hasPrecommitToken()) {
onPrecommitToken(response.getPrecommitToken());
}

// If one of the DML statements was aborted, we should throw an aborted exception.
// In all other cases, we should throw a BatchUpdateException.
if (response.getStatus().getCode() == Code.ABORTED_VALUE) {
Expand Down Expand Up @@ -1022,6 +1068,9 @@ public ApiFuture<long[]> batchUpdateAsync(
builder.getTransaction().hasBegin());
}
}
if (batchDmlResponse.hasPrecommitToken()) {
onPrecommitToken(batchDmlResponse.getPrecommitToken());
}
// If one of the DML statements was aborted, we should throw an aborted exception.
// In all other cases, we should throw a BatchUpdateException.
if (batchDmlResponse.getStatus().getCode() == Code.ABORTED_VALUE) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.ByteString;
import com.google.spanner.v1.ExecuteSqlRequest.QueryMode;
import com.google.spanner.v1.MultiplexedSessionPrecommitToken;
import com.google.spanner.v1.PartialResultSet;
import com.google.spanner.v1.QueryPlan;
import com.google.spanner.v1.ResultSetMetadata;
Expand Down Expand Up @@ -77,6 +78,9 @@ public SpannerException onError(SpannerException e, boolean withBeginTransaction

@Override
public void onDone(boolean withBeginTransaction) {}

@Override
public void onPrecommitToken(MultiplexedSessionPrecommitToken token) {}
}

@Before
Expand Down
Loading

0 comments on commit aeeea3c

Please sign in to comment.