Add framework for batch compression of shuffle partitions #487

Merged 13 commits on Aug 11, 2020
GpuColumnVector.java
@@ -26,10 +26,7 @@
import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
import org.apache.spark.sql.types.*;
import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.sql.vectorized.ColumnarArray;
import org.apache.spark.sql.vectorized.ColumnarBatch;
import org.apache.spark.sql.vectorized.ColumnarMap;
import org.apache.spark.unsafe.types.UTF8String;

import java.util.List;

@@ -39,7 +36,7 @@
* is on the host, and we want to keep as much of the data on the device as possible.
* We also provide GPU accelerated versions of the transitions to and from rows.
*/
public class GpuColumnVector extends ColumnVector {
public class GpuColumnVector extends GpuColumnVectorBase {

public static final class GpuColumnarBatchBuilder implements AutoCloseable {
private final ai.rapids.cudf.HostColumnVector.Builder[] builders;
@@ -174,7 +171,7 @@ public static final DType getRapidsType(DataType type) {
return result;
}

protected static final DataType getSparkType(DType type) {
static final DataType getSparkType(DType type) {
switch (type) {
case BOOL8:
return DataTypes.BooleanType;
@@ -396,78 +393,6 @@ public final int numNulls() {
return (int) cudfCv.getNullCount();
}

private final static String BAD_ACCESS = "DATA ACCESS MUST BE ON A HOST VECTOR";

@Override
public final boolean isNullAt(int rowId) {
throw new IllegalStateException(BAD_ACCESS);
}

@Override
public final boolean getBoolean(int rowId) {
throw new IllegalStateException(BAD_ACCESS);
}

@Override
public final byte getByte(int rowId) {
throw new IllegalStateException(BAD_ACCESS);
}

@Override
public final short getShort(int rowId) {
throw new IllegalStateException(BAD_ACCESS);
}

@Override
public final int getInt(int rowId) {
throw new IllegalStateException(BAD_ACCESS);
}

@Override
public final long getLong(int rowId) {
throw new IllegalStateException(BAD_ACCESS);
}

@Override
public final float getFloat(int rowId) {
throw new IllegalStateException(BAD_ACCESS);
}

@Override
public final double getDouble(int rowId) {
throw new IllegalStateException(BAD_ACCESS);
}

@Override
public final ColumnarArray getArray(int rowId) {
throw new IllegalStateException(BAD_ACCESS);
}

@Override
public final ColumnarMap getMap(int ordinal) {
throw new IllegalStateException(BAD_ACCESS);
}

@Override
public final Decimal getDecimal(int rowId, int precision, int scale) {
throw new IllegalStateException(BAD_ACCESS);
}

@Override
public final UTF8String getUTF8String(int rowId) {
throw new IllegalStateException(BAD_ACCESS);
}

@Override
public final byte[] getBinary(int rowId) {
throw new IllegalStateException(BAD_ACCESS);
}

@Override
public final ColumnVector getChild(int ordinal) {
throw new IllegalStateException(BAD_ACCESS);
}

public static final long getTotalDeviceMemoryUsed(ColumnarBatch batch) {
long sum = 0;
for (int i = 0; i < batch.numCols(); i++) {
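With the host row accessors moved into GpuColumnVectorBase, a GPU-backed vector subclass only has to supply the device-side lifecycle and null-count methods. A minimal sketch, assuming a hypothetical subclass (ExampleGpuVector is illustrative only and not part of this change):

package com.nvidia.spark.rapids;

import org.apache.spark.sql.types.DataType;

// Hypothetical subclass used only to illustrate the refactored base class.
class ExampleGpuVector extends GpuColumnVectorBase {
  ExampleGpuVector(DataType type) {
    super(type);
  }

  @Override
  public void close() {
    // Release any device-side resources held by this vector.
  }

  @Override
  public boolean hasNull() {
    return false; // This sketch tracks no validity data.
  }

  @Override
  public int numNulls() {
    return 0;
  }
}

All row accessors (isNullAt, getInt, and so on) are inherited as final methods that throw, so a subclass cannot accidentally expose host-side access to device data.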
GpuColumnVectorBase.java (new file)
@@ -0,0 +1,103 @@
/*
* Copyright (c) 2020, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids;

import ai.rapids.cudf.DType;
import org.apache.spark.sql.types.*;
import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.sql.vectorized.ColumnarArray;
import org.apache.spark.sql.vectorized.ColumnarMap;
import org.apache.spark.unsafe.types.UTF8String;

/** Base class for all GPU column vectors. */
abstract class GpuColumnVectorBase extends ColumnVector {
private final static String BAD_ACCESS = "DATA ACCESS MUST BE ON A HOST VECTOR";

protected GpuColumnVectorBase(DataType type) {
super(type);
}

@Override
public final boolean isNullAt(int rowId) {
throw new IllegalStateException(BAD_ACCESS);
}

@Override
public final boolean getBoolean(int rowId) {
throw new IllegalStateException(BAD_ACCESS);
}

@Override
public final byte getByte(int rowId) {
throw new IllegalStateException(BAD_ACCESS);
}

@Override
public final short getShort(int rowId) {
throw new IllegalStateException(BAD_ACCESS);
}

@Override
public final int getInt(int rowId) {
throw new IllegalStateException(BAD_ACCESS);
}

@Override
public final long getLong(int rowId) {
throw new IllegalStateException(BAD_ACCESS);
}

@Override
public final float getFloat(int rowId) {
throw new IllegalStateException(BAD_ACCESS);
}

@Override
public final double getDouble(int rowId) {
throw new IllegalStateException(BAD_ACCESS);
}

@Override
public final ColumnarArray getArray(int rowId) {
throw new IllegalStateException(BAD_ACCESS);
}

@Override
public final ColumnarMap getMap(int ordinal) {
throw new IllegalStateException(BAD_ACCESS);
}

@Override
public final Decimal getDecimal(int rowId, int precision, int scale) {
throw new IllegalStateException(BAD_ACCESS);
}

@Override
public final UTF8String getUTF8String(int rowId) {
throw new IllegalStateException(BAD_ACCESS);
}

@Override
public final byte[] getBinary(int rowId) {
throw new IllegalStateException(BAD_ACCESS);
}

@Override
public final ColumnVector getChild(int ordinal) {
throw new IllegalStateException(BAD_ACCESS);
}
}
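Every row-level accessor above is final and throws, so any attempt to read a GPU vector as if it were host data fails fast. A minimal caller-side guard, sketched under the assumption that the caller lives in the same package (HostAccessGuardExample and sumFirstColumn are illustrative names, not part of this change):

package com.nvidia.spark.rapids;

import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.sql.vectorized.ColumnarBatch;

final class HostAccessGuardExample {
  // Sums a LONG column row by row, but only when the data is actually host-accessible.
  static long sumFirstColumn(ColumnarBatch batch) {
    ColumnVector col = batch.column(0);
    if (col instanceof GpuColumnVectorBase) {
      // Data lives on the device; calling col.getLong(row) here would throw
      // IllegalStateException("DATA ACCESS MUST BE ON A HOST VECTOR").
      throw new IllegalStateException("copy the batch to host before row access");
    }
    long sum = 0;
    for (int row = 0; row < batch.numRows(); row++) {
      if (!col.isNullAt(row)) {
        sum += col.getLong(row);
      }
    }
    return sum;
  }
}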
GpuCompressedColumnVector.java (new file)
@@ -0,0 +1,95 @@
/*
* Copyright (c) 2020, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids;

import ai.rapids.cudf.DType;
import ai.rapids.cudf.DeviceMemoryBuffer;
import com.nvidia.spark.rapids.format.ColumnMeta;
import com.nvidia.spark.rapids.format.TableMeta;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.sql.vectorized.ColumnarBatch;

/**
* A GPU column vector that has been compressed. The columnar data within cannot
* be accessed directly. This class primarily serves the role of tracking the
* compressed data and table metadata so it can be decompressed later.
*/
public final class GpuCompressedColumnVector extends GpuColumnVectorBase {
private final DeviceMemoryBuffer buffer;
private final TableMeta tableMeta;

public static ColumnarBatch from(CompressedTable compressedTable) {
DeviceMemoryBuffer buffer = compressedTable.buffer();
TableMeta tableMeta = compressedTable.meta();
long rows = tableMeta.rowCount();
if (rows != (int) rows) {
throw new IllegalStateException("Cannot support a batch larger that MAX INT rows");
}

ColumnMeta columnMeta = new ColumnMeta();
int numColumns = tableMeta.columnMetasLength();
ColumnVector[] columns = new ColumnVector[numColumns];
try {
for (int i = 0; i < numColumns; ++i) {
tableMeta.columnMetas(columnMeta, i);
DType dtype = DType.fromNative(columnMeta.dtype());
DataType type = GpuColumnVector.getSparkType(dtype);
DeviceMemoryBuffer slicedBuffer = buffer.slice(0, buffer.getLength());
columns[i] = new GpuCompressedColumnVector(type, slicedBuffer, tableMeta);
}
} catch (Throwable t) {
for (int i = 0; i < numColumns; ++i) {
if (columns[i] != null) {
columns[i].close();
}
}
throw t;
}

return new ColumnarBatch(columns, (int) rows);
}

private GpuCompressedColumnVector(DataType type, DeviceMemoryBuffer buffer, TableMeta tableMeta) {
super(type);
this.buffer = buffer;
this.tableMeta = tableMeta;
}

public DeviceMemoryBuffer getBuffer() {
return buffer;
}

public TableMeta getTableMeta() {
return tableMeta;
}

@Override
public void close() {
buffer.close();
}

@Override
public boolean hasNull() {
throw new IllegalStateException("column vector is compressed");
}

@Override
public int numNulls() {
throw new IllegalStateException("column vector is compressed");
}
}
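The columns produced by from() all carry the same TableMeta and a full-length view of the same device buffer, so a downstream consumer only needs the first column to recover everything required for later decompression. A minimal sketch, assuming a hypothetical helper in the same package (CompressedBatchInspector is illustrative, not part of this change):

package com.nvidia.spark.rapids;

import ai.rapids.cudf.DeviceMemoryBuffer;
import com.nvidia.spark.rapids.format.TableMeta;
import org.apache.spark.sql.vectorized.ColumnarBatch;

final class CompressedBatchInspector {
  // True when the batch holds compressed GPU data rather than directly readable columns.
  static boolean isCompressed(ColumnarBatch batch) {
    return batch.numCols() > 0 &&
        batch.column(0) instanceof GpuCompressedColumnVector;
  }

  // The metadata describing every column in the compressed table.
  static TableMeta metaOf(ColumnarBatch batch) {
    return ((GpuCompressedColumnVector) batch.column(0)).getTableMeta();
  }

  // The device buffer holding the compressed table data.
  static DeviceMemoryBuffer bufferOf(ColumnarBatch batch) {
    return ((GpuCompressedColumnVector) batch.column(0)).getBuffer();
  }
}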