
A future can return null in EventHubsClient.translate #357

Closed
sardinois opened this issue Jul 18, 2018 · 2 comments · Fixed by #369
Labels: known issue (This is a verified bug in the codebase.)

Comments
@sardinois
Contributor

I'm seeing some unexpected behaviour since updating from 2.3.1 to 2.3.2, which I think is related to the cached receivers.

I am running a PySpark job which fetches a DataFrame from my Event Hub; both the startingPosition and endingPosition properties are set with an enqueuedTime. Some processing is done on the data, and the result is saved.
The first time I run the job, all is fine. When I run the same job a second time, however (after the first job has finished, but on the same cluster), I get the following error on:
spark.read.format("eventhubs").options(**eh_conf).load()

 java.lang.NullPointerException
	at org.apache.spark.eventhubs.client.EventHubsClient$$anonfun$9$$anonfun$12.apply(EventHubsClient.scala:246)
	at org.apache.spark.eventhubs.client.EventHubsClient$$anonfun$9$$anonfun$12.apply(EventHubsClient.scala:245)
	at scala.util.Success$$anonfun$map$1.apply(Try.scala:237)
	at scala.util.Try$.apply(Try.scala:192)
	at scala.util.Success.map(Try.scala:237)
	at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:237)
	at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:237)
	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
	at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

This only happens with version 2.3.2; the same job with the 2.3.1 library runs fine on every subsequent run, always returning the same output.

I am using Spark version 2.3. I get the same behaviour when running on a Databricks interactive cluster and when running PySpark in local mode.
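For reference, the kind of configuration described above can be sketched as follows. This is a hypothetical illustration, not the reporter's actual job: the connection string is a placeholder, and the EventPosition JSON shape follows the connector's documented PySpark convention, which may differ across connector versions. Both positions are driven by enqueued times, the combination that triggers the bug.

```python
import json
from datetime import datetime, timezone

def enqueued_time_position(ts):
    """Serialize an EventPosition keyed on enqueuedTime; offset and
    seqNo are left at their 'unset' markers per the PySpark convention."""
    return json.dumps({
        "offset": None,
        "seqNo": -1,
        "enqueuedTime": ts.strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
        "isInclusive": True,
    })

eh_conf = {
    "eventhubs.connectionString": "<encrypted-connection-string>",
    "eventhubs.startingPosition": enqueued_time_position(
        datetime(2018, 7, 17, tzinfo=timezone.utc)),
    "eventhubs.endingPosition": enqueued_time_position(
        datetime(2018, 7, 18, tzinfo=timezone.utc)),
}
# Then, in the job:
# df = spark.read.format("eventhubs").options(**eh_conf).load()
```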

@sabeegrewal sabeegrewal self-assigned this Jul 20, 2018
@sabeegrewal sabeegrewal added the possible issue This is a user-reported bug that has yet to be confirmed. label Jul 20, 2018
@sabeegrewal
Contributor

AdaptedForkJoinTask.exec <- that's happening in the ExecutionContext used for async operations.

The line that throws the NPE runs while an EventData is being read. It looks like this could be a bug with the Future used in the translate method.

I'll report back soon!

@sabeegrewal sabeegrewal added known issue This is a verified bug in the codebase. and removed possible issue This is a user-reported bug that has yet to be confirmed. labels Jul 20, 2018
@sabeegrewal
Contributor

Ok, this is a bug. I'll have it patched soon; it doesn't have to do with cached receivers. Basically, when you call receive, the service can give back null in some cases, and the Future in the translate method doesn't retry in those cases. The null is then read and we get a NullPointerException.

This only happens if an offset or enqueued time is passed as the startingPosition or endingPosition.
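The failure mode can be illustrated outside Spark. This is a generic Python sketch, not the connector's Scala code: an async call that can legitimately produce None, consumed by a mapping step that assumes it never does, next to the fix pattern of retrying while the result is null.

```python
from concurrent.futures import ThreadPoolExecutor

def flaky_receive(responses):
    """Pops the next canned response; None models an empty service reply."""
    return responses.pop(0)

def translate_naive(executor, responses):
    # Mirrors the buggy shape: map over the future's result without
    # checking for None, so a null reply blows up when dereferenced.
    return executor.submit(flaky_receive, responses).result()["seqNo"]

def translate_with_retry(executor, responses, max_attempts=3):
    # The fix pattern: resubmit while the result is None instead of
    # dereferencing it.
    for _ in range(max_attempts):
        event = executor.submit(flaky_receive, responses).result()
        if event is not None:
            return event["seqNo"]
    raise RuntimeError("service kept returning null")

with ThreadPoolExecutor(max_workers=1) as ex:
    # First reply is null; the retrying variant tries again and succeeds.
    ok = translate_with_retry(ex, [None, {"seqNo": 42}])
```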

You have two options for now:

  1. Wait for the patch and build your JAR from master. Continue using enqueued times. The patch won't take long; I expect end of Monday at the latest.
  2. The quickest approach for your job is to find the max sequence number per partition in each batch, save those, and use them as the startingPositions for your next batch. endOfStream can be your endingPosition. Then in Databricks you simply schedule the job to run every so often with retries. That way there's no data loss, and the jobs won't run as long, since endOfStream, startOfStream, and sequence numbers are by far the fastest ways to use the connector.
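Option 2 can be sketched in plain Python (a hypothetical helper, not connector API; in a real job the (partition, sequence number) pairs would come from the batch DataFrame just saved):

```python
import json

def next_starting_positions(rows):
    """Given (partition_id, sequence_number) pairs from the batch just
    processed, build per-partition EventPosition JSON for the next run.
    isInclusive=False so the next batch starts just after the last
    event already processed."""
    max_seq = {}
    for pid, seq in rows:
        if seq > max_seq.get(pid, -1):
            max_seq[pid] = seq
    return {
        pid: json.dumps({"offset": None, "seqNo": seq,
                         "enqueuedTime": None, "isInclusive": False})
        for pid, seq in max_seq.items()
    }

positions = next_starting_positions([(0, 41), (0, 57), (1, 12)])
```

Persist the result between runs (e.g. to checkpoint storage), and feed it back as the startingPositions for the next scheduled job.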

The second option makes sense to me, but I can see many reasons why you might not want to do it! Just wanted to make the option available :) Anyways, I'll patch this up and close this issue out when the PR is merged. Let me know if you have any questions!

@sabeegrewal sabeegrewal changed the title Failed to rerun same job since 2.3.2 update In some case, a future can return null in translate method Jul 23, 2018
@sabeegrewal sabeegrewal changed the title In some case, a future can return null in translate method A future can return null in translate method Jul 23, 2018
@sabeegrewal sabeegrewal changed the title A future can return null in translate method A future can return null in EventHubsClient.translate Jul 26, 2018
@sabeegrewal sabeegrewal added this to the 2.3.3 milestone Aug 7, 2018