Skip to content

Commit

Permalink
Fix stack overflow calculating projected orderings
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Oct 4, 2024
1 parent d4bc1c1 commit a156611
Showing 1 changed file with 121 additions and 41 deletions.
162 changes: 121 additions & 41 deletions datafusion/physical-expr/src/equivalence/properties.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1295,21 +1295,30 @@ fn construct_prefix_orderings(
relevant_sort_expr: &PhysicalSortExpr,
dependency_map: &DependencyMap,
) -> Vec<LexOrdering> {
let mut dep_enumerator = DependencyEnumerator::new();
dependency_map[relevant_sort_expr]
.dependencies
.iter()
.flat_map(|dep| construct_orderings(dep, dependency_map))
.flat_map(|dep| dep_enumerator.construct_orderings(dep, dependency_map))
.collect()
}

/// Given a set of relevant dependencies (`relevant_deps`) and a map of dependencies
/// (`dependency_map`), this function generates all possible prefix orderings
/// based on the given dependencies.
/// Generates all possible orderings where dependencies are satisfied for the
/// current projection expression.
///
/// # Examaple
/// If `dependences` is `a + b ASC` and the dependency map holds dependencies
/// * `a ASC` --> `[c ASC]`
/// * `b ASC` --> `[d DESC]`,
///
/// This function generates these two sort orders
/// * `[c ASC, d DESC, a + b ASC]`
/// * `[d DESC, c ASC, a + b ASC]`
///
/// # Parameters
///
/// * `dependencies` - A reference to the dependencies.
/// * `dependency_map` - A reference to the map of dependencies for expressions.
/// * `dependencies` - Set of relevant expressions.
/// * `dependency_map` - Map of dependencies for expressions that may appear in `dependencies`
///
/// # Returns
///
Expand All @@ -1335,11 +1344,6 @@ fn generate_dependency_orderings(
return vec![vec![]];
}

// Generate all possible orderings where dependencies are satisfied for the
// current projection expression. For example, if expression is `a + b ASC`,
// and the dependency for `a ASC` is `[c ASC]`, the dependency for `b ASC`
// is `[d DESC]`, then we generate `[c ASC, d DESC, a + b ASC]` and
// `[d DESC, c ASC, a + b ASC]`.
relevant_prefixes
.into_iter()
.multi_cartesian_product()
Expand Down Expand Up @@ -1421,7 +1425,7 @@ struct DependencyNode {
}

impl DependencyNode {
// Insert dependency to the state (if exists).
/// Insert dependency to the state (if exists).
fn insert_dependency(&mut self, dependency: Option<&PhysicalSortExpr>) {
if let Some(dep) = dependency {
self.dependencies.insert(dep.clone());
Expand All @@ -1437,38 +1441,69 @@ impl DependencyNode {
type DependencyMap = IndexMap<PhysicalSortExpr, DependencyNode>;
type Dependencies = IndexSet<PhysicalSortExpr>;

/// This function recursively analyzes the dependencies of the given sort
/// expression within the given dependency map to construct lexicographical
/// orderings that include the sort expression and its dependencies.
///
/// # Parameters
///
/// - `referred_sort_expr`: A reference to the sort expression (`PhysicalSortExpr`)
/// for which lexicographical orderings satisfying its dependencies are to be
/// constructed.
/// - `dependency_map`: A reference to the `DependencyMap` that contains
/// dependencies for different `PhysicalSortExpr`s.
///
/// # Returns
///
/// A vector of lexicographical orderings (`Vec<LexOrdering>`) based on the given
/// sort expression and its dependencies.
fn construct_orderings(
referred_sort_expr: &PhysicalSortExpr,
dependency_map: &DependencyMap,
) -> Vec<LexOrdering> {
// We are sure that `referred_sort_expr` is inside `dependency_map`.
let node = &dependency_map[referred_sort_expr];
// Since we work on intermediate nodes, we are sure `val.target_sort_expr`
// exists.
let target_sort_expr = node.target_sort_expr.clone().unwrap();
if node.dependencies.is_empty() {
vec![vec![target_sort_expr]]
} else {
/// Contains a mapping of all dependencies we have processed for each sort expr
struct DependencyEnumerator<'a> {
/// expr --> [exprs]
seen: IndexMap<&'a PhysicalSortExpr, IndexSet<&'a PhysicalSortExpr>>,
}

impl<'a> DependencyEnumerator<'a> {
fn new() -> Self {
Self {
seen: IndexMap::new(),
}
}

/// Insert a new dependency,
///
/// returns false if the dependency was already in the map
/// returns true if the dependency was newly inserted
fn insert(
&mut self,
target: &'a PhysicalSortExpr,
dep: &'a PhysicalSortExpr,
) -> bool {
self.seen.entry(target).or_default().insert(dep)
}

/// This function recursively analyzes the dependencies of the given sort
/// expression within the given dependency map to construct lexicographical
/// orderings that include the sort expression and its dependencies.
///
/// # Parameters
///
/// - `referred_sort_expr`: A reference to the sort expression (`PhysicalSortExpr`)
/// for which lexicographical orderings satisfying its dependencies are to be
/// constructed.
/// - `dependency_map`: A reference to the `DependencyMap` that contains
/// dependencies for different `PhysicalSortExpr`s.
///
/// # Returns
///
/// A vector of lexicographical orderings (`Vec<LexOrdering>`) based on the given
/// sort expression and its dependencies.
fn construct_orderings(
&mut self,
referred_sort_expr: &'a PhysicalSortExpr,
dependency_map: &'a DependencyMap,
) -> Vec<LexOrdering> {
// We are sure that `referred_sort_expr` is inside `dependency_map`.
let node = &dependency_map[referred_sort_expr];
// Since we work on intermediate nodes, we are sure `val.target_sort_expr`
// exists.
let target_sort_expr = node.target_sort_expr.as_ref().unwrap();
if node.dependencies.is_empty() {
return vec![vec![target_sort_expr.clone()]];
};

node.dependencies
.iter()
.flat_map(|dep| {
let mut orderings = construct_orderings(dep, dependency_map);
let mut orderings = if self.insert(target_sort_expr, dep) {
self.construct_orderings(dep, dependency_map)
} else {
vec![]
};
for ordering in orderings.iter_mut() {
ordering.push(target_sort_expr.clone())
}
Expand Down Expand Up @@ -1763,6 +1798,51 @@ mod tests {
Ok(())
}

#[test]
fn project_equivalence_properties_test_multi() -> Result<()> {
// test multiple input orderings with equivalence properties
let input_schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Int64, true),
Field::new("b", DataType::Int64, true),
Field::new("c", DataType::Int64, true),
Field::new("d", DataType::Int64, true),
]));

let mut input_properties = EquivalenceProperties::new(Arc::clone(&input_schema));
// add equivalent ordering [a, b, c, d]
input_properties.add_new_ordering(vec![
parse_sort_expr("a", &input_schema),
parse_sort_expr("b", &input_schema),
parse_sort_expr("c", &input_schema),
parse_sort_expr("d", &input_schema),
]);

// add equivalent ordering [a, c, b, d]
input_properties.add_new_ordering(vec![
parse_sort_expr("a", &input_schema),
parse_sort_expr("c", &input_schema),
parse_sort_expr("b", &input_schema), // NB b and c are swapped
parse_sort_expr("d", &input_schema),
]);

// simply project all the columns in order
let proj_exprs = vec![
(col("a", &input_schema)?, "a".to_string()),
(col("b", &input_schema)?, "b".to_string()),
(col("c", &input_schema)?, "c".to_string()),
(col("d", &input_schema)?, "d".to_string()),
];
let projection_mapping = ProjectionMapping::try_new(&proj_exprs, &input_schema)?;
let out_properties = input_properties.project(&projection_mapping, input_schema);

assert_eq!(
out_properties.to_string(),
"order: [[a@0 ASC,c@2 ASC,b@1 ASC,d@3 ASC], [a@0 ASC,b@1 ASC,c@2 ASC,d@3 ASC]]"
);

Ok(())
}

#[test]
fn test_join_equivalence_properties() -> Result<()> {
let schema = create_test_schema()?;
Expand Down

0 comments on commit a156611

Please sign in to comment.