From 0969c5702bf277ac501fc82be9ce931646d763b4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?X=C3=B9d=C5=8Dng=20Y=C3=A1ng?= Date: Tue, 23 Apr 2024 17:58:05 -0400 Subject: [PATCH 1/2] Hold on to the worker executor service in RepoFetching I managed to reproduce some deadlocks during repo fetching with virtual worker threads. One notable trigger was some _other_ repo failing to fetch, which seems to cause Skyframe to try to interrupt other concurrent repo fetches. This _might_ be the cause for a deadlock where we submit a task to the worker executor service, but the task never starts running before it gets cancelled, which causes us to wait forever for a `DONE` signal that never comes. (The worker task puts a `DONE` signal in the queue in a `finally` block -- but we don't even enter the `try`.) I then tried various things to fix this; this PR is an attempt that actually seemed to eliminate the deadlock. Instead of waiting for a `DONE` signal to make sure the worker thread has finished, we now hold on to the executor service, which offers a `close()` method that essentially uninterruptibly waits for any scheduled tasks to terminate, whether or not they have started running. (@justinhorvitz had suggested a similar idea before.) To make sure distinct repo fetches don't interfere with each other, we start a separate worker executor service for each repo fetch instead of making everyone share the same worker executor service. (This is recommended for virtual threads; see https://docs.oracle.com/en/java/javase/21/core/virtual-threads.html#GUID-C0FEE349-D998-4C9D-B032-E01D06BE55F2 for example.) Related: https://github.com/bazelbuild/bazel/issues/22003 --- .../lib/bazel/BazelRepositoryModule.java | 41 +--------- .../bazel/repository/RepositoryOptions.java | 6 +- .../RepoFetchingSkyKeyComputeState.java | 18 ++++ .../starlark/StarlarkRepositoryFunction.java | 82 +++++++++---------- 4 files changed, 63 insertions(+), 84 deletions(-) diff --git a/src/main/java/com/google/devtools/build/lib/bazel/BazelRepositoryModule.java b/src/main/java/com/google/devtools/build/lib/bazel/BazelRepositoryModule.java index 70a1500eb6bf6b..d7026e5407c91c 100644 --- a/src/main/java/com/google/devtools/build/lib/bazel/BazelRepositoryModule.java +++ b/src/main/java/com/google/devtools/build/lib/bazel/BazelRepositoryModule.java @@ -22,7 +22,6 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; -import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.devtools.build.lib.analysis.BlazeDirectories; import com.google.devtools.build.lib.analysis.ConfiguredRuleClassProvider; import com.google.devtools.build.lib.analysis.RuleDefinition; @@ -63,6 +62,7 @@ import com.google.devtools.build.lib.bazel.repository.RepositoryOptions.CheckDirectDepsMode; import com.google.devtools.build.lib.bazel.repository.RepositoryOptions.LockfileMode; import com.google.devtools.build.lib.bazel.repository.RepositoryOptions.RepositoryOverride; +import com.google.devtools.build.lib.bazel.repository.RepositoryOptions.WorkerForRepoFetching; import com.google.devtools.build.lib.bazel.repository.cache.RepositoryCache; import com.google.devtools.build.lib.bazel.repository.downloader.DelegatingDownloader; import com.google.devtools.build.lib.bazel.repository.downloader.DownloadManager; @@ -120,9 +120,6 @@ import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; 
-import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; import javax.annotation.Nullable; @@ -162,9 +159,6 @@ public class BazelRepositoryModule extends BlazeModule { private List allowedYankedVersions = ImmutableList.of(); private boolean disableNativeRepoRules; private SingleExtensionEvalFunction singleExtensionEvalFunction; - private final ExecutorService repoFetchingWorkerThreadPool = - Executors.newFixedThreadPool( - 100, new ThreadFactoryBuilder().setNameFormat("repo-fetching-worker-%d").build()); @Nullable private CredentialModule credentialModule; @@ -314,37 +308,8 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException { RepositoryOptions repoOptions = env.getOptions().getOptions(RepositoryOptions.class); if (repoOptions != null) { - switch (repoOptions.workerForRepoFetching) { - case OFF: - starlarkRepositoryFunction.setWorkerExecutorService(null); - break; - case PLATFORM: - starlarkRepositoryFunction.setWorkerExecutorService(repoFetchingWorkerThreadPool); - break; - case VIRTUAL: - case AUTO: - try { - // Since Google hasn't migrated to JDK 21 yet, we can't directly call - // Executors.newVirtualThreadPerTaskExecutor here. But a bit of reflection never hurt - // anyone... right? (OSS Bazel already ships with a bundled JDK 21) - starlarkRepositoryFunction.setWorkerExecutorService( - (ExecutorService) - Executors.class - .getDeclaredMethod("newThreadPerTaskExecutor", ThreadFactory.class) - .invoke( - null, Thread.ofVirtual().name("starlark-repository-", 0).factory())); - } catch (ReflectiveOperationException e) { - if (repoOptions.workerForRepoFetching == RepositoryOptions.WorkerForRepoFetching.AUTO) { - starlarkRepositoryFunction.setWorkerExecutorService(null); - } else { - throw new AbruptExitException( - detailedExitCode( - "couldn't create virtual worker thread executor for repo fetching", - Code.BAD_DOWNLOADER_CONFIG), - e); - } - } - } + starlarkRepositoryFunction.setUseWorkers( + repoOptions.workerForRepoFetching != WorkerForRepoFetching.OFF); downloadManager.setDisableDownload(repoOptions.disableDownload); if (repoOptions.repositoryDownloaderRetries >= 0) { downloadManager.setRetries(repoOptions.repositoryDownloaderRetries); diff --git a/src/main/java/com/google/devtools/build/lib/bazel/repository/RepositoryOptions.java b/src/main/java/com/google/devtools/build/lib/bazel/repository/RepositoryOptions.java index 7bc16929c1e7f8..63a527d89887ee 100644 --- a/src/main/java/com/google/devtools/build/lib/bazel/repository/RepositoryOptions.java +++ b/src/main/java/com/google/devtools/build/lib/bazel/repository/RepositoryOptions.java @@ -246,10 +246,8 @@ public Converter() { effectTags = {OptionEffectTag.UNKNOWN}, help = "The threading mode to use for repo fetching. If set to 'off', no worker thread is used," - + " and the repo fetching is subject to restarts. Otherwise, uses a platform thread" - + " (i.e. OS thread) if set to 'platform' or a virtual thread if set to 'virtual'. If" - + " set to 'auto', virtual threads are used if available (i.e. running on JDK 21+)," - + " otherwise no worker thread is used.") + + " and the repo fetching is subject to restarts. 
Otherwise, uses a virtual worker" + + " thread.") public WorkerForRepoFetching workerForRepoFetching; @Option( diff --git a/src/main/java/com/google/devtools/build/lib/bazel/repository/starlark/RepoFetchingSkyKeyComputeState.java b/src/main/java/com/google/devtools/build/lib/bazel/repository/starlark/RepoFetchingSkyKeyComputeState.java index 19ce5cb66d849f..f5bbe3745e346a 100644 --- a/src/main/java/com/google/devtools/build/lib/bazel/repository/starlark/RepoFetchingSkyKeyComputeState.java +++ b/src/main/java/com/google/devtools/build/lib/bazel/repository/starlark/RepoFetchingSkyKeyComputeState.java @@ -14,6 +14,7 @@ package com.google.devtools.build.lib.bazel.repository.starlark; +import com.google.devtools.build.lib.profiler.Profiler; import com.google.devtools.build.lib.rules.repository.RepoRecordedInput; import com.google.devtools.build.lib.rules.repository.RepositoryDirectoryValue; import com.google.devtools.build.skyframe.SkyFunction; @@ -21,6 +22,8 @@ import java.util.Map; import java.util.TreeMap; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.SynchronousQueue; import javax.annotation.Nullable; @@ -71,6 +74,13 @@ enum Signal { // we might block in `close()`, which is potentially bad (see its javadoc). @Nullable volatile Future workerFuture = null; + /** The executor service that manages the worker thread. */ + // We hold on to this alongside `workerFuture` because it's the only reliable way to make sure the + // worker thread has shut down (the `Future` class doesn't have the capability). + final ExecutorService workerExecutorService = + Executors.newThreadPerTaskExecutor( + Profiler.instance().profileableVirtualThreadFactory("starlark-repository-")); + /** * This is where the recorded inputs & values for the whole invocation is collected. 
* @@ -92,4 +102,12 @@ public void close() { myWorkerFuture.cancel(true); } } + + public void closeAndWaitForTermination() throws InterruptedException { + close(); + workerExecutorService.close(); // This blocks + if (Thread.interrupted()) { + throw new InterruptedException(); + } + } } diff --git a/src/main/java/com/google/devtools/build/lib/bazel/repository/starlark/StarlarkRepositoryFunction.java b/src/main/java/com/google/devtools/build/lib/bazel/repository/starlark/StarlarkRepositoryFunction.java index 12e5e38f12ad30..45a664ed86a44a 100644 --- a/src/main/java/com/google/devtools/build/lib/bazel/repository/starlark/StarlarkRepositoryFunction.java +++ b/src/main/java/com/google/devtools/build/lib/bazel/repository/starlark/StarlarkRepositoryFunction.java @@ -21,7 +21,6 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Table; -import com.google.common.util.concurrent.Uninterruptibles; import com.google.devtools.build.lib.analysis.BlazeDirectories; import com.google.devtools.build.lib.analysis.RuleDefinition; import com.google.devtools.build.lib.bazel.repository.RepositoryResolvedEvent; @@ -59,7 +58,7 @@ import java.util.Map; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionException; import javax.annotation.Nullable; import net.starlark.java.eval.EvalException; import net.starlark.java.eval.Mutability; @@ -73,7 +72,7 @@ public final class StarlarkRepositoryFunction extends RepositoryFunction { private final DownloadManager downloadManager; private double timeoutScaling = 1.0; - @Nullable private ExecutorService workerExecutorService = null; + private boolean useWorkers; @Nullable private ProcessWrapper processWrapper = null; @Nullable private RepositoryRemoteExecutor repositoryRemoteExecutor; @Nullable private SyscallCache syscallCache; @@ -94,15 +93,15 @@ public void setSyscallCache(SyscallCache syscallCache) { this.syscallCache = checkNotNull(syscallCache); } - public void setWorkerExecutorService(@Nullable ExecutorService workerExecutorService) { - this.workerExecutorService = workerExecutorService; + public void setUseWorkers(boolean useWorkers) { + this.useWorkers = useWorkers; } @Override protected void setupRepoRootBeforeFetching(Path repoRoot) throws RepositoryFunctionException { // DON'T delete the repo root here if we're using a worker thread, since when this SkyFunction // restarts, fetching is still happening inside the worker thread. - if (workerExecutorService == null) { + if (!useWorkers) { setupRepoRoot(repoRoot); } } @@ -111,7 +110,7 @@ protected void setupRepoRootBeforeFetching(Path repoRoot) throws RepositoryFunct public void reportSkyframeRestart(Environment env, RepositoryName repoName) { // DON'T report a "restarting." event if we're using a worker thread, since the actual fetch // function run by the worker thread never restarts. 
- if (workerExecutorService == null) { + if (!useWorkers) { super.reportSkyframeRestart(env, repoName); } } @@ -126,8 +125,7 @@ public RepositoryDirectoryValue.Builder fetch( Map recordedInputValues, SkyKey key) throws RepositoryFunctionException, InterruptedException { - if (workerExecutorService == null - || env.inErrorBubblingForSkyFunctionsThatCanFullyRecoverFromErrors()) { + if (!useWorkers || env.inErrorBubblingForSkyFunctionsThatCanFullyRecoverFromErrors()) { // Don't use the worker thread if we're in Skyframe error bubbling. For some reason, using a // worker thread during error bubbling very frequently causes deadlocks on Linux platforms. // The deadlock is rather elusive and this is just the immediate thing that seems to help. @@ -143,21 +141,27 @@ public RepositoryDirectoryValue.Builder fetch( // clean slate, and create the worker. setupRepoRoot(outputDirectory); Environment workerEnv = new RepoFetchingWorkerSkyFunctionEnvironment(state, env); - workerFuture = - workerExecutorService.submit( - () -> { - try { - return fetchInternal( - rule, - outputDirectory, - directories, - workerEnv, - state.recordedInputValues, - key); - } finally { - state.signalQueue.put(Signal.DONE); - } - }); + try { + workerFuture = + state.workerExecutorService.submit( + () -> { + try { + return fetchInternal( + rule, + outputDirectory, + directories, + workerEnv, + state.recordedInputValues, + key); + } finally { + state.signalQueue.put(Signal.DONE); + } + }); + } catch (RejectedExecutionException e) { + // This means that the worker executor service is already shut down. Fall back to not using + // worker threads. + return fetchInternal(rule, outputDirectory, directories, env, recordedInputValues, key); + } state.workerFuture = workerFuture; } else { // A worker is already running. This can only mean one thing -- we just had a Skyframe @@ -168,23 +172,18 @@ public RepositoryDirectoryValue.Builder fetch( try { signal = state.signalQueue.take(); } catch (InterruptedException e) { - // This means that we caught a Ctrl-C. Make sure to close the state object to interrupt the - // worker thread, wait for it to finish, and then propagate the InterruptedException. - state.close(); - signal = Uninterruptibles.takeUninterruptibly(state.signalQueue); - // The call to Uninterruptibles.takeUninterruptibly() above may set the thread interrupted - // status if it suppressed an InterruptedException, so we clear it again. - Thread.interrupted(); - throw new InterruptedException(); + // If the host Skyframe thread is interrupted for any reason, we make sure to shut down any + // worker threads and wait for their termination before propagating the interrupt. 
+ state.closeAndWaitForTermination(); + throw e; } - switch (signal) { - case RESTART: - return null; - case DONE: + return switch (signal) { + case RESTART -> null; + case DONE -> { try { RepositoryDirectoryValue.Builder result = workerFuture.get(); recordedInputValues.putAll(state.recordedInputValues); - return result; + yield result; } catch (ExecutionException e) { Throwables.throwIfInstanceOf(e.getCause(), RepositoryFunctionException.class); Throwables.throwIfUnchecked(e.getCause()); @@ -198,17 +197,16 @@ public RepositoryDirectoryValue.Builder fetch( RepositoryFetchProgress.ongoing( RepositoryName.createUnvalidated(rule.getName()), "fetch interrupted due to memory pressure; restarting.")); - return fetch(rule, outputDirectory, directories, env, recordedInputValues, key); + yield fetch(rule, outputDirectory, directories, env, recordedInputValues, key); } finally { // At this point, the worker thread has definitely finished. But in some corner cases (see // b/330892334), a Skyframe restart might still happen; to ensure we're not tricked into // a deadlock, we clean up the worker thread and so that next time we come into fetch(), // we actually restart the entire computation. - state.close(); + state.closeAndWaitForTermination(); } - } - // TODO(wyv): use a switch expression above instead and remove this. - throw new IllegalStateException(); + } + }; } @Nullable From 4a84f3020316b2390a520148c991eaa87f13e366 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?X=C3=B9d=C5=8Dng=20Y=C3=A1ng?= Date: Wed, 24 Apr 2024 17:09:29 -0400 Subject: [PATCH 2/2] Failed attempt to directly use a Thread for repo fetching --- .../RepoFetchingSkyKeyComputeState.java | 59 +++++----- .../starlark/StarlarkRepositoryFunction.java | 111 +++++++++--------- 2 files changed, 83 insertions(+), 87 deletions(-) diff --git a/src/main/java/com/google/devtools/build/lib/bazel/repository/starlark/RepoFetchingSkyKeyComputeState.java b/src/main/java/com/google/devtools/build/lib/bazel/repository/starlark/RepoFetchingSkyKeyComputeState.java index f5bbe3745e346a..7131e01143661a 100644 --- a/src/main/java/com/google/devtools/build/lib/bazel/repository/starlark/RepoFetchingSkyKeyComputeState.java +++ b/src/main/java/com/google/devtools/build/lib/bazel/repository/starlark/RepoFetchingSkyKeyComputeState.java @@ -14,7 +14,7 @@ package com.google.devtools.build.lib.bazel.repository.starlark; -import com.google.devtools.build.lib.profiler.Profiler; +import com.google.common.util.concurrent.Uninterruptibles; import com.google.devtools.build.lib.rules.repository.RepoRecordedInput; import com.google.devtools.build.lib.rules.repository.RepositoryDirectoryValue; import com.google.devtools.build.skyframe.SkyFunction; @@ -22,8 +22,6 @@ import java.util.Map; import java.util.TreeMap; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.SynchronousQueue; import javax.annotation.Nullable; @@ -41,18 +39,19 @@ class RepoFetchingSkyKeyComputeState implements SkyKeyComputeState { /** A signal that the worker thread can send to the host Skyframe thread. */ - enum Signal { + sealed interface Signal { /** * Indicates that the host thread should return {@code null}, causing a Skyframe restart. After * sending this signal, the client will immediately block on {@code delegateEnvQueue}, waiting * for the host thread to send a fresh {@link SkyFunction.Environment} over. 
*/ - RESTART, - /** - * Indicates that the worker thread has finished running, either yielding a result or an - * exception. - */ - DONE + record Restart() implements Signal {} + + /** Indicates that the worker thread has finished running successfully. */ + record Success(RepositoryDirectoryValue.Builder result) implements Signal {} + + /** Indicates that the worker thread has finished running with a failure. */ + record Failure(Throwable e) implements Signal {} } /** The channel for the worker thread to send a signal to the host Skyframe thread. */ @@ -64,22 +63,13 @@ enum Signal { */ final BlockingQueue delegateEnvQueue = new SynchronousQueue<>(); - /** - * This future holds on to the worker thread in order to cancel it when necessary; it also serves - * to tell whether a worker thread is already running. - */ + /** The worker thread that actually performs the fetching logic. */ // This is volatile since we set it to null to indicate the worker thread isn't running, and this - // could happen on multiple threads. Canceling a future multiple times is safe, though, so we - // only need to worry about nullness. Using a mutex/synchronization is an alternative but it means - // we might block in `close()`, which is potentially bad (see its javadoc). - @Nullable volatile Future workerFuture = null; - - /** The executor service that manages the worker thread. */ - // We hold on to this alongside `workerFuture` because it's the only reliable way to make sure the - // worker thread has shut down (the `Future` class doesn't have the capability). - final ExecutorService workerExecutorService = - Executors.newThreadPerTaskExecutor( - Profiler.instance().profileableVirtualThreadFactory("starlark-repository-")); + // could happen on multiple threads. Interrupting and joining a thread multiple times is safe, + // though, so we only need to worry about nullness. Using a mutex/synchronization is an + // alternative but it means we might block in `close()`, which is potentially bad (see its + // javadoc). + @Nullable volatile Thread workerThread = null; /** * This is where the recorded inputs & values for the whole invocation is collected. @@ -90,22 +80,27 @@ enum Signal { final Map recordedInputValues = new TreeMap<>(); SkyFunction.Environment signalForFreshEnv() throws InterruptedException { - signalQueue.put(Signal.RESTART); + signalQueue.put(new Signal.Restart()); return delegateEnvQueue.take(); } @Override public void close() { - var myWorkerFuture = workerFuture; - workerFuture = null; - if (myWorkerFuture != null) { - myWorkerFuture.cancel(true); + var myWorkerThread = workerThread; + if (myWorkerThread != null) { + myWorkerThread.interrupt(); } + // DON'T set workerThread to null; someone should always call `closeAndWaitForTermination` to + // make sure the workerThread stops running. 
} public void closeAndWaitForTermination() throws InterruptedException { - close(); - workerExecutorService.close(); // This blocks + var myWorkerThread = workerThread; + workerThread = null; + if (myWorkerThread != null) { + myWorkerThread.interrupt(); + Uninterruptibles.joinUninterruptibly(myWorkerThread); + } if (Thread.interrupted()) { throw new InterruptedException(); } diff --git a/src/main/java/com/google/devtools/build/lib/bazel/repository/starlark/StarlarkRepositoryFunction.java b/src/main/java/com/google/devtools/build/lib/bazel/repository/starlark/StarlarkRepositoryFunction.java index 45a664ed86a44a..492b41550a2abc 100644 --- a/src/main/java/com/google/devtools/build/lib/bazel/repository/starlark/StarlarkRepositoryFunction.java +++ b/src/main/java/com/google/devtools/build/lib/bazel/repository/starlark/StarlarkRepositoryFunction.java @@ -20,17 +20,18 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Table; +import com.google.common.collect.Table.Cell; import com.google.devtools.build.lib.analysis.BlazeDirectories; import com.google.devtools.build.lib.analysis.RuleDefinition; import com.google.devtools.build.lib.bazel.repository.RepositoryResolvedEvent; import com.google.devtools.build.lib.bazel.repository.downloader.DownloadManager; import com.google.devtools.build.lib.bazel.repository.starlark.RepoFetchingSkyKeyComputeState.Signal; -import com.google.devtools.build.lib.cmdline.Label; +import com.google.devtools.build.lib.cmdline.Label.RepoMappingRecorder; import com.google.devtools.build.lib.cmdline.LabelConstants; import com.google.devtools.build.lib.cmdline.RepositoryName; import com.google.devtools.build.lib.events.Event; import com.google.devtools.build.lib.packages.BazelStarlarkContext; +import com.google.devtools.build.lib.packages.BazelStarlarkContext.Phase; import com.google.devtools.build.lib.packages.Rule; import com.google.devtools.build.lib.packages.semantics.BuildLanguageOptions; import com.google.devtools.build.lib.pkgcache.PathPackageLocator; @@ -40,6 +41,8 @@ import com.google.devtools.build.lib.repository.RepositoryFetchProgress; import com.google.devtools.build.lib.rules.repository.NeedsSkyframeRestartException; import com.google.devtools.build.lib.rules.repository.RepoRecordedInput; +import com.google.devtools.build.lib.rules.repository.RepoRecordedInput.EnvVar; +import com.google.devtools.build.lib.rules.repository.RepoRecordedInput.RecordedRepoMapping; import com.google.devtools.build.lib.rules.repository.RepositoryDirectoryValue; import com.google.devtools.build.lib.rules.repository.RepositoryFunction; import com.google.devtools.build.lib.rules.repository.WorkspaceFileHelper; @@ -56,9 +59,6 @@ import com.google.devtools.build.skyframe.SkyKey; import java.io.IOException; import java.util.Map; -import java.util.concurrent.CancellationException; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.RejectedExecutionException; import javax.annotation.Nullable; import net.starlark.java.eval.EvalException; import net.starlark.java.eval.Mutability; @@ -135,34 +135,37 @@ public RepositoryDirectoryValue.Builder fetch( return fetchInternal(rule, outputDirectory, directories, env, recordedInputValues, key); } var state = env.getState(RepoFetchingSkyKeyComputeState::new); - var workerFuture = state.workerFuture; - if (workerFuture == null) { + var workerThread = state.workerThread; + if (workerThread == null) { // No 
worker is running yet, which means we're just starting to fetch this repo. Start with a // clean slate, and create the worker. setupRepoRoot(outputDirectory); Environment workerEnv = new RepoFetchingWorkerSkyFunctionEnvironment(state, env); - try { - workerFuture = - state.workerExecutorService.submit( - () -> { - try { - return fetchInternal( - rule, - outputDirectory, - directories, - workerEnv, - state.recordedInputValues, - key); - } finally { - state.signalQueue.put(Signal.DONE); - } - }); - } catch (RejectedExecutionException e) { - // This means that the worker executor service is already shut down. Fall back to not using - // worker threads. - return fetchInternal(rule, outputDirectory, directories, env, recordedInputValues, key); - } - state.workerFuture = workerFuture; + workerThread = + Thread.ofVirtual() + .name("starlark-repository-" + rule.getName()) + .start( + () -> { + try { + try { + state.signalQueue.put( + new Signal.Success( + fetchInternal( + rule, + outputDirectory, + directories, + workerEnv, + state.recordedInputValues, + key))); + } catch (Throwable e) { + state.signalQueue.put(new Signal.Failure(e)); + } + } catch (InterruptedException e) { + // Do nothing. We already tried to put a signal onto the queue, and got + // interrupted again. Guess we'll die! + } + }); + state.workerThread = workerThread; } else { // A worker is already running. This can only mean one thing -- we just had a Skyframe // restart, and need to send over a fresh Environment. @@ -177,19 +180,24 @@ public RepositoryDirectoryValue.Builder fetch( state.closeAndWaitForTermination(); throw e; } + if (!(signal instanceof Signal.Restart)) { + // If `signal` is not a restart, the worker thread has definitely finished. But in some corner + // cases (see b/330892334), a Skyframe restart might still happen; to ensure we're not tricked + // into a deadlock, we clean up the worker thread so that next time we come into fetch(), + // we actually restart the entire computation. + state.closeAndWaitForTermination(); + } return switch (signal) { - case RESTART -> null; - case DONE -> { - try { - RepositoryDirectoryValue.Builder result = workerFuture.get(); - recordedInputValues.putAll(state.recordedInputValues); - yield result; - } catch (ExecutionException e) { - Throwables.throwIfInstanceOf(e.getCause(), RepositoryFunctionException.class); - Throwables.throwIfUnchecked(e.getCause()); - throw new IllegalStateException( - "unexpected exception type: " + e.getClass(), e.getCause()); - } catch (CancellationException e) { + case Signal.Restart() -> null; + case Signal.Success(RepositoryDirectoryValue.Builder result) -> { + recordedInputValues.putAll(state.recordedInputValues); + yield result; + } + case Signal.Failure(Throwable e) -> { + Throwables.throwIfInstanceOf(e, RepositoryFunctionException.class); + Throwables.throwIfUnchecked(e); + if (e instanceof InterruptedException) { + // This means that the worker thread was interrupted, but the host Skyframe thread was not. // This can only happen if the state object was invalidated due to memory pressure, in // which case we can simply reattempt the fetch. env.getListener() .post( RepositoryFetchProgress.ongoing( RepositoryName.createUnvalidated(rule.getName()), "fetch interrupted due to memory pressure; restarting.")); yield fetch(rule, outputDirectory, directories, env, recordedInputValues, key); - } finally { - // At this point, the worker thread has definitely finished. 
But in some corner cases (see - // b/330892334), a Skyframe restart might still happen; to ensure we're not tricked into - // a deadlock, we clean up the worker thread and so that next time we come into fetch(), - // we actually restart the entire computation. - state.closeAndWaitForTermination(); } + throw new IllegalStateException("unexpected exception type: " + e.getClass(), e); } }; } @@ -248,17 +251,17 @@ private RepositoryDirectoryValue.Builder fetchInternal( StarlarkThread.create( mu, starlarkSemantics, /* contextDescription= */ "", SymbolGenerator.create(key)); thread.setPrintHandler(Event.makeDebugPrintHandler(env.getListener())); - var repoMappingRecorder = new Label.RepoMappingRecorder(); + var repoMappingRecorder = new RepoMappingRecorder(); // For repos defined in Bzlmod, record any used repo mappings in the marker file. // Repos defined in WORKSPACE are impossible to verify given the chunked loading (we'd have to // record which chunk the repo mapping was used in, and ain't nobody got time for that). if (!isWorkspaceRepo(rule)) { repoMappingRecorder.mergeEntries( rule.getRuleClassObject().getRuleDefinitionEnvironmentRepoMappingEntries()); - thread.setThreadLocal(Label.RepoMappingRecorder.class, repoMappingRecorder); + thread.setThreadLocal(RepoMappingRecorder.class, repoMappingRecorder); } - new BazelStarlarkContext(BazelStarlarkContext.Phase.LOADING).storeInThread(thread); // "fetch" + new BazelStarlarkContext(Phase.LOADING).storeInThread(thread); // "fetch" StarlarkRepositoryContext starlarkRepositoryContext = new StarlarkRepositoryContext( @@ -337,15 +340,13 @@ private RepositoryDirectoryValue.Builder fetchInternal( // Ditto for environment variables accessed via `getenv`. for (String envKey : starlarkRepositoryContext.getAccumulatedEnvKeys()) { - recordedInputValues.put( - new RepoRecordedInput.EnvVar(envKey), clientEnvironment.get(envKey)); + recordedInputValues.put(new EnvVar(envKey), clientEnvironment.get(envKey)); } - for (Table.Cell repoMappings : + for (Cell repoMappings : repoMappingRecorder.recordedEntries().cellSet()) { recordedInputValues.put( - new RepoRecordedInput.RecordedRepoMapping( - repoMappings.getRowKey(), repoMappings.getColumnKey()), + new RecordedRepoMapping(repoMappings.getRowKey(), repoMappings.getColumnKey()), repoMappings.getValue().getName()); }
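
As a stand-alone illustration of the pattern described in the first commit message -- a per-fetch thread-per-task executor whose `close()` serves as the termination barrier -- here is a minimal JDK 21 sketch. Names such as `PerFetchWorkerSketch` and `repo-fetch-worker-` are hypothetical and not part of this change.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical demo class, not part of the patch. */
public final class PerFetchWorkerSketch {
  public static void main(String[] args) throws Exception {
    // One executor per repo fetch, so cancelling this fetch can't interfere with
    // worker threads belonging to other concurrent fetches.
    ExecutorService workerExecutorService =
        Executors.newThreadPerTaskExecutor(
            Thread.ofVirtual().name("repo-fetch-worker-", 0).factory());

    // Stand-in for the real fetch logic running on the worker virtual thread.
    Future<?> workerFuture =
        workerExecutorService.submit(
            () -> {
              Thread.sleep(10_000);
              return null;
            });

    // Interrupt/cancellation path: cancel() alone is not a reliable termination
    // barrier, because the task may have been cancelled before it ever started
    // (so a DONE-style signal sent from a finally block would never arrive).
    // close() shuts the executor down and waits until every submitted task has
    // terminated, whether or not it began running.
    workerFuture.cancel(/* mayInterruptIfRunning= */ true);
    workerExecutorService.close(); // Blocks until the worker is really gone.
    System.out.println("worker has terminated");
  }
}
```

In the first patch, `RepoFetchingSkyKeyComputeState.closeAndWaitForTermination()` plays the role of the cancel-then-close sequence above.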