Skip to content

Commit

Permalink
[FLINK-24228][connectors/firehose] Amazon Kinesis Data Firehose sink …
Browse files Browse the repository at this point in the history
…based on the Async Sink Base implemented:

 - Refactored AWSUnifiedSinksUtil into a class that caters for Kinesis and Firehose
 - Extracted commonalities between KDS & KDF sinks into flink-connector-aws-base
 - Implemented integration test based on Localstack container
 - Changing host/container ports to be different, changing HTTP1.1 to being the default, localstack issue fixed
 - Added docs page, changed type in Firehose, turned logging off, removed unused dependencies.
  • Loading branch information
CrynetLogistics authored and dannycranmer committed Feb 1, 2022
1 parent 8a9a08b commit 0e908be
Show file tree
Hide file tree
Showing 36 changed files with 2,011 additions and 265 deletions.
21 changes: 21 additions & 0 deletions flink-connectors/flink-connector-aws-base/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,27 @@ under the License.
<artifactId>sts</artifactId>
<version>${aws.sdk.version}</version>
</dependency>

<!-- Test dependencies -->
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>s3</artifactId>
<version>${aws.sdk.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>iam</artifactId>
<version>${aws.sdk.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,36 +15,32 @@
* limitations under the License.
*/

package org.apache.flink.connector.kinesis.util;
package org.apache.flink.connector.aws.util;

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.connector.aws.config.AWSConfigConstants;
import org.apache.flink.connector.aws.util.AWSGeneralUtil;
import org.apache.flink.connector.kinesis.config.AWSKinesisDataStreamsConfigConstants;
import org.apache.flink.runtime.util.EnvironmentInformation;

import software.amazon.awssdk.awscore.client.builder.AwsAsyncClientBuilder;
import software.amazon.awssdk.awscore.client.builder.AwsClientBuilder;
import software.amazon.awssdk.core.SdkClient;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption;
import software.amazon.awssdk.core.client.config.SdkClientConfiguration;
import software.amazon.awssdk.core.client.config.SdkClientOption;
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClientBuilder;
import software.amazon.awssdk.services.kinesis.model.LimitExceededException;
import software.amazon.awssdk.services.kinesis.model.ProvisionedThroughputExceededException;

import java.net.URI;
import java.util.Optional;
import java.util.Properties;

/** Some utilities specific to Amazon Web Service. */
@Internal
public class AWSKinesisDataStreamsUtil extends AWSGeneralUtil {
public class AWSAsyncSinkUtil extends AWSGeneralUtil {

/** Used for formatting Flink-specific user agent string when creating Kinesis client. */
private static final String USER_AGENT_FORMAT =
AWSKinesisDataStreamsConfigConstants.BASE_KINESIS_USER_AGENT_PREFIX_FORMAT + " V2";
/** V2 suffix to denote the unified sinks. V1 sinks are based on KPL etc. */
static final String V2_USER_AGENT_SUFFIX = " V2";

/**
* Creates a user agent prefix for Flink. This can be used by HTTP Clients.
Expand All @@ -62,51 +58,61 @@ public static String formatFlinkUserAgentPrefix(String userAgentFormat) {

/**
* @param configProps configuration properties
* @param httpClient the underlying HTTP client used to talk to Kinesis
* @return a new Amazon Kinesis Client
* @param httpClient the underlying HTTP client used to talk to AWS
* @return a new AWS Client
*/
public static KinesisAsyncClient createKinesisAsyncClient(
final Properties configProps, final SdkAsyncHttpClient httpClient) {
public static <
S extends SdkClient,
T extends
AwsAsyncClientBuilder<? extends T, S>
& AwsClientBuilder<? extends T, S>>
S createAwsAsyncClient(
final Properties configProps,
final SdkAsyncHttpClient httpClient,
final T clientBuilder,
final String awsUserAgentPrefixFormat,
final String awsClientUserAgentPrefix) {
SdkClientConfiguration clientConfiguration = SdkClientConfiguration.builder().build();
return createKinesisAsyncClient(configProps, clientConfiguration, httpClient);
return createAwsAsyncClient(
configProps,
clientConfiguration,
httpClient,
clientBuilder,
awsUserAgentPrefixFormat,
awsClientUserAgentPrefix);
}

/**
* @param configProps configuration properties
* @param clientConfiguration the AWS SDK v2 config to instantiate the client
* @param httpClient the underlying HTTP client used to talk to Kinesis
* @return a new Amazon Kinesis Client
* @param httpClient the underlying HTTP client used to talk to AWS
* @return a new AWS Client
*/
public static KinesisAsyncClient createKinesisAsyncClient(
final Properties configProps,
final SdkClientConfiguration clientConfiguration,
final SdkAsyncHttpClient httpClient) {
public static <
S extends SdkClient,
T extends
AwsAsyncClientBuilder<? extends T, S>
& AwsClientBuilder<? extends T, S>>
S createAwsAsyncClient(
final Properties configProps,
final SdkClientConfiguration clientConfiguration,
final SdkAsyncHttpClient httpClient,
final T clientBuilder,
final String awsUserAgentPrefixFormat,
final String awsClientUserAgentPrefix) {
String flinkUserAgentPrefix =
Optional.ofNullable(
configProps.getProperty(
AWSKinesisDataStreamsConfigConstants
.KINESIS_CLIENT_USER_AGENT_PREFIX))
.orElse(formatFlinkUserAgentPrefix(USER_AGENT_FORMAT));
Optional.ofNullable(configProps.getProperty(awsClientUserAgentPrefix))
.orElse(
formatFlinkUserAgentPrefix(
awsUserAgentPrefixFormat + V2_USER_AGENT_SUFFIX));

final ClientOverrideConfiguration overrideConfiguration =
createClientOverrideConfiguration(
clientConfiguration,
ClientOverrideConfiguration.builder(),
flinkUserAgentPrefix);
final KinesisAsyncClientBuilder clientBuilder = KinesisAsyncClient.builder();

return createKinesisAsyncClient(
configProps, clientBuilder, httpClient, overrideConfiguration);
}

@VisibleForTesting
static ClientOverrideConfiguration createClientOverrideConfiguration(
final SdkClientConfiguration config,
final ClientOverrideConfiguration.Builder overrideConfigurationBuilder) {
return createClientOverrideConfiguration(
config,
overrideConfigurationBuilder,
formatFlinkUserAgentPrefix(USER_AGENT_FORMAT));
return createAwsAsyncClient(configProps, clientBuilder, httpClient, overrideConfiguration);
}

@VisibleForTesting
Expand All @@ -131,11 +137,16 @@ static ClientOverrideConfiguration createClientOverrideConfiguration(
}

@VisibleForTesting
static KinesisAsyncClient createKinesisAsyncClient(
final Properties configProps,
final KinesisAsyncClientBuilder clientBuilder,
final SdkAsyncHttpClient httpClient,
final ClientOverrideConfiguration overrideConfiguration) {
static <
S extends SdkClient,
T extends
AwsAsyncClientBuilder<? extends T, S>
& AwsClientBuilder<? extends T, S>>
S createAwsAsyncClient(
final Properties configProps,
final T clientBuilder,
final SdkAsyncHttpClient httpClient,
final ClientOverrideConfiguration overrideConfiguration) {

if (configProps.containsKey(AWSConfigConstants.AWS_ENDPOINT)) {
final URI endpointOverride =
Expand All @@ -150,10 +161,4 @@ static KinesisAsyncClient createKinesisAsyncClient(
.region(getRegion(configProps))
.build();
}

public static boolean isRecoverableException(Exception e) {
Throwable cause = e.getCause();
return cause instanceof LimitExceededException
|| cause instanceof ProvisionedThroughputExceededException;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.connector.aws.testutils;

import org.apache.flink.connector.aws.config.AWSConfigConstants;
import org.apache.flink.connector.aws.util.AWSGeneralUtil;

import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.core.waiters.WaiterResponse;
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.iam.IamAsyncClient;
import software.amazon.awssdk.services.iam.model.CreateRoleRequest;
import software.amazon.awssdk.services.iam.model.CreateRoleResponse;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
import software.amazon.awssdk.services.s3.model.HeadBucketRequest;
import software.amazon.awssdk.services.s3.model.HeadBucketResponse;
import software.amazon.awssdk.services.s3.model.ListObjectsRequest;
import software.amazon.awssdk.services.s3.model.ListObjectsResponse;
import software.amazon.awssdk.services.s3.model.S3Object;
import software.amazon.awssdk.services.s3.waiters.S3AsyncWaiter;

import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_CREDENTIALS_PROVIDER;
import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_ENDPOINT;
import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_REGION;
import static org.apache.flink.connector.aws.config.AWSConfigConstants.TRUST_ALL_CERTIFICATES;

/**
* A set of static methods that can be used to call common AWS services on the Localstack container.
*/
public class AWSServicesTestUtils {

private static final String ACCESS_KEY_ID = "accessKeyId";
private static final String SECRET_ACCESS_KEY = "secretAccessKey";

public static S3AsyncClient getS3Client(String endpoint) throws URISyntaxException {
return S3AsyncClient.builder()
.httpClient(getHttpClient(endpoint))
.region(Region.AP_SOUTHEAST_1)
.endpointOverride(new URI(endpoint))
.credentialsProvider(getDefaultCredentials())
.build();
}

public static IamAsyncClient getIamClient(String endpoint) throws URISyntaxException {
return IamAsyncClient.builder()
.httpClient(getHttpClient(endpoint))
.region(Region.AWS_GLOBAL)
.endpointOverride(new URI(endpoint))
.credentialsProvider(getDefaultCredentials())
.build();
}

public static AwsCredentialsProvider getDefaultCredentials() {
return StaticCredentialsProvider.create(
AwsBasicCredentials.create(ACCESS_KEY_ID, SECRET_ACCESS_KEY));
}

public static Properties getConfig(String endpoint) {
Properties config = new Properties();
config.setProperty(AWS_REGION, Region.AP_SOUTHEAST_1.toString());
config.setProperty(AWS_ENDPOINT, endpoint);
config.setProperty(AWSConfigConstants.accessKeyId(AWS_CREDENTIALS_PROVIDER), ACCESS_KEY_ID);
config.setProperty(
AWSConfigConstants.secretKey(AWS_CREDENTIALS_PROVIDER), SECRET_ACCESS_KEY);
config.setProperty(TRUST_ALL_CERTIFICATES, "true");
return config;
}

public static SdkAsyncHttpClient getHttpClient(String endpoint) {
return AWSGeneralUtil.createAsyncHttpClient(getConfig(endpoint));
}

public static void createBucket(S3AsyncClient s3Client, String bucketName)
throws ExecutionException, InterruptedException {
CreateBucketRequest bucketRequest =
CreateBucketRequest.builder().bucket(bucketName).build();
s3Client.createBucket(bucketRequest);

HeadBucketRequest bucketRequestWait =
HeadBucketRequest.builder().bucket(bucketName).build();

S3AsyncWaiter s3Waiter = s3Client.waiter();
CompletableFuture<WaiterResponse<HeadBucketResponse>> waiterResponseFuture =
s3Waiter.waitUntilBucketExists(bucketRequestWait);

waiterResponseFuture.get();
}

public static void createIAMRole(IamAsyncClient iam, String roleName)
throws ExecutionException, InterruptedException {
CreateRoleRequest request = CreateRoleRequest.builder().roleName(roleName).build();

CompletableFuture<CreateRoleResponse> responseFuture = iam.createRole(request);
responseFuture.get();
}

public static List<S3Object> listBucketObjects(S3AsyncClient s3, String bucketName)
throws ExecutionException, InterruptedException {
ListObjectsRequest listObjects = ListObjectsRequest.builder().bucket(bucketName).build();
CompletableFuture<ListObjectsResponse> res = s3.listObjects(listObjects);
return res.get().contents();
}
}
Loading

0 comments on commit 0e908be

Please sign in to comment.