diff --git a/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVector.java b/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVector.java
index 49ed06f848a..ddee475247d 100644
--- a/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVector.java
+++ b/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVector.java
@@ -24,7 +24,6 @@
 import ai.rapids.cudf.Table;
 
 import org.apache.spark.sql.catalyst.expressions.Attribute;
-import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
 import org.apache.spark.sql.types.*;
 import org.apache.spark.sql.vectorized.ColumnVector;
 import org.apache.spark.sql.vectorized.ColumnarBatch;
@@ -62,23 +61,7 @@ public GpuColumnarBatchBuilder(StructType schema, int rows, ColumnarBatch batch)
       try {
         for (int i = 0; i < len; i++) {
           StructField field = fields[i];
-          if (field.dataType() instanceof StringType) {
-            // If we cannot know the exact size, assume the string is small and allocate
-            // 8 bytes per row. The buffer of the builder will grow as needed if it is
-            // too small.
-            int bufferSize = rows * 8;
-            if (batch != null) {
-              ColumnVector cv = batch.column(i);
-              if (cv instanceof WritableColumnVector) {
-                WritableColumnVector wcv = (WritableColumnVector) cv;
-                if (!wcv.hasDictionary()) {
-                  bufferSize = wcv.getArrayOffset(rows - 1) +
-                      wcv.getArrayLength(rows - 1);
-                }
-              }
-            }
-            builders[i] = new ai.rapids.cudf.HostColumnVector.ColumnBuilder(new HostColumnVector.BasicType(true, DType.STRING), rows);
-          } else if (field.dataType() instanceof MapType) {
+          if (field.dataType() instanceof MapType) {
             builders[i] = new ai.rapids.cudf.HostColumnVector.ColumnBuilder(new HostColumnVector.ListType(true,
                 new HostColumnVector.StructType(true, Arrays.asList(
                     new HostColumnVector.BasicType(true, DType.STRING),
@@ -127,6 +110,27 @@ public ColumnarBatch build(int rows) {
       }
     }
 
+    public HostColumnVector[] buildHostColumns() {
+      HostColumnVector[] vectors = new HostColumnVector[builders.length];
+      try {
+        for (int i = 0; i < builders.length; i++) {
+          vectors[i] = builders[i].build();
+          builders[i] = null;
+        }
+        HostColumnVector[] result = vectors;
+        vectors = null;
+        return result;
+      } finally {
+        if (vectors != null) {
+          for (HostColumnVector v : vectors) {
+            if (v != null) {
+              v.close();
+            }
+          }
+        }
+      }
+    }
+
     @Override
     public void close() {
       for (ai.rapids.cudf.HostColumnVector.ColumnBuilder b: builders) {
@@ -137,7 +141,7 @@ public void close() {
     }
   }
 
-  private static final DType toRapidsOrNull(DataType type) {
+  private static DType toRapidsOrNull(DataType type) {
     if (type instanceof LongType) {
       return DType.INT64;
     } else if (type instanceof DoubleType) {
@@ -162,16 +166,16 @@ private static final DType toRapidsOrNull(DataType type) {
     return null;
   }
 
-  public static final boolean isSupportedType(DataType type) {
+  public static boolean isSupportedType(DataType type) {
     return toRapidsOrNull(type) != null;
   }
 
-  public static final DType getRapidsType(StructField field) {
+  public static DType getRapidsType(StructField field) {
     DataType type = field.dataType();
     return getRapidsType(type);
   }
 
-  public static final DType getRapidsType(DataType type) {
+  public static DType getRapidsType(DataType type) {
     DType result = toRapidsOrNull(type);
     if (result == null) {
       throw new IllegalArgumentException(type + " is not supported for GPU processing yet.");
@@ -179,7 +183,7 @@ public static final DType getRapidsType(DataType type) {
     return result;
   }
 
-  static final DataType getSparkType(DType type) {
+  static DataType getSparkType(DType type) {
     switch (type.getTypeId()) {
       case BOOL8:
         return DataTypes.BooleanType;
@@ -206,7 +210,7 @@ static final DataType getSparkType(DType type) {
     }
   }
 
-  protected static final DataType getSparkTypeFrom(ColumnViewAccess access) {
+  protected static DataType getSparkTypeFrom(ColumnViewAccess access) {
     DType type = access.getDataType();
     if (type == DType.LIST) {
       try (ColumnViewAccess child = access.getChildColumnViewAccess(0)) {
@@ -221,32 +225,58 @@ protected static final DataType getSparkTypeFrom(ColumnViewAccess access)
    * Create an empty batch from the given format. This should be used very sparingly because
    * returning an empty batch from an operator is almost always the wrong thing to do.
    */
-  public static final ColumnarBatch emptyBatch(StructType schema) {
-    return new GpuColumnarBatchBuilder(schema, 0, null).build(0);
+  public static ColumnarBatch emptyBatch(StructType schema) {
+    try (GpuColumnarBatchBuilder builder = new GpuColumnarBatchBuilder(schema, 0, null)) {
+      return builder.build(0);
+    }
   }
 
   /**
    * Create an empty batch from the given format. This should be used very sparingly because
    * returning an empty batch from an operator is almost always the wrong thing to do.
    */
-  public static final ColumnarBatch emptyBatch(List<Attribute> format) {
-    StructType schema = new StructType();
+  public static ColumnarBatch emptyBatch(List<Attribute> format) {
+    return emptyBatch(structFromAttributes(format));
+  }
+
+
+  /**
+   * Create empty host column vectors from the given format. This should only be necessary
+   * when serializing an empty broadcast table.
+   */
+  public static HostColumnVector[] emptyHostColumns(StructType schema) {
+    try (GpuColumnarBatchBuilder builder = new GpuColumnarBatchBuilder(schema, 0, null)) {
+      return builder.buildHostColumns();
+    }
+  }
+
+  /**
+   * Create empty host column vectors from the given format. This should only be necessary
+   * when serializing an empty broadcast table.
+   */
+  public static HostColumnVector[] emptyHostColumns(List<Attribute> format) {
+    return emptyHostColumns(structFromAttributes(format));
+  }
+
+  private static StructType structFromAttributes(List<Attribute> format) {
+    StructField[] fields = new StructField[format.size()];
+    int i = 0;
     for (Attribute attribute: format) {
-      schema = schema.add(new StructField(attribute.name(),
+      fields[i++] = new StructField(
+          attribute.name(),
           attribute.dataType(),
           attribute.nullable(),
-          null));
+          null);
     }
-    return emptyBatch(schema);
+    return new StructType(fields);
   }
 
-
   /**
    * Convert a spark schema into a cudf schema
    * @param input the spark schema to convert
    * @return the cudf schema
    */
-  public static final Schema from(StructType input) {
+  public static Schema from(StructType input) {
     Schema.Builder builder = Schema.builder();
     input.foreach(f -> builder.column(GpuColumnVector.getRapidsType(f.dataType()), f.name()));
     return builder.build();
@@ -257,14 +287,14 @@ public static final Schema from(StructType input) {
    * the columns in the batch, so you will need to close both the batch passed in and the table
    * returned to avoid any memory leaks.
    */
-  public static final Table from(ColumnarBatch batch) {
+  public static Table from(ColumnarBatch batch) {
     return new Table(extractBases(batch));
   }
 
   /**
    * Get the data types for a batch.
    */
-  public static final DataType[] extractTypes(ColumnarBatch batch) {
+  public static DataType[] extractTypes(ColumnarBatch batch) {
     DataType[] ret = new DataType[batch.numCols()];
     for (int i = 0; i < batch.numCols(); i++) {
       ret[i] = batch.column(i).dataType();
@@ -275,7 +305,7 @@ public static final DataType[] extractTypes(ColumnarBatch batch) {
   /**
    * Get the data types for a struct.
    */
-  public static final DataType[] extractTypes(StructType st) {
+  public static DataType[] extractTypes(StructType st) {
     DataType[] ret = new DataType[st.size()];
     for (int i = 0; i < st.size(); i++) {
       ret[i] = st.apply(i).dataType();
@@ -290,11 +320,11 @@ public static final DataType[] extractTypes(StructType st) {
    * @deprecated spark data types must be provided with it.
    */
   @Deprecated
-  public static final ColumnarBatch from(Table table) {
+  public static ColumnarBatch from(Table table) {
     return from(table, 0, table.getNumberOfColumns());
   }
 
-  public static final ColumnarBatch from(Table table, DataType[] colTypes) {
+  public static ColumnarBatch from(Table table, DataType[] colTypes) {
     return from(table, colTypes, 0, table.getNumberOfColumns());
   }
 
@@ -398,7 +428,7 @@ private static boolean typeConversionAllowed(Table table, DataType[] colTypes) {
    * @deprecated must use the version that takes spark data types.
    */
   @Deprecated
-  private static final ColumnarBatch from(Table table, int startColIndex, int untilColIndex) {
+  private static ColumnarBatch from(Table table, int startColIndex, int untilColIndex) {
     assert table != null : "Table cannot be null";
     int numColumns = untilColIndex - startColIndex;
     ColumnVector[] columns = new ColumnVector[numColumns];
@@ -439,7 +469,7 @@ private static final ColumnarBatch from(Table table, int startColIndex, int unti
    * @param untilColIndex until index of the columns. (ie doesn't include that column num)
    * @return a ColumnarBatch of the vectors from the table
    */
-  public static final ColumnarBatch from(Table table, DataType[] colTypes, int startColIndex, int untilColIndex) {
+  public static ColumnarBatch from(Table table, DataType[] colTypes, int startColIndex, int untilColIndex) {
     assert table != null : "Table cannot be null";
     assert typeConversionAllowed(table, colTypes) : "Type conversion is not allowed from " +
         table + " to " + colTypes;
@@ -475,15 +505,15 @@ assert typeConversionAllowed(table, colTypes) : "Type conversion is not allowed
    * are incremented so you need to either close the returned value or the input value,
    * but not both.
    */
-  public static final GpuColumnVector from(ai.rapids.cudf.ColumnVector cudfCv) {
+  public static GpuColumnVector from(ai.rapids.cudf.ColumnVector cudfCv) {
     return new GpuColumnVector(getSparkTypeFrom(cudfCv), cudfCv);
   }
 
-  public static final GpuColumnVector from(ai.rapids.cudf.ColumnVector cudfCv, DataType type) {
+  public static GpuColumnVector from(ai.rapids.cudf.ColumnVector cudfCv, DataType type) {
     return new GpuColumnVector(type, cudfCv);
   }
 
-  public static final GpuColumnVector from(Scalar scalar, int count) {
+  public static GpuColumnVector from(Scalar scalar, int count) {
     return from(ai.rapids.cudf.ColumnVector.fromScalar(scalar, count));
   }
 
@@ -492,7 +522,7 @@ public static final GpuColumnVector from(Scalar scalar, int count) {
    * reference counts so if you want to use these columns after the batch is closed
    * you will need to do that on your own.
    */
-  public static final ai.rapids.cudf.ColumnVector[] extractBases(ColumnarBatch batch) {
+  public static ai.rapids.cudf.ColumnVector[] extractBases(ColumnarBatch batch) {
     int numColumns = batch.numCols();
     ai.rapids.cudf.ColumnVector[] vectors = new ai.rapids.cudf.ColumnVector[numColumns];
     for (int i = 0; i < vectors.length; i++) {
@@ -506,7 +536,7 @@ public static final ai.rapids.cudf.ColumnVector[] extractBases(ColumnarBatch bat
    * reference counts so if you want to use these columns after the batch is closed
    * you will need to do that on your own.
    */
-  public static final GpuColumnVector[] extractColumns(ColumnarBatch batch) {
+  public static GpuColumnVector[] extractColumns(ColumnarBatch batch) {
     int numColumns = batch.numCols();
     GpuColumnVector[] vectors = new GpuColumnVector[numColumns];
 
@@ -517,7 +547,7 @@ public static final GpuColumnVector[] extractColumns(ColumnarBatch batch) {
   }
 
   @Deprecated
-  public static final GpuColumnVector[] extractColumns(Table table) {
+  public static GpuColumnVector[] extractColumns(Table table) {
     try (ColumnarBatch batch = from(table)) {
       return extractColumns(batch);
     }
@@ -529,7 +559,7 @@ public static final GpuColumnVector[] extractColumns(Table table) {
    * Take an INT32 column vector and return a host side int array. Don't use this for anything
    * too large. Note that this ignores validity totally.
    */
-  public static final int[] toIntArray(ai.rapids.cudf.ColumnVector vec) {
+  public static int[] toIntArray(ai.rapids.cudf.ColumnVector vec) {
     assert vec.getType() == DType.INT32;
     int rowCount = (int)vec.getRowCount();
     int[] output = new int[rowCount];
@@ -572,7 +602,7 @@ public final int numNulls() {
     return (int) cudfCv.getNullCount();
   }
 
-  public static final long getTotalDeviceMemoryUsed(ColumnarBatch batch) {
+  public static long getTotalDeviceMemoryUsed(ColumnarBatch batch) {
     long sum = 0;
     if (batch.numCols() > 0) {
       if (batch.column(0) instanceof GpuCompressedColumnVector) {
@@ -587,7 +617,7 @@ public static final long getTotalDeviceMemoryUsed(ColumnarBatch batch) {
     return sum;
   }
 
-  public static final long getTotalDeviceMemoryUsed(GpuColumnVector[] cv) {
+  public static long getTotalDeviceMemoryUsed(GpuColumnVector[] cv) {
     long sum = 0;
     for (int i = 0; i < cv.length; i++){
       sum += cv[i].getBase().getDeviceMemorySize();
@@ -595,7 +625,7 @@ public static final long getTotalDeviceMemoryUsed(GpuColumnVector[] cv) {
     return sum;
   }
 
-  public static final long getTotalDeviceMemoryUsed(Table tb) {
+  public static long getTotalDeviceMemoryUsed(Table tb) {
     long sum = 0;
     int len = tb.getNumberOfColumns();
     for (int i = 0; i < len; i++) {
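
Editor's note on the ownership-transfer API added above: buildHostColumns() nulls out each builder slot as it builds, so a later close() on the builder is a no-op for the transferred vectors, while the finally block cleans up any partially built columns if a build throws partway through. The returned HostColumnVectors belong to the caller. A minimal Scala caller sketch, assuming the GpuColumnVector API as changed by this patch; EmptyTableWriter and writeEmptyTable are illustrative names, not part of the patch:

    import java.io.OutputStream

    import ai.rapids.cudf.{HostColumnVector, JCudfSerialization}
    import com.nvidia.spark.rapids.GpuColumnVector
    import org.apache.spark.sql.types.StructType

    object EmptyTableWriter {
      // Hypothetical helper: serialize a zero-row table for the given schema.
      // emptyHostColumns hands back host vectors that the caller must close.
      def writeEmptyTable(schema: StructType, out: OutputStream): Unit = {
        val hostColumns: Array[HostColumnVector] = GpuColumnVector.emptyHostColumns(schema)
        try {
          // writeToStream accepts host columns directly, so no device allocation happens.
          JCudfSerialization.writeToStream(hostColumns, out, 0, 0)
        } finally {
          // The caller owns the vectors; close each one exactly once.
          hostColumns.foreach(_.close())
        }
      }
    }
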
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastExchangeExec.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastExchangeExec.scala
index c1efff7abc9..25eb713e89e 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastExchangeExec.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastExchangeExec.scala
@@ -46,7 +46,7 @@ import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
 class SerializeConcatHostBuffersDeserializeBatch(
     private val data: Array[SerializeBatchDeserializeHostBuffer],
     private val output: Seq[Attribute])
-  extends Serializable with AutoCloseable {
+  extends Serializable with Arm with AutoCloseable {
   @transient private val headers = data.map(_.header)
   @transient private val buffers = data.map(_.buffer)
   @transient private var batchInternal: ColumnarBatch = null
@@ -68,11 +68,8 @@ class SerializeConcatHostBuffersDeserializeBatch(
     if (headers.length == 0) {
       import scala.collection.JavaConverters._
       // We didn't get any data back, but we need to write out an empty table that matches
-      val empty = GpuColumnVector.emptyBatch(output.asJava)
-      try {
-        JCudfSerialization.writeToStream(GpuColumnVector.extractBases(empty), out, 0, 0)
-      } finally {
-        empty.close()
+      withResource(GpuColumnVector.emptyHostColumns(output.asJava)) { hostVectors =>
+        JCudfSerialization.writeToStream(hostVectors, out, 0, 0)
       }
     } else if (headers.head.getNumColumns == 0) {
       JCudfSerialization.writeRowsToStream(out, numRows)
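
For context on the Arm mixin added to SerializeConcatHostBuffersDeserializeBatch: withResource is the plugin's loan-pattern helper, which guarantees the loaned resources are closed even when the body throws, replacing the hand-written try/finally that previously closed the empty batch. A rough, self-contained sketch of the array overload relied on here; the real trait lives in com.nvidia.spark.rapids, and this approximates its behavior rather than reproducing its source:

    // Approximate shape of the helper used above: loan an array of resources
    // to a block and close every element afterwards, even on exception.
    trait Arm {
      def withResource[T <: AutoCloseable, V](resources: Array[T])(block: Array[T] => V): V = {
        try {
          block(resources)
        } finally {
          resources.foreach(r => if (r != null) r.close())
        }
      }
    }

A nice side effect of the new path is that serializing an empty broadcast table no longer materializes GPU columns at all: the host column vectors go straight to JCudfSerialization.writeToStream.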