From 60eb53bca7348edebdb2b8e6734f6b748e990ec7 Mon Sep 17 00:00:00 2001 From: olaola Date: Thu, 13 Jul 2017 00:00:27 +0200 Subject: [PATCH] Fixing the handling of retries for watch and execute calls. TESTED=remote worker, triggered some errors RELNOTES: fixes #3305, helps #3356 PiperOrigin-RevId: 161722997 --- .../build/lib/remote/GrpcRemoteExecutor.java | 121 +++++++----- .../devtools/build/lib/remote/Retrier.java | 11 ++ .../remote/GrpcRemoteExecutionClientTest.java | 182 +++++++++++------- .../build/lib/remote/RetrierTest.java | 14 ++ .../build/remote/ExecutionServer.java | 32 +-- .../devtools/build/remote/StatusUtils.java | 30 +-- .../devtools/build/remote/WatcherServer.java | 22 ++- 7 files changed, 264 insertions(+), 148 deletions(-) diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java index ad4a4ddbe7bf15..68046fe2ca7fa8 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java +++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java @@ -67,8 +67,7 @@ private WatcherBlockingStub watcherBlockingStub() { .withCallCredentials(callCredentials); } - private @Nullable ExecuteResponse getOperationResponse(Operation op) - throws IOException { + private @Nullable ExecuteResponse getOperationResponse(Operation op) throws IOException { if (op.getResultCase() == Operation.ResultCase.ERROR) { StatusRuntimeException e = StatusProto.toStatusRuntimeException(op.getError()); if (e.getStatus().getCode() == Code.DEADLINE_EXCEEDED) { @@ -90,53 +89,83 @@ private WatcherBlockingStub watcherBlockingStub() { return null; } + /* Execute has two components: the execute call and the watch call. + * This is the simple flow without any errors: + * + * - A call to execute returns an Operation object. + * - That Operation may already have an inlined result; if so, we return that result. + * - Otherwise, we call watch on that operation to receive a stream of Changes to the Operation + * object, until the first such change is an Operation with a result, which we return. + * + * Error possibilities: + * - Any Operation object can have an error field instead of a result. Such Operations are + * completed and failed; however, some of these errors may be retriable. These errors should + * trigger a retry of the full execute+watch call, resulting in a new Operation. + * - An execute call may fail with a retriable error (raise a StatusRuntimeException). We then + * retry that call. + * - A watch call may fail with a retriable error (either raise a StatusRuntimeException, or + * return an ERROR in the ChangeBatch field). In that case, we retry the watch call only on the + * same operation object. + * */ public ExecuteResponse executeRemotely(ExecuteRequest request) throws IOException, InterruptedException { - Operation op = retrier.execute(() -> execBlockingStub().execute(request)); - ExecuteResponse resp = getOperationResponse(op); - if (resp != null) { - return resp; - } - Request wr = Request.newBuilder().setTarget(op.getName()).build(); - return retrier.execute( - () -> { - Iterator replies = watcherBlockingStub().watch(wr); - while (replies.hasNext()) { - ChangeBatch cb = replies.next(); - for (Change ch : cb.getChangesList()) { - switch (ch.getState()) { - case INITIAL_STATE_SKIPPED: - continue; - case ERROR: - try { - throw StatusProto.toStatusRuntimeException(ch.getData().unpack(Status.class)); - } catch (InvalidProtocolBufferException e) { - throw new RuntimeException(e); - } - case DOES_NOT_EXIST: - // TODO(olaola): either make this retriable, or use a different exception. - throw new IOException( - String.format("Operation %s lost on the remote server.", op.getName())); - case EXISTS: - Operation o; - try { - o = ch.getData().unpack(Operation.class); - } catch (InvalidProtocolBufferException e) { - throw new RuntimeException(e); - } - ExecuteResponse r = getOperationResponse(o); - if (r != null) { - return r; - } - continue; - default: - // This can only happen if the enum gets unexpectedly extended. - throw new IOException(String.format("Illegal change state: %s", ch.getState())); + // The only errors retried here are transient failures of the Action itself on the server, not + // any gRPC errors that occurred during the call. + return retrier.execute(() -> { + // Here all transient gRPC errors will be retried. + Operation op = retrier.execute(() -> execBlockingStub().execute(request)); + ExecuteResponse resp = getOperationResponse(op); + if (resp != null) { + return resp; + } + Request wr = Request.newBuilder().setTarget(op.getName()).build(); + // Here all transient gRPC errors will be retried, while transient failures of the Action + // itself will be propagated. + return retrier.execute( + () -> { + Iterator replies = watcherBlockingStub().watch(wr); + while (replies.hasNext()) { + ChangeBatch cb = replies.next(); + for (Change ch : cb.getChangesList()) { + switch (ch.getState()) { + case INITIAL_STATE_SKIPPED: + continue; + case ERROR: + try { + throw StatusProto.toStatusRuntimeException(ch.getData().unpack(Status.class)); + } catch (InvalidProtocolBufferException e) { + throw new IOException(e); + } + case DOES_NOT_EXIST: + // TODO(olaola): either make this retriable, or use a different exception. + throw new IOException( + String.format("Operation %s lost on the remote server.", op.getName())); + case EXISTS: + Operation o; + try { + o = ch.getData().unpack(Operation.class); + } catch (InvalidProtocolBufferException e) { + throw new IOException(e); + } + try { + ExecuteResponse r = getOperationResponse(o); + if (r != null) { + return r; + } + } catch (StatusRuntimeException e) { + // Pass through the Watch retry and retry the whole execute+watch call. + throw new Retrier.PassThroughException(e); + } + continue; + default: + // This can only happen if the enum gets unexpectedly extended. + throw new IOException(String.format("Illegal change state: %s", ch.getState())); + } } } - } - throw new IOException( - String.format("Watch request for %s terminated with no result.", op.getName())); - }); + throw new IOException( + String.format("Watch request for %s terminated with no result.", op.getName())); + }); + }); } } diff --git a/src/main/java/com/google/devtools/build/lib/remote/Retrier.java b/src/main/java/com/google/devtools/build/lib/remote/Retrier.java index 33d24221075cd4..4a248a17f44c57 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/Retrier.java +++ b/src/main/java/com/google/devtools/build/lib/remote/Retrier.java @@ -149,6 +149,13 @@ * In both cases you need to instantiate and keep a Backoff object, and use onFailure(s) to retry. */ public class Retrier { + /** Wraps around a StatusRuntimeException to make it pass through a single layer of retries. */ + public static class PassThroughException extends Exception { + public PassThroughException(StatusRuntimeException e) { + super(e); + } + } + /** * Backoff is a stateful object providing a sequence of durations that are used to time delays * between retries. It is not ThreadSafe. The reason that Backoff needs to be stateful, rather @@ -297,6 +304,10 @@ public T execute(Callable c) throws InterruptedException, IOException { while (true) { try { return c.call(); + } catch (PassThroughException e) { + throw (StatusRuntimeException) e.getCause(); + } catch (RetryException e) { + throw e; // Nested retries are always pass-through. } catch (StatusException | StatusRuntimeException e) { onFailure(backoff, Status.fromThrowable(e)); } catch (Exception e) { diff --git a/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutionClientTest.java b/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutionClientTest.java index 05e4814f87959c..87322359caa9ad 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutionClientTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutionClientTest.java @@ -58,6 +58,7 @@ import com.google.longrunning.Operation; import com.google.protobuf.Any; import com.google.protobuf.ByteString; +import com.google.rpc.Code; import com.google.watcher.v1.Change; import com.google.watcher.v1.ChangeBatch; import com.google.watcher.v1.Request; @@ -405,73 +406,112 @@ public void getActionResult( .setStderrRaw(ByteString.copyFromUtf8("stderr")) .build(); final String opName = "operations/xyz"; - serviceRegistry.addService( - new ExecutionImplBase() { - private int numErrors = 4; - @Override - public void execute(ExecuteRequest request, StreamObserver responseObserver) { - if (numErrors-- <= 0) { - responseObserver.onNext(Operation.newBuilder().setName(opName).build()); - responseObserver.onCompleted(); - } else { + ExecutionImplBase mockExecutionImpl = Mockito.mock(ExecutionImplBase.class); + Answer successAnswer = + invocationOnMock -> { + @SuppressWarnings("unchecked") StreamObserver responseObserver = + (StreamObserver) invocationOnMock.getArguments()[1]; + responseObserver.onNext(Operation.newBuilder().setName(opName).build()); + responseObserver.onCompleted(); + return null; + }; + Mockito.doAnswer( + invocationOnMock -> { + @SuppressWarnings("unchecked") StreamObserver responseObserver = + (StreamObserver) invocationOnMock.getArguments()[1]; responseObserver.onError(Status.UNAVAILABLE.asRuntimeException()); - } - } - }); - serviceRegistry.addService( - new WatcherImplBase() { - private int numErrors = 4; - - @Override - public void watch(Request request, StreamObserver responseObserver) { - assertThat(request.getTarget()).isEqualTo(opName); - if (numErrors-- > 0) { + return null; + }) + .doAnswer(successAnswer) + .doAnswer(successAnswer) + .when(mockExecutionImpl) + .execute( + Mockito.anyObject(), Mockito.>anyObject()); + serviceRegistry.addService(mockExecutionImpl); + + WatcherImplBase mockWatcherImpl = Mockito.mock(WatcherImplBase.class); + Mockito.doAnswer( + invocationOnMock -> { + @SuppressWarnings("unchecked") StreamObserver responseObserver = + (StreamObserver) invocationOnMock.getArguments()[1]; + // Retry the execution call as well as the watch call. + responseObserver.onNext( + ChangeBatch.newBuilder() + .addChanges( + Change.newBuilder() + .setState(Change.State.EXISTS) + .setData( + Any.pack( + Operation.newBuilder() + .setName(opName) + .setError( + com.google.rpc.Status.newBuilder() + .setCode(Code.INTERNAL.getNumber()) + .build()) + .build())) + .build()) + .build()); + responseObserver.onCompleted(); + return null; + }) + .doAnswer( + invocationOnMock -> { + @SuppressWarnings("unchecked") StreamObserver responseObserver = + (StreamObserver) invocationOnMock.getArguments()[1]; + // Retry the watch call. responseObserver.onError(Status.UNAVAILABLE.asRuntimeException()); - return; - } - // Some optional initial state. - responseObserver.onNext( - ChangeBatch.newBuilder() - .addChanges( - Change.newBuilder().setState(Change.State.INITIAL_STATE_SKIPPED).build()) - .build()); - // Still executing. - responseObserver.onNext( - ChangeBatch.newBuilder() - .addChanges( - Change.newBuilder() - .setState(Change.State.EXISTS) - .setData(Any.pack(Operation.newBuilder().setName(opName).build())) - .build()) - .addChanges( - Change.newBuilder() - .setState(Change.State.EXISTS) - .setData(Any.pack(Operation.newBuilder().setName(opName).build())) - .build()) - .build()); - // Finished executing. - responseObserver.onNext( - ChangeBatch.newBuilder() - .addChanges( - Change.newBuilder() - .setState(Change.State.EXISTS) - .setData( - Any.pack( - Operation.newBuilder() - .setName(opName) - .setDone(true) - .setResponse( - Any.pack( - ExecuteResponse.newBuilder() - .setResult(actionResult) - .build())) - .build())) - .build()) - .build()); - responseObserver.onCompleted(); - } - }); + return null; + }) + .doAnswer( + invocationOnMock -> { + @SuppressWarnings("unchecked") StreamObserver responseObserver = + (StreamObserver) invocationOnMock.getArguments()[1]; + // Some optional initial state. + responseObserver.onNext( + ChangeBatch.newBuilder() + .addChanges( + Change.newBuilder().setState(Change.State.INITIAL_STATE_SKIPPED).build()) + .build()); + // Still executing. + responseObserver.onNext( + ChangeBatch.newBuilder() + .addChanges( + Change.newBuilder() + .setState(Change.State.EXISTS) + .setData(Any.pack(Operation.newBuilder().setName(opName).build())) + .build()) + .addChanges( + Change.newBuilder() + .setState(Change.State.EXISTS) + .setData(Any.pack(Operation.newBuilder().setName(opName).build())) + .build()) + .build()); + // Finished executing. + responseObserver.onNext( + ChangeBatch.newBuilder() + .addChanges( + Change.newBuilder() + .setState(Change.State.EXISTS) + .setData( + Any.pack( + Operation.newBuilder() + .setName(opName) + .setDone(true) + .setResponse( + Any.pack( + ExecuteResponse.newBuilder() + .setResult(actionResult) + .build())) + .build())) + .build()) + .build()); + responseObserver.onCompleted(); + return null; + }) + .when(mockWatcherImpl) + .watch(Mockito.anyObject(), Mockito.>anyObject()); + serviceRegistry.addService(mockWatcherImpl); final Command command = Command.newBuilder() .addAllArguments(ImmutableList.of("/bin/echo", "Hi!")) @@ -511,10 +551,10 @@ public void findMissingBlobs( ByteStreamImplBase mockByteStreamImpl = Mockito.mock(ByteStreamImplBase.class); when(mockByteStreamImpl.write(Mockito.>anyObject())) - .thenAnswer(blobWriteAnswerError()) // Error on command upload. - .thenAnswer(blobWriteAnswer(command.toByteArray())) // Upload command successfully. - .thenAnswer(blobWriteAnswerError()) // Error on the input file. - .thenAnswer(blobWriteAnswerError()) // Error on the input file again. + .thenAnswer(blobWriteAnswerError()) // Error on command upload. + .thenAnswer(blobWriteAnswer(command.toByteArray())) // Upload command successfully. + .thenAnswer(blobWriteAnswerError()) // Error on the input file. + .thenAnswer(blobWriteAnswerError()) // Error on the input file again. .thenAnswer(blobWriteAnswer("xyz".getBytes(UTF_8))); // Upload input file successfully. serviceRegistry.addService(mockByteStreamImpl); @@ -523,5 +563,11 @@ public void findMissingBlobs( assertThat(result.exitCode()).isEqualTo(0); assertThat(outErr.outAsLatin1()).isEqualTo("stdout"); assertThat(outErr.errAsLatin1()).isEqualTo("stderr"); + Mockito.verify(mockExecutionImpl, Mockito.times(3)) + .execute( + Mockito.anyObject(), Mockito.>anyObject()); + Mockito.verify(mockWatcherImpl, Mockito.times(3)) + .watch( + Mockito.anyObject(), Mockito.>anyObject()); } } diff --git a/src/test/java/com/google/devtools/build/lib/remote/RetrierTest.java b/src/test/java/com/google/devtools/build/lib/remote/RetrierTest.java index 9a8e464914c4fd..d55e2b659db11b 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/RetrierTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/RetrierTest.java @@ -19,6 +19,7 @@ import com.google.common.collect.Range; import io.grpc.Status; +import io.grpc.StatusRuntimeException; import java.io.IOException; import java.time.Duration; import org.junit.Before; @@ -123,6 +124,19 @@ public void testInterruptedExceptionIsPassedThrough() throws Exception { } } + @Test + public void testPassThroughException() throws Exception { + StatusRuntimeException thrown = Status.Code.UNKNOWN.toStatus().asRuntimeException(); + try { + Retrier.NO_RETRIES.execute(() -> { + throw new Retrier.PassThroughException(thrown); + }); + fail(); + } catch (StatusRuntimeException expected) { + assertThat(expected).isSameAs(thrown); + } + } + @Test public void testIOExceptionIsPassedThrough() throws Exception { IOException thrown = new IOException(); diff --git a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/ExecutionServer.java b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/ExecutionServer.java index b33fefb2b05a84..7de862c3245203 100644 --- a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/ExecutionServer.java +++ b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/ExecutionServer.java @@ -45,6 +45,7 @@ import com.google.protobuf.util.Durations; import com.google.rpc.Code; import com.google.rpc.Status; +import io.grpc.StatusException; import io.grpc.protobuf.StatusProto; import io.grpc.stub.StreamObserver; import java.io.ByteArrayOutputStream; @@ -122,7 +123,7 @@ public ActionResult call() throws Exception { } private ActionResult execute(ExecuteRequest request, String id) - throws IOException, InterruptedException, CacheNotFoundException { + throws IOException, InterruptedException, StatusException { Path tempRoot = workPath.getRelative("build-" + id); try { tempRoot.createDirectory(); @@ -133,7 +134,7 @@ private ActionResult execute(ExecuteRequest request, String id) request.getTotalInputFileCount(), request.getAction().getOutputFilesCount() }); ActionResult result = execute(request.getAction(), tempRoot); - logger.log(INFO, "Completed {0}.", id); + logger.log(FINE, "Completed {0}.", id); return result; } catch (Exception e) { logger.log(Level.SEVERE, "Work failed.", e); @@ -155,13 +156,18 @@ private ActionResult execute(ExecuteRequest request, String id) } private ActionResult execute(Action action, Path execRoot) - throws IOException, InterruptedException, CacheNotFoundException { + throws IOException, InterruptedException, StatusException { ByteArrayOutputStream stdoutBuffer = new ByteArrayOutputStream(); ByteArrayOutputStream stderrBuffer = new ByteArrayOutputStream(); - com.google.devtools.remoteexecution.v1test.Command command = - com.google.devtools.remoteexecution.v1test.Command.parseFrom( - cache.downloadBlob(action.getCommandDigest())); - cache.downloadTree(action.getInputRootDigest(), execRoot); + com.google.devtools.remoteexecution.v1test.Command command = null; + try { + command = + com.google.devtools.remoteexecution.v1test.Command.parseFrom( + cache.downloadBlob(action.getCommandDigest())); + cache.downloadTree(action.getInputRootDigest(), execRoot); + } catch (CacheNotFoundException e) { + throw StatusUtils.notFoundError(e.getMissingDigest()); + } List outputs = new ArrayList<>(action.getOutputFilesList().size()); for (String output : action.getOutputFilesList()) { @@ -208,7 +214,7 @@ private ActionResult execute(Action action, Path execRoot) "Command:\n%s\nexceeded deadline of %f seconds.", Arrays.toString(command.getArgumentsList().toArray()), timeoutMillis / 1000.0); logger.warning(errMessage); - throw StatusProto.toStatusRuntimeException( + throw StatusProto.toStatusException( Status.newBuilder() .setCode(Code.DEADLINE_EXCEEDED.getNumber()) .setMessage(errMessage) @@ -271,14 +277,16 @@ private long getUid() { // Checks Action for docker container definition. If no docker container specified, returns // null. Otherwise returns docker container name from the parameters. - private String dockerContainer(Action action) { + private String dockerContainer(Action action) throws StatusException { String result = null; for (Platform.Property property : action.getPlatform().getPropertiesList()) { if (property.getName().equals(CONTAINER_IMAGE_ENTRY_NAME)) { if (result != null) { // Multiple container name entries - throw new IllegalArgumentException( - "Multiple entries for " + CONTAINER_IMAGE_ENTRY_NAME + " in action.Platform"); + throw StatusUtils.invalidArgumentError( + "platform", // Field name. + String.format( + "Multiple entries for %s in action.Platform", CONTAINER_IMAGE_ENTRY_NAME)); } result = property.getValue(); } @@ -294,7 +302,7 @@ private Command getCommand( Action action, List commandLineElements, Map environmentVariables, - String pathString) { + String pathString) throws StatusException { String container = dockerContainer(action); if (container != null) { // Run command inside a docker container. diff --git a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/StatusUtils.java b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/StatusUtils.java index cc860dbd72a7ee..494ee58453896f 100644 --- a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/StatusUtils.java +++ b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/StatusUtils.java @@ -20,26 +20,28 @@ import com.google.rpc.BadRequest.FieldViolation; import com.google.rpc.Code; import com.google.rpc.Status; -import io.grpc.StatusRuntimeException; +import io.grpc.StatusException; import io.grpc.protobuf.StatusProto; /** Some utility methods to convert exceptions to Status results. */ final class StatusUtils { private StatusUtils() {} - static StatusRuntimeException internalError(Exception e) { - return StatusProto.toStatusRuntimeException(internalErrorStatus(e)); + static StatusException internalError(Exception e) { + return StatusProto.toStatusException(internalErrorStatus(e)); } - static com.google.rpc.Status internalErrorStatus(Exception e) { - return Status.newBuilder() - .setCode(Code.INTERNAL.getNumber()) - .setMessage("Internal error: " + e) - .build(); + static Status internalErrorStatus(Exception e) { + // StatusProto.fromThrowable returns null on non-status errors or errors with no trailers, + // unlike Status.fromThrowable which returns the UNKNOWN code for these. + Status st = StatusProto.fromThrowable(e); + return st != null + ? st + : Status.newBuilder().setCode(Code.INTERNAL.getNumber()).setMessage(e.getMessage()).build(); } - static StatusRuntimeException notFoundError(Digest digest) { - return StatusProto.toStatusRuntimeException(notFoundStatus(digest)); + static StatusException notFoundError(Digest digest) { + return StatusProto.toStatusException(notFoundStatus(digest)); } static com.google.rpc.Status notFoundStatus(Digest digest) { @@ -49,8 +51,8 @@ static com.google.rpc.Status notFoundStatus(Digest digest) { .build(); } - static StatusRuntimeException interruptedError(Digest digest) { - return StatusProto.toStatusRuntimeException(interruptedStatus(digest)); + static StatusException interruptedError(Digest digest) { + return StatusProto.toStatusException(interruptedStatus(digest)); } static com.google.rpc.Status interruptedStatus(Digest digest) { @@ -60,8 +62,8 @@ static com.google.rpc.Status interruptedStatus(Digest digest) { .build(); } - static StatusRuntimeException invalidArgumentError(String field, String desc) { - return StatusProto.toStatusRuntimeException(invalidArgumentStatus(field, desc)); + static StatusException invalidArgumentError(String field, String desc) { + return StatusProto.toStatusException(invalidArgumentStatus(field, desc)); } static com.google.rpc.Status invalidArgumentStatus(String field, String desc) { diff --git a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/WatcherServer.java b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/WatcherServer.java index 1bbd112c15725d..9a242f92f11b2d 100644 --- a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/WatcherServer.java +++ b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/WatcherServer.java @@ -74,16 +74,22 @@ public void watch(Request wr, StreamObserver responseObserver) { Throwables.throwIfUnchecked(e.getCause()); throw (Exception) e.getCause(); } - } catch (IllegalArgumentException e) { - responseObserver.onError( - StatusProto.toStatusRuntimeException( - Status.newBuilder() - .setCode(Code.INVALID_ARGUMENT.getNumber()) - .setMessage(e.toString()) - .build())); } catch (Exception e) { logger.log(Level.SEVERE, "Work failed: " + opName, e); - responseObserver.onError(StatusUtils.internalError(e)); + responseObserver.onNext( + ChangeBatch.newBuilder() + .addChanges( + Change.newBuilder() + .setState(Change.State.EXISTS) + .setData( + Any.pack( + Operation.newBuilder() + .setName(opName) + .setError(StatusUtils.internalErrorStatus(e)) + .build())) + .build()) + .build()); + responseObserver.onCompleted(); if (e instanceof InterruptedException) { Thread.currentThread().interrupt(); }