-
Notifications
You must be signed in to change notification settings - Fork 5.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[RFC] [data] Prototype of auto repartitioning #25708
Closed
Closed
Conversation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
ericl
requested review from
scv119,
clarkzinzow,
jjyao and
jianoaix
as code owners
June 12, 2022 07:28
ericl
changed the title
[WIP] [data] Prototype of auto repartitioning
[RFC] [data] Prototype of auto repartitioning
Jun 12, 2022
2 tasks
ericl
added a commit
that referenced
this pull request
Jul 13, 2022
…s and data size (#25883) This PR defaults the parallelism of Dataset reads to `-1`. The parallelism is determined according to the following rule in this case: - The number of available CPUs is estimated. If in a placement group, the number of CPUs in the cluster is scaled by the size of the placement group compared to the cluster size. If not in a placement group, this is the number of CPUs in the cluster. If the estimated CPUs is less than 8, it is set to 8. - The parallelism is set to the estimated number of CPUs multiplied by 2. - The in-memory data size is estimated. If the parallelism would create in-memory blocks larger than the target block size (512MiB), the parallelism is increased until the blocks are < 512MiB in size. These rules fix two common user problems: 1. Insufficient parallelism in a large cluster, or too much parallelism on a small cluster. 2. Overly large block sizes leading to OOMs when processing a single block. TODO: - [x] Unit tests - [x] Docs update Supercedes part of: #25708 Co-authored-by: Ubuntu <ubuntu@ip-172-31-32-136.us-west-2.compute.internal>
Co-authored-by: Clark Zinzow <clarkzinzow@gmail.com>
truelegion47
pushed a commit
to truelegion47/ray
that referenced
this pull request
Jul 14, 2022
…s and data size (ray-project#25883) This PR defaults the parallelism of Dataset reads to `-1`. The parallelism is determined according to the following rule in this case: - The number of available CPUs is estimated. If in a placement group, the number of CPUs in the cluster is scaled by the size of the placement group compared to the cluster size. If not in a placement group, this is the number of CPUs in the cluster. If the estimated CPUs is less than 8, it is set to 8. - The parallelism is set to the estimated number of CPUs multiplied by 2. - The in-memory data size is estimated. If the parallelism would create in-memory blocks larger than the target block size (512MiB), the parallelism is increased until the blocks are < 512MiB in size. These rules fix two common user problems: 1. Insufficient parallelism in a large cluster, or too much parallelism on a small cluster. 2. Overly large block sizes leading to OOMs when processing a single block. TODO: - [x] Unit tests - [x] Docs update Supercedes part of: ray-project#25708 Co-authored-by: Ubuntu <ubuntu@ip-172-31-32-136.us-west-2.compute.internal> Signed-off-by: Your Name <your@email.com>
edoakes
pushed a commit
to edoakes/ray
that referenced
this pull request
Jul 14, 2022
…s and data size (ray-project#25883) This PR defaults the parallelism of Dataset reads to `-1`. The parallelism is determined according to the following rule in this case: - The number of available CPUs is estimated. If in a placement group, the number of CPUs in the cluster is scaled by the size of the placement group compared to the cluster size. If not in a placement group, this is the number of CPUs in the cluster. If the estimated CPUs is less than 8, it is set to 8. - The parallelism is set to the estimated number of CPUs multiplied by 2. - The in-memory data size is estimated. If the parallelism would create in-memory blocks larger than the target block size (512MiB), the parallelism is increased until the blocks are < 512MiB in size. These rules fix two common user problems: 1. Insufficient parallelism in a large cluster, or too much parallelism on a small cluster. 2. Overly large block sizes leading to OOMs when processing a single block. TODO: - [x] Unit tests - [x] Docs update Supercedes part of: ray-project#25708 Co-authored-by: Ubuntu <ubuntu@ip-172-31-32-136.us-west-2.compute.internal> Signed-off-by: Edward Oakes <ed.nmi.oakes@gmail.com>
nikitavemuri
pushed a commit
to nikitavemuri/ray
that referenced
this pull request
Jul 15, 2022
…s and data size (ray-project#25883) This PR defaults the parallelism of Dataset reads to `-1`. The parallelism is determined according to the following rule in this case: - The number of available CPUs is estimated. If in a placement group, the number of CPUs in the cluster is scaled by the size of the placement group compared to the cluster size. If not in a placement group, this is the number of CPUs in the cluster. If the estimated CPUs is less than 8, it is set to 8. - The parallelism is set to the estimated number of CPUs multiplied by 2. - The in-memory data size is estimated. If the parallelism would create in-memory blocks larger than the target block size (512MiB), the parallelism is increased until the blocks are < 512MiB in size. These rules fix two common user problems: 1. Insufficient parallelism in a large cluster, or too much parallelism on a small cluster. 2. Overly large block sizes leading to OOMs when processing a single block. TODO: - [x] Unit tests - [x] Docs update Supercedes part of: ray-project#25708 Co-authored-by: Ubuntu <ubuntu@ip-172-31-32-136.us-west-2.compute.internal> Signed-off-by: Nikita Vemuri <nikitavemuri@gmail.com>
truelegion47
pushed a commit
to truelegion47/ray
that referenced
this pull request
Jul 16, 2022
…s and data size (ray-project#25883) This PR defaults the parallelism of Dataset reads to `-1`. The parallelism is determined according to the following rule in this case: - The number of available CPUs is estimated. If in a placement group, the number of CPUs in the cluster is scaled by the size of the placement group compared to the cluster size. If not in a placement group, this is the number of CPUs in the cluster. If the estimated CPUs is less than 8, it is set to 8. - The parallelism is set to the estimated number of CPUs multiplied by 2. - The in-memory data size is estimated. If the parallelism would create in-memory blocks larger than the target block size (512MiB), the parallelism is increased until the blocks are < 512MiB in size. These rules fix two common user problems: 1. Insufficient parallelism in a large cluster, or too much parallelism on a small cluster. 2. Overly large block sizes leading to OOMs when processing a single block. TODO: - [x] Unit tests - [x] Docs update Supercedes part of: ray-project#25708 Co-authored-by: Ubuntu <ubuntu@ip-172-31-32-136.us-west-2.compute.internal> Signed-off-by: Your Name <your@email.com>
Closing this for now, since we have sufficient warnings around parallelism that auto-repartitioning is more likely to cause more headaches then it helps. |
avnishn
pushed a commit
to smorad/ray
that referenced
this pull request
Jul 20, 2022
…s and data size (ray-project#25883) This PR defaults the parallelism of Dataset reads to `-1`. The parallelism is determined according to the following rule in this case: - The number of available CPUs is estimated. If in a placement group, the number of CPUs in the cluster is scaled by the size of the placement group compared to the cluster size. If not in a placement group, this is the number of CPUs in the cluster. If the estimated CPUs is less than 8, it is set to 8. - The parallelism is set to the estimated number of CPUs multiplied by 2. - The in-memory data size is estimated. If the parallelism would create in-memory blocks larger than the target block size (512MiB), the parallelism is increased until the blocks are < 512MiB in size. These rules fix two common user problems: 1. Insufficient parallelism in a large cluster, or too much parallelism on a small cluster. 2. Overly large block sizes leading to OOMs when processing a single block. TODO: - [x] Unit tests - [x] Docs update Supercedes part of: ray-project#25708 Co-authored-by: Ubuntu <ubuntu@ip-172-31-32-136.us-west-2.compute.internal> Signed-off-by: Avnish <avnishnarayan@gmail.com>
klwuibm
pushed a commit
to yuanchi2807/ray
that referenced
this pull request
Jul 27, 2022
…s and data size (ray-project#25883) This PR defaults the parallelism of Dataset reads to `-1`. The parallelism is determined according to the following rule in this case: - The number of available CPUs is estimated. If in a placement group, the number of CPUs in the cluster is scaled by the size of the placement group compared to the cluster size. If not in a placement group, this is the number of CPUs in the cluster. If the estimated CPUs is less than 8, it is set to 8. - The parallelism is set to the estimated number of CPUs multiplied by 2. - The in-memory data size is estimated. If the parallelism would create in-memory blocks larger than the target block size (512MiB), the parallelism is increased until the blocks are < 512MiB in size. These rules fix two common user problems: 1. Insufficient parallelism in a large cluster, or too much parallelism on a small cluster. 2. Overly large block sizes leading to OOMs when processing a single block. TODO: - [x] Unit tests - [x] Docs update Supercedes part of: ray-project#25708 Co-authored-by: Ubuntu <ubuntu@ip-172-31-32-136.us-west-2.compute.internal> Signed-off-by: klwuibm <kwu888@gmail.com>
franklsf95
pushed a commit
to franklsf95/ray
that referenced
this pull request
Aug 2, 2022
…s and data size (ray-project#25883) This PR defaults the parallelism of Dataset reads to `-1`. The parallelism is determined according to the following rule in this case: - The number of available CPUs is estimated. If in a placement group, the number of CPUs in the cluster is scaled by the size of the placement group compared to the cluster size. If not in a placement group, this is the number of CPUs in the cluster. If the estimated CPUs is less than 8, it is set to 8. - The parallelism is set to the estimated number of CPUs multiplied by 2. - The in-memory data size is estimated. If the parallelism would create in-memory blocks larger than the target block size (512MiB), the parallelism is increased until the blocks are < 512MiB in size. These rules fix two common user problems: 1. Insufficient parallelism in a large cluster, or too much parallelism on a small cluster. 2. Overly large block sizes leading to OOMs when processing a single block. TODO: - [x] Unit tests - [x] Docs update Supercedes part of: ray-project#25708 Co-authored-by: Ubuntu <ubuntu@ip-172-31-32-136.us-west-2.compute.internal> Signed-off-by: Frank Luan <lsf@berkeley.edu>
gramhagen
pushed a commit
to gramhagen/ray
that referenced
this pull request
Aug 15, 2022
…s and data size (ray-project#25883) This PR defaults the parallelism of Dataset reads to `-1`. The parallelism is determined according to the following rule in this case: - The number of available CPUs is estimated. If in a placement group, the number of CPUs in the cluster is scaled by the size of the placement group compared to the cluster size. If not in a placement group, this is the number of CPUs in the cluster. If the estimated CPUs is less than 8, it is set to 8. - The parallelism is set to the estimated number of CPUs multiplied by 2. - The in-memory data size is estimated. If the parallelism would create in-memory blocks larger than the target block size (512MiB), the parallelism is increased until the blocks are < 512MiB in size. These rules fix two common user problems: 1. Insufficient parallelism in a large cluster, or too much parallelism on a small cluster. 2. Overly large block sizes leading to OOMs when processing a single block. TODO: - [x] Unit tests - [x] Docs update Supercedes part of: ray-project#25708 Co-authored-by: Ubuntu <ubuntu@ip-172-31-32-136.us-west-2.compute.internal> Signed-off-by: Scott Graham <scgraham@microsoft.com>
gramhagen
pushed a commit
to gramhagen/ray
that referenced
this pull request
Aug 15, 2022
…s and data size (ray-project#25883) This PR defaults the parallelism of Dataset reads to `-1`. The parallelism is determined according to the following rule in this case: - The number of available CPUs is estimated. If in a placement group, the number of CPUs in the cluster is scaled by the size of the placement group compared to the cluster size. If not in a placement group, this is the number of CPUs in the cluster. If the estimated CPUs is less than 8, it is set to 8. - The parallelism is set to the estimated number of CPUs multiplied by 2. - The in-memory data size is estimated. If the parallelism would create in-memory blocks larger than the target block size (512MiB), the parallelism is increased until the blocks are < 512MiB in size. These rules fix two common user problems: 1. Insufficient parallelism in a large cluster, or too much parallelism on a small cluster. 2. Overly large block sizes leading to OOMs when processing a single block. TODO: - [x] Unit tests - [x] Docs update Supercedes part of: ray-project#25708 Co-authored-by: Ubuntu <ubuntu@ip-172-31-32-136.us-west-2.compute.internal>
Stefan-1313
pushed a commit
to Stefan-1313/ray_mod
that referenced
this pull request
Aug 18, 2022
…s and data size (ray-project#25883) This PR defaults the parallelism of Dataset reads to `-1`. The parallelism is determined according to the following rule in this case: - The number of available CPUs is estimated. If in a placement group, the number of CPUs in the cluster is scaled by the size of the placement group compared to the cluster size. If not in a placement group, this is the number of CPUs in the cluster. If the estimated CPUs is less than 8, it is set to 8. - The parallelism is set to the estimated number of CPUs multiplied by 2. - The in-memory data size is estimated. If the parallelism would create in-memory blocks larger than the target block size (512MiB), the parallelism is increased until the blocks are < 512MiB in size. These rules fix two common user problems: 1. Insufficient parallelism in a large cluster, or too much parallelism on a small cluster. 2. Overly large block sizes leading to OOMs when processing a single block. TODO: - [x] Unit tests - [x] Docs update Supercedes part of: ray-project#25708 Co-authored-by: Ubuntu <ubuntu@ip-172-31-32-136.us-west-2.compute.internal> Signed-off-by: Stefan van der Kleij <s.vanderkleij@viroteq.com>
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Overview
TODO:
Auto-repartition: Building on the concept of available parallelism, we add auto-repartition functionality to datasource reads. If the number of blocks (files) of a dataset is much less than the available parallelism, Datasets will automatically insert a
repartition(available_parallelism)
operator to increase the number of blocks. For example, suppose one is reading a parquet file with one block, on a 8-core machine:Normally, the map_batches parallelism would be limited to 1, since there is only 1 file. In this prototype, auto-repartition kicks in and repartitions the datasets into 16 pieces so that map_batches can execute with full parallelism on this cluster:
Auto-repartition is enabled whenever a datasource is read with
parallelism=-1
. It can also be added manually to any stage in a Dataset with.repartition(-1)
. Under the hood, this is implemented as a new type of Datasets stage that conditionally generates a repartition if the number of input blocks is too low, otherwise acts as a no-op. This means no overhead is added if the number of dataset blocks is already high enough.Auto-repartition is also smart enough to detect downstream parallelism limits. For example, suppose we did
ray.data.range(10).map_batches(..., compute=ActorPoolStrategy(2, 2))
(actor pool of size 2), then autorepartition would not kick in since it knows the downstream available parallelism is only 2. This detection is possible since autorepartition is a lazy stage.Auto-repartition for DatasetPipelines: A common reason for the number of blocks being low is after
.window()
is called on a dataset. For example, suppose a Dataset originally had 200 blocks, but we called.window(blocks_per_window=5)
. Then, each window only has 5 blocks available, limiting the window parallelism to 5. We also auto-repartition these windows if.window(..., auto_repartition=True)
(default). In principle it would be even better to go back and try to increase the parallelism of the base Datasets, which is sometimes possible, but this prototype keeps it simple and just prints a warning message to the user when auto-repartition is triggered.Discussion