[WIP] Record stage-level metrics when running benchmarks #847

Closed
BenchUtils.scala

@@ -22,12 +22,14 @@ import java.util.concurrent.TimeUnit.NANOSECONDS

import scala.collection.convert.ImplicitConversions.`iterator asScala`
import scala.collection.mutable.ListBuffer
import scala.util.Try

import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods.parse
import org.json4s.jackson.Serialization.writePretty

import org.apache.spark.{SPARK_BUILD_USER, SPARK_VERSION}
import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted}
import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}
import org.apache.spark.sql.execution.{InputAdapter, QueryExecution, SparkPlan, WholeStageCodegenExec}
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, QueryStageExec}
@@ -127,14 +129,16 @@ object BenchUtils {
val queryStartTime = Instant.now()

val queryPlansWithMetrics = new ListBuffer[SparkPlanNode]()
val stageMetrics = new ListBuffer[StageMetrics]()

var df: DataFrame = null
val queryTimes = new ListBuffer[Long]()
for (i <- 0 until iterations) {

// capture spark plan metrics on the final run
// capture spark metrics on the final run
if (i+1 == iterations) {
spark.listenerManager.register(new BenchmarkListener(queryPlansWithMetrics))
spark.sparkContext.addSparkListener(new BenchSparkListener(stageMetrics))
Review comment (Collaborator): We have BenchmarkListener and BenchSparkListener, and it's hard to tell from the names alone what either one does. Perhaps we should give them more meaningful names. I get that they might be used for more than just stage metrics, for instance, but it would be nice to differentiate them somehow, or use a common one.

}

println(s"*** Start iteration $i:")
@@ -211,6 +215,7 @@ object BenchUtils {
queryDescription,
queryPlan,
queryPlansWithMetrics,
stageMetrics,
queryTimes)

case w: WriteCsv => BenchmarkReport(
@@ -223,6 +228,7 @@
queryDescription,
queryPlan,
queryPlansWithMetrics,
stageMetrics,
queryTimes)

case w: WriteParquet => BenchmarkReport(
@@ -235,6 +241,7 @@
queryDescription,
queryPlan,
queryPlansWithMetrics,
stageMetrics,
queryTimes)
}

@@ -549,6 +556,42 @@ class BenchmarkListener(list: ListBuffer[SparkPlanNode]) extends QueryExecutionListener {
}
}

class BenchSparkListener(executionMetrics: ListBuffer[StageMetrics]) extends SparkListener {
Review comment (Collaborator): Perhaps rename executionMetrics to have "stage" in the name, to be clearer.

override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
val stageInfo = stageCompleted.stageInfo
val taskMetrics = stageInfo.taskMetrics

val stageMetrics = stageInfo.accumulables.map(acc => Try {
val name = acc._2.name.getOrElse("")
Review comment (Collaborator): I'm not sure about the usage of Try here. What benefit does it add, given that you already have the getOrElse? Are you expecting _2 to be null? Also, I'm not sure how often it happens, but if multiple accumulators have no name they will all map to "", and we will lose that information; perhaps we should use the id.

val value = acc._2.value.getOrElse(0L).asInstanceOf[Long]
name -> value
}).filter(_.isSuccess)
.map(_.get)
.filter(_._1.nonEmpty)
Review comment (Collaborator): Also, why do we filter out the ones without names here? The API allows you to create an accumulator without a name; I would say we just pass it along with its id (see the sketch after this expression). I guess maybe we are assuming that the ones we create will always have names?

.toMap
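
A hedged sketch of the id-based alternative the reviewer suggests: keying accumulables by id avoids collapsing unnamed accumulators into a single "" entry, and a pattern match on the value replaces both the Try and the getOrElse (the asInstanceOf[Long] on an Any is the likely failure the Try was guarding against). The stageAccumulables name is illustrative, not part of this PR.

```scala
// Hypothetical alternative: key by accumulator id and keep only Long values.
// StageInfo.accumulables maps id -> AccumulableInfo, whose value is Option[Any],
// so the pattern match quietly skips non-Long accumulators instead of throwing.
val stageAccumulables: Map[Long, Long] = stageInfo.accumulables.values.flatMap { acc =>
  acc.value.collect { case v: Long => acc.id -> v }
}.toMap
```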

val taskMetricsSummary = Map(
"executorDeserializeTime" -> taskMetrics.executorDeserializeTime,
"executorDeserializeCpuTime" -> taskMetrics.executorDeserializeCpuTime,
"executorRunTime" -> taskMetrics.executorRunTime,
"executorCpuTime" -> taskMetrics.executorCpuTime,
"resultSize" -> taskMetrics.resultSize,
"jvmGCTime" -> taskMetrics.jvmGCTime,
"resultSerializationTime" -> taskMetrics.resultSerializationTime,
"memoryBytesSpilled" -> taskMetrics.memoryBytesSpilled,
"diskBytesSpilled" -> taskMetrics.diskBytesSpilled,
"peakExecutionMemory" -> taskMetrics.peakExecutionMemory
)

executionMetrics += StageMetrics(
Review comment (Collaborator): Do we also want to record whether the stage failed? (See the sketch after this listener.)

stageInfo.stageId,
stageInfo.parentIds,
stageInfo.numTasks,
stageMetrics,
taskMetricsSummary)
}
}
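
A hedged sketch addressing the review question about failed stages: StageInfo exposes failureReason, an Option[String] that is defined only when the stage failed, so a listener can record both a failed flag and the reason. StageOutcome and StageOutcomeListener are illustrative names, not part of this PR.

```scala
import scala.collection.mutable.ListBuffer

import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted}

// Illustrative only: records whether each completed stage failed and why.
case class StageOutcome(stageId: Int, failed: Boolean, failureReason: Option[String])

class StageOutcomeListener(outcomes: ListBuffer[StageOutcome]) extends SparkListener {
  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
    val info = stageCompleted.stageInfo
    // failureReason is None for successful stages
    outcomes += StageOutcome(info.stageId, info.failureReason.isDefined, info.failureReason)
  }
}
```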

/** Top level benchmark report class */
case class BenchmarkReport(
filename: String,
@@ -560,6 +603,7 @@ case class BenchmarkReport(
query: String,
queryPlan: QueryPlan,
queryPlans: Seq[SparkPlanNode],
stageMetrics: Seq[StageMetrics],
queryTimes: Seq[Long])

/** Configuration options that affect how the tests are run */
@@ -584,6 +628,15 @@ case class SparkSQLMetric(
metricType: String,
value: Any)

/** Summary of stage-level metrics */
case class StageMetrics(
Review comment (Collaborator): Nit: the name is a little confusing because it looks like a Spark name, such as StageInfo. I wonder if we should add something to the name just to make it easier to differentiate. It's not that big a deal, though.

stageId: Int,
parentIds: Seq[Int],
taskCount: Int,
stageMetrics: Map[String, Long],
taskMetrics: Map[String, Long]
)
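
Since BenchUtils already serializes reports with json4s's writePretty (imported at the top of the file), the new case class should round-trip as plain JSON without extra serializers. A minimal sketch, with placeholder metric names and values:

```scala
import org.json4s.DefaultFormats
import org.json4s.jackson.Serialization.writePretty

// Placeholder values, purely to illustrate the serialized shape.
implicit val formats: DefaultFormats.type = DefaultFormats
val sample = StageMetrics(
  stageId = 1,
  parentIds = Seq(0),
  taskCount = 8,
  stageMetrics = Map("internal.metrics.executorRunTime" -> 1234L),
  taskMetrics = Map("executorRunTime" -> 1234L, "jvmGCTime" -> 56L))
println(writePretty(sample))
```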

/** Details about the environment where the benchmark ran */
case class Environment(
envVars: Map[String, String],
BenchUtilsSuite.scala
@@ -21,6 +21,8 @@ import com.nvidia.spark.rapids.AdaptiveQueryExecSuite.TEST_FILES_ROOT
import com.nvidia.spark.rapids.TestUtils
import org.scalatest.{BeforeAndAfterEach, FunSuite}

import org.apache.spark.sql.SparkSession

object BenchUtilsSuite {
val TEST_FILES_ROOT: File = TestUtils.getTempDir(this.getClass.getSimpleName)
}
@@ -35,6 +37,29 @@ class BenchUtilsSuite extends FunSuite with BeforeAndAfterEach {
org.apache.commons.io.FileUtils.deleteDirectory(TEST_FILES_ROOT)
}

test("collect metrics") {
val spark = SparkSession.builder().master("local[*]").getOrCreate()

val filenameStub = "test-collect-metrics"

BenchUtils.runBench(
spark,
spark => spark.range(100).toDF("a"),
Collect(),
queryDescription = "test",
filenameStub = new File(TEST_FILES_ROOT, filenameStub).getAbsolutePath,
iterations = 1,
gcBetweenRuns = false
)

val files = TEST_FILES_ROOT.list((_: File, s: String) => s.startsWith(filenameStub))
assert(files.length == 1)

val report = BenchUtils.readReport(new File(TEST_FILES_ROOT, files.head))
assert(report.stageMetrics.nonEmpty)
}


test("round-trip serialize benchmark results") {

val report = BenchmarkReport(
@@ -50,6 +75,7 @@ class BenchUtilsSuite extends FunSuite with BeforeAndAfterEach {
query = "q1",
queryPlan = QueryPlan("logical", "physical"),
Seq.empty,
Seq.empty,
queryTimes = Seq(99, 88, 77))

val filename = s"$TEST_FILES_ROOT/BenchUtilsSuite-${System.currentTimeMillis()}.json"