
Update BufferMeta to support multiple codec buffers per table (NVIDIA#426)

* Update BufferMeta to support multiple codec buffers per table

Signed-off-by: Jason Lowe <jlowe@nvidia.com>

* Update uncompressedSize to always have a size
jlowe authored Jul 28, 2020
1 parent 1944562 commit ad31730
Showing 9 changed files with 322 additions and 100 deletions.
37 changes: 30 additions & 7 deletions sql-plugin/src/main/format/ShuffleCommon.fbs
@@ -15,20 +15,43 @@
 namespace com.nvidia.spark.rapids.format;

 enum CodecType : byte {
+  /// data simply copied, codec is only for testing
+  COPY = -1,
+
   /// no compression codec was used on the data
-  UNCOMPRESSED = 0
+  UNCOMPRESSED = 0,
 }

+/// Descriptor for a compressed buffer
+table CodecBufferDescriptor {
+  /// the compression codec used
+  codec: CodecType;
+
+  /// byte offset from the start of the enclosing compressed buffer
+  /// where the compressed data begins
+  compressed_offset: long;
+
+  /// size of the compressed data in bytes
+  compressed_size: long;
+
+  /// byte offset from the start of the enclosing uncompressed buffer
+  /// where the uncompressed data should be written
+  uncompressed_offset: long;
+
+  /// size of the uncompressed data in bytes
+  uncompressed_size: long;
+}
+
 table BufferMeta {
   /// ID of this buffer
   id: int;

-  /// size of the uncompressed buffer data in bytes
-  actual_size: long;
+  /// size of the buffer data in bytes
+  size: long;

-  /// size of the compressed buffer data if a codec is used
-  compressed_size: long;
+  /// size of the uncompressed buffer data
+  uncompressed_size: long;

-  /// type of compression codec used
-  codec: CodecType;
+  /// array of codec buffer descriptors if the data is compressed
+  codec_buffer_descrs: [CodecBufferDescriptor];
 }
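As a sanity check of the new schema, here is a minimal host-only sketch in Scala that encodes a 96-byte table buffer as two independently coded chunks and reads the descriptors back. It uses only the generated classes in this diff, except getRootAsBufferMeta, which is assumed from the standard flatc codegen.

import com.google.flatbuffers.FlatBufferBuilder
import com.nvidia.spark.rapids.format.{BufferMeta, CodecBufferDescriptor, CodecType}

object MultiChunkMetaSketch {
  def main(args: Array[String]): Unit = {
    val fbb = new FlatBufferBuilder(1024)
    // Chunk 0 covers uncompressed bytes [0, 64); chunk 1 covers [64, 96).
    // COPY is the test-only codec, so compressed and uncompressed extents match here.
    val d0 = CodecBufferDescriptor.createCodecBufferDescriptor(fbb, CodecType.COPY, 0L, 64L, 0L, 64L)
    val d1 = CodecBufferDescriptor.createCodecBufferDescriptor(fbb, CodecType.COPY, 64L, 32L, 64L, 32L)
    val descrs = BufferMeta.createCodecBufferDescrsVector(fbb, Array(d0, d1))
    val meta = BufferMeta.createBufferMeta(fbb, 1, 96L, 96L, descrs)
    fbb.finish(meta)

    // getRootAsBufferMeta is the assumed standard flatc root accessor (not shown in this diff).
    val bm = BufferMeta.getRootAsBufferMeta(fbb.dataBuffer())
    assert(bm.codecBufferDescrsLength() == 2)
    assert(bm.uncompressedSize() == 96L)
    assert(bm.codecBufferDescrs(1).uncompressedOffset() == 64L)
  }
}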
BufferMeta.java
@@ -20,39 +20,42 @@ public final class BufferMeta extends Table {
   public int id() { int o = __offset(4); return o != 0 ? bb.getInt(o + bb_pos) : 0; }
   public boolean mutateId(int id) { int o = __offset(4); if (o != 0) { bb.putInt(o + bb_pos, id); return true; } else { return false; } }
   /**
-   * size of the uncompressed buffer data in bytes
+   * size of the buffer data in bytes
    */
-  public long actualSize() { int o = __offset(6); return o != 0 ? bb.getLong(o + bb_pos) : 0L; }
-  public boolean mutateActualSize(long actual_size) { int o = __offset(6); if (o != 0) { bb.putLong(o + bb_pos, actual_size); return true; } else { return false; } }
+  public long size() { int o = __offset(6); return o != 0 ? bb.getLong(o + bb_pos) : 0L; }
+  public boolean mutateSize(long size) { int o = __offset(6); if (o != 0) { bb.putLong(o + bb_pos, size); return true; } else { return false; } }
   /**
-   * size of the compressed buffer data if a codec is used
+   * size of the uncompressed buffer data
    */
-  public long compressedSize() { int o = __offset(8); return o != 0 ? bb.getLong(o + bb_pos) : 0L; }
-  public boolean mutateCompressedSize(long compressed_size) { int o = __offset(8); if (o != 0) { bb.putLong(o + bb_pos, compressed_size); return true; } else { return false; } }
+  public long uncompressedSize() { int o = __offset(8); return o != 0 ? bb.getLong(o + bb_pos) : 0L; }
+  public boolean mutateUncompressedSize(long uncompressed_size) { int o = __offset(8); if (o != 0) { bb.putLong(o + bb_pos, uncompressed_size); return true; } else { return false; } }
   /**
-   * type of compression codec used
+   * array of codec buffer descriptors if the data is compressed
    */
-  public byte codec() { int o = __offset(10); return o != 0 ? bb.get(o + bb_pos) : 0; }
-  public boolean mutateCodec(byte codec) { int o = __offset(10); if (o != 0) { bb.put(o + bb_pos, codec); return true; } else { return false; } }
+  public CodecBufferDescriptor codecBufferDescrs(int j) { return codecBufferDescrs(new CodecBufferDescriptor(), j); }
+  public CodecBufferDescriptor codecBufferDescrs(CodecBufferDescriptor obj, int j) { int o = __offset(10); return o != 0 ? obj.__assign(__indirect(__vector(o) + j * 4), bb) : null; }
+  public int codecBufferDescrsLength() { int o = __offset(10); return o != 0 ? __vector_len(o) : 0; }

   public static int createBufferMeta(FlatBufferBuilder builder,
       int id,
-      long actual_size,
-      long compressed_size,
-      byte codec) {
+      long size,
+      long uncompressed_size,
+      int codec_buffer_descrsOffset) {
     builder.startObject(4);
-    BufferMeta.addCompressedSize(builder, compressed_size);
-    BufferMeta.addActualSize(builder, actual_size);
+    BufferMeta.addUncompressedSize(builder, uncompressed_size);
+    BufferMeta.addSize(builder, size);
+    BufferMeta.addCodecBufferDescrs(builder, codec_buffer_descrsOffset);
     BufferMeta.addId(builder, id);
-    BufferMeta.addCodec(builder, codec);
     return BufferMeta.endBufferMeta(builder);
   }

   public static void startBufferMeta(FlatBufferBuilder builder) { builder.startObject(4); }
   public static void addId(FlatBufferBuilder builder, int id) { builder.addInt(0, id, 0); }
-  public static void addActualSize(FlatBufferBuilder builder, long actualSize) { builder.addLong(1, actualSize, 0L); }
-  public static void addCompressedSize(FlatBufferBuilder builder, long compressedSize) { builder.addLong(2, compressedSize, 0L); }
-  public static void addCodec(FlatBufferBuilder builder, byte codec) { builder.addByte(3, codec, 0); }
+  public static void addSize(FlatBufferBuilder builder, long size) { builder.addLong(1, size, 0L); }
+  public static void addUncompressedSize(FlatBufferBuilder builder, long uncompressedSize) { builder.addLong(2, uncompressedSize, 0L); }
+  public static void addCodecBufferDescrs(FlatBufferBuilder builder, int codecBufferDescrsOffset) { builder.addOffset(3, codecBufferDescrsOffset, 0); }
+  public static int createCodecBufferDescrsVector(FlatBufferBuilder builder, int[] data) { builder.startVector(4, data.length, 4); for (int i = data.length - 1; i >= 0; i--) builder.addOffset(data[i]); return builder.endVector(); }
+  public static void startCodecBufferDescrsVector(FlatBufferBuilder builder, int numElems) { builder.startVector(4, numElems, 4); }
   public static int endBufferMeta(FlatBufferBuilder builder) {
     int o = builder.endObject();
     return o;
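The per-chunk offsets are what let a reader reassemble one uncompressed buffer from several independently coded extents. A host-side Scala sketch of that loop, using plain byte arrays as stand-ins for device buffers; `decompress` is a hypothetical hook and is not part of this change:

import com.nvidia.spark.rapids.format.{BufferMeta, CodecBufferDescriptor, CodecType}

object ReassembleSketch {
  // Hypothetical codec hook; nothing in this commit provides it.
  def decompress(codec: Byte, src: Array[Byte], d: CodecBufferDescriptor, dst: Array[Byte]): Unit =
    throw new UnsupportedOperationException(s"no codec $codec in this sketch")

  def reassemble(meta: BufferMeta, data: Array[Byte]): Array[Byte] = {
    if (meta.codecBufferDescrsLength() == 0) {
      return data.clone() // no descriptors: the buffer is already uncompressed
    }
    val out = new Array[Byte](meta.uncompressedSize().toInt)
    for (i <- 0 until meta.codecBufferDescrsLength()) {
      val d = meta.codecBufferDescrs(i)
      if (d.codec() == CodecType.COPY) {
        // Test codec: bytes were stored verbatim, so copy them to their target offset.
        System.arraycopy(data, d.compressedOffset().toInt,
          out, d.uncompressedOffset().toInt, d.compressedSize().toInt)
      } else {
        decompress(d.codec(), data, d, out)
      }
    }
    out
  }
}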
CodecBufferDescriptor.java (new file)
@@ -0,0 +1,74 @@
+// automatically generated by the FlatBuffers compiler, do not modify
+
+package com.nvidia.spark.rapids.format;
+
+import java.nio.*;
+import java.lang.*;
+import java.util.*;
+import com.google.flatbuffers.*;
+
+@SuppressWarnings("unused")
+/**
+ * Descriptor for a compressed buffer
+ */
+public final class CodecBufferDescriptor extends Table {
+  public static CodecBufferDescriptor getRootAsCodecBufferDescriptor(ByteBuffer _bb) { return getRootAsCodecBufferDescriptor(_bb, new CodecBufferDescriptor()); }
+  public static CodecBufferDescriptor getRootAsCodecBufferDescriptor(ByteBuffer _bb, CodecBufferDescriptor obj) { _bb.order(ByteOrder.LITTLE_ENDIAN); return (obj.__assign(_bb.getInt(_bb.position()) + _bb.position(), _bb)); }
+  public void __init(int _i, ByteBuffer _bb) { bb_pos = _i; bb = _bb; vtable_start = bb_pos - bb.getInt(bb_pos); vtable_size = bb.getShort(vtable_start); }
+  public CodecBufferDescriptor __assign(int _i, ByteBuffer _bb) { __init(_i, _bb); return this; }
+
+  /**
+   * the compression codec used
+   */
+  public byte codec() { int o = __offset(4); return o != 0 ? bb.get(o + bb_pos) : 0; }
+  public boolean mutateCodec(byte codec) { int o = __offset(4); if (o != 0) { bb.put(o + bb_pos, codec); return true; } else { return false; } }
+  /**
+   * byte offset from the start of the enclosing compressed buffer
+   * where the compressed data begins
+   */
+  public long compressedOffset() { int o = __offset(6); return o != 0 ? bb.getLong(o + bb_pos) : 0L; }
+  public boolean mutateCompressedOffset(long compressed_offset) { int o = __offset(6); if (o != 0) { bb.putLong(o + bb_pos, compressed_offset); return true; } else { return false; } }
+  /**
+   * size of the compressed data in bytes
+   */
+  public long compressedSize() { int o = __offset(8); return o != 0 ? bb.getLong(o + bb_pos) : 0L; }
+  public boolean mutateCompressedSize(long compressed_size) { int o = __offset(8); if (o != 0) { bb.putLong(o + bb_pos, compressed_size); return true; } else { return false; } }
+  /**
+   * byte offset from the start of the enclosing uncompressed buffer
+   * where the uncompressed data should be written
+   */
+  public long uncompressedOffset() { int o = __offset(10); return o != 0 ? bb.getLong(o + bb_pos) : 0L; }
+  public boolean mutateUncompressedOffset(long uncompressed_offset) { int o = __offset(10); if (o != 0) { bb.putLong(o + bb_pos, uncompressed_offset); return true; } else { return false; } }
+  /**
+   * size of the uncompressed data in bytes
+   */
+  public long uncompressedSize() { int o = __offset(12); return o != 0 ? bb.getLong(o + bb_pos) : 0L; }
+  public boolean mutateUncompressedSize(long uncompressed_size) { int o = __offset(12); if (o != 0) { bb.putLong(o + bb_pos, uncompressed_size); return true; } else { return false; } }
+
+  public static int createCodecBufferDescriptor(FlatBufferBuilder builder,
+      byte codec,
+      long compressed_offset,
+      long compressed_size,
+      long uncompressed_offset,
+      long uncompressed_size) {
+    builder.startObject(5);
+    CodecBufferDescriptor.addUncompressedSize(builder, uncompressed_size);
+    CodecBufferDescriptor.addUncompressedOffset(builder, uncompressed_offset);
+    CodecBufferDescriptor.addCompressedSize(builder, compressed_size);
+    CodecBufferDescriptor.addCompressedOffset(builder, compressed_offset);
+    CodecBufferDescriptor.addCodec(builder, codec);
+    return CodecBufferDescriptor.endCodecBufferDescriptor(builder);
+  }
+
+  public static void startCodecBufferDescriptor(FlatBufferBuilder builder) { builder.startObject(5); }
+  public static void addCodec(FlatBufferBuilder builder, byte codec) { builder.addByte(0, codec, 0); }
+  public static void addCompressedOffset(FlatBufferBuilder builder, long compressedOffset) { builder.addLong(1, compressedOffset, 0L); }
+  public static void addCompressedSize(FlatBufferBuilder builder, long compressedSize) { builder.addLong(2, compressedSize, 0L); }
+  public static void addUncompressedOffset(FlatBufferBuilder builder, long uncompressedOffset) { builder.addLong(3, uncompressedOffset, 0L); }
+  public static void addUncompressedSize(FlatBufferBuilder builder, long uncompressedSize) { builder.addLong(4, uncompressedSize, 0L); }
+  public static int endCodecBufferDescriptor(FlatBufferBuilder builder) {
+    int o = builder.endObject();
+    return o;
+  }
+}

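One detail worth noting: each fixed-width accessor has a mutate* twin that patches the value in place, so metadata can be adjusted after encoding without rebuilding the message (mutate* returns false when the field was elided as a default). A quick Scala sketch of that round trip:

import com.google.flatbuffers.FlatBufferBuilder
import com.nvidia.spark.rapids.format.{CodecBufferDescriptor, CodecType}

object MutateSketch {
  def main(args: Array[String]): Unit = {
    val fbb = new FlatBufferBuilder(64)
    val off = CodecBufferDescriptor.createCodecBufferDescriptor(fbb, CodecType.COPY, 0L, 16L, 0L, 16L)
    fbb.finish(off)
    val d = CodecBufferDescriptor.getRootAsCodecBufferDescriptor(fbb.dataBuffer())
    assert(d.mutateUncompressedSize(32L)) // field was written as 16, so it is present and patched
    assert(d.uncompressedSize() == 32L)
  }
}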
CodecType.java
@@ -4,13 +4,17 @@

 public final class CodecType {
   private CodecType() { }
+  /**
+   * data simply copied, codec is only for testing
+   */
+  public static final byte COPY = -1;
   /**
    * no compression codec was used on the data
    */
   public static final byte UNCOMPRESSED = 0;

-  public static final String[] names = { "UNCOMPRESSED", };
+  public static final String[] names = { "COPY", "UNCOMPRESSED", };

-  public static String name(int e) { return names[e]; }
+  public static String name(int e) { return names[e - COPY]; }
 }

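Because COPY is -1, the generated name() shifts its index by the smallest enum value so the dense names array still starts at zero. A quick Scala check of the lookup:

// name(e) reads names[e - COPY], i.e. names[e + 1]
assert(CodecType.name(CodecType.COPY) == "COPY")                 // names[0]
assert(CodecType.name(CodecType.UNCOMPRESSED) == "UNCOMPRESSED") // names[1]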
125 changes: 94 additions & 31 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/MetaUtils.scala
@@ -45,23 +45,81 @@ object MetaUtils {
       buffer)
   }

   /**
    * Build a TableMeta message
    * @param tableId the ID to use for this table
    * @param columns the columns in the table
    * @param numRows the number of rows in the table
    * @param buffer the contiguous buffer backing the columns in the table
    * @return heap-based flatbuffer message
    */
   def buildTableMeta(
       tableId: Int,
       columns: Seq[ColumnVector],
       numRows: Long,
       buffer: DeviceMemoryBuffer): TableMeta = {
     val fbb = new FlatBufferBuilder(1024)
-    val baseAddress = buffer.getAddress
     val bufferSize = buffer.getLength
-    val bufferMetaOffset = BufferMeta.createBufferMeta(
+    BufferMeta.startBufferMeta(fbb)
+    BufferMeta.addId(fbb, tableId)
+    BufferMeta.addSize(fbb, bufferSize)
+    BufferMeta.addUncompressedSize(fbb, bufferSize)
+    val bufferMetaOffset = BufferMeta.endBufferMeta(fbb)
+    buildTableMeta(fbb, bufferMetaOffset, columns, numRows, buffer.getAddress)
+  }
+
+  /**
+   * Build a TableMeta message from a Table that originated in contiguous memory that has
+   * since been compressed.
+   * @param tableId ID to use for this table
+   * @param table table whose metadata will be encoded in the message
+   * @param uncompressedBuffer uncompressed buffer that backs the Table
+   * @param codecId identifier of the codec being used, see CodecType
+   * @param compressedSize size of the compressed data from the uncompressed buffer
+   * @return heap-based flatbuffer message
+   */
+  def buildTableMeta(
+      tableId: Int,
+      table: Table,
+      uncompressedBuffer: DeviceMemoryBuffer,
+      codecId: Byte,
+      compressedSize: Long): TableMeta = {
+    val fbb = new FlatBufferBuilder(1024)
+    val codecDescrOffset = CodecBufferDescriptor.createCodecBufferDescriptor(
       fbb,
-      tableId,
-      bufferSize,
-      bufferSize,
-      CodecType.UNCOMPRESSED)
+      codecId,
+      0,
+      compressedSize,
+      0,
+      uncompressedBuffer.getLength)
+    val codecDescrArrayOffset =
+      BufferMeta.createCodecBufferDescrsVector(fbb, Array(codecDescrOffset))
+    BufferMeta.startBufferMeta(fbb)
+    BufferMeta.addId(fbb, tableId)
+    BufferMeta.addSize(fbb, compressedSize)
+    BufferMeta.addUncompressedSize(fbb, uncompressedBuffer.getLength)
+    BufferMeta.addCodecBufferDescrs(fbb, codecDescrArrayOffset)
+    val bufferMetaOffset = BufferMeta.endBufferMeta(fbb)
+    val columns = (0 until table.getNumberOfColumns).map(i => table.getColumn(i))
+    buildTableMeta(fbb, bufferMetaOffset, columns, table.getRowCount, uncompressedBuffer.getAddress)
+  }
+
+  /**
+   * Build a TableMeta message with a pre-built BufferMeta message
+   * @param fbb flatbuffer builder that has an already built BufferMeta message
+   * @param bufferMetaOffset offset where the BufferMeta message was built
+   * @param columns the columns in the table
+   * @param numRows the number of rows in the table
+   * @param baseAddress address of uncompressed contiguous buffer holding the table
+   * @return flatbuffer message
+   */
+  def buildTableMeta(
+      fbb: FlatBufferBuilder,
+      bufferMetaOffset: Int,
+      columns: Seq[ColumnVector],
+      numRows: Long,
+      baseAddress: Long): TableMeta = {
     val columnMetaOffsets = columns.map(col => addColumnMeta(fbb, baseAddress, col)).toArray

     val columnMetasOffset = TableMeta.createColumnMetasVector(fbb, columnMetaOffsets)
     TableMeta.startTableMeta(fbb)
     TableMeta.addBufferMeta(fbb, bufferMetaOffset)
@@ -187,6 +245,30 @@ object ShuffleMetadata extends Logging{

   val bbFactory = new DirectByteBufferFactory

+  private def copyBufferMeta(fbb: FlatBufferBuilder, buffMeta: BufferMeta): Int = {
+    val descrOffsets = (0 until buffMeta.codecBufferDescrsLength()).map { i =>
+      val descr = buffMeta.codecBufferDescrs(i)
+      CodecBufferDescriptor.createCodecBufferDescriptor(
+        fbb,
+        descr.codec,
+        descr.compressedOffset,
+        descr.compressedSize,
+        descr.uncompressedOffset,
+        descr.uncompressedSize)
+    }
+    val codecDescrArrayOffset = if (descrOffsets.nonEmpty) {
+      Some(BufferMeta.createCodecBufferDescrsVector(fbb, descrOffsets.toArray))
+    } else {
+      None
+    }
+    BufferMeta.startBufferMeta(fbb)
+    BufferMeta.addId(fbb, buffMeta.id)
+    BufferMeta.addSize(fbb, buffMeta.size)
+    BufferMeta.addUncompressedSize(fbb, buffMeta.uncompressedSize)
+    codecDescrArrayOffset.foreach(off => BufferMeta.addCodecBufferDescrs(fbb, off))
+    BufferMeta.endBufferMeta(fbb)
+  }

   /**
    * Given a sequence of `TableMeta`, re-lay the metas using the flat buffer builder in `fbb`.
    * @param fbb builder to use
@@ -196,10 +278,8 @@
   def copyTables(fbb: FlatBufferBuilder, tables: Seq[TableMeta]): Array[Int] = {
     tables.map { tableMeta =>
       val buffMeta = tableMeta.bufferMeta()
-
       val buffMetaOffset = if (buffMeta != null) {
-        Some(BufferMeta.createBufferMeta(fbb, buffMeta.id(), buffMeta.actualSize(),
-          buffMeta.compressedSize(), buffMeta.codec()))
+        Some(copyBufferMeta(fbb, buffMeta))
       } else {
         None
       }
@@ -240,12 +320,8 @@
       }

       TableMeta.startTableMeta(fbb)
-      if (buffMetaOffset.isDefined) {
-        TableMeta.addBufferMeta(fbb, buffMetaOffset.get)
-      }
-      if (columnMetaOffset.isDefined) {
-        TableMeta.addColumnMetas(fbb, columnMetaOffset.get)
-      }
+      buffMetaOffset.foreach(bmo => TableMeta.addBufferMeta(fbb, bmo))
+      columnMetaOffset.foreach(cmo => TableMeta.addColumnMetas(fbb, cmo))
       TableMeta.addRowCount(fbb, tableMeta.rowCount())
       TableMeta.endTableMeta(fbb)
     }.toArray
@@ -272,7 +348,7 @@
       blockIds : Seq[ShuffleBlockBatchId],
       maxResponseSize: Long) : ByteBuffer = {
     val fbb = new FlatBufferBuilder(1024, bbFactory)
-    val blockIdOffsets = blockIds.map { case blockId =>
+    val blockIdOffsets = blockIds.map { blockId =>
       BlockIdMeta.createBlockIdMeta(fbb,
         blockId.shuffleId,
         blockId.mapId,
@@ -313,8 +389,7 @@
   def buildBufferTransferResponse(bufferMetas: Seq[BufferMeta]): ByteBuffer = {
     val fbb = new FlatBufferBuilder(1024, bbFactory)
     val responses = bufferMetas.map { bm =>
-      val buffMetaOffset = BufferMeta.createBufferMeta(fbb, bm.id(), bm.actualSize(),
-        bm.compressedSize(), bm.codec())
+      val buffMetaOffset = copyBufferMeta(fbb, bm)
       BufferTransferResponse.createBufferTransferResponse(fbb, bm.id(), TransferState.STARTED,
         buffMetaOffset)
     }.toArray
@@ -365,24 +440,12 @@
         // TODO: Need to expose native ID in cudf
         if (DType.STRING == DType.fromNative(columnMeta.dtype())) {
           val offsetLenStr = columnMeta.offsets().length().toString
-          val validityLen = if (columnMeta.validity() == null) {
-            -1
-          } else {
-            columnMeta.validity().length()
-          }
           out.append(s"column: $i [rows=${columnMeta.rowCount}, " +
               s"data_len=${columnMeta.data().length()}, offset_len=${offsetLenStr}, " +
               s"validity_len=$validityLen, type=${DType.fromNative(columnMeta.dtype())}, " +
               s"null_count=${columnMeta.nullCount()}]\n")
         } else {
           val offsetLenStr = "NC"
-
-          val validityLen = if (columnMeta.validity() == null) {
-            -1
-          } else {
-            columnMeta.validity().length()
-          }
-
           out.append(s"column: $i [rows=${columnMeta.rowCount}, " +
               s"data_len=${columnMeta.data().length()}, offset_len=${offsetLenStr}, " +
               s"validity_len=$validityLen, type=${DType.fromNative(columnMeta.dtype())}, " +
