Skip to content

Commit

Permalink
Cleanup shutdown logging for UCX shuffle (NVIDIA#1546)
Browse files Browse the repository at this point in the history
* Cleanup shutdown logging for UCX shuffle

Signed-off-by: Alessandro Bellina <abellina@nvidia.com>

* Address code review comments

Signed-off-by: Alessandro Bellina <abellina@nvidia.com>
  • Loading branch information
abellina authored Jan 20, 2021
1 parent e93d075 commit 23814e4
Showing 1 changed file with 37 additions and 34 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020, NVIDIA CORPORATION.
* Copyright (c) 2020-2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,7 +17,7 @@
package com.nvidia.spark.rapids.shuffle.ucx

import java.io._
import java.net.{InetSocketAddress, ServerSocket, Socket}
import java.net.{InetSocketAddress, ServerSocket, Socket, SocketException}
import java.nio.ByteBuffer
import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicLong
Expand Down Expand Up @@ -69,7 +69,7 @@ class UCX(executorId: Int, usingWakeupFeature: Boolean = true) extends AutoClose

private var worker: UcpWorker = _
private val endpoints = new ConcurrentHashMap[Long, UcpEndpoint]()
private var initialized = false
@volatile private var initialized = false

// a peer tag identifies an incoming connection uniquely
private val peerTag = new AtomicLong(0) // peer tags
Expand Down Expand Up @@ -148,17 +148,15 @@ class UCX(executorId: Int, usingWakeupFeature: Boolean = true) extends AutoClose

while(initialized) {
try {
if (initialized) {
worker.progress()
// else worker.progress returned 0
if (usingWakeupFeature) {
drainWorker()
val sleepRange = new NvtxRange("UCX Sleeping", NvtxColor.PURPLE)
try {
worker.waitForEvents()
} finally {
sleepRange.close()
}
worker.progress()
// else worker.progress returned 0
if (usingWakeupFeature) {
drainWorker()
val sleepRange = new NvtxRange("UCX Sleeping", NvtxColor.PURPLE)
try {
worker.waitForEvents()
} finally {
sleepRange.close()
}
}

Expand Down Expand Up @@ -222,12 +220,18 @@ class UCX(executorId: Int, usingWakeupFeature: Boolean = true) extends AutoClose
handleSocket(s)
})
} catch {
case e: Throwable =>
if (!initialized) {
logWarning(s"UCX management socket closing", e)
} else {
logError(s"Got exception while waiting for a UCX management connection", e)
}
case e: Throwable if initialized =>
// This will cause the `SparkUncaughtExceptionHandler` to get invoked
// and it will shut down the executor (as it should).
throw e
case _: SocketException if !initialized =>
// `initialized = false` means we are shutting down,
// the socket will throw `SocketException` in this case
// to unblock the accept, when `close()` is called.
logWarning(s"UCX management socket closing")
case ue: Throwable =>
// a catch-all in case we get a non `SocketException` while closing (!initialized)
logError(s"Unexpected exception while closing UCX management socket", ue)
}
}
})
Expand Down Expand Up @@ -474,22 +478,21 @@ class UCX(executorId: Int, usingWakeupFeature: Boolean = true) extends AutoClose

def getNextTransactionId: Long = txId.incrementAndGet()

private[this] val shutdownMonitor = new Object

override def close(): Unit = {
shutdownMonitor.synchronized {
onWorkerThreadAsync(() => {
logInfo(s"De-registering UCX ${registeredMemory.size} memory buffers.")
registeredMemory.foreach(_.deregister())
registeredMemory.clear()
shutdownMonitor.synchronized {
shutdownMonitor.notify()
initialized = false
// exit the loop
}
})
onWorkerThreadAsync(() => {
logInfo(s"De-registering UCX ${registeredMemory.size} memory buffers.")
registeredMemory.foreach(_.deregister())
registeredMemory.clear()
synchronized {
initialized = false
notifyAll()
// exit the loop
}
})

synchronized {
while (initialized) {
shutdownMonitor.wait(100)
wait(100)
}
}

Expand Down

0 comments on commit 23814e4

Please sign in to comment.