Skip to content

Commit

Permalink
fix syntax errors
Browse files Browse the repository at this point in the history
  • Loading branch information
mertak-synnada committed Aug 14, 2024
1 parent 128676e commit 782487c
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 22 deletions.
48 changes: 27 additions & 21 deletions datafusion/physical-optimizer/src/limit_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ pub fn pushdown_limit_helper(
);
}

let global_fetch = Some(global_state.fetch) else {
let Some(global_fetch) = global_state.fetch else {
// There's no valid fetch information, exit early:
return if global_state.skip > 0 && !global_state.satisfied {
// There might be a case with only offset, if so add a global limit:
Expand All @@ -179,18 +179,18 @@ pub fn pushdown_limit_helper(
None,
)),
global_state,
));
))
} else {
// There's no info on fetch, nothing to do:
Ok((Transformed::no(pushdown_plan), global_state))
}
};
};

if pushdown_plan.supports_limit_pushdown() && !combines_input_partitions(&pushdown_plan) {
// We have information in the global state and the plan pushes down, continue:
return Ok((Transformed::no(pushdown_plan), global_state));
}

if pushdown_plan.supports_limit_pushdown() {
if !combines_input_partitions(&pushdown_plan) {
// We have information in the global state and the plan pushes down, continue:
return Ok((Transformed::no(pushdown_plan), global_state));
}
// This plan is combining input partitions we need to add the fetch info to plan if possible
// if not, we must add a LimitExec with the global_state info
let skip_and_fetch = Some(global_fetch + global_state.skip);
Expand All @@ -204,7 +204,7 @@ pub fn pushdown_limit_helper(

return if global_state.satisfied {
// If the plan is already satisfied, do not add a limit:
Ok((Transformed::no(pushdown_plan), global_state));
Ok((Transformed::no(pushdown_plan), global_state))
} else {
global_state.satisfied = true;
Ok((
Expand All @@ -214,41 +214,47 @@ pub fn pushdown_limit_helper(
global_fetch,
)),
global_state,
));
}
))
};
}

// The plan does not support push down and it is not a limit.
// We will need to add a limit or a fetch. If the plan is already satisfied,
// we will try to add the fetch info and return the plan.

let global_skip = global_state.skip;
let skip_and_fetch = Some(global_fetch + global_state.skip);

// There's no push down change fetch & skip to default values
global_state.fetch = None;
global_state.skip = 0;
if global_state.satisfied {
return if let Some(plan_with_fetch) = pushdown_plan.with_fetch(skip_and_fetch) {
Ok((Transformed::yes(plan_with_fetch), global_state));
Ok((Transformed::yes(plan_with_fetch), global_state))
} else {
Ok((Transformed::no(pushdown_plan), global_state));
}
Ok((Transformed::no(pushdown_plan), global_state))
};
}

// Add Fetch / LimitExec
global_state.satisfied = true;
if let Some(plan_with_fetch) = pushdown_plan.with_fetch(skip_and_fetch) {
return if skip > 0 {
return if global_skip > 0 {
Ok((
Transformed::yes(add_global_limit(plan_with_fetch, skip, fetch)),
Transformed::yes(add_global_limit(
plan_with_fetch,
global_skip,
Some(global_fetch),
)),
global_state,
));
))
} else {
Ok((Transformed::yes(pushdown_plan), global_state));
Ok((Transformed::yes(plan_with_fetch), global_state))
};
}

Ok((
Transformed::yes(add_limit(pushdown_plan, skip, global_fetch)),
Transformed::yes(add_limit(pushdown_plan, global_skip, global_fetch)),
global_state,
))
}
Expand Down Expand Up @@ -315,14 +321,14 @@ fn add_limit(
}

fn add_local_limit(
mut pushdown_plan: Arc<dyn ExecutionPlan>,
pushdown_plan: Arc<dyn ExecutionPlan>,
fetch: usize,
) -> Arc<dyn ExecutionPlan> {
Arc::new(LocalLimitExec::new(pushdown_plan, fetch)) as _
}

fn add_global_limit(
mut pushdown_plan: Arc<dyn ExecutionPlan>,
pushdown_plan: Arc<dyn ExecutionPlan>,
skip: usize,
fetch: Option<usize>,
) -> Arc<dyn ExecutionPlan> {
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-plan/src/limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -730,7 +730,7 @@ mod tests {
#[tokio::test]
async fn test_row_number_statistics_for_local_limit() -> Result<()> {
let row_count = row_number_statistics_for_local_limit(4, 10).await?;
assert_eq!(row_count, Precision::Exact(40));
assert_eq!(row_count, Precision::Exact(10));

Ok(())
}
Expand Down

0 comments on commit 782487c

Please sign in to comment.