diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRangePartitioner.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRangePartitioner.scala
index 1461492ace1..505d32baa10 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRangePartitioner.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRangePartitioner.scala
@@ -22,6 +22,7 @@ import scala.util.hashing.byteswap32
 import ai.rapids.cudf
 import ai.rapids.cudf.{NvtxColor, NvtxRange}
 import com.nvidia.spark.rapids.Arm.withResource
+import com.nvidia.spark.rapids.RmmRapidsRetryIterator.withRetryNoSplit
 import com.nvidia.spark.rapids.shims.ShimExpression
 
 import org.apache.spark.rdd.{PartitionPruningRDD, RDD}
@@ -187,6 +188,8 @@ case class GpuRangePartitioner(
    * @return the partition id for each item.
    */
   def computePartitionIndexes(cb: ColumnarBatch): cudf.ColumnVector = {
+    // Do not wrap this in a retry block: it is called from inside the retry
+    // block in computeBoundsAndCloseWithRetry, and retry blocks should not nest.
     withResource(converters.convertBatch(rangeBounds,
       TrampolineUtil.fromAttributes(sorter.projectedBatchSchema))) { ranges =>
       withResource(sorter.appendProjectedColumns(cb)) { withExtraColumns =>
@@ -195,32 +198,35 @@ case class GpuRangePartitioner(
     }
   }
 
-  def computeBoundsAndClose(batch: ColumnarBatch): (Array[Int], Array[GpuColumnVector]) = {
+  def computeBoundsAndCloseWithRetry(batch: ColumnarBatch): (Array[Int], Array[GpuColumnVector]) = {
     val types = GpuColumnVector.extractTypes(batch)
-    val partedTable = withResource(batch) { batch =>
-      val parts = withResource(new NvtxRange("Calculate part", NvtxColor.CYAN)) { _ =>
-        computePartitionIndexes(batch)
-      }
-      withResource(parts) { parts =>
-        withResource(GpuColumnVector.from(batch)) { table =>
-          table.partition(parts, numPartitions)
+    withRetryNoSplit(SpillableColumnarBatch(batch, SpillPriorities.ACTIVE_ON_DECK_PRIORITY)) { sb =>
+      val partedTable = withResource(sb.getColumnarBatch()) { cb =>
+        val parts = withResource(new NvtxRange("Calculate part", NvtxColor.CYAN)) { _ =>
+          computePartitionIndexes(cb)
+        }
+        withResource(parts) { parts =>
+          withResource(GpuColumnVector.from(cb)) { table =>
+            table.partition(parts, numPartitions)
+          }
         }
       }
-    }
-    withResource(partedTable) { partedTable =>
-      val parts = partedTable.getPartitions
-      val tp = partedTable.getTable
-      val columns = (0 until partedTable.getNumberOfColumns.toInt).zip(types).map {
-        case (index, sparkType) =>
-          GpuColumnVector.from(tp.getColumn(index).incRefCount(), sparkType)
+
+      withResource(partedTable) { partedTable =>
+        val parts = partedTable.getPartitions
+        val tp = partedTable.getTable
+        val columns = (0 until partedTable.getNumberOfColumns.toInt).zip(types).map {
+          case (index, sparkType) =>
+            GpuColumnVector.from(tp.getColumn(index).incRefCount(), sparkType)
+        }
+        (parts, columns.toArray)
       }
-      (parts, columns.toArray)
     }
   }
 
   override def columnarEvalAny(batch: ColumnarBatch): Any = {
     if (rangeBounds.nonEmpty) {
-      val (parts, partitionColumns) = computeBoundsAndClose(batch)
+      val (parts, partitionColumns) = computeBoundsAndCloseWithRetry(batch)
       sliceInternalGpuOrCpuAndClose(partitionColumns.head.getRowCount.toInt,
         parts, partitionColumns)
     } else {
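The shape of the GpuRangePartitioner change is the plugin's usual retry idiom: the incoming batch is made spillable before any device work, withRetryNoSplit takes ownership of that SpillableColumnarBatch (closing it when the block finishes, which is why the old withResource(batch) wrapper is gone), and the body re-materializes the batch on every attempt. A minimal sketch of that contract, assuming only the helpers this patch already imports; rowCountWithRetry is a hypothetical example, not part of the patch:

    import com.nvidia.spark.rapids.{SpillableColumnarBatch, SpillPriorities}
    import com.nvidia.spark.rapids.Arm.withResource
    import com.nvidia.spark.rapids.RmmRapidsRetryIterator.withRetryNoSplit
    import org.apache.spark.sql.vectorized.ColumnarBatch

    def rowCountWithRetry(batch: ColumnarBatch): Int = {
      // Ownership transfer: from here on the retry framework is responsible
      // for closing the spillable batch, so the caller must not close it.
      val spillable = SpillableColumnarBatch(batch, SpillPriorities.ACTIVE_ON_DECK_PRIORITY)
      withRetryNoSplit(spillable) { sb =>
        // The body may run more than once, so re-materialize the batch on
        // each attempt instead of caching device references across attempts.
        withResource(sb.getColumnarBatch()) { cb =>
          cb.numRows()
        }
      }
    }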
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRoundRobinPartitioning.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRoundRobinPartitioning.scala
index 7ea2a57aaca..4995d584094 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRoundRobinPartitioning.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRoundRobinPartitioning.scala
@@ -20,6 +20,7 @@ import java.util.Random
 
 import ai.rapids.cudf.{NvtxColor, NvtxRange}
 import com.nvidia.spark.rapids.Arm.withResource
+import com.nvidia.spark.rapids.RmmRapidsRetryIterator.withRetryNoSplit
 import com.nvidia.spark.rapids.shims.ShimExpression
 
 import org.apache.spark.TaskContext
@@ -40,23 +41,38 @@ case class GpuRoundRobinPartitioning(numPartitions: Int)
 
   override def dataType: DataType = IntegerType
 
-  def partitionInternal(batch: ColumnarBatch): (Array[Int], Array[GpuColumnVector]) = {
+  private def partitionInternalWithClose(
+      batch: ColumnarBatch): (Array[Int], Array[GpuColumnVector]) = {
     val sparkTypes = GpuColumnVector.extractTypes(batch)
-    withResource(GpuColumnVector.from(batch)) { table =>
-      if (numPartitions == 1) {
-        val columns = (0 until table.getNumberOfColumns).zip(sparkTypes).map {
-          case(idx, sparkType) =>
-            GpuColumnVector.from(table.getColumn(idx).incRefCount(), sparkType)
-        }.toArray
-        return (Array(0), columns)
+    if (numPartitions == 1) {
+      // Skip the retry logic when there is only one partition.
+      withResource(batch) { batch =>
+        withResource(GpuColumnVector.from(batch)) { table =>
+          val columns = (0 until table.getNumberOfColumns).zip(sparkTypes).map {
+            case (idx, sparkType) =>
+              GpuColumnVector
+                .from(table.getColumn(idx).incRefCount(), sparkType)
+          }.toArray
+          (Array(0), columns)
+        }
       }
-      withResource(table.roundRobinPartition(numPartitions, getStartPartition)) { partedTable =>
-        val parts = partedTable.getPartitions
-        val columns = (0 until partedTable.getNumberOfColumns.toInt).zip(sparkTypes).map {
-          case(idx, sparkType) =>
-            GpuColumnVector.from(partedTable.getColumn(idx).incRefCount(), sparkType)
-        }.toArray
-        (parts, columns)
+    } else {
+      withRetryNoSplit(
+        SpillableColumnarBatch(batch, SpillPriorities.ACTIVE_ON_DECK_PRIORITY)) { sb =>
+        withResource(sb.getColumnarBatch()) { b =>
+          withResource(GpuColumnVector.from(b)) { table =>
+            withResource(
+              table.roundRobinPartition(numPartitions, getStartPartition)) { partedTable =>
+              val parts = partedTable.getPartitions
+              val columns = (0 until partedTable.getNumberOfColumns.toInt).zip(sparkTypes).map {
+                case (idx, sparkType) =>
+                  GpuColumnVector
+                    .from(partedTable.getColumn(idx).incRefCount(), sparkType)
+              }.toArray
+              (parts, columns)
+            }
+          }
+        }
       }
     }
   }
@@ -71,9 +87,8 @@ case class GpuRoundRobinPartitioning(numPartitions: Int)
     val (partitionIndexes, partitionColumns) = {
       val partitionRange = new NvtxRange("partition", NvtxColor.BLUE)
       try {
-        partitionInternal(batch)
+        partitionInternalWithClose(batch)
       } finally {
-        batch.close()
         partitionRange.close()
       }
     }
@@ -83,12 +98,16 @@ case class GpuRoundRobinPartitioning(numPartitions: Int)
     }
   }
 
-  private def getStartPartition : Int = {
+  private def getStartPartition: Int = {
     // Follow Spark's way of getting the randomized position to pass to the round robin,
     // which starts at randomNumber + 1 and we do the same here to maintain compatibility.
     // See ShuffleExchangeExec.getPartitionKeyExtractor for reference.
-    val position = new Random(TaskContext.get().partitionId()).nextInt(numPartitions) + 1
-    val partId = position.hashCode % numPartitions
-    partId
+    val random = if (null != TaskContext.get()) {
+      new Random(TaskContext.get().partitionId())
+    } else {
+      // For unit tests, where no task context exists.
+      new Random
+    }
+    (random.nextInt(numPartitions) + 1).hashCode % numPartitions
   }
 }
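One detail in getStartPartition worth spelling out: hashCode on an Int is the value itself, so the returned expression is just (nextInt(numPartitions) + 1) % numPartitions, a deterministic start slot in [0, numPartitions) seeded by the task's partition id. A self-contained sketch of that arithmetic (StartPartitionDemo and its values are hypothetical; only java.util.Random from the code above is assumed):

    import java.util.Random

    object StartPartitionDemo {
      // Mirrors getStartPartition: seeding with the task's partition id means
      // a re-run of the same task picks the same start partition.
      def startPartition(taskPartitionId: Int, numPartitions: Int): Int =
        (new Random(taskPartitionId).nextInt(numPartitions) + 1).hashCode % numPartitions

      def main(args: Array[String]): Unit = {
        // Deterministic per task id.
        assert(startPartition(0, 4) == startPartition(0, 4))
        // Always a valid partition index: nextInt(n) + 1 is in [1, n], so the
        // modulo lands in [0, n).
        (0 until 8).foreach(id => assert(startPartition(id, 4) < 4))
      }
    }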
diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/ShufflePartitionerRetrySuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/ShufflePartitionerRetrySuite.scala
new file mode 100644
index 00000000000..a6f96224808
--- /dev/null
+++ b/tests/src/test/scala/com/nvidia/spark/rapids/ShufflePartitionerRetrySuite.scala
@@ -0,0 +1,91 @@
+/*
+ * Copyright (c) 2023, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.nvidia.spark.rapids
+
+import ai.rapids.cudf.Table
+import com.nvidia.spark.rapids.Arm.withResource
+import com.nvidia.spark.rapids.RapidsPluginImplicits._
+import com.nvidia.spark.rapids.jni.RmmSpark
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.catalyst.expressions.{Ascending, AttributeReference, ExprId, SortOrder, SpecificInternalRow}
+import org.apache.spark.sql.types.{DataType, IntegerType, StringType}
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+class ShufflePartitionerRetrySuite extends RmmSparkRetrySuiteBase {
+  private def buildBatch(): ColumnarBatch = {
+    withResource(new Table.TestBuilder()
+      .column(9, null.asInstanceOf[java.lang.Integer], 8, 7, 6, 5, 4, 3, 2, 1)
+      .column("nine", "eight", null, null, "six", "five", "four", "three", "two", "one")
+      .build()) { table =>
+      GpuColumnVector.from(table, Array(IntegerType, StringType))
+    }
+  }
+
+  private def testRoundRobinPartitioner(partNum: Int): Unit = {
+    TestUtils.withGpuSparkSession(new SparkConf()) { _ =>
+      val rrp = GpuRoundRobinPartitioning(partNum)
+      // batch will be closed within columnarEvalAny
+      val batch = buildBatch()
+      RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 1)
+      var ret: Array[(ColumnarBatch, Int)] = null
+      try {
+        ret = rrp.columnarEvalAny(batch).asInstanceOf[Array[(ColumnarBatch, Int)]]
+        assert(partNum === ret.length)
+      } finally {
+        if (ret != null) {
+          ret.map(_._1).safeClose()
+        }
+      }
+    }
+  }
+
+  test("GPU range partition with retry") {
+    TestUtils.withGpuSparkSession(new SparkConf()) { _ =>
+      // Initialize range bounds
+      val fieldTypes: Array[DataType] = Array(IntegerType)
+      val bounds = new SpecificInternalRow(fieldTypes)
+      bounds.setInt(0, 3)
+      // Initialize GPU sorter
+      val ref = GpuBoundReference(0, IntegerType, nullable = true)(ExprId(0), "a")
+      val sortOrder = SortOrder(ref, Ascending)
+      val attrs = AttributeReference(ref.name, ref.dataType, ref.nullable)()
+      val gpuSorter = new GpuSorter(Seq(sortOrder), Array(attrs))
+
+      val rp = GpuRangePartitioner(Array(bounds), gpuSorter)
+      // batch will be closed within columnarEvalAny
+      val batch = buildBatch()
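+      // Ask the retry framework to make an upcoming GPU allocation on this
+      // thread throw one retryable OOM, forcing the partitioner through its
+      // retry path.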
+      RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 1)
+      var ret: Array[(ColumnarBatch, Int)] = null
+      try {
+        ret = rp.columnarEvalAny(batch).asInstanceOf[Array[(ColumnarBatch, Int)]]
+        assert(ret.length === 2)
+      } finally {
+        if (ret != null) {
+          ret.map(_._1).safeClose()
+        }
+      }
+    }
+  }
+
+  test("GPU round robin partitioner with retry using multiple partitions") {
+    testRoundRobinPartitioner(4)
+  }
+
+  test("GPU round robin partitioner with retry using 1 partition") {
+    testRoundRobinPartitioner(1)
+  }
+}
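For readers of these tests: RmmSpark.forceRetryOOM makes an allocation fail with a retryable OOM, and withRetryNoSplit responds by running its block again. A deliberately simplified model of that loop, with a local RetryOOM class standing in for the real exception from spark-rapids-jni; the real RmmRapidsRetryIterator additionally supports split-and-retry, bounds the number of attempts, and coordinates with the spill framework between attempts:

    object RetryNoSplitModel {
      class RetryOOM extends RuntimeException

      def retryNoSplitSketch[T <: AutoCloseable, R](input: T)(body: T => R): R = {
        try {
          var result: Option[R] = None
          while (result.isEmpty) {
            try {
              result = Some(body(input)) // the body may run more than once
            } catch {
              case _: RetryOOM => // some memory was freed (e.g. by a spill); try again
            }
          }
          result.get
        } finally {
          input.close() // the input is owned by the retry block and closed exactly once
        }
      }
    }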