-
Notifications
You must be signed in to change notification settings - Fork 232
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[WIP] Record stage-level metrics when running benchmarks #847
Changes from all commits
ed56257
bdaccd0
3150762
9fdaa50
76c347b
1402b54
df9ed3f
94d2f2f
a9685f1
4f4a0f1
7dfd4f3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,12 +22,14 @@ import java.util.concurrent.TimeUnit.NANOSECONDS | |
|
||
import scala.collection.convert.ImplicitConversions.`iterator asScala` | ||
import scala.collection.mutable.ListBuffer | ||
import scala.util.Try | ||
|
||
import org.json4s.DefaultFormats | ||
import org.json4s.jackson.JsonMethods.parse | ||
import org.json4s.jackson.Serialization.writePretty | ||
|
||
import org.apache.spark.{SPARK_BUILD_USER, SPARK_VERSION} | ||
import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted} | ||
import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession} | ||
import org.apache.spark.sql.execution.{InputAdapter, QueryExecution, SparkPlan, WholeStageCodegenExec} | ||
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, QueryStageExec} | ||
|
@@ -127,14 +129,16 @@ object BenchUtils { | |
val queryStartTime = Instant.now() | ||
|
||
val queryPlansWithMetrics = new ListBuffer[SparkPlanNode]() | ||
val stageMetrics = new ListBuffer[StageMetrics]() | ||
|
||
var df: DataFrame = null | ||
val queryTimes = new ListBuffer[Long]() | ||
for (i <- 0 until iterations) { | ||
|
||
// capture spark plan metrics on the final run | ||
// capture spark metrics on the final run | ||
if (i+1 == iterations) { | ||
spark.listenerManager.register(new BenchmarkListener(queryPlansWithMetrics)) | ||
spark.sparkContext.addSparkListener(new BenchSparkListener(stageMetrics)) | ||
} | ||
|
||
println(s"*** Start iteration $i:") | ||
|
@@ -211,6 +215,7 @@ object BenchUtils { | |
queryDescription, | ||
queryPlan, | ||
queryPlansWithMetrics, | ||
stageMetrics, | ||
queryTimes) | ||
|
||
case w: WriteCsv => BenchmarkReport( | ||
|
@@ -223,6 +228,7 @@ object BenchUtils { | |
queryDescription, | ||
queryPlan, | ||
queryPlansWithMetrics, | ||
stageMetrics, | ||
queryTimes) | ||
|
||
case w: WriteParquet => BenchmarkReport( | ||
|
@@ -235,6 +241,7 @@ object BenchUtils { | |
queryDescription, | ||
queryPlan, | ||
queryPlansWithMetrics, | ||
stageMetrics, | ||
queryTimes) | ||
} | ||
|
||
|
@@ -549,6 +556,42 @@ class BenchmarkListener(list: ListBuffer[SparkPlanNode]) extends QueryExecutionL | |
} | ||
} | ||
|
||
class BenchSparkListener(executionMetrics: ListBuffer[StageMetrics]) extends SparkListener { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Perhaps rename executionMetrics to include "stage" in the name so it is clearer. |
||
override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = { | ||
val stageInfo = stageCompleted.stageInfo | ||
val taskMetrics = stageInfo.taskMetrics | ||
|
||
val stageMetrics = stageInfo.accumulables.map(acc => Try { | ||
val name = acc._2.name.getOrElse("") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure about the usage of Try in this case. What benefit does it add, given that you already have the getOrElse — are you expecting _2 to be null? |
||
val value = acc._2.value.getOrElse(0L).asInstanceOf[Long] | ||
name -> value | ||
}).filter(_.isSuccess) | ||
.map(_.get) | ||
.filter(_._1.nonEmpty) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also, why do we filter out accumulables without names here? The API allows creating one without a name, so I would say we just pass it along with its id. Or are we assuming that the ones we create will always have names? |
||
.toMap | ||
|
||
val taskMetricsSummary = Map( | ||
"executorDeserializeTime" -> taskMetrics.executorDeserializeTime, | ||
"executorDeserializeCpuTime" -> taskMetrics.executorDeserializeCpuTime, | ||
"executorRunTime" -> taskMetrics.executorRunTime, | ||
"executorCpuTime" -> taskMetrics.executorCpuTime, | ||
"resultSize" -> taskMetrics.resultSize, | ||
"jvmGCTime" -> taskMetrics.jvmGCTime, | ||
"resultSerializationTime" -> taskMetrics.resultSerializationTime, | ||
"memoryBytesSpilled" -> taskMetrics.memoryBytesSpilled, | ||
"diskBytesSpilled" -> taskMetrics.diskBytesSpilled, | ||
"peakExecutionMemory" -> taskMetrics.peakExecutionMemory | ||
) | ||
|
||
executionMetrics += StageMetrics( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we also want to record whether the stage failed? |
||
stageInfo.stageId, | ||
stageInfo.parentIds, | ||
stageInfo.numTasks, | ||
stageMetrics, | ||
taskMetricsSummary) | ||
} | ||
} | ||
|
||
/** Top level benchmark report class */ | ||
case class BenchmarkReport( | ||
filename: String, | ||
|
@@ -560,6 +603,7 @@ case class BenchmarkReport( | |
query: String, | ||
queryPlan: QueryPlan, | ||
queryPlans: Seq[SparkPlanNode], | ||
stageMetrics: Seq[StageMetrics], | ||
queryTimes: Seq[Long]) | ||
|
||
/** Configuration options that affect how the tests are run */ | ||
|
@@ -584,6 +628,15 @@ case class SparkSQLMetric( | |
metricType: String, | ||
value: Any) | ||
|
||
/** Summary of stage-level metrics */ | ||
case class StageMetrics( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit - the name of this class is a little confusing because it looks like a Spark name, like StageInfo. I wonder if we should add something to the name to make it easier to differentiate. It's not that big of a deal, though. |
||
stageId: Int, | ||
parentIds: Seq[Int], | ||
taskCount: Int, | ||
stageMetrics: Map[String, Long], | ||
taskMetrics: Map[String, Long] | ||
) | ||
|
||
/** Details about the environment where the benchmark ran */ | ||
case class Environment( | ||
envVars: Map[String, String], | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We have BenchmarkListener and BenchSparkListener, and from the names alone I'm not sure what either one does. Perhaps we should give them more meaningful names. I get that they might be used for more than just stage metrics, for instance, but it would be nice to differentiate them somehow or use a common one.