Make shuffle run on CPU if we do a join where we read from bucketed table #785

Merged
merged 4 commits into from
Sep 17, 2020

tgravescs
Collaborator

closes #780

The issue is that when a join reads from two datasources and one of them is bucketed, the partitioning mismatches: the bucketed side is hashed on the CPU, while the other datasource, if it has a shuffle, is hashed and partitioned on the GPU. The two hash functions differ, so rows with the same key won't end up in the same partition and the join drops data.

The fix is that if we find any join that reads from a bucketed table, we make sure that any shuffles feeding that join happen on the CPU side.
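To illustrate the mismatch described above, here is a minimal sketch. The names `cpuHash`, `gpuHash`, and `partitionedJoin` are hypothetical stand-ins invented for this example; they are not the hash functions Spark or the plugin actually use. The sketch only assumes that the two sides hash the same key differently and that the join matches rows partition by partition.

```scala
object PartitionMismatchSketch {
  // Hypothetical hash functions; NOT the real CPU or GPU implementations.
  def cpuHash(key: Int): Int = key
  def gpuHash(key: Int): Int = key * 31 + 7

  // Map a hash value to a partition id in [0, numPartitions).
  def partitionOf(hash: Int, numPartitions: Int): Int =
    java.lang.Math.floorMod(hash, numPartitions)

  // Partition-local inner join on distinct keys: a key only matches if both
  // sides placed it in the same partition.
  def partitionedJoin(
      left: Seq[Int],
      right: Seq[Int],
      leftHash: Int => Int,
      rightHash: Int => Int,
      numPartitions: Int): Seq[Int] = {
    val l = left.groupBy(k => partitionOf(leftHash(k), numPartitions))
    val r = right.groupBy(k => partitionOf(rightHash(k), numPartitions))
    (0 until numPartitions).flatMap { p =>
      val rightKeys = r.getOrElse(p, Seq.empty[Int]).toSet
      l.getOrElse(p, Seq.empty[Int]).filter(k => rightKeys.contains(k))
    }.sorted
  }
}
```

With both sides using `cpuHash`, every key finds its match. With one side on `cpuHash` and the other on `gpuHash`, keys land in different partitions and the join silently returns fewer rows, which is the symptom reported in #780.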

tgravescs and others added 3 commits September 16, 2020 16:49
Signed-off-by: Thomas Graves <tgraves@nvidia.com>
Signed-off-by: Thomas Graves <tgraves@nvidia.com>
@tgravescs
Collaborator Author

build

@tgravescs tgravescs self-assigned this Sep 16, 2020
@tgravescs tgravescs added the bug Something isn't working label Sep 16, 2020
@tgravescs tgravescs added this to the Sep 14 - Sep 25 milestone Sep 16, 2020
@tgravescs
Collaborator Author

I am going to file an issue to add some more bucketing tests. We should have an AQE one, for example, and probably tests for some other datasources.

false :: Nil
}
case _ =>
childPlans.flatMap(_.findBucketedReads())
Contributor

This code won't recurse down into query stages that have already executed (because query stages are leaf nodes), but I think that's ok since we would already have tagged the initial plan.

Collaborator

The goal is to find the next closest shuffle or read. If we encounter a shuffle before we hit a read, that is fine, because the other code path should handle it. We only need to worry about an input in the same stage.
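The traversal discussed in this thread can be sketched roughly as follows. This is an illustrative model only: `PlanNode`, `Scan`, `Shuffle`, and `Other` are hypothetical types made up for the example and do not correspond to the plugin's real classes. The sketch assumes that a shuffle ends the search on that branch, since it repartitions the data and is handled by a separate code path.

```scala
object BucketedReadSketch {
  // Hypothetical, simplified plan model; not the plugin's actual node types.
  sealed trait PlanNode
  final case class Scan(bucketed: Boolean) extends PlanNode
  final case class Shuffle(child: PlanNode) extends PlanNode
  final case class Other(children: Seq[PlanNode]) extends PlanNode

  // Walk down from a join input until the nearest read or shuffle.
  def findBucketedReads(plan: PlanNode): Seq[Boolean] = plan match {
    // A read: report whether it is bucketed.
    case Scan(bucketed) => bucketed :: Nil
    // A shuffle sits between us and any read below it, so stop here.
    case Shuffle(_) => false :: Nil
    // Anything else: keep searching the children in the same stage.
    case Other(children) => children.flatMap(findBucketedReads)
  }
}
```

A join would then be treated as having a bucketed input if any element of the result is `true`.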

// then we need to make sure that all of them run on the CPU instead
if (bucketedReads || !shuffleExchanges.forall(canThisBeReplaced)) {
val errMsg = if (bucketedReads) {
"can't support shuffle on the GPU with bucketed reads!"
Collaborator

nit: It might be good to explain a little more that this is related to a join. But this is really just a nit.

Collaborator Author

Updated
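As a rough sketch of the decision in the snippet under review: if the join has a bucketed read, or if any of its shuffles cannot be replaced, all of the shuffles fall back to the CPU. `ShuffleInfo` and `cpuFallbackReason` are hypothetical names for this example, and the second message is invented for illustration; only the condition and the bucketed-reads message come from the diff above.

```scala
object ShufflePlacementSketch {
  // Hypothetical summary of one shuffle feeding the join.
  final case class ShuffleInfo(name: String, canBeReplaced: Boolean)

  // Returns Some(reason) when every shuffle must run on the CPU,
  // or None when all of them may run on the GPU.
  def cpuFallbackReason(
      bucketedReads: Boolean,
      shuffles: Seq[ShuffleInfo]): Option[String] = {
    if (bucketedReads || !shuffles.forall(_.canBeReplaced)) {
      val errMsg = if (bucketedReads) {
        "can't support shuffle on the GPU with bucketed reads!"
      } else {
        // Illustrative wording, not taken from the source.
        "other shuffles for this join cannot run on the GPU"
      }
      Some(errMsg)
    } else {
      None
    }
  }
}
```

Treating the shuffles as a group matters here: leaving even one side on a different hash implementation would reintroduce the partition mismatch.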

revans2
revans2 previously approved these changes Sep 16, 2020
Signed-off-by: Thomas Graves <tgraves@nvidia.com>
@tgravescs
Collaborator Author

build

@pxLi
Collaborator

pxLi commented Sep 17, 2020

This one is targeting branch-0.3, should this be added to 0.2 too?

@sameerz
Collaborator

sameerz commented Sep 17, 2020

@tgravescs should this be targeted to branch-0.2?

@tgravescs
Collaborator Author

oops it should be against 0.2

@tgravescs tgravescs changed the base branch from branch-0.3 to branch-0.2 September 17, 2020 02:59
@tgravescs
Collaborator Author

build

@revans2 revans2 merged commit 6fa0dc1 into NVIDIA:branch-0.2 Sep 17, 2020
nartal1 pushed a commit to nartal1/spark-rapids that referenced this pull request Jun 9, 2021
…able (NVIDIA#785)

* Make shuffle run on CPU if we do a join where we read from bucketed table

Signed-off-by: Thomas Graves <tgraves@nvidia.com>
tgravescs pushed a commit to tgravescs/spark-rapids that referenced this pull request Nov 30, 2023
…IDIA#785)

Signed-off-by: spark-rapids automation <70000568+nvauto@users.noreply.github.com>
Labels
bug Something isn't working
Development

Successfully merging this pull request may close these issues.

[BUG] Inner Join dropping data with bucketed Table input
5 participants