From 123a9c9d99e1016caaac3a7f79d18e01e09a5730 Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Wed, 19 Aug 2020 10:29:13 -0500 Subject: [PATCH] Update buffer store to return compressed batches directly, add compression NVTX ranges (#572) * Update buffer store to return compressed batches directly, add compression NVTX ranges Signed-off-by: Jason Lowe * Update parameter name for clarity Signed-off-by: Jason Lowe --- .../nvidia/spark/rapids/GpuColumnVector.java | 11 ++- .../rapids/GpuCompressedColumnVector.java | 16 +++- .../spark/rapids/GpuCoalesceBatches.scala | 2 +- .../nvidia/spark/rapids/RapidsBuffer.scala | 4 +- .../spark/rapids/RapidsBufferStore.scala | 77 ++++--------------- .../rapids/RapidsDeviceMemoryStore.scala | 7 +- .../spark/rapids/TableCompressionCodec.scala | 59 ++++++++++---- 7 files changed, 88 insertions(+), 88 deletions(-) diff --git a/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVector.java b/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVector.java index 0b27c65e395..fef3de23c3c 100644 --- a/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVector.java +++ b/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVector.java @@ -395,8 +395,15 @@ public final int numNulls() { public static final long getTotalDeviceMemoryUsed(ColumnarBatch batch) { long sum = 0; - for (int i = 0; i < batch.numCols(); i++) { - sum += ((GpuColumnVector) batch.column(i)).getBase().getDeviceMemorySize(); + if (batch.numCols() > 0) { + if (batch.column(0) instanceof GpuCompressedColumnVector) { + GpuCompressedColumnVector gccv = (GpuCompressedColumnVector) batch.column(0); + sum += gccv.getBuffer().getLength(); + } else { + for (int i = 0; i < batch.numCols(); i++) { + sum += ((GpuColumnVector) batch.column(i)).getBase().getDeviceMemorySize(); + } + } } return sum; } diff --git a/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuCompressedColumnVector.java b/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuCompressedColumnVector.java index ea7e16e9d56..044a709a279 100644 --- a/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuCompressedColumnVector.java +++ b/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuCompressedColumnVector.java @@ -33,9 +33,19 @@ public final class GpuCompressedColumnVector extends GpuColumnVectorBase { private final DeviceMemoryBuffer buffer; private final TableMeta tableMeta; + /** + * Build a columnar batch from a compressed table. + * NOTE: The data remains compressed and cannot be accessed directly from the columnar batch. + */ public static ColumnarBatch from(CompressedTable compressedTable) { - DeviceMemoryBuffer buffer = compressedTable.buffer(); - TableMeta tableMeta = compressedTable.meta(); + return from(compressedTable.buffer(), compressedTable.meta()); + } + + /** + * Build a columnar batch from a compressed data buffer and the specified table metadata. + * NOTE: The data remains compressed and cannot be accessed directly from the columnar batch.
+ */ + public static ColumnarBatch from(DeviceMemoryBuffer compressedBuffer, TableMeta tableMeta) { long rows = tableMeta.rowCount(); if (rows != (int) rows) { throw new IllegalStateException("Cannot support a batch larger that MAX INT rows"); @@ -49,7 +59,7 @@ public static ColumnarBatch from(CompressedTable compressedTable) { tableMeta.columnMetas(columnMeta, i); DType dtype = DType.fromNative(columnMeta.dtype()); DataType type = GpuColumnVector.getSparkType(dtype); - DeviceMemoryBuffer slicedBuffer = buffer.slice(0, buffer.getLength()); + DeviceMemoryBuffer slicedBuffer = compressedBuffer.slice(0, compressedBuffer.getLength()); columns[i] = new GpuCompressedColumnVector(type, slicedBuffer, tableMeta); } } catch (Throwable t) { diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCoalesceBatches.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCoalesceBatches.scala index 812c3ece914..52735e2d017 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCoalesceBatches.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCoalesceBatches.scala @@ -480,7 +480,7 @@ class GpuCoalesceIterator(iter: Iterator[ColumnarBatch], val buffer = cv.getBuffer.slice(0, cv.getBuffer.getLength) decompressor.addBufferToDecompress(buffer, bufferMeta) } - closeOnExcept(decompressor.finish()) { outputBuffers => + withResource(decompressor.finish()) { outputBuffers => outputBuffers.zipWithIndex.foreach { case (outputBuffer, outputIndex) => val cv = compressedVecs(outputIndex) val batchIndex = compressedBatchIndices(outputIndex) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBuffer.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBuffer.scala index 097f12e2621..f69b8459444 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBuffer.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBuffer.scala @@ -76,7 +76,9 @@ trait RapidsBuffer extends AutoCloseable { * successfully acquired the buffer beforehand. * @see [[addReference]] * @note It is the responsibility of the caller to close the batch. - * @note This may be an expensive operation (e.g.: batch may need to be decompressed). + * @note If the buffer is compressed data then the resulting batch will be built using + * `GpuCompressedColumnVector`, and it is the responsibility of the caller to deal + * with decompressing the data if necessary. */ def getColumnarBatch: ColumnarBatch diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferStore.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferStore.scala index ac44dbb2828..866c44ebbd7 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferStore.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferStore.scala @@ -21,8 +21,7 @@ import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.atomic.AtomicLong import ai.rapids.cudf.{Cuda, DeviceMemoryBuffer, HostMemoryBuffer, NvtxColor, NvtxRange} -import com.nvidia.spark.rapids.StorageTier.StorageTier -import com.nvidia.spark.rapids.format.{BufferMeta, CodecBufferDescriptor, CodecType, TableMeta} +import com.nvidia.spark.rapids.format.TableMeta import org.apache.spark.internal.Logging import org.apache.spark.sql.vectorized.ColumnarBatch @@ -267,30 +266,24 @@ abstract class RapidsBufferStore( // allocated. Allocations can trigger synchronous spills which can // deadlock if another thread holds the device store lock and is trying // to spill to this store. 
- var deviceBuffer = DeviceMemoryBuffer.allocate(size) - try { - val buffer = getMemoryBuffer - try { - buffer match { - case h: HostMemoryBuffer => - logDebug(s"copying from host $h to device $deviceBuffer") - deviceBuffer.copyFromHostBuffer(h) - case _ => throw new IllegalStateException( - "must override getColumnarBatch if not providing a host buffer") - } - } finally { - buffer.close() - } - - if (meta.bufferMeta.codecBufferDescrsLength > 0) { - withResource(deviceBuffer) { compressedBuffer => - deviceBuffer = uncompressBuffer(compressedBuffer, meta.bufferMeta) - } + withResource(DeviceMemoryBuffer.allocate(size)) { deviceBuffer => + withResource(getMemoryBuffer) { + case h: HostMemoryBuffer => + logDebug(s"copying from host $h to device $deviceBuffer") + deviceBuffer.copyFromHostBuffer(h) + case _ => throw new IllegalStateException( + "must override getColumnarBatch if not providing a host buffer") } + columnarBatchFromDeviceBuffer(deviceBuffer) + } + } - MetaUtils.getBatchFromMeta(deviceBuffer, meta) - } finally { - deviceBuffer.close() + protected def columnarBatchFromDeviceBuffer(devBuffer: DeviceMemoryBuffer): ColumnarBatch = { + val bufferMeta = meta.bufferMeta() + if (bufferMeta == null || bufferMeta.codecBufferDescrsLength == 0) { + MetaUtils.getBatchFromMeta(devBuffer, meta) + } else { + GpuCompressedColumnVector.from(devBuffer, meta) } } @@ -343,42 +336,6 @@ abstract class RapidsBufferStore( } } - protected def uncompressBuffer( - compressedBuffer: DeviceMemoryBuffer, - meta: BufferMeta): DeviceMemoryBuffer = { - closeOnExcept(DeviceMemoryBuffer.allocate(meta.uncompressedSize)) { uncompressedBuffer => - val cbd = new CodecBufferDescriptor - (0 until meta.codecBufferDescrsLength).foreach { i => - meta.codecBufferDescrs(cbd, i) - if (cbd.codec == CodecType.UNCOMPRESSED) { - uncompressedBuffer.copyFromDeviceBufferAsync( - cbd.uncompressedOffset, - compressedBuffer, - cbd.compressedOffset, - cbd.compressedSize, - Cuda.DEFAULT_STREAM) - } else { - val startTime = System.nanoTime() - val codec = TableCompressionCodec.getCodec(cbd.codec) - codec.decompressBuffer( - uncompressedBuffer, - cbd.uncompressedOffset, - cbd.uncompressedSize, - compressedBuffer, - cbd.compressedOffset, - cbd.compressedSize) - val duration = System.nanoTime() - startTime - val compressedSize = cbd.compressedSize() - val uncompressedSize = cbd.uncompressedSize - logDebug(s"Decompressed buffer with ${codec.name} in ${duration / 1000} us," + - s"rate=${compressedSize.toFloat / duration} GB/s " + - s"from $compressedSize to $uncompressedSize") - } - } - uncompressedBuffer - } - } - override def toString: String = s"$name buffer size=$size" } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsDeviceMemoryStore.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsDeviceMemoryStore.scala index 48845f6c5f3..d5fc8c943b4 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsDeviceMemoryStore.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsDeviceMemoryStore.scala @@ -140,12 +140,7 @@ class RapidsDeviceMemoryStore( if (table.isDefined) { GpuColumnVector.from(table.get) //REFCOUNT ++ of all columns } else { - val uncompressedBuffer = uncompressBuffer(contigBuffer, meta.bufferMeta) - try { - MetaUtils.getBatchFromMeta(uncompressedBuffer, meta) - } finally { - uncompressedBuffer.close() - } + columnarBatchFromDeviceBuffer(contigBuffer) } } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/TableCompressionCodec.scala 
b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/TableCompressionCodec.scala index c3215887f5e..67d07b2cc43 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/TableCompressionCodec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/TableCompressionCodec.scala @@ -18,10 +18,12 @@ package com.nvidia.spark.rapids import scala.collection.mutable.ArrayBuffer -import ai.rapids.cudf.{ContiguousTable, DeviceMemoryBuffer} +import ai.rapids.cudf.{ContiguousTable, DeviceMemoryBuffer, NvtxColor, NvtxRange} import com.nvidia.spark.rapids.RapidsPluginImplicits._ import com.nvidia.spark.rapids.format.{BufferMeta, CodecType, TableMeta} +import org.apache.spark.internal.Logging + /** * Compressed table descriptor * @note the buffer may be significantly oversized for the amount of compressed data */ @@ -122,7 +124,8 @@ object TableCompressionCodec { * temporary and output memory above this limit is allowed but will * be compressed individually. */ -abstract class BatchedTableCompressor(maxBatchMemorySize: Long) extends AutoCloseable with Arm { +abstract class BatchedTableCompressor(maxBatchMemorySize: Long) extends AutoCloseable with Arm + with Logging { // The tables that need to be compressed in the next batch private[this] val tables = new ArrayBuffer[ContiguousTable] @@ -220,23 +223,37 @@ abstract class BatchedTableCompressor(maxBatchMemorySize: Long) extends AutoClos private def compressBatch(): Unit = if (tables.nonEmpty) { require(oversizedOutBuffers.length == tables.length) require(tempBuffers.length == tables.length) - val metas = compress(oversizedOutBuffers.toArray, tables.toArray, tempBuffers.toArray) + val startTime = System.nanoTime() + val metas = withResource(new NvtxRange("batch compress", NvtxColor.ORANGE)) { _ => + compress(oversizedOutBuffers.toArray, tables.toArray, tempBuffers.toArray) + } require(metas.length == tables.length) + val inputSize = tables.map(_.getBuffer.getLength).sum + var outputSize: Long = 0 + // copy the output data into correctly-sized buffers - metas.zipWithIndex.foreach { case (meta, i) => - val oversizedBuffer = oversizedOutBuffers(i) - val compressedSize = meta.bufferMeta.size - val buffer = if (oversizedBuffer.getLength > compressedSize) { - oversizedBuffer.sliceWithCopy(0, compressedSize) - } else { - // use this buffer as-is, don't close it at the end of this method - oversizedOutBuffers(i) = null - oversizedBuffer + withResource(new NvtxRange("copy compressed buffers", NvtxColor.PURPLE)) { _ => + metas.zipWithIndex.foreach { case (meta, i) => + val oversizedBuffer = oversizedOutBuffers(i) + val compressedSize = meta.bufferMeta.size + outputSize += compressedSize + val buffer = if (oversizedBuffer.getLength > compressedSize) { + oversizedBuffer.sliceWithCopy(0, compressedSize) + } else { + // use this buffer as-is, don't close it at the end of this method + oversizedOutBuffers(i) = null + oversizedBuffer + } + results += CompressedTable(compressedSize, meta, buffer) } - results += CompressedTable(compressedSize, meta, buffer) } + val duration = (System.nanoTime() - startTime).toFloat + logDebug(s"Compressed ${tables.length} tables from $inputSize to $outputSize " + + s"in ${duration / 1000000} msec rate=${inputSize / duration} GB/s " + + s"ratio=${outputSize.toFloat/inputSize}") + // free all the inputs to this batch tables.safeClose() tables.clear() @@ -277,7 +294,8 @@ abstract class BatchedTableCompressor(maxBatchMemorySize: Long) extends AutoClos * temporary and output memory above this limit is allowed but will * be compressed
individually. */ -abstract class BatchedBufferDecompressor(maxBatchMemorySize: Long) extends AutoCloseable with Arm { +abstract class BatchedBufferDecompressor(maxBatchMemorySize: Long) extends AutoCloseable with Arm + with Logging { // The buffers of compressed data that will be decompressed in the next batch private[this] val inputBuffers = new ArrayBuffer[DeviceMemoryBuffer] @@ -343,16 +361,27 @@ abstract class BatchedBufferDecompressor(maxBatchMemorySize: Long) extends AutoC inputBuffers.safeClose() tempBuffers.safeClose() outputBuffers.safeClose() + results.safeClose() } protected def decompressBatch(): Unit = { if (inputBuffers.nonEmpty) { require(outputBuffers.length == inputBuffers.length) require(tempBuffers.length == inputBuffers.length) - decompress(outputBuffers.toArray, inputBuffers.toArray, tempBuffers.toArray) + val startTime = System.nanoTime() + withResource(new NvtxRange("batch decompress", NvtxColor.ORANGE)) { _ => + decompress(outputBuffers.toArray, inputBuffers.toArray, tempBuffers.toArray) + } + val duration = (System.nanoTime - startTime).toFloat + val inputSize = inputBuffers.map(_.getLength).sum + val outputSize = outputBuffers.map(_.getLength).sum + results ++= outputBuffers outputBuffers.clear() + logDebug(s"Decompressed ${inputBuffers.length} buffers from $inputSize " + + s"to $outputSize in ${duration / 1000000} msec rate=${outputSize / duration} GB/s") + // free all the inputs to this batch inputBuffers.safeClose() inputBuffers.clear()
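
Reviewer note, as an illustration rather than part of the patch: with this change `RapidsBuffer.getColumnarBatch` can hand back a batch backed by `GpuCompressedColumnVector`, so any consumer that needs real column data must decompress it itself, as the updated note in RapidsBuffer.scala states. The Scala sketch below shows that caller-side pattern, modeled on the GpuCoalesceIterator hunk above and the removed uncompressBuffer code. It assumes the code lives in the `com.nvidia.spark.rapids` package, that `GpuCompressedColumnVector` exposes a `getTableMeta` accessor, and that `TableCompressionCodec` offers a `createBatchedDecompressor(maxBatchMemorySize)` factory returning the `BatchedBufferDecompressor` shown in this patch; `DecompressSketch`, `decompressIfNeeded`, and `maxDecompressBatchMemory` are illustrative names, not plugin APIs.

import com.nvidia.spark.rapids.RapidsPluginImplicits._
import com.nvidia.spark.rapids.format.CodecBufferDescriptor
import org.apache.spark.sql.vectorized.ColumnarBatch

object DecompressSketch extends Arm {
  /**
   * Decompress a batch returned by getColumnarBatch if it holds compressed data.
   * NOTE: the caller still owns `batch` and must close it separately if it
   * replaces it with the decompressed result.
   */
  def decompressIfNeeded(batch: ColumnarBatch, maxDecompressBatchMemory: Long): ColumnarBatch = {
    if (batch.numCols() > 0 && batch.column(0).isInstanceOf[GpuCompressedColumnVector]) {
      val cv = batch.column(0).asInstanceOf[GpuCompressedColumnVector]
      val tableMeta = cv.getTableMeta // assumed accessor for the vector's TableMeta
      // All buffers in a compressed table share one codec; read it from the
      // first codec buffer descriptor, as the removed uncompressBuffer did.
      val cbd = new CodecBufferDescriptor
      tableMeta.bufferMeta.codecBufferDescrs(cbd, 0)
      val codec = TableCompressionCodec.getCodec(cbd.codec)
      withResource(codec.createBatchedDecompressor(maxDecompressBatchMemory)) { decompressor =>
        // slice so the decompressor owns its own reference to the input buffer
        val buffer = cv.getBuffer.slice(0, cv.getBuffer.getLength)
        decompressor.addBufferToDecompress(buffer, tableMeta.bufferMeta)
        withResource(decompressor.finish()) { outputBuffers =>
          // getBatchFromMeta takes its own references, so the output buffers
          // can be closed once the uncompressed batch has been built
          MetaUtils.getBatchFromMeta(outputBuffers.head, tableMeta)
        }
      }
    } else {
      batch // already uncompressed, return as-is
    }
  }
}

Slicing `cv.getBuffer` before handing it to the decompressor keeps reference counting consistent with the GpuCoalesceIterator change above, where each compressed vector's buffer is sliced so the decompressor can safely own and close its inputs.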