
[SPARK-7375] [SQL] Avoid row copying in exchange when sort.serializeMapOutputs takes effect #5948

Closed
wants to merge 4 commits

Conversation

JoshRosen
Contributor

This patch refactors the SQL Exchange operator's logic for determining whether map outputs need to be copied before being shuffled. As part of this change, we'll now avoid unnecessary copies in cases where sort-based shuffle operates on serialized map outputs (as in #4450 /
SPARK-4550).

This patch also includes a change to copy the input to RangePartitioner partition bounds calculation, which is necessary because this calculation buffers mutable Java objects.
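The copy decision this patch refactors can be summarized as a predicate over the shuffle configuration. Below is a minimal sketch in plain Scala; the parameter names and structure are illustrative simplifications, not the signature of the actual `needToCopyObjectsBeforeShuffle` helper in `Exchange`:

```scala
// Hedged sketch of the Exchange copy decision, reduced to a pure predicate.
// Names are illustrative, not Spark's exact implementation.
def needToCopyBeforeShuffle(
    sortBasedShuffle: Boolean,
    serializerSupportsRelocation: Boolean,
    serializeMapOutputsEnabled: Boolean,
    numPartitions: Int,
    bypassMergeThreshold: Int): Boolean = {
  if (!sortBasedShuffle) {
    // Hash-based shuffle writes each row straight to its partition writer,
    // so a reused mutable row is never buffered: no copy needed.
    false
  } else if (numPartitions <= bypassMergeThreshold) {
    // Sort-based shuffle bypasses its merge-sort for small partition
    // counts and behaves like hash shuffle: no copy needed.
    false
  } else if (serializeMapOutputsEnabled && serializerSupportsRelocation) {
    // Serialized map outputs: rows are serialized as soon as they are
    // inserted into the sorter, so no mutable-object references are held.
    false
  } else {
    // The sorter buffers deserialized Java objects, so reused mutable
    // rows must be defensively copied before insertion.
    true
  }
}
```

The middle branch is the case this PR adds: previously the serialized-sort path was treated like the deserialized one and paid for copies it did not need.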


@JoshRosen
Contributor Author

/cc @yhuai @marmbrus @liancheng @sryza. This patch is large only because of the extensive inline comments to explain the logic.

@AmplabJenkins

Merged build triggered.

@AmplabJenkins

Merged build started.

shuffled.map(_._1)

case SinglePartition =>
// SPARK-4479: Can't turn off defensive copy as what we do for `HashPartitioning`, since
Contributor Author

@liancheng, I see that you added this comment but I'm a bit confused by it. It doesn't look like orderings are used anywhere in the body of this case. Since the old code actually did disable defensive copying in one of these branches, I suspect that this comment was incorrect or out-of-date.

Contributor

To be honest, I'm also confused by this comment now. In particular, TakeOrdered.execute() already makes a defensive copy... Removing this LGTM.

@SparkQA

SparkQA commented May 6, 2015

Test build #32019 has started for PR 5948 at commit ad006a4.

val rdd = if (willMergeSort || newOrdering.nonEmpty) {
val keySchema = expressions.map(_.dataType).toArray
val valueSchema = child.output.map(_.dataType).toArray
val serializer = getSerializer(keySchema, valueSchema, numPartitions)
Contributor Author

I moved the serializer creation calls a bit earlier here since we technically need to know whether the serializer supports object relocation in order to figure out whether spark.shuffle.sort.serializeMapOutputs will actually take effect. In theory, we might be able to skip this check because SQL only uses SqlSerializer and SqlSerializer2, both of which support relocation. However, SqlSerializer extends KryoSerializer, and there's a rare corner case where user-provided Kryo registrators can change Kryo settings in a way that breaks Kryo's relocatability, causing the serialized-shuffle optimization to be bypassed. As a result, I think it's safer to create the serializer earlier and perform the full safety check.
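The gating condition described above can be modelled as a flag on the serializer. The sketch below is illustrative only: `SerializerLike`, `KryoLikeSerializer`, and the method names are invented stand-ins, not Spark's actual serializer API.

```scala
// Hedged sketch: serialized map outputs are only safe when the serializer
// guarantees that serialized records can be reordered (relocated) within
// the output stream. These names are illustrative, not Spark's API.
trait SerializerLike {
  def supportsRelocationOfSerializedObjects: Boolean
}

// A Kryo-style serializer: a user-provided registrator can change settings
// (e.g. enable reference tracking) that make serialized bytes depend on
// stream position, which breaks relocation.
final case class KryoLikeSerializer(registratorBreaksRelocation: Boolean)
    extends SerializerLike {
  def supportsRelocationOfSerializedObjects: Boolean =
    !registratorBreaksRelocation
}

// The config flag alone is not enough; the serializer must also cooperate.
def serializedMapOutputsTakeEffect(
    serializer: SerializerLike,
    serializeMapOutputsEnabled: Boolean): Boolean =
  serializeMapOutputsEnabled && serializer.supportsRelocationOfSerializedObjects
```

This is why the serializer has to exist before the copy decision is made: the decision depends on a property of the concrete serializer instance, not just on configuration.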

@SparkQA

SparkQA commented May 6, 2015

Test build #32019 has finished for PR 5948 at commit ad006a4.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Merged build finished. Test FAILed.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/32019/
Test FAILed.

@JoshRosen
Contributor Author

This looks like a legitimate test failure:

[info] - SPARK-6927 sorting with codegen on *** FAILED *** (1 second, 640 milliseconds)
[info]   Results do not match for query:
[info]   == Parsed Logical Plan ==
[info]   'Sort ['a ASC], true
[info]    'Project ['b]
[info]     'UnresolvedRelation [binaryData], None
[info]   
[info]   == Analyzed Logical Plan ==
[info]   Project [b#11]
[info]    Sort [a#10 ASC], true
[info]     Project [b#11,a#10]
[info]      Subquery binaryData
[info]       LogicalRDD [a#10,b#11], MapPartitionsRDD[11] at apply at Transformer.scala:22
[info]   
[info]   == Optimized Logical Plan ==
[info]   Project [b#11]
[info]    Sort [a#10 ASC], true
[info]     Project [b#11,a#10]
[info]      LogicalRDD [a#10,b#11], MapPartitionsRDD[11] at apply at Transformer.scala:22
[info]   
[info]   == Physical Plan ==
[info]   Project [b#11]
[info]    Sort [a#10 ASC], true
[info]     Exchange (RangePartitioning [a#10 ASC], 10), []
[info]      Project [b#11,a#10]
[info]       PhysicalRDD [a#10,b#11], MapPartitionsRDD[11] at apply at Transformer.scala:22
[info]   
[info]   Code Generation: true
[info]   == RDD ==
[info]   == Results ==
[info]   !== Correct Answer - 5 ==   == Spark Answer - 5 ==
[info]    [1]                        [1]
[info]   ![2]                        [3]
[info]   ![3]                        [2]
[info]    [4]                        [4]
[info]    [5]                        [5] (QueryTest.scala:61)
[info]   org.scalatest.exceptions.TestFailedException:
[info]   at org.scalatest.Assertions$class.newAssertionFailedException(Assertions.scala:495)
[info]   at org.scalatest.FunSuite.newAssertionFailedException(FunSuite.scala:1555)
[info]   at org.scalatest.Assertions$class.fail(Assertions.scala:1328)
[info]   at org.scalatest.FunSuite.fail(FunSuite.scala:1555)
[info]   at org.apache.spark.sql.QueryTest.checkAnswer(QueryTest.scala:61)
[info]   at org.apache.spark.sql.SQLQuerySuite.sortTest(SQLQuerySuite.scala:403)
[info]   at org.apache.spark.sql.SQLQuerySuite$$anonfun$28.apply$mcV$sp(SQLQuerySuite.scala:447)
[info]   at org.apache.spark.sql.SQLQuerySuite$$anonfun$28.apply(SQLQuerySuite.scala:442)
[info]   at org.apache.spark.sql.SQLQuerySuite$$anonfun$28.apply(SQLQuerySuite.scala:442)
[info]   at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
[info]   at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
[info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:20)
[info]   at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:166)
[info]   at org.scalatest.Suite$class.withFixture(Suite.scala:1122)
[info]   at org.scalatest.FunSuite.withFixture(FunSuite.scala:1555)
[info]   at org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:163)
[info]   at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175)
[info]   at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175)
[info]   at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
[info]   at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:175)
[info]   at org.scalatest.FunSuite.runTest(FunSuite.scala:1555)
[info]   at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208)
[info]   at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208)
[info]   at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:413)
[info]   at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:401)
[info]   at scala.collection.immutable.List.foreach(List.scala:318)
[info]   at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
[info]   at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:396)
[info]   at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:483)
[info]   at org.scalatest.FunSuiteLike$class.runTests(FunSuiteLike.scala:208)
[info]   at org.scalatest.FunSuite.runTests(FunSuite.scala:1555)
[info]   at org.scalatest.Suite$class.run(Suite.scala:1424)
[info]   at org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1555)
[info]   at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:212)
[info]   at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:212)
[info]   at org.scalatest.SuperEngine.runImpl(Engine.scala:545)
[info]   at org.scalatest.FunSuiteLike$class.run(FunSuiteLike.scala:212)
[info]   at org.apache.spark.sql.SQLQuerySuite.org$scalatest$BeforeAndAfterAll$$super$run(SQLQuerySuite.scala:34)
[info]   at org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:257)
[info]   at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:256)
[info]   at org.apache.spark.sql.SQLQuerySuite.run(SQLQuerySuite.scala:34)
[info]   at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:462)
[info]   at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:671)
[info]   at sbt.ForkMain$Run$2.call(ForkMain.java:294)
[info]   at sbt.ForkMain$Run$2.call(ForkMain.java:284)
[info]   at java.util.concurrent.FutureTask.run(FutureTask.java:262)
[info]   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
[info]   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
[info]   at java.lang.Thread.run(Thread.java:745)
[info] - SPARK-6927 external sorting with codegen on *** FAILED *** (842 milliseconds)
[info]   Results do not match for query:
[info]   == Parsed Logical Plan ==
[info]   'Sort ['a ASC], true
[info]    'Project ['b]
[info]     'UnresolvedRelation [binaryData], None
[info]   
[info]   == Analyzed Logical Plan ==
[info]   Project [b#11]
[info]    Sort [a#10 ASC], true
[info]     Project [b#11,a#10]
[info]      Subquery binaryData
[info]       LogicalRDD [a#10,b#11], MapPartitionsRDD[11] at apply at Transformer.scala:22
[info]   
[info]   == Optimized Logical Plan ==
[info]   Project [b#11]
[info]    Sort [a#10 ASC], true
[info]     Project [b#11,a#10]
[info]      LogicalRDD [a#10,b#11], MapPartitionsRDD[11] at apply at Transformer.scala:22
[info]   
[info]   == Physical Plan ==
[info]   Project [b#11]
[info]    ExternalSort [a#10 ASC], true
[info]     Exchange (RangePartitioning [a#10 ASC], 10), []
[info]      Project [b#11,a#10]
[info]       PhysicalRDD [a#10,b#11], MapPartitionsRDD[11] at apply at Transformer.scala:22
[info]   
[info]   Code Generation: true
[info]   == RDD ==
[info]   == Results ==
[info]   !== Correct Answer - 5 ==   == Spark Answer - 5 ==
[info]    [1]                        [1]
[info]   ![2]                        [3]
[info]   ![3]                        [2]
[info]    [4]                        [4]
[info]    [5]                        [5] (QueryTest.scala:61)
[info]   org.scalatest.exceptions.TestFailedException:
[info]   at org.scalatest.Assertions$class.newAssertionFailedException(Assertions.scala:495)
[info]   at org.scalatest.FunSuite.newAssertionFailedException(FunSuite.scala:1555)
[info]   at org.scalatest.Assertions$class.fail(Assertions.scala:1328)
[info]   at org.scalatest.FunSuite.fail(FunSuite.scala:1555)
[info]   at org.apache.spark.sql.QueryTest.checkAnswer(QueryTest.scala:61)
[info]   at org.apache.spark.sql.SQLQuerySuite.sortTest(SQLQuerySuite.scala:403)
[info]   at org.apache.spark.sql.SQLQuerySuite$$anonfun$29.apply$mcV$sp(SQLQuerySuite.scala:457)
[info]   at org.apache.spark.sql.SQLQuerySuite$$anonfun$29.apply(SQLQuerySuite.scala:452)
[info]   at org.apache.spark.sql.SQLQuerySuite$$anonfun$29.apply(SQLQuerySuite.scala:452)
[info]   at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
[info]   at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
[info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:20)
[info]   at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:166)
[info]   at org.scalatest.Suite$class.withFixture(Suite.scala:1122)
[info]   at org.scalatest.FunSuite.withFixture(FunSuite.scala:1555)
[info]   at org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:163)
[info]   at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175)
[info]   at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175)
[info]   at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
[info]   at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:175)
[info]   at org.scalatest.FunSuite.runTest(FunSuite.scala:1555)
[info]   at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208)
[info]   at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208)
[info]   at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:413)
[info]   at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:401)
[info]   at scala.collection.immutable.List.foreach(List.scala:318)
[info]   at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
[info]   at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:396)
[info]   at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:483)
[info]   at org.scalatest.FunSuiteLike$class.runTests(FunSuiteLike.scala:208)
[info]   at org.scalatest.FunSuite.runTests(FunSuite.scala:1555)
[info]   at org.scalatest.Suite$class.run(Suite.scala:1424)
[info]   at org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1555)
[info]   at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:212)
[info]   at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:212)
[info]   at org.scalatest.SuperEngine.runImpl(Engine.scala:545)
[info]   at org.scalatest.FunSuiteLike$class.run(FunSuiteLike.scala:212)
[info]   at org.apache.spark.sql.SQLQuerySuite.org$scalatest$BeforeAndAfterAll$$super$run(SQLQuerySuite.scala:34)
[info]   at org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:257)
[info]   at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:256)
[info]   at org.apache.spark.sql.SQLQuerySuite.run(SQLQuerySuite.scala:34)
[info]   at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:462)
[info]   at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:671)
[info]   at sbt.ForkMain$Run$2.call(ForkMain.java:294)
[info]   at sbt.ForkMain$Run$2.call(ForkMain.java:284)
[info]   at java.util.concurrent.FutureTask.run(FutureTask.java:262)
[info]   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
[info]   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
[info]   at java.lang.Thread.run(Thread.java:745)

@JoshRosen
Contributor Author

I've spotted the problem: a RangePartitioner constructed with numPartitions may end up producing fewer than numPartitions partitions once its bounds are computed from a sample.

- We now defensively copy before computing the partition bounds, which is
  necessary in order to get accurate sampling.
- We now pass the actual partitioner into needToCopyObjectsBeforeShuffle(),
  which guards against the fact that RangePartitioner may produce a shuffle
  with fewer than `numPartitions` partitions.
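The failure mode behind the second bullet can be reproduced with a toy bounds computation: when the sampled keys contain duplicates, the number of distinct boundaries (and hence the actual partition count) falls short of the requested count. This is a deliberately simplified model; the real RangePartitioner uses weighted reservoir sampling, but shares the property that duplicate keys shrink the bounds array.

```scala
// Hedged toy model of range-bounds computation: bounds are the distinct
// sampled keys, capped at numPartitions - 1. Illustrative only.
def toyRangeBounds(sample: Seq[Int], numPartitions: Int): Seq[Int] =
  sample.distinct.sorted.take(numPartitions - 1)

// Requesting 10 partitions over a skewed sample with only 2 distinct keys
// yields 2 bounds, i.e. an actual partition count of 3, not 10.
val bounds = toyRangeBounds(Seq(5, 5, 5, 9), numPartitions = 10)
val actualPartitions = bounds.length + 1
```

This is why `needToCopyObjectsBeforeShuffle()` must be given the constructed partitioner rather than the requested `numPartitions`: the bypass-merge decision depends on the partition count the shuffle will actually see.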
@AmplabJenkins

Merged build triggered.

@AmplabJenkins

Merged build started.

@SparkQA

SparkQA commented May 6, 2015

Test build #32033 has started for PR 5948 at commit 6a6bfce.

// partition bounds. To get accurate samples, we need to copy the mutable keys.
val rddForSampling = childRdd.mapPartitions { iter =>
val mutablePair = new MutablePair[Row, Null]()
iter.map(row => mutablePair.update(row.copy(), null))
Contributor Author

In order to know how many partitions will be shuffled, we need to create the RangePartitioner and compute its partition bounds. Depending on the results of the sampling, we may end up with a partitioner that produces fewer than numPartitions partitions. We therefore need the partitioner's actual number of partitions in order to determine whether sort-based shuffle will bypass its merge sort and fall back to hash-style shuffle (in which case we can avoid a copy).

In the old code, the input to the range partitioner was rdd, so the input to the range bounds calculation may or may not have been defensively copied depending on which branch produced rdd. As a result, the bounds actually computed could vary depending on which shuffle mode was in use. To address this, I've updated this code to always perform a defensive copy of the partitioner's input. This also allows us to determine the actual partition count before computing rdd, which lets us benefit from the copy-bypass optimizations.
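The corruption mode that the defensive copy guards against can be demonstrated with a toy reused-row holder. `MutableRowLike` below is an invented stand-in for a reused mutable row (the role `MutablePair` plays above); everything else is plain Scala.

```scala
// Hedged sketch of why the sampling input must be copied. MutableRowLike
// stands in for a reused mutable row; the names are illustrative.
final class MutableRowLike(var value: Int) {
  def copyRow: MutableRowLike = new MutableRowLike(value)
}

// Buffers references to a single reused holder: every buffered element
// ends up observing only the last value written, corrupting the sample.
def bufferWithoutCopy(data: Seq[Int]): Seq[Int] = {
  val holder = new MutableRowLike(0)
  val buffered = data.map { v => holder.value = v; holder }.toList
  buffered.map(_.value)
}

// Defensively copies each row before buffering, preserving every value.
def bufferWithCopy(data: Seq[Int]): Seq[Int] = {
  val holder = new MutableRowLike(0)
  val buffered = data.map { v => holder.value = v; holder.copyRow }.toList
  buffered.map(_.value)
}
```

Since the range-bounds calculation buffers its input exactly like `bufferWithoutCopy` does, sampling over uncopied mutable rows would compute bounds from a sample in which most entries alias the most recently seen row.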

@SparkQA

SparkQA commented May 6, 2015

Test build #32033 has finished for PR 5948 at commit 6a6bfce.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class JoinedRow6 extends Row
    • case class WindowSpecDefinition(
    • case class WindowSpecReference(name: String) extends WindowSpec
    • sealed trait FrameBoundary
    • case class ValuePreceding(value: Int) extends FrameBoundary
    • case class ValueFollowing(value: Int) extends FrameBoundary
    • case class SpecifiedWindowFrame(
    • trait WindowFunction extends Expression
    • case class UnresolvedWindowFunction(
    • case class UnresolvedWindowExpression(
    • case class WindowExpression(
    • case class WithWindowDefinition(
    • case class Window(
    • case class Window(
    • case class ComputedWindow(

@AmplabJenkins

Merged build finished. Test PASSed.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/32033/
Test PASSed.

@marmbrus
Contributor

marmbrus commented May 7, 2015

LGTM

@JoshRosen
Contributor Author

I think that we should merge #5849 ahead of this, since that patch will remove some of the uses of sortBasedShuffleOn and bypassMergeThreshold that are outside of needToCopyObjectsBeforeShuffle, allowing me to reduce the scope of those variables.

Conflicts:
	sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
@JoshRosen
Contributor Author

I've updated this now that #5849 has been merged. Reviewable makes it easy to see the most recent batch of changes after the merge + update: https://reviewable.io/reviews/apache/spark/5948

@AmplabJenkins

Merged build triggered.

@AmplabJenkins

Merged build started.

@SparkQA

SparkQA commented May 8, 2015

Test build #32251 has started for PR 5948 at commit f305ff3.

@SparkQA

SparkQA commented May 8, 2015

Test build #32251 has finished for PR 5948 at commit f305ff3.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • public class EnumUtil

@AmplabJenkins

Merged build finished. Test FAILed.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/32251/
Test FAILed.

@JoshRosen
Contributor Author

Jenkins, retest this please.

@AmplabJenkins

Merged build triggered.

@AmplabJenkins

Merged build started.

@SparkQA

SparkQA commented May 8, 2015

Test build #32253 has started for PR 5948 at commit f305ff3.

@SparkQA

SparkQA commented May 8, 2015

Test build #32253 has finished for PR 5948 at commit f305ff3.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Merged build finished. Test FAILed.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/32253/
Test FAILed.

@JoshRosen
Contributor Author

I think that these might be spurious MiMa failures because the script doesn't seem to be reporting any error messages / binary incompatibilities. Perhaps there's a problem with the output redirection from the dev/mima script? I'm going to try running these MiMa checks locally in order to debug.

@SparkQA

SparkQA commented May 8, 2015

Test build #787 has started for PR 5948 at commit f305ff3.

@JoshRosen
Contributor Author

Looks like a problem with the build infra rather than a legitimate MiMa failure since the tests pass locally and NewSparkPullRequestBuilder was able to get past the MiMa phase. Let's hope that this test run works...

@SparkQA

SparkQA commented May 8, 2015

Test build #787 has finished for PR 5948 at commit f305ff3.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • public class EnumUtil
    • class ElementwiseProduct extends UnaryTransformer[Vector, Vector, ElementwiseProduct]
    • class ElementwiseProduct(val scalingVector: Vector) extends VectorTransformer
    • trait Star extends NamedExpression with trees.LeafNode[Expression]
    • trait CaseWhenLike extends Expression
    • case class CaseWhen(branches: Seq[Expression]) extends CaseWhenLike
    • case class CaseKeyWhen(key: Expression, branches: Seq[Expression]) extends CaseWhenLike
    • case class CreateTableAsSelect(

@JoshRosen
Contributor Author

Looks like a flaky PySpark streaming test (/cc @tdas):

======================================================================
FAIL: test_count_by_value_and_window (__main__.WindowFunctionTests)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "pyspark/streaming/tests.py", line 418, in test_count_by_value_and_window
    self._test_func(input, func, expected)
  File "pyspark/streaming/tests.py", line 133, in _test_func
    self.assertEqual(expected, result)
AssertionError: Lists differ: [[1], [2], [3], [4], [5], [6], [6], [6], [6], [6]] != [[1], [2]]

First list contains 8 additional elements.
First extra element 2:
[3]

- [[1], [2], [3], [4], [5], [6], [6], [6], [6], [6]]
+ [[1], [2]]

@SparkQA

SparkQA commented May 8, 2015

Test build #790 has started for PR 5948 at commit f305ff3.

@SparkQA

SparkQA commented May 8, 2015

Test build #790 has finished for PR 5948 at commit f305ff3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@JoshRosen
Contributor Author

Ping @marmbrus @yhuai, I think this should be good to go now that it's passing tests, although you might want to quickly double-check the latest changes (easy w/ Reviewable).

@yhuai
Contributor

yhuai commented May 9, 2015

LGTM

@yhuai
Contributor

yhuai commented May 9, 2015

I am merging it to master and branch 1.4.

asfgit pushed a commit that referenced this pull request May 9, 2015
…apOutputs takes effect

This patch refactors the SQL `Exchange` operator's logic for determining whether map outputs need to be copied before being shuffled. As part of this change, we'll now avoid unnecessary copies in cases where sort-based shuffle operates on serialized map outputs (as in #4450 /
SPARK-4550).

This patch also includes a change to copy the input to RangePartitioner partition bounds calculation, which is necessary because this calculation buffers mutable Java objects.


Author: Josh Rosen <joshrosen@databricks.com>

Closes #5948 from JoshRosen/SPARK-7375 and squashes the following commits:

f305ff3 [Josh Rosen] Reduce scope of some variables in Exchange
899e1d7 [Josh Rosen] Merge remote-tracking branch 'origin/master' into SPARK-7375
6a6bfce [Josh Rosen] Fix issue related to RangePartitioning:
ad006a4 [Josh Rosen] [SPARK-7375] Avoid defensive copying in exchange operator when sort.serializeMapOutputs takes effect.

(cherry picked from commit cde5483)
Signed-off-by: Yin Huai <yhuai@databricks.com>
@asfgit asfgit closed this in cde5483 May 9, 2015
@JoshRosen JoshRosen deleted the SPARK-7375 branch May 9, 2015 14:33
jeanlyn pushed a commit to jeanlyn/spark that referenced this pull request May 28, 2015
jeanlyn pushed a commit to jeanlyn/spark that referenced this pull request Jun 12, 2015
nemccarthy pushed a commit to nemccarthy/spark that referenced this pull request Jun 19, 2015