Structured stream from event hub error: java.util.concurrent.TimeoutException: Futures timed out after #411
Comments
```
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 604.0 failed 4 times, most recent failure: Lost task 1.3 in stage 604.0 (TID 17919, 10.139.64.7, executor 1): java.util.concurrent.TimeoutException: Futures timed out after [300 seconds]
```

This is the timeout that causes the issue. I am using a Databricks notebook that listens for Event Hubs messages, and it is entirely possible that the first event arrives more than 300 seconds after the stream starts. The timeout is hard-coded in `core/src/main/scala/org/apache/spark/eventhubs/package.scala`:

```scala
val InternalOperationTimeout: FiniteDuration = 300.seconds
```

Can it be made configurable?
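As a possible workaround while waiting for an answer: some releases of the connector expose per-stream timeouts directly on `EventHubsConf`. A minimal sketch, assuming your connector version provides `setReceiverTimeout` and `setOperationTimeout` (check the `EventHubsConf` source for your version before relying on them; `connectionString` is taken from the snippets in this thread):

```scala
import java.time.Duration
import org.apache.spark.eventhubs.{EventHubsConf, EventPosition}

// Sketch: raise the receive/operation timeouts above the 300 s default so a
// quiet Event Hub does not kill the stream before the first event arrives.
// These setters are assumptions about the connector API, not confirmed here.
val ehConf = EventHubsConf(connectionString)
  .setStartingPosition(EventPosition.fromEndOfStream)
  .setReceiverTimeout(Duration.ofSeconds(600))   // how long a receiver waits for events
  .setOperationTimeout(Duration.ofSeconds(600))  // timeout for individual Event Hubs operations
```

If your version lacks these setters, upgrading the `azure-eventhubs-spark` package may be the only way to change the limit.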
I am also getting this error. I'm using azure-eventhubs-spark_2.11 2.2.6 and Spark 2.1.1. It happens when a new message is added to the event hub that I'm listening to.
@mattias01
Databricks runtime: 4.3 (includes Apache Spark 2.3.1, Scala 2.11)
azure-eventhubs-spark_2.11-2.3.6
If only one message is sent to the Azure Event Hub, the stream stops after a while with the following error:
java.util.concurrent.TimeoutException: Futures timed out after [300 seconds]
Table "intraday.trades" is not updated.
Sending more messages works most of the time. But we need to reliably send and receive any number of messages. Please help. Thanks!
```scala
val customEventhubParameters = EventHubsConf(connectionString)
  .setStartingPosition(EventPosition.fromEnqueuedTime(Instant.now))

intradayTrades.writeStream
  .format("delta")
  .outputMode("append")
  .option("mergeSchema", "true")
  .option("checkpointLocation", s"$datalake$check")
  .table("intraday.trades")
```
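For context, the snippets above omit the read side of the stream. A rough reconstruction of the full pipeline (the `readStream` and body-parsing lines are my assumptions; only the `EventHubsConf` and `writeStream` calls come from the report, and `$datalake$check` is an elided path from the original):

```scala
// Sketch of the whole pipeline: read from Event Hubs, decode the body,
// append to the Delta table. The read/parse steps are assumed, not reported.
val raw = spark.readStream
  .format("eventhubs")
  .options(customEventhubParameters.toMap)
  .load()

// Event Hubs delivers the payload as binary; cast it to a string column.
val intradayTrades = raw.selectExpr("CAST(body AS STRING) AS body")

intradayTrades.writeStream
  .format("delta")
  .outputMode("append")
  .option("mergeSchema", "true")
  .option("checkpointLocation", s"$datalake$check")
  .table("intraday.trades")
```

If the reported timeout fires while this query is idle waiting for its first event, the whole stream fails, which matches the "only one message" symptom described above.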