diff --git a/connect-file-pulse-filesystems/filepulse-sftp-fs/pom.xml b/connect-file-pulse-filesystems/filepulse-sftp-fs/pom.xml
new file mode 100644
index 000000000..f99c2995e
--- /dev/null
+++ b/connect-file-pulse-filesystems/filepulse-sftp-fs/pom.xml
@@ -0,0 +1,87 @@
+
+
+
+
+ io.streamthoughts
+ kafka-connect-filepulse-filesystems
+ 2.13.0-SNAPSHOT
+
+ 4.0.0
+
+ Kafka Connect Source File Pulse SFTP FS
+ kafka-connect-filepulse-sftp-fs
+
+
+ ${project.parent.basedir}/..
+ ${project.parent.basedir}/../license-header
+ 0.1.55
+ 2.28.2
+ 3.11.1
+ 1.18.12
+
+
+
+
+ com.jcraft
+ jsch
+ ${jsch.version}
+
+
+ io.streamthoughts
+ kafka-connect-filepulse-commons-fs
+ ${project.version}
+
+
+ org.apache.commons
+ commons-compress
+
+
+ org.apache.avro
+ avro
+
+
+
+ org.mockito
+ mockito-junit-jupiter
+ ${mockito-junit-jupiter.version}
+ test
+
+
+ org.assertj
+ assertj-core
+ ${assertj-core.version}
+ test
+
+
+ org.projectlombok
+ lombok
+ ${lombok.version}
+ test
+
+
+ io.streamthoughts
+ kafka-connect-filepulse-plugin
+ ${project.version}
+ test
+
+
+
\ No newline at end of file
diff --git a/connect-file-pulse-filesystems/filepulse-sftp-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/SftpFileStorage.java b/connect-file-pulse-filesystems/filepulse-sftp-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/SftpFileStorage.java
new file mode 100644
index 000000000..c902cce82
--- /dev/null
+++ b/connect-file-pulse-filesystems/filepulse-sftp-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/SftpFileStorage.java
@@ -0,0 +1,80 @@
+/*
+ * Copyright 2019-2023 StreamThoughts.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.streamthoughts.kafka.connect.filepulse.fs;
+
+import com.jcraft.jsch.SftpATTRS;
+import io.streamthoughts.kafka.connect.filepulse.errors.ConnectFilePulseException;
+import io.streamthoughts.kafka.connect.filepulse.fs.client.SftpClient;
+import io.streamthoughts.kafka.connect.filepulse.source.FileObjectMeta;
+import java.io.InputStream;
+import java.net.URI;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SftpFileStorage implements Storage {
+ private static final Logger log = LoggerFactory.getLogger(SftpFileStorage.class);
+ private static final String CANNOT_STAT_FILE_ERROR_MSG_TEMPLATE = "Cannot stat file with uri: %s";
+
+ private final SftpClient sftpClient;
+
+ public SftpFileStorage(SftpFilesystemListingConfig config) {
+ this.sftpClient = new SftpClient(config);
+ }
+
+ SftpFileStorage(SftpClient sftpClient) {
+ this.sftpClient = sftpClient;
+ }
+
+ @Override
+ public FileObjectMeta getObjectMetadata(URI uri) {
+ log.debug("Getting object metadata for '{}'", uri);
+ return sftpClient.getObjectMetadata(uri)
+ .findFirst().orElseThrow(() -> new ConnectFilePulseException(buildCannotStatFileErrorMsg(uri)));
+ }
+
+ @Override
+ public boolean exists(URI uri) {
+ log.info("Checking if '{}' exists", uri);
+ SftpATTRS attrs = sftpClient.statFile(uri.toString());
+
+ return attrs.isReg();
+ }
+
+ private String buildCannotStatFileErrorMsg(URI uri) {
+ return String.format(CANNOT_STAT_FILE_ERROR_MSG_TEMPLATE, uri);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public boolean delete(URI uri) {
+ return sftpClient.delete(uri);
+ }
+
+ @Override
+ public boolean move(URI source, URI dest) {
+ return sftpClient.move(source, dest);
+ }
+
+ @Override
+ public InputStream getInputStream(URI uri) {
+ return sftpClient.sftpFileInputStream(uri);
+ }
+}
diff --git a/connect-file-pulse-filesystems/filepulse-sftp-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/SftpFilesystemListing.java b/connect-file-pulse-filesystems/filepulse-sftp-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/SftpFilesystemListing.java
new file mode 100644
index 000000000..4c2964154
--- /dev/null
+++ b/connect-file-pulse-filesystems/filepulse-sftp-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/SftpFilesystemListing.java
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2019-2023 StreamThoughts.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.streamthoughts.kafka.connect.filepulse.fs;
+
+import io.streamthoughts.kafka.connect.filepulse.fs.client.SftpClient;
+import io.streamthoughts.kafka.connect.filepulse.source.FileObjectMeta;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SftpFilesystemListing implements FileSystemListing {
+
+ private static final Logger log = LoggerFactory.getLogger(SftpFilesystemListing.class);
+ private FileListFilter filter;
+
+ private SftpFilesystemListingConfig config;
+
+ private SftpClient sftpClient;
+
+ public SftpFilesystemListing(final List filters) {
+ Objects.requireNonNull(filters, "filters can't be null");
+ this.filter = new CompositeFileListFilter(filters);
+ }
+
+ @SuppressWarnings("unused")
+ public SftpFilesystemListing() {
+ this(Collections.emptyList());
+ }
+
+ @Override
+ public void configure(final Map configs) {
+ log.debug("Configuring SftpFilesystemListing");
+ config = new SftpFilesystemListingConfig(configs);
+ sftpClient = new SftpClient(config);
+ }
+
+ @Override
+ public Collection listObjects() {
+ String listingDirectoryPath = getConfig().getSftpListingDirectoryPath();
+
+ List filesMetadata = getSftpClient().listFiles(listingDirectoryPath)
+ .collect(Collectors.toList());
+
+ return filter.filterFiles(filesMetadata);
+ }
+
+ @Override
+ public void setFilter(FileListFilter filter) {
+ this.filter = filter;
+ }
+
+ @Override
+ public SftpFileStorage storage() {
+ return new SftpFileStorage(config);
+ }
+
+ SftpClient getSftpClient() {
+ return sftpClient;
+ }
+
+ SftpFilesystemListingConfig getConfig() {
+ return config;
+ }
+}
diff --git a/connect-file-pulse-filesystems/filepulse-sftp-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/SftpFilesystemListingConfig.java b/connect-file-pulse-filesystems/filepulse-sftp-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/SftpFilesystemListingConfig.java
new file mode 100644
index 000000000..154f84d92
--- /dev/null
+++ b/connect-file-pulse-filesystems/filepulse-sftp-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/SftpFilesystemListingConfig.java
@@ -0,0 +1,181 @@
+/*
+ * Copyright 2019-2023 StreamThoughts.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.streamthoughts.kafka.connect.filepulse.fs;
+
+import io.streamthoughts.kafka.connect.filepulse.errors.ConnectFilePulseException;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SftpFilesystemListingConfig extends AbstractConfig {
+ private static final Logger log = LoggerFactory.getLogger(SftpFilesystemListingConfig.class);
+
+ public static final String CONNECTOR_NAME_PROPERTY = "name";
+
+ public static final String SFTP_LISTING_HOST = "sftp.listing.host";
+ public static final String SFTP_LISTING_HOST_DOC = "Hostname to connect to";
+
+ public static final String SFTP_LISTING_PORT = "sftp.listing.port";
+ public static final String SFTP_LISTING_PORT_DOC = "Port to connect to";
+ public static final Integer SFTP_LISTING_PORT_DEFAULT = 22;
+
+ public static final String SFTP_LISTING_USER = "sftp.listing.user";
+ public static final String SFTP_LISTING_USER_DOC = "SFTP username";
+
+ public static final String SFTP_LISTING_PASSWORD = "sftp.listing.password";
+ public static final String SFTP_LISTING_PASSWORD_DOC = "SFTP user credentials";
+
+ public static final String SFTP_LISTING_DIRECTORY_PATH = "sftp.listing.directory.path";
+ public static final String SFTP_LISTING_DIRECTORY_DOC = "The input directory to scan";
+
+ public static final String SFTP_LISTING_STRICT_HOST_KEY_CHECK = "sftp.listing.strict.host.key.check";
+ public static final String SFTP_LISTING_STRICT_HOST_KEY_CHECK_DOC = "String host key checking";
+ public static final String SFTP_LISTING_STRICT_HOST_KEY_CHECK_DEFAULT = "no";
+
+ public static final String SFTP_CONNECTION_TIMEOUT_MS = "sftp.connection.timeout";
+ public static final String SFTP_CONNECTION_TIMEOUT_MS_DOC = "SFTP connection timeout in millis";
+ public static final Integer SFTP_CONNECTION_TIMEOUT_MS_DEFAULT = 10000;
+
+ public static final String SFTP_CONNECTION_RETRIES = "sftp.connection.retries";
+ public static final String SFTP_CONNECTION_RETRIES_DOC = "SFTP connection retries";
+ public static final Integer SFTP_CONNECTION_RETRIES_DEFAULT = 5;
+
+ public static final String SFTP_CONNECTION_DELAY_MS = "sftp.connection.retries.delay";
+ public static final String SFTP_CONNECTION_DELAY_DOC = "SFTP connection delay between retries in millis";
+ public static final Integer SFTP_CONNECTION_DELAY_DEFAULT = 5000;
+
+ public static ConfigDef getConf() {
+ return new ConfigDef()
+ .define(
+ SFTP_LISTING_DIRECTORY_PATH,
+ ConfigDef.Type.STRING,
+ ConfigDef.Importance.HIGH,
+ SFTP_LISTING_DIRECTORY_DOC
+ )
+ .define(
+ SFTP_LISTING_HOST,
+ ConfigDef.Type.STRING,
+ ConfigDef.Importance.HIGH,
+ SFTP_LISTING_HOST_DOC
+ )
+ .define(
+ SFTP_LISTING_USER,
+ ConfigDef.Type.STRING,
+ ConfigDef.Importance.HIGH,
+ SFTP_LISTING_USER_DOC
+ )
+ .define(
+ SFTP_LISTING_PASSWORD,
+ ConfigDef.Type.STRING,
+ ConfigDef.Importance.HIGH,
+ SFTP_LISTING_PASSWORD_DOC
+ )
+ .define(
+ SFTP_LISTING_PORT,
+ ConfigDef.Type.INT,
+ SFTP_LISTING_PORT_DEFAULT,
+ ConfigDef.Importance.HIGH,
+ SFTP_LISTING_PORT_DOC
+ )
+ .define(
+ SFTP_LISTING_STRICT_HOST_KEY_CHECK,
+ ConfigDef.Type.STRING,
+ SFTP_LISTING_STRICT_HOST_KEY_CHECK_DEFAULT,
+ ConfigDef.Importance.MEDIUM,
+ SFTP_LISTING_STRICT_HOST_KEY_CHECK_DOC
+ )
+ .define(
+ SFTP_CONNECTION_TIMEOUT_MS,
+ ConfigDef.Type.INT,
+ SFTP_CONNECTION_TIMEOUT_MS_DEFAULT,
+ ConfigDef.Importance.LOW,
+ SFTP_CONNECTION_TIMEOUT_MS_DOC
+ )
+ .define(
+ SFTP_CONNECTION_RETRIES,
+ ConfigDef.Type.INT,
+ SFTP_CONNECTION_RETRIES_DEFAULT,
+ ConfigDef.Importance.LOW,
+ SFTP_CONNECTION_RETRIES_DOC
+ )
+ .define(
+ SFTP_CONNECTION_DELAY_MS,
+ ConfigDef.Type.INT,
+ SFTP_CONNECTION_DELAY_DEFAULT,
+ ConfigDef.Importance.LOW,
+ SFTP_CONNECTION_DELAY_DOC
+ );
+ }
+
+ public SftpFilesystemListingConfig(Map, ?> originals) {
+ super(getConf(), originals, true);
+ }
+
+ public String getSftpListingHost() {
+ return getString(SFTP_LISTING_HOST);
+ }
+
+ public Integer getSftpListingPort() {
+ return getInt(SFTP_LISTING_PORT);
+ }
+
+ public String getSftpListingDirectoryPath() {
+ String path = getString(SFTP_LISTING_DIRECTORY_PATH);
+ return stripSlashEndingIfPresent(path);
+ }
+
+ private String stripSlashEndingIfPresent(String path) {
+ return path.endsWith("/") ? path.substring(0, path.length() - 1) : path;
+ }
+
+ public String getSftpListingUser() {
+ return getString(SFTP_LISTING_USER);
+ }
+
+ public String getSftpListingPassword() {
+ return getString(SFTP_LISTING_PASSWORD);
+ }
+
+ public String getSftpListingStrictHostKeyCheck() {
+ return getString(SFTP_LISTING_STRICT_HOST_KEY_CHECK);
+ }
+
+ public Integer getSftpConnectionTimeoutMs() {
+ return getInt(SFTP_CONNECTION_TIMEOUT_MS);
+ }
+
+ public String getConnectorName() {
+ String connectorName = Optional.ofNullable(originalsStrings().get(CONNECTOR_NAME_PROPERTY))
+ .orElseThrow(() -> new ConnectFilePulseException("Cannot find property 'name' in config"));
+ log.debug("Connector name: {}", connectorName);
+
+ return connectorName;
+ }
+
+ public Integer getSftpRetries() {
+ return getInt(SFTP_CONNECTION_RETRIES);
+ }
+
+ public Integer getSftpDelayBetweenRetriesMs() {
+ return getInt(SFTP_CONNECTION_DELAY_MS);
+ }
+}
\ No newline at end of file
diff --git a/connect-file-pulse-filesystems/filepulse-sftp-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/client/SftpClient.java b/connect-file-pulse-filesystems/filepulse-sftp-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/client/SftpClient.java
new file mode 100644
index 000000000..f9f058187
--- /dev/null
+++ b/connect-file-pulse-filesystems/filepulse-sftp-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/client/SftpClient.java
@@ -0,0 +1,263 @@
+/*
+ * Copyright 2019-2023 StreamThoughts.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.streamthoughts.kafka.connect.filepulse.fs.client;
+
+import static com.jcraft.jsch.ChannelSftp.SSH_FX_BAD_MESSAGE;
+import static com.jcraft.jsch.ChannelSftp.SSH_FX_CONNECTION_LOST;
+import static com.jcraft.jsch.ChannelSftp.SSH_FX_NO_CONNECTION;
+import static org.apache.commons.io.FilenameUtils.getName;
+
+import com.jcraft.jsch.ChannelSftp;
+import com.jcraft.jsch.ChannelSftp.LsEntry;
+import com.jcraft.jsch.SftpATTRS;
+import com.jcraft.jsch.SftpException;
+import io.streamthoughts.kafka.connect.filepulse.errors.ConnectFilePulseException;
+import io.streamthoughts.kafka.connect.filepulse.fs.SftpFilesystemListingConfig;
+import io.streamthoughts.kafka.connect.filepulse.fs.stream.ConnectionAwareInputStream;
+import io.streamthoughts.kafka.connect.filepulse.source.FileObjectMeta;
+import io.streamthoughts.kafka.connect.filepulse.source.GenericFileObjectMeta;
+import java.io.InputStream;
+import java.net.URI;
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Stream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SftpClient {
+
+ private static final Logger log = LoggerFactory.getLogger(SftpClient.class);
+ static final String IS_REGULAR_FILE = "is.regular.file";
+ public static final String CHANNEL_TYPE = "sftp";
+ static final List TRANSIENT_SFTP_ERROR_CODES = List.of(
+ SSH_FX_NO_CONNECTION, SSH_FX_CONNECTION_LOST, SSH_FX_BAD_MESSAGE);
+
+ private final SftpFilesystemListingConfig config;
+
+ /**
+ * Initializes the underlying SFTP channel.
+ *
+ * @param config the connector configuration object.
+ */
+ public SftpClient(SftpFilesystemListingConfig config) {
+ this.config = config;
+ }
+
+ public Stream listFiles(String path) {
+ log.info("Listing files in path '{}'", path);
+ return listAll(path)
+ .filter(this::isRegularFile)
+ .map(this::buildFileMetadata)
+ .peek(meta -> log.debug("Found object '{}'", meta));
+ }
+
+ public Stream listAll(String path) {
+ return retryAction(() -> doInCloseableConnection(c -> listAllCore(path, c)));
+ }
+
+ @SuppressWarnings("unchecked")
+ Stream listAllCore(String path, SftpConnection connection) {
+ try {
+ ChannelSftp channel = connection.getChannel();
+ return channel.ls(path).stream();
+ } catch (SftpException e) {
+ throw new ConnectFilePulseException("Cannot list files", e);
+ }
+ }
+
+ public FileObjectMeta buildFileMetadata(LsEntry entry) {
+ return buildFileMetadata(entry.getFilename(), entry.getAttrs());
+ }
+
+ FileObjectMeta buildFileMetadata(String filename, SftpATTRS attrs) {
+ return new GenericFileObjectMeta.Builder()
+ .withName(filename)
+ .withUri(buildUri(filename))
+ .withLastModified(Instant.ofEpochSecond(attrs.getMTime()))
+ .withContentLength(attrs.getSize())
+ .withUserDefinedMetadata(Map.of(IS_REGULAR_FILE, attrs.isReg()))
+ .build();
+ }
+
+ private boolean isRegularFile(LsEntry e) {
+ return e.getAttrs().isReg();
+ }
+
+ public SftpATTRS statFile(String absolutePath) {
+ return retryAction(() -> doInCloseableConnection(c -> statFileCore(absolutePath, c)));
+ }
+
+ SftpATTRS statFileCore(String absolutePath, SftpConnection connection) {
+ log.info("Getting attributes for file '{}'", absolutePath);
+ try {
+ ChannelSftp channel = connection.getChannel();
+ return channel.stat(absolutePath);
+ } catch (SftpException e) {
+ throw new ConnectFilePulseException("Cannot stat file: " + absolutePath, e);
+ }
+ }
+
+ public Stream getObjectMetadata(URI uri) {
+ String absolutePath = uri.toString();
+ final String filename = getName(absolutePath);
+ return Stream.of((SftpATTRS) retryAction(() -> doInCloseableConnection(c -> statFileCore(absolutePath, c))))
+ .map(attrs -> buildFileMetadata(filename, attrs));
+ }
+
+ /**
+ * Builds a file URI as the absolute path from the sftp root.
+ *
+ * @param name the filename.
+ * @return the absolute path of the file
+ */
+ public URI buildUri(String name) {
+ return URI.create(String.format("%s/%s",
+ config.getSftpListingDirectoryPath(),
+ name));
+ }
+
+ /**
+ * Instantiates an input stream on the remote file on the sftp.
+ *
+ * @param uri the URI representing a file on the SFTP
+ * @return the inputstream.
+ */
+ public ConnectionAwareInputStream sftpFileInputStream(URI uri) {
+ log.info("Getting InputStream for '{}'", uri);
+ String absolutePath = uri.toString();
+
+ return retryAction(() ->
+ doInConnection(connection -> {
+ try {
+ ChannelSftp channel = connection.getChannel();
+ InputStream inputStream = channel.get(absolutePath);
+ return new ConnectionAwareInputStream(connection, absolutePath, inputStream);
+ } catch (SftpException e) {
+ log.error("Cannot open sftp InputStream for " + absolutePath, e);
+ throw new ConnectFilePulseException(e);
+ }
+ }));
+ }
+
+ /**
+ * Deletes the file from this storage.
+ *
+ * @param uri the file {@link URI}.
+ * @return {@code true} if the file has been removed successfully, otherwise {@code false}.
+ */
+ public boolean delete(URI uri) {
+ log.info("Deleting file on '{}'", uri);
+ String absolutePath = uri.toString();
+
+ return retryAction(() ->
+ doInCloseableConnection(connection -> {
+ try {
+ ChannelSftp channel = connection.getChannel();
+ channel.rm(absolutePath);
+ return true;
+ } catch (SftpException e) {
+ log.error("Failed to remove file from " + uri, e);
+ if (isRetryableException(e)) {
+ throw new ConnectFilePulseException(e);
+ }
+ }
+ return false;
+ })
+ );
+ }
+
+ public boolean move(URI source, URI destination) {
+ log.info("Moving file from '{}' to {} ", source, destination);
+ return retryAction(() ->
+ doInCloseableConnection(connection -> {
+ try {
+ ChannelSftp channel = connection.getChannel();
+ channel.rename(source.toString(), destination.toString());
+ return true;
+ } catch (SftpException e) {
+ log.error("Failed to move file from {} to {} ", source, destination, e);
+ if (isRetryableException(e)) {
+ throw new ConnectFilePulseException(e);
+ }
+ }
+ return false;
+ })
+ );
+ }
+
+ R doInConnection(Function action) {
+ SftpConnection sftpConnection = createSftpConnection();
+ return action.apply(sftpConnection);
+ }
+
+ R doInCloseableConnection(Function action) {
+ try (SftpConnection sftpConnection = createSftpConnection()) {
+ return action.apply(sftpConnection);
+ }
+ }
+
+ SftpConnection createSftpConnection() {
+ return new SftpConnection(config);
+ }
+
+ T retryAction(Supplier action) {
+ try {
+ return action.get();
+ } catch (ConnectFilePulseException e) {
+ return retryActionCore(config.getSftpRetries(), action, e);
+ }
+ }
+
+ T retryActionCore(Integer remainingRetries, Supplier action, ConnectFilePulseException prevException) {
+ if (remainingRetries == 0) {
+ throw prevException;
+ }
+
+ log.debug("Retrying action, attempt {}", config.getSftpRetries() - remainingRetries + 1);
+
+ try {
+ return action.get();
+ } catch (ConnectFilePulseException e) {
+ wait(config);
+ return retryActionCore(remainingRetries - 1, action, e);
+ }
+ }
+
+ private void wait(SftpFilesystemListingConfig config) {
+ try {
+ Thread.sleep(config.getSftpDelayBetweenRetriesMs());
+ } catch (InterruptedException e) {
+ throw new ConnectFilePulseException(buildConnectErrorMsg(), e);
+ }
+ }
+
+ private String buildConnectErrorMsg() {
+ return String.format("Cannot connect as user %s to %s:%d",
+ config.getSftpListingUser(),
+ config.getSftpListingHost(),
+ config.getSftpListingPort());
+ }
+
+ private static boolean isRetryableException(SftpException e) {
+ return TRANSIENT_SFTP_ERROR_CODES.stream().anyMatch(i -> e.id == i);
+ }
+}
diff --git a/connect-file-pulse-filesystems/filepulse-sftp-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/client/SftpConnection.java b/connect-file-pulse-filesystems/filepulse-sftp-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/client/SftpConnection.java
new file mode 100644
index 000000000..a513d22df
--- /dev/null
+++ b/connect-file-pulse-filesystems/filepulse-sftp-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/client/SftpConnection.java
@@ -0,0 +1,108 @@
+/*
+ * Copyright 2019-2023 StreamThoughts.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.streamthoughts.kafka.connect.filepulse.fs.client;
+
+import com.jcraft.jsch.ChannelSftp;
+import com.jcraft.jsch.JSch;
+import com.jcraft.jsch.JSchException;
+import com.jcraft.jsch.Session;
+import io.streamthoughts.kafka.connect.filepulse.errors.ConnectFilePulseException;
+import io.streamthoughts.kafka.connect.filepulse.fs.SftpFilesystemListingConfig;
+import java.util.Optional;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SftpConnection implements AutoCloseable {
+ private static final Logger log = LoggerFactory.getLogger(SftpConnection.class);
+ private static final String STRICT_HOST_KEY_CHECKING = "StrictHostKeyChecking";
+ public static final String CHANNEL_TYPE = "sftp";
+
+ private final SftpFilesystemListingConfig config;
+ private final JSch jsch;
+ private final Session session;
+ private final ChannelSftp channel;
+
+ public SftpConnection(SftpFilesystemListingConfig config) {
+ this.config = config;
+ this.jsch = initJsch();
+ this.session = initSession();
+ this.channel = initChannel();
+ }
+
+ JSch initJsch() {
+ return new JSch();
+ }
+
+ Session initSession() {
+ try {
+ SftpFilesystemListingConfig conf = getConfig();
+ Session jschSession = getJSch().getSession(conf.getSftpListingUser(), conf.getSftpListingHost(),
+ conf.getSftpListingPort());
+ jschSession.setPassword(conf.getSftpListingPassword());
+ jschSession.setConfig(STRICT_HOST_KEY_CHECKING, conf.getSftpListingStrictHostKeyCheck());
+ jschSession.connect(conf.getSftpConnectionTimeoutMs());
+ return jschSession;
+ } catch (JSchException e) {
+ throw new ConnectFilePulseException(buildConnectErrorMsg(), e);
+ }
+ }
+
+ ChannelSftp initChannel() {
+ try {
+ SftpFilesystemListingConfig conf = getConfig();
+ ChannelSftp channel = (ChannelSftp) getSession().openChannel(CHANNEL_TYPE);
+ channel.connect(conf.getSftpConnectionTimeoutMs());
+ return channel;
+ } catch (JSchException e) {
+ throw new ConnectFilePulseException(buildConnectErrorMsg(), e);
+ }
+ }
+
+ public ChannelSftp getChannel() {
+ return channel;
+ }
+
+ public JSch getJSch() {
+ return jsch;
+ }
+
+ public Session getSession() {
+ return session;
+ }
+
+ public SftpFilesystemListingConfig getConfig() {
+ return config;
+ }
+
+ private String buildConnectErrorMsg() {
+ SftpFilesystemListingConfig conf = getConfig();
+ return String.format("Cannot connect as user %s to %s:%d",
+ conf.getSftpListingUser(),
+ conf.getSftpListingHost(),
+ conf.getSftpListingPort());
+ }
+
+ @Override
+ public void close() {
+ Optional.ofNullable(session).filter(Session::isConnected).ifPresent(Session::disconnect);
+
+ log.debug("Connection to {}:{} successfully closed.",
+ config.getSftpListingHost(), config.getSftpListingPort());
+ }
+}
diff --git a/connect-file-pulse-filesystems/filepulse-sftp-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/iterator/SftpRowFileInputIteratorBuilder.java b/connect-file-pulse-filesystems/filepulse-sftp-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/iterator/SftpRowFileInputIteratorBuilder.java
new file mode 100644
index 000000000..ecc00836c
--- /dev/null
+++ b/connect-file-pulse-filesystems/filepulse-sftp-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/iterator/SftpRowFileInputIteratorBuilder.java
@@ -0,0 +1,141 @@
+/*
+ * Copyright 2019-2023 StreamThoughts.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.streamthoughts.kafka.connect.filepulse.fs.iterator;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
+import io.streamthoughts.kafka.connect.filepulse.fs.reader.IteratorManager;
+import io.streamthoughts.kafka.connect.filepulse.fs.reader.text.NonBlockingBufferReader;
+import io.streamthoughts.kafka.connect.filepulse.fs.reader.text.RowFileInputIterator;
+import io.streamthoughts.kafka.connect.filepulse.fs.reader.text.RowFileWithFooterInputIterator;
+import io.streamthoughts.kafka.connect.filepulse.reader.FileInputIterator;
+import io.streamthoughts.kafka.connect.filepulse.source.FileObjectMeta;
+import io.streamthoughts.kafka.connect.filepulse.source.FileRecord;
+import java.nio.charset.Charset;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SftpRowFileInputIteratorBuilder {
+
+ private static final Logger log = LoggerFactory.getLogger(SftpRowFileInputIteratorBuilder.class);
+ private Charset charset = UTF_8;
+ private int minNumReadRecords = 1;
+ private FileObjectMeta metadata;
+ private long waitMaxMs = 0;
+ private int skipHeaders = 0;
+ private int skipFooters = 0;
+ private IteratorManager iteratorManager;
+ private NonBlockingBufferReader reader;
+
+ public SftpRowFileInputIteratorBuilder withReader(final NonBlockingBufferReader reader) {
+ this.reader = reader;
+ return this;
+ }
+
+ public SftpRowFileInputIteratorBuilder withMetadata(final FileObjectMeta metadata) {
+ this.metadata = metadata;
+ return this;
+ }
+
+ public SftpRowFileInputIteratorBuilder withSkipHeaders(final int skipHeaders) {
+ this.skipHeaders = skipHeaders;
+ return this;
+ }
+
+ public SftpRowFileInputIteratorBuilder withSkipFooters(final int skipFooters) {
+ this.skipFooters = skipFooters;
+ return this;
+ }
+
+ public SftpRowFileInputIteratorBuilder withMinNumReadRecords(final int minNumReadRecords) {
+ this.minNumReadRecords = minNumReadRecords;
+ return this;
+ }
+
+ public SftpRowFileInputIteratorBuilder withMaxWaitMs(final long maxWaitMs) {
+ this.waitMaxMs = maxWaitMs;
+ return this;
+ }
+
+ public SftpRowFileInputIteratorBuilder withCharset(final Charset charset) {
+ this.charset = charset;
+ return this;
+ }
+
+ public SftpRowFileInputIteratorBuilder withIteratorManager(final IteratorManager iteratorManager) {
+ this.iteratorManager = iteratorManager;
+ return this;
+ }
+
+ public FileInputIterator> build() {
+ FileInputIterator> iterator;
+
+ log.info("Building iterator");
+
+
+ iterator = initRowFileInputIterator();
+
+ iterator = decorateIteratorToHandleFooter(iterator);
+ iterator = decorateIteratorToHandleHeader(iterator);
+
+ return iterator;
+ }
+
+ RowFileInputIterator initRowFileInputIterator() {
+ return new RowFileInputIterator(metadata, iteratorManager, reader)
+ .setMinNumReadRecords(minNumReadRecords)
+ .setMaxWaitMs(waitMaxMs);
+ }
+
+ private FileInputIterator> decorateIteratorToHandleFooter(
+ FileInputIterator> iterator) {
+ if (skipFooters > 0) {
+ iterator = initRowFileWithFooterInputIterator(iterator);
+ }
+ return iterator;
+ }
+
+ RowFileWithFooterInputIterator initRowFileWithFooterInputIterator(
+ FileInputIterator> iterator) {
+ return new RowFileWithFooterInputIterator(
+ skipFooters,
+ metadata.uri(),
+ charset,
+ iterator
+ );
+ }
+
+ private FileInputIterator> decorateIteratorToHandleHeader(
+ FileInputIterator> iterator) {
+ if (skipHeaders > 0) {
+ iterator = initSftpRowFileWithHeadersInputIterator(iterator);
+ }
+ return iterator;
+ }
+
+ SftpRowFileWithHeadersInputIterator initSftpRowFileWithHeadersInputIterator(
+ FileInputIterator> iterator) {
+ return new SftpRowFileWithHeadersInputIterator(
+ skipHeaders,
+ reader,
+ iterator
+ );
+ }
+}
diff --git a/connect-file-pulse-filesystems/filepulse-sftp-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/iterator/SftpRowFileInputIteratorFactory.java b/connect-file-pulse-filesystems/filepulse-sftp-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/iterator/SftpRowFileInputIteratorFactory.java
new file mode 100644
index 000000000..1cbd0f49e
--- /dev/null
+++ b/connect-file-pulse-filesystems/filepulse-sftp-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/iterator/SftpRowFileInputIteratorFactory.java
@@ -0,0 +1,83 @@
+/*
+ * Copyright 2019-2023 StreamThoughts.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.streamthoughts.kafka.connect.filepulse.fs.iterator;
+
+import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
+import io.streamthoughts.kafka.connect.filepulse.fs.Storage;
+import io.streamthoughts.kafka.connect.filepulse.fs.reader.IteratorManager;
+import io.streamthoughts.kafka.connect.filepulse.fs.reader.text.NonBlockingBufferReader;
+import io.streamthoughts.kafka.connect.filepulse.fs.reader.text.RowFileInputIteratorConfig;
+import io.streamthoughts.kafka.connect.filepulse.reader.FileInputIterator;
+import io.streamthoughts.kafka.connect.filepulse.reader.FileInputIteratorFactory;
+import io.streamthoughts.kafka.connect.filepulse.reader.ReaderException;
+import io.streamthoughts.kafka.connect.filepulse.source.FileObjectMeta;
+import io.streamthoughts.kafka.connect.filepulse.source.FileRecord;
+import java.net.URI;
+
+public class SftpRowFileInputIteratorFactory implements FileInputIteratorFactory {
+
+ private final RowFileInputIteratorConfig configs;
+
+ private final Storage storage;
+
+ private final IteratorManager iteratorManager;
+
+ public SftpRowFileInputIteratorFactory(final RowFileInputIteratorConfig config,
+ final Storage storage,
+ final IteratorManager iteratorManager) {
+ this.configs = config;
+ this.storage = storage;
+ this.iteratorManager = iteratorManager;
+ }
+
+ @Override
+ public FileInputIterator> newIterator(URI objectURI) {
+ final FileObjectMeta objectMetadata = storage.getObjectMetadata(objectURI);
+
+ return new SftpRowFileInputIteratorBuilder()
+ .withMetadata(objectMetadata)
+ .withCharset(configs.charset())
+ .withMinNumReadRecords(configs.minReadRecords())
+ .withSkipHeaders(configs.skipHeaders())
+ .withSkipFooters(configs.skipFooters())
+ .withMaxWaitMs(configs.maxWaitMs())
+ .withIteratorManager(iteratorManager)
+ .withReader(tryConfigureReader(objectURI))
+ .build();
+ }
+
+ private NonBlockingBufferReader tryConfigureReader(URI objectURI) {
+ try {
+ return configureReader(objectURI);
+ } catch (Exception e) {
+ throw new ReaderException("Cannot get InputStream", e);
+ }
+ }
+
+ private NonBlockingBufferReader configureReader(URI objectURI) throws Exception {
+ NonBlockingBufferReader br = new NonBlockingBufferReader(
+ storage.getInputStream(objectURI),
+ configs.bufferInitialBytesSize(),
+ configs.charset()
+ );
+
+ br.disableAutoFlush();
+ return br;
+ }
+}
diff --git a/connect-file-pulse-filesystems/filepulse-sftp-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/iterator/SftpRowFileWithHeadersInputIterator.java b/connect-file-pulse-filesystems/filepulse-sftp-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/iterator/SftpRowFileWithHeadersInputIterator.java
new file mode 100644
index 000000000..c5239e6b7
--- /dev/null
+++ b/connect-file-pulse-filesystems/filepulse-sftp-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/iterator/SftpRowFileWithHeadersInputIterator.java
@@ -0,0 +1,117 @@
+/*
+ * Copyright 2019-2023 StreamThoughts.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.streamthoughts.kafka.connect.filepulse.fs.iterator;
+
+import static java.util.stream.Collectors.toList;
+
+import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
+import io.streamthoughts.kafka.connect.filepulse.fs.reader.text.NonBlockingBufferReader;
+import io.streamthoughts.kafka.connect.filepulse.fs.reader.text.RowFileInputIteratorDecorator;
+import io.streamthoughts.kafka.connect.filepulse.fs.reader.text.RowFileRecordOffset;
+import io.streamthoughts.kafka.connect.filepulse.fs.reader.text.RowFileWithHeadersInputIterator;
+import io.streamthoughts.kafka.connect.filepulse.fs.reader.text.internal.TextBlock;
+import io.streamthoughts.kafka.connect.filepulse.reader.FileInputIterator;
+import io.streamthoughts.kafka.connect.filepulse.reader.ReaderException;
+import io.streamthoughts.kafka.connect.filepulse.reader.RecordsIterable;
+import io.streamthoughts.kafka.connect.filepulse.source.FileRecord;
+import java.io.IOException;
+import java.util.List;
+import java.util.function.Predicate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SftpRowFileWithHeadersInputIterator extends RowFileInputIteratorDecorator {
+
+ private static final Logger log = LoggerFactory.getLogger(SftpRowFileWithHeadersInputIterator.class);
+ private static final String HEADERS_RECORD_FIELD = "headers";
+
+ /**
+ * The number of rows to be skipped at the beginning of file.
+ */
+ private final int skipHeaders;
+
+ private final NonBlockingBufferReader sequentialReader;
+
+ private List headers;
+
+ private List headerNames;
+
+ public SftpRowFileWithHeadersInputIterator(final int skipHeaders,
+ final NonBlockingBufferReader sequentialReader,
+ final FileInputIterator> iterator) {
+ super(iterator);
+ this.skipHeaders = skipHeaders;
+ this.sequentialReader = sequentialReader;
+ }
+
+ /** We had to reimplement this class since
+ * @see RowFileWithHeadersInputIterator#next() closes the stream after reading the header in the try-with-resources.
+ * When reading a file from the remote sftp server we want to handle one single flow of reads.
+ */
+ @Override
+ public RecordsIterable> next() {
+ if (headers == null) {
+ initHeaders();
+ }
+
+ final RecordsIterable> records = iterator.next();
+
+ return new RecordsIterable<>(records.stream()
+ .filter(isNotHeaderLine())
+ .peek(record -> record.value().put(HEADERS_RECORD_FIELD, headerNames))
+ .collect(toList()));
+ }
+
+ void initHeaders() {
+ log.info("Starting to read header lines ({}) from file {}", skipHeaders, context().metadata().uri());
+
+ try {
+ headers = sequentialReader.readLines(skipHeaders, true);
+ headerNames = headers
+ .stream()
+ .map(TextBlock::data)
+ .collect(toList());
+
+ log.info("Retrieved headers '{}'", headerNames);
+ } catch (IOException e) {
+ throw new ReaderException(String.format("Cannot read lines from file %s",
+ context().metadata().uri()), e);
+ }
+ if (headers.size() < skipHeaders) {
+ throw new ReaderException(
+ String.format(
+ "Not enough data for reading headers from file %s (available=%d, expecting=%d)",
+ context().metadata().uri(),
+ headers.size(),
+ skipHeaders)
+ );
+ }
+ }
+
+ private Predicate> isNotHeaderLine() {
+ return record -> {
+ final RowFileRecordOffset offset = (RowFileRecordOffset) record.offset();
+ return offset.startPosition() > headers.get(skipHeaders - 1).startOffset();
+ };
+ }
+
+ List getHeaderNames() {
+ return headerNames;
+ }
+}
diff --git a/connect-file-pulse-filesystems/filepulse-sftp-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/SftpRowFileInputReader.java b/connect-file-pulse-filesystems/filepulse-sftp-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/SftpRowFileInputReader.java
new file mode 100644
index 000000000..af8b28fa6
--- /dev/null
+++ b/connect-file-pulse-filesystems/filepulse-sftp-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/SftpRowFileInputReader.java
@@ -0,0 +1,78 @@
+/*
+ * Copyright 2019-2023 StreamThoughts.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.streamthoughts.kafka.connect.filepulse.fs.reader;
+
+import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
+import io.streamthoughts.kafka.connect.filepulse.fs.SftpFileStorage;
+import io.streamthoughts.kafka.connect.filepulse.fs.SftpFilesystemListingConfig;
+import io.streamthoughts.kafka.connect.filepulse.fs.iterator.SftpRowFileInputIteratorFactory;
+import io.streamthoughts.kafka.connect.filepulse.fs.reader.text.RowFileInputIteratorConfig;
+import io.streamthoughts.kafka.connect.filepulse.reader.FileInputIterator;
+import io.streamthoughts.kafka.connect.filepulse.reader.StorageAwareFileInputReader;
+import io.streamthoughts.kafka.connect.filepulse.source.FileRecord;
+import java.net.URI;
+import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SftpRowFileInputReader extends AbstractFileInputReader
+ implements StorageAwareFileInputReader {
+ private static final Logger log = LoggerFactory.getLogger(SftpRowFileInputReader.class);
+ private SftpFileStorage storage;
+
+ private SftpRowFileInputIteratorFactory factory;
+
+ @Override
+ public void configure(Map configs) {
+ super.configure(configs);
+
+ log.debug("Configuring SftpRowFileInputReader");
+
+ if (storage == null) {
+ storage = initStorage(configs);
+ log.debug("Storage instantiated successfully");
+ }
+
+ this.factory = initIteratorFactory(configs);
+ }
+
+ SftpRowFileInputIteratorFactory initIteratorFactory(Map configs) {
+ return new SftpRowFileInputIteratorFactory(
+ new RowFileInputIteratorConfig(configs),
+ storage,
+ iteratorManager()
+ );
+ }
+
+ SftpFileStorage initStorage(Map configs) {
+ final SftpFilesystemListingConfig config = new SftpFilesystemListingConfig(configs);
+ return new SftpFileStorage(config);
+ }
+
+ @Override
+ public SftpFileStorage storage() {
+ return storage;
+ }
+
+ @Override
+ protected FileInputIterator> newIterator(URI objectURI, IteratorManager iteratorManager) {
+ log.info("Getting new iterator for object '{}'", objectURI);
+ return factory.newIterator(objectURI);
+ }
+}
diff --git a/connect-file-pulse-filesystems/filepulse-sftp-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/stream/ConnectionAwareInputStream.java b/connect-file-pulse-filesystems/filepulse-sftp-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/stream/ConnectionAwareInputStream.java
new file mode 100644
index 000000000..b7e567d6e
--- /dev/null
+++ b/connect-file-pulse-filesystems/filepulse-sftp-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/stream/ConnectionAwareInputStream.java
@@ -0,0 +1,128 @@
+/*
+ * Copyright 2019-2023 StreamThoughts.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.streamthoughts.kafka.connect.filepulse.fs.stream;
+
+import io.streamthoughts.kafka.connect.filepulse.errors.ConnectFilePulseException;
+import io.streamthoughts.kafka.connect.filepulse.fs.client.SftpConnection;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Optional;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ConnectionAwareInputStream extends InputStream {
+
+ private static final Logger log = LoggerFactory.getLogger(ConnectionAwareInputStream.class);
+ private static final String ZIP_EXTENSION = ".zip";
+ final SftpConnection connection;
+ final String absolutePath;
+ final InputStream delegate;
+
+ public ConnectionAwareInputStream(SftpConnection connection, String absolutePath, InputStream delegate) {
+ this.connection = connection;
+ this.absolutePath = absolutePath;
+ this.delegate = isContentZipped() ? wrapAsCompressedStream(delegate) : delegate;
+ }
+
+ private InputStream wrapAsCompressedStream(InputStream raw) {
+ try {
+ return wrapAsCompressedStreamCore(raw);
+ } catch (IOException ioe) {
+ log.error("Cannot wrap InputStream into a compressed stream for " + absolutePath, ioe);
+ throw new ConnectFilePulseException(ioe);
+ }
+ }
+
+ /**
+ * Wraps the raw stream into the appropriate specific compressed stream.
+ *
+ * @param raw the underlying input stream.
+ * @return the wrapped input stream if appropriate, the raw stream otherwise.
+ * @throws IOException if the file is compressed but for some reason the compressed stream cannot be created.
+ */
+ private InputStream wrapAsCompressedStreamCore(InputStream raw) throws IOException {
+ log.debug("Input file is a ZIP, embedding InputStream into a ZipInputStream");
+ ZipInputStream zipInputStream = new ZipInputStream(raw, StandardCharsets.UTF_8);
+ ZipEntry zipEntry = zipInputStream.getNextEntry();
+
+ return Optional.ofNullable(zipEntry)
+ .map(__ -> zipInputStream)
+ .orElseThrow(() -> new ConnectFilePulseException(
+ String.format("Zip file '%s' has no content", absolutePath)
+ ));
+ }
+
+ public boolean isContentZipped() {
+ return absolutePath.toLowerCase().endsWith(ZIP_EXTENSION);
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (delegate != null) {
+ delegate.close();
+ }
+ if (connection != null) {
+ connection.close();
+ }
+ }
+
+ @Override
+ public int read() throws IOException {
+ return delegate.read();
+ }
+
+ @Override
+ public int read(byte[] b) throws IOException {
+ return delegate.read(b);
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ return delegate.read(b, off, len);
+ }
+
+ @Override
+ public long skip(long n) throws IOException {
+ return delegate.skip(n);
+ }
+
+ @Override
+ public int available() throws IOException {
+ return delegate.available();
+ }
+
+ @Override
+ public synchronized void mark(int readlimit) {
+ delegate.mark(readlimit);
+ }
+
+ @Override
+ public synchronized void reset() throws IOException {
+ delegate.reset();
+ }
+
+ @Override
+ public boolean markSupported() {
+ return delegate.markSupported();
+ }
+
+}
diff --git a/connect-file-pulse-filesystems/filepulse-sftp-fs/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/SftpFileStorageTest.java b/connect-file-pulse-filesystems/filepulse-sftp-fs/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/SftpFileStorageTest.java
new file mode 100644
index 000000000..47797040d
--- /dev/null
+++ b/connect-file-pulse-filesystems/filepulse-sftp-fs/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/SftpFileStorageTest.java
@@ -0,0 +1,161 @@
+/*
+ * Copyright 2023 StreamThoughts.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.streamthoughts.kafka.connect.filepulse.fs;
+
+import static java.time.Instant.ofEpochSecond;
+import static java.time.temporal.ChronoUnit.DAYS;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.jcraft.jsch.SftpATTRS;
+import io.streamthoughts.kafka.connect.filepulse.errors.ConnectFilePulseException;
+import io.streamthoughts.kafka.connect.filepulse.fs.client.SftpClient;
+import io.streamthoughts.kafka.connect.filepulse.fs.stream.ConnectionAwareInputStream;
+import io.streamthoughts.kafka.connect.filepulse.source.GenericFileObjectMeta;
+import java.io.InputStream;
+import java.net.URI;
+import java.time.Instant;
+import java.util.stream.Stream;
+import lombok.SneakyThrows;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class SftpFileStorageTest {
+
+ @Test
+ @SneakyThrows
+ void when_regular_file_exists_should_return_true() {
+ SftpClient client = mock(SftpClient.class);
+
+ SftpATTRS entryAttrs = mock(SftpATTRS.class);
+ when(entryAttrs.isReg()).thenReturn(true);
+
+ when(client.statFile(eq(Fixture.fUri.toString()))).thenReturn(entryAttrs);
+
+ SftpFileStorage storage = new SftpFileStorage(client);
+ assertThat(storage.exists(Fixture.fUri)).isTrue();
+ }
+
+ @Test
+ @SneakyThrows
+ void when_resources_is_not_a_regular_file_exists_should_return_false() {
+ SftpClient client = mock(SftpClient.class);
+
+ SftpATTRS entryAttrs = mock(SftpATTRS.class);
+ when(entryAttrs.isReg()).thenReturn(false);
+
+ when(client.statFile(eq(Fixture.fUri.toString()))).thenReturn(entryAttrs);
+
+ SftpFileStorage storage = new SftpFileStorage(client);
+ assertThat(storage.exists(Fixture.fUri)).isFalse();
+ }
+
+ @Test
+ @SneakyThrows
+ void when_resource_does_not_exists_getObjectMetadata_should_throw_exception() {
+ SftpClient client = mock(SftpClient.class);
+ when(client.getObjectMetadata(eq(Fixture.notExistsUri))).thenReturn(Stream.empty());
+
+ SftpFileStorage storage = new SftpFileStorage(client);
+ assertThatThrownBy(() -> storage.getObjectMetadata(Fixture.notExistsUri)).isInstanceOf(ConnectFilePulseException.class);
+ verify(client).getObjectMetadata(Fixture.notExistsUri);
+ verify(client, never()).buildFileMetadata(any());
+ }
+
+ @Test
+ @SneakyThrows
+ void when_resource_exists_getObjectMetadata_should_return_correct_FileObjectMeta() {
+
+ SftpClient client = mock(SftpClient.class);
+
+ when(client.getObjectMetadata(eq(Fixture.fUri))).thenReturn(Stream.of(Fixture.expectedFileObjectMeta));
+
+ SftpFileStorage storage = new SftpFileStorage(client);
+ assertThat(storage.getObjectMetadata(Fixture.fUri)).isEqualTo(Fixture.expectedFileObjectMeta);
+ verify(client).getObjectMetadata(eq(Fixture.fUri));
+ }
+
+ @Test
+ @SneakyThrows
+ void when_getInputStream_is_called_it_should_delegate_to_sftpClient_sftpFileInputStream() {
+ SftpClient client = mock(SftpClient.class);
+ ConnectionAwareInputStream sftpStream = mock(ConnectionAwareInputStream.class);
+ when(client.sftpFileInputStream(Fixture.fUri)).thenReturn(sftpStream);
+
+ SftpFileStorage storage = new SftpFileStorage(client);
+ InputStream result = storage.getInputStream(Fixture.fUri);
+ assertThat(result).isEqualTo(sftpStream);
+ verify(client).sftpFileInputStream(eq(Fixture.fUri));
+ }
+
+ @Test
+ void when_move_is_called_it_should_delegate_to_sftpClient_move() {
+ // Given
+ SftpClient client = mock(SftpClient.class);
+ when(client.move(Fixture.fUri, Fixture.tUri)).thenReturn(true);
+ SftpFileStorage storage = new SftpFileStorage(client);
+
+ // When
+ boolean result = storage.move(Fixture.fUri, Fixture.tUri);
+
+ // Then
+ assertThat(result).isTrue();
+ verify(client).move(eq(Fixture.fUri), eq(Fixture.tUri));
+ }
+
+ @Test
+ @SneakyThrows
+ void when_delete_is_called_it_should_delegate_to_sftpClient_delete() {
+ SftpClient client = mock(SftpClient.class);
+ when(client.delete(Fixture.fUri)).thenReturn(true);
+
+ SftpFileStorage storage = new SftpFileStorage(client);
+ boolean result = storage.delete(Fixture.fUri);
+ assertThat(result).isTrue();
+ verify(client).delete(eq(Fixture.fUri));
+ }
+
+ interface Fixture {
+ String fName = "f.txt";
+ String path = "/path";
+ String destination = "/destination";
+ URI fUri = URI.create(String.format("%s/%s", path, fName));
+ URI tUri = URI.create(String.format("%s/%s", destination, fName));
+ URI notExistsUri = URI.create(String.format("%s/%s", path, "this_file_does_not_exists.txt"));
+
+ int entryMTime = (int) Instant.now().minus(2, DAYS).getEpochSecond();
+ long entrySize = 1024;
+
+ GenericFileObjectMeta expectedFileObjectMeta =
+ new GenericFileObjectMeta.Builder()
+ .withName(fName)
+ .withUri(fUri)
+ .withLastModified(ofEpochSecond(entryMTime))
+ .withContentLength(entrySize)
+ .build();
+ }
+}
\ No newline at end of file
diff --git a/connect-file-pulse-filesystems/filepulse-sftp-fs/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/SftpFilesystemListingConfigTest.java b/connect-file-pulse-filesystems/filepulse-sftp-fs/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/SftpFilesystemListingConfigTest.java
new file mode 100644
index 000000000..92e3c5b81
--- /dev/null
+++ b/connect-file-pulse-filesystems/filepulse-sftp-fs/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/SftpFilesystemListingConfigTest.java
@@ -0,0 +1,169 @@
+/*
+ * Copyright 2023 StreamThoughts.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.streamthoughts.kafka.connect.filepulse.fs;
+
+import static io.streamthoughts.kafka.connect.filepulse.fs.SftpFilesystemListingConfig.CONNECTOR_NAME_PROPERTY;
+import static io.streamthoughts.kafka.connect.filepulse.fs.SftpFilesystemListingConfig.SFTP_CONNECTION_TIMEOUT_MS;
+import static io.streamthoughts.kafka.connect.filepulse.fs.SftpFilesystemListingConfig.SFTP_LISTING_DIRECTORY_PATH;
+import static io.streamthoughts.kafka.connect.filepulse.fs.SftpFilesystemListingConfig.SFTP_LISTING_HOST;
+import static io.streamthoughts.kafka.connect.filepulse.fs.SftpFilesystemListingConfig.SFTP_LISTING_PASSWORD;
+import static io.streamthoughts.kafka.connect.filepulse.fs.SftpFilesystemListingConfig.SFTP_LISTING_PORT;
+import static io.streamthoughts.kafka.connect.filepulse.fs.SftpFilesystemListingConfig.SFTP_LISTING_USER;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.params.provider.Arguments.arguments;
+
+import io.streamthoughts.kafka.connect.filepulse.errors.ConnectFilePulseException;
+import java.util.Map;
+import java.util.stream.Stream;
+import org.apache.kafka.common.config.ConfigException;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+class SftpFilesystemListingConfigTest {
+
+ private static Stream when_config_initialized_getSftpListingDirectoryPath_should_return_the_path() {
+ return Stream.of(arguments("/test/path", "/test/path"),
+ arguments("/test/path/", "/test/path"));
+ }
+
+ @ParameterizedTest
+ @MethodSource
+ void when_config_initialized_getSftpListingDirectoryPath_should_return_the_path(String path, String expectedPath) {
+
+ SftpFilesystemListingConfig config =
+ new SftpFilesystemListingConfig(Map.of(
+ SFTP_LISTING_HOST, "h",
+ SFTP_LISTING_USER, "u",
+ SFTP_LISTING_PASSWORD, "p",
+ SFTP_LISTING_DIRECTORY_PATH, path));
+
+ assertEquals(expectedPath, config.getSftpListingDirectoryPath());
+ }
+
+ @Test
+ void when_hostname_not_specified_config_initialization_should_throw_exception() {
+ assertThrows(ConfigException.class, () -> new SftpFilesystemListingConfig(Map.of(
+ SFTP_LISTING_USER, "u",
+ SFTP_LISTING_PASSWORD, "p",
+ SFTP_LISTING_DIRECTORY_PATH, "/path")));
+ }
+
+ @Test
+ void when_username_not_specified_config_initialization_should_throw_exception() {
+ assertThrows(ConfigException.class, () -> new SftpFilesystemListingConfig(Map.of(
+ SFTP_LISTING_HOST, "h",
+ SFTP_LISTING_PASSWORD, "p",
+ SFTP_LISTING_DIRECTORY_PATH, "/path")));
+ }
+
+ @Test
+ void when_password_not_specified_config_initialization_should_throw_exception() {
+ assertThrows(ConfigException.class, () -> new SftpFilesystemListingConfig(Map.of(
+ SFTP_LISTING_HOST, "h",
+ SFTP_LISTING_USER, "u",
+ SFTP_LISTING_DIRECTORY_PATH, "/path")));
+ }
+
+ @Test
+ void when_root_path_not_specified_config_initialization_should_throw_exception() {
+ assertThrows(ConfigException.class, () -> new SftpFilesystemListingConfig(Map.of(
+ SFTP_LISTING_HOST, "h",
+ SFTP_LISTING_USER, "u",
+ SFTP_LISTING_PASSWORD, "p")));
+ }
+
+ @Test
+ void when_port_not_specified_config_initialization_should_assign_default_sftp_port() {
+ SftpFilesystemListingConfig config =
+ new SftpFilesystemListingConfig(Map.of(
+ SFTP_LISTING_HOST, "h",
+ SFTP_LISTING_USER, "u",
+ SFTP_LISTING_PASSWORD, "p",
+ SFTP_LISTING_DIRECTORY_PATH, "/path"));
+
+ assertEquals(22, config.getSftpListingPort());
+ }
+
+ @Test
+ void when_port_specified_config_initialization_should_override_default_sftp_port() {
+ SftpFilesystemListingConfig config =
+ new SftpFilesystemListingConfig(Map.of(
+ SFTP_LISTING_HOST, "h",
+ SFTP_LISTING_USER, "u",
+ SFTP_LISTING_PORT, 1234,
+ SFTP_LISTING_PASSWORD, "p",
+ SFTP_LISTING_DIRECTORY_PATH, "/path"));
+
+ assertEquals(1234, config.getSftpListingPort());
+ }
+
+ @Test
+ void when_connection_timeout_not_specified_config_initialization_should_assign_default_timeout() {
+ SftpFilesystemListingConfig config =
+ new SftpFilesystemListingConfig(Map.of(
+ SFTP_LISTING_HOST, "h",
+ SFTP_LISTING_USER, "u",
+ SFTP_LISTING_PASSWORD, "p",
+ SFTP_LISTING_DIRECTORY_PATH, "/path"));
+
+ assertEquals(10000, config.getSftpConnectionTimeoutMs());
+ }
+
+ @Test
+ void when_connection_timeout_specified_config_initialization_should_override_default_timeout() {
+ SftpFilesystemListingConfig config =
+ new SftpFilesystemListingConfig(Map.of(
+ SFTP_LISTING_HOST, "h",
+ SFTP_LISTING_USER, "u",
+ SFTP_CONNECTION_TIMEOUT_MS, 5000,
+ SFTP_LISTING_PASSWORD, "p",
+ SFTP_LISTING_DIRECTORY_PATH, "/path"));
+
+ assertEquals(5000, config.getSftpConnectionTimeoutMs());
+ }
+
+ @Test
+ void when_config_has_name_property_getConnectorName_should_return_the_name() {
+ SftpFilesystemListingConfig config =
+ new SftpFilesystemListingConfig(Map.of(
+ SFTP_LISTING_HOST, "h",
+ SFTP_LISTING_USER, "u",
+ SFTP_LISTING_PASSWORD, "p",
+ SFTP_LISTING_DIRECTORY_PATH, "/path",
+ CONNECTOR_NAME_PROPERTY, "test_connector"));
+
+ assertEquals("test_connector", config.getConnectorName());
+ }
+
+ @Test
+ void when_config_does_not_have_name_property_getConnectorName_should_throw_exception() {
+ SftpFilesystemListingConfig config =
+ new SftpFilesystemListingConfig(Map.of(
+ SFTP_LISTING_HOST, "h0",
+ SFTP_LISTING_USER, "u",
+ SFTP_LISTING_PASSWORD, "p",
+ SFTP_LISTING_DIRECTORY_PATH, "/path"));
+
+ assertThrows(ConnectFilePulseException.class, config::getConnectorName);
+ }
+
+}
\ No newline at end of file
diff --git a/connect-file-pulse-filesystems/filepulse-sftp-fs/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/SftpFilesystemListingTest.java b/connect-file-pulse-filesystems/filepulse-sftp-fs/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/SftpFilesystemListingTest.java
new file mode 100644
index 000000000..740e50eeb
--- /dev/null
+++ b/connect-file-pulse-filesystems/filepulse-sftp-fs/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/SftpFilesystemListingTest.java
@@ -0,0 +1,162 @@
+/*
+ * Copyright 2023 StreamThoughts.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.streamthoughts.kafka.connect.filepulse.fs;
+
+import static java.time.Instant.ofEpochSecond;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+import com.jcraft.jsch.ChannelSftp.LsEntry;
+import com.jcraft.jsch.SftpATTRS;
+import io.streamthoughts.kafka.connect.filepulse.fs.client.SftpClient;
+import io.streamthoughts.kafka.connect.filepulse.fs.filter.RegexFileListFilter;
+import io.streamthoughts.kafka.connect.filepulse.source.FileObjectMeta;
+import io.streamthoughts.kafka.connect.filepulse.source.GenericFileObjectMeta;
+import java.net.URI;
+import java.time.Instant;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Stream;
+import lombok.SneakyThrows;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+public class SftpFilesystemListingTest {
+
+ @Test
+ @Order(1)
+ public void when_no_filter_regex_specified_and_entries_for_files_and_directories_listObjects_should_build_metadata_only_for_files() {
+ Stream entries = Stream.of(Fixture.visitorEntry, Fixture.fullReferrerEntry, Fixture.parentDirEntry);
+
+ SftpFilesystemListing listing = buildSftpFilesystemListingMock(entries, Collections.emptyList());
+
+ Collection result = listing.listObjects();
+
+ assertThat(result).hasSize(2);
+ assertThat(result).containsExactlyInAnyOrder(Fixture.visitorMetadata, Fixture.fullReferrerMetadata);
+ }
+
+ @Test
+ @Order(2)
+ public void when_filter_regex_specified_and_entries_for_files_and_directories_listObjects_should_build_metadata_only_for_files_matching_pattern() {
+ Stream entries = Stream.of(Fixture.visitorEntry, Fixture.fullReferrerEntry, Fixture.parentDirEntry);
+ RegexFileListFilter regexFileListFilter = buildRegexFilter(Fixture.visitorRegexPattern);
+
+ SftpFilesystemListing listing = buildSftpFilesystemListingMock(entries, Collections.singletonList(regexFileListFilter));
+
+ Collection result = listing.listObjects();
+
+ assertThat(result).hasSize(1);
+ assertThat(result).containsExactlyInAnyOrder(Fixture.visitorMetadata);
+ }
+
+ @SneakyThrows
+ private SftpFilesystemListing buildSftpFilesystemListingMock(Stream entries, List filters) {
+ SftpFilesystemListingConfig config = mock(SftpFilesystemListingConfig.class);
+ when(config.getSftpListingDirectoryPath()).thenReturn(Fixture.path);
+
+ SftpClient client = spy(new SftpClient(config));
+ doReturn(entries).when(client).listAll(anyString());
+
+ SftpFilesystemListing listing = spy(new SftpFilesystemListing(filters));
+
+ doReturn(client).when(listing).getSftpClient();
+ doReturn(config).when(listing).getConfig();
+
+ return listing;
+ }
+
+ private RegexFileListFilter buildRegexFilter(String pattern) {
+ RegexFileListFilter filter = new RegexFileListFilter();
+ Map config = Map.of(Fixture.regexPatternConfig, pattern);
+ filter.configure(config);
+
+ return filter;
+ }
+
+ private static LsEntry buildEntryMock(String entryName, int entryMTime, long entrySize, boolean isRegularFile) {
+ LsEntry entry = mock(LsEntry.class);
+ SftpATTRS attrs = mock(SftpATTRS.class);
+
+ lenient().when(attrs.getMTime()).thenReturn(entryMTime);
+ lenient().when(attrs.getSize()).thenReturn(entrySize);
+ lenient().when(attrs.isReg()).thenReturn(isRegularFile);
+
+ lenient().when(entry.getFilename()).thenReturn(entryName);
+ when(entry.getAttrs()).thenReturn(attrs);
+
+ return entry;
+ }
+
+ private static URI buildFileURI(String fileName) {
+ return URI.create(String.format("%s/%s", Fixture.path, fileName));
+ }
+
+ private static int getEpochSeconds(String timestamp) {
+ return (int) Instant.parse(timestamp).getEpochSecond();
+ }
+
+ interface Fixture {
+ String path = "/userdata";
+ String regexPatternConfig = "file.filter.regex.pattern";
+ String visitorRegexPattern = "^getFullVisitors[a-zA-Z0-9_-]+.csv";
+
+ String visitorFileName = "getFullVisitors_2023-01-19_08.csv";
+ String fullReferrerFileName = "getFullReferrer_2022-11-21_2022-12-21.csv";
+ String parentDirFileName = ".";
+
+ int visitorMTime = getEpochSeconds("2023-02-16T13:45:00Z");
+ int fullReferrerMTime = getEpochSeconds("2023-02-12T06:45:00Z");
+ int parentDirMTime = getEpochSeconds("2023-05-10T10:11:00Z");
+
+ long visitorSize = 1024;
+ long fullReferrerSize = 2048;
+ long parentDirSize = 96;
+
+ LsEntry visitorEntry = buildEntryMock(visitorFileName, visitorMTime, visitorSize, true);
+ LsEntry fullReferrerEntry = buildEntryMock(fullReferrerFileName, fullReferrerMTime, fullReferrerSize, true);
+ LsEntry parentDirEntry = buildEntryMock(parentDirFileName, parentDirMTime, parentDirSize, false);
+
+ GenericFileObjectMeta visitorMetadata =
+ new GenericFileObjectMeta.Builder()
+ .withName(visitorFileName)
+ .withUri(buildFileURI(visitorFileName))
+ .withLastModified(ofEpochSecond(visitorMTime))
+ .withContentLength(visitorSize)
+ .build();
+
+ GenericFileObjectMeta fullReferrerMetadata =
+ new GenericFileObjectMeta.Builder()
+ .withName(fullReferrerFileName)
+ .withUri(buildFileURI(fullReferrerFileName))
+ .withLastModified(ofEpochSecond(fullReferrerMTime))
+ .withContentLength(fullReferrerSize)
+ .build();
+ }
+}
diff --git a/connect-file-pulse-filesystems/filepulse-sftp-fs/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/client/SftpClientTest.java b/connect-file-pulse-filesystems/filepulse-sftp-fs/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/client/SftpClientTest.java
new file mode 100644
index 000000000..44b7a6c67
--- /dev/null
+++ b/connect-file-pulse-filesystems/filepulse-sftp-fs/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/client/SftpClientTest.java
@@ -0,0 +1,577 @@
+/*
+ * Copyright 2023 StreamThoughts.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.streamthoughts.kafka.connect.filepulse.fs.client;
+
+import static com.jcraft.jsch.ChannelSftp.SSH_FX_NO_SUCH_FILE;
+import static com.jcraft.jsch.ChannelSftp.SSH_FX_OP_UNSUPPORTED;
+import static com.jcraft.jsch.ChannelSftp.SSH_FX_PERMISSION_DENIED;
+import static io.streamthoughts.kafka.connect.filepulse.fs.client.SftpClient.IS_REGULAR_FILE;
+import static io.streamthoughts.kafka.connect.filepulse.fs.client.SftpClient.TRANSIENT_SFTP_ERROR_CODES;
+import static java.time.Instant.ofEpochSecond;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.params.provider.Arguments.arguments;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.jcraft.jsch.ChannelSftp;
+import com.jcraft.jsch.SftpATTRS;
+import com.jcraft.jsch.SftpException;
+import io.streamthoughts.kafka.connect.filepulse.errors.ConnectFilePulseException;
+import io.streamthoughts.kafka.connect.filepulse.fs.SftpFilesystemListingConfig;
+import io.streamthoughts.kafka.connect.filepulse.fs.stream.ConnectionAwareInputStream;
+import io.streamthoughts.kafka.connect.filepulse.source.FileObjectMeta;
+import io.streamthoughts.kafka.connect.filepulse.source.GenericFileObjectMeta;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+import java.util.Vector;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import lombok.SneakyThrows;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+public class SftpClientTest {
+
+ @SneakyThrows
+ private ChannelSftp mockChannel(SftpClient mockClient) {
+ SftpConnection sftpConnection = mock(SftpConnection.class);
+ ChannelSftp channel = mock(ChannelSftp.class);
+ when(sftpConnection.getChannel()).thenReturn(channel);
+ doReturn(sftpConnection).when(mockClient).createSftpConnection();
+
+ return channel;
+ }
+
+ @SneakyThrows
+ public static SftpClient mockSftpClient(SftpFilesystemListingConfig config) {
+ return spy(new SftpClient(config));
+ }
+
+ @Test
+ @SneakyThrows
+ void when_remote_entries_listFiles_should_return_their_metadata() {
+ SftpFilesystemListingConfig config = mock(SftpFilesystemListingConfig.class);
+ SftpClient client = spy(new SftpClient(config));
+
+ ChannelSftp channelSftp = mockChannel(client);
+
+ ChannelSftp.LsEntry entry0 = mockEntry(Fixture.f0Name, Fixture.entryMTime, Fixture.entrySize);
+ ChannelSftp.LsEntry entry1 = mockEntry(Fixture.f1Name, Fixture.entryMTime + 1000, Fixture.entrySize * 2);
+ ChannelSftp.LsEntry entry2 = mockEntry(Fixture.f2Name, Fixture.entryMTime + 2000, Fixture.entrySize * 4);
+
+ Vector entries = new Vector<>();
+ entries.add(entry0);
+ entries.add(entry1);
+ entries.add(entry2);
+
+ when(channelSftp.ls(anyString())).thenReturn(entries);
+
+ List result = client.listFiles(Fixture.path).collect(Collectors.toList());
+
+ assertEquals(3, result.size());
+ }
+
+ @Test
+ @SneakyThrows
+ void when_sftp_client_initialized_buildUri_should_create_uris_based_the_root_folder() {
+ SftpFilesystemListingConfig config = mock(SftpFilesystemListingConfig.class);
+ when(config.getSftpListingDirectoryPath()).thenReturn("/test/path");
+
+ SftpClient client = new SftpClient(config);
+ URI expectedUri = URI.create("/test/path/filename.txt");
+ assertEquals(expectedUri, client.buildUri("filename.txt"));
+ }
+
+ @ParameterizedTest
+ @MethodSource
+ @SneakyThrows
+ void when_content_provided_sftpFileInputStream_should_initialize_stream_successfully(String filePath, Boolean isZipped) {
+ URI testData = new URI(filePath);
+ InputStream raw = getClass().getClassLoader().getResourceAsStream(testData.toString());
+
+ SftpFilesystemListingConfig config = mock(SftpFilesystemListingConfig.class);
+
+ SftpClient client = mockSftpClient(config);
+ ChannelSftp channelSftp = mockChannel(client);
+ when(channelSftp.get(anyString())).thenReturn(raw);
+
+ ConnectionAwareInputStream sftpStream = client.sftpFileInputStream(testData);
+
+ assertNotNull(sftpStream);
+ assertEquals(isZipped, sftpStream.isContentZipped());
+ }
+
+ @Test
+ @SneakyThrows
+ void when_filename_ends_with_zip_but_content_is_not_zipped_sftpFileInputStream_should_throw_exception() {
+ URI testData = new URI("data/fake_zip.csv.zip");
+ InputStream raw = getClass().getClassLoader().getResourceAsStream(testData.toString());
+
+ SftpFilesystemListingConfig config = mock(SftpFilesystemListingConfig.class);
+
+ SftpClient client = mockSftpClient(config);
+ ChannelSftp channelSftp = mockChannel(client);
+ when(channelSftp.get(anyString())).thenReturn(raw);
+
+ Assertions.assertThatThrownBy(() -> client.sftpFileInputStream(testData))
+ .isInstanceOf(ConnectFilePulseException.class)
+ .hasNoCause();
+ }
+
+ @Test
+ @SneakyThrows
+ void when_underlying_inputstream_is_closed_then_sftpFileInputStream_should_throw_exception() {
+ URI testData = new URI("data/test_data.csv.zip");
+ InputStream raw = getClass().getClassLoader().getResourceAsStream(testData.toString());
+ raw.close();
+
+ SftpFilesystemListingConfig config = mock(SftpFilesystemListingConfig.class);
+
+ SftpClient client = mockSftpClient(config);
+ ChannelSftp channelSftp = mockChannel(client);
+ when(channelSftp.get(anyString())).thenReturn(raw);
+
+ Assertions.assertThatThrownBy(() -> client.sftpFileInputStream(testData))
+ .isInstanceOf(ConnectFilePulseException.class)
+ .hasCauseInstanceOf(IOException.class);
+ }
+
+ @Test
+ @SneakyThrows
+ void when_resource_does_not_exists_sftpFileInputStream_should_throw_exception() {
+ URI notExistingUri = new URI("data/this_file_does_not_exists.csv");
+ SftpFilesystemListingConfig config = mock(SftpFilesystemListingConfig.class);
+ SftpClient client = mockSftpClient(config);
+ ChannelSftp channelSftp = mockChannel(client);
+ when(channelSftp.get(anyString())).thenThrow(new SftpException(0, "test exception"));
+
+ Assertions.assertThatThrownBy(() -> client.sftpFileInputStream(notExistingUri))
+ .isInstanceOf(ConnectFilePulseException.class)
+ .hasRootCauseInstanceOf(SftpException.class);
+ }
+
+ @Test
+ @SneakyThrows
+ void when_sftp_client_initialized_buildFileMetadata_should_return_file_metadata() {
+ ChannelSftp.LsEntry entry = mockEntry(Fixture.f0Name, Fixture.entryMTime, Fixture.entrySize);
+
+ SftpFilesystemListingConfig config = mock(SftpFilesystemListingConfig.class);
+ when(config.getSftpListingDirectoryPath()).thenReturn(Fixture.path);
+
+ SftpClient client = spy(new SftpClient(config));
+
+ FileObjectMeta meta = client.buildFileMetadata(entry);
+
+ assertEquals(Fixture.expectedMetadata, meta);
+ }
+
+ @Test
+ @SneakyThrows
+ void when_action_returns_a_value_doInConnection_should_return_that_value() {
+ Function action = mock(Function.class);
+ when(action.apply(any())).thenReturn(Fixture.expectedActionResult);
+
+ SftpFilesystemListingConfig config = mock(SftpFilesystemListingConfig.class);
+ SftpConnection sftpConnection = mock(SftpConnection.class);
+ SftpClient client = mockSftpClient(config);
+ doReturn(sftpConnection).when(client).createSftpConnection();
+
+ assertThat(client.doInConnection(action)).isEqualTo(Fixture.expectedActionResult);
+ }
+
+ @Test
+ @SneakyThrows
+ void when_action_throws_unexpected_exception_doInConnection_should_rethrow_exception() {
+ Function action = mock(Function.class);
+ when(action.apply(any())).thenThrow(RuntimeException.class);
+
+ SftpFilesystemListingConfig config = mock(SftpFilesystemListingConfig.class);
+ SftpClient client = mockSftpClient(config);
+ SftpConnection sftpConnection = mock(SftpConnection.class);
+ doReturn(sftpConnection).when(client).createSftpConnection();
+
+ assertThatThrownBy(() -> client.doInConnection(action))
+ .isInstanceOf(RuntimeException.class)
+ .hasNoCause();
+ }
+
+ @Test
+ void when_called_doInNewConnection_should_force_closing_connection() {
+ SftpFilesystemListingConfig config = mock(SftpFilesystemListingConfig.class);
+ SftpClient client = mockSftpClient(config);
+ SftpConnection sftpConnection = mock(SftpConnection.class);
+ Function action = mock(Function.class);
+
+ doReturn(sftpConnection).when(client).createSftpConnection();
+ doReturn(Fixture.expectedActionResult).when(action).apply(any());
+
+ String res = client.doInCloseableConnection(action);
+
+ verify(sftpConnection).close();
+ assertThat(res).isEqualTo(Fixture.expectedActionResult);
+ }
+
+ @SneakyThrows
+ @Test
+ void when_channel_stat_throws_SftpException_statFileCore_should_throw_ConnectFilepulseException() {
+ SftpFilesystemListingConfig config = mock(SftpFilesystemListingConfig.class);
+ ChannelSftp channel = mock(ChannelSftp.class);
+ SftpConnection connection = mock(SftpConnection.class);
+ when(connection.getChannel()).thenReturn(channel);
+ when(channel.stat(anyString())).thenThrow(SftpException.class);
+
+ SftpClient client = mockSftpClient(config);
+
+ assertThrows(ConnectFilePulseException.class, () -> client.statFileCore(Fixture.f0Name, connection));
+ }
+
+ @SneakyThrows
+ @Test
+ void when_channel_stat_returns_data_statFileCore_should_throw_ConnectFilepulseException() {
+ SftpFilesystemListingConfig config = mock(SftpFilesystemListingConfig.class);
+ ChannelSftp channel = mock(ChannelSftp.class);
+ SftpATTRS entryAttrs = mock(SftpATTRS.class);
+ when(channel.stat(anyString())).thenReturn(entryAttrs);
+ SftpConnection connection = mock(SftpConnection.class);
+ when(connection.getChannel()).thenReturn(channel);
+
+ SftpClient client = mockSftpClient(config);
+
+ assertThat(client.statFileCore(Fixture.f0Name, connection)).isEqualTo(entryAttrs);
+ }
+
+ @Test
+ @SneakyThrows
+ void when_statFile_called_doInNewConnection_should_delegate_to_statFileCore() {
+ SftpFilesystemListingConfig config = mock(SftpFilesystemListingConfig.class);
+ SftpClient client = mockSftpClient(config);
+ ChannelSftp channel = mock(ChannelSftp.class);
+
+ SftpConnection sftpConnection = mock(SftpConnection.class);
+ when(sftpConnection.getChannel()).thenReturn(channel);
+ doReturn(sftpConnection).when(client).createSftpConnection();
+
+ client.statFile(Fixture.f1Uri.toString());
+
+ verify(client).doInCloseableConnection(any());
+ verify(client).statFileCore(eq(Fixture.f1Uri.toString()), eq(sftpConnection));
+ verify(sftpConnection).close();
+ }
+
+ @Test
+ @SneakyThrows
+ void when_getObjectMeta_called_doInNewConnection_should_delegate_to_statFileCore() {
+ SftpFilesystemListingConfig config = mock(SftpFilesystemListingConfig.class);
+ SftpClient client = mockSftpClient(config);
+ ChannelSftp channel = mock(ChannelSftp.class);
+
+ SftpConnection sftpConnection = mock(SftpConnection.class);
+ when(sftpConnection.getChannel()).thenReturn(channel);
+ doReturn(sftpConnection).when(client).createSftpConnection();
+
+ SftpATTRS entryAttrs = mock(SftpATTRS.class);
+ when(entryAttrs.getMTime()).thenReturn(Fixture.entryMTime);
+ when(entryAttrs.getSize()).thenReturn(Fixture.entrySize);
+ when(entryAttrs.isReg()).thenReturn(true);
+ when(channel.stat(anyString())).thenReturn(entryAttrs);
+
+ client.getObjectMetadata(Fixture.f1Uri).collect(Collectors.toList());
+
+ verify(client).doInCloseableConnection(any());
+ verify(client).statFileCore(eq(Fixture.f1Uri.toString()), eq(sftpConnection));
+ verify(client).buildFileMetadata(eq(Fixture.f1Name), eq(entryAttrs));
+ verify(sftpConnection).close();
+ }
+
+ public static Stream when_content_provided_sftpFileInputStream_should_initialize_stream_successfully() {
+ return Stream.of(
+ arguments("data/test_data.csv", false),
+ arguments("data/test_data.csv.zip", true)
+ );
+ }
+
+ @Test
+ @SneakyThrows
+ void when_filename_ends_with_zip_but_content_is_not_zipped_and_retry_is_configured_sftpFileInputStream_should_retry_and_throw_exception() {
+ URI testData = new URI("data/fake_zip.csv.zip");
+ InputStream raw = getClass().getClassLoader().getResourceAsStream(testData.toString());
+
+ SftpFilesystemListingConfig config = mock(SftpFilesystemListingConfig.class);
+ ChannelSftp channelSftp = mock(ChannelSftp.class);
+ when(config.getSftpRetries()).thenReturn(1);
+
+ when(channelSftp.get(anyString())).thenReturn(raw);
+
+ SftpConnection sftpConnection = mock(SftpConnection.class);
+ doReturn(channelSftp).when(sftpConnection).getChannel();
+
+ SftpClient client = spy(new SftpClient(config));
+ doReturn(sftpConnection).when(client).createSftpConnection();
+
+ Assertions.assertThatThrownBy(() -> client.sftpFileInputStream(testData))
+ .isInstanceOf(ConnectFilePulseException.class)
+ .hasNoCause();
+
+ verify(client, times(2)).retryActionCore(any(), any(), any());
+ }
+
+ @Test
+ @SneakyThrows
+ void when_underlying_inputstream_is_closed_and_retry_is_configured_then_sftpFileInputStream_should_retry_and_throw_exception() {
+ URI testData = new URI("data/test_data.csv.zip");
+ InputStream raw = getClass().getClassLoader().getResourceAsStream(testData.toString());
+ raw.close();
+
+ SftpFilesystemListingConfig config = mock(SftpFilesystemListingConfig.class);
+ ChannelSftp channelSftp = mock(ChannelSftp.class);
+ when(config.getSftpRetries()).thenReturn(2);
+
+ SftpConnection sftpConnection = mock(SftpConnection.class);
+ doReturn(channelSftp).when(sftpConnection).getChannel();
+
+ when(channelSftp.get(anyString())).thenReturn(raw);
+
+ SftpClient client = spy(new SftpClient(config));
+ doReturn(sftpConnection).when(client).createSftpConnection();
+
+ Assertions.assertThatThrownBy(() -> client.sftpFileInputStream(testData))
+ .isInstanceOf(ConnectFilePulseException.class)
+ .hasCauseInstanceOf(IOException.class);
+
+ verify(client, times(3)).retryActionCore(any(), any(), any());
+ }
+
+ @Test
+ @SneakyThrows
+ void when_resource_does_not_exists_and_retry_is_configured_sftpFileInputStream_should_retry_and_throw_exception() {
+ URI notExistingUri = new URI("data/this_file_does_not_exists.csv");
+
+ ChannelSftp channelSftp = mock(ChannelSftp.class);
+ SftpFilesystemListingConfig config = mock(SftpFilesystemListingConfig.class);
+ when(config.getSftpRetries()).thenReturn(3);
+
+ SftpConnection sftpConnection = mock(SftpConnection.class);
+ doReturn(channelSftp).when(sftpConnection).getChannel();
+ when(channelSftp.get(anyString())).thenThrow(new SftpException(0, "test exception"));
+
+ SftpClient client = spy(new SftpClient(config));
+ doReturn(sftpConnection).when(client).createSftpConnection();
+
+ Assertions.assertThatThrownBy(() -> client.sftpFileInputStream(notExistingUri))
+ .isInstanceOf(ConnectFilePulseException.class)
+ .hasCauseInstanceOf(SftpException.class);
+
+ verify(client, times(4)).retryActionCore(any(), any(), any());
+ }
+
+ @Test
+ @SneakyThrows
+ void when_underlying_channel_removes_file_then_client_should_return_true() {
+ // Given
+ URI testData = new URI("data/test_data.csv.zip");
+ SftpFilesystemListingConfig config = mock(SftpFilesystemListingConfig.class);
+ SftpClient client = mockSftpClient(config);
+ ChannelSftp channelSftp = mockChannel(client);
+
+ doNothing().when(channelSftp).rm(anyString());
+
+ // When
+ boolean result = client.delete(testData);
+
+ // Then
+ assertThat(result).isTrue();
+ verify(channelSftp).rm(eq(testData.toString()));
+ }
+
+ @ParameterizedTest
+ @MethodSource("permanent_sftp_exceptions")
+ @SneakyThrows
+ void when_underlying_channel_throws_permanent_exception_while_removing_file_then_client_should_return_false(int errorCode) {
+ // Given
+ URI testData = new URI("data/test_data.csv.zip");
+ SftpFilesystemListingConfig config = mock(SftpFilesystemListingConfig.class);
+ SftpClient client = mockSftpClient(config);
+ ChannelSftp channelSftp = mockChannel(client);
+
+ doThrow(new SftpException(errorCode, "SFTP Channel Exception")).when(channelSftp).rm(anyString());
+
+ // When
+ boolean result = client.delete(testData);
+
+ // Then
+ assertThat(result).isFalse();
+ verify(channelSftp).rm(eq(testData.toString()));
+ }
+
+ public static Stream permanent_sftp_exceptions() {
+ return Stream.of(
+ arguments(SSH_FX_NO_SUCH_FILE),
+ arguments(SSH_FX_OP_UNSUPPORTED),
+ arguments(SSH_FX_PERMISSION_DENIED)
+ );
+ }
+
+ @ParameterizedTest
+ @MethodSource("transient_sftp_exceptions")
+ @SneakyThrows
+ void when_underlying_channel_throws_transient_exception_while_removing_file_then_client_should_retry_and_throw_exception(int errorCode) {
+ // Given
+ URI testData = new URI("data/test_data.csv.zip");
+ SftpFilesystemListingConfig config = mock(SftpFilesystemListingConfig.class);
+ when(config.getSftpRetries()).thenReturn(1);
+ SftpClient client = mockSftpClient(config);
+ ChannelSftp channelSftp = mockChannel(client);
+
+ doThrow(new SftpException(errorCode, "SFTP Channel Exception")).when(channelSftp).rm(anyString());
+
+ // When
+ Assertions.assertThatThrownBy(() -> client.delete(testData))
+ .isInstanceOf(ConnectFilePulseException.class)
+ .hasCauseInstanceOf(SftpException.class);
+
+ // Then
+ verify(channelSftp, times(2)).rm(eq(testData.toString()));
+ }
+
+ public static Stream transient_sftp_exceptions() {
+ return TRANSIENT_SFTP_ERROR_CODES.stream().map(Arguments::arguments);
+ }
+
+ @Test
+ @SneakyThrows
+ void when_underlying_channel_moves_file_then_client_should_return_true() {
+ // Given
+ URI source = new URI("source/test_data.csv");
+ URI destination = new URI("destination/test_data.csv");
+ SftpFilesystemListingConfig config = mock(SftpFilesystemListingConfig.class);
+ SftpClient client = mockSftpClient(config);
+ ChannelSftp channelSftp = mockChannel(client);
+
+ doNothing().when(channelSftp).rename(anyString(), anyString());
+
+ // When
+ boolean result = client.move(source, destination);
+
+ // Then
+ assertThat(result).isTrue();
+ verify(channelSftp).rename(eq(source.toString()), eq(destination.toString()));
+ }
+
+ @ParameterizedTest
+ @MethodSource("permanent_sftp_exceptions")
+ @SneakyThrows
+ void when_underlying_channel_throws_permanent_exception_while_moving_files_then_client_should_return_false(int errorCode) {
+ // Given
+ URI source = new URI("source/test_data.csv");
+ URI destination = new URI("destination/test_data.csv");
+ SftpFilesystemListingConfig config = mock(SftpFilesystemListingConfig.class);
+ SftpClient client = mockSftpClient(config);
+ ChannelSftp channelSftp = mockChannel(client);
+
+ doThrow(new SftpException(errorCode, "SFTP Channel Exception")).when(channelSftp).rename(anyString(), anyString());
+
+ // When
+ boolean result = client.move(source, destination);
+
+ // Then
+ assertThat(result).isFalse();
+ verify(channelSftp).rename(eq(source.toString()), eq(destination.toString()));
+ }
+
+ @ParameterizedTest
+ @MethodSource("transient_sftp_exceptions")
+ @SneakyThrows
+ void when_underlying_channel_throws_transient_exception_while_moving_file_then_client_should_retry_and_throw_exception(int errorCode) {
+ // Given
+ URI source = new URI("source/test_data.csv");
+ URI destination = new URI("destination/test_data.csv");
+ SftpFilesystemListingConfig config = mock(SftpFilesystemListingConfig.class);
+ when(config.getSftpRetries()).thenReturn(1);
+ SftpClient client = mockSftpClient(config);
+ ChannelSftp channelSftp = mockChannel(client);
+
+ doThrow(new SftpException(errorCode, "SFTP Channel Exception")).when(channelSftp).rename(anyString(), anyString());
+
+ // When
+ Assertions.assertThatThrownBy(() -> client.move(source, destination))
+ .isInstanceOf(ConnectFilePulseException.class)
+ .hasCauseInstanceOf(SftpException.class);
+
+ // Then
+ verify(channelSftp, times(2)).rename(eq(source.toString()), eq(destination.toString()));
+ }
+
+ public static ChannelSftp.LsEntry mockEntry(String fileName, int mTime, long size) {
+ ChannelSftp.LsEntry entry = mock(ChannelSftp.LsEntry.class);
+ when(entry.getFilename()).thenReturn(fileName);
+
+ SftpATTRS entryAttrs = mock(SftpATTRS.class);
+ when(entryAttrs.getMTime()).thenReturn(mTime);
+ when(entryAttrs.getSize()).thenReturn(size);
+ when(entryAttrs.isReg()).thenReturn(true);
+ when(entry.getAttrs()).thenReturn(entryAttrs);
+ return entry;
+ }
+
+ interface Fixture {
+ String f0Name = "f0.txt";
+ String f1Name = "f0.txt";
+ String f2Name = "f0.txt";
+ String path = "/path";
+ URI f1Uri = URI.create(String.format("%s/%s", path, f0Name));
+
+ int entryMTime = (int) Instant.now().getEpochSecond();
+ long entrySize = 1024;
+
+ String expectedActionResult = "action_result";
+
+ GenericFileObjectMeta expectedMetadata =
+ new GenericFileObjectMeta.Builder()
+ .withName(f0Name)
+ .withUri(f1Uri)
+ .withLastModified(ofEpochSecond(entryMTime))
+ .withContentLength(entrySize)
+ .withUserDefinedMetadata(Map.of(IS_REGULAR_FILE, true))
+ .build();
+ }
+}
\ No newline at end of file
diff --git a/connect-file-pulse-filesystems/filepulse-sftp-fs/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/client/SftpConnectionTest.java b/connect-file-pulse-filesystems/filepulse-sftp-fs/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/client/SftpConnectionTest.java
new file mode 100644
index 000000000..de520b23d
--- /dev/null
+++ b/connect-file-pulse-filesystems/filepulse-sftp-fs/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/client/SftpConnectionTest.java
@@ -0,0 +1,106 @@
+/*
+ * Copyright 2023 StreamThoughts.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.streamthoughts.kafka.connect.filepulse.fs.client;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.jcraft.jsch.JSch;
+import com.jcraft.jsch.JSchException;
+import com.jcraft.jsch.Session;
+import io.streamthoughts.kafka.connect.filepulse.errors.ConnectFilePulseException;
+import io.streamthoughts.kafka.connect.filepulse.fs.SftpFilesystemListingConfig;
+import lombok.SneakyThrows;
+import org.junit.jupiter.api.Test;
+
+public class SftpConnectionTest {
+ public static SftpFilesystemListingConfig mockSftpFilesystemListingConfig() {
+ SftpFilesystemListingConfig config = mock(SftpFilesystemListingConfig.class);
+ when(config.getSftpListingHost()).thenReturn("host");
+ when(config.getSftpListingUser()).thenReturn("user");
+ when(config.getSftpListingPort()).thenReturn(1234);
+ when(config.getSftpConnectionTimeoutMs()).thenReturn(5000);
+ when(config.getSftpListingPassword()).thenReturn("pass");
+ when(config.getSftpListingStrictHostKeyCheck()).thenReturn("false");
+
+ return config;
+ }
+
+ @Test
+ @SneakyThrows
+ public void when_config_provided_initSession_should_initialized_session_successfully() {
+ JSch jSch = mock(JSch.class);
+ Session session = mock(Session.class);
+ when(jSch.getSession(anyString(), anyString(), anyInt())).thenReturn(session);
+ SftpFilesystemListingConfig config = mockSftpFilesystemListingConfig();
+
+ SftpConnection sftpConnection = mock(SftpConnection.class);
+ doReturn(jSch).when(sftpConnection).getJSch();
+ doReturn(config).when(sftpConnection).getConfig();
+ when(sftpConnection.initSession()).thenCallRealMethod();
+
+ sftpConnection.initSession();
+
+ verify(jSch).getSession(eq("user"), eq("host"), eq(1234));
+ verify(session).setPassword(eq("pass"));
+ verify(session).setConfig(eq("StrictHostKeyChecking"), eq("false"));
+ verify(session).connect(eq(5000));
+ }
+
+ @Test
+ @SneakyThrows
+ public void when_getSession_throws_JschException_initSession_should_throw_ConnectFilePulseException() {
+ JSch jSch = mock(JSch.class);
+ when(jSch.getSession(anyString(), anyString(), anyInt())).thenThrow(JSchException.class);
+ SftpFilesystemListingConfig config = mockSftpFilesystemListingConfig();
+
+ SftpConnection sftpConnection = mock(SftpConnection.class);
+ doReturn(jSch).when(sftpConnection).getJSch();
+ doReturn(config).when(sftpConnection).getConfig();
+ when(sftpConnection.initSession()).thenCallRealMethod();
+
+ assertThatThrownBy(sftpConnection::initSession)
+ .isInstanceOf(ConnectFilePulseException.class)
+ .hasCauseInstanceOf(JSchException.class);
+ }
+
+ @Test
+ @SneakyThrows
+ void when_openChannel_throws_JschException_initChannel_should_throw_ConnectFilePulseException() {
+ SftpFilesystemListingConfig config = mockSftpFilesystemListingConfig();
+ Session session = mock(Session.class);
+ SftpConnection sftpConnection = mock(SftpConnection.class);
+
+ when(session.openChannel(eq(SftpClient.CHANNEL_TYPE))).thenThrow(JSchException.class);
+
+ doReturn(session).when(sftpConnection).getSession();
+ doReturn(config).when(sftpConnection).getConfig();
+ when(sftpConnection.initChannel()).thenCallRealMethod();
+
+ assertThatThrownBy(sftpConnection::initChannel)
+ .isInstanceOf(ConnectFilePulseException.class)
+ .hasCauseInstanceOf(JSchException.class);
+ }
+}
diff --git a/connect-file-pulse-filesystems/filepulse-sftp-fs/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/iterator/SftpRowFileInputIteratorBuilderTest.java b/connect-file-pulse-filesystems/filepulse-sftp-fs/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/iterator/SftpRowFileInputIteratorBuilderTest.java
new file mode 100644
index 000000000..e691eeaa8
--- /dev/null
+++ b/connect-file-pulse-filesystems/filepulse-sftp-fs/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/iterator/SftpRowFileInputIteratorBuilderTest.java
@@ -0,0 +1,127 @@
+/*
+ * Copyright 2023 StreamThoughts.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.streamthoughts.kafka.connect.filepulse.fs.iterator;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+
+import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
+import io.streamthoughts.kafka.connect.filepulse.fs.reader.text.RowFileInputIterator;
+import io.streamthoughts.kafka.connect.filepulse.fs.reader.text.RowFileWithFooterInputIterator;
+import io.streamthoughts.kafka.connect.filepulse.reader.FileInputIterator;
+import io.streamthoughts.kafka.connect.filepulse.source.FileRecord;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class SftpRowFileInputIteratorBuilderTest {
+
+ @Test
+ void when_has_no_header_nor_footer_builder_should_add_decorators() {
+ SftpRowFileInputIteratorBuilder builder = spy(new SftpRowFileInputIteratorBuilder());
+
+ RowFileInputIterator rowFileInputIterator = mock(RowFileInputIterator.class);
+ doReturn(rowFileInputIterator).when(builder).initRowFileInputIterator();
+
+ FileInputIterator> result = builder.build();
+
+ verify(builder).initRowFileInputIterator();
+ verify(builder, never()).initRowFileWithFooterInputIterator(any());
+ verify(builder, never()).initSftpRowFileWithHeadersInputIterator(eq(rowFileInputIterator));
+ Assertions.assertThat(result).isEqualTo(rowFileInputIterator);
+ }
+
+ @Test
+ void when_has_header_builder_should_add_header_decorator() {
+ SftpRowFileInputIteratorBuilder builder = spy(new SftpRowFileInputIteratorBuilder());
+
+ RowFileInputIterator rowFileInputIterator = mock(RowFileInputIterator.class);
+ SftpRowFileWithHeadersInputIterator rowFileWithHeadersInputIterator =
+ mock(SftpRowFileWithHeadersInputIterator.class);
+ doReturn(rowFileInputIterator).when(builder).initRowFileInputIterator();
+
+ doReturn(rowFileWithHeadersInputIterator).when(builder)
+ .initSftpRowFileWithHeadersInputIterator(eq(rowFileInputIterator));
+
+ builder.withSkipHeaders(1);
+
+ FileInputIterator> result = builder.build();
+
+ verify(builder).initRowFileInputIterator();
+ verify(builder, never()).initRowFileWithFooterInputIterator(any());
+ verify(builder).initSftpRowFileWithHeadersInputIterator(eq(rowFileInputIterator));
+ Assertions.assertThat(result).isEqualTo(rowFileWithHeadersInputIterator);
+ }
+
+ @Test
+ void when_has_footer__builder_should_add_footer_decorator() {
+ SftpRowFileInputIteratorBuilder builder = spy(new SftpRowFileInputIteratorBuilder());
+
+ RowFileInputIterator rowFileInputIterator = mock(RowFileInputIterator.class);
+ RowFileWithFooterInputIterator rowFileWithFooterInputIterator = mock(RowFileWithFooterInputIterator.class);
+
+ doReturn(rowFileInputIterator).when(builder).initRowFileInputIterator();
+
+ doReturn(rowFileWithFooterInputIterator).when(builder)
+ .initRowFileWithFooterInputIterator(eq(rowFileInputIterator));
+
+ builder.withSkipFooters(2);
+
+ FileInputIterator> result = builder.build();
+
+ verify(builder).initRowFileInputIterator();
+ verify(builder).initRowFileWithFooterInputIterator(eq(rowFileInputIterator));
+ verify(builder, never()).initSftpRowFileWithHeadersInputIterator(any());
+ Assertions.assertThat(result).isEqualTo(rowFileWithFooterInputIterator);
+ }
+
+ @Test
+ void when_has_footer_and_header__builder_should_add_decorators() {
+ SftpRowFileInputIteratorBuilder builder = spy(new SftpRowFileInputIteratorBuilder());
+
+ RowFileInputIterator rowFileInputIterator = mock(RowFileInputIterator.class);
+ RowFileWithFooterInputIterator rowFileWithFooterInputIterator = mock(RowFileWithFooterInputIterator.class);
+ SftpRowFileWithHeadersInputIterator rowFileWithHeadersInputIterator =
+ mock(SftpRowFileWithHeadersInputIterator.class);
+ doReturn(rowFileInputIterator).when(builder).initRowFileInputIterator();
+
+ doReturn(rowFileWithFooterInputIterator).when(builder)
+ .initRowFileWithFooterInputIterator(eq(rowFileInputIterator));
+
+ doReturn(rowFileWithHeadersInputIterator).when(builder)
+ .initSftpRowFileWithHeadersInputIterator(eq(rowFileWithFooterInputIterator));
+
+ builder.withSkipFooters(2);
+ builder.withSkipHeaders(1);
+
+ FileInputIterator> result = builder.build();
+
+ verify(builder).initRowFileInputIterator();
+ verify(builder).initRowFileWithFooterInputIterator(eq(rowFileInputIterator));
+ verify(builder).initSftpRowFileWithHeadersInputIterator(eq(rowFileWithFooterInputIterator));
+ Assertions.assertThat(result).isEqualTo(rowFileWithHeadersInputIterator);
+ }
+}
\ No newline at end of file
diff --git a/connect-file-pulse-filesystems/filepulse-sftp-fs/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/iterator/SftpRowFileWithHeadersInputIteratorTest.java b/connect-file-pulse-filesystems/filepulse-sftp-fs/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/iterator/SftpRowFileWithHeadersInputIteratorTest.java
new file mode 100644
index 000000000..8f555483c
--- /dev/null
+++ b/connect-file-pulse-filesystems/filepulse-sftp-fs/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/iterator/SftpRowFileWithHeadersInputIteratorTest.java
@@ -0,0 +1,177 @@
+/*
+ * Copyright 2023 StreamThoughts.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.streamthoughts.kafka.connect.filepulse.fs.iterator;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
+import io.streamthoughts.kafka.connect.filepulse.fs.reader.text.NonBlockingBufferReader;
+import io.streamthoughts.kafka.connect.filepulse.reader.FileInputIterator;
+import io.streamthoughts.kafka.connect.filepulse.reader.ReaderException;
+import io.streamthoughts.kafka.connect.filepulse.reader.RecordsIterable;
+import io.streamthoughts.kafka.connect.filepulse.source.FileObjectContext;
+import io.streamthoughts.kafka.connect.filepulse.source.FileObjectMeta;
+import io.streamthoughts.kafka.connect.filepulse.source.FileRecord;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import lombok.SneakyThrows;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class SftpRowFileWithHeadersInputIteratorTest {
+
+ @Test
+ @SneakyThrows
+ void when_skipHeaders_is_zero_initHeaders_shouldThrow_exception() {
+ URI fUri = URI.create("data/test_data.csv");
+ InputStream raw = getClass().getClassLoader().getResourceAsStream(fUri.toString());
+
+ FileObjectMeta meta = mock(FileObjectMeta.class);
+ when(meta.uri()).thenReturn(fUri);
+
+ FileObjectContext context = new FileObjectContext(meta);
+
+ FileInputIterator> iterator = mock(FileInputIterator.class);
+ when(iterator.context()).thenReturn(context);
+
+ NonBlockingBufferReader reader = spy(new NonBlockingBufferReader(raw));
+
+ SftpRowFileWithHeadersInputIterator rowFileWithHeadersInputIterator =
+ new SftpRowFileWithHeadersInputIterator(0, reader, iterator);
+
+ assertThatThrownBy(rowFileWithHeadersInputIterator::initHeaders)
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasNoCause();
+ }
+
+ @Test
+ @SneakyThrows
+ void when_inputstream_is_closed_initHeaders_shouldThrow_exception() {
+ URI fUri = URI.create("data/test_data.csv");
+ InputStream raw = getClass().getClassLoader().getResourceAsStream(fUri.toString());
+
+ FileObjectMeta meta = mock(FileObjectMeta.class);
+ when(meta.uri()).thenReturn(fUri);
+
+ FileObjectContext context = new FileObjectContext(meta);
+
+ FileInputIterator> iterator = mock(FileInputIterator.class);
+ when(iterator.context()).thenReturn(context);
+
+ NonBlockingBufferReader reader = spy(new NonBlockingBufferReader(raw));
+ raw.close();
+
+ SftpRowFileWithHeadersInputIterator rowFileWithHeadersInputIterator =
+ new SftpRowFileWithHeadersInputIterator(1, reader, iterator);
+
+ assertThatThrownBy(rowFileWithHeadersInputIterator::initHeaders)
+ .isInstanceOf(ReaderException.class)
+ .hasCauseInstanceOf(IOException.class)
+ .hasMessageContaining("Cannot read lines from");
+ }
+
+ @Test
+ @SneakyThrows
+ void when_inputfile_isEmpty_initHeaders_shouldThrow_exception() {
+ URI fUri = URI.create("data/empty.csv");
+ InputStream raw = getClass().getClassLoader().getResourceAsStream(fUri.toString());
+
+ FileObjectMeta meta = mock(FileObjectMeta.class);
+ when(meta.uri()).thenReturn(fUri);
+
+ FileObjectContext context = new FileObjectContext(meta);
+
+ FileInputIterator> iterator = mock(FileInputIterator.class);
+ when(iterator.context()).thenReturn(context);
+
+ NonBlockingBufferReader reader = spy(new NonBlockingBufferReader(raw));
+
+ SftpRowFileWithHeadersInputIterator rowFileWithHeadersInputIterator =
+ new SftpRowFileWithHeadersInputIterator(1, reader, iterator);
+
+ assertThatThrownBy(rowFileWithHeadersInputIterator::initHeaders)
+ .isInstanceOf(ReaderException.class)
+ .hasMessageContaining("Not enough data for reading")
+ .hasNoCause();
+ }
+
+ @Test
+ @SneakyThrows
+ void when_valid_csv_with_headers_initHeaders_should_extract_header_names() {
+ URI fUri = URI.create("data/test_data.csv");
+ InputStream raw = getClass().getClassLoader().getResourceAsStream(fUri.toString());
+
+ FileObjectMeta meta = mock(FileObjectMeta.class);
+ when(meta.uri()).thenReturn(fUri);
+
+ FileObjectContext context = new FileObjectContext(meta);
+
+ FileInputIterator> iterator = mock(FileInputIterator.class);
+ when(iterator.context()).thenReturn(context);
+
+ NonBlockingBufferReader reader = spy(new NonBlockingBufferReader(raw));
+
+ SftpRowFileWithHeadersInputIterator rowFileWithHeadersInputIterator =
+ new SftpRowFileWithHeadersInputIterator(1, reader, iterator);
+
+ rowFileWithHeadersInputIterator.initHeaders();
+
+ assertThat(rowFileWithHeadersInputIterator.getHeaderNames())
+ .containsExactlyInAnyOrder(Fixture.expectedHeadersBlock);
+ }
+
+ @Test
+ @SneakyThrows
+ @SuppressWarnings("unchecked")
+ void when_valid_csv_next_should_return_row_data() {
+ URI fUri = URI.create("data/test_data.csv");
+ InputStream raw = getClass().getClassLoader().getResourceAsStream(fUri.toString());
+
+ FileObjectMeta meta = mock(FileObjectMeta.class);
+ when(meta.uri()).thenReturn(fUri);
+
+ FileObjectContext context = new FileObjectContext(meta);
+
+ FileInputIterator> iterator = mock(FileInputIterator.class);
+ RecordsIterable> records = mock(RecordsIterable.class);
+ doReturn(context).when(iterator).context();
+ doReturn(records).when(iterator).next();
+
+ NonBlockingBufferReader reader = spy(new NonBlockingBufferReader(raw));
+
+ SftpRowFileWithHeadersInputIterator rowFileWithHeadersInputIterator =
+ new SftpRowFileWithHeadersInputIterator(1, reader, iterator);
+
+ rowFileWithHeadersInputIterator.next();
+ verify(iterator).next();
+ }
+
+ interface Fixture {
+ String expectedHeadersBlock = "Name,Age,City";
+ }
+}
\ No newline at end of file
diff --git a/connect-file-pulse-filesystems/filepulse-sftp-fs/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/SftpRowFileInputReaderTest.java b/connect-file-pulse-filesystems/filepulse-sftp-fs/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/SftpRowFileInputReaderTest.java
new file mode 100644
index 000000000..e264b8c7c
--- /dev/null
+++ b/connect-file-pulse-filesystems/filepulse-sftp-fs/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/SftpRowFileInputReaderTest.java
@@ -0,0 +1,89 @@
+/*
+ * Copyright 2023 StreamThoughts.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.streamthoughts.kafka.connect.filepulse.fs.reader;
+
+import static io.streamthoughts.kafka.connect.filepulse.fs.SftpFilesystemListingConfig.SFTP_LISTING_DIRECTORY_PATH;
+import static io.streamthoughts.kafka.connect.filepulse.fs.SftpFilesystemListingConfig.SFTP_LISTING_HOST;
+import static io.streamthoughts.kafka.connect.filepulse.fs.SftpFilesystemListingConfig.SFTP_LISTING_PASSWORD;
+import static io.streamthoughts.kafka.connect.filepulse.fs.SftpFilesystemListingConfig.SFTP_LISTING_USER;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
+import io.streamthoughts.kafka.connect.filepulse.fs.SftpFileStorage;
+import io.streamthoughts.kafka.connect.filepulse.fs.iterator.SftpRowFileInputIteratorFactory;
+import io.streamthoughts.kafka.connect.filepulse.reader.FileInputIterator;
+import io.streamthoughts.kafka.connect.filepulse.source.FileRecord;
+import java.net.URI;
+import java.util.Map;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class SftpRowFileInputReaderTest {
+
+ @Test
+ void when_storage_not_initialized_configure_should_initialize_storage_and_factory() {
+ SftpRowFileInputReader reader = spy(new SftpRowFileInputReader());
+ SftpFileStorage storage = mock(SftpFileStorage.class);
+ SftpRowFileInputIteratorFactory factory = mock(SftpRowFileInputIteratorFactory.class);
+
+ doReturn(storage).when(reader).initStorage(eq(Fixture.config));
+ doReturn(factory).when(reader).initIteratorFactory(eq(Fixture.config));
+
+ reader.configure(Fixture.config);
+
+ verify(reader).initStorage(eq(Fixture.config));
+ verify(reader).initIteratorFactory(eq(Fixture.config));
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ void when_reader_configured_newIterator_should_initialize_initialize_the_iterator() {
+ SftpRowFileInputReader reader = spy(new SftpRowFileInputReader());
+ SftpFileStorage storage = mock(SftpFileStorage.class);
+ SftpRowFileInputIteratorFactory factory = mock(SftpRowFileInputIteratorFactory.class);
+ FileInputIterator> iterator = mock(FileInputIterator.class);
+ when(factory.newIterator(any())).thenReturn(iterator);
+ IteratorManager iteratorManager = mock(IteratorManager.class);
+
+ doReturn(storage).when(reader).initStorage(eq(Fixture.config));
+ doReturn(factory).when(reader).initIteratorFactory(eq(Fixture.config));
+
+ reader.configure(Fixture.config);
+ FileInputIterator> result = reader.newIterator(URI.create(""), iteratorManager);
+ verify(factory).newIterator(any());
+ assertThat(result).isEqualTo(iterator);
+ }
+
+ interface Fixture {
+ Map config = Map.of(
+ SFTP_LISTING_HOST, "h",
+ SFTP_LISTING_USER, "u",
+ SFTP_LISTING_PASSWORD, "p",
+ SFTP_LISTING_DIRECTORY_PATH, "/path");
+ }
+}
\ No newline at end of file
diff --git a/connect-file-pulse-filesystems/filepulse-sftp-fs/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/stream/ConnectionAwareInputStreamTest.java b/connect-file-pulse-filesystems/filepulse-sftp-fs/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/stream/ConnectionAwareInputStreamTest.java
new file mode 100644
index 000000000..7a3fde02a
--- /dev/null
+++ b/connect-file-pulse-filesystems/filepulse-sftp-fs/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/stream/ConnectionAwareInputStreamTest.java
@@ -0,0 +1,127 @@
+/*
+ * Copyright 2023 StreamThoughts.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.streamthoughts.kafka.connect.filepulse.fs.stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.params.provider.Arguments.arguments;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+import io.streamthoughts.kafka.connect.filepulse.fs.client.SftpConnection;
+import java.io.InputStream;
+import java.util.stream.Stream;
+import java.util.zip.ZipInputStream;
+import lombok.SneakyThrows;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class ConnectionAwareInputStreamTest {
+ @FunctionalInterface
+ interface CheckedFunction {
+ void apply(T t) throws Exception;
+ }
+
+ private ConnectionAwareInputStream mockConnectionAwareInputStream() {
+ SftpConnection connection = mock(SftpConnection.class);
+ InputStream delegate = mock(InputStream.class);
+
+ return new ConnectionAwareInputStream(connection, "absolutePath", delegate);
+ }
+
+ static Stream when_wrapped_method_is_called_ConnectionAwareInputStream_should_delegate_to_wrapped_stream() {
+ return Stream.of(
+ arguments((CheckedFunction) wrapper -> {
+ wrapper.read();
+ verify(wrapper.delegate).read();
+ }),
+ arguments((CheckedFunction) wrapper -> {
+ wrapper.read(new byte[]{});
+ verify(wrapper.delegate).read(any());
+ }),
+ arguments((CheckedFunction) wrapper -> {
+ wrapper.read(new byte[]{}, 0, 1024);
+ verify(wrapper.delegate).read(any(), eq(0), eq(1024));
+ }),
+ arguments((CheckedFunction) wrapper -> {
+ wrapper.skip(1024);
+ verify(wrapper.delegate).skip(eq(1024L));
+ }),
+ arguments((CheckedFunction) wrapper -> {
+ wrapper.available();
+ verify(wrapper.delegate).available();
+ }),
+ arguments((CheckedFunction) wrapper -> {
+ wrapper.mark(1024);
+ verify(wrapper.delegate).mark(eq(1024));
+ }),
+ arguments((CheckedFunction) wrapper -> {
+ wrapper.reset();
+ verify(wrapper.delegate).reset();
+ }),
+ arguments((CheckedFunction) wrapper -> {
+ wrapper.markSupported();
+ verify(wrapper.delegate).markSupported();
+ }),
+ arguments((CheckedFunction) wrapper -> {
+ wrapper.close();
+ verify(wrapper.delegate).close();
+ verify(wrapper.connection).close();
+ })
+ );
+ }
+
+ @SneakyThrows
+ @ParameterizedTest
+ @MethodSource
+ void when_wrapped_method_is_called_ConnectionAwareInputStream_should_delegate_to_wrapped_stream(
+ CheckedFunction validator) {
+
+ ConnectionAwareInputStream wrapper = mockConnectionAwareInputStream();
+ validator.apply(wrapper);
+ }
+
+ private static InputStream getLocalResourceAsStream(String filename) {
+ return ClassLoader.getSystemClassLoader().getResourceAsStream(filename);
+ }
+
+ @SneakyThrows
+ public static Stream when_content_passed_isContentZipped_should_determine_if_zipped_or_not() {
+ return Stream.of(
+ arguments(false, "data/test_data.csv", InputStream.class),
+ arguments(true, "data/test_data.csv.zip", ZipInputStream.class)
+ );
+ }
+
+ @ParameterizedTest
+ @MethodSource
+ @SneakyThrows
+ void when_content_passed_isContentZipped_should_determine_if_zipped_or_not(Boolean expected, String absolutePath, Class> delegateClass) {
+ SftpConnection connection = mock(SftpConnection.class);
+ InputStream delegate = getLocalResourceAsStream(absolutePath);
+ ConnectionAwareInputStream wrapper = new ConnectionAwareInputStream(connection, absolutePath, delegate);
+ assertThat(wrapper.isContentZipped()).isEqualTo(expected);
+ assertThat(wrapper.delegate).isInstanceOf(delegateClass);
+ }
+}
\ No newline at end of file
diff --git a/connect-file-pulse-filesystems/filepulse-sftp-fs/src/test/resources/data/empty.csv b/connect-file-pulse-filesystems/filepulse-sftp-fs/src/test/resources/data/empty.csv
new file mode 100644
index 000000000..e69de29bb
diff --git a/connect-file-pulse-filesystems/filepulse-sftp-fs/src/test/resources/data/fake_zip.csv.zip b/connect-file-pulse-filesystems/filepulse-sftp-fs/src/test/resources/data/fake_zip.csv.zip
new file mode 100644
index 000000000..605732ffa
--- /dev/null
+++ b/connect-file-pulse-filesystems/filepulse-sftp-fs/src/test/resources/data/fake_zip.csv.zip
@@ -0,0 +1,4 @@
+Name,Age,City
+John Doe,25,New York
+Jane Smith,30,Los Angeles
+David Johnson,40,Chicago
\ No newline at end of file
diff --git a/connect-file-pulse-filesystems/filepulse-sftp-fs/src/test/resources/data/test_data.csv b/connect-file-pulse-filesystems/filepulse-sftp-fs/src/test/resources/data/test_data.csv
new file mode 100644
index 000000000..605732ffa
--- /dev/null
+++ b/connect-file-pulse-filesystems/filepulse-sftp-fs/src/test/resources/data/test_data.csv
@@ -0,0 +1,4 @@
+Name,Age,City
+John Doe,25,New York
+Jane Smith,30,Los Angeles
+David Johnson,40,Chicago
\ No newline at end of file
diff --git a/connect-file-pulse-filesystems/filepulse-sftp-fs/src/test/resources/data/test_data.csv.zip b/connect-file-pulse-filesystems/filepulse-sftp-fs/src/test/resources/data/test_data.csv.zip
new file mode 100644
index 000000000..ef6ce9f97
Binary files /dev/null and b/connect-file-pulse-filesystems/filepulse-sftp-fs/src/test/resources/data/test_data.csv.zip differ
diff --git a/connect-file-pulse-filesystems/pom.xml b/connect-file-pulse-filesystems/pom.xml
index 53b36c71d..5d46fb045 100644
--- a/connect-file-pulse-filesystems/pom.xml
+++ b/connect-file-pulse-filesystems/pom.xml
@@ -38,6 +38,7 @@
filepulse-amazons3-fs
filepulse-azure-storage-fs
filepulse-google-cloud-storage-fs
+ filepulse-sftp-fs