Skip to content

Commit

Permalink
[SPARK-26164][SQL] Allow concurrent writers for writing dynamic parti…
Browse files Browse the repository at this point in the history
…tions and bucket table

### What changes were proposed in this pull request?

This is a re-proposal of #23163. Currently spark always requires a [local sort](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala#L188) before writing to output table with dynamic partition/bucket columns. The sort can be unnecessary if cardinality of partition/bucket values is small, and can be avoided by keeping multiple output writers concurrently.

This PR introduces a config `spark.sql.maxConcurrentOutputFileWriters` (which disables this feature by default), where user can tune the maximal number of concurrent writers. The config is needed here as we cannot keep arbitrary number of writers in task memory which can cause OOM (especially for Parquet/ORC vectorization writer).

The feature is to first use concurrent writers to write rows. If the number of writers exceeds the above config specified limit. Sort rest of rows and write rows one by one (See `DynamicPartitionDataConcurrentWriter.writeWithIterator()`).

In addition, interface `WriteTaskStatsTracker` and its implementation `BasicWriteTaskStatsTracker` are also changed because previously they are relying on the assumption that only one writer is active for writing dynamic partitions and bucketed table.

### Why are the changes needed?

Avoid the sort before writing output for dynamic partitioned query and bucketed table.
Help improve CPU and IO performance for these queries.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added unit test in `DataFrameReaderWriterSuite.scala`.

Closes #32198 from c21/writer.

Authored-by: Cheng Su <chengsu@fb.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
  • Loading branch information
c21 authored and cloud-fan committed Apr 27, 2021
1 parent 7779fce commit 7f51106
Show file tree
Hide file tree
Showing 18 changed files with 403 additions and 98 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import org.apache.spark.sql.types._

// NOTE: This class is instantiated and used on executor side only, no need to be serializable.
private[avro] class AvroOutputWriter(
path: String,
val path: String,
context: TaskAttemptContext,
schema: StructType,
avroSchema: Schema) extends OutputWriter {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ import org.apache.spark.sql.types._
import org.apache.spark.util.SerializableConfiguration

private[libsvm] class LibSVMOutputWriter(
path: String,
val path: String,
dataSchema: StructType,
context: TaskAttemptContext)
extends OutputWriter {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3150,6 +3150,14 @@ object SQLConf {
.booleanConf
.createWithDefault(false)

val MAX_CONCURRENT_OUTPUT_FILE_WRITERS = buildConf("spark.sql.maxConcurrentOutputFileWriters")
.internal()
.doc("Maximum number of output file writers to use concurrently. If number of writers " +
"needed reaches this limit, task will sort rest of output then writing them.")
.version("3.2.0")
.intConf
.createWithDefault(0)

/**
* Holds information about keys that have been deprecated.
*
Expand Down Expand Up @@ -3839,6 +3847,8 @@ class SQLConf extends Serializable with Logging {

def decorrelateInnerQueryEnabled: Boolean = getConf(SQLConf.DECORRELATE_INNER_QUERY_ENABLED)

def maxConcurrentOutputFileWriters: Int = getConf(SQLConf.MAX_CONCURRENT_OUTPUT_FILE_WRITERS)

/** ********************** SQLConf functionality methods ************ */

/** Set Spark SQL configuration properties. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,11 @@ class BasicWriteTaskStatsTracker(hadoopConf: Configuration)

private[this] val partitions: mutable.ArrayBuffer[InternalRow] = mutable.ArrayBuffer.empty
private[this] var numFiles: Int = 0
private[this] var submittedFiles: Int = 0
private[this] var numSubmittedFiles: Int = 0
private[this] var numBytes: Long = 0L
private[this] var numRows: Long = 0L

private[this] var curFile: Option[String] = None
private[this] val submittedFiles = mutable.HashSet[String]()

/**
* Get the size of the file expected to have been written by a worker.
Expand Down Expand Up @@ -134,23 +134,20 @@ class BasicWriteTaskStatsTracker(hadoopConf: Configuration)
partitions.append(partitionValues)
}

override def newBucket(bucketId: Int): Unit = {
// currently unhandled
override def newFile(filePath: String): Unit = {
submittedFiles += filePath
numSubmittedFiles += 1
}

override def newFile(filePath: String): Unit = {
statCurrentFile()
curFile = Some(filePath)
submittedFiles += 1
override def closeFile(filePath: String): Unit = {
updateFileStats(filePath)
submittedFiles.remove(filePath)
}

private def statCurrentFile(): Unit = {
curFile.foreach { path =>
getFileSize(path).foreach { len =>
numBytes += len
numFiles += 1
}
curFile = None
private def updateFileStats(filePath: String): Unit = {
getFileSize(filePath).foreach { len =>
numBytes += len
numFiles += 1
}
}

Expand All @@ -159,16 +156,17 @@ class BasicWriteTaskStatsTracker(hadoopConf: Configuration)
}

override def getFinalStats(): WriteTaskStats = {
statCurrentFile()
submittedFiles.foreach(updateFileStats)
submittedFiles.clear()

// Reports bytesWritten and recordsWritten to the Spark output metrics.
Option(TaskContext.get()).map(_.taskMetrics().outputMetrics).foreach { outputMetrics =>
outputMetrics.setBytesWritten(numBytes)
outputMetrics.setRecordsWritten(numRows)
}

if (submittedFiles != numFiles) {
logInfo(s"Expected $submittedFiles files, but only saw $numFiles. " +
if (numSubmittedFiles != numFiles) {
logInfo(s"Expected $numSubmittedFiles files, but only saw $numFiles. " +
"This could be due to the output format not writing empty files, " +
"or files being not immediately visible in the filesystem.")
}
Expand Down
Loading

0 comments on commit 7f51106

Please sign in to comment.