Fix empty table broadcast requiring a GPU on driver node (NVIDIA#1057)
Signed-off-by: Jason Lowe <jlowe@nvidia.com>
jlowe authored Nov 4, 2020
1 parent 49507e3 commit 111575f
Showing 2 changed files with 83 additions and 56 deletions.
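Summary of the change: serializing an empty broadcast result previously went through GpuColumnVector.emptyBatch and extractBases, which materialize device-backed columns and therefore require a GPU on the driver node. This commit adds host-side builders (buildHostColumns / emptyHostColumns) so the empty table can be built and serialized entirely on the host. A minimal host-only sketch of the new path, using the classes added in this diff (the schema and output stream here are made up for illustration; assumes the spark-rapids and cudf jars on the classpath):

import java.io.ByteArrayOutputStream;
import ai.rapids.cudf.HostColumnVector;
import ai.rapids.cudf.JCudfSerialization;
import com.nvidia.spark.rapids.GpuColumnVector;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public class EmptyBroadcastSketch {
  public static void main(String[] args) throws Exception {
    // Hypothetical schema for an empty broadcast table.
    StructType schema = new StructType()
        .add("id", DataTypes.LongType)
        .add("name", DataTypes.StringType);
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    // Host-only: no GPU is needed to build or serialize the empty table.
    HostColumnVector[] hostVectors = GpuColumnVector.emptyHostColumns(schema);
    try {
      JCudfSerialization.writeToStream(hostVectors, out, 0, 0);
    } finally {
      for (HostColumnVector v : hostVectors) {
        v.close();
      }
    }
  }
}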
130 changes: 80 additions & 50 deletions sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVector.java
@@ -24,7 +24,6 @@
 import ai.rapids.cudf.Table;
 
 import org.apache.spark.sql.catalyst.expressions.Attribute;
-import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
 import org.apache.spark.sql.types.*;
 import org.apache.spark.sql.vectorized.ColumnVector;
 import org.apache.spark.sql.vectorized.ColumnarBatch;
@@ -62,23 +61,7 @@ public GpuColumnarBatchBuilder(StructType schema, int rows, ColumnarBatch batch)
       try {
         for (int i = 0; i < len; i++) {
           StructField field = fields[i];
-          if (field.dataType() instanceof StringType) {
-            // If we cannot know the exact size, assume the string is small and allocate
-            // 8 bytes per row. The buffer of the builder will grow as needed if it is
-            // too small.
-            int bufferSize = rows * 8;
-            if (batch != null) {
-              ColumnVector cv = batch.column(i);
-              if (cv instanceof WritableColumnVector) {
-                WritableColumnVector wcv = (WritableColumnVector) cv;
-                if (!wcv.hasDictionary()) {
-                  bufferSize = wcv.getArrayOffset(rows - 1) +
-                      wcv.getArrayLength(rows - 1);
-                }
-              }
-            }
-            builders[i] = new ai.rapids.cudf.HostColumnVector.ColumnBuilder(new HostColumnVector.BasicType(true, DType.STRING), rows);
-          } else if (field.dataType() instanceof MapType) {
+          if (field.dataType() instanceof MapType) {
             builders[i] = new ai.rapids.cudf.HostColumnVector.ColumnBuilder(new HostColumnVector.ListType(true,
                 new HostColumnVector.StructType(true, Arrays.asList(
                     new HostColumnVector.BasicType(true, DType.STRING),
@@ -127,6 +110,27 @@ public ColumnarBatch build(int rows) {
       }
     }
 
+    public HostColumnVector[] buildHostColumns() {
+      HostColumnVector[] vectors = new HostColumnVector[builders.length];
+      try {
+        for (int i = 0; i < builders.length; i++) {
+          vectors[i] = builders[i].build();
+          builders[i] = null;
+        }
+        HostColumnVector[] result = vectors;
+        vectors = null;
+        return result;
+      } finally {
+        if (vectors != null) {
+          for (HostColumnVector v : vectors) {
+            if (v != null) {
+              v.close();
+            }
+          }
+        }
+      }
+    }
+
     @Override
     public void close() {
       for (ai.rapids.cudf.HostColumnVector.ColumnBuilder b: builders) {
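The new buildHostColumns method uses a build-and-transfer idiom: on success it nulls out the local array before returning, so the finally block has nothing left to clean up, while any failure path closes whatever was partially built. A standalone sketch of the same idiom, with a hypothetical Resource class standing in for HostColumnVector (not from the repo):

// Standalone illustration of the build-then-transfer-ownership idiom above.
public final class TransferSketch {
  static final class Resource implements AutoCloseable {
    @Override public void close() { System.out.println("closed"); }
  }

  static Resource[] buildAll(int n) {
    Resource[] built = new Resource[n];
    try {
      for (int i = 0; i < n; i++) {
        built[i] = new Resource();
      }
      Resource[] result = built;
      built = null; // success: hand ownership to the caller...
      return result;
    } finally {
      if (built != null) { // ...otherwise close the partial results
        for (Resource r : built) {
          if (r != null) {
            r.close();
          }
        }
      }
    }
  }

  public static void main(String[] args) {
    Resource[] rs = buildAll(3); // the caller now owns and must close these
    for (Resource r : rs) {
      r.close();
    }
  }
}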
@@ -137,7 +141,7 @@ public void close() {
     }
   }
 
-  private static final DType toRapidsOrNull(DataType type) {
+  private static DType toRapidsOrNull(DataType type) {
     if (type instanceof LongType) {
       return DType.INT64;
     } else if (type instanceof DoubleType) {
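Most of the remaining hunks in this file simply drop final from static methods. Static calls in Java are resolved at compile time, so the modifier carries essentially no weight there; a small illustration, not from the repo:

// Illustration: static methods are resolved by the compile-time type, so
// `final` buys nothing for dispatch. (Its only effect on a static method is
// to forbid a subclass from *hiding* it with a same-signature static method.)
public class StaticFinalDemo {
  static class Base {
    static String whoAmI() { return "Base"; } // `final` here would only forbid the hiding below
  }

  static class Derived extends Base {
    static String whoAmI() { return "Derived"; } // hides, does not override
  }

  public static void main(String[] args) {
    System.out.println(Base.whoAmI());    // Base
    System.out.println(Derived.whoAmI()); // Derived, chosen at compile time
  }
}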
@@ -162,24 +166,24 @@ private static final DType toRapidsOrNull(DataType type) {
     return null;
   }
 
-  public static final boolean isSupportedType(DataType type) {
+  public static boolean isSupportedType(DataType type) {
     return toRapidsOrNull(type) != null;
   }
 
-  public static final DType getRapidsType(StructField field) {
+  public static DType getRapidsType(StructField field) {
     DataType type = field.dataType();
     return getRapidsType(type);
   }
 
-  public static final DType getRapidsType(DataType type) {
+  public static DType getRapidsType(DataType type) {
     DType result = toRapidsOrNull(type);
     if (result == null) {
       throw new IllegalArgumentException(type + " is not supported for GPU processing yet.");
     }
     return result;
   }
 
-  static final DataType getSparkType(DType type) {
+  static DataType getSparkType(DType type) {
     switch (type.getTypeId()) {
       case BOOL8:
         return DataTypes.BooleanType;
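A hedged usage sketch of the mapping helpers above (the classes come from this diff; CalendarIntervalType as an unsupported example is an assumption, and the spark-rapids and cudf jars are assumed on the classpath):

import ai.rapids.cudf.DType;
import com.nvidia.spark.rapids.GpuColumnVector;
import org.apache.spark.sql.types.DataTypes;

public class TypeMappingDemo {
  public static void main(String[] args) {
    // A supported Spark type maps to a cudf DType.
    DType t = GpuColumnVector.getRapidsType(DataTypes.LongType);
    System.out.println(t); // INT64

    // An unsupported type (assumed here) returns false from isSupportedType,
    // and getRapidsType throws IllegalArgumentException for it.
    System.out.println(GpuColumnVector.isSupportedType(DataTypes.CalendarIntervalType));
    try {
      GpuColumnVector.getRapidsType(DataTypes.CalendarIntervalType);
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage()); // "... is not supported for GPU processing yet."
    }
  }
}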
@@ -206,7 +210,7 @@ static final DataType getSparkType(DType type) {
     }
   }
 
-  protected static final <T> DataType getSparkTypeFrom(ColumnViewAccess<T> access) {
+  protected static <T> DataType getSparkTypeFrom(ColumnViewAccess<T> access) {
     DType type = access.getDataType();
     if (type == DType.LIST) {
       try (ColumnViewAccess<T> child = access.getChildColumnViewAccess(0)) {
@@ -221,32 +225,58 @@ protected static final <T> DataType getSparkTypeFrom(ColumnViewAccess<T> access)
    * Create an empty batch from the given format. This should be used very sparingly because
    * returning an empty batch from an operator is almost always the wrong thing to do.
    */
-  public static final ColumnarBatch emptyBatch(StructType schema) {
-    return new GpuColumnarBatchBuilder(schema, 0, null).build(0);
+  public static ColumnarBatch emptyBatch(StructType schema) {
+    try (GpuColumnarBatchBuilder builder = new GpuColumnarBatchBuilder(schema, 0, null)) {
+      return builder.build(0);
+    }
   }
 
   /**
    * Create an empty batch from the given format. This should be used very sparingly because
    * returning an empty batch from an operator is almost always the wrong thing to do.
    */
-  public static final ColumnarBatch emptyBatch(List<Attribute> format) {
-    StructType schema = new StructType();
+  public static ColumnarBatch emptyBatch(List<Attribute> format) {
+    return emptyBatch(structFromAttributes(format));
+  }
+
+  /**
+   * Create empty host column vectors from the given format. This should only be necessary
+   * when serializing an empty broadcast table.
+   */
+  public static HostColumnVector[] emptyHostColumns(StructType schema) {
+    try (GpuColumnarBatchBuilder builder = new GpuColumnarBatchBuilder(schema, 0, null)) {
+      return builder.buildHostColumns();
+    }
+  }
+
+  /**
+   * Create empty host column vectors from the given format. This should only be necessary
+   * when serializing an empty broadcast table.
+   */
+  public static HostColumnVector[] emptyHostColumns(List<Attribute> format) {
+    return emptyHostColumns(structFromAttributes(format));
+  }
+
+  private static StructType structFromAttributes(List<Attribute> format) {
+    StructField[] fields = new StructField[format.size()];
+    int i = 0;
     for (Attribute attribute: format) {
-      schema = schema.add(new StructField(attribute.name(),
+      fields[i++] = new StructField(
+          attribute.name(),
           attribute.dataType(),
           attribute.nullable(),
-          null));
+          null);
     }
-    return emptyBatch(schema);
+    return new StructType(fields);
   }
 
   /**
    * Convert a spark schema into a cudf schema
    * @param input the spark schema to convert
    * @return the cudf schema
    */
-  public static final Schema from(StructType input) {
+  public static Schema from(StructType input) {
     Schema.Builder builder = Schema.builder();
     input.foreach(f -> builder.column(GpuColumnVector.getRapidsType(f.dataType()), f.name()));
     return builder.build();
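For reference, a tiny hedged example of the schema conversion helper above (made-up column names; assumes the spark-rapids and cudf jars on the classpath):

import ai.rapids.cudf.Schema;
import com.nvidia.spark.rapids.GpuColumnVector;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public class SchemaConversionDemo {
  public static void main(String[] args) {
    StructType sparkSchema = new StructType()
        .add("id", DataTypes.LongType)
        .add("score", DataTypes.DoubleType);
    // Column names and order carry over; each type goes through getRapidsType.
    Schema cudfSchema = GpuColumnVector.from(sparkSchema);
    System.out.println(cudfSchema);
  }
}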
@@ -257,14 +287,14 @@ public static final Schema from(StructType input) {
   * the columns in the batch, so you will need to close both the batch passed in and the table
   * returned to avoid any memory leaks.
   */
-  public static final Table from(ColumnarBatch batch) {
+  public static Table from(ColumnarBatch batch) {
     return new Table(extractBases(batch));
   }
 
   /**
    * Get the data types for a batch.
    */
-  public static final DataType[] extractTypes(ColumnarBatch batch) {
+  public static DataType[] extractTypes(ColumnarBatch batch) {
     DataType[] ret = new DataType[batch.numCols()];
     for (int i = 0; i < batch.numCols(); i++) {
       ret[i] = batch.column(i).dataType();
@@ -275,7 +305,7 @@ public static final DataType[] extractTypes(ColumnarBatch batch) {
   /**
    * Get the data types for a struct.
    */
-  public static final DataType[] extractTypes(StructType st) {
+  public static DataType[] extractTypes(StructType st) {
     DataType[] ret = new DataType[st.size()];
     for (int i = 0; i < st.size(); i++) {
       ret[i] = st.apply(i).dataType();
@@ -290,11 +320,11 @@ public static final DataType[] extractTypes(StructType st) {
    * @deprecated spark data types must be provided with it.
    */
   @Deprecated
-  public static final ColumnarBatch from(Table table) {
+  public static ColumnarBatch from(Table table) {
     return from(table, 0, table.getNumberOfColumns());
   }
 
-  public static final ColumnarBatch from(Table table, DataType[] colTypes) {
+  public static ColumnarBatch from(Table table, DataType[] colTypes) {
     return from(table, colTypes, 0, table.getNumberOfColumns());
   }
 
@@ -398,7 +428,7 @@ private static boolean typeConversionAllowed(Table table, DataType[] colTypes) {
    * @deprecated must use the version that takes spark data types.
    */
   @Deprecated
-  private static final ColumnarBatch from(Table table, int startColIndex, int untilColIndex) {
+  private static ColumnarBatch from(Table table, int startColIndex, int untilColIndex) {
     assert table != null : "Table cannot be null";
     int numColumns = untilColIndex - startColIndex;
     ColumnVector[] columns = new ColumnVector[numColumns];
@@ -439,7 +469,7 @@ private static final ColumnarBatch from(Table table, int startColIndex, int untilColIndex) {
   * @param untilColIndex until index of the columns. (ie doesn't include that column num)
   * @return a ColumnarBatch of the vectors from the table
   */
-  public static final ColumnarBatch from(Table table, DataType[] colTypes, int startColIndex, int untilColIndex) {
+  public static ColumnarBatch from(Table table, DataType[] colTypes, int startColIndex, int untilColIndex) {
     assert table != null : "Table cannot be null";
     assert typeConversionAllowed(table, colTypes) : "Type conversion is not allowed from " + table +
         " to " + colTypes;
@@ -475,15 +505,15 @@ assert typeConversionAllowed(table, colTypes) : "Type conversion is not allowed from " + table +
   * are incremented so you need to either close the returned value or the input value,
   * but not both.
   */
-  public static final GpuColumnVector from(ai.rapids.cudf.ColumnVector cudfCv) {
+  public static GpuColumnVector from(ai.rapids.cudf.ColumnVector cudfCv) {
     return new GpuColumnVector(getSparkTypeFrom(cudfCv), cudfCv);
   }
 
-  public static final GpuColumnVector from(ai.rapids.cudf.ColumnVector cudfCv, DataType type) {
+  public static GpuColumnVector from(ai.rapids.cudf.ColumnVector cudfCv, DataType type) {
     return new GpuColumnVector(type, cudfCv);
   }
 
-  public static final GpuColumnVector from(Scalar scalar, int count) {
+  public static GpuColumnVector from(Scalar scalar, int count) {
     return from(ai.rapids.cudf.ColumnVector.fromScalar(scalar, count));
   }
 
@@ -492,7 +522,7 @@ public static final GpuColumnVector from(Scalar scalar, int count) {
   * reference counts so if you want to use these columns after the batch is closed
   * you will need to do that on your own.
   */
-  public static final ai.rapids.cudf.ColumnVector[] extractBases(ColumnarBatch batch) {
+  public static ai.rapids.cudf.ColumnVector[] extractBases(ColumnarBatch batch) {
     int numColumns = batch.numCols();
     ai.rapids.cudf.ColumnVector[] vectors = new ai.rapids.cudf.ColumnVector[numColumns];
     for (int i = 0; i < vectors.length; i++) {
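The javadoc above documents a reference-count contract: extractBases does not bump refcounts, so a caller that needs the columns to outlive the batch must take its own references. A hedged sketch of that pattern, assuming cudf's ColumnVector.incRefCount (a real cudf Java method, but not part of this diff):

import ai.rapids.cudf.ColumnVector;
import com.nvidia.spark.rapids.GpuColumnVector;
import org.apache.spark.sql.vectorized.ColumnarBatch;

public final class RefCountSketch {
  static ColumnVector[] keepColumns(ColumnarBatch batch) {
    ColumnVector[] bases = GpuColumnVector.extractBases(batch);
    for (ColumnVector cv : bases) {
      cv.incRefCount(); // take our own reference before the batch is closed
    }
    return bases; // the caller now owns one reference per column and must close them
  }
}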
@@ -506,7 +536,7 @@ public static final ai.rapids.cudf.ColumnVector[] extractBases(ColumnarBatch batch) {
   * reference counts so if you want to use these columns after the batch is closed
   * you will need to do that on your own.
   */
-  public static final GpuColumnVector[] extractColumns(ColumnarBatch batch) {
+  public static GpuColumnVector[] extractColumns(ColumnarBatch batch) {
     int numColumns = batch.numCols();
     GpuColumnVector[] vectors = new GpuColumnVector[numColumns];
 
@@ -517,7 +547,7 @@ public static final GpuColumnVector[] extractColumns(ColumnarBatch batch) {
   }
 
   @Deprecated
-  public static final GpuColumnVector[] extractColumns(Table table) {
+  public static GpuColumnVector[] extractColumns(Table table) {
     try (ColumnarBatch batch = from(table)) {
       return extractColumns(batch);
     }
@@ -529,7 +559,7 @@ public static final GpuColumnVector[] extractColumns(Table table) {
   * Take an INT32 column vector and return a host side int array. Don't use this for anything
   * too large. Note that this ignores validity totally.
   */
-  public static final int[] toIntArray(ai.rapids.cudf.ColumnVector vec) {
+  public static int[] toIntArray(ai.rapids.cudf.ColumnVector vec) {
     assert vec.getType() == DType.INT32;
     int rowCount = (int)vec.getRowCount();
     int[] output = new int[rowCount];
@@ -572,7 +602,7 @@ public final int numNulls() {
     return (int) cudfCv.getNullCount();
   }
 
-  public static final long getTotalDeviceMemoryUsed(ColumnarBatch batch) {
+  public static long getTotalDeviceMemoryUsed(ColumnarBatch batch) {
     long sum = 0;
     if (batch.numCols() > 0) {
       if (batch.column(0) instanceof GpuCompressedColumnVector) {
@@ -587,15 +617,15 @@ public static final long getTotalDeviceMemoryUsed(ColumnarBatch batch) {
     return sum;
   }
 
-  public static final long getTotalDeviceMemoryUsed(GpuColumnVector[] cv) {
+  public static long getTotalDeviceMemoryUsed(GpuColumnVector[] cv) {
     long sum = 0;
     for (int i = 0; i < cv.length; i++){
       sum += cv[i].getBase().getDeviceMemorySize();
     }
     return sum;
   }
 
-  public static final long getTotalDeviceMemoryUsed(Table tb) {
+  public static long getTotalDeviceMemoryUsed(Table tb) {
     long sum = 0;
     int len = tb.getNumberOfColumns();
     for (int i = 0; i < len; i++) {
9 changes: 3 additions & 6 deletions
@@ -46,7 +46,7 @@ import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
 class SerializeConcatHostBuffersDeserializeBatch(
     private val data: Array[SerializeBatchDeserializeHostBuffer],
     private val output: Seq[Attribute])
-  extends Serializable with AutoCloseable {
+  extends Serializable with Arm with AutoCloseable {
   @transient private val headers = data.map(_.header)
   @transient private val buffers = data.map(_.buffer)
   @transient private var batchInternal: ColumnarBatch = null
@@ -68,11 +68,8 @@ class SerializeConcatHostBuffersDeserializeBatch(
     if (headers.length == 0) {
       import scala.collection.JavaConverters._
       // We didn't get any data back, but we need to write out an empty table that matches
-      val empty = GpuColumnVector.emptyBatch(output.asJava)
-      try {
-        JCudfSerialization.writeToStream(GpuColumnVector.extractBases(empty), out, 0, 0)
-      } finally {
-        empty.close()
+      withResource(GpuColumnVector.emptyHostColumns(output.asJava)) { hostVectors =>
+        JCudfSerialization.writeToStream(hostVectors, out, 0, 0)
       }
     } else if (headers.head.getNumColumns == 0) {
       JCudfSerialization.writeRowsToStream(out, numRows)
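The withResource call comes from the plugin's Arm trait, newly mixed in above; it runs the body and closes the resources whether or not the body throws, replacing the hand-written try/finally. An illustrative Java analogue of that helper for an array of resources (a sketch, not code from the repo):

// Illustrative Java analogue of the Arm.withResource pattern used above:
// run the body, then always close every element of the resource array.
public final class ArmSketch {
  interface Body<T, R> { R apply(T t) throws Exception; }

  static <T extends AutoCloseable, R> R withResource(T[] resources, Body<T[], R> body)
      throws Exception {
    try {
      return body.apply(resources);
    } finally {
      for (T r : resources) {
        if (r != null) {
          r.close();
        }
      }
    }
  }

  public static void main(String[] args) throws Exception {
    AutoCloseable[] rs = { () -> System.out.println("closed") };
    Object result = withResource(rs, arr -> arr.length);
    System.out.println(result); // 1, printed after "closed"
  }
}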