Skip to content

Commit

Permalink
Test for pushing through orders
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Jul 31, 2024
1 parent f67d810 commit c0d08d2
Show file tree
Hide file tree
Showing 2 changed files with 162 additions and 1 deletion.
64 changes: 64 additions & 0 deletions datafusion/core/src/physical_optimizer/enforce_sorting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -621,6 +621,7 @@ mod tests {
limit_exec, local_limit_exec, memory_exec, parquet_exec, parquet_exec_sorted,
repartition_exec, sort_exec, sort_expr, sort_expr_options, sort_merge_join_exec,
sort_preserving_merge_exec, spr_repartition_exec, union_exec,
RequirementsTestExec,
};
use crate::physical_plan::{displayable, get_plan_string, Partitioning};
use crate::prelude::{SessionConfig, SessionContext};
Expand Down Expand Up @@ -2346,4 +2347,67 @@ mod tests {
assert_optimized!(expected_input, expected_no_change, physical_plan, true);
Ok(())
}

#[tokio::test]
async fn test_push_with_required_input_ordering_prohibited() -> Result<()> {
    // Verifies that a sort is NOT pushed below a node whose required input
    // ordering is incompatible with that sort.
    //
    // Plan under test (top-down):
    //
    // SortExec: expr=[b]             <-- can't push this down
    //   RequiredInputOrder expr=[a]  <-- this requires input sorted by a, and preserves the input order
    //     SortExec: expr=[a]
    //       MemoryExec
    let schema = create_test_schema3()?;
    let sort_exprs_a = vec![sort_expr("a", &schema)];
    let sort_exprs_b = vec![sort_expr("b", &schema)];
    let plan = memory_exec(&schema);
    let plan = sort_exec(sort_exprs_a.clone(), plan);
    let plan = RequirementsTestExec::new(plan)
        .with_required_input_ordering(sort_exprs_a)
        .with_maintains_input_order(true)
        .into_arc();
    let plan = sort_exec(sort_exprs_b, plan);

    // Each nesting level of the rendered plan is indented by two spaces, so
    // the expected lines must carry 0/2/4/6 leading spaces respectively.
    let expected_input = [
        "SortExec: expr=[b@1 ASC], preserve_partitioning=[false]",
        "  RequiredInputOrderingExec",
        "    SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
        "      MemoryExec: partitions=1, partition_sizes=[0]",
    ];
    // should not be able to push sorts
    let expected_no_change = expected_input;
    assert_optimized!(expected_input, expected_no_change, plan, true);
    Ok(())
}

// Test the case where the sort being pushed is compatible with the required
// input ordering, so it can be pushed through the intermediate node.
#[tokio::test]
async fn test_push_with_required_input_ordering_allowed() -> Result<()> {
    // Plan under test (top-down):
    //
    // SortExec: expr=[a,b]           <-- can push this down (as it is compatible with the required input ordering)
    //   RequiredInputOrder expr=[a]  <-- this requires input sorted by a, and preserves the input order
    //     SortExec: expr=[a]
    //       MemoryExec
    let schema = create_test_schema3()?;
    let sort_exprs_a = vec![sort_expr("a", &schema)];
    let sort_exprs_ab = vec![sort_expr("a", &schema), sort_expr("b", &schema)];
    let plan = memory_exec(&schema);
    let plan = sort_exec(sort_exprs_a.clone(), plan);
    let plan = RequirementsTestExec::new(plan)
        .with_required_input_ordering(sort_exprs_a)
        .with_maintains_input_order(true)
        .into_arc();
    let plan = sort_exec(sort_exprs_ab, plan);

    // Each nesting level of the rendered plan is indented by two spaces, so
    // the expected lines must carry 0/2/4/6 leading spaces respectively.
    let expected_input = [
        "SortExec: expr=[a@0 ASC,b@1 ASC], preserve_partitioning=[false]",
        "  RequiredInputOrderingExec",
        "    SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
        "      MemoryExec: partitions=1, partition_sizes=[0]",
    ];
    // should be able to push sorts: the [a, b] sort replaces the inner [a]
    // sort beneath the node that requires ordering on a
    let expected = [
        "RequiredInputOrderingExec",
        "  SortExec: expr=[a@0 ASC,b@1 ASC], preserve_partitioning=[false]",
        "    MemoryExec: partitions=1, partition_sizes=[0]",
    ];
    assert_optimized!(expected_input, expected, plan, true);
    Ok(())
}
}
99 changes: 98 additions & 1 deletion datafusion/core/src/physical_optimizer/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

//! Collection of testing utility functions that are leveraged by the query optimizer rules

use std::any::Any;
use std::fmt::Formatter;
use std::sync::Arc;

use crate::datasource::listing::PartitionedFile;
Expand Down Expand Up @@ -47,10 +49,14 @@ use datafusion_expr::{WindowFrame, WindowFunctionDefinition};
use datafusion_functions_aggregate::count::count_udaf;
use datafusion_physical_expr::expressions::col;
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};
use datafusion_physical_plan::displayable;
use datafusion_physical_plan::tree_node::PlanContext;
use datafusion_physical_plan::{
displayable, DisplayAs, DisplayFormatType, PlanProperties,
};

use async_trait::async_trait;
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_physical_expr_common::sort_expr::PhysicalSortRequirement;

async fn register_current_csv(
ctx: &SessionContext,
Expand Down Expand Up @@ -354,6 +360,97 @@ pub fn sort_exec(
Arc::new(SortExec::new(sort_exprs, input))
}

/// A test [`ExecutionPlan`] whose requirements can be configured.
///
/// It wraps a single child plan and lets a test choose what the node reports
/// from `required_input_ordering()` and `maintains_input_order()`. It is
/// displayed in plans as `RequiredInputOrderingExec` and cannot be executed.
#[derive(Debug)]
pub struct RequirementsTestExec {
/// Sort expressions reported as the ordering required of the single input.
required_input_ordering: Vec<PhysicalSortExpr>,
/// Value reported for the single input by `maintains_input_order()`.
maintains_input_order: bool,
/// The only child of this node; its plan properties are reused as this
/// node's own properties.
input: Arc<dyn ExecutionPlan>,
}

impl RequirementsTestExec {
    /// Wraps `input` in a new test node.
    ///
    /// The node starts with an empty required input ordering and reports
    /// that it maintains its input order.
    pub fn new(input: Arc<dyn ExecutionPlan>) -> Self {
        let required_input_ordering = Vec::new();
        let maintains_input_order = true;
        Self {
            required_input_ordering,
            maintains_input_order,
            input,
        }
    }

    /// Replaces the required input ordering, keeping every other setting.
    pub fn with_required_input_ordering(
        self,
        required_input_ordering: Vec<PhysicalSortExpr>,
    ) -> Self {
        Self {
            required_input_ordering,
            ..self
        }
    }

    /// Replaces the `maintains_input_order` flag, keeping every other setting.
    pub fn with_maintains_input_order(self, maintains_input_order: bool) -> Self {
        Self {
            maintains_input_order,
            ..self
        }
    }

    /// Consumes the node and returns it as an `Arc<dyn ExecutionPlan>`.
    pub fn into_arc(self) -> Arc<dyn ExecutionPlan> {
        let plan: Arc<dyn ExecutionPlan> = Arc::new(self);
        plan
    }
}

impl DisplayAs for RequirementsTestExec {
    /// Writes the fixed label `RequiredInputOrderingExec`; the requested
    /// display format is ignored.
    fn fmt_as(&self, _format: DisplayFormatType, out: &mut Formatter) -> std::fmt::Result {
        out.write_str("RequiredInputOrderingExec")
    }
}

impl ExecutionPlan for RequirementsTestExec {
    /// Name of this node; the same label is used when the plan is displayed.
    fn name(&self) -> &str {
        "RequiredInputOrderingExec"
    }

    /// Exposes the node as `Any` so callers can downcast it.
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Reuses the child's plan properties unchanged.
    ///
    /// NOTE(review): the properties are forwarded even when
    /// `maintains_input_order` is `false` — confirm that is intended for the
    /// tests using this node.
    fn properties(&self) -> &PlanProperties {
        let child = &self.input;
        child.properties()
    }

    /// Reports the configured sort expressions as the ordering required of
    /// the single input. The entry is always `Some`, even when no sort
    /// expressions were configured.
    fn required_input_ordering(&self) -> Vec<Option<Vec<PhysicalSortRequirement>>> {
        let configured = &self.required_input_ordering;
        vec![Some(PhysicalSortRequirement::from_sort_exprs(configured))]
    }

    /// Reports the configured flag for the single input.
    fn maintains_input_order(&self) -> Vec<bool> {
        let flag = self.maintains_input_order;
        vec![flag]
    }

    /// Returns the single child of this node.
    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        let child = &self.input;
        vec![child]
    }

    /// Rebuilds the node around a new child, carrying over the configured
    /// required ordering and `maintains_input_order` flag.
    ///
    /// # Panics
    ///
    /// Panics if `children` does not contain exactly one plan.
    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        assert_eq!(children.len(), 1);
        let rebuilt = Self {
            required_input_ordering: self.required_input_ordering.clone(),
            maintains_input_order: self.maintains_input_order,
            input: Arc::clone(&children[0]),
        };
        Ok(Arc::new(rebuilt))
    }

    /// Execution is not supported by this test-only node.
    ///
    /// # Panics
    ///
    /// Always panics.
    fn execute(
        &self,
        _partition: usize,
        _context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        unimplemented!("Test exec does not support execution")
    }
}

/// A [`PlanContext`] object is susceptible to being left in an inconsistent state after
/// untested mutable operations. It is crucial that there be no discrepancies between a plan
/// associated with the root node and the plan generated after traversing all nodes
Expand Down

0 comments on commit c0d08d2

Please sign in to comment.