diff --git a/sql-plugin/src/main/format/ShuffleMetadataResponse.fbs b/sql-plugin/src/main/format/ShuffleMetadataResponse.fbs index dfeb38e5304..9bb346ac7f4 100644 --- a/sql-plugin/src/main/format/ShuffleMetadataResponse.fbs +++ b/sql-plugin/src/main/format/ShuffleMetadataResponse.fbs @@ -1,4 +1,4 @@ -// Copyright (c) 2019-2020, NVIDIA CORPORATION. +// Copyright (c) 2019-2021, NVIDIA CORPORATION. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -16,46 +16,16 @@ include "ShuffleCommon.fbs"; namespace com.nvidia.spark.rapids.format; -/// Metadata about cuDF Columns -table ColumnMeta { - /// number of nulls in the column or -1 if unknown - null_count: long; - - /// number of rows in the column - row_count: long; - - /// offset of the column's data buffer - data_offset: long; - - /// length of the column's data buffer - data_length: long; - - /// offset of the column's validity buffer - validity_offset: long; - - /// offset of the column's offsets buffer - offsets_offset: long; - - /// child column metadata - children: [ColumnMeta]; - - /// ordinal of DType enum - dtype_id: int; - - /// DType scale for decimal types - dtype_scale: int; -} - /// Metadata about cuDF tables table TableMeta { - /// metadata about the table buffer + /// metadata about the data encoding buffer_meta: BufferMeta; - /// metadata for each column in the table buffer - column_metas: [ColumnMeta]; - /// number of rows in the table row_count: long; + + /// opaque metadata describing the packed table schema and data layout + packed_meta: [byte]; } /// Flat buffer for Rapids UCX Shuffle Metadata Response diff --git a/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVector.java b/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVector.java index b181c9d6e30..009c5da6c1a 100644 --- a/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVector.java +++ b/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVector.java @@ -702,9 +702,9 @@ public final int numNulls() { public static long getTotalDeviceMemoryUsed(ColumnarBatch batch) { long sum = 0; if (batch.numCols() > 0) { - if (batch.column(0) instanceof GpuCompressedColumnVector) { - GpuCompressedColumnVector gccv = (GpuCompressedColumnVector) batch.column(0); - sum += gccv.getBuffer().getLength(); + if (batch.column(0) instanceof WithTableBuffer) { + WithTableBuffer wtb = (WithTableBuffer) batch.column(0); + sum += wtb.getTableBuffer().getLength(); } else { for (int i = 0; i < batch.numCols(); i++) { sum += ((GpuColumnVector) batch.column(i)).getBase().getDeviceMemorySize(); diff --git a/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVectorFromBuffer.java b/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVectorFromBuffer.java index b5911c71505..e23fa76c9f3 100644 --- a/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVectorFromBuffer.java +++ b/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVectorFromBuffer.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020, NVIDIA CORPORATION. + * Copyright (c) 2020-2021, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,6 +20,7 @@ import ai.rapids.cudf.ContiguousTable; import ai.rapids.cudf.DeviceMemoryBuffer; import ai.rapids.cudf.Table; +import com.nvidia.spark.rapids.format.TableMeta; import org.apache.spark.sql.types.DataType; import org.apache.spark.sql.vectorized.ColumnarBatch; @@ -28,6 +29,7 @@ /** GPU column vector carved from a single buffer, like those from cudf's contiguousSplit. */ public final class GpuColumnVectorFromBuffer extends GpuColumnVector { private final DeviceMemoryBuffer buffer; + private final TableMeta tableMeta; /** * Get a ColumnarBatch from a set of columns in a contiguous table. This differs from the @@ -43,7 +45,8 @@ public final class GpuColumnVectorFromBuffer extends GpuColumnVector { public static ColumnarBatch from(ContiguousTable contigTable, DataType[] colTypes) { DeviceMemoryBuffer buffer = contigTable.getBuffer(); Table table = contigTable.getTable(); - return from(table, buffer, colTypes); + TableMeta meta = MetaUtils.buildTableMeta(0, contigTable); + return from(table, buffer, meta, colTypes); } /** @@ -55,10 +58,12 @@ public static ColumnarBatch from(ContiguousTable contigTable, DataType[] colType * * @param table a table with columns at offsets of `buffer` * @param buffer a device buffer that packs data for columns in `table` + * @param meta metadata describing the table layout and schema * @param colTypes the types the columns should have. * @return batch of GpuColumnVectorFromBuffer instances derived from the table and buffer */ - public static ColumnarBatch from(Table table, DeviceMemoryBuffer buffer, DataType[] colTypes) { + public static ColumnarBatch from(Table table, DeviceMemoryBuffer buffer, + TableMeta meta, DataType[] colTypes) { assert table != null : "Table cannot be null"; assert GpuColumnVector.typeConversionAllowed(table, colTypes) : "Type conversion is not allowed from " + table + " to " + Arrays.toString(colTypes); @@ -72,7 +77,7 @@ public static ColumnarBatch from(Table table, DeviceMemoryBuffer buffer, DataTyp for (int i = 0; i < numColumns; ++i) { ColumnVector v = table.getColumn(i); DataType type = colTypes[i]; - columns[i] = new GpuColumnVectorFromBuffer(type, v.incRefCount(), buffer); + columns[i] = new GpuColumnVectorFromBuffer(type, v.incRefCount(), buffer, meta); } return new ColumnarBatch(columns, (int) rows); } catch (Exception e) { @@ -99,11 +104,13 @@ public static ColumnarBatch from(Table table, DeviceMemoryBuffer buffer, DataTyp * @param type the spark data type for this column * @param cudfColumn a ColumnVector instance * @param buffer the buffer to hold + * @param meta the metadata describing the buffer layout */ public GpuColumnVectorFromBuffer(DataType type, ColumnVector cudfColumn, - DeviceMemoryBuffer buffer) { + DeviceMemoryBuffer buffer, TableMeta meta) { super(type, cudfColumn); this.buffer = buffer; + this.tableMeta = meta; } /** @@ -114,4 +121,13 @@ public GpuColumnVectorFromBuffer(DataType type, ColumnVector cudfColumn, public DeviceMemoryBuffer getBuffer() { return buffer; } + + /** + * Get the metadata describing the data layout in the buffer, + * shared between columns of the original `ContiguousTable` + * @return opaque metadata + */ + public TableMeta getTableMeta() { + return tableMeta; + } } diff --git a/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuCompressedColumnVector.java b/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuCompressedColumnVector.java index 174986de992..cd34f35ecab 100644 --- a/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuCompressedColumnVector.java +++ b/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuCompressedColumnVector.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020, NVIDIA CORPORATION. + * Copyright (c) 2020-2021, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,25 +16,22 @@ package com.nvidia.spark.rapids; -import ai.rapids.cudf.DType; import ai.rapids.cudf.DeviceMemoryBuffer; -import com.nvidia.spark.rapids.format.ColumnMeta; import com.nvidia.spark.rapids.format.TableMeta; -import org.apache.spark.sql.types.ArrayType; -import org.apache.spark.sql.types.BinaryType; -import org.apache.spark.sql.types.DataType; -import org.apache.spark.sql.types.MapType; -import org.apache.spark.sql.types.StructField; -import org.apache.spark.sql.types.StructType; import org.apache.spark.sql.vectorized.ColumnVector; import org.apache.spark.sql.vectorized.ColumnarBatch; +import static org.apache.spark.sql.types.DataTypes.NullType; + /** - * A GPU column vector that has been compressed. The columnar data within cannot - * be accessed directly. This class primarily serves the role of tracking the - * compressed data and table metadata so it can be decompressed later. + * A column vector that tracks a compressed table. Unlike a normal GPU column vector, the + * columnar data within cannot be accessed directly. This class primarily serves the role + * of tracking the compressed data and table metadata so it can be decompressed later. */ -public final class GpuCompressedColumnVector extends GpuColumnVectorBase { +public final class GpuCompressedColumnVector extends GpuColumnVectorBase + implements WithTableBuffer { + private static final String BAD_ACCESS_MSG = "Column is compressed"; + private final DeviceMemoryBuffer buffer; private final TableMeta tableMeta; @@ -42,125 +39,39 @@ public final class GpuCompressedColumnVector extends GpuColumnVectorBase { * Build a columnar batch from a compressed table. * NOTE: The data remains compressed and cannot be accessed directly from the columnar batch. */ - public static ColumnarBatch from(CompressedTable compressedTable, DataType[] colTypes) { - return from(compressedTable.buffer(), compressedTable.meta(), colTypes); + public static ColumnarBatch from(CompressedTable compressedTable) { + return from(compressedTable.buffer(), compressedTable.meta()); } public static boolean isBatchCompressed(ColumnarBatch batch) { - if (batch.numCols() == 0) { - return false; - } else { - return batch.column(0) instanceof GpuCompressedColumnVector; - } - } - - /** - * This should only ever be called from an assertion. - */ - private static boolean typeConversionAllowed(ColumnMeta columnMeta, DataType colType) { - DType dt = DType.fromNative(columnMeta.dtypeId(), columnMeta.dtypeScale()); - if (!dt.isNestedType()) { - return GpuColumnVector.getNonNestedRapidsType(colType).equals(dt); - } - if (colType instanceof MapType) { - MapType mType = (MapType) colType; - // list of struct of key/value - if (!(dt.equals(DType.LIST))) { - return false; - } - ColumnMeta structCm = columnMeta.children(0); - if (structCm.dtypeId() != DType.STRUCT.getTypeId().getNativeId()) { - return false; - } - if (structCm.childrenLength() != 2) { - return false; - } - ColumnMeta keyCm = structCm.children(0); - if (!typeConversionAllowed(keyCm, mType.keyType())) { - return false; - } - ColumnMeta valCm = structCm.children(1); - return typeConversionAllowed(valCm, mType.valueType()); - } else if (colType instanceof ArrayType) { - if (!(dt.equals(DType.LIST))) { - return false; - } - ColumnMeta tmp = columnMeta.children(0); - return typeConversionAllowed(tmp, ((ArrayType) colType).elementType()); - } else if (colType instanceof StructType) { - if (!(dt.equals(DType.STRUCT))) { - return false; - } - StructType st = (StructType) colType; - final int numChildren = columnMeta.childrenLength(); - if (numChildren != st.size()) { - return false; - } - for (int childIndex = 0; childIndex < numChildren; childIndex++) { - ColumnMeta tmp = columnMeta.children(childIndex); - StructField entry = ((StructType) colType).apply(childIndex); - if (!typeConversionAllowed(tmp, entry.dataType())) { - return false; - } - } - return true; - } else if (colType instanceof BinaryType) { - if (!(dt.equals(DType.LIST))) { - return false; - } - ColumnMeta tmp = columnMeta.children(0); - return tmp.dtypeId() == DType.INT8.getTypeId().getNativeId() || - tmp.dtypeId() == DType.UINT8.getTypeId().getNativeId(); - } else { - // Unexpected type - return false; - } + return batch.numCols() == 1 && batch.column(0) instanceof GpuCompressedColumnVector; } /** * Build a columnar batch from a compressed data buffer and specified table metadata * NOTE: The data remains compressed and cannot be accessed directly from the columnar batch. */ - public static ColumnarBatch from(DeviceMemoryBuffer compressedBuffer, - TableMeta tableMeta, - DataType[] colTypes) { + public static ColumnarBatch from(DeviceMemoryBuffer compressedBuffer, TableMeta tableMeta) { long rows = tableMeta.rowCount(); - if (rows != (int) rows) { + int batchRows = (int) rows; + if (rows != batchRows) { throw new IllegalStateException("Cannot support a batch larger that MAX INT rows"); } - ColumnMeta columnMeta = new ColumnMeta(); - int numColumns = tableMeta.columnMetasLength(); - assert numColumns == colTypes.length : "Size mismatch on types"; - ColumnVector[] columns = new ColumnVector[numColumns]; - try { - for (int i = 0; i < numColumns; ++i) { - tableMeta.columnMetas(columnMeta, i); - DataType type = colTypes[i]; - assert typeConversionAllowed(columnMeta, type) : "Type conversion is not allowed from " + - columnMeta + " to " + type + " at index " + i; - compressedBuffer.incRefCount(); - columns[i] = new GpuCompressedColumnVector(type, compressedBuffer, tableMeta); - } - } catch (Throwable t) { - for (int i = 0; i < numColumns; ++i) { - if (columns[i] != null) { - columns[i].close(); - } - } - throw t; - } - - return new ColumnarBatch(columns, (int) rows); + ColumnVector column = new GpuCompressedColumnVector(compressedBuffer, tableMeta); + return new ColumnarBatch(new ColumnVector[] { column }, batchRows); } - private GpuCompressedColumnVector(DataType type, DeviceMemoryBuffer buffer, TableMeta tableMeta) { - super(type); + private GpuCompressedColumnVector(DeviceMemoryBuffer buffer, TableMeta tableMeta) { + super(NullType); this.buffer = buffer; this.tableMeta = tableMeta; + // reference the buffer so it remains valid for the duration of this column + this.buffer.incRefCount(); } - public DeviceMemoryBuffer getBuffer() { + @Override + public DeviceMemoryBuffer getTableBuffer() { return buffer; } @@ -175,11 +86,11 @@ public void close() { @Override public boolean hasNull() { - throw new IllegalStateException("column vector is compressed"); + throw new IllegalStateException(BAD_ACCESS_MSG); } @Override public int numNulls() { - throw new IllegalStateException("column vector is compressed"); + throw new IllegalStateException(BAD_ACCESS_MSG); } } diff --git a/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuPackedTableColumn.java b/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuPackedTableColumn.java new file mode 100644 index 00000000000..7c0b1a5a517 --- /dev/null +++ b/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuPackedTableColumn.java @@ -0,0 +1,85 @@ +/* + * Copyright (c) 2021, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids; + +import ai.rapids.cudf.ContiguousTable; +import ai.rapids.cudf.DeviceMemoryBuffer; +import org.apache.spark.sql.vectorized.ColumnVector; +import org.apache.spark.sql.vectorized.ColumnarBatch; + +import static org.apache.spark.sql.types.DataTypes.NullType; + +/** + * A GPU column tracking a packed table such as one generated by contiguous split. + * Unlike {@link GpuColumnVectorFromBuffer}, the columnar data cannot be accessed directly. + * + * This class primarily serves the role of tracking the packed table data in a ColumnarBatch + * without requiring the underlying table to be manifested along with all of the child columns. + * The typical use-case generates one of these columns per task output partition, and then the + * RAPIDS shuffle transmits the opaque host metadata and GPU data buffer to another host. + * + * NOTE: There should only be one instance of this column per ColumnarBatch as the + */ +public final class GpuPackedTableColumn extends GpuColumnVectorBase implements WithTableBuffer { + private static final String BAD_ACCESS_MSG = "Column is packed"; + + private final ContiguousTable contigTable; + + public static ColumnarBatch from(ContiguousTable contigTable) { + ColumnVector column = new GpuPackedTableColumn(contigTable); + return new ColumnarBatch(new ColumnVector[] { column }, (int) contigTable.getRowCount()); + } + + /** Returns true if this columnar batch uses a packed table */ + public static boolean isBatchPacked(ColumnarBatch batch) { + return batch.numCols() == 1 && batch.column(0) instanceof GpuPackedTableColumn; + } + + GpuPackedTableColumn(ContiguousTable contigTable) { + super(NullType); + this.contigTable = contigTable; + } + + /** + * Returns the contiguous table underneath this column. + * NOTE: The contiguous table is still owned by this column instance, so the + * resulting contiguous table should NOT be closed by the caller. + */ + public ContiguousTable getContiguousTable() { + return contigTable; + } + + @Override + public DeviceMemoryBuffer getTableBuffer() { + return contigTable.getBuffer(); + } + + @Override + public void close() { + contigTable.close(); + } + + @Override + public boolean hasNull() { + throw new IllegalStateException(BAD_ACCESS_MSG); + } + + @Override + public int numNulls() { + throw new IllegalStateException(BAD_ACCESS_MSG); + } +} diff --git a/sql-plugin/src/main/java/com/nvidia/spark/rapids/WithTableBuffer.java b/sql-plugin/src/main/java/com/nvidia/spark/rapids/WithTableBuffer.java new file mode 100644 index 00000000000..eb1e1db5397 --- /dev/null +++ b/sql-plugin/src/main/java/com/nvidia/spark/rapids/WithTableBuffer.java @@ -0,0 +1,24 @@ +/* + * Copyright (c) 2021, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids; + +import ai.rapids.cudf.DeviceMemoryBuffer; + +/** An interface for obtaining the device buffer backing a contiguous/packed table */ +public interface WithTableBuffer { + DeviceMemoryBuffer getTableBuffer(); +} diff --git a/sql-plugin/src/main/java/com/nvidia/spark/rapids/format/ColumnMeta.java b/sql-plugin/src/main/java/com/nvidia/spark/rapids/format/ColumnMeta.java deleted file mode 100644 index 43965403e04..00000000000 --- a/sql-plugin/src/main/java/com/nvidia/spark/rapids/format/ColumnMeta.java +++ /dev/null @@ -1,107 +0,0 @@ -// automatically generated by the FlatBuffers compiler, do not modify - -package com.nvidia.spark.rapids.format; - -import java.nio.*; -import java.lang.*; -import java.util.*; -import com.google.flatbuffers.*; - -@SuppressWarnings("unused") -/** - * Metadata about cuDF Columns - */ -public final class ColumnMeta extends Table { - public static ColumnMeta getRootAsColumnMeta(ByteBuffer _bb) { return getRootAsColumnMeta(_bb, new ColumnMeta()); } - public static ColumnMeta getRootAsColumnMeta(ByteBuffer _bb, ColumnMeta obj) { _bb.order(ByteOrder.LITTLE_ENDIAN); return (obj.__assign(_bb.getInt(_bb.position()) + _bb.position(), _bb)); } - public void __init(int _i, ByteBuffer _bb) { bb_pos = _i; bb = _bb; vtable_start = bb_pos - bb.getInt(bb_pos); vtable_size = bb.getShort(vtable_start); } - public ColumnMeta __assign(int _i, ByteBuffer _bb) { __init(_i, _bb); return this; } - - /** - * number of nulls in the column or -1 if unknown - */ - public long nullCount() { int o = __offset(4); return o != 0 ? bb.getLong(o + bb_pos) : 0L; } - public boolean mutateNullCount(long null_count) { int o = __offset(4); if (o != 0) { bb.putLong(o + bb_pos, null_count); return true; } else { return false; } } - /** - * number of rows in the column - */ - public long rowCount() { int o = __offset(6); return o != 0 ? bb.getLong(o + bb_pos) : 0L; } - public boolean mutateRowCount(long row_count) { int o = __offset(6); if (o != 0) { bb.putLong(o + bb_pos, row_count); return true; } else { return false; } } - /** - * offset of the column's data buffer - */ - public long dataOffset() { int o = __offset(8); return o != 0 ? bb.getLong(o + bb_pos) : 0L; } - public boolean mutateDataOffset(long data_offset) { int o = __offset(8); if (o != 0) { bb.putLong(o + bb_pos, data_offset); return true; } else { return false; } } - /** - * length of the column's data buffer - */ - public long dataLength() { int o = __offset(10); return o != 0 ? bb.getLong(o + bb_pos) : 0L; } - public boolean mutateDataLength(long data_length) { int o = __offset(10); if (o != 0) { bb.putLong(o + bb_pos, data_length); return true; } else { return false; } } - /** - * offset of the column's validity buffer - */ - public long validityOffset() { int o = __offset(12); return o != 0 ? bb.getLong(o + bb_pos) : 0L; } - public boolean mutateValidityOffset(long validity_offset) { int o = __offset(12); if (o != 0) { bb.putLong(o + bb_pos, validity_offset); return true; } else { return false; } } - /** - * offset of the column's offsets buffer - */ - public long offsetsOffset() { int o = __offset(14); return o != 0 ? bb.getLong(o + bb_pos) : 0L; } - public boolean mutateOffsetsOffset(long offsets_offset) { int o = __offset(14); if (o != 0) { bb.putLong(o + bb_pos, offsets_offset); return true; } else { return false; } } - /** - * child column metadata - */ - public ColumnMeta children(int j) { return children(new ColumnMeta(), j); } - public ColumnMeta children(ColumnMeta obj, int j) { int o = __offset(16); return o != 0 ? obj.__assign(__indirect(__vector(o) + j * 4), bb) : null; } - public int childrenLength() { int o = __offset(16); return o != 0 ? __vector_len(o) : 0; } - /** - * ordinal of DType enum - */ - public int dtypeId() { int o = __offset(18); return o != 0 ? bb.getInt(o + bb_pos) : 0; } - public boolean mutateDtypeId(int dtype_id) { int o = __offset(18); if (o != 0) { bb.putInt(o + bb_pos, dtype_id); return true; } else { return false; } } - /** - * DType scale for decimal types - */ - public int dtypeScale() { int o = __offset(20); return o != 0 ? bb.getInt(o + bb_pos) : 0; } - public boolean mutateDtypeScale(int dtype_scale) { int o = __offset(20); if (o != 0) { bb.putInt(o + bb_pos, dtype_scale); return true; } else { return false; } } - - public static int createColumnMeta(FlatBufferBuilder builder, - long null_count, - long row_count, - long data_offset, - long data_length, - long validity_offset, - long offsets_offset, - int childrenOffset, - int dtype_id, - int dtype_scale) { - builder.startObject(9); - ColumnMeta.addOffsetsOffset(builder, offsets_offset); - ColumnMeta.addValidityOffset(builder, validity_offset); - ColumnMeta.addDataLength(builder, data_length); - ColumnMeta.addDataOffset(builder, data_offset); - ColumnMeta.addRowCount(builder, row_count); - ColumnMeta.addNullCount(builder, null_count); - ColumnMeta.addDtypeScale(builder, dtype_scale); - ColumnMeta.addDtypeId(builder, dtype_id); - ColumnMeta.addChildren(builder, childrenOffset); - return ColumnMeta.endColumnMeta(builder); - } - - public static void startColumnMeta(FlatBufferBuilder builder) { builder.startObject(9); } - public static void addNullCount(FlatBufferBuilder builder, long nullCount) { builder.addLong(0, nullCount, 0L); } - public static void addRowCount(FlatBufferBuilder builder, long rowCount) { builder.addLong(1, rowCount, 0L); } - public static void addDataOffset(FlatBufferBuilder builder, long dataOffset) { builder.addLong(2, dataOffset, 0L); } - public static void addDataLength(FlatBufferBuilder builder, long dataLength) { builder.addLong(3, dataLength, 0L); } - public static void addValidityOffset(FlatBufferBuilder builder, long validityOffset) { builder.addLong(4, validityOffset, 0L); } - public static void addOffsetsOffset(FlatBufferBuilder builder, long offsetsOffset) { builder.addLong(5, offsetsOffset, 0L); } - public static void addChildren(FlatBufferBuilder builder, int childrenOffset) { builder.addOffset(6, childrenOffset, 0); } - public static int createChildrenVector(FlatBufferBuilder builder, int[] data) { builder.startVector(4, data.length, 4); for (int i = data.length - 1; i >= 0; i--) builder.addOffset(data[i]); return builder.endVector(); } - public static void startChildrenVector(FlatBufferBuilder builder, int numElems) { builder.startVector(4, numElems, 4); } - public static void addDtypeId(FlatBufferBuilder builder, int dtypeId) { builder.addInt(7, dtypeId, 0); } - public static void addDtypeScale(FlatBufferBuilder builder, int dtypeScale) { builder.addInt(8, dtypeScale, 0); } - public static int endColumnMeta(FlatBufferBuilder builder) { - int o = builder.endObject(); - return o; - } -} - diff --git a/sql-plugin/src/main/java/com/nvidia/spark/rapids/format/LICENSE.md b/sql-plugin/src/main/java/com/nvidia/spark/rapids/format/LICENSE.md deleted file mode 100644 index c4957d8ba9b..00000000000 --- a/sql-plugin/src/main/java/com/nvidia/spark/rapids/format/LICENSE.md +++ /dev/null @@ -1,13 +0,0 @@ -Copyright (c) 2020, NVIDIA CORPORATION. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. diff --git a/sql-plugin/src/main/java/com/nvidia/spark/rapids/format/TableMeta.java b/sql-plugin/src/main/java/com/nvidia/spark/rapids/format/TableMeta.java index 91cdd778507..03b37241215 100644 --- a/sql-plugin/src/main/java/com/nvidia/spark/rapids/format/TableMeta.java +++ b/sql-plugin/src/main/java/com/nvidia/spark/rapids/format/TableMeta.java @@ -18,39 +18,41 @@ public final class TableMeta extends Table { public TableMeta __assign(int _i, ByteBuffer _bb) { __init(_i, _bb); return this; } /** - * metadata about the table buffer + * metadata about the data encoding */ public BufferMeta bufferMeta() { return bufferMeta(new BufferMeta()); } public BufferMeta bufferMeta(BufferMeta obj) { int o = __offset(4); return o != 0 ? obj.__assign(__indirect(o + bb_pos), bb) : null; } /** - * metadata for each column in the table buffer + * number of rows in the table */ - public ColumnMeta columnMetas(int j) { return columnMetas(new ColumnMeta(), j); } - public ColumnMeta columnMetas(ColumnMeta obj, int j) { int o = __offset(6); return o != 0 ? obj.__assign(__indirect(__vector(o) + j * 4), bb) : null; } - public int columnMetasLength() { int o = __offset(6); return o != 0 ? __vector_len(o) : 0; } + public long rowCount() { int o = __offset(6); return o != 0 ? bb.getLong(o + bb_pos) : 0L; } + public boolean mutateRowCount(long row_count) { int o = __offset(6); if (o != 0) { bb.putLong(o + bb_pos, row_count); return true; } else { return false; } } /** - * number of rows in the table + * opaque metadata describing the packed table schema and data layout */ - public long rowCount() { int o = __offset(8); return o != 0 ? bb.getLong(o + bb_pos) : 0L; } - public boolean mutateRowCount(long row_count) { int o = __offset(8); if (o != 0) { bb.putLong(o + bb_pos, row_count); return true; } else { return false; } } + public byte packedMeta(int j) { int o = __offset(8); return o != 0 ? bb.get(__vector(o) + j * 1) : 0; } + public int packedMetaLength() { int o = __offset(8); return o != 0 ? __vector_len(o) : 0; } + public ByteBuffer packedMetaAsByteBuffer() { return __vector_as_bytebuffer(8, 1); } + public ByteBuffer packedMetaInByteBuffer(ByteBuffer _bb) { return __vector_in_bytebuffer(_bb, 8, 1); } + public boolean mutatePackedMeta(int j, byte packed_meta) { int o = __offset(8); if (o != 0) { bb.put(__vector(o) + j * 1, packed_meta); return true; } else { return false; } } public static int createTableMeta(FlatBufferBuilder builder, int buffer_metaOffset, - int column_metasOffset, - long row_count) { + long row_count, + int packed_metaOffset) { builder.startObject(3); TableMeta.addRowCount(builder, row_count); - TableMeta.addColumnMetas(builder, column_metasOffset); + TableMeta.addPackedMeta(builder, packed_metaOffset); TableMeta.addBufferMeta(builder, buffer_metaOffset); return TableMeta.endTableMeta(builder); } public static void startTableMeta(FlatBufferBuilder builder) { builder.startObject(3); } public static void addBufferMeta(FlatBufferBuilder builder, int bufferMetaOffset) { builder.addOffset(0, bufferMetaOffset, 0); } - public static void addColumnMetas(FlatBufferBuilder builder, int columnMetasOffset) { builder.addOffset(1, columnMetasOffset, 0); } - public static int createColumnMetasVector(FlatBufferBuilder builder, int[] data) { builder.startVector(4, data.length, 4); for (int i = data.length - 1; i >= 0; i--) builder.addOffset(data[i]); return builder.endVector(); } - public static void startColumnMetasVector(FlatBufferBuilder builder, int numElems) { builder.startVector(4, numElems, 4); } - public static void addRowCount(FlatBufferBuilder builder, long rowCount) { builder.addLong(2, rowCount, 0L); } + public static void addRowCount(FlatBufferBuilder builder, long rowCount) { builder.addLong(1, rowCount, 0L); } + public static void addPackedMeta(FlatBufferBuilder builder, int packedMetaOffset) { builder.addOffset(2, packedMetaOffset, 0); } + public static int createPackedMetaVector(FlatBufferBuilder builder, byte[] data) { builder.startVector(1, data.length, 1); for (int i = data.length - 1; i >= 0; i--) builder.addByte(data[i]); return builder.endVector(); } + public static void startPackedMetaVector(FlatBufferBuilder builder, int numElems) { builder.startVector(1, numElems, 1); } public static int endTableMeta(FlatBufferBuilder builder) { int o = builder.endObject(); return o; diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/CopyCompressionCodec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/CopyCompressionCodec.scala index 1645f4abea7..7158b82a814 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/CopyCompressionCodec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/CopyCompressionCodec.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020, NVIDIA CORPORATION. + * Copyright (c) 2020-2021, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -34,8 +34,7 @@ class CopyCompressionCodec extends TableCompressionCodec with Arm { outputBuffer.copyFromDeviceBufferAsync(0, buffer, 0, buffer.getLength, stream) val meta = MetaUtils.buildTableMeta( Some(tableId), - contigTable.getTable, - buffer, + contigTable, codecId, outputBuffer.getLength) stream.sync() @@ -76,20 +75,20 @@ class BatchedCopyCompressor(maxBatchMemory: Long, stream: Cuda.Stream) override protected def compress( tables: Array[ContiguousTable], stream: Cuda.Stream): Array[CompressedTable] = { - tables.safeMap { ct => + val result = tables.safeMap { ct => val inBuffer = ct.getBuffer closeOnExcept(DeviceMemoryBuffer.allocate(inBuffer.getLength)) { outBuffer => outBuffer.copyFromDeviceBufferAsync(0, inBuffer, 0, inBuffer.getLength, stream) val meta = MetaUtils.buildTableMeta( None, - ct.getTable, - inBuffer, + ct, CodecType.COPY, outBuffer.getLength) - stream.sync() CompressedTable(outBuffer.getLength, meta, outBuffer) } } + closeOnExcept(result) { _ => stream.sync() } + result } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCoalesceBatches.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCoalesceBatches.scala index a242dbb41e6..28e32a4cd03 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCoalesceBatches.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCoalesceBatches.scala @@ -235,7 +235,7 @@ abstract class AbstractGpuCoalesceIterator( i => cb.column(i).asInstanceOf[GpuColumnVector].getBase.getDeviceMemorySize }.sum case g: GpuCompressedColumnVector => - g.getBuffer.getLength + g.getTableBuffer.getLength case g => throw new IllegalStateException(s"Unexpected column type: $g") } @@ -420,7 +420,7 @@ class GpuCoalesceIteratorNoSpill(iter: Iterator[ColumnarBatch], compressedVecs.foreach { cv => val bufferMeta = cv.getTableMeta.bufferMeta // don't currently support switching codecs when partitioning - val buffer = cv.getBuffer.slice(0, cv.getBuffer.getLength) + val buffer = cv.getTableBuffer.slice(0, cv.getTableBuffer.getLength) decompressor.addBufferToDecompress(buffer, bufferMeta) } withResource(decompressor.finishAsync()) { outputBuffers => @@ -514,7 +514,7 @@ class GpuCoalesceIterator(iter: Iterator[ColumnarBatch], withResource(codec.createBatchDecompressor(maxDecompressBatchMemory, Cuda.DEFAULT_STREAM)) { decompressor => compressedVecs.foreach { cv => - val buffer = cv.getBuffer + val buffer = cv.getTableBuffer val bufferMeta = cv.getTableMeta.bufferMeta // don't currently support switching codecs when partitioning buffer.incRefCount() diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuPartitioning.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuPartitioning.scala index fa2286ddc39..729ec535f2d 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuPartitioning.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuPartitioning.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020, NVIDIA CORPORATION. + * Copyright (c) 2020-2021, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -55,8 +55,9 @@ trait GpuPartitioning extends Partitioning with Arm { case Some(codec) => compressSplits(splits, codec, contiguousTables, dataTypes) case None => - withResource(contiguousTables) { cts => - cts.foreach { ct => splits.append(GpuColumnVectorFromBuffer.from(ct, dataTypes)) } + // GpuPackedTableColumn takes ownership of the contiguous tables + closeOnExcept(contiguousTables) { cts => + cts.foreach { ct => splits.append(GpuPackedTableColumn.from(ct)) } } } splits.toArray @@ -135,10 +136,8 @@ trait GpuPartitioning extends Partitioning with Arm { // add each table either to the batch to be compressed or to the empty batch tracker contiguousTables.zipWithIndex.foreach { case (ct, i) => - if (ct.getTable.getRowCount == 0) { - withResource(ct) { _ => - emptyBatches.append((GpuColumnVector.from(ct.getTable, dataTypes), i)) - } + if (ct.getRowCount == 0) { + emptyBatches.append((GpuPackedTableColumn.from(ct), i)) } else { compressor.addTableToCompress(ct) } @@ -153,7 +152,7 @@ trait GpuPartitioning extends Partitioning with Arm { val numCompressedToAdd = emptyOutputIndex - outputIndex (0 until numCompressedToAdd).foreach { _ => val compressedTable = compressedTables(compressedTableIndex) - outputBatches.append(GpuCompressedColumnVector.from(compressedTable, dataTypes)) + outputBatches.append(GpuCompressedColumnVector.from(compressedTable)) compressedTableIndex += 1 } outputBatches.append(emptyBatch) @@ -163,7 +162,7 @@ trait GpuPartitioning extends Partitioning with Arm { // add any compressed batches that remain after the last empty batch (compressedTableIndex until compressedTables.length).foreach { i => val ct = compressedTables(i) - outputBatches.append(GpuCompressedColumnVector.from(ct, dataTypes)) + outputBatches.append(GpuCompressedColumnVector.from(ct)) } } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/MetaUtils.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/MetaUtils.scala index d7e5a539827..300fa1bceed 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/MetaUtils.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/MetaUtils.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020, NVIDIA CORPORATION. + * Copyright (c) 2020-2021, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -40,57 +40,36 @@ object MetaUtils extends Arm { * Build a TableMeta message from a Table in contiguous memory * * @param tableId the ID to use for this table - * @param table the table whose metadata will be encoded in the message - * @param buffer the contiguous buffer backing the Table + * @param ct the contiguous table whose metadata will be encoded in the message * @return heap-based flatbuffer message */ - def buildTableMeta(tableId: Int, table: Table, buffer: DeviceMemoryBuffer): TableMeta = { - buildTableMeta(tableId, - (0 until table.getNumberOfColumns).map(i => table.getColumn(i)), - table.getRowCount, - buffer) - } - - /** - * Build a TableMeta message - * @param tableId the ID to use for this table - * @param columns the columns in the table - * @param numRows the number of rows in the table - * @param buffer the contiguous buffer backing the columns in the table - * @return heap-based flatbuffer message - */ - def buildTableMeta( - tableId: Int, - columns: Seq[ColumnVector], - numRows: Long, - buffer: DeviceMemoryBuffer): TableMeta = { + def buildTableMeta(tableId: Int, ct: ContiguousTable): TableMeta = { val fbb = new FlatBufferBuilder(1024) - val bufferSize = buffer.getLength + val bufferSize = ct.getBuffer.getLength BufferMeta.startBufferMeta(fbb) BufferMeta.addId(fbb, tableId) BufferMeta.addSize(fbb, bufferSize) BufferMeta.addUncompressedSize(fbb, bufferSize) - val bufferMetaOffset = BufferMeta.endBufferMeta(fbb) - buildTableMeta(fbb, bufferMetaOffset, columns, numRows, buffer.getAddress) + val bufferMetaOffset = Some(BufferMeta.endBufferMeta(fbb)) + buildTableMeta(fbb, bufferMetaOffset, ct.getMetadataDirectBuffer, ct.getRowCount) } /** * Build a TableMeta message from a Table that originated in contiguous memory that has * since been compressed. * @param tableId ID to use for this table - * @param table table whose metadata will be encoded in the message - * @param uncompressedBuffer uncompressed buffer that backs the Table + * @param ct contiguous table representing the uncompressed data * @param codecId identifier of the codec being used, see CodecType * @param compressedSize compressed data from the uncompressed buffer * @return heap-based flatbuffer message */ def buildTableMeta( tableId: Option[Int], - table: Table, - uncompressedBuffer: DeviceMemoryBuffer, + ct: ContiguousTable, codecId: Byte, compressedSize: Long): TableMeta = { val fbb = new FlatBufferBuilder(1024) + val uncompressedBuffer = ct.getBuffer val codecDescrOffset = CodecBufferDescriptor.createCodecBufferDescriptor( fbb, codecId, @@ -105,32 +84,33 @@ object MetaUtils extends Arm { BufferMeta.addSize(fbb, compressedSize) BufferMeta.addUncompressedSize(fbb, uncompressedBuffer.getLength) BufferMeta.addCodecBufferDescrs(fbb, codecDescrArrayOffset) - val bufferMetaOffset = BufferMeta.endBufferMeta(fbb) - val columns = (0 until table.getNumberOfColumns).map(i => table.getColumn(i)) - buildTableMeta(fbb, bufferMetaOffset, columns, table.getRowCount, uncompressedBuffer.getAddress) + val bufferMetaOffset = Some(BufferMeta.endBufferMeta(fbb)) + buildTableMeta(fbb, bufferMetaOffset, ct.getMetadataDirectBuffer, ct.getRowCount) } /** * Build a TableMeta message with a pre-built BufferMeta message * @param fbb flatbuffer builder that has an already built BufferMeta message * @param bufferMetaOffset offset where the BufferMeta message was built - * @param columns the columns in the table + * @param packedMeta opaque metadata needed to unpack the table * @param numRows the number of rows in the table - * @param baseAddress address of uncompressed contiguous buffer holding the table * @return flatbuffer message */ def buildTableMeta( fbb: FlatBufferBuilder, - bufferMetaOffset: Int, - columns: Seq[ColumnVector], - numRows: Long, - baseAddress: Long): TableMeta = { - val columnMetaOffsets = columns.map(col => addColumnMeta(fbb, baseAddress, col)).toArray - val columnMetasOffset = TableMeta.createColumnMetasVector(fbb, columnMetaOffsets) + bufferMetaOffset: Option[Int], + packedMeta: ByteBuffer, + numRows: Long): TableMeta = { + val vectorBuffer = fbb.createUnintializedVector(1, packedMeta.remaining(), 1) + packedMeta.mark() + vectorBuffer.put(packedMeta) + packedMeta.reset() + val packedMetaOffset = fbb.endVector() + TableMeta.startTableMeta(fbb) - TableMeta.addBufferMeta(fbb, bufferMetaOffset) + bufferMetaOffset.foreach { bmo => TableMeta.addBufferMeta(fbb, bmo) } + TableMeta.addPackedMeta(fbb, packedMetaOffset) TableMeta.addRowCount(fbb, numRows) - TableMeta.addColumnMetas(fbb, columnMetasOffset) fbb.finish(TableMeta.endTableMeta(fbb)) // copy the message to trim the backing array to only what is needed TableMeta.getRootAsTableMeta(ByteBuffer.wrap(fbb.sizedByteArray())) @@ -138,83 +118,32 @@ object MetaUtils extends Arm { /** * Build a TableMeta message for a degenerate table (zero columns or rows) - * @param batch the degenerate columnar batch + * @param batch the degenerate columnar batch which must be compressed or packed * @return heap-based flatbuffer message */ def buildDegenerateTableMeta(batch: ColumnarBatch): TableMeta = { require(batch.numRows == 0 || batch.numCols == 0, "batch not degenerate") - if (batch.numCols > 0) { - // Batched compression can result in degenerate batches appearing compressed. In that case - // the table metadata has already been computed and can be returned directly. + if (batch.numCols() == 0) { + val fbb = new FlatBufferBuilder(1024) + TableMeta.startTableMeta(fbb) + TableMeta.addRowCount(fbb, batch.numRows) + fbb.finish(TableMeta.endTableMeta(fbb)) + // copy the message to trim the backing array to only what is needed + TableMeta.getRootAsTableMeta(ByteBuffer.wrap(fbb.sizedByteArray())) + } else { batch.column(0) match { case c: GpuCompressedColumnVector => - return c.getTableMeta + // Batched compression can result in degenerate batches appearing compressed. In that case + // the table metadata has already been computed and can be returned directly. + c.getTableMeta + case c: GpuPackedTableColumn => + val contigTable = c.getContiguousTable + val fbb = new FlatBufferBuilder(1024) + buildTableMeta(fbb, None, contigTable.getMetadataDirectBuffer, contigTable.getRowCount) case _ => + throw new IllegalStateException("batch must be compressed or packed") } } - val fbb = new FlatBufferBuilder(1024) - val columnMetaOffset = if (batch.numCols > 0) { - val columns = GpuColumnVector.extractBases(batch) - val columnMetaOffsets = new ArrayBuffer[Int](batch.numCols) - for (i <- 0 until batch.numCols) { - ColumnMeta.startColumnMeta(fbb) - ColumnMeta.addNullCount(fbb, 0) - ColumnMeta.addRowCount(fbb, batch.numRows) - val columnType = columns(i).getType - ColumnMeta.addDtypeId(fbb, columnType.getTypeId.getNativeId) - ColumnMeta.addDtypeScale(fbb, columnType.getScale) - columnMetaOffsets.append(ColumnMeta.endColumnMeta(fbb)) - } - Some(TableMeta.createColumnMetasVector(fbb, columnMetaOffsets.toArray)) - } else { - None - } - - TableMeta.startTableMeta(fbb) - TableMeta.addRowCount(fbb, batch.numRows) - columnMetaOffset.foreach(c => TableMeta.addColumnMetas(fbb, c)) - fbb.finish(TableMeta.endTableMeta(fbb)) - // copy the message to trim the backing array to only what is needed - TableMeta.getRootAsTableMeta(ByteBuffer.wrap(fbb.sizedByteArray())) - } - - /** Add a ColumnMeta, returning the buffer offset where it was added */ - private def addColumnMeta( - fbb: FlatBufferBuilder, - baseAddress: Long, - column: ColumnView): Int = { - val columnType = column.getType - val childVectorOffset = if (columnType.isNestedType) { - val childMetaOffsets = (0 until column.getNumChildren).map { i => - withResource(column.getChildColumnView(i)) { childView => - addColumnMeta(fbb, baseAddress, childView) - } - } - Some(ColumnMeta.createChildrenVector(fbb, childMetaOffsets.toArray)) - } else { - None - } - - ColumnMeta.startColumnMeta(fbb) - ColumnMeta.addNullCount(fbb, column.getNullCount) - ColumnMeta.addRowCount(fbb, column.getRowCount) - val data = column.getData - if (data != null) { - ColumnMeta.addDataOffset(fbb, data.getAddress - baseAddress) - ColumnMeta.addDataLength(fbb, data.getLength) - } - val validity = column.getValid - if (validity != null) { - ColumnMeta.addValidityOffset(fbb, validity.getAddress - baseAddress) - } - val offsets = column.getOffsets - if (offsets != null) { - ColumnMeta.addOffsetsOffset(fbb, offsets.getAddress - baseAddress) - } - ColumnMeta.addDtypeId(fbb, columnType.getTypeId.getNativeId) - ColumnMeta.addDtypeScale(fbb, columnType.getScale) - childVectorOffset.foreach(x => ColumnMeta.addChildren(fbb, x)) - ColumnMeta.endColumnMeta(fbb) } /** @@ -225,13 +154,9 @@ object MetaUtils extends Arm { * @return table that must be closed by the caller */ def getTableFromMeta(deviceBuffer: DeviceMemoryBuffer, meta: TableMeta): Table = { - withResource(new Array[ColumnVector](meta.columnMetasLength())) { columns => - val columnMeta = new ColumnMeta - (0 until meta.columnMetasLength).foreach { i => - columns(i) = makeCudfColumn(deviceBuffer, meta.columnMetas(columnMeta, i)) - } - new Table(columns :_*) - } + val packedMeta = meta.packedMetaAsByteBuffer + require(packedMeta != null, "Missing packed table metadata") + Table.fromPackedTable(packedMeta, deviceBuffer) } /** @@ -242,87 +167,10 @@ object MetaUtils extends Arm { * @param sparkTypes the spark types that the `ColumnarBatch` should have. * @return columnar batch that must be closed by the caller */ - def getBatchFromMeta(deviceBuffer: DeviceMemoryBuffer, - meta: TableMeta, + def getBatchFromMeta(deviceBuffer: DeviceMemoryBuffer, meta: TableMeta, sparkTypes: Array[DataType]): ColumnarBatch = { - closeOnExcept(new ArrayBuffer[GpuColumnVector](meta.columnMetasLength())) { columns => - val columnMeta = new ColumnMeta - (0 until meta.columnMetasLength).foreach { i => - columns.append(makeColumn(deviceBuffer, meta.columnMetas(columnMeta, i), sparkTypes(i))) - } - new ColumnarBatch(columns.toArray, meta.rowCount.toInt) - } - } - - private def makeColumn( - buffer: DeviceMemoryBuffer, - meta: ColumnMeta, - sparkType: DataType): GpuColumnVector = { - val columnView = makeCudfColumnView(buffer, meta) - val column = ColumnViewUtil.fromViewWithContiguousAllocation(columnView, buffer) - new GpuColumnVectorFromBuffer(sparkType, column, buffer) - } - - private def makeCudfColumn(buffer: DeviceMemoryBuffer, meta: ColumnMeta): ColumnVector = { - val columnView = makeCudfColumnView(buffer, meta) - ColumnViewUtil.fromViewWithContiguousAllocation(columnView, buffer) - } - - private def makeCudfColumnView( - buffer: DeviceMemoryBuffer, - meta: ColumnMeta): Long = { - val numChildren = meta.childrenLength() - val childViews = if (numChildren > 0) new Array[Long](numChildren) else null - try { - if (childViews != null) { - val columnMetaObj = new ColumnMeta - childViews.indices.foreach { i => - val childMeta = meta.children(columnMetaObj, i) - childViews(i) = makeCudfColumnView(buffer, childMeta) - } - } - - val dtype = DType.fromNative(meta.dtypeId(), meta.dtypeScale()) - val rowCount = meta.rowCount().toInt - val nullCount = meta.nullCount().toInt - val baseAddress = buffer.getAddress - val dataLength = meta.dataLength() - val dataAddress = if (dtype.isNestedType) { - 0L - } else { - baseAddress + meta.dataOffset() - } - val validityAddress = if (nullCount > 0) { - baseAddress + meta.validityOffset() - } else { - 0L - } - val offsetsAddress = if (dtype.hasOffsets) { - baseAddress + meta.offsetsOffset() - } else { - 0L - } - ColumnViewUtil.makeCudfColumnView( - dtype.getTypeId.getNativeId, - dtype.getScale, - dataAddress, - dataLength, - offsetsAddress, - validityAddress, - nullCount, - rowCount, - childViews) - } catch { - case t: Throwable => - if (childViews != null) { - try { - childViews.foreach(ColumnViewUtil.deleteColumnView) - } catch { - case e: Throwable => - t.addSuppressed(e) - } - } - throw t + withResource(getTableFromMeta(deviceBuffer, meta)) { table => + GpuColumnVectorFromBuffer.from(table, deviceBuffer, meta, sparkTypes) } } } @@ -361,32 +209,6 @@ object ShuffleMetadata extends Logging{ BufferMeta.endBufferMeta(fbb) } - private def copyColumnMeta(fbb: FlatBufferBuilder, col: ColumnMeta): Int = { - val numChildren = col.childrenLength() - val childMetaVectorOffset = if (numChildren > 0) { - val columnMetaObj = new ColumnMeta - val childMetaOffsets = (0 until numChildren).map { i => - val childMeta = col.children(columnMetaObj, i) - copyColumnMeta(fbb, childMeta) - } - Some(ColumnMeta.createChildrenVector(fbb, childMetaOffsets.toArray)) - } else { - None - } - - ColumnMeta.startColumnMeta(fbb) - ColumnMeta.addNullCount(fbb, col.nullCount()) - ColumnMeta.addRowCount(fbb, col.rowCount()) - ColumnMeta.addDataOffset(fbb, col.dataOffset()) - ColumnMeta.addDataLength(fbb, col.dataLength()) - ColumnMeta.addValidityOffset(fbb, col.validityOffset()) - ColumnMeta.addOffsetsOffset(fbb, col.offsetsOffset()) - childMetaVectorOffset.foreach(x => ColumnMeta.addChildren(fbb, x)) - ColumnMeta.addDtypeId(fbb, col.dtypeId()) - ColumnMeta.addDtypeScale(fbb, col.dtypeScale()) - ColumnMeta.endColumnMeta(fbb) - } - /** * Given a sequence of `TableMeta`, re-lay the metas using the flat buffer builder in `fbb`. * @param fbb builder to use @@ -402,21 +224,18 @@ object ShuffleMetadata extends Logging{ None } - val columnMetaObj = new ColumnMeta - val columnMetaOffsets = (0 until tableMeta.columnMetasLength()).map { i => - val columnMeta = tableMeta.columnMetas(columnMetaObj, i) - copyColumnMeta(fbb, columnMeta) - } - - val columnMetaOffset = if (columnMetaOffsets.nonEmpty) { - Some(TableMeta.createColumnMetasVector(fbb, columnMetaOffsets.toArray)) + val packedMetaBuffer = tableMeta.packedMetaAsByteBuffer() + val packedMetaOffset = if (packedMetaBuffer != null) { + val destBuffer = fbb.createUnintializedVector(1, packedMetaBuffer.remaining(), 1) + destBuffer.put(packedMetaBuffer) + Some(fbb.endVector()) } else { None } TableMeta.startTableMeta(fbb) buffMetaOffset.foreach(bmo => TableMeta.addBufferMeta(fbb, bmo)) - columnMetaOffset.foreach(cmo => TableMeta.addColumnMetas(fbb, cmo)) + packedMetaOffset.foreach(pmo => TableMeta.addPackedMeta(fbb, pmo)) TableMeta.addRowCount(fbb, tableMeta.rowCount()) TableMeta.endTableMeta(fbb) }.toArray @@ -514,24 +333,6 @@ object ShuffleMetadata extends Logging{ fbb.dataBuffer() } - private def printColumnMeta(colName: String, columnMeta: ColumnMeta): String = { - val columnType = DType.fromNative(columnMeta.dtypeId(), columnMeta.dtypeScale()) - val numChildren = columnMeta.childrenLength() - val childrenStr = if (numChildren > 0) { - val columnMetaObj = new ColumnMeta - (0 until numChildren).map { i => - val childName = s"$colName child $i" - val childMeta = columnMeta.children(columnMetaObj, i) - printColumnMeta(childName, childMeta) - }.mkString - } else { - "" - } - s"$colName [type=$columnType rows=${columnMeta.rowCount} " + - s"nullcount=${columnMeta.nullCount()} data_len=${columnMeta.dataLength()}]\n" + - s"$childrenStr" - } - def printResponse(state: String, res: MetadataResponse): String = { val out = new StringBuilder out.append(s"----------------------- METADATA RESPONSE $state --------------------------\n") @@ -539,13 +340,8 @@ object ShuffleMetadata extends Logging{ out.append("------------------------------------------------------------------------------\n") for (tableIndex <- 0 until res.tableMetasLength()) { val tableMeta = res.tableMetas(tableIndex) - out.append(s"table: $tableIndex [cols=${tableMeta.columnMetasLength()}, " + - s"rows=${tableMeta.rowCount}, full_content_size=${res.fullResponseSize()}]\n") - val columnMetaObj = new ColumnMeta - (0 until tableMeta.columnMetasLength()).foreach { i => - val columnMeta = tableMeta.columnMetas(columnMetaObj, i) - out.append(printColumnMeta(s"column: $i", columnMeta)) - } + out.append(s"table: $tableIndex rows=${tableMeta.rowCount}, " + + s"full_content_size=${res.fullResponseSize()}]\n") } out.append(s"----------------------- END METADATA RESPONSE $state ----------------------\n") out.toString() diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/NvcompLZ4CompressionCodec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/NvcompLZ4CompressionCodec.scala index b118d1c44d8..0b38956bbee 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/NvcompLZ4CompressionCodec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/NvcompLZ4CompressionCodec.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020, NVIDIA CORPORATION. + * Copyright (c) 2020-2021, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -36,8 +36,7 @@ class NvcompLZ4CompressionCodec extends TableCompressionCodec with Arm { require(compressedSize <= oversizedBuffer.getLength, "compressed buffer overrun") val tableMeta = MetaUtils.buildTableMeta( Some(tableId), - contigTable.getTable, - tableBuffer, + contigTable, CodecType.NVCOMP_LZ4, compressedSize) CompressedTable(compressedSize, tableMeta, oversizedBuffer) @@ -138,8 +137,7 @@ class BatchedNvcompLZ4Compressor(maxBatchMemorySize: Long, stream: Cuda.Stream) require(compressedSize <= buffer.getLength, "compressed buffer overrun") val meta = MetaUtils.buildTableMeta( None, - contigTable.getTable, - contigTable.getBuffer, + contigTable, CodecType.NVCOMP_LZ4, compressedSize) CompressedTable(compressedSize, meta, buffer) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBuffer.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBuffer.scala index c865dfaafc2..7b418d2a804 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBuffer.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBuffer.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020, NVIDIA CORPORATION. + * Copyright (c) 2020-2021, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,15 +17,14 @@ package com.nvidia.spark.rapids import java.io.File -import java.util.Optional -import ai.rapids.cudf.{DType, MemoryBuffer} +import ai.rapids.cudf.{DeviceMemoryBuffer, MemoryBuffer, Table} import com.nvidia.spark.rapids.StorageTier.StorageTier -import com.nvidia.spark.rapids.format.{ColumnMeta, TableMeta} +import com.nvidia.spark.rapids.format.TableMeta import org.apache.spark.sql.rapids.RapidsDiskBlockManager import org.apache.spark.sql.types.DataType -import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector} +import org.apache.spark.sql.vectorized.ColumnarBatch /** * An identifier for a RAPIDS buffer that can be automatically spilled between buffer stores. @@ -139,24 +138,23 @@ trait RapidsBuffer extends AutoCloseable { */ sealed class DegenerateRapidsBuffer( override val id: RapidsBufferId, - override val meta: TableMeta) extends RapidsBuffer { + override val meta: TableMeta) extends RapidsBuffer with Arm { override val size: Long = 0L override val storageTier: StorageTier = StorageTier.DEVICE override def getColumnarBatch(sparkTypes: Array[DataType]): ColumnarBatch = { val rowCount = meta.rowCount - val nullCount = Optional.of(java.lang.Long.valueOf(0)) - val columnMeta = new ColumnMeta - val columns = new Array[ColumnVector](meta.columnMetasLength) - (0 until meta.columnMetasLength).foreach { i => - meta.columnMetas(columnMeta, i) - assert(columnMeta.childrenLength == 0, "child columns are not yet supported") - val dtype = DType.fromNative(columnMeta.dtypeId(), columnMeta.dtypeScale()) - columns(i) = GpuColumnVector.from(new ai.rapids.cudf.ColumnVector( - dtype, rowCount, nullCount, null, null, null), - sparkTypes(i)) + val packedMeta = meta.packedMetaAsByteBuffer() + if (packedMeta != null) { + withResource(DeviceMemoryBuffer.allocate(0)) { deviceBuffer => + withResource(Table.fromPackedTable(meta.packedMetaAsByteBuffer(), deviceBuffer)) { table => + GpuColumnVectorFromBuffer.from(table, deviceBuffer, meta, sparkTypes) + } + } + } else { + // no packed metadata, must be a table with zero columns + new ColumnarBatch(Array.empty, rowCount.toInt) } - new ColumnarBatch(columns, rowCount.toInt) } override def free(): Unit = {} diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferCatalog.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferCatalog.scala index e772827703f..deb832eca4f 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferCatalog.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferCatalog.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020, NVIDIA CORPORATION. + * Copyright (c) 2020-2021, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,7 +19,7 @@ package com.nvidia.spark.rapids import java.util.concurrent.ConcurrentHashMap import java.util.function.BiFunction -import ai.rapids.cudf.{DeviceMemoryBuffer, Rmm, Table} +import ai.rapids.cudf.{ContiguousTable, DeviceMemoryBuffer, Rmm, Table} import com.nvidia.spark.rapids.StorageTier.StorageTier import com.nvidia.spark.rapids.format.TableMeta @@ -185,14 +185,28 @@ object RapidsBufferCatalog extends Logging with Arm { * @param id buffer ID to associate with this buffer * @param table cudf table based from the contiguous buffer * @param contigBuffer device memory buffer backing the table + * @param tableMeta metadata describing the buffer layout * @param initialSpillPriority starting spill priority value for the buffer */ def addTable( id: RapidsBufferId, table: Table, contigBuffer: DeviceMemoryBuffer, + tableMeta: TableMeta, + initialSpillPriority: Long): Unit = + deviceStorage.addTable(id, table, contigBuffer, tableMeta, initialSpillPriority) + + /** + * Adds a contiguous table to the device storage, taking ownership of the table. + * @param id buffer ID to associate with this buffer + * @param contigTable contiguos table to track in device storage + * @param initialSpillPriority starting spill priority value for the buffer + */ + def addContiguousTable( + id: RapidsBufferId, + contigTable: ContiguousTable, initialSpillPriority: Long): Unit = - deviceStorage.addTable(id, table, contigBuffer, initialSpillPriority) + deviceStorage.addContiguousTable(id, contigTable, initialSpillPriority) /** * Adds a buffer to the device storage, taking ownership of the buffer. diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferStore.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferStore.scala index 057b2ff5fbf..127fb80ed0b 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferStore.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferStore.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020, NVIDIA CORPORATION. + * Copyright (c) 2020-2021, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -295,7 +295,7 @@ abstract class RapidsBufferStore( if (bufferMeta == null || bufferMeta.codecBufferDescrsLength == 0) { MetaUtils.getBatchFromMeta(devBuffer, meta, sparkTypes) } else { - GpuCompressedColumnVector.from(devBuffer, meta, sparkTypes) + GpuCompressedColumnVector.from(devBuffer, meta) } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsDeviceMemoryStore.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsDeviceMemoryStore.scala index f941fe3434f..677813c34cd 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsDeviceMemoryStore.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsDeviceMemoryStore.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020, NVIDIA CORPORATION. + * Copyright (c) 2020-2021, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,7 +16,7 @@ package com.nvidia.spark.rapids -import ai.rapids.cudf.{Cuda, DeviceMemoryBuffer, MemoryBuffer, Table} +import ai.rapids.cudf.{ContiguousTable, Cuda, DeviceMemoryBuffer, MemoryBuffer, Table} import com.nvidia.spark.rapids.StorageTier.StorageTier import com.nvidia.spark.rapids.format.TableMeta @@ -40,23 +40,64 @@ class RapidsDeviceMemoryStore(catalog: RapidsBufferCatalog = RapidsBufferCatalog * @param id buffer ID to associate with this buffer * @param table cudf table based from the contiguous buffer * @param contigBuffer device memory buffer backing the table + * @param tableMeta metadata describing the buffer layout * @param initialSpillPriority starting spill priority value for the buffer */ def addTable( id: RapidsBufferId, table: Table, contigBuffer: DeviceMemoryBuffer, + tableMeta: TableMeta, initialSpillPriority: Long): Unit = { - val buffer = uncompressedBufferFromTable(id, table, contigBuffer, initialSpillPriority) + val buffer = new RapidsDeviceMemoryBuffer( + id, + contigBuffer.getLength, + tableMeta, + Some(table), + contigBuffer, + initialSpillPriority) + try { + logDebug(s"Adding table for: [id=$id, size=${buffer.size}, " + + s"meta_id=${buffer.meta.bufferMeta.id}, meta_size=${buffer.meta.bufferMeta.size}]") + addBuffer(buffer) + } catch { + case t: Throwable => + buffer.free() + throw t + } + } + + /** + * Adds a contiguous table to the device storage. This does NOT take ownership of the + * contiguous table, so it is the responsibility of the caller to close it. The refcount of the + * underlying device buffer will be incremented so the contiguous table can be closed before + * this buffer is destroyed. + * @param id buffer ID to associate with this buffer + * @param contigTable contiguous table to track in storage + * @param initialSpillPriority starting spill priority value for the buffer + */ + def addContiguousTable( + id: RapidsBufferId, + contigTable: ContiguousTable, + initialSpillPriority: Long): Unit = { + val contigBuffer = contigTable.getBuffer + val size = contigBuffer.getLength + val meta = MetaUtils.buildTableMeta(id.tableId, contigTable) + contigBuffer.incRefCount() + val buffer = new RapidsDeviceMemoryBuffer( + id, + size, + meta, + None, + contigBuffer, + initialSpillPriority) try { logDebug(s"Adding table for: [id=$id, size=${buffer.size}, " + s"uncompressed=${buffer.meta.bufferMeta.uncompressedSize}, " + - s"meta_id=${buffer.meta.bufferMeta.id}, meta_size=${buffer.meta.bufferMeta.size}, " + - s"meta_num_cols=${buffer.meta.columnMetasLength}]") + s"meta_id=${buffer.meta.bufferMeta.id}, meta_size=${buffer.meta.bufferMeta.size}]") addBuffer(buffer) } catch { case t: Throwable => - logError(s"Error while adding, freeing the buffer ${buffer.id}: ", t) buffer.free() throw t } @@ -74,44 +115,24 @@ class RapidsDeviceMemoryStore(catalog: RapidsBufferCatalog = RapidsBufferCatalog buffer: DeviceMemoryBuffer, tableMeta: TableMeta, initialSpillPriority: Long): Unit = { - logDebug(s"Adding receive side table for: [id=$id, size=${buffer.getLength}, " + - s"meta_id=${tableMeta.bufferMeta.id}, " + - s"meta_size=${tableMeta.bufferMeta.size}, " + - s"meta_num_cols=${tableMeta.columnMetasLength}]") - - val table = if (tableMeta.bufferMeta.codecBufferDescrsLength() > 0) { - // buffer is compressed so there is no Table. - None - } else { - // hold the 1 ref count extra in buffer, it will be removed later in releaseResources - Some(MetaUtils.getTableFromMeta(buffer, tableMeta)) // REFCOUNT 1 + # COLS - } - val buff = new RapidsDeviceMemoryBuffer( id, buffer.getLength, tableMeta, - table, + None, buffer, initialSpillPriority) - - addBuffer(buff) - } - - private def uncompressedBufferFromTable( - id: RapidsBufferId, - table: Table, - contigBuffer: DeviceMemoryBuffer, - initialSpillPriority: Long): RapidsDeviceMemoryBuffer = { - val size = contigBuffer.getLength - val meta = MetaUtils.buildTableMeta(id.tableId, table, contigBuffer) - new RapidsDeviceMemoryBuffer( - id, - size, - meta, - Some(table), - contigBuffer, - initialSpillPriority) + try { + logDebug(s"Adding receive side table for: [id=$id, size=${buffer.getLength}, " + + s"uncompressed=${buff.meta.bufferMeta.uncompressedSize}, " + + s"meta_id=${tableMeta.bufferMeta.id}, " + + s"meta_size=${tableMeta.bufferMeta.size}]") + addBuffer(buff) + } catch { + case t: Throwable => + buff.free() + throw t + } } class RapidsDeviceMemoryBuffer( @@ -121,8 +142,6 @@ class RapidsDeviceMemoryStore(catalog: RapidsBufferCatalog = RapidsBufferCatalog table: Option[Table], contigBuffer: DeviceMemoryBuffer, spillPriority: Long) extends RapidsBufferBase(id, size, meta, spillPriority) { - require(table.isDefined || meta.bufferMeta.codecBufferDescrsLength() > 0) - override val storageTier: StorageTier = StorageTier.DEVICE override protected def releaseResources(): Unit = { @@ -138,7 +157,7 @@ class RapidsDeviceMemoryStore(catalog: RapidsBufferCatalog = RapidsBufferCatalog override def getColumnarBatch(sparkTypes: Array[DataType]): ColumnarBatch = { if (table.isDefined) { //REFCOUNT ++ of all columns - GpuColumnVectorFromBuffer.from(table.get, contigBuffer, sparkTypes) + GpuColumnVectorFromBuffer.from(table.get, contigBuffer, meta, sparkTypes) } else { columnarBatchFromDeviceBuffer(contigBuffer, sparkTypes) } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SpillableColumnarBatch.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SpillableColumnarBatch.scala index 4b2cb5b72d7..62789eddd83 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SpillableColumnarBatch.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SpillableColumnarBatch.scala @@ -16,8 +16,6 @@ package com.nvidia.spark.rapids -import com.nvidia.spark.rapids.RapidsPluginImplicits.AutoCloseableArray - import org.apache.spark.sql.rapids.TempSpillBufferId import org.apache.spark.sql.types.DataType import org.apache.spark.sql.vectorized.ColumnarBatch @@ -146,31 +144,30 @@ object SpillableColumnarBatch extends Arm { id: RapidsBufferId, batch: ColumnarBatch, initialSpillPriority: Long): Unit = { - val numColumns = batch.numCols() - if (GpuCompressedColumnVector.isBatchCompressed(batch)) { - withResource(batch) { batch => + withResource(batch) { batch => + val numColumns = batch.numCols() + if (GpuCompressedColumnVector.isBatchCompressed(batch)) { val cv = batch.column(0).asInstanceOf[GpuCompressedColumnVector] - val buff = cv.getBuffer + val buff = cv.getTableBuffer buff.incRefCount() RapidsBufferCatalog.addBuffer(id, buff, cv.getTableMeta, initialSpillPriority) - } - } else if (numColumns > 0 && - (0 until numColumns) - .forall(i => batch.column(i).isInstanceOf[GpuColumnVectorFromBuffer])) { - val cv = batch.column(0).asInstanceOf[GpuColumnVectorFromBuffer] - withResource(batch) { batch => + } else if (GpuPackedTableColumn.isBatchPacked(batch)) { + val cv = batch.column(0).asInstanceOf[GpuPackedTableColumn] + RapidsBufferCatalog.addContiguousTable(id, cv.getContiguousTable, initialSpillPriority) + } else if (numColumns > 0 && + (0 until numColumns) + .forall(i => batch.column(i).isInstanceOf[GpuColumnVectorFromBuffer])) { + val cv = batch.column(0).asInstanceOf[GpuColumnVectorFromBuffer] val table = GpuColumnVector.from(batch) val buff = cv.getBuffer buff.incRefCount() - RapidsBufferCatalog.addTable(id, table, buff, initialSpillPriority) - } - } else { - withResource(batch) { batch => + RapidsBufferCatalog.addTable(id, table, buff, cv.getTableMeta, initialSpillPriority) + } else { withResource(GpuColumnVector.from(batch)) { tmpTable => - val contigTables = tmpTable.contiguousSplit(batch.numRows()) - val tab = contigTables.head - contigTables.tail.safeClose() - RapidsBufferCatalog.addTable(id, tab.getTable, tab.getBuffer, initialSpillPriority) + withResource(tmpTable.contiguousSplit()) { contigTables => + require(contigTables.length == 1, "Unexpected number of contiguous spit tables") + RapidsBufferCatalog.addContiguousTable(id, contigTables.head, initialSpillPriority) + } } } } diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/RapidsShuffleInternalManager.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/RapidsShuffleInternalManagerBase.scala similarity index 97% rename from sql-plugin/src/main/scala/org/apache/spark/sql/rapids/RapidsShuffleInternalManager.scala rename to sql-plugin/src/main/scala/org/apache/spark/sql/rapids/RapidsShuffleInternalManagerBase.scala index 23f61bcd781..ba44f6634b0 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/RapidsShuffleInternalManager.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/RapidsShuffleInternalManagerBase.scala @@ -88,6 +88,8 @@ class RapidsCachingWriter[K, V]( private val uncompressedMetric: SQLMetric = metrics("dataSize") override def write(records: Iterator[Product2[K, V]]): Unit = { + // NOTE: This MUST NOT CLOSE the incoming batches because they are + // closed by the input iterator generated by GpuShuffleExchangeExec val nvtxRange = new NvtxRange("RapidsCachingWriter.write", NvtxColor.CYAN) try { var bytesWritten: Long = 0L @@ -104,18 +106,16 @@ class RapidsCachingWriter[K, V]( if (batch.numRows > 0 && batch.numCols > 0) { // Add the table to the shuffle store batch.column(0) match { - case c: GpuColumnVectorFromBuffer => - val buffer = c.getBuffer - buffer.incRefCount() - partSize = buffer.getLength + case c: GpuPackedTableColumn => + val contigTable = c.getContiguousTable + partSize = c.getTableBuffer.getLength uncompressedMetric += partSize - shuffleStorage.addTable( + shuffleStorage.addContiguousTable( bufferId, - GpuColumnVector.from(batch), - buffer, + contigTable, SpillPriorities.OUTPUT_FOR_SHUFFLE_INITIAL_PRIORITY) case c: GpuCompressedColumnVector => - val buffer = c.getBuffer + val buffer = c.getTableBuffer buffer.incRefCount() partSize = buffer.getLength val tableMeta = c.getTableMeta diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/GpuCoalesceBatchesSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/GpuCoalesceBatchesSuite.scala index e4d0a37086c..b812555125a 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/GpuCoalesceBatchesSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/GpuCoalesceBatchesSuite.scala @@ -522,8 +522,7 @@ class GpuCoalesceBatchesSuite extends SparkQueryCompareTestSuite { val codec = TableCompressionCodec.getCodec(CodecType.NVCOMP_LZ4) withResource(codec.createBatchCompressor(0, Cuda.DEFAULT_STREAM)) { compressor => compressor.addTableToCompress(buildContiguousTable(start, numRows)) - GpuCompressedColumnVector.from(compressor.finish().head, - Array[DataType](LongType, DecimalType(ai.rapids.cudf.DType.DECIMAL64_MAX_PRECISION, 3))) + GpuCompressedColumnVector.from(compressor.finish().head) } } } diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/GpuPartitioningSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/GpuPartitioningSuite.scala index 1a768c8aaa3..7d36fc73e8e 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/GpuPartitioningSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/GpuPartitioningSuite.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020, NVIDIA CORPORATION. + * Copyright (c) 2020-2021, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,7 +19,7 @@ package com.nvidia.spark.rapids import java.io.File import java.math.RoundingMode -import ai.rapids.cudf.{DType, Table} +import ai.rapids.cudf.{ColumnVector, DType, Table} import org.scalatest.FunSuite import org.apache.spark.SparkConf @@ -42,8 +42,24 @@ class GpuPartitioningSuite extends FunSuite with Arm { } } + /** + * Retrieves the underlying column vectors for a batch without incrementing + * the refcounts of those columns. Therefore the column vectors are only + * valid as long as the batch is valid. + */ + private def extractBases(batch: ColumnarBatch): Array[ColumnVector] = { + if (GpuPackedTableColumn.isBatchPacked(batch)) { + val packedColumn = batch.column(0).asInstanceOf[GpuPackedTableColumn] + val table = packedColumn.getContiguousTable.getTable + // The contiguous table is still responsible for closing these columns. + (0 until table.getNumberOfColumns).map(table.getColumn).toArray + } else { + GpuColumnVector.extractBases(batch) + } + } + private def buildSubBatch(batch: ColumnarBatch, startRow: Int, endRow: Int): ColumnarBatch = { - val columns = GpuColumnVector.extractBases(batch) + val columns = extractBases(batch) val types = GpuColumnVector.extractTypes(batch) val sliced = columns.zip(types).map { case (c, t) => GpuColumnVector.from(c.subVector(startRow, endRow), t) @@ -53,9 +69,9 @@ class GpuPartitioningSuite extends FunSuite with Arm { private def compareBatches(expected: ColumnarBatch, actual: ColumnarBatch): Unit = { assertResult(expected.numRows)(actual.numRows) - assertResult(expected.numCols)(actual.numCols) - val expectedColumns = GpuColumnVector.extractBases(expected) - val actualColumns = GpuColumnVector.extractBases(expected) + val expectedColumns = extractBases(expected) + val actualColumns = extractBases(expected) + assertResult(expectedColumns.length)(actualColumns.length) expectedColumns.zip(actualColumns).foreach { case (expected, actual) => // FIXME: For decimal types, NULL_EQUALS has not been supported in cuDF yet val cpVec = if (expected.getType.isDecimalType) { @@ -93,11 +109,7 @@ class GpuPartitioningSuite extends FunSuite with Arm { } val expectedRows = endRow - startRow assertResult(expectedRows)(partBatch.numRows) - val columns = (0 until partBatch.numCols).map(i => partBatch.column(i)) - columns.foreach { column => - assert(column.isInstanceOf[GpuColumnVectorFromBuffer]) - assertResult(expectedRows)(column.asInstanceOf[GpuColumnVector].getRowCount) - } + assert(GpuPackedTableColumn.isBatchPacked(partBatch)) withResource(buildSubBatch(batch, startRow, endRow)) { expectedBatch => compareBatches(expectedBatch, partBatch) } @@ -142,17 +154,19 @@ class GpuPartitioningSuite extends FunSuite with Arm { val rows = c.getTableMeta.rowCount assert(rows != 0) rows - case c: GpuColumnVector => - val rows = c.getRowCount + case c: GpuPackedTableColumn => + val rows = c.getContiguousTable.getRowCount assert(rows == 0) rows + case _ => + throw new IllegalStateException("column should either be compressed or packed") } assertResult(expectedRows)(actualRows) } if (GpuCompressedColumnVector.isBatchCompressed(partBatch)) { val gccv = columns.head.asInstanceOf[GpuCompressedColumnVector] val bufferId = MockRapidsBufferId(partIndex) - val devBuffer = gccv.getBuffer + val devBuffer = gccv.getTableBuffer // device store takes ownership of the buffer devBuffer.incRefCount() deviceStore.addBuffer(bufferId, devBuffer, gccv.getTableMeta, spillPriority) diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/GpuSinglePartitioningSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/GpuSinglePartitioningSuite.scala index 5a4b719d758..2f652c9b46a 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/GpuSinglePartitioningSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/GpuSinglePartitioningSuite.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020, NVIDIA CORPORATION. + * Copyright (c) 2020-2021, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -46,21 +46,28 @@ class GpuSinglePartitioningSuite extends FunSuite with Arm { TestUtils.withGpuSparkSession(conf) { _ => GpuShuffleEnv.init(new RapidsConf(conf)) val partitioner = GpuSinglePartitioning(Nil) - withResource(buildBatch()) { expected => - // partition will consume batch, so make a new batch with incremented refcounts - val columns = GpuColumnVector.extractColumns(expected) - columns.foreach(_.incRefCount()) - val batch = new ColumnarBatch(columns.toArray, expected.numRows) - val result = partitioner.columnarEval(batch).asInstanceOf[Array[(ColumnarBatch, Int)]] - try { - assertResult(1)(result.length) - assertResult(0)(result.head._2) - val resultBatch = result.head._1 - // verify this is a contiguous split table - assert(resultBatch.column(0).isInstanceOf[GpuColumnVectorFromBuffer]) - TestUtils.compareBatches(expected, resultBatch) - } finally { - result.foreach(_._1.close()) + withResource(buildBatch()) { batch => + withResource(GpuColumnVector.from(batch)) { table => + withResource(table.contiguousSplit()) { contigTables => + val expected = contigTables.head + // partition will consume batch, so increment refcounts enabling withResource to close + GpuColumnVector.extractBases(batch).foreach(_.incRefCount()) + val result = partitioner.columnarEval(batch).asInstanceOf[Array[(ColumnarBatch, Int)]] + try { + assertResult(1)(result.length) + assertResult(0)(result.head._2) + val resultBatch = result.head._1 + // verify this is a contiguous split table + assert(GpuPackedTableColumn.isBatchPacked(resultBatch)) + val packedColumn = resultBatch.column(0).asInstanceOf[GpuPackedTableColumn] + val actual = packedColumn.getContiguousTable + assertResult(expected.getBuffer.getLength)(actual.getBuffer.getLength) + assertResult(expected.getMetadataDirectBuffer)(actual.getMetadataDirectBuffer) + TestUtils.compareTables(expected.getTable, actual.getTable) + } finally { + result.foreach(_._1.close()) + } + } } } } diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/MetaUtilsSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/MetaUtilsSuite.scala index 5808036ef0f..b6047459e26 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/MetaUtilsSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/MetaUtilsSuite.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020, NVIDIA CORPORATION. + * Copyright (c) 2020-2021, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,7 +21,7 @@ import java.util import ai.rapids.cudf.{ColumnView, ContiguousTable, DeviceMemoryBuffer, DType, HostColumnVector, Table} import ai.rapids.cudf.HostColumnVector.{BasicType, StructData} -import com.nvidia.spark.rapids.format.{CodecType, ColumnMeta} +import com.nvidia.spark.rapids.format.CodecType import org.scalatest.FunSuite import org.apache.spark.sql.types.{ArrayType, DataType, DecimalType, DoubleType, IntegerType, StringType, StructField, StructType} @@ -61,61 +61,26 @@ class MetaUtilsSuite extends FunSuite with Arm { } } - def verifyColumnMeta( - buffer: DeviceMemoryBuffer, - col: ColumnView, - columnMeta: ColumnMeta): Unit = { - assertResult(col.getNullCount)(columnMeta.nullCount) - assertResult(col.getRowCount)(columnMeta.rowCount) - assertResult(col.getType.getTypeId.getNativeId)(columnMeta.dtypeId()) - assertResult(col.getType.getScale)(columnMeta.dtypeScale()) - val dataBuffer = col.getData - if (dataBuffer != null) { - assertResult(dataBuffer.getAddress - buffer.getAddress)(columnMeta.dataOffset()) - assertResult(dataBuffer.getLength)(columnMeta.dataLength()) - } else { - assertResult(0)(columnMeta.dataOffset()) - assertResult(0)(columnMeta.dataLength()) - } - val validBuffer = col.getValid - if (validBuffer != null) { - assertResult(validBuffer.getAddress - buffer.getAddress)(columnMeta.validityOffset()) - } else { - assertResult(0)(columnMeta.validityOffset()) - } - val offsetsBuffer = col.getOffsets - if (offsetsBuffer != null) { - assertResult(offsetsBuffer.getAddress - buffer.getAddress)(columnMeta.offsetsOffset()) - } else { - assertResult(0)(columnMeta.offsetsOffset()) - } - - (0 until col.getNumChildren).foreach { i => - withResource(col.getChildColumnView(i)) { childView => - verifyColumnMeta(buffer, childView, columnMeta.children(i)) + private def buildDegenerateTable(schema: StructType): ContiguousTable = { + withResource(GpuColumnVector.emptyBatch(schema)) { batch => + withResource(GpuColumnVector.from(batch)) { table => + table.contiguousSplit().head } } } test("buildTableMeta") { withResource(buildContiguousTable()) { contigTable => - val table = contigTable.getTable val buffer = contigTable.getBuffer - val meta = MetaUtils.buildTableMeta(7, table, buffer) + val meta = MetaUtils.buildTableMeta(7, contigTable) val bufferMeta = meta.bufferMeta assertResult(7)(bufferMeta.id) assertResult(buffer.getLength)(bufferMeta.size) assertResult(buffer.getLength)(bufferMeta.uncompressedSize) assertResult(0)(bufferMeta.codecBufferDescrsLength) - assertResult(table.getRowCount)(meta.rowCount) - - assertResult(table.getNumberOfColumns)(meta.columnMetasLength) - val columnMeta = new ColumnMeta - (0 until table.getNumberOfColumns).foreach { i => - assert(meta.columnMetas(columnMeta, i) != null) - verifyColumnMeta(buffer, table.getColumn(i), columnMeta) - } + assertResult(contigTable.getRowCount)(meta.rowCount) + assertResult(contigTable.getMetadataDirectBuffer)(meta.packedMetaAsByteBuffer()) } } @@ -124,14 +89,14 @@ class MetaUtilsSuite extends FunSuite with Arm { val tableId = 7 val codecType = CodecType.COPY val compressedSize: Long = 123 - val table = contigTable.getTable val buffer = contigTable.getBuffer - val meta = MetaUtils.buildTableMeta(Some(tableId), table, buffer, codecType, compressedSize) + val meta = MetaUtils.buildTableMeta(Some(tableId), contigTable, codecType, compressedSize) val bufferMeta = meta.bufferMeta assertResult(tableId)(bufferMeta.id) assertResult(compressedSize)(bufferMeta.size) - assertResult(table.getRowCount)(meta.rowCount) + assertResult(contigTable.getRowCount)(meta.rowCount) + assertResult(contigTable.getMetadataDirectBuffer)(meta.packedMetaAsByteBuffer()) assertResult(1)(bufferMeta.codecBufferDescrsLength) val codecDescr = bufferMeta.codecBufferDescrs(0) assertResult(codecType)(codecDescr.codec) @@ -146,56 +111,34 @@ class MetaUtilsSuite extends FunSuite with Arm { val degenerateBatch = new ColumnarBatch(Array(), 127) val meta = MetaUtils.buildDegenerateTableMeta(degenerateBatch) assertResult(null)(meta.bufferMeta) - assertResult(0)(meta.columnMetasLength) + assertResult(null)(meta.packedMetaAsByteBuffer()) assertResult(127)(meta.rowCount) } test("buildDegenerateTableMeta no rows") { val schema = StructType.fromDDL("a INT, b STRING, c DOUBLE, d DECIMAL(15, 5)") - withResource(GpuColumnVector.emptyBatch(schema)) { batch => - val meta = MetaUtils.buildDegenerateTableMeta(batch) - assertResult(null)(meta.bufferMeta) - assertResult(0)(meta.rowCount) - assertResult(4)(meta.columnMetasLength) - (0 until meta.columnMetasLength).foreach { i => - val columnMeta = meta.columnMetas(i) - assertResult(0)(columnMeta.nullCount) - assertResult(0)(columnMeta.rowCount) - val expectedType = batch.column(i).asInstanceOf[GpuColumnVector].getBase.getType - assertResult(expectedType.getTypeId.getNativeId)(columnMeta.dtypeId()) - assertResult(expectedType.getScale)(columnMeta.dtypeScale()) - assertResult(0)(columnMeta.dataOffset()) - assertResult(0)(columnMeta.dataLength()) - assertResult(0)(columnMeta.validityOffset()) - assertResult(0)(columnMeta.offsetsOffset()) + withResource(buildDegenerateTable(schema)) { contigTable => + withResource(GpuPackedTableColumn.from(contigTable)) { packedBatch => + val meta = MetaUtils.buildDegenerateTableMeta(packedBatch) + assertResult(null)(meta.bufferMeta) + assertResult(0)(meta.rowCount) + assertResult(contigTable.getMetadataDirectBuffer)(meta.packedMetaAsByteBuffer()) } } } test("buildDegenerateTableMeta no rows compressed table") { val schema = StructType.fromDDL("a INT, b STRING, c DOUBLE, d DECIMAL(15, 5)") - withResource(GpuColumnVector.emptyBatch(schema)) { uncompressedBatch => - val uncompressedMeta = MetaUtils.buildDegenerateTableMeta(uncompressedBatch) - withResource(DeviceMemoryBuffer.allocate(0)) { buffer => - val compressedTable = CompressedTable(0, uncompressedMeta, buffer) - withResource(GpuCompressedColumnVector.from(compressedTable, - GpuColumnVector.extractTypes(schema))) { batch => - val meta = MetaUtils.buildDegenerateTableMeta(batch) - assertResult(null)(meta.bufferMeta) - assertResult(0)(meta.rowCount) - assertResult(4)(meta.columnMetasLength) - (0 until meta.columnMetasLength).foreach { i => - val columnMeta = meta.columnMetas(i) - assertResult(0)(columnMeta.nullCount) - assertResult(0)(columnMeta.rowCount) - val expectedType = uncompressedBatch.column(i).asInstanceOf[GpuColumnVector] - .getBase.getType - assertResult(expectedType.getTypeId.getNativeId)(columnMeta.dtypeId()) - assertResult(expectedType.getScale)(columnMeta.dtypeScale()) - assertResult(0)(columnMeta.dataOffset()) - assertResult(0)(columnMeta.dataLength()) - assertResult(0)(columnMeta.validityOffset()) - assertResult(0)(columnMeta.offsetsOffset()) + withResource(buildDegenerateTable(schema)) { contigTable => + withResource(GpuPackedTableColumn.from(contigTable)) { uncompressedBatch => + val uncompressedMeta = MetaUtils.buildDegenerateTableMeta(uncompressedBatch) + withResource(DeviceMemoryBuffer.allocate(0)) { buffer => + val compressedTable = CompressedTable(0, uncompressedMeta, buffer) + withResource(GpuCompressedColumnVector.from(compressedTable)) { batch => + val meta = MetaUtils.buildDegenerateTableMeta(batch) + assertResult(null)(meta.bufferMeta) + assertResult(0)(meta.rowCount) + assertResult(uncompressedMeta.packedMetaAsByteBuffer())(meta.packedMetaAsByteBuffer()) } } } @@ -204,15 +147,15 @@ class MetaUtilsSuite extends FunSuite with Arm { test("getBatchFromMeta") { withResource(buildContiguousTable()) { contigTable => - val table = contigTable.getTable val origBuffer = contigTable.getBuffer - val meta = MetaUtils.buildTableMeta(10, table, origBuffer) + val meta = MetaUtils.buildTableMeta(10, contigTable) val sparkTypes = Array[DataType](IntegerType, StringType, DoubleType, DecimalType(ai.rapids.cudf.DType.DECIMAL64_MAX_PRECISION, 5)) withResource(origBuffer.sliceWithCopy(0, origBuffer.getLength)) { buffer => withResource(MetaUtils.getBatchFromMeta(buffer, meta, contiguousTableSparkTypes)) { batch => - assertResult(table.getRowCount)(batch.numRows) + assertResult(contigTable.getRowCount)(batch.numRows) + val table = contigTable.getTable assertResult(table.getNumberOfColumns)(batch.numCols) (0 until table.getNumberOfColumns).foreach { i => val batchColumn = batch.column(i) diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/RapidsDeviceMemoryStoreSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/RapidsDeviceMemoryStoreSuite.scala index 9effde6b9a4..6e3939c43ba 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/RapidsDeviceMemoryStoreSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/RapidsDeviceMemoryStoreSuite.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020, NVIDIA CORPORATION. + * Copyright (c) 2020-2021, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -51,7 +51,7 @@ class RapidsDeviceMemoryStoreSuite extends FunSuite with Arm with MockitoSugar { val bufferId = MockRapidsBufferId(7) closeOnExcept(buildContiguousTable()) { ct => // store takes ownership of the table - store.addTable(bufferId, ct.getTable, ct.getBuffer, spillPriority) + store.addContiguousTable(bufferId, ct, spillPriority) } val captor: ArgumentCaptor[RapidsBuffer] = ArgumentCaptor.forClass(classOf[RapidsBuffer]) verify(catalog).registerNewBuffer(captor.capture()) @@ -67,7 +67,7 @@ class RapidsDeviceMemoryStoreSuite extends FunSuite with Arm with MockitoSugar { val spillPriority = 3 val bufferId = MockRapidsBufferId(7) val meta = withResource(buildContiguousTable()) { ct => - val meta = MetaUtils.buildTableMeta(bufferId.tableId, ct.getTable, ct.getBuffer) + val meta = MetaUtils.buildTableMeta(bufferId.tableId, ct) // store takes ownership of the buffer ct.getBuffer.incRefCount() store.addBuffer(bufferId, ct.getBuffer, meta, spillPriority) @@ -89,7 +89,7 @@ class RapidsDeviceMemoryStoreSuite extends FunSuite with Arm with MockitoSugar { withResource(buildContiguousTable()) { ct => withResource(HostMemoryBuffer.allocate(ct.getBuffer.getLength)) { expectedHostBuffer => expectedHostBuffer.copyFromDeviceBuffer(ct.getBuffer) - val meta = MetaUtils.buildTableMeta(bufferId.tableId, ct.getTable, ct.getBuffer) + val meta = MetaUtils.buildTableMeta(bufferId.tableId, ct) // store takes ownership of the buffer ct.getBuffer.incRefCount() store.addBuffer(bufferId, ct.getBuffer, meta, initialSpillPriority = 3) @@ -115,7 +115,7 @@ class RapidsDeviceMemoryStoreSuite extends FunSuite with Arm with MockitoSugar { withResource(buildContiguousTable()) { ct => withResource(GpuColumnVector.from(ct.getTable, sparkTypes)) { expectedBatch => - val meta = MetaUtils.buildTableMeta(bufferId.tableId, ct.getTable, ct.getBuffer) + val meta = MetaUtils.buildTableMeta(bufferId.tableId, ct) // store takes ownership of the buffer ct.getBuffer.incRefCount() store.addBuffer(bufferId, ct.getBuffer, meta, initialSpillPriority = 3) @@ -145,7 +145,7 @@ class RapidsDeviceMemoryStoreSuite extends FunSuite with Arm with MockitoSugar { closeOnExcept(buildContiguousTable()) { ct => bufferSizes(i) = ct.getBuffer.getLength // store takes ownership of the table - store.addTable(MockRapidsBufferId(i), ct.getTable, ct.getBuffer, initialSpillPriority = 0) + store.addContiguousTable(MockRapidsBufferId(i), ct, initialSpillPriority = 0) } assertResult(bufferSizes.take(i+1).sum)(store.currentSize) } @@ -167,7 +167,7 @@ class RapidsDeviceMemoryStoreSuite extends FunSuite with Arm with MockitoSugar { closeOnExcept(buildContiguousTable()) { ct => bufferSizes(i) = ct.getBuffer.getLength // store takes ownership of the table - store.addTable(MockRapidsBufferId(i), ct.getTable, ct.getBuffer, spillPriorities(i)) + store.addContiguousTable(MockRapidsBufferId(i), ct, spillPriorities(i)) } } assert(spillStore.spilledBuffers.isEmpty) diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/RapidsDiskStoreSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/RapidsDiskStoreSuite.scala index 20194320f9b..8245ea8e99c 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/RapidsDiskStoreSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/RapidsDiskStoreSuite.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020, NVIDIA CORPORATION. + * Copyright (c) 2020-2021, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -199,7 +199,7 @@ class RapidsDiskStoreSuite extends FunSuite with BeforeAndAfterEach with Arm wit closeOnExcept(buildContiguousTable()) { ct => val bufferSize = ct.getBuffer.getLength // store takes ownership of the table - devStore.addTable(bufferId, ct.getTable, ct.getBuffer, spillPriority) + devStore.addContiguousTable(bufferId, ct, spillPriority) bufferSize } } diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/RapidsHostMemoryStoreSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/RapidsHostMemoryStoreSuite.scala index be7f8d4e9c4..7b84429272c 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/RapidsHostMemoryStoreSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/RapidsHostMemoryStoreSuite.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020, NVIDIA CORPORATION. + * Copyright (c) 2020-2021, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -66,7 +66,7 @@ class RapidsHostMemoryStoreSuite extends FunSuite with Arm with MockitoSugar { val bufferSize = closeOnExcept(buildContiguousTable()) { ct => val len = ct.getBuffer.getLength // store takes ownership of the table - devStore.addTable(bufferId, ct.getTable, ct.getBuffer, spillPriority) + devStore.addContiguousTable(bufferId, ct, spillPriority) len } @@ -98,7 +98,7 @@ class RapidsHostMemoryStoreSuite extends FunSuite with Arm with MockitoSugar { withResource(HostMemoryBuffer.allocate(ct.getBuffer.getLength)) { expectedBuffer => expectedBuffer.copyFromDeviceBuffer(ct.getBuffer) // store takes ownership of the table - devStore.addTable(bufferId, ct.getTable, ct.getBuffer, spillPriority) + devStore.addContiguousTable(bufferId, ct, spillPriority) ct = null devStore.synchronousSpill(0) @@ -135,7 +135,7 @@ class RapidsHostMemoryStoreSuite extends FunSuite with Arm with MockitoSugar { withResource(GpuColumnVector.from(ct.getTable, sparkTypes)) { expectedBatch => // store takes ownership of the table - devStore.addTable(bufferId, ct.getTable, ct.getBuffer, spillPriority) + devStore.addContiguousTable(bufferId, ct, spillPriority) ct = null devStore.synchronousSpill(0) @@ -174,7 +174,7 @@ class RapidsHostMemoryStoreSuite extends FunSuite with Arm with MockitoSugar { smallTable = buildContiguousTable(1) withResource(GpuColumnVector.from(bigTable.getTable, sparkTypes)) { expectedBatch => // store takes ownership of the table - devStore.addTable(bigBufferId, bigTable.getTable, bigTable.getBuffer, spillPriority) + devStore.addContiguousTable(bigBufferId, bigTable, spillPriority) bigTable = null devStore.synchronousSpill(0) @@ -187,8 +187,7 @@ class RapidsHostMemoryStoreSuite extends FunSuite with Arm with MockitoSugar { } } - devStore.addTable(smallBufferId, smallTable.getTable, smallTable.getBuffer, - spillPriority) + devStore.addContiguousTable(smallBufferId, smallTable, spillPriority) smallTable = null devStore.synchronousSpill(0) val ac: ArgumentCaptor[RapidsBuffer] = ArgumentCaptor.forClass(classOf[RapidsBuffer]) diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleClientSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleClientSuite.scala index 259d6aac4e1..e8b2c8282f9 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleClientSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleClientSuite.scala @@ -48,7 +48,7 @@ class RapidsShuffleClientSuite extends RapidsShuffleTestHelper { def verifyTableMeta(expected: TableMeta, actual: TableMeta): Unit = { assertResult(expected.rowCount())(actual.rowCount()) - assertResult(expected.columnMetasLength())(actual.columnMetasLength()) + assertResult(expected.packedMetaAsByteBuffer())(actual.packedMetaAsByteBuffer()) verifyBufferMeta(expected.bufferMeta, actual.bufferMeta) } diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleTestHelper.scala b/tests/src/test/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleTestHelper.scala index 99cd7129fb5..e39da9276da 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleTestHelper.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleTestHelper.scala @@ -156,9 +156,7 @@ class RapidsShuffleTestHelper extends FunSuite object RapidsShuffleTestHelper extends MockitoSugar with Arm { def buildMockTableMeta(tableId: Int, contigTable: ContiguousTable): TableMeta = { - val tbl = contigTable.getTable - val cols = (0 until tbl.getNumberOfColumns).map(tbl.getColumn) - MetaUtils.buildTableMeta(tableId, cols, tbl.getRowCount, contigTable.getBuffer) + MetaUtils.buildTableMeta(tableId, contigTable) } def buildDegenerateMockTableMeta(): TableMeta = {