[SPARK-26164][SQL] Allow concurrent writers for writing dynamic partitions and bucket table #32198
Conversation
cc @cloud-fan and @maropu could you help take a look when you have time? Thanks.
@c21 would you mind rebasing w/ the latest master branch? Seems like your branch is based on the old master branch.
@HyukjinKwon - thanks for the heads up, updated.
`sealed abstract class WriterMode`
This abstraction is a bit confusing. Single writer or concurrent writers are like a mode that is decided statically, while before-sort and after-sort are more like runtime states instead of modes. I'd expect different `FileFormatDataWriter` implementations for single and concurrent writers, and the concurrent writers implementation has a boolean state to indicate before and after sort.
@cloud-fan - sounds good, I agree with it. Will re-structure the code. Btw, what do you think of the changes in `WriteTaskStatsTracker` and `BasicWriteTaskStatsTracker`? Do you have any concern with those interface changes?
* Keep all writers open and write rows one by one.
* - Step 2: If number of concurrent writers exceeds limit, sort rest of rows. Write rows
*   one by one, and eagerly close the writer when finishing each partition and/or
*   bucket.
does it mean we can have `limit + 1` writers at most?
Yes.
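The two-step strategy described in the doc comment above (concurrent writers first, then a sorted fallback once the writer count would exceed the limit) can be sketched in plain Scala. This is a toy model with buffers standing in for `OutputWriter`s; all names here are illustrative, not the PR's actual implementation.

```scala
import scala.collection.mutable

object ConcurrentWriteSketch {
  // Step 1: keep one "writer" (a buffer) open per key until opening one more
  // would exceed `limit`. Step 2: sort the remaining rows by key so each
  // remaining key can be written and closed eagerly, one key at a time.
  def write(rows: Seq[(String, Int)], limit: Int): Map[String, Seq[Int]] = {
    val writers = mutable.LinkedHashMap.empty[String, mutable.Buffer[Int]]
    val it = rows.iterator
    var spilled: List[(String, Int)] = Nil
    while (it.hasNext && spilled.isEmpty) {
      val (key, value) = it.next()
      if (writers.contains(key) || writers.size < limit) {
        writers.getOrElseUpdate(key, mutable.Buffer.empty[Int]) += value
      } else {
        // Limit reached: fall back to sort-based writing for the rest.
        spilled = (key, value) :: it.toList
      }
    }
    for ((key, value) <- spilled.sortBy(_._1)) {
      writers.getOrElseUpdate(key, mutable.Buffer.empty[Int]) += value
    }
    writers.iterator.map { case (k, v) => k -> v.toSeq }.toMap
  }
}
```

With `limit = 2` and keys arriving as a, b, a, c, b, the first two keys get concurrent writers and the rest go through the sorted path, matching the "at most limit + 1 open at the transition" discussion above.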
var outputWriter: OutputWriter,
var recordsInFile: Long,
var fileCounter: Int,
var filePath: String)
does it mean the latest file path?
Yes, because we may create a new file if the number of records exceeds the per-file limit.
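The rollover behavior described in this reply can be modeled in a few lines. The field names mirror the `WriterStatus` fields quoted above, but the logic and the "part-N" naming scheme are simplified stand-ins, not the PR's actual code.

```scala
// Toy model: `filePath` always points at the latest open file; once
// `recordsInFile` reaches the per-file limit, bump `fileCounter`, reset
// the record count, and switch to a fresh file path.
final class RolloverStatus(
    var recordsInFile: Long,
    var fileCounter: Int,
    var filePath: String)

def writeRecordWithRollover(s: RolloverStatus, maxRecordsPerFile: Long): Unit = {
  if (maxRecordsPerFile > 0 && s.recordsInFile >= maxRecordsPerFile) {
    s.fileCounter += 1
    s.recordsInFile = 0
    s.filePath = s"part-${s.fileCounter}"  // hypothetical naming scheme
  }
  s.recordsInFile += 1
}
```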
@cloud-fan - updated the PR to keep the single and concurrent writers implementations separate. The PR is ready for review again, thanks.
.booleanConf
.createWithDefault(false)

val MAX_CONCURRENT_OUTPUT_WRITERS = buildConf("spark.sql.maxConcurrentOutputWriters")
`maxConcurrentOutputFileWriters`? To indicate it's for file source only.
@cloud-fan - updated.
numFiles += 1
}
curFile = None
private def getFileStats(filePath: String): Unit = {
seems it's not `getFileStats`, but `updateFileStats`
@cloud-fan - updated.
protected val MAX_FILE_COUNTER: Int = 1000 * 1000
protected val updatedPartitions: mutable.Set[String] = mutable.Set[String]()
protected var currentWriter: OutputWriter = _
It seems all `OutputWriter` implementations have a path string. Shall we simply add a `def path: String` in `OutputWriter`? Then we don't need the `currentPath`.
@cloud-fan - makes sense. At first I was hesitant to make a broader change to the `OutputWriter` interface, but it's updated now.
var bucketId: Option[Int])

/** Wrapper class for status of a unique concurrent output writer. */
private case class WriterStatus(
its fields are all `var`, we can make it a class instead of a case class.
@cloud-fan - updated.
dataWriter.write(iterator.next())
dataWriter match {
  case w: DynamicPartitionDataConcurrentWriter =>
    w.writeWithIterator(iterator)
We can make it an API in the base class, which by default just does:

    while (iterator.hasNext) {
      write(iterator.next())
    }
@cloud-fan - I was wondering what the benefit of doing it is, but after a second thought I think I get it: you want to avoid the pattern matching here. Updated.
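The suggestion above — a default `writeWithIterator` in the base class that subclasses may override — can be sketched as follows. The type names are simplified stand-ins, not Spark's actual classes.

```scala
import scala.collection.mutable

trait DataWriterSketch[T] {
  def write(record: T): Unit
  // Default implementation: just drain the iterator record by record.
  // A concurrent-writer subclass would override this with its own logic.
  def writeWithIterator(iterator: Iterator[T]): Unit = {
    while (iterator.hasNext) {
      write(iterator.next())
    }
  }
}

// A trivial writer used only to show that callers no longer need to
// pattern-match on the concrete writer type.
class CollectingWriter extends DataWriterSketch[Int] {
  val seen: mutable.Buffer[Int] = mutable.Buffer.empty[Int]
  override def write(record: Int): Unit = seen += record
}
```

The caller can then invoke `dataWriter.writeWithIterator(iterator)` unconditionally, which is exactly the pattern-matching cleanup discussed in this thread.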
test("SPARK-26164: Allow concurrent writers for multiple partitions and buckets") {
  withTable("t1", "t2") {
    val df = spark.range(200).map(_ => {
      val n = scala.util.Random.nextInt
can we use a fixed seed in the test? Otherwise there is a small possibility that the distinct values are less than 3 and the fallback test doesn't trigger.
@cloud-fan - good call. Updated.
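The fix discussed here — seeding the RNG so the test data is deterministic — looks roughly like the following. The seed value and ranges are illustrative, not the ones in the PR.

```scala
// Build the test data with a fixed seed so the number of distinct values
// is the same on every run, guaranteeing the fallback path is exercised.
val rng = new scala.util.Random(17)
val values = Seq.fill(200)(rng.nextInt(10))
```

With an unseeded `Random`, a rare run could produce too few distinct values and silently skip the fallback branch; a fixed seed removes that flakiness.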
LGTM except some minor comments
Addressed all comments besides the ones I replied to with questions. cc @cloud-fan thanks.
    statsTrackers.foreach(_.newPartition(currentWriterId.partitionValues.get))
  }
}
retrieveWriterInMap()
@cloud-fan - updated.
s" which is beyond max value ${concurrentOutputWriterSpec.maxWriters + 1}")
}
concurrentWriters.put(
  WriterIndex(currentWriterId.partitionValues, currentWriterId.bucketId),
@cloud-fan - updated.
 */
private def clearCurrentWriterStatus(): Unit = {
  if (currentWriterId.partitionValues.isDefined || currentWriterId.bucketId.isDefined) {
    updateCurrentWriterStatusInMap()
shall we call it right after `sorted` becomes true?
I wish I could do it to tighten the logic more, but unfortunately no. We need to write a record (`writeRecord`) between (1) setting `sorted` to true (`setupCurrentWriterUsingMap`) and (2) cleaning up the current writer status (`clearCurrentWriterStatus`). `writeRecord` will increase `recordsInFile` by 1.
thanks, merging to master!
Thank you @cloud-fan for all the dedicated help and careful review!
Late +1, thanks @c21!
@c21 this doesn't do any sort of memory tracking, right? How do you avoid OOMs?
One more thing: how much does this improve the write? Local sorts before the write are typically not too bad if you look at the cycles spent during the write. A much bigger target here would be to properly interleave I/O and CPU operations. You sort of achieve that by having multiple writers, but IMO it feels like quite a big hammer.
Yes. It seems to me there's no way to track the memory usage accurately, because the writer uses on-heap memory, and we would need memory usage information to be available from each individual writer implementation (Parquet, ORC, Avro, etc.), which is not the case right now. One immature idea is to look at executor JVM heap memory usage (which I think is already captured).
Note the feature is designed to be disabled by default, and to be enabled case by case for now. The fallback logic here is intended to avoid OOM when opening too many writers.
I will add a benchmark for this as a followup. IMHO how much this can improve things really depends on query shape (cardinality of dynamic partitions and buckets). In one environment, if most queries have a low number of partitions and users set the number of buckets relatively small, this feature can help more. In another environment, where queries tend to write a lot of partitions and users set the number of buckets quite large, this feature helps less. We do see benefit for improving queries internally, and people raised the request on the Spark dev list as well.
…file the row is written to

What changes were proposed in this pull request?
This is a follow-up of #32198. Before #32198, in `WriteTaskStatsTracker.newRow`, we know that the row is written to the current file. After #32198, we no longer know this connection. This PR adds the file path parameter in `WriteTaskStatsTracker.newRow` to bring back the connection.

Why are the changes needed?
To not break some custom `WriteTaskStatsTracker` implementations.

Does this PR introduce any user-facing change?
No.

How was this patch tested?
N/A

Closes #32459 from cloud-fan/minor.
Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
@c21 Hi~ Any link for benchmark? Thanks~
What changes were proposed in this pull request?
This is a re-proposal of #23163. Currently Spark always requires a local sort before writing to an output table with dynamic partition/bucket columns. The sort can be unnecessary if the cardinality of partition/bucket values is small, and can be avoided by keeping multiple output writers concurrently.
This PR introduces a config `spark.sql.maxConcurrentOutputFileWriters` (which disables this feature by default), where users can tune the maximal number of concurrent writers. The config is needed as we cannot keep an arbitrary number of writers in task memory, which can cause OOM (especially for the Parquet/ORC vectorized writer).

The feature first uses concurrent writers to write rows. If the number of writers exceeds the limit specified by the config, it sorts the rest of the rows and writes them one by one (see `DynamicPartitionDataConcurrentWriter.writeWithIterator()`).

In addition, the interface `WriteTaskStatsTracker` and its implementation `BasicWriteTaskStatsTracker` are also changed, because previously they relied on the assumption that only one writer is active when writing dynamic partitions and bucketed tables.

Why are the changes needed?
Avoid the sort before writing output for dynamic partitioned query and bucketed table.
Help improve CPU and IO performance for these queries.
Does this PR introduce any user-facing change?
No.
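For context, enabling the feature is a plain session config change. This is an illustrative fragment, not code from the PR: the value `3`, the `df`, the partition column `p`, and the output path are all hypothetical, and it requires an active `SparkSession` named `spark`.

```scala
// Cap concurrent file writers at 3 for this session (example value).
spark.conf.set("spark.sql.maxConcurrentOutputFileWriters", "3")
df.write
  .partitionBy("p")          // hypothetical dynamic-partition column
  .parquet("/tmp/output")    // illustrative path
```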
How was this patch tested?
Added unit test in `DataFrameReaderWriterSuite.scala`.