Add option to control number of partitions when converting from CSV to Parquet (NVIDIA#915)

* Add command-line arguments for applying coalesce and repartition on a per-table basis

Signed-off-by: Andy Grove <andygrove@nvidia.com>

* Move command-line validation logic and address other feedback

Signed-off-by: Andy Grove <andygrove@nvidia.com>

* Update copyright years and fix import order

Signed-off-by: Andy Grove <andygrove@nvidia.com>

* Update docs/benchmarks.md

Co-authored-by: Jason Lowe <jlowe@nvidia.com>

* Remove withPartitioning option from TPC-H and TPC-xBB file conversion

Signed-off-by: Andy Grove <andygrove@nvidia.com>

Co-authored-by: Jason Lowe <jlowe@nvidia.com>
andygrove and jlowe authored Oct 15, 2020
1 parent 8d573a5 commit 08aa40e
Showing 8 changed files with 329 additions and 121 deletions.
34 changes: 29 additions & 5 deletions docs/benchmarks.md
@@ -25,9 +25,9 @@ benchmark.

| Benchmark | Package | Class Names |
|-----------|--------------------------------------|----------------------------------|
| TPC-DS | com.nvidia.spark.rapids.tests.tpcds | TpcdsLikeSpark, TpcdsLikeBench |
| TPC-xBB | com.nvidia.spark.rapids.tests.tpcxbb | TpcxbbLikeSpark, TpcxbbLikeBench |
| TPC-H | com.nvidia.spark.rapids.tests.tpch | TpchLikeSpark, TpchLikeBench |
| TPC-DS | com.nvidia.spark.rapids.tests.tpcds | ConvertFiles, TpcdsLikeBench |
| TPC-xBB | com.nvidia.spark.rapids.tests.tpcxbb | ConvertFiles, TpcxbbLikeBench |
| TPC-H | com.nvidia.spark.rapids.tests.tpch | ConvertFiles, TpchLikeBench |

## Spark Shell

@@ -55,8 +55,32 @@ TpcdsLikeSpark.csvToParquet(spark, "/path/to/input", "/path/to/output")

Note that the code for converting CSV to Parquet does not explicitly specify the number of
partitions to write, so the size of the resulting parquet files will vary depending on the value
for `spark.default.parallelism`, which by default is based on the number of available executor
cores. This value can be set explicitly to better control the size of the output files.
for `spark.default.parallelism`, which by default is based on the number of available executor
cores. However, the file conversion methods accept `coalesce` and `repartition` arguments to
better control the size of the partitions on a per-table basis.

Example using the `coalesce` and `repartition` options to control the number and size of partitions
for specific tables:

```scala
TpcdsLikeSpark.csvToParquet(spark, "/path/to/input", "/path/to/output",
coalesce=Map("customer_address" -> 1), repartition=Map("web_sales" -> 256))
```
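
Note that `coalesce` reduces the number of partitions without a shuffle, while `repartition` performs a full shuffle to reach the requested partition count. The same per-table options are accepted by the ORC conversion; a minimal sketch, assuming the same input and output paths as above:

```scala
// Sketch: ORC conversion using the same per-table partitioning options
TpcdsLikeSpark.csvToOrc(spark, "/path/to/input", "/path/to/output",
  coalesce=Map("customer_address" -> 1), repartition=Map("web_sales" -> 256))
```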

It is also possible to use `spark-submit` to run the file conversion process.

```bash
$SPARK_HOME/bin/spark-submit \
--master $SPARK_MASTER_URL \
--jars $SPARK_RAPIDS_PLUGIN_JAR,$CUDF_JAR \
--class com.nvidia.spark.rapids.tests.tpcds.ConvertFiles \
$SPARK_RAPIDS_PLUGIN_INTEGRATION_TEST_JAR \
--input /path/to/input \
--output /path/to/output \
--output-format parquet \
--coalesce customer_address=1 \
--repartition web_sales=256 inventory=128
```
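
The `--coalesce` and `--repartition` flags take one or more `table=N` pairs, which the `ConvertFiles` entry point collects into per-table maps and forwards to the conversion methods. Conceptually, the invocation above corresponds to a call like the following (a sketch, not output of the tool):

```scala
// Roughly what the spark-submit example above resolves to
TpcdsLikeSpark.csvToParquet(spark, "/path/to/input", "/path/to/output",
  coalesce=Map("customer_address" -> 1),
  repartition=Map("web_sales" -> 256, "inventory" -> 128))
```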

It should also be noted that no decimal types will be output. The conversion code uses explicit
schemas to ensure that decimal types are converted to floating-point types instead because the
@@ -291,6 +291,33 @@ object BenchUtils {
os.close()
}

def validateCoalesceRepartition(
coalesce: Map[String, Int],
repartition: Map[String, Int]): Unit = {
val duplicates = coalesce.keys.filter(name => repartition.contains(name))
if (duplicates.nonEmpty) {
throw new IllegalArgumentException(
s"Cannot both coalesce and repartition the same table: ${duplicates.mkString(",")}")
}
}

def applyCoalesceRepartition(
name: String,
df: DataFrame,
coalesce: Map[String, Int],
repartition: Map[String, Int]): DataFrame = {
(coalesce.get(name), repartition.get(name)) match {
case (Some(_), Some(_)) =>
// this should be unreachable due to earlier validation
throw new IllegalArgumentException(
s"Cannot both coalesce and repartition the same table: $name")
case (Some(n), _) => df.coalesce(n)
case (_, Some(n)) => df.repartition(n)
case _ => df
}
}


/**
* Generate a DOT graph for one query plan, or showing differences between two query plans.
*
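
For context, a minimal sketch of how `applyCoalesceRepartition` behaves for a given table (hypothetical `spark` session and paths; the real call site is in the table conversion code below):

```scala
// Hypothetical example: only the map that names the table takes effect
val coalesce = Map("customer_address" -> 1)
val repartition = Map("web_sales" -> 256)

val df = spark.read.csv("/path/to/input/customer_address")
// customer_address appears in the coalesce map, so this returns df.coalesce(1);
// for web_sales it would return df.repartition(256), and for any other table df unchanged
val adjusted = BenchUtils.applyCoalesceRepartition("customer_address", df, coalesce, repartition)
```
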
@@ -16,11 +16,13 @@

package com.nvidia.spark.rapids.tests.tpcds

import com.nvidia.spark.rapids.tests.common.BenchUtils
import com.nvidia.spark.rapids.tests.tpcds.TpcdsLikeSpark.{csvToOrc, csvToParquet}
import org.rogach.scallop.ScallopConf

import org.apache.spark.sql.{DataFrame, DataFrameWriter, Row, SparkSession}
import org.apache.spark.sql.types.{DateType, DoubleType, IntegerType, LongType, StringType, StructField, StructType}

// scalastyle:off line.size.limit

case class Table(
name: String, // also the base name for the data
partitionColumns: Seq[String],
@@ -59,11 +61,14 @@

private def setupWrite(
spark: SparkSession,
name: String,
inputBase: String,
coalesce: Map[String, Int],
repartition: Map[String, Int],
writePartitioning: Boolean): DataFrameWriter[Row] = {
val tmp = readCSV(spark, inputBase)
.write
.mode("overwrite")
val df = readCSV(spark, inputBase)
val repart = BenchUtils.applyCoalesceRepartition(name, df, coalesce, repartition)
val tmp = repart.write.mode("overwrite")
if (writePartitioning && partitionColumns.nonEmpty) {
tmp.partitionBy(partitionColumns: _*)
} else {
@@ -75,15 +80,21 @@
spark: SparkSession,
inputBase: String,
outputBase: String,
coalesce: Map[String, Int],
repartition: Map[String, Int],
writePartitioning: Boolean): Unit =
setupWrite(spark, inputBase, writePartitioning).parquet(path(outputBase))
setupWrite(spark, name, inputBase, coalesce, repartition, writePartitioning)
.parquet(path(outputBase))

def csvToOrc(
spark: SparkSession,
inputBase: String,
outputBase: String,
coalesce: Map[String, Int],
repartition: Map[String, Int],
writePartitioning: Boolean): Unit =
setupWrite(spark, inputBase, writePartitioning).orc(path(outputBase))
setupWrite(spark, name, inputBase, coalesce, repartition, writePartitioning)
.orc(path(outputBase))
}

case class Query(name: String, query: String) {
@@ -97,44 +108,36 @@
* correctness tests auto sort the data to account for ambiguous ordering.
*/
object TpcdsLikeSpark {

/**
* Main method allows us to submit using spark-submit to perform conversions from CSV to
* Parquet or Orc.
*/
def main(arg: Array[String]): Unit = {
val baseInput = arg(0)
val baseOutput = arg(1)
val targetFileType = arg(2)
val withPartitioning = if (arg.length > 3) {
arg(3).toBoolean
} else {
false
}

val spark = SparkSession.builder.appName("TPC-DS Like File Conversion").getOrCreate()

targetFileType match {
case "parquet" => csvToParquet(spark, baseInput, baseOutput, withPartitioning)
case "orc" => csvToOrc(spark, baseInput, baseOutput, withPartitioning)
}

}

def csvToParquet(
spark: SparkSession,
baseInput: String,
baseOutput: String,
coalesce: Map[String, Int] = Map.empty,
repartition: Map[String, Int] = Map.empty,
writePartitioning: Boolean = false): Unit = {
tables.foreach(_.csvToParquet(spark, baseInput, baseOutput, writePartitioning))
tables.foreach(_.csvToParquet(
spark,
baseInput,
baseOutput,
coalesce,
repartition,
writePartitioning))
}

def csvToOrc(
spark: SparkSession,
baseInput: String,
baseOutput: String,
coalesce: Map[String, Int] = Map.empty,
repartition: Map[String, Int] = Map.empty,
writePartitioning: Boolean = false): Unit = {
tables.foreach(_.csvToOrc(spark, baseInput, baseOutput, writePartitioning))
tables.foreach(_.csvToOrc(
spark,
baseInput,
baseOutput,
coalesce,
repartition,
writePartitioning))
}

def setupAllCSV(spark: SparkSession, basePath: String, appendDat: Boolean = true): Unit = {
@@ -705,6 +708,8 @@ object TpcdsLikeSpark {
)))
)

// scalastyle:off line.size.limit

val queries : Map[String, Query] = Array(
Query("q1",
"""
@@ -4695,3 +4700,44 @@ object TpcdsLikeSpark {
}

// scalastyle:on line.size.limit

object ConvertFiles {
/**
* Main method allows us to submit using spark-submit to perform conversions from CSV to
* Parquet or Orc.
*/
def main(arg: Array[String]): Unit = {
val conf = new FileConversionConf(arg)
val spark = SparkSession.builder.appName("TPC-DS Like File Conversion").getOrCreate()
conf.outputFormat() match {
case "parquet" =>
csvToParquet(
spark,
conf.input(),
conf.output(),
conf.coalesce,
conf.repartition,
conf.withPartitioning())
case "orc" =>
csvToOrc(
spark,
conf.input(),
conf.output(),
conf.coalesce,
conf.repartition,
conf.withPartitioning())
}
}
}

class FileConversionConf(arguments: Seq[String]) extends ScallopConf(arguments) {
val input = opt[String](required = true)
val output = opt[String](required = true)
val outputFormat = opt[String](required = true)
val coalesce = propsLong[Int]("coalesce")
val repartition = propsLong[Int]("repartition")
val withPartitioning = opt[Boolean](default = Some(false))
verify()
BenchUtils.validateCoalesceRepartition(coalesce, repartition)
}
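
A small sketch of how this configuration class is exercised (hypothetical argument list mirroring the spark-submit example in docs/benchmarks.md; the `propsLong` options are exposed as `Map[String, Int]`):

```scala
// Hypothetical: parsing the same flags shown in docs/benchmarks.md
val conf = new FileConversionConf(Seq(
  "--input", "/path/to/input",
  "--output", "/path/to/output",
  "--output-format", "parquet",
  "--coalesce", "customer_address=1",
  "--repartition", "web_sales=256", "inventory=128"))

// conf.coalesce    == Map("customer_address" -> 1)
// conf.repartition == Map("web_sales" -> 256, "inventory" -> 128)

// Naming the same table under both --coalesce and --repartition fails in
// BenchUtils.validateCoalesceRepartition with an IllegalArgumentException.
```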

@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019, NVIDIA CORPORATION. All rights reserved.
* Copyright (c) 2019-2020, NVIDIA CORPORATION. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.