Update buffer store to return compressed batches directly, add compression NVTX ranges (#572)

* Update buffer store to return compressed batches directly, add compression NVTX ranges

Signed-off-by: Jason Lowe <jlowe@nvidia.com>

* Update parameter name for clarity

Signed-off-by: Jason Lowe <jlowe@nvidia.com>
jlowe authored Aug 19, 2020
1 parent a4e86f6 commit 123a9c9
Showing 7 changed files with 88 additions and 88 deletions.
@@ -395,8 +395,15 @@ public final int numNulls() {

public static final long getTotalDeviceMemoryUsed(ColumnarBatch batch) {
long sum = 0;
for (int i = 0; i < batch.numCols(); i++) {
sum += ((GpuColumnVector) batch.column(i)).getBase().getDeviceMemorySize();
if (batch.numCols() > 0) {
if (batch.column(0) instanceof GpuCompressedColumnVector) {
GpuCompressedColumnVector gccv = (GpuCompressedColumnVector) batch.column(0);
sum += gccv.getBuffer().getLength();
} else {
for (int i = 0; i < batch.numCols(); i++) {
sum += ((GpuColumnVector) batch.column(i)).getBase().getDeviceMemorySize();
}
}
}
return sum;
}
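A compressed batch packs every column into one shared device buffer, so summing per-column sizes would count that buffer once per column; the new branch counts it exactly once. Below is a minimal Scala sketch of the same accounting rule, written against the plugin types shown in the diff (a sketch, not the plugin source):

```scala
import com.nvidia.spark.rapids.{GpuColumnVector, GpuCompressedColumnVector}
import org.apache.spark.sql.vectorized.ColumnarBatch

// Mirrors the Java logic above: compressed batches report the single shared
// buffer's length; uncompressed batches sum each column's device memory.
def totalDeviceMemoryUsed(batch: ColumnarBatch): Long =
  if (batch.numCols() > 0 &&
      batch.column(0).isInstanceOf[GpuCompressedColumnVector]) {
    batch.column(0).asInstanceOf[GpuCompressedColumnVector].getBuffer.getLength
  } else {
    (0 until batch.numCols()).map { i =>
      batch.column(i).asInstanceOf[GpuColumnVector].getBase.getDeviceMemorySize
    }.sum
  }
```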
@@ -33,9 +33,19 @@ public final class GpuCompressedColumnVector extends GpuColumnVectorBase {
private final DeviceMemoryBuffer buffer;
private final TableMeta tableMeta;

/**
* Build a columnar batch from a compressed table.
* NOTE: The data remains compressed and cannot be accessed directly from the columnar batch.
*/
public static ColumnarBatch from(CompressedTable compressedTable) {
DeviceMemoryBuffer buffer = compressedTable.buffer();
TableMeta tableMeta = compressedTable.meta();
return from(compressedTable.buffer(), compressedTable.meta());
}

/**
* Build a columnar batch from a compressed data buffer and the specified table metadata.
* NOTE: The data remains compressed and cannot be accessed directly from the columnar batch.
*/
public static ColumnarBatch from(DeviceMemoryBuffer compressedBuffer, TableMeta tableMeta) {
long rows = tableMeta.rowCount();
if (rows != (int) rows) {
throw new IllegalStateException("Cannot support a batch larger than MAX INT rows");
@@ -49,7 +59,7 @@ public static ColumnarBatch from(CompressedTable compressedTable) {
tableMeta.columnMetas(columnMeta, i);
DType dtype = DType.fromNative(columnMeta.dtype());
DataType type = GpuColumnVector.getSparkType(dtype);
DeviceMemoryBuffer slicedBuffer = buffer.slice(0, buffer.getLength());
DeviceMemoryBuffer slicedBuffer = compressedBuffer.slice(0, compressedBuffer.getLength());
columns[i] = new GpuCompressedColumnVector(type, slicedBuffer, tableMeta);
}
} catch (Throwable t) {
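The new overload lets a caller that already holds a raw compressed device buffer and its TableMeta build a batch without first wrapping them in a CompressedTable; the original from(CompressedTable) now just delegates. A hedged usage sketch follows — wrapCompressed is a hypothetical caller, not plugin code. Note that from() slices the input buffer (per the slice call in the diff), so the caller still owns and must close buf:

```scala
import ai.rapids.cudf.DeviceMemoryBuffer
import com.nvidia.spark.rapids.GpuCompressedColumnVector
import com.nvidia.spark.rapids.format.TableMeta
import org.apache.spark.sql.vectorized.ColumnarBatch

// Hypothetical caller: wrap a compressed device buffer as a columnar batch.
// The columns remain compressed; consumers must decompress before reading.
def wrapCompressed(buf: DeviceMemoryBuffer, meta: TableMeta): ColumnarBatch =
  GpuCompressedColumnVector.from(buf, meta)
```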
@@ -480,7 +480,7 @@ class GpuCoalesceIterator(iter: Iterator[ColumnarBatch],
val buffer = cv.getBuffer.slice(0, cv.getBuffer.getLength)
decompressor.addBufferToDecompress(buffer, bufferMeta)
}
closeOnExcept(decompressor.finish()) { outputBuffers =>
withResource(decompressor.finish()) { outputBuffers =>
outputBuffers.zipWithIndex.foreach { case (outputBuffer, outputIndex) =>
val cv = compressedVecs(outputIndex)
val batchIndex = compressedBatchIndices(outputIndex)
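This one-line change swaps the cleanup contract on the decompressed output buffers: closeOnExcept closes its resource only if the block throws (on success, ownership passes onward), while withResource always closes it — correct here presumably because the batches built inside hold their own references to the data. A sketch of the two contracts, assuming the general single-resource shape of the plugin's Arm helpers:

```scala
// Assumed shapes of the Arm helpers (sketch, not the plugin source).
def withResource[T <: AutoCloseable, V](r: T)(block: T => V): V =
  try block(r) finally r.close() // always closes

def closeOnExcept[T <: AutoCloseable, V](r: T)(block: T => V): V =
  try block(r) catch {
    case t: Throwable =>
      r.close() // closes only on failure; success hands ownership to caller
      throw t
  }
```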
@@ -76,7 +76,9 @@ trait RapidsBuffer extends AutoCloseable {
* successfully acquired the buffer beforehand.
* @see [[addReference]]
* @note It is the responsibility of the caller to close the batch.
* @note This may be an expensive operation (e.g.: batch may need to be decompressed).
* @note If the buffer is compressed data, then the resulting batch will be built using
* `GpuCompressedColumnVector`, and it is the responsibility of the caller to deal
* with decompressing the data if necessary.
*/
def getColumnarBatch: ColumnarBatch

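Because getColumnarBatch no longer decompresses eagerly, callers that need readable columns must detect the compressed case themselves. A hedged caller sketch; materialize and decompressBatch are hypothetical names, not plugin APIs:

```scala
import com.nvidia.spark.rapids.GpuCompressedColumnVector
import org.apache.spark.sql.vectorized.ColumnarBatch

// Hypothetical stand-in for a codec-based decompression path.
def decompressBatch(batch: ColumnarBatch): ColumnarBatch = ???

// Hypothetical caller pattern for the contract documented above.
def materialize(batch: ColumnarBatch): ColumnarBatch =
  if (batch.numCols() > 0 &&
      batch.column(0).isInstanceOf[GpuCompressedColumnVector]) {
    decompressBatch(batch)
  } else {
    batch
  }
```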
@@ -21,8 +21,7 @@ import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong

import ai.rapids.cudf.{Cuda, DeviceMemoryBuffer, HostMemoryBuffer, NvtxColor, NvtxRange}
import com.nvidia.spark.rapids.StorageTier.StorageTier
import com.nvidia.spark.rapids.format.{BufferMeta, CodecBufferDescriptor, CodecType, TableMeta}
import com.nvidia.spark.rapids.format.TableMeta

import org.apache.spark.internal.Logging
import org.apache.spark.sql.vectorized.ColumnarBatch
@@ -267,30 +266,24 @@ abstract class RapidsBufferStore(
// allocated. Allocations can trigger synchronous spills which can
// deadlock if another thread holds the device store lock and is trying
// to spill to this store.
var deviceBuffer = DeviceMemoryBuffer.allocate(size)
try {
val buffer = getMemoryBuffer
try {
buffer match {
case h: HostMemoryBuffer =>
logDebug(s"copying from host $h to device $deviceBuffer")
deviceBuffer.copyFromHostBuffer(h)
case _ => throw new IllegalStateException(
"must override getColumnarBatch if not providing a host buffer")
}
} finally {
buffer.close()
}

if (meta.bufferMeta.codecBufferDescrsLength > 0) {
withResource(deviceBuffer) { compressedBuffer =>
deviceBuffer = uncompressBuffer(compressedBuffer, meta.bufferMeta)
}
withResource(DeviceMemoryBuffer.allocate(size)) { deviceBuffer =>
withResource(getMemoryBuffer) {
case h: HostMemoryBuffer =>
logDebug(s"copying from host $h to device $deviceBuffer")
deviceBuffer.copyFromHostBuffer(h)
case _ => throw new IllegalStateException(
"must override getColumnarBatch if not providing a host buffer")
}
columnarBatchFromDeviceBuffer(deviceBuffer)
}
}

MetaUtils.getBatchFromMeta(deviceBuffer, meta)
} finally {
deviceBuffer.close()
protected def columnarBatchFromDeviceBuffer(devBuffer: DeviceMemoryBuffer): ColumnarBatch = {
val bufferMeta = meta.bufferMeta()
if (bufferMeta == null || bufferMeta.codecBufferDescrsLength == 0) {
MetaUtils.getBatchFromMeta(devBuffer, meta)
} else {
GpuCompressedColumnVector.from(devBuffer, meta)
}
}

@@ -343,42 +336,6 @@
}
}

protected def uncompressBuffer(
compressedBuffer: DeviceMemoryBuffer,
meta: BufferMeta): DeviceMemoryBuffer = {
closeOnExcept(DeviceMemoryBuffer.allocate(meta.uncompressedSize)) { uncompressedBuffer =>
val cbd = new CodecBufferDescriptor
(0 until meta.codecBufferDescrsLength).foreach { i =>
meta.codecBufferDescrs(cbd, i)
if (cbd.codec == CodecType.UNCOMPRESSED) {
uncompressedBuffer.copyFromDeviceBufferAsync(
cbd.uncompressedOffset,
compressedBuffer,
cbd.compressedOffset,
cbd.compressedSize,
Cuda.DEFAULT_STREAM)
} else {
val startTime = System.nanoTime()
val codec = TableCompressionCodec.getCodec(cbd.codec)
codec.decompressBuffer(
uncompressedBuffer,
cbd.uncompressedOffset,
cbd.uncompressedSize,
compressedBuffer,
cbd.compressedOffset,
cbd.compressedSize)
val duration = System.nanoTime() - startTime
val compressedSize = cbd.compressedSize()
val uncompressedSize = cbd.uncompressedSize
logDebug(s"Decompressed buffer with ${codec.name} in ${duration / 1000} us," +
s"rate=${compressedSize.toFloat / duration} GB/s " +
s"from $compressedSize to $uncompressedSize")
}
}
uncompressedBuffer
}
}

override def toString: String = s"$name buffer size=$size"
}
}
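Net effect in this file: the eager uncompressBuffer path is deleted, and the new columnarBatchFromDeviceBuffer helper decides between a materialized batch and a compressed wrapper purely from the metadata. A minimal sketch of that dispatch test, using only the accessor names visible in the diff:

```scala
import com.nvidia.spark.rapids.format.TableMeta

// Sketch: metadata carrying codec buffer descriptors marks compressed data.
def isCompressed(meta: TableMeta): Boolean = {
  val bufferMeta = meta.bufferMeta()
  bufferMeta != null && bufferMeta.codecBufferDescrsLength() > 0
}
```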
@@ -140,12 +140,7 @@ class RapidsDeviceMemoryStore(
if (table.isDefined) {
GpuColumnVector.from(table.get) //REFCOUNT ++ of all columns
} else {
val uncompressedBuffer = uncompressBuffer(contigBuffer, meta.bufferMeta)
try {
MetaUtils.getBatchFromMeta(uncompressedBuffer, meta)
} finally {
uncompressedBuffer.close()
}
columnarBatchFromDeviceBuffer(contigBuffer)
}
}
}
@@ -18,10 +18,12 @@ package com.nvidia.spark.rapids

import scala.collection.mutable.ArrayBuffer

import ai.rapids.cudf.{ContiguousTable, DeviceMemoryBuffer}
import ai.rapids.cudf.{ContiguousTable, DeviceMemoryBuffer, NvtxColor, NvtxRange}
import com.nvidia.spark.rapids.RapidsPluginImplicits._
import com.nvidia.spark.rapids.format.{BufferMeta, CodecType, TableMeta}

import org.apache.spark.internal.Logging

/**
* Compressed table descriptor
* @note the buffer may be significantly oversized for the amount of compressed data
@@ -122,7 +124,8 @@ object TableCompressionCodec {
* temporary and output memory above this limit is allowed but will
* be compressed individually.
*/
abstract class BatchedTableCompressor(maxBatchMemorySize: Long) extends AutoCloseable with Arm {
abstract class BatchedTableCompressor(maxBatchMemorySize: Long) extends AutoCloseable with Arm
with Logging {
// The tables that need to be compressed in the next batch
private[this] val tables = new ArrayBuffer[ContiguousTable]

@@ -220,23 +223,37 @@ abstract class BatchedTableCompressor(maxBatchMemorySize: Long) extends AutoClos
private def compressBatch(): Unit = if (tables.nonEmpty) {
require(oversizedOutBuffers.length == tables.length)
require(tempBuffers.length == tables.length)
val metas = compress(oversizedOutBuffers.toArray, tables.toArray, tempBuffers.toArray)
val startTime = System.nanoTime()
val metas = withResource(new NvtxRange("batch compress", NvtxColor.ORANGE)) { _ =>
compress(oversizedOutBuffers.toArray, tables.toArray, tempBuffers.toArray)
}
require(metas.length == tables.length)

val inputSize = tables.map(_.getBuffer.getLength).sum
var outputSize: Long = 0

// copy the output data into correctly-sized buffers
metas.zipWithIndex.foreach { case (meta, i) =>
val oversizedBuffer = oversizedOutBuffers(i)
val compressedSize = meta.bufferMeta.size
val buffer = if (oversizedBuffer.getLength > compressedSize) {
oversizedBuffer.sliceWithCopy(0, compressedSize)
} else {
// use this buffer as-is, don't close it at the end of this method
oversizedOutBuffers(i) = null
oversizedBuffer
withResource(new NvtxRange("copy compressed buffers", NvtxColor.PURPLE)) { _ =>
metas.zipWithIndex.foreach { case (meta, i) =>
val oversizedBuffer = oversizedOutBuffers(i)
val compressedSize = meta.bufferMeta.size
outputSize += compressedSize
val buffer = if (oversizedBuffer.getLength > compressedSize) {
oversizedBuffer.sliceWithCopy(0, compressedSize)
} else {
// use this buffer as-is, don't close it at the end of this method
oversizedOutBuffers(i) = null
oversizedBuffer
}
results += CompressedTable(compressedSize, meta, buffer)
}
results += CompressedTable(compressedSize, meta, buffer)
}

val duration = (System.nanoTime() - startTime).toFloat
logDebug(s"Compressed ${tables.length} tables from $inputSize to $outputSize " +
s"in ${duration / 1000000} msec rate=${inputSize / duration} GB/s " +
s"ratio=${outputSize.toFloat/inputSize}")

// free all the inputs to this batch
tables.safeClose()
tables.clear()
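The instrumentation added here pairs an NVTX range, visible on an Nsight Systems timeline, with a throughput log; the rate expression works because bytes divided by nanoseconds equals gigabytes per second (giga and nano are both factors of 1e9). A standalone sketch of that pattern — timedRange is a hypothetical helper, not plugin code:

```scala
import ai.rapids.cudf.{NvtxColor, NvtxRange}

// Hypothetical helper: run `work` inside an NVTX range, then log duration
// and throughput. bytes / ns == GB/s since giga and nano are both 1e9.
def timedRange[T](name: String, bytes: Long)(work: => T): T = {
  val start = System.nanoTime()
  val range = new NvtxRange(name, NvtxColor.ORANGE)
  val result =
    try work
    finally range.close()
  val ns = (System.nanoTime() - start).toFloat
  println(f"$name: ${ns / 1e6}%.2f ms, rate=${bytes / ns}%.3f GB/s")
  result
}
```

In this shape, something like timedRange("batch compress", inputSize) { compress(...) } would reproduce the compress-side instrumentation above.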
@@ -277,7 +294,8 @@ abstract class BatchedTableCompressor(maxBatchMemorySize: Long) extends AutoClos
* temporary and output memory above this limit is allowed but will
* be compressed individually.
*/
abstract class BatchedBufferDecompressor(maxBatchMemorySize: Long) extends AutoCloseable with Arm {
abstract class BatchedBufferDecompressor(maxBatchMemorySize: Long) extends AutoCloseable with Arm
with Logging {
// The buffers of compressed data that will be decompressed in the next batch
private[this] val inputBuffers = new ArrayBuffer[DeviceMemoryBuffer]

@@ -343,16 +361,27 @@ abstract class BatchedBufferDecompressor(maxBatchMemorySize: Long) extends AutoC
inputBuffers.safeClose()
tempBuffers.safeClose()
outputBuffers.safeClose()
results.safeClose()
}

protected def decompressBatch(): Unit = {
if (inputBuffers.nonEmpty) {
require(outputBuffers.length == inputBuffers.length)
require(tempBuffers.length == inputBuffers.length)
decompress(outputBuffers.toArray, inputBuffers.toArray, tempBuffers.toArray)
val startTime = System.nanoTime()
withResource(new NvtxRange("batch decompress", NvtxColor.ORANGE)) { _ =>
decompress(outputBuffers.toArray, inputBuffers.toArray, tempBuffers.toArray)
}
val duration = (System.nanoTime - startTime).toFloat
val inputSize = inputBuffers.map(_.getLength).sum
val outputSize = outputBuffers.map(_.getLength).sum

results ++= outputBuffers
outputBuffers.clear()

logDebug(s"Decompressed ${inputBuffers.length} buffers from $inputSize " +
s"to $outputSize in ${duration / 1000000} msec rate=${outputSize / duration} GB/s")

// free all the inputs to this batch
inputBuffers.safeClose()
inputBuffers.clear()
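Combined with the GpuCoalesceIterator change earlier, driving the batched decompressor now looks roughly like the sketch below, which assumes only the API names visible in the diff (addBufferToDecompress, finish). The results.safeClose() added to close() above means any outputs never handed off via finish() are still released:

```scala
import ai.rapids.cudf.DeviceMemoryBuffer
import com.nvidia.spark.rapids.BatchedBufferDecompressor
import com.nvidia.spark.rapids.format.BufferMeta

// Sketch: queue compressed buffers, then take ownership of the outputs.
// finish() transfers the decompressed buffers to the caller to close.
def decompressAll(
    decompressor: BatchedBufferDecompressor,
    inputs: Seq[(DeviceMemoryBuffer, BufferMeta)]): Array[DeviceMemoryBuffer] = {
  inputs.foreach { case (buf, meta) =>
    decompressor.addBufferToDecompress(buf, meta)
  }
  decompressor.finish()
}
```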
