From 8adad2d0d9ff15793b6e1edfc7d8afa1a22b2a52 Mon Sep 17 00:00:00 2001 From: Abhishek Somani Date: Mon, 1 Nov 2021 12:09:30 -0400 Subject: [PATCH 1/3] Init commit: add deterministic to query plan --- .../sql/catalyst/expressions/subquery.scala | 3 ++ .../sql/catalyst/optimizer/InlineCTE.scala | 2 +- .../spark/sql/catalyst/plans/QueryPlan.scala | 7 +++++ .../sql/catalyst/plans/QueryPlanSuite.scala | 30 ++++++++++++++++++- .../org/apache/spark/sql/SubquerySuite.scala | 15 ++++++++-- 5 files changed, 53 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala index 8918bb3ea86f3..0980e87db283e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala @@ -42,6 +42,9 @@ abstract class PlanExpression[T <: QueryPlan[_]] extends Expression { final override val nodePatterns: Seq[TreePattern] = Seq(PLAN_EXPRESSION) ++ nodePatternsInternal + override lazy val deterministic: Boolean = children.forall(_.deterministic) && + plan.deterministic + // Subclasses can override this function to provide more TreePatterns. def nodePatternsInternal(): Seq[TreePattern] = Seq() diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InlineCTE.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InlineCTE.scala index 6bcbc9f821de6..1de300ef9c09d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InlineCTE.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InlineCTE.scala @@ -53,7 +53,7 @@ object InlineCTE extends Rule[LogicalPlan] { // 1) It is fine to inline a CTE if it references another CTE that is non-deterministic; // 2) Any `CTERelationRef` that contains `OuterReference` would have been inlined first. refCount == 1 || - cteDef.child.find(_.expressions.exists(!_.deterministic)).isEmpty || + cteDef.deterministic || cteDef.child.find(_.expressions.exists(_.isInstanceOf[OuterReference])).isDefined } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala index 3c9946ba3772b..2417ff904570b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala @@ -84,6 +84,13 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] AttributeSet.fromAttributeSets(expressions.map(_.references)) -- producedAttributes } + /** + * Returns true when the all the expressions in the current node as well as all of its children + * are deterministic + */ + lazy val deterministic: Boolean = expressions.forall(_.deterministic) && + children.forall(_.deterministic) + /** * Attributes that are referenced by expressions but not provided by this node's children. */ diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/QueryPlanSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/QueryPlanSuite.scala index 404c8895c4d11..63f75a97a0c97 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/QueryPlanSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/QueryPlanSuite.scala @@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ -import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Expression, ListQuery, Literal, NamedExpression} +import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Expression, ListQuery, Literal, NamedExpression, Rand} import org.apache.spark.sql.catalyst.plans.logical.{Filter, LocalRelation, LogicalPlan, Project, Union} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.trees.{CurrentOrigin, Origin} @@ -101,4 +101,32 @@ class QueryPlanSuite extends SparkFunSuite { val plan = t.select($"a", $"b").select($"a", $"b").select($"a", $"b").analyze assert(testRule(plan).resolved) } + + test("Add jira: add a deterministic field to QueryPlan") { + val a: NamedExpression = AttributeReference("a", IntegerType)() + val aRand: NamedExpression = Alias(a + Rand(1), "aRand")() + val deterministicPlan = Project( + Seq(a), + Filter( + ListQuery(Project( + Seq(a), + UnresolvedRelation(TableIdentifier("t", None)) + )), + UnresolvedRelation(TableIdentifier("t", None)) + ) + ) + assert(deterministicPlan.deterministic) + + val nonDeterministicPlan = Project( + Seq(aRand), + Filter( + ListQuery(Project( + Seq(a), + UnresolvedRelation(TableIdentifier("t", None)) + )), + UnresolvedRelation(TableIdentifier("t", None)) + ) + ) + assert(!nonDeterministicPlan.deterministic) + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala index 9e7ce55639148..798bc5208574e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala @@ -1931,18 +1931,29 @@ class SubquerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark sql( """ |SELECT c1, s, s * 10 FROM ( - | SELECT c1, (SELECT FIRST(c2) FROM t2 WHERE t1.c1 = t2.c1) s FROM t1) + | SELECT c1, (SELECT MIN(c2) FROM t2 WHERE t1.c1 = t2.c1) s FROM t1) |""".stripMargin), correctAnswer) checkAnswer( sql( """ |SELECT c1, s, s * 10 FROM ( - | SELECT c1, SUM((SELECT FIRST(c2) FROM t2 WHERE t1.c1 = t2.c1)) s + | SELECT c1, SUM((SELECT MIN(c2) FROM t2 WHERE t1.c1 = t2.c1)) s | FROM t1 GROUP BY c1 |) |""".stripMargin), correctAnswer) } } + + test("Add jira: deterministic in QueryPlan considers subquery") { + val deterministicQueryPlan = sql("select (select 1 as b) as b") + .queryExecution.executedPlan + assert(deterministicQueryPlan.deterministic) + + val nonDeterministicQueryPlan = sql("select (select rand(1) as b) as b") + .queryExecution.executedPlan + assert(!nonDeterministicQueryPlan.deterministic) + } + } From 18f7f17e713c99bc3fee71b0df5a824b4f9af812 Mon Sep 17 00:00:00 2001 From: Abhishek Somani Date: Tue, 2 Nov 2021 14:12:40 -0400 Subject: [PATCH 2/3] Added jira id to test --- .../org/apache/spark/sql/catalyst/plans/QueryPlanSuite.scala | 2 +- .../src/test/scala/org/apache/spark/sql/SubquerySuite.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/QueryPlanSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/QueryPlanSuite.scala index 63f75a97a0c97..fb014bb8391f3 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/QueryPlanSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/QueryPlanSuite.scala @@ -102,7 +102,7 @@ class QueryPlanSuite extends SparkFunSuite { assert(testRule(plan).resolved) } - test("Add jira: add a deterministic field to QueryPlan") { + test("SPARK-37199: add a deterministic field to QueryPlan") { val a: NamedExpression = AttributeReference("a", IntegerType)() val aRand: NamedExpression = Alias(a + Rand(1), "aRand")() val deterministicPlan = Project( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala index 798bc5208574e..ebad394006b5a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala @@ -1946,7 +1946,7 @@ class SubquerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark } } - test("Add jira: deterministic in QueryPlan considers subquery") { + test("SPARK-37199: deterministic in QueryPlan considers subquery") { val deterministicQueryPlan = sql("select (select 1 as b) as b") .queryExecution.executedPlan assert(deterministicQueryPlan.deterministic) From 5ed47eb7806bb50fa275c4e114b7771e38f1ec22 Mon Sep 17 00:00:00 2001 From: Abhishek Somani Date: Fri, 5 Nov 2021 06:47:53 -0400 Subject: [PATCH 3/3] Restore test --- .../src/test/scala/org/apache/spark/sql/SubquerySuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala index ebad394006b5a..be0af80ff7879 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala @@ -1931,14 +1931,14 @@ class SubquerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark sql( """ |SELECT c1, s, s * 10 FROM ( - | SELECT c1, (SELECT MIN(c2) FROM t2 WHERE t1.c1 = t2.c1) s FROM t1) + | SELECT c1, (SELECT FIRST(c2) FROM t2 WHERE t1.c1 = t2.c1) s FROM t1) |""".stripMargin), correctAnswer) checkAnswer( sql( """ |SELECT c1, s, s * 10 FROM ( - | SELECT c1, SUM((SELECT MIN(c2) FROM t2 WHERE t1.c1 = t2.c1)) s + | SELECT c1, SUM((SELECT FIRST(c2) FROM t2 WHERE t1.c1 = t2.c1)) s | FROM t1 GROUP BY c1 |) |""".stripMargin),