Skip to content

Commit

Permalink
Add support for running multiple queries in BenchmarkRunner (NVIDIA#1591)
Browse files Browse the repository at this point in the history

* Add support for running multiple queries in BenchmarkRunner

Signed-off-by: Andy Grove <andygrove@nvidia.com>
  • Loading branch information
andygrove authored and gerashegalov committed Jan 29, 2021
1 parent 68af473 commit 6b03c4a
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 61 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved.
* Copyright (c) 2020-2021, NVIDIA CORPORATION. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -43,6 +43,11 @@ object BenchmarkRunner {
System.exit(-1)
}

if (conf.query.isEmpty) {
System.err.println("At least one query must be specified")
System.exit(-1)
}

val benchmarks = Map(
"tpcds" -> new TpcdsLikeBench(conf.appendDat()),
"tpch" -> new TpchLikeBench(),
Expand All @@ -51,7 +56,7 @@ object BenchmarkRunner {

benchmarks.get(conf.benchmark().toLowerCase) match {
case Some(bench) =>
val appName = s"${bench.name()} Like Bench ${conf.query()}"
val appName = s"${bench.name()} Like Bench ${conf.query().mkString(",")}"
val spark = SparkSession.builder.appName(appName).getOrCreate()
spark.sparkContext.setJobDescription("Register input tables")
conf.inputFormat().toLowerCase match {
Expand All @@ -64,61 +69,65 @@ object BenchmarkRunner {
}

val runner = new BenchmarkRunner(bench)
println(s"*** RUNNING ${bench.name()} QUERY ${conf.query()}")
val report = Try(conf.output.toOption match {
case Some(path) => conf.outputFormat().toLowerCase match {
case "parquet" =>
runner.writeParquet(
spark,
conf.query(),
path,
iterations = conf.iterations(),
summaryFilePrefix = conf.summaryFilePrefix.toOption,
gcBetweenRuns = conf.gcBetweenRuns())
case "csv" =>
runner.writeCsv(
spark,
conf.query(),
path,
iterations = conf.iterations(),
summaryFilePrefix = conf.summaryFilePrefix.toOption,
gcBetweenRuns = conf.gcBetweenRuns())
case "orc" =>
runner.writeOrc(

conf.query().foreach { query =>

println(s"*** RUNNING ${bench.name()} QUERY $query")
val report = Try(conf.output.toOption match {
case Some(path) => conf.outputFormat().toLowerCase match {
case "parquet" =>
runner.writeParquet(
spark,
query,
path,
iterations = conf.iterations(),
summaryFilePrefix = conf.summaryFilePrefix.toOption,
gcBetweenRuns = conf.gcBetweenRuns())
case "csv" =>
runner.writeCsv(
spark,
query,
path,
iterations = conf.iterations(),
summaryFilePrefix = conf.summaryFilePrefix.toOption,
gcBetweenRuns = conf.gcBetweenRuns())
case "orc" =>
runner.writeOrc(
spark,
query,
path,
iterations = conf.iterations(),
summaryFilePrefix = conf.summaryFilePrefix.toOption,
gcBetweenRuns = conf.gcBetweenRuns())
case other =>
throw new IllegalArgumentException(s"Invalid or unspecified output format: $other")
}
case _ =>
runner.collect(
spark,
conf.query(),
path,
iterations = conf.iterations(),
query,
conf.iterations(),
summaryFilePrefix = conf.summaryFilePrefix.toOption,
gcBetweenRuns = conf.gcBetweenRuns())
case other =>
throw new IllegalArgumentException(s"Invalid or unspecified output format: $other")
}
case _ =>
runner.collect(
spark,
conf.query(),
conf.iterations(),
summaryFilePrefix = conf.summaryFilePrefix.toOption,
gcBetweenRuns = conf.gcBetweenRuns())
})
})

report match {
case Success(report) =>
if (conf.uploadUri.isSupplied) {
println(s"Uploading ${report.filename} to " +
s"${conf.uploadUri()}/${report.filename}")
report match {
case Success(report) =>
if (conf.uploadUri.isSupplied) {
println(s"Uploading ${report.filename} to " +
s"${conf.uploadUri()}/${report.filename}")

val hadoopConf = spark.sparkContext.hadoopConfiguration
val fs = FileSystem.newInstance(new URI(conf.uploadUri()), hadoopConf)
fs.copyFromLocalFile(
new Path(report.filename),
new Path(conf.uploadUri(), report.filename))
}
val hadoopConf = spark.sparkContext.hadoopConfiguration
val fs = FileSystem.newInstance(new URI(conf.uploadUri()), hadoopConf)
fs.copyFromLocalFile(
new Path(report.filename),
new Path(conf.uploadUri(), report.filename))
}

case Failure(e) =>
System.err.println(e.getMessage)
System.exit(-1)
case Failure(e) =>
System.err.println(e.getMessage)
System.exit(-1)
}
}
case _ =>
System.err.println(s"Invalid benchmark name: ${conf.benchmark()}. Supported benchmarks " +
Expand Down Expand Up @@ -285,7 +294,7 @@ class BenchmarkConf(arguments: Seq[String]) extends ScallopConf(arguments) {
val input = opt[String](required = true)
val inputFormat = opt[String](required = true)
val appendDat = opt[Boolean](required = false, default = Some(false))
val query = opt[String](required = true)
val query = opt[List[String]](required = true)
val iterations = opt[Int](default = Some(3))
val output = opt[String](required = false)
val outputFormat = opt[String](required = false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ object BenchUtils {
assert(iterations > 0)

val queryStartTime = Instant.now()
val logPrefix = s"[BENCHMARK RUNNER] [$queryDescription]"

val queryPlansWithMetrics = new ListBuffer[SparkPlanNode]()
val exceptions = new ListBuffer[String]()
Expand All @@ -171,7 +172,7 @@ object BenchUtils {
spark.listenerManager.register(new BenchmarkListener(queryPlansWithMetrics, exceptions))
}

println(s"*** Start iteration $i:")
println(s"$logPrefix Start iteration $i:")
val start = System.nanoTime()
try {
df = createDataFrame(spark)
Expand All @@ -189,13 +190,13 @@ object BenchUtils {
val end = System.nanoTime()
val elapsed = NANOSECONDS.toMillis(end - start)
queryTimes.append(elapsed)
println(s"*** Iteration $i took $elapsed msec.")
println(s"$logPrefix Iteration $i took $elapsed msec.")

} catch {
case e: Exception =>
val end = System.nanoTime()
val elapsed = NANOSECONDS.toMillis(end - start)
println(s"*** Iteration $i failed after $elapsed msec.")
println(s"$logPrefix Iteration $i failed after $elapsed msec.")
queryTimes.append(-1)
exceptions.append(BenchUtils.stackTraceAsString(e))
e.printStackTrace()
Expand All @@ -207,25 +208,25 @@ object BenchUtils {

// summarize all query times
for (i <- 0 until iterations) {
println(s"Iteration $i took ${queryTimes(i)} msec.")
println(s"$logPrefix Iteration $i took ${queryTimes(i)} msec.")
}

// for multiple runs, summarize cold/hot timings
if (iterations > 1) {
println(s"Cold run: ${queryTimes(0)} msec.")
println(s"$logPrefix Cold run: ${queryTimes(0)} msec.")
val hotRuns = queryTimes.drop(1)
val numHotRuns = hotRuns.length
println(s"Best of $numHotRuns hot run(s): ${hotRuns.min} msec.")
println(s"Worst of $numHotRuns hot run(s): ${hotRuns.max} msec.")
println(s"Average of $numHotRuns hot run(s): " +
println(s"$logPrefix Best of $numHotRuns hot run(s): ${hotRuns.min} msec.")
println(s"$logPrefix Worst of $numHotRuns hot run(s): ${hotRuns.max} msec.")
println(s"$logPrefix Average of $numHotRuns hot run(s): " +
s"${hotRuns.sum.toDouble / numHotRuns} msec.")
}
}

// write results to file
val suffix = if (exceptions.isEmpty) "" else "-failed"
val filename = s"$filenameStub-${queryStartTime.toEpochMilli}$suffix.json"
println(s"Saving benchmark report to $filename")
println(s"$logPrefix Saving benchmark report to $filename")

// try not to leak secrets
val redacted = Seq("TOKEN", "SECRET", "PASSWORD")
Expand Down

0 comments on commit 6b03c4a

Please sign in to comment.