Spark failed to retrieve message from event hub, throwing timeout exception. #435

Closed
jojinmp opened this issue Mar 27, 2019 · 4 comments

Comments

jojinmp commented Mar 27, 2019

I am trying to set up a Spark Structured Streaming connection to Event Hubs. For now I am sending events from Node.js for testing. I am working in IntelliJ with Maven, using Spark 2.3.0 and the dependency below for Azure Event Hubs Spark streaming:

    <dependency>
        <groupId>com.microsoft.azure</groupId>
        <artifactId>spark-streaming-eventhubs_2.11</artifactId>
        <version>2.1.5</version>
    </dependency>

I get the error below when running the Spark streaming job. I tried a fresh event hub to make sure no other jobs were running concurrently, and I still get the same error. On the event hub's monitoring page in the Azure portal, the messages and requests graphs show some values. Please help resolve this issue. I have been trying to get a Spark Structured Streaming job working with Event Hubs for the past week, and I couldn't find a working sample project online either.

19/03/27 14:57:04 WARN SimpleSslTransportWrapper: RSA premaster secret error
19/03/27 14:58:03 ERROR MicroBatchExecution: Query [id = 7f9b54ad-f8a1-41af-b958-bd942fe716f7, runId = 599b063e-e595-4947-98d9-36fb74816882] terminated with error
com.microsoft.azure.eventhubs.TimeoutException: Opening MessagingFactory timed out.
at com.microsoft.azure.eventhubs.MessagingFactory$3.run(MessagingFactory.java:221)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
org.apache.spark.sql.streaming.StreamingQueryException: Opening MessagingFactory timed out.
=== Streaming Query ===
Identifier: [id = 7f9b54ad-f8a1-41af-b958-bd942fe716f7, runId = 599b063e-e595-4947-98d9-36fb74816882]
Current Committed Offsets: {}
Current Available Offsets: {}

Current State: ACTIVE
Thread State: RUNNABLE

Logical Plan:
Project [window#33-T10000ms.start AS start#43, window#33-T10000ms.end AS end#44, device#25, avg(reading)#38]
+- Aggregate [window#39-T10000ms, device#25], [window#39-T10000ms AS window#33-T10000ms, device#25, avg(cast(reading#26 as double)) AS avg(reading)#38]
+- Filter isnotnull(enqueuedTime#21-T10000ms)
+- Project [named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(enqueuedTime#21-T10000ms, TimestampType, LongType) - 0) as double) / cast(60000000 as double))) as double) = (cast((precisetimestampconversion(enqueuedTime#21-T10000ms, TimestampType, LongType) - 0) as double) / cast(60000000 as double))) THEN (CEIL((cast((precisetimestampconversion(enqueuedTime#21-T10000ms, TimestampType, LongType) - 0) as double) / cast(60000000 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(enqueuedTime#21-T10000ms, TimestampType, LongType) - 0) as double) / cast(60000000 as double))) END + cast(0 as bigint)) - cast(1 as bigint)) * 60000000) + 0), LongType, TimestampType), end, precisetimestampconversion((((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(enqueuedTime#21-T10000ms, TimestampType, LongType) - 0) as double) / cast(60000000 as double))) as double) = (cast((precisetimestampconversion(enqueuedTime#21-T10000ms, TimestampType, LongType) - 0) as double) / cast(60000000 as double))) THEN (CEIL((cast((precisetimestampconversion(enqueuedTime#21-T10000ms, TimestampType, LongType) - 0) as double) / cast(60000000 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(enqueuedTime#21-T10000ms, TimestampType, LongType) - 0) as double) / cast(60000000 as double))) END + cast(0 as bigint)) - cast(1 as bigint)) * 60000000) + 0) + 60000000), LongType, TimestampType)) AS window#39-T10000ms, enqueuedTime#21-T10000ms, device#25, reading#26]
+- EventTimeWatermark enqueuedTime#21: timestamp, interval 10 seconds
+- Project [enqueuedTime#21, sensorReading#22.device AS device#25, cast(sensorReading#22.reading as float) AS reading#26]
+- Project [cast(enqueuedTime#10L as timestamp) AS enqueuedTime#21, jsontostructs(StructField(device,StringType,true), StructField(reading,StringType,true), cast(body#7 as string), Some(Asia/Calcutta)) AS sensorReading#22]
+- StreamingExecutionRelation org.apache.spark.sql.streaming.eventhubs.EventHubsSource@33a0fc07, [body#7, offset#8L, seqNumber#9L, enqueuedTime#10L, publisher#11, partitionKey#12, properties#13]

at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:295)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)

Caused by: com.microsoft.azure.eventhubs.TimeoutException: Opening MessagingFactory timed out.

Member

sjkwak commented Apr 1, 2019

@jojinmp here is the correct artifactId for the connector. Let us know if you still see the issue.
groupId = com.microsoft.azure
artifactId = azure-eventhubs-spark_2.11
version = 2.3.9
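
In a Maven pom.xml, the coordinates above would be written roughly as follows. This is only a sketch: it assumes the block sits inside the project's `<dependencies>` element and takes the place of the `spark-streaming-eventhubs_2.11` entry from the original post, neither of which is stated in this thread.

```xml
<!-- Coordinates copied from the comment above; placement inside <dependencies> is an assumption. -->
<dependency>
    <groupId>com.microsoft.azure</groupId>
    <artifactId>azure-eventhubs-spark_2.11</artifactId>
    <version>2.3.9</version>
</dependency>
```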

Member

sjkwak commented Apr 2, 2019

@jojinmp please reopen this issue if you still see a failure.

sjkwak closed this as completed Apr 2, 2019

Author

jojinmp commented Apr 3, 2019

I started with only the dependency below, and I am getting the ClassNotFoundException shown underneath it.
groupId = com.microsoft.azure
artifactId = azure-eventhubs-spark_2.11
version = 2.3.9

The error message I am getting:
Caused by: java.lang.ClassNotFoundException: com.microsoft.azure.eventhubs.impl.EventHubClientImpl
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)

I was following your doc and ran into multiple issues. I am setting this up in IntelliJ with Maven and tried to run it locally as well as on an HDInsight cluster. Below are the issues I faced:

  1. EventHubConf.toMap does not compile. I mapped the parameters directly to get around this.

  2. In the doc, spark is not defined before the spark.readstream statement. I assumed it is a Spark session and created one.

  3. Once I configured everything as described, I get the error mentioned above.

Please note that I am able to run the code from an HDInsight/Databricks notebook, and the issues above do not arise there. When I set it up in IntelliJ, however, I run into multiple issues.

Do you have a sample project that sets up Event Hubs Spark Structured Streaming and that I can import into IntelliJ? It would be very helpful if you could provide one.


eswarib commented Dec 27, 2020

I'm also facing a similar issue. I am not getting an error, but the Spark session gets closed automatically. How did you resolve this?
