Skip to content

Commit

Permalink
Use Rows API
Browse files Browse the repository at this point in the history
  • Loading branch information
Daniël Heres committed Jul 3, 2023
1 parent 7b20155 commit 6275a9f
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 13 deletions.
9 changes: 9 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,12 @@ opt-level = 3
overflow-checks = false
panic = 'unwind'
rpath = false

# TODO remove after 43 release
[patch.crates-io]
arrow = { git = "https://github.com/apache/arrow-rs.git", rev = "d7fa775cf76c7cd54c6d2a86542115599d8f53ee" }
arrow-flight = { git = "https://github.com/apache/arrow-rs.git", rev = "d7fa775cf76c7cd54c6d2a86542115599d8f53ee" }
arrow-schema = { git = "https://github.com/apache/arrow-rs.git", rev = "d7fa775cf76c7cd54c6d2a86542115599d8f53ee" }
arrow-buffer = { git = "https://github.com/apache/arrow-rs.git", rev = "d7fa775cf76c7cd54c6d2a86542115599d8f53ee" }
arrow-array = { git = "https://github.com/apache/arrow-rs.git", rev = "d7fa775cf76c7cd54c6d2a86542115599d8f53ee" }
parquet = { git = "https://github.com/apache/arrow-rs.git", rev = "d7fa775cf76c7cd54c6d2a86542115599d8f53ee" }
22 changes: 9 additions & 13 deletions datafusion/core/src/physical_plan/aggregates/row_hash2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use std::task::{Context, Poll};
use std::vec;

use ahash::RandomState;
use arrow::row::{OwnedRow, RowConverter, SortField};
use arrow::row::{RowConverter, SortField, Rows};
use datafusion_physical_expr::hash_utils::create_hashes;
use futures::ready;
use futures::stream::{Stream, StreamExt};
Expand Down Expand Up @@ -163,11 +163,7 @@ pub(crate) struct GroupedHashAggregateStream2 {
///
/// The row format is used to compare group keys quickly. This is
/// especially important for multi-column group keys.
///
/// TODO, make this Rows (rather than Vec<OwnedRow> to reduce
/// allocations once
    /// https://github.com/apache/arrow-rs/issues/4466 is available
group_values: Vec<OwnedRow>,
group_values: Rows,

/// scratch space for the current input Batch being
/// processed. Reused across batches here to avoid reallocations
Expand Down Expand Up @@ -239,7 +235,7 @@ impl GroupedHashAggregateStream2 {
let name = format!("GroupedHashAggregateStream2[{partition}]");
let reservation = MemoryConsumer::new(name).register(context.memory_pool());
let map = RawTable::with_capacity(0);
let group_by_values = vec![];
let group_by_values = row_converter.empty_rows(0, 0);
let current_group_indices = vec![];

timer.done();
Expand Down Expand Up @@ -381,7 +377,7 @@ impl GroupedHashAggregateStream2 {
// TODO update *allocated based on size of the row
// that was just pushed into
// aggr_state.group_by_values
group_rows.row(row) == self.group_values[*group_idx].row()
group_rows.row(row) == self.group_values.row(*group_idx)
});

let group_idx = match entry {
Expand All @@ -390,8 +386,8 @@ impl GroupedHashAggregateStream2 {
// 1.2 Need to create new entry for the group
None => {
// Add new entry to aggr_state and save newly created index
let group_idx = self.group_values.len();
self.group_values.push(group_rows.row(row).owned());
let group_idx = self.group_values.num_rows();
self.group_values.push(group_rows.row(row));

// for hasher function, use precomputed hash value
self.map.insert_accounted(
Expand Down Expand Up @@ -438,7 +434,7 @@ impl GroupedHashAggregateStream2 {
.zip(input_values.iter())
.zip(filter_values.iter());

let total_num_groups = self.group_values.len();
let total_num_groups = self.group_values.num_rows();

for ((acc, values), opt_filter) in t {
let acc_size_pre = acc.size();
Expand Down Expand Up @@ -482,13 +478,13 @@ impl GroupedHashAggregateStream2 {
impl GroupedHashAggregateStream2 {
/// Create an output RecordBatch with all group keys and accumulator states/values
fn create_batch_from_map(&mut self) -> Result<RecordBatch> {
if self.group_values.is_empty() {
if self.group_values.num_rows() == 0 {
let schema = self.schema.clone();
return Ok(RecordBatch::new_empty(schema));
}

// First output rows are the groups
let groups_rows = self.group_values.iter().map(|owned_row| owned_row.row());
let groups_rows = self.group_values.iter().map(|owned_row| owned_row);

let mut output: Vec<ArrayRef> = self.row_converter.convert_rows(groups_rows)?;

Expand Down

0 comments on commit 6275a9f

Please sign in to comment.