diff --git a/integration_tests/pom.xml b/integration_tests/pom.xml index 18ef64ec5a8e..b2fd7f2c8572 100644 --- a/integration_tests/pom.xml +++ b/integration_tests/pom.xml @@ -126,6 +126,26 @@ + + maven-assembly-plugin + + + + jar-with-dependencies + + + + + + make-assembly + package + + single + + + + + org.apache.maven.plugins diff --git a/integration_tests/src/main/scala/com/nvidia/spark/rapids/tests/tpcds/TpcdsLikeBench.scala b/integration_tests/src/main/scala/com/nvidia/spark/rapids/tests/tpcds/TpcdsLikeBench.scala index 34790a8ba3f3..d5f503b33501 100644 --- a/integration_tests/src/main/scala/com/nvidia/spark/rapids/tests/tpcds/TpcdsLikeBench.scala +++ b/integration_tests/src/main/scala/com/nvidia/spark/rapids/tests/tpcds/TpcdsLikeBench.scala @@ -33,6 +33,7 @@ object TpcdsLikeBench extends Logging { * @param spark The Spark session * @param query The name of the query to run e.g. "q5" * @param iterations The number of times to run the query. + * @param summaryFilePrefix Optional prefix for the generated JSON summary file. * @param gcBetweenRuns Whether to call `System.gc` between iterations to cause Spark to * call `unregisterShuffle` */ @@ -40,12 +41,13 @@ object TpcdsLikeBench extends Logging { spark: SparkSession, query: String, iterations: Int = 3, + summaryFilePrefix: Option[String] = None, gcBetweenRuns: Boolean = false): Unit = { BenchUtils.collect( spark, spark => TpcdsLikeSpark.query(query)(spark), query, - s"tpcds-$query-collect", + summaryFilePrefix.getOrElse(s"tpcds-$query-collect"), iterations, gcBetweenRuns) } @@ -62,6 +64,7 @@ object TpcdsLikeBench extends Logging { * @param mode The SaveMode to use when writing the results * @param writeOptions Write options * @param iterations The number of times to run the query. + * @param summaryFilePrefix Optional prefix for the generated JSON summary file. * @param gcBetweenRuns Whether to call `System.gc` between iterations to cause Spark to * call `unregisterShuffle` */ @@ -72,12 +75,13 @@ object TpcdsLikeBench extends Logging { mode: SaveMode = SaveMode.Overwrite, writeOptions: Map[String, String] = Map.empty, iterations: Int = 3, + summaryFilePrefix: Option[String] = None, gcBetweenRuns: Boolean = false): Unit = { BenchUtils.writeCsv( spark, spark => TpcdsLikeSpark.query(query)(spark), query, - s"tpcds-$query-csv", + summaryFilePrefix.getOrElse(s"tpcds-$query-csv"), iterations, gcBetweenRuns, path, @@ -97,6 +101,7 @@ object TpcdsLikeBench extends Logging { * @param mode The SaveMode to use when writing the results * @param writeOptions Write options * @param iterations The number of times to run the query + * @param summaryFilePrefix Optional prefix for the generated JSON summary file. * @param gcBetweenRuns Whether to call `System.gc` between iterations to cause Spark to * call `unregisterShuffle` */ @@ -107,12 +112,13 @@ object TpcdsLikeBench extends Logging { mode: SaveMode = SaveMode.Overwrite, writeOptions: Map[String, String] = Map.empty, iterations: Int = 3, + summaryFilePrefix: Option[String] = None, gcBetweenRuns: Boolean = false): Unit = { BenchUtils.writeParquet( spark, spark => TpcdsLikeSpark.query(query)(spark), query, - s"tpcds-$query-parquet", + summaryFilePrefix.getOrElse(s"tpcds-$query-parquet"), iterations, gcBetweenRuns, path, @@ -143,19 +149,25 @@ object TpcdsLikeBench extends Logging { spark, conf.query(), path, - iterations = conf.iterations()) + iterations = conf.iterations(), + summaryFilePrefix = conf.summaryFilePrefix.toOption) case "csv" => writeCsv( spark, conf.query(), path, - iterations = conf.iterations()) + iterations = conf.iterations(), + summaryFilePrefix = conf.summaryFilePrefix.toOption) case _ => println("Invalid or unspecified output format") System.exit(-1) } case _ => - collect(spark, conf.query(), conf.iterations()) + collect( + spark, + conf.query(), + conf.iterations(), + summaryFilePrefix = conf.summaryFilePrefix.toOption) } } } @@ -167,6 +179,7 @@ class Conf(arguments: Seq[String]) extends ScallopConf(arguments) { val iterations = opt[Int](default = Some(3)) val output = opt[String](required = false) val outputFormat = opt[String](required = false) + val summaryFilePrefix = opt[String](required = false) verify() } diff --git a/integration_tests/src/main/scala/com/nvidia/spark/rapids/tests/tpch/TpchLikeBench.scala b/integration_tests/src/main/scala/com/nvidia/spark/rapids/tests/tpch/TpchLikeBench.scala index f1b5c8bf106f..f41ab97432f0 100644 --- a/integration_tests/src/main/scala/com/nvidia/spark/rapids/tests/tpch/TpchLikeBench.scala +++ b/integration_tests/src/main/scala/com/nvidia/spark/rapids/tests/tpch/TpchLikeBench.scala @@ -32,6 +32,7 @@ object TpchLikeBench { * @param spark The Spark session * @param query The name of the query to run e.g. "q5" * @param iterations The number of times to run the query. + * @param summaryFilePrefix Optional prefix for the generated JSON summary file. * @param gcBetweenRuns Whether to call `System.gc` between iterations to cause Spark to * call `unregisterShuffle` */ @@ -39,12 +40,13 @@ object TpchLikeBench { spark: SparkSession, query: String, iterations: Int = 3, + summaryFilePrefix: Option[String] = None, gcBetweenRuns: Boolean = false): Unit = { BenchUtils.collect( spark, spark => getQuery(query)(spark), query, - s"tpch-$query-collect", + summaryFilePrefix.getOrElse(s"tpch-$query-collect"), iterations, gcBetweenRuns) } @@ -61,6 +63,7 @@ object TpchLikeBench { * @param mode The SaveMode to use when writing the results * @param writeOptions Write options * @param iterations The number of times to run the query. + * @param summaryFilePrefix Optional prefix for the generated JSON summary file. * @param gcBetweenRuns Whether to call `System.gc` between iterations to cause Spark to * call `unregisterShuffle` */ @@ -71,12 +74,13 @@ object TpchLikeBench { mode: SaveMode = SaveMode.Overwrite, writeOptions: Map[String, String] = Map.empty, iterations: Int = 3, + summaryFilePrefix: Option[String] = None, gcBetweenRuns: Boolean = false): Unit = { BenchUtils.writeCsv( spark, spark => getQuery(query)(spark), query, - s"tpch-$query-csv", + summaryFilePrefix.getOrElse(s"tpch-$query-csv"), iterations, gcBetweenRuns, path, @@ -96,6 +100,7 @@ object TpchLikeBench { * @param mode The SaveMode to use when writing the results * @param writeOptions Write options * @param iterations The number of times to run the query. + * @param summaryFilePrefix Optional prefix for the generated JSON summary file. * @param gcBetweenRuns Whether to call `System.gc` between iterations to cause Spark to * call `unregisterShuffle` */ @@ -106,12 +111,13 @@ object TpchLikeBench { mode: SaveMode = SaveMode.Overwrite, writeOptions: Map[String, String] = Map.empty, iterations: Int = 3, + summaryFilePrefix: Option[String] = None, gcBetweenRuns: Boolean = false): Unit = { BenchUtils.writeParquet( spark, spark => getQuery(query)(spark), query, - s"tpch-$query-parquet", + summaryFilePrefix.getOrElse(s"tpch-$query-parquet"), iterations, gcBetweenRuns, path, @@ -142,19 +148,25 @@ object TpchLikeBench { spark, conf.query(), path, - iterations = conf.iterations()) + iterations = conf.iterations(), + summaryFilePrefix = conf.summaryFilePrefix.toOption) case "csv" => writeCsv( spark, conf.query(), path, - iterations = conf.iterations()) + iterations = conf.iterations(), + summaryFilePrefix = conf.summaryFilePrefix.toOption) case _ => println("Invalid or unspecified output format") System.exit(-1) } case _ => - collect(spark, conf.query(), conf.iterations()) + collect( + spark, + conf.query(), + conf.iterations(), + summaryFilePrefix = conf.summaryFilePrefix.toOption) } } @@ -193,5 +205,6 @@ class Conf(arguments: Seq[String]) extends ScallopConf(arguments) { val iterations = opt[Int](default = Some(3)) val output = opt[String](required = false) val outputFormat = opt[String](required = false) + val summaryFilePrefix = opt[String](required = false) verify() } \ No newline at end of file diff --git a/integration_tests/src/main/scala/com/nvidia/spark/rapids/tests/tpcxbb/TpcxbbLikeBench.scala b/integration_tests/src/main/scala/com/nvidia/spark/rapids/tests/tpcxbb/TpcxbbLikeBench.scala index 7049947dde81..b5aadfc7aa58 100644 --- a/integration_tests/src/main/scala/com/nvidia/spark/rapids/tests/tpcxbb/TpcxbbLikeBench.scala +++ b/integration_tests/src/main/scala/com/nvidia/spark/rapids/tests/tpcxbb/TpcxbbLikeBench.scala @@ -32,6 +32,7 @@ object TpcxbbLikeBench extends Logging { * @param spark The Spark session * @param query The name of the query to run e.g. "q5" * @param iterations The number of times to run the query. + * @param summaryFilePrefix Optional prefix for the generated JSON summary file. * @param gcBetweenRuns Whether to call `System.gc` between iterations to cause Spark to * call `unregisterShuffle` */ @@ -39,12 +40,13 @@ object TpcxbbLikeBench extends Logging { spark: SparkSession, query: String, iterations: Int = 3, + summaryFilePrefix: Option[String] = None, gcBetweenRuns: Boolean = false): Unit = { BenchUtils.collect( spark, spark => getQuery(query)(spark), query, - s"tpcxbb-$query-collect", + summaryFilePrefix.getOrElse(s"tpcxbb-$query-collect"), iterations, gcBetweenRuns) } @@ -61,6 +63,7 @@ object TpcxbbLikeBench extends Logging { * @param mode The SaveMode to use when writing the results * @param writeOptions Write options * @param iterations The number of times to run the query. + * @param summaryFilePrefix Optional prefix for the generated JSON summary file. * @param gcBetweenRuns Whether to call `System.gc` between iterations to cause Spark to * call `unregisterShuffle` */ @@ -71,12 +74,13 @@ object TpcxbbLikeBench extends Logging { mode: SaveMode = SaveMode.Overwrite, writeOptions: Map[String, String] = Map.empty, iterations: Int = 3, + summaryFilePrefix: Option[String] = None, gcBetweenRuns: Boolean = false): Unit = { BenchUtils.writeCsv( spark, spark => getQuery(query)(spark), query, - s"tpcxbb-$query-csv", + summaryFilePrefix.getOrElse(s"tpcxbb-$query-csv"), iterations, gcBetweenRuns, path, @@ -96,6 +100,7 @@ object TpcxbbLikeBench extends Logging { * @param mode The SaveMode to use when writing the results * @param writeOptions Write options * @param iterations The number of times to run the query. + * @param summaryFilePrefix Optional prefix for the generated JSON summary file. * @param gcBetweenRuns Whether to call `System.gc` between iterations to cause Spark to * call `unregisterShuffle` */ @@ -106,12 +111,13 @@ object TpcxbbLikeBench extends Logging { mode: SaveMode = SaveMode.Overwrite, writeOptions: Map[String, String] = Map.empty, iterations: Int = 3, + summaryFilePrefix: Option[String] = None, gcBetweenRuns: Boolean = false): Unit = { BenchUtils.writeParquet( spark, spark => getQuery(query)(spark), query, - s"tpcxbb-$query-parquet", + summaryFilePrefix.getOrElse(s"tpcxbb-$query-parquet"), iterations, gcBetweenRuns, path, @@ -140,19 +146,25 @@ object TpcxbbLikeBench extends Logging { spark, conf.query(), path, - iterations = conf.iterations()) + iterations = conf.iterations(), + summaryFilePrefix = conf.summaryFilePrefix.toOption) case "csv" => writeCsv( spark, conf.query(), path, - iterations = conf.iterations()) + iterations = conf.iterations(), + summaryFilePrefix = conf.summaryFilePrefix.toOption) case _ => println("Invalid or unspecified output format") System.exit(-1) } case _ => - collect(spark, conf.query(), conf.iterations()) + collect( + spark, + conf.query(), + conf.iterations(), + summaryFilePrefix = conf.summaryFilePrefix.toOption) } } @@ -207,5 +219,6 @@ class Conf(arguments: Seq[String]) extends ScallopConf(arguments) { val iterations = opt[Int](default = Some(3)) val output = opt[String](required = false) val outputFormat = opt[String](required = false) + val summaryFilePrefix = opt[String](required = false) verify() } \ No newline at end of file