Change parameters for memory limit in Parquet chunked reader #10718

Merged (15 commits) on Apr 23, 2024
@@ -69,7 +69,7 @@ public static class ReadBuilder {
private long maxBatchSizeBytes = Integer.MAX_VALUE;
private long targetBatchSizeBytes = Integer.MAX_VALUE;
private boolean useChunkedReader = false;
- private boolean useSubPageChunked = false;
+ private long maxChunkedReaderMemoryUsageSizeBytes = 0;
Review comment from @ttnghia (Collaborator, Author), Apr 19, 2024, on the added line above:

I'm not happy with this long name being used throughout this PR, but I don't have a better candidate for it. If you have a good name, please suggest it.

private scala.Option<String> debugDumpPrefix = null;
private boolean debugDumpAlways = false;
private scala.collection.immutable.Map<String, GpuMetric> metrics = null;
@@ -141,9 +141,10 @@ public ReadBuilder withTargetBatchSizeBytes(long targetBatchSizeBytes) {
return this;
}

- public ReadBuilder withUseChunkedReader(boolean useChunkedReader, boolean useSubPageChunked) {
+ public ReadBuilder withUseChunkedReader(boolean useChunkedReader,
+     long maxChunkedReaderMemoryUsageSizeBytes) {
this.useChunkedReader = useChunkedReader;
- this.useSubPageChunked = useSubPageChunked;
+ this.maxChunkedReaderMemoryUsageSizeBytes = maxChunkedReaderMemoryUsageSizeBytes;
return this;
}

@@ -164,8 +165,8 @@ public CloseableIterable<ColumnarBatch> build() {
InternalRow.empty(), file.location(), start, length);
return new GpuParquetReader(file, projectSchema, options, nameMapping, filter, caseSensitive,
idToConstant, deleteFilter, partFile, conf, maxBatchSizeRows, maxBatchSizeBytes,
- targetBatchSizeBytes, useChunkedReader, useSubPageChunked, debugDumpPrefix,
- debugDumpAlways, metrics);
+ targetBatchSizeBytes, useChunkedReader, maxChunkedReaderMemoryUsageSizeBytes,
+ debugDumpPrefix, debugDumpAlways, metrics);
}
}

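For readability, here is the new builder setter reassembled from the added lines in the withUseChunkedReader hunk above; only the indentation is mine. The long parameter replaces the old useSubPageChunked flag and carries a memory budget in bytes, and the ReadTask change further down passes 0 when the limit is disabled.

// New ReadBuilder setter after this change, reassembled from the diff above.
// Callers now supply a byte budget instead of a sub-page-chunking flag.
public ReadBuilder withUseChunkedReader(boolean useChunkedReader,
    long maxChunkedReaderMemoryUsageSizeBytes) {
  this.useChunkedReader = useChunkedReader;
  this.maxChunkedReaderMemoryUsageSizeBytes = maxChunkedReaderMemoryUsageSizeBytes;
  return this;
}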
@@ -87,7 +87,7 @@ public class GpuParquetReader extends CloseableGroup implements CloseableIterabl
private final long maxBatchSizeBytes;
private final long targetBatchSizeBytes;
private final boolean useChunkedReader;
- private final boolean useSubPageChunked;
+ private final long maxChunkedReaderMemoryUsageSizeBytes;
private final scala.Option<String> debugDumpPrefix;
private final boolean debugDumpAlways;
private final scala.collection.immutable.Map<String, GpuMetric> metrics;
@@ -98,7 +98,7 @@ public GpuParquetReader(
Map<Integer, ?> idToConstant, GpuDeleteFilter deleteFilter,
PartitionedFile partFile, Configuration conf, int maxBatchSizeRows,
long maxBatchSizeBytes, long targetBatchSizeBytes, boolean useChunkedReader,
- boolean useSubPageChunked,
+ long maxChunkedReaderMemoryUsageSizeBytes,
scala.Option<String> debugDumpPrefix, boolean debugDumpAlways,
scala.collection.immutable.Map<String, GpuMetric> metrics) {
this.input = input;
@@ -115,7 +115,7 @@ public GpuParquetReader(
this.maxBatchSizeBytes = maxBatchSizeBytes;
this.targetBatchSizeBytes = targetBatchSizeBytes;
this.useChunkedReader = useChunkedReader;
- this.useSubPageChunked = useSubPageChunked;
+ this.maxChunkedReaderMemoryUsageSizeBytes = maxChunkedReaderMemoryUsageSizeBytes;
this.debugDumpPrefix = debugDumpPrefix;
this.debugDumpAlways = debugDumpAlways;
this.metrics = metrics;
@@ -143,7 +143,7 @@ public org.apache.iceberg.io.CloseableIterator<ColumnarBatch> iterator() {
new Path(input.location()), clippedBlocks, fileReadSchema, caseSensitive,
partReaderSparkSchema, debugDumpPrefix, debugDumpAlways,
maxBatchSizeRows, maxBatchSizeBytes, targetBatchSizeBytes, useChunkedReader,
- useSubPageChunked,
+ maxChunkedReaderMemoryUsageSizeBytes,
metrics,
DateTimeRebaseCorrected$.MODULE$, // dateRebaseMode
DateTimeRebaseCorrected$.MODULE$, // timestampRebaseMode
@@ -48,14 +48,15 @@ class GpuBatchDataReader extends BaseDataReader<ColumnarBatch> {
private final long maxBatchSizeBytes;
private final long targetBatchSizeBytes;
private final boolean useChunkedReader;
- private final boolean useSubPageChunked;
+ private final long maxChunkedReaderMemoryUsageSizeBytes;
private final scala.Option<String> parquetDebugDumpPrefix;
private final boolean parquetDebugDumpAlways;
private final scala.collection.immutable.Map<String, GpuMetric> metrics;

GpuBatchDataReader(CombinedScanTask task, Table table, Schema expectedSchema, boolean caseSensitive,
Configuration conf, int maxBatchSizeRows, long maxBatchSizeBytes,
- long targetBatchSizeBytes, boolean useChunkedReader, boolean useSubPageChunked,
+ long targetBatchSizeBytes,
+ boolean useChunkedReader, long maxChunkedReaderMemoryUsageSizeBytes,
scala.Option<String> parquetDebugDumpPrefix, boolean parquetDebugDumpAlways,
scala.collection.immutable.Map<String, GpuMetric> metrics) {
super(table, task);
@@ -67,7 +68,7 @@ class GpuBatchDataReader extends BaseDataReader<ColumnarBatch> {
this.maxBatchSizeBytes = maxBatchSizeBytes;
this.targetBatchSizeBytes = targetBatchSizeBytes;
this.useChunkedReader = useChunkedReader;
- this.useSubPageChunked = useSubPageChunked;
+ this.maxChunkedReaderMemoryUsageSizeBytes = maxChunkedReaderMemoryUsageSizeBytes;
this.parquetDebugDumpPrefix = parquetDebugDumpPrefix;
this.parquetDebugDumpAlways = parquetDebugDumpAlways;
this.metrics = metrics;
@@ -102,7 +103,7 @@ CloseableIterator<ColumnarBatch> open(FileScanTask task) {
.withMaxBatchSizeRows(maxBatchSizeRows)
.withMaxBatchSizeBytes(maxBatchSizeBytes)
.withTargetBatchSizeBytes(targetBatchSizeBytes)
- .withUseChunkedReader(useChunkedReader, useSubPageChunked)
+ .withUseChunkedReader(useChunkedReader, maxChunkedReaderMemoryUsageSizeBytes)
.withDebugDump(parquetDebugDumpPrefix, parquetDebugDumpAlways)
.withMetrics(metrics);

@@ -68,7 +68,7 @@ class GpuMultiFileBatchReader extends BaseDataReader<ColumnarBatch> {
private final long maxGpuColumnSizeBytes;

private final boolean useChunkedReader;
- private final boolean useSubPageChunked;
+ private final long maxChunkedReaderMemoryUsageSizeBytes;
private final scala.Option<String> parquetDebugDumpPrefix;
private final boolean parquetDebugDumpAlways;
private final scala.collection.immutable.Map<String, GpuMetric> metrics;
@@ -87,7 +87,7 @@ class GpuMultiFileBatchReader extends BaseDataReader<ColumnarBatch> {
GpuMultiFileBatchReader(CombinedScanTask task, Table table, Schema expectedSchema,
boolean caseSensitive, Configuration conf, int maxBatchSizeRows, long maxBatchSizeBytes,
long targetBatchSizeBytes, long maxGpuColumnSizeBytes,
- boolean useChunkedReader, boolean useSubPageChunked,
+ boolean useChunkedReader, long maxChunkedReaderMemoryUsageSizeBytes,
scala.Option<String> parquetDebugDumpPrefix, boolean parquetDebugDumpAlways,
int numThreads, int maxNumFileProcessed,
boolean useMultiThread, FileFormat fileFormat,
@@ -102,7 +102,7 @@ class GpuMultiFileBatchReader extends BaseDataReader<ColumnarBatch> {
this.targetBatchSizeBytes = targetBatchSizeBytes;
this.maxGpuColumnSizeBytes = maxGpuColumnSizeBytes;
this.useChunkedReader = useChunkedReader;
- this.useSubPageChunked = useSubPageChunked;
+ this.maxChunkedReaderMemoryUsageSizeBytes = maxChunkedReaderMemoryUsageSizeBytes;
this.parquetDebugDumpPrefix = parquetDebugDumpPrefix;
this.parquetDebugDumpAlways = parquetDebugDumpAlways;
this.useMultiThread = useMultiThread;
@@ -352,7 +352,7 @@ protected FilePartitionReaderBase createRapidsReader(PartitionedFile[] pFiles,
return new MultiFileCloudParquetPartitionReader(conf, pFiles,
this::filterParquetBlocks, caseSensitive, parquetDebugDumpPrefix, parquetDebugDumpAlways,
maxBatchSizeRows, maxBatchSizeBytes, targetBatchSizeBytes, maxGpuColumnSizeBytes,
- useChunkedReader, useSubPageChunked, metrics, partitionSchema,
+ useChunkedReader, maxChunkedReaderMemoryUsageSizeBytes, metrics, partitionSchema,
numThreads, maxNumFileProcessed,
false, // ignoreMissingFiles
false, // ignoreCorruptFiles
@@ -428,9 +428,9 @@ protected FilePartitionReaderBase createRapidsReader(PartitionedFile[] pFiles,

return new MultiFileParquetPartitionReader(conf, pFiles,
JavaConverters.asScalaBuffer(clippedBlocks).toSeq(),
- caseSensitive, parquetDebugDumpPrefix, parquetDebugDumpAlways, useChunkedReader,
- useSubPageChunked,
+ caseSensitive, parquetDebugDumpPrefix, parquetDebugDumpAlways,
maxBatchSizeRows, maxBatchSizeBytes, targetBatchSizeBytes, maxGpuColumnSizeBytes,
+ useChunkedReader, maxChunkedReaderMemoryUsageSizeBytes,
metrics, partitionSchema, numThreads,
false, // ignoreMissingFiles
false, // ignoreCorruptFiles
@@ -283,7 +283,7 @@ private static class MultiFileBatchReader
super(task.task, task.table(), task.expectedSchema(), task.isCaseSensitive(),
task.getConfiguration(), task.getMaxBatchSizeRows(), task.getMaxBatchSizeBytes(),
task.getTargetBatchSizeBytes(), task.getMaxGpuColumnSizeBytes(), task.useChunkedReader(),
- task.useSubPageChunked(),
+ task.maxChunkedReaderMemoryUsageSizeBytes(),
task.getParquetDebugDumpPrefix(), task.getParquetDebugDumpAlways(),
task.getNumThreads(), task.getMaxNumFileProcessed(),
useMultiThread, ff, metrics, queryUsesInputFile);
@@ -294,7 +294,7 @@ private static class BatchReader extends GpuBatchDataReader implements Partition
BatchReader(ReadTask task, scala.collection.immutable.Map<String, GpuMetric> metrics) {
super(task.task, task.table(), task.expectedSchema(), task.isCaseSensitive(),
task.getConfiguration(), task.getMaxBatchSizeRows(), task.getMaxBatchSizeBytes(),
- task.getTargetBatchSizeBytes(), task.useChunkedReader(), task.useSubPageChunked(),
+ task.getTargetBatchSizeBytes(), task.useChunkedReader(), task.maxChunkedReaderMemoryUsageSizeBytes(),
task.getParquetDebugDumpPrefix(), task.getParquetDebugDumpAlways(), metrics);
}
}
@@ -305,7 +305,7 @@ static class ReadTask implements InputPartition, Serializable {
private final String expectedSchemaString;
private final boolean caseSensitive;
private final boolean useChunkedReader;
- private final boolean useSubPageChunked;
+ private final long maxChunkedReaderMemoryUsageSizeBytes;
private final Broadcast<SerializableConfiguration> confBroadcast;
private final int maxBatchSizeRows;
private final long maxBatchSizeBytes;
@@ -343,7 +343,12 @@ static class ReadTask implements InputPartition, Serializable {
this.numThreads = rapidsConf.multiThreadReadNumThreads();
this.maxNumFileProcessed = rapidsConf.maxNumParquetFilesParallel();
this.useChunkedReader = rapidsConf.chunkedReaderEnabled();
- this.useSubPageChunked = rapidsConf.chunkedSubPageReaderEnabled();
+ if(rapidsConf.limitChunkedReaderMemoryUsage()) {
+   double limitRatio = rapidsConf.chunkedReaderMemoryUsageRatio();
+   this.maxChunkedReaderMemoryUsageSizeBytes = (long)(limitRatio * this.targetBatchSizeBytes);
+ } else {
+   this.maxChunkedReaderMemoryUsageSizeBytes = 0L;
+ }
}

@Override
@@ -410,8 +415,8 @@ public boolean useChunkedReader() {
return useChunkedReader;
}

- public boolean useSubPageChunked() {
-   return useSubPageChunked;
+ public long maxChunkedReaderMemoryUsageSizeBytes() {
+   return maxChunkedReaderMemoryUsageSizeBytes;
}
}
}
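The substantive change here is how the new byte budget is derived from existing configs. The following stand-alone sketch is my own illustration (not repository code); the method and config names mirror the ReadTask constructor above, and the example values are hypothetical:

// Stand-alone illustration of the derivation in the ReadTask constructor above:
// when the limit is enabled the budget is ratio * target batch size, otherwise 0 (no limit).
public final class ChunkedReaderLimitExample {
  static long maxChunkedReaderMemoryUsageSizeBytes(
      boolean limitChunkedReaderMemoryUsage,
      double chunkedReaderMemoryUsageRatio,
      long targetBatchSizeBytes) {
    if (limitChunkedReaderMemoryUsage) {
      return (long) (chunkedReaderMemoryUsageRatio * targetBatchSizeBytes);
    }
    return 0L; // 0 disables the limit
  }

  public static void main(String[] args) {
    // Hypothetical example: a 1 GiB target batch and a ratio of 4.0 allow the
    // chunked reader to use up to 4 GiB.
    long budget = maxChunkedReaderMemoryUsageSizeBytes(true, 4.0, 1L << 30);
    System.out.println(budget); // 4294967296
  }
}

The same derivation appears in Scala form in MultiFilePartitionReaderFactoryBase below.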
@@ -214,8 +214,8 @@ abstract class MultiFilePartitionReaderFactoryBase(
protected val maxReadBatchSizeRows: Int = rapidsConf.maxReadBatchSizeRows
protected val maxReadBatchSizeBytes: Long = rapidsConf.maxReadBatchSizeBytes
protected val targetBatchSizeBytes: Long = rapidsConf.gpuTargetBatchSizeBytes
- protected val subPageChunked: Boolean = rapidsConf.chunkedSubPageReaderEnabled
protected val maxGpuColumnSizeBytes: Long = rapidsConf.maxGpuColumnSizeBytes
protected val useChunkedReader: Boolean = rapidsConf.chunkedReaderEnabled
+ protected val maxChunkedReaderMemoryUsageSizeBytes: Long =
+   if(rapidsConf.limitChunkedReaderMemoryUsage) {
+     (rapidsConf.chunkedReaderMemoryUsageRatio * targetBatchSizeBytes).toLong
+   } else {
+     0L
+   }
private val allCloudSchemes = rapidsConf.getCloudSchemes.toSet

override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {