From 695c6f20d4a790b2da6a616b2eb50f8930696ccd Mon Sep 17 00:00:00 2001 From: rouellet99 Date: Wed, 3 Apr 2024 16:30:33 -0400 Subject: [PATCH] Update LocalFSDirectoryListing.java fix: (plugin): change the code for the configuration to delete the compressed file after extraction LocalFSDirectoryListing.java:[104,5] (metrics) CyclomaticComplexity: Cyclomatic Complexity is 16 (max allowed is 15). When using zipped file, is there any reason , why the compressed file is not removed from the directory once extracted? #603 --- .../filepulse/fs/LocalFSDirectoryListing.java | 89 ++++++++++++------- 1 file changed, 57 insertions(+), 32 deletions(-) diff --git a/connect-file-pulse-filesystems/filepulse-local-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/LocalFSDirectoryListing.java b/connect-file-pulse-filesystems/filepulse-local-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/LocalFSDirectoryListing.java index 864121df..e4e611c2 100644 --- a/connect-file-pulse-filesystems/filepulse-local-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/LocalFSDirectoryListing.java +++ b/connect-file-pulse-filesystems/filepulse-local-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/LocalFSDirectoryListing.java @@ -103,22 +103,54 @@ public void setFilter(final FileListFilter filter) { private List listEligibleFiles(final Path input) { final List listingLocalFiles = new LinkedList<>(); - if (!Files.isReadable(input)) { - LOG.warn("Cannot get directory listing for '{}'. Input path is not readable.", input); + if (!isPathReadable(input)) { return listingLocalFiles; } - if (!Files.isDirectory(input)) { - LOG.warn("Cannot get directory listing for '{}'. Input path is not a directory.", input); - return listingLocalFiles; - } - - if (isHidden(input)) { + if (!isPathDirectory(input) || isHidden(input)) { return listingLocalFiles; } final List decompressedDirs = new LinkedList<>(); final List directories = new LinkedList<>(); + processFiles(input, listingLocalFiles, directories, decompressedDirs); + + if (config.isRecursiveScanEnable() && !directories.isEmpty()) { + listingLocalFiles.addAll(scanRecursiveDirectories(directories, decompressedDirs)); + } + return listingLocalFiles; + } + + private boolean isPathReadable(Path path) { + if (!Files.isReadable(path)) { + LOG.warn("Cannot get directory listing for '{}'. Input path is not readable.", path); + return false; + } + return true; + } + + private boolean isPathDirectory(Path path) { + if (!Files.isDirectory(path)) { + LOG.warn("Cannot get directory listing for '{}'. Input path is not a directory.", path); + return false; + } + return true; + } + + private boolean isHidden(final Path input) { + try { + return Files.isHidden(input); + } catch (IOException e) { + LOG.warn( + "Error while checking if input file is hidden '{}': {}", + input, + e.getLocalizedMessage()); + return false; + } + } + + private void processFiles(Path input, List listingLocalFiles, List directories, + List decompressedDirs) { try (DirectoryStream stream = Files.newDirectoryStream(input)) { for (Path path : stream) { if (Files.isDirectory(path)) { @@ -137,10 +169,8 @@ private List listEligibleFiles(final Path input) { final Path decompressed = codec.decompress(file).toPath(); listingLocalFiles.addAll(listEligibleFiles(decompressed)); decompressedDirs.add(decompressed); - if (config.isDeleteCompressFileEnable() && decompressed.toFile().exists()) { - file.delete(); - } - LOG.debug("Compressed file deleted successfully : {}", path); + LOG.debug("Compressed file extracted successfully : {}", path); + handleFileDeletion(file, path); } catch (IOException | SecurityException e) { if (e instanceof IOException) { LOG.warn("Error while decompressing input file '{}'. Skip and continue.", path, e); @@ -158,34 +188,29 @@ private List listEligibleFiles(final Path input) { } } } catch (IOException e) { - LOG.error( - "Error while getting directory listing for {}: {}", - input, - e.getLocalizedMessage()); + LOG.error("Error while getting directory listing for {}: {}", input, e.getLocalizedMessage()); throw new ConnectException(e); } - if (config.isRecursiveScanEnable() && !directories.isEmpty()) { - listingLocalFiles.addAll(directories.stream() - .filter(f -> !decompressedDirs.contains(f)) - .flatMap(f -> listEligibleFiles(f).stream()) - .collect(Collectors.toList())); - } - return listingLocalFiles; } - private boolean isHidden(final Path input) { - try { - return Files.isHidden(input); - } catch (IOException e) { - LOG.warn( - "Error while checking if input file is hidden '{}': {}", - input, - e.getLocalizedMessage()); - return false; + private void handleFileDeletion(File file, Path path) { + if (config.isDeleteCompressFileEnable() && file.exists()) { + if (file.delete()) { + LOG.debug("Compressed file deleted successfully : {}", path); + } else { + LOG.warn("Error while deleting input file '{}'. Skip and continue.", path); + } } } + private List scanRecursiveDirectories(List directories, List decompressedDirs) { + return directories.stream() + .filter(f -> !decompressedDirs.contains(f)) + .flatMap(f -> listEligibleFiles(f).stream()) + .collect(Collectors.toList()); + } + /** * {@inheritDoc} */