Fix memory leak in time zone DB #1689

Merged
5 commits merged into NVIDIA:branch-24.02 on Jan 17, 2024

Conversation

@res-life res-life commented Jan 10, 2024

closes NVIDIA/spark-rapids#10169
closes #1571

Root causes

My debug logs show that doLoadCache runs twice. Because the implementation allows doLoadCache to run more than once, the cache built by the earlier run leaks.

The leaked host memory is allocated by HostColumnVector.fromLists.

The following check is not synchronized:

    if (!instance.isLoaded()) {
      cacheDatabase(); // lazy load the database
    }

When running the test_cast_timestamp_to_date case, multiple threads call fromUtcTimestampToTimestamp concurrently, so doLoadCache runs multiple times.

And here:

executor.execute(this::doLoadData);

The code above hands the load off to a new thread, so the synchronization scope does not cover the actual loading.
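
To illustrate that last point, here is a minimal, self-contained sketch (the class and field names are invented; only doLoadData comes from the real code). Even if the loaded check were synchronized, executor.execute only queues the work and returns, so the lock is released before doLoadData runs and a second task thread can queue a second load before the flag ever flips:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    class LazyLoadRaceSketch {
      private final ExecutorService executor = Executors.newSingleThreadExecutor();
      private volatile boolean loaded = false;

      // Called concurrently by many task threads.
      void ensureLoaded() {
        synchronized (this) {
          if (!loaded) {
            // execute() only queues the task; the lock is released before
            // doLoadData runs, so another caller can still see loaded == false
            // and queue a second load.
            executor.execute(this::doLoadData);
          }
        }
      }

      private void doLoadData() {
        // Allocates the cache; running this twice leaks the first allocation
        // if the old cache is overwritten without being closed.
        loaded = true;
      }
    }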

Changes

  • Make the GpuTimeZoneDB singleton final, because it is used as the synchronization object; final makes it safer.
  • Add cacheDatabaseAsync for the plugin to invoke.
  • Make the whole cacheDatabase function synchronized.
  • Add a cacheEmpty boolean to avoid caching the transition rules multiple times.

Now there are no leaks, because shutdown closes the singleton's host memory.
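
A minimal sketch of the resulting shape, for illustration only (GpuTimeZoneDB, cacheDatabase, cacheDatabaseAsync, shutdown, and cacheEmpty are named above; everything else, such as doLoadData and closeResources, is a hypothetical placeholder rather than the merged code):

    public class GpuTimeZoneDB {
      // Final singleton: it also serves as the synchronization object.
      private static final GpuTimeZoneDB instance = new GpuTimeZoneDB();
      private boolean cacheEmpty = true;

      // Invoked by the plugin at startup; loads the database in the background.
      public static void cacheDatabaseAsync() {
        new Thread(() -> instance.cacheDatabaseImpl(), "timezone-db-loader").start();
      }

      public static void cacheDatabase() {
        instance.cacheDatabaseImpl();
      }

      public static void shutdown() {
        instance.shutdownImpl();
      }

      private synchronized void cacheDatabaseImpl() {
        if (cacheEmpty) {        // guarded by the lock, so the rules load only once
          doLoadData();          // hypothetical: builds the transition-rule host columns
          cacheEmpty = false;
        }
      }

      private synchronized void shutdownImpl() {
        if (!cacheEmpty) {
          closeResources();      // hypothetical: closes the cached host columns
          cacheEmpty = true;
        }
      }

      private void doLoadData() { /* load the time zone transition rules */ }
      private void closeResources() { /* free the cached host memory */ }
    }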

Signed-off-by: Chong Gao <res_life@163.com>
@res-life res-life requested a review from revans2 January 10, 2024 12:53
@res-life (Collaborator Author): build

@res-life (Collaborator Author) commented Jan 10, 2024:

My debug logs:

my debug: who call cacheDatabase
java.lang.Thread.getStackTrace(Thread.java:1564)
com.nvidia.spark.rapids.jni.GpuTimeZoneDB.cacheDatabase(GpuTimeZoneDB.java:87)
com.nvidia.spark.rapids.jni.GpuTimeZoneDB.fromUtcTimestampToTimestamp(GpuTimeZoneDB.java:136)
com.nvidia.spark.rapids.GpuCast$.doCast(GpuCast.scala:631)
com.nvidia.spark.rapids.GpuCast.doColumnar(GpuCast.scala:1903)
com.nvidia.spark.rapids.GpuUnaryExpression.doItColumnar(GpuExpressions.scala:250)
com.nvidia.spark.rapids.GpuUnaryExpression.$anonfun$columnarEval$1(GpuExpressions.scala:261)
com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:29)
com.nvidia.spark.rapids.GpuUnaryExpression.columnarEval(GpuExpressions.scala:260)
com.nvidia.spark.rapids.RapidsPluginImplicits$ReallyAGpuExpression.columnarEval(implicits.scala:35)
com.nvidia.spark.rapids.GpuAlias.columnarEval(namedExpressions.scala:110)
com.nvidia.spark.rapids.RapidsPluginImplicits$ReallyAGpuExpression.columnarEval(implicits.scala:35)
com.nvidia.spark.rapids.GpuProjectExec$.$anonfun$project$1(basicPhysicalOperators.scala:110)
com.nvidia.spark.rapids.RapidsPluginImplicits$MapsSafely.$anonfun$safeMap$1(implicits.scala:221)
com.nvidia.spark.rapids.RapidsPluginImplicits$MapsSafely.$anonfun$safeMap$1$adapted(implicits.scala:218)
scala.collection.immutable.List.foreach(List.scala:392)
com.nvidia.spark.rapids.RapidsPluginImplicits$MapsSafely.safeMap(implicits.scala:218)
com.nvidia.spark.rapids.RapidsPluginImplicits$AutoCloseableProducingSeq.safeMap(implicits.scala:253)
com.nvidia.spark.rapids.GpuProjectExec$.project(basicPhysicalOperators.scala:110)
com.nvidia.spark.rapids.GpuTieredProject.$anonfun$project$2(basicPhysicalOperators.scala:619)
com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:29)
com.nvidia.spark.rapids.GpuTieredProject.recurse$2(basicPhysicalOperators.scala:618)
com.nvidia.spark.rapids.GpuTieredProject.project(basicPhysicalOperators.scala:631)
com.nvidia.spark.rapids.GpuTieredProject.$anonfun$projectWithRetrySingleBatchInternal$5(basicPhysicalOperators.scala:567)
com.nvidia.spark.rapids.RmmRapidsRetryIterator$.withRestoreOnRetry(RmmRapidsRetryIterator.scala:272)
com.nvidia.spark.rapids.GpuTieredProject.$anonfun$projectWithRetrySingleBatchInternal$4(basicPhysicalOperators.scala:567)
com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:29)
com.nvidia.spark.rapids.GpuTieredProject.$anonfun$projectWithRetrySingleBatchInternal$3(basicPhysicalOperators.scala:565)
com.nvidia.spark.rapids.RmmRapidsRetryIterator$NoInputSpliterator.next(RmmRapidsRetryIterator.scala:395)
com.nvidia.spark.rapids.RmmRapidsRetryIterator$RmmRapidsRetryIterator.next(RmmRapidsRetryIterator.scala:613)
com.nvidia.spark.rapids.RmmRapidsRetryIterator$RmmRapidsRetryAutoCloseableIterator.next(RmmRapidsRetryIterator.scala:517)
com.nvidia.spark.rapids.RmmRapidsRetryIterator$.drainSingleWithVerification(RmmRapidsRetryIterator.scala:291)
com.nvidia.spark.rapids.RmmRapidsRetryIterator$.withRetryNoSplit(RmmRapidsRetryIterator.scala:185)
com.nvidia.spark.rapids.GpuTieredProject.$anonfun$projectWithRetrySingleBatchInternal$1(basicPhysicalOperators.scala:565)
com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:38)
com.nvidia.spark.rapids.GpuTieredProject.projectWithRetrySingleBatchInternal(basicPhysicalOperators.scala:562)
com.nvidia.spark.rapids.GpuTieredProject.projectAndCloseWithRetrySingleBatch(basicPhysicalOperators.scala:601)
com.nvidia.spark.rapids.GpuProjectExec.$anonfun$internalDoExecuteColumnar$2(basicPhysicalOperators.scala:384)
com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:29)
com.nvidia.spark.rapids.GpuProjectExec.$anonfun$internalDoExecuteColumnar$1(basicPhysicalOperators.scala:380)
scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
com.nvidia.spark.rapids.ColumnarToRowIterator.$anonfun$fetchNextBatch$3(GpuColumnarToRowExec.scala:290)
com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:29)
com.nvidia.spark.rapids.ColumnarToRowIterator.fetchNextBatch(GpuColumnarToRowExec.scala:287)
com.nvidia.spark.rapids.ColumnarToRowIterator.loadNextBatch(GpuColumnarToRowExec.scala:257)
com.nvidia.spark.rapids.ColumnarToRowIterator.hasNext(GpuColumnarToRowExec.scala:304)
scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:345)
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
org.apache.spark.scheduler.Task.run(Task.scala:131)
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
java.lang.Thread.run(Thread.java:750)
my debug: done
my debug: who call cacheDatabase
java.lang.Thread.getStackTrace(Thread.java:1564)
com.nvidia.spark.rapids.jni.GpuTimeZoneDB.cacheDatabase(GpuTimeZoneDB.java:87)
com.nvidia.spark.rapids.jni.GpuTimeZoneDB.fromUtcTimestampToTimestamp(GpuTimeZoneDB.java:136)
com.nvidia.spark.rapids.GpuCast$.doCast(GpuCast.scala:631)
com.nvidia.spark.rapids.GpuCast.doColumnar(GpuCast.scala:1903)
com.nvidia.spark.rapids.GpuUnaryExpression.doItColumnar(GpuExpressions.scala:250)
com.nvidia.spark.rapids.GpuUnaryExpression.$anonfun$columnarEval$1(GpuExpressions.scala:261)
com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:29)
com.nvidia.spark.rapids.GpuUnaryExpression.columnarEval(GpuExpressions.scala:260)
com.nvidia.spark.rapids.RapidsPluginImplicits$ReallyAGpuExpression.columnarEval(implicits.scala:35)
com.nvidia.spark.rapids.GpuAlias.columnarEval(namedExpressions.scala:110)
com.nvidia.spark.rapids.RapidsPluginImplicits$ReallyAGpuExpression.columnarEval(implicits.scala:35)
com.nvidia.spark.rapids.GpuProjectExec$.$anonfun$project$1(basicPhysicalOperators.scala:110)
com.nvidia.spark.rapids.RapidsPluginImplicits$MapsSafely.$anonfun$safeMap$1(implicits.scala:221)
com.nvidia.spark.rapids.RapidsPluginImplicits$MapsSafely.$anonfun$safeMap$1$adapted(implicits.scala:218)
scala.collection.immutable.List.foreach(List.scala:392)
com.nvidia.spark.rapids.RapidsPluginImplicits$MapsSafely.safeMap(implicits.scala:218)
com.nvidia.spark.rapids.RapidsPluginImplicits$AutoCloseableProducingSeq.safeMap(implicits.scala:253)
com.nvidia.spark.rapids.GpuProjectExec$.project(basicPhysicalOperators.scala:110)
com.nvidia.spark.rapids.GpuTieredProject.$anonfun$project$2(basicPhysicalOperators.scala:619)
com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:29)
com.nvidia.spark.rapids.GpuTieredProject.recurse$2(basicPhysicalOperators.scala:618)
com.nvidia.spark.rapids.GpuTieredProject.project(basicPhysicalOperators.scala:631)
com.nvidia.spark.rapids.GpuTieredProject.$anonfun$projectWithRetrySingleBatchInternal$5(basicPhysicalOperators.scala:567)
com.nvidia.spark.rapids.RmmRapidsRetryIterator$.withRestoreOnRetry(RmmRapidsRetryIterator.scala:272)
com.nvidia.spark.rapids.GpuTieredProject.$anonfun$projectWithRetrySingleBatchInternal$4(basicPhysicalOperators.scala:567)
com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:29)
com.nvidia.spark.rapids.GpuTieredProject.$anonfun$projectWithRetrySingleBatchInternal$3(basicPhysicalOperators.scala:565)
com.nvidia.spark.rapids.RmmRapidsRetryIterator$NoInputSpliterator.next(RmmRapidsRetryIterator.scala:395)
com.nvidia.spark.rapids.RmmRapidsRetryIterator$RmmRapidsRetryIterator.next(RmmRapidsRetryIterator.scala:613)
com.nvidia.spark.rapids.RmmRapidsRetryIterator$RmmRapidsRetryAutoCloseableIterator.next(RmmRapidsRetryIterator.scala:517)
com.nvidia.spark.rapids.RmmRapidsRetryIterator$.drainSingleWithVerification(RmmRapidsRetryIterator.scala:291)
com.nvidia.spark.rapids.RmmRapidsRetryIterator$.withRetryNoSplit(RmmRapidsRetryIterator.scala:185)
com.nvidia.spark.rapids.GpuTieredProject.$anonfun$projectWithRetrySingleBatchInternal$1(basicPhysicalOperators.scala:565)
com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:38)
com.nvidia.spark.rapids.GpuTieredProject.projectWithRetrySingleBatchInternal(basicPhysicalOperators.scala:562)
com.nvidia.spark.rapids.GpuTieredProject.projectAndCloseWithRetrySingleBatch(basicPhysicalOperators.scala:601)
com.nvidia.spark.rapids.GpuProjectExec.$anonfun$internalDoExecuteColumnar$2(basicPhysicalOperators.scala:384)
com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:29)
com.nvidia.spark.rapids.GpuProjectExec.$anonfun$internalDoExecuteColumnar$1(basicPhysicalOperators.scala:380)
scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
com.nvidia.spark.rapids.ColumnarToRowIterator.$anonfun$fetchNextBatch$3(GpuColumnarToRowExec.scala:290)
com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:29)
com.nvidia.spark.rapids.ColumnarToRowIterator.fetchNextBatch(GpuColumnarToRowExec.scala:287)
com.nvidia.spark.rapids.ColumnarToRowIterator.loadNextBatch(GpuColumnarToRowExec.scala:257)
com.nvidia.spark.rapids.ColumnarToRowIterator.hasNext(GpuColumnarToRowExec.scala:304)
scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:345)
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
org.apache.spark.scheduler.Task.run(Task.scala:131)
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
java.lang.Thread.run(Thread.java:750)
my debug: done

cacheDatabase was called twice.

Comment on the following lines:

// Recreate a new instance to reload the database if necessary
instance = new GpuTimeZoneDB();
public synchronized void shutdownImpl() {
if (!cacheEmpty) {
Member:

Do we have a potential race condition here where a shutdown occurs very quickly after a startup, so quickly (or database load is so slow) that we don't think the cache is there but it will be once the async thread completes? Seems like we need to also check if there's a pending async load and wait on it if that's the case.

Collaborator Author:

The cache APIs are synchronized, so multiple calls are serialized and each waits for the others to finish.

private synchronized void cacheDatabaseImpl() {
private synchronized void shutdownImpl() {

Member:

synchronized doesn't solve the race I'm talking about. All synchronized does is guarantee two threads aren't in that method at the same time, but it does nothing to ensure what order those threads call it. If two or more threads are trying to do the same thing (e.g., initialize the cache), we're fine, since we don't really care about the order of the threads since the operation they are performing is identical and idempotent. However, if the threads are doing different operations, then we care very much about the order of operations.

If one thread is trying to start the cache and the other is trying to shutdown, we could leave the database in a cached state despite trying to shut it down. Here's a simplified example:

  • On startup, we launch the background thread to initialize the cache
  • Before the background thread is able to boot up and call the cache init method, the shutdown method is called (e.g.: very quick test that doesn't involve time DB and tears down on test completion, error on startup, whatever)
  • shutdown is called first, notices the cache is not initialized, does nothing.
  • background thread finally gets around to calling cache init (note there is no guarantee how quick this happens) and proceeds to initialize the cache
  • Now we're in a state where the application thinks the time DB cache has been shutdown and freed, but in reality it has not and we've "leaked" it in a sense.

One way to solve this is to set a boolean flag before we start the background thread. The background caching thread would atomically clear this flag and call the cache init method when it runs. The shutdown method would check for this flag being set, and if it is, release the lock and wait on the background thread to complete before proceeding to retry the shutdown. We can optimize it a bit by having shutdown set a flag after the cache clear has occurred indicating a shutdown has happened. The background init thread can check for this flag being set and skip cache init when that happens.

@jlowe (Member) commented Jan 11, 2024:

A simpler approach is to have a boolean flag indicating a shutdown has occurred. Caching the database will clear this flag. The background thread grabs the lock and avoids initializing the cache if the shutdown flag has been set. The shutdown method always waits on the background init Thread (outside of the lock) if it's not null, setting it to null afterwards, and sets the shutdown flag once the cache is cleared.
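
For illustration, this simpler approach could look roughly like the following sketch (field and helper names such as loaderThread, shutdownOccurred, and loadIfEmpty are assumptions, not the actual patch):

    public class GpuTimeZoneDB {
      private static final GpuTimeZoneDB instance = new GpuTimeZoneDB();
      private Thread loaderThread = null;
      private boolean shutdownOccurred = false;
      private boolean cacheEmpty = true;

      public static void cacheDatabaseAsync() { instance.startAsyncLoad(); }
      public static void cacheDatabase()      { instance.syncLoad(); }
      public static void shutdown()           { instance.shutdownImpl(); }

      private synchronized void startAsyncLoad() {
        if (loaderThread == null) {
          loaderThread = new Thread(this::asyncLoad, "timezone-db-loader");
          loaderThread.start();
        }
      }

      private void shutdownImpl() {
        Thread t;
        synchronized (this) {
          t = loaderThread;
          loaderThread = null;
        }
        if (t != null) {
          // Wait on the background init thread outside of the lock.
          try { t.join(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        }
        synchronized (this) {
          if (!cacheEmpty) {
            // close the cached host columns ...
            cacheEmpty = true;
          }
          shutdownOccurred = true;   // later background loads will be skipped
        }
      }

      private synchronized void asyncLoad() {
        if (!shutdownOccurred) {     // skip the init if a shutdown already happened
          loadIfEmpty();
        }
      }

      private synchronized void syncLoad() {
        shutdownOccurred = false;    // caching the database clears the shutdown flag
        loadIfEmpty();
      }

      private void loadIfEmpty() {   // callers hold the lock
        if (cacheEmpty) {
          // build the transition-rule host columns ...
          cacheEmpty = false;
        }
      }
    }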

Collaborator Author:

The cache is a global singleton, and we do not want to shut it down unless the plugin is shutting down.
It is very unlikely that one thread is caching while another is shutting the cache down concurrently.

Currently the test cases do not call cacheDatabase/shutdown concurrently; see the following:

public class TimeZoneTest {
  @BeforeAll
  static void cacheTimezoneDatabase() {
    GpuTimeZoneDB.cacheDatabase();
  }

  @AfterAll
  static void cleanup() {
    GpuTimeZoneDB.shutdown();
  }
}

One potential corner case: another time zone test, say TimeZoneTest2, does the same thing as TimeZoneTest.
The following sequence would then cause the kind of problem you mentioned:

  • TimeZoneTest.cache
  • run TimeZoneTest.test1
  • TimeZoneTest2.cache
  • TimeZoneTest.shutdown
  • run TimeZoneTest2.test2: the cache has been shut down, which causes a problem.
  • TimeZoneTest2.shutdown

Typical production usage is:

  • Call cacheDatabaseAsync when starting the plugin.
  • Call shutdown when stopping the plugin.

Another possible solution: remove shutdown and mark the host memory column as noWarnLeakExpected. That disables the leak warning for the global singleton host column, and when the JVM exits the host memory is reclaimed.

Do we need the implementation you mentioned for this rare use case?

Member:

I'm not worried about the production use-case but rather the potential corner cases with tests that may shutdown and restart the database. It's really not that hard to fix this race condition (I posted the details for two approaches above, happy to provide the patch for either), so why not close that race? Even if we can prove to ourselves it's never going to be a race today, it doesn't guarantee someone won't change the way this is used to expose the race in the future.

Collaborator Author:

Got it, I'll fix it.

Collaborator Author:

One way to solve this is to set a boolean flag before we start the background thread. The background caching thread would atomically clear this flag and call the cache init method when it runs. The shutdown method would check for this flag being set, and if it is, release the lock and wait on the background thread to complete before proceeding to retry the shutdown. We can optimize it a bit by having shutdown set a flag after the cache clear has occurred indicating a shutdown has happened. The background init thread can check for this flag being set and skip cache init when that happens.

Now using the approach you mentioned above.
Please review.
The changes are:

  • Move all the sync code into static methods.
  • The changes you mentioned.

@res-life (Collaborator Author): build

@res-life (Collaborator Author): build

@res-life (Collaborator Author): build

@res-life (Collaborator Author): build

@jlowe (Member) left a comment:

Relatively minor nits. I think the locking is overly complicated and would like to see it simplified, but I also think it technically works as written so won't block it.

Comment on the following lines:

* `cacheDatabaseAsync`, `cacheDatabase` and `shutdown` are synchronized.
* When cacheDatabaseAsync is running, the `shutdown` and `cacheDatabase` will wait;
* These APIs guarantee only one thread is loading transitions cache,
* And guarantee loading cache only occurs one time.
Member:

This is not technically true and not desired in the unit testing environment. What's guaranteed here is that we're only going to try to asynchronously load the cache once, but synchronous cache loads (e.g.: cache API called after a shutdown) are still going to work.

Comment on lines +73 to +79
static class LoadingLock {
  Boolean isLoading = false;

  // Record whether a shutdown has ever been called.
  // If `isShutdownCalledEver` is true, the following loading should be skipped.
  Boolean isShutdownCalledEver = false;
}
Member:

I'm not sure we need a separate lock class for this rather than locking the whole database via synchronized methods as was done previously. We just need to lock the GpuTimeZoneDB instance when manipulating the boolean flags which can be direct members of the database. cacheDatabaseAsync, cacheDatabaseImpl and shutdown can just be synchronized methods, which removes the need for most of them to wait on threads -- the wait will be implicit in obtaining the database lock for the synchronized method. The only wait we need is in the shutdown method, and only when isLoading is true.

By leaving the methods synchronized rather than having a separate lock class, it's a little easier to reason about the synchronization safety because we don't have to keep track of whether we're holding LoadingLock at the right times.
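
For illustration, one way the synchronized-method shape could look, sketched here with wait/notifyAll (my choice of mechanism, not necessarily what this comment has in mind; isLoading would be set under the database lock just before the background loader thread starts, and the load/close bodies are placeholders):

    public class GpuTimeZoneDB {
      private boolean isLoading = false;
      private boolean cacheEmpty = true;

      private synchronized void cacheDatabaseImpl() {
        try {
          if (cacheEmpty) {
            // build the transition-rule host columns ...
            cacheEmpty = false;
          }
        } finally {
          isLoading = false;
          notifyAll();             // wake a shutdown that is waiting for the load
        }
      }

      private synchronized void shutdownImpl() {
        while (isLoading) {        // the only wait needed, and only when a load is pending
          try {
            wait();                // releases the lock so the loader can finish
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
          }
        }
        if (!cacheEmpty) {
          // close the cached host columns ...
          cacheEmpty = true;
        }
      }
    }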

@res-life res-life merged commit c3be4f4 into NVIDIA:branch-24.02 Jan 17, 2024
3 checks passed
@res-life res-life deleted the fix-time-zone-DB-leak branch January 17, 2024 02:28