
Use multi-threaded parquet read with small files #677

Merged
tgravescs merged 8 commits into NVIDIA:branch-0.2 on Sep 9, 2020

Conversation

tgravescs
Collaborator

@tgravescs tgravescs commented Sep 8, 2020

closes #627
closes #608

This changes the parquet reader small file improvements to use a multi-threaded read instead of aggregating the files together. I changed the config names to match this implementation. There are three configs: one to turn the feature on and off, one to control the number of threads used per executor (the pool is shared across all tasks), and one to control the maximum number of files processed in parallel per task before the data is copied to the GPU. The last config gives you some control over host memory usage if you are limited.
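As a rough illustration, the three configs might be set on a Spark session as shown below. The spark.rapids.sql.format.parquet.multiThreadedRead.* names follow the pattern spark-rapids used around this release, and the values are assumptions, not tuned recommendations.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("multi-threaded-parquet-read")
  // turn the multi-threaded small-file reader on or off
  .config("spark.rapids.sql.format.parquet.multiThreadedRead.enabled", "true")
  // threads per executor, shared across all tasks; fixed once the executor starts
  .config("spark.rapids.sql.format.parquet.multiThreadedRead.numThreads", "20")
  // cap on files buffered in host memory per task before copying to the GPU
  .config("spark.rapids.sql.format.parquet.multiThreadedRead.maxNumFilesParallel", "64")
  .getOrCreate()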

Essentially what happens now is that we do everything for each file in a separate thread. We get the list of files for each task and launch multiple threads in parallel, and each thread does everything from reading the footer, to filtering the blocks down, to copying the data into host memory buffers. This makes it pretty straightforward to see what is going on, and it allows us to support the input_file_name and mergeSchema options that we didn't before.
The code launches the files to be processed in the order in which they come in, to match the CPU side, and they run in parallel up to the number of free threads.
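A minimal sketch of this per-file pipeline, not the actual spark-rapids implementation: readFooter, filterBlocks, and copyBlocksToHost are hypothetical stand-ins for the real reader internals.

import java.util.concurrent.{Callable, Executors, Future}

case class HostBuffer(path: String, data: Array[Byte])

// Hypothetical stand-ins for the real footer read, block filter, and host copy.
def readFooter(path: String): String = s"footer:$path"
def filterBlocks(footer: String): Seq[Int] = Seq(0)
def copyBlocksToHost(path: String, blocks: Seq[Int]): HostBuffer =
  HostBuffer(path, Array.emptyByteArray)

// One pool per executor, sized by the numThreads config and shared by all tasks.
val pool = Executors.newFixedThreadPool(20)

// Each thread runs the whole pipeline for one file: footer, filter, host copy.
def processFile(path: String): HostBuffer = {
  val footer = readFooter(path)
  val blocks = filterBlocks(footer)
  copyBlocksToHost(path, blocks)
}

// Submit files in their incoming order to match the CPU side, then collect.
def readAll(paths: Seq[String]): Seq[HostBuffer] = {
  val futures: Seq[Future[HostBuffer]] =
    paths.map(p => pool.submit(new Callable[HostBuffer] {
      override def call(): HostBuffer = processFile(p)
    }))
  futures.map(_.get()) // get() rethrows any exception from the worker thread
}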

I tested on two queries, both with partitioning, and this showed great improvement: one went from about 11 minutes down to 4.5 minutes, the other from 7-8 minutes down to 2.4 minutes. Those runs used executors with 4 or 6 cores each (on Databricks, so standalone mode), and 20 threads worked very well there. The downside is that if you have executors with lots of cores, 20 might not be enough.

I also tested with consolidated files on the first query without partitioning, where each task only got a part of a file, and the results were the same as or slightly better than the previous implementation.

Signed-off-by: Thomas Graves <tgraves@nvidia.com>

Signed-off-by: Thomas Graves <tgraves@nvidia.com>
Signed-off-by: Thomas Graves <tgraves@apache.org>
@tgravescs
Collaborator Author

build

"parquet files in parallel. This can not be changed at runtime after the executor has" +
"started.")
.integerConf
.createWithDefault(20)
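For context, a hedged reconstruction of the full config definition this diff fragment belongs to, following the RapidsConf builder pattern; the val name and exact doc text are assumptions based on the snippet and the PR description.

val PARQUET_MULTITHREAD_READ_NUM_THREADS =
  conf("spark.rapids.sql.format.parquet.multiThreadedRead.numThreads")
    .doc("The maximum number of threads, on the executor, used to read small " +
      "parquet files in parallel. This cannot be changed at runtime after the executor has " +
      "started.")
    .integerConf
    .createWithDefault(20)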
Collaborator Author

This worked well on Databricks with hosts with 4 to 6 cores. If we have executors with lots of cores this might not be ideal. I could try to adjust this based on executor cores, but with standalone mode I don't know what that is. Any thoughts on this? I figure this is a good start and we can adjust later.
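A sketch of the kind of sizing heuristic being discussed, assuming a hypothetical defaultReaderThreads helper and a 2x oversubscription factor; spark.executor.cores may be unset in standalone mode, which is exactly the problem raised above.

import org.apache.spark.SparkConf

// Hypothetical heuristic: derive the thread count from executor cores when
// known, otherwise fall back to the fixed default of 20.
def defaultReaderThreads(conf: SparkConf, fallback: Int = 20): Int = {
  conf.getOption("spark.executor.cores")           // unset in standalone mode
    .flatMap(c => scala.util.Try(c.toInt).toOption)
    .map(cores => math.max(cores * 2, fallback))   // assumed 2x oversubscription
    .getOrElse(fallback)
}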

Collaborator

+1 for putting something in that we can adjust later on if needed.

@sameerz sameerz added the performance A performance related task/issue label Sep 8, 2020
@tgravescs tgravescs added this to the Aug 31 - Sep 11 milestone Sep 8, 2020
revans2
revans2 previously approved these changes Sep 8, 2020
Collaborator

@revans2 revans2 left a comment


Just a question.

Signed-off-by: Thomas Graves <tgraves@nvidia.com>
@tgravescs
Collaborator Author

build

revans2
revans2 previously approved these changes Sep 8, 2020
Signed-off-by: Thomas Graves <tgraves@apache.org>
Signed-off-by: Thomas Graves <tgraves@apache.org>
@jlowe
Member

jlowe commented Sep 8, 2020

build

@tgravescs tgravescs merged commit be48350 into NVIDIA:branch-0.2 Sep 9, 2020
@tgravescs tgravescs deleted the multithreadparquetReadrebase branch September 9, 2020 00:46
nartal1 pushed a commit to nartal1/spark-rapids that referenced this pull request Jun 9, 2021
* Try multi-threaded read with parquet with small files

Signed-off-by: Thomas Graves <tgraves@nvidia.com>

* cleanup and comments

* comment config

Signed-off-by: Thomas Graves <tgraves@apache.org>

* Add note about TaskContext not being set in the threadpool

Signed-off-by: Thomas Graves <tgraves@nvidia.com>

* remove extra import and use closeOnExcept

* try just using future throw

* let future throw

Signed-off-by: Thomas Graves <tgraves@apache.org>

* Use safeclose()

Signed-off-by: Thomas Graves <tgraves@apache.org>

Co-authored-by: Thomas Graves <tgraves@nvidia.com>
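The closeOnExcept mentioned in these commits is a spark-rapids resource-management helper; a minimal sketch of the idea, assuming this simplified signature rather than the project's actual Arm trait:

// Close the resource only if the body throws; on success the caller keeps
// ownership and closes it later. Sketch only, not the spark-rapids source.
def closeOnExcept[T <: AutoCloseable, V](resource: T)(body: T => V): V = {
  try {
    body(resource)
  } catch {
    case t: Throwable =>
      resource.close()
      throw t
  }
}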
tgravescs pushed a commit to tgravescs/spark-rapids that referenced this pull request Nov 30, 2023
…IDIA#677)

Signed-off-by: spark-rapids automation <70000568+nvauto@users.noreply.github.com>
Labels
performance A performance related task/issue