Eventhubs structured streaming job regularly fails once every day with java.util.concurrent.TimeoutException: Futures timed out after [5 minutes] #582

Closed
ganeshchand opened this issue Feb 9, 2021 · 7 comments
@ganeshchand

I have a structured streaming job that reads from Event Hubs with 32 partitions and auto-inflate TUs. The streaming job consistently fails once a day with a java.util.concurrent.TimeoutException: Futures timed out after [5 minutes] error. See the stack trace below; this appears to be a connector issue rather than a pipeline or data issue.

  • Actual behavior: The job fails once every 24 hours
  • Expected behavior: The job should continue to run unless there are hard-failures in the pipeline execution.
  • Spark version: DBR 7.5 (Apache Spark 3.0.1)
  • spark-eventhubs artifactId and version: com.microsoft.azure:azure-eventhubs-spark_2.12:2.3.18

The cluster has 8 nodes with 4 cores each (i.e. 32 cores) to match the 32 partitions. The streaming job uses the default Event Hubs thread pool size (16).
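For context, a minimal sketch of what such a reader typically looks like with this connector; the connection string, event hub name, and consumer group below are placeholders, and the actual pipeline code was not shared in this report:

```scala
import org.apache.spark.eventhubs.{ConnectionStringBuilder, EventHubsConf}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("eventhubs-reader").getOrCreate()

// Placeholders: substitute the real namespace connection string, hub name,
// and consumer group used by the failing job.
val connectionString = ConnectionStringBuilder("<event-hubs-connection-string>")
  .setEventHubName("<event-hub-name>")
  .build

val ehConf = EventHubsConf(connectionString)
  .setConsumerGroup("<consumer-group>")

// One Spark input partition per Event Hubs partition (32 here), read as a stream.
val stream = spark.readStream
  .format("eventhubs")
  .options(ehConf.toMap)
  .load()
```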

This issue was reported in the past and was supposed to be fixed; see here.
A similar issue was also filed recently; see here.

```
Driver stacktrace:
21/02/09 10:52:06 ERROR MicroBatchExecution: Query [id = 25a0e130-0192-48a2-9486-ae54736ee9b8, runId = 964e5376-95f4-4a51-be72-8e28bbbb6acb] terminated with error
org.apache.spark.SparkException: Job aborted due to stage failure: Task 20 in stage 10727.0 failed 4 times, most recent failure: Lost task 20.3 in stage 10727.0 (TID 88281, 10.211.60.144, executor 2): java.util.concurrent.TimeoutException: Futures timed out after [5 minutes]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:259)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:263)
at scala.concurrent.Await$.$anonfun$result$1(package.scala:220)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:57)
at scala.concurrent.Await$.result(package.scala:146)
at org.apache.spark.eventhubs.client.CachedEventHubsReceiver.createReceiver(CachedEventHubsReceiver.scala:99)
at org.apache.spark.eventhubs.client.CachedEventHubsReceiver.recreateReceiver(CachedEventHubsReceiver.scala:151)
at org.apache.spark.eventhubs.client.CachedEventHubsReceiver.checkCursor(CachedEventHubsReceiver.scala:169)
at org.apache.spark.eventhubs.client.CachedEventHubsReceiver.org$apache$spark$eventhubs$client$CachedEventHubsReceiver$$receive(CachedEventHubsReceiver.scala:231)
at org.apache.spark.eventhubs.client.CachedEventHubsReceiver$.receive(CachedEventHubsReceiver.scala:356)
at org.apache.spark.eventhubs.rdd.EventHubsRDD.compute(EventHubsRDD.scala:123)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:356)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:320)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:356)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:320)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:356)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:320)
at org.apache.spark.sql.execution.SQLExecutionRDD.compute(SQLExecutionRDD.scala:60)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:356)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:320)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:356)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:320)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:356)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:320)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:356)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:320)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:356)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:320)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.doRunTask(Task.scala:144)
at org.apache.spark.scheduler.Task.run(Task.scala:117)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$8(Executor.scala:677)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1581)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:680)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2519)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2466)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2460)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2460)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1152)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1152)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1152)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2721)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2668)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2656)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [5 minutes]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:259)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:263)
at scala.concurrent.Await$.$anonfun$result$1(package.scala:220)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:57)
at scala.concurrent.Await$.result(package.scala:146)
at org.apache.spark.eventhubs.client.CachedEventHubsReceiver.createReceiver(CachedEventHubsReceiver.scala:99)
at org.apache.spark.eventhubs.client.CachedEventHubsReceiver.recreateReceiver(CachedEventHubsReceiver.scala:151)
at org.apache.spark.eventhubs.client.CachedEventHubsReceiver.checkCursor(CachedEventHubsReceiver.scala:169)
at org.apache.spark.eventhubs.client.CachedEventHubsReceiver.org$apache$spark$eventhubs$client$CachedEventHubsReceiver$$receive(CachedEventHubsReceiver.scala:231)
at org.apache.spark.eventhubs.client.CachedEventHubsReceiver$.receive(CachedEventHubsReceiver.scala:356)
at org.apache.spark.eventhubs.rdd.EventHubsRDD.compute(EventHubsRDD.scala:123)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:356)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:320)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:356)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:320)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:356)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:320)
at org.apache.spark.sql.execution.SQLExecutionRDD.compute(SQLExecutionRDD.scala:60)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:356)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:320)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:356)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:320)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:356)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:320)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:356)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:320)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:356)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:320)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.doRunTask(Task.scala:144)
at org.apache.spark.scheduler.Task.run(Task.scala:117)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$8(Executor.scala:677)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1581)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:680)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
```

@nyaghma (Contributor) commented Feb 22, 2021

@ganeshchand, we have seen this issue when cached receivers for different partitions keep moving between executor nodes, which forces those receivers to be recreated each time they move.
You can increase Spark's locality wait to reduce or avoid receiver recreation, by setting the "spark.locality.wait" property to a value higher than the default of 3s.
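A minimal sketch of raising the locality wait as suggested above; the 60s value is illustrative, not a recommendation from this thread:

```scala
import org.apache.spark.sql.SparkSession

// spark.locality.wait controls how long the scheduler waits for a more local
// (here: receiver-local) slot before falling back to a less local executor.
// The default is 3s; a larger value keeps receiving tasks on the executors
// that already hold the cached receivers, at the cost of scheduling delay.
val spark = SparkSession.builder()
  .appName("eventhubs-streaming")
  .config("spark.locality.wait", "60s") // illustrative value
  .getOrCreate()
```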

@ganeshchand (Author)

Thanks @nyaghma. I read the FAQ as well. Quick questions:

  • Will increasing the Spark locality wait help reduce or avoid recreating receivers even in scenarios where task preemption is used (i.e., if an executor does not respond within a configured amount of time, the task is killed and reassigned to another executor)?
  • If tasks or partitions are reassigned to a different executor, does it make sense for the connector to handle the timeout and let the job make progress rather than failing the job completely?
  • When the job fails with this error, restarting the job (with a job retry policy) is almost always successful. Should the connector then provide a sensible configuration for handling this failure, so that client code can decide whether to ignore this class of error?
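Regarding the last question, a minimal sketch of the kind of client-side handling that is possible today, namely restarting the query when it terminates with an exception; the retry limit is illustrative and startQuery() is a hypothetical stand-in for the job's own query definition:

```scala
import org.apache.spark.sql.streaming.{StreamingQuery, StreamingQueryException}

// Hypothetical helper: builds and starts the Event Hubs streaming query.
// Its body is omitted; it stands in for the job's own query setup.
def startQuery(): StreamingQuery = ???

val maxRestarts = 5            // illustrative limit
var attempt = 0
var finished = false

while (!finished && attempt <= maxRestarts) {
  val query = startQuery()
  try {
    query.awaitTermination()   // returns when the query is stopped cleanly
    finished = true
  } catch {
    case e: StreamingQueryException =>
      // Inspect e (e.g. a TimeoutException cause) and decide whether to
      // restart or rethrow; here we simply retry up to maxRestarts times.
      attempt += 1
  }
}
```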

@nyaghma (Contributor) commented Feb 25, 2021

@ganeshchand to answer your questions:

  • The "spark.locality.wait" determines the wait time before launching a task on a less local node. Therefore, I believe it should be helpful even when using task preemption by simply giving more chances for tasks to run locally.
  • A batch is complete only when it has received events from all partitions. Therefore, even if one task fails the entire batch and job will fail. This being said, the connector uses a retry mechanism to re-run a task up to 4 times before giving up and failing the batch (If you look at the log you shared, you see this trace: "...Task 20 in stage 10727.0 failed 4 times...").
  • The connector relies on the spark batch execution model. The configurations provided by spark would be applicable to the connector as well.
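The four attempts mentioned above correspond to Spark's standard spark.task.maxFailures setting (default 4). A minimal sketch of raising it, with an illustrative value; note that this setting applies to all stages, not only the Event Hubs receive tasks:

```scala
import org.apache.spark.sql.SparkSession

// spark.task.maxFailures: number of failures of a single task tolerated before
// the stage (and with it the batch) is aborted. Default is 4; 8 is illustrative.
val spark = SparkSession.builder()
  .appName("eventhubs-streaming")
  .config("spark.task.maxFailures", "8")
  .getOrCreate()
```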

Since you have mentioned using task preemption, I just want to bring a point to your attention. If the preemption timeout is less than the time required to receive events from a partition (which depends on factors such as batch size, resources, etc.), a task gets killed before it completes and is probably retried on a different node. This could increase the chance of receiver recreation.
Since I don't know your workload, I'm not sure whether that's the case for your job, but you can check the logs to see if any receiving task is being killed by preemption. You can also look for the following trace in the executors' logs, which includes the time a task spent receiving events from a partition:

logInfo(s"(TID $taskId) Finished receiving for $nAndP, consumer group: ${ehConf.consumerGroup

@ganeshchand (Author)

Thanks @nyaghma. I don't have task preemption enabled on this pipeline, but I was evaluating it as one potential option to deal with these transient task failures, which only happen during a particular window of a day or two. Basically, here's what I understand:

  • Task preemption will make it worse as it will result in more receivers being created.
  • Increasing spark.locality.wait might fix the issue but it can potentially increase the latency. For latency-sensitive pipelines (under 1 minute), this may not be the right solution.
  • The connector respects underlying Spark retry configurations (4 times). If a task fails all 4 times, the batch and job will fail. This is expected behavior. No issues here. I think we need to investigate on our side what is causing the tasks to fail.

One last question:
When tasks are retried (e.g. Task 20 in stage 10727.0 failed 4 times), did this result in a new receiver being created on each retry, or is a receiver only recreated when the retried task is scheduled on a different executor?

@nyaghma (Contributor) commented Feb 26, 2021

@ganeshchand, just to clarify, task preemption doesn't necessarily result in receiver recreation, but it could if the preemption timeout is low relative to the batch size, such that a receiver can't finish its task before the scheduler kills it.

To answer your question, retrying a receiving task recreates the receiver only if the driver moves the task to another executor node; the driver may send the new task to the same executor or a different one.
Tasks and receiver recreations can be tracked by investigating the logs on the driver and executor nodes.
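A minimal sketch of turning up logging for the connector's package so these traces are visible; note that this call only affects the JVM it runs in, so for executor-side traces the equivalent setting belongs in the executors' log4j configuration:

```scala
import org.apache.log4j.{Level, Logger}

// Spark 3.0.x ships log4j 1.x; enable INFO-level logging for the connector's
// package so receiver creation/recreation and "Finished receiving" traces appear.
Logger.getLogger("org.apache.spark.eventhubs").setLevel(Level.INFO)
```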

@nyaghma (Contributor) commented Jun 30, 2021

The Futures timeout exception was caused by an issue in the underlying Java SDK. PR #604 updated the Java SDK to version 3.3.0, which fixes this issue. The PR has been merged into the master branch and released in version 2.3.20.
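For reference, a sketch of the corresponding dependency bump in sbt syntax (a Maven dependency or a cluster-installed library works equally well):

```scala
// Upgrade the connector from 2.3.18 to 2.3.20 to pick up the Java SDK 3.3.0 fix.
libraryDependencies += "com.microsoft.azure" % "azure-eventhubs-spark_2.12" % "2.3.20"
```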

@nyaghma closed this as completed Jun 30, 2021
@xneg commented Oct 25, 2021

Suddenly caught this exact error with library version 2.3.20.

```
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2409440.0 failed 4 times, most recent failure: Lost task 0.3 in stage 2409440.0 (TID 13294191) (10.210.2.4 executor 15): java.util.concurrent.TimeoutException: Futures timed out after [90 seconds]
	at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:259)
	at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:263)
	at scala.concurrent.Await$.$anonfun$result$1(package.scala:220)
	at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:57)
	at scala.concurrent.Await$.result(package.scala:146)
	at org.apache.spark.eventhubs.client.CachedEventHubsReceiver.createReceiver(CachedEventHubsReceiver.scala:102)
	at org.apache.spark.eventhubs.client.CachedEventHubsReceiver.recreateReceiver(CachedEventHubsReceiver.scala:154)
```
