perf(rust, python): Improve rechunk check (#6268)
c-peters authored Jan 18, 2023
1 parent f096087 commit 7331f6e
Showing 1 changed file with 47 additions and 34 deletions.
polars/polars-core/src/frame/mod.rs (81 changes: 47 additions & 34 deletions)
@@ -3,7 +3,7 @@ use std::borrow::Cow;
 use std::iter::{FromIterator, Iterator};
 use std::{mem, ops};

-use ahash::{AHashSet, RandomState};
+use ahash::AHashSet;
 use polars_arrow::prelude::QuantileInterpolOptions;
 use rayon::prelude::*;

@@ -28,8 +28,6 @@ pub mod hash_join;
 pub mod row;
 mod upstream_traits;

-use std::hash::{BuildHasher, Hash, Hasher};
-
 pub use chunks::*;
 #[cfg(feature = "serde")]
 use serde::{Deserialize, Serialize};
@@ -38,7 +36,6 @@ use crate::frame::groupby::GroupsIndicator;
 #[cfg(feature = "sort_multiple")]
 use crate::prelude::sort::prepare_argsort;
 use crate::series::IsSorted;
-use crate::vector_hasher::_boost_hash_combine;
 #[cfg(feature = "row_hash")]
 use crate::vector_hasher::df_rows_to_hashes_threaded;
 use crate::POOL;
@@ -437,36 +434,26 @@ impl DataFrame {
         self
     }

-    /// Estimates of the DataFrames columns consist of the same chunk sizes
+    /// Returns true if the chunks of the columns do not align and re-chunking should be done
     pub fn should_rechunk(&self) -> bool {
-        let hb = RandomState::default();
-        let hb2 = RandomState::with_seeds(392498, 98132457, 0, 412059);
-        !self
-            .columns
-            .iter()
-            // The idea is that we create a hash of the chunk lengths.
-            // Consisting of the combined hash + the sum (assuming collision probability is nihil)
-            // if not, we can add more hashes or at worst case we do an extra rechunk.
-            // the old solution to this was clone all lengths to a vec and compare the vecs
-            .map(|s| {
-                s.chunk_lengths().map(|i| i as u64).fold(
-                    (0u64, 0u64, s.n_chunks()),
-                    |(lhash, lh2, n), rval| {
-                        let mut h = hb.build_hasher();
-                        rval.hash(&mut h);
-                        let rhash = h.finish();
-                        let mut h = hb2.build_hasher();
-                        rval.hash(&mut h);
-                        let rh2 = h.finish();
-                        (
-                            _boost_hash_combine(lhash, rhash),
-                            _boost_hash_combine(lh2, rh2),
-                            n,
-                        )
-                    },
-                )
-            })
-            .all_equal()
+        let mut chunk_lenghts = self.columns.iter().map(|s| s.chunk_lengths());
+        match chunk_lenghts.next() {
+            None => false,
+            Some(first_chunk_lengths) => {
+                // Fast Path for single Chunk Series
+                if first_chunk_lengths.len() == 1 {
+                    return chunk_lenghts.any(|cl| cl.len() != 1);
+                }
+                // Slow Path for multi Chunk series
+                let v: Vec<_> = first_chunk_lengths.collect();
+                for cl in chunk_lenghts {
+                    if cl.enumerate().any(|(idx, el)| Some(&el) != v.get(idx)) {
+                        return true;
+                    }
+                }
+                false
+            }
+        }
     }

     /// Ensure all the chunks in the DataFrame are aligned.
@@ -3154,7 +3141,7 @@ impl DataFrame {
     #[cfg(feature = "row_hash")]
     pub fn hash_rows(
         &mut self,
-        hasher_builder: Option<RandomState>,
+        hasher_builder: Option<ahash::RandomState>,
     ) -> PolarsResult<UInt64Chunked> {
         let dfs = split_df(self, POOL.current_num_threads())?;
         let (cas, _) = df_rows_to_hashes_threaded(&dfs, hasher_builder)?;
@@ -3512,6 +3499,32 @@ mod test {
         assert_eq!(sliced_df.shape(), (2, 2));
     }

+    #[test]
+    fn rechunk_false() {
+        let df = create_frame();
+        assert!(!df.should_rechunk())
+    }
+
+    #[test]
+    fn rechunk_true() -> PolarsResult<()> {
+        let mut base = df!(
+            "a" => [1, 2, 3],
+            "b" => [1, 2, 3]
+        )?;
+
+        // Create a series with multiple chunks
+        let mut s = Series::new("foo", 0..2);
+        let s2 = Series::new("bar", 0..1);
+        s.append(&s2)?;
+
+        // Append series to frame
+        let out = base.with_column(s)?;
+
+        // Now we should rechunk
+        assert!(out.should_rechunk());
+        Ok(())
+    }
+
     #[test]
     fn test_duplicate_column() {
         let mut df = df! {
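The new check replaces per-column hashing of chunk lengths with a direct comparison against the first column's chunk lengths, plus an early exit for the common case where every column holds a single chunk. Below is a minimal standalone sketch of that strategy, not the Polars method itself: it assumes each column is given as a plain Vec<usize> of chunk lengths and that all columns cover the same total number of rows, and the free function should_rechunk here is purely illustrative.

/// Minimal sketch: `columns` holds each column's chunk lengths, and all
/// columns are assumed to span the same total number of rows.
fn should_rechunk(columns: &[Vec<usize>]) -> bool {
    let mut chunk_lengths = columns.iter().map(|c| c.as_slice());
    match chunk_lengths.next() {
        // An empty frame never needs a rechunk.
        None => false,
        Some(first) => {
            // Fast path: if the first column is a single chunk, the chunks
            // only misalign when some other column has more than one chunk.
            if first.len() == 1 {
                return chunk_lengths.any(|cl| cl.len() != 1);
            }
            // Slow path: every other column must match the first column's
            // chunk lengths exactly, element for element.
            chunk_lengths.any(|cl| cl != first)
        }
    }
}

fn main() {
    // Aligned: both columns are chunked as [2, 1].
    assert!(!should_rechunk(&[vec![2, 1], vec![2, 1]]));
    // Misaligned: a single-chunk column next to a two-chunk column,
    // mirroring the rechunk_true test added in the diff.
    assert!(should_rechunk(&[vec![3], vec![2, 1]]));
}

Compared with the removed implementation, which folded every chunk length into two combined hashes per column, this shape of check does no hashing at all and can stop at the first mismatch it finds.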
