Skip to content

Commit

Permalink
Avoid unnecessary Table instances after contiguous split (NVIDIA#1593)
Browse files Browse the repository at this point in the history
* Avoid unnecessary Table instances after contiguous split

Signed-off-by: Jason Lowe <jlowe@nvidia.com>

* Address review comments

Signed-off-by: Jason Lowe <jlowe@nvidia.com>

* Remove ColumnMeta

Signed-off-by: Jason Lowe <jlowe@nvidia.com>

* Remove extra license file

Signed-off-by: Jason Lowe <jlowe@nvidia.com>
  • Loading branch information
jlowe authored Feb 5, 2021
1 parent fc2f74a commit c25e9ee
Show file tree
Hide file tree
Showing 29 changed files with 475 additions and 807 deletions.
40 changes: 5 additions & 35 deletions sql-plugin/src/main/format/ShuffleMetadataResponse.fbs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2019-2020, NVIDIA CORPORATION.
// Copyright (c) 2019-2021, NVIDIA CORPORATION.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -16,46 +16,16 @@ include "ShuffleCommon.fbs";

namespace com.nvidia.spark.rapids.format;

/// Metadata about a single cuDF column; nested columns are described
/// recursively through `children`
table ColumnMeta {
/// number of nulls in the column or -1 if unknown
null_count: long;

/// number of rows in the column
row_count: long;

/// offset of the column's data buffer
/// (presumably relative to the start of the table buffer -- TODO confirm)
data_offset: long;

/// length of the column's data buffer
data_length: long;

/// offset of the column's validity buffer
validity_offset: long;

/// offset of the column's offsets buffer
offsets_offset: long;

/// child column metadata, one entry per child of a nested column
/// (e.g. the element column of a LIST or the fields of a STRUCT)
children: [ColumnMeta];

/// native type id of the cuDF DType; readers rebuild the type from this
/// value together with dtype_scale
dtype_id: int;

/// DType scale for decimal types
dtype_scale: int;
}

/// Metadata about cuDF tables
table TableMeta {
/// metadata about the table buffer
/// metadata about the data encoding
buffer_meta: BufferMeta;

/// metadata for each column in the table buffer
column_metas: [ColumnMeta];

/// number of rows in the table
row_count: long;

/// opaque metadata describing the packed table schema and data layout
packed_meta: [byte];
}

/// Flat buffer for Rapids UCX Shuffle Metadata Response
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -702,9 +702,9 @@ public final int numNulls() {
public static long getTotalDeviceMemoryUsed(ColumnarBatch batch) {
long sum = 0;
if (batch.numCols() > 0) {
if (batch.column(0) instanceof GpuCompressedColumnVector) {
GpuCompressedColumnVector gccv = (GpuCompressedColumnVector) batch.column(0);
sum += gccv.getBuffer().getLength();
if (batch.column(0) instanceof WithTableBuffer) {
WithTableBuffer wtb = (WithTableBuffer) batch.column(0);
sum += wtb.getTableBuffer().getLength();
} else {
for (int i = 0; i < batch.numCols(); i++) {
sum += ((GpuColumnVector) batch.column(i)).getBase().getDeviceMemorySize();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020, NVIDIA CORPORATION.
* Copyright (c) 2020-2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,6 +20,7 @@
import ai.rapids.cudf.ContiguousTable;
import ai.rapids.cudf.DeviceMemoryBuffer;
import ai.rapids.cudf.Table;
import com.nvidia.spark.rapids.format.TableMeta;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.vectorized.ColumnarBatch;

Expand All @@ -28,6 +29,7 @@
/** GPU column vector carved from a single buffer, like those from cudf's contiguousSplit. */
public final class GpuColumnVectorFromBuffer extends GpuColumnVector {
private final DeviceMemoryBuffer buffer;
private final TableMeta tableMeta;

/**
* Get a ColumnarBatch from a set of columns in a contiguous table. This differs from the
Expand All @@ -43,7 +45,8 @@ public final class GpuColumnVectorFromBuffer extends GpuColumnVector {
public static ColumnarBatch from(ContiguousTable contigTable, DataType[] colTypes) {
DeviceMemoryBuffer buffer = contigTable.getBuffer();
Table table = contigTable.getTable();
return from(table, buffer, colTypes);
TableMeta meta = MetaUtils.buildTableMeta(0, contigTable);
return from(table, buffer, meta, colTypes);
}

/**
Expand All @@ -55,10 +58,12 @@ public static ColumnarBatch from(ContiguousTable contigTable, DataType[] colType
*
* @param table a table with columns at offsets of `buffer`
* @param buffer a device buffer that packs data for columns in `table`
* @param meta metadata describing the table layout and schema
* @param colTypes the types the columns should have.
* @return batch of GpuColumnVectorFromBuffer instances derived from the table and buffer
*/
public static ColumnarBatch from(Table table, DeviceMemoryBuffer buffer, DataType[] colTypes) {
public static ColumnarBatch from(Table table, DeviceMemoryBuffer buffer,
TableMeta meta, DataType[] colTypes) {
assert table != null : "Table cannot be null";
assert GpuColumnVector.typeConversionAllowed(table, colTypes) :
"Type conversion is not allowed from " + table + " to " + Arrays.toString(colTypes);
Expand All @@ -72,7 +77,7 @@ public static ColumnarBatch from(Table table, DeviceMemoryBuffer buffer, DataTyp
for (int i = 0; i < numColumns; ++i) {
ColumnVector v = table.getColumn(i);
DataType type = colTypes[i];
columns[i] = new GpuColumnVectorFromBuffer(type, v.incRefCount(), buffer);
columns[i] = new GpuColumnVectorFromBuffer(type, v.incRefCount(), buffer, meta);
}
return new ColumnarBatch(columns, (int) rows);
} catch (Exception e) {
Expand All @@ -99,11 +104,13 @@ public static ColumnarBatch from(Table table, DeviceMemoryBuffer buffer, DataTyp
* @param type the spark data type for this column
* @param cudfColumn a ColumnVector instance
* @param buffer the buffer to hold
* @param meta the metadata describing the buffer layout
*/
public GpuColumnVectorFromBuffer(DataType type, ColumnVector cudfColumn,
DeviceMemoryBuffer buffer) {
DeviceMemoryBuffer buffer, TableMeta meta) {
super(type, cudfColumn);
this.buffer = buffer;
this.tableMeta = meta;
}

/**
Expand All @@ -114,4 +121,13 @@ public GpuColumnVectorFromBuffer(DataType type, ColumnVector cudfColumn,
public DeviceMemoryBuffer getBuffer() {
return buffer;
}

/**
* Get the metadata describing the data layout in the buffer. The same metadata instance is
* shared by every column created from the original `ContiguousTable`.
* @return opaque metadata describing the buffer layout
*/
public TableMeta getTableMeta() {
return tableMeta;
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020, NVIDIA CORPORATION.
* Copyright (c) 2020-2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,151 +16,62 @@

package com.nvidia.spark.rapids;

import ai.rapids.cudf.DType;
import ai.rapids.cudf.DeviceMemoryBuffer;
import com.nvidia.spark.rapids.format.ColumnMeta;
import com.nvidia.spark.rapids.format.TableMeta;
import org.apache.spark.sql.types.ArrayType;
import org.apache.spark.sql.types.BinaryType;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.MapType;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.sql.vectorized.ColumnarBatch;

import static org.apache.spark.sql.types.DataTypes.NullType;

/**
* A GPU column vector that has been compressed. The columnar data within cannot
* be accessed directly. This class primarily serves the role of tracking the
* compressed data and table metadata so it can be decompressed later.
* A column vector that tracks a compressed table. Unlike a normal GPU column vector, the
* columnar data within cannot be accessed directly. This class primarily serves the role
* of tracking the compressed data and table metadata so it can be decompressed later.
*/
public final class GpuCompressedColumnVector extends GpuColumnVectorBase {
public final class GpuCompressedColumnVector extends GpuColumnVectorBase
implements WithTableBuffer {
private static final String BAD_ACCESS_MSG = "Column is compressed";

private final DeviceMemoryBuffer buffer;
private final TableMeta tableMeta;

/**
* Build a columnar batch from a compressed table.
* NOTE: The data remains compressed and cannot be accessed directly from the columnar batch.
*/
public static ColumnarBatch from(CompressedTable compressedTable, DataType[] colTypes) {
return from(compressedTable.buffer(), compressedTable.meta(), colTypes);
public static ColumnarBatch from(CompressedTable compressedTable) {
return from(compressedTable.buffer(), compressedTable.meta());
}

public static boolean isBatchCompressed(ColumnarBatch batch) {
if (batch.numCols() == 0) {
return false;
} else {
return batch.column(0) instanceof GpuCompressedColumnVector;
}
}

/**
* This should only ever be called from an assertion.
*/
private static boolean typeConversionAllowed(ColumnMeta columnMeta, DataType colType) {
DType dt = DType.fromNative(columnMeta.dtypeId(), columnMeta.dtypeScale());
if (!dt.isNestedType()) {
return GpuColumnVector.getNonNestedRapidsType(colType).equals(dt);
}
if (colType instanceof MapType) {
MapType mType = (MapType) colType;
// list of struct of key/value
if (!(dt.equals(DType.LIST))) {
return false;
}
ColumnMeta structCm = columnMeta.children(0);
if (structCm.dtypeId() != DType.STRUCT.getTypeId().getNativeId()) {
return false;
}
if (structCm.childrenLength() != 2) {
return false;
}
ColumnMeta keyCm = structCm.children(0);
if (!typeConversionAllowed(keyCm, mType.keyType())) {
return false;
}
ColumnMeta valCm = structCm.children(1);
return typeConversionAllowed(valCm, mType.valueType());
} else if (colType instanceof ArrayType) {
if (!(dt.equals(DType.LIST))) {
return false;
}
ColumnMeta tmp = columnMeta.children(0);
return typeConversionAllowed(tmp, ((ArrayType) colType).elementType());
} else if (colType instanceof StructType) {
if (!(dt.equals(DType.STRUCT))) {
return false;
}
StructType st = (StructType) colType;
final int numChildren = columnMeta.childrenLength();
if (numChildren != st.size()) {
return false;
}
for (int childIndex = 0; childIndex < numChildren; childIndex++) {
ColumnMeta tmp = columnMeta.children(childIndex);
StructField entry = ((StructType) colType).apply(childIndex);
if (!typeConversionAllowed(tmp, entry.dataType())) {
return false;
}
}
return true;
} else if (colType instanceof BinaryType) {
if (!(dt.equals(DType.LIST))) {
return false;
}
ColumnMeta tmp = columnMeta.children(0);
return tmp.dtypeId() == DType.INT8.getTypeId().getNativeId() ||
tmp.dtypeId() == DType.UINT8.getTypeId().getNativeId();
} else {
// Unexpected type
return false;
}
return batch.numCols() == 1 && batch.column(0) instanceof GpuCompressedColumnVector;
}

/**
* Build a columnar batch from a compressed data buffer and specified table metadata
* NOTE: The data remains compressed and cannot be accessed directly from the columnar batch.
*/
public static ColumnarBatch from(DeviceMemoryBuffer compressedBuffer,
TableMeta tableMeta,
DataType[] colTypes) {
public static ColumnarBatch from(DeviceMemoryBuffer compressedBuffer, TableMeta tableMeta) {
long rows = tableMeta.rowCount();
if (rows != (int) rows) {
int batchRows = (int) rows;
if (rows != batchRows) {
throw new IllegalStateException("Cannot support a batch larger that MAX INT rows");
}

ColumnMeta columnMeta = new ColumnMeta();
int numColumns = tableMeta.columnMetasLength();
assert numColumns == colTypes.length : "Size mismatch on types";
ColumnVector[] columns = new ColumnVector[numColumns];
try {
for (int i = 0; i < numColumns; ++i) {
tableMeta.columnMetas(columnMeta, i);
DataType type = colTypes[i];
assert typeConversionAllowed(columnMeta, type) : "Type conversion is not allowed from " +
columnMeta + " to " + type + " at index " + i;
compressedBuffer.incRefCount();
columns[i] = new GpuCompressedColumnVector(type, compressedBuffer, tableMeta);
}
} catch (Throwable t) {
for (int i = 0; i < numColumns; ++i) {
if (columns[i] != null) {
columns[i].close();
}
}
throw t;
}

return new ColumnarBatch(columns, (int) rows);
ColumnVector column = new GpuCompressedColumnVector(compressedBuffer, tableMeta);
return new ColumnarBatch(new ColumnVector[] { column }, batchRows);
}

private GpuCompressedColumnVector(DataType type, DeviceMemoryBuffer buffer, TableMeta tableMeta) {
super(type);
private GpuCompressedColumnVector(DeviceMemoryBuffer buffer, TableMeta tableMeta) {
super(NullType);
this.buffer = buffer;
this.tableMeta = tableMeta;
// reference the buffer so it remains valid for the duration of this column
this.buffer.incRefCount();
}

public DeviceMemoryBuffer getBuffer() {
@Override
public DeviceMemoryBuffer getTableBuffer() {
return buffer;
}

Expand All @@ -175,11 +86,11 @@ public void close() {

@Override
public boolean hasNull() {
throw new IllegalStateException("column vector is compressed");
throw new IllegalStateException(BAD_ACCESS_MSG);
}

@Override
public int numNulls() {
throw new IllegalStateException("column vector is compressed");
throw new IllegalStateException(BAD_ACCESS_MSG);
}
}
Loading

0 comments on commit c25e9ee

Please sign in to comment.