Skip to content

Commit

Permalink
Add retry and SplitAndRetry support to AcceleratedColumnarToRowIterat…
Browse files Browse the repository at this point in the history
…or (#9088)

This PR adds retry and SplitAndRetry support to AcceleratedColumnarToRowIterator.

It will retry converting columns to rows by cudf when getting any oom exception.
---------

Signed-off-by: Firestarman <firestarmanllc@gmail.com>
  • Loading branch information
firestarman authored Aug 24, 2023
1 parent 8ee3e2b commit daedfe5
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ import scala.annotation.tailrec
import scala.collection.mutable.Queue

import ai.rapids.cudf.{Cuda, HostColumnVector, NvtxColor, Table}
import com.nvidia.spark.rapids.Arm.withResource
import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource}
import com.nvidia.spark.rapids.RapidsPluginImplicits._
import com.nvidia.spark.rapids.RmmRapidsRetryIterator.splitSpillableInHalfByRows
import com.nvidia.spark.rapids.ScalableTaskCompletion.onTaskCompletion
import com.nvidia.spark.rapids.shims.ShimUnaryExecNode

Expand Down Expand Up @@ -73,7 +74,12 @@ class AcceleratedColumnarToRowIterator(
private var at: Int = 0
private var total: Int = 0

onTaskCompletion(closeAllPendingBatches())
// Don't install the callback if in a unit test
Option(TaskContext.get()).foreach { tc =>
onTaskCompletion(tc) {
closeAllPendingBatches()
}
}

private def setCurrentBatch(wip: HostColumnVector): Unit = {
currentCv = Some(wip)
Expand All @@ -100,34 +106,42 @@ class AcceleratedColumnarToRowIterator(
new Table(rearrangedColumns : _*)
}

private[this] def setupBatch(cb: ColumnarBatch): Boolean = {
private[this] def setupBatchAndClose(scb: SpillableColumnarBatch): Boolean = {
numInputBatches += 1
// In order to match the numOutputRows metric in the generated code we update
// numOutputRows for each batch. This is less accurate than doing it at output
// because it will over count the number of rows output in the case of a limit,
// but it is more efficient.
numOutputRows += cb.numRows()
if (cb.numRows() > 0) {
numOutputRows += scb.numRows()
if (scb.numRows() > 0) {
withResource(new NvtxWithMetrics("ColumnarToRow: batch", NvtxColor.RED, opTime)) { _ =>
withResource(rearrangeRows(cb)) { table =>
// The fixed-width optimized cudf kernel only supports up to 1.5 KB per row which means at
// most 184 double/long values. Spark by default limits codegen to 100 fields
// "spark.sql.codegen.maxFields". So, we are going to be cautious and start with that
// until we have tested it more. We branching over the size of the output to know which
// kernel to call. If schema.length < 100 we call the fixed-width optimized version,
// otherwise the generic one
withResource(if (schema.length < 100) {
table.convertToRowsFixedWidthOptimized()
} else {
table.convertToRows()
}) { rowsCvList =>
val it = RmmRapidsRetryIterator.withRetry(scb, splitSpillableInHalfByRows) { attempt =>
withResource(attempt.getColumnarBatch()) { attemptCb =>
withResource(rearrangeRows(attemptCb)) { table =>
// The fixed-width optimized cudf kernel only supports up to 1.5 KB per row which
// means at most 184 double/long values. Spark by default limits codegen to 100
// fields "spark.sql.codegen.maxFields". So, we are going to be cautious and
// start with that until we have tested it more. We branching over the size of
// the output to know which kernel to call. If schema.length < 100 we call the
// fixed-width optimized version, otherwise the generic one
if (schema.length < 100) {
table.convertToRowsFixedWidthOptimized()
} else {
table.convertToRows()
}
}
}
}
assert(it.hasNext, "Got an unexpected empty iterator after setting up batch with retry")
it.foreach { rowsCvList =>
withResource(rowsCvList) { _ =>
rowsCvList.foreach { rowsCv =>
pendingCvs += rowsCv.copyToHost()
}
setCurrentBatch(pendingCvs.dequeue())
return true
}
}
setCurrentBatch(pendingCvs.dequeue())
return true
}
}
false
Expand All @@ -148,16 +162,20 @@ class AcceleratedColumnarToRowIterator(
// keep fetching input batches until we have a non-empty batch ready
val nextBatch = fetchNextBatch()
if (nextBatch.isDefined) {
if (!withResource(nextBatch.get)(setupBatch)) {
if (!setupBatchAndClose(nextBatch.get)) {
populateBatch()
}
}
}

private def fetchNextBatch(): Option[ColumnarBatch] = {
private def fetchNextBatch(): Option[SpillableColumnarBatch] = {
withResource(new NvtxWithMetrics("ColumnarToRow: fetch", NvtxColor.BLUE, streamTime)) { _ =>
if (batches.hasNext) {
Some(batches.next())
// Make it spillable once getting a columnar batch.
val spillBatch = closeOnExcept(batches.next()) { cb =>
SpillableColumnarBatch(cb, SpillPriorities.ACTIVE_ON_DECK_PRIORITY)
}
Some(spillBatch)
} else {
None
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids

import ai.rapids.cudf.ColumnVector
import com.nvidia.spark.rapids.jni.RmmSpark

import org.apache.spark.sql.catalyst.expressions.{AttributeReference, ExprId}
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.vectorized.ColumnarBatch

class ColumnToRowIteratorRetrySuite extends RmmSparkRetrySuiteBase {

private val ref = GpuBoundReference(0, IntegerType, nullable = false)(ExprId(0), "a")
private val attrs = Seq(AttributeReference(ref.name, ref.dataType, ref.nullable)())
private val NUM_ROWS = 50

private def buildBatch: ColumnarBatch = {
val ints = 0 until NUM_ROWS
new ColumnarBatch(
Array(GpuColumnVector.from(ColumnVector.fromInts(ints: _*), IntegerType)), NUM_ROWS)
}

test("Accelerated columnar to row with retry OOM") {
val aCol2RowIter = new AcceleratedColumnarToRowIterator(
attrs,
Iterator(buildBatch),
NoopMetric, NoopMetric, NoopMetric, NoopMetric)
RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId)
var numRows = 0
aCol2RowIter.foreach { _ =>
numRows += 1
}
assertResult(NUM_ROWS)(numRows)
}

test("Accelerated columnar_to_row with split and retry OOM") {
val aCol2RowIter = new AcceleratedColumnarToRowIterator(
attrs,
Iterator(buildBatch),
NoopMetric, NoopMetric, NoopMetric, NoopMetric)
RmmSpark.forceSplitAndRetryOOM(RmmSpark.getCurrentThreadId)
var numRows = 0
aCol2RowIter.foreach { _ =>
numRows += 1
}
assertResult(NUM_ROWS)(numRows)
}

}

0 comments on commit daedfe5

Please sign in to comment.