diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala
index 160ee6dc8d555..f3815ab14cb27 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala
@@ -29,6 +29,7 @@ import org.apache.spark.{SparkContext, TaskContext}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.SQLExecution
+import org.apache.spark.sql.execution.datasources.BasicWriteJobStatsTracker._
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.util.SerializableConfiguration
 
@@ -48,7 +49,9 @@ case class BasicWriteTaskStats(
 /**
  * Simple [[WriteTaskStatsTracker]] implementation that produces [[BasicWriteTaskStats]].
  */
-class BasicWriteTaskStatsTracker(hadoopConf: Configuration)
+class BasicWriteTaskStatsTracker(
+    hadoopConf: Configuration,
+    taskCommitTimeMetric: Option[SQLMetric] = None)
   extends WriteTaskStatsTracker with Logging {
 
   private[this] val partitions: mutable.ArrayBuffer[InternalRow] = mutable.ArrayBuffer.empty
@@ -155,7 +158,7 @@ class BasicWriteTaskStatsTracker(hadoopConf: Configuration)
     numRows += 1
   }
 
-  override def getFinalStats(): WriteTaskStats = {
+  override def getFinalStats(taskCommitTime: Long): WriteTaskStats = {
     submittedFiles.foreach(updateFileStats)
     submittedFiles.clear()
 
@@ -170,6 +173,7 @@ class BasicWriteTaskStatsTracker(hadoopConf: Configuration)
         "This could be due to the output format not writing empty files, " +
         "or files being not immediately visible in the filesystem.")
     }
+    taskCommitTimeMetric.foreach(_ += taskCommitTime)
     BasicWriteTaskStats(partitions.toSeq, numFiles, numBytes, numRows)
   }
 }
@@ -183,14 +187,21 @@ class BasicWriteTaskStatsTracker(hadoopConf: Configuration)
  */
 class BasicWriteJobStatsTracker(
     serializableHadoopConf: SerializableConfiguration,
-    @transient val metrics: Map[String, SQLMetric])
+    @transient val driverSideMetrics: Map[String, SQLMetric],
+    taskCommitTimeMetric: SQLMetric)
   extends WriteJobStatsTracker {
 
+  def this(
+      serializableHadoopConf: SerializableConfiguration,
+      metrics: Map[String, SQLMetric]) = {
+    this(serializableHadoopConf, metrics - TASK_COMMIT_TIME, metrics(TASK_COMMIT_TIME))
+  }
+
   override def newTaskInstance(): WriteTaskStatsTracker = {
-    new BasicWriteTaskStatsTracker(serializableHadoopConf.value)
+    new BasicWriteTaskStatsTracker(serializableHadoopConf.value, Some(taskCommitTimeMetric))
   }
 
-  override def processStats(stats: Seq[WriteTaskStats]): Unit = {
+  override def processStats(stats: Seq[WriteTaskStats], jobCommitTime: Long): Unit = {
     val sparkContext = SparkContext.getActive.get
     var partitionsSet: mutable.Set[InternalRow] = mutable.HashSet.empty
     var numFiles: Long = 0L
@@ -206,13 +217,14 @@ class BasicWriteJobStatsTracker(
       totalNumOutput += summary.numRows
     }
 
-    metrics(BasicWriteJobStatsTracker.NUM_FILES_KEY).add(numFiles)
-    metrics(BasicWriteJobStatsTracker.NUM_OUTPUT_BYTES_KEY).add(totalNumBytes)
-    metrics(BasicWriteJobStatsTracker.NUM_OUTPUT_ROWS_KEY).add(totalNumOutput)
-    metrics(BasicWriteJobStatsTracker.NUM_PARTS_KEY).add(partitionsSet.size)
+    driverSideMetrics(JOB_COMMIT_TIME).add(jobCommitTime)
+    driverSideMetrics(NUM_FILES_KEY).add(numFiles)
+    driverSideMetrics(NUM_OUTPUT_BYTES_KEY).add(totalNumBytes)
+    driverSideMetrics(NUM_OUTPUT_ROWS_KEY).add(totalNumOutput)
+    driverSideMetrics(NUM_PARTS_KEY).add(partitionsSet.size)
 
     val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
-    SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, metrics.values.toList)
+    SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, driverSideMetrics.values.toList)
   }
 }
 
@@ -221,6 +233,8 @@ object BasicWriteJobStatsTracker {
   private val NUM_OUTPUT_BYTES_KEY = "numOutputBytes"
   private val NUM_OUTPUT_ROWS_KEY = "numOutputRows"
   private val NUM_PARTS_KEY = "numParts"
+  val TASK_COMMIT_TIME = "taskCommitTime"
+  val JOB_COMMIT_TIME = "jobCommitTime"
 
   /** XAttr key of the data length header added in HADOOP-17414. */
   val FILE_LENGTH_XATTR = "header.x-hadoop-s3a-magic-data-length"
@@ -230,7 +244,9 @@ object BasicWriteJobStatsTracker {
       NUM_FILES_KEY -> SQLMetrics.createMetric(sparkContext, "number of written files"),
       NUM_OUTPUT_BYTES_KEY -> SQLMetrics.createSizeMetric(sparkContext, "written output"),
       NUM_OUTPUT_ROWS_KEY -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
-      NUM_PARTS_KEY -> SQLMetrics.createMetric(sparkContext, "number of dynamic part")
+      NUM_PARTS_KEY -> SQLMetrics.createMetric(sparkContext, "number of dynamic part"),
+      TASK_COMMIT_TIME -> SQLMetrics.createTimingMetric(sparkContext, "task commit time"),
+      JOB_COMMIT_TIME -> SQLMetrics.createTimingMetric(sparkContext, "job commit time")
     )
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala
index 365a9036e0a1e..815d8ac32341b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala
@@ -33,7 +33,7 @@ import org.apache.spark.sql.execution.datasources.FileFormatWriter.ConcurrentOut
 import org.apache.spark.sql.execution.metric.{CustomMetrics, SQLMetric}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.StringType
-import org.apache.spark.util.SerializableConfiguration
+import org.apache.spark.util.{SerializableConfiguration, Utils}
 
 /**
  * Abstract class for writing out data in a single Spark task.
@@ -103,10 +103,13 @@ abstract class FileFormatDataWriter(
    */
   override def commit(): WriteTaskResult = {
     releaseResources()
+    val (taskCommitMessage, taskCommitTime) = Utils.timeTakenMs {
+      committer.commitTask(taskAttemptContext)
+    }
     val summary = ExecutedWriteSummary(
       updatedPartitions = updatedPartitions.toSet,
-      stats = statsTrackers.map(_.getFinalStats()))
-    WriteTaskResult(committer.commitTask(taskAttemptContext), summary)
+      stats = statsTrackers.map(_.getFinalStats(taskCommitTime)))
+    WriteTaskResult(taskCommitMessage, summary)
   }
 
   def abort(): Unit = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index 6839a4db0bc28..cd3d101ac26e5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -240,7 +240,7 @@ object FileFormatWriter extends Logging {
 
       val (_, duration) = Utils.timeTakenMs { committer.commitJob(job, commitMsgs) }
       logInfo(s"Write Job ${description.uuid} committed. Elapsed time: $duration ms.")
-      processStats(description.statsTrackers, ret.map(_.summary.stats))
+      processStats(description.statsTrackers, ret.map(_.summary.stats), duration)
       logInfo(s"Finished processing stats for write job ${description.uuid}.")
 
       // return a set of all the partition paths that were updated during this job
@@ -328,7 +328,8 @@ object FileFormatWriter extends Logging {
    */
   private[datasources] def processStats(
       statsTrackers: Seq[WriteJobStatsTracker],
-      statsPerTask: Seq[Seq[WriteTaskStats]])
+      statsPerTask: Seq[Seq[WriteTaskStats]],
+      jobCommitDuration: Long)
     : Unit = {
 
     val numStatsTrackers = statsTrackers.length
@@ -345,7 +346,7 @@ object FileFormatWriter extends Logging {
     }
 
     statsTrackers.zip(statsPerTracker).foreach {
-      case (statsTracker, stats) => statsTracker.processStats(stats)
+      case (statsTracker, stats) => statsTracker.processStats(stats, jobCommitDuration)
     }
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteStatsTracker.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteStatsTracker.scala
index f58aa33be8695..157ed0120bf3a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteStatsTracker.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteStatsTracker.scala
@@ -66,10 +66,11 @@ trait WriteTaskStatsTracker {
 
   /**
    * Returns the final statistics computed so far.
+   * @param taskCommitTime Time of committing the task.
   * @note This may only be called once. Further use of the object may lead to undefined behavior.
   * @return An object of subtype of [[WriteTaskStats]], to be sent to the driver.
   */
-  def getFinalStats(): WriteTaskStats
+  def getFinalStats(taskCommitTime: Long): WriteTaskStats
 }
 
 
@@ -93,6 +94,7 @@ trait WriteJobStatsTracker extends Serializable {
   * Process the given collection of stats computed during this job.
   * E.g. aggregate them, write them to memory / disk, issue warnings, whatever.
   * @param stats One [[WriteTaskStats]] object from each successful write task.
+   * @param jobCommitTime Time of committing the job.
   * @note The type of @param `stats` is too generic. These classes should probably be parametrized:
   *   WriteTaskStatsTracker[S <: WriteTaskStats]
   *   WriteJobStatsTracker[S <: WriteTaskStats, T <: WriteTaskStatsTracker[S]]
@@ -103,5 +105,5 @@ trait WriteJobStatsTracker extends Serializable {
   *   to the expected derived type when implementing this method in a derived class.
   *   The framework will make sure to call this with the right arguments.
   */
-  def processStats(stats: Seq[WriteTaskStats]): Unit
+  def processStats(stats: Seq[WriteTaskStats], jobCommitTime: Long): Unit
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileBatchWrite.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileBatchWrite.scala
index 7227e48bc9a11..ead5114d38680 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileBatchWrite.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileBatchWrite.scala
@@ -36,7 +36,7 @@ class FileBatchWrite(
 
     val (_, duration) = Utils.timeTakenMs { committer.commitJob(job, results.map(_.commitMsg)) }
     logInfo(s"Write Job ${description.uuid} committed. Elapsed time: $duration ms.")
-    processStats(description.statsTrackers, results.map(_.summary.stats))
+    processStats(description.statsTrackers, results.map(_.summary.stats), duration)
     logInfo(s"Finished processing stats for write job ${description.uuid}.")
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/BasicWriteTaskStatsTrackerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/BasicWriteTaskStatsTrackerSuite.scala
index 0237679942714..982e428312b7c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/BasicWriteTaskStatsTrackerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/BasicWriteTaskStatsTrackerSuite.scala
@@ -73,7 +73,7 @@ class BasicWriteTaskStatsTrackerSuite extends SparkFunSuite {
   }
 
   private def finalStatus(tracker: BasicWriteTaskStatsTracker): BasicWriteTaskStats = {
-    tracker.getFinalStats().asInstanceOf[BasicWriteTaskStats]
+    tracker.getFinalStats(0L).asInstanceOf[BasicWriteTaskStats]
   }
 
   test("No files in run") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/CustomWriteTaskStatsTrackerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/CustomWriteTaskStatsTrackerSuite.scala
index 82d873a2cd81b..e9f625b2ded9f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/CustomWriteTaskStatsTrackerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/CustomWriteTaskStatsTrackerSuite.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 class CustomWriteTaskStatsTrackerSuite extends SparkFunSuite {
 
   def checkFinalStats(tracker: CustomWriteTaskStatsTracker, result: Map[String, Int]): Unit = {
-    assert(tracker.getFinalStats().asInstanceOf[CustomWriteTaskStats].numRowsPerFile == result)
+    assert(tracker.getFinalStats(0L).asInstanceOf[CustomWriteTaskStats].numRowsPerFile == result)
   }
 
   test("sequential file writing") {
@@ -64,7 +64,7 @@ class CustomWriteTaskStatsTracker extends WriteTaskStatsTracker {
     numRowsPerFile(filePath) += 1
   }
 
-  override def getFinalStats(): WriteTaskStats = {
+  override def getFinalStats(taskCommitTime: Long): WriteTaskStats = {
     CustomWriteTaskStats(numRowsPerFile.toMap)
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index 9d4fbd72b840d..32428fbde0016 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -22,6 +22,9 @@ import java.io.File
 import scala.reflect.{classTag, ClassTag}
 import scala.util.Random
 
+import org.apache.hadoop.mapreduce.TaskAttemptContext
+
+import org.apache.spark.internal.io.FileCommitProtocol
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.expressions.aggregate.{Final, Partial}
 import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
@@ -29,6 +32,7 @@ import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecutionSuite
 import org.apache.spark.sql.execution.aggregate.HashAggregateExec
 import org.apache.spark.sql.execution.command.DataWritingCommandExec
+import org.apache.spark.sql.execution.datasources.{BasicWriteJobStatsTracker, SQLHadoopMapReduceCommitProtocol}
 import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchangeExec}
 import org.apache.spark.sql.execution.joins.ShuffledHashJoinExec
 import org.apache.spark.sql.functions._
@@ -790,6 +794,24 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
     }
   }
 
+  test("SPARK-34399: Add job commit duration metrics for DataWritingCommand") {
+    withSQLConf(SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key ->
+      "org.apache.spark.sql.execution.metric.CustomFileCommitProtocol") {
+      withTable("t", "t2") {
+        sql("CREATE TABLE t(id STRING) USING PARQUET")
+        val df = sql("INSERT INTO TABLE t SELECT 'abc'")
+        val insert = df.queryExecution.executedPlan.collect {
+          case CommandResultExec(_, dataWriting: DataWritingCommandExec, _) => dataWriting.cmd
+        }
+        assert(insert.size == 1)
+        assert(insert.head.metrics.contains(BasicWriteJobStatsTracker.JOB_COMMIT_TIME))
+        assert(insert.head.metrics.contains(BasicWriteJobStatsTracker.TASK_COMMIT_TIME))
+        assert(insert.head.metrics(BasicWriteJobStatsTracker.JOB_COMMIT_TIME).value > 0)
+        assert(insert.head.metrics(BasicWriteJobStatsTracker.TASK_COMMIT_TIME).value > 0)
+      }
+    }
+  }
+
   test("SPARK-34567: Add metrics for CTAS operator") {
     withTable("t") {
       val df = sql("CREATE TABLE t USING PARQUET AS SELECT 1 as a")
@@ -807,3 +829,15 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
     }
   }
 }
+
+case class CustomFileCommitProtocol(
+    jobId: String,
+    path: String,
+    dynamicPartitionOverwrite: Boolean = false)
+  extends SQLHadoopMapReduceCommitProtocol(jobId, path, dynamicPartitionOverwrite) {
+  override def commitTask(
+      taskContext: TaskAttemptContext): FileCommitProtocol.TaskCommitMessage = {
+    Thread.sleep(Random.nextInt(100))
+    super.commitTask(taskContext)
+  }
+}
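
Note for external implementations (not part of the patch): this change adds a parameter to the abstract methods WriteTaskStatsTracker.getFinalStats and WriteJobStatsTracker.processStats, so any custom tracker maintained outside this repository must be updated to the new signatures even if it has no use for the commit durations. A minimal sketch of such an adaptation, using a hypothetical FileCountStats tracker purely for illustration (class and field names here are assumptions, not part of the Spark API):

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.{WriteJobStatsTracker, WriteTaskStats, WriteTaskStatsTracker}

// Hypothetical per-task stats payload: just the number of files a task wrote.
case class FileCountStats(numFiles: Int) extends WriteTaskStats

class FileCountTaskStatsTracker extends WriteTaskStatsTracker {
  private var numFiles = 0

  override def newPartition(partitionValues: InternalRow): Unit = {}
  override def newFile(filePath: String): Unit = { numFiles += 1 }
  override def closeFile(filePath: String): Unit = {}
  override def newRow(filePath: String, row: InternalRow): Unit = {}

  // Was getFinalStats(); the framework now passes the task commit time in ms,
  // which a tracker may simply ignore.
  override def getFinalStats(taskCommitTime: Long): WriteTaskStats = FileCountStats(numFiles)
}

class FileCountJobStatsTracker extends WriteJobStatsTracker {
  override def newTaskInstance(): WriteTaskStatsTracker = new FileCountTaskStatsTracker

  // Was processStats(stats); the job commit time in ms is now passed in as well.
  override def processStats(stats: Seq[WriteTaskStats], jobCommitTime: Long): Unit = {
    val totalFiles = stats.map(_.asInstanceOf[FileCountStats].numFiles).sum
    // e.g. log or publish totalFiles together with jobCommitTime
  }
}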