[WIP] Speedup Connected Components #302
base: main
Conversation
right_index=True,
how="inner",
# shuffle="p2p",
broadcast=True,
Is there a limit / recommendation on broadcast merge, given that `ddf_id` (which I believe is the smaller of the two) will now be unbounded, whereas in the original implementation it was batched? I worry it will OOM at certain thresholds.
Discussed this with Praateek on a call. A couple of clarifications:
- It was previously also a broadcast merge, so nothing changes there.
- Previously we had 2*num_batches - 1 shuffles; with this index approach we shuffle only once.
- The only extra operation is one `set_index` on the `ddf_id` column.
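A minimal CPU sketch of the approach described above, with pandas standing in for dask_cudf and hypothetical column names (`uid`, `doc_id`): the small id frame is indexed once, and the merge then targets that index instead of re-shuffling per batch.

```python
import pandas as pd

# Sketch only: pandas stands in for dask_cudf, and "uid"/"doc_id" are
# hypothetical column names. The shape of the operation is the point:
# index the small id frame once, then merge against that index.
pairs = pd.DataFrame({"uid": [0, 1, 1, 2], "jaccard": [1.0, 0.8, 1.0, 0.9]})
ids = pd.DataFrame({"uid": [0, 1, 2], "doc_id": ["a", "b", "c"]})

ids_indexed = ids.set_index("uid")  # in dask_cudf this is the one extra shuffle
merged = pairs.merge(ids_indexed, left_on="uid", right_index=True, how="inner")
print(len(merged))  # 4
```

In the distributed version, `broadcast=True` additionally ships the small indexed side to every worker rather than shuffling the large side.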
@@ -1634,7 +1635,7 @@ def _write_encoded_jaccard_pair(self, dedup_parsed_id_path):
     len(ddf_id)
     ddf = dask_cudf.read_parquet(
         self.jaccard_pairs_path,
-        blocksize="256MB",
+        blocksize="1GB",
We can probably scale this up a bit more (I kept it conservative).
CC: @ayushdg
Similar to the above: can you write down why we decreased the blocksize above but increased this one?
Yup, at this point we are doing a merge operation, and with larger base chunks we get bigger transfers, so transfer throughput is better on TCP. On UCX the throughput delta between larger and smaller chunks is smaller.
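A toy model of the TCP argument above, with assumed numbers (the setup cost and bandwidth are illustrative, not measurements from this workload): each transfer pays a roughly fixed setup cost, which larger chunks amortize.

```python
# Toy model with assumed numbers: per-transfer setup cost and link
# bandwidth are illustrative, not measured from this workload.
setup_s = 0.01    # assumed fixed cost per transfer (seconds)
bandwidth = 10.0  # assumed link bandwidth (GB/s)

def effective_throughput(chunk_gb: float) -> float:
    # Total time = fixed setup + size-proportional transfer time.
    return chunk_gb / (setup_s + chunk_gb / bandwidth)

print(effective_throughput(0.25))  # smaller chunks: setup dominates more
print(effective_throughput(1.0))   # larger chunks: closer to link bandwidth
```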
nemo_curator/modules/fuzzy_dedup.py (Outdated)
df = dask_cudf.read_parquet(
    deduped_encoded_jaccard_path, blocksize="1GB", aggregate_files=True
)
df = df[df["jaccard"] == 1].reset_index(drop=True)

-labels_df = dask_cudf.read_parquet(deduped_parsed_id_path)
+labels_df = dask_cudf.read_parquet(deduped_parsed_id_path, blocksize="1GB", aggregate_files=True)
This was causing errors: we had a lot of small files, which in some cases caused an unequal distribution of work.
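An illustrative sketch (not the actual dask/dask_cudf internals) of what `blocksize="1GB"` with `aggregate_files=True` achieves here: many small files get packed into roughly uniform partitions instead of one skewed partition per file.

```python
# Illustration only: pack many small files into ~target-sized partitions,
# which is roughly the effect of aggregate_files=True with a blocksize.
file_sizes_mb = [50] * 40  # hypothetical: forty 50 MB parquet files
target_mb = 1024

partitions, current, acc = [], [], 0
for size in file_sizes_mb:
    if current and acc + size > target_mb:
        partitions.append(current)  # close out the full partition
        current, acc = [], 0
    current.append(size)
    acc += size
if current:
    partitions.append(current)

print(len(partitions))               # 2 partitions instead of 40
print([sum(p) for p in partitions])  # ~1 GB each
```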
@@ -1438,6 +1436,7 @@ def cc_workflow(self, output_path):
     cc_path = self._run_connected_components(
         deduped_encoded_jaccard_path, deduped_parsed_id_path, output_path
     )
+    self._logger.info(f"End to End time in cc_workflow = {time.time() - st}s")
Can this also be in the same format as the other log messages, e.g. `Time taken for end to end cc_workflow was 12.345s`?
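A sketch of the consistent format being requested, with the wording assumed from the reviewer's example string:

```python
import time

# Assumed format, following the reviewer's example string.
st = time.time()
# ... cc_workflow body would run here ...
elapsed = time.time() - st
msg = f"Time taken for end to end cc_workflow was {elapsed:.3f}s"
print(msg)
```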
Good callout, will change.
@@ -1589,22 +1580,10 @@ def _write_dedup_parsed_id(self):
     ddf = dask_cudf.read_parquet(
         self.jaccard_pairs_path,
         columns=[self.left_id, self.right_id],
-        blocksize="1GB",
+        blocksize="512MB",
Why did we have to change this? Was this causing slowdowns / OOM?
Yeah, so this is the first dedup, applied before encoding, and dedup on strings is an expensive operation (often with a 16x overhead in terms of memory). While testing this workflow at larger scales (>40 TB), this was the step that blew up in memory for me.
Long term we should run `ConnectedComponents` on as few nodes as possible, since this is a communication-heavy stage, for maximum TCO. This step helps ensure that.
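A back-of-the-envelope check of why the blocksize was halved here. The ~16x string-dedup overhead quoted above is the only input taken from this thread; the arithmetic is illustrative.

```python
# Illustrative arithmetic using the ~16x memory overhead mentioned above.
overhead = 16
for blocksize_mb in (1024, 512):
    peak_gb = blocksize_mb * overhead / 1024
    print(f'blocksize="{blocksize_mb}MB" -> ~{peak_gb:.0f} GB peak per partition')
```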
This pull request makes several changes to `nemo_curator/modules/fuzzy_dedup.py`, focusing on removing the `convert_str_ids` functionality, optimizing performance, and improving logging. The most important changes are:

**Removal of `convert_str_ids` functionality:** Removed the `convert_str_ids` parameter and its associated logic from the `__init__` method and other methods in `nemo_curator/modules/fuzzy_dedup.py`. [1] [2] [3] [4] [5] [6] This is done because `cuDF` now has long-strings support, so we no longer need to convert string ids to int ids.

**Performance optimizations:**
- Decreased the blocksize for reading parquet files in `_write_dedup_parsed_id` [1] to allow `drop_duplicates` (which has a big memory overhead, 16x+) to scale and prevent OOMs; this will allow us to run CC at larger scales without requiring more hardware.
- Increased the blocksize in the `_write_encoded_jaccard_pair` method to improve merge performance: with large base chunks we get bigger transfers, so transfer throughput is better on TCP. [2]
- Updated the `_run_connected_components` method to initialize `Comms` with `p2p=False`.

**Merge improvements:** The batched merge is replaced by a single merge against an index on the `ddf_id` column.
column.Main: 22m 10s
PR: 444.85 s
Dask Profiles:
cc_profiles.zip
**Logging improvements:** Added end-to-end time logging to the `cc_workflow` method. [1] [2]

**Verify equal results:**
- 376321911
- 376321911
- Check same ids: 376321911

CC: @ayushdg
Checklist