Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Fix swapping rename schema #17458

Merged
merged 1 commit into from
Jul 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ pub(super) fn process_asof_join(
true
} else {
let name = column_node_to_name(proj, expr_arena);
!local_projected_names.contains(&name)
!local_projected_names.contains(name)
};

process_projection(
Expand Down Expand Up @@ -327,7 +327,7 @@ pub(super) fn process_join(
true
} else {
let name = column_node_to_name(proj, expr_arena);
!local_projected_names.contains(&name)
!local_projected_names.contains(name)
};

process_projection(
Expand Down Expand Up @@ -414,7 +414,7 @@ fn process_projection(
// this branch tries to pushdown the column without suffix
{
// Column name of the projection without any alias.
let leaf_column_name = column_node_to_name(proj, expr_arena);
let leaf_column_name = column_node_to_name(proj, expr_arena).clone();

let suffix = options.args.suffix();
// If _right suffix exists we need to push a projection down without this
Expand Down Expand Up @@ -479,14 +479,14 @@ fn resolve_join_suffixes(
let projections = local_projection
.iter()
.map(|proj| {
let name = column_node_to_name(*proj, expr_arena);
let name = column_node_to_name(*proj, expr_arena).clone();
if name.ends_with(suffix) && schema_after_join.get(&name).is_none() {
let downstream_name = &name.as_ref()[..name.len() - suffix.len()];
let col = AExpr::Column(ColumnName::from(downstream_name));
let node = expr_arena.add(col);
ExprIR::new(node, OutputName::Alias(name))
ExprIR::new(node, OutputName::Alias(name.clone()))
} else {
ExprIR::new(proj.0, OutputName::ColumnLhs(name))
ExprIR::new(proj.0, OutputName::ColumnLhs(name.clone()))
}
})
.collect::<Vec<_>>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,12 @@ fn get_scan_columns(
// we shouldn't project the row-count column, as that is generated
// in the scan
let push = match row_index {
Some(rc) if name != rc.name => true,
Some(rc) if (*name).as_ref() != rc.name.as_ref() => true,
None => true,
_ => false,
};
if push {
columns.push((*name).to_owned())
columns.push((**name).to_owned())
}
}
with_columns = Some(Arc::from(columns));
Expand Down Expand Up @@ -83,7 +83,7 @@ fn split_acc_projections(
.partition(|expr| check_input_column_node(*expr, down_schema, expr_arena));
let mut names = init_set();
for proj in &acc_projections {
let name = column_node_to_name(*proj, expr_arena);
let name = column_node_to_name(*proj, expr_arena).clone();
names.insert(name);
}
(acc_projections, local_projections, names)
Expand All @@ -98,7 +98,7 @@ fn add_expr_to_accumulated(
expr_arena: &Arena<AExpr>,
) {
for root_node in aexpr_to_column_nodes_iter(expr, expr_arena) {
let name = column_node_to_name(root_node, expr_arena);
let name = column_node_to_name(root_node, expr_arena).clone();
if projected_names.insert(name) {
acc_projections.push(root_node)
}
Expand Down Expand Up @@ -128,7 +128,7 @@ fn update_scan_schema(
let mut new_cols = Vec::with_capacity(acc_projections.len());
for node in acc_projections.iter() {
let name = column_node_to_name(*node, expr_arena);
let item = schema.try_get_full(&name)?;
let item = schema.try_get_full(name)?;
new_cols.push(item);
}
// make sure that the projections are sorted by the schema.
Expand Down Expand Up @@ -227,8 +227,8 @@ impl ProjectionPushDown {
let mut already_projected = false;

let name = column_node_to_name(proj, expr_arena);
let is_in_left = names_left.contains(&name);
let is_in_right = names_right.contains(&name);
let is_in_left = names_left.contains(name);
let is_in_right = names_right.contains(name);
already_projected |= is_in_left;
already_projected |= is_in_right;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,46 +33,29 @@ pub(super) fn process_rename(
new: &[SmartString],
swapping: bool,
) -> PolarsResult<()> {
let mut processed = BTreeSet::new();
if swapping {
// We clone otherwise we update a data structure whilst we rename it.
let mut new_projected_names = projected_names.clone();
for (existing, new) in existing.iter().zip(new.iter()) {
let has_existing = projected_names.contains(existing.as_str());
// Only if the new column name is projected by the upper node we must update the name.
let has_new = projected_names.contains(new.as_str());
let has_both = has_existing && has_new;
let reverse_map: PlHashMap<_, _> = new
.iter()
.map(|s| s.as_str())
.zip(existing.iter().map(|s| s.as_str()))
.collect();
let mut new_projected_names = PlHashSet::with_capacity(projected_names.len());

for col in acc_projections {
let name = column_node_to_name(*col, expr_arena);

if has_new {
// swapping path
// this must leave projected names intact, as we only swap
if has_both {
iter_and_update_nodes(
existing,
new,
acc_projections,
expr_arena,
&mut processed,
);
}
// simple new name path
// this must add and remove names
else {
new_projected_names.remove(new.as_str());
let name = ColumnName::from(existing.as_str());
new_projected_names.insert(name);
iter_and_update_nodes(
existing,
new,
acc_projections,
expr_arena,
&mut processed,
);
}
if let Some(previous) = reverse_map.get(name.as_ref()) {
let previous: Arc<str> = Arc::from(*previous);
let new = expr_arena.add(AExpr::Column(previous.clone()));
*col = ColumnNode(new);
let _ = new_projected_names.insert(previous);
} else {
let _ = new_projected_names.insert(name.clone());
}
}
*projected_names = new_projected_names;
} else {
let mut processed = BTreeSet::new();
for (existing, new) in existing.iter().zip(new.iter()) {
if projected_names.remove(new.as_str()) {
let name: Arc<str> = ColumnName::from(existing.as_str());
Expand Down
4 changes: 2 additions & 2 deletions crates/polars-plan/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,9 +253,9 @@ pub(crate) fn aexpr_to_column_nodes_iter<'a>(
})
}

pub fn column_node_to_name(node: ColumnNode, arena: &Arena<AExpr>) -> Arc<str> {
pub fn column_node_to_name(node: ColumnNode, arena: &Arena<AExpr>) -> &Arc<str> {
if let AExpr::Column(name) = arena.get(node.0) {
name.clone()
name
} else {
unreachable!()
}
Expand Down
11 changes: 11 additions & 0 deletions py-polars/tests/unit/operations/test_rename.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,3 +149,14 @@ def test_rename_schema_order_6660() -> None:

assert renamed.collect_schema() == renamed.collect().schema
assert computed.collect_schema() == computed.collect().schema


def test_rename_schema_17427() -> None:
assert (
pl.LazyFrame({"A": [1]})
.with_columns(B=2)
.select(["A", "B"])
.rename({"A": "C", "B": "A"})
.select(["C", "A"])
.collect()
).to_dict(as_series=False) == {"C": [1], "A": [2]}