NullPointerException while executing AppendFilter and ByteArrayInputReader #59

Closed
fhussonnois opened this issue Aug 10, 2020 · 1 comment

Comments

@fhussonnois
Member

{{ matches( $metadata.name, ^a_.*) }}

When I use AppendFilter, I get the following error.

[2020-08-09 21:22:04,642] INFO [coverage.basecoverage-file-source-deva|task-0] Opening new iterator for source : [name='NEXUSCOVBATCH_MEMB_PLAN_COV_4.json', path='/mnt/nexus-kafka-connect/member-coverage/MEMBPLANCOV/deva', size=43516, lastModified=1597022524000, inode=25, hash=880644864] (io.streamthoughts.kafka.connect.filepulse.source.FileInputIterable:65)
[2020-08-09 21:22:04,668] ERROR [coverage.basecoverage-file-source-deva|task-0] Error occurred while executing filter 'AppendFilter' on record='[message=[value=[B@f9f9858, schema =[type=BYTES]]]' (io.streamthoughts.kafka.connect.filepulse.filter.DefaultRecordFilterPipeline:180)
[2020-08-09 21:22:04,668] ERROR [coverage.basecoverage-file-source-deva|task-0] Error while processing source file '[name='NEXUSCOVBATCH_MEMB_PLAN_COV_4.json', path='/mnt/nexus-kafka-connect/member-coverage/MEMBPLANCOV/deva', size=43516, lastModified=1597022524000, inode=25, hash=880644864]' (io.streamthoughts.kafka.connect.filepulse.source.KafkaFileStateReporter:119)
java.lang.NullPointerException
[2020-08-09 21:22:04,669] INFO [coverage.basecoverage-file-source-deva|task-0] WorkerSourceTask{id=coverage.basecoverage-file-source-deva-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:424)
[2020-08-09 21:22:04,669] INFO [coverage.basecoverage-file-source-deva|task-0] WorkerSourceTask{id=coverage.basecoverage-file-source-deva-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:441)
[2020-08-09 21:22:04,669] ERROR [coverage.basecoverage-file-source-deva|task-0] WorkerSourceTask{id=coverage.basecoverage-file-source-deva-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:186)
io.streamthoughts.kafka.connect.filepulse.errors.ConnectFilePulseException: java.lang.NullPointerException
at io.streamthoughts.kafka.connect.filepulse.source.DefaultFileRecordsPollingConsumer.next(DefaultFileRecordsPollingConsumer.java:188)
at io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceTask.poll(FilePulseSourceTask.java:126)
at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:270)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:237)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
[2020-08-09 21:22:04,669] ERROR [coverage.basecoverage-file-source-deva|task-0] WorkerSourceTask{id=coverage.basecoverage-file-source-deva-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:187)

My configuration is below.

"task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.BytesArrayInputReader",
"filters":"SetSUBGRPTopic",
"filters.SetSUBGRPTopic.type":"io.streamthoughts.kafka.connect.filepulse.filter.AppendFilter",
"filters.SetSUBGRPTopic.if":"{{ matches($metadata.name,^NEXUSCOVBATCH_MEMB_PLAN_COV_.*) }}",
"filters.SetSUBGRPTopic.field":"{{ $topic }}",
"filters.SetSUBGRPTopic.value":"member.coverage.domain.deva.basecoverage"

Originally posted by @mohai-kafka in #47 (comment)

@fhussonnois
Member Author

Hi @mohai-kafka, your configuration should be "filters.SetSUBGRPTopic.field":"$topic". But there is still an issue that will be fixed in an upcoming bugfix release.
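
For reference, applying that correction to the configuration fragment posted in the report would look roughly like this. It is a sketch only: every line is copied from the original report except `filters.SetSUBGRPTopic.field`, which uses the plain `$topic` form suggested above instead of `{{ $topic }}`. Since a remaining issue is mentioned, this change alone may not prevent the NullPointerException on the affected version.

```json
"task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.BytesArrayInputReader",
"filters":"SetSUBGRPTopic",
"filters.SetSUBGRPTopic.type":"io.streamthoughts.kafka.connect.filepulse.filter.AppendFilter",
"filters.SetSUBGRPTopic.if":"{{ matches($metadata.name,^NEXUSCOVBATCH_MEMB_PLAN_COV_.*) }}",
"filters.SetSUBGRPTopic.field":"$topic",
"filters.SetSUBGRPTopic.value":"member.coverage.domain.deva.basecoverage"
```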

fhussonnois added a commit that referenced this issue Aug 10, 2020
…cted type (#59)

Additional changes:
 - AppendFilter throws a FilterException if expression value for target
field return null

Resolves: GH-59