Add retry to RoundRobin Partitioner and Range Partitioner #9419
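The diff below adds the new ShufflePartitionerRetrySuite that covers this behavior; the partitioner changes themselves are elsewhere in the PR and are not shown here. As a rough, hedged sketch of the pattern the tests exercise (partitionInternal and PartitionRetrySketch are hypothetical names, and the withRetryNoSplit/SpillableColumnarBatch helpers are assumed from the plugin's retry framework), a partitioner makes its input spillable and wraps the device-side work so that an injected retry OOM rolls the work back and re-runs it:

package com.nvidia.spark.rapids

import com.nvidia.spark.rapids.Arm.withResource
import com.nvidia.spark.rapids.RmmRapidsRetryIterator.withRetryNoSplit

import org.apache.spark.sql.vectorized.ColumnarBatch

object PartitionRetrySketch {
  // Sketch only: partitionInternal is a hypothetical stand-in for the
  // real device-side partitioning work.
  private def partitionInternal(cb: ColumnarBatch,
      numParts: Int): Array[(ColumnarBatch, Int)] = ???

  def partitionWithRetry(batch: ColumnarBatch,
      numParts: Int): Array[(ColumnarBatch, Int)] = {
    // Making the input spillable lets the framework free device memory
    // when a retry OOM is thrown, then re-materialize it for the retry.
    val spillable = SpillableColumnarBatch(batch,
      SpillPriorities.ACTIVE_ON_DECK_PRIORITY)
    withRetryNoSplit(spillable) { sb =>
      withResource(sb.getColumnarBatch()) { cb =>
        partitionInternal(cb, numParts)
      }
    }
  }
}

With this shape, the suite's RmmSpark.forceRetryOOM(threadId, 1) call makes the first attempt throw a synthetic retry OOM, and the tests pass only if the partitioner recovers and still produces the expected partitions.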
@@ -0,0 +1,91 @@
/*
 * Copyright (c) 2023, NVIDIA CORPORATION.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.nvidia.spark.rapids

import ai.rapids.cudf.Table
import com.nvidia.spark.rapids.Arm.withResource
import com.nvidia.spark.rapids.RapidsPluginImplicits._
import com.nvidia.spark.rapids.jni.RmmSpark

import org.apache.spark.SparkConf
import org.apache.spark.sql.catalyst.expressions.{Ascending, AttributeReference, ExprId, SortOrder, SpecificInternalRow}
import org.apache.spark.sql.types.{DataType, IntegerType, StringType}
import org.apache.spark.sql.vectorized.ColumnarBatch

class ShufflePartitionerRetrySuite extends RmmSparkRetrySuiteBase {
  private def buildBatch(): ColumnarBatch = {
    withResource(new Table.TestBuilder()
        .column(9, null.asInstanceOf[java.lang.Integer], 8, 7, 6, 5, 4, 3, 2, 1)
        .column("nine", "eight", null, null, "six", "five", "four", "three", "two", "one")
        .build()) { table =>
      GpuColumnVector.from(table, Array(IntegerType, StringType))
    }
  }

  private def testRoundRobinPartitioner(partNum: Int): Unit = {
    TestUtils.withGpuSparkSession(new SparkConf()) { _ =>
      val rrp = GpuRoundRobinPartitioning(partNum)
      // batch will be closed within columnarEvalAny
      val batch = buildBatch()
Review comment: nit: it might be nice to build the batch right when we call columnarEvalAny.
Reply: It's separated due to …
      RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 1)
      var ret: Array[(ColumnarBatch, Int)] = null
      try {
        ret = rrp.columnarEvalAny(batch).asInstanceOf[Array[(ColumnarBatch, Int)]]
        assert(partNum === ret.size)
      } finally {
        if (ret != null) {
          ret.map(_._1).safeClose()
        }
      }
    }
  }

  test("GPU range partition with retry") {
    TestUtils.withGpuSparkSession(new SparkConf()) { _ =>
      // Initialize range bounds
      val fieldTypes: Array[DataType] = Array(IntegerType)
      val bounds = new SpecificInternalRow(fieldTypes)
      bounds.setInt(0, 3)
      // Initialize GPU sorter
      val ref = GpuBoundReference(0, IntegerType, nullable = true)(ExprId(0), "a")
      val sortOrder = SortOrder(ref, Ascending)
      val attrs = AttributeReference(ref.name, ref.dataType, ref.nullable)()
      val gpuSorter = new GpuSorter(Seq(sortOrder), Array(attrs))

      val rp = GpuRangePartitioner(Array(bounds), gpuSorter)
      // batch will be closed within columnarEvalAny
      val batch = buildBatch()
Review comment: Same nit here about how long it lives for. Very minor, but a nice to have.
Reply: Same as above.
      RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 1)
      var ret: Array[(ColumnarBatch, Int)] = null
      try {
        ret = rp.columnarEvalAny(batch).asInstanceOf[Array[(ColumnarBatch, Int)]]
        assert(ret.length === 2)
      } finally {
        if (ret != null) {
          ret.map(_._1).safeClose()
        }
      }
    }
  }

  test("GPU round robin partition with retry using multiple partitions") {
    testRoundRobinPartitioner(4)
  }

  test("GPU round robin partitioner with retry using 1 partition") {
    testRoundRobinPartitioner(1)
  }
}
Review comment: nit: Should we have a hard-coded seed here instead, just so that we get deterministic results for unit tests?
Reply: Yes, it could be helpful when we need to validate results. We can touch this part when we have such a need.
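For illustration, a minimal sketch of the seeding idea under discussion, assuming the round-robin start position is chosen with scala.util.Random (the names here are hypothetical and this is not the actual GpuRoundRobinPartitioning internals):

import scala.util.Random

// Hypothetical sketch of the reviewers' suggestion: an optional fixed
// seed makes the randomly chosen starting partition reproducible, so
// unit tests can assert on exact partition contents.
case class RoundRobinStartSketch(numPartitions: Int, seed: Option[Long] = None) {
  private val rng = seed.map(s => new Random(s)).getOrElse(new Random())
  private var position = rng.nextInt(numPartitions)

  // Advance round-robin and return the next target partition.
  def nextPartition(): Int = {
    position = (position + 1) % numPartitions
    position
  }
}

For example, RoundRobinStartSketch(4, Some(42L)) always starts from the same partition across runs, which is what a deterministic unit test would need.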