I need to configure this filter because I have files with the same name but in different folders.
Therefore I am trying to convert the key from just the file name to {path}/{filename}.
[2020-07-01 23:54:49,061] ERROR Unexpected error while scanning file system. (io.streamthoughts.kafka.connect.filepulse.source.FileSystemMonitorThread:81)
java.lang.IllegalStateException: Duplicate key [name='Kube+ 51421338 08;15;00.raw_bin', path='/tmp/data/2020-05-15', size=671, lastModified=1589637255000, inode=443601, hash=4286515073]
at java.util.stream.Collectors.lambda$throwingMerger$0(Collectors.java:133)
at java.util.HashMap.merge(HashMap.java:1254)
at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1320)
at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1553)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
at io.streamthoughts.kafka.connect.filepulse.scanner.LocalFileSystemScanner.toScheduled(LocalFileSystemScanner.java:251)
at io.streamthoughts.kafka.connect.filepulse.scanner.LocalFileSystemScanner.updateFiles(LocalFileSystemScanner.java:228)
at io.streamthoughts.kafka.connect.filepulse.scanner.LocalFileSystemScanner.scan(LocalFileSystemScanner.java:193)
at io.streamthoughts.kafka.connect.filepulse.source.FileSystemMonitorThread.run(FileSystemMonitorThread.java:79)
I was doing this name conversion because of the Duplicate key exception above. I believe File Pulse uses the file name as the message key.
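The Duplicate key error is raised inside Collectors.toMap, which LocalFileSystemScanner.toScheduled calls in the trace above. The two-argument toMap throws IllegalStateException whenever two elements map to the same key. The standalone sketch below reproduces this with plain JDK code. It assumes the key used for a file is just its name. FileRef, indexBy, sample and the second folder /tmp/data/2020-05-16 are hypothetical and are not part of Connect File Pulse. Keying by a compound {path}/{filename} string keeps both files.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class DuplicateKeyDemo {

    // Hypothetical stand-in for a scanned file; only the attributes relevant here.
    static final class FileRef {
        final String path;
        final String name;

        FileRef(String path, String name) {
            this.path = path;
            this.name = name;
        }

        String name() {
            return name;
        }

        // Compound identity in the "{path}/{filename}" form.
        String pathAndName() {
            return path + "/" + name;
        }
    }

    // Indexes files by the given key. The two-argument toMap has no merge function,
    // so a repeated key throws IllegalStateException ("Duplicate key ...").
    static Map<String, FileRef> indexBy(List<FileRef> files, Function<FileRef, String> key) {
        return files.stream().collect(Collectors.toMap(key, Function.identity()));
    }

    // The same file name in two different folders (the second folder is hypothetical).
    static List<FileRef> sample() {
        return Arrays.asList(
                new FileRef("/tmp/data/2020-05-15", "Kube+ 51421338 08;15;00.raw_bin"),
                new FileRef("/tmp/data/2020-05-16", "Kube+ 51421338 08;15;00.raw_bin"));
    }

    public static void main(String[] args) {
        try {
            indexBy(sample(), FileRef::name);
        } catch (IllegalStateException e) {
            System.out.println("keyed by name -> " + e.getMessage());
        }
        System.out.println("keyed by path/name -> " + indexBy(sample(), FileRef::pathAndName).keySet());
    }
}
```

Passing a merge function as a third argument to toMap would also stop the exception. However, it silently drops one of the two files, so a key that is actually unique is the safer fix.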
I searched the console output for the keywords "filter" and "Filter" and found no results, so I suspect the config file is not registering the filters as I expected.
Any help is much appreciated. I have to confess that I am very new to the Kafka ecosystem. Thanks.
Hi @ycchangabriel, sorry for the late reply. The filter chain configured with the filters property is applied by the Task only after the input directories have been scanned. So you cannot use Connect File Pulse filters for that purpose. I will try to find a solution for your issue.
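The ordering described above can be modelled in a few lines. This is a simplified, hypothetical model. The names scan, setKey and run are illustrative and are not File Pulse's actual classes. In the model, files are indexed by name during the scan. A filter that would rewrite the key to {path}/{filename} only runs afterwards, so the duplicate-name failure happens before the filter is ever called.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;

public class ScanThenFilterDemo {

    // Counts how many times the hypothetical key-rewriting filter has run.
    static final AtomicInteger FILTER_CALLS = new AtomicInteger();

    // Stage 1 (scanner): index files by name only; no filter is involved yet.
    // Each file is a hypothetical {path, name} pair.
    static Map<String, String[]> scan(List<String[]> files) {
        return files.stream().collect(Collectors.toMap(f -> f[1], Function.identity()));
    }

    // Stage 2 (task): hypothetical filter that rewrites the key to "{path}/{filename}".
    static String setKey(String[] file) {
        FILTER_CALLS.incrementAndGet();
        return file[0] + "/" + file[1];
    }

    // Runs the stages in order: scan first, then apply the filter to each scanned file.
    static List<String> run(List<String[]> files) {
        Map<String, String[]> scheduled = scan(files); // throws on duplicate names
        return scheduled.values().stream()
                .map(ScanThenFilterDemo::setKey)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String[]> files = Arrays.asList(
                new String[] {"/tmp/data/2020-05-15", "a.raw_bin"},
                new String[] {"/tmp/data/2020-05-16", "a.raw_bin"});
        try {
            run(files);
        } catch (IllegalStateException e) {
            System.out.println("scan failed; filter calls = " + FILTER_CALLS.get());
        }
    }
}
```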
Hi @ycchangabriel, sorry, I'm coming back to your issue. I've just noticed that your configuration specifies offset.strategy=name+path, but name+path is not a valid value. Which strategy did you finally configure?
I may have overlooked some documentation, but I think my standalone Kafka Connect worker is ignoring the filters setting.