Skip to content

Commit

Permalink
Implement special min/max accumulator for Strings: `MinMaxBytesAccumu…
Browse files Browse the repository at this point in the history
…lator`
  • Loading branch information
alamb committed Oct 7, 2024
1 parent 9d8f77d commit c4d6bef
Show file tree
Hide file tree
Showing 4 changed files with 412 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ impl NullState {
///
/// When value_fn is called it also sets
///
/// 1. `self.seen_values[group_index]` to true for all rows that had a non null vale
/// 1. `self.seen_values[group_index]` to true for all rows that had a non null value
pub fn accumulate<T, F>(
&mut self,
group_indices: &[usize],
Expand Down
198 changes: 127 additions & 71 deletions datafusion/functions-aggregate/src/min_max.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,12 @@
//! [`Max`] and [`MaxAccumulator`] accumulator for the `max` function
//! [`Min`] and [`MinAccumulator`] accumulator for the `min` function

// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
mod min_max_bytes;

use arrow::array::{
ArrayRef, BinaryArray, BinaryViewArray, BooleanArray, Date32Array, Date64Array,
Decimal128Array, Decimal256Array, Float16Array, Float32Array, Float64Array,
Int16Array, Int32Array, Int64Array, Int8Array, IntervalDayTimeArray,
ArrayRef, AsArray, BinaryArray, BinaryViewArray, BooleanArray, Date32Array,
Date64Array, Decimal128Array, Decimal256Array, Float16Array, Float32Array,
Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, IntervalDayTimeArray,
IntervalMonthDayNanoArray, IntervalYearMonthArray, LargeBinaryArray,
LargeStringArray, StringArray, StringViewArray, Time32MillisecondArray,
Time32SecondArray, Time64MicrosecondArray, Time64NanosecondArray,
Expand Down Expand Up @@ -64,6 +51,7 @@ use arrow::datatypes::{
TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType,
};

use crate::min_max::min_max_bytes::MinMaxBytesAccumulator;
use datafusion_common::ScalarValue;
use datafusion_expr::{
function::AccumulatorArgs, Accumulator, AggregateUDFImpl, Signature, Volatility,
Expand Down Expand Up @@ -116,7 +104,7 @@ impl Default for Max {
/// the specified [`ArrowPrimitiveType`].
///
/// [`ArrowPrimitiveType`]: arrow::datatypes::ArrowPrimitiveType
macro_rules! instantiate_max_accumulator {
macro_rules! instantiate_primitive_max_accumulator {
($DATA_TYPE:ident, $NATIVE:ident, $PRIMTYPE:ident) => {{
Ok(Box::new(
PrimitiveGroupsAccumulator::<$PRIMTYPE, _>::new($DATA_TYPE, |cur, new| {
Expand All @@ -135,7 +123,7 @@ macro_rules! instantiate_max_accumulator {
///
///
/// [`ArrowPrimitiveType`]: arrow::datatypes::ArrowPrimitiveType
macro_rules! instantiate_min_accumulator {
macro_rules! instantiate_primitive_min_accumulator {
($DATA_TYPE:ident, $NATIVE:ident, $PRIMTYPE:ident) => {{
Ok(Box::new(
PrimitiveGroupsAccumulator::<$PRIMTYPE, _>::new(&$DATA_TYPE, |cur, new| {
Expand Down Expand Up @@ -243,6 +231,12 @@ impl AggregateUDFImpl for Max {
| Time32(_)
| Time64(_)
| Timestamp(_, _)
| Utf8
| LargeUtf8
| Utf8View
| Binary
| LargeBinary
| BinaryView
)
}

Expand All @@ -254,58 +248,86 @@ impl AggregateUDFImpl for Max {
use TimeUnit::*;
let data_type = args.return_type;
match data_type {
Int8 => instantiate_max_accumulator!(data_type, i8, Int8Type),
Int16 => instantiate_max_accumulator!(data_type, i16, Int16Type),
Int32 => instantiate_max_accumulator!(data_type, i32, Int32Type),
Int64 => instantiate_max_accumulator!(data_type, i64, Int64Type),
UInt8 => instantiate_max_accumulator!(data_type, u8, UInt8Type),
UInt16 => instantiate_max_accumulator!(data_type, u16, UInt16Type),
UInt32 => instantiate_max_accumulator!(data_type, u32, UInt32Type),
UInt64 => instantiate_max_accumulator!(data_type, u64, UInt64Type),
Int8 => instantiate_primitive_max_accumulator!(data_type, i8, Int8Type),
Int16 => instantiate_primitive_max_accumulator!(data_type, i16, Int16Type),
Int32 => instantiate_primitive_max_accumulator!(data_type, i32, Int32Type),
Int64 => instantiate_primitive_max_accumulator!(data_type, i64, Int64Type),
UInt8 => instantiate_primitive_max_accumulator!(data_type, u8, UInt8Type),
UInt16 => instantiate_primitive_max_accumulator!(data_type, u16, UInt16Type),
UInt32 => instantiate_primitive_max_accumulator!(data_type, u32, UInt32Type),
UInt64 => instantiate_primitive_max_accumulator!(data_type, u64, UInt64Type),
Float16 => {
instantiate_max_accumulator!(data_type, f16, Float16Type)
instantiate_primitive_max_accumulator!(data_type, f16, Float16Type)
}
Float32 => {
instantiate_max_accumulator!(data_type, f32, Float32Type)
instantiate_primitive_max_accumulator!(data_type, f32, Float32Type)
}
Float64 => {
instantiate_max_accumulator!(data_type, f64, Float64Type)
instantiate_primitive_max_accumulator!(data_type, f64, Float64Type)
}
Date32 => instantiate_max_accumulator!(data_type, i32, Date32Type),
Date64 => instantiate_max_accumulator!(data_type, i64, Date64Type),
Date32 => instantiate_primitive_max_accumulator!(data_type, i32, Date32Type),
Date64 => instantiate_primitive_max_accumulator!(data_type, i64, Date64Type),
Time32(Second) => {
instantiate_max_accumulator!(data_type, i32, Time32SecondType)
instantiate_primitive_max_accumulator!(data_type, i32, Time32SecondType)
}
Time32(Millisecond) => {
instantiate_max_accumulator!(data_type, i32, Time32MillisecondType)
instantiate_primitive_max_accumulator!(
data_type,
i32,
Time32MillisecondType
)
}
Time64(Microsecond) => {
instantiate_max_accumulator!(data_type, i64, Time64MicrosecondType)
instantiate_primitive_max_accumulator!(
data_type,
i64,
Time64MicrosecondType
)
}
Time64(Nanosecond) => {
instantiate_max_accumulator!(data_type, i64, Time64NanosecondType)
instantiate_primitive_max_accumulator!(
data_type,
i64,
Time64NanosecondType
)
}
Timestamp(Second, _) => {
instantiate_max_accumulator!(data_type, i64, TimestampSecondType)
instantiate_primitive_max_accumulator!(
data_type,
i64,
TimestampSecondType
)
}
Timestamp(Millisecond, _) => {
instantiate_max_accumulator!(data_type, i64, TimestampMillisecondType)
instantiate_primitive_max_accumulator!(
data_type,
i64,
TimestampMillisecondType
)
}
Timestamp(Microsecond, _) => {
instantiate_max_accumulator!(data_type, i64, TimestampMicrosecondType)
instantiate_primitive_max_accumulator!(
data_type,
i64,
TimestampMicrosecondType
)
}
Timestamp(Nanosecond, _) => {
instantiate_max_accumulator!(data_type, i64, TimestampNanosecondType)
instantiate_primitive_max_accumulator!(
data_type,
i64,
TimestampNanosecondType
)
}
Decimal128(_, _) => {
instantiate_max_accumulator!(data_type, i128, Decimal128Type)
instantiate_primitive_max_accumulator!(data_type, i128, Decimal128Type)
}
Decimal256(_, _) => {
instantiate_max_accumulator!(data_type, i256, Decimal256Type)
instantiate_primitive_max_accumulator!(data_type, i256, Decimal256Type)
}
Utf8 | LargeUtf8 | Utf8View | Binary | LargeBinary | BinaryView => {
Ok(Box::new(MinMaxBytesAccumulator::new(data_type.clone())))
}

// It would be nice to have a fast implementation for Strings as well
// https://github.com/apache/datafusion/issues/6906

// This is only reached if groups_accumulator_supported is out of sync
_ => internal_err!("GroupsAccumulator not supported for max({})", data_type),
Expand Down Expand Up @@ -1040,6 +1062,12 @@ impl AggregateUDFImpl for Min {
| Time32(_)
| Time64(_)
| Timestamp(_, _)
| Utf8
| LargeUtf8
| Utf8View
| Binary
| LargeBinary
| BinaryView
)
}

Expand All @@ -1051,58 +1079,86 @@ impl AggregateUDFImpl for Min {
use TimeUnit::*;
let data_type = args.return_type;
match data_type {
Int8 => instantiate_min_accumulator!(data_type, i8, Int8Type),
Int16 => instantiate_min_accumulator!(data_type, i16, Int16Type),
Int32 => instantiate_min_accumulator!(data_type, i32, Int32Type),
Int64 => instantiate_min_accumulator!(data_type, i64, Int64Type),
UInt8 => instantiate_min_accumulator!(data_type, u8, UInt8Type),
UInt16 => instantiate_min_accumulator!(data_type, u16, UInt16Type),
UInt32 => instantiate_min_accumulator!(data_type, u32, UInt32Type),
UInt64 => instantiate_min_accumulator!(data_type, u64, UInt64Type),
Int8 => instantiate_primitive_min_accumulator!(data_type, i8, Int8Type),
Int16 => instantiate_primitive_min_accumulator!(data_type, i16, Int16Type),
Int32 => instantiate_primitive_min_accumulator!(data_type, i32, Int32Type),
Int64 => instantiate_primitive_min_accumulator!(data_type, i64, Int64Type),
UInt8 => instantiate_primitive_min_accumulator!(data_type, u8, UInt8Type),
UInt16 => instantiate_primitive_min_accumulator!(data_type, u16, UInt16Type),
UInt32 => instantiate_primitive_min_accumulator!(data_type, u32, UInt32Type),
UInt64 => instantiate_primitive_min_accumulator!(data_type, u64, UInt64Type),
Float16 => {
instantiate_min_accumulator!(data_type, f16, Float16Type)
instantiate_primitive_min_accumulator!(data_type, f16, Float16Type)
}
Float32 => {
instantiate_min_accumulator!(data_type, f32, Float32Type)
instantiate_primitive_min_accumulator!(data_type, f32, Float32Type)
}
Float64 => {
instantiate_min_accumulator!(data_type, f64, Float64Type)
instantiate_primitive_min_accumulator!(data_type, f64, Float64Type)
}
Date32 => instantiate_min_accumulator!(data_type, i32, Date32Type),
Date64 => instantiate_min_accumulator!(data_type, i64, Date64Type),
Date32 => instantiate_primitive_min_accumulator!(data_type, i32, Date32Type),
Date64 => instantiate_primitive_min_accumulator!(data_type, i64, Date64Type),
Time32(Second) => {
instantiate_min_accumulator!(data_type, i32, Time32SecondType)
instantiate_primitive_min_accumulator!(data_type, i32, Time32SecondType)
}
Time32(Millisecond) => {
instantiate_min_accumulator!(data_type, i32, Time32MillisecondType)
instantiate_primitive_min_accumulator!(
data_type,
i32,
Time32MillisecondType
)
}
Time64(Microsecond) => {
instantiate_min_accumulator!(data_type, i64, Time64MicrosecondType)
instantiate_primitive_min_accumulator!(
data_type,
i64,
Time64MicrosecondType
)
}
Time64(Nanosecond) => {
instantiate_min_accumulator!(data_type, i64, Time64NanosecondType)
instantiate_primitive_min_accumulator!(
data_type,
i64,
Time64NanosecondType
)
}
Timestamp(Second, _) => {
instantiate_min_accumulator!(data_type, i64, TimestampSecondType)
instantiate_primitive_min_accumulator!(
data_type,
i64,
TimestampSecondType
)
}
Timestamp(Millisecond, _) => {
instantiate_min_accumulator!(data_type, i64, TimestampMillisecondType)
instantiate_primitive_min_accumulator!(
data_type,
i64,
TimestampMillisecondType
)
}
Timestamp(Microsecond, _) => {
instantiate_min_accumulator!(data_type, i64, TimestampMicrosecondType)
instantiate_primitive_min_accumulator!(
data_type,
i64,
TimestampMicrosecondType
)
}
Timestamp(Nanosecond, _) => {
instantiate_min_accumulator!(data_type, i64, TimestampNanosecondType)
instantiate_primitive_min_accumulator!(
data_type,
i64,
TimestampNanosecondType
)
}
Decimal128(_, _) => {
instantiate_min_accumulator!(data_type, i128, Decimal128Type)
instantiate_primitive_min_accumulator!(data_type, i128, Decimal128Type)
}
Decimal256(_, _) => {
instantiate_min_accumulator!(data_type, i256, Decimal256Type)
instantiate_primitive_min_accumulator!(data_type, i256, Decimal256Type)
}
Utf8 | LargeUtf8 | Utf8View | Binary | LargeBinary | BinaryView => {
Ok(Box::new(MinMaxBytesAccumulator::new(data_type.clone())))
}

// It would be nice to have a fast implementation for Strings as well
// https://github.com/apache/datafusion/issues/6906

// This is only reached if groups_accumulator_supported is out of sync
_ => internal_err!("GroupsAccumulator not supported for min({})", data_type),
Expand Down
Loading

0 comments on commit c4d6bef

Please sign in to comment.