Add correct shutdown mechanics for direct path #31902
base: master
Assigning reviewers. If you would like to opt out of this review, comment
R: @shunping added as fallback since no labels match configuration
Available commands:
The PR bot will only process comments in the main thread (not review comments).
R: @scwhittle
Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control. If you'd like to restart, comment
01e8586 to ff287f2
Not finished, mostly looking at stream stuff to help find what is causing stuckness
/** Send a request to the server. */
protected final void send(RequestT request) {
  lastSendTimeMs.set(Instant.now().getMillis());
  synchronized (this) {
    // Check if we should send after we acquire the lock.
    if (isShutdown()) {
      LOG.warn("Send called on a shutdown stream.");
If this is possible, don't log, since customers don't like warning logs and open issues about them.
If this should not be possible, it is perhaps better to throw an exception so that we notice and fix it.
done
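A minimal sketch of the fail-fast option suggested above: `send()` re-checks shutdown under the lock and throws instead of logging a warning. The class names `FailFastStreamSketch` and `SendOnShutdownStreamException` are hypothetical stand-ins, not the Beam classes, and the list of sent requests replaces the real gRPC call.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical exception for this sketch (not Beam's WindmillStreamShutdownException). */
class SendOnShutdownStreamException extends RuntimeException {
  SendOnShutdownStreamException(String message) {
    super(message);
  }
}

/** Hypothetical stream whose send() fails loudly after shutdown instead of logging a warning. */
class FailFastStreamSketch<RequestT> {
  private final List<RequestT> sent = new ArrayList<>(); // guarded by "this"
  private boolean isShutdown = false; // guarded by "this"

  synchronized void shutdown() {
    isShutdown = true;
  }

  final void send(RequestT request) {
    synchronized (this) {
      // Check shutdown only after acquiring the lock, since shutdown() can run concurrently.
      if (isShutdown) {
        // Throwing surfaces a caller bug instead of emitting a customer-visible warning log.
        throw new SendOnShutdownStreamException("Send called on a shutdown stream: " + request);
      }
      sent.add(request);
    }
  }

  synchronized List<RequestT> sentRequests() {
    return new ArrayList<>(sent);
  }
}
```

If sending after shutdown turns out to be a legitimate race rather than a bug, the same check can return silently instead of throwing.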
  }
}

/** Starts the underlying stream. */
protected final void startStream() {
  // Add the stream to the registry after it has been fully constructed.
  streamRegistry.add(this);
  while (true) {
  while (!isShutdown.get()) {
I'd just remove this check since you do it first thing below
done
return;
}
private void tryRestartStream() {
  if (!isShutdown()) {
This looks racy: we check shutdown above in isStreamDone with synchronization, but if the stream is shut down before we get here, we end up with an error but won't restart the stream or remove it from the registry.
I would remove this one.
done
setLastError(error);
private synchronized boolean isStreamDone() {
  if (isShutdown() || (clientClosed.get() && !hasPendingRequests())) {
    streamRegistry.remove(AbstractWindmillStream.this);
I think this kind of side effect is confusing in a method that just sounds like an accessor.
How about maybeTeardownStream()?
done
Add a comment on return value
done
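The race described above and the `maybeTeardownStream()` rename can be illustrated together with a hedged sketch: the teardown check and the restart decision are made under one lock, so a concurrent `shutdown()` cannot leave a stream neither restarted nor deregistered. `RestartableStreamSketch` is hypothetical; incrementing `restarts` stands in for recreating the RPC, and holding the lock across the restart only keeps the sketch short (real code may need to restart outside the lock and re-check).

```java
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stream illustrating a single synchronized teardown-or-restart decision. */
class RestartableStreamSketch {
  private final Set<RestartableStreamSketch> streamRegistry;
  private final AtomicBoolean clientClosed = new AtomicBoolean(false);
  private boolean isShutdown = false; // guarded by "this"
  private int pendingRequests = 0; // guarded by "this"
  private int restarts = 0; // guarded by "this"

  RestartableStreamSketch(Set<RestartableStreamSketch> streamRegistry) {
    this.streamRegistry = streamRegistry;
  }

  void start() {
    // Register only after construction has finished.
    streamRegistry.add(this);
  }

  synchronized void shutdown() {
    isShutdown = true;
  }

  void closeClient() {
    clientClosed.set(true);
  }

  synchronized void setPendingRequests(int count) {
    pendingRequests = count;
  }

  synchronized int restarts() {
    return restarts;
  }

  /**
   * Removes this stream from the registry if it is finished.
   *
   * @return true if the stream was torn down and must not be restarted, false otherwise
   */
  private synchronized boolean maybeTeardownStream() {
    if (isShutdown || (clientClosed.get() && pendingRequests == 0)) {
      streamRegistry.remove(this);
      return true;
    }
    return false;
  }

  /** Handles a stream error: the stream is either torn down or restarted, never neither. */
  synchronized void onStreamError() {
    // The check and the restart share one lock, so shutdown() cannot slip in between them.
    if (maybeTeardownStream()) {
      return;
    }
    restarts++; // Stand-in for recreating the underlying RPC.
  }
}
```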
@@ -176,7 +180,7 @@ private StreamingDataflowWorker(
    DataflowWorkerHarnessOptions options,
    HotKeyLogger hotKeyLogger,
    Supplier<Instant> clock,
    StreamingWorkerStatusReporter workerStatusReporter,
    Function<Supplier<Long>, StreamingWorkerStatusReporter> streamingWorkerStatusReporterFactory,
Should this be a functional interface so you can document it? It's unclear what the Long supplier is.
done
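A sketch of the functional-interface suggestion: a named factory method gives the `Supplier<Long>` parameter a name and Javadoc, which `Function<Supplier<Long>, StreamingWorkerStatusReporter>` cannot carry. The thread does not say what the Long represents, so the throttle-time meaning below, and both class names, are assumptions for illustration only.

```java
import java.util.function.Supplier;

/** Stand-in for StreamingWorkerStatusReporter in this sketch. */
class StatusReporterSketch {
  private final Supplier<Long> throttleTimeMsSupplier;

  StatusReporterSketch(Supplier<Long> throttleTimeMsSupplier) {
    this.throttleTimeMsSupplier = throttleTimeMsSupplier;
  }

  long currentThrottleTimeMs() {
    return throttleTimeMsSupplier.get();
  }
}

/**
 * Hypothetical named replacement for Function<Supplier<Long>, StreamingWorkerStatusReporter>.
 * The named method and parameter give a place to document what the Long means.
 */
@FunctionalInterface
interface StatusReporterFactorySketch {
  /**
   * @param throttleTimeMsSupplier supplies throttled time in milliseconds (an assumed meaning,
   *     for illustration only)
   */
  StatusReporterSketch createStatusReporter(Supplier<Long> throttleTimeMsSupplier);
}
```

Because the interface is functional, call sites can still pass a constructor reference or lambda, as with `Function`.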
...va/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStream.java
() -> {
  try {
    send(extension);
  } catch (IllegalStateException e) {
Should we handle IllegalStateException internally, or rely on executeSafely to catch it?
done
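To illustrate the "rely on executeSafely" option, here is a hypothetical wrapper in which the executor, rather than each task, catches runtime exceptions such as `IllegalStateException`. `SafeExecutorSketch` and its recorded-failures list are assumptions; the thread does not show how the PR's `executeSafely` is implemented, and a real version would more likely log.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;

/** Hypothetical executor wrapper: tasks don't need their own try/catch for runtime exceptions. */
class SafeExecutorSketch {
  private final Executor delegate;
  private final List<RuntimeException> failures = new ArrayList<>(); // guarded by itself

  SafeExecutorSketch(Executor delegate) {
    this.delegate = delegate;
  }

  void executeSafely(Runnable task) {
    delegate.execute(
        () -> {
          try {
            task.run();
          } catch (RuntimeException e) {
            // Recorded here (a real implementation might log it) instead of propagating
            // to the executor's thread.
            synchronized (failures) {
              failures.add(e);
            }
          }
        });
  }

  List<RuntimeException> failures() {
    synchronized (failures) {
      return new ArrayList<>(failures);
    }
  }
}
```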
.../java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java
@@ -182,6 +184,12 @@ protected void onResponse(StreamingCommitResponse response) {
  }
}

@Override
protected void shutdownInternal() {
  pending.values().forEach(pendingRequest -> pendingRequest.onDone.accept(CommitStatus.ABORTED));
I'm worried that something may be inserted into pending between iterating and clear().
Can you instead use an iterator where you remove as you go, so that everything removed is guaranteed to be aborted?
done
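A sketch of the remove-as-you-go iteration the reviewer asks for, assuming `pending` is a `ConcurrentHashMap` (its iterators support `remove()` and tolerate concurrent insertion). Each entry is removed before its callback runs, so anything taken out of the map is aborted; an entry inserted mid-iteration is either visited or left in the map, never silently cleared. `PendingCommitsSketch` and `CommitStatusSketch` are hypothetical stand-ins.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

enum CommitStatusSketch { OK, ABORTED }

/** Hypothetical holder of pending commit callbacks keyed by request id. */
class PendingCommitsSketch {
  private final Map<Long, Consumer<CommitStatusSketch>> pending = new ConcurrentHashMap<>();

  void add(long requestId, Consumer<CommitStatusSketch> onDone) {
    pending.put(requestId, onDone);
  }

  /** Aborts pending requests, removing each entry before its callback runs. */
  void shutdownInternal() {
    Iterator<Map.Entry<Long, Consumer<CommitStatusSketch>>> it = pending.entrySet().iterator();
    while (it.hasNext()) {
      Consumer<CommitStatusSketch> onDone = it.next().getValue();
      it.remove();
      onDone.accept(CommitStatusSketch.ABORTED);
    }
  }

  int size() {
    return pending.size();
  }
}
```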
@@ -163,7 +165,7 @@ protected void onResponse(StreamingCommitResponse response) {
    continue;
  }
  PendingRequest done = pending.remove(requestId);
  if (done == null) {
  if (done == null && !isShutdown()) {
This is going to throw a NullPointerException below. Instead, move the isShutdown check inside this if, so it only decides whether or not to log.
done
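The suggested restructuring, as a hedged single-threaded sketch: a missing pending entry always skips the callback (avoiding the NullPointerException), and shutdown only controls whether the unknown id is reported. `CommitResponseHandlerSketch`, its warnings list, and the message text are hypothetical; the original code uses `continue` inside a loop where this sketch uses `return`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical, single-threaded response handler for pending commit requests. */
class CommitResponseHandlerSketch {
  private final Map<Long, Consumer<String>> pending = new HashMap<>();
  private final List<String> warnings = new ArrayList<>();
  private boolean shutdown = false;

  void add(long requestId, Consumer<String> onDone) {
    pending.put(requestId, onDone);
  }

  void shutdown() {
    shutdown = true;
  }

  List<String> warnings() {
    return warnings;
  }

  void onResponse(long requestId, String status) {
    Consumer<String> done = pending.remove(requestId);
    if (done == null) {
      // Never dereference a missing entry; shutdown only decides whether it is worth a warning.
      if (!shutdown) {
        warnings.add("Unknown commit request id: " + requestId);
      }
      return;
    }
    done.accept(status);
  }
}
```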
bbf2778 to 1e02463
@@ -302,4 +301,12 @@ public Integer create(PipelineOptions options) {
      return streamingOptions.isEnableStreamingEngine() ? Integer.MAX_VALUE : 1;
    }
  }

  /** EnableStreamingEngine defaults to false unless one of the two experiments is set. */
looks like just 1 experiment
done
.../worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java
.../worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java
...taflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java
...pache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java
...pache/beam/runners/dataflow/worker/windmill/client/grpc/WindmillStreamShutdownException.java
 * Only send the next value if the phaser is not terminated by the time we acquire the lock since
 * the phaser can be terminated at any time.
 */
private void tryOnNext(T value) {
not sure these phaser checks are necessary since the outboundObserver itself should stop blocking for onNext if the notifier is terminated.
But if the phaser is terminated, we don't want to call outboundObserver.onNext(), right?
Even if you do check here, there is still a race between checking the phaser and calling onNext. Internally, the outboundObserver already observes phaser termination via getPhase() (and its blocking also respects termination), so the extra check is just mental overhead, I think.
removed
    return;
  }
} catch (TimeoutException e) {
  if (isReadyNotifier.isTerminated()) {
I don't think you need this; awaitAdvanceInterruptibly will return a negative value if it's terminated.
done
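Both phaser points above can be seen in a small hypothetical sender: per the `java.util.concurrent.Phaser` Javadoc, `awaitAdvanceInterruptibly(phase, timeout, unit)` returns immediately with a negative phase once the phaser is terminated and throws `TimeoutException` only on a real timeout, so no separate `isTerminated()` check is needed. `PhaserGatedSenderSketch` and `GatedSendResult` are illustrative names, not the `DirectStreamObserver` implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

enum GatedSendResult { DELIVERED, TERMINATED, TIMED_OUT, INTERRUPTED }

/** Hypothetical sender that waits on a readiness Phaser before delivering a value. */
class PhaserGatedSenderSketch<T> {
  private final Phaser isReadyNotifier;
  private final List<T> delivered = new ArrayList<>(); // guarded by itself

  PhaserGatedSenderSketch(Phaser isReadyNotifier) {
    this.isReadyNotifier = isReadyNotifier;
  }

  GatedSendResult onNext(T value, int awaitPhase, long timeoutMs) {
    try {
      // A terminated phaser makes this return a negative phase immediately instead of
      // throwing TimeoutException, so no separate isTerminated() check is needed.
      int phase =
          isReadyNotifier.awaitAdvanceInterruptibly(awaitPhase, timeoutMs, TimeUnit.MILLISECONDS);
      if (phase < 0) {
        return GatedSendResult.TERMINATED;
      }
    } catch (TimeoutException e) {
      return GatedSendResult.TIMED_OUT;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return GatedSendResult.INTERRUPTED;
    }
    synchronized (delivered) {
      delivered.add(value);
    }
    return GatedSendResult.DELIVERED;
  }

  List<T> delivered() {
    synchronized (delivered) {
      return new ArrayList<>(delivered);
    }
  }
}
```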
@@ -39,7 +39,9 @@
@ThreadSafe
public final class DirectStreamObserver<T> implements StreamObserver<T> {
Maybe this change can be submitted separately? It would be nice to have a test for it showing the previous bug as well.
@@ -67,17 +70,18 @@ public <T extends GetWorkBudgetSpender> void distributeBudget(
    GetWorkBudgetSpender getWorkBudgetSpender = streamAndDesiredBudget.getKey();
    GetWorkBudget desired = streamAndDesiredBudget.getValue();
    GetWorkBudget remaining = getWorkBudgetSpender.remainingBudget();
    if (isBelowFiftyPercentOfTarget(remaining, desired)) {
    if (isBelowFiftyPercentOfTarget(remaining, desired) && isActiveWorkBudgetAware) {
This is unclear to me. It seems like in one case the adjustment is additive and in the other it is resetting.
Wouldn't we want to only increase by desired minus remaining in either case?
Whether it's overriding or additive depends on the internal implementation of GetWorkBudgetSpender.adjustBudget().
Based on the current implementation that this reflects, we do not account for the remaining budget here; it is handled internally in adjustBudget().
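A hedged sketch of the overriding semantics described in the reply: the distributor hands the full desired budget to the spender when it drops below half of the target, and the spender subtracts what is still outstanding. All classes here are hypothetical stand-ins for `GetWorkBudget`, `GetWorkBudgetSpender`, and the distributor; the method is named `setBudget`, and the integer-halving threshold is an assumption about `isBelowFiftyPercentOfTarget`.

```java
/** Hypothetical (items, bytes) budget; not Beam's GetWorkBudget. */
final class BudgetSketch {
  final long items;
  final long bytes;

  BudgetSketch(long items, long bytes) {
    this.items = items;
    this.bytes = bytes;
  }
}

/** Hypothetical spender with overriding semantics: setBudget() replaces the target. */
class BudgetSpenderSketch {
  private BudgetSketch target = new BudgetSketch(0, 0); // guarded by "this"
  private BudgetSketch outstanding = new BudgetSketch(0, 0); // guarded by "this"

  synchronized void setBudget(BudgetSketch newTarget) {
    target = newTarget;
  }

  synchronized void recordOutstanding(BudgetSketch stillOutstanding) {
    outstanding = stillOutstanding;
  }

  synchronized BudgetSketch remainingBudget() {
    return outstanding;
  }

  /** The spender, not the distributor, subtracts what is still outstanding. */
  synchronized BudgetSketch nextRequest() {
    return new BudgetSketch(
        Math.max(0, target.items - outstanding.items), Math.max(0, target.bytes - outstanding.bytes));
  }
}

final class BudgetDistributorSketch {
  static boolean isBelowFiftyPercentOfTarget(BudgetSketch remaining, BudgetSketch desired) {
    return remaining.items < desired.items / 2 || remaining.bytes < desired.bytes / 2;
  }

  /** Hands the full desired budget to the spender when it runs low; no delta arithmetic here. */
  static void maybeRefill(BudgetSpenderSketch spender, BudgetSketch desired) {
    if (isBelowFiftyPercentOfTarget(spender.remainingBudget(), desired)) {
      spender.setBudget(desired);
    }
  }
}
```

With this split, the "desired minus remaining" arithmetic the reviewer asks about lives in one place (the spender), whichever caller triggers the refill.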
4820b9f to d03b0bc
Didn't get through everything, but sending the comments I had.
    + "<th>Active For</th>"
    + "<th>State</th>"
    + "<th>State Active For</th>"
    + "<th>Produced By</th>"
nit: "Produced" is confusing since we use that term for shuffle. How about just "Backend"?
done
getWorkBudgetRefresher.requestBudgetRefresh();
getWorkBudgetDistributor.distributeBudget(newStreams.values(), totalGetWorkBudget);

// Close the streams outside the lock.
Yeah, I don't know if we need to block on closing streams. I think we might want to block on creating the new ones.
One possible issue is that if we are leaking stuff and the streams never actually close, we might not know. But with the stream registry I think we'd see that. Or we can add some logging to closeAllStreams when it is taking a long time.
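One way to get both behaviors discussed above, sketched under assumptions: closes run on a separate executor so the caller does not block, and a watchdog logs if they have not all finished after a threshold, which would hint at leaked streams. `AsyncStreamCloserSketch` is hypothetical, streams are modeled as `AutoCloseable`, and `java.util.logging` stands in for whatever logger the worker actually uses.

```java
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;

/** Hypothetical non-blocking closer that warns when closing takes suspiciously long. */
class AsyncStreamCloserSketch {
  private static final Logger LOG = Logger.getLogger(AsyncStreamCloserSketch.class.getName());

  /** Starts closing every stream off the caller's thread; the future completes when all close. */
  static CompletableFuture<Void> closeAllStreams(
      Collection<? extends AutoCloseable> streams,
      Executor closeExecutor,
      ScheduledExecutorService watchdog,
      long warnAfterMs) {
    CompletableFuture<?>[] closes =
        streams.stream()
            .map(stream -> CompletableFuture.runAsync(() -> closeQuietly(stream), closeExecutor))
            .toArray(CompletableFuture<?>[]::new);
    CompletableFuture<Void> allClosed = CompletableFuture.allOf(closes);
    // Only a warning if closing is slow; the caller is never blocked.
    watchdog.schedule(
        () -> {
          if (!allClosed.isDone()) {
            LOG.warning(
                "Closing " + closes.length + " streams took over " + warnAfterMs + "ms; possible leak.");
          }
        },
        warnAfterMs,
        TimeUnit.MILLISECONDS);
    return allClosed;
  }

  private static void closeQuietly(AutoCloseable stream) {
    try {
      stream.close();
    } catch (Exception e) {
      LOG.warning("Failed to close stream: " + e);
    }
  }
}
```

Callers that do want to wait, for example in tests, can still `join()` the returned future.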
clientClosed.set(true);
requestObserver().onCompleted();
requestObserver.onCompleted();
Why is it safe not to synchronize? The comment indicated it was there for a reason. In general, I think we need to synchronize use of the request observer because gRPC doesn't expect it to be used simultaneously by multiple threads.
Let me try to run a pipeline with synchronization.
Previously we were relying on this to shut down, so we had removed synchronization, but since we aren't anymore it could be OK.
ran ok in the test pipeline
done.
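The grpc-java documentation states that a single `StreamObserver` is not thread-safe, so concurrent writers must synchronize their calls. A minimal sketch of that rule: a wrapper that serializes every call on one monitor. `ObserverSketch` is a local stand-in with the same three methods as `io.grpc.stub.StreamObserver`, used only to keep the example free of the gRPC dependency.

```java
/** Local stand-in for io.grpc.stub.StreamObserver so the sketch has no gRPC dependency. */
interface ObserverSketch<T> {
  void onNext(T value);

  void onError(Throwable t);

  void onCompleted();
}

/** Serializes all calls to the delegate, which must not be called by two threads at once. */
final class SynchronizedObserverSketch<T> implements ObserverSketch<T> {
  private final ObserverSketch<T> delegate;

  SynchronizedObserverSketch(ObserverSketch<T> delegate) {
    this.delegate = delegate;
  }

  @Override
  public synchronized void onNext(T value) {
    delegate.onNext(value);
  }

  @Override
  public synchronized void onError(Throwable t) {
    delegate.onError(t);
  }

  @Override
  public synchronized void onCompleted() {
    delegate.onCompleted();
  }
}
```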
private final Supplier<StreamObserver<RequestT>> requestObserverSupplier;

@GuardedBy("this")
private volatile @Nullable StreamObserver<RequestT> delegateRequestObserver;
You don't need both: synchronization also ensures that memory changes are visible to other threads. Volatile only ensures that a single field's changes are visible to other threads.
This seems like a good overview: https://blogs.oracle.com/javamagazine/post/java-thread-synchronization-volatile-final-atomic-deadlocks
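A sketch of the point above: when every read and write of a field happens inside `synchronized` on the same monitor, the lock already provides visibility, so `volatile` adds nothing. `ResettableSinkSketch` lazily creates and resets its delegate without `volatile`; it and `RequestSinkSketch` are hypothetical, loosely modeled on the supplier-plus-delegate fields in the diff.

```java
import java.util.function.Supplier;

/** Hypothetical minimal request sink. */
interface RequestSinkSketch<T> {
  void onNext(T value);

  void onCompleted();
}

/** Lazily (re)creates its delegate; all access holds "this", so the field is not volatile. */
final class ResettableSinkSketch<T> implements RequestSinkSketch<T> {
  private final Supplier<RequestSinkSketch<T>> delegateSupplier;
  // Guarded by "this": synchronized gives both mutual exclusion and visibility.
  private RequestSinkSketch<T> delegate;

  ResettableSinkSketch(Supplier<RequestSinkSketch<T>> delegateSupplier) {
    this.delegateSupplier = delegateSupplier;
  }

  private synchronized RequestSinkSketch<T> delegate() {
    if (delegate == null) {
      delegate = delegateSupplier.get();
    }
    return delegate;
  }

  synchronized void reset() {
    delegate = null;
  }

  @Override
  public synchronized void onNext(T value) {
    delegate().onNext(value);
  }

  @Override
  public synchronized void onCompleted() {
    delegate().onCompleted();
  }
}
```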
}

@Override
public synchronized void onCompleted() {
rm synchronized?
done
}
streamRegistry.stream()
    .sorted(Comparator.comparing(AbstractWindmillStream::backendWorkerToken))
    .collect(
I missed the multimap bit. I agree the grouping is nice. However isn't the sorting then unnecessary if we're putting it in a map right away?
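To illustrate why the pre-sort may be redundant: if the grouping collector builds a key-ordered map, the keys come out sorted regardless of input order. This sketch uses the JDK's `Collectors.groupingBy` with `TreeMap::new` in place of the Guava multimap the PR uses; `StreamInfoSketch` and its fields are hypothetical. Values within a group keep encounter order, so a sort would still matter only if the order inside each group is significant.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

/** Hypothetical stream descriptor with a backend token and a display name. */
final class StreamInfoSketch {
  final String backendWorkerToken;
  final String name;

  StreamInfoSketch(String backendWorkerToken, String name) {
    this.backendWorkerToken = backendWorkerToken;
    this.name = name;
  }
}

final class StreamGroupingSketch {
  /** Groups stream names by backend token; TreeMap orders the keys, so no sorted() step. */
  static Map<String, List<String>> groupByBackend(List<StreamInfoSketch> streams) {
    return streams.stream()
        .collect(
            Collectors.groupingBy(
                (StreamInfoSketch s) -> s.backendWorkerToken,
                TreeMap::new,
                Collectors.mapping((StreamInfoSketch s) -> s.name, Collectors.toList())));
  }
}
```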
@@ -264,24 +284,18 @@ protected void startThrottleTimer() {

@Override
public void adjustBudget(long itemsDelta, long bytesDelta) {
can we name this setBudget?
done
} catch (AppendableInputStream.InvalidInputStreamStateException
    | VerifyException
    | CancellationException e) {
  handleShutdown(request);
Can we make shutdown handling consistent?
We throw an exception for shutdown here, but if we don't run the loop due to isShutdown, we just return from this function without doing anything.
we do throw a WindmillStreamShutdownException at the bottom if we exit the loop
private void handleShutdown(QueuedRequest request) {
  if (isShutdown()) {
    throw new WindmillStreamShutdownException(
        "Cannot send request=[" + request + "] on closed stream.");
if we keep this, should we pass in the exception above to add as a suppressed exception?
done
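A hedged sketch tying this thread together: both shutdown paths (failure while shut down, or never entering the loop) end in the same exception type, and `handleShutdown` attaches the triggering failure via `Throwable.addSuppressed`. `ShutdownAwareSenderSketch`, `StreamClosedExceptionSketch`, and the `Consumer<String>` transport are hypothetical; only the message text is taken from the diff above.

```java
import java.util.function.Consumer;

/** Hypothetical stand-in for a stream-shutdown exception. */
class StreamClosedExceptionSketch extends RuntimeException {
  StreamClosedExceptionSketch(String message) {
    super(message);
  }
}

/** Hypothetical request loop with one shutdown behavior: always throw. */
class ShutdownAwareSenderSketch {
  private volatile boolean shutdown = false;

  void shutdown() {
    shutdown = true;
  }

  /** Retries until sent; throws StreamClosedExceptionSketch whenever the stream is shut down. */
  void issueRequest(String request, Consumer<String> transport) {
    while (!shutdown) {
      try {
        transport.accept(request);
        return;
      } catch (IllegalStateException e) {
        handleShutdown(request, e);
        // Not shut down: treat the failure as transient and retry.
      }
    }
    // Leaving the loop also means shutdown, so both paths surface the same exception type.
    throw new StreamClosedExceptionSketch("Cannot send request=[" + request + "] on closed stream.");
  }

  private void handleShutdown(String request, Throwable cause) {
    if (shutdown) {
      StreamClosedExceptionSketch e =
          new StreamClosedExceptionSketch("Cannot send request=[" + request + "] on closed stream.");
      e.addSuppressed(cause); // Keep the original failure for debugging.
      throw e;
    }
  }
}
```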
  }
}

ImmutableList<String> createStreamCancelledErrorMessage() {
Add a parameter for the limit, or just inline it above? We're building up a possibly big list just to ignore most of it.
done
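A sketch of the limit-parameter option: `Stream.limit()` is lazy, so only the first `maxEntries` pending requests are ever formatted. The map of request ids to descriptions, the message format, and `CancelledMessageSketch` are assumptions; the real method's inputs are not shown in the thread, and it returns a Guava `ImmutableList` rather than a JDK `List`.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical builder for a bounded list of cancellation messages. */
final class CancelledMessageSketch {
  /** Formats at most maxEntries pending requests; limit() stops the stream before the rest. */
  static List<String> createStreamCancelledErrorMessage(
      Map<Long, String> pendingRequests, int maxEntries) {
    return pendingRequests.entrySet().stream()
        .limit(maxEntries)
        .map(entry -> "Request id=" + entry.getKey() + " cancelled: " + entry.getValue())
        .collect(Collectors.toList());
  }
}
```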
ade21ba to d95d1b5
… metric tracking.
… Future proofs this logic for direct path.
… have it implement WorkProvider interface. Move class to windmill/work/provider directory, update visibility for dependent classes and move tests, add GetWorkBudgetOwnerInterface
d95d1b5 to 9bad2d7
@scwhittle ready for another look! I will resolve the merge conflicts and rebase.
Add correct shutdown mechanics and integrate direct path
R: @scwhittle