Skip to content

Commit

Permalink
Add test for multiple children using SortMergeJoinExec.
Browse files Browse the repository at this point in the history
  • Loading branch information
yfy- committed Jun 15, 2024
1 parent 9aef599 commit 49aba4b
Showing 1 changed file with 141 additions and 1 deletion.
142 changes: 141 additions & 1 deletion datafusion/core/src/physical_optimizer/sanity_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,17 +102,28 @@ mod tests {

use crate::physical_optimizer::test_utils::{
bounded_window_exec, global_limit_exec, local_limit_exec, memory_exec,
repartition_exec, sort_exec, sort_expr_options,
repartition_exec, sort_exec, sort_expr_options, sort_merge_join_exec,
};
use arrow::compute::SortOptions;
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion_common::Result;
use datafusion_expr::JoinType;
use datafusion_physical_expr::expressions::col;
use datafusion_physical_expr::Partitioning;
use datafusion_physical_plan::displayable;
use datafusion_physical_plan::repartition::RepartitionExec;

/// Builds the single-column test schema: one `Int32` field named `c9`,
/// created with the nullable flag set to `true`.
fn create_test_schema() -> SchemaRef {
    let c9 = Field::new("c9", DataType::Int32, true);
    Arc::new(Schema::new(vec![c9]))
}

fn create_test_schema2() -> SchemaRef {
Arc::new(Schema::new(vec![
Field::new("a", DataType::Int32, true),
Field::new("b", DataType::Int32, true),
]))
}

fn assert_sanity_check(plan: &Arc<dyn ExecutionPlan>, is_sane: bool) {
let sanity_checker = SanityCheckPlan::new();
let opts = ConfigOptions::default();
Expand Down Expand Up @@ -232,4 +243,133 @@ mod tests {
assert_sanity_check(&limit, true);
Ok(())
}

#[tokio::test]
/// Tests that the plan passes the sanity check when both inputs of the
/// sort-merge join are sorted on their join key and then hash-repartitioned
/// on that same key.
async fn test_sort_merge_join_satisfied() -> Result<()> {
    let schema1 = create_test_schema();
    let schema2 = create_test_schema2();
    let source1 = memory_exec(&schema1);
    let source2 = memory_exec(&schema2);
    let sort_opts = SortOptions::default();
    let sort_exprs1 = vec![sort_expr_options("c9", &source1.schema(), sort_opts)];
    let sort_exprs2 = vec![sort_expr_options("a", &source2.schema(), sort_opts)];
    let left = sort_exec(sort_exprs1, source1);
    let right = sort_exec(sort_exprs2, source2);
    // Resolve the join-key columns; a failed lookup is propagated as an
    // error rather than panicking, since the test already returns `Result`.
    let left_jcol = col("c9", &left.schema())?;
    let right_jcol = col("a", &right.schema())?;
    let left = Arc::new(RepartitionExec::try_new(
        left,
        Partitioning::Hash(vec![left_jcol.clone()], 10),
    )?);

    let right = Arc::new(RepartitionExec::try_new(
        right,
        Partitioning::Hash(vec![right_jcol.clone()], 10),
    )?);

    let join_on = vec![(left_jcol as _, right_jcol as _)];
    let join_ty = JoinType::Inner;
    let smj = sort_merge_join_exec(left, right, &join_on, &join_ty);

    // Expected rendering of the plan tree; each nesting level is indented by
    // two additional spaces.
    assert_plan(
        smj.as_ref(),
        vec![
            "SortMergeJoin: join_type=Inner, on=[(c9@0, a@0)]",
            "  RepartitionExec: partitioning=Hash([c9@0], 10), input_partitions=1",
            "    SortExec: expr=[c9@0 ASC], preserve_partitioning=[false]",
            "      MemoryExec: partitions=1, partition_sizes=[0]",
            "  RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=1",
            "    SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
            "      MemoryExec: partitions=1, partition_sizes=[0]",
        ],
    );
    assert_sanity_check(&smj, true);
    Ok(())
}

#[tokio::test]
/// Tests that the plan fails the sanity check when the right input of the
/// sort-merge join is hash-repartitioned on its join key but is not sorted.
async fn test_sort_merge_join_order_missing() -> Result<()> {
    let schema1 = create_test_schema();
    let schema2 = create_test_schema2();
    let source1 = memory_exec(&schema1);
    let right = memory_exec(&schema2);
    let sort_exprs1 = vec![sort_expr_options(
        "c9",
        &source1.schema(),
        SortOptions::default(),
    )];
    let left = sort_exec(sort_exprs1, source1);
    // Missing sort of the right child here..
    // Resolve the join-key columns; a failed lookup is propagated as an
    // error rather than panicking, since the test already returns `Result`.
    let left_jcol = col("c9", &left.schema())?;
    let right_jcol = col("a", &right.schema())?;
    let left = Arc::new(RepartitionExec::try_new(
        left,
        Partitioning::Hash(vec![left_jcol.clone()], 10),
    )?);

    let right = Arc::new(RepartitionExec::try_new(
        right,
        Partitioning::Hash(vec![right_jcol.clone()], 10),
    )?);

    let join_on = vec![(left_jcol as _, right_jcol as _)];
    let join_ty = JoinType::Inner;
    let smj = sort_merge_join_exec(left, right, &join_on, &join_ty);

    // Expected rendering of the plan tree; each nesting level is indented by
    // two additional spaces. Note the right branch has no `SortExec`.
    assert_plan(
        smj.as_ref(),
        vec![
            "SortMergeJoin: join_type=Inner, on=[(c9@0, a@0)]",
            "  RepartitionExec: partitioning=Hash([c9@0], 10), input_partitions=1",
            "    SortExec: expr=[c9@0 ASC], preserve_partitioning=[false]",
            "      MemoryExec: partitions=1, partition_sizes=[0]",
            "  RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=1",
            "    MemoryExec: partitions=1, partition_sizes=[0]",
        ],
    );
    assert_sanity_check(&smj, false);
    Ok(())
}

#[tokio::test]
/// Tests that the plan fails the sanity check when the right input of the
/// sort-merge join is sorted on its join key but is not hash-repartitioned,
/// while the left input is both sorted and hash-repartitioned.
async fn test_sort_merge_join_dist_missing() -> Result<()> {
    let schema1 = create_test_schema();
    let schema2 = create_test_schema2();
    let source1 = memory_exec(&schema1);
    let source2 = memory_exec(&schema2);
    let sort_opts = SortOptions::default();
    let sort_exprs1 = vec![sort_expr_options("c9", &source1.schema(), sort_opts)];
    let sort_exprs2 = vec![sort_expr_options("a", &source2.schema(), sort_opts)];
    let left = sort_exec(sort_exprs1, source1);
    let right = sort_exec(sort_exprs2, source2);
    // Resolve the join-key columns; a failed lookup is propagated as an
    // error rather than panicking, since the test already returns `Result`.
    let left_jcol = col("c9", &left.schema())?;
    let right_jcol = col("a", &right.schema())?;
    let left = Arc::new(RepartitionExec::try_new(
        left,
        Partitioning::Hash(vec![left_jcol.clone()], 10),
    )?);

    // Missing hash partitioning here..

    let join_on = vec![(left_jcol as _, right_jcol as _)];
    let join_ty = JoinType::Inner;
    let smj = sort_merge_join_exec(left, right, &join_on, &join_ty);

    // Expected rendering of the plan tree; each nesting level is indented by
    // two additional spaces. Note the right branch has no `RepartitionExec`.
    assert_plan(
        smj.as_ref(),
        vec![
            "SortMergeJoin: join_type=Inner, on=[(c9@0, a@0)]",
            "  RepartitionExec: partitioning=Hash([c9@0], 10), input_partitions=1",
            "    SortExec: expr=[c9@0 ASC], preserve_partitioning=[false]",
            "      MemoryExec: partitions=1, partition_sizes=[0]",
            "  SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
            "    MemoryExec: partitions=1, partition_sizes=[0]",
        ],
    );
    assert_sanity_check(&smj, false);
    Ok(())
}
}

0 comments on commit 49aba4b

Please sign in to comment.