Add command-line argument for benchmark result filename #904

Merged 2 commits on Oct 7, 2020
integration_tests/pom.xml (20 additions, 0 deletions)
@@ -126,6 +126,26 @@
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>

<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>

<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>

</plugin>
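(The assembly plugin binds its single goal to the package phase, so a regular mvn package on integration_tests should also produce a jar-with-dependencies artifact; presumably this is what lets the benchmark entry points below be launched, for example via spark-submit, without hand-building a classpath.)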
<!-- disable surefire as we are using scalatest only -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>

TpcdsLikeBench.scala
@@ -33,19 +33,21 @@ object TpcdsLikeBench extends Logging {
* @param spark The Spark session
* @param query The name of the query to run e.g. "q5"
* @param iterations The number of times to run the query.
* @param summaryFilePrefix Optional prefix for the generated JSON summary file.
* @param gcBetweenRuns Whether to call `System.gc` between iterations to cause Spark to
* call `unregisterShuffle`
*/
def collect(
spark: SparkSession,
query: String,
iterations: Int = 3,
summaryFilePrefix: Option[String] = None,
gcBetweenRuns: Boolean = false): Unit = {
BenchUtils.collect(
spark,
spark => TpcdsLikeSpark.query(query)(spark),
query,
s"tpcds-$query-collect",
summaryFilePrefix.getOrElse(s"tpcds-$query-collect"),
iterations,
gcBetweenRuns)
}
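A minimal sketch of the updated collect API from a Spark shell; the prefix value is illustrative, and the exact summary filename layout is whatever BenchUtils builds from the stub:

    // default stub stays "tpcds-q5-collect", so existing behavior is unchanged
    TpcdsLikeBench.collect(spark, "q5")

    // hypothetical prefix; the JSON summary name now starts with "gpu-run-1"
    TpcdsLikeBench.collect(spark, "q5", iterations = 5,
      summaryFilePrefix = Some("gpu-run-1"))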
@@ -62,6 +64,7 @@ object TpcdsLikeBench extends Logging {
* @param mode The SaveMode to use when writing the results
* @param writeOptions Write options
* @param iterations The number of times to run the query.
* @param summaryFilePrefix Optional prefix for the generated JSON summary file.
* @param gcBetweenRuns Whether to call `System.gc` between iterations to cause Spark to
* call `unregisterShuffle`
*/
@@ -72,12 +75,13 @@
mode: SaveMode = SaveMode.Overwrite,
writeOptions: Map[String, String] = Map.empty,
iterations: Int = 3,
summaryFilePrefix: Option[String] = None,
gcBetweenRuns: Boolean = false): Unit = {
BenchUtils.writeCsv(
spark,
spark => TpcdsLikeSpark.query(query)(spark),
query,
s"tpcds-$query-csv",
summaryFilePrefix.getOrElse(s"tpcds-$query-csv"),
iterations,
gcBetweenRuns,
path,
@@ -97,6 +101,7 @@
* @param mode The SaveMode to use when writing the results
* @param writeOptions Write options
* @param iterations The number of times to run the query.
* @param summaryFilePrefix Optional prefix for the generated JSON summary file.
* @param gcBetweenRuns Whether to call `System.gc` between iterations to cause Spark to
* call `unregisterShuffle`
*/
@@ -107,12 +112,13 @@
mode: SaveMode = SaveMode.Overwrite,
writeOptions: Map[String, String] = Map.empty,
iterations: Int = 3,
summaryFilePrefix: Option[String] = None,
gcBetweenRuns: Boolean = false): Unit = {
BenchUtils.writeParquet(
spark,
spark => TpcdsLikeSpark.query(query)(spark),
query,
s"tpcds-$query-parquet",
summaryFilePrefix.getOrElse(s"tpcds-$query-parquet"),
iterations,
gcBetweenRuns,
path,
@@ -143,19 +149,25 @@ object TpcdsLikeBench extends Logging {
spark,
conf.query(),
path,
iterations = conf.iterations())
iterations = conf.iterations(),
summaryFilePrefix = conf.summaryFilePrefix.toOption)
case "csv" =>
writeCsv(
spark,
conf.query(),
path,
iterations = conf.iterations())
iterations = conf.iterations(),
summaryFilePrefix = conf.summaryFilePrefix.toOption)
case _ =>
println("Invalid or unspecified output format")
System.exit(-1)
}
case _ =>
collect(spark, conf.query(), conf.iterations())
collect(
spark,
conf.query(),
conf.iterations(),
summaryFilePrefix = conf.summaryFilePrefix.toOption)
}
}
}
@@ -167,6 +179,7 @@ class Conf(arguments: Seq[String]) extends ScallopConf(arguments) {
val iterations = opt[Int](default = Some(3))
val output = opt[String](required = false)
val outputFormat = opt[String](required = false)
val summaryFilePrefix = opt[String](required = false)
verify()
}
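With Scallop's default option-name guessing, a camelCase val such as summaryFilePrefix should surface as a --summary-file-prefix flag. A self-contained sketch of that pattern (a stripped-down DemoConf, not this exact Conf, which also defines the options above):

    import org.rogach.scallop.ScallopConf

    class DemoConf(args: Seq[String]) extends ScallopConf(args) {
      val summaryFilePrefix = opt[String](required = false)
      verify()
    }

    // flag present: toOption yields Some(...), overriding the default stub
    new DemoConf(Seq("--summary-file-prefix", "my-run")).summaryFilePrefix.toOption
    // flag absent: toOption yields None, so getOrElse falls back to the old name
    new DemoConf(Seq.empty).summaryFilePrefix.toOption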

TpchLikeBench.scala
@@ -32,19 +32,21 @@ object TpchLikeBench {
* @param spark The Spark session
* @param query The name of the query to run e.g. "q5"
* @param iterations The number of times to run the query.
* @param summaryFilePrefix Optional prefix for the generated JSON summary file.
* @param gcBetweenRuns Whether to call `System.gc` between iterations to cause Spark to
* call `unregisterShuffle`
*/
def collect(
spark: SparkSession,
query: String,
iterations: Int = 3,
summaryFilePrefix: Option[String] = None,
gcBetweenRuns: Boolean = false): Unit = {
BenchUtils.collect(
spark,
spark => getQuery(query)(spark),
query,
s"tpch-$query-collect",
summaryFilePrefix.getOrElse(s"tpch-$query-collect"),
iterations,
gcBetweenRuns)
}
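The same optional prefix threads through the write benchmarks below; a hedged example with an illustrative output path and prefix, using named arguments to keep the other defaults:

    TpchLikeBench.writeParquet(spark, "q5", "/tmp/tpch-q5",
      summaryFilePrefix = Some("tpch-q5-baseline"))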
@@ -61,6 +63,7 @@ object TpchLikeBench {
* @param mode The SaveMode to use when writing the results
* @param writeOptions Write options
* @param iterations The number of times to run the query.
* @param summaryFilePrefix Optional prefix for the generated JSON summary file.
* @param gcBetweenRuns Whether to call `System.gc` between iterations to cause Spark to
* call `unregisterShuffle`
*/
@@ -71,12 +74,13 @@
mode: SaveMode = SaveMode.Overwrite,
writeOptions: Map[String, String] = Map.empty,
iterations: Int = 3,
summaryFilePrefix: Option[String] = None,
gcBetweenRuns: Boolean = false): Unit = {
BenchUtils.writeCsv(
spark,
spark => getQuery(query)(spark),
query,
s"tpch-$query-csv",
summaryFilePrefix.getOrElse(s"tpch-$query-csv"),
iterations,
gcBetweenRuns,
path,
@@ -96,6 +100,7 @@
* @param mode The SaveMode to use when writing the results
* @param writeOptions Write options
* @param iterations The number of times to run the query.
* @param summaryFilePrefix Optional prefix for the generated JSON summary file.
* @param gcBetweenRuns Whether to call `System.gc` between iterations to cause Spark to
* call `unregisterShuffle`
*/
@@ -106,12 +111,13 @@
mode: SaveMode = SaveMode.Overwrite,
writeOptions: Map[String, String] = Map.empty,
iterations: Int = 3,
summaryFilePrefix: Option[String] = None,
gcBetweenRuns: Boolean = false): Unit = {
BenchUtils.writeParquet(
spark,
spark => getQuery(query)(spark),
query,
s"tpch-$query-parquet",
summaryFilePrefix.getOrElse(s"tpch-$query-parquet"),
iterations,
gcBetweenRuns,
path,
@@ -142,19 +148,25 @@ object TpchLikeBench {
spark,
conf.query(),
path,
iterations = conf.iterations())
iterations = conf.iterations(),
summaryFilePrefix = conf.summaryFilePrefix.toOption)
case "csv" =>
writeCsv(
spark,
conf.query(),
path,
iterations = conf.iterations())
iterations = conf.iterations(),
summaryFilePrefix = conf.summaryFilePrefix.toOption)
case _ =>
println("Invalid or unspecified output format")
System.exit(-1)
}
case _ =>
collect(spark, conf.query(), conf.iterations())
collect(
spark,
conf.query(),
conf.iterations(),
summaryFilePrefix = conf.summaryFilePrefix.toOption)
}
}

@@ -193,5 +205,6 @@ class Conf(arguments: Seq[String]) extends ScallopConf(arguments) {
val iterations = opt[Int](default = Some(3))
val output = opt[String](required = false)
val outputFormat = opt[String](required = false)
val summaryFilePrefix = opt[String](required = false)
verify()
}
TpcxbbLikeBench.scala
@@ -32,19 +32,21 @@ object TpcxbbLikeBench extends Logging {
* @param spark The Spark session
* @param query The name of the query to run e.g. "q5"
* @param iterations The number of times to run the query.
* @param summaryFilePrefix Optional prefix for the generated JSON summary file.
* @param gcBetweenRuns Whether to call `System.gc` between iterations to cause Spark to
* call `unregisterShuffle`
*/
def collect(
spark: SparkSession,
query: String,
iterations: Int = 3,
summaryFilePrefix: Option[String] = None,
gcBetweenRuns: Boolean = false): Unit = {
BenchUtils.collect(
spark,
spark => getQuery(query)(spark),
query,
s"tpcxbb-$query-collect",
summaryFilePrefix.getOrElse(s"tpcxbb-$query-collect"),
iterations,
gcBetweenRuns)
}
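Because the parameter defaults to None and getOrElse restores the old stub, existing callers are untouched; side by side (query and prefix illustrative):

    TpcxbbLikeBench.collect(spark, "q5")   // summary stub: "tpcxbb-q5-collect"
    TpcxbbLikeBench.collect(spark, "q5",
      summaryFilePrefix = Some("xbb-baseline"))   // summary stub: "xbb-baseline"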
@@ -61,6 +63,7 @@ object TpcxbbLikeBench extends Logging {
* @param mode The SaveMode to use when writing the results
* @param writeOptions Write options
* @param iterations The number of times to run the query.
* @param summaryFilePrefix Optional prefix for the generated JSON summary file.
* @param gcBetweenRuns Whether to call `System.gc` between iterations to cause Spark to
* call `unregisterShuffle`
*/
@@ -71,12 +74,13 @@
mode: SaveMode = SaveMode.Overwrite,
writeOptions: Map[String, String] = Map.empty,
iterations: Int = 3,
summaryFilePrefix: Option[String] = None,
gcBetweenRuns: Boolean = false): Unit = {
BenchUtils.writeCsv(
spark,
spark => getQuery(query)(spark),
query,
s"tpcxbb-$query-csv",
summaryFilePrefix.getOrElse(s"tpcxbb-$query-csv"),
iterations,
gcBetweenRuns,
path,
@@ -96,6 +100,7 @@
* @param mode The SaveMode to use when writing the results
* @param writeOptions Write options
* @param iterations The number of times to run the query.
* @param summaryFilePrefix Optional prefix for the generated JSON summary file.
* @param gcBetweenRuns Whether to call `System.gc` between iterations to cause Spark to
* call `unregisterShuffle`
*/
@@ -106,12 +111,13 @@
mode: SaveMode = SaveMode.Overwrite,
writeOptions: Map[String, String] = Map.empty,
iterations: Int = 3,
summaryFilePrefix: Option[String] = None,
gcBetweenRuns: Boolean = false): Unit = {
BenchUtils.writeParquet(
spark,
spark => getQuery(query)(spark),
query,
s"tpcxbb-$query-parquet",
summaryFilePrefix.getOrElse(s"tpcxbb-$query-parquet"),
iterations,
gcBetweenRuns,
path,
@@ -140,19 +146,25 @@ object TpcxbbLikeBench extends Logging {
spark,
conf.query(),
path,
iterations = conf.iterations())
iterations = conf.iterations(),
summaryFilePrefix = conf.summaryFilePrefix.toOption)
case "csv" =>
writeCsv(
spark,
conf.query(),
path,
iterations = conf.iterations())
iterations = conf.iterations(),
summaryFilePrefix = conf.summaryFilePrefix.toOption)
case _ =>
println("Invalid or unspecified output format")
System.exit(-1)
}
case _ =>
collect(spark, conf.query(), conf.iterations())
collect(
spark,
conf.query(),
conf.iterations(),
summaryFilePrefix = conf.summaryFilePrefix.toOption)
}
}

@@ -207,5 +219,6 @@ class Conf(arguments: Seq[String]) extends ScallopConf(arguments) {
val iterations = opt[Int](default = Some(3))
val output = opt[String](required = false)
val outputFormat = opt[String](required = false)
val summaryFilePrefix = opt[String](required = false)
verify()
}