Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Implementing Connection#beginTransaction(TransactionDefinition) to support @Transactional annotation #738

Merged
merged 13 commits into from
Oct 30, 2023
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,13 @@

package com.google.cloud.spanner.r2dbc.v2;

import static com.google.cloud.spanner.r2dbc.v2.SpannerConstants.TIMESTAMP_BOUND;
import static com.google.common.base.MoreObjects.firstNonNull;
import static io.r2dbc.spi.IsolationLevel.SERIALIZABLE;
import static io.r2dbc.spi.TransactionDefinition.ISOLATION_LEVEL;
import static io.r2dbc.spi.TransactionDefinition.READ_ONLY;
import static java.lang.Boolean.TRUE;

import com.google.cloud.spanner.TimestampBound;
import com.google.cloud.spanner.r2dbc.api.SpannerConnection;
import com.google.cloud.spanner.r2dbc.statement.StatementParser;
Expand Down Expand Up @@ -51,7 +58,16 @@ public Publisher<Void> beginTransaction() {

@Override
public Publisher<Void> beginTransaction(TransactionDefinition definition) {
return Mono.error(new UnsupportedOperationException());
return validateIsolation(definition.getAttribute(ISOLATION_LEVEL))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this resolving #238?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Partially yes,

#238 talks about read-only, read-write, stale-read and partitioned DML. Out of these first three are supported but not partitioned DML but that is not supported by the driver itself.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we create a separate issue for DML support, and close #238 as part of this PR?

Copy link
Contributor Author

@jainsahab jainsahab Oct 26, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good to me, I've created #747.

and whenever you close this issue merge this PR, it'll will close #238

.then(Mono.defer(() -> {
boolean isReadOnly = TRUE.equals(definition.getAttribute(READ_ONLY));
if (isReadOnly) {
TimestampBound timestampBound = firstNonNull(definition.getAttribute(TIMESTAMP_BOUND),
TimestampBound.strong());
return this.clientLibraryAdapter.beginReadonlyTransaction(timestampBound);
}
return this.clientLibraryAdapter.beginTransaction();
olavloite marked this conversation as resolved.
Show resolved Hide resolved
}));
}

@Override
Expand Down Expand Up @@ -115,7 +131,7 @@ public ConnectionMetadata getMetadata() {

@Override
public IsolationLevel getTransactionIsolationLevel() {
throw new UnsupportedOperationException();
return IsolationLevel.SERIALIZABLE;
}

@Override
Expand All @@ -140,7 +156,11 @@ public Publisher<Void> setAutoCommit(boolean autoCommit) {

@Override
public Publisher<Void> setTransactionIsolationLevel(IsolationLevel isolationLevel) {
throw new UnsupportedOperationException();
if (isolationLevel == null) {
return Mono.error(new IllegalArgumentException("IsolationLevel can't be null."));
}
jainsahab marked this conversation as resolved.
Show resolved Hide resolved

return this.validateIsolation(isolationLevel);
meltsufin marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
Expand All @@ -160,4 +180,10 @@ public Publisher<Void> close() {
public boolean isInReadonlyTransaction() {
return this.clientLibraryAdapter.isInReadonlyTransaction();
}

private Mono<Void> validateIsolation(IsolationLevel isolationLevel) {
boolean valid = isolationLevel == null || isolationLevel == SERIALIZABLE;
jainsahab marked this conversation as resolved.
Show resolved Hide resolved
return valid ? Mono.empty() : Mono.error(new UnsupportedOperationException(
String.format("'%s' isolation level not supported", isolationLevel.asSql())));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright 2022-2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.spanner.r2dbc.v2;

import com.google.cloud.spanner.TimestampBound;
import io.r2dbc.spi.Option;

/**
* Spanner Constants.
*/
public class SpannerConstants {
public static final Option<TimestampBound> TIMESTAMP_BOUND = Option.valueOf("timestampBound");

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright 2022-2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.spanner.r2dbc.v2;

import io.r2dbc.spi.Option;
import io.r2dbc.spi.TransactionDefinition;
import java.util.HashMap;
import java.util.Map;

/**
* An implementation of {@link TransactionDefinition} for Spanner Database.
*/
public class SpannerTransactionDefinition implements TransactionDefinition {

private final Map<Option<?>, Object> internalMap;

SpannerTransactionDefinition(Map<Option<?>, Object> internalMap) {
this.internalMap = internalMap;
}

@Override
public <T> T getAttribute(Option<T> option) {
return (T) this.internalMap.get(option);
}


/**
* A builder class for {@link SpannerTransactionDefinition}.
*/
public static class Builder {
private final Map<Option<?>, Object> internalMap;

public Builder() {
this.internalMap = new HashMap<>();
}

public <T> Builder with(Option<T> option, T value) {
this.internalMap.put(option, value);
return this;
}

public SpannerTransactionDefinition build() {
return new SpannerTransactionDefinition(this.internalMap);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@

package com.google.cloud.spanner.r2dbc.v2;

import static io.r2dbc.spi.IsolationLevel.READ_COMMITTED;
import static io.r2dbc.spi.IsolationLevel.READ_UNCOMMITTED;
import static io.r2dbc.spi.IsolationLevel.REPEATABLE_READ;
import static io.r2dbc.spi.IsolationLevel.SERIALIZABLE;
import static io.r2dbc.spi.TransactionDefinition.ISOLATION_LEVEL;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyList;
Expand All @@ -25,10 +30,12 @@

import com.google.cloud.spanner.Statement;
import com.google.cloud.spanner.TimestampBound;
import com.google.cloud.spanner.r2dbc.v2.SpannerTransactionDefinition;
import io.r2dbc.spi.Batch;
import io.r2dbc.spi.IsolationLevel;
import io.r2dbc.spi.TransactionDefinition;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
Expand All @@ -47,19 +54,130 @@ class SpannerClientLibraryConnectionTest {
public void setUpMocks() {
this.mockAdapter = mock(DatabaseClientReactiveAdapter.class);
this.connection = new SpannerClientLibraryConnection(this.mockAdapter);

when(this.mockAdapter.beginReadonlyTransaction(any())).thenReturn(Mono.empty());
when(this.mockAdapter.beginTransaction()).thenReturn(Mono.empty());
}

@Test
void beginReadonlyTransactionUsesStrongConsistencyByDefault() {
StepVerifier.create(this.connection.beginReadonlyTransaction())
.verifyComplete();

when(this.mockAdapter.beginReadonlyTransaction(any())).thenReturn(Mono.empty());
verify(this.mockAdapter).beginReadonlyTransaction(TimestampBound.strong());
}

StepVerifier.create(this.connection.beginReadonlyTransaction())
@Test
void shouldBeginTransactionInReadOnlyMode() {
SpannerTransactionDefinition readOnlyDefinition = new SpannerTransactionDefinition.Builder()
.with(TransactionDefinition.READ_ONLY, true)
.build();

StepVerifier.create(this.connection.beginTransaction(readOnlyDefinition))
.verifyComplete();

verify(this.mockAdapter).beginReadonlyTransaction(TimestampBound.strong());
}

@Test
void shouldBeginTransactionInReadWriteMode() {
SpannerTransactionDefinition readWriteDefinition = new SpannerTransactionDefinition.Builder()
.with(TransactionDefinition.READ_ONLY, false)
.build();

StepVerifier.create(this.connection.beginTransaction(readWriteDefinition))
.verifyComplete();

verify(this.mockAdapter).beginTransaction();
}

@Test
void shouldBeginTransactionInReadWriteModeByDefault() {
SpannerTransactionDefinition readWriteDefinition = new SpannerTransactionDefinition.Builder()
.build(); // absence of attribute indicates read write transaction

StepVerifier.create(this.connection.beginTransaction(readWriteDefinition))
.verifyComplete();

verify(this.mockAdapter).beginTransaction();
}

@Test
void shouldBeginTransactionWithGivenTimestampBound() {
TimestampBound fiveSecondTimestampBound = TimestampBound.ofExactStaleness(5L, TimeUnit.SECONDS);

SpannerTransactionDefinition readWriteDefinition = new SpannerTransactionDefinition.Builder()
olavloite marked this conversation as resolved.
Show resolved Hide resolved
.with(TransactionDefinition.READ_ONLY, true)
.with(SpannerConstants.TIMESTAMP_BOUND, fiveSecondTimestampBound)
.build();

StepVerifier.create(this.connection.beginTransaction(readWriteDefinition))
.verifyComplete();

verify(this.mockAdapter).beginReadonlyTransaction(fiveSecondTimestampBound);
}

@Test
void shouldThrowErrorWhenBeginTransactionWithOtherThanDefaultOrSerializable() {
SpannerTransactionDefinition.Builder builder = new SpannerTransactionDefinition.Builder();

// default isolation
SpannerTransactionDefinition defaultIsolation = builder.with(ISOLATION_LEVEL, null)
.build();
StepVerifier.create(this.connection.beginTransaction(defaultIsolation))
.verifyComplete();

// SERIALIZABLE isolation
SpannerTransactionDefinition serializable = builder.with(ISOLATION_LEVEL, SERIALIZABLE)
.build();
StepVerifier.create(this.connection.beginTransaction(serializable))
.verifyComplete();

// READ_COMMITTED isolation
SpannerTransactionDefinition readCommitted = builder.with(ISOLATION_LEVEL, READ_COMMITTED)
.build();
StepVerifier.create(this.connection.beginTransaction(readCommitted))
.expectError(UnsupportedOperationException.class)
.verify();

// READ_UNCOMMITTED isolation
SpannerTransactionDefinition readUncommitted = builder.with(ISOLATION_LEVEL, READ_UNCOMMITTED)
.build();
StepVerifier.create(this.connection.beginTransaction(readUncommitted))
.expectError(UnsupportedOperationException.class)
.verify();

// REPEATABLE_READ isolation
SpannerTransactionDefinition repeatableRead = builder.with(ISOLATION_LEVEL, REPEATABLE_READ)
.build();
StepVerifier.create(this.connection.beginTransaction(repeatableRead))
.expectError(UnsupportedOperationException.class)
.verify();
}

@Test
void shouldGetAndSetSerializationOnlyAsIsolationLevel() {
StepVerifier.create(this.connection.setTransactionIsolationLevel(SERIALIZABLE))
.verifyComplete();
assertThat(this.connection.getTransactionIsolationLevel()).isEqualTo(SERIALIZABLE);

StepVerifier.create(this.connection.setTransactionIsolationLevel(READ_COMMITTED))
.expectError(UnsupportedOperationException.class)
.verify();

StepVerifier.create(this.connection.setTransactionIsolationLevel(READ_UNCOMMITTED))
.expectError(UnsupportedOperationException.class)
.verify();

StepVerifier.create(this.connection.setTransactionIsolationLevel(REPEATABLE_READ))
.expectError(UnsupportedOperationException.class)
.verify();

StepVerifier.create(this.connection.setTransactionIsolationLevel(null))
.expectError(IllegalArgumentException.class)
.verify();
}

@Test
void batchUsesCorrectAdapter() {
Batch batch = this.connection.createBatch();
Expand All @@ -81,13 +199,6 @@ void batchUsesCorrectAdapter() {
assertThat(args.get(0).getSql()).isEqualTo("UPDATE tbl SET col1=val1");
}

@Test
void beginTransactionCustomDefinitionNotSupported() {
StepVerifier.create(
this.connection.beginTransaction(IsolationLevel.SERIALIZABLE)
).verifyError(UnsupportedOperationException.class);
}

@Test
void setLockWaitTimeoutNotSupported() {
StepVerifier.create(
Expand Down
Loading