From 205826ef01eb3a082ef2bcdff706b9794bd4c430 Mon Sep 17 00:00:00 2001 From: Orson Peters Date: Tue, 20 Aug 2024 16:02:36 +0200 Subject: [PATCH 1/4] refactor(rust): Add DataFrame::new_with_broadcast and simplify column uniqueness checks --- crates/polars-core/src/frame/mod.rs | 199 ++++++++---------- crates/polars-plan/src/plans/mod.rs | 2 +- crates/polars-stream/src/nodes/select.rs | 15 +- docs/src/rust/user-guide/expressions/lists.rs | 5 +- 4 files changed, 92 insertions(+), 129 deletions(-) diff --git a/crates/polars-core/src/frame/mod.rs b/crates/polars-core/src/frame/mod.rs index f5aca6e513a1..359aebeeda03 100644 --- a/crates/polars-core/src/frame/mod.rs +++ b/crates/polars-core/src/frame/mod.rs @@ -61,6 +61,38 @@ pub enum UniqueKeepStrategy { Any, } +fn ensure_names_unique(items: &[T], mut get_name: F) -> PolarsResult<()> +where + F: FnMut(&T) -> &str, +{ + // Always unique. + if items.len() <= 1 { + return Ok(()); + } + + // Too small to be worth spawning a hashmap for, this is at most 6 comparisons. + if items.len() <= 4 { + for i in 0..items.len() - 1 { + let name = get_name(&items[i]); + for j in i + 1..items.len() { + if name == get_name(&items[j]) { + polars_bail!(duplicate = name); + } + } + } + } + + let mut names = PlHashSet::with_capacity(items.len()); + for item in items { + let name = get_name(item); + if !names.insert(name) { + polars_bail!(duplicate = name); + } + } + + Ok(()) +} + /// A contiguous growable collection of `Series` that have the same length. /// /// ## Use declarations @@ -221,89 +253,54 @@ impl DataFrame { /// let df = DataFrame::new(vec![s0, s1])?; /// # Ok::<(), PolarsError>(()) /// ``` - pub fn new(columns: Vec) -> PolarsResult { - let mut first_len = None; + pub fn new(columns: Vec) -> PolarsResult { + ensure_names_unique(&columns, |s| s.name())?; - let shape_err = |&first_name, &first_len, &name, &len| { - polars_bail!( - ShapeMismatch: "could not create a new DataFrame: series {:?} has length {} \ - while series {:?} has length {}", - first_name, first_len, name, len - ); - }; + if columns.len() > 1 { + let first_len = columns[0].len(); + for col in &columns { + polars_ensure!( + col.len() == first_len, + ShapeMismatch: "could not create a new DataFrame: series {:?} has length {} while series {:?} has length {}", + columns[0].len(), first_len, col.name(), col.len() + ); + } + } - let series_cols = if S::is_series() { - // SAFETY: - // we are guarded by the type system here. - #[allow(clippy::transmute_undefined_repr)] - let series_cols = unsafe { std::mem::transmute::, Vec>(columns) }; - let mut names = PlHashSet::with_capacity(series_cols.len()); - - for s in &series_cols { - let name = s.name(); - - match first_len { - Some(len) => { - if s.len() != len { - let first_series = &series_cols.first().unwrap(); - return shape_err( - &first_series.name(), - &first_series.len(), - &name, - &s.len(), - ); - } - }, - None => first_len = Some(s.len()), - } + Ok(DataFrame { columns }) + } - if !names.insert(name) { - polars_bail!(duplicate = name); - } - } - // we drop early as the brchk thinks the &str borrows are used when calling the drop - // of both `series_cols` and `names` - drop(names); - series_cols - } else { - let mut series_cols: Vec = Vec::with_capacity(columns.len()); - let mut names = PlHashSet::with_capacity(columns.len()); - - // check for series length equality and convert into series in one pass - for s in columns { - let series = s.into_series(); - // we have aliasing borrows so we must allocate a string - let name = series.name().to_string(); - - match first_len { - Some(len) => { - if series.len() != len { - let first_series = &series_cols.first().unwrap(); - return shape_err( - &first_series.name(), - &first_series.len(), - &name.as_str(), - &series.len(), - ); - } - }, - None => first_len = Some(series.len()), - } + // Converts a sequence of columns into a DataFrame, broadcasting length-1 + // columns to match the other columns. + pub fn new_with_broadcast(mut columns: Vec) -> PolarsResult { + ensure_names_unique(&columns, |s| s.name())?; - if names.contains(&name) { - polars_bail!(duplicate = name); + // The length of the longest non-unit length column determines the + // broadcast length. If all columns are unit-length the broadcast length + // is one. + let broadcast_len = columns + .iter() + .map(|s| s.len()) + .filter(|l| *l != 1) + .max() + .unwrap_or(1); + + for col in &mut columns { + // Length not equal to the broadcast len, needs broadcast or is an error. + let len = col.len(); + if len != broadcast_len { + if len != 1 { + let name = col.name().to_owned(); + let longest_column = columns.iter().max_by_key(|c| c.len()).unwrap().name(); + polars_bail!( + ShapeMismatch: "could not create a new DataFrame: series {:?} has length {} while series {:?} has length {}", + name, len, longest_column, broadcast_len + ); } - - series_cols.push(series); - names.insert(name); + *col = col.new_from_index(0, broadcast_len); } - drop(names); - series_cols - }; - - Ok(DataFrame { - columns: series_cols, - }) + } + Ok(unsafe { DataFrame::new_no_checks(columns) }) } /// Creates an empty `DataFrame` usable in a compile time context (such as static initializers). @@ -442,16 +439,7 @@ impl DataFrame { /// It is the callers responsibility to uphold the contract of all `Series` /// having an equal length, if not this may panic down the line. pub unsafe fn new_no_length_checks(columns: Vec) -> PolarsResult { - let mut names = PlHashSet::with_capacity(columns.len()); - for column in &columns { - let name = column.name(); - if !names.insert(name) { - polars_bail!(duplicate = name); - } - } - // we drop early as the brchk thinks the &str borrows are used when calling the drop - // of both `columns` and `names` - drop(names); + ensure_names_unique(&columns, |s| s.name())?; Ok(DataFrame { columns }) } @@ -637,12 +625,7 @@ impl DataFrame { ShapeMismatch: "{} column names provided for a DataFrame of width {}", names.len(), self.width() ); - let unique_names: PlHashSet<&str> = - PlHashSet::from_iter(names.iter().map(|name| name.as_ref())); - polars_ensure!( - unique_names.len() == self.width(), - Duplicate: "duplicate column names found" - ); + ensure_names_unique(names, |s| s.as_ref())?; let columns = mem::take(&mut self.columns); self.columns = columns @@ -1447,7 +1430,7 @@ impl DataFrame { } pub fn _select_impl(&self, cols: &[SmartString]) -> PolarsResult { - self.select_check_duplicates(cols)?; + ensure_names_unique(cols, |s| s.as_str())?; self._select_impl_unchecked(cols) } @@ -1493,7 +1476,7 @@ impl DataFrame { check_duplicates: bool, ) -> PolarsResult { if check_duplicates { - self.select_check_duplicates(cols)?; + ensure_names_unique(cols, |s| s.as_str())?; } let selected = self.select_series_impl_with_schema(cols, schema)?; Ok(unsafe { DataFrame::new_no_checks(selected) }) @@ -1526,21 +1509,11 @@ impl DataFrame { } fn select_physical_impl(&self, cols: &[SmartString]) -> PolarsResult { - self.select_check_duplicates(cols)?; + ensure_names_unique(cols, |s| s.as_str())?; let selected = self.select_series_physical_impl(cols)?; Ok(unsafe { DataFrame::new_no_checks(selected) }) } - fn select_check_duplicates(&self, cols: &[SmartString]) -> PolarsResult<()> { - let mut names = PlHashSet::with_capacity(cols.len()); - for name in cols { - if !names.insert(name.as_str()) { - polars_bail!(duplicate = name); - } - } - Ok(()) - } - /// Select column(s) from this [`DataFrame`] and return them into a [`Vec`]. /// /// # Example @@ -1712,16 +1685,16 @@ impl DataFrame { /// } /// ``` pub fn rename(&mut self, column: &str, name: &str) -> PolarsResult<&mut Self> { + if column == name { + return Ok(self); + } + polars_ensure!( + self.columns.iter().all(|c| c.name() != name), + Duplicate: "column rename attempted with already existing name \"{name}\"" + ); self.select_mut(column) .ok_or_else(|| polars_err!(col_not_found = column)) .map(|s| s.rename(name))?; - let unique_names: PlHashSet<&str> = - PlHashSet::from_iter(self.columns.iter().map(|s| s.name())); - polars_ensure!( - unique_names.len() == self.width(), - Duplicate: "duplicate column names found" - ); - drop(unique_names); Ok(self) } diff --git a/crates/polars-plan/src/plans/mod.rs b/crates/polars-plan/src/plans/mod.rs index ac8dff3e90fd..6967e743f5b3 100644 --- a/crates/polars-plan/src/plans/mod.rs +++ b/crates/polars-plan/src/plans/mod.rs @@ -213,7 +213,7 @@ impl Clone for DslPlan { impl Default for DslPlan { fn default() -> Self { - let df = DataFrame::new::(vec![]).unwrap(); + let df = DataFrame::empty(); let schema = df.schema(); DslPlan::DataFrameScan { df: Arc::new(df), diff --git a/crates/polars-stream/src/nodes/select.rs b/crates/polars-stream/src/nodes/select.rs index 568351ee4f47..e4f1cef4b348 100644 --- a/crates/polars-stream/src/nodes/select.rs +++ b/crates/polars-stream/src/nodes/select.rs @@ -59,20 +59,7 @@ impl ComputeNode for SelectNode { out._add_columns(selected, &slf.schema)?; out } else { - // Broadcast scalars. - let max_non_unit_length = selected - .iter() - .map(|s| s.len()) - .filter(|l| *l != 1) - .max() - .unwrap_or(1); - for s in &mut selected { - if s.len() != max_non_unit_length { - assert!(s.len() == 1, "got series of incompatible lengths"); - *s = s.new_from_index(0, max_non_unit_length); - } - } - unsafe { DataFrame::new_no_checks(selected) } + DataFrame::new_with_broadcast(selected)? }; let mut morsel = Morsel::new(ret, seq, source_token); diff --git a/docs/src/rust/user-guide/expressions/lists.rs b/docs/src/rust/user-guide/expressions/lists.rs index 530ae4d79892..c03824c7e368 100644 --- a/docs/src/rust/user-guide/expressions/lists.rs +++ b/docs/src/rust/user-guide/expressions/lists.rs @@ -141,7 +141,10 @@ fn main() -> Result<(), Box> { ListPrimitiveChunkedBuilder::new("Array_2", 8, 8, DataType::Int32); col2.append_slice(&[1, 7, 3]); col2.append_slice(&[8, 1, 0]); - let array_df = DataFrame::new([col1.finish(), col2.finish()].into())?; + let array_df = DataFrame::new(vec![ + col1.finish().into_series(), + col2.finish().into_series(), + ])?; println!("{}", &array_df); // --8<-- [end:array_df] From c402c20fb473d4cad58b5a81c437060464516047 Mon Sep 17 00:00:00 2001 From: Orson Peters Date: Tue, 20 Aug 2024 16:10:30 +0200 Subject: [PATCH 2/4] clippy --- crates/polars-core/src/frame/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/polars-core/src/frame/mod.rs b/crates/polars-core/src/frame/mod.rs index 359aebeeda03..42379797a6ba 100644 --- a/crates/polars-core/src/frame/mod.rs +++ b/crates/polars-core/src/frame/mod.rs @@ -74,8 +74,8 @@ where if items.len() <= 4 { for i in 0..items.len() - 1 { let name = get_name(&items[i]); - for j in i + 1..items.len() { - if name == get_name(&items[j]) { + for other in items.iter().skip(i + 1) { + if name == get_name(other) { polars_bail!(duplicate = name); } } From 7280fdf9e5fc411ed583a77622930bf52ad2d5d4 Mon Sep 17 00:00:00 2001 From: Orson Peters Date: Tue, 20 Aug 2024 18:26:51 +0200 Subject: [PATCH 3/4] fix logic bug --- crates/polars-core/src/frame/mod.rs | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/crates/polars-core/src/frame/mod.rs b/crates/polars-core/src/frame/mod.rs index 42379797a6ba..34da916c72b2 100644 --- a/crates/polars-core/src/frame/mod.rs +++ b/crates/polars-core/src/frame/mod.rs @@ -70,8 +70,8 @@ where return Ok(()); } - // Too small to be worth spawning a hashmap for, this is at most 6 comparisons. if items.len() <= 4 { + // Too small to be worth spawning a hashmap for, this is at most 6 comparisons. for i in 0..items.len() - 1 { let name = get_name(&items[i]); for other in items.iter().skip(i + 1) { @@ -80,16 +80,15 @@ where } } } - } - - let mut names = PlHashSet::with_capacity(items.len()); - for item in items { - let name = get_name(item); - if !names.insert(name) { - polars_bail!(duplicate = name); + } else { + let mut names = PlHashSet::with_capacity(items.len()); + for item in items { + let name = get_name(item); + if !names.insert(name) { + polars_bail!(duplicate = name); + } } } - Ok(()) } From 7402f0ab14099c1efc7b4ed405ee047e7014afb3 Mon Sep 17 00:00:00 2001 From: Orson Peters Date: Wed, 21 Aug 2024 14:40:31 +0200 Subject: [PATCH 4/4] add new_with_broadcast_no_checks --- crates/polars-core/src/frame/mod.rs | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/crates/polars-core/src/frame/mod.rs b/crates/polars-core/src/frame/mod.rs index 34da916c72b2..1923d370ebc1 100644 --- a/crates/polars-core/src/frame/mod.rs +++ b/crates/polars-core/src/frame/mod.rs @@ -269,11 +269,19 @@ impl DataFrame { Ok(DataFrame { columns }) } - // Converts a sequence of columns into a DataFrame, broadcasting length-1 - // columns to match the other columns. - pub fn new_with_broadcast(mut columns: Vec) -> PolarsResult { + /// Converts a sequence of columns into a DataFrame, broadcasting length-1 + /// columns to match the other columns. + pub fn new_with_broadcast(columns: Vec) -> PolarsResult { ensure_names_unique(&columns, |s| s.name())?; + unsafe { Self::new_with_broadcast_no_checks(columns) } + } + /// Converts a sequence of columns into a DataFrame, broadcasting length-1 + /// columns to match the other columns. + /// + /// # Safety + /// Does not check that the column names are unique (which they must be). + pub unsafe fn new_with_broadcast_no_checks(mut columns: Vec) -> PolarsResult { // The length of the longest non-unit length column determines the // broadcast length. If all columns are unit-length the broadcast length // is one.