diff --git a/src/main/java/com/google/devtools/build/lib/bazel/repository/starlark/RepoFetchingSkyKeyComputeState.java b/src/main/java/com/google/devtools/build/lib/bazel/repository/starlark/RepoFetchingSkyKeyComputeState.java index cef52f3ea9456a..6bd36490c02093 100644 --- a/src/main/java/com/google/devtools/build/lib/bazel/repository/starlark/RepoFetchingSkyKeyComputeState.java +++ b/src/main/java/com/google/devtools/build/lib/bazel/repository/starlark/RepoFetchingSkyKeyComputeState.java @@ -31,6 +31,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; /** * Captures state that persists across different invocations of {@link @@ -66,16 +67,16 @@ class RepoFetchingSkyKeyComputeState implements SkyKeyComputeState { * This future holds on to the worker thread in order to cancel it when necessary; it also serves * to tell whether a worker thread is already running. */ - // This is volatile since we set it to null to indicate the worker thread isn't running, and this - // could happen on multiple threads. Canceling a future multiple times is safe, though, so we - // only need to worry about nullness. Using a mutex/synchronization is an alternative but it means - // we might block in `close()`, which is potentially bad (see its javadoc). - @Nullable volatile ListenableFuture workerFuture = null; + @GuardedBy("this") + @Nullable + private ListenableFuture workerFuture = null; /** The executor service that manages the worker thread. */ // We hold on to this alongside `workerFuture` because it offers a convenient mechanism to make // sure the worker thread has shut down (with its blocking `close()` method). 
- ListeningExecutorService workerExecutorService; + @GuardedBy("this") + @Nullable + private ListeningExecutorService workerExecutorService = null; private final String repoName; @@ -89,19 +90,6 @@ class RepoFetchingSkyKeyComputeState implements SkyKeyComputeState { RepoFetchingSkyKeyComputeState(String repoName) { this.repoName = repoName; - reset(); - } - - // This may only be called from the host Skyframe thread, *and* only when no worker thread is - // running. - private void reset() { - workerExecutorService = - MoreExecutors.listeningDecorator( - Executors.newThreadPerTaskExecutor( - Thread.ofVirtual().name("starlark-repository-" + repoName).factory())); - signalSemaphore.drainPermits(); - delegateEnvQueue.clear(); - recordedInputValues.clear(); } /** @@ -114,44 +102,48 @@ SkyFunction.Environment signalForFreshEnv() throws InterruptedException { } /** - * Starts a worker thread running the given callable. This sets the {@code workerFuture} field, - * and makes sure to release a permit on the {@code signalSemaphore} when the worker finishes, - * successfully or otherwise. Returns the worker future. This may only be called from the host + * Returns the worker future, or if a worker is not already running, starts a worker thread + * running the given callable. This makes sure to release a permit on the {@code signalSemaphore} + * when the worker finishes, successfully or otherwise. This may only be called from the host * Skyframe thread. 
*/ - ListenableFuture startWorker( + synchronized ListenableFuture getOrStartWorker( Callable c) { - var workerFuture = workerExecutorService.submit(c); - this.workerFuture = workerFuture; + if (workerFuture != null) { + return workerFuture; + } + // We reset the state object back to its very initial state, since the host SkyFunction may have + // been re-entered (for example b/330892334 and + // https://github.com/bazelbuild/bazel/issues/21238), and/or the previous worker thread may have + // been interrupted while the host SkyFunction was inactive. + workerExecutorService = + MoreExecutors.listeningDecorator( + Executors.newThreadPerTaskExecutor( + Thread.ofVirtual().name("starlark-repository-" + repoName).factory())); + signalSemaphore.drainPermits(); + delegateEnvQueue.clear(); + recordedInputValues.clear(); + + // Start the worker. + workerFuture = workerExecutorService.submit(c); workerFuture.addListener(signalSemaphore::release, directExecutor()); return workerFuture; } + /** + * Closes the state object, and blocks until all pending async work is finished. The state object + * will reset to a clean slate after this method finishes. + */ // This may be called from any thread, including the host Skyframe thread and the // high-memory-pressure listener thread. @Override - public void close() { - var myWorkerFuture = workerFuture; - workerFuture = null; - if (myWorkerFuture != null) { - myWorkerFuture.cancel(true); + public synchronized void close() { + if (workerFuture != null) { + workerFuture.cancel(true); } - workerExecutorService.shutdownNow(); - } - - /** - * Closes the state object, and blocks until all pending async work is finished. The state object - * will reset to a clean slate after this method finishes. This may only be called from the host - * Skyframe thread. 
- */ - public void closeAndWaitForTermination() throws InterruptedException { - close(); - workerExecutorService.close(); // This blocks - // We reset the state object back to its very initial state, since the host SkyFunction may be - // re-entered (for example b/330892334 and https://github.com/bazelbuild/bazel/issues/21238). - reset(); - if (Thread.interrupted()) { - throw new InterruptedException(); + workerFuture = null; + if (workerExecutorService != null) { + workerExecutorService.close(); // This blocks } } } diff --git a/src/main/java/com/google/devtools/build/lib/bazel/repository/starlark/RepoFetchingWorkerSkyFunctionEnvironment.java b/src/main/java/com/google/devtools/build/lib/bazel/repository/starlark/RepoFetchingWorkerSkyFunctionEnvironment.java index 951ae1200cbc09..5bc9e9ecbaa17d 100644 --- a/src/main/java/com/google/devtools/build/lib/bazel/repository/starlark/RepoFetchingWorkerSkyFunctionEnvironment.java +++ b/src/main/java/com/google/devtools/build/lib/bazel/repository/starlark/RepoFetchingWorkerSkyFunctionEnvironment.java @@ -44,10 +44,10 @@ class RepoFetchingWorkerSkyFunctionEnvironment private final RepoFetchingSkyKeyComputeState state; private SkyFunction.Environment delegate; - RepoFetchingWorkerSkyFunctionEnvironment( - RepoFetchingSkyKeyComputeState state, SkyFunction.Environment delegate) { + RepoFetchingWorkerSkyFunctionEnvironment(RepoFetchingSkyKeyComputeState state) + throws InterruptedException { this.state = state; - this.delegate = delegate; + this.delegate = state.delegateEnvQueue.take(); } @Override diff --git a/src/main/java/com/google/devtools/build/lib/bazel/repository/starlark/StarlarkRepositoryFunction.java b/src/main/java/com/google/devtools/build/lib/bazel/repository/starlark/StarlarkRepositoryFunction.java index 136b9ea9097f99..a20c56df9e546a 100644 --- a/src/main/java/com/google/devtools/build/lib/bazel/repository/starlark/StarlarkRepositoryFunction.java +++ 
b/src/main/java/com/google/devtools/build/lib/bazel/repository/starlark/StarlarkRepositoryFunction.java @@ -22,6 +22,7 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.Maps; import com.google.common.collect.Table; +import com.google.common.util.concurrent.ListenableFuture; import com.google.devtools.build.lib.analysis.BlazeDirectories; import com.google.devtools.build.lib.analysis.RuleDefinition; import com.google.devtools.build.lib.bazel.bzlmod.NonRegistryOverride; @@ -143,66 +144,51 @@ public RepositoryDirectoryValue.Builder fetch( if (!useWorkers) { return fetchInternal(args); } - var state = env.getState(() -> new RepoFetchingSkyKeyComputeState(rule.getName())); - if (state.workerExecutorService.isShutdown()) { - // If we get here and the worker executor is shut down, this can only mean that the worker - // future was cancelled while we (the host Skyframe thread) were inactive (as in, having - // returned `null` but not yet restarted). So we wait for the previous worker thread to finish - // first. - // TODO: instead of this complicated dance, consider making it legal for - // `SkyKeyComputeState#close()` to block. This would undo the advice added in commit 8ef0a51, - // but would allow us to merge `close()` and `closeAndWaitForTermination()` and avoid some - // headache. - state.closeAndWaitForTermination(); - } - boolean shouldShutDownWorkerExecutorInFinally = true; - try { - var workerFuture = state.workerFuture; - if (workerFuture == null) { - // No worker is running yet, which means we're just starting to fetch this repo. Start with - // a clean slate, and create the worker. - setupRepoRoot(outputDirectory); - Environment workerEnv = new RepoFetchingWorkerSkyFunctionEnvironment(state, env); - workerFuture = - state.startWorker( - () -> fetchInternal(args.toWorkerArgs(workerEnv, state.recordedInputValues))); - } else { - // A worker is already running. 
This can only mean one thing -- we just had a Skyframe - // restart, and need to send over a fresh Environment. + // See below (the `catch CancellationException` clause) for why there's a `while` loop here. + while (true) { + var state = env.getState(() -> new RepoFetchingSkyKeyComputeState(rule.getName())); + ListenableFuture workerFuture = + state.getOrStartWorker( + () -> { + Environment workerEnv = new RepoFetchingWorkerSkyFunctionEnvironment(state); + setupRepoRoot(outputDirectory); + return fetchInternal(args.toWorkerArgs(workerEnv, state.recordedInputValues)); + }); + try { state.delegateEnvQueue.put(env); - } - state.signalSemaphore.acquire(); - if (!workerFuture.isDone()) { - // This means that the worker is still running, and expecting a fresh Environment. Return - // null to trigger a Skyframe restart, but *don't* shut down the worker executor. - shouldShutDownWorkerExecutorInFinally = false; - return null; - } - RepositoryDirectoryValue.Builder result = workerFuture.get(); - recordedInputValues.putAll(state.recordedInputValues); - return result; - } catch (ExecutionException e) { - Throwables.throwIfInstanceOf(e.getCause(), RepositoryFunctionException.class); - Throwables.throwIfUnchecked(e.getCause()); - throw new IllegalStateException("unexpected exception type: " + e.getClass(), e.getCause()); - } catch (CancellationException e) { - // This can only happen if the state object was invalidated due to memory pressure, in - // which case we can simply reattempt the fetch. 
- env.getListener() - .post( - RepositoryFetchProgress.ongoing( - RepositoryName.createUnvalidated(rule.getName()), - "fetch interrupted due to memory pressure; restarting.")); - return fetch(rule, outputDirectory, directories, env, recordedInputValues, key); - } finally { - if (shouldShutDownWorkerExecutorInFinally) { - // Unless we know the worker is waiting on a fresh Environment, we should *always* shut down - // the worker executor and reset the state by the time we finish executing (successfully or - // otherwise). This ensures that 1) no background work happens without our knowledge, and - // 2) if the SkyFunction is re-entered for any reason (for example b/330892334 and - // https://github.com/bazelbuild/bazel/issues/21238), we don't have lingering state messing - // things up. - state.closeAndWaitForTermination(); + state.signalSemaphore.acquire(); + if (!workerFuture.isDone()) { + // This means that the worker is still running, and expecting a fresh Environment. Return + // null to trigger a Skyframe restart, but *don't* shut down the worker executor. + return null; + } + RepositoryDirectoryValue.Builder result = workerFuture.get(); + recordedInputValues.putAll(state.recordedInputValues); + return result; + } catch (ExecutionException e) { + Throwables.throwIfInstanceOf(e.getCause(), RepositoryFunctionException.class); + Throwables.throwIfUnchecked(e.getCause()); + throw new IllegalStateException( + "unexpected exception type: " + e.getCause().getClass(), e.getCause()); + } catch (CancellationException e) { + // This can only happen if the state object was invalidated due to memory pressure, in + // which case we can simply reattempt the fetch. Show a message and continue into the next + // `while` iteration. 
+ env.getListener() + .post( + RepositoryFetchProgress.ongoing( + RepositoryName.createUnvalidated(rule.getName()), + "fetch interrupted due to memory pressure; restarting.")); + } finally { + if (workerFuture.isDone()) { + // Unless we know the worker is waiting on a fresh Environment, we should *always* shut + // down the worker executor by the time we finish executing (successfully or otherwise). + // This ensures that 1) no background work happens without our knowledge, and 2) if the + // SkyFunction is re-entered for any reason (for example b/330892334 and + // https://github.com/bazelbuild/bazel/issues/21238), we know we'll need to create a new + // worker from scratch. + state.close(); + } } } } diff --git a/src/main/java/com/google/devtools/build/skyframe/SkyFunction.java b/src/main/java/com/google/devtools/build/skyframe/SkyFunction.java index 97596273e2b184..ffdffca390ce48 100644 --- a/src/main/java/com/google/devtools/build/skyframe/SkyFunction.java +++ b/src/main/java/com/google/devtools/build/skyframe/SkyFunction.java @@ -417,8 +417,8 @@ interface SkyKeyComputeState extends AutoCloseable { * *

<p>Implementations MUST be idempotent. * - *

<p>Note also that this method should not perform any heavy work (especially blocking - * operations). + *

<p>Note also that this method could be invoked from arbitrary threads, so avoid heavy + * operations if possible. */ @Override default void close() {}