Skip to content

Commit

Permalink
feat(plugin): add SFTP Filesystem
Browse files Browse the repository at this point in the history
  • Loading branch information
TheWorkshopCom authored and fhussonnois committed Jun 27, 2023
1 parent e2bf74f commit d8e8df8
Show file tree
Hide file tree
Showing 25 changed files with 3,055 additions and 0 deletions.
87 changes: 87 additions & 0 deletions connect-file-pulse-filesystems/filepulse-sftp-fs/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright 2019-2021 StreamThoughts.
~
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>io.streamthoughts</groupId>
<artifactId>kafka-connect-filepulse-filesystems</artifactId>
<version>2.13.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<name>Kafka Connect Source File Pulse SFTP FS</name>
<artifactId>kafka-connect-filepulse-sftp-fs</artifactId>

<properties>
<checkstyle.config.location>${project.parent.basedir}/..</checkstyle.config.location>
<license.header.file>${project.parent.basedir}/../license-header</license.header.file>
<jsch.version>0.1.55</jsch.version>
<mockito-junit-jupiter.version>2.28.2</mockito-junit-jupiter.version>
<assertj-core.version>3.11.1</assertj-core.version>
<lombok.version>1.18.12</lombok.version>
</properties>

<dependencies>
<dependency>
<groupId>com.jcraft</groupId>
<artifactId>jsch</artifactId>
<version>${jsch.version}</version>
</dependency>
<dependency>
<groupId>io.streamthoughts</groupId>
<artifactId>kafka-connect-filepulse-commons-fs</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
</dependency>
<!-- Test Dependencies-->
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-junit-jupiter</artifactId>
<version>${mockito-junit-jupiter.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>${assertj-core.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.streamthoughts</groupId>
<artifactId>kafka-connect-filepulse-plugin</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Copyright 2019-2023 StreamThoughts.
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamthoughts.kafka.connect.filepulse.fs;

import com.jcraft.jsch.SftpATTRS;
import io.streamthoughts.kafka.connect.filepulse.errors.ConnectFilePulseException;
import io.streamthoughts.kafka.connect.filepulse.fs.client.SftpClient;
import io.streamthoughts.kafka.connect.filepulse.source.FileObjectMeta;
import java.io.InputStream;
import java.net.URI;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SftpFileStorage implements Storage {
private static final Logger log = LoggerFactory.getLogger(SftpFileStorage.class);
private static final String CANNOT_STAT_FILE_ERROR_MSG_TEMPLATE = "Cannot stat file with uri: %s";

private final SftpClient sftpClient;

public SftpFileStorage(SftpFilesystemListingConfig config) {
this.sftpClient = new SftpClient(config);
}

SftpFileStorage(SftpClient sftpClient) {
this.sftpClient = sftpClient;
}

@Override
public FileObjectMeta getObjectMetadata(URI uri) {
log.debug("Getting object metadata for '{}'", uri);
return sftpClient.getObjectMetadata(uri)
.findFirst().orElseThrow(() -> new ConnectFilePulseException(buildCannotStatFileErrorMsg(uri)));
}

@Override
public boolean exists(URI uri) {
log.info("Checking if '{}' exists", uri);
SftpATTRS attrs = sftpClient.statFile(uri.toString());

return attrs.isReg();
}

private String buildCannotStatFileErrorMsg(URI uri) {
return String.format(CANNOT_STAT_FILE_ERROR_MSG_TEMPLATE, uri);
}

/**
* {@inheritDoc}
*/
@Override
public boolean delete(URI uri) {
return sftpClient.delete(uri);
}

@Override
public boolean move(URI source, URI dest) {
return sftpClient.move(source, dest);
}

@Override
public InputStream getInputStream(URI uri) {
return sftpClient.sftpFileInputStream(uri);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright 2019-2023 StreamThoughts.
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamthoughts.kafka.connect.filepulse.fs;

import io.streamthoughts.kafka.connect.filepulse.fs.client.SftpClient;
import io.streamthoughts.kafka.connect.filepulse.source.FileObjectMeta;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SftpFilesystemListing implements FileSystemListing<SftpFileStorage> {

private static final Logger log = LoggerFactory.getLogger(SftpFilesystemListing.class);
private FileListFilter filter;

private SftpFilesystemListingConfig config;

private SftpClient sftpClient;

public SftpFilesystemListing(final List<FileListFilter> filters) {
Objects.requireNonNull(filters, "filters can't be null");
this.filter = new CompositeFileListFilter(filters);
}

@SuppressWarnings("unused")
public SftpFilesystemListing() {
this(Collections.emptyList());
}

@Override
public void configure(final Map<String, ?> configs) {
log.debug("Configuring SftpFilesystemListing");
config = new SftpFilesystemListingConfig(configs);
sftpClient = new SftpClient(config);
}

@Override
public Collection<FileObjectMeta> listObjects() {
String listingDirectoryPath = getConfig().getSftpListingDirectoryPath();

List<FileObjectMeta> filesMetadata = getSftpClient().listFiles(listingDirectoryPath)
.collect(Collectors.toList());

return filter.filterFiles(filesMetadata);
}

@Override
public void setFilter(FileListFilter filter) {
this.filter = filter;
}

@Override
public SftpFileStorage storage() {
return new SftpFileStorage(config);
}

SftpClient getSftpClient() {
return sftpClient;
}

SftpFilesystemListingConfig getConfig() {
return config;
}
}
Loading

0 comments on commit d8e8df8

Please sign in to comment.