Use multi-threaded parquet read with small files (#677)
* Try multi-threaded parquet read with small files

Signed-off-by: Thomas Graves <tgraves@nvidia.com>

* cleanup and comments

* comment config

Signed-off-by: Thomas Graves <tgraves@apache.org>

* Add note about TaskContext not being set in the threadpool

Signed-off-by: Thomas Graves <tgraves@nvidia.com>

* remove extra import and use closeOnExcept

* try just using future throw

* let future throw

Signed-off-by: Thomas Graves <tgraves@apache.org>

* Use safeClose()

Signed-off-by: Thomas Graves <tgraves@apache.org>

Co-authored-by: Thomas Graves <tgraves@nvidia.com>
tgravescs authored Sep 9, 2020
1 parent e39a105 commit be48350
Showing 8 changed files with 428 additions and 423 deletions.
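Per the commit message, the change reads multiple small parquet files in parallel on CPU threads before sending them to the GPU, lets each `Future` rethrow its failure, and notes that Spark's `TaskContext` is not set inside the thread pool. Below is a minimal, self-contained sketch of that pattern; `HostBuffer`, `readFileOnCpu`, and all values here are hypothetical stand-ins, not the plugin's actual API:

```scala
import java.util.concurrent.{Callable, Executors, TimeUnit}

// Hypothetical stand-in for the plugin's host-side buffer type.
final case class HostBuffer(path: String, bytes: Array[Byte])

object MultiThreadedReadSketch {
  // Stand-in for the CPU-side work: read and filter one parquet file's bytes.
  def readFileOnCpu(path: String): HostBuffer =
    HostBuffer(path, Array.emptyByteArray)

  def main(args: Array[String]): Unit = {
    val numThreads = 20      // cf. ...multiThreadedRead.numThreads
    val maxFilesParallel = 2 // cf. ...multiThreadedRead.maxNumFilesParallel
    val files = Seq("part-0.parquet", "part-1.parquet", "part-2.parquet")

    val pool = Executors.newFixedThreadPool(numThreads)
    try {
      // NOTE: these callables run on pool threads, so Spark's TaskContext is
      // NOT set inside them; anything that needs it must be captured up front.
      files.grouped(maxFilesParallel).foreach { batch =>
        val futures = batch.map { path =>
          pool.submit(new Callable[HostBuffer] {
            override def call(): HostBuffer = readFileOnCpu(path)
          })
        }
        // get() rethrows a failure from the read thread (wrapped in an
        // ExecutionException); this is the "let future throw" approach above.
        val buffers = futures.map(_.get())
        // ...hand `buffers` to the GPU decode step here...
      }
    } finally {
      pool.shutdown()
      pool.awaitTermination(1, TimeUnit.MINUTES)
    }
  }
}
```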
14 changes: 8 additions & 6 deletions docs/configs.md
@@ -55,8 +55,10 @@ Name | Description | Default Value
<a name="sql.format.orc.read.enabled"></a>spark.rapids.sql.format.orc.read.enabled|When set to false disables orc input acceleration|true
<a name="sql.format.orc.write.enabled"></a>spark.rapids.sql.format.orc.write.enabled|When set to false disables orc output acceleration|true
<a name="sql.format.parquet.enabled"></a>spark.rapids.sql.format.parquet.enabled|When set to false disables all parquet input and output acceleration|true
<a name="sql.format.parquet.multiThreadedRead.enabled"></a>spark.rapids.sql.format.parquet.multiThreadedRead.enabled|When set to true, reads multiple small files within a partition more efficiently by reading each file in a separate thread in parallel on the CPU side before sending to the GPU. Limited by spark.rapids.sql.format.parquet.multiThreadedRead.numThreads and spark.rapids.sql.format.parquet.multiThreadedRead.maxNumFileProcessed|true
<a name="sql.format.parquet.multiThreadedRead.maxNumFilesParallel"></a>spark.rapids.sql.format.parquet.multiThreadedRead.maxNumFilesParallel|A limit on the maximum number of files per task processed in parallel on the CPU side before the file is sent to the GPU. This affects the amount of host memory used when reading the files in parallel.|2147483647
<a name="sql.format.parquet.multiThreadedRead.numThreads"></a>spark.rapids.sql.format.parquet.multiThreadedRead.numThreads|The maximum number of threads, on the executor, to use for reading small parquet files in parallel.|20
<a name="sql.format.parquet.read.enabled"></a>spark.rapids.sql.format.parquet.read.enabled|When set to false disables parquet input acceleration|true
<a name="sql.format.parquet.smallFiles.enabled"></a>spark.rapids.sql.format.parquet.smallFiles.enabled|When set to true, handles reading multiple small files within a partition more efficiently by combining multiple files on the CPU side before sending to the GPU. Recommended unless user needs mergeSchema option or schema evolution.|true
<a name="sql.format.parquet.write.enabled"></a>spark.rapids.sql.format.parquet.write.enabled|When set to false disables parquet output acceleration|true
<a name="sql.hasNans"></a>spark.rapids.sql.hasNans|Config to indicate if your data has NaN's. Cudf doesn't currently support NaN's properly so you can get corrupt data if you have NaN's in your data and it runs on the GPU.|true
<a name="sql.hashOptimizeSort.enabled"></a>spark.rapids.sql.hashOptimizeSort.enabled|Whether sorts should be inserted after some hashed operations to improve output ordering. This can improve output file sizes when saving to columnar formats.|false
@@ -70,11 +72,11 @@ Name | Description | Default Value
<a name="sql.udfCompiler.enabled"></a>spark.rapids.sql.udfCompiler.enabled|When set to true, Scala UDFs will be considered for compilation as Catalyst expressions|false
<a name="sql.variableFloatAgg.enabled"></a>spark.rapids.sql.variableFloatAgg.enabled|Spark assumes that all operations produce the exact same result each time. This is not true for some floating point aggregations, which can produce slightly different results on the GPU as the aggregation is done in parallel. This can enable those operations if you know the query is only computing it once.|false

## Supported GPU Operators and Fine Tuning
_The RAPIDS Accelerator for Apache Spark_ can be configured to enable or disable specific
GPU accelerated expressions. Enabled expressions are candidates for GPU execution. If the
expression is configured as disabled, the accelerator plugin will not attempt replacement,
and it will run on the CPU.

Please leverage the [`spark.rapids.sql.explain`](#sql.explain) setting to get
feedback from the plugin as to why parts of a query may not be executing on the GPU.
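Taken together, the new multiThreadedRead settings above replace the old smallFiles.enabled switch. As an illustration, they could be set like this; the config keys come from the table above, but the app name, the non-default maxNumFilesParallel value, and the input path are made up:

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical demo; assumes the RAPIDS Accelerator plugin is already enabled.
val spark = SparkSession.builder()
  .appName("multi-threaded-parquet-read-demo")
  // Enable the CPU-side multi-threaded read of small parquet files (default: true).
  .config("spark.rapids.sql.format.parquet.multiThreadedRead.enabled", "true")
  // Threads per executor used to read small parquet files in parallel (default: 20).
  .config("spark.rapids.sql.format.parquet.multiThreadedRead.numThreads", "20")
  // Cap on files buffered per task on the host before handing off to the GPU.
  .config("spark.rapids.sql.format.parquet.multiThreadedRead.maxNumFilesParallel", "64")
  .getOrCreate()

val df = spark.read.parquet("/data/many-small-files")  // hypothetical path
df.show()
```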
@@ -153,8 +153,7 @@ class Spark300Shims extends SparkShims {
GpuFileSourceScanExec.convertFileFormat(wrapped.relation.fileFormat),
options)(sparkSession)
val canUseSmallFileOpt = newRelation.fileFormat match {
case _: ParquetFileFormat =>
GpuParquetScanBase.canUseSmallFileParquetOpt(conf, options, sparkSession)
case _: ParquetFileFormat => conf.isParquetMultiThreadReadEnabled
case _ => false
}
GpuFileSourceScanExec(
@@ -237,9 +236,6 @@ class Spark300Shims extends SparkShims {
override def tagSelfForGpu(): Unit = GpuParquetScanBase.tagSupport(this)

override def convertToGpu(): Scan = {
val canUseSmallFileOpt =
GpuParquetScanBase.canUseSmallFileParquetOpt(conf,
a.options.asCaseSensitiveMap().asScala.toMap, a.sparkSession)
GpuParquetScan(a.sparkSession,
a.hadoopConf,
a.fileIndex,
@@ -251,7 +247,7 @@
a.partitionFilters,
a.dataFilters,
conf,
canUseSmallFileOpt)
conf.isParquetMultiThreadReadEnabled)
}
}),
GpuOverrides.scan[OrcScan](
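The simplification in the Spark300Shims diff above recurs in the Spark300dbShims and Spark310Shims diffs below: instead of calling GpuParquetScanBase.canUseSmallFileParquetOpt with the read options, the decision now reduces to a pattern match on the file format plus the RapidsConf flag. A self-contained sketch of that shape, with `FileFormatLike` and `RapidsConfLike` as hypothetical stand-ins for the real Spark and plugin types:

```scala
// Hypothetical stand-ins for Spark's FileFormat hierarchy and RapidsConf.
sealed trait FileFormatLike
case object ParquetFormat extends FileFormatLike
case object OrcFormat extends FileFormatLike

final case class RapidsConfLike(isParquetMultiThreadReadEnabled: Boolean)

// Mirrors the shim logic above: only parquet qualifies, and only when the
// multi-threaded read flag is on; every other format falls through to false.
def canUseSmallFileOpt(format: FileFormatLike, conf: RapidsConfLike): Boolean =
  format match {
    case ParquetFormat => conf.isParquetMultiThreadReadEnabled
    case _             => false
  }

// canUseSmallFileOpt(ParquetFormat, RapidsConfLike(true))  => true
// canUseSmallFileOpt(ParquetFormat, RapidsConfLike(false)) => false
// canUseSmallFileOpt(OrcFormat, RapidsConfLike(true))      => false
```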
@@ -104,8 +104,7 @@ class Spark300dbShims extends Spark300Shims {
GpuFileSourceScanExec.convertFileFormat(wrapped.relation.fileFormat),
options)(sparkSession)
val canUseSmallFileOpt = newRelation.fileFormat match {
case _: ParquetFileFormat =>
GpuParquetScanBase.canUseSmallFileParquetOpt(conf, options, sparkSession)
case _: ParquetFileFormat => conf.isParquetMultiThreadReadEnabled
case _ => false
}
GpuFileSourceScanExec(
@@ -128,8 +128,7 @@ class Spark310Shims extends Spark301Shims {
GpuFileSourceScanExec.convertFileFormat(wrapped.relation.fileFormat),
options)(sparkSession)
val canUseSmallFileOpt = newRelation.fileFormat match {
case _: ParquetFileFormat =>
GpuParquetScanBase.canUseSmallFileParquetOpt(conf, options, sparkSession)
case _: ParquetFileFormat => conf.isParquetMultiThreadReadEnabled
case _ => false
}
GpuFileSourceScanExec(
@@ -178,8 +177,6 @@ class Spark310Shims extends Spark301Shims {
override def tagSelfForGpu(): Unit = GpuParquetScanBase.tagSupport(this)

override def convertToGpu(): Scan = {
val canUseSmallFileOpt = GpuParquetScanBase.canUseSmallFileParquetOpt(conf,
a.options.asCaseSensitiveMap().asScala.toMap, a.sparkSession)
GpuParquetScan(a.sparkSession,
a.hadoopConf,
a.fileIndex,
@@ -191,7 +188,7 @@
a.partitionFilters,
a.dataFilters,
conf,
canUseSmallFileOpt)
conf.isParquetMultiThreadReadEnabled)
}
}),
GpuOverrides.scan[OrcScan](
