From da4368fd65ab5264f865a3c578914e48c6e092e0 Mon Sep 17 00:00:00 2001
From: "Robert (Bobby) Evans"
Date: Fri, 5 Feb 2021 11:38:51 -0600
Subject: [PATCH] Addressed review comments

---
 .../rapids/shims/spark300/GpuShuffledHashJoinExec.scala     | 4 ++--
 .../rapids/shims/spark301db/GpuShuffledHashJoinExec.scala   | 5 +++--
 .../rapids/shims/spark311/GpuShuffledHashJoinExec.scala     | 4 ++--
 .../scala/com/nvidia/spark/rapids/GpuBatchScanExec.scala    | 2 +-
 .../src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala | 2 +-
 .../main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala | 6 +++---
 .../scala/com/nvidia/spark/rapids/HostColumnarToGpu.scala   | 6 +++---
 .../org/apache/spark/sql/rapids/GpuFileSourceScanExec.scala | 2 +-
 .../sql/rapids/execution/GpuBroadcastExchangeExec.scala     | 4 ++--
 9 files changed, 18 insertions(+), 17 deletions(-)

diff --git a/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuShuffledHashJoinExec.scala b/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuShuffledHashJoinExec.scala
index 3f23699b243..43540feef2e 100644
--- a/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuShuffledHashJoinExec.scala
+++ b/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuShuffledHashJoinExec.scala
@@ -98,8 +98,8 @@ case class GpuShuffledHashJoinExec(
   override lazy val additionalMetrics: Map[String, GpuMetric] = Map(
     BUILD_DATA_SIZE -> createSizeMetric(ESSENTIAL_LEVEL, DESCRIPTION_BUILD_DATA_SIZE),
     BUILD_TIME -> createNanoTimingMetric(ESSENTIAL_LEVEL, DESCRIPTION_BUILD_TIME),
-    STREAM_TIME -> createNanoTimingMetric(MODERATE_LEVEL, STREAM_TIME),
-    JOIN_TIME -> createNanoTimingMetric(MODERATE_LEVEL, JOIN_TIME),
+    STREAM_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_STREAM_TIME),
+    JOIN_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_JOIN_TIME),
     JOIN_OUTPUT_ROWS -> createMetric(MODERATE_LEVEL, DESCRIPTION_JOIN_OUTPUT_ROWS),
     FILTER_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_FILTER_TIME))

diff --git a/shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/GpuShuffledHashJoinExec.scala b/shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/GpuShuffledHashJoinExec.scala
index 51029db70d0..21f76b2f29f 100644
--- a/shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/GpuShuffledHashJoinExec.scala
+++ b/shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/GpuShuffledHashJoinExec.scala
@@ -81,14 +81,15 @@ case class GpuShuffledHashJoinExec(
     condition: Option[Expression],
     left: SparkPlan,
     right: SparkPlan) extends BinaryExecNode with GpuHashJoin {
+  import GpuMetric._

   override val outputRowsLevel: MetricsLevel = ESSENTIAL_LEVEL
   override val outputBatchesLevel: MetricsLevel = MODERATE_LEVEL
   override lazy val additionalMetrics: Map[String, GpuMetric] = Map(
     BUILD_DATA_SIZE -> createSizeMetric(ESSENTIAL_LEVEL, DESCRIPTION_BUILD_DATA_SIZE),
     BUILD_TIME -> createNanoTimingMetric(ESSENTIAL_LEVEL, DESCRIPTION_BUILD_TIME),
-    STREAM_TIME -> createNanoTimingMetric(MODERATE_LEVEL, STREAM_TIME),
-    JOIN_TIME -> createNanoTimingMetric(MODERATE_LEVEL, JOIN_TIME),
+    STREAM_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_STREAM_TIME),
+    JOIN_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_JOIN_TIME),
     JOIN_OUTPUT_ROWS -> createMetric(MODERATE_LEVEL, DESCRIPTION_JOIN_OUTPUT_ROWS),
     FILTER_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_FILTER_TIME))
diff --git a/shims/spark311/src/main/scala/com/nvidia/spark/rapids/shims/spark311/GpuShuffledHashJoinExec.scala b/shims/spark311/src/main/scala/com/nvidia/spark/rapids/shims/spark311/GpuShuffledHashJoinExec.scala
index 5162bd7482c..30c50804c86 100644
--- a/shims/spark311/src/main/scala/com/nvidia/spark/rapids/shims/spark311/GpuShuffledHashJoinExec.scala
+++ b/shims/spark311/src/main/scala/com/nvidia/spark/rapids/shims/spark311/GpuShuffledHashJoinExec.scala
@@ -99,8 +99,8 @@ case class GpuShuffledHashJoinExec(
   override lazy val additionalMetrics: Map[String, GpuMetric] = Map(
     BUILD_DATA_SIZE -> createSizeMetric(ESSENTIAL_LEVEL, DESCRIPTION_BUILD_DATA_SIZE),
     BUILD_TIME -> createNanoTimingMetric(ESSENTIAL_LEVEL, DESCRIPTION_BUILD_TIME),
-    STREAM_TIME -> createNanoTimingMetric(MODERATE_LEVEL, STREAM_TIME),
-    JOIN_TIME -> createNanoTimingMetric(MODERATE_LEVEL, JOIN_TIME),
+    STREAM_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_STREAM_TIME),
+    JOIN_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_JOIN_TIME),
     JOIN_OUTPUT_ROWS -> createMetric(MODERATE_LEVEL, DESCRIPTION_JOIN_OUTPUT_ROWS),
     FILTER_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_FILTER_TIME))

diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchScanExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchScanExec.scala
index 3b3acaff511..e8be113c6d8 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchScanExec.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchScanExec.scala
@@ -489,7 +489,7 @@ class CSVPartitionReader(
   override def next(): Boolean = {
     batch.foreach(_.close())
     batch = if (isExhausted) {
-      metrics("peakDevMemory").set(maxDeviceMemory)
+      metrics(PEAK_DEVICE_MEMORY).set(maxDeviceMemory)
       None
     } else {
       readBatch()
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala
index 20107c11235..ef24236b860 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala
@@ -414,7 +414,7 @@ class GpuOrcPartitionReader(
       if (ctx.blockIterator.hasNext) {
         batch = readBatch()
       } else {
-        metrics("peakDevMemory") += maxDeviceMemory
+        metrics(PEAK_DEVICE_MEMORY) += maxDeviceMemory
       }
       // This is odd, but some operators return data even when there is no input so we need to
       // be sure that we grab the GPU
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala
index f1cefeecda9..bd65d862d16 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala
@@ -921,7 +921,7 @@ class MultiFileParquetPartitionReader(
     if (!isDone) {
       if (!blockIterator.hasNext) {
         isDone = true
-        metrics("peakDevMemory") += maxDeviceMemory
+        metrics(PEAK_DEVICE_MEMORY) += maxDeviceMemory
       } else {
         batch = readBatch()
       }
@@ -1483,7 +1483,7 @@ class MultiFileCloudParquetPartitionReader(
         }
       } else {
         isDone = true
-        metrics("peakDevMemory") += maxDeviceMemory
+        metrics(PEAK_DEVICE_MEMORY) += maxDeviceMemory
       }
     }
   }
@@ -1637,7 +1637,7 @@ class ParquetPartitionReader(
     if (!isDone) {
       if (!blockIterator.hasNext) {
         isDone = true
-        metrics("peakDevMemory") += maxDeviceMemory
+        metrics(PEAK_DEVICE_MEMORY) += maxDeviceMemory
       } else {
         batch = readBatch()
       }
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/HostColumnarToGpu.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/HostColumnarToGpu.scala
index 0b7f6a3af8f..eaff871af99 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/HostColumnarToGpu.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/HostColumnarToGpu.scala
@@ -388,10 +388,10 @@ case class HostColumnarToGpu(child: SparkPlan, goal: CoalesceGoal)
     val numInputBatches = gpuLongMetric(NUM_INPUT_BATCHES)
     val numOutputRows = gpuLongMetric(NUM_OUTPUT_ROWS)
     val numOutputBatches = gpuLongMetric(NUM_OUTPUT_BATCHES)
-    val collectTime = gpuLongMetric("collectTime")
-    val concatTime = gpuLongMetric("concatTime")
+    val collectTime = gpuLongMetric(COLLECT_TIME)
+    val concatTime = gpuLongMetric(CONCAT_TIME)
     val totalTime = gpuLongMetric(TOTAL_TIME)
-    val peakDevMemory = gpuLongMetric("peakDevMemory")
+    val peakDevMemory = gpuLongMetric(PEAK_DEVICE_MEMORY)

     // cache in a local to avoid serializing the plan
     val outputSchema = schema
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileSourceScanExec.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileSourceScanExec.scala
index 5f37855bf2d..f822993ef96 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileSourceScanExec.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileSourceScanExec.scala
@@ -374,7 +374,7 @@ case class GpuFileSourceScanExec(
     throw new IllegalStateException(s"Row-based execution should not occur for $this")

   override protected def doExecuteColumnar(): RDD[ColumnarBatch] = {
-    val numOutputRows = gpuLongMetric("numOutputRows")
+    val numOutputRows = gpuLongMetric(NUM_OUTPUT_ROWS)
     val scanTime = gpuLongMetric("scanTime")
     inputRDD.asInstanceOf[RDD[ColumnarBatch]].mapPartitionsInternal { batches =>
       new Iterator[ColumnarBatch] {
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastExchangeExec.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastExchangeExec.scala
index 48337d1e8c0..b72b84296c9 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastExchangeExec.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastExchangeExec.scala
@@ -244,7 +244,7 @@ abstract class GpuBroadcastExchangeExecBase(
   override lazy val additionalMetrics = Map(
     "dataSize" -> createSizeMetric(ESSENTIAL_LEVEL, "data size"),
     COLLECT_TIME -> createNanoTimingMetric(ESSENTIAL_LEVEL, DESCRIPTION_COLLECT_TIME),
-    "buildTime" -> createNanoTimingMetric(ESSENTIAL_LEVEL, "time to build"),
+    BUILD_TIME -> createNanoTimingMetric(ESSENTIAL_LEVEL, DESCRIPTION_BUILD_TIME),
     "broadcastTime" -> createNanoTimingMetric(ESSENTIAL_LEVEL, "time to broadcast"))

   override def outputPartitioning: Partitioning = BroadcastPartitioning(mode)
@@ -275,7 +275,7 @@ abstract class GpuBroadcastExchangeExecBase(
     val numOutputRows = gpuLongMetric(NUM_OUTPUT_ROWS)
     val totalTime = gpuLongMetric(TOTAL_TIME)
     val collectTime = gpuLongMetric(COLLECT_TIME)
-    val buildTime = gpuLongMetric("buildTime")
+    val buildTime = gpuLongMetric(BUILD_TIME)
     val broadcastTime = gpuLongMetric("broadcastTime")

     val task = new Callable[Broadcast[Any]]() {
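
A note on the pattern applied throughout this patch: each hunk swaps a bare
string key ("peakDevMemory", "buildTime", "numOutputRows", ...) for the
matching constant on GpuMetric, and fixes the spots where a key constant
(STREAM_TIME, JOIN_TIME) was passed where its DESCRIPTION_* counterpart
belongs. Below is a minimal, self-contained Scala sketch of why the paired
constants help; MetricNames and its values are illustrative stand-ins, not
the real GpuMetric definitions.

  // Hypothetical stand-in for GpuMetric's companion-object constants. The key
  // is the stable identifier used to look a metric up; the DESCRIPTION_*
  // string is what the Spark UI displays next to it.
  object MetricNames {
    val PEAK_DEVICE_MEMORY: String = "peakDevMemory"
    val STREAM_TIME: String = "streamTime"
    val DESCRIPTION_PEAK_DEVICE_MEMORY: String = "peak device memory"
    val DESCRIPTION_STREAM_TIME: String = "stream time"
  }

  // With constants, a typo in the identifier (e.g. PEAK_DEVICE_MEMRY) fails to
  // compile, whereas a typo in a string literal ("peakDevMemry") compiles and
  // silently looks up the wrong metric at runtime. Usage mirroring the hunks:
  //   before: metrics("peakDevMemory") += maxDeviceMemory
  //   after:  metrics(MetricNames.PEAK_DEVICE_MEMORY) += maxDeviceMemory
  //   before: createNanoTimingMetric(MODERATE_LEVEL, STREAM_TIME)
  //   after:  createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_STREAM_TIME)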