[7.1.0] Compute output directories in parallel when building the upload manifest. #21386

Merged
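This change parallelizes the construction of the Tree message for each output directory when building the remote upload manifest. The recursive, single-threaded computeDirectory method is replaced by a DirectoryBuilder based on AbstractQueueVisitor: directory entries are listed, stat'ed, and digested concurrently on a shared ForkJoinPool, and the Directory and Tree messages are assembled afterwards in topological order. This is a major speedup for large tree artifacts with hundreds of thousands of files.

A minimal, Bazel-independent sketch of the same fan-out pattern, using java.nio.file and ForkJoinTask in place of Bazel's Path, Dirent, and AbstractQueueVisitor APIs (ParallelDirVisitor and everything in it are illustrative names, not part of this change):

// Illustrative sketch only: fan a directory walk out onto a ForkJoinPool and
// collect per-directory listings into a concurrent map, analogous to what
// DirectoryBuilder does with AbstractQueueVisitor in the diff below.
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

final class ParallelDirVisitor extends RecursiveAction {
  /** Maps each visited directory to the names of its regular files. */
  static final Map<Path, List<String>> DIR_TO_FILES = new ConcurrentHashMap<>();

  private final Path dir;

  ParallelDirVisitor(Path dir) {
    this.dir = dir;
  }

  @Override
  protected void compute() {
    List<ParallelDirVisitor> subtasks = new ArrayList<>();
    List<String> files = new ArrayList<>();
    try (DirectoryStream<Path> entries = Files.newDirectoryStream(dir)) {
      for (Path entry : entries) {
        if (Files.isDirectory(entry)) {
          subtasks.add(new ParallelDirVisitor(entry)); // walk subtrees in parallel
        } else {
          files.add(entry.getFileName().toString());
        }
      }
    } catch (IOException e) {
      // compute() cannot throw checked exceptions; tunnel the IOException out.
      throw new UncheckedIOException(e);
    }
    DIR_TO_FILES.put(dir, files);
    invokeAll(subtasks); // forks the subtasks and waits for all of them
  }

  public static void main(String[] args) {
    new ForkJoinPool().invoke(new ParallelDirVisitor(Path.of(args[0])));
    DIR_TO_FILES.forEach((d, names) -> System.out.println(d + " -> " + names));
  }
}

The actual change, including the Directory and Tree message assembly, is in the diff below.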
302 changes: 211 additions & 91 deletions src/main/java/com/google/devtools/build/lib/remote/UploadManifest.java
@@ -13,31 +13,45 @@
// limitations under the License.
package com.google.devtools.build.lib.remote;

import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Throwables.throwIfInstanceOf;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static com.google.devtools.build.lib.remote.util.RxFutures.toCompletable;
import static com.google.devtools.build.lib.remote.util.RxFutures.toSingle;
import static com.google.devtools.build.lib.remote.util.RxUtils.mergeBulkTransfer;
import static com.google.devtools.build.lib.remote.util.RxUtils.toTransferResult;
import static java.util.Comparator.comparing;
import static java.util.Comparator.naturalOrder;
import static java.util.Comparator.reverseOrder;

import build.bazel.remote.execution.v2.Action;
import build.bazel.remote.execution.v2.ActionResult;
import build.bazel.remote.execution.v2.CacheCapabilities;
import build.bazel.remote.execution.v2.Command;
import build.bazel.remote.execution.v2.Digest;
import build.bazel.remote.execution.v2.Directory;
import build.bazel.remote.execution.v2.FileNode;
import build.bazel.remote.execution.v2.OutputSymlink;
import build.bazel.remote.execution.v2.SymlinkAbsolutePathStrategy;
import build.bazel.remote.execution.v2.SymlinkNode;
import build.bazel.remote.execution.v2.Tree;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Multimaps;
import com.google.common.collect.SortedSetMultimap;
import com.google.common.collect.TreeMultimap;
import com.google.devtools.build.lib.actions.ActionExecutionMetadata;
import com.google.devtools.build.lib.actions.ActionUploadFinishedEvent;
import com.google.devtools.build.lib.actions.ActionUploadStartedEvent;
import com.google.devtools.build.lib.actions.ExecException;
import com.google.devtools.build.lib.actions.UserExecException;
import com.google.devtools.build.lib.concurrent.AbstractQueueVisitor;
import com.google.devtools.build.lib.concurrent.ErrorClassifier;
import com.google.devtools.build.lib.concurrent.NamedForkJoinPool;
import com.google.devtools.build.lib.events.ExtendedEventHandler;
import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
import com.google.devtools.build.lib.remote.common.RemoteCacheClient;
@@ -66,25 +80,28 @@
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import javax.annotation.Nullable;

/** UploadManifest adds output metadata to a {@link ActionResult}. */
public class UploadManifest {

private final DigestUtil digestUtil;
private final RemotePathResolver remotePathResolver;
private final ActionResult.Builder result;
private final boolean followSymlinks;
private final boolean allowDanglingSymlinks;
private final boolean allowAbsoluteSymlinks;
private final Map<Digest, Path> digestToFile = new HashMap<>();
private final Map<Digest, ByteString> digestToBlobs = new HashMap<>();
private final ConcurrentHashMap<Digest, Path> digestToFile = new ConcurrentHashMap<>();
private final ConcurrentHashMap<Digest, ByteString> digestToBlobs = new ConcurrentHashMap<>();
@Nullable private ActionKey actionKey;
private Digest stderrDigest;
private Digest stdoutDigest;
@@ -102,7 +119,7 @@ public static UploadManifest create(
int exitCode,
Instant startTime,
int wallTimeInMs)
throws ExecException, IOException {
throws ExecException, IOException, InterruptedException {
ActionResult.Builder result = ActionResult.newBuilder();
result.setExitCode(exitCode);

@@ -208,7 +225,7 @@ private void setStdoutStderr(FileOutErr outErr) throws IOException {
* after execution, both for cache hits and misses.
*/
@VisibleForTesting
void addFiles(Collection<Path> files) throws ExecException, IOException {
void addFiles(Collection<Path> files) throws ExecException, IOException, InterruptedException {
// TODO(tjgq): Non-dangling absolute symlinks are uploaded as the file or directory they point
// to even when followSymlinks is false. This is inconsistent with the treatment of relative
// symlinks, but fixing it would require an incompatible change.
@@ -341,107 +358,210 @@ private void addFile(Digest digest, Path file) throws IOException {
digestToFile.put(digest, file);
}

// Field numbers of the 'root' and 'directory' fields in the Tree message.
private static final int TREE_ROOT_FIELD_NUMBER =
Tree.getDescriptor().findFieldByName("root").getNumber();
private static final int TREE_CHILDREN_FIELD_NUMBER =
Tree.getDescriptor().findFieldByName("children").getNumber();
private static final class WrappedException extends RuntimeException {
private final Exception wrapped;

private void addDirectory(Path dir) throws ExecException, IOException {
LinkedHashSet<ByteString> directories = new LinkedHashSet<>();
var ignored = computeDirectory(dir, directories);

// Convert individual Directory messages to a Tree message. As we want the
// records to be topologically sorted (parents before children), we iterate
// over the directories in reverse insertion order.
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
CodedOutputStream codedOutputStream = CodedOutputStream.newInstance(byteArrayOutputStream);
int fieldNumber = TREE_ROOT_FIELD_NUMBER;
for (ByteString directory : Lists.reverse(new ArrayList<ByteString>(directories))) {
codedOutputStream.writeBytes(fieldNumber, directory);
fieldNumber = TREE_CHILDREN_FIELD_NUMBER;
WrappedException(Exception wrapped) {
super(wrapped);
this.wrapped = wrapped;
}
codedOutputStream.flush();

ByteString data = ByteString.copyFrom(byteArrayOutputStream.toByteArray());
Digest digest = digestUtil.compute(data.toByteArray());

if (result != null) {
result
.addOutputDirectoriesBuilder()
.setPath(remotePathResolver.localPathToOutputPath(dir))
.setTreeDigest(digest)
.setIsTopologicallySorted(true);
Exception unwrap() {
return wrapped;
}

digestToBlobs.put(digest, data);
}

/** A thread pool shared by all {@link DirectoryBuilder} instances. */
private static final ForkJoinPool VISITOR_POOL =
NamedForkJoinPool.newNamedPool(
"upload-manifest-directory-visitor", Runtime.getRuntime().availableProcessors());

/**
* Computes the {@link Directory} message describing the transitive contents of a directory.
*
* @param path the directory path.
* @param directories an output parameter accepting the wire-format {@link Directory} messages for
* every visited (sub-)directory of {@code path}, including {@code path} itself, in a
* deterministic topological order (children before parents).
* @return the wire-format {@link Directory} message for {@code path}.
* A {@link DirectoryBuilder} constructs a {@link Tree} message for an output directory, doing as
* much as possible in parallel.
*/
private ByteString computeDirectory(Path path, LinkedHashSet<ByteString> directories)
throws ExecException, IOException {
Directory.Builder b = Directory.newBuilder();

List<Dirent> sortedDirent = new ArrayList<>(path.readdir(Symlinks.NOFOLLOW));
sortedDirent.sort(Comparator.comparing(Dirent::getName));

for (Dirent dirent : sortedDirent) {
String name = dirent.getName();
Path child = path.getRelative(name);
if (dirent.getType() == Dirent.Type.FILE) {
Digest digest = digestUtil.compute(child);
b.addFilesBuilder().setName(name).setDigest(digest).setIsExecutable(true);
digestToFile.put(digest, child);
continue;
private class DirectoryBuilder extends AbstractQueueVisitor {
private final Path rootDir;

// Directories found during the traversal, including the root.
// Sorted in reverse so that children iterate before parents.
private final SortedSet<Path> dirs =
Collections.synchronizedSortedSet(new TreeSet<Path>(reverseOrder()));

// Maps each directory found during the traversal to its subdirectories.
private final SortedSetMultimap<Path, Path> dirToSubdirs =
Multimaps.synchronizedSortedSetMultimap(TreeMultimap.create());

// Maps each directory found during the traversal to its files.
private final SortedSetMultimap<Path, FileNode> dirToFiles =
Multimaps.synchronizedSortedSetMultimap(
TreeMultimap.<Path, FileNode>create(naturalOrder(), comparing(FileNode::getName)));

// Maps each directory found during the traversal to its symlinks.
private final SortedSetMultimap<Path, SymlinkNode> dirToSymlinks =
Multimaps.synchronizedSortedSetMultimap(
TreeMultimap.<Path, SymlinkNode>create(
naturalOrder(), comparing(SymlinkNode::getName)));

DirectoryBuilder(Path rootDir) {
super(
VISITOR_POOL,
ExecutorOwnership.SHARED,
ExceptionHandlingMode.FAIL_FAST,
ErrorClassifier.DEFAULT);
this.rootDir = checkNotNull(rootDir);
}

/**
* Returns a {@link Tree} message in wire format describing the directory contents, obeying the
* requirements of the {@code OutputDirectory.is_topologically_sorted} field.
*/
ByteString build() throws ExecException, IOException, InterruptedException {
// Collect directory entries (subdirectories, files, symlinks) in parallel.
// This is a major speedup for large tree artifacts with hundreds of thousands of files.
execute(() -> visit(rootDir, Dirent.Type.DIRECTORY));
try {
awaitQuiescence(true);
} catch (WrappedException e) {
Throwables.throwIfInstanceOf(e.unwrap(), ExecException.class);
Throwables.throwIfInstanceOf(e.unwrap(), IOException.class);
throw new AssertionError("unexpected exception", e.unwrap());
}
if (dirent.getType() == Dirent.Type.DIRECTORY) {
ByteString dir = computeDirectory(child, directories);
b.addDirectoriesBuilder().setName(name).setDigest(digestUtil.compute(dir.toByteArray()));
continue;

// Compute the Directory message for every node, including the root. Since each directory
// references its subdirectories by their digest, the messages must be computed in topological
// order (children before parents). In addition, the contents of each Directory message must
// be sorted, which is already ensured by the use of sorted maps.

HashMap<Path, Digest> dirToDigest = new HashMap<>();
LinkedHashSet<ByteString> dirBlobs = new LinkedHashSet<>();

for (Path dir : dirs) {
Directory.Builder builder = Directory.newBuilder();
builder.addAllFiles(dirToFiles.get(dir));
builder.addAllSymlinks(dirToSymlinks.get(dir));
for (Path subdir : dirToSubdirs.get(dir)) {
checkState(subdir.getParentDirectory().equals(dir));
builder
.addDirectoriesBuilder()
.setName(subdir.getBaseName())
.setDigest(dirToDigest.get(subdir));
}
ByteString dirBlob = builder.build().toByteString();

dirToDigest.put(dir, digestUtil.compute(dirBlob.toByteArray()));
dirBlobs.add(dirBlob);
}
if (dirent.getType() == Dirent.Type.SYMLINK) {
PathFragment target = child.readSymbolicLink();
FileStatus statFollow = child.statIfFound(Symlinks.FOLLOW);
if (statFollow == null || (!followSymlinks && !target.isAbsolute())) {
// Symlink uploaded as a symlink.
if (statFollow == null) {
checkDanglingSymlinkAllowed(child, target);
}
if (target.isAbsolute()) {
checkAbsoluteSymlinkAllowed(child, target);
}
b.addSymlinksBuilder().setName(name).setTarget(target.toString());
continue;

// Convert individual Directory messages to a Tree message. As we want the records to be
// topologically sorted (parents before children), we iterate over the directories in reverse
// insertion order. We construct the message through direct byte manipulation to ensure that
// the strict requirements on the encoding are observed.

ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
CodedOutputStream codedOutputStream = CodedOutputStream.newInstance(byteArrayOutputStream);
int fieldNumber = TREE_ROOT_FIELD_NUMBER;
for (ByteString directory : Lists.reverse(new ArrayList<>(dirBlobs))) {
codedOutputStream.writeBytes(fieldNumber, directory);
fieldNumber = TREE_CHILDREN_FIELD_NUMBER;
}
codedOutputStream.flush();

return ByteString.copyFrom(byteArrayOutputStream.toByteArray());
}

private void visit(Path path, Dirent.Type type) {
try {
if (type == Dirent.Type.FILE) {
visitAsFile(path);
return;
}
if (statFollow.isFile() && !statFollow.isSpecialFile()) {
// Symlink to file uploaded as a file.
Digest digest = digestUtil.compute(child);
b.addFilesBuilder().setName(name).setDigest(digest).setIsExecutable(true);
digestToFile.put(digest, child);
continue;
if (type == Dirent.Type.DIRECTORY) {
visitAsDirectory(path);
for (Dirent dirent : path.readdir(Symlinks.NOFOLLOW)) {
Path childPath = path.getChild(dirent.getName());
Dirent.Type childType = dirent.getType();
execute(() -> visit(childPath, childType));
}
return;
}
if (statFollow.isDirectory()) {
// Symlink to directory uploaded as a directory.
ByteString dir = computeDirectory(child, directories);
b.addDirectoriesBuilder().setName(name).setDigest(digestUtil.compute(dir.toByteArray()));
continue;
if (type == Dirent.Type.SYMLINK) {
PathFragment target = path.readSymbolicLink();
FileStatus statFollow = path.statIfFound(Symlinks.FOLLOW);
if (statFollow == null || (!followSymlinks && !target.isAbsolute())) {
// Symlink uploaded as a symlink.
if (statFollow == null) {
checkDanglingSymlinkAllowed(path, target);
}
if (target.isAbsolute()) {
checkAbsoluteSymlinkAllowed(path, target);
}
visitAsSymlink(path, target);
return;
}
if (statFollow.isFile() && !statFollow.isSpecialFile()) {
// Symlink to file uploaded as a file.
execute(() -> visit(path, Dirent.Type.FILE));
return;
}
if (statFollow.isDirectory()) {
// Symlink to directory uploaded as a directory.
execute(() -> visit(path, Dirent.Type.DIRECTORY));
return;
}
}
rejectSpecialFile(path);
} catch (ExecException | IOException e) {
// We can't throw checked exceptions here since AQV expects Runnables
throw new WrappedException(e);
}
}

private void visitAsDirectory(Path path) {
dirs.add(path);
if (!path.equals(rootDir)) {
dirToSubdirs.put(path.getParentDirectory(), path);
}
// Special file or dereferenced symlink to special file.
rejectSpecialFile(child);
}

ByteString directory = b.build().toByteString();
directories.add(directory);
return directory;
private void visitAsFile(Path path) throws IOException {
Path parentPath = path.getParentDirectory();
Digest digest = digestUtil.compute(path);
FileNode node =
FileNode.newBuilder()
.setName(path.getBaseName())
.setDigest(digest)
.setIsExecutable(true)
.build();
digestToFile.put(digest, path);
dirToFiles.put(parentPath, node);
}

private void visitAsSymlink(Path path, PathFragment target) {
Path parentPath = path.getParentDirectory();
SymlinkNode node =
SymlinkNode.newBuilder().setName(path.getBaseName()).setTarget(target.toString()).build();
dirToSymlinks.put(parentPath, node);
}
}

// Field numbers of the 'root' and 'directory' fields in the Tree message.
private static final int TREE_ROOT_FIELD_NUMBER =
Tree.getDescriptor().findFieldByName("root").getNumber();
private static final int TREE_CHILDREN_FIELD_NUMBER =
Tree.getDescriptor().findFieldByName("children").getNumber();

private void addDirectory(Path dir) throws ExecException, IOException, InterruptedException {
ByteString treeBlob = new DirectoryBuilder(dir).build();
Digest treeDigest = digestUtil.compute(treeBlob.toByteArray());

result
.addOutputDirectoriesBuilder()
.setPath(remotePathResolver.localPathToOutputPath(dir))
.setTreeDigest(treeDigest)
.setIsTopologicallySorted(true);

digestToBlobs.put(treeDigest, treeBlob);
}

private void checkDanglingSymlinkAllowed(Path file, PathFragment target) throws IOException {
…
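Two supporting details of the new DirectoryBuilder are worth spelling out. First, AbstractQueueVisitor only accepts Runnables, so checked ExecException and IOException instances thrown inside visitor tasks are tunneled out in an unchecked WrappedException and rethrown with their original types after awaitQuiescence. A minimal, self-contained sketch of that pattern (ExceptionTunnelDemo and runUnwrapping are illustrative names, and only IOException is handled for brevity):

// Illustrative sketch: a checked exception thrown inside a Runnable is wrapped
// in an unchecked WrappedException, carried out of the Runnable, and unwrapped
// back to its original type by the caller, mirroring how DirectoryBuilder
// handles exceptions around AbstractQueueVisitor.awaitQuiescence.
import com.google.common.base.Throwables;
import java.io.IOException;

final class ExceptionTunnelDemo {
  private static final class WrappedException extends RuntimeException {
    WrappedException(Exception cause) {
      super(cause);
    }

    Exception unwrap() {
      return (Exception) getCause();
    }
  }

  /** Runs {@code task}, rethrowing a tunneled IOException with its original type. */
  static void runUnwrapping(Runnable task) throws IOException {
    try {
      task.run();
    } catch (WrappedException e) {
      Throwables.throwIfInstanceOf(e.unwrap(), IOException.class);
      throw new AssertionError("unexpected exception", e.unwrap());
    }
  }

  public static void main(String[] args) {
    Runnable task =
        () -> {
          try {
            throw new IOException("disk on fire");
          } catch (IOException e) {
            throw new WrappedException(e); // Runnable.run() cannot declare IOException
          }
        };
    try {
      runUnwrapping(task);
    } catch (IOException e) {
      System.out.println("caught with original type: " + e.getMessage());
    }
  }
}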
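Second, the Tree message is still assembled by hand in wire format so that the root Directory is written first and its children follow in parents-before-children order, as required when OutputDirectory.is_topologically_sorted is set; writing the pre-serialized Directory blobs as bytes fields is wire-compatible with embedded message fields. A stripped-down sketch of that encoding step, assuming the blobs are already in topological order (TreeWireEncoder is an illustrative name):

// Illustrative sketch: serialize a Tree in wire format with the root Directory
// written first, followed by the children in parents-before-children order.
import build.bazel.remote.execution.v2.Tree;
import com.google.protobuf.ByteString;
import com.google.protobuf.CodedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.List;

final class TreeWireEncoder {
  private static final int ROOT_FIELD =
      Tree.getDescriptor().findFieldByName("root").getNumber();
  private static final int CHILDREN_FIELD =
      Tree.getDescriptor().findFieldByName("children").getNumber();

  /** Encodes {@code root} and {@code children} (parents before children) as a Tree. */
  static ByteString encode(ByteString root, List<ByteString> children) throws IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    CodedOutputStream coded = CodedOutputStream.newInstance(out);
    coded.writeBytes(ROOT_FIELD, root); // serialized Directory written as a bytes field
    for (ByteString child : children) {
      coded.writeBytes(CHILDREN_FIELD, child);
    }
    coded.flush();
    return ByteString.copyFrom(out.toByteArray());
  }
}

In the actual change, addDirectory digests this blob and records it in digestToBlobs alongside the OutputDirectory entry in the ActionResult.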