Skip to content

Commit

Permalink
Addressed review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
revans2 committed Feb 5, 2021
1 parent fc4a82e commit da4368f
Show file tree
Hide file tree
Showing 9 changed files with 18 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,8 @@ case class GpuShuffledHashJoinExec(
override lazy val additionalMetrics: Map[String, GpuMetric] = Map(
BUILD_DATA_SIZE -> createSizeMetric(ESSENTIAL_LEVEL, DESCRIPTION_BUILD_DATA_SIZE),
BUILD_TIME -> createNanoTimingMetric(ESSENTIAL_LEVEL, DESCRIPTION_BUILD_TIME),
STREAM_TIME -> createNanoTimingMetric(MODERATE_LEVEL, STREAM_TIME),
JOIN_TIME -> createNanoTimingMetric(MODERATE_LEVEL, JOIN_TIME),
STREAM_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_STREAM_TIME),
JOIN_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_JOIN_TIME),
JOIN_OUTPUT_ROWS -> createMetric(MODERATE_LEVEL, DESCRIPTION_JOIN_OUTPUT_ROWS),
FILTER_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_FILTER_TIME))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,14 +81,15 @@ case class GpuShuffledHashJoinExec(
condition: Option[Expression],
left: SparkPlan,
right: SparkPlan) extends BinaryExecNode with GpuHashJoin {
import GpuMetric._

override val outputRowsLevel: MetricsLevel = ESSENTIAL_LEVEL
override val outputBatchesLevel: MetricsLevel = MODERATE_LEVEL
override lazy val additionalMetrics: Map[String, GpuMetric] = Map(
BUILD_DATA_SIZE -> createSizeMetric(ESSENTIAL_LEVEL, DESCRIPTION_BUILD_DATA_SIZE),
BUILD_TIME -> createNanoTimingMetric(ESSENTIAL_LEVEL, DESCRIPTION_BUILD_TIME),
STREAM_TIME -> createNanoTimingMetric(MODERATE_LEVEL, STREAM_TIME),
JOIN_TIME -> createNanoTimingMetric(MODERATE_LEVEL, JOIN_TIME),
STREAM_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_STREAM_TIME),
JOIN_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_JOIN_TIME),
JOIN_OUTPUT_ROWS -> createMetric(MODERATE_LEVEL, DESCRIPTION_JOIN_OUTPUT_ROWS),
FILTER_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_FILTER_TIME))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,8 @@ case class GpuShuffledHashJoinExec(
override lazy val additionalMetrics: Map[String, GpuMetric] = Map(
BUILD_DATA_SIZE -> createSizeMetric(ESSENTIAL_LEVEL, DESCRIPTION_BUILD_DATA_SIZE),
BUILD_TIME -> createNanoTimingMetric(ESSENTIAL_LEVEL, DESCRIPTION_BUILD_TIME),
STREAM_TIME -> createNanoTimingMetric(MODERATE_LEVEL, STREAM_TIME),
JOIN_TIME -> createNanoTimingMetric(MODERATE_LEVEL, JOIN_TIME),
STREAM_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_STREAM_TIME),
JOIN_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_JOIN_TIME),
JOIN_OUTPUT_ROWS -> createMetric(MODERATE_LEVEL, DESCRIPTION_JOIN_OUTPUT_ROWS),
FILTER_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_FILTER_TIME))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -489,7 +489,7 @@ class CSVPartitionReader(
override def next(): Boolean = {
batch.foreach(_.close())
batch = if (isExhausted) {
metrics("peakDevMemory").set(maxDeviceMemory)
metrics(PEAK_DEVICE_MEMORY).set(maxDeviceMemory)
None
} else {
readBatch()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ class GpuOrcPartitionReader(
if (ctx.blockIterator.hasNext) {
batch = readBatch()
} else {
metrics("peakDevMemory") += maxDeviceMemory
metrics(PEAK_DEVICE_MEMORY) += maxDeviceMemory
}
// This is odd, but some operators return data even when there is no input so we need to
// be sure that we grab the GPU
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -921,7 +921,7 @@ class MultiFileParquetPartitionReader(
if (!isDone) {
if (!blockIterator.hasNext) {
isDone = true
metrics("peakDevMemory") += maxDeviceMemory
metrics(PEAK_DEVICE_MEMORY) += maxDeviceMemory
} else {
batch = readBatch()
}
Expand Down Expand Up @@ -1483,7 +1483,7 @@ class MultiFileCloudParquetPartitionReader(
}
} else {
isDone = true
metrics("peakDevMemory") += maxDeviceMemory
metrics(PEAK_DEVICE_MEMORY) += maxDeviceMemory
}
}
}
Expand Down Expand Up @@ -1637,7 +1637,7 @@ class ParquetPartitionReader(
if (!isDone) {
if (!blockIterator.hasNext) {
isDone = true
metrics("peakDevMemory") += maxDeviceMemory
metrics(PEAK_DEVICE_MEMORY) += maxDeviceMemory
} else {
batch = readBatch()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -388,10 +388,10 @@ case class HostColumnarToGpu(child: SparkPlan, goal: CoalesceGoal)
val numInputBatches = gpuLongMetric(NUM_INPUT_BATCHES)
val numOutputRows = gpuLongMetric(NUM_OUTPUT_ROWS)
val numOutputBatches = gpuLongMetric(NUM_OUTPUT_BATCHES)
val collectTime = gpuLongMetric("collectTime")
val concatTime = gpuLongMetric("concatTime")
val collectTime = gpuLongMetric(COLLECT_TIME)
val concatTime = gpuLongMetric(CONCAT_TIME)
val totalTime = gpuLongMetric(TOTAL_TIME)
val peakDevMemory = gpuLongMetric("peakDevMemory")
val peakDevMemory = gpuLongMetric(PEAK_DEVICE_MEMORY)

// cache in a local to avoid serializing the plan
val outputSchema = schema
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ case class GpuFileSourceScanExec(
throw new IllegalStateException(s"Row-based execution should not occur for $this")

override protected def doExecuteColumnar(): RDD[ColumnarBatch] = {
val numOutputRows = gpuLongMetric("numOutputRows")
val numOutputRows = gpuLongMetric(NUM_OUTPUT_ROWS)
val scanTime = gpuLongMetric("scanTime")
inputRDD.asInstanceOf[RDD[ColumnarBatch]].mapPartitionsInternal { batches =>
new Iterator[ColumnarBatch] {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ abstract class GpuBroadcastExchangeExecBase(
override lazy val additionalMetrics = Map(
"dataSize" -> createSizeMetric(ESSENTIAL_LEVEL, "data size"),
COLLECT_TIME -> createNanoTimingMetric(ESSENTIAL_LEVEL, DESCRIPTION_COLLECT_TIME),
"buildTime" -> createNanoTimingMetric(ESSENTIAL_LEVEL, "time to build"),
BUILD_TIME -> createNanoTimingMetric(ESSENTIAL_LEVEL, DESCRIPTION_BUILD_TIME),
"broadcastTime" -> createNanoTimingMetric(ESSENTIAL_LEVEL, "time to broadcast"))

override def outputPartitioning: Partitioning = BroadcastPartitioning(mode)
Expand Down Expand Up @@ -275,7 +275,7 @@ abstract class GpuBroadcastExchangeExecBase(
val numOutputRows = gpuLongMetric(NUM_OUTPUT_ROWS)
val totalTime = gpuLongMetric(TOTAL_TIME)
val collectTime = gpuLongMetric(COLLECT_TIME)
val buildTime = gpuLongMetric("buildTime")
val buildTime = gpuLongMetric(BUILD_TIME)
val broadcastTime = gpuLongMetric("broadcastTime")

val task = new Callable[Broadcast[Any]]() {
Expand Down

0 comments on commit da4368f

Please sign in to comment.