[RFC] [data] Prototype of auto repartitioning #25708

Closed
ericl wants to merge 38 commits

Conversation

@ericl (Contributor) commented Jun 12, 2022

Overview


Auto-repartition: Building on the concept of available parallelism, we add auto-repartition functionality to datasource reads. If the number of blocks (files) of a dataset is much less than the available parallelism, Datasets will automatically insert a repartition(available_parallelism) operator to increase the number of blocks. For example, suppose one is reading a Parquet file with one block on an 8-core machine:

ds = ray.data.read_parquet("/tmp/x").map_batches(lambda x: x)
print(ds.stats())

Normally, the map_batches parallelism would be limited to 1, since there is only 1 file. In this prototype, auto-repartition kicks in and repartitions the dataset into 16 pieces so that map_batches can execute with full parallelism on this cluster:

Read: 100%|██████████████████████████████████████| 1/1 [00:00<00:00, 237.37it/s]
Auto_Repartition:  62%|███████████████▋         | 10/16 [00:00<00:00, 34.58it/s]
Map_Batches: 100%|█████████████████████████████| 16/16 [00:00<00:00, 500.61it/s]
Stage 1 read: 1/1 blocks executed in 0.01s
* Remote wall time: 546.04us min, 546.04us max, 546.04us mean, 546.04us total
* Remote cpu time: 559.44us min, 559.44us max, 559.44us mean, 559.44us total
* Output num rows: 10 min, 10 max, 10 mean, 10 total
* Output size bytes: 82 min, 82 max, 82 mean, 82 total
* Tasks per node: 1 min, 1 max, 1 mean; 1 nodes used

Stage 2 auto_repartition: 10/16 blocks executed in 0.31s, 6/16 blocks split from parent
* Remote wall time: 61.39us min, 235.89ms max, 158.99ms mean, 1.59s total
* Remote cpu time: 60.34us min, 226.53ms max, 153.4ms mean, 1.53s total
* Output num rows: 0 min, 1 max, 0 mean, 10 total
* Output size bytes: 0 min, 8 max, 5 mean, 80 total
* Tasks per node: 10 min, 10 max, 10 mean; 1 nodes used

Stage 3 map_batches: 16/16 blocks executed in 0.03s
* Remote wall time: 53.42us min, 4.64ms max, 1.86ms mean, 29.81ms total
* Remote cpu time: 52.93us min, 4.73ms max, 1.89ms mean, 30.17ms total
* Output num rows: 0 min, 1 max, 0 mean, 10 total
* Output size bytes: 0 min, 136 max, 85 mean, 1360 total
* Tasks per node: 16 min, 16 max, 16 mean; 1 nodes used

Auto-repartition is enabled whenever a datasource is read with parallelism=-1. It can also be added manually to any stage in a Dataset with .repartition(-1). Under the hood, this is implemented as a new type of Datasets stage that conditionally generates a repartition if the number of input blocks is too low, and otherwise acts as a no-op. This means no overhead is added if the number of dataset blocks is already high enough.
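Below is a minimal sketch of how such a conditional stage could look. All class and helper names here are hypothetical and not taken from the Ray codebase; only the decision logic (target 2x the available parallelism, no-op when there are already enough blocks) follows the description above.

import ray


def estimate_available_parallelism() -> int:
    # Assumption: use the cluster's CPU count as the available parallelism.
    return int(ray.cluster_resources().get("CPU", 1))


class AutoRepartitionStage:
    """Lazily inserts a repartition only when a dataset has too few blocks."""

    def __init__(self, multiplier: int = 2):
        # Target 2x the available parallelism so autoscaler scale-up is triggered.
        self.multiplier = multiplier

    def apply(self, ds, downstream_parallelism_cap=None):
        target = self.multiplier * estimate_available_parallelism()
        if downstream_parallelism_cap is not None:
            # e.g. a downstream ActorPoolStrategy(2, 2) caps useful parallelism at 2.
            target = min(target, downstream_parallelism_cap)
        if ds.num_blocks() < target:
            return ds.repartition(target)
        # Already enough blocks: act as a no-op so no overhead is added.
        return ds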

Auto-repartition is also smart enough to detect downstream parallelism limits. For example, suppose we did ray.data.range(10).map_batches(..., compute=ActorPoolStrategy(2, 2)) (an actor pool of size 2); then auto-repartition would not kick in, since it knows the downstream available parallelism is only 2. This detection is possible because auto-repartition is a lazy stage.
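Spelled out as a usage sketch (the auto-repartition behavior described in the comments is what the prototype would do, not a released API):

import ray
from ray.data import ActorPoolStrategy


class Identity:
    # Callable-class UDF, as required for actor-based compute in map_batches.
    def __call__(self, batch):
        return batch


# The map_batches stage runs on an actor pool of exactly 2 actors, so even on a
# large cluster only 2 batches can be processed concurrently.
ds = ray.data.range(10).map_batches(Identity, compute=ActorPoolStrategy(2, 2))

# With the prototype, the lazy auto-repartition stage sees that downstream
# parallelism is capped at 2 and skips inserting a repartition.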

Auto-repartition for DatasetPipelines: A common reason for a low block count is calling .window() on a dataset. For example, suppose a Dataset originally had 200 blocks, but we called .window(blocks_per_window=5). Each window then has only 5 blocks available, limiting the window parallelism to 5. We also auto-repartition these windows when .window(..., auto_repartition=True) is set (the default). In principle it would be even better to go back and increase the parallelism of the base Dataset, which is sometimes possible, but this prototype keeps things simple and just prints a warning to the user when auto-repartition is triggered.
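As a usage sketch (the auto_repartition keyword is the proposed prototype flag, not part of the released .window() API, so it appears only as a comment; argument names such as parallelism may vary across Ray versions):

import ray

# A dataset with 200 blocks; windowing to 5 blocks per window limits each
# window's parallelism to 5 unless the window is repartitioned.
ds = ray.data.range(200_000, parallelism=200)

# Proposed prototype behavior: auto_repartition=True (the default) would
# repartition each 5-block window up to the available parallelism.
pipe = ds.window(blocks_per_window=5)  # prototype: .window(blocks_per_window=5, auto_repartition=True)
pipe = pipe.map_batches(lambda batch: batch)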

Discussion

  • Relation to block splitting: Auto-repartitioning complements block splitting. The main use case for block splitting is to avoid OOMs; auto-repartitioning's goal is instead to ensure stages can execute with full available parallelism.
  • Relation to autoscaling: To play nicely with autoscaling, we always target 2 * available_parallelism blocks, ensuring that scale-up is triggered. We could do better in the future if we had more information about the maximum cluster size. For now, the main use of auto-repartition is avoiding bottlenecks induced by having too few blocks, not optimizing for autoscaling.

@ericl ericl changed the title [WIP] [data] Prototype of auto repartitioning [RFC] [data] Prototype of auto repartitioning Jun 12, 2022
@ericl ericl added the do-not-merge Do not merge this PR! label Jun 12, 2022
@stephanie-wang stephanie-wang self-assigned this Jun 16, 2022
ericl added a commit that referenced this pull request Jul 13, 2022
…s and data size (#25883)

This PR defaults the parallelism of Dataset reads to `-1`. The parallelism is determined according to the following rule in this case:
- The number of available CPUs is estimated. If in a placement group, the number of CPUs in the cluster is scaled by the size of the placement group compared to the cluster size. If not in a placement group, this is the number of CPUs in the cluster. If the estimated CPUs is less than 8, it is set to 8.
- The parallelism is set to the estimated number of CPUs multiplied by 2.
- The in-memory data size is estimated. If the parallelism would create in-memory blocks larger than the target block size (512MiB), the parallelism is increased until the blocks are < 512MiB in size.
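For illustration, a rough sketch of this rule (the function and constant names here are hypothetical; the actual implementation in Ray may differ):

# Illustrative sketch of the default (parallelism=-1) heuristic described above.
TARGET_MAX_BLOCK_SIZE = 512 * 1024 * 1024  # 512MiB
MIN_ESTIMATED_CPUS = 8
CPU_MULTIPLIER = 2


def autodetect_parallelism(estimated_cpus: float, estimated_in_memory_bytes: int) -> int:
    # 1. Clamp the CPU estimate to at least 8.
    cpus = max(int(estimated_cpus), MIN_ESTIMATED_CPUS)
    # 2. Base parallelism is 2x the estimated CPUs.
    parallelism = CPU_MULTIPLIER * cpus
    # 3. Increase parallelism until blocks fall under the 512MiB target size.
    while estimated_in_memory_bytes / parallelism > TARGET_MAX_BLOCK_SIZE:
        parallelism *= 2
    return parallelism


# Example: 16 estimated CPUs and ~100GiB of estimated in-memory data
# -> 256 blocks of roughly 400MiB each.
print(autodetect_parallelism(16, 100 * 1024**3))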

These rules fix two common user problems:
1. Insufficient parallelism in a large cluster, or too much parallelism on a small cluster.
2. Overly large block sizes leading to OOMs when processing a single block.

TODO:
- [x] Unit tests
- [x] Docs update

Supersedes part of: #25708

Co-authored-by: Ubuntu <ubuntu@ip-172-31-32-136.us-west-2.compute.internal>
ericl and others added 14 commits July 13, 2022 16:11
@ericl (Contributor, Author) commented Jul 18, 2022

Closing this for now: since we have sufficient warnings around parallelism, auto-repartitioning is more likely to cause headaches than to help.

@ericl ericl closed this Jul 18, 2022