diff --git a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java
index 91f9f509f600ed..90c9e45c2a36ff 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java
@@ -353,7 +353,12 @@ ListenableFuture<Void> start() {
       AtomicLong committedOffset = new AtomicLong(0);
       return Futures.transformAsync(
           retrier.executeAsync(
-              () -> ctx.call(() -> callAndQueryOnFailure(committedOffset, progressiveBackoff)),
+              () -> {
+                if (committedOffset.get() < chunker.getSize()) {
+                  return ctx.call(() -> callAndQueryOnFailure(committedOffset, progressiveBackoff));
+                }
+                return Futures.immediateFuture(null);
+              },
               progressiveBackoff),
           (result) -> {
             long committedSize = committedOffset.get();
diff --git a/src/main/java/com/google/devtools/build/lib/remote/Chunker.java b/src/main/java/com/google/devtools/build/lib/remote/Chunker.java
index cda7a3aa7819e6..8904e59e20bd37 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/Chunker.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/Chunker.java
@@ -218,7 +218,7 @@ public Chunk next() throws IOException {
     return new Chunk(blob, offsetBefore);
   }
 
-  private long bytesLeft() {
+  public long bytesLeft() {
     return getSize() - getOffset();
   }
 
diff --git a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java
index cd28f561de083c..2b47d949f697d7 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java
@@ -333,6 +333,65 @@ public void queryWriteStatus(
     withEmptyMetadata.detach(prevContext);
   }
 
+  @Test
+  public void concurrentlyCompletedUploadIsNotRetried() throws Exception {
+    // Test that when an upload fails and the subsequent QueryWriteStatus call reports
+    // that the upload has already completed, the upload is not retried.
+    Context prevContext = withEmptyMetadata.attach();
+    RemoteRetrier retrier =
+        TestUtils.newRemoteRetrier(() -> new FixedBackoff(1, 0), (e) -> true, retryService);
+    ByteStreamUploader uploader =
+        new ByteStreamUploader(
+            INSTANCE_NAME, new ReferenceCountedChannel(channel), null, 1, retrier);
+
+    byte[] blob = new byte[CHUNK_SIZE * 2 + 1];
+    new Random().nextBytes(blob);
+
+    Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
+    HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash());
+
+    AtomicInteger numWriteCalls = new AtomicInteger(0);
+
+    serviceRegistry.addService(
+        new ByteStreamImplBase() {
+          @Override
+          public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> streamObserver) {
+            numWriteCalls.getAndIncrement();
+            streamObserver.onError(Status.DEADLINE_EXCEEDED.asException());
+            return new StreamObserver<WriteRequest>() {
+              @Override
+              public void onNext(WriteRequest writeRequest) {}
+
+              @Override
+              public void onError(Throwable throwable) {}
+
+              @Override
+              public void onCompleted() {}
+            };
+          }
+
+          @Override
+          public void queryWriteStatus(
+              QueryWriteStatusRequest request, StreamObserver<QueryWriteStatusResponse> response) {
+            response.onNext(
+                QueryWriteStatusResponse.newBuilder()
+                    .setCommittedSize(blob.length)
+                    .setComplete(true)
+                    .build());
+            response.onCompleted();
+          }
+        });
+
+    uploader.uploadBlob(hash, chunker, true);
+
+    // This test should not have triggered any retries.
+    assertThat(numWriteCalls.get()).isEqualTo(1);
+
+    blockUntilInternalStateConsistent(uploader);
+
+    withEmptyMetadata.detach(prevContext);
+  }
+
   @Test
   public void unimplementedQueryShouldRestartUpload() throws Exception {
     Context prevContext = withEmptyMetadata.attach();