[SPARK-32940][SQL] Collect, first and last should be deterministic aggregate functions #29810

Closed
wants to merge 25 commits into from

Conversation

tanelk
Contributor

@tanelk tanelk commented Sep 19, 2020

What changes were proposed in this pull request?

Collect, first and last have mistakenly been marked as non-deterministic. They are actually deterministic iff their child expression is deterministic.

For example, collect was marked as non-deterministic in #14749. The reasoning was that its output depends on the actual order of input rows. Although it is correct that these aggregators depend on the order of input rows, that does not make them non-deterministic.

In EliminateSorts optimizer rule, there is a method isOrderIrrelevantAggs, that lists all aggregators that do not depend on their input row order. Collect, first and last are correctly not listed there.
An aggregator would be non-deterministic if its output for a group depended on previous groups it has aggregated. I can't think of any practical examples of this kind of aggregator in Spark.

An analogous aggregator is sum on the float and double data types: its result depends on the order of its inputs, but it is deterministic. Other similar aggregates are max_by and min_by: deterministic functions that can return different results when the order of rows changes.
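
To make the sum analogy concrete, here is a minimal sketch in plain Scala (no Spark involved; the object and value names are made up for illustration). It sums the same three doubles in two different orders: the results differ, yet each order always produces the same result.

```scala
object FloatSumOrder {
  // Left-to-right summation, so the result is a pure function of the input sequence.
  def sumInOrder(xs: Seq[Double]): Double = xs.foldLeft(0.0)(_ + _)

  // Same multiset of values, two different arrival orders.
  val orderA: Seq[Double] = Seq(1e100, 1.0, -1e100) // 1.0 is absorbed by 1e100
  val orderB: Seq[Double] = Seq(1e100, -1e100, 1.0) // the large values cancel first

  def main(args: Array[String]): Unit = {
    println(sumInOrder(orderA)) // 0.0
    println(sumInOrder(orderB)) // 1.0
    // Re-running on the same order gives the same answer: order-dependent, but deterministic.
    println(sumInOrder(orderA) == sumInOrder(orderA)) // true
  }
}
```

The point of the sketch is only that "depends on input order" and "deterministic for a given input" are separate properties.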

Why are the changes needed?

The optimizer rule PushPredicateThroughNonJoin can work in more cases.

Does this PR introduce any user-facing change?

No

How was this patch tested?

UT

Comment on lines 123 to 127
-Seq(positiveInt, negativeInt).foreach (v => {
-val e = Cast(First(f, ignoreNulls = true), IntegerType) <=> v
+Seq(positiveLong, negativeLong).foreach (v => {
+val e = Cast(SparkPartitionID(), LongType) <=> v
 assertEquivalent(e, e, evaluate = false)
-val e2 = Cast(Literal(30.toShort), IntegerType) >= v
+val e2 = Cast(Literal(30), LongType) >= v
 assertEquivalent(e2, e2, evaluate = false)
Contributor Author

There was no other non-deterministic expression that can return a short, so I had to change this test a bit.

@tanelk
Contributor Author

tanelk commented Sep 19, 2020

@cloud-fan, you reviewed the related pull request (although years back).

@dongjoon-hyun
Member

ok to test

@@ -63,9 +63,6 @@ case class First(child: Expression, ignoreNulls: Boolean)

 override def nullable: Boolean = true

-// First is not a deterministic function.
-override lazy val deterministic: Boolean = false
Member

I think you may need to update the note above to say something like "The function can be non-deterministic because its results depend on the order of input rows, which is usually non-deterministic after a shuffle." You might also need to update functions.py, functions.R and functions.scala.

@SparkQA

SparkQA commented Sep 20, 2020

Test build #128898 has finished for PR 29810 at commit b9fd2f1.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 20, 2020

Test build #128910 has finished for PR 29810 at commit b0919a2.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

In EliminateSorts optimizer rule, there is a method isOrderIrrelevantAggs, that lists all aggregators that do not depend on their input row order. Collect, first and last are correctly not listed there.

hmm, it's pretty weird if we list first/last there. Removing sort will definitely change the query result, won't it?

@tanelk
Contributor Author

tanelk commented Sep 21, 2020

In EliminateSorts optimizer rule, there is a method isOrderIrrelevantAggs, that lists all aggregators that do not depend on their input row order. Collect, first and last are correctly not listed there.

hmm, it's pretty weird if we list first/last there. Removing sort will definitely change the query result, won't it?

Sorry if I didn't word it correctly - these are not listed there. I tried to exemplify the difference between deterministic and order irrelevant.

@hvanhovell
Contributor

Maybe I am missing something here. AFAIK the problem with First/Last/CollectList is that we can't control how results are merged. This depends on how the shuffle fetches results, and that is not deterministic.

@tanelk
Contributor Author

tanelk commented Sep 21, 2020

Maybe I am missing something here. AFAIK the problem with First/Last/CollectList is that we can't control how results are merged. This depends on how the shuffle fetches results, and that is not deterministic.

You are 100% correct. As a user, this is how I would also understand the term deterministic.
But internally, deterministic has a different meaning. By the user-facing definition, Sum should also be non-deterministic if its input type is float or double.

I'll copy our internal definition:

   * Note that this means that an expression should be considered as non-deterministic if:
   * - it relies on some mutable internal state, or
   * - it relies on some implicit input that is not part of the children expression list.
   * - it has non-deterministic child or children.
   * - it assumes the input satisfies some certain condition via the child operator.
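
As a rough illustration of the definition above, here is a minimal sketch with simplified stand-ins for the Catalyst classes. The names mirror Spark's, but this is not Spark's code: `Expr`, `Column` and the single-argument `First` and `Rand` are assumptions made for the example. By default an expression is deterministic iff all of its children are, and only expressions with hidden state override that.

```scala
object DeterminismSketch {
  // Simplified stand-in for an expression tree node (hypothetical, not Catalyst's Expression).
  trait Expr {
    def children: Seq[Expr]
    // Default rule: deterministic iff every child is deterministic.
    lazy val deterministic: Boolean = children.forall(_.deterministic)
  }

  // A plain column reference: no children, no hidden state.
  case class Column(name: String) extends Expr {
    val children: Seq[Expr] = Nil
  }

  // Relies on mutable internal state (a random generator), so it opts out explicitly.
  case class Rand(seed: Long) extends Expr {
    val children: Seq[Expr] = Nil
    override lazy val deterministic: Boolean = false
  }

  // With no override, First simply inherits the default rule from its child.
  case class First(child: Expr) extends Expr {
    val children: Seq[Expr] = Seq(child)
  }

  def main(args: Array[String]): Unit = {
    println(First(Column("a")).deterministic) // true
    println(First(Rand(42L)).deterministic)   // false
  }
}
```

Under this model, removing the `deterministic = false` override from an aggregate is exactly what makes it "deterministic iff its child expression is deterministic".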

For aggregation expressions the internal state part can introduce extra confusion - of course all of them have some internal state about the current group they are aggregating (running count, largest value seen so far, etc), but they do not "remember" the previous groups they have aggregated.

There is a separate optimizer rule, EliminateSorts, that keeps track of aggregators that do not depend on input order (max, count, etc.). But these are a subset of all deterministic aggregators.

For context, why this is relevant:
A snippet from PushPredicateThroughNonJoin

case filter @ Filter(condition, aggregate: Aggregate)
if aggregate.aggregateExpressions.forall(_.deterministic)
&& aggregate.groupingExpressions.nonEmpty =>
val aliasMap = getAliasMap(aggregate)

Basically, this case filters out groups before aggregating the values. Within one group the aggregator still sees the same rows in the same order, but it does not see the groups that were filtered out. This would change the output of an aggregator that remembers previous groups (non-deterministic), but it would not change the output of an aggregator that only cares about the current group (deterministic, but possibly order relevant).
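
The argument can be sketched with plain Scala collections (no Spark; the data and the helper names `firstPerGroup`, `groupOrdinal` and `keep` are hypothetical). A group-local aggregate such as "first value per group" gives the same answer whether the group filter runs before or after aggregation, while an aggregate that carries state across groups does not.

```scala
object PushdownSketch {
  // Hypothetical input rows: (group key, value), in a fixed arrival order.
  val rows: Seq[(String, Int)] =
    Seq(("a", 3), ("b", 7), ("a", 1), ("c", 5), ("b", 2))

  // Group-local aggregate: like first(), it only looks at the rows of its own group.
  // groupBy keeps the arrival order of rows inside each group.
  def firstPerGroup(input: Seq[(String, Int)]): Map[String, Int] =
    input.groupBy(_._1).map { case (k, vs) => k -> vs.head._2 }

  // Aggregate that leaks state across groups: each group is numbered by
  // how many groups were processed before it.
  def groupOrdinal(input: Seq[(String, Int)]): Map[String, Int] = {
    var seen = 0
    input.map(_._1).distinct.map { k => seen += 1; k -> seen }.toMap
  }

  // Predicate on the grouping key, standing in for the pushed-down Filter condition.
  def keep(key: String): Boolean = key != "a"

  // Filter applied after aggregation vs. pushed below the aggregation.
  val firstFilterAfter: Map[String, Int] = firstPerGroup(rows).filter { case (k, _) => keep(k) }
  val firstFilterBefore: Map[String, Int] = firstPerGroup(rows.filter(r => keep(r._1)))

  val ordinalFilterAfter: Map[String, Int] = groupOrdinal(rows).filter { case (k, _) => keep(k) }
  val ordinalFilterBefore: Map[String, Int] = groupOrdinal(rows.filter(r => keep(r._1)))

  def main(args: Array[String]): Unit = {
    println(firstFilterAfter == firstFilterBefore)     // true: pushdown is safe
    println(ordinalFilterAfter == ordinalFilterBefore) // false: pushdown changes the result
  }
}
```

Only the second kind of aggregate needs to block the pushdown, which is what the `deterministic` check in the rule is guarding against.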

# Conflicts:
#	python/pyspark/sql/functions.py
#	sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/UnwrapCastInBinaryComparisonSuite.scala
@github-actions github-actions bot added the CORE label Dec 30, 2020
@SparkQA

SparkQA commented Dec 30, 2020

Test build #133545 has finished for PR 29810 at commit a080b53.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Dec 31, 2020

Test build #133547 has finished for PR 29810 at commit dc6e7c0.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 24, 2021

Test build #140262 has finished for PR 29810 at commit e5e9a04.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 30, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/48272/

@SparkQA

SparkQA commented Sep 30, 2021

Test build #143761 has finished for PR 29810 at commit 56fbf15.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 30, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/48272/

@SparkQA

SparkQA commented Oct 19, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/48881/

@SparkQA

SparkQA commented Oct 19, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/48881/

@SparkQA

SparkQA commented Oct 19, 2021

Test build #144407 has finished for PR 29810 at commit 0d40311.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class HistogramPlotBase(NumericPlotBase):
  • class KdePlotBase(NumericPlotBase):
  • new_class = type(NameTypeHolder.short_name, (NameTypeHolder,),
  • class Database(NamedTuple):
  • class Table(NamedTuple):
  • class Column(NamedTuple):
  • class Function(NamedTuple):
  • class SparkUpgradeException(CapturedException):
  • protected class YarnSchedulerEndpoint(override val rpcEnv: RpcEnv)
  • public class ExpressionImplUtils
  • class IndexAlreadyExistsException(message: String, cause: Option[Throwable] = None)
  • class NoSuchIndexException(message: String, cause: Option[Throwable] = None)
  • trait ExtractValue extends Expression with NullIntolerant
  • case class AesEncrypt(input: Expression, key: Expression, child: Expression)
  • case class AesDecrypt(input: Expression, key: Expression, child: Expression)
  • case class SetCatalogAndNamespace(child: LogicalPlan) extends UnaryCommand
  • case class CreateFunction(
  • case class CreateView(
  • case class SetCatalogCommand(catalogName: String) extends LeafRunnableCommand
  • case class SetNamespaceCommand(namespace: Seq[String]) extends LeafRunnableCommand
  • case class ShowCatalogsCommand(pattern: Option[String]) extends LeafRunnableCommand
  • case class HashedRelationBroadcastMode(key: Seq[Expression], isNullAware: Boolean = false)

@cloud-fan
Contributor

There is inevitable randomness in the input of aggregate functions, because the shuffle reader may produce data in random order, and we are not able to completely eliminate this randomness. For example, even sum has randomness, as adding up floating-point values in different orders can lead to slightly different results.

I don't think first/last/collect are significantly different, and +1 to mark them as deterministic.

@cloud-fan
Contributor

retest this please

-Seq(positiveInt, negativeInt).foreach(v => {
-val e = Cast(First(f, ignoreNulls = true), IntegerType) <=> v
+Seq(positiveLong, negativeLong).foreach (v => {
+val e = Cast(SparkPartitionID(), LongType) <=> v
Contributor

can we use Rand?

CollectSet(_: Expression)
).foreach {
aggBuilder =>
val agg = aggBuilder('a)
-test(s"Eliminate Distinct in ${agg.prettyName}") {
+test(s"Eliminate Distinct in ${agg.toString}") {
Contributor

nit: just $agg

@SparkQA

SparkQA commented Nov 4, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/49375/

@SparkQA

SparkQA commented Nov 4, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/49375/

@AmplabJenkins

Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/49375/

@SparkQA

SparkQA commented Nov 4, 2021

Test build #144906 has finished for PR 29810 at commit 0d40311.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class HistogramPlotBase(NumericPlotBase):
  • class KdePlotBase(NumericPlotBase):
  • new_class = type(NameTypeHolder.short_name, (NameTypeHolder,),
  • class Database(NamedTuple):
  • class Table(NamedTuple):
  • class Column(NamedTuple):
  • class Function(NamedTuple):
  • class SparkUpgradeException(CapturedException):
  • protected class YarnSchedulerEndpoint(override val rpcEnv: RpcEnv)
  • public class ExpressionImplUtils
  • class IndexAlreadyExistsException(message: String, cause: Option[Throwable] = None)
  • class NoSuchIndexException(message: String, cause: Option[Throwable] = None)
  • trait ExtractValue extends Expression with NullIntolerant
  • case class AesEncrypt(input: Expression, key: Expression, child: Expression)
  • case class AesDecrypt(input: Expression, key: Expression, child: Expression)
  • case class SetCatalogAndNamespace(child: LogicalPlan) extends UnaryCommand
  • case class CreateFunction(
  • case class CreateView(
  • case class SetCatalogCommand(catalogName: String) extends LeafRunnableCommand
  • case class SetNamespaceCommand(namespace: Seq[String]) extends LeafRunnableCommand
  • case class ShowCatalogsCommand(pattern: Option[String]) extends LeafRunnableCommand
  • case class HashedRelationBroadcastMode(key: Seq[Expression], isNullAware: Boolean = false)

@AmplabJenkins

Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/144906/

@SparkQA

SparkQA commented Nov 5, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/49393/

@SparkQA

SparkQA commented Nov 5, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/49393/

@AmplabJenkins

Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/49393/

@cloud-fan
Contributor

thanks, merging to master!

@cloud-fan cloud-fan closed this in 58e07e0 Nov 5, 2021
@tanelk tanelk deleted the SPARK-32940 branch November 5, 2021 10:52
@SparkQA

SparkQA commented Nov 5, 2021

Test build #144921 has finished for PR 29810 at commit e4ed57c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/144921/
