From b652a7b0acca7643216c595ee9b136fb4586d7e7 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 9 Oct 2020 11:30:51 -0600 Subject: [PATCH 1/5] Add spark-submit usage Signed-off-by: Andy Grove --- docs/benchmarks.md | 45 ++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 42 insertions(+), 3 deletions(-) diff --git a/docs/benchmarks.md b/docs/benchmarks.md index 82340f92157..94224149269 100644 --- a/docs/benchmarks.md +++ b/docs/benchmarks.md @@ -59,14 +59,15 @@ for `spark.default.parallelism`, which by default is based on the number of avai cores. This value can be set explicitly to better control the size of the output files. It should also be noted that no decimal types will be output. The conversion code uses explicit -schemas to ensure that decimal types are converted to floating-point types instead. +schemas to ensure that decimal types are converted to floating-point types instead because the +plugin does not yet support decimal types but these will be supported in a future release. -## Running Benchmarks +## Running Benchmarks from a Spark shell The benchmarks can be executed in two modes currently: - Execute the query and collect the results to the driver -- Execute the query and write the results to disk (in Parquet or CSV format) +- Execute the query and write the results to disk (in Parquet or Orc format) The following commands can be entered into spark-shell to register the data files that the benchmark will query. @@ -90,6 +91,31 @@ to Parquet. There is also a `writeCsv` method for writing the output to CSV file TpcdsLikeBench.writeParquet(spark, "q5", "/data/output/tpcds/q5", iterations=3) ``` +## Running Benchmarks from spark-submit + +Each of the TPC-* derived benchmarks has a command-line interface, allowing it to be submitted +to Spark using `spark-submit` which can be more practical than using the Spark shell when +running a series of benchmarks using automation. 
+ +Here is an example `spark-submit` command for running TPC-DS query 5, reading from Parquet and +writing results to Parquet. The `--output` and `--output-format` arguments can be omitted to +have the benchmark call `collect()` on the results instead. + +```bash +$SPARK_HOME/bin/spark-submit \ + --master $SPARK_MASTER_URL \ + --jars $SPARK_RAPIDS_PLUGIN_JAR,$CUDF_JAR \ + --class com.nvidia.spark.rapids.tests.tpcds.TpcdsLikeBench \ + $SPARK_RAPIDS_PLUGIN_INTEGRATION_TEST_JAR \ + --input /raid/tpcds-3TB-parquet-largefiles \ + --input-format parquet \ + --output /raid/tpcds-output/tpcds-q5-cpu \ + --output-format parquet \ + --query q5 \ + --summary-file-prefix tpcds-q5-cpu \ + --iterations 1 +``` + ## Benchmark JSON Output Each benchmark run produces a JSON file containing information about the environment and the query, @@ -127,6 +153,19 @@ BenchUtils.compareResults(cpu, gpu, ignoreOrdering=true, epsilon=0.0001) ``` This will report on any differences between the two dataframes. +The verification utility can also be run via `spark-submit` using the following syntax. + +```bash +$SPARK_HOME/bin/spark-submit \ + --master $SPARK_MASTER_URL \ + --jars $SPARK_RAPIDS_PLUGIN_JAR,$CUDF_JAR \ + --class com.nvidia.spark.rapids.tests.common.CompareResults \ + $SPARK_RAPIDS_PLUGIN_INTEGRATION_TEST_JAR \ + --input1 /path/to/result1 \ + --input2 /path/to/result2 \ + --input-format parquet +``` + ## Performance Tuning Please refer to the [Tuning Guide](tuning-guide.md) for information on performance tuning.
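Patch 1 above motivates `spark-submit` as the practical path for running a series of benchmarks under automation. Below is a minimal dry-run sketch of such a driver script, built only from the flags shown in the example command; the query list, output paths, and the environment-variable defaults are illustrative placeholders, not values from the patch.

```shell
# Dry-run sketch: build one spark-submit command per TPC-DS query and print it
# instead of executing it. Defaults below are placeholders so the script runs
# standalone; in real use these env vars point at your Spark install and jars.
SPARK_HOME=${SPARK_HOME:-/opt/spark}
SPARK_MASTER_URL=${SPARK_MASTER_URL:-spark://master:7077}
SPARK_RAPIDS_PLUGIN_JAR=${SPARK_RAPIDS_PLUGIN_JAR:-rapids-plugin.jar}
CUDF_JAR=${CUDF_JAR:-cudf.jar}
SPARK_RAPIDS_PLUGIN_INTEGRATION_TEST_JAR=${SPARK_RAPIDS_PLUGIN_INTEGRATION_TEST_JAR:-integration-tests.jar}

count=0
first_cmd=""
for query in q5 q6 q7; do
  # Backslash-newline inside double quotes joins these lines into one command.
  cmd="$SPARK_HOME/bin/spark-submit \
--master $SPARK_MASTER_URL \
--jars $SPARK_RAPIDS_PLUGIN_JAR,$CUDF_JAR \
--class com.nvidia.spark.rapids.tests.tpcds.TpcdsLikeBench \
$SPARK_RAPIDS_PLUGIN_INTEGRATION_TEST_JAR \
--input /raid/tpcds-3TB-parquet-largefiles --input-format parquet \
--output /raid/tpcds-output/tpcds-$query-cpu --output-format parquet \
--query $query --summary-file-prefix tpcds-$query-cpu --iterations 1"
  if [ "$count" -eq 0 ]; then first_cmd=$cmd; fi
  count=$((count + 1))
  echo "$cmd"  # dry run: replace echo with eval to actually submit
done
```

Printing the commands first makes it easy to review the generated flag set before committing a cluster to hours of benchmark runs.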
From 3f11a067f65d0f23c4da84283b9c7e2b23551560 Mon Sep 17 00:00:00 2001 From: Sameer Raheja Date: Sat, 10 Oct 2020 16:54:13 -0700 Subject: [PATCH 2/5] Update docs/benchmarks.md --- docs/benchmarks.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/benchmarks.md b/docs/benchmarks.md index 94224149269..048ad72e8b8 100644 --- a/docs/benchmarks.md +++ b/docs/benchmarks.md @@ -67,7 +67,7 @@ plugin does not yet support decimal types but these will be supported in a futur The benchmarks can be executed in two modes currently: - Execute the query and collect the results to the driver -- Execute the query and write the results to disk (in Parquet or Orc format) +- Execute the query and write the results to disk (in Parquet or ORC format) The following commands can be entered into spark-shell to register the data files that the benchmark will query. From 83a114b08b3e75f0ffd128f3ad8ff687a03b07b3 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 12 Oct 2020 10:17:48 -0600 Subject: [PATCH 3/5] Add option to write to ORC Signed-off-by: Andy Grove --- docs/benchmarks.md | 5 ++- .../rapids/tests/common/BenchUtils.scala | 26 +++++++++++ .../rapids/tests/tpcds/TpcdsLikeBench.scala | 44 +++++++++++++++++++ .../rapids/tests/tpch/TpchLikeBench.scala | 44 +++++++++++++++++++ .../rapids/tests/tpcxbb/TpcxbbLikeBench.scala | 44 +++++++++++++++++++ 5 files changed, 161 insertions(+), 2 deletions(-) diff --git a/docs/benchmarks.md b/docs/benchmarks.md index 94224149269..988668b0f42 100644 --- a/docs/benchmarks.md +++ b/docs/benchmarks.md @@ -67,7 +67,7 @@ plugin does not yet support decimal types but these will be supported in a futur The benchmarks can be executed in two modes currently: - Execute the query and collect the results to the driver -- Execute the query and write the results to disk (in Parquet or Orc format) +- Execute the query and write the results to disk (in Parquet, CSV, or ORC format) The following commands can be entered into spark-shell to 
register the data files that the benchmark will query. @@ -85,7 +85,8 @@ TpcdsLikeBench.collect(spark, "q5", iterations=3) ``` The benchmark can be executed with the following syntax to execute the query and write the results -to Parquet. There is also a `writeCsv` method for writing the output to CSV files. +to Parquet. There are also `writeCsv` and `writeOrc` methods for writing the output to CSV or ORC +files. ```scala TpcdsLikeBench.writeParquet(spark, "q5", "/data/output/tpcds/q5", iterations=3) diff --git a/integration_tests/src/main/scala/com/nvidia/spark/rapids/tests/common/BenchUtils.scala b/integration_tests/src/main/scala/com/nvidia/spark/rapids/tests/common/BenchUtils.scala index 924b8f57cc2..7974a557f3e 100644 --- a/integration_tests/src/main/scala/com/nvidia/spark/rapids/tests/common/BenchUtils.scala +++ b/integration_tests/src/main/scala/com/nvidia/spark/rapids/tests/common/BenchUtils.scala @@ -78,6 +78,27 @@ object BenchUtils { gcBetweenRuns) } + /** Perform benchmark of writing results to ORC */ + def writeOrc( + spark: SparkSession, + createDataFrame: SparkSession => DataFrame, + queryDescription: String, + filenameStub: String, + iterations: Int, + gcBetweenRuns: Boolean, + path: String, + mode: SaveMode = SaveMode.Overwrite, + writeOptions: Map[String, String] = Map.empty): Unit = { + runBench( + spark, + createDataFrame, + WriteOrc(path, mode, writeOptions), + queryDescription, + filenameStub, + iterations, + gcBetweenRuns) + } + /** Perform benchmark of writing results to Parquet */ def writeParquet( spark: SparkSession, @@ -636,6 +657,11 @@ case class WriteCsv( mode: SaveMode, writeOptions: Map[String, String]) extends ResultsAction +case class WriteOrc( + path: String, + mode: SaveMode, + writeOptions: Map[String, String]) extends ResultsAction + case class WriteParquet( path: String, mode: SaveMode, diff --git a/integration_tests/src/main/scala/com/nvidia/spark/rapids/tests/tpcds/TpcdsLikeBench.scala 
b/integration_tests/src/main/scala/com/nvidia/spark/rapids/tests/tpcds/TpcdsLikeBench.scala index d5f503b3350..9a74c18b572 100644 --- a/integration_tests/src/main/scala/com/nvidia/spark/rapids/tests/tpcds/TpcdsLikeBench.scala +++ b/integration_tests/src/main/scala/com/nvidia/spark/rapids/tests/tpcds/TpcdsLikeBench.scala @@ -89,6 +89,43 @@ object TpcdsLikeBench extends Logging { writeOptions) } + /** + * This method performs a benchmark of executing a query and writing the results to ORC files + * and can be called from Spark shell using the following syntax: + * + * TpcdsLikeBench.writeOrc(spark, "q5", "/path/to/write", iterations=3) + * + * @param spark The Spark session + * @param query The name of the query to run e.g. "q5" + * @param path The path to write the results to + * @param mode The SaveMode to use when writing the results + * @param writeOptions Write options + * @param iterations The number of times to run the query. + * @param summaryFilePrefix Optional prefix for the generated JSON summary file.
+ * @param gcBetweenRuns Whether to call `System.gc` between iterations to cause Spark to + * call `unregisterShuffle` + */ + def writeOrc( + spark: SparkSession, + query: String, + path: String, + mode: SaveMode = SaveMode.Overwrite, + writeOptions: Map[String, String] = Map.empty, + iterations: Int = 3, + summaryFilePrefix: Option[String] = None, + gcBetweenRuns: Boolean = false): Unit = { + BenchUtils.writeOrc( + spark, + spark => TpcdsLikeSpark.query(query)(spark), + query, + summaryFilePrefix.getOrElse(s"tpcds-$query-orc"), + iterations, + gcBetweenRuns, + path, + mode, + writeOptions) + } + /** * This method performs a benchmark of executing a query and writing the results to Parquet files * and can be called from Spark shell using the following syntax: @@ -158,6 +195,13 @@ object TpcdsLikeBench extends Logging { path, iterations = conf.iterations(), summaryFilePrefix = conf.summaryFilePrefix.toOption) + case "orc" => + writeOrc( + spark, + conf.query(), + path, + iterations = conf.iterations(), + summaryFilePrefix = conf.summaryFilePrefix.toOption) case _ => println("Invalid or unspecified output format") System.exit(-1) diff --git a/integration_tests/src/main/scala/com/nvidia/spark/rapids/tests/tpch/TpchLikeBench.scala b/integration_tests/src/main/scala/com/nvidia/spark/rapids/tests/tpch/TpchLikeBench.scala index f41ab97432f..6c2e8545c12 100644 --- a/integration_tests/src/main/scala/com/nvidia/spark/rapids/tests/tpch/TpchLikeBench.scala +++ b/integration_tests/src/main/scala/com/nvidia/spark/rapids/tests/tpch/TpchLikeBench.scala @@ -88,6 +88,43 @@ object TpchLikeBench { writeOptions) } + /** + * This method performs a benchmark of executing a query and writing the results to ORC files + * and can be called from Spark shell using the following syntax: + * + * TpchLikeBench.writeOrc(spark, "q5", "/path/to/write", iterations=3) + * + * @param spark The Spark session + * @param query The name of the query to run e.g.
"q5" + * @param path The path to write the results to + * @param mode The SaveMode to use when writing the results + * @param writeOptions Write options + * @param iterations The number of times to run the query. + * @param summaryFilePrefix Optional prefix for the generated JSON summary file. + * @param gcBetweenRuns Whether to call `System.gc` between iterations to cause Spark to + * call `unregisterShuffle` + */ + def writeOrc( + spark: SparkSession, + query: String, + path: String, + mode: SaveMode = SaveMode.Overwrite, + writeOptions: Map[String, String] = Map.empty, + iterations: Int = 3, + summaryFilePrefix: Option[String] = None, + gcBetweenRuns: Boolean = false): Unit = { + BenchUtils.writeOrc( + spark, + spark => getQuery(query)(spark), + query, + summaryFilePrefix.getOrElse(s"tpch-$query-orc"), + iterations, + gcBetweenRuns, + path, + mode, + writeOptions) + } + /** * This method performs a benchmark of executing a query and writing the results to Parquet files * and can be called from Spark shell using the following syntax: @@ -157,6 +194,13 @@ object TpchLikeBench { path, iterations = conf.iterations(), summaryFilePrefix = conf.summaryFilePrefix.toOption) + case "orc" => + writeOrc( + spark, + conf.query(), + path, + iterations = conf.iterations(), + summaryFilePrefix = conf.summaryFilePrefix.toOption) case _ => println("Invalid or unspecified output format") System.exit(-1) diff --git a/integration_tests/src/main/scala/com/nvidia/spark/rapids/tests/tpcxbb/TpcxbbLikeBench.scala b/integration_tests/src/main/scala/com/nvidia/spark/rapids/tests/tpcxbb/TpcxbbLikeBench.scala index b5aadfc7aa5..4e2ab72a932 100644 --- a/integration_tests/src/main/scala/com/nvidia/spark/rapids/tests/tpcxbb/TpcxbbLikeBench.scala +++ b/integration_tests/src/main/scala/com/nvidia/spark/rapids/tests/tpcxbb/TpcxbbLikeBench.scala @@ -88,6 +88,43 @@ object TpcxbbLikeBench extends Logging { writeOptions) } + /** + * This method performs a benchmark of executing a query and writing the
results to ORC files + * and can be called from Spark shell using the following syntax: + * + * TpcxbbLikeBench.writeOrc(spark, "q5", "/path/to/write", iterations=3) + * + * @param spark The Spark session + * @param query The name of the query to run e.g. "q5" + * @param path The path to write the results to + * @param mode The SaveMode to use when writing the results + * @param writeOptions Write options + * @param iterations The number of times to run the query. + * @param summaryFilePrefix Optional prefix for the generated JSON summary file. + * @param gcBetweenRuns Whether to call `System.gc` between iterations to cause Spark to + * call `unregisterShuffle` + */ + def writeOrc( + spark: SparkSession, + query: String, + path: String, + mode: SaveMode = SaveMode.Overwrite, + writeOptions: Map[String, String] = Map.empty, + iterations: Int = 3, + summaryFilePrefix: Option[String] = None, + gcBetweenRuns: Boolean = false): Unit = { + BenchUtils.writeOrc( + spark, + spark => getQuery(query)(spark), + query, + summaryFilePrefix.getOrElse(s"tpcxbb-$query-orc"), + iterations, + gcBetweenRuns, + path, + mode, + writeOptions) + } + /** * This method performs a benchmark of executing a query and writing the results to Parquet files * and can be called from Spark shell using the following syntax: @@ -155,6 +192,13 @@ object TpcxbbLikeBench extends Logging { path, iterations = conf.iterations(), summaryFilePrefix = conf.summaryFilePrefix.toOption) + case "orc" => + writeOrc( + spark, + conf.query(), + path, + iterations = conf.iterations(), + summaryFilePrefix = conf.summaryFilePrefix.toOption) case _ => println("Invalid or unspecified output format") System.exit(-1) From 5194d8a129e60142731a9e7e4307ecdef33bf453 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 12 Oct 2020 10:26:20 -0600 Subject: [PATCH 4/5] Add match for WriteOrc Signed-off-by: Andy Grove --- .../scala/com/nvidia/spark/rapids/tests/common/BenchUtils.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git
a/integration_tests/src/main/scala/com/nvidia/spark/rapids/tests/common/BenchUtils.scala b/integration_tests/src/main/scala/com/nvidia/spark/rapids/tests/common/BenchUtils.scala index 7974a557f3e..c2c48a0bef9 100644 --- a/integration_tests/src/main/scala/com/nvidia/spark/rapids/tests/common/BenchUtils.scala +++ b/integration_tests/src/main/scala/com/nvidia/spark/rapids/tests/common/BenchUtils.scala @@ -168,6 +168,8 @@ object BenchUtils { case Collect() => df.collect() case WriteCsv(path, mode, options) => df.write.mode(mode).options(options).csv(path) + case WriteOrc(path, mode, options) => + df.write.mode(mode).options(options).orc(path) case WriteParquet(path, mode, options) => df.write.mode(mode).options(options).parquet(path) } From f4fe6fcac4ba5e7384061844d25a63a51d19a408 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 12 Oct 2020 11:03:36 -0600 Subject: [PATCH 5/5] Add match for Orc when writing results summary file Signed-off-by: Andy Grove --- .../spark/rapids/tests/common/BenchUtils.scala | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/integration_tests/src/main/scala/com/nvidia/spark/rapids/tests/common/BenchUtils.scala b/integration_tests/src/main/scala/com/nvidia/spark/rapids/tests/common/BenchUtils.scala index c2c48a0bef9..41f62d0e498 100644 --- a/integration_tests/src/main/scala/com/nvidia/spark/rapids/tests/common/BenchUtils.scala +++ b/integration_tests/src/main/scala/com/nvidia/spark/rapids/tests/common/BenchUtils.scala @@ -250,6 +250,18 @@ object BenchUtils { queryPlansWithMetrics, queryTimes) + case w: WriteOrc => BenchmarkReport( + filename, + queryStartTime.toEpochMilli, + environment, + testConfiguration, + "orc", + w.writeOptions, + queryDescription, + queryPlan, + queryPlansWithMetrics, + queryTimes) + case w: WriteParquet => BenchmarkReport( filename, queryStartTime.toEpochMilli,
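Patch 5 above adds an ORC branch to the JSON summary writer; among the fields it records is `queryTimes`. As a hedged sketch of post-processing those summaries, the helper below pulls a median timing out of a summary file. The field name comes from the `BenchmarkReport` constructor calls in the patch, but the exact JSON layout is an assumption, and `median_query_time_ms` is a hypothetical helper, not part of the plugin.

```shell
# Hedged sketch: extract the median of the per-iteration "queryTimes" values
# from a benchmark summary JSON file. Assumes queryTimes is a top-level array
# of millisecond timings; the real report layout may differ.
median_query_time_ms() {
  python3 - "$1" <<'PYEOF'
import json
import statistics
import sys

with open(sys.argv[1]) as f:
    report = json.load(f)
# queryTimes: assumed to be a flat array, one entry per benchmark iteration
print(int(statistics.median(report["queryTimes"])))
PYEOF
}

# Example usage (file names are placeholders for generated summary files):
#   median_query_time_ms tpcds-q5-cpu.json
```

Comparing the medians of a CPU-run and a GPU-run summary produced with the same `--summary-file-prefix` convention gives a quick speedup estimate without re-opening the full JSON reports.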