Parquet small file reading optimization (NVIDIA#595)
* Initial prototype small files parquet

* Change datasource v1 to use small files

* Working but size is off by 72 bytes

* Copy FileSourceScan to Databricks and fix merge error

* Fix Databricks package name

* Try to debug size calculation - adds lots of warnings

* Cleanup and have file source scan small files only work for parquet

* Switch to ArrayBuffer so the order is correct

* debug

* Fix order issue

* add more to calculated size

* cleanup

* Try to handle partition values

* fix passing partitionValues

* refactor

* disable mergeschema

* add check for mergeSchema

* Add tests for both small file optimization on and off

* handle input file - but doesn't totally work

* remove extra values reader

* Fixes

* Debug

* Check whether input file execs are used

* Finding InputFileName works

* finding input file working

* cleanup and add tests for V2 datasource

* Add check for input file to GpuParquetScan

* Add more tests

* Add GPU metrics to GpuFileSourceScanExec

Signed-off-by: Jason Lowe <jlowe@nvidia.com>

* remove log messages

* Docs

* cleanup

* Update 300db and 310 FileSourceScanExecs; unit tests passing

* Add test for bucketing

* Add in logic for datetime corrected rebase mode

* Commonize some code

* Cleanup

* fixes

* Extract GpuFileSourceScanExec from shims

Signed-off-by: Jason Lowe <jlowe@nvidia.com>

* Add more tests

* comments

* update test

* Pass metrics via GPU file format rather than custom options map

Signed-off-by: Jason Lowe <jlowe@nvidia.com>

* working

* pass schema around properly

* fix value from tuple

* Rename case class

* Update tests

* Update code checking for DataSourceScanExec

Signed-off-by: Jason Lowe <jlowe@nvidia.com>

* Fix scaladoc warning and unused imports

Signed-off-by: Jason Lowe <jlowe@nvidia.com>

* Add realloc if over memory size

* refactor memory checks

* Fix copyright

Signed-off-by: Jason Lowe <jlowe@nvidia.com>

* Upmerge to latest FileSourceScanExec changes for metrics

* Add missing FileSourceScan mergeSchema check and cleanup

* Cleanup

* remove bucket test for now

* formatting

* Fixes

* Add more tests

* Merge conflict

Signed-off-by: Thomas Graves <tgraves@nvidia.com>

* Fix merge conflict

Signed-off-by: Thomas Graves <tgraves@nvidia.com>

* enable parquet bucket tests and change warning

Signed-off-by: Thomas Graves <tgraves@nvidia.com>

* cleanup

Signed-off-by: Thomas Graves <tgraves@nvidia.com>

* remove debug logs

Signed-off-by: Thomas Graves <tgraves@nvidia.com>

* Move FilePartition creation to shim

Signed-off-by: Thomas Graves <tgraves@apache.org>

* Add better message for mergeSchema

Signed-off-by: Thomas Graves <tgraves@apache.org>

* Address review comments. Add in withResources and closeOnExcept and minor things.

Signed-off-by: Thomas Graves <tgraves@nvidia.com>

* Fix spacing

Signed-off-by: Thomas Graves <tgraves@nvidia.com>

* Fix databricks support and passing arguments

Signed-off-by: Thomas Graves <tgraves@nvidia.com>

* fix typo in db

Signed-off-by: Thomas Graves <tgraves@nvidia.com>

* Update config description

Signed-off-by: Thomas Graves <tgraves@nvidia.com>

* Rework

Signed-off-by: Thomas Graves <tgraves@nvidia.com>

Co-authored-by: Thomas Graves <tgraves@nvidia.com>
Co-authored-by: Jason Lowe <jlowe@nvidia.com>
3 people authored Aug 26, 2020
1 parent 59f52f7 commit 7afbfea
Showing 18 changed files with 1,094 additions and 280 deletions.
1 change: 1 addition & 0 deletions docs/configs.md
@@ -56,6 +56,7 @@ Name | Description | Default Value
<a name="sql.format.orc.write.enabled"></a>spark.rapids.sql.format.orc.write.enabled|When set to false disables orc output acceleration|true
<a name="sql.format.parquet.enabled"></a>spark.rapids.sql.format.parquet.enabled|When set to false disables all parquet input and output acceleration|true
<a name="sql.format.parquet.read.enabled"></a>spark.rapids.sql.format.parquet.read.enabled|When set to false disables parquet input acceleration|true
<a name="sql.format.parquet.smallFiles.enabled"></a>spark.rapids.sql.format.parquet.smallFiles.enabled|When set to true, handles reading multiple small files within a partition more efficiently by combining multiple files on the CPU side before sending to the GPU. Recommended unless user needs mergeSchema option or schema evolution.|true
<a name="sql.format.parquet.write.enabled"></a>spark.rapids.sql.format.parquet.write.enabled|When set to false disables parquet output acceleration|true
<a name="sql.hasNans"></a>spark.rapids.sql.hasNans|Config to indicate if your data has NaN's. Cudf doesn't currently support NaN's properly so you can get corrupt data if you have NaN's in your data and it runs on the GPU.|true
<a name="sql.hashOptimizeSort.enabled"></a>spark.rapids.sql.hashOptimizeSort.enabled|Whether sorts should be inserted after some hashed operations to improve output ordering. This can improve output file sizes when saving to columnar formats.|false
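A usage illustration (not part of the commit): the new flag is a regular Spark configuration and defaults to true, as the table above shows. The snippet below is a minimal sketch, assuming the RAPIDS Accelerator jars are already on the classpath; the application name and input path are placeholders:

```scala
import org.apache.spark.sql.SparkSession

// Minimal sketch: enable the plugin and set the small-file Parquet optimization explicitly.
// Setting it to false restores per-file reads, e.g. when the mergeSchema option is needed.
val spark = SparkSession.builder()
  .appName("small-file-parquet-example")
  .config("spark.plugins", "com.nvidia.spark.SQLPlugin")
  .config("spark.rapids.sql.format.parquet.smallFiles.enabled", "true")
  .getOrCreate()

// With the optimization enabled, reads that touch many small Parquet files in a
// partition are combined on the CPU side before being sent to the GPU.
val df = spark.read.parquet("/data/many-small-parquet-files")
df.show()
```
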
171 changes: 152 additions & 19 deletions integration_tests/src/main/python/parquet_test.py

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions integration_tests/src/main/python/spark_init_internal.py
@@ -19,9 +19,11 @@ def _spark__init():
# DO NOT SET ANY OTHER CONFIGS HERE!!!
# due to bugs in pyspark/pytest it looks like any configs set here
# can be reset in the middle of a test if specific operations are done (some types of cast etc)
# enableHiveSupport() is needed for parquet bucket tests
_s = SparkSession.builder \
.config('spark.plugins', 'com.nvidia.spark.SQLPlugin') \
.config('spark.sql.queryExecutionListeners', 'com.nvidia.spark.rapids.ExecutionPlanCaptureCallback')\
.enableHiveSupport() \
.appName('rapids spark plugin integration tests (python)').getOrCreate()
#TODO catch the ClassNotFound error that happens if the classpath is not set up properly and
# make it a better error message
@@ -41,9 +41,11 @@ case class GpuParquetScan(
options: CaseInsensitiveStringMap,
partitionFilters: Seq[Expression],
dataFilters: Seq[Expression],
rapidsConf: RapidsConf)
rapidsConf: RapidsConf,
supportsSmallFileOpt: Boolean = true)
extends GpuParquetScanBase(sparkSession, hadoopConf, dataSchema,
readDataSchema, readPartitionSchema, pushedFilters, rapidsConf) with FileScan {
readDataSchema, readPartitionSchema, pushedFilters, rapidsConf,
supportsSmallFileOpt) with FileScan {

override def isSplitable(path: Path): Boolean = super.isSplitableBase(path)

@@ -52,7 +54,8 @@
override def equals(obj: Any): Boolean = obj match {
case p: GpuParquetScan =>
super.equals(p) && dataSchema == p.dataSchema && options == p.options &&
equivalentFilters(pushedFilters, p.pushedFilters) && rapidsConf == p.rapidsConf
equivalentFilters(pushedFilters, p.pushedFilters) && rapidsConf == p.rapidsConf &&
supportsSmallFileOpt == p.supportsSmallFileOpt
case _ => false
}

@@ -21,4 +21,5 @@ import org.apache.spark.sql.rapids.execution.GpuShuffleExchangeExecBase

case class GpuShuffleExchangeExec(
override val outputPartitioning: Partitioning,
child: SparkPlan) extends GpuShuffleExchangeExecBase(outputPartitioning, child)
child: SparkPlan) extends GpuShuffleExchangeExecBase(outputPartitioning, child)

@@ -18,6 +18,8 @@ package com.nvidia.spark.rapids.shims.spark300

import java.time.ZoneId

import scala.collection.JavaConverters._

import com.nvidia.spark.rapids._
import com.nvidia.spark.rapids.spark300.RapidsShuffleManager

Expand All @@ -35,7 +37,7 @@ import org.apache.spark.sql.connector.read.Scan
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec
import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, HadoopFsRelation, PartitionDirectory, PartitionedFile}
import org.apache.spark.sql.execution.datasources.v2.csv.CSVScan
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.execution.datasources.v2.orc.OrcScan
import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchangeExec}
@@ -141,13 +143,20 @@ class Spark300Shims extends SparkShims {
override def tagPlanForGpu(): Unit = GpuFileSourceScanExec.tagSupport(this)

override def convertToGpu(): GpuExec = {
val sparkSession = wrapped.relation.sparkSession
val options = wrapped.relation.options
val newRelation = HadoopFsRelation(
wrapped.relation.location,
wrapped.relation.partitionSchema,
wrapped.relation.dataSchema,
wrapped.relation.bucketSpec,
GpuFileSourceScanExec.convertFileFormat(wrapped.relation.fileFormat),
wrapped.relation.options)(wrapped.relation.sparkSession)
options)(sparkSession)
val canUseSmallFileOpt = newRelation.fileFormat match {
case _: ParquetFileFormat =>
GpuParquetScanBase.canUseSmallFileParquetOpt(conf, options, sparkSession)
case _ => false
}
GpuFileSourceScanExec(
newRelation,
wrapped.output,
@@ -156,7 +165,8 @@
wrapped.optionalBucketSet,
None,
wrapped.dataFilters,
wrapped.tableIdentifier)
wrapped.tableIdentifier,
canUseSmallFileOpt)
}
}),
GpuOverrides.exec[SortMergeJoinExec](
@@ -226,7 +236,10 @@
(a, conf, p, r) => new ScanMeta[ParquetScan](a, conf, p, r) {
override def tagSelfForGpu(): Unit = GpuParquetScanBase.tagSupport(this)

override def convertToGpu(): Scan =
override def convertToGpu(): Scan = {
val canUseSmallFileOpt =
GpuParquetScanBase.canUseSmallFileParquetOpt(conf,
a.options.asCaseSensitiveMap().asScala.toMap, a.sparkSession)
GpuParquetScan(a.sparkSession,
a.hadoopConf,
a.fileIndex,
Expand All @@ -237,7 +250,9 @@ class Spark300Shims extends SparkShims {
a.options,
a.partitionFilters,
a.dataFilters,
conf)
conf,
canUseSmallFileOpt)
}
}),
GpuOverrides.scan[OrcScan](
"ORC parsing",
@@ -330,4 +345,20 @@
filePartitions: Seq[FilePartition]): RDD[InternalRow] = {
new FileScanRDD(sparkSession, readFunction, filePartitions)
}

override def createFilePartition(index: Int, files: Array[PartitionedFile]): FilePartition = {
FilePartition(index, files)
}

override def copyParquetBatchScanExec(batchScanExec: GpuBatchScanExec,
supportsSmallFileOpt: Boolean): GpuBatchScanExec = {
val scan = batchScanExec.scan.asInstanceOf[GpuParquetScan]
val scanCopy = scan.copy(supportsSmallFileOpt=supportsSmallFileOpt)
batchScanExec.copy(scan=scanCopy)
}

override def copyFileSourceScanExec(scanExec: GpuFileSourceScanExec,
supportsSmallFileOpt: Boolean): GpuFileSourceScanExec = {
scanExec.copy(supportsSmallFileOpt=supportsSmallFileOpt)
}
}
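For orientation (not part of the commit): the canUseSmallFileOpt value computed above gates the optimization per scan. The real check lives in GpuParquetScanBase.canUseSmallFileParquetOpt; the sketch below is a hypothetical simplification covering only the two conditions visible in this change set, the plugin config flag and a mergeSchema request:

```scala
// Hypothetical sketch, not the plugin's actual implementation.
// smallFilesEnabled mirrors spark.rapids.sql.format.parquet.smallFiles.enabled.
def canUseSmallFileOptSketch(
    smallFilesEnabled: Boolean,
    readerOptions: Map[String, String],
    sessionMergeSchemaDefault: Boolean): Boolean = {
  val mergeSchemaRequested = readerOptions.get("mergeSchema")
    .map(_.trim.toBoolean)
    .getOrElse(sessionMergeSchemaDefault)
  // Combining several files into a single read assumes one schema,
  // so a mergeSchema request disables the optimization.
  smallFilesEnabled && !mergeSchemaRequested
}
```
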
@@ -33,6 +33,7 @@ import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.physical.BroadcastMode
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.datasources.{BucketingUtils, FilePartition, HadoopFsRelation, PartitionDirectory, PartitionedFile}
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchangeExec}
import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BroadcastNestedLoopJoinExec, HashJoin, SortMergeJoinExec}
import org.apache.spark.sql.execution.joins.ShuffledHashJoinExec
@@ -93,13 +94,20 @@ class Spark300dbShims extends Spark300Shims {
override def tagPlanForGpu(): Unit = GpuFileSourceScanExec.tagSupport(this)

override def convertToGpu(): GpuExec = {
val sparkSession = wrapped.relation.sparkSession
val options = wrapped.relation.options
val newRelation = HadoopFsRelation(
wrapped.relation.location,
wrapped.relation.partitionSchema,
wrapped.relation.dataSchema,
wrapped.relation.bucketSpec,
GpuFileSourceScanExec.convertFileFormat(wrapped.relation.fileFormat),
wrapped.relation.options)(wrapped.relation.sparkSession)
options)(sparkSession)
val canUseSmallFileOpt = newRelation.fileFormat match {
case _: ParquetFileFormat =>
GpuParquetScanBase.canUseSmallFileParquetOpt(conf, options, sparkSession)
case _ => false
}
GpuFileSourceScanExec(
newRelation,
wrapped.output,
@@ -109,7 +117,8 @@
// TODO: Does Databricks have coalesced bucketing implemented?
None,
wrapped.dataFilters,
wrapped.tableIdentifier)
wrapped.tableIdentifier,
canUseSmallFileOpt)
}
}),
GpuOverrides.exec[SortMergeJoinExec](
@@ -180,4 +189,13 @@
filePartitions: Seq[FilePartition]): RDD[InternalRow] = {
new GpuFileScanRDD(sparkSession, readFunction, filePartitions)
}

override def createFilePartition(index: Int, files: Array[PartitionedFile]): FilePartition = {
FilePartition(index, files)
}

override def copyFileSourceScanExec(scanExec: GpuFileSourceScanExec,
supportsSmallFileOpt: Boolean): GpuFileSourceScanExec = {
scanExec.copy(supportsSmallFileOpt=supportsSmallFileOpt)
}
}
@@ -41,9 +41,11 @@ case class GpuParquetScan(
options: CaseInsensitiveStringMap,
partitionFilters: Seq[Expression],
dataFilters: Seq[Expression],
rapidsConf: RapidsConf)
rapidsConf: RapidsConf,
supportsSmallFileOpt: Boolean = true)
extends GpuParquetScanBase(sparkSession, hadoopConf, dataSchema,
readDataSchema, readPartitionSchema, pushedFilters, rapidsConf) with FileScan {
readDataSchema, readPartitionSchema, pushedFilters, rapidsConf,
supportsSmallFileOpt) with FileScan {

override def isSplitable(path: Path): Boolean = super.isSplitableBase(path)

@@ -52,7 +54,8 @@
override def equals(obj: Any): Boolean = obj match {
case p: GpuParquetScan =>
super.equals(p) && dataSchema == p.dataSchema && options == p.options &&
equivalentFilters(pushedFilters, p.pushedFilters) && rapidsConf == p.rapidsConf
equivalentFilters(pushedFilters, p.pushedFilters) && rapidsConf == p.rapidsConf &&
supportsSmallFileOpt == p.supportsSmallFileOpt
case _ => false
}

@@ -18,6 +18,8 @@ package com.nvidia.spark.rapids.shims.spark310

import java.time.ZoneId

import scala.collection.JavaConverters._

import com.nvidia.spark.rapids._
import com.nvidia.spark.rapids.shims.spark301.Spark301Shims
import com.nvidia.spark.rapids.spark310.RapidsShuffleManager
@@ -29,7 +31,7 @@ import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.connector.read.Scan
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.datasources.HadoopFsRelation
import org.apache.spark.sql.execution.datasources.v2.csv.CSVScan
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.execution.datasources.v2.orc.OrcScan
import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BroadcastNestedLoopJoinExec, HashJoin, SortMergeJoinExec}
@@ -137,13 +139,20 @@ class Spark310Shims extends Spark301Shims {
override def tagPlanForGpu(): Unit = GpuFileSourceScanExec.tagSupport(this)

override def convertToGpu(): GpuExec = {
val sparkSession = wrapped.relation.sparkSession
val options = wrapped.relation.options
val newRelation = HadoopFsRelation(
wrapped.relation.location,
wrapped.relation.partitionSchema,
wrapped.relation.dataSchema,
wrapped.relation.bucketSpec,
GpuFileSourceScanExec.convertFileFormat(wrapped.relation.fileFormat),
wrapped.relation.options)(wrapped.relation.sparkSession)
options)(sparkSession)
val canUseSmallFileOpt = newRelation.fileFormat match {
case _: ParquetFileFormat =>
GpuParquetScanBase.canUseSmallFileParquetOpt(conf, options, sparkSession)
case _ => false
}
GpuFileSourceScanExec(
newRelation,
wrapped.output,
@@ -152,7 +161,8 @@
wrapped.optionalBucketSet,
wrapped.optionalNumCoalescedBuckets,
wrapped.dataFilters,
wrapped.tableIdentifier)
wrapped.tableIdentifier,
canUseSmallFileOpt)
}
}),
GpuOverrides.exec[SortMergeJoinExec](
@@ -173,7 +183,9 @@
(a, conf, p, r) => new ScanMeta[ParquetScan](a, conf, p, r) {
override def tagSelfForGpu(): Unit = GpuParquetScanBase.tagSupport(this)

override def convertToGpu(): Scan =
override def convertToGpu(): Scan = {
val canUseSmallFileOpt = GpuParquetScanBase.canUseSmallFileParquetOpt(conf,
a.options.asCaseSensitiveMap().asScala.toMap, a.sparkSession)
GpuParquetScan(a.sparkSession,
a.hadoopConf,
a.fileIndex,
@@ -184,7 +196,9 @@
a.options,
a.partitionFilters,
a.dataFilters,
conf)
conf,
canUseSmallFileOpt)
}
}),
GpuOverrides.scan[OrcScan](
"ORC parsing",
@@ -223,4 +237,16 @@
override def getShuffleManagerShims(): ShuffleManagerShimBase = {
new ShuffleManagerShim
}

override def copyParquetBatchScanExec(batchScanExec: GpuBatchScanExec,
supportsSmallFileOpt: Boolean): GpuBatchScanExec = {
val scan = batchScanExec.scan.asInstanceOf[GpuParquetScan]
val scanCopy = scan.copy(supportsSmallFileOpt = supportsSmallFileOpt)
batchScanExec.copy(scan = scanCopy)
}

override def copyFileSourceScanExec(scanExec: GpuFileSourceScanExec,
supportsSmallFileOpt: Boolean): GpuFileSourceScanExec = {
scanExec.copy(supportsSmallFileOpt=supportsSmallFileOpt)
}
}
9 changes: 9 additions & 0 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/Arm.scala
@@ -40,6 +40,15 @@ trait Arm {
}
}

/** Executes the provided code block and then closes the array of resources */
def withResource[T <: AutoCloseable, V](r: Array[T])(block: Array[T] => V): V = {
try {
block(r)
} finally {
r.safeClose()
}
}

/** Executes the provided code block, closing the resource only if an exception occurs */
def closeOnExcept[T <: AutoCloseable, V](r: T)(block: T => V): V = {
try {
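A usage note (not part of the commit): the new overload borrows a whole array of closeable resources for the duration of the block and closes every element afterwards via safeClose(). The example below is a self-contained sketch that assumes the plugin's Arm trait is on the classpath; TrackedResource is a hypothetical stand-in for the GPU buffers the plugin actually manages:

```scala
import com.nvidia.spark.rapids.Arm

// Hypothetical stand-in for an AutoCloseable GPU resource.
class TrackedResource(val name: String) extends AutoCloseable {
  override def close(): Unit = println(s"closed $name")
}

object WithResourceArrayExample extends Arm {
  def main(args: Array[String]): Unit = {
    val resources = Array(new TrackedResource("a"), new TrackedResource("b"))
    // Every element is closed when the block finishes, even if it throws.
    val joined = withResource(resources) { rs =>
      rs.map(_.name).mkString(",")
    }
    println(joined) // prints "a,b" after "closed a" and "closed b"
  }
}
```
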
(Diffs for the remaining changed files are not shown.)
