Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Can't consume from IOTHub's "Event Hub-compatible endpoint" #312

Closed
eduardBM opened this issue May 3, 2018 · 10 comments
Closed

Can't consume from IOTHub's "Event Hub-compatible endpoint" #312

eduardBM opened this issue May 3, 2018 · 10 comments
Assignees

Comments

@eduardBM
Copy link

eduardBM commented May 3, 2018

Bug Report:
Trying to consume/stream events from an Azure IOTHub's "Event Hub-compatible endpoint" inside spark streaming

  • Actual behavior

18/05/03 13:02:35 WARN MessagingFactory: messagingFactory[MessagingFactory0308e7], hostName[iothub-ns-iohenedspe-443357-03c23e169d.servicebus.windows.net], error[UnHandled exception while processing events in reactor:
java.lang.IllegalArgumentException: message.getMessageId() should be null
org.apache.qpid.proton.engine.impl.EventImpl.dispatch(EventImpl.java:112)
org.apache.qpid.proton.reactor.impl.ReactorImpl.dispatch(ReactorImpl.java:324)
org.apache.qpid.proton.reactor.impl.ReactorImpl.process(ReactorImpl.java:291)
com.microsoft.azure.eventhubs.impl.MessagingFactory$RunReactor.run(MessagingFactory.java:445)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
java.lang.Thread.run(Thread.java:748)Cause: message.getMessageId() should be null
com.microsoft.azure.eventhubs.impl.RequestResponseChannel.request(RequestResponseChannel.java:110)

  • Expected behavior
    Stream/consume messages from EH endpoint

  • Spark version
    2.1.1.2.6.2.25-1

  • spark-eventhubs artifactId and version
    azure-eventhubs-spark_2.11-2.3.0.jar

@sabeegrewal
Copy link
Contributor

Hey @eduardBM! For Spark 2.1 you need to use version 2.2.0 of the azure-eventhubs-spark_2.11 library. Can you try switching and see how it goes?

@eduardBM
Copy link
Author

eduardBM commented May 9, 2018

Hi @sabeegrewal. I updated my local copy with the tip of "spark-2.2" branch, ended up with artifact "core/target/azure-eventhubs-spark_2.11-2.2.0-PREVIEW.jar", tried running the spark streaming job with that one but still getting the same error.

@eduardBM
Copy link
Author

eduardBM commented May 9, 2018

Hi,

Looking closely at the traceback, the actual cause is https://github.com/Azure/azure-event-hubs-java/blob/dev/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/RequestResponseChannel.java#L104 which means the "IoT Hub Event hub compatible endpoint" is producing incompatible messages. (as the same library works with an actual Event Hub endpoint)

Package used: "--packages com.microsoft.azure:azure-eventhubs:1.0.1"

@eduardBM
Copy link
Author

Hi, @sabeegrewal , any news regarding this?
Thx,

Ed

@sabeegrewal sabeegrewal self-assigned this May 14, 2018
@sabeegrewal
Copy link
Contributor

sabeegrewal commented May 14, 2018

Hey @eduardBM - I haven't tried to repro this but we have a number of people running pipelines from IoTHub, so I'm curious to know what's going on.

I'm going to try to repro the issue tonight/tomorrow morning. If you have anything else that'd help me repro, please let me know!

@eduardBM
Copy link
Author

Hi,
@sabeegrewal
Thanks for the quick reply.

Some more details:

  • we're running a customized version of this repo, related to: Cannot create EventHubsDirectStream in PySpark #282
  • we got it running on a "standard" EventHubs endpoint, where we pushed data with a test script and we saw data coming in
  • we're running python code using PySpark
  • after tests with EH endpoint we switched to "Event Hub-compatible endpoint" of IOTHub (since data is coming from IOTHub devices)

Here is the full traceback inside spark:

18/05/09 13:17:01 WARN MessagingFactory: messagingFactory[MessagingFactory626d64], hostName[.servicebus.windows.net], error[UnHandled exception while processing events in reactor:
java.lang.NullPointerException
org.apache.qpid.proton.engine.impl.EventImpl.dispatch(EventImpl.java:112)
org.apache.qpid.proton.reactor.impl.ReactorImpl.dispatch(ReactorImpl.java:324)
org.apache.qpid.proton.reactor.impl.ReactorImpl.process(ReactorImpl.java:291)
com.microsoft.azure.eventhubs.impl.MessagingFactory$RunReactor.run(MessagingFactory.java:462)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
java.lang.Thread.run(Thread.java:748)Cause: null
com.microsoft.azure.eventhubs.impl.AmqpUtil.sizeof(AmqpUtil.java:125)
com.microsoft.azure.eventhubs.impl.AmqpUtil.getDataSerializedSize(AmqpUtil.java:85)
com.microsoft.azure.eventhubs.impl.RequestResponseChannel.request(RequestResponseChannel.java:115)
com.microsoft.azure.eventhubs.impl.ManagementChannel$1.onComplete(ManagementChannel.java:45)
com.microsoft.azure.eventhubs.impl.ManagementChannel$1.onComplete(ManagementChannel.java:42)
com.microsoft.azure.eventhubs.impl.FaultTolerantObject$1$1.onComplete(FaultTolerantObject.java:59)
com.microsoft.azure.eventhubs.impl.FaultTolerantObject$1$1.onComplete(FaultTolerantObject.java:53)
com.microsoft.azure.eventhubs.impl.RequestResponseOpener$2.onComplete(RequestResponseOpener.java:55)
com.microsoft.azure.eventhubs.impl.RequestResponseOpener$2.onComplete(RequestResponseOpener.java:49)
com.microsoft.azure.eventhubs.impl.RequestResponseChannel.onLinkOpenComplete(RequestResponseChannel.java:129)
com.microsoft.azure.eventhubs.impl.RequestResponseChannel.access$200(RequestResponseChannel.java:23)
com.microsoft.azure.eventhubs.impl.RequestResponseChannel$ResponseHandler.onOpenComplete(RequestResponseChannel.java:228)
com.microsoft.azure.eventhubs.impl.ReceiveLinkHandler.onLinkRemoteOpen(ReceiveLinkHandler.java:57)
org.apache.qpid.proton.engine.BaseHandler.handle(BaseHandler.java:164)
org.apache.qpid.proton.engine.impl.EventImpl.dispatch(EventImpl.java:108)
org.apache.qpid.proton.reactor.impl.ReactorImpl.dispatch(ReactorImpl.java:324)
org.apache.qpid.proton.reactor.impl.ReactorImpl.process(ReactorImpl.java:291)
com.microsoft.azure.eventhubs.impl.MessagingFactory$RunReactor.run(MessagingFactory.java:462)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
java.lang.Thread.run(Thread.java:748)]
18/05/09 13:17:02 WARN MessagingFactory: messagingFactory[MessagingFactory626d64], hostName[.servicebus.windows.net], error[UnHandled exception while processing events in reactor:
java.lang.IllegalArgumentException: message.getMessageId() should be null
org.apache.qpid.proton.engine.impl.EventImpl.dispatch(EventImpl.java:112)
org.apache.qpid.proton.reactor.impl.ReactorImpl.dispatch(ReactorImpl.java:324)
org.apache.qpid.proton.reactor.impl.ReactorImpl.process(ReactorImpl.java:291)
com.microsoft.azure.eventhubs.impl.MessagingFactory$RunReactor.run(MessagingFactory.java:462)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
java.lang.Thread.run(Thread.java:748)Cause: message.getMessageId() should be null
com.microsoft.azure.eventhubs.impl.RequestResponseChannel.request(RequestResponseChannel.java:104)
com.microsoft.azure.eventhubs.impl.ManagementChannel$1.onComplete(ManagementChannel.java:45)
com.microsoft.azure.eventhubs.impl.ManagementChannel$1.onComplete(ManagementChannel.java:42)
com.microsoft.azure.eventhubs.impl.FaultTolerantObject$1$1.onComplete(FaultTolerantObject.java:59)
com.microsoft.azure.eventhubs.impl.FaultTolerantObject$1$1.onComplete(FaultTolerantObject.java:53)
com.microsoft.azure.eventhubs.impl.RequestResponseOpener$2.onComplete(RequestResponseOpener.java:55)
com.microsoft.azure.eventhubs.impl.RequestResponseOpener$2.onComplete(RequestResponseOpener.java:49)
com.microsoft.azure.eventhubs.impl.RequestResponseChannel.onLinkOpenComplete(RequestResponseChannel.java:129)
com.microsoft.azure.eventhubs.impl.RequestResponseChannel.access$200(RequestResponseChannel.java:23)
com.microsoft.azure.eventhubs.impl.RequestResponseChannel$ResponseHandler.onOpenComplete(RequestResponseChannel.java:228)
com.microsoft.azure.eventhubs.impl.ReceiveLinkHandler.onLinkRemoteOpen(ReceiveLinkHandler.java:57)
org.apache.qpid.proton.engine.BaseHandler.handle(BaseHandler.java:164)
org.apache.qpid.proton.engine.impl.EventImpl.dispatch(EventImpl.java:108)
org.apache.qpid.proton.reactor.impl.ReactorImpl.dispatch(ReactorImpl.java:324)
org.apache.qpid.proton.reactor.impl.ReactorImpl.process(ReactorImpl.java:291)
com.microsoft.azure.eventhubs.impl.MessagingFactory$RunReactor.run(MessagingFactory.java:462)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
java.lang.Thread.run(Thread.java:748)]
18/05/09 13:48:04 ERROR ApplicationMaster: RECEIVED SIGNAL TERM
18/05/09 13:48:04 ERROR ApplicationMaster: User application exited with status 143
18/05/09 13:48:04 ERROR LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerExecutorMetricsUpdate(6,WrappedArray())

This is what looks like the cause of the error:

Cause: message.getMessageId() should be null
com.microsoft.azure.eventhubs.impl.RequestResponseChannel.request(RequestResponseChannel.java:104)

Thanks,

Ed

@sabeegrewal
Copy link
Contributor

Oh wow, so to be clear: you guys had this working with an Event Hub? And when you switched to the Event Hub Compatible endpoint it stopped working?

Also, can you share you Event Hub Compatible endpoint here? Just change all the private/personal info to xxxx or something 👍

@eduardBM
Copy link
Author

Hi,

The working eventHub: "Endpoint=sb://test-eduard.servicebus.windows.net/;SharedAccessKeyName=IncomingDataListen;SharedAccessKey=XXX;EntityPath=incoming-data",

The IOTHub Compatible eventHub: "Endpoint=sb://iothub-ns-iohenedspe-443357-03c23e169d.servicebus.windows.net/;SharedAccessKeyName=iothubowner;SharedAccessKey=XXX"

Ed

@sabeegrewal
Copy link
Contributor

sabeegrewal commented May 18, 2018

Ah, there's the issue. You're missing the EntityPath in your second connection string. You should see an EventHub-compatible name in the Azure portal where you got the endpoint. Update your connection string to include it:

Endpoint=sb://iothub-ns-iohenedspe-443357-03c23e169d.servicebus.windows.net/;SharedAccessKeyName=iothubowner;SharedAccessKey=XXX;EntityPath={YOUR.EVENT.HUBS.COMPATIBLE.NAME}

It'd be nice if all the pieces were in one spot...no clue why the IoT Hub team has it separate like that. They may have a good reason, but I'll track down whoever works on the IoT Hub portal and see if they're willing to change it 👍

Try that out and let me know how it goes!

@eduardBM
Copy link
Author

@sabeegrewal
Hi, i managed to get it working, can't believe it was something so silly... (facepalm).
Neither the official documentation nor the portal UI mention this.

(https://docs.microsoft.com/en-us/azure/iot-hub/iot-hub-devguide-messages-read-builtin)

If you need to build an Event Hub connection string by using the previous information, use the following pattern:

Endpoint={Event Hub-compatible endpoint};SharedAccessKeyName={iot hub policy name};SharedAccessKey={iot hub policy key}

Thanks for your help, and please let me know when you have to go over the other issue (with Pyspark).

Thanks,

Ed

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants