Remove single-buffer compression codec APIs #5092

Merged (2 commits) on Mar 30, 2022. Diff below shows changes from 1 commit.
@@ -25,40 +25,6 @@ class CopyCompressionCodec extends TableCompressionCodec with Arm {
override val name: String = "COPY"
override val codecId: Byte = CodecType.COPY

override def compress(
tableId: Int,
contigTable: ContiguousTable,
stream: Cuda.Stream): CompressedTable = {
val buffer = contigTable.getBuffer
closeOnExcept(DeviceMemoryBuffer.allocate(buffer.getLength)) { outputBuffer =>
outputBuffer.copyFromDeviceBufferAsync(0, buffer, 0, buffer.getLength, stream)
val meta = MetaUtils.buildTableMeta(
Some(tableId),
contigTable,
codecId,
outputBuffer.getLength)
stream.sync()
CompressedTable(buffer.getLength, meta, outputBuffer)
}
}

override def decompressBufferAsync(
outputBuffer: DeviceMemoryBuffer,
outputOffset: Long,
outputLength: Long,
inputBuffer: DeviceMemoryBuffer,
inputOffset: Long,
inputLength: Long,
stream: Cuda.Stream): Unit = {
require(outputLength == inputLength)
outputBuffer.copyFromDeviceBufferAsync(
outputOffset,
inputBuffer,
inputOffset,
inputLength,
stream)
}

override def createBatchCompressor(
maxBatchMemorySize: Long,
stream: Cuda.Stream): BatchedTableCompressor =
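The removed CopyCompressionCodec.compress above boils down to an asynchronous device-to-device copy followed by a stream sync. A minimal standalone sketch of that pattern, using only the cudf calls that appear in the deleted code (the object and method names here are illustrative, and plain try/catch stands in for the plugin's closeOnExcept helper):

```scala
import ai.rapids.cudf.{Cuda, DeviceMemoryBuffer}

// Illustrative helper, not part of the plugin: copy `input` into a freshly allocated
// device buffer on `stream` and wait for the copy before returning, mirroring the body
// of the removed CopyCompressionCodec.compress (minus the TableMeta bookkeeping).
object CopyCodecSketch {
  def copyToNewBuffer(input: DeviceMemoryBuffer, stream: Cuda.Stream): DeviceMemoryBuffer = {
    val output = DeviceMemoryBuffer.allocate(input.getLength)
    try {
      output.copyFromDeviceBufferAsync(0, input, 0, input.getLength, stream)
      stream.sync() // the removed code also synced before returning the CompressedTable
      output
    } catch {
      case t: Throwable =>
        output.close() // release the allocation if scheduling the copy failed
        throw t
    }
  }
}
```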
@@ -27,39 +27,6 @@ class NvcompLZ4CompressionCodec(codecConfigs: TableCompressionCodecConfig)
override val name: String = "nvcomp-LZ4"
override val codecId: Byte = CodecType.NVCOMP_LZ4

override def compress(
tableId: Int,
contigTable: ContiguousTable,
stream: Cuda.Stream): CompressedTable = {
val tableBuffer = contigTable.getBuffer
val (compressedSize, oversizedBuffer) =
NvcompLZ4CompressionCodec.compress(tableBuffer, codecConfigs, stream)
closeOnExcept(oversizedBuffer) { oversizedBuffer =>
require(compressedSize <= oversizedBuffer.getLength, "compressed buffer overrun")
val tableMeta = MetaUtils.buildTableMeta(
Some(tableId),
contigTable,
CodecType.NVCOMP_LZ4,
compressedSize)
CompressedTable(compressedSize, tableMeta, oversizedBuffer)
}
}

override def decompressBufferAsync(
outputBuffer: DeviceMemoryBuffer,
outputOffset: Long,
outputLength: Long,
inputBuffer: DeviceMemoryBuffer,
inputOffset: Long,
inputLength: Long,
stream: Cuda.Stream): Unit = {
withResource(outputBuffer.slice(outputOffset, outputLength)) { outSlice =>
withResource(inputBuffer.slice(inputOffset, inputLength)) { inSlice =>
NvcompLZ4CompressionCodec.decompressAsync(outSlice, inSlice, stream)
}
}
}

override def createBatchCompressor(
maxBatchMemoryBytes: Long,
stream: Cuda.Stream): BatchedTableCompressor = {
@@ -85,7 +52,7 @@ object NvcompLZ4CompressionCodec extends Arm {
input: DeviceMemoryBuffer,
codecConfigs: TableCompressionCodecConfig,
stream: Cuda.Stream): (Long, DeviceMemoryBuffer) = {
-    val lz4Config = LZ4Compressor.configure(codecConfigs.lz4ChunkSize, input.getLength())
+    val lz4Config = LZ4Compressor.configure(codecConfigs.lz4ChunkSize, input.getLength)
withResource(DeviceMemoryBuffer.allocate(lz4Config.getTempBytes)) { tempBuffer =>
var compressedSize: Long = 0L
val outputSize = lz4Config.getMaxCompressedBytes
@@ -109,12 +76,12 @@ object NvcompLZ4CompressionCodec extends Arm {
inputBuffer: DeviceMemoryBuffer,
stream: Cuda.Stream): Unit = {
withResource(LZ4Decompressor.configure(inputBuffer, stream)) { decompressConf =>
-      val outputSize = decompressConf.getUncompressedBytes()
+      val outputSize = decompressConf.getUncompressedBytes
if (outputSize != outputBuffer.getLength) {
throw new IllegalStateException(
s"metadata uncompressed size is $outputSize, buffer size is ${outputBuffer.getLength}")
}
-      val tempSize = decompressConf.getTempBytes()
+      val tempSize = decompressConf.getTempBytes
withResource(DeviceMemoryBuffer.allocate(tempSize)) { tempBuffer =>
LZ4Decompressor.decompressAsync(inputBuffer, decompressConf, tempBuffer, outputBuffer,
stream)
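The object-level helper retained in this hunk, NvcompLZ4CompressionCodec.decompressAsync(outputBuffer, inputBuffer, stream), is what the deleted per-buffer wrapper delegated to. A hedged usage sketch; the buffer handling around the call is illustrative, and in the real code paths the uncompressed size comes from the table metadata:

```scala
import ai.rapids.cudf.{Cuda, DeviceMemoryBuffer}

// Illustrative only; assumes NvcompLZ4CompressionCodec (shown above) is in scope, since
// its package is not visible in this diff. Allocates an output buffer of the expected
// uncompressed size, schedules LZ4 decompression on `stream`, and waits for the device
// work before handing the buffer to the caller.
object Lz4DecompressSketch {
  def decompress(
      compressed: DeviceMemoryBuffer,
      uncompressedSize: Long, // taken from TableMeta in the real callers
      stream: Cuda.Stream): DeviceMemoryBuffer = {
    val output = DeviceMemoryBuffer.allocate(uncompressedSize)
    try {
      // Throws IllegalStateException if nvcomp's metadata disagrees with uncompressedSize.
      NvcompLZ4CompressionCodec.decompressAsync(output, compressed, stream)
      stream.sync()
      output
    } catch {
      case t: Throwable =>
        output.close()
        throw t
    }
  }
}
```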
@@ -45,40 +45,6 @@ trait TableCompressionCodec {
/** The ID used for this codec. See the definitions in `CodecType`. */
val codecId: Byte

/**
* Compress a contiguous table.
* @note The contiguous table is NOT closed by this operation and must be closed separately.
* @note The compressed buffer MAY NOT be ideally sized to the compressed data. It may be
* significantly larger than the size of the compressed data. Releasing this unused
* memory will require making a copy of the data to a buffer of the appropriate size.
* @param tableId ID to use for this table
* @param contigTable contiguous table to compress
* @param stream CUDA stream to use
* @return compressed table
*/
def compress(tableId: Int, contigTable: ContiguousTable, stream: Cuda.Stream): CompressedTable

/**
* Decompress the compressed data buffer from a table compression operation asynchronously
* using the specified stream.
* @note The compressed buffer is NOT closed by this method.
* @param outputBuffer buffer where uncompressed data will be written
* @param outputOffset offset in the uncompressed buffer to start writing data
* @param outputLength expected length of the uncompressed data in bytes
* @param inputBuffer buffer containing the compressed data
* @param inputOffset offset in the compressed buffer where compressed data starts
* @param inputLength length of the compressed data in bytes
* @param stream CUDA stream to use
*/
def decompressBufferAsync(
outputBuffer: DeviceMemoryBuffer,
outputOffset: Long,
outputLength: Long,
inputBuffer: DeviceMemoryBuffer,
inputOffset: Long,
inputLength: Long,
stream: Cuda.Stream): Unit

/**
* Create a batched compressor instance
* @param maxBatchMemorySize The upper limit in bytes of temporary and output memory usage at
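For context on what "single-buffer" means in the PR title: the deleted decompressBufferAsync contract let a caller decompress into a sub-range of a pre-allocated output buffer, as the removed scaladoc describes. A hedged sketch of such a caller, using only the deleted signature; the codec instance, helper name, and sizes are assumptions:

```scala
import ai.rapids.cudf.{Cuda, DeviceMemoryBuffer}

// Illustrative caller of the now-removed single-buffer API. Assumes a TableCompressionCodec
// implementation is available as `codec` (the trait's package is not visible in this diff)
// and that the expected uncompressed length is already known from the table metadata.
object SingleBufferDecompressSketch {
  def decompressIntoRegion(
      codec: TableCompressionCodec,
      compressed: DeviceMemoryBuffer,  // compressed bytes start at offset 0 here
      compressedLength: Long,
      target: DeviceMemoryBuffer,      // larger, caller-owned destination buffer
      targetOffset: Long,              // where in `target` the uncompressed data lands
      uncompressedLength: Long,
      stream: Cuda.Stream): Unit = {
    // The removed API only scheduled work on `stream` and closed neither buffer,
    // so the caller had to synchronize before reading `target`.
    codec.decompressBufferAsync(
      target, targetOffset, uncompressedLength,
      compressed, 0, compressedLength,
      stream)
    stream.sync()
  }
}
```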