Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove unused async buffer spill support #503

Merged
merged 1 commit into from
Aug 3, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions docs/configs.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ Name | Description | Default Value
<a name="memory.gpu.allocFraction"></a>spark.rapids.memory.gpu.allocFraction|The fraction of total GPU memory that should be initially allocated for pooled memory. Extra memory will be allocated as needed, but it may result in more fragmentation.|0.9
<a name="memory.gpu.debug"></a>spark.rapids.memory.gpu.debug|Provides a log of GPU memory allocations and frees. If set to STDOUT or STDERR the logging will go there. Setting it to NONE disables logging. All other values are reserved for possible future expansion and in the meantime will disable logging.|NONE
<a name="memory.gpu.pooling.enabled"></a>spark.rapids.memory.gpu.pooling.enabled|Should RMM act as a pooling allocator for GPU memory, or should it just pass through to CUDA memory allocation directly.|true
<a name="memory.gpu.spillAsyncStart"></a>spark.rapids.memory.gpu.spillAsyncStart|Fraction of device memory utilization at which data will start spilling asynchronously to free up device memory|0.9
<a name="memory.gpu.spillAsyncStop"></a>spark.rapids.memory.gpu.spillAsyncStop|Fraction of device memory utilization at which data will stop spilling asynchronously to free up device memory|0.8
<a name="memory.host.spillStorageSize"></a>spark.rapids.memory.host.spillStorageSize|Amount of off-heap host memory to use for buffering spilled GPU data before spilling to local disk|1073741824
<a name="memory.pinnedPool.size"></a>spark.rapids.memory.pinnedPool.size|The size of the pinned memory pool in bytes unless otherwise specified. Use 0 to disable the pool.|0
<a name="memory.uvm.enabled"></a>spark.rapids.memory.uvm.enabled|UVM or universal memory can allow main host memory to act essentially as swap for device (GPU) memory. This allows the GPU to process more data than fits in memory, but can result in slower processing. This is an experimental feature.|false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,46 +16,16 @@

package com.nvidia.spark.rapids

import java.util.concurrent.{Executors, ThreadFactory, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean

import ai.rapids.cudf.{Cuda, NvtxColor, NvtxRange, RmmEventHandler}
import ai.rapids.cudf.{NvtxColor, NvtxRange, RmmEventHandler}

import org.apache.spark.internal.Logging
import org.apache.spark.sql.rapids.execution.TrampolineUtil.bytesToString

object DeviceMemoryEventHandler {
// Time, in milliseconds, that asyncSpill sleeps between polls once the
// device store reports that it has no more spillable buffers.
private val EXHAUSTED_POLL_INTERVAL_MSEC = 10
}

/**
* RMM event handler to trigger spilling from the device memory store.
* @param store device memory store that will be triggered to spill
* @param startSpillThreshold memory allocation threshold that will trigger spill to start
* @param endSpillThreshold memory allocation threshold that will stop spilling
*/
class DeviceMemoryEventHandler(
store: RapidsDeviceMemoryStore,
startSpillThreshold: Long,
endSpillThreshold: Long) extends RmmEventHandler with Logging with AutoCloseable {

private[this] val threadFactory: ThreadFactory = GpuDeviceManager.wrapThreadFactory(
new ThreadFactory() {
private[this] val factory = Executors.defaultThreadFactory()

override def newThread(runnable: Runnable): Thread = {
val t = factory.newThread(runnable)
t.setName("device store spill")
t.setDaemon(true)
t
}
})

private[this] val executor = Executors.newSingleThreadExecutor(threadFactory)

private[this] val spillStream = new Cuda.Stream(true)

private[this] val asyncSpillActive = new AtomicBoolean(false)
class DeviceMemoryEventHandler(store: RapidsDeviceMemoryStore)
extends RmmEventHandler with Logging {

/**
* Handles RMM allocation failures by spilling buffers from device memory.
Expand Down Expand Up @@ -88,90 +58,13 @@ class DeviceMemoryEventHandler(
}
}

override def getAllocThresholds: Array[Long] = Array(startSpillThreshold)
override def getAllocThresholds: Array[Long] = null

override def getDeallocThresholds: Array[Long] = Array(endSpillThreshold)
override def getDeallocThresholds: Array[Long] = null

/**
 * Callback for the allocation threshold advertised by getAllocThresholds.
 * Flips the async-spill flag from inactive to active; only the call that
 * performs the flip logs. Any error is logged rather than propagated.
 * @param totalAllocated total device memory allocated, reported in the log message
 */
override def onAllocThreshold(totalAllocated: Long): Unit = {
  try {
    val becameActive = asyncSpillActive.compareAndSet(false, true)
    if (becameActive) {
      logInfo(s"Asynchronous spill start triggered, total device memory " +
        s"allocated=${bytesToString(totalAllocated)}")

      // TODO: Because this is copying the buffer on a separate stream it needs to synchronize
      //       with the other stream that created the buffer, otherwise we could end up copying
      //       a buffer that is not done being computed. Plan is to use events for this, but
      //       for now just turning this off until we have a fix.
      logWarning("ASYNCHRONOUS SPILL DISABLED DUE TO MISSING SYNCHRONIZATION")
      // executor.submit(new Runnable() {
      //   override def run(): Unit = {
      //     try {
      //       asyncSpill()
      //     } catch {
      //       case t: Throwable => logError("Error during asynchronous spill", t)
      //     }
      //   }
      // })
    }
  } catch {
    case t: Throwable =>
      logError("Error handling allocation threshold callback", t)
  }
}

/**
 * Callback for the deallocation threshold advertised by getDeallocThresholds.
 * Flips the async-spill flag from active back to inactive; only the call that
 * performs the flip logs. Any error is logged rather than propagated.
 * @param totalAllocated total device memory allocated, reported in the log message
 */
override def onDeallocThreshold(totalAllocated: Long): Unit = {
  try {
    val becameInactive = asyncSpillActive.compareAndSet(true, false)
    if (becameInactive) {
      logInfo(s"Asynchronous spill stop triggered, total device memory " +
        s"allocated=${bytesToString(totalAllocated)}")
    }
  } catch {
    case t: Throwable =>
      logError("Error handling deallocation threshold callback", t)
  }
}

/**
 * Spill device memory asynchronously on the dedicated spill stream
 * to avoid contending with activity on the default stream as much
 * as possible.
 *
 * Repeatedly asks the store to spill a single buffer towards
 * `endSpillThreshold` for as long as the async-spill flag is set. When the
 * store has nothing to spill, the loop sleeps for
 * `EXHAUSTED_POLL_INTERVAL_MSEC` before trying again. An interrupt received
 * while sleeping ends the loop and is re-asserted on the current thread.
 */
private def asyncSpill(): Unit = {
  val spillRange = new NvtxRange("spill", NvtxColor.ORANGE)
  try {
    logInfo(s"Starting async device spill, device store " +
      s"size=${bytesToString(store.currentSize)}")
    var loggedSpillExhausted = false
    // Tracked separately from asyncSpillActive so that an interrupt is not
    // lost when the shared flag is re-read at the top of the loop.
    var interrupted = false

    while (!interrupted && asyncSpillActive.get) {
      if (!store.asyncSpillSingleBuffer(endSpillThreshold, spillStream)) {
        if (!loggedSpillExhausted) {
          logInfo(s"Device store has no more spillable buffers, resorting to polling")
          loggedSpillExhausted = true
        }
        val sleepRange = new NvtxRange("sleeping", NvtxColor.PURPLE)
        try {
          Thread.sleep(DeviceMemoryEventHandler.EXHAUSTED_POLL_INTERVAL_MSEC)
        } catch {
          case _: InterruptedException =>
            logInfo(s"Async device spill polling interrupted")
            interrupted = true
            // Preserve the interrupt status for whoever owns this thread.
            Thread.currentThread().interrupt()
        } finally {
          sleepRange.close()
        }
      }
    }

    logInfo(s"Async device spill complete, device store " +
      s"size=${bytesToString(store.currentSize)}")
  } finally {
    spillRange.close()
  }
}

/**
 * Shuts down the spill executor and closes the spill stream.
 *
 * Waits up to 10 seconds for submitted spill work to finish. If the wait
 * times out, a warning is logged and the executor is asked to stop
 * immediately. The spill stream is closed even if waiting throws (for
 * example on interruption), so the stream is not leaked.
 */
override def close(): Unit = {
  try {
    executor.shutdown()
    if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
      logWarning("Timed out waiting for the device store spill thread to terminate")
      executor.shutdownNow()
    }
  } finally {
    spillStream.close()
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,17 +87,13 @@ abstract class RapidsBufferStore(
def getTotalBytes: Long = synchronized { totalBytesStored }
}

/** Running total of the sizes of buffers recorded in spilledBuffers. */
private[this] val spilledBytesStored = new AtomicLong(0L)
// NOTE(review): presumably the total size of the buffers held in
// pendingFreeBuffers -- the updates are not visible here, please confirm.
private[this] val pendingFreeBytes = new AtomicLong(0L)

/** Tracks the buffers held by this store and their total size in bytes. */
private[this] val buffers = new BufferTracker

/** Tracks buffers that are waiting on outstanding references to be freed. */
private[this] val pendingFreeBuffers = new ConcurrentHashMap[RapidsBufferId, RapidsBufferBase]

/** Tracks buffers that have been spilled but remain in this store. */
private[this] val spilledBuffers = new ConcurrentHashMap[RapidsBufferId, RapidsBufferBase]

/** A monitor that can be used to wait for memory to be freed from this store. */
protected[this] val memoryFreedMonitor = new Object

Expand Down Expand Up @@ -156,9 +152,7 @@ abstract class RapidsBufferStore(
var waited = false
var exhausted = false
while (!exhausted && buffers.getTotalBytes > targetTotalSize) {
if (freeSpilledBuffers(targetTotalSize)) {
waited = false
} else if (trySpillAndFreeBuffer(stream)) {
if (trySpillAndFreeBuffer(stream)) {
waited = false
} else {
if (!waited && pendingFreeBytes.get > 0) {
Expand Down Expand Up @@ -187,50 +181,6 @@ abstract class RapidsBufferStore(
}
}

/**
 * Attempt to copy a single spillable buffer to the spill store without
 * freeing it from this store.
 * @param targetUnspilledSize desired size of the data in this store that has
 *                            not been spilled yet, must be non-negative
 * @param stream CUDA stream handed to the spill store for the copy
 * @return true if a spillable buffer was found (even if it could not be
 *         referenced), false if the target is already met or nothing is
 *         spillable
 */
def asyncSpillSingleBuffer(targetUnspilledSize: Long, stream: Cuda.Stream): Boolean =
  synchronized {
    require(targetUnspilledSize >= 0)
    val unspilledBytes = buffers.getTotalBytes - spilledBytesStored.get
    if (unspilledBytes > targetUnspilledSize) {
      val candidate = buffers.nextSpillableBuffer()
      if (candidate != null) {
        // Failing to take a reference means the buffer was invalidated just
        // before it could be spilled. A spillable buffer was still found, so
        // report true because later calls may find more.
        if (candidate.addReference()) {
          val copied = try {
            logInfo(s"Async spilling $candidate to ${spillStore.name}")
            spillStore.copyBuffer(candidate, stream)
          } finally {
            candidate.close()
          }
          if (copied != null) {
            candidate.markAsSpilled()
          }
        }
        true
      } else {
        false
      }
    } else {
      false
    }
  }

/**
 * Free buffers that have already been spilled via asynchronous spill until
 * the store reaches the specified target total size or no spilled buffers
 * remain.
 * @param targetTotalSize desired total size of the store
 * @return true if at least one spilled buffer was present when the call
 *         started, false otherwise
 */
def freeSpilledBuffers(targetTotalSize: Long): Boolean = {
  val pending = spilledBuffers.values().iterator()
  val hadSpilledBuffers = pending.hasNext
  while (pending.hasNext && buffers.getTotalBytes > targetTotalSize) {
    pending.next().free()
  }
  hadSpilledBuffers
}

/**
* Create a new buffer from an existing buffer in another store.
* If the data transfer will be performed asynchronously, this method is responsible for
Expand Down Expand Up @@ -281,30 +231,6 @@ abstract class RapidsBufferStore(
}
}

/**
 * Point the catalog at the store that now holds a buffer whose spilled
 * copy in an earlier store was freed. The lookup starts in this store and
 * walks down the spill chain until the buffer is found or the chain ends.
 * @param tier the storage tier of the spilled buffer that was freed
 * @param id the buffer ID of the spilled buffer that was freed
 */
@scala.annotation.tailrec
private def updateCatalog(tier: StorageTier, id: RapidsBufferId): Unit = {
  val localBuffer = buffers.get(id)
  if (localBuffer == null) {
    if (spillStore == null) {
      // end of the chain: the buffer may have been deleted in another thread
      logDebug(s"Ignoring catalog update on unknown buffer $id")
    } else {
      // not held here, so continue with the next store in the chain
      spillStore.updateCatalog(tier, id)
    }
  } else {
    catalog.updateBufferMap(tier, localBuffer)
  }
}

/** Base class for all buffers in this store. */
abstract class RapidsBufferBase(
override val id: RapidsBufferId,
Expand All @@ -329,17 +255,6 @@ abstract class RapidsBufferStore(
refcount > 0
}

/**
 * Record that this buffer has been copied to the spill store while still
 * residing in this store.
 */
def markAsSpilled(): Unit = synchronized {
  if (!isValid) {
    // The buffer was freed while it was being spilled, so the spill store
    // must update the catalog, or free the copy if it is no longer listed.
    spillStore.updateCatalog(storageTier, id)
  } else {
    spilledBuffers.put(id, this)
    spilledBytesStored.addAndGet(size)
  }
}

override def addReference(): Boolean = synchronized {
if (isValid) {
refcount += 1
Expand Down Expand Up @@ -393,12 +308,6 @@ abstract class RapidsBufferStore(
if (isValid) {
isValid = false
buffers.remove(id)
if (spilledBuffers.remove(id) != null) {
spillStore.updateCatalog(storageTier, id)
spilledBytesStored.addAndGet(-size)
logDebug(s"$name store freed pre-spilled buffer size=$size")
}

if (refcount == 0) {
freeBuffer()
} else {
Expand Down
18 changes: 0 additions & 18 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -260,20 +260,6 @@ object RapidsConf {
.checkValue(v => v >= 0 && v <= 1, "The fraction value must be in [0, 1].")
.createWithDefault(0.9)

// Fraction of device memory utilization, validated to lie in [0, 1], at which
// asynchronous spilling starts. Defaults to 0.9.
val RMM_SPILL_ASYNC_START = conf("spark.rapids.memory.gpu.spillAsyncStart")
.doc("Fraction of device memory utilization at which data will start " +
"spilling asynchronously to free up device memory")
.doubleConf
.checkValue(v => v >= 0 && v <= 1, "The fraction value must be in [0, 1].")
.createWithDefault(0.9)

// Fraction of device memory utilization, validated to lie in [0, 1], at which
// asynchronous spilling stops. Defaults to 0.8.
// NOTE(review): nothing here checks that this value is not above the start
// fraction -- confirm whether that is validated elsewhere.
val RMM_SPILL_ASYNC_STOP = conf("spark.rapids.memory.gpu.spillAsyncStop")
.doc("Fraction of device memory utilization at which data will stop " +
"spilling asynchronously to free up device memory")
.doubleConf
.checkValue(v => v >= 0 && v <= 1, "The fraction value must be in [0, 1].")
.createWithDefault(0.8)

val HOST_SPILL_STORAGE_SIZE = conf("spark.rapids.memory.host.spillStorageSize")
.doc("Amount of off-heap host memory to use for buffering spilled GPU data " +
"before spilling to local disk")
Expand Down Expand Up @@ -781,10 +767,6 @@ class RapidsConf(conf: Map[String, String]) extends Logging {

// Lazily evaluated accessors; each reads the value of its corresponding
// config entry through get.
lazy val rmmAllocFraction: Double = get(RMM_ALLOC_FRACTION)

lazy val rmmSpillAsyncStart: Double = get(RMM_SPILL_ASYNC_START)

lazy val rmmSpillAsyncStop: Double = get(RMM_SPILL_ASYNC_STOP)

lazy val hostSpillStorageSize: Long = get(HOST_SPILL_STORAGE_SIZE)

lazy val hasNans: Boolean = get(HAS_NANS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,8 @@ class GpuShuffleEnv extends Logging {
deviceStorage.setSpillStore(hostStorage)
hostStorage.setSpillStore(diskStorage)

val spillStart = (devInfo.total * rapidsConf.rmmSpillAsyncStart).toLong
val spillStop = (devInfo.total * rapidsConf.rmmSpillAsyncStop).toLong
logInfo("Installing GPU memory handler to start spill at " +
s"${Utils.bytesToString(spillStart)} and stop at " +
s"${Utils.bytesToString(spillStop)}")
memoryEventHandler = new DeviceMemoryEventHandler(deviceStorage, spillStart, spillStop)
logInfo("Installing GPU memory handler for spill")
memoryEventHandler = new DeviceMemoryEventHandler(deviceStorage)
Rmm.setEventHandler(memoryEventHandler)

shuffleCatalog = new ShuffleBufferCatalog(catalog, diskBlockManager)
Expand Down