
Update partitioning logic in ShuffledBatchRDD #319

Merged: 3 commits into NVIDIA:branch-0.2 on Aug 3, 2020

Conversation

andygrove
Contributor

@andygrove andygrove commented Jul 2, 2020

This PR updates the logic in ShuffledBatchRDD to reflect recent changes in Spark's ShuffledRowRDD related to AQE support.

Note that there are outstanding questions about how these changes affect UCX, so UCX is disabled for now when AQE is enabled.
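For context, Spark 3.0's AQE reworked ShuffledRowRDD to describe each output partition with a ShufflePartitionSpec rather than a plain reducer index, and ShuffledBatchRDD has to handle the same shapes. A rough sketch of the three spec variants, assuming Spark 3.0's org.apache.spark.sql.execution API (simplified, not the code merged in this PR):

```scala
import org.apache.spark.sql.execution.{CoalescedPartitionSpec,
  PartialMapperPartitionSpec, PartialReducerPartitionSpec, ShufflePartitionSpec}

// Each output partition of the shuffled RDD carries one spec, and
// compute() picks the matching shuffle reader.
def describe(spec: ShufflePartitionSpec): String = spec match {
  // Post-shuffle coalescing: read a contiguous range of reducer partitions.
  case CoalescedPartitionSpec(startReducer, endReducer) =>
    s"read reducer partitions [$startReducer, $endReducer)"
  // Skew handling: read one reducer's data from a subset of map outputs.
  case PartialReducerPartitionSpec(reducer, startMap, endMap) =>
    s"read maps [$startMap, $endMap) for reducer $reducer"
  // Local shuffle read: read a range of reducer outputs from one map task.
  case PartialMapperPartitionSpec(mapIndex, startReducer, endReducer) =>
    s"read reducers [$startReducer, $endReducer) from map $mapIndex"
}
```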

@andygrove andygrove requested a review from abellina July 2, 2020 22:30
@andygrove
Contributor Author

@abellina Could you take a look when you get a chance? This is one of the changes I had to make when working on the AQE POC.

@andygrove andygrove self-assigned this Jul 2, 2020
@andygrove andygrove added the feature request New feature or request label Jul 2, 2020
@andygrove andygrove added this to the Jul 6 - Jul 17 milestone Jul 2, 2020
@andygrove
Contributor Author

build

@kuhushukla
Collaborator

I'll try and chime in this weekend on this change.

@abellina
Collaborator

abellina commented Jul 2, 2020

To me, at a high level, this looks fine. It echoes the changes made in the row-based ShuffledRowRDD. The question I have is how it works with the shuffle plugin. Given AQE with the shuffle plugin, the expectation is that this will likely fail, so we may want to turn off the shuffle plugin in that case (it could be a separate bug). I'll also test the shuffle plugin with your patch (AQE).

@abellina
Collaborator

abellina commented Jul 6, 2020

@andygrove I haven't had time to test this yet, but I am fairly sure it will fail if the shuffle plugin is enabled. I'll try this today, likely with some help from you (not sure what I can run with it enabled).

Here's what I am thinking: the writes are going to be cached and stamped as RAPIDS blocks, and then the reads are going to ignore this altogether (getReaderForRange falls back as if it were a CPU shuffle). We do need to implement getReaderForRange for the plugin, but one way to get around this is to fall back to the legacy shuffle for now if AQE is on (https://github.com/NVIDIA/spark-rapids/blob/branch-0.2/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/RapidsShuffleInternalManager.scala#L194).
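The fallback described above could look roughly like the existing external-shuffle check. A hedged sketch, assuming a helper inside the shuffle manager (the names here are illustrative, not the actual plugin code):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.internal.SQLConf

// Fall back to the built-in sort shuffle when the plugin cannot serve the
// workload: external shuffle today, and AQE until getReaderForRange (#362)
// is implemented, since AQE reads would otherwise take the CPU path while
// the writes were stamped as RAPIDS blocks.
def shouldFallThrough(conf: SparkConf): Boolean = {
  val externalShuffle = conf.getBoolean("spark.shuffle.service.enabled", false)
  val adaptiveEnabled =
    conf.get(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "false").toBoolean
  externalShuffle || adaptiveEnabled
}
```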

@abellina
Collaborator

@andygrove here's the issue I mentioned on the read side: #362. I think it's a separate issue that should also come with tests (somehow).

@andygrove andygrove changed the title [WIP] Update partitioning logic in ShuffledBatchRDD Update partitioning logic in ShuffledBatchRDD Jul 28, 2020
@andygrove
Contributor Author

build

@@ -203,7 +204,12 @@ abstract class RapidsShuffleInternalManagerBase(conf: SparkConf, isDriver: Boole
logWarning("Rapids Shuffle Plugin is falling back to SortShuffleManager because " +
"external shuffle is enabled")
}
fallThroughDueToExternalShuffle
val isAdaptiveEnabled = conf.get(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "false").toBoolean
Collaborator

There is a stringDefault value in SQLConf.ADAPTIVE_EXECUTION_ENABLED which should let us stay up to date with the default if it changes. The main problem with that is that we are compiling this against one specific version of Spark, so if the default changes from one version to another, this will not stay up to date.

Contributor Author

This is something that we could potentially do in the shim layer though so that it does compile against each version we support. I'll look into doing that.

Collaborator

@andygrove we should wait on this PR then? Or you are thinking a different PR?

Contributor Author

I'll add it to this PR.

Contributor Author

Actually, this is a boolean config, so it always has a value and it doesn't make sense to check for a default. I'll remove the default part instead.

Contributor Author

This is updated now.
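The two options discussed in this thread can be sketched side by side, assuming Spark's SQLConf/ConfigEntry API (illustrative only, not the merged code):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.internal.SQLConf

// Option 1: hard-code the default, which silently drifts if a future Spark
// release flips spark.sql.adaptive.enabled to true by default.
def isAdaptiveEnabledHardcoded(conf: SparkConf): Boolean =
  conf.get(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "false").toBoolean

// Option 2: ask the ConfigEntry for its own default, which tracks whatever
// Spark version this module is compiled against.
def isAdaptiveEnabled(conf: SparkConf): Boolean =
  conf.get(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key,
    SQLConf.ADAPTIVE_EXECUTION_ENABLED.defaultValueString).toBoolean
```

As noted above, option 2 only tracks the one Spark version the module is compiled against, which is why the shim-layer approach was raised.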

abellina
abellina previously approved these changes Jul 30, 2020
@andygrove
Contributor Author

build

Signed-off-by: Andy Grove <andygrove@nvidia.com>
@andygrove
Contributor Author

build

@andygrove
Contributor Author

There was a build failure with the 3.1.0 shim:

13:44:04  [ERROR] case class GpuBroadcastHashJoinExec(
13:44:04  [ERROR]            ^
13:44:04  [ERROR] /ansible-managed/jenkins-slave/slave4/workspace/spark/rapids_premerge-github/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/GpuShuffledHashJoinExec.scala:77: class GpuShuffledHashJoinExec needs to be abstract, since:
13:44:04  it has 2 unimplemented members.
13:44:04  /** As seen from class GpuShuffledHashJoinExec, the missing signatures are as follows.
13:44:04   *  For convenience, these are usable as stub implementations.
13:44:04   */
13:44:04    // Members declared in org.apache.spark.sql.execution.CodegenSupport
13:44:04    def inputRDDs(): Seq[org.apache.spark.rdd.RDD[org.apache.spark.sql.catalyst.InternalRow]] = ???
13:44:04    
13:44:04    // Members declared in org.apache.spark.sql.execution.joins.HashJoin
13:44:04    protected def prepareRelation(ctx: org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext): (String, Boolean) = ???

Seems to be related to this recent Spark commit:

apache/spark@ae82768

Signed-off-by: Andy Grove <andygrove@nvidia.com>
Signed-off-by: Andy Grove <andygrove@nvidia.com>
@andygrove
Contributor Author

build

@andygrove andygrove merged commit 9d88311 into NVIDIA:branch-0.2 Aug 3, 2020
@andygrove andygrove deleted the update-shuffle-logic branch August 3, 2020 13:22
nartal1 pushed a commit to nartal1/spark-rapids that referenced this pull request Jun 9, 2021
Signed-off-by: Andy Grove <andygrove@nvidia.com>
nartal1 pushed a commit to nartal1/spark-rapids that referenced this pull request Jun 9, 2021
Signed-off-by: Andy Grove <andygrove@nvidia.com>