Skip to content

Commit

Permalink
Add framework for batch compression of shuffle partitions (NVIDIA#487)
Browse files Browse the repository at this point in the history
* Add framework for batch compression of shuffle partitions

Signed-off-by: Jason Lowe <jlowe@nvidia.com>

* Extract common row accessor methods to GpuColumnVectorBase

Signed-off-by: Jason Lowe <jlowe@nvidia.com>

* Address review comments

Signed-off-by: Jason Lowe <jlowe@nvidia.com>

* Fix handling of degenerate batches

Signed-off-by: Jason Lowe <jlowe@nvidia.com>

* Fix buffer leak

Signed-off-by: Jason Lowe <jlowe@nvidia.com>

* Add comment about degenerate batches potentially appearing compressed

Signed-off-by: Jason Lowe <jlowe@nvidia.com>

* Use a shuffle compression codec of "none" to indicate no compression

Signed-off-by: Jason Lowe <jlowe@nvidia.com>
  • Loading branch information
jlowe authored Aug 11, 2020
1 parent 2058b2d commit 510d3af
Show file tree
Hide file tree
Showing 21 changed files with 1,184 additions and 167 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,7 @@
import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
import org.apache.spark.sql.types.*;
import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.sql.vectorized.ColumnarArray;
import org.apache.spark.sql.vectorized.ColumnarBatch;
import org.apache.spark.sql.vectorized.ColumnarMap;
import org.apache.spark.unsafe.types.UTF8String;

import java.util.List;

Expand All @@ -39,7 +36,7 @@
* is on the host, and we want to keep as much of the data on the device as possible.
* We also provide GPU accelerated versions of the transitions to and from rows.
*/
public class GpuColumnVector extends ColumnVector {
public class GpuColumnVector extends GpuColumnVectorBase {

public static final class GpuColumnarBatchBuilder implements AutoCloseable {
private final ai.rapids.cudf.HostColumnVector.Builder[] builders;
Expand Down Expand Up @@ -174,7 +171,7 @@ public static final DType getRapidsType(DataType type) {
return result;
}

protected static final DataType getSparkType(DType type) {
static final DataType getSparkType(DType type) {
switch (type) {
case BOOL8:
return DataTypes.BooleanType;
Expand Down Expand Up @@ -396,78 +393,6 @@ public final int numNulls() {
return (int) cudfCv.getNullCount();
}

private final static String BAD_ACCESS = "DATA ACCESS MUST BE ON A HOST VECTOR";

@Override
public final boolean isNullAt(int rowId) {
throw new IllegalStateException(BAD_ACCESS);
}

@Override
public final boolean getBoolean(int rowId) {
throw new IllegalStateException(BAD_ACCESS);
}

@Override
public final byte getByte(int rowId) {
throw new IllegalStateException(BAD_ACCESS);
}

@Override
public final short getShort(int rowId) {
throw new IllegalStateException(BAD_ACCESS);
}

@Override
public final int getInt(int rowId) {
throw new IllegalStateException(BAD_ACCESS);
}

@Override
public final long getLong(int rowId) {
throw new IllegalStateException(BAD_ACCESS);
}

@Override
public final float getFloat(int rowId) {
throw new IllegalStateException(BAD_ACCESS);
}

@Override
public final double getDouble(int rowId) {
throw new IllegalStateException(BAD_ACCESS);
}

@Override
public final ColumnarArray getArray(int rowId) {
throw new IllegalStateException(BAD_ACCESS);
}

@Override
public final ColumnarMap getMap(int ordinal) {
throw new IllegalStateException(BAD_ACCESS);
}

@Override
public final Decimal getDecimal(int rowId, int precision, int scale) {
throw new IllegalStateException(BAD_ACCESS);
}

@Override
public final UTF8String getUTF8String(int rowId) {
throw new IllegalStateException(BAD_ACCESS);
}

@Override
public final byte[] getBinary(int rowId) {
throw new IllegalStateException(BAD_ACCESS);
}

@Override
public final ColumnVector getChild(int ordinal) {
throw new IllegalStateException(BAD_ACCESS);
}

public static final long getTotalDeviceMemoryUsed(ColumnarBatch batch) {
long sum = 0;
for (int i = 0; i < batch.numCols(); i++) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Copyright (c) 2020, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids;

import ai.rapids.cudf.DType;
import org.apache.spark.sql.types.*;
import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.sql.vectorized.ColumnarArray;
import org.apache.spark.sql.vectorized.ColumnarMap;
import org.apache.spark.unsafe.types.UTF8String;

/**
 * Common base for every GPU-backed column vector.
 *
 * The data these vectors describe lives in device memory, so Spark's
 * row-oriented accessor API cannot be honored. Every accessor below is
 * declared {@code final} and fails fast with the same error, guaranteeing
 * that no subclass can accidentally offer host-style row access.
 */
abstract class GpuColumnVectorBase extends ColumnVector {
  private static final String BAD_ACCESS = "DATA ACCESS MUST BE ON A HOST VECTOR";

  /** Builds the exception thrown by every row-level accessor. */
  private static IllegalStateException badAccess() {
    return new IllegalStateException(BAD_ACCESS);
  }

  protected GpuColumnVectorBase(DataType type) {
    super(type);
  }

  @Override
  public final boolean isNullAt(int rowId) {
    throw badAccess();
  }

  @Override
  public final boolean getBoolean(int rowId) {
    throw badAccess();
  }

  @Override
  public final byte getByte(int rowId) {
    throw badAccess();
  }

  @Override
  public final short getShort(int rowId) {
    throw badAccess();
  }

  @Override
  public final int getInt(int rowId) {
    throw badAccess();
  }

  @Override
  public final long getLong(int rowId) {
    throw badAccess();
  }

  @Override
  public final float getFloat(int rowId) {
    throw badAccess();
  }

  @Override
  public final double getDouble(int rowId) {
    throw badAccess();
  }

  @Override
  public final ColumnarArray getArray(int rowId) {
    throw badAccess();
  }

  @Override
  public final ColumnarMap getMap(int ordinal) {
    throw badAccess();
  }

  @Override
  public final Decimal getDecimal(int rowId, int precision, int scale) {
    throw badAccess();
  }

  @Override
  public final UTF8String getUTF8String(int rowId) {
    throw badAccess();
  }

  @Override
  public final byte[] getBinary(int rowId) {
    throw badAccess();
  }

  @Override
  public final ColumnVector getChild(int ordinal) {
    throw badAccess();
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Copyright (c) 2020, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids;

import ai.rapids.cudf.DType;
import ai.rapids.cudf.DeviceMemoryBuffer;
import com.nvidia.spark.rapids.format.ColumnMeta;
import com.nvidia.spark.rapids.format.TableMeta;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.sql.vectorized.ColumnarBatch;

/**
 * A GPU column vector that has been compressed. The columnar data within cannot
 * be accessed directly. This class primarily serves the role of tracking the
 * compressed data and table metadata so it can be decompressed later.
 */
public final class GpuCompressedColumnVector extends GpuColumnVectorBase {
  /** Device buffer holding the compressed data for the whole table. */
  private final DeviceMemoryBuffer buffer;
  /** Metadata describing the table so it can be decompressed later. */
  private final TableMeta tableMeta;

  /**
   * Builds a columnar batch wrapping a compressed table. Each column in the
   * resulting batch holds its own slice covering the entire compressed
   * buffer; the batch must be decompressed before any values can be read.
   *
   * @param compressedTable the compressed table to wrap
   * @return a batch of {@link GpuCompressedColumnVector} columns
   * @throws IllegalStateException if the table's row count does not fit in an int
   */
  public static ColumnarBatch from(CompressedTable compressedTable) {
    DeviceMemoryBuffer buffer = compressedTable.buffer();
    TableMeta tableMeta = compressedTable.meta();
    long rows = tableMeta.rowCount();
    if (rows != (int) rows) {
      // ColumnarBatch row counts are ints, so larger tables cannot be represented.
      throw new IllegalStateException("Cannot support a batch larger than MAX INT rows");
    }

    // Reuse one flatbuffer accessor object across the loop rather than
    // allocating a fresh one per column.
    ColumnMeta columnMeta = new ColumnMeta();
    int numColumns = tableMeta.columnMetasLength();
    ColumnVector[] columns = new ColumnVector[numColumns];
    try {
      for (int i = 0; i < numColumns; ++i) {
        tableMeta.columnMetas(columnMeta, i);
        DType dtype = DType.fromNative(columnMeta.dtype());
        DataType type = GpuColumnVector.getSparkType(dtype);
        // Every column gets its own slice spanning the full buffer so each
        // column can be closed independently (slices presumably share the
        // underlying allocation via reference counting — confirm against
        // cudf DeviceMemoryBuffer.slice semantics).
        DeviceMemoryBuffer slicedBuffer = buffer.slice(0, buffer.getLength());
        columns[i] = new GpuCompressedColumnVector(type, slicedBuffer, tableMeta);
      }
    } catch (Throwable t) {
      // Close any columns (and their sliced buffers) created before the failure
      // to avoid leaking device memory.
      for (int i = 0; i < numColumns; ++i) {
        if (columns[i] != null) {
          columns[i].close();
        }
      }
      throw t;
    }

    return new ColumnarBatch(columns, (int) rows);
  }

  private GpuCompressedColumnVector(DataType type, DeviceMemoryBuffer buffer, TableMeta tableMeta) {
    super(type);
    this.buffer = buffer;
    this.tableMeta = tableMeta;
  }

  /** @return the device buffer containing the compressed table data */
  public DeviceMemoryBuffer getBuffer() {
    return buffer;
  }

  /** @return metadata needed to decompress and reconstruct the table */
  public TableMeta getTableMeta() {
    return tableMeta;
  }

  @Override
  public void close() {
    buffer.close();
  }

  /** Unsupported: null information is unavailable while the data is compressed. */
  @Override
  public boolean hasNull() {
    throw new IllegalStateException("column vector is compressed");
  }

  /** Unsupported: null counts are unavailable while the data is compressed. */
  @Override
  public int numNulls() {
    throw new IllegalStateException("column vector is compressed");
  }
}
Loading

0 comments on commit 510d3af

Please sign in to comment.