Skip to content

Commit

Permalink
Add retry support to RowToColumnarIterator (#9066)
Browse files Browse the repository at this point in the history
This change adds a new API named tryBuild in the GpuColumnarBatchBuilder class to
 support the retry mechanism.

Builders can build their columns only once, so this change caches the host columns built from
 the builders and reuses them to build the columnar batch as many times as needed.

---------

Signed-off-by: Firestarman <firestarmanllc@gmail.com>
  • Loading branch information
firestarman authored Aug 21, 2023
1 parent a889054 commit 5cafe66
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.function.Function;

/**
* A GPU accelerated version of the Spark ColumnVector.
Expand Down Expand Up @@ -134,16 +135,18 @@ public static abstract class GpuColumnarBatchBuilderBase implements AutoCloseabl

public abstract void copyColumnar(ColumnVector cv, int colNum, int rows);

protected abstract ColumnVector buildAndPutOnDevice(int builderIndex);
protected abstract int buildersLength();
protected abstract ai.rapids.cudf.ColumnVector buildAndPutOnDevice(int builderIndex);

public ColumnarBatch build(int rows) {
int buildersLen = buildersLength();
ColumnVector[] vectors = new ColumnVector[buildersLen];
return build(rows, this::buildAndPutOnDevice);
}

protected ColumnarBatch build(int rows, Function<Integer, ai.rapids.cudf.ColumnVector> col) {
ColumnVector[] vectors = new ColumnVector[fields.length];
boolean success = false;
try {
for (int i = 0; i < buildersLen; i++) {
vectors[i] = buildAndPutOnDevice(i);
for (int i = 0; i < fields.length; i++) {
vectors[i] = new GpuColumnVector(fields[i].dataType(), col.apply(i));
}
ColumnarBatch ret = new ColumnarBatch(vectors, rows);
success = true;
Expand Down Expand Up @@ -191,17 +194,11 @@ public GpuArrowColumnarBatchBuilder(StructType schema) {
}

@Override
protected int buildersLength() {
return builders.length;
}

@Override
protected ColumnVector buildAndPutOnDevice(int builderIndex) {
protected ai.rapids.cudf.ColumnVector buildAndPutOnDevice(int builderIndex) {
ai.rapids.cudf.ColumnVector cv = builders[builderIndex].buildAndPutOnDevice();
GpuColumnVector gcv = new GpuColumnVector(fields[builderIndex].dataType(), cv);
referenceHolders[builderIndex].releaseReferences();
builders[builderIndex] = null;
return gcv;
return cv;
}

@Override
Expand Down Expand Up @@ -230,6 +227,7 @@ public void close() {

public static final class GpuColumnarBatchBuilder extends GpuColumnarBatchBuilderBase {
private final ai.rapids.cudf.HostColumnVector.ColumnBuilder[] builders;
private ai.rapids.cudf.HostColumnVector[] hostColumns;

/**
* A collection of builders for building up columnar data.
Expand Down Expand Up @@ -270,16 +268,10 @@ public ai.rapids.cudf.HostColumnVector.ColumnBuilder builder(int i) {
}

@Override
protected int buildersLength() {
return builders.length;
}

@Override
protected ColumnVector buildAndPutOnDevice(int builderIndex) {
protected ai.rapids.cudf.ColumnVector buildAndPutOnDevice(int builderIndex) {
ai.rapids.cudf.ColumnVector cv = builders[builderIndex].buildAndPutOnDevice();
GpuColumnVector gcv = new GpuColumnVector(fields[builderIndex].dataType(), cv);
builders[builderIndex] = null;
return gcv;
return cv;
}

public HostColumnVector[] buildHostColumns() {
Expand All @@ -303,11 +295,34 @@ public HostColumnVector[] buildHostColumns() {
}
}

/**
 * Build a columnar batch while keeping the backing host data alive, so the
 * build can be retried safely any number of times. The cached host columns
 * are released only when `close` is called.
 *
 * @param rows number of rows the resulting batch reports
 * @return a new ColumnarBatch backed by fresh device copies of the host data
 */
public ColumnarBatch tryBuild(int rows) {
  if (hostColumns == null) {
    // First call: drain the builders once and cache the host-side result.
    hostColumns = buildHostColumns();
  }
  // Each attempt copies the cached host columns to the device again.
  final ai.rapids.cudf.HostColumnVector[] cached = hostColumns;
  return build(rows, i -> cached[i].copyToDevice());
}

@Override
public void close() {
for (ai.rapids.cudf.HostColumnVector.ColumnBuilder b: builders) {
if (b != null) {
b.close();
try {
for (ai.rapids.cudf.HostColumnVector.ColumnBuilder b: builders) {
if (b != null) {
b.close();
}
}
} finally {
if (hostColumns != null) {
for (ai.rapids.cudf.HostColumnVector hcv: hostColumns) {
if (hcv != null) {
hcv.close();
}
}
hostColumns = null;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -653,8 +653,10 @@ class RowToColumnarIterator(
.foreach(ctx => GpuSemaphore.acquireIfNecessary(ctx))

val ret = withResource(new NvtxWithMetrics("RowToColumnar", NvtxColor.GREEN,
opTime)) { _ =>
builders.build(rowCount)
opTime)) { _ =>
RmmRapidsRetryIterator.withRetryNoSplit[ColumnarBatch] {
builders.tryBuild(rowCount)
}
}
numInputRows += rowCount
numOutputRows += rowCount
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids

import com.nvidia.spark.rapids.jni.{RmmSpark, SplitAndRetryOOM}

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types._

class RowToColumnarIteratorRetrySuite extends RmmSparkRetrySuiteBase {
  // Single int-column schema shared by both tests.
  private val schema = StructType(Seq(StructField("a", IntegerType)))

  test("test simple OOM retry") {
    // A plain retry OOM must be handled transparently: the batch still arrives.
    val rows: Iterator[InternalRow] = (1 to 10).iterator.map(InternalRow(_))
    val row2ColIter = new RowToColumnarIterator(
      rows, schema, RequireSingleBatch, new GpuRowToColumnConverter(schema))
    RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId)
    Arm.withResource(row2ColIter.next()) { batch =>
      assertResult(10)(batch.numRows())
    }
  }

  test("test simple OOM split and retry") {
    // withRetryNoSplit cannot split its input, so a split-and-retry OOM escapes.
    val rows: Iterator[InternalRow] = (1 to 10).iterator.map(InternalRow(_))
    val row2ColIter = new RowToColumnarIterator(
      rows, schema, RequireSingleBatch, new GpuRowToColumnConverter(schema))
    RmmSpark.forceSplitAndRetryOOM(RmmSpark.getCurrentThreadId)
    assertThrows[SplitAndRetryOOM] {
      row2ColIter.next()
    }
  }
}

0 comments on commit 5cafe66

Please sign in to comment.