Skip to content

Commit

Permalink
Benchmark runner now produces failure report if the query plan is inv…
Browse files Browse the repository at this point in the history
…alid (NVIDIA#1299)

Signed-off-by: Andy Grove <andygrove@nvidia.com>
  • Loading branch information
andygrove authored Dec 8, 2020
1 parent 97af8ec commit c8734ec
Showing 1 changed file with 25 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ object BenchUtils {
val elapsed = NANOSECONDS.toMillis(end - start)
println(s"*** Iteration $i failed after $elapsed msec.")
queryTimes.append(-1)
exceptions.append(BenchUtils.toString(e))
exceptions.append(BenchUtils.stackTraceAsString(e))
e.printStackTrace()
}
}
Expand Down Expand Up @@ -241,9 +241,19 @@ object BenchUtils {
sparkConf = df.sparkSession.conf.getAll,
getSparkVersion)

// if the query plan is invalid, referencing the `executedPlan` lazy val
// can throw an exception
val executedPlanStr = try {
df.queryExecution.executedPlan.toString()
} catch {
case e: Exception =>
exceptions.append(stackTraceAsString(e))
"Failed to capture executedPlan - see exceptions in report"
}

val queryPlan = QueryPlan(
df.queryExecution.logical.toString(),
df.queryExecution.executedPlan.toString()
executedPlanStr
)

val report = resultsAction match {
Expand Down Expand Up @@ -662,7 +672,7 @@ object BenchUtils {
}
}

def toString(e: Exception): String = {
def stackTraceAsString(e: Throwable): String = {
val sw = new StringWriter()
val w = new PrintWriter(sw)
e.printStackTrace(w)
Expand All @@ -676,12 +686,21 @@ class BenchmarkListener(
exceptions: ListBuffer[String]) extends QueryExecutionListener {

override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
queryPlans += toJson(qe.executedPlan)
addQueryPlan(qe)
}

override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {
queryPlans += toJson(qe.executedPlan)
exceptions += BenchUtils.toString(exception)
addQueryPlan(qe)
exceptions += BenchUtils.stackTraceAsString(exception)
}

private def addQueryPlan(qe: QueryExecution) = {
try {
queryPlans += toJson(qe.executedPlan)
} catch {
case e: Exception =>
exceptions.append(BenchUtils.stackTraceAsString(e))
}
}

private def toJson(plan: SparkPlan): SparkPlanNode = {
Expand Down

0 comments on commit c8734ec

Please sign in to comment.