Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[adhoc] Prepare aws2 ClientConfiguration for json serialization and cleanup AWS Module #16894

Merged
merged 2 commits into from
Mar 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sdks/java/io/amazon-web-services2/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ ext.summary = "IO library to read and write Amazon Web Services services from Be

dependencies {
implementation library.java.vendored_guava_26_0_jre
implementation library.java.error_prone_annotations
implementation project(path: ":sdks:java:core", configuration: "shadow")
implementation library.java.aws_java_sdk2_apache_client
implementation library.java.aws_java_sdk2_netty_client
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,21 @@

import static org.apache.beam.sdk.io.aws2.options.AwsSerializableUtils.deserializeAwsCredentialsProvider;
import static org.apache.beam.sdk.io.aws2.options.AwsSerializableUtils.serializeAwsCredentialsProvider;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonSetter;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonPOJOBuilder;
import com.google.auto.value.AutoValue;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import com.google.auto.value.extension.memoized.Memoized;
import java.io.Serializable;
import java.net.URI;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.apache.beam.sdk.io.aws2.options.AwsOptions;
import org.checkerframework.dataflow.qual.Pure;
import software.amazon.awssdk.auth.credentials.AwsCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.regions.Region;

Expand All @@ -47,18 +49,28 @@
* uses a backoff strategy with equal jitter for computing the delay before the next retry.
*/
@AutoValue
@JsonInclude(value = JsonInclude.Include.NON_EMPTY)
@JsonDeserialize(builder = ClientConfiguration.Builder.class)
mosche marked this conversation as resolved.
Show resolved Hide resolved
public abstract class ClientConfiguration implements Serializable {

/**
* Optional {@link AwsCredentialsProvider}. If set, this overwrites the default in {@link
* AwsOptions#getAwsCredentialsProvider()}.
*/
public abstract @Nullable @Pure AwsCredentialsProvider credentialsProvider();
@JsonProperty
@Memoized
mosche marked this conversation as resolved.
Show resolved Hide resolved
public @Nullable @Pure AwsCredentialsProvider credentialsProvider() {
return credentialsProviderAsJson() != null
? deserializeAwsCredentialsProvider(credentialsProviderAsJson())
: null;
}

/**
* Optional {@link Region}. If set, this overwrites the default in {@link
* AwsOptions#getAwsRegion()}.
*/
@JsonProperty
@Memoized
mosche marked this conversation as resolved.
Show resolved Hide resolved
public @Nullable @Pure Region region() {
return regionId() != null ? Region.of(regionId()) : null;
}
Expand All @@ -67,20 +79,24 @@ public abstract class ClientConfiguration implements Serializable {
* Optional service endpoint to use AWS compatible services instead, e.g. for testing. If set,
* this overwrites the default in {@link AwsOptions#getEndpoint()}.
*/
@JsonProperty
public abstract @Nullable @Pure URI endpoint();

/**
* Optional {@link RetryConfiguration} for AWS clients. If unset, retry behavior will be unchanged
* and use SDK defaults.
*/
@JsonProperty
public abstract @Nullable @Pure RetryConfiguration retry();

abstract @Nullable @Pure String regionId();

abstract @Nullable @Pure String credentialsProviderAsJson();

public abstract Builder toBuilder();

public static Builder builder() {
return new AutoValue_ClientConfiguration.Builder();
return Builder.builder();
}

public static ClientConfiguration create(
Expand All @@ -93,12 +109,20 @@ public static ClientConfiguration create(
}

@AutoValue.Builder
@JsonPOJOBuilder(withPrefix = "")
public abstract static class Builder {
@JsonCreator
static Builder builder() {
return new AutoValue_ClientConfiguration.Builder();
}

/**
* Optional {@link AwsCredentialsProvider}. If set, this overwrites the default in {@link
* AwsOptions#getAwsCredentialsProvider()}.
*/
public abstract Builder credentialsProvider(AwsCredentialsProvider credentialsProvider);
public Builder credentialsProvider(AwsCredentialsProvider credentialsProvider) {
return credentialsProviderAsJson(serializeAwsCredentialsProvider(credentialsProvider));
}

/**
* Optional {@link Region}. If set, this overwrites the default in {@link
Expand All @@ -118,6 +142,7 @@ public Builder region(Region region) {
* Optional {@link RetryConfiguration} for AWS clients. If unset, retry behavior will be
* unchanged and use SDK defaults.
*/
@JsonSetter
public abstract Builder retry(RetryConfiguration retry);

/**
Expand All @@ -132,58 +157,8 @@ public Builder retry(Consumer<RetryConfiguration.Builder> retry) {

abstract Builder regionId(String region);

abstract AwsCredentialsProvider credentialsProvider();

abstract ClientConfiguration autoBuild();
abstract Builder credentialsProviderAsJson(String credentialsProvider);

public ClientConfiguration build() {
if (credentialsProvider() != null) {
credentialsProvider(new SerializableAwsCredentialsProvider(credentialsProvider()));
}
return autoBuild();
}
}

/** Internal serializable {@link AwsCredentialsProvider}. */
private static class SerializableAwsCredentialsProvider
echauchot marked this conversation as resolved.
Show resolved Hide resolved
implements AwsCredentialsProvider, Serializable {
private transient AwsCredentialsProvider provider;
private String serializedProvider;

SerializableAwsCredentialsProvider(AwsCredentialsProvider provider) {
this.provider = checkNotNull(provider, "AwsCredentialsProvider cannot be null");
this.serializedProvider = serializeAwsCredentialsProvider(provider);
}

private void writeObject(ObjectOutputStream out) throws IOException {
out.writeUTF(serializedProvider);
}

private void readObject(ObjectInputStream in) throws IOException {
serializedProvider = in.readUTF();
provider = deserializeAwsCredentialsProvider(serializedProvider);
}

@Override
public AwsCredentials resolveCredentials() {
return provider.resolveCredentials();
}

@Override
public boolean equals(@Nullable Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
SerializableAwsCredentialsProvider that = (SerializableAwsCredentialsProvider) o;
return serializedProvider.equals(that.serializedProvider);
}

@Override
public int hashCode() {
return serializedProvider.hashCode();
}
public abstract ClientConfiguration build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@
*/
package org.apache.beam.sdk.io.aws2.common;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonPOJOBuilder;
import com.google.auto.value.AutoValue;
import java.io.Serializable;
import javax.annotation.Nullable;
Expand All @@ -32,11 +37,14 @@
* HTTP Configuration</a>
*/
@AutoValue
@JsonInclude(value = JsonInclude.Include.NON_EMPTY)
@JsonDeserialize(builder = HttpClientConfiguration.Builder.class)
public abstract class HttpClientConfiguration implements Serializable {

/**
* Milliseconds to wait when acquiring a connection from the pool before giving up and timing out.
*/
@JsonProperty
public abstract @Nullable @Pure Integer connectionAcquisitionTimeout();

/**
Expand All @@ -45,12 +53,14 @@ public abstract class HttpClientConfiguration implements Serializable {
* <p>This will never close a connection that is currently in use, so long-lived connections may
* remain open longer than this time.
*/
@JsonProperty
public abstract @Nullable @Pure Integer connectionMaxIdleTime();

/**
* Milliseconds to wait when initially establishing a connection before giving up and timing out.
* A duration of 0 means infinity, and is not recommended.
*/
@JsonProperty
public abstract @Nullable @Pure Integer connectionTimeout();

/**
Expand All @@ -60,12 +70,14 @@ public abstract class HttpClientConfiguration implements Serializable {
* <p>This will never close a connection that is currently in use, so long-lived connections may
* remain open longer than this time.
*/
@JsonProperty
public abstract @Nullable @Pure Integer connectionTimeToLive();

/**
* Milliseconds to wait for data to be transferred over an established, open connection before the
* connection is timed out. A duration of 0 means infinity, and is not recommended.
*/
@JsonProperty
public abstract @Nullable @Pure Integer socketTimeout();

/**
Expand All @@ -75,6 +87,7 @@ public abstract class HttpClientConfiguration implements Serializable {
* <p>Note: Read timeout is only supported for async clients and ignored otherwise. Use {@link
* #socketTimeout()} instead.
*/
@JsonProperty
public abstract @Nullable @Pure Integer readTimeout();

/**
Expand All @@ -84,6 +97,7 @@ public abstract class HttpClientConfiguration implements Serializable {
* <p>Note: Write timeout is only supported for async clients and ignored otherwise. Use {@link
* #socketTimeout()} instead.
*/
@JsonProperty
public abstract @Nullable @Pure Integer writeTimeout();

/**
Expand All @@ -94,14 +108,21 @@ public abstract class HttpClientConfiguration implements Serializable {
* concurrent requests. When using HTTP/2 the number of connections that will be used depends on
* the max streams allowed per connection.
*/
@JsonProperty
public abstract @Nullable @Pure Integer maxConnections();

public static Builder builder() {
return new AutoValue_HttpClientConfiguration.Builder();
return Builder.builder();
}

@AutoValue.Builder
@JsonPOJOBuilder(withPrefix = "")
public abstract static class Builder {
@JsonCreator
static Builder builder() {
return new AutoValue_HttpClientConfiguration.Builder();
}

/**
* Milliseconds to wait when acquiring a connection from the pool before giving up and timing
* out.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,13 @@
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
import static org.joda.time.Duration.ZERO;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonPOJOBuilder;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.databind.util.StdConverter;
import com.google.auto.value.AutoValue;
import java.io.Serializable;
import javax.annotation.Nullable;
Expand All @@ -36,31 +43,51 @@
* SdkDefaultRetrySetting} for further details.
*/
@AutoValue
@JsonInclude(value = JsonInclude.Include.NON_EMPTY)
@JsonDeserialize(builder = RetryConfiguration.Builder.class)
public abstract class RetryConfiguration implements Serializable {
private static final java.time.Duration BASE_BACKOFF = java.time.Duration.ofMillis(100);
private static final java.time.Duration THROTTLED_BASE_BACKOFF = java.time.Duration.ofSeconds(1);
private static final java.time.Duration MAX_BACKOFF = java.time.Duration.ofSeconds(20);

@JsonProperty
public abstract @Pure int numRetries();

@JsonProperty
@JsonSerialize(converter = DurationToMillis.class)
public abstract @Nullable @Pure Duration baseBackoff();

@JsonProperty
@JsonSerialize(converter = DurationToMillis.class)
public abstract @Nullable @Pure Duration throttledBaseBackoff();

@JsonProperty
@JsonSerialize(converter = DurationToMillis.class)
public abstract @Nullable @Pure Duration maxBackoff();

public abstract RetryConfiguration.Builder toBuilder();

public static Builder builder() {
return new AutoValue_RetryConfiguration.Builder();
return Builder.builder();
}

@AutoValue.Builder
@JsonPOJOBuilder(withPrefix = "")
public abstract static class Builder {
@JsonCreator
static Builder builder() {
return new AutoValue_RetryConfiguration.Builder();
}

public abstract Builder numRetries(int numRetries);

@JsonDeserialize(converter = MillisToDuration.class)
public abstract Builder baseBackoff(Duration baseBackoff);

@JsonDeserialize(converter = MillisToDuration.class)
public abstract Builder throttledBaseBackoff(Duration baseBackoff);

@JsonDeserialize(converter = MillisToDuration.class)
public abstract Builder maxBackoff(Duration maxBackoff);

abstract RetryConfiguration autoBuild();
Expand Down Expand Up @@ -115,4 +142,18 @@ RetryPolicy toClientRetryPolicy() {
private @Nullable static java.time.Duration toJava(@Nullable Duration duration) {
return duration == null ? null : java.time.Duration.ofMillis(duration.getMillis());
}

static class DurationToMillis extends StdConverter<Duration, Long> {
@Override
public Long convert(Duration duration) {
return duration.getMillis();
}
}

static class MillisToDuration extends StdConverter<Long, Duration> {
@Override
public Duration convert(Long millis) {
return Duration.millis(millis);
}
}
}
Loading