Skip to content

Commit

Permalink
Poll all streams when not ready
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Sep 4, 2024
1 parent 9f32d2b commit 4d04a80
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 22 deletions.
12 changes: 5 additions & 7 deletions datafusion/core/tests/fuzz_cases/merge_fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use datafusion::physical_plan::{
sorts::sort_preserving_merge::SortPreservingMergeExec,
};
use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion_common::{DataFusionError, Result};
use datafusion_common::{exec_err, Result};
use datafusion_common_runtime::SpawnedTask;
use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
use datafusion_physical_expr::expressions::Column;
Expand Down Expand Up @@ -310,11 +310,9 @@ async fn test_spm_congestion() -> Result<()> {
match result {
Ok(Ok(Ok(_batches))) => Ok(()),
Ok(Ok(Err(e))) => Err(e),
Ok(Err(_)) => Err(DataFusionError::Execution(
"SortPreservingMerge task panicked or was cancelled".to_string(),
)),
Err(_) => Err(DataFusionError::Execution(
"SortPreservingMerge caused a deadlock".to_string(),
)),
Ok(Err(e)) => {
exec_err!("SortPreservingMerge task panicked or was cancelled: {e}")
}
Err(e) => exec_err!("SortPreservingMerge caused a deadlock: {e}"),
}
}
28 changes: 13 additions & 15 deletions datafusion/physical-plan/src/sorts/merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,6 @@ pub(crate) struct SortPreservingMergeStream<C: CursorValues> {

/// number of rows produced
produced: usize,

/// This vector contains partition indices in order. When a partition is polled and returns `Poll::Ready`,
/// it is removed from the vector. If a partition returns `Poll::Pending`, it is moved to the end of the
/// vector to ensure the next iteration starts with a different partition, preventing the same partition
/// from being continuously polled.
uninitiated_partitions: Vec<usize>,
}

impl<C: CursorValues> SortPreservingMergeStream<C> {
Expand All @@ -130,7 +124,6 @@ impl<C: CursorValues> SortPreservingMergeStream<C> {
batch_size,
fetch,
produced: 0,
uninitiated_partitions: (0..stream_count).collect(),
}
}

Expand Down Expand Up @@ -167,23 +160,28 @@ impl<C: CursorValues> SortPreservingMergeStream<C> {
// try to initialize the loser tree
if self.loser_tree.is_empty() {
// Ensure all non-exhausted streams have a cursor from which rows can be pulled
let remaining_partitions = self.uninitiated_partitions.clone();
for i in remaining_partitions {
let mut any_pending = false;
for i in 0..self.streams.partitions() {
match self.maybe_poll_stream(cx, i) {
Poll::Ready(Err(e)) => {
self.aborted = true;
return Poll::Ready(Some(Err(e)));
}
Poll::Pending => {
self.uninitiated_partitions.rotate_left(1);
cx.waker().wake_by_ref();
return Poll::Pending;
Poll::Ready(Ok(())) => {
// input i is ready
}
_ => {
self.uninitiated_partitions.retain(|idx| *idx != i);
Poll::Pending => {
// input i is not ready
any_pending = true;
}
}
}
if any_pending {
// If any stream is not ready, return pending and tell the executor to wake us up
// to try again when it is ready
cx.waker().wake_by_ref();
return Poll::Pending;
}
self.init_loser_tree();
}

Expand Down

0 comments on commit 4d04a80

Please sign in to comment.