-
Notifications
You must be signed in to change notification settings - Fork 28.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
SPARK-4550. In sort-based shuffle, store map outputs in serialized form #4450
Conversation
Test build #26998 has started for PR 4450 at commit
|
Test build #26998 has finished for PR 4450 at commit
|
Test FAILed. |
cb8c458
to
6d50599
Compare
Test build #26999 has started for PR 4450 at commit
|
Test build #26999 has finished for PR 4450 at commit
|
Test FAILed. |
6d50599
to
c88830b
Compare
Test build #27006 has started for PR 4450 at commit
|
Test build #27006 has finished for PR 4450 at commit
|
Test FAILed. |
Test build #27050 has started for PR 4450 at commit
|
Test build #27051 has started for PR 4450 at commit
|
Test build #27051 has finished for PR 4450 at commit
|
Test FAILed. |
503136a
to
6acfb4e
Compare
Test build #27052 has started for PR 4450 at commit
|
Test build #27050 has finished for PR 4450 at commit
|
Test FAILed. |
Test build #27054 has started for PR 4450 at commit
|
Test build #27052 has finished for PR 4450 at commit
|
Test FAILed. |
Test build #27054 has finished for PR 4450 at commit
|
Test FAILed. |
6c9c16d
to
675e767
Compare
Test build #27152 has started for PR 4450 at commit
|
Thanks - I'm pretty swamped right now. Will take a look once I free up more (maybe next week) |
Test build #27152 has finished for PR 4450 at commit
|
JoshRosen this PR addresses the comments you left on #4450 after it got merged. Author: Sandy Ryza <sandy@cloudera.com> Closes #5916 from sryza/sandy-spark-4550-cleanup and squashes the following commits: dee3d85 [Sandy Ryza] Some minor cleanup after SPARK-4550. (cherry picked from commit 0092abb) Signed-off-by: Josh Rosen <joshrosen@databricks.com>
JoshRosen this PR addresses the comments you left on #4450 after it got merged. Author: Sandy Ryza <sandy@cloudera.com> Closes #5916 from sryza/sandy-spark-4550-cleanup and squashes the following commits: dee3d85 [Sandy Ryza] Some minor cleanup after SPARK-4550.
…ializers support object relocation This patch extends the `Serializer` interface with a new `Private` API which allows serializers to indicate whether they support relocation of serialized objects in serializer stream output. This relocatibilty property is described in more detail in `Serializer.scala`, but in a nutshell a serializer supports relocation if reordering the bytes of serialized objects in serialization stream output is equivalent to having re-ordered those elements prior to serializing them. The optimized shuffle path introduced in #4450 and #5868 both rely on serializers having this property; this patch just centralizes the logic for determining whether a serializer has this property. I also added tests and comments clarifying when this works for KryoSerializer. This change allows the optimizations in #4450 to be applied for shuffles that use `SqlSerializer2`. Author: Josh Rosen <joshrosen@databricks.com> Closes #5924 from JoshRosen/SPARK-7311 and squashes the following commits: 50a68ca [Josh Rosen] Address minor nits 0a7ebd7 [Josh Rosen] Clarify reason why SqlSerializer2 supports this serializer 123b992 [Josh Rosen] Cleanup for submitting as standalone patch. 4aa61b2 [Josh Rosen] Add missing newline 2c1233a [Josh Rosen] Small refactoring of SerializerPropertiesSuite to enable test re-use: 0ba75e6 [Josh Rosen] Add tests for serializer relocation property. 450fa21 [Josh Rosen] Back out accidental log4j.properties change 86d4dcd [Josh Rosen] Flag that SparkSqlSerializer2 supports relocation b9624ee [Josh Rosen] Expand serializer API and use new function to help control when new UnsafeShuffle path is used. (cherry picked from commit 002c123) Signed-off-by: Josh Rosen <joshrosen@databricks.com>
…ializers support object relocation This patch extends the `Serializer` interface with a new `Private` API which allows serializers to indicate whether they support relocation of serialized objects in serializer stream output. This relocatibilty property is described in more detail in `Serializer.scala`, but in a nutshell a serializer supports relocation if reordering the bytes of serialized objects in serialization stream output is equivalent to having re-ordered those elements prior to serializing them. The optimized shuffle path introduced in #4450 and #5868 both rely on serializers having this property; this patch just centralizes the logic for determining whether a serializer has this property. I also added tests and comments clarifying when this works for KryoSerializer. This change allows the optimizations in #4450 to be applied for shuffles that use `SqlSerializer2`. Author: Josh Rosen <joshrosen@databricks.com> Closes #5924 from JoshRosen/SPARK-7311 and squashes the following commits: 50a68ca [Josh Rosen] Address minor nits 0a7ebd7 [Josh Rosen] Clarify reason why SqlSerializer2 supports this serializer 123b992 [Josh Rosen] Cleanup for submitting as standalone patch. 4aa61b2 [Josh Rosen] Add missing newline 2c1233a [Josh Rosen] Small refactoring of SerializerPropertiesSuite to enable test re-use: 0ba75e6 [Josh Rosen] Add tests for serializer relocation property. 450fa21 [Josh Rosen] Back out accidental log4j.properties change 86d4dcd [Josh Rosen] Flag that SparkSqlSerializer2 supports relocation b9624ee [Josh Rosen] Expand serializer API and use new function to help control when new UnsafeShuffle path is used.
…apOutputs takes effect This patch refactors the SQL `Exchange` operator's logic for determining whether map outputs need to be copied before being shuffled. As part of this change, we'll now avoid unnecessary copies in cases where sort-based shuffle operates on serialized map outputs (as in #4450 / SPARK-4550). This patch also includes a change to copy the input to RangePartitioner partition bounds calculation, which is necessary because this calculation buffers mutable Java objects. <!-- Reviewable:start --> [<img src="https://reviewable.io/review_button.png" height=40 alt="Review on Reviewable"/>](https://reviewable.io/reviews/apache/spark/5948) <!-- Reviewable:end --> Author: Josh Rosen <joshrosen@databricks.com> Closes #5948 from JoshRosen/SPARK-7375 and squashes the following commits: f305ff3 [Josh Rosen] Reduce scope of some variables in Exchange 899e1d7 [Josh Rosen] Merge remote-tracking branch 'origin/master' into SPARK-7375 6a6bfce [Josh Rosen] Fix issue related to RangePartitioning: ad006a4 [Josh Rosen] [SPARK-7375] Avoid defensive copying in exchange operator when sort.serializeMapOutputs takes effect. (cherry picked from commit cde5483) Signed-off-by: Yin Huai <yhuai@databricks.com>
…apOutputs takes effect This patch refactors the SQL `Exchange` operator's logic for determining whether map outputs need to be copied before being shuffled. As part of this change, we'll now avoid unnecessary copies in cases where sort-based shuffle operates on serialized map outputs (as in #4450 / SPARK-4550). This patch also includes a change to copy the input to RangePartitioner partition bounds calculation, which is necessary because this calculation buffers mutable Java objects. <!-- Reviewable:start --> [<img src="https://reviewable.io/review_button.png" height=40 alt="Review on Reviewable"/>](https://reviewable.io/reviews/apache/spark/5948) <!-- Reviewable:end --> Author: Josh Rosen <joshrosen@databricks.com> Closes #5948 from JoshRosen/SPARK-7375 and squashes the following commits: f305ff3 [Josh Rosen] Reduce scope of some variables in Exchange 899e1d7 [Josh Rosen] Merge remote-tracking branch 'origin/master' into SPARK-7375 6a6bfce [Josh Rosen] Fix issue related to RangePartitioning: ad006a4 [Josh Rosen] [SPARK-7375] Avoid defensive copying in exchange operator when sort.serializeMapOutputs takes effect.
…ache-aware sort This patch introduces a new shuffle manager that enhances the existing sort-based shuffle with a new cache-friendly sort algorithm that operates directly on binary data. The goals of this patch are to lower memory usage and Java object overheads during shuffle and to speed up sorting. It also lays groundwork for follow-up patches that will enable end-to-end processing of serialized records. The new shuffle manager, `UnsafeShuffleManager`, can be enabled by setting `spark.shuffle.manager=tungsten-sort` in SparkConf. The new shuffle manager uses directly-managed memory to implement several performance optimizations for certain types of shuffles. In cases where the new performance optimizations cannot be applied, the new shuffle manager delegates to SortShuffleManager to handle those shuffles. UnsafeShuffleManager's optimizations will apply when _all_ of the following conditions hold: - The shuffle dependency specifies no aggregation or output ordering. - The shuffle serializer supports relocation of serialized values (this is currently supported by KryoSerializer and Spark SQL's custom serializers). - The shuffle produces fewer than 16777216 output partitions. - No individual record is larger than 128 MB when serialized. In addition, extra spill-merging optimizations are automatically applied when the shuffle compression codec supports concatenation of serialized streams. This is currently supported by Spark's LZF serializer. At a high-level, UnsafeShuffleManager's design is similar to Spark's existing SortShuffleManager. In sort-based shuffle, incoming records are sorted according to their target partition ids, then written to a single map output file. Reducers fetch contiguous regions of this file in order to read their portion of the map output. In cases where the map output data is too large to fit in memory, sorted subsets of the output can are spilled to disk and those on-disk files are merged to produce the final output file. UnsafeShuffleManager optimizes this process in several ways: - Its sort operates on serialized binary data rather than Java objects, which reduces memory consumption and GC overheads. This optimization requires the record serializer to have certain properties to allow serialized records to be re-ordered without requiring deserialization. See SPARK-4550, where this optimization was first proposed and implemented, for more details. - It uses a specialized cache-efficient sorter (UnsafeShuffleExternalSorter) that sorts arrays of compressed record pointers and partition ids. By using only 8 bytes of space per record in the sorting array, this fits more of the array into cache. - The spill merging procedure operates on blocks of serialized records that belong to the same partition and does not need to deserialize records during the merge. - When the spill compression codec supports concatenation of compressed data, the spill merge simply concatenates the serialized and compressed spill partitions to produce the final output partition. This allows efficient data copying methods, like NIO's `transferTo`, to be used and avoids the need to allocate decompression or copying buffers during the merge. The shuffle read path is unchanged. This patch is similar to [SPARK-4550](http://issues.apache.org/jira/browse/SPARK-4550) / #4450 but uses a slightly different implementation. The `unsafe`-based implementation featured in this patch lays the groundwork for followup patches that will enable sorting to operate on serialized data pages that will be prepared by Spark SQL's new `unsafe` operators (such as the new aggregation operator introduced in #5725). ### Future work There are several tasks that build upon this patch, which will be left to future work: - [SPARK-7271](https://issues.apache.org/jira/browse/SPARK-7271) Redesign / extend the shuffle interfaces to accept binary data as input. The goal here is to let us bypass serialization steps in cases where the sort input is produced by an operator that operates directly on binary data. - Extension / redesign of the `Serializer` API. We can add new methods which allow serializers to determine the size requirements for serializing objects and for serializing objects directly to a specified memory address (similar to how `UnsafeRowConverter` works in Spark SQL). <!-- Reviewable:start --> [<img src="https://reviewable.io/review_button.png" height=40 alt="Review on Reviewable"/>](https://reviewable.io/reviews/apache/spark/5868) <!-- Reviewable:end --> Author: Josh Rosen <joshrosen@databricks.com> Closes #5868 from JoshRosen/unsafe-sort and squashes the following commits: ef0a86e [Josh Rosen] Fix scalastyle errors 7610f2f [Josh Rosen] Add tests for proper cleanup of shuffle data. d494ffe [Josh Rosen] Fix deserialization of JavaSerializer instances. 52a9981 [Josh Rosen] Fix some bugs in the address packing code. 51812a7 [Josh Rosen] Change shuffle manager sort name to tungsten-sort 4023fa4 [Josh Rosen] Add @Private annotation to some Java classes. de40b9d [Josh Rosen] More comments to try to explain metrics code df07699 [Josh Rosen] Attempt to clarify confusing metrics update code 5e189c6 [Josh Rosen] Track time spend closing / flushing files; split TimeTrackingOutputStream into separate file. d5779c6 [Josh Rosen] Merge remote-tracking branch 'origin/master' into unsafe-sort c2ce78e [Josh Rosen] Fix a missed usage of MAX_PARTITION_ID e3b8855 [Josh Rosen] Cleanup in UnsafeShuffleWriter 4a2c785 [Josh Rosen] rename 'sort buffer' to 'pointer array' 6276168 [Josh Rosen] Remove ability to disable spilling in UnsafeShuffleExternalSorter. 57312c9 [Josh Rosen] Clarify fileBufferSize units 2d4e4f4 [Josh Rosen] Address some minor comments in UnsafeShuffleExternalSorter. fdcac08 [Josh Rosen] Guard against overflow when expanding sort buffer. 85da63f [Josh Rosen] Cleanup in UnsafeShuffleSorterIterator. 0ad34da [Josh Rosen] Fix off-by-one in nextInt() call 56781a1 [Josh Rosen] Rename UnsafeShuffleSorter to UnsafeShuffleInMemorySorter e995d1a [Josh Rosen] Introduce MAX_SHUFFLE_OUTPUT_PARTITIONS. e58a6b4 [Josh Rosen] Add more tests for PackedRecordPointer encoding. 4f0b770 [Josh Rosen] Attempt to implement proper shuffle write metrics. d4e6d89 [Josh Rosen] Update to bit shifting constants 69d5899 [Josh Rosen] Remove some unnecessary override vals 8531286 [Josh Rosen] Add tests that automatically trigger spills. 7c953f9 [Josh Rosen] Add test that covers UnsafeShuffleSortDataFormat.swap(). e1855e5 [Josh Rosen] Fix a handful of misc. IntelliJ inspections 39434f9 [Josh Rosen] Avoid integer multiplication overflow in getMemoryUsage (thanks FindBugs!) 1e3ad52 [Josh Rosen] Delete unused ByteBufferOutputStream class. ea4f85f [Josh Rosen] Roll back an unnecessary change in Spillable. ae538dc [Josh Rosen] Document UnsafeShuffleManager. ec6d626 [Josh Rosen] Add notes on maximum # of supported shuffle partitions. 0d4d199 [Josh Rosen] Bump up shuffle.memoryFraction to make tests pass. b3b1924 [Josh Rosen] Properly implement close() and flush() in DummySerializerInstance. 1ef56c7 [Josh Rosen] Revise compression codec support in merger; test cross product of configurations. b57c17f [Josh Rosen] Disable some overly-verbose logs that rendered DEBUG useless. f780fb1 [Josh Rosen] Add test demonstrating which compression codecs support concatenation. 4a01c45 [Josh Rosen] Remove unnecessary log message 27b18b0 [Josh Rosen] That for inserting records AT the max record size. fcd9a3c [Josh Rosen] Add notes + tests for maximum record / page sizes. 9d1ee7c [Josh Rosen] Fix MiMa excludes for ShuffleWriter change fd4bb9e [Josh Rosen] Use own ByteBufferOutputStream rather than Kryo's 67d25ba [Josh Rosen] Update Exchange operator's copying logic to account for new shuffle manager 8f5061a [Josh Rosen] Strengthen assertion to check partitioning 01afc74 [Josh Rosen] Actually read data in UnsafeShuffleWriterSuite 1929a74 [Josh Rosen] Update to reflect upstream ShuffleBlockManager -> ShuffleBlockResolver rename. e8718dd [Josh Rosen] Merge remote-tracking branch 'origin/master' into unsafe-sort 9b7ebed [Josh Rosen] More defensive programming RE: cleaning up spill files and memory after errors 7cd013b [Josh Rosen] Begin refactoring to enable proper tests for spilling. 722849b [Josh Rosen] Add workaround for transferTo() bug in merging code; refactor tests. 9883e30 [Josh Rosen] Merge remote-tracking branch 'origin/master' into unsafe-sort b95e642 [Josh Rosen] Refactor and document logic that decides when to spill. 1ce1300 [Josh Rosen] More minor cleanup 5e8cf75 [Josh Rosen] More minor cleanup e67f1ea [Josh Rosen] Remove upper type bound in ShuffleWriter interface. cfe0ec4 [Josh Rosen] Address a number of minor review comments: 8a6fe52 [Josh Rosen] Rename UnsafeShuffleSpillWriter to UnsafeShuffleExternalSorter 11feeb6 [Josh Rosen] Update TODOs related to shuffle write metrics. b674412 [Josh Rosen] Merge remote-tracking branch 'origin/master' into unsafe-sort aaea17b [Josh Rosen] Add comments to UnsafeShuffleSpillWriter. 4f70141 [Josh Rosen] Fix merging; now passes UnsafeShuffleSuite tests. 133c8c9 [Josh Rosen] WIP towards testing UnsafeShuffleWriter. f480fb2 [Josh Rosen] WIP in mega-refactoring towards shuffle-specific sort. 57f1ec0 [Josh Rosen] WIP towards packed record pointers for use in optimized shuffle sort. 69232fd [Josh Rosen] Enable compressible address encoding for off-heap mode. 7ee918e [Josh Rosen] Re-order imports in tests 3aeaff7 [Josh Rosen] More refactoring and cleanup; begin cleaning iterator interfaces 3490512 [Josh Rosen] Misc. cleanup f156a8f [Josh Rosen] Hacky metrics integration; refactor some interfaces. 2776aca [Josh Rosen] First passing test for ExternalSorter. 5e100b2 [Josh Rosen] Super-messy WIP on external sort 595923a [Josh Rosen] Remove some unused variables. 8958584 [Josh Rosen] Fix bug in calculating free space in current page. f17fa8f [Josh Rosen] Add missing newline c2fca17 [Josh Rosen] Small refactoring of SerializerPropertiesSuite to enable test re-use: b8a09fe [Josh Rosen] Back out accidental log4j.properties change bfc12d3 [Josh Rosen] Add tests for serializer relocation property. 240864c [Josh Rosen] Remove PrefixComputer and require prefix to be specified as part of insert() 1433b42 [Josh Rosen] Store record length as int instead of long. 026b497 [Josh Rosen] Re-use a buffer in UnsafeShuffleWriter 0748458 [Josh Rosen] Port UnsafeShuffleWriter to Java. 87e721b [Josh Rosen] Renaming and comments d3cc310 [Josh Rosen] Flag that SparkSqlSerializer2 supports relocation e2d96ca [Josh Rosen] Expand serializer API and use new function to help control when new UnsafeShuffle path is used. e267cee [Josh Rosen] Fix compilation of UnsafeSorterSuite 9c6cf58 [Josh Rosen] Refactor to use DiskBlockObjectWriter. 253f13e [Josh Rosen] More cleanup 8e3ec20 [Josh Rosen] Begin code cleanup. 4d2f5e1 [Josh Rosen] WIP 3db12de [Josh Rosen] Minor simplification and sanity checks in UnsafeSorter 767d3ca [Josh Rosen] Fix invalid range in UnsafeSorter. e900152 [Josh Rosen] Add test for empty iterator in UnsafeSorter 57a4ea0 [Josh Rosen] Make initialSize configurable in UnsafeSorter abf7bfe [Josh Rosen] Add basic test case. 81d52c5 [Josh Rosen] WIP on UnsafeSorter (cherry picked from commit 73bed40) Signed-off-by: Reynold Xin <rxin@databricks.com>
…ache-aware sort This patch introduces a new shuffle manager that enhances the existing sort-based shuffle with a new cache-friendly sort algorithm that operates directly on binary data. The goals of this patch are to lower memory usage and Java object overheads during shuffle and to speed up sorting. It also lays groundwork for follow-up patches that will enable end-to-end processing of serialized records. The new shuffle manager, `UnsafeShuffleManager`, can be enabled by setting `spark.shuffle.manager=tungsten-sort` in SparkConf. The new shuffle manager uses directly-managed memory to implement several performance optimizations for certain types of shuffles. In cases where the new performance optimizations cannot be applied, the new shuffle manager delegates to SortShuffleManager to handle those shuffles. UnsafeShuffleManager's optimizations will apply when _all_ of the following conditions hold: - The shuffle dependency specifies no aggregation or output ordering. - The shuffle serializer supports relocation of serialized values (this is currently supported by KryoSerializer and Spark SQL's custom serializers). - The shuffle produces fewer than 16777216 output partitions. - No individual record is larger than 128 MB when serialized. In addition, extra spill-merging optimizations are automatically applied when the shuffle compression codec supports concatenation of serialized streams. This is currently supported by Spark's LZF serializer. At a high-level, UnsafeShuffleManager's design is similar to Spark's existing SortShuffleManager. In sort-based shuffle, incoming records are sorted according to their target partition ids, then written to a single map output file. Reducers fetch contiguous regions of this file in order to read their portion of the map output. In cases where the map output data is too large to fit in memory, sorted subsets of the output can are spilled to disk and those on-disk files are merged to produce the final output file. UnsafeShuffleManager optimizes this process in several ways: - Its sort operates on serialized binary data rather than Java objects, which reduces memory consumption and GC overheads. This optimization requires the record serializer to have certain properties to allow serialized records to be re-ordered without requiring deserialization. See SPARK-4550, where this optimization was first proposed and implemented, for more details. - It uses a specialized cache-efficient sorter (UnsafeShuffleExternalSorter) that sorts arrays of compressed record pointers and partition ids. By using only 8 bytes of space per record in the sorting array, this fits more of the array into cache. - The spill merging procedure operates on blocks of serialized records that belong to the same partition and does not need to deserialize records during the merge. - When the spill compression codec supports concatenation of compressed data, the spill merge simply concatenates the serialized and compressed spill partitions to produce the final output partition. This allows efficient data copying methods, like NIO's `transferTo`, to be used and avoids the need to allocate decompression or copying buffers during the merge. The shuffle read path is unchanged. This patch is similar to [SPARK-4550](http://issues.apache.org/jira/browse/SPARK-4550) / #4450 but uses a slightly different implementation. The `unsafe`-based implementation featured in this patch lays the groundwork for followup patches that will enable sorting to operate on serialized data pages that will be prepared by Spark SQL's new `unsafe` operators (such as the new aggregation operator introduced in #5725). ### Future work There are several tasks that build upon this patch, which will be left to future work: - [SPARK-7271](https://issues.apache.org/jira/browse/SPARK-7271) Redesign / extend the shuffle interfaces to accept binary data as input. The goal here is to let us bypass serialization steps in cases where the sort input is produced by an operator that operates directly on binary data. - Extension / redesign of the `Serializer` API. We can add new methods which allow serializers to determine the size requirements for serializing objects and for serializing objects directly to a specified memory address (similar to how `UnsafeRowConverter` works in Spark SQL). <!-- Reviewable:start --> [<img src="https://reviewable.io/review_button.png" height=40 alt="Review on Reviewable"/>](https://reviewable.io/reviews/apache/spark/5868) <!-- Reviewable:end --> Author: Josh Rosen <joshrosen@databricks.com> Closes #5868 from JoshRosen/unsafe-sort and squashes the following commits: ef0a86e [Josh Rosen] Fix scalastyle errors 7610f2f [Josh Rosen] Add tests for proper cleanup of shuffle data. d494ffe [Josh Rosen] Fix deserialization of JavaSerializer instances. 52a9981 [Josh Rosen] Fix some bugs in the address packing code. 51812a7 [Josh Rosen] Change shuffle manager sort name to tungsten-sort 4023fa4 [Josh Rosen] Add @Private annotation to some Java classes. de40b9d [Josh Rosen] More comments to try to explain metrics code df07699 [Josh Rosen] Attempt to clarify confusing metrics update code 5e189c6 [Josh Rosen] Track time spend closing / flushing files; split TimeTrackingOutputStream into separate file. d5779c6 [Josh Rosen] Merge remote-tracking branch 'origin/master' into unsafe-sort c2ce78e [Josh Rosen] Fix a missed usage of MAX_PARTITION_ID e3b8855 [Josh Rosen] Cleanup in UnsafeShuffleWriter 4a2c785 [Josh Rosen] rename 'sort buffer' to 'pointer array' 6276168 [Josh Rosen] Remove ability to disable spilling in UnsafeShuffleExternalSorter. 57312c9 [Josh Rosen] Clarify fileBufferSize units 2d4e4f4 [Josh Rosen] Address some minor comments in UnsafeShuffleExternalSorter. fdcac08 [Josh Rosen] Guard against overflow when expanding sort buffer. 85da63f [Josh Rosen] Cleanup in UnsafeShuffleSorterIterator. 0ad34da [Josh Rosen] Fix off-by-one in nextInt() call 56781a1 [Josh Rosen] Rename UnsafeShuffleSorter to UnsafeShuffleInMemorySorter e995d1a [Josh Rosen] Introduce MAX_SHUFFLE_OUTPUT_PARTITIONS. e58a6b4 [Josh Rosen] Add more tests for PackedRecordPointer encoding. 4f0b770 [Josh Rosen] Attempt to implement proper shuffle write metrics. d4e6d89 [Josh Rosen] Update to bit shifting constants 69d5899 [Josh Rosen] Remove some unnecessary override vals 8531286 [Josh Rosen] Add tests that automatically trigger spills. 7c953f9 [Josh Rosen] Add test that covers UnsafeShuffleSortDataFormat.swap(). e1855e5 [Josh Rosen] Fix a handful of misc. IntelliJ inspections 39434f9 [Josh Rosen] Avoid integer multiplication overflow in getMemoryUsage (thanks FindBugs!) 1e3ad52 [Josh Rosen] Delete unused ByteBufferOutputStream class. ea4f85f [Josh Rosen] Roll back an unnecessary change in Spillable. ae538dc [Josh Rosen] Document UnsafeShuffleManager. ec6d626 [Josh Rosen] Add notes on maximum # of supported shuffle partitions. 0d4d199 [Josh Rosen] Bump up shuffle.memoryFraction to make tests pass. b3b1924 [Josh Rosen] Properly implement close() and flush() in DummySerializerInstance. 1ef56c7 [Josh Rosen] Revise compression codec support in merger; test cross product of configurations. b57c17f [Josh Rosen] Disable some overly-verbose logs that rendered DEBUG useless. f780fb1 [Josh Rosen] Add test demonstrating which compression codecs support concatenation. 4a01c45 [Josh Rosen] Remove unnecessary log message 27b18b0 [Josh Rosen] That for inserting records AT the max record size. fcd9a3c [Josh Rosen] Add notes + tests for maximum record / page sizes. 9d1ee7c [Josh Rosen] Fix MiMa excludes for ShuffleWriter change fd4bb9e [Josh Rosen] Use own ByteBufferOutputStream rather than Kryo's 67d25ba [Josh Rosen] Update Exchange operator's copying logic to account for new shuffle manager 8f5061a [Josh Rosen] Strengthen assertion to check partitioning 01afc74 [Josh Rosen] Actually read data in UnsafeShuffleWriterSuite 1929a74 [Josh Rosen] Update to reflect upstream ShuffleBlockManager -> ShuffleBlockResolver rename. e8718dd [Josh Rosen] Merge remote-tracking branch 'origin/master' into unsafe-sort 9b7ebed [Josh Rosen] More defensive programming RE: cleaning up spill files and memory after errors 7cd013b [Josh Rosen] Begin refactoring to enable proper tests for spilling. 722849b [Josh Rosen] Add workaround for transferTo() bug in merging code; refactor tests. 9883e30 [Josh Rosen] Merge remote-tracking branch 'origin/master' into unsafe-sort b95e642 [Josh Rosen] Refactor and document logic that decides when to spill. 1ce1300 [Josh Rosen] More minor cleanup 5e8cf75 [Josh Rosen] More minor cleanup e67f1ea [Josh Rosen] Remove upper type bound in ShuffleWriter interface. cfe0ec4 [Josh Rosen] Address a number of minor review comments: 8a6fe52 [Josh Rosen] Rename UnsafeShuffleSpillWriter to UnsafeShuffleExternalSorter 11feeb6 [Josh Rosen] Update TODOs related to shuffle write metrics. b674412 [Josh Rosen] Merge remote-tracking branch 'origin/master' into unsafe-sort aaea17b [Josh Rosen] Add comments to UnsafeShuffleSpillWriter. 4f70141 [Josh Rosen] Fix merging; now passes UnsafeShuffleSuite tests. 133c8c9 [Josh Rosen] WIP towards testing UnsafeShuffleWriter. f480fb2 [Josh Rosen] WIP in mega-refactoring towards shuffle-specific sort. 57f1ec0 [Josh Rosen] WIP towards packed record pointers for use in optimized shuffle sort. 69232fd [Josh Rosen] Enable compressible address encoding for off-heap mode. 7ee918e [Josh Rosen] Re-order imports in tests 3aeaff7 [Josh Rosen] More refactoring and cleanup; begin cleaning iterator interfaces 3490512 [Josh Rosen] Misc. cleanup f156a8f [Josh Rosen] Hacky metrics integration; refactor some interfaces. 2776aca [Josh Rosen] First passing test for ExternalSorter. 5e100b2 [Josh Rosen] Super-messy WIP on external sort 595923a [Josh Rosen] Remove some unused variables. 8958584 [Josh Rosen] Fix bug in calculating free space in current page. f17fa8f [Josh Rosen] Add missing newline c2fca17 [Josh Rosen] Small refactoring of SerializerPropertiesSuite to enable test re-use: b8a09fe [Josh Rosen] Back out accidental log4j.properties change bfc12d3 [Josh Rosen] Add tests for serializer relocation property. 240864c [Josh Rosen] Remove PrefixComputer and require prefix to be specified as part of insert() 1433b42 [Josh Rosen] Store record length as int instead of long. 026b497 [Josh Rosen] Re-use a buffer in UnsafeShuffleWriter 0748458 [Josh Rosen] Port UnsafeShuffleWriter to Java. 87e721b [Josh Rosen] Renaming and comments d3cc310 [Josh Rosen] Flag that SparkSqlSerializer2 supports relocation e2d96ca [Josh Rosen] Expand serializer API and use new function to help control when new UnsafeShuffle path is used. e267cee [Josh Rosen] Fix compilation of UnsafeSorterSuite 9c6cf58 [Josh Rosen] Refactor to use DiskBlockObjectWriter. 253f13e [Josh Rosen] More cleanup 8e3ec20 [Josh Rosen] Begin code cleanup. 4d2f5e1 [Josh Rosen] WIP 3db12de [Josh Rosen] Minor simplification and sanity checks in UnsafeSorter 767d3ca [Josh Rosen] Fix invalid range in UnsafeSorter. e900152 [Josh Rosen] Add test for empty iterator in UnsafeSorter 57a4ea0 [Josh Rosen] Make initialSize configurable in UnsafeSorter abf7bfe [Josh Rosen] Add basic test case. 81d52c5 [Josh Rosen] WIP on UnsafeSorter
writer.write(key) | ||
writer.write(value) | ||
val partitionId = it.nextPartition() | ||
it.writeNext(writer) | ||
elementsPerPartition(partitionId) += 1 | ||
objectsWritten += 1 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this still correct? Based on my understanding of destructiveSortedWritablePartitionedIterator
(which is kind of a mouthful!), it looks like the it.writeNext(writer)
call should be writing an entire partition's worth of records to the output, but here and on the previous line we're performing some increments that I think only make sense for a record-at-a-time iterator, which we're no longer using in this block.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suspect that this patch has rendered spark.shuffle.spill.batchSize
useless. It looks like we don't have any tests that are capable of detecting whether this setting actually takes effect or not.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, I retract this comment: I got confused because nextPartition
sort of sounded like we were striding across a partition boundary. It looks like this code is fine, even if it's somewhat confusing.
Refer to the JIRA for the design doc and some perf results. I wanted to call out some of the more possibly controversial changes up front: * Map outputs are only stored in serialized form when Kryo is in use. I'm still unsure whether Java-serialized objects can be relocated. At the very least, Java serialization writes out a stream header which causes problems with the current approach, so I decided to leave investigating this to future work. * The shuffle now explicitly operates on key-value pairs instead of any object. Data is written to shuffle files in alternating keys and values instead of key-value tuples. `BlockObjectWriter.write` now accepts a key argument and a value argument instead of any object. * The map output buffer can hold a max of Integer.MAX_VALUE bytes. Though this wouldn't be terribly difficult to change. * When spilling occurs, the objects that still in memory at merge time end up serialized and deserialized an extra time. Author: Sandy Ryza <sandy@cloudera.com> Closes apache#4450 from sryza/sandy-spark-4550 and squashes the following commits: 8c70dd9 [Sandy Ryza] Fix serialization 9c16fe6 [Sandy Ryza] Fix a couple tests and move getAutoReset to KryoSerializerInstance 6c54e06 [Sandy Ryza] Fix scalastyle d8462d8 [Sandy Ryza] SPARK-4550
JoshRosen this PR addresses the comments you left on apache#4450 after it got merged. Author: Sandy Ryza <sandy@cloudera.com> Closes apache#5916 from sryza/sandy-spark-4550-cleanup and squashes the following commits: dee3d85 [Sandy Ryza] Some minor cleanup after SPARK-4550.
…ializers support object relocation This patch extends the `Serializer` interface with a new `Private` API which allows serializers to indicate whether they support relocation of serialized objects in serializer stream output. This relocatibilty property is described in more detail in `Serializer.scala`, but in a nutshell a serializer supports relocation if reordering the bytes of serialized objects in serialization stream output is equivalent to having re-ordered those elements prior to serializing them. The optimized shuffle path introduced in apache#4450 and apache#5868 both rely on serializers having this property; this patch just centralizes the logic for determining whether a serializer has this property. I also added tests and comments clarifying when this works for KryoSerializer. This change allows the optimizations in apache#4450 to be applied for shuffles that use `SqlSerializer2`. Author: Josh Rosen <joshrosen@databricks.com> Closes apache#5924 from JoshRosen/SPARK-7311 and squashes the following commits: 50a68ca [Josh Rosen] Address minor nits 0a7ebd7 [Josh Rosen] Clarify reason why SqlSerializer2 supports this serializer 123b992 [Josh Rosen] Cleanup for submitting as standalone patch. 4aa61b2 [Josh Rosen] Add missing newline 2c1233a [Josh Rosen] Small refactoring of SerializerPropertiesSuite to enable test re-use: 0ba75e6 [Josh Rosen] Add tests for serializer relocation property. 450fa21 [Josh Rosen] Back out accidental log4j.properties change 86d4dcd [Josh Rosen] Flag that SparkSqlSerializer2 supports relocation b9624ee [Josh Rosen] Expand serializer API and use new function to help control when new UnsafeShuffle path is used.
…apOutputs takes effect This patch refactors the SQL `Exchange` operator's logic for determining whether map outputs need to be copied before being shuffled. As part of this change, we'll now avoid unnecessary copies in cases where sort-based shuffle operates on serialized map outputs (as in apache#4450 / SPARK-4550). This patch also includes a change to copy the input to RangePartitioner partition bounds calculation, which is necessary because this calculation buffers mutable Java objects. <!-- Reviewable:start --> [<img src="https://reviewable.io/review_button.png" height=40 alt="Review on Reviewable"/>](https://reviewable.io/reviews/apache/spark/5948) <!-- Reviewable:end --> Author: Josh Rosen <joshrosen@databricks.com> Closes apache#5948 from JoshRosen/SPARK-7375 and squashes the following commits: f305ff3 [Josh Rosen] Reduce scope of some variables in Exchange 899e1d7 [Josh Rosen] Merge remote-tracking branch 'origin/master' into SPARK-7375 6a6bfce [Josh Rosen] Fix issue related to RangePartitioning: ad006a4 [Josh Rosen] [SPARK-7375] Avoid defensive copying in exchange operator when sort.serializeMapOutputs takes effect.
…ache-aware sort This patch introduces a new shuffle manager that enhances the existing sort-based shuffle with a new cache-friendly sort algorithm that operates directly on binary data. The goals of this patch are to lower memory usage and Java object overheads during shuffle and to speed up sorting. It also lays groundwork for follow-up patches that will enable end-to-end processing of serialized records. The new shuffle manager, `UnsafeShuffleManager`, can be enabled by setting `spark.shuffle.manager=tungsten-sort` in SparkConf. The new shuffle manager uses directly-managed memory to implement several performance optimizations for certain types of shuffles. In cases where the new performance optimizations cannot be applied, the new shuffle manager delegates to SortShuffleManager to handle those shuffles. UnsafeShuffleManager's optimizations will apply when _all_ of the following conditions hold: - The shuffle dependency specifies no aggregation or output ordering. - The shuffle serializer supports relocation of serialized values (this is currently supported by KryoSerializer and Spark SQL's custom serializers). - The shuffle produces fewer than 16777216 output partitions. - No individual record is larger than 128 MB when serialized. In addition, extra spill-merging optimizations are automatically applied when the shuffle compression codec supports concatenation of serialized streams. This is currently supported by Spark's LZF serializer. At a high-level, UnsafeShuffleManager's design is similar to Spark's existing SortShuffleManager. In sort-based shuffle, incoming records are sorted according to their target partition ids, then written to a single map output file. Reducers fetch contiguous regions of this file in order to read their portion of the map output. In cases where the map output data is too large to fit in memory, sorted subsets of the output can are spilled to disk and those on-disk files are merged to produce the final output file. UnsafeShuffleManager optimizes this process in several ways: - Its sort operates on serialized binary data rather than Java objects, which reduces memory consumption and GC overheads. This optimization requires the record serializer to have certain properties to allow serialized records to be re-ordered without requiring deserialization. See SPARK-4550, where this optimization was first proposed and implemented, for more details. - It uses a specialized cache-efficient sorter (UnsafeShuffleExternalSorter) that sorts arrays of compressed record pointers and partition ids. By using only 8 bytes of space per record in the sorting array, this fits more of the array into cache. - The spill merging procedure operates on blocks of serialized records that belong to the same partition and does not need to deserialize records during the merge. - When the spill compression codec supports concatenation of compressed data, the spill merge simply concatenates the serialized and compressed spill partitions to produce the final output partition. This allows efficient data copying methods, like NIO's `transferTo`, to be used and avoids the need to allocate decompression or copying buffers during the merge. The shuffle read path is unchanged. This patch is similar to [SPARK-4550](http://issues.apache.org/jira/browse/SPARK-4550) / apache#4450 but uses a slightly different implementation. The `unsafe`-based implementation featured in this patch lays the groundwork for followup patches that will enable sorting to operate on serialized data pages that will be prepared by Spark SQL's new `unsafe` operators (such as the new aggregation operator introduced in apache#5725). ### Future work There are several tasks that build upon this patch, which will be left to future work: - [SPARK-7271](https://issues.apache.org/jira/browse/SPARK-7271) Redesign / extend the shuffle interfaces to accept binary data as input. The goal here is to let us bypass serialization steps in cases where the sort input is produced by an operator that operates directly on binary data. - Extension / redesign of the `Serializer` API. We can add new methods which allow serializers to determine the size requirements for serializing objects and for serializing objects directly to a specified memory address (similar to how `UnsafeRowConverter` works in Spark SQL). <!-- Reviewable:start --> [<img src="https://reviewable.io/review_button.png" height=40 alt="Review on Reviewable"/>](https://reviewable.io/reviews/apache/spark/5868) <!-- Reviewable:end --> Author: Josh Rosen <joshrosen@databricks.com> Closes apache#5868 from JoshRosen/unsafe-sort and squashes the following commits: ef0a86e [Josh Rosen] Fix scalastyle errors 7610f2f [Josh Rosen] Add tests for proper cleanup of shuffle data. d494ffe [Josh Rosen] Fix deserialization of JavaSerializer instances. 52a9981 [Josh Rosen] Fix some bugs in the address packing code. 51812a7 [Josh Rosen] Change shuffle manager sort name to tungsten-sort 4023fa4 [Josh Rosen] Add @Private annotation to some Java classes. de40b9d [Josh Rosen] More comments to try to explain metrics code df07699 [Josh Rosen] Attempt to clarify confusing metrics update code 5e189c6 [Josh Rosen] Track time spend closing / flushing files; split TimeTrackingOutputStream into separate file. d5779c6 [Josh Rosen] Merge remote-tracking branch 'origin/master' into unsafe-sort c2ce78e [Josh Rosen] Fix a missed usage of MAX_PARTITION_ID e3b8855 [Josh Rosen] Cleanup in UnsafeShuffleWriter 4a2c785 [Josh Rosen] rename 'sort buffer' to 'pointer array' 6276168 [Josh Rosen] Remove ability to disable spilling in UnsafeShuffleExternalSorter. 57312c9 [Josh Rosen] Clarify fileBufferSize units 2d4e4f4 [Josh Rosen] Address some minor comments in UnsafeShuffleExternalSorter. fdcac08 [Josh Rosen] Guard against overflow when expanding sort buffer. 85da63f [Josh Rosen] Cleanup in UnsafeShuffleSorterIterator. 0ad34da [Josh Rosen] Fix off-by-one in nextInt() call 56781a1 [Josh Rosen] Rename UnsafeShuffleSorter to UnsafeShuffleInMemorySorter e995d1a [Josh Rosen] Introduce MAX_SHUFFLE_OUTPUT_PARTITIONS. e58a6b4 [Josh Rosen] Add more tests for PackedRecordPointer encoding. 4f0b770 [Josh Rosen] Attempt to implement proper shuffle write metrics. d4e6d89 [Josh Rosen] Update to bit shifting constants 69d5899 [Josh Rosen] Remove some unnecessary override vals 8531286 [Josh Rosen] Add tests that automatically trigger spills. 7c953f9 [Josh Rosen] Add test that covers UnsafeShuffleSortDataFormat.swap(). e1855e5 [Josh Rosen] Fix a handful of misc. IntelliJ inspections 39434f9 [Josh Rosen] Avoid integer multiplication overflow in getMemoryUsage (thanks FindBugs!) 1e3ad52 [Josh Rosen] Delete unused ByteBufferOutputStream class. ea4f85f [Josh Rosen] Roll back an unnecessary change in Spillable. ae538dc [Josh Rosen] Document UnsafeShuffleManager. ec6d626 [Josh Rosen] Add notes on maximum # of supported shuffle partitions. 0d4d199 [Josh Rosen] Bump up shuffle.memoryFraction to make tests pass. b3b1924 [Josh Rosen] Properly implement close() and flush() in DummySerializerInstance. 1ef56c7 [Josh Rosen] Revise compression codec support in merger; test cross product of configurations. b57c17f [Josh Rosen] Disable some overly-verbose logs that rendered DEBUG useless. f780fb1 [Josh Rosen] Add test demonstrating which compression codecs support concatenation. 4a01c45 [Josh Rosen] Remove unnecessary log message 27b18b0 [Josh Rosen] That for inserting records AT the max record size. fcd9a3c [Josh Rosen] Add notes + tests for maximum record / page sizes. 9d1ee7c [Josh Rosen] Fix MiMa excludes for ShuffleWriter change fd4bb9e [Josh Rosen] Use own ByteBufferOutputStream rather than Kryo's 67d25ba [Josh Rosen] Update Exchange operator's copying logic to account for new shuffle manager 8f5061a [Josh Rosen] Strengthen assertion to check partitioning 01afc74 [Josh Rosen] Actually read data in UnsafeShuffleWriterSuite 1929a74 [Josh Rosen] Update to reflect upstream ShuffleBlockManager -> ShuffleBlockResolver rename. e8718dd [Josh Rosen] Merge remote-tracking branch 'origin/master' into unsafe-sort 9b7ebed [Josh Rosen] More defensive programming RE: cleaning up spill files and memory after errors 7cd013b [Josh Rosen] Begin refactoring to enable proper tests for spilling. 722849b [Josh Rosen] Add workaround for transferTo() bug in merging code; refactor tests. 9883e30 [Josh Rosen] Merge remote-tracking branch 'origin/master' into unsafe-sort b95e642 [Josh Rosen] Refactor and document logic that decides when to spill. 1ce1300 [Josh Rosen] More minor cleanup 5e8cf75 [Josh Rosen] More minor cleanup e67f1ea [Josh Rosen] Remove upper type bound in ShuffleWriter interface. cfe0ec4 [Josh Rosen] Address a number of minor review comments: 8a6fe52 [Josh Rosen] Rename UnsafeShuffleSpillWriter to UnsafeShuffleExternalSorter 11feeb6 [Josh Rosen] Update TODOs related to shuffle write metrics. b674412 [Josh Rosen] Merge remote-tracking branch 'origin/master' into unsafe-sort aaea17b [Josh Rosen] Add comments to UnsafeShuffleSpillWriter. 4f70141 [Josh Rosen] Fix merging; now passes UnsafeShuffleSuite tests. 133c8c9 [Josh Rosen] WIP towards testing UnsafeShuffleWriter. f480fb2 [Josh Rosen] WIP in mega-refactoring towards shuffle-specific sort. 57f1ec0 [Josh Rosen] WIP towards packed record pointers for use in optimized shuffle sort. 69232fd [Josh Rosen] Enable compressible address encoding for off-heap mode. 7ee918e [Josh Rosen] Re-order imports in tests 3aeaff7 [Josh Rosen] More refactoring and cleanup; begin cleaning iterator interfaces 3490512 [Josh Rosen] Misc. cleanup f156a8f [Josh Rosen] Hacky metrics integration; refactor some interfaces. 2776aca [Josh Rosen] First passing test for ExternalSorter. 5e100b2 [Josh Rosen] Super-messy WIP on external sort 595923a [Josh Rosen] Remove some unused variables. 8958584 [Josh Rosen] Fix bug in calculating free space in current page. f17fa8f [Josh Rosen] Add missing newline c2fca17 [Josh Rosen] Small refactoring of SerializerPropertiesSuite to enable test re-use: b8a09fe [Josh Rosen] Back out accidental log4j.properties change bfc12d3 [Josh Rosen] Add tests for serializer relocation property. 240864c [Josh Rosen] Remove PrefixComputer and require prefix to be specified as part of insert() 1433b42 [Josh Rosen] Store record length as int instead of long. 026b497 [Josh Rosen] Re-use a buffer in UnsafeShuffleWriter 0748458 [Josh Rosen] Port UnsafeShuffleWriter to Java. 87e721b [Josh Rosen] Renaming and comments d3cc310 [Josh Rosen] Flag that SparkSqlSerializer2 supports relocation e2d96ca [Josh Rosen] Expand serializer API and use new function to help control when new UnsafeShuffle path is used. e267cee [Josh Rosen] Fix compilation of UnsafeSorterSuite 9c6cf58 [Josh Rosen] Refactor to use DiskBlockObjectWriter. 253f13e [Josh Rosen] More cleanup 8e3ec20 [Josh Rosen] Begin code cleanup. 4d2f5e1 [Josh Rosen] WIP 3db12de [Josh Rosen] Minor simplification and sanity checks in UnsafeSorter 767d3ca [Josh Rosen] Fix invalid range in UnsafeSorter. e900152 [Josh Rosen] Add test for empty iterator in UnsafeSorter 57a4ea0 [Josh Rosen] Make initialSize configurable in UnsafeSorter abf7bfe [Josh Rosen] Add basic test case. 81d52c5 [Josh Rosen] WIP on UnsafeSorter
Refer to the JIRA for the design doc and some perf results. I wanted to call out some of the more possibly controversial changes up front: * Map outputs are only stored in serialized form when Kryo is in use. I'm still unsure whether Java-serialized objects can be relocated. At the very least, Java serialization writes out a stream header which causes problems with the current approach, so I decided to leave investigating this to future work. * The shuffle now explicitly operates on key-value pairs instead of any object. Data is written to shuffle files in alternating keys and values instead of key-value tuples. `BlockObjectWriter.write` now accepts a key argument and a value argument instead of any object. * The map output buffer can hold a max of Integer.MAX_VALUE bytes. Though this wouldn't be terribly difficult to change. * When spilling occurs, the objects that still in memory at merge time end up serialized and deserialized an extra time. Author: Sandy Ryza <sandy@cloudera.com> Closes apache#4450 from sryza/sandy-spark-4550 and squashes the following commits: 8c70dd9 [Sandy Ryza] Fix serialization 9c16fe6 [Sandy Ryza] Fix a couple tests and move getAutoReset to KryoSerializerInstance 6c54e06 [Sandy Ryza] Fix scalastyle d8462d8 [Sandy Ryza] SPARK-4550
JoshRosen this PR addresses the comments you left on apache#4450 after it got merged. Author: Sandy Ryza <sandy@cloudera.com> Closes apache#5916 from sryza/sandy-spark-4550-cleanup and squashes the following commits: dee3d85 [Sandy Ryza] Some minor cleanup after SPARK-4550.
…ializers support object relocation This patch extends the `Serializer` interface with a new `Private` API which allows serializers to indicate whether they support relocation of serialized objects in serializer stream output. This relocatibilty property is described in more detail in `Serializer.scala`, but in a nutshell a serializer supports relocation if reordering the bytes of serialized objects in serialization stream output is equivalent to having re-ordered those elements prior to serializing them. The optimized shuffle path introduced in apache#4450 and apache#5868 both rely on serializers having this property; this patch just centralizes the logic for determining whether a serializer has this property. I also added tests and comments clarifying when this works for KryoSerializer. This change allows the optimizations in apache#4450 to be applied for shuffles that use `SqlSerializer2`. Author: Josh Rosen <joshrosen@databricks.com> Closes apache#5924 from JoshRosen/SPARK-7311 and squashes the following commits: 50a68ca [Josh Rosen] Address minor nits 0a7ebd7 [Josh Rosen] Clarify reason why SqlSerializer2 supports this serializer 123b992 [Josh Rosen] Cleanup for submitting as standalone patch. 4aa61b2 [Josh Rosen] Add missing newline 2c1233a [Josh Rosen] Small refactoring of SerializerPropertiesSuite to enable test re-use: 0ba75e6 [Josh Rosen] Add tests for serializer relocation property. 450fa21 [Josh Rosen] Back out accidental log4j.properties change 86d4dcd [Josh Rosen] Flag that SparkSqlSerializer2 supports relocation b9624ee [Josh Rosen] Expand serializer API and use new function to help control when new UnsafeShuffle path is used.
…apOutputs takes effect This patch refactors the SQL `Exchange` operator's logic for determining whether map outputs need to be copied before being shuffled. As part of this change, we'll now avoid unnecessary copies in cases where sort-based shuffle operates on serialized map outputs (as in apache#4450 / SPARK-4550). This patch also includes a change to copy the input to RangePartitioner partition bounds calculation, which is necessary because this calculation buffers mutable Java objects. <!-- Reviewable:start --> [<img src="https://reviewable.io/review_button.png" height=40 alt="Review on Reviewable"/>](https://reviewable.io/reviews/apache/spark/5948) <!-- Reviewable:end --> Author: Josh Rosen <joshrosen@databricks.com> Closes apache#5948 from JoshRosen/SPARK-7375 and squashes the following commits: f305ff3 [Josh Rosen] Reduce scope of some variables in Exchange 899e1d7 [Josh Rosen] Merge remote-tracking branch 'origin/master' into SPARK-7375 6a6bfce [Josh Rosen] Fix issue related to RangePartitioning: ad006a4 [Josh Rosen] [SPARK-7375] Avoid defensive copying in exchange operator when sort.serializeMapOutputs takes effect.
…ache-aware sort This patch introduces a new shuffle manager that enhances the existing sort-based shuffle with a new cache-friendly sort algorithm that operates directly on binary data. The goals of this patch are to lower memory usage and Java object overheads during shuffle and to speed up sorting. It also lays groundwork for follow-up patches that will enable end-to-end processing of serialized records. The new shuffle manager, `UnsafeShuffleManager`, can be enabled by setting `spark.shuffle.manager=tungsten-sort` in SparkConf. The new shuffle manager uses directly-managed memory to implement several performance optimizations for certain types of shuffles. In cases where the new performance optimizations cannot be applied, the new shuffle manager delegates to SortShuffleManager to handle those shuffles. UnsafeShuffleManager's optimizations will apply when _all_ of the following conditions hold: - The shuffle dependency specifies no aggregation or output ordering. - The shuffle serializer supports relocation of serialized values (this is currently supported by KryoSerializer and Spark SQL's custom serializers). - The shuffle produces fewer than 16777216 output partitions. - No individual record is larger than 128 MB when serialized. In addition, extra spill-merging optimizations are automatically applied when the shuffle compression codec supports concatenation of serialized streams. This is currently supported by Spark's LZF serializer. At a high-level, UnsafeShuffleManager's design is similar to Spark's existing SortShuffleManager. In sort-based shuffle, incoming records are sorted according to their target partition ids, then written to a single map output file. Reducers fetch contiguous regions of this file in order to read their portion of the map output. In cases where the map output data is too large to fit in memory, sorted subsets of the output can are spilled to disk and those on-disk files are merged to produce the final output file. UnsafeShuffleManager optimizes this process in several ways: - Its sort operates on serialized binary data rather than Java objects, which reduces memory consumption and GC overheads. This optimization requires the record serializer to have certain properties to allow serialized records to be re-ordered without requiring deserialization. See SPARK-4550, where this optimization was first proposed and implemented, for more details. - It uses a specialized cache-efficient sorter (UnsafeShuffleExternalSorter) that sorts arrays of compressed record pointers and partition ids. By using only 8 bytes of space per record in the sorting array, this fits more of the array into cache. - The spill merging procedure operates on blocks of serialized records that belong to the same partition and does not need to deserialize records during the merge. - When the spill compression codec supports concatenation of compressed data, the spill merge simply concatenates the serialized and compressed spill partitions to produce the final output partition. This allows efficient data copying methods, like NIO's `transferTo`, to be used and avoids the need to allocate decompression or copying buffers during the merge. The shuffle read path is unchanged. This patch is similar to [SPARK-4550](http://issues.apache.org/jira/browse/SPARK-4550) / apache#4450 but uses a slightly different implementation. The `unsafe`-based implementation featured in this patch lays the groundwork for followup patches that will enable sorting to operate on serialized data pages that will be prepared by Spark SQL's new `unsafe` operators (such as the new aggregation operator introduced in apache#5725). ### Future work There are several tasks that build upon this patch, which will be left to future work: - [SPARK-7271](https://issues.apache.org/jira/browse/SPARK-7271) Redesign / extend the shuffle interfaces to accept binary data as input. The goal here is to let us bypass serialization steps in cases where the sort input is produced by an operator that operates directly on binary data. - Extension / redesign of the `Serializer` API. We can add new methods which allow serializers to determine the size requirements for serializing objects and for serializing objects directly to a specified memory address (similar to how `UnsafeRowConverter` works in Spark SQL). <!-- Reviewable:start --> [<img src="https://reviewable.io/review_button.png" height=40 alt="Review on Reviewable"/>](https://reviewable.io/reviews/apache/spark/5868) <!-- Reviewable:end --> Author: Josh Rosen <joshrosen@databricks.com> Closes apache#5868 from JoshRosen/unsafe-sort and squashes the following commits: ef0a86e [Josh Rosen] Fix scalastyle errors 7610f2f [Josh Rosen] Add tests for proper cleanup of shuffle data. d494ffe [Josh Rosen] Fix deserialization of JavaSerializer instances. 52a9981 [Josh Rosen] Fix some bugs in the address packing code. 51812a7 [Josh Rosen] Change shuffle manager sort name to tungsten-sort 4023fa4 [Josh Rosen] Add @Private annotation to some Java classes. de40b9d [Josh Rosen] More comments to try to explain metrics code df07699 [Josh Rosen] Attempt to clarify confusing metrics update code 5e189c6 [Josh Rosen] Track time spend closing / flushing files; split TimeTrackingOutputStream into separate file. d5779c6 [Josh Rosen] Merge remote-tracking branch 'origin/master' into unsafe-sort c2ce78e [Josh Rosen] Fix a missed usage of MAX_PARTITION_ID e3b8855 [Josh Rosen] Cleanup in UnsafeShuffleWriter 4a2c785 [Josh Rosen] rename 'sort buffer' to 'pointer array' 6276168 [Josh Rosen] Remove ability to disable spilling in UnsafeShuffleExternalSorter. 57312c9 [Josh Rosen] Clarify fileBufferSize units 2d4e4f4 [Josh Rosen] Address some minor comments in UnsafeShuffleExternalSorter. fdcac08 [Josh Rosen] Guard against overflow when expanding sort buffer. 85da63f [Josh Rosen] Cleanup in UnsafeShuffleSorterIterator. 0ad34da [Josh Rosen] Fix off-by-one in nextInt() call 56781a1 [Josh Rosen] Rename UnsafeShuffleSorter to UnsafeShuffleInMemorySorter e995d1a [Josh Rosen] Introduce MAX_SHUFFLE_OUTPUT_PARTITIONS. e58a6b4 [Josh Rosen] Add more tests for PackedRecordPointer encoding. 4f0b770 [Josh Rosen] Attempt to implement proper shuffle write metrics. d4e6d89 [Josh Rosen] Update to bit shifting constants 69d5899 [Josh Rosen] Remove some unnecessary override vals 8531286 [Josh Rosen] Add tests that automatically trigger spills. 7c953f9 [Josh Rosen] Add test that covers UnsafeShuffleSortDataFormat.swap(). e1855e5 [Josh Rosen] Fix a handful of misc. IntelliJ inspections 39434f9 [Josh Rosen] Avoid integer multiplication overflow in getMemoryUsage (thanks FindBugs!) 1e3ad52 [Josh Rosen] Delete unused ByteBufferOutputStream class. ea4f85f [Josh Rosen] Roll back an unnecessary change in Spillable. ae538dc [Josh Rosen] Document UnsafeShuffleManager. ec6d626 [Josh Rosen] Add notes on maximum # of supported shuffle partitions. 0d4d199 [Josh Rosen] Bump up shuffle.memoryFraction to make tests pass. b3b1924 [Josh Rosen] Properly implement close() and flush() in DummySerializerInstance. 1ef56c7 [Josh Rosen] Revise compression codec support in merger; test cross product of configurations. b57c17f [Josh Rosen] Disable some overly-verbose logs that rendered DEBUG useless. f780fb1 [Josh Rosen] Add test demonstrating which compression codecs support concatenation. 4a01c45 [Josh Rosen] Remove unnecessary log message 27b18b0 [Josh Rosen] That for inserting records AT the max record size. fcd9a3c [Josh Rosen] Add notes + tests for maximum record / page sizes. 9d1ee7c [Josh Rosen] Fix MiMa excludes for ShuffleWriter change fd4bb9e [Josh Rosen] Use own ByteBufferOutputStream rather than Kryo's 67d25ba [Josh Rosen] Update Exchange operator's copying logic to account for new shuffle manager 8f5061a [Josh Rosen] Strengthen assertion to check partitioning 01afc74 [Josh Rosen] Actually read data in UnsafeShuffleWriterSuite 1929a74 [Josh Rosen] Update to reflect upstream ShuffleBlockManager -> ShuffleBlockResolver rename. e8718dd [Josh Rosen] Merge remote-tracking branch 'origin/master' into unsafe-sort 9b7ebed [Josh Rosen] More defensive programming RE: cleaning up spill files and memory after errors 7cd013b [Josh Rosen] Begin refactoring to enable proper tests for spilling. 722849b [Josh Rosen] Add workaround for transferTo() bug in merging code; refactor tests. 9883e30 [Josh Rosen] Merge remote-tracking branch 'origin/master' into unsafe-sort b95e642 [Josh Rosen] Refactor and document logic that decides when to spill. 1ce1300 [Josh Rosen] More minor cleanup 5e8cf75 [Josh Rosen] More minor cleanup e67f1ea [Josh Rosen] Remove upper type bound in ShuffleWriter interface. cfe0ec4 [Josh Rosen] Address a number of minor review comments: 8a6fe52 [Josh Rosen] Rename UnsafeShuffleSpillWriter to UnsafeShuffleExternalSorter 11feeb6 [Josh Rosen] Update TODOs related to shuffle write metrics. b674412 [Josh Rosen] Merge remote-tracking branch 'origin/master' into unsafe-sort aaea17b [Josh Rosen] Add comments to UnsafeShuffleSpillWriter. 4f70141 [Josh Rosen] Fix merging; now passes UnsafeShuffleSuite tests. 133c8c9 [Josh Rosen] WIP towards testing UnsafeShuffleWriter. f480fb2 [Josh Rosen] WIP in mega-refactoring towards shuffle-specific sort. 57f1ec0 [Josh Rosen] WIP towards packed record pointers for use in optimized shuffle sort. 69232fd [Josh Rosen] Enable compressible address encoding for off-heap mode. 7ee918e [Josh Rosen] Re-order imports in tests 3aeaff7 [Josh Rosen] More refactoring and cleanup; begin cleaning iterator interfaces 3490512 [Josh Rosen] Misc. cleanup f156a8f [Josh Rosen] Hacky metrics integration; refactor some interfaces. 2776aca [Josh Rosen] First passing test for ExternalSorter. 5e100b2 [Josh Rosen] Super-messy WIP on external sort 595923a [Josh Rosen] Remove some unused variables. 8958584 [Josh Rosen] Fix bug in calculating free space in current page. f17fa8f [Josh Rosen] Add missing newline c2fca17 [Josh Rosen] Small refactoring of SerializerPropertiesSuite to enable test re-use: b8a09fe [Josh Rosen] Back out accidental log4j.properties change bfc12d3 [Josh Rosen] Add tests for serializer relocation property. 240864c [Josh Rosen] Remove PrefixComputer and require prefix to be specified as part of insert() 1433b42 [Josh Rosen] Store record length as int instead of long. 026b497 [Josh Rosen] Re-use a buffer in UnsafeShuffleWriter 0748458 [Josh Rosen] Port UnsafeShuffleWriter to Java. 87e721b [Josh Rosen] Renaming and comments d3cc310 [Josh Rosen] Flag that SparkSqlSerializer2 supports relocation e2d96ca [Josh Rosen] Expand serializer API and use new function to help control when new UnsafeShuffle path is used. e267cee [Josh Rosen] Fix compilation of UnsafeSorterSuite 9c6cf58 [Josh Rosen] Refactor to use DiskBlockObjectWriter. 253f13e [Josh Rosen] More cleanup 8e3ec20 [Josh Rosen] Begin code cleanup. 4d2f5e1 [Josh Rosen] WIP 3db12de [Josh Rosen] Minor simplification and sanity checks in UnsafeSorter 767d3ca [Josh Rosen] Fix invalid range in UnsafeSorter. e900152 [Josh Rosen] Add test for empty iterator in UnsafeSorter 57a4ea0 [Josh Rosen] Make initialSize configurable in UnsafeSorter abf7bfe [Josh Rosen] Add basic test case. 81d52c5 [Josh Rosen] WIP on UnsafeSorter
Refer to the JIRA for the design doc and some perf results. I wanted to call out some of the more possibly controversial changes up front: * Map outputs are only stored in serialized form when Kryo is in use. I'm still unsure whether Java-serialized objects can be relocated. At the very least, Java serialization writes out a stream header which causes problems with the current approach, so I decided to leave investigating this to future work. * The shuffle now explicitly operates on key-value pairs instead of any object. Data is written to shuffle files in alternating keys and values instead of key-value tuples. `BlockObjectWriter.write` now accepts a key argument and a value argument instead of any object. * The map output buffer can hold a max of Integer.MAX_VALUE bytes. Though this wouldn't be terribly difficult to change. * When spilling occurs, the objects that still in memory at merge time end up serialized and deserialized an extra time. Author: Sandy Ryza <sandy@cloudera.com> Closes apache#4450 from sryza/sandy-spark-4550 and squashes the following commits: 8c70dd9 [Sandy Ryza] Fix serialization 9c16fe6 [Sandy Ryza] Fix a couple tests and move getAutoReset to KryoSerializerInstance 6c54e06 [Sandy Ryza] Fix scalastyle d8462d8 [Sandy Ryza] SPARK-4550
JoshRosen this PR addresses the comments you left on apache#4450 after it got merged. Author: Sandy Ryza <sandy@cloudera.com> Closes apache#5916 from sryza/sandy-spark-4550-cleanup and squashes the following commits: dee3d85 [Sandy Ryza] Some minor cleanup after SPARK-4550.
…ializers support object relocation This patch extends the `Serializer` interface with a new `Private` API which allows serializers to indicate whether they support relocation of serialized objects in serializer stream output. This relocatibilty property is described in more detail in `Serializer.scala`, but in a nutshell a serializer supports relocation if reordering the bytes of serialized objects in serialization stream output is equivalent to having re-ordered those elements prior to serializing them. The optimized shuffle path introduced in apache#4450 and apache#5868 both rely on serializers having this property; this patch just centralizes the logic for determining whether a serializer has this property. I also added tests and comments clarifying when this works for KryoSerializer. This change allows the optimizations in apache#4450 to be applied for shuffles that use `SqlSerializer2`. Author: Josh Rosen <joshrosen@databricks.com> Closes apache#5924 from JoshRosen/SPARK-7311 and squashes the following commits: 50a68ca [Josh Rosen] Address minor nits 0a7ebd7 [Josh Rosen] Clarify reason why SqlSerializer2 supports this serializer 123b992 [Josh Rosen] Cleanup for submitting as standalone patch. 4aa61b2 [Josh Rosen] Add missing newline 2c1233a [Josh Rosen] Small refactoring of SerializerPropertiesSuite to enable test re-use: 0ba75e6 [Josh Rosen] Add tests for serializer relocation property. 450fa21 [Josh Rosen] Back out accidental log4j.properties change 86d4dcd [Josh Rosen] Flag that SparkSqlSerializer2 supports relocation b9624ee [Josh Rosen] Expand serializer API and use new function to help control when new UnsafeShuffle path is used.
…apOutputs takes effect This patch refactors the SQL `Exchange` operator's logic for determining whether map outputs need to be copied before being shuffled. As part of this change, we'll now avoid unnecessary copies in cases where sort-based shuffle operates on serialized map outputs (as in apache#4450 / SPARK-4550). This patch also includes a change to copy the input to RangePartitioner partition bounds calculation, which is necessary because this calculation buffers mutable Java objects. <!-- Reviewable:start --> [<img src="https://reviewable.io/review_button.png" height=40 alt="Review on Reviewable"/>](https://reviewable.io/reviews/apache/spark/5948) <!-- Reviewable:end --> Author: Josh Rosen <joshrosen@databricks.com> Closes apache#5948 from JoshRosen/SPARK-7375 and squashes the following commits: f305ff3 [Josh Rosen] Reduce scope of some variables in Exchange 899e1d7 [Josh Rosen] Merge remote-tracking branch 'origin/master' into SPARK-7375 6a6bfce [Josh Rosen] Fix issue related to RangePartitioning: ad006a4 [Josh Rosen] [SPARK-7375] Avoid defensive copying in exchange operator when sort.serializeMapOutputs takes effect.
…ache-aware sort This patch introduces a new shuffle manager that enhances the existing sort-based shuffle with a new cache-friendly sort algorithm that operates directly on binary data. The goals of this patch are to lower memory usage and Java object overheads during shuffle and to speed up sorting. It also lays groundwork for follow-up patches that will enable end-to-end processing of serialized records. The new shuffle manager, `UnsafeShuffleManager`, can be enabled by setting `spark.shuffle.manager=tungsten-sort` in SparkConf. The new shuffle manager uses directly-managed memory to implement several performance optimizations for certain types of shuffles. In cases where the new performance optimizations cannot be applied, the new shuffle manager delegates to SortShuffleManager to handle those shuffles. UnsafeShuffleManager's optimizations will apply when _all_ of the following conditions hold: - The shuffle dependency specifies no aggregation or output ordering. - The shuffle serializer supports relocation of serialized values (this is currently supported by KryoSerializer and Spark SQL's custom serializers). - The shuffle produces fewer than 16777216 output partitions. - No individual record is larger than 128 MB when serialized. In addition, extra spill-merging optimizations are automatically applied when the shuffle compression codec supports concatenation of serialized streams. This is currently supported by Spark's LZF serializer. At a high-level, UnsafeShuffleManager's design is similar to Spark's existing SortShuffleManager. In sort-based shuffle, incoming records are sorted according to their target partition ids, then written to a single map output file. Reducers fetch contiguous regions of this file in order to read their portion of the map output. In cases where the map output data is too large to fit in memory, sorted subsets of the output can are spilled to disk and those on-disk files are merged to produce the final output file. UnsafeShuffleManager optimizes this process in several ways: - Its sort operates on serialized binary data rather than Java objects, which reduces memory consumption and GC overheads. This optimization requires the record serializer to have certain properties to allow serialized records to be re-ordered without requiring deserialization. See SPARK-4550, where this optimization was first proposed and implemented, for more details. - It uses a specialized cache-efficient sorter (UnsafeShuffleExternalSorter) that sorts arrays of compressed record pointers and partition ids. By using only 8 bytes of space per record in the sorting array, this fits more of the array into cache. - The spill merging procedure operates on blocks of serialized records that belong to the same partition and does not need to deserialize records during the merge. - When the spill compression codec supports concatenation of compressed data, the spill merge simply concatenates the serialized and compressed spill partitions to produce the final output partition. This allows efficient data copying methods, like NIO's `transferTo`, to be used and avoids the need to allocate decompression or copying buffers during the merge. The shuffle read path is unchanged. This patch is similar to [SPARK-4550](http://issues.apache.org/jira/browse/SPARK-4550) / apache#4450 but uses a slightly different implementation. The `unsafe`-based implementation featured in this patch lays the groundwork for followup patches that will enable sorting to operate on serialized data pages that will be prepared by Spark SQL's new `unsafe` operators (such as the new aggregation operator introduced in apache#5725). ### Future work There are several tasks that build upon this patch, which will be left to future work: - [SPARK-7271](https://issues.apache.org/jira/browse/SPARK-7271) Redesign / extend the shuffle interfaces to accept binary data as input. The goal here is to let us bypass serialization steps in cases where the sort input is produced by an operator that operates directly on binary data. - Extension / redesign of the `Serializer` API. We can add new methods which allow serializers to determine the size requirements for serializing objects and for serializing objects directly to a specified memory address (similar to how `UnsafeRowConverter` works in Spark SQL). <!-- Reviewable:start --> [<img src="https://reviewable.io/review_button.png" height=40 alt="Review on Reviewable"/>](https://reviewable.io/reviews/apache/spark/5868) <!-- Reviewable:end --> Author: Josh Rosen <joshrosen@databricks.com> Closes apache#5868 from JoshRosen/unsafe-sort and squashes the following commits: ef0a86e [Josh Rosen] Fix scalastyle errors 7610f2f [Josh Rosen] Add tests for proper cleanup of shuffle data. d494ffe [Josh Rosen] Fix deserialization of JavaSerializer instances. 52a9981 [Josh Rosen] Fix some bugs in the address packing code. 51812a7 [Josh Rosen] Change shuffle manager sort name to tungsten-sort 4023fa4 [Josh Rosen] Add @Private annotation to some Java classes. de40b9d [Josh Rosen] More comments to try to explain metrics code df07699 [Josh Rosen] Attempt to clarify confusing metrics update code 5e189c6 [Josh Rosen] Track time spend closing / flushing files; split TimeTrackingOutputStream into separate file. d5779c6 [Josh Rosen] Merge remote-tracking branch 'origin/master' into unsafe-sort c2ce78e [Josh Rosen] Fix a missed usage of MAX_PARTITION_ID e3b8855 [Josh Rosen] Cleanup in UnsafeShuffleWriter 4a2c785 [Josh Rosen] rename 'sort buffer' to 'pointer array' 6276168 [Josh Rosen] Remove ability to disable spilling in UnsafeShuffleExternalSorter. 57312c9 [Josh Rosen] Clarify fileBufferSize units 2d4e4f4 [Josh Rosen] Address some minor comments in UnsafeShuffleExternalSorter. fdcac08 [Josh Rosen] Guard against overflow when expanding sort buffer. 85da63f [Josh Rosen] Cleanup in UnsafeShuffleSorterIterator. 0ad34da [Josh Rosen] Fix off-by-one in nextInt() call 56781a1 [Josh Rosen] Rename UnsafeShuffleSorter to UnsafeShuffleInMemorySorter e995d1a [Josh Rosen] Introduce MAX_SHUFFLE_OUTPUT_PARTITIONS. e58a6b4 [Josh Rosen] Add more tests for PackedRecordPointer encoding. 4f0b770 [Josh Rosen] Attempt to implement proper shuffle write metrics. d4e6d89 [Josh Rosen] Update to bit shifting constants 69d5899 [Josh Rosen] Remove some unnecessary override vals 8531286 [Josh Rosen] Add tests that automatically trigger spills. 7c953f9 [Josh Rosen] Add test that covers UnsafeShuffleSortDataFormat.swap(). e1855e5 [Josh Rosen] Fix a handful of misc. IntelliJ inspections 39434f9 [Josh Rosen] Avoid integer multiplication overflow in getMemoryUsage (thanks FindBugs!) 1e3ad52 [Josh Rosen] Delete unused ByteBufferOutputStream class. ea4f85f [Josh Rosen] Roll back an unnecessary change in Spillable. ae538dc [Josh Rosen] Document UnsafeShuffleManager. ec6d626 [Josh Rosen] Add notes on maximum # of supported shuffle partitions. 0d4d199 [Josh Rosen] Bump up shuffle.memoryFraction to make tests pass. b3b1924 [Josh Rosen] Properly implement close() and flush() in DummySerializerInstance. 1ef56c7 [Josh Rosen] Revise compression codec support in merger; test cross product of configurations. b57c17f [Josh Rosen] Disable some overly-verbose logs that rendered DEBUG useless. f780fb1 [Josh Rosen] Add test demonstrating which compression codecs support concatenation. 4a01c45 [Josh Rosen] Remove unnecessary log message 27b18b0 [Josh Rosen] That for inserting records AT the max record size. fcd9a3c [Josh Rosen] Add notes + tests for maximum record / page sizes. 9d1ee7c [Josh Rosen] Fix MiMa excludes for ShuffleWriter change fd4bb9e [Josh Rosen] Use own ByteBufferOutputStream rather than Kryo's 67d25ba [Josh Rosen] Update Exchange operator's copying logic to account for new shuffle manager 8f5061a [Josh Rosen] Strengthen assertion to check partitioning 01afc74 [Josh Rosen] Actually read data in UnsafeShuffleWriterSuite 1929a74 [Josh Rosen] Update to reflect upstream ShuffleBlockManager -> ShuffleBlockResolver rename. e8718dd [Josh Rosen] Merge remote-tracking branch 'origin/master' into unsafe-sort 9b7ebed [Josh Rosen] More defensive programming RE: cleaning up spill files and memory after errors 7cd013b [Josh Rosen] Begin refactoring to enable proper tests for spilling. 722849b [Josh Rosen] Add workaround for transferTo() bug in merging code; refactor tests. 9883e30 [Josh Rosen] Merge remote-tracking branch 'origin/master' into unsafe-sort b95e642 [Josh Rosen] Refactor and document logic that decides when to spill. 1ce1300 [Josh Rosen] More minor cleanup 5e8cf75 [Josh Rosen] More minor cleanup e67f1ea [Josh Rosen] Remove upper type bound in ShuffleWriter interface. cfe0ec4 [Josh Rosen] Address a number of minor review comments: 8a6fe52 [Josh Rosen] Rename UnsafeShuffleSpillWriter to UnsafeShuffleExternalSorter 11feeb6 [Josh Rosen] Update TODOs related to shuffle write metrics. b674412 [Josh Rosen] Merge remote-tracking branch 'origin/master' into unsafe-sort aaea17b [Josh Rosen] Add comments to UnsafeShuffleSpillWriter. 4f70141 [Josh Rosen] Fix merging; now passes UnsafeShuffleSuite tests. 133c8c9 [Josh Rosen] WIP towards testing UnsafeShuffleWriter. f480fb2 [Josh Rosen] WIP in mega-refactoring towards shuffle-specific sort. 57f1ec0 [Josh Rosen] WIP towards packed record pointers for use in optimized shuffle sort. 69232fd [Josh Rosen] Enable compressible address encoding for off-heap mode. 7ee918e [Josh Rosen] Re-order imports in tests 3aeaff7 [Josh Rosen] More refactoring and cleanup; begin cleaning iterator interfaces 3490512 [Josh Rosen] Misc. cleanup f156a8f [Josh Rosen] Hacky metrics integration; refactor some interfaces. 2776aca [Josh Rosen] First passing test for ExternalSorter. 5e100b2 [Josh Rosen] Super-messy WIP on external sort 595923a [Josh Rosen] Remove some unused variables. 8958584 [Josh Rosen] Fix bug in calculating free space in current page. f17fa8f [Josh Rosen] Add missing newline c2fca17 [Josh Rosen] Small refactoring of SerializerPropertiesSuite to enable test re-use: b8a09fe [Josh Rosen] Back out accidental log4j.properties change bfc12d3 [Josh Rosen] Add tests for serializer relocation property. 240864c [Josh Rosen] Remove PrefixComputer and require prefix to be specified as part of insert() 1433b42 [Josh Rosen] Store record length as int instead of long. 026b497 [Josh Rosen] Re-use a buffer in UnsafeShuffleWriter 0748458 [Josh Rosen] Port UnsafeShuffleWriter to Java. 87e721b [Josh Rosen] Renaming and comments d3cc310 [Josh Rosen] Flag that SparkSqlSerializer2 supports relocation e2d96ca [Josh Rosen] Expand serializer API and use new function to help control when new UnsafeShuffle path is used. e267cee [Josh Rosen] Fix compilation of UnsafeSorterSuite 9c6cf58 [Josh Rosen] Refactor to use DiskBlockObjectWriter. 253f13e [Josh Rosen] More cleanup 8e3ec20 [Josh Rosen] Begin code cleanup. 4d2f5e1 [Josh Rosen] WIP 3db12de [Josh Rosen] Minor simplification and sanity checks in UnsafeSorter 767d3ca [Josh Rosen] Fix invalid range in UnsafeSorter. e900152 [Josh Rosen] Add test for empty iterator in UnsafeSorter 57a4ea0 [Josh Rosen] Make initialSize configurable in UnsafeSorter abf7bfe [Josh Rosen] Add basic test case. 81d52c5 [Josh Rosen] WIP on UnsafeSorter
Refer to the JIRA for the design doc and some perf results.
I wanted to call out some of the more possibly controversial changes up front:
BlockObjectWriter.write
now accepts a key argument and a value argument instead of any object.