Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rework ByteStreamUploader early return logic. #17791

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,10 @@
import com.google.bytestream.ByteStreamGrpc.ByteStreamFutureStub;
import com.google.bytestream.ByteStreamGrpc.ByteStreamStub;
import com.google.bytestream.ByteStreamProto.QueryWriteStatusRequest;
import com.google.bytestream.ByteStreamProto.QueryWriteStatusResponse;
import com.google.bytestream.ByteStreamProto.WriteRequest;
import com.google.bytestream.ByteStreamProto.WriteResponse;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Ascii;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.util.concurrent.AsyncCallable;
import com.google.common.util.concurrent.Futures;
Expand Down Expand Up @@ -238,6 +236,16 @@ private ListenableFuture<Void> startAsyncUpload(
return currUpload;
}

/**
 * Signal that the blob already exists on the server, so upload should complete early but
 * successfully.
 *
 * <p>This exception is used purely as a control-flow signal (it is caught and mapped to a
 * successful result in {@code AsyncUpload#start}), so stack-trace capture and suppression are
 * disabled to avoid paying for trace collection that is never read.
 */
private static final class AlreadyExists extends Exception {
  private AlreadyExists() {
    // message=null, cause=null; disable suppression and stack-trace writing for a cheap signal.
    super(null, null, /* enableSuppression= */ false, /* writableStackTrace= */ false);
  }
}

private static final class AsyncUpload implements AsyncCallable<Long> {
private final RemoteActionExecutionContext context;
private final ReferenceCountedChannel channel;
Expand Down Expand Up @@ -269,28 +277,26 @@ private static final class AsyncUpload implements AsyncCallable<Long> {
}

ListenableFuture<Void> start() {
return Futures.transformAsync(
Utils.refreshIfUnauthenticatedAsync(
() -> retrier.executeAsync(this, progressiveBackoff), callCredentialsProvider),
committedSize -> {
try {
checkCommittedSize(committedSize);
} catch (IOException e) {
return Futures.immediateFailedFuture(e);
}
return immediateVoidFuture();
},
return Futures.catching(
Futures.transformAsync(
Utils.refreshIfUnauthenticatedAsync(
() -> retrier.executeAsync(this, progressiveBackoff), callCredentialsProvider),
committedSize -> {
try {
checkCommittedSize(committedSize);
} catch (IOException e) {
return Futures.immediateFailedFuture(e);
}
return immediateVoidFuture();
},
MoreExecutors.directExecutor()),
AlreadyExists.class,
ae -> null,
MoreExecutors.directExecutor());
}

/** Check the committed_size the server returned makes sense after a successful full upload. */
private void checkCommittedSize(long committedSize) throws IOException {
// Only check for matching committed size if we have completed the upload. If another client
// did, they might have used a different compression level/algorithm, so we cannot know the
// expected committed offset
if (chunker.hasNext()) {
return;
}

long expected = chunker.getOffset();

if (committedSize == expected) {
Expand Down Expand Up @@ -329,9 +335,6 @@ public ListenableFuture<Long> call() {
firstAttempt ? Futures.immediateFuture(0L) : query(),
committedSize -> {
if (!firstAttempt) {
if (chunker.getSize() == committedSize) {
return Futures.immediateFuture(committedSize);
}
if (committedSize > lastCommittedOffset) {
// We have made progress on this upload in the last request. Reset the backoff so
// that this request has a full deck of retries
Expand Down Expand Up @@ -362,15 +365,18 @@ private ByteStreamStub bsAsyncStub(Channel channel) {

private ListenableFuture<Long> query() {
ListenableFuture<Long> committedSizeFuture =
Futures.transform(
Futures.transformAsync(
channel.withChannelFuture(
channel ->
bsFutureStub(channel)
.queryWriteStatus(
QueryWriteStatusRequest.newBuilder()
.setResourceName(resourceName)
.build())),
QueryWriteStatusResponse::getCommittedSize,
r ->
r.getComplete()
? Futures.immediateFailedFuture(new AlreadyExists())
: Futures.immediateFuture(r.getCommittedSize()),
MoreExecutors.directExecutor());
return Futures.catchingAsync(
committedSizeFuture,
Expand All @@ -392,24 +398,7 @@ private ListenableFuture<Long> upload(long pos) {
channel -> {
SettableFuture<Long> uploadResult = SettableFuture.create();
bsAsyncStub(channel).write(new Writer(resourceName, chunker, pos, uploadResult));
return Futures.catchingAsync(
uploadResult,
Throwable.class,
throwable -> {
Preconditions.checkNotNull(throwable);

Status status = Status.fromThrowable(throwable);
switch (status.getCode()) {
case ALREADY_EXISTS:
// Server indicated the blob already exists, so we translate the error to a
// successful upload.
return Futures.immediateFuture(chunker.getSize());

default:
return Futures.immediateFailedFuture(throwable);
}
},
MoreExecutors.directExecutor());
return uploadResult;
});
}
}
Expand All @@ -423,6 +412,7 @@ private static final class Writer
private long committedSize = -1;
private ClientCallStreamObserver<WriteRequest> requestObserver;
private boolean first = true;
private boolean finishedWriting;

private Writer(
String resourceName, Chunker chunker, long pos, SettableFuture<Long> uploadResult) {
Expand All @@ -447,10 +437,6 @@ public void beforeStart(ClientCallStreamObserver<WriteRequest> requestObserver)

@Override
public void run() {
if (committedSize != -1) {
requestObserver.cancel("server has returned early", null);
return;
}
while (requestObserver.isReady()) {
WriteRequest.Builder request = WriteRequest.newBuilder();
if (first) {
Expand All @@ -477,6 +463,7 @@ public void run() {
.build());
if (isLastChunk) {
requestObserver.onCompleted();
finishedWriting = true;
return;
}
}
Expand Down Expand Up @@ -515,12 +502,22 @@ public void onNext(WriteResponse response) {

@Override
public void onCompleted() {
  if (finishedWriting) {
    // Normal completion: every chunk was sent and the server acknowledged the write.
    uploadResult.set(committedSize);
  } else {
    // Server completed successfully before we finished writing all the data, meaning the blob
    // already exists. The server is supposed to set committed_size to the size of the blob (for
    // uncompressed uploads) or -1 (for compressed uploads), but we do not verify this.
    requestObserver.cancel("server has returned early", null);
    uploadResult.setException(new AlreadyExists());
  }
}

@Override
public void onError(Throwable t) {
  requestObserver.cancel("failed", t);
  // ALREADY_EXISTS from the server means the blob is already present; surface it as the
  // AlreadyExists signal so the caller translates the failure into an early success.
  uploadResult.setException(
      (Status.fromThrowable(t).getCode() == Code.ALREADY_EXISTS) ? new AlreadyExists() : t);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,66 @@ public void queryWriteStatus(
Mockito.verify(mockBackoff, Mockito.times(1)).getRetryAttempts();
}

@Test
public void progressiveCompressedUploadSeesAlreadyExistsAtTheEnd() throws Exception {
// Scenario: a compressed upload's final Write call fails with INTERNAL, and the subsequent
// QueryWriteStatus reports the blob is complete. The uploader should treat this as an early,
// successful completion rather than failing or retrying further.
RemoteRetrier retrier =
TestUtils.newRemoteRetrier(
() -> new FixedBackoff(1, 0),
// Only INTERNAL is retriable, so the server's injected error triggers exactly one retry.
e -> Status.fromThrowable(e).getCode() == Code.INTERNAL,
retryService);
ByteStreamUploader uploader =
new ByteStreamUploader(
INSTANCE_NAME,
referenceCountedChannel,
CallCredentialsProvider.NO_CREDENTIALS,
300,
retrier,
/* maximumOpenFiles= */ -1);

// Blob spans multiple chunks so the upload is genuinely progressive.
int chunkSize = 1024;
int skipSize = chunkSize + 1;
byte[] blob = new byte[chunkSize * 2 + 1];
new Random().nextBytes(blob);

Chunker chunker =
Chunker.builder().setInput(blob).setCompressed(true).setChunkSize(chunkSize).build();
Digest digest = DIGEST_UTIL.compute(blob);

serviceRegistry.addService(
new ByteStreamImplBase() {
@Override
public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> streamObserver) {
return new StreamObserver<WriteRequest>() {
@Override
public void onNext(WriteRequest writeRequest) {}

@Override
public void onError(Throwable throwable) {
fail("onError should never be called.");
}

@Override
public void onCompleted() {
// Fail the stream only after the client has finished writing, forcing a
// QueryWriteStatus on the retry path.
streamObserver.onError(Status.INTERNAL.asException());
}
};
}

@Override
public void queryWriteStatus(
QueryWriteStatusRequest request, StreamObserver<QueryWriteStatusResponse> response) {
// Report the blob as fully committed so the retry short-circuits to success.
response.onNext(
QueryWriteStatusResponse.newBuilder()
.setCommittedSize(blob.length)
.setComplete(true)
.build());
response.onCompleted();
}
});

// Must complete without throwing: complete=true maps to an early successful upload.
uploader.uploadBlob(context, digest, chunker);
}

@Test
public void concurrentlyCompletedUploadIsNotRetried() throws Exception {
// Test that after an upload has failed and the QueryWriteStatus call returns
Expand Down Expand Up @@ -610,8 +670,7 @@ public void queryWriteStatus(

@Test
public void earlyWriteResponseShouldCompleteUpload() throws Exception {
RemoteRetrier retrier =
TestUtils.newRemoteRetrier(() -> mockBackoff, (e) -> true, retryService);
RemoteRetrier retrier = TestUtils.newRemoteRetrier(() -> mockBackoff, e -> false, retryService);
ByteStreamUploader uploader =
new ByteStreamUploader(
INSTANCE_NAME,
Expand Down Expand Up @@ -701,8 +760,7 @@ public void onCompleted() {

@Test
public void incorrectCommittedSizeDoesNotFailIncompleteUpload() throws Exception {
RemoteRetrier retrier =
TestUtils.newRemoteRetrier(() -> mockBackoff, (e) -> true, retryService);
RemoteRetrier retrier = TestUtils.newRemoteRetrier(() -> mockBackoff, e -> false, retryService);
ByteStreamUploader uploader =
new ByteStreamUploader(
INSTANCE_NAME,
Expand Down