Add retry to RoundRobin Partitioner and Range Partitioner (#9419)
* Add retry to RoundRobin Partitioner and Range Partitioner

Signed-off-by: Ferdinand Xu <ferdinandx@nvidia.com>

* Revert some unintended changes

* Fix some unintended changes

* Fix

* Fix failed cases

* Address comments

* Address comments

* Address comments

* Fix

* Address comments

---------

Signed-off-by: Ferdinand Xu <ferdinandx@nvidia.com>
winningsix authored Oct 23, 2023
1 parent 8b04945 commit ed37af8
Showing 3 changed files with 154 additions and 38 deletions.
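Both partitioners now follow the same retry pattern: the input batch is wrapped in a SpillableColumnarBatch so it can be spilled under GPU memory pressure, and the partitioning work runs inside withRetryNoSplit, which re-executes the whole block when a retryable OOM is thrown. Below is a minimal sketch of that pattern using only the spark-rapids APIs that appear in this diff; the method name and the getRowCount stand-in for the real work are illustrative, not part of the commit.

import com.nvidia.spark.rapids.{GpuColumnVector, SpillableColumnarBatch, SpillPriorities}
import com.nvidia.spark.rapids.Arm.withResource
import com.nvidia.spark.rapids.RmmRapidsRetryIterator.withRetryNoSplit

import org.apache.spark.sql.vectorized.ColumnarBatch

// Sketch: make the batch spillable, then run the GPU work inside the retry
// block. withRetryNoSplit manages the spillable batch and re-runs the
// closure after a retryable OOM; the table work here is a placeholder.
def partitionLikeWorkWithRetry(batch: ColumnarBatch): Long =
  withRetryNoSplit(SpillableColumnarBatch(batch, SpillPriorities.ACTIVE_ON_DECK_PRIORITY)) { sb =>
    withResource(sb.getColumnarBatch()) { cb =>
      withResource(GpuColumnVector.from(cb)) { table =>
        table.getRowCount // the real code partitions the table here
      }
    }
  }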
In GpuRangePartitioner:
@@ -22,6 +22,7 @@ import scala.util.hashing.byteswap32
 import ai.rapids.cudf
 import ai.rapids.cudf.{NvtxColor, NvtxRange}
 import com.nvidia.spark.rapids.Arm.withResource
+import com.nvidia.spark.rapids.RmmRapidsRetryIterator.withRetryNoSplit
 import com.nvidia.spark.rapids.shims.ShimExpression
 
 import org.apache.spark.rdd.{PartitionPruningRDD, RDD}
@@ -187,6 +188,8 @@ case class GpuRangePartitioner(
    * @return the partition id for each item.
    */
   def computePartitionIndexes(cb: ColumnarBatch): cudf.ColumnVector = {
+    // Don't make this a retry block; it already runs inside the retry block
+    // in computeBoundsAndCloseWithRetry, and nesting retry blocks must be avoided.
     withResource(converters.convertBatch(rangeBounds,
       TrampolineUtil.fromAttributes(sorter.projectedBatchSchema))) { ranges =>
       withResource(sorter.appendProjectedColumns(cb)) { withExtraColumns =>
@@ -195,32 +198,35 @@
       }
     }
   }
 
-  def computeBoundsAndClose(batch: ColumnarBatch): (Array[Int], Array[GpuColumnVector]) = {
+  def computeBoundsAndCloseWithRetry(batch: ColumnarBatch): (Array[Int], Array[GpuColumnVector]) = {
     val types = GpuColumnVector.extractTypes(batch)
-    val partedTable = withResource(batch) { batch =>
-      val parts = withResource(new NvtxRange("Calculate part", NvtxColor.CYAN)) { _ =>
-        computePartitionIndexes(batch)
-      }
-      withResource(parts) { parts =>
-        withResource(GpuColumnVector.from(batch)) { table =>
-          table.partition(parts, numPartitions)
+    withRetryNoSplit(SpillableColumnarBatch(batch, SpillPriorities.ACTIVE_ON_DECK_PRIORITY)) { sb =>
+      val partedTable = withResource(sb.getColumnarBatch()) { cb =>
+        val parts = withResource(new NvtxRange("Calculate part", NvtxColor.CYAN)) { _ =>
+          computePartitionIndexes(cb)
+        }
+        withResource(parts) { parts =>
+          withResource(GpuColumnVector.from(cb)) { table =>
+            table.partition(parts, numPartitions)
+          }
         }
       }
-    }
-    withResource(partedTable) { partedTable =>
-      val parts = partedTable.getPartitions
-      val tp = partedTable.getTable
-      val columns = (0 until partedTable.getNumberOfColumns.toInt).zip(types).map {
-        case (index, sparkType) =>
-          GpuColumnVector.from(tp.getColumn(index).incRefCount(), sparkType)
+
+      withResource(partedTable) { partedTable =>
+        val parts = partedTable.getPartitions
+        val tp = partedTable.getTable
+        val columns = (0 until partedTable.getNumberOfColumns.toInt).zip(types).map {
+          case (index, sparkType) =>
+            GpuColumnVector.from(tp.getColumn(index).incRefCount(), sparkType)
+        }
+        (parts, columns.toArray)
       }
-      (parts, columns.toArray)
     }
   }
 
   override def columnarEvalAny(batch: ColumnarBatch): Any = {
     if (rangeBounds.nonEmpty) {
-      val (parts, partitionColumns) = computeBoundsAndClose(batch)
+      val (parts, partitionColumns) = computeBoundsAndCloseWithRetry(batch)
       sliceInternalGpuOrCpuAndClose(partitionColumns.head.getRowCount.toInt,
         parts, partitionColumns)
     } else {
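The new comment in computePartitionIndexes deserves a gloss: the retry block lives only in computeBoundsAndCloseWithRetry, and computePartitionIndexes executes inside it, so giving the inner method a retry block of its own would nest one withRetryNoSplit inside another. A sketch of the call structure after this change:

// computeBoundsAndCloseWithRetry(batch)
//   withRetryNoSplit(SpillableColumnarBatch(batch, ...)) { sb =>
//     computePartitionIndexes(cb)           // no retry block of its own, by design
//     table.partition(parts, numPartitions)
//   }                                       // a retryable OOM anywhere above re-runs this block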
In GpuRoundRobinPartitioning:
@@ -20,6 +20,7 @@ import java.util.Random
 
 import ai.rapids.cudf.{NvtxColor, NvtxRange}
 import com.nvidia.spark.rapids.Arm.withResource
+import com.nvidia.spark.rapids.RmmRapidsRetryIterator.withRetryNoSplit
 import com.nvidia.spark.rapids.shims.ShimExpression
 
 import org.apache.spark.TaskContext
@@ -40,23 +41,38 @@ case class GpuRoundRobinPartitioning(numPartitions: Int)
 
   override def dataType: DataType = IntegerType
 
-  def partitionInternal(batch: ColumnarBatch): (Array[Int], Array[GpuColumnVector]) = {
+  private def partitionInternalWithClose(
+      batch: ColumnarBatch): (Array[Int], Array[GpuColumnVector]) = {
     val sparkTypes = GpuColumnVector.extractTypes(batch)
-    withResource(GpuColumnVector.from(batch)) { table =>
-      if (numPartitions == 1) {
-        val columns = (0 until table.getNumberOfColumns).zip(sparkTypes).map {
-          case (idx, sparkType) =>
-            GpuColumnVector.from(table.getColumn(idx).incRefCount(), sparkType)
-        }.toArray
-        return (Array(0), columns)
-      }
-      withResource(table.roundRobinPartition(numPartitions, getStartPartition)) { partedTable =>
-        val parts = partedTable.getPartitions
-        val columns = (0 until partedTable.getNumberOfColumns.toInt).zip(sparkTypes).map {
-          case (idx, sparkType) =>
-            GpuColumnVector.from(partedTable.getColumn(idx).incRefCount(), sparkType)
-        }.toArray
-        (parts, columns)
+    if (1 == numPartitions) {
+      // Skip the retry block when there is only one partition
+      withResource(batch) { batch =>
+        withResource(GpuColumnVector.from(batch)) { table =>
+          val columns = (0 until table.getNumberOfColumns).zip(sparkTypes).map {
+            case (idx, sparkType) =>
+              GpuColumnVector.from(table.getColumn(idx).incRefCount(), sparkType)
+          }.toArray
+          (Array(0), columns)
+        }
+      }
+    } else {
+      withRetryNoSplit(
+        SpillableColumnarBatch(batch, SpillPriorities.ACTIVE_ON_DECK_PRIORITY)) { sb =>
+        withResource(sb.getColumnarBatch()) { b =>
+          withResource(GpuColumnVector.from(b)) { table =>
+            withResource(
+              table.roundRobinPartition(numPartitions, getStartPartition)) { partedTable =>
+              val parts = partedTable.getPartitions
+              val columns = (0 until partedTable.getNumberOfColumns.toInt).zip(sparkTypes).map {
+                case (idx, sparkType) =>
+                  GpuColumnVector.from(partedTable.getColumn(idx).incRefCount(), sparkType)
+              }.toArray
+              (parts, columns)
+            }
+          }
+        }
       }
     }
   }
@@ -71,9 +87,8 @@ case class GpuRoundRobinPartitioning(numPartitions: Int)
     val (partitionIndexes, partitionColumns) = {
       val partitionRange = new NvtxRange("partition", NvtxColor.BLUE)
       try {
-        partitionInternal(batch)
+        partitionInternalWithClose(batch)
       } finally {
-        batch.close()
         partitionRange.close()
       }
     }
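Note the ownership change in the hunk above: partitionInternalWithClose, as its new name signals, now closes the input batch itself, either via withResource(batch) in the single-partition branch or via the SpillableColumnarBatch in the retry branch, so the caller's batch.close() is removed to avoid a double close.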
@@ -83,12 +98,16 @@ case class GpuRoundRobinPartitioning(numPartitions: Int)
     }
   }
 
-  private def getStartPartition : Int = {
+  private def getStartPartition: Int = {
     // Follow Spark's way of getting the randomized position to pass to the round robin;
     // Spark starts at randomNumber + 1, and we do the same here to maintain compatibility.
     // See ShuffleExchangeExec.getPartitionKeyExtractor for reference.
-    val position = new Random(TaskContext.get().partitionId()).nextInt(numPartitions) + 1
-    val partId = position.hashCode % numPartitions
-    partId
+    val random = if (null != TaskContext.get()) {
+      new Random(TaskContext.get().partitionId())
+    } else {
+      // For unit tests, where no task context exists
+      new Random
+    }
+    (random.nextInt(numPartitions) + 1).hashCode % numPartitions
   }
 }
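Since Int.hashCode is the identity in Scala, the final expression reduces to (random.nextInt(numPartitions) + 1) % numPartitions: a value in 1 to numPartitions, folded back into 0 until numPartitions by the modulo. A standalone illustration follows; the seed and partition count are example values, not taken from the commit.

import java.util.Random

// With numPartitions = 4 and a task partition id of 2 used as the seed, the
// start position is deterministic for the task, matching Spark's
// ShuffleExchangeExec behavior of starting at randomNumber + 1.
val numPartitions = 4
val position = new Random(2).nextInt(numPartitions) + 1 // in 1 to 4
val start = position.hashCode % numPartitions           // identical to position % 4
println(s"start partition = $start")                    // in 0 to 3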
New file, ShufflePartitionerRetrySuite:
@@ -0,0 +1,91 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.nvidia.spark.rapids

import ai.rapids.cudf.Table
import com.nvidia.spark.rapids.Arm.withResource
import com.nvidia.spark.rapids.RapidsPluginImplicits._
import com.nvidia.spark.rapids.jni.RmmSpark

import org.apache.spark.SparkConf
import org.apache.spark.sql.catalyst.expressions.{Ascending, AttributeReference, ExprId, SortOrder, SpecificInternalRow}
import org.apache.spark.sql.types.{DataType, IntegerType, StringType}
import org.apache.spark.sql.vectorized.ColumnarBatch

class ShufflePartitionerRetrySuite extends RmmSparkRetrySuiteBase {
private def buildBatch(): ColumnarBatch = {
withResource(new Table.TestBuilder()
.column(9, null.asInstanceOf[java.lang.Integer], 8, 7, 6, 5, 4, 3, 2, 1)
.column("nine", "eight", null, null, "six", "five", "four", "three", "two", "one")
.build()) { table =>
GpuColumnVector.from(table, Array(IntegerType, StringType))
}
}

  private def testRoundRobinPartitioner(partNum: Int): Unit = {
TestUtils.withGpuSparkSession(new SparkConf()) { _ =>
val rrp = GpuRoundRobinPartitioning(partNum)
// batch will be closed within columnarEvalAny
      val batch = buildBatch()
RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 1)
var ret: Array[(ColumnarBatch, Int)] = null
try {
ret = rrp.columnarEvalAny(batch).asInstanceOf[Array[(ColumnarBatch, Int)]]
assert(partNum === ret.size)
} finally {
if (ret != null) {
ret.map(_._1).safeClose()
}
}
}
}

test("GPU range partition with retry") {
TestUtils.withGpuSparkSession(new SparkConf()) { _ =>
// Initialize range bounds
val fieldTypes: Array[DataType] = Array(IntegerType)
val bounds = new SpecificInternalRow(fieldTypes)
bounds.setInt(0, 3)
// Initialize GPU sorter
val ref = GpuBoundReference(0, IntegerType, nullable = true)(ExprId(0), "a")
val sortOrder = SortOrder(ref, Ascending)
val attrs = AttributeReference(ref.name, ref.dataType, ref.nullable)()
val gpuSorter = new GpuSorter(Seq(sortOrder), Array(attrs))

val rp = GpuRangePartitioner(Array.apply(bounds), gpuSorter)
// batch will be closed within columnarEvalAny
      val batch = buildBatch()
RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 1)
var ret: Array[(ColumnarBatch, Int)] = null
try {
ret = rp.columnarEvalAny(batch).asInstanceOf[Array[(ColumnarBatch, Int)]]
assert(ret.length === 2)
} finally {
if (ret != null) {
ret.map(_._1).safeClose()
}
}
}
}

test("GPU round robin partition with retry using multiple partition") {
testRoundRobinPartitioner(4)
}

test("GPU round robin partitioner with retry using 1 partition") {
testRoundRobinPartitioner(1)
}
}
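Each test drives the new retry path with RmmSpark.forceRetryOOM, which arms the current thread so that a later GPU allocation throws a retryable OOM; withRetryNoSplit then spills and re-executes the partitioning, and the test asserts the result is still correct. The essential shape, as a sketch (partitioner and batch are placeholders standing in for either GpuRoundRobinPartitioning or GpuRangePartitioner and a built batch):

// Arm exactly one retryable OOM for this thread, then run the code under
// test: the first attempt fails, the retry framework re-runs the block, and
// the second attempt succeeds.
RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 1)
val out = partitioner.columnarEvalAny(batch) // closes batch; succeeds on retry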
