diff --git a/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVector.java b/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVector.java index 8cc1aca2319..74796816f67 100644 --- a/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVector.java +++ b/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVector.java @@ -26,10 +26,7 @@ import org.apache.spark.sql.execution.vectorized.WritableColumnVector; import org.apache.spark.sql.types.*; import org.apache.spark.sql.vectorized.ColumnVector; -import org.apache.spark.sql.vectorized.ColumnarArray; import org.apache.spark.sql.vectorized.ColumnarBatch; -import org.apache.spark.sql.vectorized.ColumnarMap; -import org.apache.spark.unsafe.types.UTF8String; import java.util.List; @@ -39,7 +36,7 @@ * is on the host, and we want to keep as much of the data on the device as possible. * We also provide GPU accelerated versions of the transitions to and from rows. */ -public class GpuColumnVector extends ColumnVector { +public class GpuColumnVector extends GpuColumnVectorBase { public static final class GpuColumnarBatchBuilder implements AutoCloseable { private final ai.rapids.cudf.HostColumnVector.Builder[] builders; @@ -174,7 +171,7 @@ public static final DType getRapidsType(DataType type) { return result; } - protected static final DataType getSparkType(DType type) { + static final DataType getSparkType(DType type) { switch (type) { case BOOL8: return DataTypes.BooleanType; @@ -396,78 +393,6 @@ public final int numNulls() { return (int) cudfCv.getNullCount(); } - private final static String BAD_ACCESS = "DATA ACCESS MUST BE ON A HOST VECTOR"; - - @Override - public final boolean isNullAt(int rowId) { - throw new IllegalStateException(BAD_ACCESS); - } - - @Override - public final boolean getBoolean(int rowId) { - throw new IllegalStateException(BAD_ACCESS); - } - - @Override - public final byte getByte(int rowId) { - throw new IllegalStateException(BAD_ACCESS); - } - - @Override - public final short getShort(int rowId) { - throw new IllegalStateException(BAD_ACCESS); - } - - @Override - public final int getInt(int rowId) { - throw new IllegalStateException(BAD_ACCESS); - } - - @Override - public final long getLong(int rowId) { - throw new IllegalStateException(BAD_ACCESS); - } - - @Override - public final float getFloat(int rowId) { - throw new IllegalStateException(BAD_ACCESS); - } - - @Override - public final double getDouble(int rowId) { - throw new IllegalStateException(BAD_ACCESS); - } - - @Override - public final ColumnarArray getArray(int rowId) { - throw new IllegalStateException(BAD_ACCESS); - } - - @Override - public final ColumnarMap getMap(int ordinal) { - throw new IllegalStateException(BAD_ACCESS); - } - - @Override - public final Decimal getDecimal(int rowId, int precision, int scale) { - throw new IllegalStateException(BAD_ACCESS); - } - - @Override - public final UTF8String getUTF8String(int rowId) { - throw new IllegalStateException(BAD_ACCESS); - } - - @Override - public final byte[] getBinary(int rowId) { - throw new IllegalStateException(BAD_ACCESS); - } - - @Override - public final ColumnVector getChild(int ordinal) { - throw new IllegalStateException(BAD_ACCESS); - } - public static final long getTotalDeviceMemoryUsed(ColumnarBatch batch) { long sum = 0; for (int i = 0; i < batch.numCols(); i++) { diff --git a/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVectorBase.java b/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVectorBase.java new file mode 100644 index 
00000000000..85b3e9098c1 --- /dev/null +++ b/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVectorBase.java @@ -0,0 +1,103 @@ +/* + * Copyright (c) 2020, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids; + +import ai.rapids.cudf.DType; +import org.apache.spark.sql.types.*; +import org.apache.spark.sql.vectorized.ColumnVector; +import org.apache.spark.sql.vectorized.ColumnarArray; +import org.apache.spark.sql.vectorized.ColumnarMap; +import org.apache.spark.unsafe.types.UTF8String; + +/** Base class for all GPU column vectors. */ +abstract class GpuColumnVectorBase extends ColumnVector { + private final static String BAD_ACCESS = "DATA ACCESS MUST BE ON A HOST VECTOR"; + + protected GpuColumnVectorBase(DataType type) { + super(type); + } + + @Override + public final boolean isNullAt(int rowId) { + throw new IllegalStateException(BAD_ACCESS); + } + + @Override + public final boolean getBoolean(int rowId) { + throw new IllegalStateException(BAD_ACCESS); + } + + @Override + public final byte getByte(int rowId) { + throw new IllegalStateException(BAD_ACCESS); + } + + @Override + public final short getShort(int rowId) { + throw new IllegalStateException(BAD_ACCESS); + } + + @Override + public final int getInt(int rowId) { + throw new IllegalStateException(BAD_ACCESS); + } + + @Override + public final long getLong(int rowId) { + throw new IllegalStateException(BAD_ACCESS); + } + + @Override + public final float getFloat(int rowId) { + throw new IllegalStateException(BAD_ACCESS); + } + + @Override + public final double getDouble(int rowId) { + throw new IllegalStateException(BAD_ACCESS); + } + + @Override + public final ColumnarArray getArray(int rowId) { + throw new IllegalStateException(BAD_ACCESS); + } + + @Override + public final ColumnarMap getMap(int ordinal) { + throw new IllegalStateException(BAD_ACCESS); + } + + @Override + public final Decimal getDecimal(int rowId, int precision, int scale) { + throw new IllegalStateException(BAD_ACCESS); + } + + @Override + public final UTF8String getUTF8String(int rowId) { + throw new IllegalStateException(BAD_ACCESS); + } + + @Override + public final byte[] getBinary(int rowId) { + throw new IllegalStateException(BAD_ACCESS); + } + + @Override + public final ColumnVector getChild(int ordinal) { + throw new IllegalStateException(BAD_ACCESS); + } +} diff --git a/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuCompressedColumnVector.java b/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuCompressedColumnVector.java new file mode 100644 index 00000000000..ea7e16e9d56 --- /dev/null +++ b/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuCompressedColumnVector.java @@ -0,0 +1,95 @@ +/* + * Copyright (c) 2020, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids; + +import ai.rapids.cudf.DType; +import ai.rapids.cudf.DeviceMemoryBuffer; +import com.nvidia.spark.rapids.format.ColumnMeta; +import com.nvidia.spark.rapids.format.TableMeta; +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.vectorized.ColumnVector; +import org.apache.spark.sql.vectorized.ColumnarBatch; + +/** + * A GPU column vector that has been compressed. The columnar data within cannot + * be accessed directly. This class primarily serves the role of tracking the + * compressed data and table metadata so it can be decompressed later. + */ +public final class GpuCompressedColumnVector extends GpuColumnVectorBase { + private final DeviceMemoryBuffer buffer; + private final TableMeta tableMeta; + + public static ColumnarBatch from(CompressedTable compressedTable) { + DeviceMemoryBuffer buffer = compressedTable.buffer(); + TableMeta tableMeta = compressedTable.meta(); + long rows = tableMeta.rowCount(); + if (rows != (int) rows) { + throw new IllegalStateException("Cannot support a batch larger than MAX INT rows"); + } + + ColumnMeta columnMeta = new ColumnMeta(); + int numColumns = tableMeta.columnMetasLength(); + ColumnVector[] columns = new ColumnVector[numColumns]; + try { + for (int i = 0; i < numColumns; ++i) { + tableMeta.columnMetas(columnMeta, i); + DType dtype = DType.fromNative(columnMeta.dtype()); + DataType type = GpuColumnVector.getSparkType(dtype); + DeviceMemoryBuffer slicedBuffer = buffer.slice(0, buffer.getLength()); + columns[i] = new GpuCompressedColumnVector(type, slicedBuffer, tableMeta); + } + } catch (Throwable t) { + for (int i = 0; i < numColumns; ++i) { + if (columns[i] != null) { + columns[i].close(); + } + } + throw t; + } + + return new ColumnarBatch(columns, (int) rows); + } + + private GpuCompressedColumnVector(DataType type, DeviceMemoryBuffer buffer, TableMeta tableMeta) { + super(type); + this.buffer = buffer; + this.tableMeta = tableMeta; + } + + public DeviceMemoryBuffer getBuffer() { + return buffer; + } + + public TableMeta getTableMeta() { + return tableMeta; + } + + @Override + public void close() { + buffer.close(); + } + + @Override + public boolean hasNull() { + throw new IllegalStateException("column vector is compressed"); + } + + @Override + public int numNulls() { + throw new IllegalStateException("column vector is compressed"); + } +} diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/CopyCompressionCodec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/CopyCompressionCodec.scala new file mode 100644 index 00000000000..e658eedbb63 --- /dev/null +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/CopyCompressionCodec.scala @@ -0,0 +1,104 @@ +/* + * Copyright (c) 2020, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids + +import ai.rapids.cudf.{ContiguousTable, Cuda, DeviceMemoryBuffer} +import com.nvidia.spark.rapids.format.{CodecType, TableMeta} + +/** A table compression codec used only for testing that copies the data. */ +class CopyCompressionCodec extends TableCompressionCodec with Arm { + override val name: String = "COPY" + override val codecId: Byte = CodecType.COPY + + override def compress( + tableId: Int, + contigTable: ContiguousTable): CompressedTable = { + val buffer = contigTable.getBuffer + closeOnExcept(buffer.sliceWithCopy(0, buffer.getLength)) { outputBuffer => + val meta = MetaUtils.buildTableMeta( + tableId, + contigTable.getTable, + buffer, + codecId, + outputBuffer.getLength) + CompressedTable(buffer.getLength, meta, outputBuffer) + } + } + + override def decompressBuffer( + outputBuffer: DeviceMemoryBuffer, + outputOffset: Long, + outputLength: Long, + inputBuffer: DeviceMemoryBuffer, + inputOffset: Long, + inputLength: Long): Unit = { + require(outputLength == inputLength) + outputBuffer.copyFromDeviceBufferAsync( + outputOffset, + inputBuffer, + inputOffset, + inputLength, + Cuda.DEFAULT_STREAM) + } + + override def createBatchCompressor(maxBatchMemorySize: Long): BatchedTableCompressor = + new BatchedCopyCompressor(maxBatchMemorySize) + + override def createBatchDecompressor(maxBatchMemorySize: Long): BatchedBufferDecompressor = + new BatchedCopyDecompressor(maxBatchMemorySize) +} + +class BatchedCopyCompressor(maxBatchMemory: Long) extends BatchedTableCompressor(maxBatchMemory) { + override protected def getTempSpaceNeeded(buffer: DeviceMemoryBuffer): Long = 0 + + override protected def getOutputSpaceNeeded( + dataBuffer: DeviceMemoryBuffer, + tempBuffer: DeviceMemoryBuffer): Long = dataBuffer.getLength + + override protected def compress( + outputBuffers: Array[DeviceMemoryBuffer], + tables: Array[ContiguousTable], + tempBuffers: Array[DeviceMemoryBuffer]): Array[TableMeta] = { + outputBuffers.zip(tables).map { case (outBuffer, ct) => + val inBuffer = ct.getBuffer + outBuffer.copyFromDeviceBufferAsync(0, inBuffer, 0, inBuffer.getLength, Cuda.DEFAULT_STREAM) + MetaUtils.buildTableMeta( + 0, + ct.getTable, + inBuffer, + CodecType.COPY, + outBuffer.getLength) + } + } +} + +class BatchedCopyDecompressor(maxBatchMemory: Long) + extends BatchedBufferDecompressor(maxBatchMemory) { + override val codecId: Byte = CodecType.COPY + + override def decompressTempSpaceNeeded(inputBuffer: DeviceMemoryBuffer): Long = 0 + + override def decompress( + outputBuffers: Array[DeviceMemoryBuffer], + inputBuffers: Array[DeviceMemoryBuffer], + tempBuffers: Array[DeviceMemoryBuffer]): Unit = { + outputBuffers.zip(inputBuffers).foreach { case (outputBuffer, inputBuffer) => + outputBuffer.copyFromDeviceBufferAsync(0, inputBuffer, 0, + outputBuffer.getLength, Cuda.DEFAULT_STREAM) + } + } +} diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCoalesceBatches.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCoalesceBatches.scala index 22028e9321a..812c3ece914 100644 --- 
a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCoalesceBatches.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCoalesceBatches.scala @@ -19,6 +19,7 @@ package com.nvidia.spark.rapids import scala.collection.mutable.ArrayBuffer import ai.rapids.cudf.{BufferType, NvtxColor, Table} +import com.nvidia.spark.rapids.format.{ColumnMeta, SubBufferMeta, TableMeta} import org.apache.spark.TaskContext import org.apache.spark.internal.Logging @@ -364,6 +365,7 @@ abstract class AbstractGpuCoalesceIterator(origIter: Iterator[ColumnarBatch], class GpuCoalesceIterator(iter: Iterator[ColumnarBatch], schema: StructType, goal: CoalesceGoal, + maxDecompressBatchMemory: Long, numInputRows: SQLMetric, numInputBatches: SQLMetric, numOutputRows: SQLMetric, @@ -384,21 +386,75 @@ class GpuCoalesceIterator(iter: Iterator[ColumnarBatch], concatTime, totalTime, peakDevMemory, - opName) { + opName) with Arm { private var batches: ArrayBuffer[ColumnarBatch] = ArrayBuffer.empty private var maxDeviceMemory: Long = 0 - override def initNewBatch(): Unit = - batches = ArrayBuffer[ColumnarBatch]() + // batch indices that are compressed batches + private[this] var compressedBatchIndices: ArrayBuffer[Int] = ArrayBuffer.empty - override def addBatchToConcat(batch: ColumnarBatch): Unit = + private[this] var codec: TableCompressionCodec = _ + + override def initNewBatch(): Unit = { + batches.clear() + compressedBatchIndices.clear() + } + + override def addBatchToConcat(batch: ColumnarBatch): Unit = { + if (isBatchCompressed(batch)) { + compressedBatchIndices += batches.size + } batches += batch + } + + private def isBatchCompressed(batch: ColumnarBatch): Boolean = { + if (batch.numCols == 0) { + false + } else { + batch.column(0) match { + case _: GpuCompressedColumnVector => true + case _ => false + } + } + } - override def getColumnSizes(cb: ColumnarBatch): Array[Long] = - GpuColumnVector.extractBases(cb).map(_.getDeviceMemorySize) + private def getUncompressedColumnSizes(tableMeta: TableMeta): Array[Long] = { + val numCols = tableMeta.columnMetasLength + val columnMeta = new ColumnMeta + val subBufferMetaObj = new SubBufferMeta + val sizes = new Array[Long](numCols) + (0 until numCols).foreach { i => + tableMeta.columnMetas(columnMeta, i) + var subBuffer = columnMeta.data(subBufferMetaObj) + if (subBuffer != null) { + sizes(i) += subBuffer.length + } + subBuffer = columnMeta.offsets(subBufferMetaObj) + if (subBuffer != null) { + sizes(i) += subBuffer.length + } + subBuffer = columnMeta.validity(subBufferMetaObj) + if (subBuffer != null) { + sizes(i) += subBuffer.length + } + } + sizes + } + + override def getColumnSizes(cb: ColumnarBatch): Array[Long] = { + if (!isBatchCompressed(cb)) { + GpuColumnVector.extractBases(cb).map(_.getDeviceMemorySize) + } else { + val compressedVector = cb.column(0).asInstanceOf[GpuCompressedColumnVector] + val tableMeta = compressedVector.getTableMeta + require(tableMeta.columnMetasLength == cb.numCols) + getUncompressedColumnSizes(tableMeta) + } + } override def concatAllAndPutOnGPU(): ColumnarBatch = { + decompressBatches() val tmp = batches.toArray // Clear the buffer so we don't close it again (buildNonEmptyBatch closed it for us). 
batches = ArrayBuffer.empty @@ -408,6 +464,35 @@ class GpuCoalesceIterator(iter: Iterator[ColumnarBatch], ret } + private def decompressBatches(): Unit = { + if (compressedBatchIndices.nonEmpty) { + val compressedVecs = compressedBatchIndices.map { batchIndex => + batches(batchIndex).column(0).asInstanceOf[GpuCompressedColumnVector] + } + if (codec == null) { + val descr = compressedVecs.head.getTableMeta.bufferMeta.codecBufferDescrs(0) + codec = TableCompressionCodec.getCodec(descr.codec) + } + withResource(codec.createBatchDecompressor(maxDecompressBatchMemory)) { decompressor => + compressedVecs.foreach { cv => + val bufferMeta = cv.getTableMeta.bufferMeta + // don't currently support switching codecs when partitioning + val buffer = cv.getBuffer.slice(0, cv.getBuffer.getLength) + decompressor.addBufferToDecompress(buffer, bufferMeta) + } + closeOnExcept(decompressor.finish()) { outputBuffers => + outputBuffers.zipWithIndex.foreach { case (outputBuffer, outputIndex) => + val cv = compressedVecs(outputIndex) + val batchIndex = compressedBatchIndices(outputIndex) + val compressedBatch = batches(batchIndex) + batches(batchIndex) = MetaUtils.getBatchFromMeta(outputBuffer, cv.getTableMeta) + compressedBatch.close() + } + } + } + } + } + override def cleanupConcatIsDone(): Unit = { peakDevMemory.set(maxDeviceMemory) batches.foreach(_.close()) @@ -416,8 +501,11 @@ class GpuCoalesceIterator(iter: Iterator[ColumnarBatch], case class GpuCoalesceBatches(child: SparkPlan, goal: CoalesceGoal) extends UnaryExecNode with GpuExec { - import GpuMetricNames._ + private[this] val maxDecompressBatchMemory = + new RapidsConf(child.conf).shuffleCompressionMaxBatchMemory + + import GpuMetricNames._ override lazy val additionalMetrics: Map[String, SQLMetric] = Map( NUM_INPUT_ROWS -> SQLMetrics.createMetric(sparkContext, DESCRIPTION_NUM_INPUT_ROWS), NUM_INPUT_BATCHES -> SQLMetrics.createMetric(sparkContext, DESCRIPTION_NUM_INPUT_BATCHES), @@ -447,7 +535,7 @@ case class GpuCoalesceBatches(child: SparkPlan, goal: CoalesceGoal) val batches = child.executeColumnar() batches.mapPartitions { iter => if (child.schema.nonEmpty) { - new GpuCoalesceIterator(iter, schema, goal, + new GpuCoalesceIterator(iter, schema, goal, maxDecompressBatchMemory, numInputRows, numInputBatches, numOutputRows, numOutputBatches, collectTime, concatTime, totalTime, peakDevMemory, "GpuCoalesceBatches") } else { diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuPartitioning.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuPartitioning.scala index 43c1092a669..d0a28e2a5ad 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuPartitioning.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuPartitioning.scala @@ -18,15 +18,18 @@ package com.nvidia.spark.rapids import scala.collection.mutable.ArrayBuffer -import ai.rapids.cudf.{ContiguousTable, NvtxColor, NvtxRange, Table} +import ai.rapids.cudf.{NvtxColor, NvtxRange, Table} import com.nvidia.spark.rapids.RapidsPluginImplicits._ import org.apache.spark.TaskContext import org.apache.spark.sql.catalyst.plans.physical.Partitioning +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.rapids.GpuShuffleEnv import org.apache.spark.sql.vectorized.ColumnarBatch -trait GpuPartitioning extends Partitioning { +trait GpuPartitioning extends Partitioning with Arm { + private[this] val maxCompressionBatchSize = + new RapidsConf(SQLConf.get).shuffleCompressionMaxBatchMemory def sliceBatch(vectors: Array[RapidsHostColumnVector], start: Int, 
end: Int): ColumnarBatch = { var ret: ColumnarBatch = null @@ -43,24 +46,25 @@ trait GpuPartitioning extends Partitioning { // The first index will always be 0, so we need to skip it. val batches = if (numRows > 0) { val parts = partitionIndexes.slice(1, partitionIndexes.length) - val splits = new ArrayBuffer[ColumnarBatch](numPartitions) - val table = new Table(partitionColumns.map(_.getBase).toArray: _*) - val contiguousTables: Array[ContiguousTable] = try { - table.contiguousSplit(parts: _*) - } finally { - table.close() - } - var succeeded = false - try { - contiguousTables.foreach { ct => splits.append(GpuColumnVectorFromBuffer.from(ct)) } - succeeded = true - } finally { - contiguousTables.foreach(_.close()) - if (!succeeded) { - splits.foreach(_.close()) + closeOnExcept(new ArrayBuffer[ColumnarBatch](numPartitions)) { splits => + val table = new Table(partitionColumns.map(_.getBase).toArray: _*) + val contiguousTables = withResource(table)(t => t.contiguousSplit(parts: _*)) + GpuShuffleEnv.rapidsShuffleCodec match { + case Some(codec) => + withResource(codec.createBatchCompressor(maxCompressionBatchSize)) { compressor => + // batchCompress takes ownership of the contiguous tables and will close + compressor.addTables(contiguousTables) + withResource(compressor.finish()) { compressedTables => + compressedTables.foreach(ct => splits.append(GpuCompressedColumnVector.from(ct))) + } + } + case None => + withResource(contiguousTables) { cts => + cts.foreach { ct => splits.append(GpuColumnVectorFromBuffer.from(ct)) } + } } + splits.toArray } - splits.toArray } else { Array[ColumnarBatch]() } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/MetaUtils.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/MetaUtils.scala index 98779a9908f..4996f7f6d15 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/MetaUtils.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/MetaUtils.scala @@ -132,12 +132,20 @@ object MetaUtils extends Arm { /** * Build a TableMeta message for a degenerate table (zero columns or rows) - * @param tableId the ID to use for this table * @param batch the degenerate columnar batch * @return heap-based flatbuffer message */ - def buildDegenerateTableMeta(tableId: Int, batch: ColumnarBatch): TableMeta = { + def buildDegenerateTableMeta(batch: ColumnarBatch): TableMeta = { require(batch.numRows == 0 || batch.numCols == 0, "batch not degenerate") + if (batch.numCols > 0) { + // Batched compression can result in degenerate batches appearing compressed. In that case + // the table metadata has already been computed and can be returned directly. + batch.column(0) match { + case c: GpuCompressedColumnVector => + return c.getTableMeta + case _ => + } + } val fbb = new FlatBufferBuilder(1024) val columnMetaOffset = if (batch.numCols > 0) { val columns = GpuColumnVector.extractBases(batch) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBuffer.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBuffer.scala index 8122b9c2c30..097f12e2621 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBuffer.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBuffer.scala @@ -76,6 +76,7 @@ trait RapidsBuffer extends AutoCloseable { * successfully acquired the buffer beforehand. * @see [[addReference]] * @note It is the responsibility of the caller to close the batch. + * @note This may be an expensive operation (e.g.: batch may need to be decompressed). 
*/ def getColumnarBatch: ColumnarBatch diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferStore.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferStore.scala index 26eb486cba1..ac44dbb2828 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferStore.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferStore.scala @@ -22,7 +22,7 @@ import java.util.concurrent.atomic.AtomicLong import ai.rapids.cudf.{Cuda, DeviceMemoryBuffer, HostMemoryBuffer, NvtxColor, NvtxRange} import com.nvidia.spark.rapids.StorageTier.StorageTier -import com.nvidia.spark.rapids.format.TableMeta +import com.nvidia.spark.rapids.format.{BufferMeta, CodecBufferDescriptor, CodecType, TableMeta} import org.apache.spark.internal.Logging import org.apache.spark.sql.vectorized.ColumnarBatch @@ -236,7 +236,7 @@ abstract class RapidsBufferStore( override val id: RapidsBufferId, override val size: Long, override val meta: TableMeta, - initialSpillPriority: Long) extends RapidsBuffer { + initialSpillPriority: Long) extends RapidsBuffer with Arm { private[this] var isValid = true protected[this] var refcount = 0 private[this] var spillPriority: Long = initialSpillPriority @@ -267,7 +267,7 @@ abstract class RapidsBufferStore( // allocated. Allocations can trigger synchronous spills which can // deadlock if another thread holds the device store lock and is trying // to spill to this store. - val deviceBuffer = DeviceMemoryBuffer.allocate(size) + var deviceBuffer = DeviceMemoryBuffer.allocate(size) try { val buffer = getMemoryBuffer try { @@ -281,6 +281,13 @@ abstract class RapidsBufferStore( } finally { buffer.close() } + + if (meta.bufferMeta.codecBufferDescrsLength > 0) { + withResource(deviceBuffer) { compressedBuffer => + deviceBuffer = uncompressBuffer(compressedBuffer, meta.bufferMeta) + } + } + MetaUtils.getBatchFromMeta(deviceBuffer, meta) } finally { deviceBuffer.close() @@ -336,6 +343,42 @@ abstract class RapidsBufferStore( } } + protected def uncompressBuffer( + compressedBuffer: DeviceMemoryBuffer, + meta: BufferMeta): DeviceMemoryBuffer = { + closeOnExcept(DeviceMemoryBuffer.allocate(meta.uncompressedSize)) { uncompressedBuffer => + val cbd = new CodecBufferDescriptor + (0 until meta.codecBufferDescrsLength).foreach { i => + meta.codecBufferDescrs(cbd, i) + if (cbd.codec == CodecType.UNCOMPRESSED) { + uncompressedBuffer.copyFromDeviceBufferAsync( + cbd.uncompressedOffset, + compressedBuffer, + cbd.compressedOffset, + cbd.compressedSize, + Cuda.DEFAULT_STREAM) + } else { + val startTime = System.nanoTime() + val codec = TableCompressionCodec.getCodec(cbd.codec) + codec.decompressBuffer( + uncompressedBuffer, + cbd.uncompressedOffset, + cbd.uncompressedSize, + compressedBuffer, + cbd.compressedOffset, + cbd.compressedSize) + val duration = System.nanoTime() - startTime + val compressedSize = cbd.compressedSize() + val uncompressedSize = cbd.uncompressedSize + logDebug(s"Decompressed buffer with ${codec.name} in ${duration / 1000} us," + + s"rate=${compressedSize.toFloat / duration} GB/s " + + s"from $compressedSize to $uncompressedSize") + } + } + uncompressedBuffer + } + } + override def toString: String = s"$name buffer size=$size" } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala index f7af1402998..a0dc79d4009 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala +++ 
b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala @@ -601,8 +601,21 @@ object RapidsConf { .bytesConf(ByteUnit.BYTE) .createWithDefault(50 * 1024) + val SHUFFLE_COMPRESSION_CODEC = conf("spark.rapids.shuffle.compression.codec") + .doc("The GPU codec used to compress shuffle data when using RAPIDS shuffle. " + + "Supported codecs: copy, none") + .internal() + .stringConf + .createWithDefault("none") + // USER FACING DEBUG CONFIGS + val SHUFFLE_COMPRESSION_MAX_BATCH_MEMORY = + conf("spark.rapids.shuffle.compression.maxBatchMemory") + .internal() + .bytesConf(ByteUnit.BYTE) + .createWithDefault(1024 * 1024 * 1024) + val EXPLAIN = conf("spark.rapids.sql.explain") .doc("Explain why some parts of a query were not placed on a GPU or not. Possible " + "values are ALL: print everything, NONE: print nothing, NOT_ON_GPU: print only parts of " + @@ -861,6 +874,10 @@ class RapidsConf(conf: Map[String, String]) extends Logging { lazy val shuffleMaxMetadataSize: Long = get(SHUFFLE_MAX_METADATA_SIZE) + lazy val shuffleCompressionCodec: String = get(SHUFFLE_COMPRESSION_CODEC) + + lazy val shuffleCompressionMaxBatchMemory: Long = get(SHUFFLE_COMPRESSION_MAX_BATCH_MEMORY) + def isOperatorEnabled(key: String, incompat: Boolean, isDisabledByDefault: Boolean): Boolean = { val default = !(isDisabledByDefault || incompat) || (incompat && isIncompatEnabled) conf.get(key).map(toBoolean(_, key)).getOrElse(default) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsDeviceMemoryStore.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsDeviceMemoryStore.scala index 931a76a9f95..48845f6c5f3 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsDeviceMemoryStore.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsDeviceMemoryStore.scala @@ -140,7 +140,12 @@ class RapidsDeviceMemoryStore( if (table.isDefined) { GpuColumnVector.from(table.get) //REFCOUNT ++ of all columns } else { - throw new UnsupportedOperationException("compressed buffer support not implemented") + val uncompressedBuffer = uncompressBuffer(contigBuffer, meta.bufferMeta) + try { + MetaUtils.getBatchFromMeta(uncompressedBuffer, meta) + } finally { + uncompressedBuffer.close() + } } } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/TableCompressionCodec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/TableCompressionCodec.scala new file mode 100644 index 00000000000..c3215887f5e --- /dev/null +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/TableCompressionCodec.scala @@ -0,0 +1,383 @@ +/* + * Copyright (c) 2020, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.nvidia.spark.rapids + +import scala.collection.mutable.ArrayBuffer + +import ai.rapids.cudf.{ContiguousTable, DeviceMemoryBuffer} +import com.nvidia.spark.rapids.RapidsPluginImplicits._ +import com.nvidia.spark.rapids.format.{BufferMeta, CodecType, TableMeta} + +/** + * Compressed table descriptor + * @note the buffer may be significantly oversized for the amount of compressed data + * @param compressedSize size of the compressed data in bytes + * @param meta metadata describing the table layout when uncompressed + * @param buffer buffer containing the compressed data + */ +case class CompressedTable( + compressedSize: Long, + meta: TableMeta, + buffer: DeviceMemoryBuffer) extends AutoCloseable { + override def close(): Unit = buffer.close() +} + +/** An interface to a compression codec that can compress a contiguous Table on the GPU */ +trait TableCompressionCodec { + /** The name of the codec, used for logging. */ + val name: String + + /** The ID used for this codec. See the definitions in `CodecType`. */ + val codecId: Byte + + /** + * Compress a contiguous table. + * @note The contiguous table is NOT closed by this operation and must be closed separately. + * @note The compressed buffer MAY NOT be ideally sized to the compressed data. It may be + * significantly larger than the size of the compressed data. Releasing this unused + * memory will require making a copy of the data to a buffer of the appropriate size. + * @param tableId ID to use for this table + * @param contigTable contiguous table to compress + * @return compressed table + */ + def compress(tableId: Int, contigTable: ContiguousTable): CompressedTable + + /** + * Decompress the compressed data buffer from a table compression operation. + * @note The compressed buffer is NOT closed by this method. + * @param outputBuffer buffer where uncompressed data will be written + * @param outputOffset offset in the uncompressed buffer to start writing data + * @param outputLength expected length of the uncompressed data in bytes + * @param inputBuffer buffer containing the compressed data + * @param inputOffset offset in the compressed buffer where compressed data starts + * @param inputLength length of the compressed data in bytes + */ + def decompressBuffer( + outputBuffer: DeviceMemoryBuffer, + outputOffset: Long, + outputLength: Long, + inputBuffer: DeviceMemoryBuffer, + inputOffset: Long, + inputLength: Long): Unit + + /** + * Create a batched compressor instance + * @param maxBatchMemorySize The upper limit in bytes of temporary and output memory usage at + * which a batch should be compressed. A single table that requires + * temporary and output memory above this limit is allowed but will + * be compressed individually. + * @return batched compressor instance + */ + def createBatchCompressor(maxBatchMemorySize: Long): BatchedTableCompressor + + /** + * Create a batched decompressor instance + * @param maxBatchMemorySize The upper limit in bytes of temporary and output memory usage at + * which a batch should be decompressed. A single buffer that requires + * temporary and output memory above this limit is allowed but will + * be decompressed individually. 
+ * @return batched decompressor instance + */ + def createBatchDecompressor(maxBatchMemorySize: Long): BatchedBufferDecompressor +} + +object TableCompressionCodec { + private val codecNameToId = Map( + "copy" -> CodecType.COPY) + + /** Get a compression codec by short name or fully qualified class name */ + def getCodec(name: String): TableCompressionCodec = { + val codecId = codecNameToId.getOrElse(name, + throw new IllegalArgumentException(s"Unknown table codec: $name")) + getCodec(codecId) + } + + /** Get a compression codec by ID, using a cache. */ + def getCodec(codecId: Byte): TableCompressionCodec = { + codecId match { + case CodecType.COPY => new CopyCompressionCodec + case _ => throw new IllegalArgumentException(s"Unknown codec ID: $codecId") + } + } +} + +/** + * Base class for batched compressors + * @param maxBatchMemorySize The upper limit in bytes of temporary and output memory usage at + * which a batch should be compressed. A single table that requires + * temporary and output memory above this limit is allowed but will + * be compressed individually. + */ +abstract class BatchedTableCompressor(maxBatchMemorySize: Long) extends AutoCloseable with Arm { + // The tables that need to be compressed in the next batch + private[this] val tables = new ArrayBuffer[ContiguousTable] + + // The temporary compression buffers needed to compress each table in the next batch + private[this] val tempBuffers = new ArrayBuffer[DeviceMemoryBuffer] + + // The estimate-sized output buffers to hold the compressed output in the next batch + private[this] val oversizedOutBuffers = new ArrayBuffer[DeviceMemoryBuffer] + + // The compressed outputs of all tables across all batches + private[this] val results = new ArrayBuffer[CompressedTable] + + // temporary and output memory being used as part of the current batch + private[this] var batchMemUsed: Long = 0 + + /** + * Add a contiguous table to be batch-compressed. Ownership of the table is transferred to the + * batch compressor which is responsible for closing the table. + * @param contigTable the contiguous table to be compressed + */ + def addTableToCompress(contigTable: ContiguousTable): Unit = { + closeOnExcept(contigTable) { contigTable => + val tempSize = getTempSpaceNeeded(contigTable.getBuffer) + var memNeededToCompressThisBuffer = tempSize + if (batchMemUsed + memNeededToCompressThisBuffer > maxBatchMemorySize) { + compressBatch() + } + val tempBuffer = if (tempSize > 0) { + DeviceMemoryBuffer.allocate(memNeededToCompressThisBuffer) + } else { + null + } + try { + val outputSize = getOutputSpaceNeeded(contigTable.getBuffer, tempBuffer) + memNeededToCompressThisBuffer += outputSize + if (batchMemUsed + memNeededToCompressThisBuffer > maxBatchMemorySize) { + compressBatch() + } + oversizedOutBuffers += DeviceMemoryBuffer.allocate(outputSize) + tempBuffers += tempBuffer + tables += contigTable + batchMemUsed += memNeededToCompressThisBuffer + } catch { + case t: Throwable => + if (tempBuffer != null) { + tempBuffer.safeClose() + } + throw t + } + } + } + + /** + * Add an array of contiguous tables to be compressed. The tables will be closed by the + * batch compressor. 
+ * @param contigTable contiguous tables to compress + */ + def addTables(contigTable: Array[ContiguousTable]): Unit = { + var i = 0 + try { + contigTable.foreach { ct => + addTableToCompress(ct) + i += 1 + } + } catch { + case t: Throwable => + contigTable.drop(i).foreach(_.safeClose()) + throw t + } + } + + /** + * This must be called after all tables to be compressed have been added to retrieve the + * compression results. + * @note the table IDs in the TableMeta of all tables will be set to zero + * @return compressed tables + */ + def finish(): Array[CompressedTable] = { + // compress the last batch + compressBatch() + + val compressedTables = results.toArray + results.clear() + compressedTables + } + + /** Must be closed to release the resources owned by the batch compressor */ + override def close(): Unit = { + tables.safeClose() + tempBuffers.safeClose() + oversizedOutBuffers.safeClose() + results.safeClose() + } + + private def compressBatch(): Unit = if (tables.nonEmpty) { + require(oversizedOutBuffers.length == tables.length) + require(tempBuffers.length == tables.length) + val metas = compress(oversizedOutBuffers.toArray, tables.toArray, tempBuffers.toArray) + require(metas.length == tables.length) + + // copy the output data into correctly-sized buffers + metas.zipWithIndex.foreach { case (meta, i) => + val oversizedBuffer = oversizedOutBuffers(i) + val compressedSize = meta.bufferMeta.size + val buffer = if (oversizedBuffer.getLength > compressedSize) { + oversizedBuffer.sliceWithCopy(0, compressedSize) + } else { + // use this buffer as-is, don't close it at the end of this method + oversizedOutBuffers(i) = null + oversizedBuffer + } + results += CompressedTable(compressedSize, meta, buffer) + } + + // free all the inputs to this batch + tables.safeClose() + tables.clear() + tempBuffers.safeClose() + tempBuffers.clear() + oversizedOutBuffers.safeClose() + oversizedOutBuffers.clear() + batchMemUsed = 0 + } + + /** Return the amount of temporary space needed to compress this buffer */ + protected def getTempSpaceNeeded(buffer: DeviceMemoryBuffer): Long + + /** Return the amount of estimated output space needed to compress this buffer */ + protected def getOutputSpaceNeeded( + dataBuffer: DeviceMemoryBuffer, + tempBuffer: DeviceMemoryBuffer): Long + + /** + * Batch-compress contiguous tables + * @param outputBuffers output buffers allocated based on `getOutputSpaceNeeded` results + * @param tables contiguous tables to compress + * @param tempBuffers temporary buffers allocated based on `getTempSpaceNeeded` results. + * If the temporary space needed was zero then the corresponding buffer + * entry may be null. + * @return table metadata for the compressed tables. Table IDs should be set to 0. + */ + protected def compress( + outputBuffers: Array[DeviceMemoryBuffer], + tables: Array[ContiguousTable], + tempBuffers: Array[DeviceMemoryBuffer]): Array[TableMeta] +} + +/** + * Base class for batched decompressors + * @param maxBatchMemorySize The upper limit in bytes of temporary and output memory usage at + * which a batch should be decompressed. A single buffer that requires + * temporary and output memory above this limit is allowed but will + * be decompressed individually. 
+ */ +abstract class BatchedBufferDecompressor(maxBatchMemorySize: Long) extends AutoCloseable with Arm { + // The buffers of compressed data that will be decompressed in the next batch + private[this] val inputBuffers = new ArrayBuffer[DeviceMemoryBuffer] + + // The temporary buffers needed to decompress the next batch + private[this] val tempBuffers = new ArrayBuffer[DeviceMemoryBuffer] + + // The output buffers that will contain the decompressed data in the next batch + private[this] val outputBuffers = new ArrayBuffer[DeviceMemoryBuffer] + + // The decompressed data results for all input buffers across all batches + private[this] val results = new ArrayBuffer[DeviceMemoryBuffer] + + // temporary and output memory being used as part of the current batch + private[this] var batchMemUsed: Long = 0 + + /** The codec ID corresponding to this decompressor */ + val codecId: Byte + + def addBufferToDecompress(buffer: DeviceMemoryBuffer, meta: BufferMeta): Unit = { + closeOnExcept(buffer) { buffer => + // Only supports a single codec per buffer for now. + require(meta.codecBufferDescrsLength == 1) + val descr = meta.codecBufferDescrs(0) + require(descr.codec == codecId) + + // Only supports codecs that consume the entire input buffer for now. + require(descr.compressedOffset == 0) + require(descr.compressedSize == buffer.getLength) + + val tempNeeded = decompressTempSpaceNeeded(buffer) + val outputNeeded = descr.uncompressedSize + if (batchMemUsed + tempNeeded + outputNeeded > maxBatchMemorySize) { + decompressBatch() + } + + val tempBuffer = if (tempNeeded > 0) { + DeviceMemoryBuffer.allocate(tempNeeded) + } else { + null + } + val outputBuffer = DeviceMemoryBuffer.allocate(outputNeeded) + batchMemUsed += tempNeeded + outputNeeded + tempBuffers += tempBuffer + outputBuffers += outputBuffer + inputBuffers += buffer + } + } + + /** + * This must be called after all buffers to be decompressed have been added to retrieve the + * decompression results. 
+ * @return decompressed tables + */ + def finish(): Array[DeviceMemoryBuffer] = { + // decompress the last batch + decompressBatch() + val resultsArray = results.toArray + results.clear() + resultsArray + } + + override def close(): Unit = { + inputBuffers.safeClose() + tempBuffers.safeClose() + outputBuffers.safeClose() + } + + protected def decompressBatch(): Unit = { + if (inputBuffers.nonEmpty) { + require(outputBuffers.length == inputBuffers.length) + require(tempBuffers.length == inputBuffers.length) + decompress(outputBuffers.toArray, inputBuffers.toArray, tempBuffers.toArray) + results ++= outputBuffers + outputBuffers.clear() + + // free all the inputs to this batch + inputBuffers.safeClose() + inputBuffers.clear() + tempBuffers.safeClose() + tempBuffers.clear() + batchMemUsed = 0 + } + } + + /** + * Compute the amount of temporary buffer space required to decompress a buffer + * @param inputBuffer buffer to decompress + * @return required temporary buffer space in bytes + */ + protected def decompressTempSpaceNeeded(inputBuffer: DeviceMemoryBuffer): Long + + /** + * Decompress a batch of compressed buffers + * @param outputBuffers buffers that will contain the uncompressed output + * @param inputBuffers buffers that contain the compressed input + * @param tempBuffers buffers to used for temporary space + */ + protected def decompress( + outputBuffers: Array[DeviceMemoryBuffer], + inputBuffers: Array[DeviceMemoryBuffer], + tempBuffers: Array[DeviceMemoryBuffer]): Unit + +} diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuShuffleEnv.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuShuffleEnv.scala index 53e26d8c550..cb38dd9f6db 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuShuffleEnv.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuShuffleEnv.scala @@ -16,6 +16,8 @@ package org.apache.spark.sql.rapids +import java.util.Locale + import ai.rapids.cudf.{CudaMemInfo, Rmm} import com.nvidia.spark.rapids._ @@ -38,6 +40,14 @@ class GpuShuffleEnv(rapidsConf: RapidsConf) extends Logging { conf.get("spark.shuffle.manager") == GpuShuffleEnv.RAPIDS_SHUFFLE_CLASS } + lazy val rapidsShuffleCodec: Option[TableCompressionCodec] = { + val codecName = rapidsConf.shuffleCompressionCodec.toLowerCase(Locale.ROOT) + if (codecName == "none") { + None + } else { + Some(TableCompressionCodec.getCodec(codecName)) + } + } def initStorage(devInfo: CudaMemInfo): Unit = { if (isRapidsShuffleConfigured) { @@ -130,4 +140,6 @@ object GpuShuffleEnv extends Logging { def getReceivedCatalog: ShuffleReceivedBufferCatalog = env.getReceivedCatalog def getDeviceStorage: RapidsDeviceMemoryStore = env.getDeviceStorage + + def rapidsShuffleCodec: Option[TableCompressionCodec] = env.rapidsShuffleCodec } diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/RapidsShuffleInternalManager.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/RapidsShuffleInternalManager.scala index 0211a8a8b18..2939645fbb8 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/RapidsShuffleInternalManager.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/RapidsShuffleInternalManager.scala @@ -23,8 +23,7 @@ import com.nvidia.spark.rapids.shuffle.{RapidsShuffleRequestHandler, RapidsShuff import scala.collection.mutable.{ArrayBuffer, ListBuffer} import org.apache.spark.{ShuffleDependency, SparkConf, SparkEnv, TaskContext} -import org.apache.spark.internal.{config, Logging} -import org.apache.spark.io.CompressionCodec 
+import org.apache.spark.internal.Logging import org.apache.spark.network.buffer.ManagedBuffer import org.apache.spark.scheduler.MapStatus import org.apache.spark.shuffle._ @@ -41,9 +40,8 @@ class GpuShuffleHandle[K, V]( override def toString: String = s"GPU SHUFFLE HANDLE $shuffleId" } -class GpuShuffleBlockResolver(private val wrapped: ShuffleBlockResolver, - private val blockManager: BlockManager, - private val compressionCodec: CompressionCodec, +class GpuShuffleBlockResolver( + private val wrapped: ShuffleBlockResolver, catalog: ShuffleBufferCatalog) extends ShuffleBlockResolver with Logging { override def getBlockData(blockId: BlockId, dirs: Option[Array[String]]): ManagedBuffer = { @@ -73,14 +71,11 @@ object RapidsShuffleInternalManagerBase extends Logging { } class RapidsCachingWriter[K, V]( - conf: SparkConf, blockManager: BlockManager, - blockResolver: IndexShuffleBlockResolver, // Never keep a reference to the ShuffleHandle in the cache as it being GCed triggers // the data being released handle: GpuShuffleHandle[K, V], mapId: Long, - compressionCodec: CompressionCodec, metrics: ShuffleWriteMetricsReporter, catalog: ShuffleBufferCatalog, shuffleStorage: RapidsDeviceMemoryStore, @@ -100,27 +95,39 @@ class RapidsCachingWriter[K, V]( val batch = p._2.asInstanceOf[ColumnarBatch] logDebug(s"Caching shuffle_id=${handle.shuffleId} map_id=$mapId, partId=$partId, " + s"batch=[num_cols=${batch.numCols()}, num_rows=${batch.numRows()}]") - val partSize = GpuColumnVector.extractBases(batch).map(_.getDeviceMemorySize).sum recordsWritten = recordsWritten + batch.numRows() - bytesWritten = bytesWritten + partSize - sizes(partId) += partSize + var partSize: Long = 0 val blockId = ShuffleBlockId(handle.shuffleId, mapId, partId) val bufferId = catalog.nextShuffleBufferId(blockId) if (batch.numRows > 0 && batch.numCols > 0) { - val buffer = { - val buff = batch.column(0).asInstanceOf[GpuColumnVectorFromBuffer].getBuffer - buff.slice(0, buff.getLength) - } - // Add the table to the shuffle store - shuffleStorage.addTable( - bufferId, - GpuColumnVector.from(batch), - buffer, - SpillPriorities.OUTPUT_FOR_SHUFFLE_INITIAL_PRIORITY) + batch.column(0) match { + case c: GpuColumnVectorFromBuffer => + val buffer = c.getBuffer.slice(0, c.getBuffer.getLength) + partSize = buffer.getLength + shuffleStorage.addTable( + bufferId, + GpuColumnVector.from(batch), + buffer, + SpillPriorities.OUTPUT_FOR_SHUFFLE_INITIAL_PRIORITY) + case c: GpuCompressedColumnVector => + val buffer = c.getBuffer.slice(0, c.getBuffer.getLength) + partSize = buffer.getLength + val tableMeta = c.getTableMeta + // update the table metadata for the buffer ID generated above + tableMeta.bufferMeta.mutateId(bufferId.tableId) + shuffleStorage.addBuffer( + bufferId, + buffer, + tableMeta, + SpillPriorities.OUTPUT_FOR_SHUFFLE_INITIAL_PRIORITY) + case c => throw new IllegalStateException(s"Unexpected column type: ${c.getClass}") + } + bytesWritten += partSize + sizes(partId) += partSize } else { // no device data, tracking only metadata - val tableMeta = MetaUtils.buildDegenerateTableMeta(bufferId.tableId, batch) + val tableMeta = MetaUtils.buildDegenerateTableMeta(batch) catalog.registerNewBuffer(new DegenerateRapidsBuffer(bufferId, tableMeta)) // The size of the data is really only used to tell if the data should be shuffled or not @@ -214,22 +221,11 @@ abstract class RapidsShuffleInternalManagerBase(conf: SparkConf, isDriver: Boole } private lazy val localBlockManagerId = blockManager.blockManagerId - private lazy val compressionEnabled: 
Boolean = conf.get(config.SHUFFLE_COMPRESS) - private lazy val compressionCodec: CompressionCodec = - if (compressionEnabled) { - CompressionCodec.createCodec(conf) - } else { - null - } private lazy val resolver = if (shouldFallThroughOnEverything) { wrapped.shuffleBlockResolver } else { - new GpuShuffleBlockResolver( - wrapped.shuffleBlockResolver, - blockManager, - compressionCodec, - catalog) + new GpuShuffleBlockResolver(wrapped.shuffleBlockResolver, catalog) } private[this] lazy val transport: Option[RapidsShuffleTransport] = { @@ -283,12 +279,10 @@ abstract class RapidsShuffleInternalManagerBase(conf: SparkConf, isDriver: Boole handle match { case gpu: GpuShuffleHandle[_, _] => registerGpuShuffle(handle.shuffleId) - new RapidsCachingWriter(conf, + new RapidsCachingWriter( env.blockManager, - wrapped.shuffleBlockResolver, gpu.asInstanceOf[GpuShuffleHandle[K, V]], mapId, - compressionCodec, metrics, GpuShuffleEnv.getCatalog, GpuShuffleEnv.getDeviceStorage, diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/GpuCoalesceBatchesSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/GpuCoalesceBatchesSuite.scala index d13fd1c98f4..393cfb51ee0 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/GpuCoalesceBatchesSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/GpuCoalesceBatchesSuite.scala @@ -19,9 +19,12 @@ package com.nvidia.spark.rapids import java.io.File import java.nio.file.Files +import ai.rapids.cudf.{ContiguousTable, HostColumnVector, Table} +import com.nvidia.spark.rapids.format.CodecType + import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.rapids.metrics.source.MockTaskContext -import org.apache.spark.sql.types.{DataTypes, StructField, StructType} +import org.apache.spark.sql.types.{DataTypes, LongType, StructField, StructType} import org.apache.spark.sql.vectorized.ColumnarBatch class GpuCoalesceBatchesSuite extends SparkQueryCompareTestSuite { @@ -77,6 +80,7 @@ class GpuCoalesceBatchesSuite extends SparkQueryCompareTestSuite { val it = new GpuCoalesceIterator(input, schema, TargetSize(Long.MaxValue), + 0, numInputRows, numInputBatches, numOutputRows, @@ -243,4 +247,158 @@ class GpuCoalesceBatchesSuite extends SparkQueryCompareTestSuite { }, conf) } + def testCompressedBatches(maxCompressedBatchMemoryLimit: Long) { + val coalesceTargetBytes = 8000 + val stop = 10000 + var start = 0 + var numBatchRows = 100 + var expectedEnd = 0 + val batchIter = new Iterator[ColumnarBatch] { + override def hasNext: Boolean = if (start < stop) { + true + } else { + expectedEnd = start + false + } + override def next(): ColumnarBatch = { + val batch = buildCompressedBatch(start, numBatchRows) + start += batch.numRows + numBatchRows *= 2 + batch + } + } + + val schema = new StructType().add("i", LongType) + val dummyMetric = new SQLMetric("ignored") + val coalesceIter = new GpuCoalesceIterator( + batchIter, + schema, + TargetSize(coalesceTargetBytes), + maxCompressedBatchMemoryLimit, + dummyMetric, + dummyMetric, + dummyMetric, + dummyMetric, + dummyMetric, + dummyMetric, + dummyMetric, + dummyMetric, + "test concat") + + var expected = 0 + while (coalesceIter.hasNext) { + withResource(coalesceIter.next()) { batch => + assertResult(1)(batch.numCols) + val col = GpuColumnVector.extractBases(batch).head + withResource(col.copyToHost) { hcv => + (0 until hcv.getRowCount.toInt).foreach { i => + assertResult(expected)(hcv.getLong(i)) + expected += 1 + } + } + } + } + assertResult(expectedEnd)(expected) + } + + test("all compressed low memory 
limit") { + testCompressedBatches(0) + } + + test("all compressed high memory limit") { + testCompressedBatches(Long.MaxValue) + } + + test("mixed compressed and uncompressed low memory limit") { + testMixedCompressedUncompressed(0) + } + + test("mixed compressed and uncompressed high memory limit") { + testMixedCompressedUncompressed(Long.MaxValue) + } + + def testMixedCompressedUncompressed(maxCompressedBatchMemoryLimit: Long): Unit = { + val coalesceTargetBytes = 8000 + val stop = 10000 + var start = 0 + var numBatchRows = 100 + var nextBatchCompressed = false + var expectedEnd = 0 + val batchIter = new Iterator[ColumnarBatch] { + override def hasNext: Boolean = if (start < stop) { + true + } else { + expectedEnd = start + false + } + override def next(): ColumnarBatch = { + val batch = if (nextBatchCompressed) { + buildCompressedBatch(start, numBatchRows) + } else { + buildUncompressedBatch(start, numBatchRows) + } + nextBatchCompressed = !nextBatchCompressed + start += batch.numRows + numBatchRows *= 2 + batch + } + } + + val schema = new StructType().add("i", LongType) + val dummyMetric = new SQLMetric("ignored") + val coalesceIter = new GpuCoalesceIterator( + batchIter, + schema, + TargetSize(coalesceTargetBytes), + maxCompressedBatchMemoryLimit, + dummyMetric, + dummyMetric, + dummyMetric, + dummyMetric, + dummyMetric, + dummyMetric, + dummyMetric, + dummyMetric, + "test concat") + + var expected = 0 + while (coalesceIter.hasNext) { + withResource(coalesceIter.next()) { batch => + assertResult(1)(batch.numCols) + val col = GpuColumnVector.extractBases(batch).head + withResource(col.copyToHost) { hcv => + (0 until hcv.getRowCount.toInt).foreach { i => + assertResult(expected)(hcv.getLong(i)) + expected += 1 + } + } + } + } + assertResult(expectedEnd)(expected) + } + + private def buildContiguousTable(start: Int, numRows: Int): ContiguousTable = { + val vals = (0 until numRows).map(_.toLong + start) + withResource(HostColumnVector.fromLongs(vals:_*)) { hcv => + withResource(hcv.copyToDevice()) { cv => + withResource(new Table(cv)) { table => + table.contiguousSplit()(0) + } + } + } + } + + private def buildUncompressedBatch(start: Int, numRows: Int): ColumnarBatch = { + withResource(buildContiguousTable(start, numRows)) { ct => + GpuColumnVector.from(ct.getTable) + } + } + + private def buildCompressedBatch(start: Int, numRows: Int): ColumnarBatch = { + val codec = TableCompressionCodec.getCodec(CodecType.COPY) + withResource(codec.createBatchCompressor(0)) { compressor => + compressor.addTableToCompress(buildContiguousTable(start, numRows)) + GpuCompressedColumnVector.from(compressor.finish().head) + } + } } diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/GpuPartitioningSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/GpuPartitioningSuite.scala index 8bce10f8a5a..2fd73d0a29a 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/GpuPartitioningSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/GpuPartitioningSuite.scala @@ -16,13 +16,15 @@ package com.nvidia.spark.rapids +import java.io.File + import ai.rapids.cudf.{Cuda, Table} import com.nvidia.spark.rapids.RapidsPluginImplicits._ import org.scalatest.FunSuite import org.apache.spark.SparkConf import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.rapids.GpuShuffleEnv +import org.apache.spark.sql.rapids.{GpuShuffleEnv, RapidsDiskBlockManager} import org.apache.spark.sql.vectorized.ColumnarBatch class GpuPartitioningSuite extends FunSuite with Arm { @@ -58,10 +60,10 @@ class 
GpuPartitioningSuite extends FunSuite with Arm { test("GPU partition") { SparkSession.getActiveSession.foreach(_.close()) - val conf = new SparkConf() + val conf = new SparkConf().set(RapidsConf.SHUFFLE_COMPRESSION_CODEC.key, "none") TestUtils.withGpuSparkSession(conf) { _ => GpuShuffleEnv.init(new RapidsConf(conf), Cuda.memGetInfo()) - val partitionIndices = Array(0, 2) + val partitionIndices = Array(0, 2, 2) val gp = new GpuPartitioning { override val numPartitions: Int = partitionIndices.length } @@ -91,4 +93,57 @@ class GpuPartitioningSuite extends FunSuite with Arm { } } } + + test("GPU partition with compression") { + val conf = new SparkConf() + .set(RapidsConf.SHUFFLE_COMPRESSION_CODEC.key, "copy") + TestUtils.withGpuSparkSession(conf) { _ => + GpuShuffleEnv.init(new RapidsConf(conf), Cuda.memGetInfo()) + val spillPriority = 7L + val catalog = new RapidsBufferCatalog + withResource(new RapidsDeviceMemoryStore(catalog)) { deviceStore => + val partitionIndices = Array(0, 2, 2) + val gp = new GpuPartitioning { + override val numPartitions: Int = partitionIndices.length + } + withResource(buildBatch()) { batch => + val columns = GpuColumnVector.extractColumns(batch) + val numRows = batch.numRows + withResource(gp.sliceInternalOnGpu(numRows, partitionIndices, columns)) { partitions => + partitions.zipWithIndex.foreach { case (partBatch, partIndex) => + val startRow = partitionIndices(partIndex) + val endRow = if (partIndex < partitionIndices.length - 1) { + partitionIndices(partIndex + 1) + } else { + batch.numRows + } + val expectedRows = endRow - startRow + assertResult(expectedRows)(partBatch.numRows) + val columns = (0 until partBatch.numCols).map(i => partBatch.column(i)) + columns.foreach { column => + assert(column.isInstanceOf[GpuCompressedColumnVector]) + assertResult(expectedRows) { + column.asInstanceOf[GpuCompressedColumnVector].getTableMeta.rowCount + } + } + val gccv = columns.head.asInstanceOf[GpuCompressedColumnVector] + val bufferId = MockRapidsBufferId(partIndex) + val devBuffer = gccv.getBuffer.slice(0, gccv.getBuffer.getLength) + deviceStore.addBuffer(bufferId, devBuffer, gccv.getTableMeta, spillPriority) + withResource(buildSubBatch(batch, startRow, endRow)) { expectedBatch => + withResource(catalog.acquireBuffer(bufferId)) { buffer => + compareBatches(expectedBatch, buffer.getColumnarBatch) + } + } + } + } + } + } + } + } +} + +case class MockRapidsBufferId(tableId: Int) extends RapidsBufferId { + override def getDiskPath(diskBlockManager: RapidsDiskBlockManager): File = + throw new UnsupportedOperationException } diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/GpuSinglePartitioningSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/GpuSinglePartitioningSuite.scala index 967a798f59b..cf05e9b11de 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/GpuSinglePartitioningSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/GpuSinglePartitioningSuite.scala @@ -34,8 +34,9 @@ class GpuSinglePartitioningSuite extends FunSuite with Arm { } } - test("generates contiguous split") { + test("generates contiguous split uncompressed") { val conf = new SparkConf().set("spark.shuffle.manager", GpuShuffleEnv.RAPIDS_SHUFFLE_CLASS) + .set(RapidsConf.SHUFFLE_COMPRESSION_CODEC.key, "none") TestUtils.withGpuSparkSession(conf) { _ => GpuShuffleEnv.init(new RapidsConf(conf), Cuda.memGetInfo()) val partitioner = GpuSinglePartitioning(Nil) diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/MetaUtilsSuite.scala 
diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/MetaUtilsSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/MetaUtilsSuite.scala
index 38c9e6e57a8..2ebb97f3066 100644
--- a/tests/src/test/scala/com/nvidia/spark/rapids/MetaUtilsSuite.scala
+++ b/tests/src/test/scala/com/nvidia/spark/rapids/MetaUtilsSuite.scala
@@ -16,7 +16,7 @@
 
 package com.nvidia.spark.rapids
 
-import ai.rapids.cudf.{BufferType, ContiguousTable, Table}
+import ai.rapids.cudf.{BufferType, ContiguousTable, DeviceMemoryBuffer, Table}
 import com.nvidia.spark.rapids.format.{CodecType, ColumnMeta}
 import org.scalatest.FunSuite
 
@@ -101,7 +101,7 @@ class MetaUtilsSuite extends FunSuite with Arm {
 
   test("buildDegenerateTableMeta no columns") {
     val degenerateBatch = new ColumnarBatch(Array(), 127)
-    val meta = MetaUtils.buildDegenerateTableMeta(8, degenerateBatch)
+    val meta = MetaUtils.buildDegenerateTableMeta(degenerateBatch)
     assertResult(null)(meta.bufferMeta)
     assertResult(0)(meta.columnMetasLength)
     assertResult(127)(meta.rowCount)
@@ -110,7 +110,7 @@
   test("buildDegenerateTableMeta no rows") {
     val schema = StructType.fromDDL("a INT, b STRING, c DOUBLE")
     withResource(GpuColumnVector.emptyBatch(schema)) { batch =>
-      val meta = MetaUtils.buildDegenerateTableMeta(9, batch)
+      val meta = MetaUtils.buildDegenerateTableMeta(batch)
       assertResult(null)(meta.bufferMeta)
       assertResult(0)(meta.rowCount)
       assertResult(3)(meta.columnMetasLength)
@@ -127,6 +127,33 @@
     }
   }
+
+  test("buildDegenerateTableMeta no rows compressed table") {
+    val schema = StructType.fromDDL("a INT, b STRING, c DOUBLE")
+    withResource(GpuColumnVector.emptyBatch(schema)) { uncompressedBatch =>
+      val uncompressedMeta = MetaUtils.buildDegenerateTableMeta(uncompressedBatch)
+      withResource(DeviceMemoryBuffer.allocate(0)) { buffer =>
+        val compressedTable = CompressedTable(0, uncompressedMeta, buffer)
+        withResource(GpuCompressedColumnVector.from(compressedTable)) { batch =>
+          val meta = MetaUtils.buildDegenerateTableMeta(batch)
+          assertResult(null)(meta.bufferMeta)
+          assertResult(0)(meta.rowCount)
+          assertResult(3)(meta.columnMetasLength)
+          (0 until meta.columnMetasLength).foreach { i =>
+            val columnMeta = meta.columnMetas(i)
+            assertResult(0)(columnMeta.nullCount)
+            assertResult(0)(columnMeta.rowCount)
+            val expectedType = uncompressedBatch.column(i).asInstanceOf[GpuColumnVector]
+              .getBase.getType
+            assertResult(expectedType.getNativeId)(columnMeta.dtype)
+            assertResult(null)(columnMeta.data)
+            assertResult(null)(columnMeta.validity)
+            assertResult(null)(columnMeta.offsets)
+          }
+        }
+      }
+    }
+  }
 
   test("getBatchFromMeta") {
     withResource(buildContiguousTable()) { contigTable =>
       val table = contigTable.getTable
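A note on the MetaUtilsSuite changes above: buildDegenerateTableMeta no longer takes a table id, and it must now handle batches backed by GpuCompressedColumnVector as well as uncompressed ones. The new test only reads back flatbuffer accessors already exercised here; a hypothetical helper that summarizes such a degenerate TableMeta using those same accessors might look like this (illustration only, not part of the patch):

// Hypothetical summary helper; relies solely on the TableMeta/ColumnMeta
// accessors used by the assertions above (rowCount, bufferMeta, columnMetas, ...).
def describeDegenerateMeta(meta: TableMeta): String = {
  val cols = (0 until meta.columnMetasLength).map { i =>
    val cm = meta.columnMetas(i)
    s"col=$i dtype=${cm.dtype} rows=${cm.rowCount} nulls=${cm.nullCount}"
  }
  s"rows=${meta.rowCount} hasBuffer=${Option(meta.bufferMeta).isDefined} " + cols.mkString(", ")
}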
diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/RapidsDiskStoreSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/RapidsDiskStoreSuite.scala
index bcc32b00a17..92cae53351a 100644
--- a/tests/src/test/scala/com/nvidia/spark/rapids/RapidsDiskStoreSuite.scala
+++ b/tests/src/test/scala/com/nvidia/spark/rapids/RapidsDiskStoreSuite.scala
@@ -189,17 +189,12 @@ class RapidsDiskStoreSuite extends FunSuite with BeforeAndAfterEach with Arm wit
       devStore: RapidsDeviceMemoryStore,
       bufferId: RapidsBufferId,
       spillPriority: Long): Long = {
-    val ct = buildContiguousTable()
-    val bufferSize = ct.getBuffer.getLength
-    try {
+    closeOnExcept(buildContiguousTable()) { ct =>
+      val bufferSize = ct.getBuffer.getLength
       // store takes ownership of the table
       devStore.addTable(bufferId, ct.getTable, ct.getBuffer, spillPriority)
-    } catch {
-      case t: Throwable =>
-        ct.close()
-        throw t
+      bufferSize
     }
-    bufferSize
   }
 
 case class MockRapidsBufferId(
diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/TestUtils.scala b/tests/src/test/scala/com/nvidia/spark/rapids/TestUtils.scala
index 5514679eeff..4be02c8ccdd 100644
--- a/tests/src/test/scala/com/nvidia/spark/rapids/TestUtils.scala
+++ b/tests/src/test/scala/com/nvidia/spark/rapids/TestUtils.scala
@@ -17,9 +17,8 @@ package com.nvidia.spark.rapids
 
 import java.io.File
-import java.nio.ByteBuffer
 
-import ai.rapids.cudf.{BufferType, ColumnVector, DType, HostColumnVector, Table}
+import ai.rapids.cudf.{ColumnVector, DType, Table}
 import org.scalatest.Assertions
 
 import org.apache.spark.SparkConf
diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleTestHelper.scala b/tests/src/test/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleTestHelper.scala
index 994e9aa6649..35c836555fc 100644
--- a/tests/src/test/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleTestHelper.scala
+++ b/tests/src/test/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleTestHelper.scala
@@ -76,8 +76,8 @@ object RapidsShuffleTestHelper extends MockitoSugar with Arm {
     MetaUtils.buildTableMeta(tableId, cols, tbl.getRowCount, contigTable.getBuffer)
   }
 
-  def buildDegenerateMockTableMeta(tableId: Int): TableMeta = {
-    MetaUtils.buildDegenerateTableMeta(tableId, new ColumnarBatch(Array.empty, 123))
+  def buildDegenerateMockTableMeta(): TableMeta = {
+    MetaUtils.buildDegenerateTableMeta(new ColumnarBatch(Array.empty, 123))
   }
 
   def withMockContiguousTable[T](numRows: Long)(body: ContiguousTable => T): T = {
@@ -113,7 +113,7 @@ object RapidsShuffleTestHelper extends MockitoSugar with Arm {
       numRows: Long,
       numBatches: Int,
       maximumResponseSize: Long = 10000): Seq[TableMeta] = {
-    val tableMetas = (0 until numBatches).map(b => buildDegenerateMockTableMeta(b))
+    val tableMetas = (0 until numBatches).map(b => buildDegenerateMockTableMeta())
     val res = ShuffleMetadata.buildMetaResponse(tableMetas, maximumResponseSize)
     val refCountedRes = new RefCountedDirectByteBuffer(res)
     when(mockTransport.getMetaBuffer(any())).thenReturn(refCountedRes)
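The RapidsDiskStoreSuite change above folds a hand-written try/catch into closeOnExcept from the Arm trait. A minimal sketch of the contract that refactor relies on (assumptions only; the plugin's real helper may differ in details):

// Minimal sketch of closeOnExcept: the resource is closed only if the body throws;
// on success the callee (here the device store) keeps ownership of the resource.
def closeOnExcept[T <: AutoCloseable, V](resource: T)(body: T => V): V = {
  try {
    body(resource)
  } catch {
    case t: Throwable =>
      resource.close()
      throw t
  }
}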