With a single row GpuExplode tries to split the generator array #10131

Merged
merged 9 commits on Jan 11, 2024
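In short: when an explode/pos_explode retry hits out-of-memory on a batch that is already down to a single row, splitting by rows can go no further, so this change teaches GpuExplode to split the generator's input array itself into sub-lists and explode those, tracking a running position offset so pos_explode output stays correct. A minimal pure-Scala model of the idea (the types and names below are invented for illustration; the real code operates on cuDF tables and SpillableColumnarBatch):

```scala
// Toy model: a single-row batch whose list column must be exploded.
final case class ToyBatch(otherCols: Vector[Int], listToExplode: Vector[String])

// A multi-row batch can be split by rows; a single-row batch that still OOMs
// must instead be split by the elements of the list it is about to explode.
def splitSingleRow(b: ToyBatch): Seq[ToyBatch] = {
  require(b.listToExplode.size >= 2, "cannot split a single-element list")
  val (left, right) = b.listToExplode.splitAt(b.listToExplode.size / 2)
  Seq(b.copy(listToExplode = left), b.copy(listToExplode = right))
}
```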
221 changes: 180 additions & 41 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuGenerateExec.scala
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2023, NVIDIA CORPORATION.
* Copyright (c) 2020-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -24,6 +24,7 @@ import com.nvidia.spark.rapids.GpuOverrides.extractLit
import com.nvidia.spark.rapids.RapidsPluginImplicits.AutoCloseableProducingArray
import com.nvidia.spark.rapids.RmmRapidsRetryIterator.{splitSpillableInHalfByRows, withRetry}
import com.nvidia.spark.rapids.ScalableTaskCompletion.onTaskCompletion
import com.nvidia.spark.rapids.jni.GpuSplitAndRetryOOM
import com.nvidia.spark.rapids.shims.{ShimExpression, ShimUnaryExecNode}

import org.apache.spark.rdd.RDD
@@ -163,16 +164,16 @@ trait GpuGenerator extends GpuUnevaluable {
* that generator is an integrated Table transformer instead of column transformer in terms of
* cuDF.
*
* @param inputBatch projected input data, which ensures appending columns are ahead of
* generators' inputs. So, generators can distinguish them with an offset.
* @param batches iterator of spillable batches
* @param generatorOffset column offset of generator's input columns in each batch
* @param outer when true, each input row will be output at least once, even if the
* output of the given `generator` is empty.
* @return result ColumnarBatch
*/
def generate(inputBatch: ColumnarBatch,
def generate(
batches: Iterator[SpillableColumnarBatch],
generatorOffset: Int,
outer: Boolean): ColumnarBatch
outer: Boolean): Iterator[ColumnarBatch]

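As the `@param outer` doc above notes, explode_outer must keep rows whose generator input is empty or null. A tiny Scala model of that semantic difference (method name invented):

```scala
// explode vs. explode_outer for one row whose list may be empty or null
def explodeRow(list: Option[Seq[Int]], outer: Boolean): Seq[Option[Int]] =
  list match {
    case Some(xs) if xs.nonEmpty => xs.map(Some(_)) // normal case: one row per element
    case _ if outer              => Seq(None)       // outer: keep the row, null element
    case _                       => Seq.empty       // inner: drop the row entirely
  }
```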
/**
* Compute split indices for generator's input batches.
@@ -244,16 +245,19 @@ case class GpuReplicateRows(children: Seq[Expression]) extends GpuGenerator with
case (e, index) => StructField(s"col$index", e.dataType)
})

override def generate(inputBatch: ColumnarBatch,
override def generate(
inputBatches: Iterator[SpillableColumnarBatch],
generatorOffset: Int,
outer: Boolean): ColumnarBatch = {

val schema = GpuColumnVector.extractTypes(inputBatch)

withResource(GpuColumnVector.from(inputBatch)) { table =>
val replicateVector = table.getColumn(generatorOffset)
withResource(table.repeat(replicateVector)) { replicatedTable =>
GpuColumnVector.from(replicatedTable, schema)
outer: Boolean): Iterator[ColumnarBatch] = {
withRetry(inputBatches, splitSpillableInHalfByRows) { attempt =>
withResource(attempt.getColumnarBatch()) { inputBatch =>
val schema = GpuColumnVector.extractTypes(inputBatch)
withResource(GpuColumnVector.from(inputBatch)) { table =>
val replicateVector = table.getColumn(generatorOffset)
withResource(table.repeat(replicateVector)) { replicatedTable =>
GpuColumnVector.from(replicatedTable, schema)
}
}
}
}
}
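GpuReplicateRows above now routes every attempt through withRetry with splitSpillableInHalfByRows as the split policy. The contract, roughly: run the body; if the GPU signals a retryable OOM, split the offending input and retry the pieces. A toy model of that loop (the real RmmRapidsRetryIterator uses dedicated GpuRetryOOM/GpuSplitAndRetryOOM control-flow exceptions and spillable buffers, none of which appear here):

```scala
// Toy retry-with-split: apply f to each item; on failure, split and retry parts.
def withRetryToy[A, B](items: Iterator[A], split: A => Seq[A])(f: A => B): Iterator[B] =
  items.flatMap { item =>
    def attempt(a: A): Seq[B] =
      try Seq(f(a))
      catch { case _: RuntimeException => split(a).flatMap(attempt) }
    attempt(item)
  }
```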
@@ -528,13 +532,34 @@ abstract class GpuExplodeBase extends GpuUnevaluableUnaryExpression with GpuGene
/**
* A function that will do the explode or position explode
*/
private[this] def explodeFun(inputTable: Table, genOffset: Int, outer: Boolean): Table = {
private[this] def explodeFun(
inputTable: Table,
genOffset: Int,
outer: Boolean,
fixUpOffset: Long): Table = {
if (position) {
if (outer) {
val toFixUp = if (outer) {
inputTable.explodeOuterPosition(genOffset)
} else {
inputTable.explodePosition(genOffset)
}
// the position column is at genOffset and the exploded column at genOffset + 1
withResource(toFixUp) { _ =>
val posColToFix = toFixUp.getColumn(genOffset)
val fixedUpPosCol = withResource(Scalar.fromLong(fixUpOffset)) { offset =>
posColToFix.add(offset, posColToFix.getType)
}
withResource(fixedUpPosCol) { _ =>
val newCols = (0 until toFixUp.getNumberOfColumns).map { i =>
if (i == genOffset) {
fixedUpPosCol
} else {
toFixUp.getColumn(i)
}
}.toArray
new Table(newCols: _*)
}
}
} else {
if (outer) {
inputTable.explodeOuter(genOffset)
@@ -544,27 +569,135 @@ abstract class GpuExplodeBase extends GpuUnevaluableUnaryExpression with GpuGene
}
}

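The fixUpOffset parameter exists because cuDF's explodePosition numbers positions from zero within each batch; once a single row has been split into sub-lists, later sub-batches must shift their position column by the number of elements already emitted. A pure-Scala sketch of that bookkeeping:

```scala
// Positions restart at 0 in every split, so adding the running offset restores
// the global position the unsplit pos_explode would have produced.
val original = Vector("a", "b", "c", "d")
val parts    = original.grouped(2).toVector // two sub-lists after a split
var fixUpOffset = 0L
val fixed = parts.flatMap { part =>
  val withPos = part.zipWithIndex.map { case (v, i) => (i + fixUpOffset, v) }
  fixUpOffset += part.size
  withPos
}
assert(fixed == Vector((0L, "a"), (1L, "b"), (2L, "c"), (3L, "d")))
```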
override def generate(inputBatch: ColumnarBatch,
def generateSplitSpillableInHalfByRows(
genOffset: Int): BatchToGenerate => Seq[BatchToGenerate] = {
(batchToGenerate: BatchToGenerate) => {
withResource(batchToGenerate) { _ =>
val spillable = batchToGenerate.spillable
val toSplitRows = spillable.numRows()
if (toSplitRows == 1) {
// single row: we need to split the elements of the list itself, then rebuild sub-lists
val tbl = withResource(spillable.getColumnarBatch()) { src =>
GpuColumnVector.from(src)
}
withResource(tbl) { _ =>
if (tbl.getColumn(genOffset).getNullCount == 1) {
// 1 row, and the element to explode is null, we cannot
// handle this case, and should never reach it either.
throw new GpuSplitAndRetryOOM(
"GPU OutOfMemory: cannot split a batch of 1 rows when " +
"the list to explode is null!")
}
val explodingColList = tbl.getColumn(genOffset)
val explodedTbl = {
withResource(explodingColList.getChildColumnView(0)) {
explodingColElements =>
if (explodingColElements.getRowCount < 2) {
throw new GpuSplitAndRetryOOM(
"GPU OutOfMemory: cannot split a batch of 1 rows when " +
"the list has a single element!")
}
withResource(explodingColElements.copyToColumnVector()) { col =>
new Table(col)
}
}
}

// Because we are likely here due to a cuDF column length limit,
// we just split in half blindly until the result fits.
val splits = withResource(explodedTbl) { _ =>
explodedTbl.contiguousSplit(explodedTbl.getRowCount.toInt / 2)
}

val result = new Array[BatchToGenerate](splits.size)
closeOnExcept(result) { _ =>
closeOnExcept(splits) { _ =>
// Start our fixup offset at the one previously computed
// in the last retry. We will need to start again at this offset,
// and sub-lists we generate need to account for it if this is a `pos_explode`.
var offset = batchToGenerate.fixUpOffset
(0 until splits.length).foreach { split =>
val explodedTblToConvertBack = splits(split)
val cols = new Array[ColumnVector](tbl.getNumberOfColumns)
withResource(explodedTblToConvertBack) { _ =>
val colToReconstitute = explodedTblToConvertBack.getTable.getColumn(0)
closeOnExcept(cols) { _ =>
(0 until tbl.getNumberOfColumns).foreach { col =>
cols(col) = if (col == genOffset) {
// turn this column back into a list, by pairing it with an offset
// column that is the length of the list.
require(colToReconstitute.getRowCount < Int.MaxValue,
s"The number of elements in the list to explode " +
s"${colToReconstitute.getRowCount} is larger than " +
"cuDF column row limits.")
withResource(
ColumnVector.fromInts(0, colToReconstitute.getRowCount.toInt)) {
offsets =>
colToReconstitute.makeListFromOffsets(1, offsets)
}
} else {
tbl.getColumn(col).incRefCount()
}
}
}
}
val newOffset = splits(split).getRowCount
splits(split) = null
val finalTbl = withResource(cols) { _ =>
new Table(cols: _*)
}
withResource(finalTbl) { _ =>
result(split) = new BatchToGenerate(offset, SpillableColumnarBatch(
GpuColumnVector.from(finalTbl, spillable.dataTypes),
SpillPriorities.ACTIVE_BATCHING_PRIORITY))

// we update our fixup offset for future batches, as they will
// need to add the number of rows seen so far to the position column
// if this is a `pos_explode`
offset += newOffset
}
}
}
}
result
}
} else {
// more than 1 row, we just do the regular split-by-rows;
// we need to keep track of the fixup offset
RmmRapidsRetryIterator.splitSpillableInHalfByRows(spillable)
.map(new BatchToGenerate(batchToGenerate.fixUpOffset, _))
}
}
}
}
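After contiguousSplit, each piece holds bare elements; makeListFromOffsets pairs them with an offsets column [0, n] to rebuild a one-row list column. A pure-Scala analogue of the offsets encoding that cuDF list columns use:

```scala
// A list column is (elements, offsets): row i spans elements(offsets(i) until offsets(i+1)).
// One list row holding all n elements therefore uses offsets Vector(0, n).
def listsFromOffsets[T](elements: Vector[T], offsets: Vector[Int]): Vector[Vector[T]] =
  offsets.sliding(2).map { case Seq(s, e) => elements.slice(s, e) }.toVector

val elems = Vector(10, 20, 30, 40)
assert(listsFromOffsets(elems, Vector(0, elems.length)) == Vector(elems))
```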

override def generate(
inputSpillables: Iterator[SpillableColumnarBatch],
generatorOffset: Int,
outer: Boolean): ColumnarBatch = {

require(inputBatch.numCols() - 1 == generatorOffset,
s"Internal Error ${getClass.getSimpleName} supports one and only one input attribute.")
val schema = resultSchema(GpuColumnVector.extractTypes(inputBatch), generatorOffset)

withResource(GpuColumnVector.from(inputBatch)) { table =>
withResource(explodeFun(table, generatorOffset, outer)) { exploded =>
child.dataType match {
case _: ArrayType =>
GpuColumnVector.from(exploded, schema)
case MapType(kt, vt, _) =>
// We need to pull the key and value out of the struct column
withResource(convertMapOutput(exploded, generatorOffset, kt, vt, outer)) { fixed =>
GpuColumnVector.from(fixed, schema)
outer: Boolean): Iterator[ColumnarBatch] = {
val batchesToGenerate = inputSpillables.map(new BatchToGenerate(0, _))
withRetry(batchesToGenerate, generateSplitSpillableInHalfByRows(generatorOffset)) { attempt =>
withResource(attempt.spillable.getColumnarBatch()) { inputBatch =>
require(inputBatch.numCols() - 1 == generatorOffset,
s"Internal Error ${getClass.getSimpleName} supports one and only one input attribute.")
val schema = resultSchema(GpuColumnVector.extractTypes(inputBatch), generatorOffset)

withResource(GpuColumnVector.from(inputBatch)) { table =>
withResource(
explodeFun(table, generatorOffset, outer, attempt.fixUpOffset)) { exploded =>
child.dataType match {
case _: ArrayType =>
GpuColumnVector.from(exploded, schema)
case MapType(kt, vt, _) =>
// We need to pull the key and value out of the struct column
withResource(convertMapOutput(exploded, generatorOffset, kt, vt, outer)) { fixed =>
GpuColumnVector.from(fixed, schema)
}
case other =>
throw new IllegalArgumentException(
s"$other is not supported as explode input right now")
}
case other =>
throw new IllegalArgumentException(
s"$other is not supported as explode input right now")
}
}
}
}
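When the generator input is a MapType, the exploded column is a struct of (key, value); convertMapOutput then pulls those apart into two top-level columns. The shape change, modeled in plain Scala:

```scala
// One map row explodes to one (key, value) pair per entry, i.e. two output columns.
def explodeMapRow[K, V](row: Map[K, V]): Seq[(K, V)] = row.toSeq
// explodeMapRow(Map("a" -> 1, "b" -> 2)) == Seq(("a", 1), ("b", 2))
```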
@@ -815,6 +948,13 @@ case class GpuGenerateExec(
}
}

class BatchToGenerate(val fixUpOffset: Long, val spillable: SpillableColumnarBatch)
extends AutoCloseable {
override def close(): Unit = {
spillable.close()
}
}

class GpuGenerateIterator(
inputs: Seq[SpillableColumnarBatch],
generator: GpuGenerator,
@@ -827,16 +967,15 @@ class GpuGenerateIterator(
inputs.foreach(scb => use(scb))

// apply generation on each (sub)batch
private val retryIter = withRetry(inputs.toIterator, splitSpillableInHalfByRows) { attempt =>
withResource(attempt.getColumnarBatch()) { batch =>
generator.generate(batch, generatorOffset, outer)
}
private val generateIter = {
generator.generate(inputs.iterator, generatorOffset, outer)
}

override def hasNext: Boolean = retryIter.hasNext
override def hasNext: Boolean = generateIter.hasNext

override def next(): ColumnarBatch = {
withResource(new NvtxWithMetrics("GpuGenerateIterator", NvtxColor.PURPLE, opTime)) { _ =>
val cb = retryIter.next()
val cb = generateIter.next()
numOutputBatches += 1
numOutputRows += cb.numRows()
cb
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -19,6 +19,7 @@ package com.nvidia.spark.rapids
import ai.rapids.cudf.Scalar
import com.nvidia.spark.rapids.Arm._
import com.nvidia.spark.rapids.RapidsPluginImplicits._
import com.nvidia.spark.rapids.RmmRapidsRetryIterator.{splitSpillableInHalfByRows, withRetry}
import com.nvidia.spark.rapids.shims.ShimExpression

import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
@@ -49,33 +50,36 @@ case class GpuJsonTuple(children: Seq[Expression]) extends GpuGenerator
}
}

override def generate(inputBatch: ColumnarBatch,
override def generate(
inputBatches: Iterator[SpillableColumnarBatch],
generatorOffset: Int,
outer: Boolean): ColumnarBatch = {
outer: Boolean): Iterator[ColumnarBatch] = {
withRetry(inputBatches, splitSpillableInHalfByRows) { attempt =>
// Note: the batch materialized here is never closed, so it leaks;
// it should be wrapped in withResource like the other generate implementations.
val inputBatch = attempt.getColumnarBatch()

val json = inputBatch.column(generatorOffset).asInstanceOf[GpuColumnVector].getBase
val schema = Array.fill[DataType](fieldExpressions.length)(StringType)
val json = inputBatch.column(generatorOffset).asInstanceOf[GpuColumnVector].getBase
val schema = Array.fill[DataType](fieldExpressions.length)(StringType)

val fieldScalars = fieldExpressions.safeMap { field =>
withResourceIfAllowed(field.columnarEvalAny(inputBatch)) { fieldVal =>
fieldVal match {
val fieldScalars = fieldExpressions.safeMap { field =>
withResourceIfAllowed(field.columnarEvalAny(inputBatch)) {
case fieldScalar: GpuScalar =>
// Special characters like '.', '[', ']' are not supported in field names
Scalar.fromString("$." + fieldScalar.getBase.getJavaString)
case _ => throw new UnsupportedOperationException(s"JSON field must be a scalar value")
}
}
}

withResource(fieldScalars) { fieldScalars =>
withResource(fieldScalars.safeMap(field => json.getJSONObject(field))) { resultCols =>
val generatorCols = resultCols.safeMap(_.incRefCount).zip(schema).safeMap {
case (col, dataType) => GpuColumnVector.from(col, dataType)
}
val nonGeneratorCols = (0 until generatorOffset).safeMap { i =>
inputBatch.column(i).asInstanceOf[GpuColumnVector].incRefCount
withResource(fieldScalars) { fieldScalars =>
withResource(fieldScalars.safeMap(field => json.getJSONObject(field))) { resultCols =>
val generatorCols = resultCols.safeMap(_.incRefCount).zip(schema).safeMap {
case (col, dataType) => GpuColumnVector.from(col, dataType)
}
val nonGeneratorCols = (0 until generatorOffset).safeMap { i =>
inputBatch.column(i).asInstanceOf[GpuColumnVector].incRefCount
}
new ColumnarBatch((nonGeneratorCols ++ generatorCols).toArray, inputBatch.numRows)
}
new ColumnarBatch((nonGeneratorCols ++ generatorCols).toArray, inputBatch.numRows)
}
}
}
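json_tuple reaches each requested field through a get_json_object-style lookup, so every field name is turned into the JSONPath "$.<name>"; as the comment above says, names containing '.', '[' or ']' cannot be expressed this way. A sketch of the path construction (helper name invented):

```scala
// Build the JSONPath used to extract one requested field.
def fieldToJsonPath(field: String): String = {
  val unsupported = Set('.', '[', ']')
  require(!field.exists(unsupported), s"unsupported character in JSON field name: $field")
  "$." + field
}
assert(fieldToJsonPath("name") == "$.name")
```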