Make shuffle run on CPU if we do a join where we read from bucketed table #785
Conversation
Signed-off-by: Thomas Graves <tgraves@nvidia.com>
build
I am going to file an issue to add some more bucketing tests. We should have an AQE one, for example, and probably some other datasources.
    false :: Nil
  }
case _ =>
  childPlans.flatMap(_.findBucketedReads())
This code won't recurse down into query stages that have already executed (because query stages are leaf nodes), but I think that's ok since we would already have tagged the initial plan.
The goal is to find the next closest shuffle or read. If we encounter a shuffle before we hit a read, that's fine, because the other code path should handle it. We only need to worry about an input in the same stage.
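The traversal described above can be sketched as follows. This is a hypothetical Python illustration, not the actual Scala implementation; the `PlanNode` class and node kinds are invented stand-ins for Spark's plan nodes. The key idea is that the walk stops at shuffle boundaries, since anything below a shuffle belongs to a different stage and is handled by the other code path.

```python
# Illustrative sketch of "find the nearest bucketed read without crossing a
# shuffle". PlanNode and the string kinds are hypothetical stand-ins for
# Spark's SparkPlan nodes.

class PlanNode:
    def __init__(self, kind, children=(), bucketed=False):
        self.kind = kind              # e.g. "scan", "shuffle", "join", "filter"
        self.children = list(children)
        self.bucketed = bucketed      # only meaningful for scans

def find_bucketed_reads(node):
    """True if a bucketed read is reachable without crossing a shuffle."""
    if node.kind == "scan":
        return node.bucketed
    if node.kind == "shuffle":
        # A shuffle repartitions the data, so anything below it is in a
        # different stage; the other code path handles that case.
        return False
    return any(find_bucketed_reads(c) for c in node.children)

# Example: a join whose left side is a bucketed scan in the same stage, and
# whose right side reads through a shuffle.
left = PlanNode("scan", bucketed=True)
right = PlanNode("shuffle", [PlanNode("scan", bucketed=False)])
join = PlanNode("join", [left, right])
print(find_bucketed_reads(join))  # True: the bucketed scan is in this stage
```

Note that a bucketed scan hidden below a shuffle would return `False` here, which matches the comment above: once a shuffle sits between the join and the read, the partitioning comes from the shuffle, not the bucketing.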
// then we need to make sure that all of them run on the CPU instead
if (bucketedReads || !shuffleExchanges.forall(canThisBeReplaced)) {
  val errMsg = if (bucketedReads) {
    "can't support shuffle on the GPU with bucketed reads!"
nit: It might be good to explain a little more that this is related to a join. But this is really just a nit.
Updated
Signed-off-by: Thomas Graves <tgraves@nvidia.com>
build
This one is targeting branch-0.3, should this be added to 0.2 too?
@tgravescs should this be targeted to branch-0.2?
oops it should be against 0.2
build
…able (NVIDIA#785) * Make shuffle run on CPU if we do a join where we read from bucketed table Signed-off-by: Thomas Graves <tgraves@nvidia.com>
…IDIA#785) Signed-off-by: spark-rapids automation <70000568+nvauto@users.noreply.github.com>
closes #780
The issue is that when a join reads from two datasources and one of them is bucketed, the output partitioning does not match: the bucketed side is hashed on the CPU, while the other side, if it has a shuffle, is hashed and partitioned on the GPU. The two hash functions differ, so matching keys do not land in the same partition and we drop data.
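The mismatch can be shown with a toy simulation. This is illustrative only: the two hash functions below are arbitrary stand-ins for the CPU-side bucketing hash and the GPU-side shuffle hash, not the hashes Spark or the plugin actually use. The point is just that two different hash functions route the same key to different partitions, so matching rows never meet in the join.

```python
# Illustrative only: simulate why mixing two different hash partitioners
# silently drops rows from a co-partitioned join. crc32 and plain modulo
# are arbitrary stand-ins for the CPU bucketing hash and the GPU shuffle
# hash; the real hashes differ, which is the bug being fixed.
import zlib

NUM_PARTITIONS = 4

def cpu_hash(key):
    # stand-in for the CPU-side bucketing hash
    return zlib.crc32(str(key).encode()) % NUM_PARTITIONS

def gpu_hash(key):
    # stand-in for the GPU-side shuffle hash
    return key % NUM_PARTITIONS

keys = list(range(100))
# A row with key k on the bucketed side lands in partition cpu_hash(k);
# its match on the shuffled side lands in gpu_hash(k). If those differ,
# the two rows are never in the same partition and the join drops them.
mismatched = [k for k in keys if cpu_hash(k) != gpu_hash(k)]
print(f"{len(mismatched)} of {len(keys)} keys land in different partitions")
```

Forcing both sides through the same (CPU) hash restores the invariant that equal keys share a partition, which is exactly what this PR does for joins over bucketed reads.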
The fix is that if we find any joins that read from a bucketed table, we make sure any shuffles feeding that join happen on the CPU side.