From 90789baacb4459576a33373a5dbd6fa8dc91dbcd Mon Sep 17 00:00:00 2001
From: Liangcai Li
Date: Thu, 16 Nov 2023 03:43:55 +0800
Subject: [PATCH 1/3] Encode the file path from Iceberg when converting to a
 PartitionedFile [databricks] (#9717)

* Encode the file path from Iceberg when converting to a PartitionedFile

This is needed because Iceberg always gives the raw data path, while the
RAPIDS multi-file readers expect the file path to be encoded and URL-safe.

Signed-off-by: Firestarman

* Address comments

Signed-off-by: Firestarman

---------

Signed-off-by: Firestarman
---
 .../src/main/python/iceberg_test.py           | 22 +++++++++++++
 .../spark/source/GpuMultiFileBatchReader.java | 33 ++++++++++++++-----
 2 files changed, 46 insertions(+), 9 deletions(-)

diff --git a/integration_tests/src/main/python/iceberg_test.py b/integration_tests/src/main/python/iceberg_test.py
index f073ad51489..3b3f83c6deb 100644
--- a/integration_tests/src/main/python/iceberg_test.py
+++ b/integration_tests/src/main/python/iceberg_test.py
@@ -571,3 +571,25 @@ def setup_iceberg_table(spark):
     assert_gpu_and_cpu_are_equal_collect(
         lambda spark : spark.sql("SELECT *, input_file_name() FROM {}".format(table)),
         conf={'spark.rapids.sql.format.parquet.reader.type': reader_type})
+
+
+@iceberg
+@ignore_order(local=True)  # Iceberg plans with a thread pool and is not deterministic in file ordering
+@pytest.mark.parametrize('reader_type', rapids_reader_types)
+def test_iceberg_parquet_read_from_url_encoded_path(spark_tmp_table_factory, reader_type):
+    table = spark_tmp_table_factory.get()
+    tmp_view = spark_tmp_table_factory.get()
+    partition_gen = StringGen(pattern="(.|\n){1,10}", nullable=False)\
+        .with_special_case('%29%3EtkiudF4%3C', 1000)\
+        .with_special_case('%2F%23_v9kRtI%27', 1000)\
+        .with_special_case('aK%2BAgI%21l8%3E', 1000)\
+        .with_special_case('p%2Cmtx%3FCXMd', 1000)
+    def setup_iceberg_table(spark):
+        df = two_col_df(spark, long_gen, partition_gen).sortWithinPartitions('b')
+        df.createOrReplaceTempView(tmp_view)
+        spark.sql("CREATE TABLE {} USING ICEBERG PARTITIONED BY (b) AS SELECT * FROM {}"
+                  .format(table, tmp_view))
+    with_cpu_session(setup_iceberg_table)
+    assert_gpu_and_cpu_are_equal_collect(
+        lambda spark: spark.sql("SELECT * FROM {}".format(table)),
+        conf={'spark.rapids.sql.format.parquet.reader.type': reader_type})
diff --git a/sql-plugin/src/main/java/com/nvidia/spark/rapids/iceberg/spark/source/GpuMultiFileBatchReader.java b/sql-plugin/src/main/java/com/nvidia/spark/rapids/iceberg/spark/source/GpuMultiFileBatchReader.java
index 0fb8867d27a..a339043c5de 100644
--- a/sql-plugin/src/main/java/com/nvidia/spark/rapids/iceberg/spark/source/GpuMultiFileBatchReader.java
+++ b/sql-plugin/src/main/java/com/nvidia/spark/rapids/iceberg/spark/source/GpuMultiFileBatchReader.java
@@ -114,7 +114,7 @@ class GpuMultiFileBatchReader extends BaseDataReader {
       this.nameMapping = NameMappingParser.fromJson(nameMapping);
     }
     files = Maps.newLinkedHashMapWithExpectedSize(task.files().size());
-    task.files().forEach(fst -> this.files.putIfAbsent(fst.file().path().toString(), fst));
+    task.files().forEach(fst -> this.files.putIfAbsent(toEncodedPathString(fst), fst));
   }
 
   @Override
@@ -190,6 +190,15 @@ private Schema requiredSchema(GpuDeleteFilter deleteFilter) {
     }
   }
 
+  /**
+   * Multi-file readers expect the path string to be encoded and URL-safe,
+   * while Iceberg gives the raw data path via the `file().path()` call.
+   * This conversion is therefore used to do the encoding.
+   */
+  private String toEncodedPathString(FileScanTask fst) {
+    return new Path(fst.file().path().toString()).toUri().toString();
+  }
+
   static class FilteredParquetFileInfo {
     private final ParquetFileInfoWithBlockMeta parquetBlockMeta;
     private final Map idToConstant;
@@ -254,7 +263,7 @@ protected MultiFileBatchReaderBase() {
       final InternalRow emptyPartValue = InternalRow.empty();
       PartitionedFile[] pFiles = files.values().stream()
           .map(fst -> PartitionedFileUtilsShim.newPartitionedFile(emptyPartValue,
-              fst.file().path().toString(), fst.start(), fst.length()))
+              toEncodedPathString(fst), fst.start(), fst.length()))
          .toArray(PartitionedFile[]::new);
       rapidsReader = createRapidsReader(pFiles, emptyPartSchema);
     }
@@ -277,7 +286,8 @@ protected abstract FilePartitionReaderBase createRapidsReader(PartitionedFile[]
        StructType partitionSchema);
     /** The filter function for the Parquet multi-file reader */
-    protected FilteredParquetFileInfo filterParquetBlocks(FileScanTask fst) {
+    protected FilteredParquetFileInfo filterParquetBlocks(FileScanTask fst,
+        String partFilePathString) {
       GpuDeleteFilter deleteFilter = deleteFilter(fst);
       if (deleteFilter != null) {
         throw new UnsupportedOperationException("Delete filter is not supported");
       }
@@ -309,7 +319,9 @@ protected FilteredParquetFileInfo filterParquetBlocks(FileScanTask fst) {
       GpuParquetReader.addNullsForMissingFields(idToConstant, reorder.getMissingFields());
 
       ParquetFileInfoWithBlockMeta parquetBlockMeta = ParquetFileInfoWithBlockMeta.apply(
-          new Path(new URI(fst.file().path().toString())), clippedBlocks,
+          // The path conversion aligns with that in the RAPIDS multi-file readers,
+          // so the file path of the PartitionedFile should be used here.
+          new Path(new URI(partFilePathString)), clippedBlocks,
           InternalRow.empty(), fileReadSchema, partReaderSparkSchema,
           DateTimeRebaseCorrected$.MODULE$, // dateRebaseMode
           DateTimeRebaseCorrected$.MODULE$, // timestampRebaseMode
@@ -354,9 +366,10 @@ protected FilePartitionReaderBase createRapidsReader(PartitionedFile[] pFiles,
     }
 
     private ParquetFileInfoWithBlockMeta filterParquetBlocks(PartitionedFile file) {
-      FileScanTask fst = files.get(file.filePath());
-      FilteredParquetFileInfo filteredInfo = filterParquetBlocks(fst);
-      constsSchemaMap.put(file.filePath().toString(),
+      String partFilePathString = file.filePath().toString();
+      FileScanTask fst = files.get(partFilePathString);
+      FilteredParquetFileInfo filteredInfo = filterParquetBlocks(fst, partFilePathString);
+      constsSchemaMap.put(partFilePathString,
           Tuple2.apply(filteredInfo.idToConstant(), filteredInfo.expectedSchema()));
       return filteredInfo.parquetBlockMeta();
     }
@@ -388,8 +401,10 @@ class ParquetCoalescingBatchReader extends MultiFileBatchReaderBase {
     protected FilePartitionReaderBase createRapidsReader(PartitionedFile[] pFiles,
         StructType partitionSchema) {
       ArrayList clippedBlocks = new ArrayList<>();
-      files.values().forEach(fst -> {
-        FilteredParquetFileInfo filteredInfo = filterParquetBlocks(fst);
+      Arrays.stream(pFiles).forEach(pFile -> {
+        String partFilePathString = pFile.filePath().toString();
+        FileScanTask fst = files.get(partFilePathString);
+        FilteredParquetFileInfo filteredInfo = filterParquetBlocks(fst, partFilePathString);
         List fileSingleMetas =
             JavaConverters.asJavaCollection(filteredInfo.parquetBlockMeta.blocks()).stream()
                 .map(b -> ParquetSingleDataBlockMeta.apply(
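The standalone Scala sketch below (not part of either patch) illustrates the mismatch the first patch fixes: Iceberg hands back the raw data path, while Spark's PartitionedFile carries a URL-encoded path string, so a lookup map keyed by the raw string can miss lookups done with the encoded one. The table location and object name are invented for illustration, and the sketch assumes hadoop-common is on the classpath; it only mirrors the `new Path(...).toUri().toString()` conversion used by toEncodedPathString().

import org.apache.hadoop.fs.Path

object PathEncodingDemo {
  def main(args: Array[String]): Unit = {
    // A data file under a partition directory whose name contains '%' and other
    // characters that are not legal in a URI (hypothetical path, for illustration only).
    val rawPath = "/warehouse/db/tbl/data/b=aK%2BAgI%21l8%3E/00000-0-data.parquet"

    // Mirrors `new Path(fst.file().path().toString()).toUri().toString()` from the patch:
    // the java.net.URI constructor used by Hadoop's Path percent-encodes '%' and other
    // illegal characters, producing the URL-safe form that PartitionedFile carries.
    val encoded = new Path(rawPath).toUri.toString

    println(encoded)             // .../b=aK%252BAgI%2521l8%253E/00000-0-data.parquet
    println(encoded == rawPath)  // false -- keying the map by the raw string would miss lookups
  }
}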
From c824a45558f7d69ea66a23cd8b6c61eec76ad118 Mon Sep 17 00:00:00 2001
From: Alessandro Bellina
Date: Wed, 15 Nov 2023 16:00:32 -0600
Subject: [PATCH 2/3] Fix leak in BatchWithPartitionData (#9726)

* Fixes leaks in BatchWithPartitionData and its suite

Signed-off-by: Alessandro Bellina

* Add more comments

---------

Signed-off-by: Alessandro Bellina
---
 .../com/nvidia/spark/rapids/BatchWithPartitionData.scala | 9 ++++++---
 .../spark/rapids/BatchWithPartitionDataSuite.scala       | 7 ++++++-
 2 files changed, 12 insertions(+), 4 deletions(-)

diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/BatchWithPartitionData.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/BatchWithPartitionData.scala
index 02e5ee118db..6d640683c07 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/BatchWithPartitionData.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/BatchWithPartitionData.scala
@@ -340,6 +340,8 @@ object BatchWithPartitionDataUtils {
    * Splits the input ColumnarBatch into smaller batches, wraps these batches with partition
    * data, and returns them as a sequence of [[BatchWithPartitionData]].
    *
+   * This function does not take ownership of `batch`, so callers should make sure to close it.
+   *
    * @note Partition values are merged with the columnar batches lazily by the resulting Iterator
    *       to save GPU memory.
    * @param batch Input ColumnarBatch.
@@ -502,9 +504,10 @@
         throw new SplitAndRetryOOM("GPU OutOfMemory: cannot split input with one row")
       }
       // Split the batch into two halves
-      val cb = batchWithPartData.inputBatch.getColumnarBatch()
-      splitAndCombineBatchWithPartitionData(cb, splitPartitionData,
-        batchWithPartData.partitionSchema)
+      withResource(batchWithPartData.inputBatch.getColumnarBatch()) { cb =>
+        splitAndCombineBatchWithPartitionData(cb, splitPartitionData,
+          batchWithPartData.partitionSchema)
+      }
     }
   }
 }
diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/BatchWithPartitionDataSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/BatchWithPartitionDataSuite.scala
index ac4b5d89b47..6c9f59e8ece 100644
--- a/tests/src/test/scala/com/nvidia/spark/rapids/BatchWithPartitionDataSuite.scala
+++ b/tests/src/test/scala/com/nvidia/spark/rapids/BatchWithPartitionDataSuite.scala
@@ -78,12 +78,17 @@ class BatchWithPartitionDataSuite extends RmmSparkRetrySuiteBase with SparkQuery
     withResource(buildBatch(getSampleValueData)) { valueBatch =>
       withResource(buildBatch(partCols)) { partBatch =>
         withResource(GpuColumnVector.combineColumns(valueBatch, partBatch)) { expectedBatch =>
+          // we incRefCounts here because `addPartitionValuesToBatch` takes ownership of
+          // `valueBatch`, but we are keeping it alive since its columns are part of
+          // `expectedBatch`
+          GpuColumnVector.incRefCounts(valueBatch)
           val resultBatchIter = BatchWithPartitionDataUtils.addPartitionValuesToBatch(valueBatch,
             partRows, partValues, partSchema, maxGpuColumnSizeBytes)
           withResource(resultBatchIter) { _ =>
             RmmSpark.forceSplitAndRetryOOM(RmmSpark.getCurrentThreadId)
             // Assert that the final count of rows matches expected batch
-            val rowCounts = resultBatchIter.map(_.numRows()).sum
+            // We also need to close each batch coming from `resultBatchIter`.
+            val rowCounts = resultBatchIter.map(withResource(_){_.numRows()}).sum
             assert(rowCounts == expectedBatch.numRows())
           }
         }
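The leak fix above hinges on the plugin's ownership convention: a call that takes ownership of a ref-counted batch is responsible for closing it, and a caller that still needs the batch afterwards must add a reference first. The standalone sketch below (not part of the patch) models that convention with a toy RefCountedBuffer and a local withResource helper; both are simplified stand-ins invented for illustration, not the plugin's real Arm.withResource or GPU batch classes.

object OwnershipDemo {
  // Stand-in for a ref-counted GPU resource; released only when the last reference closes.
  class RefCountedBuffer(val name: String) extends AutoCloseable {
    private var refs = 1
    def incRefCount(): this.type = { refs += 1; this }
    def isLive: Boolean = refs > 0
    override def close(): Unit = {
      refs -= 1
      require(refs >= 0, s"$name closed too many times")
      if (refs == 0) println(s"$name released")
    }
  }

  // Close the resource when the body finishes, even if it throws.
  def withResource[T <: AutoCloseable, V](r: T)(body: T => V): V =
    try body(r) finally r.close()

  // A callee that "takes ownership" of its argument: it must close it before returning.
  def consume(buf: RefCountedBuffer): Int = withResource(buf)(b => b.name.length)

  def main(args: Array[String]): Unit = {
    val batch = new RefCountedBuffer("batch")

    // The caller still needs `batch` afterwards, so it adds a reference before handing it
    // to a consuming call -- the same reason the suite now calls
    // GpuColumnVector.incRefCounts(valueBatch) before addPartitionValuesToBatch().
    consume(batch.incRefCount())
    assert(batch.isLive)

    // A getter that hands back a reference the caller must close gets wrapped in
    // withResource -- the same shape as the getColumnarBatch() fix in the retry path.
    withResource(batch)(b => println(s"using ${b.name} one last time"))
    assert(!batch.isLive)
  }
}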
From 960d69b5b3648a7c8a08703fbd4655b5ee443223 Mon Sep 17 00:00:00 2001
From: Nghia Truong <7416935+ttnghia@users.noreply.github.com>
Date: Wed, 15 Nov 2023 16:07:21 -0800
Subject: [PATCH 3/3] Xfail the test (#9732)

Signed-off-by: Nghia Truong
---
 integration_tests/src/main/python/regexp_test.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/integration_tests/src/main/python/regexp_test.py b/integration_tests/src/main/python/regexp_test.py
index 3c1e2b0df78..018736f9c90 100644
--- a/integration_tests/src/main/python/regexp_test.py
+++ b/integration_tests/src/main/python/regexp_test.py
@@ -1054,6 +1054,7 @@ def test_regexp_memory_ok():
         }
     )
 
+@datagen_overrides(seed=0, reason='https://github.com/NVIDIA/spark-rapids/issues/9731')
 def test_re_replace_all():
     """
     regression test for https://github.com/NVIDIA/spark-rapids/issues/8323