From b2ff1544d45422aaf5d0972232c96be87584e533 Mon Sep 17 00:00:00 2001 From: nayaghma Date: Tue, 7 Jul 2020 13:20:30 -0500 Subject: [PATCH 1/4] sync with upstream --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index afa40cd21..b8bf93097 100644 --- a/.gitignore +++ b/.gitignore @@ -8,3 +8,4 @@ target/ scalafmt-output.xml dependency-reduced-pom.xml metastore_db +.DS_Store From 4b16fad9de8f04cf22dfeb244244d919cb5d414c Mon Sep 17 00:00:00 2001 From: Navid Yaghmazadeh Date: Tue, 22 Sep 2020 16:27:30 -0700 Subject: [PATCH 2/4] fix RPC endpoint issue for non streaming query --- .../client/CachedEventHubsReceiver.scala | 76 ++++++++++++------- 1 file changed, 47 insertions(+), 29 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/eventhubs/client/CachedEventHubsReceiver.scala b/core/src/main/scala/org/apache/spark/eventhubs/client/CachedEventHubsReceiver.scala index 4a8008920..9c1b26370 100644 --- a/core/src/main/scala/org/apache/spark/eventhubs/client/CachedEventHubsReceiver.scala +++ b/core/src/main/scala/org/apache/spark/eventhubs/client/CachedEventHubsReceiver.scala @@ -33,6 +33,8 @@ import org.apache.spark.eventhubs.{ PartitionPerformanceReceiver } import org.apache.spark.internal.Logging +import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.SparkException import org.apache.spark.util.RpcUtils import scala.collection.JavaConverters._ @@ -66,7 +68,7 @@ private[spark] trait CachedReceiver { private[client] class CachedEventHubsReceiver private (ehConf: EventHubsConf, nAndP: NameAndPartition, startSeqNo: SequenceNumber) - extends Logging { + extends Logging { type AwaitTimeoutException = java.util.concurrent.TimeoutException @@ -89,11 +91,11 @@ private[client] class CachedEventHubsReceiver private (ehConf: EventHubsConf, receiverOptions.setIdentifier(s"spark-${SparkEnv.get.executorId}-$taskId") val consumer = retryJava( EventHubsUtils.createReceiverInner(client, - ehConf.useExclusiveReceiver, - consumerGroup, - nAndP.partitionId.toString, - EventPosition.fromSequenceNumber(seqNo).convert, - receiverOptions), + ehConf.useExclusiveReceiver, + consumerGroup, + nAndP.partitionId.toString, + EventPosition.fromSequenceNumber(seqNo).convert, + receiverOptions), "CachedReceiver creation." ) Await.result(consumer, ehConf.internalOperationTimeout) @@ -162,7 +164,7 @@ private[client] class CachedEventHubsReceiver private (ehConf: EventHubsConf, Await.result(lastReceivedOffset(), ehConf.internalOperationTimeout) if ((lastReceivedSeqNo > -1 && lastReceivedSeqNo + 1 != requestSeqNo) || - !receiver.getIsOpen) { + !receiver.getIsOpen) { logInfo(s"(TID $taskId) checkCursor. Recreating a receiver for $nAndP, ${ehConf.consumerGroup.getOrElse( DefaultConsumerGroup)}. requestSeqNo: $requestSeqNo, lastReceivedSeqNo: $lastReceivedSeqNo, isOpen: ${receiver.getIsOpen}") @@ -193,11 +195,11 @@ private[client] class CachedEventHubsReceiver private (ehConf: EventHubsConf, // The event still isn't present. It must be (2). 
val info = Await.result( retryJava(client.getPartitionRuntimeInformation(nAndP.partitionId.toString), - "partitionRuntime"), + "partitionRuntime"), ehConf.internalOperationTimeout) if (requestSeqNo < info.getBeginSequenceNumber && - movedSeqNo == info.getBeginSequenceNumber) { + movedSeqNo == info.getBeginSequenceNumber) { Future { movedEvent } @@ -238,8 +240,8 @@ private[client] class CachedEventHubsReceiver private (ehConf: EventHubsConf, val theRest = for { i <- 1 until batchCount } yield awaitReceiveMessage(receiveOne(ehConf.receiverTimeout.getOrElse(DefaultReceiverTimeout), - s"receive; $nAndP; seqNo: ${requestSeqNo + i}"), - requestSeqNo) + s"receive; $nAndP; seqNo: ${requestSeqNo + i}"), + requestSeqNo) // Combine and sort the data. val combined = first ++ theRest.flatten val sorted = combined.toSeq @@ -254,10 +256,10 @@ private[client] class CachedEventHubsReceiver private (ehConf: EventHubsConf, if (ehConf.slowPartitionAdjustment) { sendPartitionPerformanceToDriver( PartitionPerformanceMetric(nAndP, - EventHubsUtils.getTaskContextSlim, - requestSeqNo, - batchCount, - elapsedTimeMs)) + EventHubsUtils.getTaskContextSlim, + requestSeqNo, + batchCount, + elapsedTimeMs)) } if (metricPlugin.isDefined) { @@ -270,10 +272,10 @@ private[client] class CachedEventHubsReceiver private (ehConf: EventHubsConf, .getOrElse((0, 0L)) metricPlugin.foreach( _.onReceiveMetric(EventHubsUtils.getTaskContextSlim, - nAndP, - batchCount, - batchSizeInBytes, - elapsedTimeMs)) + nAndP, + batchCount, + batchSizeInBytes, + elapsedTimeMs)) assert(validateSize == batchCount) } else { assert(validate.size == batchCount) @@ -305,13 +307,21 @@ private[client] class CachedEventHubsReceiver private (ehConf: EventHubsConf, logDebug( s"(Task: ${EventHubsUtils.getTaskContextSlim}) sends PartitionPerformanceMetric: " + s"$PartitionPerformanceMetric to the driver.") - try { - CachedEventHubsReceiver.partitionPerformanceReceiverRef.send(partitionPerformance) - } catch { - case e: Exception => - logError( + CachedEventHubsReceiver.partitionPerformanceReceiverRef match { + case Some(receiverRef) => + try { + receiverRef.send(partitionPerformance) + } catch { + case e: Exception => + logError( + s"(Task: ${EventHubsUtils.getTaskContextSlim}) failed to send the RPC message containing " + + s"PartitionPerformanceMetric: $PartitionPerformanceMetric to the driver.") + } + case None => + throw new NullPointerException( s"(Task: ${EventHubsUtils.getTaskContextSlim}) failed to send the RPC message containing " + - s"PartitionPerformanceMetric: $PartitionPerformanceMetric to the driver.") + s"PartitionPerformanceMetric: $PartitionPerformanceMetric to the driver because the " + + s"RPC endpoint is not open.") } } } @@ -330,10 +340,18 @@ private[spark] object CachedEventHubsReceiver extends CachedReceiver with Loggin private[this] val receivers = new MutableMap[String, CachedEventHubsReceiver]() // RPC endpoint for partition performacne communciation in the executor - val partitionPerformanceReceiverRef = - RpcUtils.makeDriverRef(PartitionPerformanceReceiver.ENDPOINT_NAME, - SparkEnv.get.conf, - SparkEnv.get.rpcEnv) + val partitionPerformanceReceiverRef : Option[RpcEndpointRef] = + try { + Some(RpcUtils.makeDriverRef(PartitionPerformanceReceiver.ENDPOINT_NAME, + SparkEnv.get.conf, + SparkEnv.get.rpcEnv)) + } catch { + case (e: SparkException) => { + logWarning(s"RPC endpoint for partition performacne communciation has not been opened on the driver. " + + s"If you are using the SlowPartitionAdjustment feature, this will cause an error. 
" + + s"Note that SlowPartitionAdjustment feature is only being supported for streaming applications.") + None + }} private def key(ehConf: EventHubsConf, nAndP: NameAndPartition): String = { (ehConf.connectionString + ehConf.consumerGroup + nAndP.partitionId).toLowerCase From 90d0e9be0075e84cc10ba509bdd7a3eddfc6dcb8 Mon Sep 17 00:00:00 2001 From: Navid Yaghmazadeh Date: Thu, 1 Oct 2020 13:12:51 -0700 Subject: [PATCH 3/4] open rpc endpoint on driver in all cases --- .../client/CachedEventHubsReceiver.scala | 38 +++++-------------- .../spark/sql/eventhubs/EventHubsSource.scala | 8 +--- .../eventhubs/EventHubsSourceProvider.scala | 8 ++++ 3 files changed, 19 insertions(+), 35 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/eventhubs/client/CachedEventHubsReceiver.scala b/core/src/main/scala/org/apache/spark/eventhubs/client/CachedEventHubsReceiver.scala index 9c1b26370..f26588eb6 100644 --- a/core/src/main/scala/org/apache/spark/eventhubs/client/CachedEventHubsReceiver.scala +++ b/core/src/main/scala/org/apache/spark/eventhubs/client/CachedEventHubsReceiver.scala @@ -33,8 +33,6 @@ import org.apache.spark.eventhubs.{ PartitionPerformanceReceiver } import org.apache.spark.internal.Logging -import org.apache.spark.rpc.RpcEndpointRef -import org.apache.spark.SparkException import org.apache.spark.util.RpcUtils import scala.collection.JavaConverters._ @@ -307,21 +305,13 @@ private[client] class CachedEventHubsReceiver private (ehConf: EventHubsConf, logDebug( s"(Task: ${EventHubsUtils.getTaskContextSlim}) sends PartitionPerformanceMetric: " + s"$PartitionPerformanceMetric to the driver.") - CachedEventHubsReceiver.partitionPerformanceReceiverRef match { - case Some(receiverRef) => - try { - receiverRef.send(partitionPerformance) - } catch { - case e: Exception => - logError( - s"(Task: ${EventHubsUtils.getTaskContextSlim}) failed to send the RPC message containing " + - s"PartitionPerformanceMetric: $PartitionPerformanceMetric to the driver.") - } - case None => - throw new NullPointerException( + try { + CachedEventHubsReceiver.partitionPerformanceReceiverRef.send(partitionPerformance) + } catch { + case e: Exception => + logError( s"(Task: ${EventHubsUtils.getTaskContextSlim}) failed to send the RPC message containing " + - s"PartitionPerformanceMetric: $PartitionPerformanceMetric to the driver because the " + - s"RPC endpoint is not open.") + s"PartitionPerformanceMetric: $PartitionPerformanceMetric to the driver.") } } } @@ -340,18 +330,10 @@ private[spark] object CachedEventHubsReceiver extends CachedReceiver with Loggin private[this] val receivers = new MutableMap[String, CachedEventHubsReceiver]() // RPC endpoint for partition performacne communciation in the executor - val partitionPerformanceReceiverRef : Option[RpcEndpointRef] = - try { - Some(RpcUtils.makeDriverRef(PartitionPerformanceReceiver.ENDPOINT_NAME, - SparkEnv.get.conf, - SparkEnv.get.rpcEnv)) - } catch { - case (e: SparkException) => { - logWarning(s"RPC endpoint for partition performacne communciation has not been opened on the driver. " + - s"If you are using the SlowPartitionAdjustment feature, this will cause an error. 
" + - s"Note that SlowPartitionAdjustment feature is only being supported for streaming applications.") - None - }} + val partitionPerformanceReceiverRef = + RpcUtils.makeDriverRef(PartitionPerformanceReceiver.ENDPOINT_NAME, + SparkEnv.get.conf, + SparkEnv.get.rpcEnv) private def key(ehConf: EventHubsConf, nAndP: NameAndPartition): String = { (ehConf.connectionString + ehConf.consumerGroup + nAndP.partitionId).toLowerCase diff --git a/core/src/main/scala/org/apache/spark/sql/eventhubs/EventHubsSource.scala b/core/src/main/scala/org/apache/spark/sql/eventhubs/EventHubsSource.scala index 07df27d63..03f010cea 100644 --- a/core/src/main/scala/org/apache/spark/sql/eventhubs/EventHubsSource.scala +++ b/core/src/main/scala/org/apache/spark/sql/eventhubs/EventHubsSource.scala @@ -77,6 +77,7 @@ private[spark] class EventHubsSource private[eventhubs] (sqlContext: SQLContext, import EventHubsConf._ import EventHubsSource._ + import EventHubsSourceProvider._ private lazy val ehClient = EventHubsSourceProvider.clientFactory(parameters)(ehConf) private lazy val partitionCount: Int = ehClient.partitionCount @@ -438,14 +439,7 @@ private[eventhubs] object EventHubsSource { """.stripMargin private[eventhubs] val VERSION = 1 - - // RPC endpoint for partition performacne communciation in the driver private var localBatchId = -1 - val partitionsStatusTracker = PartitionsStatusTracker.getPartitionStatusTracker - val partitionPerformanceReceiver: PartitionPerformanceReceiver = - new PartitionPerformanceReceiver(SparkEnv.get.rpcEnv, partitionsStatusTracker) - val partitionPerformanceReceiverRef: RpcEndpointRef = SparkEnv.get.rpcEnv - .setupEndpoint(PartitionPerformanceReceiver.ENDPOINT_NAME, partitionPerformanceReceiver) def getSortedExecutorList(sc: SparkContext): Array[String] = { val bm = sc.env.blockManager diff --git a/core/src/main/scala/org/apache/spark/sql/eventhubs/EventHubsSourceProvider.scala b/core/src/main/scala/org/apache/spark/sql/eventhubs/EventHubsSourceProvider.scala index 230b2ee0b..d224195e7 100644 --- a/core/src/main/scala/org/apache/spark/sql/eventhubs/EventHubsSourceProvider.scala +++ b/core/src/main/scala/org/apache/spark/sql/eventhubs/EventHubsSourceProvider.scala @@ -45,6 +45,8 @@ import org.apache.spark.sql.types._ import org.apache.spark.sql.{ AnalysisException, DataFrame, SQLContext, SaveMode } import org.apache.spark.unsafe.types.UTF8String import org.json4s.jackson.Serialization +import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.SparkEnv import collection.JavaConverters._ @@ -140,6 +142,12 @@ private[sql] class EventHubsSourceProvider } private[sql] object EventHubsSourceProvider extends Serializable { + // RPC endpoint for partition performacne communciation in the driver + val partitionsStatusTracker = PartitionsStatusTracker.getPartitionStatusTracker + val partitionPerformanceReceiver: PartitionPerformanceReceiver = + new PartitionPerformanceReceiver(SparkEnv.get.rpcEnv, partitionsStatusTracker) + val partitionPerformanceReceiverRef: RpcEndpointRef = SparkEnv.get.rpcEnv + .setupEndpoint(PartitionPerformanceReceiver.ENDPOINT_NAME, partitionPerformanceReceiver) def eventHubsSchema: StructType = { StructType( From 7e6b6af6fdbc0803a6c41953136b03fb53bf8806 Mon Sep 17 00:00:00 2001 From: Navid Yaghmazadeh Date: Thu, 1 Oct 2020 14:18:27 -0700 Subject: [PATCH 4/4] fixed typos in comments, update formating --- .../client/CachedEventHubsReceiver.scala | 44 +++++++------- .../spark/sql/eventhubs/EventHubsSource.scala | 27 +++++---- 
.../eventhubs/EventHubsSourceProvider.scala | 58 ++++++++++--------- 3 files changed, 67 insertions(+), 62 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/eventhubs/client/CachedEventHubsReceiver.scala b/core/src/main/scala/org/apache/spark/eventhubs/client/CachedEventHubsReceiver.scala index f26588eb6..cf784387a 100644 --- a/core/src/main/scala/org/apache/spark/eventhubs/client/CachedEventHubsReceiver.scala +++ b/core/src/main/scala/org/apache/spark/eventhubs/client/CachedEventHubsReceiver.scala @@ -66,7 +66,7 @@ private[spark] trait CachedReceiver { private[client] class CachedEventHubsReceiver private (ehConf: EventHubsConf, nAndP: NameAndPartition, startSeqNo: SequenceNumber) - extends Logging { + extends Logging { type AwaitTimeoutException = java.util.concurrent.TimeoutException @@ -89,11 +89,11 @@ private[client] class CachedEventHubsReceiver private (ehConf: EventHubsConf, receiverOptions.setIdentifier(s"spark-${SparkEnv.get.executorId}-$taskId") val consumer = retryJava( EventHubsUtils.createReceiverInner(client, - ehConf.useExclusiveReceiver, - consumerGroup, - nAndP.partitionId.toString, - EventPosition.fromSequenceNumber(seqNo).convert, - receiverOptions), + ehConf.useExclusiveReceiver, + consumerGroup, + nAndP.partitionId.toString, + EventPosition.fromSequenceNumber(seqNo).convert, + receiverOptions), "CachedReceiver creation." ) Await.result(consumer, ehConf.internalOperationTimeout) @@ -162,7 +162,7 @@ private[client] class CachedEventHubsReceiver private (ehConf: EventHubsConf, Await.result(lastReceivedOffset(), ehConf.internalOperationTimeout) if ((lastReceivedSeqNo > -1 && lastReceivedSeqNo + 1 != requestSeqNo) || - !receiver.getIsOpen) { + !receiver.getIsOpen) { logInfo(s"(TID $taskId) checkCursor. Recreating a receiver for $nAndP, ${ehConf.consumerGroup.getOrElse( DefaultConsumerGroup)}. requestSeqNo: $requestSeqNo, lastReceivedSeqNo: $lastReceivedSeqNo, isOpen: ${receiver.getIsOpen}") @@ -193,11 +193,11 @@ private[client] class CachedEventHubsReceiver private (ehConf: EventHubsConf, // The event still isn't present. It must be (2). val info = Await.result( retryJava(client.getPartitionRuntimeInformation(nAndP.partitionId.toString), - "partitionRuntime"), + "partitionRuntime"), ehConf.internalOperationTimeout) if (requestSeqNo < info.getBeginSequenceNumber && - movedSeqNo == info.getBeginSequenceNumber) { + movedSeqNo == info.getBeginSequenceNumber) { Future { movedEvent } @@ -238,8 +238,8 @@ private[client] class CachedEventHubsReceiver private (ehConf: EventHubsConf, val theRest = for { i <- 1 until batchCount } yield awaitReceiveMessage(receiveOne(ehConf.receiverTimeout.getOrElse(DefaultReceiverTimeout), - s"receive; $nAndP; seqNo: ${requestSeqNo + i}"), - requestSeqNo) + s"receive; $nAndP; seqNo: ${requestSeqNo + i}"), + requestSeqNo) // Combine and sort the data. 
val combined = first ++ theRest.flatten val sorted = combined.toSeq @@ -254,10 +254,10 @@ private[client] class CachedEventHubsReceiver private (ehConf: EventHubsConf, if (ehConf.slowPartitionAdjustment) { sendPartitionPerformanceToDriver( PartitionPerformanceMetric(nAndP, - EventHubsUtils.getTaskContextSlim, - requestSeqNo, - batchCount, - elapsedTimeMs)) + EventHubsUtils.getTaskContextSlim, + requestSeqNo, + batchCount, + elapsedTimeMs)) } if (metricPlugin.isDefined) { @@ -270,10 +270,10 @@ private[client] class CachedEventHubsReceiver private (ehConf: EventHubsConf, .getOrElse((0, 0L)) metricPlugin.foreach( _.onReceiveMetric(EventHubsUtils.getTaskContextSlim, - nAndP, - batchCount, - batchSizeInBytes, - elapsedTimeMs)) + nAndP, + batchCount, + batchSizeInBytes, + elapsedTimeMs)) assert(validateSize == batchCount) } else { assert(validate.size == batchCount) @@ -329,11 +329,11 @@ private[spark] object CachedEventHubsReceiver extends CachedReceiver with Loggin private[this] val receivers = new MutableMap[String, CachedEventHubsReceiver]() - // RPC endpoint for partition performacne communciation in the executor + // RPC endpoint for partition performance communication in the executor val partitionPerformanceReceiverRef = RpcUtils.makeDriverRef(PartitionPerformanceReceiver.ENDPOINT_NAME, - SparkEnv.get.conf, - SparkEnv.get.rpcEnv) + SparkEnv.get.conf, + SparkEnv.get.rpcEnv) private def key(ehConf: EventHubsConf, nAndP: NameAndPartition): String = { (ehConf.connectionString + ehConf.consumerGroup + nAndP.partitionId).toLowerCase diff --git a/core/src/main/scala/org/apache/spark/sql/eventhubs/EventHubsSource.scala b/core/src/main/scala/org/apache/spark/sql/eventhubs/EventHubsSource.scala index 03f010cea..2245cace7 100644 --- a/core/src/main/scala/org/apache/spark/sql/eventhubs/EventHubsSource.scala +++ b/core/src/main/scala/org/apache/spark/sql/eventhubs/EventHubsSource.scala @@ -30,9 +30,7 @@ import org.apache.spark.eventhubs.rdd.{ EventHubsRDD, OffsetRange } import org.apache.spark.eventhubs.utils.ThrottlingStatusPlugin import org.apache.spark.eventhubs.{ EventHubsConf, NameAndPartition, _ } import org.apache.spark.internal.Logging -import org.apache.spark.rpc.RpcEndpointRef import org.apache.spark.scheduler.ExecutorCacheTaskLocation -import org.apache.spark.SparkEnv import org.apache.spark.sql.execution.streaming.{ HDFSMetadataLog, Offset, @@ -88,9 +86,11 @@ private[spark] class EventHubsSource private[eventhubs] (sqlContext: SQLContext, private val sc = sqlContext.sparkContext private val maxOffsetsPerTrigger: Option[Long] = - Option(parameters.get(MaxEventsPerTriggerKey).map(_.toLong).getOrElse( - parameters.get(MaxEventsPerTriggerKeyAlias).map(_.toLong).getOrElse( - partitionCount * 1000))) + Option(parameters + .get(MaxEventsPerTriggerKey) + .map(_.toLong) + .getOrElse( + parameters.get(MaxEventsPerTriggerKeyAlias).map(_.toLong).getOrElse(partitionCount * 1000))) // set slow partition adjustment flag and static values in the tracker private val slowPartitionAdjustment: Boolean = @@ -148,22 +148,25 @@ private[spark] class EventHubsSource private[eventhubs] (sqlContext: SQLContext, text.substring(1, text.length).toInt } catch { case _: NumberFormatException => - throw new IllegalStateException(s"Log file was malformed: failed to read correct log " + - s"version from $text.") + throw new IllegalStateException( + s"Log file was malformed: failed to read correct log " + + s"version from $text.") } if (version > 0) { if (version > maxSupportedVersion) { - throw new 
IllegalStateException(s"UnsupportedLogVersion: maximum supported log version " + - s"is v${maxSupportedVersion}, but encountered v$version. The log file was produced " + - s"by a newer version of Spark and cannot be read by this version. Please upgrade.") + throw new IllegalStateException( + s"UnsupportedLogVersion: maximum supported log version " + + s"is v${maxSupportedVersion}, but encountered v$version. The log file was produced " + + s"by a newer version of Spark and cannot be read by this version. Please upgrade.") } else { return version } } } // reaching here means we failed to read the correct log version - throw new IllegalStateException(s"Log file was malformed: failed to read correct log " + - s"version from $text.") + throw new IllegalStateException( + s"Log file was malformed: failed to read correct log " + + s"version from $text.") } } diff --git a/core/src/main/scala/org/apache/spark/sql/eventhubs/EventHubsSourceProvider.scala b/core/src/main/scala/org/apache/spark/sql/eventhubs/EventHubsSourceProvider.scala index d224195e7..5097b8f85 100644 --- a/core/src/main/scala/org/apache/spark/sql/eventhubs/EventHubsSourceProvider.scala +++ b/core/src/main/scala/org/apache/spark/sql/eventhubs/EventHubsSourceProvider.scala @@ -142,7 +142,7 @@ private[sql] class EventHubsSourceProvider } private[sql] object EventHubsSourceProvider extends Serializable { - // RPC endpoint for partition performacne communciation in the driver + // RPC endpoint for partition performance communication in the driver val partitionsStatusTracker = PartitionsStatusTracker.getPartitionStatusTracker val partitionPerformanceReceiver: PartitionPerformanceReceiver = new PartitionPerformanceReceiver(SparkEnv.get.rpcEnv, partitionsStatusTracker) @@ -177,32 +177,32 @@ private[sql] object EventHubsSourceProvider extends Serializable { new java.sql.Timestamp(ed.getSystemProperties.getEnqueuedTime.toEpochMilli)), UTF8String.fromString(ed.getSystemProperties.getPublisher), UTF8String.fromString(ed.getSystemProperties.getPartitionKey), - ArrayBasedMapData( - ed.getProperties.asScala - .mapValues { - case b: Binary => - val buf = b.asByteBuffer() - val arr = new Array[Byte](buf.remaining) - buf.get(arr) - arr.asInstanceOf[AnyRef] - case d128: Decimal128 => d128.asBytes.asInstanceOf[AnyRef] - case d32: Decimal32 => d32.getBits.asInstanceOf[AnyRef] - case d64: Decimal64 => d64.getBits.asInstanceOf[AnyRef] - case s: Symbol => s.toString.asInstanceOf[AnyRef] - case ub: UnsignedByte => ub.toString.asInstanceOf[AnyRef] - case ui: UnsignedInteger => ui.toString.asInstanceOf[AnyRef] - case ul: UnsignedLong => ul.toString.asInstanceOf[AnyRef] - case us: UnsignedShort => us.toString.asInstanceOf[AnyRef] - case c: Character => c.toString.asInstanceOf[AnyRef] - case d: DescribedType => d.getDescribed - case default => default + ArrayBasedMapData(ed.getProperties.asScala + .mapValues { + case b: Binary => + val buf = b.asByteBuffer() + val arr = new Array[Byte](buf.remaining) + buf.get(arr) + arr.asInstanceOf[AnyRef] + case d128: Decimal128 => d128.asBytes.asInstanceOf[AnyRef] + case d32: Decimal32 => d32.getBits.asInstanceOf[AnyRef] + case d64: Decimal64 => d64.getBits.asInstanceOf[AnyRef] + case s: Symbol => s.toString.asInstanceOf[AnyRef] + case ub: UnsignedByte => ub.toString.asInstanceOf[AnyRef] + case ui: UnsignedInteger => ui.toString.asInstanceOf[AnyRef] + case ul: UnsignedLong => ul.toString.asInstanceOf[AnyRef] + case us: UnsignedShort => us.toString.asInstanceOf[AnyRef] + case c: Character => c.toString.asInstanceOf[AnyRef] + 
case d: DescribedType => d.getDescribed + case default => default + } + .map { p => + p._2 match { + case s: String => UTF8String.fromString(p._1) -> UTF8String.fromString(s) + case default => + UTF8String.fromString(p._1) -> UTF8String.fromString(Serialization.write(p._2)) } - .map { p => - p._2 match { - case s: String => UTF8String.fromString(p._1) -> UTF8String.fromString(s) - case default => UTF8String.fromString(p._1) -> UTF8String.fromString(Serialization.write(p._2)) - } - }), + }), ArrayBasedMapData( // Don't duplicate offset, enqueued time, and seqNo (ed.getSystemProperties.asScala -- Seq(OffsetAnnotation, @@ -218,8 +218,10 @@ private[sql] object EventHubsSourceProvider extends Serializable { } .map { p => p._2 match { - case s: String => UTF8String.fromString(p._1) -> UTF8String.fromString(s) - case default => UTF8String.fromString(p._1) -> UTF8String.fromString(Serialization.write(p._2)) + case s: String => UTF8String.fromString(p._1) -> UTF8String.fromString(s) + case default => + UTF8String.fromString(p._1) -> UTF8String.fromString( + Serialization.write(p._2)) } }) )
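
The net effect of patches 2-4 is that the PartitionPerformanceReceiver RPC endpoint is registered on the driver from the EventHubsSourceProvider companion object (which is loaded for both streaming and batch queries), so RpcUtils.makeDriverRef on the executors no longer fails for non-streaming queries. Below is a minimal, self-contained sketch of that driver/executor RPC pattern, assuming only Spark's internal RpcEndpoint / RpcEnv / RpcUtils APIs; the PerfEndpoint / PerfMetric names and the org.apache.spark.eventhubs.sketch package are illustrative and not part of the connector.

package org.apache.spark.eventhubs.sketch

import org.apache.spark.SparkEnv
import org.apache.spark.internal.Logging
import org.apache.spark.rpc.{ RpcEndpoint, RpcEndpointRef, RpcEnv }
import org.apache.spark.util.RpcUtils

// Illustrative message type; the connector's real message is PartitionPerformanceMetric.
case class PerfMetric(partitionId: Int, batchCount: Int, elapsedMs: Long)

// Driver side: a fire-and-forget endpoint that records per-partition performance.
// Living under an org.apache.spark.* package gives access to the private[spark]
// RPC APIs, just as the connector's own classes do.
class PerfEndpoint(override val rpcEnv: RpcEnv) extends RpcEndpoint with Logging {
  override def receive: PartialFunction[Any, Unit] = {
    case m: PerfMetric =>
      logInfo(s"partition ${m.partitionId}: ${m.batchCount} events in ${m.elapsedMs} ms")
  }
}

object DriverSideSetup {
  val EndpointName = "partition-performance-sketch"

  // Registering the endpoint in a companion object mirrors patch 3: the setup runs
  // when the provider loads on the driver, for streaming and batch queries alike,
  // before any executor task tries to resolve the endpoint.
  val endpointRef: RpcEndpointRef =
    SparkEnv.get.rpcEnv.setupEndpoint(EndpointName, new PerfEndpoint(SparkEnv.get.rpcEnv))
}

object ExecutorSideSend {
  // Executor side: resolve the driver endpoint once per JVM and send metrics
  // asynchronously, as CachedEventHubsReceiver does with makeDriverRef + send.
  lazy val driverRef: RpcEndpointRef =
    RpcUtils.makeDriverRef(DriverSideSetup.EndpointName, SparkEnv.get.conf, SparkEnv.get.rpcEnv)

  def report(metric: PerfMetric): Unit = driverRef.send(metric)
}

Keeping the endpoint registration in the provider object also means the executor-side lookup never needs the Option/try-catch guard introduced in patch 2 and removed again in patch 3: the ref is guaranteed to exist before any receive task runs.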