Skip to content

Commit

Permalink
Also check the plans we create are correct.
Browse files Browse the repository at this point in the history
  • Loading branch information
yfy- committed Jun 12, 2024
1 parent 2538702 commit aea95c6
Showing 1 changed file with 35 additions and 14 deletions.
49 changes: 35 additions & 14 deletions datafusion/core/src/physical_optimizer/sanity_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ impl PhysicalOptimizerRule for SanityCheckPlan {
fn optimize(
&self,
plan: Arc<dyn ExecutionPlan>,
config: &ConfigOptions,
_config: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
plan.transform_down(|p| check_plan_sanity(p)).data()
}
Expand All @@ -61,7 +61,7 @@ pub fn check_plan_sanity(
plan: Arc<dyn ExecutionPlan>,
) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
if !plan.execution_mode().pipeline_friendly() {
return plan_err!("Plan {:?} is not pipeline friendly.", plan)
return plan_err!("Plan {:?} is not pipeline friendly.", plan);
}

for (child, child_sort_req, child_dist_req) in izip!(
Expand Down Expand Up @@ -106,16 +106,32 @@ mod tests {
use arrow::compute::SortOptions;
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion_common::Result;
use datafusion_physical_plan::displayable;

fn create_test_schema() -> Result<SchemaRef> {
let c9_column = Field::new("c9", DataType::Int32, true);
let schema = Arc::new(Schema::new(vec![c9_column]));
Ok(schema)
fn create_test_schema() -> SchemaRef {
Arc::new(Schema::new(vec![Field::new("c9", DataType::Int32, true)]))
}

/// Runs the `SanityCheckPlan` rule over `plan` with default `ConfigOptions`
/// and asserts that `optimize` succeeds exactly when `is_sane` is `true`.
///
/// Only the `Arc` handle is cloned, so the caller keeps ownership of `plan`.
fn assert_sanity_check(plan: &Arc<dyn ExecutionPlan>, is_sane: bool) {
    let config = ConfigOptions::default();
    let outcome = SanityCheckPlan::new().optimize(Arc::clone(plan), &config);
    assert_eq!(outcome.is_ok(), is_sane);
}

/// Asserts that `plan` was built as intended by comparing its textual form,
/// produced by `displayable(plan).indent(true)`, against `expected_lines`.
///
/// The rendered text is trimmed of leading and trailing whitespace and then
/// split into lines, so `expected_lines` holds one entry per plan line.
fn assert_plan(plan: &dyn ExecutionPlan, expected_lines: Vec<&str>) {
    let rendered = displayable(plan).indent(true).to_string();
    let actual_lines = rendered.trim().lines().collect::<Vec<&str>>();
    assert_eq!(actual_lines, expected_lines);
}

#[tokio::test]
async fn test_bw_sort_requirement() -> Result<()> {
let schema = create_test_schema()?;
let schema = create_test_schema();
let source = memory_exec(&schema);
let sort_exprs = vec![sort_expr_options(
"c9",
Expand All @@ -127,15 +143,18 @@ mod tests {
)];
let sort = sort_exec(sort_exprs.clone(), source);
let bw = bounded_window_exec("c9", sort_exprs, sort);
let sanity_checker = SanityCheckPlan::new();
let opts = ConfigOptions::default();
assert_eq!(sanity_checker.optimize(bw, &opts).is_ok(), true);
assert_plan(bw.as_ref(), vec![
"BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
" SortExec: expr=[c9@0 ASC NULLS LAST], preserve_partitioning=[false]",
" MemoryExec: partitions=1, partition_sizes=[0]"
]);
assert_sanity_check(&bw, true);
Ok(())
}

#[tokio::test]
async fn test_bw_no_sort_requirement() -> Result<()> {
let schema = create_test_schema()?;
let schema = create_test_schema();
let source = memory_exec(&schema);
let sort_exprs = vec![sort_expr_options(
"c9",
Expand All @@ -146,9 +165,11 @@ mod tests {
},
)];
let bw = bounded_window_exec("c9", sort_exprs, source);
let sanity_checker = SanityCheckPlan::new();
let opts = ConfigOptions::default();
assert_eq!(sanity_checker.optimize(bw, &opts).is_ok(), false);
assert_plan(bw.as_ref(), vec![
"BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
" MemoryExec: partitions=1, partition_sizes=[0]"
]);
assert_sanity_check(&bw, false);
Ok(())
}
}

0 comments on commit aea95c6

Please sign in to comment.