With a single row GpuExplode tries to split the generator array #10131

Merged · 9 commits · Jan 11, 2024
@@ -18,12 +18,13 @@ package com.nvidia.spark.rapids

import scala.collection.mutable.ArrayBuffer

-import ai.rapids.cudf.{ColumnVector, DType, NvtxColor, NvtxRange, OrderByArg, Scalar, Table}
+import ai.rapids.cudf.{ColumnVector, DType, NvtxColor, NvtxRange, OrderByArg, ReductionAggregation, Scalar, Table}
import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource}
import com.nvidia.spark.rapids.GpuOverrides.extractLit
import com.nvidia.spark.rapids.RapidsPluginImplicits.AutoCloseableProducingArray
-import com.nvidia.spark.rapids.RmmRapidsRetryIterator.{splitSpillableInHalfByRows, withRetry}
+import com.nvidia.spark.rapids.RmmRapidsRetryIterator.withRetry
import com.nvidia.spark.rapids.ScalableTaskCompletion.onTaskCompletion
import com.nvidia.spark.rapids.jni.GpuSplitAndRetryOOM
import com.nvidia.spark.rapids.shims.{ShimExpression, ShimUnaryExecNode}

import org.apache.spark.rdd.RDD
@@ -826,13 +827,99 @@ class GpuGenerateIterator(
// Need to ensure these are closed in case of failure.
inputs.foreach(scb => use(scb))

-// apply generation on each (sub)batch
-private val retryIter = withRetry(inputs.toIterator, splitSpillableInHalfByRows) { attempt =>
-  withResource(attempt.getColumnarBatch()) { batch =>
-    generator.generate(batch, generatorOffset, outer)
def generateSplitSpillableInHalfByRows: SpillableColumnarBatch => Seq[SpillableColumnarBatch] = {
(spillable: SpillableColumnarBatch) => {
withResource(spillable) { _ =>
val toSplitRows = spillable.numRows()
if (toSplitRows == 1) {
// single row: we cannot split by rows, so explode the generator array
// itself, split that, and duplicate the remaining columns into each piece
val tbl = withResource(spillable.getColumnarBatch()) { src =>
GpuColumnVector.from(src)
}
withResource(tbl) { _ =>
if (tbl.getColumn(generatorOffset).getNullCount == 1) {
// 1 row, and the element to explode is null: we cannot
// handle this case
throw new GpuSplitAndRetryOOM(
s"GPU OutOfMemory: cannot split a 1-row batch when " +
s"the array to explode is null!")
}
// borrow the column to explode, in order to split it as a table
// explodedTbl is 1 row per element of the list
val explodedTbl =
withResource(new Table(tbl.getColumn(generatorOffset))) { generatorTmpTable =>
// we use regular `explode` because we know we have 1 list column
// and it isn't null => this operation will produce N rows, 1 row per element
// of the column at `generatorOffset`
generatorTmpTable.explode(0)
}

// We are likely here because of a cuDF column length limit, so we
// just split in half blindly until the result fits.
val splits = withResource(explodedTbl) { _ =>
if (explodedTbl.getRowCount < 2) {
// we should never hit this
throw new GpuSplitAndRetryOOM(
s"GPU OutOfMemory: cannot split a batch of 1 rows when " +
s"the array has a single element!")
}
val splitIx = Seq(explodedTbl.getRowCount.toInt / 2)
explodedTbl.contiguousSplit(splitIx: _*)
}
val result = new Array[SpillableColumnarBatch](splits.size)
closeOnExcept(result) { _ =>
closeOnExcept(splits) { _ =>
(0 until splits.length).foreach { split =>
val explodedTblToConvertBack = splits(split)
val cols = new Array[ColumnVector](tbl.getNumberOfColumns)
withResource(explodedTblToConvertBack) { _ =>
val colToReconstitute = explodedTblToConvertBack.getTable.getColumn(0)
closeOnExcept(cols) { _ =>
(0 until tbl.getNumberOfColumns).foreach { col =>
cols(col) = if (col == generatorOffset) {
withResource(
colToReconstitute.reduce(
ReductionAggregation.collectList(), DType.LIST)) { sc =>
ColumnVector.fromScalar(sc, 1)
Review comment (Member): This seems like a roundabout way to build a list column. We already have the child column; we just need a simple offset vector of two rows, 0 and the child column size, and then build a list column from that.
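A minimal sketch of that suggestion, assuming the cudf Java binding exposes `ColumnView.makeListFromOffsets` (the exact method name and signature are an assumption here, not confirmed by this PR):

```scala
// Hypothetical alternative: wrap the exploded child column in a one-row
// LIST column via an offsets vector [0, childRowCount], instead of
// reducing to a scalar and expanding it back with fromScalar.
val listCol = withResource(
    ColumnVector.fromInts(0, colToReconstitute.getRowCount.toInt)) { offsets =>
  colToReconstitute.makeListFromOffsets(1, offsets)
}
```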

}
} else {
withResource(tbl.getColumn(col).getScalarElement(0)) { sc =>
ColumnVector.fromScalar(sc, 1)
}
Review comment (Member): Seems like we're getting the first (and only) row of a column and then building a new column from that. Why isn't this just an incref of the original table column, without copying anything?
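A sketch of that incref approach; `incRefCount()` does exist on cudf's Java `ColumnVector`, though treating it as a drop-in replacement here is an assumption:

```scala
// The batch has exactly one row, so the original column already has the
// right shape. incRefCount() bumps the reference count and returns the
// same vector, so `cols` holds its own reference and nothing is copied.
cols(col) = tbl.getColumn(col).incRefCount()
```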

}
}
}
}
splits(split) = null
val finalTbl = withResource(cols) { _ =>
new Table(cols: _*)
}
withResource(finalTbl) { _ =>
result(split) = SpillableColumnarBatch(
GpuColumnVector.from(finalTbl, spillable.dataTypes),
SpillPriorities.ACTIVE_BATCHING_PRIORITY)
}
}
}
}
result
}
} else {
// more than 1 row: just do the regular split-by-rows
RmmRapidsRetryIterator.splitSpillableInHalfByRows(spillable)
}
}
}
}

// apply generation on each (sub)batch
private val retryIter =
withRetry(inputs.toIterator, generateSplitSpillableInHalfByRows) { attempt =>
withResource(attempt.getColumnarBatch()) { batch =>
generator.generate(batch, generatorOffset, outer)
}
}

override def hasNext: Boolean = retryIter.hasNext
override def next(): ColumnarBatch = {
withResource(new NvtxWithMetrics("GpuGenerateIterator", NvtxColor.PURPLE, opTime)) { _ =>
@@ -22,13 +22,16 @@ import scala.collection.mutable.ArrayBuffer

import ai.rapids.cudf.{ColumnVector, DType, Table}
import ai.rapids.cudf.HostColumnVector.{BasicType, ListType}
-import com.nvidia.spark.rapids.Arm.withResource
+import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource}
import com.nvidia.spark.rapids.jni.GpuSplitAndRetryOOM

import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
import org.apache.spark.sql.types.{ArrayType, DataType, IntegerType}
import org.apache.spark.sql.vectorized.ColumnarBatch

class GpuGenerateSuite
-extends SparkQueryCompareTestSuite {
+extends RmmSparkRetrySuiteBase
+with SparkQueryCompareTestSuite {
val rapidsConf = new RapidsConf(Map.empty[String, String])

def makeListColumn(
@@ -174,6 +177,89 @@
}
}

test("on splitAndRetry try to split by rows else split the exploding column") {
class MyExplode(child: Expression) extends GpuExplode(child) {
private var forceNumOOMs: Int = 0
override def generate(
inputBatch: ColumnarBatch,
generatorOffset: Int,
outer: Boolean): ColumnarBatch = {
if (forceNumOOMs > 0) {
forceNumOOMs -= 1
throw new GpuSplitAndRetryOOM(s"mock split and retry. ${forceNumOOMs} pending.")
}
super.generate(inputBatch, generatorOffset, outer)
}

def doForceSplitAndRetry(numOOMs: Int): Unit = {
forceNumOOMs = numOOMs
}
}
// numRows = 1: exercises the split code that rescues a 1-row scenario where
// we are running OOM.
// numRows = 2: exercises the regular split-by-rows code.
(1 until 3).foreach { numRows =>
val (expected, _) = makeBatch(numRows)
val e = GpuExplode(AttributeReference("foo", ArrayType(IntegerType))())
val itNoFailures = new GpuGenerateIterator(
Seq(SpillableColumnarBatch(expected, SpillPriorities.ACTIVE_ON_DECK_PRIORITY)),
generator = e,
generatorOffset = 1,
outer = false,
NoopMetric,
NoopMetric,
NoopMetric)
val expectedExploded = itNoFailures.next()
withResource(expectedExploded) { _ =>
assertResult(false)(itNoFailures.hasNext)
// with numRows > 1, we follow regular split semantics, else
// we split the exploding column.
// - numRows = 1, numOOMs = 1:
// 2 batches each with 2 rows (4 items in the original array).
// - numRows = 1, numOOMs = 2:
// 3 batches (two 1-row batches, and the last batch has 2 rows).
// - numRows = 2, numOOMs = 1:
// 2 batches, each with 4 rows
// - numRows = 2, numOOMs = 2:
// 3 batches (2-row, 2-row and a 4-row)
(1 until 3).foreach { numOOMs =>
val (actual, _) = makeBatch(numRows)
val actualSpillable =
SpillableColumnarBatch(actual, SpillPriorities.ACTIVE_ON_DECK_PRIORITY)
val failingGenerator = new MyExplode(AttributeReference("foo", ArrayType(IntegerType))())
val it = new GpuGenerateIterator(
Seq(actualSpillable),
generator = failingGenerator,
generatorOffset = 1,
outer = false,
NoopMetric,
NoopMetric,
NoopMetric)

failingGenerator.doForceSplitAndRetry(numOOMs)

// depending on numOOMs, this returns 2 or 3 batches that together hold the full output
val results = new ArrayBuffer[ColumnarBatch]()
closeOnExcept(results) { _ =>
while (it.hasNext) {
results.append(it.next())
}
}

withResource(results) { _ =>
withResource(results.map(GpuColumnVector.from)) { resultTbls =>
withResource(Table.concatenate(resultTbls: _*)) { res =>
withResource(GpuColumnVector.from(expectedExploded)) { expectedTbl =>
TestUtils.compareTables(expectedTbl, res)
}
}
}
}
}
}
}
}

test("2-row batches split in half") {
val (batch, inputSize) = makeBatch(numRows = 2)
withResource(batch) { _ =>