Benchmark guide update for command-line interface / spark-submit #924

Merged · 6 commits · Oct 13, 2020
Changes from all commits
`docs/benchmarks.md` — 44 additions & 4 deletions
@@ -59,14 +59,15 @@ for `spark.default.parallelism`, which by default is based on the number of available
cores. This value can be set explicitly to better control the size of the output files.

It should also be noted that no decimal types will be output. The conversion code uses explicit
schemas to ensure that decimal types are converted to floating-point types instead.
schemas to ensure that decimal types are converted to floating-point types instead, because the
plugin does not yet support decimal types; these will be supported in a future release.
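
As an illustration only (not the repository's actual conversion code), an explicit schema that
loads a decimal column as a double might look like this; the column names come from the TPC-DS
`store_sales` table and the input path is hypothetical:

```scala
import org.apache.spark.sql.types._

// ss_sales_price is decimal(7,2) in the TPC-DS specification, but is declared
// here as DoubleType so that no decimal columns reach the plugin.
val schema = StructType(Seq(
  StructField("ss_item_sk", IntegerType),
  StructField("ss_sales_price", DoubleType)))
val storeSales = spark.read.schema(schema).csv("/path/to/raw/store_sales")
```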

## Running Benchmarks
## Running Benchmarks from a Spark shell

The benchmarks can currently be executed in two modes:

- Execute the query and collect the results to the driver
- Execute the query and write the results to disk (in Parquet or CSV format)
- Execute the query and write the results to disk (in Parquet, CSV, or ORC format)

The following commands can be entered into spark-shell to register the data files that the
benchmark will query.
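
The registration commands themselves are collapsed in this diff view. As a sketch, assuming the
TPC-DS data has already been converted to Parquet and that the `TpcdsLikeSpark.setupAllParquet`
helper from this repository is on the classpath, registration would look something like:

```scala
// Registers each TPC-DS table as a temporary view over the Parquet files
// under the given (hypothetical) path.
TpcdsLikeSpark.setupAllParquet(spark, "/data/tpcds-parquet")
```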
@@ -84,12 +85,38 @@ TpcdsLikeBench.collect(spark, "q5", iterations=3)
```

The benchmark can be executed with the following syntax to execute the query and write the results
to Parquet. There is also a `writeCsv` method for writing the output to CSV files.
to Parquet. There are also `writeCsv` and `writeOrc` methods for writing the output to CSV or ORC
files.

```scala
TpcdsLikeBench.writeParquet(spark, "q5", "/data/output/tpcds/q5", iterations=3)
```
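
A corresponding ORC run using the new `writeOrc` method might look like the following (output
path hypothetical):

```scala
TpcdsLikeBench.writeOrc(spark, "q5", "/data/output/tpcds/q5", iterations=3)
```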

## Running Benchmarks from spark-submit

Each of the TPC-* derived benchmarks has a command-line interface, allowing it to be submitted
to Spark using `spark-submit`. This can be more practical than the Spark shell when running a
series of benchmarks via automation.

Here is an example `spark-submit` command for running TPC-DS query 5, reading from Parquet and
writing results to Parquet. The `--output` and `--output-format` arguments can be omitted to
have the benchmark call `collect()` on the results instead.

```bash
$SPARK_HOME/bin/spark-submit \
--master $SPARK_MASTER_URL \
--jars $SPARK_RAPIDS_PLUGIN_JAR,$CUDF_JAR \
--class com.nvidia.spark.rapids.tests.tpcds.TpcdsLikeBench \
$SPARK_RAPIDS_PLUGIN_INTEGRATION_TEST_JAR \
--input /raid/tpcds-3TB-parquet-largefiles \
--input-format parquet \
--output /raid/tpcds-output/tpcds-q5-cpu \
--output-format parquet \
--query q5 \
--summary-file-prefix tpcds-q5-cpu \
--iterations 1
```
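
For example, omitting the output arguments runs the same query in collect-only mode (a sketch
reusing the same hypothetical paths and environment variables):

```bash
$SPARK_HOME/bin/spark-submit \
--master $SPARK_MASTER_URL \
--jars $SPARK_RAPIDS_PLUGIN_JAR,$CUDF_JAR \
--class com.nvidia.spark.rapids.tests.tpcds.TpcdsLikeBench \
$SPARK_RAPIDS_PLUGIN_INTEGRATION_TEST_JAR \
--input /raid/tpcds-3TB-parquet-largefiles \
--input-format parquet \
--query q5 \
--summary-file-prefix tpcds-q5-cpu \
--iterations 1
```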

## Benchmark JSON Output

Each benchmark run produces a JSON file containing information about the environment and the query,
@@ -127,6 +154,19 @@ BenchUtils.compareResults(cpu, gpu, ignoreOrdering=true, epsilon=0.0001)

This will report on any differences between the two dataframes.
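
For example, assuming the CPU and GPU runs wrote Parquet output to the (hypothetical) paths
below, the comparison could be run from a Spark shell as:

```scala
val cpu = spark.read.parquet("/raid/tpcds-output/tpcds-q5-cpu")
val gpu = spark.read.parquet("/raid/tpcds-output/tpcds-q5-gpu")
BenchUtils.compareResults(cpu, gpu, ignoreOrdering=true, epsilon=0.0001)
```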

The verification utility can also be run via `spark-submit` using the following syntax.

```bash
$SPARK_HOME/bin/spark-submit \
--master $SPARK_MASTER_URL \
--jars $SPARK_RAPIDS_PLUGIN_JAR,$CUDF_JAR \
--class com.nvidia.spark.rapids.tests.common.CompareResults \
$SPARK_RAPIDS_PLUGIN_INTEGRATION_TEST_JAR \
--input1 /path/to/result1 \
--input2 /path/to/result2 \
--input-format parquet
```

## Performance Tuning

Please refer to the [Tuning Guide](tuning-guide.md) for information on performance tuning.
@@ -78,6 +78,27 @@ object BenchUtils {
      gcBetweenRuns)
  }

  /** Perform benchmark of writing results to ORC */
  def writeOrc(
      spark: SparkSession,
      createDataFrame: SparkSession => DataFrame,
      queryDescription: String,
      filenameStub: String,
      iterations: Int,
      gcBetweenRuns: Boolean,
      path: String,
      mode: SaveMode = SaveMode.Overwrite,
      writeOptions: Map[String, String] = Map.empty): Unit = {
    runBench(
      spark,
      createDataFrame,
      WriteOrc(path, mode, writeOptions),
      queryDescription,
      filenameStub,
      iterations,
      gcBetweenRuns)
  }

  /** Perform benchmark of writing results to Parquet */
  def writeParquet(
      spark: SparkSession,
@@ -147,6 +168,8 @@ object BenchUtils {
      case Collect() => df.collect()
      case WriteCsv(path, mode, options) =>
        df.write.mode(mode).options(options).csv(path)
      case WriteOrc(path, mode, options) =>
        df.write.mode(mode).options(options).orc(path)
      case WriteParquet(path, mode, options) =>
        df.write.mode(mode).options(options).parquet(path)
    }
@@ -227,6 +250,18 @@ object BenchUtils {
        queryPlansWithMetrics,
        queryTimes)

      case w: WriteOrc => BenchmarkReport(
        filename,
        queryStartTime.toEpochMilli,
        environment,
        testConfiguration,
        "orc",
        w.writeOptions,
        queryDescription,
        queryPlan,
        queryPlansWithMetrics,
        queryTimes)

      case w: WriteParquet => BenchmarkReport(
        filename,
        queryStartTime.toEpochMilli,
@@ -636,6 +671,11 @@ case class WriteCsv(
    mode: SaveMode,
    writeOptions: Map[String, String]) extends ResultsAction

case class WriteOrc(
    path: String,
    mode: SaveMode,
    writeOptions: Map[String, String]) extends ResultsAction

case class WriteParquet(
    path: String,
    mode: SaveMode,
@@ -89,6 +89,43 @@ object TpcdsLikeBench extends Logging {
      writeOptions)
  }

  /**
   * This method performs a benchmark of executing a query and writing the results to ORC files
   * and can be called from Spark shell using the following syntax:
   *
   * TpcdsLikeBench.writeOrc(spark, "q5", "/path/to/write", iterations = 3)
   *
   * @param spark The Spark session
   * @param query The name of the query to run e.g. "q5"
   * @param path The path to write the results to
   * @param mode The SaveMode to use when writing the results
   * @param writeOptions Write options
   * @param iterations The number of times to run the query.
   * @param summaryFilePrefix Optional prefix for the generated JSON summary file.
   * @param gcBetweenRuns Whether to call `System.gc` between iterations to cause Spark to
   *                      call `unregisterShuffle`
   */
  def writeOrc(
      spark: SparkSession,
      query: String,
      path: String,
      mode: SaveMode = SaveMode.Overwrite,
      writeOptions: Map[String, String] = Map.empty,
      iterations: Int = 3,
      summaryFilePrefix: Option[String] = None,
      gcBetweenRuns: Boolean = false): Unit = {
    BenchUtils.writeOrc(
      spark,
      spark => TpcdsLikeSpark.query(query)(spark),
      query,
      summaryFilePrefix.getOrElse(s"tpcds-$query-orc"),
      iterations,
      gcBetweenRuns,
      path,
      mode,
      writeOptions)
  }

  /**
   * This method performs a benchmark of executing a query and writing the results to Parquet files
   * and can be called from Spark shell using the following syntax:
@@ -158,6 +195,13 @@ object TpcdsLikeBench extends Logging {
            path,
            iterations = conf.iterations(),
            summaryFilePrefix = conf.summaryFilePrefix.toOption)
        case "orc" =>
          writeOrc(
            spark,
            conf.query(),
            path,
            iterations = conf.iterations(),
            summaryFilePrefix = conf.summaryFilePrefix.toOption)
        case _ =>
          println("Invalid or unspecified output format")
          System.exit(-1)
@@ -88,6 +88,43 @@ object TpchLikeBench {
      writeOptions)
  }

  /**
   * This method performs a benchmark of executing a query and writing the results to ORC files
   * and can be called from Spark shell using the following syntax:
   *
   * TpchLikeBench.writeOrc(spark, "q5", "/path/to/write", iterations = 3)
   *
   * @param spark The Spark session
   * @param query The name of the query to run e.g. "q5"
   * @param path The path to write the results to
   * @param mode The SaveMode to use when writing the results
   * @param writeOptions Write options
   * @param iterations The number of times to run the query.
   * @param summaryFilePrefix Optional prefix for the generated JSON summary file.
   * @param gcBetweenRuns Whether to call `System.gc` between iterations to cause Spark to
   *                      call `unregisterShuffle`
   */
  def writeOrc(
      spark: SparkSession,
      query: String,
      path: String,
      mode: SaveMode = SaveMode.Overwrite,
      writeOptions: Map[String, String] = Map.empty,
      iterations: Int = 3,
      summaryFilePrefix: Option[String] = None,
      gcBetweenRuns: Boolean = false): Unit = {
    BenchUtils.writeOrc(
      spark,
      spark => getQuery(query)(spark),
      query,
      summaryFilePrefix.getOrElse(s"tpch-$query-orc"),
      iterations,
      gcBetweenRuns,
      path,
      mode,
      writeOptions)
  }

  /**
   * This method performs a benchmark of executing a query and writing the results to Parquet files
   * and can be called from Spark shell using the following syntax:
@@ -157,6 +194,13 @@ object TpchLikeBench {
            path,
            iterations = conf.iterations(),
            summaryFilePrefix = conf.summaryFilePrefix.toOption)
        case "orc" =>
          writeOrc(
            spark,
            conf.query(),
            path,
            iterations = conf.iterations(),
            summaryFilePrefix = conf.summaryFilePrefix.toOption)
        case _ =>
          println("Invalid or unspecified output format")
          System.exit(-1)
@@ -88,6 +88,43 @@ object TpcxbbLikeBench extends Logging {
      writeOptions)
  }

  /**
   * This method performs a benchmark of executing a query and writing the results to ORC files
   * and can be called from Spark shell using the following syntax:
   *
   * TpcxbbLikeBench.writeOrc(spark, "q5", "/path/to/write", iterations = 3)
   *
   * @param spark The Spark session
   * @param query The name of the query to run e.g. "q5"
   * @param path The path to write the results to
   * @param mode The SaveMode to use when writing the results
   * @param writeOptions Write options
   * @param iterations The number of times to run the query.
   * @param summaryFilePrefix Optional prefix for the generated JSON summary file.
   * @param gcBetweenRuns Whether to call `System.gc` between iterations to cause Spark to
   *                      call `unregisterShuffle`
   */
  def writeOrc(
      spark: SparkSession,
      query: String,
      path: String,
      mode: SaveMode = SaveMode.Overwrite,
      writeOptions: Map[String, String] = Map.empty,
      iterations: Int = 3,
      summaryFilePrefix: Option[String] = None,
      gcBetweenRuns: Boolean = false): Unit = {
    BenchUtils.writeOrc(
      spark,
      spark => getQuery(query)(spark),
      query,
      summaryFilePrefix.getOrElse(s"tpcxbb-$query-orc"),
      iterations,
      gcBetweenRuns,
      path,
      mode,
      writeOptions)
  }

  /**
   * This method performs a benchmark of executing a query and writing the results to Parquet files
   * and can be called from Spark shell using the following syntax:
@@ -155,6 +192,13 @@ object TpcxbbLikeBench extends Logging {
            path,
            iterations = conf.iterations(),
            summaryFilePrefix = conf.summaryFilePrefix.toOption)
        case "orc" =>
          writeOrc(
            spark,
            conf.query(),
            path,
            iterations = conf.iterations(),
            summaryFilePrefix = conf.summaryFilePrefix.toOption)
        case _ =>
          println("Invalid or unspecified output format")
          System.exit(-1)