Spill metrics everywhere (NVIDIA#1788)
Signed-off-by: Robert (Bobby) Evans <bobby@apache.org>
revans2 authored Feb 22, 2021
1 parent 49d867b commit db568d5
Showing 6 changed files with 46 additions and 159 deletions.
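The pattern is the same in every operator this commit touches: the node appends the shared spill metrics to its metric map (the ") ++ spillMetrics" lines below), builds a RapidsBuffer.SpillCallback from those metrics once per doExecuteColumnar with GpuMetric.makeSpillCallback(allMetrics), and threads that callback into every SpillableColumnarBatch it creates. The following is a minimal, self-contained sketch of that shape, assuming a three-argument callback of (from tier, to tier, bytes); the callback signature and the metric names are illustrative guesses, not the plugin's real definitions.

import java.util.concurrent.atomic.AtomicLong

// Sketch only: stand-ins for GpuMetric, spillMetrics and makeSpillCallback.
object SpillMetricsSketch {
  // Assumed shape: the spill framework reports (fromTier, toTier, bytes)
  // whenever a buffer moves to a slower tier.
  type SpillCallback = (String, String, Long) => Unit

  final class GpuMetric(val name: String) {
    private val v = new AtomicLong(0L)
    def +=(n: Long): Unit = { v.addAndGet(n); () }
    def value: Long = v.get()
  }

  // Analogue of the spillMetrics each exec node now appends to its map.
  def spillMetrics: Map[String, GpuMetric] = Map(
    "spillBytesToHost" -> new GpuMetric("bytes spilled from GPU to host"),
    "spillBytesToDisk" -> new GpuMetric("bytes spilled from host to disk"))

  // Analogue of GpuMetric.makeSpillCallback: capture only the metrics, not
  // the plan node, so the closure is cheap to ship inside mapPartitions.
  def makeSpillCallback(allMetrics: Map[String, GpuMetric]): SpillCallback = {
    val toHost = allMetrics("spillBytesToHost")
    val toDisk = allMetrics("spillBytesToDisk")
    (_, to, bytes) => to match {
      case "HOST" => toHost += bytes
      case "DISK" => toDisk += bytes
      case _ => () // device-to-device moves are not spills
    }
  }
}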
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
-import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructType}
+import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.spark.sql.vectorized.ColumnarBatch

/**
@@ -337,125 +337,6 @@ abstract class AbstractGpuCoalesceIterator(
}
}

-// Remove this iterator when contiguous_split supports nested types
-class GpuCoalesceIteratorNoSpill(iter: Iterator[ColumnarBatch],
-schema: StructType,
-goal: CoalesceGoal,
-maxDecompressBatchMemory: Long,
-numInputRows: GpuMetric,
-numInputBatches: GpuMetric,
-numOutputRows: GpuMetric,
-numOutputBatches: GpuMetric,
-collectTime: GpuMetric,
-concatTime: GpuMetric,
-totalTime: GpuMetric,
-peakDevMemory: GpuMetric,
-opName: String)
-extends AbstractGpuCoalesceIterator(iter,
-goal,
-numInputRows,
-numInputBatches,
-numOutputRows,
-numOutputBatches,
-collectTime,
-concatTime,
-totalTime,
-opName) with Arm {
-
-private val sparkTypes: Array[DataType] = GpuColumnVector.extractTypes(schema)
-private var batches: ArrayBuffer[ColumnarBatch] = ArrayBuffer.empty
-private var maxDeviceMemory: Long = 0
-
-// batch indices that are compressed batches
-private[this] var compressedBatchIndices: ArrayBuffer[Int] = ArrayBuffer.empty
-
-private[this] var codec: TableCompressionCodec = _
-
-override def initNewBatch(batch: ColumnarBatch): Unit = {
-batches.clear()
-compressedBatchIndices.clear()
-}
-
-override def addBatchToConcat(batch: ColumnarBatch): Unit = {
-if (isBatchCompressed(batch)) {
-compressedBatchIndices += batches.size
-}
-batches += batch
-}
-
-private def isBatchCompressed(batch: ColumnarBatch): Boolean = {
-if (batch.numCols == 0) {
-false
-} else {
-batch.column(0) match {
-case _: GpuCompressedColumnVector => true
-case _ => false
-}
-}
-}
-
-override def concatAllAndPutOnGPU(): ColumnarBatch = {
-decompressBatches()
-val tmp = batches.toArray
-// Clear the buffer so we don't close it again (buildNonEmptyBatch closed it for us).
-batches = ArrayBuffer.empty
-val ret = ConcatAndConsumeAll.buildNonEmptyBatch(tmp, schema)
-// sum of current batches and concatenating batches. Approximately sizeof(ret * 2).
-maxDeviceMemory = GpuColumnVector.getTotalDeviceMemoryUsed(ret) * 2
-ret
-}
-
-private def decompressBatches(): Unit = {
-if (compressedBatchIndices.nonEmpty) {
-val compressedVecs = compressedBatchIndices.map { batchIndex =>
-batches(batchIndex).column(0).asInstanceOf[GpuCompressedColumnVector]
-}
-if (codec == null) {
-val descr = compressedVecs.head.getTableMeta.bufferMeta.codecBufferDescrs(0)
-codec = TableCompressionCodec.getCodec(descr.codec)
-}
-withResource(codec.createBatchDecompressor(maxDecompressBatchMemory,
-Cuda.DEFAULT_STREAM)) { decompressor =>
-compressedVecs.foreach { cv =>
-val bufferMeta = cv.getTableMeta.bufferMeta
-// don't currently support switching codecs when partitioning
-val buffer = cv.getTableBuffer.slice(0, cv.getTableBuffer.getLength)
-decompressor.addBufferToDecompress(buffer, bufferMeta)
-}
-withResource(decompressor.finishAsync()) { outputBuffers =>
-outputBuffers.zipWithIndex.foreach { case (outputBuffer, outputIndex) =>
-val cv = compressedVecs(outputIndex)
-val batchIndex = compressedBatchIndices(outputIndex)
-val compressedBatch = batches(batchIndex)
-batches(batchIndex) =
-MetaUtils.getBatchFromMeta(outputBuffer, cv.getTableMeta, sparkTypes)
-compressedBatch.close()
-}
-}
-}
-}
-}
-
-override def cleanupConcatIsDone(): Unit = {
-peakDevMemory.set(maxDeviceMemory)
-batches.foreach(_.close())
-}
-
-private var onDeck: Option[ColumnarBatch] = None
-
-override protected def hasOnDeck: Boolean = onDeck.isDefined
-override protected def saveOnDeck(batch: ColumnarBatch): Unit = onDeck = Some(batch)
-override protected def clearOnDeck(): Unit = {
-onDeck.foreach(_.close())
-onDeck = None
-}
-override protected def popOnDeck(): ColumnarBatch = {
-val ret = onDeck.get
-onDeck = None
-ret
-}
-}
-
class GpuCoalesceIterator(iter: Iterator[ColumnarBatch],
schema: StructType,
goal: CoalesceGoal,
@@ -468,6 +349,7 @@ class GpuCoalesceIterator(iter: Iterator[ColumnarBatch],
concatTime: GpuMetric,
totalTime: GpuMetric,
peakDevMemory: GpuMetric,
+spillCallback: RapidsBuffer.SpillCallback,
opName: String)
extends AbstractGpuCoalesceIterator(iter,
goal,
@@ -490,7 +372,8 @@ class GpuCoalesceIterator(iter: Iterator[ColumnarBatch],
}

override def addBatchToConcat(batch: ColumnarBatch): Unit =
-batches.append(SpillableColumnarBatch(batch, SpillPriorities.ACTIVE_BATCHING_PRIORITY))
+batches.append(SpillableColumnarBatch(batch, SpillPriorities.ACTIVE_BATCHING_PRIORITY,
+spillCallback))

private[this] var codec: TableCompressionCodec = _

@@ -553,7 +436,8 @@ class GpuCoalesceIterator(iter: Iterator[ColumnarBatch],

override protected def saveOnDeck(batch: ColumnarBatch): Unit = {
assert(onDeck.isEmpty)
-onDeck = Some(SpillableColumnarBatch(batch, SpillPriorities.ACTIVE_ON_DECK_PRIORITY))
+onDeck = Some(SpillableColumnarBatch(batch, SpillPriorities.ACTIVE_ON_DECK_PRIORITY,
+spillCallback))
}

override protected def clearOnDeck(): Unit = {
@@ -583,7 +467,7 @@ case class GpuCoalesceBatches(child: SparkPlan, goal: CoalesceGoal)
COLLECT_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_COLLECT_TIME),
CONCAT_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_CONCAT_TIME),
PEAK_DEVICE_MEMORY -> createSizeMetric(MODERATE_LEVEL, DESCRIPTION_PEAK_DEVICE_MEMORY)
-)
+) ++ spillMetrics

override protected def doExecute(): RDD[InternalRow] = {
throw new IllegalStateException("ROW BASED PROCESSING IS NOT SUPPORTED")
@@ -608,27 +492,18 @@ case class GpuCoalesceBatches(child: SparkPlan, goal: CoalesceGoal)
// cache in local vars to avoid serializing the plan
val outputSchema = schema
val decompressMemoryTarget = maxDecompressBatchMemory
-val cannotSpill = child.schema.fields.exists { f =>
-f.dataType match {
-case MapType(_, _, _) | ArrayType(_, _) | StructType(_) => true
-case _ => false
-}
-}

val batches = child.executeColumnar()
batches.mapPartitions { iter =>
if (outputSchema.isEmpty) {
val numRows = iter.map(_.numRows).sum
val combinedCb = new ColumnarBatch(Array.empty, numRows)
Iterator.single(combinedCb)
-} else if (cannotSpill) {
-new GpuCoalesceIteratorNoSpill(iter, outputSchema, goal, decompressMemoryTarget,
-numInputRows, numInputBatches, numOutputRows, numOutputBatches, collectTime,
-concatTime, totalTime, peakDevMemory, "GpuCoalesceBatches")
} else {
+val callback = GpuMetric.makeSpillCallback(allMetrics)
new GpuCoalesceIterator(iter, outputSchema, goal, decompressMemoryTarget,
numInputRows, numInputBatches, numOutputRows, numOutputBatches, collectTime,
-concatTime, totalTime, peakDevMemory, "GpuCoalesceBatches")
+concatTime, totalTime, peakDevMemory, callback, "GpuCoalesceBatches")
}
}
}
@@ -16,8 +16,6 @@

package com.nvidia.spark.rapids

-import com.nvidia.spark.rapids.StorageTier.StorageTier
-
import org.apache.spark.sql.rapids.TempSpillBufferId
import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.vectorized.ColumnarBatch
@@ -134,8 +132,7 @@ object SpillableColumnarBatch extends Arm {
*/
def apply(batch: ColumnarBatch,
priority: Long,
-spillCallback: RapidsBuffer.SpillCallback = RapidsBuffer.defaultSpillCallback)
-: SpillableColumnarBatch = {
+spillCallback: RapidsBuffer.SpillCallback): SpillableColumnarBatch = {
val numRows = batch.numRows()
if (batch.numCols() <= 0) {
// We consumed it
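A side effect of the hunk above: SpillableColumnarBatch.apply no longer defaults spillCallback to RapidsBuffer.defaultSpillCallback, so a call site that forgets to thread a callback now fails to compile instead of silently skipping spill accounting. That is what drives the call-site updates in the remaining files. A minimal sketch of the difference, with stand-in types rather than the plugin's:

object ExplicitCallbackSketch {
  type SpillCallback = Long => Unit
  // Old shape: a defaulted callback that ignores spills.
  val defaultSpillCallback: SpillCallback = _ => ()
  def applyOld(priority: Long,
      spillCallback: SpillCallback = defaultSpillCallback): Unit =
    spillCallback(0L)
  // New shape: callers must pass the metrics-backed callback explicitly.
  def applyNew(priority: Long, spillCallback: SpillCallback): Unit =
    spillCallback(0L)
}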
11 changes: 7 additions & 4 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/limit.scala
@@ -194,7 +194,8 @@ object GpuTopN extends Arm {
inputBatches: GpuMetric,
inputRows: GpuMetric,
outputBatches: GpuMetric,
-outputRows: GpuMetric): Iterator[ColumnarBatch] =
+outputRows: GpuMetric,
+spillCallback: RapidsBuffer.SpillCallback): Iterator[ColumnarBatch] =
new Iterator[ColumnarBatch]() {
override def hasNext: Boolean = iter.hasNext

@@ -230,7 +231,8 @@ }
}
}
pending =
-Some(SpillableColumnarBatch(runningResult, SpillPriorities.ACTIVE_ON_DECK_PRIORITY))
+Some(SpillableColumnarBatch(runningResult, SpillPriorities.ACTIVE_ON_DECK_PRIORITY,
+spillCallback))
}
}
val ret = pending.get.getColumnarBatch()
@@ -267,7 +269,7 @@ case class GpuTopN(
NUM_INPUT_BATCHES -> createMetric(DEBUG_LEVEL, DESCRIPTION_NUM_INPUT_BATCHES),
SORT_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_SORT_TIME),
CONCAT_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_CONCAT_TIME)
-)
+) ++ spillMetrics

override def doExecuteColumnar(): RDD[ColumnarBatch] = {
val sorter = new GpuSorter(sortOrder, child.output)
@@ -279,9 +281,10 @@ case class GpuTopN(
val outputRows = gpuLongMetric(NUM_OUTPUT_ROWS)
val sortTime = gpuLongMetric(SORT_TIME)
val concatTime = gpuLongMetric(CONCAT_TIME)
+val callback = GpuMetric.makeSpillCallback(allMetrics)
child.executeColumnar().mapPartitions { iter =>
val topN = GpuTopN(limit, sorter, iter, totalTime, sortTime, concatTime,
-inputBatches, inputRows, outputBatches, outputRows)
+inputBatches, inputRows, outputBatches, outputRows, callback)
if (projectList != child.output) {
topN.map { batch =>
GpuProjectExec.projectAndClose(batch, boundProjectExprs, totalTime)
@@ -27,7 +27,7 @@ import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import ai.rapids.cudf.{AggregationOnColumn, ArrowIPCOptions, ArrowIPCWriterOptions, ColumnVector, HostBufferConsumer, HostBufferProvider, HostMemoryBuffer, NvtxColor, NvtxRange, StreamedTableReader, Table}
-import com.nvidia.spark.rapids.{Arm, ConcatAndConsumeAll, GpuAggregateWindowFunction, GpuBindReferences, GpuColumnVector, GpuColumnVectorFromBuffer, GpuExec, GpuMetric, GpuProjectExec, GpuSemaphore, GpuUnevaluable, SpillableColumnarBatch, SpillPriorities}
+import com.nvidia.spark.rapids.{Arm, ConcatAndConsumeAll, GpuAggregateWindowFunction, GpuBindReferences, GpuColumnVector, GpuColumnVectorFromBuffer, GpuExec, GpuMetric, GpuProjectExec, GpuSemaphore, GpuUnevaluable, RapidsBuffer, SpillableColumnarBatch, SpillPriorities}
import com.nvidia.spark.rapids.GpuMetric._
import com.nvidia.spark.rapids.RapidsPluginImplicits._
import com.nvidia.spark.rapids.python.PythonWorkerSemaphore
@@ -59,7 +59,8 @@ class RebatchingRoundoffIterator(
schema: StructType,
targetRoundoff: Int,
inputRows: GpuMetric,
-inputBatches: GpuMetric)
+inputBatches: GpuMetric,
+spillCallback: RapidsBuffer.SpillCallback)
extends Iterator[ColumnarBatch] with Arm {
var pending: Option[SpillableColumnarBatch] = None

@@ -94,7 +95,8 @@
inputBatches += 1
inputRows += got.numRows()
rowsSoFar += got.numRows()
-batches.append(SpillableColumnarBatch(got, SpillPriorities.ACTIVE_BATCHING_PRIORITY))
+batches.append(SpillableColumnarBatch(got, SpillPriorities.ACTIVE_BATCHING_PRIORITY,
+spillCallback))
}
val toConcat = batches.safeMap(_.getColumnarBatch()).toArray
ConcatAndConsumeAll.buildNonEmptyBatch(toConcat, schema)
@@ -129,7 +131,8 @@ class RebatchingRoundoffIterator(
localPending.setSpillPriority(SpillPriorities.ACTIVE_BATCHING_PRIORITY)
batches.append(localPending)
pending = None
-batches.append(SpillableColumnarBatch(cb, SpillPriorities.ACTIVE_BATCHING_PRIORITY))
+batches.append(SpillableColumnarBatch(cb, SpillPriorities.ACTIVE_BATCHING_PRIORITY,
+spillCallback))
fillAndConcat(batches)
} finally {
batches.safeClose()
@@ -145,7 +148,8 @@
} else {
val batches: ArrayBuffer[SpillableColumnarBatch] = ArrayBuffer.empty
try {
-batches.append(SpillableColumnarBatch(cb, SpillPriorities.ACTIVE_BATCHING_PRIORITY))
+batches.append(SpillableColumnarBatch(cb, SpillPriorities.ACTIVE_BATCHING_PRIORITY,
+spillCallback))
fillAndConcat(batches)
} finally {
batches.safeClose()
@@ -170,7 +174,8 @@
pending =
Some(SpillableColumnarBatch(GpuColumnVectorFromBuffer.from(split.last,
GpuColumnVector.extractTypes(schema)),
-SpillPriorities.ACTIVE_ON_DECK_PRIORITY))
+SpillPriorities.ACTIVE_ON_DECK_PRIORITY,
+spillCallback))
GpuColumnVectorFromBuffer.from(split.head, GpuColumnVector.extractTypes(schema))
}
}
@@ -185,8 +190,9 @@ class BatchQueue extends AutoCloseable with Arm {
mutable.Queue[SpillableColumnarBatch]()
private var isSet = false

-def add(batch: ColumnarBatch): Unit = synchronized {
-queue.enqueue(SpillableColumnarBatch(batch, SpillPriorities.ACTIVE_ON_DECK_PRIORITY))
+def add(batch: ColumnarBatch, spillCallback: RapidsBuffer.SpillCallback): Unit = synchronized {
+queue.enqueue(SpillableColumnarBatch(batch, SpillPriorities.ACTIVE_ON_DECK_PRIORITY,
+spillCallback))
if (!isSet) {
// Wake up anyone waiting for the first batch.
isSet = true
@@ -543,13 +549,14 @@ case class GpuArrowEvalPythonExec(
NUM_OUTPUT_BATCHES -> createMetric(outputBatchesLevel, DESCRIPTION_NUM_OUTPUT_BATCHES),
NUM_INPUT_ROWS -> createMetric(DEBUG_LEVEL, DESCRIPTION_NUM_INPUT_ROWS),
NUM_INPUT_BATCHES -> createMetric(DEBUG_LEVEL, DESCRIPTION_NUM_INPUT_BATCHES)
-)
+) ++ spillMetrics

override protected def doExecuteColumnar(): RDD[ColumnarBatch] = {
val numOutputRows = gpuLongMetric(NUM_OUTPUT_ROWS)
val numOutputBatches = gpuLongMetric(NUM_OUTPUT_BATCHES)
val numInputRows = gpuLongMetric(NUM_INPUT_ROWS)
val numInputBatches = gpuLongMetric(NUM_INPUT_BATCHES)
+val spillCallback = GpuMetric.makeSpillCallback(allMetrics)

lazy val isPythonOnGpuEnabled = GpuPythonHelper.isPythonOnGpuEnabled(conf)

@@ -598,12 +605,12 @@

val boundReferences = GpuBindReferences.bindReferences(allInputs, childOutput)
val batchedIterator = new RebatchingRoundoffIterator(iter, inputSchema, targetBatchSize,
-numInputRows, numInputBatches)
+numInputRows, numInputBatches, spillCallback)
val pyInputIterator = batchedIterator.map { batch =>
// We have to do the project before we add the batch because the batch might be closed
// when it is added
val ret = GpuProjectExec.project(batch, boundReferences)
-queue.add(batch)
+queue.add(batch, spillCallback)
ret
}

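The BatchQueue change follows the same reasoning: a batch becomes spillable the moment it is enqueued, so the producer's callback must travel with each batch for a later spill to be charged to the right operator's metrics. A minimal model of that queue, where SpillableBatch and SpillCallback are hypothetical stand-ins for the plugin's SpillableColumnarBatch and RapidsBuffer.SpillCallback:

import scala.collection.mutable

object BatchQueueSketch {
  type SpillCallback = Long => Unit

  // Hypothetical wrapper: remembers which callback to fire on spill.
  final case class SpillableBatch(rows: Int, onSpill: SpillCallback)

  final class BatchQueue {
    private val queue = mutable.Queue[SpillableBatch]()

    // Mirrors the new add signature: the callback is captured per batch,
    // so spills of queued batches update the producing node's metrics.
    def add(rows: Int, spillCallback: SpillCallback): Unit = synchronized {
      queue.enqueue(SpillableBatch(rows, spillCallback))
    }

    def remove(): SpillableBatch = synchronized { queue.dequeue() }
  }
}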