From be48350dc0b9d9e058b1792ab0781d073b42e80c Mon Sep 17 00:00:00 2001 From: Thomas Graves Date: Tue, 8 Sep 2020 19:46:10 -0500 Subject: [PATCH] Use multi-threaded parquet read with small files (#677) * Try multi-threaded read with parquet with small files Signed-off-by: Thomas Graves * cleanup and comments * comment config Signed-off-by: Thomas Graves * Add note about TaskContext not being set in the threadpool Signed-off-by: Thomas Graves * remove extra import and use closeOnExcept * try just using future throw * let future throw Signed-off-by: Thomas Graves * Use safeclose() Signed-off-by: Thomas Graves Co-authored-by: Thomas Graves --- docs/configs.md | 14 +- .../rapids/shims/spark300/Spark300Shims.scala | 8 +- .../shims/spark300db/Spark300dbShims.scala | 3 +- .../rapids/shims/spark310/Spark310Shims.scala | 7 +- .../nvidia/spark/rapids/GpuParquetScan.scala | 726 +++++++++--------- .../spark/rapids/GpuTransitionOverrides.scala | 43 -- .../com/nvidia/spark/rapids/RapidsConf.scala | 44 +- .../spark/sql/rapids/GpuInputFileBlock.scala | 6 + 8 files changed, 428 insertions(+), 423 deletions(-) diff --git a/docs/configs.md b/docs/configs.md index 1d6c3695805..736760a9eee 100644 --- a/docs/configs.md +++ b/docs/configs.md @@ -55,8 +55,10 @@ Name | Description | Default Value spark.rapids.sql.format.orc.read.enabled|When set to false disables orc input acceleration|true spark.rapids.sql.format.orc.write.enabled|When set to false disables orc output acceleration|true spark.rapids.sql.format.parquet.enabled|When set to false disables all parquet input and output acceleration|true +spark.rapids.sql.format.parquet.multiThreadedRead.enabled|When set to true, reads multiple small files within a partition more efficiently by reading each file in a separate thread in parallel on the CPU side before sending to the GPU. Limited by spark.rapids.sql.format.parquet.multiThreadedRead.numThreads and spark.rapids.sql.format.parquet.multiThreadedRead.maxNumFileProcessed|true +spark.rapids.sql.format.parquet.multiThreadedRead.maxNumFilesParallel|A limit on the maximum number of files per task processed in parallel on the CPU side before the file is sent to the GPU. This affects the amount of host memory used when reading the files in parallel.|2147483647 +spark.rapids.sql.format.parquet.multiThreadedRead.numThreads|The maximum number of threads, on the executor, to use for reading small parquet files in parallel.|20 spark.rapids.sql.format.parquet.read.enabled|When set to false disables parquet input acceleration|true -spark.rapids.sql.format.parquet.smallFiles.enabled|When set to true, handles reading multiple small files within a partition more efficiently by combining multiple files on the CPU side before sending to the GPU. Recommended unless user needs mergeSchema option or schema evolution.|true spark.rapids.sql.format.parquet.write.enabled|When set to false disables parquet output acceleration|true spark.rapids.sql.hasNans|Config to indicate if your data has NaN's. Cudf doesn't currently support NaN's properly so you can get corrupt data if you have NaN's in your data and it runs on the GPU.|true spark.rapids.sql.hashOptimizeSort.enabled|Whether sorts should be inserted after some hashed operations to improve output ordering. This can improve output file sizes when saving to columnar formats.|false @@ -70,11 +72,11 @@ Name | Description | Default Value spark.rapids.sql.udfCompiler.enabled|When set to true, Scala UDFs will be considered for compilation as Catalyst expressions|false spark.rapids.sql.variableFloatAgg.enabled|Spark assumes that all operations produce the exact same result each time. This is not true for some floating point aggregations, which can produce slightly different results on the GPU as the aggregation is done in parallel. This can enable those operations if you know the query is only computing it once.|false -## Supported GPU Operators and Fine Tuning -_The RAPIDS Accelerator for Apache Spark_ can be configured to enable or disable specific -GPU accelerated expressions. Enabled expressions are candidates for GPU execution. If the -expression is configured as disabled, the accelerator plugin will not attempt replacement, -and it will run on the CPU. +## Supported GPU Operators and Fine Tuning +_The RAPIDS Accelerator for Apache Spark_ can be configured to enable or disable specific +GPU accelerated expressions. Enabled expressions are candidates for GPU execution. If the +expression is configured as disabled, the accelerator plugin will not attempt replacement, +and it will run on the CPU. Please leverage the [`spark.rapids.sql.explain`](#sql.explain) setting to get feedback from the plugin as to why parts of a query may not be executing on the GPU. diff --git a/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/Spark300Shims.scala b/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/Spark300Shims.scala index bf4298a67d0..c5c8259ea90 100644 --- a/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/Spark300Shims.scala +++ b/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/Spark300Shims.scala @@ -153,8 +153,7 @@ class Spark300Shims extends SparkShims { GpuFileSourceScanExec.convertFileFormat(wrapped.relation.fileFormat), options)(sparkSession) val canUseSmallFileOpt = newRelation.fileFormat match { - case _: ParquetFileFormat => - GpuParquetScanBase.canUseSmallFileParquetOpt(conf, options, sparkSession) + case _: ParquetFileFormat => conf.isParquetMultiThreadReadEnabled case _ => false } GpuFileSourceScanExec( @@ -237,9 +236,6 @@ class Spark300Shims extends SparkShims { override def tagSelfForGpu(): Unit = GpuParquetScanBase.tagSupport(this) override def convertToGpu(): Scan = { - val canUseSmallFileOpt = - GpuParquetScanBase.canUseSmallFileParquetOpt(conf, - a.options.asCaseSensitiveMap().asScala.toMap, a.sparkSession) GpuParquetScan(a.sparkSession, a.hadoopConf, a.fileIndex, @@ -251,7 +247,7 @@ class Spark300Shims extends SparkShims { a.partitionFilters, a.dataFilters, conf, - canUseSmallFileOpt) + conf.isParquetMultiThreadReadEnabled) } }), GpuOverrides.scan[OrcScan]( diff --git a/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/Spark300dbShims.scala b/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/Spark300dbShims.scala index 0f064121b20..556bac127f7 100644 --- a/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/Spark300dbShims.scala +++ b/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/Spark300dbShims.scala @@ -104,8 +104,7 @@ class Spark300dbShims extends Spark300Shims { GpuFileSourceScanExec.convertFileFormat(wrapped.relation.fileFormat), options)(sparkSession) val canUseSmallFileOpt = newRelation.fileFormat match { - case _: ParquetFileFormat => - GpuParquetScanBase.canUseSmallFileParquetOpt(conf, options, sparkSession) + case _: ParquetFileFormat => conf.isParquetMultiThreadReadEnabled case _ => false } GpuFileSourceScanExec( diff --git a/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/Spark310Shims.scala b/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/Spark310Shims.scala index 740dad6124a..89e67bbdffe 100644 --- a/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/Spark310Shims.scala +++ b/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/Spark310Shims.scala @@ -128,8 +128,7 @@ class Spark310Shims extends Spark301Shims { GpuFileSourceScanExec.convertFileFormat(wrapped.relation.fileFormat), options)(sparkSession) val canUseSmallFileOpt = newRelation.fileFormat match { - case _: ParquetFileFormat => - GpuParquetScanBase.canUseSmallFileParquetOpt(conf, options, sparkSession) + case _: ParquetFileFormat => conf.isParquetMultiThreadReadEnabled case _ => false } GpuFileSourceScanExec( @@ -178,8 +177,6 @@ class Spark310Shims extends Spark301Shims { override def tagSelfForGpu(): Unit = GpuParquetScanBase.tagSupport(this) override def convertToGpu(): Scan = { - val canUseSmallFileOpt = GpuParquetScanBase.canUseSmallFileParquetOpt(conf, - a.options.asCaseSensitiveMap().asScala.toMap, a.sparkSession) GpuParquetScan(a.sparkSession, a.hadoopConf, a.fileIndex, @@ -191,7 +188,7 @@ class Spark310Shims extends Spark301Shims { a.partitionFilters, a.dataFilters, conf, - canUseSmallFileOpt) + conf.isParquetMultiThreadReadEnabled) } }), GpuOverrides.scan[OrcScan]( diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala index 9a0358a53ed..ee61325c728 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala @@ -20,17 +20,18 @@ import java.io.OutputStream import java.net.URI import java.nio.charset.StandardCharsets import java.util.{Collections, Locale} +import java.util.concurrent._ import scala.annotation.tailrec import scala.collection.JavaConverters._ -import scala.collection.mutable.{ArrayBuffer, LinkedHashMap} +import scala.collection.mutable.{ArrayBuffer, Queue} import scala.math.max -import ai.rapids.cudf.{ColumnVector, DType, HostMemoryBuffer, NvtxColor, ParquetOptions, Table} +import ai.rapids.cudf._ +import com.google.common.util.concurrent.ThreadFactoryBuilder import com.nvidia.spark.RebaseHelper import com.nvidia.spark.rapids.GpuMetricNames._ import com.nvidia.spark.rapids.ParquetPartitionReader.CopyRange -import com.nvidia.spark.rapids.RapidsConf.ENABLE_SMALL_FILES_PARQUET import com.nvidia.spark.rapids.RapidsPluginImplicits._ import org.apache.commons.io.IOUtils import org.apache.commons.io.output.{CountingOutputStream, NullOutputStream} @@ -41,7 +42,7 @@ import org.apache.parquet.column.ColumnDescriptor import org.apache.parquet.filter2.predicate.FilterApi import org.apache.parquet.format.converter.ParquetMetadataConverter import org.apache.parquet.hadoop.{ParquetFileReader, ParquetInputFormat} -import org.apache.parquet.hadoop.metadata.{BlockMetaData, ColumnChunkMetaData, ColumnPath, FileMetaData, ParquetMetadata} +import org.apache.parquet.hadoop.metadata._ import org.apache.parquet.schema.{GroupType, MessageType, Types} import org.apache.spark.TaskContext @@ -51,12 +52,13 @@ import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory} import org.apache.spark.sql.execution.QueryExecutionException -import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile, PartitioningAwareFileIndex} +import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile} import org.apache.spark.sql.execution.datasources.parquet.{ParquetFilters, ParquetReadSupport} -import org.apache.spark.sql.execution.datasources.v2.{FilePartitionReaderFactory, FileScan} +import org.apache.spark.sql.execution.datasources.v2.FilePartitionReaderFactory import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.rapids.InputFileUtils import org.apache.spark.sql.rapids.execution.TrampolineUtil import org.apache.spark.sql.sources.Filter import org.apache.spark.sql.types.{StringType, StructType, TimestampType} @@ -155,15 +157,6 @@ object GpuParquetScanBase { meta.willNotWorkOnGpu(s"$other is not a supported read rebase mode") } } - - def canUseSmallFileParquetOpt( - conf: RapidsConf, - options: Map[String, String], - sparkSession: SparkSession): Boolean = { - (conf.isParquetSmallFilesEnabled && - !(options.getOrElse("mergeSchema", "false").toBoolean || - sparkSession.conf.getOption("spark.sql.parquet.mergeSchema").exists(_.toBoolean))) - } } /** @@ -221,10 +214,6 @@ object GpuParquetPartitionReaderFactoryBase { private case class ParquetFileInfoWithBlockMeta(filePath: Path, blocks: Seq[BlockMetaData], partValues: InternalRow, schema: MessageType, isCorrectedRebaseMode: Boolean) -// contains meta about a single block in a file -private case class ParquetFileInfoWithSingleBlockMeta(filePath: Path, blockMeta: BlockMetaData, - partValues: InternalRow, schema: MessageType, isCorrectedRebaseMode: Boolean) - private case class GpuParquetFileFilterHandler(@transient sqlConf: SQLConf) extends Arm { private val isCaseSensitive = sqlConf.caseSensitiveAnalysis private val enableParquetFilterPushDown: Boolean = sqlConf.parquetFilterPushDown @@ -303,6 +292,8 @@ case class GpuParquetMultiFilePartitionReaderFactory( private val debugDumpPrefix = rapidsConf.parquetDebugDumpPrefix private val maxReadBatchSizeRows = rapidsConf.maxReadBatchSizeRows private val maxReadBatchSizeBytes = rapidsConf.maxReadBatchSizeBytes + private val numThreads = rapidsConf.parquetMultiThreadReadNumThreads + private val maxNumFileProcessed = rapidsConf.maxNumParquetFilesParallel private val filterHandler = new GpuParquetFileFilterHandler(sqlConf) @@ -323,17 +314,10 @@ case class GpuParquetMultiFilePartitionReaderFactory( files: Array[PartitionedFile]): PartitionReader[ColumnarBatch] = { val conf = broadcastedConf.value.value logDebug(s"Number files being read: ${files.size} for task ${TaskContext.get().partitionId()}") - val clippedBlocks = ArrayBuffer[ParquetFileInfoWithSingleBlockMeta]() - files.map { file => - val singleFileInfo = filterHandler.filterBlocks(file, conf, filters, readDataSchema) - clippedBlocks ++= singleFileInfo.blocks.map( - ParquetFileInfoWithSingleBlockMeta(singleFileInfo.filePath, _, file.partitionValues, - singleFileInfo.schema, singleFileInfo.isCorrectedRebaseMode)) - } - - new MultiFileParquetPartitionReader(conf, files, clippedBlocks, + new MultiFileParquetPartitionReader(conf, files, isCaseSensitive, readDataSchema, debugDumpPrefix, - maxReadBatchSizeRows, maxReadBatchSizeBytes, metrics, partitionSchema) + maxReadBatchSizeRows, maxReadBatchSizeBytes, metrics, partitionSchema, + numThreads, maxNumFileProcessed, filterHandler, filters) } } @@ -387,7 +371,7 @@ abstract class FileParquetPartitionReaderBase( execMetrics: Map[String, SQLMetric]) extends PartitionReader[ColumnarBatch] with Logging with ScanWithMetrics with Arm { - protected var isExhausted: Boolean = false + protected var isDone: Boolean = false protected var maxDeviceMemory: Long = 0 protected var batch: Option[ColumnarBatch] = None protected val copyBufferSize = conf.getInt("parquet.read.allocation.size", 8 * 1024 * 1024) @@ -402,7 +386,7 @@ abstract class FileParquetPartitionReaderBase( override def close(): Unit = { batch.foreach(_.close()) batch = None - isExhausted = true + isDone = true } protected def calculateParquetFooterSize( @@ -418,8 +402,7 @@ abstract class FileParquetPartitionReaderBase( protected def calculateParquetOutputSize( currentChunkedBlocks: Seq[BlockMetaData], - schema: MessageType, - handleMultiFiles: Boolean): Long = { + schema: MessageType): Long = { // start with the size of Parquet magic (at start+end) and footer length values var size: Long = 4 + 4 + 4 @@ -429,20 +412,7 @@ abstract class FileParquetPartitionReaderBase( size += currentChunkedBlocks.flatMap(_.getColumns.asScala.map(_.getTotalSize)).sum val footerSize = calculateParquetFooterSize(currentChunkedBlocks, schema) - val extraMemory = if (handleMultiFiles) { - // we want to add extra memory because the ColumnChunks saved in the Footer have 2 fields - // file_offset and data_page_offset that get much larger when we are combining files. - // Here we estimate that by taking the number of columns * number of blocks which should be - // the number of column chunks and then saying there are 2 fields that could be larger and - // assume max size of those would be 8 bytes worst case. So we probably allocate to much here - // but it shouldn't be by a huge amount and its better then having to realloc and copy. - val numCols = currentChunkedBlocks.head.getColumns().size() - val numColumnChunks = numCols * currentChunkedBlocks.size - numColumnChunks * 2 * 8 - } else { - 0 - } - val totalSize = size + footerSize + extraMemory + val totalSize = size + footerSize totalSize } @@ -606,301 +576,422 @@ abstract class FileParquetPartitionReaderBase( out.close() } } + + protected def readPartFile( + blocks: Seq[BlockMetaData], + clippedSchema: MessageType, + filePath: Path): (HostMemoryBuffer, Long) = { + withResource(new NvtxWithMetrics("Buffer file split", NvtxColor.YELLOW, + metrics("bufferTime"))) { _ => + withResource(filePath.getFileSystem(conf).open(filePath)) { in => + val estTotalSize = calculateParquetOutputSize(blocks, clippedSchema) + closeOnExcept(HostMemoryBuffer.allocate(estTotalSize)) { hmb => + val out = new HostMemoryOutputStream(hmb) + out.write(ParquetPartitionReader.PARQUET_MAGIC) + val outputBlocks = copyBlocksData(in, out, blocks) + val footerPos = out.getPos + writeFooter(out, outputBlocks, clippedSchema) + + BytesUtils.writeIntLittleEndian(out, (out.getPos - footerPos).toInt) + out.write(ParquetPartitionReader.PARQUET_MAGIC) + // check we didn't go over memory + if (out.getPos > estTotalSize) { + throw new QueryExecutionException(s"Calculated buffer size $estTotalSize is to " + + s"small, actual written: ${out.getPos}") + } + (hmb, out.getPos) + } + } + } + } + + protected def populateCurrentBlockChunk( + blockIter: BufferedIterator[BlockMetaData], + maxReadBatchSizeRows: Int, + maxReadBatchSizeBytes: Long): Seq[BlockMetaData] = { + val currentChunk = new ArrayBuffer[BlockMetaData] + var numRows: Long = 0 + var numBytes: Long = 0 + var numParquetBytes: Long = 0 + + @tailrec + def readNextBatch(): Unit = { + if (blockIter.hasNext) { + val peekedRowGroup = blockIter.head + if (peekedRowGroup.getRowCount > Integer.MAX_VALUE) { + throw new UnsupportedOperationException("Too many rows in split") + } + if (numRows == 0 || numRows + peekedRowGroup.getRowCount <= maxReadBatchSizeRows) { + val estimatedBytes = GpuBatchUtils.estimateGpuMemory(readDataSchema, + peekedRowGroup.getRowCount) + if (numBytes == 0 || numBytes + estimatedBytes <= maxReadBatchSizeBytes) { + currentChunk += blockIter.next() + numRows += currentChunk.last.getRowCount + numParquetBytes += currentChunk.last.getTotalByteSize + numBytes += estimatedBytes + readNextBatch() + } + } + } + } + readNextBatch() + logDebug(s"Loaded $numRows rows from Parquet. Parquet bytes read: $numParquetBytes. " + + s"Estimated GPU bytes: $numBytes") + currentChunk + } + +} + +// Singleton threadpool that is used across all the tasks. +// Please note that the TaskContext is not set in these threads and should not be used. +object MultiFileThreadPoolFactory { + + private var threadPool: Option[ThreadPoolExecutor] = None + + private def initThreadPool( + maxThreads: Int = 20, + keepAliveSeconds: Long = 60): ThreadPoolExecutor = synchronized { + if (!threadPool.isDefined) { + val threadFactory = new ThreadFactoryBuilder() + .setNameFormat("parquet reader worker-%d") + .setDaemon(true) + .build() + + threadPool = Some(new ThreadPoolExecutor( + maxThreads, // corePoolSize: max number of threads to create before queuing the tasks + maxThreads, // maximumPoolSize: because we use LinkedBlockingDeque, this is not used + keepAliveSeconds, + TimeUnit.SECONDS, + new LinkedBlockingQueue[Runnable], + threadFactory)) + threadPool.get.allowCoreThreadTimeOut(true) + } + threadPool.get + } + + def submitToThreadPool[T](task: Callable[T], numThreads: Int): Future[T] = { + val pool = threadPool.getOrElse(initThreadPool(numThreads)) + pool.submit(task) + } } /** - * A PartitionReader that can read multiple Parquet files up to the certain size. + * A PartitionReader that can read multiple Parquet files in parallel. * * Efficiently reading a Parquet split on the GPU requires re-constructing the Parquet file * in memory that contains just the column chunks that are needed. This avoids sending * unnecessary data to the GPU and saves GPU memory. * * @param conf the Hadoop configuration - * @param split the file split to read - * @param clippedBlocks the block metadata from the original Parquet file that has been clipped - * to only contain the column chunks to be read + * @param files the partitioned files to read + * @param isSchemaCaseSensitive whether schema is case sensitive * @param readDataSchema the Spark schema describing what will be read * @param debugDumpPrefix a path prefix to use for dumping the fabricated Parquet data or null + * @param maxReadBatchSizeRows soft limit on the maximum number of rows the reader reads per batch + * @param maxReadBatchSizeBytes soft limit on the maximum number of bytes the reader reads per batch + * @param execMetrics metrics + * @param partitionSchema Schema of partitions. + * @param numThreads the size of the threadpool + * @param maxNumFileProcessed the maximum number of files to read on the CPU side and waiting to be + * processed on the GPU. This affects the amount of host memory used. + * @param filterHandler GpuParquetFileFilterHandler used to filter the parquet blocks + * @param filters filters passed into the filterHandler */ class MultiFileParquetPartitionReader( conf: Configuration, - splits: Array[PartitionedFile], - clippedBlocks: Seq[ParquetFileInfoWithSingleBlockMeta], + files: Array[PartitionedFile], isSchemaCaseSensitive: Boolean, readDataSchema: StructType, debugDumpPrefix: String, maxReadBatchSizeRows: Integer, maxReadBatchSizeBytes: Long, execMetrics: Map[String, SQLMetric], - partitionSchema: StructType) + partitionSchema: StructType, + numThreads: Int, + maxNumFileProcessed: Int, + filterHandler: GpuParquetFileFilterHandler, + filters: Array[Filter]) extends FileParquetPartitionReaderBase(conf, isSchemaCaseSensitive, readDataSchema, debugDumpPrefix, execMetrics) { - private val blockIterator: BufferedIterator[ParquetFileInfoWithSingleBlockMeta] = - clippedBlocks.iterator.buffered + case class HostMemoryBuffersWithMetaData(isCorrectRebaseMode: Boolean, clippedSchema: MessageType, + partValues: InternalRow, memBuffersAndSizes: Array[(HostMemoryBuffer, Long)], + fileName: String, fileStart: Long, fileLength: Long) - private def addPartitionValues( - batch: Option[ColumnarBatch], - inPartitionValues: InternalRow): Option[ColumnarBatch] = { - batch.map { cb => - val partitionValues = inPartitionValues.toSeq(partitionSchema) - val partitionScalars = ColumnarPartitionReaderWithPartitionValues - .createPartitionValues(partitionValues, partitionSchema) - withResource(partitionScalars) { scalars => - ColumnarPartitionReaderWithPartitionValues.addPartitionValues(cb, scalars) + private var filesToRead = 0 + private var currentFileHostBuffers: Option[HostMemoryBuffersWithMetaData] = None + private var isInitted = false + private val tasks = new ConcurrentLinkedQueue[Future[HostMemoryBuffersWithMetaData]]() + private val tasksToRun = new Queue[ReadBatchRunner]() + + private class ReadBatchRunner(filterHandler: GpuParquetFileFilterHandler, + file: PartitionedFile, + conf: Configuration, + filters: Array[Filter]) extends Callable[HostMemoryBuffersWithMetaData] with Logging { + + private var blockChunkIter: BufferedIterator[BlockMetaData] = null + + /** + * Returns the host memory buffers and file meta data for the file processed. + * If there was an error then the error field is set. If there were no blocks the buffer + * is returned as null. If there were no columns but rows (count() operation) then the + * buffer is null and the size is the number of rows. + * + * Note that the TaskContext is not set in these threads and should not be used. + */ + override def call(): HostMemoryBuffersWithMetaData = { + val hostBuffers = new ArrayBuffer[(HostMemoryBuffer, Long)] + try { + val fileBlockMeta = filterHandler.filterBlocks(file, conf, filters, readDataSchema) + if (fileBlockMeta.blocks.length == 0) { + // no blocks so return null buffer and size 0 + return HostMemoryBuffersWithMetaData(fileBlockMeta.isCorrectedRebaseMode, + fileBlockMeta.schema, fileBlockMeta.partValues, Array((null, 0)), + file.filePath, file.start, file.length) + } + blockChunkIter = fileBlockMeta.blocks.iterator.buffered + if (isDone) { + // got close before finishing + HostMemoryBuffersWithMetaData( + fileBlockMeta.isCorrectedRebaseMode, + fileBlockMeta.schema, fileBlockMeta.partValues, Array((null, 0)), + file.filePath, file.start, file.length) + } else { + if (readDataSchema.isEmpty) { + val numRows = fileBlockMeta.blocks.map(_.getRowCount).sum.toInt + // overload size to be number of rows with null buffer + HostMemoryBuffersWithMetaData(fileBlockMeta.isCorrectedRebaseMode, + fileBlockMeta.schema, fileBlockMeta.partValues, Array((null, numRows)), + file.filePath, file.start, file.length) + + } else { + val filePath = new Path(new URI(file.filePath)) + while (blockChunkIter.hasNext) { + val blocksToRead = populateCurrentBlockChunk(blockChunkIter, + maxReadBatchSizeRows, maxReadBatchSizeBytes) + val blockTotalSize = blocksToRead.map(_.getTotalByteSize).sum + hostBuffers += readPartFile(blocksToRead, fileBlockMeta.schema, filePath) + } + if (isDone) { + // got close before finishing + hostBuffers.foreach(_._1.safeClose()) + HostMemoryBuffersWithMetaData(fileBlockMeta.isCorrectedRebaseMode, + fileBlockMeta.schema, fileBlockMeta.partValues, Array((null, 0)), + file.filePath, file.start, file.length) + } else { + HostMemoryBuffersWithMetaData(fileBlockMeta.isCorrectedRebaseMode, + fileBlockMeta.schema, fileBlockMeta.partValues, hostBuffers.toArray, + file.filePath, file.start, file.length) + } + } + } + } catch { + case e: Throwable => + hostBuffers.foreach(_._1.safeClose()) + throw e } } } - override def next(): Boolean = { - batch.foreach(_.close()) - batch = None - if (!isExhausted) { - if (!blockIterator.hasNext) { - isExhausted = true - metrics("peakDevMemory") += maxDeviceMemory - } else { - batch = readBatch() - } + private def initAndStartReaders(): Unit = { + // limit the number we submit at once according to the config if set + val limit = math.min(maxNumFileProcessed, files.length) + for (i <- 0 until limit) { + val file = files(i) + // Add these in the order as we got them so that we can make sure + // we process them in the same order as CPU would. + tasks.add(MultiFileThreadPoolFactory.submitToThreadPool( + new ReadBatchRunner(filterHandler, file, conf, filters), numThreads)) } - // This is odd, but some operators return data even when there is no input so we need to - // be sure that we grab the GPU - GpuSemaphore.acquireIfNecessary(TaskContext.get()) - batch.isDefined + // queue up any left to add once others finish + for (i <- limit until files.length) { + val file = files(i) + tasksToRun.enqueue(new ReadBatchRunner(filterHandler, file, conf, filters)) + } + isInitted = true + filesToRead = files.length } - private def reallocHostBufferAndCopy( - in: HostMemoryInputStream, - newSizeEstimate: Long): (HostMemoryBuffer, HostMemoryOutputStream) = { - // realloc memory and copy - closeOnExcept(HostMemoryBuffer.allocate(newSizeEstimate)) { newhmb => - val newout = new HostMemoryOutputStream(newhmb) - IOUtils.copy(in, newout) - (newhmb, newout) + private def readBatch( + fileBufsAndMeta: HostMemoryBuffersWithMetaData): Option[ColumnarBatch] = { + val memBuffersAndSize = fileBufsAndMeta.memBuffersAndSizes + val (hostbuffer, size) = memBuffersAndSize.head + val nextBatch = readBufferToTable(fileBufsAndMeta.isCorrectRebaseMode, + fileBufsAndMeta.clippedSchema, fileBufsAndMeta.partValues, + hostbuffer, size, fileBufsAndMeta.fileName) + + if (memBuffersAndSize.length > 1) { + val updatedBuffers = memBuffersAndSize.drop(1) + currentFileHostBuffers = Some(fileBufsAndMeta.copy(memBuffersAndSizes = updatedBuffers)) + } else { + currentFileHostBuffers = None } + nextBatch } - private def readPartFiles( - blocks: Seq[(Path, BlockMetaData)], - clippedSchema: MessageType): (HostMemoryBuffer, Long) = { - withResource(new NvtxWithMetrics("Buffer file split", NvtxColor.YELLOW, - metrics("bufferTime"))) { _ => - // ugly but we want to keep the order - val filesAndBlocks = LinkedHashMap[Path, ArrayBuffer[BlockMetaData]]() - blocks.foreach { info => - if (filesAndBlocks.contains(info._1)) { - filesAndBlocks(info._1) += info._2 - } else { - filesAndBlocks(info._1) = ArrayBuffer(info._2) - } - } + private def getSizeOfHostBuffers(fileInfo: HostMemoryBuffersWithMetaData): Long = { + fileInfo.memBuffersAndSizes.map(_._2).sum + } - var succeeded = false - val allBlocks = blocks.map(_._2) - val initTotalSize = calculateParquetOutputSize(allBlocks, clippedSchema, true) - var hmb = HostMemoryBuffer.allocate(initTotalSize) - var out = new HostMemoryOutputStream(hmb) - try { - out.write(ParquetPartitionReader.PARQUET_MAGIC) - val allOutputBlocks = scala.collection.mutable.ArrayBuffer[BlockMetaData]() - filesAndBlocks.foreach { case (file, blocks) => - withResource(file.getFileSystem(conf).open(file)) { in => - val retBlocks = copyBlocksData(in, out, blocks) - allOutputBlocks ++= retBlocks - } - } - // The footer size can change vs the initial estimated because we are combining more blocks - // and offsets are larger, check to make sure we allocated enough memory before writing. - // Not sure how expensive this is, we could throw exception instead if the written - // size comes out > then the estimated size. - val actualFooterSize = calculateParquetFooterSize(allOutputBlocks, clippedSchema) - val footerPos = out.getPos - // 4 + 4 is for writing size and the ending PARQUET_MAGIC. - val bufferSizeReq = footerPos + actualFooterSize + 4 + 4 - val bufferSize = if (bufferSizeReq > initTotalSize) { - logWarning(s"The original estimated size $initTotalSize is to small, " + - s"reallocing and copying data to bigger buffer size: $bufferSizeReq") - val prevhmb = hmb - val in = new HostMemoryInputStream(prevhmb, footerPos) - val (newhmb, newout) = reallocHostBufferAndCopy(in, bufferSizeReq) - out = newout - hmb = newhmb - prevhmb.close() - bufferSizeReq - } else { - // we didn't change the buffer size so return the initial size which is the actual - // size of the buffer - initTotalSize - } - writeFooter(out, allOutputBlocks, clippedSchema) - BytesUtils.writeIntLittleEndian(out, (out.getPos - footerPos).toInt) - out.write(ParquetPartitionReader.PARQUET_MAGIC) - succeeded = true - // triple check we didn't go over memory - if (out.getPos > bufferSize) { - throw new QueryExecutionException(s"Calculated buffer size $bufferSize is to " + - s"small, actual written: ${out.getPos}") - } - (hmb, out.getPos) - } finally { - if (!succeeded) { - hmb.close() - } - } + private def addNextTaskIfNeeded(): Unit = { + if (tasksToRun.size > 0 && !isDone) { + val runner = tasksToRun.dequeue() + tasks.add(MultiFileThreadPoolFactory.submitToThreadPool(runner, numThreads)) } } - private def readBatch(): Option[ColumnarBatch] = { + override def next(): Boolean = { withResource(new NvtxWithMetrics("Parquet readBatch", NvtxColor.GREEN, - metrics(TOTAL_TIME))) { _ => - val (isCorrectRebaseMode, clippedSchema, partValues, seqPathsAndBlocks) = - populateCurrentBlockChunk() - if (readDataSchema.isEmpty) { - // not reading any data, so return a degenerate ColumnarBatch with the row count - val numRows = seqPathsAndBlocks.map(_._2.getRowCount).sum.toInt - if (numRows == 0) { - None - } else { - Some(new ColumnarBatch(Array.empty, numRows.toInt)) + metrics(TOTAL_TIME))) { _ => + if (isInitted == false) { + initAndStartReaders() + } + batch.foreach(_.close()) + batch = None + // if we have batch left from the last file read return it + if (currentFileHostBuffers.isDefined) { + if (getSizeOfHostBuffers(currentFileHostBuffers.get) == 0) { + next() } + batch = readBatch(currentFileHostBuffers.get) } else { - val table = readToTable(seqPathsAndBlocks, clippedSchema, isCorrectRebaseMode) - try { - val maybeBatch = table.map(GpuColumnVector.from) - maybeBatch.foreach { batch => - logDebug(s"GPU batch size: ${GpuColumnVector.getTotalDeviceMemoryUsed(batch)} bytes") + currentFileHostBuffers = None + if (filesToRead > 0 && !isDone) { + val fileBufsAndMeta = tasks.poll.get() + filesToRead -= 1 + InputFileUtils.setInputFileBlock(fileBufsAndMeta.fileName, fileBufsAndMeta.fileStart, + fileBufsAndMeta.fileLength) + + if (getSizeOfHostBuffers(fileBufsAndMeta) == 0) { + // if sizes are 0 means no rows and no data so skip to next file + // file data was empty so submit another task if any were waiting + addNextTaskIfNeeded() + next() + } else { + batch = readBatch(fileBufsAndMeta) + // the data is copied to GPU so submit another task if we were limited + addNextTaskIfNeeded() } - // we have to add partition values here for this batch, we already verified that - // its not different for all the blocks in this batch - addPartitionValues(maybeBatch, partValues) - } finally { - table.foreach(_.close()) + } else { + isDone = true + metrics("peakDevMemory") += maxDeviceMemory } } } - } - private def readToTable( - currentChunkedBlocks: Seq[(Path, BlockMetaData)], - clippedSchema: MessageType, - isCorrectRebaseMode: Boolean): Option[Table] = { - if (currentChunkedBlocks.isEmpty) { - return None + // this shouldn't happen but if somehow the batch is None and we still + // have work left skip to the next file + if (!batch.isDefined && filesToRead > 0 && !isDone) { + next() } - val (dataBuffer, dataSize) = readPartFiles(currentChunkedBlocks, clippedSchema) - try { - if (dataSize == 0) { - None - } else { - if (debugDumpPrefix != null) { - dumpParquetData(dataBuffer, dataSize, splits) - } - val parseOpts = ParquetOptions.builder() - .withTimeUnit(DType.TIMESTAMP_MICROSECONDS) - .includeColumn(readDataSchema.fieldNames:_*).build() - // about to start using the GPU - GpuSemaphore.acquireIfNecessary(TaskContext.get()) + // This is odd, but some operators return data even when there is no input so we need to + // be sure that we grab the GPU + GpuSemaphore.acquireIfNecessary(TaskContext.get()) + batch.isDefined + } - val table = withResource(new NvtxWithMetrics("Parquet decode", NvtxColor.DARK_GREEN, - metrics(GPU_DECODE_TIME))) { _ => - Table.readParquet(parseOpts, dataBuffer, 0, dataSize) + override def close(): Unit = { + // this is more complicated because threads might still be processing files + // in cases close got called early for like limit() calls + isDone = true + currentFileHostBuffers.foreach { current => + current.memBuffersAndSizes.foreach { case (buf, size) => + if (buf != null) { + buf.close() } - closeOnExcept(table) { _ => - if (!isCorrectRebaseMode) { - (0 until table.getNumberOfColumns).foreach { i => - if (RebaseHelper.isDateTimeRebaseNeededRead(table.getColumn(i))) { - throw RebaseHelper.newRebaseExceptionInRead("Parquet") - } - } - } - maxDeviceMemory = max(GpuColumnVector.getTotalDeviceMemoryUsed(table), maxDeviceMemory) - if (readDataSchema.length < table.getNumberOfColumns) { - throw new QueryExecutionException(s"Expected ${readDataSchema.length} columns " + - s"but read ${table.getNumberOfColumns} from $currentChunkedBlocks") + } + } + currentFileHostBuffers = None + batch.foreach(_.close()) + batch = None + tasks.asScala.foreach { task => + if (task.isDone()) { + task.get.memBuffersAndSizes.foreach { case (buf, size) => + if (buf != null) { + buf.close() } } - metrics(NUM_OUTPUT_BATCHES) += 1 - Some(evolveSchemaIfNeededAndClose(table, splits.mkString(","), clippedSchema)) + } else { + // Note we are not interrupting thread here so it + // will finish reading and then just discard. If we + // interrupt HDFS logs warnings about being interrupted. + task.cancel(false) } - } finally { - dataBuffer.close() } } - private def populateCurrentBlockChunk(): - (Boolean, MessageType, InternalRow, Seq[(Path, BlockMetaData)]) = { + private def addPartitionValues( + batch: Option[ColumnarBatch], + inPartitionValues: InternalRow): Option[ColumnarBatch] = { + batch.map { cb => + val partitionValues = inPartitionValues.toSeq(partitionSchema) + val partitionScalars = ColumnarPartitionReaderWithPartitionValues + .createPartitionValues(partitionValues, partitionSchema) + withResource(partitionScalars) { scalars => + ColumnarPartitionReaderWithPartitionValues.addPartitionValues(cb, scalars) + } + } + } - val currentChunk = new ArrayBuffer[(Path, BlockMetaData)] - var numRows: Long = 0 - var numBytes: Long = 0 - var numParquetBytes: Long = 0 - var currentFile: Path = null - var currentPartitionValues: InternalRow = null - var currentClippedSchema: MessageType = null - var currentIsCorrectRebaseMode: Boolean = false + private def readBufferToTable( + isCorrectRebaseMode: Boolean, + clippedSchema: MessageType, + partValues: InternalRow, + hostBuffer: HostMemoryBuffer, + dataSize: Long, + fileName: String): Option[ColumnarBatch] = { + if (dataSize == 0) { + // shouldn't ever get here + None + } + // not reading any data, so return a degenerate ColumnarBatch with the row count + if (hostBuffer == null) { + return Some(new ColumnarBatch(Array.empty, dataSize.toInt)) + } + val table = withResource(hostBuffer) { _ => + if (debugDumpPrefix != null) { + dumpParquetData(hostBuffer, dataSize, files) + } + val parseOpts = ParquetOptions.builder() + .withTimeUnit(DType.TIMESTAMP_MICROSECONDS) + .includeColumn(readDataSchema.fieldNames: _*).build() - @tailrec - def readNextBatch(): Unit = { - if (blockIterator.hasNext) { - if (currentFile == null) { - currentFile = blockIterator.head.filePath - currentPartitionValues = blockIterator.head.partValues - currentClippedSchema = blockIterator.head.schema - currentIsCorrectRebaseMode = blockIterator.head.isCorrectedRebaseMode - } - if (currentFile != blockIterator.head.filePath) { - // We need to ensure all files we are going to combine have the same datetime rebase mode. - if (blockIterator.head.isCorrectedRebaseMode != currentIsCorrectRebaseMode) { - logInfo("datetime rebase mode for the next file " + - s"${blockIterator.head.filePath} is different then current file $currentFile, " + - s"splitting into another batch.") - return - } + // about to start using the GPU + GpuSemaphore.acquireIfNecessary(TaskContext.get()) - // check to see if partitionValues different, then have to split it - if (blockIterator.head.partValues != currentPartitionValues) { - logInfo(s"Partition values for the next file ${blockIterator.head.filePath}" + - s" doesn't match current $currentFile, splitting it into another batch!") - return - } - val schemaNextfile = - blockIterator.head.schema.asGroupType().getFields.asScala.map(_.getName) - val schemaCurrentfile = - currentClippedSchema.asGroupType().getFields.asScala.map(_.getName) - if (!schemaNextfile.sameElements(schemaCurrentfile)) { - logInfo(s"File schema for the next file ${blockIterator.head.filePath}" + - s" doesn't match current $currentFile, splitting it into another batch!") - return + val table = withResource(new NvtxWithMetrics("Parquet decode", NvtxColor.DARK_GREEN, + metrics(GPU_DECODE_TIME))) { _ => + Table.readParquet(parseOpts, hostBuffer, 0, dataSize) + } + closeOnExcept(table) { _ => + if (!isCorrectRebaseMode) { + (0 until table.getNumberOfColumns).foreach { i => + if (RebaseHelper.isDateTimeRebaseNeededRead(table.getColumn(i))) { + throw RebaseHelper.newRebaseExceptionInRead("Parquet") + } } - currentFile = blockIterator.head.filePath - currentPartitionValues = blockIterator.head.partValues - currentClippedSchema = blockIterator.head.schema - } - val peekedRowGroup = blockIterator.head.blockMeta - if (peekedRowGroup.getRowCount > Integer.MAX_VALUE) { - throw new UnsupportedOperationException("Too many rows in split") } - - if (numRows == 0 || numRows + peekedRowGroup.getRowCount <= maxReadBatchSizeRows) { - val estimatedBytes = GpuBatchUtils.estimateGpuMemory(readDataSchema, - peekedRowGroup.getRowCount) - if (numBytes == 0 || numBytes + estimatedBytes <= maxReadBatchSizeBytes) { - val nextBlock = blockIterator.next() - val nextTuple = (nextBlock.filePath, nextBlock.blockMeta) - currentChunk += nextTuple - numRows += currentChunk.last._2.getRowCount - numParquetBytes += currentChunk.last._2.getTotalByteSize - numBytes += estimatedBytes - readNextBatch() - } + maxDeviceMemory = max(GpuColumnVector.getTotalDeviceMemoryUsed(table), maxDeviceMemory) + if (readDataSchema.length < table.getNumberOfColumns) { + throw new QueryExecutionException(s"Expected ${readDataSchema.length} columns " + + s"but read ${table.getNumberOfColumns} from $fileName") } } + metrics(NUM_OUTPUT_BATCHES) += 1 + Some(evolveSchemaIfNeededAndClose(table, fileName, clippedSchema)) + } + try { + val maybeBatch = table.map(GpuColumnVector.from) + maybeBatch.foreach { batch => + logDebug(s"GPU batch size: ${GpuColumnVector.getTotalDeviceMemoryUsed(batch)} bytes") + } + // we have to add partition values here for this batch, we already verified that + // its not different for all the blocks in this batch + addPartitionValues(maybeBatch, partValues) + } finally { + table.foreach(_.close()) } - readNextBatch() - logDebug(s"Loaded $numRows rows from Parquet. Parquet bytes read: $numParquetBytes. " + - s"Estimated GPU bytes: $numBytes") - (currentIsCorrectRebaseMode, currentClippedSchema, currentPartitionValues, currentChunk) } } @@ -937,14 +1028,14 @@ class ParquetPartitionReader( FileParquetPartitionReaderBase(conf, isSchemaCaseSensitive, readDataSchema, debugDumpPrefix, execMetrics) { - private val blockIterator : BufferedIterator[BlockMetaData] = clippedBlocks.iterator.buffered + private val blockIterator: BufferedIterator[BlockMetaData] = clippedBlocks.iterator.buffered override def next(): Boolean = { batch.foreach(_.close()) batch = None - if (!isExhausted) { + if (!isDone) { if (!blockIterator.hasNext) { - isExhausted = true + isDone = true metrics("peakDevMemory") += maxDeviceMemory } else { batch = readBatch() @@ -956,42 +1047,11 @@ class ParquetPartitionReader( batch.isDefined } - private def readPartFile(blocks: Seq[BlockMetaData]): (HostMemoryBuffer, Long) = { - withResource(new NvtxWithMetrics("Buffer file split", NvtxColor.YELLOW, - metrics("bufferTime"))) { _ => - withResource(filePath.getFileSystem(conf).open(filePath)) { in => - var succeeded = false - val estTotalSize = calculateParquetOutputSize(blocks, clippedParquetSchema, false) - val hmb = - HostMemoryBuffer.allocate(estTotalSize) - try { - val out = new HostMemoryOutputStream(hmb) - out.write(ParquetPartitionReader.PARQUET_MAGIC) - val outputBlocks = copyBlocksData(in, out, blocks) - val footerPos = out.getPos - writeFooter(out, outputBlocks, clippedParquetSchema) - BytesUtils.writeIntLittleEndian(out, (out.getPos - footerPos).toInt) - out.write(ParquetPartitionReader.PARQUET_MAGIC) - succeeded = true - // check we didn't go over memory - if (out.getPos > estTotalSize) { - throw new QueryExecutionException(s"Calculated buffer size $estTotalSize is to " + - s"small, actual written: ${out.getPos}") - } - (hmb, out.getPos) - } finally { - if (!succeeded) { - hmb.close() - } - } - } - } - } - private def readBatch(): Option[ColumnarBatch] = { withResource(new NvtxWithMetrics("Parquet readBatch", NvtxColor.GREEN, metrics(TOTAL_TIME))) { _ => - val currentChunkedBlocks = populateCurrentBlockChunk() + val currentChunkedBlocks = populateCurrentBlockChunk(blockIterator, + maxReadBatchSizeRows, maxReadBatchSizeBytes) if (readDataSchema.isEmpty) { // not reading any data, so return a degenerate ColumnarBatch with the row count val numRows = currentChunkedBlocks.map(_.getRowCount).sum.toInt @@ -1019,7 +1079,7 @@ class ParquetPartitionReader( if (currentChunkedBlocks.isEmpty) { return None } - val (dataBuffer, dataSize) = readPartFile(currentChunkedBlocks) + val (dataBuffer, dataSize) = readPartFile(currentChunkedBlocks, clippedParquetSchema, filePath) try { if (dataSize == 0) { None @@ -1059,42 +1119,6 @@ class ParquetPartitionReader( dataBuffer.close() } } - - private def populateCurrentBlockChunk(): Seq[BlockMetaData] = { - - val currentChunk = new ArrayBuffer[BlockMetaData] - var numRows: Long = 0 - var numBytes: Long = 0 - var numParquetBytes: Long = 0 - - @tailrec - def readNextBatch(): Unit = { - if (blockIterator.hasNext) { - val peekedRowGroup = blockIterator.head - if (peekedRowGroup.getRowCount > Integer.MAX_VALUE) { - throw new UnsupportedOperationException("Too many rows in split") - } - if (numRows == 0 || numRows + peekedRowGroup.getRowCount <= maxReadBatchSizeRows) { - val estimatedBytes = GpuBatchUtils.estimateGpuMemory(readDataSchema, - peekedRowGroup.getRowCount) - if (numBytes == 0 || numBytes + estimatedBytes <= maxReadBatchSizeBytes) { - currentChunk += blockIterator.next() - numRows += currentChunk.last.getRowCount - numParquetBytes += currentChunk.last.getTotalByteSize - numBytes += estimatedBytes - readNextBatch() - } - } - } - } - - readNextBatch() - - logDebug(s"Loaded $numRows rows from Parquet. Parquet bytes read: $numParquetBytes. " + - s"Estimated GPU bytes: $numBytes") - - currentChunk - } } object ParquetPartitionReader { diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala index f33a52c3939..f434e7a3fc5 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala @@ -165,48 +165,6 @@ class GpuTransitionOverrides extends Rule[SparkPlan] { plan.expressions.exists(disableCoalesceUntilInput) } - private def disableScanUntilInput(exec: Expression): Boolean = { - exec match { - case _: InputFileName => true - case _: InputFileBlockStart => true - case _: InputFileBlockLength => true - case _: GpuInputFileName => true - case _: GpuInputFileBlockStart => true - case _: GpuInputFileBlockLength => true - case e => e.children.exists(disableScanUntilInput) - } - } - - private def disableScanUntilInput(plan: SparkPlan): Boolean = { - plan.expressions.exists(disableScanUntilInput) - } - - // This walks from the output to the input to look for any uses of InputFileName, - // InputFileBlockStart, or InputFileBlockLength when we use a Parquet read because - // we can't support the small file optimization when this is used. - private def updateScansForInput(plan: SparkPlan, - disableUntilInput: Boolean = false): SparkPlan = plan match { - case batchScan: GpuBatchScanExec => - if (batchScan.scan.isInstanceOf[GpuParquetScanBase] && - (disableUntilInput || disableScanUntilInput(batchScan))) { - ShimLoader.getSparkShims.copyParquetBatchScanExec(batchScan, false) - } else { - batchScan - } - case fileSourceScan: GpuFileSourceScanExec => - if (fileSourceScan.supportsSmallFileOpt == true && - (disableUntilInput || disableScanUntilInput(fileSourceScan))) { - ShimLoader.getSparkShims.copyFileSourceScanExec(fileSourceScan, false) - } else { - fileSourceScan - } - case p => - val planDisableUntilInput = disableScanUntilInput(p) && hasDirectLineToInput(p) - p.withNewChildren(p.children.map(c => { - updateScansForInput(c, planDisableUntilInput || disableUntilInput) - })) - } - // This walks from the output to the input so disableUntilInput can walk its way from when // we hit something that cannot allow for coalesce up until the input private def insertCoalesce(plan: SparkPlan, @@ -365,7 +323,6 @@ class GpuTransitionOverrides extends Rule[SparkPlan] { this.conf = new RapidsConf(plan.conf) if (conf.isSqlEnabled) { var updatedPlan = insertHashOptimizeSorts(plan) - updatedPlan = updateScansForInput(updatedPlan) updatedPlan = insertCoalesce(insertColumnarFromGpu(updatedPlan)) updatedPlan = optimizeCoalesce(if (plan.conf.adaptiveExecutionEnabled) { optimizeAdaptiveTransitions(updatedPlan) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala index 34b3c65d619..4e78991fc93 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala @@ -446,13 +446,33 @@ object RapidsConf { .booleanConf .createWithDefault(true) - val ENABLE_SMALL_FILES_PARQUET = conf("spark.rapids.sql.format.parquet.smallFiles.enabled") - .doc("When set to true, handles reading multiple small files within a partition more " + - "efficiently by combining multiple files on the CPU side before sending to the GPU. " + - "Recommended unless user needs mergeSchema option or schema evolution.") + val ENABLE_MULTITHREAD_PARQUET_READS = conf( + "spark.rapids.sql.format.parquet.multiThreadedRead.enabled") + .doc("When set to true, reads multiple small files within a partition more efficiently " + + "by reading each file in a separate thread in parallel on the CPU side before " + + "sending to the GPU. Limited by " + + "spark.rapids.sql.format.parquet.multiThreadedRead.numThreads " + + "and spark.rapids.sql.format.parquet.multiThreadedRead.maxNumFileProcessed") .booleanConf .createWithDefault(true) + val PARQUET_MULTITHREAD_READ_NUM_THREADS = + conf("spark.rapids.sql.format.parquet.multiThreadedRead.numThreads") + .doc("The maximum number of threads, on the executor, to use for reading small " + + "parquet files in parallel. This can not be changed at runtime after the executor has" + + "started.") + .integerConf + .createWithDefault(20) + + val PARQUET_MULTITHREAD_READ_MAX_NUM_FILES_PARALLEL = + conf("spark.rapids.sql.format.parquet.multiThreadedRead.maxNumFilesParallel") + .doc("A limit on the maximum number of files per task processed in parallel on the CPU " + + "side before the file is sent to the GPU. This affects the amount of host memory used " + + "when reading the files in parallel.") + .integerConf + .checkValue(v => v > 0, "The maximum number of files must be greater than 0.") + .createWithDefault(Integer.MAX_VALUE) + val ENABLE_PARQUET_READ = conf("spark.rapids.sql.format.parquet.read.enabled") .doc("When set to false disables parquet input acceleration") .booleanConf @@ -695,11 +715,11 @@ object RapidsConf { if (asTable) { println("") // scalastyle:off line.size.limit - println("""## Supported GPU Operators and Fine Tuning - |_The RAPIDS Accelerator for Apache Spark_ can be configured to enable or disable specific - |GPU accelerated expressions. Enabled expressions are candidates for GPU execution. If the - |expression is configured as disabled, the accelerator plugin will not attempt replacement, - |and it will run on the CPU. + println("""## Supported GPU Operators and Fine Tuning + |_The RAPIDS Accelerator for Apache Spark_ can be configured to enable or disable specific + |GPU accelerated expressions. Enabled expressions are candidates for GPU execution. If the + |expression is configured as disabled, the accelerator plugin will not attempt replacement, + |and it will run on the CPU. | |Please leverage the [`spark.rapids.sql.explain`](#sql.explain) setting to get |feedback from the plugin as to why parts of a query may not be executing on the GPU. @@ -851,7 +871,11 @@ class RapidsConf(conf: Map[String, String]) extends Logging { lazy val isParquetEnabled: Boolean = get(ENABLE_PARQUET) - lazy val isParquetSmallFilesEnabled: Boolean = get(ENABLE_SMALL_FILES_PARQUET) + lazy val isParquetMultiThreadReadEnabled: Boolean = get(ENABLE_MULTITHREAD_PARQUET_READS) + + lazy val parquetMultiThreadReadNumThreads: Int = get(PARQUET_MULTITHREAD_READ_NUM_THREADS) + + lazy val maxNumParquetFilesParallel: Int = get(PARQUET_MULTITHREAD_READ_MAX_NUM_FILES_PARALLEL) lazy val isParquetReadEnabled: Boolean = get(ENABLE_PARQUET_READ) diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuInputFileBlock.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuInputFileBlock.scala index fd0deed6bc2..b310ab39d95 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuInputFileBlock.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuInputFileBlock.scala @@ -53,6 +53,12 @@ case class GpuInputFileName() extends GpuLeafExpression { } } +object InputFileUtils { + def setInputFileBlock(filePath: String, start: Long, length: Long): Unit = { + InputFileBlockHolder.set(filePath, start, length) + } +} + /** * Returns the start offset of the block being read, or -1 if not available. * This is extra difficult because we cannot coalesce batches in between when this