[SPARK-26164][SQL] Allow FileFormatWriter to write multiple partitions/buckets without sort #23163
Conversation
cc people who have the most context for review - @cloud-fan, @tejasapatil and @sameeragarwal. Thanks!
add to whitelist
@c21 Any perf numbers?
Test build #99412 has finished for PR 23163 at commit
From my employer's workload for queries writing dynamic partitions, we see a >20% reduction in reserved CPU time (executor wall-clock time) and a >20% reduction in disk spill size after rolling out the change to use concurrent writers instead of sort (i.e. the hash-based write in this PR). I am not sure whether this is the performance number you were looking for; let me know if anything else is needed. Thanks. In addition, I updated the PR, as I found I need to change
Test build #99432 has finished for PR 23163 at commit
cc @cloud-fan and @gatorsmile: I think this PR is ready for review. Could you take a look when you have time? Thanks!
retest this please
Test build #99579 has finished for PR 23163 at commit
Test build #99585 has finished for PR 23163 at commit
Jenkins, retest this please
Test build #99591 has finished for PR 23163 at commit
Jenkins, retest this please
Test build #99624 has finished for PR 23163 at commit
retest this please
Test build #99708 has finished for PR 23163 at commit
Test build #100673 has finished for PR 23163 at commit
Jenkins, retest this please
Test build #100696 has finished for PR 23163 at commit
retest this please
Test build #100714 has finished for PR 23163 at commit
Can one of the admins verify this patch?
ping @c21 to update or close
Is this still being actively worked on?
@HyukjinKwon, @yizhu-wish - I am resuming work on this and will have an update in the next few days, thanks.
We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable. |
…tions and bucket table

### What changes were proposed in this pull request?

This is a re-proposal of #23163. Currently Spark always requires a [local sort](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala#L188) before writing to an output table with dynamic partition/bucket columns. The sort is unnecessary if the cardinality of the partition/bucket values is small, and can be avoided by keeping multiple output writers open concurrently.

This PR introduces a config `spark.sql.maxConcurrentOutputFileWriters` (which disables the feature by default), where the user can tune the maximal number of concurrent writers. The config is needed because we cannot keep an arbitrary number of writers in task memory, which could cause OOM (especially for the Parquet/ORC vectorized writers). The feature first uses concurrent writers to write rows; if the number of writers exceeds the configured limit, the remaining rows are sorted and written one by one (see `DynamicPartitionDataConcurrentWriter.writeWithIterator()`).

In addition, the interface `WriteTaskStatsTracker` and its implementation `BasicWriteTaskStatsTracker` are also changed, because previously they relied on the assumption that only one writer is active when writing dynamic partitions and bucketed tables.

### Why are the changes needed?

To avoid the sort before writing output for dynamic partitioned queries and bucketed tables, and to help improve CPU and IO performance for these queries.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added a unit test in `DataFrameReaderWriterSuite.scala`.

Closes #32198 from c21/writer.

Authored-by: Cheng Su <chengsu@fb.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
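As the merged description notes, the feature is gated behind `spark.sql.maxConcurrentOutputFileWriters`. A minimal usage sketch, assuming a running Spark 3.2+ session `spark` and an existing DataFrame `df` (the partition column name and output path are hypothetical; the config defaults to 0, i.e. disabled):

```scala
// Sketch: allow up to 8 concurrent output file writers per task, so a
// dynamic-partition write can skip the local sort when the number of
// distinct partition values stays at or below the limit.
spark.conf.set("spark.sql.maxConcurrentOutputFileWriters", 8)

// Hypothetical write with a dynamic partition column "date".
df.write
  .partitionBy("date")
  .parquet("/tmp/output")
```

If a task sees more than the configured number of partition values, it falls back to the sort-based path for the remaining rows, so setting the limit too low only costs the sort, not correctness.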
What changes were proposed in this pull request?
Currently Spark always requires a local sort before writing to an output table on partition/bucket columns (see `write.requiredOrdering` in `FileFormatWriter.scala`), which is unnecessary and can be avoided by keeping multiple output writers open concurrently in `FileFormatDataWriter.scala`.

This PR first does a hash-based write, then falls back to a sort-based write (the current implementation) when the number of opened writers exceeds a threshold (controlled by a config). Specifically:

1. (hash-based write) Maintain a mapping between file path and output writer, and reuse the writer when writing input rows. If the number of opened output writers exceeds a threshold (configurable), go to step 2.
2. (sort-based write) Sort the rest of the input rows (using the same sorter as in `SortExec`), then write the sorted rows, closing each writer on the fly once no more rows remain for the current file path.
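The two-phase strategy above can be sketched in plain Scala. This is a toy model, not Spark's actual `FileFormatDataWriter`: `Row`, `DemoWriter`, `twoPhaseWrite`, and `maxWriters` are hypothetical names for illustration, and the "files" are just in-memory buffers keyed by partition value.

```scala
import scala.collection.mutable

// One record with a dynamic partition value (hypothetical model).
case class Row(partition: String, value: Int)

// Stand-in for a file writer: appends to a per-partition buffer.
class DemoWriter(partition: String, out: mutable.Map[String, mutable.Buffer[Int]]) {
  private val buf = out.getOrElseUpdate(partition, mutable.Buffer.empty[Int])
  def write(r: Row): Unit = buf += r.value
  def close(): Unit = ()
}

def twoPhaseWrite(rows: Iterator[Row], maxWriters: Int): Map[String, Seq[Int]] = {
  val out  = mutable.Map.empty[String, mutable.Buffer[Int]]
  val open = mutable.Map.empty[String, DemoWriter]
  var spill: Option[Row] = None

  // Phase 1 (hash-based): keep one open writer per partition value,
  // reusing it for repeated values, until the limit is hit.
  while (rows.hasNext && spill.isEmpty) {
    val r = rows.next()
    open.get(r.partition) match {
      case Some(w) => w.write(r)
      case None if open.size < maxWriters =>
        val w = new DemoWriter(r.partition, out)
        open(r.partition) = w
        w.write(r)
      case None => spill = Some(r) // too many writers: fall back to sorting
    }
  }
  open.values.foreach(_.close())
  open.clear()

  // Phase 2 (sort-based): sort the remaining rows by partition so each
  // partition is contiguous, then stream with one writer at a time,
  // closing it whenever the partition value changes.
  val rest = (spill.iterator ++ rows).toSeq.sortBy(_.partition)
  var current: Option[DemoWriter] = None
  var currentPart: Option[String] = None
  for (r <- rest) {
    if (!currentPart.contains(r.partition)) {
      current.foreach(_.close())
      current = Some(new DemoWriter(r.partition, out))
      currentPart = Some(r.partition)
    }
    current.get.write(r)
  }
  current.foreach(_.close())
  out.map { case (k, v) => k -> v.toSeq }.toMap
}
```

With `maxWriters = 2` and input `a, b, c, a, b`, the `c` row triggers the fallback: `a` and `b` are written concurrently in phase 1, while `c` and the remaining rows go through the sorted phase.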
How was this patch tested?
Added a unit test in `DataFrameReaderWriterSuite.scala`. Existing tests such as `SQLMetricsSuite.scala` already exercise the code path of executor write metrics.