Some metrics improvements and timeline reporting #4451

Merged (10 commits, Jan 5, 2022)
docs/spark-profiling-tool.md (29 changes: 23 additions & 6 deletions)

@@ -321,22 +321,39 @@ be a part of multiple stages and only one of the stages will be selected. `Excha
left out of the sections associated with a stage because they cover at least 2 stages and possibly
more. In other cases we may not be able to determine what stage something was a part of. In those
cases we mark it as `UNKNOWN STAGE`. This is because we rely on metrics to link a node to a stage.
-If a stage hs no metrics, like if the query crashed early, we cannot establish that link.
+If a stage has no metrics, like if the query crashed early, we cannot establish that link.

- Generate timeline for application (--generate-timeline option):

The output of this is an [svg](https://en.wikipedia.org/wiki/Scalable_Vector_Graphics) file
named `timeline.svg`. Most web browsers can display this file. It is a
-timeline view similar Apache Spark's
+timeline view similar to Apache Spark's
[event timeline](https://spark.apache.org/docs/latest/web-ui.html).

This displays several data sections.

-1. **Tasks** This shows all tasks in the application divided by executor. Please note that this
+1. **Tasks** This shows all tasks in the application divided by executor. Please note that this
tries to pack the tasks in the graph. It does not represent actual scheduling on CPU cores.
-The tasks are labeled with the time it took for them to run, but there is no breakdown about
-different aspects of each task, like there is in Spark's timeline.
-2. **STAGES** This shows the stages times reported by Spark. It starts with when the stage was
+The tasks are labeled with the time it took for them to run. There is a breakdown of some metrics
+per task in the lower half of the task block, with different colors used to designate different
+metrics.
+1. Yellow is the deserialization time for the task as reported by Spark. This works for both CPU
+and GPU tasks.
+2. White is the read time for a task. This is a combination of the "buffer time" GPU SQL metric
+and the shuffle read time as reported by Spark. The shuffle time works for both CPU and GPU
+tasks, but "buffer time" is only reported for GPU-accelerated file reads.
+3. Red is the semaphore wait time. This is the amount of time a task spent waiting to get access
+to the GPU. This only shows up on GPU tasks when DEBUG metrics are enabled. It does not apply to
+CPU tasks, as they don't go through the Semaphore.
+4. Green is the "op time" SQL metric along with a few other metrics that also indicate the amount
+of time the GPU was being used to process data. This is GPU specific.
+5. Blue is the write time for a task. This is the "write time" SQL metric used when writing out
+results as files using GPU acceleration, or it is the shuffle write time as reported by Spark.
+The shuffle metrics work for both CPU and GPU tasks, but the "write time" metric is GPU specific.
+6. Anything else is time that is not accounted for by these metrics. Typically, this is time
+spent on the CPU, but it could also include semaphore wait time, as DEBUG metrics are not on by
+default (see the sketch after this section).
+2. **STAGES** This shows the stage times reported by Spark. It starts with when the stage was
scheduled and ends when Spark considered the stage done.
3. **STAGE RANGES** This shows the time from the start of the first task to the end of the last
task. Often a stage is scheduled, but there are not enough resources in the cluster to run it.
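To make the color accounting in the list above concrete, here is a minimal sketch of how a
renderer could derive the per-task segments, with the leftover bucket computed as the remainder
of the task duration. This is illustrative only; none of these names come from the PR.

```scala
// Hypothetical illustration of the per-task color accounting described above;
// none of these names come from the PR. All durations are in nanoseconds.
case class TaskSegments(
    deserTime: Long,   // yellow: task deserialization time
    readTime: Long,    // white: "buffer time" plus shuffle read time
    semWaitTime: Long, // red: GPU semaphore wait (DEBUG metrics only)
    opTime: Long,      // green: "op time" and related GPU processing metrics
    writeTime: Long)   // blue: "write time" plus shuffle write time

// Time not covered by any metric above; typically CPU time, but it can hide
// semaphore wait when DEBUG metrics are disabled.
def otherTime(taskDuration: Long, s: TaskSegments): Long =
  math.max(0L,
    taskDuration - (s.deserTime + s.readTime + s.semWaitTime + s.opTime + s.writeTime))
```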
docs/tuning-guide.md (52 changes: 27 additions & 25 deletions)

@@ -306,32 +306,34 @@ performance.

Custom Spark SQL Metrics are available which can help identify performance bottlenecks in a query.

-| Key | Name | Description |
-|------------------|--------------------------|---------------------------------------------------|
-| bufferTime | buffer time | Time spent buffering input from file data sources. This buffering time happens on the CPU, typically with no GPU semaphore held.|
-| buildDataSize | build side size | Size in bytes of the build-side of a join. |
-| buildTime | build time | Time to load the build-side of a join. |
-| collectTime | collect time | For a broadcast the amount of time it took to collect the broadcast data back to the driver before broadcasting it back out.|
-| computeAggTime | aggregation time | Time computing an aggregation. |
-| concatTime | concat batch time | Time to concatenate batches. |
-| filterTime | filter time | Time spent applying filters within other operators, such as joins. |
-| gpuDecodeTime | GPU decode time | Time spent on GPU decoding encrypted or compressed data. |
-| joinOutputRows | join output rows | The number of rows produced by a join before any filter expression is applied. |
-| joinTime | join time | Time doing a join operation. |
-| numInputBatches | input columnar batches | Number of columnar batches that the operator received from its child operator(s). |
-| numInputRows | input rows | Number of rows that the operator received from its child operator(s). |
-| numOutputBatches | output columnar batches | Number of columnar batches that the operator outputs. |
-| numOutputRows | output rows | Number of rows that the operator outputs. |
-| numPartitions | partitions | Number of output partitions from a file scan or shuffle exchange. |
+| Key | Name | Description |
+|------------------|------------------------------|---------------------------------------------------|
+| bufferTime | buffer time | Time spent buffering input from file data sources. This buffering time happens on the CPU, typically with no GPU semaphore held. |
+| readFsTime | time to read fs data | Time spent actually reading the data and writing it to on-heap memory. This is a part of `bufferTime`. |
+| writeBufferTime | time to write data to buffer | Time spent moving the on-heap buffered data read from the file system to off-heap memory so the GPU can access it. This is a part of `bufferTime`. |
+| buildDataSize | build side size | Size in bytes of the build-side of a join. |
+| buildTime | build time | Time to load the build-side of a join. |
+| collectTime | collect time | For a broadcast the amount of time it took to collect the broadcast data back to the driver before broadcasting it back out. |
+| computeAggTime | aggregation time | Time computing an aggregation. |
+| concatTime | concat batch time | Time to concatenate batches. |
+| filterTime | filter time | Time spent applying filters within other operators, such as joins. |
+| gpuDecodeTime | GPU decode time | Time spent on GPU decoding encrypted or compressed data. |
+| joinOutputRows | join output rows | The number of rows produced by a join before any filter expression is applied. |
+| joinTime | join time | Time doing a join operation. |
+| numInputBatches | input columnar batches | Number of columnar batches that the operator received from its child operator(s). |
+| numInputRows | input rows | Number of rows that the operator received from its child operator(s). |
+| numOutputBatches | output columnar batches | Number of columnar batches that the operator outputs. |
+| numOutputRows | output rows | Number of rows that the operator outputs. |
+| numPartitions | partitions | Number of output partitions from a file scan or shuffle exchange. |
| opTime | op time | Time that an operator takes, exclusive of the time for executing or fetching results from child operators, and typically outside of the time it takes to acquire the GPU semaphore. |
-| partitionSize | partition data size | Total size in bytes of output partitions. |
-| peakDevMemory | peak device memory | Peak GPU memory used during execution of an operator. |
-| semaphoreWaitTime| GPU semaphore wait time | Time spent waiting for the GPU semaphore. |
-| sortTime | sort time | Time spent in sort operations in GpuSortExec and GpuTopN. |
-| spillData | bytes spilled from GPU | Total bytes spilled from GPU. |
-| spillDisk | bytes spilled to disk | Total bytes spilled from GPU to disk. |
-| spillHost | bytes spilled to host | Total bytes spilled from GPU to host memory. |
-| streamTime | stream time | Time spent reading data from a child. This generally happens for the stream side of a hash join or for columnar to row and row to columnar operations. |
+| partitionSize | partition data size | Total size in bytes of output partitions. |
+| peakDevMemory | peak device memory | Peak GPU memory used during execution of an operator. |
+| semaphoreWaitTime| GPU semaphore wait time | Time spent waiting for the GPU semaphore. |
+| sortTime | sort time | Time spent in sort operations in GpuSortExec and GpuTopN. |
+| spillData | bytes spilled from GPU | Total bytes spilled from GPU. |
+| spillDisk | bytes spilled to disk | Total bytes spilled from GPU to disk. |
+| spillHost | bytes spilled to host | Total bytes spilled from GPU to host memory. |
+| streamTime | stream time | Time spent reading data from a child. This generally happens for the stream side of a hash join or for columnar to row and row to columnar operations. |

Not all metrics are enabled by default. The configuration setting `spark.rapids.sql.metrics.level` can be set
to `DEBUG`, `MODERATE`, or `ESSENTIAL`, with `MODERATE` being the default value. More information about this
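As a usage note, the metrics level is an ordinary Spark configuration, so it can be set when the
session is built. A minimal sketch, assuming a standard SparkSession setup (only the
`spark.rapids.sql.metrics.level` key comes from the docs above; the app name is hypothetical):

```scala
import org.apache.spark.sql.SparkSession

// Build a session with DEBUG-level RAPIDS metrics so metrics such as
// semaphoreWaitTime show up in the SQL UI and event logs.
val spark = SparkSession.builder()
  .appName("rapids-metrics-example") // hypothetical app name
  .config("spark.rapids.sql.metrics.level", "DEBUG")
  .getOrCreate()
```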
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2021, NVIDIA CORPORATION.
+ * Copyright (c) 2021-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -36,7 +36,7 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
abstract class AbstractGpuJoinIterator(
    gatherNvtxName: String,
    targetSize: Long,
-    opTime: GpuMetric,
+    val opTime: GpuMetric,
jlowe marked this conversation as resolved.
    joinTime: GpuMetric) extends Iterator[ColumnarBatch] with Arm with AutoCloseable {
  private[this] var nextCb: Option[ColumnarBatch] = None
  private[this] var gathererStore: Option[JoinGatherer] = None
@@ -48,15 +48,10 @@ abstract class AbstractGpuJoinIterator(
  /** Returns whether there are any more batches on the stream side of the join */
  protected def hasNextStreamBatch: Boolean

-  private def timedHasNextStreamBatch: (Boolean, Long) = {
-    val start = System.nanoTime()
-    val ret = hasNextStreamBatch
-    (ret, System.nanoTime() - start)
-  }
-
  /**
   * Called to setup the next join gatherer instance when the previous instance is done or
-   * there is no previous instance.
+   * there is no previous instance. Because this is likely to call next or hasNext on the
+   * stream side, all implementations must track their own opTime metrics.
   * @param startNanoTime system nanoseconds timestamp at the top of the iterator loop, useful for
   *                      calculating the time spent producing the next stream batch
   * @return some gatherer to use next or None if there is no next gatherer or the loop should try
@@ -68,32 +68,32 @@
    if (closed) {
      return false
    }
-    val startTime = System.nanoTime()
-    var totalHasNextTime = 0L
    var mayContinue = true
    while (nextCb.isEmpty && mayContinue) {
      if (gathererStore.exists(!_.isDone)) {
-        nextCb = nextCbFromGatherer()
+        opTime.ns {
+          nextCb = nextCbFromGatherer()
+        }
      } else {
-        // The only part we want to skip in the timing is hasNextStreamBatch
-        val (hasNextValue, hasNextTime) = timedHasNextStreamBatch
-        totalHasNextTime += hasNextTime
-        if (hasNextValue) {
+        if (hasNextStreamBatch) {
          // Need to refill the gatherer
-          gathererStore.foreach(_.close())
-          gathererStore = None
+          opTime.ns {
+            gathererStore.foreach(_.close())
+            gathererStore = None
+          }
          gathererStore = setupNextGatherer()
-          nextCb = nextCbFromGatherer()
+          opTime.ns {
+            nextCb = nextCbFromGatherer()
+          }
        } else {
          mayContinue = false
        }
      }
    }
    if (nextCb.isEmpty) {
      // Nothing is left to return so close ASAP.
-      close()
+      opTime.ns(close())
    }
-    opTime += (System.nanoTime() - startTime) - totalHasNextTime
    nextCb.isDefined
  }
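The restructuring above replaces manual System.nanoTime() bookkeeping with opTime.ns blocks. As a
rough sketch of what that pattern does (a simplification; the real GpuMetric in spark-rapids wraps
Spark's SQL metrics and respects metric levels):

```scala
// Simplified stand-in for GpuMetric to show what wrapping code in ns does:
// run the block and add its elapsed wall-clock nanoseconds to the metric.
class SimpleGpuMetric(private var value: Long = 0L) {
  def ns[T](f: => T): T = {
    val start = System.nanoTime()
    try {
      f
    } finally {
      value += System.nanoTime() - start
    }
  }
  def +=(v: Long): Unit = value += v
  def get: Long = value
}

// Example: time only the expensive block, not the surrounding loop.
val opTime = new SimpleGpuMetric()
val total = opTime.ns((1 to 1000000).map(_.toLong).sum)
```

Timing each block separately, rather than timing the whole loop and subtracting the
hasNextStreamBatch portion, is what lets implementations of setupNextGatherer own their opTime
accounting, per the updated doc comment above.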

@@ -187,40 +182,48 @@ abstract class SplittableJoinIterator(
    isInitialJoin = false
    if (pendingSplits.nonEmpty || stream.hasNext) {
      val cb = if (pendingSplits.nonEmpty) {
-        withResource(pendingSplits.dequeue()) {
-          _.getColumnarBatch()
+        opTime.ns {
+          withResource(pendingSplits.dequeue()) {
+            _.getColumnarBatch()
+          }
        }
      } else {
        val batch = withResource(stream.next()) { lazyBatch =>
-          lazyBatch.releaseBatch()
+          opTime.ns {
+            lazyBatch.releaseBatch()
+          }
        }
        batch
      }
-      withResource(cb) { cb =>
-        val numJoinRows = computeNumJoinRows(cb)
+      opTime.ns {
+        withResource(cb) { cb =>
+          val numJoinRows = computeNumJoinRows(cb)

-        // We want the gather maps size to be around the target size. There are two gather maps
-        // that are made up of ints, so compute how many rows on the stream side will produce the
-        // desired gather maps size.
-        val maxJoinRows = Math.max(1, targetSize / (2 * Integer.BYTES))
-        if (numJoinRows > maxJoinRows && cb.numRows() > 1) {
-          // Need to split the batch to reduce the gather maps size. This takes a simplistic
-          // approach of assuming the data is uniformly distributed in the stream table.
-          val numSplits = Math.min(cb.numRows(),
-            Math.ceil(numJoinRows.toDouble / maxJoinRows).toInt)
-          splitAndSave(cb, numSplits)
+          // We want the gather maps size to be around the target size. There are two gather maps
+          // that are made up of ints, so compute how many rows on the stream side will produce the
+          // desired gather maps size.
+          val maxJoinRows = Math.max(1, targetSize / (2 * Integer.BYTES))
+          if (numJoinRows > maxJoinRows && cb.numRows() > 1) {
+            // Need to split the batch to reduce the gather maps size. This takes a simplistic
+            // approach of assuming the data is uniformly distributed in the stream table.
+            val numSplits = Math.min(cb.numRows(),
+              Math.ceil(numJoinRows.toDouble / maxJoinRows).toInt)
+            splitAndSave(cb, numSplits)

-          // Return no gatherer so the outer loop will try again
-          return None
-        }
+            // Return no gatherer so the outer loop will try again
+            return None
+          }

-        createGatherer(cb, Some(numJoinRows))
+          createGatherer(cb, Some(numJoinRows))
+        }
      }
    } else {
-      assert(wasInitialJoin)
-      import scala.collection.JavaConverters._
-      withResource(GpuColumnVector.emptyBatch(streamAttributes.asJava)) { cb =>
-        createGatherer(cb, None)
+      opTime.ns {
+        assert(wasInitialJoin)
+        import scala.collection.JavaConverters._
+        withResource(GpuColumnVector.emptyBatch(streamAttributes.asJava)) { cb =>
+          createGatherer(cb, None)
+        }
      }
    }
  }
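For intuition on the gather-map sizing that the hunk above moves under opTime.ns, here is a small
worked example with made-up numbers (only the `targetSize / (2 * Integer.BYTES)` budget and the
ceiling logic come from the code; every value below is illustrative):

```scala
// Worked example of the split sizing above; all values are illustrative.
val targetSize = 1L << 30                  // 1 GiB budget for the gather maps
// Two gather maps of 4-byte ints per join row, so 8 bytes of map per row.
val maxJoinRows = Math.max(1, targetSize / (2 * Integer.BYTES)) // 134217728
val numJoinRows = 500000000L               // estimated output rows of the join
val streamRows = 1000000L                  // rows in the stream-side batch
// ceil(5e8 / 1.34e8) = 4, so the stream batch would be split four ways.
val numSplits = Math.min(streamRows,
  Math.ceil(numJoinRows.toDouble / maxJoinRows).toLong)
```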
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2019-2020, NVIDIA CORPORATION.
+ * Copyright (c) 2019-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -102,7 +102,7 @@ abstract class ColumnarOutputWriter(path: String, context: TaskAttemptContext,
}
jlowe marked this conversation as resolved.

    // Update statistics
-    val writeTime = System.nanoTime - writeStartTimestamp
+    val writeTime = System.nanoTime - writeStartTimestamp - gpuTime
    statsTrackers.foreach {
      case gpuTracker: GpuWriteTaskStatsTracker =>
        gpuTracker.addWriteTime(writeTime)
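The last hunk subtracts gpuTime from the elapsed write time, presumably so the write-time
statistic reports only the non-GPU portion and does not double-count time already attributed to
GPU metrics. A minimal sketch of that accounting (the helper function is hypothetical; only
writeStartTimestamp, gpuTime, and writeTime appear in the diff):

```scala
// Hypothetical write that returns the nanoseconds it spent doing GPU work.
def writeBatchReturningGpuNanos(): Long = 42L

val writeStartTimestamp = System.nanoTime
val gpuTime = writeBatchReturningGpuNanos()
// Elapsed time minus GPU time leaves the CPU-side write cost that the
// GpuWriteTaskStatsTracker records via addWriteTime.
val writeTime = System.nanoTime - writeStartTimestamp - gpuTime
```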