Skip to content

Commit

Permalink
Fixing the handling of retries for watch and execute calls.
Browse files Browse the repository at this point in the history
TESTED=remote worker, triggered some errors
RELNOTES: fixes #3305, helps #3356
PiperOrigin-RevId: 161722997
  • Loading branch information
olaola authored and laszlocsomor committed Jul 13, 2017
1 parent f626144 commit 60eb53b
Show file tree
Hide file tree
Showing 7 changed files with 264 additions and 148 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,7 @@ private WatcherBlockingStub watcherBlockingStub() {
.withCallCredentials(callCredentials);
}

private @Nullable ExecuteResponse getOperationResponse(Operation op)
throws IOException {
private @Nullable ExecuteResponse getOperationResponse(Operation op) throws IOException {
if (op.getResultCase() == Operation.ResultCase.ERROR) {
StatusRuntimeException e = StatusProto.toStatusRuntimeException(op.getError());
if (e.getStatus().getCode() == Code.DEADLINE_EXCEEDED) {
Expand All @@ -90,53 +89,83 @@ private WatcherBlockingStub watcherBlockingStub() {
return null;
}

/* Execute has two components: the execute call and the watch call.
* This is the simple flow without any errors:
*
* - A call to execute returns an Operation object.
* - That Operation may already have an inlined result; if so, we return that result.
* - Otherwise, we call watch on that operation to receive a stream of Changes to the Operation
* object, and keep reading until a Change carries an Operation with a result, which we return.
*
* Error possibilities:
* - Any Operation object can have an error field instead of a result. Such Operations are
* completed and failed; however, some of these errors may be retriable. These errors should
* trigger a retry of the full execute+watch call, resulting in a new Operation.
* - An execute call may fail with a retriable error (raise a StatusRuntimeException). We then
* retry that call.
* - A watch call may fail with a retriable error (either raise a StatusRuntimeException, or
* return a Change in the ERROR state within a ChangeBatch). In that case, we retry only the
* watch call, on the same Operation object.
* */
public ExecuteResponse executeRemotely(ExecuteRequest request)
throws IOException, InterruptedException {
Operation op = retrier.execute(() -> execBlockingStub().execute(request));
ExecuteResponse resp = getOperationResponse(op);
if (resp != null) {
return resp;
}
Request wr = Request.newBuilder().setTarget(op.getName()).build();
return retrier.execute(
() -> {
Iterator<ChangeBatch> replies = watcherBlockingStub().watch(wr);
while (replies.hasNext()) {
ChangeBatch cb = replies.next();
for (Change ch : cb.getChangesList()) {
switch (ch.getState()) {
case INITIAL_STATE_SKIPPED:
continue;
case ERROR:
try {
throw StatusProto.toStatusRuntimeException(ch.getData().unpack(Status.class));
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(e);
}
case DOES_NOT_EXIST:
// TODO(olaola): either make this retriable, or use a different exception.
throw new IOException(
String.format("Operation %s lost on the remote server.", op.getName()));
case EXISTS:
Operation o;
try {
o = ch.getData().unpack(Operation.class);
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(e);
}
ExecuteResponse r = getOperationResponse(o);
if (r != null) {
return r;
}
continue;
default:
// This can only happen if the enum gets unexpectedly extended.
throw new IOException(String.format("Illegal change state: %s", ch.getState()));
// The only errors retried here are transient failures of the Action itself on the server, not
// any gRPC errors that occurred during the call.
return retrier.execute(() -> {
// Here all transient gRPC errors will be retried.
Operation op = retrier.execute(() -> execBlockingStub().execute(request));
ExecuteResponse resp = getOperationResponse(op);
if (resp != null) {
return resp;
}
Request wr = Request.newBuilder().setTarget(op.getName()).build();
// Here all transient gRPC errors will be retried, while transient failures of the Action
// itself will be propagated.
return retrier.execute(
() -> {
Iterator<ChangeBatch> replies = watcherBlockingStub().watch(wr);
while (replies.hasNext()) {
ChangeBatch cb = replies.next();
for (Change ch : cb.getChangesList()) {
switch (ch.getState()) {
case INITIAL_STATE_SKIPPED:
continue;
case ERROR:
try {
throw StatusProto.toStatusRuntimeException(ch.getData().unpack(Status.class));
} catch (InvalidProtocolBufferException e) {
throw new IOException(e);
}
case DOES_NOT_EXIST:
// TODO(olaola): either make this retriable, or use a different exception.
throw new IOException(
String.format("Operation %s lost on the remote server.", op.getName()));
case EXISTS:
Operation o;
try {
o = ch.getData().unpack(Operation.class);
} catch (InvalidProtocolBufferException e) {
throw new IOException(e);
}
try {
ExecuteResponse r = getOperationResponse(o);
if (r != null) {
return r;
}
} catch (StatusRuntimeException e) {
// Pass through the Watch retry and retry the whole execute+watch call.
throw new Retrier.PassThroughException(e);
}
continue;
default:
// This can only happen if the enum gets unexpectedly extended.
throw new IOException(String.format("Illegal change state: %s", ch.getState()));
}
}
}
}
throw new IOException(
String.format("Watch request for %s terminated with no result.", op.getName()));
});
throw new IOException(
String.format("Watch request for %s terminated with no result.", op.getName()));
});
});
}
}
11 changes: 11 additions & 0 deletions src/main/java/com/google/devtools/build/lib/remote/Retrier.java
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,13 @@
* In both cases you need to instantiate and keep a Backoff object, and use onFailure(s) to retry.
*/
public class Retrier {
/**
 * Wraps around a StatusRuntimeException to make it pass through a single layer of retries.
 *
 * <p>When a callable passed to {@code execute} throws this exception, {@code execute} does not
 * back off and retry it; it unwraps the cause and rethrows it as a StatusRuntimeException, so
 * that an enclosing retry layer (if any) gets to handle it instead.
 */
public static class PassThroughException extends Exception {
/**
 * @param e the exception to carry through the retry layer; it is stored as the cause, which
 *     {@code execute} later casts back to StatusRuntimeException
 */
public PassThroughException(StatusRuntimeException e) {
super(e);
}
}

/**
* Backoff is a stateful object providing a sequence of durations that are used to time delays
* between retries. It is not ThreadSafe. The reason that Backoff needs to be stateful, rather
Expand Down Expand Up @@ -297,6 +304,10 @@ public <T> T execute(Callable<T> c) throws InterruptedException, IOException {
while (true) {
try {
return c.call();
} catch (PassThroughException e) {
throw (StatusRuntimeException) e.getCause();
} catch (RetryException e) {
throw e; // Nested retries are always pass-through.
} catch (StatusException | StatusRuntimeException e) {
onFailure(backoff, Status.fromThrowable(e));
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import com.google.longrunning.Operation;
import com.google.protobuf.Any;
import com.google.protobuf.ByteString;
import com.google.rpc.Code;
import com.google.watcher.v1.Change;
import com.google.watcher.v1.ChangeBatch;
import com.google.watcher.v1.Request;
Expand Down Expand Up @@ -405,73 +406,112 @@ public void getActionResult(
.setStderrRaw(ByteString.copyFromUtf8("stderr"))
.build();
final String opName = "operations/xyz";
serviceRegistry.addService(
new ExecutionImplBase() {
private int numErrors = 4;

@Override
public void execute(ExecuteRequest request, StreamObserver<Operation> responseObserver) {
if (numErrors-- <= 0) {
responseObserver.onNext(Operation.newBuilder().setName(opName).build());
responseObserver.onCompleted();
} else {
ExecutionImplBase mockExecutionImpl = Mockito.mock(ExecutionImplBase.class);
Answer<Void> successAnswer =
invocationOnMock -> {
@SuppressWarnings("unchecked") StreamObserver<Operation> responseObserver =
(StreamObserver<Operation>) invocationOnMock.getArguments()[1];
responseObserver.onNext(Operation.newBuilder().setName(opName).build());
responseObserver.onCompleted();
return null;
};
Mockito.doAnswer(
invocationOnMock -> {
@SuppressWarnings("unchecked") StreamObserver<Operation> responseObserver =
(StreamObserver<Operation>) invocationOnMock.getArguments()[1];
responseObserver.onError(Status.UNAVAILABLE.asRuntimeException());
}
}
});
serviceRegistry.addService(
new WatcherImplBase() {
private int numErrors = 4;

@Override
public void watch(Request request, StreamObserver<ChangeBatch> responseObserver) {
assertThat(request.getTarget()).isEqualTo(opName);
if (numErrors-- > 0) {
return null;
})
.doAnswer(successAnswer)
.doAnswer(successAnswer)
.when(mockExecutionImpl)
.execute(
Mockito.<ExecuteRequest>anyObject(), Mockito.<StreamObserver<Operation>>anyObject());
serviceRegistry.addService(mockExecutionImpl);

WatcherImplBase mockWatcherImpl = Mockito.mock(WatcherImplBase.class);
Mockito.doAnswer(
invocationOnMock -> {
@SuppressWarnings("unchecked") StreamObserver<ChangeBatch> responseObserver =
(StreamObserver<ChangeBatch>) invocationOnMock.getArguments()[1];
// Retry the execution call as well as the watch call.
responseObserver.onNext(
ChangeBatch.newBuilder()
.addChanges(
Change.newBuilder()
.setState(Change.State.EXISTS)
.setData(
Any.pack(
Operation.newBuilder()
.setName(opName)
.setError(
com.google.rpc.Status.newBuilder()
.setCode(Code.INTERNAL.getNumber())
.build())
.build()))
.build())
.build());
responseObserver.onCompleted();
return null;
})
.doAnswer(
invocationOnMock -> {
@SuppressWarnings("unchecked") StreamObserver<ChangeBatch> responseObserver =
(StreamObserver<ChangeBatch>) invocationOnMock.getArguments()[1];
// Retry the watch call.
responseObserver.onError(Status.UNAVAILABLE.asRuntimeException());
return;
}
// Some optional initial state.
responseObserver.onNext(
ChangeBatch.newBuilder()
.addChanges(
Change.newBuilder().setState(Change.State.INITIAL_STATE_SKIPPED).build())
.build());
// Still executing.
responseObserver.onNext(
ChangeBatch.newBuilder()
.addChanges(
Change.newBuilder()
.setState(Change.State.EXISTS)
.setData(Any.pack(Operation.newBuilder().setName(opName).build()))
.build())
.addChanges(
Change.newBuilder()
.setState(Change.State.EXISTS)
.setData(Any.pack(Operation.newBuilder().setName(opName).build()))
.build())
.build());
// Finished executing.
responseObserver.onNext(
ChangeBatch.newBuilder()
.addChanges(
Change.newBuilder()
.setState(Change.State.EXISTS)
.setData(
Any.pack(
Operation.newBuilder()
.setName(opName)
.setDone(true)
.setResponse(
Any.pack(
ExecuteResponse.newBuilder()
.setResult(actionResult)
.build()))
.build()))
.build())
.build());
responseObserver.onCompleted();
}
});
return null;
})
.doAnswer(
invocationOnMock -> {
@SuppressWarnings("unchecked") StreamObserver<ChangeBatch> responseObserver =
(StreamObserver<ChangeBatch>) invocationOnMock.getArguments()[1];
// Some optional initial state.
responseObserver.onNext(
ChangeBatch.newBuilder()
.addChanges(
Change.newBuilder().setState(Change.State.INITIAL_STATE_SKIPPED).build())
.build());
// Still executing.
responseObserver.onNext(
ChangeBatch.newBuilder()
.addChanges(
Change.newBuilder()
.setState(Change.State.EXISTS)
.setData(Any.pack(Operation.newBuilder().setName(opName).build()))
.build())
.addChanges(
Change.newBuilder()
.setState(Change.State.EXISTS)
.setData(Any.pack(Operation.newBuilder().setName(opName).build()))
.build())
.build());
// Finished executing.
responseObserver.onNext(
ChangeBatch.newBuilder()
.addChanges(
Change.newBuilder()
.setState(Change.State.EXISTS)
.setData(
Any.pack(
Operation.newBuilder()
.setName(opName)
.setDone(true)
.setResponse(
Any.pack(
ExecuteResponse.newBuilder()
.setResult(actionResult)
.build()))
.build()))
.build())
.build());
responseObserver.onCompleted();
return null;
})
.when(mockWatcherImpl)
.watch(Mockito.<Request>anyObject(), Mockito.<StreamObserver<ChangeBatch>>anyObject());
serviceRegistry.addService(mockWatcherImpl);
final Command command =
Command.newBuilder()
.addAllArguments(ImmutableList.of("/bin/echo", "Hi!"))
Expand Down Expand Up @@ -511,10 +551,10 @@ public void findMissingBlobs(

ByteStreamImplBase mockByteStreamImpl = Mockito.mock(ByteStreamImplBase.class);
when(mockByteStreamImpl.write(Mockito.<StreamObserver<WriteResponse>>anyObject()))
.thenAnswer(blobWriteAnswerError()) // Error on command upload.
.thenAnswer(blobWriteAnswer(command.toByteArray())) // Upload command successfully.
.thenAnswer(blobWriteAnswerError()) // Error on the input file.
.thenAnswer(blobWriteAnswerError()) // Error on the input file again.
.thenAnswer(blobWriteAnswerError()) // Error on command upload.
.thenAnswer(blobWriteAnswer(command.toByteArray())) // Upload command successfully.
.thenAnswer(blobWriteAnswerError()) // Error on the input file.
.thenAnswer(blobWriteAnswerError()) // Error on the input file again.
.thenAnswer(blobWriteAnswer("xyz".getBytes(UTF_8))); // Upload input file successfully.
serviceRegistry.addService(mockByteStreamImpl);

Expand All @@ -523,5 +563,11 @@ public void findMissingBlobs(
assertThat(result.exitCode()).isEqualTo(0);
assertThat(outErr.outAsLatin1()).isEqualTo("stdout");
assertThat(outErr.errAsLatin1()).isEqualTo("stderr");
Mockito.verify(mockExecutionImpl, Mockito.times(3))
.execute(
Mockito.<ExecuteRequest>anyObject(), Mockito.<StreamObserver<Operation>>anyObject());
Mockito.verify(mockWatcherImpl, Mockito.times(3))
.watch(
Mockito.<Request>anyObject(), Mockito.<StreamObserver<ChangeBatch>>anyObject());
}
}
Loading

0 comments on commit 60eb53b

Please sign in to comment.