Structured stream from event hub error: java.util.concurrent.TimeoutException: Futures timed out after #411

Closed
tmslva opened this issue Nov 20, 2018 · 4 comments · Fixed by #419


tmslva commented Nov 20, 2018

Databricks runtime: 4.3 (includes Apache Spark 2.3.1, Scala 2.11)
azure-eventhubs-spark_2.11-2.3.6

If only one message is sent to the Azure Event Hub, the stream stops after a while with the following error:
java.util.concurrent.TimeoutException: Futures timed out after [300 seconds]
Table "intraday.trades" is not updated.

Sending more messages works most of the time, but we need to reliably send and receive any number of messages. Please help. Thanks!

val customEventhubParameters = EventHubsConf(connectionString)
  .setStartingPosition(EventPosition.fromEnqueuedTime(Instant.now))

intradayTrades.writeStream
  .format("delta")
  .outputMode("append")
  .option("mergeSchema", "true")
  .option("checkpointLocation", s"$datalake$check")
  .table("intraday.trades")
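
For context, the source DataFrame is produced roughly as in the sketch below; the readStream call and the body projection are assumptions to make the example self-contained, not the exact code from our notebook:

import java.time.Instant
import org.apache.spark.eventhubs.{EventHubsConf, EventPosition}

// connectionString is assumed to hold a full Event Hubs connection string
// (including EntityPath), as EventHubsConf requires.
val customEventhubParameters = EventHubsConf(connectionString)
  .setStartingPosition(EventPosition.fromEnqueuedTime(Instant.now))

// Assumed read side: the DataFrame written to "intraday.trades" comes from the
// "eventhubs" structured streaming source of azure-event-hubs-spark.
val intradayTrades = spark.readStream
  .format("eventhubs")
  .options(customEventhubParameters.toMap)
  .load()
  .selectExpr("cast(body as string) as body", "enqueuedTime") // assumed projection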


tmslva commented Nov 26, 2018

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 604.0 failed 4 times, most recent failure: Lost task 1.3 in stage 604.0 (TID 17919, 10.139.64.7, executor 1): java.util.concurrent.TimeoutException: Futures timed out after [300 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:190)
at org.apache.spark.eventhubs.client.CachedEventHubsReceiver.checkCursor(CachedEventHubsReceiver.scala:96)
at org.apache.spark.eventhubs.client.CachedEventHubsReceiver.org$apache$spark$eventhubs$client$CachedEventHubsReceiver$$receive(CachedEventHubsReceiver.scala:130)
at org.apache.spark.eventhubs.client.CachedEventHubsReceiver$.receive(CachedEventHubsReceiver.scala:179)
at org.apache.spark.eventhubs.rdd.EventHubsRDD.compute(EventHubsRDD.scala:122)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:336)


tmslva commented Nov 27, 2018

This is the timeout that causes the issue. I am using a Databricks notebook that listens to the Event Hub messages, and it is entirely possible that the first event arrives more than 300 seconds after the query starts.

core/src/main/scala/org/apache/spark/eventhubs/package.scala

val InternalOperationTimeout: FiniteDuration = 300.seconds

Can it be made configurable?
https://github.com/Azure/azure-event-hubs-spark/search?q=InternalOperationTimeout&unscoped_q=InternalOperationTimeout
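
For reference, later connector versions (the issue header says this was fixed by #419) expose timeout settings on EventHubsConf. A minimal sketch, assuming setReceiverTimeout/setOperationTimeout setters taking java.time.Duration; they are not available in azure-eventhubs-spark_2.11-2.3.6 used above, so verify against the version you run:

import java.time.Duration
import org.apache.spark.eventhubs.EventHubsConf

// Sketch only: these setters are assumed to exist in connector builds that make
// the internal timeouts configurable.
val ehConf = EventHubsConf(connectionString)
  .setReceiverTimeout(Duration.ofSeconds(600))  // per-receive timeout
  .setOperationTimeout(Duration.ofSeconds(600)) // client operation timeout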

@mattias01

I am getting this error too. I'm using azure-eventhubs-spark_2.11 2.2.6 and Spark 2.1.1.
The same error is also reported in: #413

This happens when a new message is added to the event hub that I'm listening to.

java.util.concurrent.TimeoutException: Futures timed out after [300 seconds]
...
at org.apache.spark.eventhubs.client.CachedEventHubsReceiver.checkCursor(CachedEventHubsReceiver.scala:96)
at org.apache.spark.eventhubs.client.CachedEventHubsReceiver.org$apache$spark$eventhubs$client$CachedEventHubsReceiver$$receive(CachedEventHubsReceiver.scala:130)
at org.apache.spark.eventhubs.client.CachedEventHubsReceiver$.receive(CachedEventHubsReceiver.scala:179)
at org.apache.spark.eventhubs.rdd.EventHubsRDD.compute(EventHubsRDD.scala:122)
...


tmslva commented Nov 28, 2018

@mattias01
The timeout is hard-coded in package.scala (line 51 in the linked revision), and I didn't find a way to change it. For me it times out when no events are received within 300 seconds of the start of the streaming query. The workaround was to send heartbeat messages to the event hub within this 5-minute window so it doesn't time out. After that, the query runs and streams all day without any issues.

https://github.com/Azure/azure-event-hubs-spark/blob/57d083f2f898f85b006417619fd60e2d8b33c021/core/src/main/scala/org/apache/spark/eventhubs/package.scala
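
A minimal sketch of that heartbeat workaround, assuming the connector's batch write sink ("eventhubs" format with a body column) is available in the version in use; the payload and timing are illustrative only:

import spark.implicits._

// Write one small event to the hub right after starting the streaming query so the
// first receive completes inside the 300-second window. The sink expects a "body"
// column; customEventhubParameters is the EventHubsConf shown earlier.
Seq("heartbeat").toDF("body")
  .write
  .format("eventhubs")
  .options(customEventhubParameters.toMap)
  .save()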
