[SPARK-34399][SQL][3.2] Add commit duration to SQL tab's graph node
### What changes were proposed in this pull request?
Since we already log the commit time, this information is useful and we can surface it directly in the SQL tab's UI.

![image](https://user-images.githubusercontent.com/46485123/126647754-dc3ba83a-5391-427c-8a67-e6af46e82290.png)

### Why are the changes needed?
Lets users see the commit duration directly in the UI.

### Does this PR introduce _any_ user-facing change?
Yes. Users can see the file commit duration in the SQL tab's SQL plan graph.

### How was this patch tested?
Manually tested.

Closes #33553 from AngersZhuuuu/SPARK-34399-FOLLOWUP.

Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
AngersZhuuuu authored and cloud-fan committed Jul 30, 2021
1 parent 26b8297 commit a96e9e1
Showing 8 changed files with 79 additions and 23 deletions.
@@ -29,6 +29,7 @@ import org.apache.spark.{SparkContext, TaskContext}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.execution.datasources.BasicWriteJobStatsTracker._
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.util.SerializableConfiguration

@@ -48,7 +49,9 @@ case class BasicWriteTaskStats(
/**
* Simple [[WriteTaskStatsTracker]] implementation that produces [[BasicWriteTaskStats]].
*/
class BasicWriteTaskStatsTracker(hadoopConf: Configuration)
class BasicWriteTaskStatsTracker(
hadoopConf: Configuration,
taskCommitTimeMetric: Option[SQLMetric] = None)
extends WriteTaskStatsTracker with Logging {

private[this] val partitions: mutable.ArrayBuffer[InternalRow] = mutable.ArrayBuffer.empty
@@ -155,7 +158,7 @@ class BasicWriteTaskStatsTracker(hadoopConf: Configuration)
numRows += 1
}

override def getFinalStats(): WriteTaskStats = {
override def getFinalStats(taskCommitTime: Long): WriteTaskStats = {
submittedFiles.foreach(updateFileStats)
submittedFiles.clear()

@@ -170,6 +173,7 @@ class BasicWriteTaskStatsTracker(hadoopConf: Configuration)
"This could be due to the output format not writing empty files, " +
"or files being not immediately visible in the filesystem.")
}
taskCommitTimeMetric.foreach(_ += taskCommitTime)
BasicWriteTaskStats(partitions.toSeq, numFiles, numBytes, numRows)
}
}
@@ -183,14 +187,21 @@ class BasicWriteTaskStatsTracker(hadoopConf: Configuration)
*/
class BasicWriteJobStatsTracker(
serializableHadoopConf: SerializableConfiguration,
@transient val metrics: Map[String, SQLMetric])
@transient val driverSideMetrics: Map[String, SQLMetric],
taskCommitTimeMetric: SQLMetric)
extends WriteJobStatsTracker {

def this(
serializableHadoopConf: SerializableConfiguration,
metrics: Map[String, SQLMetric]) = {
this(serializableHadoopConf, metrics - TASK_COMMIT_TIME, metrics(TASK_COMMIT_TIME))
}

override def newTaskInstance(): WriteTaskStatsTracker = {
new BasicWriteTaskStatsTracker(serializableHadoopConf.value)
new BasicWriteTaskStatsTracker(serializableHadoopConf.value, Some(taskCommitTimeMetric))
}

override def processStats(stats: Seq[WriteTaskStats]): Unit = {
override def processStats(stats: Seq[WriteTaskStats], jobCommitTime: Long): Unit = {
val sparkContext = SparkContext.getActive.get
var partitionsSet: mutable.Set[InternalRow] = mutable.HashSet.empty
var numFiles: Long = 0L
@@ -206,13 +217,14 @@ class BasicWriteJobStatsTracker(
totalNumOutput += summary.numRows
}

metrics(BasicWriteJobStatsTracker.NUM_FILES_KEY).add(numFiles)
metrics(BasicWriteJobStatsTracker.NUM_OUTPUT_BYTES_KEY).add(totalNumBytes)
metrics(BasicWriteJobStatsTracker.NUM_OUTPUT_ROWS_KEY).add(totalNumOutput)
metrics(BasicWriteJobStatsTracker.NUM_PARTS_KEY).add(partitionsSet.size)
driverSideMetrics(JOB_COMMIT_TIME).add(jobCommitTime)
driverSideMetrics(NUM_FILES_KEY).add(numFiles)
driverSideMetrics(NUM_OUTPUT_BYTES_KEY).add(totalNumBytes)
driverSideMetrics(NUM_OUTPUT_ROWS_KEY).add(totalNumOutput)
driverSideMetrics(NUM_PARTS_KEY).add(partitionsSet.size)

val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, metrics.values.toList)
SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, driverSideMetrics.values.toList)
}
}

@@ -221,6 +233,8 @@ object BasicWriteJobStatsTracker {
private val NUM_OUTPUT_BYTES_KEY = "numOutputBytes"
private val NUM_OUTPUT_ROWS_KEY = "numOutputRows"
private val NUM_PARTS_KEY = "numParts"
val TASK_COMMIT_TIME = "taskCommitTime"
val JOB_COMMIT_TIME = "jobCommitTime"
/** XAttr key of the data length header added in HADOOP-17414. */
val FILE_LENGTH_XATTR = "header.x-hadoop-s3a-magic-data-length"

@@ -230,7 +244,9 @@
NUM_FILES_KEY -> SQLMetrics.createMetric(sparkContext, "number of written files"),
NUM_OUTPUT_BYTES_KEY -> SQLMetrics.createSizeMetric(sparkContext, "written output"),
NUM_OUTPUT_ROWS_KEY -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
NUM_PARTS_KEY -> SQLMetrics.createMetric(sparkContext, "number of dynamic part")
NUM_PARTS_KEY -> SQLMetrics.createMetric(sparkContext, "number of dynamic part"),
TASK_COMMIT_TIME -> SQLMetrics.createTimingMetric(sparkContext, "task commit time"),
JOB_COMMIT_TIME -> SQLMetrics.createTimingMetric(sparkContext, "job commit time")
)
}
}
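
For call sites, the split between driver-side and executor-side metrics is transparent. The sketch below is not part of this change: the wrapper object and method names are invented for illustration, and it is placed in the same package only so the `private[spark]` `SerializableConfiguration` is accessible. It shows how the full metric map, now including the two commit-time entries, can still be passed as a whole; the auxiliary two-argument constructor removes `TASK_COMMIT_TIME` from the driver-side map and wires it into every task-level tracker.

```scala
// Hedged usage sketch, assuming Spark 3.2 on the classpath; the object and
// method names below are hypothetical and not part of the diff.
package org.apache.spark.sql.execution.datasources

import org.apache.hadoop.conf.Configuration

import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.util.SerializableConfiguration

object BasicWriteStatsTrackerUsageSketch {
  def newStatsTracker(
      hadoopConf: Configuration,
      metrics: Map[String, SQLMetric]): BasicWriteJobStatsTracker = {
    // `metrics` is assumed to contain the taskCommitTime and jobCommitTime
    // entries created above; the two-argument constructor keeps the rest as
    // driver-side metrics and forwards taskCommitTime to each task tracker.
    new BasicWriteJobStatsTracker(new SerializableConfiguration(hadoopConf), metrics)
  }
}
```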
@@ -33,7 +33,7 @@ import org.apache.spark.sql.execution.datasources.FileFormatWriter.ConcurrentOut
import org.apache.spark.sql.execution.metric.{CustomMetrics, SQLMetric}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StringType
import org.apache.spark.util.SerializableConfiguration
import org.apache.spark.util.{SerializableConfiguration, Utils}

/**
* Abstract class for writing out data in a single Spark task.
@@ -103,10 +103,13 @@ abstract class FileFormatDataWriter(
*/
override def commit(): WriteTaskResult = {
releaseResources()
val (taskCommitMessage, taskCommitTime) = Utils.timeTakenMs {
committer.commitTask(taskAttemptContext)
}
val summary = ExecutedWriteSummary(
updatedPartitions = updatedPartitions.toSet,
stats = statsTrackers.map(_.getFinalStats()))
WriteTaskResult(committer.commitTask(taskAttemptContext), summary)
stats = statsTrackers.map(_.getFinalStats(taskCommitTime)))
WriteTaskResult(taskCommitMessage, summary)
}

def abort(): Unit = {
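The `taskCommitTime` that flows into `getFinalStats` is simply the wall-clock duration of `committer.commitTask`, measured with Spark's `Utils.timeTakenMs`. Below is a minimal, self-contained sketch of that timing pattern; `timeTakenMs` here is a standalone stand-in with the same shape as the Spark utility, not the utility itself.

```scala
// Standalone sketch of the timing pattern used in commit() above: run a block
// and return its result together with the elapsed time in milliseconds.
object CommitTimingSketch {
  def timeTakenMs[T](body: => T): (T, Long) = {
    val startNs = System.nanoTime()
    val result = body
    (result, (System.nanoTime() - startNs) / 1000000L)
  }

  def main(args: Array[String]): Unit = {
    // Stand-in for committer.commitTask(taskAttemptContext): any block can be timed.
    val (message, taskCommitTime) = timeTakenMs {
      Thread.sleep(25)
      "task committed"
    }
    // taskCommitTime plays the role of the value handed to getFinalStats.
    println(s"$message in $taskCommitTime ms")
  }
}
```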
@@ -240,7 +240,7 @@ object FileFormatWriter extends Logging {
val (_, duration) = Utils.timeTakenMs { committer.commitJob(job, commitMsgs) }
logInfo(s"Write Job ${description.uuid} committed. Elapsed time: $duration ms.")

processStats(description.statsTrackers, ret.map(_.summary.stats))
processStats(description.statsTrackers, ret.map(_.summary.stats), duration)
logInfo(s"Finished processing stats for write job ${description.uuid}.")

// return a set of all the partition paths that were updated during this job
@@ -328,7 +328,8 @@
*/
private[datasources] def processStats(
statsTrackers: Seq[WriteJobStatsTracker],
statsPerTask: Seq[Seq[WriteTaskStats]])
statsPerTask: Seq[Seq[WriteTaskStats]],
jobCommitDuration: Long)
: Unit = {

val numStatsTrackers = statsTrackers.length
@@ -345,7 +346,7 @@
}

statsTrackers.zip(statsPerTracker).foreach {
case (statsTracker, stats) => statsTracker.processStats(stats)
case (statsTracker, stats) => statsTracker.processStats(stats, jobCommitDuration)
}
}
}
@@ -66,10 +66,11 @@ trait WriteTaskStatsTracker {

/**
* Returns the final statistics computed so far.
* @param taskCommitTime Time of committing the task.
* @note This may only be called once. Further use of the object may lead to undefined behavior.
* @return An object of subtype of [[WriteTaskStats]], to be sent to the driver.
*/
def getFinalStats(): WriteTaskStats
def getFinalStats(taskCommitTime: Long): WriteTaskStats
}


@@ -93,6 +94,7 @@ trait WriteJobStatsTracker extends Serializable {
* Process the given collection of stats computed during this job.
* E.g. aggregate them, write them to memory / disk, issue warnings, whatever.
* @param stats One [[WriteTaskStats]] object from each successful write task.
* @param jobCommitTime Time of committing the job.
* @note The type of @param `stats` is too generic. These classes should probably be parametrized:
* WriteTaskStatsTracker[S <: WriteTaskStats]
* WriteJobStatsTracker[S <: WriteTaskStats, T <: WriteTaskStatsTracker[S]]
@@ -103,5 +105,5 @@ trait WriteJobStatsTracker extends Serializable {
* to the expected derived type when implementing this method in a derived class.
* The framework will make sure to call this with the right arguments.
*/
def processStats(stats: Seq[WriteTaskStats]): Unit
def processStats(stats: Seq[WriteTaskStats], jobCommitTime: Long): Unit
}
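
For third-party trackers the change is source-incompatible but mechanical: `getFinalStats` and `processStats` each gain one parameter, which an implementation is free to ignore. The following is a hedged sketch against the widened traits, not part of the diff: the class name is invented, it is placed in the same package so the `private[spark]` configuration wrapper is accessible, and it only uses the two trait members visible in the hunks above.

```scala
// Hypothetical tracker, shown only to illustrate the widened signatures.
package org.apache.spark.sql.execution.datasources

import org.apache.spark.internal.Logging
import org.apache.spark.util.SerializableConfiguration

class LoggingWriteJobStatsTracker(serializableHadoopConf: SerializableConfiguration)
  extends WriteJobStatsTracker with Logging {

  // No task commit time metric is passed here, so each task-side tracker simply
  // ignores the taskCommitTime handed to getFinalStats.
  override def newTaskInstance(): WriteTaskStatsTracker =
    new BasicWriteTaskStatsTracker(serializableHadoopConf.value)

  override def processStats(stats: Seq[WriteTaskStats], jobCommitTime: Long): Unit = {
    // jobCommitTime is the measured duration of committer.commitJob, in milliseconds.
    logInfo(s"Collected stats from ${stats.length} tasks; job commit took $jobCommitTime ms.")
  }
}
```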
@@ -36,7 +36,7 @@ class FileBatchWrite(
val (_, duration) = Utils.timeTakenMs { committer.commitJob(job, results.map(_.commitMsg)) }
logInfo(s"Write Job ${description.uuid} committed. Elapsed time: $duration ms.")

processStats(description.statsTrackers, results.map(_.summary.stats))
processStats(description.statsTrackers, results.map(_.summary.stats), duration)
logInfo(s"Finished processing stats for write job ${description.uuid}.")
}

@@ -73,7 +73,7 @@ class BasicWriteTaskStatsTrackerSuite extends SparkFunSuite {
}

private def finalStatus(tracker: BasicWriteTaskStatsTracker): BasicWriteTaskStats = {
tracker.getFinalStats().asInstanceOf[BasicWriteTaskStats]
tracker.getFinalStats(0L).asInstanceOf[BasicWriteTaskStats]
}

test("No files in run") {
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.InternalRow
class CustomWriteTaskStatsTrackerSuite extends SparkFunSuite {

def checkFinalStats(tracker: CustomWriteTaskStatsTracker, result: Map[String, Int]): Unit = {
assert(tracker.getFinalStats().asInstanceOf[CustomWriteTaskStats].numRowsPerFile == result)
assert(tracker.getFinalStats(0L).asInstanceOf[CustomWriteTaskStats].numRowsPerFile == result)
}

test("sequential file writing") {
@@ -64,7 +64,7 @@ class CustomWriteTaskStatsTracker extends WriteTaskStatsTracker {
numRowsPerFile(filePath) += 1
}

override def getFinalStats(): WriteTaskStats = {
override def getFinalStats(taskCommitTime: Long): WriteTaskStats = {
CustomWriteTaskStats(numRowsPerFile.toMap)
}
}
@@ -22,13 +22,17 @@ import java.io.File
import scala.reflect.{classTag, ClassTag}
import scala.util.Random

import org.apache.hadoop.mapreduce.TaskAttemptContext

import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.expressions.aggregate.{Final, Partial}
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecutionSuite
import org.apache.spark.sql.execution.aggregate.HashAggregateExec
import org.apache.spark.sql.execution.command.DataWritingCommandExec
import org.apache.spark.sql.execution.datasources.{BasicWriteJobStatsTracker, SQLHadoopMapReduceCommitProtocol}
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchangeExec}
import org.apache.spark.sql.execution.joins.ShuffledHashJoinExec
import org.apache.spark.sql.functions._
@@ -790,6 +794,24 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
}
}

test("SPARK-34399: Add job commit duration metrics for DataWritingCommand") {
withSQLConf(SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key ->
"org.apache.spark.sql.execution.metric.CustomFileCommitProtocol") {
withTable("t", "t2") {
sql("CREATE TABLE t(id STRING) USING PARQUET")
val df = sql("INSERT INTO TABLE t SELECT 'abc'")
val insert = df.queryExecution.executedPlan.collect {
case CommandResultExec(_, dataWriting: DataWritingCommandExec, _) => dataWriting.cmd
}
assert(insert.size == 1)
assert(insert.head.metrics.contains(BasicWriteJobStatsTracker.JOB_COMMIT_TIME))
assert(insert.head.metrics.contains(BasicWriteJobStatsTracker.TASK_COMMIT_TIME))
assert(insert.head.metrics(BasicWriteJobStatsTracker.JOB_COMMIT_TIME).value > 0)
assert(insert.head.metrics(BasicWriteJobStatsTracker.TASK_COMMIT_TIME).value > 0)
}
}
}

test("SPARK-34567: Add metrics for CTAS operator") {
withTable("t") {
val df = sql("CREATE TABLE t USING PARQUET AS SELECT 1 as a")
@@ -807,3 +829,15 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
}
}
}

case class CustomFileCommitProtocol(
jobId: String,
path: String,
dynamicPartitionOverwrite: Boolean = false)
extends SQLHadoopMapReduceCommitProtocol(jobId, path, dynamicPartitionOverwrite) {
override def commitTask(
taskContext: TaskAttemptContext): FileCommitProtocol.TaskCommitMessage = {
Thread.sleep(Random.nextInt(100))
super.commitTask(taskContext)
}
}
