Skip to content

Commit

Permalink
Synchronize the UCX instance, removing extra monitor
Browse files Browse the repository at this point in the history
  • Loading branch information
abellina committed Jan 19, 2021
1 parent 9f52f4c commit b84e65a
Showing 1 changed file with 5 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ class UCX(executorId: Int, usingWakeupFeature: Boolean = true) extends AutoClose
private var worker: UcpWorker = _
private val endpoints = new ConcurrentHashMap[Long, UcpEndpoint]()
@volatile private var initialized = false
private[this] val initializedMonitor = new Object

// a peer tag identifies an incoming connection uniquely
private val peerTag = new AtomicLong(0) // peer tags
Expand Down Expand Up @@ -116,7 +115,7 @@ class UCX(executorId: Int, usingWakeupFeature: Boolean = true) extends AutoClose
* UCX worker/endpoint relationship.
*/
def init(): Unit = {
initializedMonitor.synchronized {
synchronized {
if (initialized) {
throw new IllegalStateException("UCX already initialized")
}
Expand Down Expand Up @@ -483,16 +482,16 @@ class UCX(executorId: Int, usingWakeupFeature: Boolean = true) extends AutoClose
logInfo(s"De-registering UCX ${registeredMemory.size} memory buffers.")
registeredMemory.foreach(_.deregister())
registeredMemory.clear()
initializedMonitor.synchronized {
synchronized {
initialized = false
initializedMonitor.notifyAll()
notifyAll()
// exit the loop
}
})

initializedMonitor.synchronized {
synchronized {
while (initialized) {
initializedMonitor.wait(100)
wait(100)
}
}

Expand Down

0 comments on commit b84e65a

Please sign in to comment.