diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs index dda472d696da..029003374acc 100644 --- a/datafusion/physical-plan/src/joins/nested_loop_join.rs +++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs @@ -349,6 +349,9 @@ impl ExecutionPlan for NestedLoopJoinExec { let indices_cache = (UInt64Array::new_null(0), UInt32Array::new_null(0)); + // Right side has an order and it is maintained during operation. + let right_side_ordered = + self.maintains_input_order()[1] && self.right.output_ordering().is_some(); Ok(Box::pin(NestedLoopJoinStream { schema: Arc::clone(&self.schema), filter: self.filter.clone(), @@ -359,6 +362,7 @@ impl ExecutionPlan for NestedLoopJoinExec { column_indices: self.column_indices.clone(), join_metrics, indices_cache, + right_side_ordered, })) } @@ -460,6 +464,8 @@ struct NestedLoopJoinStream { join_metrics: BuildProbeJoinMetrics, /// Cache for join indices calculations indices_cache: (UInt64Array, UInt32Array), + /// Whether the right side is ordered + right_side_ordered: bool, } /// Creates a Cartesian product of two input batches, preserving the order of the right batch, @@ -578,6 +584,7 @@ impl NestedLoopJoinStream { &self.schema, visited_left_side, &mut self.indices_cache, + self.right_side_ordered, ); // Recording time & updating output metrics @@ -651,6 +658,7 @@ fn join_left_and_right_batch( schema: &Schema, visited_left_side: &SharedBitmapBuilder, indices_cache: &mut (UInt64Array, UInt32Array), + right_side_ordered: bool, ) -> Result { let (left_side, right_side) = build_join_indices(left_batch, right_batch, filter, indices_cache).map_err( @@ -675,7 +683,7 @@ fn join_left_and_right_batch( right_side, 0..right_batch.num_rows(), join_type, - true, + right_side_ordered, ); build_batch_from_indices(