diff --git a/docs/benchmarks.md b/docs/benchmarks.md
index 82340f92157..988668b0f42 100644
--- a/docs/benchmarks.md
+++ b/docs/benchmarks.md
@@ -59,14 +59,15 @@ for `spark.default.parallelism`, which by default is based on the number of avai
 cores. This value can be set explicitly to better control the size of the output files.
 
 It should also be noted that no decimal types will be output. The conversion code uses explicit
-schemas to ensure that decimal types are converted to floating-point types instead.
+schemas to ensure that decimal types are converted to floating-point types instead, because the
+plugin does not yet support decimal types. These will be supported in a future release.
 
-## Running Benchmarks
+## Running Benchmarks from a Spark shell
 
 The benchmarks can be executed in two modes currently:
 
 - Execute the query and collect the results to the driver
-- Execute the query and write the results to disk (in Parquet or CSV format)
+- Execute the query and write the results to disk (in Parquet, CSV, or ORC format)
 
 The following commands can be entered into spark-shell to register the data files that the
 benchmark will query.
@@ -84,12 +85,38 @@ TpcdsLikeBench.collect(spark, "q5", iterations=3)
 ```
 
 The benchmark can be executed with the following syntax to execute the query and write the results
-to Parquet. There is also a `writeCsv` method for writing the output to CSV files.
+to Parquet. There are also `writeCsv` and `writeOrc` methods for writing the output to CSV or ORC
+files.
 
 ```scala
 TpcdsLikeBench.writeParquet(spark, "q5", "/data/output/tpcds/q5", iterations=3)
 ```
 
+## Running Benchmarks from spark-submit
+
+Each of the TPC-* derived benchmarks has a command-line interface, allowing it to be submitted
+to Spark using `spark-submit`, which can be more practical than using the Spark shell when
+running a series of benchmarks via automation.
+
+Here is an example `spark-submit` command for running TPC-DS query 5, reading from Parquet and
+writing results to Parquet. The `--output` and `--output-format` arguments can be omitted to
+have the benchmark call `collect()` on the results instead.
+
+```bash
+$SPARK_HOME/bin/spark-submit \
+  --master $SPARK_MASTER_URL \
+  --jars $SPARK_RAPIDS_PLUGIN_JAR,$CUDF_JAR \
+  --class com.nvidia.spark.rapids.tests.tpcds.TpcdsLikeBench \
+  $SPARK_RAPIDS_PLUGIN_INTEGRATION_TEST_JAR \
+  --input /raid/tpcds-3TB-parquet-largefiles \
+  --input-format parquet \
+  --output /raid/tpcds-output/tpcds-q5-cpu \
+  --output-format parquet \
+  --query q5 \
+  --summary-file-prefix tpcds-q5-cpu \
+  --iterations 1
+```
+
 ## Benchmark JSON Output
 
 Each benchmark run produces a JSON file containing information about the environment and the query,
@@ -127,6 +154,19 @@ BenchUtils.compareResults(cpu, gpu, ignoreOrdering=true, epsilon=0.0001)
 ```
 
 This will report on any differences between the two dataframes.
 
+The verification utility can also be run via `spark-submit` using the following syntax.
+
+```bash
+$SPARK_HOME/bin/spark-submit \
+  --master $SPARK_MASTER_URL \
+  --jars $SPARK_RAPIDS_PLUGIN_JAR,$CUDF_JAR \
+  --class com.nvidia.spark.rapids.tests.common.CompareResults \
+  $SPARK_RAPIDS_PLUGIN_INTEGRATION_TEST_JAR \
+  --input1 /path/to/result1 \
+  --input2 /path/to/result2 \
+  --input-format parquet
+```
+
 ## Performance Tuning
 
 Please refer to the [Tuning Guide](tuning-guide.md) for information on performance tuning.
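Note: mirroring the `writeParquet` example in the doc above, the new `writeOrc` method added in this change should be callable from a spark-shell session as sketched below; the output path is illustrative.

```scala
// Run TPC-DS q5 three times and write the results to ORC
// (the output path is just an example).
TpcdsLikeBench.writeOrc(spark, "q5", "/data/output/tpcds/q5", iterations=3)
```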
diff --git a/integration_tests/src/main/scala/com/nvidia/spark/rapids/tests/common/BenchUtils.scala b/integration_tests/src/main/scala/com/nvidia/spark/rapids/tests/common/BenchUtils.scala
index 924b8f57cc2..41f62d0e498 100644
--- a/integration_tests/src/main/scala/com/nvidia/spark/rapids/tests/common/BenchUtils.scala
+++ b/integration_tests/src/main/scala/com/nvidia/spark/rapids/tests/common/BenchUtils.scala
@@ -78,6 +78,27 @@ object BenchUtils {
       gcBetweenRuns)
   }
 
+  /** Perform benchmark of writing results to ORC */
+  def writeOrc(
+      spark: SparkSession,
+      createDataFrame: SparkSession => DataFrame,
+      queryDescription: String,
+      filenameStub: String,
+      iterations: Int,
+      gcBetweenRuns: Boolean,
+      path: String,
+      mode: SaveMode = SaveMode.Overwrite,
+      writeOptions: Map[String, String] = Map.empty): Unit = {
+    runBench(
+      spark,
+      createDataFrame,
+      WriteOrc(path, mode, writeOptions),
+      queryDescription,
+      filenameStub,
+      iterations,
+      gcBetweenRuns)
+  }
+
   /** Perform benchmark of writing results to Parquet */
   def writeParquet(
       spark: SparkSession,
@@ -147,6 +168,8 @@ object BenchUtils {
         case Collect() => df.collect()
         case WriteCsv(path, mode, options) =>
           df.write.mode(mode).options(options).csv(path)
+        case WriteOrc(path, mode, options) =>
+          df.write.mode(mode).options(options).orc(path)
         case WriteParquet(path, mode, options) =>
           df.write.mode(mode).options(options).parquet(path)
       }
@@ -227,6 +250,18 @@ object BenchUtils {
          queryPlansWithMetrics,
          queryTimes)
 
+      case w: WriteOrc => BenchmarkReport(
+         filename,
+         queryStartTime.toEpochMilli,
+         environment,
+         testConfiguration,
+         "orc",
+         w.writeOptions,
+         queryDescription,
+         queryPlan,
+         queryPlansWithMetrics,
+         queryTimes)
+
       case w: WriteParquet => BenchmarkReport(
          filename,
          queryStartTime.toEpochMilli,
@@ -636,6 +671,11 @@ case class WriteCsv(
     mode: SaveMode,
     writeOptions: Map[String, String]) extends ResultsAction
 
+case class WriteOrc(
+    path: String,
+    mode: SaveMode,
+    writeOptions: Map[String, String]) extends ResultsAction
+
 case class WriteParquet(
     path: String,
     mode: SaveMode,
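Note: based on the signature added above, `BenchUtils.writeOrc` can also be driven directly with an arbitrary DataFrame factory rather than going through one of the TPC-* wrappers. A minimal sketch, assuming a spark-shell session and a hypothetical registered table and output path:

```scala
import com.nvidia.spark.rapids.tests.common.BenchUtils

// Benchmark an ad-hoc query, writing its output to ORC. The table name,
// description, and output path are hypothetical placeholders.
BenchUtils.writeOrc(
  spark,
  session => session.sql("SELECT * FROM store_sales LIMIT 1000"),
  "adhoc-query",   // query description recorded in the JSON summary
  "adhoc-query",   // prefix for the generated summary file
  iterations = 3,
  gcBetweenRuns = false,
  path = "/data/output/adhoc-orc")
```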
diff --git a/integration_tests/src/main/scala/com/nvidia/spark/rapids/tests/tpcds/TpcdsLikeBench.scala b/integration_tests/src/main/scala/com/nvidia/spark/rapids/tests/tpcds/TpcdsLikeBench.scala
index d5f503b3350..9a74c18b572 100644
--- a/integration_tests/src/main/scala/com/nvidia/spark/rapids/tests/tpcds/TpcdsLikeBench.scala
+++ b/integration_tests/src/main/scala/com/nvidia/spark/rapids/tests/tpcds/TpcdsLikeBench.scala
@@ -89,6 +89,43 @@ object TpcdsLikeBench extends Logging {
       writeOptions)
   }
 
+  /**
+   * This method performs a benchmark of executing a query and writing the results to ORC files
+   * and can be called from Spark shell using the following syntax:
+   *
+   * TpcdsLikeBench.writeOrc(spark, "q5", "/path/to/write", iterations = 3)
+   *
+   * @param spark The Spark session
+   * @param query The name of the query to run e.g. "q5"
+   * @param path The path to write the results to
+   * @param mode The SaveMode to use when writing the results
+   * @param writeOptions Write options
+   * @param iterations The number of times to run the query.
+   * @param summaryFilePrefix Optional prefix for the generated JSON summary file.
+   * @param gcBetweenRuns Whether to call `System.gc` between iterations to cause Spark to
+   *                      call `unregisterShuffle`
+   */
+  def writeOrc(
+      spark: SparkSession,
+      query: String,
+      path: String,
+      mode: SaveMode = SaveMode.Overwrite,
+      writeOptions: Map[String, String] = Map.empty,
+      iterations: Int = 3,
+      summaryFilePrefix: Option[String] = None,
+      gcBetweenRuns: Boolean = false): Unit = {
+    BenchUtils.writeOrc(
+      spark,
+      spark => TpcdsLikeSpark.query(query)(spark),
+      query,
+      summaryFilePrefix.getOrElse(s"tpcds-$query-orc"),
+      iterations,
+      gcBetweenRuns,
+      path,
+      mode,
+      writeOptions)
+  }
+
   /**
    * This method performs a benchmark of executing a query and writing the results to Parquet files
    * and can be called from Spark shell using the following syntax:
@@ -158,6 +195,13 @@ object TpcdsLikeBench extends Logging {
           path,
           iterations = conf.iterations(),
           summaryFilePrefix = conf.summaryFilePrefix.toOption)
+      case "orc" =>
+        writeOrc(
+          spark,
+          conf.query(),
+          path,
+          iterations = conf.iterations(),
+          summaryFilePrefix = conf.summaryFilePrefix.toOption)
       case _ =>
         println("Invalid or unspecified output format")
         System.exit(-1)
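Note: the `mode` and `writeOptions` parameters on the new method pass straight through to Spark's DataFrameWriter, so ORC writer options can be supplied per run. A sketch, with the compression choice as an illustrative assumption:

```scala
import org.apache.spark.sql.SaveMode

// Run q5 three times, overwriting earlier output and asking Spark's ORC
// writer to use snappy compression via the standard "compression" option.
TpcdsLikeBench.writeOrc(
  spark,
  "q5",
  "/data/output/tpcds/q5",
  mode = SaveMode.Overwrite,
  writeOptions = Map("compression" -> "snappy"),
  iterations = 3)
```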
diff --git a/integration_tests/src/main/scala/com/nvidia/spark/rapids/tests/tpch/TpchLikeBench.scala b/integration_tests/src/main/scala/com/nvidia/spark/rapids/tests/tpch/TpchLikeBench.scala
index f41ab97432f..6c2e8545c12 100644
--- a/integration_tests/src/main/scala/com/nvidia/spark/rapids/tests/tpch/TpchLikeBench.scala
+++ b/integration_tests/src/main/scala/com/nvidia/spark/rapids/tests/tpch/TpchLikeBench.scala
@@ -88,6 +88,43 @@ object TpchLikeBench {
       writeOptions)
   }
 
+  /**
+   * This method performs a benchmark of executing a query and writing the results to ORC files
+   * and can be called from Spark shell using the following syntax:
+   *
+   * TpchLikeBench.writeOrc(spark, "q5", "/path/to/write", iterations = 3)
+   *
+   * @param spark The Spark session
+   * @param query The name of the query to run e.g. "q5"
+   * @param path The path to write the results to
+   * @param mode The SaveMode to use when writing the results
+   * @param writeOptions Write options
+   * @param iterations The number of times to run the query.
+   * @param summaryFilePrefix Optional prefix for the generated JSON summary file.
+   * @param gcBetweenRuns Whether to call `System.gc` between iterations to cause Spark to
+   *                      call `unregisterShuffle`
+   */
+  def writeOrc(
+      spark: SparkSession,
+      query: String,
+      path: String,
+      mode: SaveMode = SaveMode.Overwrite,
+      writeOptions: Map[String, String] = Map.empty,
+      iterations: Int = 3,
+      summaryFilePrefix: Option[String] = None,
+      gcBetweenRuns: Boolean = false): Unit = {
+    BenchUtils.writeOrc(
+      spark,
+      spark => getQuery(query)(spark),
+      query,
+      summaryFilePrefix.getOrElse(s"tpch-$query-orc"),
+      iterations,
+      gcBetweenRuns,
+      path,
+      mode,
+      writeOptions)
+  }
+
   /**
    * This method performs a benchmark of executing a query and writing the results to Parquet files
    * and can be called from Spark shell using the following syntax:
@@ -157,6 +194,13 @@ object TpchLikeBench {
           path,
           iterations = conf.iterations(),
           summaryFilePrefix = conf.summaryFilePrefix.toOption)
+      case "orc" =>
+        writeOrc(
+          spark,
+          conf.query(),
+          path,
+          iterations = conf.iterations(),
+          summaryFilePrefix = conf.summaryFilePrefix.toOption)
       case _ =>
         println("Invalid or unspecified output format")
         System.exit(-1)
diff --git a/integration_tests/src/main/scala/com/nvidia/spark/rapids/tests/tpcxbb/TpcxbbLikeBench.scala b/integration_tests/src/main/scala/com/nvidia/spark/rapids/tests/tpcxbb/TpcxbbLikeBench.scala
index b5aadfc7aa5..4e2ab72a932 100644
--- a/integration_tests/src/main/scala/com/nvidia/spark/rapids/tests/tpcxbb/TpcxbbLikeBench.scala
+++ b/integration_tests/src/main/scala/com/nvidia/spark/rapids/tests/tpcxbb/TpcxbbLikeBench.scala
@@ -88,6 +88,43 @@ object TpcxbbLikeBench extends Logging {
       writeOptions)
   }
 
+  /**
+   * This method performs a benchmark of executing a query and writing the results to ORC files
+   * and can be called from Spark shell using the following syntax:
+   *
+   * TpcxbbLikeBench.writeOrc(spark, "q5", "/path/to/write", iterations = 3)
+   *
+   * @param spark The Spark session
+   * @param query The name of the query to run e.g. "q5"
+   * @param path The path to write the results to
+   * @param mode The SaveMode to use when writing the results
+   * @param writeOptions Write options
+   * @param iterations The number of times to run the query.
+   * @param summaryFilePrefix Optional prefix for the generated JSON summary file.
+   * @param gcBetweenRuns Whether to call `System.gc` between iterations to cause Spark to
+   *                      call `unregisterShuffle`
+   */
+  def writeOrc(
+      spark: SparkSession,
+      query: String,
+      path: String,
+      mode: SaveMode = SaveMode.Overwrite,
+      writeOptions: Map[String, String] = Map.empty,
+      iterations: Int = 3,
+      summaryFilePrefix: Option[String] = None,
+      gcBetweenRuns: Boolean = false): Unit = {
+    BenchUtils.writeOrc(
+      spark,
+      spark => getQuery(query)(spark),
+      query,
+      summaryFilePrefix.getOrElse(s"tpcxbb-$query-orc"),
+      iterations,
+      gcBetweenRuns,
+      path,
+      mode,
+      writeOptions)
+  }
+
   /**
    * This method performs a benchmark of executing a query and writing the results to Parquet files
    * and can be called from Spark shell using the following syntax:
@@ -155,6 +192,13 @@ object TpcxbbLikeBench extends Logging {
           path,
           iterations = conf.iterations(),
           summaryFilePrefix = conf.summaryFilePrefix.toOption)
+      case "orc" =>
+        writeOrc(
+          spark,
+          conf.query(),
+          path,
+          iterations = conf.iterations(),
+          summaryFilePrefix = conf.summaryFilePrefix.toOption)
       case _ =>
         println("Invalid or unspecified output format")
         System.exit(-1)
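Note: since all three benchmark suites can now write ORC, the verification flow described in docs/benchmarks.md should apply unchanged to ORC output. A sketch, assuming hypothetical paths for a CPU run and a GPU run of the same query:

```scala
import com.nvidia.spark.rapids.tests.common.BenchUtils

// Load the ORC output of a CPU run and a GPU run (paths are hypothetical)
// and report any differences, tolerating small floating-point drift.
val cpu = spark.read.orc("/data/output/tpch/q5-cpu")
val gpu = spark.read.orc("/data/output/tpch/q5-gpu")
BenchUtils.compareResults(cpu, gpu, ignoreOrdering=true, epsilon=0.0001)
```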