Update BufferMeta to support multiple codec buffers per table #426

Merged · 2 commits · Jul 28, 2020
37 changes: 30 additions & 7 deletions sql-plugin/src/main/format/ShuffleCommon.fbs
@@ -15,20 +15,43 @@
 namespace com.nvidia.spark.rapids.format;

 enum CodecType : byte {
+  /// data simply copied, codec is only for testing
+  COPY = -1,
+
   /// no compression codec was used on the data
-  UNCOMPRESSED = 0
+  UNCOMPRESSED = 0,
 }

+/// Descriptor for a compressed buffer
+table CodecBufferDescriptor {
+  /// the compression codec used
+  codec: CodecType;
+
+  /// byte offset from the start of the enclosing compressed buffer
+  /// where the compressed data begins
+  compressed_offset: long;
+
+  /// size of the compressed data in bytes
+  compressed_size: long;
+
+  /// byte offset from the start of the enclosing uncompressed buffer
+  /// where the uncompressed data should be written
+  uncompressed_offset: long;
+
+  /// size of the uncompressed data in bytes
+  uncompressed_size: long;
+}
+
 table BufferMeta {
   /// ID of this buffer
   id: int;

-  /// size of the uncompressed buffer data in bytes
-  actual_size: long;
+  /// size of the buffer data in bytes
+  size: long;

-  /// size of the compressed buffer data if a codec is used
-  compressed_size: long;
+  /// size of the uncompressed buffer data
+  uncompressed_size: long;

-  /// type of compression codec used
-  codec: CodecType;
+  /// array of codec buffer descriptors if the data is compressed
+  codec_buffer_descrs: [CodecBufferDescriptor];
 }
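
The key schema change is that a buffer can now carry any number of `CodecBufferDescriptor` entries, each mapping one compressed span onto its uncompressed destination. A minimal JVM-only sketch of encoding a `BufferMeta` whose data arrived as two COPY-codec chunks (illustrative values, not code from this PR):

```scala
import com.google.flatbuffers.FlatBufferBuilder
import com.nvidia.spark.rapids.format.{BufferMeta, CodecBufferDescriptor, CodecType}

val fbb = new FlatBufferBuilder(1024)
// Two descriptors: chunk 0 covers bytes [0, 512), chunk 1 covers [512, 1536).
// With the testing-only COPY codec the compressed and uncompressed extents match.
val chunk0 = CodecBufferDescriptor.createCodecBufferDescriptor(
  fbb, CodecType.COPY, 0, 512, 0, 512)
val chunk1 = CodecBufferDescriptor.createCodecBufferDescriptor(
  fbb, CodecType.COPY, 512, 1024, 512, 1024)
val descrs = BufferMeta.createCodecBufferDescrsVector(fbb, Array(chunk0, chunk1))
BufferMeta.startBufferMeta(fbb)
BufferMeta.addId(fbb, 1)                  // example table ID
BufferMeta.addSize(fbb, 1536)             // bytes as stored for transfer
BufferMeta.addUncompressedSize(fbb, 1536) // bytes after decompression
BufferMeta.addCodecBufferDescrs(fbb, descrs)
fbb.finish(BufferMeta.endBufferMeta(fbb))
```
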
sql-plugin/src/main/java/com/nvidia/spark/rapids/format/BufferMeta.java
@@ -20,39 +20,42 @@ public final class BufferMeta extends Table {
   public int id() { int o = __offset(4); return o != 0 ? bb.getInt(o + bb_pos) : 0; }
   public boolean mutateId(int id) { int o = __offset(4); if (o != 0) { bb.putInt(o + bb_pos, id); return true; } else { return false; } }
   /**
-   * size of the uncompressed buffer data in bytes
+   * size of the buffer data in bytes
    */
-  public long actualSize() { int o = __offset(6); return o != 0 ? bb.getLong(o + bb_pos) : 0L; }
-  public boolean mutateActualSize(long actual_size) { int o = __offset(6); if (o != 0) { bb.putLong(o + bb_pos, actual_size); return true; } else { return false; } }
+  public long size() { int o = __offset(6); return o != 0 ? bb.getLong(o + bb_pos) : 0L; }
+  public boolean mutateSize(long size) { int o = __offset(6); if (o != 0) { bb.putLong(o + bb_pos, size); return true; } else { return false; } }
   /**
-   * size of the compressed buffer data if a codec is used
+   * size of the uncompressed buffer data
    */
-  public long compressedSize() { int o = __offset(8); return o != 0 ? bb.getLong(o + bb_pos) : 0L; }
-  public boolean mutateCompressedSize(long compressed_size) { int o = __offset(8); if (o != 0) { bb.putLong(o + bb_pos, compressed_size); return true; } else { return false; } }
+  public long uncompressedSize() { int o = __offset(8); return o != 0 ? bb.getLong(o + bb_pos) : 0L; }
+  public boolean mutateUncompressedSize(long uncompressed_size) { int o = __offset(8); if (o != 0) { bb.putLong(o + bb_pos, uncompressed_size); return true; } else { return false; } }
   /**
-   * type of compression codec used
+   * array of codec buffer descriptors if the data is compressed
    */
-  public byte codec() { int o = __offset(10); return o != 0 ? bb.get(o + bb_pos) : 0; }
-  public boolean mutateCodec(byte codec) { int o = __offset(10); if (o != 0) { bb.put(o + bb_pos, codec); return true; } else { return false; } }
+  public CodecBufferDescriptor codecBufferDescrs(int j) { return codecBufferDescrs(new CodecBufferDescriptor(), j); }
+  public CodecBufferDescriptor codecBufferDescrs(CodecBufferDescriptor obj, int j) { int o = __offset(10); return o != 0 ? obj.__assign(__indirect(__vector(o) + j * 4), bb) : null; }
+  public int codecBufferDescrsLength() { int o = __offset(10); return o != 0 ? __vector_len(o) : 0; }

   public static int createBufferMeta(FlatBufferBuilder builder,
       int id,
-      long actual_size,
-      long compressed_size,
-      byte codec) {
+      long size,
+      long uncompressed_size,
+      int codec_buffer_descrsOffset) {
     builder.startObject(4);
-    BufferMeta.addCompressedSize(builder, compressed_size);
-    BufferMeta.addActualSize(builder, actual_size);
+    BufferMeta.addUncompressedSize(builder, uncompressed_size);
+    BufferMeta.addSize(builder, size);
+    BufferMeta.addCodecBufferDescrs(builder, codec_buffer_descrsOffset);
     BufferMeta.addId(builder, id);
-    BufferMeta.addCodec(builder, codec);
     return BufferMeta.endBufferMeta(builder);
   }

   public static void startBufferMeta(FlatBufferBuilder builder) { builder.startObject(4); }
   public static void addId(FlatBufferBuilder builder, int id) { builder.addInt(0, id, 0); }
-  public static void addActualSize(FlatBufferBuilder builder, long actualSize) { builder.addLong(1, actualSize, 0L); }
-  public static void addCompressedSize(FlatBufferBuilder builder, long compressedSize) { builder.addLong(2, compressedSize, 0L); }
-  public static void addCodec(FlatBufferBuilder builder, byte codec) { builder.addByte(3, codec, 0); }
+  public static void addSize(FlatBufferBuilder builder, long size) { builder.addLong(1, size, 0L); }
+  public static void addUncompressedSize(FlatBufferBuilder builder, long uncompressedSize) { builder.addLong(2, uncompressedSize, 0L); }
+  public static void addCodecBufferDescrs(FlatBufferBuilder builder, int codecBufferDescrsOffset) { builder.addOffset(3, codecBufferDescrsOffset, 0); }
+  public static int createCodecBufferDescrsVector(FlatBufferBuilder builder, int[] data) { builder.startVector(4, data.length, 4); for (int i = data.length - 1; i >= 0; i--) builder.addOffset(data[i]); return builder.endVector(); }
+  public static void startCodecBufferDescrsVector(FlatBufferBuilder builder, int numElems) { builder.startVector(4, numElems, 4); }
   public static int endBufferMeta(FlatBufferBuilder builder) {
     int o = builder.endObject();
     return o;
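
On the read side, consumers now iterate the descriptor vector instead of reading a single codec field. A small sketch (hypothetical helper, not from the PR) using the generated accessors, including the allocation-free object-reuse overload:

```scala
import com.nvidia.spark.rapids.format.{BufferMeta, CodecBufferDescriptor}

// Hypothetical helper: total uncompressed bytes described by the metadata.
// An empty descriptor vector means the buffer is not compressed.
def totalUncompressedBytes(meta: BufferMeta): Long = {
  val n = meta.codecBufferDescrsLength()
  if (n == 0) {
    meta.uncompressedSize()
  } else {
    val descr = new CodecBufferDescriptor // reused across iterations
    (0 until n).map { i =>
      meta.codecBufferDescrs(descr, i)
      descr.uncompressedSize()
    }.sum
  }
}
```
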
sql-plugin/src/main/java/com/nvidia/spark/rapids/format/CodecBufferDescriptor.java
@@ -0,0 +1,74 @@
+// automatically generated by the FlatBuffers compiler, do not modify
+
+package com.nvidia.spark.rapids.format;
+
+import java.nio.*;
+import java.lang.*;
+import java.util.*;
+import com.google.flatbuffers.*;
+
+@SuppressWarnings("unused")
+/**
+ * Descriptor for a compressed buffer
+ */
+public final class CodecBufferDescriptor extends Table {
+  public static CodecBufferDescriptor getRootAsCodecBufferDescriptor(ByteBuffer _bb) { return getRootAsCodecBufferDescriptor(_bb, new CodecBufferDescriptor()); }
+  public static CodecBufferDescriptor getRootAsCodecBufferDescriptor(ByteBuffer _bb, CodecBufferDescriptor obj) { _bb.order(ByteOrder.LITTLE_ENDIAN); return (obj.__assign(_bb.getInt(_bb.position()) + _bb.position(), _bb)); }
+  public void __init(int _i, ByteBuffer _bb) { bb_pos = _i; bb = _bb; vtable_start = bb_pos - bb.getInt(bb_pos); vtable_size = bb.getShort(vtable_start); }
+  public CodecBufferDescriptor __assign(int _i, ByteBuffer _bb) { __init(_i, _bb); return this; }
+
+  /**
+   * the compression codec used
+   */
+  public byte codec() { int o = __offset(4); return o != 0 ? bb.get(o + bb_pos) : 0; }
+  public boolean mutateCodec(byte codec) { int o = __offset(4); if (o != 0) { bb.put(o + bb_pos, codec); return true; } else { return false; } }
+  /**
+   * byte offset from the start of the enclosing compressed buffer
+   * where the compressed data begins
+   */
+  public long compressedOffset() { int o = __offset(6); return o != 0 ? bb.getLong(o + bb_pos) : 0L; }
+  public boolean mutateCompressedOffset(long compressed_offset) { int o = __offset(6); if (o != 0) { bb.putLong(o + bb_pos, compressed_offset); return true; } else { return false; } }
+  /**
+   * size of the compressed data in bytes
+   */
+  public long compressedSize() { int o = __offset(8); return o != 0 ? bb.getLong(o + bb_pos) : 0L; }
+  public boolean mutateCompressedSize(long compressed_size) { int o = __offset(8); if (o != 0) { bb.putLong(o + bb_pos, compressed_size); return true; } else { return false; } }
+  /**
+   * byte offset from the start of the enclosing uncompressed buffer
+   * where the uncompressed data should be written
+   */
+  public long uncompressedOffset() { int o = __offset(10); return o != 0 ? bb.getLong(o + bb_pos) : 0L; }
+  public boolean mutateUncompressedOffset(long uncompressed_offset) { int o = __offset(10); if (o != 0) { bb.putLong(o + bb_pos, uncompressed_offset); return true; } else { return false; } }
+  /**
+   * size of the uncompressed data in bytes
+   */
+  public long uncompressedSize() { int o = __offset(12); return o != 0 ? bb.getLong(o + bb_pos) : 0L; }
+  public boolean mutateUncompressedSize(long uncompressed_size) { int o = __offset(12); if (o != 0) { bb.putLong(o + bb_pos, uncompressed_size); return true; } else { return false; } }
+
+  public static int createCodecBufferDescriptor(FlatBufferBuilder builder,
+      byte codec,
+      long compressed_offset,
+      long compressed_size,
+      long uncompressed_offset,
+      long uncompressed_size) {
+    builder.startObject(5);
+    CodecBufferDescriptor.addUncompressedSize(builder, uncompressed_size);
+    CodecBufferDescriptor.addUncompressedOffset(builder, uncompressed_offset);
+    CodecBufferDescriptor.addCompressedSize(builder, compressed_size);
+    CodecBufferDescriptor.addCompressedOffset(builder, compressed_offset);
+    CodecBufferDescriptor.addCodec(builder, codec);
+    return CodecBufferDescriptor.endCodecBufferDescriptor(builder);
+  }
+
+  public static void startCodecBufferDescriptor(FlatBufferBuilder builder) { builder.startObject(5); }
+  public static void addCodec(FlatBufferBuilder builder, byte codec) { builder.addByte(0, codec, 0); }
+  public static void addCompressedOffset(FlatBufferBuilder builder, long compressedOffset) { builder.addLong(1, compressedOffset, 0L); }
+  public static void addCompressedSize(FlatBufferBuilder builder, long compressedSize) { builder.addLong(2, compressedSize, 0L); }
+  public static void addUncompressedOffset(FlatBufferBuilder builder, long uncompressedOffset) { builder.addLong(3, uncompressedOffset, 0L); }
+  public static void addUncompressedSize(FlatBufferBuilder builder, long uncompressedSize) { builder.addLong(4, uncompressedSize, 0L); }
+  public static int endCodecBufferDescriptor(FlatBufferBuilder builder) {
+    int o = builder.endObject();
+    return o;
+  }
+}
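
Because these are fixed-width scalar fields, the generated mutate* methods can patch a finished message in place; they return false when the field was omitted (stored as a default) and so has no slot to overwrite. A hypothetical use, assuming a descriptor was first built with an estimated size:

```scala
import com.nvidia.spark.rapids.format.CodecBufferDescriptor

// Hypothetical: record the codec's real output length after compression runs.
// Returns false if compressed_size was never written into the message.
def patchCompressedSize(descr: CodecBufferDescriptor, actualBytes: Long): Boolean =
  descr.mutateCompressedSize(actualBytes)
```
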

sql-plugin/src/main/java/com/nvidia/spark/rapids/format/CodecType.java
@@ -4,13 +4,17 @@

 public final class CodecType {
   private CodecType() { }
+  /**
+   * data simply copied, codec is only for testing
+   */
+  public static final byte COPY = -1;
   /**
    * no compression codec was used on the data
    */
   public static final byte UNCOMPRESSED = 0;

-  public static final String[] names = { "UNCOMPRESSED", };
+  public static final String[] names = { "COPY", "UNCOMPRESSED", };

-  public static String name(int e) { return names[e]; }
+  public static String name(int e) { return names[e - COPY]; }
 }
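
Note the changed lookup: with COPY at -1, enum values no longer equal their index into names, so name shifts the value by COPY. For example, per the generated constants:

```scala
import com.nvidia.spark.rapids.format.CodecType

// COPY (-1) maps to names(0); UNCOMPRESSED (0) maps to names(1).
assert(CodecType.name(CodecType.COPY) == "COPY")
assert(CodecType.name(CodecType.UNCOMPRESSED) == "UNCOMPRESSED")
```
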

125 changes: 94 additions & 31 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/MetaUtils.scala
@@ -45,23 +45,81 @@ object MetaUtils {
       buffer)
   }

+  /**
+   * Build a TableMeta message
+   * @param tableId the ID to use for this table
+   * @param columns the columns in the table
+   * @param numRows the number of rows in the table
+   * @param buffer the contiguous buffer backing the columns in the table
+   * @return heap-based flatbuffer message
+   */
   def buildTableMeta(
       tableId: Int,
       columns: Seq[ColumnVector],
       numRows: Long,
       buffer: DeviceMemoryBuffer): TableMeta = {
     val fbb = new FlatBufferBuilder(1024)
-    val baseAddress = buffer.getAddress
     val bufferSize = buffer.getLength
-    val bufferMetaOffset = BufferMeta.createBufferMeta(
+    BufferMeta.startBufferMeta(fbb)
+    BufferMeta.addId(fbb, tableId)
+    BufferMeta.addSize(fbb, bufferSize)
+    BufferMeta.addUncompressedSize(fbb, bufferSize)
+    val bufferMetaOffset = BufferMeta.endBufferMeta(fbb)
+    buildTableMeta(fbb, bufferMetaOffset, columns, numRows, buffer.getAddress)
+  }
+
+  /**
+   * Build a TableMeta message from a Table that originated in contiguous memory that has
+   * since been compressed.
+   * @param tableId ID to use for this table
+   * @param table table whose metadata will be encoded in the message
+   * @param uncompressedBuffer uncompressed buffer that backs the Table
+   * @param codecId identifier of the codec being used, see CodecType
+   * @param compressedSize size in bytes of the compressed data
+   * @return heap-based flatbuffer message
+   */
+  def buildTableMeta(
+      tableId: Int,
+      table: Table,
+      uncompressedBuffer: DeviceMemoryBuffer,
+      codecId: Byte,
+      compressedSize: Long): TableMeta = {
+    val fbb = new FlatBufferBuilder(1024)
+    val codecDescrOffset = CodecBufferDescriptor.createCodecBufferDescriptor(
       fbb,
-      tableId,
-      bufferSize,
-      bufferSize,
-      CodecType.UNCOMPRESSED)
+      codecId,
+      0,
+      compressedSize,
+      0,
+      uncompressedBuffer.getLength)
+    val codecDescrArrayOffset =
+      BufferMeta.createCodecBufferDescrsVector(fbb, Array(codecDescrOffset))
+    BufferMeta.startBufferMeta(fbb)
+    BufferMeta.addId(fbb, tableId)
+    BufferMeta.addSize(fbb, compressedSize)
+    BufferMeta.addUncompressedSize(fbb, uncompressedBuffer.getLength)
+    BufferMeta.addCodecBufferDescrs(fbb, codecDescrArrayOffset)
+    val bufferMetaOffset = BufferMeta.endBufferMeta(fbb)
+    val columns = (0 until table.getNumberOfColumns).map(i => table.getColumn(i))
+    buildTableMeta(fbb, bufferMetaOffset, columns, table.getRowCount, uncompressedBuffer.getAddress)
+  }
+
+  /**
+   * Build a TableMeta message with a pre-built BufferMeta message
+   * @param fbb flatbuffer builder that has an already built BufferMeta message
+   * @param bufferMetaOffset offset where the BufferMeta message was built
+   * @param columns the columns in the table
+   * @param numRows the number of rows in the table
+   * @param baseAddress address of uncompressed contiguous buffer holding the table
+   * @return flatbuffer message
+   */
+  def buildTableMeta(
+      fbb: FlatBufferBuilder,
+      bufferMetaOffset: Int,
+      columns: Seq[ColumnVector],
+      numRows: Long,
+      baseAddress: Long): TableMeta = {
     val columnMetaOffsets = columns.map(col => addColumnMeta(fbb, baseAddress, col)).toArray

     val columnMetasOffset = TableMeta.createColumnMetasVector(fbb, columnMetaOffsets)
     TableMeta.startTableMeta(fbb)
     TableMeta.addBufferMeta(fbb, bufferMetaOffset)
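
Taken together, the three overloads split responsibilities: the first two encode a BufferMeta (uncompressed vs. single-codec compressed) and both delegate to the third to lay out the column metadata. A sketch of a compressed-path call site (hypothetical wrapper; the table, buffer, and compressed length are assumed to come from a separate compression step, which this PR does not add):

```scala
import ai.rapids.cudf.{DeviceMemoryBuffer, Table}
import com.nvidia.spark.rapids.MetaUtils
import com.nvidia.spark.rapids.format.{CodecType, TableMeta}

// Hypothetical wrapper: encode shuffle metadata for a table whose contiguous
// buffer was compressed elsewhere into compressedLen bytes.
def metaForCompressedTable(
    tableId: Int,
    table: Table,
    uncompressedBuf: DeviceMemoryBuffer,
    compressedLen: Long): TableMeta =
  MetaUtils.buildTableMeta(tableId, table, uncompressedBuf, CodecType.COPY, compressedLen)
```
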
@@ -187,6 +245,30 @@ object ShuffleMetadata extends Logging{

   val bbFactory = new DirectByteBufferFactory

+  private def copyBufferMeta(fbb: FlatBufferBuilder, buffMeta: BufferMeta): Int = {
+    val descrOffsets = (0 until buffMeta.codecBufferDescrsLength()).map { i =>
+      val descr = buffMeta.codecBufferDescrs(i)
+      CodecBufferDescriptor.createCodecBufferDescriptor(
+        fbb,
+        descr.codec,
+        descr.compressedOffset,
+        descr.compressedSize,
+        descr.uncompressedOffset,
+        descr.uncompressedSize)
+    }
+    val codecDescrArrayOffset = if (descrOffsets.nonEmpty) {
+      Some(BufferMeta.createCodecBufferDescrsVector(fbb, descrOffsets.toArray))
+    } else {
+      None
+    }
+    BufferMeta.startBufferMeta(fbb)
+    BufferMeta.addId(fbb, buffMeta.id)
+    BufferMeta.addSize(fbb, buffMeta.size)
+    BufferMeta.addUncompressedSize(fbb, buffMeta.uncompressedSize)
+    codecDescrArrayOffset.foreach(off => BufferMeta.addCodecBufferDescrs(fbb, off))
+    BufferMeta.endBufferMeta(fbb)
+  }
+
   /**
    * Given a sequence of `TableMeta`, re-lay the metas using the flat buffer builder in `fbb`.
    * @param fbb builder to use
@@ -196,10 +278,8 @@
   def copyTables(fbb: FlatBufferBuilder, tables: Seq[TableMeta]): Array[Int] = {
     tables.map { tableMeta =>
       val buffMeta = tableMeta.bufferMeta()
-
       val buffMetaOffset = if (buffMeta != null) {
-        Some(BufferMeta.createBufferMeta(fbb, buffMeta.id(), buffMeta.actualSize(),
-          buffMeta.compressedSize(), buffMeta.codec()))
+        Some(copyBufferMeta(fbb, buffMeta))
       } else {
         None
       }
@@ -240,12 +320,8 @@
       }

       TableMeta.startTableMeta(fbb)
-      if (buffMetaOffset.isDefined) {
-        TableMeta.addBufferMeta(fbb, buffMetaOffset.get)
-      }
-      if (columnMetaOffset.isDefined) {
-        TableMeta.addColumnMetas(fbb, columnMetaOffset.get)
-      }
+      buffMetaOffset.foreach(bmo => TableMeta.addBufferMeta(fbb, bmo))
+      columnMetaOffset.foreach(cmo => TableMeta.addColumnMetas(fbb, cmo))
       TableMeta.addRowCount(fbb, tableMeta.rowCount())
       TableMeta.endTableMeta(fbb)
     }.toArray
Expand All @@ -272,7 +348,7 @@ object ShuffleMetadata extends Logging{
blockIds : Seq[ShuffleBlockBatchId],
maxResponseSize: Long) : ByteBuffer = {
val fbb = new FlatBufferBuilder(1024, bbFactory)
val blockIdOffsets = blockIds.map { case blockId =>
val blockIdOffsets = blockIds.map { blockId =>
BlockIdMeta.createBlockIdMeta(fbb,
blockId.shuffleId,
blockId.mapId,
@@ -313,8 +389,7 @@
   def buildBufferTransferResponse(bufferMetas: Seq[BufferMeta]): ByteBuffer = {
     val fbb = new FlatBufferBuilder(1024, bbFactory)
     val responses = bufferMetas.map { bm =>
-      val buffMetaOffset = BufferMeta.createBufferMeta(fbb, bm.id(), bm.actualSize(),
-        bm.compressedSize(), bm.codec())
+      val buffMetaOffset = copyBufferMeta(fbb, bm)
       BufferTransferResponse.createBufferTransferResponse(fbb, bm.id(), TransferState.STARTED,
         buffMetaOffset)
     }.toArray
@@ -365,24 +440,12 @@
         // TODO: Need to expose native ID in cudf
         if (DType.STRING == DType.fromNative(columnMeta.dtype())) {
           val offsetLenStr = columnMeta.offsets().length().toString
-          val validityLen = if (columnMeta.validity() == null) {
-            -1
-          } else {
-            columnMeta.validity().length()
-          }
           out.append(s"column: $i [rows=${columnMeta.rowCount}, " +
               s"data_len=${columnMeta.data().length()}, offset_len=${offsetLenStr}, " +
               s"validity_len=$validityLen, type=${DType.fromNative(columnMeta.dtype())}, " +
               s"null_count=${columnMeta.nullCount()}]\n")
         } else {
           val offsetLenStr = "NC"
-
-          val validityLen = if (columnMeta.validity() == null) {
-            -1
-          } else {
-            columnMeta.validity().length()
-          }
-
           out.append(s"column: $i [rows=${columnMeta.rowCount}, " +
               s"data_len=${columnMeta.data().length()}, offset_len=${offsetLenStr}, " +
               s"validity_len=$validityLen, type=${DType.fromNative(columnMeta.dtype())}, " +