diff --git a/nemo_curator/scripts/compute_minhashes.py b/nemo_curator/scripts/compute_minhashes.py index c7a7e68b2..044653ceb 100644 --- a/nemo_curator/scripts/compute_minhashes.py +++ b/nemo_curator/scripts/compute_minhashes.py @@ -18,12 +18,13 @@ from nemo_curator import MinHash from nemo_curator.datasets import DocumentDataset from nemo_curator.gpu_deduplication.ioutils import strip_trailing_sep -from nemo_curator.gpu_deduplication.utils import ( - create_logger, - parse_nc_args, +from nemo_curator.gpu_deduplication.utils import parse_nc_args +from nemo_curator.log import create_logger +from nemo_curator.utils.distributed_utils import ( + get_client, performance_report_if, + read_data, ) -from nemo_curator.utils.distributed_utils import get_client, read_data from nemo_curator.utils.file_utils import get_all_files_paths_under diff --git a/nemo_curator/scripts/connected_components.py b/nemo_curator/scripts/connected_components.py index 1ab1282af..c04f0349d 100644 --- a/nemo_curator/scripts/connected_components.py +++ b/nemo_curator/scripts/connected_components.py @@ -15,7 +15,7 @@ import os import time -from nemo_curator.gpu_deduplication.utils import enable_spilling, parse_nc_args +from nemo_curator.gpu_deduplication.utils import parse_nc_args from nemo_curator.modules.fuzzy_dedup import ConnectedComponents from nemo_curator.utils.distributed_utils import get_client @@ -32,9 +32,10 @@ def main(args): st = time.time() output_path = os.path.join(args.output_dir, "connected_components.parquet") args.set_torch_to_use_rmm = False + args.enable_spilling = True + client = get_client(args, cluster_type="gpu") - enable_spilling() - client.run(enable_spilling) + components_stage = ConnectedComponents( cache_dir=args.cache_dir, jaccard_pairs_path=args.jaccard_pairs_path, diff --git a/nemo_curator/scripts/find_exact_duplicates.py b/nemo_curator/scripts/find_exact_duplicates.py index 7da01ea8e..16173861d 100644 --- a/nemo_curator/scripts/find_exact_duplicates.py +++ b/nemo_curator/scripts/find_exact_duplicates.py @@ -19,7 +19,8 @@ from nemo_curator.datasets import DocumentDataset from nemo_curator.gpu_deduplication.ioutils import strip_trailing_sep -from nemo_curator.gpu_deduplication.utils import create_logger, parse_nc_args +from nemo_curator.gpu_deduplication.utils import parse_nc_args +from nemo_curator.log import create_logger from nemo_curator.modules import ExactDuplicates from nemo_curator.utils.distributed_utils import get_client, read_data from nemo_curator.utils.file_utils import get_all_files_paths_under diff --git a/nemo_curator/scripts/jaccard_compute.py b/nemo_curator/scripts/jaccard_compute.py index f59157164..d16e95654 100644 --- a/nemo_curator/scripts/jaccard_compute.py +++ b/nemo_curator/scripts/jaccard_compute.py @@ -15,13 +15,13 @@ import os import time -from nemo_curator.gpu_deduplication.utils import enable_spilling, parse_nc_args +from nemo_curator.gpu_deduplication.utils import parse_nc_args from nemo_curator.modules.fuzzy_dedup import JaccardSimilarity from nemo_curator.utils.distributed_utils import get_client, get_num_workers def main(args): - description = """Computes the Jaccard similarity between document pairs + """Computes the Jaccard similarity between document pairs from partitioned parquet dataset. Result is a parquet dataset consiting of document id pair along with their Jaccard similarity score. """ @@ -30,9 +30,9 @@ def main(args): output_final_results_path = os.path.join( OUTPUT_PATH, "jaccard_similarity_results.parquet" ) + args.enable_spilling = True client = get_client(args, "gpu") - enable_spilling() - client.run(enable_spilling) + print(f"Num Workers = {get_num_workers(client)}", flush=True) print("Connected to dask cluster", flush=True) print("Running jaccard compute script", flush=True) diff --git a/nemo_curator/scripts/jaccard_shuffle.py b/nemo_curator/scripts/jaccard_shuffle.py index dc5d20f9b..c01935a61 100644 --- a/nemo_curator/scripts/jaccard_shuffle.py +++ b/nemo_curator/scripts/jaccard_shuffle.py @@ -15,12 +15,9 @@ import os import time -from nemo_curator.gpu_deduplication.utils import ( - get_client, - get_num_workers, - parse_nc_args, -) +from nemo_curator.gpu_deduplication.utils import get_num_workers, parse_nc_args from nemo_curator.modules.fuzzy_dedup import _Shuffle +from nemo_curator.utils.distributed_utils import get_client from nemo_curator.utils.fuzzy_dedup_utils.io_utils import ( get_text_ddf_from_json_path_with_blocksize, ) @@ -38,7 +35,7 @@ def main(args): OUTPUT_PATH = args.output_dir output_shuffled_docs_path = os.path.join(OUTPUT_PATH, "shuffled_docs.parquet") - client = get_client(args) + client = get_client(args, "gpu") client.run(func) print(f"Num Workers = {get_num_workers(client)}", flush=True) print("Connected to dask cluster", flush=True) diff --git a/nemo_curator/scripts/map_buckets.py b/nemo_curator/scripts/map_buckets.py index 522e4f417..9e3f71a51 100644 --- a/nemo_curator/scripts/map_buckets.py +++ b/nemo_curator/scripts/map_buckets.py @@ -15,12 +15,9 @@ import os import time -from nemo_curator.gpu_deduplication.utils import ( - get_client, - get_num_workers, - parse_nc_args, -) +from nemo_curator.gpu_deduplication.utils import get_num_workers, parse_nc_args from nemo_curator.modules.fuzzy_dedup import _MapBuckets +from nemo_curator.utils.distributed_utils import get_client from nemo_curator.utils.fuzzy_dedup_utils.io_utils import ( get_bucket_ddf_from_parquet_path, get_text_ddf_from_json_path_with_blocksize, @@ -157,7 +154,7 @@ def main(args): output_anchor_docs_with_bk_path = os.path.join( OUTPUT_PATH, "anchor_docs_with_bk.parquet" ) - client = get_client(args) + client = get_client(args, "gpu") print(f"Num Workers = {get_num_workers(client)}", flush=True) print("Connected to dask cluster", flush=True) print("Running jaccard map buckets script", flush=True) diff --git a/nemo_curator/scripts/minhash_lsh.py b/nemo_curator/scripts/minhash_lsh.py index fb2c6a90d..ec206dc10 100644 --- a/nemo_curator/scripts/minhash_lsh.py +++ b/nemo_curator/scripts/minhash_lsh.py @@ -24,7 +24,8 @@ from nemo_curator.gpu_deduplication.jaccard_utils.doc_id_mapping import ( convert_str_id_to_int, ) -from nemo_curator.gpu_deduplication.utils import create_logger, parse_nc_args +from nemo_curator.gpu_deduplication.utils import parse_nc_args +from nemo_curator.log import create_logger from nemo_curator.utils.distributed_utils import get_client