Skip to content

Commit

Permalink
feat: Implementing Connection#beginTransaction(TransactionDefinition)…
Browse files Browse the repository at this point in the history
… to support @transactional annotation (#738)

Implemented the following classes and methods to conform to R2DBC SPI which enables supporting `@Transactional` annotation in [Spring Data Dialect](https://github.com/GoogleCloudPlatform/spring-cloud-gcp/tree/main/spring-cloud-spanner-spring-data-r2dbc).

- `Connection#beginTransaction(TransactionDefinition)` method so that it can be used by a `TransactionManager` when `@Transactional` annotation is used. ✅
- `Connection#getTransactionIsolationLevel` returns SERIALIZABLE as it is the closest level to EXTERNAL_CONSISTENCY which spanner supports. ✅
- `Connection#setTransactionIsolationLevel(IsolationLevel)` accepts only `IsolationLevel.SERIALIZABLE` and throw `UnsupportedOperationException` otherwise. ✅
- Introduced `SpannerTransactionDefinition`, an implementation of `TransactionDefinition`(R2DBC spi) to configure the transaction attributes which will be used by `Connection#beginTransaction(TransactionDefinition)` method. ✅

Fixes: #238.
  • Loading branch information
jainsahab authored Oct 30, 2023
1 parent 3dc2488 commit bee69c1
Show file tree
Hide file tree
Showing 5 changed files with 297 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,13 @@

package com.google.cloud.spanner.r2dbc.v2;

import static com.google.cloud.spanner.r2dbc.v2.SpannerConstants.TIMESTAMP_BOUND;
import static com.google.common.base.MoreObjects.firstNonNull;
import static io.r2dbc.spi.IsolationLevel.SERIALIZABLE;
import static io.r2dbc.spi.TransactionDefinition.ISOLATION_LEVEL;
import static io.r2dbc.spi.TransactionDefinition.READ_ONLY;
import static java.lang.Boolean.TRUE;

import com.google.cloud.spanner.TimestampBound;
import com.google.cloud.spanner.r2dbc.api.SpannerConnection;
import com.google.cloud.spanner.r2dbc.statement.StatementParser;
Expand Down Expand Up @@ -51,7 +58,18 @@ public Publisher<Void> beginTransaction() {

@Override
public Publisher<Void> beginTransaction(TransactionDefinition definition) {
return Mono.error(new UnsupportedOperationException());
IsolationLevel isolationLevel = firstNonNull(definition.getAttribute(ISOLATION_LEVEL),
SERIALIZABLE);
return validateIsolation(isolationLevel)
.then(Mono.defer(() -> {
boolean isReadOnly = TRUE.equals(definition.getAttribute(READ_ONLY));
if (isReadOnly) {
TimestampBound timestampBound = firstNonNull(definition.getAttribute(TIMESTAMP_BOUND),
TimestampBound.strong());
return this.clientLibraryAdapter.beginReadonlyTransaction(timestampBound);
}
return this.clientLibraryAdapter.beginTransaction();
}));
}

@Override
Expand Down Expand Up @@ -115,7 +133,7 @@ public ConnectionMetadata getMetadata() {

@Override
public IsolationLevel getTransactionIsolationLevel() {
throw new UnsupportedOperationException();
return IsolationLevel.SERIALIZABLE;
}

@Override
Expand All @@ -138,9 +156,10 @@ public Publisher<Void> setAutoCommit(boolean autoCommit) {
return this.clientLibraryAdapter.setAutoCommit(autoCommit);
}


@Override
public Publisher<Void> setTransactionIsolationLevel(IsolationLevel isolationLevel) {
throw new UnsupportedOperationException();
return this.validateIsolation(isolationLevel);
}

@Override
Expand All @@ -160,4 +179,14 @@ public Publisher<Void> close() {
public boolean isInReadonlyTransaction() {
return this.clientLibraryAdapter.isInReadonlyTransaction();
}

private Mono<Void> validateIsolation(IsolationLevel isolationLevel) {
if (isolationLevel == null) {
return Mono.error(new IllegalArgumentException("IsolationLevel can't be null."));
}
return isolationLevel == SERIALIZABLE ? Mono.empty()
: Mono.error(new UnsupportedOperationException(
String.format("Unsupported '%s' isolation level, Only SERIALIZABLE is supported.",
isolationLevel.asSql())));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright 2022-2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.spanner.r2dbc.v2;

import com.google.cloud.spanner.TimestampBound;
import io.r2dbc.spi.Option;

/**
* Spanner Constants.
*/
public class SpannerConstants {
public static final Option<TimestampBound> TIMESTAMP_BOUND = Option.valueOf("timestampBound");

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright 2022-2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.spanner.r2dbc.v2;

import static com.google.cloud.spanner.r2dbc.v2.SpannerConstants.TIMESTAMP_BOUND;
import static java.lang.Boolean.TRUE;

import io.r2dbc.spi.Option;
import io.r2dbc.spi.TransactionDefinition;
import java.util.HashMap;
import java.util.Map;

/**
* An implementation of {@link TransactionDefinition} for Spanner Database.
*/
public class SpannerTransactionDefinition implements TransactionDefinition {

private final Map<Option<?>, Object> internalMap;

SpannerTransactionDefinition(Map<Option<?>, Object> internalMap) {
validate(internalMap);
this.internalMap = internalMap;
}

private void validate(Map<Option<?>, Object> internalMap) {
boolean isReadOnlyTransaction = TRUE.equals(internalMap.get(READ_ONLY));
if (!isReadOnlyTransaction && internalMap.containsKey(TIMESTAMP_BOUND)) {
throw new IllegalArgumentException("TIMESTAMP_BOUND can only be configured for"
+ " read only transactions.");
}
}

@Override
public <T> T getAttribute(Option<T> option) {
return (T) this.internalMap.get(option);
}


/**
* A builder class for {@link SpannerTransactionDefinition}.
*/
public static class Builder {
private final Map<Option<?>, Object> internalMap;

public Builder() {
this.internalMap = new HashMap<>();
}

public <T> Builder with(Option<T> option, T value) {
this.internalMap.put(option, value);
return this;
}

public SpannerTransactionDefinition build() {
return new SpannerTransactionDefinition(this.internalMap);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@

package com.google.cloud.spanner.r2dbc.v2;

import static io.r2dbc.spi.IsolationLevel.READ_COMMITTED;
import static io.r2dbc.spi.IsolationLevel.READ_UNCOMMITTED;
import static io.r2dbc.spi.IsolationLevel.REPEATABLE_READ;
import static io.r2dbc.spi.IsolationLevel.SERIALIZABLE;
import static io.r2dbc.spi.TransactionDefinition.ISOLATION_LEVEL;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyList;
Expand All @@ -26,9 +31,10 @@
import com.google.cloud.spanner.Statement;
import com.google.cloud.spanner.TimestampBound;
import io.r2dbc.spi.Batch;
import io.r2dbc.spi.IsolationLevel;
import io.r2dbc.spi.TransactionDefinition;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
Expand All @@ -47,19 +53,131 @@ class SpannerClientLibraryConnectionTest {
public void setUpMocks() {
this.mockAdapter = mock(DatabaseClientReactiveAdapter.class);
this.connection = new SpannerClientLibraryConnection(this.mockAdapter);

when(this.mockAdapter.beginReadonlyTransaction(any())).thenReturn(Mono.empty());
when(this.mockAdapter.beginTransaction()).thenReturn(Mono.empty());
}

@Test
void beginReadonlyTransactionUsesStrongConsistencyByDefault() {
StepVerifier.create(this.connection.beginReadonlyTransaction())
.verifyComplete();

when(this.mockAdapter.beginReadonlyTransaction(any())).thenReturn(Mono.empty());
verify(this.mockAdapter).beginReadonlyTransaction(TimestampBound.strong());
}

StepVerifier.create(this.connection.beginReadonlyTransaction())
@Test
void shouldBeginTransactionInReadOnlyMode() {
SpannerTransactionDefinition readOnlyDefinition = new SpannerTransactionDefinition.Builder()
.with(TransactionDefinition.READ_ONLY, true)
.build();

StepVerifier.create(this.connection.beginTransaction(readOnlyDefinition))
.verifyComplete();

verify(this.mockAdapter).beginReadonlyTransaction(TimestampBound.strong());
}

@Test
void shouldBeginTransactionInReadWriteMode() {
SpannerTransactionDefinition readWriteDefinition = new SpannerTransactionDefinition.Builder()
.with(TransactionDefinition.READ_ONLY, false)
.build();

StepVerifier.create(this.connection.beginTransaction(readWriteDefinition))
.verifyComplete();

verify(this.mockAdapter).beginTransaction();
}

@Test
void shouldBeginTransactionInReadWriteModeByDefault() {
SpannerTransactionDefinition readWriteDefinition = new SpannerTransactionDefinition.Builder()
.build(); // absence of attribute indicates read write transaction

StepVerifier.create(this.connection.beginTransaction(readWriteDefinition))
.verifyComplete();

verify(this.mockAdapter).beginTransaction();
}

@Test
void shouldBeginTransactionWithGivenTimestampBound() {
TimestampBound fiveSecondTimestampBound = TimestampBound.ofExactStaleness(5L, TimeUnit.SECONDS);

SpannerTransactionDefinition staleTransactionDefinition =
new SpannerTransactionDefinition.Builder()
.with(TransactionDefinition.READ_ONLY, true)
.with(SpannerConstants.TIMESTAMP_BOUND, fiveSecondTimestampBound)
.build();

StepVerifier.create(this.connection.beginTransaction(staleTransactionDefinition))
.verifyComplete();

verify(this.mockAdapter).beginReadonlyTransaction(fiveSecondTimestampBound);
}

@Test
void shouldThrowErrorWhenBeginTransactionWithOtherThanDefaultOrSerializable() {
SpannerTransactionDefinition.Builder builder = new SpannerTransactionDefinition.Builder();

// default isolation
SpannerTransactionDefinition defaultIsolation = builder.with(ISOLATION_LEVEL, null)
.build();
StepVerifier.create(this.connection.beginTransaction(defaultIsolation))
.verifyComplete();

// SERIALIZABLE isolation
SpannerTransactionDefinition serializable = builder.with(ISOLATION_LEVEL, SERIALIZABLE)
.build();
StepVerifier.create(this.connection.beginTransaction(serializable))
.verifyComplete();

// READ_COMMITTED isolation
SpannerTransactionDefinition readCommitted = builder.with(ISOLATION_LEVEL, READ_COMMITTED)
.build();
StepVerifier.create(this.connection.beginTransaction(readCommitted))
.expectError(UnsupportedOperationException.class)
.verify();

// READ_UNCOMMITTED isolation
SpannerTransactionDefinition readUncommitted = builder.with(ISOLATION_LEVEL, READ_UNCOMMITTED)
.build();
StepVerifier.create(this.connection.beginTransaction(readUncommitted))
.expectError(UnsupportedOperationException.class)
.verify();

// REPEATABLE_READ isolation
SpannerTransactionDefinition repeatableRead = builder.with(ISOLATION_LEVEL, REPEATABLE_READ)
.build();
StepVerifier.create(this.connection.beginTransaction(repeatableRead))
.expectError(UnsupportedOperationException.class)
.verify();
}

@Test
void shouldGetAndSetSerializationOnlyAsIsolationLevel() {
StepVerifier.create(this.connection.setTransactionIsolationLevel(SERIALIZABLE))
.verifyComplete();
assertThat(this.connection.getTransactionIsolationLevel()).isEqualTo(SERIALIZABLE);

StepVerifier.create(this.connection.setTransactionIsolationLevel(READ_COMMITTED))
.expectError(UnsupportedOperationException.class)
.verify();

StepVerifier.create(this.connection.setTransactionIsolationLevel(READ_UNCOMMITTED))
.expectError(UnsupportedOperationException.class)
.verify();

StepVerifier.create(this.connection.setTransactionIsolationLevel(REPEATABLE_READ))
.expectError(UnsupportedOperationException.class)
.verify();

StepVerifier.create(this.connection.setTransactionIsolationLevel(null))
.expectError(IllegalArgumentException.class)
.verify();
}

@Test
void batchUsesCorrectAdapter() {
Batch batch = this.connection.createBatch();
Expand All @@ -81,13 +199,6 @@ void batchUsesCorrectAdapter() {
assertThat(args.get(0).getSql()).isEqualTo("UPDATE tbl SET col1=val1");
}

@Test
void beginTransactionCustomDefinitionNotSupported() {
StepVerifier.create(
this.connection.beginTransaction(IsolationLevel.SERIALIZABLE)
).verifyError(UnsupportedOperationException.class);
}

@Test
void setLockWaitTimeoutNotSupported() {
StepVerifier.create(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright 2022-2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.spanner.r2dbc.v2;

import static com.google.cloud.spanner.r2dbc.v2.SpannerConstants.TIMESTAMP_BOUND;
import static io.r2dbc.spi.TransactionDefinition.READ_ONLY;
import static org.junit.jupiter.api.Assertions.assertThrows;

import com.google.cloud.spanner.TimestampBound;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.Test;



class SpannerTransactionDefinitionTest {

@Test
void shouldThrowExceptionIfTimeStampBoundIsConfiguredWithReadWriteTransaction() {
SpannerTransactionDefinition.Builder builder1 = new SpannerTransactionDefinition.Builder()
.with(TIMESTAMP_BOUND, TimestampBound.ofExactStaleness(5, TimeUnit.SECONDS))
.with(READ_ONLY, false);

// absence of READ_ONLY attribute indicates read write transaction
SpannerTransactionDefinition.Builder builder2 = new SpannerTransactionDefinition.Builder()
.with(TIMESTAMP_BOUND, TimestampBound.ofExactStaleness(5, TimeUnit.SECONDS));

assertThrows(IllegalArgumentException.class, builder1::build);
assertThrows(IllegalArgumentException.class, builder2::build);
}
}

0 comments on commit bee69c1

Please sign in to comment.