Skip to content

Commit

Permalink
Rename WorkerMetric to WorkerProcessMetrics as a single instance encapsulates the metrics of the process itself rather than each Worker object (there could be multiple Worker(s) pointing to the same process due to multiplex workers).
Browse files Browse the repository at this point in the history

RELNOTES: None.
PiperOrigin-RevId: 580922075
Change-Id: Ia26916789e01248d5e4bf74705e9326889305ebc
  • Loading branch information
zhengwei143 authored and copybara-github committed Nov 9, 2023
1 parent b5c38af commit 99a3486
Show file tree
Hide file tree
Showing 17 changed files with 155 additions and 140 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@
import com.google.devtools.build.lib.worker.WorkerCreatedEvent;
import com.google.devtools.build.lib.worker.WorkerDestroyedEvent;
import com.google.devtools.build.lib.worker.WorkerEvictedEvent;
import com.google.devtools.build.lib.worker.WorkerMetricsCollector;
import com.google.devtools.build.lib.worker.WorkerProcessMetricsCollector;
import com.google.devtools.build.skyframe.SkyframeGraphStatsEvent;
import com.google.errorprone.annotations.CanIgnoreReturnValue;
import com.google.protobuf.util.Durations;
Expand Down Expand Up @@ -110,7 +110,7 @@ private MetricsCollector(
this.numAnalyses = numAnalyses;
this.numBuilds = numBuilds;
env.getEventBus().register(this);
WorkerMetricsCollector.instance().setClock(env.getClock());
WorkerProcessMetricsCollector.instance().setClock(env.getClock());
this.buildAccountedFor = new AtomicBoolean();
}

Expand Down Expand Up @@ -310,7 +310,8 @@ private BuildMetrics createBuildMetrics() {
.setCumulativeMetrics(createCumulativeMetrics())
.setArtifactMetrics(artifactMetrics.build())
.setBuildGraphMetrics(buildGraphMetrics.build())
.addAllWorkerMetrics(WorkerMetricsCollector.instance().createWorkerMetricsProto())
.addAllWorkerMetrics(
WorkerProcessMetricsCollector.instance().createWorkerMetricsProto())
.setWorkerPoolMetrics(createWorkerPoolMetrics());

NetworkMetrics networkMetrics = NetworkMetricsCollector.instance().collectMetrics();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
import com.google.devtools.build.lib.unix.ProcMeminfoParser;
import com.google.devtools.build.lib.util.OS;
import com.google.devtools.build.lib.util.ResourceUsage;
import com.google.devtools.build.lib.worker.WorkerMetric;
import com.google.devtools.build.lib.worker.WorkerMetricsCollector;
import com.google.devtools.build.lib.worker.WorkerProcessMetrics;
import com.google.devtools.build.lib.worker.WorkerProcessMetricsCollector;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import com.sun.management.OperatingSystemMXBean;
import java.io.IOException;
Expand Down Expand Up @@ -62,14 +62,14 @@ public class CollectLocalResourceUsage extends Thread {

private Stopwatch stopwatch;

private final WorkerMetricsCollector workerMetricsCollector;
private final WorkerProcessMetricsCollector workerProcessMetricsCollector;

private final ResourceEstimator resourceEstimator;
private final boolean collectPressureStallIndicators;

CollectLocalResourceUsage(
BugReporter bugReporter,
WorkerMetricsCollector workerMetricsCollector,
WorkerProcessMetricsCollector workerProcessMetricsCollector,
ResourceEstimator resourceEstimator,
boolean collectWorkerDataInProfiler,
boolean collectLoadAverage,
Expand All @@ -79,7 +79,7 @@ public class CollectLocalResourceUsage extends Thread {
super("collect-local-resources");
this.bugReporter = checkNotNull(bugReporter);
this.collectWorkerDataInProfiler = collectWorkerDataInProfiler;
this.workerMetricsCollector = workerMetricsCollector;
this.workerProcessMetricsCollector = workerProcessMetricsCollector;
this.collectLoadAverage = collectLoadAverage;
this.collectSystemNetworkUsage = collectSystemNetworkUsage;
this.collectResourceManagerEstimation = collectResourceManagerEstimation;
Expand Down Expand Up @@ -184,8 +184,8 @@ public void run() {
if (collectWorkerDataInProfiler) {
try (SilentCloseable c = Profiler.instance().profile("Worker metrics collection")) {
workerMemoryUsageMb =
this.workerMetricsCollector.collectMetrics().stream()
.mapToInt(WorkerMetric::getUsedMemoryInKb)
this.workerProcessMetricsCollector.collectMetrics().stream()
.mapToInt(WorkerProcessMetrics::getUsedMemoryInKb)
.sum()
/ 1024;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
import com.google.devtools.build.lib.profiler.PredicateBasedStatRecorder.RecorderAndPredicate;
import com.google.devtools.build.lib.profiler.StatRecorder.VfsHeuristics;
import com.google.devtools.build.lib.worker.WorkerMetricsCollector;
import com.google.devtools.build.lib.worker.WorkerProcessMetricsCollector;
import com.google.gson.stream.JsonWriter;
import com.sun.management.OperatingSystemMXBean;
import java.io.IOException;
Expand Down Expand Up @@ -453,7 +453,7 @@ public synchronized void start(
boolean collectPressureStallIndicators,
boolean collectResourceEstimation,
ResourceEstimator resourceEstimator,
WorkerMetricsCollector workerMetricsCollector,
WorkerProcessMetricsCollector workerProcessMetricsCollector,
BugReporter bugReporter)
throws IOException {
checkState(!isActive(), "Profiler already active");
Expand Down Expand Up @@ -498,7 +498,7 @@ public synchronized void start(
resourceUsageThread =
new CollectLocalResourceUsage(
bugReporter,
workerMetricsCollector,
workerProcessMetricsCollector,
resourceEstimator,
collectWorkerDataInProfiler,
collectLoadAverage,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@
import com.google.devtools.build.lib.vfs.FileSystemUtils;
import com.google.devtools.build.lib.vfs.Path;
import com.google.devtools.build.lib.vfs.PathFragment;
import com.google.devtools.build.lib.worker.WorkerMetricsCollector;
import com.google.devtools.build.lib.worker.WorkerProcessMetricsCollector;
import com.google.devtools.common.options.CommandNameCache;
import com.google.devtools.common.options.InvocationPolicyParser;
import com.google.devtools.common.options.OptionDefinition;
Expand Down Expand Up @@ -378,8 +378,9 @@ ProfilerStartedEvent initProfiler(
+ " will be omitted in merged actions."));
}
Profiler profiler = Profiler.instance();
WorkerMetricsCollector workerMetricsCollector = WorkerMetricsCollector.instance();
workerMetricsCollector.setClock(clock);
WorkerProcessMetricsCollector workerProcessMetricsCollector =
WorkerProcessMetricsCollector.instance();
workerProcessMetricsCollector.setClock(clock);
profiler.start(
profiledTasks,
out,
Expand All @@ -399,7 +400,7 @@ ProfilerStartedEvent initProfiler(
options.collectPressureStallIndicators,
options.collectResourceEstimation,
env.getLocalResourceManager(),
WorkerMetricsCollector.instance(),
WorkerProcessMetricsCollector.instance(),
bugReporter);
// Instead of logEvent() we're calling the low level function to pass the timings we took in
// the launcher. We're setting the INIT phase marker so that it follows immediately the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import com.google.devtools.build.lib.buildeventstream.BuildEventStreamProtos.BuildMetrics.WorkerMetrics.WorkerStats;
import com.google.devtools.build.lib.runtime.CommandEnvironment;
import com.google.devtools.build.lib.runtime.InfoItem;
import com.google.devtools.build.lib.worker.WorkerMetricsCollector;
import com.google.devtools.build.lib.worker.WorkerProcessMetricsCollector;
import java.util.List;

/** Info item for persistent worker metrics. */
Expand All @@ -38,7 +38,7 @@ public byte[] get(
Supplier<BuildConfigurationValue> configurationSupplier, CommandEnvironment env) {

ImmutableList<WorkerMetrics> proto =
WorkerMetricsCollector.instance().createWorkerMetricsProto();
WorkerProcessMetricsCollector.instance().createWorkerMetricsProto();
if (proto.isEmpty()) {
return print("No persistent workers active.");
} else {
Expand Down
5 changes: 2 additions & 3 deletions src/main/java/com/google/devtools/build/lib/worker/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -148,15 +148,14 @@ java_library(
java_library(
name = "worker_metric",
srcs = [
"WorkerMetric.java",
"WorkerMetricsCollector.java",
"WorkerProcessMetrics.java",
"WorkerProcessMetricsCollector.java",
],
deps = [
"//src/main/java/com/google/devtools/build/lib/buildeventstream/proto:build_event_stream_java_proto",
"//src/main/java/com/google/devtools/build/lib/clock",
"//src/main/java/com/google/devtools/build/lib/metrics:ps_info_collector",
"//src/main/java/com/google/devtools/build/lib/util:os",
"//third_party:auto_value",
"//third_party:guava",
],
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,20 +91,20 @@ public void run() {
break;
}

ImmutableList<WorkerMetric> workerMetrics =
WorkerMetricsCollector.instance().collectMetrics();
ImmutableList<WorkerProcessMetrics> workerProcessMetrics =
WorkerProcessMetricsCollector.instance().collectMetrics();

if (options.totalWorkerMemoryLimitMb > 0) {
try {
evictWorkers(workerMetrics);
evictWorkers(workerProcessMetrics);
} catch (InterruptedException e) {
logger.atInfo().withCause(e).log("received interrupt in worker life cycle manager");
break;
}
}

if (options.workerMemoryLimitMb > 0) {
killLargeWorkers(workerMetrics, options.workerMemoryLimitMb);
killLargeWorkers(workerProcessMetrics, options.workerMemoryLimitMb);
}
}

Expand All @@ -116,13 +116,13 @@ void stopProcessing() {
}

/** Kills any worker that uses more than {@code limitMb} MB of memory. */
void killLargeWorkers(ImmutableList<WorkerMetric> workerMetrics, int limitMb) {
ImmutableList<WorkerMetric> large =
workerMetrics.stream()
void killLargeWorkers(ImmutableList<WorkerProcessMetrics> workerProcessMetrics, int limitMb) {
ImmutableList<WorkerProcessMetrics> large =
workerProcessMetrics.stream()
.filter(m -> m.getUsedMemoryInKb() / 1000 > limitMb)
.collect(toImmutableList());

for (WorkerMetric l : large) {
for (WorkerProcessMetrics l : large) {
String msg;

ImmutableList<Integer> workerIds = l.getWorkerIds();
Expand Down Expand Up @@ -151,22 +151,23 @@ void killLargeWorkers(ImmutableList<WorkerMetric> workerMetrics, int limitMb) {
}

@VisibleForTesting // productionVisibility = Visibility.PRIVATE
void evictWorkers(ImmutableList<WorkerMetric> workerMetrics) throws InterruptedException {
void evictWorkers(ImmutableList<WorkerProcessMetrics> workerProcessMetrics)
throws InterruptedException {

if (options.totalWorkerMemoryLimitMb == 0) {
return;
}

int workerMemoryUsage =
workerMetrics.stream().mapToInt(metric -> metric.getUsedMemoryInKb() / 1000).sum();
workerProcessMetrics.stream().mapToInt(metric -> metric.getUsedMemoryInKb() / 1000).sum();

// TODO: Remove after b/274608075 is fixed.
if (!workerMetrics.isEmpty()) {
if (!workerProcessMetrics.isEmpty()) {
logger.atInfo().atMostEvery(1, TimeUnit.MINUTES).log(
"total worker memory %dMB while limit is %dMB - details: %s",
workerMemoryUsage,
options.totalWorkerMemoryLimitMb,
workerMetrics.stream()
workerProcessMetrics.stream()
.map(
metric ->
metric.getWorkerIds()
Expand All @@ -184,7 +185,7 @@ void evictWorkers(ImmutableList<WorkerMetric> workerMetrics) throws InterruptedE

ImmutableSet<Integer> candidates =
collectEvictionCandidates(
workerMetrics, options.totalWorkerMemoryLimitMb, workerMemoryUsage);
workerProcessMetrics, options.totalWorkerMemoryLimitMb, workerMemoryUsage);

if (!candidates.isEmpty() || !emptyEvictionWasLogged) {
String msg;
Expand Down Expand Up @@ -220,7 +221,7 @@ void evictWorkers(ImmutableList<WorkerMetric> workerMetrics) throws InterruptedE
}

if (eventBus != null) {
for (WorkerMetric metric : workerMetrics) {
for (WorkerProcessMetrics metric : workerProcessMetrics) {

for (Integer workerId : metric.getWorkerIds()) {
if (evictedWorkers.contains(workerId)) {
Expand All @@ -231,28 +232,29 @@ void evictWorkers(ImmutableList<WorkerMetric> workerMetrics) throws InterruptedE
}

if (options.shrinkWorkerPool) {
List<WorkerMetric> notEvictedWorkerMetrics =
workerMetrics.stream()
List<WorkerProcessMetrics> notEvictedWorkerProcessMetrics =
workerProcessMetrics.stream()
.filter(metric -> !evictedWorkers.containsAll(metric.getWorkerIds()))
.collect(Collectors.toList());

int notEvictedWorkerMemoryUsage =
notEvictedWorkerMetrics.stream()
notEvictedWorkerProcessMetrics.stream()
.mapToInt(metric -> metric.getUsedMemoryInKb() / 1000)
.sum();

if (notEvictedWorkerMemoryUsage <= options.totalWorkerMemoryLimitMb) {
return;
}

postponeInvalidation(notEvictedWorkerMetrics, notEvictedWorkerMemoryUsage);
postponeInvalidation(notEvictedWorkerProcessMetrics, notEvictedWorkerMemoryUsage);
}
}

private void postponeInvalidation(
List<WorkerMetric> workerMetrics, int notEvictedWorkerMemoryUsage) {
List<WorkerProcessMetrics> workerProcessMetrics, int notEvictedWorkerMemoryUsage) {
ImmutableSet<Integer> potentialCandidates =
getCandidates(workerMetrics, options.totalWorkerMemoryLimitMb, notEvictedWorkerMemoryUsage);
getCandidates(
workerProcessMetrics, options.totalWorkerMemoryLimitMb, notEvictedWorkerMemoryUsage);

if (!potentialCandidates.isEmpty()) {
String msg = String.format("New doomed workers candidates %s", potentialCandidates);
Expand All @@ -278,29 +280,31 @@ private static ImmutableSet<Integer> evictCandidates(
/** Collects worker candidates to evict. Chooses workers with the largest memory consumption. */
@SuppressWarnings("JdkCollectors")
ImmutableSet<Integer> collectEvictionCandidates(
ImmutableList<WorkerMetric> workerMetrics, int memoryLimitMb, int workerMemoryUsageMb)
ImmutableList<WorkerProcessMetrics> workerProcessMetrics,
int memoryLimitMb,
int workerMemoryUsageMb)
throws InterruptedException {
Set<Integer> idleWorkers = getIdleWorkers();

List<WorkerMetric> idleWorkerMetrics =
workerMetrics.stream()
List<WorkerProcessMetrics> idleWorkerProcessMetrics =
workerProcessMetrics.stream()
.filter(metric -> idleWorkers.containsAll(metric.getWorkerIds()))
.collect(Collectors.toList());

return getCandidates(idleWorkerMetrics, memoryLimitMb, workerMemoryUsageMb);
return getCandidates(idleWorkerProcessMetrics, memoryLimitMb, workerMemoryUsageMb);
}

/**
* Chooses the worker ids of workers with the most usage of memory. Selects workers until total
* memory usage is less than memoryLimitMb.
*/
private static ImmutableSet<Integer> getCandidates(
List<WorkerMetric> workerMetrics, int memoryLimitMb, int usedMemoryMb) {
List<WorkerProcessMetrics> workerProcessMetrics, int memoryLimitMb, int usedMemoryMb) {

workerMetrics.sort(new MemoryComparator());
workerProcessMetrics.sort(new MemoryComparator());
ImmutableSet.Builder<Integer> candidates = ImmutableSet.builder();
int freeMemoryMb = 0;
for (WorkerMetric metric : workerMetrics) {
for (WorkerProcessMetrics metric : workerProcessMetrics) {
candidates.addAll(metric.getWorkerIds());
freeMemoryMb += metric.getUsedMemoryInKb() / 1000;

Expand Down Expand Up @@ -370,9 +374,9 @@ public ImmutableSet<Integer> getEvictedWorkers() {
}

/** Compare worker metrics by memory consumption in descending order. */
private static class MemoryComparator implements Comparator<WorkerMetric> {
private static class MemoryComparator implements Comparator<WorkerProcessMetrics> {
@Override
public int compare(WorkerMetric m1, WorkerMetric m2) {
public int compare(WorkerProcessMetrics m1, WorkerProcessMetrics m2) {
return m2.getUsedMemoryInKb() - m1.getUsedMemoryInKb();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ public void buildStarting(BuildStartingEvent event) {
if (workerPool == null) {
workerPool = new WorkerPoolImpl(newConfig);
// If workerPool is restarted then we should recreate metrics.
WorkerMetricsCollector.instance().clear();
WorkerProcessMetricsCollector.instance().clear();
}

// Start collecting after a pool is defined
Expand Down Expand Up @@ -193,7 +193,7 @@ public void registerSpawnStrategies(
env.getLocalResourceManager(),
RunfilesTreeUpdater.forCommandEnvironment(env),
env.getOptions().getOptions(WorkerOptions.class),
WorkerMetricsCollector.instance(),
WorkerProcessMetricsCollector.instance(),
env.getClock());
ExecutionOptions executionOptions =
checkNotNull(env.getOptions().getOptions(ExecutionOptions.class));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
* Contains data about worker statistics during execution. This class contains data for {@link
* com.google.devtools.build.lib.buildeventstream.BuildEventStreamProtos.BuildMetrics.WorkerMetrics}
*/
public class WorkerMetric {
public class WorkerProcessMetrics {

private final List<Integer> workerIds;

Expand All @@ -47,7 +47,7 @@ public class WorkerMetric {

private Optional<Instant> lastCollectedTime = Optional.empty();

public WorkerMetric(
public WorkerProcessMetrics(
List<Integer> workerIds,
long processId,
String mnemonic,
Expand All @@ -62,7 +62,7 @@ public WorkerMetric(
this.workerKeyHash = workerKeyHash;
}

public WorkerMetric(
public WorkerProcessMetrics(
int workerId,
long processId,
String mnemonic,
Expand Down
Loading

0 comments on commit 99a3486

Please sign in to comment.