Merge branch 'branch-23.12' into rebase_nested_timestamp
ttnghia committed Nov 16, 2023
2 parents 0fff5e6 + 960d69b commit 8bfca59
Showing 5 changed files with 59 additions and 13 deletions.
22 changes: 22 additions & 0 deletions integration_tests/src/main/python/iceberg_test.py
@@ -571,3 +571,25 @@ def setup_iceberg_table(spark):
assert_gpu_and_cpu_are_equal_collect(
lambda spark : spark.sql("SELECT *, input_file_name() FROM {}".format(table)),
conf={'spark.rapids.sql.format.parquet.reader.type': reader_type})


@iceberg
@ignore_order(local=True) # Iceberg plans with a thread pool and is not deterministic in file ordering
@pytest.mark.parametrize('reader_type', rapids_reader_types)
def test_iceberg_parquet_read_from_url_encoded_path(spark_tmp_table_factory, reader_type):
table = spark_tmp_table_factory.get()
tmp_view = spark_tmp_table_factory.get()
partition_gen = StringGen(pattern="(.|\n){1,10}", nullable=False)\
.with_special_case('%29%3EtkiudF4%3C', 1000)\
.with_special_case('%2F%23_v9kRtI%27', 1000)\
.with_special_case('aK%2BAgI%21l8%3E', 1000)\
.with_special_case('p%2Cmtx%3FCXMd', 1000)
def setup_iceberg_table(spark):
df = two_col_df(spark, long_gen, partition_gen).sortWithinPartitions('b')
df.createOrReplaceTempView(tmp_view)
spark.sql("CREATE TABLE {} USING ICEBERG PARTITIONED BY (b) AS SELECT * FROM {}"
.format(table, tmp_view))
with_cpu_session(setup_iceberg_table)
assert_gpu_and_cpu_are_equal_collect(
lambda spark: spark.sql("SELECT * FROM {}".format(table)),
conf={'spark.rapids.sql.format.parquet.reader.type': reader_type})
1 change: 1 addition & 0 deletions integration_tests/src/main/python/regexp_test.py
@@ -1054,6 +1054,7 @@ def test_regexp_memory_ok():
}
)

@datagen_overrides(seed=0, reason='https://github.com/NVIDIA/spark-rapids/issues/9731')
def test_re_replace_all():
"""
regression test for https://github.com/NVIDIA/spark-rapids/issues/8323
@@ -114,7 +114,7 @@ class GpuMultiFileBatchReader extends BaseDataReader<ColumnarBatch> {
this.nameMapping = NameMappingParser.fromJson(nameMapping);
}
files = Maps.newLinkedHashMapWithExpectedSize(task.files().size());
task.files().forEach(fst -> this.files.putIfAbsent(fst.file().path().toString(), fst));
task.files().forEach(fst -> this.files.putIfAbsent(toEncodedPathString(fst), fst));
}

@Override
@@ -190,6 +190,15 @@ private Schema requiredSchema(GpuDeleteFilter deleteFilter) {
}
}

/**
* The multi-file readers expect the path string to be encoded and URL safe.
* This conversion performs that encoding, because Iceberg returns the raw
* data path from the `file().path()` call.
*/
private String toEncodedPathString(FileScanTask fst) {
return new Path(fst.file().path().toString()).toUri().toString();
}

static class FilteredParquetFileInfo {
private final ParquetFileInfoWithBlockMeta parquetBlockMeta;
private final Map<Integer, ?> idToConstant;
@@ -254,7 +263,7 @@ protected MultiFileBatchReaderBase() {
final InternalRow emptyPartValue = InternalRow.empty();
PartitionedFile[] pFiles = files.values().stream()
.map(fst -> PartitionedFileUtilsShim.newPartitionedFile(emptyPartValue,
fst.file().path().toString(), fst.start(), fst.length()))
toEncodedPathString(fst), fst.start(), fst.length()))
.toArray(PartitionedFile[]::new);
rapidsReader = createRapidsReader(pFiles, emptyPartSchema);
}
@@ -277,7 +286,8 @@ protected abstract FilePartitionReaderBase createRapidsReader(PartitionedFile[]
StructType partitionSchema);

/** The filter function for the Parquet multi-file reader */
protected FilteredParquetFileInfo filterParquetBlocks(FileScanTask fst) {
protected FilteredParquetFileInfo filterParquetBlocks(FileScanTask fst,
String partFilePathString) {
GpuDeleteFilter deleteFilter = deleteFilter(fst);
if (deleteFilter != null) {
throw new UnsupportedOperationException("Delete filter is not supported");
@@ -309,7 +319,9 @@ protected FilteredParquetFileInfo filterParquetBlocks(FileScanTask fst) {
GpuParquetReader.addNullsForMissingFields(idToConstant, reorder.getMissingFields());

ParquetFileInfoWithBlockMeta parquetBlockMeta = ParquetFileInfoWithBlockMeta.apply(
new Path(new URI(fst.file().path().toString())), clippedBlocks,
// The path conversion aligns with the one done in the RAPIDS multi-file readers,
// so the file path of the PartitionedFile should be used here.
new Path(new URI(partFilePathString)), clippedBlocks,
InternalRow.empty(), fileReadSchema, partReaderSparkSchema,
DateTimeRebaseCorrected$.MODULE$, // dateRebaseMode
DateTimeRebaseCorrected$.MODULE$, // timestampRebaseMode
@@ -354,9 +366,10 @@ protected FilePartitionReaderBase createRapidsReader(PartitionedFile[] pFiles,
}

private ParquetFileInfoWithBlockMeta filterParquetBlocks(PartitionedFile file) {
FileScanTask fst = files.get(file.filePath());
FilteredParquetFileInfo filteredInfo = filterParquetBlocks(fst);
constsSchemaMap.put(file.filePath().toString(),
String partFilePathString = file.filePath().toString();
FileScanTask fst = files.get(partFilePathString);
FilteredParquetFileInfo filteredInfo = filterParquetBlocks(fst, partFilePathString);
constsSchemaMap.put(partFilePathString,
Tuple2.apply(filteredInfo.idToConstant(), filteredInfo.expectedSchema()));
return filteredInfo.parquetBlockMeta();
}
@@ -388,8 +401,10 @@ class ParquetCoalescingBatchReader extends MultiFileBatchReaderBase {
protected FilePartitionReaderBase createRapidsReader(PartitionedFile[] pFiles,
StructType partitionSchema) {
ArrayList<ParquetSingleDataBlockMeta> clippedBlocks = new ArrayList<>();
files.values().forEach(fst -> {
FilteredParquetFileInfo filteredInfo = filterParquetBlocks(fst);
Arrays.stream(pFiles).forEach(pFile -> {
String partFilePathString = pFile.filePath().toString();
FileScanTask fst = files.get(partFilePathString);
FilteredParquetFileInfo filteredInfo = filterParquetBlocks(fst, partFilePathString);
List<ParquetSingleDataBlockMeta> fileSingleMetas =
JavaConverters.asJavaCollection(filteredInfo.parquetBlockMeta.blocks()).stream()
.map(b -> ParquetSingleDataBlockMeta.apply(
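The encoding used by `toEncodedPathString` above can be illustrated in isolation. The sketch below is not part of this commit and the example path is made up; it shows how round-tripping a Hadoop `Path` through `toUri` percent-encodes characters that are not legal in a URI, which is the form used both as the key of the `files` map and as the `PartitionedFile` path, so the two lookups stay consistent.

import org.apache.hadoop.fs.Path

object PathEncodingSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical raw data path, as Iceberg's file().path() would return it.
    val raw = "/warehouse/db/tbl/data/b=a b)c/part-00000.parquet"
    // Path -> URI percent-encodes URI-illegal characters (the space becomes %20),
    // matching the string stored in the files map and in each PartitionedFile.
    val encoded = new Path(raw).toUri.toString
    println(encoded) // e.g. /warehouse/db/tbl/data/b=a%20b)c/part-00000.parquet
  }
}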
@@ -340,6 +340,8 @@ object BatchWithPartitionDataUtils {
* Splits the input ColumnarBatch into smaller batches, wraps these batches with partition
* data, and returns them as a sequence of [[BatchWithPartitionData]].
*
This function does not take ownership of `batch`; callers are responsible for closing it.
*
* @note Partition values are merged with the columnar batches lazily by the resulting Iterator
* to save GPU memory.
* @param batch Input ColumnarBatch.
@@ -502,9 +504,10 @@ object BatchWithPartitionDataUtils {
throw new SplitAndRetryOOM("GPU OutOfMemory: cannot split input with one row")
}
// Split the batch into two halves
val cb = batchWithPartData.inputBatch.getColumnarBatch()
splitAndCombineBatchWithPartitionData(cb, splitPartitionData,
batchWithPartData.partitionSchema)
withResource(batchWithPartData.inputBatch.getColumnarBatch()) { cb =>
splitAndCombineBatchWithPartitionData(cb, splitPartitionData,
batchWithPartData.partitionSchema)
}
}
}
}
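The ownership note added to the scaladoc, together with the `withResource` wrapper in the retry handler, follows the usual rule that whoever materializes a batch closes it. A minimal, self-contained sketch of that pattern is below; `withResource`, `FakeBatch`, and `splitAndCombine` are stand-ins invented for illustration, not the project's actual APIs.

object OwnershipSketch {
  // Simplified stand-in for the project's withResource helper.
  def withResource[T <: AutoCloseable, V](r: T)(body: T => V): V =
    try body(r) finally r.close()

  final class FakeBatch(val numRows: Int) extends AutoCloseable {
    override def close(): Unit = println(s"closed batch with $numRows rows")
  }

  // Does NOT take ownership of `batch`: it only reads from it and never closes it.
  def splitAndCombine(batch: FakeBatch): Seq[Int] =
    Seq(batch.numRows / 2, batch.numRows - batch.numRows / 2)

  def main(args: Array[String]): Unit = {
    // The caller materialized the batch, so the caller wraps it in withResource
    // and is the one that closes it, mirroring the change in the retry handler.
    val halves = withResource(new FakeBatch(10)) { cb => splitAndCombine(cb) }
    println(halves) // List(5, 5)
  }
}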
@@ -78,12 +78,17 @@ class BatchWithPartitionDataSuite extends RmmSparkRetrySuiteBase with SparkQuery
withResource(buildBatch(getSampleValueData)) { valueBatch =>
withResource(buildBatch(partCols)) { partBatch =>
withResource(GpuColumnVector.combineColumns(valueBatch, partBatch)) { expectedBatch =>
// we incRefCounts here because `addPartitionValuesToBatch` takes ownership of
// `valueBatch`, but we are keeping it alive since its columns are part of
// `expectedBatch`
GpuColumnVector.incRefCounts(valueBatch)
val resultBatchIter = BatchWithPartitionDataUtils.addPartitionValuesToBatch(valueBatch,
partRows, partValues, partSchema, maxGpuColumnSizeBytes)
withResource(resultBatchIter) { _ =>
RmmSpark.forceSplitAndRetryOOM(RmmSpark.getCurrentThreadId)
// Assert that the final count of rows matches expected batch
val rowCounts = resultBatchIter.map(_.numRows()).sum
// We also need to close each batch coming from `resultBatchIter`.
val rowCounts = resultBatchIter.map(withResource(_){_.numRows()}).sum
assert(rowCounts == expectedBatch.numRows())
}
}
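The added `incRefCounts` call and the per-batch close inside the iterator both rely on reference counting: a column may back more than one batch as long as every owner holds its own reference and closes it exactly once. The toy sketch below models that idea with a hand-rolled `RefCountedColumn`; it is illustrative only and not the real `GpuColumnVector` API.

object RefCountSketch {
  final class RefCountedColumn extends AutoCloseable {
    private var refs = 1
    def incRefCount(): this.type = { refs += 1; this }
    override def close(): Unit = {
      refs -= 1
      if (refs == 0) println("column memory released")
    }
  }

  def main(args: Array[String]): Unit = {
    val col = new RefCountedColumn
    // A second owner (e.g. an API that takes ownership of a batch built on this
    // column) bumps the ref count, so each owner closes once and the memory is
    // released exactly once, on the last close.
    col.incRefCount()
    col.close() // close from the owner that took ownership
    col.close() // close from the owner that kept the column alive
  }
}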
