Skip to content

Commit

Permalink
[7.1.0] Move the disk cache reads and writes into a thread pool. (#21551
Browse files Browse the repository at this point in the history
)

So that disk cache uploads or downloads that are collectively waited
upon can make progress concurrently instead of sequentially. This
provides a significant performance boost for actions that download or
upload a large number of files.

PiperOrigin-RevId: 612432619
Change-Id: Id4e88b239a5475c49aecb1219045b047c9f5c319
  • Loading branch information
tjgq authored Mar 4, 2024
1 parent f50ac05 commit 1de8f29
Show file tree
Hide file tree
Showing 7 changed files with 98 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.netty.channel.unix.DomainSocketAddress;
import java.io.IOException;
import java.net.URI;
import java.util.concurrent.ExecutorService;
import javax.annotation.Nullable;

/**
Expand All @@ -43,12 +44,14 @@ private RemoteCacheClientFactory() {}
public static RemoteCacheClient createDiskAndRemoteClient(
Path workingDirectory,
PathFragment diskCachePath,
boolean remoteVerifyDownloads,
DigestUtil digestUtil,
ExecutorService executorService,
boolean remoteVerifyDownloads,
RemoteCacheClient remoteCacheClient)
throws IOException {
DiskCacheClient diskCacheClient =
createDiskCache(workingDirectory, diskCachePath, remoteVerifyDownloads, digestUtil);
createDiskCache(
workingDirectory, diskCachePath, digestUtil, executorService, remoteVerifyDownloads);
return new DiskAndRemoteCacheClient(diskCacheClient, remoteCacheClient);
}

Expand All @@ -58,6 +61,7 @@ public static RemoteCacheClient create(
AuthAndTLSOptions authAndTlsOptions,
Path workingDirectory,
DigestUtil digestUtil,
ExecutorService executorService,
RemoteRetrier retrier)
throws IOException {
Preconditions.checkNotNull(workingDirectory, "workingDirectory");
Expand All @@ -69,14 +73,19 @@ public static RemoteCacheClient create(
creds,
authAndTlsOptions,
digestUtil,
executorService,
retrier);
}
if (isHttpCache(options)) {
return createHttp(options, creds, authAndTlsOptions, digestUtil, retrier);
}
if (isDiskCache(options)) {
return createDiskCache(
workingDirectory, options.diskCache, options.remoteVerifyDownloads, digestUtil);
workingDirectory,
options.diskCache,
digestUtil,
executorService,
options.remoteVerifyDownloads);
}
throw new IllegalArgumentException(
"Unrecognized RemoteOptions configuration: remote Http cache URL and/or local disk cache"
Expand Down Expand Up @@ -137,12 +146,13 @@ private static RemoteCacheClient createHttp(
private static DiskCacheClient createDiskCache(
Path workingDirectory,
PathFragment diskCachePath,
boolean verifyDownloads,
DigestUtil digestUtil)
DigestUtil digestUtil,
ExecutorService executorService,
boolean verifyDownloads)
throws IOException {
Path cacheDir =
workingDirectory.getRelative(Preconditions.checkNotNull(diskCachePath, "diskCachePath"));
return new DiskCacheClient(cacheDir, verifyDownloads, digestUtil);
return new DiskCacheClient(cacheDir, digestUtil, executorService, verifyDownloads);
}

private static RemoteCacheClient createDiskAndHttpCache(
Expand All @@ -152,11 +162,17 @@ private static RemoteCacheClient createDiskAndHttpCache(
Credentials cred,
AuthAndTLSOptions authAndTlsOptions,
DigestUtil digestUtil,
ExecutorService executorService,
RemoteRetrier retrier)
throws IOException {
RemoteCacheClient httpCache = createHttp(options, cred, authAndTlsOptions, digestUtil, retrier);
return createDiskAndRemoteClient(
workingDirectory, diskCachePath, options.remoteVerifyDownloads, digestUtil, httpCache);
workingDirectory,
diskCachePath,
digestUtil,
executorService,
options.remoteVerifyDownloads,
httpCache);
}

public static boolean isDiskCache(RemoteOptions options) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,8 @@ private void initHttpAndDiskCache(
Credentials credentials,
AuthAndTLSOptions authAndTlsOptions,
RemoteOptions remoteOptions,
DigestUtil digestUtil) {
DigestUtil digestUtil,
ExecutorService executorService) {
RemoteCacheClient cacheClient;
try {
cacheClient =
Expand All @@ -227,6 +228,7 @@ private void initHttpAndDiskCache(
authAndTlsOptions,
Preconditions.checkNotNull(env.getWorkingDirectory(), "workingDirectory"),
digestUtil,
executorService,
new RemoteRetrier(
remoteOptions, RETRIABLE_HTTP_ERRORS, retryScheduler, Retrier.ALLOW_ALL_CALLS));
} catch (IOException e) {
Expand Down Expand Up @@ -391,7 +393,8 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException {
}

if ((enableHttpCache || enableDiskCache) && !enableGrpcCache) {
initHttpAndDiskCache(env, credentials, authAndTlsOptions, remoteOptions, digestUtil);
initHttpAndDiskCache(
env, credentials, authAndTlsOptions, remoteOptions, digestUtil, executorService);
return;
}

Expand Down Expand Up @@ -524,8 +527,9 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException {
RemoteCacheClientFactory.createDiskAndRemoteClient(
env.getWorkingDirectory(),
remoteOptions.diskCache,
remoteOptions.remoteVerifyDownloads,
digestUtil,
executorService,
remoteOptions.remoteVerifyDownloads,
cacheClient);
} catch (Exception e) {
handleInitFailure(env, e, Code.CACHE_INIT_FAILURE);
Expand Down Expand Up @@ -583,8 +587,9 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException {
RemoteCacheClientFactory.createDiskAndRemoteClient(
env.getWorkingDirectory(),
remoteOptions.diskCache,
remoteOptions.remoteVerifyDownloads,
digestUtil,
executorService,
remoteOptions.remoteVerifyDownloads,
cacheClient);
} catch (Exception e) {
handleInitFailure(env, e, Code.CACHE_INIT_FAILURE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
// limitations under the License.
package com.google.devtools.build.lib.remote.disk;

import static com.google.common.util.concurrent.Futures.immediateFailedFuture;
import static com.google.common.util.concurrent.Futures.immediateFuture;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static com.google.devtools.build.lib.remote.util.DigestUtil.isOldStyleDigestFunction;

import build.bazel.remote.execution.v2.ActionCacheUpdateCapabilities;
Expand All @@ -29,6 +29,7 @@
import com.google.common.io.ByteStreams;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.devtools.build.lib.exec.SpawnCheckingCacheEvent;
import com.google.devtools.build.lib.remote.Store;
Expand All @@ -48,6 +49,7 @@
import java.io.OutputStream;
import java.time.Instant;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import javax.annotation.Nullable;

/**
Expand All @@ -66,17 +68,20 @@ public class DiskCacheClient implements RemoteCacheClient {
SpawnCheckingCacheEvent.create("disk-cache");

private final Path root;
private final ListeningExecutorService executorService;
private final boolean verifyDownloads;
private final DigestUtil digestUtil;

/**
* @param verifyDownloads whether verify the digest of downloaded content are the same as the
* digest used to index that file.
*/
public DiskCacheClient(Path root, boolean verifyDownloads, DigestUtil digestUtil)
public DiskCacheClient(
Path root, DigestUtil digestUtil, ExecutorService executorService, boolean verifyDownloads)
throws IOException {
this.verifyDownloads = verifyDownloads;
this.digestUtil = digestUtil;
this.executorService = MoreExecutors.listeningDecorator(executorService);
this.verifyDownloads = verifyDownloads;

if (isOldStyleDigestFunction(digestUtil.getDigestFunction())) {
this.root = root;
Expand Down Expand Up @@ -124,18 +129,17 @@ void captureFile(Path src, Digest digest, Store store) throws IOException {
}

private ListenableFuture<Void> download(Digest digest, OutputStream out, Store store) {
Path path = toPath(digest, store);
try {
if (!refresh(path)) {
return immediateFailedFuture(new CacheNotFoundException(digest));
}
try (InputStream in = path.getInputStream()) {
ByteStreams.copy(in, out);
return immediateFuture(null);
}
} catch (IOException e) {
return immediateFailedFuture(e);
}
return executorService.submit(
() -> {
Path path = toPath(digest, store);
if (!refresh(path)) {
throw new CacheNotFoundException(digest);
}
try (InputStream in = path.getInputStream()) {
ByteStreams.copy(in, out);
}
return null;
});
}

@Override
Expand All @@ -156,7 +160,7 @@ public ListenableFuture<Void> downloadBlob(
return Futures.immediateFailedFuture(e);
}
},
MoreExecutors.directExecutor());
directExecutor());
}

private void checkDigestExists(Digest digest) throws IOException {
Expand Down Expand Up @@ -254,18 +258,19 @@ public ListenableFuture<CachedActionResult> downloadActionResult(

return immediateFuture(CachedActionResult.disk(actionResult));
},
MoreExecutors.directExecutor());
directExecutor());
}

@Override
public ListenableFuture<Void> uploadActionResult(
RemoteActionExecutionContext context, ActionKey actionKey, ActionResult actionResult) {
try (InputStream data = actionResult.toByteString().newInput()) {
saveFile(actionKey.getDigest(), Store.AC, data);
return immediateFuture(null);
} catch (IOException e) {
return Futures.immediateFailedFuture(e);
}
return executorService.submit(
() -> {
try (InputStream data = actionResult.toByteString().newInput()) {
saveFile(actionKey.getDigest(), Store.AC, data);
}
return null;
});
}

@Override
Expand All @@ -274,23 +279,25 @@ public void close() {}
@Override
public ListenableFuture<Void> uploadFile(
RemoteActionExecutionContext context, Digest digest, Path file) {
try (InputStream in = file.getInputStream()) {
saveFile(digest, Store.CAS, in);
} catch (IOException e) {
return Futures.immediateFailedFuture(e);
}
return immediateFuture(null);
return executorService.submit(
() -> {
try (InputStream in = file.getInputStream()) {
saveFile(digest, Store.CAS, in);
}
return null;
});
}

@Override
public ListenableFuture<Void> uploadBlob(
RemoteActionExecutionContext context, Digest digest, ByteString data) {
try (InputStream in = data.newInput()) {
saveFile(digest, Store.CAS, in);
} catch (IOException e) {
return Futures.immediateFailedFuture(e);
}
return immediateFuture(null);
return executorService.submit(
() -> {
try (InputStream in = data.newInput()) {
saveFile(digest, Store.CAS, in);
}
return null;
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import com.google.devtools.build.lib.vfs.inmemoryfs.InMemoryFileSystem;
import com.google.devtools.common.options.Options;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.junit.Before;
import org.junit.Test;
Expand All @@ -45,6 +46,8 @@
public class RemoteCacheClientFactoryTest {
private final DigestUtil digestUtil =
new DigestUtil(SyscallCache.NO_CACHE, DigestHashFunction.SHA256);
private static final ExecutorService executorService =
MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1));

private RemoteOptions remoteOptions;
private final AuthAndTLSOptions authAndTlsOptions = Options.getDefaults(AuthAndTLSOptions.class);
Expand Down Expand Up @@ -79,6 +82,7 @@ public void createCombinedCacheWithExistingWorkingDirectory() throws IOException
authAndTlsOptions,
workingDirectory,
digestUtil,
executorService,
retrier);

assertThat(blobStore).isInstanceOf(DiskAndRemoteCacheClient.class);
Expand All @@ -97,6 +101,7 @@ public void createCombinedCacheWithNotExistingWorkingDirectory() throws IOExcept
authAndTlsOptions,
workingDirectory,
digestUtil,
executorService,
retrier);

assertThat(blobStore).isInstanceOf(DiskAndRemoteCacheClient.class);
Expand All @@ -118,6 +123,7 @@ public void createCombinedCacheWithMissingWorkingDirectoryShouldThrowException()
authAndTlsOptions,
/* workingDirectory= */ null,
digestUtil,
executorService,
retrier));
}

Expand All @@ -133,6 +139,7 @@ public void createHttpCacheWithProxy() throws IOException {
authAndTlsOptions,
workingDirectory,
digestUtil,
executorService,
retrier);

assertThat(blobStore).isInstanceOf(HttpCacheClient.class);
Expand All @@ -153,6 +160,7 @@ public void createHttpCacheFailsWithUnsupportedProxyProtocol() {
authAndTlsOptions,
workingDirectory,
digestUtil,
executorService,
retrier)))
.hasMessageThat()
.contains("Remote cache proxy unsupported: bad-proxy");
Expand All @@ -169,6 +177,7 @@ public void createHttpCacheWithoutProxy() throws IOException {
authAndTlsOptions,
workingDirectory,
digestUtil,
executorService,
retrier);

assertThat(blobStore).isInstanceOf(HttpCacheClient.class);
Expand All @@ -185,6 +194,7 @@ public void createDiskCache() throws IOException {
authAndTlsOptions,
workingDirectory,
digestUtil,
executorService,
retrier);

assertThat(blobStore).isInstanceOf(DiskCacheClient.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ java_test(
"//src/main/java/com/google/devtools/build/lib/vfs",
"//src/main/java/com/google/devtools/build/lib/vfs/inmemoryfs",
"//src/test/java/com/google/devtools/build/lib:test_runner",
"//third_party:guava",
"//third_party:junit4",
"//third_party:mockito",
"@remoteapis//:build_bazel_remote_execution_v2_remote_execution_java_proto",
Expand Down
Loading

0 comments on commit 1de8f29

Please sign in to comment.