Skip to content

Commit

Permalink
Fix docs
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Jul 5, 2023
1 parent f973a65 commit 24abb14
Show file tree
Hide file tree
Showing 9 changed files with 57 additions and 51 deletions.
1 change: 1 addition & 0 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 7 additions & 5 deletions datafusion/core/src/physical_plan/aggregates/row_hash2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,11 +130,11 @@ pub(crate) struct GroupedHashAggregateStream2 {
/// specialized for that partcular aggregate and its input types
accumulators: Vec<Box<dyn GroupsAccumulator>>,

/// Arguments or each accumulator.
/// Arguments to pass to accumulator.
aggregate_arguments: Vec<Vec<Arc<dyn PhysicalExpr>>>,

/// Optional filter expression to evaluate, one for each for
/// aggregate. If present, only those rows for which the filter
/// accumulator. If present, only those rows for which the filter
/// evaluate to true should be included in the aggregate results.
///
/// For example, for an aggregate like `SUM(x FILTER x > 100)`,
Expand All @@ -161,17 +161,19 @@ pub(crate) struct GroupedHashAggregateStream2 {
map: RawTable<(u64, usize)>,

/// The actual group by values, stored in arrow [`Row`] format. The
/// group_values[i] holds the group value for group_index `i`.
/// `group_values[i]` holds the group value for group_index `i`.
///
/// The row format is used to compare group keys quickly. This is
/// especially important for multi-column group keys.
///
/// [`Row`]: arrow::row::Row
group_values: Rows,

/// scratch space for the current input Batch being
/// scratch space for the current input [`RecordBatch`] being
/// processed. Reused across batches here to avoid reallocations
current_group_indices: Vec<usize>,

/// Tracks if this stream is generating input/output?
/// Tracks if this stream is generating input or output
exec_state: ExecutionState,

/// Execution metrics
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-expr/src/aggregate/average.rs
Original file line number Diff line number Diff line change
Expand Up @@ -465,7 +465,7 @@ impl RowAccumulator for AvgRowAccumulator {
}
}

/// An accumulator to compute the average of PrimitiveArray<T>.
/// An accumulator to compute the average of `[PrimitiveArray<T>]`.
/// Stores values as native types, and does overflow checking
///
/// F: Function that calcuates the average value from a sum of
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-expr/src/aggregate/count.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ impl Count {
}
}

/// An accumulator to compute the average of PrimitiveArray<T>.
/// An accumulator to compute the counts of [`PrimitiveArray<T>`].
/// Stores values as native types, and does overflow checking
///
/// Unlike most other accumulators, COUNT never produces NULLs. If no
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

//! Vectorized [`accumulate`] and [`accumulate_nullable`] functions.
//! Vectorized accumulate helpers: [`NullState`] and [`accumulate_indices`]
//!
//! These functions are designed to be the performance critical inner
//! loops of accumlators and thus there are multiple versions, to be
Expand Down Expand Up @@ -51,21 +51,21 @@ pub struct NullState {
/// Tracks validity (if we we have seen a null input value for
/// `group_index`)
///
/// If null_inputs[i] is true, it means we haven't seen any null values for
/// that group (including not having seen any)
/// If `null_inputs[i]` is true, have not seen any null values for
/// that group (also true for no values)
///
/// If null_inputs[i] is false, it means we saw at least one null value for
/// If `null_inputs[i]` is false, saw at least one null value for
/// that group
null_inputs: Option<BooleanBufferBuilder>,

/// If there has been a filter value, has it seen any non-filtered
/// input values for `group_index`?
///
/// If seen_values[i] is true, it means we have seen at least one
/// non null value for this group
/// If `seen_values[i]` is true, it seen at least one non null
/// value for this group
///
/// If seen_values[i] is false, it means we have not seen any
/// values that pass the filter yet for the group
/// If `seen_values[i]` is false, have not seen any values that
/// pass the filter yet for the group
seen_values: Option<BooleanBufferBuilder>,
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use datafusion_expr::Accumulator;
pub struct GroupsAccumulatorAdapter {
factory: Box<dyn Fn() -> Result<Box<dyn Accumulator>> + Send>,

/// [`Accumulators`] for each group, stored in group_index order
/// state for each group, stored in group_index order
states: Vec<AccumulatorState>,

/// Current memory usage, in bytes.
Expand All @@ -48,12 +48,12 @@ pub struct GroupsAccumulatorAdapter {
}

struct AccumulatorState {
/// [`Accumulators`]
/// [`Accumulator`] that stores the per-group state
accumulator: Box<dyn Accumulator>,

// scratch space for holding the indexes in the input array that
// will be fed to this accumulator. Use u32 to match take kernel
// input
// scratch space: indexes in the input array that will be fed to
// this accumulator. Stores indexes as `u32` to match the arrow
// `take` kernel input.
indices: Vec<u32>,
}

Expand Down
54 changes: 30 additions & 24 deletions datafusion/physical-expr/src/aggregate/groups_accumulator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,19 @@ use datafusion_common::Result;
/// expected that each GroupAccumulator will use something like `Vec<..>`
/// to store the group states.
pub trait GroupsAccumulator: Send {
/// updates the accumulator's state from a vector of arrays:
/// Updates the accumulator's state from its arguments, encoded as
/// a vector of arrow [`ArrayRef`]s.
///
/// * `values`: the input arguments to the accumulator
/// * `group_indices`: To which groups do the rows in `values` belong, group id)
/// * `opt_filter`: if present, only update aggregate state using values[i] if opt_filter[i] is true
/// * `total_num_groups`: the number of groups (the largest group_index is total_num_groups - 1)
///
/// * `group_indices`: To which groups do the rows in `values`
/// belong, group id)
///
/// * `opt_filter`: if present, only update aggregate state using
/// `values[i]` if `opt_filter[i]` is true
///
/// * `total_num_groups`: the number of groups (the largest
/// group_index is thus `total_num_groups - 1`)
fn update_batch(
&mut self,
values: &[ArrayRef],
Expand All @@ -53,52 +60,51 @@ pub trait GroupsAccumulator: Send {
/// `RecordBatch`.
///
/// The rows returned *must* be in group_index order: The value
/// for group_index 0, followed by 1, etc. Any group_index that
/// did not have values, should be null.
/// for group_index 0, followed by 1, etc.
///
/// OPEN QUESTION: Should this method take a "batch_size: usize"
/// and produce a Vec<RecordBatch> as output to avoid 1) requiring
/// one giant intermediate buffer?
/// and produce a `Vec<RecordBatch>` as output to avoid requiring
/// a contiguous intermediate buffer?
///
/// For example, the `SUM` accumulator maintains a running sum,
/// and `evaluate` will produce that running sum as its output for
/// all groups, in group_index order
///
/// This call should be treated as consuming (takes `self`, but it
/// can not be due to keeping it object save) the accumulator is
/// free to release / reset it is internal state after this call
/// and error on any subsequent call.
/// This call should be treated as consuming (takes `self`) as no
/// other functions will be called after this. This can not
/// actually take `self` otherwise the trait would not be object
/// safe). The accumulator is free to release / reset it is
/// internal state after this call and error on any subsequent
/// call.
fn evaluate(&mut self) -> Result<ArrayRef>;

/// Returns any intermediate aggregate state, used for multi-phase
/// grouping.
/// Returns the intermediate aggregate state for this accumulator,
/// used for multi-phase grouping.
///
/// The rows returned *must* be in group_index order: The value
/// for group_index 0, followed by 1, etc. Any group_index that
/// did not have values, should be null.
///
/// For example, AVG returns two arrays: `SUM` and `COUNT`.
///
/// This call should be treated as consuming (takes `self`, but it
/// can not be due to keeping it object save) the accumulator is
/// free to release / reset it is internal state after this call
/// and error on any subsequent call.
/// Note more sophisticated internal state can be passed as
/// single `StructArray` rather than multiple arrays.
///
/// TODO: consider returning a single Array (which could be a
/// StructArray) instead
/// This call should be treated as consuming, as described in the
/// comments of [`Self::evaluate`].
fn state(&mut self) -> Result<Vec<ArrayRef>>;

/// merges intermediate state (from `state()`) into this accumulators values
/// Merges intermediate state (from [`Self::state`]) into this
/// accumulator's values.
///
/// For some aggregates (such as `SUM`), merge_batch is the same
/// as `update_batch`, but for some aggregrates (such as `COUNT`)
/// the operations differ. See [`Self::state`] for more details on how
/// state is used and merged.
///
/// * `values`: arrays produced from calling `state` previously to the accumulator
/// * `group_indices`: To which groups do the rows in `values` belong, group id)
/// * `opt_filter`: if present, only update aggregate state using values[i] if opt_filter[i] is true
/// * `total_num_groups`: the number of groups (the largest group_index is total_num_groups - 1)
///
/// Other arguments are the same as for [`Self::update_batch`];
fn merge_batch(
&mut self,
values: &[ArrayRef],
Expand Down
6 changes: 1 addition & 5 deletions datafusion/physical-expr/src/aggregate/sum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -478,11 +478,7 @@ impl RowAccumulator for SumRowAccumulator {
}
}

/// An accumulator to compute the average of PrimitiveArray<T>.
/// Stores values as native types, and does overflow checking
///
/// F: Function that calcuates the average value from a sum of
/// T::Native and a total count
/// An accumulator to compute the sum of values in [`PrimitiveArray<T>`]
#[derive(Debug)]
struct SumGroupsAccumulator<T>
where
Expand Down
5 changes: 3 additions & 2 deletions datafusion/physical-expr/src/aggregate/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,9 @@ pub fn calculate_result_decimal_for_avg(

/// Adjust array type metadata if needed
///
/// Decimal128Arrays are are are created from Vec<NativeType> with default
/// precision and scale. This function adjusts them down.
/// Since `Decimal128Arrays` created from `Vec<NativeType>` have
/// default precision and scale, this function adjusts the output to
/// match `sum_data_type`.
pub fn adjust_output_array(
sum_data_type: &DataType,
array: ArrayRef,
Expand Down

0 comments on commit 24abb14

Please sign in to comment.