Skip to content

Commit

Permalink
Change ScalarValue::Struct to ArrayRef (#7893)
Browse files Browse the repository at this point in the history
* first draft

Signed-off-by: jayzhan211 <jayzhan211@gmail.com>

* cleanup

Signed-off-by: jayzhan211 <jayzhan211@gmail.com>

* rebase

Signed-off-by: jayzhan211 <jayzhan211@gmail.com>

* todo

Signed-off-by: jayzhan211 <jayzhan211@gmail.com>

* revert back to_scalar

Signed-off-by: jayzhan211 <jayzhan211@gmail.com>

* cleanup

Signed-off-by: jayzhan211 <jayzhan211@gmail.com>

* fmt

Signed-off-by: jayzhan211 <jayzhan211@gmail.com>

* fix debug

Signed-off-by: jayzhan211 <jayzhan211@gmail.com>

* fix display

Signed-off-by: jayzhan211 <jayzhan211@gmail.com>

* fix try from array

Signed-off-by: jayzhan211 <jayzhan211@gmail.com>

* comment

Signed-off-by: jayzhan211 <jayzhan211@gmail.com>

* fix state types

Signed-off-by: jayzhan211 <jayzhan211@gmail.com>

---------

Signed-off-by: jayzhan211 <jayzhan211@gmail.com>
  • Loading branch information
jayzhan211 authored Feb 7, 2024
1 parent 77f9775 commit 8413da8
Show file tree
Hide file tree
Showing 13 changed files with 692 additions and 575 deletions.
714 changes: 490 additions & 224 deletions datafusion/common/src/scalar.rs

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1985,7 +1985,6 @@ mod tests {
use crate::physical_plan::{DisplayAs, SendableRecordBatchStream};
use crate::physical_planner::PhysicalPlanner;
use crate::prelude::{SessionConfig, SessionContext};
use crate::scalar::ScalarValue;
use crate::test_util::{scan_empty, scan_empty_with_partitions};
use arrow::array::{ArrayRef, DictionaryArray, Int32Array};
use arrow::datatypes::{DataType, Field, Int32Type, SchemaRef};
Expand Down Expand Up @@ -2310,10 +2309,11 @@ mod tests {

/// Return a `null` literal representing a struct type like: `{ a: bool }`
fn struct_literal() -> Expr {
let struct_literal = ScalarValue::Struct(
None,
let struct_literal = ScalarValue::try_from(DataType::Struct(
vec![Field::new("foo", DataType::Boolean, false)].into(),
);
))
.unwrap();

lit(struct_literal)
}

Expand Down
29 changes: 11 additions & 18 deletions datafusion/core/tests/user_defined/user_defined_aggregates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
//! user defined aggregate functions

use arrow::{array::AsArray, datatypes::Fields};
use arrow_array::{types::UInt64Type, Int32Array, PrimitiveArray};
use arrow_array::{types::UInt64Type, Int32Array, PrimitiveArray, StructArray};
use arrow_schema::Schema;
use std::sync::{
atomic::{AtomicBool, Ordering},
Expand Down Expand Up @@ -582,36 +582,29 @@ impl FirstSelector {

// Internally, keep the data types as this type
fn state_datatypes() -> Vec<DataType> {
vec![
DataType::Float64,
DataType::Timestamp(TimeUnit::Nanosecond, None),
]
vec![Self::output_datatype()]
}

/// Convert to a set of ScalarValues
fn to_state(&self) -> Vec<ScalarValue> {
vec![
ScalarValue::Float64(Some(self.value)),
ScalarValue::TimestampNanosecond(Some(self.time), None),
]
}
fn to_state(&self) -> Result<ScalarValue> {
let f64arr = Arc::new(Float64Array::from(vec![self.value])) as ArrayRef;
let timearr =
Arc::new(TimestampNanosecondArray::from(vec![self.time])) as ArrayRef;

/// return this selector as a single scalar (struct) value
fn to_scalar(&self) -> ScalarValue {
ScalarValue::Struct(Some(self.to_state()), Self::fields())
let struct_arr =
StructArray::try_new(Self::fields(), vec![f64arr, timearr], None)?;
Ok(ScalarValue::Struct(Arc::new(struct_arr)))
}
}

impl Accumulator for FirstSelector {
fn state(&mut self) -> Result<Vec<ScalarValue>> {
let state = self.to_state().into_iter().collect::<Vec<_>>();

Ok(state)
self.evaluate().map(|s| vec![s])
}

/// produce the output structure
fn evaluate(&mut self) -> Result<ScalarValue> {
Ok(self.to_scalar())
self.to_state()
}

fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
Expand Down
28 changes: 10 additions & 18 deletions datafusion/physical-expr/src/aggregate/array_agg_distinct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,18 @@

//! Implementations for DISTINCT expressions, e.g. `COUNT(DISTINCT c)`

use arrow::datatypes::{DataType, Field};
use std::any::Any;
use std::collections::HashSet;
use std::fmt::Debug;
use std::sync::Arc;

use arrow::array::ArrayRef;
use std::collections::HashSet;
use arrow::datatypes::{DataType, Field};

use crate::aggregate::utils::down_cast_any_ref;
use crate::expressions::format_state_name;
use crate::{AggregateExpr, PhysicalExpr};

use datafusion_common::{Result, ScalarValue};
use datafusion_expr::Accumulator;

Expand Down Expand Up @@ -137,10 +138,11 @@ impl Accumulator for DistinctArrayAggAccumulator {
assert_eq!(values.len(), 1, "batch input should only include 1 column!");

let array = &values[0];
let scalars = ScalarValue::convert_array_to_scalar_vec(array)?;
for scalar in scalars {
self.values.extend(scalar)
let scalar_vec = ScalarValue::convert_array_to_scalar_vec(array)?;
for scalars in scalar_vec {
self.values.extend(scalars);
}

Ok(())
}

Expand All @@ -149,18 +151,7 @@ impl Accumulator for DistinctArrayAggAccumulator {
return Ok(());
}

assert_eq!(
states.len(),
1,
"array_agg_distinct states must contain single array"
);

let scalar_vec = ScalarValue::convert_array_to_scalar_vec(&states[0])?;
for scalars in scalar_vec {
self.values.extend(scalars)
}

Ok(())
self.update_batch(states)
}

fn evaluate(&mut self) -> Result<ScalarValue> {
Expand All @@ -187,7 +178,8 @@ mod tests {
use arrow::datatypes::{DataType, Schema};
use arrow::record_batch::RecordBatch;
use arrow_array::types::Int32Type;
use arrow_array::{Array, ListArray};
use arrow_array::Array;
use arrow_array::ListArray;
use arrow_buffer::OffsetBuffer;
use datafusion_common::utils::array_into_list_array;
use datafusion_common::{internal_err, DataFusionError};
Expand Down
80 changes: 52 additions & 28 deletions datafusion/physical-expr/src/aggregate/array_agg_ordered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@ use crate::{
use arrow::array::{Array, ArrayRef};
use arrow::datatypes::{DataType, Field};
use arrow_array::cast::AsArray;
use arrow_array::{new_empty_array, StructArray};
use arrow_schema::{Fields, SortOptions};

use datafusion_common::utils::array_into_list_array;
use datafusion_common::utils::{compare_rows, get_row_at_idx};
use datafusion_common::{exec_err, DataFusionError, Result, ScalarValue};
use datafusion_expr::Accumulator;
Expand Down Expand Up @@ -219,6 +222,7 @@ impl Accumulator for OrderSensitiveArrayAggAccumulator {
if states.is_empty() {
return Ok(());
}

// First entry in the state is the aggregation result. Second entry
// stores values received for ordering requirement columns for each
// aggregation value inside `ARRAY_AGG` list. For each `StructArray`
Expand All @@ -241,41 +245,49 @@ impl Accumulator for OrderSensitiveArrayAggAccumulator {
partition_values.push(self.values.clone().into());
partition_ordering_values.push(self.ordering_values.clone().into());

// Convert array to Scalars to sort them easily. Convert back to array at evaluation.
let array_agg_res = ScalarValue::convert_array_to_scalar_vec(array_agg_values)?;

for v in array_agg_res.into_iter() {
partition_values.push(v.into());
}

let orderings = ScalarValue::convert_array_to_scalar_vec(agg_orderings)?;

let ordering_values = orderings.into_iter().map(|partition_ordering_rows| {
for partition_ordering_rows in orderings.into_iter() {
// Extract value from struct to ordering_rows for each group/partition
partition_ordering_rows.into_iter().map(|ordering_row| {
if let ScalarValue::Struct(Some(ordering_columns_per_row), _) = ordering_row {
Ok(ordering_columns_per_row)
} else {
exec_err!(
"Expects to receive ScalarValue::Struct(Some(..), _) but got: {:?}",
ordering_row.data_type()
)
}
}).collect::<Result<VecDeque<_>>>()
}).collect::<Result<Vec<_>>>()?;
for ordering_values in ordering_values.into_iter() {
partition_ordering_values.push(ordering_values);
let ordering_value = partition_ordering_rows.into_iter().map(|ordering_row| {
if let ScalarValue::Struct(s) = ordering_row {
let mut ordering_columns_per_row = vec![];

for column in s.columns() {
let sv = ScalarValue::try_from_array(column, 0)?;
ordering_columns_per_row.push(sv);
}

Ok(ordering_columns_per_row)
} else {
exec_err!(
"Expects to receive ScalarValue::Struct(Arc<StructArray>) but got:{:?}",
ordering_row.data_type()
)
}
}).collect::<Result<VecDeque<_>>>()?;

partition_ordering_values.push(ordering_value);
}

let sort_options = self
.ordering_req
.iter()
.map(|sort_expr| sort_expr.options)
.collect::<Vec<_>>();

(self.values, self.ordering_values) = merge_ordered_arrays(
&mut partition_values,
&mut partition_ordering_values,
&sort_options,
)?;

Ok(())
}

Expand Down Expand Up @@ -323,20 +335,32 @@ impl Accumulator for OrderSensitiveArrayAggAccumulator {
impl OrderSensitiveArrayAggAccumulator {
fn evaluate_orderings(&self) -> Result<ScalarValue> {
let fields = ordering_fields(&self.ordering_req, &self.datatypes[1..]);
let struct_field = Fields::from(fields);

let orderings: Vec<ScalarValue> = self
.ordering_values
.iter()
.map(|ordering| {
ScalarValue::Struct(Some(ordering.clone()), struct_field.clone())
})
.collect();
let struct_type = DataType::Struct(struct_field);
let num_columns = fields.len();
let struct_field = Fields::from(fields.clone());

let mut column_wise_ordering_values = vec![];
for i in 0..num_columns {
let column_values = self
.ordering_values
.iter()
.map(|x| x[i].clone())
.collect::<Vec<_>>();
let array = if column_values.is_empty() {
new_empty_array(fields[i].data_type())
} else {
ScalarValue::iter_to_array(column_values.into_iter())?
};
column_wise_ordering_values.push(array);
}

// Wrap in List, so we have the same data structure ListArray(StructArray..) for group by cases
let arr = ScalarValue::new_list(&orderings, &struct_type);
Ok(ScalarValue::List(arr))
let ordering_array = StructArray::try_new(
struct_field.clone(),
column_wise_ordering_values,
None,
)?;
Ok(ScalarValue::List(Arc::new(array_into_list_array(
Arc::new(ordering_array),
))))
}
}

Expand Down
53 changes: 37 additions & 16 deletions datafusion/physical-expr/src/aggregate/nth_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ use crate::{
};

use arrow_array::cast::AsArray;
use arrow_array::ArrayRef;
use arrow_array::{new_empty_array, ArrayRef, StructArray};
use arrow_schema::{DataType, Field, Fields};
use datafusion_common::utils::get_row_at_idx;
use datafusion_common::utils::{array_into_list_array, get_row_at_idx};
use datafusion_common::{exec_err, internal_err, DataFusionError, Result, ScalarValue};
use datafusion_expr::Accumulator;

Expand Down Expand Up @@ -271,7 +271,14 @@ impl Accumulator for NthValueAccumulator {
let ordering_values = orderings.into_iter().map(|partition_ordering_rows| {
// Extract value from struct to ordering_rows for each group/partition
partition_ordering_rows.into_iter().map(|ordering_row| {
if let ScalarValue::Struct(Some(ordering_columns_per_row), _) = ordering_row {
if let ScalarValue::Struct(s) = ordering_row {
let mut ordering_columns_per_row = vec![];

for column in s.columns() {
let sv = ScalarValue::try_from_array(column, 0)?;
ordering_columns_per_row.push(sv);
}

Ok(ordering_columns_per_row)
} else {
exec_err!(
Expand Down Expand Up @@ -306,7 +313,7 @@ impl Accumulator for NthValueAccumulator {
fn state(&mut self) -> Result<Vec<ScalarValue>> {
let mut result = vec![self.evaluate_values()];
if !self.ordering_req.is_empty() {
result.push(self.evaluate_orderings());
result.push(self.evaluate_orderings()?);
}
Ok(result)
}
Expand Down Expand Up @@ -355,21 +362,35 @@ impl Accumulator for NthValueAccumulator {
}

impl NthValueAccumulator {
fn evaluate_orderings(&self) -> ScalarValue {
fn evaluate_orderings(&self) -> Result<ScalarValue> {
let fields = ordering_fields(&self.ordering_req, &self.datatypes[1..]);
let struct_field = Fields::from(fields);
let struct_field = Fields::from(fields.clone());

let orderings = self
.ordering_values
.iter()
.map(|ordering| {
ScalarValue::Struct(Some(ordering.clone()), struct_field.clone())
})
.collect::<Vec<_>>();
let struct_type = DataType::Struct(struct_field);
let mut column_wise_ordering_values = vec![];
let num_columns = fields.len();
for i in 0..num_columns {
let column_values = self
.ordering_values
.iter()
.map(|x| x[i].clone())
.collect::<Vec<_>>();
let array = if column_values.is_empty() {
new_empty_array(fields[i].data_type())
} else {
ScalarValue::iter_to_array(column_values.into_iter())?
};
column_wise_ordering_values.push(array);
}

let ordering_array = StructArray::try_new(
struct_field.clone(),
column_wise_ordering_values,
None,
)?;

// Wrap in List, so we have the same data structure ListArray(StructArray..) for group by cases
ScalarValue::List(ScalarValue::new_list(&orderings, &struct_type))
Ok(ScalarValue::List(Arc::new(array_into_list_array(
Arc::new(ordering_array),
))))
}

fn evaluate_values(&self) -> ScalarValue {
Expand Down
Loading

0 comments on commit 8413da8

Please sign in to comment.