Skip to content

Commit

Permalink
Initial code changes to support spilling outside of shuffle (#722)
Browse files Browse the repository at this point in the history
Signed-off-by: Robert (Bobby) Evans <bobby@apache.org>
  • Loading branch information
revans2 authored Sep 10, 2020
1 parent 393e5e9 commit 52e6399
Show file tree
Hide file tree
Showing 17 changed files with 295 additions and 101 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package com.nvidia.spark.rapids
import scala.collection.mutable.ArrayBuffer

import ai.rapids.cudf.{BufferType, NvtxColor, Table}
import com.nvidia.spark.rapids.RapidsPluginImplicits._
import com.nvidia.spark.rapids.SpillPriorities.COALESCE_BATCH_ON_DECK_PRIORITY
import com.nvidia.spark.rapids.format.{ColumnMeta, SubBufferMeta, TableMeta}

import org.apache.spark.TaskContext
Expand All @@ -29,6 +31,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.rapids.TempSpillBufferId
import org.apache.spark.sql.types.{DataTypes, StructType}
import org.apache.spark.sql.vectorized.ColumnarBatch

Expand Down Expand Up @@ -158,9 +161,28 @@ abstract class AbstractGpuCoalesceIterator(origIter: Iterator[ColumnarBatch],
peakDevMemory: SQLMetric,
opName: String) extends Iterator[ColumnarBatch] with Logging {
private val iter = new RemoveEmptyBatchIterator(origIter, numInputBatches)
private var onDeck: Option[ColumnarBatch] = None
private var batchInitialized: Boolean = false

/**
* Return true if there is something saved on deck for later processing.
*/
protected def hasOnDeck: Boolean

/**
* Save a batch for later processing.
*/
protected def saveOnDeck(batch: ColumnarBatch): Unit

/**
* If there is anything saved on deck close it.
*/
protected def clearOnDeck(): Unit

/**
* Remove whatever is on deck and return it.
*/
protected def popOnDeck(): ColumnarBatch

/** We need to track the sizes of string columns to make sure we don't exceed 2GB */
private val stringFieldIndices: Array[Int] = schema.fields.zipWithIndex
.filter(_._1.dataType == DataTypes.StringType)
Expand All @@ -172,9 +194,9 @@ abstract class AbstractGpuCoalesceIterator(origIter: Iterator[ColumnarBatch],
// note that TaskContext.get() can return null during unit testing so we wrap it in an
// option here
Option(TaskContext.get())
.foreach(_.addTaskCompletionListener[Unit](_ => onDeck.foreach(_.close())))
.foreach(_.addTaskCompletionListener[Unit]( _ => clearOnDeck()))

override def hasNext: Boolean = onDeck.isDefined || iter.hasNext
override def hasNext: Boolean = hasOnDeck || iter.hasNext

/**
* Called first to initialize any state needed for a new batch to be created.
Expand Down Expand Up @@ -251,10 +273,9 @@ abstract class AbstractGpuCoalesceIterator(origIter: Iterator[ColumnarBatch],
var numBatches = 0

// check if there is a batch "on deck" from a previous call to next()
if (onDeck.isDefined) {
val batch = onDeck.get
if (hasOnDeck) {
val batch = popOnDeck()
addBatch(batch)
onDeck = None
numBatches += 1
numRows += batch.numRows()
columnSizes = getColumnSizes(batch)
Expand All @@ -265,7 +286,7 @@ abstract class AbstractGpuCoalesceIterator(origIter: Iterator[ColumnarBatch],
try {

// there is a hard limit of 2^31 rows
while (numRows < Int.MaxValue && onDeck.isEmpty && iter.hasNext) {
while (numRows < Int.MaxValue && !hasOnDeck && iter.hasNext) {

val cb = iter.next()
val nextRows = cb.numRows()
Expand Down Expand Up @@ -300,19 +321,19 @@ abstract class AbstractGpuCoalesceIterator(origIter: Iterator[ColumnarBatch],
s" but cuDF only supports ${Int.MaxValue} rows. At least $wouldBeRows are in" +
s" this partition. Please try increasing your partition count.")
}
onDeck = Some(cb)
saveOnDeck(cb)
} else if (batchRowLimit > 0 && wouldBeRows > batchRowLimit) {
onDeck = Some(cb)
saveOnDeck(cb)
} else if (wouldBeBytes > goal.targetSizeBytes && numBytes > 0) {
onDeck = Some(cb)
saveOnDeck(cb)
} else if (wouldBeStringColumnSizes.exists(size => size > Int.MaxValue)) {
if (goal == RequireSingleBatch) {
throw new IllegalStateException("A single batch is required for this operation," +
s" but cuDF only supports ${Int.MaxValue} bytes in a single string column." +
s" At least ${wouldBeStringColumnSizes.max} are in a single column in this" +
s" partition. Please try increasing your partition count.")
}
onDeck = Some(cb)
saveOnDeck(cb)
} else {
addBatch(cb)
numBatches += 1
Expand All @@ -327,7 +348,7 @@ abstract class AbstractGpuCoalesceIterator(origIter: Iterator[ColumnarBatch],
}

// enforce single batch limit when appropriate
if (goal == RequireSingleBatch && (onDeck.isDefined || iter.hasNext)) {
if (goal == RequireSingleBatch && (hasOnDeck || iter.hasNext)) {
throw new IllegalStateException("A single batch is required for this operation." +
" Please try increasing your partition count.")
}
Expand Down Expand Up @@ -500,6 +521,59 @@ class GpuCoalesceIterator(iter: Iterator[ColumnarBatch],
peakDevMemory.set(maxDeviceMemory)
batches.foreach(_.close())
}

private var onDeck: Option[TempSpillBufferId] = None

override protected def hasOnDeck: Boolean = onDeck.isDefined

override protected def saveOnDeck(batch: ColumnarBatch): Unit = {
assert(onDeck.isEmpty)
val id = TempSpillBufferId()
val priority = COALESCE_BATCH_ON_DECK_PRIORITY
val numColumns = batch.numCols()

if (numColumns > 0 && batch.column(0).isInstanceOf[GpuCompressedColumnVector]) {
val cv = batch.column(0).asInstanceOf[GpuCompressedColumnVector]
RapidsBufferCatalog.addBuffer(id, cv.getBuffer, cv.getTableMeta, priority)
} else if (numColumns > 0 &&
(0 until numColumns)
.forall(i => batch.column(i).isInstanceOf[GpuColumnVectorFromBuffer])) {
val cv = batch.column(0).asInstanceOf[GpuColumnVectorFromBuffer]
withResource(GpuColumnVector.from(batch)) { table =>
RapidsBufferCatalog.addTable(id, table, cv.getBuffer, priority)
}
} else {
withResource(batch) { batch =>
withResource(GpuColumnVector.from(batch)) { tmpTable =>
val contigTables = tmpTable.contiguousSplit(batch.numRows())
val tab = contigTables.head
contigTables.tail.safeClose()
RapidsBufferCatalog.addTable(id, tab.getTable, tab.getBuffer, priority)
}
}
}

onDeck = Some(id)
}

override protected def clearOnDeck(): Unit = {
onDeck.foreach { id =>
withResource(RapidsBufferCatalog.acquireBuffer(id)) { rapidsBuffer =>
rapidsBuffer.free()
}
}
onDeck = None
}

override protected def popOnDeck(): ColumnarBatch = {
val id = onDeck.get
val ret = withResource(RapidsBufferCatalog.acquireBuffer(id)) { rapidsBuffer =>
rapidsBuffer.free()
rapidsBuffer.getColumnarBatch
}
onDeck = None
ret
}
}

case class GpuCoalesceBatches(child: SparkPlan, goal: CoalesceGoal)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,8 @@ object GpuDeviceManager extends Logging {
try {
Cuda.setDevice(gpuId)
Rmm.initialize(init, logConf, initialAllocation, maxAllocation)
GpuShuffleEnv.init(conf, info)
RapidsBufferCatalog.init(conf)
GpuShuffleEnv.init(conf)
} catch {
case e: Exception => logError("Could not initialize RMM", e)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,20 @@ class HostToGpuCoalesceIterator(iter: Iterator[ColumnarBatch],
totalRows = 0
peakDevMemory.set(maxDeviceMemory)
}

private var onDeck: Option[ColumnarBatch] = None

override protected def hasOnDeck: Boolean = onDeck.isDefined
override protected def saveOnDeck(batch: ColumnarBatch): Unit = onDeck = Some(batch)
override protected def clearOnDeck(): Unit = {
onDeck.foreach(_.close())
onDeck = None
}
override protected def popOnDeck(): ColumnarBatch = {
val ret = onDeck.get
onDeck = None
ret
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,18 @@ package com.nvidia.spark.rapids
import java.util.concurrent.ConcurrentHashMap
import java.util.function.BiFunction

import scala.collection.mutable.ArrayBuffer

import ai.rapids.cudf.{DeviceMemoryBuffer, Rmm, Table}
import com.nvidia.spark.rapids.StorageTier.StorageTier
import com.nvidia.spark.rapids.format.TableMeta

import org.apache.spark.SparkEnv
import org.apache.spark.internal.Logging
import org.apache.spark.sql.rapids.RapidsDiskBlockManager

/** Catalog for lookup of buffers by ID */
/**
* Catalog for lookup of buffers by ID. The constructor is only visible for testing, generally
* `RapidsBufferCatalog.singleton` should be used instead.
*/
class RapidsBufferCatalog extends Logging {
/** Map of buffer IDs to buffers */
private[this] val bufferMap = new ConcurrentHashMap[RapidsBufferId, RapidsBuffer]
Expand Down Expand Up @@ -99,6 +103,95 @@ class RapidsBufferCatalog extends Logging {
}
}

object RapidsBufferCatalog {
object RapidsBufferCatalog extends Logging {
private val MAX_BUFFER_LOOKUP_ATTEMPTS = 100
}

val singleton = new RapidsBufferCatalog
private var deviceStorage: RapidsDeviceMemoryStore = _
private var hostStorage: RapidsHostMemoryStore = _
private var diskStorage: RapidsDiskStore = _
private var memoryEventHandler: DeviceMemoryEventHandler = _

private lazy val conf = SparkEnv.get.conf

def init(rapidsConf: RapidsConf): Unit = {
// We are going to re-initialize so make sure all of the old things were closed...
closeImpl()
assert(memoryEventHandler == null)
deviceStorage = new RapidsDeviceMemoryStore()
hostStorage = new RapidsHostMemoryStore(rapidsConf.hostSpillStorageSize)
val diskBlockManager = new RapidsDiskBlockManager(conf)
diskStorage = new RapidsDiskStore(diskBlockManager)
deviceStorage.setSpillStore(hostStorage)
hostStorage.setSpillStore(diskStorage)

logInfo("Installing GPU memory handler for spill")
memoryEventHandler = new DeviceMemoryEventHandler(deviceStorage)
Rmm.setEventHandler(memoryEventHandler)
}

def close(): Unit = {
logInfo("Closing storage")
closeImpl()
}

private def closeImpl(): Unit = {
if (memoryEventHandler != null) {
// Workaround for shutdown ordering problems where device buffers allocated with this handler
// are being freed after the handler is destroyed
//Rmm.clearEventHandler()
memoryEventHandler = null
}

if (deviceStorage != null) {
deviceStorage.close()
deviceStorage = null
}
if (hostStorage != null) {
hostStorage.close()
hostStorage = null
}
if (diskStorage != null) {
diskStorage.close()
diskStorage = null
}
}

def getDeviceStorage: RapidsDeviceMemoryStore = deviceStorage

/**
* Adds a contiguous table to the device storage, taking ownership of the table.
* @param id buffer ID to associate with this buffer
* @param table cudf table based from the contiguous buffer
* @param contigBuffer device memory buffer backing the table
* @param initialSpillPriority starting spill priority value for the buffer
*/
def addTable(
id: RapidsBufferId,
table: Table,
contigBuffer: DeviceMemoryBuffer,
initialSpillPriority: Long): Unit =
deviceStorage.addTable(id, table, contigBuffer, initialSpillPriority)

/**
* Adds a buffer to the device storage, taking ownership of the buffer.
* @param id buffer ID to associate with this buffer
* @param buffer buffer that will be owned by the store
* @param tableMeta metadata describing the buffer layout
* @param initialSpillPriority starting spill priority value for the buffer
*/
def addBuffer(
id: RapidsBufferId,
buffer: DeviceMemoryBuffer,
tableMeta: TableMeta,
initialSpillPriority: Long): Unit =
deviceStorage.addBuffer(id, buffer, tableMeta, initialSpillPriority)

/**
* Lookup the buffer that corresponds to the specified buffer ID and acquire it.
* NOTE: It is the responsibility of the caller to close the buffer.
* @param id buffer identifier
* @return buffer that has been acquired
*/
def acquireBuffer(id: RapidsBufferId): RapidsBuffer = singleton.acquireBuffer(id)
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ object RapidsBufferStore {
*/
abstract class RapidsBufferStore(
val name: String,
catalog: RapidsBufferCatalog) extends AutoCloseable with Logging {
catalog: RapidsBufferCatalog = RapidsBufferCatalog.singleton)
extends AutoCloseable with Logging {

private class BufferTracker {
private[this] val comparator: Comparator[RapidsBufferBase] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
* Buffer storage using device memory.
* @param catalog catalog to register this store
*/
class RapidsDeviceMemoryStore(
catalog: RapidsBufferCatalog) extends RapidsBufferStore("GPU", catalog) {
class RapidsDeviceMemoryStore(catalog: RapidsBufferCatalog = RapidsBufferCatalog.singleton)
extends RapidsBufferStore("GPU", catalog) {
override protected def createBuffer(
other: RapidsBuffer,
stream: Cuda.Stream): RapidsBufferBase = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ import org.apache.spark.sql.rapids.RapidsDiskBlockManager

/** A buffer store using files on the local disks. */
class RapidsDiskStore(
catalog: RapidsBufferCatalog,
diskBlockManager: RapidsDiskBlockManager) extends RapidsBufferStore("disk", catalog) {
diskBlockManager: RapidsDiskBlockManager,
catalog: RapidsBufferCatalog = RapidsBufferCatalog.singleton)
extends RapidsBufferStore("disk", catalog) {
private[this] val sharedBufferFiles = new ConcurrentHashMap[RapidsBufferId, File]

override def createBuffer(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@ import com.nvidia.spark.rapids.format.TableMeta
* @param maxSize maximum size in bytes for all buffers in this store
*/
class RapidsHostMemoryStore(
catalog: RapidsBufferCatalog,
maxSize: Long) extends RapidsBufferStore("host", catalog) {
maxSize: Long,
catalog: RapidsBufferCatalog = RapidsBufferCatalog.singleton)
extends RapidsBufferStore("host", catalog) {
private[this] val pool = HostMemoryBuffer.allocate(maxSize, false)
private[this] val addressAllocator = new AddressSpaceAllocator(maxSize)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,16 @@ object SpillPriorities {

/**
* Priorities for buffers received from shuffle.
* Shuffle input buffers are about to be read by a task, so only spill
* them if there's no other choice.
* Shuffle input buffers are about to be read by a task, so spill
* them if there's no other choice, but leave some space at the end of the priority range
* so there can be some things after it.
*/
// TODO: Should these be ordered amongst themselves? Maybe consider buffer size?
val INPUT_FROM_SHUFFLE_PRIORITY: Long = Long.MaxValue
val INPUT_FROM_SHUFFLE_PRIORITY: Long = Long.MaxValue - 1000

/**
* Priority for buffers in coalesce batch that did not fit into the batch we are working on.
* Most of the time this is shuffle input data that we read early so it should be slightly higher
* priority to keep around than other input shuffle buffers.
*/
val COALESCE_BATCH_ON_DECK_PRIORITY: Long = INPUT_FROM_SHUFFLE_PRIORITY + 1
}
Loading

0 comments on commit 52e6399

Please sign in to comment.