Add framework for batch compression of shuffle partitions #487

Merged: 13 commits, Aug 11, 2020
@@ -132,12 +132,18 @@ object MetaUtils extends Arm {

/**
* Build a TableMeta message for a degenerate table (zero columns or rows)
* @param tableId the ID to use for this table
* @param batch the degenerate columnar batch
* @return heap-based flatbuffer message
*/
def buildDegenerateTableMeta(tableId: Int, batch: ColumnarBatch): TableMeta = {
def buildDegenerateTableMeta(batch: ColumnarBatch): TableMeta = {
require(batch.numRows == 0 || batch.numCols == 0, "batch not degenerate")
if (batch.numCols > 0) {
batch.column(0) match {
case c: GpuCompressedColumnVector =>
return c.getTableMeta
case _ =>
}
}
val fbb = new FlatBufferBuilder(1024)
val columnMetaOffset = if (batch.numCols > 0) {
val columns = GpuColumnVector.extractBases(batch)
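A note on the MetaUtils change above: the old buildDegenerateTableMeta(tableId: Int, batch: ColumnarBatch) took an explicit table ID, while the new buildDegenerateTableMeta(batch: ColumnarBatch) drops that parameter and, when the degenerate batch already holds a GpuCompressedColumnVector, returns the TableMeta attached to that column instead of rebuilding it. A minimal usage sketch, assuming the spark-rapids classpath and the APIs that appear elsewhere in this diff (StructType.fromDDL, GpuColumnVector.emptyBatch, withResource):

// Sketch only: build metadata for a zero-row batch. No table ID is passed here;
// the catalog-assigned bufferId is associated later when the buffer is registered
// (see DegenerateRapidsBuffer in RapidsCachingWriter below).
val schema = StructType.fromDDL("a INT, b STRING")
withResource(GpuColumnVector.emptyBatch(schema)) { batch =>
  val meta: TableMeta = MetaUtils.buildDegenerateTableMeta(batch)
  assert(meta.rowCount == 0)
}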
@@ -95,24 +95,24 @@ class RapidsCachingWriter[K, V](
val batch = p._2.asInstanceOf[ColumnarBatch]
logDebug(s"Caching shuffle_id=${handle.shuffleId} map_id=$mapId, partId=$partId, "
+ s"batch=[num_cols=${batch.numCols()}, num_rows=${batch.numRows()}]")
val partSize = GpuColumnVector.extractBases(batch).map(_.getDeviceMemorySize).sum
recordsWritten = recordsWritten + batch.numRows()
bytesWritten = bytesWritten + partSize
sizes(partId) += partSize
var partSize: Long = 0
val blockId = ShuffleBlockId(handle.shuffleId, mapId, partId)
val bufferId = catalog.nextShuffleBufferId(blockId)
if (batch.numRows > 0 && batch.numCols > 0) {
// Add the table to the shuffle store
batch.column(0) match {
case c: GpuColumnVectorFromBuffer =>
val buffer = c.getBuffer.slice(0, c.getBuffer.getLength)
partSize = buffer.getLength
shuffleStorage.addTable(
bufferId,
GpuColumnVector.from(batch),
buffer,
SpillPriorities.OUTPUT_FOR_SHUFFLE_INITIAL_PRIORITY)
case c: GpuCompressedColumnVector =>
val buffer = c.getBuffer.slice(0, c.getBuffer.getLength)
partSize = buffer.getLength
val tableMeta = c.getTableMeta
// update the table metadata for the buffer ID generated above
tableMeta.bufferMeta.mutateId(bufferId.tableId)
@@ -123,9 +123,11 @@ class RapidsCachingWriter[K, V](
SpillPriorities.OUTPUT_FOR_SHUFFLE_INITIAL_PRIORITY)
case c => throw new IllegalStateException(s"Unexpected column type: ${c.getClass}")
}
bytesWritten += partSize
sizes(partId) += partSize
} else {
// no device data, tracking only metadata
val tableMeta = MetaUtils.buildDegenerateTableMeta(bufferId.tableId, batch)
val tableMeta = MetaUtils.buildDegenerateTableMeta(batch)
catalog.registerNewBuffer(new DegenerateRapidsBuffer(bufferId, tableMeta))

// The size of the data is really only used to tell if the data should be shuffled or not
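A note on the RapidsCachingWriter change: the partition size is no longer precomputed as the sum of uncompressed device column sizes (the removed GpuColumnVector.extractBases(batch).map(_.getDeviceMemorySize).sum line); instead partSize starts at zero, is set inside the match from the length of the buffer that actually gets cached, and bytesWritten and sizes(partId) are updated afterwards, so compressed batches report their compressed length. A hypothetical helper, not part of this PR, that captures the sizing rule:

// Hypothetical sketch of the sizing rule the writer now follows: report the
// length of the buffer that is cached; for GpuCompressedColumnVector that is
// the compressed size, and degenerate batches cache no device buffer at all.
def cachedSizeOf(batch: ColumnarBatch): Long =
  if (batch.numRows == 0 || batch.numCols == 0) {
    0L // metadata-only case, handled via DegenerateRapidsBuffer
  } else {
    batch.column(0) match {
      case c: GpuColumnVectorFromBuffer => c.getBuffer.getLength
      case c: GpuCompressedColumnVector => c.getBuffer.getLength
      case c => throw new IllegalStateException(s"Unexpected column type: ${c.getClass}")
    }
  }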
@@ -63,7 +63,7 @@ class GpuPartitioningSuite extends FunSuite with Arm {
val conf = new SparkConf().set(RapidsConf.SHUFFLE_COMPRESSION_ENABLED.key, "false")
TestUtils.withGpuSparkSession(conf) { _ =>
GpuShuffleEnv.init(new RapidsConf(conf), Cuda.memGetInfo())
val partitionIndices = Array(0, 2)
val partitionIndices = Array(0, 2, 2)
val gp = new GpuPartitioning {
override val numPartitions: Int = partitionIndices.length
}
@@ -99,11 +99,11 @@ class GpuPartitioningSuite extends FunSuite with Arm {
.set(RapidsConf.SHUFFLE_COMPRESSION_ENABLED.key, "true")
.set(RapidsConf.SHUFFLE_COMPRESSION_CODEC.key, "copy")
TestUtils.withGpuSparkSession(conf) { _ =>
GpuShuffleEnv.init(Cuda.memGetInfo())
GpuShuffleEnv.init(new RapidsConf(conf), Cuda.memGetInfo())
val spillPriority = 7L
val catalog = new RapidsBufferCatalog
withResource(new RapidsDeviceMemoryStore(catalog)) { deviceStore =>
val partitionIndices = Array(0, 2)
val partitionIndices = Array(0, 2, 2)
val gp = new GpuPartitioning {
override val numPartitions: Int = partitionIndices.length
}
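A note on the GpuPartitioningSuite changes: partitionIndices grows from Array(0, 2) to Array(0, 2, 2), and GpuShuffleEnv.init now also takes the RapidsConf in the compressed test. Reading the indices as the start-row offsets of each output partition (the apparent convention in these tests), the repeated offset yields a zero-row partition, which exercises the degenerate-batch handling above even with the copy compression codec enabled. A plain Scala sketch of that reading, under the stated assumption:

// Assumed semantics: partitionIndices are the start rows of consecutive slices.
val partitionIndices = Array(0, 2, 2)
val numRows = 10 // arbitrary batch size for illustration
val rowCounts = partitionIndices.indices.map { i =>
  val end = if (i + 1 < partitionIndices.length) partitionIndices(i + 1) else numRows
  end - partitionIndices(i)
}
// rowCounts == Vector(2, 0, 8): the zero-row slice becomes a degenerate batch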
33 changes: 30 additions & 3 deletions tests/src/test/scala/com/nvidia/spark/rapids/MetaUtilsSuite.scala
@@ -16,7 +16,7 @@

package com.nvidia.spark.rapids

import ai.rapids.cudf.{BufferType, ContiguousTable, Table}
import ai.rapids.cudf.{BufferType, ContiguousTable, DeviceMemoryBuffer, Table}
import com.nvidia.spark.rapids.format.{CodecType, ColumnMeta}
import org.scalatest.FunSuite

@@ -101,7 +101,7 @@ class MetaUtilsSuite extends FunSuite with Arm {

test("buildDegenerateTableMeta no columns") {
val degenerateBatch = new ColumnarBatch(Array(), 127)
val meta = MetaUtils.buildDegenerateTableMeta(8, degenerateBatch)
val meta = MetaUtils.buildDegenerateTableMeta(degenerateBatch)
assertResult(null)(meta.bufferMeta)
assertResult(0)(meta.columnMetasLength)
assertResult(127)(meta.rowCount)
@@ -110,7 +110,7 @@ class MetaUtilsSuite extends FunSuite with Arm {
test("buildDegenerateTableMeta no rows") {
val schema = StructType.fromDDL("a INT, b STRING, c DOUBLE")
withResource(GpuColumnVector.emptyBatch(schema)) { batch =>
val meta = MetaUtils.buildDegenerateTableMeta(9, batch)
val meta = MetaUtils.buildDegenerateTableMeta(batch)
assertResult(null)(meta.bufferMeta)
assertResult(0)(meta.rowCount)
assertResult(3)(meta.columnMetasLength)
@@ -127,6 +127,33 @@ class MetaUtilsSuite extends FunSuite with Arm {
}
}

test("buildDegenerateTableMeta no rows compressed table") {
val schema = StructType.fromDDL("a INT, b STRING, c DOUBLE")
withResource(GpuColumnVector.emptyBatch(schema)) { uncompressedBatch =>
val uncompressedMeta = MetaUtils.buildDegenerateTableMeta(uncompressedBatch)
withResource(DeviceMemoryBuffer.allocate(0)) { buffer =>
val compressedTable = CompressedTable(0, uncompressedMeta, buffer)
withResource(GpuCompressedColumnVector.from(compressedTable)) { batch =>
val meta = MetaUtils.buildDegenerateTableMeta(batch)
assertResult(null)(meta.bufferMeta)
assertResult(0)(meta.rowCount)
assertResult(3)(meta.columnMetasLength)
(0 until meta.columnMetasLength).foreach { i =>
val columnMeta = meta.columnMetas(i)
assertResult(0)(columnMeta.nullCount)
assertResult(0)(columnMeta.rowCount)
val expectedType = uncompressedBatch.column(i).asInstanceOf[GpuColumnVector]
.getBase.getType
assertResult(expectedType.getNativeId)(columnMeta.dtype)
assertResult(null)(columnMeta.data)
assertResult(null)(columnMeta.validity)
assertResult(null)(columnMeta.offsets)
}
}
}
}
}
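The new test above covers the compressed degenerate case end to end: it builds TableMeta from an empty uncompressed batch, wraps that metadata in a CompressedTable backed by a zero-length DeviceMemoryBuffer, and verifies that buildDegenerateTableMeta on the resulting GpuCompressedColumnVector batch yields the same per-column metadata. A condensed sketch of the same flow (imports and GPU setup assumed from the file above):

// Condensed sketch of the flow exercised by the new test.
val schema = StructType.fromDDL("a INT, b STRING, c DOUBLE")
withResource(GpuColumnVector.emptyBatch(schema)) { empty =>
  val uncompressedMeta = MetaUtils.buildDegenerateTableMeta(empty)
  withResource(DeviceMemoryBuffer.allocate(0)) { buffer =>
    withResource(GpuCompressedColumnVector.from(CompressedTable(0, uncompressedMeta, buffer))) { batch =>
      // the metadata already attached to the compressed column is reused, not rebuilt
      assert(MetaUtils.buildDegenerateTableMeta(batch).columnMetasLength == 3)
    }
  }
}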

test("getBatchFromMeta") {
withResource(buildContiguousTable()) { contigTable =>
val table = contigTable.getTable