Skip to content

Commit

Permalink
Improve error message after failed RMM shutdown (NVIDIA#2080)
Browse files: browse the repository at this point in the history
* Improve error message after failed RMM shutdown

Signed-off-by: Andy Grove <andygrove@nvidia.com>

* Remove blank line

Signed-off-by: Andy Grove <andygrove@nvidia.com>

* Update sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuDeviceManager.scala

Co-authored-by: Jason Lowe <jlowe@nvidia.com>

* Update sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuDeviceManager.scala

Co-authored-by: Jason Lowe <jlowe@nvidia.com>

* Rename Uninitializable to Errored

Signed-off-by: Andy Grove <andygrove@nvidia.com>

Co-authored-by: Jason Lowe <jlowe@nvidia.com>
  • Loading branch information
andygrove and jlowe authored Apr 6, 2021
1 parent 7421e8c commit 47f8aad
Showing 1 changed file with 15 additions and 5 deletions.
Original file line number | Diff line number | Diff line change
Expand Up @@ -28,6 +28,11 @@ import org.apache.spark.internal.Logging
import org.apache.spark.resource.ResourceInformation
import org.apache.spark.sql.rapids.GpuShuffleEnv

/**
 * Lifecycle state of the once-per-executor GPU memory setup, stored in
 * `GpuDeviceManager.singletonMemoryInitialized`.
 */
sealed trait MemoryState
// Set by initializeMemory after initializeRmm and allocatePinnedMemory have returned.
private case object Initialized extends MemoryState
// Starting state, and the state restored once shutdown() runs to completion.
private case object Uninitialized extends MemoryState
// Set at the start of shutdown() and left in place if shutdown does not complete;
// initializeMemory throws IllegalStateException when it observes this state.
private case object Errored extends MemoryState

object GpuDeviceManager extends Logging {
// This config controls whether RMM/Pinned memory are initialized from the task
// or from the executor side plugin. The default is to initialize from the
Expand All @@ -43,7 +48,7 @@ object GpuDeviceManager extends Logging {
}

private val threadGpuInitialized = new ThreadLocal[Boolean]()
@volatile private var singletonMemoryInitialized: Boolean = false
@volatile private var singletonMemoryInitialized: MemoryState = Uninitialized
@volatile private var deviceId: Option[Int] = None

/**
Expand Down Expand Up @@ -127,9 +132,11 @@ object GpuDeviceManager extends Logging {
}

/**
 * Closes the RapidsBufferCatalog and then shuts down RMM, while holding this
 * object's monitor.
 *
 * The memory state is set to Errored before any cleanup runs and is only reset
 * to Uninitialized after both calls have returned. If either call throws, the
 * state therefore stays Errored, and a later initializeMemory call fails with
 * an IllegalStateException instead of trying to initialize again.
 */
def shutdown(): Unit = synchronized {
// assume error during shutdown until we complete it
singletonMemoryInitialized = Errored
RapidsBufferCatalog.close()
Rmm.shutdown()
// NOTE(review): the Boolean assignment below looks like the pre-change line that the
// diff view shows next to its replacement; the field is declared as MemoryState, so
// presumably only the `Uninitialized` assignment exists in the file -- confirm.
singletonMemoryInitialized = false
singletonMemoryInitialized = Uninitialized
}

def getResourcesFromTaskContext: Map[String, ResourceInformation] = {
Expand Down Expand Up @@ -283,15 +290,18 @@ object GpuDeviceManager extends Logging {
* @param rapidsConf the config to use.
*/
def initializeMemory(gpuId: Option[Int], rapidsConf: Option[RapidsConf] = None): Unit = {
if (singletonMemoryInitialized == false) {
if (singletonMemoryInitialized != Initialized) {
// Memory or memory related components that only need to be initialized once per executor.
// This synchronize prevents multiple tasks from trying to initialize these at the same time.
GpuDeviceManager.synchronized {
if (singletonMemoryInitialized == false) {
if (singletonMemoryInitialized == Errored) {
throw new IllegalStateException(
"Cannot initialize memory due to previous shutdown failing")
} else if (singletonMemoryInitialized == Uninitialized) {
val gpu = gpuId.getOrElse(findGpuAndAcquire())
initializeRmm(gpu, rapidsConf)
allocatePinnedMemory(gpu, rapidsConf)
singletonMemoryInitialized = true
singletonMemoryInitialized = Initialized
}
}
}
Expand Down

0 comments on commit 47f8aad

Please sign in to comment.