-
Notifications
You must be signed in to change notification settings - Fork 28.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-7375] [SQL] Avoid row copying in exchange when sort.serializeMapOutputs takes effect #5948
Conversation
…erializeMapOutputs takes effect.
/cc @yhuai @marmbrus @liancheng @sryza. This patch is large only because of the extensive inline comments to explain the logic. |
Merged build triggered. |
Merged build started. |
shuffled.map(_._1) | ||
|
||
case SinglePartition => | ||
// SPARK-4479: Can't turn off defensive copy as what we do for `HashPartitioning`, since |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@liancheng, I see that you added this comment but I'm a bit confused by it. It doesn't look like orderings are used anywhere in the body of this case
. Since the old code actually did disable defensive copying in one of these branches, I suspect that this comment was incorrect or out-of-date.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To be honest, I'm also confused by this comment now. Especially, TakeOrdered.execute()
already made a defensive copy... Removing this LGTM.
Test build #32019 has started for PR 5948 at commit |
val rdd = if (willMergeSort || newOrdering.nonEmpty) { | ||
val keySchema = expressions.map(_.dataType).toArray | ||
val valueSchema = child.output.map(_.dataType).toArray | ||
val serializer = getSerializer(keySchema, valueSchema, numPartitions) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I moved the serializer creation calls a bit earlier here since we technically need to know whether the serializer supports object relocation in order to figure out whether spark.shuffle.sort.serializeMapOutputs
will actually take effect. In theory, we might be able to avoid this check because SQL only uses SqlSerializer and SqlSerializer2, both of which support relocation. However, SqlSerializer extends KryoSerializer and there's a rare corner-case where user-provided Kryo registrators can change Kryo settings in a way that break's Kryo's relocatibility, causing the serialization to be bypassed. As a result, I think it's safer to create the serializer earlier and perform the full safety check.
Test build #32019 has finished for PR 5948 at commit
|
Merged build finished. Test FAILed. |
Test FAILed. |
This looks like a legitimate test failure:
|
I've spotted the problem: a |
- We now defensively copy before computing the partition bounds, which is necessary in order to get accurate sampling. - We now pass the actual partitioner into needToCopyObjectsBeforeShuffle(), which guards against the fact that RangePartitioner may produce a shuffle with fewer than `numPartitions` partitions.
Merged build triggered. |
Merged build started. |
Test build #32033 has started for PR 5948 at commit |
// partition bounds. To get accurate samples, we need to copy the mutable keys. | ||
val rddForSampling = childRdd.mapPartitions { iter => | ||
val mutablePair = new MutablePair[Row, Null]() | ||
iter.map(row => mutablePair.update(row.copy(), null)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In order to know how many partitions will be shuffled, we need to create the RangePartitioner and compute its partition bounds. Depending on the results of the sampling, we may end up with a partitioner that produces fewer than numPartitions
partitions, so we need to know the partitioner's actual number of partitions in order to determine whether sort-based shuffle will bypass its merge sort and fall back to hash-shuffle (in which case we can avoid a copy).
In the old code, the input to the range partitioner was rdd
, so the input to the range bounds calculation may or may not have been defensively copied. As a result, I suspect that the actual bounds computed could vary depending on which shuffle mode was being used, which might result in extra work being done to compute accurate partition counts. To address this, I've updated this code to always perform a defensive copy of the partitioner input. This allows us to determine the actual partition count before computing rdd
, which lets us benefit from the copy bypass optimizations.
Test build #32033 has finished for PR 5948 at commit
|
Merged build finished. Test PASSed. |
Test PASSed. |
LGTM |
I think that we should merge #5849 ahead of this, since that patch will remove some of the uses of |
Conflicts: sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
I've updated this now that #5849 has been merged. Reviewable makes it easy to see the most recent batch of changes after the merge + update: https://reviewable.io/reviews/apache/spark/5948 |
Merged build triggered. |
Merged build started. |
Test build #32251 has started for PR 5948 at commit |
Test build #32251 has finished for PR 5948 at commit
|
Merged build finished. Test FAILed. |
Test FAILed. |
Jenkins, retest this please. |
Merged build triggered. |
Merged build started. |
Test build #32253 has started for PR 5948 at commit |
Test build #32253 has finished for PR 5948 at commit
|
Merged build finished. Test FAILed. |
Test FAILed. |
I think that these might be spurious MiMa failures because the script doesn't seem to be reporting any error messages / binary incompatibilities. Perhaps there's a problem with the output redirection from the |
Test build #787 has started for PR 5948 at commit |
Looks like a problem with the build infra rather than a legitimate MiMa failure since the tests pass locally and NewSparkPullRequestBuilder was able to get past the MiMa phase. Let's hope that this test run works... |
Test build #787 has finished for PR 5948 at commit
|
Looks like a flaky PySpark streaming test (/cc @tdas):
|
Test build #790 has started for PR 5948 at commit |
Test build #790 has finished for PR 5948 at commit
|
LGTM |
I am merging it to master and branch 1.4. |
…apOutputs takes effect This patch refactors the SQL `Exchange` operator's logic for determining whether map outputs need to be copied before being shuffled. As part of this change, we'll now avoid unnecessary copies in cases where sort-based shuffle operates on serialized map outputs (as in #4450 / SPARK-4550). This patch also includes a change to copy the input to RangePartitioner partition bounds calculation, which is necessary because this calculation buffers mutable Java objects. <!-- Reviewable:start --> [<img src="https://reviewable.io/review_button.png" height=40 alt="Review on Reviewable"/>](https://reviewable.io/reviews/apache/spark/5948) <!-- Reviewable:end --> Author: Josh Rosen <joshrosen@databricks.com> Closes #5948 from JoshRosen/SPARK-7375 and squashes the following commits: f305ff3 [Josh Rosen] Reduce scope of some variables in Exchange 899e1d7 [Josh Rosen] Merge remote-tracking branch 'origin/master' into SPARK-7375 6a6bfce [Josh Rosen] Fix issue related to RangePartitioning: ad006a4 [Josh Rosen] [SPARK-7375] Avoid defensive copying in exchange operator when sort.serializeMapOutputs takes effect. (cherry picked from commit cde5483) Signed-off-by: Yin Huai <yhuai@databricks.com>
…apOutputs takes effect This patch refactors the SQL `Exchange` operator's logic for determining whether map outputs need to be copied before being shuffled. As part of this change, we'll now avoid unnecessary copies in cases where sort-based shuffle operates on serialized map outputs (as in apache#4450 / SPARK-4550). This patch also includes a change to copy the input to RangePartitioner partition bounds calculation, which is necessary because this calculation buffers mutable Java objects. <!-- Reviewable:start --> [<img src="https://reviewable.io/review_button.png" height=40 alt="Review on Reviewable"/>](https://reviewable.io/reviews/apache/spark/5948) <!-- Reviewable:end --> Author: Josh Rosen <joshrosen@databricks.com> Closes apache#5948 from JoshRosen/SPARK-7375 and squashes the following commits: f305ff3 [Josh Rosen] Reduce scope of some variables in Exchange 899e1d7 [Josh Rosen] Merge remote-tracking branch 'origin/master' into SPARK-7375 6a6bfce [Josh Rosen] Fix issue related to RangePartitioning: ad006a4 [Josh Rosen] [SPARK-7375] Avoid defensive copying in exchange operator when sort.serializeMapOutputs takes effect.
…apOutputs takes effect This patch refactors the SQL `Exchange` operator's logic for determining whether map outputs need to be copied before being shuffled. As part of this change, we'll now avoid unnecessary copies in cases where sort-based shuffle operates on serialized map outputs (as in apache#4450 / SPARK-4550). This patch also includes a change to copy the input to RangePartitioner partition bounds calculation, which is necessary because this calculation buffers mutable Java objects. <!-- Reviewable:start --> [<img src="https://reviewable.io/review_button.png" height=40 alt="Review on Reviewable"/>](https://reviewable.io/reviews/apache/spark/5948) <!-- Reviewable:end --> Author: Josh Rosen <joshrosen@databricks.com> Closes apache#5948 from JoshRosen/SPARK-7375 and squashes the following commits: f305ff3 [Josh Rosen] Reduce scope of some variables in Exchange 899e1d7 [Josh Rosen] Merge remote-tracking branch 'origin/master' into SPARK-7375 6a6bfce [Josh Rosen] Fix issue related to RangePartitioning: ad006a4 [Josh Rosen] [SPARK-7375] Avoid defensive copying in exchange operator when sort.serializeMapOutputs takes effect.
…apOutputs takes effect This patch refactors the SQL `Exchange` operator's logic for determining whether map outputs need to be copied before being shuffled. As part of this change, we'll now avoid unnecessary copies in cases where sort-based shuffle operates on serialized map outputs (as in apache#4450 / SPARK-4550). This patch also includes a change to copy the input to RangePartitioner partition bounds calculation, which is necessary because this calculation buffers mutable Java objects. <!-- Reviewable:start --> [<img src="https://reviewable.io/review_button.png" height=40 alt="Review on Reviewable"/>](https://reviewable.io/reviews/apache/spark/5948) <!-- Reviewable:end --> Author: Josh Rosen <joshrosen@databricks.com> Closes apache#5948 from JoshRosen/SPARK-7375 and squashes the following commits: f305ff3 [Josh Rosen] Reduce scope of some variables in Exchange 899e1d7 [Josh Rosen] Merge remote-tracking branch 'origin/master' into SPARK-7375 6a6bfce [Josh Rosen] Fix issue related to RangePartitioning: ad006a4 [Josh Rosen] [SPARK-7375] Avoid defensive copying in exchange operator when sort.serializeMapOutputs takes effect.
This patch refactors the SQL
Exchange
operator's logic for determining whether map outputs need to be copied before being shuffled. As part of this change, we'll now avoid unnecessary copies in cases where sort-based shuffle operates on serialized map outputs (as in #4450 /SPARK-4550).
This patch also includes a change to copy the input to RangePartitioner partition bounds calculation, which is necessary because this calculation buffers mutable Java objects.