Add retry to RoundRobin Partitioner and Range Partitioner #9419

Merged · 10 commits · Oct 23, 2023 · Changes shown from 9 commits
@@ -22,6 +22,7 @@ import scala.util.hashing.byteswap32
import ai.rapids.cudf
import ai.rapids.cudf.{NvtxColor, NvtxRange}
import com.nvidia.spark.rapids.Arm.withResource
import com.nvidia.spark.rapids.RmmRapidsRetryIterator.withRetryNoSplit
import com.nvidia.spark.rapids.shims.ShimExpression

import org.apache.spark.rdd.{PartitionPruningRDD, RDD}
@@ -187,6 +188,8 @@ case class GpuRangePartitioner(
* @return the partition id for each item.
*/
def computePartitionIndexes(cb: ColumnarBatch): cudf.ColumnVector = {
// Don't make this a retry block, to avoid nested retry blocks when it is
// called from computeBoundsAndCloseWithRetry
withResource(converters.convertBatch(rangeBounds,
TrampolineUtil.fromAttributes(sorter.projectedBatchSchema))) { ranges =>
withResource(sorter.appendProjectedColumns(cb)) { withExtraColumns =>
@@ -195,32 +198,35 @@
}
}

def computeBoundsAndClose(batch: ColumnarBatch): (Array[Int], Array[GpuColumnVector]) = {
def computeBoundsAndCloseWithRetry(batch: ColumnarBatch): (Array[Int], Array[GpuColumnVector]) = {
val types = GpuColumnVector.extractTypes(batch)
val partedTable = withResource(batch) { batch =>
val parts = withResource(new NvtxRange("Calculate part", NvtxColor.CYAN)) { _ =>
computePartitionIndexes(batch)
}
withResource(parts) { parts =>
withResource(GpuColumnVector.from(batch)) { table =>
table.partition(parts, numPartitions)
withRetryNoSplit(SpillableColumnarBatch(batch, SpillPriorities.ACTIVE_ON_DECK_PRIORITY)) { sb =>
val partedTable = withResource(sb.getColumnarBatch()) { cb =>
val parts = withResource(new NvtxRange("Calculate part", NvtxColor.CYAN)) { _ =>
computePartitionIndexes(cb)
}
withResource(parts) { parts =>
withResource(GpuColumnVector.from(cb)) { table =>
table.partition(parts, numPartitions)
}
}
}
}
withResource(partedTable) { partedTable =>
val parts = partedTable.getPartitions
val tp = partedTable.getTable
val columns = (0 until partedTable.getNumberOfColumns.toInt).zip(types).map {
case (index, sparkType) =>
GpuColumnVector.from(tp.getColumn(index).incRefCount(), sparkType)

withResource(partedTable) { partedTable =>
val parts = partedTable.getPartitions
val tp = partedTable.getTable
val columns = (0 until partedTable.getNumberOfColumns.toInt).zip(types).map {
case (index, sparkType) =>
GpuColumnVector.from(tp.getColumn(index).incRefCount(), sparkType)
}
(parts, columns.toArray)
}
(parts, columns.toArray)
}
}

override def columnarEvalAny(batch: ColumnarBatch): Any = {
if (rangeBounds.nonEmpty) {
val (parts, partitionColumns) = computeBoundsAndClose(batch)
val (parts, partitionColumns) = computeBoundsAndCloseWithRetry(batch)
sliceInternalGpuOrCpuAndClose(partitionColumns.head.getRowCount.toInt,
parts, partitionColumns)
} else {
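Both partitioners adopt the same retry idiom from RmmRapidsRetryIterator. Below is a minimal sketch of that idiom, assuming only the spark-rapids classes already visible in the diffs; the object and helper names are hypothetical and not part of this PR.

import com.nvidia.spark.rapids.{SpillableColumnarBatch, SpillPriorities}
import com.nvidia.spark.rapids.Arm.withResource
import com.nvidia.spark.rapids.RmmRapidsRetryIterator.withRetryNoSplit

import org.apache.spark.sql.vectorized.ColumnarBatch

object RetryIdiomSketch {
  // Hypothetical helper illustrating the pattern: hand the batch to a
  // SpillableColumnarBatch (so it can be spilled off the GPU between retry
  // attempts) and let withRetryNoSplit re-run the body on a retryable OOM.
  def processWithRetry[T](batch: ColumnarBatch)(body: ColumnarBatch => T): T = {
    // withRetryNoSplit takes ownership of the spillable batch and closes it.
    val spillable = SpillableColumnarBatch(batch, SpillPriorities.ACTIVE_ON_DECK_PRIORITY)
    withRetryNoSplit(spillable) { sb =>
      // Materialize a ColumnarBatch for this attempt and close it afterwards.
      withResource(sb.getColumnarBatch()) { cb =>
        body(cb)
      }
    }
  }
}

In the diffs above and below, the range partitioner wraps the whole body of computeBoundsAndCloseWithRetry this way, while the round-robin partitioner only does so when numPartitions > 1.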
@@ -20,6 +20,7 @@ import java.util.Random

import ai.rapids.cudf.{NvtxColor, NvtxRange}
import com.nvidia.spark.rapids.Arm.withResource
import com.nvidia.spark.rapids.RmmRapidsRetryIterator.withRetryNoSplit
import com.nvidia.spark.rapids.shims.ShimExpression

import org.apache.spark.TaskContext
@@ -42,21 +43,35 @@ case class GpuRoundRobinPartitioning(numPartitions: Int)

def partitionInternal(batch: ColumnarBatch): (Array[Int], Array[GpuColumnVector]) = {
val sparkTypes = GpuColumnVector.extractTypes(batch)
withResource(GpuColumnVector.from(batch)) { table =>
if (numPartitions == 1) {
if (1 == numPartitions) {
// Skip retry since partition number = 1
withResource(GpuColumnVector.from(batch)) { table =>
val columns = (0 until table.getNumberOfColumns).zip(sparkTypes).map {
case(idx, sparkType) =>
GpuColumnVector.from(table.getColumn(idx).incRefCount(), sparkType)
case (idx, sparkType) =>
GpuColumnVector
.from(table.getColumn(idx).incRefCount(), sparkType)
}.toArray
return (Array(0), columns)
(Array(0), columns)
}
withResource(table.roundRobinPartition(numPartitions, getStartPartition)) { partedTable =>
val parts = partedTable.getPartitions
val columns = (0 until partedTable.getNumberOfColumns.toInt).zip(sparkTypes).map {
case(idx, sparkType) =>
GpuColumnVector.from(partedTable.getColumn(idx).incRefCount(), sparkType)
}.toArray
(parts, columns)
} else {
// Increase ref count since the caller will close the batch.
val spillableBatch = SpillableColumnarBatch(
GpuColumnVector.incRefCounts(batch), SpillPriorities.ACTIVE_ON_DECK_PRIORITY)
withRetryNoSplit(spillableBatch) { sb =>
withResource(sb.getColumnarBatch()) { b =>
withResource(GpuColumnVector.from(b)) { table =>
withResource(table.
roundRobinPartition(numPartitions, getStartPartition)) { partedTable =>
val parts = partedTable.getPartitions
val columns = (0 until partedTable.getNumberOfColumns.toInt).zip(sparkTypes).map {
case (idx, sparkType) =>
GpuColumnVector
.from(partedTable.getColumn(idx).incRefCount(), sparkType)
}.toArray
(parts, columns)
}
}
}
}
}
}
@@ -83,12 +98,16 @@ case class GpuRoundRobinPartitioning(numPartitions: Int)
}
}

private def getStartPartition : Int = {
private def getStartPartition: Int = {
// Follow Spark's way of getting the randomized position to pass to the round robin,
// which starts at randomNumber + 1 and we do the same here to maintain compatibility.
// See ShuffleExchangeExec.getPartitionKeyExtractor for reference.
val position = new Random(TaskContext.get().partitionId()).nextInt(numPartitions) + 1
val partId = position.hashCode % numPartitions
partId
val random = if (null != TaskContext.get()) {
new Random(TaskContext.get().partitionId())
} else {
// For unit test purposes, where the task context does not exist
new Random
Review comment (Collaborator): nit: Should we have a hard-coded seed here instead? Just so that we get deterministic results for unit tests.

Reply (Author): Yes, it could be helpful when we need to validate results. We can revisit this part when we have such a need.

(A hypothetical seeded variant is sketched after this method.)

}
(random.nextInt(numPartitions) + 1).hashCode % numPartitions
}
}
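Picking up the reviewer's nit above, a variant of getStartPartition with a hard-coded seed on the no-TaskContext path might look like the following. The object name, method name, and seed value 0 are illustrative assumptions, not part of this PR.

import java.util.Random

import org.apache.spark.TaskContext

object DeterministicStartPartition {
  // Same logic as GpuRoundRobinPartitioning.getStartPartition, except the
  // fallback Random (used when no TaskContext exists, i.e. in unit tests)
  // gets a fixed seed so repeated test runs see the same start partition.
  def getStartPartition(numPartitions: Int): Int = {
    val random = if (null != TaskContext.get()) {
      // Keep Spark compatibility: seed with the task's partition id.
      new Random(TaskContext.get().partitionId())
    } else {
      // Hard-coded seed (arbitrary choice) for deterministic unit tests.
      new Random(0L)
    }
    // Spark starts the round robin at randomNumber + 1; mirror that here.
    (random.nextInt(numPartitions) + 1).hashCode % numPartitions
  }
}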
@@ -0,0 +1,91 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.nvidia.spark.rapids

import ai.rapids.cudf.Table
import com.nvidia.spark.rapids.Arm.withResource
import com.nvidia.spark.rapids.RapidsPluginImplicits._
import com.nvidia.spark.rapids.jni.RmmSpark

import org.apache.spark.SparkConf
import org.apache.spark.sql.catalyst.expressions.{Ascending, AttributeReference, ExprId, SortOrder, SpecificInternalRow}
import org.apache.spark.sql.types.{DataType, IntegerType, StringType}
import org.apache.spark.sql.vectorized.ColumnarBatch

class ShufflePartitionerRetrySuite extends RmmSparkRetrySuiteBase {
private def buildBatch(): ColumnarBatch = {
withResource(new Table.TestBuilder()
.column(9, null.asInstanceOf[java.lang.Integer], 8, 7, 6, 5, 4, 3, 2, 1)
.column("nine", "eight", null, null, "six", "five", "four", "three", "two", "one")
.build()) { table =>
GpuColumnVector.from(table, Array(IntegerType, StringType))
}
}

private def testRoundRobinPartitioner(partNum: Int) = {
TestUtils.withGpuSparkSession(new SparkConf()) { _ =>
val rrp = GpuRoundRobinPartitioning(partNum)
// batch will be closed within columnarEvalAny
val batch = buildBatch
Review comment (Collaborator): nit: it might be nice to build the batch right when we call columnarEvalAny, just so there is less code between where it is created and where it is consumed. It could leak if there is an exception in between.

Reply (Author): It's separated because the buildBatch implementation doesn't have a retry in this test suite. We could do it, but since it's just a test I skipped it.

(A hypothetical reshaping along the reviewer's suggestion is sketched after this helper.)

RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 1)
var ret: Array[(ColumnarBatch, Int)] = null
try {
ret = rrp.columnarEvalAny(batch).asInstanceOf[Array[(ColumnarBatch, Int)]]
assert(partNum === ret.size)
} finally {
if (ret != null) {
ret.map(_._1).safeClose()
}
}
}
}
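As a follow-up to the review exchange above, a hypothetical alternative helper inside this suite could build the batch in the same expression that consumes it, so there is no window between creation and consumption where an exception could leak it. The helper name is an assumption; everything else reuses identifiers already present in this file.

  private def testRoundRobinPartitionerCompact(partNum: Int): Unit = {
    TestUtils.withGpuSparkSession(new SparkConf()) { _ =>
      val rrp = GpuRoundRobinPartitioning(partNum)
      RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 1)
      // buildBatch is called at the point of use; columnarEvalAny closes it.
      val ret = rrp.columnarEvalAny(buildBatch()).asInstanceOf[Array[(ColumnarBatch, Int)]]
      try {
        assert(partNum === ret.size)
      } finally {
        // Only the returned batches need cleanup; the input batch is already
        // owned and closed by columnarEvalAny.
        ret.map(_._1).safeClose()
      }
    }
  }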

test("GPU range partition with retry") {
TestUtils.withGpuSparkSession(new SparkConf()) { _ =>
// Initialize range bounds
val fieldTypes: Array[DataType] = Array(IntegerType)
val bounds = new SpecificInternalRow(fieldTypes)
bounds.setInt(0, 3)
// Initialize GPU sorter
val ref = GpuBoundReference(0, IntegerType, nullable = true)(ExprId(0), "a")
val sortOrder = SortOrder(ref, Ascending)
val attrs = AttributeReference(ref.name, ref.dataType, ref.nullable)()
val gpuSorter = new GpuSorter(Seq(sortOrder), Array(attrs))

val rp = GpuRangePartitioner(Array.apply(bounds), gpuSorter)
// batch will be closed within columnarEvalAny
val batch = buildBatch
Review comment (Collaborator): Same nit here about how long the batch lives for. Very minor, but nice to have.

Reply (Author): Same as above.

RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 1)
var ret: Array[(ColumnarBatch, Int)] = null
try {
ret = rp.columnarEvalAny(batch).asInstanceOf[Array[(ColumnarBatch, Int)]]
assert(ret.length === 2)
} finally {
if (ret != null) {
ret.map(_._1).safeClose()
}
}
}
}

test("GPU round robin partition with retry using multiple partition") {
testRoundRobinPartitioner(4)
}

test("GPU round robin partitioner with retry using 1 partition") {
testRoundRobinPartitioner(1)
}
}