
[SPARK-37199][SQL] Add deterministic field to QueryPlan #34470

Status: Closed (wants to merge 4 commits)
@@ -42,6 +42,9 @@ abstract class PlanExpression[T <: QueryPlan[_]] extends Expression {

final override val nodePatterns: Seq[TreePattern] = Seq(PLAN_EXPRESSION) ++ nodePatternsInternal

override lazy val deterministic: Boolean = children.forall(_.deterministic) &&
plan.deterministic

// Subclasses can override this function to provide more TreePatterns.
def nodePatternsInternal(): Seq[TreePattern] = Seq()
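The change above can be sketched with a toy model (hypothetical classes, not Spark's actual API): an expression that wraps a subquery plan is deterministic only if its child expressions and the wrapped plan both are.

```scala
// Toy stand-ins for Catalyst's Expression/QueryPlan hierarchy.
sealed trait Expr {
  def deterministic: Boolean
  def children: Seq[Expr]
}

case class Lit(v: Int) extends Expr {
  val deterministic = true
  val children: Seq[Expr] = Nil
}

case class RandExpr(seed: Long) extends Expr {
  val deterministic = false // produces a fresh value per row
  val children: Seq[Expr] = Nil
}

// A toy "plan" carrying its own determinism flag.
case class Plan(deterministic: Boolean)

// Mirrors the PR change: both the child expressions AND the wrapped
// plan must be deterministic for the whole expression to be.
case class PlanExpr(plan: Plan, children: Seq[Expr]) extends Expr {
  lazy val deterministic: Boolean =
    children.forall(_.deterministic) && plan.deterministic
}
```

A `PlanExpr` over a non-deterministic plan, or holding a `RandExpr` child, reports `deterministic = false` even though all other parts are deterministic.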

@@ -53,7 +53,7 @@ object InlineCTE extends Rule[LogicalPlan] {
// 1) It is fine to inline a CTE if it references another CTE that is non-deterministic;
// 2) Any `CTERelationRef` that contains `OuterReference` would have been inlined first.
      refCount == 1 ||
-     cteDef.child.find(_.expressions.exists(!_.deterministic)).isEmpty ||
+     cteDef.deterministic ||
      cteDef.child.find(_.expressions.exists(_.isInstanceOf[OuterReference])).isDefined
}
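The inlining condition can be sketched with toy types (`CteDef` and `shouldInline` here are illustrative names, not Spark's API): a CTE definition may be inlined if it is referenced only once, if it is fully deterministic, or if it carries an outer reference.

```scala
// Toy model of a CTE definition's properties relevant to inlining.
case class CteDef(deterministic: Boolean, hasOuterReference: Boolean)

def shouldInline(cteDef: CteDef, refCount: Int): Boolean =
  refCount == 1 ||          // a single reference cannot diverge from itself
  cteDef.deterministic ||   // every inlined copy yields identical rows
  cteDef.hasOuterReference  // correlated refs would have been inlined first
```

The key case this guards: a CTE like `SELECT rand()` referenced twice must not be inlined, because each inlined copy would evaluate `rand()` independently and produce different values.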

@@ -84,6 +84,13 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]]
AttributeSet.fromAttributeSets(expressions.map(_.references)) -- producedAttributes
}

/**
 * Returns true when all the expressions in the current node, as well as all of its
 * children, are deterministic.
 */
lazy val deterministic: Boolean = expressions.forall(_.deterministic) &&
  children.forall(_.deterministic)

Review thread on this line:

@HyukjinKwon (Member, Dec 15, 2021):
qq: should we mark all non-deterministic plans as such? e.g. Sample?

Member:
Wait .. why is this in QueryPlan? What about physical plans vs logical plans? Should both be marked?

Member:
I think we should move this to logical plan only, since it doesn't make sense for physical plans to have different determinism.

Contributor:
A physical plan can override this lazy val if it has custom logic, right?

@HyukjinKwon (Member, Dec 15, 2021):
Can a physical plan have different determinism from the one in the logical plan?

e.g., Sample is non-deterministic. I think physical plans of Sample should always be non-deterministic; otherwise the output will be inconsistent depending on which physical plan is used. The opposite case is the same too.

Contributor:
Yea, if we override this lazy val in a logical plan, we should do it in the corresponding physical plan as well.

Moving this to logical plan is also OK, if we don't need it in physical plans at all. cc @maryannxue

@HyukjinKwon (Member, Dec 15, 2021):
So if we optimize something, that should always happen in the optimizer with logical plans, right?

If we can do something with physical plans, we will have to add another argument to every non-deterministic plan, e.g.:

case class Sample(
    lowerBound: Double,
    upperBound: Double,
    withReplacement: Boolean,
    seed: Long,
+   deterministic: Boolean,
    child: LogicalPlan) extends UnaryNode {

case class SampleExec(
    lowerBound: Double,
    upperBound: Double,
    withReplacement: Boolean,
    seed: Long,
+   deterministic: Boolean,
    child: SparkPlan) extends UnaryExecNode with CodegenSupport {

which is pretty different from how we do it in Expression. Otherwise, we would have to recalculate it for each plan, etc.

/**
* Attributes that are referenced by expressions but not provided by this node's children.
*/
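The recursive definition added above can be illustrated with a minimal sketch (a toy `Node` type, not Spark's `QueryPlan`): a node is deterministic iff all of its own expressions and all of its children are, so non-determinism anywhere in the subtree propagates up.

```scala
// Toy plan node: each node records whether its own expressions are
// deterministic, and the flag is combined recursively over children.
case class Node(exprsDeterministic: Seq[Boolean], children: Seq[Node] = Nil) {
  lazy val deterministic: Boolean =
    exprsDeterministic.forall(identity) && children.forall(_.deterministic)
}
```

For example, `Node(Seq(true), Seq(Node(Seq(false))))` is non-deterministic: the child (think of a Project over `rand()`) taints the whole tree, which is exactly what the new `QueryPlan.deterministic` field captures.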
@@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
-import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Expression, ListQuery, Literal, NamedExpression}
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Expression, ListQuery, Literal, NamedExpression, Rand}
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LocalRelation, LogicalPlan, Project, Union}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.trees.{CurrentOrigin, Origin}
@@ -101,4 +101,32 @@ class QueryPlanSuite extends SparkFunSuite {
val plan = t.select($"a", $"b").select($"a", $"b").select($"a", $"b").analyze
assert(testRule(plan).resolved)
}

  test("SPARK-37199: add a deterministic field to QueryPlan") {
    val a: NamedExpression = AttributeReference("a", IntegerType)()
    val aRand: NamedExpression = Alias(a + Rand(1), "aRand")()
    val deterministicPlan = Project(
      Seq(a),
      Filter(
        ListQuery(Project(
          Seq(a),
          UnresolvedRelation(TableIdentifier("t", None))
        )),
        UnresolvedRelation(TableIdentifier("t", None))
      )
    )
    assert(deterministicPlan.deterministic)

    val nonDeterministicPlan = Project(
      Seq(aRand),
      Filter(
        ListQuery(Project(
          Seq(a),
          UnresolvedRelation(TableIdentifier("t", None))
        )),
        UnresolvedRelation(TableIdentifier("t", None))
      )
    )
    assert(!nonDeterministicPlan.deterministic)
  }
}
15 changes: 13 additions & 2 deletions sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
@@ -1931,18 +1931,29 @@ class SubquerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
sql(
"""
|SELECT c1, s, s * 10 FROM (
-        | SELECT c1, (SELECT FIRST(c2) FROM t2 WHERE t1.c1 = t2.c1) s FROM t1)
+        | SELECT c1, (SELECT MIN(c2) FROM t2 WHERE t1.c1 = t2.c1) s FROM t1)
Review thread on this line:

Contributor:
What's the error if we don't make this change?

Contributor Author:
This one is fine, but the one below fails with:
Failed to analyze query: org.apache.spark.sql.AnalysisException: nondeterministic expression sum(scalarsubquery(t1.c1)) should not appear in the arguments of an aggregate function.;

Contributor:
Just a side note: I have been arguing that first/last should be deterministic functions, but it has not gotten any attention. See #29810.

@sigmod (Contributor, Nov 3, 2021):
Can we not change the test query and assert the error instead?

> Just a side note: I have been arguing that first/last should be deterministic functions

+1, even though FIRST/LAST are not truly deterministic during execution.

The purpose of this field is to determine the eligibility of query rewrites. Postgres has a nice categorization of those:
https://www.postgresql.org/docs/8.3/xfunc-volatility.html

SUM and AVG are not completely deterministic either (when running distributed), but we can still do query optimizations over them, and I think that would be fine for LAST/FIRST too. By contrast, rand() has to be marked as non-deterministic because we don't want query rewrites to move, duplicate, or dedup it.

|""".stripMargin),
correctAnswer)
checkAnswer(
sql(
"""
|SELECT c1, s, s * 10 FROM (
-  | SELECT c1, SUM((SELECT FIRST(c2) FROM t2 WHERE t1.c1 = t2.c1)) s
+  | SELECT c1, SUM((SELECT MIN(c2) FROM t2 WHERE t1.c1 = t2.c1)) s

Review thread on this line:

Contributor:
Does this query also fail in other databases like pgsql?

Contributor:
Isn't this subquery semantically the same as SELECT c1, SUM((SELECT c2 FROM t2 WHERE t1.c1 = t2.c1 LIMIT 1)) s FROM t1 GROUP BY c1? Spark currently does not support LIMIT on the correlation path, but this subquery, according to the current logic, is deterministic.

Contributor:
> this subquery, according to the current logic, is deterministic

It seems fine to mark first/last deterministic? #34470 (comment)

Contributor:
#29810 has been merged, @somani can you restore the original test?

Contributor Author:
Done. Thanks @cloud-fan!
| FROM t1 GROUP BY c1
|)
|""".stripMargin),
correctAnswer)
}
}
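The review thread above leans on Postgres's volatility categories to justify marking FIRST/LAST deterministic. As a sketch (names here are illustrative, not Spark API), those three categories collapse onto the single boolean that Catalyst's `deterministic` flag effectively encodes for rewrite eligibility:

```scala
// Postgres volatility categories, per the documentation linked in the
// review thread (xfunc-volatility).
sealed trait Volatility
case object Immutable extends Volatility // same arguments => same result, always
case object Stable    extends Volatility // constant within a single statement
case object Volatile  extends Volatility // may change on every call, e.g. random()

// Only Volatile functions must be protected from query rewrites that
// move, duplicate, or deduplicate them; the rest are eligible.
def eligibleForRewrite(v: Volatility): Boolean = v match {
  case Volatile => false
  case _        => true
}
```

Under this framing, FIRST/LAST behave like Stable (not repeatable across executions, but safe to rewrite within one query), while `rand()` is Volatile and must keep `deterministic = false`.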

  test("SPARK-37199: deterministic in QueryPlan considers subquery") {
    val deterministicQueryPlan = sql("select (select 1 as b) as b")
      .queryExecution.executedPlan
    assert(deterministicQueryPlan.deterministic)

    val nonDeterministicQueryPlan = sql("select (select rand(1) as b) as b")
      .queryExecution.executedPlan
    assert(!nonDeterministicQueryPlan.deterministic)
  }

}