Cannot Register Filters #40

Closed
yccgabriel opened this issue Jul 2, 2020 · 3 comments

@yccgabriel

I may have overlooked some documentation, but my standalone Kafka Connect worker appears to be ignoring the filters setting.

name=connect-file-pulse-quickstart
connector.class=io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector

filters=RenameKeyFilter
filters.RenameKeyFilter.type=io.streamthoughts.kafka.connect.filepulse.filter.AppendFilter
filters.RenameKeyFilter.field="$key"
filters.RenameKeyFilter.value="{{ $metadata.path }}/{{ $metadata.name }}"

fs.cleanup.policy.class=io.streamthoughts.kafka.connect.filepulse.clean.MoveCleanupPolicy
fs.scanner.class=io.streamthoughts.kafka.connect.filepulse.scanner.local.LocalFSDirectoryWalker
fs.scan.directory.path=/tmp/data/
#fs.scan.filters=io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter
file.filter.regex.pattern="\\.raw_bin$"
fs.scan.interval.ms=10000


internal.kafka.reporter.bootstrap.servers=localhost:9092
internal.kafka.reporter.id=connect-file-pulse-quickstart
internal.kafka.reporter.topic=connect-file-pulse-status
offset.strategy=name+path
read.max.wait.ms=5000
task.reader.class=io.streamthoughts.kafka.connect.filepulse.reader.BytesArrayInputReader
topic=rawbins
tasks.max=1

I need to configure this filter because I have files with the same name but in different folders.
Therefore I am trying to convert the key from just the file name to {path}/{filename}.

[2020-07-01 23:54:49,061] ERROR Unexpected error while scanning file system. (io.streamthoughts.kafka.connect.filepulse.source.FileSystemMonitorThread:81)
java.lang.IllegalStateException: Duplicate key [name='Kube+ 51421338 08;15;00.raw_bin', path='/tmp/data/2020-05-15', size=671, lastModified=1589637255000, inode=443601, hash=4286515073]
        at java.util.stream.Collectors.lambda$throwingMerger$0(Collectors.java:133)
        at java.util.HashMap.merge(HashMap.java:1254)
        at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1320)
        at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
        at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
        at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
        at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
        at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1553)
        at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
        at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
        at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
        at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
        at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
        at io.streamthoughts.kafka.connect.filepulse.scanner.LocalFileSystemScanner.toScheduled(LocalFileSystemScanner.java:251)
        at io.streamthoughts.kafka.connect.filepulse.scanner.LocalFileSystemScanner.updateFiles(LocalFileSystemScanner.java:228)
        at io.streamthoughts.kafka.connect.filepulse.scanner.LocalFileSystemScanner.scan(LocalFileSystemScanner.java:193)
        at io.streamthoughts.kafka.connect.filepulse.source.FileSystemMonitorThread.run(FileSystemMonitorThread.java:79)

I attempted this key rewrite because of the Duplicate key exception above. I believe File Pulse uses the file name as the message key.
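The top frames of the trace (`Collectors.throwingMerger`, `Collectors.toMap`) are what the JDK produces when `Collectors.toMap` is called without a merge function and two elements map to the same key. The sketch below reproduces that behavior in isolation. It assumes, as suspected above, that files are keyed by bare name; the class name, method names, and sample paths are hypothetical and are not taken from the File Pulse source.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class DuplicateKeyDemo {

    // Keys each file by its bare name. Two files with the same name in
    // different directories collide, and toMap throws IllegalStateException
    // because no merge function was supplied.
    static Map<String, Path> keyByName(List<Path> files) {
        return files.stream()
                .collect(Collectors.toMap(p -> p.getFileName().toString(), p -> p));
    }

    // Keys each file by its full path, which is unique per file.
    static Map<String, Path> keyByFullPath(List<Path> files) {
        return files.stream()
                .collect(Collectors.toMap(Path::toString, p -> p));
    }

    public static void main(String[] args) {
        // Hypothetical sample files: same name, different directories.
        List<Path> files = Arrays.asList(
                Paths.get("/tmp/data/2020-05-15/sample.raw_bin"),
                Paths.get("/tmp/data/2020-05-16/sample.raw_bin"));

        System.out.println("keyed by full path: " + keyByFullPath(files).size() + " entries");
        try {
            keyByName(files);
        } catch (IllegalStateException e) {
            System.out.println("keyed by name: " + e.getMessage());
        }
    }
}
```

If the scanner really does key files this way, the collision happens while the directory listing is being collected, which would be before any record exists for a filter to rewrite.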

I searched the console output for the keywords filter and Filter and found nothing, so I suspect the filters in the config file are not being registered as I expected.

Any help is much appreciated. I have to confess that I am very new to the kafka ecosystem. Thanks.

@fhussonnois
Member

Hi @ycchangabriel, sorry for the late reply. The filter chain configured with the filters property is applied by the Task after the input directories have been scanned, so you cannot use the Connect File Pulse filter capabilities for that purpose. I will try to find a solution for your issue.
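To make the ordering concrete, here is a toy model of the two phases described in this reply. It is only a conceptual sketch under stated assumptions: `scan`, `runTask`, and `pipeline` are hypothetical names, and none of this is File Pulse code. It shows why a filter that rewrites the key never gets a chance to run when the scan phase rejects duplicate names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.UnaryOperator;

public class ScanThenFilterSketch {

    // Hypothetical scan phase: rejects two files that share a bare name.
    static List<String> scan(List<String> paths) {
        Set<String> seenNames = new HashSet<>();
        for (String path : paths) {
            String name = path.substring(path.lastIndexOf('/') + 1);
            if (!seenNames.add(name)) {
                throw new IllegalStateException("Duplicate key " + name);
            }
        }
        return paths;
    }

    // Hypothetical task phase: the filter derives a record key from each
    // file path that the scan phase already accepted.
    static List<String> runTask(List<String> scannedPaths, UnaryOperator<String> keyFilter) {
        List<String> keys = new ArrayList<>();
        for (String path : scannedPaths) {
            keys.add(keyFilter.apply(path));
        }
        return keys;
    }

    // Filters only run on the output of a successful scan.
    static List<String> pipeline(List<String> paths, UnaryOperator<String> keyFilter) {
        return runTask(scan(paths), keyFilter);
    }

    public static void main(String[] args) {
        // Identity filter: use the full path as the record key.
        UnaryOperator<String> fullPathKey = path -> path;

        System.out.println(pipeline(Arrays.asList("/a/x.bin", "/b/y.bin"), fullPathKey));
        try {
            pipeline(Arrays.asList("/a/x.bin", "/b/x.bin"), fullPathKey);
        } catch (IllegalStateException e) {
            System.out.println("scan failed before any filter ran: " + e.getMessage());
        }
    }
}
```

In this model the only way to avoid the failure is to change how the scan phase identifies files, not what the filter does afterwards.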

@yccgabriel
Author

Hi @fhussonnois ,

Thanks for your feedback. No worries. In that case I'll try to set up a separate transfer job (e.g. rsync) to get the files in place.

@fhussonnois
Member

Hi @ycchangabriel, sorry to come back to your issue. I've just noticed that your configuration specifies offset.strategy=name+path, but name+path is invalid. Which strategy did you finally configure?
