-
Notifications
You must be signed in to change notification settings - Fork 232
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Update BufferMeta to support multiple codec buffers per table #426
Merged
Merged
Changes from 1 commit
Commits
Show all changes
2 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
74 changes: 74 additions & 0 deletions
74
sql-plugin/src/main/java/com/nvidia/spark/rapids/format/CodecBufferDescriptor.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,74 @@ | ||
// automatically generated by the FlatBuffers compiler, do not modify | ||
|
||
package com.nvidia.spark.rapids.format; | ||
|
||
import java.nio.*; | ||
import java.lang.*; | ||
import java.util.*; | ||
import com.google.flatbuffers.*; | ||
|
||
@SuppressWarnings("unused") | ||
/** | ||
* Descriptor for a compressed buffer | ||
*/ | ||
public final class CodecBufferDescriptor extends Table { | ||
public static CodecBufferDescriptor getRootAsCodecBufferDescriptor(ByteBuffer _bb) { return getRootAsCodecBufferDescriptor(_bb, new CodecBufferDescriptor()); } | ||
public static CodecBufferDescriptor getRootAsCodecBufferDescriptor(ByteBuffer _bb, CodecBufferDescriptor obj) { _bb.order(ByteOrder.LITTLE_ENDIAN); return (obj.__assign(_bb.getInt(_bb.position()) + _bb.position(), _bb)); } | ||
public void __init(int _i, ByteBuffer _bb) { bb_pos = _i; bb = _bb; vtable_start = bb_pos - bb.getInt(bb_pos); vtable_size = bb.getShort(vtable_start); } | ||
public CodecBufferDescriptor __assign(int _i, ByteBuffer _bb) { __init(_i, _bb); return this; } | ||
|
||
/** | ||
* the compression codec used | ||
*/ | ||
public byte codec() { int o = __offset(4); return o != 0 ? bb.get(o + bb_pos) : 0; } | ||
public boolean mutateCodec(byte codec) { int o = __offset(4); if (o != 0) { bb.put(o + bb_pos, codec); return true; } else { return false; } } | ||
/** | ||
* byte offset from the start of the enclosing compressed buffer | ||
* where the compressed data begins | ||
*/ | ||
public long compressedOffset() { int o = __offset(6); return o != 0 ? bb.getLong(o + bb_pos) : 0L; } | ||
public boolean mutateCompressedOffset(long compressed_offset) { int o = __offset(6); if (o != 0) { bb.putLong(o + bb_pos, compressed_offset); return true; } else { return false; } } | ||
/** | ||
* size of the compressed data in bytes | ||
*/ | ||
public long compressedSize() { int o = __offset(8); return o != 0 ? bb.getLong(o + bb_pos) : 0L; } | ||
public boolean mutateCompressedSize(long compressed_size) { int o = __offset(8); if (o != 0) { bb.putLong(o + bb_pos, compressed_size); return true; } else { return false; } } | ||
/** | ||
* byte offset from the start of the enclosing uncompressed buffer | ||
* where the uncompressed data should be written | ||
*/ | ||
public long uncompressedOffset() { int o = __offset(10); return o != 0 ? bb.getLong(o + bb_pos) : 0L; } | ||
public boolean mutateUncompressedOffset(long uncompressed_offset) { int o = __offset(10); if (o != 0) { bb.putLong(o + bb_pos, uncompressed_offset); return true; } else { return false; } } | ||
/** | ||
* size of the uncompressed data in bytes | ||
*/ | ||
public long uncompressedSize() { int o = __offset(12); return o != 0 ? bb.getLong(o + bb_pos) : 0L; } | ||
public boolean mutateUncompressedSize(long uncompressed_size) { int o = __offset(12); if (o != 0) { bb.putLong(o + bb_pos, uncompressed_size); return true; } else { return false; } } | ||
|
||
public static int createCodecBufferDescriptor(FlatBufferBuilder builder, | ||
byte codec, | ||
long compressed_offset, | ||
long compressed_size, | ||
long uncompressed_offset, | ||
long uncompressed_size) { | ||
builder.startObject(5); | ||
CodecBufferDescriptor.addUncompressedSize(builder, uncompressed_size); | ||
CodecBufferDescriptor.addUncompressedOffset(builder, uncompressed_offset); | ||
CodecBufferDescriptor.addCompressedSize(builder, compressed_size); | ||
CodecBufferDescriptor.addCompressedOffset(builder, compressed_offset); | ||
CodecBufferDescriptor.addCodec(builder, codec); | ||
return CodecBufferDescriptor.endCodecBufferDescriptor(builder); | ||
} | ||
|
||
public static void startCodecBufferDescriptor(FlatBufferBuilder builder) { builder.startObject(5); } | ||
public static void addCodec(FlatBufferBuilder builder, byte codec) { builder.addByte(0, codec, 0); } | ||
public static void addCompressedOffset(FlatBufferBuilder builder, long compressedOffset) { builder.addLong(1, compressedOffset, 0L); } | ||
public static void addCompressedSize(FlatBufferBuilder builder, long compressedSize) { builder.addLong(2, compressedSize, 0L); } | ||
public static void addUncompressedOffset(FlatBufferBuilder builder, long uncompressedOffset) { builder.addLong(3, uncompressedOffset, 0L); } | ||
public static void addUncompressedSize(FlatBufferBuilder builder, long uncompressedSize) { builder.addLong(4, uncompressedSize, 0L); } | ||
public static int endCodecBufferDescriptor(FlatBufferBuilder builder) { | ||
int o = builder.endObject(); | ||
return o; | ||
} | ||
} | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just curious why the uncompressed size is 0. There are no docs anywhere that explain that uncompressed size is optional and in what ways it is optional.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See here and here.
Originally I was planning on using an uncompressedSize of 0 to indicate the data was not compressed with a codec, but I changed it to checking
codecBufferDescrsLength > 0
instead. If we want this to match the size when the buffer is uncompressed that's an easy change to make.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry I missed those. It should be fine.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems knowing the uncompressed size would be great so we can allocate a buffer with the right size? And also to track things like max bytes in flight.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
size
is always the size of the buffer, compressed or not. The shuffle transport will only ever need to look at that value, since it is never going to uncompress the data directly rather just transport it as-is.uncompressedSize
is only useful for those who have already detected the buffer is compressed and are interested in decoding it. If code doesn't want to know or care if the buffer is compressed,size
is all they should ever need.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the case of inflight limit, if compression is really great, the transport may want to use the uncompressed size to throttle, rather than the compressed size. Otherwise it makes the OOM cases harder to configure for (how do I tell the user to pick an in-flight size that fits all their data/jobs?)