Skip to content

Commit

Permalink
Support views
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Oct 7, 2024
1 parent a37f6e1 commit b1cbde5
Showing 1 changed file with 29 additions and 9 deletions.
38 changes: 29 additions & 9 deletions datafusion/functions-aggregate/src/min_max/min_max_bytes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
// under the License.

use arrow::array::{
Array, ArrayRef, AsArray, BooleanArray, GenericStringArray, LargeStringBuilder,
OffsetSizeTrait, StringBuilder,
Array, ArrayRef, AsArray, BooleanArray, LargeStringBuilder,
StringBuilder,
};
use arrow_schema::DataType;
use datafusion_common::{internal_err, Result};
Expand Down Expand Up @@ -101,21 +101,20 @@ where
}

/// updates the min/max values for the given string values
fn update_batch_string<O: OffsetSizeTrait>(
fn update_batch_string<'a>(
&mut self,
array: &GenericStringArray<O>,
array: impl IntoIterator<Item = Option<&'a str>>,
group_indices: &[usize],
opt_filter: Option<&BooleanArray>,
total_num_groups: usize,
) -> Result<()> {
self.min_max.resize(total_num_groups, None);
let mut locations = vec![MinMaxLocation::ExistingMinMax; array.len()];
let mut locations = vec![MinMaxLocation::ExistingMinMax; group_indices.len()];

assert!(opt_filter.is_none(), "Filtering not yet implemented");

// Figure out the new min value for each group
assert_eq!(array.len(), group_indices.len());
for (new_val, group_index) in array.iter().zip(group_indices.iter()) {
for (new_val, group_index) in array.into_iter().zip(group_indices.iter()) {
let group_index = *group_index;
// ignore null inputs
let Some(new_val) = new_val else {
Expand Down Expand Up @@ -164,15 +163,24 @@ where
opt_filter: Option<&BooleanArray>,
total_num_groups: usize,
) -> Result<()> {
let array = &values[0];
assert_eq!(array.len(), group_indices.len());

match self.data_type {
DataType::Utf8 => self.update_batch_string(
values[0].as_string::<i32>(),
array.as_string::<i32>().iter(),
group_indices,
opt_filter,
total_num_groups,
),
DataType::LargeUtf8 => self.update_batch_string(
values[0].as_string::<i64>(),
array.as_string::<i64>().iter(),
group_indices,
opt_filter,
total_num_groups,
),
DataType::Utf8View => self.update_batch_string(
array.as_string_view().iter(),
group_indices,
opt_filter,
total_num_groups,
Expand Down Expand Up @@ -227,6 +235,18 @@ where
}
Ok(Arc::new(builder.finish()) as ArrayRef)
}
DataType::Utf8View => {
let mut builder =
StringBuilder::with_capacity(min_maxes.len(), data_capacity);
for opt in min_maxes {
match opt {
None => builder.append_null(),
Some(s) => builder.append_value(s.as_str()),
}
}
Ok(Arc::new(builder.finish()) as ArrayRef)
}

_ => internal_err!(
"Unexpected data type for MinMaxBytesAccumulator: {:?}",
self.data_type
Expand Down

0 comments on commit b1cbde5

Please sign in to comment.