From 6ecbabd885bd13f066f8fafb9839ed0e73ea7db1 Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Fri, 31 Jul 2020 14:03:49 -0500 Subject: [PATCH 1/7] Add framework for batch compression of shuffle partitions Signed-off-by: Jason Lowe --- .../nvidia/spark/rapids/GpuColumnVector.java | 2 +- .../rapids/GpuCompressedColumnVector.java | 169 ++++++++ .../spark/rapids/CopyCompressionCodec.scala | 104 +++++ .../spark/rapids/GpuCoalesceBatches.scala | 104 ++++- .../nvidia/spark/rapids/GpuPartitioning.scala | 42 +- .../spark/rapids/RapidsBufferStore.scala | 49 ++- .../com/nvidia/spark/rapids/RapidsConf.scala | 25 ++ .../rapids/RapidsDeviceMemoryStore.scala | 7 +- .../spark/rapids/TableCompressionCodec.scala | 367 ++++++++++++++++++ .../spark/sql/rapids/GpuShuffleEnv.scala | 10 + .../rapids/RapidsShuffleInternalManager.scala | 58 ++- .../rapids/GpuCoalesceBatchesSuite.scala | 160 +++++++- .../spark/rapids/GpuPartitioningSuite.scala | 60 ++- .../rapids/GpuSinglePartitioningSuite.scala | 3 +- .../spark/rapids/RapidsDiskStoreSuite.scala | 11 +- .../com/nvidia/spark/rapids/TestUtils.scala | 3 +- 16 files changed, 1095 insertions(+), 79 deletions(-) create mode 100644 sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuCompressedColumnVector.java create mode 100644 sql-plugin/src/main/scala/com/nvidia/spark/rapids/CopyCompressionCodec.scala create mode 100644 sql-plugin/src/main/scala/com/nvidia/spark/rapids/TableCompressionCodec.scala diff --git a/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVector.java b/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVector.java index c69c13527f3..d1e66b469a1 100644 --- a/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVector.java +++ b/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVector.java @@ -173,7 +173,7 @@ public static final DType getRapidsType(DataType type) { return result; } - protected static final DataType getSparkType(DType type) { + static final DataType getSparkType(DType type) { switch (type) { case BOOL8: return DataTypes.BooleanType; diff --git a/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuCompressedColumnVector.java b/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuCompressedColumnVector.java new file mode 100644 index 00000000000..1f92289111f --- /dev/null +++ b/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuCompressedColumnVector.java @@ -0,0 +1,169 @@ +/* + * Copyright (c) 2020, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package com.nvidia.spark.rapids;
+
+import ai.rapids.cudf.DType;
+import ai.rapids.cudf.DeviceMemoryBuffer;
+import com.nvidia.spark.rapids.format.ColumnMeta;
+import com.nvidia.spark.rapids.format.TableMeta;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.sql.vectorized.ColumnVector;
+import org.apache.spark.sql.vectorized.ColumnarArray;
+import org.apache.spark.sql.vectorized.ColumnarBatch;
+import org.apache.spark.sql.vectorized.ColumnarMap;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * A GPU column vector that has been compressed. The columnar data within cannot
+ * be accessed directly. This class primarily serves the role of tracking the
+ * compressed data and table metadata so it can be decompressed later.
+ */
+public final class GpuCompressedColumnVector extends ColumnVector {
+ private final DeviceMemoryBuffer buffer;
+ private final TableMeta tableMeta;
+
+ public static ColumnarBatch from(CompressedTable compressedTable) {
+ DeviceMemoryBuffer buffer = compressedTable.buffer();
+ TableMeta tableMeta = compressedTable.meta();
+ long rows = tableMeta.rowCount();
+ if (rows != (int) rows) {
+ throw new IllegalStateException("Cannot support a batch larger than MAX INT rows");
+ }
+
+ ColumnMeta columnMeta = new ColumnMeta();
+ int numColumns = tableMeta.columnMetasLength();
+ ColumnVector[] columns = new ColumnVector[numColumns];
+ try {
+ for (int i = 0; i < numColumns; ++i) {
+ tableMeta.columnMetas(columnMeta, i);
+ DType dtype = DType.fromNative(columnMeta.dtype());
+ DataType type = GpuColumnVector.getSparkType(dtype);
+ DeviceMemoryBuffer slicedBuffer = buffer.slice(0, buffer.getLength());
+ columns[i] = new GpuCompressedColumnVector(type, slicedBuffer, tableMeta);
+ }
+ } catch (Throwable t) {
+ for (int i = 0; i < numColumns; ++i) {
+ if (columns[i] != null) {
+ columns[i].close();
+ }
+ }
+ throw t;
+ }
+
+ return new ColumnarBatch(columns, (int) rows);
+ }
+
+ private GpuCompressedColumnVector(DataType type, DeviceMemoryBuffer buffer, TableMeta tableMeta) {
+ super(type);
+ this.buffer = buffer;
+ this.tableMeta = tableMeta;
+ }
+
+ public DeviceMemoryBuffer getBuffer() {
+ return buffer;
+ }
+
+ public TableMeta getTableMeta() {
+ return tableMeta;
+ }
+
+ @Override
+ public void close() {
+ buffer.close();
+ }
+
+ @Override
+ public boolean hasNull() {
+ throw new IllegalStateException("column vector is compressed");
+ }
+
+ @Override
+ public int numNulls() {
+ throw new IllegalStateException("column vector is compressed");
+ }
+
+ @Override
+ public boolean isNullAt(int rowId) {
+ throw new IllegalStateException("column vector is compressed");
+ }
+
+ @Override
+ public boolean getBoolean(int rowId) {
+ throw new IllegalStateException("column vector is compressed");
+ }
+
+ @Override
+ public byte getByte(int rowId) {
+ throw new IllegalStateException("column vector is compressed");
+ }
+
+ @Override
+ public short getShort(int rowId) {
+ throw new IllegalStateException("column vector is compressed");
+ }
+
+ @Override
+ public int getInt(int rowId) {
+ throw new IllegalStateException("column vector is compressed");
+ }
+
+ @Override
+ public long getLong(int rowId) {
+ throw new IllegalStateException("column vector is compressed");
+ }
+
+ @Override
+ public float getFloat(int rowId) {
+ throw new IllegalStateException("column vector is compressed");
+ }
+
+ @Override
+ public double getDouble(int rowId) {
+ throw new IllegalStateException("column vector is compressed");
+ }
+
+ 
@Override + public ColumnarArray getArray(int rowId) { + throw new IllegalStateException("column vector is compressed"); + } + + @Override + public ColumnarMap getMap(int ordinal) { + throw new IllegalStateException("column vector is compressed"); + } + + @Override + public Decimal getDecimal(int rowId, int precision, int scale) { + throw new IllegalStateException("column vector is compressed"); + } + + @Override + public UTF8String getUTF8String(int rowId) { + throw new IllegalStateException("column vector is compressed"); + } + + @Override + public byte[] getBinary(int rowId) { + throw new IllegalStateException("column vector is compressed"); + } + + @Override + public ColumnVector getChild(int ordinal) { + throw new IllegalStateException("column vector is compressed"); + } +} diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/CopyCompressionCodec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/CopyCompressionCodec.scala new file mode 100644 index 00000000000..c93b1406611 --- /dev/null +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/CopyCompressionCodec.scala @@ -0,0 +1,104 @@ +/* + * Copyright (c) 2020, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids + +import ai.rapids.cudf.{ContiguousTable, Cuda, DeviceMemoryBuffer} +import com.nvidia.spark.rapids.format.{CodecType, TableMeta} + +/** A table compression codec used only for testing that copies the data. 
*/ +class CopyCompressionCodec extends TableCompressionCodec with Arm { + override val name: String = "COPY" + override val codecType: Byte = CodecType.COPY + + override def compress( + tableId: Int, + contigTable: ContiguousTable): CompressedTable = { + val buffer = contigTable.getBuffer + closeOnExcept(buffer.sliceWithCopy(0, buffer.getLength)) { outputBuffer => + val meta = MetaUtils.buildTableMeta( + tableId, + contigTable.getTable, + buffer, + codecType, + outputBuffer.getLength) + CompressedTable(buffer.getLength, meta, outputBuffer) + } + } + + override def decompressBuffer( + outputBuffer: DeviceMemoryBuffer, + outputOffset: Long, + outputLength: Long, + inputBuffer: DeviceMemoryBuffer, + inputOffset: Long, + inputLength: Long): Unit = { + require(outputLength == inputLength) + outputBuffer.copyFromDeviceBufferAsync( + outputOffset, + inputBuffer, + inputOffset, + inputLength, + Cuda.DEFAULT_STREAM) + } + + override def createBatchCompressor(maxBatchMemorySize: Long): BatchedTableCompressor = + new BatchedCopyCompressor(maxBatchMemorySize) + + override def createBatchDecompressor(maxBatchMemorySize: Long): BatchedBufferDecompressor = + new BatchedCopyDecompressor(maxBatchMemorySize) +} + +class BatchedCopyCompressor(maxBatchMemory: Long) extends BatchedTableCompressor(maxBatchMemory) { + override protected def getTempSpaceNeeded(buffer: DeviceMemoryBuffer): Long = 0 + + override protected def getOutputSpaceNeeded( + dataBuffer: DeviceMemoryBuffer, + tempBuffer: DeviceMemoryBuffer): Long = dataBuffer.getLength + + override protected def compress( + outputBuffers: Array[DeviceMemoryBuffer], + tables: Array[ContiguousTable], + tempBuffers: Array[DeviceMemoryBuffer]): Array[TableMeta] = { + outputBuffers.zip(tables).map { case (outBuffer, ct) => + val inBuffer = ct.getBuffer + outBuffer.copyFromDeviceBufferAsync(0, inBuffer, 0, inBuffer.getLength, Cuda.DEFAULT_STREAM) + MetaUtils.buildTableMeta( + 0, + ct.getTable, + inBuffer, + CodecType.COPY, + outBuffer.getLength) + } + } +} + +class BatchedCopyDecompressor(maxBatchMemory: Long) + extends BatchedBufferDecompressor(maxBatchMemory) { + override val codecId: Byte = CodecType.COPY + + override def decompressTempSpaceNeeded(inputBuffer: DeviceMemoryBuffer): Long = 0 + + override def decompress( + outputBuffers: Array[DeviceMemoryBuffer], + inputBuffers: Array[DeviceMemoryBuffer], + tempBuffers: Array[DeviceMemoryBuffer]): Unit = { + outputBuffers.zip(inputBuffers).foreach { case (outputBuffer, inputBuffer) => + outputBuffer.copyFromDeviceBufferAsync(0, inputBuffer, 0, + outputBuffer.getLength, Cuda.DEFAULT_STREAM) + } + } +} diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCoalesceBatches.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCoalesceBatches.scala index 22028e9321a..18f05ea5360 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCoalesceBatches.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCoalesceBatches.scala @@ -19,6 +19,7 @@ package com.nvidia.spark.rapids import scala.collection.mutable.ArrayBuffer import ai.rapids.cudf.{BufferType, NvtxColor, Table} +import com.nvidia.spark.rapids.format.{ColumnMeta, SubBufferMeta, TableMeta} import org.apache.spark.TaskContext import org.apache.spark.internal.Logging @@ -364,6 +365,7 @@ abstract class AbstractGpuCoalesceIterator(origIter: Iterator[ColumnarBatch], class GpuCoalesceIterator(iter: Iterator[ColumnarBatch], schema: StructType, goal: CoalesceGoal, + maxDecompressBatchMemory: Long, numInputRows: SQLMetric, 
numInputBatches: SQLMetric, numOutputRows: SQLMetric, @@ -384,21 +386,75 @@ class GpuCoalesceIterator(iter: Iterator[ColumnarBatch], concatTime, totalTime, peakDevMemory, - opName) { + opName) with Arm { private var batches: ArrayBuffer[ColumnarBatch] = ArrayBuffer.empty private var maxDeviceMemory: Long = 0 - override def initNewBatch(): Unit = - batches = ArrayBuffer[ColumnarBatch]() + // batch indices that are compressed batches + private[this] var compressedBatchIndices: ArrayBuffer[Int] = ArrayBuffer.empty - override def addBatchToConcat(batch: ColumnarBatch): Unit = + private[this] var codec: TableCompressionCodec = _ + + override def initNewBatch(): Unit = { + batches.clear() + compressedBatchIndices.clear() + } + + override def addBatchToConcat(batch: ColumnarBatch): Unit = { + if (isBatchCompressed(batch)) { + compressedBatchIndices += batches.size + } batches += batch + } + + private def isBatchCompressed(batch: ColumnarBatch): Boolean = { + if (batch.numCols == 0) { + false + } else { + batch.column(0) match { + case _: GpuCompressedColumnVector => true + case _ => false + } + } + } - override def getColumnSizes(cb: ColumnarBatch): Array[Long] = - GpuColumnVector.extractBases(cb).map(_.getDeviceMemorySize) + private def getUncompressedColumnSizes(tableMeta: TableMeta): Array[Long] = { + val numCols = tableMeta.columnMetasLength + val columnMeta = new ColumnMeta + val subBufferMetaObj = new SubBufferMeta + val sizes = new Array[Long](numCols) + (0 until numCols).foreach { i => + tableMeta.columnMetas(columnMeta, i) + var subBuffer = columnMeta.data(subBufferMetaObj) + if (subBuffer != null) { + sizes(i) += subBuffer.length + } + subBuffer = columnMeta.offsets(subBufferMetaObj) + if (subBuffer != null) { + sizes(i) += subBuffer.length + } + subBuffer = columnMeta.validity(subBufferMetaObj) + if (subBuffer != null) { + sizes(i) += subBuffer.length + } + } + sizes + } + + override def getColumnSizes(cb: ColumnarBatch): Array[Long] = { + if (!isBatchCompressed(cb)) { + GpuColumnVector.extractBases(cb).map(_.getDeviceMemorySize) + } else { + val compressedVector = cb.column(0).asInstanceOf[GpuCompressedColumnVector] + val tableMeta = compressedVector.getTableMeta + require(tableMeta.columnMetasLength == cb.numCols) + getUncompressedColumnSizes(tableMeta) + } + } override def concatAllAndPutOnGPU(): ColumnarBatch = { + decompressBatches() val tmp = batches.toArray // Clear the buffer so we don't close it again (buildNonEmptyBatch closed it for us). 
batches = ArrayBuffer.empty @@ -408,6 +464,35 @@ class GpuCoalesceIterator(iter: Iterator[ColumnarBatch], ret } + private def decompressBatches(): Unit = { + if (compressedBatchIndices.nonEmpty) { + val compressedVecs = compressedBatchIndices.map { batchIndex => + batches(batchIndex).column(0).asInstanceOf[GpuCompressedColumnVector] + } + if (codec == null) { + val descr = compressedVecs.head.getTableMeta.bufferMeta.codecBufferDescrs(0) + codec = TableCompressionCodec.getCodec(descr.codec) + } + withResource(codec.createBatchDecompressor(maxDecompressBatchMemory)) { decompressor => + compressedVecs.foreach { cv => + val bufferMeta = cv.getTableMeta.bufferMeta + // don't currently support switching codecs when partitioning + val buffer = cv.getBuffer.slice(0, cv.getBuffer.getLength) + decompressor.addBufferToDecode(buffer, bufferMeta) + } + closeOnExcept(decompressor.finish()) { outputBuffers => + outputBuffers.zipWithIndex.foreach { case (outputBuffer, outputIndex) => + val cv = compressedVecs(outputIndex) + val batchIndex = compressedBatchIndices(outputIndex) + val compressedBatch = batches(batchIndex) + batches(batchIndex) = MetaUtils.getBatchFromMeta(outputBuffer, cv.getTableMeta) + compressedBatch.close() + } + } + } + } + } + override def cleanupConcatIsDone(): Unit = { peakDevMemory.set(maxDeviceMemory) batches.foreach(_.close()) @@ -416,8 +501,11 @@ class GpuCoalesceIterator(iter: Iterator[ColumnarBatch], case class GpuCoalesceBatches(child: SparkPlan, goal: CoalesceGoal) extends UnaryExecNode with GpuExec { - import GpuMetricNames._ + private[this] val maxDecompressBatchMemory = + new RapidsConf(child.conf).shuffleCompressionMaxBatchMemory + + import GpuMetricNames._ override lazy val additionalMetrics: Map[String, SQLMetric] = Map( NUM_INPUT_ROWS -> SQLMetrics.createMetric(sparkContext, DESCRIPTION_NUM_INPUT_ROWS), NUM_INPUT_BATCHES -> SQLMetrics.createMetric(sparkContext, DESCRIPTION_NUM_INPUT_BATCHES), @@ -447,7 +535,7 @@ case class GpuCoalesceBatches(child: SparkPlan, goal: CoalesceGoal) val batches = child.executeColumnar() batches.mapPartitions { iter => if (child.schema.nonEmpty) { - new GpuCoalesceIterator(iter, schema, goal, + new GpuCoalesceIterator(iter, schema, goal, maxDecompressBatchMemory, numInputRows, numInputBatches, numOutputRows, numOutputBatches, collectTime, concatTime, totalTime, peakDevMemory, "GpuCoalesceBatches") } else { diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuPartitioning.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuPartitioning.scala index 43c1092a669..db6ae6b11c8 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuPartitioning.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuPartitioning.scala @@ -18,15 +18,18 @@ package com.nvidia.spark.rapids import scala.collection.mutable.ArrayBuffer -import ai.rapids.cudf.{ContiguousTable, NvtxColor, NvtxRange, Table} +import ai.rapids.cudf.{NvtxColor, NvtxRange, Table} import com.nvidia.spark.rapids.RapidsPluginImplicits._ import org.apache.spark.TaskContext import org.apache.spark.sql.catalyst.plans.physical.Partitioning +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.rapids.GpuShuffleEnv import org.apache.spark.sql.vectorized.ColumnarBatch -trait GpuPartitioning extends Partitioning { +trait GpuPartitioning extends Partitioning with Arm { + private[this] val maxCompressionBatchSize = + new RapidsConf(SQLConf.get).shuffleCompressionMaxBatchMemory def sliceBatch(vectors: Array[RapidsHostColumnVector], start: Int, 
end: Int): ColumnarBatch = { var ret: ColumnarBatch = null @@ -43,24 +46,25 @@ trait GpuPartitioning extends Partitioning { // The first index will always be 0, so we need to skip it. val batches = if (numRows > 0) { val parts = partitionIndexes.slice(1, partitionIndexes.length) - val splits = new ArrayBuffer[ColumnarBatch](numPartitions) - val table = new Table(partitionColumns.map(_.getBase).toArray: _*) - val contiguousTables: Array[ContiguousTable] = try { - table.contiguousSplit(parts: _*) - } finally { - table.close() - } - var succeeded = false - try { - contiguousTables.foreach { ct => splits.append(GpuColumnVectorFromBuffer.from(ct)) } - succeeded = true - } finally { - contiguousTables.foreach(_.close()) - if (!succeeded) { - splits.foreach(_.close()) + closeOnExcept(new ArrayBuffer[ColumnarBatch](numPartitions)) { splits => + val table = new Table(partitionColumns.map(_.getBase).toArray: _*) + val contiguousTables = withResource(table)(t => t.contiguousSplit(parts: _*)) + GpuShuffleEnv.get.rapidsShuffleCodec match { + case Some(codec) => + withResource(codec.createBatchCompressor(maxCompressionBatchSize)) { compressor => + // batchCompress takes ownership of the contiguous tables and will close + compressor.addTables(contiguousTables) + closeOnExcept(compressor.finish()) { compressedTables => + compressedTables.foreach(ct => splits.append(GpuCompressedColumnVector.from(ct))) + } + } + case None => + withResource(contiguousTables) { cts => + cts.foreach { ct => splits.append(GpuColumnVectorFromBuffer.from(ct)) } + } } + splits.toArray } - splits.toArray } else { Array[ColumnarBatch]() } @@ -96,7 +100,7 @@ trait GpuPartitioning extends Partitioning { def sliceInternalGpuOrCpu(numRows: Int, partitionIndexes: Array[Int], partitionColumns: Array[GpuColumnVector]): Array[ColumnarBatch] = { - val rapidsShuffleEnabled = GpuShuffleEnv.isRapidsShuffleEnabled + val rapidsShuffleEnabled = GpuShuffleEnv.get.isRapidsShuffleEnabled val nvtxRangeKey = if (rapidsShuffleEnabled) { "sliceInternalOnGpu" } else { diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferStore.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferStore.scala index b78d680c01c..9cf5b5d9a26 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferStore.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferStore.scala @@ -22,7 +22,7 @@ import java.util.concurrent.atomic.AtomicLong import ai.rapids.cudf.{Cuda, DeviceMemoryBuffer, HostMemoryBuffer, NvtxColor, NvtxRange} import com.nvidia.spark.rapids.StorageTier.StorageTier -import com.nvidia.spark.rapids.format.TableMeta +import com.nvidia.spark.rapids.format.{BufferMeta, CodecBufferDescriptor, CodecType, TableMeta} import org.apache.spark.internal.Logging import org.apache.spark.sql.vectorized.ColumnarBatch @@ -310,7 +310,7 @@ abstract class RapidsBufferStore( override val id: RapidsBufferId, override val size: Long, override val meta: TableMeta, - initialSpillPriority: Long) extends RapidsBuffer { + initialSpillPriority: Long) extends RapidsBuffer with Arm { private[this] var isValid = true protected[this] var refcount = 0 private[this] var spillPriority: Long = initialSpillPriority @@ -352,7 +352,7 @@ abstract class RapidsBufferStore( // allocated. Allocations can trigger synchronous spills which can // deadlock if another thread holds the device store lock and is trying // to spill to this store. 
- val deviceBuffer = DeviceMemoryBuffer.allocate(size) + var deviceBuffer = DeviceMemoryBuffer.allocate(size) try { val buffer = getMemoryBuffer try { @@ -366,6 +366,13 @@ abstract class RapidsBufferStore( } finally { buffer.close() } + + if (meta.bufferMeta.codecBufferDescrsLength > 0) { + val compressedBuffer = deviceBuffer + deviceBuffer = uncompressBuffer(deviceBuffer, meta.bufferMeta) + compressedBuffer.close() + } + MetaUtils.getBatchFromMeta(deviceBuffer, meta) } finally { deviceBuffer.close() @@ -427,6 +434,42 @@ abstract class RapidsBufferStore( } } + protected def uncompressBuffer( + compressedBuffer: DeviceMemoryBuffer, + meta: BufferMeta): DeviceMemoryBuffer = { + closeOnExcept(DeviceMemoryBuffer.allocate(meta.uncompressedSize)) { uncompressedBuffer => + val cbd = new CodecBufferDescriptor + (0 until meta.codecBufferDescrsLength).foreach { i => + meta.codecBufferDescrs(cbd, i) + if (cbd.codec == CodecType.UNCOMPRESSED) { + uncompressedBuffer.copyFromDeviceBufferAsync( + cbd.uncompressedOffset, + compressedBuffer, + cbd.compressedOffset, + cbd.compressedSize, + Cuda.DEFAULT_STREAM) + } else { + val startTime = System.nanoTime() + val codec = TableCompressionCodec.getCodec(cbd.codec) + codec.decompressBuffer( + uncompressedBuffer, + cbd.uncompressedOffset, + cbd.uncompressedSize, + compressedBuffer, + cbd.compressedOffset, + cbd.compressedSize) + val duration = System.nanoTime() - startTime + val compressedSize = cbd.compressedSize() + val uncompressedSize = cbd.uncompressedSize + logWarning(s"Decompressed buffer with ${codec.name} in ${duration / 1000} us," + + s"rate=${compressedSize.toFloat / duration} GB/s " + + s"from $compressedSize to $uncompressedSize") + } + } + uncompressedBuffer + } + } + override def toString: String = s"$name buffer size=$size" } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala index ac4b35eb826..c89a612eaa0 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala @@ -604,8 +604,27 @@ object RapidsConf { .bytesConf(ByteUnit.BYTE) .createWithDefault(50 * 1024) + val SHUFFLE_COMPRESSION_ENABLED = conf("spark.rapids.shuffle.compression.enabled") + .doc("Whether to enable compression of shuffle buffers") + .internal() + .booleanConf + .createWithDefault(false) + + val SHUFFLE_COMPRESSION_CODEC = conf("spark.rapids.shuffle.compression.codec") + .doc("The GPU codec used to compress shuffle data when using RAPIDS shuffle. " + + "Currently only one codec is supported, copy.") + .internal() + .stringConf + .createWithDefault("copy") + // USER FACING DEBUG CONFIGS + val SHUFFLE_COMPRESSION_MAX_BATCH_MEMORY = + conf("spark.rapids.shuffle.compression.maxBatchMemory") + .internal() + .bytesConf(ByteUnit.BYTE) + .createWithDefault(1024 * 1024 * 1024) + val EXPLAIN = conf("spark.rapids.sql.explain") .doc("Explain why some parts of a query were not placed on a GPU or not. 
Possible " + "values are ALL: print everything, NONE: print nothing, NOT_ON_GPU: print only parts of " + @@ -866,6 +885,12 @@ class RapidsConf(conf: Map[String, String]) extends Logging { lazy val shuffleMaxMetadataSize: Long = get(SHUFFLE_MAX_METADATA_SIZE) + lazy val shuffleCompressionEnabled: Boolean = get(SHUFFLE_COMPRESSION_ENABLED) + + lazy val shuffleCompressionCodec: String = get(SHUFFLE_COMPRESSION_CODEC) + + lazy val shuffleCompressionMaxBatchMemory: Long = get(SHUFFLE_COMPRESSION_MAX_BATCH_MEMORY) + def isOperatorEnabled(key: String, incompat: Boolean, isDisabledByDefault: Boolean): Boolean = { val default = !(isDisabledByDefault || incompat) || (incompat && isIncompatEnabled) conf.get(key).map(toBoolean(_, key)).getOrElse(default) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsDeviceMemoryStore.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsDeviceMemoryStore.scala index 931a76a9f95..48845f6c5f3 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsDeviceMemoryStore.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsDeviceMemoryStore.scala @@ -140,7 +140,12 @@ class RapidsDeviceMemoryStore( if (table.isDefined) { GpuColumnVector.from(table.get) //REFCOUNT ++ of all columns } else { - throw new UnsupportedOperationException("compressed buffer support not implemented") + val uncompressedBuffer = uncompressBuffer(contigBuffer, meta.bufferMeta) + try { + MetaUtils.getBatchFromMeta(uncompressedBuffer, meta) + } finally { + uncompressedBuffer.close() + } } } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/TableCompressionCodec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/TableCompressionCodec.scala new file mode 100644 index 00000000000..ff175a5cf7b --- /dev/null +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/TableCompressionCodec.scala @@ -0,0 +1,367 @@ +/* + * Copyright (c) 2020, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids + +import java.util.Locale + +import scala.collection.mutable.ArrayBuffer + +import ai.rapids.cudf.{ContiguousTable, DeviceMemoryBuffer} +import com.nvidia.spark.rapids.RapidsPluginImplicits._ +import com.nvidia.spark.rapids.format.{BufferMeta, CodecType, TableMeta} + +/** + * Compressed table descriptor + * @note the buffer may be significantly oversized for the amount of compressed data + * @param compressedSize size of the compressed data in bytes + * @param meta metadata describing the table layout when uncompressed + * @param buffer buffer containing the compressed data + */ +case class CompressedTable( + compressedSize: Long, + meta: TableMeta, + buffer: DeviceMemoryBuffer) extends AutoCloseable { + override def close(): Unit = buffer.close() +} + +/** An interface to a compression codec that can compress a contiguous Table on the GPU */ +trait TableCompressionCodec { + /** The name of the codec, used for logging. */ + val name: String + + /** The ID used for this codec. 
See the definitions in `CodecType`. */
+ val codecType: Byte
+
+ /**
+ * Compress a contiguous table.
+ * @note The contiguous table is NOT closed by this operation and must be closed separately.
+ * @note The compressed buffer MAY NOT be ideally sized to the compressed data. It may be
+ * significantly larger than the size of the compressed data. Releasing this unused
+ * memory will require making a copy of the data to a buffer of the appropriate size.
+ * @param tableId ID to use for this table
+ * @param contigTable contiguous table to compress
+ * @return compressed table
+ */
+ def compress(tableId: Int, contigTable: ContiguousTable): CompressedTable
+
+ /**
+ * Decompress the compressed data buffer from a table compression operation.
+ * @note The compressed buffer is NOT closed by this method.
+ * @param outputBuffer buffer where uncompressed data will be written
+ * @param outputOffset offset in the uncompressed buffer to start writing data
+ * @param outputLength expected length of the uncompressed data in bytes
+ * @param inputBuffer buffer containing the compressed data
+ * @param inputOffset offset in the compressed buffer where compressed data starts
+ * @param inputLength length of the compressed data in bytes
+ */
+ def decompressBuffer(
+ outputBuffer: DeviceMemoryBuffer,
+ outputOffset: Long,
+ outputLength: Long,
+ inputBuffer: DeviceMemoryBuffer,
+ inputOffset: Long,
+ inputLength: Long): Unit
+
+ /**
+ * Create a batched compressor instance
+ * @param maxBatchMemorySize The upper limit in bytes of temporary and output memory usage at
+ * which a batch should be compressed. A single table that requires
+ * temporary and output memory above this limit is allowed but will
+ * be compressed individually.
+ * @return batched compressor instance
+ */
+ def createBatchCompressor(maxBatchMemorySize: Long): BatchedTableCompressor
+
+ /**
+ * Create a batched decompressor instance
+ * @param maxBatchMemorySize The upper limit in bytes of temporary and output memory usage at
+ * which a batch should be decompressed. A single buffer that requires
+ * temporary and output memory above this limit is allowed but will
+ * be decompressed individually.
+ * @return batched decompressor instance
+ */
+ def createBatchDecompressor(maxBatchMemorySize: Long): BatchedBufferDecompressor
+}
+
+object TableCompressionCodec {
+ private val codecNameToId = Map(
+ "copy" -> CodecType.COPY)
+
+ /** Get a compression codec by short name */
+ def getCodec(name: String): TableCompressionCodec = {
+ val codecId = codecNameToId.getOrElse(name.toLowerCase(Locale.ROOT),
+ throw new IllegalArgumentException(s"Unknown table codec: $name"))
+ getCodec(codecId)
+ }
+
+ /** Get a compression codec by ID. */
+ def getCodec(codecType: Byte): TableCompressionCodec = {
+ codecType match {
+ case CodecType.COPY => new CopyCompressionCodec
+ case _ => throw new IllegalArgumentException(s"Unknown codec ID: $codecType")
+ }
+ }
+}
+
+/**
+ * Base class for batched compressors
+ * @param maxBatchMemorySize The upper limit in bytes of temporary and output memory usage at
+ * which a batch should be compressed. A single table that requires
+ * temporary and output memory above this limit is allowed but will
+ * be compressed individually.
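+ *
+ * A minimal usage sketch (hypothetical `codec` and `tables` values; the batch memory
+ * limit shown is illustrative, not a recommendation):
+ * {{{
+ * val compressor = codec.createBatchCompressor(maxBatchMemorySize = 1024 * 1024 * 1024)
+ * try {
+ *   // the compressor takes ownership of the tables and will close them
+ *   compressor.addTables(tables)
+ *   val compressed: Array[CompressedTable] = compressor.finish()
+ *   // ... use the compressed tables, closing each when done ...
+ * } finally {
+ *   compressor.close()
+ * }
+ * }}}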
+ */ +abstract class BatchedTableCompressor(maxBatchMemorySize: Long) extends AutoCloseable with Arm { + private[this] val tables = new ArrayBuffer[ContiguousTable] + private[this] val tempBuffers = new ArrayBuffer[DeviceMemoryBuffer] + private[this] val oversizedOutBuffers = new ArrayBuffer[DeviceMemoryBuffer] + private[this] val results = new ArrayBuffer[CompressedTable] + + // temporary and output memory being used as part of the current batch + private[this] var batchMemUsed: Long = 0 + + /** + * Add a contiguous table to be batch-compressed. Ownership of the table is transferred to the + * batch compressor which is responsible for closing the table. + * @param contigTable the contiguous table to be compressed + */ + def addTable(contigTable: ContiguousTable): Unit = { + closeOnExcept(contigTable) { contigTable => + val tempSize = getTempSpaceNeeded(contigTable.getBuffer) + var memNeededThisBuffer = tempSize + if (batchMemUsed + memNeededThisBuffer > maxBatchMemorySize) { + compressBatch() + } + val tempBuffer = if (tempSize > 0) { + DeviceMemoryBuffer.allocate(memNeededThisBuffer) + } else { + null + } + closeOnExcept(tempBuffer) { tempBuffer => + val outputSize = getOutputSpaceNeeded(contigTable.getBuffer, tempBuffer) + memNeededThisBuffer += outputSize + if (batchMemUsed + memNeededThisBuffer > maxBatchMemorySize) { + compressBatch() + } + oversizedOutBuffers += DeviceMemoryBuffer.allocate(outputSize) + tempBuffers += tempBuffer + tables += contigTable + batchMemUsed += memNeededThisBuffer + } + } + } + + /** + * Add an array of contiguous tables to be compressed. The tables will be closed by the + * batch compressor. + * @param contigTable contiguous tables to compress + */ + def addTables(contigTable: Array[ContiguousTable]): Unit = { + var i = 0 + try { + contigTable.foreach { ct => + addTable(ct) + i += 1 + } + } catch { + case t: Throwable => + contigTable.drop(i).foreach(_.safeClose()) + throw t + } + } + + /** + * This must be called after all tables to be compressed have been added to retrieve the + * compression results. 
+ * @note the table IDs in the TableMeta of all tables will be set to zero
+ * @return compressed tables
+ */
+ def finish(): Array[CompressedTable] = {
+ // compress the last batch
+ compressBatch()
+
+ val compressedTables = results.toArray
+ results.clear()
+ compressedTables
+ }
+
+ /** Must be closed to release the resources owned by the batch compressor */
+ override def close(): Unit = {
+ tables.safeClose()
+ tempBuffers.safeClose()
+ oversizedOutBuffers.safeClose()
+ results.safeClose()
+ }
+
+ private def compressBatch(): Unit = {
+ if (tables.nonEmpty) {
+ require(oversizedOutBuffers.length == tables.length)
+ require(tempBuffers.length == tables.length)
+ val metas = compress(oversizedOutBuffers.toArray, tables.toArray, tempBuffers.toArray)
+ require(metas.length == tables.length)
+
+ // copy the output data into correctly-sized buffers
+ metas.zipWithIndex.foreach { case (meta, i) =>
+ val oversizedBuffer = oversizedOutBuffers(i)
+ val compressedSize = meta.bufferMeta.size
+ val buffer = if (oversizedBuffer.getLength > compressedSize) {
+ oversizedBuffer.sliceWithCopy(0, compressedSize)
+ } else {
+ // use this buffer as-is, don't close it at the end of this method
+ oversizedOutBuffers(i) = null
+ oversizedBuffer
+ }
+ results += CompressedTable(compressedSize, meta, buffer)
+ }
+
+ // free all the inputs to this batch
+ tables.safeClose()
+ tables.clear()
+ tempBuffers.safeClose()
+ tempBuffers.clear()
+ oversizedOutBuffers.safeClose()
+ oversizedOutBuffers.clear()
+ batchMemUsed = 0
+ }
+ }
+
+ /** Return the amount of temporary space needed to compress this buffer */
+ protected def getTempSpaceNeeded(buffer: DeviceMemoryBuffer): Long
+
+ /** Return the amount of estimated output space needed to compress this buffer */
+ protected def getOutputSpaceNeeded(
+ dataBuffer: DeviceMemoryBuffer,
+ tempBuffer: DeviceMemoryBuffer): Long
+
+ /**
+ * Batch-compress contiguous tables
+ * @param outputBuffers output buffers allocated based on `getOutputSpaceNeeded` results
+ * @param tables contiguous tables to compress
+ * @param tempBuffers temporary buffers allocated based on `getTempSpaceNeeded` results.
+ * If the temporary space needed was zero then the corresponding buffer
+ * entry may be null.
+ * @return table metadata for the compressed tables. Table IDs should be set to 0.
+ */
+ protected def compress(
+ outputBuffers: Array[DeviceMemoryBuffer],
+ tables: Array[ContiguousTable],
+ tempBuffers: Array[DeviceMemoryBuffer]): Array[TableMeta]
+}
+
+/**
+ * Base class for batched decompressors
+ * @param maxBatchMemorySize The upper limit in bytes of temporary and output memory usage at
+ * which a batch should be decompressed. A single buffer that requires
+ * temporary and output memory above this limit is allowed but will
+ * be decompressed individually.
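+ *
+ * A minimal usage sketch (hypothetical `codec` and `compressedTables` values, each table
+ * assumed to carry exactly one codec buffer descriptor; the batch memory limit is
+ * illustrative only):
+ * {{{
+ * val decompressor = codec.createBatchDecompressor(maxBatchMemorySize = 1024 * 1024 * 1024)
+ * try {
+ *   compressedTables.foreach { ct =>
+ *     // addBufferToDecode takes ownership of the buffer it is given, so pass a slice
+ *     val buffer = ct.buffer.slice(0, ct.buffer.getLength)
+ *     decompressor.addBufferToDecode(buffer, ct.meta.bufferMeta)
+ *   }
+ *   val uncompressed: Array[DeviceMemoryBuffer] = decompressor.finish()
+ *   // ... rebuild column batches from the uncompressed buffers, closing them when done ...
+ * } finally {
+ *   decompressor.close()
+ * }
+ * }}}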
+ */
+abstract class BatchedBufferDecompressor(maxBatchMemorySize: Long) extends AutoCloseable with Arm {
+ private[this] val inputBuffers = new ArrayBuffer[DeviceMemoryBuffer]
+ private[this] val tempBuffers = new ArrayBuffer[DeviceMemoryBuffer]
+ private[this] val outputBuffers = new ArrayBuffer[DeviceMemoryBuffer]
+ private[this] val results = new ArrayBuffer[DeviceMemoryBuffer]
+
+ // temporary and output memory being used as part of the current batch
+ private[this] var batchMemUsed: Long = 0
+
+ /** The codec ID corresponding to this decompressor */
+ val codecId: Byte
+
+ def addBufferToDecode(buffer: DeviceMemoryBuffer, meta: BufferMeta): Unit = {
+ closeOnExcept(buffer) { buffer =>
+ // Only supports a single codec per buffer for now.
+ require(meta.codecBufferDescrsLength == 1)
+ val descr = meta.codecBufferDescrs(0)
+ require(descr.codec == codecId)
+
+ // Only supports codecs that consume the entire input buffer for now.
+ require(descr.compressedOffset == 0)
+ require(descr.compressedSize == buffer.getLength)
+
+ val tempNeeded = decompressTempSpaceNeeded(buffer)
+ val outputNeeded = descr.uncompressedSize
+ if (batchMemUsed + tempNeeded + outputNeeded > maxBatchMemorySize) {
+ decompressBatch()
+ }
+
+ val tempBuffer = if (tempNeeded > 0) {
+ DeviceMemoryBuffer.allocate(tempNeeded)
+ } else {
+ null
+ }
+ val outputBuffer = DeviceMemoryBuffer.allocate(outputNeeded)
+ batchMemUsed += tempNeeded + outputNeeded
+ tempBuffers += tempBuffer
+ outputBuffers += outputBuffer
+ inputBuffers += buffer
+ }
+ }
+
+ /**
+ * This must be called after all buffers to be decompressed have been added to retrieve the
+ * decompression results.
+ * @return uncompressed buffers
+ */
+ def finish(): Array[DeviceMemoryBuffer] = {
+ // decompress the last batch
+ decompressBatch()
+ val resultsArray = results.toArray
+ results.clear()
+ resultsArray
+ }
+
+ override def close(): Unit = {
+ inputBuffers.safeClose()
+ tempBuffers.safeClose()
+ outputBuffers.safeClose()
+ }
+
+ protected def decompressBatch(): Unit = {
+ if (inputBuffers.nonEmpty) {
+ require(outputBuffers.length == inputBuffers.length)
+ require(tempBuffers.length == inputBuffers.length)
+ decompress(outputBuffers.toArray, inputBuffers.toArray, tempBuffers.toArray)
+ results ++= outputBuffers
+ outputBuffers.clear()
+
+ // free all the inputs to this batch
+ inputBuffers.safeClose()
+ inputBuffers.clear()
+ tempBuffers.safeClose()
+ tempBuffers.clear()
+ batchMemUsed = 0
+ }
+ }
+
+ /**
+ * Compute the amount of temporary buffer space required to decode a buffer
+ * @param inputBuffer buffer to decode
+ * @return required temporary buffer space in bytes
+ */
+ protected def decompressTempSpaceNeeded(inputBuffer: DeviceMemoryBuffer): Long
+
+ /**
+ * Decompress a batch of compressed buffers
+ * @param outputBuffers buffers that will contain the uncompressed output
+ * @param inputBuffers buffers that contain the compressed input
+ * @param tempBuffers buffers to use for temporary space
+ */
+ protected def decompress(
+ outputBuffers: Array[DeviceMemoryBuffer],
+ inputBuffers: Array[DeviceMemoryBuffer],
+ tempBuffers: Array[DeviceMemoryBuffer]): Unit
+
+}
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuShuffleEnv.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuShuffleEnv.scala
index dd60daf0912..7dfdff4584b 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuShuffleEnv.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuShuffleEnv.scala
@@ -41,6 +41,14 @@ class
GpuShuffleEnv extends Logging { conf.get("spark.shuffle.manager") == GpuShuffleEnv.RAPIDS_SHUFFLE_CLASS } + lazy val rapidsShuffleCodec: Option[TableCompressionCodec] = { + if (rapidsConf.shuffleCompressionEnabled) { + Some(TableCompressionCodec.getCodec(rapidsConf.shuffleCompressionCodec)) + } else { + None + } + } + lazy val isRapidsShuffleEnabled: Boolean = { val env = SparkEnv.get val isRapidsManager = GpuShuffleEnv.isRapidsShuffleManagerInitialized @@ -127,6 +135,8 @@ object GpuShuffleEnv extends Logging { def isRapidsShuffleEnabled: Boolean = env.isRapidsShuffleEnabled + def rapidsShuffleCodec: Option[TableCompressionCodec] = env.rapidsShuffleCodec + // the shuffle plugin will call this on initialize def setRapidsShuffleManagerInitialized(initialized: Boolean, className: String): Unit = { assert(className == GpuShuffleEnv.RAPIDS_SHUFFLE_CLASS) diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/RapidsShuffleInternalManager.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/RapidsShuffleInternalManager.scala index d8d602205e9..12684d52c33 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/RapidsShuffleInternalManager.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/RapidsShuffleInternalManager.scala @@ -23,8 +23,7 @@ import com.nvidia.spark.rapids.shuffle.{RapidsShuffleRequestHandler, RapidsShuff import scala.collection.mutable.ArrayBuffer import org.apache.spark.{ShuffleDependency, SparkConf, SparkEnv, TaskContext} -import org.apache.spark.internal.{config, Logging} -import org.apache.spark.io.CompressionCodec +import org.apache.spark.internal.Logging import org.apache.spark.network.buffer.ManagedBuffer import org.apache.spark.scheduler.MapStatus import org.apache.spark.shuffle._ @@ -40,9 +39,8 @@ class GpuShuffleHandle[K, V]( override def toString: String = s"GPU SHUFFLE HANDLE $shuffleId" } -class GpuShuffleBlockResolver(private val wrapped: ShuffleBlockResolver, - private val blockManager: BlockManager, - private val compressionCodec: CompressionCodec, +class GpuShuffleBlockResolver( + private val wrapped: ShuffleBlockResolver, catalog: ShuffleBufferCatalog) extends ShuffleBlockResolver with Logging { override def getBlockData(blockId: BlockId, dirs: Option[Array[String]]): ManagedBuffer = { @@ -72,14 +70,11 @@ object RapidsShuffleInternalManagerBase extends Logging { } class RapidsCachingWriter[K, V]( - conf: SparkConf, blockManager: BlockManager, - blockResolver: IndexShuffleBlockResolver, // Never keep a reference to the ShuffleHandle in the cache as it being GCed triggers // the data being released handle: GpuShuffleHandle[K, V], mapId: Long, - compressionCodec: CompressionCodec, metrics: ShuffleWriteMetricsReporter, catalog: ShuffleBufferCatalog, shuffleStorage: RapidsDeviceMemoryStore, @@ -106,17 +101,27 @@ class RapidsCachingWriter[K, V]( val blockId = ShuffleBlockId(handle.shuffleId, mapId, partId) val bufferId = catalog.nextShuffleBufferId(blockId) if (batch.numRows > 0 && batch.numCols > 0) { - val buffer = { - val buff = batch.column(0).asInstanceOf[GpuColumnVectorFromBuffer].getBuffer - buff.slice(0, buff.getLength) - } - // Add the table to the shuffle store - shuffleStorage.addTable( - bufferId, - GpuColumnVector.from(batch), - buffer, - SpillPriorities.OUTPUT_FOR_SHUFFLE_INITIAL_PRIORITY) + batch.column(0) match { + case c: GpuColumnVectorFromBuffer => + val buffer = c.getBuffer.slice(0, c.getBuffer.getLength) + shuffleStorage.addTable( + bufferId, + GpuColumnVector.from(batch), + buffer, + 
SpillPriorities.OUTPUT_FOR_SHUFFLE_INITIAL_PRIORITY) + case c: GpuCompressedColumnVector => + val buffer = c.getBuffer.slice(0, c.getBuffer.getLength) + val tableMeta = c.getTableMeta + // update the table metadata for the buffer ID generated above + tableMeta.bufferMeta.mutateId(bufferId.tableId) + shuffleStorage.addBuffer( + bufferId, + buffer, + tableMeta, + SpillPriorities.OUTPUT_FOR_SHUFFLE_INITIAL_PRIORITY) + case c => throw new IllegalStateException(s"Unexpected column type: ${c.getClass}") + } } else { // no device data, tracking only metadata val tableMeta = MetaUtils.buildDegenerateTableMeta(bufferId.tableId, batch) @@ -207,22 +212,11 @@ abstract class RapidsShuffleInternalManagerBase(conf: SparkConf, isDriver: Boole } private lazy val localBlockManagerId = blockManager.blockManagerId - private lazy val compressionEnabled: Boolean = conf.get(config.SHUFFLE_COMPRESS) - private lazy val compressionCodec: CompressionCodec = - if (compressionEnabled) { - CompressionCodec.createCodec(conf) - } else { - null - } private lazy val resolver = if (shouldFallThroughOnEverything) { wrapped.shuffleBlockResolver } else { - new GpuShuffleBlockResolver( - wrapped.shuffleBlockResolver, - blockManager, - compressionCodec, - catalog) + new GpuShuffleBlockResolver(wrapped.shuffleBlockResolver, catalog) } private[this] lazy val transport: Option[RapidsShuffleTransport] = { @@ -276,12 +270,10 @@ abstract class RapidsShuffleInternalManagerBase(conf: SparkConf, isDriver: Boole handle match { case gpu: GpuShuffleHandle[_, _] => registerGpuShuffle(handle.shuffleId) - new RapidsCachingWriter(conf, + new RapidsCachingWriter( env.blockManager, - wrapped.shuffleBlockResolver, gpu.asInstanceOf[GpuShuffleHandle[K, V]], mapId, - compressionCodec, metrics, GpuShuffleEnv.getCatalog, GpuShuffleEnv.getDeviceStorage, diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/GpuCoalesceBatchesSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/GpuCoalesceBatchesSuite.scala index d13fd1c98f4..6298206bd78 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/GpuCoalesceBatchesSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/GpuCoalesceBatchesSuite.scala @@ -19,9 +19,12 @@ package com.nvidia.spark.rapids import java.io.File import java.nio.file.Files +import ai.rapids.cudf.{ContiguousTable, HostColumnVector, Table} +import com.nvidia.spark.rapids.format.CodecType + import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.rapids.metrics.source.MockTaskContext -import org.apache.spark.sql.types.{DataTypes, StructField, StructType} +import org.apache.spark.sql.types.{DataTypes, LongType, StructField, StructType} import org.apache.spark.sql.vectorized.ColumnarBatch class GpuCoalesceBatchesSuite extends SparkQueryCompareTestSuite { @@ -77,6 +80,7 @@ class GpuCoalesceBatchesSuite extends SparkQueryCompareTestSuite { val it = new GpuCoalesceIterator(input, schema, TargetSize(Long.MaxValue), + 0, numInputRows, numInputBatches, numOutputRows, @@ -243,4 +247,158 @@ class GpuCoalesceBatchesSuite extends SparkQueryCompareTestSuite { }, conf) } + def testCompressedBatches(maxCompressedBatchMemoryLimit: Long) { + val coalesceTargetBytes = 8000 + val stop = 10000 + var start = 0 + var numBatchRows = 100 + var expectedEnd = 0 + val batchIter = new Iterator[ColumnarBatch] { + override def hasNext: Boolean = if (start < stop) { + true + } else { + expectedEnd = start + false + } + override def next(): ColumnarBatch = { + val batch = buildCompressedBatch(start, numBatchRows) + start += 
batch.numRows + numBatchRows *= 2 + batch + } + } + + val schema = new StructType().add("i", LongType) + val dummyMetric = new SQLMetric("ignored") + val coalesceIter = new GpuCoalesceIterator( + batchIter, + schema, + TargetSize(coalesceTargetBytes), + maxCompressedBatchMemoryLimit, + dummyMetric, + dummyMetric, + dummyMetric, + dummyMetric, + dummyMetric, + dummyMetric, + dummyMetric, + dummyMetric, + "test concat") + + var expected = 0 + while (coalesceIter.hasNext) { + withResource(coalesceIter.next()) { batch => + assertResult(1)(batch.numCols) + val col = GpuColumnVector.extractBases(batch).head + withResource(col.copyToHost) { hcv => + (0 until hcv.getRowCount.toInt).foreach { i => + assertResult(expected)(hcv.getLong(i)) + expected += 1 + } + } + } + } + assertResult(expectedEnd)(expected) + } + + test("all compressed low memory limit") { + testCompressedBatches(0) + } + + test("all compressed high memory limit") { + testCompressedBatches(Long.MaxValue) + } + + test("mixed compressed and uncompressed low memory limit") { + testMixedCompressedUncompressed(0) + } + + test("mixed compressed and uncompressed high memory limit") { + testMixedCompressedUncompressed(Long.MaxValue) + } + + def testMixedCompressedUncompressed(maxCompressedBatchMemoryLimit: Long): Unit = { + val coalesceTargetBytes = 8000 + val stop = 10000 + var start = 0 + var numBatchRows = 100 + var nextBatchCompressed = false + var expectedEnd = 0 + val batchIter = new Iterator[ColumnarBatch] { + override def hasNext: Boolean = if (start < stop) { + true + } else { + expectedEnd = start + false + } + override def next(): ColumnarBatch = { + val batch = if (nextBatchCompressed) { + buildCompressedBatch(start, numBatchRows) + } else { + buildUncompressedBatch(start, numBatchRows) + } + nextBatchCompressed = !nextBatchCompressed + start += batch.numRows + numBatchRows *= 2 + batch + } + } + + val schema = new StructType().add("i", LongType) + val dummyMetric = new SQLMetric("ignored") + val coalesceIter = new GpuCoalesceIterator( + batchIter, + schema, + TargetSize(coalesceTargetBytes), + maxCompressedBatchMemoryLimit, + dummyMetric, + dummyMetric, + dummyMetric, + dummyMetric, + dummyMetric, + dummyMetric, + dummyMetric, + dummyMetric, + "test concat") + + var expected = 0 + while (coalesceIter.hasNext) { + withResource(coalesceIter.next()) { batch => + assertResult(1)(batch.numCols) + val col = GpuColumnVector.extractBases(batch).head + withResource(col.copyToHost) { hcv => + (0 until hcv.getRowCount.toInt).foreach { i => + assertResult(expected)(hcv.getLong(i)) + expected += 1 + } + } + } + } + assertResult(expectedEnd)(expected) + } + + private def buildContiguousTable(start: Int, numRows: Int): ContiguousTable = { + val vals = (0 until numRows).map(_.toLong + start) + withResource(HostColumnVector.fromLongs(vals:_*)) { hcv => + withResource(hcv.copyToDevice()) { cv => + withResource(new Table(cv)) { table => + table.contiguousSplit()(0) + } + } + } + } + + private def buildUncompressedBatch(start: Int, numRows: Int): ColumnarBatch = { + withResource(buildContiguousTable(start, numRows)) { ct => + GpuColumnVector.from(ct.getTable) + } + } + + private def buildCompressedBatch(start: Int, numRows: Int): ColumnarBatch = { + val codec = TableCompressionCodec.getCodec(CodecType.COPY) + withResource(codec.createBatchCompressor(0)) { compressor => + compressor.addTable(buildContiguousTable(start, numRows)) + GpuCompressedColumnVector.from(compressor.finish().head) + } + } } diff --git 
a/tests/src/test/scala/com/nvidia/spark/rapids/GpuPartitioningSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/GpuPartitioningSuite.scala index e1ebd71a7d6..b92ecef9de9 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/GpuPartitioningSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/GpuPartitioningSuite.scala @@ -16,13 +16,15 @@ package com.nvidia.spark.rapids +import java.io.File + import ai.rapids.cudf.{Cuda, Table} import com.nvidia.spark.rapids.RapidsPluginImplicits._ import org.scalatest.FunSuite import org.apache.spark.SparkConf import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.rapids.GpuShuffleEnv +import org.apache.spark.sql.rapids.{GpuShuffleEnv, RapidsDiskBlockManager} import org.apache.spark.sql.vectorized.ColumnarBatch class GpuPartitioningSuite extends FunSuite with Arm { @@ -58,7 +60,7 @@ class GpuPartitioningSuite extends FunSuite with Arm { test("GPU partition") { SparkSession.getActiveSession.foreach(_.close()) - val conf = new SparkConf() + val conf = new SparkConf().set(RapidsConf.SHUFFLE_COMPRESSION_ENABLED.key, "false") TestUtils.withGpuSparkSession(conf) { _ => GpuShuffleEnv.init(Cuda.memGetInfo()) val partitionIndices = Array(0, 2) @@ -91,4 +93,58 @@ class GpuPartitioningSuite extends FunSuite with Arm { } } } + + test("GPU partition with compression") { + val conf = new SparkConf() + .set(RapidsConf.SHUFFLE_COMPRESSION_ENABLED.key, "true") + .set(RapidsConf.SHUFFLE_COMPRESSION_CODEC.key, "copy") + TestUtils.withGpuSparkSession(conf) { _ => + GpuShuffleEnv.init(Cuda.memGetInfo()) + val spillPriority = 7L + val catalog = new RapidsBufferCatalog + withResource(new RapidsDeviceMemoryStore(catalog)) { deviceStore => + val partitionIndices = Array(0, 2) + val gp = new GpuPartitioning { + override val numPartitions: Int = partitionIndices.length + } + withResource(buildBatch()) { batch => + val columns = GpuColumnVector.extractColumns(batch) + val numRows = batch.numRows + withResource(gp.sliceInternalOnGpu(numRows, partitionIndices, columns)) { partitions => + partitions.zipWithIndex.foreach { case (partBatch, partIndex) => + val startRow = partitionIndices(partIndex) + val endRow = if (partIndex < partitionIndices.length - 1) { + partitionIndices(partIndex + 1) + } else { + batch.numRows + } + val expectedRows = endRow - startRow + assertResult(expectedRows)(partBatch.numRows) + val columns = (0 until partBatch.numCols).map(i => partBatch.column(i)) + columns.foreach { column => + assert(column.isInstanceOf[GpuCompressedColumnVector]) + assertResult(expectedRows) { + column.asInstanceOf[GpuCompressedColumnVector].getTableMeta.rowCount + } + } + val gccv = columns.head.asInstanceOf[GpuCompressedColumnVector] + val bufferId = MockRapidsBufferId(partIndex) + val devBuffer = gccv.getBuffer.slice(0, gccv.getBuffer.getLength) + deviceStore.addBuffer(bufferId, devBuffer, gccv.getTableMeta, spillPriority) + withResource(buildSubBatch(batch, startRow, endRow)) { expectedBatch => + withResource(catalog.acquireBuffer(bufferId)) { buffer => + compareBatches(expectedBatch, buffer.getColumnarBatch) + } + } + } + } + } + } + } + } +} + +case class MockRapidsBufferId(tableId: Int) extends RapidsBufferId { + override def getDiskPath(diskBlockManager: RapidsDiskBlockManager): File = + throw new UnsupportedOperationException } diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/GpuSinglePartitioningSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/GpuSinglePartitioningSuite.scala index ee1e9a97d99..1b01af95061 100644 --- 
a/tests/src/test/scala/com/nvidia/spark/rapids/GpuSinglePartitioningSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/GpuSinglePartitioningSuite.scala @@ -34,8 +34,9 @@ class GpuSinglePartitioningSuite extends FunSuite with Arm { } } - test("generates contiguous split") { + test("generates contiguous split uncompressed") { val conf = new SparkConf().set("spark.shuffle.manager", GpuShuffleEnv.RAPIDS_SHUFFLE_CLASS) + .set(RapidsConf.SHUFFLE_COMPRESSION_ENABLED.key, "false") TestUtils.withGpuSparkSession(conf) { _ => GpuShuffleEnv.init(Cuda.memGetInfo()) val partitioner = GpuSinglePartitioning(Nil) diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/RapidsDiskStoreSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/RapidsDiskStoreSuite.scala index bcc32b00a17..92cae53351a 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/RapidsDiskStoreSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/RapidsDiskStoreSuite.scala @@ -189,17 +189,12 @@ class RapidsDiskStoreSuite extends FunSuite with BeforeAndAfterEach with Arm wit devStore: RapidsDeviceMemoryStore, bufferId: RapidsBufferId, spillPriority: Long): Long = { - val ct = buildContiguousTable() - val bufferSize = ct.getBuffer.getLength - try { + closeOnExcept(buildContiguousTable()) { ct => + val bufferSize = ct.getBuffer.getLength // store takes ownership of the table devStore.addTable(bufferId, ct.getTable, ct.getBuffer, spillPriority) - } catch { - case t: Throwable => - ct.close() - throw t + bufferSize } - bufferSize } case class MockRapidsBufferId( diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/TestUtils.scala b/tests/src/test/scala/com/nvidia/spark/rapids/TestUtils.scala index 5514679eeff..4be02c8ccdd 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/TestUtils.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/TestUtils.scala @@ -17,9 +17,8 @@ package com.nvidia.spark.rapids import java.io.File -import java.nio.ByteBuffer -import ai.rapids.cudf.{BufferType, ColumnVector, DType, HostColumnVector, Table} +import ai.rapids.cudf.{ColumnVector, DType, Table} import org.scalatest.Assertions import org.apache.spark.SparkConf From a15df77b816b107920ec626144b84051f40f673d Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Tue, 4 Aug 2020 17:31:15 -0500 Subject: [PATCH 2/7] Extract common row accessor methods to GpuColumnVectorBase Signed-off-by: Jason Lowe --- .../nvidia/spark/rapids/GpuColumnVector.java | 77 +------------ .../spark/rapids/GpuColumnVectorBase.java | 103 ++++++++++++++++++ .../rapids/GpuCompressedColumnVector.java | 76 +------------ 3 files changed, 105 insertions(+), 151 deletions(-) create mode 100644 sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVectorBase.java diff --git a/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVector.java b/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVector.java index d1e66b469a1..1b09239c69e 100644 --- a/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVector.java +++ b/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVector.java @@ -25,10 +25,7 @@ import org.apache.spark.sql.execution.vectorized.WritableColumnVector; import org.apache.spark.sql.types.*; import org.apache.spark.sql.vectorized.ColumnVector; -import org.apache.spark.sql.vectorized.ColumnarArray; import org.apache.spark.sql.vectorized.ColumnarBatch; -import org.apache.spark.sql.vectorized.ColumnarMap; -import org.apache.spark.unsafe.types.UTF8String; import java.util.List; @@ -38,7 +35,7 @@ * is on the host, and we want 
to keep as much of the data on the device as possible. * We also provide GPU accelerated versions of the transitions to and from rows. */ -public class GpuColumnVector extends ColumnVector { +public class GpuColumnVector extends GpuColumnVectorBase { public static final class GpuColumnarBatchBuilder implements AutoCloseable { private final ai.rapids.cudf.HostColumnVector.Builder[] builders; @@ -385,78 +382,6 @@ public final int numNulls() { return (int) cudfCv.getNullCount(); } - private final static String BAD_ACCESS = "DATA ACCESS MUST BE ON A HOST VECTOR"; - - @Override - public final boolean isNullAt(int rowId) { - throw new IllegalStateException(BAD_ACCESS); - } - - @Override - public final boolean getBoolean(int rowId) { - throw new IllegalStateException(BAD_ACCESS); - } - - @Override - public final byte getByte(int rowId) { - throw new IllegalStateException(BAD_ACCESS); - } - - @Override - public final short getShort(int rowId) { - throw new IllegalStateException(BAD_ACCESS); - } - - @Override - public final int getInt(int rowId) { - throw new IllegalStateException(BAD_ACCESS); - } - - @Override - public final long getLong(int rowId) { - throw new IllegalStateException(BAD_ACCESS); - } - - @Override - public final float getFloat(int rowId) { - throw new IllegalStateException(BAD_ACCESS); - } - - @Override - public final double getDouble(int rowId) { - throw new IllegalStateException(BAD_ACCESS); - } - - @Override - public final ColumnarArray getArray(int rowId) { - throw new IllegalStateException(BAD_ACCESS); - } - - @Override - public final ColumnarMap getMap(int ordinal) { - throw new IllegalStateException(BAD_ACCESS); - } - - @Override - public final Decimal getDecimal(int rowId, int precision, int scale) { - throw new IllegalStateException(BAD_ACCESS); - } - - @Override - public final UTF8String getUTF8String(int rowId) { - throw new IllegalStateException(BAD_ACCESS); - } - - @Override - public final byte[] getBinary(int rowId) { - throw new IllegalStateException(BAD_ACCESS); - } - - @Override - public final ColumnVector getChild(int ordinal) { - throw new IllegalStateException(BAD_ACCESS); - } - public static final long getTotalDeviceMemoryUsed(ColumnarBatch batch) { long sum = 0; for (int i = 0; i < batch.numCols(); i++) { diff --git a/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVectorBase.java b/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVectorBase.java new file mode 100644 index 00000000000..85b3e9098c1 --- /dev/null +++ b/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVectorBase.java @@ -0,0 +1,103 @@ +/* + * Copyright (c) 2020, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.nvidia.spark.rapids; + +import ai.rapids.cudf.DType; +import org.apache.spark.sql.types.*; +import org.apache.spark.sql.vectorized.ColumnVector; +import org.apache.spark.sql.vectorized.ColumnarArray; +import org.apache.spark.sql.vectorized.ColumnarMap; +import org.apache.spark.unsafe.types.UTF8String; + +/** Base class for all GPU column vectors. */ +abstract class GpuColumnVectorBase extends ColumnVector { + private final static String BAD_ACCESS = "DATA ACCESS MUST BE ON A HOST VECTOR"; + + protected GpuColumnVectorBase(DataType type) { + super(type); + } + + @Override + public final boolean isNullAt(int rowId) { + throw new IllegalStateException(BAD_ACCESS); + } + + @Override + public final boolean getBoolean(int rowId) { + throw new IllegalStateException(BAD_ACCESS); + } + + @Override + public final byte getByte(int rowId) { + throw new IllegalStateException(BAD_ACCESS); + } + + @Override + public final short getShort(int rowId) { + throw new IllegalStateException(BAD_ACCESS); + } + + @Override + public final int getInt(int rowId) { + throw new IllegalStateException(BAD_ACCESS); + } + + @Override + public final long getLong(int rowId) { + throw new IllegalStateException(BAD_ACCESS); + } + + @Override + public final float getFloat(int rowId) { + throw new IllegalStateException(BAD_ACCESS); + } + + @Override + public final double getDouble(int rowId) { + throw new IllegalStateException(BAD_ACCESS); + } + + @Override + public final ColumnarArray getArray(int rowId) { + throw new IllegalStateException(BAD_ACCESS); + } + + @Override + public final ColumnarMap getMap(int ordinal) { + throw new IllegalStateException(BAD_ACCESS); + } + + @Override + public final Decimal getDecimal(int rowId, int precision, int scale) { + throw new IllegalStateException(BAD_ACCESS); + } + + @Override + public final UTF8String getUTF8String(int rowId) { + throw new IllegalStateException(BAD_ACCESS); + } + + @Override + public final byte[] getBinary(int rowId) { + throw new IllegalStateException(BAD_ACCESS); + } + + @Override + public final ColumnVector getChild(int ordinal) { + throw new IllegalStateException(BAD_ACCESS); + } +} diff --git a/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuCompressedColumnVector.java b/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuCompressedColumnVector.java index 1f92289111f..ea7e16e9d56 100644 --- a/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuCompressedColumnVector.java +++ b/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuCompressedColumnVector.java @@ -21,19 +21,15 @@ import com.nvidia.spark.rapids.format.ColumnMeta; import com.nvidia.spark.rapids.format.TableMeta; import org.apache.spark.sql.types.DataType; -import org.apache.spark.sql.types.Decimal; import org.apache.spark.sql.vectorized.ColumnVector; -import org.apache.spark.sql.vectorized.ColumnarArray; import org.apache.spark.sql.vectorized.ColumnarBatch; -import org.apache.spark.sql.vectorized.ColumnarMap; -import org.apache.spark.unsafe.types.UTF8String; /** * A GPU column vector that has been compressed. The columnar data within cannot * be accessed directly. This class primarily serves the role of tracking the * compressed data and table metadata so it can be decompressed later. 
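 * The data can be materialized again by routing the buffer and its metadata through
 * the codec recorded in the table metadata's buffer descriptors, as done by the
 * batch decompression paths in GpuCoalesceIterator and RapidsBufferStore.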
*/ -public final class GpuCompressedColumnVector extends ColumnVector { +public final class GpuCompressedColumnVector extends GpuColumnVectorBase { private final DeviceMemoryBuffer buffer; private final TableMeta tableMeta; @@ -96,74 +92,4 @@ public boolean hasNull() { public int numNulls() { throw new IllegalStateException("column vector is compressed"); } - - @Override - public boolean isNullAt(int rowId) { - throw new IllegalStateException("column vector is compressed"); - } - - @Override - public boolean getBoolean(int rowId) { - throw new IllegalStateException("column vector is compressed"); - } - - @Override - public byte getByte(int rowId) { - throw new IllegalStateException("column vector is compressed"); - } - - @Override - public short getShort(int rowId) { - throw new IllegalStateException("column vector is compressed"); - } - - @Override - public int getInt(int rowId) { - throw new IllegalStateException("column vector is compressed"); - } - - @Override - public long getLong(int rowId) { - throw new IllegalStateException("column vector is compressed"); - } - - @Override - public float getFloat(int rowId) { - throw new IllegalStateException("column vector is compressed"); - } - - @Override - public double getDouble(int rowId) { - throw new IllegalStateException("column vector is compressed"); - } - - @Override - public ColumnarArray getArray(int rowId) { - throw new IllegalStateException("column vector is compressed"); - } - - @Override - public ColumnarMap getMap(int ordinal) { - throw new IllegalStateException("column vector is compressed"); - } - - @Override - public Decimal getDecimal(int rowId, int precision, int scale) { - throw new IllegalStateException("column vector is compressed"); - } - - @Override - public UTF8String getUTF8String(int rowId) { - throw new IllegalStateException("column vector is compressed"); - } - - @Override - public byte[] getBinary(int rowId) { - throw new IllegalStateException("column vector is compressed"); - } - - @Override - public ColumnVector getChild(int ordinal) { - throw new IllegalStateException("column vector is compressed"); - } } From 33c96d8d26837a3b1ebf066606fcbeebca74cfe9 Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Tue, 4 Aug 2020 17:32:33 -0500 Subject: [PATCH 3/7] Address review comments Signed-off-by: Jason Lowe --- .../spark/rapids/CopyCompressionCodec.scala | 4 +- .../spark/rapids/GpuCoalesceBatches.scala | 2 +- .../nvidia/spark/rapids/RapidsBuffer.scala | 1 + .../spark/rapids/RapidsBufferStore.scala | 8 +- .../com/nvidia/spark/rapids/RapidsConf.scala | 2 +- .../spark/rapids/TableCompressionCodec.scala | 112 ++++++++++-------- .../rapids/GpuCoalesceBatchesSuite.scala | 2 +- 7 files changed, 75 insertions(+), 56 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/CopyCompressionCodec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/CopyCompressionCodec.scala index c93b1406611..e658eedbb63 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/CopyCompressionCodec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/CopyCompressionCodec.scala @@ -22,7 +22,7 @@ import com.nvidia.spark.rapids.format.{CodecType, TableMeta} /** A table compression codec used only for testing that copies the data. 
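 * The "compressed" output is a byte-for-byte copy of the input, so the compressed
 * size always equals the uncompressed size. This exercises the batching and
 * metadata paths without depending on a real compression implementation.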
*/ class CopyCompressionCodec extends TableCompressionCodec with Arm { override val name: String = "COPY" - override val codecType: Byte = CodecType.COPY + override val codecId: Byte = CodecType.COPY override def compress( tableId: Int, @@ -33,7 +33,7 @@ class CopyCompressionCodec extends TableCompressionCodec with Arm { tableId, contigTable.getTable, buffer, - codecType, + codecId, outputBuffer.getLength) CompressedTable(buffer.getLength, meta, outputBuffer) } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCoalesceBatches.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCoalesceBatches.scala index 18f05ea5360..812c3ece914 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCoalesceBatches.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCoalesceBatches.scala @@ -478,7 +478,7 @@ class GpuCoalesceIterator(iter: Iterator[ColumnarBatch], val bufferMeta = cv.getTableMeta.bufferMeta // don't currently support switching codecs when partitioning val buffer = cv.getBuffer.slice(0, cv.getBuffer.getLength) - decompressor.addBufferToDecode(buffer, bufferMeta) + decompressor.addBufferToDecompress(buffer, bufferMeta) } closeOnExcept(decompressor.finish()) { outputBuffers => outputBuffers.zipWithIndex.foreach { case (outputBuffer, outputIndex) => diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBuffer.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBuffer.scala index 8122b9c2c30..097f12e2621 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBuffer.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBuffer.scala @@ -76,6 +76,7 @@ trait RapidsBuffer extends AutoCloseable { * successfully acquired the buffer beforehand. * @see [[addReference]] * @note It is the responsibility of the caller to close the batch. + * @note This may be an expensive operation (e.g.: batch may need to be decompressed). 
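+ * A typical access pattern (a sketch mirroring the tests in this patch):
+ * {{{
+ *   withResource(catalog.acquireBuffer(bufferId)) { buffer =>
+ *     withResource(buffer.getColumnarBatch) { batch =>
+ *       // use the batch; the caller is responsible for closing it
+ *     }
+ *   }
+ * }}}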
*/ def getColumnarBatch: ColumnarBatch diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferStore.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferStore.scala index 27676530cde..ac44dbb2828 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferStore.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferStore.scala @@ -283,9 +283,9 @@ abstract class RapidsBufferStore( } if (meta.bufferMeta.codecBufferDescrsLength > 0) { - val compressedBuffer = deviceBuffer - deviceBuffer = uncompressBuffer(deviceBuffer, meta.bufferMeta) - compressedBuffer.close() + withResource(deviceBuffer) { compressedBuffer => + deviceBuffer = uncompressBuffer(compressedBuffer, meta.bufferMeta) + } } MetaUtils.getBatchFromMeta(deviceBuffer, meta) @@ -370,7 +370,7 @@ abstract class RapidsBufferStore( val duration = System.nanoTime() - startTime val compressedSize = cbd.compressedSize() val uncompressedSize = cbd.uncompressedSize - logWarning(s"Decompressed buffer with ${codec.name} in ${duration / 1000} us," + + logDebug(s"Decompressed buffer with ${codec.name} in ${duration / 1000} us," + s"rate=${compressedSize.toFloat / duration} GB/s " + s"from $compressedSize to $uncompressedSize") } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala index e87b3fdab50..043351358c6 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala @@ -591,7 +591,7 @@ object RapidsConf { .createWithDefault(50 * 1024) val SHUFFLE_COMPRESSION_ENABLED = conf("spark.rapids.shuffle.compression.enabled") - .doc("Whether to enable compression of shuffle buffers") + .doc("Whether to enable compression of shuffle buffers when using RAPIDS shuffle") .internal() .booleanConf .createWithDefault(false) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/TableCompressionCodec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/TableCompressionCodec.scala index ff175a5cf7b..5d97c41e82d 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/TableCompressionCodec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/TableCompressionCodec.scala @@ -44,7 +44,7 @@ trait TableCompressionCodec { val name: String /** The ID used for this codec. See the definitions in `CodecType`. */ - val codecType: Byte + val codecId: Byte /** * Compress a contiguous table. @@ -54,7 +54,7 @@ trait TableCompressionCodec { * memory will require making a copy of the data to a buffer of the appropriate size. * @param tableId ID to use for this table * @param contigTable contiguous table to compress - * @return compressed buffer + * @return compressed table */ def compress(tableId: Int, contigTable: ContiguousTable): CompressedTable @@ -87,7 +87,7 @@ trait TableCompressionCodec { def createBatchCompressor(maxBatchMemorySize: Long): BatchedTableCompressor /** - * Create a batched compressor instance + * Create a batched decompressor instance * @param maxBatchMemorySize The upper limit in bytes of temporary and output memory usage at * which a batch should be decompressed. A single buffer that requires * temporary and output memory above this limit is allowed but will @@ -109,10 +109,10 @@ object TableCompressionCodec { } /** Get a compression codec by ID, using a cache. 
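 * For example, as the tests in this patch do:
 * {{{
 *   val codec = TableCompressionCodec.getCodec(CodecType.COPY)
 * }}}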
*/ - def getCodec(codecType: Byte): TableCompressionCodec = { - codecType match { + def getCodec(codecId: Byte): TableCompressionCodec = { + codecId match { case CodecType.COPY => new CopyCompressionCodec - case _ => throw new IllegalArgumentException(s"Unknown codec ID") + case _ => throw new IllegalArgumentException(s"Unknown codec ID: $codecId") } } } @@ -125,9 +125,16 @@ object TableCompressionCodec { * be compressed individually. */ abstract class BatchedTableCompressor(maxBatchMemorySize: Long) extends AutoCloseable with Arm { + // The tables that need to be compressed in the next batch private[this] val tables = new ArrayBuffer[ContiguousTable] + + // The temporary compression buffers needed to compress each table in the next batch private[this] val tempBuffers = new ArrayBuffer[DeviceMemoryBuffer] + + // The estimate-sized output buffers to hold the compressed output in the next batch private[this] val oversizedOutBuffers = new ArrayBuffer[DeviceMemoryBuffer] + + // The compressed outputs of all tables across all batches private[this] val results = new ArrayBuffer[CompressedTable] // temporary and output memory being used as part of the current batch @@ -138,28 +145,34 @@ abstract class BatchedTableCompressor(maxBatchMemorySize: Long) extends AutoClos * batch compressor which is responsible for closing the table. * @param contigTable the contiguous table to be compressed */ - def addTable(contigTable: ContiguousTable): Unit = { + def addTableToCompress(contigTable: ContiguousTable): Unit = { closeOnExcept(contigTable) { contigTable => val tempSize = getTempSpaceNeeded(contigTable.getBuffer) - var memNeededThisBuffer = tempSize - if (batchMemUsed + memNeededThisBuffer > maxBatchMemorySize) { + var memNeededToCompressThisBuffer = tempSize + if (batchMemUsed + memNeededToCompressThisBuffer > maxBatchMemorySize) { compressBatch() } val tempBuffer = if (tempSize > 0) { - DeviceMemoryBuffer.allocate(memNeededThisBuffer) + DeviceMemoryBuffer.allocate(memNeededToCompressThisBuffer) } else { null } - closeOnExcept(tempBuffer) { tempBuffer => + try { val outputSize = getOutputSpaceNeeded(contigTable.getBuffer, tempBuffer) - memNeededThisBuffer += outputSize - if (batchMemUsed + memNeededThisBuffer > maxBatchMemorySize) { + memNeededToCompressThisBuffer += outputSize + if (batchMemUsed + memNeededToCompressThisBuffer > maxBatchMemorySize) { compressBatch() } oversizedOutBuffers += DeviceMemoryBuffer.allocate(outputSize) tempBuffers += tempBuffer tables += contigTable - batchMemUsed += memNeededThisBuffer + batchMemUsed += memNeededToCompressThisBuffer + } catch { + case t: Throwable => + if (tempBuffer != null) { + tempBuffer.safeClose() + } + throw t } } } @@ -173,7 +186,7 @@ abstract class BatchedTableCompressor(maxBatchMemorySize: Long) extends AutoClos var i = 0 try { contigTable.foreach { ct => - addTable(ct) + addTableToCompress(ct) i += 1 } } catch { @@ -206,36 +219,34 @@ abstract class BatchedTableCompressor(maxBatchMemorySize: Long) extends AutoClos results.safeClose() } - private def compressBatch(): Unit = { - if (tables.nonEmpty) { - require(oversizedOutBuffers.length == tables.length) - require(tempBuffers.length == tables.length) - val metas = compress(oversizedOutBuffers.toArray, tables.toArray, tempBuffers.toArray) - require(metas.length == tables.length) - - // copy the output data into correctly-sized buffers - metas.zipWithIndex.foreach { case (meta, i) => - val oversizedBuffer = oversizedOutBuffers(i) - val compressedSize = meta.bufferMeta.size - val buffer = if 
(oversizedBuffer.getLength > compressedSize) { - oversizedBuffer.sliceWithCopy(0, compressedSize) - } else { - // use this buffer as-is, don't close it at the end of this method - oversizedOutBuffers(i) = null - oversizedBuffer - } - results += CompressedTable(compressedSize, meta, buffer) + private def compressBatch(): Unit = if (tables.nonEmpty) { + require(oversizedOutBuffers.length == tables.length) + require(tempBuffers.length == tables.length) + val metas = compress(oversizedOutBuffers.toArray, tables.toArray, tempBuffers.toArray) + require(metas.length == tables.length) + + // copy the output data into correctly-sized buffers + metas.zipWithIndex.foreach { case (meta, i) => + val oversizedBuffer = oversizedOutBuffers(i) + val compressedSize = meta.bufferMeta.size + val buffer = if (oversizedBuffer.getLength > compressedSize) { + oversizedBuffer.sliceWithCopy(0, compressedSize) + } else { + // use this buffer as-is, don't close it at the end of this method + oversizedOutBuffers(i) = null + oversizedBuffer } - - // free all the inputs to this batch - tables.safeClose() - tables.clear() - tempBuffers.safeClose() - tempBuffers.clear() - oversizedOutBuffers.safeClose() - oversizedOutBuffers.clear() - batchMemUsed = 0 + results += CompressedTable(compressedSize, meta, buffer) } + + // free all the inputs to this batch + tables.safeClose() + tables.clear() + tempBuffers.safeClose() + tempBuffers.clear() + oversizedOutBuffers.safeClose() + oversizedOutBuffers.clear() + batchMemUsed = 0 } /** Return the amount of temporary space needed to compress this buffer */ @@ -269,9 +280,16 @@ abstract class BatchedTableCompressor(maxBatchMemorySize: Long) extends AutoClos * be compressed individually. */ abstract class BatchedBufferDecompressor(maxBatchMemorySize: Long) extends AutoCloseable with Arm { + // The buffers of compressed data that will be decompressed in the next batch private[this] val inputBuffers = new ArrayBuffer[DeviceMemoryBuffer] + + // The temporary buffers needed to be decompressed the next batch private[this] val tempBuffers = new ArrayBuffer[DeviceMemoryBuffer] + + // The output buffers that will contain the decompressed data in the next batch private[this] val outputBuffers = new ArrayBuffer[DeviceMemoryBuffer] + + // The decompressed data results for all input buffers across all batches private[this] val results = new ArrayBuffer[DeviceMemoryBuffer] // temporary and output memory being used as part of the current batch @@ -280,7 +298,7 @@ abstract class BatchedBufferDecompressor(maxBatchMemorySize: Long) extends AutoC /** The codec ID corresponding to this decompressor */ val codecId: Byte - def addBufferToDecode(buffer: DeviceMemoryBuffer, meta: BufferMeta): Unit = { + def addBufferToDecompress(buffer: DeviceMemoryBuffer, meta: BufferMeta): Unit = { closeOnExcept(buffer) { buffer => // Only supports a single codec per buffer for now. require(meta.codecBufferDescrsLength == 1) @@ -313,7 +331,7 @@ abstract class BatchedBufferDecompressor(maxBatchMemorySize: Long) extends AutoC /** * This must be called after all buffers to be decompressed have been added to retrieve the * decompression results. 
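+ * The output buffers are returned in the order in which the corresponding inputs
+ * were added, which callers such as GpuCoalesceIterator rely on when they zip the
+ * results back with their batch indices.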
- * @return compressed tables + * @return decompressed tables */ def finish(): Array[DeviceMemoryBuffer] = { // decompress the last batch @@ -347,8 +365,8 @@ abstract class BatchedBufferDecompressor(maxBatchMemorySize: Long) extends AutoC } /** - * Compute the amount of temporary buffer space required to decode a buffer - * @param inputBuffer buffer to decode + * Compute the amount of temporary buffer space required to decompress a buffer + * @param inputBuffer buffer to decompress * @return required temporary buffer space in bytes */ protected def decompressTempSpaceNeeded(inputBuffer: DeviceMemoryBuffer): Long diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/GpuCoalesceBatchesSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/GpuCoalesceBatchesSuite.scala index 6298206bd78..393cfb51ee0 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/GpuCoalesceBatchesSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/GpuCoalesceBatchesSuite.scala @@ -397,7 +397,7 @@ class GpuCoalesceBatchesSuite extends SparkQueryCompareTestSuite { private def buildCompressedBatch(start: Int, numRows: Int): ColumnarBatch = { val codec = TableCompressionCodec.getCodec(CodecType.COPY) withResource(codec.createBatchCompressor(0)) { compressor => - compressor.addTable(buildContiguousTable(start, numRows)) + compressor.addTableToCompress(buildContiguousTable(start, numRows)) GpuCompressedColumnVector.from(compressor.finish().head) } } From 9fc1cb3a5060a5ca0ac83ec8080d792c5a6f2cbd Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Fri, 7 Aug 2020 09:34:35 -0500 Subject: [PATCH 4/7] Fix handling of degenerate batches Signed-off-by: Jason Lowe --- .../com/nvidia/spark/rapids/MetaUtils.scala | 10 ++++-- .../rapids/RapidsShuffleInternalManager.scala | 10 +++--- .../spark/rapids/GpuPartitioningSuite.scala | 6 ++-- .../nvidia/spark/rapids/MetaUtilsSuite.scala | 33 +++++++++++++++++-- 4 files changed, 47 insertions(+), 12 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/MetaUtils.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/MetaUtils.scala index 98779a9908f..ec056faf4e6 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/MetaUtils.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/MetaUtils.scala @@ -132,12 +132,18 @@ object MetaUtils extends Arm { /** * Build a TableMeta message for a degenerate table (zero columns or rows) - * @param tableId the ID to use for this table * @param batch the degenerate columnar batch * @return heap-based flatbuffer message */ - def buildDegenerateTableMeta(tableId: Int, batch: ColumnarBatch): TableMeta = { + def buildDegenerateTableMeta(batch: ColumnarBatch): TableMeta = { require(batch.numRows == 0 || batch.numCols == 0, "batch not degenerate") + if (batch.numCols > 0) { + batch.column(0) match { + case c: GpuCompressedColumnVector => + return c.getTableMeta + case _ => + } + } val fbb = new FlatBufferBuilder(1024) val columnMetaOffset = if (batch.numCols > 0) { val columns = GpuColumnVector.extractBases(batch) diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/RapidsShuffleInternalManager.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/RapidsShuffleInternalManager.scala index 7f3bf5ec90b..2939645fbb8 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/RapidsShuffleInternalManager.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/RapidsShuffleInternalManager.scala @@ -95,10 +95,8 @@ class RapidsCachingWriter[K, V]( val batch = 
p._2.asInstanceOf[ColumnarBatch] logDebug(s"Caching shuffle_id=${handle.shuffleId} map_id=$mapId, partId=$partId, " + s"batch=[num_cols=${batch.numCols()}, num_rows=${batch.numRows()}]") - val partSize = GpuColumnVector.extractBases(batch).map(_.getDeviceMemorySize).sum recordsWritten = recordsWritten + batch.numRows() - bytesWritten = bytesWritten + partSize - sizes(partId) += partSize + var partSize: Long = 0 val blockId = ShuffleBlockId(handle.shuffleId, mapId, partId) val bufferId = catalog.nextShuffleBufferId(blockId) if (batch.numRows > 0 && batch.numCols > 0) { @@ -106,6 +104,7 @@ class RapidsCachingWriter[K, V]( batch.column(0) match { case c: GpuColumnVectorFromBuffer => val buffer = c.getBuffer.slice(0, c.getBuffer.getLength) + partSize = buffer.getLength shuffleStorage.addTable( bufferId, GpuColumnVector.from(batch), @@ -113,6 +112,7 @@ class RapidsCachingWriter[K, V]( SpillPriorities.OUTPUT_FOR_SHUFFLE_INITIAL_PRIORITY) case c: GpuCompressedColumnVector => val buffer = c.getBuffer.slice(0, c.getBuffer.getLength) + partSize = buffer.getLength val tableMeta = c.getTableMeta // update the table metadata for the buffer ID generated above tableMeta.bufferMeta.mutateId(bufferId.tableId) @@ -123,9 +123,11 @@ class RapidsCachingWriter[K, V]( SpillPriorities.OUTPUT_FOR_SHUFFLE_INITIAL_PRIORITY) case c => throw new IllegalStateException(s"Unexpected column type: ${c.getClass}") } + bytesWritten += partSize + sizes(partId) += partSize } else { // no device data, tracking only metadata - val tableMeta = MetaUtils.buildDegenerateTableMeta(bufferId.tableId, batch) + val tableMeta = MetaUtils.buildDegenerateTableMeta(batch) catalog.registerNewBuffer(new DegenerateRapidsBuffer(bufferId, tableMeta)) // The size of the data is really only used to tell if the data should be shuffled or not diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/GpuPartitioningSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/GpuPartitioningSuite.scala index 894e36a3142..c529c73c5d2 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/GpuPartitioningSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/GpuPartitioningSuite.scala @@ -63,7 +63,7 @@ class GpuPartitioningSuite extends FunSuite with Arm { val conf = new SparkConf().set(RapidsConf.SHUFFLE_COMPRESSION_ENABLED.key, "false") TestUtils.withGpuSparkSession(conf) { _ => GpuShuffleEnv.init(new RapidsConf(conf), Cuda.memGetInfo()) - val partitionIndices = Array(0, 2) + val partitionIndices = Array(0, 2, 2) val gp = new GpuPartitioning { override val numPartitions: Int = partitionIndices.length } @@ -99,11 +99,11 @@ class GpuPartitioningSuite extends FunSuite with Arm { .set(RapidsConf.SHUFFLE_COMPRESSION_ENABLED.key, "true") .set(RapidsConf.SHUFFLE_COMPRESSION_CODEC.key, "copy") TestUtils.withGpuSparkSession(conf) { _ => - GpuShuffleEnv.init(Cuda.memGetInfo()) + GpuShuffleEnv.init(new RapidsConf(conf), Cuda.memGetInfo()) val spillPriority = 7L val catalog = new RapidsBufferCatalog withResource(new RapidsDeviceMemoryStore(catalog)) { deviceStore => - val partitionIndices = Array(0, 2) + val partitionIndices = Array(0, 2, 2) val gp = new GpuPartitioning { override val numPartitions: Int = partitionIndices.length } diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/MetaUtilsSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/MetaUtilsSuite.scala index 38c9e6e57a8..2ebb97f3066 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/MetaUtilsSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/MetaUtilsSuite.scala 
@@ -16,7 +16,7 @@ package com.nvidia.spark.rapids -import ai.rapids.cudf.{BufferType, ContiguousTable, Table} +import ai.rapids.cudf.{BufferType, ContiguousTable, DeviceMemoryBuffer, Table} import com.nvidia.spark.rapids.format.{CodecType, ColumnMeta} import org.scalatest.FunSuite @@ -101,7 +101,7 @@ class MetaUtilsSuite extends FunSuite with Arm { test("buildDegenerateTableMeta no columns") { val degenerateBatch = new ColumnarBatch(Array(), 127) - val meta = MetaUtils.buildDegenerateTableMeta(8, degenerateBatch) + val meta = MetaUtils.buildDegenerateTableMeta(degenerateBatch) assertResult(null)(meta.bufferMeta) assertResult(0)(meta.columnMetasLength) assertResult(127)(meta.rowCount) @@ -110,7 +110,7 @@ class MetaUtilsSuite extends FunSuite with Arm { test("buildDegenerateTableMeta no rows") { val schema = StructType.fromDDL("a INT, b STRING, c DOUBLE") withResource(GpuColumnVector.emptyBatch(schema)) { batch => - val meta = MetaUtils.buildDegenerateTableMeta(9, batch) + val meta = MetaUtils.buildDegenerateTableMeta(batch) assertResult(null)(meta.bufferMeta) assertResult(0)(meta.rowCount) assertResult(3)(meta.columnMetasLength) @@ -127,6 +127,33 @@ class MetaUtilsSuite extends FunSuite with Arm { } } + test("buildDegenerateTableMeta no rows compressed table") { + val schema = StructType.fromDDL("a INT, b STRING, c DOUBLE") + withResource(GpuColumnVector.emptyBatch(schema)) { uncompressedBatch => + val uncompressedMeta = MetaUtils.buildDegenerateTableMeta(uncompressedBatch) + withResource(DeviceMemoryBuffer.allocate(0)) { buffer => + val compressedTable = CompressedTable(0, uncompressedMeta, buffer) + withResource(GpuCompressedColumnVector.from(compressedTable)) { batch => + val meta = MetaUtils.buildDegenerateTableMeta(batch) + assertResult(null)(meta.bufferMeta) + assertResult(0)(meta.rowCount) + assertResult(3)(meta.columnMetasLength) + (0 until meta.columnMetasLength).foreach { i => + val columnMeta = meta.columnMetas(i) + assertResult(0)(columnMeta.nullCount) + assertResult(0)(columnMeta.rowCount) + val expectedType = uncompressedBatch.column(i).asInstanceOf[GpuColumnVector] + .getBase.getType + assertResult(expectedType.getNativeId)(columnMeta.dtype) + assertResult(null)(columnMeta.data) + assertResult(null)(columnMeta.validity) + assertResult(null)(columnMeta.offsets) + } + } + } + } + } + test("getBatchFromMeta") { withResource(buildContiguousTable()) { contigTable => val table = contigTable.getTable From 85bcea3d8643121126430853687ea528b5bb15fa Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Fri, 7 Aug 2020 09:59:31 -0500 Subject: [PATCH 5/7] Fix buffer leak Signed-off-by: Jason Lowe --- .../main/scala/com/nvidia/spark/rapids/GpuPartitioning.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuPartitioning.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuPartitioning.scala index db6ae6b11c8..3e09ff0b88e 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuPartitioning.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuPartitioning.scala @@ -54,7 +54,7 @@ trait GpuPartitioning extends Partitioning with Arm { withResource(codec.createBatchCompressor(maxCompressionBatchSize)) { compressor => // batchCompress takes ownership of the contiguous tables and will close compressor.addTables(contiguousTables) - closeOnExcept(compressor.finish()) { compressedTables => + withResource(compressor.finish()) { compressedTables => compressedTables.foreach(ct => 
splits.append(GpuCompressedColumnVector.from(ct))) } } From 1c9b2205a39444a39e67eb4a00efad7316e50e28 Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Fri, 7 Aug 2020 11:17:15 -0500 Subject: [PATCH 6/7] Add comment about degenerate batches potentially appearing compressed Signed-off-by: Jason Lowe --- .../src/main/scala/com/nvidia/spark/rapids/MetaUtils.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/MetaUtils.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/MetaUtils.scala index ec056faf4e6..4996f7f6d15 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/MetaUtils.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/MetaUtils.scala @@ -138,6 +138,8 @@ object MetaUtils extends Arm { def buildDegenerateTableMeta(batch: ColumnarBatch): TableMeta = { require(batch.numRows == 0 || batch.numCols == 0, "batch not degenerate") if (batch.numCols > 0) { + // Batched compression can result in degenerate batches appearing compressed. In that case + // the table metadata has already been computed and can be returned directly. batch.column(0) match { case c: GpuCompressedColumnVector => return c.getTableMeta From 18e4598a9f2b307da0efdee8a260ee25ee100a8a Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Tue, 11 Aug 2020 09:11:38 -0500 Subject: [PATCH 7/7] Use a shuffle compression codec of "none" to indicate no compression Signed-off-by: Jason Lowe --- .../scala/com/nvidia/spark/rapids/RapidsConf.scala | 12 ++---------- .../nvidia/spark/rapids/TableCompressionCodec.scala | 4 +--- .../org/apache/spark/sql/rapids/GpuShuffleEnv.scala | 9 ++++++--- .../nvidia/spark/rapids/GpuPartitioningSuite.scala | 3 +-- .../spark/rapids/GpuSinglePartitioningSuite.scala | 2 +- 5 files changed, 11 insertions(+), 19 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala index f6c5ae2a3e2..2b36a9592de 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala @@ -590,18 +590,12 @@ object RapidsConf { .bytesConf(ByteUnit.BYTE) .createWithDefault(50 * 1024) - val SHUFFLE_COMPRESSION_ENABLED = conf("spark.rapids.shuffle.compression.enabled") - .doc("Whether to enable compression of shuffle buffers when using RAPIDS shuffle") - .internal() - .booleanConf - .createWithDefault(false) - val SHUFFLE_COMPRESSION_CODEC = conf("spark.rapids.shuffle.compression.codec") .doc("The GPU codec used to compress shuffle data when using RAPIDS shuffle. 
" + - "Currently only one codec is supported, copy.") + "Supported codecs: copy, none") .internal() .stringConf - .createWithDefault("copy") + .createWithDefault("none") // USER FACING DEBUG CONFIGS @@ -867,8 +861,6 @@ class RapidsConf(conf: Map[String, String]) extends Logging { lazy val shuffleMaxMetadataSize: Long = get(SHUFFLE_MAX_METADATA_SIZE) - lazy val shuffleCompressionEnabled: Boolean = get(SHUFFLE_COMPRESSION_ENABLED) - lazy val shuffleCompressionCodec: String = get(SHUFFLE_COMPRESSION_CODEC) lazy val shuffleCompressionMaxBatchMemory: Long = get(SHUFFLE_COMPRESSION_MAX_BATCH_MEMORY) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/TableCompressionCodec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/TableCompressionCodec.scala index 5d97c41e82d..c3215887f5e 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/TableCompressionCodec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/TableCompressionCodec.scala @@ -16,8 +16,6 @@ package com.nvidia.spark.rapids -import java.util.Locale - import scala.collection.mutable.ArrayBuffer import ai.rapids.cudf.{ContiguousTable, DeviceMemoryBuffer} @@ -103,7 +101,7 @@ object TableCompressionCodec { /** Get a compression codec by short name or fully qualified class name */ def getCodec(name: String): TableCompressionCodec = { - val codecId = codecNameToId.getOrElse(name.toLowerCase(Locale.ROOT), + val codecId = codecNameToId.getOrElse(name, throw new IllegalArgumentException(s"Unknown table codec: $name")) getCodec(codecId) } diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuShuffleEnv.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuShuffleEnv.scala index 7604239982e..cb38dd9f6db 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuShuffleEnv.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuShuffleEnv.scala @@ -16,6 +16,8 @@ package org.apache.spark.sql.rapids +import java.util.Locale + import ai.rapids.cudf.{CudaMemInfo, Rmm} import com.nvidia.spark.rapids._ @@ -39,10 +41,11 @@ class GpuShuffleEnv(rapidsConf: RapidsConf) extends Logging { } lazy val rapidsShuffleCodec: Option[TableCompressionCodec] = { - if (rapidsConf.shuffleCompressionEnabled) { - Some(TableCompressionCodec.getCodec(rapidsConf.shuffleCompressionCodec)) - } else { + val codecName = rapidsConf.shuffleCompressionCodec.toLowerCase(Locale.ROOT) + if (codecName == "none") { None + } else { + Some(TableCompressionCodec.getCodec(codecName)) } } diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/GpuPartitioningSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/GpuPartitioningSuite.scala index c529c73c5d2..2fd73d0a29a 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/GpuPartitioningSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/GpuPartitioningSuite.scala @@ -60,7 +60,7 @@ class GpuPartitioningSuite extends FunSuite with Arm { test("GPU partition") { SparkSession.getActiveSession.foreach(_.close()) - val conf = new SparkConf().set(RapidsConf.SHUFFLE_COMPRESSION_ENABLED.key, "false") + val conf = new SparkConf().set(RapidsConf.SHUFFLE_COMPRESSION_CODEC.key, "none") TestUtils.withGpuSparkSession(conf) { _ => GpuShuffleEnv.init(new RapidsConf(conf), Cuda.memGetInfo()) val partitionIndices = Array(0, 2, 2) @@ -96,7 +96,6 @@ class GpuPartitioningSuite extends FunSuite with Arm { test("GPU partition with compression") { val conf = new SparkConf() - .set(RapidsConf.SHUFFLE_COMPRESSION_ENABLED.key, "true") 
.set(RapidsConf.SHUFFLE_COMPRESSION_CODEC.key, "copy") TestUtils.withGpuSparkSession(conf) { _ => GpuShuffleEnv.init(new RapidsConf(conf), Cuda.memGetInfo()) diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/GpuSinglePartitioningSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/GpuSinglePartitioningSuite.scala index 7ffecdb8a87..cf05e9b11de 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/GpuSinglePartitioningSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/GpuSinglePartitioningSuite.scala @@ -36,7 +36,7 @@ class GpuSinglePartitioningSuite extends FunSuite with Arm { test("generates contiguous split uncompressed") { val conf = new SparkConf().set("spark.shuffle.manager", GpuShuffleEnv.RAPIDS_SHUFFLE_CLASS) - .set(RapidsConf.SHUFFLE_COMPRESSION_ENABLED.key, "false") + .set(RapidsConf.SHUFFLE_COMPRESSION_CODEC.key, "none") TestUtils.withGpuSparkSession(conf) { _ => GpuShuffleEnv.init(new RapidsConf(conf), Cuda.memGetInfo()) val partitioner = GpuSinglePartitioning(Nil)