
Support Retry for GpuLocalLimitExec and GpuGlobalLimitExec #9193

Merged 2 commits on Sep 7, 2023.
Changes from 1 commit.
169 changes: 94 additions & 75 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/limit.scala
@@ -37,6 +37,97 @@ import org.apache.spark.sql.execution.{CollectLimitExec, LimitExec, SparkPlan}
import org.apache.spark.sql.execution.exchange.ENSURE_REQUIREMENTS
import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}

object GpuBaseLimitExec {

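  // Slices `spillBatch` to rows [start, min(start + length, numRows)) under the
  // retry framework: withRetryNoSplit owns and closes the spillable input, and
  // re-executes the slice if a retryable OOM occurs (possibly after spilling).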
private def sliceBatchAndCloseWithRetry(
spillBatch: SpillableColumnarBatch,
start: Int,
length: Int): ColumnarBatch = {
val end = Math.min(start + length, spillBatch.numRows())
RmmRapidsRetryIterator.withRetryNoSplit(spillBatch) { _ =>
withResource(spillBatch.getColumnarBatch()) { batch =>
val subCols = (0 until batch.numCols()).safeMap { i =>
val col = batch.column(i).asInstanceOf[GpuColumnVector]
val subVector = col.getBase.subVector(start, end)
assert(subVector != null)
GpuColumnVector.from(subVector, col.dataType)
}
new ColumnarBatch(subCols.toArray, end - start)
}
}
}

def apply(
Collaborator:
nit: Could we call this something different? To me, GpuBaseLimitExec.apply should return a GpuBaseLimitExec; we are not 100% consistent about that. Could we rename the object to GpuBaseLimitExecIterator? It would make it a little clearer.

Collaborator (author):
Updated. I changed the new object GpuBaseLimitExec to a class called GpuBaseLimitIterator (see the sketch after this diff).

input: Iterator[ColumnarBatch],
limit: Int,
offset: Int,
opTime: GpuMetric,
numOutputBatches: GpuMetric,
numOutputRows: GpuMetric): Iterator[ColumnarBatch] = {
new Iterator[ColumnarBatch] {
private var remainingLimit = limit - offset
private var remainingOffset = offset

override def hasNext: Boolean = (limit == -1 || remainingLimit > 0) && input.hasNext

override def next(): ColumnarBatch = {
if (!this.hasNext) {
throw new NoSuchElementException("Next on empty iterator")
}

var batch = input.next()
val numCols = batch.numCols()

// In each partition, we need to skip `offset` rows
while (batch != null && remainingOffset >= batch.numRows()) {
remainingOffset -= batch.numRows()
batch.safeClose()
batch = if (this.hasNext) {
input.next()
} else {
null
}
}

// If the last batch is null, then we have offset >= numRows in this partition.
// In such a case, we should return an empty batch
if (batch == null || batch.numRows() == 0) {
return new ColumnarBatch(new ArrayBuffer[GpuColumnVector](numCols).toArray, 0)
}

// Here 0 <= remainingOffset < batch.numRows(), we need to get batch[remainingOffset:]
withResource(new NvtxWithMetrics("limit and offset", NvtxColor.ORANGE, opTime)) { _ =>
var result: ColumnarBatch = null
// limit < 0 (i.e. limit == -1) denotes there is no limit, so when
// (remainingOffset == 0 && (remainingLimit >= batch.numRows() || limit < 0)) is true,
// we can take this batch completely
if (remainingOffset == 0 && (remainingLimit >= batch.numRows() || limit < 0)) {
result = batch
} else {
// Otherwise, we need to slice the batch with (remainingOffset, remainingLimit).
// remainingOffset > 0 applies only to the first sliced batch; for the later
// batches in this partition, set remainingOffset = 0
val length = if (remainingLimit >= batch.numRows() || limit < 0) {
batch.numRows()
} else {
remainingLimit
}
val scb = closeOnExcept(batch) { _ =>
SpillableColumnarBatch(batch, SpillPriorities.ACTIVE_ON_DECK_PRIORITY)
}
result = sliceBatchAndCloseWithRetry(scb, remainingOffset, length)
remainingOffset = 0
}
remainingLimit -= result.numRows()
numOutputBatches += 1
numOutputRows += result.numRows()
result
}
}
}
}
}
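
Per the review thread above, a follow-up commit (not included in this "Changes from 1 commit" view) renames the object to a class called GpuBaseLimitIterator. A sketch of the shape that rename might take, assuming the constructor simply takes over the apply() parameters and the body is otherwise unchanged:

// Hypothetical shape of the rename discussed in the review thread; the
// follow-up commit is not part of this diff, so the details are assumed.
class GpuBaseLimitIterator(
    input: Iterator[ColumnarBatch],
    limit: Int,
    offset: Int,
    opTime: GpuMetric,
    numOutputBatches: GpuMetric,
    numOutputRows: GpuMetric) extends Iterator[ColumnarBatch] {
  private var remainingLimit = limit - offset
  private var remainingOffset = offset

  override def hasNext: Boolean = (limit == -1 || remainingLimit > 0) && input.hasNext

  override def next(): ColumnarBatch = {
    // Same offset-skipping and retry-safe slicing logic as the apply() body above.
    ???
  }
}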

/**
* Helper trait which defines methods that are shared by both
* [[GpuLocalLimitExec]] and [[GpuGlobalLimitExec]].
@@ -65,87 +156,15 @@ trait GpuBaseLimitExec extends LimitExec with GpuExec with ShimUnaryExecNode {
sliceRDD(child.executeColumnar(), limit, 0)
}

def sliceRDD(rdd: RDD[ColumnarBatch], limit: Int, offset: Int): RDD[ColumnarBatch] = {
protected def sliceRDD(rdd: RDD[ColumnarBatch], limit: Int, offset: Int): RDD[ColumnarBatch] = {
val opTime = gpuLongMetric(OP_TIME)
val numOutputRows = gpuLongMetric(NUM_OUTPUT_ROWS)
val numOutputBatches = gpuLongMetric(NUM_OUTPUT_BATCHES)
rdd.mapPartitions { iter =>
new Iterator[ColumnarBatch] {
private var remainingLimit = limit - offset
private var remainingOffset = offset

override def hasNext: Boolean = (limit == -1 || remainingLimit > 0) && iter.hasNext

override def next(): ColumnarBatch = {
if (!this.hasNext) {
throw new NoSuchElementException("Next on empty iterator")
}

var batch = iter.next()
val numCols = batch.numCols()

// In each partition, we need to skip `offset` rows
while (batch != null && remainingOffset >= batch.numRows()) {
remainingOffset -= batch.numRows()
batch.safeClose()
batch = if (this.hasNext) { iter.next() } else { null }
}

// If the last batch is null, then we have offset >= numRows in this partition.
// In such case, we should return an empty batch
if (batch == null || batch.numRows() == 0) {
return new ColumnarBatch(new ArrayBuffer[GpuColumnVector](numCols).toArray, 0)
}

// Here 0 <= remainingOffset < batch.numRow(), we need to get batch[remainingOffset:]
withResource(new NvtxWithMetrics("limit and offset", NvtxColor.ORANGE, opTime)) { _ =>
var result: ColumnarBatch = null
// limit < 0 (limit == -1) denotes there is no limitation, so when
// (remainingOffset == 0 && (remainingLimit >= batch.numRows() || limit < 0)) is true,
// we can take this batch completely
if (remainingOffset == 0 && (remainingLimit >= batch.numRows() || limit < 0)) {
result = batch
} else {
// otherwise, we need to slice batch with (remainingOffset, remainingLimit).
// And remainingOffset > 0 will be used only once, for the latter batches in this
// partition, set remainingOffset = 0
val length = if (remainingLimit >= batch.numRows() || limit < 0) {
batch.numRows()
} else {
remainingLimit
}
result = sliceBatchWithOffset(batch, remainingOffset, length)
remainingOffset = 0
}
numOutputBatches += 1
numOutputRows += result.numRows()
remainingLimit -= result.numRows()
result
}
}

def sliceBatchWithOffset(batch: ColumnarBatch, offset: Int, limit: Int): ColumnarBatch = {
val numCols = batch.numCols()
val end = Math.min(offset + limit, batch.numRows())
withResource(batch) { _ =>
// result buffer need to be closed when there is an exception
closeOnExcept(new ArrayBuffer[GpuColumnVector](numCols)) { result =>
if (numCols > 0) {
withResource(GpuColumnVector.from(batch)) { table =>
(0 until numCols).zip(output).foreach{ case (i, attr) =>
val subVector = table.getColumn(i).subVector(offset, end)
assert(subVector != null)
result.append(GpuColumnVector.from(subVector, attr.dataType))
}
}
}
new ColumnarBatch(result.toArray, end - offset)
}
}
}
}
GpuBaseLimitExec(iter, limit, offset, opTime, numOutputBatches, numOutputRows)
}
}

}

/**
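The heart of this change is the retry-safe slice: sliceRDD's per-partition work now lives in a standalone iterator that can be unit-tested directly (as LimitRetrySuite does below), and the slice itself wraps its input in a SpillableColumnarBatch before running under withRetryNoSplit, so a retryable GPU OOM can spill the input and re-execute the body. A minimal sketch of that pattern, using only the helpers that already appear in this diff (doSomeGpuWork is a hypothetical stand-in for any GPU operation):

private def processWithRetry(batch: ColumnarBatch): ColumnarBatch = {
  // Make the batch spillable first; if the wrapping itself throws,
  // closeOnExcept ensures the original batch is closed.
  val spillable = closeOnExcept(batch) { _ =>
    SpillableColumnarBatch(batch, SpillPriorities.ACTIVE_ON_DECK_PRIORITY)
  }
  // withRetryNoSplit takes ownership of `spillable`: it closes it when done
  // and re-invokes the body if the GPU signals a retryable OOM, at which
  // point the spillable batch may have been evicted to host memory or disk.
  RmmRapidsRetryIterator.withRetryNoSplit(spillable) { _ =>
    withResource(spillable.getColumnarBatch()) { cb =>
      doSomeGpuWork(cb) // hypothetical GPU kernel that may OOM and be retried
    }
  }
}
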
40 changes: 35 additions & 5 deletions tests/src/test/scala/com/nvidia/spark/rapids/LimitRetrySuite.scala
@@ -31,16 +31,17 @@ class LimitRetrySuite extends RmmSparkRetrySuiteBase {
private val gpuSorter = new GpuSorter(Seq(SortOrder(ref, Ascending)), Array(attrs))
private val NUM_ROWS = 100

private def buildBatch1: ColumnarBatch = {
val ints = 0 until NUM_ROWS by 2
private def buildBatch(ints: Seq[Int]): ColumnarBatch = {
new ColumnarBatch(
Array(GpuColumnVector.from(ColumnVector.fromInts(ints: _*), IntegerType)), ints.length)
}

private def buildBatch1: ColumnarBatch = {
buildBatch(0 until NUM_ROWS by 2)
}

private def buildBatch2: ColumnarBatch = {
val ints = 1 until NUM_ROWS by 2
new ColumnarBatch(
Array(GpuColumnVector.from(ColumnVector.fromInts(ints: _*), IntegerType)), ints.length)
buildBatch(1 until NUM_ROWS by 2)
}

test("GPU topn with split and retry OOM") {
@@ -70,4 +71,33 @@
assert(!topNIter.hasNext)
}
}

test("GPU limit with retry OOM") {
val totalRows = 24
Seq((20, 5), (50, 5)).foreach { case (limit, offset) =>
val limitIter = GpuBaseLimitExec(
// 3 batches as input, and each has 8 rows
(0 until totalRows).grouped(8).map(buildBatch(_)).toList.toIterator,
limit, offset, NoopMetric, NoopMetric, NoopMetric)
var leftRows = if (limit > totalRows) totalRows - offset else limit - offset
var curValue = offset
RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId)
while (limitIter.hasNext) {
var pos = 0
withResource(limitIter.next()) { cb =>
withResource(cb.column(0).asInstanceOf[GpuColumnVector].copyToHost()) { hCol =>
while (pos < hCol.getRowCount.toInt) {
assertResult(curValue)(hCol.getInt(pos))
pos += 1
curValue += 1
}
}
leftRows -= cb.numRows()
}
}
// all the rows are consumed
assertResult(0)(leftRows)
}
}

}
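
To trace the test's arithmetic for the (limit = 20, offset = 5) case: the input is three 8-row batches holding the values 0..23, and remainingLimit starts at limit - offset = 15.

// batch 1 (rows 0..7):   remainingOffset = 5 < 8, slice [5:8]  -> 3 rows (5..7),   remainingLimit = 12
// batch 2 (rows 8..15):  remainingOffset = 0, 12 >= 8, whole   -> 8 rows (8..15),  remainingLimit = 4
// batch 3 (rows 16..23): 4 < 8, slice [0:4]                    -> 4 rows (16..19), remainingLimit = 0

That yields the 15 rows 5..19, so leftRows ends at 0. In the (50, 5) case, limit exceeds totalRows, so every row after the offset comes through (the 19 rows 5..23), and the injected forceRetryOOM checks that the first retry-wrapped slice survives a simulated OOM.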