diff --git a/docs/benchmarks.md b/docs/benchmarks.md index 988668b0f42..743f5313a05 100644 --- a/docs/benchmarks.md +++ b/docs/benchmarks.md @@ -25,9 +25,9 @@ benchmark. | Benchmark | Package | Class Names | |-----------|--------------------------------------|----------------------------------| -| TPC-DS | com.nvidia.spark.rapids.tests.tpcds | TpcdsLikeSpark, TpcdsLikeBench | -| TPC-xBB | com.nvidia.spark.rapids.tests.tpcxbb | TpcxbbLikeSpark, TpcxbbLikeBench | -| TPC-H | com.nvidia.spark.rapids.tests.tpch | TpchLikeSpark, TpchLikeBench | +| TPC-DS | com.nvidia.spark.rapids.tests.tpcds | ConvertFiles, TpcdsLikeBench | +| TPC-xBB | com.nvidia.spark.rapids.tests.tpcxbb | ConvertFiles, TpcxbbLikeBench | +| TPC-H | com.nvidia.spark.rapids.tests.tpch | ConvertFiles, TpchLikeBench | ## Spark Shell @@ -55,8 +55,32 @@ TpcdsLikeSpark.csvToParquet(spark, "/path/to/input", "/path/to/output") Note that the code for converting CSV to Parquet does not explicitly specify the number of partitions to write, so the size of the resulting parquet files will vary depending on the value -for `spark.default.parallelism`, which by default is based on the number of available executor -cores. This value can be set explicitly to better control the size of the output files. +for `spark.default.parallelism`, which by default is based on the number of available executor +cores. However, the file conversion methods accept `coalesce` and `repartition` arguments to +better control the size of the partitions on a per-table basis. + +Example using `coalesce` and `repartition` options to control the number and size of partitions +for specific tables. + +```scala +TpcdsLikeSpark.csvToParquet(spark, "/path/to/input", "/path/to/output", + coalesce=Map("customer_address" -> 1), repartition=Map("web_sales" -> 256)) +``` + +It is also possible to use `spark-submit` to run the file conversion process. + +```bash +$SPARK_HOME/bin/spark-submit \ + --master $SPARK_MASTER_URL \ + --jars $SPARK_RAPIDS_PLUGIN_JAR,$CUDF_JAR \ + --class com.nvidia.spark.rapids.tests.tpcds.ConvertFiles \ + $SPARK_RAPIDS_PLUGIN_INTEGRATION_TEST_JAR \ + --input /path/to/input \ + --output /path/to/output \ + --output-format parquet \ + --coalesce customer_address=1 \ + --repartition web_sales=256 inventory=128 +``` It should also be noted that no decimal types will be output. 
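The ORC conversion accepts the same `coalesce` and `repartition` options; a minimal sketch, reusing the illustrative paths and partition counts from the Parquet example above:

```scala
// Convert the TPC-DS CSV data to ORC, coalescing a small dimension table and
// repartitioning a large fact table (illustrative values only).
TpcdsLikeSpark.csvToOrc(spark, "/path/to/input", "/path/to/output",
  coalesce=Map("customer_address" -> 1), repartition=Map("web_sales" -> 256))
```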
The conversion code uses explicit schemas to ensure that decimal types are converted to floating-point types instead because the diff --git a/integration_tests/src/main/scala/com/nvidia/spark/rapids/tests/common/BenchUtils.scala b/integration_tests/src/main/scala/com/nvidia/spark/rapids/tests/common/BenchUtils.scala index 41f62d0e498..77dc1825130 100644 --- a/integration_tests/src/main/scala/com/nvidia/spark/rapids/tests/common/BenchUtils.scala +++ b/integration_tests/src/main/scala/com/nvidia/spark/rapids/tests/common/BenchUtils.scala @@ -291,6 +291,33 @@ object BenchUtils { os.close() } + def validateCoalesceRepartition( + coalesce: Map[String, Int], + repartition: Map[String, Int]): Unit = { + val duplicates = coalesce.keys.filter(name => repartition.contains(name)) + if (duplicates.nonEmpty) { + throw new IllegalArgumentException( + s"Cannot both coalesce and repartition the same table: ${duplicates.mkString(",")}") + } + } + + def applyCoalesceRepartition( + name: String, + df: DataFrame, + coalesce: Map[String, Int], + repartition: Map[String, Int]): DataFrame = { + (coalesce.get(name), repartition.get(name)) match { + case (Some(_), Some(_)) => + // this should be unreachable due to earlier validation + throw new IllegalArgumentException( + s"Cannot both coalesce and repartition the same table: $name") + case (Some(n), _) => df.coalesce(n) + case (_, Some(n)) => df.repartition(n) + case _ => df + } + } + + /** * Generate a DOT graph for one query plan, or showing differences between two query plans. * diff --git a/integration_tests/src/main/scala/com/nvidia/spark/rapids/tests/tpcds/TpcdsLikeSpark.scala b/integration_tests/src/main/scala/com/nvidia/spark/rapids/tests/tpcds/TpcdsLikeSpark.scala index f5edd1fbeac..e8f3bbb89c8 100644 --- a/integration_tests/src/main/scala/com/nvidia/spark/rapids/tests/tpcds/TpcdsLikeSpark.scala +++ b/integration_tests/src/main/scala/com/nvidia/spark/rapids/tests/tpcds/TpcdsLikeSpark.scala @@ -16,11 +16,13 @@ package com.nvidia.spark.rapids.tests.tpcds +import com.nvidia.spark.rapids.tests.common.BenchUtils +import com.nvidia.spark.rapids.tests.tpcds.TpcdsLikeSpark.{csvToOrc, csvToParquet} +import org.rogach.scallop.ScallopConf + import org.apache.spark.sql.{DataFrame, DataFrameWriter, Row, SparkSession} import org.apache.spark.sql.types.{DateType, DoubleType, IntegerType, LongType, StringType, StructField, StructType} -// scalastyle:off line.size.limit - case class Table( name: String, // also the base name for the data partitionColumns: Seq[String], @@ -59,11 +61,14 @@ case class Table( private def setupWrite( spark: SparkSession, + name: String, inputBase: String, + coalesce: Map[String, Int], + repartition: Map[String, Int], writePartitioning: Boolean): DataFrameWriter[Row] = { - val tmp = readCSV(spark, inputBase) - .write - .mode("overwrite") + val df = readCSV(spark, inputBase) + val repart = BenchUtils.applyCoalesceRepartition(name, df, coalesce, repartition) + val tmp = repart.write.mode("overwrite") if (writePartitioning && partitionColumns.nonEmpty) { tmp.partitionBy(partitionColumns: _*) } else { @@ -75,15 +80,21 @@ case class Table( spark: SparkSession, inputBase: String, outputBase: String, + coalesce: Map[String, Int], + repartition: Map[String, Int], writePartitioning: Boolean): Unit = - setupWrite(spark, inputBase, writePartitioning).parquet(path(outputBase)) + setupWrite(spark, name, inputBase, coalesce, repartition, writePartitioning) + .parquet(path(outputBase)) def csvToOrc( spark: SparkSession, inputBase: String, outputBase: String, 
+ coalesce: Map[String, Int], + repartition: Map[String, Int], writePartitioning: Boolean): Unit = - setupWrite(spark, inputBase, writePartitioning).orc(path(outputBase)) + setupWrite(spark, name, inputBase, coalesce, repartition, writePartitioning) + .orc(path(outputBase)) } case class Query(name: String, query: String) { @@ -97,44 +108,36 @@ case class Query(name: String, query: String) { * correctness tests auto sort the data to account for ambiguous ordering. */ object TpcdsLikeSpark { - - /** - * Main method allows us to submit using spark-submit to perform conversions from CSV to - * Parquet or Orc. - */ - def main(arg: Array[String]): Unit = { - val baseInput = arg(0) - val baseOutput = arg(1) - val targetFileType = arg(2) - val withPartitioning = if (arg.length > 3) { - arg(3).toBoolean - } else { - false - } - - val spark = SparkSession.builder.appName("TPC-DS Like File Conversion").getOrCreate() - - targetFileType match { - case "parquet" => csvToParquet(spark, baseInput, baseOutput, withPartitioning) - case "orc" => csvToOrc(spark, baseInput, baseOutput, withPartitioning) - } - - } - def csvToParquet( spark: SparkSession, baseInput: String, baseOutput: String, + coalesce: Map[String, Int] = Map.empty, + repartition: Map[String, Int] = Map.empty, writePartitioning: Boolean = false): Unit = { - tables.foreach(_.csvToParquet(spark, baseInput, baseOutput, writePartitioning)) + tables.foreach(_.csvToParquet( + spark, + baseInput, + baseOutput, + coalesce, + repartition, + writePartitioning)) } def csvToOrc( spark: SparkSession, baseInput: String, baseOutput: String, + coalesce: Map[String, Int] = Map.empty, + repartition: Map[String, Int] = Map.empty, writePartitioning: Boolean = false): Unit = { - tables.foreach(_.csvToOrc(spark, baseInput, baseOutput, writePartitioning)) + tables.foreach(_.csvToOrc( + spark, + baseInput, + baseOutput, + coalesce, + repartition, + writePartitioning)) } def setupAllCSV(spark: SparkSession, basePath: String, appendDat: Boolean = true): Unit = { @@ -705,6 +708,8 @@ object TpcdsLikeSpark { ))) ) + // scalastyle:off line.size.limit + val queries : Map[String, Query] = Array( Query("q1", """ @@ -4695,3 +4700,44 @@ object TpcdsLikeSpark { } // scalastyle:on line.size.limit + +object ConvertFiles { + /** + * Main method allows us to submit using spark-submit to perform conversions from CSV to + * Parquet or Orc. 
+ */ + def main(arg: Array[String]): Unit = { + val conf = new FileConversionConf(arg) + val spark = SparkSession.builder.appName("TPC-DS Like File Conversion").getOrCreate() + conf.outputFormat() match { + case "parquet" => + csvToParquet( + spark, + conf.input(), + conf.output(), + conf.coalesce, + conf.repartition, + conf.withPartitioning()) + case "orc" => + csvToOrc( + spark, + conf.input(), + conf.output(), + conf.coalesce, + conf.repartition, + conf.withPartitioning()) + } + } +} + +class FileConversionConf(arguments: Seq[String]) extends ScallopConf(arguments) { + val input = opt[String](required = true) + val output = opt[String](required = true) + val outputFormat = opt[String](required = true) + val coalesce = propsLong[Int]("coalesce") + val repartition = propsLong[Int]("repartition") + val withPartitioning = opt[Boolean](default = Some(false)) + verify() + BenchUtils.validateCoalesceRepartition(coalesce, repartition) +} + diff --git a/integration_tests/src/main/scala/com/nvidia/spark/rapids/tests/tpch/TpchLikeBench.scala b/integration_tests/src/main/scala/com/nvidia/spark/rapids/tests/tpch/TpchLikeBench.scala index 6c2e8545c12..1602b5ea9e8 100644 --- a/integration_tests/src/main/scala/com/nvidia/spark/rapids/tests/tpch/TpchLikeBench.scala +++ b/integration_tests/src/main/scala/com/nvidia/spark/rapids/tests/tpch/TpchLikeBench.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019, NVIDIA CORPORATION. All rights reserved. + * Copyright (c) 2019-2020, NVIDIA CORPORATION. All rights reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/integration_tests/src/main/scala/com/nvidia/spark/rapids/tests/tpch/TpchLikeSpark.scala b/integration_tests/src/main/scala/com/nvidia/spark/rapids/tests/tpch/TpchLikeSpark.scala index 801e57e7991..6770ec5309c 100644 --- a/integration_tests/src/main/scala/com/nvidia/spark/rapids/tests/tpch/TpchLikeSpark.scala +++ b/integration_tests/src/main/scala/com/nvidia/spark/rapids/tests/tpch/TpchLikeSpark.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019, NVIDIA CORPORATION. All rights reserved. + * Copyright (c) 2019-2020, NVIDIA CORPORATION. All rights reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
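As an aside, a rough illustration (not part of the patch) of how the TPC-DS `FileConversionConf` above turns the documented command-line properties into the `coalesce` and `repartition` maps; the argument values are taken from the docs example and the behavior described in the comments is an assumption based on Scallop's `propsLong` support:

```scala
// Hypothetical usage of com.nvidia.spark.rapids.tests.tpcds.FileConversionConf,
// mirroring the spark-submit arguments shown in docs/benchmarks.md.
val conf = new FileConversionConf(Seq(
  "--input", "/path/to/input",
  "--output", "/path/to/output",
  "--output-format", "parquet",
  "--coalesce", "customer_address=1",
  "--repartition", "web_sales=256", "inventory=128"))

// Scallop's propsLong collects the key=value pairs into maps:
//   conf.coalesce    == Map("customer_address" -> 1)
//   conf.repartition == Map("web_sales" -> 256, "inventory" -> 128)
// Listing the same table under both options fails validation with an
// IllegalArgumentException from BenchUtils.validateCoalesceRepartition.
```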
@@ -16,34 +16,55 @@ package com.nvidia.spark.rapids.tests.tpch -import com.nvidia.spark.rapids.tests.DebugRange +import com.nvidia.spark.rapids.tests.common.BenchUtils +import com.nvidia.spark.rapids.tests.tpch.TpchLikeSpark.{csvToOrc, csvToParquet} +import org.rogach.scallop.ScallopConf -import org.apache.spark.sql.{DataFrame, SparkSession} +import org.apache.spark.sql.{DataFrame, DataFrameWriter, Row, SparkSession} import org.apache.spark.sql.types._ // scalastyle:off line.size.limit object TpchLikeSpark { - def csvToParquet(spark: SparkSession, basePath: String, baseOutput: String): Unit = { - readOrdersCSV(spark, basePath + "/orders.tbl").write.parquet(baseOutput + "/orders.tbl") - readLineitemCSV(spark, basePath + "/lineitem.tbl").write.parquet(baseOutput + "/lineitem.tbl") - readCustomerCSV(spark, basePath + "/customer.tbl").write.parquet(baseOutput + "/customer.tbl") - readNationCSV(spark, basePath + "/nation.tbl").write.parquet(baseOutput + "/nation.tbl") - readPartCSV(spark, basePath + "/part.tbl").write.parquet(baseOutput + "/part.tbl") - readPartsuppCSV(spark, basePath + "/partsupp.tbl").write.parquet(baseOutput + "/partsupp.tbl") - readRegionCSV(spark, basePath + "/region.tbl").write.parquet(baseOutput + "/region.tbl") - readSupplierCSV(spark, basePath + "/supplier.tbl").write.parquet(baseOutput + "/supplier.tbl") + private def setupWrite( + df: DataFrame, + name: String, + coalesce: Map[String, Int], + repartition: Map[String, Int]): DataFrameWriter[Row] = { + val repart = BenchUtils.applyCoalesceRepartition(name, df, coalesce, repartition) + repart.write.mode("overwrite") + } + + def csvToParquet( + spark: SparkSession, + basePath: String, + baseOutput: String, + coalesce: Map[String, Int], + repartition: Map[String, Int]): Unit = { + setupWrite(readOrdersCSV(spark, basePath + "/orders.tbl"), "orders", coalesce, repartition).parquet(baseOutput + "/orders.tbl") + setupWrite(readLineitemCSV(spark, basePath + "/lineitem.tbl"), "lineitem", coalesce, repartition).parquet(baseOutput + "/lineitem.tbl") + setupWrite(readCustomerCSV(spark, basePath + "/customer.tbl"), "customers", coalesce, repartition).parquet(baseOutput + "/customer.tbl") + setupWrite(readNationCSV(spark, basePath + "/nation.tbl"), "nation", coalesce, repartition).parquet(baseOutput + "/nation.tbl") + setupWrite(readPartCSV(spark, basePath + "/part.tbl"), "part", coalesce, repartition).parquet(baseOutput + "/part.tbl") + setupWrite(readPartsuppCSV(spark, basePath + "/partsupp.tbl"), "partsupp", coalesce, repartition).parquet(baseOutput + "/partsupp.tbl") + setupWrite(readRegionCSV(spark, basePath + "/region.tbl"), "region", coalesce, repartition).parquet(baseOutput + "/region.tbl") + setupWrite(readSupplierCSV(spark, basePath + "/supplier.tbl"), "supplier", coalesce, repartition).parquet(baseOutput + "/supplier.tbl") } - def csvToOrc(spark: SparkSession, basePath: String, baseOutput: String): Unit = { - readOrdersCSV(spark, basePath + "/orders.tbl").write.orc(baseOutput + "/orders.tbl") - readLineitemCSV(spark, basePath + "/lineitem.tbl").write.orc(baseOutput + "/lineitem.tbl") - readCustomerCSV(spark, basePath + "/customer.tbl").write.orc(baseOutput + "/customer.tbl") - readNationCSV(spark, basePath + "/nation.tbl").write.orc(baseOutput + "/nation.tbl") - readPartCSV(spark, basePath + "/part.tbl").write.orc(baseOutput + "/part.tbl") - readPartsuppCSV(spark, basePath + "/partsupp.tbl").write.orc(baseOutput + "/partsupp.tbl") - readRegionCSV(spark, basePath + "/region.tbl").write.orc(baseOutput + "/region.tbl") - 
readSupplierCSV(spark, basePath + "/supplier.tbl").write.orc(baseOutput + "/supplier.tbl") + def csvToOrc( + spark: SparkSession, + basePath: String, + baseOutput: String, + coalesce: Map[String, Int], + repartition: Map[String, Int]): Unit = { + setupWrite(readOrdersCSV(spark, basePath + "/orders.tbl"), "orders", coalesce, repartition).orc(baseOutput + "/orders.tbl") + setupWrite(readLineitemCSV(spark, basePath + "/lineitem.tbl"), "lineitem", coalesce, repartition).orc(baseOutput + "/lineitem.tbl") + setupWrite(readCustomerCSV(spark, basePath + "/customer.tbl"), "customers", coalesce, repartition).orc(baseOutput + "/customer.tbl") + setupWrite(readNationCSV(spark, basePath + "/nation.tbl"), "nation", coalesce, repartition).orc(baseOutput + "/nation.tbl") + setupWrite(readPartCSV(spark, basePath + "/part.tbl"), "part", coalesce, repartition).orc(baseOutput + "/part.tbl") + setupWrite(readPartsuppCSV(spark, basePath + "/partsupp.tbl"), "partsupp", coalesce, repartition).orc(baseOutput + "/partsupp.tbl") + setupWrite(readRegionCSV(spark, basePath + "/region.tbl"), "region", coalesce, repartition).orc(baseOutput + "/region.tbl") + setupWrite(readSupplierCSV(spark, basePath + "/supplier.tbl"), "supplier", coalesce, repartition).orc(baseOutput + "/supplier.tbl") } def setupAllCSV(spark: SparkSession, basePath: String): Unit = { @@ -267,24 +288,6 @@ object TpchLikeSpark { def setupSupplierOrc(spark: SparkSession, path: String): Unit = spark.read.orc(path).createOrReplaceTempView("supplier") - - def main(args: Array[String]): Unit = { - System.setProperty("ai.rapids.cudf.nvtx.enabled", "true") - val spark = SparkSession.builder.appName("TpchLike") - .config("spark.ui.showConsoleProgress", "false") - .config("spark.sql.join.preferSortMergeJoin", "false") - .config("spark.rapids.sql.variableFloatAgg.enabled", "true") - .config("spark.rapids.sql.incompatibleOps.enabled", "true") - .config("spark.rapids.sql.explain", "true") - .getOrCreate() - setupAllParquet(spark, args(0)) - var range = new DebugRange("QUERY") - spark.time(Q5Like(spark).collect()) - range.close() - range = new DebugRange("QUERY") - spark.time(Q5Like(spark).collect()) - range.close() - } } object Q1Like { @@ -1147,4 +1150,41 @@ object Q22Like { |""".stripMargin) } +object ConvertFiles { + /** + * Main method allows us to submit using spark-submit to perform conversions from CSV to + * Parquet or Orc. 
+ */ + def main(arg: Array[String]): Unit = { + val conf = new FileConversionConf(arg) + val spark = SparkSession.builder.appName("TPC-H Like File Conversion").getOrCreate() + conf.outputFormat() match { + case "parquet" => + csvToParquet( + spark, + conf.input(), + conf.output(), + conf.coalesce, + conf.repartition) + case "orc" => + csvToOrc( + spark, + conf.input(), + conf.output(), + conf.coalesce, + conf.repartition) + } + } +} + +class FileConversionConf(arguments: Seq[String]) extends ScallopConf(arguments) { + val input = opt[String](required = true) + val output = opt[String](required = true) + val outputFormat = opt[String](required = true) + val coalesce = propsLong[Int]("coalesce") + val repartition = propsLong[Int]("repartition") + verify() + BenchUtils.validateCoalesceRepartition(coalesce, repartition) +} + // scalastyle:on line.size.limit diff --git a/integration_tests/src/main/scala/com/nvidia/spark/rapids/tests/tpcxbb/TpcxbbLikeBench.scala b/integration_tests/src/main/scala/com/nvidia/spark/rapids/tests/tpcxbb/TpcxbbLikeBench.scala index 4e2ab72a932..1feab34f9b7 100644 --- a/integration_tests/src/main/scala/com/nvidia/spark/rapids/tests/tpcxbb/TpcxbbLikeBench.scala +++ b/integration_tests/src/main/scala/com/nvidia/spark/rapids/tests/tpcxbb/TpcxbbLikeBench.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019, NVIDIA CORPORATION. All rights reserved. + * Copyright (c) 2019-2020, NVIDIA CORPORATION. All rights reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/integration_tests/src/main/scala/com/nvidia/spark/rapids/tests/tpcxbb/TpcxbbLikeSpark.scala b/integration_tests/src/main/scala/com/nvidia/spark/rapids/tests/tpcxbb/TpcxbbLikeSpark.scala index df0963cf35c..5fb913b3b44 100644 --- a/integration_tests/src/main/scala/com/nvidia/spark/rapids/tests/tpcxbb/TpcxbbLikeSpark.scala +++ b/integration_tests/src/main/scala/com/nvidia/spark/rapids/tests/tpcxbb/TpcxbbLikeSpark.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019, NVIDIA CORPORATION. All rights reserved. + * Copyright (c) 2019-2020, NVIDIA CORPORATION. All rights reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -16,55 +16,78 @@ package com.nvidia.spark.rapids.tests.tpcxbb -import org.apache.spark.sql.{DataFrame, SparkSession} +import com.nvidia.spark.rapids.tests.common.BenchUtils +import com.nvidia.spark.rapids.tests.tpcxbb.TpcxbbLikeSpark.{csvToOrc, csvToParquet} +import org.rogach.scallop.ScallopConf + +import org.apache.spark.sql.{DataFrame, DataFrameWriter, Row, SparkSession} import org.apache.spark.sql.types._ // scalastyle:off line.size.limit // DecimalType to DoubleType, bigint to LongType object TpcxbbLikeSpark { - def csvToParquet(spark: SparkSession, basePath: String, baseOutput: String): Unit = { - readCustomerCSV(spark, basePath + "/customer/").write.parquet(baseOutput + "/customer/") - readCustomerAddressCSV(spark, basePath + "/customer_address/").write.parquet(baseOutput + "/customer_address/") - readItemCSV(spark, basePath + "/item/").write.parquet(baseOutput + "/item/") - readStoreSalesCSV(spark, basePath + "/store_sales/").write.parquet(baseOutput + "/store_sales/") - readDateDimCSV(spark, basePath + "/date_dim/").write.parquet(baseOutput + "/date_dim/") - readStoreCSV(spark, basePath + "/store/").write.parquet(baseOutput + "/store/") - readCustomerDemographicsCSV(spark, basePath + "/customer_demographics/").write.parquet(baseOutput + "/customer_demographics/") - readReviewsCSV(spark, basePath + "/product_reviews/").write.parquet(baseOutput + "/product_reviews/") - readWebSalesCSV(spark, basePath + "/web_sales/").write.parquet(baseOutput + "/web_sales/") - readWebClickStreamsCSV(spark, basePath + "/web_clickstreams/").write.parquet(baseOutput + "/web_clickstreams/") - readHouseholdDemographicsCSV(spark, basePath + "/household_demographics/").write.parquet(baseOutput + "/household_demographics/") - readWebPageCSV(spark, basePath + "/web_page/").write.parquet(baseOutput + "/web_page/") - readTimeDimCSV(spark, basePath + "/time_dim/").write.parquet(baseOutput + "/time_dim/") - readWebReturnsCSV(spark, basePath + "/web_returns/").write.parquet(baseOutput + "/web_returns/") - readWarehouseCSV(spark, basePath + "/warehouse/").write.parquet(baseOutput + "/warehouse/") - readPromotionCSV(spark, basePath + "/promotion/").write.parquet(baseOutput + "/promotion/") - readStoreReturnsCSV(spark, basePath + "/store_returns/").write.parquet(baseOutput + "/store_returns/") - readInventoryCSV(spark, basePath + "/inventory/").write.parquet(baseOutput + "/inventory/") - readMarketPricesCSV(spark, basePath + "/item_marketprices/").write.parquet(baseOutput + "/item_marketprices/") + private def setupWrite( + df: DataFrame, + name: String, + coalesce: Map[String, Int], + repartition: Map[String, Int]): DataFrameWriter[Row] = { + val repart = BenchUtils.applyCoalesceRepartition(name, df, coalesce, repartition) + repart.write.mode("overwrite") + } + + def csvToParquet( + spark: SparkSession, + basePath: String, + baseOutput: String, + coalesce: Map[String, Int], + repartition: Map[String, Int]): Unit = { + setupWrite(readCustomerCSV(spark, basePath + "/customer/"), "customer", coalesce, repartition).parquet(baseOutput + "/customer/") + setupWrite(readCustomerAddressCSV(spark, basePath + "/customer_address/"), "customer_address", coalesce, repartition).parquet(baseOutput + "/customer_address/") + setupWrite(readItemCSV(spark, basePath + "/item/"), "item", coalesce, repartition).parquet(baseOutput + "/item/") + setupWrite(readStoreSalesCSV(spark, basePath + "/store_sales/"), "store_sales", coalesce, repartition).parquet(baseOutput + "/store_sales/") + setupWrite(readDateDimCSV(spark, basePath + 
"/date_dim/"), "date_dim", coalesce, repartition).parquet(baseOutput + "/date_dim/") + setupWrite(readStoreCSV(spark, basePath + "/store/"), "store", coalesce, repartition).parquet(baseOutput + "/store/") + setupWrite(readCustomerDemographicsCSV(spark, basePath + "/customer_demographics/"), "customer_demographics", coalesce, repartition).parquet(baseOutput + "/customer_demographics/") + setupWrite(readReviewsCSV(spark, basePath + "/product_reviews/"), "product_reviews", coalesce, repartition).parquet(baseOutput + "/product_reviews/") + setupWrite(readWebSalesCSV(spark, basePath + "/web_sales/"), "web_sales", coalesce, repartition).parquet(baseOutput + "/web_sales/") + setupWrite(readWebClickStreamsCSV(spark, basePath + "/web_clickstreams/"), "web_clickstreams", coalesce, repartition).parquet(baseOutput + "/web_clickstreams/") + setupWrite(readHouseholdDemographicsCSV(spark, basePath + "/household_demographics/"), "household_demographics", coalesce, repartition).parquet(baseOutput + "/household_demographics/") + setupWrite(readWebPageCSV(spark, basePath + "/web_page/"), "web_page", coalesce, repartition).parquet(baseOutput + "/web_page/") + setupWrite(readTimeDimCSV(spark, basePath + "/time_dim/"), "time_dim", coalesce, repartition).parquet(baseOutput + "/time_dim/") + setupWrite(readWebReturnsCSV(spark, basePath + "/web_returns/"), "web_returns", coalesce, repartition).parquet(baseOutput + "/web_returns/") + setupWrite(readWarehouseCSV(spark, basePath + "/warehouse/"), "warehouse", coalesce, repartition).parquet(baseOutput + "/warehouse/") + setupWrite(readPromotionCSV(spark, basePath + "/promotion/"), "promotion", coalesce, repartition).parquet(baseOutput + "/promotion/") + setupWrite(readStoreReturnsCSV(spark, basePath + "/store_returns/"), "store_returns", coalesce, repartition).parquet(baseOutput + "/store_returns/") + setupWrite(readInventoryCSV(spark, basePath + "/inventory/"), "inventory", coalesce, repartition).parquet(baseOutput + "/inventory/") + setupWrite(readMarketPricesCSV(spark, basePath + "/item_marketprices/"), "item_marketprices", coalesce, repartition).parquet(baseOutput + "/item_marketprices/") } - def csvToOrc(spark: SparkSession, basePath: String, baseOutput: String): Unit = { - readCustomerCSV(spark, basePath + "/customer/").write.orc(baseOutput + "/customer/") - readCustomerAddressCSV(spark, basePath + "/customer_address/").write.orc(baseOutput + "/customer_address/") - readItemCSV(spark, basePath + "/item/").write.orc(baseOutput + "/item/") - readStoreSalesCSV(spark, basePath + "/store_sales/").write.orc(baseOutput + "/store_sales/") - readDateDimCSV(spark, basePath + "/date_dim/").write.orc(baseOutput + "/date_dim/") - readStoreCSV(spark, basePath + "/store/").write.orc(baseOutput + "/store/") - readCustomerDemographicsCSV(spark, basePath + "/customer_demographics/").write.orc(baseOutput + "/customer_demographics/") - readReviewsCSV(spark, basePath + "/product_reviews/").write.orc(baseOutput + "/product_reviews/") - readWebSalesCSV(spark, basePath + "/web_sales/").write.orc(baseOutput + "/web_sales/") - readWebClickStreamsCSV(spark, basePath + "/web_clickstreams/").write.orc(baseOutput + "/web_clickstreams/") - readHouseholdDemographicsCSV(spark, basePath + "/household_demographics/").write.orc(baseOutput + "/household_demographics/") - readWebPageCSV(spark, basePath + "/web_page/").write.orc(baseOutput + "/web_page/") - readTimeDimCSV(spark, basePath + "/time_dim/").write.orc(baseOutput + "/time_dim/") - readWebReturnsCSV(spark, basePath + 
"/web_returns/").write.orc(baseOutput + "/web_returns/") - readWarehouseCSV(spark, basePath + "/warehouse/").write.orc(baseOutput + "/warehouse/") - readPromotionCSV(spark, basePath + "/promotion/").write.orc(baseOutput + "/promotion/") - readStoreReturnsCSV(spark, basePath + "/store_returns/").write.orc(baseOutput + "/store_returns/") - readInventoryCSV(spark, basePath + "/inventory/").write.orc(baseOutput + "/inventory/") - readMarketPricesCSV(spark, basePath + "/item_marketprices/").write.orc(baseOutput + "/item_marketprices/") + def csvToOrc( + spark: SparkSession, + basePath: String, + baseOutput: String, + coalesce: Map[String, Int], + repartition: Map[String, Int]): Unit = { + setupWrite(readCustomerCSV(spark, basePath + "/customer/"), "customer", coalesce, repartition).orc(baseOutput + "/customer/") + setupWrite(readCustomerAddressCSV(spark, basePath + "/customer_address/"), "customer_address", coalesce, repartition).orc(baseOutput + "/customer_address/") + setupWrite(readItemCSV(spark, basePath + "/item/"), "item", coalesce, repartition).orc(baseOutput + "/item/") + setupWrite(readStoreSalesCSV(spark, basePath + "/store_sales/"), "store_sales", coalesce, repartition).orc(baseOutput + "/store_sales/") + setupWrite(readDateDimCSV(spark, basePath + "/date_dim/"), "date_dim", coalesce, repartition).orc(baseOutput + "/date_dim/") + setupWrite(readStoreCSV(spark, basePath + "/store/"), "store", coalesce, repartition).orc(baseOutput + "/store/") + setupWrite(readCustomerDemographicsCSV(spark, basePath + "/customer_demographics/"), "customer_demographics", coalesce, repartition).orc(baseOutput + "/customer_demographics/") + setupWrite(readReviewsCSV(spark, basePath + "/product_reviews/"), "product_reviews", coalesce, repartition).orc(baseOutput + "/product_reviews/") + setupWrite(readWebSalesCSV(spark, basePath + "/web_sales/"), "web_sales", coalesce, repartition).orc(baseOutput + "/web_sales/") + setupWrite(readWebClickStreamsCSV(spark, basePath + "/web_clickstreams/"), "web_clickstreams", coalesce, repartition).orc(baseOutput + "/web_clickstreams/") + setupWrite(readHouseholdDemographicsCSV(spark, basePath + "/household_demographics/"), "household_demographics", coalesce, repartition).orc(baseOutput + "/household_demographics/") + setupWrite(readWebPageCSV(spark, basePath + "/web_page/"), "web_page", coalesce, repartition).orc(baseOutput + "/web_page/") + setupWrite(readTimeDimCSV(spark, basePath + "/time_dim/"), "time_dim", coalesce, repartition).orc(baseOutput + "/time_dim/") + setupWrite(readWebReturnsCSV(spark, basePath + "/web_returns/"), "web_returns", coalesce, repartition).orc(baseOutput + "/web_returns/") + setupWrite(readWarehouseCSV(spark, basePath + "/warehouse/"), "warehouse", coalesce, repartition).orc(baseOutput + "/warehouse/") + setupWrite(readPromotionCSV(spark, basePath + "/promotion/"), "promotion", coalesce, repartition).orc(baseOutput + "/promotion/") + setupWrite(readStoreReturnsCSV(spark, basePath + "/store_returns/"), "store_returns", coalesce, repartition).orc(baseOutput + "/store_returns/") + setupWrite(readInventoryCSV(spark, basePath + "/inventory/"), "inventory", coalesce, repartition).orc(baseOutput + "/inventory/") + setupWrite(readMarketPricesCSV(spark, basePath + "/item_marketprices/"), "item_marketprices", coalesce, repartition).orc(baseOutput + "/item_marketprices/") } def setupAllCSV(spark: SparkSession, basePath: String): Unit = { @@ -2068,4 +2091,42 @@ object Q30Like { } } +object ConvertFiles { + /** + * Main method allows us to submit using 
spark-submit to perform conversions from CSV to + * Parquet or Orc. + */ + def main(arg: Array[String]): Unit = { + val conf = new FileConversionConf(arg) + val spark = SparkSession.builder.appName("TPC-xBB Like File Conversion").getOrCreate() + conf.outputFormat() match { + case "parquet" => + csvToParquet( + spark, + conf.input(), + conf.output(), + conf.coalesce, + conf.repartition) + case "orc" => + csvToOrc( + spark, + conf.input(), + conf.output(), + conf.coalesce, + conf.repartition) + } + } + +} + +class FileConversionConf(arguments: Seq[String]) extends ScallopConf(arguments) { + val input = opt[String](required = true) + val output = opt[String](required = true) + val outputFormat = opt[String](required = true) + val coalesce = propsLong[Int]("coalesce") + val repartition = propsLong[Int]("repartition") + verify() + BenchUtils.validateCoalesceRepartition(coalesce, repartition) +} + // scalastyle:on line.size.limit diff --git a/integration_tests/src/test/scala/com/nvidia/spark/rapids/tests/common/BenchUtilsSuite.scala b/integration_tests/src/test/scala/com/nvidia/spark/rapids/tests/common/BenchUtilsSuite.scala index d3402d7f71b..8d6faa1fa75 100644 --- a/integration_tests/src/test/scala/com/nvidia/spark/rapids/tests/common/BenchUtilsSuite.scala +++ b/integration_tests/src/test/scala/com/nvidia/spark/rapids/tests/common/BenchUtilsSuite.scala @@ -59,4 +59,14 @@ class BenchUtilsSuite extends FunSuite with BeforeAndAfterEach { assert(report == report2) } + test("validate coalesce/repartition arguments - no duplicates") { + BenchUtils.validateCoalesceRepartition(Map("a" -> 1, "b" -> 1), Map("c" -> 1, "d" -> 1)) + } + + test("validate coalesce/repartition arguments - with duplicates") { + assertThrows[IllegalArgumentException] { + BenchUtils.validateCoalesceRepartition(Map("a" -> 1, "b" -> 1), Map("c" -> 1, "b" -> 1)) + } + } + }
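For completeness, a minimal sketch (not part of the patch) of how the companion `applyCoalesceRepartition` helper behaves; the local SparkSession setup and table names are purely illustrative:

```scala
import org.apache.spark.sql.SparkSession

import com.nvidia.spark.rapids.tests.common.BenchUtils

object ApplyCoalesceRepartitionExample {
  def main(args: Array[String]): Unit = {
    // Assumes a local SparkSession purely for illustration.
    val spark = SparkSession.builder.master("local[*]")
      .appName("applyCoalesceRepartition example").getOrCreate()
    import spark.implicits._

    val df = (1 to 1000).toDF("id").repartition(8)

    // A table listed in the coalesce map has its partition count reduced.
    val coalesced = BenchUtils.applyCoalesceRepartition(
      "customer_address", df, Map("customer_address" -> 1), Map.empty)
    assert(coalesced.rdd.getNumPartitions == 1)

    // A table listed in the repartition map is shuffled into the requested count.
    val repartitioned = BenchUtils.applyCoalesceRepartition(
      "web_sales", df, Map.empty, Map("web_sales" -> 16))
    assert(repartitioned.rdd.getNumPartitions == 16)

    // A table listed in neither map passes through unchanged.
    val unchanged = BenchUtils.applyCoalesceRepartition("inventory", df, Map.empty, Map.empty)
    assert(unchanged.rdd.getNumPartitions == df.rdd.getNumPartitions)

    spark.stop()
  }
}
```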