Skip to content

Commit

Permalink
address PR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
m-trieu committed Sep 20, 2024
1 parent 0db2045 commit 9bad2d7
Show file tree
Hide file tree
Showing 14 changed files with 99 additions and 118 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -240,18 +240,20 @@ synchronized Optional<ExecutableWork> completeWorkAndGetNextWorkForKey(
@Nullable Queue<ExecutableWork> workQueue = activeWork.get(shardedKey);
if (workQueue == null) {
// Work may have been completed due to clearing of stuck commits.
LOG.warn("Unable to complete inactive work for key {} and token {}.", shardedKey, workId);
LOG.warn(
"Unable to complete inactive work for key={} and token={}. Work queue for key does not exist.",
shardedKey,
workId);
return Optional.empty();
}

removeCompletedWorkFromQueue(workQueue, shardedKey, workId);
return getNextWork(workQueue, shardedKey);
}

private synchronized void removeCompletedWorkFromQueue(
Queue<ExecutableWork> workQueue, ShardedKey shardedKey, WorkId workId) {
// avoid Preconditions.checkState here to prevent eagerly evaluating the
// format string parameters for the error message.
ExecutableWork completedWork = workQueue.peek();
@Nullable ExecutableWork completedWork = workQueue.peek();
if (completedWork == null) {
// Work may have been completed due to clearing of stuck commits.
LOG.warn("Active key {} without work, expected token {}", shardedKey, workId);
Expand Down Expand Up @@ -346,7 +348,7 @@ synchronized void printActiveWork(PrintWriter writer, Instant now) {
+ "<th>Active For</th>"
+ "<th>State</th>"
+ "<th>State Active For</th>"
+ "<th>Produced By</th>"
+ "<th>Backend</th>"
+ "</tr>");
// Use StringBuilder because we are appending in loop.
StringBuilder activeWorkStatus = new StringBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,59 +269,53 @@ public synchronized void shutdown() {
channelCachingStubFactory.shutdown();
}

private void consumeWindmillWorkerEndpoints(WindmillEndpoints newWindmillEndpoints) {
CompletableFuture<Void> closeStaleStreams;

synchronized (this) {
LOG.info("Consuming new windmill endpoints: {}", newWindmillEndpoints);
ImmutableMap<Endpoint, WindmillConnection> newWindmillConnections =
createNewWindmillConnections(newWindmillEndpoints.windmillEndpoints());
closeStaleStreams =
closeStaleStreams(newWindmillConnections.values(), connections.get().windmillStreams());
ImmutableMap<WindmillConnection, WindmillStreamSender> newStreams =
createAndStartNewStreams(newWindmillConnections.values()).join();
StreamingEngineConnectionState newConnectionsState =
StreamingEngineConnectionState.builder()
.setWindmillConnections(newWindmillConnections)
.setWindmillStreams(newStreams)
.setGlobalDataStreams(
createNewGlobalDataStreams(newWindmillEndpoints.globalDataEndpoints()))
.build();
LOG.info(
"Setting new connections: {}. Previous connections: {}.",
newConnectionsState,
connections.get());
connections.set(newConnectionsState);
getWorkBudgetDistributor.distributeBudget(newStreams.values(), totalGetWorkBudget);
}

// Close the streams outside the lock.
closeStaleStreams.join();
private synchronized void consumeWindmillWorkerEndpoints(WindmillEndpoints newWindmillEndpoints) {
LOG.info("Consuming new windmill endpoints: {}", newWindmillEndpoints);
ImmutableMap<Endpoint, WindmillConnection> newWindmillConnections =
createNewWindmillConnections(newWindmillEndpoints.windmillEndpoints());
closeStaleStreams(newWindmillConnections.values(), connections.get().windmillStreams());
ImmutableMap<WindmillConnection, WindmillStreamSender> newStreams =
createAndStartNewStreams(newWindmillConnections.values()).join();
StreamingEngineConnectionState newConnectionsState =
StreamingEngineConnectionState.builder()
.setWindmillConnections(newWindmillConnections)
.setWindmillStreams(newStreams)
.setGlobalDataStreams(
createNewGlobalDataStreams(newWindmillEndpoints.globalDataEndpoints()))
.build();
LOG.info(
"Setting new connections: {}. Previous connections: {}.",
newConnectionsState,
connections.get());
connections.set(newConnectionsState);
getWorkBudgetDistributor.distributeBudget(newStreams.values(), totalGetWorkBudget);
}

/** Close the streams that are no longer valid asynchronously. */
private CompletableFuture<Void> closeStaleStreams(
@SuppressWarnings("FutureReturnValueIgnored")
private void closeStaleStreams(
Collection<WindmillConnection> newWindmillConnections,
ImmutableMap<WindmillConnection, WindmillStreamSender> currentStreams) {
return CompletableFuture.allOf(
currentStreams.entrySet().stream()
.filter(
connectionAndStream ->
!newWindmillConnections.contains(connectionAndStream.getKey()))
.map(
entry ->
CompletableFuture.runAsync(
() -> {
LOG.debug("Closing streams to {}", entry);
entry.getValue().closeAllStreams();
entry
.getKey()
.directEndpoint()
.ifPresent(channelCachingStubFactory::remove);
LOG.debug("Successfully closed streams to {}", entry);
},
windmillStreamManager))
.toArray(CompletableFuture[]::new));
currentStreams.entrySet().stream()
.filter(
connectionAndStream -> !newWindmillConnections.contains(connectionAndStream.getKey()))
.forEach(
entry ->
CompletableFuture.runAsync(
() -> {
LOG.debug("Closing streams to {}", entry);
try {
entry.getValue().closeAllStreams();
entry
.getKey()
.directEndpoint()
.ifPresent(channelCachingStubFactory::remove);
LOG.debug("Successfully closed streams to {}", entry);
} catch (Exception e) {
LOG.error("Error closing streams to {}", entry);
}
},
windmillStreamManager));
}

private synchronized CompletableFuture<ImmutableMap<WindmillConnection, WindmillStreamSender>>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ public void adjustBudget(long itemsDelta, long bytesDelta) {
GetWorkBudget.builder().setItems(itemsDelta).setBytes(bytesDelta).build();
getWorkBudget.getAndSet(adjustment);
if (started.get()) {
getWorkStream.get().adjustBudget(adjustment);
getWorkStream.get().setBudget(adjustment);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ public abstract class AbstractWindmillStream<RequestT, ResponseT> implements Win
private final String backendWorkerToken;
private final ResettableRequestObserver<RequestT> requestObserver;
private final AtomicBoolean isShutdown;
private final AtomicReference<DateTime> shutdownTime;

/**
* Indicates if the current {@link ResettableRequestObserver} was closed by calling {@link
Expand Down Expand Up @@ -140,6 +141,7 @@ protected AbstractWindmillStream(
new AbstractWindmillStream<RequestT, ResponseT>.ResponseObserver()));
this.sleeper = Sleeper.DEFAULT;
this.logger = logger;
this.shutdownTime = new AtomicReference<>();
}

private static String createThreadName(String streamType, String backendWorkerToken) {
Expand Down Expand Up @@ -293,11 +295,14 @@ public final void appendSummaryHtml(PrintWriter writer) {
writer.format(", %dms backoff remaining", sleepLeft);
}
writer.format(
", current stream is %dms old, last send %dms, last response %dms, closed: %s",
", current stream is %dms old, last send %dms, last response %dms, closed: %s, "
+ "isShutdown: %s, shutdown time: %s",
debugDuration(nowMs, startTimeMs.get()),
debugDuration(nowMs, lastSendTimeMs.get()),
debugDuration(nowMs, lastResponseTimeMs.get()),
streamClosed.get());
streamClosed.get(),
isShutdown.get(),
shutdownTime.get());
}

/**
Expand All @@ -307,7 +312,7 @@ public final void appendSummaryHtml(PrintWriter writer) {
protected abstract void appendSpecificHtml(PrintWriter writer);

@Override
public final void halfClose() {
public final synchronized void halfClose() {
clientClosed.set(true);
requestObserver.onCompleted();
streamClosed.set(true);
Expand Down Expand Up @@ -336,6 +341,7 @@ public final void shutdown() {
requestObserver()
.onError(new WindmillStreamShutdownException("Explicit call to shutdown stream."));
shutdownInternal();
shutdownTime.set(DateTime.now());
}
}

Expand All @@ -362,7 +368,7 @@ private static class ResettableRequestObserver<RequestT> implements StreamObserv
private final Supplier<StreamObserver<RequestT>> requestObserverSupplier;

@GuardedBy("this")
private volatile @Nullable StreamObserver<RequestT> delegateRequestObserver;
private @Nullable StreamObserver<RequestT> delegateRequestObserver;

private ResettableRequestObserver(Supplier<StreamObserver<RequestT>> requestObserverSupplier) {
this.requestObserverSupplier = requestObserverSupplier;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,10 @@ public interface WindmillStream {
@ThreadSafe
interface GetWorkStream extends WindmillStream {
/** Adjusts the {@link GetWorkBudget} for the stream. */
void adjustBudget(long itemsDelta, long bytesDelta);
void setBudget(long itemsDelta, long bytesDelta);

default void adjustBudget(GetWorkBudget newBudget) {
adjustBudget(newBudget.items(), newBudget.bytes());
default void setBudget(GetWorkBudget newBudget) {
setBudget(newBudget.items(), newBudget.bytes());
}

/** Returns the remaining in-flight {@link GetWorkBudget}. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ abstract static class ComputationMetadata {
private static ComputationMetadata fromProto(
Windmill.ComputationWorkItemMetadata metadataProto) {
return new AutoValue_GetWorkResponseChunkAssembler_ComputationMetadata(
metadataProto.getComputationId(),
Preconditions.checkNotNull(metadataProto.getComputationId()),
WindmillTimeUtils.windmillToHarnessWatermark(metadataProto.getInputDataWatermark()),
WindmillTimeUtils.windmillToHarnessWatermark(
metadataProto.getDependentRealtimeInputWatermark()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.util.BackOff;
import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.StreamObserver;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Suppliers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -179,10 +178,8 @@ private static Watermarks createWatermarks(
* which can deadlock since we send on the stream beneath the synchronization. {@link
* AbstractWindmillStream#send(Object)} is synchronized so the sends are already guarded.
*/
private void sendRequestExtension() {
GetWorkBudget currentInFlightBudget = inFlightBudget.get();
GetWorkBudget currentMaxBudget = maxGetWorkBudget.get();

private void sendRequestExtension(
GetWorkBudget currentMaxBudget, GetWorkBudget currentInFlightBudget) {
// If the outstanding items or bytes limit has gotten too low, top both off with a
// GetWorkExtension. The goal is to keep the limits relatively close to their maximum
// values without sending too many extension requests.
Expand Down Expand Up @@ -222,7 +219,7 @@ protected synchronized void onNewStream() {
.setMaxBytes(currentMaxGetWorkBudget.bytes())
.build())
.build();
lastRequest.getAndSet(request);
lastRequest.set(request);
send(request);
}
}
Expand All @@ -236,8 +233,8 @@ protected boolean hasPendingRequests() {
public void appendSpecificHtml(PrintWriter writer) {
// Number of buffers is same as distinct workers that sent work on this stream.
writer.format(
"GetWorkStream: %d buffers, in-flight budget: %s; last sent request: %s.",
workItemAssemblers.size(), inFlightBudget.get(), lastRequest.get());
"GetWorkStream: %d buffers, max budget: %s, in-flight budget: %s, last sent request: %s.",
workItemAssemblers.size(), maxGetWorkBudget.get(), inFlightBudget.get(), lastRequest.get());
}

@Override
Expand All @@ -255,17 +252,19 @@ protected void onResponse(StreamingGetWorkResponseChunk chunk) {
}

private void consumeAssembledWorkItem(AssembledWorkItem assembledWorkItem) {
// Record the fact that there are now fewer outstanding messages and bytes on the stream.
inFlightBudget.updateAndGet(budget -> budget.subtract(1, assembledWorkItem.bufferedSize()));
WorkItem workItem = assembledWorkItem.workItem();
GetWorkResponseChunkAssembler.ComputationMetadata metadata =
assembledWorkItem.computationMetadata();
workItemScheduler.scheduleWork(
workItem,
createWatermarks(workItem, Preconditions.checkNotNull(metadata)),
createProcessingContext(Preconditions.checkNotNull(metadata.computationId())),
createWatermarks(workItem, metadata),
createProcessingContext(metadata.computationId()),
assembledWorkItem.latencyAttributions());
sendRequestExtension();
sendRequestExtension(
maxGetWorkBudget.get(),
// Record the fact that there are now fewer outstanding messages and bytes on the stream.
inFlightBudget.updateAndGet(
budget -> budget.subtract(1, assembledWorkItem.bufferedSize())));
}

private Work.ProcessingContext createProcessingContext(String computationId) {
Expand All @@ -283,10 +282,11 @@ protected void startThrottleTimer() {
}

@Override
public void adjustBudget(long itemsDelta, long bytesDelta) {
maxGetWorkBudget.getAndSet(
GetWorkBudget.builder().setItems(itemsDelta).setBytes(bytesDelta).build());
sendRequestExtension();
public void setBudget(long items, long bytes) {
GetWorkBudget currentMaxGetWorkBudget =
maxGetWorkBudget.updateAndGet(
ignored -> GetWorkBudget.builder().setItems(items).setBytes(bytes).build());
sendRequestExtension(currentMaxGetWorkBudget, inFlightBudget.get());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -342,15 +342,15 @@ private <ResponseT> ResponseT issueRequest(QueuedRequest request, ParseFn<Respon
} catch (AppendableInputStream.InvalidInputStreamStateException
| VerifyException
| CancellationException e) {
handleShutdown(request);
handleShutdown(request, e);
if (!(e instanceof CancellationException)) {
throw e;
}
} catch (IOException e) {
LOG.error("Parsing GetData response failed: ", e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
handleShutdown(request);
handleShutdown(request, e);
throw new RuntimeException(e);
} finally {
pending.remove(request.id());
Expand All @@ -363,10 +363,13 @@ private <ResponseT> ResponseT issueRequest(QueuedRequest request, ParseFn<Respon
"Cannot send request=[" + request + "] on closed stream.");
}

private void handleShutdown(QueuedRequest request) {
private void handleShutdown(QueuedRequest request, Throwable cause) {
if (isShutdown()) {
throw new WindmillStreamShutdownException(
"Cannot send request=[" + request + "] on closed stream.");
WindmillStreamShutdownException shutdownException =
new WindmillStreamShutdownException(
"Cannot send request=[" + request + "] on closed stream.");
shutdownException.addSuppressed(cause);
throw shutdownException;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.ComputationGetDataRequest;
Expand All @@ -37,6 +36,7 @@
/** Utility data classes for {@link GrpcGetDataStream}. */
final class GrpcGetDataStreamRequests {
private static final Logger LOG = LoggerFactory.getLogger(GrpcGetDataStreamRequests.class);
private static final int STREAM_CANCELLED_ERROR_LOG_LIMIT = 3;

private GrpcGetDataStreamRequests() {}

Expand Down Expand Up @@ -154,7 +154,7 @@ void waitForSendOrFailNotification() throws InterruptedException {
LOG.error("Requests failed for the following batches: {}", cancelledRequests);
throw new WindmillStreamShutdownException(
"Requests failed for batch containing "
+ cancelledRequests.stream().limit(3).collect(Collectors.joining(", "))
+ String.join(", ", cancelledRequests)
+ " ... requests. This is most likely due to the stream being explicitly closed"
+ " which happens when the work is marked as invalid on the streaming"
+ " backend when key ranges shuffle around. This is transient and corresponding"
Expand Down Expand Up @@ -186,6 +186,7 @@ ImmutableList<String> createStreamCancelledErrorMessage() {
throw new IllegalStateException();
}
})
.limit(STREAM_CANCELLED_ERROR_LOG_LIMIT)
.collect(toImmutableList());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ protected void startThrottleTimer() {
}

@Override
public void adjustBudget(long itemsDelta, long bytesDelta) {
public void setBudget(long itemsDelta, long bytesDelta) {
// no-op
}

Expand Down
Loading

0 comments on commit 9bad2d7

Please sign in to comment.