Poll all streams when not ready #34

Closed


@alamb alamb commented Sep 4, 2024

Proposed change to apache#12302 for polling stream when not ready

This is an alternative implementation of apache#12302 for the case where an input stream is not ready.

if any_pending {
    // If any stream is not ready, return pending and tell the executor to wake us up
    // to try again when it is ready
    cx.waker().wake_by_ref();

I have found, like @berkaysynnada, that it is required to call the waker here in order to have the sort preserving merge stream polled again.

I am somewhat worried about the use of wake_by_ref, as it may result in a busy loop: I think it signals tokio to call poll again as soon as possible (likely after trying to poll each input stream).

BTW I also confirmed that the ready! macro does not invoke the wake_by_ref
https://docs.rs/futures-core/0.3.30/src/futures_core/task/poll.rs.html#5
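
To make the busy-loop concern concrete, here is a minimal std-only sketch. It is not DataFusion or tokio code: `ReadyAfter`, `CountingWaker`, and `drive` are hypothetical names, and `drive` is a toy executor that re-polls only when the waker fired during the previous poll. With `self_wake: true` the future is re-polled back to back until it is ready; with `self_wake: false` (standing in for a `Pending` that is merely propagated without anyone waking the task) it is never polled again.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that only counts how often it was woken.
struct CountingWaker {
    wakes: AtomicUsize,
}

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.wakes.fetch_add(1, Ordering::SeqCst);
    }
}

/// Hypothetical input that returns `Pending` for `remaining` polls.
struct ReadyAfter {
    remaining: usize,
    /// If true, ask for an immediate re-poll before returning `Pending`.
    self_wake: bool,
}

impl Future for ReadyAfter {
    type Output = u32;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u32> {
        if self.remaining == 0 {
            return Poll::Ready(42);
        }
        self.remaining -= 1;
        if self.self_wake {
            cx.waker().wake_by_ref();
        }
        Poll::Pending
    }
}

/// Toy executor: re-polls only if the waker fired during the last poll.
/// Returns the output (if any) and the number of polls performed.
fn drive(mut fut: ReadyAfter) -> (Option<u32>, usize) {
    let counter = Arc::new(CountingWaker { wakes: AtomicUsize::new(0) });
    let waker = Waker::from(counter.clone());
    let mut cx = Context::from_waker(&waker);
    let mut polls = 0;
    loop {
        let wakes_before = counter.wakes.load(Ordering::SeqCst);
        polls += 1;
        match Pin::new(&mut fut).poll(&mut cx) {
            Poll::Ready(value) => return (Some(value), polls),
            Poll::Pending => {
                if counter.wakes.load(Ordering::SeqCst) == wakes_before {
                    // Nobody requested a re-poll: a real executor would
                    // leave the task parked, so give up here.
                    return (None, polls);
                }
            }
        }
    }
}
```

Calling `drive(ReadyAfter { remaining: 3, self_wake: true })` polls four times in a row without ever yielding to other work, which is the busy-loop behaviour in question; the `self_wake: false` variant stops after a single poll.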

@berkaysynnada
Collaborator

Thank you for sharing your thoughts @alamb. My initial solution was actually quite similar to what you have suggested in this PR. However, there is a scenario that I believe isn't covered by the current tests:

  1. Partition 1 is polled and returns Ready(None).
  2. Partition 2 is polled and returns Pending.
  3. On the next call, Partition 1 is polled again, which seems problematic according to the "Panics" section of the Stream documentation: https://docs.rs/futures-core/latest/futures_core/stream/trait.Stream.html#panics

To prevent multiple polls on a partition that returns Ready(None), we may need to track which partitions have returned Ready(None). With this in mind, my proposal feels a bit safer. Do you think I am missing something, or would you have any suggestions on how to better handle this?

@alamb
Author

alamb commented Sep 5, 2024

> To prevent multiple polls on a partition that returns Ready(None), we may need to track which partitions have returned Ready(None). With this in mind, my proposal feels a bit safer. Do you think I am missing something, or would you have any suggestions on how to better handle this?

My reading of the current code, namely
https://github.com/apache/datafusion/blob/6034be42808b43e3f48f6e58ec38cc35fa253abb/datafusion/physical-plan/src/sorts/merge.rs#L162-L164
is that it can also call poll multiple times on a partition that has returned Ready(None).

There is also StreamExt::fuse (https://docs.rs/futures/latest/futures/prelude/stream/trait.StreamExt.html#method.fuse), which makes it safe to keep polling a stream after it has returned Ready(None), and which may have been used here.
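
As a rough illustration of what fusing buys, here is a std-only sketch. It does not use the `futures` crate: `SimpleStream` is a hypothetical stand-in for `futures_core::Stream`, `Fused` is an illustrative wrapper in the spirit of `StreamExt::fuse` (not its actual implementation), and `Strict` is a test input that panics when polled after completion.

```rust
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical, simplified stand-in for `futures_core::Stream`.
trait SimpleStream {
    type Item;
    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

/// Wrapper that remembers exhaustion and stops touching the inner stream.
struct Fused<S> {
    inner: S,
    done: bool,
}

impl<S: SimpleStream> SimpleStream for Fused<S> {
    type Item = S::Item;

    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
        if self.done {
            // The inner stream already finished: answer without polling it.
            return Poll::Ready(None);
        }
        let polled = self.inner.poll_next(cx);
        if matches!(polled, Poll::Ready(None)) {
            self.done = true;
        }
        polled
    }
}

/// Test input that panics if polled again after returning `Ready(None)`.
struct Strict {
    items: Vec<i32>,
    finished: bool,
}

impl SimpleStream for Strict {
    type Item = i32;

    fn poll_next(&mut self, _cx: &mut Context<'_>) -> Poll<Option<i32>> {
        assert!(!self.finished, "polled after completion");
        match self.items.pop() {
            Some(value) => Poll::Ready(Some(value)),
            None => {
                self.finished = true;
                Poll::Ready(None)
            }
        }
    }
}

/// Waker that does nothing; enough to build a `Context` for this sketch.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls a fused `Strict` stream `n` times and collects what it returned.
fn poll_n_times(items: Vec<i32>, n: usize) -> Vec<Option<i32>> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut stream = Fused { inner: Strict { items, finished: false }, done: false };
    (0..n)
        .map(|_| match stream.poll_next(&mut cx) {
            Poll::Ready(item) => item,
            // `Strict` never returns `Pending`; map it to `None` for brevity.
            Poll::Pending => None,
        })
        .collect()
}
```

Polling `Strict` directly a third time would hit the assertion; through `Fused`, the extra polls simply return `Ready(None)`.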

My concern with the design in apache#12302 is that it doesn't actually poll all exhausted input streams (it only polls streams that haven't started yet). So, in my mind, while it solves the "do a first poll on each stream" problem, it doesn't actually do what the comments imply it should do, which is

            // Ensure all non-exhausted streams have a cursor from which
            // rows can be pulled

If we do decide to go with the design in apache#12302 I recommend:

  1. Document the rationale as comments (as it was not obvious to me when reading the code)
  2. (If possible) extend the tests to cover the situation / don't poll multiple times
  3. Update the comments in the code to clarify that the code only tries to poll all non-ready streams once (the current comment implies it is trying to poll all the streams)

@berkaysynnada
Collaborator

> Is that it will also potentially call poll multiple times on a partition that has returned Ready(None)

You are right, the previous design could indeed lead to multiple polls on a partition that has already returned Ready(None).

> My concern with the design in apache#12302 is that it doesn't actually poll all exhausted input streams (it only polls streams that haven't started yet). So in my mind while it solves the "do a first poll on each stream" it doesn't actually do what [the comments imply](https://github.com/apache/datafusion/blob/6034be42808b43e3f48f6e58ec38cc35fa253abb/datafusion/physical-plan/src/sorts/merge.rs#L159-L160) it should do, which is
>
>     // Ensure all non-exhausted streams have a cursor from which
>     // rows can be pulled

My design also ensures that all non-exhausted streams have a cursor from which rows can be pulled (after several wake-ups of poll_next_inner() if some of the partitions return Pending).

If none of the partitions returns Pending, the existing and new designs obviously behave the same. If we do get a Pending, then

  1. My design does not iterate over the partitions that have already returned Ready.
  2. The previous design starts from the first partition again, but maybe_poll_stream() has this check: https://github.com/apache/datafusion/blob/6034be42808b43e3f48f6e58ec38cc35fa253abb/datafusion/physical-plan/src/sorts/merge.rs#L135, which means that a partition that has already returned Ready is now skipped. So the two designs again behave the same way, except for the None cases, which I changed intentionally. As I said, previously a partition that returned None could be polled multiple times, since it is not guarded by
        if self.cursors[idx].is_some() {
            // Cursor is not finished - don't need a new RecordBatch yet
            return Poll::Ready(Ok(()));
        }

     Now such partitions are guarded by uninitiated_partitions.
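
The guarding idea can be sketched roughly as follows. This is a std-only model, not the code from apache#12302: `ScriptedPartition`, `MergeInit`, `poll_init`, and `run_scenario` are hypothetical names, cursors are reduced to plain `Option<u32>` values, and only the name `uninitiated_partitions` is taken from the discussion. The scripted partitions panic if polled after `Ready(None)`, so the scenario (Partition 1 returns `Ready(None)`, Partition 2 returns `Pending` first) only completes if exhausted partitions are not polled again.

```rust
use std::collections::VecDeque;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical partition that replays scripted poll results and panics
/// if it is polled again after returning `Ready(None)`.
struct ScriptedPartition {
    script: VecDeque<Poll<Option<u32>>>,
    finished: bool,
}

impl ScriptedPartition {
    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<u32>> {
        assert!(!self.finished, "polled after Ready(None)");
        let next = self.script.pop_front().unwrap_or(Poll::Ready(None));
        match next {
            Poll::Ready(None) => self.finished = true,
            // The input itself arranges the wake-up when it is not ready.
            Poll::Pending => cx.waker().wake_by_ref(),
            Poll::Ready(Some(_)) => {}
        }
        next
    }
}

/// Hypothetical merge state; only the initialization phase is modeled.
struct MergeInit {
    partitions: Vec<ScriptedPartition>,
    /// `None` means "no cursor": not yet initialized, or exhausted.
    cursors: Vec<Option<u32>>,
    /// Partitions that have not yet returned their first `Ready` result.
    uninitiated_partitions: Vec<usize>,
}

impl MergeInit {
    fn new(partitions: Vec<ScriptedPartition>) -> Self {
        let n = partitions.len();
        Self { partitions, cursors: vec![None; n], uninitiated_partitions: (0..n).collect() }
    }

    /// Polls only the partitions that are still waiting for a first result.
    fn poll_init(&mut self, cx: &mut Context<'_>) -> Poll<()> {
        let mut still_pending = Vec::new();
        for &idx in &self.uninitiated_partitions {
            match self.partitions[idx].poll_next(cx) {
                // `Ready(None)` also counts as initialized, so an exhausted
                // partition is never polled a second time.
                Poll::Ready(first) => self.cursors[idx] = first,
                Poll::Pending => still_pending.push(idx),
            }
        }
        self.uninitiated_partitions = still_pending;
        if self.uninitiated_partitions.is_empty() { Poll::Ready(()) } else { Poll::Pending }
    }
}

/// Waker that does nothing; enough to build a `Context` for this sketch.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Runs the scenario from the discussion; returns the cursors and the
/// number of `poll_init` calls that were needed.
fn run_scenario() -> (Vec<Option<u32>>, usize) {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let p1 = ScriptedPartition { script: VecDeque::from([Poll::Ready(None)]), finished: false };
    let p2 = ScriptedPartition {
        script: VecDeque::from([Poll::Pending, Poll::Ready(Some(7))]),
        finished: false,
    };
    let mut merge = MergeInit::new(vec![p1, p2]);
    let mut rounds = 0;
    loop {
        rounds += 1;
        if merge.poll_init(&mut cx).is_ready() {
            break;
        }
    }
    (merge.cursors, rounds)
}
```

In this model the second round polls only Partition 2, because Partition 1 left the `uninitiated_partitions` list as soon as it returned `Ready(None)`.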

If you're not entirely comfortable with the design, we can continue iterating on it. Otherwise, I'll go ahead and incorporate your recommendations. Thanks again @alamb for your time.

@alamb
Author

alamb commented Sep 5, 2024

I am convinced :) Upon further thought I think your design in apache#12302 seems reasonable. I think we can improve the comments to make the rationale easier to follow but the code seems good. I will comment there as well

@alamb alamb closed this Sep 5, 2024
berkaysynnada pushed a commit that referenced this pull request Oct 10, 2024
* Postgres: enforce required `NUMERIC` type for `round` scalar function (#34)

Includes initial support for dialects to override scalar functions unparsing

* Document scalar_function_to_sql_overrides fn