diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarToRowExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarToRowExec.scala
index 80b324de0ae..72a2edaef31 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarToRowExec.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarToRowExec.scala
@@ -20,8 +20,9 @@ import scala.annotation.tailrec
 import scala.collection.mutable.Queue
 
 import ai.rapids.cudf.{Cuda, HostColumnVector, NvtxColor, Table}
-import com.nvidia.spark.rapids.Arm.withResource
+import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource}
 import com.nvidia.spark.rapids.RapidsPluginImplicits._
+import com.nvidia.spark.rapids.RmmRapidsRetryIterator.splitSpillableInHalfByRows
 import com.nvidia.spark.rapids.ScalableTaskCompletion.onTaskCompletion
 import com.nvidia.spark.rapids.shims.ShimUnaryExecNode
 
@@ -73,7 +74,12 @@ class AcceleratedColumnarToRowIterator(
   private var at: Int = 0
   private var total: Int = 0
 
-  onTaskCompletion(closeAllPendingBatches())
+  // Don't install the callback when running in a unit test (no TaskContext)
+  Option(TaskContext.get()).foreach { tc =>
+    onTaskCompletion(tc) {
+      closeAllPendingBatches()
+    }
+  }
 
   private def setCurrentBatch(wip: HostColumnVector): Unit = {
     currentCv = Some(wip)
@@ -100,34 +106,42 @@ class AcceleratedColumnarToRowIterator(
     new Table(rearrangedColumns : _*)
   }
 
-  private[this] def setupBatch(cb: ColumnarBatch): Boolean = {
+  private[this] def setupBatchAndClose(scb: SpillableColumnarBatch): Boolean = {
     numInputBatches += 1
     // In order to match the numOutputRows metric in the generated code we update
     // numOutputRows for each batch. This is less accurate than doing it at output
     // because it will over count the number of rows output in the case of a limit,
     // but it is more efficient.
-    numOutputRows += cb.numRows()
-    if (cb.numRows() > 0) {
+    numOutputRows += scb.numRows()
+    if (scb.numRows() > 0) {
       withResource(new NvtxWithMetrics("ColumnarToRow: batch", NvtxColor.RED, opTime)) { _ =>
-        withResource(rearrangeRows(cb)) { table =>
-          // The fixed-width optimized cudf kernel only supports up to 1.5 KB per row which means at
-          // most 184 double/long values. Spark by default limits codegen to 100 fields
-          // "spark.sql.codegen.maxFields". So, we are going to be cautious and start with that
-          // until we have tested it more. We branching over the size of the output to know which
-          // kernel to call. If schema.length < 100 we call the fixed-width optimized version,
-          // otherwise the generic one
-          withResource(if (schema.length < 100) {
-            table.convertToRowsFixedWidthOptimized()
-          } else {
-            table.convertToRows()
-          }) { rowsCvList =>
+        val it = RmmRapidsRetryIterator.withRetry(scb, splitSpillableInHalfByRows) { attempt =>
+          withResource(attempt.getColumnarBatch()) { attemptCb =>
+            withResource(rearrangeRows(attemptCb)) { table =>
+              // The fixed-width optimized cudf kernel only supports up to 1.5 KB per row which
+              // means at most 184 double/long values. Spark by default limits codegen to 100
+              // fields "spark.sql.codegen.maxFields". So, we are going to be cautious and
+              // start with that until we have tested it more. We branch on the size of
+              // the output to know which kernel to call. If schema.length < 100 we call the
+              // fixed-width optimized version, otherwise the generic one
+              if (schema.length < 100) {
+                table.convertToRowsFixedWidthOptimized()
+              } else {
+                table.convertToRows()
+              }
+            }
+          }
+        }
+        assert(it.hasNext, "Got an unexpected empty iterator after setting up batch with retry")
+        it.foreach { rowsCvList =>
+          withResource(rowsCvList) { _ =>
             rowsCvList.foreach { rowsCv =>
               pendingCvs += rowsCv.copyToHost()
             }
-            setCurrentBatch(pendingCvs.dequeue())
-            return true
           }
         }
+        setCurrentBatch(pendingCvs.dequeue())
+        return true
       }
     }
     false
@@ -148,16 +162,20 @@ class AcceleratedColumnarToRowIterator(
     // keep fetching input batches until we have a non-empty batch ready
     val nextBatch = fetchNextBatch()
     if (nextBatch.isDefined) {
-      if (!withResource(nextBatch.get)(setupBatch)) {
+      if (!setupBatchAndClose(nextBatch.get)) {
         populateBatch()
       }
     }
   }
 
-  private def fetchNextBatch(): Option[ColumnarBatch] = {
+  private def fetchNextBatch(): Option[SpillableColumnarBatch] = {
     withResource(new NvtxWithMetrics("ColumnarToRow: fetch", NvtxColor.BLUE, streamTime)) { _ =>
       if (batches.hasNext) {
-        Some(batches.next())
+        // Make the batch spillable as soon as it is fetched.
+        val spillBatch = closeOnExcept(batches.next()) { cb =>
+          SpillableColumnarBatch(cb, SpillPriorities.ACTIVE_ON_DECK_PRIORITY)
+        }
+        Some(spillBatch)
       } else {
         None
       }
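Reviewer note: `RmmRapidsRetryIterator.withRetry` takes ownership of the spillable batch and closes it once the returned iterator is drained. On a retryable OOM it re-runs the body; on a split-and-retry OOM it first splits the input with `splitSpillableInHalfByRows`, which is why the iterator can now yield several row-converted results for a single input batch, and why the host copies are drained with `it.foreach` before `setCurrentBatch` runs. A minimal sketch of the pattern, separate from the patch itself; `convertBatch` is a hypothetical stand-in for the real per-attempt work (`rearrangeRows` plus the cudf row conversion), while the other names are the plugin APIs already used above:

    package com.nvidia.spark.rapids

    import ai.rapids.cudf.Table
    import com.nvidia.spark.rapids.Arm.withResource
    import com.nvidia.spark.rapids.RmmRapidsRetryIterator.{splitSpillableInHalfByRows, withRetry}

    import org.apache.spark.sql.vectorized.ColumnarBatch

    object RetrySketch {
      // Hypothetical stand-in for the real per-attempt work.
      private def convertBatch(cb: ColumnarBatch): Table = ???

      def convertWithRetry(scb: SpillableColumnarBatch): Unit = {
        // withRetry owns `scb` and closes it when the iterator is exhausted.
        // The body can run more than once (after a retry OOM) and over split
        // halves (after a split-and-retry OOM), so it must only use the
        // attempt it is handed, never the original batch.
        val results = withRetry(scb, splitSpillableInHalfByRows) { attempt =>
          withResource(attempt.getColumnarBatch()) { cb =>
            convertBatch(cb)
          }
        }
        // One result per successful attempt: a single Table when nothing was
        // split, several when the batch had to be split to fit in memory.
        results.foreach { table =>
          withResource(table) { _ =>
            // consume the converted rows here
          }
        }
      }
    }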
diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/ColumnToRowIteratorRetrySuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/ColumnToRowIteratorRetrySuite.scala
new file mode 100644
index 00000000000..6b66f4ecca7
--- /dev/null
+++ b/tests/src/test/scala/com/nvidia/spark/rapids/ColumnToRowIteratorRetrySuite.scala
@@ -0,0 +1,64 @@
+/*
+ * Copyright (c) 2023, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.nvidia.spark.rapids
+
+import ai.rapids.cudf.ColumnVector
+import com.nvidia.spark.rapids.jni.RmmSpark
+
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, ExprId}
+import org.apache.spark.sql.types.IntegerType
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+class ColumnToRowIteratorRetrySuite extends RmmSparkRetrySuiteBase {
+
+  private val ref = GpuBoundReference(0, IntegerType, nullable = false)(ExprId(0), "a")
+  private val attrs = Seq(AttributeReference(ref.name, ref.dataType, ref.nullable)())
+  private val NUM_ROWS = 50
+
+  private def buildBatch: ColumnarBatch = {
+    val ints = 0 until NUM_ROWS
+    new ColumnarBatch(
+      Array(GpuColumnVector.from(ColumnVector.fromInts(ints: _*), IntegerType)), NUM_ROWS)
+  }
+
+  test("Accelerated columnar to row with retry OOM") {
+    val aCol2RowIter = new AcceleratedColumnarToRowIterator(
+      attrs,
+      Iterator(buildBatch),
+      NoopMetric, NoopMetric, NoopMetric, NoopMetric)
+    RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId)
+    var numRows = 0
+    aCol2RowIter.foreach { _ =>
+      numRows += 1
+    }
+    assertResult(NUM_ROWS)(numRows)
+  }
+
+  test("Accelerated columnar to row with split and retry OOM") {
+    val aCol2RowIter = new AcceleratedColumnarToRowIterator(
+      attrs,
+      Iterator(buildBatch),
+      NoopMetric, NoopMetric, NoopMetric, NoopMetric)
+    RmmSpark.forceSplitAndRetryOOM(RmmSpark.getCurrentThreadId)
+    var numRows = 0
+    aCol2RowIter.foreach { _ =>
+      numRows += 1
+    }
+    assertResult(NUM_ROWS)(numRows)
+  }
+
+}
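Two reviewer notes on the tests. First, they construct `AcceleratedColumnarToRowIterator` directly on the test thread, where `TaskContext.get()` returns null; that is exactly why the main change guards the `onTaskCompletion` registration, which previously ran unconditionally. Second, `forceRetryOOM` and `forceSplitAndRetryOOM` inject a synthetic retryable OOM into the next GPU allocation on the given thread, so the assertions check that all 50 rows still come out after a retry or after a split. Separately, the fetch-side wrapping in `fetchNextBatch` follows the plugin's usual ownership convention; a minimal sketch, under the assumption that `SpillableColumnarBatch` takes over the batch and may spill it off the GPU until `getColumnarBatch()` is next called:

    package com.nvidia.spark.rapids

    import com.nvidia.spark.rapids.Arm.closeOnExcept

    import org.apache.spark.sql.vectorized.ColumnarBatch

    object SpillableFetchSketch {
      // Wrap a freshly fetched batch so the memory framework can spill it
      // while it waits to be processed. closeOnExcept closes `batch` if the
      // wrapping throws, so the batch cannot leak on the failure path; on
      // success the wrapper owns the batch and the caller must close the
      // wrapper instead.
      def makeSpillable(batch: ColumnarBatch): SpillableColumnarBatch =
        closeOnExcept(batch) { b =>
          SpillableColumnarBatch(b, SpillPriorities.ACTIVE_ON_DECK_PRIORITY)
        }
    }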