Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-36559][SQL][PYTHON] Create plans dedicated to distributed-sequence index for optimization #33807

Closed
wants to merge 2 commits into from

Conversation

HyukjinKwon
Copy link
Member

@HyukjinKwon HyukjinKwon commented Aug 23, 2021

What changes were proposed in this pull request?

This PR proposes to move distributed-sequence index implementation to SQL plan to leverage optimizations such as column pruning.

import pyspark.pandas as ps
ps.set_option('compute.default_index_type', 'distributed-sequence')
ps.range(10).id.value_counts().to_frame().spark.explain()

Before:

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Sort [count#51L DESC NULLS LAST], true, 0
   +- Exchange rangepartitioning(count#51L DESC NULLS LAST, 200), ENSURE_REQUIREMENTS, [id=#70]
      +- HashAggregate(keys=[id#37L], functions=[count(1)], output=[__index_level_0__#48L, count#51L])
         +- Exchange hashpartitioning(id#37L, 200), ENSURE_REQUIREMENTS, [id=#67]
            +- HashAggregate(keys=[id#37L], functions=[partial_count(1)], output=[id#37L, count#63L])
               +- Project [id#37L]
                  +- Filter atleastnnonnulls(1, id#37L)
                     +- Scan ExistingRDD[__index_level_0__#36L,id#37L]
                        # ^^^ Base DataFrame created by the output RDD from zipWithIndex (and checkpointed)

After:

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Sort [count#275L DESC NULLS LAST], true, 0
   +- Exchange rangepartitioning(count#275L DESC NULLS LAST, 200), ENSURE_REQUIREMENTS, [id=#174]
      +- HashAggregate(keys=[id#258L], functions=[count(1)])
         +- HashAggregate(keys=[id#258L], functions=[partial_count(1)])
            +- Filter atleastnnonnulls(1, id#258L)
               +- Range (0, 10, step=1, splits=16)
                  # ^^^ Removed the Spark job execution for `zipWithIndex`

Why are the changes needed?

To leverage optimization of SQL engine and avoid unnecessary shuffle to create default index.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Unittests were added. Also, this PR will test all unittests in pandas API on Spark after switching the default index implementation to distributed-sequence.

@HyukjinKwon HyukjinKwon marked this pull request as draft August 23, 2021 06:26
@HyukjinKwon HyukjinKwon changed the title [WIP][SPARK-36559][SQL][PYTHON] Move distributed-sequence index implementation to SQL plan to leverage optimization [WIP][SPARK-36559][SQL][PYTHON] Create plans dedicated to distributed-sequence index for optimization Aug 23, 2021
@HyukjinKwon HyukjinKwon force-pushed the SPARK-36559 branch 2 times, most recently from 6f441fc to 296a6ae Compare August 23, 2021 06:31
@SparkQA

This comment has been minimized.

@SparkQA

This comment has been minimized.

@SparkQA

This comment has been minimized.

@SparkQA

This comment has been minimized.

@SparkQA

This comment has been minimized.

@SparkQA

This comment has been minimized.

@SparkQA

This comment has been minimized.

@SparkQA

This comment has been minimized.

@SparkQA

This comment has been minimized.

@SparkQA

This comment has been minimized.

@SparkQA

This comment has been minimized.

@SparkQA

This comment has been minimized.

@SparkQA

This comment has been minimized.

@SparkQA

This comment has been minimized.

@SparkQA

This comment has been minimized.

@SparkQA

This comment has been minimized.

@SparkQA

This comment has been minimized.

@SparkQA

This comment has been minimized.

@SparkQA

This comment has been minimized.

@HyukjinKwon HyukjinKwon changed the title [WIP][SPARK-36559][SQL][PYTHON] Create plans dedicated to distributed-sequence index for optimization [SPARK-36559][SQL][PYTHON] Create plans dedicated to distributed-sequence index for optimization Aug 24, 2021
@HyukjinKwon HyukjinKwon marked this pull request as ready for review August 24, 2021 09:12
@HyukjinKwon
Copy link
Member Author

cc @ueshin and @cloud-fan can you take a look when you find some time?

@SparkQA
Copy link

SparkQA commented Aug 24, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/47223/

@SparkQA
Copy link

SparkQA commented Aug 24, 2021

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/47223/

override def outputPartitioning: Partitioning = child.outputPartitioning

override protected def doExecute(): RDD[InternalRow] = {
child.execute().map(_.copy())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need to copy the unsafe rows before calling localCheckpoint?

Copy link
Member Author

@HyukjinKwon HyukjinKwon Aug 24, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, i forgot to describe it. localCheckpoint caches (persists) the data, and it stores the rows so it needs to copy. This is actually being done at Dataset.checkpoint: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L679


override protected def doExecute(): RDD[InternalRow] = {
child.execute().map(_.copy())
.localCheckpoint() // to avoid execute multiple jobs. zipWithIndex launches a Spark job.
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am still not sure if we need to localCheckPoint in the middle here ... but let me keep it as is for now.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

e.g.) if the child RDD has a shuffle, the shuffle will be triggered twice, and this checkpoint is to avoid that.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The shuffle will be reused. I think localCheckpoint is useful to save computation. e.g. df.sort(...).withSequenceColumn, if we don't do localCheckpoint, the shuffle is still done only once, but the local sort after shuffle will be done twice.

* increases one by one. This is for 'distributed-sequence' default index
* in pandas API on Spark.
*/
case class AttachDistributedSequenceExec(
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could think about implementing this with an expression (like Python UDF or Window) .. but just decided to do this with plans to avoid making it too much complicated.

@SparkQA
Copy link

SparkQA commented Aug 24, 2021

Test build #142723 has finished for PR 33807 at commit 5d74c35.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Copy link
Member

@ueshin ueshin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM.

One thing I'm worrying is that we can't push down filters through AttachDistributedSequence, but it won't happen, right?

@HyukjinKwon
Copy link
Member Author

Yeah, I think it won't happen. Just did a quick double check.

@HyukjinKwon
Copy link
Member Author

Let me merge this one into 3.2 together.

@HyukjinKwon
Copy link
Member Author

Merged to master and branch-3.2.

HyukjinKwon added a commit that referenced this pull request Aug 25, 2021
…ence index for optimization

### What changes were proposed in this pull request?

This PR proposes to move distributed-sequence index implementation to SQL plan to leverage optimizations such as column pruning.

```python
import pyspark.pandas as ps
ps.set_option('compute.default_index_type', 'distributed-sequence')
ps.range(10).id.value_counts().to_frame().spark.explain()
```

**Before:**

```bash
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Sort [count#51L DESC NULLS LAST], true, 0
   +- Exchange rangepartitioning(count#51L DESC NULLS LAST, 200), ENSURE_REQUIREMENTS, [id=#70]
      +- HashAggregate(keys=[id#37L], functions=[count(1)], output=[__index_level_0__#48L, count#51L])
         +- Exchange hashpartitioning(id#37L, 200), ENSURE_REQUIREMENTS, [id=#67]
            +- HashAggregate(keys=[id#37L], functions=[partial_count(1)], output=[id#37L, count#63L])
               +- Project [id#37L]
                  +- Filter atleastnnonnulls(1, id#37L)
                     +- Scan ExistingRDD[__index_level_0__#36L,id#37L]
                        # ^^^ Base DataFrame created by the output RDD from zipWithIndex (and checkpointed)
```

**After:**

```bash
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Sort [count#275L DESC NULLS LAST], true, 0
   +- Exchange rangepartitioning(count#275L DESC NULLS LAST, 200), ENSURE_REQUIREMENTS, [id=#174]
      +- HashAggregate(keys=[id#258L], functions=[count(1)])
         +- HashAggregate(keys=[id#258L], functions=[partial_count(1)])
            +- Filter atleastnnonnulls(1, id#258L)
               +- Range (0, 10, step=1, splits=16)
                  # ^^^ Removed the Spark job execution for `zipWithIndex`
```

### Why are the changes needed?

To leverage optimization of SQL engine and avoid unnecessary shuffle to create default index.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unittests were added. Also, this PR will test all unittests in pandas API on Spark after switching the default index implementation to `distributed-sequence`.

Closes #33807 from HyukjinKwon/SPARK-36559.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(cherry picked from commit 93cec49)
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
@HyukjinKwon HyukjinKwon deleted the SPARK-36559 branch January 4, 2022 00:52
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants