Skip to content

Commit

Permalink
Implement convert to state
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Oct 8, 2024
1 parent 68e9500 commit acc41a3
Showing 1 changed file with 103 additions and 7 deletions.
110 changes: 103 additions & 7 deletions datafusion/functions-aggregate/src/min_max/min_max_bytes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,16 @@
// specific language governing permissions and limitations
// under the License.

use arrow::array::{Array, ArrayRef, AsArray, BinaryBuilder, BinaryViewBuilder, BooleanArray, LargeBinaryBuilder, LargeStringBuilder, StringBuilder, StringViewBuilder};
use arrow::array::{
Array, ArrayRef, AsArray, BinaryArray, BinaryBuilder, BinaryViewArray,
BinaryViewBuilder, BooleanArray, LargeBinaryArray, LargeBinaryBuilder,
LargeStringArray, LargeStringBuilder, StringArray, StringBuilder, StringViewArray,
StringViewBuilder,
};
use arrow_schema::DataType;
use datafusion_common::{internal_err, Result};
use datafusion_expr::{EmitTo, GroupsAccumulator};
use datafusion_functions_aggregate_common::aggregate::groups_accumulator::nulls::filtered_null_mask;
use std::sync::Arc;

/// Implement the accumulator API for computing min/max
Expand Down Expand Up @@ -312,16 +318,106 @@ impl GroupsAccumulator for MinMaxBytesAccumulator {

fn convert_to_state(
&self,
_values: &[ArrayRef],
_opt_filter: Option<&BooleanArray>,
values: &[ArrayRef],
opt_filter: Option<&BooleanArray>,
) -> Result<Vec<ArrayRef>> {
// need to apply filter (maybe just combine with nulls) but otherwise just pass input along
todo!()
// Min/max do not change the values as they are their own states
// apply the filter by combining with the null mask, if any
let input = &values[0];
let nulls = filtered_null_mask(opt_filter, input);
if let Some(nulls) = nulls.as_ref() {
assert_eq!(nulls.len(), input.len());
}

let output: ArrayRef = match input.data_type() {
// TODO it would be nice to have safe apis in arrow-rs to update the null buffers in the arrays
DataType::Utf8 => {
let input = input.as_string::<i32>();
// safety: values / offsets came from a valid string array, so are valid utf8
// and we checked nulls has the same length as values
unsafe {
Arc::new(StringArray::new_unchecked(
input.offsets().clone(),
input.values().clone(),
nulls,
))
}
}
DataType::LargeUtf8 => {
let input = input.as_string::<i64>();
// safety: values / offsets came from a valid string array, so are valid utf8
// and we checked nulls has the same length as values
unsafe {
Arc::new(LargeStringArray::new_unchecked(
input.offsets().clone(),
input.values().clone(),
nulls,
))
}
}
DataType::Utf8View => {
let input = input.as_string_view();
// safety: values / views came from a valid string view array, so are valid utf8
// and we checked nulls has the same length as values
unsafe {
Arc::new(StringViewArray::new_unchecked(
input.views().clone(),
input.data_buffers().to_vec(),
nulls,
))
}
}

DataType::Binary => {
let input = input.as_binary::<i32>();
// safety: values / offsets came from a valid binary array
// and we checked nulls has the same length as values
unsafe {
Arc::new(BinaryArray::new_unchecked(
input.offsets().clone(),
input.values().clone(),
nulls,
))
}
}
DataType::LargeBinary => {
let input = input.as_binary::<i64>();
// safety: values / offsets came from a valid large binary array
// and we checked nulls has the same length as values
unsafe {
Arc::new(LargeBinaryArray::new_unchecked(
input.offsets().clone(),
input.values().clone(),
nulls,
))
}
}
DataType::BinaryView => {
let input = input.as_binary_view();
// safety: values / views came from a valid binary view array
// and we checked nulls has the same length as values
unsafe {
Arc::new(BinaryViewArray::new_unchecked(
input.views().clone(),
input.data_buffers().to_vec(),
nulls,
))
}
}
_ => {
return internal_err!(
"Unexpected data type for convert_to_state in MinMaxBytesAccumulator: {:?}",
self.inner.data_type
);
}
};
assert_eq!(input.len(), output.len());
assert_eq!(input.data_type(), output.data_type());
Ok(vec![output])
}

fn supports_convert_to_state(&self) -> bool {
// TODO
false
true
}

fn size(&self) -> usize {
Expand Down

0 comments on commit acc41a3

Please sign in to comment.