Add correct shutdown mechanics for direct path #31902

Open
wants to merge 20 commits into
base: master

Conversation

Contributor

@m-trieu m-trieu commented Jul 16, 2024

Add correct shutdown mechanics and integrate direct path

R: @scwhittle

@m-trieu m-trieu changed the title from "Mt dp streams" to "Add correct shutdown mechanics for direct path" Jul 16, 2024
Contributor

Assigning reviewers. If you would like to opt out of this review, comment assign to next reviewer:

R: @shunping added as fallback since no labels match configuration

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

@Abacn
Contributor

Abacn commented Jul 22, 2024

R: @scwhittle

Contributor

Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control. If you'd like to restart, comment assign set of reviewers

@m-trieu m-trieu force-pushed the mt-dp-streams branch 3 times, most recently from 01e8586 to ff287f2 Compare August 6, 2024 12:25
Contributor

@scwhittle scwhittle left a comment

Not finished, mostly looking at stream stuff to help find what is causing stuckness

/** Send a request to the server. */
protected final void send(RequestT request) {
lastSendTimeMs.set(Instant.now().getMillis());
synchronized (this) {
// Check if we should send after we acquire the lock.
if (isShutdown()) {
LOG.warn("Send called on a shutdown stream.");
Contributor

if this is possible, don't log as customers don't like warning logs and open issues about them

if this should not be possible, perhaps better to throw an exception so that we notice and fix it.

Contributor Author

done

}
}

/** Starts the underlying stream. */
protected final void startStream() {
// Add the stream to the registry after it has been fully constructed.
streamRegistry.add(this);
- while (true) {
+ while (!isShutdown.get()) {
Contributor

I'd just remove this check since you do it first thing below

Contributor Author

done

return;
}
private void tryRestartStream() {
if (!isShutdown()) {
Contributor

this looks racy, we check shutdown above in isStreamDone with synchronization, but then if it is shutdown before here, we end up with an error but won't restart the stream or remove it from the registry.

I would remove this one

Contributor Author

done

setLastError(error);
private synchronized boolean isStreamDone() {
if (isShutdown() || (clientClosed.get() && !hasPendingRequests())) {
streamRegistry.remove(AbstractWindmillStream.this);
Contributor

I think this kind of side effect is confusing in method that just sounds like an accessor

how about maybeTeardownStream()?

Contributor Author

done

Contributor

Add a comment on return value

Contributor Author

done

@@ -176,7 +180,7 @@ private StreamingDataflowWorker(
DataflowWorkerHarnessOptions options,
HotKeyLogger hotKeyLogger,
Supplier<Instant> clock,
- StreamingWorkerStatusReporter workerStatusReporter,
+ Function<Supplier<Long>, StreamingWorkerStatusReporter> streamingWorkerStatusReporterFactory,
Contributor

should this be a functional interface so you can document? it's unclear what the long supplier is

Contributor Author

done
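
One way to act on the suggestion above is a small named interface whose Javadoc says what the supplier provides. The sketch below is only illustrative: `StatusReporter`, `StatusReporterFactory`, and the parameter name `valueSupplier` are hypothetical, and this thread does not say what the `Supplier<Long>` actually measures.

```java
import java.util.function.Supplier;

/** Hypothetical stand-in for StreamingWorkerStatusReporter. */
final class StatusReporter {
  private final Supplier<Long> valueSupplier;

  StatusReporter(Supplier<Long> valueSupplier) {
    this.valueSupplier = valueSupplier;
  }

  long currentValue() {
    return valueSupplier.get();
  }
}

/** Named replacement for Function<Supplier<Long>, StatusReporter>. */
@FunctionalInterface
interface StatusReporterFactory {
  /**
   * @param valueSupplier supplies the long value the reporter reads. Its real meaning is not
   *     stated in this thread (assumption), so the actual code should document it here.
   */
  StatusReporter create(Supplier<Long> valueSupplier);
}

final class StatusReporterFactoryDemo {
  public static void main(String[] args) {
    // A constructor reference satisfies the named interface just as it would a Function.
    StatusReporterFactory factory = StatusReporter::new;
    System.out.println(factory.create(() -> 42L).currentValue());
  }
}
```

Call sites keep the same lambda or method-reference syntax; the gain is a place to hang documentation and a parameter name.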

() -> {
try {
send(extension);
} catch (IllegalStateException e) {
Contributor

should we handle IllegalStateException internally? or rely on executeSafely to catch it?

Contributor Author

done

Contributor Author

done

@@ -182,6 +184,12 @@ protected void onResponse(StreamingCommitResponse response) {
}
}

@Override
protected void shutdownInternal() {
pending.values().forEach(pendingRequest -> pendingRequest.onDone.accept(CommitStatus.ABORTED));
Contributor

I'm worried if pending has something inserted between iterating and clear

can you instead use an iterator where you remove as you go so everything removed is guaranteed to be aborted?

Contributor Author

done
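
A minimal sketch of the remove-as-you-go idea from this thread, assuming `pending` is a concurrent map from request id to a completion callback. `PendingCommits`, `abortAll`, and the local `CommitStatus` enum are hypothetical stand-ins, not the PR's classes.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;

final class PendingCommits {
  /** Hypothetical stand-in for the CommitStatus enum used in the snippet above. */
  enum CommitStatus {
    OK,
    ABORTED
  }

  /** Removes each pending entry and aborts it; returns how many entries were aborted. */
  static int abortAll(ConcurrentMap<Long, Consumer<CommitStatus>> pending) {
    int aborted = 0;
    Iterator<Map.Entry<Long, Consumer<CommitStatus>>> it = pending.entrySet().iterator();
    while (it.hasNext()) {
      Consumer<CommitStatus> onDone = it.next().getValue();
      // Remove first, then abort: only entries this loop visited are dropped, so an entry
      // inserted afterwards is never cleared without its callback running.
      it.remove();
      onDone.accept(CommitStatus.ABORTED);
      aborted++;
    }
    return aborted;
  }

  public static void main(String[] args) {
    ConcurrentMap<Long, Consumer<CommitStatus>> pending = new ConcurrentHashMap<>();
    pending.put(1L, status -> System.out.println("request 1: " + status));
    pending.put(2L, status -> System.out.println("request 2: " + status));
    System.out.println("aborted=" + abortAll(pending) + ", remaining=" + pending.size());
  }
}
```

Entries added while the loop runs may or may not be visited, because concurrent map iterators are weakly consistent. Anything the loop misses stays in the map rather than being silently cleared, so the caller still needs a rule for late inserts, such as rejecting them once shutdown is set.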

@@ -163,7 +165,7 @@ protected void onResponse(StreamingCommitResponse response) {
continue;
}
PendingRequest done = pending.remove(requestId);
- if (done == null) {
+ if (done == null && !isShutdown()) {
Contributor

this is going to get a NullPointerException below; instead, move the isShutdown check inside this if so it only controls whether or not to log

Contributor Author

done
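
The shape the reviewer describes can be sketched as follows. `ResponseHandling`, its `onResponse` signature, and the log text are hypothetical; the point is only that the null check always guards the dereference, while the shutdown flag decides nothing but logging.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class ResponseHandling {
  /** Completes pending requests by id and returns the log lines that would be emitted. */
  static List<String> onResponse(
      Map<Long, Runnable> pending, List<Long> requestIds, boolean isShutdown) {
    List<String> logLines = new ArrayList<>();
    for (Long requestId : requestIds) {
      Runnable done = pending.remove(requestId);
      if (done == null) {
        // Missing entries are expected after shutdown, so stay quiet in that case.
        if (!isShutdown) {
          logLines.add("Got unknown commit request ID: " + requestId);
        }
        // Skip unconditionally so the call below never sees null.
        continue;
      }
      done.run();
    }
    return logLines;
  }

  public static void main(String[] args) {
    Map<Long, Runnable> pending = new HashMap<>();
    pending.put(1L, () -> System.out.println("request 1 completed"));
    System.out.println(onResponse(pending, Arrays.asList(1L, 2L), true));
  }
}
```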

@m-trieu m-trieu force-pushed the mt-dp-streams branch 2 times, most recently from bbf2778 to 1e02463 Compare September 12, 2024 05:06
@@ -302,4 +301,12 @@ public Integer create(PipelineOptions options) {
return streamingOptions.isEnableStreamingEngine() ? Integer.MAX_VALUE : 1;
}
}

/** EnableStreamingEngine defaults to false unless one of the two experiments is set. */
Contributor

looks like just 1 experiment

Contributor Author

done

* Only send the next value if the phaser is not terminated by the time we acquire the lock since
* the phaser can be terminated at any time.
*/
private void tryOnNext(T value) {
Contributor

not sure these phaser checks are necessary since the outboundObserver itself should stop blocking for onNext if the notifier is terminated.

Contributor Author

but if the phaser is terminated, we don't want to call outboundObserver.onNext() right?

Contributor

Even if you do check here there is still race between checking the phaser and calling onNext regardless. Internally the outboundObserver is already observing the phaser termination via getPhase() (and also blocking respecting phaser termination) so the extra check is just mental overhead I think.

Contributor Author

removed

return;
}
} catch (TimeoutException e) {
if (isReadyNotifier.isTerminated()) {
Contributor

don't think you need this, awaitAdvanceInterruptibly will return a negative value if it's terminated.

Contributor Author

done
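
The `java.util.concurrent.Phaser` behavior this thread relies on can be checked in isolation. Per the Javadoc, `awaitAdvanceInterruptibly` returns the (negative) current phase once the phaser is terminated, so testing the return value for `< 0` is enough and a separate `isTerminated()` check in the timeout handler is not needed. The class and method names below are hypothetical.

```java
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class PhaserTerminationDemo {
  /** Waits on an already terminated phaser and returns what the wait call reports. */
  static int awaitOnTerminatedPhaser() {
    Phaser phaser = new Phaser(1);
    int phase = phaser.getPhase();
    phaser.forceTermination();
    try {
      // Returns immediately with a negative value instead of waiting for the timeout.
      return phaser.awaitAdvanceInterruptibly(phase, 1, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } catch (TimeoutException e) {
      throw new IllegalStateException("unexpected timeout on a terminated phaser", e);
    }
  }

  public static void main(String[] args) {
    System.out.println("terminated: " + (awaitOnTerminatedPhaser() < 0));
  }
}
```

The returned value is negative but not necessarily exactly -1, so comparing against `< 0` is the safer test.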

@@ -39,7 +39,9 @@
@ThreadSafe
public final class DirectStreamObserver<T> implements StreamObserver<T> {
Contributor

maybe this change can be submitted separately? Would be nice to have a test for it showing the previous bug as well.

@@ -67,17 +70,18 @@ public <T extends GetWorkBudgetSpender> void distributeBudget(
GetWorkBudgetSpender getWorkBudgetSpender = streamAndDesiredBudget.getKey();
GetWorkBudget desired = streamAndDesiredBudget.getValue();
GetWorkBudget remaining = getWorkBudgetSpender.remainingBudget();
- if (isBelowFiftyPercentOfTarget(remaining, desired)) {
+ if (isBelowFiftyPercentOfTarget(remaining, desired) && isActiveWorkBudgetAware) {
Contributor

this is unclear to me. It seems like in one case the adjustment is additive and in another it is resetting

Wouldn't we want to only increase by the desired-remaining in either case?

Contributor Author

whether it's overriding or additive depends on the internal implementation of GetWorkBudgetSpender.adjustBudget()

based on the current implementation that this reflects, we do not account for remaining budget here; it is handled internally in adjustBudget.

@m-trieu m-trieu force-pushed the mt-dp-streams branch 2 times, most recently from 4820b9f to d03b0bc Compare September 13, 2024 05:19
Contributor

@scwhittle scwhittle left a comment

didn't get through everything but sending comments I had

+ "<th>Active For</th>"
+ "<th>State</th>"
+ "<th>State Active For</th>"
+ "<th>Produced By</th>"
Contributor

nit: produced is confusing since we use that for shuffle terminology. How about just "Backend"

Contributor Author

done

getWorkBudgetRefresher.requestBudgetRefresh();
getWorkBudgetDistributor.distributeBudget(newStreams.values(), totalGetWorkBudget);

// Close the streams outside the lock.
Contributor

yeah I don't know if we need to block on closing streams. I think we might want to block on creating the new ones.

one possible thing is that if we are leaking stuff and they never actually close we might not know. But if we have the stream registry I think we'd see that. Or we can add some logging to closeAllStreams that it is taking a long time.

clientClosed.set(true);
- requestObserver().onCompleted();
+ requestObserver.onCompleted();
Contributor

why is it safe to not synchronize? The comment indicated it was for a reason. In general I think that we need to synchronize using the request observer because grpc doesn't expect it to be used simultaneously by multiple threads.

Contributor Author

let me try to run a pipeline with synchronization

previously we were relying on this to shut down, so we had removed synchronization, but since we aren't anymore it could be ok

Contributor Author

ran ok in the test pipeline

done.
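
For context on the synchronization question above: gRPC documents that individual `StreamObserver` instances are not thread-safe, so concurrent writers have to serialize their calls. The sketch below shows that pattern with a hypothetical `Observer` interface standing in for `io.grpc.stub.StreamObserver`; `SerializedSender` and its methods are illustrative names, not the PR's code.

```java
import java.util.ArrayList;
import java.util.List;

final class SerializedSender<T> {
  /** Hypothetical minimal stand-in for io.grpc.stub.StreamObserver. */
  interface Observer<V> {
    void onNext(V value);

    void onCompleted();
  }

  private final Observer<T> requestObserver;
  private boolean clientClosed; // guarded by "this"

  SerializedSender(Observer<T> requestObserver) {
    this.requestObserver = requestObserver;
  }

  /** Sends a value unless the stream was already half-closed; returns whether it was sent. */
  synchronized boolean send(T value) {
    if (clientClosed) {
      return false;
    }
    requestObserver.onNext(value);
    return true;
  }

  /** Half-closes at most once, under the same lock that protects onNext. */
  synchronized void halfClose() {
    if (!clientClosed) {
      clientClosed = true;
      requestObserver.onCompleted();
    }
  }

  public static void main(String[] args) {
    List<String> events = new ArrayList<>();
    SerializedSender<String> sender =
        new SerializedSender<>(
            new Observer<String>() {
              @Override
              public void onNext(String value) {
                events.add("next:" + value);
              }

              @Override
              public void onCompleted() {
                events.add("completed");
              }
            });
    sender.send("a");
    sender.halfClose();
    sender.send("b"); // dropped: the stream is already half-closed
    System.out.println(events);
  }
}
```

Because `send` and `halfClose` share one lock, `onCompleted` can never interleave with an in-flight `onNext`. The cost is that a blocked `onNext` also blocks closing, which is the trade-off the thread is weighing.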

private final Supplier<StreamObserver<RequestT>> requestObserverSupplier;

@GuardedBy("this")
private volatile @Nullable StreamObserver<RequestT> delegateRequestObserver;
Contributor

You don't need both; synchronization also ensures that memory changes are visible to other threads. Volatile just ensures one field's changes are visible to other threads.

This seems like a good overview: https://blogs.oracle.com/javamagazine/post/java-thread-synchronization-volatile-final-atomic-deadlocks
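
A minimal sketch of the point above, assuming every access to the field goes through a method that holds the same lock. `DelegateHolder` is a hypothetical class, and a plain comment stands in for the `@GuardedBy` annotation to keep the example dependency-free.

```java
final class DelegateHolder<T> {
  // Guarded by "this": every read and write happens while holding the monitor, which already
  // makes writes visible to later readers, so the field is not also declared volatile.
  private T delegate;

  synchronized void set(T newDelegate) {
    delegate = newDelegate;
  }

  synchronized T get() {
    return delegate;
  }

  public static void main(String[] args) throws InterruptedException {
    DelegateHolder<String> holder = new DelegateHolder<>();
    Thread writer = new Thread(() -> holder.set("ready"));
    writer.start();
    writer.join();
    System.out.println(holder.get());
  }
}
```

The guarantee only holds if no code path touches the field without the lock. A single unsynchronized read would need either the lock or `volatile`.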

}

@Override
public synchronized void onCompleted() {
Contributor

rm synchronized?

Contributor Author

done

}
streamRegistry.stream()
.sorted(Comparator.comparing(AbstractWindmillStream::backendWorkerToken))
.collect(
Contributor

I missed the multimap bit. I agree the grouping is nice. However isn't the sorting then unnecessary if we're putting it in a map right away?
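
If ordered keys are the goal, one option is to let a sorted map do the ordering during collection instead of sorting the stream first. This is a sketch under assumptions: the entries are simple (backendWorkerToken, streamName) pairs, and `StreamGrouping.groupByToken` is a hypothetical helper. Whether the PR's multimap preserves encounter order is not visible in this thread.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

final class StreamGrouping {
  /** Groups stream names by backend worker token; the TreeMap keeps tokens ordered. */
  static TreeMap<String, List<String>> groupByToken(List<Map.Entry<String, String>> streams) {
    return streams.stream()
        .collect(
            Collectors.groupingBy(
                entry -> entry.getKey(),
                TreeMap::new,
                Collectors.mapping(entry -> entry.getValue(), Collectors.toList())));
  }

  public static void main(String[] args) {
    List<Map.Entry<String, String>> streams = new ArrayList<>();
    streams.add(new AbstractMap.SimpleEntry<>("token-b", "getWork"));
    streams.add(new AbstractMap.SimpleEntry<>("token-a", "getData"));
    streams.add(new AbstractMap.SimpleEntry<>("token-b", "commitWork"));
    System.out.println(groupByToken(streams));
  }
}
```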

@@ -264,24 +284,18 @@ protected void startThrottleTimer() {

@Override
public void adjustBudget(long itemsDelta, long bytesDelta) {
Contributor

can we name this setBudget?

Contributor Author

done

} catch (AppendableInputStream.InvalidInputStreamStateException
| VerifyException
| CancellationException e) {
handleShutdown(request);
Contributor

Can we make shutdown handling consistent?
we throw an exception for shutdown here but if we don't run the loop due to isShutdown we just return from this function without doing anything.

Contributor Author

we do throw a WindmillStreamShutdownException at the bottom if we exit the loop

private void handleShutdown(QueuedRequest request) {
if (isShutdown()) {
throw new WindmillStreamShutdownException(
"Cannot send request=[" + request + "] on closed stream.");
Contributor

if we keep this, should we pass in the exception above to add as a suppressed exception?

Contributor Author

done
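
Attaching the original failure as a suppressed exception could look roughly like this. `StreamShutdownException` is a hypothetical stand-in for `WindmillStreamShutdownException`, and `shutdownException` is an illustrative helper; only `Throwable.addSuppressed` is a real JDK API here.

```java
final class ShutdownErrors {
  /** Hypothetical stand-in for WindmillStreamShutdownException. */
  static final class StreamShutdownException extends RuntimeException {
    StreamShutdownException(String message) {
      super(message);
    }
  }

  /** Builds the shutdown exception and keeps the triggering failure attached to it. */
  static StreamShutdownException shutdownException(String request, Throwable original) {
    StreamShutdownException shutdown =
        new StreamShutdownException("Cannot send request=[" + request + "] on closed stream.");
    // The original failure shows up under "Suppressed:" in the stack trace.
    shutdown.addSuppressed(original);
    return shutdown;
  }

  public static void main(String[] args) {
    RuntimeException original = new IllegalStateException("stream cancelled");
    StreamShutdownException e = shutdownException("GetData#1", original);
    System.out.println(e.getMessage() + " suppressed=" + e.getSuppressed().length);
  }
}
```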

}
}

ImmutableList<String> createStreamCancelledErrorMessage() {
Contributor

add parameter for the limit or just inline above? We're building up a possibly big list just to ignore most of it.

Contributor Author

done
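
The limit-parameter variant suggested above might be sketched like this, so that no more messages are built than will be reported. The class name, the `pendingRequests` input, and the message text are hypothetical; the PR's method returns an `ImmutableList`, while a plain `List` is used here to avoid the Guava dependency.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

final class CancelledMessages {
  /** Builds at most {@code limit} messages instead of formatting every pending request. */
  static List<String> createStreamCancelledErrorMessages(List<String> pendingRequests, int limit) {
    return pendingRequests.stream()
        .limit(limit) // applied before map, so skipped requests are never formatted
        .map(request -> "Request cancelled: " + request)
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<String> pending = Arrays.asList("r1", "r2", "r3");
    System.out.println(createStreamCancelledErrorMessages(pending, 2));
  }
}
```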

@m-trieu m-trieu force-pushed the mt-dp-streams branch 2 times, most recently from ade21ba to d95d1b5 Compare September 19, 2024 22:11
… have it implement WorkProvider interface. Move class to windmill/work/provider directory, update visibility for dependent classes and move tests, add GetWorkBudgetOwnerInterface
@m-trieu
Contributor Author

m-trieu commented Sep 20, 2024

@scwhittle ready for another look! I will resolve the merge conflicts and rebase

3 participants