From b84e65a3cabf978a9222cf9c6e00fad70f89fbe2 Mon Sep 17 00:00:00 2001 From: Alessandro Bellina Date: Tue, 19 Jan 2021 16:50:04 -0600 Subject: [PATCH] Synchronize the UCX instance, removing extra monitor --- .../com/nvidia/spark/rapids/shuffle/ucx/UCX.scala | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/shuffle-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/ucx/UCX.scala b/shuffle-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/ucx/UCX.scala index 014f9c95cebe..4db548f2ceaf 100644 --- a/shuffle-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/ucx/UCX.scala +++ b/shuffle-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/ucx/UCX.scala @@ -70,7 +70,6 @@ class UCX(executorId: Int, usingWakeupFeature: Boolean = true) extends AutoClose private var worker: UcpWorker = _ private val endpoints = new ConcurrentHashMap[Long, UcpEndpoint]() @volatile private var initialized = false - private[this] val initializedMonitor = new Object // a peer tag identifies an incoming connection uniquely private val peerTag = new AtomicLong(0) // peer tags @@ -116,7 +115,7 @@ class UCX(executorId: Int, usingWakeupFeature: Boolean = true) extends AutoClose * UCX worker/endpoint relationship. */ def init(): Unit = { - initializedMonitor.synchronized { + synchronized { if (initialized) { throw new IllegalStateException("UCX already initialized") } @@ -483,16 +482,16 @@ class UCX(executorId: Int, usingWakeupFeature: Boolean = true) extends AutoClose logInfo(s"De-registering UCX ${registeredMemory.size} memory buffers.") registeredMemory.foreach(_.deregister()) registeredMemory.clear() - initializedMonitor.synchronized { + synchronized { initialized = false - initializedMonitor.notifyAll() + notifyAll() // exit the loop } }) - initializedMonitor.synchronized { + synchronized { while (initialized) { - initializedMonitor.wait(100) + wait(100) } }