Use multi-threaded parquet read with small files #677

Merged: 8 commits, Sep 9, 2020
14 changes: 8 additions & 6 deletions docs/configs.md
@@ -55,8 +55,10 @@ Name | Description | Default Value
<a name="sql.format.orc.read.enabled"></a>spark.rapids.sql.format.orc.read.enabled|When set to false disables orc input acceleration|true
<a name="sql.format.orc.write.enabled"></a>spark.rapids.sql.format.orc.write.enabled|When set to false disables orc output acceleration|true
<a name="sql.format.parquet.enabled"></a>spark.rapids.sql.format.parquet.enabled|When set to false disables all parquet input and output acceleration|true
+<a name="sql.format.parquet.multiThreadedRead.enabled"></a>spark.rapids.sql.format.parquet.multiThreadedRead.enabled|When set to true, reads multiple small files within a partition more efficiently by reading each file in a separate thread in parallel on the CPU side before sending to the GPU. Limited by spark.rapids.sql.format.parquet.multiThreadedRead.numThreads and spark.rapids.sql.format.parquet.multiThreadedRead.maxNumFilesParallel|true
+<a name="sql.format.parquet.multiThreadedRead.maxNumFilesParallel"></a>spark.rapids.sql.format.parquet.multiThreadedRead.maxNumFilesParallel|A limit on the maximum number of files per task processed in parallel on the CPU side before the file is sent to the GPU. This affects the amount of host memory used when reading the files in parallel.|2147483647
+<a name="sql.format.parquet.multiThreadedRead.numThreads"></a>spark.rapids.sql.format.parquet.multiThreadedRead.numThreads|The maximum number of threads, on the executor, to use for reading small parquet files in parallel.|20
<a name="sql.format.parquet.read.enabled"></a>spark.rapids.sql.format.parquet.read.enabled|When set to false disables parquet input acceleration|true
-<a name="sql.format.parquet.smallFiles.enabled"></a>spark.rapids.sql.format.parquet.smallFiles.enabled|When set to true, handles reading multiple small files within a partition more efficiently by combining multiple files on the CPU side before sending to the GPU. Recommended unless user needs mergeSchema option or schema evolution.|true
<a name="sql.format.parquet.write.enabled"></a>spark.rapids.sql.format.parquet.write.enabled|When set to false disables parquet output acceleration|true
<a name="sql.hasNans"></a>spark.rapids.sql.hasNans|Config to indicate if your data has NaN's. Cudf doesn't currently support NaN's properly so you can get corrupt data if you have NaN's in your data and it runs on the GPU.|true
<a name="sql.hashOptimizeSort.enabled"></a>spark.rapids.sql.hashOptimizeSort.enabled|Whether sorts should be inserted after some hashed operations to improve output ordering. This can improve output file sizes when saving to columnar formats.|false
@@ -70,11 +72,11 @@ Name | Description | Default Value
<a name="sql.udfCompiler.enabled"></a>spark.rapids.sql.udfCompiler.enabled|When set to true, Scala UDFs will be considered for compilation as Catalyst expressions|false
<a name="sql.variableFloatAgg.enabled"></a>spark.rapids.sql.variableFloatAgg.enabled|Spark assumes that all operations produce the exact same result each time. This is not true for some floating point aggregations, which can produce slightly different results on the GPU as the aggregation is done in parallel. This can enable those operations if you know the query is only computing it once.|false

-## Supported GPU Operators and Fine Tuning
-_The RAPIDS Accelerator for Apache Spark_ can be configured to enable or disable specific
-GPU accelerated expressions. Enabled expressions are candidates for GPU execution. If the
-expression is configured as disabled, the accelerator plugin will not attempt replacement,
-and it will run on the CPU.
+## Supported GPU Operators and Fine Tuning
+_The RAPIDS Accelerator for Apache Spark_ can be configured to enable or disable specific
+GPU accelerated expressions. Enabled expressions are candidates for GPU execution. If the
+expression is configured as disabled, the accelerator plugin will not attempt replacement,
+and it will run on the CPU.

Please leverage the [`spark.rapids.sql.explain`](#sql.explain) setting to get
feedback from the plugin as to why parts of a query may not be executing on the GPU.
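The three new `multiThreadedRead` settings replace the old `smallFiles` option and can be tuned like any other Spark conf. Below is a minimal sketch of setting them on a SparkSession; the keys come from the docs change above, while the app name and the values are illustrative assumptions, not recommendations from this PR:

```scala
import org.apache.spark.sql.SparkSession

// Sketch: session-level tuning of the multi-threaded parquet read.
// The config keys are from the docs table above; the values are made up.
val spark = SparkSession.builder()
  .appName("multithreaded-parquet-read")
  .getOrCreate()

spark.conf.set("spark.rapids.sql.format.parquet.multiThreadedRead.enabled", "true")
// Per-executor reader thread pool size (default 20).
spark.conf.set("spark.rapids.sql.format.parquet.multiThreadedRead.numThreads", "16")
// Cap on files processed in parallel per task, bounding host memory
// (default 2147483647, i.e. effectively unlimited).
spark.conf.set("spark.rapids.sql.format.parquet.multiThreadedRead.maxNumFilesParallel", "64")
```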
@@ -153,8 +153,7 @@ class Spark300Shims extends SparkShims {
      GpuFileSourceScanExec.convertFileFormat(wrapped.relation.fileFormat),
      options)(sparkSession)
    val canUseSmallFileOpt = newRelation.fileFormat match {
-      case _: ParquetFileFormat =>
-        GpuParquetScanBase.canUseSmallFileParquetOpt(conf, options, sparkSession)
+      case _: ParquetFileFormat => conf.isParquetMultiThreadReadEnabled
      case _ => false
    }
    GpuFileSourceScanExec(
@@ -237,9 +236,6 @@ class Spark300Shims extends SparkShims {
        override def tagSelfForGpu(): Unit = GpuParquetScanBase.tagSupport(this)

        override def convertToGpu(): Scan = {
-          val canUseSmallFileOpt =
-            GpuParquetScanBase.canUseSmallFileParquetOpt(conf,
-              a.options.asCaseSensitiveMap().asScala.toMap, a.sparkSession)
          GpuParquetScan(a.sparkSession,
            a.hadoopConf,
            a.fileIndex,
@@ -251,7 +247,7 @@
            a.partitionFilters,
            a.dataFilters,
            conf,
-            canUseSmallFileOpt)
+            conf.isParquetMultiThreadReadEnabled)
        }
      }),
    GpuOverrides.scan[OrcScan](
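For intuition, the CPU-side pattern the new flag enables looks roughly like the sketch below. This is not the plugin's actual reader (`GpuParquetScan` also handles parquet footers, predicate push-down, and the device transfer): `MultiThreadedReadSketch` and `readFileToHostBuffer` are hypothetical stand-ins, and the two parameters mirror the `numThreads` and `maxNumFilesParallel` configs:

```scala
import java.util.concurrent.{Callable, Executors}
import scala.collection.JavaConverters._

// Sketch only: read many small files in parallel on the CPU before the
// buffers are handed to the GPU. readFileToHostBuffer is a hypothetical
// placeholder for the real per-file work (footer parse, column reads, ...).
object MultiThreadedReadSketch {
  def readFileToHostBuffer(path: String): Array[Byte] =
    java.nio.file.Files.readAllBytes(java.nio.file.Paths.get(path))

  def readAll(paths: Seq[String], numThreads: Int, maxNumFilesParallel: Int): Seq[Array[Byte]] = {
    val pool = Executors.newFixedThreadPool(numThreads) // numThreads config
    try {
      // Batch the work so at most maxNumFilesParallel files are buffered at
      // once, bounding host memory in the spirit of maxNumFilesParallel.
      paths.grouped(maxNumFilesParallel).flatMap { batch =>
        val tasks = batch.map { p =>
          new Callable[Array[Byte]] { override def call(): Array[Byte] = readFileToHostBuffer(p) }
        }
        pool.invokeAll(tasks.asJava).asScala.map(_.get())
      }.toSeq
    } finally {
      pool.shutdown()
    }
  }
}
```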
@@ -104,8 +104,7 @@ class Spark300dbShims extends Spark300Shims {
      GpuFileSourceScanExec.convertFileFormat(wrapped.relation.fileFormat),
      options)(sparkSession)
    val canUseSmallFileOpt = newRelation.fileFormat match {
-      case _: ParquetFileFormat =>
-        GpuParquetScanBase.canUseSmallFileParquetOpt(conf, options, sparkSession)
+      case _: ParquetFileFormat => conf.isParquetMultiThreadReadEnabled
      case _ => false
    }
    GpuFileSourceScanExec(
@@ -149,8 +149,7 @@ class Spark310Shims extends Spark301Shims {
      GpuFileSourceScanExec.convertFileFormat(wrapped.relation.fileFormat),
      options)(sparkSession)
    val canUseSmallFileOpt = newRelation.fileFormat match {
-      case _: ParquetFileFormat =>
-        GpuParquetScanBase.canUseSmallFileParquetOpt(conf, options, sparkSession)
+      case _: ParquetFileFormat => conf.isParquetMultiThreadReadEnabled
      case _ => false
    }
    GpuFileSourceScanExec(
@@ -184,8 +183,6 @@ class Spark310Shims extends Spark301Shims {
        override def tagSelfForGpu(): Unit = GpuParquetScanBase.tagSupport(this)

        override def convertToGpu(): Scan = {
-          val canUseSmallFileOpt = GpuParquetScanBase.canUseSmallFileParquetOpt(conf,
-            a.options.asCaseSensitiveMap().asScala.toMap, a.sparkSession)
          GpuParquetScan(a.sparkSession,
            a.hadoopConf,
            a.fileIndex,
@@ -197,7 +194,7 @@
            a.partitionFilters,
            a.dataFilters,
            conf,
-            canUseSmallFileOpt)
+            conf.isParquetMultiThreadReadEnabled)
        }
      }),
    GpuOverrides.scan[OrcScan](
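All three shims now consult a single boolean on the plugin conf instead of re-deriving per-scan options, which is what the deleted `canUseSmallFileParquetOpt` calls were doing. A hedged sketch of how an accessor like `isParquetMultiThreadReadEnabled` could be backed follows; the real `RapidsConf` in the plugin registers typed conf entries, so treat `RapidsConfSketch` and `getBool` as assumptions about shape, not the actual implementation:

```scala
// Sketch: a boolean accessor in the style of isParquetMultiThreadReadEnabled.
// The real RapidsConf builds typed entries; this only shows the lookup shape.
class RapidsConfSketch(settings: Map[String, String]) {
  private def getBool(key: String, default: Boolean): Boolean =
    settings.get(key).map(_.trim.toBoolean).getOrElse(default)

  lazy val isParquetMultiThreadReadEnabled: Boolean =
    getBool("spark.rapids.sql.format.parquet.multiThreadedRead.enabled", default = true)
}
```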