diff --git a/connect-file-pulse-filesystems/filepulse-aliyunoss-fs/pom.xml b/connect-file-pulse-filesystems/filepulse-aliyunoss-fs/pom.xml new file mode 100644 index 000000000..ba34d2004 --- /dev/null +++ b/connect-file-pulse-filesystems/filepulse-aliyunoss-fs/pom.xml @@ -0,0 +1,81 @@ + + + + + + io.streamthoughts + kafka-connect-filepulse-filesystems + 2.13.0-SNAPSHOT + + 4.0.0 + + Kafka Connect Source File Pulse Aliyun OSS FS + kafka-connect-filepulse-aliyunoss-fs + Kafka Connect FilePulse - FileSystem - Support for Aliyun OSS + + + + ${project.parent.basedir}/.. + ${project.parent.basedir}/../license-header + UTF-8 + 3.16.3 + + + + + + io.streamthoughts + kafka-connect-filepulse-commons-fs + ${project.version} + + + + com.aliyun.oss + aliyun-sdk-oss + ${aliyun.oss.version} + + + + org.apache.kafka + kafka-clients + + + + org.apache.kafka + connect-api + + + + org.apache.avro + avro + + + + com.madgag.spongycastle + core + 1.54.0.0 + test + + + + + \ No newline at end of file diff --git a/connect-file-pulse-filesystems/filepulse-aliyunoss-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/AliyunOSSClientConfig.java b/connect-file-pulse-filesystems/filepulse-aliyunoss-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/AliyunOSSClientConfig.java new file mode 100644 index 000000000..4c6611b2d --- /dev/null +++ b/connect-file-pulse-filesystems/filepulse-aliyunoss-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/AliyunOSSClientConfig.java @@ -0,0 +1,200 @@ +/* + * Copyright 2023 StreamThoughts. + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamthoughts.kafka.connect.filepulse.fs; + +import io.streamthoughts.kafka.connect.filepulse.internal.StringUtils; +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.config.types.Password; + +import java.util.Map; +import java.util.Objects; + +/** + * The aliyun OSS client's configuration. + */ +public class AliyunOSSClientConfig extends AbstractConfig { + + /** + * OSS access key id + */ + public static final String OSS_ACCESS_KEY_ID_CONFIG = "oss.access.key.id"; + /** + * OSS secret key + */ + public static final String OSS_SECRET_KEY_CONFIG = "oss.secret.key"; + /** + * OSS access endpoint + */ + public static final String OSS_ENDPOINT_CONFIG = "oss.endpoint"; + /** + * OSS bucket name + */ + public final static String OSS_BUCKET_NAME_CONFIG = "oss.bucket.name"; + /** + * oss bucket prefix + */ + public final static String OSS_BUCKET_PREFIX_CONFIG = "oss.bucket.prefix"; + /** + * oss max connections + */ + public static final String OSS_MAX_CONNECTIONS_CONFIG = "oss.max.connections"; + public static final int OSS_MAX_CONNECTIONS_DEFAULT = 1024; + public static final String OSS_MAX_CONNECTIONS_DOC = "OSS max connections."; + /** + * oss socket timeout + */ + public static final String OSS_SOCKET_TIMEOUT_CONFIG = "oss.socket.timeout"; + public static final int OSS_SOCKET_TIMEOUT_DEFAULT = 10 * 1000; + public static final String OSS_SOCKET_TIMEOUT_DOC = "OSS connection timeout."; + /** + * oss connection timeout + */ + public static final int OSS_CONNECTION_TIMEOUT = 50 * 1000; + public static final String OSS_CONNECTION_TIMEOUT_CONFIG = "oss.connection.timeout"; + public static final String OSS_CONNECTION_TIMEOUT_DOC = "OSS connection timeout."; + /** + * oss max error retry + */ + public static final String OSS_MAX_ERROR_RETRIES_CONFIG = "oss.max.error.retries"; + public static final int OSS_MAX_ERROR_RETRIES_DEFAULT = 5; + public static final String OSS_MAX_ERROR_RETRIES_DOC = + "The maximum number of retry attempts for failed retryable requests."; + /** + * OSS object storage class config + */ + public static final String OSS_OBJECT_STORAGE_CLASS_CONFIG = "oss.default.object.storage.class"; + public static final String OSS_OBJECT_STORAGE_CLASS_DEFAULT = "Standard"; + public static final String OSS_OBJECT_STORAGE_CLASS_DOC = + "The OSS storage class to associate with an OSS object when it is copied by the connector (e.g., during a move operation)."; + private static final String OSS_ACCESS_KEY_ID_DOC = "OSS Access Key ID"; + private static final String OSS_SECRET_ACCESS_KEY_DOC = "OSS Secret Access Key"; + private static final String OSS_ENDPOINT_DOC = "OSS access endpoint."; + private final static String OSS_BUCKET_NAME_DOC = "The name of the Aliyun OSS bucket."; + private final static String OSS_BUCKET_PREFIX_DOC = + "The prefix to be used for restricting the listing of the objects in the bucket"; + private static final String OSS_GROUP_CONFIG = "OSS"; + + /** + * Creates a new {@link AliyunOSSClientConfig} instance. + * + * @param originals the original configuration map. + */ + public AliyunOSSClientConfig(final Map originals) { + super(getConf(), originals, false); + } + + /** + * @return the {@link ConfigDef}. + */ + static ConfigDef getConf() { + int ossGroupCounter = 0; + + return new ConfigDef().define(OSS_BUCKET_NAME_CONFIG, ConfigDef.Type.STRING, + ConfigDef.Importance.HIGH, OSS_BUCKET_NAME_DOC, OSS_GROUP_CONFIG, ossGroupCounter++, + ConfigDef.Width.NONE, OSS_BUCKET_NAME_CONFIG) + + .define(OSS_BUCKET_PREFIX_CONFIG, ConfigDef.Type.STRING, null, new ConfigDef.NonEmptyString(), + ConfigDef.Importance.MEDIUM, OSS_BUCKET_PREFIX_DOC, OSS_GROUP_CONFIG, ossGroupCounter++, + ConfigDef.Width.NONE, OSS_BUCKET_PREFIX_CONFIG) + + .define(OSS_ENDPOINT_CONFIG, ConfigDef.Type.STRING, + ConfigDef.Importance.MEDIUM, OSS_ENDPOINT_DOC, OSS_GROUP_CONFIG, ossGroupCounter++, + ConfigDef.Width.NONE, OSS_ENDPOINT_CONFIG) + .define(OSS_ACCESS_KEY_ID_CONFIG, ConfigDef.Type.PASSWORD, + ConfigDef.Importance.HIGH, OSS_ACCESS_KEY_ID_DOC, OSS_GROUP_CONFIG, ossGroupCounter++, + ConfigDef.Width.NONE, OSS_ACCESS_KEY_ID_CONFIG) + + .define(OSS_SECRET_KEY_CONFIG, ConfigDef.Type.PASSWORD, + ConfigDef.Importance.HIGH, OSS_SECRET_ACCESS_KEY_DOC, OSS_GROUP_CONFIG, ossGroupCounter++, + ConfigDef.Width.NONE, OSS_SECRET_KEY_CONFIG) + + .define(OSS_MAX_ERROR_RETRIES_CONFIG, ConfigDef.Type.INT, OSS_MAX_ERROR_RETRIES_DEFAULT, + ConfigDef.Range.atLeast(1), ConfigDef.Importance.MEDIUM, OSS_MAX_ERROR_RETRIES_DOC, + OSS_GROUP_CONFIG, ossGroupCounter++, ConfigDef.Width.NONE, OSS_MAX_ERROR_RETRIES_CONFIG) + .define(OSS_CONNECTION_TIMEOUT_CONFIG, ConfigDef.Type.INT, OSS_CONNECTION_TIMEOUT, + ConfigDef.Range.atLeast(1), ConfigDef.Importance.MEDIUM, OSS_CONNECTION_TIMEOUT_DOC, + OSS_GROUP_CONFIG, ossGroupCounter++, ConfigDef.Width.NONE, OSS_CONNECTION_TIMEOUT_CONFIG) + + .define(OSS_MAX_CONNECTIONS_CONFIG, ConfigDef.Type.INT, OSS_MAX_CONNECTIONS_DEFAULT, + ConfigDef.Range.atLeast(1), ConfigDef.Importance.MEDIUM, OSS_MAX_CONNECTIONS_DOC, + OSS_GROUP_CONFIG, ossGroupCounter++, ConfigDef.Width.NONE, OSS_MAX_CONNECTIONS_CONFIG) + .define(OSS_SOCKET_TIMEOUT_CONFIG, ConfigDef.Type.INT, OSS_SOCKET_TIMEOUT_DEFAULT, + ConfigDef.Range.atLeast(1), ConfigDef.Importance.MEDIUM, OSS_SOCKET_TIMEOUT_DOC, + OSS_GROUP_CONFIG, ossGroupCounter++, ConfigDef.Width.NONE, OSS_SOCKET_TIMEOUT_CONFIG) + .define(OSS_OBJECT_STORAGE_CLASS_CONFIG, ConfigDef.Type.STRING, OSS_OBJECT_STORAGE_CLASS_DEFAULT, + ConfigDef.Importance.LOW, OSS_OBJECT_STORAGE_CLASS_DOC, OSS_GROUP_CONFIG, ossGroupCounter++, + ConfigDef.Width.NONE, OSS_OBJECT_STORAGE_CLASS_CONFIG); + } + + public Password getOSSAccessKeyId() { + return getPassword(OSS_ACCESS_KEY_ID_CONFIG); + } + + public Password getOSSAccessKey() { + return getPassword(OSS_SECRET_KEY_CONFIG); + } + + public String getOSSBucketName() { + return getString(OSS_BUCKET_NAME_CONFIG); + } + + public String getOSSEndpoint() { + return getString(OSS_ENDPOINT_CONFIG); + } + + public String getOSSBucketPrefix() { + return getString(OSS_BUCKET_PREFIX_CONFIG); + } + + public int getOSSMaxConnections() { + return getInt(OSS_MAX_CONNECTIONS_CONFIG); + } + + public int getOSSSocketTimeout() { + return getInt(OSS_SOCKET_TIMEOUT_CONFIG); + } + + public int getOSSConnectionTimeout() { + return getInt(OSS_CONNECTION_TIMEOUT_CONFIG); + } + + public int getOssMaxErrorRetries() { + return getInt(OSS_MAX_ERROR_RETRIES_CONFIG); + } + + public String getOSSDefaultStorageClass() { + return getString(OSS_OBJECT_STORAGE_CLASS_CONFIG); + } + + public static class NonEmptyPassword implements ConfigDef.Validator { + @Override + public void ensureValid(final String name, final Object value) { + if (Objects.isNull(value) || ((Password) value).value() == null) { + return; + } + final Password pwd = (Password) value; + if (StringUtils.isBlank(pwd.value())) { + throw new ConfigException(name, pwd, "Password must be non-empty"); + } + } + } + +} diff --git a/connect-file-pulse-filesystems/filepulse-aliyunoss-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/AliyunOSSFileSystemListing.java b/connect-file-pulse-filesystems/filepulse-aliyunoss-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/AliyunOSSFileSystemListing.java new file mode 100644 index 000000000..2d7657452 --- /dev/null +++ b/connect-file-pulse-filesystems/filepulse-aliyunoss-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/AliyunOSSFileSystemListing.java @@ -0,0 +1,117 @@ +/* + * Copyright 2023 StreamThoughts. + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamthoughts.kafka.connect.filepulse.fs; + +import static io.streamthoughts.kafka.connect.filepulse.internal.StringUtils.isNotBlank; + +import com.aliyun.oss.OSSClient; +import com.aliyun.oss.model.ListObjectsRequest; +import com.aliyun.oss.model.ObjectListing; +import io.streamthoughts.kafka.connect.filepulse.annotation.VisibleForTesting; +import io.streamthoughts.kafka.connect.filepulse.fs.utils.AliyunOSSClientUtils; +import io.streamthoughts.kafka.connect.filepulse.source.FileObjectMeta; +import java.util.Collection; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; +import org.apache.kafka.common.config.ConfigException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The {@code AliyunOSSFileSystemListing} that can be used for + * listing objects that exist in a specific Aliyun OSS bucket. + */ +public class AliyunOSSFileSystemListing implements FileSystemListing { + + private static final Logger LOG = LoggerFactory.getLogger(AliyunOSSFileSystemListing.class); + + private FileListFilter filter; + private AliyunOSSClientConfig config; + private OSSClient client; + private AliyunOSSStorage ossStorage; + + + @Override + public void configure(final Map configs) { + configure(new AliyunOSSClientConfig(configs)); + } + + @VisibleForTesting + void configure(final AliyunOSSClientConfig config) { + this.config = config; + this.client = AliyunOSSClientUtils.createOSSClient(config); + this.ossStorage = new AliyunOSSStorage(client); + this.ossStorage.setDefaultStorageClass(config.getOSSDefaultStorageClass()); + if (!ossStorage.doesOSSBucketExist(config.getOSSBucketName())) { + throw new ConfigException("Invalid OSS bucket name. " + + "Bucket does not exist, or an error happens while connecting to Amazon service"); + } + } + + /** + * list bucket objects + * + * @return + */ + @Override + public Collection listObjects() { + final ListObjectsRequest request = new ListObjectsRequest(); + request.setBucketName(config.getOSSBucketName()); + if (isNotBlank(config.getOSSBucketPrefix())) { + request.setPrefix(config.getOSSBucketPrefix()); + } + final List objectMetaList = new LinkedList<>(); + ObjectListing objectListing; + try { + do { + LOG.info("Sending new request for listing objects: bucketName={}, prefix={}", request.getBucketName(), + request.getPrefix()); + objectListing = client.listObjects(request); + objectMetaList.addAll(objectListing.getObjectSummaries().stream() + .filter(ossObjectSummary -> !ossObjectSummary.getKey().startsWith("oss://")) + .map(ossObjectSummary -> new OSSBucketKey(ossObjectSummary.getBucketName(), + ossObjectSummary.getKey())).map(ossStorage::getObjectMetadata).filter(Objects::nonNull) + .collect(Collectors.toList())); + String marker = objectListing.getNextMarker(); + if (marker != null) { + LOG.debug("Object listing is truncated, next marker is {}", marker); + request.setMarker(marker); + } + } while (objectListing.isTruncated()); + } catch (Exception e) { + LOG.error("Failed to list objects from the Aliyun OSS bucket '{}'. " + + "Error occurred while processing the request: {}", config.getOSSBucketName(), e); + } + return filter == null ? objectMetaList : filter.filterFiles(objectMetaList); + } + + + @Override + public void setFilter(final FileListFilter filter) { + this.filter = filter; + } + + @Override + public AliyunOSSStorage storage() { + return ossStorage; + } +} diff --git a/connect-file-pulse-filesystems/filepulse-aliyunoss-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/AliyunOSSStorage.java b/connect-file-pulse-filesystems/filepulse-aliyunoss-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/AliyunOSSStorage.java new file mode 100644 index 000000000..08596ca4e --- /dev/null +++ b/connect-file-pulse-filesystems/filepulse-aliyunoss-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/AliyunOSSStorage.java @@ -0,0 +1,236 @@ +/* + * Copyright 2023 StreamThoughts. + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamthoughts.kafka.connect.filepulse.fs; + +import com.aliyun.oss.OSSClient; +import com.aliyun.oss.model.CopyObjectRequest; +import com.aliyun.oss.model.CopyObjectResult; +import com.aliyun.oss.model.GenericRequest; +import com.aliyun.oss.model.GetObjectRequest; +import com.aliyun.oss.model.ObjectMetadata; +import io.streamthoughts.kafka.connect.filepulse.fs.utils.AliyunOSSURI; +import io.streamthoughts.kafka.connect.filepulse.internal.StringUtils; +import io.streamthoughts.kafka.connect.filepulse.source.FileObjectMeta; +import io.streamthoughts.kafka.connect.filepulse.source.GenericFileObjectMeta; +import java.io.InputStream; +import java.net.URI; +import java.util.HashMap; +import java.util.Objects; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The {@code AliyunOSSStorage} can be used to interact with Aliyun OSS. + */ +public class AliyunOSSStorage implements Storage { + + private static final Logger LOG = LoggerFactory.getLogger(AliyunOSSStorage.class); + private final OSSClient ossClient; + private String defaultStorageClass; + + /** + * Creates a new {@link AliyunOSSStorage} instance. + * + * @param ossClient the aliyun oss client. + */ + public AliyunOSSStorage(final OSSClient ossClient) { + this.ossClient = Objects.requireNonNull(ossClient, "ossClient should not be null"); + } + + private static FileObjectMeta createFileObjectMeta(final OSSBucketKey ossObject, + final ObjectMetadata objectMetadata) { + + final HashMap userDefinedMetadata = new HashMap<>(); + objectMetadata.getUserMetadata().forEach((k, v) -> userDefinedMetadata.put("oss.object.user.metadata." + k, v)); + userDefinedMetadata.put("oss.object.summary.bucketName", ossObject.bucketName()); + userDefinedMetadata.put("oss.object.summary.key", ossObject.key()); + userDefinedMetadata.put("oss.object.summary.etag", objectMetadata.getETag()); + userDefinedMetadata.put("oss.object.summary.storageClass", objectMetadata.getObjectStorageClass()); + final String contentMD5 = objectMetadata.getContentMD5(); + FileObjectMeta.ContentDigest digest = null; + if (contentMD5 != null) { + digest = new FileObjectMeta.ContentDigest(contentMD5, "MD5"); + } + return new GenericFileObjectMeta.Builder() + .withUri(ossObject.toURI()) + .withName(ossObject.key()) + .withContentLength(objectMetadata.getContentLength()) + .withLastModified(objectMetadata.getLastModified()) + .withContentDigest(digest) + .withUserDefinedMetadata(userDefinedMetadata) + .build(); + } + + public void setDefaultStorageClass(final String defaultStorageClass) { + this.defaultStorageClass = defaultStorageClass; + } + + /** + * {@inheritDoc} + */ + @Override + public boolean exists(final URI objectURI) { + return doesOSSObjectExist(OSSBucketKey.fromURI(objectURI)); + } + + /** + * {@inheritDoc} + */ + @Override + public boolean delete(final URI objectURI) { + final OSSBucketKey ossBucketKey = OSSBucketKey.fromURI(objectURI); + try { + this.ossClient.deleteObject(ossBucketKey.bucketName(), ossBucketKey.key()); + return true; + } catch (Exception e) { + LOG.error( + "Failed to remove object from Aliyun OSS. " + + "Error occurred while processing the request for {}: {}", + ossBucketKey.toURI(), + e + ); + } + return false; + } + + /** + * {@inheritDoc} + */ + @Override + public boolean move(final URI source, final URI dest) { + final OSSBucketKey ossSourceObject = OSSBucketKey.fromURI(source); + final OSSBucketKey ossDestinationObject = OSSBucketKey.fromURI(dest); + try { + // OSS does not support built-in move operation. + // Move should be implemented as copy+delete + final CopyObjectRequest copyObjectRequest = new CopyObjectRequest( + ossSourceObject.bucketName(), + ossSourceObject.key(), + ossDestinationObject.bucketName(), + ossDestinationObject.key() + ); + if (!StringUtils.isBlank(defaultStorageClass)) { + copyObjectRequest.addHeader("x-oss-storage-class", defaultStorageClass); + } + // Copy to target using object metadata from source object + LOG.debug( + "Copying OSS object from {} to {}", + ossSourceObject.toURI(), + ossDestinationObject.toURI() + ); + // + CopyObjectResult objectResult = this.ossClient.copyObject(copyObjectRequest); + if (objectResult.getETag() != null) { + LOG.debug("Deleting OSS object: {}", ossSourceObject.toURI()); + return delete(ossSourceObject.toURI()); + } + return false; + } catch (Exception e) { + LOG.error( + "Failed to move object from Aliyun OSS to {}. " + + "Error occurred while processing the request for {}: {}", + ossDestinationObject.toURI(), + ossSourceObject.toURI(), + e + ); + } + return false; + } + + /** + * {@inheritDoc} + */ + @Override + public InputStream getInputStream(final URI objectURI) { + final OSSBucketKey ossObject = OSSBucketKey.fromURI(objectURI); + final GetObjectRequest request = new GetObjectRequest(ossObject.bucketName(), ossObject.key()); + return ossClient.getObject(request).getObjectContent(); + } + + /** + * {@inheritDoc} + */ + @Override + public FileObjectMeta getObjectMetadata(final URI objectURI) { + final AliyunOSSURI aliyunOSSURI = new AliyunOSSURI(objectURI); + return getObjectMetadata(aliyunOSSURI.getBucket(), aliyunOSSURI.getKey()); + + } + + /** + * @param bucketName + * @param key + * @return + */ + public FileObjectMeta getObjectMetadata(final String bucketName, final String key) { + return getObjectMetadata(new OSSBucketKey(bucketName, key)); + } + + public FileObjectMeta getObjectMetadata(final OSSBucketKey ossObject) { + ObjectMetadata objectMetadata = loadObjectMetadata(ossObject); + return createFileObjectMeta( + ossObject, + objectMetadata + ); + } + + private ObjectMetadata loadObjectMetadata(final OSSBucketKey ossObject) { + GenericRequest request = new GenericRequest(ossObject.bucketName(), ossObject.key()); + try { + return ossClient.getObjectMetadata(request); + } catch (Exception e) { + LOG.error( + "Failed to get object metadata from Aliyun OSS. " + + "Error occurred while processing the request for {}: {}", + ossObject.toURI(), + e + ); + throw e; + } + } + + public boolean doesOSSBucketExist(final String bucketName) { + try { + return ossClient.doesBucketExist(bucketName); + } catch (Exception e) { + LOG.error( + "Failed to check if Aliyun OSS bucket '{}' exist. " + + "Error occurred while processing the request: {}", + bucketName, + e + ); + } + return false; + } + + public boolean doesOSSObjectExist(final OSSBucketKey ossObject) { + try { + return ossClient.doesObjectExist(ossObject.bucketName(), ossObject.key()); + } catch (Exception e) { + LOG.error( + "Failed to check if object with key '{}' exist on Aliyun OSS bucket '{}'. " + + "Error occurred while processing the request: {}", + ossObject.key(), + ossObject.bucketName(), + e + ); + } + return false; + } +} diff --git a/connect-file-pulse-filesystems/filepulse-aliyunoss-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/OSSBucketKey.java b/connect-file-pulse-filesystems/filepulse-aliyunoss-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/OSSBucketKey.java new file mode 100644 index 000000000..108864bbd --- /dev/null +++ b/connect-file-pulse-filesystems/filepulse-aliyunoss-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/OSSBucketKey.java @@ -0,0 +1,105 @@ +/* + * Copyright 2023 StreamThoughts. + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamthoughts.kafka.connect.filepulse.fs; + +import static io.streamthoughts.kafka.connect.filepulse.internal.StringUtils.substringAfterLast; + +import io.streamthoughts.kafka.connect.filepulse.fs.utils.AliyunOSSURI; +import io.streamthoughts.kafka.connect.filepulse.internal.StringUtils; +import java.net.URI; +import java.util.Objects; + + +public class OSSBucketKey { + + public static final String OSS_FOLDER_SEPARATOR = "/"; + private final String bucketName; + private final String key; + + public OSSBucketKey(final String bucketName, final String keyPrefix, final String keyName) { + this(bucketName, StringUtils.removeEnd(keyPrefix, OSS_FOLDER_SEPARATOR) + OSS_FOLDER_SEPARATOR + keyName); + } + + /** + * Creates a new {@link OSSBucketKey} instance. + * + * @param bucketName The Aliyun OSS bucket name. + * @param key The OSS object key. + */ + public OSSBucketKey(final String bucketName, final String key) { + this.bucketName = Objects.requireNonNull(bucketName, "bucketName should not be null"); + this.key = Objects.requireNonNull(key, "key should not be null"); + } + + /** + * An helper method to create a new {@link OSSBucketKey} from a given {@link URI}. + * + * @param uri the uri. + * @return a new {@link OSSBucketKey}. + */ + public static OSSBucketKey fromURI(final URI uri) { + final AliyunOSSURI aliyunOSSURI = new AliyunOSSURI(uri); + return new OSSBucketKey(aliyunOSSURI.getBucket(), aliyunOSSURI.getKey()); + } + + /** + * @return the Aliyun OSS bucket name. + */ + public String bucketName() { + return bucketName; + } + + /** + * @return the Aliyun OSS object key. + */ + public String key() { + return key; + } + + public String objectName() { + return substringAfterLast(key, OSS_FOLDER_SEPARATOR.charAt(0)); + } + + /** + * @return the {@link URI} for this Aliyun OSS object. + */ + public URI toURI() { + return URI.create("oss://" + bucketName + OSS_FOLDER_SEPARATOR + key); + } + + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof OSSBucketKey)) return false; + OSSBucketKey s3Object = (OSSBucketKey) o; + return Objects.equals(bucketName, s3Object.bucketName) && Objects.equals(key, s3Object.key); + } + + + @Override + public int hashCode() { + return Objects.hash(bucketName, key); + } + + @Override + public String toString() { + return "S3Object{" + "bucketName='" + bucketName + '\'' + ", key='" + key + '\'' + '}'; + } +} diff --git a/connect-file-pulse-filesystems/filepulse-aliyunoss-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/clean/AliyunOSSMoveCleanupPolicy.java b/connect-file-pulse-filesystems/filepulse-aliyunoss-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/clean/AliyunOSSMoveCleanupPolicy.java new file mode 100644 index 000000000..ed564343b --- /dev/null +++ b/connect-file-pulse-filesystems/filepulse-aliyunoss-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/clean/AliyunOSSMoveCleanupPolicy.java @@ -0,0 +1,148 @@ +/* + * Copyright 2023 StreamThoughts. + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamthoughts.kafka.connect.filepulse.fs.clean; + +import static io.streamthoughts.kafka.connect.filepulse.fs.clean.AliyunOSSMoveCleanupPolicy.Config.FAILURES_OSS_BUCKET_NAME_CONFIG; +import static io.streamthoughts.kafka.connect.filepulse.fs.clean.AliyunOSSMoveCleanupPolicy.Config.FAILURES_OSS_PREFIX_PATH_CONFIG; +import static io.streamthoughts.kafka.connect.filepulse.fs.clean.AliyunOSSMoveCleanupPolicy.Config.SUCCESS_OSS_BUCKET_NAME_CONFIG; +import static io.streamthoughts.kafka.connect.filepulse.fs.clean.AliyunOSSMoveCleanupPolicy.Config.SUCCESS_OSS_PREFIX_PATH_CONFIG; + +import io.streamthoughts.kafka.connect.filepulse.clean.FileCleanupPolicy; +import io.streamthoughts.kafka.connect.filepulse.fs.AliyunOSSStorage; +import io.streamthoughts.kafka.connect.filepulse.fs.OSSBucketKey; +import io.streamthoughts.kafka.connect.filepulse.fs.Storage; +import io.streamthoughts.kafka.connect.filepulse.source.FileObject; +import java.net.URI; +import java.util.Map; +import java.util.Optional; +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigDef; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AliyunOSSMoveCleanupPolicy implements FileCleanupPolicy { + + private static final Logger LOG = LoggerFactory.getLogger(AliyunOSSMoveCleanupPolicy.class); + private AliyunOSSStorage storage; + private Config config; + + /** + * {@inheritDoc} + */ + @Override + public void configure(final Map configs) { + this.config = new Config(configs); + } + + @Override + public boolean onSuccess(final FileObject source) { + return move(source, SUCCESS_OSS_BUCKET_NAME_CONFIG, SUCCESS_OSS_PREFIX_PATH_CONFIG); + } + + /** + * {@inheritDoc} + */ + @Override + public boolean onFailure(final FileObject source) { + return move(source, FAILURES_OSS_BUCKET_NAME_CONFIG, FAILURES_OSS_PREFIX_PATH_CONFIG); + } + + private boolean move(final FileObject source, final String destinationOSSBucketConfig, + final String destinationOSSPrefixConfig) { + checkState(); + URI sourceURI = source.metadata().uri(); + if (!storage.exists(sourceURI)) { + LOG.warn("Cannot move object-file '{}' to failure OSS bucket due to file does not exist.", sourceURI); + return true; + } + OSSBucketKey sourceBucketKey = OSSBucketKey.fromURI(sourceURI); + + String destOSSBucketName = + Optional.ofNullable(config.getString(destinationOSSBucketConfig)).orElse(sourceBucketKey.bucketName()); + + OSSBucketKey destBucketKey = new OSSBucketKey(destOSSBucketName, config.getString(destinationOSSPrefixConfig), + sourceBucketKey.objectName()); + return storage.move(sourceURI, destBucketKey.toURI()); + } + + @Override + public void setStorage(final Storage storage) { + this.storage = (AliyunOSSStorage) storage; + } + + private void checkState() { + if (storage == null) { + throw new IllegalStateException("no 'storage' initialized."); + } + } + + public static class Config extends AbstractConfig { + private static final String CONFIG_GROUP = "AliyunOSSMoveCleanupPolicy"; + + private static final String CONFIG_PREFIX = "fs.cleanup.policy.move."; + + /** + * success oss bucket name config + */ + public static final String SUCCESS_OSS_BUCKET_NAME_CONFIG = CONFIG_PREFIX + "success.oss.bucket.name"; + /** + * success oss prefix path config + */ + public static final String SUCCESS_OSS_PREFIX_PATH_CONFIG = CONFIG_PREFIX + "success.oss.prefix.path"; + /** + * failure oss bucket name config + */ + public static final String FAILURES_OSS_BUCKET_NAME_CONFIG = CONFIG_PREFIX + "failure.oss.bucket.name"; + /** + * failure oss prefix path config + */ + public static final String FAILURES_OSS_PREFIX_PATH_CONFIG = CONFIG_PREFIX + "failure.oss.prefix.path"; + private static final String SUCCESS_OSS_BUCKET_NAME_DOC = + "The name of the destination OSS bucket for success objects."; + private static final String SUCCESS_OSS_PREFIX_PATH_DOC = + "The prefix to be used for defining the key of an OSS object to move into the destination bucket."; + private static final String FAILURES_OSS_BUCKET_NAME_DOC = + "The name of the destination OSS bucket for failure objects."; + private static final String FAILURES_OSS_PREFIX_PATH_DOC = + "The prefix to be used for defining the key of OSS object to move into the destination bucket."; + + /** + * Creates a new {@link Config} instance. + */ + public Config(final Map originals) { + super(configDef(), originals, true); + } + + static ConfigDef configDef() { + int groupCounter = 0; + return new ConfigDef().define(SUCCESS_OSS_BUCKET_NAME_CONFIG, ConfigDef.Type.STRING, null, + ConfigDef.Importance.HIGH, SUCCESS_OSS_BUCKET_NAME_DOC, CONFIG_GROUP, groupCounter++, + ConfigDef.Width.NONE, SUCCESS_OSS_BUCKET_NAME_CONFIG) + .define(SUCCESS_OSS_PREFIX_PATH_CONFIG, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE, + ConfigDef.Importance.HIGH, SUCCESS_OSS_PREFIX_PATH_DOC, CONFIG_GROUP, groupCounter++, + ConfigDef.Width.NONE, SUCCESS_OSS_PREFIX_PATH_CONFIG) + .define(FAILURES_OSS_PREFIX_PATH_CONFIG, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE, + ConfigDef.Importance.HIGH, FAILURES_OSS_PREFIX_PATH_DOC, CONFIG_GROUP, groupCounter++, + ConfigDef.Width.NONE, FAILURES_OSS_PREFIX_PATH_CONFIG) + .define(FAILURES_OSS_BUCKET_NAME_CONFIG, ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH, + FAILURES_OSS_BUCKET_NAME_DOC, CONFIG_GROUP, groupCounter++, ConfigDef.Width.NONE, + FAILURES_OSS_BUCKET_NAME_CONFIG); + } + } +} diff --git a/connect-file-pulse-filesystems/filepulse-aliyunoss-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/AliyunOSSAvroFileInputReader.java b/connect-file-pulse-filesystems/filepulse-aliyunoss-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/AliyunOSSAvroFileInputReader.java new file mode 100644 index 000000000..b74d986e7 --- /dev/null +++ b/connect-file-pulse-filesystems/filepulse-aliyunoss-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/AliyunOSSAvroFileInputReader.java @@ -0,0 +1,46 @@ +/* + * Copyright 2023 StreamThoughts. + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamthoughts.kafka.connect.filepulse.fs.reader; + +import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct; +import io.streamthoughts.kafka.connect.filepulse.fs.reader.avro.AvroDataStreamIterator; +import io.streamthoughts.kafka.connect.filepulse.reader.FileInputIterator; +import io.streamthoughts.kafka.connect.filepulse.reader.ReaderException; +import io.streamthoughts.kafka.connect.filepulse.source.FileObjectMeta; +import io.streamthoughts.kafka.connect.filepulse.source.FileRecord; +import java.net.URI; + +/** + * The {@code AliyunOSSAvroFileInputReader} can be used to created records from an AVRO file loaded from Aliyun OSS. + */ +public class AliyunOSSAvroFileInputReader extends BaseAliyunOSSInputReader { + + @Override + protected FileInputIterator> newIterator(final URI objectURI, + final IteratorManager iteratorManager) { + + try { + final FileObjectMeta metadata = storage().getObjectMetadata(objectURI); + return new AvroDataStreamIterator(metadata, iteratorManager, storage().getInputStream(objectURI)); + + } catch (Exception e) { + throw new ReaderException("Failed to create AvroDataStreamIterator for: " + objectURI, e); + } + } +} diff --git a/connect-file-pulse-filesystems/filepulse-aliyunoss-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/AliyunOSSBytesArrayInputReader.java b/connect-file-pulse-filesystems/filepulse-aliyunoss-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/AliyunOSSBytesArrayInputReader.java new file mode 100644 index 000000000..cace0a0b0 --- /dev/null +++ b/connect-file-pulse-filesystems/filepulse-aliyunoss-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/AliyunOSSBytesArrayInputReader.java @@ -0,0 +1,47 @@ +/* + * Copyright 2023 StreamThoughts. + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamthoughts.kafka.connect.filepulse.fs.reader; + +import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct; +import io.streamthoughts.kafka.connect.filepulse.fs.reader.text.BytesArrayInputIteratorFactory; +import io.streamthoughts.kafka.connect.filepulse.reader.FileInputIterator; +import io.streamthoughts.kafka.connect.filepulse.source.FileRecord; +import java.net.URI; +import java.util.Map; + +/** + * The {@code AliyunOSSBytesArrayInputReader} can be used to created records + * from an BytesArray file loaded from Aliyun OSS. + */ +public class AliyunOSSBytesArrayInputReader extends BaseAliyunOSSInputReader { + + private BytesArrayInputIteratorFactory factory; + + @Override + public void configure(final Map configs) { + super.configure(configs); + this.factory = new BytesArrayInputIteratorFactory(storage(), iteratorManager()); + } + + @Override + protected FileInputIterator> newIterator(final URI objectURI, + final IteratorManager iteratorManager) { + return factory.newIterator(objectURI); + } +} diff --git a/connect-file-pulse-filesystems/filepulse-aliyunoss-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/AliyunOSSMetadataFileInputReader.java b/connect-file-pulse-filesystems/filepulse-aliyunoss-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/AliyunOSSMetadataFileInputReader.java new file mode 100644 index 000000000..fcb144464 --- /dev/null +++ b/connect-file-pulse-filesystems/filepulse-aliyunoss-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/AliyunOSSMetadataFileInputReader.java @@ -0,0 +1,46 @@ +/* + * Copyright 2023 StreamThoughts. + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamthoughts.kafka.connect.filepulse.fs.reader; + +import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct; +import io.streamthoughts.kafka.connect.filepulse.reader.FileInputIterator; +import io.streamthoughts.kafka.connect.filepulse.source.FileRecord; +import java.net.URI; +import java.util.Map; + +/** + * Send a single record containing file metadata. + */ +public class AliyunOSSMetadataFileInputReader extends BaseAliyunOSSInputReader { + + private FileInputMetadataIteratorFactory factory; + + @Override + public void configure(final Map configs) { + super.configure(configs); + this.factory = new FileInputMetadataIteratorFactory(storage()); + } + + @Override + protected FileInputIterator> newIterator(final URI context, + final IteratorManager iteratorManager) { + return factory.newIterator(context); + } + +} diff --git a/connect-file-pulse-filesystems/filepulse-aliyunoss-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/AliyunOSSRowFileInputReader.java b/connect-file-pulse-filesystems/filepulse-aliyunoss-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/AliyunOSSRowFileInputReader.java new file mode 100644 index 000000000..e42ec38c9 --- /dev/null +++ b/connect-file-pulse-filesystems/filepulse-aliyunoss-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/AliyunOSSRowFileInputReader.java @@ -0,0 +1,48 @@ +/* + * Copyright 2023 StreamThoughts. + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamthoughts.kafka.connect.filepulse.fs.reader; + +import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct; +import io.streamthoughts.kafka.connect.filepulse.fs.reader.text.RowFileInputIteratorConfig; +import io.streamthoughts.kafka.connect.filepulse.fs.reader.text.RowFileInputIteratorFactory; +import io.streamthoughts.kafka.connect.filepulse.reader.FileInputIterator; +import io.streamthoughts.kafka.connect.filepulse.source.FileRecord; +import java.net.URI; +import java.util.Map; + +public class AliyunOSSRowFileInputReader extends BaseAliyunOSSInputReader { + + private RowFileInputIteratorFactory factory; + + @Override + public void configure(final Map configs) { + super.configure(configs); + this.factory = + new RowFileInputIteratorFactory(new RowFileInputIteratorConfig(configs), storage(), iteratorManager()); + } + + /** + * {@inheritDoc} + */ + @Override + public FileInputIterator> newIterator(final URI objectURI, + final IteratorManager iteratorManager) { + return factory.newIterator(objectURI); + } +} diff --git a/connect-file-pulse-filesystems/filepulse-aliyunoss-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/AliyunOSSXMLFileInputReader.java b/connect-file-pulse-filesystems/filepulse-aliyunoss-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/AliyunOSSXMLFileInputReader.java new file mode 100644 index 000000000..d3dc017ba --- /dev/null +++ b/connect-file-pulse-filesystems/filepulse-aliyunoss-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/AliyunOSSXMLFileInputReader.java @@ -0,0 +1,47 @@ +/* + * Copyright 2023 StreamThoughts. + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamthoughts.kafka.connect.filepulse.fs.reader; + +import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct; +import io.streamthoughts.kafka.connect.filepulse.fs.reader.xml.XMLFileInputIteratorFactory; +import io.streamthoughts.kafka.connect.filepulse.fs.reader.xml.XMLFileInputReaderConfig; +import io.streamthoughts.kafka.connect.filepulse.reader.FileInputIterator; +import io.streamthoughts.kafka.connect.filepulse.source.FileRecord; +import java.net.URI; +import java.util.Map; + +/** + * The {@code AliyunOSSXMLFileInputReader} can be used to created records from an XML file loaded from Aliyun OSS. + */ +public class AliyunOSSXMLFileInputReader extends BaseAliyunOSSInputReader { + + private XMLFileInputIteratorFactory factory; + + @Override + public void configure(final Map configs) { + super.configure(configs); + factory = new XMLFileInputIteratorFactory(new XMLFileInputReaderConfig(configs), storage(), iteratorManager()); + } + + @Override + protected FileInputIterator> newIterator(final URI objectURI, + final IteratorManager iteratorManager) { + return factory.newIterator(objectURI); + } +} diff --git a/connect-file-pulse-filesystems/filepulse-aliyunoss-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/BaseAliyunOSSInputReader.java b/connect-file-pulse-filesystems/filepulse-aliyunoss-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/BaseAliyunOSSInputReader.java new file mode 100644 index 000000000..d91a855d4 --- /dev/null +++ b/connect-file-pulse-filesystems/filepulse-aliyunoss-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/BaseAliyunOSSInputReader.java @@ -0,0 +1,70 @@ +/* + * Copyright 2023 StreamThoughts. + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamthoughts.kafka.connect.filepulse.fs.reader; + +import com.aliyun.oss.OSSClient; +import io.streamthoughts.kafka.connect.filepulse.annotation.VisibleForTesting; +import io.streamthoughts.kafka.connect.filepulse.fs.AliyunOSSClientConfig; +import io.streamthoughts.kafka.connect.filepulse.fs.AliyunOSSStorage; +import io.streamthoughts.kafka.connect.filepulse.fs.utils.AliyunOSSClientUtils; +import io.streamthoughts.kafka.connect.filepulse.reader.StorageAwareFileInputReader; +import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The {@code BaseAliyunOSSInputReader} provides the {@link AliyunOSSStorage}. + */ +public abstract class BaseAliyunOSSInputReader extends AbstractFileInputReader + implements StorageAwareFileInputReader { + + private static final Logger LOG = LoggerFactory.getLogger(BaseAliyunOSSInputReader.class); + protected AliyunOSSClientConfig clientConfig; + private OSSClient ossClient; + private AliyunOSSStorage storage; + + @Override + public void configure(final Map configs) { + super.configure(configs); + if (storage == null) { + LOG.info("Create new Aliyun OSS client from the properties passed through the connector's configuration "); + this.clientConfig = new AliyunOSSClientConfig(configs); + ossClient = AliyunOSSClientUtils.createOSSClient(clientConfig); + storage = new AliyunOSSStorage(ossClient); + storage.setDefaultStorageClass(clientConfig.getOSSDefaultStorageClass()); + } + } + + @VisibleForTesting + void setStorage(final AliyunOSSStorage storage) { + this.storage = storage; + } + + @Override + public AliyunOSSStorage storage() { + return storage; + } + + @Override + public void close() { + if (ossClient != null) { + ossClient.shutdown(); + } + } +} diff --git a/connect-file-pulse-filesystems/filepulse-aliyunoss-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/utils/AliyunOSSClientUtils.java b/connect-file-pulse-filesystems/filepulse-aliyunoss-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/utils/AliyunOSSClientUtils.java new file mode 100644 index 000000000..be78b09bd --- /dev/null +++ b/connect-file-pulse-filesystems/filepulse-aliyunoss-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/utils/AliyunOSSClientUtils.java @@ -0,0 +1,49 @@ +/* + * Copyright 2023 StreamThoughts. + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamthoughts.kafka.connect.filepulse.fs.utils; + +import com.aliyun.oss.ClientBuilderConfiguration; +import com.aliyun.oss.OSSClient; +import com.aliyun.oss.OSSClientBuilder; +import io.streamthoughts.kafka.connect.filepulse.fs.AliyunOSSClientConfig; + +/** + * Utility class for creating new oss client. + */ +public class AliyunOSSClientUtils { + /** + * Helper method to creates a new {@link OSSClient} client. + * + * @param config + * @return + */ + public static OSSClient createOSSClient(AliyunOSSClientConfig config) { + ClientBuilderConfiguration clientConfiguration = new ClientBuilderConfiguration(); + clientConfiguration.setMaxErrorRetry(config.getOssMaxErrorRetries()); + clientConfiguration.setConnectionTimeout(config.getOSSConnectionTimeout()); + clientConfiguration.setMaxConnections(config.getOSSMaxConnections()); + clientConfiguration.setSocketTimeout(config.getOSSSocketTimeout()); + + /** + * Create oss client + */ + return (OSSClient) new OSSClientBuilder().build(config.getOSSEndpoint(), config.getOSSAccessKeyId().value(), + config.getOSSAccessKey().value(), clientConfiguration); + } +} diff --git a/connect-file-pulse-filesystems/filepulse-aliyunoss-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/utils/AliyunOSSURI.java b/connect-file-pulse-filesystems/filepulse-aliyunoss-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/utils/AliyunOSSURI.java new file mode 100644 index 000000000..9f0beaeae --- /dev/null +++ b/connect-file-pulse-filesystems/filepulse-aliyunoss-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/utils/AliyunOSSURI.java @@ -0,0 +1,252 @@ +/* + * Copyright 2023 StreamThoughts. + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamthoughts.kafka.connect.filepulse.fs.utils; + +import java.net.URI; +import java.net.URLEncoder; +import java.nio.charset.StandardCharsets; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * Aliyun oss uri util + */ +public class AliyunOSSURI { + private static final Pattern ENDPOINT_PATTERN = Pattern.compile("^(.+\\.)?s3[.-]([a-z0-9-]+)\\."); + private URI uri; + private boolean isPathStyle; + private String bucket; + private String key; + + + public AliyunOSSURI(String str) { + this(str, true); + } + + public AliyunOSSURI(String str, boolean urlEncode) { + this(URI.create(preprocessUrlStr(str, urlEncode)), urlEncode); + } + + public AliyunOSSURI(URI uri) { + this(uri, false); + } + + private AliyunOSSURI(URI uri, boolean urlEncode) { + if (uri == null) { + throw new IllegalArgumentException("uri cannot be null"); + } + initialize(uri, urlEncode); + } + + private static String preprocessUrlStr(String str, boolean encode) { + if (encode) { + return URLEncoder.encode(str, StandardCharsets.UTF_8).replace("%3A", ":").replace("%2F", "/") + .replace("+", "%20"); + } else { + return str; + } + } + + private static String decode(String str) { + if (str == null) { + return null; + } else { + for (int i = 0; i < str.length(); ++i) { + if (str.charAt(i) == '%') { + return decode(str, i); + } + } + + return str; + } + } + + private static String decode(String str, int firstPercent) { + StringBuilder builder = new StringBuilder(); + builder.append(str, 0, firstPercent); + appendDecoded(builder, str, firstPercent); + for (int i = firstPercent + 3; i < str.length(); ++i) { + if (str.charAt(i) == '%') { + appendDecoded(builder, str, i); + i += 2; + } else { + builder.append(str.charAt(i)); + } + } + + return builder.toString(); + } + + private static void appendDecoded(StringBuilder builder, String str, int index) { + if (index > str.length() - 3) { + throw new IllegalStateException("Invalid percent-encoded string:\"" + str + "\"."); + } else { + char first = str.charAt(index + 1); + char second = str.charAt(index + 2); + char decoded = (char) (fromHex(first) << 4 | fromHex(second)); + builder.append(decoded); + } + } + + private static int fromHex(char c) { + if (c < '0') { + throw new IllegalStateException( + "Invalid percent-encoded string: bad character '" + c + "' in escape sequence."); + } else if (c <= '9') { + return c - 48; + } else if (c < 'A') { + throw new IllegalStateException( + "Invalid percent-encoded string: bad character '" + c + "' in escape sequence."); + } else if (c <= 'F') { + return c - 65 + 10; + } else if (c < 'a') { + throw new IllegalStateException( + "Invalid percent-encoded string: bad character '" + c + "' in escape sequence."); + } else if (c <= 'f') { + return c - 97 + 10; + } else { + throw new IllegalStateException( + "Invalid percent-encoded string: bad character '" + c + "' in escape sequence."); + } + } + + private void initialize(URI uri, boolean urlEncode) { + this.uri = uri; + if ("oss".equalsIgnoreCase(uri.getScheme())) { + this.isPathStyle = false; + this.bucket = uri.getAuthority(); + if (this.bucket == null) { + throw new IllegalArgumentException("Invalid OSS URI: no bucket: " + uri); + } + String path = uri.getPath(); + this.key = (path.length() <= 1) ? null : uri.getPath().substring(1); + return; + } + parseHost(urlEncode); + } + + private void parseHost(boolean urlEncode) { + String path = uri.getHost(); + if (path == null) { + throw new IllegalArgumentException("Invalid OSS URI: no hostname: " + uri); + } + Matcher matcher = ENDPOINT_PATTERN.matcher(path); + if (!matcher.find()) { + throw new IllegalArgumentException( + "Invalid OSS URI: hostname does not appear to be a valid OSS endpoint: " + uri); + } + String prefix = matcher.group(1); + if (prefix != null && !prefix.isEmpty()) { + this.isPathStyle = false; + this.bucket = prefix.substring(0, prefix.length() - 1); + path = uri.getPath(); + if (path != null && !path.isEmpty() && !"/".equals(uri.getPath())) { + this.key = uri.getPath().substring(1); + } + return; + } + this.isPathStyle = true; + decodeHost(urlEncode); + } + + private void decodeHost(boolean urlEncode) { + String path = urlEncode ? uri.getPath() : uri.getRawPath(); + if (!"".equals(path) && !"/".equals(path)) { + int index = path.indexOf(47, 1); + if (index == -1) { + this.bucket = decode(path.substring(1)); + this.key = null; + } else if (index == path.length() - 1) { + this.bucket = decode(path.substring(1, index)); + this.key = null; + } else { + this.bucket = decode(path.substring(1, index)); + this.key = decode(path.substring(index + 1)); + } + } + } + + public URI getURI() { + return this.uri; + } + + public boolean isPathStyle() { + return this.isPathStyle; + } + + public String getBucket() { + return this.bucket; + } + + public String getKey() { + return this.key; + } + + @Override + public String toString() { + return this.uri.toString(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } else if (o != null && this.getClass() == o.getClass()) { + AliyunOSSURI that = (AliyunOSSURI) o; + if (this.isPathStyle != that.isPathStyle) { + return false; + } else if (!this.uri.equals(that.uri)) { + return false; + } else { + label58: + { + if (this.bucket != null) { + if (this.bucket.equals(that.bucket)) { + break label58; + } + } else if (that.bucket == null) { + break label58; + } + + return false; + } + + if (this.key != null) { + if (!this.key.equals(that.key)) { + return false; + } + } else if (that.key != null) { + return false; + } + return false; + } + } else { + return false; + } + } + + @Override + public int hashCode() { + int result = this.uri.hashCode(); + result = 31 * result + (this.isPathStyle ? 1 : 0); + result = 31 * result + (this.bucket != null ? this.bucket.hashCode() : 0); + result = 31 * result + (this.key != null ? this.key.hashCode() : 0); + return result; + } +} \ No newline at end of file diff --git a/connect-file-pulse-filesystems/filepulse-aliyunoss-fs/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/AliyunOSSClientConfigTest.java b/connect-file-pulse-filesystems/filepulse-aliyunoss-fs/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/AliyunOSSClientConfigTest.java new file mode 100644 index 000000000..18a34d369 --- /dev/null +++ b/connect-file-pulse-filesystems/filepulse-aliyunoss-fs/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/AliyunOSSClientConfigTest.java @@ -0,0 +1,53 @@ +/* + * Copyright 2023 StreamThoughts. + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.streamthoughts.kafka.connect.filepulse.fs; + +import org.apache.kafka.common.config.ConfigException; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Map; + +public class AliyunOSSClientConfigTest { + + @Test + public void should_get_config() { + final AliyunOSSClientConfig config = new AliyunOSSClientConfig( + Map.of(AliyunOSSClientConfig.OSS_ACCESS_KEY_ID_CONFIG, "access-key", + AliyunOSSClientConfig.OSS_SECRET_KEY_CONFIG, "secret-key", + AliyunOSSClientConfig.OSS_ENDPOINT_CONFIG, "oss-endpoint", + AliyunOSSClientConfig.OSS_BUCKET_NAME_CONFIG, "test-bucket", + AliyunOSSClientConfig.OSS_BUCKET_PREFIX_CONFIG, "prefix")); + Assert.assertEquals("access-key", config.getOSSAccessKeyId().value()); + Assert.assertEquals("secret-key", config.getOSSAccessKey().value()); + Assert.assertEquals("oss-endpoint", config.getOSSEndpoint()); + Assert.assertEquals("test-bucket", config.getOSSBucketName()); + Assert.assertEquals("prefix", config.getOSSBucketPrefix()); + } + + + @Test(expected = ConfigException.class) + public void should_throw_exception_given_mutually_exclusive_options() { + AliyunOSSClientConfig config = new AliyunOSSClientConfig( + Map.of(AliyunOSSClientConfig.OSS_BUCKET_NAME_CONFIG, "test-bucket", + AliyunOSSClientConfig.OSS_BUCKET_PREFIX_CONFIG, "prefix")); + config.getOSSAccessKey(); + } +} \ No newline at end of file diff --git a/connect-file-pulse-filesystems/filepulse-aliyunoss-fs/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/OSSBucketKeyTest.java b/connect-file-pulse-filesystems/filepulse-aliyunoss-fs/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/OSSBucketKeyTest.java new file mode 100644 index 000000000..98d7b1cff --- /dev/null +++ b/connect-file-pulse-filesystems/filepulse-aliyunoss-fs/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/OSSBucketKeyTest.java @@ -0,0 +1,43 @@ +/* + * Copyright 2023 StreamThoughts. + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.streamthoughts.kafka.connect.filepulse.fs; + +import org.junit.Assert; +import org.junit.Test; + +import java.net.URI; + +public class OSSBucketKeyTest { + + @Test + public void should_return_object_name_given_key_with_prefix() { + OSSBucketKey key = new OSSBucketKey("bucket", "/path/to/object/text.csv"); + Assert.assertEquals("text.csv", key.objectName()); + } + + + @Test + public void should_return_object_name_given_key_with_url() { + OSSBucketKey source = new OSSBucketKey("bucket", "/path/to/object/text.csv"); + URI uri = source.toURI(); + OSSBucketKey dest = OSSBucketKey.fromURI(uri); + Assert.assertEquals("text.csv", dest.objectName()); + } +} diff --git a/connect-file-pulse-filesystems/pom.xml b/connect-file-pulse-filesystems/pom.xml index 5d46fb045..389ad8b98 100644 --- a/connect-file-pulse-filesystems/pom.xml +++ b/connect-file-pulse-filesystems/pom.xml @@ -17,8 +17,8 @@ ~ See the License for the specific language governing permissions and ~ limitations under the License. --> - kafka-connect-filepulse-reactor @@ -39,6 +39,7 @@ filepulse-azure-storage-fs filepulse-google-cloud-storage-fs filepulse-sftp-fs + filepulse-aliyunoss-fs