diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteCacheClientFactory.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteCacheClientFactory.java index 46070133aecfa9..64f1bddf3b4c46 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteCacheClientFactory.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteCacheClientFactory.java @@ -59,15 +59,22 @@ public static RemoteCacheClient create( @Nullable Credentials creds, AuthAndTLSOptions authAndTlsOptions, Path workingDirectory, - DigestUtil digestUtil) + DigestUtil digestUtil, + RemoteRetrier retrier) throws IOException { Preconditions.checkNotNull(workingDirectory, "workingDirectory"); if (isHttpCache(options) && isDiskCache(options)) { return createDiskAndHttpCache( - workingDirectory, options.diskCache, options, creds, authAndTlsOptions, digestUtil); + workingDirectory, + options.diskCache, + options, + creds, + authAndTlsOptions, + digestUtil, + retrier); } if (isHttpCache(options)) { - return createHttp(options, creds, authAndTlsOptions, digestUtil); + return createHttp(options, creds, authAndTlsOptions, digestUtil, retrier); } if (isDiskCache(options)) { return createDiskCache( @@ -90,7 +97,8 @@ private static RemoteCacheClient createHttp( RemoteOptions options, Credentials creds, AuthAndTLSOptions authAndTlsOptions, - DigestUtil digestUtil) { + DigestUtil digestUtil, + RemoteRetrier retrier) { Preconditions.checkNotNull(options.remoteCache, "remoteCache"); try { @@ -109,6 +117,7 @@ private static RemoteCacheClient createHttp( options.remoteVerifyDownloads, ImmutableList.copyOf(options.remoteHeaders), digestUtil, + retrier, creds, authAndTlsOptions); } else { @@ -122,6 +131,7 @@ private static RemoteCacheClient createHttp( options.remoteVerifyDownloads, ImmutableList.copyOf(options.remoteHeaders), digestUtil, + retrier, creds, authAndTlsOptions); } @@ -148,9 +158,10 @@ private static RemoteCacheClient createDiskAndHttpCache( RemoteOptions options, Credentials cred, AuthAndTLSOptions authAndTlsOptions, - DigestUtil digestUtil) + DigestUtil digestUtil, + RemoteRetrier retrier) throws IOException { - RemoteCacheClient httpCache = createHttp(options, cred, authAndTlsOptions, digestUtil); + RemoteCacheClient httpCache = createHttp(options, cred, authAndTlsOptions, digestUtil, retrier); return createDiskAndRemoteClient( workingDirectory, diskCachePath, diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java index 18db228f3e254b..eb8a970384575c 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java @@ -66,6 +66,7 @@ import com.google.devtools.build.lib.remote.common.RemoteCacheClient; import com.google.devtools.build.lib.remote.common.RemoteExecutionClient; import com.google.devtools.build.lib.remote.downloader.GrpcRemoteDownloader; +import com.google.devtools.build.lib.remote.http.HttpException; import com.google.devtools.build.lib.remote.logging.LoggingInterceptor; import com.google.devtools.build.lib.remote.options.RemoteBuildEventUploadMode; import com.google.devtools.build.lib.remote.options.RemoteOptions; @@ -105,10 +106,13 @@ import io.grpc.Channel; import io.grpc.ClientInterceptor; import io.grpc.ManagedChannel; +import io.netty.handler.codec.DecoderException; +import io.netty.handler.codec.http.HttpResponseStatus; import io.reactivex.rxjava3.plugins.RxJavaPlugins; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; +import java.nio.channels.ClosedChannelException; import java.util.List; import java.util.Optional; import java.util.concurrent.ExecutorService; @@ -116,6 +120,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; +import java.util.function.Predicate; import java.util.regex.Pattern; import javax.annotation.Nullable; @@ -221,6 +226,36 @@ private static ServerCapabilities getAndVerifyServerCapabilities( return capabilities; } + public static final Predicate RETRIABLE_HTTP_ERRORS = + e -> { + boolean retry = false; + if (e instanceof ClosedChannelException) { + retry = true; + } else if (e instanceof HttpException) { + int status = ((HttpException) e).response().status().code(); + retry = + status == HttpResponseStatus.INTERNAL_SERVER_ERROR.code() + || status == HttpResponseStatus.BAD_GATEWAY.code() + || status == HttpResponseStatus.SERVICE_UNAVAILABLE.code() + || status == HttpResponseStatus.GATEWAY_TIMEOUT.code(); + } else if (e instanceof IOException) { + String msg = e.getMessage().toLowerCase(); + if (msg.contains("connection reset by peer")) { + retry = true; + } else if (msg.contains("operation timed out")) { + retry = true; + } + } else { + // Workaround for a netty bug: https://github.com/netty/netty/issues/11815. Remove this + // once it is fixed in the upstream. + if (e instanceof DecoderException + && e.getMessage().endsWith("functions:OPENSSL_internal:BAD_DECRYPT")) { + retry = true; + } + } + return retry; + }; + private void initHttpAndDiskCache( CommandEnvironment env, Credentials credentials, @@ -235,7 +270,9 @@ private void initHttpAndDiskCache( credentials, authAndTlsOptions, Preconditions.checkNotNull(env.getWorkingDirectory(), "workingDirectory"), - digestUtil); + digestUtil, + new RemoteRetrier( + remoteOptions, RETRIABLE_HTTP_ERRORS, retryScheduler, Retrier.ALLOW_ALL_CALLS)); } catch (IOException e) { handleInitFailure(env, e, Code.CACHE_INIT_FAILURE); return; diff --git a/src/main/java/com/google/devtools/build/lib/remote/http/BUILD b/src/main/java/com/google/devtools/build/lib/remote/http/BUILD index 86b527cd6219f1..fb05bc0c1e5d37 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/http/BUILD +++ b/src/main/java/com/google/devtools/build/lib/remote/http/BUILD @@ -20,6 +20,7 @@ java_library( deps = [ "//src/main/java/com/google/devtools/build/lib/analysis:blaze_version_info", "//src/main/java/com/google/devtools/build/lib/authandtls", + "//src/main/java/com/google/devtools/build/lib/remote:Retrier", "//src/main/java/com/google/devtools/build/lib/remote/common", "//src/main/java/com/google/devtools/build/lib/remote/common:cache_not_found_exception", "//src/main/java/com/google/devtools/build/lib/remote/util", diff --git a/src/main/java/com/google/devtools/build/lib/remote/http/DownloadCommand.java b/src/main/java/com/google/devtools/build/lib/remote/http/DownloadCommand.java index a2e4abf9d83eb1..d68e7733f95cbc 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/http/DownloadCommand.java +++ b/src/main/java/com/google/devtools/build/lib/remote/http/DownloadCommand.java @@ -25,12 +25,18 @@ final class DownloadCommand { private final boolean casDownload; private final Digest digest; private final OutputStream out; + private final long offset; - DownloadCommand(URI uri, boolean casDownload, Digest digest, OutputStream out) { + DownloadCommand(URI uri, boolean casDownload, Digest digest, OutputStream out, long offset) { this.uri = Preconditions.checkNotNull(uri); this.casDownload = casDownload; this.digest = Preconditions.checkNotNull(digest); this.out = Preconditions.checkNotNull(out); + this.offset = offset; + } + + DownloadCommand(URI uri, boolean casDownload, Digest digest, OutputStream out) { + this(uri, casDownload, digest, out, 0); } public URI uri() { @@ -48,4 +54,8 @@ public Digest digest() { public OutputStream out() { return out; } + + public long offset() { + return offset; + } } diff --git a/src/main/java/com/google/devtools/build/lib/remote/http/HttpCacheClient.java b/src/main/java/com/google/devtools/build/lib/remote/http/HttpCacheClient.java index 6c6d4cc6f897fc..ff9b37ef16f542 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/http/HttpCacheClient.java +++ b/src/main/java/com/google/devtools/build/lib/remote/http/HttpCacheClient.java @@ -25,6 +25,7 @@ import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; import com.google.devtools.build.lib.authandtls.AuthAndTLSOptions; +import com.google.devtools.build.lib.remote.RemoteRetrier; import com.google.devtools.build.lib.remote.common.CacheNotFoundException; import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext; import com.google.devtools.build.lib.remote.common.RemoteCacheClient; @@ -81,8 +82,10 @@ import java.util.List; import java.util.Map.Entry; import java.util.NoSuchElementException; +import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; import java.util.regex.Pattern; import javax.annotation.Nullable; @@ -129,6 +132,7 @@ public final class HttpCacheClient implements RemoteCacheClient { private final boolean useTls; private final boolean verifyDownloads; private final DigestUtil digestUtil; + private final RemoteRetrier retrier; private final Object closeLock = new Object(); @@ -150,6 +154,7 @@ public static HttpCacheClient create( boolean verifyDownloads, ImmutableList> extraHttpHeaders, DigestUtil digestUtil, + RemoteRetrier retrier, @Nullable final Credentials creds, AuthAndTLSOptions authAndTlsOptions) throws Exception { @@ -162,6 +167,7 @@ public static HttpCacheClient create( verifyDownloads, extraHttpHeaders, digestUtil, + retrier, creds, authAndTlsOptions, null); @@ -175,6 +181,7 @@ public static HttpCacheClient create( boolean verifyDownloads, ImmutableList> extraHttpHeaders, DigestUtil digestUtil, + RemoteRetrier retrier, @Nullable final Credentials creds, AuthAndTLSOptions authAndTlsOptions) throws Exception { @@ -189,6 +196,7 @@ public static HttpCacheClient create( verifyDownloads, extraHttpHeaders, digestUtil, + retrier, creds, authAndTlsOptions, domainSocketAddress); @@ -202,6 +210,7 @@ public static HttpCacheClient create( verifyDownloads, extraHttpHeaders, digestUtil, + retrier, creds, authAndTlsOptions, domainSocketAddress); @@ -219,6 +228,7 @@ private HttpCacheClient( boolean verifyDownloads, ImmutableList> extraHttpHeaders, DigestUtil digestUtil, + RemoteRetrier retrier, @Nullable final Credentials creds, AuthAndTLSOptions authAndTlsOptions, @Nullable SocketAddress socketAddress) @@ -284,6 +294,7 @@ public void channelCreated(Channel ch) { this.extraHttpHeaders = extraHttpHeaders; this.verifyDownloads = verifyDownloads; this.digestUtil = digestUtil; + this.retrier = retrier; } @SuppressWarnings("FutureReturnValueIgnored") @@ -441,8 +452,11 @@ public ListenableFuture downloadBlob( RemoteActionExecutionContext context, Digest digest, OutputStream out) { final DigestOutputStream digestOut = verifyDownloads ? digestUtil.newDigestOutputStream(out) : null; + final AtomicLong casBytesDownloaded = new AtomicLong(); return Futures.transformAsync( - get(digest, digestOut != null ? digestOut : out, /* casDownload= */ true), + retrier.executeAsync( + () -> + get(digest, digestOut != null ? digestOut : out, Optional.of(casBytesDownloaded))), (v) -> { try { if (digestOut != null) { @@ -458,7 +472,8 @@ public ListenableFuture downloadBlob( } @SuppressWarnings("FutureReturnValueIgnored") - private ListenableFuture get(Digest digest, final OutputStream out, boolean casDownload) { + private ListenableFuture get( + Digest digest, final OutputStream out, Optional casBytesDownloaded) { final AtomicBoolean dataWritten = new AtomicBoolean(); OutputStream wrappedOut = new OutputStream() { @@ -469,12 +484,18 @@ private ListenableFuture get(Digest digest, final OutputStream out, boolea @Override public void write(byte[] b, int offset, int length) throws IOException { dataWritten.set(true); + if (casBytesDownloaded.isPresent()) { + casBytesDownloaded.get().addAndGet(length); + } out.write(b, offset, length); } @Override public void write(int b) throws IOException { dataWritten.set(true); + if (casBytesDownloaded.isPresent()) { + casBytesDownloaded.get().incrementAndGet(); + } out.write(b); } @@ -483,7 +504,12 @@ public void flush() throws IOException { out.flush(); } }; - DownloadCommand downloadCmd = new DownloadCommand(uri, casDownload, digest, wrappedOut); + long offset = 0; + if (casBytesDownloaded.isPresent()) { + offset = casBytesDownloaded.get().get(); + } + DownloadCommand downloadCmd = + new DownloadCommand(uri, casBytesDownloaded.isPresent(), digest, wrappedOut, offset); SettableFuture outerF = SettableFuture.create(); acquireDownloadChannel() .addListener( @@ -575,8 +601,11 @@ private void getAfterCredentialRefresh(DownloadCommand cmd, SettableFuture public ListenableFuture downloadActionResult( RemoteActionExecutionContext context, ActionKey actionKey, boolean inlineOutErr) { return Futures.transform( - Utils.downloadAsActionResult( - actionKey, (digest, out) -> get(digest, out, /* casDownload= */ false)), + retrier.executeAsync( + () -> + Utils.downloadAsActionResult( + actionKey, + (digest, out) -> get(digest, out, /* casBytesDownloaded= */ Optional.empty()))), CachedActionResult::remote, MoreExecutors.directExecutor()); } @@ -670,20 +699,28 @@ private void uploadAfterCredentialRefresh(UploadCommand upload, SettableFuture uploadFile( RemoteActionExecutionContext context, Digest digest, Path file) { - try { - return uploadAsync( - digest.getHash(), digest.getSizeBytes(), file.getInputStream(), /* casUpload= */ true); - } catch (IOException e) { - // Can be thrown from file.getInputStream. - return Futures.immediateFailedFuture(e); - } + return retrier.executeAsync( + () -> { + try { + return uploadAsync( + digest.getHash(), + digest.getSizeBytes(), + file.getInputStream(), + /* casUpload= */ true); + } catch (IOException e) { + // Can be thrown from file.getInputStream. + return Futures.immediateFailedFuture(e); + } + }); } @Override public ListenableFuture uploadBlob( RemoteActionExecutionContext context, Digest digest, ByteString data) { - return uploadAsync( - digest.getHash(), digest.getSizeBytes(), data.newInput(), /* casUpload= */ true); + return retrier.executeAsync( + () -> + uploadAsync( + digest.getHash(), digest.getSizeBytes(), data.newInput(), /* casUpload= */ true)); } @Override diff --git a/src/main/java/com/google/devtools/build/lib/remote/http/HttpDownloadHandler.java b/src/main/java/com/google/devtools/build/lib/remote/http/HttpDownloadHandler.java index 50d83d138a1d66..a6ecc0c14543ab 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/http/HttpDownloadHandler.java +++ b/src/main/java/com/google/devtools/build/lib/remote/http/HttpDownloadHandler.java @@ -51,6 +51,8 @@ final class HttpDownloadHandler extends AbstractHttpHandler { private long contentLength = -1; /** the path header in the http request */ private String path; + /** the bytes to skip in a full or chunked response */ + private long skipBytes; public HttpDownloadHandler( Credentials credentials, ImmutableList> extraHttpHeaders) { @@ -105,6 +107,19 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Ex ByteBuf content = ((HttpContent) msg).content(); int readableBytes = content.readableBytes(); + if (skipBytes > 0) { + int skipNow; + if (skipBytes < readableBytes) { + // readableBytes is an int, meaning skipBytes < readableBytes <= INT_MAX. + // So, this conversion is safe. + skipNow = (int) skipBytes; + } else { + skipNow = readableBytes; + } + content.readerIndex(content.readerIndex() + skipNow); + skipBytes -= skipNow; + readableBytes = readableBytes - skipNow; + } content.readBytes(out, readableBytes); bytesReceived += readableBytes; if (msg instanceof LastHttpContent) { @@ -137,6 +152,7 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) DownloadCommand cmd = (DownloadCommand) msg; out = cmd.out(); path = constructPath(cmd.uri(), cmd.digest().getHash(), cmd.casDownload()); + skipBytes = cmd.offset(); HttpRequest request = buildRequest(path, constructHost(cmd.uri())); addCredentialHeaders(request, cmd.uri()); addExtraRemoteHeaders(request); diff --git a/src/main/java/com/google/devtools/build/lib/remote/http/HttpException.java b/src/main/java/com/google/devtools/build/lib/remote/http/HttpException.java index 89fde56046a8d6..6a2bfd5a50b246 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/http/HttpException.java +++ b/src/main/java/com/google/devtools/build/lib/remote/http/HttpException.java @@ -18,7 +18,7 @@ import java.io.IOException; /** An exception that propagates the http status. */ -final class HttpException extends IOException { +public final class HttpException extends IOException { private final HttpResponse response; HttpException(HttpResponse response, String message, Throwable cause) { diff --git a/src/test/java/com/google/devtools/build/lib/remote/RemoteCacheClientFactoryTest.java b/src/test/java/com/google/devtools/build/lib/remote/RemoteCacheClientFactoryTest.java index 4a44a5acc7e688..1d762664b5c6e5 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/RemoteCacheClientFactoryTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/RemoteCacheClientFactoryTest.java @@ -17,6 +17,8 @@ import static com.google.common.truth.Truth.assertThat; import static org.junit.Assert.assertThrows; +import com.google.common.util.concurrent.ListeningScheduledExecutorService; +import com.google.common.util.concurrent.MoreExecutors; import com.google.devtools.build.lib.authandtls.AuthAndTLSOptions; import com.google.devtools.build.lib.clock.JavaClock; import com.google.devtools.build.lib.remote.common.RemoteCacheClient; @@ -32,6 +34,7 @@ import com.google.devtools.build.lib.vfs.inmemoryfs.InMemoryFileSystem; import com.google.devtools.common.options.Options; import java.io.IOException; +import java.util.concurrent.Executors; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -47,6 +50,14 @@ public class RemoteCacheClientFactoryTest { private final AuthAndTLSOptions authAndTlsOptions = Options.getDefaults(AuthAndTLSOptions.class); private Path workingDirectory; private InMemoryFileSystem fs; + private ListeningScheduledExecutorService retryScheduler = + MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(1)); + private RemoteRetrier retrier = + new RemoteRetrier( + () -> RemoteRetrier.RETRIES_DISABLED, + (e) -> false, + retryScheduler, + Retrier.ALLOW_ALL_CALLS); @Before public final void setUp() { @@ -63,7 +74,12 @@ public void createCombinedCacheWithExistingWorkingDirectory() throws IOException RemoteCacheClient blobStore = RemoteCacheClientFactory.create( - remoteOptions, /* creds= */ null, authAndTlsOptions, workingDirectory, digestUtil); + remoteOptions, + /* creds= */ null, + authAndTlsOptions, + workingDirectory, + digestUtil, + retrier); assertThat(blobStore).isInstanceOf(DiskAndRemoteCacheClient.class); } @@ -76,7 +92,12 @@ public void createCombinedCacheWithNotExistingWorkingDirectory() throws IOExcept RemoteCacheClient blobStore = RemoteCacheClientFactory.create( - remoteOptions, /* creds= */ null, authAndTlsOptions, workingDirectory, digestUtil); + remoteOptions, + /* creds= */ null, + authAndTlsOptions, + workingDirectory, + digestUtil, + retrier); assertThat(blobStore).isInstanceOf(DiskAndRemoteCacheClient.class); assertThat(workingDirectory.exists()).isTrue(); @@ -96,7 +117,8 @@ public void createCombinedCacheWithMissingWorkingDirectoryShouldThrowException() /* creds= */ null, authAndTlsOptions, /* workingDirectory= */ null, - digestUtil)); + digestUtil, + retrier)); } @Test @@ -106,7 +128,12 @@ public void createHttpCacheWithProxy() throws IOException { RemoteCacheClient blobStore = RemoteCacheClientFactory.create( - remoteOptions, /* creds= */ null, authAndTlsOptions, workingDirectory, digestUtil); + remoteOptions, + /* creds= */ null, + authAndTlsOptions, + workingDirectory, + digestUtil, + retrier); assertThat(blobStore).isInstanceOf(HttpCacheClient.class); } @@ -125,7 +152,8 @@ public void createHttpCacheFailsWithUnsupportedProxyProtocol() { /* creds= */ null, authAndTlsOptions, workingDirectory, - digestUtil))) + digestUtil, + retrier))) .hasMessageThat() .contains("Remote cache proxy unsupported: bad-proxy"); } @@ -136,7 +164,12 @@ public void createHttpCacheWithoutProxy() throws IOException { RemoteCacheClient blobStore = RemoteCacheClientFactory.create( - remoteOptions, /* creds= */ null, authAndTlsOptions, workingDirectory, digestUtil); + remoteOptions, + /* creds= */ null, + authAndTlsOptions, + workingDirectory, + digestUtil, + retrier); assertThat(blobStore).isInstanceOf(HttpCacheClient.class); } @@ -147,7 +180,12 @@ public void createDiskCache() throws IOException { RemoteCacheClient blobStore = RemoteCacheClientFactory.create( - remoteOptions, /* creds= */ null, authAndTlsOptions, workingDirectory, digestUtil); + remoteOptions, + /* creds= */ null, + authAndTlsOptions, + workingDirectory, + digestUtil, + retrier); assertThat(blobStore).isInstanceOf(DiskCacheClient.class); } diff --git a/src/test/java/com/google/devtools/build/lib/remote/http/BUILD b/src/test/java/com/google/devtools/build/lib/remote/http/BUILD index 1486dda102a960..d0274727b8fca9 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/http/BUILD +++ b/src/test/java/com/google/devtools/build/lib/remote/http/BUILD @@ -21,6 +21,7 @@ java_test( test_class = "com.google.devtools.build.lib.AllTests", deps = [ "//src/main/java/com/google/devtools/build/lib/authandtls", + "//src/main/java/com/google/devtools/build/lib/remote:Retrier", "//src/main/java/com/google/devtools/build/lib/remote/common", "//src/main/java/com/google/devtools/build/lib/remote/http", "//src/main/java/com/google/devtools/build/lib/remote/util", diff --git a/src/test/java/com/google/devtools/build/lib/remote/http/HttpCacheClientTest.java b/src/test/java/com/google/devtools/build/lib/remote/http/HttpCacheClientTest.java index 41cddce1827849..cfac498e07b677 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/http/HttpCacheClientTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/http/HttpCacheClientTest.java @@ -27,13 +27,18 @@ import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; +import build.bazel.remote.execution.v2.ActionResult; import build.bazel.remote.execution.v2.Digest; import com.google.auth.Credentials; -import com.google.common.base.Charsets; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; +import com.google.common.util.concurrent.ListeningScheduledExecutorService; +import com.google.common.util.concurrent.MoreExecutors; import com.google.devtools.build.lib.authandtls.AuthAndTLSOptions; +import com.google.devtools.build.lib.remote.RemoteRetrier; +import com.google.devtools.build.lib.remote.Retrier; import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext; +import com.google.devtools.build.lib.remote.common.RemoteCacheClient; import com.google.devtools.build.lib.remote.util.DigestUtil; import com.google.devtools.build.lib.remote.util.TracingMetadataUtils; import com.google.devtools.build.lib.vfs.DigestHashFunction; @@ -44,6 +49,7 @@ import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufUtil; +import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandler; @@ -66,9 +72,13 @@ import io.netty.channel.unix.DomainSocketAddress; import io.netty.handler.codec.TooLongFrameException; import io.netty.handler.codec.http.DefaultFullHttpResponse; +import io.netty.handler.codec.http.DefaultHttpContent; +import io.netty.handler.codec.http.DefaultHttpResponse; +import io.netty.handler.codec.http.DefaultLastHttpContent; import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.FullHttpResponse; import io.netty.handler.codec.http.HttpHeaderNames; +import io.netty.handler.codec.http.HttpHeaderValues; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.codec.http.HttpServerCodec; @@ -82,14 +92,16 @@ import java.net.InetSocketAddress; import java.net.SocketAddress; import java.net.URI; +import java.nio.channels.ClosedChannelException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; import java.util.function.IntFunction; import javax.annotation.Nullable; import org.junit.Before; @@ -155,6 +167,7 @@ interface TestServer { private static final class InetTestServer implements TestServer { + @Override public ServerChannel start(ChannelInboundHandler handler) { return createServer( NioServerSocketChannel.class, @@ -163,6 +176,7 @@ public ServerChannel start(ChannelInboundHandler handler) { handler); } + @Override public void stop(ServerChannel serverChannel) { try { serverChannel.close(); @@ -207,12 +221,14 @@ protected void initChannel(Channel ch) { } } + @Override public ServerChannel start(ChannelInboundHandler handler) { reset(this.serverChannel); this.handler = handler; return this.serverChannel; } + @Override public void stop(ServerChannel serverChannel) { // Note: In the tests, we expect that connecting to a closed server channel results // in a channel connection error. Netty doesn't seem to handle closing domain socket @@ -230,7 +246,7 @@ public void stop(ServerChannel serverChannel) { } @Parameters - public static Collection createInputValues() { + public static List createInputValues() { ArrayList parameters = new ArrayList(Arrays.asList(new Object[][] {{new InetTestServer()}})); @@ -262,9 +278,21 @@ private HttpCacheClient createHttpBlobStore( int timeoutSeconds, boolean remoteVerifyDownloads, @Nullable final Credentials creds, - AuthAndTLSOptions authAndTlsOptions) + AuthAndTLSOptions authAndTlsOptions, + Optional optRetrier) throws Exception { SocketAddress socketAddress = serverChannel.localAddress(); + RemoteRetrier retrier = + optRetrier.orElseGet( + () -> { + ListeningScheduledExecutorService retryScheduler = + MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(1)); + return new RemoteRetrier( + () -> RemoteRetrier.RETRIES_DISABLED, + (e) -> false, + retryScheduler, + Retrier.ALLOW_ALL_CALLS); + }); if (socketAddress instanceof DomainSocketAddress) { DomainSocketAddress domainSocketAddress = (DomainSocketAddress) socketAddress; URI uri = new URI("http://localhost"); @@ -276,6 +304,7 @@ private HttpCacheClient createHttpBlobStore( remoteVerifyDownloads, ImmutableList.of(), DIGEST_UTIL, + retrier, creds, authAndTlsOptions); } else if (socketAddress instanceof InetSocketAddress) { @@ -288,6 +317,7 @@ private HttpCacheClient createHttpBlobStore( remoteVerifyDownloads, ImmutableList.of(), DIGEST_UTIL, + retrier, creds, authAndTlsOptions); } else { @@ -303,7 +333,12 @@ private HttpCacheClient createHttpBlobStore( AuthAndTLSOptions authAndTlsOptions) throws Exception { return createHttpBlobStore( - serverChannel, timeoutSeconds, /* remoteVerifyDownloads= */ true, creds, authAndTlsOptions); + serverChannel, + timeoutSeconds, + /* remoteVerifyDownloads= */ true, + creds, + authAndTlsOptions, + Optional.empty()); } @Before @@ -373,7 +408,7 @@ protected void channelRead0( AuthAndTLSOptions authAndTlsOptions = Options.getDefaults(AuthAndTLSOptions.class); HttpCacheClient blobStore = createHttpBlobStore(server, /* timeoutSeconds= */ 1, credentials, authAndTlsOptions); - byte[] data = "File Contents".getBytes(Charsets.US_ASCII); + byte[] data = "File Contents".getBytes(StandardCharsets.US_ASCII); assertThrows( UploadTimeoutException.class, () -> @@ -443,7 +478,7 @@ protected void channelRead0( AuthAndTLSOptions authAndTlsOptions = Options.getDefaults(AuthAndTLSOptions.class); HttpCacheClient blobStore = createHttpBlobStore(server, /* timeoutSeconds= */ 1, credentials, authAndTlsOptions); - ByteString data = ByteString.copyFrom("File Contents", Charsets.US_ASCII); + ByteString data = ByteString.copyFrom("File Contents", StandardCharsets.US_ASCII); IOException e = assertThrows( IOException.class, @@ -489,8 +524,9 @@ protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) /* timeoutSeconds= */ 1, /* remoteVerifyDownloads= */ true, credentials, - authAndTlsOptions); - Digest fooDigest = DIGEST_UTIL.compute("foo".getBytes(Charsets.UTF_8)); + authAndTlsOptions, + Optional.empty()); + Digest fooDigest = DIGEST_UTIL.compute("foo".getBytes(StandardCharsets.UTF_8)); try (OutputStream out = new ByteArrayOutputStream()) { IOException e = assertThrows( @@ -536,17 +572,144 @@ protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) /* timeoutSeconds= */ 1, /* remoteVerifyDownloads= */ false, credentials, - authAndTlsOptions); - Digest fooDigest = DIGEST_UTIL.compute("foo".getBytes(Charsets.UTF_8)); + authAndTlsOptions, + Optional.empty()); + Digest fooDigest = DIGEST_UTIL.compute("foo".getBytes(StandardCharsets.UTF_8)); try (ByteArrayOutputStream out = new ByteArrayOutputStream()) { getFromFuture(blobStore.downloadBlob(remoteActionExecutionContext, fooDigest, out)); - assertThat(out.toByteArray()).isEqualTo("bar".getBytes(Charsets.UTF_8)); + assertThat(out.toByteArray()).isEqualTo("bar".getBytes(StandardCharsets.UTF_8)); } } finally { testServer.stop(server); } } + @Test + public void partialDownloadFailsWithoutRetry() throws Exception { + ServerChannel server = null; + try { + ByteBuf chunk1 = Unpooled.wrappedBuffer("File ".getBytes(StandardCharsets.US_ASCII)); + ByteBuf chunk2 = Unpooled.wrappedBuffer("Contents".getBytes(StandardCharsets.US_ASCII)); + server = testServer.start(new IntermittentFailureHandler(chunk1, chunk2)); + Credentials credentials = newCredentials(); + AuthAndTLSOptions authAndTlsOptions = Options.getDefaults(AuthAndTLSOptions.class); + + HttpCacheClient blobStore = + createHttpBlobStore(server, /* timeoutSeconds= */ 1, credentials, authAndTlsOptions); + assertThrows( + ClosedChannelException.class, + () -> + getFromFuture( + blobStore.downloadBlob( + remoteActionExecutionContext, DIGEST, new ByteArrayOutputStream()))); + } finally { + testServer.stop(server); + } + } + + @Test + public void partialDownloadSucceedsWithRetry() throws Exception { + ServerChannel server = null; + try { + ByteBuf chunk1 = Unpooled.wrappedBuffer("File ".getBytes(StandardCharsets.US_ASCII)); + // Replace first chunk to test that the client skips the redundant prefix on retry. + ByteBuf chunk1Attempt2 = Unpooled.wrappedBuffer("abcde".getBytes(StandardCharsets.US_ASCII)); + ByteBuf chunk2 = Unpooled.wrappedBuffer("Contents".getBytes(StandardCharsets.US_ASCII)); + server = testServer.start(new IntermittentFailureHandler(chunk1, chunk1Attempt2, chunk2)); + Credentials credentials = newCredentials(); + AuthAndTLSOptions authAndTlsOptions = Options.getDefaults(AuthAndTLSOptions.class); + + ListeningScheduledExecutorService retryScheduler = + MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(1)); + RemoteRetrier retrier = + new RemoteRetrier( + () -> new Retrier.ZeroBackoff(1), + (e) -> { + return e instanceof ClosedChannelException; + }, + retryScheduler, + Retrier.ALLOW_ALL_CALLS); + HttpCacheClient blobStore = + createHttpBlobStore( + server, + /* timeoutSeconds= */ 1, + /* remoteVerifyDownloads= */ false, + credentials, + authAndTlsOptions, + Optional.of(retrier)); + + ByteArrayOutputStream download = new ByteArrayOutputStream(); + getFromFuture(blobStore.downloadBlob(remoteActionExecutionContext, DIGEST, download)); + assertThat(download.toByteArray()) + .isEqualTo("File Contents".getBytes(StandardCharsets.US_ASCII)); + } finally { + testServer.stop(server); + } + } + + @Test + public void actionResultRetryReadsFromStart() throws Exception { + ServerChannel server = null; + try { + ActionResult.Builder builder1 = ActionResult.newBuilder(); + builder1 + .addOutputFilesBuilder() + .setPath("attempt1/filename") + .setDigest(DIGEST_UTIL.computeAsUtf8("digest1")) + .setIsExecutable(true); + ActionResult action1 = builder1.build(); + ByteArrayOutputStream buffer1 = new ByteArrayOutputStream(); + action1.writeTo(buffer1); + int splitAt = buffer1.size() / 2; + ByteBuf chunk1 = Unpooled.copiedBuffer(buffer1.toByteArray(), 0, splitAt); + + // Replace first chunk to test that the client starts a fresh ActionResult download on retry. + ActionResult.Builder builder2 = ActionResult.newBuilder(); + builder2 + .addOutputFilesBuilder() + .setPath("attempt2/filename") + .setDigest(DIGEST_UTIL.computeAsUtf8("digest2")) + .setIsExecutable(false); + ActionResult action2 = builder2.build(); + ByteArrayOutputStream buffer2 = new ByteArrayOutputStream(); + action2.writeTo(buffer2); + ByteBuf chunk1Attempt2 = Unpooled.copiedBuffer(buffer2.toByteArray(), 0, splitAt); + ByteBuf chunk2 = + Unpooled.copiedBuffer(buffer2.toByteArray(), splitAt, buffer2.size() - splitAt); + + server = testServer.start(new IntermittentFailureHandler(chunk1, chunk1Attempt2, chunk2)); + Credentials credentials = newCredentials(); + AuthAndTLSOptions authAndTlsOptions = Options.getDefaults(AuthAndTLSOptions.class); + + ListeningScheduledExecutorService retryScheduler = + MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(1)); + RemoteRetrier retrier = + new RemoteRetrier( + () -> new Retrier.ZeroBackoff(1), + (e) -> { + return e instanceof ClosedChannelException; + }, + retryScheduler, + Retrier.ALLOW_ALL_CALLS); + HttpCacheClient blobStore = + createHttpBlobStore( + server, + /* timeoutSeconds= */ 1, + /* remoteVerifyDownloads= */ false, + credentials, + authAndTlsOptions, + Optional.of(retrier)); + + RemoteCacheClient.CachedActionResult download = + getFromFuture( + blobStore.downloadActionResult( + remoteActionExecutionContext, new RemoteCacheClient.ActionKey(DIGEST), false)); + assertThat(download.actionResult()).isEqualTo(action2); + } finally { + testServer.stop(server); + } + } + @Test public void expiredAuthTokensShouldBeRetried_get() throws Exception { expiredAuthTokensShouldBeRetried_get( @@ -567,7 +730,7 @@ private void expiredAuthTokensShouldBeRetried_get( createHttpBlobStore(server, /* timeoutSeconds= */ 1, credentials, authAndTlsOptions); ByteArrayOutputStream out = Mockito.spy(new ByteArrayOutputStream()); getFromFuture(blobStore.downloadBlob(remoteActionExecutionContext, DIGEST, out)); - assertThat(out.toString(Charsets.US_ASCII.name())).isEqualTo("File Contents"); + assertThat(out.toString(StandardCharsets.US_ASCII.name())).isEqualTo("File Contents"); verify(credentials, times(1)).refresh(); verify(credentials, times(2)).getRequestMetadata(any(URI.class)); verify(credentials, times(2)).hasRequestMetadata(); @@ -597,7 +760,7 @@ private void expiredAuthTokensShouldBeRetried_put( AuthAndTLSOptions authAndTlsOptions = Options.getDefaults(AuthAndTLSOptions.class); HttpCacheClient blobStore = createHttpBlobStore(server, /* timeoutSeconds= */ 1, credentials, authAndTlsOptions); - byte[] data = "File Contents".getBytes(Charsets.US_ASCII); + byte[] data = "File Contents".getBytes(StandardCharsets.US_ASCII); blobStore .uploadBlob( remoteActionExecutionContext, DIGEST_UTIL.compute(data), ByteString.copyFrom(data)) @@ -753,7 +916,7 @@ protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) return; } ByteBuf content = ctx.alloc().buffer(); - content.writeCharSequence("File Contents", Charsets.US_ASCII); + content.writeCharSequence("File Contents", StandardCharsets.US_ASCII); FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, content); HttpUtil.setKeepAlive(response, true); @@ -769,4 +932,44 @@ protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) } } } + + /** + * {@link ChannelHandler} that on the first request returns a partial response and then closes the + * stream, and on any further requests returns a full response. + */ + @Sharable + static class IntermittentFailureHandler extends SimpleChannelInboundHandler { + private final ByteBuf attempt1Chunk1; + private final ByteBuf attempt2Chunk1; + private final ByteBuf attempt2Chunk2; + private int messageCount; + + public IntermittentFailureHandler( + ByteBuf attempt1Chunk1, ByteBuf attempt2Chunk1, ByteBuf attempt2Chunk2) { + this.attempt1Chunk1 = attempt1Chunk1; + this.attempt2Chunk1 = attempt2Chunk1; + this.attempt2Chunk2 = attempt2Chunk2; + } + + public IntermittentFailureHandler(ByteBuf chunk1, ByteBuf chunk2) { + this(chunk1.copy(), chunk1, chunk2); + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) { + DefaultHttpResponse response = + new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); + response.headers().set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED); + ctx.write(response); + if (messageCount == 0) { + ctx.writeAndFlush(new DefaultHttpContent(attempt1Chunk1)) + .addListener(ChannelFutureListener.CLOSE); + } else { + ctx.writeAndFlush(new DefaultHttpContent(attempt2Chunk1)); + ctx.writeAndFlush(new DefaultLastHttpContent(attempt2Chunk2)) + .addListener(ChannelFutureListener.CLOSE); + } + ++messageCount; + } + } } diff --git a/src/test/java/com/google/devtools/build/lib/remote/http/HttpDownloadHandlerTest.java b/src/test/java/com/google/devtools/build/lib/remote/http/HttpDownloadHandlerTest.java index 35eecfbd90bcd6..22bb088cd32682 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/http/HttpDownloadHandlerTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/http/HttpDownloadHandlerTest.java @@ -162,4 +162,56 @@ public void httpErrorsWithContentAreSupported() throws IOException { verify(out, never()).close(); assertThat(ch.isOpen()).isFalse(); } + + /** Test that the handler correctly supports downloads at an offset, e.g. on retry. */ + @Test + public void downloadAtOffsetShouldWork() throws IOException { + EmbeddedChannel ch = new EmbeddedChannel(new HttpDownloadHandler(null, ImmutableList.of())); + ByteArrayOutputStream out = Mockito.spy(new ByteArrayOutputStream()); + DownloadCommand cmd = new DownloadCommand(CACHE_URI, true, DIGEST, out, 2); + ChannelPromise writePromise = ch.newPromise(); + ch.writeOneOutbound(cmd, writePromise); + + HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); + response.headers().set(HttpHeaders.CONTENT_LENGTH, 5); + response.headers().set(HttpHeaders.CONNECTION, HttpHeaderValues.KEEP_ALIVE); + ch.writeInbound(response); + ByteBuf content = Unpooled.buffer(); + content.writeBytes(new byte[] {1, 2, 3, 4, 5}); + ch.writeInbound(new DefaultLastHttpContent(content)); + + assertThat(writePromise.isDone()).isTrue(); + assertThat(out.toByteArray()).isEqualTo(new byte[] {3, 4, 5}); + verify(out, never()).close(); + assertThat(ch.isActive()).isTrue(); + } + + /** Test that the handler correctly supports chunked downloads at an offset, e.g. on retry. */ + @Test + public void chunkedDownloadAtOffsetShouldWork() throws IOException { + EmbeddedChannel ch = new EmbeddedChannel(new HttpDownloadHandler(null, ImmutableList.of())); + ByteArrayOutputStream out = Mockito.spy(new ByteArrayOutputStream()); + DownloadCommand cmd = new DownloadCommand(CACHE_URI, true, DIGEST, out, 3); + ChannelPromise writePromise = ch.newPromise(); + ch.writeOneOutbound(cmd, writePromise); + + HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); + response.headers().set(HttpHeaders.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED); + response.headers().set(HttpHeaders.CONNECTION, HttpHeaderValues.KEEP_ALIVE); + ch.writeInbound(response); + ByteBuf content1 = Unpooled.buffer(); + content1.writeBytes(new byte[] {1, 2}); + ch.writeInbound(new DefaultHttpContent(content1)); + ByteBuf content2 = Unpooled.buffer(); + content2.writeBytes(new byte[] {3, 4}); + ch.writeInbound(new DefaultHttpContent(content2)); + ByteBuf content3 = Unpooled.buffer(); + content3.writeBytes(new byte[] {5}); + ch.writeInbound(new DefaultLastHttpContent(content3)); + + assertThat(writePromise.isDone()).isTrue(); + assertThat(out.toByteArray()).isEqualTo(new byte[] {4, 5}); + verify(out, never()).close(); + assertThat(ch.isActive()).isTrue(); + } }