-
Notifications
You must be signed in to change notification settings - Fork 232
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
With a single row GpuExplode tries to split the generator array #10131
Merged
abellina
merged 9 commits into
NVIDIA:branch-24.02
from
abellina:split_single_row_in_generate
Jan 11, 2024
Merged
Changes from 6 commits
Commits
Show all changes
9 commits
Select commit
Hold shift + click to select a range
8bced26
Split a single row in GpuGenerate
abellina a40fe50
With a single row GpuExplode tries to split the generator array
abellina 08b2eeb
Remove debug prints
abellina be109d4
Ensure the split works with pos_explode + refactoring
abellina b0fc0dd
Upmerge
abellina 1131006
missing import
abellina 31e45d4
Fix scala 2.13
abellina 9e7abd1
Bring back inputRows check, and ensure toFixUp doesnt get leaked
abellina 123b5d8
Apply code review suggestion
abellina File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,5 @@ | ||
/* | ||
* Copyright (c) 2020-2023, NVIDIA CORPORATION. | ||
* Copyright (c) 2020-2024, NVIDIA CORPORATION. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
|
@@ -24,6 +24,7 @@ import com.nvidia.spark.rapids.GpuOverrides.extractLit | |
import com.nvidia.spark.rapids.RapidsPluginImplicits.AutoCloseableProducingArray | ||
import com.nvidia.spark.rapids.RmmRapidsRetryIterator.{splitSpillableInHalfByRows, withRetry} | ||
import com.nvidia.spark.rapids.ScalableTaskCompletion.onTaskCompletion | ||
import com.nvidia.spark.rapids.jni.GpuSplitAndRetryOOM | ||
import com.nvidia.spark.rapids.shims.{ShimExpression, ShimUnaryExecNode} | ||
|
||
import org.apache.spark.rdd.RDD | ||
|
@@ -163,16 +164,16 @@ trait GpuGenerator extends GpuUnevaluable { | |
* that generator is an integrated Table transformer instead of column transformer in terms of | ||
* cuDF. | ||
* | ||
* @param inputBatch projected input data, which ensures appending columns are ahead of | ||
* generators' inputs. So, generators can distinguish them with an offset. | ||
* @param batches iterator of spillable batches | ||
* @param generatorOffset column offset of generator's input columns in `inputBatch` | ||
* @param outer when true, each input row will be output at least once, even if the | ||
* output of the given `generator` is empty. | ||
* @return result ColumnarBatch | ||
*/ | ||
def generate(inputBatch: ColumnarBatch, | ||
def generate( | ||
batches: Iterator[SpillableColumnarBatch], | ||
generatorOffset: Int, | ||
outer: Boolean): ColumnarBatch | ||
outer: Boolean): Iterator[ColumnarBatch] | ||
|
||
/** | ||
* Compute split indices for generator's input batches. | ||
|
@@ -244,16 +245,19 @@ case class GpuReplicateRows(children: Seq[Expression]) extends GpuGenerator with | |
case (e, index) => StructField(s"col$index", e.dataType) | ||
}) | ||
|
||
override def generate(inputBatch: ColumnarBatch, | ||
override def generate( | ||
inputBatches: Iterator[SpillableColumnarBatch], | ||
generatorOffset: Int, | ||
outer: Boolean): ColumnarBatch = { | ||
|
||
val schema = GpuColumnVector.extractTypes(inputBatch) | ||
|
||
withResource(GpuColumnVector.from(inputBatch)) { table => | ||
val replicateVector = table.getColumn(generatorOffset) | ||
withResource(table.repeat(replicateVector)) { replicatedTable => | ||
GpuColumnVector.from(replicatedTable, schema) | ||
outer: Boolean): Iterator[ColumnarBatch] = { | ||
withRetry(inputBatches, splitSpillableInHalfByRows) { attempt => | ||
withResource(attempt.getColumnarBatch()) { inputBatch => | ||
val schema = GpuColumnVector.extractTypes(inputBatch) | ||
withResource(GpuColumnVector.from(inputBatch)) { table => | ||
val replicateVector = table.getColumn(generatorOffset) | ||
withResource(table.repeat(replicateVector)) { replicatedTable => | ||
GpuColumnVector.from(replicatedTable, schema) | ||
} | ||
} | ||
} | ||
} | ||
} | ||
|
@@ -367,7 +371,7 @@ abstract class GpuExplodeBase extends GpuUnevaluableUnaryExpression with GpuGene | |
val inputRows = inputBatch.numRows() | ||
|
||
// if the number of input rows is 1 or less, cannot split | ||
if (inputRows <= 1) return Array() | ||
//if (inputRows <= 1) return Array() | ||
|
||
val vectors = GpuColumnVector.extractBases(inputBatch) | ||
|
||
|
@@ -528,13 +532,35 @@ abstract class GpuExplodeBase extends GpuUnevaluableUnaryExpression with GpuGene | |
/** | ||
* A function that will do the explode or position explode | ||
*/ | ||
private[this] def explodeFun(inputTable: Table, genOffset: Int, outer: Boolean): Table = { | ||
private[this] def explodeFun( | ||
inputTable: Table, | ||
genOffset: Int, | ||
outer: Boolean, | ||
fixUpOffset: Long): Table = { | ||
if (position) { | ||
if (outer) { | ||
val toFixUp = if (outer) { | ||
inputTable.explodeOuterPosition(genOffset) | ||
} else { | ||
inputTable.explodePosition(genOffset) | ||
} | ||
// the position column is at genOffset and the exploded column at genOffset + 1 | ||
val posColToFix = toFixUp.getColumn(genOffset) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit toFixUp needs to either be under withResource, or at least a closeOnException. |
||
val fixedUpPosCol = withResource(Scalar.fromLong(fixUpOffset)) { offset => | ||
posColToFix.add(offset, posColToFix.getType) | ||
} | ||
withResource(fixedUpPosCol) { _ => | ||
withResource(toFixUp) { _ => | ||
val newCols = new Array[ColumnVector](toFixUp.getNumberOfColumns) | ||
(0 until genOffset).foreach { b => | ||
newCols(b) = toFixUp.getColumn(b) | ||
} | ||
newCols(genOffset) = fixedUpPosCol | ||
(genOffset + 1 until toFixUp.getNumberOfColumns).foreach { a => | ||
newCols(a) = toFixUp.getColumn(a) | ||
} | ||
jlowe marked this conversation as resolved.
Show resolved
Hide resolved
|
||
new Table(newCols: _*) | ||
} | ||
} | ||
} else { | ||
if (outer) { | ||
inputTable.explodeOuter(genOffset) | ||
|
@@ -544,27 +570,135 @@ abstract class GpuExplodeBase extends GpuUnevaluableUnaryExpression with GpuGene | |
} | ||
} | ||
|
||
override def generate(inputBatch: ColumnarBatch, | ||
def generateSplitSpillableInHalfByRows( | ||
genOffset: Int): BatchToGenerate => Seq[BatchToGenerate] = { | ||
(batchToGenerate: BatchToGenerate) => { | ||
withResource(batchToGenerate) { _ => | ||
val spillable = batchToGenerate.spillable | ||
val toSplitRows = spillable.numRows() | ||
if (toSplitRows == 1) { | ||
// single row, we need to actually add row duplicates, then split | ||
val tbl = withResource(spillable.getColumnarBatch()) { src => | ||
GpuColumnVector.from(src) | ||
} | ||
withResource(tbl) { _ => | ||
if (tbl.getColumn(genOffset).getNullCount == 1) { | ||
// 1 row, and the element to explode is null, we cannot | ||
// handle this case, and should never reach it either. | ||
throw new GpuSplitAndRetryOOM( | ||
"GPU OutOfMemory: cannot split a batch of 1 rows when " + | ||
"the list to explode is null!") | ||
} | ||
val explodingColList = tbl.getColumn(genOffset) | ||
val explodedTbl = { | ||
withResource(explodingColList.getChildColumnView(0)) { | ||
explodingColElements => | ||
if (explodingColElements.getRowCount < 2) { | ||
throw new GpuSplitAndRetryOOM( | ||
"GPU OutOfMemory: cannot split a batch of 1 rows when " + | ||
"the list has a single element!") | ||
} | ||
withResource(explodingColElements.copyToColumnVector()) { col => | ||
new Table(col) | ||
} | ||
} | ||
} | ||
|
||
// Because we are likely to be here solving a cuDF column length limit | ||
// we are just going to split in half blindly until we can fit it. | ||
val splits = withResource(explodedTbl) { _ => | ||
explodedTbl.contiguousSplit(explodedTbl.getRowCount.toInt / 2) | ||
} | ||
|
||
val result = new Array[BatchToGenerate](splits.size) | ||
closeOnExcept(result) { _ => | ||
closeOnExcept(splits) { _ => | ||
// start our fixup offset to the one previously computed | ||
// in the last retry. We will need to start again at this offset, | ||
// and sub-lists we generate need to account for it if `pos_explode`. | ||
var offset = batchToGenerate.fixUpOffset | ||
(0 until splits.length).foreach { split => | ||
val explodedTblToConvertBack = splits(split) | ||
val cols = new Array[ColumnVector](tbl.getNumberOfColumns) | ||
withResource(explodedTblToConvertBack) { _ => | ||
val colToReconstitute = explodedTblToConvertBack.getTable.getColumn(0) | ||
closeOnExcept(cols) { _ => | ||
(0 until tbl.getNumberOfColumns).foreach { col => | ||
cols(col) = if (col == genOffset) { | ||
// turn this column back onto a list, by pairing it with an offset | ||
// column that is the length of the list. | ||
require(colToReconstitute.getRowCount < Int.MaxValue, | ||
s"The number of elements in the list to explode " + | ||
s"${colToReconstitute.getRowCount} is larger than " + | ||
"cuDF column row limits.") | ||
withResource( | ||
ColumnVector.fromInts(0, colToReconstitute.getRowCount.toInt)) { | ||
offsets => | ||
colToReconstitute.makeListFromOffsets(1, offsets) | ||
} | ||
} else { | ||
tbl.getColumn(0).incRefCount() | ||
} | ||
} | ||
} | ||
} | ||
val newOffset = splits(split).getRowCount | ||
splits(split) = null | ||
val finalTbl = withResource(cols) { _ => | ||
new Table(cols: _*) | ||
} | ||
withResource(finalTbl) { _ => | ||
result(split) = new BatchToGenerate(offset, SpillableColumnarBatch( | ||
GpuColumnVector.from(finalTbl, spillable.dataTypes), | ||
SpillPriorities.ACTIVE_BATCHING_PRIORITY)) | ||
|
||
// we update our fixup offset for future batches, as they will | ||
// need to add the number of rows seen so far to the position column | ||
// if this is a `pos_explode` | ||
offset += newOffset | ||
} | ||
} | ||
} | ||
} | ||
result | ||
} | ||
} else { | ||
// more than 1 rows, we just do the regular split-by-rows, | ||
// we need to keep track of the fixup offset | ||
RmmRapidsRetryIterator.splitSpillableInHalfByRows(spillable) | ||
.map(new BatchToGenerate(batchToGenerate.fixUpOffset, _)) | ||
} | ||
} | ||
} | ||
} | ||
|
||
override def generate( | ||
inputSpillables: Iterator[SpillableColumnarBatch], | ||
generatorOffset: Int, | ||
outer: Boolean): ColumnarBatch = { | ||
|
||
require(inputBatch.numCols() - 1 == generatorOffset, | ||
s"Internal Error ${getClass.getSimpleName} supports one and only one input attribute.") | ||
val schema = resultSchema(GpuColumnVector.extractTypes(inputBatch), generatorOffset) | ||
|
||
withResource(GpuColumnVector.from(inputBatch)) { table => | ||
withResource(explodeFun(table, generatorOffset, outer)) { exploded => | ||
child.dataType match { | ||
case _: ArrayType => | ||
GpuColumnVector.from(exploded, schema) | ||
case MapType(kt, vt, _) => | ||
// We need to pull the key and value of of the struct column | ||
withResource(convertMapOutput(exploded, generatorOffset, kt, vt, outer)) { fixed => | ||
GpuColumnVector.from(fixed, schema) | ||
outer: Boolean): Iterator[ColumnarBatch] = { | ||
val batchesToGenerate = inputSpillables.map(new BatchToGenerate(0, _)) | ||
withRetry(batchesToGenerate, generateSplitSpillableInHalfByRows(generatorOffset)) { attempt => | ||
withResource(attempt.spillable.getColumnarBatch()) { inputBatch => | ||
require(inputBatch.numCols() - 1 == generatorOffset, | ||
s"Internal Error ${getClass.getSimpleName} supports one and only one input attribute.") | ||
val schema = resultSchema(GpuColumnVector.extractTypes(inputBatch), generatorOffset) | ||
|
||
withResource(GpuColumnVector.from(inputBatch)) { table => | ||
withResource( | ||
explodeFun(table, generatorOffset, outer, attempt.fixUpOffset)) { exploded => | ||
child.dataType match { | ||
case _: ArrayType => | ||
GpuColumnVector.from(exploded, schema) | ||
case MapType(kt, vt, _) => | ||
// We need to pull the key and value of of the struct column | ||
withResource(convertMapOutput(exploded, generatorOffset, kt, vt, outer)) { fixed => | ||
GpuColumnVector.from(fixed, schema) | ||
} | ||
case other => | ||
throw new IllegalArgumentException( | ||
s"$other is not supported as explode input right now") | ||
} | ||
case other => | ||
throw new IllegalArgumentException( | ||
s"$other is not supported as explode input right now") | ||
} | ||
} | ||
} | ||
} | ||
|
@@ -815,6 +949,13 @@ case class GpuGenerateExec( | |
} | ||
} | ||
|
||
class BatchToGenerate(val fixUpOffset: Long, val spillable: SpillableColumnarBatch) | ||
extends AutoCloseable { | ||
override def close(): Unit = { | ||
spillable.close() | ||
} | ||
} | ||
|
||
class GpuGenerateIterator( | ||
inputs: Seq[SpillableColumnarBatch], | ||
generator: GpuGenerator, | ||
|
@@ -827,16 +968,15 @@ class GpuGenerateIterator( | |
inputs.foreach(scb => use(scb)) | ||
|
||
// apply generation on each (sub)batch | ||
private val retryIter = withRetry(inputs.toIterator, splitSpillableInHalfByRows) { attempt => | ||
withResource(attempt.getColumnarBatch()) { batch => | ||
generator.generate(batch, generatorOffset, outer) | ||
} | ||
private val generateIter = { | ||
generator.generate(inputs.iterator, generatorOffset, outer) | ||
} | ||
|
||
override def hasNext: Boolean = retryIter.hasNext | ||
override def hasNext: Boolean = generateIter.hasNext | ||
|
||
override def next(): ColumnarBatch = { | ||
withResource(new NvtxWithMetrics("GpuGenerateIterator", NvtxColor.PURPLE, opTime)) { _ => | ||
val cb = retryIter.next() | ||
val cb = generateIter.next() | ||
numOutputBatches += 1 | ||
numOutputRows += cb.numRows() | ||
cb | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would love to see us push
inputSplitIndices
into the individual generate expressions. But that can be follow on as it really is just tech debt.Also why is the row commented out?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice catch, that should not be commented out.
I'll look into the inputSplitIndices refactor.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Filed this for the refactor of
inputSplitIndices
and more... #10187. I don't want to block this issue further.