
[SPARK-26164][SQL] Allow FileFormatWriter to write multiple partitions/buckets without sort #23163

Closed
wants to merge 2 commits

Conversation

c21
Contributor

@c21 c21 commented Nov 28, 2018

What changes were proposed in this pull request?

Currently Spark always requires a local sort on partition/bucket columns before writing to an output table (see write.requiredOrdering in FileFormatWriter.scala). This sort is unnecessary and can be avoided by keeping multiple output writers open concurrently in FileFormatDataWriter.scala.

This PR first does a hash-based write, then falls back to a sort-based write (the current implementation) when the number of open writers exceeds a threshold (controlled by a config). Specifically:

  1. (hash-based write) Maintain a mapping from file path to output writer, and reuse the writer for each input row. If the number of open output writers exceeds a threshold (configurable), go to step 2.

  2. (sort-based write) Sort the remaining input rows (using the same sorter as SortExec), then write the sorted rows, closing each writer on the fly once no more rows remain for the current file path.
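The two steps above can be sketched as follows. This is a simplified, self-contained Python sketch, not the actual Scala implementation in FileFormatDataWriter.scala; the `max_open` parameter stands in for the threshold config, and "writers" are modeled as in-memory lists rather than real file writers.

```python
# Sketch of hash-based write with sort-based fallback (illustrative only).

def write_rows(rows, partition_key, max_open=3):
    """rows: iterable of records; partition_key: record -> output file path."""
    writers = {}   # path -> buffered rows (stands in for an open file writer)
    rest = []      # rows deferred to the sort-based phase

    it = iter(rows)
    # Phase 1 (hash-based): reuse a writer per path until the limit is hit.
    for row in it:
        path = partition_key(row)
        if path in writers:
            writers[path].append(row)      # reuse the existing writer
        elif len(writers) < max_open:
            writers[path] = [row]          # open a new writer
        else:
            # Threshold exceeded: defer this row and all remaining rows.
            rest = [row] + list(it)
            break

    # Phase 2 (sort-based): sort the remaining rows by path and append them.
    # In the real implementation each writer is closed on the fly once its
    # path's rows are exhausted; here we just group the output per path.
    for row in sorted(rest, key=partition_key):
        writers.setdefault(partition_key(row), []).append(row)

    return writers
```

With `max_open=3`, a stream touching four distinct paths exercises both phases: the fourth path triggers the fallback, and its rows (plus any remaining rows) are handled in sorted order.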

How was this patch tested?

Added a unit test in DataFrameReaderWriterSuite.scala. Existing tests such as SQLMetricsSuite.scala already exercise the code path for executor write metrics.

@c21
Contributor Author

c21 commented Nov 28, 2018

cc people who have the most context for review: @cloud-fan, @tejasapatil and @sameeragarwal. Thanks!

@gatorsmile
Member

add to whitelist

@gatorsmile
Member

@c21 Any perf number?

@SparkQA

SparkQA commented Nov 29, 2018

Test build #99412 has finished for PR 23163 at commit c2e81eb.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@c21
Contributor Author

c21 commented Nov 29, 2018

@gatorsmile:

Any perf number?

From my employer's workloads for queries writing dynamic partitions, we saw a >20% reduction in reserved CPU time (executor wall-clock time) and a >20% reduction in disk spill size after rolling out the change to use concurrent writers instead of a sort (i.e. the hash-based write in this PR).

I am not sure whether that is the performance number you were looking for. Let me know if anything else is needed. Thanks.

In addition, I updated the PR, as I found I needed to change BasicWriteTaskStatsTracker as well.

@SparkQA

SparkQA commented Nov 29, 2018

Test build #99432 has finished for PR 23163 at commit a7ddb22.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@c21
Contributor Author

c21 commented Dec 1, 2018

cc @cloud-fan and @gatorsmile:

I think this PR is ready for review. Could you take a look when you have time? Thanks!
The test failure (an unknown error code, -9) seems to be unrelated to my change.

@gatorsmile
Member

retest this please

@SparkQA

SparkQA commented Dec 2, 2018

Test build #99579 has finished for PR 23163 at commit a7ddb22.

  • This patch fails Spark unit tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Dec 3, 2018

Test build #99585 has finished for PR 23163 at commit 6cb993b.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@c21
Contributor Author

c21 commented Dec 3, 2018

Jenkins, retest this please

@SparkQA

SparkQA commented Dec 3, 2018

Test build #99591 has finished for PR 23163 at commit 6cb993b.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@c21
Contributor Author

c21 commented Dec 3, 2018

Jenkins, retest this please

@SparkQA

SparkQA commented Dec 3, 2018

Test build #99624 has finished for PR 23163 at commit 6cb993b.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@heary-cao
Contributor

retest this please

@SparkQA

SparkQA commented Dec 5, 2018

Test build #99708 has finished for PR 23163 at commit 6cb993b.

  • This patch passes all tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jan 3, 2019

Test build #100673 has finished for PR 23163 at commit 7c544ab.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@c21
Contributor Author

c21 commented Jan 3, 2019

Jenkins, retest this please

@SparkQA

SparkQA commented Jan 3, 2019

Test build #100696 has finished for PR 23163 at commit 7c544ab.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Member

retest this please

@SparkQA

SparkQA commented Jan 4, 2019

Test build #100714 has finished for PR 23163 at commit 7c544ab.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile
Member

cc @gengliangwang

@AmplabJenkins

Can one of the admins verify this patch?

@HyukjinKwon
Member

ping @c21 to update or close

@yizhu-wish

Is this still being actively worked on?

@c21
Contributor Author

c21 commented Feb 4, 2020

@HyukjinKwon, @yizhu-wish - I am resuming work on this and will have an update in the next few days, thanks.

@github-actions

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added the Stale label May 15, 2020
@github-actions github-actions bot closed this May 17, 2020
cloud-fan pushed a commit that referenced this pull request Apr 27, 2021
…tions and bucket table

### What changes were proposed in this pull request?

This is a re-proposal of #23163. Currently Spark always requires a [local sort](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala#L188) before writing to an output table with dynamic partition/bucket columns. The sort is unnecessary when the cardinality of partition/bucket values is small, and can be avoided by keeping multiple output writers open concurrently.

This PR introduces a config `spark.sql.maxConcurrentOutputFileWriters` (the feature is disabled by default), which lets users tune the maximum number of concurrent writers. The config is needed because we cannot keep an arbitrary number of writers in task memory, which could cause OOM (especially for the Parquet/ORC vectorized writers).

The feature first uses concurrent writers to write rows. If the number of writers would exceed the configured limit, the remaining rows are sorted and written one by one (see `DynamicPartitionDataConcurrentWriter.writeWithIterator()`).

In addition, the interface `WriteTaskStatsTracker` and its implementation `BasicWriteTaskStatsTracker` are changed, because they previously relied on the assumption that only one writer is active when writing dynamic partitions or a bucketed table.
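The stats-tracker change can be illustrated with a minimal sketch. This is hypothetical Python, not the actual `WriteTaskStatsTracker` Scala API: the idea is that stats are keyed per file path, so row-count events can be attributed correctly while several writers are open at once, instead of assuming a single "current" file.

```python
# Minimal sketch of a stats tracker that tolerates multiple concurrently
# open writers. Class and method names are illustrative, not Spark's API.

class ConcurrentWriteStatsTracker:
    def __init__(self):
        self._rows = {}  # file path -> row count

    def new_file(self, path):
        # May be called for several paths before any of them is closed.
        self._rows.setdefault(path, 0)

    def new_row(self, path):
        # Row events carry their target path, since more than one file
        # can be open at the same time.
        self._rows[path] += 1

    def final_stats(self):
        # Aggregate per-file counts into task-level write metrics.
        return {
            "num_files": len(self._rows),
            "num_rows": sum(self._rows.values()),
        }
```

A tracker that only remembered the file opened last would misattribute rows once writes interleave across files; keying by path makes the accounting order-independent.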

### Why are the changes needed?

Avoid the sort before writing output for dynamically partitioned queries and bucketed tables, which helps improve CPU and IO performance for these queries.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added a unit test in `DataFrameReaderWriterSuite.scala`.

Closes #32198 from c21/writer.

Authored-by: Cheng Su <chengsu@fb.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>