Skip to content

Commit

Permalink
perf: Fix regression that led to using only a single thread (#15667)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 authored Apr 15, 2024
1 parent 08b048d commit 19f0939
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 4 deletions.
2 changes: 1 addition & 1 deletion crates/polars-core/src/frame/group_by/hashing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,14 +124,14 @@ where
// have the code duplication
pub(crate) fn group_by_threaded_slice<T, IntoSlice>(
keys: Vec<IntoSlice>,
+ n_partitions: usize,
sorted: bool,
) -> GroupsProxy
where
T: TotalHash + TotalEq + ToTotalOrd,
<T as ToTotalOrd>::TotalOrdItem: Send + Hash + Eq + Sync + Copy + DirtyHash,
IntoSlice: AsRef<[T]> + Send + Sync,
{
- let n_partitions = keys.len();
let init_size = get_init_size();

// We will create a hashtable in every thread.
Expand Down
8 changes: 5 additions & 3 deletions crates/polars-core/src/frame/group_by/into_groups.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ where
.downcast_iter()
.map(|arr| arr.values().as_slice())
.collect::<Vec<_>>();
- group_by_threaded_slice(keys, sorted)
+ group_by_threaded_slice(keys, n_partitions, sorted)
} else {
let keys = ca
.downcast_iter()
Expand Down Expand Up @@ -261,9 +261,10 @@ impl IntoGroupsProxy for BinaryChunked {
let bh = self.to_bytes_hashes(multithreaded, Default::default());

let out = if multithreaded {
+ let n_partitions = bh.len();
// Take slices so that the vecs are not cloned.
let bh = bh.iter().map(|v| v.as_slice()).collect::<Vec<_>>();
- group_by_threaded_slice(bh, sorted)
+ group_by_threaded_slice(bh, n_partitions, sorted)
} else {
group_by(bh[0].iter(), sorted)
};
Expand All @@ -277,9 +278,10 @@ impl IntoGroupsProxy for BinaryOffsetChunked {
let bh = self.to_bytes_hashes(multithreaded, Default::default());

let out = if multithreaded {
+ let n_partitions = bh.len();
// Take slices so that the vecs are not cloned.
let bh = bh.iter().map(|v| v.as_slice()).collect::<Vec<_>>();
- group_by_threaded_slice(bh, sorted)
+ group_by_threaded_slice(bh, n_partitions, sorted)
} else {
group_by(bh[0].iter(), sorted)
};
Expand Down

0 comments on commit 19f0939

Please sign in to comment.