From a91fe7f25f52ac45fcbb1110b1c01e28b5cf4ad2 Mon Sep 17 00:00:00 2001
From: Benjamin Peterson
Date: Wed, 15 Mar 2023 14:08:56 -0700
Subject: [PATCH] Rework ByteStreamUploader early return logic.

There are several points where ByteStreamUploader may discover that the
server already has the blob fully uploaded. These points try to effect an
early return from the upload code, generally by "lying" to higher layers
that the upload fully finished. That can lead to bugs. For example,
consider the added test case: a compressed upload completes its writing
but receives an error instead of a server ACK. On retry, QueryWriteStatus
reveals that the blob exists and returns its uncompressed size. This
confuses the checkCommittedSize logic, which expects the final committed
size of a compressed upload to be the total compressed data size or -1.
The code added by
https://github.com/bazelbuild/bazel/commit/daa3dbe22adb03338c75b53ea97954c9434099b4
also looks broken in the case of compressed uploads.

Rework the uploader code so that early returns throw an AlreadyExists
exception. The exception control flow naturally reflects the desire to
escape quickly to the top level.
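
To illustrate the new control flow, here is a minimal, self-contained
sketch of the pattern (the class and method names below are placeholders
for illustration, not code from this change): the lower layers fail their
futures with AlreadyExists, and only the top level catches that exception
type and maps it to success.

    import com.google.common.util.concurrent.Futures;
    import com.google.common.util.concurrent.ListenableFuture;
    import com.google.common.util.concurrent.MoreExecutors;

    final class EarlyReturnSketch {
      // Stands in for ByteStreamUploader.AlreadyExists.
      private static final class AlreadyExists extends Exception {}

      // Stands in for the write/query layers: when the server reports the blob
      // as already complete, fail the future with AlreadyExists instead of
      // fabricating a committed size that merely looks like a finished upload.
      private static ListenableFuture<Long> writeOrQuery(boolean blobAlreadyOnServer) {
        return blobAlreadyOnServer
            ? Futures.immediateFailedFuture(new AlreadyExists())
            : Futures.immediateFuture(42L); // committed size reported by the server
      }

      // Stands in for the top level (AsyncUpload.start() in this patch): the only
      // place where AlreadyExists is translated into a successful (null) result.
      static ListenableFuture<Void> start(boolean blobAlreadyOnServer) {
        return Futures.catching(
            Futures.transform(
                writeOrQuery(blobAlreadyOnServer),
                committedSize -> (Void) null, // committed-size sanity checks would go here
                MoreExecutors.directExecutor()),
            AlreadyExists.class,
            unused -> null,
            MoreExecutors.directExecutor());
      }
    }

Because AlreadyExists escapes all the way to start(), checkCommittedSize
only ever runs for uploads this client actually completed.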
---
 .../build/lib/remote/ByteStreamUploader.java  | 95 +++++++++----------
 .../lib/remote/ByteStreamUploaderTest.java    | 66 ++++++++++++-
 2 files changed, 108 insertions(+), 53 deletions(-)

diff --git a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java
index 106207d6f10e69..5a52c7f312a814 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java
@@ -25,12 +25,10 @@
 import com.google.bytestream.ByteStreamGrpc.ByteStreamFutureStub;
 import com.google.bytestream.ByteStreamGrpc.ByteStreamStub;
 import com.google.bytestream.ByteStreamProto.QueryWriteStatusRequest;
-import com.google.bytestream.ByteStreamProto.QueryWriteStatusResponse;
 import com.google.bytestream.ByteStreamProto.WriteRequest;
 import com.google.bytestream.ByteStreamProto.WriteResponse;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Ascii;
-import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import com.google.common.util.concurrent.AsyncCallable;
 import com.google.common.util.concurrent.Futures;
@@ -238,6 +236,16 @@ private ListenableFuture<Void> startAsyncUpload(
     return currUpload;
   }
 
+  /**
+   * Signal that the blob already exists on the server, so upload should complete early but
+   * successfully.
+   */
+  private static final class AlreadyExists extends Exception {
+    private AlreadyExists() {
+      super();
+    }
+  }
+
   private static final class AsyncUpload implements AsyncCallable<Long> {
     private final RemoteActionExecutionContext context;
     private final ReferenceCountedChannel channel;
@@ -269,28 +277,26 @@ private static final class AsyncUpload implements AsyncCallable<Long> {
     }
 
     ListenableFuture<Void> start() {
-      return Futures.transformAsync(
-          Utils.refreshIfUnauthenticatedAsync(
-              () -> retrier.executeAsync(this, progressiveBackoff), callCredentialsProvider),
-          committedSize -> {
-            try {
-              checkCommittedSize(committedSize);
-            } catch (IOException e) {
-              return Futures.immediateFailedFuture(e);
-            }
-            return immediateVoidFuture();
-          },
+      return Futures.catching(
+          Futures.transformAsync(
+              Utils.refreshIfUnauthenticatedAsync(
+                  () -> retrier.executeAsync(this, progressiveBackoff), callCredentialsProvider),
+              committedSize -> {
+                try {
+                  checkCommittedSize(committedSize);
+                } catch (IOException e) {
+                  return Futures.immediateFailedFuture(e);
+                }
+                return immediateVoidFuture();
+              },
+              MoreExecutors.directExecutor()),
+          AlreadyExists.class,
+          ae -> null,
           MoreExecutors.directExecutor());
     }
 
+    /** Check that the committed_size the server returned makes sense after a successful full upload. */
     private void checkCommittedSize(long committedSize) throws IOException {
-      // Only check for matching committed size if we have completed the upload. If another client
-      // did, they might have used a different compression level/algorithm, so we cannot know the
-      // expected committed offset
-      if (chunker.hasNext()) {
-        return;
-      }
-
       long expected = chunker.getOffset();
 
       if (committedSize == expected) {
@@ -329,9 +335,6 @@ public ListenableFuture<Long> call() {
           firstAttempt ? Futures.immediateFuture(0L) : query(),
           committedSize -> {
             if (!firstAttempt) {
-              if (chunker.getSize() == committedSize) {
-                return Futures.immediateFuture(committedSize);
-              }
               if (committedSize > lastCommittedOffset) {
                 // We have made progress on this upload in the last request. Reset the backoff so
                 // that this request has a full deck of retries
@@ -362,7 +365,7 @@ private ByteStreamStub bsAsyncStub(Channel channel) {
 
     private ListenableFuture<Long> query() {
       ListenableFuture<Long> committedSizeFuture =
-          Futures.transform(
+          Futures.transformAsync(
               channel.withChannelFuture(
                   channel ->
                       bsFutureStub(channel)
@@ -370,7 +373,10 @@ private ListenableFuture<Long> query() {
                               QueryWriteStatusRequest.newBuilder()
                                   .setResourceName(resourceName)
                                   .build())),
-              QueryWriteStatusResponse::getCommittedSize,
+              r ->
+                  r.getComplete()
+                      ? Futures.immediateFailedFuture(new AlreadyExists())
+                      : Futures.immediateFuture(r.getCommittedSize()),
               MoreExecutors.directExecutor());
       return Futures.catchingAsync(
           committedSizeFuture,
@@ -392,24 +398,7 @@ private ListenableFuture<Long> upload(long pos) {
           channel -> {
             SettableFuture<Long> uploadResult = SettableFuture.create();
             bsAsyncStub(channel).write(new Writer(resourceName, chunker, pos, uploadResult));
-            return Futures.catchingAsync(
-                uploadResult,
-                Throwable.class,
-                throwable -> {
-                  Preconditions.checkNotNull(throwable);
-
-                  Status status = Status.fromThrowable(throwable);
-                  switch (status.getCode()) {
-                    case ALREADY_EXISTS:
-                      // Server indicated the blob already exists, so we translate the error to a
-                      // successful upload.
-                      return Futures.immediateFuture(chunker.getSize());
-
-                    default:
-                      return Futures.immediateFailedFuture(throwable);
-                  }
-                },
-                MoreExecutors.directExecutor());
+            return uploadResult;
           });
     }
   }
@@ -423,6 +412,7 @@ private static final class Writer
     private long committedSize = -1;
     private ClientCallStreamObserver<WriteRequest> requestObserver;
     private boolean first = true;
+    private boolean finishedWriting;
 
     private Writer(
         String resourceName, Chunker chunker, long pos, SettableFuture<Long> uploadResult) {
@@ -447,10 +437,6 @@ public void beforeStart(ClientCallStreamObserver<WriteRequest> requestObserver)
 
     @Override
     public void run() {
-      if (committedSize != -1) {
-        requestObserver.cancel("server has returned early", null);
-        return;
-      }
       while (requestObserver.isReady()) {
         WriteRequest.Builder request = WriteRequest.newBuilder();
         if (first) {
@@ -477,6 +463,7 @@ public void run() {
                 .build());
         if (isLastChunk) {
           requestObserver.onCompleted();
+          finishedWriting = true;
           return;
         }
       }
@@ -515,12 +502,22 @@ public void onNext(WriteResponse response) {
 
     @Override
     public void onCompleted() {
-      uploadResult.set(committedSize);
+      if (finishedWriting) {
+        uploadResult.set(committedSize);
+      } else {
+        // Server completed successfully before we finished writing all the data, meaning the blob
+        // already exists. The server is supposed to set committed_size to the size of the blob (for
+        // uncompressed uploads) or -1 (for compressed uploads), but we do not verify this.
+        requestObserver.cancel("server has returned early", null);
+        uploadResult.setException(new AlreadyExists());
+      }
     }
 
     @Override
     public void onError(Throwable t) {
-      uploadResult.setException(t);
+      requestObserver.cancel("failed", t);
+      uploadResult.setException(
+          (Status.fromThrowable(t).getCode() == Code.ALREADY_EXISTS) ? new AlreadyExists() : t);
     }
   }
 
diff --git a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java
index 4831e7c0e108b7..53efb95f23255d 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java
@@ -478,6 +478,66 @@ public void queryWriteStatus(
     Mockito.verify(mockBackoff, Mockito.times(1)).getRetryAttempts();
   }
 
+  @Test
+  public void progressiveCompressedUploadSeesAlreadyExistsAtTheEnd() throws Exception {
+    RemoteRetrier retrier =
+        TestUtils.newRemoteRetrier(
+            () -> new FixedBackoff(1, 0),
+            e -> Status.fromThrowable(e).getCode() == Code.INTERNAL,
+            retryService);
+    ByteStreamUploader uploader =
+        new ByteStreamUploader(
+            INSTANCE_NAME,
+            referenceCountedChannel,
+            CallCredentialsProvider.NO_CREDENTIALS,
+            300,
+            retrier,
+            /* maximumOpenFiles= */ -1);
+
+    int chunkSize = 1024;
+    int skipSize = chunkSize + 1;
+    byte[] blob = new byte[chunkSize * 2 + 1];
+    new Random().nextBytes(blob);
+
+    Chunker chunker =
+        Chunker.builder().setInput(blob).setCompressed(true).setChunkSize(chunkSize).build();
+    Digest digest = DIGEST_UTIL.compute(blob);
+
+    serviceRegistry.addService(
+        new ByteStreamImplBase() {
+          @Override
+          public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> streamObserver) {
+            return new StreamObserver<WriteRequest>() {
+              @Override
+              public void onNext(WriteRequest writeRequest) {}
+
+              @Override
+              public void onError(Throwable throwable) {
+                fail("onError should never be called.");
+              }
+
+              @Override
+              public void onCompleted() {
+                streamObserver.onError(Status.INTERNAL.asException());
+              }
+            };
+          }
+
+          @Override
+          public void queryWriteStatus(
+              QueryWriteStatusRequest request, StreamObserver<QueryWriteStatusResponse> response) {
+            response.onNext(
+                QueryWriteStatusResponse.newBuilder()
+                    .setCommittedSize(blob.length)
+                    .setComplete(true)
+                    .build());
+            response.onCompleted();
+          }
+        });
+
+    uploader.uploadBlob(context, digest, chunker);
+  }
+
   @Test
   public void concurrentlyCompletedUploadIsNotRetried() throws Exception {
     // Test that after an upload has failed and the QueryWriteStatus call returns
@@ -610,8 +670,7 @@ public void queryWriteStatus(
 
   @Test
   public void earlyWriteResponseShouldCompleteUpload() throws Exception {
-    RemoteRetrier retrier =
-        TestUtils.newRemoteRetrier(() -> mockBackoff, (e) -> true, retryService);
+    RemoteRetrier retrier = TestUtils.newRemoteRetrier(() -> mockBackoff, e -> false, retryService);
     ByteStreamUploader uploader =
         new ByteStreamUploader(
             INSTANCE_NAME,
@@ -701,8 +760,7 @@ public void onCompleted() {
 
   @Test
   public void incorrectCommittedSizeDoesNotFailIncompleteUpload() throws Exception {
-    RemoteRetrier retrier =
-        TestUtils.newRemoteRetrier(() -> mockBackoff, (e) -> true, retryService);
+    RemoteRetrier retrier = TestUtils.newRemoteRetrier(() -> mockBackoff, e -> false, retryService);
     ByteStreamUploader uploader =
         new ByteStreamUploader(
            INSTANCE_NAME,