-
Notifications
You must be signed in to change notification settings - Fork 28.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-37199][SQL] Add deterministic field to QueryPlan #34470
Conversation
add to whitelist |
@@ -1931,18 +1931,29 @@ class SubquerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark | |||
sql( | |||
""" | |||
|SELECT c1, s, s * 10 FROM ( | |||
| SELECT c1, (SELECT FIRST(c2) FROM t2 WHERE t1.c1 = t2.c1) s FROM t1) | |||
| SELECT c1, (SELECT MIN(c2) FROM t2 WHERE t1.c1 = t2.c1) s FROM t1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what's the error if we don't make this change?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This one is fine, but the one below fails with:
Failed to analyze query: org.apache.spark.sql.AnalysisException: nondeterministic expression sum(scalarsubquery(t1.c1)) should not appear in the arguments of an aggregate function.;
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just a side note - I have been arguing, that first/last should be deterministic functions, but it has not gotten any attention - #29810.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we not change the test query and assert the error instead?
Just a side note - I have been arguing, that first/last should be deterministic functions
+1 even though FIRST/LAST are not truly deterministic during execution.
The purpose of this field is for determining the eligibility of query rewrites. Postgres has a nice categorization of those:
https://www.postgresql.org/docs/8.3/xfunc-volatility.html
SUM, AVG are not completely deterministic (when running distributed-ly) neither, but we can still do query optimizations over them, and I think it'd be fine for LAST/FIRST too. Differently, rand() has to be marked as non-deterministic because we don't want query rewrites to move, duplicate or dedup it.
Kubernetes integration test starting |
Kubernetes integration test status failure |
Test build #144871 has finished for PR 34470 at commit
|
|""".stripMargin), | ||
correctAnswer) | ||
checkAnswer( | ||
sql( | ||
""" | ||
|SELECT c1, s, s * 10 FROM ( | ||
| SELECT c1, SUM((SELECT FIRST(c2) FROM t2 WHERE t1.c1 = t2.c1)) s |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this query also fail in other databases like pgsql?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Isn't this subquery semantically the same as SELECT c1, SUM((SELECT c2 FROM t2 WHERE t1.c1 = t2.c1 LIMIT 1)) s FROM t1 GROUP BY c1
? Spark currently does not support LIMIT
to be on the correlation path, but this subquery, according to the current logic, is deterministic.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this subquery, according to the current logic, is deterministic
It seems fine to mark first/last deterministic? #34470 (comment)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done. Thanks @cloud-fan!
Kubernetes integration test starting |
Kubernetes integration test status failure |
Test build #144930 has finished for PR 34470 at commit
|
thanks, merging to master! |
* Returns true when the all the expressions in the current node as well as all of its children | ||
* are deterministic | ||
*/ | ||
lazy val deterministic: Boolean = expressions.forall(_.deterministic) && |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
qq: should we mark all non-deterministic plans as so? e.g. Sample
?
* Returns true when the all the expressions in the current node as well as all of its children | ||
* are deterministic | ||
*/ | ||
lazy val deterministic: Boolean = expressions.forall(_.deterministic) && |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wait .. why is this in query plan? What about physical plans vs logical plans? should both be marked?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should move this to logical plan only since it doesn't make sense physical plans have different determinism.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
physical plan can override this lazy val
if it has custom logic, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can physical plan have a different determinism to ones in logical plan?
e.g., Sample is non-deterministic. I think physical plans of Sample should always be non-deterministic. Otherwise, the output will be inconsistent for which physical plan is used. The opposite case is the same too.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yea, if we override this lazy val in a logical plan, we should do it in the corresponding physical plan as well.
Moving this to logical plan is also OK, if we don't need it in physical plan at all. cc @maryannxue
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So if we optimize something, that should always happen in optimizer with logical plans ... right?
If we can do something with physical plans, we will have to add another argument for every non deterministic plan e.g.)
case class Sample(
lowerBound: Double,
upperBound: Double,
withReplacement: Boolean,
seed: Long,
+ deterministic: Boolean,
child: LogicalPlan) extends UnaryNode {
case class SampleExec(
lowerBound: Double,
upperBound: Double,
withReplacement: Boolean,
seed: Long,
+ deterministic: Boolean,
child: SparkPlan) extends UnaryExecNode with CodegenSupport {
which is pretty much different from how we do in Expression
.
Otherwise, we will have to recalculate it for each plan, etc.
…lan (#879) * [SPARK-37199][SQL] Add deterministic field to QueryPlan ### What changes were proposed in this pull request? We have a deterministic field in Expressions to check if an expression is deterministic, but we do not have a similar field in QueryPlan. We have a need for such a check in the QueryPlan sometimes, like in InlineCTE This proposal is to add a deterministic field to QueryPlan. More details in this document: https://docs.google.com/document/d/1eIiaSJf-Co2HhjsaQxFNGwUxobnHID4ZGmJMcVytREc/edit#heading=h.4cz970y1mk93 ### Why are the changes needed? We have a need for such a check in the QueryPlan sometimes, like in InlineCTE ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Added unit tests Closes #34470 from somani/isDeterministic. Authored-by: Abhishek Somani <abhishek.somani@databricks.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> (cherry picked from commit fe41d18) * Fix test error Co-authored-by: Abhishek Somani <abhishek.somani@databricks.com>
What changes were proposed in this pull request?
We have a deterministic field in Expressions to check if an expression is deterministic, but we do not have a similar field in QueryPlan.
We have a need for such a check in the QueryPlan sometimes, like in InlineCTE
This proposal is to add a deterministic field to QueryPlan.
More details in this document: https://docs.google.com/document/d/1eIiaSJf-Co2HhjsaQxFNGwUxobnHID4ZGmJMcVytREc/edit#heading=h.4cz970y1mk93
Why are the changes needed?
We have a need for such a check in the QueryPlan sometimes, like in InlineCTE
Does this PR introduce any user-facing change?
No
How was this patch tested?
Added unit tests