I'm seeing some unexpected behaviour since updating from 2.3.1 to 2.3.2, which I think is related to the cached receivers.
I am running a pyspark job which fetches a dataframe from my Event Hub; both the startingPosition and endingPosition properties are set with an enqueuedTime. Some processing is done on the data, which is finally saved.
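For reference, a configuration like the one described above might be built as follows. This is a sketch: the connection string, the timestamps, and the exact JSON layout of the position objects are assumptions based on how the connector's pyspark options are usually expressed, not taken from the report itself.

```python
import json

# Hypothetical connection string; replace with your own Event Hubs namespace.
connection_string = "Endpoint=sb://<namespace>.servicebus.windows.net/;..."

# Positions expressed as JSON, keyed by enqueued time -- the case that
# triggers the NPE in 2.3.2 according to this thread.
starting_position = {
    "offset": None,
    "seqNo": -1,
    "enqueuedTime": "2018-07-01T00:00:00.000Z",
    "isInclusive": True,
}
ending_position = {
    "offset": None,
    "seqNo": -1,
    "enqueuedTime": "2018-07-02T00:00:00.000Z",
    "isInclusive": True,
}

eh_conf = {
    "eventhubs.connectionString": connection_string,
    "eventhubs.startingPosition": json.dumps(starting_position),
    "eventhubs.endingPosition": json.dumps(ending_position),
}

# On a cluster you would then read the batch:
# df = spark.read.format("eventhubs").options(**eh_conf).load()
```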
The first time I run the job all is fine, but when I run the same job a second time (after the first job has finished, on the same cluster) I get the following error on: spark.read.format("eventhubs").options(**eh_conf).load()
java.lang.NullPointerException
at org.apache.spark.eventhubs.client.EventHubsClient$$anonfun$9$$anonfun$12.apply(EventHubsClient.scala:246)
at org.apache.spark.eventhubs.client.EventHubsClient$$anonfun$9$$anonfun$12.apply(EventHubsClient.scala:245)
at scala.util.Success$$anonfun$map$1.apply(Try.scala:237)
at scala.util.Try$.apply(Try.scala:192)
at scala.util.Success.map(Try.scala:237)
at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:237)
at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:237)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
This only happens when using version 2.3.2; the same job with the 2.3.1 library runs fine on every subsequent run, always returning the same output.
I am using Spark version 2.3, and I get the same behaviour whether running on a Databricks interactive cluster or running pyspark in local mode.
AdaptedForkJoinTask.exec <- that's happening in the ExecutionContext used for async operations.
The line that's giving the NPE is hit when an EventData is being read. Looks like this could be a bug with the Future used in the translate method.
Ok, this is a bug. I'll have it patched soon - it doesn't have to do with cached receivers. Basically, when you call receive the service can return null in some cases, and the Future in the translate method doesn't retry in those cases. The null is then read and we get a NullPointerException.
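The actual fix lives in the Scala client's translate method, but the idea behind it can be sketched in plain Python: a receive call that may legitimately return None/null should be retried rather than read directly. The function and names below are illustrative, not the connector's API.

```python
import time

def receive_with_retry(receive, max_attempts=5, delay_seconds=0.0):
    """Retry a receive call that may legitimately return None.

    `receive` is any zero-argument callable standing in for the service
    call. The real patch retries inside the Future in
    EventHubsClient.translate rather than in a blocking loop like this.
    """
    for _ in range(max_attempts):
        events = receive()
        if events is not None:
            return events
        time.sleep(delay_seconds)
    raise RuntimeError("service returned null %d times in a row" % max_attempts)

# Example: a fake service that returns None twice before producing data.
responses = iter([None, None, ["event-1", "event-2"]])
print(receive_with_retry(lambda: next(responses)))  # ['event-1', 'event-2']
```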
This only happens if an offset or enqueued time is passed as a startingPosition or endingPosition.
You have two options for now:
1. Wait for the patch and build your JAR from master, and continue using enqueued times. The patch won't take long - I expect end of Monday at the latest.
2. Find the max sequence number per partition in the batch, save those, and use them as the startingPositions for your next batch. endOfStream can be your endingPositions. Then in Databricks you simply schedule the job to run every so often with retries. That way there's no data loss, and the jobs won't run as long, since endOfStream, startOfStream, and sequence numbers are by far the fastest ways to use the connector.
The second option makes sense to me, but I can see many reasons why you wouldn't do it! Just wanted to make the option available :) Anyway, I'll patch this up and close this issue out when the PR is merged. Let me know if you have any questions!
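The bookkeeping in option 2 can be sketched without Spark: given the (partition, sequence number) pairs from a batch, take the max per partition and start the next run just after it. The helper name and the JSON layout of the per-partition positions are illustrative, not the connector's exact API.

```python
import json

def next_starting_positions(batch):
    """Compute per-partition starting positions for the next run.

    `batch` is a list of (partition_id, sequence_number) pairs, standing
    in for the rows of the Event Hubs dataframe. Sequence numbers are
    used as positions because, per the thread, they are among the
    fastest ways to address the stream.
    """
    max_seq = {}
    for partition, seq_no in batch:
        if partition not in max_seq or seq_no > max_seq[partition]:
            max_seq[partition] = seq_no
    # Start the next batch just after the last sequence number we saw.
    return {
        str(p): json.dumps({"offset": None, "seqNo": s + 1,
                            "enqueuedTime": None, "isInclusive": True})
        for p, s in max_seq.items()
    }

batch = [(0, 10), (0, 42), (1, 7)]
positions = next_starting_positions(batch)
# positions["0"] encodes seqNo 43, positions["1"] encodes seqNo 8
```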
sabeegrewal changed the title "Failed to rerun same job since 2.3.2 update" → "In some case, a future can return null in translate method" on Jul 23, 2018
sabeegrewal changed the title "In some case, a future can return null in translate method" → "A future can return null in translate method" on Jul 23, 2018
sabeegrewal changed the title "A future can return null in translate method" → "A future can return null in EventHubsClient.translate" on Jul 26, 2018