Skip to content

Commit

Permalink
Provides a method for the user to remove the hook and re-register the…
Browse files Browse the repository at this point in the history
… hook in a custom shutdown hook manager (#11161)

Contributes to NVIDIA/spark-rapids#5854

###  Problem
Prints `RapidsHostMemoryStore.pool` leaked error log when running Rapids Accelerator test cases.
```
All tests passed.

22/06/27 17:45:57.298 Thread-7 ERROR HostMemoryBuffer: A HOST BUFFER WAS LEAKED (ID: 1 7f8557fff010)
22/06/27 17:45:57.303 Thread-7 ERROR MemoryCleaner: Leaked host buffer (ID: 1): 2022-06-27 09:45:16.0171 UTC: INC
java.lang.Thread.getStackTrace(Thread.java:1559)
ai.rapids.cudf.MemoryCleaner$RefCountDebugItem.<init>(MemoryCleaner.java:301)
ai.rapids.cudf.MemoryCleaner$Cleaner.addRef(MemoryCleaner.java:82)
ai.rapids.cudf.MemoryBuffer.incRefCount(MemoryBuffer.java:232)
ai.rapids.cudf.MemoryBuffer.<init>(MemoryBuffer.java:98)
ai.rapids.cudf.HostMemoryBuffer.<init>(HostMemoryBuffer.java:196)
ai.rapids.cudf.HostMemoryBuffer.<init>(HostMemoryBuffer.java:192)
ai.rapids.cudf.HostMemoryBuffer.allocate(HostMemoryBuffer.java:144)
com.nvidia.spark.rapids.RapidsHostMemoryStore.<init>(RapidsHostMemoryStore.scala:38)

```
### Root cause
`RapidsHostMemoryStore.pool` is not closed before `MemoryCleaner` checking the leaks.
It's actually not a leak, it's caused by hooks execution order.
`RapidsHostMemoryStore.pool` is closed in the [Spark executor plugin hook](https://github.com/apache/spark/blob/v3.3.0/core/src/main/scala/org/apache/spark/executor/Executor.scala#L351toL381).
```
plugins.foreach(_.shutdown())  // this line will eventually close the RapidsHostMemoryStore.pool
```
The close path is:
```
  The close path is: 
    Spark executor plugin hook ->
      RapidsExecutorPlugin.shutdown ->
        GpuDeviceManager.shutdown ->
          RapidsBufferCatalog.close() ->
            RapidsHostMemoryStore.close ->
              RapidsHostMemoryStore.pool.close ->
```

Rapids Accelerator JNI also checks leaks in a shutdown hook.
Shutdown hooks are executed concurrently, there is no execution order guarantee.

### solution 1 - Not recommanded
Just wait one second before checking the leak in the `MemoryCleaner`.
It's modifying debug code, it's modifying closing code, and has no impact on production code.

### solution 2 - Not recommanded
Spark has a util class `ShutdownHookManager` which is a  ShutdownHook wrapper.
It can [addShutdownHook with priority](https://github.com/apache/spark/blob/v3.3.0/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala#L152) via `Hadoop ShutdownHookManager`
```
def addShutdownHook(priority: Int)(hook: () => Unit): AnyRef = {
```

Leveraging Hadoop ShutdownHookManager as Spark does is feasible.

###  Solution 3 Recommanded
Provides a method for the user to remove the hook and re-register the hook in a custom shutdown hook manager.

Signed-off-by: Chong Gao <res_life@163.com>

Authors:
  - Chong Gao (https://github.com/res-life)

Approvers:
  - Robert (Bobby) Evans (https://github.com/revans2)

URL: #11161
  • Loading branch information
res-life authored Jul 7, 2022
1 parent 34f4d48 commit 8426a99
Showing 1 changed file with 49 additions and 17 deletions.
66 changes: 49 additions & 17 deletions java/src/main/java/ai/rapids/cudf/MemoryCleaner.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,15 @@ public final class MemoryCleaner {
private static final Logger log = LoggerFactory.getLogger(MemoryCleaner.class);
private static final AtomicLong idGen = new AtomicLong(0);

/**
* Check if configured the shutdown hook which checks leaks at shutdown time.
*
* @return true if configured, false otherwise.
*/
public static boolean configuredDefaultShutdownHook() {
return REF_COUNT_DEBUG;
}

/**
* API that can be used to clean up the resources for a vector, even if there was a leak
*/
Expand Down Expand Up @@ -197,30 +206,53 @@ static void setDefaultGpu(int defaultGpuId) {
}
}, "Cleaner Thread");

/**
* Default shutdown runnable used to be added to Java default shutdown hook.
* It checks the leaks at shutdown time.
*/
private static final Runnable DEFAULT_SHUTDOWN_RUNNABLE = () -> {
// If we are debugging things do a best effort to check for leaks at the end

System.gc();
// Avoid issues on shutdown with the cleaner thread.
t.interrupt();
try {
t.join(1000);
} catch (InterruptedException e) {
// Ignored
}
if (defaultGpu >= 0) {
Cuda.setDevice(defaultGpu);
}

for (CleanerWeakReference cwr : all) {
cwr.clean();
}
};

private static final Thread DEFAULT_SHUTDOWN_THREAD = new Thread(DEFAULT_SHUTDOWN_RUNNABLE);

static {
t.setDaemon(true);
t.start();
if (REF_COUNT_DEBUG) {
// If we are debugging things do a best effort to check for leaks at the end
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
System.gc();
// Avoid issues on shutdown with the cleaner thread.
t.interrupt();
try {
t.join(1000);
} catch (InterruptedException e) {
// Ignored
}
if (defaultGpu >= 0) {
Cuda.setDevice(defaultGpu);
}
for (CleanerWeakReference cwr : all) {
cwr.clean();
}
}));
Runtime.getRuntime().addShutdownHook(DEFAULT_SHUTDOWN_THREAD);
}
}

/**
* De-register the default shutdown hook from Java default Runtime, then return the corresponding
* shutdown runnable.
* If you want to register the default shutdown runnable in a custom shutdown hook manager
* instead of Java default Runtime, should first remove it using this method and then add it
*
* @return the default shutdown runnable
*/
public static Runnable removeDefaultShutdownHook() {
Runtime.getRuntime().removeShutdownHook(DEFAULT_SHUTDOWN_THREAD);
return DEFAULT_SHUTDOWN_RUNNABLE;
}

static void register(ColumnVector vec, Cleaner cleaner) {
// It is now registered...
all.add(new CleanerWeakReference(vec, cleaner, collected, true));
Expand Down

0 comments on commit 8426a99

Please sign in to comment.