
[WIP] Speedup Connected Components #302

Open

wants to merge 13 commits into main from vjawa/speedup_cc_in_fuzzy_dedup
Conversation

@VibhuJawa (Collaborator) commented Oct 15, 2024

This pull request includes several changes to the nemo_curator/modules/fuzzy_dedup.py file, focusing on removing the convert_str_ids functionality, optimizing performance, and improving logging.

The most important changes are:

Removal of convert_str_ids functionality:

  • Removed the convert_str_ids parameter and its associated logic from the __init__ method and other methods in nemo_curator/modules/fuzzy_dedup.py. [1] [2] [3] [4] [5] [6]

This is possible because cuDF now supports long strings, so we no longer need to convert string IDs to integer IDs.
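
As an illustration only (not the code removed in this PR; the column names and values are hypothetical), string IDs can now flow through deduplication directly without an intermediate integer encoding:

import cudf

# Hypothetical string document IDs; with cuDF's long-string support these can
# be deduplicated directly instead of being converted to integer IDs first.
df = cudf.DataFrame(
    {
        "id": ["dataset-a-000001", "dataset-a-000002", "dataset-a-000001"],
        "group": [0, 1, 0],
    }
)

deduped = df.drop_duplicates(subset=["id"])
print(len(deduped))  # 2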

Performance optimizations:

  • Decreased the blocksize for reading Parquet files in _write_dedup_parsed_id [1] so that drop_duplicates (which has a large memory overhead, 16x or more) can scale without OOMs. This lets us run connected components at larger scales without requiring more hardware.

  • Increased the blocksize in the _write_encoded_jaccard_pair method to improve merge performance: with larger base chunks the transfers are bigger, so transfer throughput is better on TCP. [2]

  • Updated the _run_connected_components method to initialize Comms with p2p=False. (A sketch of these performance settings follows this list.)
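
A minimal, illustrative sketch of these settings; the paths and column names below are hypothetical, and the actual values live in nemo_curator/modules/fuzzy_dedup.py:

import dask_cudf
from cugraph.dask.comms import comms as Comms

# 1. Smaller blocksize where drop_duplicates runs on raw string IDs,
#    since drop_duplicates can need 16x+ the partition size in memory.
parsed_ids = dask_cudf.read_parquet(
    "jaccard_pairs/",               # hypothetical path
    columns=["id_x", "id_y"],       # hypothetical column names
    blocksize="512MB",
)

# 2. Larger blocksize for the merge-heavy encoded-jaccard-pair step, so each
#    transfer moves a bigger chunk (better throughput on TCP).
pairs = dask_cudf.read_parquet("jaccard_pairs/", blocksize="1GB")

# 3. Initialize comms without point-to-point endpoints before running
#    connected components (assuming the cuGraph Dask comms helper).
Comms.initialize(p2p=False)
try:
    pass  # run weakly connected components here
finally:
    Comms.destroy()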

Merge Improvements:

  • This PR optimizes the merge by using an index-based approach instead of the previous batched method, while keeping the broadcast merge (sketched below).
  • Through indexing, the new method reduces the 2*num_batches - 1 shuffles of the batched approach to a single shuffle.
  • The only additional operation is setting the index on the ddf_id column.
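
A minimal sketch of the index-based broadcast merge (hypothetical paths and column names; the real code is in fuzzy_dedup.py):

import dask_cudf

# ddf_id is the smaller mapping of parsed IDs; ddf holds the jaccard pairs.
ddf_id = dask_cudf.read_parquet("dedup_parsed_id/")              # hypothetical path
ddf = dask_cudf.read_parquet("jaccard_pairs/", blocksize="1GB")  # hypothetical path

# The single extra operation: set the index on the ID column (one shuffle).
ddf_id = ddf_id.set_index("id")

# Broadcast merge against the indexed right side, as in the diff below.
merged = ddf.merge(
    ddf_id,
    left_on="id_x",      # hypothetical column name
    right_index=True,
    how="inner",
    broadcast=True,
)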

Main: 22m 10s
PR: 444.85s (~7m 25s)


Dask Profiles:
cc_profiles.zip

Logging improvements:

  • Added start time logging in the cc_workflow method and end-to-end time logging for the workflow. [1] [2]

Verify Equal Results:

import dask_cudf

# Connected-components output from this PR's cache (path contains "pull_302")
ddf_1 = dask_cudf.read_parquet("/raid/vjawa/rpv2_debug_cache_pull_302/connected_components.parquet")
ddf_1 = ddf_1.repartition(npartitions=4).sort_values(by=["id", "group"])
len(ddf_1)
# 376321911

# Connected-components output from the existing (baseline) cache
ddf_2 = dask_cudf.read_parquet("/raid/vjawa/rpv2_debug_cache/connected_components.parquet")
ddf_2 = ddf_2.repartition(npartitions=4).sort_values(by=["id", "group"])
len(ddf_2)
# 376321911

Check same ids

# The inner join on "id" returns the full row count, so both runs contain the same ids
merged_df = ddf_1[["id"]].merge(ddf_2[["id"]], on="id", how="inner")
len(merged_df)
# 376321911

CC: @ayushdg

Checklist

  • I am familiar with the Contributing Guide.
  • New or Existing tests cover these changes.
  • The documentation is up to date with these changes.

@VibhuJawa VibhuJawa marked this pull request as ready for review October 15, 2024 07:44
right_index=True,
how="inner",
# shuffle="p2p",
broadcast=True,
Collaborator

Is there a limit / recommendation for the broadcast merge, given that ddf_id (which I believe is the smaller of the two) will now be unbounded, whereas in the original implementation it was batched? I worry it'll OOM past certain thresholds.

@VibhuJawa (Collaborator Author) Oct 15, 2024

Discussed this with Praateek on a call. A couple of clarifications:

  1. It was previously also a broadcast merge, so nothing changes there.
  2. Previously there were 2*num_batches - 1 shuffles; with this index approach we shuffle only once.
  3. The only extra operation is a single set_index on the ddf_id column.

@@ -1634,7 +1635,7 @@ def _write_encoded_jaccard_pair(self, dedup_parsed_id_path):
len(ddf_id)
ddf = dask_cudf.read_parquet(
self.jaccard_pairs_path,
blocksize="256MB",
blocksize="1GB",
@VibhuJawa (Collaborator Author)

We can probably scale this up a bit more (I kept it conservative).

CC: @ayushdg

Collaborator

Similar to above, can you write down why we decreased the blocksize above but increased this one?

@VibhuJawa (Collaborator Author)

Yup, at this point we are doing a merge operation, and with larger base chunks the transfers are bigger, so transfer throughput is better on TCP. On UCX the throughput delta between larger and smaller chunks is smaller.
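
For context only (not part of this PR), the transport in question is chosen when the Dask-CUDA cluster is created; a minimal illustration:

from dask_cuda import LocalCUDACluster
from distributed import Client

# Larger chunks amortize per-transfer overhead better over TCP; on UCX the
# gap between small and large chunks is smaller.
cluster = LocalCUDACluster(protocol="tcp")  # or protocol="ucx"
client = Client(cluster)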

@VibhuJawa VibhuJawa force-pushed the vjawa/speedup_cc_in_fuzzy_dedup branch from f424de2 to ea65bad Compare October 16, 2024 02:35
@VibhuJawa VibhuJawa changed the base branch from main to r0.3.0 October 16, 2024 02:37
@VibhuJawa VibhuJawa changed the base branch from r0.3.0 to main October 16, 2024 02:37
VibhuJawa and others added 5 commits October 15, 2024 19:46
Signed-off-by: Vibhu Jawa <vibhujawa@gmail.com>
Signed-off-by: Vibhu Jawa <vjawa@nvidia.com>
Signed-off-by: Vibhu Jawa <vibhujawa@gmail.com>
Signed-off-by: Vibhu Jawa <vjawa@nvidia.com>
Signed-off-by: Vibhu Jawa <vjawa@login-eos02.eos.clusters.nvidia.com>
Signed-off-by: Vibhu Jawa <vjawa@nvidia.com>
Signed-off-by: Vibhu Jawa <vjawa@nvidia.com>
Signed-off-by: Vibhu Jawa <vjawa@nvidia.com>
@VibhuJawa VibhuJawa force-pushed the vjawa/speedup_cc_in_fuzzy_dedup branch from ea65bad to 5caa34a Compare October 16, 2024 02:46
Signed-off-by: Vibhu Jawa <vjawa@nvidia.com>
Signed-off-by: Vibhu Jawa <vjawa@nvidia.com>
VibhuJawa and others added 4 commits October 16, 2024 17:04
Signed-off-by: Vibhu Jawa <vibhujawa@gmail.com>
Signed-off-by: Vibhu Jawa <vibhujawa@gmail.com>
Signed-off-by: Vibhu Jawa <vibhujawa@gmail.com>
Signed-off-by: Vibhu Jawa <vjawa@nvidia.com>
df = dask_cudf.read_parquet(
deduped_encoded_jaccard_path, blocksize="1GB", aggregate_files=True
)
df = df[df["jaccard"] == 1].reset_index(drop=True)

labels_df = dask_cudf.read_parquet(deduped_parsed_id_path)
labels_df = dask_cudf.read_parquet(deduped_parsed_id_path, blocksize="1GB", aggregate_files=True)
@VibhuJawa (Collaborator Author) Oct 17, 2024

This was causing errors because we had a lot of small files, which in some cases caused an unequal distribution of work.
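
A minimal illustration of the change (the path is hypothetical): coalescing many small Parquet files into larger partitions avoids the skew described above.

import dask_cudf

# Without blocksize/aggregate_files, each small file becomes its own tiny
# partition, which can skew the work distribution across workers.
labels_df = dask_cudf.read_parquet(
    "dedup_parsed_id/",      # hypothetical path
    blocksize="1GB",
    aggregate_files=True,    # combine small files into ~1GB partitions
)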

Signed-off-by: Vibhu Jawa <vjawa@nvidia.com>
Signed-off-by: Vibhu Jawa <vjawa@nvidia.com>
@@ -1438,6 +1436,7 @@ def cc_workflow(self, output_path):
cc_path = self._run_connected_components(
deduped_encoded_jaccard_path, deduped_parsed_id_path, output_path
)
self._logger.info(f"End to End time in cc_workflow = {time.time() - st}s")
Collaborator

Can this also be in the same format, i.e. Time taken for end to end cc_workflow was 12.345s?

@VibhuJawa (Collaborator Author)

Good callout, will change.
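
A minimal sketch of the suggested message format (using a plain logger for illustration; the class itself uses self._logger):

import logging
import time

logger = logging.getLogger(__name__)

st = time.time()
# ... run the cc_workflow steps here ...
logger.info(f"Time taken for end to end cc_workflow was {time.time() - st}s")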

@@ -1589,22 +1580,10 @@ def _write_dedup_parsed_id(self):
ddf = dask_cudf.read_parquet(
self.jaccard_pairs_path,
columns=[self.left_id, self.right_id],
blocksize="1GB",
blocksize="512MB",
@praateekmahajan (Collaborator) Oct 18, 2024

Why did we have to change this? Was this causing slowdowns / OOM?

@VibhuJawa (Collaborator Author)

Yeah, this is the first dedup applied before encoding, and dedup on strings is an expensive operation (often with a 16x memory overhead).

While testing this workflow at larger scales (>40 TB), this is the step that blew up in memory for me.

Long term, for maximum TCO we should run ConnectedComponents on as few nodes as possible, since this is a communication-heavy stage. This change helps ensure that.
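
A rough back-of-the-envelope illustration of why the blocksize was lowered (the 16x figure is the overhead mentioned above; actual peaks depend on the data):

# Approximate peak memory per partition during string drop_duplicates,
# assuming roughly a 16x overhead over the input partition size.
overhead = 16
new_blocksize_gib = 0.5   # 512MB partitions (this PR)
old_blocksize_gib = 1.0   # 1GB partitions (before this PR)

print(f"new: ~{new_blocksize_gib * overhead:.0f} GiB peak per partition")  # ~8 GiB
print(f"old: ~{old_blocksize_gib * overhead:.0f} GiB peak per partition")  # ~16 GiB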
