Skip to content

Commit

Permalink
Rewrite (again) repository worker threads.
Browse files Browse the repository at this point in the history
This time, using an RPC paradigm where the worker thread can request
things from the host thread (but not the other way round).
  • Loading branch information
lberki committed May 2, 2024
1 parent 8d370b2 commit e918fe9
Show file tree
Hide file tree
Showing 2 changed files with 186 additions and 204 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,14 @@
package com.google.devtools.build.lib.bazel.repository.starlark;

import com.google.common.util.concurrent.Uninterruptibles;
import com.google.devtools.build.lib.bazel.repository.starlark.RepoFetchingSkyKeyComputeState.HostState.EnvironmentSent;
import com.google.devtools.build.lib.bazel.repository.starlark.RepoFetchingSkyKeyComputeState.WorkerState.EnvironmentReceived;
import com.google.devtools.build.lib.bazel.repository.starlark.RepoFetchingSkyKeyComputeState.HostState.TerminationRequested;
import com.google.devtools.build.lib.rules.repository.RepoRecordedInput;
import com.google.devtools.build.lib.rules.repository.RepositoryDirectoryValue;
import com.google.devtools.build.skyframe.SkyFunction;
import com.google.devtools.build.skyframe.SkyFunction.Environment.SkyKeyComputeState;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.Exchanger;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
Expand All @@ -40,82 +38,65 @@
* restart to get a fresh environment object.
*/
class RepoFetchingSkyKeyComputeState implements SkyKeyComputeState {
/** The state the worker thread is stopped in. */
sealed interface WorkerState {
/**
* Indicates that the worker thread added a Skyframe dependency that's not evaluated yet.
*
* <p>The host thread should return {@code null}, causing a Skyframe restart, then send a new
* environment to the worker to unblock it.
*/
record EnvironmentRequested() implements WorkerState {}
sealed interface Message {}
record Packet<M extends Message>(int sequenceNo, M message) {}

/** Indicates that the worker received an environment it requested.
/** Requests the worker thread can make to the host thread. */
sealed interface Request extends Message {

/** Ask for the current Skyframe environment. This does *not* cause a restart, just causes
* the host thread to send over the current one, whatever it may be.
*
* <P>The next state it will be in is {@code EnvironmentRequested}, {@code Success} or
* {@code Failure}.
* <p>The host thread will answer with {@code Environment}.
*/
record EnvironmentReceived() implements WorkerState {}
record GetEnvironment() implements Request {}

/** Indicates that the worker thread has finished running with a failure.
/**
* Signals that the worker thread discovered new dependencies and that the host thread must
* restart.
*
* <p>It will terminate after successfully sending this signal.
* <p>The host thread will respond with {@code Restarting}.
*/
record Failure(Throwable e) implements WorkerState {}
record NewDependencies() implements Request {}

/** Indicates that the worker thread has finished fetching the repository successfully.
/** Signals that the repository was successfully fetched.
*
* <p>It is now waiting for the host thread to decide whether it should terminate or fetch the
* repository again (happens due to a nuance in environment handling, see {@code
* StarlarkRepositoryFunction.fetch()}.
* <p>The host thread will respnond with {@cod RestartDecision}, which can cause the fetching to
* be done again. If the response is that no restart is required, the worker thread will
* terminate.
*/
record Success(RepositoryDirectoryValue.Builder result) implements WorkerState {}
record Success(RepositoryDirectoryValue.Builder result) implements Request {}

/** The worker thread has finished fetching the repository successfully.
*
* <p>It is now waiting for the host thread to decide whether it should terminate or fetch the
* repository again (happens due to a nuance in environment handling, see {@code
* StarlarkRepositoryFunction.fetch()}. It should respond with {@code FullRestartRequested}
* or {@code TerminationRequested} as appropriate.
/**
* Fetching failed. The host will answer with {@code FailureAcknowledged}. The worker thread
* will terminate.
*/
record WaitingForRestartDecision() implements WorkerState {}
record Failure(Throwable e) implements Request {}
}

/** The state the host thread is stopped in. */
sealed interface HostState {
sealed interface Response extends Message {
/** A new Skyframe environment for the worker thread. */
record Environment(SkyFunction.Environment environment) implements Response {}

/** A new environment the worker thread requested.
*
* <p>The worker thread needs to reply with {@code EnvironmentReceived}.
*/
record EnvironmentSent(SkyFunction.Environment environment) implements HostState {}
/** Acknowledgement of a restart request. */
record Restarting() implements Response {}

/** The host thread has sent an environment to the worker thread.
*
* <p>It is now waiting for the worker to decide whether it needs a new environment or it can
* do its job without one. The worker should respond with {@code EnvironmentSent},
* {@code Success} or {@code Failure} as a appropriate.
/** Response for {@code Success}. If {@code restart} is true, the fetching needs to be repeated.
*/
record WaitingForWorker() implements HostState {}
record RestartDecision(boolean restart) implements Response {}

/** Indicates that the worker thread needs to be restarted, despite finishing with success.
*
* <p>After this, the host thread will send a new environment (with {@code EnvironmentSent}).
*/
record FullRestartRequested() implements HostState {}

/** Indicates that the worker thread should terminate after finishing with success.
*
* <p>After this, the host thread will join the worker one.
*/
record TerminationRequested() implements HostState {}
/** Response for {@code Failure}. */
record FailureAcknowledged() implements Response {}
}

/** The name of the repository this state is for. Only there to ease debugging. */
private final String repositoryName;

/** Used to ensure that the worker and Skyframe threads both are at a known place. */
private final Exchanger<Object> rendezvous = new Exchanger();
/** The sequence number of the next request the worker thread will make. */
private int nextRequestSequenceNo = 0;

private final BlockingQueue<Packet<Request>> requestQueue = new ArrayBlockingQueue<>(1);
private final BlockingQueue<Packet<Response>> responseQueue = new ArrayBlockingQueue<>(1);

/** The working thread that actually performs the fetching logic. */
private final Thread workerThread;
Expand All @@ -141,67 +122,76 @@ record TerminationRequested() implements HostState {}
}

SkyFunction.Environment signalForFreshEnv() throws InterruptedException {
// First unblock the host thread, then wait until it has something else to say. The only thing
// the host thread can do to this one is an interrupt so there is no need to do anything special
// in between.
HostState hostState = rendezvousFromWorker(new WorkerState.EnvironmentRequested());
if (!(hostState instanceof HostState.WaitingForWorker)) {
throw new IllegalStateException("Host thread is in unexpected state %s" + hostState);
Response response = sendRequest(new Request.NewDependencies());
if (!(response instanceof Response.Restarting)) {
throw new IllegalStateException("Unexpected response %s" + response);
}
return getEnvironmentFromHost();
}

/** Get a new Environment from the host.
*
* <p>This should either be called right after the worker thread starts or after sending a
* {@code EnvironmentSent} to the host.
*/
SkyFunction.Environment getEnvironmentFromHost() throws InterruptedException {
HostState hostState = rendezvousFromWorker(new EnvironmentReceived());
return switch (hostState) {
case EnvironmentSent(SkyFunction.Environment env) -> env;
default -> throw new IllegalStateException(
"Host thread is in unexpected state %s" + hostState);
};
response = sendRequest(new Request.GetEnvironment());
if (response instanceof Response.Environment environmentResponse) {
return environmentResponse.environment;
} else {
throw new IllegalStateException("Unexpected response %s" + response);
}
}

private void logMaybe(String msg) {
// System.err.println(msg);
}

Response sendRequest(Request request) throws InterruptedException {
int seq = nextRequestSequenceNo++;
Packet<Request> packet = new Packet(seq, request);
logMaybe(String.format("LOG %s/worker: sending request %s", repositoryName, packet));
requestQueue.put(packet);
while (true) {
Packet<Response> response = responseQueue.take();
if (response.sequenceNo == seq) {
return response.message;
} else {
logMaybe(String.format("LOG %s/worker: discarding %s", repositoryName, response));
}
}
}

Response sendRequestUninterruptibly(Request request) {
int seq = nextRequestSequenceNo++;
Packet<Request> packet = new Packet(seq, request);
logMaybe(String.format("LOG %s/worker: sending request uninterruptibly %s", repositoryName, packet));
Uninterruptibles.putUninterruptibly(requestQueue, packet);
while (true) {
Packet<Response> response = Uninterruptibles.takeUninterruptibly(responseQueue);
logMaybe(String.format("LOG %s/worker: got response packet %s", repositoryName, response));
if (response.sequenceNo == seq) {
return response.message;
}
}
}

HostState rendezvousFromWorker(WorkerState workerState) throws InterruptedException {
logMaybe(String.format("LOG %s/worker: sending %s", repositoryName, workerState));
HostState result = (HostState) rendezvous.exchange(workerState);
logMaybe(String.format("LOG %s/worker: received %s", repositoryName, result));
Packet<Request> getRequest() throws InterruptedException {
logMaybe(String.format("LOG %s/host: taking request", repositoryName));
Packet<Request> result = requestQueue.take();
logMaybe(String.format("LOG %s/host: Got request %s", repositoryName, result));
return result;
}

WorkerState rendezvousFromHost(HostState hostState) throws InterruptedException {
logMaybe(String.format("LOG %s/host: sending %s", repositoryName, hostState));
WorkerState result = (WorkerState) rendezvous.exchange(hostState);
logMaybe(String.format("LOG %s/host: received %s", repositoryName, result));
Packet<Request> getRequestUninterruptibly() {
logMaybe(String.format("LOG %s/host: taking request uninterruptibly", repositoryName));
Packet<Request> result = Uninterruptibles.takeUninterruptibly(requestQueue);
logMaybe(String.format("LOG %s/host: request received %s", repositoryName, result));
return result;
}

HostState rendezvousUninterruptiblyFromWorker(WorkerState signal) {
logMaybe(String.format("LOG %s/worker: sending %s uninterruptibly", repositoryName, signal));
while(true) {
try {
return rendezvousFromWorker(signal);
} catch (InterruptedException e) {
logMaybe(String.format("LOG %s/worker: retrying on interrupt", repositoryName));
}
}
void sendResponse(Packet<Request> request, Response response) throws InterruptedException {
logMaybe(String.format("LOG %s/host: sending response %s", repositoryName, response));
Packet<Response> packet = new Packet<>(request.sequenceNo, response);
responseQueue.put(packet);
}

WorkerState rendezvousUninterruptiblyFromHost(HostState signal) {
logMaybe(String.format("LOG %s/host: sending %s uninterruptibly", repositoryName, signal));
while(true) {
try {
return rendezvousFromHost(signal);
} catch (InterruptedException e) {
logMaybe(String.format("LOG %s/host: retrying on interrupt", repositoryName));
}
}
void sendResponseUninterruptibly(Packet<Request> request, Response response) {
logMaybe(String.format("LOG %s/host: sending response uninterruptibly %s", repositoryName, response));
Packet<Response> packet = new Packet<>(request.sequenceNo, response);
Uninterruptibles.putUninterruptibly(responseQueue, packet);
}

/** Join the worker thread.
Expand Down Expand Up @@ -233,24 +223,26 @@ public void close() {
logMaybe(String.format("LOG %s/host: closing with interrupt", repositoryName));
workerThread.interrupt();

// Wait until the worker thread actually gets interrupted. Be resilient to cases where despite
// the interrupt above, the worker thread was already trying to post a restart. I'm not sure if
// that can happen; theoretically, it looks like it shouldn't be but I'm not intimately familiar
// with the exact semantics of thread interruption and it's cheap to be resilient. The important
// part is that in case an interrupt happens, a Success or Failure is eventually posted by the
// worker thread.
SignalLoop:
// Drain the request queue until we get a message that signals that the worker thread will
// quit. Don't let an InterruptedException deter us from joining the worker thread because
// otherwise, it'll be in an unknown state.
MessageLoop:
while (true) {
WorkerState signal = rendezvousUninterruptiblyFromHost(null);
logMaybe(String.format("LOG %s/host: close signal is %s", repositoryName, signal));
switch (signal) {
case WorkerState.Failure unused: break SignalLoop;
case WorkerState.Success unused: {
// In this case, the worker thread expects an answer. */
rendezvousUninterruptiblyFromHost(new TerminationRequested());
break SignalLoop;
Packet<Request> packet = getRequestUninterruptibly();
switch (packet.message) {
case Request.Failure unused: {
sendResponseUninterruptibly(packet, new Response.FailureAcknowledged());
break MessageLoop;
}
case Request.Success unused: {
sendResponseUninterruptibly(packet, new Response.RestartDecision(false));
break MessageLoop;
}

default: {
sendResponseUninterruptibly(packet, new Response.FailureAcknowledged());
continue MessageLoop;
}
default: continue SignalLoop;
}
}

Expand Down
Loading

0 comments on commit e918fe9

Please sign in to comment.