Skip to content

Commit

Permalink
address PR comments, fix broken tests
Browse files Browse the repository at this point in the history
  • Loading branch information
m-trieu committed Sep 13, 2024
1 parent 1e02463 commit d03b0bc
Show file tree
Hide file tree
Showing 15 changed files with 242 additions and 217 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ public Integer create(PipelineOptions options) {
}
}

/** EnableStreamingEngine defaults to false unless one of the two experiments is set. */
/** EnableStreamingEngine defaults to false unless one of the experiment is set. */
class EnableWindmillServiceDirectPathFactory implements DefaultValueFactory<Boolean> {
@Override
public Boolean create(PipelineOptions options) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ private synchronized ImmutableMap<ShardedKey, WorkId> getStuckCommitsAt(
if (executableWork != null) {
Work work = executableWork.work();
if (work.isStuckCommittingAt(stuckCommitDeadline)) {
LOG.debug(
LOG.error(
"Detected key {} stuck in COMMITTING state since {}, completing it with error.",
shardedKey,
work.getStateStartTime());
Expand All @@ -337,8 +337,17 @@ synchronized void printActiveWork(PrintWriter writer, Instant now) {
writer.println(
"<table border=\"1\" "
+ "style=\"border-collapse:collapse;padding:5px;border-spacing:5px;border:1px\">");
// Columns.
writer.println(
"<tr><th>Key</th><th>Token</th><th>Queued</th><th>Active For</th><th>State</th><th>State Active For</th><th>Produced By</th></tr>");
"<tr>"
+ "<th>Key</th>"
+ "<th>Token</th>"
+ "<th>Queued</th>"
+ "<th>Active For</th>"
+ "<th>State</th>"
+ "<th>State Active For</th>"
+ "<th>Produced By</th>"
+ "</tr>");
// Use StringBuilder because we are appending in loop.
StringBuilder activeWorkStatus = new StringBuilder();
int commitsPendingCount = 0;
Expand All @@ -364,6 +373,8 @@ synchronized void printActiveWork(PrintWriter writer, Instant now) {
activeWorkStatus.append(activeWork.getState());
activeWorkStatus.append("</td><td>");
activeWorkStatus.append(elapsedString(activeWork.getStateStartTime(), now));
activeWorkStatus.append("</td><td>");
activeWorkStatus.append(activeWork.backendWorkerToken());
activeWorkStatus.append("</td></tr>\n");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,11 +115,11 @@ public static ProcessingContext createProcessingContext(
}

public static ProcessingContext createProcessingContext(
String backendWorkerToken,
String computationId,
GetDataClient getDataClient,
Consumer<Commit> workCommitter,
HeartbeatSender heartbeatSender) {
HeartbeatSender heartbeatSender,
String backendWorkerToken) {
return ProcessingContext.create(
computationId, getDataClient, workCommitter, heartbeatSender, backendWorkerToken);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,11 @@
public final class FanOutStreamingEngineWorkerHarness implements StreamingWorkerHarness {
private static final Logger LOG =
LoggerFactory.getLogger(FanOutStreamingEngineWorkerHarness.class);
private static final String PUBLISH_NEW_WORKER_METADATA_THREAD = "PublishNewWorkerMetadataThread";
private static final String CONSUME_NEW_WORKER_METADATA_THREAD = "ConsumeNewWorkerMetadataThread";
private static final String STREAM_STARTER_THREAD = "WindmillStreamStarter";
private static final String STREAM_CLOSER_THREAD = "WindmillStreamCloser";
private static final String PUBLISH_NEW_WORKER_METADATA_THREAD_NAME =
"PublishNewWorkerMetadataThread";
private static final String CONSUME_NEW_WORKER_METADATA_THREAD_NAME =
"ConsumeNewWorkerMetadataThread";
private static final String STREAM_MANAGER_THREAD_NAME = "WindmillStreamManager-%d";

private final JobHeader jobHeader;
private final GrpcWindmillStreamFactory streamFactory;
Expand All @@ -98,8 +99,7 @@ public final class FanOutStreamingEngineWorkerHarness implements StreamingWorker
private final Queue<WindmillEndpoints> newWindmillEndpoints;
private final Function<WindmillStream.CommitWorkStream, WorkCommitter> workCommitterFactory;
private final ThrottlingGetDataMetricTracker getDataMetricTracker;
private final ExecutorService windmillStreamStarter;
private final ExecutorService windmillStreamCloser;
private final ExecutorService windmillStreamManager;
private final ExecutorService newWorkerMetadataPublisher;
private final ExecutorService newWorkerMetadataConsumer;

Expand Down Expand Up @@ -128,18 +128,19 @@ private FanOutStreamingEngineWorkerHarness(
this.channelCachingStubFactory = channelCachingStubFactory;
this.dispatcherClient = dispatcherClient;
this.getWorkerMetadataThrottleTimer = new ThrottleTimer();
this.windmillStreamStarter =
this.windmillStreamManager =
Executors.newCachedThreadPool(
new ThreadFactoryBuilder().setNameFormat(STREAM_STARTER_THREAD).build());
this.windmillStreamCloser =
Executors.newCachedThreadPool(
new ThreadFactoryBuilder().setNameFormat(STREAM_CLOSER_THREAD).build());
new ThreadFactoryBuilder().setNameFormat(STREAM_MANAGER_THREAD_NAME).build());
this.newWorkerMetadataPublisher =
Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat(PUBLISH_NEW_WORKER_METADATA_THREAD).build());
new ThreadFactoryBuilder()
.setNameFormat(PUBLISH_NEW_WORKER_METADATA_THREAD_NAME)
.build());
this.newWorkerMetadataConsumer =
Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat(CONSUME_NEW_WORKER_METADATA_THREAD).build());
new ThreadFactoryBuilder()
.setNameFormat(CONSUME_NEW_WORKER_METADATA_THREAD_NAME)
.build());
this.newWindmillEndpoints = Queues.synchronizedQueue(EvictingQueue.create(1));
this.getWorkBudgetDistributor = getWorkBudgetDistributor;
this.totalGetWorkBudget = totalGetWorkBudget;
Expand Down Expand Up @@ -269,38 +270,33 @@ public synchronized void shutdown() {
}

private void consumeWindmillWorkerEndpoints(WindmillEndpoints newWindmillEndpoints) {
consumeEndpoints(newWindmillEndpoints).join();
}
CompletableFuture<Void> closeStaleStreams;

/**
* {@link java.util.function.Consumer<WindmillEndpoints>} used to update {@link #connections} on
* new backend worker metadata.
*/
private synchronized CompletableFuture<Void> consumeEndpoints(
WindmillEndpoints newWindmillEndpoints) {
LOG.info("Consuming new windmill endpoints: {}", newWindmillEndpoints);
ImmutableMap<Endpoint, WindmillConnection> newWindmillConnections =
createNewWindmillConnections(newWindmillEndpoints.windmillEndpoints());
CompletableFuture<Void> closeStaleStreams =
closeStaleStreams(newWindmillConnections.values(), connections.get().windmillStreams());
ImmutableMap<WindmillConnection, WindmillStreamSender> newStreams =
createAndStartNewStreams(newWindmillConnections.values()).join();
StreamingEngineConnectionState newConnectionsState =
StreamingEngineConnectionState.builder()
.setWindmillConnections(newWindmillConnections)
.setWindmillStreams(newStreams)
.setGlobalDataStreams(
createNewGlobalDataStreams(newWindmillEndpoints.globalDataEndpoints()))
.build();
LOG.info(
"Setting new connections: {}. Previous connections: {}.",
newConnectionsState,
connections.get());
connections.set(newConnectionsState);
getWorkBudgetDistributor.distributeBudget(newStreams.values(), totalGetWorkBudget);
synchronized (this) {
LOG.info("Consuming new windmill endpoints: {}", newWindmillEndpoints);
ImmutableMap<Endpoint, WindmillConnection> newWindmillConnections =
createNewWindmillConnections(newWindmillEndpoints.windmillEndpoints());
closeStaleStreams =
closeStaleStreams(newWindmillConnections.values(), connections.get().windmillStreams());
ImmutableMap<WindmillConnection, WindmillStreamSender> newStreams =
createAndStartNewStreams(newWindmillConnections.values()).join();
StreamingEngineConnectionState newConnectionsState =
StreamingEngineConnectionState.builder()
.setWindmillConnections(newWindmillConnections)
.setWindmillStreams(newStreams)
.setGlobalDataStreams(
createNewGlobalDataStreams(newWindmillEndpoints.globalDataEndpoints()))
.build();
LOG.info(
"Setting new connections: {}. Previous connections: {}.",
newConnectionsState,
connections.get());
connections.set(newConnectionsState);
getWorkBudgetDistributor.distributeBudget(newStreams.values(), totalGetWorkBudget);
}

// Close the streams outside the lock.
return closeStaleStreams;
closeStaleStreams.join();
}

/** Close the streams that are no longer valid asynchronously. */
Expand All @@ -316,17 +312,15 @@ private CompletableFuture<Void> closeStaleStreams(
entry ->
CompletableFuture.runAsync(
() -> {
LOG.debug("Closing streams to {}", entry.getKey().backendWorkerToken());
LOG.debug("Closing streams to {}", entry);
entry.getValue().closeAllStreams();
entry
.getKey()
.directEndpoint()
.ifPresent(channelCachingStubFactory::remove);
LOG.debug(
"Successfully closed streams to {}",
entry.getKey().backendWorkerToken());
LOG.debug("Successfully closed streams to {}", entry);
},
windmillStreamCloser))
windmillStreamManager))
.toArray(CompletableFuture[]::new));
}

Expand All @@ -349,7 +343,7 @@ private CompletableFuture<Void> closeStaleStreams(
() ->
createAndStartWindmillStreamSender(
connection))),
windmillStreamStarter))
windmillStreamManager))
.collect(Collectors.toList()));

return connectionAndSenderFuture
Expand Down
Loading

0 comments on commit d03b0bc

Please sign in to comment.