
[SPARK-34081][SQL] Only pushdown LeftSemi/LeftAnti over Aggregate if join can be planned as broadcast join #31145


wangyum (Member) commented on Jan 12, 2021:

What changes were proposed in this pull request?

We should not push LeftSemi/LeftAnti joins down through an Aggregate in some cases. For example:

spark.range(50000000L).selectExpr("id % 10000 as a", "id % 10000 as b").write.saveAsTable("t1")
spark.range(40000000L).selectExpr("id % 8000 as c", "id % 8000 as d").write.saveAsTable("t2")
spark.sql("SELECT distinct a, b FROM t1 INTERSECT SELECT distinct c, d FROM t2").explain

Before this PR:

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[a#16L, b#17L], functions=[])
   +- HashAggregate(keys=[a#16L, b#17L], functions=[])
      +- HashAggregate(keys=[a#16L, b#17L], functions=[])
         +- Exchange hashpartitioning(a#16L, b#17L, 5), ENSURE_REQUIREMENTS, [id=#72]
            +- HashAggregate(keys=[a#16L, b#17L], functions=[])
               +- SortMergeJoin [coalesce(a#16L, 0), isnull(a#16L), coalesce(b#17L, 0), isnull(b#17L)], [coalesce(c#18L, 0), isnull(c#18L), coalesce(d#19L, 0), isnull(d#19L)], LeftSemi
                  :- Sort [coalesce(a#16L, 0) ASC NULLS FIRST, isnull(a#16L) ASC NULLS FIRST, coalesce(b#17L, 0) ASC NULLS FIRST, isnull(b#17L) ASC NULLS FIRST], false, 0
                  :  +- Exchange hashpartitioning(coalesce(a#16L, 0), isnull(a#16L), coalesce(b#17L, 0), isnull(b#17L), 5), ENSURE_REQUIREMENTS, [id=#65]
                  :     +- FileScan parquet default.t1[a#16L,b#17L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/spark-warehouse/org.apache.spark.sql.Data..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:bigint,b:bigint>
                  +- Sort [coalesce(c#18L, 0) ASC NULLS FIRST, isnull(c#18L) ASC NULLS FIRST, coalesce(d#19L, 0) ASC NULLS FIRST, isnull(d#19L) ASC NULLS FIRST], false, 0
                     +- Exchange hashpartitioning(coalesce(c#18L, 0), isnull(c#18L), coalesce(d#19L, 0), isnull(d#19L), 5), ENSURE_REQUIREMENTS, [id=#66]
                        +- HashAggregate(keys=[c#18L, d#19L], functions=[])
                           +- Exchange hashpartitioning(c#18L, d#19L, 5), ENSURE_REQUIREMENTS, [id=#61]
                              +- HashAggregate(keys=[c#18L, d#19L], functions=[])
                                 +- FileScan parquet default.t2[c#18L,d#19L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/spark-warehouse/org.apache.spark.sql.Data..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<c:bigint,d:bigint>

After this PR:

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[a#16L, b#17L], functions=[])
   +- Exchange hashpartitioning(a#16L, b#17L, 5), ENSURE_REQUIREMENTS, [id=#74]
      +- HashAggregate(keys=[a#16L, b#17L], functions=[])
         +- SortMergeJoin [coalesce(a#16L, 0), isnull(a#16L), coalesce(b#17L, 0), isnull(b#17L)], [coalesce(c#18L, 0), isnull(c#18L), coalesce(d#19L, 0), isnull(d#19L)], LeftSemi
            :- Sort [coalesce(a#16L, 0) ASC NULLS FIRST, isnull(a#16L) ASC NULLS FIRST, coalesce(b#17L, 0) ASC NULLS FIRST, isnull(b#17L) ASC NULLS FIRST], false, 0
            :  +- Exchange hashpartitioning(coalesce(a#16L, 0), isnull(a#16L), coalesce(b#17L, 0), isnull(b#17L), 5), ENSURE_REQUIREMENTS, [id=#67]
            :     +- HashAggregate(keys=[a#16L, b#17L], functions=[])
            :        +- Exchange hashpartitioning(a#16L, b#17L, 5), ENSURE_REQUIREMENTS, [id=#61]
            :           +- HashAggregate(keys=[a#16L, b#17L], functions=[])
            :              +- FileScan parquet default.t1[a#16L,b#17L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/spark-warehouse/org.apache.spark.sql.Data..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:bigint,b:bigint>
            +- Sort [coalesce(c#18L, 0) ASC NULLS FIRST, isnull(c#18L) ASC NULLS FIRST, coalesce(d#19L, 0) ASC NULLS FIRST, isnull(d#19L) ASC NULLS FIRST], false, 0
               +- Exchange hashpartitioning(coalesce(c#18L, 0), isnull(c#18L), coalesce(d#19L, 0), isnull(d#19L), 5), ENSURE_REQUIREMENTS, [id=#68]
                  +- HashAggregate(keys=[c#18L, d#19L], functions=[])
                     +- Exchange hashpartitioning(c#18L, d#19L, 5), ENSURE_REQUIREMENTS, [id=#63]
                        +- HashAggregate(keys=[c#18L, d#19L], functions=[])
                           +- FileScan parquet default.t2[c#18L,d#19L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/spark-warehouse/org.apache.spark.sql.Data..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<c:bigint,d:bigint>

Why are the changes needed?

  1. Pushing a LeftSemi/LeftAnti join down through an Aggregate can hurt performance when the join cannot be planned as a broadcast join (see the configuration note after this list).
  2. It removes the user-added DISTINCT operator, e.g. in TPC-DS q38 and q87.
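
For context, whether the join can be planned as a broadcast join is driven by Spark's usual size-based threshold. A minimal illustration (standard Spark configuration, not code from this PR):

  // spark.sql.autoBroadcastJoinThreshold defaults to 10 MB; raising it makes
  // broadcast planning (and hence, after this PR, the pushdown) more likely.
  spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 100L * 1024 * 1024)
  spark.sql("SELECT distinct a, b FROM t1 INTERSECT SELECT distinct c, d FROM t2").explain()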

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Unit tests and benchmark tests.

| SQL  | Before this PR (seconds) | After this PR (seconds) |
|------|--------------------------|-------------------------|
| q14a | 660                      | 594                     |
| q14b | 660                      | 600                     |
| q38  | 55                       | 29                      |
| q87  | 66                       | 35                      |

Before this PR: [benchmark screenshot]

After this PR: [benchmark screenshot]

github-actions bot added the SQL label on Jan 12, 2021
SparkQA commented Jan 12, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/38544/

SparkQA commented Jan 12, 2021

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/38544/

SparkQA commented Jan 12, 2021

Test build #133957 has finished for PR 31145 at commit fc06d46.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

cloud-fan (Contributor) commented:

This also changes the plans of q14a and q14b; does it cause a perf regression?

I agree it's unclear whether pushing the left semi/anti join down through the aggregate is beneficial, since both operators can reduce the data volume. I have a simple heuristic: look at the size metrics and check whether the join can be planned as a broadcast join. If it can, then pushing it down is very likely beneficial.

if aggs.forall(_.deterministic) && groups.nonEmpty &&
    !aggs.exists(ScalarSubquery.hasCorrelatedScalarSubquery) &&
    !(cond.nonEmpty && groups.equals(aggs) &&
      cond.forall(e => splitConjunctivePredicates(e).forall(_.isInstanceOf[EqualNullSafe]))) =>
Contributor review comment:

We can add a new method to JoinSelectionHelper:

  // Returns true if either hint-based (hintOnly = true) or size-based
  // (hintOnly = false) planning would pick a broadcast build side.
  def canPlanAsBroadcastHashJoin(join: Join, conf: SQLConf): Boolean = {
    getBroadcastBuildSide(join.left, join.right, join.joinType,
      join.hint, hintOnly = true, conf).isDefined ||
    getBroadcastBuildSide(join.left, join.right, join.joinType,
      join.hint, hintOnly = false, conf).isDefined
  }

and then use it here:
if ... && canPlanAsBroadcastHashJoin(join, conf)

wangyum changed the title from "[SPARK-24081][SQL] Should not pushdown LeftSemi/LeftAnti over Aggregate for some cases" to "[SPARK-24081][SQL] Only pushdown LeftSemi/LeftAnti over Aggregate if join can be planned as broadcast join" on Jan 13, 2021
wangyum (Member, Author) commented on Jan 13, 2021:

> This also changes the plans of q14a and q14b; does it cause a perf regression?

No.

wangyum changed the title from "[SPARK-24081][SQL] Only pushdown LeftSemi/LeftAnti over Aggregate if join can be planned as broadcast join" to "[SPARK-34081][SQL] Only pushdown LeftSemi/LeftAnti over Aggregate if join can be planned as broadcast join" on Jan 13, 2021
@@ -334,7 +334,7 @@ Results [3]: [brand_id#13, class_id#14, category_id#15]

Contributor review comment:

We don't need to change this file.

@@ -319,7 +319,7 @@ Results [3]: [brand_id#13, class_id#14, category_id#15]

Contributor review comment:

We don't need to change this file.

@@ -174,7 +174,7 @@ Results [3]: [c_last_name#17, c_first_name#16, d_date#14]

Contributor review comment:

Ditto.

@@ -174,7 +174,7 @@ Results [3]: [c_last_name#17, c_first_name#16, d_date#14]

Contributor review comment:

Ditto.

@@ -319,7 +319,7 @@ Results [3]: [brand_id#13, class_id#14, category_id#15]

Contributor review comment:

Ditto.

cloud-fan (Contributor) commented:

Since this changes the final plans of 4 TPC-DS queries, can we put benchmark results for all 4 of these queries in the PR description, even if some of them show no perf change?

wangyum (Member, Author) commented on Jan 13, 2021:

> Since this changes the final plans of 4 TPC-DS queries, can we put benchmark results for all 4 of these queries in the PR description, even if some of them show no perf change?

Yes, I have added the benchmark results to the PR description.

SparkQA commented Jan 13, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/38602/

SparkQA commented Jan 13, 2021

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/38602/

SparkQA commented Jan 13, 2021

Test build #134015 has finished for PR 31145 at commit 6f0ba0a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

cloud-fan (Contributor) commented:

thanks, merging to master!

cloud-fan closed this in d3ea308 on Jan 14, 2021
wangyum deleted the SPARK-34081 branch on January 14, 2021