diff --git a/core/src/main/scala/org/apache/spark/eventhubs/client/CachedEventHubsReceiver.scala b/core/src/main/scala/org/apache/spark/eventhubs/client/CachedEventHubsReceiver.scala
index 4a8008920..cf784387a 100644
--- a/core/src/main/scala/org/apache/spark/eventhubs/client/CachedEventHubsReceiver.scala
+++ b/core/src/main/scala/org/apache/spark/eventhubs/client/CachedEventHubsReceiver.scala
@@ -329,7 +329,7 @@ private[spark] object CachedEventHubsReceiver extends CachedReceiver with Loggin
 
   private[this] val receivers = new MutableMap[String, CachedEventHubsReceiver]()
 
-  // RPC endpoint for partition performacne communciation in the executor
+  // RPC endpoint for partition performance communication in the executor
   val partitionPerformanceReceiverRef =
     RpcUtils.makeDriverRef(PartitionPerformanceReceiver.ENDPOINT_NAME,
                            SparkEnv.get.conf,
diff --git a/core/src/main/scala/org/apache/spark/sql/eventhubs/EventHubsSource.scala b/core/src/main/scala/org/apache/spark/sql/eventhubs/EventHubsSource.scala
index 23e36f30d..f871605e0 100644
--- a/core/src/main/scala/org/apache/spark/sql/eventhubs/EventHubsSource.scala
+++ b/core/src/main/scala/org/apache/spark/sql/eventhubs/EventHubsSource.scala
@@ -30,9 +30,7 @@ import org.apache.spark.eventhubs.rdd.{ EventHubsRDD, OffsetRange }
 import org.apache.spark.eventhubs.utils.ThrottlingStatusPlugin
 import org.apache.spark.eventhubs.{ EventHubsConf, NameAndPartition, _ }
 import org.apache.spark.internal.Logging
-import org.apache.spark.rpc.RpcEndpointRef
 import org.apache.spark.scheduler.ExecutorCacheTaskLocation
-import org.apache.spark.SparkEnv
 import org.apache.spark.sql.execution.streaming.{
   HDFSMetadataLog,
   Offset,
@@ -77,6 +75,7 @@ private[spark] class EventHubsSource private[eventhubs] (sqlContext: SQLContext,
 
   import EventHubsConf._
   import EventHubsSource._
+  import EventHubsSourceProvider._
 
   private lazy val ehClient = EventHubsSourceProvider.clientFactory(parameters)(ehConf)
   private lazy val partitionCount: Int = ehClient.partitionCount
@@ -87,9 +86,11 @@ private[spark] class EventHubsSource private[eventhubs] (sqlContext: SQLContext,
   private val sc = sqlContext.sparkContext
 
   private val maxOffsetsPerTrigger: Option[Long] =
-    Option(parameters.get(MaxEventsPerTriggerKey).map(_.toLong).getOrElse(
-      parameters.get(MaxEventsPerTriggerKeyAlias).map(_.toLong).getOrElse(
-        partitionCount * 1000)))
+    Option(parameters
+      .get(MaxEventsPerTriggerKey)
+      .map(_.toLong)
+      .getOrElse(
+        parameters.get(MaxEventsPerTriggerKeyAlias).map(_.toLong).getOrElse(partitionCount * 1000)))
 
   // set slow partition adjustment flag and static values in the tracker
   private val slowPartitionAdjustment: Boolean =
@@ -147,22 +148,25 @@ private[spark] class EventHubsSource private[eventhubs] (sqlContext: SQLContext,
           text.substring(1, text.length).toInt
         } catch {
           case _: NumberFormatException =>
-            throw new IllegalStateException(s"Log file was malformed: failed to read correct log " +
-              s"version from $text.")
+            throw new IllegalStateException(
+              s"Log file was malformed: failed to read correct log " +
+                s"version from $text.")
         }
       if (version > 0) {
         if (version > maxSupportedVersion) {
-          throw new IllegalStateException(s"UnsupportedLogVersion: maximum supported log version " +
-            s"is v${maxSupportedVersion}, but encountered v$version. The log file was produced " +
-            s"by a newer version of Spark and cannot be read by this version. Please upgrade.")
+          throw new IllegalStateException(
+            s"UnsupportedLogVersion: maximum supported log version " +
+              s"is v${maxSupportedVersion}, but encountered v$version. The log file was produced " +
+              s"by a newer version of Spark and cannot be read by this version. Please upgrade.")
         } else {
           return version
         }
       }
     }
     // reaching here means we failed to read the correct log version
-    throw new IllegalStateException(s"Log file was malformed: failed to read correct log " +
-      s"version from $text.")
+    throw new IllegalStateException(
+      s"Log file was malformed: failed to read correct log " +
+        s"version from $text.")
   }
 }
 val defaultSeqNos = ehClient
@@ -449,14 +453,7 @@
     """.stripMargin
 
   private[eventhubs] val VERSION = 1
-
-  // RPC endpoint for partition performacne communciation in the driver
   private var localBatchId = -1
-  val partitionsStatusTracker = PartitionsStatusTracker.getPartitionStatusTracker
-  val partitionPerformanceReceiver: PartitionPerformanceReceiver =
-    new PartitionPerformanceReceiver(SparkEnv.get.rpcEnv, partitionsStatusTracker)
-  val partitionPerformanceReceiverRef: RpcEndpointRef = SparkEnv.get.rpcEnv
-    .setupEndpoint(PartitionPerformanceReceiver.ENDPOINT_NAME, partitionPerformanceReceiver)
 
   def getSortedExecutorList(sc: SparkContext): Array[String] = {
     val bm = sc.env.blockManager
diff --git a/core/src/main/scala/org/apache/spark/sql/eventhubs/EventHubsSourceProvider.scala b/core/src/main/scala/org/apache/spark/sql/eventhubs/EventHubsSourceProvider.scala
index 230b2ee0b..5097b8f85 100644
--- a/core/src/main/scala/org/apache/spark/sql/eventhubs/EventHubsSourceProvider.scala
+++ b/core/src/main/scala/org/apache/spark/sql/eventhubs/EventHubsSourceProvider.scala
@@ -45,6 +45,8 @@ import org.apache.spark.sql.types._
 import org.apache.spark.sql.{ AnalysisException, DataFrame, SQLContext, SaveMode }
 import org.apache.spark.unsafe.types.UTF8String
 import org.json4s.jackson.Serialization
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.SparkEnv
 
 import collection.JavaConverters._
 
@@ -140,6 +142,12 @@
 }
 
 private[sql] object EventHubsSourceProvider extends Serializable {
+  // RPC endpoint for partition performance communication in the driver
+  val partitionsStatusTracker = PartitionsStatusTracker.getPartitionStatusTracker
+  val partitionPerformanceReceiver: PartitionPerformanceReceiver =
+    new PartitionPerformanceReceiver(SparkEnv.get.rpcEnv, partitionsStatusTracker)
+  val partitionPerformanceReceiverRef: RpcEndpointRef = SparkEnv.get.rpcEnv
+    .setupEndpoint(PartitionPerformanceReceiver.ENDPOINT_NAME, partitionPerformanceReceiver)
 
   def eventHubsSchema: StructType = {
     StructType(
@@ -169,32 +177,32 @@ private[sql] object EventHubsSourceProvider extends Serializable {
         new java.sql.Timestamp(ed.getSystemProperties.getEnqueuedTime.toEpochMilli)),
       UTF8String.fromString(ed.getSystemProperties.getPublisher),
       UTF8String.fromString(ed.getSystemProperties.getPartitionKey),
-      ArrayBasedMapData(
-        ed.getProperties.asScala
-          .mapValues {
-            case b: Binary =>
-              val buf = b.asByteBuffer()
-              val arr = new Array[Byte](buf.remaining)
-              buf.get(arr)
-              arr.asInstanceOf[AnyRef]
-            case d128: Decimal128 => d128.asBytes.asInstanceOf[AnyRef]
-            case d32: Decimal32 => d32.getBits.asInstanceOf[AnyRef]
-            case d64: Decimal64 => d64.getBits.asInstanceOf[AnyRef]
-            case s: Symbol => s.toString.asInstanceOf[AnyRef]
-            case ub: UnsignedByte => ub.toString.asInstanceOf[AnyRef]
-            case ui: UnsignedInteger => ui.toString.asInstanceOf[AnyRef]
-            case ul: UnsignedLong => ul.toString.asInstanceOf[AnyRef]
-            case us: UnsignedShort => us.toString.asInstanceOf[AnyRef]
-            case c: Character => c.toString.asInstanceOf[AnyRef]
-            case d: DescribedType => d.getDescribed
-            case default => default
+      ArrayBasedMapData(ed.getProperties.asScala
+        .mapValues {
+          case b: Binary =>
+            val buf = b.asByteBuffer()
+            val arr = new Array[Byte](buf.remaining)
+            buf.get(arr)
+            arr.asInstanceOf[AnyRef]
+          case d128: Decimal128 => d128.asBytes.asInstanceOf[AnyRef]
+          case d32: Decimal32 => d32.getBits.asInstanceOf[AnyRef]
+          case d64: Decimal64 => d64.getBits.asInstanceOf[AnyRef]
+          case s: Symbol => s.toString.asInstanceOf[AnyRef]
+          case ub: UnsignedByte => ub.toString.asInstanceOf[AnyRef]
+          case ui: UnsignedInteger => ui.toString.asInstanceOf[AnyRef]
+          case ul: UnsignedLong => ul.toString.asInstanceOf[AnyRef]
+          case us: UnsignedShort => us.toString.asInstanceOf[AnyRef]
+          case c: Character => c.toString.asInstanceOf[AnyRef]
+          case d: DescribedType => d.getDescribed
+          case default => default
+        }
+        .map { p =>
+          p._2 match {
+            case s: String => UTF8String.fromString(p._1) -> UTF8String.fromString(s)
+            case default =>
+              UTF8String.fromString(p._1) -> UTF8String.fromString(Serialization.write(p._2))
           }
-          .map { p =>
-            p._2 match {
-              case s: String => UTF8String.fromString(p._1) -> UTF8String.fromString(s)
-              case default => UTF8String.fromString(p._1) -> UTF8String.fromString(Serialization.write(p._2))
-            }
-          }),
+        }),
       ArrayBasedMapData(
         // Don't duplicate offset, enqueued time, and seqNo
         (ed.getSystemProperties.asScala -- Seq(OffsetAnnotation,
@@ -210,8 +218,10 @@
           }
           .map { p =>
             p._2 match {
-              case s: String => UTF8String.fromString(p._1) -> UTF8String.fromString(s)
-              case default => UTF8String.fromString(p._1) -> UTF8String.fromString(Serialization.write(p._2))
+              case s: String => UTF8String.fromString(p._1) -> UTF8String.fromString(s)
+              case default =>
+                UTF8String.fromString(p._1) -> UTF8String.fromString(
+                  Serialization.write(p._2))
             }
           })
       )
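Note (not part of the patch): the change above moves the driver-side registration of PartitionPerformanceReceiver into the EventHubsSourceProvider companion object, while executors keep resolving the driver endpoint through RpcUtils.makeDriverRef in CachedEventHubsReceiver. A minimal sketch of that driver/executor RPC pattern under stated assumptions follows; PerfMessage, PerfEndpoint, and the "perf-endpoint" name are hypothetical placeholders, and Spark's RPC API is private[spark], which is why such code has to live under an org.apache.spark package, as the production classes do.

// Illustrative sketch only -- hypothetical names, not the connector's actual types.
package org.apache.spark.eventhubs.example

import org.apache.spark.SparkEnv
import org.apache.spark.rpc.{ RpcEndpointRef, RpcEnv, ThreadSafeRpcEndpoint }
import org.apache.spark.util.RpcUtils

// Message an executor sends to the driver (hypothetical payload).
case class PerfMessage(partitionId: Int, batchReadTimeMs: Long)

// Driver-side endpoint: registered once per driver JVM, handles fire-and-forget messages.
class PerfEndpoint(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint {
  override def receive: PartialFunction[Any, Unit] = {
    case PerfMessage(partition, elapsed) =>
      // hand the measurement to whatever driver-side tracker cares about it
      println(s"partition $partition read its batch in $elapsed ms")
  }
}

object PerfExample {
  // On the driver: register the endpoint under a well-known name and keep the returned ref.
  def registerOnDriver(): RpcEndpointRef =
    SparkEnv.get.rpcEnv.setupEndpoint("perf-endpoint", new PerfEndpoint(SparkEnv.get.rpcEnv))

  // On an executor: resolve a reference to the driver endpoint and send a one-way message.
  def reportFromExecutor(partition: Int, elapsedMs: Long): Unit = {
    val ref = RpcUtils.makeDriverRef("perf-endpoint", SparkEnv.get.conf, SparkEnv.get.rpcEnv)
    ref.send(PerfMessage(partition, elapsedMs))
  }
}

Putting the setupEndpoint call in a companion object, as the patch does for EventHubsSourceProvider, ties registration to class loading on the driver, so the endpoint exists before any executor tries to resolve it.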