Accelerate the coalescing parquet reader when reading files from multiple partitioned folders #1401
Conversation
build
...ugin/src/main/scala/com/nvidia/spark/rapids/ColumnarPartitionReaderWithPartitionValues.scala
@@ -77,19 +77,29 @@ object ColumnarPartitionReaderWithPartitionValues extends Arm {
    var partitionColumns: Array[GpuColumnVector] = null
    try {
The original code would always close fileBatch, but this now leaks it if something throws before we get to addGpuColumnVectorsToBatch (e.g. buildPartitionColumns).
I think I'm handling this now, let me know if you see anything I missed.
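For context, the resource-safety pattern being discussed can be sketched as below. This is a simplified, illustrative version; the real helpers live in the plugin's Arm trait, and the signatures here are assumptions that only mirror how they are used.

```scala
// Simplified sketch of the withResource/closeOnExcept pattern (assumed
// signatures; the actual helpers are provided by the plugin's Arm trait).
object ResourceSketch {
  // Run `block` with `r`, closing `r` only if `block` throws.
  // Used to guard fileBatch: on success, ownership transfers onward;
  // on an exception (e.g. in buildPartitionColumns), fileBatch is closed.
  def closeOnExcept[T <: AutoCloseable, V](r: T)(block: T => V): V =
    try block(r)
    catch { case e: Throwable => r.close(); throw e }

  // Run `block` with `r`, always closing `r` afterwards.
  def withResource[T <: AutoCloseable, V](r: T)(block: T => V): V =
    try block(r) finally r.close()
}
```

Wrapping fileBatch in closeOnExcept rather than a plain try/finally is what distinguishes "close only on failure" from "always close", which matches the leak described above.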
sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala
build
Thanks Jason, updated.
build
Just a small nit for what appears to be an unnecessary null check but otherwise looks good to me.
...ugin/src/main/scala/com/nvidia/spark/rapids/ColumnarPartitionReaderWithPartitionValues.scala
build
…iple partitioned folders (NVIDIA#1401)
* Accelerate the coalescing parquet reader when reading files from multiple partitioned folders
Signed-off-by: Thomas Graves <tgraves@nvidia.com>
* Properly close and change to use withResource and closeOnExcept
* Remove null check
…IDIA#1401) Signed-off-by: spark-rapids automation <70000568+nvauto@users.noreply.github.com>
fixes #1200
Accelerate the scan speed for coalescing parquet reader when reading files from multiple partitioned folders.
Previously, whenever we hit a file that was in a different partition, we split the batch so we could easily add the partition values. This resulted in many small batches when there are a lot of partitioned files, which is bad for performance.
To fix that, we can combine files with different partitioning by keeping track of the partition values and which rows those values apply to. Then, after we read the files, we add the columns that are built from those partition values and row counts. This works because we read the files in the same order as when we construct what goes into each batch.
In this PR I added the tracking of partition values and the corresponding number of rows; then, after we read the files into the columnar batch, we add all the partition values. The partition value columns are constructed one partition column at a time: for each column, it generates the individual partition value columns for the required number of rows, then concatenates all of those together, then moves on to the next partition column. That is, if you have paths with multiple partitions such as ../key1=2/key2=foo/, it does key1=X for all values of X first, then key2=Y afterwards.
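The construction described above can be sketched with plain Scala collections standing in for GPU columns. This is an illustrative model only: the real implementation works with GpuColumnVector and device-side concatenation, and the names and signatures below are assumptions, not the plugin's API.

```scala
// Illustrative sketch: plain Seqs model the per-partition-column build.
// `partValues` holds each file's partition values (in file read order) and
// `rowCounts` holds the rows read from each file; for each partition column
// we expand every file's value to its row count, then concatenate the pieces.
object PartitionColumnSketch {
  def buildPartitionColumns(
      partValues: Seq[Seq[String]], // per-file partition values, e.g. Seq("2", "foo")
      rowCounts: Seq[Int],          // rows read from each file, same order
      numPartCols: Int): Seq[Seq[String]] = {
    (0 until numPartCols).map { colIdx =>
      // One partition column at a time: generate each file's slice, concatenate.
      partValues.zip(rowCounts).flatMap { case (values, rows) =>
        Seq.fill(rows)(values(colIdx))
      }
    }
  }

  def main(args: Array[String]): Unit = {
    // Two files, from ../key1=2/key2=foo/ (3 rows) and ../key1=3/key2=bar/ (2 rows).
    val cols = buildPartitionColumns(
      Seq(Seq("2", "foo"), Seq("3", "bar")),
      Seq(3, 2),
      numPartCols = 2)
    // key1 column: 2,2,2,3,3 ; key2 column: foo,foo,foo,bar,bar
    println(cols)
  }
}
```

Note how all of key1 is materialized before key2 is touched, matching the "key1=X for all values of X first, then key2=Y" ordering described in the PR text.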