[SPARK-32940][SQL] Collect, first and last should be deterministic aggregate functions #29810
Conversation
```diff
-      Seq(positiveInt, negativeInt).foreach (v => {
-        val e = Cast(First(f, ignoreNulls = true), IntegerType) <=> v
+      Seq(positiveLong, negativeLong).foreach (v => {
+        val e = Cast(SparkPartitionID(), LongType) <=> v
         assertEquivalent(e, e, evaluate = false)
-        val e2 = Cast(Literal(30.toShort), IntegerType) >= v
+        val e2 = Cast(Literal(30), LongType) >= v
         assertEquivalent(e2, e2, evaluate = false)
```
There was no other non-deterministic expression that can return a short, so I had to change this test a bit.
@cloud-fan, you reviewed the related pull request (although years back).
ok to test
```diff
@@ -63,9 +63,6 @@ case class First(child: Expression, ignoreNulls: Boolean)

   override def nullable: Boolean = true

-  // First is not a deterministic function.
-  override lazy val deterministic: Boolean = false
```
I think you may need to update the note above and say something like "The function can be non-deterministic because its results depend on the order of input rows which are usually non-deterministic after a shuffle." You might need to update `functions.py`, `functions.R` and `functions.scala`.
Test build #128898 has finished for PR 29810 at commit
Test build #128910 has finished for PR 29810 at commit
hmm, it's pretty weird if we list
Sorry if I didn't word it correctly - these are not listed there. I tried to exemplify the difference between deterministic and order-irrelevant.
Maybe I am missing something here. AFAIK the problem with the First/Last/CollectList methods is that we can't control how results are merged. This depends on how the shuffle fetches results, and that is not deterministic.
You are 100% correct. As a user, this is how I would also understand the term deterministic. I'll copy our internal definition:
For aggregation expressions the internal state part can introduce extra confusion - of course all of them have some internal state about the current group they are aggregating (running count, largest value seen so far, etc.), but they do not "remember" the previous groups they have aggregated. There is a separate optimizer rule. For context, why this is relevant: spark/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala, lines 1138 to 1141 in c336ddf.
Basically this case will filter out groups in the aggregation before aggregating the values. Within one group the aggregator will still see all the same rows in the same order, but it would not see the groups that were filtered out. This would change the output of an aggregator that remembers previous groups (non-deterministic), but it would not change the output of an aggregator that only cares about the current group (deterministic, but possibly order-relevant).
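The distinction can be sketched in plain Scala (no Spark dependency; the data and helper names here are illustrative, not from the PR):

```scala
// Illustrative sketch: a per-group "first" looks only at the rows of the
// current group, so filtering out other groups before aggregating does not
// change its answer for the groups that survive - pushdown is safe.
val rows = Seq(("a", 1), ("a", 2), ("b", 3), ("b", 4))

def firstPerGroup(data: Seq[(String, Int)]): Map[String, Int] =
  data.groupBy(_._1).map { case (k, vs) => k -> vs.head._2 }

// aggregate everything, then drop group "b" ...
val aggThenFilter = firstPerGroup(rows).filter { case (k, _) => k == "a" }
// ... vs. drop group "b" first, then aggregate (predicate pushed down)
val filterThenAgg = firstPerGroup(rows.filter(_._1 == "a"))

assert(aggThenFilter == filterThenAgg) // Map("a" -> 1) either way
```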
# Conflicts:
#	python/pyspark/sql/functions.py
#	sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/UnwrapCastInBinaryComparisonSuite.scala
Test build #133545 has finished for PR 29810 at commit
Test build #133547 has finished for PR 29810 at commit
Test build #140262 has finished for PR 29810 at commit
Kubernetes integration test starting
Test build #143761 has finished for PR 29810 at commit
Kubernetes integration test status failure
Test build #144407 has finished for PR 29810 at commit
There is inevitable randomness in the input of aggregate functions, because the shuffle reader may produce data in a random order, and we are not able to completely eliminate this randomness. For example, even I don't think
retest this please
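A minimal sketch of that randomness (plain Scala, no Spark; the partition names are made up for illustration):

```scala
// Each map task computes a partial "first"; the reduce side merges the
// partials in whatever order the shuffle reader happens to fetch them.
val partialFirsts = Seq("first-of-partition-0", "first-of-partition-1")

def mergeFirsts(partials: Seq[String]): String = partials.head

// Same partial results, different fetch order => a different final "first".
val run1 = mergeFirsts(partialFirsts)
val run2 = mergeFirsts(partialFirsts.reverse)
assert(run1 != run2)
```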
```diff
-      Seq(positiveInt, negativeInt).foreach(v => {
-        val e = Cast(First(f, ignoreNulls = true), IntegerType) <=> v
+      Seq(positiveLong, negativeLong).foreach (v => {
+        val e = Cast(SparkPartitionID(), LongType) <=> v
```
can we use `Rand`?
```diff
       CollectSet(_: Expression)
     ).foreach {
       aggBuilder =>
         val agg = aggBuilder('a)
-        test(s"Eliminate Distinct in ${agg.prettyName}") {
+        test(s"Eliminate Distinct in ${agg.toString}") {
```
nit: just `$agg`
Refer to this link for build results (access rights to CI server needed):
Test build #144906 has finished for PR 29810 at commit
thanks, merging to master!
Test build #144921 has finished for PR 29810 at commit
What changes were proposed in this pull request?
Collect, first and last have mistakenly been marked as non-deterministic. They are actually deterministic iff their child expression is deterministic.
For example, collect was marked as non-deterministic in #14749. The reasoning was that its output depends on the actual order of input rows. Although it is correct that these aggregators depend on the order of input rows, that does not make them non-deterministic.
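The "order-dependent yet deterministic" point can be sketched in plain Scala, using floating-point summation (addition on `Double` is not associative, so the order of inputs matters, but any fixed order is fully reproducible):

```scala
// A Double sum depends on the order of its inputs, yet each fixed order
// always produces the same answer - order-dependent but deterministic.
val xs = Seq(0.1, 0.2, 0.3)
val leftToRight = xs.foldLeft(0.0)(_ + _)
val rightToLeft = xs.reverse.foldLeft(0.0)(_ + _)

assert(leftToRight != rightToLeft)             // order matters ...
assert(leftToRight == xs.foldLeft(0.0)(_ + _)) // ... but the result is reproducible
```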
In the `EliminateSorts` optimizer rule, there is a method `isOrderIrrelevantAggs` that lists all aggregators that do not depend on their input row order. Collect, first and last are correctly not listed there. An aggregator would be non-deterministic if its output for a group depended on the previous groups it has aggregated - I can't think of any practical examples of this kind of aggregator in Spark.
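The order-irrelevant vs. order-relevant distinction can be sketched in plain Scala (no Spark; `.head` stands in for a per-group first):

```scala
// max is order-irrelevant: any permutation of the input gives the same
// result. "first" (modelled here as .head) is order-relevant: permuting
// the input can change its result - but for a fixed input order it is
// still deterministic.
val rows = Seq(3, 1, 2)
assert(rows.max == rows.sorted.max)     // 3 == 3: order-irrelevant
assert(rows.head != rows.sorted.head)   // 3 != 1: order-relevant
assert(rows.head == Seq(3, 1, 2).head)  // same order, same answer
```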
An analogous aggregator would be sum on the float and double data types - its result does depend on the order of its inputs, but it is deterministic. Other similar aggregates are `max_by` and `min_by` - deterministic functions that can return different results when the order of rows changes.

Why are the changes needed?
The optimizer rule `PushPredicateThroughNonJoin` can work in more cases.
No
How was this patch tested?
UT