Skip to content

Commit

Permalink
Improve documentation, make DependencyMap / Dependencies a real struc…
Browse files Browse the repository at this point in the history
…t + fix stack overflow
  • Loading branch information
alamb committed Oct 4, 2024
1 parent 9ae17ee commit f24f9b9
Show file tree
Hide file tree
Showing 2 changed files with 185 additions and 23 deletions.
7 changes: 6 additions & 1 deletion datafusion/physical-expr/src/equivalence/projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -646,7 +646,12 @@ mod tests {

assert_eq!(orderings.len(), expected.len(), "{}", err_msg);
for expected_ordering in &expected {
assert!(orderings.contains(expected_ordering), "{}", err_msg)
assert!(
orderings.contains(expected_ordering),
"{orderings} does not contain expected {} {}",
PhysicalSortExpr::format_list(expected_ordering),
err_msg
)
}
}

Expand Down
201 changes: 179 additions & 22 deletions datafusion/physical-expr/src/equivalence/properties.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use std::fmt;
use std::fmt::Display;
use std::hash::{Hash, Hasher};
use std::sync::Arc;
Expand Down Expand Up @@ -701,7 +702,7 @@ impl EquivalenceProperties {
/// c ASC: Node {None, HashSet{a ASC}}
/// ```
fn construct_dependency_map(&self, mapping: &ProjectionMapping) -> DependencyMap {
let mut dependency_map = IndexMap::new();
let mut dependency_map = DependencyMap::new();
for ordering in self.normalized_oeq_class().iter() {
for (idx, sort_expr) in ordering.iter().enumerate() {
let target_sort_expr =
Expand All @@ -723,13 +724,11 @@ impl EquivalenceProperties {
let dependency = idx.checked_sub(1).map(|a| &ordering[a]);
// Add sort expressions that can be projected or referred to
// by any of the projection expressions to the dependency map:
dependency_map
.entry(sort_expr.clone())
.or_insert_with(|| DependencyNode {
target_sort_expr: target_sort_expr.clone(),
dependencies: IndexSet::new(),
})
.insert_dependency(dependency);
dependency_map.insert(
sort_expr,
target_sort_expr.as_ref(),
dependency,
);
}
if !is_projected {
// If we can not project, stop constructing the dependency
Expand Down Expand Up @@ -1257,7 +1256,7 @@ fn referred_dependencies(
// Associate `PhysicalExpr`s with `PhysicalSortExpr`s that contain them:
let mut expr_to_sort_exprs = IndexMap::<ExprWrapper, Dependencies>::new();
for sort_expr in dependency_map
.keys()
.sort_exprs()
.filter(|sort_expr| expr_refers(source, &sort_expr.expr))
{
let key = ExprWrapper(Arc::clone(&sort_expr.expr));
Expand All @@ -1270,10 +1269,16 @@ fn referred_dependencies(
// Generate all valid dependencies for the source. For example, if the source
// is `a + b` and the map is `[a -> (a ASC, a DESC), b -> (b ASC)]`, we get
// `vec![HashSet(a ASC, b ASC), HashSet(a DESC, b ASC)]`.
expr_to_sort_exprs
.values()
let dependencies = expr_to_sort_exprs
.into_values()
.map(Dependencies::into_inner)
.collect::<Vec<_>>();
dependencies
.iter()
.multi_cartesian_product()
.map(|referred_deps| referred_deps.into_iter().cloned().collect())
.map(|referred_deps| {
Dependencies::new_from_iter(referred_deps.into_iter().cloned())
})
.collect()
}

Expand All @@ -1296,7 +1301,9 @@ fn construct_prefix_orderings(
dependency_map: &DependencyMap,
) -> Vec<LexOrdering> {
let mut dep_enumerator = DependencyEnumerator::new();
dependency_map[relevant_sort_expr]
dependency_map
.get(relevant_sort_expr)
.expect("no relevant sort expr found")
.dependencies
.iter()
.flat_map(|dep| dep_enumerator.construct_orderings(dep, dependency_map))
Expand Down Expand Up @@ -1433,13 +1440,161 @@ impl DependencyNode {
}
}

// Using `IndexMap` and `IndexSet` makes sure to generate consistent results across different executions for the same query.
// We could have used `HashSet`, `HashMap` in place of them without any loss of functionality.
// As an example, if existing orderings are `[a ASC, b ASC]`, `[c ASC]` for output ordering
// both `[a ASC, b ASC, c ASC]` and `[c ASC, a ASC, b ASC]` are valid (e.g. concatenated version of the alternative orderings).
// When using `HashSet`, `HashMap` it is not guaranteed to generate consistent result, among the possible 2 results in the example above.
type DependencyMap = IndexMap<PhysicalSortExpr, DependencyNode>;
type Dependencies = IndexSet<PhysicalSortExpr>;
/// Human-readable rendering of a `DependencyNode`, intended for debugging.
///
/// Produces `(target: <expr>, dependencies: [<deps>])` when the node has a
/// target sort expression and `(dependencies: [<deps>])` otherwise, where
/// `<deps>` is the `Display` output of the node's `dependencies`.
impl Display for DependencyNode {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Opening part: the target is only printed when it is present.
        match self.target_sort_expr.as_ref() {
            Some(target) => write!(f, "(target: {}, ", target)?,
            None => write!(f, "(")?,
        }
        // Closing part: the dependencies are always printed.
        write!(f, "dependencies: [{}])", self.dependencies)
    }
}

/// Maps a sort expression to its `DependencyNode`.
///
/// # Debugging / displaying `DependencyMap`
///
/// This structure implements `Display` to assist debugging. For example:
///
/// ```text
/// DependencyMap: {
/// a@0 ASC --> (target: a@0 ASC, dependencies: [[]])
/// b@1 ASC --> (target: b@1 ASC, dependencies: [[a@0 ASC, c@2 ASC]])
/// c@2 ASC --> (target: c@2 ASC, dependencies: [[b@1 ASC, a@0 ASC]])
/// d@3 ASC --> (target: d@3 ASC, dependencies: [[c@2 ASC, b@1 ASC]])
/// }
/// ```
///
/// # Note on IndexMap Rationale
///
/// An `IndexMap` (which preserves insertion order) is used to ensure
/// consistent results across different executions of the same query.
/// `HashSet` / `HashMap` could have been used in its place without any loss
/// of functionality.
///
/// As an example, if the existing orderings are
/// 1. `[a ASC, b ASC]`
/// 2. `[c ASC]`
///
/// then both of the following output orderings are valid:
/// 1. `[a ASC, b ASC, c ASC]`
/// 2. `[c ASC, a ASC, b ASC]`
///
/// (These are both valid because they are concatenated versions of the
/// alternative orderings.) When using `HashSet` / `HashMap`, it is not
/// guaranteed that a consistent result is generated among the two possible
/// results in the example above.
#[derive(Debug)]
struct DependencyMap {
// Sort expression --> node holding its target expression and dependencies.
inner: IndexMap<PhysicalSortExpr, DependencyNode>,
}

impl DependencyMap {
    /// Creates a map with no entries.
    fn new() -> Self {
        let inner = IndexMap::new();
        Self { inner }
    }

    /// Records the dependency `sort_expr` --> `dependency` in the map.
    ///
    /// If `sort_expr` has no entry yet, a node is created for it with a clone
    /// of `target_sort_expr` and an empty set of dependencies. If an entry
    /// already exists, it is reused and `target_sort_expr` is not applied.
    /// In both cases `dependency` is then handed to the node's
    /// `insert_dependency`.
    fn insert(
        &mut self,
        sort_expr: &PhysicalSortExpr,
        target_sort_expr: Option<&PhysicalSortExpr>,
        dependency: Option<&PhysicalSortExpr>,
    ) {
        // Look up the node for `sort_expr`, creating it on first sight.
        let node = self
            .inner
            .entry(sort_expr.clone())
            .or_insert_with(|| DependencyNode {
                target_sort_expr: target_sort_expr.cloned(),
                dependencies: Dependencies::new(),
            });
        node.insert_dependency(dependency)
    }

    /// Returns an iterator over every `(sort_expr, DependencyNode)` pair.
    fn iter(&self) -> impl Iterator<Item = (&PhysicalSortExpr, &DependencyNode)> {
        self.inner.iter()
    }

    /// Returns an iterator over the sort expressions used as keys of the map.
    fn sort_exprs(&self) -> impl Iterator<Item = &PhysicalSortExpr> {
        self.inner.keys()
    }

    /// Looks up the dependency node stored for `sort_expr`, returning `None`
    /// when the map has no entry for it.
    fn get(&self, sort_expr: &PhysicalSortExpr) -> Option<&DependencyNode> {
        self.inner.get(sort_expr)
    }
}

/// Multi-line rendering of the whole map for debugging: a
/// `DependencyMap: {` header, one `<sort_expr> --> <node>` line per entry,
/// and a closing `}` line.
impl Display for DependencyMap {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        writeln!(f, "DependencyMap: {{")?;
        // Emit one line per entry, stopping at the first formatting error.
        self.inner
            .iter()
            .try_for_each(|(sort_expr, node)| writeln!(f, " {sort_expr} --> {node}"))?;
        writeln!(f, "}}")
    }
}

/// A collection of sort expressions that can be calculated from a known set
/// of dependencies.
///
/// The expressions are stored in an `IndexSet`; see the `Display`
/// implementation for the `[a, b, ...]` rendering used when debugging.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
struct Dependencies {
// The underlying set of sort expressions.
inner: IndexSet<PhysicalSortExpr>,
}

/// Renders the dependencies as a bracketed, comma-separated list, e.g.
/// `[]` when empty or `[x, y]` for two entries.
impl Display for Dependencies {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "[")?;
        let mut remaining = self.inner.iter();
        // The first element is written bare; every later element is
        // prefixed with the separator.
        if let Some(first) = remaining.next() {
            write!(f, "{}", first)?;
            remaining.try_for_each(|dep| write!(f, ", {}", dep))?;
        }
        write!(f, "]")
    }
}

impl Dependencies {
    /// Creates a `Dependencies` instance containing no sort expressions.
    fn new() -> Self {
        let inner = IndexSet::new();
        Self { inner }
    }

    /// Builds a `Dependencies` by collecting every `PhysicalSortExpr`
    /// yielded by `iter`.
    fn new_from_iter(iter: impl IntoIterator<Item = PhysicalSortExpr>) -> Self {
        let inner = iter.into_iter().collect();
        Self { inner }
    }

    /// Adds `sort_expr` to the set of dependencies.
    fn insert(&mut self, sort_expr: PhysicalSortExpr) {
        // The set's "was it newly inserted" flag is deliberately discarded.
        let _ = self.inner.insert(sort_expr);
    }

    /// Returns a cloneable iterator over the contained sort expressions.
    fn iter(&self) -> impl Iterator<Item = &PhysicalSortExpr> + Clone {
        self.inner.iter()
    }

    /// Consumes `self` and hands back the underlying `IndexSet`.
    fn into_inner(self) -> IndexSet<PhysicalSortExpr> {
        self.inner
    }

    /// Returns `true` when no dependencies are stored.
    fn is_empty(&self) -> bool {
        self.inner.is_empty()
    }
}

/// Contains a mapping of all dependencies we have processed for each sort expr
struct DependencyEnumerator<'a> {
Expand Down Expand Up @@ -1487,8 +1642,9 @@ impl<'a> DependencyEnumerator<'a> {
referred_sort_expr: &'a PhysicalSortExpr,
dependency_map: &'a DependencyMap,
) -> Vec<LexOrdering> {
// We are sure that `referred_sort_expr` is inside `dependency_map`.
let node = &dependency_map[referred_sort_expr];
let node = dependency_map
.get(referred_sort_expr)
.expect("`referred_sort_expr` is inside `dependency_map`");
// Since we work on intermediate nodes, we are sure `val.target_sort_expr`
// exists.
let target_sort_expr = node.target_sort_expr.as_ref().unwrap();
Expand All @@ -1504,6 +1660,7 @@ impl<'a> DependencyEnumerator<'a> {
} else {
vec![]
};

for ordering in orderings.iter_mut() {
ordering.push(target_sort_expr.clone())
}
Expand Down

0 comments on commit f24f9b9

Please sign in to comment.