Use multi-threaded parquet read with small files #677
Conversation
Signed-off-by: Thomas Graves <tgraves@nvidia.com>
Signed-off-by: Thomas Graves <tgraves@apache.org>
build
"parquet files in parallel. This can not be changed at runtime after the executor has" + | ||
"started.") | ||
.integerConf | ||
.createWithDefault(20) |
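For context, the fragment above is the tail of the new thread-count config. A rough reconstruction of the full definition in the RapidsConf builder style is sketched below; the key name and the first part of the doc string are assumptions, not the exact text from the PR.

```scala
// Sketch only: the config key and leading doc text are assumed, the rest is from the
// fragment above. Defined with the RapidsConf builder inside RapidsConf.scala.
val PARQUET_MULTITHREAD_READ_NUM_THREADS =
  conf("spark.rapids.sql.format.parquet.multiThreadedRead.numThreads")
    .doc("The maximum number of threads, on the executor, to use for reading small " +
      "parquet files in parallel. This can not be changed at runtime after the executor has " +
      "started.")
    .integerConf
    .createWithDefault(20)
```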
this worked well on Databricks with hosts with 4 to 6 cores. If we have executors with lots of cores this might not be ideal. I could try to adjust this based on executor cores, but with standalone mode I don't know what that is. Any thoughts on this? I figure this is a good start and we can adjust later.
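This is not what the PR does, but one possible way to scale the pool size to the executor instead of hard-coding 20 would be to fall back to a multiple of spark.executor.cores when it is set; a minimal sketch (the helper name and the scaling factor are made up for illustration):

```scala
import org.apache.spark.SparkConf

// Illustrative only, not part of this PR: derive a thread-pool size from executor
// cores when available, otherwise keep the hard-coded default of 20.
def defaultParquetReadThreads(sparkConf: SparkConf, fallback: Int = 20): Int = {
  // spark.executor.cores may not be set in standalone mode, hence the fallback.
  val cores = sparkConf.getInt("spark.executor.cores", 0)
  if (cores > 0) math.max(fallback, cores * 2) else fallback
}
```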
+1 for putting something in that we can adjust later on if needed.
Just a question.
Signed-off-by: Thomas Graves <tgraves@nvidia.com>
build
sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuInputFileBlock.scala
sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala
sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala
Signed-off-by: Thomas Graves <tgraves@apache.org>
build
* Try multi-threaded read with parquet with small files Signed-off-by: Thomas Graves <tgraves@nvidia.com>
* cleanup and comments
* comment config Signed-off-by: Thomas Graves <tgraves@apache.org>
* Add note about TaskContext not being set in the threadpool Signed-off-by: Thomas Graves <tgraves@nvidia.com>
* remove extra import and use closeOnExcept
* try just using future throw
* let future throw Signed-off-by: Thomas Graves <tgraves@apache.org>
* Use safeclose() Signed-off-by: Thomas Graves <tgraves@apache.org>

Co-authored-by: Thomas Graves <tgraves@nvidia.com>
…IDIA#677) Signed-off-by: spark-rapids automation <70000568+nvauto@users.noreply.github.com>
closes #627
closes #608
This changes the parquet reader small file improvements to use a multi-threaded read instead of aggregating the files together. I changed the config names to match this implementation. There are three configs: one to turn it on and off, one to control the number of threads used per executor (the pool is shared across all tasks), and one to control the maximum number of files processed in parallel per task before the data is copied to the GPU. The last config lets you somewhat bound host memory usage if you are limited; see the sketch below.
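For example, the three knobs would be set along these lines at session start (the config key names here are illustrative; the exact keys are the ones defined in RapidsConf by this PR):

```scala
import org.apache.spark.sql.SparkSession

// Config key names are assumptions for illustration; check RapidsConf for the real ones.
val spark = SparkSession.builder()
  // turn the multi-threaded parquet read for small files on or off
  .config("spark.rapids.sql.format.parquet.multiThreadedRead.enabled", "true")
  // executor-wide thread pool shared by all tasks; fixed once the executor starts
  .config("spark.rapids.sql.format.parquet.multiThreadedRead.numThreads", "20")
  // cap on files buffered on the host per task before the copy to the GPU,
  // useful to bound host memory when it is limited
  .config("spark.rapids.sql.format.parquet.multiThreadedRead.maxNumFilesParallel", "4")
  .getOrCreate()
```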
Essentially what happens now is that everything for each file is done in a separate thread. Each task gets its list of files and launches multiple threads in parallel, and each thread does everything from reading the footer, to filtering the blocks down, to copying the data into host memory buffers. This keeps what is going on straightforward and lets us support the input_file_name and mergeSchema options that we couldn't before.
The code submits the files for processing in the order in which they come in, to match the CPU side, and they run in parallel up to the number of free threads; a sketch of the pattern follows.
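A stripped-down sketch of that pattern (the names are illustrative, not the actual classes in GpuParquetScan.scala): each file becomes a Future submitted to a shared pool, and results are consumed in submission order.

```scala
import java.util.concurrent.Executors
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future}

// Placeholder for the per-file result (footer metadata plus filtered blocks copied
// into a host memory buffer); the real type lives in GpuParquetScan.scala.
case class HostBufferAndMeta(path: String, bytes: Array[Byte])

// One shared pool per executor, sized by the numThreads config (20 by default).
val pool = Executors.newFixedThreadPool(20)
implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)

def readOneFile(path: String): HostBufferAndMeta = {
  // In the real code this reads the footer, filters the row-group blocks for this
  // split, and copies the needed data into host memory. Note that the Spark
  // TaskContext is not set inside these pool threads.
  HostBufferAndMeta(path, Array.emptyByteArray)
}

def readFiles(paths: Seq[String]): Seq[HostBufferAndMeta] = {
  // Submit files in the order they arrive (matches the CPU side); they run in
  // parallel up to the number of free threads in the pool.
  val futures = paths.map(p => Future(readOneFile(p)))
  // Consume results in submission order; a failed Future rethrows its exception here.
  futures.map(f => Await.result(f, Duration.Inf))
}
```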
I tested 2 queries, both with partitioning, and this showed great improvement: one went from about 11 minutes down to 4.5 minutes, the other from 7-8 minutes down to 2.4 minutes. Those runs used executors with 4 or 6 cores each (on Databricks, so standalone mode) and 20 threads worked very well there. The downside is that for executors with lots of cores, 20 might not be enough.
I also tested with consolidated files on the first query without partitioning, where each task only got part of a file, and the results were the same as or slightly better than the previous implementation.
Signed-off-by: Thomas Graves tgraves@nvidia.com