Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve error message after failed RMM shutdown #2080

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ import org.apache.spark.internal.Logging
import org.apache.spark.resource.ResourceInformation
import org.apache.spark.sql.rapids.GpuShuffleEnv

sealed trait MemoryState
private case object Initialized extends MemoryState
private case object Uninitialized extends MemoryState
private case object Errored extends MemoryState

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

extra space nit.

object GpuDeviceManager extends Logging {
// This config controls whether RMM/Pinned memory are initialized from the task
// or from the executor side plugin. The default is to initialize from the
Expand All @@ -43,7 +48,7 @@ object GpuDeviceManager extends Logging {
}

private val threadGpuInitialized = new ThreadLocal[Boolean]()
@volatile private var singletonMemoryInitialized: Boolean = false
@volatile private var singletonMemoryInitialized: MemoryState = Uninitialized
@volatile private var deviceId: Option[Int] = None

/**
Expand Down Expand Up @@ -127,9 +132,11 @@ object GpuDeviceManager extends Logging {
}

def shutdown(): Unit = synchronized {
// assume error during shutdown until we complete it
singletonMemoryInitialized = Errored
RapidsBufferCatalog.close()
Rmm.shutdown()
singletonMemoryInitialized = false
singletonMemoryInitialized = Uninitialized
}

def getResourcesFromTaskContext: Map[String, ResourceInformation] = {
Expand Down Expand Up @@ -283,15 +290,18 @@ object GpuDeviceManager extends Logging {
* @param rapidsConf the config to use.
*/
def initializeMemory(gpuId: Option[Int], rapidsConf: Option[RapidsConf] = None): Unit = {
if (singletonMemoryInitialized == false) {
if (singletonMemoryInitialized != Initialized) {
// Memory or memory related components that only need to be initialized once per executor.
// This synchronize prevents multiple tasks from trying to initialize these at the same time.
GpuDeviceManager.synchronized {
if (singletonMemoryInitialized == false) {
if (singletonMemoryInitialized == Errored) {
throw new IllegalStateException(
"Cannot initialize memory due to previous shutdown failing")
} else if (singletonMemoryInitialized == Uninitialized) {
val gpu = gpuId.getOrElse(findGpuAndAcquire())
initializeRmm(gpu, rapidsConf)
allocatePinnedMemory(gpu, rapidsConf)
singletonMemoryInitialized = true
singletonMemoryInitialized = Initialized
}
}
}
Expand Down