ReactorDispatcher instance is closed. #454
We are hitting this issue 8 out of 10 times while trying to process data from Event Hubs in both streaming and batch mode.
Could you please let us know which version of the eventhubs-spark connector you are using? Are you seeing this with 2.3.14.1?
I'm closing this issue now. Please reopen it if you see the same problem with the current version.
I am hitting this issue during my tests. I understand one of the causes is having multiple streaming queries running against the same consumer group. However, how can I get out of this state once I hit the issue, for whatever reason? I have a single Databricks notebook running, but I cannot get out of this error state unless I restart the cluster. I am using the latest version of the library, 2.3.15.
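One way to avoid the consumer-group conflict mentioned above is to give each concurrently running streaming query its own consumer group in its Event Hubs configuration. A minimal sketch, assuming the connector's documented `eventhubs.connectionString` and `eventhubs.consumerGroup` option keys; the connection string and consumer-group names below are placeholders, not values from this thread:

```python
# Sketch: build one option map per streaming query so that no two
# queries share a consumer group (a known trigger for competing
# receivers and the "ReactorDispatcher instance is closed" error).
# The connection string and group names are hypothetical placeholders.

def build_ehconf(connection_string, consumer_group):
    """Return an option map for spark.readStream.format("eventhubs").options(**ehconf).

    Note: in a real Databricks/Spark job the connection string should be
    encrypted with the connector's EventHubsUtils helper before use.
    """
    return {
        "eventhubs.connectionString": connection_string,
        "eventhubs.consumerGroup": consumer_group,
    }

conn = "Endpoint=sb://example.servicebus.windows.net/;..."  # placeholder

# One consumer group per query keeps their receivers independent.
ehconf_query1 = build_ehconf(conn, "query1-cg")
ehconf_query2 = build_ehconf(conn, "query2-cg")
```

Each map would then be passed to its own `spark.readStream.format("eventhubs").options(**ehconf)` call, so the two queries never contend for the same partition receivers.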
I'm facing this on 2.3.17, can you please reopen it @nyaghma?
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:201)
Driver stacktrace:
Hi,
I see this issue seemed to have been resolved in 2018, but I am hitting it on 2.4.1:
An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 342.0 failed 4 times, most recent failure: Lost task 0.3 in stage 342.0 (TID 107985, 10.139.64.7, executor 1): java.util.concurrent.RejectedExecutionException: ReactorDispatcher instance is closed.
at com.microsoft.azure.eventhubs.impl.ReactorDispatcher.throwIfSchedulerError(ReactorDispatcher.java:81)
at com.microsoft.azure.eventhubs.impl.ReactorDispatcher.invoke(ReactorDispatcher.java:57)
at com.microsoft.azure.eventhubs.impl.MessagingFactory.scheduleOnReactorThread(MessagingFactory.java:375)
at com.microsoft.azure.eventhubs.impl.MessageReceiver.receive(MessageReceiver.java:268)
at com.microsoft.azure.eventhubs.impl.PartitionReceiverImpl.receive(PartitionReceiverImpl.java:142)
at com.microsoft.azure.eventhubs.PartitionReceiver.lambda$receiveSync$0(PartitionReceiver.java:89)
at com.microsoft.azure.eventhubs.impl.ExceptionUtil.sync(ExceptionUtil.java:179)
at com.microsoft.azure.eventhubs.PartitionReceiver.receiveSync(PartitionReceiver.java:89)
at org.apache.spark.eventhubs.client.CachedEventHubsReceiver.liftedTree1$1(CachedEventHubsReceiver.scala:103)
at org.apache.spark.eventhubs.client.CachedEventHubsReceiver.org$apache$spark$eventhubs$client$CachedEventHubsReceiver$$receive(CachedEventHubsReceiver.scala:102)
at org.apache.spark.eventhubs.client.CachedEventHubsReceiver$.receive(CachedEventHubsReceiver.scala:157)
at org.apache.spark.eventhubs.rdd.EventHubsRDD$EventHubsRDDIterator.next(EventHubsRDD.scala:147)
at org.apache.spark.eventhubs.rdd.EventHubsRDD$EventHubsRDDIterator.next(EventHubsRDD.scala:128)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:638)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:638)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.hasNext(SerDeUtil.scala:155)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:150)
at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:299)
at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:595)
at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:383)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2076)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:223)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2355)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2343)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2342)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2342)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1096)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1096)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1096)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2574)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2522)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2510)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:893)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2240)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2262)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2281)
at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:191)
at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
at py4j.Gateway.invoke(Gateway.java:295)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:251)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.RejectedExecutionException: ReactorDispatcher instance is closed.
at com.microsoft.azure.eventhubs.impl.ReactorDispatcher.throwIfSchedulerError(ReactorDispatcher.java:81)
at com.microsoft.azure.eventhubs.impl.ReactorDispatcher.invoke(ReactorDispatcher.java:57)
at com.microsoft.azure.eventhubs.impl.MessagingFactory.scheduleOnReactorThread(MessagingFactory.java:375)
at com.microsoft.azure.eventhubs.impl.MessageReceiver.receive(MessageReceiver.java:268)
at com.microsoft.azure.eventhubs.impl.PartitionReceiverImpl.receive(PartitionReceiverImpl.java:142)
at com.microsoft.azure.eventhubs.PartitionReceiver.lambda$receiveSync$0(PartitionReceiver.java:89)
at com.microsoft.azure.eventhubs.impl.ExceptionUtil.sync(ExceptionUtil.java:179)
at com.microsoft.azure.eventhubs.PartitionReceiver.receiveSync(PartitionReceiver.java:89)
at org.apache.spark.eventhubs.client.CachedEventHubsReceiver.liftedTree1$1(CachedEventHubsReceiver.scala:103)
at org.apache.spark.eventhubs.client.CachedEventHubsReceiver.org$apache$spark$eventhubs$client$CachedEventHubsReceiver$$receive(CachedEventHubsReceiver.scala:102)
at org.apache.spark.eventhubs.client.CachedEventHubsReceiver$.receive(CachedEventHubsReceiver.scala:157)
at org.apache.spark.eventhubs.rdd.EventHubsRDD$EventHubsRDDIterator.next(EventHubsRDD.scala:147)
at org.apache.spark.eventhubs.rdd.EventHubsRDD$EventHubsRDDIterator.next(EventHubsRDD.scala:128)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:638)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:638)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.hasNext(SerDeUtil.scala:155)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:150)
at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:299)
at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:595)
at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:383)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2076)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:223)