Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve error message after failed RMM shutdown #2080

Merged
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@ import org.apache.spark.internal.Logging
import org.apache.spark.resource.ResourceInformation
import org.apache.spark.sql.rapids.GpuShuffleEnv

sealed trait MemoryState
case object Initialized extends MemoryState
case object Uninitialized extends MemoryState
case object Uninitializable extends MemoryState
andygrove marked this conversation as resolved.
Show resolved Hide resolved


Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

extra space nit.

object GpuDeviceManager extends Logging {
// This config controls whether RMM/Pinned memory are initialized from the task
// or from the executor side plugin. The default is to initialize from the
Expand All @@ -43,7 +49,7 @@ object GpuDeviceManager extends Logging {
}

private val threadGpuInitialized = new ThreadLocal[Boolean]()
@volatile private var singletonMemoryInitialized: Boolean = false
@volatile private var singletonMemoryInitialized: MemoryState = Uninitialized
@volatile private var deviceId: Option[Int] = None

/**
Expand Down Expand Up @@ -127,9 +133,17 @@ object GpuDeviceManager extends Logging {
}

def shutdown(): Unit = synchronized {
RapidsBufferCatalog.close()
Rmm.shutdown()
singletonMemoryInitialized = false
try {
RapidsBufferCatalog.close()
Rmm.shutdown()
singletonMemoryInitialized = Uninitialized
} catch {
case e: Throwable =>
// flag as Uninitializable so that any future initialization attempts can throw
// a helpful error message
singletonMemoryInitialized = Uninitializable
throw e
}
andygrove marked this conversation as resolved.
Show resolved Hide resolved
}

def getResourcesFromTaskContext: Map[String, ResourceInformation] = {
Expand Down Expand Up @@ -283,15 +297,18 @@ object GpuDeviceManager extends Logging {
* @param rapidsConf the config to use.
*/
def initializeMemory(gpuId: Option[Int], rapidsConf: Option[RapidsConf] = None): Unit = {
if (singletonMemoryInitialized == false) {
if (singletonMemoryInitialized != Initialized) {
// Memory or memory related components that only need to be initialized once per executor.
// This synchronize prevents multiple tasks from trying to initialize these at the same time.
GpuDeviceManager.synchronized {
if (singletonMemoryInitialized == false) {
if (singletonMemoryInitialized == Uninitializable) {
throw new IllegalStateException(
"Cannot initialize memory due to previous shutdown failing")
} else if (singletonMemoryInitialized == Uninitialized) {
val gpu = gpuId.getOrElse(findGpuAndAcquire())
initializeRmm(gpu, rapidsConf)
allocatePinnedMemory(gpu, rapidsConf)
singletonMemoryInitialized = true
singletonMemoryInitialized = Initialized
}
}
}
Expand Down