From 58ede660b0ea05a0ef9a504e17885758056def1b Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Mon, 3 Aug 2020 14:47:40 -0500 Subject: [PATCH] Remove unused async buffer spill support Signed-off-by: Jason Lowe --- docs/configs.md | 2 - .../rapids/DeviceMemoryEventHandler.scala | 117 +----------------- .../spark/rapids/RapidsBufferStore.scala | 93 +------------- .../com/nvidia/spark/rapids/RapidsConf.scala | 18 --- .../spark/sql/rapids/GpuShuffleEnv.scala | 8 +- 5 files changed, 8 insertions(+), 230 deletions(-) diff --git a/docs/configs.md b/docs/configs.md index 460c1173f47..a2ad8a3ebc8 100644 --- a/docs/configs.md +++ b/docs/configs.md @@ -32,8 +32,6 @@ Name | Description | Default Value spark.rapids.memory.gpu.allocFraction|The fraction of total GPU memory that should be initially allocated for pooled memory. Extra memory will be allocated as needed, but it may result in more fragmentation.|0.9 spark.rapids.memory.gpu.debug|Provides a log of GPU memory allocations and frees. If set to STDOUT or STDERR the logging will go there. Setting it to NONE disables logging. All other values are reserved for possible future expansion and in the mean time will disable logging.|NONE spark.rapids.memory.gpu.pooling.enabled|Should RMM act as a pooling allocator for GPU memory, or should it just pass through to CUDA memory allocation directly.|true -spark.rapids.memory.gpu.spillAsyncStart|Fraction of device memory utilization at which data will start spilling asynchronously to free up device memory|0.9 -spark.rapids.memory.gpu.spillAsyncStop|Fraction of device memory utilization at which data will stop spilling asynchronously to free up device memory|0.8 spark.rapids.memory.host.spillStorageSize|Amount of off-heap host memory to use for buffering spilled GPU data before spilling to local disk|1073741824 spark.rapids.memory.pinnedPool.size|The size of the pinned memory pool in bytes unless otherwise specified. Use 0 to disable the pool.|0 spark.rapids.memory.uvm.enabled|UVM or universal memory can allow main host memory to act essentially as swap for device(GPU) memory. This allows the GPU to process more data than fits in memory, but can result in slower processing. This is an experimental feature.|false diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DeviceMemoryEventHandler.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DeviceMemoryEventHandler.scala index 3f921b9f8bb..2b934eb7bd5 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DeviceMemoryEventHandler.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DeviceMemoryEventHandler.scala @@ -16,46 +16,16 @@ package com.nvidia.spark.rapids -import java.util.concurrent.{Executors, ThreadFactory, TimeUnit} -import java.util.concurrent.atomic.AtomicBoolean - -import ai.rapids.cudf.{Cuda, NvtxColor, NvtxRange, RmmEventHandler} +import ai.rapids.cudf.{NvtxColor, NvtxRange, RmmEventHandler} import org.apache.spark.internal.Logging -import org.apache.spark.sql.rapids.execution.TrampolineUtil.bytesToString - -object DeviceMemoryEventHandler { - private val EXHAUSTED_POLL_INTERVAL_MSEC = 10 -} /** * RMM event handler to trigger spilling from the device memory store. * @param store device memory store that will be triggered to spill - * @param startSpillThreshold memory allocation threshold that will trigger spill to start - * @param endSpillThreshold memory allocation threshold that will stop spilling */ -class DeviceMemoryEventHandler( - store: RapidsDeviceMemoryStore, - startSpillThreshold: Long, - endSpillThreshold: Long) extends RmmEventHandler with Logging with AutoCloseable { - - private[this] val threadFactory: ThreadFactory = GpuDeviceManager.wrapThreadFactory( - new ThreadFactory() { - private[this] val factory = Executors.defaultThreadFactory() - - override def newThread(runnable: Runnable): Thread = { - val t = factory.newThread(runnable) - t.setName("device store spill") - t.setDaemon(true) - t - } - }) - - private[this] val executor = Executors.newSingleThreadExecutor(threadFactory) - - private[this] val spillStream = new Cuda.Stream(true) - - private[this] val asyncSpillActive = new AtomicBoolean(false) +class DeviceMemoryEventHandler(store: RapidsDeviceMemoryStore) + extends RmmEventHandler with Logging { /** * Handles RMM allocation failures by spilling buffers from device memory. @@ -88,90 +58,13 @@ class DeviceMemoryEventHandler( } } - override def getAllocThresholds: Array[Long] = Array(startSpillThreshold) + override def getAllocThresholds: Array[Long] = null - override def getDeallocThresholds: Array[Long] = Array(endSpillThreshold) + override def getDeallocThresholds: Array[Long] = null override def onAllocThreshold(totalAllocated: Long): Unit = { - try { - if (asyncSpillActive.compareAndSet(false, true)) { - logInfo(s"Asynchronous spill start triggered, total device memory " + - s"allocated=${bytesToString(totalAllocated)}") - - // TODO: Because this is copying the buffer on a separate stream it needs to synchronize - // with the other stream that created the buffer, otherwise we could end up copying - // a buffer that is not done being computed. Plan is to use events for this, but - // for now just turning this off until we have a fix. - logWarning("ASYNCHRONOUS SPILL DISABLED DUE TO MISSING SYNCHRONIZATION") -// executor.submit(new Runnable() { -// override def run(): Unit = { -// try { -// asyncSpill() -// } catch { -// case t: Throwable => logError("Error during asynchronous spill", t) -// } -// } -// }) - } - } catch { - case t: Throwable => logError("Error handling allocation threshold callback", t) - } } override def onDeallocThreshold(totalAllocated: Long): Unit = { - try { - if (asyncSpillActive.compareAndSet(true, false)) { - logInfo(s"Asynchronous spill stop triggered, total device memory " + - s"allocated=${bytesToString(totalAllocated)}") - } - } catch { - case t: Throwable => logError("Error handling deallocation threshold callback", t) - } - } - - /** - * Spill device memory asynchronously using non-blocking CUDA streams - * to avoid contending with activity on the default stream as much - * as possible. - */ - private def asyncSpill(): Unit = { - val nvtx = new NvtxRange("spill", NvtxColor.ORANGE) - try { - var loggedSpillExhausted = false - logInfo(s"Starting async device spill, device store " + - s"size=${bytesToString(store.currentSize)}") - var spillActive = asyncSpillActive.get - - while (spillActive) { - if (!store.asyncSpillSingleBuffer(endSpillThreshold, spillStream)) { - if (!loggedSpillExhausted) { - logInfo(s"Device store has no more spillable buffers, resorting to polling") - loggedSpillExhausted = true - } - val nvtx = new NvtxRange("sleeping", NvtxColor.PURPLE) - try { - Thread.sleep(DeviceMemoryEventHandler.EXHAUSTED_POLL_INTERVAL_MSEC) - } catch { - case _: InterruptedException => - logInfo(s"Async device spill polling interrupted") - spillActive = false - } finally { - nvtx.close() - } - } - spillActive = asyncSpillActive.get - } - - logInfo(s"Async device spill complete, device store " + - s"size=${bytesToString(store.currentSize)}") - } finally { - nvtx.close() - } - } - - override def close(): Unit = { - executor.shutdown() - executor.awaitTermination(10, TimeUnit.SECONDS) - spillStream.close() } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferStore.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferStore.scala index b78d680c01c..26eb486cba1 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferStore.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferStore.scala @@ -87,7 +87,6 @@ abstract class RapidsBufferStore( def getTotalBytes: Long = synchronized { totalBytesStored } } - private[this] val spilledBytesStored = new AtomicLong(0L) private[this] val pendingFreeBytes = new AtomicLong(0L) private[this] val buffers = new BufferTracker @@ -95,9 +94,6 @@ abstract class RapidsBufferStore( /** Tracks buffers that are waiting on outstanding references to be freed. */ private[this] val pendingFreeBuffers = new ConcurrentHashMap[RapidsBufferId, RapidsBufferBase] - /** Tracks buffers that have been spilled but remain in this store. */ - private[this] val spilledBuffers = new ConcurrentHashMap[RapidsBufferId, RapidsBufferBase] - /** A monitor that can be used to wait for memory to be freed from this store. */ protected[this] val memoryFreedMonitor = new Object @@ -156,9 +152,7 @@ abstract class RapidsBufferStore( var waited = false var exhausted = false while (!exhausted && buffers.getTotalBytes > targetTotalSize) { - if (freeSpilledBuffers(targetTotalSize)) { - waited = false - } else if (trySpillAndFreeBuffer(stream)) { + if (trySpillAndFreeBuffer(stream)) { waited = false } else { if (!waited && pendingFreeBytes.get > 0) { @@ -187,50 +181,6 @@ abstract class RapidsBufferStore( } } - def asyncSpillSingleBuffer(targetUnspilledSize: Long, stream: Cuda.Stream): Boolean = - synchronized { - require(targetUnspilledSize >= 0) - val unspilledSize = buffers.getTotalBytes - spilledBytesStored.get - if (unspilledSize <= targetUnspilledSize) { - return false - } - val bufferToSpill = buffers.nextSpillableBuffer() - if (bufferToSpill == null) { - return false - } - // If unable to get a reference then its a race condition where the buffer was invalidated - // just as we were going to spill it. Either way, indicate to the caller that a spillable - // buffer was found and more may be available in subsequent invocations. - if (bufferToSpill.addReference()) { - val newBuffer = try { - logInfo(s"Async spilling $bufferToSpill to ${spillStore.name}") - spillStore.copyBuffer(bufferToSpill, stream) - } finally { - bufferToSpill.close() - } - if (newBuffer != null) { - bufferToSpill.markAsSpilled() - } - } - true - } - - /** - * Free buffers that has already been spilled via asynchronous spill to - * reach the specified target store total size. - * @param targetTotalSize desired total size of the store - * @return true if at least one spilled buffer was found, false otherwise - */ - def freeSpilledBuffers(targetTotalSize: Long): Boolean = { - val it = spilledBuffers.values().iterator() - val result = it.hasNext - while (it.hasNext && buffers.getTotalBytes > targetTotalSize) { - val buffer = it.next() - buffer.free() - } - result - } - /** * Create a new buffer from an existing buffer in another store. * If the data transfer will be performed asynchronously, this method is responsible for @@ -281,30 +231,6 @@ abstract class RapidsBufferStore( } } - /** - * Update the catalog for an already spilled buffer that was freed. - * Since the buffer was spilled, another store in the spill chain - * now contains the buffer that should be listed in the catalog, - * and this method finds which store in the chain has that buffer. - * @param tier the storage tier of the spilled buffer that was freed - * @param id the buffer ID of the spilled buffer that was freed - */ - @scala.annotation.tailrec - private def updateCatalog(tier: StorageTier, id: RapidsBufferId): Unit = { - val buffer = buffers.get(id) - if (buffer != null) { - catalog.updateBufferMap(tier, buffer) - } else { - // buffer is not in this store, try the next store - if (spillStore != null) { - spillStore.updateCatalog(tier, id) - } else { - // buffer may have been deleted in another thread - logDebug(s"Ignoring catalog update on unknown buffer $id") - } - } - } - /** Base class for all buffers in this store. */ abstract class RapidsBufferBase( override val id: RapidsBufferId, @@ -329,17 +255,6 @@ abstract class RapidsBufferStore( refcount > 0 } - def markAsSpilled(): Unit = synchronized { - if (isValid) { - spilledBuffers.put(id, this) - spilledBytesStored.addAndGet(size) - } else { - // Spilled a buffer that was freed in the interim. The spill store - // needs to update the catalog or free if no longer in the catalog. - spillStore.updateCatalog(storageTier, id) - } - } - override def addReference(): Boolean = synchronized { if (isValid) { refcount += 1 @@ -393,12 +308,6 @@ abstract class RapidsBufferStore( if (isValid) { isValid = false buffers.remove(id) - if (spilledBuffers.remove(id) != null) { - spillStore.updateCatalog(storageTier, id) - spilledBytesStored.addAndGet(-size) - logDebug(s"$name store freed pre-spilled buffer size=$size") - } - if (refcount == 0) { freeBuffer() } else { diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala index ac4b35eb826..23ac99394ca 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala @@ -260,20 +260,6 @@ object RapidsConf { .checkValue(v => v >= 0 && v <= 1, "The fraction value must be in [0, 1].") .createWithDefault(0.9) - val RMM_SPILL_ASYNC_START = conf("spark.rapids.memory.gpu.spillAsyncStart") - .doc("Fraction of device memory utilization at which data will start " + - "spilling asynchronously to free up device memory") - .doubleConf - .checkValue(v => v >= 0 && v <= 1, "The fraction value must be in [0, 1].") - .createWithDefault(0.9) - - val RMM_SPILL_ASYNC_STOP = conf("spark.rapids.memory.gpu.spillAsyncStop") - .doc("Fraction of device memory utilization at which data will stop " + - "spilling asynchronously to free up device memory") - .doubleConf - .checkValue(v => v >= 0 && v <= 1, "The fraction value must be in [0, 1].") - .createWithDefault(0.8) - val HOST_SPILL_STORAGE_SIZE = conf("spark.rapids.memory.host.spillStorageSize") .doc("Amount of off-heap host memory to use for buffering spilled GPU data " + "before spilling to local disk") @@ -781,10 +767,6 @@ class RapidsConf(conf: Map[String, String]) extends Logging { lazy val rmmAllocFraction: Double = get(RMM_ALLOC_FRACTION) - lazy val rmmSpillAsyncStart: Double = get(RMM_SPILL_ASYNC_START) - - lazy val rmmSpillAsyncStop: Double = get(RMM_SPILL_ASYNC_STOP) - lazy val hostSpillStorageSize: Long = get(HOST_SPILL_STORAGE_SIZE) lazy val hasNans: Boolean = get(HAS_NANS) diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuShuffleEnv.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuShuffleEnv.scala index dd60daf0912..3d19330bc1e 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuShuffleEnv.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuShuffleEnv.scala @@ -58,12 +58,8 @@ class GpuShuffleEnv extends Logging { deviceStorage.setSpillStore(hostStorage) hostStorage.setSpillStore(diskStorage) - val spillStart = (devInfo.total * rapidsConf.rmmSpillAsyncStart).toLong - val spillStop = (devInfo.total * rapidsConf.rmmSpillAsyncStop).toLong - logInfo("Installing GPU memory handler to start spill at " + - s"${Utils.bytesToString(spillStart)} and stop at " + - s"${Utils.bytesToString(spillStop)}") - memoryEventHandler = new DeviceMemoryEventHandler(deviceStorage, spillStart, spillStop) + logInfo("Installing GPU memory handler for spill") + memoryEventHandler = new DeviceMemoryEventHandler(deviceStorage) Rmm.setEventHandler(memoryEventHandler) shuffleCatalog = new ShuffleBufferCatalog(catalog, diskBlockManager)