Skip to content

Commit

Permalink
[CARMEL-5868] Backport SPARK-37199: Add deterministic field to QueryP…
Browse files Browse the repository at this point in the history
…lan (#879)

* [SPARK-37199][SQL] Add deterministic field to QueryPlan

### What changes were proposed in this pull request?
We have a deterministic field in Expressions to check if an expression is deterministic, but we do not have a similar field in QueryPlan.

We have a need for such a check in the QueryPlan sometimes, like in InlineCTE

This proposal is to add a deterministic field to QueryPlan.

More details in this document: https://docs.google.com/document/d/1eIiaSJf-Co2HhjsaQxFNGwUxobnHID4ZGmJMcVytREc/edit#heading=h.4cz970y1mk93

### Why are the changes needed?

We have a need for such a check in the QueryPlan sometimes, like in InlineCTE

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Added unit tests

Closes #34470 from somani/isDeterministic.

Authored-by: Abhishek Somani <abhishek.somani@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>

(cherry picked from commit fe41d18)

* Fix test error

Co-authored-by: Abhishek Somani <abhishek.somani@databricks.com>
  • Loading branch information
2 people authored and GitHub Enterprise committed Mar 11, 2022
1 parent c9524ee commit 4c840e3
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ import org.apache.spark.sql.types._
* An interface for expressions that contain a [[QueryPlan]].
*/
abstract class PlanExpression[T <: QueryPlan[_]] extends Expression {

override lazy val deterministic: Boolean = children.forall(_.deterministic) &&
plan.deterministic

/** The id of the subquery expression. */
def exprId: ExprId

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,13 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]]
AttributeSet.fromAttributeSets(expressions.map(_.references)) -- producedAttributes
}

/**
* Returns true when the all the expressions in the current node as well as all of its children
* are deterministic
*/
lazy val deterministic: Boolean = expressions.forall(_.deterministic) &&
children.forall(_.deterministic)

/**
* Attributes that are referenced by expressions but not provided by this node's children.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.dsl.plans
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, ListQuery, Literal, NamedExpression}
import org.apache.spark.sql.catalyst.expressions.{Add, Alias, AttributeReference, Expression, ListQuery, Literal, NamedExpression, Rand}
import org.apache.spark.sql.catalyst.plans.logical.{Filter, Project, Union}
import org.apache.spark.sql.catalyst.trees.{CurrentOrigin, Origin}
import org.apache.spark.sql.types.IntegerType
Expand Down Expand Up @@ -83,4 +83,32 @@ class QueryPlanSuite extends SparkFunSuite {
assert(countRelationsInPlan == 2)
assert(countRelationsInPlanAndSubqueries == 5)
}

test("SPARK-37199: add a deterministic field to QueryPlan") {
val a: NamedExpression = AttributeReference("a", IntegerType)()
val aRand: NamedExpression = Alias(Add(a, Rand(1)), "aRand")()
val deterministicPlan = Project(
Seq(a),
Filter(
ListQuery(Project(
Seq(a),
UnresolvedRelation(TableIdentifier("t", None))
)),
UnresolvedRelation(TableIdentifier("t", None))
)
)
assert(deterministicPlan.deterministic)

val nonDeterministicPlan = Project(
Seq(aRand),
Filter(
ListQuery(Project(
Seq(a),
UnresolvedRelation(TableIdentifier("t", None))
)),
UnresolvedRelation(TableIdentifier("t", None))
)
)
assert(!nonDeterministicPlan.deterministic)
}
}
10 changes: 10 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1753,4 +1753,14 @@ class SubquerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
}
}
}

test("SPARK-37199: deterministic in QueryPlan considers subquery") {
val deterministicQueryPlan = sql("select (select 1 as b) as b")
.queryExecution.executedPlan
assert(deterministicQueryPlan.deterministic)

val nonDeterministicQueryPlan = sql("select (select rand(1) as b) as b")
.queryExecution.executedPlan
assert(!nonDeterministicQueryPlan.deterministic)
}
}

0 comments on commit 4c840e3

Please sign in to comment.