Fix Possible Congestion Scenario in SortPreservingMergeExec #12302

Merged: 13 commits, Sep 6, 2024

Conversation

berkaysynnada
Contributor

Which issue does this PR close?

Closes #12300.

Rationale for this change

Please see the issue.

What changes are included in this PR?

Are these changes tested?

Are there any user-facing changes?

Update merge_fuzz.rs

Update merge.rs

Update merge.rs

Update merge.rs

Update merge_fuzz.rs

Update merge.rs

Update merge_fuzz.rs

Update merge.rs

Counter is not enough

Termination logic with counter
@github-actions github-actions bot added the physical-expr (Physical Expressions) and core (Core DataFusion crate) labels on Sep 3, 2024
}
Poll::Pending => {
self.uninitiated_partitions.rotate_left(1);
cx.waker().wake_by_ref();
Contributor Author

I am not sure whether this usage has side effects or hurts performance, but otherwise I cannot wake the SPM poll again once it receives a Pending from its first partition.

Contributor

I did some research -- see https://github.com/synnada-ai/datafusion-upstream/pull/34/files#r1743621057

I think calling wake_by_ref effectively tells tokio to schedule this poll loop again after handling other tasks, which makes sense to me (as I am not sure how else we would signal to tokio that the merge is ready to go)

But I share your concern that this will cause some sort of performance issue
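
A minimal sketch of the self-wake pattern discussed in this thread, using only the standard library rather than tokio or DataFusion. `CountingWaker`, `poll_first_uninitiated`, and `demo_pending_poll` are hypothetical names for illustration, and `input_ready` stands in for real input streams; this is not the code from the PR. It shows that when the first queued partition is not ready, the queue is rotated and the waker is notified once, so an executor would schedule another poll:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical waker that counts wake-ups instead of scheduling a task.
struct CountingWaker(AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

/// Toy stand-in for the initialization step. `input_ready[i]` says whether
/// partition `i` would return a batch right now (an assumption made for the
/// sketch). Expects a non-empty `uninitiated` queue.
fn poll_first_uninitiated(
    uninitiated: &mut Vec<usize>,
    input_ready: &[bool],
    cx: &mut Context<'_>,
) -> Poll<usize> {
    let idx = uninitiated[0];
    if input_ready[idx] {
        uninitiated.remove(0);
        Poll::Ready(idx)
    } else {
        // Move the stalled partition to the back and ask to be polled again,
        // so the other partitions get a chance on the next poll.
        uninitiated.rotate_left(1);
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

/// Runs a single poll and reports (result, wake count, remaining queue).
fn demo_pending_poll() -> (Poll<usize>, usize, Vec<usize>) {
    let counter = Arc::new(CountingWaker(AtomicUsize::new(0)));
    let waker = Waker::from(counter.clone());
    let mut cx = Context::from_waker(&waker);
    let mut uninitiated = vec![0, 1, 2];
    let result = poll_first_uninitiated(&mut uninitiated, &[false, true, true], &mut cx);
    (result, counter.0.load(Ordering::SeqCst), uninitiated)
}
```

In a real executor the wake would re-enqueue the task. If no input ever becomes ready, this pattern polls repeatedly instead of sleeping, which is the performance concern raised above.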

@berkaysynnada
Contributor Author

@alamb, you might be more familiar with these parts of the code. Do you have any ideas about this solution, or perhaps you could suggest a different approach?

Contributor

@alamb alamb left a comment

Thanks @berkaysynnada -- it seems to me that the intent of the current code is to poll all the streams, but after this change the streams are only polled until they first return Ready.

I left a suggestion for an approach that may be closer to the original intent.

@@ -97,6 +100,10 @@ pub(crate) struct SortPreservingMergeStream<C: CursorValues> {

/// number of rows produced
produced: usize,

/// Unitiated partitions. They are stored in a vector to keep them in
Contributor

Can you please document what an "uninitiated partition" means in this context? I think it means partitions whose streams have been polled but have not yet returned Ready.

datafusion/physical-plan/src/sorts/merge.rs
if let Err(e) = ready!(self.maybe_poll_stream(cx, i)) {
self.aborted = true;
return Poll::Ready(Some(Err(e)));
// Ensure all non-exhausted streams have a cursor from which rows can be pulled
Contributor

This comment implies to me that the code would / should poll all the streams. However, the code seems to ensure now that only streams that had previously not returned Ready for a poll are now polled.

Contributor Author

@berkaysynnada berkaysynnada Sep 4, 2024

IMO, the behavior is more correct now. In the previous version, assume the 1st partition is exhausted and returns None without setting its cursor, and then the 2nd partition returns Pending. When poll_next_inner() is polled again, the iteration restarts from the 1st partition, which has already returned None (AFAIK, polling exhausted streams could cause problems). Therefore, I track which streams have returned a result (either None or Some()) and which have only returned Pending.
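
The tracking idea described in this comment can be sketched roughly as follows. This is a simplified, synchronous model under stated assumptions, not the PR's implementation: `ToyPoll`, `ToyInput`, `init_pass`, and `run_until_initialized` are hypothetical names, and each input replays a scripted list of poll results instead of being a real stream. The sketch keeps a list of partitions that have only ever returned Pending and polls just those, so an input that already returned None is never polled again:

```rust
use std::collections::VecDeque;

/// Result of polling one toy input partition.
#[derive(Clone, Copy, Debug, PartialEq)]
enum ToyPoll {
    Pending,
    Batch,     // models Poll::Ready(Some(..))
    Exhausted, // models Poll::Ready(None)
}

/// Hypothetical input that replays scripted poll results and counts any
/// polls that arrive after it has reported exhaustion.
struct ToyInput {
    script: VecDeque<ToyPoll>,
    exhausted: bool,
    polls_after_exhausted: usize,
}

impl ToyInput {
    fn new(script: &[ToyPoll]) -> Self {
        Self {
            script: script.iter().copied().collect(),
            exhausted: false,
            polls_after_exhausted: 0,
        }
    }

    fn poll(&mut self) -> ToyPoll {
        if self.exhausted {
            self.polls_after_exhausted += 1;
            return ToyPoll::Exhausted;
        }
        let result = self.script.pop_front().unwrap_or(ToyPoll::Exhausted);
        if result == ToyPoll::Exhausted {
            self.exhausted = true;
        }
        result
    }
}

/// One initialization pass: poll only partitions that have not answered yet
/// and keep those that are still Pending. Returns true once every partition
/// has returned either a batch or exhaustion.
fn init_pass(inputs: &mut [ToyInput], uninitiated: &mut Vec<usize>) -> bool {
    uninitiated.retain(|&i| inputs[i].poll() == ToyPoll::Pending);
    uninitiated.is_empty()
}

/// Replays the scenario from the comment: partition 0 is exhausted at once,
/// partition 1 is Pending twice. Returns (passes, polls after exhaustion).
fn run_until_initialized() -> (usize, usize) {
    let mut inputs = vec![
        ToyInput::new(&[ToyPoll::Exhausted]),
        ToyInput::new(&[ToyPoll::Pending, ToyPoll::Pending, ToyPoll::Batch]),
        ToyInput::new(&[ToyPoll::Batch]),
    ];
    let mut uninitiated: Vec<usize> = (0..inputs.len()).collect();
    let mut passes = 0;
    loop {
        passes += 1;
        if init_pass(&mut inputs, &mut uninitiated) {
            break;
        }
    }
    let repolled = inputs.iter().map(|i| i.polls_after_exhausted).sum();
    (passes, repolled)
}
```

A loop that restarted from partition 0 on every pass would poll the exhausted input on each retry; here the counter of polls after exhaustion stays at zero.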

Contributor

see synnada-ai#34 for alternate idea

Contributor Author

I've tried to explain my concern with that: synnada-ai#34 (comment)

Contributor

👍 -- response in synnada-ai#34 (comment)

@alamb alamb changed the title from "Fix Possible Congestion Scenario in SPM" to "Fix Possible Congestion Scenario in SortPreservingMergeExec" on Sep 4, 2024
@tustvold
Contributor

tustvold commented Sep 4, 2024

I'm afraid I don't have capacity to review this, and am not likely to in the foreseeable future. However, one thing to be aware of is that SortPreservingMerge must be stable. Therefore, if the first stream is not ready, the merge must wait for it before it can proceed.
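
To illustrate the stability requirement, here is a small sketch of a stable k-way merge over in-memory vectors. It assumes that "stable" means rows with equal sort keys are emitted in input order (lower input index first); `stable_merge` is a hypothetical helper, not DataFusion code. Each step inspects the head of every non-exhausted input, which is why a merge of this kind cannot emit a row while an earlier input has not yet produced its first batch:

```rust
/// Merge inputs that are each sorted by key, breaking ties by input index.
/// Rows are (key, label) pairs; the label only makes the output order visible.
fn stable_merge(inputs: &[Vec<(i32, &'static str)>]) -> Vec<(i32, &'static str)> {
    let mut cursors = vec![0usize; inputs.len()];
    let mut out = Vec::new();
    loop {
        // Index of the input whose head row should be emitted next.
        let mut best: Option<usize> = None;
        for (i, input) in inputs.iter().enumerate() {
            if cursors[i] >= input.len() {
                continue; // this input is exhausted
            }
            let key = input[cursors[i]].0;
            // `<=` keeps the earlier input on ties, which makes the merge stable.
            let keep_current = match best {
                Some(b) => inputs[b][cursors[b]].0 <= key,
                None => false,
            };
            if !keep_current {
                best = Some(i);
            }
        }
        match best {
            Some(i) => {
                out.push(inputs[i][cursors[i]]);
                cursors[i] += 1;
            }
            None => return out, // every input is exhausted
        }
    }
}
```

For example, merging `[(1, "a0"), (2, "a1")]` with `[(1, "b0"), (3, "b1")]` emits `(1, "a0")` before `(1, "b0")` because the first input wins the tie on key 1.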

}

#[tokio::test]
async fn test_spm_congestion() -> Result<()> {
Contributor

I read this test a bit more -- it doesn't seem to actually be a fuzz test (for example, it doesn't appear to have any random inputs).

I think it would make more sense to put it with the other sort preserving merge tests:

Contributor

@alamb alamb left a comment

Per the discussion on synnada-ai#34 I am now convinced that this design is reasonable and an improvement over the current one.

In my opinion it is important for this PR to:

  1. Encode the rationale for polling streams
  2. Update the comments about "ensure all non-exhausted streams" to reflect the new intent (something like "ensure all streams have been polled at least once to start processing")

It would also be nice to:

  1. Move the tests in with the existing sort preserving merge tests (rather than fuzz)
  2. Add a new test that verifies the "doesn't poll after Ready(None) is returned" behavior, as that seems an important property that is not currently covered

Thank you for bearing with me @berkaysynnada

@berkaysynnada
Contributor Author

I’ve incorporated the final feedback. If there aren’t any more suggestions, I think we are good to merge this PR once the tests pass.

Contributor

@alamb alamb left a comment

I think the PR looks (really) good to me now -- thank you @berkaysynnada

@alamb alamb merged commit f2a8b07 into apache:main Sep 6, 2024
24 checks passed