Skip to content

Commit

Permalink
Use prep_null_mask_filter to handle nulls in selection mask (#9163)
Browse files Browse the repository at this point in the history
* Use prep_null_mask_filter to handle nulls in selection mask

* Update datafusion/physical-plan/src/joins/sort_merge_join.rs

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>

* Avoid unwrap

---------

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
  • Loading branch information
viirya and alamb authored Feb 10, 2024
1 parent a48e271 commit b2ff63f
Showing 1 changed file with 22 additions and 1 deletion.
23 changes: 22 additions & 1 deletion datafusion/physical-plan/src/joins/sort_merge_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1209,7 +1209,15 @@ impl SMJStream {
) {
// The reverse of the selection mask. For the rows not pass join filter above,
// we need to join them (left or right) with null rows for outer joins.
let not_mask = compute::not(mask)?;
let not_mask = if mask.null_count() > 0 {
// If the mask contains nulls, we need to use `prep_null_mask_filter` to
// handle the nulls in the mask as false to produce rows where the mask
// was null itself.
compute::not(&compute::prep_null_mask_filter(mask))?
} else {
compute::not(mask)?
};

let null_joined_batch =
compute::filter_record_batch(&output_batch, &not_mask)?;

Expand Down Expand Up @@ -1254,6 +1262,19 @@ impl SMJStream {

// For full join, we also need to output the null joined rows from the buffered side
if matches!(self.join_type, JoinType::Full) {
// Handle not mask for buffered side further.
// For buffered side, we want to output the rows that are not null joined with
// the streamed side. i.e. the rows that are not null in the `buffered_indices`.
let not_mask = if let Some(nulls) = buffered_indices.nulls() {
let mask = not_mask.values() & nulls.inner();
BooleanArray::new(mask, None)
} else {
not_mask
};

let null_joined_batch =
compute::filter_record_batch(&output_batch, &not_mask)?;

let mut streamed_columns = self
.streamed_schema
.fields()
Expand Down

0 comments on commit b2ff63f

Please sign in to comment.