From bc3003fe6c7976685415524774e76cfdfc6e31cd Mon Sep 17 00:00:00 2001 From: Nghia Truong Date: Tue, 16 Apr 2024 11:03:23 -0700 Subject: [PATCH 01/14] Refactor Parquet reader Signed-off-by: Nghia Truong --- .../rapids/iceberg/parquet/GpuParquet.java | 11 ++-- .../iceberg/parquet/GpuParquetReader.java | 8 +-- .../spark/source/GpuBatchDataReader.java | 9 +-- .../spark/source/GpuMultiFileBatchReader.java | 12 ++-- .../iceberg/spark/source/GpuSparkScan.java | 17 +++-- .../spark/rapids/GpuMultiFileReader.scala | 8 ++- .../nvidia/spark/rapids/GpuParquetScan.scala | 64 +++++++++++-------- .../com/nvidia/spark/rapids/RapidsConf.scala | 42 +++++++----- 8 files changed, 105 insertions(+), 66 deletions(-) diff --git a/sql-plugin/src/main/java/com/nvidia/spark/rapids/iceberg/parquet/GpuParquet.java b/sql-plugin/src/main/java/com/nvidia/spark/rapids/iceberg/parquet/GpuParquet.java index 683902941e9..2c92d92f854 100644 --- a/sql-plugin/src/main/java/com/nvidia/spark/rapids/iceberg/parquet/GpuParquet.java +++ b/sql-plugin/src/main/java/com/nvidia/spark/rapids/iceberg/parquet/GpuParquet.java @@ -69,7 +69,7 @@ public static class ReadBuilder { private long maxBatchSizeBytes = Integer.MAX_VALUE; private long targetBatchSizeBytes = Integer.MAX_VALUE; private boolean useChunkedReader = false; - private boolean useSubPageChunked = false; + private long maxChunkedReaderMemoryUsageSizeBytes = 0; private scala.Option debugDumpPrefix = null; private boolean debugDumpAlways = false; private scala.collection.immutable.Map metrics = null; @@ -141,9 +141,10 @@ public ReadBuilder withTargetBatchSizeBytes(long targetBatchSizeBytes) { return this; } - public ReadBuilder withUseChunkedReader(boolean useChunkedReader, boolean useSubPageChunked) { + public ReadBuilder withUseChunkedReader(boolean useChunkedReader, + long maxChunkedReaderMemoryUsageSizeBytes) { this.useChunkedReader = useChunkedReader; - this.useSubPageChunked = useSubPageChunked; + this.maxChunkedReaderMemoryUsageSizeBytes = maxChunkedReaderMemoryUsageSizeBytes; return this; } @@ -164,8 +165,8 @@ public CloseableIterable build() { InternalRow.empty(), file.location(), start, length); return new GpuParquetReader(file, projectSchema, options, nameMapping, filter, caseSensitive, idToConstant, deleteFilter, partFile, conf, maxBatchSizeRows, maxBatchSizeBytes, - targetBatchSizeBytes, useChunkedReader, useSubPageChunked, debugDumpPrefix, - debugDumpAlways, metrics); + targetBatchSizeBytes, useChunkedReader, maxChunkedReaderMemoryUsageSizeBytes, + debugDumpPrefix, debugDumpAlways, metrics); } } diff --git a/sql-plugin/src/main/java/com/nvidia/spark/rapids/iceberg/parquet/GpuParquetReader.java b/sql-plugin/src/main/java/com/nvidia/spark/rapids/iceberg/parquet/GpuParquetReader.java index 724e32707db..47b649af6ed 100644 --- a/sql-plugin/src/main/java/com/nvidia/spark/rapids/iceberg/parquet/GpuParquetReader.java +++ b/sql-plugin/src/main/java/com/nvidia/spark/rapids/iceberg/parquet/GpuParquetReader.java @@ -87,7 +87,7 @@ public class GpuParquetReader extends CloseableGroup implements CloseableIterabl private final long maxBatchSizeBytes; private final long targetBatchSizeBytes; private final boolean useChunkedReader; - private final boolean useSubPageChunked; + private final long maxChunkedReaderMemoryUsageSizeBytes; private final scala.Option debugDumpPrefix; private final boolean debugDumpAlways; private final scala.collection.immutable.Map metrics; @@ -98,7 +98,7 @@ public GpuParquetReader( Map idToConstant, GpuDeleteFilter deleteFilter, PartitionedFile partFile, 
Configuration conf, int maxBatchSizeRows, long maxBatchSizeBytes, long targetBatchSizeBytes, boolean useChunkedReader, - boolean useSubPageChunked, + long maxChunkedReaderMemoryUsageSizeBytes, scala.Option debugDumpPrefix, boolean debugDumpAlways, scala.collection.immutable.Map metrics) { this.input = input; @@ -115,7 +115,7 @@ public GpuParquetReader( this.maxBatchSizeBytes = maxBatchSizeBytes; this.targetBatchSizeBytes = targetBatchSizeBytes; this.useChunkedReader = useChunkedReader; - this.useSubPageChunked = useSubPageChunked; + this.maxChunkedReaderMemoryUsageSizeBytes = maxChunkedReaderMemoryUsageSizeBytes; this.debugDumpPrefix = debugDumpPrefix; this.debugDumpAlways = debugDumpAlways; this.metrics = metrics; @@ -143,7 +143,7 @@ public org.apache.iceberg.io.CloseableIterator iterator() { new Path(input.location()), clippedBlocks, fileReadSchema, caseSensitive, partReaderSparkSchema, debugDumpPrefix, debugDumpAlways, maxBatchSizeRows, maxBatchSizeBytes, targetBatchSizeBytes, useChunkedReader, - useSubPageChunked, + maxChunkedReaderMemoryUsageSizeBytes, metrics, DateTimeRebaseCorrected$.MODULE$, // dateRebaseMode DateTimeRebaseCorrected$.MODULE$, // timestampRebaseMode diff --git a/sql-plugin/src/main/java/com/nvidia/spark/rapids/iceberg/spark/source/GpuBatchDataReader.java b/sql-plugin/src/main/java/com/nvidia/spark/rapids/iceberg/spark/source/GpuBatchDataReader.java index d9ff9c157fa..202ba2c91b3 100644 --- a/sql-plugin/src/main/java/com/nvidia/spark/rapids/iceberg/spark/source/GpuBatchDataReader.java +++ b/sql-plugin/src/main/java/com/nvidia/spark/rapids/iceberg/spark/source/GpuBatchDataReader.java @@ -48,14 +48,15 @@ class GpuBatchDataReader extends BaseDataReader { private final long maxBatchSizeBytes; private final long targetBatchSizeBytes; private final boolean useChunkedReader; - private final boolean useSubPageChunked; + private final long maxChunkedReaderMemoryUsageSizeBytes; private final scala.Option parquetDebugDumpPrefix; private final boolean parquetDebugDumpAlways; private final scala.collection.immutable.Map metrics; GpuBatchDataReader(CombinedScanTask task, Table table, Schema expectedSchema, boolean caseSensitive, Configuration conf, int maxBatchSizeRows, long maxBatchSizeBytes, - long targetBatchSizeBytes, boolean useChunkedReader, boolean useSubPageChunked, + long targetBatchSizeBytes, + boolean useChunkedReader, long maxChunkedReaderMemoryUsageSizeBytes, scala.Option parquetDebugDumpPrefix, boolean parquetDebugDumpAlways, scala.collection.immutable.Map metrics) { super(table, task); @@ -67,7 +68,7 @@ class GpuBatchDataReader extends BaseDataReader { this.maxBatchSizeBytes = maxBatchSizeBytes; this.targetBatchSizeBytes = targetBatchSizeBytes; this.useChunkedReader = useChunkedReader; - this.useSubPageChunked = useSubPageChunked; + this.maxChunkedReaderMemoryUsageSizeBytes = maxChunkedReaderMemoryUsageSizeBytes; this.parquetDebugDumpPrefix = parquetDebugDumpPrefix; this.parquetDebugDumpAlways = parquetDebugDumpAlways; this.metrics = metrics; @@ -102,7 +103,7 @@ CloseableIterator open(FileScanTask task) { .withMaxBatchSizeRows(maxBatchSizeRows) .withMaxBatchSizeBytes(maxBatchSizeBytes) .withTargetBatchSizeBytes(targetBatchSizeBytes) - .withUseChunkedReader(useChunkedReader, useSubPageChunked) + .withUseChunkedReader(useChunkedReader, maxChunkedReaderMemoryUsageSizeBytes) .withDebugDump(parquetDebugDumpPrefix, parquetDebugDumpAlways) .withMetrics(metrics); diff --git a/sql-plugin/src/main/java/com/nvidia/spark/rapids/iceberg/spark/source/GpuMultiFileBatchReader.java 
b/sql-plugin/src/main/java/com/nvidia/spark/rapids/iceberg/spark/source/GpuMultiFileBatchReader.java index b27dd01daf6..9c36fe76020 100644 --- a/sql-plugin/src/main/java/com/nvidia/spark/rapids/iceberg/spark/source/GpuMultiFileBatchReader.java +++ b/sql-plugin/src/main/java/com/nvidia/spark/rapids/iceberg/spark/source/GpuMultiFileBatchReader.java @@ -68,7 +68,7 @@ class GpuMultiFileBatchReader extends BaseDataReader { private final long maxGpuColumnSizeBytes; private final boolean useChunkedReader; - private final boolean useSubPageChunked; + private final long maxChunkedReaderMemoryUsageSizeBytes; private final scala.Option parquetDebugDumpPrefix; private final boolean parquetDebugDumpAlways; private final scala.collection.immutable.Map metrics; @@ -87,7 +87,7 @@ class GpuMultiFileBatchReader extends BaseDataReader { GpuMultiFileBatchReader(CombinedScanTask task, Table table, Schema expectedSchema, boolean caseSensitive, Configuration conf, int maxBatchSizeRows, long maxBatchSizeBytes, long targetBatchSizeBytes, long maxGpuColumnSizeBytes, - boolean useChunkedReader, boolean useSubPageChunked, + boolean useChunkedReader, long maxChunkedReaderMemoryUsageSizeBytes, scala.Option parquetDebugDumpPrefix, boolean parquetDebugDumpAlways, int numThreads, int maxNumFileProcessed, boolean useMultiThread, FileFormat fileFormat, @@ -102,7 +102,7 @@ class GpuMultiFileBatchReader extends BaseDataReader { this.targetBatchSizeBytes = targetBatchSizeBytes; this.maxGpuColumnSizeBytes = maxGpuColumnSizeBytes; this.useChunkedReader = useChunkedReader; - this.useSubPageChunked = useSubPageChunked; + this.maxChunkedReaderMemoryUsageSizeBytes = maxChunkedReaderMemoryUsageSizeBytes; this.parquetDebugDumpPrefix = parquetDebugDumpPrefix; this.parquetDebugDumpAlways = parquetDebugDumpAlways; this.useMultiThread = useMultiThread; @@ -352,7 +352,7 @@ protected FilePartitionReaderBase createRapidsReader(PartitionedFile[] pFiles, return new MultiFileCloudParquetPartitionReader(conf, pFiles, this::filterParquetBlocks, caseSensitive, parquetDebugDumpPrefix, parquetDebugDumpAlways, maxBatchSizeRows, maxBatchSizeBytes, targetBatchSizeBytes, maxGpuColumnSizeBytes, - useChunkedReader, useSubPageChunked, metrics, partitionSchema, + useChunkedReader, maxChunkedReaderMemoryUsageSizeBytes, metrics, partitionSchema, numThreads, maxNumFileProcessed, false, // ignoreMissingFiles false, // ignoreCorruptFiles @@ -428,9 +428,9 @@ protected FilePartitionReaderBase createRapidsReader(PartitionedFile[] pFiles, return new MultiFileParquetPartitionReader(conf, pFiles, JavaConverters.asScalaBuffer(clippedBlocks).toSeq(), - caseSensitive, parquetDebugDumpPrefix, parquetDebugDumpAlways, useChunkedReader, - useSubPageChunked, + caseSensitive, parquetDebugDumpPrefix, parquetDebugDumpAlways, maxBatchSizeRows, maxBatchSizeBytes, targetBatchSizeBytes, maxGpuColumnSizeBytes, + useChunkedReader, maxChunkedReaderMemoryUsageSizeBytes, metrics, partitionSchema, numThreads, false, // ignoreMissingFiles false, // ignoreCorruptFiles diff --git a/sql-plugin/src/main/java/com/nvidia/spark/rapids/iceberg/spark/source/GpuSparkScan.java b/sql-plugin/src/main/java/com/nvidia/spark/rapids/iceberg/spark/source/GpuSparkScan.java index 3def72b537b..b7544675d41 100644 --- a/sql-plugin/src/main/java/com/nvidia/spark/rapids/iceberg/spark/source/GpuSparkScan.java +++ b/sql-plugin/src/main/java/com/nvidia/spark/rapids/iceberg/spark/source/GpuSparkScan.java @@ -283,7 +283,7 @@ private static class MultiFileBatchReader super(task.task, task.table(), task.expectedSchema(), 
task.isCaseSensitive(), task.getConfiguration(), task.getMaxBatchSizeRows(), task.getMaxBatchSizeBytes(), task.getTargetBatchSizeBytes(), task.getMaxGpuColumnSizeBytes(), task.useChunkedReader(), - task.useSubPageChunked(), + task.maxChunkedReaderMemoryUsageSizeBytes(), task.getParquetDebugDumpPrefix(), task.getParquetDebugDumpAlways(), task.getNumThreads(), task.getMaxNumFileProcessed(), useMultiThread, ff, metrics, queryUsesInputFile); @@ -294,7 +294,7 @@ private static class BatchReader extends GpuBatchDataReader implements Partition BatchReader(ReadTask task, scala.collection.immutable.Map metrics) { super(task.task, task.table(), task.expectedSchema(), task.isCaseSensitive(), task.getConfiguration(), task.getMaxBatchSizeRows(), task.getMaxBatchSizeBytes(), - task.getTargetBatchSizeBytes(), task.useChunkedReader(), task.useSubPageChunked(), + task.getTargetBatchSizeBytes(), task.useChunkedReader(), task.maxChunkedReaderMemoryUsageSizeBytes(), task.getParquetDebugDumpPrefix(), task.getParquetDebugDumpAlways(), metrics); } } @@ -305,7 +305,7 @@ static class ReadTask implements InputPartition, Serializable { private final String expectedSchemaString; private final boolean caseSensitive; private final boolean useChunkedReader; - private final boolean useSubPageChunked; + private final long maxChunkedReaderMemoryUsageSizeBytes; private final Broadcast confBroadcast; private final int maxBatchSizeRows; private final long maxBatchSizeBytes; @@ -343,7 +343,12 @@ static class ReadTask implements InputPartition, Serializable { this.numThreads = rapidsConf.multiThreadReadNumThreads(); this.maxNumFileProcessed = rapidsConf.maxNumParquetFilesParallel(); this.useChunkedReader = rapidsConf.chunkedReaderEnabled(); - this.useSubPageChunked = rapidsConf.chunkedSubPageReaderEnabled(); + if(rapidsConf.limitChunkedReaderMemoryUsage()) { + double limitRatio = rapidsConf.chunkedReaderMemoryUsageRatio(); + this.maxChunkedReaderMemoryUsageSizeBytes = (long)(limitRatio * this.targetBatchSizeBytes); + } else { + this.maxChunkedReaderMemoryUsageSizeBytes = 0L; + } } @Override @@ -410,8 +415,8 @@ public boolean useChunkedReader() { return useChunkedReader; } - public boolean useSubPageChunked() { - return useSubPageChunked; + public long maxChunkedReaderMemoryUsageSizeBytes() { + return maxChunkedReaderMemoryUsageSizeBytes; } } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuMultiFileReader.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuMultiFileReader.scala index f64ed1097b0..eabde5c156b 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuMultiFileReader.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuMultiFileReader.scala @@ -214,8 +214,14 @@ abstract class MultiFilePartitionReaderFactoryBase( protected val maxReadBatchSizeRows: Int = rapidsConf.maxReadBatchSizeRows protected val maxReadBatchSizeBytes: Long = rapidsConf.maxReadBatchSizeBytes protected val targetBatchSizeBytes: Long = rapidsConf.gpuTargetBatchSizeBytes - protected val subPageChunked: Boolean = rapidsConf.chunkedSubPageReaderEnabled protected val maxGpuColumnSizeBytes: Long = rapidsConf.maxGpuColumnSizeBytes + protected val useChunkedReader: Boolean = rapidsConf.chunkedReaderEnabled + protected val maxChunkedReaderMemoryUsageSizeBytes: Long = + if(rapidsConf.limitChunkedReaderMemoryUsage) { + (rapidsConf.chunkedReaderMemoryUsageRatio * targetBatchSizeBytes).toLong + } else { + 0L + } private val allCloudSchemes = rapidsConf.getCloudSchemes.toSet override def createReader(partition: 
InputPartition): PartitionReader[InternalRow] = { diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala index 4f140f27bf3..1e43b0437e4 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala @@ -1084,8 +1084,6 @@ case class GpuParquetMultiFilePartitionReaderFactory( rapidsConf, alluxioPathReplacementMap) { private val isCaseSensitive = sqlConf.caseSensitiveAnalysis - private val useChunkedReader = rapidsConf.chunkedReaderEnabled - private val useSubPageChunked = rapidsConf.chunkedSubPageReaderEnabled private val debugDumpPrefix = rapidsConf.parquetDebugDumpPrefix private val debugDumpAlways = rapidsConf.parquetDebugDumpAlways private val numThreads = rapidsConf.multiThreadReadNumThreads @@ -1151,10 +1149,11 @@ case class GpuParquetMultiFilePartitionReaderFactory( val combineConf = CombineConf(combineThresholdSize, combineWaitTime) new MultiFileCloudParquetPartitionReader(conf, files, filterFunc, isCaseSensitive, debugDumpPrefix, debugDumpAlways, maxReadBatchSizeRows, maxReadBatchSizeBytes, - targetBatchSizeBytes, maxGpuColumnSizeBytes, useChunkedReader, subPageChunked, metrics, - partitionSchema, numThreads, maxNumFileProcessed, ignoreMissingFiles, ignoreCorruptFiles, - readUseFieldId, alluxioPathReplacementMap.getOrElse(Map.empty), alluxioReplacementTaskTime, - queryUsesInputFile, keepReadsInOrderFromConf, combineConf) + targetBatchSizeBytes, maxGpuColumnSizeBytes, + useChunkedReader, maxChunkedReaderMemoryUsageSizeBytes, + metrics, partitionSchema, numThreads, maxNumFileProcessed, ignoreMissingFiles, + ignoreCorruptFiles, readUseFieldId, alluxioPathReplacementMap.getOrElse(Map.empty), + alluxioReplacementTaskTime, queryUsesInputFile, keepReadsInOrderFromConf, combineConf) } private def filterBlocksForCoalescingReader( @@ -1266,9 +1265,10 @@ case class GpuParquetMultiFilePartitionReaderFactory( _ += TimeUnit.NANOSECONDS.toMillis(filterTime) } new MultiFileParquetPartitionReader(conf, files, clippedBlocks.toSeq, isCaseSensitive, - debugDumpPrefix, debugDumpAlways, useChunkedReader, useSubPageChunked, maxReadBatchSizeRows, - maxReadBatchSizeBytes, targetBatchSizeBytes, maxGpuColumnSizeBytes, metrics, partitionSchema, - numThreads, ignoreMissingFiles, ignoreCorruptFiles, readUseFieldId) + debugDumpPrefix, debugDumpAlways, maxReadBatchSizeRows, maxReadBatchSizeBytes, + targetBatchSizeBytes, maxGpuColumnSizeBytes, + useChunkedReader, maxChunkedReaderMemoryUsageSizeBytes, + metrics, partitionSchema, numThreads, ignoreMissingFiles, ignoreCorruptFiles, readUseFieldId) } /** @@ -1302,7 +1302,12 @@ case class GpuParquetPartitionReaderFactory( private val targetSizeBytes = rapidsConf.gpuTargetBatchSizeBytes private val maxGpuColumnSizeBytes = rapidsConf.maxGpuColumnSizeBytes private val useChunkedReader = rapidsConf.chunkedReaderEnabled - private val useSubPageChunked = rapidsConf.chunkedSubPageReaderEnabled + private val maxChunkedReaderMemoryUsageSizeBytes = + if(rapidsConf.limitChunkedReaderMemoryUsage) { + (rapidsConf.chunkedReaderMemoryUsageRatio * targetSizeBytes).toLong + } else { + 0L + } private val filterHandler = GpuParquetFileFilterHandler(sqlConf, metrics) private val readUseFieldId = ParquetSchemaClipShims.useFieldId(sqlConf) private val footerReadType = GpuParquetScan.footerReaderHeuristic( @@ -1333,7 +1338,8 @@ case class GpuParquetPartitionReaderFactory( new 
ParquetPartitionReader(conf, file, singleFileInfo.filePath, singleFileInfo.blocks, singleFileInfo.schema, isCaseSensitive, readDataSchema, debugDumpPrefix, debugDumpAlways, maxReadBatchSizeRows, maxReadBatchSizeBytes, targetSizeBytes, - useChunkedReader, useSubPageChunked, metrics, singleFileInfo.dateRebaseMode, + useChunkedReader, maxChunkedReaderMemoryUsageSizeBytes, + metrics, singleFileInfo.dateRebaseMode, singleFileInfo.timestampRebaseMode, singleFileInfo.hasInt96Timestamps, readUseFieldId) } } @@ -1851,6 +1857,11 @@ private case class ParquetSingleDataBlockMeta( * @param debugDumpAlways whether to debug dump always or only on errors * @param maxReadBatchSizeRows soft limit on the maximum number of rows the reader reads per batch * @param maxReadBatchSizeBytes soft limit on the maximum number of bytes the reader reads per batch + * @param targetBatchSizeBytes the target size of a batch + * @param maxGpuColumnSizeBytes the maximum size of a GPU column + * @param useChunkedReader whether to read Parquet by chunks or read all at once + * @param maxChunkedReaderMemoryUsageSizeBytes soft limit on the number of bytes of internal memory + * usage that the reader will use * @param execMetrics metrics * @param partitionSchema Schema of partitions. * @param numThreads the size of the threadpool @@ -1864,12 +1875,12 @@ class MultiFileParquetPartitionReader( override val isSchemaCaseSensitive: Boolean, debugDumpPrefix: Option[String], debugDumpAlways: Boolean, - useChunkedReader: Boolean, - useSubPageChunked: Boolean, maxReadBatchSizeRows: Integer, maxReadBatchSizeBytes: Long, targetBatchSizeBytes: Long, maxGpuColumnSizeBytes: Long, + useChunkedReader: Boolean, + maxChunkedReaderMemoryUsageSizeBytes: Long, override val execMetrics: Map[String, GpuMetric], partitionSchema: StructType, numThreads: Int, @@ -1975,7 +1986,7 @@ class MultiFileParquetPartitionReader( // About to start using the GPU GpuSemaphore.acquireIfNecessary(TaskContext.get()) - MakeParquetTableProducer(useChunkedReader, useSubPageChunked, + MakeParquetTableProducer(useChunkedReader, maxChunkedReaderMemoryUsageSizeBytes, conf, currentTargetBatchSize, parseOpts, dataBuffer, 0, dataSize, metrics, extraInfo.dateRebaseMode, extraInfo.timestampRebaseMode, @@ -2033,6 +2044,11 @@ class MultiFileParquetPartitionReader( * @param debugDumpAlways whether to debug dump always or only on errors * @param maxReadBatchSizeRows soft limit on the maximum number of rows the reader reads per batch * @param maxReadBatchSizeBytes soft limit on the maximum number of bytes the reader reads per batch + * @param targetBatchSizeBytes the target size of the batch + * @param maxGpuColumnSizeBytes the maximum size of a GPU column + * @param useChunkedReader whether to read Parquet by chunks or read all at once + * @param maxChunkedReaderMemoryUsageSizeBytes soft limit on the number of bytes of internal memory + * usage that the reader will use * @param execMetrics metrics * @param partitionSchema Schema of partitions. 
* @param numThreads the size of the threadpool @@ -2060,7 +2076,7 @@ class MultiFileCloudParquetPartitionReader( targetBatchSizeBytes: Long, maxGpuColumnSizeBytes: Long, useChunkedReader: Boolean, - subPageChunked: Boolean, + maxChunkedReaderMemoryUsageSizeBytes: Long, override val execMetrics: Map[String, GpuMetric], partitionSchema: StructType, numThreads: Int, @@ -2549,7 +2565,8 @@ class MultiFileCloudParquetPartitionReader( // The MakeParquetTableProducer will close the input buffer, and that would be bad // because we don't want to close it until we know that we are done with it hostBuffer.incRefCount() - val tableReader = MakeParquetTableProducer(useChunkedReader, subPageChunked, + val tableReader = MakeParquetTableProducer(useChunkedReader, + maxChunkedReaderMemoryUsageSizeBytes, conf, targetBatchSizeBytes, parseOpts, hostBuffer, 0, dataSize, metrics, @@ -2581,7 +2598,7 @@ class MultiFileCloudParquetPartitionReader( object MakeParquetTableProducer extends Logging { def apply( useChunkedReader: Boolean, - useSubPageChunked: Boolean, + maxChunkedReaderMemoryUsageSizeBytes: Long, conf: Configuration, chunkSizeByteLimit: Long, opts: ParquetOptions, @@ -2601,12 +2618,8 @@ object MakeParquetTableProducer extends Logging { debugDumpAlways: Boolean ): GpuDataProducer[Table] = { if (useChunkedReader) { - val passReadLimit = if (useSubPageChunked) { - 4 * chunkSizeByteLimit - } else { - 0L - } - ParquetTableReader(conf, chunkSizeByteLimit, passReadLimit, opts, buffer, offset, + ParquetTableReader(conf, chunkSizeByteLimit, maxChunkedReaderMemoryUsageSizeBytes, + opts, buffer, offset, len, metrics, dateRebaseMode, timestampRebaseMode, hasInt96Timestamps, isSchemaCaseSensitive, useFieldId, readDataSchema, clippedParquetSchema, splits, debugDumpPrefix, debugDumpAlways) @@ -2750,7 +2763,7 @@ class ParquetPartitionReader( maxReadBatchSizeBytes: Long, targetBatchSizeBytes: Long, useChunkedReader: Boolean, - useSubPageChunked: Boolean, + maxChunkedReaderMemoryUsageSizeBytes: Long, override val execMetrics: Map[String, GpuMetric], dateRebaseMode: DateTimeRebaseMode, timestampRebaseMode: DateTimeRebaseMode, @@ -2818,7 +2831,8 @@ class ParquetPartitionReader( // Inc the ref count because MakeParquetTableProducer will try to close the dataBuffer // which we don't want until we know that the retry is done with it. dataBuffer.incRefCount() - val producer = MakeParquetTableProducer(useChunkedReader, useSubPageChunked, conf, + val producer = MakeParquetTableProducer(useChunkedReader, + maxChunkedReaderMemoryUsageSizeBytes, conf, targetBatchSizeBytes, parseOpts, dataBuffer, 0, dataSize, metrics, dateRebaseMode, timestampRebaseMode, diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala index e0d5b85d819..a3ce7a96cd7 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala @@ -566,6 +566,29 @@ val GPU_COREDUMP_PIPE_PATTERN = conf("spark.rapids.gpu.coreDump.pipePattern") s"Batch size must be positive and not exceed ${Integer.MAX_VALUE} bytes.") .createWithDefault(1 * 1024 * 1024 * 1024) // 1 GiB is the default + val CHUNKED_READER = conf("spark.rapids.sql.reader.chunked") + .doc("Enable a chunked reader where possible. A chunked reader allows " + + "reading highly compressed data that could not be read otherwise, but at the expense " + + "of more GPU memory, and in some cases more GPU computation. 
"+ + "Currently this only supports ORC and Parquet formats.") + .booleanConf + .createWithDefault(true) + + val CHUNKED_READER_MEMORY_USAGE_RATIO = conf("spark.rapids.sql.reader.chunked.memoryUsageRatio") + .doc("A value to compute soft limit on the internal memory usage of the chunked reader " + + "(if being used). Such limit is calculated as the multiplication of this value and " + + s"'${GPU_BATCH_SIZE_BYTES.key}'.") + .doubleConf + .checkValue(v => v > 0, "The ratio value must be positive.") + .createWithDefault(4) + + val LIMIT_CHUNKED_READER_MEMORY_USAGE = conf("spark.rapids.sql.reader.chunked.limitMemoryUsage") + .doc("Enable a soft limit on the internal memory usage of the chunked reader " + + "(if being used). Such limit is calculated as the multiplication of " + + s"'${GPU_BATCH_SIZE_BYTES.key}' and '${CHUNKED_READER_MEMORY_USAGE_RATIO.key}'.") + .booleanConf + .createWithDefault(true) + val MAX_GPU_COLUMN_SIZE_BYTES = conf("spark.rapids.sql.columnSizeBytes") .doc("Limit the max number of bytes for a GPU column. It is same as the cudf " + "row count limit of a column. It is used by the multi-file readers. " + @@ -584,19 +607,6 @@ val GPU_COREDUMP_PIPE_PATTERN = conf("spark.rapids.gpu.coreDump.pipePattern") .integerConf .createWithDefault(Integer.MAX_VALUE) - val CHUNKED_READER = conf("spark.rapids.sql.reader.chunked") - .doc("Enable a chunked reader where possible. A chunked reader allows " + - "reading highly compressed data that could not be read otherwise, but at the expense " + - "of more GPU memory, and in some cases more GPU computation.") - .booleanConf - .createWithDefault(true) - - val CHUNKED_SUBPAGE_READER = conf("spark.rapids.sql.reader.chunked.subPage") - .doc("Enable a chunked reader where possible for reading data that is smaller " + - "than the typical row group/page limit. Currently this only works for parquet.") - .booleanConf - .createWithDefault(true) - val MAX_READER_BATCH_SIZE_BYTES = conf("spark.rapids.sql.reader.batchSizeBytes") .doc("Soft limit on the maximum number of bytes the reader reads per batch. " + "The readers will read chunks of data until this limit is met or exceeded. " + @@ -2515,7 +2525,9 @@ class RapidsConf(conf: Map[String, String]) extends Logging { lazy val chunkedReaderEnabled: Boolean = get(CHUNKED_READER) - lazy val chunkedSubPageReaderEnabled: Boolean = get(CHUNKED_SUBPAGE_READER) + lazy val limitChunkedReaderMemoryUsage: Boolean = get(LIMIT_CHUNKED_READER_MEMORY_USAGE) + + lazy val chunkedReaderMemoryUsageRatio: Double = get(CHUNKED_READER_MEMORY_USAGE_RATIO) lazy val maxReadBatchSizeRows: Int = get(MAX_READER_BATCH_SIZE_ROWS) @@ -2996,4 +3008,4 @@ case class OomInjectionConf( skipCount: Int, withSplit: Boolean, oomInjectionFilter: OomInjectionType -) \ No newline at end of file +) From bbd63bd075eefd6b0eb362ef315829b6cabc48bf Mon Sep 17 00:00:00 2001 From: Nghia Truong Date: Tue, 16 Apr 2024 12:48:39 -0700 Subject: [PATCH 02/14] Update config --- docs/additional-functionality/advanced_configs.md | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/docs/additional-functionality/advanced_configs.md b/docs/additional-functionality/advanced_configs.md index 4e8a4b6e4f0..e156b02d7e9 100644 --- a/docs/additional-functionality/advanced_configs.md +++ b/docs/additional-functionality/advanced_configs.md @@ -133,8 +133,9 @@ Name | Description | Default Value | Applicable at spark.rapids.sql.mode|Set the mode for the Rapids Accelerator. The supported modes are explainOnly and executeOnGPU. 
This config can not be changed at runtime, you must restart the application for it to take affect. The default mode is executeOnGPU, which means the RAPIDS Accelerator plugin convert the Spark operations and execute them on the GPU when possible. The explainOnly mode allows running queries on the CPU and the RAPIDS Accelerator will evaluate the queries as if it was going to run on the GPU. The explanations of what would have run on the GPU and why are output in log messages. When using explainOnly mode, the default explain output is ALL, this can be changed by setting spark.rapids.sql.explain. See that config for more details.|executeongpu|Startup spark.rapids.sql.optimizer.joinReorder.enabled|When enabled, joins may be reordered for improved query performance|true|Runtime spark.rapids.sql.python.gpu.enabled|This is an experimental feature and is likely to change in the future. Enable (true) or disable (false) support for scheduling Python Pandas UDFs with GPU resources. When enabled, pandas UDFs are assumed to share the same GPU that the RAPIDs accelerator uses and will honor the python GPU configs|false|Runtime -spark.rapids.sql.reader.chunked|Enable a chunked reader where possible. A chunked reader allows reading highly compressed data that could not be read otherwise, but at the expense of more GPU memory, and in some cases more GPU computation.|true|Runtime -spark.rapids.sql.reader.chunked.subPage|Enable a chunked reader where possible for reading data that is smaller than the typical row group/page limit. Currently this only works for parquet.|true|Runtime +spark.rapids.sql.reader.chunked|Enable a chunked reader where possible. A chunked reader allows reading highly compressed data that could not be read otherwise, but at the expense of more GPU memory, and in some cases more GPU computation. Currently this only supports ORC and Parquet formats.|true|Runtime +spark.rapids.sql.reader.chunked.limitMemoryUsage|Enable a soft limit on the internal memory usage of the chunked reader (if being used). Such limit is calculated as the multiplication of 'spark.rapids.sql.batchSizeBytes' and 'spark.rapids.sql.reader.chunked.memoryUsageRatio'.|true|Runtime +spark.rapids.sql.reader.chunked.memoryUsageRatio|A value to compute soft limit on the internal memory usage of the chunked reader (if being used). Such limit is calculated as the multiplication of this value and 'spark.rapids.sql.batchSizeBytes'.|4.0|Runtime spark.rapids.sql.reader.multithreaded.combine.sizeBytes|The target size in bytes to combine multiple small files together when using the MULTITHREADED parquet or orc reader. With combine disabled, the MULTITHREADED reader reads the files in parallel and sends individual files down to the GPU, but that can be inefficient for small files. When combine is enabled, files that are ready within spark.rapids.sql.reader.multithreaded.combine.waitTime together, up to this threshold size, are combined before sending down to GPU. This can be disabled by setting it to 0. Note that combine also will not go over the spark.rapids.sql.reader.batchSizeRows or spark.rapids.sql.reader.batchSizeBytes limits.|67108864|Runtime spark.rapids.sql.reader.multithreaded.combine.waitTime|When using the multithreaded parquet or orc reader with combine mode, how long to wait, in milliseconds, for more files to finish if haven't met the size threshold. 
Note that this will wait this amount of time from when the last file was available, so total wait time could be larger then this.|200|Runtime spark.rapids.sql.reader.multithreaded.read.keepOrder|When using the MULTITHREADED reader, if this is set to true we read the files in the same order Spark does, otherwise the order may not be the same. Now it is supported only for parquet and orc.|true|Runtime From 80533825725e776a4967d9a95d6ee5392cbeb143 Mon Sep 17 00:00:00 2001 From: Nghia Truong Date: Tue, 16 Apr 2024 13:26:17 -0700 Subject: [PATCH 03/14] Add back the deprecated config Signed-off-by: Nghia Truong --- .../com/nvidia/spark/rapids/RapidsConf.scala | 25 +++++++++++++++++-- 1 file changed, 23 insertions(+), 2 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala index f832dd9aeaf..39014839268 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala @@ -587,7 +587,13 @@ val GPU_COREDUMP_PIPE_PATTERN = conf("spark.rapids.gpu.coreDump.pipePattern") "(if being used). Such limit is calculated as the multiplication of " + s"'${GPU_BATCH_SIZE_BYTES.key}' and '${CHUNKED_READER_MEMORY_USAGE_RATIO.key}'.") .booleanConf - .createWithDefault(true) + .createOptional + + val CHUNKED_SUBPAGE_READER = conf("spark.rapids.sql.reader.chunked.subPage") + .doc("Enable a chunked reader where possible for reading data that is smaller " + + "than the typical row group/page limit. Currently this only works for parquet.") + .booleanConf + .createOptional val MAX_GPU_COLUMN_SIZE_BYTES = conf("spark.rapids.sql.columnSizeBytes") .doc("Limit the max number of bytes for a GPU column. 
It is same as the cudf " + @@ -2541,7 +2547,22 @@ class RapidsConf(conf: Map[String, String]) extends Logging { lazy val chunkedReaderEnabled: Boolean = get(CHUNKED_READER) - lazy val limitChunkedReaderMemoryUsage: Boolean = get(LIMIT_CHUNKED_READER_MEMORY_USAGE) + lazy val limitChunkedReaderMemoryUsage: Boolean = { + val hasLimit = get(LIMIT_CHUNKED_READER_MEMORY_USAGE) + val deprecatedConf = get(CHUNKED_SUBPAGE_READER) + + if(deprecatedConf.isDefined) { + logWarning(s"'${CHUNKED_SUBPAGE_READER.key}' is deprecated and is replaced by " + + s"'${LIMIT_CHUNKED_READER_MEMORY_USAGE}'.") + if(hasLimit.isDefined && hasLimit.get != deprecatedConf.get) { + throw new IllegalStateException(s"Both '${CHUNKED_SUBPAGE_READER.key}' and " + + s"'${LIMIT_CHUNKED_READER_MEMORY_USAGE.key}' are set but using different values.") + } + deprecatedConf.get + } else { + hasLimit.getOrElse(true) + } + } lazy val chunkedReaderMemoryUsageRatio: Double = get(CHUNKED_READER_MEMORY_USAGE_RATIO) From ff46beff606248b160e93737a26a810c39d5e74b Mon Sep 17 00:00:00 2001 From: Nghia Truong Date: Tue, 16 Apr 2024 13:33:23 -0700 Subject: [PATCH 04/14] Fix config Signed-off-by: Nghia Truong --- docs/additional-functionality/advanced_configs.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/docs/additional-functionality/advanced_configs.md b/docs/additional-functionality/advanced_configs.md index e156b02d7e9..b6decccaed8 100644 --- a/docs/additional-functionality/advanced_configs.md +++ b/docs/additional-functionality/advanced_configs.md @@ -134,8 +134,9 @@ Name | Description | Default Value | Applicable at spark.rapids.sql.optimizer.joinReorder.enabled|When enabled, joins may be reordered for improved query performance|true|Runtime spark.rapids.sql.python.gpu.enabled|This is an experimental feature and is likely to change in the future. Enable (true) or disable (false) support for scheduling Python Pandas UDFs with GPU resources. When enabled, pandas UDFs are assumed to share the same GPU that the RAPIDs accelerator uses and will honor the python GPU configs|false|Runtime spark.rapids.sql.reader.chunked|Enable a chunked reader where possible. A chunked reader allows reading highly compressed data that could not be read otherwise, but at the expense of more GPU memory, and in some cases more GPU computation. Currently this only supports ORC and Parquet formats.|true|Runtime -spark.rapids.sql.reader.chunked.limitMemoryUsage|Enable a soft limit on the internal memory usage of the chunked reader (if being used). Such limit is calculated as the multiplication of 'spark.rapids.sql.batchSizeBytes' and 'spark.rapids.sql.reader.chunked.memoryUsageRatio'.|true|Runtime +spark.rapids.sql.reader.chunked.limitMemoryUsage|Enable a soft limit on the internal memory usage of the chunked reader (if being used). Such limit is calculated as the multiplication of 'spark.rapids.sql.batchSizeBytes' and 'spark.rapids.sql.reader.chunked.memoryUsageRatio'.|None|Runtime spark.rapids.sql.reader.chunked.memoryUsageRatio|A value to compute soft limit on the internal memory usage of the chunked reader (if being used). Such limit is calculated as the multiplication of this value and 'spark.rapids.sql.batchSizeBytes'.|4.0|Runtime +spark.rapids.sql.reader.chunked.subPage|Enable a chunked reader where possible for reading data that is smaller than the typical row group/page limit. 
Currently this only works for parquet.|None|Runtime spark.rapids.sql.reader.multithreaded.combine.sizeBytes|The target size in bytes to combine multiple small files together when using the MULTITHREADED parquet or orc reader. With combine disabled, the MULTITHREADED reader reads the files in parallel and sends individual files down to the GPU, but that can be inefficient for small files. When combine is enabled, files that are ready within spark.rapids.sql.reader.multithreaded.combine.waitTime together, up to this threshold size, are combined before sending down to GPU. This can be disabled by setting it to 0. Note that combine also will not go over the spark.rapids.sql.reader.batchSizeRows or spark.rapids.sql.reader.batchSizeBytes limits.|67108864|Runtime spark.rapids.sql.reader.multithreaded.combine.waitTime|When using the multithreaded parquet or orc reader with combine mode, how long to wait, in milliseconds, for more files to finish if haven't met the size threshold. Note that this will wait this amount of time from when the last file was available, so total wait time could be larger then this.|200|Runtime spark.rapids.sql.reader.multithreaded.read.keepOrder|When using the MULTITHREADED reader, if this is set to true we read the files in the same order Spark does, otherwise the order may not be the same. Now it is supported only for parquet and orc.|true|Runtime From 813b4c86a2e17536b69bdda861cea89956e76697 Mon Sep 17 00:00:00 2001 From: Nghia Truong Date: Tue, 16 Apr 2024 13:42:59 -0700 Subject: [PATCH 05/14] Change message for the deprecated config Signed-off-by: Nghia Truong --- docs/additional-functionality/advanced_configs.md | 2 +- .../src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/docs/additional-functionality/advanced_configs.md b/docs/additional-functionality/advanced_configs.md index b6decccaed8..94bc9a2da78 100644 --- a/docs/additional-functionality/advanced_configs.md +++ b/docs/additional-functionality/advanced_configs.md @@ -136,7 +136,7 @@ Name | Description | Default Value | Applicable at spark.rapids.sql.reader.chunked|Enable a chunked reader where possible. A chunked reader allows reading highly compressed data that could not be read otherwise, but at the expense of more GPU memory, and in some cases more GPU computation. Currently this only supports ORC and Parquet formats.|true|Runtime spark.rapids.sql.reader.chunked.limitMemoryUsage|Enable a soft limit on the internal memory usage of the chunked reader (if being used). Such limit is calculated as the multiplication of 'spark.rapids.sql.batchSizeBytes' and 'spark.rapids.sql.reader.chunked.memoryUsageRatio'.|None|Runtime spark.rapids.sql.reader.chunked.memoryUsageRatio|A value to compute soft limit on the internal memory usage of the chunked reader (if being used). Such limit is calculated as the multiplication of this value and 'spark.rapids.sql.batchSizeBytes'.|4.0|Runtime -spark.rapids.sql.reader.chunked.subPage|Enable a chunked reader where possible for reading data that is smaller than the typical row group/page limit. Currently this only works for parquet.|None|Runtime +spark.rapids.sql.reader.chunked.subPage|Enable a chunked reader where possible for reading data that is smaller than the typical row group/page limit. 
Currently deprecated and replaced by 'spark.rapids.sql.reader.chunked.limitMemoryUsage'.|None|Runtime spark.rapids.sql.reader.multithreaded.combine.sizeBytes|The target size in bytes to combine multiple small files together when using the MULTITHREADED parquet or orc reader. With combine disabled, the MULTITHREADED reader reads the files in parallel and sends individual files down to the GPU, but that can be inefficient for small files. When combine is enabled, files that are ready within spark.rapids.sql.reader.multithreaded.combine.waitTime together, up to this threshold size, are combined before sending down to GPU. This can be disabled by setting it to 0. Note that combine also will not go over the spark.rapids.sql.reader.batchSizeRows or spark.rapids.sql.reader.batchSizeBytes limits.|67108864|Runtime spark.rapids.sql.reader.multithreaded.combine.waitTime|When using the multithreaded parquet or orc reader with combine mode, how long to wait, in milliseconds, for more files to finish if haven't met the size threshold. Note that this will wait this amount of time from when the last file was available, so total wait time could be larger then this.|200|Runtime spark.rapids.sql.reader.multithreaded.read.keepOrder|When using the MULTITHREADED reader, if this is set to true we read the files in the same order Spark does, otherwise the order may not be the same. Now it is supported only for parquet and orc.|true|Runtime diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala index 39014839268..515364d28ca 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala @@ -591,7 +591,8 @@ val GPU_COREDUMP_PIPE_PATTERN = conf("spark.rapids.gpu.coreDump.pipePattern") val CHUNKED_SUBPAGE_READER = conf("spark.rapids.sql.reader.chunked.subPage") .doc("Enable a chunked reader where possible for reading data that is smaller " + - "than the typical row group/page limit. Currently this only works for parquet.") + "than the typical row group/page limit. 
Currently deprecated and replaced by " + + s"'${LIMIT_CHUNKED_READER_MEMORY_USAGE}'.") .booleanConf .createOptional From decf024a5986ca8d6bab5c01b250576fbc096701 Mon Sep 17 00:00:00 2001 From: Nghia Truong Date: Wed, 17 Apr 2024 10:59:50 -0700 Subject: [PATCH 06/14] Rename variable Signed-off-by: Nghia Truong --- .../main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala index 1e43b0437e4..b9fc5ffc889 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala @@ -2668,7 +2668,7 @@ object MakeParquetTableProducer extends Logging { case class ParquetTableReader( conf: Configuration, chunkSizeByteLimit: Long, - passReadLimit: Long, + maxChunkedReaderMemoryUsageSizeBytes: Long, opts: ParquetOptions, buffer: HostMemoryBuffer, offset: Long, @@ -2684,8 +2684,8 @@ case class ParquetTableReader( splits: Array[PartitionedFile], debugDumpPrefix: Option[String], debugDumpAlways: Boolean) extends GpuDataProducer[Table] with Logging { - private[this] val reader = new ParquetChunkedReader(chunkSizeByteLimit, passReadLimit, opts, - buffer, offset, len) + private[this] val reader = new ParquetChunkedReader(chunkSizeByteLimit, + maxChunkedReaderMemoryUsageSizeBytes, opts, buffer, offset, len) private[this] lazy val splitsString = splits.mkString("; ") From 1bdce475c353d6980771c26f7767434a732d0d9c Mon Sep 17 00:00:00 2001 From: Nghia Truong Date: Thu, 18 Apr 2024 10:42:15 -0700 Subject: [PATCH 07/14] Change the logic of reading conf Signed-off-by: Nghia Truong --- .../src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala index 515364d28ca..3d751197a20 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala @@ -2551,7 +2551,6 @@ class RapidsConf(conf: Map[String, String]) extends Logging { lazy val limitChunkedReaderMemoryUsage: Boolean = { val hasLimit = get(LIMIT_CHUNKED_READER_MEMORY_USAGE) val deprecatedConf = get(CHUNKED_SUBPAGE_READER) - if(deprecatedConf.isDefined) { logWarning(s"'${CHUNKED_SUBPAGE_READER.key}' is deprecated and is replaced by " + s"'${LIMIT_CHUNKED_READER_MEMORY_USAGE}'.") @@ -2559,10 +2558,8 @@ class RapidsConf(conf: Map[String, String]) extends Logging { throw new IllegalStateException(s"Both '${CHUNKED_SUBPAGE_READER.key}' and " + s"'${LIMIT_CHUNKED_READER_MEMORY_USAGE.key}' are set but using different values.") } - deprecatedConf.get - } else { - hasLimit.getOrElse(true) } + hasLimit.getOrElse(deprecatedConf.getOrElse(true)) } lazy val chunkedReaderMemoryUsageRatio: Double = get(CHUNKED_READER_MEMORY_USAGE_RATIO) From 28e1afd7077394ad55a3c8721115bc2595d367de Mon Sep 17 00:00:00 2001 From: Nghia Truong Date: Thu, 18 Apr 2024 10:47:51 -0700 Subject: [PATCH 08/14] Add example and mark conf as `internal()` Signed-off-by: Nghia Truong --- .../src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala 
b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala index 3d751197a20..f31bce78756 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala @@ -585,7 +585,10 @@ val GPU_COREDUMP_PIPE_PATTERN = conf("spark.rapids.gpu.coreDump.pipePattern") val LIMIT_CHUNKED_READER_MEMORY_USAGE = conf("spark.rapids.sql.reader.chunked.limitMemoryUsage") .doc("Enable a soft limit on the internal memory usage of the chunked reader " + "(if being used). Such limit is calculated as the multiplication of " + - s"'${GPU_BATCH_SIZE_BYTES.key}' and '${CHUNKED_READER_MEMORY_USAGE_RATIO.key}'.") + s"'${GPU_BATCH_SIZE_BYTES.key}' and '${CHUNKED_READER_MEMORY_USAGE_RATIO.key}'." + + "For example, if batchSizeBytes is set to 1GB and memoryUsageRatio is 4, " + + "the chunked reader will try to keep its memory usage under 4GB.") + .internal() .booleanConf .createOptional From 702dbfce94d07f9839a115ea08044a654decb1f1 Mon Sep 17 00:00:00 2001 From: Nghia Truong Date: Thu, 18 Apr 2024 10:49:10 -0700 Subject: [PATCH 09/14] Reformat code Signed-off-by: Nghia Truong --- .../src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala index f31bce78756..2994abfce63 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala @@ -2554,10 +2554,10 @@ class RapidsConf(conf: Map[String, String]) extends Logging { lazy val limitChunkedReaderMemoryUsage: Boolean = { val hasLimit = get(LIMIT_CHUNKED_READER_MEMORY_USAGE) val deprecatedConf = get(CHUNKED_SUBPAGE_READER) - if(deprecatedConf.isDefined) { + if (deprecatedConf.isDefined) { logWarning(s"'${CHUNKED_SUBPAGE_READER.key}' is deprecated and is replaced by " + s"'${LIMIT_CHUNKED_READER_MEMORY_USAGE}'.") - if(hasLimit.isDefined && hasLimit.get != deprecatedConf.get) { + if (hasLimit.isDefined && hasLimit.get != deprecatedConf.get) { throw new IllegalStateException(s"Both '${CHUNKED_SUBPAGE_READER.key}' and " + s"'${LIMIT_CHUNKED_READER_MEMORY_USAGE.key}' are set but using different values.") } From 4fb1d74fff7267db45911db1b152bd584662d939 Mon Sep 17 00:00:00 2001 From: Nghia Truong Date: Thu, 18 Apr 2024 10:55:21 -0700 Subject: [PATCH 10/14] Update docs Signed-off-by: Nghia Truong --- docs/additional-functionality/advanced_configs.md | 1 - 1 file changed, 1 deletion(-) diff --git a/docs/additional-functionality/advanced_configs.md b/docs/additional-functionality/advanced_configs.md index 94bc9a2da78..0f8fdb71c54 100644 --- a/docs/additional-functionality/advanced_configs.md +++ b/docs/additional-functionality/advanced_configs.md @@ -134,7 +134,6 @@ Name | Description | Default Value | Applicable at spark.rapids.sql.optimizer.joinReorder.enabled|When enabled, joins may be reordered for improved query performance|true|Runtime spark.rapids.sql.python.gpu.enabled|This is an experimental feature and is likely to change in the future. Enable (true) or disable (false) support for scheduling Python Pandas UDFs with GPU resources. When enabled, pandas UDFs are assumed to share the same GPU that the RAPIDs accelerator uses and will honor the python GPU configs|false|Runtime spark.rapids.sql.reader.chunked|Enable a chunked reader where possible. 
A chunked reader allows reading highly compressed data that could not be read otherwise, but at the expense of more GPU memory, and in some cases more GPU computation. Currently this only supports ORC and Parquet formats.|true|Runtime -spark.rapids.sql.reader.chunked.limitMemoryUsage|Enable a soft limit on the internal memory usage of the chunked reader (if being used). Such limit is calculated as the multiplication of 'spark.rapids.sql.batchSizeBytes' and 'spark.rapids.sql.reader.chunked.memoryUsageRatio'.|None|Runtime spark.rapids.sql.reader.chunked.memoryUsageRatio|A value to compute soft limit on the internal memory usage of the chunked reader (if being used). Such limit is calculated as the multiplication of this value and 'spark.rapids.sql.batchSizeBytes'.|4.0|Runtime spark.rapids.sql.reader.chunked.subPage|Enable a chunked reader where possible for reading data that is smaller than the typical row group/page limit. Currently deprecated and replaced by 'spark.rapids.sql.reader.chunked.limitMemoryUsage'.|None|Runtime spark.rapids.sql.reader.multithreaded.combine.sizeBytes|The target size in bytes to combine multiple small files together when using the MULTITHREADED parquet or orc reader. With combine disabled, the MULTITHREADED reader reads the files in parallel and sends individual files down to the GPU, but that can be inefficient for small files. When combine is enabled, files that are ready within spark.rapids.sql.reader.multithreaded.combine.waitTime together, up to this threshold size, are combined before sending down to GPU. This can be disabled by setting it to 0. Note that combine also will not go over the spark.rapids.sql.reader.batchSizeRows or spark.rapids.sql.reader.batchSizeBytes limits.|67108864|Runtime From b1b6470be438ab05e518f1f3d85183c5b4a08ad2 Mon Sep 17 00:00:00 2001 From: Nghia Truong Date: Tue, 23 Apr 2024 08:13:06 -0700 Subject: [PATCH 11/14] Change configs Signed-off-by: Nghia Truong --- .../main/scala/com/nvidia/spark/rapids/RapidsConf.scala | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala index 2994abfce63..db7db04b81e 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala @@ -578,6 +578,8 @@ val GPU_COREDUMP_PIPE_PATTERN = conf("spark.rapids.gpu.coreDump.pipePattern") .doc("A value to compute soft limit on the internal memory usage of the chunked reader " + "(if being used). Such limit is calculated as the multiplication of this value and " + s"'${GPU_BATCH_SIZE_BYTES.key}'.") + .internal() + .startupOnly() .doubleConf .checkValue(v => v > 0, "The ratio value must be positive.") .createWithDefault(4) @@ -588,7 +590,6 @@ val GPU_COREDUMP_PIPE_PATTERN = conf("spark.rapids.gpu.coreDump.pipePattern") s"'${GPU_BATCH_SIZE_BYTES.key}' and '${CHUNKED_READER_MEMORY_USAGE_RATIO.key}'." 
+ "For example, if batchSizeBytes is set to 1GB and memoryUsageRatio is 4, " + "the chunked reader will try to keep its memory usage under 4GB.") - .internal() .booleanConf .createOptional @@ -2549,9 +2550,9 @@ class RapidsConf(conf: Map[String, String]) extends Logging { lazy val shouldExplainAll: Boolean = explain.equalsIgnoreCase("ALL") - lazy val chunkedReaderEnabled: Boolean = get(CHUNKED_READER) + val chunkedReaderEnabled: Boolean = get(CHUNKED_READER) - lazy val limitChunkedReaderMemoryUsage: Boolean = { + val limitChunkedReaderMemoryUsage: Boolean = { val hasLimit = get(LIMIT_CHUNKED_READER_MEMORY_USAGE) val deprecatedConf = get(CHUNKED_SUBPAGE_READER) if (deprecatedConf.isDefined) { From 6478de97745d36f57a0d38c0068826e542bc9191 Mon Sep 17 00:00:00 2001 From: Nghia Truong Date: Tue, 23 Apr 2024 08:18:24 -0700 Subject: [PATCH 12/14] Update docs Signed-off-by: Nghia Truong --- docs/additional-functionality/advanced_configs.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/additional-functionality/advanced_configs.md b/docs/additional-functionality/advanced_configs.md index 0f8fdb71c54..e0777edec51 100644 --- a/docs/additional-functionality/advanced_configs.md +++ b/docs/additional-functionality/advanced_configs.md @@ -134,7 +134,7 @@ Name | Description | Default Value | Applicable at spark.rapids.sql.optimizer.joinReorder.enabled|When enabled, joins may be reordered for improved query performance|true|Runtime spark.rapids.sql.python.gpu.enabled|This is an experimental feature and is likely to change in the future. Enable (true) or disable (false) support for scheduling Python Pandas UDFs with GPU resources. When enabled, pandas UDFs are assumed to share the same GPU that the RAPIDs accelerator uses and will honor the python GPU configs|false|Runtime spark.rapids.sql.reader.chunked|Enable a chunked reader where possible. A chunked reader allows reading highly compressed data that could not be read otherwise, but at the expense of more GPU memory, and in some cases more GPU computation. Currently this only supports ORC and Parquet formats.|true|Runtime -spark.rapids.sql.reader.chunked.memoryUsageRatio|A value to compute soft limit on the internal memory usage of the chunked reader (if being used). Such limit is calculated as the multiplication of this value and 'spark.rapids.sql.batchSizeBytes'.|4.0|Runtime +spark.rapids.sql.reader.chunked.limitMemoryUsage|Enable a soft limit on the internal memory usage of the chunked reader (if being used). Such limit is calculated as the multiplication of 'spark.rapids.sql.batchSizeBytes' and 'spark.rapids.sql.reader.chunked.memoryUsageRatio'.For example, if batchSizeBytes is set to 1GB and memoryUsageRatio is 4, the chunked reader will try to keep its memory usage under 4GB.|None|Runtime spark.rapids.sql.reader.chunked.subPage|Enable a chunked reader where possible for reading data that is smaller than the typical row group/page limit. Currently deprecated and replaced by 'spark.rapids.sql.reader.chunked.limitMemoryUsage'.|None|Runtime spark.rapids.sql.reader.multithreaded.combine.sizeBytes|The target size in bytes to combine multiple small files together when using the MULTITHREADED parquet or orc reader. With combine disabled, the MULTITHREADED reader reads the files in parallel and sends individual files down to the GPU, but that can be inefficient for small files. 
When combine is enabled, files that are ready within spark.rapids.sql.reader.multithreaded.combine.waitTime together, up to this threshold size, are combined before sending down to GPU. This can be disabled by setting it to 0. Note that combine also will not go over the spark.rapids.sql.reader.batchSizeRows or spark.rapids.sql.reader.batchSizeBytes limits.|67108864|Runtime spark.rapids.sql.reader.multithreaded.combine.waitTime|When using the multithreaded parquet or orc reader with combine mode, how long to wait, in milliseconds, for more files to finish if haven't met the size threshold. Note that this will wait this amount of time from when the last file was available, so total wait time could be larger then this.|200|Runtime From f455342b4f440a758ed3439cf5211deea6572c0f Mon Sep 17 00:00:00 2001 From: Nghia Truong Date: Tue, 23 Apr 2024 08:32:08 -0700 Subject: [PATCH 13/14] Change variables into functions Signed-off-by: Nghia Truong --- .../src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala index db7db04b81e..a4c739ceeb8 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala @@ -2550,9 +2550,9 @@ class RapidsConf(conf: Map[String, String]) extends Logging { lazy val shouldExplainAll: Boolean = explain.equalsIgnoreCase("ALL") - val chunkedReaderEnabled: Boolean = get(CHUNKED_READER) + def chunkedReaderEnabled: Boolean = get(CHUNKED_READER) - val limitChunkedReaderMemoryUsage: Boolean = { + def limitChunkedReaderMemoryUsage: Boolean = { val hasLimit = get(LIMIT_CHUNKED_READER_MEMORY_USAGE) val deprecatedConf = get(CHUNKED_SUBPAGE_READER) if (deprecatedConf.isDefined) { From e218cdfca1b3b13a51241fd4b9c0a2b3378fe743 Mon Sep 17 00:00:00 2001 From: Nghia Truong Date: Tue, 23 Apr 2024 08:40:09 -0700 Subject: [PATCH 14/14] Change functions back into `lazy val` Signed-off-by: Nghia Truong --- .../src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala index a4c739ceeb8..8279bd6cf11 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala @@ -2550,9 +2550,9 @@ class RapidsConf(conf: Map[String, String]) extends Logging { lazy val shouldExplainAll: Boolean = explain.equalsIgnoreCase("ALL") - def chunkedReaderEnabled: Boolean = get(CHUNKED_READER) + lazy val chunkedReaderEnabled: Boolean = get(CHUNKED_READER) - def limitChunkedReaderMemoryUsage: Boolean = { + lazy val limitChunkedReaderMemoryUsage: Boolean = { val hasLimit = get(LIMIT_CHUNKED_READER_MEMORY_USAGE) val deprecatedConf = get(CHUNKED_SUBPAGE_READER) if (deprecatedConf.isDefined) {